混合高階 DSL 和低階 Processor API

Kafka Streams 提供了兩種 API 變體。它有一個高階 DSL 類似 API,您可以鏈式操作,這對於許多函式式程式設計師來說可能很熟悉。Kafka Streams 還提供了對低階處理器 API 的訪問。處理器 API 儘管功能強大,並能夠以更低的級別控制事物,但本質上是命令式的。Spring Cloud Stream 的 Kafka Streams 繫結器允許您使用高階 DSL 或混合 DSL 和處理器 API。混合這兩種變體為您提供了許多選項來控制應用程式中的各種用例。應用程式可以使用 transformprocess 方法 API 呼叫來訪問處理器 API。

以下是關於如何在 Spring Cloud Stream 應用程式中使用 process API 結合 DSL 和處理器 API 的示例。

@Bean
public Consumer<KStream<Object, String>> process() {
    return input ->
        input.process(() -> new Processor<Object, String>() {
            @Override
            @SuppressWarnings("unchecked")
            public void init(ProcessorContext context) {
               this.context = context;
            }

            @Override
            public void process(Object key, String value) {
                //business logic
            }

            @Override
            public void close() {

        });
}

以下是使用 transform API 的示例。

@Bean
public Consumer<KStream<Object, String>> process() {
    return (input, a) ->
        input.transform(() -> new Transformer<Object, String, KeyValue<Object, String>>() {
            @Override
            public void init(ProcessorContext context) {

            }

            @Override
            public void close() {

            }

            @Override
            public KeyValue<Object, String> transform(Object key, String value) {
                // business logic - return transformed KStream;
            }
        });
}

process API 方法呼叫是一個終止操作,而 transform API 是非終止操作,它為您提供一個可能經過轉換的 KStream,您可以使用它透過 DSL 或處理器 API 繼續進一步處理。

© . This site is unofficial and not affiliated with VMware.