互動式查詢
Kafka Streams binder API 暴露了一個名為 InteractiveQueryService 的類,用於互動式查詢狀態儲存。您可以將其作為 Spring bean 在應用程式中訪問。從應用程式中獲取此 bean 的一種簡單方法是 autowire(自動裝配)該 bean。
@Autowired
private InteractiveQueryService interactiveQueryService;
一旦您獲取了此 bean 的訪問許可權,就可以查詢您感興趣的特定狀態儲存。請看下面。
ReadOnlyKeyValueStore<Object, Object> keyValueStore =
interactiveQueryService.getQueryableStoreType("my-store", QueryableStoreTypes.keyValueStore());
在啟動過程中,上述檢索儲存的方法呼叫可能會失敗。例如,它可能仍在初始化狀態儲存的過程中。在這種情況下,重試此操作將很有用。Kafka Streams binder 提供了一個簡單的重試機制來適應這種情況。
以下是您可以用來控制重試的兩個屬性。
-
spring.cloud.stream.kafka.streams.binder.stateStoreRetry.maxAttempts - 預設為
1。 -
spring.cloud.stream.kafka.streams.binder.stateStoreRetry.backOffInterval - 預設為
1000毫秒。
如果存在多個 Kafka Streams 應用程式例項正在執行,那麼在互動式查詢它們之前,您需要確定哪個應用程式例項託管了您正在查詢的特定鍵。InteractiveQueryService API 提供了識別主機資訊的方法。
為了使其正常工作,您必須按如下方式配置屬性 application.server
spring.cloud.stream.kafka.streams.binder.configuration.application.server: <server>:<port>
以下是一些程式碼片段
org.apache.kafka.streams.state.HostInfo hostInfo = interactiveQueryService.getHostInfo("store-name",
key, keySerializer);
if (interactiveQueryService.getCurrentHostInfo().equals(hostInfo)) {
//query from the store that is locally available
}
else {
//query from the remote host
}
有關這些主機查詢方法的更多資訊,請參閱這些方法的 Javadoc。對於這些方法,在啟動期間,如果底層的 KafkaStreams 物件尚未準備就緒,它們也可能會丟擲異常。上述重試屬性也適用於這些方法。
透過 InteractiveQueryService 可用的其他 API 方法
使用以下 API 方法檢索與給定儲存和鍵組合關聯的 KeyQueryMetadata 物件。
public <K> KeyQueryMetadata getKeyQueryMetadata(String store, K key, Serializer<K> serializer)
使用以下 API 方法檢索與給定儲存和鍵組合關聯的 KakfaStreams 物件。
public <K> KafkaStreams getKafkaStreams(String store, K key, Serializer<K> serializer)
自定義儲存查詢引數
有時,您需要在透過 InteractiveQueryService 查詢儲存之前微調儲存查詢引數。為此,從 binder 的 4.0.1 版本開始,您可以提供一個 StoreQueryParametersCustomizer 的 bean,它是一個函式式介面,帶有一個 customize 方法,該方法接受一個 StoreQueryParameter 作為引數。以下是它的方法簽名。
StoreQueryParameters<T> customize(StoreQueryParameters<T> storeQueryParameters);
使用這種方法,應用程式可以進一步自定義 StoreQueryParameters,例如啟用陳舊(stale)的儲存。
當此 bean 存在於應用程式中時,InteractiveQueryService 將在查詢狀態儲存之前呼叫其 customize 方法。
請記住,應用程式中必須有一個唯一的 StoreQueryParametersCustomizer bean。 |