ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Rabbit MQ + Spring Boot(1)
    ✉️ MQ 2023. 1. 4. 21:56

    직접 Rabbit MQ를 사용해 간단한 비동기 통신 샘플 프로젝트를 만들어보고자 한다.

    📌 RabbitMQ 설치 (Docker)


    간단한 프로젝트를 만들 예정이라 Docker를 통해서 RabbitMQ를 설치했다.

     

    이미지 Pull

    docker pull rabbitmq // 최신버전 pull
    
    docker pull rabbitmq:3.11.5-management // 3.11 버전 pull(버전 명시)
    

     

    컨테이너 실행

    docker run 명령어를 통해 RabbitMQ Container를 실행해준다.

    docker run -d --hostname rabbitmq -p 5672:5672 -p 15672:15672 
    --name some-rabbit rabbitmq:3
    
    • hostname : rabbitmq
    • port binding
      • 5672:5672 - RabbitMQ서버
      • 15672:15672 - management plugin web
      • RabbitMQ 의 default port 는 5672 이다.

     

    docker ps - a
    
    • 위 명령어를 통해 정상적으로 RabbitMQ Container가 동작하고 있는지 확인할 수 있다.

     

    📌 Spring Boot - RabbitMQ


    Spring Boot-rabbitMQ 페이지를 참고했다.

    의존성 추가 - build.gradle

    dependencies {
        implementation 'org.springframework.boot:spring-boot-starter-web'
        implementation group: 'org.springframework.boot', name: 'spring-boot-starter-amqp', version: '3.0.1'
        testImplementation 'org.springframework.boot:spring-boot-starter-test'
    }
    

     

    코드

    4개의 클래스 구현

    • ReceiverConfiuration
      • RabbitMQ ↔ SpringBoot 간 Connection 설정
      • Receiver 클래스에게 메시지를 전달하기 위한 Listener Container Factory 설정
    • Receiver
      • 수신한 메시지를 처리할 클래스
    • SampleMessage
      • 메시지 포맷 정의
    • RabbitmqTestApplication
      • SpringBootApplication Entry Point

     

    ReceiverConfiguration

    @PropertySource("classpath:application.yml")
    @Configuration
    public class ReceiverConfiguration {
    
      @Value("${spring.rabbitmq.host}")
      private String RABBITMQ_HOST;
    
      @Value("${spring.rabbitmq.port}")
      private int RABBITMQ_PORT;
    
      @Value("${spring.rabbitmq.username}")
      private String RABBITMQ_USERNAME;
    
      @Value("${spring.rabbitmq.password}")
      private String RABBITMQ_PASSWORD;
    
      @Value("${spring.rabbitmq.template.routing-key}")
      private String ROUTING_KEY;
    
      @Value("${spring.rabbitmq.template.exchange}")
      private String EXCHANGE_NAME;
    
      @Value("${spring.rabbitmq.template.default-receive-queue}")
      private String QUEUE_NAME;
    
      /**
       * RabbitMQ Server와 connection을 생성하는 factory 객체 반환
       *
       * @return 초기화된 RabbitMQ ConnectionFactory 객체
       */
      @Bean
      public ConnectionFactory getConnectionFactory() {
        ConnectionFactory connectionFactory = new CachingConnectionFactory(RABBITMQ_HOST, RABBITMQ_PORT);
        ((CachingConnectionFactory) connectionFactory).setUsername(RABBITMQ_USERNAME);
        ((CachingConnectionFactory) connectionFactory).setPassword(RABBITMQ_PASSWORD);
        return connectionFactory;
      }
    
      /**
       * RabbitMQ에서 사용할 Exchange를 선언, 반환한다.
       * Exchange 종류로 Topic 사용
       *
       * @return 초기화된 TopicExchange 객체
       */
      @Bean
      public TopicExchange getExchange() {
        return new TopicExchange(EXCHANGE_NAME);
      }
    
      /**
       * RabbitMQ에서 사용할 Queue를 선언하고 반환한다.
       *
       * @return 초기화된 Queue 객체
       */
      @Bean
      public Queue queue() {
    
        // 큐의 이름, durable 여부 지정
        // durable 이 true => queue는 서버 재시작후에도 유지가 된다.
        return new Queue(QUEUE_NAME, true);
      }
    
      /**
       * Exchange와 Queue를 연결하는 Binding 객체를 선언하고 반환한다.
       *
       * @param queue 초기화된 Queue
       * @param exchange 초기화된 TopicExchange
       * @return Binding 객체
       */
      @Bean
      public Binding getBinding(Queue queue, TopicExchange exchange){
        // exchange 타입이 Topic -> routing key를 사용해 binding
        return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);
      }
    
      /**
       * RabbitMQ에서 메시지를 수신할 Listener의 Factory를 선언하고 반환한다.
       *
       * @param connectionFactory ConnectionFactory 객체
       * @param converter Jackson Converter 객체. byte[] <-> 메시지 간 변환을 담당
       * @return 초기화된 Listener Factory 객체
       */
      @Bean("SampleContainerFactory")
      public SimpleRabbitListenerContainerFactory getSampleContainerFactory(ConnectionFactory connectionFactory, Jackson2JsonMessageConverter converter) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(converter);
        return factory;
      }
    
      @Bean
      public Jackson2JsonMessageConverter getMessageConverter() {
        return new Jackson2JsonMessageConverter();
      }
    
    }
    

     

    CachingConnectionFactory

    • host 이름과 port가 지정된 새 CachingConnectionFactory를 생성
    // host 이름과 port가 지정된 새 CachingConnectionFactory를 생성
    public CachingConnectionFactory(@Nullable String hostNameArg, int port) {
      super(newRabbitConnectionFactory());
      String hostname = hostNameArg;
      if (!StringUtils.hasText(hostname)) {
        hostname = getDefaultHostName();
      }
      setHost(hostname);
      setPort(port);
      doSetPublisherConnectionFactory(new CachingConnectionFactory(getRabbitConnectionFactory(), true));
    }
    
    ...
    
    // 지정된 ConnectionFactory에 대해 새 CachingConnectionFactory를 만든다.
    private CachingConnectionFactory(com.rabbitmq.client.ConnectionFactory rabbitConnectionFactory,
    			boolean isPublisherFactory) {
    
      super(rabbitConnectionFactory);
      if (!isPublisherFactory) {
      if (rabbitConnectionFactory.isAutomaticRecoveryEnabled()) {
        rabbitConnectionFactory.setAutomaticRecoveryEnabled(false);
        logger.warn("***\\nAutomatic Recovery was Enabled in the provided connection factory;\\n"
    				+ "while Spring AMQP is generally compatible with this feature, there\\n"
    				+ "are some corner cases where problems arise. Spring AMQP\\n"
    				+ "prefers to use its own recovery mechanisms; when this option is true, you may receive\\n"
    				+ "'AutoRecoverConnectionNotCurrentlyOpenException's until the connection is recovered.\\n"
    				+ "It has therefore been disabled; if you really wish to enable it, use\\n"
    				+ "'getRabbitConnectionFactory().setAutomaticRecoveryEnabled(true)',\\n"
    				+ "but this is discouraged.");
      }
      super.setPublisherConnectionFactory(new CachingConnectionFactory(getRabbitConnectionFactory(), true));
      }
      else {
        super.setPublisherConnectionFactory(null);
        this.defaultPublisherFactory = false;
      }
    }

     

    AbstractConncetionFactory

    // 사용자 지정 publisher connection factory 설정
    
    public void setPublisherConnectionFactory(
      @Nullable AbstractConnectionFactory publisherConnectionFactory) {
    
      doSetPublisherConnectionFactory(publisherConnectionFactory);
    }
    
    protected final void doSetPublisherConnectionFactory(
      @Nullable AbstractConnectionFactory publisherConnectionFactory) {
    
      this.publisherConnectionFactory = publisherConnectionFactory;
    }

     

    Receiver

    • 메시징 기반 응용프로그램을 사용할 경우 publish된 메시지에 응답하는 Receiver를 만들어야 한다.
    • Receiver는 메시지 수신방법을 정의하는 POJO이다.
    @Slf4j
    @Component
    public class Receiver {
      
      /**
       * 메시지 수신 시 처리할 Handler 함수
       * @param message RabbitMQ의 test-queue 으로부터 수신한 메시지
       */
      @RabbitListener(containerFactory = "SampleListenerContainerFactory", queues="test-queue")
      public void onReceiveMessage(SampleMessage message){
        log.info("== Receiver received message: {}", message);
      }
      
    }

     

    SampleMessage

    @Slf4j
    @Getter
    public class SampleMessage {
    
      private String name;
      private String content;
    
      public void setContent(String content) {
        this.content = content;
      }
    
      @Override
      public String toString() {
        try {
          return new ObjectMapper().writeValueAsString(this);
        }
        catch(Exception e){
          log.error("error!!");
          return "";
        }
      }
    }

     

    실행

    RabbitMQ Web Managemet에서 메시지를 보내보기

     

    Appliction을 구동

    • RabbitMQ 연결정보와 연결되었다는 메시지가 출력된다.

     

    RabbitMQ Web Management 페이지

    • http://localhost:15672
    • 초기 사용자와 패스워드 : guest, guest

     

     

    Exchange

    • 선언한 test-exchange 가 존재한다.

    • Name 클릭시 해당 exchange의 타입과 binding 을 확인할 수 있다.

     

     

    Queue

    • 선언한 test-queue 존재

    • 클릭시 해당 queue에 대한 정보를 확인할 수 있고 메시지 송/수신 등 Queue에 대한 기능을 수행할 수 있다.

     

     

    큐에 메시지를 보내기

    • publish message 를 눌러 메시지를 작성한 후 전송
    • propertiescontent_typeapplication/json으로 설정해주어야 한다.

     

    Application 로그 확인

    • 방금 publish 한 메시지의 내용을 출력한다.

     

    참고자료

    https://hub.docker.com/_/rabbitmq/

    https://leeyh0216.github.io/posts/spring-rabbitmq-1/

    https://heodolf.tistory.com/50

    https://spring.io/guides/gs/messaging-rabbitmq/

    https://docs.spring.io/spring-amqp/api/org/springframework/amqp/rabbit/connection/CachingConnectionFactory.html

    https://www.rabbitmq.com/queues.html

    https://docs.spring.io/spring-amqp/api/org/springframework/amqp/rabbit/annotation/RabbitListener.html

     

    '✉️ MQ' 카테고리의 다른 글

    [RabbitMQ] 한 Queue에서 여러 타입의 Message 처리  (0) 2023.01.05
    RabbitMQ + Spring Boot(2)  (0) 2023.01.05
    Rabbit MQ  (0) 2023.01.04
    Message Queue 란❓  (0) 2023.01.04

    댓글

Designed by Tistory.