gunshi-project-ss/src/main/java/com/gunshi/project/xyt/timetask/DataTaskTSG.java

585 lines
30 KiB
Java
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

package com.gunshi.project.xyt.timetask;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import com.baomidou.mybatisplus.core.toolkit.ObjectUtils;
import com.gunshi.project.xyt.model.StPptnR;
import com.gunshi.project.xyt.model.StPptnRD;
import com.gunshi.project.xyt.model.StPptnRH;
import com.gunshi.project.xyt.model.StRiverR;
import com.gunshi.project.xyt.model.StRiverRReal;
import com.gunshi.project.xyt.model.StRsvrR;
import com.gunshi.project.xyt.model.StRsvrRReal;
import com.gunshi.project.xyt.service.StPptnRDService;
import com.gunshi.project.xyt.service.StPptnRHService;
import com.gunshi.project.xyt.service.StPptnRRealService;
import com.gunshi.project.xyt.service.StPptnRService;
import com.gunshi.project.xyt.service.StRiverRRealService;
import com.gunshi.project.xyt.service.StRiverRService;
import com.gunshi.project.xyt.service.StRsvrRRealService;
import com.gunshi.project.xyt.service.StRsvrRService;
import com.gunshi.project.xyt.util.OkHttpUtil;
import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import okhttp3.FormBody;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Profile;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.math.BigDecimal;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
*@description 檀树岗数据定时任务
*@author cxw
*@classname DataTaskTSG.java
*@create 2024-09-20, 周五, 17:05:22
*/
@EnableScheduling//开启定时任务
@Component
@Slf4j
@Profile({"prod","dev"})
public class DataTaskTSG {
private static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
private static SimpleDateFormat sdfD = new SimpleDateFormat("yyyy-MM-dd");
private static SimpleDateFormat sdfEight = new SimpleDateFormat("yyyy-MM-dd 08:00:00");
@Value("${jcskPath}")
private String jcskPath;// 荆楚水库同步地址
@Value("${jcskToken}")
private String jcskToken;// 荆楚水库同步接口token
@Value("${reloadCache}")
private String reloadCache;// 清除ip白名单缓存
public static String jcskPathPptnRoute = "/pptn/";// 雨情接口需拼接源sh、sw、qx
public static String jcskPathRrRiverRoute = "/river/";// ZZ河道水位接口
public static String jcskPathZzRsvrRoute = "/rsvr/";// RR水库水位接口
// 实时雨情
@Autowired
private StPptnRRealService stPptnRRealService;
// 历史雨情
@Autowired
private StPptnRService stPptnRService;
// 小时雨情
@Autowired
private StPptnRHService stPptnRHService;
// 按天雨情
@Autowired
private StPptnRDService stPptnRDService;
// 实时水情
@Autowired
private StRsvrRRealService stRsvrRRealService;
// 历史水情
@Autowired
private StRsvrRService stRsvrRService;
// 实时水情
@Autowired
private StRiverRRealService stRiverRRealService;
// 历史水情
@Autowired
private StRiverRService stRiverRService;
/**
* @param
* @description: 定时获取水库雨情数据历史、天、时、real
* @return: void
* @auther: cxw
* @date: 2024-09-20, 周五, 17:07:23
*/
@Async
@Scheduled(fixedRate = 5, timeUnit = TimeUnit.MINUTES)
public void getPptnData() {
Date now = new Date();
System.out.println("雨情实时定时任务,执行时间:" + sdf.format(now));
// 获取水库站点编码
List<StPptnR> stcdLastPptnDatas = stPptnRService.getStcdLastPptnData();
OkHttpClient client = OkHttpUtil.build();
try {
if (CollectionUtils.isNotEmpty(stcdLastPptnDatas)) {
for (StPptnR stPptnR : stcdLastPptnDatas) {
Calendar calendar = Calendar.getInstance();
String stcd = stPptnR.getStcd();
Date stm = stPptnR.getStm();
// 默认30天前
if (ObjectUtils.isEmpty(stm)) {
if ("sh".equals(stPptnR.getSource().toLowerCase())) {
calendar.add(Calendar.DATE, -30);
} else {
calendar.add(Calendar.DATE, -2);
calendar.add(Calendar.HOUR_OF_DAY, -20);
}
} else {
int diffDays = new BigDecimal((int) Math.ceil(now.getTime() - stm.getTime() / (24 * 60 * 60 * 1000))).intValue();
if (diffDays > 3) {
calendar.add(Calendar.DATE, -2);
calendar.add(Calendar.HOUR_OF_DAY, -20);
} else {
calendar.setTime(stm);
calendar.add(Calendar.MINUTE, 1);
}
}
stm = calendar.getTime();
String url = jcskPath + jcskPathPptnRoute + stPptnR.getSource().toLowerCase();
Response resp = client.newCall(new Request.Builder().url(url)
.post(new FormBody.Builder().add("stcd", stcd)
.add("stm", sdf.format(stm))
.add("etm", sdf.format(now))
.build())
.header("Token", jcskToken)
.addHeader("Content-Type", "application/x-www-form-urlencoded")
.build()).execute();
String respStr = resp.body().string();
JSONObject jsonObject = JSONObject.parseObject(respStr.toLowerCase());
// 清除同步平台的ip白名单缓存
if ("401".equals(jsonObject.get("code").toString())) {
OkHttpClient clientCache = new OkHttpClient();
Request request = new Request.Builder().url(reloadCache).build();
clientCache.newCall(request).execute();
}
JSONArray data = jsonObject.getJSONArray("data");
if (ObjectUtils.isNotEmpty(data)) {
List<StPptnR> rlist = data.toJavaList(StPptnR.class);
if (CollectionUtils.isNotEmpty(rlist)) {
// 全部替换为stcd并去重
rlist = rlist.stream().peek(entity -> entity.setStcd(stPptnR.getStcd())).collect(Collectors.toMap(
e -> e.getStcd().toUpperCase() + "_" + e.getTm(), // 使用属性组合作为键
Function.identity(),
(existing, replacement) -> existing // 如果有冲突,保留现有的
)).values().stream().collect(Collectors.toList());
stPptnRService.saveBatch(rlist);
// 更新到实时数据表
stPptnRRealService.updatePptnRReal(rlist.get(0).getStcd());
// 整编降雨量天表数据
StPptnRD maxDataD = stPptnRDService.getMaxData(stPptnR.getStcd());
Date tmMax = maxDataD.getTm();
maxDataD.setTm(sdf.parse(sdfEight.format(tmMax)));
List<StPptnRD> listD = stPptnRDService.reorganizePptnRDData(stPptnR.getStcd(), maxDataD);
if (CollectionUtils.isNotEmpty(listD)) {
if (listD.size() > 0) {
listD = listD.stream()
.map(user -> {
user.setYear(Integer.valueOf(sdfD.format(user.getTm()).substring(0, 4))); // 修改属性值
return user;
}).collect(Collectors.toList());
QueryWrapper<StPptnRD> deleteWrapper = new QueryWrapper<>();
deleteWrapper.eq("stcd", stcd).ge("tm", tmMax);
stPptnRDService.remove(deleteWrapper);
stPptnRDService.saveBatch(listD);
}
}
// 整编降雨量小时表数据
StPptnRH maxDataH = stPptnRHService.getMaxData(stPptnR.getStcd());
List<StPptnRH> listH = stPptnRHService.reorganizePptnRHData(stPptnR.getStcd(), maxDataH);
if (CollectionUtils.isNotEmpty(listH)) {
StPptnRH LastData = listH.get(0);
if (LastData.getTm().equals(maxDataH.getTm())) {
listH.remove(0);
UpdateWrapper<StPptnRH> updateWrapper = new UpdateWrapper<>();
updateWrapper.eq("stcd", LastData.getStcd()).eq("tm", LastData.getTm());
LastData.setDrp(LastData.getDrp());
LastData.setChtm(maxDataH.getChtm());
stPptnRHService.saveOrUpdate(LastData, updateWrapper);
}
if (listH.size() > 0) {
stPptnRHService.saveBatch(listH);
}
}
}
}
}
}
} catch (IOException e) {
log.error("荆楚水库雨情定时任务失败:", e.getMessage());
} catch (ParseException e) {
throw new RuntimeException(e);
}
}
/**
* @description: 定时获取水库水情数据历史、real
* @param
* @return: void
* @auther: cxw
* @date: 2024-09-23, 周一, 11:23:04
*/
@Async
@Scheduled(fixedRate = 5, timeUnit = TimeUnit.MINUTES)
public void getRiverData() {
Date now = new Date();
System.out.println("水情历史定时任务,执行时间:" + sdf.format(now));
// 获取水库站点编码历史水情表中站点最新数据时间
List<StRsvrR> stcdLast = stRsvrRService.getStcdLastRsvrData();
OkHttpClient client = OkHttpUtil.build();
try {
if (CollectionUtils.isNotEmpty(stcdLast)) {
for (StRsvrR stRsvrR : stcdLast) {
Date stm = stRsvrR.getStm();
String source = stRsvrR.getSource().toLowerCase();
String sttp = stRsvrR.getSttp().toLowerCase();
Calendar calendar = Calendar.getInstance();
// 默认40天前接口最多40天存在则加1小时接口是按小时算且大于等于开始时间
// 默认30天前
if (ObjectUtils.isEmpty(stm)) {
if ("sh".equals(source)) {
calendar.add(Calendar.DATE, -30);
} else {
calendar.add(Calendar.DATE, -2);
calendar.add(Calendar.HOUR_OF_DAY, -20);
}
} else {
int diffDays = new BigDecimal((int) Math.ceil(now.getTime() - stm.getTime() / (24 * 60 * 60 * 1000))).intValue();
if (diffDays > 3) {
calendar.add(Calendar.DATE, -2);
calendar.add(Calendar.HOUR_OF_DAY, -20);
} else {
calendar.setTime(stm);
calendar.add(Calendar.MINUTE, 1);
}
}
stm = calendar.getTime();
String url = jcskPath;
if ("rr".equals(sttp)) {
url += jcskPathZzRsvrRoute + source;
} else if ("zz".equals(sttp)) {
url += jcskPathRrRiverRoute + source;
} else {
continue;
}
Response resp = client.newCall(new Request.Builder().url(url)
.post(new FormBody.Builder().add("stcd", stRsvrR.getStcd())
.add("stm", sdf.format(stm))
.add("etm", sdf.format(now))
.build())
.header("Token", jcskToken)
.addHeader("Content-Type", "application/x-www-form-urlencoded")
.build()).execute();
String respStr = resp.body().string();
JSONObject jsonObject = JSONObject.parseObject(respStr.toLowerCase());
if (!"200".equals(jsonObject.get("code").toString())) {
continue;
}
JSONArray data = jsonObject.getJSONArray("data");
if (CollectionUtils.isNotEmpty(data)) {
if ("zz".equals(sttp)) {
List<StRiverR> rlist = data.toJavaList(StRiverR.class);
stRiverRService.saveBatch(rlist);
// 更新实时表数据
StRiverR stRiverRLast = rlist.get(rlist.size() - 1);
StRiverRReal stRiverRReal = new StRiverRReal();
UpdateWrapper<StRiverRReal> updateWrapper = new UpdateWrapper<>();
updateWrapper.eq("stcd", stRiverRLast.getStcd()).eq("tm", stRiverRLast.getTm());
BeanUtils.copyProperties(stRiverRLast, stRiverRReal);
stRiverRReal.setChtm(new Date());
stRiverRRealService.saveOrUpdate(stRiverRReal, updateWrapper);
} else if ("rr".equals(sttp)) {
List<StRsvrR> rlist = data.toJavaList(StRsvrR.class);
stRsvrRService.saveBatch(rlist);
// 更新实时表数据
StRsvrR stRsvrRLast = rlist.get(rlist.size() - 1);
StRsvrRReal stRsvrRReal = new StRsvrRReal();
UpdateWrapper<StRsvrRReal> updateWrapper = new UpdateWrapper<>();
updateWrapper.eq("stcd", stRsvrRLast.getStcd()).eq("tm", stRsvrRLast.getTm());
BeanUtils.copyProperties(stRsvrRLast, stRsvrRReal);
stRsvrRReal.setChtm(new Date());
stRsvrRRealService.saveOrUpdate(stRsvrRReal, updateWrapper);
}
}
}
}
} catch (IOException e) {
log.error("水情历史定时任务错误:", e.getMessage());
}
}
/**
* @description: 补雨量数据
* @param
* @return: void
* @auther: cxw
* @date: 2024-10-16, 周三, 16:24:29
*/
// @PostConstruct
public void supplementPptnData() {
Date now = new Date();
System.out.println("补雨量数据任务,执行时间:" + sdf.format(now));
List<StPptnR> stcdLastPptnDatas = stPptnRService.getStcdFirstPptnData();
OkHttpClient client = OkHttpUtil.build();
try {
if (CollectionUtils.isNotEmpty(stcdLastPptnDatas)) {
for (StPptnR stPptnR : stcdLastPptnDatas) {
String stm = "2024-01-01 00:00:00";// 补数据的开始时间
Calendar calendar = Calendar.getInstance();
String url = jcskPath + jcskPathPptnRoute + stPptnR.getSource().toLowerCase();
String stcd = stPptnR.getStcd();
Date etmD = null;
if (ObjectUtils.isEmpty(stPptnR.getEtm())) {
etmD = now;
} else {
etmD = stPptnR.getEtm();
}
String etm = sdf.format(etmD);// 补数据的结束时间
if (stmIsBeforeEtm(etm, stm, "yyyy-MM-dd HH:mm:ss")) {
continue;
}
// 结束日期往前减4分钟防止数据重复
Calendar calendarEtm = Calendar.getInstance();
calendarEtm.setTime(sdf.parse(etm));
calendarEtm.add(Calendar.MINUTE, -4);
etm = sdf.format(calendarEtm.getTime());
// 开始时间往后推3天不超过结束日期
while (stmIsBeforeEtm(stm, etm, "yyyy-MM-dd HH:mm:ss")) {
String newEtm = dateManipulation(stm, 3, "yyyy-MM-dd HH:mm:ss");
if(stmIsBeforeEtm(etm, newEtm, "yyyy-MM-dd HH:mm:ss")){
newEtm = etm;
}
Response resp = client.newCall(new Request.Builder().url(url).post(new FormBody.Builder().add("stcd", stcd).add("stm", stm).add("etm", newEtm).build()).header("Token", jcskToken).addHeader("Content-Type", "application/x-www" +
"-form-urlencoded").build()).execute();
String respStr = resp.body().string();
JSONObject jsonObject = JSONObject.parseObject(respStr.toLowerCase());
// 清除同步平台的ip白名单缓存
if ("401".equals(jsonObject.get("code").toString())) {
OkHttpClient clientCache = new OkHttpClient();
Request request = new Request.Builder().url(reloadCache).build();
clientCache.newCall(request).execute();
}
JSONArray data = jsonObject.getJSONArray("data");
if (ObjectUtils.isNotEmpty(data)) {
List<StPptnR> rlist = data.toJavaList(StPptnR.class);
if (CollectionUtils.isNotEmpty(rlist)) {
// 全部替换为stcd并去重
rlist = rlist.stream().peek(entity -> entity.setStcd(stPptnR.getStcd())).collect(Collectors.toMap(e -> e.getStcd().toUpperCase() + "_" + e.getTm(), // 使用属性组合作为键
Function.identity(), (existing, replacement) -> existing // 如果有冲突,保留现有的
)).values().stream().collect(Collectors.toList());
stPptnRService.saveBatch(rlist);
// 更新到实时数据表
stPptnRRealService.updatePptnRReal(rlist.get(0).getStcd());
// 整编降雨量天表数据
StPptnRD maxDataD = stPptnRDService.getMaxData(stPptnR.getStcd());
Date tmMax = maxDataD.getTm();
maxDataD.setTm(sdf.parse(sdfEight.format(tmMax)));
List<StPptnRD> listD = stPptnRDService.reorganizePptnRDData(stPptnR.getStcd(), maxDataD);
if (CollectionUtils.isNotEmpty(listD)) {
if (listD.size() > 0) {
listD = listD.stream().map(user -> {
user.setYear(Integer.valueOf(sdfD.format(user.getTm()).substring(0, 4))); // 修改属性值
return user;
}).collect(Collectors.toList());
QueryWrapper<StPptnRD> deleteWrapper = new QueryWrapper<>();
deleteWrapper.eq("stcd", stcd).ge("tm", tmMax);
stPptnRDService.remove(deleteWrapper);
stPptnRDService.saveBatch(listD);
}
}
// 整编降雨量小时表数据
StPptnRH maxDataH = stPptnRHService.getMaxData(stPptnR.getStcd());
List<StPptnRH> listH = stPptnRHService.reorganizePptnRHData(stPptnR.getStcd(), maxDataH);
if (CollectionUtils.isNotEmpty(listH)) {
StPptnRH LastData = listH.get(0);
if (LastData.getTm().equals(maxDataH.getTm())) {
listH.remove(0);
UpdateWrapper<StPptnRH> updateWrapper = new UpdateWrapper<>();
updateWrapper.eq("stcd", LastData.getStcd()).eq("tm", LastData.getTm());
LastData.setDrp(LastData.getDrp());
LastData.setChtm(maxDataH.getChtm());
stPptnRHService.saveOrUpdate(LastData, updateWrapper);
}
if (listH.size() > 0) {
stPptnRHService.saveBatch(listH);
}
}
}
}
if (stmIsBeforeEtm(etm, newEtm, "yyyy-MM-dd HH:mm:ss")) {
stm = etm;
} else {
stm = newEtm;
}
calendar.setTime(sdf.parse(stm));
calendar.add(Calendar.MINUTE, 5);
stm = sdf.format(calendar.getTime());
}
}
}
} catch (IOException e) {
log.error("补雨量数据任务失败:", e.getMessage());
} catch (ParseException e) {
throw new IllegalArgumentException(e);
}
}
/**
* @description: 补水位数据
* @param
* @return: void
* @auther: cxw
* @date: 2024-10-16, 周三, 17:13:33
*/
// @PostConstruct
public void supplementRsvrData() {
Date now = new Date();
System.out.println("补水位数据任务,执行时间:" + sdf.format(now));
List<StRsvrR> stcdLast = stRsvrRService.getStcdFirstRsvrData();
OkHttpClient client = OkHttpUtil.build();
try {
if (CollectionUtils.isNotEmpty(stcdLast)) {
for (StRsvrR stRsvrR : stcdLast) {
String sttp = stRsvrR.getSttp().toLowerCase();
String source = stRsvrR.getSource().toLowerCase();
String stm = "2024-01-01 00:00:00";// 补数据的开始时间
Calendar calendar = Calendar.getInstance();
Date etmD = null;
if (ObjectUtils.isEmpty(stRsvrR.getEtm())) {
etmD = now;
} else {
etmD = stRsvrR.getEtm();
}
String etm = sdf.format(etmD);// 补数据的结束时间
if (stmIsBeforeEtm(etm, stm, "yyyy-MM-dd HH:mm:ss")) {
continue;
}
String url = jcskPath;
if ("rr".equals(sttp)) {
url += jcskPathZzRsvrRoute + source;
} else if ("zz".equals(sttp)) {
url += jcskPathRrRiverRoute + source;
} else {
continue;
}
// 结束日期往前减4分钟防止数据重复
Calendar calendarEtm = Calendar.getInstance();
calendarEtm.setTime(sdf.parse(etm));
calendarEtm.add(Calendar.MINUTE, -4);
etm = sdf.format(calendarEtm.getTime());
while (stmIsBeforeEtm(stm, etm, "yyyy-MM-dd HH:mm:ss")) {
String newEtm = dateManipulation(stm, 3, "yyyy-MM-dd HH:mm:ss");
if(stmIsBeforeEtm(etm, newEtm, "yyyy-MM-dd HH:mm:ss")){
newEtm = etm;
}
Response resp = client.newCall(new Request.Builder().url(url)
.post(new FormBody.Builder().add("stcd", stRsvrR.getStcd())
.add("stm", stm)
.add("etm", newEtm)
.build())
.header("Token", jcskToken)
.addHeader("Content-Type", "application/x-www-form-urlencoded")
.build()).execute();
String respStr = resp.body().string();
JSONObject jsonObject = JSONObject.parseObject(respStr.toLowerCase());
if (!"200".equals(jsonObject.get("code").toString())) {
continue;
}
JSONArray data = jsonObject.getJSONArray("data");
if (CollectionUtils.isNotEmpty(data)) {
if ("zz".equals(sttp)) {
List<StRiverR> rlist = data.toJavaList(StRiverR.class);
stRiverRService.saveBatch(rlist);
// 更新实时表数据
StRiverR stRiverRLast = rlist.get(rlist.size() - 1);
StRiverRReal stRiverRReal = new StRiverRReal();
UpdateWrapper<StRiverRReal> updateWrapper = new UpdateWrapper<>();
updateWrapper.eq("stcd", stRiverRLast.getStcd()).eq("tm", stRiverRLast.getTm());
BeanUtils.copyProperties(stRiverRLast, stRiverRReal);
stRiverRReal.setChtm(new Date());
stRiverRRealService.saveOrUpdate(stRiverRReal, updateWrapper);
} else if ("rr".equals(sttp)) {
List<StRsvrR> rlist = data.toJavaList(StRsvrR.class);
stRsvrRService.saveBatch(rlist);
// 更新实时表数据
StRsvrR stRsvrRLast = rlist.get(rlist.size() - 1);
StRsvrRReal stRsvrRReal = new StRsvrRReal();
UpdateWrapper<StRsvrRReal> updateWrapper = new UpdateWrapper<>();
updateWrapper.eq("stcd", stRsvrRLast.getStcd()).eq("tm", stRsvrRLast.getTm());
BeanUtils.copyProperties(stRsvrRLast, stRsvrRReal);
stRsvrRReal.setChtm(new Date());
stRsvrRRealService.saveOrUpdate(stRsvrRReal, updateWrapper);
}
}
if (stmIsBeforeEtm(etm, newEtm, "yyyy-MM-dd HH:mm:ss")) {
stm = etm;
} else {
stm = newEtm;
}
calendar.setTime(sdf.parse(stm));
calendar.add(Calendar.MINUTE, 5);
stm = sdf.format(calendar.getTime());
}
}
}
} catch (IOException e) {
log.error("补水位数据任务错误:", e.getMessage());
} catch (ParseException e) {
throw new IllegalArgumentException(e);
}
}
private Boolean stmIsBeforeEtm(String stm, String etm, String fmt) {
Boolean ret = false;
try {
SimpleDateFormat sdf = new SimpleDateFormat(fmt);
Date date1 = sdf.parse(stm);
Date date2 = sdf.parse(etm);
if (date1.equals(date2)) {
ret = true;
} else if (date1.before(date2)) {
ret = true;
}
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException("时间大小不正确!");
}
return ret;
}
private String dateManipulation(String tm, int daysToSubtract, String fmt) {
String newDateString = "";
try {
SimpleDateFormat dateFormat = new SimpleDateFormat(fmt);
Date parsedDate = dateFormat.parse(tm);
Calendar calendar = Calendar.getInstance();
calendar.setTime(parsedDate);
calendar.add(Calendar.DATE, daysToSubtract);
newDateString = dateFormat.format(calendar.getTime());
} catch (Exception e) {
throw new RuntimeException(e);
}
return newDateString;
}
}