乐于分享
好东西不私藏

dify源码分析(1)

dify源码分析(1)

        在本地部署完dify后Dify使用deepseek,我们发现启动了10个容器,我们先从这10个容器的功能开始探索dify的源码
[+] Running 10/10 ⠿ Container docker-sandbox-1        Started                                                                        16.7s ⠿ Container docker-ssrf_proxy-1     Started                                                                        16.9s ⠿ Container docker-web-1            Started                                                                        16.7s ⠿ Container docker-worker-1         Started                                                                        50.2s ⠿ Container docker-plugin_daemon-1  Started                                                                        49.7s ⠿ Container docker-api-1            Started                                                                        50.2s ⠿ Container docker-nginx-1          Started                                                                        48.1s ⠿ Container docker-redis-1          Started                                                                         2.0s ⠿ Container docker-db-1             Healthy                                                                        33.8s ⠿ Container docker-weaviate-1       Started 
    首先是两个安全组件,阅读docker-compose.yaml,发现docker-sandbox-1 使用的镜像是
langgenius/dify-sandbox:0.2.12

用户上传的 Python 脚本、自定义工具等,需在沙盒中隔离运行。

    docker-ssrf_proxy-1,SSRF(服务器端请求伪造)代理,用于安全地处理外部网络请求, 对应的配置位于dify/docker/ssrf_proxy/squid.conf.template使用的镜像是:

ubuntu/squid:latest

     接下来的几个存储组件,docker-redis-1‌ 缓存和消息队列服务,用于提升性能和任务调度docker-db-1‌数据库服务(通常是PostgreSQL),存储应用数据docker-weaviate-1‌向量数据库,用于存储和检索嵌入向量(embeddings),支持AI应用的语义搜索。

      docker-nginx-1 Web服务器,负责反向代理和负载均衡。

      剩下的就是dify定义的四个容器:1,docker-web-1‌ Dify的前端Web界面,提供用户操作界面。2,docker-worker-1‌ 后台工作进程,处理异步任务(如模型推理、数据处理)。3,docker-plugin_daemon-1‌ 插件守护进程,管理插件的生命周期和运行。4,docker-api-1‌ Dify的后端API服务,提供RESTful接口。

     web对应的源码位于 dify/web/目录,是基于react的next.js框架开发的,我们接触操作的页面就是它实现的。

      剩下的后端源码都是基于python实现的,源码位于目录:dify/api/

可以使用如下命令启动服务和worker

uv run flask run --host 0.0.0.0 --port=5001 --debug
uv run celery -A app.celery worker -P threads -c 2 --loglevel INFO -Q dataset,priority_dataset,priority_pipeline,pipeline,mail,ops_trace,app_deletion,plugin,workflow_storage,conversation,workflow,schedule_poller,schedule_executor,triggered_workflow_dispatcher,trigger_refresh_executor,retention

    我们从入口文件dify/api/app.py开始阅读,它是基于flask框架提供的http服务

    from app_factory import create_app    app = create_app()    celery = app.extensions["celery"]
if __name__ == "__main__":    app.run(host="0.0.0.0", port=5001)

        在dify/api/app_factory.py中初始化了flask框架

def create_app() -> DifyApp:    start_time = time.perf_counter()    app = create_flask_app_with_configs()    initialize_extensions(app)    end_time = time.perf_counter()    if dify_config.DEBUG:        logger.info("Finished create_app (%s ms)"round((end_time - start_time) * 10002))    return app
def create_flask_app_with_configs() -> DifyApp:    """    create a raw flask app    with configs loaded from .env file    """    dify_app = DifyApp(__name__)
from flask import Flaskclass DifyApp(Flask):    pass

至此完成了服务器的初始化。API服务模块采用经典三层架构设计:

• 接口层(controllers/):负责HTTP请求接收与响应处理

• 核心逻辑层(core/):封装系统核心能力,如agent、rag、workflow

• 业务服务层(services/):承上启下的中间层,封装复杂业务流程

在dify/api/controllers/service_api/__init__.py,使用flask_restx定义namespace,然后在这个ns下定义了各种接口

from flask import Blueprintfrom flask_restx import Namespacefrom libs.external_api import ExternalApibp = Blueprint("service_api", __name__, url_prefix="/v1")api = ExternalApi(    bp,    version="1.0",    title="Service API",    description="API for application services",)service_api_ns = Namespace("service_api", description="Service operations", path="/")api.add_namespace(service_api_ns)

比如chat接口,定义位于dify/api/controllers/service_api/app/conversation.py

@service_api_ns.route("/conversations")class ConversationApi(Resource):         try:            with Session(db.engine) as session:                return ConversationService.pagination_by_last_id(                    session=session,                    app_model=app_model,                    user=end_user,                    last_id=last_id,                    limit=query_args.limit,                    invoke_from=InvokeFrom.SERVICE_API,                    sort_by=query_args.sort_by,                )

对话补全 dify/api/controllers/service_api/app/completion.py

@service_api_ns.route("/completion-messages")class CompletionApi(Resource):    @validate_app_token(fetch_user_arg=FetchUserArg(fetch_from=WhereisUserArg.JSON, required=True))    def post(self, app_model: App, end_user: EndUser):        """Create a completion for the given prompt.        This endpoint generates a completion based on the provided inputs and query.        Supports both blocking and streaming response modes.        """         try:            response = AppGenerateService.generate(                app_model=app_model,                user=end_user,                args=args,                invoke_from=InvokeFrom.SERVICE_API,                streaming=streaming,            )            return helper.compact_generate_response(response)

工作流dify/api/controllers/service_api/app/workflow.py

@service_api_ns.route("/workflows/run/<string:workflow_run_id>")class WorkflowRunDetailApi(Resource):    @service_api_ns.marshal_with(build_workflow_run_model(service_api_ns))    def get(self, app_model: App, workflow_run_id: str):        """Get a workflow task running detail.        Returns detailed information about a specific workflow run.        """         session_maker = sessionmaker(bind=db.engine, expire_on_commit=False)        workflow_run_repo = DifyAPIRepositoryFactory.create_api_workflow_run_repository(session_maker)        workflow_run = workflow_run_repo.get_workflow_run_by_id(            tenant_id=app_model.tenant_id,            app_id=app_model.id,            run_id=workflow_run_id,        )        return workflow_run
@service_api_ns.route("/workflows/run")class WorkflowRunApi(Resource):    @validate_app_token(fetch_user_arg=FetchUserArg(fetch_from=WhereisUserArg.JSON, required=True))    def post(self, app_model: App, end_user: EndUser):        """Execute a workflow.        Runs a workflow with the provided inputs and returns the results.        Supports both blocking and streaming response modes.        """         try:            response = AppGenerateService.generate(                app_model=app_model, user=end_user, args=args, invoke_from=InvokeFrom.SERVICE_API, streaming=streaming            )            return helper.compact_generate_response(response)

可以看到,都是调用了AppGenerateService.generate来完成答案的生成。对应代码目录在dify/api/services/app_generate_service.py

class AppGenerateService:    @classmethod    @trace_span(AppGenerateHandler)    def generate(        cls,        app_model: App,        user: Union[AccountEndUser],        args: Mapping[str, Any],        invoke_from: InvokeFrom,        streaming: bool = True,        root_node_id: str | None = None,    ):        """        App Content Generate        :param app_model: app model        :param user: user        :param args: args        :param invoke_from: invoke from        :param streaming: streaming        :return:        """         try:            request_id = rate_limit.enter(request_id)            if app_model.mode == AppMode.COMPLETION:                return rate_limit.generate(                    CompletionAppGenerator.convert_to_event_stream(                        CompletionAppGenerator().generate(                            app_model=app_model, user=user, args=args, invoke_from=invoke_from, streaming=streaming                        ),                    ),                    request_id=request_id,                )            elif app_model.mode == AppMode.AGENT_CHAT or app_model.is_agent:                return rate_limit.generate(                    AgentChatAppGenerator.convert_to_event_stream(                        AgentChatAppGenerator().generate(                            app_model=app_model, user=user, args=args, invoke_from=invoke_from, streaming=streaming                        ),                    ),                    request_id,                )            elif app_model.mode == AppMode.CHAT:                return rate_limit.generate(                    ChatAppGenerator.convert_to_event_stream(                        ChatAppGenerator().generate(                            app_model=app_model, user=user, args=args, invoke_from=invoke_from, streaming=streaming                        ),                    ),                    request_id=request_id,                )            elif app_model.mode == AppMode.ADVANCED_CHAT:                workflow_id = args.get("workflow_id")                workflow = cls._get_workflow(app_model, invoke_from, workflow_id)                return rate_limit.generate(                    AdvancedChatAppGenerator.convert_to_event_stream(                        AdvancedChatAppGenerator().generate(                            app_model=app_model,                            workflow=workflow,                            user=user,                            args=args,                            invoke_from=invoke_from,                            streaming=streaming,                        ),                    ),                    request_id=request_id,                )            elif app_model.mode == AppMode.WORKFLOW:                workflow_id = args.get("workflow_id")                workflow = cls._get_workflow(app_model, invoke_from, workflow_id)                return rate_limit.generate(                    WorkflowAppGenerator.convert_to_event_stream(                        WorkflowAppGenerator().generate(                            app_model=app_model,                            workflow=workflow,                            user=user,                            args=args,                            invoke_from=invoke_from,                            streaming=streaming,                            root_node_id=root_node_id,                            call_depth=0,                        ),                    ),                    request_id,                )            else:                raise ValueError(f"Invalid app mode {app_model.mode}")        except Exception:            quota_charge.refund()            rate_limit.exit(request_id)            raise        finally:            if not streaming:                rate_limit.exit(request_id)

根据不同的请求类型,将请求分发到core,比如其中completition源码位于learn/go-dify/api/core/app/apps/completion/app_generator.py

class CompletionAppGenerator(MessageBasedAppGenerator):    @overload    def generate(        self,        app_model: App,        user: Union[Account, EndUser],        args: Mapping[strAny],        invoke_from: InvokeFrom,        streaming: Literal[True],    ) -> Generator[str | Mapping[strAny], NoneNone]: ...    def generate(        self,        app_model: App,        user: Union[Account, EndUser],        args: Mapping[strAny],        invoke_from: InvokeFrom,        streaming: bool = True,    ) -> Union[Mapping[strAny], Generator[str | Mapping[strAny], NoneNone]]:        """        Generate App response.        :param app_model: App        :param user: account or end user        :param args: request args        :param invoke_from: invoke from source        :param streaming: is stream        """         # init application generate entity        application_generate_entity = CompletionAppGenerateEntity(            task_id=str(uuid.uuid4()),            app_config=app_config,            model_conf=ModelConfigConverter.convert(app_config),            file_upload_config=file_extra_config,            inputs=self._prepare_user_inputs(                user_inputs=inputs, variables=app_config.variables, tenant_id=app_model.tenant_id            ),            query=query,            files=list(file_objs),            user_id=user.id,            stream=streaming,            invoke_from=invoke_from,            extras={},            trace_manager=trace_manager,        )        # new thread with request context        @copy_current_request_context        def worker_with_context():            return self._generate_worker(                flask_app=current_app._get_current_object(),  # type: ignore                application_generate_entity=application_generate_entity,                queue_manager=queue_manager,                message_id=message.id,            )        worker_thread = threading.Thread(target=worker_with_context)        worker_thread.start()        # return response or stream generator        response = self._handle_response(            application_generate_entity=application_generate_entity,            queue_manager=queue_manager,            conversation=conversation,            message=message,            user=user,            stream=streaming,        )  def _generate_worker(        self,        flask_app: Flask,        application_generate_entity: CompletionAppGenerateEntity,        queue_manager: AppQueueManager,        message_id: str,    ):        """        Generate worker in a new thread.        :param flask_app: Flask app        :param application_generate_entity: application generate entity        :param queue_manager: queue manager        :param message_id: message ID        :return:        """        with flask_app.app_context():            try:                # get message                message = self._get_message(message_id)                # chatbot app                runner = CompletionAppRunner()                runner.run(                    application_generate_entity=application_generate_entity,                    queue_manager=queue_manager,                    message=message,                )

继续跟进dify/api/core/app/apps/completion/app_runner.py

class CompletionAppRunner(AppRunner):    """    Completion Application Runner    """    def run(        self, application_generate_entity: CompletionAppGenerateEntity, queue_manager: AppQueueManager, message: Message    ):        """        Run application        :param application_generate_entity: application generate entity        :param queue_manager: application queue manager        :param message: message        :return:        """        try:            # process sensitive_word_avoidance            _, inputs, query = self.moderation_for_inputs(                app_id=app_record.id,                tenant_id=app_config.tenant_id,                app_generate_entity=application_generate_entity,                inputs=inputs,                query=query or "",                message_id=message.id,            )       invoke_result = model_instance.invoke_llm(            prompt_messages=prompt_messages,            model_parameters=application_generate_entity.model_conf.parameters,            stop=stop,            stream=application_generate_entity.stream,            user=application_generate_entity.user_id,        )

终于到了调用llm的地方dify/api/core/model_manager.py

class ModelInstance:    """    Model instance class    """    @overload    def invoke_llm(        self,        prompt_messages: Sequence[PromptMessage],        model_parameters: dict | None = None,        tools: Sequence[PromptMessageTool] | None = None,        stop: list[str] | None = None,        stream: Literal[True] = True,        user: str | None = None,        callbacks: list[Callback] | None = None,    ) -> Generator: ...    def invoke_llm(        self,        prompt_messages: Sequence[PromptMessage],        model_parameters: dict | None = None,        tools: Sequence[PromptMessageTool] | None = None,        stop: Sequence[str] | None = None,        stream: bool = True,        user: str | None = None,        callbacks: list[Callback] | None = None,    ) -> Union[LLMResult, Generator]:        """        Invoke large language model        :param prompt_messages: prompt messages        :param model_parameters: model parameters        :param tools: tools for tool calling        :param stop: stop words        :param stream: is stream response        :param user: unique user id        :param callbacks: callbacks        :return: full response or stream response chunk generator result        """         return cast(            Union[LLMResult, Generator],            self._round_robin_invoke(                function=self.model_type_instance.invoke,                model=self.model,                credentials=self.credentials,                prompt_messages=prompt_messages,                model_parameters=model_parameters,                tools=tools,                stop=stop,                stream=stream,                user=user,                callbacks=callbacks,            ),        )

逐层跟进下去,发现调用了dify/api/core/model_runtime/model_providers/__base/large_language_model.py

class LargeLanguageModel(AIModel):    """    Model class for large language model.    """    model_type: ModelType = ModelType.LLM    # pydantic configs    model_config = ConfigDict(protected_namespaces=())    def invoke(        self,        model: str,        credentials: dict,        prompt_messages: list[PromptMessage],        model_parameters: dict | None = None,        tools: list[PromptMessageTool] | None = None,        stop: list[str] | None = None,        stream: bool = True,        user: str | None = None,        callbacks: list[Callback] | None = None,    ) -> Union[LLMResult, Generator[LLMResultChunk, NoneNone]]:        """        Invoke large language model        :param model: model name        :param credentials: model credentials        :param prompt_messages: prompt messages        :param model_parameters: model parameters        :param tools: tools for tool calling        :param stop: stop words        :param stream: is stream response        :param user: unique user id        :param callbacks: callbacks        :return: full response or stream response chunk generator result        """        # validate and filter model parameters        if model_parameters is None:            model_parameters = {}        self.started_at = time.perf_counter()        callbacks = callbacks or []        if dify_config.DEBUG:            callbacks.append(LoggingCallback())        # trigger before invoke callbacks        self._trigger_before_invoke_callbacks(            model=model,            credentials=credentials,            prompt_messages=prompt_messages,            model_parameters=model_parameters,            tools=tools,            stop=stop,            stream=stream,            user=user,            callbacks=callbacks,        )        result: Union[LLMResult, Generator[LLMResultChunk, NoneNone]]        try:            from core.plugin.impl.model import PluginModelClient            plugin_model_manager = PluginModelClient()            result = plugin_model_manager.invoke_llm(                tenant_id=self.tenant_id,                user_id=user or "unknown",                plugin_id=self.plugin_id,                provider=self.provider_name,                model=model,                credentials=credentials,                model_parameters=model_parameters,                prompt_messages=prompt_messages,                tools=tools,                stop=list(stop) if stop else None,                stream=stream,            )

它其实是调用了model插件dify/api/core/plugin/impl/model.py

class PluginModelClient(BasePluginClient):   def invoke_llm(        self,        tenant_id: str,        user_id: str,        plugin_id: str,        provider: str,        model: str,        credentials: dict,        prompt_messages: list[PromptMessage],        model_parameters: dict | None = None,        tools: list[PromptMessageTool] | None = None,        stop: list[str] | None = None,        stream: bool = True,    ) -> Generator[LLMResultChunk, NoneNone]:        """        Invoke llm        """        response = self._request_with_plugin_daemon_response_stream(            method="POST",            path=f"plugin/{tenant_id}/dispatch/llm/invoke",            type_=LLMResultChunk,            data=jsonable_encoder(                {                    "user_id": user_id,                    "data": {                        "provider": provider,                        "model_type""llm",                        "model": model,                        "credentials": credentials,                        "prompt_messages": prompt_messages,                        "model_parameters": model_parameters,                        "tools": tools,                        "stop": stop,                        "stream": stream,                    },                }            ),            headers={                "X-Plugin-ID": plugin_id,                "Content-Type""application/json",            },        )        try:            yield from response 
本站文章均为手工撰写未经允许谢绝转载:夜雨聆风 » dify源码分析(1)

评论 抢沙发

1 + 8 =
  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址
×
订阅图标按钮