package com.whdc.component; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.whdc.mapper.AutoCallConfigMapper; import com.whdc.mapper.AutoCallPersonMapper; import com.whdc.model.autocall.AutoCallPerson; import com.whdc.model.autocall.AutoCallTask; import com.whdc.service.autocall.AutoCallTaskService2; import com.whdc.utils.AutoCallHelper; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.context.event.ApplicationReadyEvent; import org.springframework.context.annotation.Profile; import org.springframework.context.event.EventListener; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import javax.annotation.PreDestroy; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; /** * @author lyf * @since 2025-06-20 */ @Component @Slf4j @Profile({"default"}) public class AutoCallTaskScheduled { @Autowired private AutoCallTaskService2 autoCallTaskService; @Autowired private AutoCallPersonMapper personMapper; @Autowired private AutoCallConfigMapper configMapper; @Autowired private AutoCallHelper autoCallHelper; private final AtomicBoolean initialized = new AtomicBoolean(false); private final ExecutorService executorService = Executors.newCachedThreadPool(); @EventListener(ApplicationReadyEvent.class) public void initialize() throws Exception { try { autoCallHelper.getToken(); log.info("token1 {}", autoCallHelper.getToken()); autoCallHelper.getToken(); log.info("token2 {}", autoCallHelper.getToken()); log.info("AutoCallTaskScheduled初始化完成"); } catch (Exception ignore) { } finally { initialized.set(true); } } @Scheduled(cron = "*/3 * * * * ?") public void generateTaskLoop() { if (configMapper.isScheduled()) { autoCallTaskService.step1GenerateTask(); } } private static final AtomicBoolean isSubmitting = new AtomicBoolean(false); @Scheduled(cron = "*/3 * * * * ?") public void submitTaskLoop() { if (!isSubmitting.compareAndSet(false, true)) { return; } try { if (!initialized.get()) { return; } if (!configMapper.isScheduled()) { return; } try { List tasks = autoCallTaskService.getTaskMapper().selectList( new QueryWrapper().eq("status", AutoCallTask.STATUS_GENERATED_AKA_READY_TO_UPLOAD).ne("submit", 1) ); for (AutoCallTask task : tasks) { executorService.submit(new AutoCallTaskThread(task.getId(), autoCallTaskService)); log.info("提交任务 taskId={}", task.getId()); task.setSubmit(1); autoCallTaskService.getTaskMapper().updateById(task); } } catch (Exception e) { log.error("提交任务时发生异常", e); } } finally { isSubmitting.set(false); } } @PreDestroy public void destroy() { log.info("正在关闭AutoCallTaskScheduled线程池..."); executorService.shutdown(); try { if (!executorService.awaitTermination(60, java.util.concurrent.TimeUnit.SECONDS)) { executorService.shutdownNow(); } } catch (InterruptedException e) { executorService.shutdownNow(); Thread.currentThread().interrupt(); } log.info("AutoCallTaskScheduled线程池已关闭"); } @AllArgsConstructor @Slf4j private static class AutoCallTaskThread implements Runnable { private Integer taskId; private AutoCallTaskService2 autoCallTaskService; @Override public void run() { try { List personList = autoCallTaskService.getPersonMapper().selectList( new QueryWrapper().eq("task_id", taskId).orderByAsc("level") ); if (personList == null || personList.isEmpty()) { log.warn("任务没有责任人 taskId={}", taskId); return; } for (AutoCallPerson person : personList) { while (person.getUploadedTimes() < 2) { if (AutoCallPerson.TAG_DONE.equals(person.getTag())) break; //do upload autoCallTaskService.step3UploadAICCTask(person); //fetch status int pendingDuration = 60 * 1000 * 2; int loopGap = 1000; boolean isComplete = false; AutoCallPerson _person = null; while (pendingDuration > 0) { try { Thread.sleep(loopGap); } catch (InterruptedException ignore) { Thread.currentThread().interrupt(); break; } pendingDuration -= loopGap; //联通确保tag和remark同时返回 _person = autoCallTaskService.step4QueryAICCTaskResult(person); if (_person != null && _person.getIsComplete() == 1) { isComplete = true; } if (isComplete) break; } if (_person != null) person = _person; if (isComplete) { //获取到任务详情且有话术标签为已知晓,中断循环 if (AutoCallPerson.TAG_DONE.equals(person.getTag())) break; } else { //获取任务详情超时 if (person.getUploadedTimes() == 2) { //已经呼叫两次了,中断循环 autoCallTaskService.markPersonDetailQueryTimeout(person); break; } } //重呼等15秒 try { Thread.sleep(15 * 1000); } catch (InterruptedException ignore) { Thread.currentThread().interrupt(); break; } } if (AutoCallPerson.TAG_DONE.equals(person.getTag())) { //有话术标签为已知晓,中断后续呼叫 break; } else { autoCallTaskService.cancelPerson(person); } } } catch (Exception e) { log.error("处理任务时发生异常 taskId={}", taskId, e); AutoCallTask task = autoCallTaskService.getTaskMapper().selectById(taskId); task.setStatus(AutoCallTask.STATUS_CANCELLED); List personList = autoCallTaskService.getPersonMapper().selectList( new QueryWrapper() .eq("task_id", taskId) .isNull("__remark") .isNull("__tag") ); for (AutoCallPerson person : personList) { person.setStatus(AutoCallPerson.STATUS_CANCELLED); person.setDetailRemark("请人工处置"); autoCallTaskService.getPersonMapper().updateById(person); } } } } }