From c809762302e28e3d76fef15cf2579214091d75d3 Mon Sep 17 00:00:00 2001 From: lyf66 Date: Fri, 8 Aug 2025 13:51:52 +0800 Subject: [PATCH] =?UTF-8?q?=E5=B9=B6=E5=8F=91=E5=A4=84=E7=90=86=E5=91=BC?= =?UTF-8?q?=E5=8F=AB=E4=BB=BB=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../whdc/component/AutoCallTaskScheduled.java | 158 +++++++++++++----- .../whdc/controller/AutoCallController.java | 6 - .../com/whdc/model/entity/AutoCallPerson.java | 1 + .../com/whdc/model/entity/AutoCallTask.java | 4 +- .../com/whdc/service/AutoCallApiService.java | 8 +- .../whdc/service/AutoCallTaskService2.java | 90 ++++------ src/main/java/com/whdc/utils/AICCHelper.java | 4 +- src/main/java/com/whdc/utils/HttpHelper.java | 97 ++++++----- 8 files changed, 214 insertions(+), 154 deletions(-) diff --git a/src/main/java/com/whdc/component/AutoCallTaskScheduled.java b/src/main/java/com/whdc/component/AutoCallTaskScheduled.java index b8dd76a..f8395b9 100644 --- a/src/main/java/com/whdc/component/AutoCallTaskScheduled.java +++ b/src/main/java/com/whdc/component/AutoCallTaskScheduled.java @@ -1,22 +1,26 @@ package com.whdc.component; +import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.whdc.mapper.AutoCallConfigMapper; import com.whdc.mapper.AutoCallPersonMapper; import com.whdc.model.entity.AutoCallPerson; +import com.whdc.model.entity.AutoCallTask; import com.whdc.service.AutoCallTaskService2; import com.whdc.utils.AutoCallHelper; +import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.context.event.ApplicationReadyEvent; import org.springframework.context.annotation.Profile; import org.springframework.context.event.EventListener; -import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; +import javax.annotation.PreDestroy; import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.stream.Collectors; /** * @author lyf @@ -35,10 +39,9 @@ public class AutoCallTaskScheduled { private AutoCallConfigMapper configMapper; @Autowired private AutoCallHelper autoCallHelper; - @Autowired - private TaskScheduler taskScheduler; - private AtomicBoolean initialized = new AtomicBoolean(false); + private final AtomicBoolean initialized = new AtomicBoolean(false); + private final ExecutorService executorService = Executors.newCachedThreadPool(); @EventListener(ApplicationReadyEvent.class) public void initialize() throws Exception { @@ -48,57 +51,122 @@ public class AutoCallTaskScheduled { } @Scheduled(cron = "*/3 * * * * ?") - public void generateLoop() { + public void generateTaskLoop() { if (configMapper.isScheduled()) { autoCallTaskService.step1GenerateTask(); } } + private static final AtomicBoolean isSubmitting = new AtomicBoolean(false); + @Scheduled(cron = "*/3 * * * * ?") - public void callLoop() { - if (!initialized.get()) { + public void submitTaskLoop() { + if (!isSubmitting.compareAndSet(false, true)) { return; } - if (!configMapper.isScheduled()) { - return; - } - log.info("AutoCallTaskScheduled callLoop"); - List personList = autoCallTaskService.step2GetOneUnUploadedPerson(); - log.info("AutoCallTaskScheduled {}个外呼人, {}", personList.size(), personList.stream().map(AutoCallPerson::getUploadCustName).collect(Collectors.toList())); try { - for (AutoCallPerson person : personList) { - if (person.getUploadedTimes() < 2) { - autoCallTaskService.step3UploadAICCTask(person); - } else { - autoCallTaskService.cancelPerson(person); - continue; - } - int pendingDuration = 60 * 1000 * 2; - int loopGap = 1000; - boolean success = false; - while (pendingDuration > 0) { - try { - Thread.sleep(loopGap); - } catch (InterruptedException ignore) { - Thread.currentThread().interrupt(); - return; - } - pendingDuration -= loopGap; - - success = autoCallTaskService.step4QueryAICCTaskResult(person); - if (success) break; - } - - if (!success) { - person.setUploadedTimes(personMapper.selectById(person.getId()).getUploadedTimes()); - if (person.getUploadedTimes() == 2) { - autoCallTaskService.markPersonDetailQueryTimeout(person); - } - } + if (!initialized.get()) { + return; } - } catch (Exception e) { - log.error("AutoCallTaskScheduled callLoop error", e); + if (!configMapper.isScheduled()) { + return; + } + + try { + List tasks = autoCallTaskService.getTaskMapper().selectList( + new QueryWrapper().eq("status", AutoCallTask.STATUS_GENERATED_AKA_READY_TO_UPLOAD).ne("submit", 1) + ); + for (AutoCallTask task : tasks) { + executorService.submit(new AutoCallTaskThread(task.getId(), autoCallTaskService)); + log.info("提交任务 taskId={}", task.getId()); + task.setSubmit(1); + autoCallTaskService.getTaskMapper().updateById(task); + } + } catch (Exception e) { + log.error("提交任务时发生异常", e); + } + } finally { + isSubmitting.set(false); } } + @PreDestroy + public void destroy() { + log.info("正在关闭AutoCallTaskScheduled线程池..."); + executorService.shutdown(); + try { + if (!executorService.awaitTermination(60, java.util.concurrent.TimeUnit.SECONDS)) { + executorService.shutdownNow(); + } + } catch (InterruptedException e) { + executorService.shutdownNow(); + Thread.currentThread().interrupt(); + } + log.info("AutoCallTaskScheduled线程池已关闭"); + } + + @AllArgsConstructor + @Slf4j + private static class AutoCallTaskThread implements Runnable { + private Integer taskId; + private AutoCallTaskService2 autoCallTaskService; + + @Override + public void run() { + try { + List personList = autoCallTaskService.getPersonMapper().selectList( + new QueryWrapper().eq("task_id", taskId).orderByAsc("level") + ); + if (personList == null || personList.isEmpty()) { + log.warn("任务没有责任人 taskId={}", taskId); + return; + } + for (AutoCallPerson person : personList) { + //任务可以并发了,但是人员没能前一通挂掉后再重呼 + while (person.getUploadedTimes() < 2) { + if (AutoCallPerson.TAG_DONE.equals(person.getTag())) break; + //do upload + autoCallTaskService.step3UploadAICCTask(person); + + //fetch status + int pendingDuration = 60 * 1000 * 2; + int loopGap = 1000; + boolean success = false; + while (pendingDuration > 0) { + try { + Thread.sleep(loopGap); + } catch (InterruptedException ignore) { + Thread.currentThread().interrupt(); + break; + } + pendingDuration -= loopGap; + + success = autoCallTaskService.step4QueryAICCTaskResult(person); + if (success) break; + } + AutoCallPerson _person = autoCallTaskService.getPersonMapper().selectById(person.getId()); + person = _person; + if (!success) { + if (person.getUploadedTimes() == 2) { + autoCallTaskService.markPersonDetailQueryTimeout(person); + } + } + //重呼等15秒 + if (AutoCallPerson.TAG_DONE.equals(person.getTag())) break; + try { + Thread.sleep(15*1000); + } catch (InterruptedException ignore) { + Thread.currentThread().interrupt(); + break; + } + } + if (!AutoCallPerson.TAG_DONE.equals(person.getTag())) { + autoCallTaskService.cancelPerson(person); + } + } + } catch (Exception e) { + log.error("处理任务时发生异常 taskId={}", taskId, e); + } + } + } } diff --git a/src/main/java/com/whdc/controller/AutoCallController.java b/src/main/java/com/whdc/controller/AutoCallController.java index a8a330c..42ef0db 100644 --- a/src/main/java/com/whdc/controller/AutoCallController.java +++ b/src/main/java/com/whdc/controller/AutoCallController.java @@ -55,12 +55,6 @@ public class AutoCallController { return ResultJson.ok("resp"); } - @GetMapping("/doCallTest2") - public ResultJson> doCallTest2() throws ParseException { - List personList = autoCallTaskService2.doCallTest(); - return ResultJson.ok(personList); - } - @GetMapping("/getToken") public ResultJson getToken() { return ResultJson.ok(autoCallTaskService.getToken()); diff --git a/src/main/java/com/whdc/model/entity/AutoCallPerson.java b/src/main/java/com/whdc/model/entity/AutoCallPerson.java index 9708dd5..a084395 100644 --- a/src/main/java/com/whdc/model/entity/AutoCallPerson.java +++ b/src/main/java/com/whdc/model/entity/AutoCallPerson.java @@ -24,6 +24,7 @@ public class AutoCallPerson { public static final int STATUS_MANUAL_CLOSE = 6; public static final int ERRCODE_ENCODE = 1; public static final int ERRCODE_UPLOAD_FAIL = 2; + public static final String TAG_DONE = "已知晓"; @TableId(value = "ID", type = IdType.AUTO) private Integer id; diff --git a/src/main/java/com/whdc/model/entity/AutoCallTask.java b/src/main/java/com/whdc/model/entity/AutoCallTask.java index 1b2a434..f78daf1 100644 --- a/src/main/java/com/whdc/model/entity/AutoCallTask.java +++ b/src/main/java/com/whdc/model/entity/AutoCallTask.java @@ -19,7 +19,7 @@ import java.util.List; public class AutoCallTask { public static final int STATUS_DEFAULT = 0; // 不生成 public static final int STATUS_SHOULD_GENERATE = 1; - public static final int STATUS_GENERATED = 2; + public static final int STATUS_GENERATED_AKA_READY_TO_UPLOAD = 2; public static final int STATUS_ANY_SUCCESS = 3; public static final int STATUS_ALL_FAIL = 4; public static final int STATUS_CANCELLED = 5; @@ -50,6 +50,8 @@ public class AutoCallTask { private String warnContent; @TableField(value = "__tag") private String tag; //话术识别:未识别,已知晓 + @TableField(value = "submit") + private Integer submit; //是否已提交线程池,default 0 @TableField(value = "_create_tm") @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone = "GMT+8") diff --git a/src/main/java/com/whdc/service/AutoCallApiService.java b/src/main/java/com/whdc/service/AutoCallApiService.java index 2e2e4b0..a0a182c 100644 --- a/src/main/java/com/whdc/service/AutoCallApiService.java +++ b/src/main/java/com/whdc/service/AutoCallApiService.java @@ -89,7 +89,7 @@ public class AutoCallApiService { } else { query.in("status", AutoCallTask.STATUS_SHOULD_GENERATE, - AutoCallTask.STATUS_GENERATED, + AutoCallTask.STATUS_GENERATED_AKA_READY_TO_UPLOAD, AutoCallTask.STATUS_ANY_SUCCESS, AutoCallTask.STATUS_ALL_FAIL, AutoCallTask.STATUS_CANCELLED, @@ -224,7 +224,7 @@ public class AutoCallApiService { } else { query.in("status", AutoCallTask.STATUS_SHOULD_GENERATE, - AutoCallTask.STATUS_GENERATED, + AutoCallTask.STATUS_GENERATED_AKA_READY_TO_UPLOAD, AutoCallTask.STATUS_ANY_SUCCESS, AutoCallTask.STATUS_ALL_FAIL, AutoCallTask.STATUS_CANCELLED, @@ -541,7 +541,7 @@ private void addTableHeader(Table table) { AutoCallTask.STATUS_ALL_FAIL, // AutoCallTask.STATUS_SHOULD_GENERATE, // AutoCallTask.STATUS_CANCELLED, - AutoCallTask.STATUS_GENERATED, + AutoCallTask.STATUS_GENERATED_AKA_READY_TO_UPLOAD, AutoCallTask.STATUS_ANY_SUCCESS ); //__tag is null or __tag != "已知晓" @@ -564,7 +564,7 @@ private void addTableHeader(Table table) { AutoCallTask.STATUS_ALL_FAIL, AutoCallTask.STATUS_SHOULD_GENERATE, AutoCallTask.STATUS_CANCELLED, - AutoCallTask.STATUS_GENERATED + AutoCallTask.STATUS_GENERATED_AKA_READY_TO_UPLOAD ); Page pageParam = dto.getPage().getPage(); Page page = taskMapper.selectPage(pageParam, query); diff --git a/src/main/java/com/whdc/service/AutoCallTaskService2.java b/src/main/java/com/whdc/service/AutoCallTaskService2.java index 24c2068..a993ac0 100644 --- a/src/main/java/com/whdc/service/AutoCallTaskService2.java +++ b/src/main/java/com/whdc/service/AutoCallTaskService2.java @@ -40,6 +40,7 @@ public class AutoCallTaskService2 { @Getter private AutoCallTaskMapper taskMapper; @Autowired + @Getter private AutoCallPersonMapper personMapper; @Autowired private AutoCallConfigMapper configMapper; @@ -85,48 +86,20 @@ public class AutoCallTaskService2 { taskMapper.insert(task); generatePerson(task); - warn = qxWarningMapper.selectOne( - new QueryWrapper() - .orderByDesc("WARNID") - .last("limit 1") - ); - warn.setCtnm("荆州市"); - warn.setCnnm("竹溪 "); - taskList = newTask(warn); - task = taskList.get(0); - task.setStatus(0); - task.setCreateTm(new Date()); - task.setWarnCnnm("竹溪 "); - taskMapper.insert(task); - generatePerson(task); - } - - public List doCallTest() { - List personList = step2GetOneUnUploadedPerson(); - - for (AutoCallPerson person : personList) { - step3UploadAICCTask(person); - - int pendingDuration = 60 * 1000 * 2; - int loopGap = 1000; - while (true) { - try { - Thread.sleep(loopGap); - } catch (InterruptedException ignore) { - } - pendingDuration -= loopGap; - if (pendingDuration <= 0) break; - - boolean f = step4QueryAICCTaskResult(person); - if (f) { - break; - } - } - if (pendingDuration <= 0) { - markPersonDetailQueryTimeout(person); - } - } - return personList; +// warn = qxWarningMapper.selectOne( +// new QueryWrapper() +// .orderByDesc("WARNID") +// .last("limit 1") +// ); +// warn.setCtnm("荆州市"); +// warn.setCnnm("竹溪 "); +// taskList = newTask(warn); +// task = taskList.get(0); +// task.setStatus(0); +// task.setCreateTm(new Date()); +// task.setWarnCnnm("竹溪 "); +// taskMapper.insert(task); +// generatePerson(task); } public void markPersonDetailQueryTimeout(AutoCallPerson person) { @@ -154,12 +127,11 @@ public class AutoCallTaskService2 { task.setRemark("未找到县区"); } task.setCreateTm(now); - if (enable && task.getWarnCnnm() != null) { - task.setStatus(AutoCallTask.STATUS_SHOULD_GENERATE); - } taskMapper.insert(task); if (enable && task.getWarnCnnm() != null) { - generatePerson(task); + if (generatePerson(task)) { + task.setStatus(AutoCallTask.STATUS_SHOULD_GENERATE); + } } } } catch (Exception e) { @@ -209,7 +181,7 @@ public class AutoCallTaskService2 { return task; } - public void generatePerson(AutoCallTask task) { + public boolean generatePerson(AutoCallTask task) { //切记要设置task的status List personList = newPerson(task); if (personList.size() == 0) { @@ -217,7 +189,7 @@ public class AutoCallTaskService2 { task.setStatus(AutoCallTask.STATUS_CANCELLED); task.setErrorCode(AutoCallTask.ERRCODE_NO_PERSON); taskMapper.updateById(task); - return; + return false; } TransactionStatus status = transactionManager.getTransaction(new DefaultTransactionDefinition()); try { @@ -226,7 +198,7 @@ public class AutoCallTaskService2 { a.setCreateTm(now); personMapper.insert(a); }); - task.setStatus(AutoCallTask.STATUS_GENERATED); + task.setStatus(AutoCallTask.STATUS_GENERATED_AKA_READY_TO_UPLOAD); taskMapper.updateById(task); transactionManager.commit(status); } catch (Exception e) { @@ -237,7 +209,9 @@ public class AutoCallTaskService2 { taskMapper.updateById(task); log.error("插入外呼责任人异常", e); log.error("{}", JSON.toJSONString(personList)); + return false; } + return true; } private List mapWarnSignalLevel(String warnSignalLevel) { @@ -342,6 +316,7 @@ public class AutoCallTaskService2 { return personList; } + @Deprecated public List step2GetOneUnUploadedPerson() { List personList = personMapper.listUnUploaded(); if (personList == null || personList.isEmpty()) return Collections.emptyList(); @@ -358,14 +333,14 @@ public class AutoCallTaskService2 { return ret; } - private static final AtomicBoolean isCalling = new AtomicBoolean(false); +// private static final AtomicBoolean isCalling = new AtomicBoolean(false); public void step3UploadAICCTask(AutoCallPerson person) { //切记要设置person的status - if (!isCalling.compareAndSet(false, true)) { - return; - } - try { +// if (!isCalling.compareAndSet(false, true)) { +// return; +// } +// try { if (person.getUploadedTimes() == 2) { cancelPerson(person); try { @@ -413,9 +388,10 @@ public class AutoCallTaskService2 { personMapper.updateById(person); smsHelper.send(numbers, person.getSmsContent()); } - } finally { - isCalling.set(false); - } +// } +// finally { +// isCalling.set(false); +// } } public boolean step4QueryAICCTaskResult(AutoCallPerson person) { diff --git a/src/main/java/com/whdc/utils/AICCHelper.java b/src/main/java/com/whdc/utils/AICCHelper.java index 86bd782..a110c02 100644 --- a/src/main/java/com/whdc/utils/AICCHelper.java +++ b/src/main/java/com/whdc/utils/AICCHelper.java @@ -117,7 +117,7 @@ public class AICCHelper { TypeReference> type = new TypeReference>() { }; AICCCallRespWrapper AICCCallRespWrapper = null; - if (!resp.contains("请求失败") || !resp.contains("网络异常")) { + if (!resp.contains("请求失败") && !resp.contains("网络异常")) { try { AICCCallRespWrapper = JSON.parseObject(resp, type); } catch (Exception ex) { @@ -132,7 +132,7 @@ public class AICCHelper { initToken(); headers.put("X-Access-Token", getToken()); resp = httpHelper.postJsonString("https://aicc.cuopen.net:9801/aicc-api/ssb/callout/thirdParty/task/uploadCallData", request.toJSONString(), headers); - if (!resp.contains("请求失败") || !resp.contains("网络异常")) { + if (!resp.contains("请求失败") && !resp.contains("网络异常")) { try { AICCCallRespWrapper = JSON.parseObject(resp, type); } catch (Exception ex) { diff --git a/src/main/java/com/whdc/utils/HttpHelper.java b/src/main/java/com/whdc/utils/HttpHelper.java index 0c1b919..fe8cd94 100644 --- a/src/main/java/com/whdc/utils/HttpHelper.java +++ b/src/main/java/com/whdc/utils/HttpHelper.java @@ -24,42 +24,42 @@ public class HttpHelper { @Autowired private OkHttpClient okHttpClient = new OkHttpClient.Builder().build(); - public @NotNull String get(@NotNull String url, @Nullable Map params, @Nullable Map headers) { - if (url.isEmpty()) { - return "url为空"; - } - - Request.Builder requestBuilder = new Request.Builder() - .url(url) - .get(); - - //header - if (headers != null) { - for (Map.Entry entry : headers.entrySet()) { - String key = entry.getKey(); - String value = entry.getValue(); - if (key != null && value != null) { - requestBuilder.header(key, value); - } - } - } - //doGet - Request request = requestBuilder.build(); - - return doGet(request); - } - - private @NotNull String doGet(Request request) { - try (Response response = okHttpClient.newCall(request).execute()) { - if (response.isSuccessful() && response.body() != null) { - return response.body().string(); - } else { - return "请求失败:" + response; - } - } catch (IOException e) { - return "网络异常:" + e.getMessage(); - } - } +// public @NotNull String get(@NotNull String url, @Nullable Map params, @Nullable Map headers) { +// if (url.isEmpty()) { +// return "url为空"; +// } +// +// Request.Builder requestBuilder = new Request.Builder() +// .url(url) +// .get(); +// +// //header +// if (headers != null) { +// for (Map.Entry entry : headers.entrySet()) { +// String key = entry.getKey(); +// String value = entry.getValue(); +// if (key != null && value != null) { +// requestBuilder.header(key, value); +// } +// } +// } +// //doGet +// Request request = requestBuilder.build(); +// +// return doGet(request); +// } +// +// private @NotNull String doGet(Request request) { +// try (Response response = okHttpClient.newCall(request).execute()) { +// if (response.isSuccessful() && response.body() != null) { +// return response.body().string(); +// } else { +// return "请求失败:" + response; +// } +// } catch (IOException e) { +// return "网络异常:" + e.getMessage(); +// } +// } public @NotNull String postJsonString(@NotNull String url, @Nullable String json, @Nullable Map headers) { return postJsonString(url, json, headers, StandardCharsets.UTF_8); @@ -94,7 +94,13 @@ public class HttpHelper { } Request request = requestBuilder.build(); //post - return doPost(request); + String resp = doPost(request); + if (resp.contains("请求失败") || resp.contains("网络异常")) { + log.warn("请求失败request:{}", request); + log.warn("请求失败request header:{}", headers); + log.warn("请求失败request payload:{}", json); + } + return resp; } public @NotNull String postFormData(@NotNull String url, @Nullable Map params, @Nullable Map headers) { @@ -148,7 +154,13 @@ public class HttpHelper { //post Request request = requestBuilder.build(); - return doPost(request); + String resp = doPost(request); + if (resp.contains("请求失败") || resp.contains("网络异常")) { + log.warn("请求失败request:{}", request); + log.warn("请求失败request header:{}", headers); + log.warn("请求失败request payload:{}", params); + } + return resp; } public @NotNull String postFormUrlEncoded(@NotNull String url, @Nullable Map params, @Nullable Map headers) { @@ -198,7 +210,13 @@ public class HttpHelper { //post Request request = requestBuilder.build(); - return doPost(request); + String resp = doPost(request); + if (resp.contains("请求失败") || resp.contains("网络异常")) { + log.warn("请求失败request:{}", request); + log.warn("请求失败request header:{}", headers); + log.warn("请求失败request payload:{}", params); + } + return resp; } private @NotNull String doPost(Request request) { @@ -206,6 +224,7 @@ public class HttpHelper { if (response.isSuccessful() && response.body() != null) { return response.body().string(); } else { + log.warn("请求失败response:{}", response); return "请求失败:" + response; } } catch (IOException e) {