From fd8344fdaaa19b4fee07d8ff6a70c14322b8158b Mon Sep 17 00:00:00 2001 From: cxw <1520264117@qq.com> Date: Mon, 23 Sep 2024 13:52:16 +0800 Subject: [PATCH] =?UTF-8?q?=E6=AA=80=E6=A0=91=E5=B2=97=E5=A2=9E=E5=8A=A0?= =?UTF-8?q?=E8=8D=86=E6=A5=9A=E6=B0=B4=E5=BA=93=E6=B0=B4=E3=80=81=E9=9B=A8?= =?UTF-8?q?=E6=83=85=E6=95=B0=E6=8D=AE=E5=90=8C=E6=AD=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../project/xyt/mapper/StPptnRDMapper.java | 5 + .../com/gunshi/project/xyt/model/StPptnR.java | 6 + .../com/gunshi/project/xyt/model/StRsvrR.java | 9 + .../project/xyt/service/StPptnRDService.java | 8 + .../gunshi/project/xyt/timetask/DataTask.java | 4 +- .../project/xyt/timetask/DataTaskTSG.java | 304 ++++++++++++++++++ src/main/resources/config-common.yml | 4 +- src/main/resources/mapper/StPptnRDMapper.xml | 18 ++ src/main/resources/mapper/StPptnRHMapper.xml | 2 +- src/main/resources/mapper/StPptnRMapper.xml | 6 +- src/main/resources/mapper/StRsvrRMapper.xml | 6 +- 11 files changed, 363 insertions(+), 9 deletions(-) create mode 100644 src/main/java/com/gunshi/project/xyt/timetask/DataTaskTSG.java diff --git a/src/main/java/com/gunshi/project/xyt/mapper/StPptnRDMapper.java b/src/main/java/com/gunshi/project/xyt/mapper/StPptnRDMapper.java index 20d48a0..ae19535 100644 --- a/src/main/java/com/gunshi/project/xyt/mapper/StPptnRDMapper.java +++ b/src/main/java/com/gunshi/project/xyt/mapper/StPptnRDMapper.java @@ -3,6 +3,7 @@ package com.gunshi.project.xyt.mapper; import com.baomidou.mybatisplus.core.mapper.BaseMapper; import com.gunshi.project.xyt.model.StPptnRD; import org.apache.ibatis.annotations.Mapper; +import org.apache.ibatis.annotations.Param; import java.util.List; @@ -15,4 +16,8 @@ import java.util.List; public interface StPptnRDMapper extends BaseMapper { List getStcdLastPptnDayData(); + + StPptnRD getMaxData(@Param("stcd") String stcd); + + List reorganizePptnRDData(@Param("stcd") String stcd, @Param("maxData") StPptnRD maxData); } diff --git a/src/main/java/com/gunshi/project/xyt/model/StPptnR.java b/src/main/java/com/gunshi/project/xyt/model/StPptnR.java index 3606396..4862419 100644 --- a/src/main/java/com/gunshi/project/xyt/model/StPptnR.java +++ b/src/main/java/com/gunshi/project/xyt/model/StPptnR.java @@ -101,4 +101,10 @@ public class StPptnR implements Serializable { @TableField(exist = false) private Date etm;// 同步的数据的结束时间 + + @TableField(exist = false) + private String source;// 测站数据源 + + @TableField(exist = false) + private String sttp;// 测站类型 } diff --git a/src/main/java/com/gunshi/project/xyt/model/StRsvrR.java b/src/main/java/com/gunshi/project/xyt/model/StRsvrR.java index 54f66eb..5bce3a8 100644 --- a/src/main/java/com/gunshi/project/xyt/model/StRsvrR.java +++ b/src/main/java/com/gunshi/project/xyt/model/StRsvrR.java @@ -158,4 +158,13 @@ public class StRsvrR implements Serializable { @TableField(exist = false) private Date etm;// 同步的数据的结束时间 + @TableField(exist = false) + private String source;// 测站数据源 + + @TableField(exist = false) + private String sttp;// 测站类型 + + @TableField(exist = false) + private String z;// 山洪河道水位 + } diff --git a/src/main/java/com/gunshi/project/xyt/service/StPptnRDService.java b/src/main/java/com/gunshi/project/xyt/service/StPptnRDService.java index a408a8e..f2a4f10 100644 --- a/src/main/java/com/gunshi/project/xyt/service/StPptnRDService.java +++ b/src/main/java/com/gunshi/project/xyt/service/StPptnRDService.java @@ -96,6 +96,14 @@ public class StPptnRDService extends ServiceImpl } saveBatch(dList); } + + public StPptnRD getMaxData(String stcd) { + return baseMapper.getMaxData(stcd); + } + + public List reorganizePptnRDData(String stcd, StPptnRD maxData) { + return baseMapper.reorganizePptnRDData(stcd, maxData); + } } diff --git a/src/main/java/com/gunshi/project/xyt/timetask/DataTask.java b/src/main/java/com/gunshi/project/xyt/timetask/DataTask.java index 911d70e..6d92e3c 100644 --- a/src/main/java/com/gunshi/project/xyt/timetask/DataTask.java +++ b/src/main/java/com/gunshi/project/xyt/timetask/DataTask.java @@ -62,8 +62,8 @@ public class DataTask { @Value("${reloadCache}") private String reloadCache;// 清除ip白名单缓存 - public static String jcskPathPptnRoute = "/st_pptn_r";// 雨情接口 - public static String jcskPathRiverRoute = "/st_rsvr_r";// 水位接口 + public static String jcskPathPptnRoute = "/xfdbaj/st_pptn_r";// 雨情接口 + public static String jcskPathRiverRoute = "/xfdbaj/st_rsvr_r";// 水位接口 public static String hbskpprealstsRoute = "pubapi/hbsk/pprealsts";// 站点实时雨情 diff --git a/src/main/java/com/gunshi/project/xyt/timetask/DataTaskTSG.java b/src/main/java/com/gunshi/project/xyt/timetask/DataTaskTSG.java new file mode 100644 index 0000000..f54c4bb --- /dev/null +++ b/src/main/java/com/gunshi/project/xyt/timetask/DataTaskTSG.java @@ -0,0 +1,304 @@ +package com.gunshi.project.xyt.timetask; + +import com.alibaba.fastjson2.JSONArray; +import com.alibaba.fastjson2.JSONObject; +import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper; +import com.baomidou.mybatisplus.core.toolkit.CollectionUtils; +import com.baomidou.mybatisplus.core.toolkit.ObjectUtils; +import com.gunshi.project.xyt.model.StPptnR; +import com.gunshi.project.xyt.model.StPptnRD; +import com.gunshi.project.xyt.model.StPptnRH; +import com.gunshi.project.xyt.model.StRsvrR; +import com.gunshi.project.xyt.model.StRsvrRReal; +import com.gunshi.project.xyt.service.StPptnRDService; +import com.gunshi.project.xyt.service.StPptnRHService; +import com.gunshi.project.xyt.service.StPptnRRealService; +import com.gunshi.project.xyt.service.StPptnRService; +import com.gunshi.project.xyt.service.StRsvrRRealService; +import com.gunshi.project.xyt.service.StRsvrRService; +import com.gunshi.project.xyt.util.OkHttpUtil; +import com.ruoyi.common.utils.StringUtils; +import lombok.extern.slf4j.Slf4j; +import okhttp3.FormBody; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; +import org.springframework.beans.BeanUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Profile; +import org.springframework.scheduling.annotation.Async; +import org.springframework.scheduling.annotation.EnableScheduling; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +import java.io.IOException; +import java.math.BigDecimal; +import java.text.SimpleDateFormat; +import java.util.Calendar; +import java.util.Date; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.stream.Collectors; + +/** +*@description 檀树岗数据定时任务 +*@author cxw +*@classname DataTaskTSG.java +*@create 2024-09-20, 周五, 17:05:22 +*/ +@EnableScheduling//开启定时任务 +@Component +@Slf4j +@Profile("prod") +public class DataTaskTSG { + private static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + + private static SimpleDateFormat sdfD = new SimpleDateFormat("yyyy-MM-dd"); + + + @Value("${jcskPath}") + private String jcskPath;// 荆楚水库同步地址 + + @Value("${jcskToken}") + private String jcskToken;// 荆楚水库同步接口token + + @Value("${reloadCache}") + private String reloadCache;// 清除ip白名单缓存 + + public static String jcskPathPptnRoute = "/pptn/";// 雨情接口,需拼接源(sh、sw、qx) + + public static String jcskPathRrRiverRoute = "/rsvr/";// RR水库水位接口 + public static String jcskPathZzRiverRoute = "/river/";// ZZ河道水位接口 + + // 实时雨情 + @Autowired + private StPptnRRealService stPptnRRealService; + + // 历史雨情 + @Autowired + private StPptnRService stPptnRService; + + // 小时雨情 + @Autowired + private StPptnRHService stPptnRHService; + + // 按天雨情 + @Autowired + private StPptnRDService stPptnRDService; + + // 实时水情 + @Autowired + private StRsvrRRealService stRsvrRRealService; + + // 历史水情 + @Autowired + private StRsvrRService stRsvrRService; + + + /** + * @param + * @description: 定时获取水库雨情数据(历史、天、时、real) + * @return: void + * @auther: cxw + * @date: 2024-09-20, 周五, 17:07:23 + */ + @Async + @Scheduled(fixedRate = 5, timeUnit = TimeUnit.MINUTES) + public void getPptnData() { + Date now = new Date(); + System.out.println("雨情实时定时任务,执行时间:" + sdf.format(now)); + // 获取水库站点编码 + List stcdLastPptnDatas = stPptnRService.getStcdLastPptnData(); + OkHttpClient client = OkHttpUtil.build(); + try { + if (CollectionUtils.isNotEmpty(stcdLastPptnDatas)) { + for (StPptnR stPptnR : stcdLastPptnDatas) { + Calendar calendar = Calendar.getInstance(); + String stcd = stPptnR.getStcd(); + Date stm = stPptnR.getStm(); + // 默认30天前 + if (ObjectUtils.isEmpty(stm)) { + if ("sh".equals(stPptnR.getSource().toLowerCase())) { + calendar.add(Calendar.DATE, -30); + } else { + calendar.add(Calendar.DATE, -2); + calendar.add(Calendar.HOUR_OF_DAY, -20); + } + } else { + int diffDays = new BigDecimal((now.getTime() - stm.getTime()) / (24 * 60 * 60 * 1000)).intValue(); + if (diffDays > 3) { + calendar.add(Calendar.DATE, -2); + calendar.add(Calendar.HOUR_OF_DAY, -20); + } else { + calendar.setTime(stm); + calendar.add(Calendar.MINUTE, 1); + } + } + stm = calendar.getTime(); + String url = jcskPath + jcskPathPptnRoute + stPptnR.getSource().toLowerCase(); + Response resp = client.newCall(new Request.Builder().url(url) + .post(new FormBody.Builder().add("stcd", stcd) + .add("stm", sdf.format(stm)) + .add("etm", sdf.format(now)) + .build()) + .header("Token", jcskToken) + .addHeader("Content-Type", "application/x-www-form-urlencoded") + .build()).execute(); + String respStr = resp.body().string(); + JSONObject jsonObject = JSONObject.parseObject(respStr.toLowerCase()); + // 清除同步平台的ip白名单缓存 + if ("401".equals(jsonObject.get("code").toString())) { + OkHttpClient clientCache = new OkHttpClient(); + Request request = new Request.Builder().url(reloadCache).build(); + clientCache.newCall(request).execute(); + } + JSONArray data = jsonObject.getJSONArray("data"); + if (ObjectUtils.isNotEmpty(data)) { + List rlist = data.toJavaList(StPptnR.class); + if (CollectionUtils.isNotEmpty(rlist)) { + // 全部替换为stcd,并去重 + rlist = rlist.stream().peek(entity -> entity.setStcd(stPptnR.getStcd())).collect(Collectors.toMap( + e -> e.getStcd().toUpperCase() + "_" + e.getTm(), // 使用属性组合作为键 + Function.identity(), + (existing, replacement) -> existing // 如果有冲突,保留现有的 + )).values().stream().collect(Collectors.toList()); + stPptnRService.saveBatch(rlist); + // 更新到实时数据表 + stPptnRRealService.updatePptnRReal(rlist.get(0).getStcd()); + // 整编降雨量天表数据 + StPptnRD maxDataD = stPptnRDService.getMaxData(stPptnR.getStcd()); + List listD = stPptnRDService.reorganizePptnRDData(stPptnR.getStcd(), maxDataD); + if (CollectionUtils.isNotEmpty(listD)) { + StPptnRD LastData = listD.get(0); + if (LastData.getTm().equals(maxDataD.getTm())) { + listD.remove(0); + UpdateWrapper updateWrapper = new UpdateWrapper<>(); + updateWrapper.eq("stcd", LastData.getStcd()).eq("tm", LastData.getTm()); + LastData.setDrp(LastData.getDrp()); + LastData.setYear(Integer.valueOf(sdfD.format(LastData.getTm()).substring(0, 4))); + stPptnRDService.saveOrUpdate(LastData, updateWrapper); + } + if (listD.size() > 0) { + listD = listD.stream() + .map(user -> { + user.setYear(Integer.valueOf(sdfD.format(user.getTm()).substring(0, 4))); // 修改属性值 + return user; + }).collect(Collectors.toList()); + stPptnRDService.saveBatch(listD); + } + } + // 整编降雨量小时表数据 + StPptnRH maxDataH = stPptnRHService.getMaxData(stPptnR.getStcd()); + List listH = stPptnRHService.reorganizePptnRHData(stPptnR.getStcd(), maxDataH); + if (CollectionUtils.isNotEmpty(listH)) { + StPptnRH LastData = listH.get(0); + if (LastData.getTm().equals(maxDataH.getTm())) { + listH.remove(0); + UpdateWrapper updateWrapper = new UpdateWrapper<>(); + updateWrapper.eq("stcd", LastData.getStcd()).eq("tm", LastData.getTm()); + LastData.setDrp(LastData.getDrp()); + LastData.setChtm(maxDataH.getChtm()); + stPptnRHService.saveOrUpdate(LastData, updateWrapper); + } + if (listH.size() > 0) { + stPptnRHService.saveBatch(listH); + } + } + } + } + } + } + } catch (IOException e) { + log.error("荆楚水库雨情定时任务失败:", e.getMessage()); + } + } + + /** + * @description: 定时获取水库水情数据(历史、real) + * @param + * @return: void + * @auther: cxw + * @date: 2024-09-23, 周一, 11:23:04 + */ + @Async + @Scheduled(fixedRate = 5, timeUnit = TimeUnit.MINUTES) + public void getRiverData() { + Date now = new Date(); + System.out.println("水情历史定时任务,执行时间:" + sdf.format(now)); + // 获取水库站点编码历史水情表中站点最新数据时间 + List stcdLast = stRsvrRService.getStcdLastRsvrData(); + OkHttpClient client = OkHttpUtil.build(); + try { + if (CollectionUtils.isNotEmpty(stcdLast)) { + for (StRsvrR stRsvrR : stcdLast) { + Date stm = stRsvrR.getStm(); + String source = stRsvrR.getSource().toLowerCase(); + String sttp = stRsvrR.getSttp().toLowerCase(); + Calendar calendar = Calendar.getInstance(); + // 默认40天前(接口最多40天),存在则加1小时(接口是按小时算且大于等于开始时间) + // 默认30天前 + if (ObjectUtils.isEmpty(stm)) { + if ("sh".equals(source)) { + calendar.add(Calendar.DATE, -30); + } else { + calendar.add(Calendar.DATE, -2); + calendar.add(Calendar.HOUR_OF_DAY, -20); + } + } else { + int diffDays = new BigDecimal((now.getTime() - stm.getTime()) / (24 * 60 * 60 * 1000)).intValue(); + if (diffDays > 3) { + calendar.add(Calendar.DATE, -2); + calendar.add(Calendar.HOUR_OF_DAY, -20); + } else { + calendar.setTime(stm); + calendar.add(Calendar.MINUTE, 1); + } + } + stm = calendar.getTime(); + String url = jcskPath; + if ("rr".equals(sttp)) { + url += jcskPathRrRiverRoute + source; + } else if ("zz".equals(sttp)) { + url += jcskPathZzRiverRoute + source; + } else { + continue; + } + Response resp = client.newCall(new Request.Builder().url(url) + .post(new FormBody.Builder().add("stcd", stRsvrR.getStcd()) + .add("stm", sdf.format(stm)) + .add("etm", sdf.format(now)) + .build()) + .header("Token", jcskToken) + .addHeader("Content-Type", "application/x-www-form-urlencoded") + .build()).execute(); + String respStr = resp.body().string(); + JSONObject jsonObject = JSONObject.parseObject(respStr.toLowerCase()); + if (!"200".equals(jsonObject.get("code").toString())) { + continue; + } + JSONArray data = jsonObject.getJSONArray("data"); + if (CollectionUtils.isNotEmpty(data)) { + List rlist = data.toJavaList(StRsvrR.class); + for (StRsvrR rsvrR : rlist) { + if (StringUtils.isEmpty(rsvrR.getRz()) && StringUtils.isNotEmpty(rsvrR.getZ())) { + rsvrR.setRz(rsvrR.getZ()); + } + } + stRsvrRService.saveBatch(rlist); + // 更新实时表数据 + StRsvrR stRsvrRLast = rlist.get(rlist.size() - 1); + StRsvrRReal stRsvrRReal = new StRsvrRReal(); + UpdateWrapper updateWrapper = new UpdateWrapper<>(); + updateWrapper.eq("stcd", stRsvrRLast.getStcd()).eq("tm", stRsvrRLast.getTm()); + BeanUtils.copyProperties(stRsvrRLast, stRsvrRReal); + stRsvrRRealService.saveOrUpdate(stRsvrRReal, updateWrapper); + } + } + } + } catch (IOException e) { + log.error("水情历史定时任务错误:", e.getMessage()); + } + } +} diff --git a/src/main/resources/config-common.yml b/src/main/resources/config-common.yml index f9823c9..4e25067 100644 --- a/src/main/resources/config-common.yml +++ b/src/main/resources/config-common.yml @@ -30,8 +30,8 @@ shqxjsCloudowrCnPath: http://shqxjs.cloudowr.cn/service/ owrsvrPath: http://owrsvr.cloudowr.cn/ -jcskPath: http://223.75.53.124:8002/shareddata/api/v1/monitdata/xfdbaj -jcskToken: 65E30623C28021BDC9400E2552FF88EB345FC5091BBE53C0650C489709C44075B13DD33924FEE38B7DFE9633B68E941126AB16D8BE875B480193141D9B8B8EB8 +jcskPath: http://223.75.53.124:8002/shareddata/api/v1/monitdata +jcskToken: FB1EE57468E0CB9A51306F9056A534778235BF27CBDCB8546B7EABA6FB72BBCBEE4BB01A9CBD8C3899E682C67167C02D81FDABD21639DE2655EB4EE567391899 reloadCache: http://223.75.53.124:8002/shareddata/sys/whitelists/reloadCache diff --git a/src/main/resources/mapper/StPptnRDMapper.xml b/src/main/resources/mapper/StPptnRDMapper.xml index db737b5..4561b49 100644 --- a/src/main/resources/mapper/StPptnRDMapper.xml +++ b/src/main/resources/mapper/StPptnRDMapper.xml @@ -12,4 +12,22 @@ WHERE subquery.rn = 1) r ON stb.stcd = r.stcd WHERE stb.source = 'SK' + + + + diff --git a/src/main/resources/mapper/StPptnRHMapper.xml b/src/main/resources/mapper/StPptnRHMapper.xml index d984958..35f53ed 100644 --- a/src/main/resources/mapper/StPptnRHMapper.xml +++ b/src/main/resources/mapper/StPptnRHMapper.xml @@ -38,7 +38,7 @@ SELECT stb.stcd, - r.tm stm + r.tm stm, + stb.source, + stb.sttp FROM public.st_stbprp_b stb LEFT JOIN (SELECT * FROM (SELECT *, ROW_NUMBER() OVER ( PARTITION BY stcd ORDER BY tm DESC ) AS rn FROM public.st_pptn_r) subquery WHERE subquery.rn = 1) r ON stb.stcd = r.stcd - WHERE stb.source = 'SK' + WHERE stb.source in ('QX', 'SW', 'SH') SELECT stb.stcd, - r.tm stm + r.tm stm, + stb.source, + stb.sttp FROM public.st_stbprp_b stb LEFT JOIN (SELECT * FROM (SELECT *, ROW_NUMBER() OVER ( PARTITION BY stcd ORDER BY tm DESC ) AS rn FROM public.st_rsvr_r) subquery WHERE subquery.rn = 1) r ON stb.stcd = r.stcd - WHERE stb.source = 'SK' + WHERE stb.source in ('QX', 'SW', 'SH')