Datus-Agent 深度分析文档 – 模块 4:存储与知识管理
本文档深入分析 Datus-Agent 的存储与知识管理系统,包括 LanceDB 向量存储、RAG 系统、知识图谱、Subject Tree 层级管理。
目录
- 存储架构概览
- LanceDB 向量存储
- RAG 系统详解
- Subject Tree 层级管理
- 知识存储模块
- 多租户隔离
存储架构概览
整体存储架构
┌─────────────────────────────────────────────────────────────────────────┐│ Datus Storage Layer │├─────────────────────────────────────────────────────────────────────────┤│ ││ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ ││ │ Schema │ │ Semantic │ │ Reference │ ││ │ Metadata │ │ Model │ │ SQL │ ││ └──────────────┘ └──────────────┘ └──────────────┘ ││ │ │ │ ││ ▼ ▼ ▼ ││ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ ││ │ Ext │ │ Document │ │ Feedback │ ││ │ Knowledge │ │ Store │ │ Store │ ││ └──────────────┘ └──────────────┘ └──────────────┘ ││ ││ ┌──────────────────────────────────────────────────────────────────┐ ││ │ Subject Tree Store │ ││ │ (Domain Hierarchy Management) │ ││ └──────────────────────────────────────────────────────────────────┘ ││ │└─────────────────────────────────────────────────────────────────────────┘ │ ▼┌─────────────────────────────────────────────────────────────────────────┐│ LanceDB Vector Backend ││ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ ┌────────────┐ ││ │ Vector │ │ Scalar │ │ FTS │ │ RDB │ ││ │ Index │ │ Index │ │ Index │ │ (SQLite) │ ││ └──────────────┘ └──────────────┘ └──────────────┘ └────────────┘ │└─────────────────────────────────────────────────────────────────────────┘
存储目录结构
datus/storage/├── base.py # 存储基类├── backend_holder.py # 后端连接管理├── embedding_models.py # 嵌入模型├── rag_scope.py # RAG 范围控制├── scoped_filter.py # 范围过滤器│├── schema_metadata/ # Schema 元数据存储│ ├── store.py│ ├── local_init.py│ ├── benchmark_init.py│ └── ...│├── semantic_model/ # 语义模型存储│ ├── store.py│ ├── adapter_init.py│ └── ...│├── reference_sql/ # 参考 SQL 存储│ ├── store.py│ └── ...│├── ext_knowledge/ # 外部知识存储│ ├── store.py│ └── ...│├── metric/ # 指标存储│ ├── store.py│ └── ...│├── document/ # 文档存储│ ├── store.py│ └── ...│├── subject_tree/ # Subject Tree 存储│ ├── store.py│ └── ...│├── feedback/ # 反馈存储├── task/ # 任务存储└── rdb/ # 关系数据存储
存储抽象层次
┌─────────────────────────────────────────────────────────────┐│ Storage Abstraction │├─────────────────────────────────────────────────────────────┤│ ││ Layer 1: StorageBase ││ ┌────────────────────────────────────────────────────────┐ ││ │ class StorageBase: │ ││ │ + db: VectorDatabase │ ││ │ + _get_current_timestamp(): str │ ││ └────────────────────────────────────────────────────────┘ ││ │ ││ ▼ ││ Layer 2: BaseEmbeddingStore ││ ┌────────────────────────────────────────────────────────┐ ││ │ class BaseEmbeddingStore(StorageBase): │ ││ │ + model: EmbeddingModel │ ││ │ + table_name: str │ ││ │ + vector_source_name: str │ ││ │ + search(query, top_n, where): pa.Table │ ││ │ + batch_store(data): None │ ││ │ + truncate(): None │ ││ └────────────────────────────────────────────────────────┘ ││ │ ││ ▼ ││ Layer 3: Domain-Specific Stores ││ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ ││ │ SchemaStorage│ │SemanticModel │ │ ExtKnowledge │ ││ │ │ │ Storage │ │ Store │ ││ └──────────────┘ └──────────────┘ └──────────────┘ ││ │└─────────────────────────────────────────────────────────────┘
LanceDB 向量存储
LanceDB 简介
Datus-Agent 使用 LanceDB 作为向量数据库后端,原因:
- 本地优先:支持本地文件存储,无需额外服务
- 混合搜索:同时支持向量搜索和标量过滤
- 全文搜索:内置 FTS(Full-Text Search)支持
- 列式存储:基于 Apache Arrow/Parquet,高效压缩
- 多版本控制:支持 ACID 事务和时间旅行
存储基类实现
class StorageBase:
    """Base class for all storage components."""

    def __init__(self, db: Optional[VectorDatabase] = None):
        """Initialize the storage base.

        Args:
            db: Optional pre-created VectorDatabase connection. If given it
                is used directly; otherwise the global namespace connection
                is created/reused.
        """
        if db is not None:
            self.db: VectorDatabase = db
        else:
            from datus.storage.backend_holder import create_vector_connection

            self.db = create_vector_connection()

    def _get_current_timestamp(self) -> str:
        """Return the current timestamp as a UTC ISO-8601 string."""
        return datetime.now(timezone.utc).isoformat()


class BaseEmbeddingStore(StorageBase):
    """Base class for all embedding stores."""

    def __init__(
        self,
        table_name: str,
        embedding_model: EmbeddingModel,
        on_duplicate_columns: str = "vector",
        schema: Optional[pa.Schema] = None,
        vector_source_name: str = "definition",
        vector_column_name: str = "vector",
        unique_columns: Optional[List[str]] = None,
        db: Optional[VectorDatabase] = None,
        table_prefix: str = "",
        extra_fields: Optional[List[pa.Field]] = None,
        default_values: Optional[Dict[str, Any]] = None,
        scope_indices: Optional[List[str]] = None,
    ):
        super().__init__(db=db)
        self.model = embedding_model
        self.batch_size = embedding_model.batch_size
        self.table_name = f"{table_prefix}{table_name}" if table_prefix else table_name
        self.vector_source_name = vector_source_name
        self.vector_column_name = vector_column_name
        self.on_duplicate_columns = on_duplicate_columns

        # Append caller-supplied extra fields to the schema.
        if schema is not None and extra_fields:
            schema = pa.schema(list(schema) + extra_fields)

        # Ensure a datasource_id field exists (used by subject-tree based stores).
        if schema is not None:
            existing_names = {f.name for f in schema}
            if "datasource_id" not in existing_names:
                schema = pa.schema(list(schema) + [pa.field("datasource_id", pa.string())])

        self._schema = schema
        self._unique_columns = unique_columns
        self._default_values: Dict[str, Any] = dict(default_values) if default_values else {}
        self._scope_indices: List[str] = list(scope_indices or [])

        # Lazy table initialization: the table is created on first use.
        self._shared = _SharedTableState()
        self._table_lock = Lock()
        self._write_lock = Lock()

    @property
    def table(self) -> Optional[VectorTable]:
        return self._shared.table

    @table.setter
    def table(self, value: Optional[VectorTable]):
        self._shared.table = value

    def _ensure_table_ready(self):
        """Initialize the table once, with double-checked locking."""
        if self._shared.initialized:
            return
        with self._table_lock:
            if self._shared.initialized:
                return
            # Check embedding model availability (helper defined elsewhere).
            self._check_embedding_model_ready()
            # Initialize the table with the embedding function.
            self._ensure_table(self._schema)
            self._shared.initialized = True
            # Automatically create scalar indices for scope fields.
            for col in self._scope_indices:
                self._create_scalar_index(col)

    def _apply_default_values(self, data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Fill missing keys in each row with the configured default values."""
        if not self._default_values:
            return data
        for row in data:
            for k, v in self._default_values.items():
                row.setdefault(k, v)
        return data
向量搜索实现
classBaseEmbeddingStore(StorageBase):def_search_all( self, where: WhereExpr = None, select_fields: Optional[List[str]] = None, limit: Optional[int] = None,) -> pa.Table:"""搜索所有数据(带过滤和字段选择)""" self._ensure_table_ready()if limit: row_limit = limitelse: row_limit = self.table.count_rows(where) if where else self.table.count_rows() result = self.table.search_all( where=where, select_fields=select_fields, limit=row_limit )# 移除向量列(除非明确请求)if self.vector_column_name in result.column_names: result = result.drop([self.vector_column_name])return resultdefsearch( self, query_text: str, top_n: int = 10, where: WhereExpr = None, query_type: str = "vector",) -> pa.Table:"""执行搜索(支持向量和全文)""" self._ensure_table_ready()# 生成查询向量 query_vector = self.model.embed_query(query_text)if query_type == "vector":# 向量相似性搜索 result = self.table.search( query_vector=query_vector, vector_column=self.vector_column_name, top_n=top_n, where=where, )elif query_type == "fts":# 全文搜索 result = self.table.fts_search( query=query_text, top_n=top_n, where=where, )elif query_type == "hybrid":# 混合搜索(向量 + 全文) result = self.table.hybrid_search( query_text=query_text, query_vector=query_vector, top_n=top_n, where=where, )else:raise ValueError(f"Unknown query type: {query_type}")return resultdefcreate_fts_index(self, fields: List[str]):"""创建全文搜索索引""" self._ensure_table_ready() self.table.create_fts_index(fields)def_create_scalar_index(self, column: str):"""创建标量索引""" self._ensure_table_ready() self.table.create_scalar_index(column)
批量存储优化
classBaseEmbeddingStore(StorageBase):defbatch_store(self, data: List[Dict[str, Any]]) -> None:"""批量存储数据(优化性能)""" self._ensure_table_ready()ifnot data:return# 应用默认值 data = self._apply_default_values(data)# 生成向量 texts_to_embed = [item.get(self.vector_source_name, "") for item in data] vectors = self.model.embed_documents(texts_to_embed)# 添加向量到数据for item, vector inzip(data, vectors): item[self.vector_column_name] = vector# 批量插入with self._write_lock: self.table.add(data)defupsert( self, data: List[Dict[str, Any]], on_conflict_columns: List[str] = None,) -> None:"""Upsert 数据(更新或插入)""" self._ensure_table_ready()ifnot data:return# 应用默认值 data = self._apply_default_values(data)# 生成向量 texts_to_embed = [item.get(self.vector_source_name, "") for item in data] vectors = self.model.embed_documents(texts_to_embed)for item, vector inzip(data, vectors): item[self.vector_column_name] = vector# Upsert 操作with self._write_lock:if on_conflict_columns: self.table.upsert(data, on_conflict=on_conflict_columns)elif self._unique_columns: self.table.upsert(data, on_conflict=self._unique_columns)else: self.table.add(data)deftruncate(self) -> None:"""删除整个表并重置状态(管理员操作)"""with self._table_lock: self.db.drop_table(self.table_name, ignore_missing=True) self._shared.table = None self._shared.initialized = Falsedeftruncate_scoped(self) -> None:"""删除当前连接可见的所有行 在 LOGICAL 隔离模式下,后端自动范围化删除到当前 datasource_id 在 PHYSICAL 模式下,这等同于 truncate() """from datus.storage.backend_holder import get_isolation_type self._ensure_table_ready()if get_isolation_type() == "logical":# LOGICAL 模式 - 通过后端的 _ds_where() 范围化删除 self.table.delete(self.table._ds_where())else:# PHYSICAL 模式 - 删除整个表 self.truncate()
RAG 系统详解
RAG 架构
┌─────────────────────────────────────────────────────────────┐│ RAG System │├─────────────────────────────────────────────────────────────┤│ ││ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ ││ │ Query │ │ Retrieve │ │ Generate │ ││ │ Processing │────▶│ (RAG) │────▶│ Response │ ││ └──────────────┘ └──────────────┘ └──────────────┘ ││ │ ││ ┌──────────────────┼──────────────────┐ ││ ▼ ▼ ▼ ││ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ ││ │ Schema │ │ Semantic │ │ Reference │ ││ │ Metadata │ │ Model │ │ SQL │ ││ │ RAG │ │ RAG │ │ RAG │ ││ └──────────────┘ └──────────────┘ └──────────────┘ ││ │ │ │ ││ ▼ ▼ ▼ ││ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ ││ │ Ext │ │ Document │ │ Feedback │ ││ │ Knowledge │ │ Search │ │ Search │ ││ │ RAG │ │ │ │ │ ││ └──────────────┘ └──────────────┘ └──────────────┘ ││ │└─────────────────────────────────────────────────────────────┘
SchemaMetadataRAG
classSchemaWithValueRAG:"""Schema 元数据 RAG 接口"""def__init__(self, agent_config: AgentConfig):from datus.storage.registry import get_storagefrom datus.storage.rag_scope import _build_sub_agent_filter self.agent_config = agent_config self.datasource_id = agent_config.current_namespace or""# 获取存储实例 self.storage: SchemaStorage = get_storage( SchemaStorage, "schema_metadata", namespace=self.datasource_id )# 构建 sub-agent 过滤器 self._sub_agent_filter = Nonedefsearch_similar( self, query_text: str, catalog_name: str = "", database_name: str = "", schema_name: str = "", top_n: int = 5, table_type: TABLE_TYPE = "table", query_type: str = "vector",) -> pa.Table:"""搜索相似的表 schema"""# 构建 WHERE 子句 where = _build_where_clause( catalog_name=catalog_name, database_name=database_name, schema_name=schema_name, table_type=table_type, )return self.storage.do_search_similar( query_text=query_text, top_n=top_n, where=where, query_type=query_type, )defsearch_top_tables_by_every_schema( self, query_text: str, database_name: str = "", catalog_name: str = "", all_schemas: Optional[Set[str]] = None, top_n: int = 20,) -> pa.Table:"""从每个 schema 搜索 top N 表"""if all_schemas isNone: all_schemas = self.storage.search_all_schemas( catalog_name=catalog_name, database_name=database_name ) results = []for schema in all_schemas: result = self.storage.search_similar( query_text=query_text, database_name=database_name, catalog_name=catalog_name, schema_name=schema, top_n=top_n, ) results.append(result)return pa.concat_tables(results, promote_options="default")defget_schema_size(self) -> int:"""获取 schema 数量""" self.storage._ensure_table_ready()return self.storage.table.count_rows()defget_value_size(self) -> int:"""获取值数量(列级别)"""# 计算所有表的总列数 schemas = self.storage._search_all(select_fields=["definition"]) total_columns = 0for schema in schemas["definition"].to_pylist():# 解析 CREATE TABLE 语句,计算列数 total_columns += self._count_columns_in_schema(schema)return total_columns
SemanticModelRAG
class SemanticModelRAG:
    """RAG interface over the semantic model store."""

    def __init__(
        self,
        agent_config: "AgentConfig",
        sub_agent_name: Optional[str] = None,
        datasource_id: Optional[str] = None,
    ):
        from datus.storage.rag_scope import _build_sub_agent_filter
        from datus.storage.registry import get_storage

        self.datasource_id = datasource_id or agent_config.current_namespace or ""
        self.storage: SemanticModelStorage = get_storage(
            SemanticModelStorage, "semantic_model", namespace=self.datasource_id
        )
        # Build the sub-agent scope filter (None when unscoped).
        self._sub_agent_filter = _build_sub_agent_filter(
            agent_config, sub_agent_name, self.storage, "tables"
        )

    def _sub_agent_conditions(self) -> list:
        """Return the sub-agent filter as a (possibly empty) condition list."""
        conditions = []
        if self._sub_agent_filter:
            conditions.append(self._sub_agent_filter)
        return conditions

    def search_objects(
        self,
        query_text: str,
        kinds: Optional[List[str]] = None,
        table_name: Optional[str] = None,
        top_n: int = 10,
    ) -> List[Dict[str, Any]]:
        """Search semantic objects, optionally filtered by kind and table."""
        conditions = []
        if kinds:
            conditions.append(in_("kind", kinds))
        if table_name:
            conditions.append(eq("table_name", table_name))
        where_clause = And(conditions) if conditions else None
        results = self.storage.search(
            query_txt=query_text,
            top_n=top_n,
            where=where_clause,
        ).to_pylist()
        return results

    def get_semantic_model(
        self,
        catalog_name: str = "",
        database_name: str = "",
        schema_name: str = "",
        table_name: str = "",
        select_fields: Optional[List[str]] = None,
    ) -> Optional[Dict[str, Any]]:
        """Rebuild a semantic-model object from the granular store.

        Tries a fully-qualified lookup first, then two fallbacks: a broad
        match ignoring catalog/database/schema, and a lowercased table name.

        Returns:
            The table object with its child columns under "columns",
            or None when no table object is found.
        """
        if not table_name:
            logger.warning("get_semantic_model called without table_name")
            return None

        base_conds = self._sub_agent_conditions()

        # Exact lookup with all provided qualifiers.
        table_conds = [
            eq("kind", "table"),
            eq("table_name", table_name),
        ] + base_conds
        if catalog_name:
            table_conds.append(eq("catalog_name", catalog_name))
        if database_name:
            table_conds.append(eq("database_name", database_name))
        if schema_name:
            table_conds.append(eq("schema_name", schema_name))
        table_objs = self.storage._search_all(where=And(table_conds)).to_pylist()

        # Fallback 1: broad match without the qualifiers.
        if not table_objs and (catalog_name or database_name or schema_name):
            logger.debug(f"Semantic model not found for {table_name} with full filters")
            broad_conds = [
                eq("kind", "table"),
                eq("table_name", table_name),
            ] + base_conds
            table_objs = self.storage._search_all(where=And(broad_conds)).to_pylist()

        # Fallback 2: case-insensitive (lowercased) table name.
        if not table_objs:
            if table_name.lower() != table_name:
                lower_conds = [
                    eq("kind", "table"),
                    eq("table_name", table_name.lower()),
                ] + base_conds
                table_objs = self.storage._search_all(where=And(lower_conds)).to_pylist()

        if not table_objs:
            return None

        # Take the first matching table object.
        semantic_model = table_objs[0]

        # Collect the child column objects of the table.
        children_conds = [
            eq("kind", "column"),
            eq("table_name", semantic_model.get("table_name", table_name)),
        ] + base_conds
        column_objs = self.storage._search_all(where=And(children_conds)).to_pylist()

        # Assemble the complete semantic model.
        semantic_model["columns"] = column_objs
        return semantic_model

    def get_size(self) -> int:
        """Return the number of semantic objects in the store."""
        self.storage._ensure_table_ready()
        return self.storage.table.count_rows()
ExtKnowledgeRAG
classExtKnowledgeRAG:"""外部知识 RAG 接口"""def__init__( self, agent_config: "AgentConfig", sub_agent_name: Optional[str] = None,):from datus.storage.rag_scope import _build_sub_agent_filterfrom datus.storage.registry import get_storage self.datasource_id = agent_config.current_namespace or"" self.store: ExtKnowledgeStore = get_storage( ExtKnowledgeStore, "ext_knowledge", namespace=self.datasource_id )# 构建 sub-agent 过滤器 self._sub_agent_filter = _build_sub_agent_filter( agent_config, sub_agent_name, self.store, "ext_knowledge" )defsearch_knowledge( self, query_text: str, subject_path: Optional[List[str]] = None, top_n: int = 5,) -> List[Dict[str, Any]]:"""搜索知识""" conditions = []if self._sub_agent_filter: conditions.append(self._sub_agent_filter)if subject_path:# 构建 subject 路径过滤 conditions.append(self.store.build_subject_filter(subject_path)) where_clause = And(conditions) if conditions elseNone results = self.store.search( query_txt=query_text, top_n=top_n, where=where_clause, ).to_pylist()return resultsdefget_knowledge_size(self) -> int:"""获取知识条目数量""" self.store._ensure_table_ready()return self.store.table.count_rows()
Subject Tree 层级管理
Subject Tree 架构
┌─────────────────────────────────────────────────────────────┐│ Subject Tree Structure │├─────────────────────────────────────────────────────────────┤│ ││ ┌──────────────────────────────────────────────────────┐ ││ │ Root │ ││ │ │ │ ││ │ ┌───────────┼───────────┐ │ ││ │ ▼ ▼ ▼ │ ││ │ ┌────────┐ ┌────────┐ ┌────────┐ │ ││ │ │Finance │ │ Sales │ │ HR │ │ ││ │ │ │ │ │ │ │ │ │ │ │ ││ │ │ ▼ │ │ ▼ │ │ ▼ │ │ ││ │ │Revenue │ │ Orders │ │ Hiring │ │ ││ │ │ │ │ │ │ │ │ │ │ │ ││ │ │ ▼ │ │ ▼ │ │ ▼ │ │ ││ │ │ Q1/Q2 │ │Returns │ │Onboard │ │ ││ │ └────────┘ └────────┘ └────────┘ │ ││ └──────────────────────────────────────────────────────┘ ││ ││ Subject Path Examples: ││ - ["Finance", "Revenue", "Q1"] ││ - ["Sales", "Orders", "Returns"] ││ - ["HR", "Hiring", "Onboard"] ││ │└─────────────────────────────────────────────────────────────┘
Subject Tree 存储
class BaseSubjectEmbeddingStore(BaseEmbeddingStore):
    """Embedding store base class with an attached subject tree."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Attach a subject-tree store sharing this store's connection.
        from datus.storage.subject_tree.store import SubjectTreeStore

        # NOTE(review): assumes the base class exposes self.table_prefix;
        # confirm BaseEmbeddingStore actually stores that attribute.
        self.subject_tree = SubjectTreeStore(
            db=self.db,
            table_prefix=self.table_prefix,
        )

    def create_subject_index(self):
        """Create the subject-tree indices."""
        self.subject_tree.create_indices()

    def get_subject_path_by_id(self, subject_node_id: str) -> List[str]:
        """Return the subject path for a node id."""
        return self.subject_tree.get_path_by_node_id(subject_node_id)

    def get_node_id_by_path(self, subject_path: List[str]) -> Optional[str]:
        """Return the node id for a subject path."""
        return self.subject_tree.get_node_id_by_path(subject_path)


class SubjectTreeStore(StorageBase):
    """Persistent store for the subject (domain) hierarchy."""

    def __init__(self, db: Optional[VectorDatabase] = None, table_prefix: str = ""):
        super().__init__(db=db)
        self.table_name = f"{table_prefix}subject_tree"
        # Table schema for tree nodes.
        self._schema = pa.schema([
            pa.field("node_id", pa.string()),         # unique node id
            pa.field("parent_node_id", pa.string()),  # parent node id
            pa.field("name", pa.string()),            # node name
            pa.field("path", pa.string()),            # full path (JSON array)
            pa.field("depth", pa.int32()),            # depth in the tree
            pa.field("created_at", pa.string()),      # creation timestamp
        ])
        self._shared = _SharedTableState()
        self._table_lock = Lock()

    def create_indices(self):
        """Create scalar and FTS indices for the tree table."""
        self._ensure_table_ready()
        # Scalar indices for the lookup columns.
        self._create_scalar_index("node_id")
        self._create_scalar_index("parent_node_id")
        self._create_scalar_index("depth")
        # FTS index on the node name.
        self.create_fts_index(["name"])

    def find_or_create_path(self, subject_path: List[str]) -> str:
        """Find an existing subject path or create it level by level.

        Returns:
            The node id of the deepest path element.

        Raises:
            ValueError: if subject_path is empty.
        """
        if not subject_path:
            raise ValueError("subject_path cannot be empty")

        # Fast path: the full path already exists.
        existing = self._find_by_path(subject_path)
        if existing:
            return existing["node_id"]

        # Create any missing levels from the root down.
        parent_node_id = None
        for i, name in enumerate(subject_path):
            current_path = subject_path[:i + 1]
            existing = self._find_by_path(current_path)
            if existing:
                parent_node_id = existing["node_id"]
                continue
            node_id = self._create_node(
                name=name,
                parent_node_id=parent_node_id,
                path=current_path,
                depth=i + 1,
            )
            parent_node_id = node_id
        return parent_node_id

    def _find_by_path(self, subject_path: List[str]) -> Optional[Dict]:
        """Look up a node by its full path (stored as a JSON array)."""
        self._ensure_table_ready()
        path_json = json.dumps(subject_path)
        result = self.table.search_all(
            where=eq("path", path_json),
            select_fields=["node_id", "parent_node_id", "name", "path", "depth"],
            limit=1,
        )
        if result.num_rows > 0:
            return result.to_pylist()[0]
        return None

    def _create_node(
        self,
        name: str,
        parent_node_id: Optional[str],
        path: List[str],
        depth: int,
    ) -> str:
        """Insert a new tree node and return its generated id."""
        import uuid

        node_id = str(uuid.uuid4())
        data = {
            "node_id": node_id,
            "parent_node_id": parent_node_id,
            "name": name,
            "path": json.dumps(path),
            "depth": depth,
            "created_at": self._get_current_timestamp(),
        }
        self.table.add([data])
        return node_id

    def get_path_by_node_id(self, node_id: str) -> List[str]:
        """Return the full path for a node id, or [] when unknown."""
        self._ensure_table_ready()
        result = self.table.search_all(
            where=eq("node_id", node_id),
            select_fields=["path"],
            limit=1,
        )
        if result.num_rows > 0:
            path_json = result["path"][0].as_py()
            return json.loads(path_json)
        return []

    def get_children(self, parent_node_id: Optional[str] = None) -> List[Dict]:
        """Return the children of a node.

        NOTE(review): with parent_node_id=None the filter is dropped and ALL
        nodes are returned, not only root nodes — confirm this is intended.
        """
        self._ensure_table_ready()
        where = eq("parent_node_id", parent_node_id) if parent_node_id else None
        result = self.table.search_all(
            where=where,
            select_fields=["node_id", "name", "path", "depth"],
        )
        return result.to_pylist()

    def get_tree(self, root_node_id: Optional[str] = None) -> Dict:
        """Recursively build the nested {name: subtree} dict below a node."""
        children = self.get_children(root_node_id)
        tree = {}
        for child in children:
            node_id = child["node_id"]
            child_tree = self.get_tree(node_id)
            tree[child["name"]] = child_tree or {}
        return tree
ScopedFilterBuilder
classScopedFilterBuilder:"""构建范围过滤器的工具类""" @staticmethoddefbuild_table_filter( tables: List[str], dialect: str = "",) -> Optional[Node]:"""构建表过滤器 Args: tables: 表名列表(支持 "schema.table" 格式) dialect: 数据库方言 Returns: 过滤条件节点 """ifnot tables:returnNone# 解析表名 parsed_tables = []for table in tables:if"."in table: parts = table.split(".")iflen(parts) == 2: parsed_tables.append({"schema_name": parts[0],"table_name": parts[1], })eliflen(parts) == 3: parsed_tables.append({"database_name": parts[0],"schema_name": parts[1],"table_name": parts[2], })else: parsed_tables.append({"table_name": table})# 构建 OR 条件 conditions = []for parsed in parsed_tables: cond_parts = []if"schema_name"in parsed: cond_parts.append(eq("schema_name", parsed["schema_name"]))if"table_name"in parsed: cond_parts.append(eq("table_name", parsed["table_name"]))if"database_name"in parsed: cond_parts.append(eq("database_name", parsed["database_name"])) conditions.append(And(cond_parts))return Or(conditions) if conditions elseNone @staticmethoddefbuild_subject_filter( subject_paths: List[List[str]], subject_tree: SubjectTreeStore,) -> Optional[Node]:"""构建 subject 路径过滤器 Args: subject_paths: subject 路径列表 subject_tree: Subject Tree 存储 Returns: 过滤条件节点 """ifnot subject_paths:returnNone# 获取所有匹配的节点 ID node_ids = []for path in subject_paths: node_id = subject_tree.get_node_id_by_path(path)if node_id: node_ids.append(node_id)ifnot node_ids:returnNone# 构建 IN 条件return in_("subject_node_id", node_ids)
知识存储模块
SchemaMetadata 存储
classSchemaStorage(BaseMetadataStorage):"""存储和管理 schema lineage 数据"""def__init__(self, embedding_model: EmbeddingModel, **kwargs):super().__init__( table_name="schema_metadata", embedding_model=embedding_model, vector_source_name="definition", **kwargs, )def_extract_table_name(self, schema_text: str) -> str:"""从 CREATE TABLE 语句提取表名""" words = schema_text.split()iflen(words) < 3or words[0].upper() != "CREATE"or words[1].upper() != "TABLE":return"" idx = 2# 跳过 IF NOT EXISTSif idx + 2 < len(words) and words[idx].upper() == "IF"and words[idx + 1].upper() == "NOT": idx += 3if idx >= len(words):return"" name = words[idx]# 处理 "mytable(id INT)" 格式 paren_pos = name.find("(")if paren_pos > 0: name = name[:paren_pos]return name.strip("()").strip()defsearch_all_schemas( self, database_name: str = "", catalog_name: str = "") -> Set[str]:"""搜索所有 schema 名称""" search_result = self._search_all( where=_build_where_clause( database_name=database_name, catalog_name=catalog_name ), select_fields=["schema_name"], )returnset(search_result["schema_name"].to_pylist())defsearch_top_tables_by_every_schema( self, query_text: str, database_name: str = "", catalog_name: str = "", all_schemas: Optional[Set[str]] = None, top_n: int = 20,) -> pa.Table:"""从每个 schema 搜索 top N 表"""if all_schemas isNone: all_schemas = self.search_all_schemas( catalog_name=catalog_name, database_name=database_name ) results = []for schema in all_schemas: result = self.search_similar( query_text=query_text, database_name=database_name, catalog_name=catalog_name, schema_name=schema, top_n=top_n, ) results.append(result)return pa.concat_tables(results, promote_options="default")
ExtKnowledge 存储
classExtKnowledgeStore(BaseSubjectEmbeddingStore):"""存储和管理外部业务知识"""def__init__(self, embedding_model: EmbeddingModel, **kwargs):super().__init__( table_name="ext_knowledge", embedding_model=embedding_model, schema=pa.schema( base_schema_columns() + [ pa.field("id", pa.string()), pa.field("search_text", pa.string()), pa.field("explanation", pa.string()), pa.field("vector", pa.list_(pa.float32(), list_size=embedding_model.dim_size)), ] ), vector_source_name="search_text", unique_columns=["id"], **kwargs, )defcreate_indices(self):"""创建索引"""# 使用基类方法创建 subject 索引 self.create_subject_index()# 为知识特定字段创建 FTS 索引 self._ensure_table_ready() self.create_fts_index(["search_text", "explanation"])defbatch_store_knowledge(self, knowledge_entries: List[Dict]) -> None:"""批量存储知识条目"""ifnot knowledge_entries:return# 验证和过滤条目,添加 id 字段 valid_entries = []for entry in knowledge_entries: subject_path = entry.get("subject_path", []) name = entry.get("name") search_text = entry.get("search_text", "") explanation = entry.get("explanation", "")# 验证必填字段ifnotall([subject_path, name, search_text, explanation]): logger.warning(f"Skipping entry with missing required fields: {entry}")continue# 从 subject_path + name 生成 id entry_with_id = entry.copy() entry_with_id["id"] = gen_subject_item_id(subject_path, name) valid_entries.append(entry_with_id)# 使用基类的 batch_store 方法 self.batch_store(valid_entries)defstore_knowledge( self, subject_path: List[str], name: str, search_text: str, explanation: str,):"""存储单个知识条目"""# 查找或创建 subject tree 路径 subject_node_id = self.subject_tree.find_or_create_path(subject_path)# 从 subject_path + name 生成 id knowledge_id = gen_subject_item_id(subject_path, name) data = [ {"id": knowledge_id,"subject_node_id": subject_node_id,"name": name,"search_text": search_text,"explanation": explanation,"created_at": self._get_current_timestamp(), } ] self.store_batch(data)
多租户隔离
隔离模式
classIsolationType(str, Enum):"""多租户隔离类型""" PHYSICAL = "physical"# 物理隔离:每个租户独立数据库 LOGICAL = "logical"# 逻辑隔离:共享数据库,通过 datasource_id 过滤defget_isolation_type() -> str:"""获取当前隔离类型"""# 从配置或环境变量读取return os.getenv("DATUS_ISOLATION_TYPE", IsolationType.PHYSICAL)
范围化存储
classBaseEmbeddingStore(StorageBase):deftruncate_scoped(self) -> None:"""删除当前连接可见的所有行 LOGICAL 隔离模式:后端自动范围化删除到当前 datasource_id PHYSICAL 隔离模式:删除整个表 """from datus.storage.backend_holder import get_isolation_type self._ensure_table_ready()if get_isolation_type() == "logical":# LOGICAL 模式 - 通过后端的 _ds_where() 范围化删除 self.table.delete(self.table._ds_where())else:# PHYSICAL 模式 - 删除整个表 self.truncate()def_search_all( self, where: WhereExpr = None, select_fields: Optional[List[str]] = None, limit: Optional[int] = None,) -> pa.Table:"""搜索所有数据(自动应用范围过滤)""" self._ensure_table_ready()# 在 LOGICAL 模式下自动应用 datasource_id 过滤if get_isolation_type() == "logical": ds_where = self.table._ds_where()if where: where = And([where, ds_where])else: where = ds_where# ... 继续搜索逻辑
后端连接管理
# backend_holder.py_vector_db: Optional[VectorDatabase] = None_isolation_type: Optional[str] = Nonedefcreate_vector_connection() -> VectorDatabase:"""创建向量数据库连接"""global _vector_db, _isolation_typeif _vector_db isnotNone:return _vector_db# 从配置读取隔离类型 _isolation_type = os.getenv("DATUS_ISOLATION_TYPE", IsolationType.PHYSICAL)# 创建 LanceDB 连接from lancedb import connect db_path = get_lancedb_path() _vector_db = connect(db_path)return _vector_dbdefget_isolation_type() -> str:"""获取隔离类型"""if _isolation_type isNone: _isolation_type = os.getenv("DATUS_ISOLATION_TYPE", IsolationType.PHYSICAL)return _isolation_type
Sub-Agent 范围过滤
# rag_scope.pydef_build_sub_agent_filter( agent_config: "AgentConfig", sub_agent_name: Optional[str], storage: "BaseEmbeddingStore", check_scope_attr: str,) -> Optional[Node]:"""根据 sub-agent 的 scoped context 构建范围过滤器"""ifnot sub_agent_name:returnNone# 获取 sub-agent 配置 raw_config = agent_config.sub_agent_config(sub_agent_name)ifnot raw_config:returnNone sub_agent_config = SubAgentConfig.model_validate(raw_config)# 检查是否有范围化的上下文ifnot sub_agent_config.has_scoped_context_by(check_scope_attr):returnNone# 获取范围值 scope_value = getattr(sub_agent_config.scoped_context, check_scope_attr, None)ifnot scope_value:returnNone# 根据类型构建过滤器if check_scope_attr == "tables": dialect = getattr(agent_config, "db_type", "")return ScopedFilterBuilder.build_table_filter(scope_value, dialect)elif check_scope_attr in ("metrics", "sqls", "ext_knowledge"): subject_tree = getattr(storage, "subject_tree", None)if subject_tree isNone:raise DatusException( code=ErrorCode.COMMON_VALIDATION_FAILED, message=(f"Cannot build scope filter for sub-agent '{sub_agent_name}' "f"(scope attr='{check_scope_attr}'): storage has no subject_tree." ), )return ScopedFilterBuilder.build_subject_filter(scope_value, subject_tree)returnNone
总结
Datus-Agent 的存储与知识管理系统体现了以下设计原则:
- 统一抽象:BaseEmbeddingStore 提供通用接口
- 延迟初始化:使用 _SharedTableState 实现单例模式
- 范围化访问:支持物理和逻辑多租户隔离
- Subject Tree:领域层级管理,支持细粒度访问控制
- 混合搜索:向量搜索 + 全文搜索 + 标量过滤
- 批量优化:批量存储和嵌入生成
上一模块:工具与连接器架构下一模块:LLM 抽象与多供应商支持
夜雨聆风