互動式查詢

Kafka Streams binder API 暴露了一個名為 InteractiveQueryService 的類,用於互動式查詢狀態儲存。您可以在應用中將其作為 Spring bean 進行訪問。從應用中訪問此 bean 的簡單方法是 autowire(自動注入)該 bean。

@Autowired
private InteractiveQueryService interactiveQueryService;

一旦您獲得了此 bean 的訪問許可權,就可以查詢您感興趣的特定狀態儲存。參見下文。

ReadOnlyKeyValueStore<Object, Object> keyValueStore =
						interactiveQueryService.getQueryableStoreType("my-store", QueryableStoreTypes.keyValueStore());

在啟動期間,上述檢索儲存的方法呼叫可能會失敗。例如,它可能仍在初始化狀態儲存的過程中。在這種情況下,重試此操作會很有用。Kafka Streams binder 提供了一個簡單的重試機制來應對這種情況。

以下是您可以用來控制此重試的兩個屬性。

  • spring.cloud.stream.kafka.streams.binder.stateStoreRetry.maxAttempts - 預設值為 1

  • spring.cloud.stream.kafka.streams.binder.stateStoreRetry.backOffInterval - 預設值為 1000 毫秒。

如果存在執行中的 Kafka Streams 應用的多個例項,那麼在互動式查詢它們之前,您需要確定哪個應用例項託管了您正在查詢的特定鍵。InteractiveQueryService API 提供了識別主機資訊的方法。

為了使其工作,您必須按如下方式配置屬性 application.server

spring.cloud.stream.kafka.streams.binder.configuration.application.server: <server>:<port>

以下是一些程式碼片段

org.apache.kafka.streams.state.HostInfo hostInfo = interactiveQueryService.getHostInfo("store-name",
						key, keySerializer);

if (interactiveQueryService.getCurrentHostInfo().equals(hostInfo)) {

    //query from the store that is locally available
}
else {
    //query from the remote host
}

有關這些主機查詢方法的更多資訊,請參閱這些方法的 Javadoc。對於這些方法,在啟動期間,如果底層的 KafkaStreams 物件尚未準備好,它們也可能丟擲異常。前面提到的重試屬性同樣適用於這些方法。

透過 InteractiveQueryService 可用的其他 API 方法

使用以下 API 方法檢索與給定儲存和鍵組合關聯的 KeyQueryMetadata 物件。

public <K> KeyQueryMetadata getKeyQueryMetadata(String store, K key, Serializer<K> serializer)

使用以下 API 方法檢索與給定儲存和鍵組合關聯的 KafkaStreams 物件。

public <K> KafkaStreams getKafkaStreams(String store, K key, Serializer<K> serializer)

定製儲存查詢引數

有時您可能需要在透過 InteractiveQueryService 查詢儲存之前微調儲存查詢引數。為此,從 binder 的 4.0.1 版本開始,您可以提供一個用於 StoreQueryParametersCustomizer 的 bean,它是一個函式式介面,帶有一個接受 StoreQueryParameter 作為引數的 customize 方法。以下是其方法簽名。

StoreQueryParameters<T> customize(StoreQueryParameters<T> storeQueryParameters);

透過這種方法,應用可以進一步定製 StoreQueryParameters,例如啟用陳舊(stale)的儲存。

當此 bean 存在於應用中時,InteractiveQueryService 在查詢狀態儲存之前會呼叫其 customize 方法。

請記住,應用中必須有一個唯一的 StoreQueryParametersCustomizer 的 bean 可用。