Apache Pulsar 支援
Apache Pulsar 透過提供 Spring for Apache Pulsar 專案的自動配置來支援。
當 org.springframework.pulsar:spring-pulsar 在類路徑上時,Spring Boot 將自動配置並註冊 Spring for Apache Pulsar 元件。
有一個 spring-boot-starter-pulsar 啟動器,用於方便地收集要使用的依賴項。
連線到 Pulsar
當您使用 Pulsar 啟動器時,Spring Boot 將自動配置並註冊一個 PulsarClient Bean。
預設情況下,應用程式會嘗試連線到 pulsar://:6650 處的本地 Pulsar 例項。這可以透過將 spring.pulsar.client.service-url 屬性設定為不同的值來調整。
| 該值必須是有效的 Pulsar 協議 URL |
您可以透過指定任何 spring.pulsar.client.* 字首的應用程式屬性來配置客戶端。
如果您需要對配置進行更多控制,請考慮註冊一個或多個 PulsarClientBuilderCustomizer Bean。
認證
要連線到需要認證的 Pulsar 叢集,您需要透過設定 pluginClassName 和外掛所需的任何引數來指定要使用的認證外掛。您可以將引數設定為引數名稱到引數值的對映。以下示例展示瞭如何配置 AuthenticationOAuth2 外掛。
-
屬性
-
YAML
spring.pulsar.client.authentication.plugin-class-name=org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
spring.pulsar.client.authentication.param.issuerUrl=https://auth.server.cloud/
spring.pulsar.client.authentication.param.privateKey=file:///Users/some-key.json
spring.pulsar.client.authentication.param.audience=urn:sn:acme:dev:my-instance
spring:
pulsar:
client:
authentication:
plugin-class-name: org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
param:
issuerUrl: https://auth.server.cloud/
privateKey: file:///Users/some-key.json
audience: urn:sn:acme:dev:my-instance
|
您需要確保 例如,如果您想為 這種缺乏寬鬆繫結也使得使用環境變數進行身份驗證引數變得麻煩,因為在轉換過程中會丟失大小寫敏感性。如果您使用環境變數作為引數,那麼您需要遵循 Spring for Apache Pulsar 參考文件中的這些步驟才能正常工作。 |
連線到 Pulsar 管理
Spring for Apache Pulsar 的 PulsarAdministration 客戶端也會自動配置。
預設情況下,應用程式會嘗試連線到 https://:8080 處的本地 Pulsar 例項。這可以透過將 spring.pulsar.admin.service-url 屬性設定為 (http|https)://<host>:<port> 形式的不同值來調整。
如果您需要對配置進行更多控制,請考慮註冊一個或多個 PulsarAdminBuilderCustomizer Bean。
認證
當訪問需要認證的 Pulsar 叢集時,管理客戶端需要與普通 Pulsar 客戶端相同的安全配置。您可以使用前面提到的認證配置,將 spring.pulsar.client.authentication 替換為 spring.pulsar.admin.authentication。
要在啟動時建立主題,請新增型別為 PulsarTopic 的 Bean。如果主題已存在,則忽略該 Bean。 |
傳送訊息
Spring 的 PulsarTemplate 是自動配置的,您可以使用它傳送訊息,如以下示例所示
-
Java
-
Kotlin
import org.springframework.pulsar.core.PulsarTemplate;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
private final PulsarTemplate<String> pulsarTemplate;
public MyBean(PulsarTemplate<String> pulsarTemplate) {
this.pulsarTemplate = pulsarTemplate;
}
public void someMethod() {
this.pulsarTemplate.send("someTopic", "Hello");
}
}
import org.apache.pulsar.client.api.PulsarClientException
import org.springframework.pulsar.core.PulsarTemplate
import org.springframework.stereotype.Component
@Component
class MyBean(private val pulsarTemplate: PulsarTemplate<String>) {
@Throws(PulsarClientException::class)
fun someMethod() {
pulsarTemplate.send("someTopic", "Hello")
}
}
PulsarTemplate 依賴於 PulsarProducerFactory 來建立底層 Pulsar 生產者。Spring Boot 自動配置也提供了這個生產者工廠,它預設會快取所建立的生產者。您可以透過指定任何以 spring.pulsar.producer.* 和 spring.pulsar.producer.cache.* 為字首的應用程式屬性來配置生產者工廠和快取設定。
如果您需要對生產者工廠配置進行更多控制,請考慮註冊一個或多個 ProducerBuilderCustomizer Bean。這些定製器應用於所有建立的生產者。您也可以在傳送訊息時傳入一個 ProducerBuilderCustomizer,以僅影響當前生產者。
如果您需要對傳送的訊息進行更多控制,可以在傳送訊息時傳入 TypedMessageBuilderCustomizer。
接收訊息
當存在 Apache Pulsar 基礎設施時,任何 Bean 都可以使用 @PulsarListener 註解來建立監聽器端點。以下元件在 someTopic 主題上建立了一個監聽器端點
-
Java
-
Kotlin
import org.springframework.pulsar.annotation.PulsarListener;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
@PulsarListener(topics = "someTopic")
public void processMessage(String content) {
// ...
}
}
import org.springframework.pulsar.annotation.PulsarListener
import org.springframework.stereotype.Component
@Component
class MyBean {
@PulsarListener(topics = ["someTopic"])
fun processMessage(content: String?) {
// ...
}
}
Spring Boot 自動配置提供了 PulsarListener 所需的所有元件,例如 PulsarListenerContainerFactory 以及它用於構建底層 Pulsar 消費者的消費者工廠。您可以透過指定任何以 spring.pulsar.listener.* 和 spring.pulsar.consumer.* 為字首的應用程式屬性來配置這些元件。
如果您需要對消費者工廠的配置進行更多控制,請考慮註冊一個或多個 ConsumerBuilderCustomizer Bean。這些定製器適用於工廠建立的所有消費者,因此也適用於所有 @PulsarListener 例項。您還可以透過設定 @PulsarListener 註解的 consumerCustomizer 屬性來定製單個監聽器。
如果您需要對實際容器工廠配置進行更多控制,請考慮註冊一個或多個 PulsarContainerFactoryCustomizer<ConcurrentPulsarListenerContainerFactory<?>> Bean。
讀取訊息
Pulsar 讀取器介面允許應用程式手動管理遊標。當您使用讀取器連線到主題時,您需要指定讀取器連線到主題時開始讀取訊息的位置。
當存在 Apache Pulsar 基礎設施時,任何 Bean 都可以使用 @PulsarReader 註解來使用讀取器消費訊息。以下元件建立了一個讀取器端點,該端點從 someTopic 主題的開頭開始讀取訊息
-
Java
-
Kotlin
import org.springframework.pulsar.annotation.PulsarReader;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
@PulsarReader(topics = "someTopic", startMessageId = "earliest")
public void processMessage(String content) {
// ...
}
}
import org.springframework.pulsar.annotation.PulsarReader
import org.springframework.stereotype.Component
@Component
class MyBean {
@PulsarReader(topics = ["someTopic"], startMessageId = "earliest")
fun processMessage(content: String?) {
// ...
}
}
@PulsarReader 依賴於 PulsarReaderFactory 來建立底層的 Pulsar 讀取器。Spring Boot 自動配置提供了此讀取器工廠,可以透過設定任何以 spring.pulsar.reader.* 為字首的應用程式屬性進行定製。
如果您需要對讀取器工廠的配置進行更多控制,請考慮註冊一個或多個 ReaderBuilderCustomizer Bean。這些定製器適用於工廠建立的所有讀取器,因此也適用於所有 @PulsarReader 例項。您還可以透過設定 @PulsarReader 註解的 readerCustomizer 屬性來定製單個監聽器。
如果您需要對實際容器工廠配置進行更多控制,請考慮註冊一個或多個 PulsarContainerFactoryCustomizer<DefaultPulsarReaderContainerFactory<?>> Bean。
| 有關上述任何元件的更多詳細資訊以及發現其他可用功能,請參閱 Spring for Apache Pulsar 參考文件。 |
事務支援
Spring for Apache Pulsar 在使用 PulsarTemplate 和 @PulsarListener 時支援事務。
將 spring.pulsar.transaction.enabled 屬性設定為 true 將
-
配置
PulsarTransactionManagerBean -
啟用
PulsarTemplate的事務支援 -
為
@PulsarListener方法啟用事務支援
@PulsarListener 的 transactional 屬性可用於微調何時與監聽器一起使用事務。
要更精細地控制 Spring for Apache Pulsar 事務特性,您應該定義自己的 PulsarTemplate 和/或 ConcurrentPulsarListenerContainerFactory Bean。如果預設自動配置的 PulsarTransactionManager 不適用,您還可以定義 PulsarAwareTransactionManager Bean。
額外的 Pulsar 屬性
自動配置支援的屬性在附錄的整合屬性部分中顯示。請注意,這些屬性(帶連字元或駝峰式)在很大程度上直接對映到 Apache Pulsar 配置屬性。有關詳細資訊,請參閱 Apache Pulsar 文件。
Pulsar 支援的屬性中只有一部分可以透過 PulsarProperties 類直接使用。如果您希望使用未直接支援的額外屬性來調整自動配置的元件,可以使用上述每個元件支援的定製器。