package com.whdc.component; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.whdc.mapper.AutoCallConfigMapper; import com.whdc.mapper.AutoCallPersonMapper; import com.whdc.model.autocall.AutoCallPerson; import com.whdc.model.autocall.AutoCallTask; import com.whdc.service.autocall.AutoCallTaskService2; import com.whdc.utils.AutoCallHelper; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.context.event.ApplicationReadyEvent; import org.springframework.context.annotation.Profile; import org.springframework.context.event.EventListener; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import javax.annotation.PreDestroy; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; /** * @author lyf * @since 2025-06-20 */ @Component @Slf4j @Profile({"dev"}) public class AutoCallTaskScheduled { @Autowired private AutoCallTaskService2 autoCallTaskService; @Autowired private AutoCallPersonMapper personMapper; @Autowired private AutoCallConfigMapper configMapper; @Autowired private AutoCallHelper autoCallHelper; private final AtomicBoolean initialized = new AtomicBoolean(false); private final ExecutorService executorService = Executors.newCachedThreadPool(); @EventListener(ApplicationReadyEvent.class) public void initialize() throws Exception { autoCallHelper.getToken(); initialized.set(true); log.info("AutoCallTaskScheduled初始化完成"); } @Scheduled(cron = "*/3 * * * * ?") public void generateTaskLoop() { if (configMapper.isScheduled()) { autoCallTaskService.step1GenerateTask(); } } private static final AtomicBoolean isSubmitting = new AtomicBoolean(false); @Scheduled(cron = "*/3 * * * * ?") public void submitTaskLoop() { if (!isSubmitting.compareAndSet(false, true)) { return; } try { if (!initialized.get()) { return; } if (!configMapper.isScheduled()) { return; } try { List tasks = autoCallTaskService.getTaskMapper().selectList( new QueryWrapper().eq("status", AutoCallTask.STATUS_GENERATED_AKA_READY_TO_UPLOAD).ne("submit", 1) ); for (AutoCallTask task : tasks) { executorService.submit(new AutoCallTaskThread(task.getId(), autoCallTaskService)); log.info("提交任务 taskId={}", task.getId()); task.setSubmit(1); autoCallTaskService.getTaskMapper().updateById(task); } } catch (Exception e) { log.error("提交任务时发生异常", e); } } finally { isSubmitting.set(false); } } @PreDestroy public void destroy() { log.info("正在关闭AutoCallTaskScheduled线程池..."); executorService.shutdown(); try { if (!executorService.awaitTermination(60, java.util.concurrent.TimeUnit.SECONDS)) { executorService.shutdownNow(); } } catch (InterruptedException e) { executorService.shutdownNow(); Thread.currentThread().interrupt(); } log.info("AutoCallTaskScheduled线程池已关闭"); } @AllArgsConstructor @Slf4j private static class AutoCallTaskThread implements Runnable { private Integer taskId; private AutoCallTaskService2 autoCallTaskService; @Override public void run() { try { List personList = autoCallTaskService.getPersonMapper().selectList( new QueryWrapper().eq("task_id", taskId).orderByAsc("level") ); if (personList == null || personList.isEmpty()) { log.warn("任务没有责任人 taskId={}", taskId); return; } for (AutoCallPerson person : personList) { //任务可以并发了,但是人员没能前一通挂掉后再重呼 while (person.getUploadedTimes() < 2) { if (AutoCallPerson.TAG_DONE.equals(person.getTag())) break; //do upload autoCallTaskService.step3UploadAICCTask(person); //fetch status int pendingDuration = 60 * 1000 * 2; int loopGap = 1000; boolean success = false; while (pendingDuration > 0) { try { Thread.sleep(loopGap); } catch (InterruptedException ignore) { Thread.currentThread().interrupt(); break; } pendingDuration -= loopGap; success = autoCallTaskService.step4QueryAICCTaskResult(person); if (success) break; } AutoCallPerson _person = autoCallTaskService.getPersonMapper().selectById(person.getId()); person = _person; if (!success) { if (person.getUploadedTimes() == 2) { autoCallTaskService.markPersonDetailQueryTimeout(person); } } //重呼等15秒 if (AutoCallPerson.TAG_DONE.equals(person.getTag())) break; try { Thread.sleep(15*1000); } catch (InterruptedException ignore) { Thread.currentThread().interrupt(); break; } } if (!AutoCallPerson.TAG_DONE.equals(person.getTag())) { autoCallTaskService.cancelPerson(person); } } } catch (Exception e) { log.error("处理任务时发生异常 taskId={}", taskId, e); AutoCallTask task = autoCallTaskService.getTaskMapper().selectById(taskId); task.setStatus(AutoCallTask.STATUS_CANCELLED); List personList = autoCallTaskService.getPersonMapper().selectList( new QueryWrapper() .eq("task_id", taskId) .isNull("__remark") .isNull("__tag") ); for (AutoCallPerson person : personList) { person.setStatus(AutoCallPerson.STATUS_CANCELLED); person.setDetailRemark("请人工处置"); autoCallTaskService.getPersonMapper().updateById(person); } } } } }