package com.gunshi.project.ss.timetask; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.gunshi.db.dto.DateTimeRangeSo; import com.gunshi.project.ss.common.model.*; import com.gunshi.project.ss.common.model.so.OsmoticQuerySo; import com.gunshi.project.ss.common.model.vo.OsmoticShiftValueVo2; import com.gunshi.project.ss.common.util.LocalDateTimeConverter; import com.gunshi.project.ss.entity.dto.ProjectSafeCalculateDto; import com.gunshi.project.ss.entity.vo.ProjectSafeCalculateVo; import com.gunshi.project.ss.mapper.JcskGnssREightAmMapper; import com.gunshi.project.ss.common.mapper.JcskSlBMapper; import com.gunshi.project.ss.mapper.JcskSlREightAmMapper; import com.gunshi.project.ss.mapper.JcskSyREightAmMapper; import com.gunshi.project.ss.model.*; import com.gunshi.project.ss.service.*; import com.gunshi.project.ss.util.DateTransforUtil; import com.gunshi.project.ss.util.DateUtil; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeanUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Profile; import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.stereotype.Component; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.*; import java.util.stream.Collectors; @EnableScheduling//开启定时任务 @Component @Slf4j @Profile({"prod","dev","local"}) public class JcskDataTask { @Autowired private JcskGnssREightAmMapper jcskGnssREightAmMapper; @Autowired private JcskGnssRService jcskGnssRService; @Autowired private JcskGnssBService service; @Autowired private JcskSyREightAmMapper jcskSyREightAmMapper; @Autowired private JcskSyRService jcskSyRService; @Autowired private JcskSyBService jcskSyBService; @Autowired private JcskSlBMapper jcskSlBMapper; @Autowired private JcskSlREightAmMapper jcskSlREightAmMapper; @Autowired private JcskSlBService jcskSlBService; @Autowired private OsmoticWarnRuleService osmoticWarnRuleService; @Autowired private JcskSlRService jcskSlRService; private static final Integer PRESS = 1; public static final Integer FLOW = 2; public static final Integer GNSS = 3; // @Scheduled(cron = "0 30 8 * * ?") @Async public void calculate(){ List dvcds = jcskSyBService.getAllDvcd(); for (String dvcd : dvcds) { ProjectSafeCalculateDto dto = new ProjectSafeCalculateDto(); LocalDateTime now = LocalDateTime.now().withHour(23).withMinute(59).withSecond(59).withNano(0); LocalDateTime lastYear = now.minusYears(1); lastYear = lastYear.plusDays(1).withHour(0).withMinute(0).withSecond(0).withNano(0); DateTimeRangeSo dateTimeRangeSo = new DateTimeRangeSo(); dateTimeRangeSo.setStart(LocalDateTimeConverter.toDate(lastYear)); dateTimeRangeSo.setEnd(LocalDateTimeConverter.toDate(now)); dto.setDateTimeRangeSo(dateTimeRangeSo); dto.setDvcd(dvcd); ProjectSafeCalculateVo ans = jcskSyRService.calculate(dto); } } // /** // * 每5分钟扫描渗流。渗压、位移表 生成预警 // */ // @Scheduled(cron = "0 */30 * * * ?") // @Async public void syncWarn(){ // 获取昨天的时间 Date yesterday = getYesterday(); Date syMaxTm = osmoticWarnRuleService.queryMaxTmByType(PRESS); if(syMaxTm==null){ syMaxTm = yesterday; } Date slMaxTm = osmoticWarnRuleService.queryMaxTmByType(FLOW); if(slMaxTm==null){ slMaxTm = yesterday; } Date gnssMaxTm = osmoticWarnRuleService.queryMaxTmByType(GNSS); if(gnssMaxTm==null){ gnssMaxTm = yesterday; } List syList = jcskSyRService.lambdaQuery().gt(JcskSyR::getMstm, syMaxTm).list(); List slList = jcskSlRService.lambdaQuery().gt(JcskSlR::getMstm, slMaxTm).list(); List gnssList = jcskGnssRService.lambdaQuery().gt(JcskGnssR::getTm, gnssMaxTm).list(); for (JcskSyR jcskSyR : syList) { osmoticWarnRuleService.checkWarn(jcskSyR); } for (JcskSlR jcskSlR : slList) { osmoticWarnRuleService.checkWarn(jcskSlR); } for (JcskGnssR jcskGnssR : gnssList) { osmoticWarnRuleService.checkWarn(jcskGnssR); } } private Date getYesterday() { Calendar calendar = Calendar.getInstance(); calendar.add(Calendar.DAY_OF_YEAR, -1); return calendar.getTime(); } /** * 同步渗流每日八点的数据 */ // @Scheduled(cron = "*/10 * * * * ?") // @Scheduled(cron = "0 17 8 * * ?") // @Async public void syncSlREightAmData() { OsmoticQuerySo osmoticQuerySo = new OsmoticQuerySo(); DateTimeRangeSo so = new DateTimeRangeSo(); List list = jcskSlBService.lambdaQuery().list(); List cds = list.stream().map(JcskSlB::getDvcd).collect(Collectors.toList());//获取所有的测点编码(仪器编号) osmoticQuerySo.setStationCodes(cds); //获取整编表数量 Long count = jcskSlREightAmMapper.selectCount(null); List saveDatas = new ArrayList<>(); List> syncData = new ArrayList<>(); if (count > 0) { //增量同步 //取最新的一个数据的时间,按每个测站测点分别同步 for (JcskSlB jcskSlB : list) { String mpcd = jcskSlB.getMpcd(); String dvcd = jcskSlB.getDvcd(); osmoticQuerySo.setStationCodes(Arrays.asList(dvcd)); LambdaQueryWrapper wrapper = new LambdaQueryWrapper<>(); wrapper.eq(JcskSlREightAm::getMpcd, mpcd) .orderByDesc(JcskSlREightAm::getMstm).last("limit 1"); JcskSlREightAm jcskGnssREightAm = jcskSlREightAmMapper.selectOne(wrapper); Calendar calendar = Calendar.getInstance(); if (jcskGnssREightAm != null) { // 如果存在最新数据,从最新时间的第二天开始同步 LocalDateTime latestDate = jcskGnssREightAm.getMstm(); calendar.setTime(DateTransforUtil.transforLocalDateTimeToDate(latestDate)); calendar.add(Calendar.DAY_OF_MONTH, 1); // 加1天 } else { // 如果为空,设置同步时间为前一个星期 calendar.add(Calendar.DAY_OF_MONTH, -7); // 减7天 } // 设置时间范围为8:00:00开始 calendar.set(Calendar.HOUR_OF_DAY, 8); calendar.set(Calendar.MINUTE, 0); calendar.set(Calendar.SECOND, 0); Date start = calendar.getTime(); so.setStart(start); so.setEnd(new Date()); osmoticQuerySo.setDateTimeRangeSo(so); List osmoticValueVo2s = jcskSlBMapper.syncqueryValue(osmoticQuerySo); if(!osmoticValueVo2s.isEmpty()){ syncData.add(osmoticValueVo2s); } } } else { //全量同步 osmoticQuerySo.setStationCodes(cds); List osmoticValueVo2s = jcskSlBMapper.syncqueryValue(osmoticQuerySo); if(!osmoticValueVo2s.isEmpty()){ syncData.add(osmoticValueVo2s); } } // 使用 DateTimeFormatter 替代 SimpleDateFormat DateTimeFormatter formatter1 = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); DateTimeFormatter formatter2 = DateTimeFormatter.ofPattern("yyyy-MM-dd"); DateTimeFormatter formatter3 = DateTimeFormatter.ofPattern("HH:mm"); if(syncData.size() > 0){ for (List osmoticValueVo2s : syncData) { //筛选出每天每个测点8-9点的数据 List closestTo8AMList = osmoticValueVo2s.stream() .filter(vo -> { // 使用 DateTimeFormatter 格式化 LocalDateTime String timeStr = vo.getMstm().format(formatter3); return timeStr.compareTo("08:00") >= 0 && timeStr.compareTo("09:00") <=0; }) .collect(Collectors.groupingBy(vo -> { // 分组键:日期(yyyy-MM-dd) + 测站编码 + 测点编码 String date = vo.getMstm().format(formatter2); return date + "_" + vo.getMpcd(); })) .values().stream() .map(group -> group.stream() .min((vo1, vo2) -> { String time1 = vo1.getMstm().format(formatter3); String time2 = vo2.getMstm().format(formatter3); // 计算与08:00的时间差(分钟数) int diff1 = Math.abs( Integer.parseInt(time1.substring(0, 2)) * 60 + Integer.parseInt(time1.substring(3, 5)) - 8 * 60 ); int diff2 = Math.abs( Integer.parseInt(time2.substring(0, 2)) * 60 + Integer.parseInt(time2.substring(3, 5)) - 8 * 60 ); //获取最接近于八点的数据 return Integer.compare(diff1, diff2); }) .orElse(null) ) .filter(Objects::nonNull) .collect(Collectors.toList()); // 处理筛选后的数据 for (JcskSlR data : closestTo8AMList) { JcskSlREightAm entity = new JcskSlREightAm(); BeanUtils.copyProperties(data, entity); saveDatas.add(entity); } if(saveDatas.size() > 0){ jcskSlREightAmMapper.insert(saveDatas); } saveDatas.clear(); } } } /** * 同步渗压每日八点的数据 */ //@Scheduled(cron = "*/10 * * * * ?") // @Scheduled(cron = "0 17 8 * * ?") // @Async public void syncSyREightAmData() { OsmoticQuerySo osmoticQuerySo = new OsmoticQuerySo(); DateTimeRangeSo so = new DateTimeRangeSo(); List list = jcskSyBService.lambdaQuery().list(); List cds = list.stream().map(JcskSyB::getDvcd).collect(Collectors.toList()); //获取整编表数量 Long count = jcskSyREightAmMapper.selectCount(null); List saveDatas = new ArrayList<>(); List> syncData = new ArrayList<>(); if (count > 0) { //增量同步 //取最新的一个数据的时间 for (JcskSyB jcskSyB : list) { String stcd = jcskSyB.getStcd(); String mpcd = jcskSyB.getMpcd(); String dvcd = jcskSyB.getDvcd(); osmoticQuerySo.setStationCodes(Arrays.asList(dvcd)); LambdaQueryWrapper wrapper = new LambdaQueryWrapper<>(); wrapper.eq(JcskSyREightAm::getStcd, stcd) .eq(JcskSyREightAm::getMpcd, mpcd) .orderByDesc(JcskSyREightAm::getMstm).last("limit 1"); JcskSyREightAm jcskGnssREightAm = jcskSyREightAmMapper.selectOne(wrapper); LocalDateTime startTime; if (jcskGnssREightAm != null) { // 如果存在最新数据,从最新时间的第二天开始同步 startTime = jcskGnssREightAm.getMstm().plusDays(1); // 加1天 } else { // 如果为空,设置同步时间为前一个星期 startTime = LocalDateTime.now().minusDays(7); // 减7天 } // 设置时间范围为当天的8:00:00开始 startTime = startTime.withHour(8).withMinute(0).withSecond(0).withNano(0); so.setStart(DateTransforUtil.transforLocalDateTimeToDate(startTime)); so.setEnd(DateTransforUtil.transforLocalDateTimeToDate(LocalDateTime.now())); osmoticQuerySo.setDateTimeRangeSo(so); List osmoticValueVo2s = jcskSyRService.getBaseMapper().syncqueryValue(osmoticQuerySo); if(!osmoticValueVo2s.isEmpty()){ syncData.add(osmoticValueVo2s); } } } else { //全量同步 osmoticQuerySo.setStationCodes(cds); List osmoticValueVo2s = jcskSyRService.getBaseMapper().syncqueryValue(osmoticQuerySo); if(!osmoticValueVo2s.isEmpty()){ syncData.add(osmoticValueVo2s); } } // 使用DateTimeFormatter替代SimpleDateFormat DateTimeFormatter dateFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd"); DateTimeFormatter timeFormatter = DateTimeFormatter.ofPattern("HH:mm"); if(!syncData.isEmpty()){ for (List osmoticValueVo2s : syncData) { //筛选出每天每个测点8-9点的数据 List closestTo8AMList = osmoticValueVo2s.stream() .filter(vo -> { // 使用LocalDateTime直接获取小时和分钟 int hour = vo.getMstm().getHour(); int minute = vo.getMstm().getMinute(); // 过滤8:00到9:00之间的数据 return (hour == 8 && minute >= 0) || (hour == 9 && minute == 0); }) .collect(Collectors.groupingBy(vo -> { // 分组键:日期(yyyy-MM-dd) + 测站编码 + 测点编码 String date = vo.getMstm().format(dateFormatter); return date + "_" + vo.getStcd() + "_" + vo.getMpcd(); })) .values().stream() .map(group -> group.stream() .min((vo1, vo2) -> { // 计算与08:00的时间差(分钟数) int diff1 = Math.abs( vo1.getMstm().getHour() * 60 + vo1.getMstm().getMinute() - 8 * 60 ); int diff2 = Math.abs( vo2.getMstm().getHour() * 60 + vo2.getMstm().getMinute() - 8 * 60 ); //获取最接近于八点的数据 return Integer.compare(diff1, diff2); }) .orElse(null) ) .filter(Objects::nonNull) .collect(Collectors.toList()); // 处理筛选后的数据 for (JcskSyR data : closestTo8AMList) { JcskSyREightAm entity = new JcskSyREightAm(); BeanUtils.copyProperties(data, entity); entity.setMstm(data.getMstm()); // 直接设置LocalDateTime saveDatas.add(entity); } if(!saveDatas.isEmpty()){ // 批量插入,注意检查MyBatis-Plus是否支持批量插入 for (JcskSyREightAm entity : saveDatas) { jcskSyREightAmMapper.insert(entity); } // 或者使用批量插入方法(如果配置了) // jcskSyREightAmMapper.insertBatch(saveDatas); } saveDatas.clear(); } } } /** * 同步位移监测每天最接近八点的数据 */ //@Scheduled(cron = "*/30 * * * * ?") // @Async // @Scheduled(cron = "0 17 8 * * ?") public void syncGnssREightAmData() { OsmoticQuerySo osmoticQuerySo = new OsmoticQuerySo(); DateTimeRangeSo so = new DateTimeRangeSo(); List list = service.lambdaQuery().list(); List cds = list.stream().map(JcskGnssB::getCd).collect(Collectors.toList());//获取所有的测点编码 //获取整编表数量 Long count = jcskGnssREightAmMapper.selectCount(null); List saveDatas = new ArrayList<>(); List> syncData = new ArrayList<>(); if (count > 0) { //增量同步 //取最新的一个数据的时间 //获取每个cd的最新时间按cd进行同步 for (String cd : cds) { LambdaQueryWrapper wrapper = new LambdaQueryWrapper<>(); wrapper.eq(JcskGnssREightAm::getCd, cd) .orderByDesc(JcskGnssREightAm::getTm).last("limit 1"); osmoticQuerySo.setStationCodes(Arrays.asList(cd)); JcskGnssREightAm jcskGnssREightAm = jcskGnssREightAmMapper.selectOne(wrapper); Calendar calendar = Calendar.getInstance(); if(jcskGnssREightAm != null){ // 如果有历史数据,从最新时间的第二天开始同步 String latestTm = jcskGnssREightAm.getTm(); Date latestDate = DateUtil.convertStringToDate(latestTm); calendar.setTime(latestDate); calendar.add(Calendar.DAY_OF_MONTH, 1); // 加1天 } else { // 如果没有同步过数据,同步一个星期以前的数据 calendar.add(Calendar.DAY_OF_MONTH, -7); // 减7天 } // 设置时间范围为8:00:00开始 calendar.set(Calendar.HOUR_OF_DAY, 8); calendar.set(Calendar.MINUTE, 0); calendar.set(Calendar.SECOND, 0); Date start = calendar.getTime(); so.setStart(start); so.setEnd(new Date()); osmoticQuerySo.setDateTimeRangeSo(so); List shiftValueVo2s = jcskGnssRService.getBaseMapper().queryValue(osmoticQuerySo); if(!shiftValueVo2s.isEmpty()){ syncData.add(shiftValueVo2s); } } } else { //全量同步 osmoticQuerySo.setStationCodes(cds); List shiftValueVo2s = jcskGnssRService.getBaseMapper().queryValue(osmoticQuerySo); if(!shiftValueVo2s.isEmpty()){ syncData.add(shiftValueVo2s); } } if(syncData.size()>0){ //筛选出每天每个测点8-9点的数据 for (List shiftValueVo2s : syncData) { List closestTo8AMList = shiftValueVo2s.stream() .filter(vo -> { // 假设OsmoticShiftValueVo2中有getTm方法返回时间字符串 String timePart = vo.getTm().substring(11, 16); // 提取HH:MM部分 return timePart.compareTo("08:00") >= 0 && timePart.compareTo("09:00") <=0; }) .collect(Collectors.groupingBy(vo -> { // 分组键:日期(yyyy-MM-dd) + 测点编码 // 假设OsmoticShiftValueVo2中有getCd方法返回测点编码 String date = vo.getTm().substring(0, 10); // 提取日期部分 return date + "_" + vo.getCd(); })) .values().stream() .map(group -> group.stream() .min((vo1, vo2) -> { String time1 = vo1.getTm().substring(11, 16); String time2 = vo2.getTm().substring(11, 16); // 计算与08:00的时间差(分钟数) int diff1 = Math.abs( Integer.parseInt(time1.substring(0, 2)) * 60 + Integer.parseInt(time1.substring(3, 5)) - 8 * 60 ); int diff2 = Math.abs( Integer.parseInt(time2.substring(0, 2)) * 60 + Integer.parseInt(time2.substring(3, 5)) - 8 * 60 ); //获取最接近于八点的数据 return Integer.compare(diff1, diff2); }) .orElse(null) ) .filter(Objects::nonNull) .collect(Collectors.toList()); // 处理筛选后的数据 for (OsmoticShiftValueVo2 data : closestTo8AMList) { JcskGnssREightAm entity = new JcskGnssREightAm(); BeanUtils.copyProperties(data, entity); saveDatas.add(entity); } if(saveDatas.size()>0){ jcskGnssREightAmMapper.insert(saveDatas); } saveDatas.clear(); } } } }