Merge remote-tracking branch '个人/dev_na' into dev_na

dev_na
chenhao 2026-06-26 16:57:59 +08:00
commit 982425e21e
3 changed files with 370 additions and 251 deletions

View File

@ -2,6 +2,9 @@ package com.imeeting.config;
import com.imeeting.websocket.RealtimeMeetingProxyWebSocketHandler; import com.imeeting.websocket.RealtimeMeetingProxyWebSocketHandler;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import org.springframework.boot.web.embedded.tomcat.TomcatServletWebServerFactory;
import org.springframework.boot.web.server.WebServerFactoryCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket; import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer; import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
@ -19,4 +22,25 @@ public class RealtimeMeetingWebSocketConfig implements WebSocketConfigurer {
registry.addHandler(realtimeMeetingProxyWebSocketHandler, "/ws/meeting/realtime") registry.addHandler(realtimeMeetingProxyWebSocketHandler, "/ws/meeting/realtime")
.setAllowedOriginPatterns("*"); .setAllowedOriginPatterns("*");
} }
/**
* Tomcat WebSocket keepalive ping sessionIdleTimeout
* <p>
* Tomcat WebSocket session sessionIdleTimeout-1 -1
* 线 Ping Pong
* code=1011 "keepalive ping timeout"
* ASR
* sessionIdleTimeout -1
* </p>
*/
@Bean
public WebServerFactoryCustomizer<TomcatServletWebServerFactory> wsSessionIdleTimeoutCustomizer() {
return factory -> factory.addConnectorCustomizers(connector -> {
// 通过系统属性通知 Tomcat WS 容器关闭 sessionIdleTimeout 检查
// org.apache.tomcat.websocket.DEFAULT_SESSION_IDLE_TIMEOUT=-1
if (System.getProperty("org.apache.tomcat.websocket.DEFAULT_SESSION_IDLE_TIMEOUT") == null) {
System.setProperty("org.apache.tomcat.websocket.DEFAULT_SESSION_IDLE_TIMEOUT", "-1");
}
});
}
} }

View File

@ -42,25 +42,25 @@ public class RealtimeMeetingProxyWebSocketHandler extends AbstractWebSocketHandl
private static final String ATTR_MEETING_ID = "meetingId"; private static final String ATTR_MEETING_ID = "meetingId";
private static final String ATTR_TARGET_WS_URL = "targetWsUrl"; private static final String ATTR_TARGET_WS_URL = "targetWsUrl";
private static final String ATTR_PROVIDER = "provider"; private static final String ATTR_PROVIDER = "provider";
private static final String ATTR_FRONTEND_TEXT_COUNT = "frontendTextCount"; private static final String ATTR_FRONTEND_TEXT_COUNT = "frontendTextCount";
private static final String ATTR_FRONTEND_BINARY_COUNT = "frontendBinaryCount"; private static final String ATTR_FRONTEND_BINARY_COUNT = "frontendBinaryCount";
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private final RealtimeMeetingSocketSessionService realtimeMeetingSocketSessionService; private final RealtimeMeetingSocketSessionService realtimeMeetingSocketSessionService;
private final RealtimeMeetingSessionStateService realtimeMeetingSessionStateService; private final RealtimeMeetingSessionStateService realtimeMeetingSessionStateService;
private final RealtimeMeetingAudioStorageService realtimeMeetingAudioStorageService; private final RealtimeMeetingAudioStorageService realtimeMeetingAudioStorageService;
private final RealtimeMeetingTranscriptCacheService realtimeMeetingTranscriptCacheService; private final RealtimeMeetingTranscriptCacheService realtimeMeetingTranscriptCacheService;
private final RealtimeAsrChannelFactory realtimeAsrChannelFactory; private final RealtimeAsrChannelFactory realtimeAsrChannelFactory;
private final ConcurrentMap<Long, MeetingChannelSession> meetingSessions = new ConcurrentHashMap<>(); private final ConcurrentMap<Long, MeetingChannelSession> meetingSessions = new ConcurrentHashMap<>();
private final ConcurrentMap<Long, Object> meetingLocks = new ConcurrentHashMap<>(); private final ConcurrentMap<Long, Object> meetingLocks = new ConcurrentHashMap<>();
@Override @Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception { public void afterConnectionEstablished(WebSocketSession session) throws Exception {
String sessionToken = extractQueryParam(session.getUri(), "sessionToken"); String sessionToken = extractQueryParam(session.getUri(), "sessionToken");
RealtimeSocketSessionData sessionData = realtimeMeetingSocketSessionService.getSessionData(sessionToken); RealtimeSocketSessionData sessionData = realtimeMeetingSocketSessionService.getSessionData(sessionToken);
if (sessionData == null) { if (sessionData == null) {
log.warn("实时会议 websocket 拒绝连接会话令牌无效sessionId={}", session.getId()); log.warn("实时会议 websocket 拒绝连接会话令牌无效sessionId={}", session.getId());
session.close(CloseStatus.POLICY_VIOLATION.withReason("实时 Socket 会话无效")); session.close(CloseStatus.POLICY_VIOLATION.withReason("实时 Socket 会话无效"));
return; return;
} }
@ -69,65 +69,70 @@ public class RealtimeMeetingProxyWebSocketHandler extends AbstractWebSocketHandl
new ConcurrentWebSocketSessionDecorator(session, (int) Duration.ofSeconds(15).toMillis(), 1024 * 1024); new ConcurrentWebSocketSessionDecorator(session, (int) Duration.ofSeconds(15).toMillis(), 1024 * 1024);
session.getAttributes().put(ATTR_MEETING_ID, sessionData.getMeetingId()); session.getAttributes().put(ATTR_MEETING_ID, sessionData.getMeetingId());
session.getAttributes().put(ATTR_TARGET_WS_URL, sessionData.getTargetWsUrl()); session.getAttributes().put(ATTR_TARGET_WS_URL, sessionData.getTargetWsUrl());
session.getAttributes().put(ATTR_PROVIDER, sessionData.getProvider()); session.getAttributes().put(ATTR_PROVIDER, sessionData.getProvider());
session.getAttributes().put(ATTR_FRONTEND_TEXT_COUNT, new AtomicInteger()); session.getAttributes().put(ATTR_FRONTEND_TEXT_COUNT, new AtomicInteger());
session.getAttributes().put(ATTR_FRONTEND_BINARY_COUNT, new AtomicInteger()); session.getAttributes().put(ATTR_FRONTEND_BINARY_COUNT, new AtomicInteger());
realtimeMeetingAudioStorageService.openSession(sessionData.getMeetingId(), session.getId()); realtimeMeetingAudioStorageService.openSession(sessionData.getMeetingId(), session.getId());
log.info("实时会议 websocket 已接入meetingId={}, sessionId={}, provider={}, upstream={}", log.info("实时会议 websocket 已接入meetingId={}, sessionId={}, provider={}, upstream={}",
sessionData.getMeetingId(), session.getId(), sessionData.getProvider(), sessionData.getTargetWsUrl()); sessionData.getMeetingId(), session.getId(), sessionData.getProvider(), sessionData.getTargetWsUrl());
attachFrontendSession(sessionData, session, frontendSession); attachFrontendSession(sessionData, session, frontendSession);
} }
@Override @Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) { protected void handleTextMessage(WebSocketSession session, TextMessage message) {
MeetingChannelSession meetingSession = getMeetingSession(session); // 过滤前端发来的心跳保活消息,不转发给上游 ASR 服务
if (meetingSession == null || !meetingSession.isChannelOpen()) { if (looksLikeKeepaliveMessage(message.getPayload())) {
log.warn("前端文本消息已忽略:上游 ASR 连接不可用meetingId={}, sessionId={}", log.debug("Frontend keepalive received, ignored: meetingId={}, sessionId={}",
session.getAttributes().get(ATTR_MEETING_ID), session.getId());
return;
}
MeetingChannelSession meetingSession = getMeetingSession(session);
if (meetingSession == null || !meetingSession.isChannelOpen()) {
log.warn("前端文本消息已忽略:上游 ASR 连接不可用meetingId={}, sessionId={}",
session.getAttributes().get(ATTR_MEETING_ID), session.getId()); session.getAttributes().get(ATTR_MEETING_ID), session.getId());
return; return;
} }
int count = nextCount(session, ATTR_FRONTEND_TEXT_COUNT); int count = nextCount(session, ATTR_FRONTEND_TEXT_COUNT);
String payload = message.getPayload(); String payload = message.getPayload();
log.info("前端文本 -> ASR 渠道meetingId={}, sessionId={}, provider={}, count={}, payload={}", log.info("前端文本 -> ASR 渠道meetingId={}, sessionId={}, provider={}, count={}, payload={}",
session.getAttributes().get(ATTR_MEETING_ID), session.getId(), session.getAttributes().get(ATTR_PROVIDER), count, summarizeText(payload)); session.getAttributes().get(ATTR_MEETING_ID), session.getId(), session.getAttributes().get(ATTR_PROVIDER), count, summarizeText(payload));
meetingSession.channel.handleFrontendText(meetingSession.context, payload); meetingSession.channel.handleFrontendText(meetingSession.context, payload);
} }
@Override @Override
protected void handleBinaryMessage(WebSocketSession session, BinaryMessage message) { protected void handleBinaryMessage(WebSocketSession session, BinaryMessage message) {
MeetingChannelSession meetingSession = getMeetingSession(session); MeetingChannelSession meetingSession = getMeetingSession(session);
if (meetingSession == null || !meetingSession.isChannelOpen()) { if (meetingSession == null || !meetingSession.isChannelOpen()) {
log.warn("前端音频帧已忽略:上游 ASR 连接不可用meetingId={}, sessionId={}", log.warn("前端音频帧已忽略:上游 ASR 连接不可用meetingId={}, sessionId={}",
session.getAttributes().get(ATTR_MEETING_ID), session.getId()); session.getAttributes().get(ATTR_MEETING_ID), session.getId());
return; return;
} }
int count = nextCount(session, ATTR_FRONTEND_BINARY_COUNT); int count = nextCount(session, ATTR_FRONTEND_BINARY_COUNT);
int bytes = message.getPayloadLength(); int bytes = message.getPayloadLength();
if (shouldLogBinaryFrame(count)) { if (shouldLogBinaryFrame(count)) {
log.info("前端音频帧 -> ASR 渠道meetingId={}, sessionId={}, provider={}, count={}, bytes={}", log.info("前端音频帧 -> ASR 渠道meetingId={}, sessionId={}, provider={}, count={}, bytes={}",
session.getAttributes().get(ATTR_MEETING_ID), session.getId(), session.getAttributes().get(ATTR_PROVIDER), count, bytes); session.getAttributes().get(ATTR_MEETING_ID), session.getId(), session.getAttributes().get(ATTR_PROVIDER), count, bytes);
} }
byte[] payload = toByteArray(message.getPayload()); byte[] payload = toByteArray(message.getPayload());
realtimeMeetingAudioStorageService.append(session.getId(), payload); realtimeMeetingAudioStorageService.append(session.getId(), payload);
meetingSession.channel.handleFrontendBinary(meetingSession.context, payload); meetingSession.channel.handleFrontendBinary(meetingSession.context, payload);
} }
@Override @Override
protected void handlePongMessage(WebSocketSession session, PongMessage message) { protected void handlePongMessage(WebSocketSession session, PongMessage message) {
if (getMeetingSession(session) == null) { // Pong 是浏览器对服务端Tomcat发出 Ping 帧的回应,代理层在此消化即可,无需转发给上游 ASR。
return; // 转发 Pong 给上游没有语义意义,且可能引起上游协议混乱。
} log.debug("Frontend pong received (keepalive): meetingId={}, sessionId={}",
log.debug("前端 pong 已在本地忽略meetingId={}, sessionId={}, bytes={}", session.getAttributes().get(ATTR_MEETING_ID), session.getId());
session.getAttributes().get(ATTR_MEETING_ID), session.getId(), message.getPayloadLength());
} }
@Override @Override
public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception { public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
log.error("实时会议 websocket 传输异常meetingId={}, sessionId={}, upstream={}", log.error("实时会议 websocket 传输异常meetingId={}, sessionId={}, upstream={}",
session.getAttributes().get(ATTR_MEETING_ID), session.getId(), session.getAttributes().get(ATTR_TARGET_WS_URL), exception); session.getAttributes().get(ATTR_MEETING_ID), session.getId(), session.getAttributes().get(ATTR_TARGET_WS_URL), exception);
detachFrontend(session); detachFrontend(session);
realtimeMeetingAudioStorageService.closeSession(session.getId()); realtimeMeetingAudioStorageService.closeSession(session.getId());
if (session.isOpen()) { if (session.isOpen()) {
session.close(CloseStatus.SERVER_ERROR); session.close(CloseStatus.SERVER_ERROR);
} }
@ -135,159 +140,159 @@ public class RealtimeMeetingProxyWebSocketHandler extends AbstractWebSocketHandl
@Override @Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) { public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
log.info("实时会议 websocket 已关闭meetingId={}, sessionId={}, code={}, reason={}", log.info("实时会议 websocket 已关闭meetingId={}, sessionId={}, code={}, reason={}",
session.getAttributes().get(ATTR_MEETING_ID), session.getId(), status.getCode(), status.getReason()); session.getAttributes().get(ATTR_MEETING_ID), session.getId(), status.getCode(), status.getReason());
Object meetingIdValue = session.getAttributes().get(ATTR_MEETING_ID); Object meetingIdValue = session.getAttributes().get(ATTR_MEETING_ID);
if (meetingIdValue instanceof Long meetingId) { if (meetingIdValue instanceof Long meetingId) {
detachFrontend(meetingId, session.getId()); detachFrontend(meetingId, session.getId());
realtimeMeetingSessionStateService.pauseByDisconnect(meetingId, session.getId()); realtimeMeetingSessionStateService.pauseByDisconnect(meetingId, session.getId());
} }
realtimeMeetingAudioStorageService.closeSession(session.getId()); realtimeMeetingAudioStorageService.closeSession(session.getId());
} }
public void closeMeetingSession(Long meetingId) { public void closeMeetingSession(Long meetingId) {
if (meetingId == null) { if (meetingId == null) {
return; return;
}
MeetingChannelSession meetingSession = meetingSessions.get(meetingId);
if (meetingSession == null || meetingSession.channel == null) {
return;
}
meetingSession.channel.closeMeeting(meetingSession.context);
}
private void attachFrontendSession(RealtimeSocketSessionData sessionData,
WebSocketSession rawSession,
ConcurrentWebSocketSessionDecorator frontendSession) throws Exception {
Long meetingId = sessionData.getMeetingId();
MeetingChannelSession meetingSession;
boolean reused = false;
synchronized (lockForMeeting(meetingId)) {
meetingSession = meetingSessions.get(meetingId);
if (meetingSession != null && meetingSession.isChannelOpen()) {
String previousSessionId = meetingSession.context.getRawSession() == null
? null
: meetingSession.context.getRawSession().getId();
meetingSession.clearFrontendIfClosed();
if (previousSessionId != null && !meetingSession.hasOpenFrontend()) {
realtimeMeetingSessionStateService.pauseByDisconnect(meetingId, previousSessionId);
} }
if (meetingSession.hasOpenFrontend()) { MeetingChannelSession meetingSession = meetingSessions.get(meetingId);
sendFrontendError(frontendSession, "REALTIME_ACTIVE_CONNECTION_EXISTS", "当前会议已有活跃前端连接"); if (meetingSession == null || meetingSession.channel == null) {
frontendSession.close(CloseStatus.POLICY_VIOLATION.withReason("已存在活跃的前端连接")); return;
realtimeMeetingAudioStorageService.closeSession(rawSession.getId());
return;
} }
if (!realtimeMeetingSessionStateService.activate(meetingId, rawSession.getId())) { meetingSession.channel.closeMeeting(meetingSession.context);
sendFrontendError(frontendSession, "REALTIME_ACTIVE_CONNECTION_REJECTED", "当前状态下无法继续会议"); }
frontendSession.close(CloseStatus.POLICY_VIOLATION.withReason("当前状态下无法继续会议"));
realtimeMeetingAudioStorageService.closeSession(rawSession.getId()); private void attachFrontendSession(RealtimeSocketSessionData sessionData,
return; WebSocketSession rawSession,
ConcurrentWebSocketSessionDecorator frontendSession) throws Exception {
Long meetingId = sessionData.getMeetingId();
MeetingChannelSession meetingSession;
boolean reused = false;
synchronized (lockForMeeting(meetingId)) {
meetingSession = meetingSessions.get(meetingId);
if (meetingSession != null && meetingSession.isChannelOpen()) {
String previousSessionId = meetingSession.context.getRawSession() == null
? null
: meetingSession.context.getRawSession().getId();
meetingSession.clearFrontendIfClosed();
if (previousSessionId != null && !meetingSession.hasOpenFrontend()) {
realtimeMeetingSessionStateService.pauseByDisconnect(meetingId, previousSessionId);
}
if (meetingSession.hasOpenFrontend()) {
sendFrontendError(frontendSession, "REALTIME_ACTIVE_CONNECTION_EXISTS", "当前会议已有活跃前端连接");
frontendSession.close(CloseStatus.POLICY_VIOLATION.withReason("已存在活跃的前端连接"));
realtimeMeetingAudioStorageService.closeSession(rawSession.getId());
return;
}
if (!realtimeMeetingSessionStateService.activate(meetingId, rawSession.getId())) {
sendFrontendError(frontendSession, "REALTIME_ACTIVE_CONNECTION_REJECTED", "当前状态下无法继续会议");
frontendSession.close(CloseStatus.POLICY_VIOLATION.withReason("当前状态下无法继续会议"));
realtimeMeetingAudioStorageService.closeSession(rawSession.getId());
return;
}
meetingSession.bindFrontend(rawSession, frontendSession);
reused = true;
} else {
RealtimeAsrChannel channel = realtimeAsrChannelFactory.getRequired(sessionData.getProvider());
RealtimeAsrChannelContext context = new RealtimeAsrChannelContext();
context.setMeetingId(meetingId);
context.setProvider(realtimeAsrChannelFactory.normalizeProvider(sessionData.getProvider()));
context.setTargetWsUrl(sessionData.getTargetWsUrl());
context.setCallback(new HandlerChannelCallback());
context.bindFrontendSession(rawSession, frontendSession);
context.getChannelState().put("modelCode", sessionData.getModelCode());
context.getChannelState().put("mediaConfig", sessionData.getMediaConfig());
meetingSession = new MeetingChannelSession(meetingId, channel, context);
meetingSessions.put(meetingId, meetingSession);
}
} }
meetingSession.bindFrontend(rawSession, frontendSession);
reused = true;
} else {
RealtimeAsrChannel channel = realtimeAsrChannelFactory.getRequired(sessionData.getProvider());
RealtimeAsrChannelContext context = new RealtimeAsrChannelContext();
context.setMeetingId(meetingId);
context.setProvider(realtimeAsrChannelFactory.normalizeProvider(sessionData.getProvider()));
context.setTargetWsUrl(sessionData.getTargetWsUrl());
context.setCallback(new HandlerChannelCallback());
context.bindFrontendSession(rawSession, frontendSession);
context.getChannelState().put("modelCode", sessionData.getModelCode());
context.getChannelState().put("mediaConfig", sessionData.getMediaConfig());
meetingSession = new MeetingChannelSession(meetingId, channel, context);
meetingSessions.put(meetingId, meetingSession);
}
}
if (reused) { if (reused) {
sendProxyReady(frontendSession); sendProxyReady(frontendSession);
replayCachedMessages(meetingId, frontendSession); replayCachedMessages(meetingId, frontendSession);
return; return;
}
try {
meetingSession.channel.connect(meetingSession.context);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
removeMeetingSession(meetingId, meetingSession);
log.error("连接上游 ASR websocket 时被中断meetingId={}, sessionId={}", meetingId, rawSession.getId(), ex);
sendFrontendError(frontendSession, "REALTIME_UPSTREAM_CONNECT_INTERRUPTED", "连接上游 ASR 服务时被中断");
realtimeMeetingAudioStorageService.closeSession(rawSession.getId());
frontendSession.close(CloseStatus.SERVER_ERROR.withReason("连接上游服务时被中断"));
} catch (Exception ex) {
removeMeetingSession(meetingId, meetingSession);
log.warn("连接上游 ASR websocket 失败meetingId={}, provider={}, target={}",
meetingId, sessionData.getProvider(), sessionData.getTargetWsUrl(), ex);
sendFrontendError(frontendSession, "REALTIME_UPSTREAM_CONNECT_FAILED", "连接上游 ASR 服务失败");
realtimeMeetingAudioStorageService.closeSession(rawSession.getId());
frontendSession.close(CloseStatus.SERVER_ERROR.withReason("连接 ASR WebSocket 失败"));
}
}
private void replayCachedMessages(Long meetingId, ConcurrentWebSocketSessionDecorator frontendSession) {
try {
if (!frontendSession.isOpen()) {
return;
}
for (RealtimeMeetingTranscriptCacheItem item : realtimeMeetingTranscriptCacheService.listOrderedItems(meetingId)) {
frontendSession.sendMessage(new TextMessage(LocalRealtimeAsrChannel.buildFrontendTranscriptMessage(item)));
}
} catch (Exception ex) {
log.warn("回放缓存转写消息失败meetingId={}", meetingId, ex);
}
}
private void sendProxyReady(ConcurrentWebSocketSessionDecorator frontendSession) throws Exception {
if (frontendSession.isOpen()) {
frontendSession.sendMessage(new TextMessage("{\"type\":\"proxy_ready\"}"));
}
}
private void detachFrontend(WebSocketSession session) {
Object meetingIdValue = session.getAttributes().get(ATTR_MEETING_ID);
if (meetingIdValue instanceof Long meetingId) {
detachFrontend(meetingId, session.getId());
}
}
private void detachFrontend(Long meetingId, String sessionId) {
MeetingChannelSession meetingSession = meetingSessions.get(meetingId);
if (meetingSession == null) {
return;
} }
synchronized (lockForMeeting(meetingId)) {
if (meetingSession.context.getRawSession() != null && meetingSession.context.getRawSession().getId().equals(sessionId)) {
meetingSession.channel.onFrontendDetached(meetingSession.context);
}
meetingSession.detachFrontend(sessionId);
}
}
private MeetingChannelSession getMeetingSession(WebSocketSession session) { try {
Object meetingIdValue = session.getAttributes().get(ATTR_MEETING_ID); meetingSession.channel.connect(meetingSession.context);
if (!(meetingIdValue instanceof Long meetingId)) { } catch (InterruptedException ex) {
return null; Thread.currentThread().interrupt();
} removeMeetingSession(meetingId, meetingSession);
return meetingSessions.get(meetingId); log.error("连接上游 ASR websocket 时被中断meetingId={}, sessionId={}", meetingId, rawSession.getId(), ex);
} sendFrontendError(frontendSession, "REALTIME_UPSTREAM_CONNECT_INTERRUPTED", "连接上游 ASR 服务时被中断");
realtimeMeetingAudioStorageService.closeSession(rawSession.getId());
void removeMeetingSession(Long meetingId) { frontendSession.close(CloseStatus.SERVER_ERROR.withReason("连接上游服务时被中断"));
synchronized (lockForMeeting(meetingId)) { } catch (Exception ex) {
meetingSessions.remove(meetingId); removeMeetingSession(meetingId, meetingSession);
} log.warn("连接上游 ASR websocket 失败meetingId={}, provider={}, target={}",
} meetingId, sessionData.getProvider(), sessionData.getTargetWsUrl(), ex);
sendFrontendError(frontendSession, "REALTIME_UPSTREAM_CONNECT_FAILED", "连接上游 ASR 服务失败");
void removeMeetingSession(Long meetingId, MeetingChannelSession meetingSession) { realtimeMeetingAudioStorageService.closeSession(rawSession.getId());
synchronized (lockForMeeting(meetingId)) { frontendSession.close(CloseStatus.SERVER_ERROR.withReason("连接 ASR WebSocket 失败"));
meetingSessions.remove(meetingId, meetingSession);
} }
} }
private Object lockForMeeting(Long meetingId) { private void replayCachedMessages(Long meetingId, ConcurrentWebSocketSessionDecorator frontendSession) {
return meetingLocks.computeIfAbsent(meetingId, ignored -> new Object()); try {
} if (!frontendSession.isOpen()) {
return;
}
for (RealtimeMeetingTranscriptCacheItem item : realtimeMeetingTranscriptCacheService.listOrderedItems(meetingId)) {
frontendSession.sendMessage(new TextMessage(LocalRealtimeAsrChannel.buildFrontendTranscriptMessage(item)));
}
} catch (Exception ex) {
log.warn("回放缓存转写消息失败meetingId={}", meetingId, ex);
}
}
private void sendProxyReady(ConcurrentWebSocketSessionDecorator frontendSession) throws Exception {
if (frontendSession.isOpen()) {
frontendSession.sendMessage(new TextMessage("{\"type\":\"proxy_ready\"}"));
}
}
private void detachFrontend(WebSocketSession session) {
Object meetingIdValue = session.getAttributes().get(ATTR_MEETING_ID);
if (meetingIdValue instanceof Long meetingId) {
detachFrontend(meetingId, session.getId());
}
}
private void detachFrontend(Long meetingId, String sessionId) {
MeetingChannelSession meetingSession = meetingSessions.get(meetingId);
if (meetingSession == null) {
return;
}
synchronized (lockForMeeting(meetingId)) {
if (meetingSession.context.getRawSession() != null && meetingSession.context.getRawSession().getId().equals(sessionId)) {
meetingSession.channel.onFrontendDetached(meetingSession.context);
}
meetingSession.detachFrontend(sessionId);
}
}
private MeetingChannelSession getMeetingSession(WebSocketSession session) {
Object meetingIdValue = session.getAttributes().get(ATTR_MEETING_ID);
if (!(meetingIdValue instanceof Long meetingId)) {
return null;
}
return meetingSessions.get(meetingId);
}
void removeMeetingSession(Long meetingId) {
synchronized (lockForMeeting(meetingId)) {
meetingSessions.remove(meetingId);
}
}
void removeMeetingSession(Long meetingId, MeetingChannelSession meetingSession) {
synchronized (lockForMeeting(meetingId)) {
meetingSessions.remove(meetingId, meetingSession);
}
}
private Object lockForMeeting(Long meetingId) {
return meetingLocks.computeIfAbsent(meetingId, ignored -> new Object());
}
private String extractQueryParam(URI uri, String key) { private String extractQueryParam(URI uri, String key) {
if (uri == null || uri.getQuery() == null || uri.getQuery().isBlank()) { if (uri == null || uri.getQuery() == null || uri.getQuery().isBlank()) {
@ -316,18 +321,18 @@ public class RealtimeMeetingProxyWebSocketHandler extends AbstractWebSocketHandl
return 0; return 0;
} }
private void sendFrontendError(ConcurrentWebSocketSessionDecorator frontendSession, String code, String message) { private void sendFrontendError(ConcurrentWebSocketSessionDecorator frontendSession, String code, String message) {
try { try {
if (!frontendSession.isOpen()) { if (!frontendSession.isOpen()) {
return; return;
} }
Map<String, Object> payload = new HashMap<>(); Map<String, Object> payload = new HashMap<>();
payload.put("type", "error"); payload.put("type", "error");
payload.put("code", code); payload.put("code", code);
payload.put("message", message); payload.put("message", message);
frontendSession.sendMessage(new TextMessage(OBJECT_MAPPER.writeValueAsString(payload))); frontendSession.sendMessage(new TextMessage(OBJECT_MAPPER.writeValueAsString(payload)));
} catch (Exception ex) { } catch (Exception ex) {
log.warn("向前端发送实时代理错误消息失败code={}", code, ex); log.warn("向前端发送实时代理错误消息失败code={}", code, ex);
} }
} }
@ -346,108 +351,116 @@ public class RealtimeMeetingProxyWebSocketHandler extends AbstractWebSocketHandl
return normalized.substring(0, 240) + "..."; return normalized.substring(0, 240) + "...";
} }
static final class MeetingChannelSession { private boolean looksLikeKeepaliveMessage(String payload) {
private final Long meetingId; if (payload == null || payload.isBlank()) {
private final RealtimeAsrChannel channel; return false;
private final RealtimeAsrChannelContext context; }
String normalized = payload.replaceAll("\\s+", "");
private MeetingChannelSession(Long meetingId, RealtimeAsrChannel channel, RealtimeAsrChannelContext context) { return normalized.contains("\"type\":\"keepalive\"");
this.meetingId = meetingId;
this.channel = channel;
this.context = context;
} }
private void bindFrontend(WebSocketSession rawSession, ConcurrentWebSocketSessionDecorator frontendSession) { static final class MeetingChannelSession {
context.bindFrontendSession(rawSession, frontendSession); private final Long meetingId;
private final RealtimeAsrChannel channel;
private final RealtimeAsrChannelContext context;
private MeetingChannelSession(Long meetingId, RealtimeAsrChannel channel, RealtimeAsrChannelContext context) {
this.meetingId = meetingId;
this.channel = channel;
this.context = context;
} }
private void detachFrontend(String sessionId) { private void bindFrontend(WebSocketSession rawSession, ConcurrentWebSocketSessionDecorator frontendSession) {
if (context.getRawSession() != null && context.getRawSession().getId().equals(sessionId)) { context.bindFrontendSession(rawSession, frontendSession);
context.bindFrontendSession(null, null); }
}
private void detachFrontend(String sessionId) {
if (context.getRawSession() != null && context.getRawSession().getId().equals(sessionId)) {
context.bindFrontendSession(null, null);
}
}
private void clearFrontendIfClosed() {
if (context.getRawSession() != null && !context.getRawSession().isOpen()) {
context.bindFrontendSession(null, null);
}
}
private boolean hasOpenFrontend() {
return context.getFrontendSession() != null
&& context.getFrontendSession().isOpen()
&& context.getRawSession() != null
&& context.getRawSession().isOpen();
}
private boolean isChannelOpen() {
return channel != null && channel.isOpen(context);
}
} }
private void clearFrontendIfClosed() { private final class HandlerChannelCallback implements RealtimeAsrChannelCallback {
if (context.getRawSession() != null && !context.getRawSession().isOpen()) {
context.bindFrontendSession(null, null);
}
}
private boolean hasOpenFrontend() {
return context.getFrontendSession() != null
&& context.getFrontendSession().isOpen()
&& context.getRawSession() != null
&& context.getRawSession().isOpen();
}
private boolean isChannelOpen() {
return channel != null && channel.isOpen(context);
}
}
private final class HandlerChannelCallback implements RealtimeAsrChannelCallback {
@Override @Override
public void onChannelOpen(Long meetingId) throws Exception { public void onChannelOpen(Long meetingId) throws Exception {
MeetingChannelSession meetingSession = meetingSessions.get(meetingId); MeetingChannelSession meetingSession = meetingSessions.get(meetingId);
if (meetingSession == null) { if (meetingSession == null) {
return; return;
} }
ConcurrentWebSocketSessionDecorator frontendSession = meetingSession.context.getFrontendSession(); ConcurrentWebSocketSessionDecorator frontendSession = meetingSession.context.getFrontendSession();
if (frontendSession != null && frontendSession.isOpen()) { if (frontendSession != null && frontendSession.isOpen()) {
sendProxyReady(frontendSession); sendProxyReady(frontendSession);
} }
} }
@Override @Override
public void sendFrontendText(Long meetingId, String payload) throws Exception { public void sendFrontendText(Long meetingId, String payload) throws Exception {
MeetingChannelSession meetingSession = meetingSessions.get(meetingId); MeetingChannelSession meetingSession = meetingSessions.get(meetingId);
if (meetingSession == null) { if (meetingSession == null) {
return; return;
} }
ConcurrentWebSocketSessionDecorator frontendSession = meetingSession.context.getFrontendSession(); ConcurrentWebSocketSessionDecorator frontendSession = meetingSession.context.getFrontendSession();
if (frontendSession != null && frontendSession.isOpen()) { if (frontendSession != null && frontendSession.isOpen()) {
frontendSession.sendMessage(new TextMessage(payload)); frontendSession.sendMessage(new TextMessage(payload));
} }
} }
@Override @Override
public void sendFrontendBinary(Long meetingId, byte[] payload) throws Exception { public void sendFrontendBinary(Long meetingId, byte[] payload) throws Exception {
MeetingChannelSession meetingSession = meetingSessions.get(meetingId); MeetingChannelSession meetingSession = meetingSessions.get(meetingId);
if (meetingSession == null) { if (meetingSession == null) {
return; return;
} }
ConcurrentWebSocketSessionDecorator frontendSession = meetingSession.context.getFrontendSession(); ConcurrentWebSocketSessionDecorator frontendSession = meetingSession.context.getFrontendSession();
if (frontendSession != null && frontendSession.isOpen()) { if (frontendSession != null && frontendSession.isOpen()) {
frontendSession.sendMessage(new BinaryMessage(payload)); frontendSession.sendMessage(new BinaryMessage(payload));
} }
} }
@Override @Override
public void sendFrontendError(Long meetingId, String code, String message) { public void sendFrontendError(Long meetingId, String code, String message) {
MeetingChannelSession meetingSession = meetingSessions.get(meetingId); MeetingChannelSession meetingSession = meetingSessions.get(meetingId);
if (meetingSession == null) { if (meetingSession == null) {
return; return;
} }
ConcurrentWebSocketSessionDecorator frontendSession = meetingSession.context.getFrontendSession(); ConcurrentWebSocketSessionDecorator frontendSession = meetingSession.context.getFrontendSession();
if (frontendSession != null) { if (frontendSession != null) {
RealtimeMeetingProxyWebSocketHandler.this.sendFrontendError(frontendSession, code, message); RealtimeMeetingProxyWebSocketHandler.this.sendFrontendError(frontendSession, code, message);
} }
} }
@Override @Override
public void removeMeetingSession(Long meetingId) { public void removeMeetingSession(Long meetingId) {
RealtimeMeetingProxyWebSocketHandler.this.removeMeetingSession(meetingId); RealtimeMeetingProxyWebSocketHandler.this.removeMeetingSession(meetingId);
} }
@Override @Override
public void closeFrontend(Long meetingId, CloseStatus status) { public void closeFrontend(Long meetingId, CloseStatus status) {
MeetingChannelSession meetingSession = meetingSessions.get(meetingId); MeetingChannelSession meetingSession = meetingSessions.get(meetingId);
if (meetingSession == null) { if (meetingSession == null) {
return; return;
} }
try { try {
WebSocketSession rawSession = meetingSession.context.getRawSession(); WebSocketSession rawSession = meetingSession.context.getRawSession();
if (rawSession != null && rawSession.isOpen()) { if (rawSession != null && rawSession.isOpen()) {
rawSession.close(status); rawSession.close(status);
} }
} catch (Exception ignored) { } catch (Exception ignored) {

View File

@ -244,6 +244,7 @@ export function RealtimeAsrSession() {
const [sessionStatus, setSessionStatus] = useState<RealtimeMeetingSessionStatus | null>(null); const [sessionStatus, setSessionStatus] = useState<RealtimeMeetingSessionStatus | null>(null);
const transcriptRef = useRef<HTMLDivElement | null>(null); const transcriptRef = useRef<HTMLDivElement | null>(null);
const wsRef = useRef<WebSocket | null>(null); const wsRef = useRef<WebSocket | null>(null);
const wsHeartbeatRef = useRef<ReturnType<typeof setInterval> | null>(null);
const audioContextRef = useRef<AudioContext | null>(null); const audioContextRef = useRef<AudioContext | null>(null);
const processorRef = useRef<ScriptProcessorNode | null>(null); const processorRef = useRef<ScriptProcessorNode | null>(null);
const audioSourceRef = useRef<MediaStreamAudioSourceNode | null>(null); const audioSourceRef = useRef<MediaStreamAudioSourceNode | null>(null);
@ -373,6 +374,33 @@ export function RealtimeAsrSession() {
return () => window.removeEventListener("pagehide", handlePageHide); return () => window.removeEventListener("pagehide", handlePageHide);
}, [meetingId]); }, [meetingId]);
// 组件卸载切换路由时统一清理所有资源防止定时器、WebSocket、音频管道泄漏
useEffect(() => {
return () => {
// 1. 清理心跳定时器
if (wsHeartbeatRef.current !== null) {
clearInterval(wsHeartbeatRef.current);
wsHeartbeatRef.current = null;
}
// 2. 关闭 WebSocket移除回调避免触发不必要的状态更新
if (wsRef.current) {
wsRef.current.onclose = null;
wsRef.current.onerror = null;
wsRef.current.onmessage = null;
wsRef.current.close();
wsRef.current = null;
}
// 3. 关闭音频管道(同步处理,无需 await
processorRef.current?.disconnect();
audioSourceRef.current?.disconnect();
streamRef.current?.getTracks().forEach((t) => t.stop());
if (audioContextRef.current && audioContextRef.current.state !== "closed") {
void audioContextRef.current.close();
}
};
// eslint-disable-next-line react-hooks/exhaustive-deps
}, []);
const shutdownAudioPipeline = async () => { const shutdownAudioPipeline = async () => {
processorRef.current?.disconnect(); processorRef.current?.disconnect();
audioSourceRef.current?.disconnect(); audioSourceRef.current?.disconnect();
@ -433,6 +461,10 @@ export function RealtimeAsrSession() {
setRecording(false); setRecording(false);
setStatusText("连接失败"); setStatusText("连接失败");
sessionStartedRef.current = false; sessionStartedRef.current = false;
if (wsHeartbeatRef.current !== null) {
clearInterval(wsHeartbeatRef.current);
wsHeartbeatRef.current = null;
}
wsRef.current?.close(); wsRef.current?.close();
wsRef.current = null; wsRef.current = null;
await shutdownAudioPipeline(); await shutdownAudioPipeline();
@ -530,6 +562,13 @@ export function RealtimeAsrSession() {
} }
const pauseRes = await pauseRealtimeMeeting(meetingId); const pauseRes = await pauseRealtimeMeeting(meetingId);
await closeFrontendSocket(false); await closeFrontendSocket(false);
if (wsHeartbeatRef.current !== null) {
clearInterval(wsHeartbeatRef.current);
wsHeartbeatRef.current = null;
}
wsRef.current?.close();
wsRef.current = null;
sessionStartedRef.current = false;
await shutdownAudioPipeline(); await shutdownAudioPipeline();
setSessionStatus(pauseRes.data.data); setSessionStatus(pauseRes.data.data);
setRecording(false); setRecording(false);
@ -575,12 +614,41 @@ export function RealtimeAsrSession() {
hotwords: sessionDraft.hotwords || [], hotwords: sessionDraft.hotwords || [],
}); });
const socketSession = socketSessionRes.data.data; const socketSession = socketSessionRes.data.data;
// 如果已有旧的 WebSocket比如重连先强制关闭并清理心跳
if (wsHeartbeatRef.current !== null) {
clearInterval(wsHeartbeatRef.current);
wsHeartbeatRef.current = null;
}
if (wsRef.current) {
wsRef.current.onclose = null;
wsRef.current.onerror = null;
wsRef.current.onmessage = null;
wsRef.current.close();
wsRef.current = null;
}
const socket = new WebSocket(buildRealtimeProxyWsUrl(socketSession)); const socket = new WebSocket(buildRealtimeProxyWsUrl(socketSession));
socket.binaryType = "arraybuffer"; socket.binaryType = "arraybuffer";
wsRef.current = socket; wsRef.current = socket;
socket.onopen = () => { socket.onopen = () => {
setStatusText("识别服务连接中,等待第三方服务就绪..."); setStatusText("识别服务连接中,等待第三方服务就绪...");
// 先清除旧定时器(防止 onopen 被意外重复触发时叠加)
if (wsHeartbeatRef.current !== null) {
clearInterval(wsHeartbeatRef.current);
wsHeartbeatRef.current = null;
}
// 启动心跳保活:每 20 秒发送一次 keepalive JSON防止服务端 ping timeout 断开
wsHeartbeatRef.current = setInterval(() => {
if (socket.readyState === WebSocket.OPEN) {
try {
socket.send(JSON.stringify({ type: "keepalive" }));
} catch {
// ignore heartbeat send failure
}
}
}, 20000);
}; };
socket.onmessage = (event) => { socket.onmessage = (event) => {
@ -641,6 +709,10 @@ export function RealtimeAsrSession() {
}; };
socket.onclose = () => { socket.onclose = () => {
if (wsHeartbeatRef.current !== null) {
clearInterval(wsHeartbeatRef.current);
wsHeartbeatRef.current = null;
}
setConnecting(false); setConnecting(false);
setRecording(false); setRecording(false);
sessionStartedRef.current = false; sessionStartedRef.current = false;
@ -668,6 +740,16 @@ export function RealtimeAsrSession() {
setStatusText("结束会议中..."); setStatusText("结束会议中...");
await closeFrontendSocket(true); await closeFrontendSocket(true);
if (wsRef.current?.readyState === WebSocket.OPEN) {
wsRef.current.send(JSON.stringify({ is_speaking: false }));
}
if (wsHeartbeatRef.current !== null) {
clearInterval(wsHeartbeatRef.current);
wsHeartbeatRef.current = null;
}
wsRef.current?.close();
wsRef.current = null;
sessionStartedRef.current = false;
await shutdownAudioPipeline(); await shutdownAudioPipeline();