From 83081a269b08ce864f9c7af1c08655120d9f8df8 Mon Sep 17 00:00:00 2001 From: xjm Date: Thu, 13 Jun 2024 16:32:32 +0800 Subject: [PATCH] =?UTF-8?q?=E6=AD=A3=E5=9C=A8=E7=94=9F=E6=95=88=E7=9A=84?= =?UTF-8?q?=E9=A2=84=E8=AD=A6=E4=BF=A1=E6=81=AF=E4=BF=AE=E6=94=B9=20?= =?UTF-8?q?=E9=A2=84=E8=AD=A6=E6=B6=88=E6=81=AF=E5=AD=98=E5=BA=93=E4=BF=AE?= =?UTF-8?q?=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/whdc/component/MyPostConstruct.java | 117 ++++++++++++++++++ .../java/com/whdc/config/ThreadPools.java | 10 ++ .../com/whdc/controller/QXWarnController.java | 19 ++- 3 files changed, 143 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/whdc/component/MyPostConstruct.java b/src/main/java/com/whdc/component/MyPostConstruct.java index 76406c5..029ed27 100644 --- a/src/main/java/com/whdc/component/MyPostConstruct.java +++ b/src/main/java/com/whdc/component/MyPostConstruct.java @@ -1,9 +1,30 @@ package com.whdc.component; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.whdc.model.dto.ApiDto; +import com.whdc.model.entity.QXWarning; +import com.whdc.model.vo.QXWarningVO; +import com.whdc.model.vo.WarningData; +import com.whdc.service.IQXWarningService; +import com.whdc.utils.DateUtils; +import com.whdc.utils.HttpUtil; +import io.swagger.annotations.ApiOperation; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.compress.utils.Lists; +import org.apache.http.HttpStatus; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Async; +import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; +import org.springframework.web.bind.annotation.GetMapping; import javax.annotation.PostConstruct; +import java.util.Comparator; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; /** * @author 李赛 @@ -17,4 +38,100 @@ public class MyPostConstruct { public void initCache() { log.debug("加载缓存"); } + + @Autowired + private IQXWarningService service; + + + @Async + @ApiOperation(value = "预警数据同步接口", notes = "预警数据同步接口") + @GetMapping("/syncData") + @Scheduled(cron ="0 0/1 * * * ?") + public void syncData() { + log.info("预警数据同步开始!!!"); + ApiDto apiDto = new ApiDto(); + apiDto.setFilter(Lists.newArrayList()); + + String str = HttpUtil.sendPost("http://223.75.53.141:8000/shzh/met/zyqxfw/api/warning/getGroupWarning", JSON.toJSONString(apiDto)); +// String str = HttpUtil.sendPost("http://127.0.0.1:20000/shzh/met/zyqxfw/api/warning/getGroupWarning", JSON.toJSONString(apiDto)); + JSONObject json = JSON.parseObject(str); + if (json != null && json.getInteger("code") == HttpStatus.SC_OK) { + List data = json.getJSONArray("data").toJavaList(WarningData.class); + List warningList = getList(data); + log.info("预警数据同步获取数据条数 " + warningList.size()); + List warnIds = warningList.stream().map(QXWarningVO::getWarnid).collect(Collectors.toList()); + + List list = service.lambdaQuery().in(QXWarning::getWarnid, warnIds).list(); + log.info("预警数据同步已存预警 " + list.size()); + + Set dbWarnSet = list.stream().map(QXWarning::getWarnid).collect(Collectors.toSet()); + + List adds = Lists.newArrayList(); + for (QXWarningVO warningVO : warningList) { + if (!dbWarnSet.contains(warningVO.getWarnid())) { + QXWarning qxwarning = new QXWarning(); + qxwarning.setCreateTime(DateUtils.dateToStrYMDHds(warningVO.getCreateTime())); + qxwarning.setPublishTime(DateUtils.dateToStrYMDHds(warningVO.getPublishTime())); + qxwarning.setStartTime(DateUtils.dateToStrYMDHds(warningVO.getStartTime())); + qxwarning.setEndTime(DateUtils.dateToStrYMDHds(warningVO.getEndTime())); + qxwarning.setWarnSignalType(warningVO.getWarnSignalType()); + qxwarning.setWarnSignalLevel(warningVO.getWarnSignalLevel()); + qxwarning.setPublishUnit(warningVO.getPublishUnit()); + qxwarning.setContent(warningVO.getContent()); + qxwarning.setWarnid(warningVO.getWarnid()); + qxwarning.setCtnm(warningVO.getCtnm()); + qxwarning.setCnnm(warningVO.getCnnm()); + adds.add(qxwarning); + + } + } + + if (CollectionUtils.isNotEmpty(adds)) { + log.info("预警数据同步待添加预警 " + adds.size()); + if (this.service.saveBatch(adds)) { + log.info("添加成功 " + adds.size()); + } + } + } + + log.info("预警数据同步执行完成!!!"); + + } + private List getList(List data) { + + List respList = Lists.newArrayList(); + for (WarningData datum : data) {//最外层的列表 + String ctnm = datum.getEffectArea();//市级范围 + for (WarningData.TypeList typeList : datum.getTypeList()) {//里面的 + // 预警类型 + String type = typeList.getType(); + List warnList = typeList.getWarnList(); + + for (WarningData.Warning warning : warnList) { + String cnnm = warning.getEffectArea(); + QXWarningVO vo = new QXWarningVO(); + String publishUnit = warning.getPublishUnit(); +// vo.setTitle(publishUnit + "发布" + type + "预警"); + vo.setCtnm(ctnm);//市级名称 + vo.setCnnm(cnnm);//县级名称 + vo.setPublishUnit(publishUnit);//发布单位 + vo.setPublishTime(warning.getPublishTime());//预警发布时间 + vo.setWarnSignalType(warning.getWarnSignalType());//预警类型 + vo.setWarnSignalLevel(warning.getWarnSignalLevel());//预警级别 + vo.setContent(warning.getContent());//预警内容 + vo.setWarnid(warning.getId()); + vo.setCreateTime(warning.getCreateTime()); + vo.setStartTime(warning.getStartTime()); + vo.setEndTime(warning.getEndTime()); + respList.add(vo); + } + } + } + + respList = respList.stream().filter(o -> "暴雨".equals(o.getWarnSignalType()) || "雷雨大风".equals(o.getWarnSignalType())).collect(Collectors.toList()); + respList = respList.stream().filter(o -> "红色".equals(o.getWarnSignalLevel()) || "橙色".equals(o.getWarnSignalLevel())).collect(Collectors.toList()); + return respList.stream().sorted(Comparator.comparing(QXWarningVO::getPublishTime).reversed()) + .collect(Collectors.toList()); + } + } diff --git a/src/main/java/com/whdc/config/ThreadPools.java b/src/main/java/com/whdc/config/ThreadPools.java index e020ae0..017019d 100644 --- a/src/main/java/com/whdc/config/ThreadPools.java +++ b/src/main/java/com/whdc/config/ThreadPools.java @@ -3,7 +3,9 @@ package com.whdc.config; import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.annotation.AsyncConfigurer; +import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -25,6 +27,14 @@ public class ThreadPools implements AsyncConfigurer { .setNameFormat("fixed -- %d").build()); } + @Bean + public TaskScheduler taskScheduler() { + ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler(); + taskScheduler.setPoolSize(10);//不配置默认是1 + return taskScheduler; + } + + // public static Void handleException(Throwable ex) { // log.error("请求出现异常:" + ex.getMessage(), ex); // return null; diff --git a/src/main/java/com/whdc/controller/QXWarnController.java b/src/main/java/com/whdc/controller/QXWarnController.java index 5ee6626..aa70944 100644 --- a/src/main/java/com/whdc/controller/QXWarnController.java +++ b/src/main/java/com/whdc/controller/QXWarnController.java @@ -24,6 +24,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.http.HttpStatus; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cache.annotation.Cacheable; +import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.validation.annotation.Validated; import org.springframework.web.bind.annotation.*; @@ -69,19 +70,27 @@ public class QXWarnController { return ResultJson.ok(save); } - @Scheduled(cron = "0/5 * * * * ?") +// @Async + @ApiOperation(value = "预警数据同步接口", notes = "预警数据同步接口") + @GetMapping("/syncData") +// @Scheduled(cron ="0 0/5 * * * ?") public void syncData() { - + log.info("预警数据同步开始!!!"); ApiDto apiDto = new ApiDto(); apiDto.setFilter(Lists.newArrayList()); - String str = HttpUtil.sendPost("http://127.0.0.1:20000/shzh/met/zyqxfw/api/warning/getGroupWarning", JSON.toJSONString(apiDto)); + String str = HttpUtil.sendPost("http://223.75.53.141:8000/shzh/met/zyqxfw/api/warning/getGroupWarning", JSON.toJSONString(apiDto)); +// String str = HttpUtil.sendPost("http://127.0.0.1:20000/shzh/met/zyqxfw/api/warning/getGroupWarning", JSON.toJSONString(apiDto)); JSONObject json = JSON.parseObject(str); if (json != null && json.getInteger("code") == HttpStatus.SC_OK) { List data = json.getJSONArray("data").toJavaList(WarningData.class); List warningList = getList(data, null, null, null); + log.info("预警数据同步获取数据条数 " + warningList.size()); List warnIds = warningList.stream().map(QXWarningVO::getWarnid).collect(Collectors.toList()); + List list = service.lambdaQuery().in(QXWarning::getWarnid, warnIds).list(); + log.info("预警数据同步已存预警 " + list.size()); + Set dbWarnSet = list.stream().map(QXWarning::getWarnid).collect(Collectors.toSet()); List adds = Lists.newArrayList(); @@ -105,11 +114,15 @@ public class QXWarnController { } if (CollectionUtils.isNotEmpty(adds)) { + log.info("预警数据同步待添加预警 " + adds.size()); if (this.service.saveBatch(adds)) { log.info("添加成功 " + adds.size()); } } } + + log.info("预警数据同步执行完成!!!"); + } /**