diff --git a/backend/pom.xml b/backend/pom.xml index bf0c898..a392a88 100644 --- a/backend/pom.xml +++ b/backend/pom.xml @@ -177,6 +177,12 @@ springdoc-openapi-starter-webmvc-ui 2.3.0 + + + com.tencentcloudapi + tencentcloud-speech-sdk-java + 1.0.67 + diff --git a/backend/src/main/java/com/imeeting/dto/biz/RealtimeMeetingResumeConfig.java b/backend/src/main/java/com/imeeting/dto/biz/RealtimeMeetingResumeConfig.java index 7906eba..ab46c51 100644 --- a/backend/src/main/java/com/imeeting/dto/biz/RealtimeMeetingResumeConfig.java +++ b/backend/src/main/java/com/imeeting/dto/biz/RealtimeMeetingResumeConfig.java @@ -27,4 +27,6 @@ public class RealtimeMeetingResumeConfig { private Boolean saveAudio; @Schema(description = "热词列表") private List> hotwords; + @Schema(description = "腾讯说话人上下文 ID") + private String speakerContextId; } diff --git a/backend/src/main/java/com/imeeting/dto/biz/RealtimeMeetingTranscriptCacheItem.java b/backend/src/main/java/com/imeeting/dto/biz/RealtimeMeetingTranscriptCacheItem.java index 4cfcbd1..f1941d9 100644 --- a/backend/src/main/java/com/imeeting/dto/biz/RealtimeMeetingTranscriptCacheItem.java +++ b/backend/src/main/java/com/imeeting/dto/biz/RealtimeMeetingTranscriptCacheItem.java @@ -5,6 +5,7 @@ import lombok.Data; @Data public class RealtimeMeetingTranscriptCacheItem { private String sentenceKey; + private String sentenceGroupKey; private Integer sentenceId; private Integer sentenceType; private String speakerId; diff --git a/backend/src/main/java/com/imeeting/dto/biz/RealtimeSocketSessionData.java b/backend/src/main/java/com/imeeting/dto/biz/RealtimeSocketSessionData.java index bc56143..6bd122f 100644 --- a/backend/src/main/java/com/imeeting/dto/biz/RealtimeSocketSessionData.java +++ b/backend/src/main/java/com/imeeting/dto/biz/RealtimeSocketSessionData.java @@ -2,6 +2,8 @@ package com.imeeting.dto.biz; import lombok.Data; +import java.util.Map; + @Data public class RealtimeSocketSessionData { private Long meetingId; @@ -10,4 +12,6 @@ public class RealtimeSocketSessionData { private Long asrModelId; private String provider; private String targetWsUrl; + private String modelCode; + private Map mediaConfig; } diff --git a/backend/src/main/java/com/imeeting/enums/ModelProviderEnum.java b/backend/src/main/java/com/imeeting/enums/ModelProviderEnum.java index fe5de13..5d241bd 100644 --- a/backend/src/main/java/com/imeeting/enums/ModelProviderEnum.java +++ b/backend/src/main/java/com/imeeting/enums/ModelProviderEnum.java @@ -5,8 +5,7 @@ import lombok.Getter; @Getter public enum ModelProviderEnum { LOCAL("local", "本地"), - - ; + TENCENT("tencent", "腾讯云"); private final String code; private final String description; diff --git a/backend/src/main/java/com/imeeting/service/biz/RealtimeMeetingSessionStateService.java b/backend/src/main/java/com/imeeting/service/biz/RealtimeMeetingSessionStateService.java index 3f56ce6..77f74bb 100644 --- a/backend/src/main/java/com/imeeting/service/biz/RealtimeMeetingSessionStateService.java +++ b/backend/src/main/java/com/imeeting/service/biz/RealtimeMeetingSessionStateService.java @@ -11,6 +11,8 @@ public interface RealtimeMeetingSessionStateService { void rememberResumeConfig(Long meetingId, RealtimeMeetingResumeConfig resumeConfig); + void rememberSpeakerContext(Long meetingId, String speakerContextId); + void assertCanOpenSession(Long meetingId); boolean activate(Long meetingId, String connectionId); diff --git a/backend/src/main/java/com/imeeting/service/biz/impl/AiModelServiceImpl.java b/backend/src/main/java/com/imeeting/service/biz/impl/AiModelServiceImpl.java index ddf3a05..36e31a6 100644 --- a/backend/src/main/java/com/imeeting/service/biz/impl/AiModelServiceImpl.java +++ b/backend/src/main/java/com/imeeting/service/biz/impl/AiModelServiceImpl.java @@ -45,6 +45,10 @@ public class AiModelServiceImpl implements AiModelService { private static final String TYPE_ASR = "ASR"; private static final String TYPE_LLM = "LLM"; + private static final String TENCENT_PROVIDER = "tencent"; + private static final String MEDIA_TENCENT_APP_ID = "tencentAppId"; + private static final String MEDIA_TENCENT_SECRET_ID = "tencentSecretId"; + private static final String MEDIA_TENCENT_SECRET_KEY = "tencentSecretKey"; private static final int DEFAULT_SORT_ORDER = 0; private static final String DEFAULT_LLM_API_PATH = "/v1/chat/completions"; private static final String DEFAULT_ANTHROPIC_API_PATH = "/messages"; @@ -741,6 +745,7 @@ public class AiModelServiceImpl implements AiModelService { if (Integer.valueOf(1).equals(dto.getIsDefault()) && !Integer.valueOf(1).equals(dto.getStatus())) { throw new RuntimeException("默认模型必须为启用状态"); } + validateTencentAsrConfig(dto); // if ("custom".equals(normalizeProvider(dto.getProvider()))) { // if (TYPE_ASR.equals(normalizeType(dto.getModelType()))) { // Map mediaConfig = dto.getMediaConfig() == null ? Collections.emptyMap() : dto.getMediaConfig(); @@ -833,13 +838,20 @@ public class AiModelServiceImpl implements AiModelService { } private void pushAsrConfig(AsrModel entity) { - if (ModelProviderEnum.LOCAL.getCode().equals(normalizeProvider(entity.getProvider()))) { + String provider = normalizeProvider(entity.getProvider()); + if (ModelProviderEnum.LOCAL.getCode().equals(provider)) { if (entity.getApiKey() == null || entity.getApiKey().isBlank()) { log.info("Skip syncing local ASR profile because apiKey is blank, modelName={}", entity.getModelName()); return; } updateLocalProfile(entity); return; + } + if ("custom".equals(provider)) { + return; + } + if (TENCENT_PROVIDER.equals(provider)) { + return; } if (entity.getBaseUrl() == null || entity.getBaseUrl().isBlank()) { throw new RuntimeException("ASR 模型必须配置 baseUrl"); @@ -935,6 +947,28 @@ public class AiModelServiceImpl implements AiModelService { return text.isEmpty() ? null : text; } + private void validateTencentAsrConfig(AiModelDTO dto) { + if (!TYPE_ASR.equals(normalizeType(dto.getModelType()))) { + return; + } + if (!TENCENT_PROVIDER.equals(normalizeProvider(dto.getProvider()))) { + return; + } + Map mediaConfig = dto.getMediaConfig() == null ? Collections.emptyMap() : dto.getMediaConfig(); + if (readConfigString(mediaConfig.get(MEDIA_TENCENT_APP_ID)) == null) { + throw new RuntimeException("腾讯实时 ASR 模型必须配置 mediaConfig.tencentAppId"); + } + if (readConfigString(mediaConfig.get(MEDIA_TENCENT_SECRET_ID)) == null) { + throw new RuntimeException("腾讯实时 ASR 模型必须配置 mediaConfig.tencentSecretId"); + } + if (readConfigString(mediaConfig.get(MEDIA_TENCENT_SECRET_KEY)) == null) { + throw new RuntimeException("腾讯实时 ASR 模型必须配置 mediaConfig.tencentSecretKey"); + } + if (dto.getModelCode() == null || dto.getModelCode().isBlank()) { + throw new RuntimeException("腾讯实时 ASR 模型必须配置 modelCode"); + } + } + private BigDecimal readConfigDecimal(Object value) { if (value == null) { return null; diff --git a/backend/src/main/java/com/imeeting/service/biz/impl/RealtimeMeetingSessionStateServiceImpl.java b/backend/src/main/java/com/imeeting/service/biz/impl/RealtimeMeetingSessionStateServiceImpl.java index 14e9406..744656f 100644 --- a/backend/src/main/java/com/imeeting/service/biz/impl/RealtimeMeetingSessionStateServiceImpl.java +++ b/backend/src/main/java/com/imeeting/service/biz/impl/RealtimeMeetingSessionStateServiceImpl.java @@ -63,6 +63,22 @@ public class RealtimeMeetingSessionStateServiceImpl implements RealtimeMeetingSe public void rememberResumeConfig(Long meetingId, RealtimeMeetingResumeConfig resumeConfig) { RealtimeMeetingSessionState state = getOrCreateState(meetingId); state.setResumeConfig(resumeConfig); + state.setUpdatedAt(System.currentTimeMillis()); + writeState(state); + } + + @Override + public void rememberSpeakerContext(Long meetingId, String speakerContextId) { + if (meetingId == null || speakerContextId == null || speakerContextId.isBlank()) { + return; + } + RealtimeMeetingSessionState state = getOrCreateState(meetingId); + RealtimeMeetingResumeConfig resumeConfig = state.getResumeConfig(); + if (resumeConfig == null) { + resumeConfig = new RealtimeMeetingResumeConfig(); + state.setResumeConfig(resumeConfig); + } + resumeConfig.setSpeakerContextId(speakerContextId.trim()); state.setUpdatedAt(System.currentTimeMillis()); writeState(state); } diff --git a/backend/src/main/java/com/imeeting/service/biz/impl/RealtimeMeetingSocketSessionServiceImpl.java b/backend/src/main/java/com/imeeting/service/biz/impl/RealtimeMeetingSocketSessionServiceImpl.java index 1bb3e81..b15fcf7 100644 --- a/backend/src/main/java/com/imeeting/service/biz/impl/RealtimeMeetingSocketSessionServiceImpl.java +++ b/backend/src/main/java/com/imeeting/service/biz/impl/RealtimeMeetingSocketSessionServiceImpl.java @@ -39,10 +39,10 @@ public class RealtimeMeetingSocketSessionServiceImpl implements RealtimeMeetingS Boolean enableTextRefine, Boolean saveAudio, List> hotwords, LoginUser loginUser) { if (meetingId == null) { - throw new RuntimeException("浼氳 ID 涓嶈兘涓虹┖"); + throw new RuntimeException("会议 ID 不能为空"); } if (asrModelId == null) { - throw new RuntimeException("ASR 妯″瀷 ID 涓嶈兘涓虹┖"); + throw new RuntimeException("ASR 模型 ID 不能为空"); } Meeting meeting = meetingAccessService.requireMeeting(meetingId); @@ -53,13 +53,13 @@ public class RealtimeMeetingSocketSessionServiceImpl implements RealtimeMeetingS AiModelVO asrModel = aiModelService.getModelById(asrModelId, "ASR"); if (asrModel == null) { - throw new RuntimeException("ASR 妯″瀷涓嶅瓨鍦?"); + throw new RuntimeException("ASR 模型不存在"); } RealtimeAsrChannel realtimeAsrChannel = realtimeAsrChannelFactory.getRequired(asrModel.getProvider()); String targetWsUrl = realtimeAsrChannel.resolveTargetWsUrl(asrModel); if (targetWsUrl == null || targetWsUrl.isBlank()) { - throw new RuntimeException("ASR 妯″瀷鏈厤缃?WebSocket 鍦板潃"); + throw new RuntimeException("ASR 模型未配置 WebSocket 地址"); } RealtimeMeetingResumeConfig resumeConfig = new RealtimeMeetingResumeConfig(); @@ -87,6 +87,8 @@ public class RealtimeMeetingSocketSessionServiceImpl implements RealtimeMeetingS sessionData.setAsrModelId(asrModelId); sessionData.setProvider(realtimeAsrChannelFactory.normalizeProvider(asrModel.getProvider())); sessionData.setTargetWsUrl(targetWsUrl); + sessionData.setModelCode(asrModel.getModelCode()); + sessionData.setMediaConfig(asrModel.getMediaConfig()); String sessionToken = UUID.randomUUID().toString().replace("-", ""); socketSessionCache.save(sessionToken, sessionData); diff --git a/backend/src/main/java/com/imeeting/service/realtime/RealtimeAsrChannel.java b/backend/src/main/java/com/imeeting/service/realtime/RealtimeAsrChannel.java index 382fbf1..c7cc23e 100644 --- a/backend/src/main/java/com/imeeting/service/realtime/RealtimeAsrChannel.java +++ b/backend/src/main/java/com/imeeting/service/realtime/RealtimeAsrChannel.java @@ -26,6 +26,10 @@ public interface RealtimeAsrChannel { void handleFrontendBinary(RealtimeAsrChannelContext context, byte[] payload); + default void onFrontendDetached(RealtimeAsrChannelContext context) { + // default no-op + } + void closeMeeting(RealtimeAsrChannelContext context); boolean isOpen(RealtimeAsrChannelContext context); diff --git a/backend/src/main/java/com/imeeting/service/realtime/impl/RealtimeMeetingTranscriptCacheServiceImpl.java b/backend/src/main/java/com/imeeting/service/realtime/impl/RealtimeMeetingTranscriptCacheServiceImpl.java index 57b26da..235b14a 100644 --- a/backend/src/main/java/com/imeeting/service/realtime/impl/RealtimeMeetingTranscriptCacheServiceImpl.java +++ b/backend/src/main/java/com/imeeting/service/realtime/impl/RealtimeMeetingTranscriptCacheServiceImpl.java @@ -100,17 +100,22 @@ public class RealtimeMeetingTranscriptCacheServiceImpl implements RealtimeMeetin Integer sentenceId = sentence.has("sentence_id") && sentence.get("sentence_id").canConvertToInt() ? sentence.get("sentence_id").asInt() : null; - String sentenceKey = sentenceId == null ? "sentence-" + nextLegacySequence(state) : "sentence-" + sentenceId; + String upstreamSentenceKey = readText(sentence, "sentence_key"); + String sentenceKey = upstreamSentenceKey != null && !upstreamSentenceKey.isBlank() + ? upstreamSentenceKey + : sentenceId == null ? "sentence-" + nextLegacySequence(state) : "sentence-" + sentenceId; RealtimeMeetingTranscriptCacheItem item = findBySentenceKey(state, sentenceKey); long now = System.currentTimeMillis(); if (item == null) { item = new RealtimeMeetingTranscriptCacheItem(); item.setSentenceKey(sentenceKey); + item.setSentenceGroupKey(upstreamSentenceKey); item.setSentenceId(sentenceId); item.setSortOrder(nextSortOrder(state)); item.setFirstReceivedAt(now); state.getItems().add(item); } + item.setSentenceGroupKey(upstreamSentenceKey); item.setSentenceType(readInteger(sentence, "sentence_type")); item.setSpeakerId(resolveSpeakerId(sentence)); item.setSpeakerName(readText(sentence, "speaker_name")); diff --git a/backend/src/main/java/com/imeeting/service/realtime/impl/TencentRealtimeAsrChannel.java b/backend/src/main/java/com/imeeting/service/realtime/impl/TencentRealtimeAsrChannel.java new file mode 100644 index 0000000..fd68fd8 --- /dev/null +++ b/backend/src/main/java/com/imeeting/service/realtime/impl/TencentRealtimeAsrChannel.java @@ -0,0 +1,506 @@ +package com.imeeting.service.realtime.impl; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.imeeting.dto.biz.AiModelVO; +import com.imeeting.enums.ModelProviderEnum; +import com.imeeting.service.biz.RealtimeMeetingSessionStateService; +import com.imeeting.service.realtime.RealtimeAsrChannel; +import com.imeeting.service.realtime.RealtimeAsrChannelContext; +import com.imeeting.service.realtime.RealtimeMeetingTranscriptCacheService; +import com.tencent.asrspeaker.SpeakerConstant; +import com.tencent.asrspeaker.SpeakerRecognitionListener; +import com.tencent.asrspeaker.SpeakerRecognitionResponse; +import com.tencent.asrspeaker.SpeakerRecognizer; +import com.tencent.asrspeaker.SpeakerRecognizerRequest; +import com.tencent.asrspeaker.SpeakerSentenceItem; +import com.tencent.core.ws.Credential; +import com.tencent.core.ws.SpeechClient; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; +import org.springframework.web.socket.CloseStatus; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +@Slf4j +@Component +@RequiredArgsConstructor +public class TencentRealtimeAsrChannel implements RealtimeAsrChannel { + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final String TARGET_WS_URL = "tencent-sdk://speaker-recognizer"; + private static final String MEDIA_TENCENT_APP_ID = "tencentAppId"; + private static final String MEDIA_TENCENT_SECRET_ID = "tencentSecretId"; + private static final String MEDIA_TENCENT_SECRET_KEY = "tencentSecretKey"; + private static final String STATE_CONNECTED = "tencentConnected"; + private static final String STATE_STARTED = "tencentStarted"; + private static final String STATE_RECOGNIZER = "tencentRecognizer"; + private static final String STATE_SPEECH_CLIENT = "tencentSpeechClient"; + private static final String STATE_STOP_REQUESTED = "tencentStopRequested"; + private static final String STATE_MEETING_COMPLETE_REQUESTED = "tencentMeetingCompleteRequested"; + private static final String STATE_FRONTEND_DETACHED = "tencentFrontendDetached"; + private static final String STATE_SPEAKER_CONTEXT_ID = "speakerContextId"; + private static final String STATE_VOICE_ID = "voiceId"; + private static final String STATE_PENDING_AUDIO_FRAMES = "pendingAudioFrames"; + private static final String STATE_MODEL_CODE = "modelCode"; + private static final String STATE_MEDIA_CONFIG = "mediaConfig"; + + private final RealtimeMeetingSessionStateService realtimeMeetingSessionStateService; + private final RealtimeMeetingTranscriptCacheService realtimeMeetingTranscriptCacheService; + + @Override + public boolean supports(String provider) { + return ModelProviderEnum.TENCENT.getCode().equalsIgnoreCase(provider); + } + + @Override + public String resolveTargetWsUrl(AiModelVO model) { + return TARGET_WS_URL; + } + + @Override + public Map buildStartMessage(AiModelVO model, + String mode, + String language, + Integer useSpkId, + Boolean enablePunctuation, + Boolean enableItn, + Boolean enableTextRefine, + Boolean saveAudio, + List> hotwords) { + Map payload = new HashMap<>(); + payload.put("provider", ModelProviderEnum.TENCENT.getCode()); + payload.put("engine_model_type", model.getModelCode()); + payload.put("language", language); + + Map root = new HashMap<>(); + root.put("type", "start"); + root.put("payload", payload); + return root; + } + + @Override + public void connect(RealtimeAsrChannelContext context) throws Exception { + String connectionId = currentConnectionId(context); + if (connectionId == null || !realtimeMeetingSessionStateService.activate(context.getMeetingId(), connectionId)) { + context.getCallback().sendFrontendError(context.getMeetingId(), "REALTIME_ACTIVE_CONNECTION_EXISTS", "当前会议无法激活这条前端连接"); + context.getCallback().closeFrontend(context.getMeetingId(), CloseStatus.POLICY_VIOLATION.withReason("当前会议无法激活这条前端连接")); + return; + } + context.getChannelState().put(STATE_CONNECTED, Boolean.TRUE); + context.getChannelState().put(STATE_STARTED, Boolean.FALSE); + context.getChannelState().put(STATE_STOP_REQUESTED, Boolean.FALSE); + context.getChannelState().put(STATE_MEETING_COMPLETE_REQUESTED, Boolean.FALSE); + context.getChannelState().put(STATE_FRONTEND_DETACHED, Boolean.FALSE); + context.getChannelState().putIfAbsent(STATE_PENDING_AUDIO_FRAMES, new java.util.ArrayList()); + context.getCallback().onChannelOpen(context.getMeetingId()); + } + + @Override + public void handleFrontendText(RealtimeAsrChannelContext context, String payload) { + if (looksLikeStartMessage(payload)) { + startRecognizerIfNecessary(context); + return; + } + if (looksLikeStopMessage(payload)) { + context.getChannelState().put(STATE_STOP_REQUESTED, Boolean.TRUE); + stopRecognizer(context); + } + } + + @Override + public void handleFrontendBinary(RealtimeAsrChannelContext context, byte[] payload) { + SpeakerRecognizer recognizer = getRecognizer(context); + if (payload == null || payload.length == 0) { + return; + } + if (recognizer == null) { + queuePendingAudioFrame(context, payload); + return; + } + try { + recognizer.write(payload); + } catch (Exception ex) { + handleChannelFailure(context, "REALTIME_UPSTREAM_ERROR", "腾讯实时 ASR 音频发送失败", ex); + } + } + + @Override + public void closeMeeting(RealtimeAsrChannelContext context) { + context.getChannelState().put(STATE_STOP_REQUESTED, Boolean.TRUE); + context.getChannelState().put(STATE_MEETING_COMPLETE_REQUESTED, Boolean.TRUE); + stopRecognizer(context); + } + + @Override + public boolean isOpen(RealtimeAsrChannelContext context) { + return !Boolean.TRUE.equals(context.getChannelState().get(STATE_MEETING_COMPLETE_REQUESTED)); + } + + @Override + public void onFrontendDetached(RealtimeAsrChannelContext context) { + context.getChannelState().put(STATE_FRONTEND_DETACHED, Boolean.TRUE); + context.getChannelState().put(STATE_STOP_REQUESTED, Boolean.TRUE); + stopRecognizer(context); + } + + static String buildFrontendTranscriptMessage(String sentenceKey, + String text, + boolean isFinal, + Integer sentenceId, + Long startTime, + Long endTime, + Integer speakerId) throws JsonProcessingException { + ObjectNode root = OBJECT_MAPPER.createObjectNode(); + root.put("type", isFinal ? "segment" : "partial"); + ObjectNode data = root.putObject("data"); + data.put("text", text); + data.put("is_final", isFinal); + if (sentenceId != null) { + data.put("sentence_id", sentenceId); + } + if (sentenceKey != null && !sentenceKey.isBlank()) { + data.put("sentence_key", sentenceKey); + } + if (startTime != null) { + data.put("start", startTime / 1000D); + } + if (endTime != null) { + data.put("end", endTime / 1000D); + } + if (speakerId != null) { + data.put("speaker_id", String.valueOf(speakerId)); + } + return OBJECT_MAPPER.writeValueAsString(root); + } + + private void startRecognizerIfNecessary(RealtimeAsrChannelContext context) { + synchronized (context.getChannelState()) { + if (Boolean.TRUE.equals(context.getChannelState().get(STATE_STARTED))) { + return; + } + try { + SpeechClient speechClient = createSpeechClient(); + SpeakerRecognizerRequest request = createRecognizerRequest(context); + SpeakerRecognizer recognizer = createRecognizer(context, speechClient, request); + context.getChannelState().put(STATE_SPEECH_CLIENT, speechClient); + context.getChannelState().put(STATE_RECOGNIZER, recognizer); + context.getChannelState().put(STATE_VOICE_ID, request.getVoiceId()); + recognizer.start(); + context.getChannelState().put(STATE_STARTED, Boolean.TRUE); + flushPendingAudioFrames(context, recognizer); + } catch (Exception ex) { + handleChannelFailure(context, "REALTIME_UPSTREAM_CONNECT_FAILED", "腾讯实时 ASR 启动失败", ex); + } + } + } + + protected SpeechClient createSpeechClient() { + return new SpeechClient(SpeakerConstant.DEFAULT_RT_REQ_URL); + } + + protected SpeakerRecognizer createRecognizer(RealtimeAsrChannelContext context, + SpeechClient speechClient, + SpeakerRecognizerRequest request) { + return new SpeakerRecognizer( + speechClient, + buildCredential(context), + request, + new TencentRecognitionListener(context) + ); + } + + private Credential buildCredential(RealtimeAsrChannelContext context) { + Map mediaConfig = getMediaConfig(context); + String appId = readConfigString(mediaConfig, MEDIA_TENCENT_APP_ID); + String secretId = readConfigString(mediaConfig, MEDIA_TENCENT_SECRET_ID); + String secretKey = readConfigString(mediaConfig, MEDIA_TENCENT_SECRET_KEY); + if (appId == null || secretId == null || secretKey == null) { + throw new RuntimeException("腾讯实时 ASR 会话缺少鉴权配置"); + } + return new Credential(appId, secretId, secretKey); + } + + SpeakerRecognizerRequest createRecognizerRequest(RealtimeAsrChannelContext context) { + SpeakerRecognizerRequest request = SpeakerRecognizerRequest.init(); + request.setEngineModelType(resolveEngineModelType(context)); + request.setVoiceFormat(SpeakerConstant.AUDIO_FORMAT_PCM); + request.setVoiceId(UUID.randomUUID().toString()); + //是否需要vad + request.setNeedVad(1); + //vad静默时间 + request.setVadSilenceTime(1000); +// 分句策略参数 0小1大 + request.setSentenceStrategy(0); + //是否进行阿拉伯数字智能转换 0否1智能 23:打开数学相关转化 + request.setConvertNumMode(1); + + request.setSpeakerDiarization(1); + //启动断点续传 + request.setEnableSpeakerContext(1); + String speakerContextId = resolveSpeakerContextId(context); + if (speakerContextId != null) { + request.setSpeakerContextId(speakerContextId); + } + return request; + } + + private String resolveEngineModelType(RealtimeAsrChannelContext context) { + Object modelCode = context.getChannelState().get(STATE_MODEL_CODE); + if (modelCode instanceof String value && !value.isBlank()) { + return value; + } + return "16k_zh"; + } + + private String resolveSpeakerContextId(RealtimeAsrChannelContext context) { + Object speakerContextId = context.getChannelState().get(STATE_SPEAKER_CONTEXT_ID); + if (speakerContextId instanceof String value && !value.isBlank()) { + return value; + } + var status = realtimeMeetingSessionStateService.getStatus(context.getMeetingId()); + if (status == null || status.getResumeConfig() == null) { + return null; + } + String value = status.getResumeConfig().getSpeakerContextId(); + if (value == null || value.isBlank()) { + return null; + } + context.getChannelState().put(STATE_SPEAKER_CONTEXT_ID, value); + return value; + } + + @SuppressWarnings("unchecked") + private Map getMediaConfig(RealtimeAsrChannelContext context) { + Object mediaConfig = context.getChannelState().get(STATE_MEDIA_CONFIG); + if (mediaConfig instanceof Map map) { + return (Map) map; + } + return Map.of(); + } + + private String readConfigString(Map mediaConfig, String key) { + Object value = mediaConfig.get(key); + if (value == null) { + return null; + } + String text = String.valueOf(value).trim(); + return text.isEmpty() ? null : text; + } + + private SpeakerRecognizer getRecognizer(RealtimeAsrChannelContext context) { + Object recognizer = context.getChannelState().get(STATE_RECOGNIZER); + return recognizer instanceof SpeakerRecognizer value ? value : null; + } + + private SpeechClient getSpeechClient(RealtimeAsrChannelContext context) { + Object speechClient = context.getChannelState().get(STATE_SPEECH_CLIENT); + return speechClient instanceof SpeechClient value ? value : null; + } + + private void stopRecognizer(RealtimeAsrChannelContext context) { + SpeakerRecognizer recognizer = getRecognizer(context); + if (recognizer == null) { + shutdownSdkResources(context); + return; + } + try { + recognizer.stop(); + } catch (Exception ex) { + log.warn("Tencent realtime ASR stop failed, meetingId={}, sessionId={}", + context.getMeetingId(), currentConnectionId(context), ex); + shutdownSdkResources(context); + } + } + + private void shutdownSdkResources(RealtimeAsrChannelContext context) { + SpeakerRecognizer recognizer = getRecognizer(context); + if (recognizer != null) { + try { + recognizer.close(); + } catch (Exception ignored) { + // ignore + } + } + SpeechClient speechClient = getSpeechClient(context); + if (speechClient != null) { + try { + speechClient.shutdown(); + } catch (Exception ignored) { + // ignore + } + } + context.getChannelState().remove(STATE_RECOGNIZER); + context.getChannelState().remove(STATE_SPEECH_CLIENT); + context.getChannelState().put(STATE_STARTED, Boolean.FALSE); + } + + private void forwardResponse(RealtimeAsrChannelContext context, + SpeakerRecognitionResponse response, + boolean forceFinal) { + if (response == null || response.getSentences() == null || response.getSentences().getSentenceList() == null) { + return; + } + try { + rememberSpeakerContext(context, response); + String cachePayload = buildCachePayload(response, forceFinal); + realtimeMeetingTranscriptCacheService.mergeUpstreamMessage(context.getMeetingId(), cachePayload); + for (SpeakerSentenceItem item : response.getSentences().getSentenceList()) { + if (item == null || item.getSentence() == null || item.getSentence().trim().isEmpty()) { + continue; + } + boolean isFinal = forceFinal || item.getSentenceType() == 1; + context.getCallback().sendFrontendText( + context.getMeetingId(), + buildFrontendTranscriptMessage( + buildSentenceKey(context, item.getSentenceId()), + item.getSentence().trim(), + isFinal, + item.getSentenceId(), + item.getStartTime(), + item.getEndTime(), + item.getSpeakerId() + ) + ); + } + } catch (Exception ex) { + handleChannelFailure(context, "REALTIME_UPSTREAM_ERROR", "腾讯实时 ASR 结果转发失败", ex); + } + } + + private void rememberSpeakerContext(RealtimeAsrChannelContext context, SpeakerRecognitionResponse response) { + if (response == null || response.getSpeakerContextId() == null || response.getSpeakerContextId().isBlank()) { + return; + } + String speakerContextId = response.getSpeakerContextId().trim(); + context.getChannelState().put(STATE_SPEAKER_CONTEXT_ID, speakerContextId); + realtimeMeetingSessionStateService.rememberSpeakerContext(context.getMeetingId(), speakerContextId); + } + + private String buildSentenceKey(RealtimeAsrChannelContext context, Integer sentenceId) { + Object voiceId = context.getChannelState().get(STATE_VOICE_ID); + if (!(voiceId instanceof String value) || value.isBlank() || sentenceId == null) { + return null; + } + return value + "-" + sentenceId; + } + + @SuppressWarnings("unchecked") + private void queuePendingAudioFrame(RealtimeAsrChannelContext context, byte[] payload) { + Object frames = context.getChannelState().get(STATE_PENDING_AUDIO_FRAMES); + if (frames instanceof List list) { + ((List) list).add(payload.clone()); + return; + } + List next = new java.util.ArrayList<>(); + next.add(payload.clone()); + context.getChannelState().put(STATE_PENDING_AUDIO_FRAMES, next); + } + + @SuppressWarnings("unchecked") + private void flushPendingAudioFrames(RealtimeAsrChannelContext context, SpeakerRecognizer recognizer) { + Object frames = context.getChannelState().get(STATE_PENDING_AUDIO_FRAMES); + if (!(frames instanceof List list) || list.isEmpty()) { + return; + } + List pendingFrames = (List) list; + for (byte[] frame : pendingFrames) { + if (frame != null && frame.length > 0) { + recognizer.write(frame); + } + } + pendingFrames.clear(); + } + + private String buildCachePayload(SpeakerRecognitionResponse response, boolean forceFinal) throws JsonProcessingException { + ObjectNode root = OBJECT_MAPPER.createObjectNode(); + root.put("type", forceFinal ? "end" : "sentences"); + ArrayNode sentences = root.putArray("sentences"); + for (SpeakerSentenceItem item : response.getSentences().getSentenceList()) { + if (item == null || item.getSentence() == null || item.getSentence().trim().isEmpty()) { + continue; + } + ObjectNode sentenceNode = sentences.addObject(); + sentenceNode.put("sentence", item.getSentence().trim()); + sentenceNode.put("sentence_type", forceFinal ? 1 : item.getSentenceType()); + sentenceNode.put("sentence_id", item.getSentenceId()); + sentenceNode.put("speaker_id", String.valueOf(item.getSpeakerId())); + sentenceNode.put("start_time", item.getStartTime()); + sentenceNode.put("end_time", item.getEndTime()); + } + return OBJECT_MAPPER.writeValueAsString(root); + } + + private void handleChannelFailure(RealtimeAsrChannelContext context, String code, String message, Exception ex) { + log.error("Tencent realtime ASR channel failed, meetingId={}, sessionId={}", + context.getMeetingId(), currentConnectionId(context), ex); + shutdownSdkResources(context); + context.getCallback().sendFrontendError(context.getMeetingId(), code, message); + CompletableFuture.delayedExecutor(200, TimeUnit.MILLISECONDS).execute( + () -> context.getCallback().closeFrontend(context.getMeetingId(), CloseStatus.SERVER_ERROR) + ); + } + + private String currentConnectionId(RealtimeAsrChannelContext context) { + return context.getRawSession() == null ? null : context.getRawSession().getId(); + } + + private static boolean looksLikeStartMessage(String payload) { + if (payload == null || payload.isBlank()) { + return false; + } + String normalized = payload.replaceAll("\\s+", ""); + return normalized.contains("\"type\":\"start\""); + } + + private static boolean looksLikeStopMessage(String payload) { + if (payload == null || payload.isBlank()) { + return false; + } + String normalized = payload.replaceAll("\\s+", ""); + return normalized.contains("\"type\":\"stop\""); + } + + private final class TencentRecognitionListener extends SpeakerRecognitionListener { + private final RealtimeAsrChannelContext context; + + private TencentRecognitionListener(RealtimeAsrChannelContext context) { + this.context = context; + } + + @Override + public void onRecognitionStart(SpeakerRecognitionResponse response) { + rememberSpeakerContext(context, response); + log.info("Tencent realtime ASR started, meetingId={}, sessionId={}", + context.getMeetingId(), currentConnectionId(context)); + } + + @Override + public void onRecognitionSentences(SpeakerRecognitionResponse response) { + forwardResponse(context, response, false); + } + + @Override + public void onSentenceEnd(SpeakerRecognitionResponse response) { + forwardResponse(context, response, true); + shutdownSdkResources(context); + if (Boolean.TRUE.equals(context.getChannelState().get(STATE_MEETING_COMPLETE_REQUESTED))) { + context.getCallback().removeMeetingSession(context.getMeetingId()); + } + } + + @Override + public void onFail(SpeakerRecognitionResponse response, Exception error) { + handleChannelFailure(context, "REALTIME_UPSTREAM_ERROR", "腾讯实时 ASR 识别失败", error); + } + } +} diff --git a/backend/src/main/java/com/imeeting/websocket/RealtimeMeetingProxyWebSocketHandler.java b/backend/src/main/java/com/imeeting/websocket/RealtimeMeetingProxyWebSocketHandler.java index 58f8e3d..d19ac61 100644 --- a/backend/src/main/java/com/imeeting/websocket/RealtimeMeetingProxyWebSocketHandler.java +++ b/backend/src/main/java/com/imeeting/websocket/RealtimeMeetingProxyWebSocketHandler.java @@ -165,7 +165,13 @@ public class RealtimeMeetingProxyWebSocketHandler extends AbstractWebSocketHandl synchronized (lockForMeeting(meetingId)) { meetingSession = meetingSessions.get(meetingId); if (meetingSession != null && meetingSession.isChannelOpen()) { + String previousSessionId = meetingSession.context.getRawSession() == null + ? null + : meetingSession.context.getRawSession().getId(); meetingSession.clearFrontendIfClosed(); + if (previousSessionId != null && !meetingSession.hasOpenFrontend()) { + realtimeMeetingSessionStateService.pauseByDisconnect(meetingId, previousSessionId); + } if (meetingSession.hasOpenFrontend()) { sendFrontendError(frontendSession, "REALTIME_ACTIVE_CONNECTION_EXISTS", "当前会议已有活跃前端连接"); frontendSession.close(CloseStatus.POLICY_VIOLATION.withReason("已存在活跃的前端连接")); @@ -188,6 +194,8 @@ public class RealtimeMeetingProxyWebSocketHandler extends AbstractWebSocketHandl context.setTargetWsUrl(sessionData.getTargetWsUrl()); context.setCallback(new HandlerChannelCallback()); context.bindFrontendSession(rawSession, frontendSession); + context.getChannelState().put("modelCode", sessionData.getModelCode()); + context.getChannelState().put("mediaConfig", sessionData.getMediaConfig()); meetingSession = new MeetingChannelSession(meetingId, channel, context); meetingSessions.put(meetingId, meetingSession); } @@ -250,6 +258,9 @@ public class RealtimeMeetingProxyWebSocketHandler extends AbstractWebSocketHandl return; } synchronized (lockForMeeting(meetingId)) { + if (meetingSession.context.getRawSession() != null && meetingSession.context.getRawSession().getId().equals(sessionId)) { + meetingSession.channel.onFrontendDetached(meetingSession.context); + } meetingSession.detachFrontend(sessionId); } } diff --git a/backend/src/test/java/com/imeeting/service/biz/impl/AiModelServiceImplTest.java b/backend/src/test/java/com/imeeting/service/biz/impl/AiModelServiceImplTest.java index 65a57ce..6fad4b0 100644 --- a/backend/src/test/java/com/imeeting/service/biz/impl/AiModelServiceImplTest.java +++ b/backend/src/test/java/com/imeeting/service/biz/impl/AiModelServiceImplTest.java @@ -349,6 +349,64 @@ class AiModelServiceImplTest { assertNull(captor.getValue().getApiKey()); } + @Test + void saveModelShouldRejectTencentAsrWithoutSecretKey() { + AiModelServiceImpl service = new AiModelServiceImpl( + objectMapper, + mock(AsrModelMapper.class), + mock(LlmModelMapper.class) + ); + + AiModelDTO dto = new AiModelDTO(); + dto.setModelType("ASR"); + dto.setModelName("tencent-asr"); + dto.setProvider("tencent"); + dto.setModelCode("16k_zh"); + dto.setIsDefault(0); + dto.setStatus(1); + dto.setMediaConfig(Map.of( + "tencentAppId", "app-id", + "tencentSecretId", "secret-id" + )); + + RuntimeException ex = assertThrows(RuntimeException.class, () -> service.saveModel(dto)); + assertEquals("腾讯实时 ASR 模型必须配置 mediaConfig.tencentSecretKey", ex.getMessage()); + } + + @Test + void saveModelShouldPersistTencentAsrWithoutBaseUrl() { + AsrModelMapper asrModelMapper = mock(AsrModelMapper.class); + when(asrModelMapper.insert(any(AsrModel.class))).thenReturn(1); + + AiModelServiceImpl service = new AiModelServiceImpl( + objectMapper, + asrModelMapper, + mock(LlmModelMapper.class) + ); + + AiModelDTO dto = new AiModelDTO(); + dto.setModelType("ASR"); + dto.setModelName("tencent-asr"); + dto.setProvider("tencent"); + dto.setModelCode("16k_zh"); + dto.setIsDefault(0); + dto.setStatus(1); + dto.setMediaConfig(Map.of( + "tencentAppId", "app-id", + "tencentSecretId", "secret-id", + "tencentSecretKey", "secret-key" + )); + + service.saveModel(dto); + + ArgumentCaptor captor = ArgumentCaptor.forClass(AsrModel.class); + verify(asrModelMapper, times(1)).insert(captor.capture()); + assertEquals("tencent", captor.getValue().getProvider()); + assertEquals("16k_zh", captor.getValue().getModelCode()); + assertEquals("secret-key", captor.getValue().getMediaConfig().get("tencentSecretKey")); + assertNull(captor.getValue().getBaseUrl()); + } + private void captureRequest(HttpExchange exchange, AtomicReference requestPath, AtomicReference authorization, diff --git a/frontend/src/pages/business/AiModels.tsx b/frontend/src/pages/business/AiModels.tsx index 1c2c180..6886584 100644 --- a/frontend/src/pages/business/AiModels.tsx +++ b/frontend/src/pages/business/AiModels.tsx @@ -70,6 +70,7 @@ const AiModels: React.FC = () => { const provider = Form.useWatch("provider", form); const isDefaultChecked = Form.useWatch("isDefaultChecked", form); const isLocalProvider = String(provider || "").toLowerCase() === "custom"; + const isTencentProvider = String(provider || "").toLowerCase() === "tencent"; const isPlatformAdmin = useMemo(() => { const profileStr = sessionStorage.getItem("userProfile"); @@ -133,11 +134,17 @@ const AiModels: React.FC = () => { setEditingId(record.id); const speakerModel = record.mediaConfig?.speakerModel; const svThreshold = record.mediaConfig?.svThreshold; + const tencentAppId = record.mediaConfig?.tencentAppId; + const tencentSecretId = record.mediaConfig?.tencentSecretId; + const tencentSecretKey = record.mediaConfig?.tencentSecretKey; form.setFieldsValue({ ...record, modelType: record.modelType, speakerModel, svThreshold, + tencentAppId, + tencentSecretId, + tencentSecretKey, isDefaultChecked: record.isDefault === 1, statusChecked: record.status === 1, }); @@ -262,6 +269,12 @@ const AiModels: React.FC = () => { speakerModel: values.speakerModel, svThreshold: values.svThreshold, } + : activeType === "ASR" && isTencentProvider + ? { + tencentAppId: values.tencentAppId, + tencentSecretId: values.tencentSecretId, + tencentSecretKey: values.tencentSecretKey, + } : undefined, temperature: values.temperature, topP: values.topP, @@ -524,16 +537,20 @@ const AiModels: React.FC = () => { - - - + {!isTencentProvider && ( + <> + + + - - - + + + + + )} {(activeType === "LLM" || isLocalProvider) && ( @@ -556,7 +573,10 @@ const AiModels: React.FC = () => { { - + {!isTencentProvider && ( + + )} @@ -609,6 +631,38 @@ const AiModels: React.FC = () => { )} + {activeType === "ASR" && isTencentProvider && ( + + + + + + + + + + + + + + + + + + )} + {activeType === "LLM" && ( <> diff --git a/frontend/src/pages/business/RealtimeAsrSession.tsx b/frontend/src/pages/business/RealtimeAsrSession.tsx index 886d4e0..e7a6cc4 100644 --- a/frontend/src/pages/business/RealtimeAsrSession.tsx +++ b/frontend/src/pages/business/RealtimeAsrSession.tsx @@ -43,6 +43,7 @@ type WsMessage = { text?: string; is_final?: boolean; sentence_id?: number; + sentence_key?: string; start?: number; end?: number; speaker_id?: string; @@ -57,6 +58,7 @@ type WsMessage = { type TranscriptCard = { id: string; + sentenceKey?: string; sentenceId?: number; speakerName: string; userId?: string | number; @@ -67,6 +69,7 @@ type TranscriptCard = { }; type NormalizedWsMessage = { + sentenceKey?: string; text: string; isFinal: boolean; sentenceId?: number; @@ -174,7 +177,10 @@ function toMs(value?: number) { return Math.round(value * 1000); } -function buildTranscriptCardId(sentenceId?: number) { +function buildTranscriptCardId(sentenceKey?: string, sentenceId?: number) { + if (sentenceKey) { + return sentenceKey; + } if (sentenceId === undefined || sentenceId === null) { return `live-${Date.now()}-${Math.random()}`; } @@ -192,6 +198,7 @@ function normalizeWsMessage(payload: WsMessage): NormalizedWsMessage | null { return { text: data.text || "", isFinal: payload.type === "segment" || !!data.is_final, + sentenceKey: data.sentence_key, sentenceId: data.sentence_id, speaker: { name: data.speaker_name, @@ -397,9 +404,10 @@ export function RealtimeAsrSession() { const upsertTranscriptCard = (normalized: NormalizedWsMessage, speaker: ReturnType) => { setTranscripts((prev) => { const next = [...prev]; - const cardId = buildTranscriptCardId(normalized.sentenceId); + const cardId = buildTranscriptCardId(normalized.sentenceKey, normalized.sentenceId); const nextCard: TranscriptCard = { id: cardId, + sentenceKey: normalized.sentenceKey, sentenceId: normalized.sentenceId, speakerName: speaker.speakerName, userId: speaker.userId, @@ -408,7 +416,7 @@ export function RealtimeAsrSession() { endTime: normalized.endTime, final: true, }; - if (normalized.sentenceId !== undefined && normalized.sentenceId !== null) { + if (normalized.sentenceKey || (normalized.sentenceId !== undefined && normalized.sentenceId !== null)) { const index = next.findIndex((item) => item.id === cardId); if (index >= 0) { next[index] = {...next[index], ...nextCard}; @@ -496,9 +504,9 @@ export function RealtimeAsrSession() { if (recording && startedAtRef.current) { elapsedOffsetRef.current += Math.floor((Date.now() - startedAtRef.current) / 1000); } + const pauseRes = await pauseRealtimeMeeting(meetingId); await closeFrontendSocket(false); await shutdownAudioPipeline(); - const pauseRes = await pauseRealtimeMeeting(meetingId); setSessionStatus(pauseRes.data.data); setRecording(false); setConnecting(false);