正在生效的预警信息修改 预警消息存库修改
parent
09f410c0c6
commit
83081a269b
|
|
@ -1,9 +1,30 @@
|
||||||
package com.whdc.component;
|
package com.whdc.component;
|
||||||
|
|
||||||
|
import com.alibaba.fastjson.JSON;
|
||||||
|
import com.alibaba.fastjson.JSONObject;
|
||||||
|
import com.whdc.model.dto.ApiDto;
|
||||||
|
import com.whdc.model.entity.QXWarning;
|
||||||
|
import com.whdc.model.vo.QXWarningVO;
|
||||||
|
import com.whdc.model.vo.WarningData;
|
||||||
|
import com.whdc.service.IQXWarningService;
|
||||||
|
import com.whdc.utils.DateUtils;
|
||||||
|
import com.whdc.utils.HttpUtil;
|
||||||
|
import io.swagger.annotations.ApiOperation;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.apache.commons.collections.CollectionUtils;
|
||||||
|
import org.apache.commons.compress.utils.Lists;
|
||||||
|
import org.apache.http.HttpStatus;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.scheduling.annotation.Async;
|
||||||
|
import org.springframework.scheduling.annotation.Scheduled;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
import org.springframework.web.bind.annotation.GetMapping;
|
||||||
|
|
||||||
import javax.annotation.PostConstruct;
|
import javax.annotation.PostConstruct;
|
||||||
|
import java.util.Comparator;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author 李赛
|
* @author 李赛
|
||||||
|
|
@ -17,4 +38,100 @@ public class MyPostConstruct {
|
||||||
public void initCache() {
|
public void initCache() {
|
||||||
log.debug("加载缓存");
|
log.debug("加载缓存");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private IQXWarningService service;
|
||||||
|
|
||||||
|
|
||||||
|
@Async
|
||||||
|
@ApiOperation(value = "预警数据同步接口", notes = "预警数据同步接口")
|
||||||
|
@GetMapping("/syncData")
|
||||||
|
@Scheduled(cron ="0 0/1 * * * ?")
|
||||||
|
public void syncData() {
|
||||||
|
log.info("预警数据同步开始!!!");
|
||||||
|
ApiDto apiDto = new ApiDto();
|
||||||
|
apiDto.setFilter(Lists.newArrayList());
|
||||||
|
|
||||||
|
String str = HttpUtil.sendPost("http://223.75.53.141:8000/shzh/met/zyqxfw/api/warning/getGroupWarning", JSON.toJSONString(apiDto));
|
||||||
|
// String str = HttpUtil.sendPost("http://127.0.0.1:20000/shzh/met/zyqxfw/api/warning/getGroupWarning", JSON.toJSONString(apiDto));
|
||||||
|
JSONObject json = JSON.parseObject(str);
|
||||||
|
if (json != null && json.getInteger("code") == HttpStatus.SC_OK) {
|
||||||
|
List<WarningData> data = json.getJSONArray("data").toJavaList(WarningData.class);
|
||||||
|
List<QXWarningVO> warningList = getList(data);
|
||||||
|
log.info("预警数据同步获取数据条数 " + warningList.size());
|
||||||
|
List<Integer> warnIds = warningList.stream().map(QXWarningVO::getWarnid).collect(Collectors.toList());
|
||||||
|
|
||||||
|
List<QXWarning> list = service.lambdaQuery().in(QXWarning::getWarnid, warnIds).list();
|
||||||
|
log.info("预警数据同步已存预警 " + list.size());
|
||||||
|
|
||||||
|
Set<Integer> dbWarnSet = list.stream().map(QXWarning::getWarnid).collect(Collectors.toSet());
|
||||||
|
|
||||||
|
List<QXWarning> adds = Lists.newArrayList();
|
||||||
|
for (QXWarningVO warningVO : warningList) {
|
||||||
|
if (!dbWarnSet.contains(warningVO.getWarnid())) {
|
||||||
|
QXWarning qxwarning = new QXWarning();
|
||||||
|
qxwarning.setCreateTime(DateUtils.dateToStrYMDHds(warningVO.getCreateTime()));
|
||||||
|
qxwarning.setPublishTime(DateUtils.dateToStrYMDHds(warningVO.getPublishTime()));
|
||||||
|
qxwarning.setStartTime(DateUtils.dateToStrYMDHds(warningVO.getStartTime()));
|
||||||
|
qxwarning.setEndTime(DateUtils.dateToStrYMDHds(warningVO.getEndTime()));
|
||||||
|
qxwarning.setWarnSignalType(warningVO.getWarnSignalType());
|
||||||
|
qxwarning.setWarnSignalLevel(warningVO.getWarnSignalLevel());
|
||||||
|
qxwarning.setPublishUnit(warningVO.getPublishUnit());
|
||||||
|
qxwarning.setContent(warningVO.getContent());
|
||||||
|
qxwarning.setWarnid(warningVO.getWarnid());
|
||||||
|
qxwarning.setCtnm(warningVO.getCtnm());
|
||||||
|
qxwarning.setCnnm(warningVO.getCnnm());
|
||||||
|
adds.add(qxwarning);
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (CollectionUtils.isNotEmpty(adds)) {
|
||||||
|
log.info("预警数据同步待添加预警 " + adds.size());
|
||||||
|
if (this.service.saveBatch(adds)) {
|
||||||
|
log.info("添加成功 " + adds.size());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
log.info("预警数据同步执行完成!!!");
|
||||||
|
|
||||||
|
}
|
||||||
|
private List<QXWarningVO> getList(List<WarningData> data) {
|
||||||
|
|
||||||
|
List<QXWarningVO> respList = Lists.newArrayList();
|
||||||
|
for (WarningData datum : data) {//最外层的列表
|
||||||
|
String ctnm = datum.getEffectArea();//市级范围
|
||||||
|
for (WarningData.TypeList typeList : datum.getTypeList()) {//里面的
|
||||||
|
// 预警类型
|
||||||
|
String type = typeList.getType();
|
||||||
|
List<WarningData.Warning> warnList = typeList.getWarnList();
|
||||||
|
|
||||||
|
for (WarningData.Warning warning : warnList) {
|
||||||
|
String cnnm = warning.getEffectArea();
|
||||||
|
QXWarningVO vo = new QXWarningVO();
|
||||||
|
String publishUnit = warning.getPublishUnit();
|
||||||
|
// vo.setTitle(publishUnit + "发布" + type + "预警");
|
||||||
|
vo.setCtnm(ctnm);//市级名称
|
||||||
|
vo.setCnnm(cnnm);//县级名称
|
||||||
|
vo.setPublishUnit(publishUnit);//发布单位
|
||||||
|
vo.setPublishTime(warning.getPublishTime());//预警发布时间
|
||||||
|
vo.setWarnSignalType(warning.getWarnSignalType());//预警类型
|
||||||
|
vo.setWarnSignalLevel(warning.getWarnSignalLevel());//预警级别
|
||||||
|
vo.setContent(warning.getContent());//预警内容
|
||||||
|
vo.setWarnid(warning.getId());
|
||||||
|
vo.setCreateTime(warning.getCreateTime());
|
||||||
|
vo.setStartTime(warning.getStartTime());
|
||||||
|
vo.setEndTime(warning.getEndTime());
|
||||||
|
respList.add(vo);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
respList = respList.stream().filter(o -> "暴雨".equals(o.getWarnSignalType()) || "雷雨大风".equals(o.getWarnSignalType())).collect(Collectors.toList());
|
||||||
|
respList = respList.stream().filter(o -> "红色".equals(o.getWarnSignalLevel()) || "橙色".equals(o.getWarnSignalLevel())).collect(Collectors.toList());
|
||||||
|
return respList.stream().sorted(Comparator.comparing(QXWarningVO::getPublishTime).reversed())
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -3,7 +3,9 @@ package com.whdc.config;
|
||||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||||
import org.springframework.context.annotation.Bean;
|
import org.springframework.context.annotation.Bean;
|
||||||
import org.springframework.context.annotation.Configuration;
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
import org.springframework.scheduling.TaskScheduler;
|
||||||
import org.springframework.scheduling.annotation.AsyncConfigurer;
|
import org.springframework.scheduling.annotation.AsyncConfigurer;
|
||||||
|
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
|
||||||
|
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
|
|
@ -25,6 +27,14 @@ public class ThreadPools implements AsyncConfigurer {
|
||||||
.setNameFormat("fixed -- %d").build());
|
.setNameFormat("fixed -- %d").build());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public TaskScheduler taskScheduler() {
|
||||||
|
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
|
||||||
|
taskScheduler.setPoolSize(10);//不配置默认是1
|
||||||
|
return taskScheduler;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
// public static Void handleException(Throwable ex) {
|
// public static Void handleException(Throwable ex) {
|
||||||
// log.error("请求出现异常:" + ex.getMessage(), ex);
|
// log.error("请求出现异常:" + ex.getMessage(), ex);
|
||||||
// return null;
|
// return null;
|
||||||
|
|
|
||||||
|
|
@ -24,6 +24,7 @@ import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.http.HttpStatus;
|
import org.apache.http.HttpStatus;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.cache.annotation.Cacheable;
|
import org.springframework.cache.annotation.Cacheable;
|
||||||
|
import org.springframework.scheduling.annotation.Async;
|
||||||
import org.springframework.scheduling.annotation.Scheduled;
|
import org.springframework.scheduling.annotation.Scheduled;
|
||||||
import org.springframework.validation.annotation.Validated;
|
import org.springframework.validation.annotation.Validated;
|
||||||
import org.springframework.web.bind.annotation.*;
|
import org.springframework.web.bind.annotation.*;
|
||||||
|
|
@ -69,19 +70,27 @@ public class QXWarnController {
|
||||||
return ResultJson.ok(save);
|
return ResultJson.ok(save);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Scheduled(cron = "0/5 * * * * ?")
|
// @Async
|
||||||
|
@ApiOperation(value = "预警数据同步接口", notes = "预警数据同步接口")
|
||||||
|
@GetMapping("/syncData")
|
||||||
|
// @Scheduled(cron ="0 0/5 * * * ?")
|
||||||
public void syncData() {
|
public void syncData() {
|
||||||
|
log.info("预警数据同步开始!!!");
|
||||||
ApiDto apiDto = new ApiDto();
|
ApiDto apiDto = new ApiDto();
|
||||||
apiDto.setFilter(Lists.newArrayList());
|
apiDto.setFilter(Lists.newArrayList());
|
||||||
|
|
||||||
String str = HttpUtil.sendPost("http://127.0.0.1:20000/shzh/met/zyqxfw/api/warning/getGroupWarning", JSON.toJSONString(apiDto));
|
String str = HttpUtil.sendPost("http://223.75.53.141:8000/shzh/met/zyqxfw/api/warning/getGroupWarning", JSON.toJSONString(apiDto));
|
||||||
|
// String str = HttpUtil.sendPost("http://127.0.0.1:20000/shzh/met/zyqxfw/api/warning/getGroupWarning", JSON.toJSONString(apiDto));
|
||||||
JSONObject json = JSON.parseObject(str);
|
JSONObject json = JSON.parseObject(str);
|
||||||
if (json != null && json.getInteger("code") == HttpStatus.SC_OK) {
|
if (json != null && json.getInteger("code") == HttpStatus.SC_OK) {
|
||||||
List<WarningData> data = json.getJSONArray("data").toJavaList(WarningData.class);
|
List<WarningData> data = json.getJSONArray("data").toJavaList(WarningData.class);
|
||||||
List<QXWarningVO> warningList = getList(data, null, null, null);
|
List<QXWarningVO> warningList = getList(data, null, null, null);
|
||||||
|
log.info("预警数据同步获取数据条数 " + warningList.size());
|
||||||
List<Integer> warnIds = warningList.stream().map(QXWarningVO::getWarnid).collect(Collectors.toList());
|
List<Integer> warnIds = warningList.stream().map(QXWarningVO::getWarnid).collect(Collectors.toList());
|
||||||
|
|
||||||
List<QXWarning> list = service.lambdaQuery().in(QXWarning::getWarnid, warnIds).list();
|
List<QXWarning> list = service.lambdaQuery().in(QXWarning::getWarnid, warnIds).list();
|
||||||
|
log.info("预警数据同步已存预警 " + list.size());
|
||||||
|
|
||||||
Set<Integer> dbWarnSet = list.stream().map(QXWarning::getWarnid).collect(Collectors.toSet());
|
Set<Integer> dbWarnSet = list.stream().map(QXWarning::getWarnid).collect(Collectors.toSet());
|
||||||
|
|
||||||
List<QXWarning> adds = Lists.newArrayList();
|
List<QXWarning> adds = Lists.newArrayList();
|
||||||
|
|
@ -105,11 +114,15 @@ public class QXWarnController {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (CollectionUtils.isNotEmpty(adds)) {
|
if (CollectionUtils.isNotEmpty(adds)) {
|
||||||
|
log.info("预警数据同步待添加预警 " + adds.size());
|
||||||
if (this.service.saveBatch(adds)) {
|
if (this.service.saveBatch(adds)) {
|
||||||
log.info("添加成功 " + adds.size());
|
log.info("添加成功 " + adds.size());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
log.info("预警数据同步执行完成!!!");
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue