AMQP 抽象
Spring AMQP 由兩個模組組成(每個模組在分發中都由一個 JAR 表示):spring-amqp
和 spring-rabbit
。'spring-amqp' 模組包含 org.springframework.amqp.core
包。在該包中,您可以找到代表核心 AMQP “模型”的類。我們的目的是提供不依賴於任何特定 AMQP broker 實現或客戶端庫的通用抽象。終端使用者程式碼可以跨供應商實現更具可移植性,因為它只需針對抽象層進行開發。然後,這些抽象由特定於 broker 的模組(如 'spring-rabbit')實現。目前僅有 RabbitMQ 實現。然而,除了 RabbitMQ,這些抽象已在 .NET 中使用 Apache Qpid 進行了驗證。由於 AMQP 在協議級別執行,原則上,您可以使用 RabbitMQ 客戶端連線任何支援相同協議版本的 broker,但我們目前不對任何其他 broker 進行測試。
本概述假設您已經熟悉 AMQP 規範的基礎知識。如果不熟悉,請檢視 其他資源 中列出的資源。
Message
0-9-1 AMQP 規範沒有定義 Message
類或介面。相反,在執行 basicPublish()
等操作時,內容作為位元組陣列引數傳遞,附加屬性作為單獨的引數傳遞。Spring AMQP 定義了一個 Message
類,作為更通用的 AMQP 領域模型表示的一部分。Message
類的目的是將訊息體和屬性封裝在單個例項中,從而使 API 更簡潔。以下示例展示了 Message
類的定義
public class Message {
private final MessageProperties messageProperties;
private final byte[] body;
public Message(byte[] body, MessageProperties messageProperties) {
this.body = body;
this.messageProperties = messageProperties;
}
public byte[] getBody() {
return this.body;
}
public MessageProperties getMessageProperties() {
return this.messageProperties;
}
}
MessageProperties
介面定義了一些常見屬性,例如 'messageId'、'timestamp'、'contentType' 等。您還可以透過呼叫 setHeader(String key, Object value)
方法使用使用者定義的 'headers' 擴充套件這些屬性。
從版本 1.5.7 、1.6.11 、1.7.4 和 2.0.0 開始,如果訊息體是序列化的 Serializable Java 物件,則在執行 toString() 操作(例如日誌訊息)時不再(預設情況下)進行反序列化。這是為了防止不安全的反序列化。預設情況下,僅反序列化 java.util 和 java.lang 類。要恢復到先前的行為,您可以透過呼叫 Message.addAllowedListPatterns(…) 新增允許的類/包模式。支援簡單的 * 萬用字元,例如 com.something.*, *.MyClass 。無法反序列化的訊息體在日誌訊息中以 byte[<size>] 表示。 |
交換器 (Exchange)
Exchange
介面代表一個 AMQP Exchange,Message Producer 將訊息傳送到它。Broker 的虛擬主機中的每個 Exchange 都具有唯一的名稱以及其他一些屬性。以下示例展示了 Exchange
介面
public interface Exchange {
String getName();
String getExchangeType();
boolean isDurable();
boolean isAutoDelete();
Map<String, Object> getArguments();
}
正如您所見,Exchange
還有一個由 ExchangeTypes
中定義的常量表示的“型別”。基本型別是:direct
、topic
、fanout
和 headers
。在 core 包中,您可以找到這些型別中每一種的 Exchange
介面實現。這些 Exchange
型別的行為在如何處理與佇列的繫結方面有所不同。例如,Direct
交換器允許佇列透過固定的路由鍵(通常是佇列的名稱)進行繫結。Topic
交換器支援使用路由模式進行繫結,這些模式可能包含 '*' 和 '#' 萬用字元,分別表示“恰好一個”和“零個或多個”。Fanout
交換器會將訊息釋出到所有繫結到它的佇列,而不考慮任何路由鍵。有關這些交換器型別和其他交換器型別的更多資訊,請參閱 AMQP Exchanges。
從 3.2 版本開始,引入了 ConsistentHashExchange
型別,方便應用程式配置階段的使用。它為交換器型別提供了 x-consistent-hash
等選項。允許配置 hash-header
或 hash-property
交換器定義引數。相應的 RabbitMQ rabbitmq_consistent_hash_exchange
外掛必須在 broker 上啟用。有關 Consistent Hash Exchange 的目的、邏輯和行為的更多資訊,請參閱 RabbitMQ 官方文件。
AMQP 規範還要求任何 broker 提供一個“預設”的 direct exchange,它沒有名稱。所有宣告的佇列都透過它們的名稱作為路由鍵繫結到這個預設 Exchange 。您可以在 AmqpTemplate 中瞭解更多關於 Spring AMQP 中預設 Exchange 的用法。 |
佇列 (Queue)
Queue
類代表訊息消費者接收訊息的元件。與各種 Exchange
類一樣,我們的實現旨在抽象表示這個核心 AMQP 型別。以下列表展示了 Queue
類
public class Queue {
private final String name;
private volatile boolean durable;
private volatile boolean exclusive;
private volatile boolean autoDelete;
private volatile Map<String, Object> arguments;
/**
* The queue is durable, non-exclusive and non auto-delete.
*
* @param name the name of the queue.
*/
public Queue(String name) {
this(name, true, false, false);
}
// Getters and Setters omitted for brevity
}
請注意,建構函式接受佇列名稱。根據實現,admin 模板可能提供用於生成唯一命名佇列的方法。這些佇列可以用作“reply-to”地址或用於其他臨時情況。因此,自動生成的佇列的 'exclusive' 和 'autoDelete' 屬性都將被設定為 'true'。
有關使用名稱空間支援宣告佇列(包括佇列引數)的資訊,請參閱 配置 Broker 中關於佇列的部分。 |
繫結 (Binding)
鑑於生產者傳送到交換器,消費者從佇列接收訊息,連線佇列與交換器的繫結對於透過訊息傳遞連線生產者和消費者至關重要。在 Spring AMQP 中,我們定義了一個 Binding
類來表示這些連線。本節回顧了將佇列繫結到交換器的基本選項。
您可以使用固定的路由鍵將佇列繫結到 DirectExchange
,如下例所示
new Binding(someQueue, someDirectExchange, "foo.bar");
您可以使用路由模式將佇列繫結到 TopicExchange
,如下例所示
new Binding(someQueue, someTopicExchange, "foo.*");
您可以不使用路由鍵將佇列繫結到 FanoutExchange
,如下例所示
new Binding(someQueue, someFanoutExchange);
我們還提供了一個 BindingBuilder
來促進“流暢 API”風格,如下例所示
Binding b = BindingBuilder.bind(someQueue).to(someTopicExchange).with("foo.*");
為清晰起見,前面的示例展示了 BindingBuilder 類,但當使用 'bind()' 方法的靜態匯入時,這種風格也效果很好。 |
Binding
類例項本身僅包含連線的資料。換句話說,它不是一個“活動”元件。然而,正如您稍後在 配置 Broker 中將看到的,AmqpAdmin
類可以使用 Binding
例項實際觸發 broker 上的繫結操作。此外,正如您在該同一節中看到的,您可以使用 Spring 的 @Bean
註解在 @Configuration
類中定義 Binding
例項。還有一個方便的基類,它進一步簡化了生成 AMQP 相關 bean 定義的方法,並識別佇列、交換器和繫結,以便它們都在應用程式啟動時宣告在 AMQP broker 上。
AmqpTemplate
也定義在核心包中。作為實際 AMQP 訊息傳遞的主要元件之一,它在自己的章節中有詳細討論(參見 AmqpTemplate
)。