基於事件型別的 Kafka Streams 應用路由
常規基於訊息通道的 Binder 中可用的路由函式在 Kafka Streams Binder 中不受支援。然而,Kafka Streams Binder 仍然透過入站記錄上的事件型別記錄頭提供路由功能。
要啟用基於事件型別的路由,應用程式必須提供以下屬性。
spring.cloud.stream.kafka.streams.bindings.<繫結名稱>.consumer.eventTypes
.
這可以是逗號分隔的值。
例如,假設我們有這個函式
@Bean
public Function<KStream<Integer, Foo>, KStream<Integer, Foo>> process() {
return input -> input;
}
我們還假設只有當傳入記錄的事件型別為 foo
或 bar
時,才執行此函式中的業務邏輯。這可以透過在繫結上使用 eventTypes
屬性來表達,如下所示。
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.eventTypes=foo,bar
現在,當應用程式執行時,binder 會檢查每個傳入記錄的 event_type
頭部,看其值是否設定為 foo
或 bar
。如果兩者都沒有找到,則會跳過函式執行。
預設情況下,binder 期望記錄頭部鍵為 event_type
,但這可以按繫結進行更改。例如,如果想將此繫結的頭部鍵從預設值更改為 my_event
,可以按如下方式更改。
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.eventTypeHeaderKey=my_event
.
在使用 Kafka Streams binder 的事件路由功能時,它會使用 byte array Serde
來反序列化所有傳入記錄。只有當記錄頭部與事件型別匹配時,它才會使用實際的 Serde
來進行適當的反序列化,無論是使用配置的 Serde 還是推斷的 Serde。如果您在繫結上設定了反序列化異常處理器,這會引入問題,因為預期的反序列化只在呼叫棧更深處發生,從而導致意外錯誤。為了解決這個問題,您可以在繫結上設定以下屬性,以強制 binder 使用配置或推斷的 Serde
,而不是 byte array Serde
。
spring.cloud.stream.kafka.streams.bindings.<process-in-0>.consumer.useConfiguredSerdeWhenRoutingEvents
透過這種方式,應用程式在使用事件路由功能時可以立即檢測到反序列化問題,並可以採取適當的處理決策。