傳送訊息
傳送訊息時,您可以使用以下任何方法
void send(Message message) throws AmqpException;
void send(String routingKey, Message message) throws AmqpException;
void send(String exchange, String routingKey, Message message) throws AmqpException;
我們可以從前面列表中最後一個方法開始討論,因為它實際上是最明確的。它允許在執行時提供 AMQP 交換機名稱(以及路由鍵)。最後一個引數是負責實際建立訊息例項的回撥。使用此方法傳送訊息的示例如下所示:以下示例展示瞭如何使用 send
方法傳送訊息
amqpTemplate.send("marketData.topic", "quotes.nasdaq.THING1",
new Message("12.34".getBytes(), someProperties));
如果您打算大部分或全部時間使用該模板例項向同一個交換機發送訊息,則可以在模板本身上設定 exchange
屬性。在這種情況下,您可以使用前面列表中的第二個方法。以下示例在功能上等同於上一個示例
amqpTemplate.setExchange("marketData.topic");
amqpTemplate.send("quotes.nasdaq.FOO", new Message("12.34".getBytes(), someProperties));
如果模板上同時設定了 exchange
和 routingKey
屬性,則可以使用只接受 Message
的方法。以下示例展示瞭如何執行此操作
amqpTemplate.setExchange("marketData.topic");
amqpTemplate.setRoutingKey("quotes.nasdaq.FOO");
amqpTemplate.send(new Message("12.34".getBytes(), someProperties));
更好的理解交換機和路由鍵屬性的方式是,明確的方法引數總是覆蓋模板的預設值。事實上,即使您沒有在模板上明確設定這些屬性,預設值也總是存在。在兩種情況下,預設值都是空字串 String
,但這實際上是一個合理的預設值。就路由鍵而言,它並非總是必需的(例如,對於 Fanout
交換機)。此外,佇列可以繫結到一個空字串 String
的交換機上。這些都是依賴於模板路由鍵屬性的預設空字串 String
值的合法場景。就交換機名稱而言,空字串 String
通常被使用,因為 AMQP 規範將“預設交換機”定義為沒有名稱。由於所有佇列都自動繫結到該預設交換機(這是一個 direct 交換機),並使用它們的名稱作為繫結值,因此可以使用前面列表中的第二個方法透過預設交換機向任何佇列進行簡單的點對點訊息傳遞。您可以在執行時透過提供方法引數來將佇列名稱作為 routingKey
。以下示例展示瞭如何執行此操作
RabbitTemplate template = new RabbitTemplate(); // using default no-name Exchange
template.send("queue.helloWorld", new Message("Hello World".getBytes(), someProperties));
或者,您可以建立一個主要或專門用於向單個佇列釋出訊息的模板。以下示例展示瞭如何執行此操作
RabbitTemplate template = new RabbitTemplate(); // using default no-name Exchange
template.setRoutingKey("queue.helloWorld"); // but we'll always send to this Queue
template.send(new Message("Hello World".getBytes(), someProperties));
訊息構建器 API
從 1.3 版本開始,MessageBuilder
和 MessagePropertiesBuilder
提供了訊息構建器 API。這些方法提供了建立訊息或訊息屬性的方便的“流暢”方式。以下示例展示了流暢 API 的實際應用
Message message = MessageBuilder.withBody("foo".getBytes())
.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
.setMessageId("123")
.setHeader("bar", "baz")
.build();
MessageProperties props = MessagePropertiesBuilder.newInstance()
.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
.setMessageId("123")
.setHeader("bar", "baz")
.build();
Message message = MessageBuilder.withBody("foo".getBytes())
.andProperties(props)
.build();
MessageProperties
上定義的每個屬性都可以設定。其他方法包括 setHeader(String key, String value)
、removeHeader(String key)
、removeHeaders()
和 copyProperties(MessageProperties properties)
。每個屬性設定方法都有一個 set*IfAbsent()
變體。在存在預設初始值的情況下,方法名為 set*IfAbsentOrDefault()
。
提供了五個靜態方法來建立初始訊息構建器
public static MessageBuilder withBody(byte[] body) (1)
public static MessageBuilder withClonedBody(byte[] body) (2)
public static MessageBuilder withBody(byte[] body, int from, int to) (3)
public static MessageBuilder fromMessage(Message message) (4)
public static MessageBuilder fromClonedMessage(Message message) (5)
1 | 構建器建立的訊息體是對引數的直接引用。 |
2 | 構建器建立的訊息體是一個新陣列,包含引數位元組的副本。 |
3 | 構建器建立的訊息體是一個新陣列,包含引數中指定範圍的位元組。更多詳情請參閱 Arrays.copyOfRange() 。 |
4 | 構建器建立的訊息體是對引數訊息體的直接引用。引數的屬性被複制到一個新的 MessageProperties 物件中。 |
5 | 構建器建立的訊息體是一個新陣列,包含引數訊息體的副本。引數的屬性被複制到一個新的 MessageProperties 物件中。 |
提供了三個靜態方法來建立 MessagePropertiesBuilder
例項
public static MessagePropertiesBuilder newInstance() (1)
public static MessagePropertiesBuilder fromProperties(MessageProperties properties) (2)
public static MessagePropertiesBuilder fromClonedProperties(MessageProperties properties) (3)
1 | 使用預設值初始化新的訊息屬性物件。 |
2 | 構建器使用提供的屬性物件進行初始化,並且 build() 將返回該物件。 |
3 | 引數的屬性被複制到一個新的 MessageProperties 物件中。 |
對於 AmqpTemplate
的 RabbitTemplate
實現,每個 send()
方法都有一個過載版本,它接受一個額外的 CorrelationData
物件。啟用釋出者確認時,此物件在 AmqpTemplate
中描述的回撥中返回。這使得傳送方可以將確認(ack
或 nack
)與傳送的訊息關聯起來。
從 1.6.7 版本開始,引入了 CorrelationAwareMessagePostProcessor
介面,允許在訊息轉換後修改關聯資料。以下示例展示瞭如何使用它
Message postProcessMessage(Message message, Correlation correlation);
在 2.0 版本中,此介面已棄用。該方法已移至 MessagePostProcessor
,其預設實現委託給 postProcessMessage(Message message)
。
同樣從 1.6.7 版本開始,提供了一個名為 CorrelationDataPostProcessor
的新回撥介面。在所有 MessagePostProcessor
例項(在 send()
方法中提供的以及在 setBeforePublishPostProcessors()
中提供的)之後呼叫此介面。實現可以更新或替換在 send()
方法中提供的關聯資料(如果有)。Message
和原始 CorrelationData
(如果有)作為引數提供。以下示例展示瞭如何使用 postProcess
方法
CorrelationData postProcess(Message message, CorrelationData correlationData);
釋出者返回
當模板的 mandatory
屬性為 true
時,AmqpTemplate
中描述的回撥將提供返回的訊息。
從 1.4 版本開始,RabbitTemplate
支援 SpEL mandatoryExpression
屬性,該屬性針對每個請求訊息作為根評估物件進行評估,解析為布林值 boolean
。可以在表示式中使用 Bean 引用,例如 @myBean.isMandatory(#root)
。
RabbitTemplate
在傳送和接收操作中也可以內部使用釋出者返回。更多資訊請參閱 回覆超時。
批次處理
版本 1.4.2 引入了 BatchingRabbitTemplate
。它是 RabbitTemplate
的一個子類,其 send
方法被重寫,根據 BatchingStrategy
批次處理訊息。只有當批次完成後,訊息才會被髮送到 RabbitMQ。以下清單顯示了 BatchingStrategy
介面定義
public interface BatchingStrategy {
MessageBatch addToBatch(String exchange, String routingKey, Message message);
Date nextRelease();
Collection<MessageBatch> releaseBatches();
}
批次資料儲存在記憶體中。系統故障時,未傳送的訊息可能會丟失。 |
提供了一個 SimpleBatchingStrategy
。它支援將訊息傳送到單個交換機或路由鍵。它具有以下屬性
-
batchSize
: 在傳送之前,批次中的訊息數量。 -
bufferLimit
: 批次訊息的最大大小。如果超過此限制,即使未達到batchSize
,也會導致傳送部分批次。 -
timeout
: 在沒有新活動向批次新增訊息時,經過此時間後會傳送部分批次。
SimpleBatchingStrategy
透過在每個嵌入訊息前加上一個四位元組二進位制長度來格式化批次。透過將 springBatchFormat
訊息屬性設定為 lengthHeader4
,將此資訊傳達給接收系統。
批次訊息預設由監聽器容器自動解批次處理(透過使用 springBatchFormat 訊息頭)。拒絕批次中的任何訊息都會導致整個批次被拒絕。 |
但是,有關更多資訊,請參閱 @RabbitListener 批次處理。