【笔记】SpringBoot项目整合MQTT

前言

SpringBoot项目整合MQTT

引入依赖

pom.xml
1
2
3
4
5
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
<version>6.4.6</version>
</dependency>

添加配置

  • 自定义配置
src/main/resource/application.properties
1
2
3
4
spring.mqtt.url=tcp://127.0.0.1:1883
spring.mqtt.client-id=idea
spring.mqtt.username=admin
spring.mqtt.password=admin123

封装Service方法

  • 通过@PostConstruct实现Bean初始化后立即建立MQTT连接
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package com.example.demo.conf;

import jakarta.annotation.PostConstruct;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MqttService {

@Value("${spring.mqtt.username}")
private String username;

@Value("${spring.mqtt.password}")
private String password;

@Value("${spring.mqtt.url}")
private String hostUrl;

@Value("${spring.mqtt.client-id}")
private String clientId;

/**
* 客户端对象
*/
private MqttClient client;

/**
* Bean初始化后立即建立MQTT连接
*/
@PostConstruct
public void init() throws MqttException {
connect();
}

/**
* 建立MQTT连接
*/
public void connect() throws MqttException {
// 创建MQTT客户端对象
client = new MqttClient(hostUrl, clientId, new MemoryPersistence());

// 创建连接选项
MqttConnectOptions options = new MqttConnectOptions();
// 是否清空session,设置为false表示服务器会保留客户端的连接记录,客户端重连之后能获取到服务器在客户端断开连接期间推送的消息
// 设置为true表示每次连接到服务端都是以新的身份
options.setCleanSession(true);
// 设置连接用户名
options.setUserName(username);
// 设置连接密码
options.setPassword(password.toCharArray());
// 设置超时时间,单位为秒
options.setConnectionTimeout(60);
// 设置心跳时间 单位为秒,表示服务器每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线
options.setKeepAliveInterval(20);
// 设置遗嘱消息的话题,若客户端和服务器之间的连接意外断开,服务器将发布客户端的遗嘱信息
options.setWill("topic/will", (clientId + "与服务器断开连接").getBytes(), 0, false);

// 设置回调
client.setCallback(new MqttCallback() {
/**
* 客户端断开连接回调
*/
@Override
public void connectionLost(Throwable throwable) {
System.out.println("客户端断开连接");
}

/**
* 接收消息回调
*/
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
System.out.printf("接收消息主题 : %s%n", topic);
System.out.printf("接收消息Qos : %d%n", message.getQos());
System.out.printf("接收消息内容 : %s%n", new String(message.getPayload()));
System.out.printf("接收消息retained : %b%n", message.isRetained());
}

/**
* 发送消息回调
* Qos 0:发送消息后立即回调
* Qos 1:服务器收到消息后发送ack给客户端,客户端收到ack后回调
* Qos 2:服务器收到消息后发送ack给客户端,客户端收到ack后发送ack给服务器,服务器收到ack后回调
*/
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
System.out.println("消息发布成功");
System.out.println(iMqttDeliveryToken.toString());
}
});

// 建立连接
client.connect(options);
}

/**
* 断开MQTT连接
*/
public void disConnect() throws MqttException {
client.disconnect();
}


/**
* 订阅主题
*/
public void subscribe(String topic, Integer qos) throws MqttException {
client.subscribe(topic, qos);
}

/**
* 发布消息
*/
public void publish(String topic, Integer qos, Boolean retained, String message) throws MqttException {
// 主题的目的地,用于发布/订阅信息
MqttTopic mqttTopic = client.getTopic(topic);
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setQos(qos);
mqttMessage.setRetained(retained);
mqttMessage.setPayload(message.getBytes());
// 提供一种机制来跟踪消息的传递进度
// 用于在以非阻塞方式(在后台运行)执行发布是跟踪消息的传递进度
MqttDeliveryToken token;
// 将指定消息发布到主题,但不等待消息传递完成,返回的token可用于跟踪消息的传递状态
// 一旦此方法干净地返回,消息就已被客户端接受发布,当连接可用,将在后台完成消息传递。
token = mqttTopic.publish(mqttMessage);
token.waitForCompletion();
}
}

完成

参考文献

阿里云开发者社区——章为忠学架构