Apache Kafka Streams 支援
從 1.1.4 版本開始,Spring for Apache Kafka 提供了對 Kafka Streams 的一流支援。要從 Spring 應用程式中使用它,kafka-streams jar 必須存在於類路徑中。它是 Spring for Apache Kafka 專案的一個可選依賴項,不會被傳遞下載。
基礎
參考 Apache Kafka Streams 文件建議以下使用 API 的方式
// Use the builders to define the actual processing topology, e.g. to specify
// from which input topics to read, which stream operations (filter, map, etc.)
// should be called, and so on.
StreamsBuilder builder = ...; // when using the Kafka Streams DSL
// Use the configuration to tell your application where the Kafka cluster is,
// which serializers/deserializers to use by default, to specify security settings,
// and so on.
StreamsConfig config = ...;
KafkaStreams streams = new KafkaStreams(builder, config);
// Start the Kafka Streams instance
streams.start();
// Stop the Kafka Streams instance
streams.close();
因此,我們有兩個主要元件
-
StreamsBuilder:具有構建KStream(或KTable)例項的 API。 -
KafkaStreams:管理這些例項的生命週期。
由單個 StreamsBuilder 暴露給 KafkaStreams 例項的所有 KStream 例項,即使它們具有不同的邏輯,也會同時啟動和停止。換句話說,由 StreamsBuilder 定義的所有流都與單個生命週期控制繫結。一旦 KafkaStreams 例項透過 streams.close() 關閉,它就不能重新啟動。相反,必須建立一個新的 KafkaStreams 例項來重新啟動流處理。 |
Spring 管理
為了簡化從 Spring 應用程式上下文角度使用 Kafka Streams 並透過容器使用生命週期管理,Spring for Apache Kafka 引入了 StreamsBuilderFactoryBean。這是一個 AbstractFactoryBean 實現,用於將 StreamsBuilder 單例例項公開為 bean。以下示例建立了這樣一個 bean
@Bean
public FactoryBean<StreamsBuilder> myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
return new StreamsBuilderFactoryBean(streamsConfig);
}
從 2.2 版本開始,流配置現在以 KafkaStreamsConfiguration 物件而不是 StreamsConfig 的形式提供。 |
StreamsBuilderFactoryBean 也實現了 SmartLifecycle 來管理內部 KafkaStreams 例項的生命週期。與 Kafka Streams API 類似,您必須在啟動 KafkaStreams 之前定義 KStream 例項。這也適用於 Kafka Streams 的 Spring API。因此,當您在 StreamsBuilderFactoryBean 上使用預設的 autoStartup = true 時,您必須在應用程式上下文重新整理之前在 StreamsBuilder 上宣告 KStream 例項。例如,KStream 可以是一個常規的 bean 定義,而 Kafka Streams API 的使用沒有任何影響。以下示例展示瞭如何實現
@Bean
public KStream<?, ?> kStream(StreamsBuilder kStreamBuilder) {
KStream<Integer, String> stream = kStreamBuilder.stream(STREAMING_TOPIC1);
// Fluent KStream API
return stream;
}
如果您想手動控制生命週期(例如,根據某些條件停止和啟動),您可以使用工廠 bean (&) 字首 直接引用 StreamsBuilderFactoryBean bean。由於 StreamsBuilderFactoryBean 使用其內部 KafkaStreams 例項,因此可以安全地停止並重新啟動它。每次 start() 都會建立一個新的 KafkaStreams。如果您希望單獨控制 KStream 例項的生命週期,您還可以考慮使用不同的 StreamsBuilderFactoryBean 例項。
您還可以在 StreamsBuilderFactoryBean 上指定 KafkaStreams.StateListener、Thread.UncaughtExceptionHandler 和 StateRestoreListener 選項,這些選項將委託給內部 KafkaStreams 例項。
此外,除了間接在 StreamsBuilderFactoryBean 上設定這些選項之外,您還可以使用 KafkaStreamsCustomizer 回撥介面來
-
(從 2.1.5 版本開始)使用
customize(KafkaStreams)配置內部KafkaStreams例項 -
(從 3.3.0 版本開始)使用
initKafkaStreams(Topology, Properties, KafkaClientSupplier)例項化KafkaStreams的自定義實現
請注意,KafkaStreamsCustomizer 會覆蓋 StreamsBuilderFactoryBean 提供的選項。
如果您需要直接執行某些 KafkaStreams 操作,您可以透過使用 StreamsBuilderFactoryBean.getKafkaStreams() 訪問該內部 KafkaStreams 例項。
您可以透過型別自動裝配 StreamsBuilderFactoryBean bean,但您應該確保在 bean 定義中使用完整型別,如下例所示
@Bean
public StreamsBuilderFactoryBean myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
return new StreamsBuilderFactoryBean(streamsConfig);
}
...
@Autowired
private StreamsBuilderFactoryBean myKStreamBuilderFactoryBean;
或者,如果您使用介面 bean 定義,您可以新增 @Qualifier 以透過名稱進行注入。以下示例展示瞭如何實現
@Bean
public FactoryBean<StreamsBuilder> myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
return new StreamsBuilderFactoryBean(streamsConfig);
}
...
@Autowired
@Qualifier("&myKStreamBuilder")
private StreamsBuilderFactoryBean myKStreamBuilderFactoryBean;
從 2.4.1 版本開始,工廠 bean 有一個名為 infrastructureCustomizer 的新屬性,型別為 KafkaStreamsInfrastructureCustomizer;這允許在流建立之前自定義 StreamsBuilder(例如,新增狀態儲存)和/或 Topology。
public interface KafkaStreamsInfrastructureCustomizer {
void configureBuilder(StreamsBuilder builder);
void configureTopology(Topology topology);
}
提供了預設的無操作實現,以避免在不需要時必須實現這兩個方法。
提供了一個 CompositeKafkaStreamsInfrastructureCustomizer,用於在您需要應用多個自定義器時使用。
KafkaStreams Micrometer 支援
在 2.5.3 版本中引入,您可以配置一個 KafkaStreamsMicrometerListener,以自動為工廠 bean 管理的 KafkaStreams 物件註冊 Micrometer 計量器
streamsBuilderFactoryBean.addListener(new KafkaStreamsMicrometerListener(meterRegistry,
Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));
流 JSON 序列化和反序列化
為了在以 JSON 格式讀寫主題或狀態儲存時序列化和反序列化資料,Spring for Apache Kafka 提供了一個 JacksonJsonSerde 實現,它使用 JSON,委託給 序列化、反序列化和訊息轉換 中描述的 JacksonJsonSerializer 和 JacksonJsonDeserializer。JacksonJsonSerde 實現透過其建構函式(目標型別或 ObjectMapper)提供相同的配置選項。在以下示例中,我們使用 JacksonJsonSerde 序列化和反序列化 Kafka 流的 Cat 有效負載(JacksonJsonSerde 可以在需要例項的任何地方以類似方式使用)
stream.through(Serdes.Integer(), new JacksonJsonSerde<>(Cat.class), "cats");
在以程式設計方式構建序列化器/反序列化器以在生產者/消費者工廠中使用時,從 2.3 版本開始,您可以使用流式 API,這簡化了配置。
stream.through(
new JacksonJsonSerde<>(MyKeyType.class)
.forKeys()
.noTypeInfo(),
new JacksonJsonSerde<>(MyValueType.class)
.noTypeInfo(),
"myTypes");
使用 KafkaStreamBrancher
KafkaStreamBrancher 類引入了一種更方便的方式來在 KStream 之上構建條件分支。
考慮以下不使用 KafkaStreamBrancher 的示例
KStream<String, String>[] branches = builder.stream("source").branch(
(key, value) -> value.contains("A"),
(key, value) -> value.contains("B"),
(key, value) -> true
);
branches[0].to("A");
branches[1].to("B");
branches[2].to("C");
以下示例使用 KafkaStreamBrancher
new KafkaStreamBrancher<String, String>()
.branch((key, value) -> value.contains("A"), ks -> ks.to("A"))
.branch((key, value) -> value.contains("B"), ks -> ks.to("B"))
//default branch should not necessarily be defined in the end of the chain!
.defaultBranch(ks -> ks.to("C"))
.onTopOf(builder.stream("source"));
//onTopOf method returns the provided stream so we can continue with method chaining
配置
為了配置 Kafka Streams 環境,StreamsBuilderFactoryBean 需要一個 KafkaStreamsConfiguration 例項。有關所有可能的選項,請參閱 Apache Kafka 文件。
從 2.2 版本開始,流配置現在以 KafkaStreamsConfiguration 物件而不是 StreamsConfig 的形式提供。 |
為了避免大多數情況下的樣板程式碼,尤其是在您開發微服務時,Spring for Apache Kafka 提供了 @EnableKafkaStreams 註解,您應該將其放置在 @Configuration 類上。您所需要做的就是宣告一個名為 defaultKafkaStreamsConfig 的 KafkaStreamsConfiguration bean。一個名為 defaultKafkaStreamsBuilder 的 StreamsBuilderFactoryBean bean 會自動在應用程式上下文中宣告。您還可以宣告和使用任何額外的 StreamsBuilderFactoryBean bean。您可以透過提供一個實現 StreamsBuilderFactoryBeanConfigurer 的 bean 來對該 bean 執行額外的自定義。如果有多個這樣的 bean,它們將根據其 Ordered.order 屬性應用。
清理和停止配置
當工廠停止時,KafkaStreams.close() 會帶兩個引數呼叫
-
closeTimeout:等待執行緒關閉的時間(預設為
DEFAULT_CLOSE_TIMEOUT,設定為 10 秒)。可以使用StreamsBuilderFactoryBean.setCloseTimeout()進行配置。 -
leaveGroupOnClose:觸發消費者離開組的呼叫(預設為
false)。可以使用StreamsBuilderFactoryBean.setLeaveGroupOnClose()進行配置。
預設情況下,當工廠 bean 停止時,會呼叫 KafkaStreams.cleanUp() 方法。從 2.1.2 版本開始,工廠 bean 具有額外的建構函式,接受一個 CleanupConfig 物件,該物件具有屬性,允許您控制在 start() 或 stop() 期間是否呼叫 cleanUp() 方法,或者兩者都不呼叫。從 2.7 版本開始,預設情況下永不清理本地狀態。
頭部增強器
版本 3.0 添加了 ContextualProcessor 的 HeaderEnricherProcessor 擴充套件;提供了與已棄用的 HeaderEnricher 相同的功能,後者實現了已棄用的 Transformer 介面。這可用於在流處理中新增頭部;頭部值是 SpEL 表示式;表示式評估的根物件有 3 個屬性
-
record-org.apache.kafka.streams.processor.api.Record(key,value,timestamp,headers) -
key- 當前記錄的鍵 -
value- 當前記錄的值 -
context-ProcessorContext,允許訪問當前記錄元資料
表示式必須返回 byte[] 或 String(將使用 UTF-8 轉換為 byte[])。
要在流中使用增強器
.process(() -> new HeaderEnricherProcessor(expressions))
處理器不改變 key 或 value;它只是新增頭部。
| 每個記錄都需要一個新的例項。 |
.process(() -> new HeaderEnricherProcessor<..., ...>(expressionMap))
這是一個簡單的示例,新增一個字面量頭部和一個變數
Map<String, Expression> headers = new HashMap<>();
headers.put("header1", new LiteralExpression("value1"));
SpelExpressionParser parser = new SpelExpressionParser();
headers.put("header2", parser.parseExpression("record.timestamp() + ' @' + record.offset()"));
ProcessorSupplier supplier = () -> new HeaderEnricher<String, String>(headers);
KStream<String, String> stream = builder.stream(INPUT);
stream
.process(() -> supplier)
.to(OUTPUT);
MessagingProcessor
版本 3.0 添加了 ContextualProcessor 的 MessagingProcessor 擴充套件,提供了與已棄用的 MessagingTransformer 相同的功能,後者實現了已棄用的 Transformer 介面。這允許 Kafka Streams 拓撲與 Spring Messaging 元件(例如 Spring Integration 流)進行互動。轉換器需要 MessagingFunction 的實現。
@FunctionalInterface
public interface MessagingFunction {
Message<?> exchange(Message<?> message);
}
Spring Integration 自動使用其 GatewayProxyFactoryBean 提供了一個實現。它還需要一個 MessagingMessageConverter 來將鍵、值和元資料(包括頭部)轉換為 Spring Messaging Message<?>。有關更多資訊,請參閱 從 KStream 呼叫 Spring Integration 流。
從反序列化異常中恢復
版本 2.3 引入了 RecoveringDeserializationExceptionHandler,它可以在發生反序列化異常時採取一些操作。請參閱 Kafka 文件中關於 DeserializationExceptionHandler 的內容,RecoveringDeserializationExceptionHandler 是其實現之一。RecoveringDeserializationExceptionHandler 配置了一個 ConsumerRecordRecoverer 實現。框架提供了 DeadLetterPublishingRecoverer,它將失敗的記錄傳送到死信主題。有關此恢復器的更多資訊,請參閱 釋出死信記錄。
要配置恢復器,請將以下屬性新增到您的流配置中
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
Map<String, Object> props = new HashMap<>();
...
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
RecoveringDeserializationExceptionHandler.class);
props.put(RecoveringDeserializationExceptionHandler.KSTREAM_DESERIALIZATION_RECOVERER, recoverer());
...
return new KafkaStreamsConfiguration(props);
}
@Bean
public DeadLetterPublishingRecoverer recoverer() {
return new DeadLetterPublishingRecoverer(kafkaTemplate(),
(record, ex) -> new TopicPartition("recovererDLQ", -1));
}
當然,recoverer() bean 可以是您自己的 ConsumerRecordRecoverer 實現。
互動式查詢支援
從 3.2 版本開始,Spring for Apache Kafka 提供了 Kafka Streams 中互動式查詢所需的基本設施。互動式查詢在有狀態的 Kafka Streams 應用程式中非常有用,因為它們提供了一種持續查詢應用程式中有狀態儲存的方法。因此,如果應用程式想要具體化所考慮系統的當前檢視,互動式查詢提供了一種實現方式。要了解有關互動式查詢的更多資訊,請參閱這篇文章。Spring for Apache Kafka 中的支援以一個名為 KafkaStreamsInteractiveQueryService 的 API 為中心,該 API 是 Kafka Streams 庫中互動式查詢 API 的一個外觀。應用程式可以建立此服務的一個例項作為 bean,然後使用它透過名稱檢索狀態儲存。
以下程式碼片段顯示了一個示例。
@Bean
public KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService(StreamsBuilderFactoryBean streamsBuilderFactoryBean) {
final KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService =
new KafkaStreamsInteractiveQueryService(streamsBuilderFactoryBean);
return kafkaStreamsInteractiveQueryService;
}
假設一個 Kafka Streams 應用程式有一個名為 app-store 的狀態儲存,那麼該儲存可以透過 KafkaStreamsInteractiveQuery API 檢索,如下所示。
@Autowired
private KafkaStreamsInteractiveQueryService interactiveQueryService;
ReadOnlyKeyValueStore<Object, Object> appStore = interactiveQueryService.retrieveQueryableStore("app-store", QueryableStoreTypes.keyValueStore());
一旦應用程式獲得了對狀態儲存的訪問許可權,它就可以從中查詢鍵值資訊。
在這種情況下,應用程式使用的狀態儲存是一個只讀的鍵值儲存。Kafka Streams 應用程式可以使用其他型別的狀態儲存。例如,如果應用程式更喜歡查詢基於視窗的儲存,它可以在 Kafka Streams 應用程式業務邏輯中構建該儲存,然後稍後檢索它。因此,KafkaStreamsInteractiveQueryService 中檢索可查詢儲存的 API 具有通用儲存型別簽名,以便終端使用者可以分配適當的型別。
這是 API 的型別簽名。
public <T> T retrieveQueryableStore(String storeName, QueryableStoreType<T> storeType)
呼叫此方法時,使用者可以明確請求適當的狀態儲存型別,就像我們在上面的示例中所做的那樣。
重試狀態儲存檢索
嘗試使用 KafkaStreamsInteractiveQueryService 檢索狀態儲存時,狀態儲存可能會因各種原因而未找到。如果這些原因是暫時的,KafkaStreamsInteractiveQueryService 提供了一個選項,透過允許注入自定義的 RetryTemplate 來重試檢索狀態儲存。預設情況下,KafkaStreamsInteractiveQueryService 中使用的 RetryTemplate 使用最大三次嘗試,並具有一秒的固定回退。
您可以像這樣將自定義的 RetryTemplate 注入到 KafkaStreamsInteractiveQueryService 中,最大嘗試次數為十次。
@Bean
public KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService(StreamsBuilderFactoryBean streamsBuilderFactoryBean) {
final KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService =
new KafkaStreamsInteractiveQueryService(streamsBuilderFactoryBean);
RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setBackOffPolicy(new FixedBackOffPolicy());
RetryPolicy retryPolicy = new SimpleRetryPolicy(10);
retryTemplate.setRetryPolicy(retryPolicy);
kafkaStreamsInteractiveQueryService.setRetryTemplate(retryTemplate);
return kafkaStreamsInteractiveQueryService;
}
查詢遠端狀態儲存
上面顯示用於檢索狀態儲存的 API - retrieveQueryableStore 用於本地可用的鍵值狀態儲存。在生產環境中,Kafka Streams 應用程式很可能根據分割槽數量進行分散式。如果一個主題有四個分割槽,並且有四個相同 Kafka Streams 處理器例項正在執行,那麼每個例項可能負責處理主題中的單個分割槽。在這種情況下,呼叫 retrieveQueryableStore 可能無法提供例項正在尋找的正確結果,儘管它可能返回一個有效的儲存。讓我們假設一個有四個分割槽的主題包含各種鍵的資料,並且單個分割槽始終負責特定的鍵。如果呼叫 retrieveQueryableStore 的例項正在查詢此例項不託管的鍵的資訊,那麼它將不會收到任何資料。這是因為當前的 Kafka Streams 例項對該鍵一無所知。要解決此問題,呼叫例項首先需要確保它們擁有 Kafka Streams 處理器例項的主機資訊,該例項託管特定鍵。這可以從同一 application.id 下的任何 Kafka Streams 例項中檢索,如下所示。
@Autowired
private KafkaStreamsInteractiveQueryService interactiveQueryService;
HostInfo kafkaStreamsApplicationHostInfo = this.interactiveQueryService.getKafkaStreamsApplicationHostInfo("app-store", 12345, new IntegerSerializer());
在上面的示例程式碼中,呼叫例項正在從名為 app-store 的狀態儲存中查詢特定鍵 12345。API 還需要一個相應的鍵序列化器,在本例中為 IntegerSerializer。Kafka Streams 會遍歷同一 application.id 下的所有例項,並嘗試查詢哪個例項託管此特定鍵。一旦找到,它會以 HostInfo 物件的形式返回該主機資訊。
API 如下所示
public <K> HostInfo getKafkaStreamsApplicationHostInfo(String store, K key, Serializer<K> serializer)
當以這種分散式方式使用同一 application.id 的多個 Kafka Streams 處理器例項時,應用程式應該提供一個 RPC 層,狀態儲存可以透過 RPC 端點(例如 REST 端點)進行查詢。有關此內容的更多詳細資訊,請參閱這篇文章。當使用 Spring for Apache Kafka 時,透過使用 spring-web 技術新增基於 Spring 的 REST 端點非常容易。一旦有了 REST 端點,就可以使用它從任何 Kafka Streams 例項查詢狀態儲存,前提是例項知道託管鍵的 HostInfo。
如果託管金鑰的例項是當前例項,則應用程式無需呼叫 RPC 機制,而是進行 JVM 內部呼叫。然而,問題是應用程式可能不知道進行呼叫的例項就是託管金鑰的例項,因為特定伺服器可能會由於消費者重新平衡而丟失分割槽。為了解決這個問題,KafkaStreamsInteractiveQueryService 提供了一個方便的 API,透過 API 方法 getCurrentKafkaStreamsApplicationHostInfo() 查詢當前主機資訊,該方法返回當前的 HostInfo。其思想是應用程式可以首先獲取有關金鑰所在位置的資訊,然後將 HostInfo 與當前例項的 HostInfo 進行比較。如果 HostInfo 資料匹配,則可以透過 retrieveQueryableStore 進行簡單的 JVM 呼叫,否則選擇 RPC 選項。
Kafka Streams 示例
以下示例結合了本章中涵蓋的各種主題
@Configuration
@EnableKafka
@EnableKafkaStreams
public class KafkaStreamsConfig {
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testStreams");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
return new KafkaStreamsConfiguration(props);
}
@Bean
public StreamsBuilderFactoryBeanConfigurer configurer() {
return fb -> fb.setStateListener((newState, oldState) -> {
System.out.println("State transition from " + oldState + " to " + newState);
});
}
@Bean
public KStream<Integer, String> kStream(StreamsBuilder kStreamBuilder) {
KStream<Integer, String> stream = kStreamBuilder.stream("streamingTopic1");
stream
.mapValues((ValueMapper<String, String>) String::toUpperCase)
.groupByKey()
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMillis(1_000)))
.reduce((String value1, String value2) -> value1 + value2,
Named.as("windowStore"))
.toStream()
.map((windowedId, value) -> new KeyValue<>(windowedId.key(), value))
.filter((i, s) -> s.length() > 40)
.to("streamingTopic2");
stream.print(Printed.toSysOut());
return stream;
}
}