μ†Œκ·œλͺ¨ μ• ν”Œλ¦¬μΌ€μ΄μ…˜ μ„œλ²„μ—μ„œ λ ˆλ””μŠ€λ₯Ό μ‚¬μš©ν•˜κ³  μžˆμ„λ•Œ RabbitMQ λ˜λŠ” Apache Kafka와 같은 λ©”μ‹œμ§€ 큐λ₯Ό 톡해 비동기 μ΄λ²€νŠΈμ— λŒ€ν•œ μš”κ΅¬κ°€ ν•„μš”ν•œ 경우 λ³„λ„μ˜ λ©”μ‹œμ§€ 큐 μ†”λ£¨μ…˜μ„ λ„μž…ν•˜λŠ” 것은 인프라 상 과도할 μˆ˜λ„ μžˆλ‹€. μ΄λŸ¬ν•œ 경우 λ ˆλ””μŠ€μ—μ„œ μ œκ³΅ν•˜λŠ” μŠ€νŠΈλ¦Όμ„ μ‚¬μš©ν•˜μ—¬ λ©”μ‹œμ§€ 기반의 ν”„λ‘œκ·Έλž˜λ°μ„ μˆ˜ν–‰ν•  수 μžˆλŠ”λ° μŠ€ν”„λ§ λΆ€νŠΈ 기반의 μ• ν”Œλ¦¬μΌ€μ΄μ…˜μΈ 경우 Spring Data Redis에 λ ˆλ””μŠ€ μŠ€νŠΈλ¦Όμ— λŒ€ν•œ κΈ°λŠ₯을 ν¬ν•¨ν•˜κ³  μžˆμœΌλ―€λ‘œ μ‰½κ²Œ μŠ€νŠΈλ¦Όμ— λ©”μ‹œμ§€λ₯Ό λ“±λ‘ν•˜κ³  컨슈머λ₯Ό μΆ”κ°€ν•˜μ—¬ λ©”μ‹œμ§€λ₯Ό μˆ˜μ‹ ν•˜λ„λ‘ κ΅¬ν˜„ν•  수 μžˆλ‹€.

λ©”μ‹œμ§€ μ—†λŠ” 빈 슀트림 μƒμ„±ν•˜κΈ°

XGROUP CREATE stream_key group_name $ MKSTREAM

기본적인 λ ˆλ””μŠ€ λͺ…λ Ήμ–΄λ‘œλŠ” μœ„μ™€ 같이 컨슈머 그룹을 생성할 λ•Œ MKSTREAM μ˜΅μ…˜μ„ μ§€μ •ν•˜μ—¬ 슀트림이 μ‘΄μž¬ν•˜μ§€ μ•Šμ€ κ²½μš°μ— 슀트림이 μžλ™μœΌλ‘œ μƒμ„±λ˜λ„λ‘ ν•  수 μžˆλ‹€. Spring Data Redis μ—μ„œλŠ” RedisConnection κ³Ό RedisStreamCommands μΈν„°νŽ˜μ΄μŠ€μ— xGroupCreate λΌλŠ” ν•¨μˆ˜λ‘œ μ •μ˜λ˜μ–΄ μžˆμœΌλ―€λ‘œ μ•„λž˜μ™€ 같이 κ΅¬ν˜„ν•˜λ©΄ λœλ‹€.

String streamKey = "stream-1";
String groupName = "group-1";

if (Boolean.TRUE.equals(redisTemplate.hasKey(streamKey))) {
    redisTemplate.opsForStream().createGroup(streamKey, ReadOffset.lastConsumed(), groupName);
} else {
    RedisConnection connection = redisTemplate.getConnectionFactory().getConnection();
    RedisStreamCommands streamCommands = connection.streamCommands();
    streamCommands.xGroupCreate(streamKey.getBytes(StandardCharsets.UTF_8), groupName, ReadOffset.latest(), true);
}

StreamMessageListenerContainer

StreamMessageListenerContainer 와 StreamListener λ₯Ό ν™œμš©ν•˜λ©΄ RedisTemplate 으둜 μŠ€νŠΈλ¦Όμ„ 직접 읽지 μ•Šμ•„λ„ 닀쀑 μŠ€λ ˆλ“œλ₯Ό 톡해 λ©”μ‹œμ§€λ₯Ό μˆ˜μ‹ ν•˜μ—¬ μ²˜λ¦¬ν•  수 μžˆλ‹€.

StreamMessageListenerContainer messageListenerContainer = StreamMessageListenerContainer.create(redisTemplate.getConnectionFactory(),
        StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
                .hashKeySerializer(new StringRedisSerializer())
                .hashValueSerializer(new StringRedisSerializer())
                .build());

Consumer consumer = Consumer.from(groupName, consumerName);
StreamOffset<String> streamOffset = StreamOffset.create(streamKey, ReadOffset.lastConsumed());
Subscription subscription = messageListenerContainer.receiveAutoAck(consumer, streamOffset, record -> {
    System.out.println(record);
});
subscription.await(Duration.ofSeconds(1));

messageListenerContainer.start();