Skip to content

์†Œ๊ทœ๋ชจ ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜ ์„œ๋ฒ„์—์„œ ๋ ˆ๋””์Šค๋ฅผ ์‚ฌ์šฉํ•˜๊ณ  ์žˆ์„๋•Œ RabbitMQ ๋˜๋Š” Apache Kafka์™€ ๊ฐ™์€ ๋ฉ”์‹œ์ง€ ํ๋ฅผ ํ†ตํ•ด ๋น„๋™๊ธฐ ์ด๋ฒคํŠธ์— ๋Œ€ํ•œ ์š”๊ตฌ๊ฐ€ ํ•„์š”ํ•œ ๊ฒฝ์šฐ ๋ณ„๋„์˜ ๋ฉ”์‹œ์ง€ ํ ์†”๋ฃจ์…˜์„ ๋„์ž…ํ•˜๋Š” ๊ฒƒ์€ ์ธํ”„๋ผ ์ƒ ๊ณผ๋„ํ•  ์ˆ˜๋„ ์žˆ๋‹ค. ์ด๋Ÿฌํ•œ ๊ฒฝ์šฐ ๋ ˆ๋””์Šค์—์„œ ์ œ๊ณตํ•˜๋Š” ์ŠคํŠธ๋ฆผ์„ ์‚ฌ์šฉํ•˜์—ฌ ๋ฉ”์‹œ์ง€ ๊ธฐ๋ฐ˜์˜ ํ”„๋กœ๊ทธ๋ž˜๋ฐ์„ ์ˆ˜ํ–‰ํ•  ์ˆ˜ ์žˆ๋Š”๋ฐ ์Šคํ”„๋ง ๋ถ€ํŠธ ๊ธฐ๋ฐ˜์˜ ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜์ธ ๊ฒฝ์šฐ Spring Data Redis์— ๋ ˆ๋””์Šค ์ŠคํŠธ๋ฆผ์— ๋Œ€ํ•œ ๊ธฐ๋Šฅ์„ ํฌํ•จํ•˜๊ณ  ์žˆ์œผ๋ฏ€๋กœ ์‰ฝ๊ฒŒ ์ŠคํŠธ๋ฆผ์— ๋ฉ”์‹œ์ง€๋ฅผ ๋“ฑ๋กํ•˜๊ณ  ์ปจ์Šˆ๋จธ๋ฅผ ์ถ”๊ฐ€ํ•˜์—ฌ ๋ฉ”์‹œ์ง€๋ฅผ ์ˆ˜์‹ ํ•˜๋„๋ก ๊ตฌํ˜„ํ•  ์ˆ˜ ์žˆ๋‹ค.

๋ฉ”์‹œ์ง€ ์—†๋Š” ๋นˆ ์ŠคํŠธ๋ฆผ ์ƒ์„ฑํ•˜๊ธฐ โ€‹

sh
XGROUP CREATE stream_key group_name $ MKSTREAM

๊ธฐ๋ณธ์ ์ธ ๋ ˆ๋””์Šค ๋ช…๋ น์–ด๋กœ๋Š” ์œ„์™€ ๊ฐ™์ด ์ปจ์Šˆ๋จธ ๊ทธ๋ฃน์„ ์ƒ์„ฑํ•  ๋•Œ MKSTREAM ์˜ต์…˜์„ ์ง€์ •ํ•˜์—ฌ ์ŠคํŠธ๋ฆผ์ด ์กด์žฌํ•˜์ง€ ์•Š์€ ๊ฒฝ์šฐ์— ์ŠคํŠธ๋ฆผ์ด ์ž๋™์œผ๋กœ ์ƒ์„ฑ๋˜๋„๋ก ํ•  ์ˆ˜ ์žˆ๋‹ค. Spring Data Redis ์—์„œ๋Š” RedisConnection ๊ณผ RedisStreamCommands ์ธํ„ฐํŽ˜์ด์Šค์— xGroupCreate ๋ผ๋Š” ํ•จ์ˆ˜๋กœ ์ •์˜๋˜์–ด ์žˆ์œผ๋ฏ€๋กœ ์•„๋ž˜์™€ ๊ฐ™์ด ๊ตฌํ˜„ํ•˜๋ฉด ๋œ๋‹ค.

java
String streamKey = "stream-1";
String groupName = "group-1";

if (Boolean.TRUE.equals(redisTemplate.hasKey(streamKey))) {
    redisTemplate.opsForStream().createGroup(streamKey, ReadOffset.lastConsumed(), groupName);
} else {
    RedisConnection connection = redisTemplate.getConnectionFactory().getConnection();
    RedisStreamCommands streamCommands = connection.streamCommands();
    streamCommands.xGroupCreate(streamKey.getBytes(StandardCharsets.UTF_8), groupName, ReadOffset.latest(), true);
}

StreamMessageListenerContainer โ€‹

StreamMessageListenerContainer ์™€ StreamListener ๋ฅผ ํ™œ์šฉํ•˜๋ฉด RedisTemplate ์œผ๋กœ ์ŠคํŠธ๋ฆผ์„ ์ง์ ‘ ์ฝ์ง€ ์•Š์•„๋„ ๋‹ค์ค‘ ์Šค๋ ˆ๋“œ๋ฅผ ํ†ตํ•ด ๋ฉ”์‹œ์ง€๋ฅผ ์ˆ˜์‹ ํ•˜์—ฌ ์ฒ˜๋ฆฌํ•  ์ˆ˜ ์žˆ๋‹ค.

java
StreamMessageListenerContainer messageListenerContainer = StreamMessageListenerContainer.create(redisTemplate.getConnectionFactory(),
        StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
                .hashKeySerializer(new StringRedisSerializer())
                .hashValueSerializer(new StringRedisSerializer())
                .build());

Consumer consumer = Consumer.from(groupName, consumerName);
StreamOffset<String> streamOffset = StreamOffset.create(streamKey, ReadOffset.lastConsumed());
Subscription subscription = messageListenerContainer.receiveAutoAck(consumer, streamOffset, record -> {
    System.out.println(record);
});
subscription.await(Duration.ofSeconds(1));

messageListenerContainer.start();

Released under the MIT License.