Published March 24, 2024
Using MQTT in Spring Boot
2018.4.7
Using MQTT in Spring Boot takes three steps:
Step 1: define a class that sends and receives data, for the rest of the program to use.
public class XRemoteDevice {
    // Eagerly created singleton shared by the rest of the program.
    private static final XRemoteDevice instance = new XRemoteDevice();

    public static XRemoteDevice getInstance() {
        return instance;
    }

    // Called by the inbound handler for every message received from the broker.
    public void receive(Object topic, Object message) {
        System.out.println("Topic: " + topic);
        System.out.println("Payload: " + message);
    }

    // Publishes a message through the gateway; returns false if sending fails.
    public boolean send(String topic, String message) {
        try {
            XServerContext.getGateway().sendToMqtt(message, topic);
        } catch (Exception e) {
            XLogger.getInstance().debug("Error When Sending " + e);
            return false;
        }
        return true;
    }
}
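
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;

/**
 * Created by Lenovo on 2018/4/4.
 */
// Calls to sendToMqtt become messages on mqttOutboundChannel, with the topic carried as a header.
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MyGateway {
    void sendToMqtt(String data, @Header(MqttHeaders.TOPIC) String topic);
}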
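
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

// Keeps a static reference to the Spring context so non-Spring classes can look up the gateway.
@Component
public class XServerContext implements ApplicationContextAware {
    private static ApplicationContext applicationContext = null;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        if (XServerContext.applicationContext == null) {
            XServerContext.applicationContext = applicationContext;
        }
    }

    public static MyGateway getGateway() {
        // The bean type argument was missing in the original; look the gateway up by type.
        return applicationContext.getBean(MyGateway.class);
    }
}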
Step 2: add the MQTT dependency to the pom file.
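The original does not list the dependencies. As a minimal sketch, assuming a Maven project whose versions are managed by the Spring Boot parent, the following two entries are usually enough for the beans shown in Step 3; adjust them to your own build.

```xml
<!-- Assumed dependencies; versions are inherited from Spring Boot's dependency management -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>
```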
Step 3: define the MQTT beans in the class that contains the Spring Boot application's main function.
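@Bean
public MqttPahoClientFactory mqttClientFactory() {
    // The argument expressions were truncated in the original. "config" and its accessor
    // names are placeholders (assumptions); substitute your own configuration holder.
    // These setters match Spring Integration 4.3/5.0; they were deprecated in 5.1 in favor
    // of setConnectionOptions(MqttConnectOptions).
    DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
    factory.setServerURIs(config.getServer());
    factory.setUserName(config.getName());
    factory.setPassword(config.getPassword());
    return factory;
}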
////////////////////////////////////////////////////////////////////////////////////////
// The bean function (inbound)
////////////////////////////////////////////////////////////////////////////////////////
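@Bean
public MessageChannel mqttInputChannel() {
    return new DirectChannel();
}

@Bean
public MessageProducer inbound() {
    // "config" and its accessor names are placeholders for expressions truncated in the original.
    // MQTT requires a unique client ID per connection: if this ID equals the one used by the
    // outbound handler, the broker will disconnect one of the two clients.
    MqttPahoMessageDrivenChannelAdapter adapter =
            new MqttPahoMessageDrivenChannelAdapter(config.getClientId(), mqttClientFactory());
    adapter.setCompletionTimeout(5000);
    adapter.setConverter(new DefaultPahoMessageConverter());
    adapter.setOutputChannel(mqttInputChannel());
    // Subscribe to every configured topic with the configured QoS.
    for (String topic : config.getTopics()) {
        adapter.addTopic(topic, config.getQos());
    }
    return adapter;
}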
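@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler handler() {
    return new MessageHandler() {
        @Override
        public void handleMessage(Message<?> message) throws MessagingException {
            // Forward the topic header and the payload to the application-facing class.
            // Newer Spring Integration versions expose the inbound topic as
            // MqttHeaders.RECEIVED_TOPIC rather than "mqtt_topic".
            XRemoteDevice.getInstance().receive(
                    message.getHeaders().get("mqtt_topic"),
                    message.getPayload());
        }
    };
}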
////////////////////////////////////////////////////////////////////////////////////////
// The bean function (outbound)
////////////////////////////////////////////////////////////////////////////////////////
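@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound() {
    // "config.getClientId()" is a placeholder for an expression truncated in the original.
    MqttPahoMessageHandler messageHandler =
            new MqttPahoMessageHandler(config.getClientId(), mqttClientFactory());
    messageHandler.setAsync(true);                // publish without blocking the caller
    messageHandler.setDefaultTopic("testTopic");  // used when a message carries no topic header
    return messageHandler;
}

@Bean
public MessageChannel mqttOutboundChannel() {
    return new DirectChannel();
}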
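
To make the wiring above easier to follow, here is a simplified plain-Java model of the message flow: a gateway call becomes a message with a topic header, a channel hands it to its handler on the caller's thread, and an inbound handler reads the header and payload. This is only an illustrative sketch, not Spring Integration's implementation. The names `MessageFlowSketch`, `Msg`, `Channel`, and `Gateway` are hypothetical, and the "broker" is faked by echoing every published message straight back to the inbound channel.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

public class MessageFlowSketch {

    /** Payload plus headers, loosely mirroring a Spring Integration message. */
    static final class Msg {
        final String payload;
        final Map<String, Object> headers;

        Msg(String payload, Map<String, Object> headers) {
            this.payload = payload;
            this.headers = headers;
        }
    }

    /** Stand-in for a direct channel: send() calls the single handler on the caller's thread. */
    static final class Channel {
        private Consumer<Msg> handler;

        void subscribe(Consumer<Msg> handler) {
            this.handler = handler;
        }

        void send(Msg message) {
            if (handler == null) {
                throw new IllegalStateException("no handler subscribed");
            }
            handler.accept(message);
        }
    }

    /** Stand-in for the gateway: turns a method call into a message with a topic header. */
    static final class Gateway {
        private final Channel requestChannel;

        Gateway(Channel requestChannel) {
            this.requestChannel = requestChannel;
        }

        void sendToMqtt(String data, String topic) {
            Map<String, Object> headers = new HashMap<>();
            headers.put("mqtt_topic", topic);
            requestChannel.send(new Msg(data, headers));
        }
    }

    /** Wires both directions through a fake echoing broker and returns what was received. */
    static List<String> run() {
        List<String> received = new ArrayList<>();
        Channel outbound = new Channel();
        Channel inbound = new Channel();

        // Inbound side: plays the role of handler(), reading the topic header and the payload.
        inbound.subscribe(m -> received.add(m.headers.get("mqtt_topic") + " -> " + m.payload));
        // Outbound side: plays the role of mqttOutbound(); the fake broker echoes messages back.
        outbound.subscribe(inbound::send);

        Gateway gateway = new Gateway(outbound);
        gateway.sendToMqtt("hello", "testTopic");
        gateway.sendToMqtt("21.5", "sensors/temp");
        return received;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In the real application the two directions are independent: the outbound handler publishes to the broker, and the inbound adapter only receives messages for the topics it has subscribed to.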

