From 9c98e670e1875e67a03a885bec4486844cb18044 Mon Sep 17 00:00:00 2001 From: chenhao Date: Fri, 26 Jun 2026 16:56:58 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=B7=BB=E5=8A=A0=20WebSocket=20?= =?UTF-8?q?=E5=BF=83=E8=B7=B3=E4=BF=9D=E6=B4=BB=E5=92=8C=E8=B5=84=E6=BA=90?= =?UTF-8?q?=E6=B8=85=E7=90=86=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 在 `RealtimeMeetingWebSocketConfig` 中添加配置,关闭 Tomcat 内置的 WebSocket keepalive 检测 - 在 `RealtimeAsrSession` 组件中添加心跳定时器和资源清理逻辑,防止资源泄漏 - 在 `RealtimeMeetingProxyWebSocketHandler` 中过滤前端心跳消息,避免转发给上游 ASR 服务 --- .../RealtimeMeetingWebSocketConfig.java | 24 + .../RealtimeMeetingProxyWebSocketHandler.java | 515 +++++++++--------- .../src/pages/business/RealtimeAsrSession.tsx | 82 +++ 3 files changed, 370 insertions(+), 251 deletions(-) diff --git a/backend/src/main/java/com/imeeting/config/RealtimeMeetingWebSocketConfig.java b/backend/src/main/java/com/imeeting/config/RealtimeMeetingWebSocketConfig.java index f940e26..e547594 100644 --- a/backend/src/main/java/com/imeeting/config/RealtimeMeetingWebSocketConfig.java +++ b/backend/src/main/java/com/imeeting/config/RealtimeMeetingWebSocketConfig.java @@ -2,6 +2,9 @@ package com.imeeting.config; import com.imeeting.websocket.RealtimeMeetingProxyWebSocketHandler; import lombok.RequiredArgsConstructor; +import org.springframework.boot.web.embedded.tomcat.TomcatServletWebServerFactory; +import org.springframework.boot.web.server.WebServerFactoryCustomizer; +import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.web.socket.config.annotation.EnableWebSocket; import org.springframework.web.socket.config.annotation.WebSocketConfigurer; @@ -19,4 +22,25 @@ public class RealtimeMeetingWebSocketConfig implements WebSocketConfigurer { registry.addHandler(realtimeMeetingProxyWebSocketHandler, "/ws/meeting/realtime") .setAllowedOriginPatterns("*"); } + + /** + * 关闭 Tomcat 内置的 WebSocket keepalive ping 检测(sessionIdleTimeout)。 + *

+ * Tomcat 默认会对 WebSocket session 设置 sessionIdleTimeout(-1 表示无限,但某些版本默认非 -1), + * 并通过后台线程定期发送 Ping 帧,若在超时内未收到 Pong 响应,触发 + * code=1011 "keepalive ping timeout" 强制断开。 + * 实时 ASR 场景中,客户端持续发送音频帧,由前端心跳保活, + * 因此显式将 sessionIdleTimeout 设为 -1(无限)。 + *

+ */ + @Bean + public WebServerFactoryCustomizer wsSessionIdleTimeoutCustomizer() { + return factory -> factory.addConnectorCustomizers(connector -> { + // 通过系统属性通知 Tomcat WS 容器关闭 sessionIdleTimeout 检查 + // org.apache.tomcat.websocket.DEFAULT_SESSION_IDLE_TIMEOUT=-1 + if (System.getProperty("org.apache.tomcat.websocket.DEFAULT_SESSION_IDLE_TIMEOUT") == null) { + System.setProperty("org.apache.tomcat.websocket.DEFAULT_SESSION_IDLE_TIMEOUT", "-1"); + } + }); + } } diff --git a/backend/src/main/java/com/imeeting/websocket/RealtimeMeetingProxyWebSocketHandler.java b/backend/src/main/java/com/imeeting/websocket/RealtimeMeetingProxyWebSocketHandler.java index d19ac61..c5aba29 100644 --- a/backend/src/main/java/com/imeeting/websocket/RealtimeMeetingProxyWebSocketHandler.java +++ b/backend/src/main/java/com/imeeting/websocket/RealtimeMeetingProxyWebSocketHandler.java @@ -42,25 +42,25 @@ public class RealtimeMeetingProxyWebSocketHandler extends AbstractWebSocketHandl private static final String ATTR_MEETING_ID = "meetingId"; private static final String ATTR_TARGET_WS_URL = "targetWsUrl"; - private static final String ATTR_PROVIDER = "provider"; + private static final String ATTR_PROVIDER = "provider"; private static final String ATTR_FRONTEND_TEXT_COUNT = "frontendTextCount"; private static final String ATTR_FRONTEND_BINARY_COUNT = "frontendBinaryCount"; - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private final RealtimeMeetingSocketSessionService realtimeMeetingSocketSessionService; private final RealtimeMeetingSessionStateService realtimeMeetingSessionStateService; private final RealtimeMeetingAudioStorageService realtimeMeetingAudioStorageService; - private final RealtimeMeetingTranscriptCacheService realtimeMeetingTranscriptCacheService; - private final RealtimeAsrChannelFactory realtimeAsrChannelFactory; - private final ConcurrentMap meetingSessions = new ConcurrentHashMap<>(); - private final ConcurrentMap meetingLocks = new ConcurrentHashMap<>(); + private final RealtimeMeetingTranscriptCacheService realtimeMeetingTranscriptCacheService; + private final RealtimeAsrChannelFactory realtimeAsrChannelFactory; + private final ConcurrentMap meetingSessions = new ConcurrentHashMap<>(); + private final ConcurrentMap meetingLocks = new ConcurrentHashMap<>(); @Override public void afterConnectionEstablished(WebSocketSession session) throws Exception { String sessionToken = extractQueryParam(session.getUri(), "sessionToken"); RealtimeSocketSessionData sessionData = realtimeMeetingSocketSessionService.getSessionData(sessionToken); if (sessionData == null) { - log.warn("实时会议 websocket 拒绝连接:会话令牌无效,sessionId={}", session.getId()); + log.warn("实时会议 websocket 拒绝连接:会话令牌无效,sessionId={}", session.getId()); session.close(CloseStatus.POLICY_VIOLATION.withReason("实时 Socket 会话无效")); return; } @@ -69,65 +69,70 @@ public class RealtimeMeetingProxyWebSocketHandler extends AbstractWebSocketHandl new ConcurrentWebSocketSessionDecorator(session, (int) Duration.ofSeconds(15).toMillis(), 1024 * 1024); session.getAttributes().put(ATTR_MEETING_ID, sessionData.getMeetingId()); session.getAttributes().put(ATTR_TARGET_WS_URL, sessionData.getTargetWsUrl()); - session.getAttributes().put(ATTR_PROVIDER, sessionData.getProvider()); + session.getAttributes().put(ATTR_PROVIDER, sessionData.getProvider()); session.getAttributes().put(ATTR_FRONTEND_TEXT_COUNT, new AtomicInteger()); session.getAttributes().put(ATTR_FRONTEND_BINARY_COUNT, new AtomicInteger()); realtimeMeetingAudioStorageService.openSession(sessionData.getMeetingId(), session.getId()); - log.info("实时会议 websocket 已接入:meetingId={}, sessionId={}, provider={}, upstream={}", - sessionData.getMeetingId(), session.getId(), sessionData.getProvider(), sessionData.getTargetWsUrl()); + log.info("实时会议 websocket 已接入:meetingId={}, sessionId={}, provider={}, upstream={}", + sessionData.getMeetingId(), session.getId(), sessionData.getProvider(), sessionData.getTargetWsUrl()); - attachFrontendSession(sessionData, session, frontendSession); + attachFrontendSession(sessionData, session, frontendSession); } @Override protected void handleTextMessage(WebSocketSession session, TextMessage message) { - MeetingChannelSession meetingSession = getMeetingSession(session); - if (meetingSession == null || !meetingSession.isChannelOpen()) { - log.warn("前端文本消息已忽略:上游 ASR 连接不可用,meetingId={}, sessionId={}", + // 过滤前端发来的心跳保活消息,不转发给上游 ASR 服务 + if (looksLikeKeepaliveMessage(message.getPayload())) { + log.debug("Frontend keepalive received, ignored: meetingId={}, sessionId={}", + session.getAttributes().get(ATTR_MEETING_ID), session.getId()); + return; + } + MeetingChannelSession meetingSession = getMeetingSession(session); + if (meetingSession == null || !meetingSession.isChannelOpen()) { + log.warn("前端文本消息已忽略:上游 ASR 连接不可用,meetingId={}, sessionId={}", session.getAttributes().get(ATTR_MEETING_ID), session.getId()); return; } int count = nextCount(session, ATTR_FRONTEND_TEXT_COUNT); - String payload = message.getPayload(); - log.info("前端文本 -> ASR 渠道:meetingId={}, sessionId={}, provider={}, count={}, payload={}", - session.getAttributes().get(ATTR_MEETING_ID), session.getId(), session.getAttributes().get(ATTR_PROVIDER), count, summarizeText(payload)); - meetingSession.channel.handleFrontendText(meetingSession.context, payload); + String payload = message.getPayload(); + log.info("前端文本 -> ASR 渠道:meetingId={}, sessionId={}, provider={}, count={}, payload={}", + session.getAttributes().get(ATTR_MEETING_ID), session.getId(), session.getAttributes().get(ATTR_PROVIDER), count, summarizeText(payload)); + meetingSession.channel.handleFrontendText(meetingSession.context, payload); } @Override protected void handleBinaryMessage(WebSocketSession session, BinaryMessage message) { - MeetingChannelSession meetingSession = getMeetingSession(session); - if (meetingSession == null || !meetingSession.isChannelOpen()) { - log.warn("前端音频帧已忽略:上游 ASR 连接不可用,meetingId={}, sessionId={}", + MeetingChannelSession meetingSession = getMeetingSession(session); + if (meetingSession == null || !meetingSession.isChannelOpen()) { + log.warn("前端音频帧已忽略:上游 ASR 连接不可用,meetingId={}, sessionId={}", session.getAttributes().get(ATTR_MEETING_ID), session.getId()); return; } int count = nextCount(session, ATTR_FRONTEND_BINARY_COUNT); int bytes = message.getPayloadLength(); if (shouldLogBinaryFrame(count)) { - log.info("前端音频帧 -> ASR 渠道:meetingId={}, sessionId={}, provider={}, count={}, bytes={}", - session.getAttributes().get(ATTR_MEETING_ID), session.getId(), session.getAttributes().get(ATTR_PROVIDER), count, bytes); + log.info("前端音频帧 -> ASR 渠道:meetingId={}, sessionId={}, provider={}, count={}, bytes={}", + session.getAttributes().get(ATTR_MEETING_ID), session.getId(), session.getAttributes().get(ATTR_PROVIDER), count, bytes); } byte[] payload = toByteArray(message.getPayload()); realtimeMeetingAudioStorageService.append(session.getId(), payload); - meetingSession.channel.handleFrontendBinary(meetingSession.context, payload); + meetingSession.channel.handleFrontendBinary(meetingSession.context, payload); } @Override protected void handlePongMessage(WebSocketSession session, PongMessage message) { - if (getMeetingSession(session) == null) { - return; - } - log.debug("前端 pong 已在本地忽略:meetingId={}, sessionId={}, bytes={}", - session.getAttributes().get(ATTR_MEETING_ID), session.getId(), message.getPayloadLength()); + // Pong 是浏览器对服务端(Tomcat)发出 Ping 帧的回应,代理层在此消化即可,无需转发给上游 ASR。 + // 转发 Pong 给上游没有语义意义,且可能引起上游协议混乱。 + log.debug("Frontend pong received (keepalive): meetingId={}, sessionId={}", + session.getAttributes().get(ATTR_MEETING_ID), session.getId()); } @Override public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception { - log.error("实时会议 websocket 传输异常:meetingId={}, sessionId={}, upstream={}", + log.error("实时会议 websocket 传输异常:meetingId={}, sessionId={}, upstream={}", session.getAttributes().get(ATTR_MEETING_ID), session.getId(), session.getAttributes().get(ATTR_TARGET_WS_URL), exception); - detachFrontend(session); - realtimeMeetingAudioStorageService.closeSession(session.getId()); + detachFrontend(session); + realtimeMeetingAudioStorageService.closeSession(session.getId()); if (session.isOpen()) { session.close(CloseStatus.SERVER_ERROR); } @@ -135,159 +140,159 @@ public class RealtimeMeetingProxyWebSocketHandler extends AbstractWebSocketHandl @Override public void afterConnectionClosed(WebSocketSession session, CloseStatus status) { - log.info("实时会议 websocket 已关闭:meetingId={}, sessionId={}, code={}, reason={}", + log.info("实时会议 websocket 已关闭:meetingId={}, sessionId={}, code={}, reason={}", session.getAttributes().get(ATTR_MEETING_ID), session.getId(), status.getCode(), status.getReason()); Object meetingIdValue = session.getAttributes().get(ATTR_MEETING_ID); if (meetingIdValue instanceof Long meetingId) { - detachFrontend(meetingId, session.getId()); + detachFrontend(meetingId, session.getId()); realtimeMeetingSessionStateService.pauseByDisconnect(meetingId, session.getId()); } realtimeMeetingAudioStorageService.closeSession(session.getId()); } - public void closeMeetingSession(Long meetingId) { - if (meetingId == null) { - return; - } - MeetingChannelSession meetingSession = meetingSessions.get(meetingId); - if (meetingSession == null || meetingSession.channel == null) { - return; - } - meetingSession.channel.closeMeeting(meetingSession.context); - } - - private void attachFrontendSession(RealtimeSocketSessionData sessionData, - WebSocketSession rawSession, - ConcurrentWebSocketSessionDecorator frontendSession) throws Exception { - Long meetingId = sessionData.getMeetingId(); - MeetingChannelSession meetingSession; - boolean reused = false; - synchronized (lockForMeeting(meetingId)) { - meetingSession = meetingSessions.get(meetingId); - if (meetingSession != null && meetingSession.isChannelOpen()) { - String previousSessionId = meetingSession.context.getRawSession() == null - ? null - : meetingSession.context.getRawSession().getId(); - meetingSession.clearFrontendIfClosed(); - if (previousSessionId != null && !meetingSession.hasOpenFrontend()) { - realtimeMeetingSessionStateService.pauseByDisconnect(meetingId, previousSessionId); + public void closeMeetingSession(Long meetingId) { + if (meetingId == null) { + return; } - if (meetingSession.hasOpenFrontend()) { - sendFrontendError(frontendSession, "REALTIME_ACTIVE_CONNECTION_EXISTS", "当前会议已有活跃前端连接"); - frontendSession.close(CloseStatus.POLICY_VIOLATION.withReason("已存在活跃的前端连接")); - realtimeMeetingAudioStorageService.closeSession(rawSession.getId()); - return; + MeetingChannelSession meetingSession = meetingSessions.get(meetingId); + if (meetingSession == null || meetingSession.channel == null) { + return; } - if (!realtimeMeetingSessionStateService.activate(meetingId, rawSession.getId())) { - sendFrontendError(frontendSession, "REALTIME_ACTIVE_CONNECTION_REJECTED", "当前状态下无法继续会议"); - frontendSession.close(CloseStatus.POLICY_VIOLATION.withReason("当前状态下无法继续会议")); - realtimeMeetingAudioStorageService.closeSession(rawSession.getId()); - return; + meetingSession.channel.closeMeeting(meetingSession.context); + } + + private void attachFrontendSession(RealtimeSocketSessionData sessionData, + WebSocketSession rawSession, + ConcurrentWebSocketSessionDecorator frontendSession) throws Exception { + Long meetingId = sessionData.getMeetingId(); + MeetingChannelSession meetingSession; + boolean reused = false; + synchronized (lockForMeeting(meetingId)) { + meetingSession = meetingSessions.get(meetingId); + if (meetingSession != null && meetingSession.isChannelOpen()) { + String previousSessionId = meetingSession.context.getRawSession() == null + ? null + : meetingSession.context.getRawSession().getId(); + meetingSession.clearFrontendIfClosed(); + if (previousSessionId != null && !meetingSession.hasOpenFrontend()) { + realtimeMeetingSessionStateService.pauseByDisconnect(meetingId, previousSessionId); + } + if (meetingSession.hasOpenFrontend()) { + sendFrontendError(frontendSession, "REALTIME_ACTIVE_CONNECTION_EXISTS", "当前会议已有活跃前端连接"); + frontendSession.close(CloseStatus.POLICY_VIOLATION.withReason("已存在活跃的前端连接")); + realtimeMeetingAudioStorageService.closeSession(rawSession.getId()); + return; + } + if (!realtimeMeetingSessionStateService.activate(meetingId, rawSession.getId())) { + sendFrontendError(frontendSession, "REALTIME_ACTIVE_CONNECTION_REJECTED", "当前状态下无法继续会议"); + frontendSession.close(CloseStatus.POLICY_VIOLATION.withReason("当前状态下无法继续会议")); + realtimeMeetingAudioStorageService.closeSession(rawSession.getId()); + return; + } + meetingSession.bindFrontend(rawSession, frontendSession); + reused = true; + } else { + RealtimeAsrChannel channel = realtimeAsrChannelFactory.getRequired(sessionData.getProvider()); + RealtimeAsrChannelContext context = new RealtimeAsrChannelContext(); + context.setMeetingId(meetingId); + context.setProvider(realtimeAsrChannelFactory.normalizeProvider(sessionData.getProvider())); + context.setTargetWsUrl(sessionData.getTargetWsUrl()); + context.setCallback(new HandlerChannelCallback()); + context.bindFrontendSession(rawSession, frontendSession); + context.getChannelState().put("modelCode", sessionData.getModelCode()); + context.getChannelState().put("mediaConfig", sessionData.getMediaConfig()); + meetingSession = new MeetingChannelSession(meetingId, channel, context); + meetingSessions.put(meetingId, meetingSession); + } } - meetingSession.bindFrontend(rawSession, frontendSession); - reused = true; - } else { - RealtimeAsrChannel channel = realtimeAsrChannelFactory.getRequired(sessionData.getProvider()); - RealtimeAsrChannelContext context = new RealtimeAsrChannelContext(); - context.setMeetingId(meetingId); - context.setProvider(realtimeAsrChannelFactory.normalizeProvider(sessionData.getProvider())); - context.setTargetWsUrl(sessionData.getTargetWsUrl()); - context.setCallback(new HandlerChannelCallback()); - context.bindFrontendSession(rawSession, frontendSession); - context.getChannelState().put("modelCode", sessionData.getModelCode()); - context.getChannelState().put("mediaConfig", sessionData.getMediaConfig()); - meetingSession = new MeetingChannelSession(meetingId, channel, context); - meetingSessions.put(meetingId, meetingSession); - } - } - if (reused) { - sendProxyReady(frontendSession); - replayCachedMessages(meetingId, frontendSession); - return; - } - - try { - meetingSession.channel.connect(meetingSession.context); - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - removeMeetingSession(meetingId, meetingSession); - log.error("连接上游 ASR websocket 时被中断:meetingId={}, sessionId={}", meetingId, rawSession.getId(), ex); - sendFrontendError(frontendSession, "REALTIME_UPSTREAM_CONNECT_INTERRUPTED", "连接上游 ASR 服务时被中断"); - realtimeMeetingAudioStorageService.closeSession(rawSession.getId()); - frontendSession.close(CloseStatus.SERVER_ERROR.withReason("连接上游服务时被中断")); - } catch (Exception ex) { - removeMeetingSession(meetingId, meetingSession); - log.warn("连接上游 ASR websocket 失败:meetingId={}, provider={}, target={}", - meetingId, sessionData.getProvider(), sessionData.getTargetWsUrl(), ex); - sendFrontendError(frontendSession, "REALTIME_UPSTREAM_CONNECT_FAILED", "连接上游 ASR 服务失败"); - realtimeMeetingAudioStorageService.closeSession(rawSession.getId()); - frontendSession.close(CloseStatus.SERVER_ERROR.withReason("连接 ASR WebSocket 失败")); - } - } - - private void replayCachedMessages(Long meetingId, ConcurrentWebSocketSessionDecorator frontendSession) { - try { - if (!frontendSession.isOpen()) { - return; - } - for (RealtimeMeetingTranscriptCacheItem item : realtimeMeetingTranscriptCacheService.listOrderedItems(meetingId)) { - frontendSession.sendMessage(new TextMessage(LocalRealtimeAsrChannel.buildFrontendTranscriptMessage(item))); - } - } catch (Exception ex) { - log.warn("回放缓存转写消息失败:meetingId={}", meetingId, ex); - } - } - - private void sendProxyReady(ConcurrentWebSocketSessionDecorator frontendSession) throws Exception { - if (frontendSession.isOpen()) { - frontendSession.sendMessage(new TextMessage("{\"type\":\"proxy_ready\"}")); - } - } - - private void detachFrontend(WebSocketSession session) { - Object meetingIdValue = session.getAttributes().get(ATTR_MEETING_ID); - if (meetingIdValue instanceof Long meetingId) { - detachFrontend(meetingId, session.getId()); - } - } - - private void detachFrontend(Long meetingId, String sessionId) { - MeetingChannelSession meetingSession = meetingSessions.get(meetingId); - if (meetingSession == null) { - return; + if (reused) { + sendProxyReady(frontendSession); + replayCachedMessages(meetingId, frontendSession); + return; } - synchronized (lockForMeeting(meetingId)) { - if (meetingSession.context.getRawSession() != null && meetingSession.context.getRawSession().getId().equals(sessionId)) { - meetingSession.channel.onFrontendDetached(meetingSession.context); - } - meetingSession.detachFrontend(sessionId); - } - } - private MeetingChannelSession getMeetingSession(WebSocketSession session) { - Object meetingIdValue = session.getAttributes().get(ATTR_MEETING_ID); - if (!(meetingIdValue instanceof Long meetingId)) { - return null; - } - return meetingSessions.get(meetingId); - } - - void removeMeetingSession(Long meetingId) { - synchronized (lockForMeeting(meetingId)) { - meetingSessions.remove(meetingId); - } - } - - void removeMeetingSession(Long meetingId, MeetingChannelSession meetingSession) { - synchronized (lockForMeeting(meetingId)) { - meetingSessions.remove(meetingId, meetingSession); + try { + meetingSession.channel.connect(meetingSession.context); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + removeMeetingSession(meetingId, meetingSession); + log.error("连接上游 ASR websocket 时被中断:meetingId={}, sessionId={}", meetingId, rawSession.getId(), ex); + sendFrontendError(frontendSession, "REALTIME_UPSTREAM_CONNECT_INTERRUPTED", "连接上游 ASR 服务时被中断"); + realtimeMeetingAudioStorageService.closeSession(rawSession.getId()); + frontendSession.close(CloseStatus.SERVER_ERROR.withReason("连接上游服务时被中断")); + } catch (Exception ex) { + removeMeetingSession(meetingId, meetingSession); + log.warn("连接上游 ASR websocket 失败:meetingId={}, provider={}, target={}", + meetingId, sessionData.getProvider(), sessionData.getTargetWsUrl(), ex); + sendFrontendError(frontendSession, "REALTIME_UPSTREAM_CONNECT_FAILED", "连接上游 ASR 服务失败"); + realtimeMeetingAudioStorageService.closeSession(rawSession.getId()); + frontendSession.close(CloseStatus.SERVER_ERROR.withReason("连接 ASR WebSocket 失败")); } } - private Object lockForMeeting(Long meetingId) { - return meetingLocks.computeIfAbsent(meetingId, ignored -> new Object()); - } + private void replayCachedMessages(Long meetingId, ConcurrentWebSocketSessionDecorator frontendSession) { + try { + if (!frontendSession.isOpen()) { + return; + } + for (RealtimeMeetingTranscriptCacheItem item : realtimeMeetingTranscriptCacheService.listOrderedItems(meetingId)) { + frontendSession.sendMessage(new TextMessage(LocalRealtimeAsrChannel.buildFrontendTranscriptMessage(item))); + } + } catch (Exception ex) { + log.warn("回放缓存转写消息失败:meetingId={}", meetingId, ex); + } + } + + private void sendProxyReady(ConcurrentWebSocketSessionDecorator frontendSession) throws Exception { + if (frontendSession.isOpen()) { + frontendSession.sendMessage(new TextMessage("{\"type\":\"proxy_ready\"}")); + } + } + + private void detachFrontend(WebSocketSession session) { + Object meetingIdValue = session.getAttributes().get(ATTR_MEETING_ID); + if (meetingIdValue instanceof Long meetingId) { + detachFrontend(meetingId, session.getId()); + } + } + + private void detachFrontend(Long meetingId, String sessionId) { + MeetingChannelSession meetingSession = meetingSessions.get(meetingId); + if (meetingSession == null) { + return; + } + synchronized (lockForMeeting(meetingId)) { + if (meetingSession.context.getRawSession() != null && meetingSession.context.getRawSession().getId().equals(sessionId)) { + meetingSession.channel.onFrontendDetached(meetingSession.context); + } + meetingSession.detachFrontend(sessionId); + } + } + + private MeetingChannelSession getMeetingSession(WebSocketSession session) { + Object meetingIdValue = session.getAttributes().get(ATTR_MEETING_ID); + if (!(meetingIdValue instanceof Long meetingId)) { + return null; + } + return meetingSessions.get(meetingId); + } + + void removeMeetingSession(Long meetingId) { + synchronized (lockForMeeting(meetingId)) { + meetingSessions.remove(meetingId); + } + } + + void removeMeetingSession(Long meetingId, MeetingChannelSession meetingSession) { + synchronized (lockForMeeting(meetingId)) { + meetingSessions.remove(meetingId, meetingSession); + } + } + + private Object lockForMeeting(Long meetingId) { + return meetingLocks.computeIfAbsent(meetingId, ignored -> new Object()); + } private String extractQueryParam(URI uri, String key) { if (uri == null || uri.getQuery() == null || uri.getQuery().isBlank()) { @@ -316,18 +321,18 @@ public class RealtimeMeetingProxyWebSocketHandler extends AbstractWebSocketHandl return 0; } - private void sendFrontendError(ConcurrentWebSocketSessionDecorator frontendSession, String code, String message) { - try { - if (!frontendSession.isOpen()) { - return; - } - Map payload = new HashMap<>(); - payload.put("type", "error"); - payload.put("code", code); - payload.put("message", message); - frontendSession.sendMessage(new TextMessage(OBJECT_MAPPER.writeValueAsString(payload))); - } catch (Exception ex) { - log.warn("向前端发送实时代理错误消息失败:code={}", code, ex); + private void sendFrontendError(ConcurrentWebSocketSessionDecorator frontendSession, String code, String message) { + try { + if (!frontendSession.isOpen()) { + return; + } + Map payload = new HashMap<>(); + payload.put("type", "error"); + payload.put("code", code); + payload.put("message", message); + frontendSession.sendMessage(new TextMessage(OBJECT_MAPPER.writeValueAsString(payload))); + } catch (Exception ex) { + log.warn("向前端发送实时代理错误消息失败:code={}", code, ex); } } @@ -346,108 +351,116 @@ public class RealtimeMeetingProxyWebSocketHandler extends AbstractWebSocketHandl return normalized.substring(0, 240) + "..."; } - static final class MeetingChannelSession { - private final Long meetingId; - private final RealtimeAsrChannel channel; - private final RealtimeAsrChannelContext context; - - private MeetingChannelSession(Long meetingId, RealtimeAsrChannel channel, RealtimeAsrChannelContext context) { - this.meetingId = meetingId; - this.channel = channel; - this.context = context; + private boolean looksLikeKeepaliveMessage(String payload) { + if (payload == null || payload.isBlank()) { + return false; + } + String normalized = payload.replaceAll("\\s+", ""); + return normalized.contains("\"type\":\"keepalive\""); } - private void bindFrontend(WebSocketSession rawSession, ConcurrentWebSocketSessionDecorator frontendSession) { - context.bindFrontendSession(rawSession, frontendSession); + static final class MeetingChannelSession { + private final Long meetingId; + private final RealtimeAsrChannel channel; + private final RealtimeAsrChannelContext context; + + private MeetingChannelSession(Long meetingId, RealtimeAsrChannel channel, RealtimeAsrChannelContext context) { + this.meetingId = meetingId; + this.channel = channel; + this.context = context; } - private void detachFrontend(String sessionId) { - if (context.getRawSession() != null && context.getRawSession().getId().equals(sessionId)) { - context.bindFrontendSession(null, null); - } + private void bindFrontend(WebSocketSession rawSession, ConcurrentWebSocketSessionDecorator frontendSession) { + context.bindFrontendSession(rawSession, frontendSession); + } + + private void detachFrontend(String sessionId) { + if (context.getRawSession() != null && context.getRawSession().getId().equals(sessionId)) { + context.bindFrontendSession(null, null); + } + } + + private void clearFrontendIfClosed() { + if (context.getRawSession() != null && !context.getRawSession().isOpen()) { + context.bindFrontendSession(null, null); + } + } + + private boolean hasOpenFrontend() { + return context.getFrontendSession() != null + && context.getFrontendSession().isOpen() + && context.getRawSession() != null + && context.getRawSession().isOpen(); + } + + private boolean isChannelOpen() { + return channel != null && channel.isOpen(context); + } } - private void clearFrontendIfClosed() { - if (context.getRawSession() != null && !context.getRawSession().isOpen()) { - context.bindFrontendSession(null, null); - } - } - - private boolean hasOpenFrontend() { - return context.getFrontendSession() != null - && context.getFrontendSession().isOpen() - && context.getRawSession() != null - && context.getRawSession().isOpen(); - } - - private boolean isChannelOpen() { - return channel != null && channel.isOpen(context); - } - } - - private final class HandlerChannelCallback implements RealtimeAsrChannelCallback { + private final class HandlerChannelCallback implements RealtimeAsrChannelCallback { @Override public void onChannelOpen(Long meetingId) throws Exception { - MeetingChannelSession meetingSession = meetingSessions.get(meetingId); - if (meetingSession == null) { + MeetingChannelSession meetingSession = meetingSessions.get(meetingId); + if (meetingSession == null) { return; } - ConcurrentWebSocketSessionDecorator frontendSession = meetingSession.context.getFrontendSession(); - if (frontendSession != null && frontendSession.isOpen()) { - sendProxyReady(frontendSession); - } + ConcurrentWebSocketSessionDecorator frontendSession = meetingSession.context.getFrontendSession(); + if (frontendSession != null && frontendSession.isOpen()) { + sendProxyReady(frontendSession); + } } - @Override - public void sendFrontendText(Long meetingId, String payload) throws Exception { - MeetingChannelSession meetingSession = meetingSessions.get(meetingId); - if (meetingSession == null) { + @Override + public void sendFrontendText(Long meetingId, String payload) throws Exception { + MeetingChannelSession meetingSession = meetingSessions.get(meetingId); + if (meetingSession == null) { return; } - ConcurrentWebSocketSessionDecorator frontendSession = meetingSession.context.getFrontendSession(); - if (frontendSession != null && frontendSession.isOpen()) { - frontendSession.sendMessage(new TextMessage(payload)); - } + ConcurrentWebSocketSessionDecorator frontendSession = meetingSession.context.getFrontendSession(); + if (frontendSession != null && frontendSession.isOpen()) { + frontendSession.sendMessage(new TextMessage(payload)); + } } @Override public void sendFrontendBinary(Long meetingId, byte[] payload) throws Exception { - MeetingChannelSession meetingSession = meetingSessions.get(meetingId); - if (meetingSession == null) { - return; - } - ConcurrentWebSocketSessionDecorator frontendSession = meetingSession.context.getFrontendSession(); - if (frontendSession != null && frontendSession.isOpen()) { - frontendSession.sendMessage(new BinaryMessage(payload)); - } + MeetingChannelSession meetingSession = meetingSessions.get(meetingId); + if (meetingSession == null) { + return; + } + ConcurrentWebSocketSessionDecorator frontendSession = meetingSession.context.getFrontendSession(); + if (frontendSession != null && frontendSession.isOpen()) { + frontendSession.sendMessage(new BinaryMessage(payload)); + } } @Override public void sendFrontendError(Long meetingId, String code, String message) { - MeetingChannelSession meetingSession = meetingSessions.get(meetingId); - if (meetingSession == null) { - return; - } - ConcurrentWebSocketSessionDecorator frontendSession = meetingSession.context.getFrontendSession(); - if (frontendSession != null) { - RealtimeMeetingProxyWebSocketHandler.this.sendFrontendError(frontendSession, code, message); - } + MeetingChannelSession meetingSession = meetingSessions.get(meetingId); + if (meetingSession == null) { + return; + } + ConcurrentWebSocketSessionDecorator frontendSession = meetingSession.context.getFrontendSession(); + if (frontendSession != null) { + RealtimeMeetingProxyWebSocketHandler.this.sendFrontendError(frontendSession, code, message); + } } @Override public void removeMeetingSession(Long meetingId) { - RealtimeMeetingProxyWebSocketHandler.this.removeMeetingSession(meetingId); + RealtimeMeetingProxyWebSocketHandler.this.removeMeetingSession(meetingId); } @Override public void closeFrontend(Long meetingId, CloseStatus status) { - MeetingChannelSession meetingSession = meetingSessions.get(meetingId); - if (meetingSession == null) { - return; - } + MeetingChannelSession meetingSession = meetingSessions.get(meetingId); + if (meetingSession == null) { + return; + } try { - WebSocketSession rawSession = meetingSession.context.getRawSession(); - if (rawSession != null && rawSession.isOpen()) { + WebSocketSession rawSession = meetingSession.context.getRawSession(); + if (rawSession != null && rawSession.isOpen()) { rawSession.close(status); } } catch (Exception ignored) { diff --git a/frontend/src/pages/business/RealtimeAsrSession.tsx b/frontend/src/pages/business/RealtimeAsrSession.tsx index 2dbd793..ee65572 100644 --- a/frontend/src/pages/business/RealtimeAsrSession.tsx +++ b/frontend/src/pages/business/RealtimeAsrSession.tsx @@ -244,6 +244,7 @@ export function RealtimeAsrSession() { const [sessionStatus, setSessionStatus] = useState(null); const transcriptRef = useRef(null); const wsRef = useRef(null); + const wsHeartbeatRef = useRef | null>(null); const audioContextRef = useRef(null); const processorRef = useRef(null); const audioSourceRef = useRef(null); @@ -373,6 +374,33 @@ export function RealtimeAsrSession() { return () => window.removeEventListener("pagehide", handlePageHide); }, [meetingId]); + // 组件卸载(切换路由)时统一清理所有资源,防止定时器、WebSocket、音频管道泄漏 + useEffect(() => { + return () => { + // 1. 清理心跳定时器 + if (wsHeartbeatRef.current !== null) { + clearInterval(wsHeartbeatRef.current); + wsHeartbeatRef.current = null; + } + // 2. 关闭 WebSocket(移除回调避免触发不必要的状态更新) + if (wsRef.current) { + wsRef.current.onclose = null; + wsRef.current.onerror = null; + wsRef.current.onmessage = null; + wsRef.current.close(); + wsRef.current = null; + } + // 3. 关闭音频管道(同步处理,无需 await) + processorRef.current?.disconnect(); + audioSourceRef.current?.disconnect(); + streamRef.current?.getTracks().forEach((t) => t.stop()); + if (audioContextRef.current && audioContextRef.current.state !== "closed") { + void audioContextRef.current.close(); + } + }; + // eslint-disable-next-line react-hooks/exhaustive-deps + }, []); + const shutdownAudioPipeline = async () => { processorRef.current?.disconnect(); audioSourceRef.current?.disconnect(); @@ -433,6 +461,10 @@ export function RealtimeAsrSession() { setRecording(false); setStatusText("连接失败"); sessionStartedRef.current = false; + if (wsHeartbeatRef.current !== null) { + clearInterval(wsHeartbeatRef.current); + wsHeartbeatRef.current = null; + } wsRef.current?.close(); wsRef.current = null; await shutdownAudioPipeline(); @@ -530,6 +562,13 @@ export function RealtimeAsrSession() { } const pauseRes = await pauseRealtimeMeeting(meetingId); await closeFrontendSocket(false); + if (wsHeartbeatRef.current !== null) { + clearInterval(wsHeartbeatRef.current); + wsHeartbeatRef.current = null; + } + wsRef.current?.close(); + wsRef.current = null; + sessionStartedRef.current = false; await shutdownAudioPipeline(); setSessionStatus(pauseRes.data.data); setRecording(false); @@ -575,12 +614,41 @@ export function RealtimeAsrSession() { hotwords: sessionDraft.hotwords || [], }); const socketSession = socketSessionRes.data.data; + + // 如果已有旧的 WebSocket(比如重连),先强制关闭并清理心跳 + if (wsHeartbeatRef.current !== null) { + clearInterval(wsHeartbeatRef.current); + wsHeartbeatRef.current = null; + } + if (wsRef.current) { + wsRef.current.onclose = null; + wsRef.current.onerror = null; + wsRef.current.onmessage = null; + wsRef.current.close(); + wsRef.current = null; + } + const socket = new WebSocket(buildRealtimeProxyWsUrl(socketSession)); socket.binaryType = "arraybuffer"; wsRef.current = socket; socket.onopen = () => { setStatusText("识别服务连接中,等待第三方服务就绪..."); + // 先清除旧定时器(防止 onopen 被意外重复触发时叠加) + if (wsHeartbeatRef.current !== null) { + clearInterval(wsHeartbeatRef.current); + wsHeartbeatRef.current = null; + } + // 启动心跳保活:每 20 秒发送一次 keepalive JSON,防止服务端 ping timeout 断开 + wsHeartbeatRef.current = setInterval(() => { + if (socket.readyState === WebSocket.OPEN) { + try { + socket.send(JSON.stringify({ type: "keepalive" })); + } catch { + // ignore heartbeat send failure + } + } + }, 20000); }; socket.onmessage = (event) => { @@ -641,6 +709,10 @@ export function RealtimeAsrSession() { }; socket.onclose = () => { + if (wsHeartbeatRef.current !== null) { + clearInterval(wsHeartbeatRef.current); + wsHeartbeatRef.current = null; + } setConnecting(false); setRecording(false); sessionStartedRef.current = false; @@ -668,6 +740,16 @@ export function RealtimeAsrSession() { setStatusText("结束会议中..."); await closeFrontendSocket(true); + if (wsRef.current?.readyState === WebSocket.OPEN) { + wsRef.current.send(JSON.stringify({ is_speaking: false })); + } + if (wsHeartbeatRef.current !== null) { + clearInterval(wsHeartbeatRef.current); + wsHeartbeatRef.current = null; + } + wsRef.current?.close(); + wsRef.current = null; + sessionStartedRef.current = false; await shutdownAudioPipeline();