본문 바로가기

Message Broker

[RabbitMQ] 3. 스프링 부트에서 RabbitMQ 설정 방법

 

이전 포스팅에선 AMQP 에 대해 살펴봤는데요,

 

2024.08.15 - [Message Broker] - [RabbitMQ] 1. 메시지 브로커(Message Broker)에 대해

2024.08.16 - [Message Broker] - [RabbitMQ] 2. AMQP 란?

 

이번엔 스프링 부트(Spring Boot 3.x)에서 RabbitMQ 를 설정하는 법을 알아보겠습니다.

 

1. RabbitMQ 의존성 추가

먼저, https://start.spring.io/ 에서 프로젝트를 초기 설정할 때를 예시로 들겠습니다.

ADD DEPENDENCIES 버튼을 눌러 "rabbitmq" 로 검색하여 의존성을 추가합니다.

RabbitMQ 의존성 추가

그러면 build.gradle 에는 다음과 같이 적용되어 있습니다.

dependencies {
  implementation 'org.springframework.boot:spring-boot-starter-amqp'
  ... // 생략
}

 

의존성 목록에 spring-boot-starter-amqp 가 추가된 것을 볼 수 있습니다.

Intellij IDE 에서 gradle 탭을 보면 spring-boot-starter-amqp 라이브러리엔 RabbitMQ 라이브러리가 포함되있는 것을 확인할 수 있습니다.

amqp 라이브러리를 추가하면 rabbitmq 관련 라이브러리도 함께 설치된다.

2. RabbitMQ 설정 클래스 (RabbitConfig)

스프링 부트에서 설정하는 간편한 방법 중 하나는 별개의 Config 클래스를 만들어 관리하는 것 입니다.

저는 RabbitConfig 클래스에서 관리하는 것을 설명드리겠습니다.

 

먼저 전체 코드입니다.

import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer;
import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableRabbit
public class RabbitConfig {

    /* datetime format */
    public static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");

    /* exchange - binding key - queue */
    private static final String CHAT_EXCHANGE_NAME = "chat.exchange";
    private static final String ROUTING_KEY = "recipient.*";
    private static final String CHAT_QUEUE_NAME = "chat.queue";

    @Value("${spring.rabbitmq.username}")
    private String rabbitUser;

    @Value("${spring.rabbitmq.password}")
    private String rabbitPassword;

    @Value("${spring.rabbitmq.host}")
    private String rabbitHost;

    @Value("${spring.rabbitmq.virtual-host}")
    private String rabbitVirtualHost;

    @Value("${spring.rabbitmq.port}")
    private int rabbitPort;

    @Bean
    public Queue queue() {
        return new Queue(CHAT_QUEUE_NAME, true);
    }

    @Bean
    public TopicExchange exchange() {
        return new TopicExchange(CHAT_EXCHANGE_NAME);
    }

    @Bean
    public Binding binding(Queue queue, TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
        rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());
        rabbitTemplate.setRoutingKey(ROUTING_KEY);
        return rabbitTemplate;
    }

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setHost(rabbitHost);
        factory.setVirtualHost(rabbitVirtualHost);
        factory.setPort(rabbitPort);
        factory.setUsername(rabbitUser);
        factory.setPassword(rabbitPassword);
        return factory;
    }

    @Bean
    public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory());
        factory.setMessageConverter(jackson2JsonMessageConverter());
        return factory;
    }

    @Bean
    public Jackson2JsonMessageConverter jackson2JsonMessageConverter() {
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
        objectMapper.registerModule(dateTimeModule());
        return new Jackson2JsonMessageConverter(objectMapper);
    }

    @Bean
    public Module dateTimeModule() {
        return new JavaTimeModule()
                .addDeserializer(LocalDateTime.class, new LocalDateTimeDeserializer(FORMATTER))
                .addSerializer(LocalDateTime.class, new LocalDateTimeSerializer(FORMATTER));
    }
}
  • @EnableRabbit: 스프링 부트에서 RabbitMQ 관련 설정을 읽고 Bean 으로 관리할 수 있도록 해주는 애너테이션 입니다.
    이 애너테이션으로 @RabbitListener 애너테이션이 붙은 메서드에서 메시지를 수신할 수 있도록 합니다.

아래는 설정 값을 읽어올 application.yml 입니다.

spring:
  rabbitmq:
    username: ${rmq-id} # 초기값은 guest
    password: ${rmq-password} # 초기값은 guest
    host: localhost # 추후 rabbitmq 서버의 주소로 변경
    port: 5672  # AMQP 기본 포트
    virtual-host: / # 가상 호스트 주소
  • username, password: rabbitmq 서버 설치 시 기본 값으로 설정했다면 초기 값은 둘 다 guest 입니다.
  • host: rabbitmq 서버가 올라가있는 주소입니다. 저는 로컬 환경에서 docker 로 띄웠기 때문에 localhost 로 설정했습니다.
  • port: 기본 설정 값은 5672 포트입니다.
  • virtual-host: 가상 호스트 주소입니다. 여러 가상 호스트로 서버를 논리적으로 분리할 수 있습니다. 별도로 설정하지 않으므로 '/' 로 설정합니다.

2-1. Queue, Exchange, Binding 설정

@Bean
public Queue queue() {
    return new Queue(CHAT_QUEUE_NAME, true);
}

@Bean
public TopicExchange exchange() {
    return new TopicExchange(CHAT_EXCHANGE_NAME);
}

@Bean
public Binding binding(Queue queue, TopicExchange exchange) {
    return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);
}
  • AMQP 에선 메시지가 Exchange 를 거쳐 Binding 규칙에 따라 Queue 로 전달됨을 소개했었습니다.

메시지는 exchange 에 도달하면 binding 규칙에 의해 queue 로 전달된다.

  • java class 로 설정하면 각각은 @Bean 으로 등록하여 exchange, binding, queue 를 설정할 수 있습니다.
  • Exchange 예시에선 TopicExchange 로 전달함을 보여주고 있습니다.
  • Queue 예시에선 제가 정한 이름으로 새로운 큐를 생성함을 보여주고 있습니다.
    • 생성자의 두 번째 인자는 durable 에 대한 설정값으로, RabbitMQ 서버가 재시작되도 큐를 삭제하지 않고 유지하겠다는 의미입니다. (기본값이 true 입니다.)
      • 메시지에 대한 설정은 별개로 deliveryMode를 persistent로 설정해야 합니다. 이번 포스팅에선 다루지 않겠습니다.
      • Queue 예시에선 특정 큐의 이름으로 큐를 미리 생성함을 보여주고 있습니다.
    • 생성자의 세 번째 인자는 exclusive 에 대한 설정값으로, true 로 설정 시 해당 큐에 연결하는 것을 배타적으로 막을 수 있습니다. 연결이 끊어지면 큐가 삭제됩니다.
      • 채팅 시스템에선 여러 클라이언트에서 하나의 큐로 연결이 필요하므로 false 로 설정해야 합니다.

Queue 생성자의 기본값들

  • Binding 예시에선 빌더 패턴으로 routing key 를 이용해서 Exchange 와 Queue 를 말 그대로 바인딩 해주는 것을 볼 수 있습니다.
    • 라우팅 키(routing key)는 Exchange 에서 Queue 로 메시지를 전달할 때 사용되는 라우팅 패턴을 정의합니다.
      • Topic Exchange 예시에선 라우팅 키는 점(.)으로 구분된 단어들을 나열한 값입니다. 라우팅 키 패턴은 메시지를 특정 큐로 라우팅할 때, 와일드카드 문자를 사용하여 유연하게 메시지를 분류할 수 있게 합니다.
      • 예시에선 *(asterisk) 로 되어있기 때문에 한 개의 단어 패턴에 매칭되는 큐로 메시지를 전달합니다. (reciepient.member123, reciepient.member789 등이 해당)
      • 라우팅 키에 대한 포스팅은 별도로 작성할 예정입니다.

2-2. RabbitTemplate 설정

RabbitTemplate은 Spring AMQP에서 제공하는 주요 클래스 중 하나로, RabbitMQ와의 상호작용을 쉽게 만들어줍니다.

이 템플릿을 사용하면 메시지의 송수신이 간단해집니다. 추후 컨트롤러 부분에서 다루게 됩니다.

@Bean
public RabbitTemplate rabbitTemplate() {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
    rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());
    rabbitTemplate.setRoutingKey(ROUTING_KEY);
    return rabbitTemplate;
}
  • connectionFactory(): RabbitMQ 서버와의 연결을 관리하는 ConnectionFactory를 통해 RabbitTemplate이 RabbitMQ와 연결됩니다.
  • setMessageConverter(): 메시지를 JSON 형식으로 변환하기 위해 Jackson 기반의 MessageConverter를 설정합니다. 메시지를 주고받을 때 날짜 타입을 LocalDateTime 으로 사용하기 위해 등록합니다.
  • setRoutingKey(): 기본적으로 사용할 라우팅 키를 설정합니다. 메시지를 전송할 때 위에서 설정한 라우팅 키가 사용됩니다.

2-3. ConnectionFactory 설정

@Bean
public ConnectionFactory connectionFactory() {
    CachingConnectionFactory factory = new CachingConnectionFactory();
    factory.setHost(rabbitHost);
    factory.setVirtualHost(rabbitVirtualHost);
    factory.setPort(rabbitPort);
    factory.setUsername(rabbitUser);
    factory.setPassword(rabbitPassword);
    return factory;
}

ConnectionFactory는 RabbitMQ 서버와의 연결을 관리하는 역할을 합니다. Spring에서는 CachingConnectionFactory를 사용하여 연결을 재사용하고 관리합니다.

예시에선 application.yml 설정 파일에서 값을 읽어서 사용합니다.

2-4. SimpleRabbitListenerContainerFactory 설정

@Bean
public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory() {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory());
    factory.setMessageConverter(jackson2JsonMessageConverter());
    return factory;
}

메시지를 비동기로 처리하기 위해 Spring에서는 SimpleRabbitListenerContainerFactory를 사용합니다. 이 팩토리는 @RabbitListener를 사용하는 메시지 리스너를 설정하고 관리합니다. 추후 컨트롤러에서 다루게 됩니다.

  • setConnectionFactory(): 앞서 정의한 ConnectionFactory를 사용하여 RabbitMQ 서버와의 연결을 설정합니다.
  • setMessageConverter(): 메시지 변환기로 Jackson을 설정하여 메시지를 JSON으로 처리합니다.

2-5. Jackson2JsonMessageConverter 설정

@Bean
public Jackson2JsonMessageConverter jackson2JsonMessageConverter() {
    ObjectMapper objectMapper = new ObjectMapper();
    objectMapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
    objectMapper.registerModule(dateTimeModule());
    return new Jackson2JsonMessageConverter(objectMapper);
}

RabbitMQ에서 메시지를 송수신할 때, 객체를 JSON 형식으로 변환하거나 JSON을 객체로 변환할 수 있도록 Jackson2JsonMessageConverter를 설정합니다.

 

  • ObjectMapper 설정: Jackson의 ObjectMapper를 사용해 JSON 직렬화 및 역직렬화를 커스터마이징합니다. 특히, 날짜와 시간을 타임스탬프 대신 포맷된 문자열로 처리하도록 설정합니다.
  • registerModule(): LocalDateTime과 같은 클래스를 처리하기 위해 JavaTimeModule을 등록합니다.

2-6. LocalDateTime을 위한 모듈 설정

위에서 설명하듯이, RabbitMQ 에서 메시지를 주고 받을 때 LocalDateTime과 같은 클래스를 직렬화/역직렬화 처리하기 위해 JavaTimeModule()을 사용합니다.

@Bean
public Module dateTimeModule() {
    return new JavaTimeModule()
            .addDeserializer(LocalDateTime.class, new LocalDateTimeDeserializer(FORMATTER))
            .addSerializer(LocalDateTime.class, new LocalDateTimeSerializer(FORMATTER));
}
  • 예시에선 LocalDateTimeDeserializer 및 LocalDateTimeSerializer: 특정 포맷(yyyy-MM-dd'T'HH:mm:ss.SSS'Z')을 사용하여 LocalDateTime을 직렬화 및 역직렬화합니다.

 

이번 포스팅에선 스프링 부트 (3.x)에서 RabbitMQ 를 설정하는 방법에 대해 알아봤습니다.

 

추가로 RabbitMQ 에서 다루는 메시지에서 LocalDateTime 클래스를 처리하기 위해 ObjectMapper 를 등록하는 부분까지 다뤄봤는데요,

예시 프로젝트에선 메시지를 Json 으로 교환하기 때문에 설정한 부분이었습니다. Json 에서 날짜 타입을 직렬과/역직렬화 할 때 LocalDateTime 클래스를 이용해 다루기 위해서 설정한 부분이니 이 점 참고하시면 되겠습니다.

 

다음 포스팅에선 Docker 를 이용해 RabbitMQ 서버를 설치하는 방법과 어드민 페이지를 보는 방법에 대해 알아보겠습니다.

 

읽어주셔서 감사합니다.

 

'Message Broker' 카테고리의 다른 글

[RabbitMQ] 2. AMQP 란?  (0) 2024.08.16
[RabbitMQ] 1. 메시지 브로커(Message Broker)에 대해  (0) 2024.08.15