互動式查詢
Kafka Streams binder API 暴露了一個名為 InteractiveQueryService
的類,用於互動式查詢狀態儲存。您可以在應用中將其作為 Spring bean 進行訪問。從應用中訪問此 bean 的簡單方法是 autowire
(自動注入)該 bean。
@Autowired
private InteractiveQueryService interactiveQueryService;
一旦您獲得了此 bean 的訪問許可權,就可以查詢您感興趣的特定狀態儲存。參見下文。
ReadOnlyKeyValueStore<Object, Object> keyValueStore =
interactiveQueryService.getQueryableStoreType("my-store", QueryableStoreTypes.keyValueStore());
在啟動期間,上述檢索儲存的方法呼叫可能會失敗。例如,它可能仍在初始化狀態儲存的過程中。在這種情況下,重試此操作會很有用。Kafka Streams binder 提供了一個簡單的重試機制來應對這種情況。
以下是您可以用來控制此重試的兩個屬性。
-
spring.cloud.stream.kafka.streams.binder.stateStoreRetry.maxAttempts - 預設值為
1
。 -
spring.cloud.stream.kafka.streams.binder.stateStoreRetry.backOffInterval - 預設值為
1000
毫秒。
如果存在執行中的 Kafka Streams 應用的多個例項,那麼在互動式查詢它們之前,您需要確定哪個應用例項託管了您正在查詢的特定鍵。InteractiveQueryService
API 提供了識別主機資訊的方法。
為了使其工作,您必須按如下方式配置屬性 application.server
spring.cloud.stream.kafka.streams.binder.configuration.application.server: <server>:<port>
以下是一些程式碼片段
org.apache.kafka.streams.state.HostInfo hostInfo = interactiveQueryService.getHostInfo("store-name",
key, keySerializer);
if (interactiveQueryService.getCurrentHostInfo().equals(hostInfo)) {
//query from the store that is locally available
}
else {
//query from the remote host
}
有關這些主機查詢方法的更多資訊,請參閱這些方法的 Javadoc。對於這些方法,在啟動期間,如果底層的 KafkaStreams 物件尚未準備好,它們也可能丟擲異常。前面提到的重試屬性同樣適用於這些方法。
透過 InteractiveQueryService 可用的其他 API 方法
使用以下 API 方法檢索與給定儲存和鍵組合關聯的 KeyQueryMetadata
物件。
public <K> KeyQueryMetadata getKeyQueryMetadata(String store, K key, Serializer<K> serializer)
使用以下 API 方法檢索與給定儲存和鍵組合關聯的 KafkaStreams
物件。
public <K> KafkaStreams getKafkaStreams(String store, K key, Serializer<K> serializer)
定製儲存查詢引數
有時您可能需要在透過 InteractiveQueryService
查詢儲存之前微調儲存查詢引數。為此,從 binder 的 4.0.1
版本開始,您可以提供一個用於 StoreQueryParametersCustomizer
的 bean,它是一個函式式介面,帶有一個接受 StoreQueryParameter
作為引數的 customize
方法。以下是其方法簽名。
StoreQueryParameters<T> customize(StoreQueryParameters<T> storeQueryParameters);
透過這種方法,應用可以進一步定製 StoreQueryParameters
,例如啟用陳舊(stale)的儲存。
當此 bean 存在於應用中時,InteractiveQueryService
在查詢狀態儲存之前會呼叫其 customize
方法。
請記住,應用中必須有一個唯一的 StoreQueryParametersCustomizer 的 bean 可用。 |