檀树岗增加荆楚水库水、雨情数据同步
parent
aaa437eae8
commit
fd8344fdaa
|
|
@ -3,6 +3,7 @@ package com.gunshi.project.xyt.mapper;
|
|||
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
|
||||
import com.gunshi.project.xyt.model.StPptnRD;
|
||||
import org.apache.ibatis.annotations.Mapper;
|
||||
import org.apache.ibatis.annotations.Param;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
|
|
@ -15,4 +16,8 @@ import java.util.List;
|
|||
public interface StPptnRDMapper extends BaseMapper<StPptnRD> {
|
||||
|
||||
List<StPptnRD> getStcdLastPptnDayData();
|
||||
|
||||
StPptnRD getMaxData(@Param("stcd") String stcd);
|
||||
|
||||
List<StPptnRD> reorganizePptnRDData(@Param("stcd") String stcd, @Param("maxData") StPptnRD maxData);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -101,4 +101,10 @@ public class StPptnR implements Serializable {
|
|||
|
||||
@TableField(exist = false)
|
||||
private Date etm;// 同步的数据的结束时间
|
||||
|
||||
@TableField(exist = false)
|
||||
private String source;// 测站数据源
|
||||
|
||||
@TableField(exist = false)
|
||||
private String sttp;// 测站类型
|
||||
}
|
||||
|
|
|
|||
|
|
@ -158,4 +158,13 @@ public class StRsvrR implements Serializable {
|
|||
@TableField(exist = false)
|
||||
private Date etm;// 同步的数据的结束时间
|
||||
|
||||
@TableField(exist = false)
|
||||
private String source;// 测站数据源
|
||||
|
||||
@TableField(exist = false)
|
||||
private String sttp;// 测站类型
|
||||
|
||||
@TableField(exist = false)
|
||||
private String z;// 山洪河道水位
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -96,6 +96,14 @@ public class StPptnRDService extends ServiceImpl<StPptnRDMapper, StPptnRD>
|
|||
}
|
||||
saveBatch(dList);
|
||||
}
|
||||
|
||||
public StPptnRD getMaxData(String stcd) {
|
||||
return baseMapper.getMaxData(stcd);
|
||||
}
|
||||
|
||||
public List<StPptnRD> reorganizePptnRDData(String stcd, StPptnRD maxData) {
|
||||
return baseMapper.reorganizePptnRDData(stcd, maxData);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -62,8 +62,8 @@ public class DataTask {
|
|||
@Value("${reloadCache}")
|
||||
private String reloadCache;// 清除ip白名单缓存
|
||||
|
||||
public static String jcskPathPptnRoute = "/st_pptn_r";// 雨情接口
|
||||
public static String jcskPathRiverRoute = "/st_rsvr_r";// 水位接口
|
||||
public static String jcskPathPptnRoute = "/xfdbaj/st_pptn_r";// 雨情接口
|
||||
public static String jcskPathRiverRoute = "/xfdbaj/st_rsvr_r";// 水位接口
|
||||
|
||||
public static String hbskpprealstsRoute = "pubapi/hbsk/pprealsts";// 站点实时雨情
|
||||
|
||||
|
|
|
|||
|
|
@ -0,0 +1,304 @@
|
|||
package com.gunshi.project.xyt.timetask;
|
||||
|
||||
import com.alibaba.fastjson2.JSONArray;
|
||||
import com.alibaba.fastjson2.JSONObject;
|
||||
import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
|
||||
import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
|
||||
import com.baomidou.mybatisplus.core.toolkit.ObjectUtils;
|
||||
import com.gunshi.project.xyt.model.StPptnR;
|
||||
import com.gunshi.project.xyt.model.StPptnRD;
|
||||
import com.gunshi.project.xyt.model.StPptnRH;
|
||||
import com.gunshi.project.xyt.model.StRsvrR;
|
||||
import com.gunshi.project.xyt.model.StRsvrRReal;
|
||||
import com.gunshi.project.xyt.service.StPptnRDService;
|
||||
import com.gunshi.project.xyt.service.StPptnRHService;
|
||||
import com.gunshi.project.xyt.service.StPptnRRealService;
|
||||
import com.gunshi.project.xyt.service.StPptnRService;
|
||||
import com.gunshi.project.xyt.service.StRsvrRRealService;
|
||||
import com.gunshi.project.xyt.service.StRsvrRService;
|
||||
import com.gunshi.project.xyt.util.OkHttpUtil;
|
||||
import com.ruoyi.common.utils.StringUtils;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import okhttp3.FormBody;
|
||||
import okhttp3.OkHttpClient;
|
||||
import okhttp3.Request;
|
||||
import okhttp3.Response;
|
||||
import org.springframework.beans.BeanUtils;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.context.annotation.Profile;
|
||||
import org.springframework.scheduling.annotation.Async;
|
||||
import org.springframework.scheduling.annotation.EnableScheduling;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.math.BigDecimal;
|
||||
import java.text.SimpleDateFormat;
|
||||
import java.util.Calendar;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
*@description 檀树岗数据定时任务
|
||||
*@author cxw
|
||||
*@classname DataTaskTSG.java
|
||||
*@create 2024-09-20, 周五, 17:05:22
|
||||
*/
|
||||
@EnableScheduling//开启定时任务
|
||||
@Component
|
||||
@Slf4j
|
||||
@Profile("prod")
|
||||
public class DataTaskTSG {
|
||||
private static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
|
||||
|
||||
private static SimpleDateFormat sdfD = new SimpleDateFormat("yyyy-MM-dd");
|
||||
|
||||
|
||||
@Value("${jcskPath}")
|
||||
private String jcskPath;// 荆楚水库同步地址
|
||||
|
||||
@Value("${jcskToken}")
|
||||
private String jcskToken;// 荆楚水库同步接口token
|
||||
|
||||
@Value("${reloadCache}")
|
||||
private String reloadCache;// 清除ip白名单缓存
|
||||
|
||||
public static String jcskPathPptnRoute = "/pptn/";// 雨情接口,需拼接源(sh、sw、qx)
|
||||
|
||||
public static String jcskPathRrRiverRoute = "/rsvr/";// RR水库水位接口
|
||||
public static String jcskPathZzRiverRoute = "/river/";// ZZ河道水位接口
|
||||
|
||||
// 实时雨情
|
||||
@Autowired
|
||||
private StPptnRRealService stPptnRRealService;
|
||||
|
||||
// 历史雨情
|
||||
@Autowired
|
||||
private StPptnRService stPptnRService;
|
||||
|
||||
// 小时雨情
|
||||
@Autowired
|
||||
private StPptnRHService stPptnRHService;
|
||||
|
||||
// 按天雨情
|
||||
@Autowired
|
||||
private StPptnRDService stPptnRDService;
|
||||
|
||||
// 实时水情
|
||||
@Autowired
|
||||
private StRsvrRRealService stRsvrRRealService;
|
||||
|
||||
// 历史水情
|
||||
@Autowired
|
||||
private StRsvrRService stRsvrRService;
|
||||
|
||||
|
||||
/**
|
||||
* @param
|
||||
* @description: 定时获取水库雨情数据(历史、天、时、real)
|
||||
* @return: void
|
||||
* @auther: cxw
|
||||
* @date: 2024-09-20, 周五, 17:07:23
|
||||
*/
|
||||
@Async
|
||||
@Scheduled(fixedRate = 5, timeUnit = TimeUnit.MINUTES)
|
||||
public void getPptnData() {
|
||||
Date now = new Date();
|
||||
System.out.println("雨情实时定时任务,执行时间:" + sdf.format(now));
|
||||
// 获取水库站点编码
|
||||
List<StPptnR> stcdLastPptnDatas = stPptnRService.getStcdLastPptnData();
|
||||
OkHttpClient client = OkHttpUtil.build();
|
||||
try {
|
||||
if (CollectionUtils.isNotEmpty(stcdLastPptnDatas)) {
|
||||
for (StPptnR stPptnR : stcdLastPptnDatas) {
|
||||
Calendar calendar = Calendar.getInstance();
|
||||
String stcd = stPptnR.getStcd();
|
||||
Date stm = stPptnR.getStm();
|
||||
// 默认30天前
|
||||
if (ObjectUtils.isEmpty(stm)) {
|
||||
if ("sh".equals(stPptnR.getSource().toLowerCase())) {
|
||||
calendar.add(Calendar.DATE, -30);
|
||||
} else {
|
||||
calendar.add(Calendar.DATE, -2);
|
||||
calendar.add(Calendar.HOUR_OF_DAY, -20);
|
||||
}
|
||||
} else {
|
||||
int diffDays = new BigDecimal((now.getTime() - stm.getTime()) / (24 * 60 * 60 * 1000)).intValue();
|
||||
if (diffDays > 3) {
|
||||
calendar.add(Calendar.DATE, -2);
|
||||
calendar.add(Calendar.HOUR_OF_DAY, -20);
|
||||
} else {
|
||||
calendar.setTime(stm);
|
||||
calendar.add(Calendar.MINUTE, 1);
|
||||
}
|
||||
}
|
||||
stm = calendar.getTime();
|
||||
String url = jcskPath + jcskPathPptnRoute + stPptnR.getSource().toLowerCase();
|
||||
Response resp = client.newCall(new Request.Builder().url(url)
|
||||
.post(new FormBody.Builder().add("stcd", stcd)
|
||||
.add("stm", sdf.format(stm))
|
||||
.add("etm", sdf.format(now))
|
||||
.build())
|
||||
.header("Token", jcskToken)
|
||||
.addHeader("Content-Type", "application/x-www-form-urlencoded")
|
||||
.build()).execute();
|
||||
String respStr = resp.body().string();
|
||||
JSONObject jsonObject = JSONObject.parseObject(respStr.toLowerCase());
|
||||
// 清除同步平台的ip白名单缓存
|
||||
if ("401".equals(jsonObject.get("code").toString())) {
|
||||
OkHttpClient clientCache = new OkHttpClient();
|
||||
Request request = new Request.Builder().url(reloadCache).build();
|
||||
clientCache.newCall(request).execute();
|
||||
}
|
||||
JSONArray data = jsonObject.getJSONArray("data");
|
||||
if (ObjectUtils.isNotEmpty(data)) {
|
||||
List<StPptnR> rlist = data.toJavaList(StPptnR.class);
|
||||
if (CollectionUtils.isNotEmpty(rlist)) {
|
||||
// 全部替换为stcd,并去重
|
||||
rlist = rlist.stream().peek(entity -> entity.setStcd(stPptnR.getStcd())).collect(Collectors.toMap(
|
||||
e -> e.getStcd().toUpperCase() + "_" + e.getTm(), // 使用属性组合作为键
|
||||
Function.identity(),
|
||||
(existing, replacement) -> existing // 如果有冲突,保留现有的
|
||||
)).values().stream().collect(Collectors.toList());
|
||||
stPptnRService.saveBatch(rlist);
|
||||
// 更新到实时数据表
|
||||
stPptnRRealService.updatePptnRReal(rlist.get(0).getStcd());
|
||||
// 整编降雨量天表数据
|
||||
StPptnRD maxDataD = stPptnRDService.getMaxData(stPptnR.getStcd());
|
||||
List<StPptnRD> listD = stPptnRDService.reorganizePptnRDData(stPptnR.getStcd(), maxDataD);
|
||||
if (CollectionUtils.isNotEmpty(listD)) {
|
||||
StPptnRD LastData = listD.get(0);
|
||||
if (LastData.getTm().equals(maxDataD.getTm())) {
|
||||
listD.remove(0);
|
||||
UpdateWrapper<StPptnRD> updateWrapper = new UpdateWrapper<>();
|
||||
updateWrapper.eq("stcd", LastData.getStcd()).eq("tm", LastData.getTm());
|
||||
LastData.setDrp(LastData.getDrp());
|
||||
LastData.setYear(Integer.valueOf(sdfD.format(LastData.getTm()).substring(0, 4)));
|
||||
stPptnRDService.saveOrUpdate(LastData, updateWrapper);
|
||||
}
|
||||
if (listD.size() > 0) {
|
||||
listD = listD.stream()
|
||||
.map(user -> {
|
||||
user.setYear(Integer.valueOf(sdfD.format(user.getTm()).substring(0, 4))); // 修改属性值
|
||||
return user;
|
||||
}).collect(Collectors.toList());
|
||||
stPptnRDService.saveBatch(listD);
|
||||
}
|
||||
}
|
||||
// 整编降雨量小时表数据
|
||||
StPptnRH maxDataH = stPptnRHService.getMaxData(stPptnR.getStcd());
|
||||
List<StPptnRH> listH = stPptnRHService.reorganizePptnRHData(stPptnR.getStcd(), maxDataH);
|
||||
if (CollectionUtils.isNotEmpty(listH)) {
|
||||
StPptnRH LastData = listH.get(0);
|
||||
if (LastData.getTm().equals(maxDataH.getTm())) {
|
||||
listH.remove(0);
|
||||
UpdateWrapper<StPptnRH> updateWrapper = new UpdateWrapper<>();
|
||||
updateWrapper.eq("stcd", LastData.getStcd()).eq("tm", LastData.getTm());
|
||||
LastData.setDrp(LastData.getDrp());
|
||||
LastData.setChtm(maxDataH.getChtm());
|
||||
stPptnRHService.saveOrUpdate(LastData, updateWrapper);
|
||||
}
|
||||
if (listH.size() > 0) {
|
||||
stPptnRHService.saveBatch(listH);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (IOException e) {
|
||||
log.error("荆楚水库雨情定时任务失败:", e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @description: 定时获取水库水情数据(历史、real)
|
||||
* @param
|
||||
* @return: void
|
||||
* @auther: cxw
|
||||
* @date: 2024-09-23, 周一, 11:23:04
|
||||
*/
|
||||
@Async
|
||||
@Scheduled(fixedRate = 5, timeUnit = TimeUnit.MINUTES)
|
||||
public void getRiverData() {
|
||||
Date now = new Date();
|
||||
System.out.println("水情历史定时任务,执行时间:" + sdf.format(now));
|
||||
// 获取水库站点编码历史水情表中站点最新数据时间
|
||||
List<StRsvrR> stcdLast = stRsvrRService.getStcdLastRsvrData();
|
||||
OkHttpClient client = OkHttpUtil.build();
|
||||
try {
|
||||
if (CollectionUtils.isNotEmpty(stcdLast)) {
|
||||
for (StRsvrR stRsvrR : stcdLast) {
|
||||
Date stm = stRsvrR.getStm();
|
||||
String source = stRsvrR.getSource().toLowerCase();
|
||||
String sttp = stRsvrR.getSttp().toLowerCase();
|
||||
Calendar calendar = Calendar.getInstance();
|
||||
// 默认40天前(接口最多40天),存在则加1小时(接口是按小时算且大于等于开始时间)
|
||||
// 默认30天前
|
||||
if (ObjectUtils.isEmpty(stm)) {
|
||||
if ("sh".equals(source)) {
|
||||
calendar.add(Calendar.DATE, -30);
|
||||
} else {
|
||||
calendar.add(Calendar.DATE, -2);
|
||||
calendar.add(Calendar.HOUR_OF_DAY, -20);
|
||||
}
|
||||
} else {
|
||||
int diffDays = new BigDecimal((now.getTime() - stm.getTime()) / (24 * 60 * 60 * 1000)).intValue();
|
||||
if (diffDays > 3) {
|
||||
calendar.add(Calendar.DATE, -2);
|
||||
calendar.add(Calendar.HOUR_OF_DAY, -20);
|
||||
} else {
|
||||
calendar.setTime(stm);
|
||||
calendar.add(Calendar.MINUTE, 1);
|
||||
}
|
||||
}
|
||||
stm = calendar.getTime();
|
||||
String url = jcskPath;
|
||||
if ("rr".equals(sttp)) {
|
||||
url += jcskPathRrRiverRoute + source;
|
||||
} else if ("zz".equals(sttp)) {
|
||||
url += jcskPathZzRiverRoute + source;
|
||||
} else {
|
||||
continue;
|
||||
}
|
||||
Response resp = client.newCall(new Request.Builder().url(url)
|
||||
.post(new FormBody.Builder().add("stcd", stRsvrR.getStcd())
|
||||
.add("stm", sdf.format(stm))
|
||||
.add("etm", sdf.format(now))
|
||||
.build())
|
||||
.header("Token", jcskToken)
|
||||
.addHeader("Content-Type", "application/x-www-form-urlencoded")
|
||||
.build()).execute();
|
||||
String respStr = resp.body().string();
|
||||
JSONObject jsonObject = JSONObject.parseObject(respStr.toLowerCase());
|
||||
if (!"200".equals(jsonObject.get("code").toString())) {
|
||||
continue;
|
||||
}
|
||||
JSONArray data = jsonObject.getJSONArray("data");
|
||||
if (CollectionUtils.isNotEmpty(data)) {
|
||||
List<StRsvrR> rlist = data.toJavaList(StRsvrR.class);
|
||||
for (StRsvrR rsvrR : rlist) {
|
||||
if (StringUtils.isEmpty(rsvrR.getRz()) && StringUtils.isNotEmpty(rsvrR.getZ())) {
|
||||
rsvrR.setRz(rsvrR.getZ());
|
||||
}
|
||||
}
|
||||
stRsvrRService.saveBatch(rlist);
|
||||
// 更新实时表数据
|
||||
StRsvrR stRsvrRLast = rlist.get(rlist.size() - 1);
|
||||
StRsvrRReal stRsvrRReal = new StRsvrRReal();
|
||||
UpdateWrapper<StRsvrRReal> updateWrapper = new UpdateWrapper<>();
|
||||
updateWrapper.eq("stcd", stRsvrRLast.getStcd()).eq("tm", stRsvrRLast.getTm());
|
||||
BeanUtils.copyProperties(stRsvrRLast, stRsvrRReal);
|
||||
stRsvrRRealService.saveOrUpdate(stRsvrRReal, updateWrapper);
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (IOException e) {
|
||||
log.error("水情历史定时任务错误:", e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -30,8 +30,8 @@ shqxjsCloudowrCnPath: http://shqxjs.cloudowr.cn/service/
|
|||
|
||||
owrsvrPath: http://owrsvr.cloudowr.cn/
|
||||
|
||||
jcskPath: http://223.75.53.124:8002/shareddata/api/v1/monitdata/xfdbaj
|
||||
jcskToken: 65E30623C28021BDC9400E2552FF88EB345FC5091BBE53C0650C489709C44075B13DD33924FEE38B7DFE9633B68E941126AB16D8BE875B480193141D9B8B8EB8
|
||||
jcskPath: http://223.75.53.124:8002/shareddata/api/v1/monitdata
|
||||
jcskToken: FB1EE57468E0CB9A51306F9056A534778235BF27CBDCB8546B7EABA6FB72BBCBEE4BB01A9CBD8C3899E682C67167C02D81FDABD21639DE2655EB4EE567391899
|
||||
|
||||
reloadCache: http://223.75.53.124:8002/shareddata/sys/whitelists/reloadCache
|
||||
|
||||
|
|
|
|||
|
|
@ -12,4 +12,22 @@
|
|||
WHERE subquery.rn = 1) r ON stb.stcd = r.stcd
|
||||
WHERE stb.source = 'SK'
|
||||
</select>
|
||||
|
||||
<select id="getMaxData" resultType="com.gunshi.project.xyt.model.StPptnRD">
|
||||
SELECT #{stcd} stcd, COALESCE(max(tm), '2021-01-01 00:00:00') tm FROM st_pptn_r_d
|
||||
</select>
|
||||
|
||||
<select id="reorganizePptnRDData" resultType="com.gunshi.project.xyt.model.StPptnRD">
|
||||
select d.stcd, to_char(d.day_date, 'YYYY-MM-DD 00:00:00') tm, d.drp
|
||||
from (SELECT stcd,
|
||||
-- 计算分组键:直接调整时间戳以8点为基准
|
||||
DATE_TRUNC('day', tm - INTERVAL '8 hour' - INTERVAL '1 second') + INTERVAL '8 hour' AS day_date, sum (drp) AS drp
|
||||
FROM
|
||||
st_pptn_r
|
||||
WHERE stcd = #{stcd} AND tm >= #{maxData.tm}
|
||||
GROUP BY
|
||||
day_date, stcd
|
||||
ORDER BY
|
||||
day_date desc) d
|
||||
</select>
|
||||
</mapper>
|
||||
|
|
|
|||
|
|
@ -38,7 +38,7 @@
|
|||
</select>
|
||||
|
||||
<select id="getMaxData" resultType="com.gunshi.project.xyt.model.StPptnRH">
|
||||
SELECT #{stcd} stcd, COALESCE(max(tm), '2021-01-01 00:00:00') tm, MAX ( chtm ) chtm FROM st_pptn_r_h
|
||||
SELECT #{stcd} stcd, COALESCE(max(tm), '2021-01-01 00:00:00') tm, MAX ( chtm ) chtm FROM st_pptn_r_h WHERE stcd = #{stcd}
|
||||
</select>
|
||||
|
||||
<select id="queryDayRz" resultType="com.gunshi.project.xyt.model.StPptnRD">
|
||||
|
|
|
|||
|
|
@ -4,12 +4,14 @@
|
|||
|
||||
<select id="getStcdLastPptnData" resultType="com.gunshi.project.xyt.model.StPptnR">
|
||||
SELECT stb.stcd,
|
||||
r.tm stm
|
||||
r.tm stm,
|
||||
stb.source,
|
||||
stb.sttp
|
||||
FROM public.st_stbprp_b stb
|
||||
LEFT JOIN (SELECT *
|
||||
FROM (SELECT *, ROW_NUMBER() OVER ( PARTITION BY stcd ORDER BY tm DESC ) AS rn FROM public.st_pptn_r) subquery
|
||||
WHERE subquery.rn = 1) r ON stb.stcd = r.stcd
|
||||
WHERE stb.source = 'SK'
|
||||
WHERE stb.source in ('QX', 'SW', 'SH')
|
||||
</select>
|
||||
|
||||
<select id="getPptnRDataList" resultType="java.util.Map">
|
||||
|
|
|
|||
|
|
@ -4,11 +4,13 @@
|
|||
|
||||
<select id="getStcdLastRsvrData" resultType="com.gunshi.project.xyt.model.StRsvrR">
|
||||
SELECT stb.stcd,
|
||||
r.tm stm
|
||||
r.tm stm,
|
||||
stb.source,
|
||||
stb.sttp
|
||||
FROM public.st_stbprp_b stb
|
||||
LEFT JOIN (SELECT *
|
||||
FROM (SELECT *, ROW_NUMBER() OVER ( PARTITION BY stcd ORDER BY tm DESC ) AS rn FROM public.st_rsvr_r) subquery
|
||||
WHERE subquery.rn = 1) r ON stb.stcd = r.stcd
|
||||
WHERE stb.source = 'SK'
|
||||
WHERE stb.source in ('QX', 'SW', 'SH')
|
||||
</select>
|
||||
</mapper>
|
||||
|
|
|
|||
Loading…
Reference in New Issue