meeting_memory/meeting_memory/web_demo/server.py

421 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import json
import logging
import mimetypes
import sys
import threading
import time
import uuid
from http import HTTPStatus
from http.server import SimpleHTTPRequestHandler, ThreadingHTTPServer
from pathlib import Path
from urllib.parse import parse_qs, urlparse
if __package__ in (None, ""):
sys.path.insert(0, str(Path(__file__).resolve().parents[2]))
from meeting_memory.config import config
from meeting_memory.graph_store import graph_store
from meeting_memory.meeting_processor import meeting_processor, state_store
logger = logging.getLogger(__name__)
STATIC_DIR = Path(__file__).resolve().parent / "static"
STATIC_V2_DIR = Path(__file__).resolve().parent / "static_v2"
RAW_DIR = Path(config.storage.raw_dir)
IMPORT_JOBS = {}
IMPORT_JOBS_LOCK = threading.Lock()
class GraphDemoHandler(SimpleHTTPRequestHandler):
def __init__(self, *args, **kwargs):
super().__init__(*args, directory=str(STATIC_DIR), **kwargs)
# ── Route: serve /static_v2/* from the v2 directory ──
def translate_path(self, path):
parsed = urlparse(path)
raw = parsed.path
# Serve /static_v2/* from static_v2 directory
if raw.startswith("/static_v2/"):
rel = raw[len("/static_v2/"):]
return str(STATIC_V2_DIR / rel)
return super().translate_path(path)
def do_GET(self):
parsed = urlparse(self.path)
# API endpoints
if parsed.path == "/api/dashboard":
self._handle_dashboard()
return
if parsed.path == "/api/graph":
self._handle_graph(parsed.query)
return
if parsed.path == "/api/graph-types":
self._handle_graph_types()
return
if parsed.path == "/api/graph-kinds":
self._handle_graph_kinds()
return
if parsed.path == "/api/search":
self._handle_search(parsed.query)
return
if parsed.path == "/api/meetings":
self._handle_meetings(parsed.query)
return
if parsed.path == "/api/meeting":
self._handle_meeting(parsed.query)
return
if parsed.path == "/api/import-status":
self._handle_import_status(parsed.query)
return
# Page routing — serve v2 HTML as default
if parsed.path in ("/", "/index.html"):
self.path = "/static_v2/index.html"
elif parsed.path == "/graph":
self.path = "/static_v2/graph.html"
elif parsed.path == "/graph.html":
self.path = "/static_v2/graph.html"
# JS files (/app.js, /graph.js) resolve to STATIC_DIR via default translate_path
super().do_GET()
def do_POST(self):
parsed = urlparse(self.path)
if parsed.path == "/api/import":
self._handle_import()
return
self.send_error(HTTPStatus.NOT_FOUND, "Unsupported endpoint")
def log_message(self, format, *args):
logger.info("%s - %s", self.address_string(), format % args)
def end_headers(self):
self.send_header("Cache-Control", "no-store")
super().end_headers()
def guess_type(self, path):
guessed = super().guess_type(path)
if guessed == "application/octet-stream":
return mimetypes.guess_type(path)[0] or guessed
return guessed
def _handle_graph(self, raw_query: str):
params = parse_qs(raw_query)
query = (params.get("q") or [""])[0].strip()
limit_nodes = self._safe_int((params.get("limit_nodes") or ["80"])[0], default=80)
limit_edges = self._safe_int((params.get("limit_edges") or ["160"])[0], default=160)
entity_types = params.get("entity_types")
kinds = params.get("kinds")
payload = graph_store.get_graph_snapshot(
query=query,
entity_types=entity_types if entity_types else None,
kinds=kinds if kinds else None,
limit_nodes=limit_nodes,
limit_edges=limit_edges,
)
self._write_json(payload)
def _handle_graph_types(self):
types = graph_store.get_entity_types()
self._write_json({"types": types})
def _handle_graph_kinds(self):
kinds = graph_store.get_graph_kinds()
self._write_json({"kinds": kinds})
def _handle_search(self, raw_query: str):
params = parse_qs(raw_query)
query = (params.get("q") or [""])[0].strip()
limit = self._safe_int((params.get("limit") or ["8"])[0], default=8)
payload = {
"query": query,
"results": graph_store.hybrid_search(query, limit=limit) if query else [],
}
self._write_json(payload)
def _handle_dashboard(self):
meetings = _load_recent_meetings(limit=6)
action_items = _state_items("action_items", limit=6)
metrics = _state_items("metrics", limit=6)
series = _load_series(limit=6)
graph_stats = graph_store.get_stats()
payload = {
"graph": graph_stats,
"state": state_store.get_stats(),
"meetings": meetings,
"action_items": action_items,
"metrics": metrics,
"series": series,
"highlights": _build_highlights(meetings, action_items, metrics, graph_stats),
}
self._write_json(payload)
def _handle_meetings(self, raw_query: str):
params = parse_qs(raw_query)
limit = self._safe_int((params.get("limit") or ["24"])[0], default=24)
self._write_json({"meetings": _load_recent_meetings(limit=limit)})
def _handle_meeting(self, raw_query: str):
params = parse_qs(raw_query)
filename = (params.get("filename") or [""])[0].strip()
if not filename:
self._write_json({"error": "filename is required"}, status=HTTPStatus.BAD_REQUEST)
return
file_path = RAW_DIR / filename
if not file_path.exists() or file_path.parent != RAW_DIR:
self._write_json({"error": "meeting not found"}, status=HTTPStatus.NOT_FOUND)
return
self._write_json(_serialize_meeting(file_path, include_content=True))
def _handle_import(self):
payload = self._read_json_body()
if payload is None:
self._write_json({"ok": False, "error": "invalid json body"}, status=HTTPStatus.BAD_REQUEST)
return
text = str(payload.get("text") or "").strip()
force = bool(payload.get("force", False))
if not text:
self._write_json({"ok": False, "error": "text is required"}, status=HTTPStatus.BAD_REQUEST)
return
job_id = str(uuid.uuid4())
with IMPORT_JOBS_LOCK:
IMPORT_JOBS[job_id] = {
"job_id": job_id,
"status": "queued",
"message": "任务已创建,等待处理",
"archive_path": "",
"created_at": time.time(),
"updated_at": time.time(),
"steps": [],
}
thread = threading.Thread(
target=_run_import_job,
args=(job_id, text, force),
daemon=True,
)
thread.start()
self._write_json({"ok": True, "job_id": job_id, "status": "queued"})
def _handle_import_status(self, raw_query: str):
params = parse_qs(raw_query)
job_id = (params.get("job_id") or [""])[0].strip()
if not job_id:
self._write_json({"error": "job_id is required"}, status=HTTPStatus.BAD_REQUEST)
return
with IMPORT_JOBS_LOCK:
payload = IMPORT_JOBS.get(job_id)
if not payload:
self._write_json({"error": "job not found"}, status=HTTPStatus.NOT_FOUND)
return
self._write_json(payload)
def _read_json_body(self):
length = self._safe_int(self.headers.get("Content-Length"), default=0)
if length <= 0:
return None
try:
body = self.rfile.read(length)
return json.loads(body.decode("utf-8"))
except Exception:
return None
def _write_json(self, payload, status: HTTPStatus = HTTPStatus.OK):
body = json.dumps(payload, ensure_ascii=False).encode("utf-8")
self.send_response(status)
self.send_header("Content-Type", "application/json; charset=utf-8")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
@staticmethod
def _safe_int(raw_value, default: int) -> int:
try:
value = int(raw_value)
except (TypeError, ValueError):
return default
return max(0, value)
def run_demo_server(host: str = "127.0.0.1", port: int = 8765) -> None:
server = ThreadingHTTPServer((host, port), GraphDemoHandler)
logger.info("Graph demo server started at http://%s:%s", host, port)
print(f"Graph demo server started: http://{host}:{port}")
print("Press Ctrl+C to stop.")
try:
server.serve_forever()
except KeyboardInterrupt:
print("\nServer stopped.")
finally:
server.server_close()
def _run_import_job(job_id: str, text: str, force: bool) -> None:
def update(status: str | None = None, message: str | None = None, *, append_step: bool = False):
with IMPORT_JOBS_LOCK:
job = IMPORT_JOBS.get(job_id)
if not job:
return
if status:
job["status"] = status
if message:
job["message"] = message
if append_step:
job["steps"].append(message)
job["updated_at"] = time.time()
def progress(step: int, total: int, message: str):
update(
"running",
f"步骤 {step}/{total}{message}",
append_step=True,
)
update("running", "开始处理会议文本", append_step=True)
try:
archive_path = meeting_processor.process_meeting_text(
text,
force=force,
interactive=False,
progress_callback=progress,
)
if not archive_path:
update("error", "处理被跳过:可能是重复内容,或结构化抽取失败", append_step=True)
return
with IMPORT_JOBS_LOCK:
job = IMPORT_JOBS.get(job_id)
if job:
job["status"] = "done"
job["message"] = "导入完成"
job["archive_path"] = archive_path
job["updated_at"] = time.time()
job["dashboard"] = {
"graph": graph_store.get_stats(),
"state": state_store.get_stats(),
"meetings": _load_recent_meetings(limit=6),
}
job["steps"].append("导入完成")
except Exception as exc:
logger.exception("Meeting import failed")
update("error", f"处理失败:{exc}", append_step=True)
def _load_recent_meetings(limit: int = 6):
if not RAW_DIR.exists():
return []
files = sorted(
RAW_DIR.glob("*.md"),
key=lambda path: path.stat().st_mtime,
reverse=True,
)
return [_serialize_meeting(path) for path in files[:limit]]
def _serialize_meeting(path: Path, include_content: bool = False):
raw_text = path.read_text(encoding="utf-8")
title = ""
date = ""
lines = raw_text.splitlines()
for line in lines[:12]:
if line.startswith('title: "'):
title = line[len('title: "'):-1]
elif line.startswith('date: "'):
date = line[len('date: "'):-1]
content_start = 0
for idx, line in enumerate(lines):
if line.startswith("# "):
content_start = idx + 2
if not title:
title = line[2:].strip()
break
body = "\n".join(lines[content_start:]).strip()
snippet = body[:180] + ("..." if len(body) > 180 else "")
payload = {
"filename": path.name,
"title": title or path.stem,
"date": date,
"snippet": snippet,
"updated_at": int(path.stat().st_mtime),
}
if include_content:
payload["content"] = body
return payload
def _state_items(key: str, limit: int = 6):
bucket = getattr(state_store, "_state", {}).get(key, {})
items = []
for item in bucket.values():
latest = item.get("latest", {})
items.append({**item, "latest": latest})
items.sort(key=lambda row: str(row.get("latest", {}).get("date", "")), reverse=True)
return items[:limit]
def _load_series(limit: int = 6):
series = getattr(state_store, "_state", {}).get("meeting_series", {})
rows = []
for name, payload in series.items():
rows.append(
{
"name": name,
"latest_date": payload.get("latest_date", ""),
"processed_titles": payload.get("processed_titles", []),
"meeting_count": len(payload.get("processed_titles", [])),
}
)
rows.sort(key=lambda row: row.get("latest_date", ""), reverse=True)
return rows[:limit]
def _build_highlights(meetings, action_items, metrics, graph_stats):
latest_meeting = meetings[0] if meetings else {}
top_action = action_items[0] if action_items else {}
top_metric = metrics[0] if metrics else {}
return [
{
"label": "最近归档",
"value": latest_meeting.get("title", "暂无会议"),
"meta": latest_meeting.get("date", ""),
},
{
"label": "待跟进事项",
"value": str(len(action_items)),
"meta": top_action.get("task", ""),
},
{
"label": "图谱节点",
"value": str(graph_stats.get("entities", 0)),
"meta": "Neo4j 实体总数",
},
{
"label": "关键指标",
"value": str(len(metrics)),
"meta": top_metric.get("metric_name", ""),
},
]
if __name__ == "__main__":
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
datefmt="%H:%M:%S",
)
run_demo_server()