AMQP

高階訊息佇列協議(AMQP)是一種平臺中立的、線級協議,用於面向訊息的中介軟體。Spring AMQP 專案將 Spring 核心概念應用於基於 AMQP 的訊息解決方案的開發。Spring Boot 透過 RabbitMQ 提供了多種使用 AMQP 的便利功能,包括 spring-boot-starter-amqp 啟動器。

RabbitMQ 支援

RabbitMQ 是一個基於 AMQP 協議的輕量級、可靠、可擴充套件和可移植的訊息代理。Spring 使用 RabbitMQ 透過 AMQP 協議進行通訊。

RabbitMQ 的配置由 spring.rabbitmq.* 中的外部配置屬性控制。例如,你可以在 application.properties 中宣告以下部分:

  • 屬性

  • YAML

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=secret
spring:
  rabbitmq:
    host: "localhost"
    port: 5672
    username: "admin"
    password: "secret"

或者,你可以使用 addresses 屬性配置相同的連線:

  • 屬性

  • YAML

spring.rabbitmq.addresses=amqp://admin:secret@localhost
spring:
  rabbitmq:
    addresses: "amqp://admin:secret@localhost"
以這種方式指定地址時,hostport 屬性將被忽略。如果地址使用 amqps 協議,則會自動啟用 SSL 支援。

有關更多支援的基於屬性的配置選項,請參閱 RabbitProperties。要配置 Spring AMQP 使用的 RabbitMQ ConnectionFactory 的更底層細節,請定義一個 ConnectionFactoryCustomizer bean。

如果上下文中存在 ConnectionNameStrategy bean,它將自動用於命名由自動配置的 CachingConnectionFactory 建立的連線。

要對 RabbitTemplate 進行應用程式範圍的累加定製,請使用 RabbitTemplateCustomizer bean。

有關更多詳細資訊,請參閱 理解 AMQP,RabbitMQ 使用的協議

傳送訊息

Spring 的 AmqpTemplateAmqpAdmin 已自動配置,你可以將它們直接自動注入到你的 bean 中,如以下示例所示:

  • Java

  • Kotlin

import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	private final AmqpAdmin amqpAdmin;

	private final AmqpTemplate amqpTemplate;

	public MyBean(AmqpAdmin amqpAdmin, AmqpTemplate amqpTemplate) {
		this.amqpAdmin = amqpAdmin;
		this.amqpTemplate = amqpTemplate;
	}

	// ...

	public void someMethod() {
		this.amqpAdmin.getQueueInfo("someQueue");
	}

	public void someOtherMethod() {
		this.amqpTemplate.convertAndSend("hello");
	}

}
import org.springframework.amqp.core.AmqpAdmin
import org.springframework.amqp.core.AmqpTemplate
import org.springframework.stereotype.Component

@Component
class MyBean(private val amqpAdmin: AmqpAdmin, private val amqpTemplate: AmqpTemplate) {

	// ...

	fun someMethod() {
		amqpAdmin.getQueueInfo("someQueue")
	}

	fun someOtherMethod() {
		amqpTemplate.convertAndSend("hello")
	}

}
RabbitMessagingTemplate 可以以類似的方式注入。如果定義了 MessageConverter bean,它將自動與自動配置的 AmqpTemplate 關聯。

如有必要,任何定義為 bean 的 Queue 都將自動用於在 RabbitMQ 例項上宣告相應的佇列。

要重試操作,你可以為 AmqpTemplate 啟用重試(例如,在代理連線丟失的情況下):

  • 屬性

  • YAML

spring.rabbitmq.template.retry.enabled=true
spring.rabbitmq.template.retry.initial-interval=2s
spring:
  rabbitmq:
    template:
      retry:
        enabled: true
        initial-interval: "2s"

重試預設是停用的。你還可以透過宣告一個 RabbitTemplateRetrySettingsCustomizer bean,以程式設計方式自定義 RetryTemplate

如果你需要建立更多 RabbitTemplate 例項,或者想要覆蓋預設設定,Spring Boot 提供了一個 RabbitTemplateConfigurer bean,你可以使用它來初始化一個 RabbitTemplate,使其具有與自動配置使用的工廠相同的設定。

向流傳送訊息

要向特定流傳送訊息,請指定流的名稱,如以下示例所示:

  • 屬性

  • YAML

spring.rabbitmq.stream.name=my-stream
spring:
  rabbitmq:
    stream:
      name: "my-stream"

如果定義了 MessageConverterStreamMessageConverterProducerCustomizer bean,它將自動與自動配置的 RabbitStreamTemplate 關聯。

如果你需要建立更多 RabbitStreamTemplate 例項,或者想要覆蓋預設設定,Spring Boot 提供了一個 RabbitStreamTemplateConfigurer bean,你可以使用它來初始化一個 RabbitStreamTemplate,使其具有與自動配置使用的工廠相同的設定。

接收訊息

當 Rabbit 基礎設施存在時,任何 bean 都可以用 @RabbitListener 註解來建立監聽器端點。如果沒有定義 RabbitListenerContainerFactory,則會自動配置一個預設的 SimpleRabbitListenerContainerFactory,你可以使用 spring.rabbitmq.listener.type 屬性切換到直接容器。如果定義了 MessageConverterMessageRecoverer bean,它將自動與預設工廠關聯。

以下示例元件在 someQueue 佇列上建立了一個監聽器端點:

  • Java

  • Kotlin

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	@RabbitListener(queues = "someQueue")
	public void processMessage(String content) {
		// ...
	}

}
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component

@Component
class MyBean {

	@RabbitListener(queues = ["someQueue"])
	fun processMessage(content: String?) {
		// ...
	}

}
有關更多詳細資訊,請參閱 @EnableRabbit

如果你需要建立更多 RabbitListenerContainerFactory 例項,或者想要覆蓋預設設定,Spring Boot 提供了 SimpleRabbitListenerContainerFactoryConfigurerDirectRabbitListenerContainerFactoryConfigurer,你可以使用它們來初始化一個 SimpleRabbitListenerContainerFactory 和一個 DirectRabbitListenerContainerFactory,使其具有與自動配置使用的工廠相同的設定。

你選擇哪種容器型別並不重要。這兩個 bean 都由自動配置暴露。

例如,以下配置類暴露了另一個使用特定 MessageConverter 的工廠:

  • Java

  • Kotlin

import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.boot.amqp.autoconfigure.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration(proxyBeanMethods = false)
public class MyRabbitConfiguration {

	@Bean
	public SimpleRabbitListenerContainerFactory myFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer) {
		SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
		ConnectionFactory connectionFactory = getCustomConnectionFactory();
		configurer.configure(factory, connectionFactory);
		factory.setMessageConverter(new MyMessageConverter());
		return factory;
	}

	private ConnectionFactory getCustomConnectionFactory() {
		return ...
	}

}
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory
import org.springframework.amqp.rabbit.connection.ConnectionFactory
import org.springframework.boot.amqp.autoconfigure.SimpleRabbitListenerContainerFactoryConfigurer
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration

@Configuration(proxyBeanMethods = false)
class MyRabbitConfiguration {

	@Bean
	fun myFactory(configurer: SimpleRabbitListenerContainerFactoryConfigurer): SimpleRabbitListenerContainerFactory {
		val factory = SimpleRabbitListenerContainerFactory()
		val connectionFactory = getCustomConnectionFactory()
		configurer.configure(factory, connectionFactory)
		factory.setMessageConverter(MyMessageConverter())
		return factory
	}

	fun getCustomConnectionFactory() : ConnectionFactory {
		return ...
	}

}

然後,你可以在任何帶有 @RabbitListener 註解的方法中使用該工廠,如下所示:

  • Java

  • Kotlin

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	@RabbitListener(queues = "someQueue", containerFactory = "myFactory")
	public void processMessage(String content) {
		// ...
	}

}
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component

@Component
class MyBean {

	@RabbitListener(queues = ["someQueue"], containerFactory = "myFactory")
	fun processMessage(content: String?) {
		// ...
	}

}

你可以啟用重試來處理監聽器丟擲異常的情況。預設情況下,使用 RejectAndDontRequeueRecoverer,但你可以定義自己的 MessageRecoverer。當重試耗盡時,訊息將被拒絕,如果代理配置為這樣做,則會被丟棄或路由到死信交換。預設情況下,重試是停用的。你還可以透過宣告一個 RabbitListenerRetrySettingsCustomizer bean,以程式設計方式自定義 RetryPolicy

預設情況下,如果重試被停用且監聽器丟擲異常,則會無限期地重試投遞。你可以透過兩種方式修改此行為:將 defaultRequeueRejected 屬性設定為 false,這樣就不會嘗試重新投遞;或者丟擲 AmqpRejectAndDontRequeueException 以表示訊息應該被拒絕。後者是在啟用重試且達到最大投遞嘗試次數時使用的機制。
© . This site is unofficial and not affiliated with VMware.