Apache Pulsar 支援

Apache Pulsar 透過提供對 Spring for Apache Pulsar 專案的自動配置來支援。

當 classpath 中包含 org.springframework.pulsar:spring-pulsar 時,Spring Boot 將自動配置並註冊經典的(命令式)Spring for Apache Pulsar 元件。當 classpath 中包含 org.springframework.pulsar:spring-pulsar-reactive 時,它也會對響應式元件執行相同的操作。

Spring Boot 提供了 spring-boot-starter-pulsarspring-boot-starter-pulsar-reactive starter,分別用於方便地收集命令式和響應式使用的依賴項。

連線到 Pulsar

當你使用 Pulsar starter 時,Spring Boot 將自動配置並註冊一個 PulsarClient Bean。

預設情況下,應用會嘗試連線到本地的 Pulsar 例項,地址為 pulsar://:6650。可以透過將 spring.pulsar.client.service-url 屬性設定為其他值來調整此設定。

該值必須是一個有效的 Pulsar Protocol URL

你可以透過指定任何以 spring.pulsar.client.* 為字首的應用屬性來配置客戶端。

如果你需要對配置有更多的控制,可以考慮註冊一個或多個 PulsarClientBuilderCustomizer Bean。

認證

要連線到需要認證的 Pulsar 叢集,你需要透過設定 pluginClassName 以及外掛所需的任何引數來指定要使用的認證外掛。你可以將引數設定為引數名稱到引數值的對映。以下示例展示瞭如何配置 AuthenticationOAuth2 外掛。

  • 屬性

  • YAML

spring.pulsar.client.authentication.plugin-class-name=org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
spring.pulsar.client.authentication.param.issuerUrl=https://auth.server.cloud/
spring.pulsar.client.authentication.param.privateKey=file:///Users/some-key.json
spring.pulsar.client.authentication.param.audience=urn:sn:acme:dev:my-instance
spring:
  pulsar:
    client:
      authentication:
        plugin-class-name: org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
        param:
          issuerUrl: https://auth.server.cloud/
          privateKey: file:///Users/some-key.json
          audience: urn:sn:acme:dev:my-instance

你需要確保在 spring.pulsar.client.authentication.param.* 下定義的名稱與你的認證外掛預期的名稱(通常是駝峰命名法)完全匹配。Spring Boot 不會嘗試對這些條目進行任何寬鬆繫結。

例如,如果你想為 AuthenticationOAuth2 認證外掛配置發行者 URL,你必須使用 spring.pulsar.client.authentication.param.issuerUrl。如果你使用其他形式,例如 issuerurlissuer-url,該設定將不會應用於外掛。

這種缺乏寬鬆繫結也使得使用環境變數配置認證引數變得麻煩,因為在轉換過程中會丟失大小寫敏感性。如果你使用環境變數來設定引數,你需要遵循 Spring for Apache Pulsar 參考文件中列出的 這些步驟 才能使其正常工作。

SSL

預設情況下,Pulsar 客戶端以明文方式與 Pulsar 服務通訊。你可以遵循 Spring for Apache Pulsar 參考文件中列出的 這些步驟 來啟用 TLS 加密。

有關客戶端和認證的完整詳細資訊,請參閱 Spring for Apache Pulsar 參考文件

響應式連線到 Pulsar

當響應式自動配置被啟用時,Spring Boot 將自動配置並註冊一個 ReactivePulsarClient Bean。

ReactivePulsarClient 適配了前面描述的 PulsarClient 例項。因此,請遵循上一節來配置 ReactivePulsarClient 使用的 PulsarClient

連線到 Pulsar 管理端

Spring for Apache Pulsar 的 PulsarAdministration 客戶端也會被自動配置。

預設情況下,應用會嘗試連線到本地的 Pulsar 例項,地址為 https://:8080。可以透過將 spring.pulsar.admin.service-url 屬性設定為形如 (http|https)://<host>:<port> 的其他值來調整此設定。

如果你需要對配置有更多的控制,可以考慮註冊一個或多個 PulsarAdminBuilderCustomizer Bean。

認證

當訪問需要認證的 Pulsar 叢集時,管理客戶端需要與常規 Pulsar 客戶端相同的安全配置。你可以使用上述 認證配置,只需將 spring.pulsar.client.authentication 替換為 spring.pulsar.admin.authentication

要在啟動時建立一個 topic,請新增一個型別為 PulsarTopic 的 Bean。如果該 topic 已存在,則忽略此 Bean。

傳送訊息

Spring 的 PulsarTemplate 會被自動配置,你可以使用它來發送訊息,如下例所示

  • Java

  • Kotlin

import org.springframework.pulsar.core.PulsarTemplate;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	private final PulsarTemplate<String> pulsarTemplate;

	public MyBean(PulsarTemplate<String> pulsarTemplate) {
		this.pulsarTemplate = pulsarTemplate;
	}

	public void someMethod() {
		this.pulsarTemplate.send("someTopic", "Hello");
	}

}
import org.apache.pulsar.client.api.PulsarClientException
import org.springframework.pulsar.core.PulsarTemplate
import org.springframework.stereotype.Component

@Component
class MyBean(private val pulsarTemplate: PulsarTemplate<String>) {

	@Throws(PulsarClientException::class)
	fun someMethod() {
		pulsarTemplate.send("someTopic", "Hello")
	}

}

PulsarTemplate 依賴於 PulsarProducerFactory 來建立底層的 Pulsar producer。Spring Boot 自動配置也提供了這個 producer factory,預設情況下,它會快取建立的 producer。你可以透過指定任何以 spring.pulsar.producer.*spring.pulsar.producer.cache.* 為字首的應用屬性來配置 producer factory 和快取設定。

如果你需要對 producer factory 配置有更多的控制,可以考慮註冊一個或多個 ProducerBuilderCustomizer Bean。這些 customizer 會應用於所有建立的 producer。你也可以在傳送訊息時傳入一個 ProducerBuilderCustomizer,以便僅影響當前的 producer。

如果你需要對要傳送的訊息有更多的控制,可以在傳送訊息時傳入一個 TypedMessageBuilderCustomizer

響應式傳送訊息

當響應式自動配置被啟用時,Spring 的 ReactivePulsarTemplate 會被自動配置,你可以使用它來發送訊息,如下例所示

  • Java

  • Kotlin

import org.springframework.pulsar.reactive.core.ReactivePulsarTemplate;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	private final ReactivePulsarTemplate<String> pulsarTemplate;

	public MyBean(ReactivePulsarTemplate<String> pulsarTemplate) {
		this.pulsarTemplate = pulsarTemplate;
	}

	public void someMethod() {
		this.pulsarTemplate.send("someTopic", "Hello").subscribe();
	}

}
import org.springframework.pulsar.reactive.core.ReactivePulsarTemplate
import org.springframework.stereotype.Component

@Component
class MyBean(private val pulsarTemplate: ReactivePulsarTemplate<String>) {

	fun someMethod() {
		pulsarTemplate.send("someTopic", "Hello").subscribe()
	}

}

ReactivePulsarTemplate 依賴於 ReactivePulsarSenderFactory 來實際建立底層的 sender。Spring Boot 自動配置也提供了這個 sender factory,預設情況下,它會快取建立的 producer。你可以透過指定任何以 spring.pulsar.producer.*spring.pulsar.producer.cache.* 為字首的應用屬性來配置 sender factory 和快取設定。

如果你需要對 sender factory 配置有更多的控制,可以考慮註冊一個或多個 ReactiveMessageSenderBuilderCustomizer Bean。這些 customizer 會應用於所有建立的 sender。你也可以在傳送訊息時傳入一個 ReactiveMessageSenderBuilderCustomizer,以便僅影響當前的 sender。

如果你需要對要傳送的訊息有更多的控制,可以在傳送訊息時傳入一個 MessageSpecBuilderCustomizer

接收訊息

當 Apache Pulsar 基礎設施存在時,任何 Bean 都可以使用 @PulsarListener 註解來建立一個 listener endpoint。以下元件在 someTopic topic 上建立一個 listener endpoint

  • Java

  • Kotlin

import org.springframework.pulsar.annotation.PulsarListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	@PulsarListener(topics = "someTopic")
	public void processMessage(String content) {
		// ...
	}

}
import org.springframework.pulsar.annotation.PulsarListener
import org.springframework.stereotype.Component

@Component
class MyBean {

	@PulsarListener(topics = ["someTopic"])
	fun processMessage(content: String?) {
		// ...
	}

}

Spring Boot 自動配置提供了 @PulsarListener 所需的所有元件,例如 PulsarListenerContainerFactory 以及它用來構建底層 Pulsar consumer 的 consumer factory。你可以透過指定任何以 spring.pulsar.listener.*spring.pulsar.consumer.* 為字首的應用屬性來配置這些元件。

如果你需要對 consumer factory 的配置有更多的控制,可以考慮註冊一個或多個 ConsumerBuilderCustomizer Bean。這些 customizer 會應用於 factory 建立的所有 consumer,因此也會應用於所有 @PulsarListener 例項。你也可以透過設定 @PulsarListener 註解的 consumerCustomizer 屬性來定製單個 listener。

如果你需要對實際的 container factory 配置有更多的控制,可以考慮註冊一個或多個 PulsarContainerFactoryCustomizer<ConcurrentPulsarListenerContainerFactory<?>> Bean。

響應式接收訊息

當 Apache Pulsar 基礎設施存在且響應式自動配置被啟用時,任何 Bean 都可以使用 @ReactivePulsarListener 註解來建立一個響應式 listener endpoint。以下元件在 someTopic topic 上建立一個響應式 listener endpoint

  • Java

  • Kotlin

import reactor.core.publisher.Mono;

import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	@ReactivePulsarListener(topics = "someTopic")
	public Mono<Void> processMessage(String content) {
		// ...
		return Mono.empty();
	}

}
import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListener
import org.springframework.stereotype.Component
import reactor.core.publisher.Mono

@Component
class MyBean {

	@ReactivePulsarListener(topics = ["someTopic"])
	fun processMessage(content: String?): Mono<Void> {
		// ...
		return Mono.empty()
	}

}

Spring Boot 自動配置提供了 @ReactivePulsarListener 所需的所有元件,例如 ReactivePulsarListenerContainerFactory 以及它用來構建底層響應式 Pulsar consumer 的 consumer factory。你可以透過指定任何以 spring.pulsar.listener.*spring.pulsar.consumer.* 為字首的應用屬性來配置這些元件。

如果你需要對 consumer factory 的配置有更多的控制,可以考慮註冊一個或多個 ReactiveMessageConsumerBuilderCustomizer Bean。這些 customizer 會應用於 factory 建立的所有 consumer,因此也會應用於所有 @ReactivePulsarListener 例項。你也可以透過設定 @ReactivePulsarListener 註解的 consumerCustomizer 屬性來定製單個 listener。

如果你需要對實際的 container factory 配置有更多的控制,可以考慮註冊一個或多個 PulsarContainerFactoryCustomizer<DefaultReactivePulsarListenerContainerFactory<?>> Bean。

讀取訊息

Pulsar reader 介面使應用能夠手動管理 cursor。當你使用 reader 連線到 topic 時,你需要指定 reader 連線到 topic 時從哪條訊息開始讀取。

當 Apache Pulsar 基礎設施存在時,任何 Bean 都可以使用 @PulsarReader 註解來使用 reader 消費訊息。以下元件建立一個 reader endpoint,它從 someTopic topic 的開頭開始讀取訊息

  • Java

  • Kotlin

import org.springframework.pulsar.annotation.PulsarReader;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	@PulsarReader(topics = "someTopic", startMessageId = "earliest")
	public void processMessage(String content) {
		// ...
	}

}
import org.springframework.pulsar.annotation.PulsarReader
import org.springframework.stereotype.Component

@Component
class MyBean {

	@PulsarReader(topics = ["someTopic"], startMessageId = "earliest")
	fun processMessage(content: String?) {
		// ...
	}

}

@PulsarReader 依賴於 PulsarReaderFactory 來建立底層的 Pulsar reader。Spring Boot 自動配置提供了這個 reader factory,可以透過設定任何以 spring.pulsar.reader.* 為字首的應用屬性來對其進行定製。

如果你需要對 reader factory 的配置有更多的控制,可以考慮註冊一個或多個 ReaderBuilderCustomizer Bean。這些 customizer 會應用於 factory 建立的所有 reader,因此也會應用於所有 @PulsarReader 例項。你也可以透過設定 @PulsarReader 註解的 readerCustomizer 屬性來定製單個 listener。

如果你需要對實際的 container factory 配置有更多的控制,可以考慮註冊一個或多個 PulsarContainerFactoryCustomizer<DefaultPulsarReaderContainerFactory<?>> Bean。

響應式讀取訊息

當 Apache Pulsar 基礎設施存在且響應式自動配置被啟用時,Spring 提供了 ReactivePulsarReaderFactory,你可以使用它來建立一個 reader,以便以響應式方式讀取訊息。以下元件使用提供的 factory 建立一個 reader,並從 someTopic topic 中讀取 5 分鐘前的單條訊息

  • Java

  • Kotlin

import java.time.Instant;
import java.util.List;

import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.reactive.client.api.StartAtSpec;
import reactor.core.publisher.Mono;

import org.springframework.pulsar.reactive.core.ReactiveMessageReaderBuilderCustomizer;
import org.springframework.pulsar.reactive.core.ReactivePulsarReaderFactory;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	private final ReactivePulsarReaderFactory<String> pulsarReaderFactory;

	public MyBean(ReactivePulsarReaderFactory<String> pulsarReaderFactory) {
		this.pulsarReaderFactory = pulsarReaderFactory;
	}

	public void someMethod() {
		ReactiveMessageReaderBuilderCustomizer<String> readerBuilderCustomizer = (readerBuilder) -> readerBuilder
			.topic("someTopic")
			.startAtSpec(StartAtSpec.ofInstant(Instant.now().minusSeconds(5)));
		Mono<Message<String>> message = this.pulsarReaderFactory
			.createReader(Schema.STRING, List.of(readerBuilderCustomizer))
			.readOne();
		// ...
	}

}
import org.apache.pulsar.client.api.Schema
import org.apache.pulsar.reactive.client.api.ReactiveMessageReaderBuilder
import org.apache.pulsar.reactive.client.api.StartAtSpec
import org.springframework.pulsar.reactive.core.ReactiveMessageReaderBuilderCustomizer
import org.springframework.pulsar.reactive.core.ReactivePulsarReaderFactory
import org.springframework.stereotype.Component
import java.time.Instant

@Component
class MyBean(private val pulsarReaderFactory: ReactivePulsarReaderFactory<String>) {

	fun someMethod() {
		val readerBuilderCustomizer = ReactiveMessageReaderBuilderCustomizer {
			readerBuilder: ReactiveMessageReaderBuilder<String> ->
				readerBuilder
					.topic("someTopic")
					.startAtSpec(StartAtSpec.ofInstant(Instant.now().minusSeconds(5)))
		}
		val message = pulsarReaderFactory
				.createReader(Schema.STRING, listOf(readerBuilderCustomizer))
				.readOne()
		// ...
	}

}

Spring Boot 自動配置提供了這個 reader factory,可以透過設定任何以 spring.pulsar.reader.* 為字首的應用屬性來對其進行定製。

如果你需要對 reader factory 配置有更多的控制,可以在使用 factory 建立 reader 時傳入一個或多個 ReactiveMessageReaderBuilderCustomizer 例項。

如果你需要對 reader factory 配置有更多的控制,可以考慮註冊一個或多個 ReactiveMessageReaderBuilderCustomizer Bean。這些 customizer 會應用於所有建立的 reader。你也可以在建立 reader 時傳入一個或多個 ReactiveMessageReaderBuilderCustomizer,以便僅將定製應用於建立的 reader。

有關上述任何元件的更多詳細資訊以及發現其他可用特性,請參閱 Spring for Apache Pulsar 參考文件

事務支援

Spring for Apache Pulsar 在使用 PulsarTemplate@PulsarListener 時支援事務。

目前使用響應式變體時不支援事務。

spring.pulsar.transaction.enabled 屬性設定為 true 將會

@PulsarListenertransactional 屬性可用於微調何時應將事務與 listener 一起使用。

為了更好地控制 Spring for Apache Pulsar 的事務特性,你應該定義自己的 PulsarTemplate 和/或 ConcurrentPulsarListenerContainerFactory Bean。如果預設自動配置的 PulsarTransactionManager 不合適,你也可以定義一個 PulsarAwareTransactionManager Bean。

額外 Pulsar 屬性

自動配置支援的屬性顯示在附錄的 整合屬性 部分。請注意,在大多數情況下,這些屬性(連字元命名或駝峰命名)直接對映到 Apache Pulsar 的配置屬性。有關詳細資訊,請參閱 Apache Pulsar 文件。

Pulsar 支援的屬性中,只有一部分可以直接透過 PulsarProperties 類獲取。如果你希望使用未直接支援的附加屬性來調整自動配置的元件,你可以使用上述每個元件支援的定製器。