-
Rabbit MQ + Spring Boot(1)☁️ infra/✉️ MQ 2023. 1. 4. 21:56
직접 Rabbit MQ를 사용해 간단한 비동기 통신 샘플 프로젝트를 만들어보고자 한다.
📌 RabbitMQ 설치 (Docker)
간단한 프로젝트를 만들 예정이라 Docker를 통해서 RabbitMQ를 설치했다.
이미지 Pull
docker pull rabbitmq // 최신버전 pull docker pull rabbitmq:3.11.5-management // 3.11 버전 pull(버전 명시)
컨테이너 실행
docker run 명령어를 통해 RabbitMQ Container를 실행해준다.
docker run -d --hostname rabbitmq -p 5672:5672 -p 15672:15672 --name some-rabbit rabbitmq:3
- hostname : rabbitmq
- port binding
- 5672:5672 - RabbitMQ서버
- 15672:15672 - management plugin web
- RabbitMQ 의 default port 는 5672 이다.
docker ps - a
- 위 명령어를 통해 정상적으로 RabbitMQ Container가 동작하고 있는지 확인할 수 있다.
📌 Spring Boot - RabbitMQ
Spring Boot-rabbitMQ 페이지를 참고했다.
의존성 추가 - build.gradle
dependencies { implementation 'org.springframework.boot:spring-boot-starter-web' implementation group: 'org.springframework.boot', name: 'spring-boot-starter-amqp', version: '3.0.1' testImplementation 'org.springframework.boot:spring-boot-starter-test' }
코드
4개의 클래스 구현
- ReceiverConfiuration
- RabbitMQ ↔ SpringBoot 간 Connection 설정
- Receiver 클래스에게 메시지를 전달하기 위한 Listener Container Factory 설정
- Receiver
- 수신한 메시지를 처리할 클래스
- SampleMessage
- 메시지 포맷 정의
- RabbitmqTestApplication
- SpringBootApplication Entry Point
ReceiverConfiguration
@PropertySource("classpath:application.yml") @Configuration public class ReceiverConfiguration { @Value("${spring.rabbitmq.host}") private String RABBITMQ_HOST; @Value("${spring.rabbitmq.port}") private int RABBITMQ_PORT; @Value("${spring.rabbitmq.username}") private String RABBITMQ_USERNAME; @Value("${spring.rabbitmq.password}") private String RABBITMQ_PASSWORD; @Value("${spring.rabbitmq.template.routing-key}") private String ROUTING_KEY; @Value("${spring.rabbitmq.template.exchange}") private String EXCHANGE_NAME; @Value("${spring.rabbitmq.template.default-receive-queue}") private String QUEUE_NAME; /** * RabbitMQ Server와 connection을 생성하는 factory 객체 반환 * * @return 초기화된 RabbitMQ ConnectionFactory 객체 */ @Bean public ConnectionFactory getConnectionFactory() { ConnectionFactory connectionFactory = new CachingConnectionFactory(RABBITMQ_HOST, RABBITMQ_PORT); ((CachingConnectionFactory) connectionFactory).setUsername(RABBITMQ_USERNAME); ((CachingConnectionFactory) connectionFactory).setPassword(RABBITMQ_PASSWORD); return connectionFactory; } /** * RabbitMQ에서 사용할 Exchange를 선언, 반환한다. * Exchange 종류로 Topic 사용 * * @return 초기화된 TopicExchange 객체 */ @Bean public TopicExchange getExchange() { return new TopicExchange(EXCHANGE_NAME); } /** * RabbitMQ에서 사용할 Queue를 선언하고 반환한다. * * @return 초기화된 Queue 객체 */ @Bean public Queue queue() { // 큐의 이름, durable 여부 지정 // durable 이 true => queue는 서버 재시작후에도 유지가 된다. return new Queue(QUEUE_NAME, true); } /** * Exchange와 Queue를 연결하는 Binding 객체를 선언하고 반환한다. * * @param queue 초기화된 Queue * @param exchange 초기화된 TopicExchange * @return Binding 객체 */ @Bean public Binding getBinding(Queue queue, TopicExchange exchange){ // exchange 타입이 Topic -> routing key를 사용해 binding return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY); } /** * RabbitMQ에서 메시지를 수신할 Listener의 Factory를 선언하고 반환한다. * * @param connectionFactory ConnectionFactory 객체 * @param converter Jackson Converter 객체. byte[] <-> 메시지 간 변환을 담당 * @return 초기화된 Listener Factory 객체 */ @Bean("SampleContainerFactory") public SimpleRabbitListenerContainerFactory getSampleContainerFactory(ConnectionFactory connectionFactory, Jackson2JsonMessageConverter converter) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setMessageConverter(converter); return factory; } @Bean public Jackson2JsonMessageConverter getMessageConverter() { return new Jackson2JsonMessageConverter(); } }
CachingConnectionFactory
- host 이름과 port가 지정된 새 CachingConnectionFactory를 생성
// host 이름과 port가 지정된 새 CachingConnectionFactory를 생성 public CachingConnectionFactory(@Nullable String hostNameArg, int port) { super(newRabbitConnectionFactory()); String hostname = hostNameArg; if (!StringUtils.hasText(hostname)) { hostname = getDefaultHostName(); } setHost(hostname); setPort(port); doSetPublisherConnectionFactory(new CachingConnectionFactory(getRabbitConnectionFactory(), true)); } ... // 지정된 ConnectionFactory에 대해 새 CachingConnectionFactory를 만든다. private CachingConnectionFactory(com.rabbitmq.client.ConnectionFactory rabbitConnectionFactory, boolean isPublisherFactory) { super(rabbitConnectionFactory); if (!isPublisherFactory) { if (rabbitConnectionFactory.isAutomaticRecoveryEnabled()) { rabbitConnectionFactory.setAutomaticRecoveryEnabled(false); logger.warn("***\\nAutomatic Recovery was Enabled in the provided connection factory;\\n" + "while Spring AMQP is generally compatible with this feature, there\\n" + "are some corner cases where problems arise. Spring AMQP\\n" + "prefers to use its own recovery mechanisms; when this option is true, you may receive\\n" + "'AutoRecoverConnectionNotCurrentlyOpenException's until the connection is recovered.\\n" + "It has therefore been disabled; if you really wish to enable it, use\\n" + "'getRabbitConnectionFactory().setAutomaticRecoveryEnabled(true)',\\n" + "but this is discouraged."); } super.setPublisherConnectionFactory(new CachingConnectionFactory(getRabbitConnectionFactory(), true)); } else { super.setPublisherConnectionFactory(null); this.defaultPublisherFactory = false; } }
AbstractConncetionFactory
// 사용자 지정 publisher connection factory 설정 public void setPublisherConnectionFactory( @Nullable AbstractConnectionFactory publisherConnectionFactory) { doSetPublisherConnectionFactory(publisherConnectionFactory); } protected final void doSetPublisherConnectionFactory( @Nullable AbstractConnectionFactory publisherConnectionFactory) { this.publisherConnectionFactory = publisherConnectionFactory; }
Receiver
- 메시징 기반 응용프로그램을 사용할 경우 publish된 메시지에 응답하는 Receiver를 만들어야 한다.
- Receiver는 메시지 수신방법을 정의하는 POJO이다.
@Slf4j @Component public class Receiver { /** * 메시지 수신 시 처리할 Handler 함수 * @param message RabbitMQ의 test-queue 으로부터 수신한 메시지 */ @RabbitListener(containerFactory = "SampleListenerContainerFactory", queues="test-queue") public void onReceiveMessage(SampleMessage message){ log.info("== Receiver received message: {}", message); } }
SampleMessage
@Slf4j @Getter public class SampleMessage { private String name; private String content; public void setContent(String content) { this.content = content; } @Override public String toString() { try { return new ObjectMapper().writeValueAsString(this); } catch(Exception e){ log.error("error!!"); return ""; } } }
실행
RabbitMQ Web Managemet에서 메시지를 보내보기
Appliction을 구동
- RabbitMQ 연결정보와 연결되었다는 메시지가 출력된다.
RabbitMQ Web Management 페이지
- http://localhost:15672
- 초기 사용자와 패스워드 : guest, guest
Exchange
- 선언한 test-exchange 가 존재한다.
- Name 클릭시 해당 exchange의 타입과 binding 을 확인할 수 있다.
Queue
- 선언한 test-queue 존재
- 클릭시 해당 queue에 대한 정보를 확인할 수 있고 메시지 송/수신 등 Queue에 대한 기능을 수행할 수 있다.
큐에 메시지를 보내기
- publish message 를 눌러 메시지를 작성한 후 전송
- properties에 content_type 을 application/json으로 설정해주어야 한다.
Application 로그 확인
- 방금 publish 한 메시지의 내용을 출력한다.
참고자료
https://hub.docker.com/_/rabbitmq/
https://leeyh0216.github.io/posts/spring-rabbitmq-1/
https://heodolf.tistory.com/50
https://spring.io/guides/gs/messaging-rabbitmq/
https://www.rabbitmq.com/queues.html
'☁️ infra > ✉️ MQ' 카테고리의 다른 글
[RabbitMQ] 한 Queue에서 여러 타입의 Message 처리 (0) 2023.01.05 RabbitMQ + Spring Boot(2) (0) 2023.01.05 Rabbit MQ (0) 2023.01.04 Message Queue 란❓ (0) 2023.01.04