傳送訊息
傳送訊息時,您可以使用以下任何方法
void send(Message message) throws AmqpException;
void send(String routingKey, Message message) throws AmqpException;
void send(String exchange, String routingKey, Message message) throws AmqpException;
我們可以從前面列表中最後一種方法開始討論,因為它實際上是最明確的。它允許在執行時提供 AMQP 交換名稱(以及路由鍵)。最後一個引數是負責實際建立訊息例項的回撥。使用此方法傳送訊息的示例如下:以下示例展示瞭如何使用 send 方法傳送訊息
amqpTemplate.send("marketData.topic", "quotes.nasdaq.THING1",
new Message("12.34".getBytes(), someProperties));
如果您打算一直或大部分時間使用該模板例項傳送到同一個交換機,則可以在模板本身上設定 exchange 屬性。在這種情況下,您可以使用前面列表中的第二種方法。以下示例與前面的示例功能等效
amqpTemplate.setExchange("marketData.topic");
amqpTemplate.send("quotes.nasdaq.FOO", new Message("12.34".getBytes(), someProperties));
如果模板上同時設定了 exchange 和 routingKey 屬性,則可以使用只接受 Message 的方法。以下示例展示瞭如何實現
amqpTemplate.setExchange("marketData.topic");
amqpTemplate.setRoutingKey("quotes.nasdaq.FOO");
amqpTemplate.send(new Message("12.34".getBytes(), someProperties));
思考交換機和路由鍵屬性的一個更好的方法是,顯式方法引數總是覆蓋模板的預設值。實際上,即使您沒有在模板上顯式設定這些屬性,也總會有預設值。在這兩種情況下,預設值都是空 String,但這實際上是一個合理的預設值。就路由鍵而言,它最初並非總是必需的(例如,對於 Fanout 交換機)。此外,佇列可能與空 String 繫結到交換機。這些都是依賴於模板路由鍵屬性的預設空 String 值的合理場景。就交換機名稱而言,空 String 經常使用,因為 AMQP 規範將“預設交換機”定義為沒有名稱。由於所有佇列都自動繫結到該預設交換機(這是一個直接交換機),並將其名稱用作繫結值,因此前面列表中的第二種方法可用於透過預設交換機向任何佇列進行簡單的點對點訊息傳遞。您可以將佇列名稱作為 routingKey 提供,方法是在執行時提供方法引數。以下示例展示瞭如何實現
RabbitTemplate template = new RabbitTemplate(); // using default no-name Exchange
template.send("queue.helloWorld", new Message("Hello World".getBytes(), someProperties));
另外,您可以建立一個模板,主要或專門用於釋出到單個佇列。以下示例展示瞭如何實現
RabbitTemplate template = new RabbitTemplate(); // using default no-name Exchange
template.setRoutingKey("queue.helloWorld"); // but we'll always send to this Queue
template.send(new Message("Hello World".getBytes(), someProperties));
訊息構建器 API
從版本 1.3 開始,MessageBuilder 和 MessagePropertiesBuilder 提供了訊息構建器 API。這些方法提供了一種方便的“流暢”方式來建立訊息或訊息屬性。以下示例展示了流暢 API 的實際應用
Message message = MessageBuilder.withBody("foo".getBytes())
.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
.setMessageId("123")
.setHeader("bar", "baz")
.build();
MessageProperties props = MessagePropertiesBuilder.newInstance()
.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
.setMessageId("123")
.setHeader("bar", "baz")
.build();
Message message = MessageBuilder.withBody("foo".getBytes())
.andProperties(props)
.build();
MessageProperties 上定義的每個屬性都可以設定。其他方法包括 setHeader(String key, String value)、removeHeader(String key)、removeHeaders() 和 copyProperties(MessageProperties properties)。每個屬性設定方法都有一個 set*IfAbsent() 變體。在存在預設初始值的情況下,方法名為 set*IfAbsentOrDefault()。
提供了五個靜態方法來建立初始訊息構建器
public static MessageBuilder withBody(byte[] body) (1)
public static MessageBuilder withClonedBody(byte[] body) (2)
public static MessageBuilder withBody(byte[] body, int from, int to) (3)
public static MessageBuilder fromMessage(Message message) (4)
public static MessageBuilder fromClonedMessage(Message message) (5)
| 1 | 由構建器建立的訊息的 body 是引數的直接引用。 |
| 2 | 由構建器建立的訊息的 body 是一個新陣列,其中包含引數中位元組的副本。 |
| 3 | 由構建器建立的訊息的 body 是一個新陣列,其中包含引數中的位元組範圍。有關更多詳細資訊,請參見 Arrays.copyOfRange()。 |
| 4 | 由構建器建立的訊息的 body 是對引數 body 的直接引用。引數的屬性被複制到一個新的 MessageProperties 物件中。 |
| 5 | 由構建器建立的訊息的 body 是一個新陣列,其中包含引數 body 的副本。引數的屬性被複制到一個新的 MessageProperties 物件中。 |
提供了三個靜態方法來建立 MessagePropertiesBuilder 例項
public static MessagePropertiesBuilder newInstance() (1)
public static MessagePropertiesBuilder fromProperties(MessageProperties properties) (2)
public static MessagePropertiesBuilder fromClonedProperties(MessageProperties properties) (3)
| 1 | 一個新的訊息屬性物件使用預設值進行初始化。 |
| 2 | 構建器使用提供的屬性物件進行初始化,並且 build() 將返回該物件。 |
| 3 | 引數的屬性被複制到一個新的 MessageProperties 物件中。 |
使用 AmqpTemplate 的 RabbitTemplate 實現,每個 send() 方法都有一個過載版本,它接受一個額外的 CorrelationData 物件。當釋出者確認功能啟用時,此物件會在 AmqpTemplate 中描述的回撥中返回。這允許傳送方將確認(ack 或 nack)與傳送的訊息關聯起來。
從版本 1.6.7 開始,引入了 CorrelationAwareMessagePostProcessor 介面,允許在訊息轉換後修改關聯資料。以下示例展示瞭如何使用它
Message postProcessMessage(Message message, Correlation correlation);
在版本 2.0 中,此介面已棄用。該方法已移至 MessagePostProcessor,並帶有一個委託給 postProcessMessage(Message message) 的預設實現。
同樣從版本 1.6.7 開始,提供了一個名為 CorrelationDataPostProcessor 的新回撥介面。它在所有 MessagePostProcessor 例項(在 send() 方法中提供以及在 setBeforePublishPostProcessors() 中提供)之後呼叫。實現可以更新或替換 send() 方法中提供的關聯資料(如果有)。Message 和原始 CorrelationData(如果有)作為引數提供。以下示例展示瞭如何使用 postProcess 方法
CorrelationData postProcess(Message message, CorrelationData correlationData);
釋出者返回
當模板的 mandatory 屬性為 true 時,返回的訊息由 AmqpTemplate 中描述的回撥提供。
從版本 1.4 開始,RabbitTemplate 支援 SpEL mandatoryExpression 屬性,該屬性針對每個請求訊息作為根評估物件進行評估,解析為 boolean 值。表示式中可以使用 bean 引用,例如 @myBean.isMandatory(#root)。
釋出者返回也可以在 RabbitTemplate 內部用於傳送和接收操作。有關更多資訊,請參見 回覆超時。
批處理
版本 1.4.2 引入了 BatchingRabbitTemplate。它是 RabbitTemplate 的子類,具有重寫的 send 方法,該方法根據 BatchingStrategy 批次處理訊息。只有當批次完成時,訊息才會傳送到 RabbitMQ。以下列表顯示了 BatchingStrategy 介面定義
public interface BatchingStrategy {
MessageBatch addToBatch(String exchange, String routingKey, Message message);
Date nextRelease();
Collection<MessageBatch> releaseBatches();
}
| 批次資料儲存在記憶體中。系統故障時,未傳送的訊息可能會丟失。 |
提供了 SimpleBatchingStrategy。它支援將訊息傳送到單個交換機或路由鍵。它具有以下屬性
-
batchSize:在傳送之前批處理中的訊息數量。 -
bufferLimit:批次訊息的最大大小。如果超過此限制,則會優先於batchSize,並導致傳送部分批次。 -
timeout:當沒有新的活動將訊息新增到批次時,部分批次傳送後的時間。
SimpleBatchingStrategy 透過在每個嵌入訊息前面加上一個四位元組的二進位制長度來格式化批次。透過將 springBatchFormat 訊息屬性設定為 lengthHeader4,將其傳達給接收系統。
預設情況下,偵聽器容器會自動解批處理訊息(透過使用 springBatchFormat 訊息頭)。拒絕批次中的任何訊息都會導致整個批次被拒絕。 |
但是,有關更多資訊,請參見 帶有批處理的 @RabbitListener。