์๊ท๋ชจ ์ ํ๋ฆฌ์ผ์ด์ ์๋ฒ์์ ๋ ๋์ค๋ฅผ ์ฌ์ฉํ๊ณ ์์๋ RabbitMQ ๋๋ Apache Kafka์ ๊ฐ์ ๋ฉ์์ง ํ๋ฅผ ํตํด ๋น๋๊ธฐ ์ด๋ฒคํธ์ ๋ํ ์๊ตฌ๊ฐ ํ์ํ ๊ฒฝ์ฐ ๋ณ๋์ ๋ฉ์์ง ํ ์๋ฃจ์ ์ ๋์ ํ๋ ๊ฒ์ ์ธํ๋ผ ์ ๊ณผ๋ํ ์๋ ์๋ค. ์ด๋ฌํ ๊ฒฝ์ฐ ๋ ๋์ค์์ ์ ๊ณตํ๋ ์คํธ๋ฆผ์ ์ฌ์ฉํ์ฌ ๋ฉ์์ง ๊ธฐ๋ฐ์ ํ๋ก๊ทธ๋๋ฐ์ ์ํํ ์ ์๋๋ฐ ์คํ๋ง ๋ถํธ ๊ธฐ๋ฐ์ ์ ํ๋ฆฌ์ผ์ด์ ์ธ ๊ฒฝ์ฐ Spring Data Redis์ ๋ ๋์ค ์คํธ๋ฆผ์ ๋ํ ๊ธฐ๋ฅ์ ํฌํจํ๊ณ ์์ผ๋ฏ๋ก ์ฝ๊ฒ ์คํธ๋ฆผ์ ๋ฉ์์ง๋ฅผ ๋ฑ๋กํ๊ณ ์ปจ์๋จธ๋ฅผ ์ถ๊ฐํ์ฌ ๋ฉ์์ง๋ฅผ ์์ ํ๋๋ก ๊ตฌํํ ์ ์๋ค.
๋ฉ์์ง ์๋ ๋น ์คํธ๋ฆผ ์์ฑํ๊ธฐ โ
XGROUP CREATE stream_key group_name $ MKSTREAM
๊ธฐ๋ณธ์ ์ธ ๋ ๋์ค ๋ช ๋ น์ด๋ก๋ ์์ ๊ฐ์ด ์ปจ์๋จธ ๊ทธ๋ฃน์ ์์ฑํ ๋ MKSTREAM ์ต์ ์ ์ง์ ํ์ฌ ์คํธ๋ฆผ์ด ์กด์ฌํ์ง ์์ ๊ฒฝ์ฐ์ ์คํธ๋ฆผ์ด ์๋์ผ๋ก ์์ฑ๋๋๋ก ํ ์ ์๋ค. Spring Data Redis ์์๋ RedisConnection ๊ณผ RedisStreamCommands ์ธํฐํ์ด์ค์ xGroupCreate ๋ผ๋ ํจ์๋ก ์ ์๋์ด ์์ผ๋ฏ๋ก ์๋์ ๊ฐ์ด ๊ตฌํํ๋ฉด ๋๋ค.
String streamKey = "stream-1";
String groupName = "group-1";
if (Boolean.TRUE.equals(redisTemplate.hasKey(streamKey))) {
redisTemplate.opsForStream().createGroup(streamKey, ReadOffset.lastConsumed(), groupName);
} else {
RedisConnection connection = redisTemplate.getConnectionFactory().getConnection();
RedisStreamCommands streamCommands = connection.streamCommands();
streamCommands.xGroupCreate(streamKey.getBytes(StandardCharsets.UTF_8), groupName, ReadOffset.latest(), true);
}
StreamMessageListenerContainer โ
StreamMessageListenerContainer ์ StreamListener ๋ฅผ ํ์ฉํ๋ฉด RedisTemplate ์ผ๋ก ์คํธ๋ฆผ์ ์ง์ ์ฝ์ง ์์๋ ๋ค์ค ์ค๋ ๋๋ฅผ ํตํด ๋ฉ์์ง๋ฅผ ์์ ํ์ฌ ์ฒ๋ฆฌํ ์ ์๋ค.
StreamMessageListenerContainer messageListenerContainer = StreamMessageListenerContainer.create(redisTemplate.getConnectionFactory(),
StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
.hashKeySerializer(new StringRedisSerializer())
.hashValueSerializer(new StringRedisSerializer())
.build());
Consumer consumer = Consumer.from(groupName, consumerName);
StreamOffset<String> streamOffset = StreamOffset.create(streamKey, ReadOffset.lastConsumed());
Subscription subscription = messageListenerContainer.receiveAutoAck(consumer, streamOffset, record -> {
System.out.println(record);
});
subscription.await(Duration.ofSeconds(1));
messageListenerContainer.start();