1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131
| package com.example.demo.conf;
import jakarta.annotation.PostConstruct; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Configuration;
@Configuration public class MqttService {
@Value("${spring.mqtt.username}") private String username;
@Value("${spring.mqtt.password}") private String password;
@Value("${spring.mqtt.url}") private String hostUrl;
@Value("${spring.mqtt.client-id}") private String clientId;
private MqttClient client;
@PostConstruct public void init() throws MqttException { connect(); }
public void connect() throws MqttException { client = new MqttClient(hostUrl, clientId, new MemoryPersistence());
MqttConnectOptions options = new MqttConnectOptions(); options.setCleanSession(true); options.setUserName(username); options.setPassword(password.toCharArray()); options.setConnectionTimeout(60); options.setKeepAliveInterval(20); options.setWill("topic/will", (clientId + "与服务器断开连接").getBytes(), 0, false);
client.setCallback(new MqttCallback() {
@Override public void connectionLost(Throwable throwable) { System.out.println("客户端断开连接"); }
@Override public void messageArrived(String topic, MqttMessage message) throws Exception { System.out.printf("接收消息主题 : %s%n", topic); System.out.printf("接收消息Qos : %d%n", message.getQos()); System.out.printf("接收消息内容 : %s%n", new String(message.getPayload())); System.out.printf("接收消息retained : %b%n", message.isRetained()); }
@Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { System.out.println("消息发布成功"); System.out.println(iMqttDeliveryToken.toString()); } });
client.connect(options); }
public void disConnect() throws MqttException { client.disconnect(); }
public void subscribe(String topic, Integer qos) throws MqttException { client.subscribe(topic, qos); }
public void publish(String topic, Integer qos, Boolean retained, String message) throws MqttException { MqttTopic mqttTopic = client.getTopic(topic); MqttMessage mqttMessage = new MqttMessage(); mqttMessage.setQos(qos); mqttMessage.setRetained(retained); mqttMessage.setPayload(message.getBytes()); MqttDeliveryToken token; token = mqttTopic.publish(mqttMessage); token.waitForCompletion(); } }
|