Apache Pulsar 支援
Apache Pulsar 透過提供對 Spring for Apache Pulsar 專案的自動配置來支援。
當 classpath 中包含 org.springframework.pulsar:spring-pulsar
時,Spring Boot 將自動配置並註冊經典的(命令式)Spring for Apache Pulsar 元件。當 classpath 中包含 org.springframework.pulsar:spring-pulsar-reactive
時,它也會對響應式元件執行相同的操作。
Spring Boot 提供了 spring-boot-starter-pulsar
和 spring-boot-starter-pulsar-reactive
starter,分別用於方便地收集命令式和響應式使用的依賴項。
連線到 Pulsar
當你使用 Pulsar starter 時,Spring Boot 將自動配置並註冊一個 PulsarClient
Bean。
預設情況下,應用會嘗試連線到本地的 Pulsar 例項,地址為 pulsar://:6650
。可以透過將 spring.pulsar.client.service-url
屬性設定為其他值來調整此設定。
該值必須是一個有效的 Pulsar Protocol URL |
你可以透過指定任何以 spring.pulsar.client.*
為字首的應用屬性來配置客戶端。
如果你需要對配置有更多的控制,可以考慮註冊一個或多個 PulsarClientBuilderCustomizer
Bean。
認證
要連線到需要認證的 Pulsar 叢集,你需要透過設定 pluginClassName
以及外掛所需的任何引數來指定要使用的認證外掛。你可以將引數設定為引數名稱到引數值的對映。以下示例展示瞭如何配置 AuthenticationOAuth2
外掛。
-
屬性
-
YAML
spring.pulsar.client.authentication.plugin-class-name=org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
spring.pulsar.client.authentication.param.issuerUrl=https://auth.server.cloud/
spring.pulsar.client.authentication.param.privateKey=file:///Users/some-key.json
spring.pulsar.client.authentication.param.audience=urn:sn:acme:dev:my-instance
spring:
pulsar:
client:
authentication:
plugin-class-name: org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
param:
issuerUrl: https://auth.server.cloud/
privateKey: file:///Users/some-key.json
audience: urn:sn:acme:dev:my-instance
你需要確保在 例如,如果你想為 這種缺乏寬鬆繫結也使得使用環境變數配置認證引數變得麻煩,因為在轉換過程中會丟失大小寫敏感性。如果你使用環境變數來設定引數,你需要遵循 Spring for Apache Pulsar 參考文件中列出的 這些步驟 才能使其正常工作。 |
響應式連線到 Pulsar
當響應式自動配置被啟用時,Spring Boot 將自動配置並註冊一個 ReactivePulsarClient
Bean。
ReactivePulsarClient
適配了前面描述的 PulsarClient
例項。因此,請遵循上一節來配置 ReactivePulsarClient
使用的 PulsarClient
。
連線到 Pulsar 管理端
Spring for Apache Pulsar 的 PulsarAdministration
客戶端也會被自動配置。
預設情況下,應用會嘗試連線到本地的 Pulsar 例項,地址為 https://:8080
。可以透過將 spring.pulsar.admin.service-url
屬性設定為形如 (http|https)://<host>:<port>
的其他值來調整此設定。
如果你需要對配置有更多的控制,可以考慮註冊一個或多個 PulsarAdminBuilderCustomizer
Bean。
認證
當訪問需要認證的 Pulsar 叢集時,管理客戶端需要與常規 Pulsar 客戶端相同的安全配置。你可以使用上述 認證配置,只需將 spring.pulsar.client.authentication
替換為 spring.pulsar.admin.authentication
。
要在啟動時建立一個 topic,請新增一個型別為 PulsarTopic 的 Bean。如果該 topic 已存在,則忽略此 Bean。 |
傳送訊息
Spring 的 PulsarTemplate
會被自動配置,你可以使用它來發送訊息,如下例所示
-
Java
-
Kotlin
import org.springframework.pulsar.core.PulsarTemplate;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
private final PulsarTemplate<String> pulsarTemplate;
public MyBean(PulsarTemplate<String> pulsarTemplate) {
this.pulsarTemplate = pulsarTemplate;
}
public void someMethod() {
this.pulsarTemplate.send("someTopic", "Hello");
}
}
import org.apache.pulsar.client.api.PulsarClientException
import org.springframework.pulsar.core.PulsarTemplate
import org.springframework.stereotype.Component
@Component
class MyBean(private val pulsarTemplate: PulsarTemplate<String>) {
@Throws(PulsarClientException::class)
fun someMethod() {
pulsarTemplate.send("someTopic", "Hello")
}
}
PulsarTemplate
依賴於 PulsarProducerFactory
來建立底層的 Pulsar producer。Spring Boot 自動配置也提供了這個 producer factory,預設情況下,它會快取建立的 producer。你可以透過指定任何以 spring.pulsar.producer.*
和 spring.pulsar.producer.cache.*
為字首的應用屬性來配置 producer factory 和快取設定。
如果你需要對 producer factory 配置有更多的控制,可以考慮註冊一個或多個 ProducerBuilderCustomizer
Bean。這些 customizer 會應用於所有建立的 producer。你也可以在傳送訊息時傳入一個 ProducerBuilderCustomizer
,以便僅影響當前的 producer。
如果你需要對要傳送的訊息有更多的控制,可以在傳送訊息時傳入一個 TypedMessageBuilderCustomizer
。
響應式傳送訊息
當響應式自動配置被啟用時,Spring 的 ReactivePulsarTemplate
會被自動配置,你可以使用它來發送訊息,如下例所示
-
Java
-
Kotlin
import org.springframework.pulsar.reactive.core.ReactivePulsarTemplate;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
private final ReactivePulsarTemplate<String> pulsarTemplate;
public MyBean(ReactivePulsarTemplate<String> pulsarTemplate) {
this.pulsarTemplate = pulsarTemplate;
}
public void someMethod() {
this.pulsarTemplate.send("someTopic", "Hello").subscribe();
}
}
import org.springframework.pulsar.reactive.core.ReactivePulsarTemplate
import org.springframework.stereotype.Component
@Component
class MyBean(private val pulsarTemplate: ReactivePulsarTemplate<String>) {
fun someMethod() {
pulsarTemplate.send("someTopic", "Hello").subscribe()
}
}
ReactivePulsarTemplate
依賴於 ReactivePulsarSenderFactory
來實際建立底層的 sender。Spring Boot 自動配置也提供了這個 sender factory,預設情況下,它會快取建立的 producer。你可以透過指定任何以 spring.pulsar.producer.*
和 spring.pulsar.producer.cache.*
為字首的應用屬性來配置 sender factory 和快取設定。
如果你需要對 sender factory 配置有更多的控制,可以考慮註冊一個或多個 ReactiveMessageSenderBuilderCustomizer
Bean。這些 customizer 會應用於所有建立的 sender。你也可以在傳送訊息時傳入一個 ReactiveMessageSenderBuilderCustomizer
,以便僅影響當前的 sender。
如果你需要對要傳送的訊息有更多的控制,可以在傳送訊息時傳入一個 MessageSpecBuilderCustomizer
。
接收訊息
當 Apache Pulsar 基礎設施存在時,任何 Bean 都可以使用 @PulsarListener
註解來建立一個 listener endpoint。以下元件在 someTopic
topic 上建立一個 listener endpoint
-
Java
-
Kotlin
import org.springframework.pulsar.annotation.PulsarListener;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
@PulsarListener(topics = "someTopic")
public void processMessage(String content) {
// ...
}
}
import org.springframework.pulsar.annotation.PulsarListener
import org.springframework.stereotype.Component
@Component
class MyBean {
@PulsarListener(topics = ["someTopic"])
fun processMessage(content: String?) {
// ...
}
}
Spring Boot 自動配置提供了 @PulsarListener
所需的所有元件,例如 PulsarListenerContainerFactory
以及它用來構建底層 Pulsar consumer 的 consumer factory。你可以透過指定任何以 spring.pulsar.listener.*
和 spring.pulsar.consumer.*
為字首的應用屬性來配置這些元件。
如果你需要對 consumer factory 的配置有更多的控制,可以考慮註冊一個或多個 ConsumerBuilderCustomizer
Bean。這些 customizer 會應用於 factory 建立的所有 consumer,因此也會應用於所有 @PulsarListener
例項。你也可以透過設定 @PulsarListener
註解的 consumerCustomizer
屬性來定製單個 listener。
如果你需要對實際的 container factory 配置有更多的控制,可以考慮註冊一個或多個 PulsarContainerFactoryCustomizer<ConcurrentPulsarListenerContainerFactory<?>>
Bean。
響應式接收訊息
當 Apache Pulsar 基礎設施存在且響應式自動配置被啟用時,任何 Bean 都可以使用 @ReactivePulsarListener
註解來建立一個響應式 listener endpoint。以下元件在 someTopic
topic 上建立一個響應式 listener endpoint
-
Java
-
Kotlin
import reactor.core.publisher.Mono;
import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListener;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
@ReactivePulsarListener(topics = "someTopic")
public Mono<Void> processMessage(String content) {
// ...
return Mono.empty();
}
}
import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListener
import org.springframework.stereotype.Component
import reactor.core.publisher.Mono
@Component
class MyBean {
@ReactivePulsarListener(topics = ["someTopic"])
fun processMessage(content: String?): Mono<Void> {
// ...
return Mono.empty()
}
}
Spring Boot 自動配置提供了 @ReactivePulsarListener
所需的所有元件,例如 ReactivePulsarListenerContainerFactory
以及它用來構建底層響應式 Pulsar consumer 的 consumer factory。你可以透過指定任何以 spring.pulsar.listener.*
和 spring.pulsar.consumer.*
為字首的應用屬性來配置這些元件。
如果你需要對 consumer factory 的配置有更多的控制,可以考慮註冊一個或多個 ReactiveMessageConsumerBuilderCustomizer
Bean。這些 customizer 會應用於 factory 建立的所有 consumer,因此也會應用於所有 @ReactivePulsarListener
例項。你也可以透過設定 @ReactivePulsarListener
註解的 consumerCustomizer
屬性來定製單個 listener。
如果你需要對實際的 container factory 配置有更多的控制,可以考慮註冊一個或多個 PulsarContainerFactoryCustomizer<DefaultReactivePulsarListenerContainerFactory<?>>
Bean。
讀取訊息
Pulsar reader 介面使應用能夠手動管理 cursor。當你使用 reader 連線到 topic 時,你需要指定 reader 連線到 topic 時從哪條訊息開始讀取。
當 Apache Pulsar 基礎設施存在時,任何 Bean 都可以使用 @PulsarReader
註解來使用 reader 消費訊息。以下元件建立一個 reader endpoint,它從 someTopic
topic 的開頭開始讀取訊息
-
Java
-
Kotlin
import org.springframework.pulsar.annotation.PulsarReader;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
@PulsarReader(topics = "someTopic", startMessageId = "earliest")
public void processMessage(String content) {
// ...
}
}
import org.springframework.pulsar.annotation.PulsarReader
import org.springframework.stereotype.Component
@Component
class MyBean {
@PulsarReader(topics = ["someTopic"], startMessageId = "earliest")
fun processMessage(content: String?) {
// ...
}
}
@PulsarReader
依賴於 PulsarReaderFactory
來建立底層的 Pulsar reader。Spring Boot 自動配置提供了這個 reader factory,可以透過設定任何以 spring.pulsar.reader.*
為字首的應用屬性來對其進行定製。
如果你需要對 reader factory 的配置有更多的控制,可以考慮註冊一個或多個 ReaderBuilderCustomizer
Bean。這些 customizer 會應用於 factory 建立的所有 reader,因此也會應用於所有 @PulsarReader
例項。你也可以透過設定 @PulsarReader
註解的 readerCustomizer
屬性來定製單個 listener。
如果你需要對實際的 container factory 配置有更多的控制,可以考慮註冊一個或多個 PulsarContainerFactoryCustomizer<DefaultPulsarReaderContainerFactory<?>>
Bean。
響應式讀取訊息
當 Apache Pulsar 基礎設施存在且響應式自動配置被啟用時,Spring 提供了 ReactivePulsarReaderFactory
,你可以使用它來建立一個 reader,以便以響應式方式讀取訊息。以下元件使用提供的 factory 建立一個 reader,並從 someTopic
topic 中讀取 5 分鐘前的單條訊息
-
Java
-
Kotlin
import java.time.Instant;
import java.util.List;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.reactive.client.api.StartAtSpec;
import reactor.core.publisher.Mono;
import org.springframework.pulsar.reactive.core.ReactiveMessageReaderBuilderCustomizer;
import org.springframework.pulsar.reactive.core.ReactivePulsarReaderFactory;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
private final ReactivePulsarReaderFactory<String> pulsarReaderFactory;
public MyBean(ReactivePulsarReaderFactory<String> pulsarReaderFactory) {
this.pulsarReaderFactory = pulsarReaderFactory;
}
public void someMethod() {
ReactiveMessageReaderBuilderCustomizer<String> readerBuilderCustomizer = (readerBuilder) -> readerBuilder
.topic("someTopic")
.startAtSpec(StartAtSpec.ofInstant(Instant.now().minusSeconds(5)));
Mono<Message<String>> message = this.pulsarReaderFactory
.createReader(Schema.STRING, List.of(readerBuilderCustomizer))
.readOne();
// ...
}
}
import org.apache.pulsar.client.api.Schema
import org.apache.pulsar.reactive.client.api.ReactiveMessageReaderBuilder
import org.apache.pulsar.reactive.client.api.StartAtSpec
import org.springframework.pulsar.reactive.core.ReactiveMessageReaderBuilderCustomizer
import org.springframework.pulsar.reactive.core.ReactivePulsarReaderFactory
import org.springframework.stereotype.Component
import java.time.Instant
@Component
class MyBean(private val pulsarReaderFactory: ReactivePulsarReaderFactory<String>) {
fun someMethod() {
val readerBuilderCustomizer = ReactiveMessageReaderBuilderCustomizer {
readerBuilder: ReactiveMessageReaderBuilder<String> ->
readerBuilder
.topic("someTopic")
.startAtSpec(StartAtSpec.ofInstant(Instant.now().minusSeconds(5)))
}
val message = pulsarReaderFactory
.createReader(Schema.STRING, listOf(readerBuilderCustomizer))
.readOne()
// ...
}
}
Spring Boot 自動配置提供了這個 reader factory,可以透過設定任何以 spring.pulsar.reader.*
為字首的應用屬性來對其進行定製。
如果你需要對 reader factory 配置有更多的控制,可以在使用 factory 建立 reader 時傳入一個或多個 ReactiveMessageReaderBuilderCustomizer
例項。
如果你需要對 reader factory 配置有更多的控制,可以考慮註冊一個或多個 ReactiveMessageReaderBuilderCustomizer
Bean。這些 customizer 會應用於所有建立的 reader。你也可以在建立 reader 時傳入一個或多個 ReactiveMessageReaderBuilderCustomizer
,以便僅將定製應用於建立的 reader。
有關上述任何元件的更多詳細資訊以及發現其他可用特性,請參閱 Spring for Apache Pulsar 參考文件。 |
事務支援
Spring for Apache Pulsar 在使用 PulsarTemplate
和 @PulsarListener
時支援事務。
目前使用響應式變體時不支援事務。 |
將 spring.pulsar.transaction.enabled
屬性設定為 true
將會
-
配置一個
PulsarTransactionManager
Bean -
為
PulsarTemplate
啟用事務支援 -
為
@PulsarListener
方法啟用事務支援
@PulsarListener
的 transactional
屬性可用於微調何時應將事務與 listener 一起使用。
為了更好地控制 Spring for Apache Pulsar 的事務特性,你應該定義自己的 PulsarTemplate
和/或 ConcurrentPulsarListenerContainerFactory
Bean。如果預設自動配置的 PulsarTransactionManager
不合適,你也可以定義一個 PulsarAwareTransactionManager
Bean。
額外 Pulsar 屬性
自動配置支援的屬性顯示在附錄的 整合屬性 部分。請注意,在大多數情況下,這些屬性(連字元命名或駝峰命名)直接對映到 Apache Pulsar 的配置屬性。有關詳細資訊,請參閱 Apache Pulsar 文件。
Pulsar 支援的屬性中,只有一部分可以直接透過 PulsarProperties
類獲取。如果你希望使用未直接支援的附加屬性來調整自動配置的元件,你可以使用上述每個元件支援的定製器。