
技术合作微信:meta5201

为啥非要搭个token中转站?
这些东西必须备齐(不踩坑版)
1. 基础环境(必选)
2. 核心依赖(必装)
pip install fastapi uvicorn requests pymysql python-dotenv loguru,3. 其他准备(可选,但建议备齐)
三、核心步骤:手把手教你搭中转站(附代码片段,可直接用)
第一步:搭建数据库(存储token和调用记录)
CREATE TABLE `ai_token` (`id` INT PRIMARY KEY AUTO_INCREMENT COMMENT '主键ID',`platform` VARCHAR(50) NOT NULL COMMENT 'AI平台(比如openai、wenxin、tongyi)',`token` VARCHAR(255) NOT NULL COMMENT 'AI接口token',`remaining_quota` INT NOT NULL DEFAULT 0 COMMENT '剩余额度(根据自己的需求设置,比如调用次数)',`status` TINYINT NOT NULL DEFAULT 1 COMMENT '状态:1-可用,0-不可用(比如token失效、额度耗尽)',`create_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',`update_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间') ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='AI token信息表';
CREATE TABLE `ai_token_call_log` (`id` INT PRIMARY KEY AUTO_INCREMENT COMMENT '主键ID',`token_id` INT NOT NULL COMMENT '关联的token主键ID',`platform` VARCHAR(50) NOT NULL COMMENT 'AI平台',`request_params` TEXT COMMENT '请求参数(比如调用AI的prompt、模型等)',`response_result` TEXT COMMENT '响应结果',`call_status` TINYINT NOT NULL COMMENT '调用状态:1-成功,0-失败',`error_msg` VARCHAR(255) DEFAULT NULL COMMENT '失败原因(如果调用失败)',`call_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '调用时间',FOREIGN KEY (`token_id`) REFERENCES `ai_token` (`id`) ON DELETE CASCADE) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='AI token调用日志表';
第二步:配置环境变量(安全存储敏感信息)
数据库配置DB_HOST=127.0.0.1 # 数据库地址,本地的话就是127.0.0.1,远程的话填服务器IPDB_PORT=3306 # 数据库端口,默认3306DB_USER=root # 数据库用户名DB_PASSWORD=123456 # 数据库密码DB_NAME=ai_token_transfer # 数据库名,和我们刚才创建的一致# 中转站服务配置SERVER_HOST=0.0.0.0 # 服务监听地址,0.0.0.0表示允许所有IP访问SERVER_PORT=8000 # 服务端口,可自定义,比如8000、8888# 日志配置LOG_LEVEL=INFO # 日志级别,INFO就行,调试的时候可以改成DEBUG保存好.env文件,后续我们的代码会读取这个文件的配置,不用再手动修改代码里的敏感信息,非常方便。第三步:编写核心代码(中转站的核心逻辑)这一步是重点,我们分三个部分来写:数据库连接、token调度逻辑、接口编写,每一部分都有代码片段,大家可以直接复制修改。首先,创建一个main.py文件,这是我们中转站的入口文件,所有的核心逻辑都在这里。1. 导入依赖和配置from fastapi import FastAPI, HTTPExceptionfrom pydantic import BaseModelimport pymysqlfrom pymysql.cursors import DictCursorfrom dotenv import load_dotenvimport osfrom loguru import loggerimport requestsfrom datetime import datetime# 加载环境变量load_dotenv()# 初始化FastAPI应用app = FastAPI(title="AI Token中转站", description="AI token统一管理、调度服务", version="1.0.0")# 数据库连接配置DB_CONFIG = { "host": os.getenv("DB_HOST"), "port": int(os.getenv("DB_PORT")), "user": os.getenv("DB_USER"), "password": os.getenv("DB_PASSWORD"), "db": os.getenv("DB_NAME"), "charset": "utf8mb4"}# 初始化日志logger.add("token_transfer.log", level=os.getenv("LOG_LEVEL"), rotation="1 day", retention="7 days")2. 数据库连接工具函数编写一个工具函数,用来连接数据库,避免每次操作数据库都重复写连接代码,提高复用性:def get_db_connection(): """获取数据库连接""" try: connection = pymysql.connect(**DB_CONFIG) logger.info("数据库连接成功") return connection except Exception as e: logger.error(f"数据库连接失败:{str(e)}") raise HTTPException(status_code=500, detail="数据库连接失败")
3. Token调度逻辑(核心中的核心)
def get_available_token(platform: str):"""获取可用的token:param platform: AI平台(比如openai、wenxin):return: 可用的token信息(字典)"""connection = get_db_connection()try:with connection.cursor(DictCursor) as cursor:查询该平台下,状态为可用、剩余额度>0的token,按剩余额度降序排列,优先选额度多的 sql = """ SELECT id, token, remaining_quota FROM ai_token WHERE platform = %s AND status = 1 AND remaining_quota > 0 ORDER BY remaining_quota DESC LIMIT 1 """ cursor.execute(sql, (platform,)) token_info = cursor.fetchone() if not token_info: logger.error(f"平台{platform}无可用token(要么状态不可用,要么额度耗尽)") raise HTTPException(status_code=400, detail=f"平台{platform}无可用token") return token_info finally: connection.close()
def update_token_quota(token_id: int, reduce_quota: int = 1):"""更新token的剩余额度(每次调用扣减1个额度,可自定义扣减数量):param token_id: token的主键ID:param reduce_quota: 扣减的额度,默认1"""connection = get_db_connection()try:with connection.cursor() as cursor:查询当前token的剩余额度 cursor.execute("SELECT remaining_quota FROM ai_token WHERE id = %s", (token_id,)) current_quota = cursor.fetchone()[0] if current_quota < reduce_quota: # 额度不足,将token状态改为不可用 cursor.execute("UPDATE ai_token SET status = 0 WHERE id = %s", (token_id,)) connection.commit() logger.warning(f"token_id={token_id} 额度不足,已设置为不可用") return False # 扣减额度 new_quota = current_quota - reduce_quota cursor.execute( "UPDATE ai_token SET remaining_quota = %s, update_time = %s WHERE id = %s", (new_quota, datetime.now(), token_id) ) connection.commit() logger.info(f"token_id={token_id} 额度更新成功,剩余额度:{new_quota}") return True except Exception as e: connection.rollback() logger.error(f"更新token额度失败:{str(e)}") raise HTTPException(status_code=500, detail="更新token额度失败") finally: connection.close()
def record_call_log(token_id: int, platform: str, request_params: dict, response_result: dict = None, call_status: int = 1, error_msg: str = None):"""记录token调用日志:param token_id: token的主键ID:param platform: AI平台:param request_params: 请求参数:param response_result: 响应结果:param call_status: 调用状态(1-成功,0-失败):param error_msg: 失败原因"""connection = get_db_connection()try:with connection.cursor() as cursor:sql = """INSERT INTO ai_token_call_log(token_id, platform, request_params, response_result, call_status, error_msg, call_time)VALUES (%s, %s, %s, %s, %s, %s, %s)"""cursor.execute(sql,(token_id,platform,str(request_params),转为字符串存储 str(response_result) if response_result else None, call_status, error_msg, datetime.now() ) ) connection.commit() logger.info(f"token_id={token_id} 调用日志记录成功") except Exception as e: connection.rollback() logger.error(f"记录调用日志失败:{str(e)}") finally: connection.close()
4. 编写前端对接接口
class AICallRequest(BaseModel):"""AI调用请求参数模型"""platform: strAI平台,比如"openai"、"wenxin"、"tongyi" request_params: dict # 调用AI接口的请求参数,比如prompt、model等 reduce_quota: int = 1 # 每次调用扣减的额度,默认1然后,编写接口(POST请求,路径为/api/ai/call):@app.post("/api/ai/call", summary="调用AI接口(通过中转站调度token)")def ai_call(request: AICallRequest): """ 前端调用AI接口的统一入口,中转站自动调度可用token,返回AI响应结果 """ platform = request.platform request_params = request.request_params reduce_quota = request.reduce_quota try: # 1. 获取可用token token_info = get_available_token(platform) token_id = token_info["id"] token = token_info["token"] logger.info(f"获取到可用token:token_id={token_id},platform={platform}") # 2. 调用对应AI平台的接口(这里以OpenAI为例,其他平台可类比修改) # 不同AI平台的接口地址、请求头、参数格式不同,大家根据实际情况修改 if platform == "openai": ai_url = "https://api.openai.com/v1/chat/completions" headers = { "Content-Type": "application/json", "Authorization": f"Bearer {token}" } # 调用OpenAI接口 response = requests.post(ai_url, json=request_params, headers=headers, timeout=30) response.raise_for_status() # 若请求失败,抛出异常 ai_response = response.json() elif platform == "wenxin": # 百度文心一言的接口调用逻辑,大家根据文心的SDK或接口文档修改 ai_url = "https://aip.baidubce.com/rpc/2.0/ai_custom/v1/wenxinworkshop/chat/completions" headers = { "Content-Type": "application/json" } request_params["access_token"] = token response = requests.post(ai_url, json=request_params, headers=headers, timeout=30) response.raise_for_status() ai_response = response.json() elif platform == "tongyi": # 阿里通义千问的接口调用逻辑,类比修改 ai_url = "https://dashscope.aliyuncs.com/api/v1/services/aigc/text-generation/generation" headers = { "Content-Type": "application/json", "Authorization": f"Bearer {token}" } response = requests.post(ai_url, json=request_params, headers=headers, timeout=30) response.raise_for_status() ai_response = response.json() else: logger.error(f"不支持的AI平台:{platform}") raise HTTPException(status_code=400, detail=f"不支持的AI平台:{platform}") # 3. 更新token剩余额度 update_token_quota(token_id, reduce_quota) # 4. 记录调用日志(成功) record_call_log( token_id=token_id, platform=platform, request_params=request_params, response_result=ai_response, call_status=1 ) # 5. 返回AI响应结果给前端 return { "code": 200, "message": "调用成功", "data": ai_response } except requests.exceptions.RequestException as e: # 调用AI接口失败,记录日志 error_msg = f"调用AI接口失败:{str(e)}" logger.error(error_msg) # 如果获取到了token,记录失败日志 if "token_id" in locals(): record_call_log( token_id=token_id, platform=platform, request_params=request_params, call_status=0, error_msg=error_msg ) raise HTTPException(status_code=500, detail=error_msg) except Exception as e: # 其他异常 error_msg = f"系统异常:{str(e)}" logger.error(error_msg) if "token_id" in locals(): record_call_log( token_id=token_id, platform=platform, request_params=request_params, call_status=0, error_msg=error_msg ) raise HTTPException(status_code=500, detail=error_msg)
第四步:启动服务,测试接口
{"platform": "openai","request_params": {"model": "gpt-3.5-turbo","messages": [{"role": "user", "content": "你好,AI token中转站怎么搭建?"}]},"reduce_quota": 1}
第五步:部署到生产环境(可选,按需操作)
[program:ai_token_transfer]command=uvicorn main:app --host 0.0.0.0 --port 8000directory=/root/ai_token_transfermain.py所在的目录user=rootautostart=true # 开机自动启动autorestart=true # 服务崩溃自动重启redirect_stderr=truestdout_logfile=/root/ai_token_transfer/supervisor.log # 日志文件路径
这些问题一定要注意(血的教训)
中转站搭完,省下来的时间都是自己的


技术合作微信:meta5201

夜雨聆风