Notepad

스프링 부트 메시징

  • 메시지 연결을 활용해서 컴포넌트 사이의 결함도를 낮추도록 아키텍처를 고도화
  • 비동기 메시징은 동일한 애플리케이션 안에 있는 컴포넌트들 또는 네트워크를 통해 연결된 여러 마이크로서비스에 분산돼 있는 컴포넌트들을 연결하는 좋은 수단이 될 수 있음

학습 내용

스프링 부트에서 지원하는 다양한 메시징 솔루션
스프링 부트에서 직접 지원하지는 않지만 스프링 포트폴리오에서 지원하는 다양한 메시징 솔루션
AMQP(Advenced Message Queuing Protocol)를 자세히 알아보고, 스프링 AMQP와 프로젝트 리액터를 활용해 웹 계층과 백엔드의 결합 관계 해소

메시징 솔루션 선택

  • JMS(Java Messaging Service), 아파치 카프카, AMQP, 레디스, 젬파이어, 아파치 지오드 등 솔루션의 종류가 다양함
  • 솔루션들은 공통점도 많지만 저마다 다른 관심사에 최적화돼 있음
  • 책에서는 솔루션 별 다양한 케이스에 대해 설명하지 않고 메시징을 활용하고 리액티브 스트림 프로그래밍에 적절히 통합하는 방법만을 다룸

익숙한 패턴을 사용한 문제 해결

  • 자바의 복잡도 감소(Reducing Java Complexity)가 스프링 포트폴리오의 핵심 특징
    • 2008년 스프링 사용자 컨퍼런스에서 로드 존슨이 실제로 썼던 표현
  • 스프링의 설계 목표는 무엇보다도 애플리케이션 만드는 방법을 단순화하는 것
  • 이를 위한 강력한 수단 중 하나로 템플릿 패턴(Template Pattern)이 있음
    • 템플릿 패턴의 유용함에 GoF의 디자인 패턴 책에도 포함돼 있음
  • 템플릿이란?
    • 특정 API의 모든 복잡성을 가장 단순한 연산으로 축약하는 것을 의미
    • 대표적인 템플릿: JdbcTemplate
    • JDBC를 직접 사용 시 불편한 점: 개발자가 쿼리, DB 연결 관리를 모두 신경 써야 하고, 쿼리 작성 후 ResultSet을 닫지 않고 종료 처리하지 않았는지 하나하나 확인해야 함
    • JdbcTemplate을 활용하면 몇 가지 연산만 사용해서 데이터 조회와 수정을 처리할 수 있음
// JdbcTemplate 사용 예
int result = jdbcTemplate.queryForObject(
    "SELECT COUNT(*) FROM ITEM", Integer.class);
  • 비동기 메시징에 대한 Template
    • JMS: 자바 표준 메시징 API
      • JmsTemplate과 DefaultMessageListenerContainer를 제공
    • 아파치 카프카: 빠른 속도로 대세로 자리 잡고 있는 브로커
      • KafkaTemplate과 KafkaMessageListenerContainer를 제공
    • 래빗엠큐(RabbitMQ): 높은 처리량과 강한 회복력이 특징인 메시지 브로커
      • AmqpTemplate과 SimpleMessageListenerContainer를 제공
    • 레디스: 빠른 속도를 무기로 가장 널리 사용되는 브로커
      • RedisMessageListenerContainer를 제공

손쉬운 테스트

  • AMQP 브로커인 래빗엠큐를 사용하는 테스트를 중심으로 메시지 처리 방법 소개
  • 래빗엠큐 설치와 설정, 실행에 시간이 많이 소요됨으로 간단히 테스트컨테이너를 활용
  • 테스트컨테이너
    • 도커를 활용하는 자바 테스트 지원 라이브러리
    • 도커에서 실행될 수만 있다면, 어떤 데이터베이스나 메시지 브로커, 서드파티 시스템도 테스트용으로 쉽게 쓸 수 있음
    • 테스트가 종료되면 테스트에 사용했던 여러 컨테이너 자원도 남김없이 깔끔하게 종료됨
    • 테스트 시 클린 한 상태로 래빗엠큐를 실행하고 사용할 수 있음
  • 테스트컨테이너 버전 지정
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.testcontainers</groupId>
            <artifactId>testcontainers-bom</artifactId>
            <version>1.15.2</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
  • 래빗엠큐 테스트컨테이너 의존관계 추가
<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>rabbitmq</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>junit-jupiter</artifactId>
    <scope>test</scope>
</dependency>

테스트 컨테이너 사용 테스트

  • 래빗엠큐 테스트 컨테이너를 사용하는 테스트 작성
@SpringBootTest
@AutoConfigureWebTestClient
@Testcontainers
@ContextConfiguration
public class RabbitTest {

    @Container
    static RabbitMQContainer container = new RabbitMQContainer("rabbitmq:3.7.25-management-alpine");

    @Autowired
    WebTestClient webTestClient;

    @Autowired
    ItemRepository repository;

    @DynamicPropertySource
    static void configure(DynamicPropertyRegistry registry) {
        registry.add("spring.rabbitmq.host", container::getContainerIpAddress);
        registry.add("spring.rabbitmq.port", container::getAmqpPort);
    }

    // .. AMQP 메시지 테스트 코드 참고 ..

}

테스트 케이스 구성

  • 웹 컨트롤러가 처리해야 할 일
    • 새 Item 객체를 생성하기 위해 Item 데이터가 담겨 있는 HTTP POST API 요청
    • Item 데이터를 적절한 메시지로 변환
    • Item 생성 메시지를 브로커에게 전송
  • AMQP 메시징 테스트

    @Test
    void verifyMessagingThroughAmqp() throws InterruptedException {
        this.webTestClient.post().uri("/items")
                .bodyValue(new Item("Alf alarm clock", "nothing important", 19.99))
                .exchange()
                .expectStatus().isCreated()
                .expectBody();

        Thread.sleep(1500L);

        this.webTestClient.post().uri("/items")
                .bodyValue(new Item("Smurf TV tray", "nothing important", 29.99))
                .exchange()
                .expectStatus().isCreated()
                .expectBody();

        Thread.sleep(2000L);

        this.repository.findAll()
                .as(StepVerifier::create)
                .expectNextMatches(item -> {
                    assertThat(item.getName()).isEqualTo("Alf alarm clock");
                    assertThat(item.getDescription()).isEqualTo("nothing important");
                    assertThat(item.getPrice()).isEqualTo(19.99);
                    return true;
                })
                .expectNextMatches(item -> {
                    assertThat(item.getName()).isEqualTo("Smurf TV tray");
                    assertThat(item.getDescription()).isEqualTo("nothing important");
                    assertThat(item.getPrice()).isEqualTo(29.99);
                    return true;
                })
                .verifyComplete();
    }
  • 아직 Item 데이터를 받아서 메시지로 변환하고 브로커에 보내서 몽고 디비에 저장하는 로직이 구현돼 있지 않음으로 테스트 실패함
    • AMQP 의존성 추가 필요
    • AMQP 메시징을 처리를 위한 리액티브 컨트롤러 추가 필요
    • 컨슈머 추가 필요
  • 스프링 AMQP 의존성 추가
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  • AMQP 메시징을 처리할 수 있는 리액티브 컨트롤러 구성
@RestController
public class SpringAmqpItemController {

    private final AmqpTemplate template; // 래빗엠큐를 사용함으로 실제 구현체로는 RabbitTemplate 이 사용됨.

    public SpringAmqpItemController(AmqpTemplate template) {
        this.template = template;
    }

    // .. 리액티브 컨트롤러에서 AMQP 메시지 전송 코드 참고 ..

}
  • 리액티브 컨트롤러에서 AMQP 메시지 전송
    @PostMapping("/items")
    Mono<ResponseEntity<?>> addNewItemUsingSpringAmqp(@RequestBody Mono<Item> item) {
        return item
                .subscribeOn(Schedulers.boundedElastic())  // AmqpTemplate 이 블로킹 API를 호출하므로 subscribeOn()을 통해
                .flatMap(content -> {                      // 바운디드 엘라스틱 스케쥴러에서 관리하는 별도의 스레드에서 실행
                    return Mono
                            .fromCallable(() -> {
                                this.template.convertAndSend(// param1: exchange 이름, param2: 라우팅 키, param3: message
                                        "hacking-spring-boot", "new-items-spring-amqp", content); // Item 데이터를 라우팅 키와 함께 hacking-spring-boot 익스체인지로 전송
                                return ResponseEntity.created(URI.create("/items")).build();
                            });
                });
    }
  • 래빗엠큐는 블로킹 API를 호출함
    • 비동기 메시징 시스템이긴 하지만 많은 래빗엠큐 API는 작업 수행 중 현재 스레드를 블록함
    • 예제에서는 긴 시간 동안 블로킹하지 않으므로 큰 문제가 되지 않지만 이런 블로킹에 의해 발생하는 지연이 쌓이고 쌓이면 문제가 됨
    • 프로젝트 리액터에서는 이런 문제를 해결하기 위해 스케쥴러를 사용해서 해결

스케쥴러를 사용해서 블로킹 API 감싸기

  • 리액터의 API를 사용할 때 멀티스레드 프로그래밍을 반드시 활용해야 하는 것은 아님
    • 수십 개에서 수백 개의 스레드를 사용하는 것은 여러 문제를 일으키며 그다지 좋은 방법이라고 할 수 없음
  • 리액터는 스케줄러를 통해 개별 수행 단계가 어느 스레드에서 실행될지 지정할 수 있음
  • 한 개의 스레드만을 사용하면서도 비동기 논블로킹 코드를 작성할 수 있음
    • 한 개의 스레드가 작업을 수행할 수 있을 때(스레드가 시스템 자원의 가용성에 반응할 준비가 돼 있을 때) 개별 수행 단계를 실행하는 방식을 사용하면 가능
    • 순서
      • 하나의 작업 단계가 완료
        • 스레드는 리액터의 작업 코디네이터에게 반환
          • 어떤 작업 단계를 실행할지 결정
    • 모든 작업이 이처럼 개별 단계가 완료될 때마다 스케쥴러에게 스레드를 반환하는 패러다임으로 수행될 수 있다면, 스레드의 숫자는 전통적인 멀티스레드 프로그래밍에서 만큼 중요하지 않게 됨
  • 스케쥴러
    • Schedulers.immediate(): 현재 스레드
    • Schedulers.single: 재사용 가능한 하나의 스레드. 현재 수행 중인 리액터 플로우뿐만 아니라 호출되는 모든 작업이 동일한 하나의 스레드에서 실행됨
    • Schedulers.newSingle(): 새로 생성한 전용 스레드
    • Schedulers.boundedElastic(): 작업량에 따라 스레드 숫자가 늘어나거나 줄어드는 신축성 있는 스레드 풀
    • Schedulers.parallel(): 병렬 작업에 적합하도록 최적화된 고정 크기 워커 스레드 풀
    • Schedulers.fromExecutorService(): ExecuterService 인스턴스를 감싸서 재사용
  • single(), newSingle(), parallel()은 논블로킹 작업에 사용되는 스레드를 생성
  • 리액터 플로우에서 스케쥴러를 변경하는 방법 두 가지
    • publishOn()
      • 호출되는 시점 이후로는 지정한 스케쥴러를 사용함
      • 이 방법을 사용하면 사용하는 스케쥴러를 여러 번 바꿀 수도 있음
    • subscribeOn()
      • 플로우 전 단계에 걸쳐 사용되는 스케쥴러를 지정
      • 플로우 전체에 영향을 미치므로 publishOn()에 비해 영향 범위가 더 넓음

컨슈머 작성

  • 스프링 AMQP에는 컨슈머를 만들 수 있는 여러 방법이 준비되어 있음
  • 가장 단순한 방식은 AmqpTemplate.receive(queueName) 이지만 가장 좋은 방식이라고 할 수 없음
    • 부하가 많은 상황에서는 적합하지 않음
    • 더 많은 메시지를 폴링 방식으로 처리할 수도 있고 콜백을 등록해서 처리할 수도 있지만 @Rabbitlistener를 사용하는 것이 가장 유현하고 편리함
  • 리액티브 방식으로 AMQP 메시지 사용
@Slf4j
@Service
public class SpringAmqpItemService {

    private final ItemRepository repository;

    public SpringAmqpItemService(ItemRepository repository) {
        this.repository = repository;
    }

    // .. 래빗엠큐 메시지 리스너 등록 코드 참고 ..

}
  • 래빗엠큐 메시지 리스너 등록
    @RabbitListener( // 스프링 AMQP 메시지 리스너로 등록하여 메시지를 컨슘
            ackMode = "MANUAL",
            bindings = @QueueBinding( // 큐를 익스체인지에 바인딩하는 방법을 지정
                    value = @Queue,   // @Queue 임의의 지속성 없는 익명 큐를 생성
                    exchange = @Exchange("hacking-spring-boot"), // 이 큐와 연결될 익스체인지를 지정
                    key = "new-items-spring-amqp")) // 라우팅 키를 지정
    public Mono<Void> processNewItemsViaSpringAmqp(Item item) { // @RabbitListener에서 지정한 내용에 맞는 메시지가 들어오면 실행되며, 메시지가 item 변수를 통해 전달됨
        log.debug("Consuming => " + item);
        return this.repository.save(item).then();
    }
  • 익명 큐와 이름 있는 큐의 차이
    • 동일한 메시지를 여러 컨슈머가 사용해야 하는 상황에서는 용도에 맞게 설정하는 것이 중요
    • (이름 있는 큐의 경우) 만약 2개의 컨슈머가 동일한 큐를 사용하도록 설정되면 하나의 메시지는 두 컨슈머 중 하나의 컨슈머만 접근해서 사용할 수 있음
      • 하나의 큐에 있는 메시지는 하나의 클라이언트에 의해서만 소비될 수 있음
    • (익명 큐의 경우) 동일한 라우팅 키를 사용하는 하나의 익스체인지에 2개의 컨슈머가 연결돼 있지만 각자 다른 큐를 사용한다면, 하나의 메시지가 다른 큐에 복제되므로 메시지 발행자 쪽을 변경하지 않고도 2개의 컨슈머가 모두 해당 메시지를 사용할 수 있음
  • 스프링 AMQP를 사용하면 자바의 Serializable 인터페이스를 사용해서 직렬화를 처리할 수 있음
    • 적용은 그리 어렵지 않지만 최선의 방법이라고 할 수 없음
      • 역직렬화가 자바에 포함돼 있는 여러 보안 검사를 우회함
      • 다양한 보안 공격에 활용됨
      • 잭슨 사용으로 성능 저하가 발생한다는 확실한 벤치마크 결과가 나오지 않는 한 일반적으로 Serializable 보다 잭슨 사용을 추천함
  • 다른 대안으로 POJO 객체를 JSON 같은 문자열 표현으로 변환하고 문자열을 바이트 배열로 만들어서 네트워크를 통해 전송하는 방법이 있음
    • 스프링에서 JSON 직렬화를 담당하는 잭슨(Jackson) 라이브러리를 사용하는 방법은 아주 간단함
      • 아래 JSON 기반 메시지 직렬화 설정과 같이 빈을 하나 등록해주면 됨
  • JSON 기반 메시지 직렬화 설정
    @Bean
    Jackson2JsonMessageConverter jackson2JsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

정리

  • 테스트 컨테이너, 래빗엠큐, 스프링 AMQP 설정
  • 웹과 백엔드가 예상대로 동작하는지 검증하는 테스트 작성
  • 동기적 웹 요청을 받아서 처리하는 웹 플럭스 컨트롤러 작성
  • 블로킹 API 호출부를 감싸서 리액터의 엘라스틱 스레드 풀에서 실행
  • RabbitTemplate을 사용해서 비동기 메시지 브로커를 통해 메시지 전송
  • @RabbitListener를 사용해서 래빗엠큐 리스너를 설정하고, 전송받은 메시지 소비

참고

profile

Notepad

@Apio

포스팅이 좋았다면 "좋아요❤️" 또는 "구독👍🏻" 해주세요!