"""Tool-registry wiring for the meeting-knowledge tools.

Registers three tools backed by a single ``MeetingKnowledgeService``:
``import_meeting_transcript``, ``query_knowledge`` and
``list_knowledge_bases``.
"""

from __future__ import annotations

from pathlib import Path
from typing import Any

from meeting_memory.service import MeetingKnowledgeService

from .registry import ToolContext, ToolRegistry

# Number of hits requested from the service when the caller omits ``limit``
# (or passes a falsy value such as 0).
_DEFAULT_QUERY_LIMIT = 8
# Bounds advertised for ``limit`` in the ``query_knowledge`` JSON schema; the
# handler clamps to the same range so schema and behavior cannot drift apart.
_MIN_QUERY_LIMIT = 1
_MAX_QUERY_LIMIT = 20


def _knowledge_base_properties() -> dict[str, dict[str, str]]:
    """Return the schema fragments for the knowledge-base selector arguments.

    Both ``import_meeting_transcript`` and ``query_knowledge`` accept the same
    ``knowledge_base_id`` / legacy ``team_id`` pair. A fresh dict is built on
    every call so the two tool schemas never share mutable state.
    """
    return {
        "knowledge_base_id": {
            "type": "string",
            "description": "知识库ID,对应 data/ 下的文件夹名。用户未说明时可以省略;工具会在只有一个知识库时自动使用该库。",
        },
        "team_id": {
            "type": "string",
            "description": "兼容旧参数:等同于 knowledge_base_id。新代码优先使用 knowledge_base_id。",
        },
    }


def build_meeting_registry(data_dir: str | Path = "data") -> ToolRegistry:
    """Build a ``ToolRegistry`` exposing the meeting-knowledge tools.

    Args:
        data_dir: Directory handed to ``MeetingKnowledgeService`` as its
            ``data_dir``. The service's ``workspace`` is the current working
            directory at the time this function is called.

    Returns:
        A new registry with ``import_meeting_transcript``,
        ``query_knowledge`` and ``list_knowledge_bases`` registered. All three
        handlers share one service instance.
    """
    registry = ToolRegistry()
    meeting_service = MeetingKnowledgeService(data_dir=data_dir, workspace=Path.cwd())

    registry.register(
        name="import_meeting_transcript",
        description="导入一份会议原文,交给会议整理子Agent按项目梳理重点内容,并把结果更新到指定知识库的同一份Markdown项目台账。",
        parameters={
            "type": "object",
            "properties": {
                "file_path": {"type": "string", "description": "会议转录文本文件路径。"},
                **_knowledge_base_properties(),
                "meeting_date": {"type": "string", "description": "会议日期,YYYY-MM-DD,可省略。"},
            },
            "required": ["file_path"],
            "additionalProperties": False,
        },
        handler=lambda ctx, args: _import_meeting(meeting_service, ctx, args),
    )

    registry.register(
        name="query_knowledge",
        description="查询指定知识库项目台账中的项目状态、待办、风险、指标、决策、完成事项和最近会议更新。",
        parameters={
            "type": "object",
            "properties": {
                "query": {"type": "string", "description": "用户查询。"},
                **_knowledge_base_properties(),
                "limit": {
                    "type": "integer",
                    "description": "最多返回多少条命中。",
                    "minimum": _MIN_QUERY_LIMIT,
                    "maximum": _MAX_QUERY_LIMIT,
                },
            },
            "required": ["query"],
            "additionalProperties": False,
        },
        handler=lambda ctx, args: _query_knowledge(meeting_service, ctx, args),
    )

    registry.register(
        name="list_knowledge_bases",
        description="列出 data/ 下已有的知识库ID,用于让用户或Web界面选择要查询的库。",
        parameters={"type": "object", "properties": {}, "additionalProperties": False},
        handler=lambda ctx, args: _list_knowledge_bases(meeting_service, ctx, args),
    )

    return registry


def _resolve_knowledge_base_arg(ctx: ToolContext, args: dict[str, Any]) -> str:
    """Pick the knowledge-base id for a tool call.

    The first truthy value wins, in this order: the ``knowledge_base_id``
    argument, the legacy ``team_id`` argument, the session's
    ``last_knowledge_base_id``, the session's ``last_team_id``, and finally
    the literal ``"default_team"``. The result is always coerced to ``str``.
    """
    # NOTE(review): the tool descriptions promise that an omitted id resolves
    # to the only existing knowledge base. This function always hands the
    # service a concrete id ("default_team" at worst) - confirm the service
    # treats that value as "unspecified".
    return str(
        args.get("knowledge_base_id")
        or args.get("team_id")
        or ctx.session.get("last_knowledge_base_id")
        or ctx.session.get("last_team_id")
        or "default_team"
    )


def _remember_knowledge_base(ctx: ToolContext, knowledge_base_id: Any) -> None:
    """Record the knowledge base just used under both session keys.

    ``last_team_id`` mirrors ``last_knowledge_base_id`` because
    ``_resolve_knowledge_base_arg`` still consults the legacy key.
    """
    ctx.session["last_knowledge_base_id"] = knowledge_base_id
    ctx.session["last_team_id"] = knowledge_base_id


def _import_meeting(service: MeetingKnowledgeService, ctx: ToolContext, args: dict[str, Any]) -> dict[str, Any]:
    """Handle ``import_meeting_transcript``.

    Forwards the transcript path, resolved knowledge-base id and optional
    meeting date to ``service.import_meeting_transcript``, together with two
    callbacks bound to the session: one relaying progress payloads and one
    reporting whether cancellation was requested.

    Args:
        service: Service that performs the import.
        ctx: Tool context; its ``session`` mapping is read and updated.
        args: Tool arguments. ``file_path`` is required.

    Returns:
        The service result, unchanged.

    Raises:
        KeyError: If ``args`` has no ``file_path`` entry.
    """
    file_path = str(args["file_path"])
    knowledge_base_id = _resolve_knowledge_base_arg(ctx, args)
    # The sink is looked up once, before the import starts; it may be absent.
    live_event_sink = ctx.session.get("_live_event_sink")

    def progress_callback(payload: dict[str, Any]) -> None:
        # Progress is dropped silently when no callable sink is installed.
        if callable(live_event_sink):
            live_event_sink({"kind": "subagent_progress", "payload": payload})

    def should_cancel() -> bool:
        # Read on every call so a flag set during the import is observed.
        return bool(ctx.session.get("_cancel_requested"))

    result = service.import_meeting_transcript(
        file_path=file_path,
        knowledge_base_id=knowledge_base_id,
        meeting_date=args.get("meeting_date"),
        progress_callback=progress_callback,
        should_cancel=should_cancel,
    )

    # Prefer the id reported by the service; fall back to the one we sent.
    _remember_knowledge_base(ctx, result.get("knowledge_base_id") or knowledge_base_id)
    ctx.session["last_ledger_path"] = result.get("ledger_path")
    return result


def _query_knowledge(service: MeetingKnowledgeService, ctx: ToolContext, args: dict[str, Any]) -> dict[str, Any]:
    """Handle ``query_knowledge``.

    Args:
        service: Service that executes the query.
        ctx: Tool context; its ``session`` mapping is read and updated.
        args: Tool arguments. ``query`` is required; ``limit`` defaults to
            ``_DEFAULT_QUERY_LIMIT`` when missing or falsy and is clamped to
            the range declared in the tool schema.

    Returns:
        The service result, unchanged.

    Raises:
        KeyError: If ``args`` has no ``query`` entry.
        ValueError: If ``limit`` cannot be converted with ``int()``.
    """
    knowledge_base_id = _resolve_knowledge_base_arg(ctx, args)
    requested_limit = int(args.get("limit") or _DEFAULT_QUERY_LIMIT)
    # Enforce the schema's bounds here as well: nothing in this module
    # guarantees that arguments were validated before reaching the handler.
    limit = max(_MIN_QUERY_LIMIT, min(_MAX_QUERY_LIMIT, requested_limit))

    result = service.query_knowledge(
        query=str(args["query"]),
        knowledge_base_id=knowledge_base_id,
        limit=limit,
    )

    # Session state is only touched when the service reports which knowledge
    # base it actually answered from.
    resolved_id = result.get("knowledge_base_id")
    if resolved_id:
        _remember_knowledge_base(ctx, resolved_id)
    return result


def _list_knowledge_bases(service: MeetingKnowledgeService, ctx: ToolContext, args: dict[str, Any]) -> dict[str, Any]:
    """Handle ``list_knowledge_bases``.

    ``ctx`` and ``args`` are accepted only so the signature matches the other
    handlers; neither is used.

    Returns:
        Whatever ``service.list_knowledge_bases()`` returns.
    """
    return service.list_knowledge_bases()