混合使用高層級 DSL 和低層級 Processor API
Kafka Streams 提供了兩種 API 變體。它有一個更高層級的類 DSL API,你可以在其中連結各種操作,這對於許多函數語言程式設計人員來說可能很熟悉。Kafka Streams 也提供了低層級 Processor API 的訪問能力。Processor API 雖然非常強大,並且能夠以更低的層級控制事物,但本質上是命令式的。用於 Spring Cloud Stream 的 Kafka Streams Binder 允許你使用高層級 DSL 或混合使用 DSL 和 Processor API。混合使用這兩種變體為你提供了許多選項來控制應用程式中的各種用例。應用程式可以使用 `transform` 或 `process` 方法 API 呼叫來訪問 Processor API。
以下是使用 `process` API 在 Spring Cloud Stream 應用程式中如何結合使用 DSL 和 Processor API 的示例。
@Bean
public Consumer<KStream<Object, String>> process() {
return input ->
input.process(() -> new Processor<Object, String>() {
@Override
@SuppressWarnings("unchecked")
public void init(ProcessorContext context) {
this.context = context;
}
@Override
public void process(Object key, String value) {
//business logic
}
@Override
public void close() {
});
}
以下是使用 `transform` API 的示例。
@Bean
public Consumer<KStream<Object, String>> process() {
return (input, a) ->
input.transform(() -> new Transformer<Object, String, KeyValue<Object, String>>() {
@Override
public void init(ProcessorContext context) {
}
@Override
public void close() {
}
@Override
public KeyValue<Object, String> transform(Object key, String value) {
// business logic - return transformed KStream;
}
});
}
`process` API 方法呼叫是一個終端操作,而 `transform` API 是非終端操作,並給你一個經過潛在轉換的 `KStream`,你可以使用它繼續使用 DSL 或 Processor API 進行進一步處理。