meeting_memory/tools/meeting_tools.py

82 lines
3.3 KiB
Python

from __future__ import annotations
from typing import Any
from meeting_memory.service import MeetingMemoryService
from .registry import ToolContext, ToolRegistry
def build_meeting_registry() -> ToolRegistry:
registry = ToolRegistry()
meeting_service = MeetingMemoryService()
registry.register(
name="store_meeting_memory",
description="将会议原文写入长期会议记忆系统,抽取实体、事实、指标、行动项,并同步归档与图谱索引。",
parameters={
"type": "object",
"properties": {
"file_path": {
"type": "string",
"description": "会议转录文本文件路径。与 content 二选一;优先使用 file_path。",
},
"content": {
"type": "string",
"description": "直接传入会议原文内容。适用于用户在消息里直接给出文本。",
},
"force": {
"type": "boolean",
"description": "是否在发现重复会议时强制覆盖。默认 false。",
},
"use_multistep_extraction": {
"type": "boolean",
"description": "是否使用多阶段抽取流程。默认 true。",
},
},
"additionalProperties": False,
},
handler=lambda ctx, args: _store_meeting_memory(meeting_service, ctx, args),
)
registry.register(
name="query_meeting_memory",
description="查询长期会议记忆中的最近会议、实体关系、关键事实、指标变化、待办事项和上下文摘要。",
parameters={
"type": "object",
"properties": {
"query": {"type": "string", "description": "用户查询。"},
"top_k": {"type": "integer", "description": "最多返回多少条候选上下文。", "minimum": 1, "maximum": 20},
},
"required": ["query"],
"additionalProperties": False,
},
handler=lambda ctx, args: _query_meeting_memory(meeting_service, ctx, args),
)
return registry
def _store_meeting_memory(service: MeetingMemoryService, ctx: ToolContext, args: dict[str, Any]) -> dict[str, Any]:
live_event_sink = ctx.session.get("_live_event_sink")
def progress_callback(payload: dict[str, Any]) -> None:
if callable(live_event_sink):
live_event_sink({"kind": "subagent_progress", "payload": payload})
if bool(ctx.session.get("_cancel_requested")):
raise RuntimeError("Agent run cancelled by user.")
result = service.store_meeting_memory(
file_path=args.get("file_path"),
content=args.get("content"),
force=bool(args.get("force", False)),
progress_callback=progress_callback,
use_multistep_extraction=bool(args.get("use_multistep_extraction", True)),
)
ctx.session["last_memory_archive_path"] = result.get("archive_path")
return result
def _query_meeting_memory(service: MeetingMemoryService, ctx: ToolContext, args: dict[str, Any]) -> dict[str, Any]:
result = service.query_meeting_memory(query=str(args["query"]), top_k=int(args.get("top_k") or 5))
ctx.session["last_memory_query"] = str(args["query"])
return result