# meeting_memory/webui_server.py
#
# 1294 lines
# 50 KiB
# Python
# Raw Permalink Normal View History
#
# 2026-06-24 07:05:19 +00:00
from __future__ import annotations
import argparse
import asyncio
import hashlib
import json
import secrets
import time
import uuid
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from threading import Lock
from typing import Any
from urllib.parse import unquote
from fastapi import FastAPI, Header, HTTPException, Query, Request, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from agent import CoreAgent
from core_agent.config import (
apply_compat_env_aliases,
build_core_agent_config,
load_core_agent_env,
require_model_config,
)
from core_agent.session import ConversationSession
from meeting_memory.service import MeetingKnowledgeService
from providers.openai_compatible import OpenAICompatibleProvider
from providers.rule_based import RuleBasedMeetingProvider
from tools.default_tools import build_default_registry
# Directory containing this module; every other location below is derived from it.
WORKSPACE = Path(__file__).resolve().parent
# Folder scanned for one sub-directory per knowledge base.
DATA_DIR = WORKSPACE.joinpath("data")
# Built frontend bundle served by the gateway when present.
FRONTEND_DIST_DIR = WORKSPACE.joinpath("webui", "dist")
# JSON file the sidebar UI state is read from and written to.
SIDEBAR_STATE_PATH = WORKSPACE.joinpath(".webui_sidebar_state.json")
# Parent folder under which text attachments are saved per chat.
UPLOADS_DIR = WORKSPACE.joinpath("agent_memory", "uploads")
def now_iso() -> str:
    """Return the current naive local time as an ISO-8601 string, second precision."""
    current = datetime.now()
    # Zeroing the microseconds makes isoformat() omit the fractional part.
    return current.replace(microsecond=0).isoformat()
def now_ms() -> int:
    """Return the current Unix time in whole milliseconds (truncated)."""
    seconds = time.time()
    return int(seconds * 1000)
def _history_to_ui_messages(history: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Convert persisted dialog history into the message dicts sent to the web UI.

    Only entries whose role is ``user`` or ``assistant`` and whose content is
    non-empty are kept. Each kept entry gets a fresh random ``id``, its own
    ``persisted_<n>`` turn id, and a ``createdAt`` that starts at the current
    time in milliseconds and increases by one per kept message so the original
    ordering is preserved.
    """
    base_ms = now_ms()
    ui_messages: list[dict[str, Any]] = []
    for entry in history:
        role = str(entry.get("role", ""))
        text = str(entry.get("content", ""))
        if not text or role not in ("user", "assistant"):
            continue
        from_user = role == "user"
        position = len(ui_messages)
        ui_messages.append(
            {
                "id": uuid.uuid4().hex,
                "role": role,
                "content": text,
                "createdAt": base_ms + position,
                "turnId": f"persisted_{position}",
                "turnPhase": "user" if from_user else "answer",
                "turnSeq": 0 if from_user else 99,
            }
        )
    return ui_messages
def split_session_key(key: str) -> tuple[str, str]:
    """Split ``"channel:chat_id"`` at the first colon.

    A key without any colon is treated as a bare chat id on the
    ``"websocket"`` channel.
    """
    channel, separator, chat_id = key.partition(":")
    if not separator:
        return "websocket", key
    return channel, chat_id
def default_sidebar_state() -> dict[str, Any]:
    """Return a fresh sidebar state with empty collections and default view options.

    A new dict is built on every call, and ``updated_at`` is stamped with the
    current time.
    """
    view_options: dict[str, Any] = {
        "density": "comfortable",
        "show_previews": False,
        "show_timestamps": False,
        "show_archived": False,
        "sort": "updated_desc",
    }
    state: dict[str, Any] = {
        "schema_version": 1,
        "project_paths": [],
        "pinned_keys": [],
        "archived_keys": [],
        "title_overrides": {},
        "project_name_overrides": {},
        "tags_by_key": {},
        "collapsed_groups": {},
    }
    state["view"] = view_options
    state["updated_at"] = now_iso()
    return state
def _sanitize_upload_name(name: str, fallback_stem: str = "attachment") -> str:
raw = Path(name or "").name
if not raw:
raw = f"{fallback_stem}.txt"
safe = "".join(ch if ch not in '<>:"/\\|?*' else "_" for ch in raw).strip().strip(".")
return safe or f"{fallback_stem}.txt"
def _persist_text_attachments(chat_id: str, attachments: list[dict[str, Any]]) -> list[dict[str, Any]]:
if not attachments:
return []
target_dir = UPLOADS_DIR / chat_id
target_dir.mkdir(parents=True, exist_ok=True)
saved: list[dict[str, Any]] = []
for index, item in enumerate(attachments, start=1):
name = _sanitize_upload_name(str(item.get("name") or f"attachment_{index}.txt"))
content = str(item.get("content") or "")
kind = str(item.get("kind") or "text/plain")
stem = Path(name).stem or f"attachment_{index}"
suffix = Path(name).suffix or ".txt"
path = target_dir / f"{int(time.time() * 1000)}_{index}_{stem}{suffix}"
path.write_text(content, encoding="utf-8")
saved.append(
{
"name": name,
"kind": kind,
"path": str(path),
"size": len(content.encode("utf-8")),
}
)
return saved
def _augment_message_with_saved_attachments(content: str, saved_attachments: list[dict[str, Any]]) -> str:
if not saved_attachments:
return content
lines = [content.rstrip(), "", "附加文本文件已保存,可直接读取或导入:"]
for item in saved_attachments:
lines.append(f"- {item['name']}: {item['path']}")
return "\n".join(line for line in lines if line is not None)
def _validate_workspace_scope(workspace_scope: dict[str, Any] | None) -> None:
project_path = str((workspace_scope or {}).get("project_path") or "").strip()
if not project_path or project_path.startswith("knowledge-base://"):
return
candidate = Path(project_path).expanduser().resolve()
if not candidate.exists() or not candidate.is_dir():
raise ValueError("project_path must be an existing directory")
@dataclass(slots=True)
class ChatRecord:
    """Per-chat state kept by the gateway: the agent session plus UI metadata."""

    # Chat identifier; ``key`` is derived from it.
    chat_id: str
    # Conversation session backing this chat.
    session: ConversationSession
    # Sidebar title and short preview text.
    title: str = ""
    preview: str = ""
    # Knowledge base bound to the chat, if any.
    knowledge_base_id: str | None = None
    # Timestamps produced by now_iso().
    created_at: str = field(default_factory=now_iso)
    updated_at: str = field(default_factory=now_iso)
    # Messages in the shape sent to the web UI.
    ui_messages: list[dict[str, Any]] = field(default_factory=list)
    # Workspace scope payload as last supplied for this chat.
    workspace_scope: dict[str, Any] | None = None
    # Turn bookkeeping: whether a turn is running, its id, and its task.
    running: bool = False
    current_turn_id: str | None = None
    run_task: asyncio.Task[Any] | None = None

    @property
    def key(self) -> str:
        """Session key of the form ``websocket:<chat_id>``."""
        return "websocket:{}".format(self.chat_id)

    def to_summary(self) -> dict[str, Any]:
        """Return the session-list row for this chat.

        ``run_started_at`` is the current Unix time in seconds while a turn is
        running and ``None`` otherwise.
        """
        started_at = int(time.time()) if self.running else None
        summary: dict[str, Any] = {
            "key": self.key,
            "created_at": self.created_at,
            "updated_at": self.updated_at,
            "title": self.title,
            "preview": self.preview,
            "run_started_at": started_at,
            "workspace_scope": self.workspace_scope,
            "knowledge_base_id": self.knowledge_base_id,
        }
        return summary
class ChatStore:
    """In-memory registry of ``ChatRecord`` objects keyed by chat id.

    Creation, listing, lookup and forking take ``self._lock``. The setters and
    the ``append_*`` helpers mutate a record without taking it.
    """

    def __init__(self, agent: CoreAgent, data_dir: Path) -> None:
        """Store the agent used to create sessions and the knowledge-base data directory."""
        self._agent = agent
        self._data_dir = data_dir
        self._lock = Lock()
        self._records: dict[str, ChatRecord] = {}

    def _seed_chat_id(self, knowledge_base_id: str) -> str:
        """Derive the stable ``kb_<12 hex>`` chat id for a knowledge base."""
        # MD5 is used only to get a short, stable identifier.
        digest = hashlib.md5(knowledge_base_id.encode("utf-8")).hexdigest()[:12]
        return f"kb_{digest}"

    def _default_preview(self, knowledge_base_id: str | None) -> str:
        """Return the placeholder preview for a chat bound to a knowledge base, else ``""``."""
        if knowledge_base_id:
            return f"围绕知识库「{knowledge_base_id}」开始提问"
        return ""

    def _new_session(self, knowledge_base_id: str | None = None) -> ConversationSession:
        """Create an agent session, using ``knowledge_base_id`` as session id when given.

        When an id is given it is also written to the session state as
        ``last_knowledge_base_id`` and ``last_team_id``. Otherwise a random
        ``chat_<hex>`` session id is generated.
        """
        session_id = knowledge_base_id or f"chat_{uuid.uuid4().hex[:12]}"
        session = self._agent.new_session(session_id=session_id)
        if knowledge_base_id:
            session.session_state["last_knowledge_base_id"] = knowledge_base_id
            session.session_state["last_team_id"] = knowledge_base_id
        return session

    def _create_record(self, chat_id: str, knowledge_base_id: str | None, title: str = "") -> ChatRecord:
        """Create, register and return a record. The caller must hold ``self._lock``."""
        # NOTE(review): with no knowledge base, chat_id is passed in the
        # knowledge_base_id slot, so the session state records the chat id as
        # last_knowledge_base_id / last_team_id. Confirm this is intended.
        session = self._new_session(knowledge_base_id or chat_id)
        record = ChatRecord(
            chat_id=chat_id,
            session=session,
            title=title or (knowledge_base_id or ""),
            preview=self._default_preview(knowledge_base_id),
            knowledge_base_id=knowledge_base_id,
            workspace_scope=None,
        )
        record.ui_messages = _history_to_ui_messages(session.persisted_dialog_messages())
        if record.ui_messages and not record.preview:
            record.preview = str(record.ui_messages[-1].get("content", ""))[:120]
        self._records[chat_id] = record
        return record

    def _apply_workspace_scope(self, record: ChatRecord, workspace_scope: dict[str, Any] | None) -> None:
        """Store the scope on the record and point the session at the matching workspace.

        A real ``project_path`` becomes the session workspace. A blank path or
        a ``knowledge-base://`` path falls back to the module ``WORKSPACE``.
        """
        record.workspace_scope = workspace_scope
        project_path = str((workspace_scope or {}).get("project_path") or "").strip()
        if project_path and not project_path.startswith("knowledge-base://"):
            candidate = Path(project_path).expanduser().resolve()
            record.session.workspace = candidate
            record.session.session_state["workspace"] = str(candidate)
        else:
            record.session.workspace = WORKSPACE.resolve()
            record.session.session_state["workspace"] = str(WORKSPACE.resolve())

    def ensure_seed_records(self) -> None:
        """Make sure every knowledge base reported by the meeting service has a chat record."""
        meeting_service = MeetingKnowledgeService(self._data_dir, workspace=WORKSPACE)
        for knowledge_base_id in meeting_service.list_knowledge_bases().get("knowledge_bases", []):
            chat_id = self._seed_chat_id(knowledge_base_id)
            with self._lock:
                if chat_id not in self._records:
                    self._create_record(chat_id, knowledge_base_id, title=knowledge_base_id)

    def list_records(self) -> list[ChatRecord]:
        """Return all records, newest first by ``(updated_at, created_at, title)``."""
        self.ensure_seed_records()
        with self._lock:
            return sorted(
                self._records.values(),
                key=lambda item: (item.updated_at, item.created_at, item.title),
                reverse=True,
            )

    def create_chat(self, knowledge_base_id: str | None = None) -> ChatRecord:
        """Create a record under a fresh random ``chat_<hex>`` id."""
        with self._lock:
            chat_id = f"chat_{uuid.uuid4().hex[:12]}"
            return self._create_record(chat_id, knowledge_base_id)

    def fork_chat(self, source_chat_id: str, before_user_index: int) -> ChatRecord:
        """Create a new chat holding the source's messages up to a given user message.

        Messages are copied until the user message with zero-based index
        ``before_user_index`` is reached; that message and everything after it
        are left out.

        Raises:
            KeyError: If ``source_chat_id`` is not a known chat.
        """
        with self._lock:
            source = self._records[source_chat_id]
            fork = self._create_record(
                f"chat_{uuid.uuid4().hex[:12]}",
                source.knowledge_base_id,
                title=source.title,
            )
            user_seen = 0
            copied: list[dict[str, Any]] = []
            for message in source.ui_messages:
                if message.get("role") == "user":
                    if user_seen >= before_user_index:
                        break
                    user_seen += 1
                # JSON round-trip gives the fork its own copy of each message.
                copied.append(json.loads(json.dumps(message)))
            fork.ui_messages = copied
            fork.preview = source.preview
            return fork

    def get_by_chat_id(self, chat_id: str) -> ChatRecord:
        """Return the record for ``chat_id``, creating an empty one if it is unknown."""
        self.ensure_seed_records()
        with self._lock:
            if chat_id not in self._records:
                return self._create_record(chat_id, knowledge_base_id=None)
            return self._records[chat_id]

    def get_by_key(self, key: str) -> ChatRecord:
        """Look up a record by session key (``channel:chat_id`` or a bare chat id)."""
        _, chat_id = split_session_key(key)
        return self.get_by_chat_id(chat_id)

    def set_workspace_scope(self, chat_id: str, workspace_scope: dict[str, Any] | None) -> ChatRecord:
        """Validate and apply a workspace scope to a chat.

        Raises:
            ValueError: If the scope names a ``project_path`` that is not an
                existing directory.
        """
        # Validate first, before the lookup can create a record, so a rejected
        # scope leaves the store untouched. The websocket handler catches
        # ValueError from this call to report "workspace_scope_rejected".
        _validate_workspace_scope(workspace_scope)
        record = self.get_by_chat_id(chat_id)
        self._apply_workspace_scope(record, workspace_scope)
        record.updated_at = now_iso()
        return record

    def set_knowledge_base(self, chat_id: str, knowledge_base_id: str | None) -> ChatRecord:
        """Bind a chat to a knowledge base; a blank or ``None`` id clears the binding."""
        record = self.get_by_chat_id(chat_id)
        cleaned = (knowledge_base_id or "").strip() or None
        record.knowledge_base_id = cleaned
        if cleaned:
            record.session.session_state["last_knowledge_base_id"] = cleaned
            record.session.session_state["last_team_id"] = cleaned
        else:
            record.session.session_state.pop("last_knowledge_base_id", None)
            record.session.session_state.pop("last_team_id", None)
        record.updated_at = now_iso()
        return record

    def append_user_message(
        self,
        record: ChatRecord,
        content: str,
        turn_id: str | None,
        *,
        media: list[dict[str, Any]] | None = None,
    ) -> None:
        """Append a user message to the UI thread and refresh preview and title.

        The preview becomes the first 120 characters of the stripped text. The
        title is set to the first 32 characters only when the chat has neither
        a title nor a knowledge base. The stored message keeps the unstripped
        ``content``.
        """
        text = content.strip()
        record.updated_at = now_iso()
        if text:
            record.preview = text[:120]
            if not record.title and not record.knowledge_base_id:
                record.title = text[:32]
        record.ui_messages.append(
            {
                "id": uuid.uuid4().hex,
                "role": "user",
                "content": content,
                "createdAt": now_ms(),
                "turnId": turn_id,
                "turnPhase": "user",
                "turnSeq": 0,
                # The "media" key is present only when media was supplied.
                **({"media": media} if media else {}),
            }
        )

    def append_assistant_message(self, record: ChatRecord, content: str, turn_id: str | None, latency_ms: int) -> None:
        """Append the assistant answer for a turn, with its latency, to the UI thread."""
        record.updated_at = now_iso()
        record.ui_messages.append(
            {
                "id": uuid.uuid4().hex,
                "role": "assistant",
                "content": content,
                "createdAt": now_ms(),
                "turnId": turn_id,
                "turnPhase": "answer",
                "turnSeq": 99,
                "latencyMs": latency_ms,
            }
        )
class GatewayState:
    """Process-wide gateway state: agent, chat store, auth token, sidebar state, socket registry."""

    def __init__(self, *, agent: CoreAgent, config: Any, mode: str) -> None:
        """Create the chat store, generate a random access token and load the sidebar state."""
        self.agent = agent
        self.config = config
        self.mode = mode
        self.chat_store = ChatStore(agent, DATA_DIR)
        self.token = secrets.token_urlsafe(24)
        self.sidebar_state = self._load_sidebar_state()
        # id(websocket) -> chat ids that socket is attached to.
        self.socket_chats: dict[int, set[str]] = {}
        # chat id -> sockets that receive that chat's broadcasts.
        self.chat_sockets: dict[str, set[WebSocket]] = {}

    def _load_sidebar_state(self) -> dict[str, Any]:
        """Load the persisted sidebar state, falling back to the defaults.

        The defaults are used when the file is missing, unreadable, not valid
        JSON, or holds something other than a JSON object.
        """
        if SIDEBAR_STATE_PATH.exists():
            try:
                loaded = json.loads(SIDEBAR_STATE_PATH.read_text(encoding="utf-8"))
            except (OSError, ValueError):
                # ValueError covers both JSON and UTF-8 decode errors.
                loaded = None
            # Callers index the state by key, so only accept a JSON object.
            if isinstance(loaded, dict):
                return loaded
        return default_sidebar_state()

    def save_sidebar_state(self, state: dict[str, Any]) -> dict[str, Any]:
        """Stamp ``updated_at``, keep ``state`` as current and write it to disk."""
        state["updated_at"] = now_iso()
        self.sidebar_state = state
        SIDEBAR_STATE_PATH.write_text(json.dumps(state, ensure_ascii=False, indent=2), encoding="utf-8")
        return state

    def register_socket_chat(self, websocket: WebSocket, chat_id: str) -> None:
        """Attach ``websocket`` to ``chat_id`` in both lookup tables."""
        socket_key = id(websocket)
        self.socket_chats.setdefault(socket_key, set()).add(chat_id)
        self.chat_sockets.setdefault(chat_id, set()).add(websocket)

    def unregister_socket(self, websocket: WebSocket) -> None:
        """Detach ``websocket`` from every chat and drop chats left with no sockets."""
        socket_key = id(websocket)
        chat_ids = self.socket_chats.pop(socket_key, set())
        for chat_id in chat_ids:
            sockets = self.chat_sockets.get(chat_id)
            if not sockets:
                continue
            sockets.discard(websocket)
            if not sockets:
                self.chat_sockets.pop(chat_id, None)

    async def send_json(self, websocket: WebSocket, payload: dict[str, Any]) -> None:
        """Send ``payload`` as a JSON text frame, keeping non-ASCII characters unescaped."""
        await websocket.send_text(json.dumps(payload, ensure_ascii=False))

    async def broadcast_chat(self, chat_id: str, payload: dict[str, Any]) -> None:
        """Send ``payload`` to every socket attached to ``chat_id``.

        A socket whose send fails is unregistered after the loop. Delivery is
        best-effort, so the failure is not re-raised.
        """
        # Iterate over a copy because unregistering mutates the set.
        sockets = list(self.chat_sockets.get(chat_id, set()))
        stale: list[WebSocket] = []
        for websocket in sockets:
            try:
                await self.send_json(websocket, payload)
            except Exception:
                stale.append(websocket)
        for websocket in stale:
            self.unregister_socket(websocket)
def build_provider(force_offline: bool = False):
    """Load configuration and build the model provider.

    Returns:
        A ``(provider, config, mode)`` tuple. ``mode`` is ``"offline"`` with a
        rule-based provider when ``force_offline`` is set. Otherwise it is
        ``"openai-compatible"``, and ``require_model_config`` is called on the
        config before the provider is built.
    """
    load_core_agent_env(WORKSPACE)
    apply_compat_env_aliases()
    config = build_core_agent_config()

    if not force_offline:
        require_model_config(config)
        online_provider = OpenAICompatibleProvider(
            model=config.model,
            api_key=config.api_key,
            base_url=config.base_url,
            timeout=config.timeout,
            temperature=config.temperature,
        )
        return online_provider, config, "openai-compatible"

    offline_provider = RuleBasedMeetingProvider()
    return offline_provider, config, "offline"
def build_agent(force_offline: bool = False) -> tuple[CoreAgent, Any, str]:
    """Build the ``CoreAgent`` around the configured provider.

    Returns:
        The agent, the loaded config and the provider mode string, in that order.
    """
    provider, config, mode = build_provider(force_offline=force_offline)
    skills_root = WORKSPACE / "skills"
    registry = build_default_registry(DATA_DIR)
    agent = CoreAgent(
        provider=provider,
        workspace=WORKSPACE,
        skill_dirs=[skills_root],
        tool_registry=registry,
        max_iterations=config.max_iterations,
    )
    return agent, config, mode
def list_knowledge_base_cards(data_dir: Path) -> list[dict[str, Any]]:
    """Build one summary card per knowledge-base folder under ``data_dir``.

    A folder counts as a knowledge base when it contains a ``state.json``
    holding a JSON object. Folders whose state file is unreadable, not valid
    JSON, or not an object are skipped, so one damaged knowledge base does not
    break the whole listing.

    Args:
        data_dir: Directory whose immediate sub-directories are scanned.

    Returns:
        Cards ordered by folder path. An empty list when ``data_dir`` does not
        exist.
    """
    cards: list[dict[str, Any]] = []
    if not data_dir.exists():
        return cards
    for path in sorted(data_dir.iterdir()):
        if not path.is_dir():
            continue
        state_path = path / "state.json"
        if not state_path.exists():
            continue
        try:
            state = json.loads(state_path.read_text(encoding="utf-8"))
        except (OSError, ValueError):
            # ValueError covers both JSON and UTF-8 decode errors.
            continue
        if not isinstance(state, dict):
            continue
        # "or []" also tolerates keys that are present but null.
        meetings = state.get("meetings") or []
        latest = meetings[-1] if meetings else {}
        if not isinstance(latest, dict):
            latest = {}
        cards.append(
            {
                "knowledge_base_id": path.name,
                "title": path.name,
                "meeting_count": len(meetings),
                "last_meeting_date": state.get("last_meeting_date", ""),
                "updated_at": state.get("last_updated", ""),
                # At most the first three summary entries of the last meeting.
                "latest_summary": (latest.get("summary") or [])[:3],
                "counts": {
                    "metrics": len(state.get("current_metrics") or []),
                    "actions": len(state.get("open_actions") or []),
                    "risks": len(state.get("risks") or []),
                    "followups": len(state.get("followups") or []),
                },
            }
        )
    return cards
def default_knowledge_base_id(data_dir: Path) -> str | None:
    """Return the only knowledge base id under ``data_dir``.

    ``None`` is returned when there are zero knowledge bases or more than one,
    so a default exists only when the choice is unambiguous.
    """
    cards = list_knowledge_base_cards(data_dir)
    return cards[0]["knowledge_base_id"] if len(cards) == 1 else None
def _read_auth_token(request: Request, authorization: str | None = Header(default=None)) -> str | None:
    """Extract the access token from a request.

    An ``Authorization: Bearer <token>`` value (scheme matched
    case-insensitively) takes precedence. Otherwise the ``token`` query
    parameter is used. Returns ``None`` when neither supplies a token.
    """
    header_value = authorization or ""
    if header_value.lower().startswith("bearer "):
        _, credential = header_value.split(" ", 1)
        return credential.strip()
    query_token = request.query_params.get("token")
    if not query_token:
        return None
    return query_token.strip()
def _require_token(request: Request, gateway: GatewayState, authorization: str | None = Header(default=None)) -> None:
    """Reject the request unless it carries the gateway's access token.

    Raises:
        HTTPException: 401 when the token is missing or does not match.
    """
    token = _read_auth_token(request, authorization)
    # Compare in constant time so response timing does not reveal how much of
    # a guessed token matched. Bytes are used because compare_digest() only
    # accepts ASCII-only str values.
    if token is None or not secrets.compare_digest(
        token.encode("utf-8"), gateway.token.encode("utf-8")
    ):
        raise HTTPException(status_code=401, detail="Unauthorized")
def _minimal_settings(gateway: GatewayState) -> dict[str, Any]:
    """Build the settings payload returned by the settings endpoint.

    Only the model name, provider name, API-key presence, temperature and base
    URL are read from the gateway (with fallbacks when the config lacks them),
    and some paths are derived from ``WORKSPACE``. Every other value is a
    fixed literal.
    """
    cfg = gateway.config
    is_offline = gateway.mode == "offline"
    model_name = getattr(cfg, "model", "") or ""
    provider_name = "offline" if is_offline else "openai-compatible"
    temperature = getattr(cfg, "temperature", 0.2)
    base_url = getattr(cfg, "base_url", None)

    agent_section = {
        "model": model_name,
        "provider": provider_name,
        "resolved_provider": provider_name,
        # Offline mode needs no key, so it is reported as satisfied.
        "has_api_key": bool(getattr(cfg, "api_key", "") or is_offline),
        "model_preset": "default",
        "max_tokens": 0,
        "context_window_tokens": 0,
        "temperature": temperature,
        "reasoning_effort": None,
        "timezone": "Asia/Shanghai",
        "bot_name": "MeetingAgent",
        "bot_icon": "MK",
        "tool_hint_max_length": 160,
    }
    default_preset = {
        "name": "default",
        "label": "Default",
        "active": True,
        "is_default": True,
        "model": model_name,
        "provider": provider_name,
        "max_tokens": 0,
        "context_window_tokens": 0,
        "temperature": temperature,
        "reasoning_effort": None,
        "reasoning_effort_values": [],
    }
    provider_entry = {
        "name": provider_name,
        "label": provider_name,
        "configured": True,
        "auth_type": "api_key",
        "api_key_required": not is_offline,
        "api_key_hint": "OPENAI_API_KEY",
        "api_base": base_url,
        "default_api_base": base_url,
        "model_selectable": True,
        "api_type": "chat_completions",
    }
    image_generation = {
        "enabled": False,
        "provider": "none",
        "provider_configured": False,
        "model": "",
        "default_aspect_ratio": "1:1",
        "default_image_size": "1024x1024",
        "max_images_per_turn": 1,
        "save_dir": str(WORKSPACE / "generated_images"),
        "providers": [],
    }
    transcription = {
        "enabled": False,
        "provider": "none",
        "provider_configured": False,
        "model": "",
        "language": "zh",
        "max_duration_sec": 0,
        "max_upload_mb": 0,
        "providers": [],
    }
    runtime = {
        "config_path": str(WORKSPACE / ".env"),
        "workspace_path": str(WORKSPACE),
        "gateway_host": "127.0.0.1",
        "gateway_port": 8010,
        "heartbeat": {"enabled": False, "interval_s": 0, "keep_recent_messages": 0},
        "dream": {"schedule": ""},
        "unified_session": False,
    }
    advanced = {
        "restrict_to_workspace": True,
        "webui_allow_local_service_access": True,
        "allow_local_preview_access": True,
        "webui_default_access_mode": "default",
        "private_service_protection_enabled": False,
        "ssrf_whitelist_count": 0,
        "mcp_server_count": 0,
        "exec_enabled": False,
        "exec_sandbox": "workspace-write",
        "exec_path_prepend_set": False,
        "exec_path_append_set": False,
    }
    runtime_capabilities = {
        "can_restart_engine": False,
        "can_pick_folder": False,
        "can_open_logs": False,
        "can_export_diagnostics": False,
    }
    return {
        "agent": agent_section,
        "model_presets": [default_preset],
        "providers": [provider_entry],
        "web_search": {"provider": "none", "max_results": 5, "timeout": 20, "providers": []},
        "web": {"enable": False, "search": {"max_results": 5, "timeout": 20}, "fetch": {"use_jina_reader": False}},
        "image_generation": image_generation,
        "transcription": transcription,
        "runtime": runtime,
        "advanced": advanced,
        "requires_restart": False,
        "runtime_surface": "browser",
        "runtime_capabilities": runtime_capabilities,
        "version": {"current": "0.1.0"},
    }
def create_app(force_offline: bool = False) -> FastAPI:
    """Build the FastAPI application that serves the web UI and its gateway API.

    The app exposes unauthenticated bootstrap, runtime and knowledge-base
    endpoints, token-protected ``/api/...`` endpoints, a ``/ws`` websocket that
    drives chats, and a catch-all route returning the frontend ``index.html``.

    Args:
        force_offline: Forwarded to ``build_agent``.

    Returns:
        The configured application. The ``GatewayState`` is available as
        ``app.state.gateway``.
    """
    agent, config, mode = build_agent(force_offline=force_offline)
    gateway = GatewayState(agent=agent, config=config, mode=mode)
    app = FastAPI(title="Meeting Knowledge WebUI Gateway", version="0.2.0")
    app.state.gateway = gateway
    app.add_middleware(
        CORSMiddleware,
        allow_origins=["*"],
        allow_methods=["*"],
        allow_headers=["*"],
    )
    if FRONTEND_DIST_DIR.exists():
        assets_dir = FRONTEND_DIST_DIR / "assets"
        brand_dir = FRONTEND_DIST_DIR / "brand"
        if assets_dir.exists():
            app.mount("/assets", StaticFiles(directory=assets_dir), name="assets")
        if brand_dir.exists():
            app.mount("/brand", StaticFiles(directory=brand_dir), name="brand")

    @app.get("/webui/bootstrap")
    def webui_bootstrap() -> dict[str, Any]:
        """Hand the frontend its access token and websocket path (no authentication)."""
        return {
            "token": gateway.token,
            "ws_path": "/ws",
            "expires_in": 24 * 3600,
            "model_name": getattr(config, "model", "") or None,
            "runtime_surface": "browser",
            "runtime_capabilities": {
                "can_restart_engine": False,
                "can_pick_folder": False,
                "can_open_logs": False,
                "can_export_diagnostics": False,
            },
        }

    @app.get("/api/runtime")
    def runtime_info() -> dict[str, Any]:
        """Report provider mode, model name, default knowledge base and workspace path."""
        return {
            "mode": mode,
            "model_name": getattr(config, "model", "") or None,
            "default_knowledge_base_id": default_knowledge_base_id(DATA_DIR),
            "workspace": str(WORKSPACE),
        }

    @app.get("/api/knowledge-bases")
    def list_knowledge_bases() -> dict[str, Any]:
        """List knowledge-base cards together with the default knowledge base id."""
        cards = list_knowledge_base_cards(DATA_DIR)
        return {
            "knowledge_bases": cards,
            "default_knowledge_base_id": default_knowledge_base_id(DATA_DIR),
        }

    @app.get("/api/sessions")
    def list_sessions(request: Request, authorization: str | None = Header(default=None)) -> dict[str, Any]:
        """Return summary rows for all chats."""
        _require_token(request, gateway, authorization)
        rows = [record.to_summary() for record in gateway.chat_store.list_records()]
        return {"sessions": rows}

    @app.get("/api/sessions/{session_key}/webui-thread")
    def fetch_webui_thread(session_key: str, request: Request, authorization: str | None = Header(default=None)) -> dict[str, Any]:
        """Return the full UI message thread of one chat as a single page."""
        _require_token(request, gateway, authorization)
        record = gateway.chat_store.get_by_key(unquote(session_key))
        return {
            "schemaVersion": 1,
            "sessionKey": record.key,
            "savedAt": record.updated_at,
            "messages": record.ui_messages,
            "fork_boundary_message_count": None,
            "has_pending_tool_calls": record.running,
            "page": {
                "before_cursor": None,
                "has_more_before": False,
                "loaded_message_count": len(record.ui_messages),
                "total_known_message_count": len(record.ui_messages),
                "user_message_offset": 0,
            },
            "workspace_scope": record.workspace_scope,
        }

    @app.get("/api/sessions/{session_key}/automations")
    def fetch_session_automations(session_key: str, request: Request, authorization: str | None = Header(default=None)) -> dict[str, Any]:
        """Always returns an empty job list."""
        _require_token(request, gateway, authorization)
        return {"jobs": []}

    @app.get("/api/webui/automations")
    def fetch_automations(request: Request, authorization: str | None = Header(default=None)) -> dict[str, Any]:
        """Always returns an empty job list."""
        _require_token(request, gateway, authorization)
        return {"jobs": []}

    @app.get("/api/webui/skills")
    def fetch_skills(request: Request, authorization: str | None = Header(default=None)) -> dict[str, Any]:
        """Always returns an empty skill list."""
        _require_token(request, gateway, authorization)
        return {"skills": []}

    @app.get("/api/settings")
    def fetch_settings(request: Request, authorization: str | None = Header(default=None)) -> dict[str, Any]:
        """Return the settings payload built by ``_minimal_settings``."""
        _require_token(request, gateway, authorization)
        return _minimal_settings(gateway)

    @app.get("/api/settings/usage")
    def fetch_settings_usage(request: Request, authorization: str | None = Header(default=None)) -> dict[str, Any]:
        """Return an all-zero usage report stamped with the current time."""
        _require_token(request, gateway, authorization)
        return {
            "days": [],
            "total_tokens": 0,
            "total_tokens_30d": 0,
            "total_tokens_365d": 0,
            "peak_day_tokens": 0,
            "current_streak_days": 0,
            "longest_streak_days": 0,
            "active_days_30d": 0,
            "requests_30d": 0,
            "updated_at": now_iso(),
        }

    @app.get("/api/settings/version-check")
    def check_version(request: Request, authorization: str | None = Header(default=None)) -> dict[str, Any]:
        """Always reports that no update information is available."""
        _require_token(request, gateway, authorization)
        return {"updateAvailable": None}

    @app.get("/api/settings/provider-models")
    def provider_models(request: Request, authorization: str | None = Header(default=None), provider: str = Query(default="openai-compatible")) -> dict[str, Any]:
        """Return a catalog holding the single configured model for ``provider``."""
        _require_token(request, gateway, authorization)
        model_name = getattr(config, "model", "") or ""
        return {
            "provider": provider,
            "label": provider,
            "status": "available",
            "catalog_kind": "local",
            "models": [{"id": model_name, "label": model_name, "owned_by": provider}],
            "model_count": 1 if model_name else 0,
            "fetched_at": int(time.time()),
        }

    @app.get("/api/settings/cli-apps")
    def cli_apps(request: Request, authorization: str | None = Header(default=None)) -> dict[str, Any]:
        """Always returns an empty app list."""
        _require_token(request, gateway, authorization)
        return {"apps": [], "installed_count": 0}

    @app.get("/api/settings/mcp-presets")
    def mcp_presets(request: Request, authorization: str | None = Header(default=None)) -> dict[str, Any]:
        """Always returns an empty preset list."""
        _require_token(request, gateway, authorization)
        return {"presets": [], "installed_count": 0}

    @app.get("/api/commands")
    def list_commands(request: Request, authorization: str | None = Header(default=None)) -> dict[str, Any]:
        """Always returns an empty command list."""
        _require_token(request, gateway, authorization)
        return {"commands": []}

    @app.get("/api/workspaces")
    def workspaces(request: Request, authorization: str | None = Header(default=None)) -> dict[str, Any]:
        """Describe the default workspace scope and which controls the UI may offer."""
        _require_token(request, gateway, authorization)
        return {
            "schema_version": 1,
            "default_access_mode": "default",
            "default_scope": {
                "project_path": str(WORKSPACE),
                "project_name": WORKSPACE.name,
                "access_mode": "restricted",
                "restrict_to_workspace": True,
            },
            "controls": {
                "can_change_project": True,
                "can_use_full_access": False,
            },
        }

    @app.get("/api/webui/sidebar-state")
    def fetch_sidebar_state(request: Request, authorization: str | None = Header(default=None)) -> dict[str, Any]:
        """Return the current sidebar state."""
        _require_token(request, gateway, authorization)
        return gateway.sidebar_state

    @app.get("/api/webui/sidebar-state/update")
    def update_sidebar_state(
        request: Request,
        state: str,
        authorization: str | None = Header(default=None),
    ) -> dict[str, Any]:
        """Replace the sidebar state with the JSON object passed in ``state``.

        Raises:
            HTTPException: 400 when ``state`` is not valid JSON or does not
                decode to a JSON object.
        """
        _require_token(request, gateway, authorization)
        try:
            payload = json.loads(state)
        except ValueError as exc:
            raise HTTPException(status_code=400, detail="state must be valid JSON") from exc
        # save_sidebar_state() assigns a key on the payload, so anything other
        # than a JSON object would fail there with a server error.
        if not isinstance(payload, dict):
            raise HTTPException(status_code=400, detail="state must be a JSON object")
        return gateway.save_sidebar_state(payload)

    @app.get("/api/sessions/{session_key}/delete")
    def delete_session(session_key: str, request: Request, authorization: str | None = Header(default=None)) -> dict[str, Any]:
        """Remove a chat record from the store."""
        _require_token(request, gateway, authorization)
        # NOTE(review): get_by_key() creates a record for an unknown key, so
        # deleting a missing session first creates it and then removes it.
        record = gateway.chat_store.get_by_key(unquote(session_key))
        with gateway.chat_store._lock:
            gateway.chat_store._records.pop(record.chat_id, None)
        return {"deleted": True}

    @app.websocket("/ws")
    async def websocket_endpoint(websocket: WebSocket, token: str = Query(default="")) -> None:
        """Serve one websocket client.

        Handles ``new_chat``, ``fork_chat``, ``attach``, ``set_workspace_scope``
        and ``message`` frames. Frames of any other type are ignored.
        """
        # Constant-time comparison; bytes because compare_digest() only
        # accepts ASCII-only str values.
        if not secrets.compare_digest(token.encode("utf-8"), gateway.token.encode("utf-8")):
            await websocket.close(code=4401)
            return
        await websocket.accept()
        client_id = uuid.uuid4().hex
        await gateway.send_json(websocket, {"event": "ready", "chat_id": "home", "client_id": client_id})
        try:
            while True:
                raw = await websocket.receive_text()
                try:
                    payload = json.loads(raw)
                except ValueError:
                    payload = None
                if not isinstance(payload, dict):
                    # Report a malformed frame instead of dropping the whole
                    # connection over one bad message.
                    await gateway.send_json(
                        websocket,
                        {
                            "event": "error",
                            "chat_id": "home",
                            "detail": "invalid_frame",
                            "reason": "frame must be a JSON object",
                        },
                    )
                    continue
                frame_type = payload.get("type")
                if frame_type == "new_chat":
                    workspace_scope = payload.get("workspace_scope")
                    knowledge_base_id = str(payload.get("knowledge_base_id") or "").strip() or None
                    # Validate before creating the chat so a rejected scope
                    # leaves no record behind.
                    try:
                        _validate_workspace_scope(workspace_scope)
                    except ValueError as exc:
                        await gateway.send_json(
                            websocket,
                            {
                                "event": "error",
                                "chat_id": "home",
                                "detail": "workspace_scope_rejected",
                                "reason": str(exc),
                            },
                        )
                        continue
                    record = gateway.chat_store.create_chat(knowledge_base_id=knowledge_base_id)
                    if workspace_scope:
                        gateway.chat_store.set_workspace_scope(record.chat_id, workspace_scope)
                    gateway.register_socket_chat(websocket, record.chat_id)
                    await gateway.send_json(websocket, {"event": "attached", "chat_id": record.chat_id})
                    await gateway.broadcast_chat(
                        record.chat_id,
                        {
                            "event": "session_updated",
                            "chat_id": record.chat_id,
                            "scope": "metadata",
                            "workspace_scope": record.workspace_scope,
                            "knowledge_base_id": record.knowledge_base_id,
                        },
                    )
                    continue
                if frame_type == "fork_chat":
                    source_chat_id = str(payload.get("source_chat_id") or "")
                    try:
                        before_user_index = int(payload.get("before_user_index") or 0)
                        record = gateway.chat_store.fork_chat(source_chat_id, before_user_index)
                    except (KeyError, TypeError, ValueError):
                        # Unknown source chat or non-numeric index: answer
                        # with an error event and keep the connection open.
                        await gateway.send_json(
                            websocket,
                            {
                                "event": "error",
                                "chat_id": "home",
                                "detail": "fork_rejected",
                                "reason": "unknown source_chat_id or invalid before_user_index",
                            },
                        )
                        continue
                    gateway.register_socket_chat(websocket, record.chat_id)
                    await gateway.send_json(websocket, {"event": "attached", "chat_id": record.chat_id})
                    await gateway.broadcast_chat(
                        record.chat_id,
                        {
                            "event": "session_updated",
                            "chat_id": record.chat_id,
                            "scope": "thread",
                            "workspace_scope": record.workspace_scope,
                            "knowledge_base_id": record.knowledge_base_id,
                        },
                    )
                    continue
                if frame_type == "attach":
                    chat_id = str(payload.get("chat_id") or "")
                    record = gateway.chat_store.get_by_chat_id(chat_id)
                    gateway.register_socket_chat(websocket, chat_id)
                    await gateway.send_json(websocket, {"event": "attached", "chat_id": record.chat_id})
                    continue
                if frame_type == "set_workspace_scope":
                    chat_id = str(payload.get("chat_id") or "")
                    workspace_scope = payload.get("workspace_scope")
                    try:
                        record = gateway.chat_store.set_workspace_scope(chat_id, workspace_scope)
                    except ValueError as exc:
                        await gateway.broadcast_chat(
                            chat_id,
                            {
                                "event": "error",
                                "chat_id": chat_id,
                                "detail": "workspace_scope_rejected",
                                "reason": str(exc),
                            },
                        )
                        continue
                    await gateway.broadcast_chat(
                        chat_id,
                        {
                            "event": "session_updated",
                            "chat_id": chat_id,
                            "scope": "metadata",
                            "workspace_scope": record.workspace_scope,
                            "knowledge_base_id": record.knowledge_base_id,
                        },
                    )
                    continue
                if frame_type == "message":
                    chat_id = str(payload.get("chat_id") or "")
                    record = gateway.chat_store.get_by_chat_id(chat_id)
                    gateway.register_socket_chat(websocket, chat_id)
                    content = str(payload.get("content") or "")
                    if content.strip() == "/stop":
                        if record.running:
                            record.session.request_cancel()
                        else:
                            # Nothing is running, so tell the client the turn
                            # is over and the chat is idle.
                            await gateway.broadcast_chat(
                                chat_id,
                                {
                                    "event": "turn_end",
                                    "chat_id": chat_id,
                                    "latency_ms": 0,
                                },
                            )
                            await gateway.broadcast_chat(
                                chat_id,
                                {
                                    "event": "goal_status",
                                    "chat_id": chat_id,
                                    "status": "idle",
                                },
                            )
                        continue
                    if record.running:
                        await gateway.broadcast_chat(
                            chat_id,
                            {
                                "event": "error",
                                "chat_id": chat_id,
                                "detail": "A turn is already running for this chat.",
                            },
                        )
                        continue
                    knowledge_base_id = str(payload.get("knowledge_base_id") or "").strip() or None
                    attachments = payload.get("attachments")
                    attachment_list = attachments if isinstance(attachments, list) else []
                    saved_attachments = _persist_text_attachments(chat_id, attachment_list)
                    # The agent sees the saved file paths; the UI thread keeps
                    # the original text plus attachment names.
                    content_for_agent = _augment_message_with_saved_attachments(content, saved_attachments)
                    user_media = [
                        {
                            "kind": "file",
                            "name": item.get("name"),
                        }
                        for item in saved_attachments
                    ]
                    # Rebind only when the frame carries the key, so an
                    # explicit null clears the binding and an absent key
                    # leaves it alone.
                    if "knowledge_base_id" in payload:
                        record = gateway.chat_store.set_knowledge_base(chat_id, knowledge_base_id)
                    turn_id = payload.get("turn_id") or uuid.uuid4().hex
                    gateway.chat_store.append_user_message(
                        record,
                        content,
                        turn_id,
                        media=user_media or None,
                    )
                    await gateway.broadcast_chat(
                        chat_id,
                        {
                            "event": "goal_status",
                            "chat_id": chat_id,
                            "status": "running",
                            "started_at": time.time(),
                        },
                    )
                    await gateway.broadcast_chat(
                        chat_id,
                        {
                            "event": "session_updated",
                            "chat_id": chat_id,
                            "scope": "thread",
                            "workspace_scope": record.workspace_scope,
                            "knowledge_base_id": record.knowledge_base_id,
                        },
                    )
                    # Run the turn as a task so this loop keeps receiving
                    # frames, such as "/stop", while it is in progress.
                    record.run_task = asyncio.create_task(run_agent_turn(gateway, record, content_for_agent, turn_id))
                    continue
        except WebSocketDisconnect:
            gateway.unregister_socket(websocket)
        except Exception:
            # Any other failure: drop the socket from the registry and close
            # it with an internal-error code, ignoring a failing close.
            gateway.unregister_socket(websocket)
            try:
                await websocket.close(code=1011)
            except Exception:
                pass

    @app.get("/{full_path:path}", response_model=None)
    def serve_frontend(full_path: str):
        """Return the frontend ``index.html`` for any other path, or 503 when the build is missing."""
        if not FRONTEND_DIST_DIR.exists():
            return JSONResponse(
                status_code=503,
                content={
                    "detail": "Meeting Knowledge WebUI build is missing. Run `npm install` and `npm run build` in frontend_nanobot first."
                },
            )
        index_path = FRONTEND_DIST_DIR / "index.html"
        return FileResponse(index_path)

    return app
async def run_agent_turn(gateway: GatewayState, record: ChatRecord, content: str, turn_id: str) -> None:
    """Run one agent turn for a chat and broadcast its progress events.

    ``record.session.stream_ask(content)`` is iterated in a worker thread.
    Every event it yields, and every dict pushed through the
    ``_live_event_sink`` callback stored in the session state, is handed to
    the event loop through a queue and translated into ``broadcast_chat``
    payloads for ``record.chat_id``.

    Args:
        gateway: Provides ``broadcast_chat``, ``chat_store`` and ``mode``.
        record: Chat whose session runs the turn; its ``running``,
            ``current_turn_id``, ``run_task`` and ``updated_at`` fields are
            updated here.
        content: Text passed to ``stream_ask``.
        turn_id: Identifier attached to every payload of this turn.

    Raises:
        asyncio.CancelledError: Re-raised after the closing events have
            been broadcast. Other exceptions are reported to subscribers as
            an ``error`` event and are not propagated.
    """
    record.running = True
    record.current_turn_id = turn_id
    record.session.clear_cancel()
    if record.knowledge_base_id:
        record.session.session_state["last_knowledge_base_id"] = record.knowledge_base_id
        record.session.session_state["last_team_id"] = record.knowledge_base_id
    started_at = time.time()
    # Bound before the try block so the cancellation handler can always read it.
    tool_seq = 1
    # (tool name, call id) pairs for tool calls whose result has not arrived yet.
    pending_calls: list[tuple[Any, str]] = []

    async def emit(event_name: str, **fields: Any) -> None:
        # Every payload starts with the event name and chat id, then its own fields.
        await gateway.broadcast_chat(
            record.chat_id,
            {"event": event_name, "chat_id": record.chat_id, **fields},
        )

    async def emit_turn_end(latency_ms: int) -> None:
        await emit(
            "turn_end",
            turn_id=turn_id,
            turn_phase="complete",
            turn_seq=tool_seq + 2,
            latency_ms=latency_ms,
            goal_state={"active": False, "ui_summary": ""},
        )

    async def emit_idle() -> None:
        await emit("goal_status", status="idle")

    async def emit_session_updated() -> None:
        await emit(
            "session_updated",
            scope="thread",
            workspace_scope=record.workspace_scope,
            knowledge_base_id=record.knowledge_base_id,
        )

    try:
        loop = asyncio.get_running_loop()
        queue: asyncio.Queue[object] = asyncio.Queue()
        sentinel = object()

        def live_event_sink(event: dict[str, Any]) -> None:
            # Invoked from the worker thread, so hop onto the loop thread.
            loop.call_soon_threadsafe(queue.put_nowait, event)

        record.session.session_state["_live_event_sink"] = live_event_sink

        def produce_events() -> None:
            try:
                for event in record.session.stream_ask(content):
                    loop.call_soon_threadsafe(queue.put_nowait, event)
            except Exception as exc:
                # Forward the failure so the consumer loop re-raises it.
                loop.call_soon_threadsafe(queue.put_nowait, exc)
            finally:
                loop.call_soon_threadsafe(queue.put_nowait, sentinel)

        producer_task = asyncio.create_task(asyncio.to_thread(produce_events))
        final_text = ""
        stream_open = False
        content_seen = False
        while True:
            item = await queue.get()
            if item is sentinel:
                break
            if isinstance(item, Exception):
                raise item
            if isinstance(item, dict):
                # Only sink payloads of kind "subagent_progress" are understood.
                # Any other dict is ignored; treating it as a stream event
                # would fail on attribute access and abort the whole turn.
                if item.get("kind") == "subagent_progress":
                    payload = item.get("payload") or {}
                    text = str(payload.get("text") or "")
                    if text:
                        await emit(
                            "message",
                            text=text,
                            kind="progress",
                            agent_ui={
                                "kind": str(payload.get("kind") or "subagent_status"),
                                "data": payload,
                            },
                            turn_id=turn_id,
                            turn_phase="activity",
                            turn_seq=tool_seq,
                        )
                continue
            event = item
            if event.type == "reasoning_delta" and event.delta:
                await emit(
                    "reasoning_delta",
                    text=event.delta,
                    stream_id=f"{turn_id}:{event.iteration}",
                    turn_id=turn_id,
                    turn_phase="reasoning",
                    turn_seq=max(1, event.iteration * 10 - 2),
                )
                stream_open = True
                if gateway.mode == "offline":
                    await asyncio.sleep(0.012)
            elif event.type == "content_delta" and event.delta:
                content_seen = True
                await emit(
                    "delta",
                    text=event.delta,
                    stream_id=f"{turn_id}:{event.iteration}",
                    turn_id=turn_id,
                    turn_phase="answer",
                    turn_seq=max(1, event.iteration * 10 - 1),
                )
                stream_open = True
                if gateway.mode == "offline":
                    await asyncio.sleep(0.012)
            elif event.type == "assistant_turn":
                if stream_open:
                    await emit(
                        "reasoning_end",
                        stream_id=f"{turn_id}:{event.iteration}",
                        turn_id=turn_id,
                        turn_phase="reasoning",
                        turn_seq=max(1, event.iteration * 10),
                    )
                    await emit(
                        "stream_end",
                        stream_id=f"{turn_id}:{event.iteration}",
                        text=event.turn_content or "",
                        turn_id=turn_id,
                        turn_phase="answer",
                        turn_seq=max(1, event.iteration * 10 + 1),
                    )
                    stream_open = False
            elif event.type == "tool_call":
                call_id = f"{turn_id}:{tool_seq}"
                pending_calls.append((event.tool_name, call_id))
                await emit(
                    "message",
                    text="",
                    kind="tool_hint",
                    tool_events=[
                        {
                            "phase": "start",
                            "call_id": call_id,
                            "name": event.tool_name,
                            "arguments": event.tool_args,
                        }
                    ],
                    turn_id=turn_id,
                    turn_phase="activity",
                    turn_seq=tool_seq,
                )
                tool_seq += 1
            elif event.type == "tool_result":
                result_obj: Any
                try:
                    result_obj = json.loads(event.tool_result)
                except Exception:
                    # Tool output is not necessarily JSON; pass it through as-is.
                    result_obj = event.tool_result
                # Pair the result with the oldest unanswered call of the same
                # tool so that several calls issued before their results keep
                # distinct ids. Without a match, fall back to the id of the
                # most recently issued call.
                call_id = f"{turn_id}:{max(tool_seq - 1, 1)}"
                for index, (pending_name, pending_id) in enumerate(pending_calls):
                    if pending_name == event.tool_name:
                        call_id = pending_id
                        del pending_calls[index]
                        break
                await emit(
                    "message",
                    text="",
                    kind="progress",
                    tool_events=[
                        {
                            "phase": "end",
                            "call_id": call_id,
                            "name": event.tool_name,
                            "arguments": event.tool_args,
                            "result": result_obj,
                        }
                    ],
                    turn_id=turn_id,
                    turn_phase="activity",
                    turn_seq=tool_seq,
                )
            elif event.type == "final":
                final_text = event.final_response or ""
        await producer_task
        latency_ms = max(1, int((time.time() - started_at) * 1000))
        gateway.chat_store.append_assistant_message(record, final_text, turn_id, latency_ms)
        if not content_seen:
            # Nothing was streamed as content, so deliver the answer in one message.
            await emit(
                "message",
                text=final_text,
                turn_id=turn_id,
                turn_phase="answer",
                turn_seq=tool_seq + 1,
                latency_ms=latency_ms,
            )
        await emit_turn_end(latency_ms)
        await emit_idle()
        await emit_session_updated()
    except asyncio.CancelledError:
        # Close the turn for subscribers before letting the cancellation propagate.
        await emit_turn_end(0)
        await emit_idle()
        await emit_session_updated()
        raise
    except Exception as exc:
        await emit("error", detail=str(exc))
        await emit_idle()
    finally:
        record.session.session_state.pop("_live_event_sink", None)
        record.running = False
        record.current_turn_id = None
        record.run_task = None
        record.updated_at = now_iso()
def parse_args() -> argparse.Namespace:
    """Parse the server's command-line options.

    Returns:
        Namespace with ``host`` (default ``"127.0.0.1"``), ``port``
        (int, default ``8010``) and ``offline`` (flag, default ``False``).
    """
    cli = argparse.ArgumentParser(description="Meeting Knowledge WebUI server")
    option_specs = (
        ("--host", {"default": "127.0.0.1"}),
        ("--port", {"type": int, "default": 8010}),
        ("--offline", {"action": "store_true"}),
    )
    for flag, settings in option_specs:
        cli.add_argument(flag, **settings)
    return cli.parse_args()
def main() -> None:
    """Parse the command line and serve the application with uvicorn."""
    options = parse_args()
    # Imported here rather than at module top, after the arguments are parsed.
    import uvicorn

    application = create_app(force_offline=options.offline)
    server_settings = {
        "host": options.host,
        "port": options.port,
        "log_level": "info",
    }
    uvicorn.run(application, **server_settings)


# Start the server only when this file is executed as a script.
if __name__ == "__main__":
    main()