基於事件型別的 Kafka Streams 應用路由

常規基於訊息通道的 Binder 中可用的路由函式在 Kafka Streams Binder 中不受支援。然而,Kafka Streams Binder 仍然透過入站記錄上的事件型別記錄頭提供路由功能。

要啟用基於事件型別的路由,應用程式必須提供以下屬性。

spring.cloud.stream.kafka.streams.bindings.<繫結名稱>.consumer.eventTypes.

這可以是逗號分隔的值。

例如,假設我們有這個函式

@Bean
public Function<KStream<Integer, Foo>, KStream<Integer, Foo>> process() {
    return input -> input;
}

我們還假設只有當傳入記錄的事件型別為 foobar 時,才執行此函式中的業務邏輯。這可以透過在繫結上使用 eventTypes 屬性來表達,如下所示。

spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.eventTypes=foo,bar

現在,當應用程式執行時,binder 會檢查每個傳入記錄的 event_type 頭部,看其值是否設定為 foobar。如果兩者都沒有找到,則會跳過函式執行。

預設情況下,binder 期望記錄頭部鍵為 event_type,但這可以按繫結進行更改。例如,如果想將此繫結的頭部鍵從預設值更改為 my_event,可以按如下方式更改。

spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.eventTypeHeaderKey=my_event.

在使用 Kafka Streams binder 的事件路由功能時,它會使用 byte array Serde 來反序列化所有傳入記錄。只有當記錄頭部與事件型別匹配時,它才會使用實際的 Serde 來進行適當的反序列化,無論是使用配置的 Serde 還是推斷的 Serde。如果您在繫結上設定了反序列化異常處理器,這會引入問題,因為預期的反序列化只在呼叫棧更深處發生,從而導致意外錯誤。為了解決這個問題,您可以在繫結上設定以下屬性,以強制 binder 使用配置或推斷的 Serde,而不是 byte array Serde

spring.cloud.stream.kafka.streams.bindings.<process-in-0>.consumer.useConfiguredSerdeWhenRoutingEvents

透過這種方式,應用程式在使用事件路由功能時可以立即檢測到反序列化問題,並可以採取適當的處理決策。