接入图谱知识库

graph_memory
Bifang 2026-06-24 15:27:29 +08:00
parent 77e0732648
commit 277a0f9626
32 changed files with 3185 additions and 1667 deletions

View File

@ -143,23 +143,9 @@ python .\main_cli.py --offline
单条离线导入测试:
```powershell
python .\main_cli.py --offline --once "导入 D:\github_project\my_code\meeting_agent\examples\huiyi.txt 知识库 合川分公司"
python .\main_cli.py --offline --once "导入 D:\github_project\my_code\meeting_agent\examples\huiyi.txt"
```
## 知识库选择约定
当前只是演示 demo不做团队隔离。`data/` 下每个子文件夹视作一个独立知识库,例如:
```text
data/
├─ 合川分公司/
└─ 另一个知识库/
```
未来 Web 界面可以列出这些文件夹,让用户选择一个知识库后,把文件夹名作为 `knowledge_base_id` 传给工具进行针对性提问。`team_id` 只是旧兼容参数,后续推荐使用 `knowledge_base_id`
在当前这版 Web 里,系统会把每个知识库自动映射成一个可进入的会话入口;选中对应会话后提问,就会优先基于该知识库回答。
## Agent 工具
- `current_time`:获取当前时间。
@ -170,11 +156,10 @@ data/
- `execute_shell`:执行 shell 命令做环境检查或调试。
- `run_python`:执行简短 Python 代码做快速处理。
- `tool_trace_query`:按需查询本地保存的工具调用轨迹。
- `import_meeting_transcript`:导入会议原文,交给会议整理子 Agent 做项目化整理,并更新指定知识库的 Markdown 台账。
- `query_knowledge`:查询指定知识库周会台账。
- `list_knowledge_bases`:列出 `data/` 下已有知识库。
- `store_meeting_memory`:把会议原文写入长期会议记忆,执行抽取、归档、状态合并与图谱索引。
- `query_meeting_memory`:查询长期会议记忆中的会议上下文、实体关系、行动项和指标信息。
LLM Agent 会在对话中自动选择这些工具。会议整理本身由 `prompt/zh/meeting_digest.yaml` 驱动的子 Agent 完成,主 Agent 只负责调工具和查询台账
LLM Agent 会在对话中自动选择这些工具。当前知识功能已经切换到图谱化的 `meeting_memory` 管线,不再维护旧版 Markdown 台账知识库
## 多轮会话机制
@ -185,7 +170,7 @@ LLM Agent 会在对话中自动选择这些工具。会议整理本身由 `promp
- 工具调用的参数、思考、结果会单独保存到本地 `tool_trace.json`
- 默认不会把大量工具过程长期塞回主上下文;需要时由 Agent 调用 `tool_trace_query` 再查。
CLI 默认使用会话 `cli_default`。Web 侧每个聊天或知识库会映射到独立的本地会话目录。
CLI 默认使用会话 `cli_default`。Web 侧每个聊天会映射到独立的本地会话目录。
## 输出数据
@ -193,10 +178,8 @@ CLI 默认使用会话 `cli_default`。Web 侧每个聊天或知识库会映射
```text
data/
└─ 合川分公司/ # 一个知识库示例
├─ state.json
├─ team_ledger.md
└─ meetings/
├─ meeting_state.json
└─ raw/
```
`team_ledger.md` 是面向人和 Agent 共同读取的主文档;`state.json` 和 `meetings/*.json` 是可复用的结构化中间结果。

View File

@ -26,8 +26,8 @@ class TerminalRenderer:
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(description="Meeting knowledge agent CLI.")
parser.add_argument("--data-dir", default="data", help="台账和结构化数据保存目录。")
parser = argparse.ArgumentParser(description="Meeting memory agent CLI.")
parser.add_argument("--data-dir", default="data", help="会议记忆与原文归档保存目录。")
parser.add_argument("--once", default="", help="只执行一条消息,适合测试。")
parser.add_argument("--offline", action="store_true", help="只用于本地演示:强制使用离线规则 provider不调用大模型。")
parser.add_argument("--list-models", action="store_true", help="列出当前 OpenAI-compatible 服务暴露的模型名。")
@ -66,9 +66,9 @@ def main() -> None:
renderer.render_event(event)
return
print("Meeting Knowledge Interactive Agent")
print(r"示例:导入 D:\github_project\my_code\meeting_agent\examples\huiyi.txt 到知识库 合川分公司")
print("示例:专线护航还有哪些待办?知识库 合川分公司")
print("Meeting Memory Interactive Agent")
print(r"示例:导入 D:\github_project\my_code\meeting_agent\examples\huiyi.txt")
print("示例:最近一次会议提到了哪些待办?")
print("输入 exit / quit 退出。")
while True:
try:

View File

@ -1,5 +1,3 @@
"""Meeting knowledge demo package."""
from meeting_memory.config import config
from .service import MeetingKnowledgeService
__all__ = ["MeetingKnowledgeService"]
__all__ = ["config"]

View File

@ -0,0 +1,46 @@
import os
from dotenv import load_dotenv
from pydantic import BaseModel, Field
load_dotenv()
PACKAGE_ROOT = os.path.dirname(os.path.abspath(__file__))
PROJECT_ROOT = os.path.dirname(PACKAGE_ROOT)
class LLMConfig(BaseModel):
api_key: str = Field(default=os.getenv("LLM_API_KEY", ""))
base_url: str = Field(default=os.getenv("LLM_BASE_URL", "https://api.deepseek.com/v1"))
model: str = Field(default=os.getenv("LLM_MODEL", "deepseek-chat"))
max_tokens: int = Field(default=64000)
temperature: float = Field(default=0.95)
class EmbeddingConfig(BaseModel):
api_key: str = Field(default=os.getenv("EMBEDDING_API_KEY", ""))
api_base: str = Field(default=os.getenv("EMBEDDING_BASE_URL", "https://api.openai.com/v1"))
model: str = Field(default=os.getenv("EMBEDDING_MODEL", "text-embedding-3-small"))
class StorageConfig(BaseModel):
data_dir: str = Field(default=os.path.join(PROJECT_ROOT, "data"))
raw_dir: str = Field(default=os.path.join(PROJECT_ROOT, "data", "raw"))
class Neo4jConfig(BaseModel):
enabled: bool = Field(default=os.getenv("NEO4J_ENABLED", "false").lower() == "true")
uri: str = Field(default=os.getenv("NEO4J_URI", "bolt://localhost:7687"))
user: str = Field(default=os.getenv("NEO4J_USER", "neo4j"))
password: str = Field(default=os.getenv("NEO4J_PASSWORD", ""))
database: str = Field(default=os.getenv("NEO4J_DATABASE", "neo4j"))
class ProjectConfig(BaseModel):
llm: LLMConfig = Field(default_factory=LLMConfig)
embedding: EmbeddingConfig = Field(default_factory=EmbeddingConfig)
storage: StorageConfig = Field(default_factory=StorageConfig)
neo4j: Neo4jConfig = Field(default_factory=Neo4jConfig)
state_path: str = Field(default=os.path.join(PROJECT_ROOT, "data", "meeting_state.json"))
config = ProjectConfig()

View File

@ -0,0 +1,143 @@
import hashlib
import logging
import math
import re
from typing import Any, Dict, List, Set, Tuple
logger = logging.getLogger(__name__)
_MINHASH_SEEDS = [
5483, 7361, 8919, 1103, 4597, 6827, 2341, 9973,
3851, 6217, 7589, 1447, 5021, 8363, 2711, 9539,
1973, 6689, 4157, 8831, 3209, 7547, 1627, 5981,
4363, 8707, 2857, 7193, 1543, 6299, 4729, 9067,
]
_BAND_SIZE = 4
_NUM_BANDS = len(_MINHASH_SEEDS) // _BAND_SIZE
_FUZZY_JACCARD_THRESHOLD = 0.9
def normalize_name(name: str) -> str:
return re.sub(r'\s+', ' ', name.strip().lower())
def shannon_entropy(text: str) -> float:
if not text:
return 0.0
lowered = text.lower()
freq: Dict[str, int] = {}
for c in lowered:
freq[c] = freq.get(c, 0) + 1
entropy = 0.0
for count in freq.values():
p = count / len(lowered)
entropy -= p * math.log2(p)
return entropy
def _shingle(s: str, k: int = 3) -> Set[str]:
return {s[i:i + k] for i in range(len(s) - k + 1)}
def _minhash_signature(text: str) -> List[int]:
shingles = _shingle(text)
sig = []
for seed in _MINHASH_SEEDS:
min_hash = min(
int(hashlib.md5(f'{seed}{s}'.encode()).hexdigest()[:8], 16)
for s in shingles
)
sig.append(min_hash)
return sig
def _lsh_bands(signature: List[int]) -> Set[Tuple[int, int]]:
bands = set()
for i in range(_NUM_BANDS):
start = i * _BAND_SIZE
band_hash = hash(tuple(signature[start:start + _BAND_SIZE]))
bands.add((i, band_hash))
return bands
def _jaccard_similarity(sig_a: List[int], sig_b: List[int]) -> float:
if not sig_a or not sig_b:
return 0.0
intersection = sum(1 for a, b in zip(sig_a, sig_b) if a == b)
return intersection / len(sig_a)
def resolve_with_similarity(
extracted: List[Dict[str, Any]],
existing: List[Dict[str, Any]],
) -> Tuple[List[int], List[int]]:
resolved_indices: List[int] = []
unresolved_indices: List[int] = []
existing_norm: Dict[str, Dict] = {}
for c in existing:
norm = normalize_name(c.get('name', ''))
if norm:
existing_norm[norm] = c
for c in existing:
c_name = c.get('name', '')
if c_name:
entropy = shannon_entropy(c_name)
tokens = normalize_name(c_name).split()
if len(c_name) >= 6 and len(tokens) >= 2 and entropy >= 1.5:
c['_minhash'] = _minhash_signature(c_name)
else:
c['_minhash'] = None
for idx, entity in enumerate(extracted):
name = entity.get('name', '')
if not name:
unresolved_indices.append(idx)
continue
norm = normalize_name(name)
if norm in existing_norm:
match = existing_norm[norm]
entity['_resolved_to'] = match
entity['name'] = match.get('name', entity['name'])
resolved_indices.append(idx)
continue
tokens = norm.split()
entropy = shannon_entropy(name)
can_fuzzy = len(name) >= 6 and len(tokens) >= 2 and entropy >= 1.5
if can_fuzzy:
entity_sig = _minhash_signature(name)
entity_bands = _lsh_bands(entity_sig)
best_match = None
best_score = 0.0
for c in existing:
c_sig = c.get('_minhash')
if c_sig is None:
continue
c_bands = _lsh_bands(c_sig)
if not entity_bands & c_bands:
continue
score = _jaccard_similarity(entity_sig, c_sig)
if score >= _FUZZY_JACCARD_THRESHOLD and score > best_score:
best_score = score
best_match = c
if best_match is not None:
entity['_resolved_to'] = best_match
entity['name'] = best_match.get('name', entity['name'])
entity['_match_score'] = best_score
resolved_indices.append(idx)
continue
unresolved_indices.append(idx)
for c in existing:
c.pop('_minhash', None)
return resolved_indices, unresolved_indices

View File

@ -1,260 +1,633 @@
from __future__ import annotations
import hashlib
import json
import logging
import re
from pathlib import Path
import sys
from enum import Enum
from typing import Any, List, Optional
from .models import ExtractedMeeting
from openai import OpenAI
from pydantic import BaseModel, Field
from meeting_memory.config import config
from meeting_memory.prompts import extract_entities as prompt_extract_entities
from meeting_memory.prompts import extract_facts as prompt_extract_facts
from meeting_memory.prompts import resolve_entities as prompt_dedupe_nodes
from meeting_memory.prompts import resolve_facts as prompt_dedupe_edges
from meeting_memory.prompts import summarize_entity as prompt_summarize
logger = logging.getLogger(__name__)
client = OpenAI(
api_key=config.llm.api_key or None,
base_url=config.llm.base_url if config.llm.base_url else None,
)
TIME_RE = re.compile(r"^\d{1,2}:\d{2}$")
SPEAKER_RE = re.compile(r"^未知说话人\d+")
class EntityType(str, Enum):
DEPARTMENT = 'Department'
PROJECT = 'Project'
METRIC = 'Metric'
PERSON = 'Person'
SYSTEM = 'System'
DOCUMENT = 'Document'
PARTICIPANT = 'participant'
UNKNOWN = 'Unknown'
KEYWORDS = {
"action": ("", "需要", "", "争取", "预计", "本周", "下周", "今天", "月底", "会后", "汇报", "完成", "制定", "上报"),
"risk": ("问题", "风险", "未达标", "靠后", "恶化", "影响", "没有", "没达", "退单", "故障", "不满"),
"done": ("已完成", "完成了", "已经完成", "达标", "解决了", "已提交", "已汇报", "已报备"),
# Normalization map: legacy LLM output → canonical type
_ENTITY_TYPE_ALIASES = {
'组织': 'Department',
'organization': 'Department',
'部门': 'Department',
'指标': 'Metric',
'kpi': 'Metric',
'项目': 'Project',
}
METRIC_PATTERNS = (
re.compile(r"(?P<name>弱光).*?(?P<value>\d+(?:\.\d+)?)"),
re.compile(r"(?P<name>转化率).*?(?P<value>\d+(?:\.\d+)?)"),
re.compile(r"(?P<name>退单率).*?(?P<value>\d+(?:\.\d+)?)"),
re.compile(r"(?P<name>蓝单).*?(?P<value>\d+(?:\.\d+)?)"),
re.compile(r"(?P<name>满意度).*?(?P<value>\d+(?:\.\d+)?)"),
re.compile(r"(?P<name>KPI).*?(?P<value>\d+(?:\.\d+)?)"),
)
TARGET_PATTERNS = (
re.compile(r"目标(?P<target>\d+(?:\.\d+)?%?)"),
re.compile(r"达到(?P<target>\d+(?:\.\d+)?%?)"),
re.compile(r"低于(?P<target>\d+(?:\.\d+)?%?)"),
re.compile(r"小于(?P<target>\d+(?:\.\d+)?%?)"),
)
def _canonical_entity_type(raw: str) -> str:
normalized = raw.strip()
if normalized in _ENTITY_TYPE_ALIASES:
return _ENTITY_TYPE_ALIASES[normalized]
for member in EntityType:
if member.value.lower() == normalized.lower():
return member.value
return EntityType.UNKNOWN.value
def extract_meeting(text: str, *, source_path: str | Path, meeting_date: str) -> ExtractedMeeting:
source = str(Path(source_path).resolve())
meeting_id = _meeting_id(source, text)
utterances = _parse_utterances(text)
meeting = ExtractedMeeting(meeting_id=meeting_id, meeting_date=meeting_date, source_path=source)
for item in utterances:
content = _clean_sentence(item["content"])
if len(content) < 8:
continue
evidence = {"time": item["time"], "speaker": item["speaker"], "text": content}
meeting.evidence.append(evidence)
if _contains(content, KEYWORDS["done"]):
meeting.completed_items.append(_item(content, evidence, status="已完成/已处理"))
if _contains(content, KEYWORDS["risk"]):
meeting.risks.append(_risk(content, evidence))
if _contains(content, KEYWORDS["action"]):
meeting.action_items.append(_action(content, evidence))
metric = _metric(content, evidence)
if metric:
meeting.metrics.append(metric)
meeting.summary = _make_summary(meeting)
meeting.followups = _make_followups(meeting)
meeting.metrics = _dedupe_dicts(meeting.metrics, ("name", "value", "evidence"))
meeting.action_items = _dedupe_dicts(meeting.action_items, ("title", "evidence"))
meeting.risks = _dedupe_dicts(meeting.risks, ("title", "evidence"))
meeting.completed_items = _dedupe_dicts(meeting.completed_items, ("title", "evidence"))
return meeting
def _neo4j_labels(entity_type: str) -> list[str]:
canonical = _canonical_entity_type(entity_type)
labels = ['Entity']
if canonical != EntityType.UNKNOWN.value:
labels.append(canonical)
return labels
def _parse_utterances(text: str) -> list[dict[str, str]]:
lines = [line.strip() for line in text.splitlines() if line.strip()]
utterances: list[dict[str, str]] = []
speaker = ""
time = ""
pending: list[str] = []
def flush() -> None:
nonlocal pending
if pending:
utterances.append({"speaker": speaker, "time": time, "content": " ".join(pending)})
pending = []
for line in lines:
if SPEAKER_RE.match(line):
flush()
speaker = line
time = ""
continue
if TIME_RE.match(line):
flush()
time = line
continue
pending.append(line)
flush()
return utterances
class Entity(BaseModel):
name: str
entity_type: str = EntityType.UNKNOWN.value
description: str = ''
def _clean_sentence(text: str) -> str:
text = re.sub(r"\s+", " ", text).strip()
text = re.sub(r"[啊哈嗯呃]{2,}", "", text)
return text
class Relation(BaseModel):
source_entity_name: str
target_entity_name: str
relation_type: str
fact: str = ''
valid_at: str = ''
invalid_at: str = ''
evidence: str = ''
qualifiers: List[str] = Field(default_factory=list)
confidence: float = 0.0
def _contains(text: str, words: tuple[str, ...]) -> bool:
return any(word in text for word in words)
class ActionItem(BaseModel):
task: str
assignee: str = ''
deadline: str = ''
status: str = '待办'
priority: str = ''
def _item(content: str, evidence: dict[str, str], *, status: str) -> dict[str, str]:
return {
"title": _short_title(content),
"status": status,
"owner": _guess_owner(content),
"deadline": _guess_deadline(content),
"evidence": _evidence_ref(evidence),
class Decision(BaseModel):
content: str
proposer: str = ''
status: str = '已决'
class MeetingMetric(BaseModel):
metric_name: str
value: str
target: str = ''
owner: str = ''
trend: str = ''
unit: str = ''
class DepartmentInfo(BaseModel):
name: str
description: str = ''
projects: List[str] = Field(default_factory=list)
class MeetingExtraction(BaseModel):
title: str
date: str = ''
participants: List[str] = Field(default_factory=list)
agenda: List[str] = Field(default_factory=list)
entities: List[Entity] = Field(default_factory=list)
relations: List[Relation] = Field(default_factory=list)
action_items: List[ActionItem] = Field(default_factory=list)
decisions: List[Decision] = Field(default_factory=list)
metrics: List[MeetingMetric] = Field(default_factory=list)
departments: List[DepartmentInfo] = Field(default_factory=list)
summary: str = ''
def _call_llm(
messages: list[dict],
response_model: type | None = None,
stream: bool = False,
max_tokens: int | None = None,
) -> Any:
kwargs = {
'model': config.llm.model,
'messages': messages,
'max_tokens': max_tokens or config.llm.max_tokens,
'temperature': config.llm.temperature,
}
if response_model is not None:
kwargs['response_format'] = {'type': 'json_object'}
if stream:
kwargs['stream'] = True
if not stream:
response = client.chat.completions.create(**kwargs)
content = response.choices[0].message.content
if content is None:
raise ValueError('LLM returned empty response')
return content
def _action(content: str, evidence: dict[str, str]) -> dict[str, str]:
return {
"title": _short_title(content),
"owner": _guess_owner(content),
"deadline": _guess_deadline(content),
"next_step": _short_title(content, limit=80),
"status": "待跟进",
"evidence": _evidence_ref(evidence),
}
def _risk(content: str, evidence: dict[str, str]) -> dict[str, str]:
return {
"title": _short_title(content),
"impact": _guess_impact(content),
"current_handling": _short_title(content, limit=90),
"evidence": _evidence_ref(evidence),
}
def _metric(content: str, evidence: dict[str, str]) -> dict[str, str] | None:
for pattern in METRIC_PATTERNS:
match = pattern.search(content)
if not match:
kwargs['stream'] = True
response = client.chat.completions.create(**kwargs)
chunks: List[str] = []
print('\n[LLM] 开始流式输出:')
for event in response:
if not event.choices:
continue
name = match.group("name")
value = match.group("value")
target = ""
for target_pattern in TARGET_PATTERNS:
target_match = target_pattern.search(content)
if target_match:
target = target_match.group("target")
break
status = "待判断"
if "达标" in content and "未达标" not in content:
status = "达标"
elif "没达" in content or "未达标" in content or "靠后" in content:
status = "未达标/靠后"
elif "向好" in content or "提升" in content or "改善" in content:
status = "改善中"
return {
"name": name,
"value": value,
"target": target,
"status": status,
"trend": _guess_trend(content),
"evidence": _evidence_ref(evidence),
}
return None
delta = event.choices[0].delta.content
if not delta:
continue
chunks.append(delta)
sys.stdout.write(delta)
sys.stdout.flush()
print('\n[LLM] 输出结束')
return ''.join(chunks)
def _make_summary(meeting: ExtractedMeeting) -> list[str]:
parts = []
if meeting.metrics:
parts.append(f"抽取到 {len(meeting.metrics)} 条指标状态。")
if meeting.action_items:
parts.append(f"抽取到 {len(meeting.action_items)} 条待跟进事项。")
if meeting.risks:
parts.append(f"抽取到 {len(meeting.risks)} 条问题/风险线索。")
if meeting.completed_items:
parts.append(f"抽取到 {len(meeting.completed_items)} 条已完成或已处理事项。")
return parts or ["本次会议已导入,未抽取到明显结构化事项。"]
def _make_followups(meeting: ExtractedMeeting) -> list[dict[str, str]]:
followups = []
for item in meeting.action_items[:20]:
if item["deadline"] or any(word in item["title"] for word in ("汇报", "完成", "上报", "回复")):
followups.append(
{
"title": item["title"],
"owner": item["owner"],
"deadline": item["deadline"],
"evidence": item["evidence"],
}
)
return followups
def _short_title(text: str, *, limit: int = 44) -> str:
text = re.sub(r"[。!?].*", "", text).strip(" ,。")
return text if len(text) <= limit else text[:limit].rstrip() + "..."
def _guess_owner(text: str) -> str:
for owner in ("运维", "商务部", "市场部", "综合部", "政企部", "施工", "客服", "分局", "四公司", "市公司"):
if owner in text:
return owner
return "待确认"
def _guess_deadline(text: str) -> str:
patterns = (
r"今天",
r"本周内?",
r"下周",
r"月底",
r"\d+月\d+号?之前",
r"\d+月底",
r"\d+周之内",
r"会后",
)
for pattern in patterns:
match = re.search(pattern, text)
def _try_parse_json(content: str) -> dict | list:
try:
return json.loads(content)
except json.JSONDecodeError:
logger.warning('JSON parsing failed; trying to repair extracted block')
match = re.search(r'\{.*\}|\[.*\]', content, re.DOTALL)
if match:
return match.group(0)
return ""
try:
return json.loads(match.group())
except json.JSONDecodeError as exc:
logger.error('Repaired JSON still failed to parse: %s', exc)
raise
def _guess_impact(text: str) -> str:
for word in ("指标", "考核", "转化率", "退单率", "满意度", "专线", "客户", "故障"):
if word in text:
return f"影响{word}相关工作"
return "影响待确认"
def _normalize_string(name: str) -> str:
return re.sub(r'[\s]+', ' ', name.strip().lower())
def _guess_trend(text: str) -> str:
if "恶化" in text or "下降" in text:
return "变差/下降"
if "向好" in text or "提升" in text or "改善" in text:
return "向好/改善"
if "平稳" in text:
return "平稳"
return ""
def _format_episodes_for_context(episodes: list[dict] | None) -> str:
if not episodes:
return ''
return '\n'.join(
f'[Episode {i}] {ep.get("content", "")}'
for i, ep in enumerate(episodes)
)
def _evidence_ref(evidence: dict[str, str]) -> str:
return f"{evidence.get('time', '')} {evidence.get('speaker', '')}".strip()
# ===== Step 1: 实体节点抽取 =====
def extract_entities_from_text(
text: str,
previous_episodes: list[dict] | None = None,
entity_types: list[dict] | None = None,
stream: bool = False,
) -> list[dict]:
context = {
'episode_content': text,
'previous_episodes': previous_episodes or [],
'entity_types': entity_types or [],
}
messages = prompt_extract_entities(context)
content = _call_llm(messages, stream=stream)
try:
data = _try_parse_json(content)
except Exception as exc:
logger.error('Failed to parse entity extraction result: %s', exc)
return []
if isinstance(data, dict):
data = data.get('entities', data.get('extracted_entities', []))
if not isinstance(data, list):
return []
result = []
for item in data:
if isinstance(item, dict) and item.get('name', '').strip():
result.append({
'name': item['name'].strip(),
'entity_type': item.get('entity_type', 'Entity'),
'description': item.get('description', ''),
'evidence': item.get('evidence', ''),
})
return result
def _dedupe_dicts(items: list[dict[str, str]], keys: tuple[str, ...]) -> list[dict[str, str]]:
seen: set[tuple[str, ...]] = set()
output: list[dict[str, str]] = []
for item in items:
marker = tuple(str(item.get(key, "")) for key in keys)
if marker in seen:
# ===== Step 2: 实体去重 =====
def resolve_entities_against_graph(
extracted: list[dict],
existing: list[dict],
episode_content: str = '',
) -> list[dict]:
if not existing:
return extracted
context = {
'extracted_entities': extracted,
'existing_entities': existing,
'episode_content': episode_content,
}
messages = prompt_dedupe_nodes(context)
content = _call_llm(messages)
try:
data = _try_parse_json(content)
except Exception as exc:
logger.warning('LLM dedup failed, keeping all extracted: %s', exc)
return extracted
if isinstance(data, dict):
data = data.get('entity_resolutions', data.get('resolutions', []))
extracted_by_id = {i: e for i, e in enumerate(extracted)}
existing_by_id = {c.get('candidate_id'): c for c in existing}
for resolution in (data if isinstance(data, list) else []):
if not isinstance(resolution, dict):
continue
seen.add(marker)
output.append(item)
return output
rid = resolution.get('id')
dup_id = resolution.get('duplicate_candidate_id', -1)
if rid is None or rid not in extracted_by_id:
continue
if dup_id >= 0 and dup_id in existing_by_id:
extracted_by_id[rid]['_resolved_to'] = existing_by_id[dup_id]
extracted_by_id[rid]['name'] = resolution.get('name', extracted_by_id[rid]['name'])
return [e for e in extracted_by_id.values() if '_resolved_to' not in e]
def _meeting_id(source: str, text: str) -> str:
digest = hashlib.sha1((source + "\n" + text[:4000]).encode("utf-8", errors="ignore")).hexdigest()[:10]
return f"meeting_{digest}"
# ===== Step 3: 事实关系抽取 =====
def extract_facts_from_text(
text: str,
entities: list[dict],
reference_time: str = '',
previous_episodes: list[dict] | None = None,
stream: bool = False,
) -> list[dict]:
if len(entities) < 2:
return []
context = {
'episode_content': text,
'entities': entities,
'reference_time': reference_time,
'previous_episodes': previous_episodes or [],
}
messages = prompt_extract_facts(context)
content = _call_llm(messages, stream=stream)
try:
data = _try_parse_json(content)
except Exception as exc:
logger.error('Failed to parse fact extraction result: %s', exc)
return []
if isinstance(data, dict):
data = data.get('edges', data.get('facts', data.get('relations', [])))
if not isinstance(data, list):
return []
entity_names = {_normalize_string(e.get('name', '')) for e in entities}
result = []
for item in data:
if not isinstance(item, dict):
continue
src = _normalize_string(item.get('source_entity_name', ''))
tgt = _normalize_string(item.get('target_entity_name', ''))
if src not in entity_names or tgt not in entity_names:
continue
if src == tgt:
continue
result.append({
'source_entity_name': item['source_entity_name'],
'target_entity_name': item['target_entity_name'],
'relation_type': item.get('relation_type', '关联'),
'fact': item.get('fact', ''),
'valid_at': item.get('valid_at', ''),
'invalid_at': item.get('invalid_at', ''),
'evidence': item.get('evidence', ''),
'qualifiers': item.get('qualifiers', []),
'confidence': item.get('confidence', 0.0),
})
return result
# ===== Step 4: 事实去重/矛盾检测 =====
def resolve_facts_against_graph(
new_fact: dict,
existing_facts: list[dict],
invalidation_candidates: list[dict],
) -> dict:
if not existing_facts:
return {'is_duplicate': False, 'is_contradicted': False, 'resolved': new_fact}
context = {
'new_fact': new_fact.get('fact', ''),
'existing_facts': existing_facts,
'invalidation_candidates': invalidation_candidates,
}
messages = prompt_dedupe_edges(context)
content = _call_llm(messages)
try:
data = _try_parse_json(content)
except Exception as exc:
logger.warning('Fact dedup failed, treating as new: %s', exc)
return {'is_duplicate': False, 'is_contradicted': False, 'resolved': new_fact}
if not isinstance(data, dict):
return {'is_duplicate': False, 'is_contradicted': False, 'resolved': new_fact}
return {
'is_duplicate': len(data.get('duplicate_facts', [])) > 0,
'is_contradicted': len(data.get('contradicted_facts', [])) > 0,
'resolved': new_fact,
'duplicate_facts': data.get('duplicate_facts', []),
'contradicted_facts': data.get('contradicted_facts', []),
}
# ===== Step 5: 实体摘要 =====
def extract_entity_summary(
entity_name: str,
episodes: list[str],
existing_summary: str = '',
previous_episodes: list[dict] | None = None,
) -> str:
context = {
'entity_name': entity_name,
'episodes': episodes,
'existing_summary': existing_summary,
'previous_episodes': previous_episodes or [],
}
messages = prompt_summarize(context)
content = _call_llm(messages, max_tokens=1024)
try:
data = _try_parse_json(content)
except Exception:
logger.warning('Failed to parse summary, using empty')
return ''
if isinstance(data, dict):
return data.get('summary', '')
return ''
# ===== 统一入口(兼容原有接口) =====
EXTRACTION_SYSTEM_PROMPT = """
你是一个专业的会议知识抽取助手你的任务是从中文会议记录中抽取结构化事实尤其要抽出更细粒度更有语义深度的关系
输出要求
1. 只输出一个 JSON 对象不要输出解释文字
2. 关系抽取不要停留在"部门汇报了工作"这种浅层描述要尽可能向下细化到
- 责任归属
- 目标值 / 当前值 / 趋势
- 约束条件
- 因果 / 影响
- 时间要求
- 依赖关系
- 部署 / 决策 / 要求 / 风险 / 支撑关系
3. 每条关系尽量同时给出
- subject / predicate / object
- fact: 一句自然语言事实表述
- qualifiers: 限定条件范围状态数值约束等
- evidence: 原文中的关键短句或压缩证据
- confidence: 0 1 之间
- valid_at / invalid_at: 如果文中明确提到时间可填写否则留空
4. 如果原文存在多个事实不要只抽象概括要拆成多条关系
5. 避免空泛关系词优先使用更具体的谓词例如
- 负责 / 汇报 / 目标值 / 当前值 / 低于 / 高于 / 要求 / 督导 / 推进 / 影响 / 支撑 / 依赖 / 计划 / 完成 / 截止于
"""
def extract_meeting_info(text: str, stream: bool = False) -> MeetingExtraction:
user_prompt = f"""
请从下面会议记录中提取结构化信息并重点做"深层关系抽取""层次结构识别"
输出 JSON 字段
- title
- date
- participants
- agenda
- entities: name, entity_type, description
- entity_type 请使用: Department部门Project项目Metric指标Person人物System系统Document文档
- relations:
- source_entity_name: 源实体名称
- target_entity_name: 目标实体名称
- relation_type: 关系类型 HAS_PROJECTHAS_METRIC负责汇报目标值推进依赖
- fact: 一句自然语言事实描述
- valid_at可选
- invalid_at可选
- evidence: 原文证据
- qualifiers: 限定条件列表
- confidence: 0~1
- action_items: task, assignee, deadline, status, priority
- decisions: content, proposer, status
- metrics: metric_name, value, target, owner, trend, unit
- departments: [{{"name": "部门名称", "description": "", "projects": ["项目名1", "项目名2"]}}]
- summary
层次关系规则
1. Department 管辖 Project relation_type HAS_PROJECT
2. Project 拥有 Metric relation_type HAS_METRIC
3. 其他事实关系负责汇报目标值等直接用 relation_type 表达
关系抽取规则
1. 不要只抽"汇报了工作"这种会议动作要尽量继续下钻出具体事实
2. 如果一句话里同时包含"主体 + 指标 + 当前值 + 目标值 + 负责人 + 趋势"应拆成多条关系或在 qualifiers 中保留这些细节
3. 对于"要求、部署、负责、依赖、影响、约束、目标、风险"类信息优先保留
4. fact 必须是一句完整自然可检索的事实描述
5. qualifiers 用于补充数值范围状态条件截止时间优先级等信息
6. evidence 用原文中的关键词短句不要太长
7. confidence 取值 0 1
会议记录如下
{text}
"""
content = _call_llm([
{'role': 'system', 'content': EXTRACTION_SYSTEM_PROMPT},
{'role': 'user', 'content': user_prompt},
], stream=stream)
data = _try_parse_json(content)
data = _normalize_meeting_data(data)
return MeetingExtraction(**data)
def _normalize_meeting_data(data: dict) -> dict:
if not isinstance(data, dict):
return {}
return {
'title': _as_str(data.get('title')),
'date': _as_str(data.get('date')),
'participants': _as_str_list(data.get('participants')),
'agenda': _as_str_list(data.get('agenda')),
'entities': _normalize_entities(data.get('entities')),
'relations': _normalize_relations(data.get('relations')),
'action_items': _normalize_action_items(data.get('action_items')),
'decisions': _normalize_decisions(data.get('decisions')),
'metrics': _normalize_metrics(data.get('metrics')),
'departments': _normalize_departments(data.get('departments')),
'summary': _as_str(data.get('summary')),
}
def _as_str(value) -> str:
if value is None:
return ''
if isinstance(value, str):
return value
return str(value)
def _as_float(value) -> float:
if value is None or value == '':
return 0.0
try:
numeric = float(value)
return max(0.0, min(1.0, numeric))
except (TypeError, ValueError):
return 0.0
def _as_str_list(value) -> List[str]:
if isinstance(value, dict):
items = []
for key, item in value.items():
key_text = _as_str(key)
value_text = _as_str(item)
if key_text and value_text:
items.append(f'{key_text}: {value_text}')
elif key_text:
items.append(key_text)
elif value_text:
items.append(value_text)
return items
if not isinstance(value, list):
return []
return [_as_str(item) for item in value if item is not None]
def _normalize_entities(value) -> List[dict]:
if not isinstance(value, list):
return []
items = []
for entity in value:
if not isinstance(entity, dict):
continue
items.append({
'name': _as_str(entity.get('name')),
'entity_type': _as_str(entity.get('entity_type')),
'description': _as_str(entity.get('description')),
})
return items
def _normalize_relations(value) -> List[dict]:
if not isinstance(value, list):
return []
items = []
for relation in value:
if not isinstance(relation, dict):
continue
source = _as_str(relation.get('source_entity_name') or relation.get('subject', ''))
target = _as_str(relation.get('target_entity_name') or relation.get('object', ''))
rtype = _as_str(relation.get('relation_type') or relation.get('predicate', ''))
fact = _as_str(relation.get('fact'))
if not fact and source and rtype and target:
fact = f'{source} {rtype} {target}'
items.append({
'source_entity_name': source,
'target_entity_name': target,
'relation_type': rtype,
'fact': fact,
'qualifiers': _as_str_list(relation.get('qualifiers')),
'evidence': _as_str(relation.get('evidence')),
'confidence': _as_float(relation.get('confidence')),
'valid_at': _as_str(relation.get('valid_at')),
'invalid_at': _as_str(relation.get('invalid_at')),
})
return items
def _normalize_action_items(value) -> List[dict]:
if not isinstance(value, list):
return []
items = []
for action in value:
if not isinstance(action, dict):
continue
items.append({
'task': _as_str(action.get('task')),
'assignee': _as_str(action.get('assignee')),
'deadline': _as_str(action.get('deadline')),
'status': _as_str(action.get('status')) or '待办',
'priority': _as_str(action.get('priority')) or '',
})
return items
def _normalize_decisions(value) -> List[dict]:
if not isinstance(value, list):
return []
items = []
for decision in value:
if not isinstance(decision, dict):
continue
items.append({
'content': _as_str(decision.get('content')),
'proposer': _as_str(decision.get('proposer')),
'status': _as_str(decision.get('status')) or '已决',
})
return items
def _normalize_metrics(value) -> List[dict]:
if not isinstance(value, list):
return []
items = []
for metric in value:
if not isinstance(metric, dict):
continue
items.append({
'metric_name': _as_str(metric.get('metric_name')),
'value': _as_str(metric.get('value')),
'target': _as_str(metric.get('target')),
'owner': _as_str(metric.get('owner')),
'trend': _as_str(metric.get('trend')),
'unit': _as_str(metric.get('unit')),
})
return items
def _normalize_departments(value) -> List[dict]:
if not isinstance(value, list):
return []
items = []
for dept in value:
if not isinstance(dept, dict):
continue
name = _as_str(dept.get('name'))
if not name:
continue
items.append({
'name': name,
'description': _as_str(dept.get('description')),
'projects': _as_str_list(dept.get('projects')),
})
return items

File diff suppressed because it is too large Load Diff

View File

@ -1,29 +0,0 @@
from __future__ import annotations
from pathlib import Path
def read_text_auto(path: str | Path) -> str:
"""Read common Chinese transcript encodings with a small fallback chain."""
file_path = Path(path).expanduser().resolve()
encodings = ("utf-8-sig", "utf-8", "gb18030", "gbk")
last_error: Exception | None = None
for encoding in encodings:
try:
return file_path.read_text(encoding=encoding)
except UnicodeDecodeError as exc:
last_error = exc
raise UnicodeDecodeError(
"unknown",
b"",
0,
1,
f"Unable to decode {file_path}: {last_error}",
)
def ensure_dir(path: str | Path) -> Path:
directory = Path(path)
directory.mkdir(parents=True, exist_ok=True)
return directory

View File

@ -1,640 +0,0 @@
from __future__ import annotations
import json
import re
from datetime import date
from pathlib import Path
from typing import Any
from .io_utils import ensure_dir
from .models import ExtractedMeeting
class LedgerStore:
def __init__(self, root: str | Path = "data") -> None:
self.root = Path(root).resolve()
ensure_dir(self.root)
def import_meeting(self, team_id: str, meeting: ExtractedMeeting) -> dict[str, Any]:
team_dir = ensure_dir(self.root / team_id)
state_path = team_dir / "state.json"
meetings_dir = ensure_dir(team_dir / "meetings")
state = self._load_state(state_path, team_id)
meeting_dict = meeting.to_dict()
existing_ids = {item.get("meeting_id") for item in state.get("meetings", [])}
if meeting.meeting_id in existing_ids:
state["meetings"] = [
meeting_dict if item.get("meeting_id") == meeting.meeting_id else item
for item in state.get("meetings", [])
]
imported = False
else:
state["meetings"].append(meeting_dict)
imported = True
state["last_meeting_date"] = meeting.meeting_date
state["last_updated"] = date.today().isoformat()
state["version"] = int(state.get("version", 0)) + (1 if imported else 0)
state = self._compact_state(state)
state_path.write_text(json.dumps(state, ensure_ascii=False, indent=2), encoding="utf-8")
(meetings_dir / f"{meeting.meeting_id}.json").write_text(
json.dumps(meeting_dict, ensure_ascii=False, indent=2),
encoding="utf-8",
)
ledger_path = team_dir / "team_ledger.md"
ledger_path.write_text(self.render_markdown(state), encoding="utf-8")
return {
"team_id": team_id,
"knowledge_base_id": team_id,
"meeting_id": meeting.meeting_id,
"ledger_path": str(ledger_path),
"state_path": str(state_path),
"summary": meeting.summary,
"imported": imported,
"counts": {
"projects": len(meeting.projects),
"metrics": len(meeting.metrics),
"actions": len(meeting.action_items),
"risks": len(meeting.risks),
"completed": len(meeting.completed_items),
"followups": len(meeting.followups),
},
}
def query(self, team_id: str, query: str, *, limit: int = 8) -> dict[str, Any]:
resolved_team = self.resolve_team_id(team_id)
if resolved_team is None:
teams = self.list_team_ids()
if teams:
return {
"answer": f"未找到知识库 `{team_id}` 的台账。当前已有知识库:" + "".join(teams) + "。请在问题里指定知识库名。",
"matches": [],
"available_knowledge_bases": teams,
}
return {"answer": "当前 data 目录下还没有任何知识库台账,请先导入会议。", "matches": [], "available_knowledge_bases": []}
team_dir = self.root / resolved_team
state = json.loads((team_dir / "state.json").read_text(encoding="utf-8"))
ledger_path = team_dir / "team_ledger.md"
if _looks_like_recent_meeting_query(query):
return self._recent_meeting_answer(state, ledger_path)
haystacks = self._flatten_state(state)
keywords = [word for word in _tokenize_query(query) if word]
scored: list[tuple[int, dict[str, str]]] = []
for item in haystacks:
score = sum(item["search_text"].count(word) for word in keywords)
if score:
scored.append((score, item))
scored.sort(key=lambda pair: pair[0], reverse=True)
matches = [item for _, item in scored[:limit]]
if matches:
answer = "根据当前会议台账,找到这些相关内容:\n" + "\n".join(
f"- [{item['type']}] {item['display_text']}" for item in matches
)
else:
answer = "台账中没有命中明显相关内容。可以换成项目名、负责人、指标名、风险关键词或时间要求再问。"
return {
"answer": answer,
"matches": matches,
"ledger_path": str(ledger_path),
"team_id": resolved_team,
"knowledge_base_id": resolved_team,
}
def list_team_ids(self) -> list[str]:
if not self.root.exists():
return []
return sorted(path.name for path in self.root.iterdir() if path.is_dir() and (path / "state.json").exists())
def resolve_team_id(self, team_id: str | None) -> str | None:
teams = self.list_team_ids()
if team_id and team_id != "default_team" and team_id in teams:
return team_id
if team_id and team_id != "default_team" and (self.root / team_id / "state.json").exists():
return team_id
if (team_id == "default_team" or not team_id) and len(teams) == 1:
return teams[0]
return None
def build_agent_context(self, team_id: str) -> str:
resolved = self.resolve_team_id(team_id)
if resolved is None:
return ""
state = json.loads((self.root / resolved / "state.json").read_text(encoding="utf-8"))
lines = []
for project in state.get("projects", [])[:20]:
lines.append(
f"- 项目: {project.get('name', '')} | 状态: {project.get('status', '待确认')} | "
f"最近更新: {project.get('latest_update', '')} | 最近提及: {project.get('last_meeting_date', '')}"
)
recent_meetings = state.get("meetings", [])[-5:]
if recent_meetings:
lines.append("")
lines.append("最近会议摘要:")
for meeting in recent_meetings:
title = meeting.get("title") or Path(meeting.get("source_path", "")).stem
summary = "".join(meeting.get("summary", [])[:2])
lines.append(f"- {meeting.get('meeting_date', '')} | {title} | {summary}")
return "\n".join(lines)
def render_markdown(self, state: dict[str, Any]) -> str:
latest = state["meetings"][-1] if state.get("meetings") else {}
knowledge_base_id = state.get("knowledge_base_id") or state["team_id"]
lines = [
"---",
f"knowledge_base_id: {knowledge_base_id}",
f"team_id: {state['team_id']}",
"doc_type: meeting_project_ledger",
f"last_meeting_date: {state.get('last_meeting_date', '')}",
f"last_updated: {state.get('last_updated', '')}",
f"version: {state.get('version', 0)}",
f"source_meeting_id: {latest.get('meeting_id', '')}",
"---",
"",
f"# {knowledge_base_id} 会议知识台账",
"",
"## 1. 当前总览",
"",
f"- 当前维护项目数:{len(state.get('projects', []))}",
f"- 最近一次会议:{state.get('last_meeting_date', '')}",
]
if latest.get("summary"):
lines.append(f"- 最近会议摘要:{''.join(latest.get('summary', [])[:3])}")
lines.extend(["", "## 2. 项目索引", "", "| 项目 | 当前状态 | 最近提及 | 最新更新 |", "|---|---|---|---|"])
projects = state.get("projects", [])
if projects:
for project in projects:
lines.append(
"| "
+ " | ".join(
_escape_md(str(value))
for value in (
project.get("name", ""),
project.get("status", "待确认"),
project.get("last_meeting_date", ""),
project.get("latest_update", ""),
)
)
+ " |"
)
else:
lines.append("| 暂无 | | | |")
lines.extend(["", "## 3. 项目详情", ""])
if not projects:
lines.append("暂无项目记录。")
for index, project in enumerate(projects, start=1):
lines.extend(
[
f"### 3.{index} {project.get('name', '')}",
"",
f"- 当前状态:{project.get('status', '待确认')}",
f"- 最近提及:{project.get('last_meeting_date', '')}",
f"- 最新更新:{project.get('latest_update', '') or project.get('summary', '') or '待确认'}",
]
)
if project.get("summary"):
lines.append(f"- 项目说明:{project.get('summary')}")
if project.get("keywords"):
lines.append(f"- 检索关键词:{''.join(project.get('keywords', []))}")
lines.extend(["", "#### 指标", "", "| 指标 | 当前值 | 目标 | 状态 | 趋势 | 备注 |", "|---|---|---|---|---|---|"])
metrics = project.get("metrics", [])
if metrics:
for metric in metrics:
lines.append(
"| "
+ " | ".join(
_escape_md(str(metric.get(key, "")))
for key in ("name", "value", "target", "status", "trend", "note")
)
+ " |"
)
else:
lines.append("| 暂无 | | | | | |")
lines.extend(["", "#### 待办", "", "| 事项 | 责任对象 | 截止时间 | 状态 | 下一步 |", "|---|---|---|---|---|"])
actions = project.get("actions", [])
if actions:
for action in actions:
lines.append(
"| "
+ " | ".join(
_escape_md(str(action.get(key, "")))
for key in ("title", "owner", "deadline", "status", "next_step")
)
+ " |"
)
else:
lines.append("| 暂无 | | | | |")
lines.extend(["", "#### 风险", "", "| 风险/问题 | 当前状态 | 影响 | 责任对象 | 处理动作 |", "|---|---|---|---|---|"])
risks = project.get("risks", [])
if risks:
for risk in risks:
lines.append(
"| "
+ " | ".join(
_escape_md(str(risk.get(key, "")))
for key in ("title", "status", "impact", "owner", "next_step")
)
+ " |"
)
else:
lines.append("| 暂无 | | | | |")
lines.extend(["", "#### 已完成", ""])
completed = project.get("completed", [])
if completed:
for item in completed:
lines.append(f"- {item.get('title', '')}{item.get('owner', '待确认')}{item.get('status', '已完成')}")
else:
lines.append("- 暂无。")
lines.extend(["", "#### 决策", ""])
decisions = project.get("decisions", [])
if decisions:
lines.extend(f"- {item}" for item in decisions)
else:
lines.append("- 暂无。")
lines.extend(["", "#### 下一步", ""])
next_steps = project.get("next_steps", [])
if next_steps:
lines.extend(f"- {item}" for item in next_steps)
else:
lines.append("- 暂无。")
lines.extend(["", "#### 更新历史", ""])
history = project.get("history", [])[-6:]
if history:
for item in reversed(history):
lines.append(
f"- {item.get('meeting_date', '')} | {item.get('status', '待确认')} | {item.get('update', '')}"
)
else:
lines.append("- 暂无。")
lines.append("")
lines.extend(["## 4. 会议导入记录", "", "| 日期 | 标题 | meeting_id | 来源 |", "|---|---|---|---|"])
for meeting in state.get("meetings", [])[-20:]:
title = meeting.get("title") or Path(meeting.get("source_path", "")).stem
lines.append(
"| "
+ " | ".join(
_escape_md(str(value))
for value in (
meeting.get("meeting_date", ""),
title,
meeting.get("meeting_id", ""),
meeting.get("source_path", ""),
)
)
+ " |"
)
lines.append("")
return "\n".join(lines)
def _recent_meeting_answer(self, state: dict[str, Any], ledger_path: Path) -> dict[str, Any]:
meetings = state.get("meetings", [])
if not meetings:
return {
"answer": "该知识库台账存在,但还没有会议导入记录。",
"matches": [],
"ledger_path": str(ledger_path),
"team_id": state.get("team_id", ""),
"knowledge_base_id": state.get("knowledge_base_id") or state.get("team_id", ""),
}
latest = meetings[-1]
title = latest.get("title") or Path(latest.get("source_path", "")).stem
parts = [
f"最近一次会议:{latest.get('meeting_date', '')}",
f"会议标题:{title}",
f"来源文件:{latest.get('source_path', '')}",
"",
"会议摘要:",
]
parts.extend(f"- {item}" for item in latest.get("summary", [])[:6])
projects = latest.get("projects", [])
if projects:
parts.extend(["", "本次涉及项目:"])
for project in projects[:8]:
parts.append(
f"- {project.get('name', '')}:状态={project.get('status', '待确认')}"
f"更新={project.get('summary', '') or project.get('progress', '') or '待确认'}"
)
return {
"answer": "\n".join(parts),
"matches": [],
"ledger_path": str(ledger_path),
"team_id": state.get("team_id", ""),
"knowledge_base_id": state.get("knowledge_base_id") or state.get("team_id", ""),
"meeting_id": latest.get("meeting_id", ""),
}
def _flatten_state(self, state: dict[str, Any]) -> list[dict[str, str]]:
result: list[dict[str, str]] = []
for project in state.get("projects", []):
project_name = str(project.get("name", ""))
result.append(
_search_item(
item_type="项目",
display_text=f"{project_name}|状态:{project.get('status', '待确认')}|更新:{project.get('latest_update', '')}",
project=project_name,
raw_values=[
project_name,
project.get("status", ""),
project.get("summary", ""),
project.get("latest_update", ""),
" ".join(project.get("keywords", [])),
],
)
)
for metric in project.get("metrics", []):
result.append(
_search_item(
item_type="指标",
display_text=(
f"{project_name}{metric.get('name', '')}={metric.get('value', '')}"
f"|目标:{metric.get('target', '')}|状态:{metric.get('status', '')}"
),
project=project_name,
raw_values=list(metric.values()),
)
)
for action in project.get("actions", []):
result.append(
_search_item(
item_type="待办",
display_text=(
f"{project_name}{action.get('title', '')}|责任:{action.get('owner', '待确认')}"
f"|截止:{action.get('deadline', '')}|状态:{action.get('status', '')}"
),
project=project_name,
raw_values=list(action.values()),
)
)
for risk in project.get("risks", []):
result.append(
_search_item(
item_type="风险",
display_text=(
f"{project_name}{risk.get('title', '')}|状态:{risk.get('status', '')}"
f"|影响:{risk.get('impact', '')}"
),
project=project_name,
raw_values=list(risk.values()),
)
)
for item in project.get("completed", []):
result.append(
_search_item(
item_type="完成",
display_text=f"{project_name}{item.get('title', '')}|责任:{item.get('owner', '待确认')}",
project=project_name,
raw_values=list(item.values()),
)
)
for item in project.get("decisions", []):
result.append(
_search_item(
item_type="决策",
display_text=f"{project_name}{item}",
project=project_name,
raw_values=[project_name, item],
)
)
for item in project.get("next_steps", []):
result.append(
_search_item(
item_type="下一步",
display_text=f"{project_name}{item}",
project=project_name,
raw_values=[project_name, item],
)
)
for meeting in state.get("meetings", []):
for item in meeting.get("summary", []):
result.append(
_search_item(
item_type="会议摘要",
display_text=f"{meeting.get('meeting_date', '')}{item}",
raw_values=[meeting.get("meeting_date", ""), item],
)
)
return result
def _load_state(self, state_path: Path, team_id: str) -> dict[str, Any]:
if state_path.exists():
state = json.loads(state_path.read_text(encoding="utf-8"))
else:
state = {
"team_id": team_id,
"knowledge_base_id": team_id,
"version": 0,
"last_updated": "",
"last_meeting_date": "",
"meetings": [],
}
state.setdefault("projects", [])
state.setdefault("current_metrics", [])
state.setdefault("open_actions", [])
state.setdefault("risks", [])
state.setdefault("completed_items", [])
state.setdefault("followups", [])
return state
def _compact_state(self, state: dict[str, Any]) -> dict[str, Any]:
project_index: dict[str, dict[str, Any]] = {}
for meeting in state.get("meetings", []):
for raw_project in meeting.get("projects", []):
project = _normalize_project_state(raw_project)
marker = _project_key(project.get("name", ""), project.get("aliases", []))
if not marker:
continue
existing = project_index.get(marker)
if existing is None:
existing = {
"name": project.get("name", ""),
"aliases": project.get("aliases", []),
"status": project.get("status", "待确认"),
"summary": project.get("summary", ""),
"latest_update": project.get("summary", "") or project.get("progress", ""),
"last_meeting_date": meeting.get("meeting_date", ""),
"last_meeting_id": meeting.get("meeting_id", ""),
"keywords": project.get("keywords", []),
"decisions": [],
"next_steps": [],
"metrics": [],
"actions": [],
"risks": [],
"completed": [],
"history": [],
}
project_index[marker] = existing
_merge_project(existing, project, meeting)
projects = sorted(
project_index.values(),
key=lambda item: (item.get("last_meeting_date", ""), item.get("name", "")),
reverse=True,
)
state["projects"] = projects
state["current_metrics"] = _limit_items(_collect_from_projects(projects, "metrics"), 60)
state["open_actions"] = _limit_items([item for item in _collect_from_projects(projects, "actions") if item.get("status") != "已完成"], 120)
state["risks"] = _limit_items(_collect_from_projects(projects, "risks"), 100)
state["completed_items"] = _limit_items(_collect_from_projects(projects, "completed"), 120)
state["followups"] = _limit_items(
[
{
"title": item.get("title", ""),
"owner": item.get("owner", ""),
"deadline": item.get("deadline", ""),
"project": item.get("project", ""),
"status": item.get("status", ""),
}
for item in state["open_actions"]
],
80,
)
return state
def _normalize_project_state(project: dict[str, Any]) -> dict[str, Any]:
return {
"name": str(project.get("name", "")).strip(),
"aliases": _unique_text(project.get("aliases", [])),
"status": str(project.get("status", "")).strip() or "待确认",
"summary": str(project.get("summary", "")).strip(),
"progress": str(project.get("progress", "")).strip(),
"keywords": _unique_text(project.get("keywords", [])),
"decisions": _unique_text(project.get("decisions", [])),
"next_steps": _unique_text(project.get("next_steps", [])),
"metrics": [dict(item) for item in project.get("metrics", []) if item.get("name")],
"actions": [dict(item) for item in project.get("actions", []) if item.get("title")],
"risks": [dict(item) for item in project.get("risks", []) if item.get("title")],
"completed": [dict(item) for item in project.get("completed", []) if item.get("title")],
}
def _merge_project(target: dict[str, Any], incoming: dict[str, Any], meeting: dict[str, Any]) -> None:
target["aliases"] = _unique_text(target.get("aliases", []) + incoming.get("aliases", []) + [incoming.get("name", "")])
target["status"] = incoming.get("status") or target.get("status", "待确认")
target["summary"] = incoming.get("summary") or target.get("summary", "")
target["latest_update"] = incoming.get("summary") or incoming.get("progress") or target.get("latest_update", "")
target["last_meeting_date"] = meeting.get("meeting_date", "") or target.get("last_meeting_date", "")
target["last_meeting_id"] = meeting.get("meeting_id", "") or target.get("last_meeting_id", "")
target["keywords"] = _unique_text(target.get("keywords", []) + incoming.get("keywords", []))
target["decisions"] = _unique_text(target.get("decisions", []) + incoming.get("decisions", []))
target["next_steps"] = _unique_text(target.get("next_steps", []) + incoming.get("next_steps", []) + [item.get("title", "") for item in incoming.get("actions", []) if item.get("title")])
target["metrics"] = _merge_named_entries(target.get("metrics", []), incoming.get("metrics", []), "name", incoming.get("name", ""))
target["actions"] = _merge_named_entries(target.get("actions", []), incoming.get("actions", []), "title", incoming.get("name", ""))
target["risks"] = _merge_named_entries(target.get("risks", []), incoming.get("risks", []), "title", incoming.get("name", ""))
target["completed"] = _merge_named_entries(target.get("completed", []), incoming.get("completed", []), "title", incoming.get("name", ""))
update = incoming.get("summary") or incoming.get("progress") or "本次会议有更新"
history_item = {
"meeting_date": meeting.get("meeting_date", ""),
"meeting_id": meeting.get("meeting_id", ""),
"status": incoming.get("status", "待确认"),
"update": update,
}
target["history"] = _append_unique_history(target.get("history", []), history_item)
def _merge_named_entries(current: list[dict[str, Any]], incoming: list[dict[str, Any]], key: str, project_name: str) -> list[dict[str, Any]]:
merged: dict[str, dict[str, Any]] = {}
for item in current + incoming:
marker = _normalize_name(str(item.get(key, "")))
if not marker:
continue
merged[marker] = {**merged.get(marker, {}), **item, "project": project_name}
return list(merged.values())
def _append_unique_history(history: list[dict[str, Any]], item: dict[str, Any]) -> list[dict[str, Any]]:
marker = (item.get("meeting_id", ""), item.get("update", ""))
output = [entry for entry in history if (entry.get("meeting_id", ""), entry.get("update", "")) != marker]
output.append(item)
return output[-12:]
def _collect_from_projects(projects: list[dict[str, Any]], key: str) -> list[dict[str, Any]]:
items: list[dict[str, Any]] = []
for project in projects:
for item in project.get(key, []):
items.append({**item, "project": project.get("name", "")})
return items
def _limit_items(items: list[dict[str, Any]], size: int) -> list[dict[str, Any]]:
return items[-size:]
def _search_item(*, item_type: str, display_text: str, raw_values: list[Any], project: str = "") -> dict[str, str]:
search_text = "".join(str(value) for value in raw_values if value)
if project:
search_text = f"{project}{search_text}"
return {
"type": item_type,
"project": project,
"display_text": display_text,
"search_text": search_text,
}
def _project_key(name: str, aliases: list[str]) -> str:
normalized = _normalize_name(name)
if normalized:
return normalized
for alias in aliases:
normalized = _normalize_name(alias)
if normalized:
return normalized
return ""
def _normalize_name(text: str) -> str:
return re.sub(r"[\W_]+", "", text).lower()
def _unique_text(values: list[Any]) -> list[str]:
seen: set[str] = set()
output: list[str] = []
for raw in values:
text = str(raw).strip()
if not text or text in seen:
continue
seen.add(text)
output.append(text)
return output
def _escape_md(text: str) -> str:
return text.replace("|", "\\|").replace("\n", " ")
def _tokenize_query(query: str) -> list[str]:
stop_words = {"查询", "团队", "team", "team_id", "哪些", "还有", "一下", "帮我", "关于", "知识库", "项目"}
words = [part for part in re.split(r"[\s、,.!?:=]+", query) if len(part) >= 2]
tokens: list[str] = []
for word in words:
if word in stop_words:
continue
tokens.append(word)
if re.search(r"[\u4e00-\u9fff]", word) and len(word) > 4:
for size in range(2, min(6, len(word)) + 1):
for index in range(0, len(word) - size + 1):
tokens.append(word[index : index + size])
return list(dict.fromkeys(tokens))
def _looks_like_recent_meeting_query(query: str) -> bool:
return any(word in query for word in ("最近", "最新", "上次")) and any(word in query for word in ("会议", "内容", "纪要", "摘要"))

View File

@ -1,393 +0,0 @@
from __future__ import annotations
import json
import re
from pathlib import Path
from typing import Any, Callable
from core_agent.config import apply_compat_env_aliases, build_core_agent_config, load_core_agent_env
from core_agent.exceptions import AgentCancelled
from providers.openai_compatible import OpenAICompatibleProvider
from providers.prompt_loader import load_prompt
from .models import ExtractedMeeting
SubagentProgressCallback = Callable[[dict[str, Any]], None]
class MeetingDigestAgent:
def __init__(self, workspace: str | Path) -> None:
self.workspace = Path(workspace).resolve()
prompt = load_prompt("meeting_digest")
self.system_prompt = prompt["system"]
self.instructions = prompt.get("instructions", "")
self.provider = self._build_provider()
def analyze(
self,
*,
transcript_text: str,
source_path: str | Path,
meeting_date: str,
knowledge_base_id: str,
existing_context: str = "",
progress_callback: SubagentProgressCallback | None = None,
should_cancel: Callable[[], bool] | None = None,
) -> ExtractedMeeting:
source = str(Path(source_path).resolve())
meeting_id = _meeting_id(source, transcript_text)
if self.provider is None:
self._emit(
progress_callback,
kind="subagent_status",
phase="fallback",
text="Agent 未检测到可用模型,切换到降级整理。",
)
return _fallback_meeting(
meeting_id=meeting_id,
meeting_date=meeting_date,
source_path=source,
transcript_text=transcript_text,
)
self._emit(
progress_callback,
kind="subagent_status",
phase="prepare",
text="Agent 正在加载历史台账并构造整理提示。",
)
self._raise_if_cancelled(should_cancel)
user_message = self._build_user_message(
transcript_text=transcript_text,
source_path=source,
meeting_date=meeting_date,
knowledge_base_id=knowledge_base_id,
existing_context=existing_context,
)
self._emit(
progress_callback,
kind="subagent_status",
phase="call_model",
text="Agent 正在调用模型整理项目、待办、风险和指标。",
)
turn = None
for event in self.provider.stream_generate(
[
{"role": "system", "content": self.system_prompt},
{"role": "user", "content": user_message},
],
[],
should_cancel=should_cancel,
):
self._raise_if_cancelled(should_cancel)
if event.type == "reasoning" and event.delta:
self._emit(
progress_callback,
kind="subagent_reasoning",
phase="stream",
text=event.delta,
)
elif event.type == "content" and event.delta:
self._emit(
progress_callback,
kind="subagent_output",
phase="stream",
text=event.delta,
)
elif event.type == "turn":
turn = event.turn
if turn is None:
raise RuntimeError("Meeting digest agent stream ended without a final turn")
self._raise_if_cancelled(should_cancel)
self._emit(
progress_callback,
kind="subagent_status",
phase="parse",
text="Agent 已完成分析,正在解析结构化结果。",
)
payload = _parse_json_object(turn.content or "")
self._emit(
progress_callback,
kind="subagent_status",
phase="parse_done",
text="Agent 已完成结构化解析,准备写入项目台账。",
)
return _meeting_from_payload(
payload,
meeting_id=meeting_id,
meeting_date=meeting_date,
source_path=source,
)
def _build_provider(self) -> OpenAICompatibleProvider | None:
load_core_agent_env(self.workspace)
apply_compat_env_aliases()
config = build_core_agent_config()
if not config.api_key or not config.base_url or not config.model:
return None
return OpenAICompatibleProvider(
model=config.model,
api_key=config.api_key,
base_url=config.base_url,
timeout=config.timeout,
temperature=min(config.temperature, 0.1),
)
def _build_user_message(
self,
*,
transcript_text: str,
source_path: str,
meeting_date: str,
knowledge_base_id: str,
existing_context: str,
) -> str:
context = existing_context.strip() or "暂无历史台账。请直接从本次会议中识别项目并建立初始状态。"
parts: list[str] = []
if self.instructions:
parts.append(self.instructions)
parts.append(
f"知识库: {knowledge_base_id}\n"
f"会议日期: {meeting_date}\n"
f"来源文件: {source_path}\n\n"
"当前历史台账摘要:\n"
f"{context}\n\n"
"会议原文:\n"
f"{transcript_text}"
)
return "\n\n---\n\n".join(parts)
def _emit(
self,
callback: SubagentProgressCallback | None,
*,
kind: str,
phase: str,
text: str,
) -> None:
if callback is None:
return
callback(
{
"channel": "meeting_digest",
"kind": kind,
"phase": phase,
"text": text,
}
)
def _raise_if_cancelled(self, should_cancel: Callable[[], bool] | None) -> None:
if should_cancel and should_cancel():
raise AgentCancelled("Agent run cancelled by user.")
def _meeting_from_payload(
payload: dict[str, Any],
*,
meeting_id: str,
meeting_date: str,
source_path: str,
) -> ExtractedMeeting:
title = _as_text(payload.get("meeting_title"))
overview = _as_text(payload.get("overview"))
summary = _normalize_text_list(payload.get("summary"))
if overview and overview not in summary:
summary.insert(0, overview)
projects = [_normalize_project(item) for item in _as_list(payload.get("projects"))]
projects = [item for item in projects if item["name"]]
metrics: list[dict[str, Any]] = []
actions: list[dict[str, Any]] = []
risks: list[dict[str, Any]] = []
completed_items: list[dict[str, Any]] = []
followups: list[dict[str, Any]] = []
evidence: list[dict[str, str]] = []
for project in projects:
project_name = project["name"]
for metric in project["metrics"]:
metrics.append({**metric, "project": project_name})
for action in project["actions"]:
entry = {**action, "project": project_name}
actions.append(entry)
if entry.get("status") != "已完成":
followups.append(entry)
for risk in project["risks"]:
risks.append({**risk, "project": project_name})
for item in project["completed"]:
completed_items.append({**item, "project": project_name})
for snippet in project["evidence"]:
evidence.append({"project": project_name, "text": snippet})
if not summary:
if projects:
summary = [f"本次会议识别到 {len(projects)} 个项目更新。"]
else:
summary = ["本次会议已导入,但未识别出明确项目。"]
return ExtractedMeeting(
meeting_id=meeting_id,
meeting_date=meeting_date,
source_path=source_path,
title=title,
overview=overview,
summary=summary,
projects=projects,
metrics=metrics,
action_items=actions,
risks=risks,
completed_items=completed_items,
followups=followups,
evidence=evidence,
)
def _normalize_project(raw: Any) -> dict[str, Any]:
item = raw if isinstance(raw, dict) else {}
actions = [_normalize_action(value) for value in _as_list(item.get("actions"))]
risks = [_normalize_risk(value) for value in _as_list(item.get("risks"))]
completed = [_normalize_completed(value) for value in _as_list(item.get("completed"))]
metrics = [_normalize_metric(value) for value in _as_list(item.get("metrics"))]
decisions = _normalize_text_list(item.get("decisions"))
next_steps = _normalize_text_list(item.get("next_steps"))
evidence = _normalize_text_list(item.get("evidence"))
aliases = _normalize_text_list(item.get("aliases"))
keywords = _normalize_text_list(item.get("keywords"))
return {
"name": _as_text(item.get("name")),
"status": _as_text(item.get("status")) or "待确认",
"summary": _as_text(item.get("summary")),
"progress": _as_text(item.get("progress")),
"decisions": decisions,
"next_steps": next_steps,
"metrics": [value for value in metrics if value["name"]],
"actions": [value for value in actions if value["title"]],
"risks": [value for value in risks if value["title"]],
"completed": [value for value in completed if value["title"]],
"keywords": keywords,
"aliases": aliases,
"evidence": evidence,
}
def _normalize_metric(raw: Any) -> dict[str, str]:
item = raw if isinstance(raw, dict) else {}
return {
"name": _as_text(item.get("name")),
"value": _as_text(item.get("value")),
"target": _as_text(item.get("target")),
"status": _as_text(item.get("status")),
"trend": _as_text(item.get("trend")),
"note": _as_text(item.get("note")),
"evidence": _as_text(item.get("evidence")),
}
def _normalize_action(raw: Any) -> dict[str, str]:
item = raw if isinstance(raw, dict) else {}
return {
"title": _as_text(item.get("title")),
"owner": _as_text(item.get("owner")),
"deadline": _as_text(item.get("deadline")),
"status": _as_text(item.get("status")) or "待跟进",
"next_step": _as_text(item.get("next_step")),
"evidence": _as_text(item.get("evidence")),
}
def _normalize_risk(raw: Any) -> dict[str, str]:
item = raw if isinstance(raw, dict) else {}
return {
"title": _as_text(item.get("title")),
"status": _as_text(item.get("status")) or "关注中",
"impact": _as_text(item.get("impact")),
"owner": _as_text(item.get("owner")),
"next_step": _as_text(item.get("next_step")),
"deadline": _as_text(item.get("deadline")),
"evidence": _as_text(item.get("evidence")),
}
def _normalize_completed(raw: Any) -> dict[str, str]:
item = raw if isinstance(raw, dict) else {}
return {
"title": _as_text(item.get("title")),
"status": _as_text(item.get("status")) or "已完成",
"owner": _as_text(item.get("owner")),
"evidence": _as_text(item.get("evidence")),
}
def _parse_json_object(text: str) -> dict[str, Any]:
cleaned = text.strip()
fenced = re.search(r"```(?:json)?\s*(\{.*\})\s*```", cleaned, flags=re.S)
if fenced:
cleaned = fenced.group(1)
if cleaned.startswith("{") and cleaned.endswith("}"):
return json.loads(cleaned)
match = re.search(r"\{.*\}", cleaned, flags=re.S)
if match:
return json.loads(match.group(0))
raise ValueError("Meeting digest agent did not return valid JSON")
def _fallback_meeting(
*,
meeting_id: str,
meeting_date: str,
source_path: str,
transcript_text: str,
) -> ExtractedMeeting:
lines = [line.strip("-* \t") for line in transcript_text.splitlines() if line.strip()]
important = [line for line in lines if len(line) >= 12][:12]
summary = important[:4] or ["本次会议已导入,当前环境未配置模型,暂未完成智能整理。"]
project = {
"name": Path(source_path).stem[:48] or "未命名项目",
"status": "待确认",
"summary": summary[0],
"progress": "",
"decisions": [],
"next_steps": summary[1:3],
"metrics": [],
"actions": [],
"risks": [],
"completed": [],
"keywords": [],
"aliases": [],
"evidence": important[:6],
}
return ExtractedMeeting(
meeting_id=meeting_id,
meeting_date=meeting_date,
source_path=source_path,
title=Path(source_path).stem,
overview=summary[0],
summary=summary,
projects=[project],
evidence=[{"text": line} for line in important[:6]],
)
def _normalize_text_list(value: Any) -> list[str]:
return [text for text in (_as_text(item) for item in _as_list(value)) if text]
def _as_list(value: Any) -> list[Any]:
return value if isinstance(value, list) else []
def _as_text(value: Any) -> str:
if value is None:
return ""
return str(value).strip()
def _meeting_id(source: str, text: str) -> str:
import hashlib
digest = hashlib.sha1((source + "\n" + text[:4000]).encode("utf-8", errors="ignore")).hexdigest()[:10]
return f"meeting_{digest}"

View File

@ -0,0 +1,427 @@
import hashlib
import logging
from typing import Callable, List, Optional
from meeting_memory.config import config
from meeting_memory.dedup_helpers import resolve_with_similarity
from meeting_memory.extractor import (
MeetingExtraction,
extract_entities_from_text,
extract_facts_from_text,
extract_meeting_info as monolithic_extract,
)
from meeting_memory.extractor import (
resolve_entities_against_graph,
resolve_facts_against_graph,
)
from meeting_memory.graph_store import graph_store
from meeting_memory.meeting_state import MeetingStateStore
from meeting_memory.raw_store import raw_meeting_store
logger = logging.getLogger(__name__)
state_store = MeetingStateStore(config.state_path)
ProgressCallback = Callable[[int, int, str], None]
class MeetingProcessor:
def process_meeting_file(self, filepath: str, force: bool = False) -> Optional[str]:
with open(filepath, 'r', encoding='utf-8') as file_obj:
text = file_obj.read()
return self.process_meeting_text(text, force=force)
def process_meeting_text(
self,
text: str,
force: bool = False,
interactive: bool = True,
progress_callback: Optional[ProgressCallback] = None,
use_multistep_extraction: bool = True,
) -> Optional[str]:
def report(step: int, total: int, message: str) -> None:
if progress_callback:
progress_callback(step, total, message)
print(f'[{step}/{total}] {message}')
if use_multistep_extraction:
return self._process_multistep(text, force, interactive, report)
else:
return self._process_monolithic(text, force, interactive, report)
def _process_monolithic(
self, text: str, force: bool, interactive: bool,
report: Callable,
) -> Optional[str]:
total_steps = 7
report(1, total_steps, '计算内容哈希')
content_hash = self._compute_content_hash(text)
if not force and state_store.has_content_hash(content_hash):
logger.info('Duplicate content hash skipped: %s', content_hash[:12])
return None
if not force:
report(2, total_steps, 'Neo4j 语义相似去重检索')
similar = graph_store.find_similar_episode(text, threshold=0.92)
if similar:
meta = similar['metadata']
if not interactive:
logger.info('Skipped similar meeting: %s', meta.get('title', ''))
return None
print(f'\n发现相似会议:{meta.get("title", "")} ({meta.get("date", "")}) 相似度 {similar["score"]:.2%}')
while True:
choice = input('选择 [s]跳过 / [o]覆盖(默认 s').strip().lower() or 's'
if choice == 's':
logger.info('Skipped similar meeting: %s', meta.get('title', ''))
return None
if choice == 'o':
force = True
break
print('请输入 s 或 o。')
else:
report(2, total_steps, '跳过语义去重,按覆盖模式继续')
report(3, total_steps, '调用大模型抽取结构化信息(单步模式)')
meeting_data = self._extract_monolithic(text)
if not meeting_data:
logger.error('Failed to extract meeting information')
return None
data_dict = meeting_data.model_dump()
return self._finish_pipeline(data_dict, content_hash, text, force, interactive, report, total_steps)
def _process_multistep(
self, text: str, force: bool, interactive: bool,
report: Callable,
) -> Optional[str]:
total_steps = 10
report(1, total_steps, '计算内容哈希')
content_hash = self._compute_content_hash(text)
if not force and state_store.has_content_hash(content_hash):
logger.info('Duplicate content hash skipped: %s', content_hash[:12])
return None
if not force:
report(2, total_steps, 'Neo4j 语义相似去重检索')
similar = graph_store.find_similar_episode(text, threshold=0.92)
if similar:
meta = similar['metadata']
if not interactive:
logger.info('Skipped similar meeting: %s', meta.get('title', ''))
return None
print(f'\n发现相似会议:{meta.get("title", "")} ({meta.get("date", "")}) 相似度 {similar["score"]:.2%}')
while True:
choice = input('选择 [s]跳过 / [o]覆盖(默认 s').strip().lower() or 's'
if choice == 's':
logger.info('Skipped similar meeting: %s', meta.get('title', ''))
return None
if choice == 'o':
force = True
break
print('请输入 s 或 o。')
else:
report(2, total_steps, '跳过语义去重,按覆盖模式继续')
# Step 3: 提取标题、日期、参与人等元信息
report(3, total_steps, '抽取会议元信息(标题、日期、参与者等)')
meta_info = self._extract_monolithic(text, stream=interactive)
if not meta_info:
logger.error('Failed to extract meeting metadata')
return None
data_dict = meta_info.model_dump()
data_dict['_content_hash'] = content_hash
data_dict['_graph_meeting_id'] = graph_store.meeting_id(data_dict)
data_dict['_original_text'] = text
# Step 4: 抽取实体节点LLM 调用 1
report(4, total_steps, '第 1 步实体抽取:识别会议中提及的实体')
use_stream = interactive
previous_episodes = self._get_previous_episodes_context(data_dict)
extracted_entities = extract_entities_from_text(
text, previous_episodes=previous_episodes, stream=use_stream
)
logger.info('Extracted %d entities from meeting', len(extracted_entities))
if not extracted_entities:
logger.warning('No entities extracted, aborting')
return None
# Step 5: 实体去重(与已有图谱对比 + LLM 裁决)
report(5, total_steps, '实体去重:与图谱中已有实体对比')
resolved_entities = self._dedup_entities(extracted_entities, text)
data_dict['entities'] = resolved_entities
logger.info('After dedup: %d entities remain', len(resolved_entities))
# Step 6: 抽取事实关系LLM 调用 2
report(6, total_steps, '事实抽取:提取实体间的结构化关系')
reference_time = data_dict.get('date', '')
extracted_facts = extract_facts_from_text(
text, resolved_entities,
reference_time=reference_time,
previous_episodes=previous_episodes,
stream=use_stream,
)
logger.info('Extracted %d facts from meeting', len(extracted_facts))
# Step 7: 事实去重与矛盾检测
report(7, total_steps, '事实解析:去重与矛盾检测')
resolved_facts = self._dedup_facts(extracted_facts, data_dict)
data_dict['relations'] = resolved_facts
logger.info('After dedup: %d facts remain', len(resolved_facts))
# Step 8: 检查标题和日期重复
report(8, total_steps, '检查标题和日期重复')
should_skip = self._handle_duplicate(data_dict, force=force, interactive=interactive)
if should_skip:
return None
meeting_title = data_dict.get('title', '')
meeting_date = data_dict.get('date', '')
# Step 9: 归档 + 合并行动项/指标
report(9, total_steps, '归档和状态合并')
raw_path = raw_meeting_store.save(text, title=meeting_title, date=meeting_date)
data_dict['_original_text_path'] = raw_path
meeting_filename = f'{graph_store.meeting_id(data_dict)}.md'
data_dict['action_items'] = state_store.merge_action_items(
data_dict.get('action_items', []),
meeting_title, meeting_date, meeting_filename,
)
data_dict['metrics'] = state_store.merge_metrics(
data_dict.get('metrics', []),
meeting_title, meeting_date, meeting_filename,
)
state_store.add_content_hash(content_hash, meeting_title, meeting_date, meeting_filename)
state_store.save()
# Step 10: 写入 Neo4j
report(10, total_steps, '写入 Neo4j 图谱')
graph_store.upsert_meeting_subgraph(data_dict)
logger.info('Meeting processed (multi-step): %s', meeting_title)
return raw_path
def _get_previous_episodes_context(self, data_dict: dict) -> list:
meeting_title = data_dict.get('title', '')
meeting_date = data_dict.get('date', '')
series_info = state_store.get_series_info(meeting_title)
if not series_info:
return []
processed = series_info.get('processed_titles', [])
if not processed:
return []
rows = graph_store.run_query('''
MATCH (m:Meeting)
WHERE m.title IN $titles
OPTIONAL MATCH (m)-[:HAS_EPISODE]->(ep:Episode)
RETURN m.title AS title, m.date AS date, ep.summary AS summary, ep.content AS content
ORDER BY m.date DESC
LIMIT 3
''', titles=processed[-3:])
return [{'content': r.get('content', r.get('summary', '')), 'timestamp': r.get('date', '')} for r in rows]
def _dedup_entities(self, extracted: list, text: str) -> list:
try:
existing = graph_store.get_entities_map()
if not existing:
return extracted
existing_list = [
{
'candidate_id': i,
'name': v['name'],
'entity_type': v.get('entity_type', ''),
'summary': v.get('summary', '') or v.get('description', ''),
}
for i, v in enumerate(existing.values())
]
resolved_indices, unresolved_indices = resolve_with_similarity(
extracted, existing_list
)
logger.info(
'Deterministic dedup: %d resolved to existing, %d need LLM',
len(resolved_indices), len(unresolved_indices),
)
resolved_set = set(resolved_indices)
unresolved_set = set(unresolved_indices)
kept = [
e for i, e in enumerate(extracted)
if '_resolved_to' not in e
]
if unresolved_indices:
unresolved = [extracted[i] for i in unresolved_indices]
llm_resolved = resolve_entities_against_graph(
unresolved, existing_list, episode_content=text,
)
kept.extend(llm_resolved)
return kept
except Exception as exc:
logger.warning('Entity dedup failed, keeping all extracted: %s', exc)
return extracted
def _dedup_facts(self, facts: list, data_dict: dict) -> list:
resolved = []
for fact in facts:
try:
source = fact.get('source_entity_name', '')
target = fact.get('target_entity_name', '')
fact_text = fact.get('fact', '')
exact_existing = graph_store.get_facts_between(source, target)
hybrid_candidates = graph_store.search_related_facts_hybrid(
fact_text, limit=5
) if fact_text else []
invalidation_candidates = [
c for c in hybrid_candidates
if c.get('fact', '') not in {e.get('fact', '') for e in exact_existing}
]
all_existing = list(exact_existing)
seen_facts = {e.get('fact', '') for e in exact_existing}
for c in hybrid_candidates:
cf = c.get('fact', '')
if cf and cf not in seen_facts:
all_existing.append(c)
seen_facts.add(cf)
if not all_existing:
resolved.append(fact)
continue
result = resolve_facts_against_graph(
fact, all_existing, invalidation_candidates
)
if not isinstance(result, dict):
resolved.append(fact)
continue
if result.get('is_duplicate'):
logger.debug('Skipped duplicate fact: %s', fact.get('fact', ''))
continue
if result.get('is_contradicted'):
contradicted_indices = result.get('contradicted_facts', [])
for idx in contradicted_indices:
if idx < len(all_existing):
target_fact = all_existing[idx]
graph_store.mark_relation_expired(
source_name=target_fact.get('source_name', source),
target_name=target_fact.get('target_name', target),
relation_type=target_fact.get('relation_type', ''),
)
logger.info(
'Invalidated %d contradicted facts for: %s',
len(contradicted_indices), fact.get('fact', ''),
)
resolved.append(fact)
except Exception as exc:
logger.warning('Fact dedup failed, keeping: %s', exc)
resolved.append(fact)
return resolved
def _finish_pipeline(
self, data_dict: dict, content_hash: str, text: str,
force: bool, interactive: bool, report: Callable, total_steps: int,
) -> Optional[str]:
data_dict['_content_hash'] = content_hash
data_dict['_graph_meeting_id'] = graph_store.meeting_id(data_dict)
report(4, total_steps, '检查标题和日期重复')
should_skip = self._handle_duplicate(data_dict, force=force, interactive=interactive)
if should_skip:
return None
meeting_title = data_dict.get('title', '')
meeting_date = data_dict.get('date', '')
report(5, total_steps, '归档原始会议文本')
raw_path = raw_meeting_store.save(text, title=meeting_title, date=meeting_date)
data_dict['_original_text'] = text
data_dict['_original_text_path'] = raw_path
meeting_filename = f'{graph_store.meeting_id(data_dict)}.md'
report(6, total_steps, '合并行动项和指标状态')
data_dict['action_items'] = state_store.merge_action_items(
data_dict.get('action_items', []), meeting_title, meeting_date, meeting_filename,
)
data_dict['metrics'] = state_store.merge_metrics(
data_dict.get('metrics', []), meeting_title, meeting_date, meeting_filename,
)
state_store.add_content_hash(content_hash, meeting_title, meeting_date, meeting_filename)
state_store.save()
report(7, total_steps, '写入 Neo4j 图谱和检索数据')
graph_store.upsert_meeting_subgraph(data_dict)
logger.info('Meeting processed: %s', meeting_title)
return raw_path
def _handle_duplicate(self, data_dict: dict, force: bool, interactive: bool = True) -> bool:
title = data_dict.get('title', '')
date = data_dict.get('date', '')
existing = graph_store.get_meeting(title, date)
if not existing:
return False
if force:
logger.info('Duplicate meeting found; overwriting in force mode: %s', title)
self._remove_old(data_dict, existing)
return False
if not interactive:
logger.info('Skipped duplicate meeting in non-interactive mode: %s', title)
return True
print(f'\n发现重复会议:{title} ({date})')
while True:
choice = input('选择 [s]跳过 / [o]覆盖(默认 s').strip().lower() or 's'
if choice == 's':
logger.info('Skipped duplicate meeting: %s', title)
return True
if choice == 'o':
self._remove_old(data_dict, existing)
return False
print('请输入 s 或 o。')
def _remove_old(self, data_dict: dict, existing: Optional[dict] = None) -> None:
meeting_id = graph_store.meeting_id(data_dict)
graph_store.remove_meeting_subgraph(meeting_id)
new_hash = data_dict.get('_content_hash', '')
if new_hash:
state_store.remove_content_hash(new_hash)
if existing:
old_hash = existing.get('content_hash', '')
if old_hash and old_hash != new_hash:
state_store.remove_content_hash(old_hash)
logger.info('Removed old meeting artifacts: %s', data_dict.get('title', ''))
def _compute_content_hash(self, text: str) -> str:
normalized = text.strip().replace('\r\n', '\n')
return hashlib.sha256(normalized.encode('utf-8')).hexdigest()
def _extract_monolithic(self, text: str, *, stream: bool = True) -> Optional[MeetingExtraction]:
try:
return monolithic_extract(text, stream=stream)
except Exception as exc:
logger.error('LLM extraction failed: %s', exc)
return None
def query(self, question: str, top_k: int = 3) -> str:
return graph_store.format_search_context(question, top_k=top_k)
def stats(self) -> dict:
return {
'graph': graph_store.get_stats(),
'state': state_store.get_stats(),
'raw_dir': config.storage.raw_dir,
'state_path': config.state_path,
}
meeting_processor = MeetingProcessor()

View File

@ -0,0 +1,187 @@
import hashlib
import json
import logging
import os
import re
from typing import List, Optional
logger = logging.getLogger(__name__)
def _item_id(task: str, assignee: str) -> str:
raw = f"{task}|{assignee}"
return hashlib.md5(raw.encode("utf-8")).hexdigest()[:8]
def _metric_id(metric_name: str, owner: str) -> str:
raw = f"{metric_name}|{owner}"
return hashlib.md5(raw.encode("utf-8")).hexdigest()[:8]
class MeetingStateStore:
def __init__(self, state_path: str):
self.state_path = state_path
self._state = self._load()
def _load(self) -> dict:
if os.path.exists(self.state_path):
try:
with open(self.state_path, "r", encoding="utf-8") as f:
return json.load(f)
except Exception as exc:
logger.warning("Failed to load state file, creating a new one: %s", exc)
return {
"action_items": {},
"metrics": {},
"meeting_series": {},
"content_hashes": {},
}
def save(self):
os.makedirs(os.path.dirname(self.state_path), exist_ok=True)
with open(self.state_path, "w", encoding="utf-8") as f:
json.dump(self._state, f, ensure_ascii=False, indent=2)
def _ensure_series(self, meeting_title: str, meeting_date: str) -> str:
series_name = self._detect_series(meeting_title)
series = self._state["meeting_series"].get(series_name)
if not series:
series = {"latest_date": meeting_date, "processed_titles": []}
self._state["meeting_series"][series_name] = series
if meeting_date > series.get("latest_date", ""):
series["latest_date"] = meeting_date
if meeting_title not in series["processed_titles"]:
series["processed_titles"].append(meeting_title)
return series_name
def _detect_series(self, title: str) -> str:
cleaned = re.sub(r"\uFF08\d{4}\u7B2C\w+\u671F\uFF09", "", title)
cleaned = re.sub(r"\(\d{4}\u7B2C\w+\u671F\)", "", cleaned)
cleaned = re.sub(r"\d{4}\u7B2C\w+\u671F", "", cleaned)
cleaned = re.sub(r"\d{4}\u5E74\u7B2C\w+\u6B21", "", cleaned)
cleaned = cleaned.strip("-_ ")
return cleaned or title
def merge_action_items(
self,
new_items: List[dict],
meeting_title: str,
meeting_date: str,
meeting_filename: str,
) -> List[dict]:
series_name = self._ensure_series(meeting_title, meeting_date)
merged = []
for item in new_items:
task = item.get("task", "")
assignee = item.get("assignee", "")
iid = _item_id(task, assignee)
history_entry = {
"date": meeting_date,
"meeting": meeting_filename,
"status": item.get("status", "待办"),
"priority": item.get("priority", ""),
"deadline": item.get("deadline", ""),
}
existing = self._state["action_items"].get(iid)
if existing:
existing["history"].append(history_entry)
existing["latest"] = history_entry
latest = existing["history"][-1]
item["_item_id"] = iid
item["_history"] = list(existing["history"])
item["status"] = latest["status"]
item["priority"] = latest["priority"]
item["deadline"] = latest["deadline"]
else:
self._state["action_items"][iid] = {
"item_id": iid,
"task": task,
"assignee": assignee,
"series": series_name,
"created_meeting": meeting_filename,
"history": [history_entry],
"latest": history_entry,
}
item["_item_id"] = iid
item["_history"] = [history_entry]
merged.append(item)
return merged
def merge_metrics(
self,
new_metrics: List[dict],
meeting_title: str,
meeting_date: str,
meeting_filename: str,
) -> List[dict]:
merged = []
for metric in new_metrics:
metric_name = metric.get("metric_name", "")
owner = metric.get("owner", "")
mid = _metric_id(metric_name, owner)
history_entry = {
"date": meeting_date,
"meeting": meeting_filename,
"value": metric.get("value", ""),
"target": metric.get("target", ""),
"trend": metric.get("trend", ""),
}
existing = self._state["metrics"].get(mid)
if existing:
existing["history"].append(history_entry)
existing["latest"] = history_entry
metric["_metric_id"] = mid
metric["_history"] = list(existing["history"])
else:
self._state["metrics"][mid] = {
"metric_id": mid,
"metric_name": metric_name,
"owner": owner,
"history": [history_entry],
"latest": history_entry,
}
metric["_metric_id"] = mid
metric["_history"] = [history_entry]
merged.append(metric)
return merged
def get_action_item_history(self, item_id: str) -> Optional[dict]:
return self._state["action_items"].get(item_id)
def get_metric_history(self, metric_id: str) -> Optional[dict]:
return self._state["metrics"].get(metric_id)
def get_series_info(self, title: str) -> Optional[dict]:
series_name = self._detect_series(title)
return self._state["meeting_series"].get(series_name)
def has_content_hash(self, content_hash: str) -> bool:
return content_hash in self._state["content_hashes"]
def add_content_hash(self, content_hash: str, title: str, date: str, filename: str):
self._state["content_hashes"][content_hash] = {
"title": title,
"date": date,
"filename": filename,
}
def remove_content_hash(self, content_hash: str):
self._state["content_hashes"].pop(content_hash, None)
def get_stats(self) -> dict:
return {
"action_items_tracked": len(self._state["action_items"]),
"metrics_tracked": len(self._state["metrics"]),
"meeting_series": len(self._state["meeting_series"]),
"content_hashes": len(self._state["content_hashes"]),
}

View File

@ -1,38 +0,0 @@
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any
@dataclass(slots=True)
class ExtractedMeeting:
meeting_id: str
meeting_date: str
source_path: str
title: str = ""
overview: str = ""
summary: list[str] = field(default_factory=list)
projects: list[dict[str, Any]] = field(default_factory=list)
metrics: list[dict[str, Any]] = field(default_factory=list)
action_items: list[dict[str, Any]] = field(default_factory=list)
risks: list[dict[str, Any]] = field(default_factory=list)
completed_items: list[dict[str, Any]] = field(default_factory=list)
followups: list[dict[str, Any]] = field(default_factory=list)
evidence: list[dict[str, str]] = field(default_factory=list)
def to_dict(self) -> dict[str, Any]:
return {
"meeting_id": self.meeting_id,
"meeting_date": self.meeting_date,
"source_path": self.source_path,
"title": self.title,
"overview": self.overview,
"summary": self.summary,
"projects": self.projects,
"metrics": self.metrics,
"action_items": self.action_items,
"risks": self.risks,
"completed_items": self.completed_items,
"followups": self.followups,
"evidence": self.evidence,
}

View File

@ -0,0 +1,5 @@
from .extract_nodes import extract_entities
from .extract_edges import extract_facts
from .dedupe_nodes import resolve_entities
from .dedupe_edges import resolve_facts
from .summarize_nodes import summarize_entity

View File

@ -0,0 +1,49 @@
from typing import Any
def resolve_facts(context: dict[str, Any]) -> list[dict]:
existing_facts = context.get('existing_facts', [])
new_fact = context.get('new_fact', '')
invalidation_candidates = context.get('invalidation_candidates', [])
existing_text = '\n'.join(
f' [idx={i}] {f.get("fact", "")}' for i, f in enumerate(existing_facts)
)
invalidation_text = '\n'.join(
f' [idx={i + len(existing_facts)}] {f.get("fact", "")}'
for i, f in enumerate(invalidation_candidates)
)
user_prompt = f"""
<已有事实>
{existing_text}
</已有事实>
<事实失效候选>
{invalidation_text}
</事实失效候选>
<新事实>
{new_fact}
</新事实>
注意idx 编号是连续的已有事实从 0 开始失效候选紧随其后
任务
1. **重复检测**如果<新事实><已有事实>中的某条描述的是完全相同的客观事实返回该 idx
2. **矛盾检测**如果<新事实><已有事实><失效候选>中的某条相互矛盾如状态已更新数值已变更返回该 idx
返回格式
{{"duplicate_facts": [idx列表], "contradicted_facts": [idx列表]}}
如果没有重复或矛盾返回空列表
示例
- 新事实"张三负责宽带运维项目" vs 已有"张三负责宽带运维" 重复相同事实
- 新事实"宽带用户数当前值 8500" vs 已有"宽带用户数目标值 10000" 不重复不矛盾数值维度不同
- 新事实"宽带用户数当前值 9000" vs 已有"宽带用户数 8000" 矛盾同一指标数值更新
"""
return [
{'role': 'system', 'content': '你是事实去重和矛盾检测助手。判断新事实与已有事实的关系。'},
{'role': 'user', 'content': user_prompt},
]

View File

@ -0,0 +1,49 @@
from typing import Any
def resolve_entities(context: dict[str, Any]) -> list[dict]:
extracted = context.get('extracted_entities', [])
existing = context.get('existing_entities', [])
episode_content = context.get('episode_content', '')
extracted_text = '\n'.join(
f' [{i}] {e.get("name", "")}{e.get("entity_type", "未知")}{e.get("description", "")}'
for i, e in enumerate(extracted)
)
existing_text = '\n'.join(
f' [candidate_id={c.get("candidate_id", i)}] {c.get("name", "")}{c.get("entity_type", "未知")}{c.get("summary", "")[:100]}'
for i, c in enumerate(existing)
)
user_prompt = f"""
<当前会议内容>
{episode_content}
</当前会议内容>
<新抽取的实体>
{extracted_text}
</新抽取的实体>
<图谱中已有的实体>
{existing_text}
</图谱中已有的实体>
任务判断<新抽取的实体>中的每一个是否与<图谱中已有的实体>中的某个是同一个真实世界对象
判断标准
- **是重复**两个名称指向同一个真实世界的人组织地点项目指标等
- **不是重复**名称相似但指向不同实体如两个同名但不同的人同名的不同项目
对每个新抽取的实体返回
- id: 对应新抽取实体列表中的序号
- name: 实体的最佳名称优先使用已有实体中的更完整名称
- duplicate_candidate_id: 匹配到的已有实体的 candidate_id如果无匹配则填 -1
返回格式 JSON 数组[{{"id": 0, "name": "张三", "duplicate_candidate_id": -1}}, ...]
必须为新抽取的每个实体返回一条记录id 0 开始连续编号
"""
return [
{'role': 'system', 'content': '你是实体去重助手。判断两个实体是否指向同一个真实世界对象。'},
{'role': 'user', 'content': user_prompt},
]

View File

@ -0,0 +1,66 @@
from typing import Any
def extract_facts(context: dict[str, Any]) -> list[dict]:
previous = context.get('previous_episodes', [])
current = context.get('episode_content', '')
entities = context.get('entities', [])
reference_time = context.get('reference_time', '')
previous_section = ''
if previous:
import json
previous_section = f'\n<历史上下文>\n{json.dumps(previous, ensure_ascii=False)}\n</历史上下文>\n'
entities_text = '\n'.join(
f' [{i}] {e.get("name", "")}{e.get("entity_type", "未知")}' for i, e in enumerate(entities)
)
user_prompt = f"""
{previous_section}
<当前会议内容>
{current}
</当前会议内容>
<已抽取实体>
{entities_text}
</已抽取实体>
<参考时间>
{reference_time}
</参考时间>
抽取规则
1. <当前会议内容>中抽取上述<已抽取实体>之间的**事实关系**
2. 每条关系必须涉及两个**不同**的实体
3. 返回 JSON 数组格式
[{{
"source_entity_name": "源实体名称(必须来自上方的实体列表)",
"target_entity_name": "目标实体名称(必须来自上方的实体列表)",
"relation_type": "关系类型,如 负责、汇报、隶属于、参与、目标值、截止于、影响、依赖于",
"fact": "一句自然语言的事实描述,保留原文中所有具体细节(数值、时间、地点等)",
"valid_at": "该事实开始成立的时间ISO 8601格式如 2025-04-30T00:00:00Z不明确则留空",
"invalid_at": "该事实不再成立的时间,不明确则留空",
"evidence": "原文中的关键证据短句",
"qualifiers": ["限定条件列表,如数值、范围、状态、截止时间等"],
"confidence": 置信度0到1之间
}}]
4. relation_type 避免使用"关联""涉及"等空泛词优先使用具体谓词
负责汇报目标值当前值低于高于要求督导推进支撑依赖计划完成截止于参与隶属于分管协调审批
5. 层次关系结构隶属使用以下固定 relation_type
HAS_PROJECT: 部门管辖项目Department -> Project
HAS_METRIC: 项目拥有指标Project -> Metric
PART_OF: 实体属于某个上级实体
6. 同一对实体之间可能既有层次关系HAS_PROJECT也有事实关系负责汇报需要分别抽取
7. fact 必须是一句完整的自然语言事实保留所有具体信息人名数值产品名地点等
8. 如果根据上下文可以判断事实的开始/结束时间填入 valid_at / invalid_at
"""
return [
{'role': 'system', 'content': '你是一个专业的事实关系抽取专家。从会议记录中抽取实体间的结构化事实关系。'},
{'role': 'user', 'content': user_prompt},
]

View File

@ -0,0 +1,56 @@
from typing import Any
SYSTEM_PROMPT = (
'你是会议纪要实体抽取专家。'
'从会议记录中抽取明确的实体节点包括部门Department、项目Project、指标Metric、人物Person、系统System、文档Document等。'
'不要抽取抽象概念、情感、时间日期或泛泛的名词。'
)
def extract_entities(context: dict[str, Any]) -> list[dict]:
previous = context.get('previous_episodes', [])
current = context.get('episode_content', '')
entity_types = context.get('entity_types', [])
entity_types_section = ''
if entity_types:
entity_types_section = '\n'.join(
f' - {t["type"]}: {t["description"]}' for t in entity_types
)
else:
entity_types_section = ' - 未限定类型,请根据上下文自行判断'
previous_section = ''
if previous:
import json
previous_section = f'\n<历史上下文>\n{json.dumps(previous, ensure_ascii=False)}\n</历史上下文>\n'
user_prompt = f"""
{previous_section}
<当前会议内容>
{current}
</当前会议内容>
<实体类型>
{entity_types_section}
</实体类型>
抽取规则
1. 只抽取当前会议内容中**明确提及**的实体
2. 每个实体必须是有唯一标识的具体事物人名组织名地名项目名指标名称等
3. 不要抽取代词抽象概念增长改善风险时间日期
4. 如果同一实体在不同来源中以不同名称出现如简称/全称保留最完整的形式
5. 必须返回 JSON 数组格式[{{"name": "实体名称", "entity_type": "类型", "description": "描述", "evidence": "原文证据"}}]
6. description 写一段对该实体的简要描述20字以内
7. evidence 从原文中摘录提及该实体的关键短句
注意实体类型建议使用 Department部门Project项目Metric指标Person人物System系统Document文档请确保
- 部门Department会议中提到的具体部门名称"技术部""市场部"
- 项目Project部门负责的具体项目名称
- 指标Metric项目中提到的具体量化指标"响应时间""完成率"
"""
return [
{'role': 'system', 'content': SYSTEM_PROMPT},
{'role': 'user', 'content': user_prompt},
]

View File

@ -0,0 +1,41 @@
from typing import Any
def summarize_entity(context: dict[str, Any]) -> list[dict]:
entity_name = context.get('entity_name', '')
existing_summary = context.get('existing_summary', '')
episodes = context.get('episodes', [])
previous = context.get('previous_episodes', [])
existing_section = ''
if existing_summary:
existing_section = f'\n<已有摘要>\n{existing_summary}\n</已有摘要>\n'
previous_section = ''
if previous:
import json
previous_section = f'\n<历史内容>\n{json.dumps(previous, ensure_ascii=False)}\n</历史内容>\n'
episodes_text = '\n---\n'.join(episodes) if isinstance(episodes, list) else episodes
user_prompt = f"""
{previous_section}
<当前内容>
{episodes_text}
</当前内容>
{existing_section}
为实体 **{entity_name}** 生成一段信息密集的摘要
规则
1. 只使用<当前内容><已有摘要>中的事实不要推测
2. 保留所有实质性的人名角色地点日期数值
3. 用第三人称直接陈述事实
4. 不要使用"提及了""讨论了""指出"等元语言动词直接陈述事实
5. 如果会议对已有信息做了更新采用更新的说法
6. 摘要不超过 500
7. 返回 JSON{{"summary": "摘要内容"}}
"""
return [
{'role': 'system', 'content': '你是实体摘要助手。根据会议内容为实体生成信息密集的摘要。'},
{'role': 'user', 'content': user_prompt},
]

View File

@ -0,0 +1,55 @@
import logging
import os
from datetime import datetime
from meeting_memory.config import config
logger = logging.getLogger(__name__)
def _sanitize_filename(name: str) -> str:
if not name:
return "untitled"
invalid = '<>:"/\\|?*'
for char in invalid:
name = name.replace(char, "")
name = name.replace(" ", "_").strip("._")
return name or "untitled"
class RawMeetingStore:
def __init__(self):
self.raw_dir = config.storage.raw_dir
os.makedirs(self.raw_dir, exist_ok=True)
def save(self, text: str, title: str = "", date: str = "") -> str:
os.makedirs(self.raw_dir, exist_ok=True)
date_str = date or datetime.now().strftime("%Y-%m-%d")
safe_date = _sanitize_filename(date_str)[:40]
safe_title = _sanitize_filename(title)[:60]
filename = f"{safe_date}_{safe_title}.md"
filepath = os.path.join(self.raw_dir, filename)
content = "\n".join(
[
"---",
f'title: "{title}"',
f'date: "{date_str}"',
"status: archived",
"---",
"",
f"# {title or 'Untitled Meeting'}",
"",
text,
"",
]
)
with open(filepath, "w", encoding="utf-8") as f:
f.write(content)
logger.info("Saved raw meeting text: %s", filepath)
return filepath
raw_meeting_store = RawMeetingStore()

View File

@ -0,0 +1,142 @@
import logging
import re
from collections import Counter
from typing import Any, Dict, List
logger = logging.getLogger(__name__)
def _cosine_similarity(left: List[float], right: List[float]) -> float:
if not left or not right or len(left) != len(right):
return 0.0
dot = sum(a * b for a, b in zip(left, right))
left_norm = sum(a * a for a in left) ** 0.5
right_norm = sum(b * b for b in right) ** 0.5
if left_norm == 0 or right_norm == 0:
return 0.0
return dot / (left_norm * right_norm)
def _keyword_terms(text: str) -> List[str]:
normalized = (text or '').lower()
raw_terms = re.findall(r'[a-z0-9]+|[\u4e00-\u9fff]{2,}', normalized)
stopwords = {'是什么', '多少', '分别', '以及', '还有', '当前值', '目标值'}
terms: List[str] = []
for raw in raw_terms:
if raw in stopwords:
continue
if raw not in terms:
terms.append(raw)
if re.fullmatch(r'[\u4e00-\u9fff]{4,}', raw):
for size in (2, 3, 4):
for idx in range(0, len(raw) - size + 1):
piece = raw[idx: idx + size]
if piece not in stopwords and piece not in terms:
terms.append(piece)
return terms
def reciprocal_rank_fusion(
ranked_lists: List[List[Dict[str, Any]]],
k: int = 60,
key: str = 'text',
) -> List[Dict[str, Any]]:
if not ranked_lists:
return []
scores: Dict[str, Dict] = {}
for ranked in ranked_lists:
for rank, item in enumerate(ranked):
item_id = item.get(key, str(id(item)))
if item_id not in scores:
enriched = dict(item)
enriched['_fusion_scores'] = []
scores[item_id] = {'item': enriched, 'score': 0.0}
scores[item_id]['score'] += 1.0 / (k + rank + 1)
scores[item_id]['item']['_fusion_scores'].append(1.0 / (k + rank + 1))
sorted_items = sorted(
scores.values(),
key=lambda x: (-x['score'], x['item'].get('title', '')),
)
results = []
for entry in sorted_items:
item = entry['item']
item['score'] = round(entry['score'], 4)
results.append(item)
return results
def _keyword_score(text: str, question: str) -> float:
source = (text or '').lower()
terms = _keyword_terms(question)
if not source or not terms:
return 0.0
hits = sum(1 for term in terms if term in source)
return hits / len(terms)
def bm25_score(
query_terms: List[str],
doc_freqs: Dict[str, int],
doc_term_counts: Dict[str, int],
doc_length: float,
avg_doc_length: float,
total_docs: int,
k1: float = 1.5,
b: float = 0.75,
) -> float:
score = 0.0
for term in query_terms:
tf = doc_term_counts.get(term, 0)
if tf == 0:
continue
df = doc_freqs.get(term, 1)
idf = ((total_docs - df + 0.5) / (df + 0.5) + 1.0)
if idf <= 0:
continue
tf_norm = (tf * (k1 + 1)) / (tf + k1 * (1 - b + b * doc_length / avg_doc_length))
score += idf * tf_norm
return score
def search_bfs(
seed_entities: List[str],
get_edges_fn,
max_depth: int = 2,
max_nodes: int = 50,
) -> Dict[str, Any]:
if not seed_entities:
return {'nodes': [], 'edges': [], 'paths': []}
visited = set(seed_entities)
queue = [(entity, 0) for entity in seed_entities]
collected_edges: List[Dict] = []
all_nodes = set(seed_entities)
while queue:
current, depth = queue.pop(0)
if depth >= max_depth:
continue
edges = get_edges_fn(current)
for edge in edges:
source = edge.get('source_name', '')
target = edge.get('target_name', '')
edge_key = f'{source}->{target}:{edge.get("relation_type", "")}'
if edge_key not in {f'{e.get("source_name","")}->{e.get("target_name","")}:{e.get("relation_type","")}' for e in collected_edges}:
collected_edges.append(edge)
for neighbor in (source, target):
if neighbor and neighbor != current and neighbor not in visited:
visited.add(neighbor)
all_nodes.add(neighbor)
if len(all_nodes) < max_nodes:
queue.append((neighbor, depth + 1))
if len(all_nodes) >= max_nodes:
break
return {
'nodes': list(all_nodes),
'edges': collected_edges,
}

View File

@ -1,57 +1,79 @@
from __future__ import annotations
from datetime import date
from pathlib import Path
from typing import Any, Callable
from .io_utils import read_text_auto
from .ledger import LedgerStore
from .meeting_digest_agent import MeetingDigestAgent
SubagentProgressCallback = Callable[[dict[str, Any]], None]
CancelCheck = Callable[[], bool]
from meeting_memory.config import config
from meeting_memory.graph_store import graph_store
from meeting_memory.meeting_processor import meeting_processor
class MeetingKnowledgeService:
def __init__(self, data_dir: str | Path = "data", *, workspace: str | Path | None = None) -> None:
self.store = LedgerStore(data_dir)
self.workspace = Path(workspace or Path.cwd()).resolve()
self.digest_agent = MeetingDigestAgent(self.workspace)
ProgressCallback = Callable[[dict[str, Any]], None]
def import_meeting_transcript(
class MeetingMemoryService:
"""Expose the imported meeting-memory pipeline behind app-friendly methods."""
def __init__(self, workspace: str | Path | None = None) -> None:
self.workspace = Path(workspace).resolve() if workspace else None
def store_meeting_memory(
self,
*,
file_path: str,
knowledge_base_id: str | None = None,
team_id: str | None = None,
meeting_date: str | None = None,
progress_callback: SubagentProgressCallback | None = None,
should_cancel: CancelCheck | None = None,
file_path: str | None = None,
content: str | None = None,
force: bool = False,
progress_callback: ProgressCallback | None = None,
use_multistep_extraction: bool = True,
) -> dict[str, Any]:
text = read_text_auto(file_path)
library_id = knowledge_base_id or team_id or "default_team"
digest = self.digest_agent.analyze(
transcript_text=text,
source_path=file_path,
meeting_date=meeting_date or date.today().isoformat(),
knowledge_base_id=library_id,
existing_context=self.store.build_agent_context(library_id),
progress_callback=progress_callback,
should_cancel=should_cancel,
source_path = (file_path or "").strip()
text = (content or "").strip()
if not source_path and not text:
raise ValueError("Either file_path or content is required.")
resolved_path: Path | None = None
if source_path:
resolved_path = Path(source_path).expanduser().resolve()
text = resolved_path.read_text(encoding="utf-8")
def report(step: int, total: int, message: str) -> None:
if callable(progress_callback):
progress_callback(
{
"step": step,
"total_steps": total,
"message": message,
}
)
archive_path = meeting_processor.process_meeting_text(
text,
force=force,
interactive=False,
progress_callback=report,
use_multistep_extraction=use_multistep_extraction,
)
return self.store.import_meeting(library_id, digest)
return {
"success": bool(archive_path),
"stored": bool(archive_path),
"archive_path": archive_path,
"source_file_path": str(resolved_path) if resolved_path else None,
"data_dir": config.storage.data_dir,
"raw_dir": config.storage.raw_dir,
"graph_enabled": graph_store.enabled,
"stats": meeting_processor.stats(),
"message": "Meeting memory stored." if archive_path else "Meeting skipped or processing failed.",
}
def query_knowledge(
self,
*,
query: str,
knowledge_base_id: str | None = None,
team_id: str | None = None,
limit: int = 8,
) -> dict[str, Any]:
library_id = knowledge_base_id or team_id or "default_team"
return self.store.query(library_id, query, limit=limit)
def query_meeting_memory(self, *, query: str, top_k: int = 5) -> dict[str, Any]:
return {
"success": True,
"query": query,
"top_k": top_k,
"answer": meeting_processor.query(query, top_k=top_k),
"graph_enabled": graph_store.enabled,
"stats": meeting_processor.stats(),
}
def list_knowledge_bases(self) -> dict[str, Any]:
bases = self.store.list_team_ids()
return {"knowledge_bases": bases, "count": len(bases)}
def stats(self) -> dict[str, Any]:
return meeting_processor.stats()

View File

@ -0,0 +1,3 @@
from meeting_memory.services.embedding_service import EmbeddingService, embedding_service
__all__ = ["EmbeddingService", "embedding_service"]

View File

@ -0,0 +1,29 @@
from typing import List, Optional
from openai import OpenAI as OpenAIClient
from meeting_memory.config import config
class EmbeddingService:
def __init__(
self,
model: Optional[str] = None,
api_key: Optional[str] = None,
api_base: Optional[str] = None,
):
self._client = OpenAIClient(
api_key=api_key or config.embedding.api_key or "not-needed",
base_url=api_base or config.embedding.api_base or None,
)
self._model = model or config.embedding.model
def embed_text(self, text: str) -> List[float]:
response = self._client.embeddings.create(
model=self._model,
input=text,
)
return response.data[0].embedding
embedding_service = EmbeddingService()

View File

@ -1,20 +1,10 @@
system: |
你是一个会议知识库维护 Agent专门维护“周会经营台账”
system: |
你是一个会议长期记忆维护 Agent
你必须优先使用工具来导入和查询知识,而不是凭空猜测。
当前 demo 约定:
- data/ 下的每个子文件夹就是一个知识库。
- 用户指定知识库时,把文件夹名作为 knowledge_base_id 传给工具。
- 用户未指定知识库时,可以省略 knowledge_base_id如果 data/ 下只有一个知识库,工具会自动使用它。
- 用户询问有哪些知识库/库列表时,调用 list_knowledge_bases。
- team_id 是旧兼容参数,优先使用 knowledge_base_id。
工作原则:
- 用户要求导入会议转录时,调用 import_meeting_transcript。
- 导入不是简单摘抄,而是把“单次会议原文”交给会议整理子 Agent更新到该知识库唯一的项目台账 Markdown。
- 同一知识库下,每次只处理一份会议原文;如果会议提到历史项目,就更新原有项目状态;如果出现新项目,就新增项目条目。
- 用户询问项目状态、指标、待办、风险、完成情况、决策、追踪项或最近会议内容时,调用 query_knowledge。
- 回答要短,说明工具结果、台账路径、关键命中和下一步。
- 用户要求导入、保存、记录、归档会议原文时,调用 store_meeting_memory。
- `store_meeting_memory` 会把会议原文写入长期记忆系统,并抽取会议实体、事实关系、行动项、指标和原文归档。
- 用户询问最近会议、项目状态、关键决策、指标变化、待办、风险、人物关系或历史上下文时,调用 query_meeting_memory。
- 回答要短,说明工具结果、归档路径、关键命中和下一步。
- ASR 内容可能有口语、方言、错字和未知说话人,引用时要保留“待确认”的态度。

View File

@ -1,92 +0,0 @@
system: |
你是一个专门负责"会议整理"的子 Agent。
你的唯一任务,是把单次会议原文整理成结构化 JSON供主系统把内容更新进同一个项目 Markdown 台账。
你的工作目标:
- 输入是一份会议原文,以及该知识库当前已有的历史台账摘要。
- 你必须先理解"历史里已经有哪些项目",优先复用已有项目名称。
- 如果本次会议提到了历史项目,就把它识别成同一个项目,并输出新的状态、进展、待办、风险、完成项等更新。
- 如果本次会议出现了历史中没有的新项目,再新增项目。
- 只保留真正重要、可执行、可追踪的信息,不要把整段口语照搬进结果。
- 不确定的信息可以写"待确认",但不要编造。
输出要求:
- 只能输出一个 JSON 对象,不要输出解释,不要加 Markdown 代码块。
- 所有字段尽量给出中文内容。
- 如果某个数组没有内容,返回空数组。
instructions: |
你输出的 JSON 必须严格遵循以下结构:
{
"meeting_title": "本次会议标题,无法判断可为空",
"overview": "对本次会议最重要的整体概括1-2句话",
"summary": [
"3-6条高价值摘要每条一句话"
],
"projects": [
{
"name": "项目名,优先复用历史命名",
"aliases": ["本次会议里提到的别名,可为空"],
"status": "项目当前状态,如推进中/阻塞/待启动/已完成/待确认",
"summary": "该项目这次会议的核心更新",
"progress": "进度说明,简短清晰",
"keywords": ["便于检索的关键词"],
"decisions": [
"本次会议已明确的决策"
],
"next_steps": [
"下一步动作"
],
"metrics": [
{
"name": "指标名",
"value": "当前值",
"target": "目标值",
"status": "达标/未达标/改善中/待确认等",
"trend": "上升/下降/持平/待确认",
"note": "必要补充",
"evidence": "简短来源片段,可为空"
}
],
"actions": [
{
"title": "待办事项",
"owner": "责任人或责任部门",
"deadline": "时间要求,没有就空",
"status": "待跟进/进行中/已完成/待确认",
"next_step": "下一步动作",
"evidence": "简短来源片段,可为空"
}
],
"risks": [
{
"title": "风险或问题",
"status": "关注中/阻塞/待确认/已缓解等",
"impact": "影响描述",
"owner": "责任人或责任部门",
"next_step": "当前处理动作",
"deadline": "时间要求,没有就空",
"evidence": "简短来源片段,可为空"
}
],
"completed": [
{
"title": "已完成事项",
"status": "已完成/已关闭",
"owner": "责任人或责任部门",
"evidence": "简短来源片段,可为空"
}
],
"evidence": [
"支持该项目判断的关键原文短句最多3-5条"
]
}
]
}
额外约束:
- 不要把整个会议标题、整段发言、长串 ASR 内容直接塞进 title/summary/action/risk。
- 优先抽取:项目更新、负责人、截止时间、指标变化、风险、决策、下一步。
- 如果会议内容与历史项目高度相似但叫法略有变化,优先归并到历史项目。
请根据下方的历史台账摘要和会议原文,输出 JSON

View File

@ -6,7 +6,7 @@ from pathlib import Path
def load_prompt(name: str, *, language: str = "zh") -> dict[str, str]:
path = Path(__file__).resolve().parent.parent / "prompt" / language / f"{name}.yaml"
if not path.exists():
return {"system": "你是一个会议知识库维护 Agent。"}
return {"system": "你是一个会议长期记忆维护 Agent。"}
text = path.read_text(encoding="utf-8-sig")
return _parse_yaml_sections(text)

View File

@ -1,4 +1,4 @@
from __future__ import annotations
from __future__ import annotations
import json
import re
@ -16,31 +16,17 @@ class RuleBasedMeetingProvider(AgentProvider):
if last.get("role") == "tool":
return AssistantTurn(content=_summarize_tool_result(last.get("name", ""), last.get("content", "{}")))
user_message = _last_user_message(messages)
knowledge_base_id = _extract_knowledge_base_id(user_message)
path = _extract_path(user_message)
if _looks_like_list_bases(user_message):
return AssistantTurn(
tool_calls=[ToolCall(id=f"call_{uuid.uuid4().hex}", name="list_knowledge_bases", arguments={})]
)
if _looks_like_import(user_message) and path:
args: dict[str, Any] = {"file_path": path}
if knowledge_base_id:
args["knowledge_base_id"] = knowledge_base_id
meeting_date = _extract_date(user_message)
if meeting_date:
args["meeting_date"] = meeting_date
return AssistantTurn(
tool_calls=[ToolCall(id=f"call_{uuid.uuid4().hex}", name="import_meeting_transcript", arguments=args)]
tool_calls=[ToolCall(id=f"call_{uuid.uuid4().hex}", name="store_meeting_memory", arguments={"file_path": path})]
)
query_args: dict[str, Any] = {"query": user_message, "limit": 8}
if knowledge_base_id:
query_args["knowledge_base_id"] = knowledge_base_id
return AssistantTurn(
tool_calls=[
ToolCall(
id=f"call_{uuid.uuid4().hex}",
name="query_knowledge",
arguments=query_args,
name="query_meeting_memory",
arguments={"query": user_message, "top_k": 5},
)
]
)
@ -51,21 +37,15 @@ def _summarize_tool_result(tool_name: str, raw: str) -> str:
data = json.loads(raw)
except json.JSONDecodeError:
return raw
if tool_name == "import_meeting_transcript":
counts = data.get("counts", {})
if tool_name == "store_meeting_memory":
return (
"已导入会议并更新知识库周会台账。\n"
f"- knowledge_base_id: {data.get('knowledge_base_id') or data.get('team_id')}\n"
f"- meeting_id: {data.get('meeting_id')}\n"
f"- ledger: {data.get('ledger_path')}\n"
f"- 抽取:指标 {counts.get('metrics', 0)},待办 {counts.get('actions', 0)},风险 {counts.get('risks', 0)}"
f"完成事项 {counts.get('completed', 0)},追踪项 {counts.get('followups', 0)}"
"已尝试写入会议长期记忆。\n"
f"- stored: {data.get('stored')}\n"
f"- archive: {data.get('archive_path')}\n"
f"- graph_enabled: {data.get('graph_enabled')}"
)
if tool_name == "query_knowledge":
if tool_name == "query_meeting_memory":
return data.get("answer", json.dumps(data, ensure_ascii=False, indent=2))
if tool_name == "list_knowledge_bases":
bases = data.get("knowledge_bases", [])
return "当前已有知识库:" + ("".join(bases) if bases else "暂无")
return json.dumps(data, ensure_ascii=False, indent=2)
@ -77,7 +57,7 @@ def _last_user_message(messages: List[Dict[str, Any]]) -> str:
def _looks_like_import(text: str) -> bool:
return any(word in text for word in ("导入", "读取", "更新", "import")) and bool(_extract_path(text))
return any(word in text for word in ("导入", "读取", "保存", "记录", "归档", "import")) and bool(_extract_path(text))
def _extract_path(text: str) -> str:
@ -86,18 +66,3 @@ def _extract_path(text: str) -> str:
return quoted.group(1)
match = re.search(r"([A-Za-z]:\\[^\s]+?\.(?:txt|md))", text)
return match.group(1) if match else ""
def _extract_knowledge_base_id(text: str) -> str | None:
match = re.search(r"(?:knowledge_base_id|知识库|库|team_id|团队|team)[:= ]*([A-Za-z0-9_\-\u4e00-\u9fff]+)", text)
return match.group(1) if match else None
def _extract_date(text: str) -> str:
match = re.search(r"\d{4}-\d{2}-\d{2}", text)
return match.group(0) if match else ""
def _looks_like_list_bases(text: str) -> bool:
return any(word in text for word in ("有哪些知识库", "知识库列表", "列出知识库", "可用知识库", "有哪些库"))

View File

@ -1,4 +1,6 @@
openai>=1.0.0
pydantic>=2.0.0
python-dotenv>=1.0.0
fastapi>=0.115.0
uvicorn>=0.30.0
neo4j>=5.26.0

View File

@ -1,15 +1,13 @@
from __future__ import annotations
from pathlib import Path
from .general_tools import build_general_registry
from .meeting_tools import build_meeting_registry
from .registry import ToolRegistry
def build_default_registry(data_dir: str | Path = "data") -> ToolRegistry:
def build_default_registry(data_dir: str | None = None) -> ToolRegistry:
registry = ToolRegistry()
for source in (build_general_registry(), build_meeting_registry(data_dir=data_dir)):
for source in (build_general_registry(), build_meeting_registry()):
for definition in source.definitions():
function = definition["function"]
name = function["name"]

View File

@ -1,115 +1,81 @@
from __future__ import annotations
from __future__ import annotations
from pathlib import Path
from typing import Any
from meeting_memory.service import MeetingKnowledgeService
from meeting_memory.service import MeetingMemoryService
from .registry import ToolContext, ToolRegistry
def build_meeting_registry(data_dir: str | Path = "data") -> ToolRegistry:
def build_meeting_registry() -> ToolRegistry:
registry = ToolRegistry()
meeting_service = MeetingKnowledgeService(data_dir=data_dir, workspace=Path.cwd())
meeting_service = MeetingMemoryService()
registry.register(
name="import_meeting_transcript",
description="导入一份会议原文交给会议整理子Agent按项目梳理重点内容并把结果更新到指定知识库的同一份Markdown项目台账",
name="store_meeting_memory",
description="将会议原文写入长期会议记忆系统,抽取实体、事实、指标、行动项,并同步归档与图谱索引",
parameters={
"type": "object",
"properties": {
"file_path": {"type": "string", "description": "会议转录文本文件路径。"},
"knowledge_base_id": {
"file_path": {
"type": "string",
"description": "知识库ID对应 data/ 下的文件夹名。用户未说明时可以省略;工具会在只有一个知识库时自动使用该库",
"description": "会议转录文本文件路径。与 content 二选一;优先使用 file_path",
},
"team_id": {
"content": {
"type": "string",
"description": "兼容旧参数:等同于 knowledge_base_id。新代码优先使用 knowledge_base_id。",
"description": "直接传入会议原文内容。适用于用户在消息里直接给出文本。",
},
"force": {
"type": "boolean",
"description": "是否在发现重复会议时强制覆盖。默认 false。",
},
"use_multistep_extraction": {
"type": "boolean",
"description": "是否使用多阶段抽取流程。默认 true。",
},
"meeting_date": {"type": "string", "description": "会议日期YYYY-MM-DD可省略。"},
},
"required": ["file_path"],
"additionalProperties": False,
},
handler=lambda ctx, args: _import_meeting(meeting_service, ctx, args),
handler=lambda ctx, args: _store_meeting_memory(meeting_service, ctx, args),
)
registry.register(
name="query_knowledge",
description="查询指定知识库项目台账中的项目状态、待办、风险、指标、决策、完成事项和最近会议更新",
name="query_meeting_memory",
description="查询长期会议记忆中的最近会议、实体关系、关键事实、指标变化、待办事项和上下文摘要",
parameters={
"type": "object",
"properties": {
"query": {"type": "string", "description": "用户查询。"},
"knowledge_base_id": {
"type": "string",
"description": "知识库ID对应 data/ 下的文件夹名。用户未说明时可以省略;工具会在只有一个知识库时自动使用该库。",
},
"team_id": {
"type": "string",
"description": "兼容旧参数:等同于 knowledge_base_id。新代码优先使用 knowledge_base_id。",
},
"limit": {"type": "integer", "description": "最多返回多少条命中。", "minimum": 1, "maximum": 20},
"top_k": {"type": "integer", "description": "最多返回多少条候选上下文。", "minimum": 1, "maximum": 20},
},
"required": ["query"],
"additionalProperties": False,
},
handler=lambda ctx, args: _query_knowledge(meeting_service, ctx, args),
)
registry.register(
name="list_knowledge_bases",
description="列出 data/ 下已有的知识库ID用于让用户或Web界面选择要查询的库。",
parameters={"type": "object", "properties": {}, "additionalProperties": False},
handler=lambda ctx, args: _list_knowledge_bases(meeting_service, ctx, args),
handler=lambda ctx, args: _query_meeting_memory(meeting_service, ctx, args),
)
return registry
def _resolve_knowledge_base_arg(ctx: ToolContext, args: dict[str, Any]) -> str:
return str(
args.get("knowledge_base_id")
or args.get("team_id")
or ctx.session.get("last_knowledge_base_id")
or ctx.session.get("last_team_id")
or "default_team"
)
def _import_meeting(service: MeetingKnowledgeService, ctx: ToolContext, args: dict[str, Any]) -> dict[str, Any]:
file_path = str(args["file_path"])
knowledge_base_id = _resolve_knowledge_base_arg(ctx, args)
meeting_date = args.get("meeting_date")
def _store_meeting_memory(service: MeetingMemoryService, ctx: ToolContext, args: dict[str, Any]) -> dict[str, Any]:
live_event_sink = ctx.session.get("_live_event_sink")
def progress_callback(payload: dict[str, Any]) -> None:
if callable(live_event_sink):
live_event_sink({"kind": "subagent_progress", "payload": payload})
def should_cancel() -> bool:
return bool(ctx.session.get("_cancel_requested"))
if bool(ctx.session.get("_cancel_requested")):
raise RuntimeError("Agent run cancelled by user.")
result = service.import_meeting_transcript(
file_path=file_path,
knowledge_base_id=knowledge_base_id,
meeting_date=meeting_date,
result = service.store_meeting_memory(
file_path=args.get("file_path"),
content=args.get("content"),
force=bool(args.get("force", False)),
progress_callback=progress_callback,
should_cancel=should_cancel,
use_multistep_extraction=bool(args.get("use_multistep_extraction", True)),
)
ctx.session["last_knowledge_base_id"] = result.get("knowledge_base_id") or knowledge_base_id
ctx.session["last_team_id"] = result.get("knowledge_base_id") or knowledge_base_id
ctx.session["last_ledger_path"] = result.get("ledger_path")
ctx.session["last_memory_archive_path"] = result.get("archive_path")
return result
def _query_knowledge(service: MeetingKnowledgeService, ctx: ToolContext, args: dict[str, Any]) -> dict[str, Any]:
knowledge_base_id = _resolve_knowledge_base_arg(ctx, args)
limit = int(args.get("limit") or 8)
result = service.query_knowledge(query=str(args["query"]), knowledge_base_id=knowledge_base_id, limit=limit)
if result.get("knowledge_base_id"):
ctx.session["last_knowledge_base_id"] = result["knowledge_base_id"]
ctx.session["last_team_id"] = result["knowledge_base_id"]
def _query_meeting_memory(service: MeetingMemoryService, ctx: ToolContext, args: dict[str, Any]) -> dict[str, Any]:
result = service.query_meeting_memory(query=str(args["query"]), top_k=int(args.get("top_k") or 5))
ctx.session["last_memory_query"] = str(args["query"])
return result
def _list_knowledge_bases(service: MeetingKnowledgeService, ctx: ToolContext, args: dict[str, Any]) -> dict[str, Any]:
return service.list_knowledge_bases()

View File

@ -27,7 +27,7 @@ from core_agent.config import (
require_model_config,
)
from core_agent.session import ConversationSession
from meeting_memory.service import MeetingKnowledgeService
from meeting_memory.service import MeetingMemoryService
from providers.openai_compatible import OpenAICompatibleProvider
from providers.rule_based import RuleBasedMeetingProvider
from tools.default_tools import build_default_registry
@ -38,6 +38,7 @@ DATA_DIR = WORKSPACE / "data"
FRONTEND_DIST_DIR = WORKSPACE / "webui" / "dist"
SIDEBAR_STATE_PATH = WORKSPACE / ".webui_sidebar_state.json"
UPLOADS_DIR = WORKSPACE / "agent_memory" / "uploads"
MEMORY_CARD_ID = "meeting-memory"
def now_iso() -> str:
@ -234,12 +235,7 @@ class ChatStore:
record.session.session_state["workspace"] = str(WORKSPACE.resolve())
def ensure_seed_records(self) -> None:
meeting_service = MeetingKnowledgeService(self._data_dir, workspace=WORKSPACE)
for knowledge_base_id in meeting_service.list_knowledge_bases().get("knowledge_bases", []):
chat_id = self._seed_chat_id(knowledge_base_id)
with self._lock:
if chat_id not in self._records:
self._create_record(chat_id, knowledge_base_id, title=knowledge_base_id)
return
def list_records(self) -> list[ChatRecord]:
self.ensure_seed_records()
@ -437,42 +433,34 @@ def build_agent(force_offline: bool = False) -> tuple[CoreAgent, Any, str]:
def list_knowledge_base_cards(data_dir: Path) -> list[dict[str, Any]]:
cards: list[dict[str, Any]] = []
if not data_dir.exists():
return cards
for path in sorted(data_dir.iterdir()):
if not path.is_dir():
continue
state_path = path / "state.json"
if not state_path.exists():
continue
state = json.loads(state_path.read_text(encoding="utf-8"))
meetings = state.get("meetings", [])
latest = meetings[-1] if meetings else {}
cards.append(
{
"knowledge_base_id": path.name,
"title": path.name,
"meeting_count": len(meetings),
"last_meeting_date": state.get("last_meeting_date", ""),
"updated_at": state.get("last_updated", ""),
"latest_summary": latest.get("summary", [])[:3],
"counts": {
"metrics": len(state.get("current_metrics", [])),
"actions": len(state.get("open_actions", [])),
"risks": len(state.get("risks", [])),
"followups": len(state.get("followups", [])),
},
}
)
return cards
service = MeetingMemoryService(workspace=WORKSPACE)
stats = service.stats()
graph = stats.get("graph", {})
state = stats.get("state", {})
return [
{
"knowledge_base_id": MEMORY_CARD_ID,
"title": "Meeting Memory",
"meeting_count": int(graph.get("meetings", 0) or 0),
"last_meeting_date": "",
"updated_at": now_iso(),
"latest_summary": [
f"Meetings {graph.get('meetings', 0)}",
f"Entities {graph.get('entities', 0)}",
f"Action items {state.get('action_items_tracked', 0)}",
],
"counts": {
"metrics": int(state.get("metrics_tracked", 0) or 0),
"actions": int(state.get("action_items_tracked", 0) or 0),
"risks": 0,
"followups": 0,
},
}
]
def default_knowledge_base_id(data_dir: Path) -> str | None:
cards = list_knowledge_base_cards(data_dir)
if len(cards) == 1:
return cards[0]["knowledge_base_id"]
return None
return MEMORY_CARD_ID
def _read_auth_token(request: Request, authorization: str | None = Header(default=None)) -> str | None: