7장. Processor API
7장에서는 Processor API(PAPI라고도 부름)라는 카프카 스트림즈의 하위-수준 API에 대해 설명
Processor API
- 상위-수준 DSL보다 적은 추상화를 갖고 있으며 명령형 프로그래밍 방식을 사용
- 코드는 좀 더 길어지지만 토폴로지에서의 데이터 흐름, 스트림 프로세스들의 관계, 상태의 생성과 유지보수, 특정 연산의 수행 시간과 같은 특징들을 세세하게 제어할 수 있어 상위-수준 DSL보다 훨씬 강력함
언제 Processor API를 사용해야 하나?
일반적으로 아래 특징들 때문에 사용
- 레코드의 메타데이터(토픽, 파티션, 오프셋 정보, 레코드 헤더 등) 접근
- 주기적인 함수를 스케쥴링하는 기능
- 레코드를 하위 스트림 프로세서로 넘길 때 사용 가능한 세세한 제어
- 상태 저장소에 대한 좀 더 세분화된 접근
- DSL을 사용할 때 마주칠 수 있는 제약들을 뛰어넘을 수 있는 기능
단점
- 장황한 코드로 유지 관리에 더 많은 비용이 들어가며 가독성이 떨어짐
- 다른 프로젝트 관리자가 진입하기 어려운 높은 장벽이 생길 수 있음
- DSL 특징 또는 추상화를 우발적으로 재창조하거나, 이상한 문제-구조화와 성능 함적과 같은 문제를 일으킬 가능성이 높아짐
그래서?
- 카프카 스트림즈에서는 DSL과 Processor API를 혼합해 사용할 수 있으므로 어느 한쪽만 사용할 필요는 없음
- 단순한 표준 연산들이 필요하면 DSL 사용
- 복잡한 연산과 프로세스 컨텍스트, 상태 또는 레코드의 메타 데이터를 하쉬-수준으로 접근할 필요가 있는 특별한 메서드에서는 Processor API 사용
Processor API를 사용해 어떻게 소스, 싱크 그리고 스트림 프로세서들을 추가할 수 있나?
public static Topology getTopology() {
// 토폴로지 인스턴스를 직접 인스턴스화
Topology builder = new Topology();
// #1
// "desired-state-events" 토픽으로부터 데이터를 컨슘하는 "Desired State Events"라는 소스 프로세서 생성
// DSL의 스트림과 동일
builder.addSource(
"Desired State Events", // name
Serdes.String().deserializer(),
JsonSerdes.TurbineState().deserializer(),
"desired-state-events"); // topic
// #2
// "reported-state-events" 토픽으로부터 데이터를 컨슘하는 "Reported State Events"라는 소스 프로세서 생성
// DSL의 스트림과 동일
builder.addSource(
"Reported State Events", // name
Serdes.String().deserializer(),
JsonSerdes.TurbineState().deserializer(),
"reported-state-events"); // topic
// #3
// "High Winds Flatmap Processor" 스트림 프로세서 추가
// "Reported State Events" 프로세서로부터 이벤트를 수신
// 이 프로세서는 입력과 출력 레코드의 수의 관계가 1:N이므로 DSL의 flatMap 연산과 같음
builder.addProcessor(
"High Winds Flatmap Processor", // name
HighWindsFlatmapProcessor::new, // process supplier
"Reported State Events"); // parent
// #4
// "High Winds Flatmap Processor" 와 "Desired State Events" 양쪽에서 내보낸 데이터를 이용해 레코드를 생성하는 "Digital Twin Processor"라는 스트림 프로세서
// 여러 소스로부터 데이터를 가져오므로 DSL의 merge 연산과 동일
// 이 프로세서는 상태가 있으므로 DSL의 집계 테이블과 동일
builder.addProcessor(
"Digital Twin Processor", // name
DigitalTwinProcessor::new, // process supplier
"High Winds Flatmap Processor", // parents
"Desired State Events");
// #5
// "Digital Twin Processor" 에서 접근 가능한 영구적인 키-값 저장소를 구축할 때 사용할 수 있는 "StoreBuilder" 생성을 위해 "Stores" 팩토리 클래스 이용
StoreBuilder<KeyValueStore<String, DigitalTwin>> storeBuilder =
Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("digital-twin-store"),
Serdes.String(),
JsonSerdes.DigitalTwin());
// #6
// 토폴로지에 상태 저장소를 추가하고 이를 "Digital Twin Processor" 노드에 연결
builder.addStateStore(storeBuilder, "Digital Twin Processor");
// #7
// 모든 "Digital Twin Processor" 노드에서 내보낸 디지털 트윈 레코드를 "digital-twins"라는 출력 토픽으로 쓰는 "Digital Twin Sink"라는 이름의 싱크 프로세서 생성
builder.addSink(
"Digital Twin Sink", // name
"digital-twins", // topic
Serdes.String().serializer(),
JsonSerdes.DigitalTwin().serializer(),
"Digital Twin Processor"); // parent
return builder;
}주기적인 함수를 어떻게 스케쥴링하나?
ProcessorContext#schedule 메서드로 수행할 테스크를 쉽게 스케쥴링 가능.
카프카 스트림즈에서 사용 가능한 구두점 타입
- 스트림 시간
- 벽시계 시간
public class DigitalTwinProcessor implements Processor<String, TurbineState, String, DigitalTwin> {
private static final Logger log = LoggerFactory.getLogger(DigitalTwinProcessor.class);
private ProcessorContext<String, DigitalTwin> context;
private KeyValueStore<String, DigitalTwin> kvStore;
private Cancellable punctuator;
public void init(ProcessorContext<String, DigitalTwin> context) {
// ... 생략 ...
// 벽시계 시간 기준으로 매 5분마다 주기적인 함수를 실행하도록 스케쥴링하고, 반환된 Cancellable 객체를 반환
punctuator =
this.context.schedule(
Duration.ofMinutes(5), PunctuationType.WALL_CLOCK_TIME, this::enforceTtl);
// ... 생략 ...
}
// ... 생략 ...
public void enforceTtl(Long timestamp) {
try (KeyValueIterator<String, DigitalTwin> iter = kvStore.all()) {
while (iter.hasNext()) {
KeyValue<String, DigitalTwin> entry = iter.next();
log.info("Checking to see if digital twin record has expired: {}", entry.key);
TurbineState lastReportedState = entry.value.getReported();
if (lastReportedState == null) {
continue;
}
Instant lastUpdated = Instant.parse(lastReportedState.getTimestamp());
long daysSinceLastUpdate = Duration.between(lastUpdated, Instant.now()).toDays();
// 오래된 레코드를 상태 저장소에서 삭제
if (daysSinceLastUpdate >= 7) {
kvStore.delete(entry.key);
}
}
}
}
}상위-수준 DSL과 Processor API를 같이 사용할 수 있는가?
하이브리드 DSL + Processor API 구현의 이점
- 노드 이름과 부모 이름으로 관계를 정의하는 대신 연산자들을 연결하는 것이 데이터 플로우 구현할 때 더 쉬움
- DSL에서는 대부분의 연산자에서 람다를 지원하여, 간결한 표현으로 변환 할 때 유리함
- Processor API에서 키 재성성이 필요하다면 이를 구현하는데 코드가 엄청 길어짐
- DSL 연산자들은 어떤 스트림 처리 단계에서 벌어지는 일을 정의하는 표준 용어를 제공(예: flatMap)
- DSL은 다른 형태의 스트림 타입을 위한 공통 용어를 제공
// ... 생략 ...
// DSL way of building a topology
StreamsBuilder builder = new StreamsBuilder();
KStream<String, TurbineState> desiredStateEvents =
builder.stream(
"desired-state-events", Consumed.with(Serdes.String(), JsonSerdes.TurbineState()));
KStream<String, TurbineState> highWinds =
builder
.stream(
"reported-state-events", Consumed.with(Serdes.String(), JsonSerdes.TurbineState()))
// generate shutdown signals
.flatMapValues(
(key, reported) -> {
List<TurbineState> records = new ArrayList<>();
records.add(reported);
if (reported.getWindSpeedMph() > 65 && reported.getPower() == Power.ON) {
log.info("high winds detected. sending shutdown signal");
TurbineState desired = TurbineState.clone(reported);
desired.setPower(Power.OFF);
desired.setType(Type.DESIRED);
// forward the new desired state
records.add(desired);
}
return records;
})
.merge(desiredStateEvents);
StoreBuilder<KeyValueStore<String, DigitalTwin>> storeBuilder =
Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("digital-twin-store"),
Serdes.String(),
JsonSerdes.DigitalTwin());
builder.addStateStore(storeBuilder);
highWinds
.transformValues(DigitalTwinValueTransformerWithKey::new, "digital-twin-store")
// sink processor
.to("digital-twins", Produced.with(Serdes.String(), JsonSerdes.DigitalTwin()));
// ... 생략 ... 프로세서와 트랜스포머의 차이는 무엇인가?
- DSL은 상태 저장소, 레코드 메타데이터 그리고 프로세서 컨텍스트를 하위-수준으로 접근할 필요가 있을 때마다 Processor API를 사용할 수 있게 해주는 특별한 연산자들을 포함하고 있음
- 어떤 경우에 사용할지는 처리해야 할 작업의 복잡성과 상태 유지의 필요성에 따라 달라짐
- 프로세서는 간단한 데이터 변환 작업에 사용되는 상태를 유지하지 않는 구성 요소이며, 트랜스포머는 복잡한 데이터 변환 및 상태 기반 작업에 사용되는 상태를 유지하는 구성 요소
프로세서
- 말단 연산이고 처리 로직은 Processor 인터페이스를 사용해 구현해야 함
- DSL에서 Processor API가 필요할 때마다 사용해야함
- 하위 스트림 연산자를 연결할 필요는 없음
트랜스포머
- 프로세서와 비교해 좀 더 많은 변형이 있으며, 하나 이상의 레코드를 변환할 수 있음
참고
'dev > 기타' 카테고리의 다른 글
| [정리] 핵심만 배우는 git (0) | 2023.08.27 |
|---|---|
| port 오픈 확인 (0) | 2023.07.21 |
| [정리] 실전 카프카 개발부터 운영까지(5) (0) | 2022.10.20 |
| [정리] 실전 카프카 개발부터 운영까지(3) (1) | 2022.10.04 |
| [정리] 실전 카프카 개발부터 운영까지(1) (0) | 2022.07.19 |