AMQP
高階訊息佇列協議(AMQP)是一種與平臺無關、面向連線的協議,用於訊息中介軟體。Spring AMQP 專案將 Spring 核心概念應用於開發基於 AMQP 的訊息解決方案。Spring Boot 透過 RabbitMQ 為使用 AMQP 提供了多項便利,包括 `spring-boot-starter-amqp` Starter。
RabbitMQ 支援
RabbitMQ 是一個基於 AMQP 協議的輕量級、可靠、可伸縮且可移植的訊息代理。Spring 使用 RabbitMQ 透過 AMQP 協議進行通訊。
RabbitMQ 配置由 `spring.rabbitmq.*` 中的外部配置屬性控制。例如,你可以在 `application.properties` 中宣告以下部分:
-
屬性
-
YAML
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=secret
spring:
rabbitmq:
host: "localhost"
port: 5672
username: "admin"
password: "secret"
或者,你可以使用 `addresses` 屬性配置相同的連線:
-
屬性
-
YAML
spring.rabbitmq.addresses=amqp://admin:secret@localhost
spring:
rabbitmq:
addresses: "amqp://admin:secret@localhost"
當以這種方式指定地址時,`host` 和 `port` 屬性將被忽略。如果地址使用 `amqps` 協議,則會自動啟用 SSL 支援。 |
有關更多支援的基於屬性的配置選項,請參見 RabbitProperties
。要配置 Spring AMQP 使用的 RabbitMQ ConnectionFactory
的更底層細節,請定義一個 ConnectionFactoryCustomizer
Bean。
如果上下文中存在 ConnectionNameStrategy
Bean,它將自動用於命名由自動配置的 CachingConnectionFactory
建立的連線。
要對 RabbitTemplate
進行應用範圍內的累加式定製,請使用 RabbitTemplateCustomizer
Bean。
有關更多詳細資訊,請參閱 理解 AMQP:RabbitMQ 使用的協議。 |
傳送訊息
Spring 的 AmqpTemplate
和 AmqpAdmin
會自動配置,你可以將它們直接自動注入到你自己的 Bean 中,如以下示例所示:
-
Java
-
Kotlin
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
private final AmqpAdmin amqpAdmin;
private final AmqpTemplate amqpTemplate;
public MyBean(AmqpAdmin amqpAdmin, AmqpTemplate amqpTemplate) {
this.amqpAdmin = amqpAdmin;
this.amqpTemplate = amqpTemplate;
}
// ...
public void someMethod() {
this.amqpAdmin.getQueueInfo("someQueue");
}
public void someOtherMethod() {
this.amqpTemplate.convertAndSend("hello");
}
}
import org.springframework.amqp.core.AmqpAdmin
import org.springframework.amqp.core.AmqpTemplate
import org.springframework.stereotype.Component
@Component
class MyBean(private val amqpAdmin: AmqpAdmin, private val amqpTemplate: AmqpTemplate) {
// ...
fun someMethod() {
amqpAdmin.getQueueInfo("someQueue")
}
fun someOtherMethod() {
amqpTemplate.convertAndSend("hello")
}
}
RabbitMessagingTemplate 可以以類似的方式注入。如果定義了 MessageConverter Bean,它會自動關聯到自動配置的 AmqpTemplate 。 |
如果需要,任何定義為 Bean 的 Queue
會自動用於在 RabbitMQ 例項上宣告相應的佇列。
要重試操作,你可以在 AmqpTemplate
上啟用重試(例如,在 Broker 連線丟失的情況下):
-
屬性
-
YAML
spring.rabbitmq.template.retry.enabled=true
spring.rabbitmq.template.retry.initial-interval=2s
spring:
rabbitmq:
template:
retry:
enabled: true
initial-interval: "2s"
預設情況下,重試是停用的。你也可以透過宣告 RabbitRetryTemplateCustomizer
Bean 來程式設計方式定製 RetryTemplate
。
如果你需要建立更多 RabbitTemplate
例項或者想覆蓋預設配置,Spring Boot 提供了一個 RabbitTemplateConfigurer
Bean,你可以使用它來初始化一個 RabbitTemplate
,其設定與自動配置使用的工廠相同。
傳送訊息到 Stream
要傳送訊息到特定的 Stream,請指定 Stream 的名稱,如以下示例所示:
-
屬性
-
YAML
spring.rabbitmq.stream.name=my-stream
spring:
rabbitmq:
stream:
name: "my-stream"
如果定義了 MessageConverter
、StreamMessageConverter
或 ProducerCustomizer
Bean,它會自動關聯到自動配置的 RabbitStreamTemplate
。
如果你需要建立更多 RabbitStreamTemplate
例項或者想覆蓋預設配置,Spring Boot 提供了一個 RabbitStreamTemplateConfigurer
Bean,你可以使用它來初始化一個 RabbitStreamTemplate
,其設定與自動配置使用的工廠相同。
接收訊息
當存在 Rabbit 基礎設施時,任何 Bean 都可以用 @RabbitListener
註解進行標記,以建立監聽器端點。如果沒有定義 RabbitListenerContainerFactory
,則會自動配置一個預設的 SimpleRabbitListenerContainerFactory
,你可以使用 `spring.rabbitmq.listener.type` 屬性切換到 Direct 容器。如果定義了 MessageConverter
或 MessageRecoverer
Bean,它會自動關聯到預設工廠。
以下示例元件在 `someQueue` 佇列上建立了一個監聽器端點:
-
Java
-
Kotlin
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
@RabbitListener(queues = "someQueue")
public void processMessage(String content) {
// ...
}
}
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component
@Component
class MyBean {
@RabbitListener(queues = ["someQueue"])
fun processMessage(content: String?) {
// ...
}
}
有關更多詳細資訊,請參閱 @EnableRabbit 。 |
如果你需要建立更多 RabbitListenerContainerFactory
例項或者想覆蓋預設配置,Spring Boot 提供了一個 SimpleRabbitListenerContainerFactoryConfigurer
和一個 DirectRabbitListenerContainerFactoryConfigurer
,你可以使用它們來初始化一個 SimpleRabbitListenerContainerFactory
和一個 DirectRabbitListenerContainerFactory
,其設定與自動配置使用的工廠相同。
你選擇哪種容器型別並不重要。這兩個 Bean 都由自動配置暴露出來。 |
例如,以下配置類暴露了另一個使用特定 MessageConverter
的工廠:
-
Java
-
Kotlin
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration(proxyBeanMethods = false)
public class MyRabbitConfiguration {
@Bean
public SimpleRabbitListenerContainerFactory myFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
ConnectionFactory connectionFactory = getCustomConnectionFactory();
configurer.configure(factory, connectionFactory);
factory.setMessageConverter(new MyMessageConverter());
return factory;
}
private ConnectionFactory getCustomConnectionFactory() {
return ...
}
}
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory
import org.springframework.amqp.rabbit.connection.ConnectionFactory
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
@Configuration(proxyBeanMethods = false)
class MyRabbitConfiguration {
@Bean
fun myFactory(configurer: SimpleRabbitListenerContainerFactoryConfigurer): SimpleRabbitListenerContainerFactory {
val factory = SimpleRabbitListenerContainerFactory()
val connectionFactory = getCustomConnectionFactory()
configurer.configure(factory, connectionFactory)
factory.setMessageConverter(MyMessageConverter())
return factory
}
fun getCustomConnectionFactory() : ConnectionFactory? {
return ...
}
}
然後你可以在任何帶有 @RabbitListener
註解的方法中使用該工廠,如下所示:
-
Java
-
Kotlin
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
@RabbitListener(queues = "someQueue", containerFactory = "myFactory")
public void processMessage(String content) {
// ...
}
}
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component
@Component
class MyBean {
@RabbitListener(queues = ["someQueue"], containerFactory = "myFactory")
fun processMessage(content: String?) {
// ...
}
}
你可以啟用重試來處理監聽器丟擲異常的情況。預設情況下,使用 RejectAndDontRequeueRecoverer
,但你可以定義自己的 MessageRecoverer
。當重試次數用盡後,訊息將被拒絕,並且如果 Broker 配置了死信交換機,訊息會被丟棄或路由到死信交換機。預設情況下,重試是停用的。你也可以透過宣告 RabbitRetryTemplateCustomizer
Bean 來程式設計方式定製 RetryTemplate
。
預設情況下,如果重試被停用且監聽器丟擲異常,訊息將無限期地重試投遞。你可以透過兩種方式修改此行為:將 `defaultRequeueRejected` 屬性設定為 `false`,這樣將不再嘗試重新投遞;或者丟擲 AmqpRejectAndDontRequeueException 來表明訊息應該被拒絕。後者是啟用重試且達到最大投遞嘗試次數時使用的機制。 |