☁️ infra/✉️ MQ

[RabbitMQ] 한 Queue에서 여러 타입의 Message 처리

beomsic 2023. 1. 5. 19:46

❗ 한 개의 Queue에 두 개 이상의 다른 타입 Message가 들어올 때의 처리!

한 개의 Exchange에 한 개의 Queue가 바인딩 된 상태

  • exchange 타입 - topic

 

메시지 타입

두 개의 다른 클래스

 

SampleMessage

@Slf4j
@Getter
@AllArgsConstructor
@NoArgsConstructor
public class SampleMessage {

  private String name;
  private String content;

}

 

OtherMessage

@Slf4j
@Getter
@AllArgsConstructor
@NoArgsConstructor
public class OtherMessage {

  private String name;
  private String content;
  private String writer;

}

각 메시지 타입에 따라 다른 로직을 수행하도록 한다.

 

📤 Producer(Sender)


한 개의 큐로 서로 다른 타입의 객체를 보낸다.

@Slf4j
@Service
public class RabbitProducerService {

  @Autowired
  private RabbitTemplate rabbitTemplate;

  @Autowired
  private Queue queue;

  public void sendConvertSampleMessage(SampleMessage message) {
    rabbitTemplate.convertAndSend(queue.getName(), message);
    log.info("RabbitProducerService send SampleMessage {}", message.getContent());
  }

  public void sendConvertOtherMessage(OtherMessage message) {
    rabbitTemplate.convertAndSend(queue.getName(), message);
    log.info("RabbitProducerService send OtherMessage {}", message.getContent());
  }
}

 

message 보내기

@Component
public class TestRunner implements CommandLineRunner {

  @Autowired
  private RabbitProducerService producer;

  @Override
  public void run(String... args) throws Exception {
    SampleMessage testSampleMessage = new SampleMessage("beomsicA", "contentA");
    producer.sendConvertSampleMessage(testSampleMessage);

    OtherMessage testOtherMessage = new OtherMessage("beomsicB", "contentB", "범석");
    producer.sendConvertOtherMessage(testOtherMessage);
  }

}

 

log 확인

 

RabbitMQ Web Management 페이지

 

→ 정상적으로 동작!!

 

📩 Consumer


이전과 달리 클래스 레벨@RabbitListener 를 지정한다.

@Slf4j
@Component
@RabbitListener(containerFactory = "SampleListenerContainerFactory", queues="test-queue")
public class Receiver {

  @RabbitHandler
  public void listenerSampleMessage(SampleMessage message) {
    log.info("SampleMessage Received!! - {}", message.getContent());
  }

  @RabbitHandler
  public void listenerOtherMessage(OtherMessage message) {
    log.info("OtherMessage Received!! - {}", message.getContent());
  }

}

 

@RabbitHandler 를 통해 메시지의 타입마다 다르게 매핑하도록 설정!!

 

결과

 

→ 메시지 타입마다 다른 메서드를 호출한다❗

 

📌 isDefault


위에서 사용한 메시지 타입인 SampleMessage 와 OtherMessage 외에 다른 타입이 전달될 수 있다.

  • 해당 두 타입외에 대한 처리를 일괄적으로 할 수 있다.

 

다른 타입 추가

AMessage

@Getter
@AllArgsConstructor
@NoArgsConstructor
public class AMessage {

  private Long id;
  private String name;

}

 

BMessage

@Getter
@AllArgsConstructor
@NoArgsConstructor
public class BMessage {

  private Long id;
  private String name;
  private String writer;

}

 

Producer

4개 타입의 메시지를 send

@Slf4j
@Service
public class RabbitProducerService {

  @Autowired
  private RabbitTemplate rabbitTemplate;

  @Autowired
  private Queue queue;

  public void sendConvertSampleMessage(SampleMessage message) {
    rabbitTemplate.convertAndSend(queue.getName(), message);
    log.info("RabbitProducerService send SampleMessage {}", message.getContent());
  }

  public void sendConvertOtherMessage(OtherMessage message) {
    rabbitTemplate.convertAndSend(queue.getName(), message);
    log.info("RabbitProducerService send OtherMessage {}", message.getContent());
  }

  public void sendConvertMessageA(AMessage message) {
    rabbitTemplate.convertAndSend(queue.getName(), message);
    log.info("RabbitProducerService send SampleMessage {}", message.getContent());
  }

  public void sendConvertMessageB(BMessage message) {
    rabbitTemplate.convertAndSend(queue.getName(), message);
    log.info("RabbitProducerService send OtherMessage {}", message.getContent());
  }
}

 

@Component
public class TestRunner implements CommandLineRunner {

  @Autowired
  private RabbitProducerService producer;

  @Override
  public void run(String... args) throws Exception {
    SampleMessage testSampleMessage = new SampleMessage("beomsicA", "contentA");
    producer.sendConvertSampleMessage(testSampleMessage);

    OtherMessage testOtherMessage = new OtherMessage("beomsicB", "contentB", "범석");
    producer.sendConvertOtherMessage(testOtherMessage);

    AMessage testAMessage = new AMessage(1L, "contentAA");
    producer.sendConvertMessageA(testAMessage);

    BMessage testBMessage = new BMessage(2L, "contentBB", "범석");
    producer.sendConvertMessageB(testBMessage);
  }

}

 

Consumer

Consumer 측에서 메시지를 처리할 때 @RabbitHandlerisDefault 속성을 사용하면 @RabbitHandler에 의해 매핑되지 않은 타입에 대한 처리를 지원한다.

 

@Slf4j
@Component
@RabbitListener(containerFactory = "SampleListenerContainerFactory", queues="test-queue")
public class Receiver {

  @RabbitHandler
  public void listenerSampleMessage(SampleMessage message) {
    log.info("SampleMessage Received!! - {}", message.getContent());
  }

  @RabbitHandler
  public void listenerOtherMessage(OtherMessage message) {
    log.info("OtherMessage Received!! - {}", message.getContent());
  }
  
  @RabbitHandler(isDefault = true)
  public void listenerDefault(Object obj) {
    log.info("Default Message Received!! - {}", obj);
  }

}

 

결과

 

참고자료

https://docs.spring.io/spring-amqp/api/org/springframework/amqp/rabbit/annotation/RabbitHandler.html

https://velog.io/@dhk22/RabbitMQ-한-개-Queue에-여러-타입-Message-처리하기-RabbitHandler