์คํ๋ง ๋ถํธ MQTT ํด๋ผ์ด์ธํธ ๋ฉ์์ง ์ฑ๋ ๊ตฌ์ฑํ๊ธฐ
์ ์ฃผ ์ ๊ธฐ์ฐจ ์ค์ฆ ์์คํ ์ ๊ฐ๋ฐํ๋ฉด์ ์ ๊ธฐ์ฐจ OBD ๋ฐ์ดํฐ๋ฅผ ์์งํ๊ธฐ ์ํ์ฌ MQTT(Message Queuing Telemetry Transfer) ํด๋ผ์ด์ธํธ๋ฅผ ๊ตฌ์ฑํ๊ณ ํ ํฝ์ ๋ํ ๋ฉ์์ง ํ์ด๋ก๋๋ฅผ ์์ ํ๋ ๊ฒ์ ์ฒ์ ๊ตฌ์ฑํ์์ต๋๋ค.
๋ณธ ๊ธ์์๋ ์คํ๋ง MQTT๋ฅผ ํตํด MQTT ํด๋ผ์ด์ธํธ์ ๋ฉ์์ง ๊ตฌ๋ ๋๋ ๋ฐํํ๋ ์ฑ๋์ ๊ตฌ์ฑํ๋ ๊ฒ์ ์ค๋ช ํฉ๋๋ค.
์์กด์ฑ
implementation 'org.springframework.boot:spring-boot-starter-integration'
implementation 'org.springframework.integration:spring-integration-mqtt'
์คํ๋ง MQTT๋ Eclipse Paho MQTT Client ๋ผ์ด๋ธ๋ฌ๋ฆฌ๋ฅผ ์ฌ์ฉํฉ๋๋ค.
org.springframework.integration:spring-integration-mqtt:5.3.2.RELEASE
org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.4
Inbound Channel Configuration
ํ ํฝ ๋ฉ์์ง ๊ตฌ๋
์ ์ํ ์ธ๋ฐ์ด๋ ์ฑ๋ ๊ตฌ์ฑ์ MqttPahoMessageDrivenChannelAdapter
๊ตฌํ์ฒด๋ฅผ ํตํด ๊ฐ๋ฅํฉ๋๋ค.
DefaultMqttPahoClientFactory
๊ธฐ๋ณธ์ ์ผ๋ก DefaultMqttPahoClientFactory
๋ฅผ ํตํด MQTT ํด๋ผ์ด์ธํธ๋ฅผ ๋ฑ๋กํฉ๋๋ค. ๊ทธ๋ฌ๋ฏ๋ก ์ฐ๋ฆฌ๋ MQTT ์ฐ๊ฒฐ ์ ๋ณด(MqttConnectOptions)์ ์ค์ ํ DefaultMqttPahoClientFactory๋ฅผ ๋น์ผ๋ก ๋ฑ๋กํฉ๋๋ค.
@Configuration
public class MqttConfig {
private static final String MQTT_USERNAME = "username";
private static final String MQTT_PASSWORD = "password";
private MqttConnectOptions connectOptions() {
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(true);
options.setUserName(MQTT_USERNAME);
options.setPassword(MQTT_PASSWORD.toCharArray());
return options;
}
@Bean
public DefaultMqttPahoClientFactory defaultMqttPahoClientFactory() {
DefaultMqttPahoClientFactory clientFactory = new DefaultMqttPahoClientFactory();
clientFactory.setConnectionOptions(connectOptions());
return clientFactory;
}
}
MqttPahoMessageDrivenChannelAdapter
์์ ๋ฑ๋กํ์๋ MQTT ํด๋ผ์ด์ธํธ๋ฅผ ํตํด ๋ฉ์์ง๋ฅผ ๊ตฌ๋
ํ๊ธฐ ์ํ์ฌ MqttPahoMessageDrivenChannelAdapter
๋ฅผ ํตํด ๋ฉ์์ง ์์ ์ ์ํ ์ฑ๋์ ๊ตฌ์ฑํฉ๋๋ค.
@Configuration
public class MqttConfig {
private static final String BROKER_URL = "tcp://localhost:1883";
private static final String MQTT_CLIENT_ID = MqttAsyncClient.generateClientId();
private static final String TOPIC_FILTER = "[PROTECT]";
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
@Bean
public MessageProducer inboundChannel() {
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter(BROKER_URL, MQTT_CLIENT_ID, TOPIC_FILTER);
adapter.setCompletionTimeout(5000);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(1);
adapter.setOutputChannel(mqttInputChannel());
return adapter;
}
@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler inboundMessageHandler() {
return message -> {
String topic = (String) message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC);
System.out.println("Topic:" + topic);
System.out.println("Payload" + message.getPayload());
};
}
}
์ด์ MQTT ํด๋ผ์ด์ธํธ์ ์ํด ์์ ๋ ํ์ด๋ก๋๋ MQTT ํด๋ผ์ด์ธํธ โค Inbound Channel โค MessageChannel โค MessageHandler ์์ผ๋ก ์ด๋๋์ด MessageHandler๋ฅผ ํตํด ์์ ๋ ํ์ด๋ก๋๋ฅผ ํ์ธํ ์ ์์ต๋๋ค.
Outbound Channel Configuration
์ ์ฃผ ์ ๊ธฐ์ฐจ ์ค์ฆ ์์คํ ์์๋ MQTT ํด๋ผ์ด์ธํธ๋ฅผ ํตํด ๋ฉ์์ง๋ฅผ ๋ฐํํ๋ ๊ฒ์ ๊ตฌํํ์ง ์์์ต๋๋ค. ๋ค๋ง, ๋ฉ์์ง๋ฅผ ์์ ํ๋ ์ฑ๋์ ๋ฑ๋กํ๋ ๊ฒ์ฒ๋ผ ๋ฉ์์ง๋ฅผ ๋ฐํํ๊ธฐ ์ํ ์ฑ๋์ ๊ตฌ์ฑํ๋ฉด ๋ฉ๋๋ค.
MqttPahoMessageHandler
MQTT ํด๋ผ์ด์ธํธ๋ ์ด๋ฏธ ๊ตฌ์ฑ๋์์ผ๋ฏ๋ก MqttPahoMessageHandler
์ผ๋ก ๋ฉ์์ง ๋ฐํ์ ์ํ ์ฑ๋์ ๊ตฌ์ฑํฉ๋๋ค.
@Configuration
public class MqttConfig {
private static final String MQTT_CLIENT_ID = MqttAsyncClient.generateClientId();
@Bean
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound(DefaultMqttPahoClientFactory clientFactory) {
MqttPahoMessageHandler messageHandler =
new MqttPahoMessageHandler(MQTT_CLIENT_ID, clientFactory);
messageHandler.setAsync(true);
messageHandler.setDefaultQos(1);
return messageHandler;
}
}
์ด์ @MessagingGateway ์ด๋ ธํ ์ด์ ์ ์ ์ธํ ๋ฉ์์ง ๊ฒ์ดํธ์จ์ด API๋ฅผ ํตํด ๋ฉ์์ง๋ฅผ ๋ฐ์กํฉ๋๋ค.
@Configuration
public class MqttConfig {
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface OutboundGateway {
void sendToMqtt(String payload, @Header(MqttHeaders.TOPIC) String topic);
}
}
MQTT Events
๋ง์ฝ, MQTT ํด๋ผ์ด์ธํธ๊ฐ ๋ธ๋ก์ปค ์๋ฒ์ ์ฐ๊ฒฐ์ ์คํจํ๊ฑฐ๋ ํ ํฝ ๊ตฌ๋ ์ ๊ฐ์งํ ์ด๋ฒคํธ๋ฅผ ์ฒ๋ฆฌํ๊ณ ์ถ๋ค๋ฉด ๋ค์์ ๋งํฌ๋ฅผ ์ฐธ๊ณ ํ์๊ธฐ ๋ฐ๋๋๋ค.
https://docs.spring.io/spring-integration/reference/html/mqtt.html#mqtt-events