์ œ์ฃผ ์ „๊ธฐ์ฐจ ์‹ค์ฆ ์‹œ์Šคํ…œ์„ ๊ฐœ๋ฐœํ•˜๋ฉด์„œ ์ „๊ธฐ์ฐจ OBD ๋ฐ์ดํ„ฐ๋ฅผ ์ˆ˜์ง‘ํ•˜๊ธฐ ์œ„ํ•˜์—ฌ MQTT(Message Queuing Telemetry Transfer) ํด๋ผ์ด์–ธํŠธ๋ฅผ ๊ตฌ์„ฑํ•˜๊ณ  ํ† ํ”ฝ์— ๋Œ€ํ•œ ๋ฉ”์‹œ์ง€ ํŽ˜์ด๋กœ๋“œ๋ฅผ ์ˆ˜์‹ ํ•˜๋Š” ๊ฒƒ์„ ์ฒ˜์Œ ๊ตฌ์„ฑํ•˜์˜€์Šต๋‹ˆ๋‹ค.

๋ณธ ๊ธ€์—์„œ๋Š” ์Šคํ”„๋ง MQTT๋ฅผ ํ†ตํ•ด MQTT ํด๋ผ์ด์–ธํŠธ์™€ ๋ฉ”์‹œ์ง€ ๊ตฌ๋… ๋˜๋Š” ๋ฐœํ–‰ํ•˜๋Š” ์ฑ„๋„์„ ๊ตฌ์„ฑํ•˜๋Š” ๊ฒƒ์„ ์„ค๋ช…ํ•ฉ๋‹ˆ๋‹ค.

์˜์กด์„ฑ

implementation 'org.springframework.boot:spring-boot-starter-integration'
implementation 'org.springframework.integration:spring-integration-mqtt'

์Šคํ”„๋ง MQTT๋Š” Eclipse Paho MQTT Client ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ๋ฅผ ์‚ฌ์šฉํ•ฉ๋‹ˆ๋‹ค.

org.springframework.integration:spring-integration-mqtt:5.3.2.RELEASE
  org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.4

Inbound Channel Configuration

ํ† ํ”ฝ ๋ฉ”์‹œ์ง€ ๊ตฌ๋…์„ ์œ„ํ•œ ์ธ๋ฐ”์šด๋“œ ์ฑ„๋„ ๊ตฌ์„ฑ์€ MqttPahoMessageDrivenChannelAdapter ๊ตฌํ˜„์ฒด๋ฅผ ํ†ตํ•ด ๊ฐ€๋Šฅํ•ฉ๋‹ˆ๋‹ค.

DefaultMqttPahoClientFactory

๊ธฐ๋ณธ์ ์œผ๋กœ DefaultMqttPahoClientFactory๋ฅผ ํ†ตํ•ด MQTT ํด๋ผ์ด์–ธํŠธ๋ฅผ ๋“ฑ๋กํ•ฉ๋‹ˆ๋‹ค. ๊ทธ๋Ÿฌ๋ฏ€๋กœ ์šฐ๋ฆฌ๋Š” MQTT ์—ฐ๊ฒฐ ์ •๋ณด(MqttConnectOptions)์„ ์„ค์ •ํ•œ DefaultMqttPahoClientFactory๋ฅผ ๋นˆ์œผ๋กœ ๋“ฑ๋กํ•ฉ๋‹ˆ๋‹ค.

@Configuration
public class MqttConfig {
    private static final String MQTT_USERNAME = "username";
    private static final String MQTT_PASSWORD = "password";

    private MqttConnectOptions connectOptions() {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setCleanSession(true);
        options.setUserName(MQTT_USERNAME);
        options.setPassword(MQTT_PASSWORD.toCharArray());
        return options;
    }

    @Bean
    public DefaultMqttPahoClientFactory defaultMqttPahoClientFactory() {
        DefaultMqttPahoClientFactory clientFactory = new DefaultMqttPahoClientFactory();
        clientFactory.setConnectionOptions(connectOptions());
        return clientFactory;
    }
}

MqttPahoMessageDrivenChannelAdapter

์•ž์„œ ๋“ฑ๋กํ•˜์˜€๋˜ MQTT ํด๋ผ์ด์–ธํŠธ๋ฅผ ํ†ตํ•ด ๋ฉ”์‹œ์ง€๋ฅผ ๊ตฌ๋…ํ•˜๊ธฐ ์œ„ํ•˜์—ฌ MqttPahoMessageDrivenChannelAdapter๋ฅผ ํ†ตํ•ด ๋ฉ”์‹œ์ง€ ์ˆ˜์‹ ์„ ์œ„ํ•œ ์ฑ„๋„์„ ๊ตฌ์„ฑํ•ฉ๋‹ˆ๋‹ค.

@Configuration
public class MqttConfig {

    private static final String BROKER_URL = "tcp://localhost:1883";
    private static final String MQTT_CLIENT_ID = MqttAsyncClient.generateClientId();
    private static final String TOPIC_FILTER = "[PROTECT]";

    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageProducer inboundChannel() {
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(BROKER_URL, MQTT_CLIENT_ID, TOPIC_FILTER);
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler inboundMessageHandler() {
        return message -> {
            String topic = (String) message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC);
            System.out.println("Topic:" + topic);
            System.out.println("Payload" + message.getPayload());
        };
    }
}

์ด์ œ MQTT ํด๋ผ์ด์–ธํŠธ์— ์˜ํ•ด ์ˆ˜์‹ ๋œ ํŽ˜์ด๋กœ๋“œ๋Š” MQTT ํด๋ผ์ด์–ธํŠธ โžค Inbound Channel โžค MessageChannel โžค MessageHandler ์ˆœ์œผ๋กœ ์ด๋™๋˜์–ด MessageHandler๋ฅผ ํ†ตํ•ด ์ˆ˜์‹ ๋œ ํŽ˜์ด๋กœ๋“œ๋ฅผ ํ™•์ธํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

Outbound Channel Configuration

์ œ์ฃผ ์ „๊ธฐ์ฐจ ์‹ค์ฆ ์‹œ์Šคํ…œ์—์„œ๋Š” MQTT ํด๋ผ์ด์–ธํŠธ๋ฅผ ํ†ตํ•ด ๋ฉ”์‹œ์ง€๋ฅผ ๋ฐœํ–‰ํ•˜๋Š” ๊ฒƒ์€ ๊ตฌํ˜„ํ•˜์ง€ ์•Š์•˜์Šต๋‹ˆ๋‹ค. ๋‹ค๋งŒ, ๋ฉ”์‹œ์ง€๋ฅผ ์ˆ˜์‹ ํ•˜๋Š” ์ฑ„๋„์„ ๋“ฑ๋กํ•˜๋Š” ๊ฒƒ์ฒ˜๋Ÿผ ๋ฉ”์‹œ์ง€๋ฅผ ๋ฐœํ–‰ํ•˜๊ธฐ ์œ„ํ•œ ์ฑ„๋„์„ ๊ตฌ์„ฑํ•˜๋ฉด ๋ฉ๋‹ˆ๋‹ค.

MqttPahoMessageHandler

MQTT ํด๋ผ์ด์–ธํŠธ๋Š” ์ด๋ฏธ ๊ตฌ์„ฑ๋˜์—ˆ์œผ๋ฏ€๋กœ MqttPahoMessageHandler์œผ๋กœ ๋ฉ”์‹œ์ง€ ๋ฐœํ–‰์„ ์œ„ํ•œ ์ฑ„๋„์„ ๊ตฌ์„ฑํ•ฉ๋‹ˆ๋‹ค.

@Configuration
public class MqttConfig {
    
    private static final String MQTT_CLIENT_ID = MqttAsyncClient.generateClientId();

    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound(DefaultMqttPahoClientFactory clientFactory) {
        MqttPahoMessageHandler messageHandler =
                new MqttPahoMessageHandler(MQTT_CLIENT_ID, clientFactory);
        messageHandler.setAsync(true);
        messageHandler.setDefaultQos(1);
        return messageHandler;
    }
}

์ด์ œ @MessagingGateway ์–ด๋…ธํ…Œ์ด์…˜์„ ์„ ์–ธํ•œ ๋ฉ”์‹œ์ง€ ๊ฒŒ์ดํŠธ์›จ์ด API๋ฅผ ํ†ตํ•ด ๋ฉ”์‹œ์ง€๋ฅผ ๋ฐœ์†กํ•ฉ๋‹ˆ๋‹ค.

@Configuration
public class MqttConfig {

    @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
    public interface OutboundGateway {
        void sendToMqtt(String payload, @Header(MqttHeaders.TOPIC) String topic);
    }
    
}

MQTT Events

๋งŒ์•ฝ, MQTT ํด๋ผ์ด์–ธํŠธ๊ฐ€ ๋ธŒ๋กœ์ปค ์„œ๋ฒ„์— ์—ฐ๊ฒฐ์„ ์‹คํŒจํ•˜๊ฑฐ๋‚˜ ํ† ํ”ฝ ๊ตฌ๋…์„ ๊ฐ์ง€ํ•œ ์ด๋ฒคํŠธ๋ฅผ ์ฒ˜๋ฆฌํ•˜๊ณ  ์‹ถ๋‹ค๋ฉด ๋‹ค์Œ์˜ ๋งํฌ๋ฅผ ์ฐธ๊ณ ํ•˜์‹œ๊ธฐ ๋ฐ”๋ž๋‹ˆ๋‹ค.

https://docs.spring.io/spring-integration/reference/html/mqtt.html#mqtt-events

์ฐธ๊ณ