3.2.2

參考指南

本指南描述了 Spring Cloud Stream Binder 的 Apache Kafka 實現。它包含了關於其設計、用法和配置選項的資訊,以及 Stream Cloud Stream 概念如何對映到 Apache Kafka 特定構造的資訊。此外,本指南還解釋了 Spring Cloud Stream 的 Kafka Streams 繫結功能。

1. Apache Kafka Binder

1.1. 用法

要使用 Apache Kafka binder,您需要將 spring-cloud-stream-binder-kafka 新增為 Spring Cloud Stream 應用程式的依賴項,如下面的 Maven 示例所示

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

或者,您也可以使用 Spring Cloud Stream Kafka Starter,如下面的 Maven 示例所示

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>

1.2. 概覽

下圖顯示了 Apache Kafka binder 操作的簡化圖

kafka binder
圖 1. Kafka Binder

Apache Kafka Binder 實現將每個目標對映到 Apache Kafka 主題。消費者組直接對映到相同的 Apache Kafka 概念。分割槽也直接對映到 Apache Kafka 分割槽。

binder 當前使用 Apache Kafka kafka-clients 版本 2.3.1。此客戶端可以與舊代理通訊(請參閱 Kafka 文件),但某些功能可能不可用。例如,對於早於 0.11.x.x 的版本,不支援原生頭。此外,0.11.x.x 不支援 autoAddPartitions 屬性。

1.3. 配置選項

本節包含 Apache Kafka binder 使用的配置選項。

有關 binder 的常見配置選項和屬性,請參閱核心文件中的繫結屬性

1.3.1. Kafka Binder 屬性

spring.cloud.stream.kafka.binder.brokers

Kafka binder 連線的代理列表。

預設值:localhost

spring.cloud.stream.kafka.binder.defaultBrokerPort

brokers 允許指定帶或不帶埠資訊的主機(例如,host1,host2:port2)。這設定了當代理列表中未配置埠時的預設埠。

預設值:9092

spring.cloud.stream.kafka.binder.configuration

鍵/值對映,包含傳遞給 binder 建立的所有客戶端的客戶端屬性(生產者和消費者)。由於這些屬性由生產者和消費者使用,因此用法應僅限於通用屬性——例如,安全設定。透過此配置提供的未知 Kafka 生產者或消費者屬性將被過濾掉,不允許傳播。此處的屬性將覆蓋引導中設定的任何屬性。

預設值:空對映。

spring.cloud.stream.kafka.binder.consumerProperties

任意 Kafka 客戶端消費者屬性的鍵/值對映。除了支援已知的 Kafka 消費者屬性外,此處也允許未知消費者屬性。此處的屬性將覆蓋引導中設定的任何屬性以及上面 configuration 屬性中設定的任何屬性。

預設值:空對映。

spring.cloud.stream.kafka.binder.headers

由 binder 傳輸的自定義頭列表。僅當與舊應用程式 (⇐ 1.3.x) 且 kafka-clients 版本 < 0.11.0.0 通訊時才需要。較新版本原生支援頭。

預設值:空。

spring.cloud.stream.kafka.binder.healthTimeout

等待獲取分割槽資訊的時間,以秒為單位。如果此計時器到期,則健康報告為 down。

預設值:10。

spring.cloud.stream.kafka.binder.requiredAcks

代理上所需的 ack 數量。有關生產者 acks 屬性,請參閱 Kafka 文件。

預設值:1

spring.cloud.stream.kafka.binder.minPartitionCount

僅當設定了 autoCreateTopicsautoAddPartitions 時才有效。binder 在其生產或消費資料的主題上配置的全域性最小分割槽數。它可以被生產者的 partitionCount 設定或生產者的 instanceCount * concurrency 設定的值(如果兩者中任何一個更大)覆蓋。

預設值:1

spring.cloud.stream.kafka.binder.producerProperties

任意 Kafka 客戶端生產者屬性的鍵/值對映。除了支援已知的 Kafka 生產者屬性外,此處也允許未知生產者屬性。此處的屬性將覆蓋引導中設定的任何屬性以及上面 configuration 屬性中設定的任何屬性。

預設值:空對映。

spring.cloud.stream.kafka.binder.replicationFactor

如果 autoCreateTopics 處於活動狀態,則自動建立主題的複製因子。可以在每個繫結上覆蓋。

如果您使用的 Kafka 代理版本早於 2.4,則此值應設定為至少 1。從 3.0.8 版本開始,binder 使用 -1 作為預設值,這表示將使用代理的“default.replication.factor”屬性來確定副本數量。請諮詢您的 Kafka 代理管理員,瞭解是否存在要求最小複製因子的策略,如果是這種情況,通常 default.replication.factor 將與該值匹配,並且應使用 -1,除非您需要大於最小值的複製因子。

預設值:-1

spring.cloud.stream.kafka.binder.autoCreateTopics

如果設定為 true,則 binder 會自動建立新主題。如果設定為 false,則 binder 依賴於已配置的主題。在後一種情況下,如果主題不存在,則 binder 無法啟動。

此設定獨立於代理的 auto.create.topics.enable 設定,並且不影響它。如果伺服器設定為自動建立主題,則它們可能會作為元資料檢索請求的一部分建立,並使用預設代理設定。

預設值:true

spring.cloud.stream.kafka.binder.autoAddPartitions

如果設定為 true,則 binder 會在需要時建立新分割槽。如果設定為 false,則 binder 依賴於已配置的主題分割槽大小。如果目標主題的分割槽計數小於預期值,則 binder 無法啟動。

預設值:false

spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix

在 binder 中啟用事務。有關 Kafka 文件中的 transaction.idspring-kafka 文件中的事務,請參閱相關內容。啟用事務時,將忽略單個 producer 屬性,並且所有生產者都使用 spring.cloud.stream.kafka.binder.transaction.producer.* 屬性。

預設 null(無事務)

spring.cloud.stream.kafka.binder.transaction.producer.*

事務性 binder 中生產者的全域性生產者屬性。請參閱 spring.cloud.stream.kafka.binder.transaction.transactionIdPrefixKafka 生產者屬性以及所有 binder 支援的通用生產者屬性。

預設值:請參閱各個生產者屬性。

spring.cloud.stream.kafka.binder.headerMapperBeanName

用於將 spring-messaging 頭對映到 Kafka 頭以及從 Kafka 頭對映的 KafkaHeaderMapper 的 bean 名稱。例如,如果您希望自定義使用 JSON 反序列化頭的 BinderHeaderMapper bean 中的受信任包,請使用此項。如果此自定義 BinderHeaderMapper bean 未透過此屬性提供給 binder,則 binder 將查詢名稱為 kafkaBinderHeaderMapper 且型別為 BinderHeaderMapper 的頭對映器 bean,然後再回退到 binder 建立的預設 BinderHeaderMapper

預設值:無。

spring.cloud.stream.kafka.binder.considerDownWhenAnyPartitionHasNoLeader

當主題上的任何分割槽(無論從哪個消費者接收資料)被發現沒有 leader 時,將 binder 健康狀態設定為 down 的標誌。

預設值:false

spring.cloud.stream.kafka.binder.certificateStoreDirectory

當 truststore 或 keystore 證書位置以 classpath URL (classpath:…​) 給出時,binder 會將 JAR 檔案內部 classpath 位置的資源複製到檔案系統上的某個位置。這對於代理級別證書 (ssl.truststore.locationssl.keystore.location) 和用於 schema 登錄檔 (schema.registry.ssl.truststore.locationschema.registry.ssl.keystore.location) 的證書都是如此。請記住,truststore 和 keystore 的 classpath 位置必須在 spring.cloud.stream.kafka.binder.configuration…​ 下提供。例如,spring.cloud.stream.kafka.binder.configuration.ssl.truststore.location`spring.cloud.stream.kafka.binder.configuration.schema.registry.ssl.truststore.location 等。檔案將移動到此屬性值指定的位置,該位置必須是檔案系統上存在的目錄,並且可由執行應用程式的程序寫入。如果此值未設定且證書檔案是 classpath 資源,則它將移動到由 System.getProperty("java.io.tmpdir") 返回的系統臨時目錄。如果此值存在但找不到目錄或不可寫入,則也是如此。

預設值:無。

1.3.2. Kafka 消費者屬性

為避免重複,Spring Cloud Stream 支援以 spring.cloud.stream.kafka.default.consumer.<property>=<value> 的格式為所有通道設定值。

以下屬性僅適用於 Kafka 消費者,並且必須以 spring.cloud.stream.kafka.bindings.<channelName>.consumer. 為字首。

admin.configuration

自 2.1.1 版起,此屬性已棄用,取而代之的是 topic.properties,並將在未來版本中移除對其的支援。

admin.replicas-assignment

自 2.1.1 版起,此屬性已棄用,取而代之的是 topic.replicas-assignment,並將在未來版本中移除對其的支援。

admin.replication-factor

自 2.1.1 版起,此屬性已棄用,取而代之的是 topic.replication-factor,並將在未來版本中移除對其的支援。

autoRebalanceEnabled

當為 true 時,主題分割槽會在消費者組的成員之間自動重新平衡。當為 false 時,每個消費者根據 spring.cloud.stream.instanceCountspring.cloud.stream.instanceIndex 分配一組固定分割槽。這需要每個啟動例項上都適當地設定 spring.cloud.stream.instanceCountspring.cloud.stream.instanceIndex 屬性。在這種情況下,spring.cloud.stream.instanceCount 屬性的值通常必須大於 1。

預設值:true

ackEachRecord

autoCommitOffsettrue 時,此設定決定是否在處理每個記錄後提交偏移量。預設情況下,在處理完 consumer.poll() 返回的批次中的所有記錄後,會提交偏移量。透過 max.poll.records Kafka 屬性(透過消費者 configuration 屬性設定)可以控制輪詢返回的記錄數。將其設定為 true 可能會導致效能下降,但這樣做可以減少發生故障時重新投遞記錄的可能性。另請參閱 binder 的 requiredAcks 屬性,它也會影響提交偏移量的效能。此屬性自 3.1 版本起已棄用,取而代之的是使用 ackMode。如果未設定 ackMode 且未啟用批處理模式,將使用 RECORD ackMode。

預設值:false

autoCommitOffset

從 3.1 版本開始,此屬性已棄用。有關替代方案的更多詳細資訊,請參閱 ackMode。訊息處理後是否自動提交偏移量。如果設定為 false,則入站訊息中會存在一個鍵為 kafka_acknowledgment 且型別為 org.springframework.kafka.support.Acknowledgment 的頭。應用程式可以使用此頭來確認訊息。有關詳細資訊,請參閱示例部分。當此屬性設定為 false 時,Kafka binder 將確認模式設定為 org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode.MANUAL,並且應用程式負責確認記錄。另請參閱 ackEachRecord

預設值:true

ackMode

指定容器確認模式。這基於 Spring Kafka 中定義的 AckMode 列舉。如果 ackEachRecord 屬性設定為 true 且消費者不處於批處理模式,則將使用 RECORD 確認模式,否則,使用透過此屬性提供的確認模式。

autoCommitOnError

在可輪詢消費者中,如果設定為 true,則總是在出錯時自動提交。如果未設定(預設)或為 false,則在可輪詢消費者中不會自動提交。請注意,此屬性僅適用於可輪詢消費者。

預設值:未設定。

resetOffsets

是否將消費者上的偏移量重置為 startOffset 提供的值。如果提供了 KafkaBindingRebalanceListener,則必須為 false;請參閱使用 KafkaBindingRebalanceListener。有關此屬性的更多資訊,請參閱重置偏移量

預設值:false

startOffset

新組的起始偏移量。允許的值:earliestlatest。如果為消費者“繫結”顯式設定了消費者組(透過 spring.cloud.stream.bindings.<channelName>.group),則“startOffset”設定為 earliest。否則,對於 anonymous 消費者組,它設定為 latest。有關此屬性的更多資訊,請參閱重置偏移量

預設值:null(等同於 earliest)。

enableDlq

當設定為 true 時,它為消費者啟用 DLQ 行為。預設情況下,導致錯誤的訊息會轉發到名為 error.<destination>.<group> 的主題。DLQ 主題名稱可以透過設定 dlqName 屬性或定義型別為 DlqDestinationResolver@Bean 來配置。這為更常見的 Kafka 重放場景提供了一種替代選項,適用於錯誤數量相對較少且重放整個原始主題可能過於繁瑣的情況。有關更多資訊,請參閱死信主題處理。從 2.0 版本開始,傳送到 DLQ 主題的訊息會新增以下頭:x-original-topicx-exception-messagex-exception-stacktrace 作為 byte[]。預設情況下,失敗的記錄會發送到 DLQ 主題中與原始記錄相同分割槽號的分割槽。有關如何更改此行為,請參閱死信主題分割槽選擇destinationIsPatterntrue 時不允許。

預設值:false

dlqPartitions

enableDlq 為 true 且未設定此屬性時,會建立一個死信主題,其分割槽數與主主題相同。通常,死信記錄會發送到死信主題中與原始記錄相同的分割槽。此行為可以更改;請參閱死信主題分割槽選擇。如果此屬性設定為 1 並且沒有 DqlPartitionFunction bean,則所有死信記錄都將寫入分割槽 0。如果此屬性大於 1,則您必須提供一個 DlqPartitionFunction bean。請注意,實際分割槽計數受 binder 的 minPartitionCount 屬性影響。

預設值:none

configuration

包含通用 Kafka 消費者屬性的鍵/值對對映。除了 Kafka 消費者屬性外,此處還可以傳遞其他配置屬性。例如,應用程式所需的一些屬性,如 spring.cloud.stream.kafka.bindings.input.consumer.configuration.foo=bar。此處不能設定 bootstrap.servers 屬性;如果需要連線到多個叢集,請使用多 binder 支援。

預設值:空對映。

dlqName

用於接收錯誤訊息的 DLQ 主題名稱。

預設值:null(如果未指定,導致錯誤的訊息將轉發到名為 error.<destination>.<group> 的主題)。

dlqProducerProperties

透過此屬性,可以設定 DLQ 特定的生產者屬性。所有可透過 Kafka 生產者屬性獲得的屬性都可以透過此屬性進行設定。當消費者上啟用原生解碼(即 useNativeDecoding: true)時,應用程式必須為 DLQ 提供相應的鍵/值序列化器。這必須以 dlqProducerProperties.configuration.key.serializerdlqProducerProperties.configuration.value.serializer 的形式提供。

預設值:預設 Kafka 生產者屬性。

standardHeaders

指示入站通道介面卡填充哪些標準頭。允許的值:noneidtimestampboth。如果使用原生反序列化且接收訊息的第一個元件需要一個 id(例如配置為使用 JDBC 訊息儲存的聚合器),則此功能很有用。

預設值:none

converterBeanName

實現 RecordMessageConverter 的 bean 的名稱。在入站通道介面卡中用於替換預設的 MessagingMessageConverter

預設值:null

idleEventInterval

指示最近沒有收到訊息的事件之間的間隔,以毫秒為單位。使用 ApplicationListener<ListenerContainerIdleEvent> 來接收這些事件。有關使用示例,請參閱示例:暫停和恢復消費者

預設值:30000

destinationIsPattern

當為 true 時,目標被視為一個正則表示式 Pattern,用於由代理匹配主題名稱。當為 true 時,不提供主題,並且不允許 enableDlq,因為 binder 在 provisioning 階段不知道主題名稱。請注意,檢測與模式匹配的新主題所需的時間由消費者屬性 metadata.max.age.ms 控制,該屬性(在撰寫本文時)預設為 300,000 毫秒(5 分鐘)。這可以使用上面的 configuration 屬性進行配置。

預設值:false

topic.properties

在配置新主題時使用的 Kafka 主題屬性的 Map,例如 spring.cloud.stream.kafka.bindings.input.consumer.topic.properties.message.format.version=0.9.0.0

預設值:無。

topic.replicas-assignment

副本分配的 Map<Integer, List<Integer>>,其中鍵是分割槽,值是分配。在配置新主題時使用。請參閱 kafka-clients jar 中的 NewTopic Javadoc。

預設值:無。

topic.replication-factor

配置主題時使用的複製因子。覆蓋 binder 範圍的設定。如果存在 replicas-assignments 則忽略。

預設值:無(使用 binder 範圍的預設值 -1)。

pollTimeout

可輪詢消費者中輪詢使用的超時時間。

預設值:5 秒。

事務管理器

KafkaAwareTransactionManager 的 bean 名稱,用於覆蓋此繫結的 binder 事務管理器。通常在您希望使用 ChainedKafkaTransactionManaager 將另一個事務與 Kafka 事務同步時需要。要實現記錄的精確一次消費和生產,消費者和生產者繫結必須都配置相同的事務管理器。

預設值:無。

txCommitRecovered

使用事務性 binder 時,恢復記錄的偏移量(例如,當重試耗盡且記錄傳送到死信主題時)將預設透過新事務提交。將此屬性設定為 false 將抑制恢復記錄的偏移量提交。

預設值:true。

commonErrorHandlerBeanName

每個消費者繫結要使用的 CommonErrorHandler bean 名稱。如果存在,此使用者提供的 CommonErrorHandler 將優先於 binder 定義的任何其他錯誤處理器。如果應用程式不想使用 ListenerContainerCustomizer 然後檢查目標/組組合來設定錯誤處理器,這是一種方便表達錯誤處理器的方式。

預設值:無。

1.3.3. 重置偏移量

當應用程式啟動時,每個分配的分割槽中的初始位置取決於兩個屬性 startOffsetresetOffsets。如果 resetOffsetsfalse,則應用正常的 Kafka 消費者 auto.offset.reset 語義。即,如果繫結消費者組的分割槽沒有提交的偏移量,則位置為 earliestlatest。預設情況下,具有顯式 group 的繫結使用 earliest,匿名繫結(沒有 group)使用 latest。這些預設值可以透過設定 startOffset 繫結屬性來覆蓋。首次使用特定 group 啟動繫結時,將沒有提交的偏移量。沒有提交偏移量的另一種情況是偏移量已過期。對於現代代理(自 2.1 版本起),以及預設代理屬性,偏移量在最後一個成員離開組後 7 天過期。有關更多資訊,請參閱 offsets.retention.minutes 代理屬性。

resetOffsetstrue 時,binder 應用與代理上沒有提交偏移量時類似的語義,就像此繫結從未從主題消費過一樣;即,任何當前的提交偏移量都將被忽略。

以下是可能使用此功能兩種情況。

  1. 從包含鍵/值對的壓縮主題消費。將 resetOffsets 設定為 true 並將 startOffset 設定為 earliest;繫結將對所有新分配的分割槽執行 seekToBeginning

  2. 從包含事件的主題消費,您只對在此繫結執行時發生的事件感興趣。將 resetOffsets 設定為 true 並將 startOffset 設定為 latest;繫結將對所有新分配的分割槽執行 seekToEnd

如果在初始分配後發生再平衡,則尋求僅對在初始分配期間未分配的任何新分配的分割槽執行。

有關主題偏移量的更多控制,請參閱使用 KafkaBindingRebalanceListener;當提供偵聽器時,不應將 resetOffsets 設定為 true,否則將導致錯誤。

1.3.4. 消費批次

從 3.0 版本開始,當 spring.cloud.stream.binding.<name>.consumer.batch-mode 設定為 true 時,透過輪詢 Kafka Consumer 接收到的所有記錄都將以 List<?> 的形式呈現給偵聽器方法。否則,方法將一次呼叫一個記錄。批處理的大小由 Kafka 消費者屬性 max.poll.recordsfetch.min.bytesfetch.max.wait.ms 控制;有關詳細資訊,請參閱 Kafka 文件。

請記住,批處理模式不支援 @StreamListener - 它僅適用於較新的函數語言程式設計模型。

在批處理模式下不支援 binder 內的重試,因此 maxAttempts 將被覆蓋為 1。您可以配置一個 SeekToCurrentBatchErrorHandler(使用 ListenerContainerCustomizer)以實現與 binder 中重試類似的功能。您還可以使用手動 AckMode 並呼叫 Ackowledgment.nack(index, sleep) 來提交部分批次的偏移量,並重新傳遞剩餘的記錄。有關這些技術的更多資訊,請參閱 Spring for Apache Kafka 文件

1.3.5. Kafka 生產者屬性

為避免重複,Spring Cloud Stream 支援以 spring.cloud.stream.kafka.default.producer.<property>=<value> 的格式為所有通道設定值。

以下屬性僅適用於 Kafka 生產者,並且必須以 spring.cloud.stream.kafka.bindings.<channelName>.producer. 為字首。

admin.configuration

自 2.1.1 版起,此屬性已棄用,取而代之的是 topic.properties,並將在未來版本中移除對其的支援。

admin.replicas-assignment

自 2.1.1 版起,此屬性已棄用,取而代之的是 topic.replicas-assignment,並將在未來版本中移除對其的支援。

admin.replication-factor

自 2.1.1 版起,此屬性已棄用,取而代之的是 topic.replication-factor,並將在未來版本中移除對其的支援。

bufferSize

Kafka 生產者在傳送之前嘗試批次處理的資料量的上限,以位元組為單位。

預設值:16384

sync

生產者是否同步。

預設值:false

sendTimeoutExpression

對出站訊息求值的 SpEL 表示式,用於評估啟用同步釋出時等待 ack 的時間——例如,headers['mySendTimeout']。超時值以毫秒為單位。在 3.0 之前的版本中,除非使用原生編碼,否則無法使用有效負載,因為在評估此表示式時,有效負載已經是 byte[] 的形式。現在,在轉換有效負載之前評估表示式。

預設值:none

batchTimeout

生產者等待更多訊息累積在同一批次中然後傳送訊息的時間。(通常,生產者根本不會等待,而只是傳送在前一次傳送進行中累積的所有訊息。)非零值可能會以延遲為代價提高吞吐量。

預設值:0

messageKeyExpression

一個針對出站訊息進行評估的 SpEL 表示式,用於填充生成的 Kafka 訊息的鍵——例如,headers['myKey']。在 3.0 之前的版本中,除非使用原生編碼,否則無法使用有效負載,因為在評估此表示式時,有效負載已經是 byte[] 的形式。現在,在轉換有效負載之前評估表示式。對於常規處理器 (Function<String, String>Function<Message<?>, Message<?>),如果生成的鍵需要與主題中的傳入鍵相同,則可以按如下方式設定此屬性。spring.cloud.stream.kafka.bindings.<output-binding-name>.producer.messageKeyExpression: headers['kafka_receivedMessageKey'] 對於響應式函式有一個重要的注意事項。在這種情況下,應用程式需要手動將傳入訊息中的頭複製到出站訊息中。您可以設定頭,例如 myKey,並使用 headers['myKey'],如上所述,或者為了方便起見,只需設定 KafkaHeaders.MESSAGE_KEY 頭,您根本不需要設定此屬性。

預設值:none

headerPatterns

一個逗號分隔的簡單模式列表,用於匹配 Spring 訊息頭,這些訊息頭將對映到 ProducerRecord 中的 Kafka Headers。模式可以以萬用字元(星號)開頭或結尾。模式可以透過字首 ! 來否定。匹配在第一次匹配(肯定或否定)後停止。例如 !ask,as* 將透過 ash 但不透過 askidtimestamp 永遠不會對映。

預設值:*(所有頭 - 除了 idtimestamp

configuration

包含通用 Kafka 生產者屬性的鍵/值對對映。此處不能設定 bootstrap.servers 屬性;如果需要連線到多個叢集,請使用多 binder 支援。

預設值:空對映。

topic.properties

在配置新主題時使用的 Kafka 主題屬性的 Map,例如 spring.cloud.stream.kafka.bindings.output.producer.topic.properties.message.format.version=0.9.0.0

topic.replicas-assignment

副本分配的 Map<Integer, List<Integer>>,其中鍵是分割槽,值是分配。在配置新主題時使用。請參閱 kafka-clients jar 中的 NewTopic Javadoc。

預設值:無。

topic.replication-factor

配置主題時使用的複製因子。覆蓋 binder 範圍的設定。如果存在 replicas-assignments 則忽略。

預設值:無(使用 binder 範圍的預設值 -1)。

useTopicHeader

設定為 true 以使用出站訊息中的 KafkaHeaders.TOPIC 訊息頭的值覆蓋預設繫結目標(主題名稱)。如果不存在該頭,則使用預設繫結目標。

預設值:false

recordMetadataChannel

用於傳送成功傳送結果的 MessageChannel 的 bean 名稱;該 bean 必須存在於應用程式上下文中。傳送到通道的訊息是已傳送訊息(如果進行了轉換,則為轉換後的訊息),並帶有一個額外的頭 KafkaHeaders.RECORD_METADATA。該頭包含 Kafka 客戶端提供的 RecordMetadata 物件;它包括記錄寫入主題的分割槽和偏移量。

ResultMetadata meta = sendResultMsg.getHeaders().get(KafkaHeaders.RECORD_METADATA, RecordMetadata.class)

失敗的傳送將傳送到生產者錯誤通道(如果已配置);請參閱錯誤通道

預設值:null。

Kafka binder 使用生產者的 partitionCount 設定作為提示來建立具有給定分割槽計數的主題(結合 minPartitionCount,兩者中的最大值將用作實際值)。在為 binder 配置 minPartitionCount 和為應用程式配置 partitionCount 時要謹慎,因為將使用較大的值。如果主題已存在且分割槽計數較小,並且 autoAddPartitions 被停用(預設),則 binder 將無法啟動。如果主題已存在且分割槽計數較小,並且 autoAddPartitions 被啟用,則會新增新分割槽。如果主題已存在且分割槽數量大於 minPartitionCountpartitionCount 的最大值,則將使用現有分割槽計數。
compression

設定 compression.type 生產者屬性。支援的值為 nonegzipsnappylz4zstd。如果您將 kafka-clients jar 覆蓋為 2.1.0(或更高版本),如 Spring for Apache Kafka 文件中所述,並希望使用 zstd 壓縮,請使用 spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.compression.type=zstd

預設值:none

事務管理器

KafkaAwareTransactionManager 的 bean 名稱,用於覆蓋此繫結的 binder 事務管理器。通常在您希望使用 ChainedKafkaTransactionManaager 將另一個事務與 Kafka 事務同步時需要。要實現記錄的精確一次消費和生產,消費者和生產者繫結必須都配置相同的事務管理器。

預設值:無。

closeTimeout

關閉生產者時等待的超時時間,以秒為單位。

預設值:30

allowNonTransactional

通常,與事務性 binder 關聯的所有輸出繫結都將在新事務中釋出,如果尚未進行事務。此屬性允許您覆蓋該行為。如果設定為 true,則釋出到此輸出繫結的記錄將不會在事務中執行,除非事務已經進行中。

預設值:false

1.3.6. 使用示例

在本節中,我們將展示前面屬性在特定場景中的使用。

示例:將 ackMode 設定為 MANUAL 並依賴手動確認

此示例說明了如何在消費者應用程式中手動確認偏移量。

此示例要求將 spring.cloud.stream.kafka.bindings.input.consumer.ackMode 設定為 MANUAL。請為您的示例使用相應的輸入通道名稱。

@SpringBootApplication
@EnableBinding(Sink.class)
public class ManuallyAcknowdledgingConsumer {

 public static void main(String[] args) {
     SpringApplication.run(ManuallyAcknowdledgingConsumer.class, args);
 }

 @StreamListener(Sink.INPUT)
 public void process(Message<?> message) {
     Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
     if (acknowledgment != null) {
         System.out.println("Acknowledgment provided");
         acknowledgment.acknowledge();
     }
 }
}
示例:安全配置

Apache Kafka 0.9 支援客戶端和代理之間的安全連線。要利用此功能,請遵循 Apache Kafka 文件中的指南以及 Confluent 文件中 Kafka 0.9 安全指南。使用 spring.cloud.stream.kafka.binder.configuration 選項為 binder 建立的所有客戶端設定安全屬性。

例如,要將 security.protocol 設定為 SASL_SSL,請設定以下屬性

spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_SSL

所有其他安全屬性也可以以類似的方式設定。

使用 Kerberos 時,請遵循 參考文件中的說明來建立和引用 JAAS 配置。

Spring Cloud Stream 支援透過使用 JAAS 配置檔案和 Spring Boot 屬性將 JAAS 配置資訊傳遞給應用程式。

使用 JAAS 配置檔案

透過使用系統屬性,可以為 Spring Cloud Stream 應用程式設定 JAAS 和(可選)krb5 檔案位置。以下示例演示瞭如何使用 JAAS 配置檔案啟動帶 SASL 和 Kerberos 的 Spring Cloud Stream 應用程式

 java -Djava.security.auth.login.config=/path.to/kafka_client_jaas.conf -jar log.jar \
   --spring.cloud.stream.kafka.binder.brokers=secure.server:9092 \
   --spring.cloud.stream.bindings.input.destination=stream.ticktock \
   --spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_PLAINTEXT
使用 Spring Boot 屬性

作為 JAAS 配置檔案的替代方案,Spring Cloud Stream 提供了一種機制,允許使用 Spring Boot 屬性為 Spring Cloud Stream 應用程式設定 JAAS 配置。

以下屬性可用於配置 Kafka 客戶端的登入上下文

spring.cloud.stream.kafka.binder.jaas.loginModule

登入模組名稱。在正常情況下無需設定。

預設值:com.sun.security.auth.module.Krb5LoginModule

spring.cloud.stream.kafka.binder.jaas.controlFlag

登入模組的控制標誌。

預設值:required

spring.cloud.stream.kafka.binder.jaas.options

包含登入模組選項的鍵/值對對映。

預設值:空對映。

以下示例演示瞭如何使用 Spring Boot 配置屬性啟動帶 SASL 和 Kerberos 的 Spring Cloud Stream 應用程式

 java --spring.cloud.stream.kafka.binder.brokers=secure.server:9092 \
   --spring.cloud.stream.bindings.input.destination=stream.ticktock \
   --spring.cloud.stream.kafka.binder.autoCreateTopics=false \
   --spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_PLAINTEXT \
   --spring.cloud.stream.kafka.binder.jaas.options.useKeyTab=true \
   --spring.cloud.stream.kafka.binder.jaas.options.storeKey=true \
   --spring.cloud.stream.kafka.binder.jaas.options.keyTab=/etc/security/keytabs/kafka_client.keytab \
   --spring.cloud.stream.kafka.binder.jaas.options.principal=kafka-client-1@EXAMPLE.COM

前面的示例表示以下 JAAS 檔案的等價物

KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="/etc/security/keytabs/kafka_client.keytab"
    principal="[email protected]";
};

如果所需主題已存在於代理上或將由管理員建立,則可以關閉自動建立,並且只需要傳送客戶端 JAAS 屬性。

不要在同一個應用程式中混合使用 JAAS 配置檔案和 Spring Boot 屬性。如果 -Djava.security.auth.login.config 系統屬性已存在,Spring Cloud Stream 將忽略 Spring Boot 屬性。
在使用 Kerberos 時,請謹慎使用 autoCreateTopicsautoAddPartitions。通常,應用程式可能使用在 Kafka 和 Zookeeper 中沒有管理許可權的主體。因此,依賴 Spring Cloud Stream 建立/修改主題可能會失敗。在安全環境中,我們強烈建議使用 Kafka 工具以管理方式建立主題和管理 ACL。
多 binder 配置和 JAAS

當連線到需要單獨 JAAS 配置的多個叢集時,請使用屬性 sasl.jaas.config 設定 JAAS 配置。當此屬性存在於應用程式中時,它將優先於上面提到的其他策略。有關更多詳細資訊,請參閱此 KIP-85

例如,如果您的應用程式中有兩個叢集,每個叢集都有單獨的 JAAS 配置,那麼以下是您可以使用的模板

spring.cloud.stream:
    binders:
        kafka1:
          type: kafka
          environment:
             spring:
               cloud:
                 stream:
                  kafka:
                    binder:
                      brokers: localhost:9092
                      configuration.sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin-secret\";"
        kafka2:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: localhost:9093
                      configuration.sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"user1\" password=\"user1-secret\";"
    kafka.binder:
        configuration:
          security.protocol: SASL_PLAINTEXT
          sasl.mechanism: PLAIN

請注意,在上述配置中,兩個 Kafka 叢集及其各自的 sasl.jaas.config 值都是不同的。

有關如何設定和執行此類應用程式的更多詳細資訊,請參閱此示例應用程式

示例:暫停和恢復消費者

如果您希望暫停消費但不引起分割槽再平衡,您可以暫停和恢復消費者。這透過管理繫結生命週期來實現,如 Spring Cloud Stream 文件中 繫結視覺化和控制 所示,使用 State.PAUSEDState.RESUMED

要恢復,您可以使用 ApplicationListener(或 @EventListener 方法)接收 ListenerContainerIdleEvent 例項。事件釋出的頻率由 idleEventInterval 屬性控制。

1.4. 事務性 Binder

透過將 spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix 設定為非空值(例如 tx-)來啟用事務。當在處理器應用程式中使用時,消費者啟動事務;消費者執行緒上傳送的任何記錄都參與相同的事務。當偵聽器正常退出時,偵聽器容器將偏移量傳送到事務並提交。所有使用 spring.cloud.stream.kafka.binder.transaction.producer.* 屬性配置的生產者繫結都使用一個通用的生產者工廠;單個繫結 Kafka 生產者屬性將被忽略。

事務不支援常規 binder 重試(和死信),因為重試將在原始事務中執行,該事務可能會回滾,並且任何已釋出的記錄也將回滾。啟用重試時(通用屬性 maxAttempts 大於零),重試屬性用於配置 DefaultAfterRollbackProcessor 以啟用容器級別的重試。同樣,死信記錄的釋出不再在事務中進行,此功能已轉移到偵聽器容器,再次透過 DefaultAfterRollbackProcessor,它在主事務回滾後執行。

如果您希望在源應用程式中使用事務,或者從某個任意執行緒進行僅生產者事務(例如 @Scheduled 方法),則必須獲取事務性生產者工廠的引用並使用它定義 KafkaTransactionManager bean。

@Bean
public PlatformTransactionManager transactionManager(BinderFactory binders,
        @Value("${unique.tx.id.per.instance}") String txId) {

    ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
            MessageChannel.class)).getTransactionalProducerFactory();
    KafkaTransactionManager tm = new KafkaTransactionManager<>(pf);
    tm.setTransactionId(txId)
    return tm;
}

請注意,我們使用 BinderFactory 獲取 binder 的引用;當只有一個 binder 配置時,第一個引數使用 null。如果配置了多個 binder,則使用 binder 名稱獲取引用。一旦我們獲得了 binder 的引用,我們就可以獲取 ProducerFactory 的引用並建立事務管理器。

然後您將使用正常的 Spring 事務支援,例如 TransactionTemplate@Transactional,例如

public static class Sender {

    @Transactional
    public void doInTransaction(MessageChannel output, List<String> stuffToSend) {
        stuffToSend.forEach(stuff -> output.send(new GenericMessage<>(stuff)));
    }

}

如果您希望將僅生產者事務與來自其他事務管理器的事務同步,請使用 ChainedTransactionManager

如果您部署應用程式的多個例項,每個例項都需要唯一的 transactionIdPrefix

1.5. 錯誤通道

從 1.3 版本開始,binder 無條件地將異常傳送到每個消費者目標的錯誤通道,並且還可以配置為將非同步生產者傳送失敗傳送到錯誤通道。有關更多資訊,請參閱有關錯誤處理的這一部分

傳送失敗的 ErrorMessage 的有效負載是 KafkaSendFailureException,其屬性為

  • failedMessage:傳送失敗的 Spring Messaging Message<?>

  • record:從 failedMessage 建立的原始 ProducerRecord

沒有自動處理生產者異常(例如傳送到死信佇列)。您可以使用自己的 Spring Integration 流來處理這些異常。

1.6. Kafka 指標

Kafka binder 模組暴露以下指標

spring.cloud.stream.binder.kafka.offset:此指標指示給定消費者組從給定 binder 主題中尚未消費的訊息數量。提供的指標基於 Micrometer 庫。如果 Micrometer 在 classpath 中,並且應用程式未提供其他此類 bean,則 binder 會建立 KafkaBinderMetrics bean。該指標包含消費者組資訊、主題和已提交偏移量與主題最新偏移量之間的實際滯後。此指標對於向 PaaS 平臺提供自動伸縮反饋特別有用。

您可以透過在應用程式中提供以下元件來阻止 KafkaBinderMetrics 建立必要的基礎設施(例如消費者)並報告指標。

@Component
class NoOpBindingMeters {
	NoOpBindingMeters(MeterRegistry registry) {
		registry.config().meterFilter(
				MeterFilter.denyNameStartsWith(KafkaBinderMetrics.OFFSET_LAG_METRIC_NAME));
	}
}

有關如何選擇性地抑制儀表的更多詳細資訊,請參閱此處

1.7. 墓碑記錄 (空記錄值)

使用壓縮主題時,值為 null 的記錄(也稱為墓碑記錄)表示鍵的刪除。要在 @StreamListener 方法中接收此類訊息,引數必須標記為非必需才能接收 null 值引數。

@StreamListener(Sink.INPUT)
public void in(@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) byte[] key,
               @Payload(required = false) Customer customer) {
    // customer is null if a tombstone record
    ...
}

1.8. 使用 KafkaBindingRebalanceListener

應用程式可能希望在分割槽最初分配時將主題/分割槽定位到任意偏移量,或對消費者執行其他操作。從 2.1 版本開始,如果您在應用程式上下文中提供一個 KafkaBindingRebalanceListener bean,它將被連線到所有 Kafka 消費者繫結中。

public interface KafkaBindingRebalanceListener {

	/**
	 * Invoked by the container before any pending offsets are committed.
	 * @param bindingName the name of the binding.
	 * @param consumer the consumer.
	 * @param partitions the partitions.
	 */
	default void onPartitionsRevokedBeforeCommit(String bindingName, Consumer<?, ?> consumer,
			Collection<TopicPartition> partitions) {

	}

	/**
	 * Invoked by the container after any pending offsets are committed.
	 * @param bindingName the name of the binding.
	 * @param consumer the consumer.
	 * @param partitions the partitions.
	 */
	default void onPartitionsRevokedAfterCommit(String bindingName, Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {

	}

	/**
	 * Invoked when partitions are initially assigned or after a rebalance.
	 * Applications might only want to perform seek operations on an initial assignment.
	 * @param bindingName the name of the binding.
	 * @param consumer the consumer.
	 * @param partitions the partitions.
	 * @param initial true if this is the initial assignment.
	 */
	default void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer, Collection<TopicPartition> partitions,
			boolean initial) {

	}

}

提供再平衡偵聽器時,不能將 resetOffsets 消費者屬性設定為 true

1.9. 重試和死信處理

預設情況下,當您在消費者繫結中配置重試(例如 maxAttemts)和 enableDlq 時,這些功能在 binder 內部執行,不參與偵聽器容器或 Kafka 消費者。

在某些情況下,將此功能移到偵聽器容器中更可取,例如

  • 重試和延遲的總和將超過消費者的 max.poll.interval.ms 屬性,可能導致分割槽再平衡。

  • 您希望將死信釋出到不同的 Kafka 叢集。

  • 您希望向錯誤處理器新增重試偵聽器。

  • …​

要配置將此功能從 binder 移動到容器,請定義型別為 ListenerContainerWithDlqAndRetryCustomizer@Bean。此介面具有以下方法

/**
 * Configure the container.
 * @param container the container.
 * @param destinationName the destination name.
 * @param group the group.
 * @param dlqDestinationResolver a destination resolver for the dead letter topic (if
 * enableDlq).
 * @param backOff the backOff using retry properties (if configured).
 * @see #retryAndDlqInBinding(String, String)
 */
void configure(AbstractMessageListenerContainer<?, ?> container, String destinationName, String group,
        @Nullable BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> dlqDestinationResolver,
        @Nullable BackOff backOff);

/**
 * Return false to move retries and DLQ from the binding to a customized error handler
 * using the retry metadata and/or a {@code DeadLetterPublishingRecoverer} when
 * configured via
 * {@link #configure(AbstractMessageListenerContainer, String, String, BiFunction, BackOff)}.
 * @param destinationName the destination name.
 * @param group the group.
 * @return true to disable retrie in the binding
 */
default boolean retryAndDlqInBinding(String destinationName, String group) {
    return true;
}

目標解析器和 BackOff 是根據繫結屬性(如果已配置)建立的。然後,您可以使用它們來建立自定義錯誤處理器和死信釋出器;例如

@Bean
ListenerContainerWithDlqAndRetryCustomizer cust(KafkaTemplate<?, ?> template) {
    return new ListenerContainerWithDlqAndRetryCustomizer() {

        @Override
        public void configure(AbstractMessageListenerContainer<?, ?> container, String destinationName,
                String group,
                @Nullable BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> dlqDestinationResolver,
                @Nullable BackOff backOff) {

            if (destinationName.equals("topicWithLongTotalRetryConfig")) {
                ConsumerRecordRecoverer dlpr = new DeadLetterPublishingRecoverer(template),
                        dlqDestinationResolver);
                container.setCommonErrorHandler(new DefaultErrorHandler(dlpr, backOff));
            }
        }

        @Override
        public boolean retryAndDlqInBinding(String destinationName, String group) {
            return !destinationName.contains("topicWithLongTotalRetryConfig");
        }

    };
}

現在,只需要一個重試延遲大於消費者的 max.poll.interval.ms 屬性。

1.10. 自定義消費者和生產者配置

如果您想對用於在 Kafka 中建立 ConsumerFactoryProducerFactory 的消費者和生產者配置進行高階自定義,您可以實現以下定製器。

  • ConsumerConfigCustomizer

  • ProducerConfigCustomizer

這兩個介面都提供了一種配置用於消費者和生產者屬性的配置對映的方法。例如,如果您想訪問在應用程式級別定義的 bean,您可以將其注入 configure 方法的實現中。當 binder 發現這些定製器可用作 bean 時,它將在建立消費者和生產者工廠之前呼叫 configure 方法。

這兩個介面還提供了對繫結和目標名稱的訪問,以便在自定義生產者和消費者屬性時可以訪問它們。

1.11. 自定義 AdminClient 配置

與上述消費者和生產者配置定製一樣,應用程式還可以透過提供 AdminClientConfigCustomizer 來定製 AdminClient 的配置。AdminClientConfigCustomizer 的 configure 方法提供了對 AdminClient 屬性的訪問,您可以使用這些屬性進行進一步的定製。Binder 的 Kafka 主題供應器對此定製器提供的屬性給予最高優先順序。以下是提供此定製器 bean 的示例。

@Bean
public AdminClientConfigCustomizer adminClientConfigCustomizer() {
    return props -> {
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
    };
}

1.12. 自定義 Kafka Binder 健康指示器

當 Spring Boot Actuator 在類路徑上時,Kafka binder 會啟用一個預設的健康指示器。此健康指示器檢查 binder 的健康狀況以及與 Kafka 代理的任何通訊問題。如果應用程式想要停用此預設健康檢查實現幷包含自定義實現,則可以為 KafkaBinderHealth 介面提供一個實現。KafkaBinderHealth 是一個擴充套件自 HealthIndicator 的標記介面。在自定義實現中,它必須提供 health() 方法的實現。自定義實現必須以 bean 的形式存在於應用程式配置中。當 binder 發現自定義實現時,它將使用它而不是預設實現。以下是應用程式中此類自定義實現 bean 的示例。

@Bean
public KafkaBinderHealth kafkaBinderHealthIndicator() {
    return new KafkaBinderHealth() {
        @Override
        public Health health() {
            // custom implementation details.
        }
    };
}

1.13. 死信主題處理

1.13.1. 死信主題分割槽選擇

預設情況下,記錄釋出到死信主題時使用與原始記錄相同的分割槽。這意味著死信主題必須至少有與原始記錄相同數量的分割槽。

要更改此行為,請將 DlqPartitionFunction 實現作為 @Bean 新增到應用程式上下文。只能存在一個這樣的 bean。該函式將提供消費者組、失敗的 ConsumerRecord 和異常。例如,如果您總是希望路由到分割槽 0,您可以使用

@Bean
public DlqPartitionFunction partitionFunction() {
    return (group, record, ex) -> 0;
}
如果您將消費者繫結的 dlqPartitions 屬性設定為 1(並且繫結的 minPartitionCount 等於 1),則無需提供 DlqPartitionFunction;框架將始終使用分割槽 0。如果您將消費者繫結的 dlqPartitions 屬性設定為大於 1 的值(或繫結的 minPartitionCount 大於 1),則您必須提供一個 DlqPartitionFunction bean,即使分割槽計數與原始主題相同。

還可以為 DLQ 主題定義自定義名稱。為此,請將 DlqDestinationResolver 的實現作為 @Bean 建立到應用程式上下文。當 binder 檢測到此類 bean 時,它將優先使用,否則它將使用 dlqName 屬性。如果兩者都未找到,它將預設為 error.<destination>.<group>。以下是 DlqDestinationResolver 作為 @Bean 的示例。

@Bean
public DlqDestinationResolver dlqDestinationResolver() {
    return (rec, ex) -> {
        if (rec.topic().equals("word1")) {
            return "topic1-dlq";
        }
        else {
            return "topic2-dlq";
        }
    };
}

在提供 DlqDestinationResolver 的實現時,需要記住一件重要的事情,那就是繫結器中的 provisioner 不會自動為應用程式建立主題。這是因為繫結器無法推斷出實現可能傳送到的所有 DLQ 主題的名稱。因此,如果您使用此策略提供 DLQ 名稱,應用程式有責任確保這些主題事先已建立。

1.13.2. 處理死信主題中的記錄

由於框架無法預測使用者希望如何處理死信訊息,因此它不提供任何標準機制來處理它們。如果死信的原因是暫時的,您可能希望將訊息路由回原始主題。但是,如果問題是永久性問題,那可能會導致無限迴圈。本主題中的示例 Spring Boot 應用程式是一個示例,說明如何將這些訊息路由回原始主題,但在三次嘗試後將其移動到“停放”主題。該應用程式是另一個從死信主題讀取的 spring-cloud-stream 應用程式。當 5 秒內沒有收到訊息時,它會退出。

示例假設原始目標是 so8400out,消費者組是 so8400

有幾種策略需要考慮

  • 考慮僅在主應用程式未執行時執行重定向。否則,瞬態錯誤的重試將很快耗盡。

  • 或者,採用兩階段方法:使用此應用程式路由到第三個主題,並使用另一個應用程式從該主題路由回主主題。

以下程式碼清單顯示了示例應用程式

application.properties
spring.cloud.stream.bindings.input.group=so8400replay
spring.cloud.stream.bindings.input.destination=error.so8400out.so8400

spring.cloud.stream.bindings.output.destination=so8400out

spring.cloud.stream.bindings.parkingLot.destination=so8400in.parkingLot

spring.cloud.stream.kafka.binder.configuration.auto.offset.reset=earliest

spring.cloud.stream.kafka.binder.headers=x-retries
應用程式
@SpringBootApplication
@EnableBinding(TwoOutputProcessor.class)
public class ReRouteDlqKApplication implements CommandLineRunner {

    private static final String X_RETRIES_HEADER = "x-retries";

    public static void main(String[] args) {
        SpringApplication.run(ReRouteDlqKApplication.class, args).close();
    }

    private final AtomicInteger processed = new AtomicInteger();

    @Autowired
    private MessageChannel parkingLot;

    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public Message<?> reRoute(Message<?> failed) {
        processed.incrementAndGet();
        Integer retries = failed.getHeaders().get(X_RETRIES_HEADER, Integer.class);
        if (retries == null) {
            System.out.println("First retry for " + failed);
            return MessageBuilder.fromMessage(failed)
                    .setHeader(X_RETRIES_HEADER, new Integer(1))
                    .setHeader(BinderHeaders.PARTITION_OVERRIDE,
                            failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
                    .build();
        }
        else if (retries.intValue() < 3) {
            System.out.println("Another retry for " + failed);
            return MessageBuilder.fromMessage(failed)
                    .setHeader(X_RETRIES_HEADER, new Integer(retries.intValue() + 1))
                    .setHeader(BinderHeaders.PARTITION_OVERRIDE,
                            failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
                    .build();
        }
        else {
            System.out.println("Retries exhausted for " + failed);
            parkingLot.send(MessageBuilder.fromMessage(failed)
                    .setHeader(BinderHeaders.PARTITION_OVERRIDE,
                            failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
                    .build());
        }
        return null;
    }

    @Override
    public void run(String... args) throws Exception {
        while (true) {
            int count = this.processed.get();
            Thread.sleep(5000);
            if (count == this.processed.get()) {
                System.out.println("Idle, exiting");
                return;
            }
        }
    }

    public interface TwoOutputProcessor extends Processor {

        @Output("parkingLot")
        MessageChannel parkingLot();

    }

}

1.14. 使用 Kafka Binder 進行分割槽

Apache Kafka 原生支援主題分割槽。

有時將資料傳送到特定分割槽是有利的——例如,當您希望嚴格排序訊息處理時(特定客戶的所有訊息都應傳送到同一分割槽)。

以下示例演示瞭如何配置生產者和消費者端

@SpringBootApplication
@EnableBinding(Source.class)
public class KafkaPartitionProducerApplication {

    private static final Random RANDOM = new Random(System.currentTimeMillis());

    private static final String[] data = new String[] {
            "foo1", "bar1", "qux1",
            "foo2", "bar2", "qux2",
            "foo3", "bar3", "qux3",
            "foo4", "bar4", "qux4",
            };

    public static void main(String[] args) {
        new SpringApplicationBuilder(KafkaPartitionProducerApplication.class)
            .web(false)
            .run(args);
    }

    @InboundChannelAdapter(channel = Source.OUTPUT, poller = @Poller(fixedRate = "5000"))
    public Message<?> generate() {
        String value = data[RANDOM.nextInt(data.length)];
        System.out.println("Sending: " + value);
        return MessageBuilder.withPayload(value)
                .setHeader("partitionKey", value)
                .build();
    }

}
application.yml
spring:
  cloud:
    stream:
      bindings:
        output:
          destination: partitioned.topic
          producer:
            partition-key-expression: headers['partitionKey']
            partition-count: 12
主題必須配置足夠的分割槽,以實現所有消費者組所需的併發性。上述配置支援多達 12 個消費者例項(如果其 concurrency 為 2,則為 6 個;如果其併發為 3,則為 4 個,依此類推)。通常最好“過度配置”分割槽,以允許未來增加消費者或併發性。
上述配置使用預設分割槽 (key.hashCode() % partitionCount)。這可能提供也可能不提供適當平衡的演算法,具體取決於鍵值。您可以透過使用 partitionSelectorExpressionpartitionSelectorClass 屬性來覆蓋此預設值。

由於分割槽由 Kafka 原生處理,因此消費者端不需要特殊配置。Kafka 在例項之間分配分割槽。

以下 Spring Boot 應用程式監聽 Kafka 流並將(到控制檯)每條訊息進入的分割槽 ID 打印出來

@SpringBootApplication
@EnableBinding(Sink.class)
public class KafkaPartitionConsumerApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(KafkaPartitionConsumerApplication.class)
            .web(false)
            .run(args);
    }

    @StreamListener(Sink.INPUT)
    public void listen(@Payload String in, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
        System.out.println(in + " received from partition " + partition);
    }

}
application.yml
spring:
  cloud:
    stream:
      bindings:
        input:
          destination: partitioned.topic
          group: myGroup

您可以根據需要新增例項。Kafka 會重新平衡分割槽分配。如果例項計數(或 例項計數 * 併發數)超過分割槽數,則某些消費者將處於空閒狀態。

2. Kafka Streams Binder

2.1. 用法

要使用 Kafka Streams binder,您只需將其新增到 Spring Cloud Stream 應用程式中,使用以下 maven 座標

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
</dependency>

快速啟動 Kafka Streams binder 新專案的方法是使用 Spring Initializr,然後選擇“Cloud Streams”和“Spring for Kafka Streams”,如下圖所示

spring initializr kafka streams

2.2. 概覽

Spring Cloud Stream 包含一個專門為 Apache Kafka Streams 繫結設計的 binder 實現。透過此原生整合,Spring Cloud Stream“處理器”應用程式可以直接在核心業務邏輯中使用 Apache Kafka Streams API。

Kafka Streams binder 實現建立在 Spring for Apache Kafka 專案提供的基礎上。

Kafka Streams binder 為 Kafka Streams 中的三種主要型別提供繫結功能 - KStreamKTableGlobalKTable

Kafka Streams 應用程式通常遵循一種模型,其中從入站主題讀取記錄,應用業務邏輯,然後將轉換後的記錄寫入出站主題。或者,也可以定義一個沒有出站目標的處理器應用程式。

在以下部分中,我們將深入瞭解 Spring Cloud Stream 與 Kafka Streams 整合的詳細資訊。

2.3. 程式設計模型

當使用 Kafka Streams binder 提供的程式設計模型時,可以使用高階 Streams DSL,也可以混合使用高階和低階 Processor-API。當混合使用高階和低階 API 時,通常透過呼叫 KStream 上的 transformprocess API 方法來實現。

2.3.1. 函式式風格

從 Spring Cloud Stream 3.0.0 開始,Kafka Streams binder 允許應用程式使用 Java 8 中可用的函數語言程式設計風格進行設計和開發。這意味著應用程式可以簡潔地表示為 java.util.function.Functionjava.util.function.Consumer 型別的 lambda 表示式。

讓我們看一個非常基本的例子。

@SpringBootApplication
public class SimpleConsumerApplication {

    @Bean
    public java.util.function.Consumer<KStream<Object, String>> process() {

        return input ->
                input.foreach((key, value) -> {
                    System.out.println("Key: " + key + " Value: " + value);
                });
    }
}

儘管簡單,但這是一個完整的獨立 Spring Boot 應用程式,它利用 Kafka Streams 進行流處理。這是一個沒有出站繫結且只有一個入站繫結的消費者應用程式。該應用程式消費資料,並簡單地將 KStream 鍵和值的資訊記錄到標準輸出。該應用程式包含 SpringBootApplication 註解和一個標記為 Bean 的方法。bean 方法的型別是 java.util.function.Consumer,它被 KStream 引數化。然後,在實現中,我們返回一個 Consumer 物件,它本質上是一個 lambda 表示式。在 lambda 表示式內部,提供了處理資料的程式碼。

在此應用程式中,有一個型別為 KStream 的單一輸入繫結。binder 為應用程式建立此繫結,名稱為 process-in-0,即函式 bean 名稱後跟一個短劃線 (-) 和字面量 in,然後是另一個短劃線,最後是引數的序數位置。您可以使用此繫結名稱來設定其他屬性,例如目標。例如,spring.cloud.stream.bindings.process-in-0.destination=my-topic

如果未在繫結上設定目標屬性,則會建立一個與繫結同名的主題(如果應用程式具有足夠的許可權),或者該主題預期已可用。

一旦構建為 uber-jar(例如,kstream-consumer-app.jar),您可以像下面這樣執行上述示例。

如果應用程式選擇使用 Spring 的 Component 註解定義函式式 bean,則 binder 也支援該模型。上述函式式 bean 可以改寫如下。

@Component(name = "process")
public class SimpleConsumer implements java.util.function.Consumer<KStream<Object, String>> {

    @Override
    public void accept(KStream<Object, String> input) {
        input.foreach((key, value) -> {
            System.out.println("Key: " + key + " Value: " + value);
        });
    }
}
java -jar kstream-consumer-app.jar --spring.cloud.stream.bindings.process-in-0.destination=my-topic

這裡是另一個示例,它是一個完整的處理器,既有輸入繫結又有輸出繫結。這是一個經典的詞頻統計示例,其中應用程式從主題接收資料,然後計算每個單詞在滾動時間視窗中的出現次數。

@SpringBootApplication
public class WordCountProcessorApplication {

  @Bean
  public Function<KStream<Object, String>, KStream<?, WordCount>> process() {

    return input -> input
                .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
                .map((key, value) -> new KeyValue<>(value, value))
                .groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
                .windowedBy(TimeWindows.of(5000))
                .count(Materialized.as("word-counts-state-store"))
                .toStream()
                .map((key, value) -> new KeyValue<>(key.key(), new WordCount(key.key(), value,
                        new Date(key.window().start()), new Date(key.window().end()))));
  }

	public static void main(String[] args) {
		SpringApplication.run(WordCountProcessorApplication.class, args);
	}
}

這裡又是一個完整的 Spring Boot 應用程式。與第一個應用程式的不同之處在於,bean 方法的型別是 java.util.function.FunctionFunction 的第一個引數化型別用於輸入 KStream,第二個引數化型別用於輸出。在方法體中,提供了一個型別為 Function 的 lambda 表示式,並作為實現,給出了實際的業務邏輯。類似於前面討論的基於 Consumer 的應用程式,此處的輸入繫結預設命名為 process-in-0。對於輸出,繫結名稱也自動設定為 process-out-0

一旦構建為 uber-jar(例如,wordcount-processor.jar),您可以像下面這樣執行上述示例。

java -jar wordcount-processor.jar --spring.cloud.stream.bindings.process-in-0.destination=words --spring.cloud.stream.bindings.process-out-0.destination=counts

此應用程式將從 Kafka 主題 words 消費訊息,並將計算結果釋出到輸出主題 counts

Spring Cloud Stream 將確保傳入和傳出主題的訊息都自動繫結為 KStream 物件。作為開發人員,您可以專注於程式碼的業務方面,即編寫處理器中所需的邏輯。設定 Kafka Streams 基礎設施所需的 Kafka Streams 特定配置由框架自動處理。

我們上面看到的兩個示例都有一個單一的 KStream 輸入繫結。在這兩種情況下,繫結都從一個主題接收記錄。如果您想將多個主題複用到單個 KStream 繫結中,您可以在下面提供逗號分隔的 Kafka 主題作為目標。

spring.cloud.stream.bindings.process-in-0.destination=topic-1,topic-2,topic-3

此外,如果您希望將主題與正則表示式匹配,您還可以提供主題模式作為目標。

spring.cloud.stream.bindings.process-in-0.destination=input.*

多個輸入繫結

許多非平凡的 Kafka Streams 應用程式通常透過多個繫結從多個主題消費資料。例如,一個主題作為 Kstream 消費,另一個主題作為 KTableGlobalKTable 消費。應用程式可能希望將資料作為表型別接收的原因有很多。考慮一種用例,其中底層主題透過資料庫的變更資料捕獲 (CDC) 機制填充,或者應用程式只關心用於下游處理的最新更新。如果應用程式指定需要將資料繫結為 KTableGlobalKTable,則 Kafka Streams binder 將正確地將目標繫結到 KTableGlobalKTable,並使其可用於應用程式操作。我們將看看 Kafka Streams binder 中處理多個輸入繫結的幾種不同場景。

Kafka Streams Binder 中的 BiFunction

這是一個我們有兩個輸入和一個輸出的示例。在這種情況下,應用程式可以利用 java.util.function.BiFunction

@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
    return (userClicksStream, userRegionsTable) -> (userClicksStream
            .leftJoin(userRegionsTable, (clicks, region) -> new RegionWithClicks(region == null ?
                            "UNKNOWN" : region, clicks),
                    Joined.with(Serdes.String(), Serdes.Long(), null))
            .map((user, regionWithClicks) -> new KeyValue<>(regionWithClicks.getRegion(),
                    regionWithClicks.getClicks()))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
            .reduce(Long::sum)
            .toStream());
}

這裡,基本主題與前面的示例相同,但我們有兩個輸入。Java 的 BiFunction 支援用於將輸入繫結到所需的目標。binder 為輸入生成的預設繫結名稱分別是 process-in-0process-in-1。預設輸出繫結是 process-out-0。在此示例中,BiFunction 的第一個引數繫結為第一個輸入的 KStream,第二個引數繫結為第二個輸入的 KTable

Kafka Streams Binder 中的 BiConsumer

如果存在兩個輸入但沒有輸出,在這種情況下,我們可以使用 java.util.function.BiConsumer,如下所示。

@Bean
public BiConsumer<KStream<String, Long>, KTable<String, String>> process() {
    return (userClicksStream, userRegionsTable) -> {}
}
超過兩個輸入

如果您的輸入超過兩個怎麼辦?在某些情況下,您需要兩個以上的輸入。在這種情況下,binder 允許您連結部分函式。在函數語言程式設計術語中,這種技術通常稱為柯里化。隨著 Java 8 中新增的函數語言程式設計支援,Java 現在允許您編寫柯里化函式。Spring Cloud Stream Kafka Streams binder 可以利用此功能來啟用多個輸入繫結。

讓我們看一個例子。

@Bean
public Function<KStream<Long, Order>,
        Function<GlobalKTable<Long, Customer>,
                Function<GlobalKTable<Long, Product>, KStream<Long, EnrichedOrder>>>> enrichOrder() {

    return orders -> (
              customers -> (
                    products -> (
                        orders.join(customers,
                            (orderId, order) -> order.getCustomerId(),
                                (order, customer) -> new CustomerOrder(customer, order))
                                .join(products,
                                        (orderId, customerOrder) -> customerOrder
                                                .productId(),
                                        (customerOrder, product) -> {
                                            EnrichedOrder enrichedOrder = new EnrichedOrder();
                                            enrichedOrder.setProduct(product);
                                            enrichedOrder.setCustomer(customerOrder.customer);
                                            enrichedOrder.setOrder(customerOrder.order);
                                            return enrichedOrder;
                                        })
                        )
                )
    );
}

讓我們看看上面介紹的繫結模型的細節。在這個模型中,我們在入站有 3 個部分應用的函式。我們稱它們為 f(x)f(y)f(z)。如果我們在真實數學函式的意義上展開這些函式,它將看起來像這樣:f(x) → (fy) → f(z) → KStream<Long, EnrichedOrder>x 變數代表 KStream<Long, Order>y 變數代表 GlobalKTable<Long, Customer>z 變數代表 GlobalKTable<Long, Product>。第一個函式 f(x) 具有應用程式的第一個輸入繫結 (KStream<Long, Order>),其輸出是函式 f(y)。函式 f(y) 具有應用程式的第二個輸入繫結 (GlobalKTable<Long, Customer>),其輸出是另一個函式 f(z)。函式 f(z) 的輸入是應用程式的第三個輸入 (GlobalKTable<Long, Product>),其輸出是 KStream<Long, EnrichedOrder>,這是應用程式的最終輸出繫結。來自三個部分函式的輸入,它們分別是 KStreamGlobalKTableGlobalKTable,在方法體中可供您用於實現作為 lambda 表示式一部分的業務邏輯。

輸入繫結分別命名為 enrichOrder-in-0enrichOrder-in-1enrichOrder-in-2。輸出繫結命名為 enrichOrder-out-0

使用柯里化函式,您實際上可以擁有任意數量的輸入。但是,請記住,如果輸入數量過多,並且像上面 Java 中那樣部分應用的函式過多,可能會導致程式碼難以閱讀。因此,如果您的 Kafka Streams 應用程式需要超過合理數量的輸入繫結,並且您想使用此函式模型,那麼您可能需要重新考慮您的設計並適當地分解應用程式。

輸出繫結

Kafka Streams binder 允許 KStreamKTable 型別作為輸出繫結。在幕後,binder 使用 KStream 上的 to 方法將結果記錄傳送到輸出主題。如果應用程式在函式中提供 KTable 作為輸出,binder 仍然透過委託給 KStreamto 方法來使用此技術。

例如,以下兩個函式都將起作用

@Bean
public Function<KStream<String, String>, KTable<String, String>> foo() {
    return KStream::toTable;
    };
}

@Bean
public Function<KTable<String, String>, KStream<String, String>> bar() {
    return KTable::toStream;
}
多個輸出繫結

Kafka Streams 允許將出站資料寫入多個主題。此功能在 Kafka Streams 中稱為分支。當使用多個輸出繫結時,您需要提供一個 KStream 陣列 (KStream[]) 作為出站返回型別。

這是一個例子

@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>[]> process() {

    Predicate<Object, WordCount> isEnglish = (k, v) -> v.word.equals("english");
    Predicate<Object, WordCount> isFrench = (k, v) -> v.word.equals("french");
    Predicate<Object, WordCount> isSpanish = (k, v) -> v.word.equals("spanish");

    return input -> {
        final Map<String, KStream<Object, WordCount>> stringKStreamMap = input
                .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
                .groupBy((key, value) -> value)
                .windowedBy(TimeWindows.of(Duration.ofSeconds(5)))
                .count(Materialized.as("WordCounts-branch"))
                .toStream()
                .map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value,
                        new Date(key.window().start()), new Date(key.window().end()))))
                .split()
                .branch(isEnglish)
                .branch(isFrench)
                .branch(isSpanish)
                .noDefaultBranch();

        return stringKStreamMap.values().toArray(new KStream[0]);
    };
}

程式設計模型保持不變,但出站引數化型別為 KStream[]。對於上述函式,預設的輸出繫結名稱分別是 process-out-0process-out-1process-out-2。binder 生成三個輸出繫結的原因是它檢測到返回的 KStream 陣列長度為三。請注意,在此示例中,我們提供了 noDefaultBranch();如果我們改為使用 defaultBranch(),那將需要一個額外的輸出繫結,本質上返回一個長度為四的 KStream 陣列。

Kafka Streams 函數語言程式設計風格總結

總而言之,下表顯示了函式式正規化中可以使用的各種選項。

輸入數量 輸出數量 要使用的元件

1

0

java.util.function.Consumer

2

0

java.util.function.BiConsumer

1

1..n

java.util.function.Function

2

1..n

java.util.function.BiFunction

>= 3

0..n

使用柯里化函式

  • 在此表中,如果輸出超過一個,則型別簡單地變為 KStream[]

Kafka Streams binder 中的函式組合

Kafka Streams binder 支援線性拓撲的最小形式的函式組合。使用 Java 函式式 API 支援,您可以編寫多個函式,然後使用 andThen 方法自行組合它們。例如,假設您有以下兩個函式。

@Bean
public Function<KStream<String, String>, KStream<String, String>> foo() {
    return input -> input.peek((s, s2) -> {});
}

@Bean
public Function<KStream<String, String>, KStream<String, Long>> bar() {
    return input -> input.peek((s, s2) -> {});
}

即使沒有 binder 中的函式組合支援,您也可以如下組合這兩個函式。

@Bean
pubic Funcion<KStream<String, String>, KStream<String, Long>> composed() {
    foo().andThen(bar());
}

然後您可以提供 spring.cloud.stream.function.definition=foo;bar;composed 形式的定義。有了 binder 中的函式組合支援,您就不需要編寫第三個進行顯式函式組合的函式。

您可以簡單地這樣做

spring.cloud.stream.function.definition=foo|bar

您甚至可以這樣做

spring.cloud.stream.function.definition=foo|bar;foo;bar

在此示例中,組合函式的預設繫結名稱變為 foobar-in-0foobar-out-0

Kafka Streams bincer 中函式組合的限制

當您擁有 java.util.function.Function bean 時,它可以與另一個函式或多個函式組合。相同的函式 bean 也可以與 java.util.function.Consumer 組合。在這種情況下,消費者是最後一個組合的元件。一個函式可以與多個函式組合,然後以 java.util.function.Consumer bean 結尾。

當組合型別為 java.util.function.BiFunction 的 bean 時,BiFunction 必須是定義中的第一個函式。組合實體必須是 java.util.function.Functionjava.util.function.Consumer 型別。換句話說,您不能接受一個 BiFunction bean,然後與另一個 BiFunction 組合。

您不能與 BiConsumer 型別或 Consumer 是第一個元件的定義進行組合。您也不能與輸出為陣列(用於分支的 KStream[])的函式進行組合,除非這是定義中的最後一個元件。

函式定義中的第一個 FunctionBiFunction 也可以使用柯里化形式。例如,以下是可能的。

@Bean
public Function<KStream<String, String>, Function<KTable<String, String>, KStream<String, String>>> curriedFoo() {
    return a -> b ->
            a.join(b, (value1, value2) -> value1 + value2);
}

@Bean
public Function<KStream<String, String>, KStream<String, String>> bar() {
    return input -> input.mapValues(value -> value + "From-anotherFooFunc");
}

函式定義可以是 curriedFoo|bar。在幕後,binder 將為柯里化函式建立兩個輸入繫結,並根據定義中的最終函式建立一個輸出繫結。在這種情況下,預設輸入繫結將是 curriedFoobar-in-0curriedFoobar-in-1。此示例的預設輸出繫結將是 curriedFoobar-out-0

函式組合中使用 KTable 作為輸出的特別說明

假設您有兩個函式。

@Bean
public Function<KStream<String, String>, KTable<String, String>> foo() {
    return KStream::toTable;
    };
}

@Bean
public Function<KTable<String, String>, KStream<String, String>> bar() {
    return KTable::toStream;
}

您可以將它們組合為 foo|bar,但請記住,第二個函式(本例中的 bar)必須具有 KTable 作為輸入,因為第一個函式(foo)具有 KTable 作為輸出。

2.3.2. 指令式程式設計模型。

從 binder 的 3.1.0 版本開始,我們建議對基於 Kafka Streams binder 的應用程式使用上面描述的函數語言程式設計模型。從 Spring Cloud Stream 的 3.1.0 版本開始,對 StreamListener 的支援已棄用。下面,我們將提供一些關於基於 StreamListener 的 Kafka Streams 處理器的詳細資訊作為參考。

以下是使用 StreamListener 的詞頻統計示例的等效程式碼。

@SpringBootApplication
@EnableBinding(KafkaStreamsProcessor.class)
public class WordCountProcessorApplication {

    @StreamListener("input")
    @SendTo("output")
    public KStream<?, WordCount> process(KStream<?, String> input) {
        return input
                .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
                .groupBy((key, value) -> value)
                .windowedBy(TimeWindows.of(5000))
                .count(Materialized.as("WordCounts-multi"))
                .toStream()
                .map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))));
    }

    public static void main(String[] args) {
        SpringApplication.run(WordCountProcessorApplication.class, args);
    }

如您所見,這有點冗長,因為您需要提供 EnableBinding 和其他額外的註解,如 StreamListenerSendTo 才能使其成為一個完整的應用程式。EnableBinding 是您指定包含繫結的繫結介面的地方。在此示例中,我們使用包含以下契約的庫存 KafkaStreamsProcessor 繫結介面。

public interface KafkaStreamsProcessor {

	@Input("input")
	KStream<?, ?> input();

	@Output("output")
	KStream<?, ?> output();

}

Binder 將為輸入的 KStream 和輸出的 KStream 建立繫結,因為您正在使用包含這些宣告的繫結介面。

除了函數語言程式設計模型中提供的明顯差異外,這裡需要特別提到的一點是,繫結名稱是您在繫結介面中指定的名稱。例如,在上述應用程式中,由於我們使用的是 KafkaStreamsProcessor,因此繫結名稱是 inputoutput。繫結屬性需要使用這些名稱。例如 spring.cloud.stream.bindings.input.destinationspring.cloud.stream.bindings.output.destination 等。請記住,這與函數語言程式設計模型有著根本區別,因為在函數語言程式設計模型中,binder 為應用程式生成繫結名稱。這是因為應用程式在不使用 EnableBinding 的函式式模型中不提供任何繫結介面。

這是另一個有兩個輸入的 sink 示例。

@EnableBinding(KStreamKTableBinding.class)
.....
.....
@StreamListener
public void process(@Input("inputStream") KStream<String, PlayEvent> playEvents,
                    @Input("inputTable") KTable<Long, Song> songTable) {
                    ....
                    ....
}

interface KStreamKTableBinding {

    @Input("inputStream")
    KStream<?, ?> inputStream();

    @Input("inputTable")
    KTable<?, ?> inputTable();
}

以下是上面看到的基於 BiFunction 的處理器的 StreamListener 等效項。

@EnableBinding(KStreamKTableBinding.class)
....
....

@StreamListener
@SendTo("output")
public KStream<String, Long> process(@Input("input") KStream<String, Long> userClicksStream,
                                     @Input("inputTable") KTable<String, String> userRegionsTable) {
....
....
}

interface KStreamKTableBinding extends KafkaStreamsProcessor {

    @Input("inputX")
    KTable<?, ?> inputTable();
}

最後,這是具有三個輸入和柯里化函式的應用程式的 StreamListener 等效項。

@EnableBinding(CustomGlobalKTableProcessor.class)
...
...
    @StreamListener
    @SendTo("output")
    public KStream<Long, EnrichedOrder> process(
            @Input("input-1") KStream<Long, Order> ordersStream,
            @Input("input-2") GlobalKTable<Long, Customer> customers,
            @Input("input-3") GlobalKTable<Long, Product> products) {

        KStream<Long, CustomerOrder> customerOrdersStream = ordersStream.join(
                customers, (orderId, order) -> order.getCustomerId(),
                (order, customer) -> new CustomerOrder(customer, order));

        return customerOrdersStream.join(products,
                (orderId, customerOrder) -> customerOrder.productId(),
                (customerOrder, product) -> {
                    EnrichedOrder enrichedOrder = new EnrichedOrder();
                    enrichedOrder.setProduct(product);
                    enrichedOrder.setCustomer(customerOrder.customer);
                    enrichedOrder.setOrder(customerOrder.order);
                    return enrichedOrder;
                });
        }

    interface CustomGlobalKTableProcessor {

            @Input("input-1")
            KStream<?, ?> input1();

            @Input("input-2")
            GlobalKTable<?, ?> input2();

            @Input("input-3")
            GlobalKTable<?, ?> input3();

            @Output("output")
            KStream<?, ?> output();
    }

您可能會注意到,上面兩個示例更加冗長,因為除了提供 EnableBinding 之外,您還需要編寫自己的自定義繫結介面。使用函式式模型,您可以避免所有這些繁瑣的細節。

在繼續研究 Kafka Streams binder 提供的通用程式設計模型之前,這裡是多個輸出繫結的 StreamListener 版本。

EnableBinding(KStreamProcessorWithBranches.class)
public static class WordCountProcessorApplication {

    @Autowired
    private TimeWindows timeWindows;

    @StreamListener("input")
    @SendTo({"output1","output2","output3"})
    public KStream<?, WordCount>[] process(KStream<Object, String> input) {

			Predicate<Object, WordCount> isEnglish = (k, v) -> v.word.equals("english");
			Predicate<Object, WordCount> isFrench =  (k, v) -> v.word.equals("french");
			Predicate<Object, WordCount> isSpanish = (k, v) -> v.word.equals("spanish");

			return input
					.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
					.groupBy((key, value) -> value)
					.windowedBy(timeWindows)
					.count(Materialized.as("WordCounts-1"))
					.toStream()
					.map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))))
					.branch(isEnglish, isFrench, isSpanish);
    }

    interface KStreamProcessorWithBranches {

    		@Input("input")
    		KStream<?, ?> input();

    		@Output("output1")
    		KStream<?, ?> output1();

    		@Output("output2")
    		KStream<?, ?> output2();

    		@Output("output3")
    		KStream<?, ?> output3();
    	}
}

總結一下,我們回顧了使用 Kafka Streams binder 時的各種程式設計模型選擇。

binder 為輸入的 KStreamKTableGlobalKTable 提供繫結功能。KTableGlobalKTable 繫結僅在輸入端可用。Binder 支援 KStream 的輸入和輸出繫結。

Kafka Streams binder 程式設計模型的優點是,binder 為您提供了選擇完全函數語言程式設計模型或使用基於 StreamListener 的命令式方法的靈活性。

2.4. 程式設計模型的輔助功能

2.4.1. 單個應用程式中的多個 Kafka Streams 處理器

Binder 允許在一個 Spring Cloud Stream 應用程式中包含多個 Kafka Streams 處理器。您可以擁有如下應用程式。

@Bean
public java.util.function.Function<KStream<Object, String>, KStream<Object, String>> process() {
   ...
}

@Bean
public java.util.function.Consumer<KStream<Object, String>> anotherProcess() {
  ...
}

@Bean
public java.util.function.BiFunction<KStream<Object, String>, KTable<Integer, String>, KStream<Object, String>> yetAnotherProcess() {
   ...
}

在這種情況下,binder 將建立 3 個獨立的 Kafka Streams 物件,它們具有不同的應用程式 ID(下面將詳細介紹)。但是,如果應用程式中有多個處理器,您必須告訴 Spring Cloud Stream 需要啟用哪些函式。以下是啟用函式的方法。

spring.cloud.stream.function.definition: process;anotherProcess;yetAnotherProcess

如果您希望某些功能不立即啟用,可以將其從列表中刪除。

當您在同一個應用程式中擁有一個 Kafka Streams 處理器和透過不同 binder 處理的其他型別的 Function bean 時,這也是如此(例如,一個基於常規 Kafka 訊息通道 binder 的函式 bean)。

2.4.2. Kafka Streams 應用程式 ID

應用程式 ID 是 Kafka Streams 應用程式必須提供的屬性。Spring Cloud Stream Kafka Streams binder 允許您以多種方式配置此應用程式 ID。

如果應用程式中只有一個處理器或 StreamListener,則可以使用以下屬性在 binder 級別設定此項

spring.cloud.stream.kafka.streams.binder.applicationId.

為了方便起見,如果您只有一個處理器,您還可以使用 spring.application.name 作為屬性來委託應用程式 ID。

如果應用程式中有多個 Kafka Streams 處理器,則需要為每個處理器設定應用程式 ID。在函式式模型中,您可以將其作為屬性附加到每個函式。

例如,假設您有以下函式。

@Bean
public java.util.function.Consumer<KStream<Object, String>> process() {
   ...
}

@Bean
public java.util.function.Consumer<KStream<Object, String>> anotherProcess() {
  ...
}

然後,您可以使用以下 binder 級別屬性為每個函式設定應用程式 ID。

spring.cloud.stream.kafka.streams.binder.functions.process.applicationId

spring.cloud.stream.kafka.streams.binder.functions.anotherProcess.applicationId

對於 StreamListener,您需要在處理器的第一個輸入繫結上設定此項。

例如,假設您有以下兩個基於 StreamListener 的處理器。

@StreamListener
@SendTo("output")
public KStream<String, String> process(@Input("input") <KStream<Object, String>> input) {
   ...
}

@StreamListener
@SendTo("anotherOutput")
public KStream<String, String> anotherProcess(@Input("anotherInput") <KStream<Object, String>> input) {
   ...
}

然後,您必須使用以下繫結屬性設定此應用程式 ID。

spring.cloud.stream.kafka.streams.bindings.input.consumer.applicationId

spring.cloud.stream.kafka.streams.bindings.anotherInput.consumer.applicationId

對於基於函式式模型,這種在繫結級別設定應用程式 ID 的方法也適用。但是,如果您使用函式式模型,如上所述在 binder 級別為每個函式設定應用程式 ID 會更容易。

對於生產部署,強烈建議透過配置顯式指定應用程式 ID。如果您正在自動擴充套件應用程式,則這一點尤為關鍵,在這種情況下,您需要確保部署的每個例項都具有相同的應用程式 ID。

如果應用程式未提供應用程式 ID,則 binder 將為您自動生成一個靜態應用程式 ID。這在開發場景中很方便,因為它避免了顯式提供應用程式 ID 的需要。以這種方式生成的應用程式 ID 在應用程式重啟後將是靜態的。在函式式模型中,生成的應用程式 ID 將是函式 bean 名稱後跟字面量 applicationID,例如,如果 process 是函式 bean 名稱,則為 process-applicationID。對於 StreamListener,生成的應用程式 ID 將使用包含類名稱後跟方法名稱,然後是字面量 applicationId,而不是使用函式 bean 名稱。

設定應用程式 ID 總結
  • 預設情況下,binder 將為每個函式或 StreamListener 方法自動生成應用程式 ID。

  • 如果您有一個處理器,則可以使用 spring.kafka.streams.applicationIdspring.application.namespring.cloud.stream.kafka.streams.binder.applicationId

  • 如果您有多個處理器,則可以使用屬性 spring.cloud.stream.kafka.streams.binder.functions.<function-name>.applicationId 為每個函式設定應用程式 ID。對於 StreamListener,可以使用 spring.cloud.stream.kafka.streams.bindings.input.applicationId 完成此操作,假設輸入繫結名稱為 input

2.4.3. 使用函式式風格覆蓋 binder 生成的預設繫結名稱

預設情況下,當使用函式式樣式時,binder 使用上述策略生成繫結名稱,即 <function-bean-name>-<in>|<out>-[0..n],例如 process-in-0、process-out-0 等。如果您想覆蓋這些繫結名稱,可以透過指定以下屬性來完成。

spring.cloud.stream.function.bindings.<default binding name>。預設繫結名稱是 binder 生成的原始繫結名稱。

例如,假設您有這個函式。

@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
...
}

Binder 將生成名稱為 process-in-0process-in-1process-out-0 的繫結。現在,如果您想將它們完全更改為其他名稱,可能是更具領域特定性的繫結名稱,那麼您可以如下操作。

spring.cloud.stream.function.bindings.process-in-0=users

spring.cloud.stream.function.bindings.process-in-0=regions

spring.cloud.stream.function.bindings.process-out-0=clicks

之後,您必須將所有繫結級別屬性設定到這些新的繫結名稱上。

請記住,對於上面描述的函數語言程式設計模型,在大多數情況下,遵守預設繫結名稱是有意義的。您可能仍然想要進行此覆蓋的唯一原因是當您有大量配置屬性並且希望將繫結對映到更領域友好的名稱時。

2.4.4. 設定引導伺服器配置

執行 Kafka Streams 應用程式時,必須提供 Kafka broker 伺服器資訊。如果您不提供此資訊,binder 期望您在預設的 localhost:9092 上執行 broker。如果不是這種情況,則需要覆蓋它。有幾種方法可以做到這一點。

  • 使用 boot 屬性 - spring.kafka.bootstrapServers

  • Binder 級別屬性 - spring.cloud.stream.kafka.streams.binder.brokers

對於 binder 級別屬性,無論您是使用透過常規 Kafka binder 提供的 broker 屬性 - spring.cloud.stream.kafka.binder.brokers 都沒有關係。Kafka Streams binder 將首先檢查是否設定了 Kafka Streams binder 特定 broker 屬性(spring.cloud.stream.kafka.streams.binder.brokers),如果未找到,則查詢 spring.cloud.stream.kafka.binder.brokers

2.5. 記錄序列化和反序列化

Kafka Streams binder 允許您以兩種方式序列化和反序列化記錄。一種是 Kafka 提供的原生序列化和反序列化功能,另一種是 Spring Cloud Stream 框架的訊息轉換功能。讓我們看一些細節。

2.5.1. 入站反序列化

鍵始終使用原生 Serdes 反序列化。

對於值,預設情況下,入站反序列化由 Kafka 原生執行。請注意,這是 Kafka Streams binder 以前版本預設行為的重大更改,以前的反序列化是由框架完成的。

Kafka Streams binder 將嘗試透過檢視 java.util.function.Function|ConsumerStreamListener 的型別簽名來推斷匹配的 Serde 型別。以下是它匹配 Serdes 的順序。

  • 如果應用程式提供了 Serde 型別的 bean,並且返回型別使用傳入鍵或值的實際型別引數化,則它將使用該 Serde 進行入站反序列化。例如,如果應用程式中存在以下內容,則 binder 會檢測到 KStream 的傳入值型別與 Serde bean 引數化的型別匹配。它將使用該型別進行入站反序列化。

@Bean
public Serde<Foo> customSerde() {
 ...
}

@Bean
public Function<KStream<String, Foo>, KStream<String, Foo>> process() {
}
  • 接下來,它檢視型別並檢視它們是否是 Kafka Streams 公開的型別之一。如果是,則使用它們。以下是 binder 將嘗試從 Kafka Streams 匹配的 Serde 型別。

    Integer, Long, Short, Double, Float, byte[], UUID and String.
  • 如果 Kafka Streams 提供的 Serde 都不匹配這些型別,那麼它將使用 Spring Kafka 提供的 JsonSerde。在這種情況下,binder 假定這些型別是 JSON 友好的。這在您有多個值物件作為輸入時很有用,因為 binder 將在內部將它們推斷為正確的 Java 型別。但在回退到 JsonSerde 之前,binder 會檢查 Kafka Streams 配置中設定的預設 Serde`s,以檢視它是否是可以與傳入 KStream 的型別匹配的 `Serde

如果上述策略均未奏效,則應用程式必須透過配置提供 Serde。這可以透過兩種方式配置 - 繫結或預設。

首先,binder 將檢查是否在繫結級別提供了 Serde。例如,如果您有以下處理器,

@Bean
public BiFunction<KStream<CustomKey, AvroIn1>, KTable<CustomKey, AvroIn2>, KStream<CustomKey, AvroOutput>> process() {...}

然後,您可以使用以下方式提供繫結級別 Serde

spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde

spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
如果您如上所述為每個輸入繫結提供 Serde,那麼這將具有更高的優先順序,並且 binder 將避免任何 Serde 推理。

如果您希望將預設的鍵/值 Serdes 用於入站反序列化,則可以在 binder 級別進行設定。

spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde

如果您不希望使用 Kafka 提供的原生解碼,則可以依賴 Spring Cloud Stream 提供的訊息轉換功能。由於原生解碼是預設設定,為了讓 Spring Cloud Stream 反序列化入站值物件,您需要顯式停用原生解碼。

例如,如果您擁有與上面相同的 BiFunction 處理器,那麼 spring.cloud.stream.bindings.process-in-0.consumer.nativeDecoding: false。您需要單獨停用所有輸入的原生解碼。否則,對於那些您未停用的輸入,原生解碼仍將適用。

預設情況下,Spring Cloud Stream 將使用 application/json 作為內容型別並使用適當的 json 訊息轉換器。您可以透過使用以下屬性和適當的 MessageConverter bean 來使用自定義訊息轉換器。

spring.cloud.stream.bindings.process-in-0.contentType

2.5.2. 出站序列化

出站序列化與上述入站反序列化遵循相同的規則。與入站反序列化一樣,Spring Cloud Stream 以前版本的一個主要變化是出站序列化由 Kafka 原生處理。在 binder 的 3.0 版本之前,這是由框架本身完成的。

出站鍵始終由 Kafka 使用 binder 推斷的匹配 Serde 進行序列化。如果它無法推斷鍵的型別,則需要透過配置指定。

值 Serdes 使用與入站反序列化相同的規則進行推斷。首先,它匹配以查看出站型別是否來自應用程式中提供的 bean。如果不是,它會檢查是否與 Kafka 公開的 Serde 匹配,例如 - IntegerLongShortDoubleFloatbyte[]UUIDString。如果不起作用,它會回退到 Spring Kafka 專案提供的 JsonSerde,但首先會檢視預設的 Serde 配置以檢視是否有匹配項。請記住,所有這些對應用程式都是透明的。如果這些都不起作用,則使用者必須透過配置提供要使用的 Serde

假設您正在使用與上面相同的 BiFunction 處理器。然後,您可以按如下方式配置出站鍵/值 Serdes。

spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde

如果 Serde 推理失敗,並且沒有提供繫結級別的 Serdes,那麼 binder 將回退到 JsonSerde,但會檢視預設的 Serdes 以進行匹配。

預設 Serdes 的配置方式與上面在反序列化部分描述的方式相同。

spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde

如果您的應用程式使用分支功能並具有多個輸出繫結,則需要為每個繫結配置這些功能。同樣,如果 binder 能夠推斷 Serde 型別,則無需進行此配置。

如果您不希望使用 Kafka 提供的原生編碼,而是希望使用框架提供的訊息轉換,那麼您需要顯式停用原生編碼,因為原生編碼是預設設定。例如,如果您有與上面相同的 BiFunction 處理器,那麼 spring.cloud.stream.bindings.process-out-0.producer.nativeEncoding: false。在分支的情況下,您需要單獨停用所有輸出的原生編碼。否則,對於那些您未停用的輸出,原生編碼仍將適用。

當由 Spring Cloud Stream 完成轉換時,預設情況下,它將使用 application/json 作為內容型別並使用適當的 json 訊息轉換器。您可以透過使用以下屬性和相應的 MessageConverter bean 來使用自定義訊息轉換器。

spring.cloud.stream.bindings.process-out-0.contentType

當停用原生編碼/解碼時,binder 不會像原生 Serdes 那樣進行任何推斷。應用程式需要顯式提供所有配置選項。因此,建議在編寫 Spring Cloud Stream Kafka Streams 應用程式時,通常堅持使用預設的反序列化選項並堅持使用 Kafka Streams 提供的原生反序列化。您必須使用框架提供的訊息轉換能力的唯一場景是當您的上游生產者使用特定的序列化策略時。在這種情況下,您希望使用匹配的反序列化策略,因為原生機制可能會失敗。當依賴預設的 Serde 機制時,應用程式必須確保 binder 能夠正確地將入站和出站對映到適當的 Serde,否則可能會失敗。

值得一提的是,上面概述的資料解/序列化方法僅適用於處理器的邊緣,即入站和出站。您的業務邏輯可能仍需要呼叫需要顯式 Serde 物件的 Kafka Streams API。這些仍然是應用程式的責任,必須由開發人員相應地處理。

2.6. 錯誤處理

Apache Kafka Streams 提供了原生處理反序列化錯誤的異常處理能力。有關此支援的詳細資訊,請參閱 文件。Apache Kafka Streams 開箱即用地提供了兩種反序列化異常處理器 - LogAndContinueExceptionHandlerLogAndFailExceptionHandler。顧名思義,前者將記錄錯誤並繼續處理下一個記錄,後者將記錄錯誤並失敗。LogAndFailExceptionHandler 是預設的反序列化異常處理器。

2.6.1. 處理 Binder 中的反序列化異常

Kafka Streams binder 允許使用以下屬性指定上述反序列化異常處理器。

spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: logAndContinue

spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: logAndFail

除了上述兩種反序列化異常處理器外,binder 還提供了第三種,用於將錯誤記錄(毒丸)傳送到 DLQ(死信佇列)主題。以下是啟用此 DLQ 異常處理器的方法。

spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: sendToDlq

設定上述屬性後,所有反序列化錯誤的記錄將自動傳送到 DLQ 主題。

您可以如下設定釋出 DLQ 訊息的主題名稱。

您可以提供 DlqDestinationResolver 的實現,它是一個函式式介面。DlqDestinationResolverConsumerRecord 和異常作為輸入,然後允許指定主題名稱作為輸出。透過訪問 Kafka ConsumerRecord,可以在 BiFunction 的實現中檢查頭部記錄。

以下是提供 DlqDestinationResolver 實現的示例。

@Bean
public DlqDestinationResolver dlqDestinationResolver() {
    return (rec, ex) -> {
        if (rec.topic().equals("word1")) {
            return "topic1-dlq";
        }
        else {
            return "topic2-dlq";
        }
    };
}

在提供 DlqDestinationResolver 的實現時,需要記住一件重要的事情,那就是繫結器中的 provisioner 不會自動為應用程式建立主題。這是因為繫結器無法推斷出實現可能傳送到的所有 DLQ 主題的名稱。因此,如果您使用此策略提供 DLQ 名稱,應用程式有責任確保這些主題事先已建立。

如果應用程式中存在 DlqDestinationResolver bean,則它具有更高的優先順序。如果您不想遵循此方法,而是希望透過配置提供靜態 DLQ 名稱,則可以設定以下屬性。

spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.dlqName: custom-dlq (Change the binding name accordingly)

如果設定了此項,則錯誤記錄將傳送到主題 custom-dlq。如果應用程式未使用上述任何一種策略,則它將建立一個名為 error.<input-topic-name>.<application-id> 的 DLQ 主題。例如,如果您的繫結的目標主題是 inputTopic,應用程式 ID 是 process-applicationId,則預設 DLQ 主題是 error.inputTopic.process-applicationId。如果您打算啟用 DLQ,始終建議為每個輸入繫結顯式建立一個 DLQ 主題。

2.6.2. 每個輸入消費者繫結的 DLQ

屬性 spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler 適用於整個應用程式。這意味著如果同一應用程式中有多個函式或 StreamListener 方法,此屬性將應用於所有這些方法。但是,如果一個處理器中有多個處理器或多個輸入繫結,則可以使用 binder 為每個輸入消費者繫結提供的更細粒度的 DLQ 控制。

如果您有以下處理器,

@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
...
}

並且您只想在第一個輸入繫結上啟用 DLQ,在第二個繫結上跳過並繼續,那麼您可以在消費者上按如下操作。

spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.deserializationExceptionHandler: sendToDlq spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.deserializationExceptionHandler: skipAndContinue

以這種方式設定反序列化異常處理器比在 binder 級別設定具有更高的優先順序。

2.6.3. DLQ 分割槽

預設情況下,記錄釋出到死信主題時使用與原始記錄相同的分割槽。這意味著死信主題必須至少有與原始記錄相同數量的分割槽。

要更改此行為,請將 DlqPartitionFunction 實現新增為應用程式上下文中的 @Bean。只能存在一個這樣的 bean。該函式提供消費者組(在大多數情況下與應用程式 ID 相同)、失敗的 ConsumerRecord 和異常。例如,如果您總是希望路由到分割槽 0,您可以使用

@Bean
public DlqPartitionFunction partitionFunction() {
    return (group, record, ex) -> 0;
}
如果您將消費者繫結的 dlqPartitions 屬性設定為 1(並且繫結的 minPartitionCount 等於 1),則無需提供 DlqPartitionFunction;框架將始終使用分割槽 0。如果您將消費者繫結的 dlqPartitions 屬性設定為大於 1 的值(或繫結的 minPartitionCount 大於 1),則您必須提供一個 DlqPartitionFunction bean,即使分割槽計數與原始主題相同。

在使用 Kafka Streams binder 中的異常處理功能時,需要注意幾點。

  • 屬性 spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler 適用於整個應用程式。這意味著如果同一應用程式中有多個函式或 StreamListener 方法,此屬性將應用於所有這些方法。

  • 反序列化的異常處理與原生反序列化和框架提供的訊息轉換一致工作。

2.6.4. 處理 Binder 中的生產異常

與上述反序列化異常處理程式的支援不同,binder 不提供用於處理生產異常的此類一流機制。但是,您仍然可以使用 StreamsBuilderFactoryBean 定製器來配置生產異常處理程式,您可以在下面的後續部分中找到更多詳細資訊。

2.7. 重試關鍵業務邏輯

在某些情況下,您可能希望重試應用程式中對業務邏輯至關重要的部分。這可能是對關係資料庫的外部呼叫,或從 Kafka Streams 處理器呼叫 REST 端點。這些呼叫可能由於各種原因(例如網路問題或遠端服務不可用)而失敗。通常,如果可以再次嘗試,這些故障可能會自行解決。預設情況下,Kafka Streams binder 會為所有輸入繫結建立 RetryTemplate bean。

如果函式具有以下簽名,

@Bean
public java.util.function.Consumer<KStream<Object, String>> process()

並且使用預設繫結名稱,RetryTemplate 將註冊為 process-in-0-RetryTemplate。這遵循繫結名稱(process-in-0)後跟字面量 -RetryTemplate 的約定。在多個輸入繫結的情況下,每個繫結將有一個單獨的 RetryTemplate bean 可用。如果應用程式中有一個自定義 RetryTemplate bean,並透過 spring.cloud.stream.bindings.<binding-name>.consumer.retryTemplateName 提供,那麼它將優先於任何輸入繫結級別的重試模板配置屬性。

一旦將繫結中的 RetryTemplate 注入到應用程式中,它就可以用於重試應用程式的任何關鍵部分。這是一個示例

@Bean
public java.util.function.Consumer<KStream<Object, String>> process(@Lazy @Qualifier("process-in-0-RetryTemplate") RetryTemplate retryTemplate) {

    return input -> input
            .process(() -> new Processor<Object, String>() {
                @Override
                public void init(ProcessorContext processorContext) {
                }

                @Override
                public void process(Object o, String s) {
                    retryTemplate.execute(context -> {
                       //Critical business logic goes here.
                    });
                }

                @Override
                public void close() {
                }
            });
}

或者您可以使用自定義 RetryTemplate 如下。

@EnableAutoConfiguration
public static class CustomRetryTemplateApp {

    @Bean
    @StreamRetryTemplate
    RetryTemplate fooRetryTemplate() {
        RetryTemplate retryTemplate = new RetryTemplate();

        RetryPolicy retryPolicy = new SimpleRetryPolicy(4);
        FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
        backOffPolicy.setBackOffPeriod(1);

        retryTemplate.setBackOffPolicy(backOffPolicy);
        retryTemplate.setRetryPolicy(retryPolicy);

        return retryTemplate;
    }

    @Bean
    public java.util.function.Consumer<KStream<Object, String>> process() {

        return input -> input
                .process(() -> new Processor<Object, String>() {
                    @Override
                    public void init(ProcessorContext processorContext) {
                    }

                    @Override
                    public void process(Object o, String s) {
                        fooRetryTemplate().execute(context -> {
                           //Critical business logic goes here.
                        });

                    }

                    @Override
                    public void close() {
                    }
                });
    }
}

請注意,當重試次數耗盡時,預設情況下會丟擲最後一個異常,導致處理器終止。如果您希望處理異常並繼續處理,可以將 RecoveryCallback 新增到 execute 方法中:這是一個示例。

retryTemplate.execute(context -> {
    //Critical business logic goes here.
    }, context -> {
       //Recovery logic goes here.
       return null;
    ));

有關 RetryTemplate、重試策略、回退策略等的更多資訊,請參閱 Spring Retry 專案。

2.8. 狀態儲存

當使用高階 DSL 並進行觸發狀態儲存的適當呼叫時,Kafka Streams 會自動建立狀態儲存。

如果您想將傳入的 KTable 繫結具體化為命名狀態儲存,那麼您可以使用以下策略。

假設您有以下函式。

@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
   ...
}

然後透過設定以下屬性,傳入的 KTable 資料將被具體化到命名狀態儲存中。

spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.materializedAs: incoming-store

您可以在應用程式中將自定義狀態儲存定義為 bean,這些狀態儲存將被 binder 檢測並新增到 Kafka Streams 構建器中。特別是當使用處理器 API 時,您需要手動註冊一個狀態儲存。為此,您可以在應用程式中將 StateStore 建立為一個 bean。以下是定義此類 bean 的示例。

@Bean
public StoreBuilder myStore() {
    return Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore("my-store"), Serdes.Long(),
            Serdes.Long());
}

@Bean
public StoreBuilder otherStore() {
    return Stores.windowStoreBuilder(
            Stores.persistentWindowStore("other-store",
                    1L, 3, 3L, false), Serdes.Long(),
            Serdes.Long());
}

這些狀態儲存可以由應用程式直接訪問。

在引導期間,上述 bean 將由 binder 處理並傳遞給 Streams 構建器物件。

訪問狀態儲存

Processor<Object, Product>() {

    WindowStore<Object, String> state;

    @Override
    public void init(ProcessorContext processorContext) {
        state = (WindowStore)processorContext.getStateStore("mystate");
    }
    ...
}

在註冊全域性狀態儲存時,這將不起作用。為了註冊全域性狀態儲存,請參閱下面關於自定義 StreamsBuilderFactoryBean 的部分。

2.9. 互動式查詢

Kafka Streams binder API 公開了一個名為 InteractiveQueryService 的類,用於互動式查詢狀態儲存。您可以在應用程式中將其作為 Spring bean 訪問。從應用程式訪問此 bean 的簡單方法是自動注入此 bean。

@Autowired
private InteractiveQueryService interactiveQueryService;

一旦您訪問到這個 bean,就可以查詢您感興趣的特定狀態儲存。請看下面。

ReadOnlyKeyValueStore<Object, Object> keyValueStore =
						interactiveQueryService.getQueryableStoreType("my-store", QueryableStoreTypes.keyValueStore());

在啟動期間,上述檢索儲存的方法呼叫可能會失敗。例如,它可能仍在初始化狀態儲存。在這種情況下,重試此操作將很有用。Kafka Streams binder 提供了一個簡單的重試機制來適應這種情況。

以下是可用於控制此重試的兩個屬性。

  • spring.cloud.stream.kafka.streams.binder.stateStoreRetry.maxAttempts - 預設值為 1

  • spring.cloud.stream.kafka.streams.binder.stateStoreRetry.backOffInterval - 預設值為 1000 毫秒。

如果有多個 Kafka Streams 應用程式例項正在執行,那麼在互動式查詢它們之前,您需要識別哪個應用程式例項託管您正在查詢的特定鍵。InteractiveQueryService API 提供了識別主機資訊的方法。

為了使此功能正常工作,您必須按如下方式配置屬性 application.server

spring.cloud.stream.kafka.streams.binder.configuration.application.server: <server>:<port>

以下是一些程式碼片段

org.apache.kafka.streams.state.HostInfo hostInfo = interactiveQueryService.getHostInfo("store-name",
						key, keySerializer);

if (interactiveQueryService.getCurrentHostInfo().equals(hostInfo)) {

    //query from the store that is locally available
}
else {
    //query from the remote host
}

有關這些主機查詢方法的更多資訊,請參閱這些方法的 Javadoc。對於這些方法,在啟動期間,如果底層 KafkaStreams 物件尚未準備就緒,它們可能會丟擲異常。上述重試屬性也適用於這些方法。

2.9.1. InteractiveQueryService 提供的其他 API 方法

使用以下 API 方法檢索與給定儲存和鍵組合關聯的 KeyQueryMetadata 物件。

public <K> KeyQueryMetadata getKeyQueryMetadata(String store, K key, Serializer<K> serializer)

使用以下 API 方法檢索與給定儲存和鍵組合關聯的 KakfaStreams 物件。

public <K> KafkaStreams getKafkaStreams(String store, K key, Serializer<K> serializer)

2.10. 健康指示器

健康指示器需要依賴 spring-boot-starter-actuator。對於 Maven,請使用

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

Spring Cloud Stream Kafka Streams Binder 提供了一個健康指示器來檢查底層 streams 執行緒的狀態。Spring Cloud Stream 定義了一個屬性 management.health.binders.enabled 來啟用健康指示器。請參閱 Spring Cloud Stream 文件

健康指示器為每個流執行緒的元資料提供以下詳細資訊

  • 執行緒名稱

  • 執行緒狀態:CREATEDRUNNINGPARTITIONS_REVOKEDPARTITIONS_ASSIGNEDPENDING_SHUTDOWNDEAD

  • 活動任務:任務 ID 和分割槽

  • 備用任務:任務 ID 和分割槽

預設情況下,只顯示全域性狀態(UPDOWN)。要顯示詳細資訊,屬性 management.endpoint.health.show-details 必須設定為 ALWAYSWHEN_AUTHORIZED。有關健康資訊的更多詳細資訊,請參閱 Spring Boot Actuator 文件

如果所有已註冊的 Kafka 執行緒都處於 RUNNING 狀態,則健康指示器的狀態為 UP

由於 Kafka Streams binder 中有三個獨立的 binder(KStreamKTableGlobalKTable),它們都將報告健康狀態。啟用 show-details 時,報告的一些資訊可能冗餘。

當同一個應用程式中存在多個 Kafka Streams 處理器時,將為所有處理器報告健康檢查,並按 Kafka Streams 的應用程式 ID 進行分類。

2.11. 訪問 Kafka Streams 指標

Spring Cloud Stream Kafka Streams binder 提供了 Kafka Streams 指標,可以透過 Micrometer MeterRegistry 匯出。

對於 Spring Boot 2.2.x 版本,指標支援透過 binder 的自定義 Micrometer 指標實現提供。對於 Spring Boot 2.3.x 版本,Kafka Streams 指標支援透過 Micrometer 本地提供。

透過 Boot actuator 端點訪問指標時,請確保將 metrics 新增到屬性 management.endpoints.web.exposure.include 中。然後,您可以訪問 /acutator/metrics 以獲取所有可用指標的列表,然後可以透過相同的 URI(/actuator/metrics/<metric-name>)單獨訪問這些指標。

2.12. 混合使用高階 DSL 和低階 Processor API

Kafka Streams 提供了兩種 API 變體。它有一個更高階的 DSL 樣式的 API,您可以鏈式呼叫各種操作,這對於許多函式式程式設計師來說可能很熟悉。Kafka Streams 還提供了低階處理器 API。處理器 API 雖然功能強大,並且能夠以更低的級別控制事物,但其本質是命令式的。Spring Cloud Stream 的 Kafka Streams binder 允許您使用高階 DSL 或混合使用 DSL 和處理器 API。混合使用這兩種變體為您提供了許多選項來控制應用程式中的各種用例。應用程式可以使用 transformprocess 方法 API 呼叫來訪問處理器 API。

以下是如何在 Spring Cloud Stream 應用程式中使用 process API 結合 DSL 和處理器 API 的示例。

@Bean
public Consumer<KStream<Object, String>> process() {
    return input ->
        input.process(() -> new Processor<Object, String>() {
            @Override
            @SuppressWarnings("unchecked")
            public void init(ProcessorContext context) {
               this.context = context;
            }

            @Override
            public void process(Object key, String value) {
                //business logic
            }

            @Override
            public void close() {

        });
}

以下是使用 transform API 的示例。

@Bean
public Consumer<KStream<Object, String>> process() {
    return (input, a) ->
        input.transform(() -> new Transformer<Object, String, KeyValue<Object, String>>() {
            @Override
            public void init(ProcessorContext context) {

            }

            @Override
            public void close() {

            }

            @Override
            public KeyValue<Object, String> transform(Object key, String value) {
                // business logic - return transformed KStream;
            }
        });
}

process API 方法呼叫是終端操作,而 transform API 是非終端操作,它為您提供一個潛在轉換的 KStream,您可以使用它繼續使用 DSL 或處理器 API 進行進一步處理。

2.13. 出站分割槽支援

Kafka Streams 處理器通常將處理後的輸出傳送到出站 Kafka 主題。如果出站主題已分割槽並且處理器需要將出站資料傳送到特定分割槽,則應用程式需要提供一個 StreamPartitioner 型別的 bean。有關詳細資訊,請參閱 StreamPartitioner。讓我們看一些示例。

這與我們之前多次看到的處理器相同,

@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>> process() {

    ...
}

這是輸出繫結目的地

spring.cloud.stream.bindings.process-out-0.destination: outputTopic

如果主題 outputTopic 有 4 個分割槽,如果您不提供分割槽策略,Kafka Streams 將使用預設分割槽策略,這可能不是您想要的輸出,具體取決於特定用例。假設您想將任何匹配 spring 的鍵傳送到分割槽 0,將 cloud 傳送到分割槽 1,將 stream 傳送到分割槽 2,將所有其他鍵傳送到分割槽 3。以下是您需要在應用程式中執行的操作。

@Bean
public StreamPartitioner<String, WordCount> streamPartitioner() {
    return (t, k, v, n) -> {
        if (k.equals("spring")) {
            return 0;
        }
        else if (k.equals("cloud")) {
            return 1;
        }
        else if (k.equals("stream")) {
            return 2;
        }
        else {
            return 3;
        }
    };
}

這是一個粗略的實現,但是,您可以訪問記錄的鍵/值、主題名稱和分割槽總數。因此,如有需要,您可以實現複雜的分割槽策略。

您還需要將此 bean 名稱與應用程式配置一起提供。

spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.streamPartitionerBeanName: streamPartitioner

應用程式中的每個輸出主題都需要像這樣單獨配置。

2.14. StreamsBuilderFactoryBean 定製器

通常需要自定義建立 KafkaStreams 物件的 StreamsBuilderFactoryBean。基於 Spring Kafka 提供的底層支援,binder 允許您自定義 StreamsBuilderFactoryBean。您可以使用 StreamsBuilderFactoryBeanCustomizer 來自定義 StreamsBuilderFactoryBean 本身。然後,一旦您透過此定製器訪問 StreamsBuilderFactoryBean,您就可以使用 KafkaStreamsCustomzier 自定義相應的 KafkaStreams。這兩個定製器都是 Spring for Apache Kafka 專案的一部分。

以下是使用 StreamsBuilderFactoryBeanCustomizer 的示例。

@Bean
public StreamsBuilderFactoryBeanCustomizer streamsBuilderFactoryBeanCustomizer() {
    return sfb -> sfb.setStateListener((newState, oldState) -> {
         //Do some action here!
    });
}

上面所示的只是一個示例,說明了您可以如何自定義 StreamsBuilderFactoryBean。您可以呼叫 StreamsBuilderFactoryBean 中的任何可用修改操作來自定義它。此定製器將在工廠 bean 啟動之前由 binder 呼叫。

一旦您獲得了 StreamsBuilderFactoryBean 的訪問許可權,您還可以自定義底層的 KafkaStreams 物件。以下是實現此目的的藍圖。

@Bean
public StreamsBuilderFactoryBeanCustomizer streamsBuilderFactoryBeanCustomizer() {
    return factoryBean -> {
        factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
            @Override
            public void customize(KafkaStreams kafkaStreams) {
                kafkaStreams.setUncaughtExceptionHandler((t, e) -> {

                });
            }
        });
    };
}

KafkaStreamsCustomizer 將在底層 KafkaStreams 啟動之前由 StreamsBuilderFactoryBeabn 呼叫。

整個應用程式中只能有一個 StreamsBuilderFactoryBeanCustomizer。那麼,我們如何處理多個 Kafka Streams 處理器呢,因為它們每個都由獨立的 StreamsBuilderFactoryBean 物件支援?在這種情況下,如果這些處理器的自定義需要不同,那麼應用程式需要根據應用程式 ID 應用一些過濾器。

例如,

@Bean
public StreamsBuilderFactoryBeanCustomizer streamsBuilderFactoryBeanCustomizer() {

    return factoryBean -> {
        if (factoryBean.getStreamsConfiguration().getProperty(StreamsConfig.APPLICATION_ID_CONFIG)
                .equals("processor1-application-id")) {
            factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
                @Override
                public void customize(KafkaStreams kafkaStreams) {
                    kafkaStreams.setUncaughtExceptionHandler((t, e) -> {

                    });
                }
            });
        }
    };

2.14.1. 使用 Customizer 註冊全域性狀態儲存

如上所述,binder 不提供一種一流的方式來註冊全域性狀態儲存。為此,您需要使用定製器。以下是如何實現此目的。

@Bean
public StreamsBuilderFactoryBeanCustomizer customizer() {
    return fb -> {
        try {
            final StreamsBuilder streamsBuilder = fb.getObject();
            streamsBuilder.addGlobalStore(...);
        }
        catch (Exception e) {

        }
    };
}

同樣,如果您有多個處理器,您可能希望透過如上所述使用應用程式 ID 過濾掉其他 StreamsBuilderFactoryBean 物件,將全域性狀態儲存附加到正確的 StreamsBuilder

2.14.2. 使用 Customizer 註冊生產異常處理器

在錯誤處理部分,我們指出 binder 沒有提供一種一流的方式來處理生產異常。儘管如此,您仍然可以使用 StreamsBuilderFacotryBean 定製器來註冊生產異常處理程式。請參閱下文。

@Bean
public StreamsBuilderFactoryBeanCustomizer customizer() {
    return fb -> {
        fb.getStreamsConfiguration().put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
                            CustomProductionExceptionHandler.class);
    };
}

再次,如果您有多個處理器,您可能需要針對正確的 StreamsBuilderFactoryBean 適當設定它。您也可以使用配置屬性新增此類生產異常處理程式(有關更多資訊,請參見下文),但如果您選擇程式設計方法,這是一個選項。

2.15. 時間戳提取器

Kafka Streams 允許您根據各種時間戳概念控制消費者記錄的處理。預設情況下,Kafka Streams 提取嵌入在消費者記錄中的時間戳元資料。您可以透過為每個輸入繫結提供不同的 TimestampExtractor 實現來更改此預設行為。以下是有關如何實現此目的的一些詳細資訊。

@Bean
public Function<KStream<Long, Order>,
        Function<KTable<Long, Customer>,
                Function<GlobalKTable<Long, Product>, KStream<Long, Order>>>> process() {
    return orderStream ->
            customers ->
                products -> orderStream;
}

@Bean
public TimestampExtractor timestampExtractor() {
    return new WallclockTimestampExtractor();
}

然後為每個消費者繫結設定上述 TimestampExtractor bean 名稱。

spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.timestampExtractorBeanName=timestampExtractor
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.timestampExtractorBeanName=timestampExtractor
spring.cloud.stream.kafka.streams.bindings.process-in-2.consumer.timestampExtractorBeanName=timestampExtractor"

如果您跳過設定自定義時間戳提取器的輸入消費者繫結,則該消費者將使用預設設定。

2.16. 結合 Kafka Streams binder 和常規 Kafka Binder 的多 binder

您可以有一個應用程式,其中既有基於常規 Kafka binder 的函式/消費者/供應商,也有基於 Kafka Streams 的處理器。但是,您不能在一個函式或消費者中混合使用它們。

這是一個示例,其中您在同一個應用程式中同時擁有基於 binder 的元件。

@Bean
public Function<String, String> process() {
    return s -> s;
}

@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>> kstreamProcess() {

    return input -> input;
}

這是配置中的相關部分

spring.cloud.stream.function.definition=process;kstreamProcess
spring.cloud.stream.bindings.process-in-0.destination=foo
spring.cloud.stream.bindings.process-out-0.destination=bar
spring.cloud.stream.bindings.kstreamProcess-in-0.destination=bar
spring.cloud.stream.bindings.kstreamProcess-out-0.destination=foobar

如果您的應用程式與上述相同,但處理兩個不同的 Kafka 叢集,例如,常規的 process 作用於 Kafka 叢集 1 和叢集 2(從叢集 1 接收資料併發送到叢集 2),而 Kafka Streams 處理器作用於 Kafka 叢集 2。那麼事情就會變得更加複雜。您必須使用 Spring Cloud Stream 提供的 多繫結器 功能。

以下是您的配置在這種情況下可能如何變化。

# multi binder configuration
spring.cloud.stream.binders.kafka1.type: kafka
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-1} #Replace kafkaCluster-1 with the approprate IP of the cluster
spring.cloud.stream.binders.kafka2.type: kafka
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2} #Replace kafkaCluster-2 with the approprate IP of the cluster
spring.cloud.stream.binders.kafka3.type: kstream
spring.cloud.stream.binders.kafka3.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2} #Replace kafkaCluster-2 with the approprate IP of the cluster

spring.cloud.stream.function.definition=process;kstreamProcess

# From cluster 1 to cluster 2 with regular process function
spring.cloud.stream.bindings.process-in-0.destination=foo
spring.cloud.stream.bindings.process-in-0.binder=kafka1 # source from cluster 1
spring.cloud.stream.bindings.process-out-0.destination=bar
spring.cloud.stream.bindings.process-out-0.binder=kafka2 # send to cluster 2

# Kafka Streams processor on cluster 2
spring.cloud.stream.bindings.kstreamProcess-in-0.destination=bar
spring.cloud.stream.bindings.kstreamProcess-in-0.binder=kafka3
spring.cloud.stream.bindings.kstreamProcess-out-0.destination=foobar
spring.cloud.stream.bindings.kstreamProcess-out-0.binder=kafka3

請注意上面的配置。我們有兩種繫結器,但總共有 3 個繫結器,第一個是基於叢集 1 (kafka1) 的常規 Kafka 繫結器,然後是基於叢集 2 (kafka2) 的另一個 Kafka 繫結器,最後是 kstream (kafka3)。應用程式中的第一個處理器從 kafka1 接收資料併發布到 kafka2,其中兩個繫結器都基於常規 Kafka 繫結器,但叢集不同。第二個處理器是 Kafka Streams 處理器,它從 kafka3 消費資料,該叢集與 kafka2 相同,但繫結器型別不同。

由於 Kafka Streams 繫結器系列中有三種不同的繫結器型別 - kstreamktableglobalktable - 如果您的應用程式有基於這些繫結器中的任何一個的多個繫結,則需要明確提供繫結器型別。

例如,如果您有以下處理器,

@Bean
public Function<KStream<Long, Order>,
        Function<KTable<Long, Customer>,
                Function<GlobalKTable<Long, Product>, KStream<Long, EnrichedOrder>>>> enrichOrder() {

    ...
}

那麼,這必須在多繫結器場景中配置如下。請注意,這僅在您有一個真正的多繫結器場景時才需要,即單個應用程式中有多個處理器處理多個叢集。在這種情況下,需要顯式為繫結提供繫結器,以區分其他處理器的繫結器型別和叢集。

spring.cloud.stream.binders.kafka1.type: kstream
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2}
spring.cloud.stream.binders.kafka2.type: ktable
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2}
spring.cloud.stream.binders.kafka3.type: globalktable
spring.cloud.stream.binders.kafka3.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2}

spring.cloud.stream.bindings.enrichOrder-in-0.binder=kafka1  #kstream
spring.cloud.stream.bindings.enrichOrder-in-1.binder=kafka2  #ktablr
spring.cloud.stream.bindings.enrichOrder-in-2.binder=kafka3  #globalktable
spring.cloud.stream.bindings.enrichOrder-out-0.binder=kafka1 #kstream

# rest of the configuration is omitted.

2.17. 狀態清理

預設情況下,當繫結停止時,不會清理任何本地狀態。這與 Spring Kafka 2.7 版本以來的行為相同。有關更多詳細資訊,請參閱 Spring Kafka 文件。要修改此行為,只需將單個 CleanupConfig @Bean(配置為在啟動、停止或兩者都不進行清理)新增到應用程式上下文;該 bean 將被檢測並連線到工廠 bean 中。

2.18. Kafka Streams 拓撲視覺化

Kafka Streams binder 提供了以下執行器端點,用於檢索拓撲描述,您可以使用外部工具視覺化該拓撲。

/actuator/kafkastreamstopology

/actuator/kafkastreamstopology/<application-id of the processor>

您需要包含 Spring Boot 的 actuator 和 web 依賴項才能訪問這些端點。此外,您還需要將 kafkastreamstopology 新增到 management.endpoints.web.exposure.include 屬性中。預設情況下,kafkastreamstopology 端點是停用的。

2.19. Kafka Streams 應用程式中基於事件型別的路由

Kafka Streams binder 不支援常規訊息通道繫結器中可用的路由功能。但是,Kafka Streams binder 仍然透過入站記錄上的事件型別記錄頭提供路由功能。

要啟用基於事件型別的路由,應用程式必須提供以下屬性。

spring.cloud.stream.kafka.streams.bindings.<binding-name>.consumer.eventTypes.

這可以是逗號分隔的值。

例如,假設我們有這個函式

@Bean
public Function<KStream<Integer, Foo>, KStream<Integer, Foo>> process() {
    return input -> input;
}

讓我們還假設我們只希望在此函式中執行業務邏輯,如果傳入記錄的事件型別為 foobar。這可以使用繫結上的 eventTypes 屬性表示如下。

spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.eventTypes=foo,bar

現在,當應用程式執行時,binder 會檢查每個傳入記錄的 event_type 頭部,檢視其值是否設定為 foobar。如果未找到其中任何一個,則將跳過函式執行。

預設情況下,binder 期望記錄頭鍵為 event_type,但可以為每個繫結更改。例如,如果我們要將此繫結上的頭鍵更改為 my_event 而不是預設值,則可以如下更改。

spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.eventTypeHeaderKey=my_event.

2.20. Kafka Streams binder 中的繫結視覺化和控制

從 3.1.2 版本開始,Kafka Streams binder 支援繫結視覺化和控制。只支援 STOPPEDSTARTED 兩種生命週期階段。Kafka Streams binder 中不提供 PAUSEDRESUMED 生命週期階段。

為了啟用繫結視覺化和控制,應用程式需要包含以下兩個依賴項。

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-web</artifactId>
</dependency>

如果您喜歡使用 webflux,則可以包含 spring-boot-starter-webflux 而不是標準 web 依賴項。

此外,您還需要設定以下屬性

management.endpoints.web.exposure.include=bindings

為了進一步說明此功能,讓我們使用以下應用程式作為指南

@SpringBootApplication
public class KafkaStreamsApplication {

	public static void main(String[] args) {
		SpringApplication.run(KafkaStreamsApplication.class, args);
	}

	@Bean
	public Consumer<KStream<String, String>> consumer() {
		return s -> s.foreach((key, value) -> System.out.println(value));
	}

	@Bean
	public Function<KStream<String, String>, KStream<String, String>> function() {
		return ks -> ks;
	}

}

如我們所見,該應用程式有兩個 Kafka Streams 函式 - 一個是消費者,另一個是函式。消費者繫結預設命名為 consumer-in-0。類似地,對於函式,輸入繫結是 function-in-0,輸出繫結是 function-out-0

應用程式啟動後,我們可以使用以下繫結端點找到有關繫結的詳細資訊。

 curl https://:8080/actuator/bindings | jq .
[
  {
    "bindingName": "consumer-in-0",
    "name": "consumer-in-0",
    "group": "consumer-applicationId",
    "pausable": false,
    "state": "running",
    "paused": false,
    "input": true,
    "extendedInfo": {}
  },
  {
    "bindingName": "function-in-0",
    "name": "function-in-0",
    "group": "function-applicationId",
    "pausable": false,
    "state": "running",
    "paused": false,
    "input": true,
    "extendedInfo": {}
  },
  {
    "bindingName": "function-out-0",
    "name": "function-out-0",
    "group": "function-applicationId",
    "pausable": false,
    "state": "running",
    "paused": false,
    "input": false,
    "extendedInfo": {}
  }
]

上面可以找到所有三個繫結的詳細資訊。

現在讓我們停止 consumer-in-0 繫結。

curl -d '{"state":"STOPPED"}' -H "Content-Type: application/json" -X POST https://:8080/actuator/bindings/consumer-in-0

此時,將不會透過此繫結接收任何記錄。

再次啟動繫結。

curl -d '{"state":"STARTED"}' -H "Content-Type: application/json" -X POST https://:8080/actuator/bindings/consumer-in-0

當單個函式上存在多個繫結時,在其中任何一個繫結上呼叫這些操作都將起作用。這是因為單個函式上的所有繫結都由同一個 StreamsBuilderFactoryBean 支援。因此,對於上述函式,function-in-0function-out-0 都將起作用。

2.21. 手動啟動 Kafka Streams 處理器

Spring Cloud Stream Kafka Streams binder 在 Spring for Apache Kafka 的 StreamsBuilderFactoryBean 之上提供了一個名為 StreamsBuilderFactoryManager 的抽象。此管理器 API 用於控制基於 binder 的應用程式中每個處理器的多個 StreamsBuilderFactoryBean。因此,當使用 binder 時,如果您想手動控制應用程式中各種 StreamsBuilderFactoryBean 物件的自動啟動,則需要使用 StreamsBuilderFactoryManager。您可以使用屬性 spring.kafka.streams.auto-startup 並將其設定為 false 以關閉處理器的自動啟動。然後,在應用程式中,您可以使用以下內容透過 StreamsBuilderFactoryManager 啟動處理器。

@Bean
public ApplicationRunner runner(StreamsBuilderFactoryManager sbfm) {
    return args -> {
        sbfm.start();
    };
}

當您希望應用程式在主執行緒中啟動並讓 Kafka Streams 處理器單獨啟動時,此功能非常有用。例如,當您有一個需要恢復的大型狀態儲存時,如果處理器像預設情況下那樣正常啟動,這可能會阻止您的應用程式啟動。如果您正在使用某種活躍度探測機制(例如在 Kubernetes 上),它可能會認為應用程式已關閉並嘗試重新啟動。為了糾正此問題,您可以將 spring.kafka.streams.auto-startup 設定為 false 並遵循上述方法。

請記住,當使用 Spring Cloud Stream binder 時,您不是直接處理 Spring for Apache Kafka 的 StreamsBuilderFactoryBean,而是 StreamsBuilderFactoryManager,因為 StreamsBuilderFactoryBean 物件由 binder 內部管理。

2.22. 選擇性手動啟動 Kafka Streams 處理器

雖然上述方法將透過 StreamsBuilderFactoryManager 無條件地對應用程式中的所有 Kafka Streams 處理器應用 auto start false,但通常希望只有個別選定的 Kafka Streams 處理器不自動啟動。例如,假設您的應用程式中有三個不同的函式(處理器),並且對於其中一個處理器,您不希望它作為應用程式啟動的一部分自動啟動。以下是這種情況的一個示例。

@Bean
public Function<KStream<?, ?>, KStream<?, ?>> process1() {

}

@Bean
public Consumer<KStream<?, ?>> process2() {

}

@Bean
public BiFunction<KStream<?, ?>, KTable<?, ?>, KStream<?, ?>> process3() {

}

在上述場景中,如果您將 spring.kafka.streams.auto-startup 設定為 false,則所有處理器都不會在應用程式啟動期間自動啟動。在這種情況下,您必須透過呼叫底層 StreamsBuilderFactoryManager 上的 start() 來以程式設計方式啟動它們,如上所述。但是,如果我們要選擇性地停用一個處理器,則必須在該處理器的單個繫結上設定 auto-startup。假設我們不希望 process3 函式自動啟動。這是一個具有兩個輸入繫結(process3-in-0process3-in-1)的 BiFunction。為了避免此處理器自動啟動,您可以選擇任何一個輸入繫結並在其上設定 auto-startup。選擇哪個繫結無關緊要;如果您願意,可以在兩個繫結上都將 auto-startup 設定為 false,但一個就足夠了。因為它們共享同一個工廠 bean,所以您不必在兩個繫結上都將 autoStartup 設定為 false,但為了清晰起見,這樣做可能更有意義。

以下是您可以用於停用此處理器自動啟動的 Spring Cloud Stream 屬性。

spring.cloud.stream.bindings.process3-in-0.consumer.auto-startup: false

spring.cloud.stream.bindings.process3-in-1.consumer.auto-startup: false

然後,您可以手動啟動處理器,無論是使用 REST 端點還是使用 BindingsEndpoint API,如下所示。為此,您需要確保類路徑上存在 Spring Boot actuator 依賴項。

curl -d '{"state":"STARTED"}' -H "Content-Type: application/json" -X POST https://:8080/actuator/bindings/process3-in-0

@Autowired
BindingsEndpoint endpoint;

@Bean
public ApplicationRunner runner() {
    return args -> {
        endpoint.changeState("process3-in-0", State.STARTED);
    };
}

有關此機制的更多詳細資訊,請參閱參考文件中的 此部分

當透過停用 auto-startup 控制繫結(如本節所述)時,請注意這僅適用於消費者繫結。換句話說,如果您使用生產者繫結 process3-out-0,它在停用處理器自動啟動方面沒有任何效果,儘管此生產者繫結與消費者繫結使用相同的 StreamsBuilderFactoryBean

2.23. 使用 Spring Cloud Sleuth 進行追蹤

當 Spring Cloud Sleuth 在基於 Spring Cloud Stream Kafka Streams binder 的應用程式的類路徑中時,其消費者和生產者都會自動透過跟蹤資訊進行檢測。但是,為了跟蹤任何特定於應用程式的操作,這些操作需要由使用者程式碼顯式檢測。這可以透過在應用程式中注入 Spring Cloud Sleuth 的 KafkaStreamsTracing bean,然後透過此注入的 bean 呼叫各種 Kafka Streams 操作來完成。以下是使用它的一些示例。

@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> clicks(KafkaStreamsTracing kafkaStreamsTracing) {
    return (userClicksStream, userRegionsTable) -> (userClicksStream
            .transformValues(kafkaStreamsTracing.peek("span-1", (key, value) -> LOG.info("key/value: " + key + "/" + value)))
            .leftJoin(userRegionsTable, (clicks, region) -> new RegionWithClicks(region == null ?
                            "UNKNOWN" : region, clicks),
                    Joined.with(Serdes.String(), Serdes.Long(), null))
            .transform(kafkaStreamsTracing.map("span-2", (key, value) -> {
                LOG.info("Click Info: " + value.getRegion() + "/" + value.getClicks());
                return new KeyValue<>(value.getRegion(),
                        value.getClicks());
            }))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
            .reduce(Long::sum, Materialized.as(CLICK_UPDATES))
            .toStream());
}

在上面的示例中,有兩個地方添加了顯式跟蹤檢測。首先,我們正在記錄來自傳入 KStream 的鍵/值資訊。當記錄此資訊時,相關的 span 和 trace ID 也將被記錄,以便監控系統可以跟蹤它們並與相同的 span id 相關聯。其次,當我們呼叫 map 操作時,我們不是直接在 KStream 類上呼叫它,而是將其包裝在 transform 操作中,然後從 KafkaStreamsTracing 呼叫 map。在這種情況下,記錄的訊息也將包含 span ID 和 trace ID。

這是另一個示例,我們使用低階 transformer API 訪問各種 Kafka Streams 頭部。當 spring-cloud-sleuth 在類路徑中時,所有跟蹤頭部也可以這樣訪問。

@Bean
public Function<KStream<String, String>, KStream<String, String>> process(KafkaStreamsTracing kafkaStreamsTracing) {
    return input -> input.transform(kafkaStreamsTracing.transformer(
            "transformer-1",
            () -> new Transformer<String, String, KeyValue<String, String>>() {
                ProcessorContext context;

                @Override
                public void init(ProcessorContext context) {
                    this.context = context;
                }

                @Override
                public KeyValue<String, String> transform(String key, String value) {
                    LOG.info("Headers: " + this.context.headers());
                    LOG.info("K/V:" + key + "/" + value);
                    // More transformations, business logic execution, etc. go here.
                    return KeyValue.pair(key, value);
                }

                @Override
                public void close() {
                }
            }));
}

2.24. 配置選項

本節包含 Kafka Streams binder 使用的配置選項。

有關繫結器的常見配置選項和屬性,請參閱核心文件

2.24.1. Kafka Streams Binder 屬性

以下屬性在繫結器級別可用,並且必須以 spring.cloud.stream.kafka.streams.binder. 為字首。Kafka Streams 繫結器中重複使用的任何 Kafka 繫結器提供的屬性必須以 spring.cloud.stream.kafka.streams.binder 為字首,而不是 spring.cloud.stream.kafka.binder。此規則的唯一例外是定義 Kafka 引導伺服器屬性時,在這種情況下,任何字首都可以。

configuration

包含 Apache Kafka Streams API 相關屬性的鍵/值對對映。此屬性必須以 spring.cloud.stream.kafka.streams.binder. 為字首。以下是一些使用此屬性的示例。

spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000

有關 Streams 配置中可能包含的所有屬性的更多資訊,請參閱 Apache Kafka Streams 文件中的 StreamsConfig JavaDocs。您可以透過此屬性設定 StreamsConfig 中的所有配置。當使用此屬性時,它適用於整個應用程式,因為這是一個 binder 級別屬性。如果應用程式中有多個處理器,所有處理器都將獲取這些屬性。在 application.id 等屬性的情況下,這可能會出現問題,因此您必須仔細檢查 StreamsConfig 中的屬性如何使用此 binder 級別的 configuration 屬性進行對映。

functions.<function-bean-name>.applicationId

僅適用於函式式處理器。這可用於在應用程式中為每個函式設定應用程式 ID。在多個函式的情況下,這是一種設定應用程式 ID 的便捷方式。

functions.<function-bean-name>.configuration

僅適用於函式式處理器。包含 Apache Kafka Streams API 相關屬性的鍵/值對對映。這類似於上面描述的 binder 級別的 configuration 屬性,但此級別的 configuration 屬性僅限於命名函式。當您有多個處理器並且希望根據特定函式限制對配置的訪問時,您可能希望使用此屬性。此處可以使用所有 StreamsConfig 屬性。

brokers

Broker URL

預設值:localhost

zkNodes

Zookeeper URL

預設值:localhost

deserializationExceptionHandler

反序列化錯誤處理程式型別。此處理程式應用於繫結器級別,因此應用於應用程式中的所有輸入繫結。有一種更細粒度的方式可以在消費者繫結級別控制它。可能的值為 - logAndContinuelogAndFailskipAndContinuesendToDlq

預設值:logAndFail

applicationId

在繫結器級別全域性設定 Kafka Streams 應用程式的 application.id 的便捷方式。如果應用程式包含多個函式或 StreamListener 方法,則應以不同方式設定應用程式 ID。詳細討論應用程式 ID 的設定請參見上文。

預設值:應用程式將生成一個靜態應用程式 ID。有關詳細資訊,請參閱應用程式 ID 部分。

stateStoreRetry.maxAttempts

嘗試連線到狀態儲存的最大嘗試次數。

預設值:1

stateStoreRetry.backoffPeriod

重試時連線到狀態儲存的退避週期。

預設值:1000 毫秒

consumerProperties

繫結器級別的任意消費者屬性。

producerProperties

繫結器級別的任意生產者屬性。

includeStoppedProcessorsForHealthCheck

當透過 actuator 停止處理器的繫結時,預設情況下此處理器將不參與健康檢查。將此屬性設定為 true 以啟用所有處理器的健康檢查,包括那些當前透過繫結 actuator 端點停止的處理器。

預設值:false

2.24.2. Kafka Streams 生產者屬性

以下屬性適用於 Kafka Streams 生產者,並且必須以 spring.cloud.stream.kafka.streams.bindings.<binding name>.producer. 為字首。為方便起見,如果存在多個輸出繫結並且它們都需要一個公共值,則可以透過使用字首 spring.cloud.stream.kafka.streams.default.producer. 進行配置。

keySerde

要使用的鍵序列化器

預設值:參見上面關於訊息解/序列化的討論

valueSerde

要使用的值序列化器

預設值:參見上面關於訊息解/序列化的討論

useNativeEncoding

啟用/停用原生編碼的標誌

預設值:true

streamPartitionerBeanName

用於消費者的自定義出站分割槽器 bean 名稱。應用程式可以提供自定義的 StreamPartitioner 作為 Spring bean,並且可以將此 bean 的名稱提供給生產者以使用,而不是預設的。

預設值:參見上面關於出站分割槽支援的討論。

producedAs

處理器正在生產到的 sink 元件的自定義名稱。

預設值:none (由 Kafka Streams 生成)

2.24.3. Kafka Streams 消費者屬性

以下屬性適用於 Kafka Streams 消費者,並且必須以 spring.cloud.stream.kafka.streams.bindings.<binding-name>.consumer. 為字首。為方便起見,如果存在多個輸入繫結並且它們都需要一個公共值,則可以透過使用字首 spring.cloud.stream.kafka.streams.default.consumer. 進行配置。

applicationId

為每個輸入繫結設定 application.id。這僅適用於基於 StreamListener 的處理器,對於基於函式的處理器,請參閱上面概述的其他方法。

預設值:參見上文。

keySerde

要使用的鍵序列化器

預設值:參見上面關於訊息解/序列化的討論

valueSerde

要使用的值序列化器

預設值:參見上面關於訊息解/序列化的討論

materializedAs

使用傳入 KTable 型別時要具體化的狀態儲存

預設值:none

useNativeDecoding

啟用/停用原生解碼的標誌

預設值:true

dlqName

DLQ 主題名稱。

預設值:參見上面關於錯誤處理和 DLQ 的討論。

startOffset

如果沒有已提交的偏移量可供消費,則從哪個偏移量開始。這主要用於消費者首次從主題消費時。Kafka Streams 使用 earliest 作為預設策略,binder 使用相同的預設值。這可以透過此屬性覆蓋為 latest

預設值:earliest

注意:在消費者上使用 resetOffsets 對 Kafka Streams binder 沒有影響。與基於訊息通道的 binder 不同,Kafka Streams binder 不會按需查詢開頭或結尾。

deserializationExceptionHandler

反序列化錯誤處理程式型別。此處理程式應用於每個消費者繫結,而不是之前描述的繫結器級別屬性。可能的值為 - logAndContinuelogAndFailskipAndContinuesendToDlq

預設值:logAndFail

timestampExtractorBeanName

特定時間戳提取器 bean 名稱,用於消費者。應用程式可以提供 TimestampExtractor 作為 Spring bean,並且可以將此 bean 的名稱提供給消費者以使用,而不是預設的。

預設值:參見上面關於時間戳提取器的討論。

eventTypes

此繫結支援的事件型別列表,逗號分隔。

預設值:none

eventTypeHeaderKey

透過此繫結傳入的每條記錄上的事件型別頭鍵。

預設值:event_type

consumedAs

處理器從中消費的源元件的自定義名稱。

預設值:none (由 Kafka Streams 生成)

2.24.4. 關於併發的特別說明

在 Kafka Streams 中,您可以使用 num.stream.threads 屬性控制處理器可以建立的執行緒數。您可以透過上述繫結器、函式、生產者或消費者級別的各種 configuration 選項來完成此操作。您也可以為此目的使用 Spring Cloud Stream 核心提供的 concurrency 屬性。使用此屬性時,您需要將其應用於消費者。當函式或 StreamListener 中有多個輸入繫結時,請將其設定在第一個輸入繫結上。例如,當設定 spring.cloud.stream.bindings.process-in-0.consumer.concurrency 時,它將被繫結器轉換為 num.stream.threads。如果您有多個處理器,並且一個處理器定義了繫結級別併發,而其他處理器沒有,則那些沒有繫結級別併發的處理器將回退到透過 spring.cloud.stream.kafka.streams.binder.configuration.num.stream.threads 指定的繫結器範圍屬性。如果此繫結器配置不可用,則應用程式將使用 Kafka Streams 設定的預設值。

3. 提示、技巧和秘籍

3.1. Kafka 簡單 DLQ

3.1.1. 問題陳述

作為開發人員,我希望編寫一個從 Kafka 主題處理記錄的消費者應用程式。但是,如果在處理過程中發生一些錯誤,我不想讓應用程式完全停止。相反,我想將錯誤記錄傳送到 DLT(死信主題),然後繼續處理新記錄。

3.1.2. 解決方案

解決此問題的方法是使用 Spring Cloud Stream 中的 DLQ 功能。為了便於討論,我們假設以下是我們的處理器函式。

@Bean
public Consumer<byte[]> processData() {
  return s -> {
     throw new RuntimeException();
  };

這是一個非常簡單的函式,它對處理的所有記錄都丟擲異常,但您可以將此函式擴充套件到任何其他類似情況。

為了將錯誤記錄傳送到 DLT,我們需要提供以下配置。

spring.cloud.stream:
  bindings:
   processData-in-0:
     group: my-group
     destination: input-topic
 kafka:
   bindings:
     processData-in-0:
       consumer:
         enableDlq: true
         dlqName: input-topic-dlq

為了啟用 DLQ,應用程式必須提供一個組名。匿名消費者無法使用 DLQ 功能。我們還需要透過將 Kafka 消費者繫結上的 enableDLQ 屬性設定為 true 來啟用 DLQ。最後,我們可以選擇性地透過在 Kafka 消費者繫結上提供 dlqName 來提供 DLT 名稱,否則在此情況下預設為 input-topic-dlq.my-group.error

請注意,在上面提供的示例消費者中,有效載荷的型別是 byte[]。預設情況下,Kafka binder 中的 DLQ 生產者期望有效載荷型別為 byte[]。如果不是這種情況,那麼我們需要提供正確序列化器的配置。例如,讓我們將消費者函式重新編寫如下

@Bean
public Consumer<String> processData() {
  return s -> {
     throw new RuntimeException();
  };
}

現在,我們需要告訴 Spring Cloud Stream,我們希望在寫入 DLT 時如何序列化資料。以下是此場景的修改配置

spring.cloud.stream:
  bindings:
   processData-in-0:
     group: my-group
     destination: input-topic
 kafka:
   bindings:
     processData-in-0:
       consumer:
         enableDlq: true
         dlqName: input-topic-dlq
         dlqProducerProperties:
           configuration:
             value.serializer: org.apache.kafka.common.serialization.StringSerializer

3.2. 帶高階重試選項的 DLQ

3.2.1. 問題陳述

這與上面的方法類似,但作為開發人員,我希望配置重試的處理方式。

3.2.2. 解決方案

如果您遵循了上述方法,那麼當處理遇到錯誤時,您將獲得 Kafka binder 中內建的預設重試選項。

預設情況下,binder 最多重試 3 次,初始延遲 1 秒,每次退避乘數為 2.0,最大延遲為 10 秒。您可以如下更改所有這些配置

spring.cloud.stream.bindings.processData-in-0.consumer.maxAtttempts
spring.cloud.stream.bindings.processData-in-0.consumer.backOffInitialInterval
spring.cloud.stream.bindings.processData-in-0.consumer.backOffMultipler
spring.cloud.stream.bindings.processData-in-0.consumer.backOffMaxInterval

如果需要,您還可以透過提供布林值對映來提供可重試異常列表。例如,

spring.cloud.stream.bindings.processData-in-0.consumer.retryableExceptions.java.lang.IllegalStateException=true
spring.cloud.stream.bindings.processData-in-0.consumer.retryableExceptions.java.lang.IllegalArgumentException=false

預設情況下,未在上述對映中列出的任何異常都將被重試。如果不需要,可以透過提供以下內容停用:

spring.cloud.stream.bindings.processData-in-0.consumer.defaultRetryable=false

您還可以提供自己的 RetryTemplate 並將其標記為 @StreamRetryTemplate,它將被繫結器掃描並使用。這在您需要更復雜的重試策略和策略時很有用。

如果您有多個 @StreamRetryTemplate bean,那麼您可以使用以下屬性指定您的繫結想要使用哪一個:

spring.cloud.stream.bindings.processData-in-0.consumer.retry-template-name=<your-retry-template-bean-name>

3.3. 使用 DLQ 處理反序列化錯誤

3.3.1. 問題陳述

我有一個處理器,在 Kafka 消費者中遇到反序列化異常。我期望 Spring Cloud Stream DLQ 機制能捕獲這種情況,但它沒有。我該如何處理?

3.3.2. 解決方案

當 Kafka 消費者丟擲不可恢復的反序列化異常時,Spring Cloud Stream 提供的正常 DLQ 機制將無濟於事。這是因為此異常甚至在消費者 poll() 方法返回之前就發生了。Spring for Apache Kafka 專案提供了一些很好的方法來幫助 binder 處理這種情況。讓我們探討一下。

假設這是我們的函式

@Bean
public Consumer<String> functionName() {
    return s -> {
        System.out.println(s);
    };
}

這是一個簡單的函式,它接受一個 String 引數。

我們希望繞過 Spring Cloud Stream 提供的訊息轉換器,而使用原生反序列化器。對於 String 型別,這沒什麼意義,但對於更復雜的型別,如 AVRO 等,您必須依賴外部反序列化器,因此希望將轉換委託給 Kafka。

現在當消費者接收到資料時,假設有一個錯誤的記錄導致了反序列化錯誤,例如有人傳遞了一個 Integer 而不是一個 String。在這種情況下,如果您在應用程式中不採取任何措施,異常將透過鏈傳播,並且您的應用程式最終將退出。

為了處理此問題,您可以新增一個 ListenerContainerCustomizer @Bean,它配置一個 SeekToCurrentErrorHandler。此 SeekToCurrentErrorHandler 配置有一個 DeadLetterPublishingRecoverer。我們還需要為消費者配置一個 ErrorHandlingDeserializer。這聽起來很複雜,但實際上,在這種情況下,它歸結為這 3 個 bean。

@Bean
	public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer(SeekToCurrentErrorHandler errorHandler) {
		return (container, dest, group) -> {
			container.setErrorHandler(errorHandler);
		};
	}
	@Bean
	public SeekToCurrentErrorHandler errorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
		return new SeekToCurrentErrorHandler(deadLetterPublishingRecoverer);
	}
	@Bean
	public DeadLetterPublishingRecoverer publisher(KafkaOperations bytesTemplate) {
		return new DeadLetterPublishingRecoverer(bytesTemplate);
	}

讓我們分析一下它們。第一個是 ListenerContainerCustomizer bean,它接受一個 SeekToCurrentErrorHandler。容器現在使用該特定錯誤處理程式進行自定義。您可以在此處瞭解有關容器自定義的更多資訊。

第二個 bean 是 SeekToCurrentErrorHandler,它配置為釋出到 DLT。有關 SeekToCurrentErrorHandler 的更多詳細資訊,請參見此處

第三個 bean 是 DeadLetterPublishingRecoverer,它最終負責傳送到 DLT。預設情況下,DLT 主題的命名格式為 ORIGINAL_TOPIC_NAME.DLT。但是您可以更改它。有關詳細資訊,請參見文件

我們還需要透過應用程式配置配置一個 ErrorHandlingDeserializer

ErrorHandlingDeserializer 委託給實際的反序列化器。在發生錯誤時,它將記錄的鍵/值設定為 null,幷包含訊息的原始位元組。然後它在頭部設定異常並將此記錄傳遞給監聽器,監聽器然後呼叫已註冊的錯誤處理程式。

以下是所需的配置

spring.cloud.stream:
  function:
    definition: functionName
  bindings:
    functionName-in-0:
      group: group-name
      destination: input-topic
      consumer:
       use-native-decoding: true
  kafka:
    bindings:
      functionName-in-0:
        consumer:
          enableDlq: true
          dlqName: dlq-topic
          dlqProducerProperties:
            configuration:
              value.serializer: org.apache.kafka.common.serialization.StringSerializer
          configuration:
            value.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
            spring.deserializer.value.delegate.class: org.apache.kafka.common.serialization.StringDeserializer

我們透過繫結上的 configuration 屬性提供了 ErrorHandlingDeserializer。我們還指出了要委託的實際反序列化器是 StringDeserializer

請記住,上述任何 DLQ 屬性都與本方案中的討論無關。它們純粹是為了解決任何應用程式級別的錯誤。

3.4. Kafka binder 中的基本偏移量管理

3.4.1. 問題陳述

我想編寫一個 Spring Cloud Stream Kafka 消費者應用程式,但不確定它如何管理 Kafka 消費者偏移量。你能解釋一下嗎?

3.4.2. 解決方案

我們鼓勵您閱讀 文件 中的相關部分,以獲得全面的理解。

簡而言之

Kafka 預設支援兩種型別的起始偏移量 - earliestlatest。它們的語義從名稱中不言自明。

假設您是第一次執行消費者。如果您的 Spring Cloud Stream 應用程式中缺少 group.id,那麼它就變成了匿名消費者。每當您有匿名消費者時,Spring Cloud Stream 應用程式預設將從主題分割槽中可用的 latest 偏移量開始。另一方面,如果您顯式指定了 group.id,那麼 Spring Cloud Stream 應用程式預設將從主題分割槽中可用的 earliest 偏移量開始。

在上述兩種情況(具有顯式組和匿名組的消費者)下,可以透過使用屬性 spring.cloud.stream.kafka.bindings.<binding-name>.consumer.startOffset 並將其設定為 earliestlatest 來切換起始偏移量。

現在,假設您之前已經執行過消費者,現在又重新啟動它。在這種情況下,上述情況中的起始偏移量語義不再適用,因為消費者為消費者組找到了一個已提交的偏移量(對於匿名消費者,儘管應用程式沒有提供 group.id,但 binder 會為您自動生成一個)。它只是從上次提交的偏移量開始。即使您提供了 startOffset 值,這也是如此。

但是,您可以透過使用 resetOffsets 屬性來覆蓋消費者從上次提交偏移量開始的預設行為。為此,請將屬性 spring.cloud.stream.kafka.bindings.<binding-name>.consumer.resetOffsets 設定為 true(預設值為 false)。然後確保提供 startOffset 值(earliestlatest)。當您這樣做並啟動消費者應用程式時,每次啟動時,它都會像第一次啟動一樣開始,並忽略分割槽的所有已提交偏移量。

3.5. 在 Kafka 中定位任意偏移量

3.5.1. 問題陳述

使用 Kafka binder,我知道它可以將偏移量設定為 earliestlatest,但我有一個需求,需要將偏移量查詢至中間的某個任意偏移量。Spring Cloud Stream Kafka binder 有沒有辦法實現這一點?

3.5.2. 解決方案

之前我們看到了 Kafka binder 如何幫助您解決基本的偏移量管理。預設情況下,binder 不允許您回溯到任意偏移量,至少透過我們在該方案中看到的那種機制。但是,binder 提供了一些低階策略來實現此用例。讓我們探討一下。

首先,當您想重置到 earliestlatest 之外的任意偏移量時,請確保將 resetOffsets 配置保留為預設值,即 false。然後您必須提供一個 KafkaBindingRebalanceListener 型別的自定義 bean,它將被注入到所有消費者繫結中。它是一個帶有幾個預設方法的介面,但我們感興趣的方法是:

/**
	 * Invoked when partitions are initially assigned or after a rebalance. Applications
	 * might only want to perform seek operations on an initial assignment. While the
	 * 'initial' argument is true for each thread (when concurrency is greater than 1),
	 * implementations should keep track of exactly which partitions have been sought.
	 * There is a race in that a rebalance could occur during startup and so a topic/
	 * partition that has been sought on one thread may be re-assigned to another
	 * thread and you may not wish to re-seek it at that time.
	 * @param bindingName the name of the binding.
	 * @param consumer the consumer.
	 * @param partitions the partitions.
	 * @param initial true if this is the initial assignment on the current thread.
	 */
	default void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer,
			Collection<TopicPartition> partitions, boolean initial) {
		// do nothing
	}

讓我們看看細節。

本質上,此方法將在主題分割槽的初始分配期間或再平衡之後每次呼叫。為了更好地說明,讓我們假設我們的主題是 foo,它有 4 個分割槽。最初,我們只在組中啟動一個消費者,此消費者將從所有分割槽消費。當消費者第一次啟動時,所有 4 個分割槽都將進行初始分配。但是,我們不希望分割槽從預設值開始消費(因為我們定義了一個組,所以是 earliest),而是希望每個分割槽在查詢任意偏移量後開始消費。想象一下您有一個業務用例,需要從如下所示的某些偏移量開始消費。

Partition   start offset

0           1000
1           2000
2           2000
3           1000

這可以透過如下實現上述方法來完成。

@Override
public void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer, Collection<TopicPartition> partitions, boolean initial) {

    Map<TopicPartition, Long> topicPartitionOffset = new HashMap<>();
    topicPartitionOffset.put(new TopicPartition("foo", 0), 1000L);
    topicPartitionOffset.put(new TopicPartition("foo", 1), 2000L);
    topicPartitionOffset.put(new TopicPartition("foo", 2), 2000L);
    topicPartitionOffset.put(new TopicPartition("foo", 3), 1000L);

    if (initial) {
        partitions.forEach(tp -> {
            if (topicPartitionOffset.containsKey(tp)) {
                final Long offset = topicPartitionOffset.get(tp);
                try {
                    consumer.seek(tp, offset);
                }
                catch (Exception e) {
                    // Handle excpetions carefully.
                }
            }
        });
    }
}

這只是一個粗略的實現。實際用例比這複雜得多,您需要相應地進行調整,但這確實為您提供了一個基本草圖。當消費者 seek 失敗時,它可能會丟擲一些執行時異常,您需要決定在這種情況下該怎麼做。

3.5.3. 如果我們啟動一個具有相同 group id 的第二個消費者會怎樣?

當我們新增第二個消費者時,會發生重新平衡,並且一些分割槽將被移動。假設新的消費者獲得了分割槽 23。當這個新的 Spring Cloud Stream 消費者呼叫這個 onPartitionsAssigned 方法時,它將看到這是該消費者分割槽 23 的初始分配。因此,它將由於 initial 引數上的條件檢查而執行查詢操作。對於第一個消費者,它現在只有分割槽 01。但是,對於此消費者,它只是一個重新平衡事件,不被視為初始分配。因此,它不會由於 initial 引數上的條件檢查而重新查詢給定的偏移量。

3.6. 如何使用 Kafka binder 手動確認?

3.6.1. 問題陳述

使用 Kafka binder,我知道它可以在我的消費者中手動確認訊息。我該怎麼做?

3.6.2. 解決方案

預設情況下,Kafka binder 委託給 Spring for Apache Kafka 專案中的預設提交設定。Spring Kafka 中的預設 ackModebatch。有關詳細資訊,請參閱 此處

在某些情況下,您希望停用此預設提交行為並依賴手動提交。以下步驟允許您這樣做。

將屬性spring.cloud.stream.kafka.bindings.<binding-name>.consumer.ackMode設定為MANUALMANUAL_IMMEDIATE。這樣設定後,消費者方法收到的訊息中將包含一個名為kafka_acknowledgment(來自KafkaHeaders.ACKNOWLEDGMENT)的標頭。

例如,假設這是您的消費者方法。

@Bean
public Consumer<Message<String>> myConsumer() {
    return msg -> {
        Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
        if (acknowledgment != null) {
         System.out.println("Acknowledgment provided");
         acknowledgment.acknowledge();
        }
    };
}

然後,將屬性spring.cloud.stream.bindings.myConsumer-in-0.consumer.ackMode設定為MANUALMANUAL_IMMEDIATE

3.7. 如何在 Spring Cloud Stream 中覆蓋預設繫結名稱?

3.7.1. 問題陳述

Spring Cloud Stream 根據函式定義和簽名建立預設繫結,但我如何將這些繫結覆蓋為更符合領域友好的名稱?

3.7.2. 解決方案

假設以下是您的函式簽名。

@Bean
public Function<String, String> uppercase(){
...
}

預設情況下,Spring Cloud Stream 將建立如下繫結。

  1. uppercase-in-0

  2. uppercase-out-0

您可以使用以下屬性覆蓋這些繫結。

spring.cloud.stream.function.bindings.uppercase-in-0=my-transformer-in
spring.cloud.stream.function.bindings.uppercase-out-0=my-transformer-out

此後,所有繫結屬性都必須在新名稱my-transformer-inmy-transformer-out上進行設定。

以下是 Kafka Streams 和多個輸入的另一個示例。

@Bean
public BiFunction<KStream<String, Order>, KTable<String, Account>, KStream<String, EnrichedOrder>> processOrder() {
...
}

預設情況下,Spring Cloud Stream 將為該函式建立三個不同的繫結名稱。

  1. processOrder-in-0

  2. processOrder-in-1

  3. processOrder-out-0

每次您想對這些繫結進行一些配置時,都必須使用這些繫結名稱。您不喜歡這樣,並且希望使用更符合領域友好且可讀的繫結名稱,例如:

  1. orders

  2. accounts

  3. enrichedOrders

您只需設定以下三個屬性即可輕鬆做到這一點:

  1. spring.cloud.stream.function.bindings.processOrder-in-0=orders

  2. spring.cloud.stream.function.bindings.processOrder-in-1=accounts

  3. spring.cloud.stream.function.bindings.processOrder-out-0=enrichedOrders

一旦您這樣做了,它將覆蓋預設繫結名稱,並且您想要設定的任何屬性都必須在新繫結名稱上。

3.8. 如何將訊息鍵作為記錄的一部分發送?

3.8.1. 問題陳述

我需要將一個鍵與記錄的有效載荷一起傳送,Spring Cloud Stream 中有什麼方法可以做到嗎?

3.8.2. 解決方案

通常需要將關聯資料結構(如對映)作為帶有鍵和值的記錄傳送。Spring Cloud Stream 允許您以直接的方式執行此操作。以下是實現此目的的基本藍圖,但您可能需要根據自己的特定用例進行調整。

這是一個示例生產者方法(又稱Supplier)。

@Bean
public Supplier<Message<String>> supplier() {
    return () -> MessageBuilder.withPayload("foo").setHeader(KafkaHeaders.MESSAGE_KEY, "my-foo").build();
}

這是一個傳送帶有String有效載荷和鍵的訊息的簡單函式。請注意,我們使用KafkaHeaders.MESSAGE_KEY將鍵設定為訊息頭。

如果您想更改預設的kafka_messageKey鍵,那麼在配置中,我們需要指定此屬性:

spring.cloud.stream.kafka.bindings.supplier-out-0.producer.messageKeyExpression=headers['my-special-key']

請注意,我們使用繫結名稱supplier-out-0,因為這是我們的函式名稱,請相應更新。

然後,我們在生成訊息時使用這個新鍵。

3.9. 如何使用原生序列化器和反序列化器,而不是 Spring Cloud Stream 進行的訊息轉換?

3.9.1. 問題陳述

我不想使用 Spring Cloud Stream 中的訊息轉換器,而是想使用 Kafka 中的原生序列化器和反序列化器。預設情況下,Spring Cloud Stream 使用其內部內建的訊息轉換器來處理此轉換。我如何繞過此操作並將責任委託給 Kafka?

3.9.2. 解決方案

這真的很容易做到。

您只需提供以下屬性即可啟用原生序列化。

spring.cloud.stream.kafka.bindings.<binding-name>.producer.useNativeEncoding: true

然後,您還需要設定序列化器。有兩種方法可以做到這一點。

spring.cloud.stream.kafka.bindings.<binding-name>.producer.configurarion.key.serializer: org.apache.kafka.common.serialization.StringSerializer
spring.cloud.stream.kafka.bindings.<binding-name>.producer.configurarion.value.serializer: org.apache.kafka.common.serialization.StringSerializer

或者使用繫結器配置。

spring.cloud.stream.kafka.binder.configurarion.key.serializer: org.apache.kafka.common.serialization.StringSerializer
spring.cloud.stream.kafka.binder.configurarion.value.serializer: org.apache.kafka.common.serialization.StringSerializer

當使用繫結器方式時,它適用於所有繫結,而將它們設定在繫結上則是針對每個繫結。

在反序列化方面,您只需將反序列化器作為配置提供即可。

例如,

spring.cloud.stream.kafka.bindings.<binding-name>.consumer.configurarion.key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
spring.cloud.stream.kafka.bindings.<binding-name>.producer.configurarion.value.deserializer: org.apache.kafka.common.serialization.StringDeserializer

您也可以在繫結器級別設定它們。

有一個可選屬性,您可以設定它來強制進行原生解碼。

spring.cloud.stream.kafka.bindings.<binding-name>.consumer.useNativeDecoding: true

但是,對於 Kafka 繫結器來說,這是不必要的,因為當它到達繫結器時,Kafka 已經使用配置的反序列化器對它們進行了反序列化。

3.10. 解釋 Kafka Streams binder 中偏移量重置的工作原理

3.10.1. 問題陳述

預設情況下,Kafka Streams 繫結器對於新消費者總是從最早的偏移量開始。有時,應用程式需要或有益於從最新的偏移量開始。Kafka Streams 繫結器允許您這樣做。

3.10.2. 解決方案

在我們檢視解決方案之前,讓我們先看以下場景。

@Bean
public BiConsumer<KStream<Object, Object>, KTable<Object, Object>> myBiConsumer{
    (s, t) -> s.join(t, ...)
    ...
}

我們有一個需要兩個輸入繫結的BiConsumer Bean。在這種情況下,第一個繫結用於KStream,第二個繫結用於KTable。首次執行此應用程式時,預設情況下,兩個繫結都從earliest偏移量開始。如果由於某些要求,我想從latest偏移量開始怎麼辦?您可以透過啟用以下屬性來做到這一點。

spring.cloud.stream.kafka.streams.bindings.myBiConsumer-in-0.consumer.startOffset: latest
spring.cloud.stream.kafka.streams.bindings.myBiConsumer-in-1.consumer.startOffset: latest

如果您只想讓一個繫結從latest偏移量開始,而另一個繫結從預設的earliest消費,則將後者繫結排除在配置之外。

請記住,一旦存在已提交的偏移量,這些設定將生效,並且已提交的偏移量將優先。

3.11. 在 Kafka 中跟蹤成功傳送記錄(生產)

3.11.1. 問題陳述

我有一個 Kafka 生產者應用程式,我想跟蹤所有成功的傳送。

3.11.2. 解決方案

讓我們假設應用程式中有以下供應商。

@Bean
	public Supplier<Message<String>> supplier() {
		return () -> MessageBuilder.withPayload("foo").setHeader(KafkaHeaders.MESSAGE_KEY, "my-foo").build();
	}

然後,我們需要定義一個新的MessageChannel bean 來捕獲所有成功的傳送資訊。

@Bean
	public MessageChannel fooRecordChannel() {
		return new DirectChannel();
	}

接下來,在應用程式配置中定義此屬性,以提供recordMetadataChannel的 bean 名稱。

spring.cloud.stream.kafka.bindings.supplier-out-0.producer.recordMetadataChannel: fooRecordChannel

此時,成功的傳送資訊將傳送到fooRecordChannel

您可以編寫一個如下所示的IntegrationFlow來檢視資訊。

@Bean
public IntegrationFlow integrationFlow() {
    return f -> f.channel("fooRecordChannel")
                 .handle((payload, messageHeaders) -> payload);
}

handle方法中,有效載荷是傳送到 Kafka 的內容,訊息頭包含一個名為kafka_recordMetadata的特殊鍵。它的值是RecordMetadata,其中包含有關主題分割槽、當前偏移量等資訊。

3.12. 在 Kafka 中新增自定義頭對映器

3.12.1. 問題陳述

我有一個 Kafka 生產者應用程式設定了一些頭,但它們在消費者應用程式中丟失了。這是為什麼?

3.12.2. 解決方案

在正常情況下,這應該沒問題。

想象一下,您有以下生產者。

@Bean
public Supplier<Message<String>> supply() {
    return () -> MessageBuilder.withPayload("foo").setHeader("foo", "bar").build();
}

在消費者端,您應該仍然看到頭“foo”,以下內容不應給您帶來任何問題。

@Bean
public Consumer<Message<String>> consume() {
    return s -> {
        final String foo = (String)s.getHeaders().get("foo");
        System.out.println(foo);
    };
}

如果您在應用程式中提供了自定義頭對映器,那麼這將不起作用。假設您的應用程式中有一個空的KafkaHeaderMapper

@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper() {
    return new KafkaHeaderMapper() {
        @Override
        public void fromHeaders(MessageHeaders headers, Headers target) {

        }

        @Override
        public void toHeaders(Headers source, Map<String, Object> target) {

        }
    };
}

如果這是您的實現,那麼您將錯過消費者端的foo頭。您可能在那些KafkaHeaderMapper方法中有一些邏輯。您需要以下內容來填充foo頭。

@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper() {
    return new KafkaHeaderMapper() {
        @Override
        public void fromHeaders(MessageHeaders headers, Headers target) {
            final String foo = (String) headers.get("foo");
            target.add("foo", foo.getBytes());
        }

        @Override
        public void toHeaders(Headers source, Map<String, Object> target) {
            final Header foo = source.lastHeader("foo");
			target.put("foo", new String(foo.value()));
        }
    }

這將正確地將foo頭從生產者填充到消費者。

3.12.3. 關於 ID 頭的特別說明

在 Spring Cloud Stream 中,id頭是一個特殊的頭,但有些應用程式可能希望擁有特殊的自定義 id 頭,例如custom-idIDId。第一個(custom-id)無需任何自定義頭對映器即可從生產者傳播到消費者。但是,如果您使用框架保留的id頭的變體(例如IDIdiD等)進行生產,那麼您將遇到框架內部問題。有關此用例的更多上下文,請參閱此StackOverflow 帖子。在這種情況下,您必須使用自定義KafkaHeaderMapper來對映大小寫敏感的 id 頭。例如,假設您有以下生產者。

@Bean
public Supplier<Message<String>> supply() {
    return () -> MessageBuilder.withPayload("foo").setHeader("Id", "my-id").build();
}

上面的頭Id將從消費端消失,因為它與框架的id頭衝突。您可以提供自定義KafkaHeaderMapper來解決此問題。

@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper1() {
    return new KafkaHeaderMapper() {
        @Override
        public void fromHeaders(MessageHeaders headers, Headers target) {
            final String myId = (String) headers.get("Id");
			target.add("Id", myId.getBytes());
        }

        @Override
        public void toHeaders(Headers source, Map<String, Object> target) {
            final Header Id = source.lastHeader("Id");
			target.put("Id", new String(Id.value()));
        }
    };
}

透過這樣做,idId頭都將從生產者端到消費者端可用。

3.13. 在事務中生產到多個主題

3.13.1. 問題陳述

我如何向多個 Kafka 主題生產事務性訊息?

有關更多上下文,請參閱此StackOverflow 問題

3.13.2. 解決方案

在 Kafka 繫結器中使用事務支援進行事務處理,然後提供一個AfterRollbackProcessor。為了生產到多個主題,請使用StreamBridge API。

以下是相關的程式碼片段。

@Autowired
StreamBridge bridge;

@Bean
Consumer<String> input() {
    return str -> {
        System.out.println(str);
        this.bridge.send("left", str.toUpperCase());
        this.bridge.send("right", str.toLowerCase());
        if (str.equals("Fail")) {
            throw new RuntimeException("test");
        }
    };
}

@Bean
ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer(BinderFactory binders) {
    return (container, dest, group) -> {
        ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
                MessageChannel.class)).getTransactionalProducerFactory();
        KafkaTemplate<byte[], byte[]> template = new KafkaTemplate<>(pf);
        DefaultAfterRollbackProcessor rollbackProcessor = rollbackProcessor(template);
        container.setAfterRollbackProcessor(rollbackProcessor);
    };
}

DefaultAfterRollbackProcessor rollbackProcessor(KafkaTemplate<byte[], byte[]> template) {
    return new DefaultAfterRollbackProcessor<>(
            new DeadLetterPublishingRecoverer(template), new FixedBackOff(2000L, 2L), template, true);
}

3.13.3. 所需配置

spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix: tx-
spring.cloud.stream.kafka.binder.required-acks=all
spring.cloud.stream.bindings.input-in-0.group=foo
spring.cloud.stream.bindings.input-in-0.destination=input
spring.cloud.stream.bindings.left.destination=left
spring.cloud.stream.bindings.right.destination=right

spring.cloud.stream.kafka.bindings.input-in-0.consumer.maxAttempts=1

為了測試,您可以使用以下內容。

@Bean
public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
    return args -> {
        System.in.read();
        template.send("input", "Fail".getBytes());
        template.send("input", "Good".getBytes());
    };
}

一些重要注意事項

請確保您的應用程式配置中沒有任何 DLQ 設定,因為我們手動配置了 DLT(預設情況下,它將基於初始消費者函式釋出到名為input.DLT的主題)。此外,將消費者繫結上的maxAttempts重置為1,以避免繫結器重試。在上面的示例中,總共將嘗試 3 次(初始嘗試 + FixedBackoff中的 2 次嘗試)。

有關如何測試此程式碼的更多詳細資訊,請參閱StackOverflow 帖子。如果您使用 Spring Cloud Stream 透過新增更多消費者函式來測試它,請確保將消費者繫結上的isolation-level設定為read-committed

StackOverflow 帖子也與此討論相關。

3.14. 執行多個可輪詢消費者時要避免的陷阱

3.14.1. 問題陳述

如何執行可輪詢消費者的多個例項併為每個例項生成唯一的client.id

3.14.2. 解決方案

假設我有以下定義。

spring.cloud.stream.pollable-source: foo
spring.cloud.stream.bindings.foo-in-0.group: my-group

當執行應用程式時,Kafka 消費者會生成一個 client.id(類似於consumer-my-group-1)。對於應用程式的每個執行例項,此client.id將相同,從而導致意外問題。

為了解決這個問題,您可以在應用程式的每個例項上新增以下屬性:

spring.cloud.stream.kafka.bindings.foo-in-0.consumer.configuration.client.id=${client.id}

有關更多詳細資訊,請參閱此GitHub 問題

附錄

附錄 A:構建

A.1. 基本編譯和測試

要構建原始碼,您需要安裝 JDK 1.7。

構建使用 Maven wrapper,因此您無需安裝特定版本的 Maven。要啟用測試,您應該在構建之前執行 Kafka 伺服器 0.9 或更高版本。有關執行伺服器的更多資訊,請參閱下文。

主要的構建命令是

$ ./mvnw clean install

如果您願意,也可以新增“-DskipTests”以避免執行測試。

您也可以自行安裝 Maven(>=3.3.3),並在下面的示例中用 mvn 命令代替 ./mvnw。如果您這樣做,並且您的本地 Maven 設定不包含 Spring 預釋出構件的倉庫宣告,您可能還需要新增 -P spring
請注意,您可能需要透過設定 MAVEN_OPTS 環境變數,並將其值設定為 -Xmx512m -XX:MaxPermSize=128m 來增加 Maven 可用的記憶體量。我們嘗試在 .mvn 配置中涵蓋這一點,因此如果您發現必須這樣做才能使構建成功,請提出一個問題,將這些設定新增到原始碼控制中。

需要中介軟體的專案通常包含docker-compose.yml,因此請考慮使用Docker Compose在 Docker 容器中執行中介軟體伺服器。

A.2. 文件

有一個“完整”配置檔案將生成文件。

A.3. 使用程式碼

如果您沒有 IDE 偏好,我們建議您在使用程式碼時使用Spring Tools SuiteEclipse。我們使用m2eclipse Eclipse 外掛提供 Maven 支援。其他 IDE 和工具也應能正常工作。

A.3.1. 使用 m2eclipse 匯入到 eclipse

我們建議在使用 Eclipse 時使用m2eclipse Eclipse 外掛。如果您尚未安裝 m2eclipse,可以從“Eclipse Marketplace”獲取。

不幸的是,m2e 尚不支援 Maven 3.3,因此一旦專案匯入 Eclipse,您還需要告訴 m2eclipse 對專案使用.settings.xml檔案。如果您不這樣做,您可能會看到許多與專案中 POM 相關錯誤。開啟您的 Eclipse 首選項,展開 Maven 首選項,然後選擇使用者設定。在使用者設定欄位中單擊瀏覽並導航到您匯入的 Spring Cloud 專案,選擇該專案中的.settings.xml檔案。單擊應用,然後單擊確定以儲存首選項更改。

或者,您可以將.settings.xml中的儲存庫設定複製到您自己的~/.m2/settings.xml中。

A.3.2. 不使用 m2eclipse 匯入到 eclipse

如果您不想使用 m2eclipse,可以使用以下命令生成 Eclipse 專案元資料:

$ ./mvnw eclipse:eclipse

生成的 Eclipse 專案可以透過從“檔案”選單中選擇“匯入現有專案”來匯入。

[[contributing] == 貢獻

Spring Cloud 根據非限制性 Apache 2.0 許可證釋出,並遵循非常標準的 Github 開發流程,使用 Github 跟蹤器處理問題並將拉取請求合併到 master 分支。如果您想貢獻哪怕是微不足道的東西,請不要猶豫,但請遵循以下準則。

A.4. 簽署貢獻者許可協議

在我們接受非平凡的補丁或拉取請求之前,我們需要您簽署貢獻者協議。簽署貢獻者協議不會授予任何人主倉庫的提交許可權,但這確實意味著我們可以接受您的貢獻,如果接受,您將獲得作者署名。積極的貢獻者可能會被邀請加入核心團隊,並被授予合併拉取請求的能力。

A.5. 程式碼約定和內務管理

這些對於拉取請求都不是必不可少的,但它們都會有所幫助。它們也可以在原始拉取請求之後但在合併之前新增。

  • 使用 Spring Framework 程式碼格式約定。如果您使用 Eclipse,可以使用Spring Cloud Build專案中的eclipse-code-formatter.xml檔案匯入格式化程式設定。如果使用 IntelliJ,可以使用Eclipse Code Formatter Plugin匯入相同的檔案。

  • 確保所有新的 .java 檔案都包含一個簡單的 Javadoc 類註釋,至少包含一個標識您的 @author 標籤,並且最好至少有一個段落說明該類的用途。

  • 將 ASF 許可頭註釋新增到所有新的 .java 檔案中(從專案中的現有檔案複製)

  • 如果您對 .java 檔案進行了大量修改(不僅僅是表面上的更改),請將自己新增為 @author

  • 新增一些 Javadoc,如果您更改了名稱空間,還要新增一些 XSD 文件元素。

  • 一些單元測試也會有很大幫助——總得有人去做。

  • 如果您的分支沒有其他人使用,請將其與當前 master(或主專案中的其他目標分支)進行 rebase。

  • 編寫提交訊息時請遵循 這些約定,如果您正在修復現有問題,請在提交訊息末尾新增 Fixes gh-XXXX(其中 XXXX 是問題編號)。

© . This site is unofficial and not affiliated with VMware.