Apache Pulsar 支援

Apache Pulsar 透過提供 Spring for Apache Pulsar 專案的自動配置來支援。

org.springframework.pulsar:spring-pulsar 在類路徑上時,Spring Boot 將自動配置並註冊 Spring for Apache Pulsar 元件。

有一個 spring-boot-starter-pulsar 啟動器,用於方便地收集要使用的依賴項。

連線到 Pulsar

當您使用 Pulsar 啟動器時,Spring Boot 將自動配置並註冊一個 PulsarClient Bean。

預設情況下,應用程式會嘗試連線到 pulsar://:6650 處的本地 Pulsar 例項。這可以透過將 spring.pulsar.client.service-url 屬性設定為不同的值來調整。

該值必須是有效的 Pulsar 協議 URL

您可以透過指定任何 spring.pulsar.client.* 字首的應用程式屬性來配置客戶端。

如果您需要對配置進行更多控制,請考慮註冊一個或多個 PulsarClientBuilderCustomizer Bean。

認證

要連線到需要認證的 Pulsar 叢集,您需要透過設定 pluginClassName 和外掛所需的任何引數來指定要使用的認證外掛。您可以將引數設定為引數名稱到引數值的對映。以下示例展示瞭如何配置 AuthenticationOAuth2 外掛。

  • 屬性

  • YAML

spring.pulsar.client.authentication.plugin-class-name=org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
spring.pulsar.client.authentication.param.issuerUrl=https://auth.server.cloud/
spring.pulsar.client.authentication.param.privateKey=file:///Users/some-key.json
spring.pulsar.client.authentication.param.audience=urn:sn:acme:dev:my-instance
spring:
  pulsar:
    client:
      authentication:
        plugin-class-name: org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
        param:
          issuerUrl: https://auth.server.cloud/
          privateKey: file:///Users/some-key.json
          audience: urn:sn:acme:dev:my-instance

您需要確保 spring.pulsar.client.authentication.param.* 下定義的名稱與您的認證外掛預期(通常是駝峰式命名)的名稱完全匹配。Spring Boot 不會嘗試對這些條目進行任何形式的寬鬆繫結。

例如,如果您想為 AuthenticationOAuth2 認證外掛配置發行者 URL,則必須使用 spring.pulsar.client.authentication.param.issuerUrl。如果您使用其他形式,例如 issuerurlissuer-url,則該設定將不會應用於外掛。

這種缺乏寬鬆繫結也使得使用環境變數進行身份驗證引數變得麻煩,因為在轉換過程中會丟失大小寫敏感性。如果您使用環境變數作為引數,那麼您需要遵循 Spring for Apache Pulsar 參考文件中的這些步驟才能正常工作。

SSL

預設情況下,Pulsar 客戶端以純文字方式與 Pulsar 服務通訊。您可以按照 Spring for Apache Pulsar 參考文件中的這些步驟啟用 TLS 加密。

有關客戶端和身份驗證的完整詳細資訊,請參閱 Spring for Apache Pulsar 參考文件

連線到 Pulsar 管理

Spring for Apache Pulsar 的 PulsarAdministration 客戶端也會自動配置。

預設情況下,應用程式會嘗試連線到 https://:8080 處的本地 Pulsar 例項。這可以透過將 spring.pulsar.admin.service-url 屬性設定為 (http|https)://<host>:<port> 形式的不同值來調整。

如果您需要對配置進行更多控制,請考慮註冊一個或多個 PulsarAdminBuilderCustomizer Bean。

認證

當訪問需要認證的 Pulsar 叢集時,管理客戶端需要與普通 Pulsar 客戶端相同的安全配置。您可以使用前面提到的認證配置,將 spring.pulsar.client.authentication 替換為 spring.pulsar.admin.authentication

要在啟動時建立主題,請新增型別為 PulsarTopic 的 Bean。如果主題已存在,則忽略該 Bean。

傳送訊息

Spring 的 PulsarTemplate 是自動配置的,您可以使用它傳送訊息,如以下示例所示

  • Java

  • Kotlin

import org.springframework.pulsar.core.PulsarTemplate;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	private final PulsarTemplate<String> pulsarTemplate;

	public MyBean(PulsarTemplate<String> pulsarTemplate) {
		this.pulsarTemplate = pulsarTemplate;
	}

	public void someMethod() {
		this.pulsarTemplate.send("someTopic", "Hello");
	}

}
import org.apache.pulsar.client.api.PulsarClientException
import org.springframework.pulsar.core.PulsarTemplate
import org.springframework.stereotype.Component

@Component
class MyBean(private val pulsarTemplate: PulsarTemplate<String>) {

	@Throws(PulsarClientException::class)
	fun someMethod() {
		pulsarTemplate.send("someTopic", "Hello")
	}

}

PulsarTemplate 依賴於 PulsarProducerFactory 來建立底層 Pulsar 生產者。Spring Boot 自動配置也提供了這個生產者工廠,它預設會快取所建立的生產者。您可以透過指定任何以 spring.pulsar.producer.*spring.pulsar.producer.cache.* 為字首的應用程式屬性來配置生產者工廠和快取設定。

如果您需要對生產者工廠配置進行更多控制,請考慮註冊一個或多個 ProducerBuilderCustomizer Bean。這些定製器應用於所有建立的生產者。您也可以在傳送訊息時傳入一個 ProducerBuilderCustomizer,以僅影響當前生產者。

如果您需要對傳送的訊息進行更多控制,可以在傳送訊息時傳入 TypedMessageBuilderCustomizer

接收訊息

當存在 Apache Pulsar 基礎設施時,任何 Bean 都可以使用 @PulsarListener 註解來建立監聽器端點。以下元件在 someTopic 主題上建立了一個監聽器端點

  • Java

  • Kotlin

import org.springframework.pulsar.annotation.PulsarListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	@PulsarListener(topics = "someTopic")
	public void processMessage(String content) {
		// ...
	}

}
import org.springframework.pulsar.annotation.PulsarListener
import org.springframework.stereotype.Component

@Component
class MyBean {

	@PulsarListener(topics = ["someTopic"])
	fun processMessage(content: String?) {
		// ...
	}

}

Spring Boot 自動配置提供了 PulsarListener 所需的所有元件,例如 PulsarListenerContainerFactory 以及它用於構建底層 Pulsar 消費者的消費者工廠。您可以透過指定任何以 spring.pulsar.listener.*spring.pulsar.consumer.* 為字首的應用程式屬性來配置這些元件。

如果您需要對消費者工廠的配置進行更多控制,請考慮註冊一個或多個 ConsumerBuilderCustomizer Bean。這些定製器適用於工廠建立的所有消費者,因此也適用於所有 @PulsarListener 例項。您還可以透過設定 @PulsarListener 註解的 consumerCustomizer 屬性來定製單個監聽器。

如果您需要對實際容器工廠配置進行更多控制,請考慮註冊一個或多個 PulsarContainerFactoryCustomizer<ConcurrentPulsarListenerContainerFactory<?>> Bean。

讀取訊息

Pulsar 讀取器介面允許應用程式手動管理遊標。當您使用讀取器連線到主題時,您需要指定讀取器連線到主題時開始讀取訊息的位置。

當存在 Apache Pulsar 基礎設施時,任何 Bean 都可以使用 @PulsarReader 註解來使用讀取器消費訊息。以下元件建立了一個讀取器端點,該端點從 someTopic 主題的開頭開始讀取訊息

  • Java

  • Kotlin

import org.springframework.pulsar.annotation.PulsarReader;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	@PulsarReader(topics = "someTopic", startMessageId = "earliest")
	public void processMessage(String content) {
		// ...
	}

}
import org.springframework.pulsar.annotation.PulsarReader
import org.springframework.stereotype.Component

@Component
class MyBean {

	@PulsarReader(topics = ["someTopic"], startMessageId = "earliest")
	fun processMessage(content: String?) {
		// ...
	}

}

@PulsarReader 依賴於 PulsarReaderFactory 來建立底層的 Pulsar 讀取器。Spring Boot 自動配置提供了此讀取器工廠,可以透過設定任何以 spring.pulsar.reader.* 為字首的應用程式屬性進行定製。

如果您需要對讀取器工廠的配置進行更多控制,請考慮註冊一個或多個 ReaderBuilderCustomizer Bean。這些定製器適用於工廠建立的所有讀取器,因此也適用於所有 @PulsarReader 例項。您還可以透過設定 @PulsarReader 註解的 readerCustomizer 屬性來定製單個監聽器。

如果您需要對實際容器工廠配置進行更多控制,請考慮註冊一個或多個 PulsarContainerFactoryCustomizer<DefaultPulsarReaderContainerFactory<?>> Bean。

有關上述任何元件的更多詳細資訊以及發現其他可用功能,請參閱 Spring for Apache Pulsar 參考文件

事務支援

Spring for Apache Pulsar 在使用 PulsarTemplate@PulsarListener 時支援事務。

spring.pulsar.transaction.enabled 屬性設定為 true

@PulsarListenertransactional 屬性可用於微調何時與監聽器一起使用事務。

要更精細地控制 Spring for Apache Pulsar 事務特性,您應該定義自己的 PulsarTemplate 和/或 ConcurrentPulsarListenerContainerFactory Bean。如果預設自動配置的 PulsarTransactionManager 不適用,您還可以定義 PulsarAwareTransactionManager Bean。

額外的 Pulsar 屬性

自動配置支援的屬性在附錄的整合屬性部分中顯示。請注意,這些屬性(帶連字元或駝峰式)在很大程度上直接對映到 Apache Pulsar 配置屬性。有關詳細資訊,請參閱 Apache Pulsar 文件。

Pulsar 支援的屬性中只有一部分可以透過 PulsarProperties 類直接使用。如果您希望使用未直接支援的額外屬性來調整自動配置的元件,可以使用上述每個元件支援的定製器。

© . This site is unofficial and not affiliated with VMware.