并发处理呼叫任务

master
李一帆 2025-08-08 13:51:52 +08:00
parent 7ee22a55da
commit c809762302
8 changed files with 214 additions and 154 deletions

View File

@ -1,22 +1,26 @@
package com.whdc.component;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.whdc.mapper.AutoCallConfigMapper;
import com.whdc.mapper.AutoCallPersonMapper;
import com.whdc.model.entity.AutoCallPerson;
import com.whdc.model.entity.AutoCallTask;
import com.whdc.service.AutoCallTaskService2;
import com.whdc.utils.AutoCallHelper;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.annotation.Profile;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.annotation.PreDestroy;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
/**
* @author lyf
@ -35,10 +39,9 @@ public class AutoCallTaskScheduled {
private AutoCallConfigMapper configMapper;
@Autowired
private AutoCallHelper autoCallHelper;
@Autowired
private TaskScheduler taskScheduler;
private AtomicBoolean initialized = new AtomicBoolean(false);
private final AtomicBoolean initialized = new AtomicBoolean(false);
private final ExecutorService executorService = Executors.newCachedThreadPool();
@EventListener(ApplicationReadyEvent.class)
public void initialize() throws Exception {
@ -48,57 +51,122 @@ public class AutoCallTaskScheduled {
}
@Scheduled(cron = "*/3 * * * * ?")
public void generateLoop() {
public void generateTaskLoop() {
if (configMapper.isScheduled()) {
autoCallTaskService.step1GenerateTask();
}
}
private static final AtomicBoolean isSubmitting = new AtomicBoolean(false);
@Scheduled(cron = "*/3 * * * * ?")
public void callLoop() {
if (!initialized.get()) {
public void submitTaskLoop() {
if (!isSubmitting.compareAndSet(false, true)) {
return;
}
if (!configMapper.isScheduled()) {
return;
}
log.info("AutoCallTaskScheduled callLoop");
List<AutoCallPerson> personList = autoCallTaskService.step2GetOneUnUploadedPerson();
log.info("AutoCallTaskScheduled {}个外呼人, {}", personList.size(), personList.stream().map(AutoCallPerson::getUploadCustName).collect(Collectors.toList()));
try {
for (AutoCallPerson person : personList) {
if (person.getUploadedTimes() < 2) {
autoCallTaskService.step3UploadAICCTask(person);
} else {
autoCallTaskService.cancelPerson(person);
continue;
}
int pendingDuration = 60 * 1000 * 2;
int loopGap = 1000;
boolean success = false;
while (pendingDuration > 0) {
try {
Thread.sleep(loopGap);
} catch (InterruptedException ignore) {
Thread.currentThread().interrupt();
return;
}
pendingDuration -= loopGap;
success = autoCallTaskService.step4QueryAICCTaskResult(person);
if (success) break;
}
if (!success) {
person.setUploadedTimes(personMapper.selectById(person.getId()).getUploadedTimes());
if (person.getUploadedTimes() == 2) {
autoCallTaskService.markPersonDetailQueryTimeout(person);
}
}
if (!initialized.get()) {
return;
}
} catch (Exception e) {
log.error("AutoCallTaskScheduled callLoop error", e);
if (!configMapper.isScheduled()) {
return;
}
try {
List<AutoCallTask> tasks = autoCallTaskService.getTaskMapper().selectList(
new QueryWrapper<AutoCallTask>().eq("status", AutoCallTask.STATUS_GENERATED_AKA_READY_TO_UPLOAD).ne("submit", 1)
);
for (AutoCallTask task : tasks) {
executorService.submit(new AutoCallTaskThread(task.getId(), autoCallTaskService));
log.info("提交任务 taskId={}", task.getId());
task.setSubmit(1);
autoCallTaskService.getTaskMapper().updateById(task);
}
} catch (Exception e) {
log.error("提交任务时发生异常", e);
}
} finally {
isSubmitting.set(false);
}
}
@PreDestroy
public void destroy() {
log.info("正在关闭AutoCallTaskScheduled线程池...");
executorService.shutdown();
try {
if (!executorService.awaitTermination(60, java.util.concurrent.TimeUnit.SECONDS)) {
executorService.shutdownNow();
}
} catch (InterruptedException e) {
executorService.shutdownNow();
Thread.currentThread().interrupt();
}
log.info("AutoCallTaskScheduled线程池已关闭");
}
@AllArgsConstructor
@Slf4j
private static class AutoCallTaskThread implements Runnable {
private Integer taskId;
private AutoCallTaskService2 autoCallTaskService;
@Override
public void run() {
try {
List<AutoCallPerson> personList = autoCallTaskService.getPersonMapper().selectList(
new QueryWrapper<AutoCallPerson>().eq("task_id", taskId).orderByAsc("level")
);
if (personList == null || personList.isEmpty()) {
log.warn("任务没有责任人 taskId={}", taskId);
return;
}
for (AutoCallPerson person : personList) {
//任务可以并发了,但是人员没能前一通挂掉后再重呼
while (person.getUploadedTimes() < 2) {
if (AutoCallPerson.TAG_DONE.equals(person.getTag())) break;
//do upload
autoCallTaskService.step3UploadAICCTask(person);
//fetch status
int pendingDuration = 60 * 1000 * 2;
int loopGap = 1000;
boolean success = false;
while (pendingDuration > 0) {
try {
Thread.sleep(loopGap);
} catch (InterruptedException ignore) {
Thread.currentThread().interrupt();
break;
}
pendingDuration -= loopGap;
success = autoCallTaskService.step4QueryAICCTaskResult(person);
if (success) break;
}
AutoCallPerson _person = autoCallTaskService.getPersonMapper().selectById(person.getId());
person = _person;
if (!success) {
if (person.getUploadedTimes() == 2) {
autoCallTaskService.markPersonDetailQueryTimeout(person);
}
}
//重呼等15秒
if (AutoCallPerson.TAG_DONE.equals(person.getTag())) break;
try {
Thread.sleep(15*1000);
} catch (InterruptedException ignore) {
Thread.currentThread().interrupt();
break;
}
}
if (!AutoCallPerson.TAG_DONE.equals(person.getTag())) {
autoCallTaskService.cancelPerson(person);
}
}
} catch (Exception e) {
log.error("处理任务时发生异常 taskId={}", taskId, e);
}
}
}
}

View File

@ -55,12 +55,6 @@ public class AutoCallController {
return ResultJson.ok("resp");
}
@GetMapping("/doCallTest2")
public ResultJson<List<AutoCallPerson>> doCallTest2() throws ParseException {
List<AutoCallPerson> personList = autoCallTaskService2.doCallTest();
return ResultJson.ok(personList);
}
@GetMapping("/getToken")
public ResultJson<String> getToken() {
return ResultJson.ok(autoCallTaskService.getToken());

View File

@ -24,6 +24,7 @@ public class AutoCallPerson {
public static final int STATUS_MANUAL_CLOSE = 6;
public static final int ERRCODE_ENCODE = 1;
public static final int ERRCODE_UPLOAD_FAIL = 2;
public static final String TAG_DONE = "已知晓";
@TableId(value = "ID", type = IdType.AUTO)
private Integer id;

View File

@ -19,7 +19,7 @@ import java.util.List;
public class AutoCallTask {
public static final int STATUS_DEFAULT = 0; // 不生成
public static final int STATUS_SHOULD_GENERATE = 1;
public static final int STATUS_GENERATED = 2;
public static final int STATUS_GENERATED_AKA_READY_TO_UPLOAD = 2;
public static final int STATUS_ANY_SUCCESS = 3;
public static final int STATUS_ALL_FAIL = 4;
public static final int STATUS_CANCELLED = 5;
@ -50,6 +50,8 @@ public class AutoCallTask {
private String warnContent;
@TableField(value = "__tag")
private String tag; //话术识别:未识别,已知晓
@TableField(value = "submit")
private Integer submit; //是否已提交线程池default 0
@TableField(value = "_create_tm")
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone = "GMT+8")

View File

@ -89,7 +89,7 @@ public class AutoCallApiService {
} else {
query.in("status",
AutoCallTask.STATUS_SHOULD_GENERATE,
AutoCallTask.STATUS_GENERATED,
AutoCallTask.STATUS_GENERATED_AKA_READY_TO_UPLOAD,
AutoCallTask.STATUS_ANY_SUCCESS,
AutoCallTask.STATUS_ALL_FAIL,
AutoCallTask.STATUS_CANCELLED,
@ -224,7 +224,7 @@ public class AutoCallApiService {
} else {
query.in("status",
AutoCallTask.STATUS_SHOULD_GENERATE,
AutoCallTask.STATUS_GENERATED,
AutoCallTask.STATUS_GENERATED_AKA_READY_TO_UPLOAD,
AutoCallTask.STATUS_ANY_SUCCESS,
AutoCallTask.STATUS_ALL_FAIL,
AutoCallTask.STATUS_CANCELLED,
@ -541,7 +541,7 @@ private void addTableHeader(Table table) {
AutoCallTask.STATUS_ALL_FAIL,
// AutoCallTask.STATUS_SHOULD_GENERATE,
// AutoCallTask.STATUS_CANCELLED,
AutoCallTask.STATUS_GENERATED,
AutoCallTask.STATUS_GENERATED_AKA_READY_TO_UPLOAD,
AutoCallTask.STATUS_ANY_SUCCESS
);
//__tag is null or __tag != "已知晓"
@ -564,7 +564,7 @@ private void addTableHeader(Table table) {
AutoCallTask.STATUS_ALL_FAIL,
AutoCallTask.STATUS_SHOULD_GENERATE,
AutoCallTask.STATUS_CANCELLED,
AutoCallTask.STATUS_GENERATED
AutoCallTask.STATUS_GENERATED_AKA_READY_TO_UPLOAD
);
Page pageParam = dto.getPage().getPage();
Page<AutoCallTask> page = taskMapper.selectPage(pageParam, query);

View File

@ -40,6 +40,7 @@ public class AutoCallTaskService2 {
@Getter
private AutoCallTaskMapper taskMapper;
@Autowired
@Getter
private AutoCallPersonMapper personMapper;
@Autowired
private AutoCallConfigMapper configMapper;
@ -85,48 +86,20 @@ public class AutoCallTaskService2 {
taskMapper.insert(task);
generatePerson(task);
warn = qxWarningMapper.selectOne(
new QueryWrapper<QXWarning>()
.orderByDesc("WARNID")
.last("limit 1")
);
warn.setCtnm("荆州市");
warn.setCnnm("竹溪 ");
taskList = newTask(warn);
task = taskList.get(0);
task.setStatus(0);
task.setCreateTm(new Date());
task.setWarnCnnm("竹溪 ");
taskMapper.insert(task);
generatePerson(task);
}
public List<AutoCallPerson> doCallTest() {
List<AutoCallPerson> personList = step2GetOneUnUploadedPerson();
for (AutoCallPerson person : personList) {
step3UploadAICCTask(person);
int pendingDuration = 60 * 1000 * 2;
int loopGap = 1000;
while (true) {
try {
Thread.sleep(loopGap);
} catch (InterruptedException ignore) {
}
pendingDuration -= loopGap;
if (pendingDuration <= 0) break;
boolean f = step4QueryAICCTaskResult(person);
if (f) {
break;
}
}
if (pendingDuration <= 0) {
markPersonDetailQueryTimeout(person);
}
}
return personList;
// warn = qxWarningMapper.selectOne(
// new QueryWrapper<QXWarning>()
// .orderByDesc("WARNID")
// .last("limit 1")
// );
// warn.setCtnm("荆州市");
// warn.setCnnm("竹溪 ");
// taskList = newTask(warn);
// task = taskList.get(0);
// task.setStatus(0);
// task.setCreateTm(new Date());
// task.setWarnCnnm("竹溪 ");
// taskMapper.insert(task);
// generatePerson(task);
}
public void markPersonDetailQueryTimeout(AutoCallPerson person) {
@ -154,12 +127,11 @@ public class AutoCallTaskService2 {
task.setRemark("未找到县区");
}
task.setCreateTm(now);
if (enable && task.getWarnCnnm() != null) {
task.setStatus(AutoCallTask.STATUS_SHOULD_GENERATE);
}
taskMapper.insert(task);
if (enable && task.getWarnCnnm() != null) {
generatePerson(task);
if (generatePerson(task)) {
task.setStatus(AutoCallTask.STATUS_SHOULD_GENERATE);
}
}
}
} catch (Exception e) {
@ -209,7 +181,7 @@ public class AutoCallTaskService2 {
return task;
}
public void generatePerson(AutoCallTask task) {
public boolean generatePerson(AutoCallTask task) {
//切记要设置task的status
List<AutoCallPerson> personList = newPerson(task);
if (personList.size() == 0) {
@ -217,7 +189,7 @@ public class AutoCallTaskService2 {
task.setStatus(AutoCallTask.STATUS_CANCELLED);
task.setErrorCode(AutoCallTask.ERRCODE_NO_PERSON);
taskMapper.updateById(task);
return;
return false;
}
TransactionStatus status = transactionManager.getTransaction(new DefaultTransactionDefinition());
try {
@ -226,7 +198,7 @@ public class AutoCallTaskService2 {
a.setCreateTm(now);
personMapper.insert(a);
});
task.setStatus(AutoCallTask.STATUS_GENERATED);
task.setStatus(AutoCallTask.STATUS_GENERATED_AKA_READY_TO_UPLOAD);
taskMapper.updateById(task);
transactionManager.commit(status);
} catch (Exception e) {
@ -237,7 +209,9 @@ public class AutoCallTaskService2 {
taskMapper.updateById(task);
log.error("插入外呼责任人异常", e);
log.error("{}", JSON.toJSONString(personList));
return false;
}
return true;
}
private List<Integer> mapWarnSignalLevel(String warnSignalLevel) {
@ -342,6 +316,7 @@ public class AutoCallTaskService2 {
return personList;
}
@Deprecated
public List<AutoCallPerson> step2GetOneUnUploadedPerson() {
List<AutoCallPerson> personList = personMapper.listUnUploaded();
if (personList == null || personList.isEmpty()) return Collections.emptyList();
@ -358,14 +333,14 @@ public class AutoCallTaskService2 {
return ret;
}
private static final AtomicBoolean isCalling = new AtomicBoolean(false);
// private static final AtomicBoolean isCalling = new AtomicBoolean(false);
public void step3UploadAICCTask(AutoCallPerson person) {
//切记要设置person的status
if (!isCalling.compareAndSet(false, true)) {
return;
}
try {
// if (!isCalling.compareAndSet(false, true)) {
// return;
// }
// try {
if (person.getUploadedTimes() == 2) {
cancelPerson(person);
try {
@ -413,9 +388,10 @@ public class AutoCallTaskService2 {
personMapper.updateById(person);
smsHelper.send(numbers, person.getSmsContent());
}
} finally {
isCalling.set(false);
}
// }
// finally {
// isCalling.set(false);
// }
}
public boolean step4QueryAICCTaskResult(AutoCallPerson person) {

View File

@ -117,7 +117,7 @@ public class AICCHelper {
TypeReference<AICCCallRespWrapper<AICCCallRespTask>> type = new TypeReference<AICCCallRespWrapper<AICCCallRespTask>>() {
};
AICCCallRespWrapper<AICCCallRespTask> AICCCallRespWrapper = null;
if (!resp.contains("请求失败") || !resp.contains("网络异常")) {
if (!resp.contains("请求失败") && !resp.contains("网络异常")) {
try {
AICCCallRespWrapper = JSON.parseObject(resp, type);
} catch (Exception ex) {
@ -132,7 +132,7 @@ public class AICCHelper {
initToken();
headers.put("X-Access-Token", getToken());
resp = httpHelper.postJsonString("https://aicc.cuopen.net:9801/aicc-api/ssb/callout/thirdParty/task/uploadCallData", request.toJSONString(), headers);
if (!resp.contains("请求失败") || !resp.contains("网络异常")) {
if (!resp.contains("请求失败") && !resp.contains("网络异常")) {
try {
AICCCallRespWrapper = JSON.parseObject(resp, type);
} catch (Exception ex) {

View File

@ -24,42 +24,42 @@ public class HttpHelper {
@Autowired
private OkHttpClient okHttpClient = new OkHttpClient.Builder().build();
public @NotNull String get(@NotNull String url, @Nullable Map<String, String> params, @Nullable Map<String, String> headers) {
if (url.isEmpty()) {
return "url为空";
}
Request.Builder requestBuilder = new Request.Builder()
.url(url)
.get();
//header
if (headers != null) {
for (Map.Entry<String, String> entry : headers.entrySet()) {
String key = entry.getKey();
String value = entry.getValue();
if (key != null && value != null) {
requestBuilder.header(key, value);
}
}
}
//doGet
Request request = requestBuilder.build();
return doGet(request);
}
private @NotNull String doGet(Request request) {
try (Response response = okHttpClient.newCall(request).execute()) {
if (response.isSuccessful() && response.body() != null) {
return response.body().string();
} else {
return "请求失败:" + response;
}
} catch (IOException e) {
return "网络异常:" + e.getMessage();
}
}
// public @NotNull String get(@NotNull String url, @Nullable Map<String, String> params, @Nullable Map<String, String> headers) {
// if (url.isEmpty()) {
// return "url为空";
// }
//
// Request.Builder requestBuilder = new Request.Builder()
// .url(url)
// .get();
//
// //header
// if (headers != null) {
// for (Map.Entry<String, String> entry : headers.entrySet()) {
// String key = entry.getKey();
// String value = entry.getValue();
// if (key != null && value != null) {
// requestBuilder.header(key, value);
// }
// }
// }
// //doGet
// Request request = requestBuilder.build();
//
// return doGet(request);
// }
//
// private @NotNull String doGet(Request request) {
// try (Response response = okHttpClient.newCall(request).execute()) {
// if (response.isSuccessful() && response.body() != null) {
// return response.body().string();
// } else {
// return "请求失败:" + response;
// }
// } catch (IOException e) {
// return "网络异常:" + e.getMessage();
// }
// }
public @NotNull String postJsonString(@NotNull String url, @Nullable String json, @Nullable Map<String, String> headers) {
return postJsonString(url, json, headers, StandardCharsets.UTF_8);
@ -94,7 +94,13 @@ public class HttpHelper {
}
Request request = requestBuilder.build();
//post
return doPost(request);
String resp = doPost(request);
if (resp.contains("请求失败") || resp.contains("网络异常")) {
log.warn("请求失败request{}", request);
log.warn("请求失败request header{}", headers);
log.warn("请求失败request payload{}", json);
}
return resp;
}
public @NotNull String postFormData(@NotNull String url, @Nullable Map<String, Object> params, @Nullable Map<String, String> headers) {
@ -148,7 +154,13 @@ public class HttpHelper {
//post
Request request = requestBuilder.build();
return doPost(request);
String resp = doPost(request);
if (resp.contains("请求失败") || resp.contains("网络异常")) {
log.warn("请求失败request{}", request);
log.warn("请求失败request header{}", headers);
log.warn("请求失败request payload{}", params);
}
return resp;
}
public @NotNull String postFormUrlEncoded(@NotNull String url, @Nullable Map<String, Object> params, @Nullable Map<String, String> headers) {
@ -198,7 +210,13 @@ public class HttpHelper {
//post
Request request = requestBuilder.build();
return doPost(request);
String resp = doPost(request);
if (resp.contains("请求失败") || resp.contains("网络异常")) {
log.warn("请求失败request{}", request);
log.warn("请求失败request header{}", headers);
log.warn("请求失败request payload{}", params);
}
return resp;
}
private @NotNull String doPost(Request request) {
@ -206,6 +224,7 @@ public class HttpHelper {
if (response.isSuccessful() && response.body() != null) {
return response.body().string();
} else {
log.warn("请求失败response{}", response);
return "请求失败:" + response;
}
} catch (IOException e) {