스프링 부트 메시징
- 메시지 연결을 활용해서 컴포넌트 사이의 결함도를 낮추도록 아키텍처를 고도화
- 비동기 메시징은 동일한 애플리케이션 안에 있는 컴포넌트들 또는 네트워크를 통해 연결된 여러 마이크로서비스에 분산돼 있는 컴포넌트들을 연결하는 좋은 수단이 될 수 있음
학습 내용
스프링 부트에서 지원하는 다양한 메시징 솔루션
스프링 부트에서 직접 지원하지는 않지만 스프링 포트폴리오에서 지원하는 다양한 메시징 솔루션
AMQP(Advenced Message Queuing Protocol)를 자세히 알아보고, 스프링 AMQP와 프로젝트 리액터를 활용해 웹 계층과 백엔드의 결합 관계 해소
메시징 솔루션 선택
- JMS(Java Messaging Service), 아파치 카프카, AMQP, 레디스, 젬파이어, 아파치 지오드 등 솔루션의 종류가 다양함
- 솔루션들은 공통점도 많지만 저마다 다른 관심사에 최적화돼 있음
- 책에서는 솔루션 별 다양한 케이스에 대해 설명하지 않고 메시징을 활용하고 리액티브 스트림 프로그래밍에 적절히 통합하는 방법만을 다룸
익숙한 패턴을 사용한 문제 해결
- 자바의 복잡도 감소(Reducing Java Complexity)가 스프링 포트폴리오의 핵심 특징
- 2008년 스프링 사용자 컨퍼런스에서 로드 존슨이 실제로 썼던 표현
- 스프링의 설계 목표는 무엇보다도 애플리케이션 만드는 방법을 단순화하는 것
- 이를 위한 강력한 수단 중 하나로 템플릿 패턴(Template Pattern)이 있음
- 템플릿 패턴의 유용함에 GoF의 디자인 패턴 책에도 포함돼 있음
- 템플릿이란?
- 특정 API의 모든 복잡성을 가장 단순한 연산으로 축약하는 것을 의미
- 대표적인 템플릿: JdbcTemplate
- JDBC를 직접 사용 시 불편한 점: 개발자가 쿼리, DB 연결 관리를 모두 신경 써야 하고, 쿼리 작성 후 ResultSet을 닫지 않고 종료 처리하지 않았는지 하나하나 확인해야 함
- JdbcTemplate을 활용하면 몇 가지 연산만 사용해서 데이터 조회와 수정을 처리할 수 있음
// JdbcTemplate 사용 예
int result = jdbcTemplate.queryForObject(
"SELECT COUNT(*) FROM ITEM", Integer.class);
- 비동기 메시징에 대한 Template
- JMS: 자바 표준 메시징 API
- JmsTemplate과 DefaultMessageListenerContainer를 제공
- 아파치 카프카: 빠른 속도로 대세로 자리 잡고 있는 브로커
- KafkaTemplate과 KafkaMessageListenerContainer를 제공
- 래빗엠큐(RabbitMQ): 높은 처리량과 강한 회복력이 특징인 메시지 브로커
- AmqpTemplate과 SimpleMessageListenerContainer를 제공
- 레디스: 빠른 속도를 무기로 가장 널리 사용되는 브로커
- RedisMessageListenerContainer를 제공
- JMS: 자바 표준 메시징 API
손쉬운 테스트
- AMQP 브로커인 래빗엠큐를 사용하는 테스트를 중심으로 메시지 처리 방법 소개
- 래빗엠큐 설치와 설정, 실행에 시간이 많이 소요됨으로 간단히 테스트컨테이너를 활용
- 테스트컨테이너
- 도커를 활용하는 자바 테스트 지원 라이브러리
- 도커에서 실행될 수만 있다면, 어떤 데이터베이스나 메시지 브로커, 서드파티 시스템도 테스트용으로 쉽게 쓸 수 있음
- 테스트가 종료되면 테스트에 사용했던 여러 컨테이너 자원도 남김없이 깔끔하게 종료됨
- 테스트 시 클린 한 상태로 래빗엠큐를 실행하고 사용할 수 있음
- 테스트컨테이너 버전 지정
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers-bom</artifactId>
<version>1.15.2</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
- 래빗엠큐 테스트컨테이너 의존관계 추가
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>rabbitmq</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
테스트 컨테이너 사용 테스트
- 래빗엠큐 테스트 컨테이너를 사용하는 테스트 작성
@SpringBootTest
@AutoConfigureWebTestClient
@Testcontainers
@ContextConfiguration
public class RabbitTest {
@Container
static RabbitMQContainer container = new RabbitMQContainer("rabbitmq:3.7.25-management-alpine");
@Autowired
WebTestClient webTestClient;
@Autowired
ItemRepository repository;
@DynamicPropertySource
static void configure(DynamicPropertyRegistry registry) {
registry.add("spring.rabbitmq.host", container::getContainerIpAddress);
registry.add("spring.rabbitmq.port", container::getAmqpPort);
}
// .. AMQP 메시지 테스트 코드 참고 ..
}
테스트 케이스 구성
- 웹 컨트롤러가 처리해야 할 일
- 새 Item 객체를 생성하기 위해 Item 데이터가 담겨 있는 HTTP POST API 요청
- Item 데이터를 적절한 메시지로 변환
- Item 생성 메시지를 브로커에게 전송
- AMQP 메시징 테스트
@Test
void verifyMessagingThroughAmqp() throws InterruptedException {
this.webTestClient.post().uri("/items")
.bodyValue(new Item("Alf alarm clock", "nothing important", 19.99))
.exchange()
.expectStatus().isCreated()
.expectBody();
Thread.sleep(1500L);
this.webTestClient.post().uri("/items")
.bodyValue(new Item("Smurf TV tray", "nothing important", 29.99))
.exchange()
.expectStatus().isCreated()
.expectBody();
Thread.sleep(2000L);
this.repository.findAll()
.as(StepVerifier::create)
.expectNextMatches(item -> {
assertThat(item.getName()).isEqualTo("Alf alarm clock");
assertThat(item.getDescription()).isEqualTo("nothing important");
assertThat(item.getPrice()).isEqualTo(19.99);
return true;
})
.expectNextMatches(item -> {
assertThat(item.getName()).isEqualTo("Smurf TV tray");
assertThat(item.getDescription()).isEqualTo("nothing important");
assertThat(item.getPrice()).isEqualTo(29.99);
return true;
})
.verifyComplete();
}
- 아직 Item 데이터를 받아서 메시지로 변환하고 브로커에 보내서 몽고 디비에 저장하는 로직이 구현돼 있지 않음으로 테스트 실패함
- AMQP 의존성 추가 필요
- AMQP 메시징을 처리를 위한 리액티브 컨트롤러 추가 필요
- 컨슈머 추가 필요
- 스프링 AMQP 의존성 추가
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- AMQP 메시징을 처리할 수 있는 리액티브 컨트롤러 구성
@RestController
public class SpringAmqpItemController {
private final AmqpTemplate template; // 래빗엠큐를 사용함으로 실제 구현체로는 RabbitTemplate 이 사용됨.
public SpringAmqpItemController(AmqpTemplate template) {
this.template = template;
}
// .. 리액티브 컨트롤러에서 AMQP 메시지 전송 코드 참고 ..
}
- 리액티브 컨트롤러에서 AMQP 메시지 전송
@PostMapping("/items")
Mono<ResponseEntity<?>> addNewItemUsingSpringAmqp(@RequestBody Mono<Item> item) {
return item
.subscribeOn(Schedulers.boundedElastic()) // AmqpTemplate 이 블로킹 API를 호출하므로 subscribeOn()을 통해
.flatMap(content -> { // 바운디드 엘라스틱 스케쥴러에서 관리하는 별도의 스레드에서 실행
return Mono
.fromCallable(() -> {
this.template.convertAndSend(// param1: exchange 이름, param2: 라우팅 키, param3: message
"hacking-spring-boot", "new-items-spring-amqp", content); // Item 데이터를 라우팅 키와 함께 hacking-spring-boot 익스체인지로 전송
return ResponseEntity.created(URI.create("/items")).build();
});
});
}
- 래빗엠큐는 블로킹 API를 호출함
- 비동기 메시징 시스템이긴 하지만 많은 래빗엠큐 API는 작업 수행 중 현재 스레드를 블록함
- 예제에서는 긴 시간 동안 블로킹하지 않으므로 큰 문제가 되지 않지만 이런 블로킹에 의해 발생하는 지연이 쌓이고 쌓이면 문제가 됨
- 프로젝트 리액터에서는 이런 문제를 해결하기 위해 스케쥴러를 사용해서 해결
스케쥴러를 사용해서 블로킹 API 감싸기
- 리액터의 API를 사용할 때 멀티스레드 프로그래밍을 반드시 활용해야 하는 것은 아님
- 수십 개에서 수백 개의 스레드를 사용하는 것은 여러 문제를 일으키며 그다지 좋은 방법이라고 할 수 없음
- 리액터는 스케줄러를 통해 개별 수행 단계가 어느 스레드에서 실행될지 지정할 수 있음
- 한 개의 스레드만을 사용하면서도 비동기 논블로킹 코드를 작성할 수 있음
- 한 개의 스레드가 작업을 수행할 수 있을 때(스레드가 시스템 자원의 가용성에 반응할 준비가 돼 있을 때) 개별 수행 단계를 실행하는 방식을 사용하면 가능
- 순서
- 하나의 작업 단계가 완료
- 스레드는 리액터의 작업 코디네이터에게 반환
- 어떤 작업 단계를 실행할지 결정
- 스레드는 리액터의 작업 코디네이터에게 반환
- 하나의 작업 단계가 완료
- 모든 작업이 이처럼 개별 단계가 완료될 때마다 스케쥴러에게 스레드를 반환하는 패러다임으로 수행될 수 있다면, 스레드의 숫자는 전통적인 멀티스레드 프로그래밍에서 만큼 중요하지 않게 됨
- 스케쥴러
Schedulers.immediate(): 현재 스레드Schedulers.single: 재사용 가능한 하나의 스레드. 현재 수행 중인 리액터 플로우뿐만 아니라 호출되는 모든 작업이 동일한 하나의 스레드에서 실행됨Schedulers.newSingle(): 새로 생성한 전용 스레드Schedulers.boundedElastic(): 작업량에 따라 스레드 숫자가 늘어나거나 줄어드는 신축성 있는 스레드 풀Schedulers.parallel(): 병렬 작업에 적합하도록 최적화된 고정 크기 워커 스레드 풀Schedulers.fromExecutorService():ExecuterService인스턴스를 감싸서 재사용
single(),newSingle(),parallel()은 논블로킹 작업에 사용되는 스레드를 생성- 리액터 플로우에서 스케쥴러를 변경하는 방법 두 가지
publishOn()- 호출되는 시점 이후로는 지정한 스케쥴러를 사용함
- 이 방법을 사용하면 사용하는 스케쥴러를 여러 번 바꿀 수도 있음
subscribeOn()- 플로우 전 단계에 걸쳐 사용되는 스케쥴러를 지정
- 플로우 전체에 영향을 미치므로
publishOn()에 비해 영향 범위가 더 넓음
컨슈머 작성
- 스프링 AMQP에는 컨슈머를 만들 수 있는 여러 방법이 준비되어 있음
- 가장 단순한 방식은
AmqpTemplate.receive(queueName)이지만 가장 좋은 방식이라고 할 수 없음- 부하가 많은 상황에서는 적합하지 않음
- 더 많은 메시지를 폴링 방식으로 처리할 수도 있고 콜백을 등록해서 처리할 수도 있지만
@Rabbitlistener를 사용하는 것이 가장 유현하고 편리함
- 리액티브 방식으로 AMQP 메시지 사용
@Slf4j
@Service
public class SpringAmqpItemService {
private final ItemRepository repository;
public SpringAmqpItemService(ItemRepository repository) {
this.repository = repository;
}
// .. 래빗엠큐 메시지 리스너 등록 코드 참고 ..
}
- 래빗엠큐 메시지 리스너 등록
@RabbitListener( // 스프링 AMQP 메시지 리스너로 등록하여 메시지를 컨슘
ackMode = "MANUAL",
bindings = @QueueBinding( // 큐를 익스체인지에 바인딩하는 방법을 지정
value = @Queue, // @Queue 임의의 지속성 없는 익명 큐를 생성
exchange = @Exchange("hacking-spring-boot"), // 이 큐와 연결될 익스체인지를 지정
key = "new-items-spring-amqp")) // 라우팅 키를 지정
public Mono<Void> processNewItemsViaSpringAmqp(Item item) { // @RabbitListener에서 지정한 내용에 맞는 메시지가 들어오면 실행되며, 메시지가 item 변수를 통해 전달됨
log.debug("Consuming => " + item);
return this.repository.save(item).then();
}
- 익명 큐와 이름 있는 큐의 차이
- 동일한 메시지를 여러 컨슈머가 사용해야 하는 상황에서는 용도에 맞게 설정하는 것이 중요
- (이름 있는 큐의 경우) 만약 2개의 컨슈머가 동일한 큐를 사용하도록 설정되면 하나의 메시지는 두 컨슈머 중 하나의 컨슈머만 접근해서 사용할 수 있음
- 하나의 큐에 있는 메시지는 하나의 클라이언트에 의해서만 소비될 수 있음
- (익명 큐의 경우) 동일한 라우팅 키를 사용하는 하나의 익스체인지에 2개의 컨슈머가 연결돼 있지만 각자 다른 큐를 사용한다면, 하나의 메시지가 다른 큐에 복제되므로 메시지 발행자 쪽을 변경하지 않고도 2개의 컨슈머가 모두 해당 메시지를 사용할 수 있음
- 스프링 AMQP를 사용하면 자바의
Serializable인터페이스를 사용해서 직렬화를 처리할 수 있음- 적용은 그리 어렵지 않지만 최선의 방법이라고 할 수 없음
- 역직렬화가 자바에 포함돼 있는 여러 보안 검사를 우회함
- 다양한 보안 공격에 활용됨
- 잭슨 사용으로 성능 저하가 발생한다는 확실한 벤치마크 결과가 나오지 않는 한 일반적으로
Serializable보다 잭슨 사용을 추천함
- 적용은 그리 어렵지 않지만 최선의 방법이라고 할 수 없음
- 다른 대안으로 POJO 객체를 JSON 같은 문자열 표현으로 변환하고 문자열을 바이트 배열로 만들어서 네트워크를 통해 전송하는 방법이 있음
- 스프링에서 JSON 직렬화를 담당하는 잭슨(Jackson) 라이브러리를 사용하는 방법은 아주 간단함
- 아래 JSON 기반 메시지 직렬화 설정과 같이 빈을 하나 등록해주면 됨
- 스프링에서 JSON 직렬화를 담당하는 잭슨(Jackson) 라이브러리를 사용하는 방법은 아주 간단함
- JSON 기반 메시지 직렬화 설정
@Bean
Jackson2JsonMessageConverter jackson2JsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
정리
- 테스트 컨테이너, 래빗엠큐, 스프링 AMQP 설정
- 웹과 백엔드가 예상대로 동작하는지 검증하는 테스트 작성
- 동기적 웹 요청을 받아서 처리하는 웹 플럭스 컨트롤러 작성
- 블로킹 API 호출부를 감싸서 리액터의 엘라스틱 스레드 풀에서 실행
RabbitTemplate을 사용해서 비동기 메시지 브로커를 통해 메시지 전송@RabbitListener를 사용해서 래빗엠큐 리스너를 설정하고, 전송받은 메시지 소비
참고
- "스프링 부트 실전 활용 마스터", 그렉 턴키스트 저(원서: "Hacking with Spring Boot 2.3 - Reactive Edition")
- 소스 코드
'dev > Spring' 카테고리의 다른 글
| [정리] 스프링 부트 실전 활용 마스터(9) (0) | 2022.08.11 |
|---|---|
| [정리] 스프링 부트 실전 활용 마스터(8) (0) | 2022.08.04 |
| [정리] 스프링 부트 실전 활용 마스터(6) (0) | 2022.06.30 |
| [정리] 스프링 부트 실전 활용 마스터(5) (0) | 2022.06.23 |
| [정리] 스프링 부트 실전 활용 마스터(4) (0) | 2022.06.16 |