Dify Source Code Analysis (1)
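The stack is typically brought up with Docker Compose from the repository's docker/ directory (the exact invocation may differ in your environment):

cd dify/docker
docker compose up -d

Once it finishes, the following containers are running: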
[+] Running 10/10
 ⠿ Container docker-sandbox-1        Started   16.7s
 ⠿ Container docker-ssrf_proxy-1     Started   16.9s
 ⠿ Container docker-web-1            Started   16.7s
 ⠿ Container docker-worker-1         Started   50.2s
 ⠿ Container docker-plugin_daemon-1  Started   49.7s
 ⠿ Container docker-api-1            Started   50.2s
 ⠿ Container docker-nginx-1          Started   48.1s
 ⠿ Container docker-redis-1          Started    2.0s
 ⠿ Container docker-db-1             Healthy   33.8s
 ⠿ Container docker-weaviate-1       Started
docker-sandbox-1 is the code sandbox: user-uploaded Python scripts, custom tools, and similar code must run isolated inside it. The image used is:

langgenius/dify-sandbox:0.2.12
docker-ssrf_proxy-1 is the SSRF (Server-Side Request Forgery) proxy, used to handle outbound network requests safely. Its configuration is at dify/docker/ssrf_proxy/squid.conf.template, and the image used is:
ubuntu/squid:latest
Next come the storage components:
• docker-redis-1: cache and message-queue service, used to improve performance and schedule tasks
• docker-db-1: the database service (usually PostgreSQL), which stores application data
• docker-weaviate-1: the vector database, which stores and retrieves embeddings to support semantic search in AI applications
docker-nginx-1 is the web server, responsible for reverse proxying and load balancing.
The remaining four containers are defined by Dify itself:
1. docker-web-1: Dify's frontend web UI, which provides the user interface.
2. docker-worker-1: the background worker process, which handles asynchronous tasks (such as model inference and data processing).
3. docker-plugin_daemon-1: the plugin daemon, which manages the lifecycle and execution of plugins.
4. docker-api-1: Dify's backend API service, which exposes the RESTful interfaces.
The frontend source lives in the dify/web/ directory; it is built on React with the Next.js framework, and the pages we interact with are implemented there.
The rest of the backend is implemented in Python, under the dify/api/ directory.
The API service and the worker can be started with the following commands:
uv run flask run --host 0.0.0.0 --port=5001 --debug
uv run celery -A app.celery worker -P threads -c 2 --loglevel INFO -Q dataset,priority_dataset,priority_pipeline,pipeline,mail,ops_trace,app_deletion,plugin,workflow_storage,conversation,workflow,schedule_poller,schedule_executor,triggered_workflow_dispatcher,trigger_refresh_executor,retention
We start reading from the entry file dify/api/app.py, which exposes the HTTP service via the Flask framework:
from app_factory import create_app

app = create_app()
celery = app.extensions["celery"]

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=5001)
The Flask app is initialized in dify/api/app_factory.py:
def create_app() -> DifyApp:
    start_time = time.perf_counter()
    app = create_flask_app_with_configs()
    initialize_extensions(app)
    end_time = time.perf_counter()
    if dify_config.DEBUG:
        logger.info("Finished create_app (%s ms)", round((end_time - start_time) * 1000, 2))
    return app


def create_flask_app_with_configs() -> DifyApp:
    """create a raw flask app
    with configs loaded from .env file
    """
    dify_app = DifyApp(__name__)


from flask import Flask


class DifyApp(Flask):
    pass
At this point the server initialization is complete. The API service module follows a classic three-layer architecture (a toy sketch of how the layers call each other follows the list below):
• Controller layer (controllers/): receives HTTP requests and builds the responses
• Core layer (core/): encapsulates the system's core capabilities, such as agent, RAG, and workflow
• Service layer (services/): the intermediate layer bridging the two, encapsulating complex business flows
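Here is a minimal, self-contained sketch of that layering; it is not Dify code, and every name in it is illustrative:

def core_generate(query: str) -> str:
    # core/: reusable engine capability (in Dify: agent, RAG, workflow, model runtime)
    return f"answer to: {query}"


def service_generate(args: dict) -> str:
    # services/: orchestrates business logic on top of core capabilities
    return core_generate(args["query"])


def controller_post(json_body: dict) -> dict:
    # controllers/: parses the HTTP request and shapes the HTTP response
    return {"answer": service_generate(json_body)}


print(controller_post({"query": "hello"}))  # {'answer': 'answer to: hello'}

The real code adds authentication, validation, streaming, and persistence at every layer, but the direction of the calls is the same.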
In dify/api/controllers/service_api/__init__.py, a flask_restx Namespace is defined, and the various endpoints are then registered under this namespace:
from flask import Blueprint
from flask_restx import Namespace

from libs.external_api import ExternalApi

bp = Blueprint("service_api", __name__, url_prefix="/v1")
api = ExternalApi(
    bp,
    version="1.0",
    title="Service API",
    description="API for application services",
)
service_api_ns = Namespace("service_api", description="Service operations", path="/")
api.add_namespace(service_api_ns)
For example, the conversation (chat) endpoints are defined in dify/api/controllers/service_api/app/conversation.py:
@service_api_ns.route("/conversations")
class ConversationApi(Resource):
    # (decorators and argument parsing omitted in this excerpt)
    def get(self, app_model: App, end_user: EndUser):
        try:
            with Session(db.engine) as session:
                return ConversationService.pagination_by_last_id(
                    session=session,
                    app_model=app_model,
                    user=end_user,
                    last_id=last_id,
                    limit=query_args.limit,
                    invoke_from=InvokeFrom.SERVICE_API,
                    sort_by=query_args.sort_by,
                )
Completion is handled in dify/api/controllers/service_api/app/completion.py:
@service_api_ns.route("/completion-messages")
class CompletionApi(Resource):
    @validate_app_token(fetch_user_arg=FetchUserArg(fetch_from=WhereisUserArg.JSON, required=True))
    def post(self, app_model: App, end_user: EndUser):
        """Create a completion for the given prompt.

        This endpoint generates a completion based on the provided inputs and query.
        Supports both blocking and streaming response modes.
        """
        try:
            response = AppGenerateService.generate(
                app_model=app_model,
                user=end_user,
                args=args,
                invoke_from=InvokeFrom.SERVICE_API,
                streaming=streaming,
            )
            return helper.compact_generate_response(response)
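For reference, Service API endpoints like this one are called with an app's API key as a Bearer token. Below is a minimal client-side sketch using the requests library; the base URL, API key, and input fields are placeholders to adapt to your own deployment and app configuration:

import requests

API_BASE = "http://localhost/v1"   # served by docker-nginx-1; adjust to your deployment
API_KEY = "app-xxxxxxxx"           # placeholder: an app's Service API key

resp = requests.post(
    f"{API_BASE}/completion-messages",
    headers={"Authorization": f"Bearer {API_KEY}", "Content-Type": "application/json"},
    json={
        "inputs": {"query": "Hello, Dify"},  # variables defined by the app's prompt form
        "response_mode": "blocking",         # or "streaming" for a server-sent event stream
        "user": "user-123",                  # caller-defined end-user identifier
    },
    timeout=60,
)
print(resp.json())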
The workflow endpoints live in dify/api/controllers/service_api/app/workflow.py:
@service_api_ns.route("/workflows/run/<string:workflow_run_id>")
class WorkflowRunDetailApi(Resource):
    @service_api_ns.marshal_with(build_workflow_run_model(service_api_ns))
    def get(self, app_model: App, workflow_run_id: str):
        """Get a workflow task running detail.

        Returns detailed information about a specific workflow run.
        """
        session_maker = sessionmaker(bind=db.engine, expire_on_commit=False)
        workflow_run_repo = DifyAPIRepositoryFactory.create_api_workflow_run_repository(session_maker)
        workflow_run = workflow_run_repo.get_workflow_run_by_id(
            tenant_id=app_model.tenant_id,
            app_id=app_model.id,
            run_id=workflow_run_id,
        )
        return workflow_run
@service_api_ns.route("/workflows/run")
class WorkflowRunApi(Resource):
    @validate_app_token(fetch_user_arg=FetchUserArg(fetch_from=WhereisUserArg.JSON, required=True))
    def post(self, app_model: App, end_user: EndUser):
        """Execute a workflow.

        Runs a workflow with the provided inputs and returns the results.
        Supports both blocking and streaming response modes.
        """
        try:
            response = AppGenerateService.generate(
                app_model=app_model, user=end_user, args=args, invoke_from=InvokeFrom.SERVICE_API, streaming=streaming
            )
            return helper.compact_generate_response(response)
As you can see, all of these endpoints call AppGenerateService.generate to produce the answer. The corresponding code is in dify/api/services/app_generate_service.py:
class AppGenerateService:
    @classmethod
    @trace_span(AppGenerateHandler)
    def generate(
        cls,
        app_model: App,
        user: Union[Account, EndUser],
        args: Mapping[str, Any],
        invoke_from: InvokeFrom,
        streaming: bool = True,
        root_node_id: str | None = None,
    ):
        """
        App Content Generate
        :param app_model: app model
        :param user: user
        :param args: args
        :param invoke_from: invoke from
        :param streaming: streaming
        :return:
        """
        try:
            request_id = rate_limit.enter(request_id)
            if app_model.mode == AppMode.COMPLETION:
                return rate_limit.generate(
                    CompletionAppGenerator.convert_to_event_stream(
                        CompletionAppGenerator().generate(
                            app_model=app_model, user=user, args=args, invoke_from=invoke_from, streaming=streaming
                        ),
                    ),
                    request_id=request_id,
                )
            elif app_model.mode == AppMode.AGENT_CHAT or app_model.is_agent:
                return rate_limit.generate(
                    AgentChatAppGenerator.convert_to_event_stream(
                        AgentChatAppGenerator().generate(
                            app_model=app_model, user=user, args=args, invoke_from=invoke_from, streaming=streaming
                        ),
                    ),
                    request_id,
                )
            elif app_model.mode == AppMode.CHAT:
                return rate_limit.generate(
                    ChatAppGenerator.convert_to_event_stream(
                        ChatAppGenerator().generate(
                            app_model=app_model, user=user, args=args, invoke_from=invoke_from, streaming=streaming
                        ),
                    ),
                    request_id=request_id,
                )
            elif app_model.mode == AppMode.ADVANCED_CHAT:
                workflow_id = args.get("workflow_id")
                workflow = cls._get_workflow(app_model, invoke_from, workflow_id)
                return rate_limit.generate(
                    AdvancedChatAppGenerator.convert_to_event_stream(
                        AdvancedChatAppGenerator().generate(
                            app_model=app_model,
                            workflow=workflow,
                            user=user,
                            args=args,
                            invoke_from=invoke_from,
                            streaming=streaming,
                        ),
                    ),
                    request_id=request_id,
                )
            elif app_model.mode == AppMode.WORKFLOW:
                workflow_id = args.get("workflow_id")
                workflow = cls._get_workflow(app_model, invoke_from, workflow_id)
                return rate_limit.generate(
                    WorkflowAppGenerator.convert_to_event_stream(
                        WorkflowAppGenerator().generate(
                            app_model=app_model,
                            workflow=workflow,
                            user=user,
                            args=args,
                            invoke_from=invoke_from,
                            streaming=streaming,
                            root_node_id=root_node_id,
                            call_depth=0,
                        ),
                    ),
                    request_id,
                )
            else:
                raise ValueError(f"Invalid app mode {app_model.mode}")
        except Exception:
            quota_charge.refund()
            rate_limit.exit(request_id)
            raise
        finally:
            if not streaming:
                rate_limit.exit(request_id)
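One detail worth noticing: for non-streaming calls, rate_limit.exit runs in the finally block, while streaming responses are wrapped by rate_limit.generate, presumably so the concurrency slot is released only after the stream has been consumed. A minimal sketch of that wrapping pattern (illustrative only, not Dify's actual RateLimit class):

from collections.abc import Generator


class ToyRateLimit:
    """Illustrative only: free a concurrency slot when a stream finishes."""

    def __init__(self, max_active: int):
        self.max_active = max_active
        self.active = 0

    def enter(self) -> None:
        if self.active >= self.max_active:
            raise RuntimeError("too many concurrent requests")
        self.active += 1

    def exit(self) -> None:
        self.active -= 1

    def generate(self, stream: Generator) -> Generator:
        def wrapped() -> Generator:
            try:
                yield from stream
            finally:
                self.exit()  # the slot is freed once the caller finishes iterating
        return wrapped()


limiter = ToyRateLimit(max_active=2)
limiter.enter()
chunks = limiter.generate(c for c in ["a", "b", "c"])
print("".join(chunks), limiter.active)  # abc 0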
Depending on the app mode, the request is dispatched into core. For example, the completion generator is in dify/api/core/app/apps/completion/app_generator.py:
class CompletionAppGenerator(MessageBasedAppGenerator):
    @overload
    def generate(
        self,
        app_model: App,
        user: Union[Account, EndUser],
        args: Mapping[str, Any],
        invoke_from: InvokeFrom,
        streaming: Literal[True],
    ) -> Generator[str | Mapping[str, Any], None, None]: ...

    def generate(
        self,
        app_model: App,
        user: Union[Account, EndUser],
        args: Mapping[str, Any],
        invoke_from: InvokeFrom,
        streaming: bool = True,
    ) -> Union[Mapping[str, Any], Generator[str | Mapping[str, Any], None, None]]:
        """
        Generate App response.
        :param app_model: App
        :param user: account or end user
        :param args: request args
        :param invoke_from: invoke from source
        :param streaming: is stream
        """
        # init application generate entity
        application_generate_entity = CompletionAppGenerateEntity(
            task_id=str(uuid.uuid4()),
            app_config=app_config,
            model_conf=ModelConfigConverter.convert(app_config),
            file_upload_config=file_extra_config,
            inputs=self._prepare_user_inputs(
                user_inputs=inputs, variables=app_config.variables, tenant_id=app_model.tenant_id
            ),
            query=query,
            files=list(file_objs),
            user_id=user.id,
            stream=streaming,
            invoke_from=invoke_from,
            extras={},
            trace_manager=trace_manager,
        )

        # new thread with request context
        @copy_current_request_context
        def worker_with_context():
            return self._generate_worker(
                flask_app=current_app._get_current_object(),  # type: ignore
                application_generate_entity=application_generate_entity,
                queue_manager=queue_manager,
                message_id=message.id,
            )

        worker_thread = threading.Thread(target=worker_with_context)
        worker_thread.start()

        # return response or stream generator
        response = self._handle_response(
            application_generate_entity=application_generate_entity,
            queue_manager=queue_manager,
            conversation=conversation,
            message=message,
            user=user,
            stream=streaming,
        )

    def _generate_worker(
        self,
        flask_app: Flask,
        application_generate_entity: CompletionAppGenerateEntity,
        queue_manager: AppQueueManager,
        message_id: str,
    ):
        """
        Generate worker in a new thread.
        :param flask_app: Flask app
        :param application_generate_entity: application generate entity
        :param queue_manager: queue manager
        :param message_id: message ID
        :return:
        """
        with flask_app.app_context():
            try:
                # get message
                message = self._get_message(message_id)

                # chatbot app
                runner = CompletionAppRunner()
                runner.run(
                    application_generate_entity=application_generate_entity,
                    queue_manager=queue_manager,
                    message=message,
                )
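The structure to take away from generate is that the actual model call runs in a separate worker thread (worker_with_context), while the request thread assembles the response from a queue in _handle_response. A minimal sketch of that producer/consumer pattern (illustrative only, not Dify's AppQueueManager):

import queue
import threading


def stream_generate(prompt: str):
    """Illustrative only: a worker thread produces chunks, the request thread yields them."""
    q: queue.Queue = queue.Queue()

    def worker():
        for token in ["Hello", ", ", prompt]:  # stand-in for the LLM call
            q.put(token)
        q.put(None)                            # sentinel: generation finished

    threading.Thread(target=worker, daemon=True).start()

    while True:
        chunk = q.get()
        if chunk is None:
            break
        yield chunk


print("".join(stream_generate("world")))  # Hello, world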
Following the call into dify/api/core/app/apps/completion/app_runner.py:
class CompletionAppRunner(AppRunner):
    """Completion Application Runner"""

    def run(
        self,
        application_generate_entity: CompletionAppGenerateEntity,
        queue_manager: AppQueueManager,
        message: Message,
    ):
        """
        Run application
        :param application_generate_entity: application generate entity
        :param queue_manager: application queue manager
        :param message: message
        :return:
        """
        try:
            # process sensitive_word_avoidance
            _, inputs, query = self.moderation_for_inputs(
                app_id=app_record.id,
                tenant_id=app_config.tenant_id,
                app_generate_entity=application_generate_entity,
                inputs=inputs,
                query=query or "",
                message_id=message.id,
            )

            invoke_result = model_instance.invoke_llm(
                prompt_messages=prompt_messages,
                model_parameters=application_generate_entity.model_conf.parameters,
                stop=stop,
                stream=application_generate_entity.stream,
                user=application_generate_entity.user_id,
            )
We finally reach the place where the LLM is invoked, in dify/api/core/model_manager.py:
class ModelInstance:
    """Model instance class"""

    @overload
    def invoke_llm(
        self,
        prompt_messages: Sequence[PromptMessage],
        model_parameters: dict | None = None,
        tools: Sequence[PromptMessageTool] | None = None,
        stop: list[str] | None = None,
        stream: Literal[True] = True,
        user: str | None = None,
        callbacks: list[Callback] | None = None,
    ) -> Generator: ...

    def invoke_llm(
        self,
        prompt_messages: Sequence[PromptMessage],
        model_parameters: dict | None = None,
        tools: Sequence[PromptMessageTool] | None = None,
        stop: Sequence[str] | None = None,
        stream: bool = True,
        user: str | None = None,
        callbacks: list[Callback] | None = None,
    ) -> Union[LLMResult, Generator]:
        """
        Invoke large language model
        :param prompt_messages: prompt messages
        :param model_parameters: model parameters
        :param tools: tools for tool calling
        :param stop: stop words
        :param stream: is stream response
        :param user: unique user id
        :param callbacks: callbacks
        :return: full response or stream response chunk generator result
        """
        return cast(
            Union[LLMResult, Generator],
            self._round_robin_invoke(
                function=self.model_type_instance.invoke,
                model=self.model,
                credentials=self.credentials,
                prompt_messages=prompt_messages,
                model_parameters=model_parameters,
                tools=tools,
                stop=stop,
                stream=stream,
                user=user,
                callbacks=callbacks,
            ),
        )
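invoke_llm delegates to _round_robin_invoke, which rotates across the credential configurations bound to the model (when load balancing is configured) before making the actual call. A minimal sketch of that idea (illustrative only; the real implementation also handles errors and cool-downs):

import itertools
from collections.abc import Callable


class ToyRoundRobinInvoker:
    """Illustrative only: spread successive calls across several credential sets."""

    def __init__(self, credentials_list: list[dict]):
        self._cycle = itertools.cycle(credentials_list)

    def invoke(self, fn: Callable[..., str], **kwargs) -> str:
        credentials = next(self._cycle)  # pick the next credential set in rotation
        return fn(credentials=credentials, **kwargs)


def fake_llm_call(credentials: dict, prompt: str) -> str:
    return f"[{credentials['name']}] {prompt}"


invoker = ToyRoundRobinInvoker([{"name": "key-a"}, {"name": "key-b"}])
print(invoker.invoke(fake_llm_call, prompt="hello"))  # [key-a] hello
print(invoker.invoke(fake_llm_call, prompt="hello"))  # [key-b] hello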
Drilling down layer by layer, we find it calls dify/api/core/model_runtime/model_providers/__base/large_language_model.py:
class LargeLanguageModel(AIModel):
    """Model class for large language model."""

    model_type: ModelType = ModelType.LLM

    # pydantic configs
    model_config = ConfigDict(protected_namespaces=())

    def invoke(
        self,
        model: str,
        credentials: dict,
        prompt_messages: list[PromptMessage],
        model_parameters: dict | None = None,
        tools: list[PromptMessageTool] | None = None,
        stop: list[str] | None = None,
        stream: bool = True,
        user: str | None = None,
        callbacks: list[Callback] | None = None,
    ) -> Union[LLMResult, Generator[LLMResultChunk, None, None]]:
        """
        Invoke large language model
        :param model: model name
        :param credentials: model credentials
        :param prompt_messages: prompt messages
        :param model_parameters: model parameters
        :param tools: tools for tool calling
        :param stop: stop words
        :param stream: is stream response
        :param user: unique user id
        :param callbacks: callbacks
        :return: full response or stream response chunk generator result
        """
        # validate and filter model parameters
        if model_parameters is None:
            model_parameters = {}

        self.started_at = time.perf_counter()

        callbacks = callbacks or []
        if dify_config.DEBUG:
            callbacks.append(LoggingCallback())

        # trigger before invoke callbacks
        self._trigger_before_invoke_callbacks(
            model=model,
            credentials=credentials,
            prompt_messages=prompt_messages,
            model_parameters=model_parameters,
            tools=tools,
            stop=stop,
            stream=stream,
            user=user,
            callbacks=callbacks,
        )

        result: Union[LLMResult, Generator[LLMResultChunk, None, None]]
        try:
            from core.plugin.impl.model import PluginModelClient

            plugin_model_manager = PluginModelClient()
            result = plugin_model_manager.invoke_llm(
                tenant_id=self.tenant_id,
                user_id=user or "unknown",
                plugin_id=self.plugin_id,
                provider=self.provider_name,
                model=model,
                credentials=credentials,
                model_parameters=model_parameters,
                prompt_messages=prompt_messages,
                tools=tools,
                stop=list(stop) if stop else None,
                stream=stream,
            )
Under the hood, this calls the model plugin client in dify/api/core/plugin/impl/model.py:
class PluginModelClient(BasePluginClient):
    def invoke_llm(
        self,
        tenant_id: str,
        user_id: str,
        plugin_id: str,
        provider: str,
        model: str,
        credentials: dict,
        prompt_messages: list[PromptMessage],
        model_parameters: dict | None = None,
        tools: list[PromptMessageTool] | None = None,
        stop: list[str] | None = None,
        stream: bool = True,
    ) -> Generator[LLMResultChunk, None, None]:
        """
        Invoke llm
        """
        response = self._request_with_plugin_daemon_response_stream(
            method="POST",
            path=f"plugin/{tenant_id}/dispatch/llm/invoke",
            type_=LLMResultChunk,
            data=jsonable_encoder(
                {
                    "user_id": user_id,
                    "data": {
                        "provider": provider,
                        "model_type": "llm",
                        "model": model,
                        "credentials": credentials,
                        "prompt_messages": prompt_messages,
                        "model_parameters": model_parameters,
                        "tools": tools,
                        "stop": stop,
                        "stream": stream,
                    },
                }
            ),
            headers={
                "X-Plugin-ID": plugin_id,
                "Content-Type": "application/json",
            },
        )
        try:
            yield from response
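So the model provider is no longer invoked in-process: the API container POSTs to the plugin daemon (docker-plugin_daemon-1), which hosts the provider plugin and streams result chunks back. A rough sketch of what such a streaming call looks like (illustrative only; the helper _request_with_plugin_daemon_response_stream and the actual wire format differ in detail):

import json

import httpx


def stream_invoke(daemon_url: str, tenant_id: str, plugin_id: str, payload: dict):
    # Illustrative only: POST to the daemon and yield one parsed chunk per response line.
    url = f"{daemon_url}/plugin/{tenant_id}/dispatch/llm/invoke"
    headers = {"X-Plugin-ID": plugin_id, "Content-Type": "application/json"}
    with httpx.stream("POST", url, json=payload, headers=headers) as response:
        for line in response.iter_lines():
            if line:
                yield json.loads(line)

That completes the path from an HTTP request arriving at the Service API all the way down to the plugin daemon talking to the model provider.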


夜雨聆风
