fxkh-txl-service/src/main/java/com/whdc/component/AutoCallTaskScheduled.java

215 lines
8.4 KiB
Java

package com.whdc.component;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.whdc.mapper.AutoCallConfigMapper;
import com.whdc.mapper.AutoCallPersonMapper;
import com.whdc.model.autocall.AutoCallPerson;
import com.whdc.model.autocall.AutoCallTask;
import com.whdc.service.autocall.AutoCallTaskService2;
import com.whdc.utils.AutoCallHelper;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.annotation.Profile;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.annotation.PreDestroy;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* @author lyf
* @since 2025-06-20
*/
@Component
@Slf4j
@Profile({"default"})
public class AutoCallTaskScheduled {
@Autowired
private AutoCallTaskService2 autoCallTaskService;
@Autowired
private AutoCallPersonMapper personMapper;
@Autowired
private AutoCallConfigMapper configMapper;
@Autowired
private AutoCallHelper autoCallHelper;
private final AtomicBoolean initialized = new AtomicBoolean(false);
private final ExecutorService executorService = Executors.newCachedThreadPool();
@EventListener(ApplicationReadyEvent.class)
public void initialize() throws Exception {
try {
autoCallHelper.getToken();
log.info("token1 {}", autoCallHelper.getToken());
autoCallHelper.getToken();
log.info("token2 {}", autoCallHelper.getToken());
log.info("AutoCallTaskScheduled初始化完成");
} catch (Exception ignore) {
} finally {
initialized.set(true);
}
}
@Scheduled(cron = "*/3 * * * * ?")
public void generateTaskLoop() {
if (configMapper.isScheduled()) {
autoCallTaskService.step1GenerateTask();
}
}
private static final AtomicBoolean isSubmitting = new AtomicBoolean(false);
@Scheduled(cron = "*/3 * * * * ?")
public void submitTaskLoop() {
if (!isSubmitting.compareAndSet(false, true)) {
return;
}
try {
if (!initialized.get()) {
return;
}
if (!configMapper.isScheduled()) {
return;
}
try {
List<AutoCallTask> tasks = autoCallTaskService.getTaskMapper().selectList(
new QueryWrapper<AutoCallTask>().eq("status", AutoCallTask.STATUS_GENERATED_AKA_READY_TO_UPLOAD).ne("submit", 1)
);
for (AutoCallTask task : tasks) {
executorService.submit(new AutoCallTaskThread(task.getId(), autoCallTaskService));
log.info("提交任务 taskId={}", task.getId());
task.setSubmit(1);
autoCallTaskService.getTaskMapper().updateById(task);
}
} catch (Exception e) {
log.error("提交任务时发生异常", e);
}
} finally {
isSubmitting.set(false);
}
}
@PreDestroy
public void destroy() {
log.info("正在关闭AutoCallTaskScheduled线程池...");
executorService.shutdown();
try {
if (!executorService.awaitTermination(60, java.util.concurrent.TimeUnit.SECONDS)) {
executorService.shutdownNow();
}
} catch (InterruptedException e) {
executorService.shutdownNow();
Thread.currentThread().interrupt();
}
log.info("AutoCallTaskScheduled线程池已关闭");
}
@AllArgsConstructor
@Slf4j
private static class AutoCallTaskThread implements Runnable {
private Integer taskId;
private AutoCallTaskService2 autoCallTaskService;
@Override
public void run() {
try {
List<AutoCallPerson> personList = autoCallTaskService.getPersonMapper().selectList(
new QueryWrapper<AutoCallPerson>().eq("task_id", taskId).orderByAsc("level")
);
if (personList == null || personList.isEmpty()) {
log.warn("任务没有责任人 taskId={}", taskId);
return;
}
for (AutoCallPerson person : personList) {
AutoCallTask task = autoCallTaskService.getTaskMapper().selectById(taskId);
if (task.getStatus() == AutoCallTask.STATUS_MANUAL_CLOSE || task.getStatus() == AutoCallTask.STATUS_CANCELLED) {
autoCallTaskService.cancelPerson(person);
continue;
}
while (person.getUploadedTimes() < 2) {
if (AutoCallPerson.TAG_DONE.equals(person.getTag())) break;
//do upload
autoCallTaskService.step3UploadAICCTask(person);
//fetch status
int pendingDuration = 60 * 1000 * 2;
int loopGap = 1000;
boolean isComplete = false;
AutoCallPerson _person = null;
while (pendingDuration > 0) {
try {
Thread.sleep(loopGap);
} catch (InterruptedException ignore) {
Thread.currentThread().interrupt();
break;
}
pendingDuration -= loopGap;
//联通确保tag和remark同时返回
_person = autoCallTaskService.step4QueryAICCTaskResult(person);
if (_person != null && _person.getIsComplete() == 1) {
isComplete = true;
}
if (isComplete) break;
}
if (_person != null) person = _person;
if (isComplete) {
//获取到任务详情且有话术标签为已知晓,中断循环
if (AutoCallPerson.TAG_DONE.equals(person.getTag())) break;
} else {
//获取任务详情超时
if (person.getUploadedTimes() == 2) {
//已经呼叫两次了,中断循环
autoCallTaskService.markPersonDetailQueryTimeout(person);
break;
}
}
//重呼等15秒
try {
Thread.sleep(15 * 1000);
} catch (InterruptedException ignore) {
Thread.currentThread().interrupt();
break;
}
}
if (AutoCallPerson.TAG_DONE.equals(person.getTag())) {
//有话术标签为已知晓,中断后续呼叫
break;
} else {
autoCallTaskService.cancelPerson(person);
}
}
} catch (Exception e) {
log.error("处理任务时发生异常 taskId={}", taskId, e);
AutoCallTask task = autoCallTaskService.getTaskMapper().selectById(taskId);
task.setStatus(AutoCallTask.STATUS_CANCELLED);
List<AutoCallPerson> personList = autoCallTaskService.getPersonMapper().selectList(
new QueryWrapper<AutoCallPerson>()
.eq("task_id", taskId)
.isNull("__remark")
.isNull("__tag")
);
for (AutoCallPerson person : personList) {
person.setStatus(AutoCallPerson.STATUS_CANCELLED);
person.setDetailRemark("请人工处置");
autoCallTaskService.getPersonMapper().updateById(person);
}
}
}
}
}