# Source: meeting_memory/meeting_memory/graph_store.py (913 lines, 37 KiB, Python)

import hashlib
import json
import logging
import re
import time
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Dict, List, Optional
from meeting_memory.config import config
from meeting_memory.services.embedding_service import embedding_service
logger = logging.getLogger(__name__)
def _cosine_similarity(left: List[float], right: List[float]) -> float:
if not left or not right or len(left) != len(right):
return 0.0
dot = sum(a * b for a, b in zip(left, right))
left_norm = sum(a * a for a in left) ** 0.5
right_norm = sum(b * b for b in right) ** 0.5
if left_norm == 0 or right_norm == 0:
return 0.0
return dot / (left_norm * right_norm)
def _keyword_score(text: str, question: str) -> float:
    """Return the fraction of the question's keyword terms contained in ``text``.

    Matching is a case-insensitive substring test. The score is ``0.0`` when
    ``text`` is empty or the question yields no keyword terms.
    """
    haystack = (text or '').lower()
    wanted = _keyword_terms(question)
    if not (haystack and wanted):
        return 0.0
    matched = [term for term in wanted if term in haystack]
    return len(matched) / len(wanted)
class _EntityType(str, Enum):
DEPARTMENT = 'Department'
PROJECT = 'Project'
METRIC = 'Metric'
PERSON = 'Person'
SYSTEM = 'System'
DOCUMENT = 'Document'
PARTICIPANT = 'participant'
UNKNOWN = 'Unknown'
_ENTITY_TYPE_ALIASES = {
'组织': 'Department',
'organization': 'Department',
'部门': 'Department',
'指标': 'Metric',
'kpi': 'Metric',
'项目': 'Project',
}
def _canonical_entity_type(raw: str) -> str:
normalized = raw.strip()
if normalized in _ENTITY_TYPE_ALIASES:
return _ENTITY_TYPE_ALIASES[normalized]
for member in _EntityType:
if member.value.lower() == normalized.lower():
return member.value
return _EntityType.UNKNOWN.value
def _neo4j_labels(entity_type: str) -> list[str]:
    """Return the node labels for an entity of the given type.

    ``'Entity'`` always comes first; the canonical type is added as a second
    label unless the type resolves to ``Unknown``.
    """
    resolved = _canonical_entity_type(entity_type)
    if resolved == _EntityType.UNKNOWN.value:
        return ['Entity']
    return ['Entity', resolved]
def _keyword_terms(text: str) -> List[str]:
normalized = (text or '').lower()
raw_terms = re.findall(r'[a-z0-9]+|[\u4e00-\u9fff]{2,}', normalized)
stopwords = {'是什么', '多少', '分别', '以及', '还有', '当前值', '目标值'}
terms: List[str] = []
for raw in raw_terms:
if raw in stopwords:
continue
if raw not in terms:
terms.append(raw)
if re.fullmatch(r'[\u4e00-\u9fff]{4,}', raw):
for size in (2, 3, 4):
for idx in range(0, len(raw) - size + 1):
piece = raw[idx: idx + size]
if piece not in stopwords and piece not in terms:
terms.append(piece)
return terms
class Neo4jGraphStore:
def __init__(self):
self._driver = None
self._enabled = False
self._uri = config.neo4j.uri
self._last_failure_at = 0.0
self._retry_cooldown_seconds = 10.0
self._connect()
def _connect(self):
if not config.neo4j.enabled:
logger.info('Neo4j graph store disabled')
return
try:
from neo4j import GraphDatabase
except ImportError:
logger.warning('neo4j package is not installed')
return
if not config.neo4j.password:
logger.warning('Neo4j is enabled but NEO4J_PASSWORD is empty')
return
tried_uris = [self._uri]
if self._uri.startswith('neo4j://'):
tried_uris.append('bolt://' + self._uri[len('neo4j://'):])
for uri in tried_uris:
driver = None
try:
driver = GraphDatabase.driver(
uri,
auth=(config.neo4j.user, config.neo4j.password),
)
driver.verify_connectivity()
self._driver = driver
self._uri = uri
self._enabled = True
self._last_failure_at = 0.0
if uri != config.neo4j.uri:
logger.warning('Neo4j routing URI unavailable; fell back to %s', uri)
return
except Exception as exc:
logger.warning('Neo4j connection failed for %s: %s', uri, exc)
try:
driver.close()
except Exception:
pass
self._mark_unavailable('Neo4j is currently unreachable')
@property
def enabled(self) -> bool:
if not self._enabled and self._should_retry_connect():
self._connect()
return self._enabled and self._driver is not None
def _should_retry_connect(self) -> bool:
return (time.time() - self._last_failure_at) >= self._retry_cooldown_seconds
def _mark_unavailable(self, reason: str = '') -> None:
if reason:
logger.warning('Neo4j temporarily disabled: %s', reason)
self._enabled = False
self._last_failure_at = time.time()
if self._driver is not None:
try:
self._driver.close()
except Exception:
pass
self._driver = None
@staticmethod
def meeting_id(meeting_data: dict) -> str:
title = meeting_data.get('title', '')
date = meeting_data.get('date', '')
raw = f'{date}_{title}'
return f'meeting_{hashlib.md5(raw.encode("utf-8")).hexdigest()[:12]}'
def close(self):
if self._driver is not None:
self._driver.close()
def run_query(self, query: str, **params) -> List[Dict[str, Any]]:
if not self.enabled:
return []
try:
with self._driver.session(database=config.neo4j.database) as session:
result = session.run(query, **params)
return [record.data() for record in result]
except Exception as exc:
logger.warning('Neo4j query failed: %s', exc)
self._mark_unavailable(str(exc))
return []
def initialize_schema(self):
if not self.enabled:
return
statements = [
'CREATE CONSTRAINT meeting_id IF NOT EXISTS FOR (m:Meeting) REQUIRE m.meeting_id IS UNIQUE',
'CREATE CONSTRAINT episode_id IF NOT EXISTS FOR (e:Episode) REQUIRE e.episode_id IS UNIQUE',
'CREATE CONSTRAINT entity_name IF NOT EXISTS FOR (e:Entity) REQUIRE e.name IS UNIQUE',
'CREATE INDEX meeting_title IF NOT EXISTS FOR (m:Meeting) ON (m.title)',
'CREATE INDEX episode_title IF NOT EXISTS FOR (e:Episode) ON (e.title)',
'CREATE INDEX entity_type IF NOT EXISTS FOR (e:Entity) ON (e.entity_type)',
'CREATE INDEX relates_to_name IF NOT EXISTS FOR ()-[r:RELATES_TO]-() ON (r.name)',
'CREATE INDEX relates_to_fact IF NOT EXISTS FOR ()-[r:RELATES_TO]-() ON (r.fact)',
]
for statement in statements:
self.run_query(statement)
def get_stats(self) -> Dict[str, Any]:
if not self.enabled:
return {'enabled': False}
rows = self.run_query('''
CALL { MATCH (m:Meeting) RETURN count(m) AS meetings }
CALL { MATCH (ep:Episode) RETURN count(ep) AS episodes }
CALL { MATCH (e:Entity) RETURN count(e) AS entities }
CALL { MATCH ()-[r:RELATES_TO]->() RETURN count(r) AS relations }
CALL { MATCH (d:Department) RETURN count(d) AS departments }
CALL { MATCH (p:Project) RETURN count(p) AS projects }
CALL { MATCH (m:Metric) RETURN count(m) AS metrics }
RETURN meetings, episodes, entities, relations, departments, projects, metrics
''')
if not rows:
return {'enabled': False}
return {'enabled': True, **rows[0]}
# ==================== Entity Dedup (from Graphiti) ====================
def find_similar_entities(
self, name: str, threshold: float = 0.6, limit: int = 15
) -> List[Dict[str, Any]]:
if not self.enabled or not name.strip():
return []
query_embedding = embedding_service.embed_text(name)
rows = self.run_query('''
MATCH (e:Entity)
RETURN e.name AS name,
e.entity_type AS entity_type,
e.summary AS summary,
e.description AS description,
e.name_embedding AS name_embedding
''')
scored = []
for row in rows:
score = _cosine_similarity(query_embedding, row.get('name_embedding', []))
if score >= threshold:
scored.append({
'candidate_id': len(scored),
'name': row.get('name', ''),
'entity_type': row.get('entity_type', ''),
'summary': row.get('summary', '') or row.get('description', ''),
'score': score,
})
scored.sort(key=lambda r: r['score'], reverse=True)
return scored[:limit]
def get_entities_map(self) -> Dict[str, Dict[str, Any]]:
rows = self.run_query('''
MATCH (e:Entity)
RETURN e.name AS name,
e.entity_type AS entity_type,
e.summary AS summary,
e.description AS description
''')
return {r['name']: r for r in rows if r.get('name')}
# ==================== Edge Dedup / Resolution (from Graphiti) ====================
def get_facts_between(self, source_name: str, target_name: str) -> List[Dict[str, Any]]:
return self.run_query('''
MATCH (s:Entity {name: $source_name})-[r:RELATES_TO]->(t:Entity {name: $target_name})
RETURN r.name AS relation_type,
r.fact AS fact,
r.qualifiers AS qualifiers,
r.confidence AS confidence,
r.valid_at AS valid_at,
r.invalid_at AS invalid_at,
r.expired_at AS expired_at,
r.meeting_id AS meeting_id
ORDER BY coalesce(r.valid_at, '') DESC
''', source_name=source_name, target_name=target_name)
def search_related_facts(
self, fact_text: str, group_id: str = '', limit: int = 10
) -> List[Dict[str, Any]]:
if not fact_text.strip():
return []
query_embedding = embedding_service.embed_text(fact_text)
rows = self.run_query('''
MATCH (s:Entity)-[r:RELATES_TO]->(t:Entity)
RETURN r.fact AS fact,
r.name AS relation_type,
r.fact_embedding AS fact_embedding,
r.valid_at AS valid_at,
r.invalid_at AS invalid_at,
r.expired_at AS expired_at,
s.name AS source_name,
t.name AS target_name
''')
scored = []
for row in rows:
score = _cosine_similarity(query_embedding, row.get('fact_embedding', []))
if score > 0.3:
scored.append({
'fact': row.get('fact', ''),
'relation_type': row.get('relation_type', ''),
'source_name': row.get('source_name', ''),
'target_name': row.get('target_name', ''),
'valid_at': row.get('valid_at', ''),
'invalid_at': row.get('invalid_at', ''),
'expired_at': row.get('expired_at', ''),
'score': score,
})
scored.sort(key=lambda r: r['score'], reverse=True)
return scored[:limit]
def mark_relation_expired(self, source_name: str, target_name: str, relation_type: str, expired_at: str | None = None):
if not expired_at:
expired_at = datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')
self.run_query('''
MATCH (s:Entity {name: $source_name})-[r:RELATES_TO {name: $relation_type}]->(t:Entity {name: $target_name})
SET r.expired_at = $expired_at,
r.invalid_at = $expired_at,
r.updated_at = datetime()
''', source_name=source_name, target_name=target_name, relation_type=relation_type, expired_at=expired_at)
# ==================== Core Write Operations ====================
def upsert_meeting_subgraph(self, meeting_data: dict) -> None:
if not self.enabled:
return
meeting_id = meeting_data.get('_graph_meeting_id') or self.meeting_id(meeting_data)
episode_text = self._build_episode_text(meeting_data)
episode_embedding = embedding_service.embed_text(episode_text)
self.initialize_schema()
self.run_query('''
MERGE (m:Meeting {meeting_id: $meeting_id})
SET m.title = $title,
m.date = $date,
m.summary = $summary,
m.content_hash = $content_hash,
m.raw_path = $raw_path,
m.updated_at = datetime()
MERGE (ep:Episode {episode_id: $meeting_id})
SET ep.title = $title,
ep.date = $date,
ep.summary = $summary,
ep.content = $content,
ep.content_hash = $content_hash,
ep.raw_path = $raw_path,
ep.participants = $participants,
ep.content_embedding = $content_embedding,
ep.updated_at = datetime()
MERGE (m)-[:HAS_EPISODE]->(ep)
''',
meeting_id=meeting_id,
title=meeting_data.get('title', ''),
date=meeting_data.get('date', ''),
summary=meeting_data.get('summary', ''),
content_hash=meeting_data.get('_content_hash', ''),
raw_path=meeting_data.get('_original_text_path', ''),
content=episode_text,
participants=meeting_data.get('participants', []),
content_embedding=episode_embedding,
)
for entity in meeting_data.get('entities', []):
self._upsert_entity(meeting_id, entity)
for participant in meeting_data.get('participants', []):
self._upsert_entity(
meeting_id,
{'name': participant, 'entity_type': 'participant', 'description': ''},
)
for relation in meeting_data.get('relations', []):
self._upsert_direct_edge(meeting_id, relation, meeting_data.get('date', ''))
self._upsert_hierarchy(meeting_id, meeting_data)
for metric in meeting_data.get('metrics', []):
self._upsert_metric_node(meeting_id, metric, meeting_data.get('date', ''))
def _upsert_entity(self, meeting_id: str, entity: dict) -> None:
name = entity.get('name', '').strip()
if not name:
return
raw_type = entity.get('entity_type', '').strip()
labels = _neo4j_labels(raw_type)
summary = self._entity_summary(entity)
name_embedding = embedding_service.embed_text(summary or name)
set_labels = ' SET ' + ', '.join(f'e:{label}' for label in labels[1:]) if len(labels) > 1 else ''
self.run_query(f'''
MATCH (:Meeting {{meeting_id: $meeting_id}})-[:HAS_EPISODE]->(ep:Episode {{episode_id: $meeting_id}})
MERGE (e:Entity {{name: $name}})
{set_labels}
SET e.entity_type = CASE
WHEN $type <> '' THEN $type
ELSE coalesce(e.entity_type, '')
END,
e.description = CASE
WHEN $description <> '' THEN $description
ELSE coalesce(e.description, '')
END,
e.summary = CASE
WHEN $summary <> '' THEN $summary
ELSE coalesce(e.summary, '')
END,
e.name_embedding = CASE
WHEN size($name_embedding) > 0 THEN $name_embedding
ELSE coalesce(e.name_embedding, [])
END,
e.updated_at = datetime()
MERGE (ep)-[:MENTIONS]->(e)
''',
meeting_id=meeting_id,
name=name,
type=raw_type,
description=entity.get('description', ''),
summary=summary,
name_embedding=name_embedding,
)
def _upsert_direct_edge(self, meeting_id: str, relation: dict, meeting_date: str) -> None:
source = relation.get('source_entity_name', '').strip()
target = relation.get('target_entity_name', '').strip()
rtype = relation.get('relation_type', '').strip()
if not source or not target or not rtype:
return
self._upsert_entity(
meeting_id,
{'name': source, 'entity_type': '', 'description': ''},
)
self._upsert_entity(
meeting_id,
{'name': target, 'entity_type': '', 'description': ''},
)
fact_text = self._relation_text(relation)
fact_embedding = embedding_service.embed_text(fact_text)
self.run_query('''
MATCH (s:Entity {name: $source})
MATCH (t:Entity {name: $target})
MERGE (s)-[r:RELATES_TO {name: $rtype}]->(t)
SET r.fact = $fact,
r.fact_embedding = $fact_embedding,
r.evidence = $evidence,
r.qualifiers = $qualifiers,
r.confidence = $confidence,
r.valid_at = $valid_at,
r.invalid_at = $invalid_at,
r.meeting_id = $meeting_id,
r.meeting_date = $meeting_date,
r.updated_at = datetime()
''',
meeting_id=meeting_id,
source=source,
target=target,
rtype=rtype,
fact=fact_text,
fact_embedding=fact_embedding,
evidence=relation.get('evidence', ''),
qualifiers=relation.get('qualifiers', []),
confidence=relation.get('confidence', 0.0),
valid_at=relation.get('valid_at', ''),
invalid_at=relation.get('invalid_at', ''),
meeting_date=meeting_date,
)
def _upsert_hierarchy(self, meeting_id: str, meeting_data: dict) -> None:
entities_map = {e['name']: e for e in meeting_data.get('entities', []) if e.get('name')}
for rel in meeting_data.get('relations', []):
rtype = rel.get('relation_type', '')
if rtype not in ('HAS_PROJECT', 'HAS_METRIC', 'PART_OF'):
continue
source = rel.get('source_entity_name', '')
target = rel.get('target_entity_name', '')
if not source or not target:
continue
if rtype == 'HAS_PROJECT' or rtype == 'PART_OF':
self.run_query('''
MATCH (s:Entity {name: $source})
MATCH (t:Entity {name: $target})
MERGE (s)-[r:HAS_PROJECT]->(t)
SET r.updated_at = datetime(),
r.meeting_id = $meeting_id
''', source=source, target=target, meeting_id=meeting_id)
elif rtype == 'HAS_METRIC':
self.run_query('''
MATCH (s:Entity {name: $source})
MATCH (t:Entity {name: $target})
MERGE (s)-[r:HAS_METRIC]->(t)
SET r.updated_at = datetime(),
r.meeting_id = $meeting_id
''', source=source, target=target, meeting_id=meeting_id)
departments = meeting_data.get('departments', [])
for dept in departments:
dept_name = dept.get('name', '').strip()
if not dept_name or dept_name not in entities_map:
continue
for proj_name in dept.get('projects', []):
if proj_name in entities_map:
self.run_query('''
MATCH (s:Entity {name: $source})
MATCH (t:Entity {name: $target})
MERGE (s)-[r:HAS_PROJECT]->(t)
SET r.updated_at = datetime(),
r.meeting_id = $meeting_id
''', source=dept_name, target=proj_name, meeting_id=meeting_id)
def _upsert_metric_node(self, meeting_id: str, metric: dict, meeting_date: str) -> None:
name = metric.get('metric_name', '').strip()
if not name:
return
entity = {
'name': name,
'entity_type': 'Metric',
'description': f"{metric.get('value', '')} ({metric.get('unit', '')})" if metric.get('unit') else metric.get('value', ''),
}
self._upsert_entity(meeting_id, entity)
self.run_query('''
MATCH (e:Entity {name: $name})
SET e.current_value = $value,
e.target = $target,
e.trend = $trend,
e.unit = $unit,
e.owner = $owner,
e.updated_at = datetime()
''',
name=name,
value=metric.get('value', ''),
target=metric.get('target', ''),
trend=metric.get('trend', ''),
unit=metric.get('unit', ''),
owner=metric.get('owner', ''),
)
def remove_meeting_subgraph(self, meeting_id: str) -> None:
if not self.enabled:
return
# Phase 1: detach all entities mentioned by this episode
self.run_query('''
MATCH (m:Meeting {meeting_id: $meeting_id})-[:HAS_EPISODE]->(ep:Episode)
OPTIONAL MATCH (ep)-[mention:MENTIONS]->(entity:Entity)
OPTIONAL MATCH (entity)-[er]-()
DELETE mention, er
''', meeting_id=meeting_id)
# Phase 2: delete orphan entities no longer mentioned by any episode
self.run_query('''
MATCH (entity:Entity)
WHERE NOT (entity)<-[:MENTIONS]-(:Episode)
DETACH DELETE entity
''')
# Phase 3: delete episode and meeting
self.run_query('''
MATCH (m:Meeting {meeting_id: $meeting_id})-[:HAS_EPISODE]->(ep:Episode)
DETACH DELETE ep, m
''', meeting_id=meeting_id)
# ==================== Retrieval ====================
def get_meeting(self, title: str, date: str = '') -> Optional[Dict[str, Any]]:
if not self.enabled:
return None
rows = self.run_query('''
MATCH (m:Meeting)
WHERE m.title = $title AND ($date = '' OR m.date = $date)
RETURN m.meeting_id AS meeting_id, m.title AS title, m.date AS date,
m.summary AS summary, m.content_hash AS content_hash
LIMIT 1
''', title=title, date=date)
return rows[0] if rows else None
def find_similar_episode(self, text: str, threshold: float = 0.92) -> Optional[Dict[str, Any]]:
if not self.enabled or not text.strip():
return None
query_embedding = embedding_service.embed_text(text)
rows = self.run_query('''
MATCH (m:Meeting)-[:HAS_EPISODE]->(ep:Episode)
RETURN m.meeting_id AS meeting_id, m.title AS title, m.date AS date,
m.content_hash AS content_hash, ep.content_embedding AS content_embedding
''')
best_match = None
for row in rows:
score = _cosine_similarity(query_embedding, row.get('content_embedding', []))
if score >= threshold and (best_match is None or score > best_match['score']):
best_match = {
'metadata': {
'meeting_id': row.get('meeting_id', ''),
'title': row.get('title', ''),
'date': row.get('date', ''),
'content_hash': row.get('content_hash', ''),
},
'score': score,
}
return best_match
def hybrid_search(self, question: str, limit: int = 5) -> List[Dict[str, Any]]:
if not self.enabled or not question.strip():
return []
query_embedding = embedding_service.embed_text(question)
candidates = self._load_fact_candidates()
candidates.extend(self._load_entity_candidates())
candidates.extend(self._load_episode_candidates())
scored = []
for item in candidates:
combined_text = ' '.join([
str(item.get('title') or ''),
str(item.get('text') or ''),
str(item.get('meeting_title') or ''),
str(item.get('date') or ''),
])
semantic = _cosine_similarity(query_embedding, item.get('embedding', []))
lexical = _keyword_score(combined_text, question)
graph_bonus = 0.1 if item.get('kind') == 'fact' else 0.05
score = semantic * 0.7 + lexical * 0.2 + graph_bonus
if score <= 0:
continue
scored.append({
**item,
'score': round(score, 4),
'semantic_score': round(semantic, 4),
'keyword_score': round(lexical, 4),
})
scored.sort(key=lambda row: row['score'], reverse=True)
return scored[:limit]
def search_facts(self, question: str, limit: int = 5) -> List[Dict[str, Any]]:
return self.hybrid_search(question, limit=limit)
def get_graph_kinds(self) -> List[Dict[str, Any]]:
if not self.enabled:
return []
return self.run_query('''
MATCH (n)
WHERE n:Meeting OR n:Episode OR n:Entity OR n:Department OR n:Project OR n:Metric
WITH [lbl IN labels(n) WHERE lbl IN ['Meeting','Episode','Entity','Department','Project','Metric']][0] AS kind
RETURN kind, count(*) AS count ORDER BY count DESC
''')
def get_entity_types(self) -> List[Dict[str, Any]]:
if not self.enabled:
return []
rows = self.run_query('''
MATCH (e:Entity)
WHERE coalesce(e.entity_type, '') <> ''
RETURN e.entity_type AS entity_type, count(*) AS count ORDER BY count DESC
''')
return [
{'entity_type': _canonical_entity_type(r.get('entity_type', '')), 'count': r.get('count', 0)}
for r in rows
]
def get_graph_snapshot(
self,
query: str = '',
entity_types: Optional[List[str]] = None,
kinds: Optional[List[str]] = None,
limit_nodes: int = 80,
limit_edges: int = 160,
) -> Dict[str, Any]:
if not self.enabled:
return {'nodes': [], 'edges': [], 'stats': {'enabled': False}}
keyword_terms = _keyword_terms(query) if query else []
raw_nodes = self.run_query(f'''
MATCH (n)
WHERE (n:Meeting OR n:Episode OR n:Entity)
AND ($kinds = [] OR any(lbl IN labels(n) WHERE lbl IN $kinds))
AND ($terms = []
OR (n:Meeting AND any(t IN $terms WHERE toLower(coalesce(n.title,'')) CONTAINS t OR toLower(coalesce(n.summary,'')) CONTAINS t))
OR (n:Episode AND any(t IN $terms WHERE toLower(coalesce(n.title,'')) CONTAINS t OR toLower(coalesce(n.content,'')) CONTAINS t))
OR (n:Entity AND any(t IN $terms WHERE toLower(coalesce(n.name,'')) CONTAINS t OR toLower(coalesce(n.summary,'')) CONTAINS t OR toLower(coalesce(n.description,'')) CONTAINS t))
)
AND ($types = [] OR NOT n:Entity OR coalesce(n.entity_type, '') IN $types)
OPTIONAL MATCH (n)-[r]-()
RETURN n.meeting_id AS meeting_id,
n.episode_id AS episode_id,
n.name AS entity_name,
n.title AS title,
n.summary AS summary,
n.date AS date,
n.entity_type AS entity_type,
n.description AS description,
n.meeting_date AS meeting_date,
n.current_value AS current_value,
n.target AS target_value,
n.trend AS trend,
CASE
WHEN n:Meeting THEN 'Meeting'
WHEN n:Episode THEN 'Episode'
WHEN n:Department THEN 'Department'
WHEN n:Project THEN 'Project'
WHEN n:Metric THEN 'Metric'
ELSE 'Entity'
END AS kind,
count(DISTINCT r) AS degree
ORDER BY degree DESC, coalesce(n.title, n.name) ASC
LIMIT $limit_nodes
''',
terms=keyword_terms,
types=entity_types or [],
kinds=kinds or [],
limit_nodes=limit_nodes,
)
if not raw_nodes:
return {'nodes': [], 'edges': [], 'stats': self.get_stats()}
all_raw_ids = set()
nodes = []
for row in raw_nodes:
kind = row.get('kind', '')
if kind == 'Meeting':
raw_id = row.get('meeting_id', '')
label = row.get('title', '') or raw_id
elif kind == 'Episode':
raw_id = row.get('episode_id', '')
label = row.get('title', '') or raw_id
elif kind in ('Entity', 'Department', 'Project', 'Metric'):
raw_id = row.get('entity_name', '')
label = raw_id
else:
continue
if not raw_id:
continue
nid = f'{kind}:{raw_id}'
all_raw_ids.add(raw_id)
node = {
'id': nid,
'label': label,
'kind': kind,
'entity_type': row.get('entity_type', '') if kind in ('Entity', 'Department', 'Project', 'Metric') else '',
'description': row.get('description', '') or row.get('summary', '') or '',
'date': row.get('date', '') or row.get('meeting_date', '') or '',
'degree': row.get('degree', 0),
'summary': row.get('summary', '') or '',
}
if kind == 'Metric':
node['current_value'] = row.get('current_value', '')
node['target'] = row.get('target_value', '')
node['trend'] = row.get('trend', '')
nodes.append(node)
if not nodes:
return {'nodes': [], 'edges': [], 'stats': self.get_stats()}
ids_list = list(all_raw_ids)
edges_raw = self.run_query('''
MATCH (s)-[r]->(t)
WHERE type(r) IN ['HAS_EPISODE','MENTIONS','RELATES_TO','HAS_PROJECT','HAS_METRIC']
AND (
(s:Meeting AND s.meeting_id IN $ids)
OR (s:Episode AND s.episode_id IN $ids)
OR (s:Entity AND s.name IN $ids)
)
AND (
(t:Meeting AND t.meeting_id IN $ids)
OR (t:Episode AND t.episode_id IN $ids)
OR (t:Entity AND t.name IN $ids)
)
RETURN type(r) AS predicate,
r.name AS relation_name,
r.fact AS relation_fact,
r.confidence AS relation_confidence,
r.meeting_date AS relation_date,
r.meeting_id AS relation_meeting_id,
CASE WHEN s:Meeting THEN s.meeting_id
WHEN s:Episode THEN s.episode_id
WHEN s:Entity THEN s.name END AS source_raw,
CASE WHEN t:Meeting THEN t.meeting_id
WHEN t:Episode THEN t.episode_id
WHEN t:Entity THEN t.name END AS target_raw,
CASE WHEN s:Meeting THEN 'Meeting' WHEN s:Episode THEN 'Episode'
WHEN s:Department THEN 'Department'
WHEN s:Project THEN 'Project'
WHEN s:Metric THEN 'Metric'
WHEN s:Entity THEN 'Entity' END AS source_kind,
CASE WHEN t:Meeting THEN 'Meeting' WHEN t:Episode THEN 'Episode'
WHEN t:Department THEN 'Department'
WHEN t:Project THEN 'Project'
WHEN t:Metric THEN 'Metric'
WHEN t:Entity THEN 'Entity' END AS target_kind
LIMIT $limit_edges
''', ids=list(all_raw_ids), limit_edges=limit_edges)
degree_map: Dict[str, int] = {}
for row in edges_raw:
src = row.get('source_raw', '')
tgt = row.get('target_raw', '')
sk = row.get('source_kind', '')
tk = row.get('target_kind', '')
if sk and src:
degree_map[f'{sk}:{src}'] = degree_map.get(f'{sk}:{src}', 0) + 1
if tk and tgt:
degree_map[f'{tk}:{tgt}'] = degree_map.get(f'{tk}:{tgt}', 0) + 1
for node in nodes:
node['degree'] = degree_map.get(node['id'], node.get('degree', 0))
edges = []
for idx, row in enumerate(edges_raw, start=1):
sk = row.get('source_kind', '')
tk = row.get('target_kind', '')
edges.append({
'id': f'edge_{idx}',
'source': f'{sk}:{row["source_raw"]}' if sk and row.get('source_raw') else '',
'target': f'{tk}:{row["target_raw"]}' if tk and row.get('target_raw') else '',
'predicate': row.get('predicate', ''),
'relation_name': row.get('relation_name', ''),
'fact': row.get('relation_fact', '') or '',
'confidence': row.get('relation_confidence', 0.0),
'date': row.get('relation_date', '') or '',
'meeting_id': row.get('relation_meeting_id', '') or '',
})
return {'nodes': nodes, 'edges': edges, 'stats': self.get_stats(), 'query': query}
def format_search_context(self, question: str, top_k: int = 5) -> str:
results = self.hybrid_search(question, limit=top_k)
if not results:
return ''
lines = []
for idx, row in enumerate(results, start=1):
date = row.get('date', '')
meeting_title = row.get('meeting_title', '')
title = row.get('title', row.get('kind', 'item'))
suffix = f' ({date})' if date else ''
source = f' | 来源会议: {meeting_title}' if meeting_title else ''
lines.append(
f'[{idx}] {title}{suffix}{source}\n'
f'{row.get("text", "")}\n'
f'score={row.get("score", 0):.4f}, semantic={row.get("semantic_score", 0):.4f}, keyword={row.get("keyword_score", 0):.4f}'
)
return '\n\n'.join(lines)
def _load_fact_candidates(self) -> List[Dict[str, Any]]:
return self.run_query('''
MATCH (s:Entity)-[r:RELATES_TO]->(t:Entity)
OPTIONAL MATCH (ep:Episode)-[:MENTIONS]->(s)
WITH s, r, t, collect(DISTINCT ep.date) AS dates, collect(DISTINCT ep.title) AS titles
RETURN 'fact' AS kind,
s.name + ' -[' + r.name + ']-> ' + t.name AS title,
coalesce(r.fact, '') AS text,
head(dates) AS date,
head(titles) AS meeting_title,
r.fact_embedding AS embedding
''')
def _load_entity_candidates(self) -> List[Dict[str, Any]]:
return self.run_query('''
MATCH (e:Entity)
OPTIONAL MATCH (ep:Episode)-[:MENTIONS]->(e)
RETURN 'entity' AS kind,
e.name AS title,
coalesce(e.summary, e.description, '') AS text,
max(ep.date) AS date,
head(collect(DISTINCT ep.title)) AS meeting_title,
e.name_embedding AS embedding
''')
def _load_episode_candidates(self) -> List[Dict[str, Any]]:
return self.run_query('''
MATCH (m:Meeting)-[:HAS_EPISODE]->(ep:Episode)
RETURN 'episode' AS kind,
m.title AS title,
coalesce(ep.summary, ep.content, '') AS text,
ep.date AS date,
m.title AS meeting_title,
ep.content_embedding AS embedding
''')
@staticmethod
def _entity_summary(entity: dict) -> str:
entity_type = _canonical_entity_type(entity.get('entity_type', '').strip())
name = entity.get('name', '').strip()
description = entity.get('description', '').strip()
parts = [part for part in [entity_type, name, description] if part]
return ' | '.join(parts)
@staticmethod
def _relation_text(relation: dict) -> str:
source = relation.get('source_entity_name', '').strip()
rtype = relation.get('relation_type', '').strip()
target = relation.get('target_entity_name', '').strip()
fact = relation.get('fact', '').strip() or f'{source} {rtype} {target}'.strip()
qualifiers = relation.get('qualifiers', [])
qualifier_text = '; '.join(item for item in qualifiers if item)
parts = [fact]
if qualifier_text:
parts.append(qualifier_text)
return '. '.join(parts)
@staticmethod
def _build_episode_text(meeting_data: dict) -> str:
payload = {
'title': meeting_data.get('title', ''),
'date': meeting_data.get('date', ''),
'participants': meeting_data.get('participants', []),
'summary': meeting_data.get('summary', ''),
'entities': meeting_data.get('entities', []),
'relations': meeting_data.get('relations', []),
'action_items': meeting_data.get('action_items', []),
'metrics': meeting_data.get('metrics', []),
'decisions': meeting_data.get('decisions', []),
'departments': meeting_data.get('departments', []),
'original_text': meeting_data.get('_original_text', ''),
}
return json.dumps(payload, ensure_ascii=False)
# Module-level shared store instance. Constructing it runs
# Neo4jGraphStore.__init__, which attempts a connection at import time.
graph_store = Neo4jGraphStore()