diff --git a/backend/src/main/java/com/imeeting/config/RealtimeMeetingWebSocketConfig.java b/backend/src/main/java/com/imeeting/config/RealtimeMeetingWebSocketConfig.java
index f940e26..e547594 100644
--- a/backend/src/main/java/com/imeeting/config/RealtimeMeetingWebSocketConfig.java
+++ b/backend/src/main/java/com/imeeting/config/RealtimeMeetingWebSocketConfig.java
@@ -2,6 +2,9 @@ package com.imeeting.config;
import com.imeeting.websocket.RealtimeMeetingProxyWebSocketHandler;
import lombok.RequiredArgsConstructor;
+import org.springframework.boot.web.embedded.tomcat.TomcatServletWebServerFactory;
+import org.springframework.boot.web.server.WebServerFactoryCustomizer;
+import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
@@ -19,4 +22,25 @@ public class RealtimeMeetingWebSocketConfig implements WebSocketConfigurer {
registry.addHandler(realtimeMeetingProxyWebSocketHandler, "/ws/meeting/realtime")
.setAllowedOriginPatterns("*");
}
+
+ /**
+ * 关闭 Tomcat 内置的 WebSocket keepalive ping 检测(sessionIdleTimeout)。
+ *
+ * Tomcat 默认会对 WebSocket session 设置 sessionIdleTimeout(-1 表示无限,但某些版本默认非 -1),
+ * 并通过后台线程定期发送 Ping 帧,若在超时内未收到 Pong 响应,触发
+ * code=1011 "keepalive ping timeout" 强制断开。
+ * 实时 ASR 场景中,客户端持续发送音频帧,由前端心跳保活,
+ * 因此显式将 sessionIdleTimeout 设为 -1(无限)。
+ *
+ */
+ @Bean
+ public WebServerFactoryCustomizer wsSessionIdleTimeoutCustomizer() {
+ return factory -> factory.addConnectorCustomizers(connector -> {
+ // 通过系统属性通知 Tomcat WS 容器关闭 sessionIdleTimeout 检查
+ // org.apache.tomcat.websocket.DEFAULT_SESSION_IDLE_TIMEOUT=-1
+ if (System.getProperty("org.apache.tomcat.websocket.DEFAULT_SESSION_IDLE_TIMEOUT") == null) {
+ System.setProperty("org.apache.tomcat.websocket.DEFAULT_SESSION_IDLE_TIMEOUT", "-1");
+ }
+ });
+ }
}
diff --git a/backend/src/main/java/com/imeeting/websocket/RealtimeMeetingProxyWebSocketHandler.java b/backend/src/main/java/com/imeeting/websocket/RealtimeMeetingProxyWebSocketHandler.java
index d19ac61..c5aba29 100644
--- a/backend/src/main/java/com/imeeting/websocket/RealtimeMeetingProxyWebSocketHandler.java
+++ b/backend/src/main/java/com/imeeting/websocket/RealtimeMeetingProxyWebSocketHandler.java
@@ -42,25 +42,25 @@ public class RealtimeMeetingProxyWebSocketHandler extends AbstractWebSocketHandl
private static final String ATTR_MEETING_ID = "meetingId";
private static final String ATTR_TARGET_WS_URL = "targetWsUrl";
- private static final String ATTR_PROVIDER = "provider";
+ private static final String ATTR_PROVIDER = "provider";
private static final String ATTR_FRONTEND_TEXT_COUNT = "frontendTextCount";
private static final String ATTR_FRONTEND_BINARY_COUNT = "frontendBinaryCount";
- private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private final RealtimeMeetingSocketSessionService realtimeMeetingSocketSessionService;
private final RealtimeMeetingSessionStateService realtimeMeetingSessionStateService;
private final RealtimeMeetingAudioStorageService realtimeMeetingAudioStorageService;
- private final RealtimeMeetingTranscriptCacheService realtimeMeetingTranscriptCacheService;
- private final RealtimeAsrChannelFactory realtimeAsrChannelFactory;
- private final ConcurrentMap meetingSessions = new ConcurrentHashMap<>();
- private final ConcurrentMap meetingLocks = new ConcurrentHashMap<>();
+ private final RealtimeMeetingTranscriptCacheService realtimeMeetingTranscriptCacheService;
+ private final RealtimeAsrChannelFactory realtimeAsrChannelFactory;
+ private final ConcurrentMap meetingSessions = new ConcurrentHashMap<>();
+ private final ConcurrentMap meetingLocks = new ConcurrentHashMap<>();
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
String sessionToken = extractQueryParam(session.getUri(), "sessionToken");
RealtimeSocketSessionData sessionData = realtimeMeetingSocketSessionService.getSessionData(sessionToken);
if (sessionData == null) {
- log.warn("实时会议 websocket 拒绝连接:会话令牌无效,sessionId={}", session.getId());
+ log.warn("实时会议 websocket 拒绝连接:会话令牌无效,sessionId={}", session.getId());
session.close(CloseStatus.POLICY_VIOLATION.withReason("实时 Socket 会话无效"));
return;
}
@@ -69,65 +69,70 @@ public class RealtimeMeetingProxyWebSocketHandler extends AbstractWebSocketHandl
new ConcurrentWebSocketSessionDecorator(session, (int) Duration.ofSeconds(15).toMillis(), 1024 * 1024);
session.getAttributes().put(ATTR_MEETING_ID, sessionData.getMeetingId());
session.getAttributes().put(ATTR_TARGET_WS_URL, sessionData.getTargetWsUrl());
- session.getAttributes().put(ATTR_PROVIDER, sessionData.getProvider());
+ session.getAttributes().put(ATTR_PROVIDER, sessionData.getProvider());
session.getAttributes().put(ATTR_FRONTEND_TEXT_COUNT, new AtomicInteger());
session.getAttributes().put(ATTR_FRONTEND_BINARY_COUNT, new AtomicInteger());
realtimeMeetingAudioStorageService.openSession(sessionData.getMeetingId(), session.getId());
- log.info("实时会议 websocket 已接入:meetingId={}, sessionId={}, provider={}, upstream={}",
- sessionData.getMeetingId(), session.getId(), sessionData.getProvider(), sessionData.getTargetWsUrl());
+ log.info("实时会议 websocket 已接入:meetingId={}, sessionId={}, provider={}, upstream={}",
+ sessionData.getMeetingId(), session.getId(), sessionData.getProvider(), sessionData.getTargetWsUrl());
- attachFrontendSession(sessionData, session, frontendSession);
+ attachFrontendSession(sessionData, session, frontendSession);
}
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) {
- MeetingChannelSession meetingSession = getMeetingSession(session);
- if (meetingSession == null || !meetingSession.isChannelOpen()) {
- log.warn("前端文本消息已忽略:上游 ASR 连接不可用,meetingId={}, sessionId={}",
+ // 过滤前端发来的心跳保活消息,不转发给上游 ASR 服务
+ if (looksLikeKeepaliveMessage(message.getPayload())) {
+ log.debug("Frontend keepalive received, ignored: meetingId={}, sessionId={}",
+ session.getAttributes().get(ATTR_MEETING_ID), session.getId());
+ return;
+ }
+ MeetingChannelSession meetingSession = getMeetingSession(session);
+ if (meetingSession == null || !meetingSession.isChannelOpen()) {
+ log.warn("前端文本消息已忽略:上游 ASR 连接不可用,meetingId={}, sessionId={}",
session.getAttributes().get(ATTR_MEETING_ID), session.getId());
return;
}
int count = nextCount(session, ATTR_FRONTEND_TEXT_COUNT);
- String payload = message.getPayload();
- log.info("前端文本 -> ASR 渠道:meetingId={}, sessionId={}, provider={}, count={}, payload={}",
- session.getAttributes().get(ATTR_MEETING_ID), session.getId(), session.getAttributes().get(ATTR_PROVIDER), count, summarizeText(payload));
- meetingSession.channel.handleFrontendText(meetingSession.context, payload);
+ String payload = message.getPayload();
+ log.info("前端文本 -> ASR 渠道:meetingId={}, sessionId={}, provider={}, count={}, payload={}",
+ session.getAttributes().get(ATTR_MEETING_ID), session.getId(), session.getAttributes().get(ATTR_PROVIDER), count, summarizeText(payload));
+ meetingSession.channel.handleFrontendText(meetingSession.context, payload);
}
@Override
protected void handleBinaryMessage(WebSocketSession session, BinaryMessage message) {
- MeetingChannelSession meetingSession = getMeetingSession(session);
- if (meetingSession == null || !meetingSession.isChannelOpen()) {
- log.warn("前端音频帧已忽略:上游 ASR 连接不可用,meetingId={}, sessionId={}",
+ MeetingChannelSession meetingSession = getMeetingSession(session);
+ if (meetingSession == null || !meetingSession.isChannelOpen()) {
+ log.warn("前端音频帧已忽略:上游 ASR 连接不可用,meetingId={}, sessionId={}",
session.getAttributes().get(ATTR_MEETING_ID), session.getId());
return;
}
int count = nextCount(session, ATTR_FRONTEND_BINARY_COUNT);
int bytes = message.getPayloadLength();
if (shouldLogBinaryFrame(count)) {
- log.info("前端音频帧 -> ASR 渠道:meetingId={}, sessionId={}, provider={}, count={}, bytes={}",
- session.getAttributes().get(ATTR_MEETING_ID), session.getId(), session.getAttributes().get(ATTR_PROVIDER), count, bytes);
+ log.info("前端音频帧 -> ASR 渠道:meetingId={}, sessionId={}, provider={}, count={}, bytes={}",
+ session.getAttributes().get(ATTR_MEETING_ID), session.getId(), session.getAttributes().get(ATTR_PROVIDER), count, bytes);
}
byte[] payload = toByteArray(message.getPayload());
realtimeMeetingAudioStorageService.append(session.getId(), payload);
- meetingSession.channel.handleFrontendBinary(meetingSession.context, payload);
+ meetingSession.channel.handleFrontendBinary(meetingSession.context, payload);
}
@Override
protected void handlePongMessage(WebSocketSession session, PongMessage message) {
- if (getMeetingSession(session) == null) {
- return;
- }
- log.debug("前端 pong 已在本地忽略:meetingId={}, sessionId={}, bytes={}",
- session.getAttributes().get(ATTR_MEETING_ID), session.getId(), message.getPayloadLength());
+ // Pong 是浏览器对服务端(Tomcat)发出 Ping 帧的回应,代理层在此消化即可,无需转发给上游 ASR。
+ // 转发 Pong 给上游没有语义意义,且可能引起上游协议混乱。
+ log.debug("Frontend pong received (keepalive): meetingId={}, sessionId={}",
+ session.getAttributes().get(ATTR_MEETING_ID), session.getId());
}
@Override
public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
- log.error("实时会议 websocket 传输异常:meetingId={}, sessionId={}, upstream={}",
+ log.error("实时会议 websocket 传输异常:meetingId={}, sessionId={}, upstream={}",
session.getAttributes().get(ATTR_MEETING_ID), session.getId(), session.getAttributes().get(ATTR_TARGET_WS_URL), exception);
- detachFrontend(session);
- realtimeMeetingAudioStorageService.closeSession(session.getId());
+ detachFrontend(session);
+ realtimeMeetingAudioStorageService.closeSession(session.getId());
if (session.isOpen()) {
session.close(CloseStatus.SERVER_ERROR);
}
@@ -135,159 +140,159 @@ public class RealtimeMeetingProxyWebSocketHandler extends AbstractWebSocketHandl
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
- log.info("实时会议 websocket 已关闭:meetingId={}, sessionId={}, code={}, reason={}",
+ log.info("实时会议 websocket 已关闭:meetingId={}, sessionId={}, code={}, reason={}",
session.getAttributes().get(ATTR_MEETING_ID), session.getId(), status.getCode(), status.getReason());
Object meetingIdValue = session.getAttributes().get(ATTR_MEETING_ID);
if (meetingIdValue instanceof Long meetingId) {
- detachFrontend(meetingId, session.getId());
+ detachFrontend(meetingId, session.getId());
realtimeMeetingSessionStateService.pauseByDisconnect(meetingId, session.getId());
}
realtimeMeetingAudioStorageService.closeSession(session.getId());
}
- public void closeMeetingSession(Long meetingId) {
- if (meetingId == null) {
- return;
- }
- MeetingChannelSession meetingSession = meetingSessions.get(meetingId);
- if (meetingSession == null || meetingSession.channel == null) {
- return;
- }
- meetingSession.channel.closeMeeting(meetingSession.context);
- }
-
- private void attachFrontendSession(RealtimeSocketSessionData sessionData,
- WebSocketSession rawSession,
- ConcurrentWebSocketSessionDecorator frontendSession) throws Exception {
- Long meetingId = sessionData.getMeetingId();
- MeetingChannelSession meetingSession;
- boolean reused = false;
- synchronized (lockForMeeting(meetingId)) {
- meetingSession = meetingSessions.get(meetingId);
- if (meetingSession != null && meetingSession.isChannelOpen()) {
- String previousSessionId = meetingSession.context.getRawSession() == null
- ? null
- : meetingSession.context.getRawSession().getId();
- meetingSession.clearFrontendIfClosed();
- if (previousSessionId != null && !meetingSession.hasOpenFrontend()) {
- realtimeMeetingSessionStateService.pauseByDisconnect(meetingId, previousSessionId);
+ public void closeMeetingSession(Long meetingId) {
+ if (meetingId == null) {
+ return;
}
- if (meetingSession.hasOpenFrontend()) {
- sendFrontendError(frontendSession, "REALTIME_ACTIVE_CONNECTION_EXISTS", "当前会议已有活跃前端连接");
- frontendSession.close(CloseStatus.POLICY_VIOLATION.withReason("已存在活跃的前端连接"));
- realtimeMeetingAudioStorageService.closeSession(rawSession.getId());
- return;
+ MeetingChannelSession meetingSession = meetingSessions.get(meetingId);
+ if (meetingSession == null || meetingSession.channel == null) {
+ return;
}
- if (!realtimeMeetingSessionStateService.activate(meetingId, rawSession.getId())) {
- sendFrontendError(frontendSession, "REALTIME_ACTIVE_CONNECTION_REJECTED", "当前状态下无法继续会议");
- frontendSession.close(CloseStatus.POLICY_VIOLATION.withReason("当前状态下无法继续会议"));
- realtimeMeetingAudioStorageService.closeSession(rawSession.getId());
- return;
+ meetingSession.channel.closeMeeting(meetingSession.context);
+ }
+
+ private void attachFrontendSession(RealtimeSocketSessionData sessionData,
+ WebSocketSession rawSession,
+ ConcurrentWebSocketSessionDecorator frontendSession) throws Exception {
+ Long meetingId = sessionData.getMeetingId();
+ MeetingChannelSession meetingSession;
+ boolean reused = false;
+ synchronized (lockForMeeting(meetingId)) {
+ meetingSession = meetingSessions.get(meetingId);
+ if (meetingSession != null && meetingSession.isChannelOpen()) {
+ String previousSessionId = meetingSession.context.getRawSession() == null
+ ? null
+ : meetingSession.context.getRawSession().getId();
+ meetingSession.clearFrontendIfClosed();
+ if (previousSessionId != null && !meetingSession.hasOpenFrontend()) {
+ realtimeMeetingSessionStateService.pauseByDisconnect(meetingId, previousSessionId);
+ }
+ if (meetingSession.hasOpenFrontend()) {
+ sendFrontendError(frontendSession, "REALTIME_ACTIVE_CONNECTION_EXISTS", "当前会议已有活跃前端连接");
+ frontendSession.close(CloseStatus.POLICY_VIOLATION.withReason("已存在活跃的前端连接"));
+ realtimeMeetingAudioStorageService.closeSession(rawSession.getId());
+ return;
+ }
+ if (!realtimeMeetingSessionStateService.activate(meetingId, rawSession.getId())) {
+ sendFrontendError(frontendSession, "REALTIME_ACTIVE_CONNECTION_REJECTED", "当前状态下无法继续会议");
+ frontendSession.close(CloseStatus.POLICY_VIOLATION.withReason("当前状态下无法继续会议"));
+ realtimeMeetingAudioStorageService.closeSession(rawSession.getId());
+ return;
+ }
+ meetingSession.bindFrontend(rawSession, frontendSession);
+ reused = true;
+ } else {
+ RealtimeAsrChannel channel = realtimeAsrChannelFactory.getRequired(sessionData.getProvider());
+ RealtimeAsrChannelContext context = new RealtimeAsrChannelContext();
+ context.setMeetingId(meetingId);
+ context.setProvider(realtimeAsrChannelFactory.normalizeProvider(sessionData.getProvider()));
+ context.setTargetWsUrl(sessionData.getTargetWsUrl());
+ context.setCallback(new HandlerChannelCallback());
+ context.bindFrontendSession(rawSession, frontendSession);
+ context.getChannelState().put("modelCode", sessionData.getModelCode());
+ context.getChannelState().put("mediaConfig", sessionData.getMediaConfig());
+ meetingSession = new MeetingChannelSession(meetingId, channel, context);
+ meetingSessions.put(meetingId, meetingSession);
+ }
}
- meetingSession.bindFrontend(rawSession, frontendSession);
- reused = true;
- } else {
- RealtimeAsrChannel channel = realtimeAsrChannelFactory.getRequired(sessionData.getProvider());
- RealtimeAsrChannelContext context = new RealtimeAsrChannelContext();
- context.setMeetingId(meetingId);
- context.setProvider(realtimeAsrChannelFactory.normalizeProvider(sessionData.getProvider()));
- context.setTargetWsUrl(sessionData.getTargetWsUrl());
- context.setCallback(new HandlerChannelCallback());
- context.bindFrontendSession(rawSession, frontendSession);
- context.getChannelState().put("modelCode", sessionData.getModelCode());
- context.getChannelState().put("mediaConfig", sessionData.getMediaConfig());
- meetingSession = new MeetingChannelSession(meetingId, channel, context);
- meetingSessions.put(meetingId, meetingSession);
- }
- }
- if (reused) {
- sendProxyReady(frontendSession);
- replayCachedMessages(meetingId, frontendSession);
- return;
- }
-
- try {
- meetingSession.channel.connect(meetingSession.context);
- } catch (InterruptedException ex) {
- Thread.currentThread().interrupt();
- removeMeetingSession(meetingId, meetingSession);
- log.error("连接上游 ASR websocket 时被中断:meetingId={}, sessionId={}", meetingId, rawSession.getId(), ex);
- sendFrontendError(frontendSession, "REALTIME_UPSTREAM_CONNECT_INTERRUPTED", "连接上游 ASR 服务时被中断");
- realtimeMeetingAudioStorageService.closeSession(rawSession.getId());
- frontendSession.close(CloseStatus.SERVER_ERROR.withReason("连接上游服务时被中断"));
- } catch (Exception ex) {
- removeMeetingSession(meetingId, meetingSession);
- log.warn("连接上游 ASR websocket 失败:meetingId={}, provider={}, target={}",
- meetingId, sessionData.getProvider(), sessionData.getTargetWsUrl(), ex);
- sendFrontendError(frontendSession, "REALTIME_UPSTREAM_CONNECT_FAILED", "连接上游 ASR 服务失败");
- realtimeMeetingAudioStorageService.closeSession(rawSession.getId());
- frontendSession.close(CloseStatus.SERVER_ERROR.withReason("连接 ASR WebSocket 失败"));
- }
- }
-
- private void replayCachedMessages(Long meetingId, ConcurrentWebSocketSessionDecorator frontendSession) {
- try {
- if (!frontendSession.isOpen()) {
- return;
- }
- for (RealtimeMeetingTranscriptCacheItem item : realtimeMeetingTranscriptCacheService.listOrderedItems(meetingId)) {
- frontendSession.sendMessage(new TextMessage(LocalRealtimeAsrChannel.buildFrontendTranscriptMessage(item)));
- }
- } catch (Exception ex) {
- log.warn("回放缓存转写消息失败:meetingId={}", meetingId, ex);
- }
- }
-
- private void sendProxyReady(ConcurrentWebSocketSessionDecorator frontendSession) throws Exception {
- if (frontendSession.isOpen()) {
- frontendSession.sendMessage(new TextMessage("{\"type\":\"proxy_ready\"}"));
- }
- }
-
- private void detachFrontend(WebSocketSession session) {
- Object meetingIdValue = session.getAttributes().get(ATTR_MEETING_ID);
- if (meetingIdValue instanceof Long meetingId) {
- detachFrontend(meetingId, session.getId());
- }
- }
-
- private void detachFrontend(Long meetingId, String sessionId) {
- MeetingChannelSession meetingSession = meetingSessions.get(meetingId);
- if (meetingSession == null) {
- return;
+ if (reused) {
+ sendProxyReady(frontendSession);
+ replayCachedMessages(meetingId, frontendSession);
+ return;
}
- synchronized (lockForMeeting(meetingId)) {
- if (meetingSession.context.getRawSession() != null && meetingSession.context.getRawSession().getId().equals(sessionId)) {
- meetingSession.channel.onFrontendDetached(meetingSession.context);
- }
- meetingSession.detachFrontend(sessionId);
- }
- }
- private MeetingChannelSession getMeetingSession(WebSocketSession session) {
- Object meetingIdValue = session.getAttributes().get(ATTR_MEETING_ID);
- if (!(meetingIdValue instanceof Long meetingId)) {
- return null;
- }
- return meetingSessions.get(meetingId);
- }
-
- void removeMeetingSession(Long meetingId) {
- synchronized (lockForMeeting(meetingId)) {
- meetingSessions.remove(meetingId);
- }
- }
-
- void removeMeetingSession(Long meetingId, MeetingChannelSession meetingSession) {
- synchronized (lockForMeeting(meetingId)) {
- meetingSessions.remove(meetingId, meetingSession);
+ try {
+ meetingSession.channel.connect(meetingSession.context);
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ removeMeetingSession(meetingId, meetingSession);
+ log.error("连接上游 ASR websocket 时被中断:meetingId={}, sessionId={}", meetingId, rawSession.getId(), ex);
+ sendFrontendError(frontendSession, "REALTIME_UPSTREAM_CONNECT_INTERRUPTED", "连接上游 ASR 服务时被中断");
+ realtimeMeetingAudioStorageService.closeSession(rawSession.getId());
+ frontendSession.close(CloseStatus.SERVER_ERROR.withReason("连接上游服务时被中断"));
+ } catch (Exception ex) {
+ removeMeetingSession(meetingId, meetingSession);
+ log.warn("连接上游 ASR websocket 失败:meetingId={}, provider={}, target={}",
+ meetingId, sessionData.getProvider(), sessionData.getTargetWsUrl(), ex);
+ sendFrontendError(frontendSession, "REALTIME_UPSTREAM_CONNECT_FAILED", "连接上游 ASR 服务失败");
+ realtimeMeetingAudioStorageService.closeSession(rawSession.getId());
+ frontendSession.close(CloseStatus.SERVER_ERROR.withReason("连接 ASR WebSocket 失败"));
}
}
- private Object lockForMeeting(Long meetingId) {
- return meetingLocks.computeIfAbsent(meetingId, ignored -> new Object());
- }
+ private void replayCachedMessages(Long meetingId, ConcurrentWebSocketSessionDecorator frontendSession) {
+ try {
+ if (!frontendSession.isOpen()) {
+ return;
+ }
+ for (RealtimeMeetingTranscriptCacheItem item : realtimeMeetingTranscriptCacheService.listOrderedItems(meetingId)) {
+ frontendSession.sendMessage(new TextMessage(LocalRealtimeAsrChannel.buildFrontendTranscriptMessage(item)));
+ }
+ } catch (Exception ex) {
+ log.warn("回放缓存转写消息失败:meetingId={}", meetingId, ex);
+ }
+ }
+
+ private void sendProxyReady(ConcurrentWebSocketSessionDecorator frontendSession) throws Exception {
+ if (frontendSession.isOpen()) {
+ frontendSession.sendMessage(new TextMessage("{\"type\":\"proxy_ready\"}"));
+ }
+ }
+
+ private void detachFrontend(WebSocketSession session) {
+ Object meetingIdValue = session.getAttributes().get(ATTR_MEETING_ID);
+ if (meetingIdValue instanceof Long meetingId) {
+ detachFrontend(meetingId, session.getId());
+ }
+ }
+
+ private void detachFrontend(Long meetingId, String sessionId) {
+ MeetingChannelSession meetingSession = meetingSessions.get(meetingId);
+ if (meetingSession == null) {
+ return;
+ }
+ synchronized (lockForMeeting(meetingId)) {
+ if (meetingSession.context.getRawSession() != null && meetingSession.context.getRawSession().getId().equals(sessionId)) {
+ meetingSession.channel.onFrontendDetached(meetingSession.context);
+ }
+ meetingSession.detachFrontend(sessionId);
+ }
+ }
+
+ private MeetingChannelSession getMeetingSession(WebSocketSession session) {
+ Object meetingIdValue = session.getAttributes().get(ATTR_MEETING_ID);
+ if (!(meetingIdValue instanceof Long meetingId)) {
+ return null;
+ }
+ return meetingSessions.get(meetingId);
+ }
+
+ void removeMeetingSession(Long meetingId) {
+ synchronized (lockForMeeting(meetingId)) {
+ meetingSessions.remove(meetingId);
+ }
+ }
+
+ void removeMeetingSession(Long meetingId, MeetingChannelSession meetingSession) {
+ synchronized (lockForMeeting(meetingId)) {
+ meetingSessions.remove(meetingId, meetingSession);
+ }
+ }
+
+ private Object lockForMeeting(Long meetingId) {
+ return meetingLocks.computeIfAbsent(meetingId, ignored -> new Object());
+ }
private String extractQueryParam(URI uri, String key) {
if (uri == null || uri.getQuery() == null || uri.getQuery().isBlank()) {
@@ -316,18 +321,18 @@ public class RealtimeMeetingProxyWebSocketHandler extends AbstractWebSocketHandl
return 0;
}
- private void sendFrontendError(ConcurrentWebSocketSessionDecorator frontendSession, String code, String message) {
- try {
- if (!frontendSession.isOpen()) {
- return;
- }
- Map payload = new HashMap<>();
- payload.put("type", "error");
- payload.put("code", code);
- payload.put("message", message);
- frontendSession.sendMessage(new TextMessage(OBJECT_MAPPER.writeValueAsString(payload)));
- } catch (Exception ex) {
- log.warn("向前端发送实时代理错误消息失败:code={}", code, ex);
+ private void sendFrontendError(ConcurrentWebSocketSessionDecorator frontendSession, String code, String message) {
+ try {
+ if (!frontendSession.isOpen()) {
+ return;
+ }
+ Map payload = new HashMap<>();
+ payload.put("type", "error");
+ payload.put("code", code);
+ payload.put("message", message);
+ frontendSession.sendMessage(new TextMessage(OBJECT_MAPPER.writeValueAsString(payload)));
+ } catch (Exception ex) {
+ log.warn("向前端发送实时代理错误消息失败:code={}", code, ex);
}
}
@@ -346,108 +351,116 @@ public class RealtimeMeetingProxyWebSocketHandler extends AbstractWebSocketHandl
return normalized.substring(0, 240) + "...";
}
- static final class MeetingChannelSession {
- private final Long meetingId;
- private final RealtimeAsrChannel channel;
- private final RealtimeAsrChannelContext context;
-
- private MeetingChannelSession(Long meetingId, RealtimeAsrChannel channel, RealtimeAsrChannelContext context) {
- this.meetingId = meetingId;
- this.channel = channel;
- this.context = context;
+ private boolean looksLikeKeepaliveMessage(String payload) {
+ if (payload == null || payload.isBlank()) {
+ return false;
+ }
+ String normalized = payload.replaceAll("\\s+", "");
+ return normalized.contains("\"type\":\"keepalive\"");
}
- private void bindFrontend(WebSocketSession rawSession, ConcurrentWebSocketSessionDecorator frontendSession) {
- context.bindFrontendSession(rawSession, frontendSession);
+ static final class MeetingChannelSession {
+ private final Long meetingId;
+ private final RealtimeAsrChannel channel;
+ private final RealtimeAsrChannelContext context;
+
+ private MeetingChannelSession(Long meetingId, RealtimeAsrChannel channel, RealtimeAsrChannelContext context) {
+ this.meetingId = meetingId;
+ this.channel = channel;
+ this.context = context;
}
- private void detachFrontend(String sessionId) {
- if (context.getRawSession() != null && context.getRawSession().getId().equals(sessionId)) {
- context.bindFrontendSession(null, null);
- }
+ private void bindFrontend(WebSocketSession rawSession, ConcurrentWebSocketSessionDecorator frontendSession) {
+ context.bindFrontendSession(rawSession, frontendSession);
+ }
+
+ private void detachFrontend(String sessionId) {
+ if (context.getRawSession() != null && context.getRawSession().getId().equals(sessionId)) {
+ context.bindFrontendSession(null, null);
+ }
+ }
+
+ private void clearFrontendIfClosed() {
+ if (context.getRawSession() != null && !context.getRawSession().isOpen()) {
+ context.bindFrontendSession(null, null);
+ }
+ }
+
+ private boolean hasOpenFrontend() {
+ return context.getFrontendSession() != null
+ && context.getFrontendSession().isOpen()
+ && context.getRawSession() != null
+ && context.getRawSession().isOpen();
+ }
+
+ private boolean isChannelOpen() {
+ return channel != null && channel.isOpen(context);
+ }
}
- private void clearFrontendIfClosed() {
- if (context.getRawSession() != null && !context.getRawSession().isOpen()) {
- context.bindFrontendSession(null, null);
- }
- }
-
- private boolean hasOpenFrontend() {
- return context.getFrontendSession() != null
- && context.getFrontendSession().isOpen()
- && context.getRawSession() != null
- && context.getRawSession().isOpen();
- }
-
- private boolean isChannelOpen() {
- return channel != null && channel.isOpen(context);
- }
- }
-
- private final class HandlerChannelCallback implements RealtimeAsrChannelCallback {
+ private final class HandlerChannelCallback implements RealtimeAsrChannelCallback {
@Override
public void onChannelOpen(Long meetingId) throws Exception {
- MeetingChannelSession meetingSession = meetingSessions.get(meetingId);
- if (meetingSession == null) {
+ MeetingChannelSession meetingSession = meetingSessions.get(meetingId);
+ if (meetingSession == null) {
return;
}
- ConcurrentWebSocketSessionDecorator frontendSession = meetingSession.context.getFrontendSession();
- if (frontendSession != null && frontendSession.isOpen()) {
- sendProxyReady(frontendSession);
- }
+ ConcurrentWebSocketSessionDecorator frontendSession = meetingSession.context.getFrontendSession();
+ if (frontendSession != null && frontendSession.isOpen()) {
+ sendProxyReady(frontendSession);
+ }
}
- @Override
- public void sendFrontendText(Long meetingId, String payload) throws Exception {
- MeetingChannelSession meetingSession = meetingSessions.get(meetingId);
- if (meetingSession == null) {
+ @Override
+ public void sendFrontendText(Long meetingId, String payload) throws Exception {
+ MeetingChannelSession meetingSession = meetingSessions.get(meetingId);
+ if (meetingSession == null) {
return;
}
- ConcurrentWebSocketSessionDecorator frontendSession = meetingSession.context.getFrontendSession();
- if (frontendSession != null && frontendSession.isOpen()) {
- frontendSession.sendMessage(new TextMessage(payload));
- }
+ ConcurrentWebSocketSessionDecorator frontendSession = meetingSession.context.getFrontendSession();
+ if (frontendSession != null && frontendSession.isOpen()) {
+ frontendSession.sendMessage(new TextMessage(payload));
+ }
}
@Override
public void sendFrontendBinary(Long meetingId, byte[] payload) throws Exception {
- MeetingChannelSession meetingSession = meetingSessions.get(meetingId);
- if (meetingSession == null) {
- return;
- }
- ConcurrentWebSocketSessionDecorator frontendSession = meetingSession.context.getFrontendSession();
- if (frontendSession != null && frontendSession.isOpen()) {
- frontendSession.sendMessage(new BinaryMessage(payload));
- }
+ MeetingChannelSession meetingSession = meetingSessions.get(meetingId);
+ if (meetingSession == null) {
+ return;
+ }
+ ConcurrentWebSocketSessionDecorator frontendSession = meetingSession.context.getFrontendSession();
+ if (frontendSession != null && frontendSession.isOpen()) {
+ frontendSession.sendMessage(new BinaryMessage(payload));
+ }
}
@Override
public void sendFrontendError(Long meetingId, String code, String message) {
- MeetingChannelSession meetingSession = meetingSessions.get(meetingId);
- if (meetingSession == null) {
- return;
- }
- ConcurrentWebSocketSessionDecorator frontendSession = meetingSession.context.getFrontendSession();
- if (frontendSession != null) {
- RealtimeMeetingProxyWebSocketHandler.this.sendFrontendError(frontendSession, code, message);
- }
+ MeetingChannelSession meetingSession = meetingSessions.get(meetingId);
+ if (meetingSession == null) {
+ return;
+ }
+ ConcurrentWebSocketSessionDecorator frontendSession = meetingSession.context.getFrontendSession();
+ if (frontendSession != null) {
+ RealtimeMeetingProxyWebSocketHandler.this.sendFrontendError(frontendSession, code, message);
+ }
}
@Override
public void removeMeetingSession(Long meetingId) {
- RealtimeMeetingProxyWebSocketHandler.this.removeMeetingSession(meetingId);
+ RealtimeMeetingProxyWebSocketHandler.this.removeMeetingSession(meetingId);
}
@Override
public void closeFrontend(Long meetingId, CloseStatus status) {
- MeetingChannelSession meetingSession = meetingSessions.get(meetingId);
- if (meetingSession == null) {
- return;
- }
+ MeetingChannelSession meetingSession = meetingSessions.get(meetingId);
+ if (meetingSession == null) {
+ return;
+ }
try {
- WebSocketSession rawSession = meetingSession.context.getRawSession();
- if (rawSession != null && rawSession.isOpen()) {
+ WebSocketSession rawSession = meetingSession.context.getRawSession();
+ if (rawSession != null && rawSession.isOpen()) {
rawSession.close(status);
}
} catch (Exception ignored) {
diff --git a/frontend/src/pages/business/RealtimeAsrSession.tsx b/frontend/src/pages/business/RealtimeAsrSession.tsx
index 2dbd793..ee65572 100644
--- a/frontend/src/pages/business/RealtimeAsrSession.tsx
+++ b/frontend/src/pages/business/RealtimeAsrSession.tsx
@@ -244,6 +244,7 @@ export function RealtimeAsrSession() {
const [sessionStatus, setSessionStatus] = useState(null);
const transcriptRef = useRef(null);
const wsRef = useRef(null);
+ const wsHeartbeatRef = useRef | null>(null);
const audioContextRef = useRef(null);
const processorRef = useRef(null);
const audioSourceRef = useRef(null);
@@ -373,6 +374,33 @@ export function RealtimeAsrSession() {
return () => window.removeEventListener("pagehide", handlePageHide);
}, [meetingId]);
+ // 组件卸载(切换路由)时统一清理所有资源,防止定时器、WebSocket、音频管道泄漏
+ useEffect(() => {
+ return () => {
+ // 1. 清理心跳定时器
+ if (wsHeartbeatRef.current !== null) {
+ clearInterval(wsHeartbeatRef.current);
+ wsHeartbeatRef.current = null;
+ }
+ // 2. 关闭 WebSocket(移除回调避免触发不必要的状态更新)
+ if (wsRef.current) {
+ wsRef.current.onclose = null;
+ wsRef.current.onerror = null;
+ wsRef.current.onmessage = null;
+ wsRef.current.close();
+ wsRef.current = null;
+ }
+ // 3. 关闭音频管道(同步处理,无需 await)
+ processorRef.current?.disconnect();
+ audioSourceRef.current?.disconnect();
+ streamRef.current?.getTracks().forEach((t) => t.stop());
+ if (audioContextRef.current && audioContextRef.current.state !== "closed") {
+ void audioContextRef.current.close();
+ }
+ };
+ // eslint-disable-next-line react-hooks/exhaustive-deps
+ }, []);
+
const shutdownAudioPipeline = async () => {
processorRef.current?.disconnect();
audioSourceRef.current?.disconnect();
@@ -433,6 +461,10 @@ export function RealtimeAsrSession() {
setRecording(false);
setStatusText("连接失败");
sessionStartedRef.current = false;
+ if (wsHeartbeatRef.current !== null) {
+ clearInterval(wsHeartbeatRef.current);
+ wsHeartbeatRef.current = null;
+ }
wsRef.current?.close();
wsRef.current = null;
await shutdownAudioPipeline();
@@ -530,6 +562,13 @@ export function RealtimeAsrSession() {
}
const pauseRes = await pauseRealtimeMeeting(meetingId);
await closeFrontendSocket(false);
+ if (wsHeartbeatRef.current !== null) {
+ clearInterval(wsHeartbeatRef.current);
+ wsHeartbeatRef.current = null;
+ }
+ wsRef.current?.close();
+ wsRef.current = null;
+ sessionStartedRef.current = false;
await shutdownAudioPipeline();
setSessionStatus(pauseRes.data.data);
setRecording(false);
@@ -575,12 +614,41 @@ export function RealtimeAsrSession() {
hotwords: sessionDraft.hotwords || [],
});
const socketSession = socketSessionRes.data.data;
+
+ // 如果已有旧的 WebSocket(比如重连),先强制关闭并清理心跳
+ if (wsHeartbeatRef.current !== null) {
+ clearInterval(wsHeartbeatRef.current);
+ wsHeartbeatRef.current = null;
+ }
+ if (wsRef.current) {
+ wsRef.current.onclose = null;
+ wsRef.current.onerror = null;
+ wsRef.current.onmessage = null;
+ wsRef.current.close();
+ wsRef.current = null;
+ }
+
const socket = new WebSocket(buildRealtimeProxyWsUrl(socketSession));
socket.binaryType = "arraybuffer";
wsRef.current = socket;
socket.onopen = () => {
setStatusText("识别服务连接中,等待第三方服务就绪...");
+ // 先清除旧定时器(防止 onopen 被意外重复触发时叠加)
+ if (wsHeartbeatRef.current !== null) {
+ clearInterval(wsHeartbeatRef.current);
+ wsHeartbeatRef.current = null;
+ }
+ // 启动心跳保活:每 20 秒发送一次 keepalive JSON,防止服务端 ping timeout 断开
+ wsHeartbeatRef.current = setInterval(() => {
+ if (socket.readyState === WebSocket.OPEN) {
+ try {
+ socket.send(JSON.stringify({ type: "keepalive" }));
+ } catch {
+ // ignore heartbeat send failure
+ }
+ }
+ }, 20000);
};
socket.onmessage = (event) => {
@@ -641,6 +709,10 @@ export function RealtimeAsrSession() {
};
socket.onclose = () => {
+ if (wsHeartbeatRef.current !== null) {
+ clearInterval(wsHeartbeatRef.current);
+ wsHeartbeatRef.current = null;
+ }
setConnecting(false);
setRecording(false);
sessionStartedRef.current = false;
@@ -668,6 +740,16 @@ export function RealtimeAsrSession() {
setStatusText("结束会议中...");
await closeFrontendSocket(true);
+ if (wsRef.current?.readyState === WebSocket.OPEN) {
+ wsRef.current.send(JSON.stringify({ is_speaking: false }));
+ }
+ if (wsHeartbeatRef.current !== null) {
+ clearInterval(wsHeartbeatRef.current);
+ wsHeartbeatRef.current = null;
+ }
+ wsRef.current?.close();
+ wsRef.current = null;
+ sessionStartedRef.current = false;
await shutdownAudioPipeline();