From f787f867bba5ebdc162228fdd957d99f97a772d9 Mon Sep 17 00:00:00 2001 From: chenhao Date: Tue, 16 Jun 2026 19:23:28 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=B7=BB=E5=8A=A0=20ASR=20=E4=BB=BB?= =?UTF-8?q?=E5=8A=A1=E8=AE=B8=E5=8F=AF=E7=BC=93=E5=AD=98=E5=92=8C=E4=BC=98?= =?UTF-8?q?=E5=8C=96=E4=BB=BB=E5=8A=A1=E8=B0=83=E5=BA=A6=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 在 `AiTaskServiceImpl` 中添加 `MeetingAsrPermitCache` 依赖,用于管理 ASR 任务的许可 - 更新 `claimQueuedAsrTaskForScheduling` 方法,使用 `MeetingAsrPermitCache` 进行许可检查 - 在 `dispatchTasks` 方法中更新并发任务数量计算,考虑许可缓存中的许可数量 - 在 `requeueAsrTask` 和 `finally` 块中移除许可,确保资源释放 - 在 `RedisSupport` 中添加 `addToSetQuietly`、`isSetMemberQuietly` 和 `getSetSizeQuietly` 方法,支持许可缓存操作 --- .../service/biz/impl/AiTaskServiceImpl.java | 57 ++++++++++++------- .../com/imeeting/support/RedisSupport.java | 36 ++++++++++++ .../support/redis/MeetingAsrPermitCache.java | 18 ++++++ 3 files changed, 92 insertions(+), 19 deletions(-) diff --git a/backend/src/main/java/com/imeeting/service/biz/impl/AiTaskServiceImpl.java b/backend/src/main/java/com/imeeting/service/biz/impl/AiTaskServiceImpl.java index f276c37..3681ece 100644 --- a/backend/src/main/java/com/imeeting/service/biz/impl/AiTaskServiceImpl.java +++ b/backend/src/main/java/com/imeeting/service/biz/impl/AiTaskServiceImpl.java @@ -31,6 +31,7 @@ import com.imeeting.service.biz.MeetingTranscriptChapterService; import com.imeeting.service.biz.MeetingTranscriptFileService; import com.imeeting.support.TaskSecurityContextRunner; +import com.imeeting.support.redis.MeetingAsrPermitCache; import com.imeeting.support.redis.MeetingLockCache; import com.unisbase.entity.SysUser; import com.unisbase.mapper.SysUserMapper; @@ -73,6 +74,7 @@ public class AiTaskServiceImpl extends ServiceImpl impleme private final SysUserMapper sysUserMapper; private final HotWordService hotWordService; private final MeetingLockCache meetingLockCache; + private final MeetingAsrPermitCache meetingAsrPermitCache; private final MeetingProgressService meetingProgressService; private final MeetingPointsService meetingPointsService; private final MeetingSummaryFileService meetingSummaryFileService; @@ -120,6 +122,7 @@ public class AiTaskServiceImpl extends ServiceImpl impleme SysUserMapper sysUserMapper, HotWordService hotWordService, MeetingLockCache meetingLockCache, + MeetingAsrPermitCache meetingAsrPermitCache, MeetingProgressService meetingProgressService, MeetingPointsService meetingPointsService, MeetingSummaryFileService meetingSummaryFileService, @@ -136,6 +139,7 @@ public class AiTaskServiceImpl extends ServiceImpl impleme this.sysUserMapper = sysUserMapper; this.hotWordService = hotWordService; this.meetingLockCache = meetingLockCache; + this.meetingAsrPermitCache = meetingAsrPermitCache; this.meetingProgressService = meetingProgressService; this.meetingPointsService = meetingPointsService; this.meetingSummaryFileService = meetingSummaryFileService; @@ -228,9 +232,15 @@ public class AiTaskServiceImpl extends ServiceImpl impleme asrTask.setQueuedAt(LocalDateTime.now()); this.updateById(asrTask); } - log.info("[ASR-FLOW] ASR任务处于排队状态,等待调度执行: meetingId={}, asrTaskId={}", meetingId, asrTask.getId()); + if (!meetingAsrPermitCache.hasPermit(meetingId)) { + log.info("[ASR-FLOW] ASR任务处于排队状态,等待调度执行: meetingId={}, asrTaskId={}", meetingId, asrTask.getId()); meetingProgressService.markQueued(meetingId, asrTask, 1, "ASR queued and waiting for execution"); return; + } + if (!markAsrTaskRunningAfterLock(asrTask)) { + meetingAsrPermitCache.removePermit(meetingId); + return; + } } } @@ -291,6 +301,7 @@ public class AiTaskServiceImpl extends ServiceImpl impleme updateMeetingStatus(meetingId, 4); updateProgress(meetingId, -1, "分析失败: " + e.getMessage(), 0); } finally { + meetingAsrPermitCache.removePermit(meetingId); meetingLockCache.releasePollingLock(meetingId); scheduleQueuedAsrTasks(); log.info("[ASR-FLOW] ASR任务流程结束,已释放轮询锁: meetingId={}, costMs={}", @@ -423,9 +434,9 @@ public class AiTaskServiceImpl extends ServiceImpl impleme } try { int maxConcurrent = resolveAsrMaxConcurrent(); - long runningCount = count(new LambdaQueryWrapper() + long runningCount = Math.max(count(new LambdaQueryWrapper() .eq(AiTask::getTaskType, "ASR") - .eq(AiTask::getStatus, 1)); + .eq(AiTask::getStatus, 1)), meetingAsrPermitCache.countPermits()); int available = (int) (maxConcurrent - runningCount); if (available <= 0) { return; @@ -451,6 +462,7 @@ public class AiTaskServiceImpl extends ServiceImpl impleme for (AiTask queuedTask : claimedTasks) { Meeting queuedMeeting = meetingMapper.selectByIdIgnoreTenant(queuedTask.getMeetingId()); if (queuedMeeting == null) { + meetingAsrPermitCache.removePermit(queuedTask.getMeetingId()); continue; } self.dispatchTasks(queuedMeeting.getId(), queuedMeeting.getTenantId(), queuedMeeting.getCreatorId()); @@ -475,24 +487,10 @@ public class AiTaskServiceImpl extends ServiceImpl impleme } private boolean claimQueuedAsrTaskForScheduling(AiTask task) { - if (task == null || task.getId() == null || !Integer.valueOf(0).equals(task.getStatus())) { + if (task == null || task.getMeetingId() == null || task.getId() == null || !Integer.valueOf(0).equals(task.getStatus())) { return false; } - LocalDateTime now = LocalDateTime.now(); - boolean claimed = update(new LambdaUpdateWrapper() - .eq(AiTask::getId, task.getId()) - .eq(AiTask::getStatus, 0) - .set(AiTask::getStatus, 1) - .set(AiTask::getStartedAt, now) - .set(AiTask::getCompletedAt, null) - .set(AiTask::getErrorMsg, null)); - if (!claimed) { - return false; - } - task.setStatus(1); - task.setStartedAt(now); - meetingProgressService.markStage(task.getMeetingId(), task, 1, MeetingProgressStage.ASR_SUBMITTED, 5, "ASR 任务已开始执行", 0); - return true; + return meetingAsrPermitCache.acquirePermit(task.getMeetingId()); } private int resolveAsrMaxConcurrent() { @@ -507,6 +505,27 @@ public class AiTaskServiceImpl extends ServiceImpl impleme } } + private boolean markAsrTaskRunningAfterLock(AiTask task) { + if (task == null || task.getId() == null) { + return false; + } + LocalDateTime now = LocalDateTime.now(); + boolean updated = update(new LambdaUpdateWrapper() + .eq(AiTask::getId, task.getId()) + .eq(AiTask::getStatus, 0) + .set(AiTask::getStatus, 1) + .set(AiTask::getStartedAt, now) + .set(AiTask::getCompletedAt, null) + .set(AiTask::getErrorMsg, null)); + if (!updated) { + return false; + } + task.setStatus(1); + task.setStartedAt(now); + meetingProgressService.markStage(task.getMeetingId(), task, 1, MeetingProgressStage.ASR_SUBMITTED, 5, "ASR task started", 0); + return true; + } + private void requeueAsrTask(AiTask task, String reason, boolean clearExternalTaskId) { if (task == null || task.getId() == null) { return; diff --git a/backend/src/main/java/com/imeeting/support/RedisSupport.java b/backend/src/main/java/com/imeeting/support/RedisSupport.java index 5c0ab0d..aff047b 100644 --- a/backend/src/main/java/com/imeeting/support/RedisSupport.java +++ b/backend/src/main/java/com/imeeting/support/RedisSupport.java @@ -109,6 +109,42 @@ public class RedisSupport { } } + public boolean addToSetQuietly(String key, String member) { + if (member == null || member.isBlank()) { + return false; + } + try { + Long added = redisTemplate.opsForSet().add(key, member); + return added != null && added > 0; + } catch (Exception ex) { + log.warn("add Redis set member failed, key={}", key, ex); + return false; + } + } + + public boolean isSetMemberQuietly(String key, String member) { + if (member == null || member.isBlank()) { + return false; + } + try { + Boolean memberPresent = redisTemplate.opsForSet().isMember(key, member); + return Boolean.TRUE.equals(memberPresent); + } catch (Exception ex) { + log.warn("check Redis set member failed, key={}", key, ex); + return false; + } + } + + public long getSetSizeQuietly(String key) { + try { + Long size = redisTemplate.opsForSet().size(key); + return size == null ? 0L : size; + } catch (Exception ex) { + log.warn("read Redis set size failed, key={}", key, ex); + return 0L; + } + } + private String writeJson(Object value) { try { return objectMapper.writeValueAsString(value); diff --git a/backend/src/main/java/com/imeeting/support/redis/MeetingAsrPermitCache.java b/backend/src/main/java/com/imeeting/support/redis/MeetingAsrPermitCache.java index 5f3303e..aae4142 100644 --- a/backend/src/main/java/com/imeeting/support/redis/MeetingAsrPermitCache.java +++ b/backend/src/main/java/com/imeeting/support/redis/MeetingAsrPermitCache.java @@ -29,4 +29,22 @@ public class MeetingAsrPermitCache { } redisSupport.removeFromSetQuietly(RedisKeys.meetingAsrPermitSetKey(), String.valueOf(meetingId)); } + + public boolean acquirePermit(Long meetingId) { + if (meetingId == null) { + return false; + } + return redisSupport.addToSetQuietly(RedisKeys.meetingAsrPermitSetKey(), String.valueOf(meetingId)); + } + + public boolean hasPermit(Long meetingId) { + if (meetingId == null) { + return false; + } + return redisSupport.isSetMemberQuietly(RedisKeys.meetingAsrPermitSetKey(), String.valueOf(meetingId)); + } + + public long countPermits() { + return redisSupport.getSetSizeQuietly(RedisKeys.meetingAsrPermitSetKey()); + } }