Notepad

7장. Processor API

7장에서는 Processor API(PAPI라고도 부름)라는 카프카 스트림즈의 하위-수준 API에 대해 설명

Processor API

  • 상위-수준 DSL보다 적은 추상화를 갖고 있으며 명령형 프로그래밍 방식을 사용
  • 코드는 좀 더 길어지지만 토폴로지에서의 데이터 흐름, 스트림 프로세스들의 관계, 상태의 생성과 유지보수, 특정 연산의 수행 시간과 같은 특징들을 세세하게 제어할 수 있어 상위-수준 DSL보다 훨씬 강력함

언제 Processor API를 사용해야 하나?

일반적으로 아래 특징들 때문에 사용

  • 레코드의 메타데이터(토픽, 파티션, 오프셋 정보, 레코드 헤더 등) 접근
  • 주기적인 함수를 스케쥴링하는 기능
  • 레코드를 하위 스트림 프로세서로 넘길 때 사용 가능한 세세한 제어
  • 상태 저장소에 대한 좀 더 세분화된 접근
  • DSL을 사용할 때 마주칠 수 있는 제약들을 뛰어넘을 수 있는 기능

단점

  • 장황한 코드로 유지 관리에 더 많은 비용이 들어가며 가독성이 떨어짐
  • 다른 프로젝트 관리자가 진입하기 어려운 높은 장벽이 생길 수 있음
  • DSL 특징 또는 추상화를 우발적으로 재창조하거나, 이상한 문제-구조화와 성능 함적과 같은 문제를 일으킬 가능성이 높아짐

그래서?

  • 카프카 스트림즈에서는 DSL과 Processor API를 혼합해 사용할 수 있으므로 어느 한쪽만 사용할 필요는 없음
  • 단순한 표준 연산들이 필요하면 DSL 사용
  • 복잡한 연산과 프로세스 컨텍스트, 상태 또는 레코드의 메타 데이터를 하쉬-수준으로 접근할 필요가 있는 특별한 메서드에서는 Processor API 사용

Processor API를 사용해 어떻게 소스, 싱크 그리고 스트림 프로세서들을 추가할 수 있나?

  public static Topology getTopology() {
    // 토폴로지 인스턴스를 직접 인스턴스화
    Topology builder = new Topology();

    // #1
    // "desired-state-events" 토픽으로부터 데이터를 컨슘하는 "Desired State Events"라는 소스 프로세서 생성
    // DSL의 스트림과 동일
    builder.addSource(
        "Desired State Events", // name
        Serdes.String().deserializer(),
        JsonSerdes.TurbineState().deserializer(),
        "desired-state-events"); // topic

    // #2
    // "reported-state-events" 토픽으로부터 데이터를 컨슘하는 "Reported State Events"라는 소스 프로세서 생성
    // DSL의 스트림과 동일
    builder.addSource(
        "Reported State Events", // name
        Serdes.String().deserializer(),
        JsonSerdes.TurbineState().deserializer(),
        "reported-state-events"); // topic

    // #3
    // "High Winds Flatmap Processor" 스트림 프로세서 추가
    // "Reported State Events" 프로세서로부터 이벤트를 수신
    // 이 프로세서는 입력과 출력 레코드의 수의 관계가 1:N이므로 DSL의 flatMap 연산과 같음
    builder.addProcessor(
        "High Winds Flatmap Processor", // name
        HighWindsFlatmapProcessor::new, // process supplier
        "Reported State Events"); // parent

    // #4
    // "High Winds Flatmap Processor" 와 "Desired State Events" 양쪽에서 내보낸 데이터를 이용해 레코드를 생성하는 "Digital Twin Processor"라는 스트림 프로세서
    // 여러 소스로부터 데이터를 가져오므로 DSL의 merge 연산과 동일
    // 이 프로세서는 상태가 있으므로 DSL의 집계 테이블과 동일
    builder.addProcessor(
        "Digital Twin Processor", // name
        DigitalTwinProcessor::new, // process supplier
        "High Winds Flatmap Processor", // parents
        "Desired State Events");

    // #5
    // "Digital Twin Processor" 에서 접근 가능한 영구적인 키-값 저장소를 구축할 때 사용할 수 있는 "StoreBuilder" 생성을 위해 "Stores" 팩토리 클래스 이용
    StoreBuilder<KeyValueStore<String, DigitalTwin>> storeBuilder =
        Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore("digital-twin-store"),
            Serdes.String(),
            JsonSerdes.DigitalTwin());

    // #6
    // 토폴로지에 상태 저장소를 추가하고 이를 "Digital Twin Processor" 노드에 연결
    builder.addStateStore(storeBuilder, "Digital Twin Processor");

    // #7
    // 모든 "Digital Twin Processor" 노드에서 내보낸 디지털 트윈 레코드를 "digital-twins"라는 출력 토픽으로 쓰는 "Digital Twin Sink"라는 이름의 싱크 프로세서 생성
    builder.addSink(
        "Digital Twin Sink", // name
        "digital-twins", // topic
        Serdes.String().serializer(),
        JsonSerdes.DigitalTwin().serializer(),
        "Digital Twin Processor"); // parent

    return builder;
  }

주기적인 함수를 어떻게 스케쥴링하나?

ProcessorContext#schedule 메서드로 수행할 테스크를 쉽게 스케쥴링 가능.

카프카 스트림즈에서 사용 가능한 구두점 타입

  • 스트림 시간
  • 벽시계 시간
public class DigitalTwinProcessor implements Processor<String, TurbineState, String, DigitalTwin> {
  private static final Logger log = LoggerFactory.getLogger(DigitalTwinProcessor.class);

  private ProcessorContext<String, DigitalTwin> context;
  private KeyValueStore<String, DigitalTwin> kvStore;
  private Cancellable punctuator;

  public void init(ProcessorContext<String, DigitalTwin> context) {
  // ... 생략 ...
      // 벽시계 시간 기준으로 매 5분마다 주기적인 함수를 실행하도록 스케쥴링하고, 반환된 Cancellable 객체를 반환
    punctuator =
        this.context.schedule(
            Duration.ofMinutes(5), PunctuationType.WALL_CLOCK_TIME, this::enforceTtl);
  // ... 생략 ...
  }

  // ... 생략 ...    
  public void enforceTtl(Long timestamp) {
    try (KeyValueIterator<String, DigitalTwin> iter = kvStore.all()) {
      while (iter.hasNext()) {
        KeyValue<String, DigitalTwin> entry = iter.next();
        log.info("Checking to see if digital twin record has expired: {}", entry.key);
        TurbineState lastReportedState = entry.value.getReported();
        if (lastReportedState == null) {
          continue;
        }

        Instant lastUpdated = Instant.parse(lastReportedState.getTimestamp());
        long daysSinceLastUpdate = Duration.between(lastUpdated, Instant.now()).toDays();
        // 오래된 레코드를 상태 저장소에서 삭제
        if (daysSinceLastUpdate >= 7) {
          kvStore.delete(entry.key);
        }
      }
    }
  }
}

상위-수준 DSL과 Processor API를 같이 사용할 수 있는가?

하이브리드 DSL + Processor API 구현의 이점

  • 노드 이름과 부모 이름으로 관계를 정의하는 대신 연산자들을 연결하는 것이 데이터 플로우 구현할 때 더 쉬움
  • DSL에서는 대부분의 연산자에서 람다를 지원하여, 간결한 표현으로 변환 할 때 유리함
  • Processor API에서 키 재성성이 필요하다면 이를 구현하는데 코드가 엄청 길어짐
  • DSL 연산자들은 어떤 스트림 처리 단계에서 벌어지는 일을 정의하는 표준 용어를 제공(예: flatMap)
  • DSL은 다른 형태의 스트림 타입을 위한 공통 용어를 제공
// ... 생략 ...
    // DSL way of building a topology
    StreamsBuilder builder = new StreamsBuilder();

    KStream<String, TurbineState> desiredStateEvents =
        builder.stream(
            "desired-state-events", Consumed.with(Serdes.String(), JsonSerdes.TurbineState()));

    KStream<String, TurbineState> highWinds =
        builder
            .stream(
                "reported-state-events", Consumed.with(Serdes.String(), JsonSerdes.TurbineState()))
            // generate shutdown signals
            .flatMapValues(
                (key, reported) -> {
                  List<TurbineState> records = new ArrayList<>();
                  records.add(reported);
                  if (reported.getWindSpeedMph() > 65 && reported.getPower() == Power.ON) {
                    log.info("high winds detected. sending shutdown signal");
                    TurbineState desired = TurbineState.clone(reported);
                    desired.setPower(Power.OFF);
                    desired.setType(Type.DESIRED);
                    // forward the new desired state
                    records.add(desired);
                  }
                  return records;
                })
            .merge(desiredStateEvents);

    StoreBuilder<KeyValueStore<String, DigitalTwin>> storeBuilder =
        Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore("digital-twin-store"),
            Serdes.String(),
            JsonSerdes.DigitalTwin());

    builder.addStateStore(storeBuilder);

    highWinds
        .transformValues(DigitalTwinValueTransformerWithKey::new, "digital-twin-store")
        // sink processor
        .to("digital-twins", Produced.with(Serdes.String(), JsonSerdes.DigitalTwin()));

// ... 생략 ...     

프로세서와 트랜스포머의 차이는 무엇인가?

  • DSL은 상태 저장소, 레코드 메타데이터 그리고 프로세서 컨텍스트를 하위-수준으로 접근할 필요가 있을 때마다 Processor API를 사용할 수 있게 해주는 특별한 연산자들을 포함하고 있음
  • 어떤 경우에 사용할지는 처리해야 할 작업의 복잡성과 상태 유지의 필요성에 따라 달라짐
  • 프로세서는 간단한 데이터 변환 작업에 사용되는 상태를 유지하지 않는 구성 요소이며, 트랜스포머는 복잡한 데이터 변환 및 상태 기반 작업에 사용되는 상태를 유지하는 구성 요소

프로세서

  • 말단 연산이고 처리 로직은 Processor 인터페이스를 사용해 구현해야 함
  • DSL에서 Processor API가 필요할 때마다 사용해야함
  • 하위 스트림 연산자를 연결할 필요는 없음

트랜스포머

  • 프로세서와 비교해 좀 더 많은 변형이 있으며, 하나 이상의 레코드를 변환할 수 있음

참고

profile

Notepad

@Apio

포스팅이 좋았다면 "좋아요❤️" 또는 "구독👍🏻" 해주세요!