乐于分享
好东西不私藏

Datus-Agent 深度分析文档 – 模块 4:存储与知识管理

Datus-Agent 深度分析文档 – 模块 4:存储与知识管理

本文档深入分析 Datus-Agent 的存储与知识管理系统,包括 LanceDB 向量存储、RAG 系统、知识图谱、Subject Tree 层级管理。


目录

  1. 存储架构概览
  2. LanceDB 向量存储
  3. RAG 系统详解
  4. Subject Tree 层级管理
  5. 知识存储模块
  6. 多租户隔离

存储架构概览

整体存储架构

┌─────────────────────────────────────────────────────────────────────────┐│                        Datus Storage Layer                               │├─────────────────────────────────────────────────────────────────────────┤│                                                                          ││  ┌──────────────┐     ┌──────────────┐     ┌──────────────┐            ││  │  Schema      │     │  Semantic    │     │   Reference  │            ││  │  Metadata    │     │    Model     │     │     SQL      │            ││  └──────────────┘     └──────────────┘     └──────────────┘            ││         │                   │                   │                        ││         ▼                   ▼                   ▼                        ││  ┌──────────────┐     ┌──────────────┐     ┌──────────────┐            ││  │   Ext        │     │   Document   │     │   Feedback   │            ││  │  Knowledge   │     │    Store     │     │    Store     │            ││  └──────────────┘     └──────────────┘     └──────────────┘            ││                                                                          ││  ┌──────────────────────────────────────────────────────────────────┐  ││  │                     Subject Tree Store                            │  ││  │              (Domain Hierarchy Management)                        │  ││  └──────────────────────────────────────────────────────────────────┘  ││                                                                          │└─────────────────────────────────────────────────────────────────────────┘                              │                              ▼┌─────────────────────────────────────────────────────────────────────────┐│                      LanceDB Vector Backend                              ││  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐  ┌────────────┐ ││  │  Vector      │  │   Scalar     │  │     FTS      │  │   RDB      │ ││  │  Index       │  │   Index      │  │    Index     │  │  (SQLite)  │ ││  └──────────────┘  └──────────────┘  └──────────────┘  └────────────┘ │└─────────────────────────────────────────────────────────────────────────┘

存储目录结构

datus/storage/├── base.py                    # 存储基类├── backend_holder.py          # 后端连接管理├── embedding_models.py        # 嵌入模型├── rag_scope.py               # RAG 范围控制├── scoped_filter.py           # 范围过滤器├── schema_metadata/           # Schema 元数据存储│   ├── store.py│   ├── local_init.py│   ├── benchmark_init.py│   └── ...├── semantic_model/            # 语义模型存储│   ├── store.py│   ├── adapter_init.py│   └── ...├── reference_sql/             # 参考 SQL 存储│   ├── store.py│   └── ...├── ext_knowledge/             # 外部知识存储│   ├── store.py│   └── ...├── metric/                    # 指标存储│   ├── store.py│   └── ...├── document/                  # 文档存储│   ├── store.py│   └── ...├── subject_tree/              # Subject Tree 存储│   ├── store.py│   └── ...├── feedback/                  # 反馈存储├── task/                      # 任务存储└── rdb/                       # 关系数据存储

存储抽象层次

┌─────────────────────────────────────────────────────────────┐│                   Storage Abstraction                        │├─────────────────────────────────────────────────────────────┤│                                                              ││  Layer 1: StorageBase                                        ││  ┌────────────────────────────────────────────────────────┐ ││  │ class StorageBase:                                     │ ││  │   + db: VectorDatabase                                 │ ││  │   + _get_current_timestamp(): str                      │ ││  └────────────────────────────────────────────────────────┘ ││                          │                                   ││                          ▼                                   ││  Layer 2: BaseEmbeddingStore                                 ││  ┌────────────────────────────────────────────────────────┐ ││  │ class BaseEmbeddingStore(StorageBase):                 │ ││  │   + model: EmbeddingModel                              │ ││  │   + table_name: str                                    │ ││  │   + vector_source_name: str                            │ ││  │   + search(query, top_n, where): pa.Table              │ ││  │   + batch_store(data): None                            │ ││  │   + truncate(): None                                   │ ││  └────────────────────────────────────────────────────────┘ ││                          │                                   ││                          ▼                                   ││  Layer 3: Domain-Specific Stores                             ││  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐      ││  │ SchemaStorage│  │SemanticModel │  │ ExtKnowledge │      ││  │              │  │   Storage    │  │    Store     │      ││  └──────────────┘  └──────────────┘  └──────────────┘      ││                                                              │└─────────────────────────────────────────────────────────────┘

LanceDB 向量存储

LanceDB 简介

Datus-Agent 使用 LanceDB 作为向量数据库后端,原因:

  1. 本地优先
    :支持本地文件存储,无需额外服务
  2. 混合搜索
    :同时支持向量搜索和标量过滤
  3. 全文搜索
    :内置 FTS(Full-Text Search)支持
  4. 列式存储
    :基于 Apache Arrow/Parquet,高效压缩
  5. 多版本控制
    :支持 ACID 事务和时间旅行

存储基类实现

classStorageBase:"""所有存储组件的基类"""def__init__(self, db: Optional[VectorDatabase] = None):"""初始化存储基类        Args:            db: 可选的预创建 VectorDatabase 连接                如果提供,直接使用;否则使用全局命名空间连接        """if db isnotNone:            self.db: VectorDatabase = dbelse:from datus.storage.backend_holder import create_vector_connection            self.db = create_vector_connection()def_get_current_timestamp(self) -> str:"""获取当前时间戳(UTC ISO 格式)"""return datetime.now(timezone.utc).isoformat()classBaseEmbeddingStore(StorageBase):"""所有嵌入存储的基类"""def__init__(        self,        table_name: str,        embedding_model: EmbeddingModel,        on_duplicate_columns: str = "vector",        schema: Optional[pa.Schema] = None,        vector_source_name: str = "definition",        vector_column_name: str = "vector",        unique_columns: Optional[List[str]] = None,        db: Optional[VectorDatabase] = None,        table_prefix: str = "",        extra_fields: Optional[List[pa.Field]] = None,        default_values: Optional[Dict[strAny]] = None,        scope_indices: Optional[List[str]] = None,):super().__init__(db=db)        self.model = embedding_model        self.batch_size = embedding_model.batch_size        self.table_name = f"{table_prefix}{table_name}"if table_prefix else table_name        self.vector_source_name = vector_source_name        self.vector_column_name = vector_column_name        self.on_duplicate_columns = on_duplicate_columns# 追加额外字段到 schemaif schema isnotNoneand extra_fields:            schema = pa.schema(list(schema) + extra_fields)# 确保 datasource_id 字段存在(用于基于 subject-tree 的存储)if schema isnotNone:            existing_names = {f.name for f in schema}if"datasource_id"notin existing_names:                schema = pa.schema(list(schema) + [pa.field("datasource_id", pa.string())])        self._schema = schema        self._unique_columns = unique_columns        self._default_values: Dict[strAny] = dict(default_values) if default_values else {}        self._scope_indices: List[str] = list(scope_indices or [])# 延迟表初始化        self._shared = _SharedTableState()        self._table_lock = Lock()        self._write_lock = Lock()    @propertydeftable(self) -> Optional[VectorTable]:return self._shared.table    @table.setterdeftable(self, value: Optional[VectorTable]):        self._shared.table = valuedef_ensure_table_ready(self):"""确保表已准备好,带有适当的错误处理"""if self._shared.initialized:returnwith self._table_lock:if self._shared.initialized:return# 检查嵌入模型可用性            self._check_embedding_model_ready()# 使用嵌入函数初始化表            self._ensure_table(self._schema)            self._shared.initialized = True# 自动为范围字段创建标量索引for col in self._scope_indices:                self._create_scalar_index(col)def_apply_default_values(self, data: List[Dict[strAny]]) -> List[Dict[strAny]]:"""为缺失默认值的行填充默认值"""ifnot self._default_values:return datafor row in data:for k, v in self._default_values.items():                row.setdefault(k, v)return data

向量搜索实现

classBaseEmbeddingStore(StorageBase):def_search_all(        self,         where: WhereExpr = None        select_fields: Optional[List[str]] = None,        limit: Optional[int] = None,) -> pa.Table:"""搜索所有数据(带过滤和字段选择)"""        self._ensure_table_ready()if limit:            row_limit = limitelse:            row_limit = self.table.count_rows(where) if where else self.table.count_rows()        result = self.table.search_all(            where=where,             select_fields=select_fields,             limit=row_limit        )# 移除向量列(除非明确请求)if self.vector_column_name in result.column_names:            result = result.drop([self.vector_column_name])return resultdefsearch(        self,        query_text: str,        top_n: int = 10,        where: WhereExpr = None,        query_type: str = "vector",) -> pa.Table:"""执行搜索(支持向量和全文)"""        self._ensure_table_ready()# 生成查询向量        query_vector = self.model.embed_query(query_text)if query_type == "vector":# 向量相似性搜索            result = self.table.search(                query_vector=query_vector,                vector_column=self.vector_column_name,                top_n=top_n,                where=where,            )elif query_type == "fts":# 全文搜索            result = self.table.fts_search(                query=query_text,                top_n=top_n,                where=where,            )elif query_type == "hybrid":# 混合搜索(向量 + 全文)            result = self.table.hybrid_search(                query_text=query_text,                query_vector=query_vector,                top_n=top_n,                where=where,            )else:raise ValueError(f"Unknown query type: {query_type}")return resultdefcreate_fts_index(self, fields: List[str]):"""创建全文搜索索引"""        self._ensure_table_ready()        self.table.create_fts_index(fields)def_create_scalar_index(self, column: str):"""创建标量索引"""        self._ensure_table_ready()        self.table.create_scalar_index(column)

批量存储优化

classBaseEmbeddingStore(StorageBase):defbatch_store(self, data: List[Dict[strAny]]) -> None:"""批量存储数据(优化性能)"""        self._ensure_table_ready()ifnot data:return# 应用默认值        data = self._apply_default_values(data)# 生成向量        texts_to_embed = [item.get(self.vector_source_name, ""for item in data]        vectors = self.model.embed_documents(texts_to_embed)# 添加向量到数据for item, vector inzip(data, vectors):            item[self.vector_column_name] = vector# 批量插入with self._write_lock:            self.table.add(data)defupsert(        self,         data: List[Dict[strAny]],         on_conflict_columns: List[str] = None,) -> None:"""Upsert 数据(更新或插入)"""        self._ensure_table_ready()ifnot data:return# 应用默认值        data = self._apply_default_values(data)# 生成向量        texts_to_embed = [item.get(self.vector_source_name, ""for item in data]        vectors = self.model.embed_documents(texts_to_embed)for item, vector inzip(data, vectors):            item[self.vector_column_name] = vector# Upsert 操作with self._write_lock:if on_conflict_columns:                self.table.upsert(data, on_conflict=on_conflict_columns)elif self._unique_columns:                self.table.upsert(data, on_conflict=self._unique_columns)else:                self.table.add(data)deftruncate(self) -> None:"""删除整个表并重置状态(管理员操作)"""with self._table_lock:            self.db.drop_table(self.table_name, ignore_missing=True)            self._shared.table = None            self._shared.initialized = Falsedeftruncate_scoped(self) -> None:"""删除当前连接可见的所有行        在 LOGICAL 隔离模式下,后端自动范围化删除到当前 datasource_id        在 PHYSICAL 模式下,这等同于 truncate()        """from datus.storage.backend_holder import get_isolation_type        self._ensure_table_ready()if get_isolation_type() == "logical":# LOGICAL 模式 - 通过后端的 _ds_where() 范围化删除            self.table.delete(self.table._ds_where())else:# PHYSICAL 模式 - 删除整个表            self.truncate()

RAG 系统详解

RAG 架构

┌─────────────────────────────────────────────────────────────┐│                      RAG System                              │├─────────────────────────────────────────────────────────────┤│                                                              ││  ┌──────────────┐     ┌──────────────┐     ┌──────────────┐ ││  │   Query      │     │   Retrieve   │     │   Generate   │ ││  │  Processing  │────▶│     (RAG)    │────▶│   Response   │ ││  └──────────────┘     └──────────────┘     └──────────────┘ ││                            │                                   ││         ┌──────────────────┼──────────────────┐               ││         ▼                  ▼                  ▼               ││  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐       ││  │   Schema     │  │   Semantic   │  │   Reference  │       ││  │   Metadata   │  │    Model     │  │     SQL      │       ││  │     RAG      │  │     RAG      │  │     RAG      │       ││  └──────────────┘  └──────────────┘  └──────────────┘       ││         │                  │                  │               ││         ▼                  ▼                  ▼               ││  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐       ││  │   Ext        │  │   Document   │  │   Feedback   │       ││  │  Knowledge   │  │    Search    │  │    Search    │       ││  │     RAG      │  │              │  │              │       ││  └──────────────┘  └──────────────┘  └──────────────┘       ││                                                              │└─────────────────────────────────────────────────────────────┘

SchemaMetadataRAG

classSchemaWithValueRAG:"""Schema 元数据 RAG 接口"""def__init__(self, agent_config: AgentConfig):from datus.storage.registry import get_storagefrom datus.storage.rag_scope import _build_sub_agent_filter        self.agent_config = agent_config        self.datasource_id = agent_config.current_namespace or""# 获取存储实例        self.storage: SchemaStorage = get_storage(            SchemaStorage, "schema_metadata"            namespace=self.datasource_id        )# 构建 sub-agent 过滤器        self._sub_agent_filter = Nonedefsearch_similar(        self,        query_text: str,        catalog_name: str = "",        database_name: str = "",        schema_name: str = "",        top_n: int = 5,        table_type: TABLE_TYPE = "table",        query_type: str = "vector",) -> pa.Table:"""搜索相似的表 schema"""# 构建 WHERE 子句        where = _build_where_clause(            catalog_name=catalog_name,            database_name=database_name,            schema_name=schema_name,            table_type=table_type,        )return self.storage.do_search_similar(            query_text=query_text,            top_n=top_n,            where=where,            query_type=query_type,        )defsearch_top_tables_by_every_schema(        self,        query_text: str,        database_name: str = "",        catalog_name: str = "",        all_schemas: Optional[Set[str]] = None,        top_n: int = 20,) -> pa.Table:"""从每个 schema 搜索 top N 表"""if all_schemas isNone:            all_schemas = self.storage.search_all_schemas(                catalog_name=catalog_name,                 database_name=database_name            )        results = []for schema in all_schemas:            result = self.storage.search_similar(                query_text=query_text,                database_name=database_name,                catalog_name=catalog_name,                schema_name=schema,                top_n=top_n,            )            results.append(result)return pa.concat_tables(results, promote_options="default")defget_schema_size(self) -> int:"""获取 schema 数量"""        self.storage._ensure_table_ready()return self.storage.table.count_rows()defget_value_size(self) -> int:"""获取值数量(列级别)"""# 计算所有表的总列数        schemas = self.storage._search_all(select_fields=["definition"])        total_columns = 0for schema in schemas["definition"].to_pylist():# 解析 CREATE TABLE 语句,计算列数            total_columns += self._count_columns_in_schema(schema)return total_columns

SemanticModelRAG

classSemanticModelRAG:"""语义模型 RAG 接口"""def__init__(        self,        agent_config: "AgentConfig",        sub_agent_name: Optional[str] = None,        datasource_id: Optional[str] = None,):from datus.storage.rag_scope import _build_sub_agent_filterfrom datus.storage.registry import get_storage        self.datasource_id = datasource_id or agent_config.current_namespace or""        self.storage: SemanticModelStorage = get_storage(            SemanticModelStorage, "semantic_model"            namespace=self.datasource_id        )# 构建 sub-agent 过滤器        self._sub_agent_filter = _build_sub_agent_filter(            agent_config,             sub_agent_name,             self.storage, "tables"        )def_sub_agent_conditions(self) -> list:"""构建 sub-agent 过滤条件"""        conditions = []if self._sub_agent_filter:            conditions.append(self._sub_agent_filter)return conditionsdefsearch_objects(        self,        query_text: str,        kinds: Optional[List[str]] = None,        table_name: Optional[str] = None,        top_n: int = 10,) -> List[Dict[strAny]]:"""搜索语义对象"""        conditions = []if kinds:            conditions.append(in_("kind", kinds))if table_name:            conditions.append(eq("table_name", table_name))        where_clause = And(conditions) if conditions elseNone        results = self.storage.search(            query_txt=query_text,            top_n=top_n,            where=where_clause,        ).to_pylist()return resultsdefget_semantic_model(        self,        catalog_name: str = "",        database_name: str = "",        schema_name: str = "",        table_name: str = "",        select_fields: Optional[List[str]] = None,) -> Optional[Dict[strAny]]:"""从粒度存储重建语义模型对象"""ifnot table_name:            logger.warning("get_semantic_model called without table_name")returnNone        base_conds = self._sub_agent_conditions()# 构建表过滤条件        table_conds = [            eq("kind""table"),             eq("table_name", table_name)        ] + base_condsif catalog_name:            table_conds.append(eq("catalog_name", catalog_name))if database_name:            table_conds.append(eq("database_name", database_name))if schema_name:            table_conds.append(eq("schema_name", schema_name))# 搜索表对象        table_objs = self.storage._search_all(where=And(table_conds)).to_pylist()# 回退 1:宽泛匹配ifnot table_objs and (catalog_name or database_name or schema_name):            logger.debug(f"Semantic model not found for {table_name} with full filters")            broad_conds = [                eq("kind""table"),                eq("table_name", table_name),            ] + base_conds            table_objs = self.storage._search_all(where=And(broad_conds)).to_pylist()# 回退 2:不区分大小写ifnot table_objs:if table_name.lower() != table_name:                lower_conds = [                    eq("kind""table"),                    eq("table_name", table_name.lower()),                ] + base_conds                table_objs = self.storage._search_all(where=And(lower_conds)).to_pylist()ifnot table_objs:returnNone# 获取表对象        semantic_model = table_objs[0]        model_name = semantic_model.get("name", table_name)# 查找子列        children_conds = [            eq("kind""column"),            eq("table_name", semantic_model.get("table_name", table_name)),        ] + base_conds        column_objs = self.storage._search_all(where=And(children_conds)).to_pylist()# 构建完整的语义模型        semantic_model["columns"] = column_objsreturn semantic_modeldefget_size(self) -> int:"""获取语义对象数量"""        self.storage._ensure_table_ready()return self.storage.table.count_rows()

ExtKnowledgeRAG

classExtKnowledgeRAG:"""外部知识 RAG 接口"""def__init__(        self,        agent_config: "AgentConfig",        sub_agent_name: Optional[str] = None,):from datus.storage.rag_scope import _build_sub_agent_filterfrom datus.storage.registry import get_storage        self.datasource_id = agent_config.current_namespace or""        self.store: ExtKnowledgeStore = get_storage(            ExtKnowledgeStore, "ext_knowledge"            namespace=self.datasource_id        )# 构建 sub-agent 过滤器        self._sub_agent_filter = _build_sub_agent_filter(            agent_config,             sub_agent_name,             self.store, "ext_knowledge"        )defsearch_knowledge(        self,        query_text: str,        subject_path: Optional[List[str]] = None,        top_n: int = 5,) -> List[Dict[strAny]]:"""搜索知识"""        conditions = []if self._sub_agent_filter:            conditions.append(self._sub_agent_filter)if subject_path:# 构建 subject 路径过滤            conditions.append(self.store.build_subject_filter(subject_path))        where_clause = And(conditions) if conditions elseNone        results = self.store.search(            query_txt=query_text,            top_n=top_n,            where=where_clause,        ).to_pylist()return resultsdefget_knowledge_size(self) -> int:"""获取知识条目数量"""        self.store._ensure_table_ready()return self.store.table.count_rows()

Subject Tree 层级管理

Subject Tree 架构

┌─────────────────────────────────────────────────────────────┐│                    Subject Tree Structure                     │├─────────────────────────────────────────────────────────────┤│                                                              ││  ┌──────────────────────────────────────────────────────┐  ││  │                    Root                                │  ││  │                     │                                  │  ││  │         ┌───────────┼───────────┐                     │  ││  │         ▼           ▼           ▼                     │  ││  │    ┌────────┐  ┌────────┐  ┌────────┐                │  ││  │    │Finance │  │  Sales │  │   HR   │                │  ││  │    │   │    │  │   │    │  │   │    │                │  ││  │    │   ▼    │  │   ▼    │  │   ▼    │                │  ││  │    │Revenue │  │ Orders │  │ Hiring │                │  ││  │    │   │    │  │   │    │  │   │    │                │  ││  │    │   ▼    │  │   ▼    │  │   ▼    │                │  ││  │    │ Q1/Q2  │  │Returns │  │Onboard │                │  ││  │    └────────┘  └────────┘  └────────┘                │  ││  └──────────────────────────────────────────────────────┘  ││                                                              ││  Subject Path Examples:                                      ││  - ["Finance", "Revenue", "Q1"]                             ││  - ["Sales", "Orders", "Returns"]                           ││  - ["HR", "Hiring", "Onboard"]                              ││                                                              │└─────────────────────────────────────────────────────────────┘

Subject Tree 存储

classBaseSubjectEmbeddingStore(BaseEmbeddingStore):"""基于 Subject Tree 的嵌入存储基类"""def__init__(self, *args, **kwargs):super().__init__(*args, **kwargs)# 初始化 Subject Treefrom datus.storage.subject_tree.store import SubjectTreeStore        self.subject_tree = SubjectTreeStore(            db=self.db,            table_prefix=self.table_prefix,        )defcreate_subject_index(self):"""创建 Subject Tree 索引"""        self.subject_tree.create_indices()defget_subject_path_by_id(self, subject_node_id: str) -> List[str]:"""根据节点 ID 获取 subject 路径"""return self.subject_tree.get_path_by_node_id(subject_node_id)defget_node_id_by_path(self, subject_path: List[str]) -> Optional[str]:"""根据路径获取节点 ID"""return self.subject_tree.get_node_id_by_path(subject_path)classSubjectTreeStore(StorageBase):"""Subject Tree 存储"""def__init__(self, db: Optional[VectorDatabase] = None, table_prefix: str = ""):super().__init__(db=db)        self.table_name = f"{table_prefix}subject_tree"# 创建表 schema        self._schema = pa.schema([            pa.field("node_id", pa.string()),        # 唯一节点 ID            pa.field("parent_node_id", pa.string()), # 父节点 ID            pa.field("name", pa.string()),           # 节点名称            pa.field("path", pa.string()),           # 完整路径(JSON)            pa.field("depth", pa.int32()),           # 深度            pa.field("created_at", pa.string()),     # 创建时间        ])        self._shared = _SharedTableState()        self._table_lock = Lock()defcreate_indices(self):"""创建索引"""        self._ensure_table_ready()# 标量索引        self._create_scalar_index("node_id")        self._create_scalar_index("parent_node_id")        self._create_scalar_index("depth")# FTS 索引        self.create_fts_index(["name"])deffind_or_create_path(self, subject_path: List[str]) -> str:"""查找或创建 subject 路径        Returns:            node_id: 节点 ID        """ifnot subject_path:raise ValueError("subject_path cannot be empty")# 检查是否已存在        existing = self._find_by_path(subject_path)if existing:return existing["node_id"]# 递归创建路径        parent_node_id = Nonefor i, name inenumerate(subject_path):            current_path = subject_path[:i + 1]# 检查当前层级是否存在            existing = self._find_by_path(current_path)if existing:                parent_node_id = existing["node_id"]continue# 创建新节点            node_id = self._create_node(                name=name,                parent_node_id=parent_node_id,                path=current_path,                depth=i + 1,            )            parent_node_id = node_idreturn parent_node_iddef_find_by_path(self, subject_path: List[str]) -> Optional[Dict]:"""根据路径查找节点"""        self._ensure_table_ready()        path_json = json.dumps(subject_path)        result = self.table.search_all(            where=eq("path", path_json),            select_fields=["node_id""parent_node_id""name""path""depth"],            limit=1,        )if result.num_rows > 0:return result.to_pylist()[0]returnNonedef_create_node(        self,        name: str,        parent_node_id: Optional[str],        path: List[str],        depth: int,) -> str:"""创建新节点"""import uuid        node_id = str(uuid.uuid4())        data = {"node_id": node_id,"parent_node_id": parent_node_id,"name": name,"path": json.dumps(path),"depth": depth,"created_at": self._get_current_timestamp(),        }        self.table.add([data])return node_iddefget_path_by_node_id(self, node_id: str) -> List[str]:"""根据节点 ID 获取路径"""        self._ensure_table_ready()        result = self.table.search_all(            where=eq("node_id", node_id),            select_fields=["path"],            limit=1,        )if result.num_rows > 0:            path_json = result["path"][0].as_py()return json.loads(path_json)return []defget_children(self, parent_node_id: Optional[str] = None) -> List[Dict]:"""获取子节点"""        self._ensure_table_ready()        where = eq("parent_node_id", parent_node_id) if parent_node_id elseNone        result = self.table.search_all(            where=where,            select_fields=["node_id""name""path""depth"],        )return result.to_pylist()defget_tree(self, root_node_id: Optional[str] = None) -> Dict:"""获取完整树结构"""        children = self.get_children(root_node_id)        tree = {}for child in children:            node_id = child["node_id"]            child_tree = self.get_tree(node_id)            tree[child["name"]] = child_tree or {}return tree

ScopedFilterBuilder

classScopedFilterBuilder:"""构建范围过滤器的工具类"""    @staticmethoddefbuild_table_filter(        tables: List[str],        dialect: str = "",) -> Optional[Node]:"""构建表过滤器        Args:            tables: 表名列表(支持 "schema.table" 格式)            dialect: 数据库方言        Returns:            过滤条件节点        """ifnot tables:returnNone# 解析表名        parsed_tables = []for table in tables:if"."in table:                parts = table.split(".")iflen(parts) == 2:                    parsed_tables.append({"schema_name": parts[0],"table_name": parts[1],                    })eliflen(parts) == 3:                    parsed_tables.append({"database_name": parts[0],"schema_name": parts[1],"table_name": parts[2],                    })else:                parsed_tables.append({"table_name": table})# 构建 OR 条件        conditions = []for parsed in parsed_tables:            cond_parts = []if"schema_name"in parsed:                cond_parts.append(eq("schema_name", parsed["schema_name"]))if"table_name"in parsed:                cond_parts.append(eq("table_name", parsed["table_name"]))if"database_name"in parsed:                cond_parts.append(eq("database_name", parsed["database_name"]))            conditions.append(And(cond_parts))return Or(conditions) if conditions elseNone    @staticmethoddefbuild_subject_filter(        subject_paths: List[List[str]],        subject_tree: SubjectTreeStore,) -> Optional[Node]:"""构建 subject 路径过滤器        Args:            subject_paths: subject 路径列表            subject_tree: Subject Tree 存储        Returns:            过滤条件节点        """ifnot subject_paths:returnNone# 获取所有匹配的节点 ID        node_ids = []for path in subject_paths:            node_id = subject_tree.get_node_id_by_path(path)if node_id:                node_ids.append(node_id)ifnot node_ids:returnNone# 构建 IN 条件return in_("subject_node_id", node_ids)

知识存储模块

SchemaMetadata 存储

classSchemaStorage(BaseMetadataStorage):"""存储和管理 schema lineage 数据"""def__init__(self, embedding_model: EmbeddingModel, **kwargs):super().__init__(            table_name="schema_metadata",            embedding_model=embedding_model,            vector_source_name="definition",            **kwargs,        )def_extract_table_name(self, schema_text: str) -> str:"""从 CREATE TABLE 语句提取表名"""        words = schema_text.split()iflen(words) < 3or words[0].upper() != "CREATE"or words[1].upper() != "TABLE":return""        idx = 2# 跳过 IF NOT EXISTSif idx + 2 < len(words) and words[idx].upper() == "IF"and words[idx + 1].upper() == "NOT":            idx += 3if idx >= len(words):return""        name = words[idx]# 处理 "mytable(id INT)" 格式        paren_pos = name.find("(")if paren_pos > 0:            name = name[:paren_pos]return name.strip("()").strip()defsearch_all_schemas(        self,         database_name: str = ""        catalog_name: str = "") -> Set[str]:"""搜索所有 schema 名称"""        search_result = self._search_all(            where=_build_where_clause(                database_name=database_name,                 catalog_name=catalog_name            ),            select_fields=["schema_name"],        )returnset(search_result["schema_name"].to_pylist())defsearch_top_tables_by_every_schema(        self,        query_text: str,        database_name: str = "",        catalog_name: str = "",        all_schemas: Optional[Set[str]] = None,        top_n: int = 20,) -> pa.Table:"""从每个 schema 搜索 top N 表"""if all_schemas isNone:            all_schemas = self.search_all_schemas(                catalog_name=catalog_name,                 database_name=database_name            )        results = []for schema in all_schemas:            result = self.search_similar(                query_text=query_text,                database_name=database_name,                catalog_name=catalog_name,                schema_name=schema,                top_n=top_n,            )            results.append(result)return pa.concat_tables(results, promote_options="default")

ExtKnowledge 存储

classExtKnowledgeStore(BaseSubjectEmbeddingStore):"""存储和管理外部业务知识"""def__init__(self, embedding_model: EmbeddingModel, **kwargs):super().__init__(            table_name="ext_knowledge",            embedding_model=embedding_model,            schema=pa.schema(                base_schema_columns()                + [                    pa.field("id", pa.string()),                    pa.field("search_text", pa.string()),                    pa.field("explanation", pa.string()),                    pa.field("vector", pa.list_(pa.float32(), list_size=embedding_model.dim_size)),                ]            ),            vector_source_name="search_text",            unique_columns=["id"],            **kwargs,        )defcreate_indices(self):"""创建索引"""# 使用基类方法创建 subject 索引        self.create_subject_index()# 为知识特定字段创建 FTS 索引        self._ensure_table_ready()        self.create_fts_index(["search_text""explanation"])defbatch_store_knowledge(self, knowledge_entries: List[Dict]) -> None:"""批量存储知识条目"""ifnot knowledge_entries:return# 验证和过滤条目,添加 id 字段        valid_entries = []for entry in knowledge_entries:            subject_path = entry.get("subject_path", [])            name = entry.get("name")            search_text = entry.get("search_text""")            explanation = entry.get("explanation""")# 验证必填字段ifnotall([subject_path, name, search_text, explanation]):                logger.warning(f"Skipping entry with missing required fields: {entry}")continue# 从 subject_path + name 生成 id            entry_with_id = entry.copy()            entry_with_id["id"] = gen_subject_item_id(subject_path, name)            valid_entries.append(entry_with_id)# 使用基类的 batch_store 方法        self.batch_store(valid_entries)defstore_knowledge(        self,        subject_path: List[str],        name: str,        search_text: str,        explanation: str,):"""存储单个知识条目"""# 查找或创建 subject tree 路径        subject_node_id = self.subject_tree.find_or_create_path(subject_path)# 从 subject_path + name 生成 id        knowledge_id = gen_subject_item_id(subject_path, name)        data = [            {"id": knowledge_id,"subject_node_id": subject_node_id,"name": name,"search_text": search_text,"explanation": explanation,"created_at": self._get_current_timestamp(),            }        ]        self.store_batch(data)

多租户隔离

隔离模式

classIsolationType(str, Enum):"""多租户隔离类型"""    PHYSICAL = "physical"# 物理隔离:每个租户独立数据库    LOGICAL = "logical"# 逻辑隔离:共享数据库,通过 datasource_id 过滤defget_isolation_type() -> str:"""获取当前隔离类型"""# 从配置或环境变量读取return os.getenv("DATUS_ISOLATION_TYPE", IsolationType.PHYSICAL)

范围化存储

classBaseEmbeddingStore(StorageBase):deftruncate_scoped(self) -> None:"""删除当前连接可见的所有行        LOGICAL 隔离模式:后端自动范围化删除到当前 datasource_id        PHYSICAL 隔离模式:删除整个表        """from datus.storage.backend_holder import get_isolation_type        self._ensure_table_ready()if get_isolation_type() == "logical":# LOGICAL 模式 - 通过后端的 _ds_where() 范围化删除            self.table.delete(self.table._ds_where())else:# PHYSICAL 模式 - 删除整个表            self.truncate()def_search_all(        self,         where: WhereExpr = None        select_fields: Optional[List[str]] = None,        limit: Optional[int] = None,) -> pa.Table:"""搜索所有数据(自动应用范围过滤)"""        self._ensure_table_ready()# 在 LOGICAL 模式下自动应用 datasource_id 过滤if get_isolation_type() == "logical":            ds_where = self.table._ds_where()if where:                where = And([where, ds_where])else:                where = ds_where# ... 继续搜索逻辑

后端连接管理

# backend_holder.py_vector_db: Optional[VectorDatabase] = None_isolation_type: Optional[str] = Nonedefcreate_vector_connection() -> VectorDatabase:"""创建向量数据库连接"""global _vector_db, _isolation_typeif _vector_db isnotNone:return _vector_db# 从配置读取隔离类型    _isolation_type = os.getenv("DATUS_ISOLATION_TYPE", IsolationType.PHYSICAL)# 创建 LanceDB 连接from lancedb import connect    db_path = get_lancedb_path()    _vector_db = connect(db_path)return _vector_dbdefget_isolation_type() -> str:"""获取隔离类型"""if _isolation_type isNone:        _isolation_type = os.getenv("DATUS_ISOLATION_TYPE", IsolationType.PHYSICAL)return _isolation_type

Sub-Agent 范围过滤

# rag_scope.pydef_build_sub_agent_filter(    agent_config: "AgentConfig",    sub_agent_name: Optional[str],    storage: "BaseEmbeddingStore",    check_scope_attr: str,) -> Optional[Node]:"""根据 sub-agent 的 scoped context 构建范围过滤器"""ifnot sub_agent_name:returnNone# 获取 sub-agent 配置    raw_config = agent_config.sub_agent_config(sub_agent_name)ifnot raw_config:returnNone    sub_agent_config = SubAgentConfig.model_validate(raw_config)# 检查是否有范围化的上下文ifnot sub_agent_config.has_scoped_context_by(check_scope_attr):returnNone# 获取范围值    scope_value = getattr(sub_agent_config.scoped_context, check_scope_attr, None)ifnot scope_value:returnNone# 根据类型构建过滤器if check_scope_attr == "tables":        dialect = getattr(agent_config, "db_type""")return ScopedFilterBuilder.build_table_filter(scope_value, dialect)elif check_scope_attr in ("metrics""sqls""ext_knowledge"):        subject_tree = getattr(storage, "subject_tree"None)if subject_tree isNone:raise DatusException(                code=ErrorCode.COMMON_VALIDATION_FAILED,                message=(f"Cannot build scope filter for sub-agent '{sub_agent_name}' "f"(scope attr='{check_scope_attr}'): storage has no subject_tree."                ),            )return ScopedFilterBuilder.build_subject_filter(scope_value, subject_tree)returnNone

总结

Datus-Agent 的存储与知识管理系统体现了以下设计原则:

  1. 统一抽象
    :BaseEmbeddingStore 提供通用接口
  2. 延迟初始化
    :使用 _SharedTableState 实现单例模式
  3. 范围化访问
    :支持物理和逻辑多租户隔离
  4. Subject Tree
    :领域层级管理,支持细粒度访问控制
  5. 混合搜索
    :向量搜索 + 全文搜索 + 标量过滤
  6. 批量优化
    :批量存储和嵌入生成

上一模块:工具与连接器架构下一模块:LLM 抽象与多供应商支持