☁️ infra/✉️ MQ
Rabbit MQ + Spring Boot(1)
beomsic
2023. 1. 4. 21:56
직접 Rabbit MQ를 사용해 간단한 비동기 통신 샘플 프로젝트를 만들어보고자 한다.
📌 RabbitMQ 설치 (Docker)
간단한 프로젝트를 만들 예정이라 Docker를 통해서 RabbitMQ를 설치했다.
이미지 Pull
docker pull rabbitmq // 최신버전 pull
docker pull rabbitmq:3.11.5-management // 3.11 버전 pull(버전 명시)
컨테이너 실행
docker run 명령어를 통해 RabbitMQ Container를 실행해준다.
docker run -d --hostname rabbitmq -p 5672:5672 -p 15672:15672
--name some-rabbit rabbitmq:3
- hostname : rabbitmq
- port binding
- 5672:5672 - RabbitMQ서버
- 15672:15672 - management plugin web
- RabbitMQ 의 default port 는 5672 이다.
docker ps - a
- 위 명령어를 통해 정상적으로 RabbitMQ Container가 동작하고 있는지 확인할 수 있다.
📌 Spring Boot - RabbitMQ
Spring Boot-rabbitMQ 페이지를 참고했다.
의존성 추가 - build.gradle
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-web'
implementation group: 'org.springframework.boot', name: 'spring-boot-starter-amqp', version: '3.0.1'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
}
코드
4개의 클래스 구현
- ReceiverConfiuration
- RabbitMQ ↔ SpringBoot 간 Connection 설정
- Receiver 클래스에게 메시지를 전달하기 위한 Listener Container Factory 설정
- Receiver
- 수신한 메시지를 처리할 클래스
- SampleMessage
- 메시지 포맷 정의
- RabbitmqTestApplication
- SpringBootApplication Entry Point
ReceiverConfiguration
@PropertySource("classpath:application.yml")
@Configuration
public class ReceiverConfiguration {
@Value("${spring.rabbitmq.host}")
private String RABBITMQ_HOST;
@Value("${spring.rabbitmq.port}")
private int RABBITMQ_PORT;
@Value("${spring.rabbitmq.username}")
private String RABBITMQ_USERNAME;
@Value("${spring.rabbitmq.password}")
private String RABBITMQ_PASSWORD;
@Value("${spring.rabbitmq.template.routing-key}")
private String ROUTING_KEY;
@Value("${spring.rabbitmq.template.exchange}")
private String EXCHANGE_NAME;
@Value("${spring.rabbitmq.template.default-receive-queue}")
private String QUEUE_NAME;
/**
* RabbitMQ Server와 connection을 생성하는 factory 객체 반환
*
* @return 초기화된 RabbitMQ ConnectionFactory 객체
*/
@Bean
public ConnectionFactory getConnectionFactory() {
ConnectionFactory connectionFactory = new CachingConnectionFactory(RABBITMQ_HOST, RABBITMQ_PORT);
((CachingConnectionFactory) connectionFactory).setUsername(RABBITMQ_USERNAME);
((CachingConnectionFactory) connectionFactory).setPassword(RABBITMQ_PASSWORD);
return connectionFactory;
}
/**
* RabbitMQ에서 사용할 Exchange를 선언, 반환한다.
* Exchange 종류로 Topic 사용
*
* @return 초기화된 TopicExchange 객체
*/
@Bean
public TopicExchange getExchange() {
return new TopicExchange(EXCHANGE_NAME);
}
/**
* RabbitMQ에서 사용할 Queue를 선언하고 반환한다.
*
* @return 초기화된 Queue 객체
*/
@Bean
public Queue queue() {
// 큐의 이름, durable 여부 지정
// durable 이 true => queue는 서버 재시작후에도 유지가 된다.
return new Queue(QUEUE_NAME, true);
}
/**
* Exchange와 Queue를 연결하는 Binding 객체를 선언하고 반환한다.
*
* @param queue 초기화된 Queue
* @param exchange 초기화된 TopicExchange
* @return Binding 객체
*/
@Bean
public Binding getBinding(Queue queue, TopicExchange exchange){
// exchange 타입이 Topic -> routing key를 사용해 binding
return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);
}
/**
* RabbitMQ에서 메시지를 수신할 Listener의 Factory를 선언하고 반환한다.
*
* @param connectionFactory ConnectionFactory 객체
* @param converter Jackson Converter 객체. byte[] <-> 메시지 간 변환을 담당
* @return 초기화된 Listener Factory 객체
*/
@Bean("SampleContainerFactory")
public SimpleRabbitListenerContainerFactory getSampleContainerFactory(ConnectionFactory connectionFactory, Jackson2JsonMessageConverter converter) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(converter);
return factory;
}
@Bean
public Jackson2JsonMessageConverter getMessageConverter() {
return new Jackson2JsonMessageConverter();
}
}
CachingConnectionFactory
- host 이름과 port가 지정된 새 CachingConnectionFactory를 생성
// host 이름과 port가 지정된 새 CachingConnectionFactory를 생성
public CachingConnectionFactory(@Nullable String hostNameArg, int port) {
super(newRabbitConnectionFactory());
String hostname = hostNameArg;
if (!StringUtils.hasText(hostname)) {
hostname = getDefaultHostName();
}
setHost(hostname);
setPort(port);
doSetPublisherConnectionFactory(new CachingConnectionFactory(getRabbitConnectionFactory(), true));
}
...
// 지정된 ConnectionFactory에 대해 새 CachingConnectionFactory를 만든다.
private CachingConnectionFactory(com.rabbitmq.client.ConnectionFactory rabbitConnectionFactory,
boolean isPublisherFactory) {
super(rabbitConnectionFactory);
if (!isPublisherFactory) {
if (rabbitConnectionFactory.isAutomaticRecoveryEnabled()) {
rabbitConnectionFactory.setAutomaticRecoveryEnabled(false);
logger.warn("***\\nAutomatic Recovery was Enabled in the provided connection factory;\\n"
+ "while Spring AMQP is generally compatible with this feature, there\\n"
+ "are some corner cases where problems arise. Spring AMQP\\n"
+ "prefers to use its own recovery mechanisms; when this option is true, you may receive\\n"
+ "'AutoRecoverConnectionNotCurrentlyOpenException's until the connection is recovered.\\n"
+ "It has therefore been disabled; if you really wish to enable it, use\\n"
+ "'getRabbitConnectionFactory().setAutomaticRecoveryEnabled(true)',\\n"
+ "but this is discouraged.");
}
super.setPublisherConnectionFactory(new CachingConnectionFactory(getRabbitConnectionFactory(), true));
}
else {
super.setPublisherConnectionFactory(null);
this.defaultPublisherFactory = false;
}
}
AbstractConncetionFactory
// 사용자 지정 publisher connection factory 설정
public void setPublisherConnectionFactory(
@Nullable AbstractConnectionFactory publisherConnectionFactory) {
doSetPublisherConnectionFactory(publisherConnectionFactory);
}
protected final void doSetPublisherConnectionFactory(
@Nullable AbstractConnectionFactory publisherConnectionFactory) {
this.publisherConnectionFactory = publisherConnectionFactory;
}
Receiver
- 메시징 기반 응용프로그램을 사용할 경우 publish된 메시지에 응답하는 Receiver를 만들어야 한다.
- Receiver는 메시지 수신방법을 정의하는 POJO이다.
@Slf4j
@Component
public class Receiver {
/**
* 메시지 수신 시 처리할 Handler 함수
* @param message RabbitMQ의 test-queue 으로부터 수신한 메시지
*/
@RabbitListener(containerFactory = "SampleListenerContainerFactory", queues="test-queue")
public void onReceiveMessage(SampleMessage message){
log.info("== Receiver received message: {}", message);
}
}
SampleMessage
@Slf4j
@Getter
public class SampleMessage {
private String name;
private String content;
public void setContent(String content) {
this.content = content;
}
@Override
public String toString() {
try {
return new ObjectMapper().writeValueAsString(this);
}
catch(Exception e){
log.error("error!!");
return "";
}
}
}
실행
RabbitMQ Web Managemet에서 메시지를 보내보기
Appliction을 구동

- RabbitMQ 연결정보와 연결되었다는 메시지가 출력된다.
RabbitMQ Web Management 페이지
- http://localhost:15672
- 초기 사용자와 패스워드 : guest, guest
Exchange
- 선언한 test-exchange 가 존재한다.

- Name 클릭시 해당 exchange의 타입과 binding 을 확인할 수 있다.
Queue
- 선언한 test-queue 존재

- 클릭시 해당 queue에 대한 정보를 확인할 수 있고 메시지 송/수신 등 Queue에 대한 기능을 수행할 수 있다.
큐에 메시지를 보내기
- publish message 를 눌러 메시지를 작성한 후 전송
- properties에 content_type 을 application/json으로 설정해주어야 한다.

Application 로그 확인

- 방금 publish 한 메시지의 내용을 출력한다.
참고자료
https://hub.docker.com/_/rabbitmq/
https://leeyh0216.github.io/posts/spring-rabbitmq-1/
https://heodolf.tistory.com/50
https://spring.io/guides/gs/messaging-rabbitmq/
https://www.rabbitmq.com/queues.html