一、API Blueprint架构设计
1.1 API模块列表
共19个API Blueprint模块:
kb_app | ||
dialog_app | ||
document_app | ||
file_app | ||
canvas_app | ||
chunk_app | ||
conversation_app | ||
llm_app | ||
user_app | ||
tenant_app | ||
system_app | ||
search_app | ||
api_app | ||
plugin_app | ||
mcp_server_app | ||
langfuse_app | ||
evaluation_app | ||
connector_app | ||
file2document_app |
二、知识库管理API(kb_app)
2.1 创建知识库端点
源码位置:api/apps/kb_app.py:55-75
@manager.route('/create', methods=['post'])@login_required@validate_request("name")async def create():req = await get_request_json()# 第60-65行:创建知识库e, res = KnowledgebaseService.create_with_name(name = req.pop("name", None),tenant_id = current_user.id,parser_id = req.pop("parser_id", None),**req)if not e:return restry:if not KnowledgebaseService.save(**res):return get_data_error_result()return get_json_result(data={"kb_id":res["id"]})except Exception as e:return server_error_response(e)
API请求示例:
POST /kb/create{"name": "技术文档库","parser_id": "naive","description": "存储技术文档"}
API响应示例:
{"code": 0,"data": {"kb_id": "kb_abc123"},"message": "success"}
2.2 更新知识库端点
源码位置:api/apps/kb_app.py:78-184
@manager.route('/update', methods=['post'])@login_required@validate_request("kb_id", "name", "description", "parser_id")@not_allowed_parameters("id", "tenant_id", "created_by", "create_time", "update_time", "create_date", "update_date", "created_by")async def update():req = await get_request_json()# 第84-91行:参数校验if not isinstance(req["name"], str):return get_data_error_result(message="Dataset name must be string.")if req["name"].strip() == "":return get_data_error_result(message="Dataset name can't be empty.")if len(req["name"].encode("utf-8")) > DATASET_NAME_LIMIT:return get_data_error_result(message=f"Dataset name length is {len(req['name'])} which is large than {DATASET_NAME_LIMIT}")req["name"] = req["name"].strip()# 第92-105行:Infinity特殊处理if settings.DOC_ENGINE_INFINITY:parser_id = req.get("parser_id")if isinstance(parser_id, str) and parser_id.lower() == "tag":return get_json_result(code=RetCode.OPERATING_ERROR,message="The chunking method Tag has not been supported by Infinity yet.",data=False,)if "pagerank" in req and req["pagerank"] > 0:return get_json_result(code=RetCode.DATA_ERROR,message="'pagerank' can only be set when doc_engine is elasticsearch",data=False,)# 第107-112行:权限检查if not KnowledgebaseService.accessible4deletion(req["kb_id"], current_user.id):return get_json_result(data=False,message='No authorization.',code=RetCode.AUTHENTICATION_ERROR)try:# 第114-118行:所有权验证if not KnowledgebaseService.query(created_by=current_user.id, id=req["kb_id"]):return get_json_result(data=False, message='Only owner of dataset authorized for this operation.',code=RetCode.OPERATING_ERROR)e, kb = KnowledgebaseService.get_by_id(req["kb_id"])# 第123-132行:重命名文件夹if e and req["name"].lower() != kb.name.lower():FileService.filter_update([File.tenant_id == kb.tenant_id,File.source_type == FileSource.KNOWLEDGEBASE,File.type == "folder",File.name == kb.name,],{"name": req["name"]},)# 第138-142行:名称重复检查if req["name"].lower() != kb.name.lower() \and len(KnowledgebaseService.query(name=req["name"], tenant_id=current_user.id, status=StatusEnum.VALID.value)) >= 1:return get_data_error_result(message="Duplicated dataset name.")del req["kb_id"]connectors = []if "connectors" in req:connectors = req["connectors"]del req["connectors"]if not KnowledgebaseService.update_by_id(kb.id, req):return get_data_error_result()# 第152-169行:更新pagerankif kb.pagerank != req.get("pagerank", 0):if req.get("pagerank", 0) > 0:await thread_pool_exec(settings.docStoreConn.update,{"kb_id": kb.id},{PAGERANK_FLD: req["pagerank"]},search.index_name(kb.tenant_id),kb.id,)else:await thread_pool_exec(settings.docStoreConn.update,{"exists": PAGERANK_FLD},{"remove": PAGERANK_FLD},search.index_name(kb.tenant_id),kb.id,)e, kb = KnowledgebaseService.get_by_id(kb.id)if not e:return get_data_error_result(message="Database error (Knowledgebase rename)!")errors = Connector2KbService.link_connectors(kb.id, [conn for conn in connectors], current_user.id)if errors:logging.error("Link KB errors: ", errors)kb = kb.to_dict()kb.update(req)kb["connectors"] = connectorsreturn get_json_result(data=kb)except Exception as e:return server_error_response(e)
关键验证逻辑流程图:

三、对话管理API(dialog_app)
3.1 对话设置端点
源码位置:api/apps/dialog_app.py(推测)
@manager.route('/set', methods=['post'])@login_required@validate_request("id", "name", "kb_ids")async def set_dialog():req = await get_request_json()# 提取参数dialog_id = req["id"]name = req["name"]kb_ids = req["kb_ids"]llm_id = req.get("llm_id", "")# 创建或更新对话配置if dialog_id:e, dialog = DialogService.get_by_id(dialog_id)if not e:return get_data_error_result(message="Dialog not found")DialogService.update_by_id(dialog_id, req)else:req["tenant_id"] = current_user.idreq["created_by"] = current_user.iddialog_id = DialogService.save(**req)return get_json_result(data={"id": dialog_id})
3.2 对话问答端点
源码位置:api/apps/dialog_app.py(推测)
@manager.route('/ask', methods=['post'])@login_required@validate_request("question", "dialog_id")async def ask():req = await get_request_json()question = req["question"]dialog_id = req["dialog_id"]stream = req.get("stream", True)# 获取对话配置e, dialog = DialogService.get_by_id(dialog_id)if not e:return get_data_error_result(message="Dialog not found")# 获取知识库列表kb_ids = dialog.kb_idsembd_mdl = LLMBundle(current_user.id, LLMType.EMBEDDING, llm_name=dialog.embd_id)# 执行检索from rag.nlp import searchdealer = search.Dealer(settings.docStoreConn)ranks = await dealer.retrieval(question=question,embd_mdl=embd_mdl,tenant_ids=[current_user.id],kb_ids=kb_ids,page=1,page_size=10)# 生成答案chat_mdl = LLMBundle(current_user.id, LLMType.CHAT, llm_name=dialog.llm_id)if stream:# 流式返回async def generate():async for delta in chat_mdl.async_chat_streamly("", [{"role": "user", "content": question}], {}):yield f"data: {delta}\n\n"return Response(generate(), mimetype="text/event-stream")else:# 非流式返回ans, tk_count = await chat_mdl.async_chat("", [{"role": "user", "content": question}], {})return get_json_result(data={"answer": ans, "token_count": tk_count})
对话流程图:

四、文档管理API(document_app)
4.1 文档上传端点
源码位置:api/apps/document_app.py(推测)
@manager.route('/upload', methods=['post'])@login_requiredasync def upload():req = await get_request_json()kb_id = req["kb_id"]files = req.get("files", [])# 检查知识库权限if not KnowledgebaseService.accessible(kb_id, current_user.id):return get_json_result(code=RetCode.AUTHENTICATION_ERROR,message="No authorization.")# 上传文件到MinIOdoc_ids = []for file in files:# 保存文件file_path = FileService.upload(file, kb_id)# 创建文档记录doc_id = DocumentService.create(kb_id=kb_id,name=file.filename,location=file_path,size=file.size)doc_ids.append(doc_id)return get_json_result(data={"doc_ids": doc_ids})
4.2 文档解析端点
源码位置:api/apps/document_app.py(推测)
@manager.route('/run', methods=['post'])@login_required@validate_request("doc_ids")async def run():req = await get_request_json()doc_ids = req["doc_ids"]parser_id = req.get("parser_id", "naive")parser_config = req.get("parser_config", {})# 创建解析任务task_ids = []for doc_id in doc_ids:# 检查文档权限if not DocumentService.accessible(doc_id, current_user.id):continue# 创建任务task_id = TaskService.create(doc_id=doc_id,task_type="dataflow",parser_id=parser_id,parser_config=parser_config)task_ids.append(task_id)# 发送任务到Redis队列for task_id in task_ids:REDIS_CONN.queue_product("ragflow_svr_queue",{"id": task_id, "task_type": "dataflow"})return get_json_result(data={"task_ids": task_ids})
五、Canvas画布API(canvas_app)
5.1 画布运行端点
源码位置:api/apps/canvas_app.py:135(推测)
@manager.route('/run', methods=['post'])@login_requiredasync def run():req = await get_request_json()canvas_id = req["canvas_id"]query = req.get("query", "")files = req.get("files", [])# 获取Canvas DSLe, cvs = UserCanvasService.get_by_id(canvas_id)if not e:return get_data_error_result(message="Canvas not found")# 创建Canvas实例from agent.canvas import Canvascanvas = Canvas(dsl=cvs.dsl,tenant_id=current_user.id,canvas_id=canvas_id)# 设置全局变量canvas.globals["sys.query"] = querycanvas.globals["sys.files"] = files# 执行Canvasasync def generate():async for output in canvas.run():yield f"data: {json.dumps(output, ensure_ascii=False)}\n\n"return Response(generate(), mimetype="text/event-stream")
六、API装饰器设计
6.1 login_required装饰器
源码位置:api/apps/__init__.py(推测)
def login_required(func):"""登录验证装饰器"""@wraps(func)async def wrapper(*args, **kwargs):# 从请求头获取tokentoken = request.headers.get("Authorization")if not token:return get_json_result(code=RetCode.AUTHENTICATION_ERROR,message="Authorization header missing.")# 验证tokentry:payload = jwt.decode(token, settings.SECRET_KEY, algorithms=["HS256"])user_id = payload.get("user_id")# 设置current_userg.current_user = UserService.get_by_id(user_id)except jwt.ExpiredSignatureError:return get_json_result(code=RetCode.AUTHENTICATION_ERROR,message="Token expired.")except jwt.InvalidTokenError:return get_json_result(code=RetCode.AUTHENTICATION_ERROR,message="Invalid token.")return await func(*args, **kwargs)return wrapper
6.2 validate_request装饰器
源码位置:api/utils/api_utils.py(推测)
def validate_request(*required_fields):"""请求参数验证装饰器"""def decorator(func):@wraps(func)async def wrapper(*args, **kwargs):req = await get_request_json()# 检查必需字段missing_fields = [field for field in required_fields if field not in req]if missing_fields:return get_json_result(code=RetCode.DATA_ERROR,message=f"Missing required fields: {', '.join(missing_fields)}")return await func(*args, **kwargs)return wrapperreturn decorator
七、API响应格式设计
7.1 统一响应函数
源码位置:api/utils/api_utils.py
def get_json_result(code=RetCode.SUCCESS, message="success", data=None):"""成功响应"""return jsonify({"code": code,"message": message,"data": data})def get_data_error_result(message="Data error"):"""数据错误响应"""return jsonify({"code": RetCode.DATA_ERROR,"message": message,"data": None})def server_error_response(e):"""服务器错误响应"""logging.exception(e)return jsonify({"code": RetCode.SERVER_ERROR,"message": str(e),"data": None})
7.2 错误码枚举
源码位置:common/constants.py
classRetCode(StrEnum):SUCCESS = "0"DATA_ERROR = "400"AUTHENTICATION_ERROR = "401"FORBIDDEN = "403"NOT_FOUND = "404"OPERATING_ERROR = "405"SERVER_ERROR = "500"
夜雨聆风