From 9d9f3c4767f6db83f22729ab07ccb8cacd4f4b64 Mon Sep 17 00:00:00 2001 From: wany <13995595726@qq.com> Date: Fri, 21 Nov 2025 11:18:50 +0800 Subject: [PATCH] init --- pom.xml | 188 +++++++++++++++++ src/main/java/com/whdc/YjtApplication.java | 34 ++++ .../com/whdc/controller/WqRController.java | 29 +++ .../com/whdc/controller/WqRealController.java | 36 ++++ src/main/java/com/whdc/entity/QueryDto.java | 20 ++ src/main/java/com/whdc/entity/WqR.java | 50 +++++ src/main/java/com/whdc/entity/WqRReal.java | 49 +++++ src/main/java/com/whdc/mapper/WqRMapper.java | 17 ++ .../java/com/whdc/mapper/WqRealMapper.java | 17 ++ src/main/java/com/whdc/mqtt/Config.java | 9 + src/main/java/com/whdc/mqtt/MQTTConfig.java | 192 ++++++++++++++++++ src/main/java/com/whdc/mqtt/Pub.java | 115 +++++++++++ src/main/java/com/whdc/mqtt/ResultJson.java | 148 ++++++++++++++ src/main/java/com/whdc/mqtt/Sub.java | 133 ++++++++++++ .../java/com/whdc/service/WqRRealService.java | 22 ++ .../java/com/whdc/service/WqRService.java | 20 ++ .../whdc/service/impl/WqRRealServiceImp.java | 73 +++++++ .../com/whdc/service/impl/WqRServiceImp.java | 37 ++++ src/main/resources/application.yml | 35 ++++ 19 files changed, 1224 insertions(+) create mode 100644 pom.xml create mode 100644 src/main/java/com/whdc/YjtApplication.java create mode 100644 src/main/java/com/whdc/controller/WqRController.java create mode 100644 src/main/java/com/whdc/controller/WqRealController.java create mode 100644 src/main/java/com/whdc/entity/QueryDto.java create mode 100644 src/main/java/com/whdc/entity/WqR.java create mode 100644 src/main/java/com/whdc/entity/WqRReal.java create mode 100644 src/main/java/com/whdc/mapper/WqRMapper.java create mode 100644 src/main/java/com/whdc/mapper/WqRealMapper.java create mode 100644 src/main/java/com/whdc/mqtt/Config.java create mode 100644 src/main/java/com/whdc/mqtt/MQTTConfig.java create mode 100644 src/main/java/com/whdc/mqtt/Pub.java create mode 100644 src/main/java/com/whdc/mqtt/ResultJson.java create mode 100644 src/main/java/com/whdc/mqtt/Sub.java create mode 100644 src/main/java/com/whdc/service/WqRRealService.java create mode 100644 src/main/java/com/whdc/service/WqRService.java create mode 100644 src/main/java/com/whdc/service/impl/WqRRealServiceImp.java create mode 100644 src/main/java/com/whdc/service/impl/WqRServiceImp.java create mode 100644 src/main/resources/application.yml diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..2113a28 --- /dev/null +++ b/pom.xml @@ -0,0 +1,188 @@ + + + 4.0.0 + + + org.springframework.boot + spring-boot-starter-parent + 2.7.1 + + + + com.whdc + yjt-service + 1.0 + 颜家台 + + + 8 + 8 + 1.8 + target/release + + + + + + ali-maven + https://maven.aliyun.com/nexus/content/groups/public + + true + + + true + always + fail + + + + + + + org.springframework.boot + spring-boot-starter-web + + + org.springframework.boot + spring-boot-starter-aop + + + com.baomidou + mybatis-plus-boot-starter + 3.5.2 + + + org.springframework.boot + spring-boot-starter-validation + + + org.springframework.boot + spring-boot-starter-data-redis + + + + mysql + mysql-connector-java + 8.0.16 + + + + + + org.projectlombok + lombok + + + + + + com.alibaba + fastjson + 1.2.80 + + + + + org.eclipse.paho + org.eclipse.paho.client.mqttv3 + 1.2.0 + + + + + com.github.jeffreyning + mybatisplus-plus + 1.7.0-RELEASE + + + + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + com.whdc.YjtApplication + + + + + repackage + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/src/main/java/com/whdc/YjtApplication.java b/src/main/java/com/whdc/YjtApplication.java new file mode 100644 index 0000000..ea8c0de --- /dev/null +++ b/src/main/java/com/whdc/YjtApplication.java @@ -0,0 +1,34 @@ +package com.whdc; + +import com.github.jeffreyning.mybatisplus.conf.EnableMPP; +import lombok.extern.slf4j.Slf4j; +import org.mybatis.spring.annotation.MapperScan; +import org.springframework.beans.BeansException; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.scheduling.annotation.EnableScheduling; +import org.springframework.web.servlet.config.annotation.EnableWebMvc; + +/** + * @author 李赛 + * @date 2022-06-26 0:17 + */ +@Slf4j +@EnableMPP +@EnableWebMvc +@EnableScheduling +@SpringBootApplication +@MapperScan("com.whdc.mapper") +public class YjtApplication { + public static void main(String[] args) { + try { + System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>> 启动程序 <<<<<<<<<<<<<<<<<<<<<<<<<<<"); + + SpringApplication.run(YjtApplication.class, args); + } catch (BeansException e) { + e.printStackTrace(); + log.error(e.getMessage(), e); + } + } + +} diff --git a/src/main/java/com/whdc/controller/WqRController.java b/src/main/java/com/whdc/controller/WqRController.java new file mode 100644 index 0000000..eabd6c6 --- /dev/null +++ b/src/main/java/com/whdc/controller/WqRController.java @@ -0,0 +1,29 @@ +package com.whdc.controller; + +import com.whdc.entity.QueryDto; +import com.whdc.entity.WqR; +import com.whdc.mqtt.ResultJson; +import com.whdc.service.WqRService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.validation.annotation.Validated; +import org.springframework.web.bind.annotation.*; + +import javax.annotation.Resource; +import java.util.List; + +@Slf4j +@RestController +@RequestMapping("/wq") +public class WqRController { + + @Resource + private WqRService service; + + + @PostMapping("/list") + public ResultJson> list(@RequestBody @Validated QueryDto queryDto ) { + return ResultJson.ok(service.getList(queryDto)); + + } + +} diff --git a/src/main/java/com/whdc/controller/WqRealController.java b/src/main/java/com/whdc/controller/WqRealController.java new file mode 100644 index 0000000..a7b404b --- /dev/null +++ b/src/main/java/com/whdc/controller/WqRealController.java @@ -0,0 +1,36 @@ +package com.whdc.controller; + +import com.whdc.entity.WqRReal; +import com.whdc.mqtt.ResultJson; +import com.whdc.service.WqRRealService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import javax.annotation.Resource; +import java.util.List; +import java.util.Map; + +@Slf4j +@RestController +@RequestMapping("/wq/real") +public class WqRealController { + + @Resource + private WqRRealService service; + + + @GetMapping("/list") + public ResultJson> list() { + return ResultJson.ok(service.getList()); + + } + + @GetMapping("/online") + public ResultJson< Map>> online() { + return ResultJson.ok(service.online()); + + } + +} diff --git a/src/main/java/com/whdc/entity/QueryDto.java b/src/main/java/com/whdc/entity/QueryDto.java new file mode 100644 index 0000000..63aaebe --- /dev/null +++ b/src/main/java/com/whdc/entity/QueryDto.java @@ -0,0 +1,20 @@ +package com.whdc.entity; + +import com.fasterxml.jackson.annotation.JsonFormat; +import lombok.Data; + +import javax.validation.constraints.NotBlank; +import java.util.Date; + +@Data +public class QueryDto { + + @NotBlank(message = "站点不能为空") + private String mn; + + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8") + private Date startTime; + + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8") + private Date endTime; +} diff --git a/src/main/java/com/whdc/entity/WqR.java b/src/main/java/com/whdc/entity/WqR.java new file mode 100644 index 0000000..da7c6f9 --- /dev/null +++ b/src/main/java/com/whdc/entity/WqR.java @@ -0,0 +1,50 @@ +package com.whdc.entity; + +import com.baomidou.mybatisplus.annotation.TableField; +import com.baomidou.mybatisplus.annotation.TableId; +import com.baomidou.mybatisplus.annotation.TableName; +import com.fasterxml.jackson.annotation.JsonFormat; +import lombok.Data; +import lombok.Getter; +import lombok.Setter; + +import java.math.BigDecimal; +import java.time.LocalDateTime; + +/** + *

+ * 水质信息表 + *

+ * + * @author wanyan + * @since 2023-11-01 + */ +@Getter +@Setter +@TableName("wq_r") +@Data +public class WqR { + + @TableId(value = "mn") + private String mn; + + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8") + @TableField("tm") + private LocalDateTime tm; + + @TableField("cod") + private BigDecimal cod; + + @TableField("ad") + private BigDecimal ad; + + @TableField("ph") + private BigDecimal ph; + + @TableField("p") + private BigDecimal p; + + @TableField("cn") + private String cn; + +} diff --git a/src/main/java/com/whdc/entity/WqRReal.java b/src/main/java/com/whdc/entity/WqRReal.java new file mode 100644 index 0000000..36488bb --- /dev/null +++ b/src/main/java/com/whdc/entity/WqRReal.java @@ -0,0 +1,49 @@ +package com.whdc.entity; + +import com.baomidou.mybatisplus.annotation.TableField; +import com.baomidou.mybatisplus.annotation.TableId; +import com.baomidou.mybatisplus.annotation.TableName; +import com.fasterxml.jackson.annotation.JsonFormat; +import lombok.Data; +import lombok.Getter; +import lombok.Setter; + +import java.math.BigDecimal; +import java.time.LocalDateTime; + +/** + *

+ * 水质实时信息表 + *

+ * + * @author wanyan + * @since 2023-11-01 + */ +@Getter +@Setter +@TableName("wq_r_real") +@Data +public class WqRReal { + + @TableId(value = "mn") + private String mn; + + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8") + @TableField("tm") + private LocalDateTime tm; + + @TableField("cod") + private BigDecimal cod; + + @TableField("ad") + private BigDecimal ad; + + @TableField("ph") + private BigDecimal ph; + + @TableField("p") + private BigDecimal p; + + @TableField("cn") + private String cn; +} diff --git a/src/main/java/com/whdc/mapper/WqRMapper.java b/src/main/java/com/whdc/mapper/WqRMapper.java new file mode 100644 index 0000000..52916a8 --- /dev/null +++ b/src/main/java/com/whdc/mapper/WqRMapper.java @@ -0,0 +1,17 @@ +package com.whdc.mapper; + +import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import com.whdc.entity.WqR; +import org.apache.ibatis.annotations.Mapper; + +/** + *

+ * 水质信息表 Mapper 接口 + *

+ * + * @author wanyan + * @since 2023-11-01 + */ +@Mapper +public interface WqRMapper extends BaseMapper { +} diff --git a/src/main/java/com/whdc/mapper/WqRealMapper.java b/src/main/java/com/whdc/mapper/WqRealMapper.java new file mode 100644 index 0000000..db99d86 --- /dev/null +++ b/src/main/java/com/whdc/mapper/WqRealMapper.java @@ -0,0 +1,17 @@ +package com.whdc.mapper; + +import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import com.whdc.entity.WqRReal; +import org.apache.ibatis.annotations.Mapper; + +/** + *

+ * 水质实时信息表 Mapper 接口 + *

+ * + * @author wanyan + * @since 2023-11-01 + */ +@Mapper +public interface WqRealMapper extends BaseMapper { +} diff --git a/src/main/java/com/whdc/mqtt/Config.java b/src/main/java/com/whdc/mqtt/Config.java new file mode 100644 index 0000000..d8eac28 --- /dev/null +++ b/src/main/java/com/whdc/mqtt/Config.java @@ -0,0 +1,9 @@ +package com.whdc.mqtt; + +public class Config { + public static final String USERNAME = "jiangling"; + public static final char[] PASSWORD = "Gsiot_890".toCharArray(); + + public static final String TOPIC_WQ_R = "/jiangling"; + +} diff --git a/src/main/java/com/whdc/mqtt/MQTTConfig.java b/src/main/java/com/whdc/mqtt/MQTTConfig.java new file mode 100644 index 0000000..9cd8230 --- /dev/null +++ b/src/main/java/com/whdc/mqtt/MQTTConfig.java @@ -0,0 +1,192 @@ +package com.whdc.mqtt; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.whdc.entity.WqR; +import com.whdc.entity.WqRReal; +import com.whdc.mapper.WqRMapper; +import com.whdc.mapper.WqRealMapper; +import lombok.extern.slf4j.Slf4j; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Configuration; + +import javax.annotation.PostConstruct; +import java.math.BigDecimal; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.time.format.DateTimeParseException; + +@Configuration +@Slf4j +public class MQTTConfig { + + @Autowired + private WqRMapper wqRMapper; + + @Autowired + private WqRealMapper wqRealMapper; + + @Value("${mqtt.broker}") + private String broker; + + @Value("${mqtt.clientId}") + private String clientId; + + @Value("${enableMqttListening:true}") // 设置默认值 + private Boolean enable; + + private Sub sub; + + @Autowired + private ObjectMapper objectMapper; + + private static final DateTimeFormatter formatter = + DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + + @PostConstruct + public void initMQTT() { + if (!enable) { + log.info("MQTT监听未启用"); + return; + } + + log.info("初始化MQTT连接 - broker: {}, clientId: {}", broker, clientId); + + try { + sub = new Sub(broker, clientId, Config.TOPIC_WQ_R); + sub.setOnMessageListener(new Sub.OnMessageListener() { + @Override + public void onMessage(String message) { + processMessage(message); + } + }); + + // 延迟连接,确保Spring完全启动 + new Thread(() -> { + try { + log.info("等待10秒后建立MQTT连接..."); + Thread.sleep(10000); + sub.connect(true); + log.info("MQTT连接初始化完成"); + } catch (MqttException e) { + log.error("MQTT连接失败: {}", e.getMessage(), e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.error("MQTT连接线程被中断"); + } + }, "MQTT-Connector").start(); + + } catch (Exception e) { + log.error("初始化MQTT配置失败: {}", e.getMessage(), e); + } + } + + private void processMessage(String message) { + try { + log.debug("收到MQTT消息: {}", message); + + JsonNode jsonNode = objectMapper.readTree(message); + + // 验证必需字段 + if (!jsonNode.has("DataTime") || !jsonNode.has("Mn")) { + log.warn("MQTT消息缺少必需字段: {}", message); + return; + } + + // 解析JSON数据 + String dataTimeStr = jsonNode.get("DataTime").asText(); + String mn = jsonNode.get("Mn").asText(); + + String cn = jsonNode.get("CN").asText(); + + // 解析水质参数 + BigDecimal cod = parseDecimalValue(jsonNode.get("w01018")); + BigDecimal ad = parseDecimalValue(jsonNode.get("w21003")); + BigDecimal p = parseDecimalValue(jsonNode.get("w21011")); + BigDecimal ph = parseDecimalValue(jsonNode.get("w01001")); + + // 转换时间格式 + LocalDateTime dataTime; + try { + dataTime = LocalDateTime.parse(dataTimeStr, formatter); + } catch (DateTimeParseException e) { + log.error("时间格式解析错误: {}, 原始数据: {}", dataTimeStr, message); + return; + } + + // 保存到历史数据表 + saveToWqR(mn, dataTime, cn, cod, ad, p, ph); + + // 更新实时数据表 + updateWqReal(mn, dataTime, cn, cod, ad, p, ph); + + log.info("成功处理水质数据 - 站点: {}, 时间: {}", mn, dataTime); + + } catch (Exception e) { + log.error("处理MQTT消息时发生错误: {}, 原始数据: {}", e.getMessage(), message, e); + } + } + + private void saveToWqR(String mn, LocalDateTime dataTime, String cn, + BigDecimal cod, BigDecimal ad, BigDecimal p, BigDecimal ph) { + try { + WqR wqR = new WqR(); + wqR.setMn(mn); + wqR.setTm(dataTime); + wqR.setCod(cod); + wqR.setAd(ad); + wqR.setP(p); + wqR.setPh(ph); + wqR.setCn(cn); + + wqRMapper.insert(wqR); + log.debug("历史数据保存成功 - 站点: {}", mn); + } catch (Exception e) { + log.error("保存历史数据失败 - 站点: {}, 错误: {}", mn, e.getMessage(), e); + } + } + + private void updateWqReal(String mn, LocalDateTime dataTime, String cn, + BigDecimal cod, BigDecimal ad, BigDecimal p, BigDecimal ph) { + try { + WqRReal wqReal = wqRealMapper.selectById(mn); + if (wqReal == null) { + wqReal = new WqRReal(); + wqReal.setMn(mn); + wqReal.setTm(dataTime); + wqReal.setCod(cod); + wqReal.setAd(ad); + wqReal.setP(p); + wqReal.setPh(ph); + wqReal.setCn(cn); + wqRealMapper.insert(wqReal); + log.debug("新增实时数据 - 站点: {}", mn); + } else { + wqReal.setTm(dataTime); + wqReal.setCod(cod); + wqReal.setAd(ad); + wqReal.setP(p); + wqReal.setPh(ph); + wqReal.setCn(cn); + wqRealMapper.updateById(wqReal); + log.debug("更新实时数据 - 站点: {}", mn); + } + } catch (Exception e) { + log.error("更新实时数据失败 - 站点: {}, 错误: {}", mn, e.getMessage(), e); + } + } + + private BigDecimal parseDecimalValue(JsonNode node) { + if (node != null && !node.isNull() && !node.asText().isEmpty()) { + try { + return new BigDecimal(node.asText()); + } catch (NumberFormatException e) { + log.warn("解析数值失败: {}", node.asText()); + return null; + } + } + return null; + } +} \ No newline at end of file diff --git a/src/main/java/com/whdc/mqtt/Pub.java b/src/main/java/com/whdc/mqtt/Pub.java new file mode 100644 index 0000000..7c6efad --- /dev/null +++ b/src/main/java/com/whdc/mqtt/Pub.java @@ -0,0 +1,115 @@ +package com.whdc.mqtt; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; + +public class Pub { + private final String clientId; + private final String topic; + public static String BROKER; + private final int qos; + private MqttClient client; + + private static final ObjectMapper objectMapper = new ObjectMapper(); + + public Pub(String clientId, String topic) { + this.clientId = clientId; + this.topic = topic; + this.qos = 0; + } + + public Pub(String clientId, String topic, int qos) { + this.clientId = clientId; + this.topic = topic; + this.qos = qos; + } + + public void connect() throws MqttException { + this.client = new MqttClient("tcp://120.24.5.249:3189", "mqttx_2b072a3c", new MemoryPersistence()); + MqttConnectOptions connOpts = new MqttConnectOptions(); + connOpts.setUserName(Config.USERNAME); + connOpts.setPassword(Config.PASSWORD); + connOpts.setCleanSession(true); + this.client.connect(connOpts); + } + + public void close() { + try { + this.client.disconnectForcibly(200); + this.client.close(true); + } catch (MqttException e) { + e.printStackTrace(); + } + } + + public void pub(String msg) throws MqttException { + MqttMessage message = new MqttMessage(msg.getBytes()); + message.setQos(qos); + this.client.publish(topic, message); + } + + public static void main(String[] args) { + try { + // 创建发布者 + Pub publisher = new Pub("mqttx_2b072a3c", Config.TOPIC_WQ_R, 1); + + // 连接到MQTT Broker + publisher.connect(); + System.out.println("MQTT发布者连接成功!"); + + // 创建测试数据 + String[] testData = { + createTestData("2025-11-20 10:00:00", "0000001", "61.3", "0.83", "1.4", "7.2"), + createTestData("2025-11-20 11:00:00", "0000002", "54.2", "0.99", "1.3", "7.5"), +// createTestData("2025-11-19 16:00:00", "0000003", "62.3", "0.78", "1.3", "7.5"), +// createTestData("2025-11-19 17:00:00", "0000001", "59.9", "0.83", "1.3", "7.7"), +// createTestData("2025-11-19 18:00:00", "0000002", "62.5", "0.85", "1.6", "7.8") + }; + + // 发送测试数据 + for (int i = 0; i < testData.length; i++) { + String data = testData[i]; + publisher.pub(data); + System.out.println("发送第 " + (i + 1) + " 条数据: " + data); + + // 间隔2秒发送一条 + Thread.sleep(2000); + } + + System.out.println("所有测试数据发送完成!"); + + // 保持连接,等待用户输入后关闭 + System.out.println("按回车键退出..."); + System.in.read(); + + publisher.close(); + + } catch (Exception e) { + e.printStackTrace(); + } + } + + private static String createTestData(String dataTime, String mn, String cod, String ad, String p, String ph) { + try { + ObjectNode json = objectMapper.createObjectNode(); + json.put("DataTime", dataTime); + json.put("Mn", mn); + json.put("CN", "2061"); + json.put("w01018", cod); // COD + json.put("w21003", ad); // 氨氮 + json.put("w21011", p); // 总磷 + json.put("w01001", ph); // PH + + return json.toString(); + } catch (Exception e) { + throw new RuntimeException("创建测试数据失败", e); + } + } +} + + diff --git a/src/main/java/com/whdc/mqtt/ResultJson.java b/src/main/java/com/whdc/mqtt/ResultJson.java new file mode 100644 index 0000000..4f0255c --- /dev/null +++ b/src/main/java/com/whdc/mqtt/ResultJson.java @@ -0,0 +1,148 @@ +package com.whdc.mqtt; + +import org.springframework.stereotype.Component; + +import java.io.Serializable; + +@Component +@SuppressWarnings("all") +public class ResultJson implements Serializable { + + private static final long serialVersionUID = 1L; + /** + * 成功 + */ + public static final Integer SUCCESS = 200; + public static final String SUCCESS_MSG = "success"; + + /** + * 失败 + */ + public static final Integer FAIL = 900; + public static final String FAIL_MSG = "fail"; + + public static final Integer PARAM_ERROR = 400; // 失败、参数错误等 + public static final Integer UNAUTHORIZED = 401; // 用户验证失败,或者用户验证信息过期 + public static final Integer PERMISSION_DENIED = 403; // 没有权限 + public static final Integer NOT_FOUND = 404; // 未找到资源 + public static final Integer METHOD_NOT_ALLOWED = 405; // 不支持的类型 + public static final Integer NSUPPORTED_MEDIA_TYPE = 415; // 不支持的媒体 + public static final Integer NOT_ALLOWED = 405; // 请求太频繁,同一个用户(token)、同一个url、同样的请求参数,请求间隔小于0.5秒 + public static final Integer SERVER_ERROR = 500; // 后台错误 + public static final Integer SRC_TIMEOUT = 504; // 请求资源超时 + + + private String msg; + + private Integer code; + + private T data; + + + public static ResultJson error() { + return error(FAIL, "未知异常,请联系管理员"); + } + + public static ResultJson error(String msg) { + return error(FAIL, msg); + } + + public static ResultJson error(int code, String msg) { + ResultJson r = new ResultJson(); + r.setCode(code); + r.setMsg(msg); + return r; + } + + public static ResultJson error(int code, String msg, Object data) { + ResultJson r = new ResultJson(); + r.setCode(code); + r.setMsg(msg); + r.setData(data); + return r; + } + + public static ResultJson ok(Integer code, String msg) { + ResultJson r = new ResultJson(); + r.setMsg(msg); + r.setCode(code); + return r; + } + + public static ResultJson ok() { + ResultJson r = new ResultJson(); + r.setCode(SUCCESS); + r.setMsg(SUCCESS_MSG); + return r; + } + + public static ResultJson ok(Object data) { + ResultJson r = new ResultJson(); + r.setCode(SUCCESS); + r.setMsg(SUCCESS_MSG); + r.setData(data); + return r; + } + + public static ResultJson ok(Object data, String msg) { + ResultJson r = new ResultJson(); + r.setData(data); + r.setCode(SUCCESS); + r.setMsg(msg); + return r; + } + + public String getMsg() { + return msg; + } + + public void setMsg(String msg) { + this.msg = msg; + } + + public Integer getCode() { + return code; + } + + public void setCode(Integer code) { + this.code = code; + } + + public T getData() { + return data; + } + + public void setData(T data) { + this.data = data; + } + + public static long getSerialVersionUID() { + return serialVersionUID; + } + + public static Integer getSUCCESS() { + return SUCCESS; + } + + public static String getSuccessMsg() { + return SUCCESS_MSG; + } + + public static Integer getFAIL() { + return FAIL; + } + + public static String getFailMsg() { + return FAIL_MSG; + } + + + @Override + public String toString() { + return "ResultJson{" + + "msg='" + msg + "'" + + ", code=" + code + + ", data=" + data + + '}'; + } +} diff --git a/src/main/java/com/whdc/mqtt/Sub.java b/src/main/java/com/whdc/mqtt/Sub.java new file mode 100644 index 0000000..83c549c --- /dev/null +++ b/src/main/java/com/whdc/mqtt/Sub.java @@ -0,0 +1,133 @@ +package com.whdc.mqtt; + +import org.eclipse.paho.client.mqttv3.*; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class Sub { + private final String clientId; + private final String topic; + private final String broker; + private OnMessageListener onMessageListener; + private MqttClient client; + + public Sub(String broker, String clientId, String topic) { + this.clientId = clientId; + this.topic = topic; + this.broker = broker; + } + + public void setOnMessageListener(OnMessageListener listener) { + this.onMessageListener = listener; + } + + public void connect() throws MqttException { + connect(true); + } + + public void connect(boolean autoReconnect) throws MqttException { + try { + // 使用唯一的客户端ID + String uniqueClientId = this.clientId + "_" + System.currentTimeMillis(); + + this.client = new MqttClient(broker, uniqueClientId, new MemoryPersistence()); + + MqttConnectOptions connOpts = new MqttConnectOptions(); + connOpts.setUserName(Config.USERNAME); + connOpts.setPassword(Config.PASSWORD); // 修复:使用char数组 + connOpts.setCleanSession(true); + connOpts.setAutomaticReconnect(autoReconnect); + connOpts.setConnectionTimeout(30); // 设置连接超时 + connOpts.setKeepAliveInterval(60); // 设置心跳间隔 + + // 设置MQTT版本(重要) + connOpts.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1); + + this.client.setCallback(new MqttCallbackExtended() { + @Override + public void connectComplete(boolean reconnect, String serverURI) { + log.info("MQTT连接{}成功: {}", reconnect ? "重连" : "建立", serverURI); + } + + @Override + public void connectionLost(Throwable cause) { + log.error("MQTT连接丢失: {}", cause != null ? cause.getMessage() : "未知原因"); + if (cause != null) { + cause.printStackTrace(); + } + } + + @Override + public void messageArrived(String topic, MqttMessage mqttMessage) { + try { + byte[] payload = mqttMessage.getPayload(); + String msg = new String(payload, "UTF-8"); + log.debug("收到MQTT消息 - 主题: {}, QoS: {}", topic, mqttMessage.getQos()); + + if (onMessageListener != null) { + onMessageListener.onMessage(msg); + } + } catch (Throwable e) { + log.error("处理MQTT消息时发生错误: {}", e.getMessage(), e); + } + } + + @Override + public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { + // 订阅者不需要实现此方法 + } + }); + + log.info("正在连接到MQTT服务器: {}, 客户端ID: {}", broker, uniqueClientId); + this.client.connect(connOpts); + + // 使用QoS 1确保消息可靠传递 + this.client.subscribe(this.topic, 1); + log.info("成功订阅主题: {}", this.topic); + + } catch (MqttException e) { + log.error("MQTT连接失败: {}, 错误代码: {}", e.getMessage(), e.getReasonCode()); + throw e; + } + } + + public void close() { + try { + if (this.client != null && this.client.isConnected()) { + this.client.disconnect(); + this.client.close(); + log.info("MQTT客户端已关闭"); + } + } catch (MqttException e) { + log.error("关闭MQTT客户端时发生错误: {}", e.getMessage()); + } + } + + public boolean isConnected() { + return this.client != null && this.client.isConnected(); + } + + public interface OnMessageListener { + void onMessage(String message); + } + + public static void main(String[] args) { + try { + Sub sub = new Sub("tcp://120.24.5.249:3189", "test_client", Config.TOPIC_WQ_R); + sub.setOnMessageListener(new OnMessageListener() { + @Override + public void onMessage(String message) { + System.out.println("收到消息: " + message); + } + }); + sub.connect(true); + + // 保持程序运行 + Thread.sleep(300000); // 5分钟 + sub.close(); + } catch (Exception e) { + e.printStackTrace(); + } + } +} \ No newline at end of file diff --git a/src/main/java/com/whdc/service/WqRRealService.java b/src/main/java/com/whdc/service/WqRRealService.java new file mode 100644 index 0000000..d8530de --- /dev/null +++ b/src/main/java/com/whdc/service/WqRRealService.java @@ -0,0 +1,22 @@ +package com.whdc.service; + +import com.baomidou.mybatisplus.extension.service.IService; +import com.whdc.entity.WqRReal; + +import java.util.List; +import java.util.Map; + +/** + *

+ * 水质信息表 服务类 + *

+ * + * @author wanyan + * @since 2023-11-01 + */ +public interface WqRRealService extends IService { + + List getList(); + + Map> online(); +} diff --git a/src/main/java/com/whdc/service/WqRService.java b/src/main/java/com/whdc/service/WqRService.java new file mode 100644 index 0000000..383ca57 --- /dev/null +++ b/src/main/java/com/whdc/service/WqRService.java @@ -0,0 +1,20 @@ +package com.whdc.service; + +import com.baomidou.mybatisplus.extension.service.IService; +import com.whdc.entity.QueryDto; +import com.whdc.entity.WqR; + +import java.util.List; + +/** + *

+ * 水质信息表 服务类 + *

+ * + * @author wanyan + * @since 2023-11-01 + */ +public interface WqRService extends IService { + + List getList(QueryDto queryDto); +} diff --git a/src/main/java/com/whdc/service/impl/WqRRealServiceImp.java b/src/main/java/com/whdc/service/impl/WqRRealServiceImp.java new file mode 100644 index 0000000..42d8855 --- /dev/null +++ b/src/main/java/com/whdc/service/impl/WqRRealServiceImp.java @@ -0,0 +1,73 @@ +package com.whdc.service.impl; + +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import com.whdc.entity.WqRReal; +import com.whdc.mapper.WqRealMapper; +import com.whdc.service.WqRRealService; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; + +import java.time.Duration; +import java.time.LocalDateTime; +import java.util.*; +import java.util.stream.Collectors; + +/** + *

+ * 水质信息表 服务实现类 + *

+ * + * @author wanyan + * @since 2023-11-01 + */ +@Service +public class WqRRealServiceImp extends ServiceImpl implements WqRRealService { + + @Value("${offlineThreshold}") + private Integer offlineThreshold; + + @Override + public List getList() { + LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper<>(); + queryWrapper.orderByDesc(WqRReal::getTm).orderByAsc(WqRReal::getMn); + return this.list(queryWrapper); + } + + @Override + public Map> online() { + List list = this.list(); + if(list != null && !list.isEmpty()) { + return list.stream() + .collect(Collectors.partitioningBy(device -> !isDeviceOffline(device), + // 对分组后的列表按时间倒序排序 + Collectors.collectingAndThen( + Collectors.toList(), + des -> des.stream() + .sorted(Comparator.comparing(WqRReal::getTm).reversed()) + .collect(Collectors.toList()) + ) + )); + } + return new HashMap<>(); + } + + public boolean isDeviceOffline(WqRReal device) { + return isDeviceOffline(device, offlineThreshold*3600*1000); + } + + public boolean isDeviceOffline(WqRReal device, long threshold) { + return isDeviceOffline(device, LocalDateTime.now(), threshold); + } + + private boolean isDeviceOffline(WqRReal device, LocalDateTime currentTime, long threshold) { + if (device == null || device.getTm() == null) { + return true; // 没有时间数据的设备视为离线 + } + + Duration duration = Duration.between(device.getTm(), currentTime); + long millis = duration.toMillis(); + + return millis >= threshold; + } +} diff --git a/src/main/java/com/whdc/service/impl/WqRServiceImp.java b/src/main/java/com/whdc/service/impl/WqRServiceImp.java new file mode 100644 index 0000000..b9143a3 --- /dev/null +++ b/src/main/java/com/whdc/service/impl/WqRServiceImp.java @@ -0,0 +1,37 @@ +package com.whdc.service.impl; + +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import com.whdc.entity.QueryDto; +import com.whdc.entity.WqR; +import com.whdc.mapper.WqRMapper; +import com.whdc.service.WqRService; +import org.springframework.stereotype.Service; + +import java.util.List; + +/** + *

+ * 水质信息表 服务实现类 + *

+ * + * @author wanyan + * @since 2023-11-01 + */ +@Service +public class WqRServiceImp extends ServiceImpl implements WqRService { + + @Override + public List getList(QueryDto queryDto) { + LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper<>(); + queryWrapper.eq(WqR::getMn, queryDto.getMn()); + if(queryDto.getStartTime() != null){ + queryWrapper.gt(WqR::getTm, queryDto.getStartTime()); + } + if(queryDto.getEndTime() != null){ + queryWrapper.lt(WqR::getTm, queryDto.getEndTime()); + } + queryWrapper.orderByDesc(WqR::getTm); + return this.list(queryWrapper); + } +} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml new file mode 100644 index 0000000..0863088 --- /dev/null +++ b/src/main/resources/application.yml @@ -0,0 +1,35 @@ +server: + port: 12117 + +spring: + #数据库配置 + datasource: + driver-class-name: com.mysql.jdbc.Driver + url: jdbc:mysql://rm-wz9n28sq10rz5b0u2o.mysql.rds.aliyuncs.com:3306/shareddata?characterEncoding=utf8&useSSL=false&zeroDateTimeBehavior=convertToNull&useInformationSchema=true&serverTimezone=GMT%2B8&autoReconnect=true + username: shareddata + password: SharedData_890 + servlet: + multipart: + max-file-size: 100MB + max-request-size: 100MB + + #jpa配置 + jpa: + properties: + hibernate: + dialect: org.hibernate.dialect.DmDialect + +logging: +# level: +# org.springframework.boot.autoconfigure: error #spring的自动装配日志只打error,否则debug输出的会打印很多自动装配的log信息到控制台 +# com.whdc.zhzmkzapi.mapper: error + config: classpath:logback-spring.xml +mybatis: + mapper-locations: classpath:mapper/*.xml + +mqtt: + broker: tcp://120.24.5.249:3189 + clientId: mqttx_c4d1c22b + +enableMqttListening: true +offlineThreshold: 24 \ No newline at end of file