3.0.13.RELEASE
參考指南
本指南介紹了 Spring Cloud Stream 繫結器的 Apache Kafka 實現。它包含有關其設計、用法和配置選項的資訊,以及 Spring Cloud Stream 概念如何對映到 Apache Kafka 特定構造的資訊。此外,本指南還解釋了 Spring Cloud Stream 的 Kafka Streams 繫結能力。
1. Apache Kafka 繫結器
1.1. 用法
要使用 Apache Kafka 繫結器,您需要將 spring-cloud-stream-binder-kafka
新增為 Spring Cloud Stream 應用程式的依賴項,如以下 Maven 示例所示
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
或者,您也可以使用 Spring Cloud Stream Kafka Starter,如以下 Maven 示例所示
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
1.2. 概述
下圖顯示了 Apache Kafka 繫結器如何工作的簡化圖

Apache Kafka 繫結器實現將每個目標對映到一個 Apache Kafka 主題。消費者組直接對映到相同的 Apache Kafka 概念。分割槽也直接對映到 Apache Kafka 分割槽。
繫結器目前使用 Apache Kafka kafka-clients
2.3.1
版本。此客戶端可以與較舊的 broker 通訊(參見 Kafka 文件),但某些功能可能不可用。例如,對於早於 0.11.x.x 的版本,不支援原生 header。此外,0.11.x.x 不支援 autoAddPartitions
屬性。
1.3. 配置選項
本節包含 Apache Kafka 繫結器使用的配置選項。
有關繫結器的常用配置選項和屬性,請參閱核心文件中的繫結屬性。
1.3.1. Kafka 繫結器屬性
- spring.cloud.stream.kafka.binder.brokers
-
Kafka 繫結器連線的 broker 列表。
預設值:
localhost
。 - spring.cloud.stream.kafka.binder.defaultBrokerPort
-
brokers
允許指定包含或不包含埠資訊的主機(例如host1,host2:port2
)。當 broker 列表中未配置埠時,此屬性設定預設埠。預設值:
9092
。 - spring.cloud.stream.kafka.binder.configuration
-
傳遞給繫結器建立的所有客戶端(包括生產者和消費者)的客戶端屬性的 Key/Value map。由於這些屬性同時用於生產者和消費者,因此用法應限於通用屬性,例如安全設定。透過此配置提供的未知 Kafka 生產者或消費者屬性會被過濾掉,不允許傳播。此處的屬性會覆蓋在 Spring Boot 中設定的任何屬性。
預設值:空 map。
- spring.cloud.stream.kafka.binder.consumerProperties
-
任意 Kafka 客戶端消費者屬性的 Key/Value map。除了支援已知的 Kafka 消費者屬性外,此處也允許未知消費者屬性。此處的屬性會覆蓋在 Spring Boot 中設定的任何屬性以及上面
configuration
屬性中設定的屬性。預設值:空 map。
- spring.cloud.stream.kafka.binder.headers
-
由繫結器傳輸的自定義 header 列表。僅在使用
kafka-clients
版本 < 0.11.0.0 與舊應用程式 (⇐ 1.3.x) 通訊時需要。更新版本原生支援 header。預設值:空。
- spring.cloud.stream.kafka.binder.healthTimeout
-
等待獲取分割槽資訊的時間,單位為秒。如果此計時器過期,則健康狀態報告為 down。
預設值:10。
- spring.cloud.stream.kafka.binder.requiredAcks
-
broker 上所需的 ack 數量。請參閱 Kafka 文件中的生產者
acks
屬性。預設值:
1
。 - spring.cloud.stream.kafka.binder.minPartitionCount
-
僅當設定了
autoCreateTopics
或autoAddPartitions
時有效。繫結器在其生產或消費資料的 topic 上配置的全域性最小分割槽數。它可能被生產者的partitionCount
設定或生產者的instanceCount * concurrency
設定的值所覆蓋(如果兩者中的任何一個更大)。預設值:
1
。 - spring.cloud.stream.kafka.binder.producerProperties
-
任意 Kafka 客戶端生產者屬性的 Key/Value map。除了支援已知的 Kafka 生產者屬性外,此處也允許未知生產者屬性。此處的屬性會覆蓋在 Spring Boot 中設定的任何屬性以及上面
configuration
屬性中設定的屬性。預設值:空 map。
- spring.cloud.stream.kafka.binder.replicationFactor
-
如果
autoCreateTopics
處於活動狀態,則自動建立主題的副本因子。可以在每個繫結上覆蓋。如果您使用的是早於 2.4 的 Kafka broker 版本,則此值應至少設定為 1
。從 3.0.8 版本開始,繫結器使用−1
作為預設值,這表示將使用 broker 的 'default.replication.factor' 屬性來確定副本數量。請諮詢您的 Kafka broker 管理員,瞭解是否有要求最小副本因子的策略,如果存在,則通常default.replication.factor
將與該值匹配,並且應使用−1
,除非您需要大於最小值的副本因子。預設值:
−1
。 - spring.cloud.stream.kafka.binder.autoCreateTopics
-
如果設定為
true
,繫結器會自動建立新主題。如果設定為false
,繫結器則依賴於已經配置好的主題。在後一種情況下,如果主題不存在,繫結器將無法啟動。此設定獨立於 broker 的 auto.create.topics.enable
設定,並且不受其影響。如果伺服器設定為自動建立主題,則它們可能會作為元資料檢索請求的一部分被建立,並使用預設的 broker 設定。預設值:
true
。 - spring.cloud.stream.kafka.binder.autoAddPartitions
-
如果設定為
true
,繫結器在需要時建立新分割槽。如果設定為false
,繫結器則依賴於已經配置好的主題分割槽大小。如果目標主題的分割槽計數小於預期值,繫結器將無法啟動。預設值:
false
。 - spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix
-
在繫結器中啟用事務。請參閱 Kafka 文件中的
transaction.id
以及 `spring-kafka` 文件中的事務。啟用事務後,單個producer
屬性將被忽略,所有生產者都使用spring.cloud.stream.kafka.binder.transaction.producer.*
屬性。預設值
null
(無事務) - spring.cloud.stream.kafka.binder.transaction.producer.*
-
事務繫結器中生產者的全域性生產者屬性。請參閱
spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix
和Kafka 生產者屬性以及所有繫結器支援的通用生產者屬性。預設值:參見單獨的生產者屬性。
- spring.cloud.stream.kafka.binder.headerMapperBeanName
-
用於將
spring-messaging
header 對映到 Kafka header 以及從 Kafka header 映射回來的KafkaHeaderMapper
bean 的名稱。例如,如果您希望在使用 JSON 反序列化 header 的BinderHeaderMapper
bean 中自定義受信任的包,則可以使用此屬性。如果未透過此屬性將此自定義BinderHeaderMapper
bean 提供給繫結器,則繫結器會查詢名稱為kafkaBinderHeaderMapper
且型別為BinderHeaderMapper
的 header mapper bean,然後才會回退到繫結器建立的預設BinderHeaderMapper
。預設值:無。
- spring.cloud.stream.kafka.binder.considerDownWhenAnyPartitionHasNoLeader
-
用於在主題上的任何分割槽(無論哪個消費者正在從中接收資料)被發現沒有 leader 時,將繫結器健康狀態設定為
down
的標誌。預設值:
false
。 - spring.cloud.stream.kafka.binder.certificateStoreDirectory
-
當 truststore 或 keystore 證書位置以 classpath URL (`classpath:…`) 形式給出時,繫結器會將 JAR 檔案內部 classpath 位置的資源複製到檔案系統上的位置。檔案將被移動到此屬性指定的值所對應的位置,該位置必須是檔案系統上存在且執行應用程式的程序可寫的目錄。如果未設定此值且證書檔案是 classpath 資源,則會將其移動到 `System.getProperty("java.io.tmpdir")` 返回的系統臨時目錄。如果此值存在但檔案系統上找不到該目錄或不可寫,情況也同樣如此。
預設值:無。
1.3.2. Kafka 消費者屬性
為避免重複,Spring Cloud Stream 支援為所有通道設定值,格式為 spring.cloud.stream.kafka.default.consumer.<property>=<value> 。 |
以下屬性僅適用於 Kafka 消費者,並且必須以 spring.cloud.stream.kafka.bindings.<channelName>.consumer.
為字首。
- admin.configuration
-
自 2.1.1 版本起,此屬性已棄用,推薦使用
topic.properties
,未來版本將移除對其的支援。 - admin.replicas-assignment
-
自 2.1.1 版本起,此屬性已棄用,推薦使用
topic.replicas-assignment
,未來版本將移除對其的支援。 - admin.replication-factor
-
自 2.1.1 版本起,此屬性已棄用,推薦使用
topic.replication-factor
,未來版本將移除對其的支援。 - autoRebalanceEnabled
-
當設定為
true
時,主題分割槽會在消費者組的成員之間自動重新平衡。當設定為false
時,每個消費者會根據spring.cloud.stream.instanceCount
和spring.cloud.stream.instanceIndex
分配固定的分割槽集合。這要求在每個啟動的例項上適當地設定spring.cloud.stream.instanceCount
和spring.cloud.stream.instanceIndex
屬性。在這種情況下,spring.cloud.stream.instanceCount
屬性的值通常必須大於 1。預設值:
true
。 - ackEachRecord
-
當
autoCommitOffset
為true
時,此設定決定是否在處理完每條記錄後提交偏移量。預設情況下,偏移量會在處理完consumer.poll()
返回的批次中的所有記錄後提交。poll 返回的記錄數量可以透過 Kafka 屬性max.poll.records
控制,該屬性透過消費者configuration
屬性設定。將其設定為true
可能會導致效能下降,但這樣做可以減少發生故障時記錄被重複投遞的可能性。此外,請參閱繫結器requiredAcks
屬性,它也會影響提交偏移量的效能。預設值:
false
。 - autoCommitOffset
-
訊息處理完成後是否自動提交偏移量。如果設定為
false
,則入站訊息中會包含一個鍵為kafka_acknowledgment
、型別為org.springframework.kafka.support.Acknowledgment
的 header。應用程式可以使用此 header 來確認訊息。詳細資訊請參見示例部分。當此屬性設定為false
時,Kafka 繫結器將確認模式設定為org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode.MANUAL
,並且應用程式負責確認記錄。另請參閱ackEachRecord
。預設值:
true
。 - autoCommitOnError
-
僅當
autoCommitOffset
設定為true
時有效。如果設定為 `false`,則抑制導致錯誤的訊息的自動提交,僅對成功處理的訊息進行提交。在發生永續性故障時,它允許流從最後一次成功處理的訊息自動重播。如果設定為 `true`,則始終自動提交(如果啟用了自動提交)。如果未設定(預設值),則其效果與 `enableDlq` 相同,如果錯誤訊息傳送到 DLQ 則自動提交,否則不提交。預設值:未設定。
- resetOffsets
-
是否將消費者的偏移量重置為
startOffset
提供的值。如果提供了KafkaRebalanceListener
,則必須為 false;參見使用 KafkaRebalanceListener。預設值:
false
。 - startOffset
-
新組的起始偏移量。允許的值:
earliest
和latest
。如果消費者組透過spring.cloud.stream.bindings.<channelName>.group
為消費者 'binding' 明確設定,則 'startOffset' 設定為earliest
。否則,對於anonymous
消費者組,它設定為latest
。另請參閱resetOffsets
(此列表前面)。預設值:null (等同於
earliest
)。 - enableDlq
-
設定為 true 時,為消費者啟用 DLQ 行為。預設情況下,導致錯誤的訊息會被轉發到名為
error.<destination>.<group>
的主題。可以透過設定dlqName
屬性或定義型別為DlqDestinationResolver
的@Bean
來配置 DLQ 主題名稱。這為常見的 Kafka 重播場景提供了一種替代方案,適用於錯誤數量相對較少且重播整個原始主題可能過於繁瑣的情況。有關詳細資訊,請參見死信主題處理。從 2.0 版本開始,傳送到 DLQ 主題的訊息會增強以下 header:x-original-topic
、x-exception-message
和x-exception-stacktrace
,型別為byte[]
。預設情況下,失敗的記錄會發送到 DLQ 主題中與原始記錄相同分割槽號的分割槽。有關如何更改此行為,請參見死信主題分割槽選擇。當destinationIsPattern
為true
時不允許使用此屬性。預設值:
false
。 - dlqPartitions
-
當
enableDlq
為 true 且未設定此屬性時,會建立一個與主主題具有相同分割槽數的死信主題。通常,死信記錄會發送到死信主題中與原始記錄相同分割槽號的分割槽。此行為可以更改;參見死信主題分割槽選擇。如果此屬性設定為1
且沒有DqlPartitionFunction
bean,所有死信記錄將寫入分割槽0
。如果此屬性大於1
,您必須提供一個DlqPartitionFunction
bean。請注意,實際的分割槽計數受繫結器minPartitionCount
屬性的影響。預設值:
none
- configuration
-
包含通用 Kafka 消費者屬性的 key/value 對的 Map。除了 Kafka 消費者屬性外,還可以在此處傳遞其他配置屬性。例如,應用程式所需的一些屬性,如
spring.cloud.stream.kafka.bindings.input.consumer.configuration.foo=bar
。預設值:空 map。
- dlqName
-
接收錯誤訊息的 DLQ 主題名稱。
預設值:null(如果未指定,導致錯誤的訊息將轉發到名為
error.<destination>.<group>
的主題)。 - dlqProducerProperties
-
透過此屬性可以設定 DLQ 特定的生產者屬性。所有透過 Kafka 生產者屬性可用的屬性都可以透過此屬性設定。當消費者啟用原生解碼(即 useNativeDecoding: true)時,應用程式必須為 DLQ 提供相應的 key/value 序列化器。這必須以
dlqProducerProperties.configuration.key.serializer
和dlqProducerProperties.configuration.value.serializer
的形式提供。預設值:預設 Kafka 生產者屬性。
- standardHeaders
-
指示入站通道介面卡填充哪些標準 header。允許的值:
none
、id
、timestamp
或both
。如果使用原生反序列化並且第一個接收訊息的元件需要id
(例如配置為使用 JDBC 訊息儲存的聚合器),則此屬性非常有用。預設值:
none
- converterBeanName
-
實現
RecordMessageConverter
的 bean 的名稱。在入站通道介面卡中使用,以替換預設的MessagingMessageConverter
。預設值:
null
- idleEventInterval
-
表示最近沒有收到訊息的事件之間的間隔,單位為毫秒。使用
ApplicationListener<ListenerContainerIdleEvent>
來接收這些事件。使用示例請參見示例:暫停和恢復消費者。預設值:
30000
- destinationIsPattern
-
當為 true 時,目標被視為由 broker 用於匹配主題名稱的正則表示式
Pattern
。當為 true 時,不會預置主題,並且不允許使用enableDlq
,因為繫結器在預置階段不知道主題名稱。請注意,檢測匹配模式的新主題所需的時間由消費者屬性metadata.max.age.ms
控制,該屬性(在編寫本文時)預設為 300,000ms(5 分鐘)。這可以透過上面configuration
屬性進行配置。預設值:
false
- topic.properties
-
預置新主題時使用的 Kafka 主題屬性的
Map
。例如 `spring.cloud.stream.kafka.bindings.input.consumer.topic.properties.message.format.version=0.9.0.0`預設值:無。
- topic.replicas-assignment
-
副本分配的 Map<Integer, List<Integer>>,其中 key 是分割槽,value 是分配。預置新主題時使用。參見
kafka-clients
jar 中的NewTopic
Javadoc。預設值:無。
- topic.replication-factor
-
預置主題時使用的副本因子。覆蓋繫結器範圍的設定。如果存在
replicas-assignments
則忽略。預設值:none(使用繫結器範圍的預設值 -1)。
- pollTimeout
-
在可輪詢消費者中進行輪詢使用的超時時間。
預設值:5 秒。
- transactionManager
-
用於覆蓋此繫結器的事務管理器的
KafkaAwareTransactionManager
bean 名稱。通常,如果您想使用ChainedKafkaTransactionManaager
將另一個事務與 Kafka 事務同步,則需要此設定。為了實現記錄的精確一次消費和生產,消費者和生產者繫結必須都配置相同的事務管理器。預設值:無。
- txCommitRecovered
-
使用事務繫結器時,預設情況下,恢復的記錄(例如,當重試耗盡且記錄被髮送到死信主題時)的偏移量將透過新的事務提交。將此屬性設定為
false
會抑制提交恢復記錄的偏移量。預設值:true。
1.3.3. 重置偏移量
應用程式啟動時,每個分配的分割槽的初始位置取決於 startOffset
和 resetOffsets
這兩個屬性。如果 resetOffsets
為 false
,則應用正常的 Kafka 消費者 auto.offset.reset
語義。即,如果某個分割槽的繫結消費者組沒有已提交的偏移量,則位置為 earliest
或 latest
。預設情況下,具有顯式 group
的繫結使用 earliest
,而匿名繫結(沒有 group
)使用 latest
。這些預設值可以透過設定 startOffset
繫結屬性來覆蓋。繫結首次使用特定 group
啟動時,將沒有已提交的偏移量。沒有已提交偏移量的另一種情況是偏移量已過期。對於現代 broker(自 2.1 版本起)以及預設的 broker 屬性,偏移量在最後一個成員離開組後 7 天過期。有關詳細資訊,請參閱 offsets.retention.minutes
broker 屬性。
當 resetOffsets
為 true
時,繫結器應用與 broker 上沒有已提交偏移量時類似的語義,就好像此繫結從未消費過該主題一樣;即忽略任何當前的已提交偏移量。
以下是可能使用此屬性的兩個用例。
-
消費包含 key/value 對的壓縮主題。將
resetOffsets
設定為true
,將startOffset
設定為earliest
;繫結將對所有新分配的分割槽執行seekToBeginning
操作。 -
消費包含事件的主題,您只對該繫結執行時發生的事件感興趣。將
resetOffsets
設定為true
,將startOffset
設定為latest
;繫結將對所有新分配的分割槽執行seekToEnd
操作。
如果在初始分配後發生重新平衡,seek 操作將僅對初始分配期間未分配的新分配分割槽執行。 |
有關主題偏移量的更多控制,請參見使用 KafkaRebalanceListener;當提供了 listener 時,resetOffsets
不應設定為 true
,否則會導致錯誤。 >>>>>>> 7bc90c10… GH-1084: 新增 txCommitRecovered 屬性
1.3.4. 消費批次
從 3.0 版本開始,當 spring.cloud.stream.binding.<name>.consumer.batch-mode
設定為 true
時,透過輪詢 Kafka Consumer
收到的所有記錄將以 List<?>
的形式呈現給 listener 方法。否則,方法將一次處理一條記錄。批次的大小由 Kafka 消費者屬性 max.poll.records
、fetch.min.bytes
、fetch.max.wait.ms
控制;有關詳細資訊,請參閱 Kafka 文件。
請注意,@StreamListener
不支援批處理模式 - 它只適用於較新的函數語言程式設計模型。
使用批處理模式時,不支援繫結器內的重試,因此 maxAttempts 將被覆蓋為 1。您可以配置 SeekToCurrentBatchErrorHandler (使用 ListenerContainerCustomizer )以實現與繫結器內重試類似的功能。您還可以使用手動 AckMode 並呼叫 Ackowledgment.nack(index, sleep) 來提交部分批次的偏移量,並重新投遞剩餘的記錄。有關這些技術的更多資訊,請參閱 Spring for Apache Kafka 文件。 |
1.3.5. Kafka 生產者屬性
為避免重複,Spring Cloud Stream 支援為所有通道設定值,格式為 spring.cloud.stream.kafka.default.producer.<property>=<value> 。 |
以下屬性僅適用於 Kafka 生產者,並且必須以 spring.cloud.stream.kafka.bindings.<channelName>.producer.
為字首。
- admin.configuration
-
自 2.1.1 版本起,此屬性已棄用,推薦使用
topic.properties
,未來版本將移除對其的支援。 - admin.replicas-assignment
-
自 2.1.1 版本起,此屬性已棄用,推薦使用
topic.replicas-assignment
,未來版本將移除對其的支援。 - admin.replication-factor
-
自 2.1.1 版本起,此屬性已棄用,推薦使用
topic.replication-factor
,未來版本將移除對其的支援。 - bufferSize
-
Kafka 生產者在傳送前嘗試批次處理的資料的上限,單位為位元組。
預設值:
16384
。 - sync
-
生產者是否同步。
預設值:
false
。 - sendTimeoutExpression
-
一個針對出站訊息評估的 SpEL 表示式,用於在啟用同步釋出時評估等待 ack 的時間——例如
headers['mySendTimeout']
。超時值以毫秒為單位。在 3.0 版本之前,除非使用原生編碼,否則無法使用 payload,因為在評估此表示式時,payload 已經是以byte[]
的形式存在。現在,表示式在 payload 轉換之前進行評估。預設值:
none
。 - batchTimeout
-
生產者等待更多訊息在同一批次中累積後才傳送訊息的時間長度。(通常,生產者根本不等待,只發送前一次傳送進行中累積的所有訊息。)非零值可能會以延遲為代價提高吞吐量。
預設值:
0
。 - messageKeyExpression
-
一個針對出站訊息評估的 SpEL 表示式,用於填充生產的 Kafka 訊息的 key——例如
headers['myKey']
。在 3.0 版本之前,除非使用原生編碼,否則無法使用 payload,因為在評估此表示式時,payload 已經是以byte[]
的形式存在。現在,表示式在 payload 轉換之前進行評估。對於常規處理器(`Function` 或 `Function , Message>>`),如果生成的 key 需要與來自主題的入站 key 相同,可以如下設定此屬性:`spring.cloud.stream.kafka.bindings.<output-binding-name>.producer.messageKeyExpression: headers['kafka_receivedMessageKey']` 對於 reactive 函式,有一個重要的注意事項。在這種情況下,由應用程式負責手動將 header 從入站訊息複製到出站訊息。您可以設定 header,例如 `myKey`,並如上所述使用 `headers['myKey']`,或者為了方便,直接設定 `KafkaHeaders.MESSAGE_KEY` header,這樣就完全不需要設定此屬性了。 預設值:
none
。 - headerPatterns
-
一個逗號分隔的簡單模式列表,用於匹配要對映到
ProducerRecord
中的 KafkaHeaders
的 Spring messaging header。模式可以以萬用字元(星號)開頭或結尾。模式可以透過字首!
進行否定。匹配在第一次匹配(肯定或否定)後停止。例如,!ask,as*
將透過ash
但不透過ask
。id
和timestamp
永遠不會對映。預設值:
*
(所有 header - 除了id
和timestamp
) - configuration
-
包含通用 Kafka 生產者屬性的 key/value 對的 Map。
預設值:空 map。
- topic.properties
-
預置新主題時使用的 Kafka 主題屬性的
Map
。例如 `spring.cloud.stream.kafka.bindings.output.producer.topic.properties.message.format.version=0.9.0.0` - topic.replicas-assignment
-
副本分配的 Map<Integer, List<Integer>>,其中 key 是分割槽,value 是分配。預置新主題時使用。參見
kafka-clients
jar 中的NewTopic
Javadoc。預設值:無。
- topic.replication-factor
-
預置主題時使用的副本因子。覆蓋繫結器範圍的設定。如果存在
replicas-assignments
則忽略。預設值:none(使用繫結器範圍的預設值 -1)。
- useTopicHeader
-
設定為
true
,以使用出站訊息中KafkaHeaders.TOPIC
訊息 header 的值覆蓋預設的繫結目標(主題名稱)。如果 header 不存在,則使用預設的繫結目標。預設值:false
。 - recordMetadataChannel
-
成功傳送結果應傳送到的
MessageChannel
的 bean 名稱;該 bean 必須存在於應用程式上下文中。傳送到通道的訊息是傳送的訊息(如果進行了轉換),並帶有附加的 headerKafkaHeaders.RECORD_METADATA
。該 header 包含由 Kafka 客戶端提供的RecordMetadata
物件;它包含記錄在主題中寫入的分割槽和偏移量。
ResultMetadata meta = sendResultMsg.getHeaders().get(KafkaHeaders.RECORD_METADATA, RecordMetadata.class)
失敗的傳送會發送到生產者錯誤通道(如果已配置);參見錯誤通道。預設值:null
+
Kafka 繫結器使用生產者的 partitionCount 設定作為建立具有給定分割槽計數的主題的提示(結合 minPartitionCount ,取兩者中的最大值)。配置繫結器的 minPartitionCount 和應用程式的 partitionCount 時請謹慎,因為會使用較大的值。如果主題已經存在但分割槽計數較少且 autoAddPartitions 被停用(預設),繫結器將無法啟動。如果主題已經存在但分割槽計數較少且 autoAddPartitions 被啟用,則會新增新分割槽。如果主題已經存在但分割槽數大於(minPartitionCount 或 partitionCount )的最大值,則使用現有分割槽計數。 |
- compression
-
設定
compression.type
生產者屬性。支援的值有none
、gzip
、snappy
和lz4
。如果您按照 Spring for Apache Kafka 文件中所述覆蓋kafka-clients
jar 至 2.1.0(或更高版本),並希望使用zstd
壓縮,請使用 `spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.compression.type=zstd`。預設值:
none
。 - closeTimeout
-
關閉生產者時等待的超時時間,單位為秒。
預設值:
30
1.3.6. 使用示例
本節中,我們將展示前述屬性在特定場景下的使用。
示例:將 autoCommitOffset
設定為 false
並依賴手動確認
此示例說明了如何在消費者應用程式中手動確認偏移量。
此示例要求將 spring.cloud.stream.kafka.bindings.input.consumer.autoCommitOffset
設定為 false
。請使用您示例中對應的輸入通道名稱。
@SpringBootApplication
@EnableBinding(Sink.class)
public class ManuallyAcknowdledgingConsumer {
public static void main(String[] args) {
SpringApplication.run(ManuallyAcknowdledgingConsumer.class, args);
}
@StreamListener(Sink.INPUT)
public void process(Message<?> message) {
Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
if (acknowledgment != null) {
System.out.println("Acknowledgment provided");
acknowledgment.acknowledge();
}
}
}
示例:安全配置
Apache Kafka 0.9 支援客戶端和 broker 之間的安全連線。要利用此功能,請遵循 Apache Kafka 文件中的指南以及 Kafka 0.9 Confluent 文件中的安全指南。使用 spring.cloud.stream.kafka.binder.configuration
選項為繫結器建立的所有客戶端設定安全屬性。
例如,要將 security.protocol
設定為 SASL_SSL
,請設定以下屬性
spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_SSL
所有其他安全屬性都可以用類似的方式設定。
使用 Kerberos 時,請按照參考文件中的說明建立和引用 JAAS 配置。
Spring Cloud Stream 支援透過使用 JAAS 配置檔案和 Spring Boot 屬性將 JAAS 配置資訊傳遞給應用程式。
使用 JAAS 配置檔案
可以使用系統屬性為 Spring Cloud Stream 應用程式設定 JAAS 和(可選)krb5 檔案位置。以下示例顯示瞭如何使用 JAAS 配置檔案啟動具有 SASL 和 Kerberos 的 Spring Cloud Stream 應用程式
java -Djava.security.auth.login.config=/path.to/kafka_client_jaas.conf -jar log.jar \
--spring.cloud.stream.kafka.binder.brokers=secure.server:9092 \
--spring.cloud.stream.bindings.input.destination=stream.ticktock \
--spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_PLAINTEXT
使用 Spring Boot 屬性
作為 JAAS 配置檔案的替代方案,Spring Cloud Stream 提供了一種機制,允許使用 Spring Boot 屬性為 Spring Cloud Stream 應用程式設定 JAAS 配置。
以下屬性可用於配置 Kafka 客戶端的登入上下文
- spring.cloud.stream.kafka.binder.jaas.loginModule
-
登入模組名稱。通常情況下無需設定。
預設值:
com.sun.security.auth.module.Krb5LoginModule
。 - spring.cloud.stream.kafka.binder.jaas.controlFlag
-
登入模組的控制標誌。
預設值:
required
。 - spring.cloud.stream.kafka.binder.jaas.options
-
包含登入模組選項的 key/value 對的 Map。
預設值:空 map。
以下示例顯示瞭如何使用 Spring Boot 配置屬性啟動具有 SASL 和 Kerberos 的 Spring Cloud Stream 應用程式
java --spring.cloud.stream.kafka.binder.brokers=secure.server:9092 \
--spring.cloud.stream.bindings.input.destination=stream.ticktock \
--spring.cloud.stream.kafka.binder.autoCreateTopics=false \
--spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_PLAINTEXT \
--spring.cloud.stream.kafka.binder.jaas.options.useKeyTab=true \
--spring.cloud.stream.kafka.binder.jaas.options.storeKey=true \
--spring.cloud.stream.kafka.binder.jaas.options.keyTab=/etc/security/keytabs/kafka_client.keytab \
--spring.cloud.stream.kafka.binder.jaas.options.principal=kafka-client-1@EXAMPLE.COM
前面的示例等同於以下 JAAS 檔案
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/etc/security/keytabs/kafka_client.keytab"
principal="[email protected]";
};
如果所需的主題已存在於 broker 上或將由管理員建立,則可以關閉自動建立,並且只需傳送客戶端 JAAS 屬性。
請勿在同一應用程式中混用 JAAS 配置檔案和 Spring Boot 屬性。如果已存在 -Djava.security.auth.login.config 系統屬性,Spring Cloud Stream 將忽略 Spring Boot 屬性。 |
在 Kerberos 環境下使用 autoCreateTopics 和 autoAddPartitions 時請注意。通常,應用程式使用的 principals 可能在 Kafka 和 Zookeeper 中沒有管理許可權。因此,依賴 Spring Cloud Stream 建立/修改主題可能會失敗。在安全環境中,我們強烈建議使用 Kafka 工具以管理方式建立主題和管理 ACL。 |
示例:暫停和恢復消費者
如果你想暫停消費但不引起分割槽再平衡,可以暫停和恢復消費者。這可以透過將 Consumer
作為引數新增到你的 @StreamListener
中來實現。要恢復,你需要一個針對 ListenerContainerIdleEvent
例項的 ApplicationListener
。事件釋出頻率由 idleEventInterval
屬性控制。由於消費者不是執行緒安全的,你必須在呼叫執行緒上呼叫這些方法。
下面的簡單應用程式展示瞭如何暫停和恢復
@SpringBootApplication
@EnableBinding(Sink.class)
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@StreamListener(Sink.INPUT)
public void in(String in, @Header(KafkaHeaders.CONSUMER) Consumer<?, ?> consumer) {
System.out.println(in);
consumer.pause(Collections.singleton(new TopicPartition("myTopic", 0)));
}
@Bean
public ApplicationListener<ListenerContainerIdleEvent> idleListener() {
return event -> {
System.out.println(event);
if (event.getConsumer().paused().size() > 0) {
event.getConsumer().resume(event.getConsumer().paused());
}
};
}
}
1.4. 事務繫結器
透過將 spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix
設定為非空值(例如 tx-
)來啟用事務。在 processor 應用程式中使用時,消費者啟動事務;在消費者執行緒上傳送的任何記錄都會參與同一事務。當監聽器正常退出時,監聽器容器會將偏移量傳送到事務並提交。使用一個通用的 producer factory 來配置所有 producer 繫結,這些繫結使用 spring.cloud.stream.kafka.binder.transaction.producer.*
屬性進行配置;單個繫結的 Kafka producer 屬性將被忽略。
常規的 binder 重試(和死信處理)在事務中不受支援,因為重試將在原始事務中執行,該事務可能會回滾,並且釋出的任何記錄也將回滾。當啟用重試時(通用屬性 maxAttempts 大於零),重試屬性用於配置一個 DefaultAfterRollbackProcessor 以在容器級別啟用重試。類似地,死信記錄的釋出功能不是在事務內完成,而是移至監聽器容器,同樣透過在主事務回滾後執行的 DefaultAfterRollbackProcessor 來實現。 |
如果你想在 source 應用程式中使用事務,或者從某個任意執行緒進行僅 producer 的事務(例如 @Scheduled
方法),你必須獲取事務性 producer factory 的引用,並使用它定義一個 KafkaTransactionManager
bean。
@Bean
public PlatformTransactionManager transactionManager(BinderFactory binders) {
ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
MessageChannel.class)).getTransactionalProducerFactory();
return new KafkaTransactionManager<>(pf);
}
請注意,我們使用 BinderFactory
獲取 binder 的引用;當只配置了一個 binder 時,在第一個引數中使用 null
。如果配置了多個 binder,使用 binder 名稱來獲取引用。一旦我們獲得了 binder 的引用,就可以獲取 ProducerFactory
的引用並建立一個事務管理器。
然後你就可以使用正常的 Spring 事務支援,例如 TransactionTemplate
或 @Transactional
,例如
public static class Sender {
@Transactional
public void doInTransaction(MessageChannel output, List<String> stuffToSend) {
stuffToSend.forEach(stuff -> output.send(new GenericMessage<>(stuff)));
}
}
如果你想將僅 producer 的事務與來自其他事務管理器的事務同步,請使用 ChainedTransactionManager
。
1.5. 錯誤通道
從 1.3 版本開始,binder 無條件地將每個消費者目標地的異常傳送到錯誤通道,也可以配置將非同步 producer 傳送失敗傳送到錯誤通道。更多資訊請參閱錯誤處理這一節。
傳送失敗的 ErrorMessage
的 payload 是一個 KafkaSendFailureException
,包含以下屬性
-
failedMessage
: 未能傳送的 Spring MessagingMessage<?>
。 -
record
: 由failedMessage
建立的原始ProducerRecord
沒有自動處理 producer 異常(例如傳送到死信佇列)。你可以使用自己的 Spring Integration 流來消費這些異常。
1.6. Kafka 指標
Kafka binder 模組公開了以下指標
spring.cloud.stream.binder.kafka.offset
: 此指標指示給定消費者組從給定 binder 的主題尚未消費的訊息數量。提供的指標基於 Micrometer 庫。如果 classpath 中存在 Micrometer 且應用程式未提供其他此類 bean,則 binder 會建立 KafkaBinderMetrics
bean。該指標包含消費者組資訊、主題以及與主題最新偏移量相比的已提交偏移量的實際滯後量。此指標對於向 PaaS 平臺提供自動伸縮反饋特別有用。
透過在應用程式中提供以下元件,可以阻止 KafkaBinderMetrics
建立必要的消費者等基礎設施並報告指標。
@Component
class NoOpBindingMeters {
NoOpBindingMeters(MeterRegistry registry) {
registry.config().meterFilter(
MeterFilter.denyNameStartsWith(KafkaBinderMetrics.OFFSET_LAG_METRIC_NAME));
}
}
有關如何選擇性抑制 meter 的更多詳細資訊,請參閱此處。
1.7. Tombstone Records (null 記錄值)
使用 compacted topic 時,值為 null
的記錄(也稱為 tombstone 記錄)表示刪除鍵。要在 @StreamListener
方法中接收此類訊息,引數必須標記為非必需,以便接收 null
值引數。
@StreamListener(Sink.INPUT)
public void in(@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) byte[] key,
@Payload(required = false) Customer customer) {
// customer is null if a tombstone record
...
}
1.8. 使用 KafkaRebalanceListener
應用程式可能希望在分割槽最初分配時,將主題/分割槽 seek 到任意偏移量,或對消費者執行其他操作。從 2.1 版本開始,如果在應用程式上下文中提供一個單獨的 KafkaRebalanceListener
bean,它將被注入到所有 Kafka 消費者繫結中。
public interface KafkaBindingRebalanceListener {
/**
* Invoked by the container before any pending offsets are committed.
* @param bindingName the name of the binding.
* @param consumer the consumer.
* @param partitions the partitions.
*/
default void onPartitionsRevokedBeforeCommit(String bindingName, Consumer<?, ?> consumer,
Collection<TopicPartition> partitions) {
}
/**
* Invoked by the container after any pending offsets are committed.
* @param bindingName the name of the binding.
* @param consumer the consumer.
* @param partitions the partitions.
*/
default void onPartitionsRevokedAfterCommit(String bindingName, Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
}
/**
* Invoked when partitions are initially assigned or after a rebalance.
* Applications might only want to perform seek operations on an initial assignment.
* @param bindingName the name of the binding.
* @param consumer the consumer.
* @param partitions the partitions.
* @param initial true if this is the initial assignment.
*/
default void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer, Collection<TopicPartition> partitions,
boolean initial) {
}
}
提供 rebalance listener 時,不能將 resetOffsets
消費者屬性設定為 true
。
1.9. 自定義消費者和生產者配置
如果你想對用於在 Kafka 中建立 ConsumerFactory
和 ProducerFactory
的消費者和 producer 配置進行高階定製,可以實現以下 customizer。
-
ConsumerConfigCustomizer
-
ProducerConfigCustomizer
這兩個介面都提供了一種配置用於消費者和 producer 屬性的配置對映的方式。例如,如果你想訪問在應用程式級別定義的 bean,可以在 configure
方法的實現中注入它。當 binder 檢測到這些 customizer 可用作 bean 時,它將在建立消費者和 producer factory 之前立即呼叫 configure
方法。
1.10. 自定義 AdminClient 配置
與上面的消費者和 producer 配置定製類似,應用程式也可以透過提供一個 AdminClientConfigCustomizer
來定製 Admin 客戶端的配置。AdminClientConfigCustomizer 的 configure 方法提供了對 Admin 客戶端屬性的訪問,你可以使用它來定義進一步的定製。Binder 的 Kafka 主題 provisioner 對透過此 customizer 提供的屬性賦予最高優先順序。這裡是一個提供此 customizer bean 的示例。
@Bean
public AdminClientConfigCustomizer adminClientConfigCustomizer() {
return props -> {
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
};
}
1.11. 死信主題處理
1.11.1. 死信主題分割槽選擇
預設情況下,記錄使用與原始記錄相同的分割槽釋出到死信主題。這意味著死信主題必須至少擁有與原始記錄相同數量的分割槽。
要更改此行為,請將 DlqPartitionFunction
實現作為 @Bean
新增到應用程式上下文中。只能存在一個此類 bean。該函式會接收消費者組、失敗的 ConsumerRecord
和異常。例如,如果你總是想路由到分割槽 0,你可以使用
@Bean
public DlqPartitionFunction partitionFunction() {
return (group, record, ex) -> 0;
}
如果你將消費者繫結的 dlqPartitions 屬性設定為 1(並且 binder 的 minPartitionCount 等於 1 ),則無需提供 DlqPartitionFunction ;框架將始終使用分割槽 0。如果你將消費者繫結的 dlqPartitions 屬性設定為大於 1 的值(或 binder 的 minPartitionCount 大於 1 ),則**必須**提供 DlqPartitionFunction bean,即使分割槽數量與原始主題相同。 |
也可以為 DLQ 主題定義自定義名稱。為此,請將 DlqDestinationResolver
的實現作為 @Bean
建立到應用程式上下文中。當 binder 檢測到此類 bean 時,它會優先使用,否則將使用 dlqName
屬性。如果兩者都找不到,它將預設為 error.<destination>.<group>
。這裡是一個將 DlqDestinationResolver
作為 @Bean
的示例。
@Bean
public DlqDestinationResolver dlqDestinationResolver() {
return (rec, ex) -> {
if (rec.topic().equals("word1")) {
return "topic1-dlq";
}
else {
return "topic2-dlq";
}
};
}
提供 DlqDestinationResolver
的實現時,要記住一個重要的事情是,binder 中的 provisioner 不會自動為應用程式建立主題。這是因為 binder 無法推斷出實現可能傳送到的所有 DLQ 主題的名稱。因此,如果你使用此策略提供 DLQ 名稱,確保這些主題事先建立是應用程式的責任。
1.11.2. 處理死信主題中的記錄
由於框架無法預料使用者希望如何處理死信訊息,因此它不提供任何標準機制來處理它們。如果死信的原因是臨時的,你可能希望將訊息路由回原始主題。但是,如果問題是永久性的,可能會導致無限迴圈。本主題中的示例 Spring Boot 應用程式展示瞭如何將這些訊息路由回原始主題,但在三次嘗試後將其移動到“停車場”主題。該應用程式是另一個 spring-cloud-stream 應用程式,它從死信主題讀取。當 5 秒內沒有收到訊息時,它會終止。
示例假設原始目的地是 so8400out
,消費者組是 so8400
。
有幾種策略可以考慮
-
考慮僅在主應用程式未執行時執行重新路由。否則,臨時錯誤的重試次數會很快用完。
-
或者,使用兩階段方法:使用此應用程式路由到一個第三主題,然後使用另一個應用程式從該主題路由回主主題。
以下程式碼清單顯示了示例應用程式
spring.cloud.stream.bindings.input.group=so8400replay
spring.cloud.stream.bindings.input.destination=error.so8400out.so8400
spring.cloud.stream.bindings.output.destination=so8400out
spring.cloud.stream.bindings.parkingLot.destination=so8400in.parkingLot
spring.cloud.stream.kafka.binder.configuration.auto.offset.reset=earliest
spring.cloud.stream.kafka.binder.headers=x-retries
@SpringBootApplication
@EnableBinding(TwoOutputProcessor.class)
public class ReRouteDlqKApplication implements CommandLineRunner {
private static final String X_RETRIES_HEADER = "x-retries";
public static void main(String[] args) {
SpringApplication.run(ReRouteDlqKApplication.class, args).close();
}
private final AtomicInteger processed = new AtomicInteger();
@Autowired
private MessageChannel parkingLot;
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public Message<?> reRoute(Message<?> failed) {
processed.incrementAndGet();
Integer retries = failed.getHeaders().get(X_RETRIES_HEADER, Integer.class);
if (retries == null) {
System.out.println("First retry for " + failed);
return MessageBuilder.fromMessage(failed)
.setHeader(X_RETRIES_HEADER, new Integer(1))
.setHeader(BinderHeaders.PARTITION_OVERRIDE,
failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
.build();
}
else if (retries.intValue() < 3) {
System.out.println("Another retry for " + failed);
return MessageBuilder.fromMessage(failed)
.setHeader(X_RETRIES_HEADER, new Integer(retries.intValue() + 1))
.setHeader(BinderHeaders.PARTITION_OVERRIDE,
failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
.build();
}
else {
System.out.println("Retries exhausted for " + failed);
parkingLot.send(MessageBuilder.fromMessage(failed)
.setHeader(BinderHeaders.PARTITION_OVERRIDE,
failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
.build());
}
return null;
}
@Override
public void run(String... args) throws Exception {
while (true) {
int count = this.processed.get();
Thread.sleep(5000);
if (count == this.processed.get()) {
System.out.println("Idle, terminating");
return;
}
}
}
public interface TwoOutputProcessor extends Processor {
@Output("parkingLot")
MessageChannel parkingLot();
}
}
1.12. 使用 Kafka 繫結器進行分割槽
Apache Kafka 原生支援主題分割槽。
有時將資料傳送到特定分割槽是有利的——例如,當你希望嚴格排序訊息處理時(特定客戶的所有訊息都應該傳送到同一分割槽)。
以下示例展示瞭如何配置 producer 和 consumer 端
@SpringBootApplication
@EnableBinding(Source.class)
public class KafkaPartitionProducerApplication {
private static final Random RANDOM = new Random(System.currentTimeMillis());
private static final String[] data = new String[] {
"foo1", "bar1", "qux1",
"foo2", "bar2", "qux2",
"foo3", "bar3", "qux3",
"foo4", "bar4", "qux4",
};
public static void main(String[] args) {
new SpringApplicationBuilder(KafkaPartitionProducerApplication.class)
.web(false)
.run(args);
}
@InboundChannelAdapter(channel = Source.OUTPUT, poller = @Poller(fixedRate = "5000"))
public Message<?> generate() {
String value = data[RANDOM.nextInt(data.length)];
System.out.println("Sending: " + value);
return MessageBuilder.withPayload(value)
.setHeader("partitionKey", value)
.build();
}
}
spring:
cloud:
stream:
bindings:
output:
destination: partitioned.topic
producer:
partition-key-expression: headers['partitionKey']
partition-count: 12
主題必須預先配置足夠的分割槽,以滿足所有消費者組所需的併發度。上述配置支援最多 12 個消費者例項(如果 concurrency 為 2,則支援 6 個;如果併發度為 3,則支援 4 個,依此類推)。通常最好“過度配置”分割槽,以便未來增加消費者或併發度。 |
上述配置使用了預設分割槽策略(key.hashCode() % partitionCount )。這可能提供或不提供一個適當平衡的演算法,具體取決於鍵值。你可以透過使用 partitionSelectorExpression 或 partitionSelectorClass 屬性來覆蓋此預設設定。 |
由於分割槽由 Kafka 原生處理,消費者端無需特殊配置。Kafka 會在例項之間分配分割槽。
以下 Spring Boot 應用程式監聽 Kafka 流,並將每條訊息傳送到的分割槽 ID 列印到控制檯
@SpringBootApplication
@EnableBinding(Sink.class)
public class KafkaPartitionConsumerApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(KafkaPartitionConsumerApplication.class)
.web(false)
.run(args);
}
@StreamListener(Sink.INPUT)
public void listen(@Payload String in, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
System.out.println(in + " received from partition " + partition);
}
}
spring:
cloud:
stream:
bindings:
input:
destination: partitioned.topic
group: myGroup
你可以根據需要新增例項。Kafka 會重新平衡分割槽分配。如果例項數量(或 instance count * concurrency
)超過分割槽數量,一些消費者將處於空閒狀態。
2. Kafka Streams 繫結器
2.1. 用法
要使用 Kafka Streams binder,只需將其新增到你的 Spring Cloud Stream 應用程式中,使用以下 Maven 座標
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
</dependency>
快速啟動一個用於 Kafka Streams binder 的新專案的方法是使用Spring Initializr,然後選擇“Cloud Streams”和“Spring for Kafka Streams”,如下所示

2.2. 概述
Spring Cloud Stream 包含一個專門為Apache Kafka Streams 繫結設計的 binder 實現。透過這種原生整合,Spring Cloud Stream 的“processor”應用程式可以在核心業務邏輯中直接使用Apache Kafka Streams API。
Kafka Streams binder 實現構建在Spring for Apache Kafka 專案提供的基礎上。
Kafka Streams binder 為 Kafka Streams 中的三種主要型別提供繫結能力——KStream
、KTable
和 GlobalKTable
。
Kafka Streams 應用程式通常遵循一種模式:從入站主題讀取記錄,應用業務邏輯,然後將轉換後的記錄寫入出站主題。或者,也可以定義一個沒有出站目標的 Processor 應用程式。
在接下來的章節中,我們將詳細介紹 Spring Cloud Stream 與 Kafka Streams 的整合。
2.3. 程式設計模型
使用 Kafka Streams binder 提供的程式設計模型時,可以使用高階的Streams DSL,也可以混合使用高階和低階的Processor-API。混合使用高階和低階 API 通常透過在 KStream
上呼叫 transform
或 process
API 方法來實現。
2.3.1. 函式式風格
從 Spring Cloud Stream 3.0.0
版本開始,Kafka Streams binder 允許應用程式使用 Java 8 中提供的函數語言程式設計風格進行設計和開發。這意味著應用程式可以簡潔地表示為型別為 java.util.function.Function
或 java.util.function.Consumer
的 lambda 表示式。
讓我們看一個非常基礎的例子。
@SpringBootApplication
public class SimpleConsumerApplication {
@Bean
public java.util.function.Consumer<KStream<Object, String>> process() {
return input ->
input.foreach((key, value) -> {
System.out.println("Key: " + key + " Value: " + value);
});
}
}
雖然簡單,但這是一個完整的獨立 Spring Boot 應用程式,它利用 Kafka Streams 進行流處理。這是一個消費者應用程式,沒有出站繫結,只有一個入站繫結。應用程式消費資料,並簡單地將來自 KStream
鍵和值的資訊記錄到標準輸出。應用程式包含 SpringBootApplication
註解和一個被標記為 Bean
的方法。該 bean 方法的型別是 java.util.function.Consumer
,它使用 KStream
進行引數化。然後在實現中,我們返回一個 Consumer 物件,它本質上是一個 lambda 表示式。在 lambda 表示式內部,提供了處理資料的程式碼。
在此應用程式中,有一個型別為 KStream
的單一輸入繫結。binder 為此應用程式建立一個名為 process-in-0
的繫結,即函式 bean 名稱後跟一個破折號字元 (-
),然後是文字 in
,再後跟一個破折號和引數的序數位置。你使用此繫結名稱來設定其他屬性,例如目的地。例如,spring.cloud.stream.bindings.process-in-0.destination=my-topic
。
如果未在繫結上設定 destination 屬性,則會建立一個與繫結同名的主題(如果應用程式有足夠的許可權),或者期望該主題已存在。 |
一旦構建為 uber-jar(例如 kstream-consumer-app.jar
),就可以按如下方式執行上述示例。
java -jar kstream-consumer-app.jar --spring.cloud.stream.bindings.process-in-0.destination=my-topic
這是另一個示例,它是一個具有輸入和輸出繫結的完整 processor。這是經典的詞頻統計示例,應用程式從主題接收資料,然後在 tumbling time-window 中計算每個單詞的出現次數。
@SpringBootApplication
public class WordCountProcessorApplication {
@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>> process() {
return input -> input
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.map((key, value) -> new KeyValue<>(value, value))
.groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
.windowedBy(TimeWindows.of(5000))
.count(Materialized.as("word-counts-state-store"))
.toStream()
.map((key, value) -> new KeyValue<>(key.key(), new WordCount(key.key(), value,
new Date(key.window().start()), new Date(key.window().end()))));
}
public static void main(String[] args) {
SpringApplication.run(WordCountProcessorApplication.class, args);
}
}
這裡同樣,基本主題與之前的示例相同,但這裡我們有兩個輸入。Java 的 BiFunction
支援用於將輸入繫結到期望的目的地。binder 為輸入生成的預設繫結名稱分別是 process-in-0
和 process-in-1
。預設輸出繫結是 process-out-0
。在此示例中,BiFunction
的第一個引數被繫結為第一個輸入的 KStream
,第二個引數被繫結為第二個輸入的 KTable
。
一旦構建為 uber-jar(例如 wordcount-processor.jar
),就可以按如下方式執行上述示例。
java -jar wordcount-processor.jar --spring.cloud.stream.bindings.process-in-0.destination=words --spring.cloud.stream.bindings.process-out-0.destination=counts
此應用程式將從 Kafka 主題 words
消費訊息,並將計算結果釋出到輸出主題 counts
。
Spring Cloud Stream 將確保來自入站和出站主題的訊息自動繫結為 KStream 物件。作為開發人員,你可以專注於程式碼的業務方面,即編寫 processor 中所需的邏輯。由 Kafka Streams 基礎設施所需的 Kafka Streams 特定配置的設定由框架自動處理。
我們上面看到的兩個示例都有一個單一的 KStream
輸入繫結。在這兩種情況下,繫結都從一個主題接收記錄。如果你想將多個主題複用到一個單一的 KStream
繫結中,可以在下方提供逗號分隔的 Kafka 主題作為目的地。
spring.cloud.stream.bindings.process-in-0.destination=topic-1,topic-2,topic-3
此外,如果你想根據正則表示式匹配主題,還可以提供主題模式作為目的地。
spring.cloud.stream.bindings.process-in-0.destination=input.*
多個輸入繫結
許多非簡單的 Kafka Streams 應用程式通常透過多個繫結從多個主題消費資料。例如,一個主題被消費為 KStream
,另一個則被消費為 KTable
或 GlobalKTable
。應用程式希望將資料接收為表型別有很多原因。考慮一個用例,其中底層主題是透過資料庫的變更資料捕獲 (CDC) 機制填充的,或者應用程式可能只關心最新的更新以進行下游處理。如果應用程式指定資料需要繫結為 KTable
或 GlobalKTable
,那麼 Kafka Streams binder 將正確地將目的地繫結到 KTable
或 GlobalKTable
,並使它們可用於應用程式進行操作。我們將研究 Kafka Streams binder 中如何處理多個輸入繫結的一些不同場景。
Kafka Streams Binder 中的 BiFunction
這裡是一個我們有兩個輸入和一個輸出的示例。在這種情況下,應用程式可以利用 java.util.function.BiFunction
。
@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
return (userClicksStream, userRegionsTable) -> (userClicksStream
.leftJoin(userRegionsTable, (clicks, region) -> new RegionWithClicks(region == null ?
"UNKNOWN" : region, clicks),
Joined.with(Serdes.String(), Serdes.Long(), null))
.map((user, regionWithClicks) -> new KeyValue<>(regionWithClicks.getRegion(),
regionWithClicks.getClicks()))
.groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
.reduce(Long::sum)
.toStream());
}
這裡同樣,基本主題與之前的示例相同,但這裡我們有兩個輸入。Java 的 BiFunction
支援用於將輸入繫結到期望的目的地。binder 為輸入生成的預設繫結名稱分別是 process-in-0
和 process-in-1
。預設輸出繫結是 process-out-0
。在此示例中,BiFunction
的第一個引數被繫結為第一個輸入的 KStream
,第二個引數被繫結為第二個輸入的 KTable
。
Kafka Streams Binder 中的 BiConsumer
如果有兩個輸入但沒有輸出,在這種情況下我們可以使用 java.util.function.BiConsumer
,如下所示。
@Bean
public BiConsumer<KStream<String, Long>, KTable<String, String>> process() {
return (userClicksStream, userRegionsTable) -> {}
}
超過兩個輸入
如果你有超過兩個輸入怎麼辦?在某些情況下你需要超過兩個輸入。在這種情況下,binder 允許你鏈式呼叫部分函式。在函數語言程式設計術語中,這種技術通常被稱為 currying(柯里化)。隨著 Java 8 添加了函數語言程式設計支援,Java 現在允許你編寫 curried 函式。Spring Cloud Stream Kafka Streams binder 可以利用此特性來實現多個輸入繫結。
讓我們看一個例子。
@Bean
public Function<KStream<Long, Order>,
Function<GlobalKTable<Long, Customer>,
Function<GlobalKTable<Long, Product>, KStream<Long, EnrichedOrder>>>> enrichOrder() {
return orders -> (
customers -> (
products -> (
orders.join(customers,
(orderId, order) -> order.getCustomerId(),
(order, customer) -> new CustomerOrder(customer, order))
.join(products,
(orderId, customerOrder) -> customerOrder
.productId(),
(customerOrder, product) -> {
EnrichedOrder enrichedOrder = new EnrichedOrder();
enrichedOrder.setProduct(product);
enrichedOrder.setCustomer(customerOrder.customer);
enrichedOrder.setOrder(customerOrder.order);
return enrichedOrder;
})
)
)
);
}
讓我們看看上面介紹的繫結模型的細節。在此模型中,我們在入站端有 3 個部分應用的函式。我們將它們稱為 f(x)
、f(y)
和 f(z)
。如果我們將這些函式按真實數學函式的意義展開,它將看起來像這樣:f(x) → (fy) → f(z) → KStream<Long, EnrichedOrder>
。變數 x
代表 KStream<Long, Order>
,變數 y
代表 GlobalKTable<Long, Customer>
,變數 z
代表 GlobalKTable<Long, Product>
。第一個函式 f(x)
接收應用程式的第一個輸入繫結(KStream<Long, Order>
),其輸出是函式 f(y)。函式 f(y)
接收應用程式的第二個輸入繫結(GlobalKTable<Long, Customer>
),其輸出是另一個函式 f(z)
。函式 f(z)
的輸入是應用程式的第三個輸入(GlobalKTable<Long, Product>
),其輸出是 KStream<Long, EnrichedOrder>
,這是應用程式的最終輸出繫結。來自三個部分函式(分別是 KStream
、GlobalKTable
、GlobalKTable
)的輸入都在方法體中可供你使用,以實現 lambda 表示式中的業務邏輯。
輸入繫結分別命名為 enrichOrder-in-0
、enrichOrder-in-1
和 enrichOrder-in-2
。輸出繫結命名為 enrichOrder-out-0
。
使用 curried 函式,你實際上可以擁有任意數量的輸入。但是,請記住,在 Java 中,超過少量輸入及其部分應用函式(如上所示)可能會導致程式碼難以閱讀。因此,如果你的 Kafka Streams 應用程式需要超過合理範圍的少量輸入繫結,並且你想使用這種函式式模型,那麼你可能需要重新考慮你的設計並適當分解應用程式。
多個輸出繫結
Kafka Streams 允許將出站資料寫入多個主題。此功能在 Kafka Streams 中被稱為分支 (branching)。使用多個輸出繫結時,你需要提供一個 KStream 陣列(KStream[]
)作為出站返回型別。
這裡是一個示例
@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>[]> process() {
Predicate<Object, WordCount> isEnglish = (k, v) -> v.word.equals("english");
Predicate<Object, WordCount> isFrench = (k, v) -> v.word.equals("french");
Predicate<Object, WordCount> isSpanish = (k, v) -> v.word.equals("spanish");
return input -> input
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.groupBy((key, value) -> value)
.windowedBy(TimeWindows.of(5000))
.count(Materialized.as("WordCounts-branch"))
.toStream()
.map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value,
new Date(key.window().start()), new Date(key.window().end()))))
.branch(isEnglish, isFrench, isSpanish);
}
程式設計模型保持不變,但出站引數化型別是 KStream[]
。預設輸出繫結名稱分別是 process-out-0
、process-out-1
、process-out-2
。binder 生成三個輸出繫結的原因是它檢測到了返回的 KStream
陣列的長度。
Kafka Streams 基於函式的程式設計風格總結
總之,下表顯示了函式式範例中可以使用的各種選項。
輸入數量 | 輸出數量 | 使用的元件 |
---|---|---|
1 |
0 |
java.util.function.Consumer |
2 |
0 |
java.util.function.BiConsumer |
1 |
1..n |
java.util.function.Function |
2 |
1..n |
java.util.function.BiFunction |
>= 3 |
0..n |
使用 curried 函式 |
-
在此表中,如果輸出數量大於一,則型別簡單地變為
KStream[]
。
2.3.2. 指令式程式設計模型。
儘管上面概述的函數語言程式設計模型是首選方法,如果你願意,仍然可以使用經典的基於 StreamListener
的方法。
這裡有一些示例。
以下是使用 StreamListener
實現的詞頻統計示例的等價版本。
@SpringBootApplication
@EnableBinding(KafkaStreamsProcessor.class)
public class WordCountProcessorApplication {
@StreamListener("input")
@SendTo("output")
public KStream<?, WordCount> process(KStream<?, String> input) {
return input
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.groupBy((key, value) -> value)
.windowedBy(TimeWindows.of(5000))
.count(Materialized.as("WordCounts-multi"))
.toStream()
.map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))));
}
public static void main(String[] args) {
SpringApplication.run(WordCountProcessorApplication.class, args);
}
如你所見,這有點冗長,因為你需要提供 EnableBinding
以及其他額外的註解,如 StreamListener
和 SendTo
,才能構成一個完整的應用程式。EnableBinding
用於指定包含繫結的繫結介面。在此示例中,我們使用了內建的 KafkaStreamsProcessor
繫結介面,它具有以下契約。
public interface KafkaStreamsProcessor {
@Input("input")
KStream<?, ?> input();
@Output("output")
KStream<?, ?> output();
}
Binder 將為輸入 KStream
和輸出 KStream
建立繫結,因為你使用的繫結介面包含這些宣告。
除了函式式風格提供的程式設計模型上的明顯差異之外,這裡需要提及的一個特別之處在於繫結名稱是你指定在繫結介面中的名稱。例如,在上面的應用程式中,由於我們使用了 KafkaStreamsProcessor
,繫結名稱是 input
和 output
。繫結屬性需要使用這些名稱。例如 spring.cloud.stream.bindings.input.destination
、spring.cloud.stream.bindings.output.destination
等。請記住,這與函式式風格根本不同,因為在函式式風格中,binder 會為應用程式生成繫結名稱。這是因為應用程式在使用 EnableBinding
的函式式模型中不提供任何繫結介面。
這是另一個有兩個輸入的 sink 示例。
@EnableBinding(KStreamKTableBinding.class)
.....
.....
@StreamListener
public void process(@Input("inputStream") KStream<String, PlayEvent> playEvents,
@Input("inputTable") KTable<Long, Song> songTable) {
....
....
}
interface KStreamKTableBinding {
@Input("inputStream")
KStream<?, ?> inputStream();
@Input("inputTable")
KTable<?, ?> inputTable();
}
以下是上面看到的基於 BiFunction
的 processor 的 StreamListener
等價版本。
@EnableBinding(KStreamKTableBinding.class)
....
....
@StreamListener
@SendTo("output")
public KStream<String, Long> process(@Input("input") KStream<String, Long> userClicksStream,
@Input("inputTable") KTable<String, String> userRegionsTable) {
....
....
}
interface KStreamKTableBinding extends KafkaStreamsProcessor {
@Input("inputX")
KTable<?, ?> inputTable();
}
最後,這裡是具有三個輸入和 curried 函式的應用程式的 StreamListener
等價版本。
@EnableBinding(CustomGlobalKTableProcessor.class)
...
...
@StreamListener
@SendTo("output")
public KStream<Long, EnrichedOrder> process(
@Input("input-1") KStream<Long, Order> ordersStream,
@Input("input-2") GlobalKTable<Long, Customer> customers,
@Input("input-3") GlobalKTable<Long, Product> products) {
KStream<Long, CustomerOrder> customerOrdersStream = ordersStream.join(
customers, (orderId, order) -> order.getCustomerId(),
(order, customer) -> new CustomerOrder(customer, order));
return customerOrdersStream.join(products,
(orderId, customerOrder) -> customerOrder.productId(),
(customerOrder, product) -> {
EnrichedOrder enrichedOrder = new EnrichedOrder();
enrichedOrder.setProduct(product);
enrichedOrder.setCustomer(customerOrder.customer);
enrichedOrder.setOrder(customerOrder.order);
return enrichedOrder;
});
}
interface CustomGlobalKTableProcessor {
@Input("input-1")
KStream<?, ?> input1();
@Input("input-2")
GlobalKTable<?, ?> input2();
@Input("input-3")
GlobalKTable<?, ?> input3();
@Output("output")
KStream<?, ?> output();
}
你可能會注意到,上面兩個示例更加冗長,因為除了提供 EnableBinding
之外,你還需要編寫自己的自定義繫結介面。使用函式式模型,可以避免所有這些儀式性的細節。
在我們繼續探討 Kafka Streams binder 提供的通用程式設計模型之前,這裡是多個輸出繫結的 StreamListener
版本。
EnableBinding(KStreamProcessorWithBranches.class)
public static class WordCountProcessorApplication {
@Autowired
private TimeWindows timeWindows;
@StreamListener("input")
@SendTo({"output1","output2","output3"})
public KStream<?, WordCount>[] process(KStream<Object, String> input) {
Predicate<Object, WordCount> isEnglish = (k, v) -> v.word.equals("english");
Predicate<Object, WordCount> isFrench = (k, v) -> v.word.equals("french");
Predicate<Object, WordCount> isSpanish = (k, v) -> v.word.equals("spanish");
return input
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.groupBy((key, value) -> value)
.windowedBy(timeWindows)
.count(Materialized.as("WordCounts-1"))
.toStream()
.map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))))
.branch(isEnglish, isFrench, isSpanish);
}
interface KStreamProcessorWithBranches {
@Input("input")
KStream<?, ?> input();
@Output("output1")
KStream<?, ?> output1();
@Output("output2")
KStream<?, ?> output2();
@Output("output3")
KStream<?, ?> output3();
}
}
回顧一下,我們已經回顧了使用 Kafka Streams binder 時的各種程式設計模型選擇。
binder 為輸入提供 KStream
、KTable
和 GlobalKTable
的繫結能力。KTable
和 GlobalKTable
繫結僅在輸入端可用。Binder 支援 KStream
的輸入和輸出繫結。
Kafka Streams binder 程式設計模型的要點在於,binder 提供了靈活性,你可以選擇完全函數語言程式設計模型,或者使用基於 StreamListener
的命令式方法。
2.4. 程式設計模型的輔助功能
2.4.1. 單個應用程式中的多個 Kafka Streams 處理器
Binder 允許在單個 Spring Cloud Stream 應用程式中包含多個 Kafka Streams processor。你可以擁有如下所示的應用程式。
@Bean
public java.util.function.Function<KStream<Object, String>, KStream<Object, String>> process() {
...
}
@Bean
public java.util.function.Consumer<KStream<Object, String>> anotherProcess() {
...
}
@Bean
public java.util.function.BiFunction<KStream<Object, String>, KTable<Integer, String>, KStream<Object, String>> yetAnotherProcess() {
...
}
在這種情況下,binder 將建立 3 個具有不同 application ID 的獨立 Kafka Streams 物件(更多內容見下文)。但是,如果應用程式中有一個以上的 processor,你必須告訴 Spring Cloud Stream 哪些函式需要被啟用。以下是啟用函式的方法。
spring.cloud.stream.function.definition: process;anotherProcess;yetAnotherProcess
如果你希望某些函式不立即啟用,可以將其從列表中移除。
當你在同一個應用程式中有一個單一的 Kafka Streams processor 和其他型別透過不同 binder 處理的 Function
bean 時(例如,基於常規 Kafka Message Channel binder 的函式 bean),這也適用。
2.4.2. Kafka Streams 應用 ID
Application ID 是 Kafka Streams 應用程式必須提供的強制屬性。Spring Cloud Stream Kafka Streams binder 允許你透過多種方式配置此 application ID。
如果應用程式中只有一個 processor 或 StreamListener
,則可以使用以下屬性在 binder 級別進行設定
spring.cloud.stream.kafka.streams.binder.applicationId
.
為了方便起見,如果你只有一個 processor,還可以使用 spring.application.name
作為屬性來委託設定 application ID。
如果應用程式中有多個 Kafka Streams processor,則需要為每個 processor 設定 application ID。對於函式式模型,可以將其作為屬性附加到每個函式上。
例如,假設你有以下函式。
@Bean
public java.util.function.Consumer<KStream<Object, String>> process() {
...
}
和
@Bean
public java.util.function.Consumer<KStream<Object, String>> anotherProcess() {
...
}
然後,你可以使用以下 binder 級別屬性為每個函式設定 application ID。
spring.cloud.stream.kafka.streams.binder.functions.process.applicationId
和
spring.cloud.stream.kafka.streams.binder.functions.anotherProcess.applicationId
對於 StreamListener
,你需要將其設定在 processor 的第一個輸入繫結上。
例如,假設你有以下兩個基於 StreamListener
的 processor。
@StreamListener
@SendTo("output")
public KStream<String, String> process(@Input("input") <KStream<Object, String>> input) {
...
}
@StreamListener
@SendTo("anotherOutput")
public KStream<String, String> anotherProcess(@Input("anotherInput") <KStream<Object, String>> input) {
...
}
那麼你必須使用以下繫結屬性來設定其 application ID。
spring.cloud.stream.kafka.streams.bindings.input.consumer.applicationId
和
spring.cloud.stream.kafka.streams.bindings.anotherInput.consumer.applicationId
對於基於函式的模型,這種在繫結級別設定 application ID 的方法也適用。然而,如果你使用函式式模型,像我們上面看到的那樣在 binder 級別按函式設定要容易得多。
對於生產環境部署,強烈建議透過配置顯式指定 application ID。如果你正在自動伸縮應用程式,這一點尤其重要,在這種情況下,你需要確保每個例項都使用相同的 application ID 進行部署。
如果應用程式沒有提供 application ID,那麼在這種情況下,binder 會自動為你生成一個靜態 application ID。這在開發場景中很方便,因為它避免了顯式提供 application ID 的需要。透過這種方式生成的 application ID 在應用程式重啟後將保持靜態。對於函式式模型,生成的 application ID 將是函式 bean 名稱後跟文字 applicationID
,例如如果函式 bean 名稱是 process
,則生成的 ID 是 process-applicationID
。對於 StreamListener
,生成的 application ID 不使用函式 bean 名稱,而是使用包含類名後跟方法名,再後跟文字 applicationId
。
設定應用 ID 總結
-
預設情況下,binder 將為每個函式或
StreamListener
方法自動生成 application ID。 -
如果你有一個單一的 processor,那麼可以使用
spring.kafka.streams.applicationId
、spring.application.name
或spring.cloud.stream.kafka.streams.binder.applicationId
。 -
如果你有多個 processor,則可以使用屬性 -
spring.cloud.stream.kafka.streams.binder.functions.<function-name>.applicationId
為每個函式設定 application ID。對於StreamListener
,可以使用spring.cloud.stream.kafka.streams.bindings.input.applicationId
來完成,假設輸入繫結名稱是input
。
2.4.3. 使用函式式風格覆蓋繫結器生成的預設繫結名稱
預設情況下,使用函式式風格時,binder 使用上面討論的策略來生成繫結名稱,即 <function-bean-name>-<in>|<out>-[0..n],例如 process-in-0, process-out-0 等。如果你想覆蓋這些繫結名稱,可以透過指定以下屬性來實現。
spring.cloud.stream.function.bindings.<default binding name>
。預設繫結名稱是 binder 生成的原始繫結名稱。
例如,假設你有這個函式。
@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
...
}
Binder 將生成名為 process-in-0
、process-in-1
和 process-out-0
的繫結。現在,如果你想將它們完全更改為其他名稱,也許是更具領域特定性的繫結名稱,則可以按如下方式進行。
spring.cloud.stream.function.bindings.process-in-0=users
spring.cloud.stream.function.bindings.process-in-0=regions
和
spring.cloud.stream.function.bindings.process-out-0=clicks
之後,必須在這些新的繫結名稱上設定所有繫結級別屬性。
請記住,使用上面描述的函數語言程式設計模型時,在大多數情況下遵循預設繫結名稱是有意義的。你可能仍然想進行這種覆蓋的唯一原因是你擁有大量的配置屬性,並且希望將繫結對映到更具領域友好性的名稱。
2.4.4. 設定引導伺服器配置
執行 Kafka Streams 應用程式時,必須提供 Kafka broker 伺服器資訊。如果你沒有提供此資訊,binder 會預設期望 broker 執行在 localhost:9092
。如果不是這種情況,則需要覆蓋此設定。有幾種方法可以做到這一點。
-
使用 boot 屬性 -
spring.kafka.bootstrapServers
-
Binder 級別屬性 -
spring.cloud.stream.kafka.streams.binder.brokers
對於 binder 級別屬性,無論你是否使用透過常規 Kafka binder 提供的 broker 屬性 - spring.cloud.stream.kafka.binder.brokers
都沒有關係。Kafka Streams binder 首先會檢查是否設定了 Kafka Streams binder 特定的 broker 屬性(spring.cloud.stream.kafka.streams.binder.brokers
),如果找不到,則會查詢 spring.cloud.stream.kafka.binder.brokers
。
2.5. 記錄序列化和反序列化
Kafka Streams binder 允許你透過兩種方式序列化和反序列化記錄。一種是 Kafka 提供的原生序列化和反序列化功能,另一種是 Spring Cloud Stream 框架的訊息轉換能力。讓我們來看一些細節。
2.5.1. 入站反序列化
鍵始終使用原生 Serdes 進行反序列化。
對於值,預設情況下,入站的反序列化由 Kafka 原生執行。請注意,這是與之前版本 Kafka Streams binder 的預設行為的重大改變,之前版本的反序列化是由框架完成的。
Kafka Streams binder 將嘗試透過檢視 java.util.function.Function|Consumer
或 StreamListener
的型別簽名來推斷匹配的 Serde
型別。以下是它匹配 Serdes 的順序。
-
如果應用程式提供一個型別為
Serde
的 bean,並且返回型別使用傳入鍵或值型別的實際型別進行引數化,那麼它將使用該Serde
進行入站反序列化。例如,如果應用程式中有以下內容,binder 會檢測到KStream
的傳入值型別與Serde
bean 上引數化的型別匹配。它將使用該 Serde 進行入站反序列化。
@Bean
public Serde<Foo() customSerde{
...
}
@Bean
public Function<KStream<String, Foo>, KStream<String, Foo>> process() {
}
-
接下來,它會檢視型別,檢查它們是否是 Kafka Streams 公開的型別之一。如果是,則使用它們。以下是 binder 將嘗試從 Kafka Streams 匹配的 Serde 型別。
Integer, Long, Short, Double, Float, byte[], UUID and String.
-
Kafka Streams 提供的 Serdes 如果都不匹配型別,那麼將使用 Spring Kafka 提供的 JsonSerde。在這種情況下,繫結器假定型別是 JSON 友好的。如果您有多個值物件作為輸入,這將非常有用,因為繫結器會在內部將它們推斷為正確的 Java 型別。但在回退到
JsonSerde
之前,繫結器會檢查 Kafka Streams 配置中設定的預設Serde
,以檢視它是否可以與傳入的 KStream 型別匹配。
如果上述策略都不奏效,那麼應用程式必須透過配置提供 `Serde`。這可以透過兩種方式進行配置 - 繫結級別或預設級別。
首先,繫結器將檢視是否在繫結級別提供了 Serde
。例如,如果您有以下處理器:
@Bean
public BiFunction<KStream<CustomKey, AvroIn1>, KTable<CustomKey, AvroIn2>, KStream<CustomKey, AvroOutput>> process() {...}
那麼,您可以使用以下方式提供繫結級別的 Serde
:
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
如果您按上述方式為每個輸入繫結提供了 Serde ,則該配置將具有更高的優先順序,並且繫結器將不會進行任何 Serde 推斷。 |
如果您希望將預設的 key/value Serdes 用於入站反序列化,可以在繫結器級別進行設定。
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde
如果您不希望使用 Kafka 提供的原生解碼,可以依賴 Spring Cloud Stream 提供的訊息轉換功能。由於原生解碼是預設設定,因此為了讓 Spring Cloud Stream 反序列化入站值物件,您需要顯式停用原生解碼。
例如,如果您有上面相同的 BiFunction 處理器,那麼可以使用 spring.cloud.stream.bindings.process-in-0.consumer.nativeDecoding: false
。您需要單獨為所有輸入停用原生解碼。否則,對於那些您沒有停用的輸入,原生解碼仍將應用。
預設情況下,Spring Cloud Stream 將使用 application/json
作為內容型別,並使用適當的 json 訊息轉換器。您可以透過使用以下屬性和相應的 MessageConverter
bean 來使用自定義訊息轉換器。
spring.cloud.stream.bindings.process-in-0.contentType
2.5.2. 出站序列化
出站序列化與入站反序列化基本遵循相同的規則。與入站反序列化一樣,Spring Cloud Stream 的一個主要變化是出站序列化現在由 Kafka 原生處理。在繫結器 3.0 版本之前,這是由框架本身完成的。
出站的 key 總是由 Kafka 使用繫結器推斷出的匹配 Serde
進行序列化。如果無法推斷 key 的型別,則需要透過配置指定。
值 Serdes 的推斷使用與入站反序列化相同的規則。首先檢查出站型別是否與應用程式中提供的 Bean 匹配。如果不匹配,則檢查是否與 Kafka 暴露的 Serde
(例如 - Integer
, Long
, Short
, Double
, Float
, byte[]
, UUID
和 String
)匹配。如果仍然不匹配,則回退到 Spring Kafka 專案提供的 JsonSerde
,但在此之前會先檢視預設的 Serde
配置是否存在匹配。請記住,所有這些對應用程式都是透明的。如果這些都不奏效,使用者必須透過配置提供要使用的 Serde
。
假設您正在使用上面相同的 BiFunction
處理器。那麼您可以按如下方式配置出站 key/value Serdes。
spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
如果 Serde 推斷失敗,並且沒有提供繫結級別的 Serdes,則繫結器回退到 JsonSerde
,但會查詢預設 Serdes 是否有匹配。
預設 serdes 的配置方式與上面描述的反序列化部分相同。
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde
如果您的應用程式使用分支功能並且有多個輸出繫結,則需要為每個繫結單獨配置。再次強調,如果繫結器能夠推斷 Serde
型別,則無需進行此配置。
如果您不希望使用 Kafka 提供的原生編碼,但希望使用框架提供的訊息轉換,則需要顯式停用原生編碼,因為原生編碼是預設設定。例如,如果您有上面相同的 BiFunction 處理器,那麼可以使用 spring.cloud.stream.bindings.process-out-0.producer.nativeEncoding: false
。在分支的情況下,您需要單獨為所有輸出停用原生編碼。否則,對於那些您沒有停用的輸出,原生編碼仍將應用。
當由 Spring Cloud Stream 執行轉換時,預設情況下,它將使用 application/json
作為內容型別,並使用適當的 json 訊息轉換器。您可以透過使用以下屬性和相應的 MessageConverter
bean 來使用自定義訊息轉換器。
spring.cloud.stream.bindings.process-out-0.contentType
當停用原生編碼/解碼時,繫結器將不會像原生 Serdes 那樣進行任何推斷。應用程式需要顯式提供所有配置選項。因此,通常建議在編寫 Spring Cloud Stream Kafka Streams 應用程式時,保持使用預設的序列化/反序列化選項,並堅持使用 Kafka Streams 提供的原生序列化/反序列化。您必須使用框架提供的訊息轉換功能的唯一場景是您的上游生產者使用了特定的序列化策略。在這種情況下,您需要使用匹配的反序列化策略,因為原生機制可能會失敗。當依賴預設的 Serde
機制時,應用程式必須確保繫結器能夠正確地將入站和出站型別對映到適當的 Serde
,否則可能會失敗。
值得一提的是,上面概述的資料序列化/反序列化方法僅適用於處理器的邊緣,即 - 入站和出站。您的業務邏輯仍然可能需要呼叫 Kafka Streams API,這些 API 明確需要 Serde
物件。這些仍然是應用程式的責任,必須由開發人員相應地處理。
2.6. 錯誤處理
Apache Kafka Streams 提供了原生處理反序列化錯誤的異常處理能力。有關此支援的詳細資訊,請參閱 此處。開箱即用,Apache Kafka Streams 提供了兩種反序列化異常處理器 - LogAndContinueExceptionHandler
和 LogAndFailExceptionHandler
。顧名思義,前者會記錄錯誤並繼續處理下一條記錄,後者會記錄錯誤並失敗。LogAndFailExceptionHandler
是預設的反序列化異常處理器。
2.6.1. 處理繫結器中的反序列化異常
Kafka Streams 繫結器允許使用以下屬性指定上述反序列化異常處理器。
spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: logAndContinue
或
spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: logAndFail
除了上面兩種反序列化異常處理器外,繫結器還提供了第三種,用於將錯誤記錄(poison pills)傳送到 DLQ(死信佇列)主題。以下是啟用此 DLQ 異常處理器的方法。
spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: sendToDlq
設定上述屬性後,所有反序列化錯誤的記錄會自動傳送到 DLQ 主題。
您可以透過以下方式設定 DLQ 訊息釋出的 topic 名稱。
您可以提供一個 DlqDestinationResolver
的實現,這是一個函式式介面。DlqDestinationResolver
將 ConsumerRecord
和異常作為輸入,然後允許指定一個 topic 名稱作為輸出。透過訪問 Kafka ConsumerRecord
,可以在 BiFunction
的實現中檢查 header 記錄。
以下是提供 DlqDestinationResolver
實現的示例。
@Bean
public DlqDestinationResolver dlqDestinationResolver() {
return (rec, ex) -> {
if (rec.topic().equals("word1")) {
return "topic1-dlq";
}
else {
return "topic2-dlq";
}
};
}
提供 DlqDestinationResolver
的實現時,要記住一個重要的事情是,binder 中的 provisioner 不會自動為應用程式建立主題。這是因為 binder 無法推斷出實現可能傳送到的所有 DLQ 主題的名稱。因此,如果你使用此策略提供 DLQ 名稱,確保這些主題事先建立是應用程式的責任。
如果應用程式中存在 DlqDestinationResolver
bean,則該 bean 具有更高的優先順序。如果您不想遵循此方法,而是希望使用配置提供靜態 DLQ 名稱,可以設定以下屬性。
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.dlqName: custom-dlq (Change the binding name accordingly)
如果設定了此屬性,則錯誤記錄將傳送到 custom-dlq
主題。如果應用程式沒有使用上述任何一種策略,它將建立一個名為 error.<input-topic-name>.<application-id>
的 DLQ 主題。例如,如果您的繫結目標主題是 inputTopic
並且應用程式 ID 是 process-applicationId
,則預設的 DLQ 主題是 error.inputTopic.process-applicationId
。如果您打算啟用 DLQ,始終建議為每個輸入繫結顯式建立一個 DLQ 主題。
2.6.2. 每個輸入消費者繫結的 DLQ
屬性 spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler
適用於整個應用程式。這意味著如果同一應用程式中有多個函式或 StreamListener
方法,此屬性將應用於所有這些方法。但是,如果您在單個處理器中有多個處理器或多個輸入繫結,則可以使用繫結器為每個輸入消費者繫結提供的更細粒度的 DLQ 控制。
如果您有以下處理器:
@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
...
}
並且您只想在第一個輸入繫結上啟用 DLQ,而在第二個繫結上啟用 logAndSkip,則可以在消費者級別進行設定,如下所示。
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.deserializationExceptionHandler: sendToDlq
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.deserializationExceptionHandler: logAndSkip
以這種方式設定反序列化異常處理器比在繫結器級別設定具有更高的優先順序。
2.6.3. DLQ 分割槽
預設情況下,記錄使用與原始記錄相同的分割槽釋出到死信主題。這意味著死信主題必須至少擁有與原始記錄相同數量的分割槽。
要更改此行為,請將 DlqPartitionFunction
實現作為 `@Bean` 新增到應用程式上下文中。只能存在一個這樣的 bean。該函式提供了消費者組(在大多數情況下與應用程式 ID 相同)、失敗的 ConsumerRecord
和異常。例如,如果您總是希望路由到分割槽 0,可以使用:
@Bean
public DlqPartitionFunction partitionFunction() {
return (group, record, ex) -> 0;
}
如果你將消費者繫結的 dlqPartitions 屬性設定為 1(並且 binder 的 minPartitionCount 等於 1 ),則無需提供 DlqPartitionFunction ;框架將始終使用分割槽 0。如果你將消費者繫結的 dlqPartitions 屬性設定為大於 1 的值(或 binder 的 minPartitionCount 大於 1 ),則**必須**提供 DlqPartitionFunction bean,即使分割槽數量與原始主題相同。 |
在使用 Kafka Streams 繫結器中的異常處理功能時,需要牢記幾點。
-
屬性
spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler
適用於整個應用程式。這意味著如果同一應用程式中有多個函式或StreamListener
方法,此屬性將應用於所有這些方法。 -
反序列化的異常處理與原生反序列化和框架提供的訊息轉換保持一致。
2.6.4. 處理繫結器中的生產異常
與上面描述的反序列化異常處理支援不同,繫結器沒有提供用於處理生產異常的此類一流機制。但是,您仍然可以使用 StreamsBuilderFactoryBean
定製器來配置生產異常處理器,您可以在下面的後續章節中找到更多詳細資訊。
2.7. 重試關鍵業務邏輯
有些場景您可能希望重試對應用程式至關重要的部分業務邏輯。可能是從 Kafka Streams 處理器呼叫關係型資料庫或 REST 端點。這些呼叫可能由於網路問題或遠端服務不可用等各種原因而失敗。更常見的情況是,如果您可以再次嘗試,這些故障可能會自行解決。預設情況下,Kafka Streams 繫結器會為所有輸入繫結建立 RetryTemplate
bean。
如果函式具有以下簽名:
@Bean
public java.util.function.Consumer<KStream<Object, String>> process()
並且使用預設繫結名稱,RetryTemplate
將註冊為 process-in-0-RetryTemplate
。這遵循了繫結名稱(process-in-0
)後跟字面量 -RetryTemplate
的約定。在有多個輸入繫結的情況下,每個繫結都會有一個單獨的 RetryTemplate
bean 可用。如果應用程式中存在自定義 RetryTemplate
bean 並透過 spring.cloud.stream.bindings.<binding-name>.consumer.retryTemplateName
提供,則該 bean 優先於任何輸入繫結級別的重試模板配置屬性。
一旦繫結器中的 RetryTemplate
被注入到應用程式中,就可以用來重試應用程式的任何關鍵部分。以下是一個示例:
@Bean
public java.util.function.Consumer<KStream<Object, String>> process(@Lazy @Qualifier("process-in-0-RetryTemplate") RetryTemplate retryTemplate) {
return input -> input
.process(() -> new Processor<Object, String>() {
@Override
public void init(ProcessorContext processorContext) {
}
@Override
public void process(Object o, String s) {
retryTemplate.execute(context -> {
//Critical business logic goes here.
});
}
@Override
public void close() {
}
});
}
或者您可以使用自定義 RetryTemplate
如下所示。
@EnableAutoConfiguration
public static class CustomRetryTemplateApp {
@Bean
@StreamRetryTemplate
RetryTemplate fooRetryTemplate() {
RetryTemplate retryTemplate = new RetryTemplate();
RetryPolicy retryPolicy = new SimpleRetryPolicy(4);
FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
backOffPolicy.setBackOffPeriod(1);
retryTemplate.setBackOffPolicy(backOffPolicy);
retryTemplate.setRetryPolicy(retryPolicy);
return retryTemplate;
}
@Bean
public java.util.function.Consumer<KStream<Object, String>> process() {
return input -> input
.process(() -> new Processor<Object, String>() {
@Override
public void init(ProcessorContext processorContext) {
}
@Override
public void process(Object o, String s) {
fooRetryTemplate().execute(context -> {
//Critical business logic goes here.
});
}
@Override
public void close() {
}
});
}
}
注意,當重試耗盡時,預設情況下,將丟擲最後一個異常,導致處理器終止。如果您希望處理異常並繼續處理,可以將 RecoveryCallback
新增到 execute
方法中:以下是一個示例。
retryTemplate.execute(context -> {
//Critical business logic goes here.
}, context -> {
//Recovery logic goes here.
return null;
));
有關 RetryTemplate、重試策略、回退策略等的更多資訊,請參閱 Spring Retry 專案。
2.8. 狀態儲存
當使用高階 DSL 並呼叫觸發狀態儲存的適當方法時,狀態儲存會由 Kafka Streams 自動建立。
如果您想將傳入的 KTable
繫結物化為一個命名的狀態儲存,可以使用以下策略。
假設您有以下函式。
@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
...
}
然後透過設定以下屬性,傳入的 KTable
資料將被物化到命名的狀態儲存中。
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.materializedAs: incoming-store
您可以在應用程式中將自定義狀態儲存定義為 bean,繫結器會檢測到並將它們新增到 Kafka Streams builder 中。特別是在使用 Processor API 時,您需要手動註冊狀態儲存。為此,您可以在應用程式中建立一個 StateStore
bean。以下是定義此類 bean 的示例。
@Bean
public StoreBuilder myStore() {
return Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("my-store"), Serdes.Long(),
Serdes.Long());
}
@Bean
public StoreBuilder otherStore() {
return Stores.windowStoreBuilder(
Stores.persistentWindowStore("other-store",
1L, 3, 3L, false), Serdes.Long(),
Serdes.Long());
}
這些狀態儲存隨後可以直接由應用程式訪問。
在引導過程中,上述 bean 將由繫結器處理並傳遞給 Streams builder 物件。
訪問狀態儲存
Processor<Object, Product>() {
WindowStore<Object, String> state;
@Override
public void init(ProcessorContext processorContext) {
state = (WindowStore)processorContext.getStateStore("mystate");
}
...
}
這不適用於註冊全域性狀態儲存。要註冊全域性狀態儲存,請參閱下面關於定製 StreamsBuilderFactoryBean
的章節。
2.9. 互動式查詢
Kafka Streams 繫結器 API 暴露了一個名為 InteractiveQueryService
的類,用於互動式查詢狀態儲存。您可以將其作為 Spring bean 在應用程式中訪問。從應用程式訪問此 bean 的簡單方法是 autowire
該 bean。
@Autowired
private InteractiveQueryService interactiveQueryService;
一旦您獲得了此 bean 的訪問權,就可以查詢您感興趣的特定狀態儲存。見下文。
ReadOnlyKeyValueStore<Object, Object> keyValueStore =
interactiveQueryService.getQueryableStoreType("my-store", QueryableStoreTypes.keyValueStore());
在啟動過程中,上述檢索儲存的方法呼叫可能會失敗。例如,它可能仍在初始化狀態儲存的過程中。在這種情況下,重試此操作會很有用。Kafka Streams 繫結器提供了一個簡單的重試機制來適應這種情況。
以下是可用於控制此重試的兩個屬性。
-
spring.cloud.stream.kafka.streams.binder.stateStoreRetry.maxAttempts - 預設值為
1
。 -
spring.cloud.stream.kafka.streams.binder.stateStoreRetry.backOffInterval - 預設值為
1000
毫秒。
如果有多個 Kafka Streams 應用程式例項正在執行,那麼在互動式查詢它們之前,您需要確定哪個應用程式例項託管您要查詢的特定 key。InteractiveQueryService
API 提供了識別主機資訊的方法。
為了使這工作,您必須配置屬性 application.server
如下所示:
spring.cloud.stream.kafka.streams.binder.configuration.application.server: <server>:<port>
程式碼片段
org.apache.kafka.streams.state.HostInfo hostInfo = interactiveQueryService.getHostInfo("store-name",
key, keySerializer);
if (interactiveQueryService.getCurrentHostInfo().equals(hostInfo)) {
//query from the store that is locally available
}
else {
//query from the remote host
}
2.9.1. 透過 InteractiveQueryService 可用的其他 API 方法
使用以下 API 方法檢索與給定儲存和 key 組合關聯的 KeyQueryMetadata
物件。
public <K> KeyQueryMetadata getKeyQueryMetadata(String store, K key, Serializer<K> serializer)
使用以下 API 方法檢索與給定儲存和 key 組合關聯的 KakfaStreams
物件。
public <K> KafkaStreams getKafkaStreams(String store, K key, Serializer<K> serializer)
2.10. 健康指示器
健康指示器需要依賴 spring-boot-starter-actuator
。對於 Maven,請使用:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
Spring Cloud Stream Kafka Streams 繫結器提供了一個健康指示器來檢查底層 streams 執行緒的狀態。Spring Cloud Stream 定義了一個屬性 management.health.binders.enabled
來啟用健康指示器。請參閱 Spring Cloud Stream 文件。
健康指示器為每個 streams 執行緒的元資料提供以下詳細資訊:
-
執行緒名稱
-
執行緒狀態:
CREATED
,RUNNING
,PARTITIONS_REVOKED
,PARTITIONS_ASSIGNED
,PENDING_SHUTDOWN
或DEAD
-
活動任務:任務 ID 和分割槽
-
備用任務:任務 ID 和分割槽
預設情況下,只顯示全域性狀態(UP
或 DOWN
)。要顯示詳細資訊,必須將屬性 management.endpoint.health.show-details
設定為 ALWAYS
或 WHEN_AUTHORIZED
。有關健康資訊的更多詳細資訊,請參閱 Spring Boot Actuator 文件。
如果所有註冊的 Kafka 執行緒都處於 RUNNING 狀態,則健康指示器的狀態為 UP 。 |
由於 Kafka Streams 繫結器中有三種獨立的繫結器型別(KStream
、KTable
和 GlobalKTable
),它們都將報告健康狀態。當啟用 show-details
時,報告的一些資訊可能會冗餘。
當同一應用程式中存在多個 Kafka Streams 處理器時,健康檢查將針對所有處理器進行報告,並按 Kafka Streams 的應用程式 ID 進行分類。
2.11. 訪問 Kafka Streams 指標
Spring Cloud Stream Kafka Streams 繫結器提供了 Kafka Streams 指標,可以透過 Micrometer MeterRegistry
匯出。
對於 Spring Boot 2.2.x 版本,指標支援是透過繫結器提供的自定義 Micrometer 指標實現提供的。對於 Spring Boot 2.3.x 版本,Kafka Streams 指標支援透過 Micrometer 原生提供。
透過 Boot actuator 端點訪問指標時,請確保將 metrics
新增到屬性 management.endpoints.web.exposure.include
中。然後您可以訪問 /actuator/metrics
獲取所有可用指標的列表,然後可以透過相同的 URI(/actuator/metrics/<metric-name>
)單獨訪問這些指標。
2.12. 混合使用高階 DSL 和低階 Processor API
Kafka Streams 提供了兩種 API 變體。它有一個更高級別的 DSL 類似 API,您可以在其中連結各種操作,這對許多函數語言程式設計人員來說可能很熟悉。Kafka Streams 還提供了對低級別 Processor API 的訪問。Processor API 雖然功能非常強大,並且能夠以低得多的級別控制事物,但本質上是命令式的。Spring Cloud Stream 的 Kafka Streams 繫結器允許您使用高階 DSL 或混合使用 DSL 和 Processor API。混合使用這兩種變體為您提供了許多選項來控制應用程式中的各種用例。應用程式可以使用 transform
或 process
方法呼叫來訪問 Processor API。
以下是使用 process
API 在 Spring Cloud Stream 應用程式中結合使用 DSL 和 Processor API 的示例。
@Bean
public Consumer<KStream<Object, String>> process() {
return input ->
input.process(() -> new Processor<Object, String>() {
@Override
@SuppressWarnings("unchecked")
public void init(ProcessorContext context) {
this.context = context;
}
@Override
public void process(Object key, String value) {
//business logic
}
@Override
public void close() {
});
}
以下是使用 transform
API 的示例。
@Bean
public Consumer<KStream<Object, String>> process() {
return (input, a) ->
input.transform(() -> new Transformer<Object, String, KeyValue<Object, String>>() {
@Override
public void init(ProcessorContext context) {
}
@Override
public void close() {
}
@Override
public KeyValue<Object, String> transform(Object key, String value) {
// business logic - return transformed KStream;
}
});
}
process
API 方法呼叫是終端操作,而 transform
API 是非終端操作,它可以返回一個可能經過轉換的 KStream
,您可以使用它繼續使用 DSL 或 Processor API 進行進一步處理。
2.13. 出站分割槽支援
Kafka Streams 處理器通常將處理後的輸出傳送到出站 Kafka 主題。如果出站主題已分割槽,並且處理器需要將出站資料傳送到特定分割槽,應用程式需要提供一個 StreamPartitioner
型別的 bean。有關詳細資訊,請參閱 StreamPartitioner。讓我們看一些示例。
這就是我們已經多次看到的同一個處理器:
@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>> process() {
...
}
以下是出站繫結目標:
spring.cloud.stream.bindings.process-out-0.destination: outputTopic
如果主題 outputTopic
有 4 個分割槽,如果您不提供分割槽策略,Kafka Streams 將使用預設分割槽策略,這可能不是您想要的,具體取決於特定的用例。假設您希望將匹配 spring
的任何 key 傳送到分割槽 0,cloud
傳送到分割槽 1,stream
傳送到分割槽 2,其餘所有傳送到分割槽 3。這就是您需要在應用程式中執行的操作。
@Bean
public StreamPartitioner<String, WordCount> streamPartitioner() {
return (t, k, v, n) -> {
if (k.equals("spring")) {
return 0;
}
else if (k.equals("cloud")) {
return 1;
}
else if (k.equals("stream")) {
return 2;
}
else {
return 3;
}
};
}
這是一個基本的實現,但是,您可以訪問記錄的 key/value、主題名稱和總分割槽數。因此,如果需要,您可以實現複雜的分割槽策略。
您還需要將此 bean 名稱與應用程式配置一起提供。
spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.streamPartitionerBeanName: streamPartitioner
應用程式中的每個輸出主題都需要像這樣單獨配置。
2.14. StreamsBuilderFactoryBean 定製器
通常需要定製建立 KafkaStreams
物件的 StreamsBuilderFactoryBean
。基於 Spring Kafka 提供的底層支援,繫結器允許您定製 StreamsBuilderFactoryBean
。您可以使用 StreamsBuilderFactoryBeanCustomizer
來定製 StreamsBuilderFactoryBean
本身。然後,一旦透過此定製器訪問到 StreamsBuilderFactoryBean
,就可以使用 KafkaStreamsCustomizer
定製相應的 KafkaStreams
。這兩個定製器都是 Spring for Apache Kafka 專案的一部分。
以下是使用 StreamsBuilderFactoryBeanCustomizer
的示例。
@Bean
public StreamsBuilderFactoryBeanCustomizer streamsBuilderFactoryBeanCustomizer() {
return sfb -> sfb.setStateListener((newState, oldState) -> {
//Do some action here!
});
}
上面展示了一個示例,說明了您可以如何定製 StreamsBuilderFactoryBean
。本質上,您可以呼叫 StreamsBuilderFactoryBean
中任何可用的修改操作來對其進行定製。繫結器將在 factory bean 啟動之前呼叫此定製器。
一旦您獲得了 StreamsBuilderFactoryBean
的訪問權,您也可以定製底層 KafkaStreams
物件。以下是這樣做的藍圖。
@Bean
public StreamsBuilderFactoryBeanCustomizer streamsBuilderFactoryBeanCustomizer() {
return factoryBean -> {
factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
@Override
public void customize(KafkaStreams kafkaStreams) {
kafkaStreams.setUncaughtExceptionHandler((t, e) -> {
});
}
});
};
}
KafkaStreamsCustomizer
將在底層 KafkaStreams
啟動之前由 StreamsBuilderFactoryBeabn
呼叫。
整個應用程式中只能有一個 StreamsBuilderFactoryBeanCustomizer
。那麼如何考慮多個 Kafka Streams 處理器呢,因為每個處理器都由獨立的 StreamsBuilderFactoryBean
物件支援?在這種情況下,如果這些處理器需要不同的定製,應用程式需要基於應用程式 ID 應用一些過濾。
例如,
@Bean
public StreamsBuilderFactoryBeanCustomizer streamsBuilderFactoryBeanCustomizer() {
return factoryBean -> {
if (factoryBean.getStreamsConfiguration().getProperty(StreamsConfig.APPLICATION_ID_CONFIG)
.equals("processor1-application-id")) {
factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
@Override
public void customize(KafkaStreams kafkaStreams) {
kafkaStreams.setUncaughtExceptionHandler((t, e) -> {
});
}
});
}
};
2.14.1. 使用定製器註冊全域性狀態儲存
如上所述,繫結器沒有提供一種一流的方式來註冊全域性狀態儲存作為一項功能。為此,您需要使用定製器。以下是如何做到這一點。
@Bean
public StreamsBuilderFactoryBeanCustomizer customizer() {
return fb -> {
try {
final StreamsBuilder streamsBuilder = fb.getObject();
streamsBuilder.addGlobalStore(...);
}
catch (Exception e) {
}
};
}
再次強調,如果您有多個處理器,您需要透過過濾掉其他 StreamsBuilderFactoryBean
物件(如上所述使用應用程式 ID)將全域性狀態儲存附加到正確的 StreamsBuilder
。
2.14.2. 使用定製器註冊生產異常處理器
在錯誤處理部分,我們提到繫結器沒有提供一種一流的方式來處理生產異常。儘管如此,您仍然可以使用 StreamsBuilderFacotryBean
定製器來註冊生產異常處理器。見下文。
@Bean
public StreamsBuilderFactoryBeanCustomizer customizer() {
return fb -> {
fb.getStreamsConfiguration().put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
CustomProductionExceptionHandler.class);
};
}
再一次,如果您有多個處理器,您可能需要適當設定正確的 StreamsBuilderFactoryBean
。您也可以使用配置屬性新增此類生產異常處理器(更多內容見下文),但如果您選擇採用程式設計式方法,這是一種選擇。
2.15. 時間戳提取器
Kafka Streams 允許您根據各種時間戳概念控制消費者記錄的處理。預設情況下,Kafka Streams 提取嵌入在消費者記錄中的時間戳元資料。您可以透過為每個輸入繫結提供不同的 TimestampExtractor
實現來更改此預設行為。以下是有關如何做到的一些詳細資訊。
@Bean
public Function<KStream<Long, Order>,
Function<KTable<Long, Customer>,
Function<GlobalKTable<Long, Product>, KStream<Long, Order>>>> process() {
return orderStream ->
customers ->
products -> orderStream;
}
@Bean
public TimestampExtractor timestampExtractor() {
return new WallclockTimestampExtractor();
}
然後為每個消費者繫結設定上述 TimestampExtractor
bean 名稱。
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.timestampExtractorBeanName=timestampExtractor
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.timestampExtractorBeanName=timestampExtractor
spring.cloud.stream.kafka.streams.bindings.process-in-2.consumer.timestampExtractorBeanName=timestampExtractor"
如果您跳過某個輸入消費者繫結以設定自定義時間戳提取器,該消費者將使用預設設定。
2.16. 具有 Kafka Streams 繫結器和常規 Kafka 繫結器的多繫結器
您可以在一個應用程式中同時擁有基於常規 Kafka 繫結器的函式/消費者/供應者和基於 Kafka Streams 的處理器。但是,您不能在單個函式或消費者中混合使用它們。
以下是同一應用程式中同時包含兩種繫結器型別元件的示例。
@Bean
public Function<String, String> process() {
return s -> s;
}
@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>> kstreamProcess() {
return input -> input;
}
這是配置中的相關部分。
spring.cloud.stream.function.definition=process;kstreamProcess
spring.cloud.stream.bindings.process-in-0.destination=foo
spring.cloud.stream.bindings.process-out-0.destination=bar
spring.cloud.stream.bindings.kstreamProcess-in-0.destination=bar
spring.cloud.stream.bindings.kstreamProcess-out-0.destination=foobar
如果您有與上面相同的應用程式,但處理兩個不同的 Kafka 叢集,例如,常規的 process
作用於叢集 1 和叢集 2(從叢集 1 接收資料併發送到叢集 2),而 Kafka Streams 處理器作用於叢集 2。那麼您必須使用 Spring Cloud Stream 提供的多繫結器功能。
在這種場景下,您的配置可能會發生如下變化。
# multi binder configuration
spring.cloud.stream.binders.kafka1.type: kafka
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-1} #Replace kafkaCluster-1 with the approprate IP of the cluster
spring.cloud.stream.binders.kafka2.type: kafka
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2} #Replace kafkaCluster-2 with the approprate IP of the cluster
spring.cloud.stream.binders.kafka3.type: kstream
spring.cloud.stream.binders.kafka3.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2} #Replace kafkaCluster-2 with the approprate IP of the cluster
spring.cloud.stream.function.definition=process;kstreamProcess
# From cluster 1 to cluster 2 with regular process function
spring.cloud.stream.bindings.process-in-0.destination=foo
spring.cloud.stream.bindings.process-in-0.binder=kafka1 # source from cluster 1
spring.cloud.stream.bindings.process-out-0.destination=bar
spring.cloud.stream.bindings.process-out-0.binder=kafka2 # send to cluster 2
# Kafka Streams processor on cluster 2
spring.cloud.stream.bindings.kstreamProcess-in-0.destination=bar
spring.cloud.stream.bindings.kstreamProcess-in-0.binder=kafka3
spring.cloud.stream.bindings.kstreamProcess-out-0.destination=foobar
spring.cloud.stream.bindings.kstreamProcess-out-0.binder=kafka3
請注意上面的配置。我們有兩種型別的繫結器,但總共有三個繫結器,第一個是基於叢集 1 的常規 Kafka 繫結器(kafka1
),然後是基於叢集 2 的另一個 Kafka 繫結器(kafka2
),最後是 kstream
型別的繫結器(kafka3
)。應用程式中的第一個處理器從 kafka1
接收資料併發布到 kafka2
,其中兩個繫結器都基於常規 Kafka 繫結器,但連線到不同的叢集。第二個處理器是 Kafka Streams 處理器,它從 kafka3
消費資料,kafka3
與 kafka2
是同一個叢集,但繫結器型別不同。
由於 Kafka Streams 繫結器系列中有三種不同的繫結器型別 - kstream
、ktable
和 globalktable
- 如果您的應用程式有多個基於這些繫結器中的任何一個的繫結,則需要顯式提供繫結器型別。
例如,如果您有以下處理器:
@Bean
public Function<KStream<Long, Order>,
Function<KTable<Long, Customer>,
Function<GlobalKTable<Long, Product>, KStream<Long, EnrichedOrder>>>> enrichOrder() {
...
}
那麼,這必須在多繫結器場景中配置如下。請注意,這僅在存在真正的多繫結器場景時才需要,即單個應用程式中存在處理多個叢集的多個處理器。在這種情況下,需要為繫結顯式提供繫結器,以便與來自其他處理器的繫結器型別和叢集區分開來。
spring.cloud.stream.binders.kafka1.type: kstream
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2}
spring.cloud.stream.binders.kafka2.type: ktable
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2}
spring.cloud.stream.binders.kafka3.type: globalktable
spring.cloud.stream.binders.kafka3.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2}
spring.cloud.stream.bindings.enrichOrder-in-0.binder=kafka1 #kstream
spring.cloud.stream.bindings.enrichOrder-in-1.binder=kafka2 #ktablr
spring.cloud.stream.bindings.enrichOrder-in-2.binder=kafka3 #globalktable
spring.cloud.stream.bindings.enrichOrder-out-0.binder=kafka1 #kstream
# rest of the configuration is omitted.
2.17. 狀態清理
預設情況下,當繫結停止時會呼叫 Kafkastreams.cleanup()
方法。請參閱Spring Kafka 文件。要修改此行為,只需將一個 CleanupConfig
`@Bean`(配置為在啟動、停止或兩者都不執行時進行清理)新增到應用程式上下文中即可;該 bean 將被檢測到並注入到 factory bean 中。
2.18. Kafka Streams 拓撲視覺化
Kafka Streams 繫結器提供了以下 actuator 端點,用於檢索拓撲描述,您可以使用外部工具視覺化該拓撲。
/actuator/kafkastreamstopology
/actuator/kafkastreamstopology/<application-id of the processor>
您需要包含 Spring Boot 的 actuator 和 web 依賴項才能訪問這些端點。此外,您還需要將 kafkastreamstopology
新增到 management.endpoints.web.exposure.include
屬性中。預設情況下,kafkastreamstopology
端點是停用的。
2.19. 配置選項
本節包含 Kafka Streams 繫結器使用的配置選項。
有關繫結器的常見配置選項和屬性,請參閱核心文件。
2.19.1. Kafka Streams 繫結器屬性
以下屬性可在繫結器級別使用,並且必須以 spring.cloud.stream.kafka.streams.binder.
為字首。
- configuration
-
包含 Apache Kafka Streams API 相關屬性的 key/value 對 Map。此屬性必須以
spring.cloud.stream.kafka.streams.binder.
為字首。以下是使用此屬性的一些示例。
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000
有關所有可用於 streams 配置的屬性的更多資訊,請參閱 Apache Kafka Streams 文件中的 StreamsConfig
JavaDocs。您可以在 StreamsConfig
中設定的所有配置都可以透過此屬性設定。使用此屬性時,它適用於整個應用程式,因為這是繫結器級別的屬性。如果應用程式中有多個處理器,所有處理器都將獲取這些屬性。對於像 application.id
這樣的屬性,這會帶來問題,因此您必須仔細檢查如何使用此繫結器級別的 configuration
屬性對映 StreamsConfig
中的屬性。
- functions.<function-bean-name>.applicationId
-
僅適用於函式式處理器。可用於在應用程式中為每個函式設定應用程式 ID。在有多個函式的情況下,這是一種方便的設定應用程式 ID 的方式。
- functions.<function-bean-name>.configuration
-
僅適用於函式式處理器。包含 Apache Kafka Streams API 相關屬性的 key/value 對 Map。這類似於上面描述的繫結器級別的
configuration
屬性,但此級別的configuration
屬性僅限於命名的函式。當您有多個處理器並且希望基於特定函式限制對配置的訪問時,您可能希望使用此屬性。這裡可以使用所有StreamsConfig
屬性。 - brokers
-
Broker URL
預設值:
localhost
- zkNodes
-
Zookeeper URL
預設值:
localhost
- deserializationExceptionHandler
-
反序列化錯誤處理器型別。此處理器應用於繫結器級別,因此適用於應用程式中的所有輸入繫結。可以在消費者繫結級別以更細粒度的方式進行控制。可能的值有 -
logAndContinue
、logAndFail
或sendToDlq
。預設值:
logAndFail
- applicationId
-
方便地在 binder 級別全域性設定 Kafka Streams 應用的
application.id
。如果應用包含多個函式或StreamListener
方法,則應以不同方式設定應用 ID。請參閱上面關於應用 ID 設定的詳細討論。預設值:應用將生成一個靜態應用 ID。有關更多詳細資訊,請參閱應用 ID 部分。
- stateStoreRetry.maxAttempts
-
嘗試連線狀態儲存的最大嘗試次數。
預設值:1
- stateStoreRetry.backoffPeriod
-
重試時嘗試連線狀態儲存的回退週期。
預設值:1000 ms
- consumerProperties
-
binder 級別的任意消費者屬性。
- producerProperties
-
binder 級別的任意生產者屬性。
2.19.2. Kafka Streams 生產者屬性
以下屬性僅適用於 Kafka Streams 生產者,並且必須以 spring.cloud.stream.kafka.streams.bindings.<binding name>.producer.
為字首。為方便起見,如果存在多個輸出 binding 並且它們都需要一個共同值,則可以使用字首 spring.cloud.stream.kafka.streams.default.producer.
進行配置。
- keySerde
-
要使用的 key serde
預設值:請參閱上面關於訊息反/序列化的討論
- valueSerde
-
要使用的 value serde
預設值:請參閱上面關於訊息反/序列化的討論
- useNativeEncoding
-
啟用/停用原生編碼的標誌
預設值:
true
。
streamPartitionerBeanName:要在消費者端使用的自定義出站分割槽器 bean 名稱。應用可以將自定義的 StreamPartitioner
作為 Spring bean 提供,並將該 bean 的名稱提供給生產者,以代替預設分割槽器。
+ 預設值:請參閱上面關於出站分割槽支援的討論。
- producedAs
-
處理器生產到的 sink 元件的自定義名稱。
預設值:
none
(由 Kafka Streams 生成)
2.19.3. Kafka Streams 消費者屬性
以下屬性適用於 Kafka Streams 消費者,並且必須以 spring.cloud.stream.kafka.streams.bindings.<binding-name>.consumer.
為字首。為方便起見,如果存在多個輸入 binding 並且它們都需要一個共同值,則可以使用字首 spring.cloud.stream.kafka.streams.default.consumer.
進行配置。
- applicationId
-
按輸入 binding 設定 application.id。這僅適用於基於
StreamListener
的處理器,對於基於函式的處理器,請參閱上面概述的其他方法。預設值:參見上文。
- keySerde
-
要使用的 key serde
預設值:請參閱上面關於訊息反/序列化的討論
- valueSerde
-
要使用的 value serde
預設值:請參閱上面關於訊息反/序列化的討論
- materializedAs
-
在使用傳入 KTable 型別時要物化(materialize)的狀態儲存
預設值:
none
。 - useNativeDecoding
-
啟用/停用原生解碼的標誌
預設值:
true
。 - dlqName
-
DLQ topic 名稱。
預設值:請參閱上面關於錯誤處理和 DLQ 的討論。
- startOffset
-
如果沒有已提交的 offset 可供消費,則從該 offset 開始消費。這主要在消費者首次消費 topic 時使用。Kafka Streams 使用
earliest
作為預設策略,binder 也使用相同的預設值。可以使用此屬性將其覆蓋為latest
。預設值:
earliest
。
注意:在消費者端使用 resetOffsets
對 Kafka Streams binder 沒有影響。與基於訊息通道的 binder 不同,Kafka Streams binder 不會按需定位到開始或結束。
- deserializationExceptionHandler
-
反序列化錯誤處理程式型別。此處理程式應用於每個消費者 binding,而不是應用於 binder 級別的屬性。可能的值包括 -
logAndContinue
,logAndFail
或sendToDlq
預設值:
logAndFail
- timestampExtractorBeanName
-
要在消費者端使用的特定時間戳提取器 bean 名稱。應用可以將
TimestampExtractor
作為 Spring bean 提供,並將該 bean 的名稱提供給消費者,以代替預設提取器。預設值:請參閱上面關於時間戳提取器的討論。
- eventTypes
-
此 binding 支援的事件型別列表,用逗號分隔。
預設值:
none
- eventTypeHeaderKey
-
透過此 binding 傳入的每個記錄上的事件型別 header key。
預設值:
event_type
- consumedAs
-
處理器從中消費的 source 元件的自定義名稱。
預設值:
none
(由 Kafka Streams 生成)
2.19.4. 併發性的特別說明
在 Kafka Streams 中,您可以使用 num.stream.threads
屬性控制處理器可以建立的執行緒數。您可以使用上面在 binder、functions、producer 或 consumer 級別描述的各種 configuration
選項來實現這一點。您也可以使用核心 Spring Cloud Stream 為此目的提供的 concurrency
屬性。使用此屬性時,您需要在消費者端使用它。當函式或 StreamListener
中有多個輸入 binding 時,請在第一個輸入 binding 上設定此屬性。例如,設定 spring.cloud.stream.bindings.process-in-0.consumer.concurrency
後,binder 將其轉換為 num.stream.threads
。如果您有多個處理器,其中一個處理器定義了 binding 級別的併發性,而其他處理器沒有,則沒有定義 binding 級別併發性的處理器將回退到透過 spring.cloud.stream.kafka.streams.binder.configuration.num.stream.threads
指定的 binder 範圍屬性。如果此 binder 配置不可用,則應用將使用 Kafka Streams 設定的預設值。
附錄
附錄 A: 構建
A.1. 基本編譯和測試
要構建原始碼,您需要安裝 JDK 1.7。
構建使用 Maven wrapper,因此您無需安裝特定版本的 Maven。要啟用測試,您應該在構建之前執行 Kafka 伺服器 0.9 或更高版本。有關執行伺服器的更多資訊,請參閱下文。
主構建命令是
$ ./mvnw clean install
您也可以新增 '-DskipTests' 來跳過執行測試(如果您願意)。
您也可以自行安裝 Maven(>=3.3.3),並在以下示例中使用 mvn 命令代替 ./mvnw 。如果您這樣做,並且您的本地 Maven 設定不包含 spring 預釋出工件的倉庫宣告,您可能還需要新增 -P spring 。 |
請注意,您可能需要透過設定 MAVEN_OPTS 環境變數來增加分配給 Maven 的記憶體量,例如設定為 -Xmx512m -XX:MaxPermSize=128m 。我們已嘗試在 .mvn 配置中涵蓋這一點,因此如果您發現必須這樣做才能使構建成功,請提交 issue,以便將這些設定新增到原始碼管理中。 |
通常,需要中介軟體的專案會包含一個 docker-compose.yml
檔案,因此可以考慮使用 Docker Compose 在 Docker 容器中執行中介軟體伺服器。
A.2. 文件
有一個 "full" profile 將生成文件。
A.3. 使用程式碼
如果您沒有 IDE 偏好,我們建議您在使用程式碼時使用 Spring Tools Suite 或 Eclipse。我們使用 m2eclipse Eclipse 外掛來支援 Maven。其他 IDE 和工具也應該可以正常工作。
A.3.1. 使用 m2eclipse 匯入到 Eclipse
我們建議在使用 Eclipse 時使用 m2eclipse Eclipse 外掛。如果您尚未安裝 m2eclipse,可以從“Eclipse Marketplace”獲取。
遺憾的是,m2e 尚不支援 Maven 3.3,因此將專案匯入 Eclipse 後,您還需要告訴 m2eclipse 使用專案中的 .settings.xml
檔案。如果您不這樣做,可能會看到與專案中 POM 相關的許多不同錯誤。開啟 Eclipse 偏好設定,展開 Maven 偏好設定,然後選擇 User Settings。在 User Settings 欄位中,單擊 Browse 並導航到您匯入的 Spring Cloud 專案,選擇該專案中的 .settings.xml
檔案。點選 Apply 然後點選 OK 儲存偏好設定更改。
或者,您可以將 .settings.xml 中的倉庫設定複製到您自己的 ~/.m2/settings.xml 檔案中。 |
A.3.2. 不使用 m2eclipse 匯入到 Eclipse
如果您不想使用 m2eclipse,可以使用以下命令生成 Eclipse 專案元資料
$ ./mvnw eclipse:eclipse
生成的 Eclipse 專案可以透過從 file
選單中選擇 import existing projects
來匯入。
[[contributing] == 貢獻
Spring Cloud 以非限制性的 Apache 2.0 許可釋出,並遵循非常標準的 Github 開發流程,使用 Github 跟蹤器處理問題並將 pull requests 合併到 master 分支。如果您想貢獻哪怕是微不足道的東西,請不要猶豫,但請遵循以下準則。
A.4. 簽署貢獻者許可協議
在我們接受非瑣碎的補丁或 pull request 之前,我們需要您簽署貢獻者協議。簽署貢獻者協議並不授予任何人向主倉庫提交程式碼的權利,但這確實意味著我們可以接受您的貢獻,如果我們接受,您將獲得作者署名。活躍的貢獻者可能會被邀請加入核心團隊,並獲得合併 pull requests 的能力。
A.5. 程式碼約定和內務管理
以下所有項對於 pull request 來說都不是必不可少的,但它們都會有所幫助。它們也可以在原始 pull request 之後但在合併之前新增。
-
使用 Spring Framework 程式碼格式約定。如果您使用 Eclipse,可以使用 Spring Cloud Build 專案中的
eclipse-code-formatter.xml
檔案匯入格式化程式設定。如果使用 IntelliJ,可以使用 Eclipse Code Formatter Plugin 匯入相同的檔案。 -
確保所有新的
.java
檔案都有一個簡單的 Javadoc 類註釋,至少包含一個標識您的@author
標籤,最好至少有一段描述該類的用途。 -
將 ASF 許可頭註釋新增到所有新的
.java
檔案(從專案中的現有檔案複製)。 -
如果您對
.java
檔案進行了實質性修改(不僅僅是表面更改),請將自己新增為@author
。 -
新增一些 Javadocs,如果更改了名稱空間,還要新增一些 XSD 文件元素。
-
一些單元測試也會大有幫助——總得有人去做。
-
如果沒有其他人使用您的分支,請將其基於當前的 master(或主專案中的其他目標分支)進行 rebase。
-
在編寫提交訊息時,請遵循這些約定,如果您正在修復現有問題,請在提交訊息末尾新增
Fixes gh-XXXX
(其中 XXXX 是問題編號)。