AMQP

高階訊息佇列協議(AMQP)是一種與平臺無關、面向連線的協議,用於訊息中介軟體。Spring AMQP 專案將 Spring 核心概念應用於開發基於 AMQP 的訊息解決方案。Spring Boot 透過 RabbitMQ 為使用 AMQP 提供了多項便利,包括 `spring-boot-starter-amqp` Starter。

RabbitMQ 支援

RabbitMQ 是一個基於 AMQP 協議的輕量級、可靠、可伸縮且可移植的訊息代理。Spring 使用 RabbitMQ 透過 AMQP 協議進行通訊。

RabbitMQ 配置由 `spring.rabbitmq.*` 中的外部配置屬性控制。例如,你可以在 `application.properties` 中宣告以下部分:

  • 屬性

  • YAML

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=secret
spring:
  rabbitmq:
    host: "localhost"
    port: 5672
    username: "admin"
    password: "secret"

或者,你可以使用 `addresses` 屬性配置相同的連線:

  • 屬性

  • YAML

spring.rabbitmq.addresses=amqp://admin:secret@localhost
spring:
  rabbitmq:
    addresses: "amqp://admin:secret@localhost"
當以這種方式指定地址時,`host` 和 `port` 屬性將被忽略。如果地址使用 `amqps` 協議,則會自動啟用 SSL 支援。

有關更多支援的基於屬性的配置選項,請參見 RabbitProperties。要配置 Spring AMQP 使用的 RabbitMQ ConnectionFactory 的更底層細節,請定義一個 ConnectionFactoryCustomizer Bean。

如果上下文中存在 ConnectionNameStrategy Bean,它將自動用於命名由自動配置的 CachingConnectionFactory 建立的連線。

要對 RabbitTemplate 進行應用範圍內的累加式定製,請使用 RabbitTemplateCustomizer Bean。

有關更多詳細資訊,請參閱 理解 AMQP:RabbitMQ 使用的協議

傳送訊息

Spring 的 AmqpTemplateAmqpAdmin 會自動配置,你可以將它們直接自動注入到你自己的 Bean 中,如以下示例所示:

  • Java

  • Kotlin

import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	private final AmqpAdmin amqpAdmin;

	private final AmqpTemplate amqpTemplate;

	public MyBean(AmqpAdmin amqpAdmin, AmqpTemplate amqpTemplate) {
		this.amqpAdmin = amqpAdmin;
		this.amqpTemplate = amqpTemplate;
	}

	// ...

	public void someMethod() {
		this.amqpAdmin.getQueueInfo("someQueue");
	}

	public void someOtherMethod() {
		this.amqpTemplate.convertAndSend("hello");
	}

}
import org.springframework.amqp.core.AmqpAdmin
import org.springframework.amqp.core.AmqpTemplate
import org.springframework.stereotype.Component

@Component
class MyBean(private val amqpAdmin: AmqpAdmin, private val amqpTemplate: AmqpTemplate) {

	// ...

	fun someMethod() {
		amqpAdmin.getQueueInfo("someQueue")
	}

	fun someOtherMethod() {
		amqpTemplate.convertAndSend("hello")
	}

}
RabbitMessagingTemplate 可以以類似的方式注入。如果定義了 MessageConverter Bean,它會自動關聯到自動配置的 AmqpTemplate

如果需要,任何定義為 Bean 的 Queue 會自動用於在 RabbitMQ 例項上宣告相應的佇列。

要重試操作,你可以在 AmqpTemplate 上啟用重試(例如,在 Broker 連線丟失的情況下):

  • 屬性

  • YAML

spring.rabbitmq.template.retry.enabled=true
spring.rabbitmq.template.retry.initial-interval=2s
spring:
  rabbitmq:
    template:
      retry:
        enabled: true
        initial-interval: "2s"

預設情況下,重試是停用的。你也可以透過宣告 RabbitRetryTemplateCustomizer Bean 來程式設計方式定製 RetryTemplate

如果你需要建立更多 RabbitTemplate 例項或者想覆蓋預設配置,Spring Boot 提供了一個 RabbitTemplateConfigurer Bean,你可以使用它來初始化一個 RabbitTemplate,其設定與自動配置使用的工廠相同。

傳送訊息到 Stream

要傳送訊息到特定的 Stream,請指定 Stream 的名稱,如以下示例所示:

  • 屬性

  • YAML

spring.rabbitmq.stream.name=my-stream
spring:
  rabbitmq:
    stream:
      name: "my-stream"

如果定義了 MessageConverterStreamMessageConverterProducerCustomizer Bean,它會自動關聯到自動配置的 RabbitStreamTemplate

如果你需要建立更多 RabbitStreamTemplate 例項或者想覆蓋預設配置,Spring Boot 提供了一個 RabbitStreamTemplateConfigurer Bean,你可以使用它來初始化一個 RabbitStreamTemplate,其設定與自動配置使用的工廠相同。

接收訊息

當存在 Rabbit 基礎設施時,任何 Bean 都可以用 @RabbitListener 註解進行標記,以建立監聽器端點。如果沒有定義 RabbitListenerContainerFactory,則會自動配置一個預設的 SimpleRabbitListenerContainerFactory,你可以使用 `spring.rabbitmq.listener.type` 屬性切換到 Direct 容器。如果定義了 MessageConverterMessageRecoverer Bean,它會自動關聯到預設工廠。

以下示例元件在 `someQueue` 佇列上建立了一個監聽器端點:

  • Java

  • Kotlin

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	@RabbitListener(queues = "someQueue")
	public void processMessage(String content) {
		// ...
	}

}
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component

@Component
class MyBean {

	@RabbitListener(queues = ["someQueue"])
	fun processMessage(content: String?) {
		// ...
	}

}
有關更多詳細資訊,請參閱 @EnableRabbit

如果你需要建立更多 RabbitListenerContainerFactory 例項或者想覆蓋預設配置,Spring Boot 提供了一個 SimpleRabbitListenerContainerFactoryConfigurer 和一個 DirectRabbitListenerContainerFactoryConfigurer,你可以使用它們來初始化一個 SimpleRabbitListenerContainerFactory 和一個 DirectRabbitListenerContainerFactory,其設定與自動配置使用的工廠相同。

你選擇哪種容器型別並不重要。這兩個 Bean 都由自動配置暴露出來。

例如,以下配置類暴露了另一個使用特定 MessageConverter 的工廠:

  • Java

  • Kotlin

import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration(proxyBeanMethods = false)
public class MyRabbitConfiguration {

	@Bean
	public SimpleRabbitListenerContainerFactory myFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer) {
		SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
		ConnectionFactory connectionFactory = getCustomConnectionFactory();
		configurer.configure(factory, connectionFactory);
		factory.setMessageConverter(new MyMessageConverter());
		return factory;
	}

	private ConnectionFactory getCustomConnectionFactory() {
		return ...
	}

}
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory
import org.springframework.amqp.rabbit.connection.ConnectionFactory
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration

@Configuration(proxyBeanMethods = false)
class MyRabbitConfiguration {

	@Bean
	fun myFactory(configurer: SimpleRabbitListenerContainerFactoryConfigurer): SimpleRabbitListenerContainerFactory {
		val factory = SimpleRabbitListenerContainerFactory()
		val connectionFactory = getCustomConnectionFactory()
		configurer.configure(factory, connectionFactory)
		factory.setMessageConverter(MyMessageConverter())
		return factory
	}

	fun getCustomConnectionFactory() : ConnectionFactory? {
		return ...
	}

}

然後你可以在任何帶有 @RabbitListener 註解的方法中使用該工廠,如下所示:

  • Java

  • Kotlin

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	@RabbitListener(queues = "someQueue", containerFactory = "myFactory")
	public void processMessage(String content) {
		// ...
	}

}
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component

@Component
class MyBean {

	@RabbitListener(queues = ["someQueue"], containerFactory = "myFactory")
	fun processMessage(content: String?) {
		// ...
	}

}

你可以啟用重試來處理監聽器丟擲異常的情況。預設情況下,使用 RejectAndDontRequeueRecoverer,但你可以定義自己的 MessageRecoverer。當重試次數用盡後,訊息將被拒絕,並且如果 Broker 配置了死信交換機,訊息會被丟棄或路由到死信交換機。預設情況下,重試是停用的。你也可以透過宣告 RabbitRetryTemplateCustomizer Bean 來程式設計方式定製 RetryTemplate

預設情況下,如果重試被停用且監聽器丟擲異常,訊息將無限期地重試投遞。你可以透過兩種方式修改此行為:將 `defaultRequeueRejected` 屬性設定為 `false`,這樣將不再嘗試重新投遞;或者丟擲 AmqpRejectAndDontRequeueException 來表明訊息應該被拒絕。後者是啟用重試且達到最大投遞嘗試次數時使用的機制。