2024年3月24日发(作者:)

在Spring Boot中使用MQTT

2018.4.7

在Spring Boot中使用MQTT分为三步:

第一步定义一个收发数据的类给程序的其他类使用。

public class XRemoteDevice {

private static XRemoteDevice

instance

= new XRemoteDevice();

public static XRemoteDevice getInstance() {

return

instance

;

}

public void receive(Object topic, Object message) {

System.

out

.println("Topic: " + topic);

System.

out

.println("Payload" + message);

}

public boolean send(String topic, String message) {

try {

XServerContext.

getGateway

().sendToMqtt(message, topic);

}

catch(Exception e) {

XLogger.

getInstance

().debug("Error When Sending " + e);

return false;

}

return true;

}

}

/**
 * Created by Lenovo on 2018/4/4.
 *
 * <p>Messaging gateway whose default request channel is {@code mqttOutboundChannel}.
 */
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MyGateway {

    /**
     * Sends {@code data} as the message payload, with {@code topic} supplied as the
     * {@code MqttHeaders.TOPIC} header.
     *
     * @param data  the payload to send
     * @param topic the value placed in the {@code MqttHeaders.TOPIC} header
     */
    void sendToMqtt(
            String data,
            @Header(MqttHeaders.TOPIC) String topic);
}

/**
 * Component that captures the Spring {@link ApplicationContext} in a static field so that code
 * without injection (for example static callers) can look up the {@link MyGateway} bean.
 */
@Component
public class XServerContext implements ApplicationContextAware {

    /** The captured context; {@code null} until {@link #setApplicationContext} has been called. */
    private static ApplicationContext applicationContext = null;

    /**
     * Stores the supplied context in the static field. Only the first context is kept; later
     * calls leave the stored value untouched.
     *
     * @param applicationContext the context to remember
     * @throws BeansException declared by {@link ApplicationContextAware}; not thrown here
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext)
            throws BeansException {
        if (XServerContext.applicationContext == null) {
            XServerContext.applicationContext = applicationContext;
        }
    }

    /**
     * Looks up the {@link MyGateway} bean from the captured context.
     *
     * @return the {@code MyGateway} bean
     * @throws IllegalStateException if no context has been captured yet
     */
    public static MyGateway getGateway() {
        ApplicationContext context = applicationContext;
        if (context == null) {
            // Fail with a descriptive message instead of a bare NullPointerException.
            throw new IllegalStateException(
                    "ApplicationContext has not been set; MyGateway is not available yet");
        }
        // getBean needs the bean type; the no-argument form does not exist.
        return context.getBean(MyGateway.class);
    }
}

第二步在pom文件中引入MQTT。

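<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-stream</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>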

第三步,在Spring Boot主程序(main函数所在的类)中定义MQTT相关的Bean。

// Bean method that creates a DefaultMqttPahoClientFactory and returns it as the
// MqttPahoClientFactory used by the inbound adapter and the outbound handler below.
//
// NOTE(review): the three statements between construction and return appear to have lost
// their leading characters during text extraction; they have no receiver and are not valid
// Java as written. They presumably configure the server URIs, user name and password on
// `factory` — TODO restore them from the original source before compiling.
@Bean

public MqttPahoClientFactory mqttClientFactory() {

DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();

// NOTE(review): truncated — presumably a server-URI setter on `factory`; confirm.
verURIs(ver());

// NOTE(review): truncated — presumably a user-name setter on `factory`; confirm.
rName(e());

// NOTE(review): truncated — presumably a password setter on `factory`; confirm.
sword(sword());

return factory;

}

////////////////////////////////////////////////////////////////////////////////////////

// The bean function (inbound)

////////////////////////////////////////////////////////////////////////////////////////

/**
 * Creates the message channel for inbound MQTT traffic.
 *
 * @return a newly constructed {@code DirectChannel}
 */
@Bean
public MessageChannel mqttInputChannel() {
    final MessageChannel inputChannel = new DirectChannel();
    return inputChannel;
}

// Bean method that builds the inbound MqttPahoMessageDrivenChannelAdapter from
// mqttClientFactory() and returns it as a MessageProducer.
//
// NOTE(review): several statements below were truncated during text extraction (missing
// receivers and method-name prefixes) and are not valid Java as written. The hedged notes
// give the most likely intent — TODO restore from the original source.
@Bean

public MessageProducer inbound() {

MqttPahoMessageDrivenChannelAdapter adapter =

// NOTE(review): `entId()` is truncated — presumably a client-id accessor; confirm.
new MqttPahoMessageDrivenChannelAdapter(entId(), mqttClientFactory());

// NOTE(review): truncated — presumably sets a completion timeout of 5000 on `adapter`; confirm.
pletionTimeout(5000);

// NOTE(review): truncated — presumably sets the message converter on `adapter`; confirm.
verter(new DefaultPahoMessageConverter());

// NOTE(review): truncated — presumably routes the adapter's output to mqttInputChannel(); confirm.
putChannel(mqttInputChannel());

// NOTE(review): `ics()` is truncated — presumably an accessor returning the topics to
// subscribe to; confirm.
for(String s : ics()) {

// NOTE(review): truncated — presumably adds topic `s` to `adapter`; the second argument
// (left as `()`) has been lost entirely; confirm.
ic(s, ());

}

return adapter;

}

// Bean method, registered as a service activator on "mqttInputChannel", that returns an
// anonymous MessageHandler. The handler passes two values taken from each message to
// XRemoteDevice.getInstance().receive(...).
@Bean

@ServiceActivator(inputChannel = "mqttInputChannel")

public MessageHandler handler() {

return new MessageHandler() {

@Override

public void handleMessage(Message message) throws MessagingException {

XRemoteDevice.

getInstance

// NOTE(review): `ders()` and `load()` were truncated during text extraction. They are
// presumably the message's header and payload accessors, the first argument being the
// "mqtt_topic" header value and the second the payload — TODO restore and confirm.
().receive(ders().get("mqtt_topic"),

load());

}

};

}

////////////////////////////////////////////////////////////////////////////////////////

// The bean function (outbound)

////////////////////////////////////////////////////////////////////////////////////////

// Bean method, registered as a service activator on "mqttOutboundChannel", that builds an
// MqttPahoMessageHandler from mqttClientFactory() and returns it.
//
// NOTE(review): some statements below were truncated during text extraction and are not
// valid Java as written — TODO restore from the original source.
@Bean

@ServiceActivator(inputChannel = "mqttOutboundChannel")

public MessageHandler mqttOutbound() {

// NOTE(review): `entId()` is truncated — presumably a client-id accessor; confirm.
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(entId(),

mqttClientFactory());

// NOTE(review): truncated — presumably enables asynchronous sending on `messageHandler`; confirm.
nc(true);

// NOTE(review): truncated — presumably sets "testTopic" as the default topic on
// `messageHandler`; confirm.
aultTopic("testTopic");

return messageHandler;

}

/**
 * Creates the message channel for outbound MQTT traffic.
 *
 * @return a newly constructed {@code DirectChannel}
 */
@Bean
public MessageChannel mqttOutboundChannel() {
    final MessageChannel outboundChannel = new DirectChannel();
    return outboundChannel;
}