from __future__ import annotations import argparse import asyncio import hashlib import json import secrets import time import uuid from dataclasses import dataclass, field from datetime import datetime from pathlib import Path from threading import Lock from typing import Any from urllib.parse import unquote from fastapi import FastAPI, Header, HTTPException, Query, Request, WebSocket, WebSocketDisconnect from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import FileResponse, JSONResponse from fastapi.staticfiles import StaticFiles from agent import CoreAgent from core_agent.config import ( apply_compat_env_aliases, build_core_agent_config, load_core_agent_env, require_model_config, ) from core_agent.session import ConversationSession from meeting_memory.service import MeetingKnowledgeService from providers.openai_compatible import OpenAICompatibleProvider from providers.rule_based import RuleBasedMeetingProvider from tools.default_tools import build_default_registry WORKSPACE = Path(__file__).resolve().parent DATA_DIR = WORKSPACE / "data" FRONTEND_DIST_DIR = WORKSPACE / "webui" / "dist" SIDEBAR_STATE_PATH = WORKSPACE / ".webui_sidebar_state.json" UPLOADS_DIR = WORKSPACE / "agent_memory" / "uploads" def now_iso() -> str: return datetime.now().isoformat(timespec="seconds") def now_ms() -> int: return int(time.time() * 1000) def _history_to_ui_messages(history: list[dict[str, Any]]) -> list[dict[str, Any]]: messages: list[dict[str, Any]] = [] created_at = now_ms() for item in history: role = str(item.get("role", "")) content = str(item.get("content", "")) if role not in {"user", "assistant"} or not content: continue messages.append( { "id": uuid.uuid4().hex, "role": role, "content": content, "createdAt": created_at, "turnId": f"persisted_{len(messages)}", "turnPhase": "user" if role == "user" else "answer", "turnSeq": 0 if role == "user" else 99, } ) created_at += 1 return messages def split_session_key(key: str) -> tuple[str, str]: if ":" in key: channel, chat_id = key.split(":", 1) return channel, chat_id return "websocket", key def default_sidebar_state() -> dict[str, Any]: return { "schema_version": 1, "project_paths": [], "pinned_keys": [], "archived_keys": [], "title_overrides": {}, "project_name_overrides": {}, "tags_by_key": {}, "collapsed_groups": {}, "view": { "density": "comfortable", "show_previews": False, "show_timestamps": False, "show_archived": False, "sort": "updated_desc", }, "updated_at": now_iso(), } def _sanitize_upload_name(name: str, fallback_stem: str = "attachment") -> str: raw = Path(name or "").name if not raw: raw = f"{fallback_stem}.txt" safe = "".join(ch if ch not in '<>:"/\\|?*' else "_" for ch in raw).strip().strip(".") return safe or f"{fallback_stem}.txt" def _persist_text_attachments(chat_id: str, attachments: list[dict[str, Any]]) -> list[dict[str, Any]]: if not attachments: return [] target_dir = UPLOADS_DIR / chat_id target_dir.mkdir(parents=True, exist_ok=True) saved: list[dict[str, Any]] = [] for index, item in enumerate(attachments, start=1): name = _sanitize_upload_name(str(item.get("name") or f"attachment_{index}.txt")) content = str(item.get("content") or "") kind = str(item.get("kind") or "text/plain") stem = Path(name).stem or f"attachment_{index}" suffix = Path(name).suffix or ".txt" path = target_dir / f"{int(time.time() * 1000)}_{index}_{stem}{suffix}" path.write_text(content, encoding="utf-8") saved.append( { "name": name, "kind": kind, "path": str(path), "size": len(content.encode("utf-8")), } ) return saved def _augment_message_with_saved_attachments(content: str, saved_attachments: list[dict[str, Any]]) -> str: if not saved_attachments: return content lines = [content.rstrip(), "", "附加文本文件已保存,可直接读取或导入:"] for item in saved_attachments: lines.append(f"- {item['name']}: {item['path']}") return "\n".join(line for line in lines if line is not None) def _validate_workspace_scope(workspace_scope: dict[str, Any] | None) -> None: project_path = str((workspace_scope or {}).get("project_path") or "").strip() if not project_path or project_path.startswith("knowledge-base://"): return candidate = Path(project_path).expanduser().resolve() if not candidate.exists() or not candidate.is_dir(): raise ValueError("project_path must be an existing directory") @dataclass(slots=True) class ChatRecord: chat_id: str session: ConversationSession title: str = "" preview: str = "" knowledge_base_id: str | None = None created_at: str = field(default_factory=now_iso) updated_at: str = field(default_factory=now_iso) ui_messages: list[dict[str, Any]] = field(default_factory=list) workspace_scope: dict[str, Any] | None = None running: bool = False current_turn_id: str | None = None run_task: asyncio.Task[Any] | None = None @property def key(self) -> str: return f"websocket:{self.chat_id}" def to_summary(self) -> dict[str, Any]: return { "key": self.key, "created_at": self.created_at, "updated_at": self.updated_at, "title": self.title, "preview": self.preview, "run_started_at": int(time.time()) if self.running else None, "workspace_scope": self.workspace_scope, "knowledge_base_id": self.knowledge_base_id, } class ChatStore: def __init__(self, agent: CoreAgent, data_dir: Path) -> None: self._agent = agent self._data_dir = data_dir self._lock = Lock() self._records: dict[str, ChatRecord] = {} def _seed_chat_id(self, knowledge_base_id: str) -> str: digest = hashlib.md5(knowledge_base_id.encode("utf-8")).hexdigest()[:12] return f"kb_{digest}" def _default_preview(self, knowledge_base_id: str | None) -> str: if knowledge_base_id: return f"围绕知识库「{knowledge_base_id}」开始提问" return "" def _new_session(self, knowledge_base_id: str | None = None) -> ConversationSession: session_id = knowledge_base_id or f"chat_{uuid.uuid4().hex[:12]}" session = self._agent.new_session(session_id=session_id) if knowledge_base_id: session.session_state["last_knowledge_base_id"] = knowledge_base_id session.session_state["last_team_id"] = knowledge_base_id return session def _create_record(self, chat_id: str, knowledge_base_id: str | None, title: str = "") -> ChatRecord: session = self._new_session(knowledge_base_id or chat_id) record = ChatRecord( chat_id=chat_id, session=session, title=title or (knowledge_base_id or ""), preview=self._default_preview(knowledge_base_id), knowledge_base_id=knowledge_base_id, workspace_scope=None, ) record.ui_messages = _history_to_ui_messages(session.persisted_dialog_messages()) if record.ui_messages and not record.preview: record.preview = str(record.ui_messages[-1].get("content", ""))[:120] self._records[chat_id] = record return record def _apply_workspace_scope(self, record: ChatRecord, workspace_scope: dict[str, Any] | None) -> None: record.workspace_scope = workspace_scope project_path = str((workspace_scope or {}).get("project_path") or "").strip() if project_path and not project_path.startswith("knowledge-base://"): candidate = Path(project_path).expanduser().resolve() record.session.workspace = candidate record.session.session_state["workspace"] = str(candidate) else: record.session.workspace = WORKSPACE.resolve() record.session.session_state["workspace"] = str(WORKSPACE.resolve()) def ensure_seed_records(self) -> None: meeting_service = MeetingKnowledgeService(self._data_dir, workspace=WORKSPACE) for knowledge_base_id in meeting_service.list_knowledge_bases().get("knowledge_bases", []): chat_id = self._seed_chat_id(knowledge_base_id) with self._lock: if chat_id not in self._records: self._create_record(chat_id, knowledge_base_id, title=knowledge_base_id) def list_records(self) -> list[ChatRecord]: self.ensure_seed_records() with self._lock: return sorted( self._records.values(), key=lambda item: (item.updated_at, item.created_at, item.title), reverse=True, ) def create_chat(self, knowledge_base_id: str | None = None) -> ChatRecord: with self._lock: chat_id = f"chat_{uuid.uuid4().hex[:12]}" return self._create_record(chat_id, knowledge_base_id) def fork_chat(self, source_chat_id: str, before_user_index: int) -> ChatRecord: with self._lock: source = self._records[source_chat_id] fork = self._create_record( f"chat_{uuid.uuid4().hex[:12]}", source.knowledge_base_id, title=source.title, ) user_seen = 0 copied: list[dict[str, Any]] = [] for message in source.ui_messages: if message.get("role") == "user": if user_seen >= before_user_index: break user_seen += 1 copied.append(json.loads(json.dumps(message))) fork.ui_messages = copied fork.preview = source.preview return fork def get_by_chat_id(self, chat_id: str) -> ChatRecord: self.ensure_seed_records() with self._lock: if chat_id not in self._records: return self._create_record(chat_id, knowledge_base_id=None) return self._records[chat_id] def get_by_key(self, key: str) -> ChatRecord: _, chat_id = split_session_key(key) return self.get_by_chat_id(chat_id) def set_workspace_scope(self, chat_id: str, workspace_scope: dict[str, Any] | None) -> ChatRecord: record = self.get_by_chat_id(chat_id) self._apply_workspace_scope(record, workspace_scope) record.updated_at = now_iso() return record def set_knowledge_base(self, chat_id: str, knowledge_base_id: str | None) -> ChatRecord: record = self.get_by_chat_id(chat_id) cleaned = (knowledge_base_id or "").strip() or None record.knowledge_base_id = cleaned if cleaned: record.session.session_state["last_knowledge_base_id"] = cleaned record.session.session_state["last_team_id"] = cleaned else: record.session.session_state.pop("last_knowledge_base_id", None) record.session.session_state.pop("last_team_id", None) record.updated_at = now_iso() return record def append_user_message( self, record: ChatRecord, content: str, turn_id: str | None, *, media: list[dict[str, Any]] | None = None, ) -> None: text = content.strip() record.updated_at = now_iso() if text: record.preview = text[:120] if not record.title and not record.knowledge_base_id: record.title = text[:32] record.ui_messages.append( { "id": uuid.uuid4().hex, "role": "user", "content": content, "createdAt": now_ms(), "turnId": turn_id, "turnPhase": "user", "turnSeq": 0, **({"media": media} if media else {}), } ) def append_assistant_message(self, record: ChatRecord, content: str, turn_id: str | None, latency_ms: int) -> None: record.updated_at = now_iso() record.ui_messages.append( { "id": uuid.uuid4().hex, "role": "assistant", "content": content, "createdAt": now_ms(), "turnId": turn_id, "turnPhase": "answer", "turnSeq": 99, "latencyMs": latency_ms, } ) class GatewayState: def __init__(self, *, agent: CoreAgent, config: Any, mode: str) -> None: self.agent = agent self.config = config self.mode = mode self.chat_store = ChatStore(agent, DATA_DIR) self.token = secrets.token_urlsafe(24) self.sidebar_state = self._load_sidebar_state() self.socket_chats: dict[int, set[str]] = {} self.chat_sockets: dict[str, set[WebSocket]] = {} def _load_sidebar_state(self) -> dict[str, Any]: if SIDEBAR_STATE_PATH.exists(): try: return json.loads(SIDEBAR_STATE_PATH.read_text(encoding="utf-8")) except Exception: pass return default_sidebar_state() def save_sidebar_state(self, state: dict[str, Any]) -> dict[str, Any]: state["updated_at"] = now_iso() self.sidebar_state = state SIDEBAR_STATE_PATH.write_text(json.dumps(state, ensure_ascii=False, indent=2), encoding="utf-8") return state def register_socket_chat(self, websocket: WebSocket, chat_id: str) -> None: socket_key = id(websocket) self.socket_chats.setdefault(socket_key, set()).add(chat_id) self.chat_sockets.setdefault(chat_id, set()).add(websocket) def unregister_socket(self, websocket: WebSocket) -> None: socket_key = id(websocket) chat_ids = self.socket_chats.pop(socket_key, set()) for chat_id in chat_ids: sockets = self.chat_sockets.get(chat_id) if not sockets: continue sockets.discard(websocket) if not sockets: self.chat_sockets.pop(chat_id, None) async def send_json(self, websocket: WebSocket, payload: dict[str, Any]) -> None: await websocket.send_text(json.dumps(payload, ensure_ascii=False)) async def broadcast_chat(self, chat_id: str, payload: dict[str, Any]) -> None: sockets = list(self.chat_sockets.get(chat_id, set())) stale: list[WebSocket] = [] for websocket in sockets: try: await self.send_json(websocket, payload) except Exception: stale.append(websocket) for websocket in stale: self.unregister_socket(websocket) def build_provider(force_offline: bool = False): load_core_agent_env(WORKSPACE) apply_compat_env_aliases() config = build_core_agent_config() if force_offline: return RuleBasedMeetingProvider(), config, "offline" require_model_config(config) return ( OpenAICompatibleProvider( model=config.model, api_key=config.api_key, base_url=config.base_url, timeout=config.timeout, temperature=config.temperature, ), config, "openai-compatible", ) def build_agent(force_offline: bool = False) -> tuple[CoreAgent, Any, str]: provider, config, mode = build_provider(force_offline=force_offline) agent = CoreAgent( provider=provider, workspace=WORKSPACE, skill_dirs=[WORKSPACE / "skills"], tool_registry=build_default_registry(DATA_DIR), max_iterations=config.max_iterations, ) return agent, config, mode def list_knowledge_base_cards(data_dir: Path) -> list[dict[str, Any]]: cards: list[dict[str, Any]] = [] if not data_dir.exists(): return cards for path in sorted(data_dir.iterdir()): if not path.is_dir(): continue state_path = path / "state.json" if not state_path.exists(): continue state = json.loads(state_path.read_text(encoding="utf-8")) meetings = state.get("meetings", []) latest = meetings[-1] if meetings else {} cards.append( { "knowledge_base_id": path.name, "title": path.name, "meeting_count": len(meetings), "last_meeting_date": state.get("last_meeting_date", ""), "updated_at": state.get("last_updated", ""), "latest_summary": latest.get("summary", [])[:3], "counts": { "metrics": len(state.get("current_metrics", [])), "actions": len(state.get("open_actions", [])), "risks": len(state.get("risks", [])), "followups": len(state.get("followups", [])), }, } ) return cards def default_knowledge_base_id(data_dir: Path) -> str | None: cards = list_knowledge_base_cards(data_dir) if len(cards) == 1: return cards[0]["knowledge_base_id"] return None def _read_auth_token(request: Request, authorization: str | None = Header(default=None)) -> str | None: if authorization and authorization.lower().startswith("bearer "): return authorization.split(" ", 1)[1].strip() token = request.query_params.get("token") return token.strip() if token else None def _require_token(request: Request, gateway: GatewayState, authorization: str | None = Header(default=None)) -> None: token = _read_auth_token(request, authorization) if token != gateway.token: raise HTTPException(status_code=401, detail="Unauthorized") def _minimal_settings(gateway: GatewayState) -> dict[str, Any]: model_name = getattr(gateway.config, "model", "") or "" provider_name = "openai-compatible" if gateway.mode != "offline" else "offline" return { "agent": { "model": model_name, "provider": provider_name, "resolved_provider": provider_name, "has_api_key": bool(getattr(gateway.config, "api_key", "") or gateway.mode == "offline"), "model_preset": "default", "max_tokens": 0, "context_window_tokens": 0, "temperature": getattr(gateway.config, "temperature", 0.2), "reasoning_effort": None, "timezone": "Asia/Shanghai", "bot_name": "MeetingAgent", "bot_icon": "MK", "tool_hint_max_length": 160, }, "model_presets": [ { "name": "default", "label": "Default", "active": True, "is_default": True, "model": model_name, "provider": provider_name, "max_tokens": 0, "context_window_tokens": 0, "temperature": getattr(gateway.config, "temperature", 0.2), "reasoning_effort": None, "reasoning_effort_values": [], } ], "providers": [ { "name": provider_name, "label": provider_name, "configured": True, "auth_type": "api_key", "api_key_required": gateway.mode != "offline", "api_key_hint": "OPENAI_API_KEY", "api_base": getattr(gateway.config, "base_url", None), "default_api_base": getattr(gateway.config, "base_url", None), "model_selectable": True, "api_type": "chat_completions", } ], "web_search": {"provider": "none", "max_results": 5, "timeout": 20, "providers": []}, "web": {"enable": False, "search": {"max_results": 5, "timeout": 20}, "fetch": {"use_jina_reader": False}}, "image_generation": { "enabled": False, "provider": "none", "provider_configured": False, "model": "", "default_aspect_ratio": "1:1", "default_image_size": "1024x1024", "max_images_per_turn": 1, "save_dir": str(WORKSPACE / "generated_images"), "providers": [], }, "transcription": { "enabled": False, "provider": "none", "provider_configured": False, "model": "", "language": "zh", "max_duration_sec": 0, "max_upload_mb": 0, "providers": [], }, "runtime": { "config_path": str(WORKSPACE / ".env"), "workspace_path": str(WORKSPACE), "gateway_host": "127.0.0.1", "gateway_port": 8010, "heartbeat": {"enabled": False, "interval_s": 0, "keep_recent_messages": 0}, "dream": {"schedule": ""}, "unified_session": False, }, "advanced": { "restrict_to_workspace": True, "webui_allow_local_service_access": True, "allow_local_preview_access": True, "webui_default_access_mode": "default", "private_service_protection_enabled": False, "ssrf_whitelist_count": 0, "mcp_server_count": 0, "exec_enabled": False, "exec_sandbox": "workspace-write", "exec_path_prepend_set": False, "exec_path_append_set": False, }, "requires_restart": False, "runtime_surface": "browser", "runtime_capabilities": { "can_restart_engine": False, "can_pick_folder": False, "can_open_logs": False, "can_export_diagnostics": False, }, "version": {"current": "0.1.0"}, } def create_app(force_offline: bool = False) -> FastAPI: agent, config, mode = build_agent(force_offline=force_offline) gateway = GatewayState(agent=agent, config=config, mode=mode) app = FastAPI(title="Meeting Knowledge WebUI Gateway", version="0.2.0") app.state.gateway = gateway app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"], ) if FRONTEND_DIST_DIR.exists(): assets_dir = FRONTEND_DIST_DIR / "assets" brand_dir = FRONTEND_DIST_DIR / "brand" if assets_dir.exists(): app.mount("/assets", StaticFiles(directory=assets_dir), name="assets") if brand_dir.exists(): app.mount("/brand", StaticFiles(directory=brand_dir), name="brand") @app.get("/webui/bootstrap") def webui_bootstrap() -> dict[str, Any]: return { "token": gateway.token, "ws_path": "/ws", "expires_in": 24 * 3600, "model_name": getattr(config, "model", "") or None, "runtime_surface": "browser", "runtime_capabilities": { "can_restart_engine": False, "can_pick_folder": False, "can_open_logs": False, "can_export_diagnostics": False, }, } @app.get("/api/runtime") def runtime_info() -> dict[str, Any]: return { "mode": mode, "model_name": getattr(config, "model", "") or None, "default_knowledge_base_id": default_knowledge_base_id(DATA_DIR), "workspace": str(WORKSPACE), } @app.get("/api/knowledge-bases") def list_knowledge_bases() -> dict[str, Any]: cards = list_knowledge_base_cards(DATA_DIR) return { "knowledge_bases": cards, "default_knowledge_base_id": default_knowledge_base_id(DATA_DIR), } @app.get("/api/sessions") def list_sessions(request: Request, authorization: str | None = Header(default=None)) -> dict[str, Any]: _require_token(request, gateway, authorization) rows = [record.to_summary() for record in gateway.chat_store.list_records()] return {"sessions": rows} @app.get("/api/sessions/{session_key}/webui-thread") def fetch_webui_thread(session_key: str, request: Request, authorization: str | None = Header(default=None)) -> dict[str, Any]: _require_token(request, gateway, authorization) record = gateway.chat_store.get_by_key(unquote(session_key)) return { "schemaVersion": 1, "sessionKey": record.key, "savedAt": record.updated_at, "messages": record.ui_messages, "fork_boundary_message_count": None, "has_pending_tool_calls": record.running, "page": { "before_cursor": None, "has_more_before": False, "loaded_message_count": len(record.ui_messages), "total_known_message_count": len(record.ui_messages), "user_message_offset": 0, }, "workspace_scope": record.workspace_scope, } @app.get("/api/sessions/{session_key}/automations") def fetch_session_automations(session_key: str, request: Request, authorization: str | None = Header(default=None)) -> dict[str, Any]: _require_token(request, gateway, authorization) return {"jobs": []} @app.get("/api/webui/automations") def fetch_automations(request: Request, authorization: str | None = Header(default=None)) -> dict[str, Any]: _require_token(request, gateway, authorization) return {"jobs": []} @app.get("/api/webui/skills") def fetch_skills(request: Request, authorization: str | None = Header(default=None)) -> dict[str, Any]: _require_token(request, gateway, authorization) return {"skills": []} @app.get("/api/settings") def fetch_settings(request: Request, authorization: str | None = Header(default=None)) -> dict[str, Any]: _require_token(request, gateway, authorization) return _minimal_settings(gateway) @app.get("/api/settings/usage") def fetch_settings_usage(request: Request, authorization: str | None = Header(default=None)) -> dict[str, Any]: _require_token(request, gateway, authorization) return { "days": [], "total_tokens": 0, "total_tokens_30d": 0, "total_tokens_365d": 0, "peak_day_tokens": 0, "current_streak_days": 0, "longest_streak_days": 0, "active_days_30d": 0, "requests_30d": 0, "updated_at": now_iso(), } @app.get("/api/settings/version-check") def check_version(request: Request, authorization: str | None = Header(default=None)) -> dict[str, Any]: _require_token(request, gateway, authorization) return {"updateAvailable": None} @app.get("/api/settings/provider-models") def provider_models(request: Request, authorization: str | None = Header(default=None), provider: str = Query(default="openai-compatible")) -> dict[str, Any]: _require_token(request, gateway, authorization) model_name = getattr(config, "model", "") or "" return { "provider": provider, "label": provider, "status": "available", "catalog_kind": "local", "models": [{"id": model_name, "label": model_name, "owned_by": provider}], "model_count": 1 if model_name else 0, "fetched_at": int(time.time()), } @app.get("/api/settings/cli-apps") def cli_apps(request: Request, authorization: str | None = Header(default=None)) -> dict[str, Any]: _require_token(request, gateway, authorization) return {"apps": [], "installed_count": 0} @app.get("/api/settings/mcp-presets") def mcp_presets(request: Request, authorization: str | None = Header(default=None)) -> dict[str, Any]: _require_token(request, gateway, authorization) return {"presets": [], "installed_count": 0} @app.get("/api/commands") def list_commands(request: Request, authorization: str | None = Header(default=None)) -> dict[str, Any]: _require_token(request, gateway, authorization) return {"commands": []} @app.get("/api/workspaces") def workspaces(request: Request, authorization: str | None = Header(default=None)) -> dict[str, Any]: _require_token(request, gateway, authorization) return { "schema_version": 1, "default_access_mode": "default", "default_scope": { "project_path": str(WORKSPACE), "project_name": WORKSPACE.name, "access_mode": "restricted", "restrict_to_workspace": True, }, "controls": { "can_change_project": True, "can_use_full_access": False, }, } @app.get("/api/webui/sidebar-state") def fetch_sidebar_state(request: Request, authorization: str | None = Header(default=None)) -> dict[str, Any]: _require_token(request, gateway, authorization) return gateway.sidebar_state @app.get("/api/webui/sidebar-state/update") def update_sidebar_state( request: Request, state: str, authorization: str | None = Header(default=None), ) -> dict[str, Any]: _require_token(request, gateway, authorization) payload = json.loads(state) return gateway.save_sidebar_state(payload) @app.get("/api/sessions/{session_key}/delete") def delete_session(session_key: str, request: Request, authorization: str | None = Header(default=None)) -> dict[str, Any]: _require_token(request, gateway, authorization) record = gateway.chat_store.get_by_key(unquote(session_key)) with gateway.chat_store._lock: gateway.chat_store._records.pop(record.chat_id, None) return {"deleted": True} @app.websocket("/ws") async def websocket_endpoint(websocket: WebSocket, token: str = Query(default="")) -> None: if token != gateway.token: await websocket.close(code=4401) return await websocket.accept() client_id = uuid.uuid4().hex await gateway.send_json(websocket, {"event": "ready", "chat_id": "home", "client_id": client_id}) try: while True: raw = await websocket.receive_text() payload = json.loads(raw) frame_type = payload.get("type") if frame_type == "new_chat": workspace_scope = payload.get("workspace_scope") knowledge_base_id = str(payload.get("knowledge_base_id") or "").strip() or None try: _validate_workspace_scope(workspace_scope) except ValueError as exc: await gateway.send_json( websocket, { "event": "error", "chat_id": "home", "detail": "workspace_scope_rejected", "reason": str(exc), }, ) continue record = gateway.chat_store.create_chat(knowledge_base_id=knowledge_base_id) if workspace_scope: gateway.chat_store.set_workspace_scope(record.chat_id, workspace_scope) gateway.register_socket_chat(websocket, record.chat_id) await gateway.send_json(websocket, {"event": "attached", "chat_id": record.chat_id}) await gateway.broadcast_chat( record.chat_id, { "event": "session_updated", "chat_id": record.chat_id, "scope": "metadata", "workspace_scope": record.workspace_scope, "knowledge_base_id": record.knowledge_base_id, }, ) continue if frame_type == "fork_chat": source_chat_id = str(payload.get("source_chat_id") or "") before_user_index = int(payload.get("before_user_index") or 0) record = gateway.chat_store.fork_chat(source_chat_id, before_user_index) gateway.register_socket_chat(websocket, record.chat_id) await gateway.send_json(websocket, {"event": "attached", "chat_id": record.chat_id}) await gateway.broadcast_chat( record.chat_id, { "event": "session_updated", "chat_id": record.chat_id, "scope": "thread", "workspace_scope": record.workspace_scope, "knowledge_base_id": record.knowledge_base_id, }, ) continue if frame_type == "attach": chat_id = str(payload.get("chat_id") or "") record = gateway.chat_store.get_by_chat_id(chat_id) gateway.register_socket_chat(websocket, chat_id) await gateway.send_json(websocket, {"event": "attached", "chat_id": record.chat_id}) continue if frame_type == "set_workspace_scope": chat_id = str(payload.get("chat_id") or "") workspace_scope = payload.get("workspace_scope") try: record = gateway.chat_store.set_workspace_scope(chat_id, workspace_scope) except ValueError as exc: await gateway.broadcast_chat( chat_id, { "event": "error", "chat_id": chat_id, "detail": "workspace_scope_rejected", "reason": str(exc), }, ) continue await gateway.broadcast_chat( chat_id, { "event": "session_updated", "chat_id": chat_id, "scope": "metadata", "workspace_scope": record.workspace_scope, "knowledge_base_id": record.knowledge_base_id, }, ) continue if frame_type == "message": chat_id = str(payload.get("chat_id") or "") record = gateway.chat_store.get_by_chat_id(chat_id) gateway.register_socket_chat(websocket, chat_id) content = str(payload.get("content") or "") if content.strip() == "/stop": if record.running: record.session.request_cancel() else: await gateway.broadcast_chat( chat_id, { "event": "turn_end", "chat_id": chat_id, "latency_ms": 0, }, ) await gateway.broadcast_chat( chat_id, { "event": "goal_status", "chat_id": chat_id, "status": "idle", }, ) continue if record.running: await gateway.broadcast_chat( chat_id, { "event": "error", "chat_id": chat_id, "detail": "A turn is already running for this chat.", }, ) continue knowledge_base_id = str(payload.get("knowledge_base_id") or "").strip() or None attachments = payload.get("attachments") attachment_list = attachments if isinstance(attachments, list) else [] saved_attachments = _persist_text_attachments(chat_id, attachment_list) content_for_agent = _augment_message_with_saved_attachments(content, saved_attachments) user_media = [ { "kind": "file", "name": item.get("name"), } for item in saved_attachments ] if "knowledge_base_id" in payload: record = gateway.chat_store.set_knowledge_base(chat_id, knowledge_base_id) turn_id = payload.get("turn_id") or uuid.uuid4().hex gateway.chat_store.append_user_message( record, content, turn_id, media=user_media or None, ) await gateway.broadcast_chat( chat_id, { "event": "goal_status", "chat_id": chat_id, "status": "running", "started_at": time.time(), }, ) await gateway.broadcast_chat( chat_id, { "event": "session_updated", "chat_id": chat_id, "scope": "thread", "workspace_scope": record.workspace_scope, "knowledge_base_id": record.knowledge_base_id, }, ) record.run_task = asyncio.create_task(run_agent_turn(gateway, record, content_for_agent, turn_id)) continue except WebSocketDisconnect: gateway.unregister_socket(websocket) except Exception: gateway.unregister_socket(websocket) try: await websocket.close(code=1011) except Exception: pass @app.get("/{full_path:path}", response_model=None) def serve_frontend(full_path: str): if not FRONTEND_DIST_DIR.exists(): return JSONResponse( status_code=503, content={ "detail": "Meeting Knowledge WebUI build is missing. Run `npm install` and `npm run build` in frontend_nanobot first." }, ) index_path = FRONTEND_DIST_DIR / "index.html" return FileResponse(index_path) return app async def run_agent_turn(gateway: GatewayState, record: ChatRecord, content: str, turn_id: str) -> None: record.running = True record.current_turn_id = turn_id record.session.clear_cancel() if record.knowledge_base_id: record.session.session_state["last_knowledge_base_id"] = record.knowledge_base_id record.session.session_state["last_team_id"] = record.knowledge_base_id started_at = time.time() try: loop = asyncio.get_running_loop() queue: asyncio.Queue[object] = asyncio.Queue() sentinel = object() def live_event_sink(event: dict[str, Any]) -> None: loop.call_soon_threadsafe(queue.put_nowait, event) record.session.session_state["_live_event_sink"] = live_event_sink def produce_events() -> None: try: for event in record.session.stream_ask(content): loop.call_soon_threadsafe(queue.put_nowait, event) except Exception as exc: loop.call_soon_threadsafe(queue.put_nowait, exc) finally: loop.call_soon_threadsafe(queue.put_nowait, sentinel) producer_task = asyncio.create_task(asyncio.to_thread(produce_events)) tool_seq = 1 final_text = "" stream_open = False content_seen = False while True: item = await queue.get() if item is sentinel: break if isinstance(item, Exception): raise item if isinstance(item, dict) and item.get("kind") == "subagent_progress": payload = item.get("payload") or {} text = str(payload.get("text") or "") if text: await gateway.broadcast_chat( record.chat_id, { "event": "message", "chat_id": record.chat_id, "text": text, "kind": "progress", "agent_ui": { "kind": str(payload.get("kind") or "subagent_status"), "data": payload, }, "turn_id": turn_id, "turn_phase": "activity", "turn_seq": tool_seq, }, ) continue event = item if event.type == "reasoning_delta" and event.delta: await gateway.broadcast_chat( record.chat_id, { "event": "reasoning_delta", "chat_id": record.chat_id, "text": event.delta, "stream_id": f"{turn_id}:{event.iteration}", "turn_id": turn_id, "turn_phase": "reasoning", "turn_seq": max(1, event.iteration * 10 - 2), }, ) stream_open = True if gateway.mode == "offline": await asyncio.sleep(0.012) elif event.type == "content_delta" and event.delta: content_seen = True await gateway.broadcast_chat( record.chat_id, { "event": "delta", "chat_id": record.chat_id, "text": event.delta, "stream_id": f"{turn_id}:{event.iteration}", "turn_id": turn_id, "turn_phase": "answer", "turn_seq": max(1, event.iteration * 10 - 1), }, ) stream_open = True if gateway.mode == "offline": await asyncio.sleep(0.012) elif event.type == "assistant_turn": if stream_open: await gateway.broadcast_chat( record.chat_id, { "event": "reasoning_end", "chat_id": record.chat_id, "stream_id": f"{turn_id}:{event.iteration}", "turn_id": turn_id, "turn_phase": "reasoning", "turn_seq": max(1, event.iteration * 10), }, ) await gateway.broadcast_chat( record.chat_id, { "event": "stream_end", "chat_id": record.chat_id, "stream_id": f"{turn_id}:{event.iteration}", "text": event.turn_content or "", "turn_id": turn_id, "turn_phase": "answer", "turn_seq": max(1, event.iteration * 10 + 1), }, ) stream_open = False elif event.type == "tool_call": await gateway.broadcast_chat( record.chat_id, { "event": "message", "chat_id": record.chat_id, "text": "", "kind": "tool_hint", "tool_events": [ { "phase": "start", "call_id": f"{turn_id}:{tool_seq}", "name": event.tool_name, "arguments": event.tool_args, } ], "turn_id": turn_id, "turn_phase": "activity", "turn_seq": tool_seq, }, ) tool_seq += 1 elif event.type == "tool_result": result_obj: Any try: result_obj = json.loads(event.tool_result) except Exception: result_obj = event.tool_result await gateway.broadcast_chat( record.chat_id, { "event": "message", "chat_id": record.chat_id, "text": "", "kind": "progress", "tool_events": [ { "phase": "end", "call_id": f"{turn_id}:{max(tool_seq - 1, 1)}", "name": event.tool_name, "arguments": event.tool_args, "result": result_obj, } ], "turn_id": turn_id, "turn_phase": "activity", "turn_seq": tool_seq, }, ) elif event.type == "final": final_text = event.final_response or "" await producer_task latency_ms = max(1, int((time.time() - started_at) * 1000)) gateway.chat_store.append_assistant_message(record, final_text, turn_id, latency_ms) if not content_seen: await gateway.broadcast_chat( record.chat_id, { "event": "message", "chat_id": record.chat_id, "text": final_text, "turn_id": turn_id, "turn_phase": "answer", "turn_seq": tool_seq + 1, "latency_ms": latency_ms, }, ) await gateway.broadcast_chat( record.chat_id, { "event": "turn_end", "chat_id": record.chat_id, "turn_id": turn_id, "turn_phase": "complete", "turn_seq": tool_seq + 2, "latency_ms": latency_ms, "goal_state": {"active": False, "ui_summary": ""}, }, ) await gateway.broadcast_chat( record.chat_id, { "event": "goal_status", "chat_id": record.chat_id, "status": "idle", }, ) await gateway.broadcast_chat( record.chat_id, { "event": "session_updated", "chat_id": record.chat_id, "scope": "thread", "workspace_scope": record.workspace_scope, "knowledge_base_id": record.knowledge_base_id, }, ) except asyncio.CancelledError: await gateway.broadcast_chat( record.chat_id, { "event": "turn_end", "chat_id": record.chat_id, "turn_id": turn_id, "turn_phase": "complete", "turn_seq": tool_seq + 2, "latency_ms": 0, "goal_state": {"active": False, "ui_summary": ""}, }, ) await gateway.broadcast_chat( record.chat_id, { "event": "goal_status", "chat_id": record.chat_id, "status": "idle", }, ) await gateway.broadcast_chat( record.chat_id, { "event": "session_updated", "chat_id": record.chat_id, "scope": "thread", "workspace_scope": record.workspace_scope, "knowledge_base_id": record.knowledge_base_id, }, ) raise except Exception as exc: await gateway.broadcast_chat( record.chat_id, { "event": "error", "chat_id": record.chat_id, "detail": str(exc), }, ) await gateway.broadcast_chat( record.chat_id, { "event": "goal_status", "chat_id": record.chat_id, "status": "idle", }, ) finally: record.session.session_state.pop("_live_event_sink", None) record.running = False record.current_turn_id = None record.run_task = None record.updated_at = now_iso() def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser(description="Meeting Knowledge WebUI server") parser.add_argument("--host", default="127.0.0.1") parser.add_argument("--port", type=int, default=8010) parser.add_argument("--offline", action="store_true") return parser.parse_args() def main() -> None: args = parse_args() import uvicorn uvicorn.run( create_app(force_offline=args.offline), host=args.host, port=args.port, log_level="info", ) if __name__ == "__main__": main()