Kafka Streams 應用中基於事件型別的路由
Kafka Streams 繫結器不支援常規訊息通道繫結器中可用的路由函式。但是,Kafka Streams 繫結器仍然透過入站記錄上的事件型別記錄頭提供路由功能。
要啟用基於事件型別的路由,應用程式必須提供以下屬性。
spring.cloud.stream.kafka.streams.bindings.<binding-name>.consumer.eventTypes.
這可以是一個逗號分隔的值。
例如,假設我們有以下函式:
@Bean
public Function<KStream<Integer, Foo>, KStream<Integer, Foo>> process() {
return input -> input;
}
我們還假設,我們只希望在此函式中執行業務邏輯,如果傳入記錄的事件型別為 foo 或 bar。這可以使用繫結上的 eventTypes 屬性表示如下。
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.eventTypes=foo,bar
現在,當應用程式執行時,繫結器會檢查每個傳入記錄的 event_type 頭,並檢視其值是否設定為 foo 或 bar。如果它沒有找到其中任何一個,則將跳過函式執行。
預設情況下,繫結器期望記錄頭鍵為 event_type,但這可以按繫結更改。例如,如果我們要將此繫結上的頭鍵更改為 my_event 而不是預設值,可以按如下方式更改。
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.eventTypeHeaderKey=my_event.
當在 Kafka Streams 繫結器中使用事件路由功能時,它使用位元組陣列 Serde 反序列化所有傳入記錄。如果記錄頭匹配事件型別,則它只使用實際的 Serde 透過配置的或推斷的 Serde 進行正確的反序列化。如果您在繫結上設定了反序列化異常處理程式,這會引入問題,因為預期的反序列化只會發生在堆疊下方,從而導致意外錯誤。為了解決這個問題,您可以在繫結上設定以下屬性,以強制繫結器使用配置的或推斷的 Serde,而不是位元組陣列 Serde。
spring.cloud.stream.kafka.streams.bindings.<process-in-0>.consumer.useConfiguredSerdeWhenRoutingEvents
透過這種方式,應用程式在使用事件路由功能時可以立即檢測到反序列化問題,並可以採取適當的處理決策。