☁️ infra/✉️ MQ

Rabbit MQ + Spring Boot(1)

beomsic 2023. 1. 4. 21:56

직접 Rabbit MQ를 사용해 간단한 비동기 통신 샘플 프로젝트를 만들어보고자 한다.

📌 RabbitMQ 설치 (Docker)


간단한 프로젝트를 만들 예정이라 Docker를 통해서 RabbitMQ를 설치했다.

 

이미지 Pull

docker pull rabbitmq // 최신버전 pull

docker pull rabbitmq:3.11.5-management // 3.11 버전 pull(버전 명시)

 

컨테이너 실행

docker run 명령어를 통해 RabbitMQ Container를 실행해준다.

docker run -d --hostname rabbitmq -p 5672:5672 -p 15672:15672 
--name some-rabbit rabbitmq:3
  • hostname : rabbitmq
  • port binding
    • 5672:5672 - RabbitMQ서버
    • 15672:15672 - management plugin web
    • RabbitMQ 의 default port 는 5672 이다.

 

docker ps - a
  • 위 명령어를 통해 정상적으로 RabbitMQ Container가 동작하고 있는지 확인할 수 있다.

 

📌 Spring Boot - RabbitMQ


Spring Boot-rabbitMQ 페이지를 참고했다.

의존성 추가 - build.gradle

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation group: 'org.springframework.boot', name: 'spring-boot-starter-amqp', version: '3.0.1'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
}

 

코드

4개의 클래스 구현

  • ReceiverConfiuration
    • RabbitMQ ↔ SpringBoot 간 Connection 설정
    • Receiver 클래스에게 메시지를 전달하기 위한 Listener Container Factory 설정
  • Receiver
    • 수신한 메시지를 처리할 클래스
  • SampleMessage
    • 메시지 포맷 정의
  • RabbitmqTestApplication
    • SpringBootApplication Entry Point

 

ReceiverConfiguration

@PropertySource("classpath:application.yml")
@Configuration
public class ReceiverConfiguration {

  @Value("${spring.rabbitmq.host}")
  private String RABBITMQ_HOST;

  @Value("${spring.rabbitmq.port}")
  private int RABBITMQ_PORT;

  @Value("${spring.rabbitmq.username}")
  private String RABBITMQ_USERNAME;

  @Value("${spring.rabbitmq.password}")
  private String RABBITMQ_PASSWORD;

  @Value("${spring.rabbitmq.template.routing-key}")
  private String ROUTING_KEY;

  @Value("${spring.rabbitmq.template.exchange}")
  private String EXCHANGE_NAME;

  @Value("${spring.rabbitmq.template.default-receive-queue}")
  private String QUEUE_NAME;

  /**
   * RabbitMQ Server와 connection을 생성하는 factory 객체 반환
   *
   * @return 초기화된 RabbitMQ ConnectionFactory 객체
   */
  @Bean
  public ConnectionFactory getConnectionFactory() {
    ConnectionFactory connectionFactory = new CachingConnectionFactory(RABBITMQ_HOST, RABBITMQ_PORT);
    ((CachingConnectionFactory) connectionFactory).setUsername(RABBITMQ_USERNAME);
    ((CachingConnectionFactory) connectionFactory).setPassword(RABBITMQ_PASSWORD);
    return connectionFactory;
  }

  /**
   * RabbitMQ에서 사용할 Exchange를 선언, 반환한다.
   * Exchange 종류로 Topic 사용
   *
   * @return 초기화된 TopicExchange 객체
   */
  @Bean
  public TopicExchange getExchange() {
    return new TopicExchange(EXCHANGE_NAME);
  }

  /**
   * RabbitMQ에서 사용할 Queue를 선언하고 반환한다.
   *
   * @return 초기화된 Queue 객체
   */
  @Bean
  public Queue queue() {

    // 큐의 이름, durable 여부 지정
    // durable 이 true => queue는 서버 재시작후에도 유지가 된다.
    return new Queue(QUEUE_NAME, true);
  }

  /**
   * Exchange와 Queue를 연결하는 Binding 객체를 선언하고 반환한다.
   *
   * @param queue 초기화된 Queue
   * @param exchange 초기화된 TopicExchange
   * @return Binding 객체
   */
  @Bean
  public Binding getBinding(Queue queue, TopicExchange exchange){
    // exchange 타입이 Topic -> routing key를 사용해 binding
    return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);
  }

  /**
   * RabbitMQ에서 메시지를 수신할 Listener의 Factory를 선언하고 반환한다.
   *
   * @param connectionFactory ConnectionFactory 객체
   * @param converter Jackson Converter 객체. byte[] <-> 메시지 간 변환을 담당
   * @return 초기화된 Listener Factory 객체
   */
  @Bean("SampleContainerFactory")
  public SimpleRabbitListenerContainerFactory getSampleContainerFactory(ConnectionFactory connectionFactory, Jackson2JsonMessageConverter converter) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setMessageConverter(converter);
    return factory;
  }

  @Bean
  public Jackson2JsonMessageConverter getMessageConverter() {
    return new Jackson2JsonMessageConverter();
  }

}

 

CachingConnectionFactory

  • host 이름과 port가 지정된 새 CachingConnectionFactory를 생성
// host 이름과 port가 지정된 새 CachingConnectionFactory를 생성
public CachingConnectionFactory(@Nullable String hostNameArg, int port) {
  super(newRabbitConnectionFactory());
  String hostname = hostNameArg;
  if (!StringUtils.hasText(hostname)) {
    hostname = getDefaultHostName();
  }
  setHost(hostname);
  setPort(port);
  doSetPublisherConnectionFactory(new CachingConnectionFactory(getRabbitConnectionFactory(), true));
}

...

// 지정된 ConnectionFactory에 대해 새 CachingConnectionFactory를 만든다.
private CachingConnectionFactory(com.rabbitmq.client.ConnectionFactory rabbitConnectionFactory,
			boolean isPublisherFactory) {

  super(rabbitConnectionFactory);
  if (!isPublisherFactory) {
  if (rabbitConnectionFactory.isAutomaticRecoveryEnabled()) {
    rabbitConnectionFactory.setAutomaticRecoveryEnabled(false);
    logger.warn("***\\nAutomatic Recovery was Enabled in the provided connection factory;\\n"
				+ "while Spring AMQP is generally compatible with this feature, there\\n"
				+ "are some corner cases where problems arise. Spring AMQP\\n"
				+ "prefers to use its own recovery mechanisms; when this option is true, you may receive\\n"
				+ "'AutoRecoverConnectionNotCurrentlyOpenException's until the connection is recovered.\\n"
				+ "It has therefore been disabled; if you really wish to enable it, use\\n"
				+ "'getRabbitConnectionFactory().setAutomaticRecoveryEnabled(true)',\\n"
				+ "but this is discouraged.");
  }
  super.setPublisherConnectionFactory(new CachingConnectionFactory(getRabbitConnectionFactory(), true));
  }
  else {
    super.setPublisherConnectionFactory(null);
    this.defaultPublisherFactory = false;
  }
}

 

AbstractConncetionFactory

// 사용자 지정 publisher connection factory 설정

public void setPublisherConnectionFactory(
  @Nullable AbstractConnectionFactory publisherConnectionFactory) {

  doSetPublisherConnectionFactory(publisherConnectionFactory);
}

protected final void doSetPublisherConnectionFactory(
  @Nullable AbstractConnectionFactory publisherConnectionFactory) {

  this.publisherConnectionFactory = publisherConnectionFactory;
}

 

Receiver

  • 메시징 기반 응용프로그램을 사용할 경우 publish된 메시지에 응답하는 Receiver를 만들어야 한다.
  • Receiver는 메시지 수신방법을 정의하는 POJO이다.
@Slf4j
@Component
public class Receiver {
  
  /**
   * 메시지 수신 시 처리할 Handler 함수
   * @param message RabbitMQ의 test-queue 으로부터 수신한 메시지
   */
  @RabbitListener(containerFactory = "SampleListenerContainerFactory", queues="test-queue")
  public void onReceiveMessage(SampleMessage message){
    log.info("== Receiver received message: {}", message);
  }
  
}

 

SampleMessage

@Slf4j
@Getter
public class SampleMessage {

  private String name;
  private String content;

  public void setContent(String content) {
    this.content = content;
  }

  @Override
  public String toString() {
    try {
      return new ObjectMapper().writeValueAsString(this);
    }
    catch(Exception e){
      log.error("error!!");
      return "";
    }
  }
}

 

실행

RabbitMQ Web Managemet에서 메시지를 보내보기

 

Appliction을 구동

  • RabbitMQ 연결정보와 연결되었다는 메시지가 출력된다.

 

RabbitMQ Web Management 페이지

  • http://localhost:15672
  • 초기 사용자와 패스워드 : guest, guest

 

 

Exchange

  • 선언한 test-exchange 가 존재한다.

  • Name 클릭시 해당 exchange의 타입과 binding 을 확인할 수 있다.

 

 

Queue

  • 선언한 test-queue 존재

  • 클릭시 해당 queue에 대한 정보를 확인할 수 있고 메시지 송/수신 등 Queue에 대한 기능을 수행할 수 있다.

 

 

큐에 메시지를 보내기

  • publish message 를 눌러 메시지를 작성한 후 전송
  • propertiescontent_typeapplication/json으로 설정해주어야 한다.

 

Application 로그 확인

  • 방금 publish 한 메시지의 내용을 출력한다.

 

참고자료

https://hub.docker.com/_/rabbitmq/

https://leeyh0216.github.io/posts/spring-rabbitmq-1/

https://heodolf.tistory.com/50

https://spring.io/guides/gs/messaging-rabbitmq/

https://docs.spring.io/spring-amqp/api/org/springframework/amqp/rabbit/connection/CachingConnectionFactory.html

https://www.rabbitmq.com/queues.html

https://docs.spring.io/spring-amqp/api/org/springframework/amqp/rabbit/annotation/RabbitListener.html