master
wany 2025-11-21 11:18:50 +08:00
commit 9d9f3c4767
19 changed files with 1224 additions and 0 deletions

188
pom.xml Normal file
View File

@ -0,0 +1,188 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.1</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.whdc</groupId>
<artifactId>yjt-service</artifactId>
<version>1.0</version>
<description>颜家台</description>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<java.version>1.8</java.version>
<copy.jar.directory>target/release</copy.jar.directory>
</properties>
<!-- 使用阿里 maven 库 -->
<repositories>
<repository>
<id>ali-maven</id>
<url>https://maven.aliyun.com/nexus/content/groups/public</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
<updatePolicy>always</updatePolicy>
<checksumPolicy>fail</checksumPolicy>
</snapshots>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.5.2</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-validation</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- mysql start-->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.16</version>
</dependency>
<!-- mysql end-->
<!-- lombok start -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!-- lombok end -->
<!-- fastjson start -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.80</version>
</dependency>
<!-- fastjson end-->
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.0</version>
</dependency>
<!-- mybatisplus-plus start -->
<dependency>
<groupId>com.github.jeffreyning</groupId>
<artifactId>mybatisplus-plus</artifactId>
<version>1.7.0-RELEASE</version>
</dependency>
<!-- mybatisplus-plus end -->
</dependencies>
<build>
<plugins>
<!-- 使用spring-boot-maven-plugin打包 -->
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<mainClass>com.whdc.YjtApplication</mainClass>
</configuration>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
<!-- <build>-->
<!-- <plugins>-->
<!-- &lt;!&ndash; 1.生成的jar中不要包含pom.xml和pom.properties这两个文件 &ndash;&gt;-->
<!-- <plugin>-->
<!-- <groupId>org.apache.maven.plugins</groupId>-->
<!-- <artifactId>maven-jar-plugin</artifactId>-->
<!-- <configuration>-->
<!-- <archive>-->
<!-- <addMavenDescriptor>false</addMavenDescriptor>-->
<!-- <manifest>-->
<!-- &lt;!&ndash; 是否要把第三方jar加入到类构建路径 &ndash;&gt;-->
<!-- <addClasspath>true</addClasspath>-->
<!-- &lt;!&ndash; 外部依赖jar包的最终位置 &ndash;&gt;-->
<!-- <classpathPrefix>lib/</classpathPrefix>-->
<!-- &lt;!&ndash; 项目启动类 &ndash;&gt;-->
<!-- <mainClass>com.whdc.YjtApplication</mainClass>-->
<!-- </manifest>-->
<!-- </archive>-->
<!-- </configuration>-->
<!-- </plugin>-->
<!-- &lt;!&ndash;2.拷贝依赖到jar外面的lib目录&ndash;&gt;-->
<!-- <plugin>-->
<!-- <groupId>org.apache.maven.plugins</groupId>-->
<!-- <artifactId>maven-dependency-plugin</artifactId>-->
<!-- <executions>-->
<!-- <execution>-->
<!-- <id>copy-lib</id>-->
<!-- <phase>package</phase>-->
<!-- <goals>-->
<!-- <goal>copy-dependencies</goal>-->
<!-- </goals>-->
<!-- <configuration>-->
<!-- <outputDirectory>${copy.jar.directory}/lib</outputDirectory>-->
<!-- <excludeTransitive>false</excludeTransitive>-->
<!-- <stripVersion>false</stripVersion>-->
<!-- <includeScope>runtime</includeScope>-->
<!-- </configuration>-->
<!-- </execution>-->
<!-- </executions>-->
<!-- </plugin>-->
<!-- &lt;!&ndash; 3.把jar包拷贝到指定目录位置 &ndash;&gt;-->
<!-- <plugin>-->
<!-- <artifactId>maven-antrun-plugin</artifactId>-->
<!-- <executions>-->
<!-- <execution>-->
<!-- <id>copy</id>-->
<!-- <phase>package</phase>-->
<!-- <configuration>-->
<!-- <target>-->
<!-- <copy todir="${copy.jar.directory}">-->
<!-- <fileset dir="${project.build.directory}">-->
<!-- <include name="${project.artifactId}.${project.version}.jar"/>-->
<!-- </fileset>-->
<!-- </copy>-->
<!-- </target>-->
<!-- </configuration>-->
<!-- <goals>-->
<!-- <goal>run</goal>-->
<!-- </goals>-->
<!-- </execution>-->
<!-- </executions>-->
<!-- </plugin>-->
<!-- </plugins>-->
<!-- </build>-->
</project>

View File

@ -0,0 +1,34 @@
package com.whdc;
import com.github.jeffreyning.mybatisplus.conf.EnableMPP;
import lombok.extern.slf4j.Slf4j;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.BeansException;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.web.servlet.config.annotation.EnableWebMvc;
/**
* @author
* @date 2022-06-26 0:17
*/
@Slf4j
@EnableMPP
@EnableWebMvc
@EnableScheduling
@SpringBootApplication
@MapperScan("com.whdc.mapper")
public class YjtApplication {
public static void main(String[] args) {
try {
System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>> 启动程序 <<<<<<<<<<<<<<<<<<<<<<<<<<<");
SpringApplication.run(YjtApplication.class, args);
} catch (BeansException e) {
e.printStackTrace();
log.error(e.getMessage(), e);
}
}
}

View File

@ -0,0 +1,29 @@
package com.whdc.controller;
import com.whdc.entity.QueryDto;
import com.whdc.entity.WqR;
import com.whdc.mqtt.ResultJson;
import com.whdc.service.WqRService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource;
import java.util.List;
@Slf4j
@RestController
@RequestMapping("/wq")
public class WqRController {
@Resource
private WqRService service;
@PostMapping("/list")
public ResultJson<List<WqR>> list(@RequestBody @Validated QueryDto queryDto ) {
return ResultJson.ok(service.getList(queryDto));
}
}

View File

@ -0,0 +1,36 @@
package com.whdc.controller;
import com.whdc.entity.WqRReal;
import com.whdc.mqtt.ResultJson;
import com.whdc.service.WqRRealService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.util.List;
import java.util.Map;
@Slf4j
@RestController
@RequestMapping("/wq/real")
public class WqRealController {
@Resource
private WqRRealService service;
@GetMapping("/list")
public ResultJson<List<WqRReal>> list() {
return ResultJson.ok(service.getList());
}
@GetMapping("/online")
public ResultJson< Map<Boolean, List<WqRReal>>> online() {
return ResultJson.ok(service.online());
}
}

View File

@ -0,0 +1,20 @@
package com.whdc.entity;
import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.Data;
import javax.validation.constraints.NotBlank;
import java.util.Date;
@Data
public class QueryDto {
@NotBlank(message = "站点不能为空")
private String mn;
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private Date startTime;
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private Date endTime;
}

View File

@ -0,0 +1,50 @@
package com.whdc.entity;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.Data;
import lombok.Getter;
import lombok.Setter;
import java.math.BigDecimal;
import java.time.LocalDateTime;
/**
* <p>
*
* </p>
*
* @author wanyan
* @since 2023-11-01
*/
@Getter
@Setter
@TableName("wq_r")
@Data
public class WqR {
@TableId(value = "mn")
private String mn;
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
@TableField("tm")
private LocalDateTime tm;
@TableField("cod")
private BigDecimal cod;
@TableField("ad")
private BigDecimal ad;
@TableField("ph")
private BigDecimal ph;
@TableField("p")
private BigDecimal p;
@TableField("cn")
private String cn;
}

View File

@ -0,0 +1,49 @@
package com.whdc.entity;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.Data;
import lombok.Getter;
import lombok.Setter;
import java.math.BigDecimal;
import java.time.LocalDateTime;
/**
* <p>
*
* </p>
*
* @author wanyan
* @since 2023-11-01
*/
@Getter
@Setter
@TableName("wq_r_real")
@Data
public class WqRReal {
@TableId(value = "mn")
private String mn;
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
@TableField("tm")
private LocalDateTime tm;
@TableField("cod")
private BigDecimal cod;
@TableField("ad")
private BigDecimal ad;
@TableField("ph")
private BigDecimal ph;
@TableField("p")
private BigDecimal p;
@TableField("cn")
private String cn;
}

View File

@ -0,0 +1,17 @@
package com.whdc.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.whdc.entity.WqR;
import org.apache.ibatis.annotations.Mapper;
/**
* <p>
* Mapper
* </p>
*
* @author wanyan
* @since 2023-11-01
*/
@Mapper
public interface WqRMapper extends BaseMapper<WqR> {
}

View File

@ -0,0 +1,17 @@
package com.whdc.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.whdc.entity.WqRReal;
import org.apache.ibatis.annotations.Mapper;
/**
* <p>
* Mapper
* </p>
*
* @author wanyan
* @since 2023-11-01
*/
@Mapper
public interface WqRealMapper extends BaseMapper<WqRReal> {
}

View File

@ -0,0 +1,9 @@
package com.whdc.mqtt;
public class Config {
public static final String USERNAME = "jiangling";
public static final char[] PASSWORD = "Gsiot_890".toCharArray();
public static final String TOPIC_WQ_R = "/jiangling";
}

View File

@ -0,0 +1,192 @@
package com.whdc.mqtt;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.whdc.entity.WqR;
import com.whdc.entity.WqRReal;
import com.whdc.mapper.WqRMapper;
import com.whdc.mapper.WqRealMapper;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
@Configuration
@Slf4j
public class MQTTConfig {
@Autowired
private WqRMapper wqRMapper;
@Autowired
private WqRealMapper wqRealMapper;
@Value("${mqtt.broker}")
private String broker;
@Value("${mqtt.clientId}")
private String clientId;
@Value("${enableMqttListening:true}") // 设置默认值
private Boolean enable;
private Sub sub;
@Autowired
private ObjectMapper objectMapper;
private static final DateTimeFormatter formatter =
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
@PostConstruct
public void initMQTT() {
if (!enable) {
log.info("MQTT监听未启用");
return;
}
log.info("初始化MQTT连接 - broker: {}, clientId: {}", broker, clientId);
try {
sub = new Sub(broker, clientId, Config.TOPIC_WQ_R);
sub.setOnMessageListener(new Sub.OnMessageListener() {
@Override
public void onMessage(String message) {
processMessage(message);
}
});
// 延迟连接确保Spring完全启动
new Thread(() -> {
try {
log.info("等待10秒后建立MQTT连接...");
Thread.sleep(10000);
sub.connect(true);
log.info("MQTT连接初始化完成");
} catch (MqttException e) {
log.error("MQTT连接失败: {}", e.getMessage(), e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("MQTT连接线程被中断");
}
}, "MQTT-Connector").start();
} catch (Exception e) {
log.error("初始化MQTT配置失败: {}", e.getMessage(), e);
}
}
private void processMessage(String message) {
try {
log.debug("收到MQTT消息: {}", message);
JsonNode jsonNode = objectMapper.readTree(message);
// 验证必需字段
if (!jsonNode.has("DataTime") || !jsonNode.has("Mn")) {
log.warn("MQTT消息缺少必需字段: {}", message);
return;
}
// 解析JSON数据
String dataTimeStr = jsonNode.get("DataTime").asText();
String mn = jsonNode.get("Mn").asText();
String cn = jsonNode.get("CN").asText();
// 解析水质参数
BigDecimal cod = parseDecimalValue(jsonNode.get("w01018"));
BigDecimal ad = parseDecimalValue(jsonNode.get("w21003"));
BigDecimal p = parseDecimalValue(jsonNode.get("w21011"));
BigDecimal ph = parseDecimalValue(jsonNode.get("w01001"));
// 转换时间格式
LocalDateTime dataTime;
try {
dataTime = LocalDateTime.parse(dataTimeStr, formatter);
} catch (DateTimeParseException e) {
log.error("时间格式解析错误: {}, 原始数据: {}", dataTimeStr, message);
return;
}
// 保存到历史数据表
saveToWqR(mn, dataTime, cn, cod, ad, p, ph);
// 更新实时数据表
updateWqReal(mn, dataTime, cn, cod, ad, p, ph);
log.info("成功处理水质数据 - 站点: {}, 时间: {}", mn, dataTime);
} catch (Exception e) {
log.error("处理MQTT消息时发生错误: {}, 原始数据: {}", e.getMessage(), message, e);
}
}
private void saveToWqR(String mn, LocalDateTime dataTime, String cn,
BigDecimal cod, BigDecimal ad, BigDecimal p, BigDecimal ph) {
try {
WqR wqR = new WqR();
wqR.setMn(mn);
wqR.setTm(dataTime);
wqR.setCod(cod);
wqR.setAd(ad);
wqR.setP(p);
wqR.setPh(ph);
wqR.setCn(cn);
wqRMapper.insert(wqR);
log.debug("历史数据保存成功 - 站点: {}", mn);
} catch (Exception e) {
log.error("保存历史数据失败 - 站点: {}, 错误: {}", mn, e.getMessage(), e);
}
}
private void updateWqReal(String mn, LocalDateTime dataTime, String cn,
BigDecimal cod, BigDecimal ad, BigDecimal p, BigDecimal ph) {
try {
WqRReal wqReal = wqRealMapper.selectById(mn);
if (wqReal == null) {
wqReal = new WqRReal();
wqReal.setMn(mn);
wqReal.setTm(dataTime);
wqReal.setCod(cod);
wqReal.setAd(ad);
wqReal.setP(p);
wqReal.setPh(ph);
wqReal.setCn(cn);
wqRealMapper.insert(wqReal);
log.debug("新增实时数据 - 站点: {}", mn);
} else {
wqReal.setTm(dataTime);
wqReal.setCod(cod);
wqReal.setAd(ad);
wqReal.setP(p);
wqReal.setPh(ph);
wqReal.setCn(cn);
wqRealMapper.updateById(wqReal);
log.debug("更新实时数据 - 站点: {}", mn);
}
} catch (Exception e) {
log.error("更新实时数据失败 - 站点: {}, 错误: {}", mn, e.getMessage(), e);
}
}
private BigDecimal parseDecimalValue(JsonNode node) {
if (node != null && !node.isNull() && !node.asText().isEmpty()) {
try {
return new BigDecimal(node.asText());
} catch (NumberFormatException e) {
log.warn("解析数值失败: {}", node.asText());
return null;
}
}
return null;
}
}

View File

@ -0,0 +1,115 @@
package com.whdc.mqtt;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class Pub {
private final String clientId;
private final String topic;
public static String BROKER;
private final int qos;
private MqttClient client;
private static final ObjectMapper objectMapper = new ObjectMapper();
public Pub(String clientId, String topic) {
this.clientId = clientId;
this.topic = topic;
this.qos = 0;
}
public Pub(String clientId, String topic, int qos) {
this.clientId = clientId;
this.topic = topic;
this.qos = qos;
}
public void connect() throws MqttException {
this.client = new MqttClient("tcp://120.24.5.249:3189", "mqttx_2b072a3c", new MemoryPersistence());
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setUserName(Config.USERNAME);
connOpts.setPassword(Config.PASSWORD);
connOpts.setCleanSession(true);
this.client.connect(connOpts);
}
public void close() {
try {
this.client.disconnectForcibly(200);
this.client.close(true);
} catch (MqttException e) {
e.printStackTrace();
}
}
public void pub(String msg) throws MqttException {
MqttMessage message = new MqttMessage(msg.getBytes());
message.setQos(qos);
this.client.publish(topic, message);
}
public static void main(String[] args) {
try {
// 创建发布者
Pub publisher = new Pub("mqttx_2b072a3c", Config.TOPIC_WQ_R, 1);
// 连接到MQTT Broker
publisher.connect();
System.out.println("MQTT发布者连接成功!");
// 创建测试数据
String[] testData = {
createTestData("2025-11-20 10:00:00", "0000001", "61.3", "0.83", "1.4", "7.2"),
createTestData("2025-11-20 11:00:00", "0000002", "54.2", "0.99", "1.3", "7.5"),
// createTestData("2025-11-19 16:00:00", "0000003", "62.3", "0.78", "1.3", "7.5"),
// createTestData("2025-11-19 17:00:00", "0000001", "59.9", "0.83", "1.3", "7.7"),
// createTestData("2025-11-19 18:00:00", "0000002", "62.5", "0.85", "1.6", "7.8")
};
// 发送测试数据
for (int i = 0; i < testData.length; i++) {
String data = testData[i];
publisher.pub(data);
System.out.println("发送第 " + (i + 1) + " 条数据: " + data);
// 间隔2秒发送一条
Thread.sleep(2000);
}
System.out.println("所有测试数据发送完成!");
// 保持连接,等待用户输入后关闭
System.out.println("按回车键退出...");
System.in.read();
publisher.close();
} catch (Exception e) {
e.printStackTrace();
}
}
private static String createTestData(String dataTime, String mn, String cod, String ad, String p, String ph) {
try {
ObjectNode json = objectMapper.createObjectNode();
json.put("DataTime", dataTime);
json.put("Mn", mn);
json.put("CN", "2061");
json.put("w01018", cod); // COD
json.put("w21003", ad); // 氨氮
json.put("w21011", p); // 总磷
json.put("w01001", ph); // PH
return json.toString();
} catch (Exception e) {
throw new RuntimeException("创建测试数据失败", e);
}
}
}

View File

@ -0,0 +1,148 @@
package com.whdc.mqtt;
import org.springframework.stereotype.Component;
import java.io.Serializable;
@Component
@SuppressWarnings("all")
public class ResultJson<T> implements Serializable {
private static final long serialVersionUID = 1L;
/**
*
*/
public static final Integer SUCCESS = 200;
public static final String SUCCESS_MSG = "success";
/**
*
*/
public static final Integer FAIL = 900;
public static final String FAIL_MSG = "fail";
public static final Integer PARAM_ERROR = 400; // 失败、参数错误等
public static final Integer UNAUTHORIZED = 401; // 用户验证失败,或者用户验证信息过期
public static final Integer PERMISSION_DENIED = 403; // 没有权限
public static final Integer NOT_FOUND = 404; // 未找到资源
public static final Integer METHOD_NOT_ALLOWED = 405; // 不支持的类型
public static final Integer NSUPPORTED_MEDIA_TYPE = 415; // 不支持的媒体
public static final Integer NOT_ALLOWED = 405; // 请求太频繁同一个用户token)、同一个url、同样的请求参数请求间隔小于0.5秒
public static final Integer SERVER_ERROR = 500; // 后台错误
public static final Integer SRC_TIMEOUT = 504; // 请求资源超时
private String msg;
private Integer code;
private T data;
public static ResultJson error() {
return error(FAIL, "未知异常,请联系管理员");
}
public static ResultJson error(String msg) {
return error(FAIL, msg);
}
public static ResultJson error(int code, String msg) {
ResultJson r = new ResultJson();
r.setCode(code);
r.setMsg(msg);
return r;
}
public static ResultJson error(int code, String msg, Object data) {
ResultJson r = new ResultJson();
r.setCode(code);
r.setMsg(msg);
r.setData(data);
return r;
}
public static ResultJson ok(Integer code, String msg) {
ResultJson r = new ResultJson();
r.setMsg(msg);
r.setCode(code);
return r;
}
public static ResultJson ok() {
ResultJson r = new ResultJson();
r.setCode(SUCCESS);
r.setMsg(SUCCESS_MSG);
return r;
}
public static ResultJson ok(Object data) {
ResultJson r = new ResultJson();
r.setCode(SUCCESS);
r.setMsg(SUCCESS_MSG);
r.setData(data);
return r;
}
public static ResultJson ok(Object data, String msg) {
ResultJson r = new ResultJson();
r.setData(data);
r.setCode(SUCCESS);
r.setMsg(msg);
return r;
}
public String getMsg() {
return msg;
}
public void setMsg(String msg) {
this.msg = msg;
}
public Integer getCode() {
return code;
}
public void setCode(Integer code) {
this.code = code;
}
public T getData() {
return data;
}
public void setData(T data) {
this.data = data;
}
public static long getSerialVersionUID() {
return serialVersionUID;
}
public static Integer getSUCCESS() {
return SUCCESS;
}
public static String getSuccessMsg() {
return SUCCESS_MSG;
}
public static Integer getFAIL() {
return FAIL;
}
public static String getFailMsg() {
return FAIL_MSG;
}
@Override
public String toString() {
return "ResultJson{" +
"msg='" + msg + "'" +
", code=" + code +
", data=" + data +
'}';
}
}

View File

@ -0,0 +1,133 @@
package com.whdc.mqtt;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class Sub {
private final String clientId;
private final String topic;
private final String broker;
private OnMessageListener onMessageListener;
private MqttClient client;
public Sub(String broker, String clientId, String topic) {
this.clientId = clientId;
this.topic = topic;
this.broker = broker;
}
public void setOnMessageListener(OnMessageListener listener) {
this.onMessageListener = listener;
}
public void connect() throws MqttException {
connect(true);
}
public void connect(boolean autoReconnect) throws MqttException {
try {
// 使用唯一的客户端ID
String uniqueClientId = this.clientId + "_" + System.currentTimeMillis();
this.client = new MqttClient(broker, uniqueClientId, new MemoryPersistence());
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setUserName(Config.USERNAME);
connOpts.setPassword(Config.PASSWORD); // 修复使用char数组
connOpts.setCleanSession(true);
connOpts.setAutomaticReconnect(autoReconnect);
connOpts.setConnectionTimeout(30); // 设置连接超时
connOpts.setKeepAliveInterval(60); // 设置心跳间隔
// 设置MQTT版本重要
connOpts.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1);
this.client.setCallback(new MqttCallbackExtended() {
@Override
public void connectComplete(boolean reconnect, String serverURI) {
log.info("MQTT连接{}成功: {}", reconnect ? "重连" : "建立", serverURI);
}
@Override
public void connectionLost(Throwable cause) {
log.error("MQTT连接丢失: {}", cause != null ? cause.getMessage() : "未知原因");
if (cause != null) {
cause.printStackTrace();
}
}
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) {
try {
byte[] payload = mqttMessage.getPayload();
String msg = new String(payload, "UTF-8");
log.debug("收到MQTT消息 - 主题: {}, QoS: {}", topic, mqttMessage.getQos());
if (onMessageListener != null) {
onMessageListener.onMessage(msg);
}
} catch (Throwable e) {
log.error("处理MQTT消息时发生错误: {}", e.getMessage(), e);
}
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
// 订阅者不需要实现此方法
}
});
log.info("正在连接到MQTT服务器: {}, 客户端ID: {}", broker, uniqueClientId);
this.client.connect(connOpts);
// 使用QoS 1确保消息可靠传递
this.client.subscribe(this.topic, 1);
log.info("成功订阅主题: {}", this.topic);
} catch (MqttException e) {
log.error("MQTT连接失败: {}, 错误代码: {}", e.getMessage(), e.getReasonCode());
throw e;
}
}
public void close() {
try {
if (this.client != null && this.client.isConnected()) {
this.client.disconnect();
this.client.close();
log.info("MQTT客户端已关闭");
}
} catch (MqttException e) {
log.error("关闭MQTT客户端时发生错误: {}", e.getMessage());
}
}
public boolean isConnected() {
return this.client != null && this.client.isConnected();
}
public interface OnMessageListener {
void onMessage(String message);
}
public static void main(String[] args) {
try {
Sub sub = new Sub("tcp://120.24.5.249:3189", "test_client", Config.TOPIC_WQ_R);
sub.setOnMessageListener(new OnMessageListener() {
@Override
public void onMessage(String message) {
System.out.println("收到消息: " + message);
}
});
sub.connect(true);
// 保持程序运行
Thread.sleep(300000); // 5分钟
sub.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}

View File

@ -0,0 +1,22 @@
package com.whdc.service;
import com.baomidou.mybatisplus.extension.service.IService;
import com.whdc.entity.WqRReal;
import java.util.List;
import java.util.Map;
/**
* <p>
*
* </p>
*
* @author wanyan
* @since 2023-11-01
*/
public interface WqRRealService extends IService<WqRReal> {
List<WqRReal> getList();
Map<Boolean, List<WqRReal>> online();
}

View File

@ -0,0 +1,20 @@
package com.whdc.service;
import com.baomidou.mybatisplus.extension.service.IService;
import com.whdc.entity.QueryDto;
import com.whdc.entity.WqR;
import java.util.List;
/**
* <p>
*
* </p>
*
* @author wanyan
* @since 2023-11-01
*/
public interface WqRService extends IService<WqR> {
List<WqR> getList(QueryDto queryDto);
}

View File

@ -0,0 +1,73 @@
package com.whdc.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.whdc.entity.WqRReal;
import com.whdc.mapper.WqRealMapper;
import com.whdc.service.WqRRealService;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.*;
import java.util.stream.Collectors;
/**
* <p>
*
* </p>
*
* @author wanyan
* @since 2023-11-01
*/
@Service
public class WqRRealServiceImp extends ServiceImpl<WqRealMapper, WqRReal> implements WqRRealService {
@Value("${offlineThreshold}")
private Integer offlineThreshold;
@Override
public List<WqRReal> getList() {
LambdaQueryWrapper<WqRReal> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.orderByDesc(WqRReal::getTm).orderByAsc(WqRReal::getMn);
return this.list(queryWrapper);
}
@Override
public Map<Boolean, List<WqRReal>> online() {
List<WqRReal> list = this.list();
if(list != null && !list.isEmpty()) {
return list.stream()
.collect(Collectors.partitioningBy(device -> !isDeviceOffline(device),
// 对分组后的列表按时间倒序排序
Collectors.collectingAndThen(
Collectors.toList(),
des -> des.stream()
.sorted(Comparator.comparing(WqRReal::getTm).reversed())
.collect(Collectors.toList())
)
));
}
return new HashMap<>();
}
public boolean isDeviceOffline(WqRReal device) {
return isDeviceOffline(device, offlineThreshold*3600*1000);
}
public boolean isDeviceOffline(WqRReal device, long threshold) {
return isDeviceOffline(device, LocalDateTime.now(), threshold);
}
private boolean isDeviceOffline(WqRReal device, LocalDateTime currentTime, long threshold) {
if (device == null || device.getTm() == null) {
return true; // 没有时间数据的设备视为离线
}
Duration duration = Duration.between(device.getTm(), currentTime);
long millis = duration.toMillis();
return millis >= threshold;
}
}

View File

@ -0,0 +1,37 @@
package com.whdc.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.whdc.entity.QueryDto;
import com.whdc.entity.WqR;
import com.whdc.mapper.WqRMapper;
import com.whdc.service.WqRService;
import org.springframework.stereotype.Service;
import java.util.List;
/**
* <p>
*
* </p>
*
* @author wanyan
* @since 2023-11-01
*/
@Service
public class WqRServiceImp extends ServiceImpl<WqRMapper, WqR> implements WqRService {
@Override
public List<WqR> getList(QueryDto queryDto) {
LambdaQueryWrapper<WqR> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(WqR::getMn, queryDto.getMn());
if(queryDto.getStartTime() != null){
queryWrapper.gt(WqR::getTm, queryDto.getStartTime());
}
if(queryDto.getEndTime() != null){
queryWrapper.lt(WqR::getTm, queryDto.getEndTime());
}
queryWrapper.orderByDesc(WqR::getTm);
return this.list(queryWrapper);
}
}

View File

@ -0,0 +1,35 @@
server:
port: 12117
spring:
#数据库配置
datasource:
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://rm-wz9n28sq10rz5b0u2o.mysql.rds.aliyuncs.com:3306/shareddata?characterEncoding=utf8&useSSL=false&zeroDateTimeBehavior=convertToNull&useInformationSchema=true&serverTimezone=GMT%2B8&autoReconnect=true
username: shareddata
password: SharedData_890
servlet:
multipart:
max-file-size: 100MB
max-request-size: 100MB
#jpa配置
jpa:
properties:
hibernate:
dialect: org.hibernate.dialect.DmDialect
logging:
# level:
# org.springframework.boot.autoconfigure: error #spring的自动装配日志只打error否则debug输出的会打印很多自动装配的log信息到控制台
# com.whdc.zhzmkzapi.mapper: error
config: classpath:logback-spring.xml
mybatis:
mapper-locations: classpath:mapper/*.xml
mqtt:
broker: tcp://120.24.5.249:3189
clientId: mqttx_c4d1c22b
enableMqttListening: true
offlineThreshold: 24