AMQP
高階訊息佇列協議(AMQP)是一種平臺中立的、線級協議,用於面向訊息的中介軟體。Spring AMQP 專案將 Spring 核心概念應用於基於 AMQP 的訊息解決方案的開發。Spring Boot 透過 RabbitMQ 提供了多種使用 AMQP 的便利功能,包括 spring-boot-starter-amqp 啟動器。
RabbitMQ 支援
RabbitMQ 是一個基於 AMQP 協議的輕量級、可靠、可擴充套件和可移植的訊息代理。Spring 使用 RabbitMQ 透過 AMQP 協議進行通訊。
RabbitMQ 的配置由 spring.rabbitmq.* 中的外部配置屬性控制。例如,你可以在 application.properties 中宣告以下部分:
-
屬性
-
YAML
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=secret
spring:
rabbitmq:
host: "localhost"
port: 5672
username: "admin"
password: "secret"
或者,你可以使用 addresses 屬性配置相同的連線:
-
屬性
-
YAML
spring.rabbitmq.addresses=amqp://admin:secret@localhost
spring:
rabbitmq:
addresses: "amqp://admin:secret@localhost"
以這種方式指定地址時,host 和 port 屬性將被忽略。如果地址使用 amqps 協議,則會自動啟用 SSL 支援。 |
有關更多支援的基於屬性的配置選項,請參閱 RabbitProperties。要配置 Spring AMQP 使用的 RabbitMQ ConnectionFactory 的更底層細節,請定義一個 ConnectionFactoryCustomizer bean。
如果上下文中存在 ConnectionNameStrategy bean,它將自動用於命名由自動配置的 CachingConnectionFactory 建立的連線。
要對 RabbitTemplate 進行應用程式範圍的累加定製,請使用 RabbitTemplateCustomizer bean。
| 有關更多詳細資訊,請參閱 理解 AMQP,RabbitMQ 使用的協議。 |
傳送訊息
Spring 的 AmqpTemplate 和 AmqpAdmin 已自動配置,你可以將它們直接自動注入到你的 bean 中,如以下示例所示:
-
Java
-
Kotlin
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
private final AmqpAdmin amqpAdmin;
private final AmqpTemplate amqpTemplate;
public MyBean(AmqpAdmin amqpAdmin, AmqpTemplate amqpTemplate) {
this.amqpAdmin = amqpAdmin;
this.amqpTemplate = amqpTemplate;
}
// ...
public void someMethod() {
this.amqpAdmin.getQueueInfo("someQueue");
}
public void someOtherMethod() {
this.amqpTemplate.convertAndSend("hello");
}
}
import org.springframework.amqp.core.AmqpAdmin
import org.springframework.amqp.core.AmqpTemplate
import org.springframework.stereotype.Component
@Component
class MyBean(private val amqpAdmin: AmqpAdmin, private val amqpTemplate: AmqpTemplate) {
// ...
fun someMethod() {
amqpAdmin.getQueueInfo("someQueue")
}
fun someOtherMethod() {
amqpTemplate.convertAndSend("hello")
}
}
RabbitMessagingTemplate 可以以類似的方式注入。如果定義了 MessageConverter bean,它將自動與自動配置的 AmqpTemplate 關聯。 |
如有必要,任何定義為 bean 的 Queue 都將自動用於在 RabbitMQ 例項上宣告相應的佇列。
要重試操作,你可以為 AmqpTemplate 啟用重試(例如,在代理連線丟失的情況下):
-
屬性
-
YAML
spring.rabbitmq.template.retry.enabled=true
spring.rabbitmq.template.retry.initial-interval=2s
spring:
rabbitmq:
template:
retry:
enabled: true
initial-interval: "2s"
重試預設是停用的。你還可以透過宣告一個 RabbitTemplateRetrySettingsCustomizer bean,以程式設計方式自定義 RetryTemplate。
如果你需要建立更多 RabbitTemplate 例項,或者想要覆蓋預設設定,Spring Boot 提供了一個 RabbitTemplateConfigurer bean,你可以使用它來初始化一個 RabbitTemplate,使其具有與自動配置使用的工廠相同的設定。
向流傳送訊息
要向特定流傳送訊息,請指定流的名稱,如以下示例所示:
-
屬性
-
YAML
spring.rabbitmq.stream.name=my-stream
spring:
rabbitmq:
stream:
name: "my-stream"
如果定義了 MessageConverter、StreamMessageConverter 或 ProducerCustomizer bean,它將自動與自動配置的 RabbitStreamTemplate 關聯。
如果你需要建立更多 RabbitStreamTemplate 例項,或者想要覆蓋預設設定,Spring Boot 提供了一個 RabbitStreamTemplateConfigurer bean,你可以使用它來初始化一個 RabbitStreamTemplate,使其具有與自動配置使用的工廠相同的設定。
接收訊息
當 Rabbit 基礎設施存在時,任何 bean 都可以用 @RabbitListener 註解來建立監聽器端點。如果沒有定義 RabbitListenerContainerFactory,則會自動配置一個預設的 SimpleRabbitListenerContainerFactory,你可以使用 spring.rabbitmq.listener.type 屬性切換到直接容器。如果定義了 MessageConverter 或 MessageRecoverer bean,它將自動與預設工廠關聯。
以下示例元件在 someQueue 佇列上建立了一個監聽器端點:
-
Java
-
Kotlin
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
@RabbitListener(queues = "someQueue")
public void processMessage(String content) {
// ...
}
}
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component
@Component
class MyBean {
@RabbitListener(queues = ["someQueue"])
fun processMessage(content: String?) {
// ...
}
}
有關更多詳細資訊,請參閱 @EnableRabbit。 |
如果你需要建立更多 RabbitListenerContainerFactory 例項,或者想要覆蓋預設設定,Spring Boot 提供了 SimpleRabbitListenerContainerFactoryConfigurer 和 DirectRabbitListenerContainerFactoryConfigurer,你可以使用它們來初始化一個 SimpleRabbitListenerContainerFactory 和一個 DirectRabbitListenerContainerFactory,使其具有與自動配置使用的工廠相同的設定。
| 你選擇哪種容器型別並不重要。這兩個 bean 都由自動配置暴露。 |
例如,以下配置類暴露了另一個使用特定 MessageConverter 的工廠:
-
Java
-
Kotlin
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.boot.amqp.autoconfigure.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration(proxyBeanMethods = false)
public class MyRabbitConfiguration {
@Bean
public SimpleRabbitListenerContainerFactory myFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
ConnectionFactory connectionFactory = getCustomConnectionFactory();
configurer.configure(factory, connectionFactory);
factory.setMessageConverter(new MyMessageConverter());
return factory;
}
private ConnectionFactory getCustomConnectionFactory() {
return ...
}
}
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory
import org.springframework.amqp.rabbit.connection.ConnectionFactory
import org.springframework.boot.amqp.autoconfigure.SimpleRabbitListenerContainerFactoryConfigurer
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
@Configuration(proxyBeanMethods = false)
class MyRabbitConfiguration {
@Bean
fun myFactory(configurer: SimpleRabbitListenerContainerFactoryConfigurer): SimpleRabbitListenerContainerFactory {
val factory = SimpleRabbitListenerContainerFactory()
val connectionFactory = getCustomConnectionFactory()
configurer.configure(factory, connectionFactory)
factory.setMessageConverter(MyMessageConverter())
return factory
}
fun getCustomConnectionFactory() : ConnectionFactory {
return ...
}
}
然後,你可以在任何帶有 @RabbitListener 註解的方法中使用該工廠,如下所示:
-
Java
-
Kotlin
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
@RabbitListener(queues = "someQueue", containerFactory = "myFactory")
public void processMessage(String content) {
// ...
}
}
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component
@Component
class MyBean {
@RabbitListener(queues = ["someQueue"], containerFactory = "myFactory")
fun processMessage(content: String?) {
// ...
}
}
你可以啟用重試來處理監聽器丟擲異常的情況。預設情況下,使用 RejectAndDontRequeueRecoverer,但你可以定義自己的 MessageRecoverer。當重試耗盡時,訊息將被拒絕,如果代理配置為這樣做,則會被丟棄或路由到死信交換。預設情況下,重試是停用的。你還可以透過宣告一個 RabbitListenerRetrySettingsCustomizer bean,以程式設計方式自定義 RetryPolicy。
預設情況下,如果重試被停用且監聽器丟擲異常,則會無限期地重試投遞。你可以透過兩種方式修改此行為:將 defaultRequeueRejected 屬性設定為 false,這樣就不會嘗試重新投遞;或者丟擲 AmqpRejectAndDontRequeueException 以表示訊息應該被拒絕。後者是在啟用重試且達到最大投遞嘗試次數時使用的機制。 |