Apache Kafka Streams 支援
從 1.1.4 版本開始,Spring for Apache Kafka 為 Kafka Streams 提供了第一類支援。要在 Spring 應用中使用它,classpath 中必須包含 kafka-streams
jar。它是 Spring for Apache Kafka 專案的可選依賴項,不會被傳遞下載。
基本概念
Apache Kafka Streams 參考文件建議以下使用 API 的方式
// Use the builders to define the actual processing topology, e.g. to specify
// from which input topics to read, which stream operations (filter, map, etc.)
// should be called, and so on.
StreamsBuilder builder = ...; // when using the Kafka Streams DSL
// Use the configuration to tell your application where the Kafka cluster is,
// which serializers/deserializers to use by default, to specify security settings,
// and so on.
StreamsConfig config = ...;
KafkaStreams streams = new KafkaStreams(builder, config);
// Start the Kafka Streams instance
streams.start();
// Stop the Kafka Streams instance
streams.close();
因此,我們有兩個主要元件
-
StreamsBuilder
:提供用於構建KStream
(或KTable
)例項的 API。 -
KafkaStreams
:用於管理這些例項的生命週期。
單個 StreamsBuilder 向 KafkaStreams 例項公開的所有 KStream 例項會同時啟動和停止,即使它們的邏輯不同。換句話說,由 StreamsBuilder 定義的所有流都繫結到同一個生命週期控制。一旦 KafkaStreams 例項透過 streams.close() 關閉,就無法重新啟動。相反,必須建立一個新的 KafkaStreams 例項來重新啟動流處理。 |
Spring 管理
為了簡化從 Spring 應用上下文角度使用 Kafka Streams 並透過容器進行生命週期管理,Spring for Apache Kafka 引入了 StreamsBuilderFactoryBean
。這是一個 AbstractFactoryBean
實現,用於將 StreamsBuilder
單例例項作為 bean 公開。以下示例建立了這樣一個 bean
@Bean
public FactoryBean<StreamsBuilder> myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
return new StreamsBuilderFactoryBean(streamsConfig);
}
從 2.2 版本開始,流配置現在以 KafkaStreamsConfiguration 物件而不是 StreamsConfig 提供。 |
StreamsBuilderFactoryBean
還實現了 SmartLifecycle
來管理內部 KafkaStreams
例項的生命週期。與 Kafka Streams API 類似,你必須在啟動 KafkaStreams
之前定義 KStream
例項。這同樣適用於 Spring for Kafka Streams API。因此,當你在 StreamsBuilderFactoryBean
上使用預設的 autoStartup = true
時,你必須在應用上下文重新整理之前在 StreamsBuilder
上宣告 KStream
例項。例如,KStream
可以是一個常規的 bean 定義,而 Kafka Streams API 的使用不受任何影響。以下示例展示瞭如何做到這一點
@Bean
public KStream<?, ?> kStream(StreamsBuilder kStreamBuilder) {
KStream<Integer, String> stream = kStreamBuilder.stream(STREAMING_TOPIC1);
// Fluent KStream API
return stream;
}
如果你想手動控制生命週期(例如,根據某些條件停止和啟動),你可以使用 factory bean (&
) 字首直接引用 StreamsBuilderFactoryBean
bean。由於 StreamsBuilderFactoryBean
使用其內部 KafkaStreams
例項,因此可以安全地再次停止和重新啟動它。每次 start()
時都會建立一個新的 KafkaStreams
例項。如果你想分別控制 KStream
例項的生命週期,也可以考慮使用不同的 StreamsBuilderFactoryBean
例項。
你還可以在 StreamsBuilderFactoryBean
上指定 KafkaStreams.StateListener
、Thread.UncaughtExceptionHandler
和 StateRestoreListener
選項,這些選項會被委託給內部 KafkaStreams
例項。
此外,除了在 StreamsBuilderFactoryBean
上間接設定這些選項之外,你還可以使用 KafkaStreamsCustomizer
回撥介面來
-
(從 版本 2.1.5 開始)使用
customize(KafkaStreams)
配置內部KafkaStreams
例項 -
(從 版本 3.3.0 開始)使用
initKafkaStreams(Topology, Properties, KafkaClientSupplier)
例項化KafkaStreams
的自定義實現
請注意,KafkaStreamsCustomizer
會覆蓋 StreamsBuilderFactoryBean
提供的選項。
如果你需要直接執行一些 KafkaStreams
操作,可以透過 StreamsBuilderFactoryBean.getKafkaStreams()
訪問該內部 KafkaStreams
例項。
你可以按型別自動裝配 StreamsBuilderFactoryBean
bean,但必須確保在 bean 定義中使用完整型別,如下例所示
@Bean
public StreamsBuilderFactoryBean myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
return new StreamsBuilderFactoryBean(streamsConfig);
}
...
@Autowired
private StreamsBuilderFactoryBean myKStreamBuilderFactoryBean;
另外,如果使用介面 bean 定義,可以新增 @Qualifier
按名稱注入。以下示例展示瞭如何做到這一點
@Bean
public FactoryBean<StreamsBuilder> myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
return new StreamsBuilderFactoryBean(streamsConfig);
}
...
@Autowired
@Qualifier("&myKStreamBuilder")
private StreamsBuilderFactoryBean myKStreamBuilderFactoryBean;
從 2.4.1 版本開始,factory bean 有一個新屬性 infrastructureCustomizer
,型別為 KafkaStreamsInfrastructureCustomizer
;這允許在建立流之前自定義 StreamsBuilder
(例如,新增狀態儲存)和/或 Topology
。
public interface KafkaStreamsInfrastructureCustomizer {
void configureBuilder(StreamsBuilder builder);
void configureTopology(Topology topology);
}
提供了預設的無操作(no-op)實現,以避免在不需要實現某個方法時必須實現兩者。
提供了 CompositeKafkaStreamsInfrastructureCustomizer
,用於需要應用多個自定義器(customizers)的情況。
Kafka Streams Micrometer 支援
從 2.5.3 版本開始引入,你可以配置 KafkaStreamsMicrometerListener
來為 factory bean 管理的 KafkaStreams
物件自動註冊 micrometer 指標。
streamsBuilderFactoryBean.addListener(new KafkaStreamsMicrometerListener(meterRegistry,
Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));
Streams JSON 序列化和反序列化
為了在以 JSON 格式讀寫主題或狀態儲存時進行資料序列化和反序列化,Spring for Apache Kafka 提供了一個使用 JSON 的 JsonSerde
實現,它委託給序列化、反序列化和訊息轉換中描述的 JsonSerializer
和 JsonDeserializer
。JsonSerde
實現透過其建構函式(目標型別或 ObjectMapper
)提供了相同的配置選項。在以下示例中,我們使用 JsonSerde
對 Kafka 流的 Cat
載荷進行序列化和反序列化(JsonSerde
可以在任何需要例項的地方以類似的方式使用)
stream.through(Serdes.Integer(), new JsonSerde<>(Cat.class), "cats");
自 2.3 版本起,在以程式設計方式構建序列化器/反序列化器用於生產者/消費者工廠時,你可以使用流式 API,這簡化了配置。
stream.through(
new JsonSerde<>(MyKeyType.class)
.forKeys()
.noTypeInfo(),
new JsonSerde<>(MyValueType.class)
.noTypeInfo(),
"myTypes");
使用 KafkaStreamBrancher
KafkaStreamBrancher
類引入了一種更方便的方式來在 KStream
之上構建條件分支。
考慮以下不使用 KafkaStreamBrancher
的示例
KStream<String, String>[] branches = builder.stream("source").branch(
(key, value) -> value.contains("A"),
(key, value) -> value.contains("B"),
(key, value) -> true
);
branches[0].to("A");
branches[1].to("B");
branches[2].to("C");
以下示例使用 KafkaStreamBrancher
new KafkaStreamBrancher<String, String>()
.branch((key, value) -> value.contains("A"), ks -> ks.to("A"))
.branch((key, value) -> value.contains("B"), ks -> ks.to("B"))
//default branch should not necessarily be defined in the end of the chain!
.defaultBranch(ks -> ks.to("C"))
.onTopOf(builder.stream("source"));
//onTopOf method returns the provided stream so we can continue with method chaining
配置
要配置 Kafka Streams 環境,StreamsBuilderFactoryBean
需要一個 KafkaStreamsConfiguration
例項。有關所有可能的選項,請參閱 Apache Kafka 文件。
從 2.2 版本開始,流配置現在以 KafkaStreamsConfiguration 物件而不是 StreamsConfig 提供。 |
為了避免大多數情況下的樣板程式碼,尤其是在開發微服務時,Spring for Apache Kafka 提供了 @EnableKafkaStreams
註解,你應該將其放在 @Configuration
類上。你只需要宣告一個名為 defaultKafkaStreamsConfig
的 KafkaStreamsConfiguration
bean。一個名為 defaultKafkaStreamsBuilder
的 StreamsBuilderFactoryBean
bean 會自動在應用上下文中宣告。你也可以宣告和使用任何額外的 StreamsBuilderFactoryBean
bean。透過提供一個實現 StreamsBuilderFactoryBeanConfigurer
的 bean,你可以對該 bean 進行額外的自定義。如果存在多個這樣的 bean,它們將按照其 Ordered.order
屬性進行應用。
清理 & 停止配置
當 factory 停止時,會呼叫 KafkaStreams.close()
方法,帶有 2 個引數
-
closeTimeout:等待執行緒關閉的時間(預設為
DEFAULT_CLOSE_TIMEOUT
,設定為 10 秒)。可以透過StreamsBuilderFactoryBean.setCloseTimeout()
進行配置。 -
leaveGroupOnClose:是否觸發消費者離開組的呼叫(預設為
false
)。可以透過StreamsBuilderFactoryBean.setLeaveGroupOnClose()
進行配置。
預設情況下,當 factory bean 停止時,會呼叫 KafkaStreams.cleanUp()
方法。從 2.1.2 版本開始,factory bean 提供了額外的建構函式,接受一個 CleanupConfig
物件,該物件包含屬性,允許你控制 cleanUp()
方法是在 start()
或 stop()
期間呼叫,還是都不呼叫。從 2.7 版本開始,預設情況下永遠不清理本地狀態。
頭部豐富器
3.0 版本添加了 ContextualProcessor
的擴充套件 HeaderEnricherProcessor
;提供了與已廢棄的 HeaderEnricher
相同的功能,後者實現了已廢棄的 Transformer
介面。這可用於在流處理中新增頭部;頭部值是 SpEL 表示式;表示式評估的根物件具有 3 個屬性
-
record
-org.apache.kafka.streams.processor.api.Record
(key
、value
、timestamp
、headers
) -
key
- 當前記錄的鍵 -
value
- 當前記錄的值 -
context
-ProcessorContext
,允許訪問當前記錄元資料
表示式必須返回 byte[]
或 String
(後者將使用 UTF-8
轉換為 byte[]
)。
要在流中使用 enricher
.process(() -> new HeaderEnricherProcessor(expressions))
該處理器不會改變 key
或 value
;它僅新增頭部。
你需要為每條記錄建立一個新例項。 |
.process(() -> new HeaderEnricherProcessor<..., ...>(expressionMap))
這裡有一個簡單示例,新增一個字面量頭部和一個變數
Map<String, Expression> headers = new HashMap<>();
headers.put("header1", new LiteralExpression("value1"));
SpelExpressionParser parser = new SpelExpressionParser();
headers.put("header2", parser.parseExpression("record.timestamp() + ' @' + record.offset()"));
ProcessorSupplier supplier = () -> new HeaderEnricher<String, String>(headers);
KStream<String, String> stream = builder.stream(INPUT);
stream
.process(() -> supplier)
.to(OUTPUT);
MessagingProcessor
3.0 版本添加了 ContextualProcessor
的擴充套件 MessagingProcessor
,提供了與已廢棄的 MessagingTransformer
相同的功能,後者實現了已廢棄的 Transformer
介面。這允許 Kafka Streams 拓撲與 Spring Messaging 元件(例如 Spring Integration 流)進行互動。該 transformer 需要 MessagingFunction
的實現。
@FunctionalInterface
public interface MessagingFunction {
Message<?> exchange(Message<?> message);
}
Spring Integration 自動使用其 GatewayProxyFactoryBean
提供一個實現。它還需要一個 MessagingMessageConverter
將 key、value 和元資料(包括頭部)與 Spring Messaging Message>
相互轉換。有關更多資訊,請參閱[從 KStream
呼叫 Spring Integration 流]。
從反序列化異常中恢復
2.3 版本引入了 RecoveringDeserializationExceptionHandler
,它可以在發生反序列化異常時採取一些行動。請參考 Kafka 關於 DeserializationExceptionHandler
的文件,RecoveringDeserializationExceptionHandler
是它的一個實現。RecoveringDeserializationExceptionHandler
配置了一個 ConsumerRecordRecoverer
實現。框架提供了 DeadLetterPublishingRecoverer
,它將失敗的記錄傳送到死信主題。有關此 recoverer 的更多資訊,請參閱釋出死信記錄。
要配置 recoverer,請將以下屬性新增到你的流配置中
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
Map<String, Object> props = new HashMap<>();
...
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
RecoveringDeserializationExceptionHandler.class);
props.put(RecoveringDeserializationExceptionHandler.KSTREAM_DESERIALIZATION_RECOVERER, recoverer());
...
return new KafkaStreamsConfiguration(props);
}
@Bean
public DeadLetterPublishingRecoverer recoverer() {
return new DeadLetterPublishingRecoverer(kafkaTemplate(),
(record, ex) -> new TopicPartition("recovererDLQ", -1));
}
當然,recoverer()
bean 可以是你自己的 ConsumerRecordRecoverer
實現。
互動式查詢支援
從 3.2 版本開始,Spring for Apache Kafka 提供了 Kafka Streams 互動式查詢所需的基本功能。互動式查詢在有狀態的 Kafka Streams 應用中非常有用,因為它們提供了一種持續查詢應用中狀態儲存的方式。因此,如果應用想要實現系統中當前檢視的實體化,互動式查詢提供了一種實現方式。要了解更多關於互動式查詢的資訊,請參閱這篇文章。Spring for Apache Kafka 中的支援圍繞一個名為 KafkaStreamsInteractiveQueryService
的 API,它是 Kafka Streams 庫中互動式查詢 API 的一個外觀(facade)。應用可以將此服務例項建立為一個 bean,然後使用它按名稱檢索狀態儲存。
以下程式碼片段展示了一個示例。
@Bean
public KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService(StreamsBuilderFactoryBean streamsBuilderFactoryBean) {
final KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService =
new KafkaStreamsInteractiveQueryService(streamsBuilderFactoryBean);
return kafkaStreamsInteractiveQueryService;
}
假設 Kafka Streams 應用有一個名為 app-store
的狀態儲存,那麼可以透過如下所示的 KafkStreamsInteractiveQuery
API 檢索該儲存。
@Autowired
private KafkaStreamsInteractiveQueryService interactiveQueryService;
ReadOnlyKeyValueStore<Object, Object> appStore = interactiveQueryService.retrieveQueryableStore("app-store", QueryableStoreTypes.keyValueStore());
一旦應用訪問到狀態儲存,就可以從中查詢鍵值資訊。
在這種情況下,應用使用的狀態儲存是一個只讀的鍵值儲存。Kafka Streams 應用還可以使用其他型別的狀態儲存。例如,如果應用希望查詢基於視窗的儲存,可以在 Kafka Streams 應用業務邏輯中構建該儲存,然後稍後檢索它。因此,KafkaStreamsInteractiveQueryService
中檢索可查詢儲存的 API 具有通用儲存型別簽名,以便終端使用者可以指定正確的型別。
這是 API 中的型別簽名。
public <T> T retrieveQueryableStore(String storeName, QueryableStoreType<T> storeType)
呼叫此方法時,使用者可以像我們在上面的示例中那樣,明確請求正確的狀態儲存型別。
重試狀態儲存檢索
嘗試使用 KafkaStreamsInteractiveQueryService
檢索狀態儲存時,狀態儲存可能因各種原因找不到。如果這些原因是暫時的,KafkaStreamsInteractiveQueryService
提供了重試檢索狀態儲存的選項,允許注入自定義的 RetryTemplate
。預設情況下,KafkaStreamsInteractiveQueryService
中使用的 RetryTemplate
最大嘗試次數為三次,固定退避時間為一秒。
以下是如何將最大嘗試次數設定為十的自定義 RetryTemplate
注入到 KafkaStreamsInteractiveQueryService
中。
@Bean
public KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService(StreamsBuilderFactoryBean streamsBuilderFactoryBean) {
final KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService =
new KafkaStreamsInteractiveQueryService(streamsBuilderFactoryBean);
RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setBackOffPolicy(new FixedBackOffPolicy());
RetryPolicy retryPolicy = new SimpleRetryPolicy(10);
retryTemplate.setRetryPolicy(retryPolicy);
kafkaStreamsInteractiveQueryService.setRetryTemplate(retryTemplate);
return kafkaStreamsInteractiveQueryService;
}
查詢遠端狀態儲存
上面展示的用於檢索狀態儲存的 API - retrieveQueryableStore
,主要用於本地可用的鍵值狀態儲存。在生產環境中,Kafka Streams 應用很可能根據分割槽數量進行分散式部署。如果一個主題有四個分割槽,並且有四個相同的 Kafka Streams 處理器例項正在執行,那麼每個例項可能負責處理該主題的一個分割槽。在這種情況下,呼叫 retrieveQueryableStore
可能不會得到例項正在尋找的正確結果,即使它可能返回一個有效的儲存。假設具有四個分割槽的主題包含關於各種鍵的資料,並且一個分割槽總是負責特定的鍵。如果呼叫 retrieveQueryableStore
的例項正在查詢該例項未託管的鍵的資訊,那麼它將不會收到任何資料。這是因為當前的 Kafka Streams 例項對此鍵一無所知。為了解決這個問題,呼叫例項首先需要確保它們擁有託管特定鍵的 Kafka Streams 處理器例項的主機資訊。這可以從同一 application.id
下的任何 Kafka Streams 例項檢索,如下所示。
@Autowired
private KafkaStreamsInteractiveQueryService interactiveQueryService;
HostInfo kafkaStreamsApplicationHostInfo = this.interactiveQueryService.getKafkaStreamsApplicationHostInfo("app-store", 12345, new IntegerSerializer());
在上面的示例程式碼中,呼叫例項正在從名為 app-store
的狀態儲存中查詢特定鍵 12345
。該 API 還需要相應的鍵序列化器,在本例中是 IntegerSerializer
。Kafka Streams 會在其同一 application.id
下的所有例項中查詢哪個例項託管了此特定鍵,找到後,它將該主機資訊作為 HostInfo
物件返回。
該 API 如下所示
public <K> HostInfo getKafkaStreamsApplicationHostInfo(String store, K key, Serializer<K> serializer)
在這種分散式方式下使用相同 application.id
的多個 Kafka Streams 處理器例項時,應用應提供一個 RPC 層,以便可以透過 RPC 端點(例如 REST)查詢狀態儲存。有關此內容的更多詳細資訊,請參閱這篇文章。使用 Spring for Apache Kafka 時,利用 spring-web 技術新增基於 Spring 的 REST 端點非常容易。一旦有了 REST 端點,就可以從任何 Kafka Streams 例項查詢狀態儲存,前提是該例項知道託管該鍵的 HostInfo
。
如果託管鍵的例項是當前例項,那麼應用無需呼叫 RPC 機制,而是進行 JVM 內部呼叫。然而,問題在於應用可能不知道發起呼叫的例項就是託管鍵的例項,因為特定伺服器可能由於消費者再平衡而失去一個分割槽。為了解決這個問題,KafkaStreamsInteractiveQueryService
提供了一個方便的 API,透過 API 方法 getCurrentKafkaStreamsApplicationHostInfo()
查詢當前主機資訊,該方法返回當前的 HostInfo
。其思想是應用可以先獲取關於鍵所在位置的資訊,然後將該 HostInfo
與當前例項的 HostInfo
進行比較。如果 HostInfo
資料匹配,則可以透過 retrieveQueryableStore
進行簡單的 JVM 呼叫,否則選擇 RPC 選項。
Kafka Streams 示例
以下示例結合了我們在本章中介紹的各種主題
@Configuration
@EnableKafka
@EnableKafkaStreams
public class KafkaStreamsConfig {
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testStreams");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
return new KafkaStreamsConfiguration(props);
}
@Bean
public StreamsBuilderFactoryBeanConfigurer configurer() {
return fb -> fb.setStateListener((newState, oldState) -> {
System.out.println("State transition from " + oldState + " to " + newState);
});
}
@Bean
public KStream<Integer, String> kStream(StreamsBuilder kStreamBuilder) {
KStream<Integer, String> stream = kStreamBuilder.stream("streamingTopic1");
stream
.mapValues((ValueMapper<String, String>) String::toUpperCase)
.groupByKey()
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMillis(1_000)))
.reduce((String value1, String value2) -> value1 + value2,
Named.as("windowStore"))
.toStream()
.map((windowedId, value) -> new KeyValue<>(windowedId.key(), value))
.filter((i, s) -> s.length() > 40)
.to("streamingTopic2");
stream.print(Printed.toSysOut());
return stream;
}
}