feat: 添加 ASR 任务许可缓存和优化任务调度逻辑

- 在 `AiTaskServiceImpl` 中添加 `MeetingAsrPermitCache` 依赖,用于管理 ASR 任务的许可
- 更新 `claimQueuedAsrTaskForScheduling` 方法,使用 `MeetingAsrPermitCache` 进行许可检查
- 在 `dispatchTasks` 方法中更新并发任务数量计算,考虑许可缓存中的许可数量
- 在 `requeueAsrTask` 和 `finally` 块中移除许可,确保资源释放
- 在 `RedisSupport` 中添加 `addToSetQuietly`、`isSetMemberQuietly` 和 `getSetSizeQuietly` 方法,支持许可缓存操作
dev_na
chenhao 2026-06-16 19:23:28 +08:00
parent 8d4a31e043
commit f787f867bb
3 changed files with 92 additions and 19 deletions

View File

@ -31,6 +31,7 @@ import com.imeeting.service.biz.MeetingTranscriptChapterService;
import com.imeeting.service.biz.MeetingTranscriptFileService;
import com.imeeting.support.TaskSecurityContextRunner;
import com.imeeting.support.redis.MeetingAsrPermitCache;
import com.imeeting.support.redis.MeetingLockCache;
import com.unisbase.entity.SysUser;
import com.unisbase.mapper.SysUserMapper;
@ -73,6 +74,7 @@ public class AiTaskServiceImpl extends ServiceImpl<AiTaskMapper, AiTask> impleme
private final SysUserMapper sysUserMapper;
private final HotWordService hotWordService;
private final MeetingLockCache meetingLockCache;
private final MeetingAsrPermitCache meetingAsrPermitCache;
private final MeetingProgressService meetingProgressService;
private final MeetingPointsService meetingPointsService;
private final MeetingSummaryFileService meetingSummaryFileService;
@ -120,6 +122,7 @@ public class AiTaskServiceImpl extends ServiceImpl<AiTaskMapper, AiTask> impleme
SysUserMapper sysUserMapper,
HotWordService hotWordService,
MeetingLockCache meetingLockCache,
MeetingAsrPermitCache meetingAsrPermitCache,
MeetingProgressService meetingProgressService,
MeetingPointsService meetingPointsService,
MeetingSummaryFileService meetingSummaryFileService,
@ -136,6 +139,7 @@ public class AiTaskServiceImpl extends ServiceImpl<AiTaskMapper, AiTask> impleme
this.sysUserMapper = sysUserMapper;
this.hotWordService = hotWordService;
this.meetingLockCache = meetingLockCache;
this.meetingAsrPermitCache = meetingAsrPermitCache;
this.meetingProgressService = meetingProgressService;
this.meetingPointsService = meetingPointsService;
this.meetingSummaryFileService = meetingSummaryFileService;
@ -228,10 +232,16 @@ public class AiTaskServiceImpl extends ServiceImpl<AiTaskMapper, AiTask> impleme
asrTask.setQueuedAt(LocalDateTime.now());
this.updateById(asrTask);
}
if (!meetingAsrPermitCache.hasPermit(meetingId)) {
log.info("[ASR-FLOW] ASR任务处于排队状态等待调度执行: meetingId={}, asrTaskId={}", meetingId, asrTask.getId());
meetingProgressService.markQueued(meetingId, asrTask, 1, "ASR queued and waiting for execution");
return;
}
if (!markAsrTaskRunningAfterLock(asrTask)) {
meetingAsrPermitCache.removePermit(meetingId);
return;
}
}
}
String asrText = "";
@ -291,6 +301,7 @@ public class AiTaskServiceImpl extends ServiceImpl<AiTaskMapper, AiTask> impleme
updateMeetingStatus(meetingId, 4);
updateProgress(meetingId, -1, "分析失败: " + e.getMessage(), 0);
} finally {
meetingAsrPermitCache.removePermit(meetingId);
meetingLockCache.releasePollingLock(meetingId);
scheduleQueuedAsrTasks();
log.info("[ASR-FLOW] ASR任务流程结束已释放轮询锁: meetingId={}, costMs={}",
@ -423,9 +434,9 @@ public class AiTaskServiceImpl extends ServiceImpl<AiTaskMapper, AiTask> impleme
}
try {
int maxConcurrent = resolveAsrMaxConcurrent();
long runningCount = count(new LambdaQueryWrapper<AiTask>()
long runningCount = Math.max(count(new LambdaQueryWrapper<AiTask>()
.eq(AiTask::getTaskType, "ASR")
.eq(AiTask::getStatus, 1));
.eq(AiTask::getStatus, 1)), meetingAsrPermitCache.countPermits());
int available = (int) (maxConcurrent - runningCount);
if (available <= 0) {
return;
@ -451,6 +462,7 @@ public class AiTaskServiceImpl extends ServiceImpl<AiTaskMapper, AiTask> impleme
for (AiTask queuedTask : claimedTasks) {
Meeting queuedMeeting = meetingMapper.selectByIdIgnoreTenant(queuedTask.getMeetingId());
if (queuedMeeting == null) {
meetingAsrPermitCache.removePermit(queuedTask.getMeetingId());
continue;
}
self.dispatchTasks(queuedMeeting.getId(), queuedMeeting.getTenantId(), queuedMeeting.getCreatorId());
@ -475,24 +487,10 @@ public class AiTaskServiceImpl extends ServiceImpl<AiTaskMapper, AiTask> impleme
}
private boolean claimQueuedAsrTaskForScheduling(AiTask task) {
if (task == null || task.getId() == null || !Integer.valueOf(0).equals(task.getStatus())) {
if (task == null || task.getMeetingId() == null || task.getId() == null || !Integer.valueOf(0).equals(task.getStatus())) {
return false;
}
LocalDateTime now = LocalDateTime.now();
boolean claimed = update(new LambdaUpdateWrapper<AiTask>()
.eq(AiTask::getId, task.getId())
.eq(AiTask::getStatus, 0)
.set(AiTask::getStatus, 1)
.set(AiTask::getStartedAt, now)
.set(AiTask::getCompletedAt, null)
.set(AiTask::getErrorMsg, null));
if (!claimed) {
return false;
}
task.setStatus(1);
task.setStartedAt(now);
meetingProgressService.markStage(task.getMeetingId(), task, 1, MeetingProgressStage.ASR_SUBMITTED, 5, "ASR 任务已开始执行", 0);
return true;
return meetingAsrPermitCache.acquirePermit(task.getMeetingId());
}
private int resolveAsrMaxConcurrent() {
@ -507,6 +505,27 @@ public class AiTaskServiceImpl extends ServiceImpl<AiTaskMapper, AiTask> impleme
}
}
private boolean markAsrTaskRunningAfterLock(AiTask task) {
if (task == null || task.getId() == null) {
return false;
}
LocalDateTime now = LocalDateTime.now();
boolean updated = update(new LambdaUpdateWrapper<AiTask>()
.eq(AiTask::getId, task.getId())
.eq(AiTask::getStatus, 0)
.set(AiTask::getStatus, 1)
.set(AiTask::getStartedAt, now)
.set(AiTask::getCompletedAt, null)
.set(AiTask::getErrorMsg, null));
if (!updated) {
return false;
}
task.setStatus(1);
task.setStartedAt(now);
meetingProgressService.markStage(task.getMeetingId(), task, 1, MeetingProgressStage.ASR_SUBMITTED, 5, "ASR task started", 0);
return true;
}
private void requeueAsrTask(AiTask task, String reason, boolean clearExternalTaskId) {
if (task == null || task.getId() == null) {
return;

View File

@ -109,6 +109,42 @@ public class RedisSupport {
}
}
public boolean addToSetQuietly(String key, String member) {
if (member == null || member.isBlank()) {
return false;
}
try {
Long added = redisTemplate.opsForSet().add(key, member);
return added != null && added > 0;
} catch (Exception ex) {
log.warn("add Redis set member failed, key={}", key, ex);
return false;
}
}
public boolean isSetMemberQuietly(String key, String member) {
if (member == null || member.isBlank()) {
return false;
}
try {
Boolean memberPresent = redisTemplate.opsForSet().isMember(key, member);
return Boolean.TRUE.equals(memberPresent);
} catch (Exception ex) {
log.warn("check Redis set member failed, key={}", key, ex);
return false;
}
}
public long getSetSizeQuietly(String key) {
try {
Long size = redisTemplate.opsForSet().size(key);
return size == null ? 0L : size;
} catch (Exception ex) {
log.warn("read Redis set size failed, key={}", key, ex);
return 0L;
}
}
private String writeJson(Object value) {
try {
return objectMapper.writeValueAsString(value);

View File

@ -29,4 +29,22 @@ public class MeetingAsrPermitCache {
}
redisSupport.removeFromSetQuietly(RedisKeys.meetingAsrPermitSetKey(), String.valueOf(meetingId));
}
public boolean acquirePermit(Long meetingId) {
if (meetingId == null) {
return false;
}
return redisSupport.addToSetQuietly(RedisKeys.meetingAsrPermitSetKey(), String.valueOf(meetingId));
}
public boolean hasPermit(Long meetingId) {
if (meetingId == null) {
return false;
}
return redisSupport.isSetMemberQuietly(RedisKeys.meetingAsrPermitSetKey(), String.valueOf(meetingId));
}
public long countPermits() {
return redisSupport.getSetSizeQuietly(RedisKeys.meetingAsrPermitSetKey());
}
}