feat: 添加 WebSocket 心跳保活和资源清理逻辑
- 在 `RealtimeMeetingWebSocketConfig` 中添加配置,关闭 Tomcat 内置的 WebSocket keepalive 检测 - 在 `RealtimeAsrSession` 组件中添加心跳定时器和资源清理逻辑,防止资源泄漏 - 在 `RealtimeMeetingProxyWebSocketHandler` 中过滤前端心跳消息,避免转发给上游 ASR 服务dev_na^2
parent
c0cc4b1c27
commit
9c98e670e1
|
|
@ -2,6 +2,9 @@ package com.imeeting.config;
|
|||
|
||||
import com.imeeting.websocket.RealtimeMeetingProxyWebSocketHandler;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.springframework.boot.web.embedded.tomcat.TomcatServletWebServerFactory;
|
||||
import org.springframework.boot.web.server.WebServerFactoryCustomizer;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.web.socket.config.annotation.EnableWebSocket;
|
||||
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
|
||||
|
|
@ -19,4 +22,25 @@ public class RealtimeMeetingWebSocketConfig implements WebSocketConfigurer {
|
|||
registry.addHandler(realtimeMeetingProxyWebSocketHandler, "/ws/meeting/realtime")
|
||||
.setAllowedOriginPatterns("*");
|
||||
}
|
||||
|
||||
/**
|
||||
* 关闭 Tomcat 内置的 WebSocket keepalive ping 检测(sessionIdleTimeout)。
|
||||
* <p>
|
||||
* Tomcat 默认会对 WebSocket session 设置 sessionIdleTimeout(-1 表示无限,但某些版本默认非 -1),
|
||||
* 并通过后台线程定期发送 Ping 帧,若在超时内未收到 Pong 响应,触发
|
||||
* code=1011 "keepalive ping timeout" 强制断开。
|
||||
* 实时 ASR 场景中,客户端持续发送音频帧,由前端心跳保活,
|
||||
* 因此显式将 sessionIdleTimeout 设为 -1(无限)。
|
||||
* </p>
|
||||
*/
|
||||
@Bean
|
||||
public WebServerFactoryCustomizer<TomcatServletWebServerFactory> wsSessionIdleTimeoutCustomizer() {
|
||||
return factory -> factory.addConnectorCustomizers(connector -> {
|
||||
// 通过系统属性通知 Tomcat WS 容器关闭 sessionIdleTimeout 检查
|
||||
// org.apache.tomcat.websocket.DEFAULT_SESSION_IDLE_TIMEOUT=-1
|
||||
if (System.getProperty("org.apache.tomcat.websocket.DEFAULT_SESSION_IDLE_TIMEOUT") == null) {
|
||||
System.setProperty("org.apache.tomcat.websocket.DEFAULT_SESSION_IDLE_TIMEOUT", "-1");
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -81,6 +81,12 @@ public class RealtimeMeetingProxyWebSocketHandler extends AbstractWebSocketHandl
|
|||
|
||||
@Override
|
||||
protected void handleTextMessage(WebSocketSession session, TextMessage message) {
|
||||
// 过滤前端发来的心跳保活消息,不转发给上游 ASR 服务
|
||||
if (looksLikeKeepaliveMessage(message.getPayload())) {
|
||||
log.debug("Frontend keepalive received, ignored: meetingId={}, sessionId={}",
|
||||
session.getAttributes().get(ATTR_MEETING_ID), session.getId());
|
||||
return;
|
||||
}
|
||||
MeetingChannelSession meetingSession = getMeetingSession(session);
|
||||
if (meetingSession == null || !meetingSession.isChannelOpen()) {
|
||||
log.warn("前端文本消息已忽略:上游 ASR 连接不可用,meetingId={}, sessionId={}",
|
||||
|
|
@ -115,11 +121,10 @@ public class RealtimeMeetingProxyWebSocketHandler extends AbstractWebSocketHandl
|
|||
|
||||
@Override
|
||||
protected void handlePongMessage(WebSocketSession session, PongMessage message) {
|
||||
if (getMeetingSession(session) == null) {
|
||||
return;
|
||||
}
|
||||
log.debug("前端 pong 已在本地忽略:meetingId={}, sessionId={}, bytes={}",
|
||||
session.getAttributes().get(ATTR_MEETING_ID), session.getId(), message.getPayloadLength());
|
||||
// Pong 是浏览器对服务端(Tomcat)发出 Ping 帧的回应,代理层在此消化即可,无需转发给上游 ASR。
|
||||
// 转发 Pong 给上游没有语义意义,且可能引起上游协议混乱。
|
||||
log.debug("Frontend pong received (keepalive): meetingId={}, sessionId={}",
|
||||
session.getAttributes().get(ATTR_MEETING_ID), session.getId());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -346,6 +351,14 @@ public class RealtimeMeetingProxyWebSocketHandler extends AbstractWebSocketHandl
|
|||
return normalized.substring(0, 240) + "...";
|
||||
}
|
||||
|
||||
private boolean looksLikeKeepaliveMessage(String payload) {
|
||||
if (payload == null || payload.isBlank()) {
|
||||
return false;
|
||||
}
|
||||
String normalized = payload.replaceAll("\\s+", "");
|
||||
return normalized.contains("\"type\":\"keepalive\"");
|
||||
}
|
||||
|
||||
static final class MeetingChannelSession {
|
||||
private final Long meetingId;
|
||||
private final RealtimeAsrChannel channel;
|
||||
|
|
|
|||
|
|
@ -244,6 +244,7 @@ export function RealtimeAsrSession() {
|
|||
const [sessionStatus, setSessionStatus] = useState<RealtimeMeetingSessionStatus | null>(null);
|
||||
const transcriptRef = useRef<HTMLDivElement | null>(null);
|
||||
const wsRef = useRef<WebSocket | null>(null);
|
||||
const wsHeartbeatRef = useRef<ReturnType<typeof setInterval> | null>(null);
|
||||
const audioContextRef = useRef<AudioContext | null>(null);
|
||||
const processorRef = useRef<ScriptProcessorNode | null>(null);
|
||||
const audioSourceRef = useRef<MediaStreamAudioSourceNode | null>(null);
|
||||
|
|
@ -373,6 +374,33 @@ export function RealtimeAsrSession() {
|
|||
return () => window.removeEventListener("pagehide", handlePageHide);
|
||||
}, [meetingId]);
|
||||
|
||||
// 组件卸载(切换路由)时统一清理所有资源,防止定时器、WebSocket、音频管道泄漏
|
||||
useEffect(() => {
|
||||
return () => {
|
||||
// 1. 清理心跳定时器
|
||||
if (wsHeartbeatRef.current !== null) {
|
||||
clearInterval(wsHeartbeatRef.current);
|
||||
wsHeartbeatRef.current = null;
|
||||
}
|
||||
// 2. 关闭 WebSocket(移除回调避免触发不必要的状态更新)
|
||||
if (wsRef.current) {
|
||||
wsRef.current.onclose = null;
|
||||
wsRef.current.onerror = null;
|
||||
wsRef.current.onmessage = null;
|
||||
wsRef.current.close();
|
||||
wsRef.current = null;
|
||||
}
|
||||
// 3. 关闭音频管道(同步处理,无需 await)
|
||||
processorRef.current?.disconnect();
|
||||
audioSourceRef.current?.disconnect();
|
||||
streamRef.current?.getTracks().forEach((t) => t.stop());
|
||||
if (audioContextRef.current && audioContextRef.current.state !== "closed") {
|
||||
void audioContextRef.current.close();
|
||||
}
|
||||
};
|
||||
// eslint-disable-next-line react-hooks/exhaustive-deps
|
||||
}, []);
|
||||
|
||||
const shutdownAudioPipeline = async () => {
|
||||
processorRef.current?.disconnect();
|
||||
audioSourceRef.current?.disconnect();
|
||||
|
|
@ -433,6 +461,10 @@ export function RealtimeAsrSession() {
|
|||
setRecording(false);
|
||||
setStatusText("连接失败");
|
||||
sessionStartedRef.current = false;
|
||||
if (wsHeartbeatRef.current !== null) {
|
||||
clearInterval(wsHeartbeatRef.current);
|
||||
wsHeartbeatRef.current = null;
|
||||
}
|
||||
wsRef.current?.close();
|
||||
wsRef.current = null;
|
||||
await shutdownAudioPipeline();
|
||||
|
|
@ -530,6 +562,13 @@ export function RealtimeAsrSession() {
|
|||
}
|
||||
const pauseRes = await pauseRealtimeMeeting(meetingId);
|
||||
await closeFrontendSocket(false);
|
||||
if (wsHeartbeatRef.current !== null) {
|
||||
clearInterval(wsHeartbeatRef.current);
|
||||
wsHeartbeatRef.current = null;
|
||||
}
|
||||
wsRef.current?.close();
|
||||
wsRef.current = null;
|
||||
sessionStartedRef.current = false;
|
||||
await shutdownAudioPipeline();
|
||||
setSessionStatus(pauseRes.data.data);
|
||||
setRecording(false);
|
||||
|
|
@ -575,12 +614,41 @@ export function RealtimeAsrSession() {
|
|||
hotwords: sessionDraft.hotwords || [],
|
||||
});
|
||||
const socketSession = socketSessionRes.data.data;
|
||||
|
||||
// 如果已有旧的 WebSocket(比如重连),先强制关闭并清理心跳
|
||||
if (wsHeartbeatRef.current !== null) {
|
||||
clearInterval(wsHeartbeatRef.current);
|
||||
wsHeartbeatRef.current = null;
|
||||
}
|
||||
if (wsRef.current) {
|
||||
wsRef.current.onclose = null;
|
||||
wsRef.current.onerror = null;
|
||||
wsRef.current.onmessage = null;
|
||||
wsRef.current.close();
|
||||
wsRef.current = null;
|
||||
}
|
||||
|
||||
const socket = new WebSocket(buildRealtimeProxyWsUrl(socketSession));
|
||||
socket.binaryType = "arraybuffer";
|
||||
wsRef.current = socket;
|
||||
|
||||
socket.onopen = () => {
|
||||
setStatusText("识别服务连接中,等待第三方服务就绪...");
|
||||
// 先清除旧定时器(防止 onopen 被意外重复触发时叠加)
|
||||
if (wsHeartbeatRef.current !== null) {
|
||||
clearInterval(wsHeartbeatRef.current);
|
||||
wsHeartbeatRef.current = null;
|
||||
}
|
||||
// 启动心跳保活:每 20 秒发送一次 keepalive JSON,防止服务端 ping timeout 断开
|
||||
wsHeartbeatRef.current = setInterval(() => {
|
||||
if (socket.readyState === WebSocket.OPEN) {
|
||||
try {
|
||||
socket.send(JSON.stringify({ type: "keepalive" }));
|
||||
} catch {
|
||||
// ignore heartbeat send failure
|
||||
}
|
||||
}
|
||||
}, 20000);
|
||||
};
|
||||
|
||||
socket.onmessage = (event) => {
|
||||
|
|
@ -641,6 +709,10 @@ export function RealtimeAsrSession() {
|
|||
};
|
||||
|
||||
socket.onclose = () => {
|
||||
if (wsHeartbeatRef.current !== null) {
|
||||
clearInterval(wsHeartbeatRef.current);
|
||||
wsHeartbeatRef.current = null;
|
||||
}
|
||||
setConnecting(false);
|
||||
setRecording(false);
|
||||
sessionStartedRef.current = false;
|
||||
|
|
@ -668,6 +740,16 @@ export function RealtimeAsrSession() {
|
|||
setStatusText("结束会议中...");
|
||||
|
||||
await closeFrontendSocket(true);
|
||||
if (wsRef.current?.readyState === WebSocket.OPEN) {
|
||||
wsRef.current.send(JSON.stringify({ is_speaking: false }));
|
||||
}
|
||||
if (wsHeartbeatRef.current !== null) {
|
||||
clearInterval(wsHeartbeatRef.current);
|
||||
wsHeartbeatRef.current = null;
|
||||
}
|
||||
wsRef.current?.close();
|
||||
wsRef.current = null;
|
||||
sessionStartedRef.current = false;
|
||||
|
||||
await shutdownAudioPipeline();
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue