Redis Stream
μκ·λͺ¨ μ ν리μΌμ΄μ μλ²μμ λ λμ€λ₯Ό μ¬μ©νκ³ μμλ RabbitMQ λλ Apache Kafkaμ κ°μ λ©μμ§ νλ₯Ό ν΅ν΄ λΉλκΈ° μ΄λ²€νΈμ λν μκ΅¬κ° νμν κ²½μ° λ³λμ λ©μμ§ ν μ루μ μ λμ νλ κ²μ μΈνλΌ μ κ³Όλν μλ μλ€. μ΄λ¬ν κ²½μ° λ λμ€μμ μ 곡νλ μ€νΈλ¦Όμ μ¬μ©νμ¬ λ©μμ§ κΈ°λ°μ νλ‘κ·Έλλ°μ μνν μ μλλ° μ€νλ§ λΆνΈ κΈ°λ°μ μ ν리μΌμ΄μ μΈ κ²½μ° Spring Data Redisμ λ λμ€ μ€νΈλ¦Όμ λν κΈ°λ₯μ ν¬ν¨νκ³ μμΌλ―λ‘ μ½κ² μ€νΈλ¦Όμ λ©μμ§λ₯Ό λ±λ‘νκ³ μ»¨μλ¨Έλ₯Ό μΆκ°νμ¬ λ©μμ§λ₯Ό μμ νλλ‘ κ΅¬νν μ μλ€.
λ©μμ§ μλ λΉ μ€νΈλ¦Ό μμ±νκΈ°
XGROUP CREATE stream_key group_name $ MKSTREAM
κΈ°λ³Έμ μΈ λ λμ€ λͺ λ Ήμ΄λ‘λ μμ κ°μ΄ 컨μλ¨Έ κ·Έλ£Ήμ μμ±ν λ MKSTREAM μ΅μ μ μ§μ νμ¬ μ€νΈλ¦Όμ΄ μ‘΄μ¬νμ§ μμ κ²½μ°μ μ€νΈλ¦Όμ΄ μλμΌλ‘ μμ±λλλ‘ ν μ μλ€. Spring Data Redis μμλ RedisConnection κ³Ό RedisStreamCommands μΈν°νμ΄μ€μ xGroupCreate λΌλ ν¨μλ‘ μ μλμ΄ μμΌλ―λ‘ μλμ κ°μ΄ ꡬννλ©΄ λλ€.
String streamKey = "stream-1";
String groupName = "group-1";
if (Boolean.TRUE.equals(redisTemplate.hasKey(streamKey))) {
redisTemplate.opsForStream().createGroup(streamKey, ReadOffset.lastConsumed(), groupName);
} else {
RedisConnection connection = redisTemplate.getConnectionFactory().getConnection();
RedisStreamCommands streamCommands = connection.streamCommands();
streamCommands.xGroupCreate(streamKey.getBytes(StandardCharsets.UTF_8), groupName, ReadOffset.latest(), true);
}
StreamMessageListenerContainer
StreamMessageListenerContainer μ StreamListener λ₯Ό νμ©νλ©΄ RedisTemplate μΌλ‘ μ€νΈλ¦Όμ μ§μ μ½μ§ μμλ λ€μ€ μ€λ λλ₯Ό ν΅ν΄ λ©μμ§λ₯Ό μμ νμ¬ μ²λ¦¬ν μ μλ€.
StreamMessageListenerContainer messageListenerContainer = StreamMessageListenerContainer.create(redisTemplate.getConnectionFactory(),
StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
.hashKeySerializer(new StringRedisSerializer())
.hashValueSerializer(new StringRedisSerializer())
.build());
Consumer consumer = Consumer.from(groupName, consumerName);
StreamOffset<String> streamOffset = StreamOffset.create(streamKey, ReadOffset.lastConsumed());
Subscription subscription = messageListenerContainer.receiveAutoAck(consumer, streamOffset, record -> {
System.out.println(record);
});
subscription.await(Duration.ofSeconds(1));
messageListenerContainer.start();