配置 Broker
AMQP 規範描述瞭如何在 broker 上配置佇列、交換機和繫結。這些操作(可從 0.8 規範及更高版本移植)存在於 org.springframework.amqp.core
包中的 AmqpAdmin
介面中。該類的 RabbitMQ 實現是位於 org.springframework.amqp.rabbit.core
包中的 RabbitAdmin
。
AmqpAdmin
介面基於使用 Spring AMQP 領域抽象,如下所示:
public interface AmqpAdmin {
// Exchange Operations
void declareExchange(Exchange exchange);
void deleteExchange(String exchangeName);
// Queue Operations
Queue declareQueue();
String declareQueue(Queue queue);
void deleteQueue(String queueName);
void deleteQueue(String queueName, boolean unused, boolean empty);
void purgeQueue(String queueName, boolean noWait);
// Binding Operations
void declareBinding(Binding binding);
void removeBinding(Binding binding);
Properties getQueueProperties(String queueName);
}
另請參閱 Scoped Operations。
getQueueProperties()
方法返回有關佇列的一些有限資訊(訊息計數和消費者計數)。返回屬性的鍵在 RabbitAdmin
中作為常量提供(QUEUE_NAME
、QUEUE_MESSAGE_COUNT
和 QUEUE_CONSUMER_COUNT
)。RabbitMQ REST API 在 QueueInfo
物件中提供了更多資訊。
無引數的 declareQueue()
方法在 broker 上定義一個自動生成名稱的佇列。此自動生成佇列的附加屬性是 exclusive=true
、autoDelete=true
和 durable=false
。
declareQueue(Queue queue)
方法接受一個 Queue
物件並返回已宣告佇列的名稱。如果提供的 Queue
的 name
屬性為空 String
,則 broker 會使用生成的名稱宣告佇列。該名稱將返回給呼叫者。該名稱也會新增到 Queue
的 actualName
屬性中。只有直接呼叫 RabbitAdmin
才能以程式設計方式使用此功能。當在 application context 中以宣告方式定義佇列時,使用 admin 進行自動宣告時,可以將 name 屬性設定為 ""
(空字串)。然後 broker 會建立名稱。從版本 2.1 開始,監聽器容器可以使用此型別的佇列。有關更多資訊,請參閱 Containers and Broker-Named queues。
這與 AnonymousQueue
不同,AnonymousQueue
中框架生成一個唯一的(UUID
)名稱,並將 durable
設定為 false
,exclusive
、autoDelete
設定為 true
。帶有空(或缺失)name
屬性的 <rabbit:queue/>
總是建立一個 AnonymousQueue
。
請參閱 AnonymousQueue
,瞭解為何 AnonymousQueue
優先於 broker 生成的佇列名稱,以及如何控制名稱的格式。從版本 2.1 開始,匿名佇列預設宣告時將引數 Queue.X_QUEUE_LEADER_LOCATOR
設定為 client-local
。這確保佇列宣告在應用程式連線到的節點上。宣告式佇列必須具有固定名稱,因為它們可能在上下文中的其他地方被引用,例如以下示例中所示的監聽器:
<rabbit:listener-container>
<rabbit:listener ref="listener" queue-names="#{someQueue.name}" />
</rabbit:listener-container>
此介面的 RabbitMQ 實現是 RabbitAdmin
,當使用 Spring XML 配置時,示例如下:
<rabbit:connection-factory id="connectionFactory"/>
<rabbit:admin id="amqpAdmin" connection-factory="connectionFactory"/>
當 CachingConnectionFactory
的快取模式是 CHANNEL
(預設)時,RabbitAdmin
實現會自動延遲宣告在同一個 ApplicationContext
中宣告的佇列、交換機和繫結。一旦開啟與 broker 的連線,這些元件就會被宣告。有一些名稱空間特性使得這非常方便——例如,在 Stocks 示例應用程式中,我們有以下內容:
<rabbit:queue id="tradeQueue"/>
<rabbit:queue id="marketDataQueue"/>
<fanout-exchange name="broadcast.responses"
xmlns="http://www.springframework.org/schema/rabbit">
<bindings>
<binding queue="tradeQueue"/>
</bindings>
</fanout-exchange>
<topic-exchange name="app.stock.marketdata"
xmlns="http://www.springframework.org/schema/rabbit">
<bindings>
<binding queue="marketDataQueue" pattern="${stocks.quote.pattern}"/>
</bindings>
</topic-exchange>
在前面的示例中,我們使用匿名佇列(實際上,在內部,只是由框架而非 broker 生成名稱的佇列),並按 ID 引用它們。我們還可以宣告具有顯式名稱的佇列,這些名稱也用作它們在上下文中的 bean 定義的識別符號。以下示例配置了一個具有顯式名稱的佇列:
<rabbit:queue name="stocks.trade.queue"/>
您可以同時提供 id 和 name 屬性。這使您可以透過獨立於佇列名稱的 ID 來引用佇列(例如,在繫結中)。它還允許使用標準的 Spring 功能(例如,用於佇列名稱的屬性佔位符和 SpEL 表示式)。當您使用 name 作為 bean 識別符號時,這些功能不可用。 |
佇列可以使用附加引數進行配置——例如 x-message-ttl
。當您使用名稱空間支援時,它們以引數名/引數值對的 Map
形式提供,這透過 <rabbit:queue-arguments>
元素定義。以下示例展示瞭如何進行配置:
<rabbit:queue name="withArguments">
<rabbit:queue-arguments>
<entry key="x-dead-letter-exchange" value="myDLX"/>
<entry key="x-dead-letter-routing-key" value="dlqRK"/>
</rabbit:queue-arguments>
</rabbit:queue>
預設情況下,引數被假定為字串型別。對於其他型別的引數,您必須提供型別。以下示例展示瞭如何指定型別:
<rabbit:queue name="withArguments">
<rabbit:queue-arguments value-type="java.lang.Long">
<entry key="x-message-ttl" value="100"/>
</rabbit:queue-arguments>
</rabbit:queue>
提供混合型別引數時,必須為每個 entry 元素提供型別。以下示例展示瞭如何進行配置:
<rabbit:queue name="withArguments">
<rabbit:queue-arguments>
<entry key="x-message-ttl">
<value type="java.lang.Long">100</value>
</entry>
<entry key="x-dead-letter-exchange" value="myDLX"/>
<entry key="x-dead-letter-routing-key" value="dlqRK"/>
</rabbit:queue-arguments>
</rabbit:queue>
使用 Spring Framework 3.2 及更高版本,可以更簡潔地宣告,如下所示:
<rabbit:queue name="withArguments">
<rabbit:queue-arguments>
<entry key="x-message-ttl" value="100" value-type="java.lang.Long"/>
<entry key="x-ha-policy" value="all"/>
</rabbit:queue-arguments>
</rabbit:queue>
當您使用 Java 配置時,Queue.X_QUEUE_LEADER_LOCATOR
引數透過 Queue
類上的 setLeaderLocator()
方法作為一流屬性得到支援。從版本 2.1 開始,匿名佇列預設宣告時將此屬性設定為 client-local
。這確保佇列宣告在應用程式連線到的節點上。
RabbitMQ broker 不允許宣告具有不匹配引數的佇列。例如,如果一個 queue 已經存在且沒有 time to live 引數,而您嘗試使用(例如)key="x-message-ttl" value="100" 來宣告它,將會丟擲異常。 |
預設情況下,當發生任何異常時,RabbitAdmin
會立即停止處理所有宣告。這可能導致下游問題,例如監聽器容器因另一個佇列(在出錯的佇列之後定義)未宣告而初始化失敗。
可以透過在 RabbitAdmin
例項上將 ignore-declaration-exceptions
屬性設定為 true
來修改此行為。此選項指示 RabbitAdmin
記錄異常並繼續宣告其他元素。使用 Java 配置 RabbitAdmin
時,此屬性稱為 ignoreDeclarationExceptions
。這是一個全域性設定,適用於所有元素。佇列、交換機和繫結具有僅適用於這些元素的類似屬性。
在版本 1.6 之前,此屬性僅在通道上發生 IOException
時才生效,例如當前屬性與期望屬性不匹配時。現在,此屬性對任何異常都生效,包括 TimeoutException
及其他異常。
此外,任何宣告異常都會導致釋出 DeclarationExceptionEvent
,它是一個 ApplicationEvent
,可由上下文中的任何 ApplicationListener
消費。該事件包含對 admin、正在宣告的元素和 Throwable
的引用。
Header 交換機
從版本 1.3 開始,您可以配置 HeadersExchange
以匹配多個 header。您還可以指定是否必須匹配任意或所有 header。以下示例展示瞭如何進行配置:
<rabbit:headers-exchange name="headers-test">
<rabbit:bindings>
<rabbit:binding queue="bucket">
<rabbit:binding-arguments>
<entry key="foo" value="bar"/>
<entry key="baz" value="qux"/>
<entry key="x-match" value="all"/>
</rabbit:binding-arguments>
</rabbit:binding>
</rabbit:bindings>
</rabbit:headers-exchange>
從版本 1.6 開始,您可以配置帶有 internal
標誌(預設為 false
)的 Exchanges
,並且此類 Exchange
透過 RabbitAdmin
(如果存在於 application context 中)在 Broker 上正確配置。如果交換機的 internal
標誌為 true
,RabbitMQ 不允許客戶端使用該交換機。這對於死信交換機或交換機到交換機的繫結非常有用,在這種情況下,您不希望釋出者直接使用該交換機。
要了解如何使用 Java 配置 AMQP 基礎設施,請檢視 Stock 示例應用程式,其中有一個 @Configuration
類 AbstractStockRabbitConfiguration
,它又包含 RabbitClientConfiguration
和 RabbitServerConfiguration
子類。以下列表顯示了 AbstractStockRabbitConfiguration
的程式碼:
@Configuration
public abstract class AbstractStockAppRabbitConfiguration {
@Bean
public CachingConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory =
new CachingConnectionFactory("localhost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
return connectionFactory;
}
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
template.setMessageConverter(jsonMessageConverter());
configureRabbitTemplate(template);
return template;
}
@Bean
public Jackson2JsonMessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public TopicExchange marketDataExchange() {
return new TopicExchange("app.stock.marketdata");
}
// additional code omitted for brevity
}
在 Stock 應用程式中,伺服器使用以下 @Configuration
類進行配置:
@Configuration
public class RabbitServerConfiguration extends AbstractStockAppRabbitConfiguration {
@Bean
public Queue stockRequestQueue() {
return new Queue("app.stock.request");
}
}
這是 @Configuration
類整個繼承鏈的末端。最終結果是 TopicExchange
和 Queue
在應用程式啟動時被宣告到 broker。server 配置中沒有將 TopicExchange
繫結到佇列,因為這在 client 應用程式中完成。然而,stock request queue 會自動繫結到 AMQP 預設交換機。此行為由規範定義。
client @Configuration
類更有趣一些。其宣告如下:
@Configuration
public class RabbitClientConfiguration extends AbstractStockAppRabbitConfiguration {
@Value("${stocks.quote.pattern}")
private String marketDataRoutingKey;
@Bean
public Queue marketDataQueue() {
return amqpAdmin().declareQueue();
}
/**
* Binds to the market data exchange.
* Interested in any stock quotes
* that match its routing key.
*/
@Bean
public Binding marketDataBinding() {
return BindingBuilder.bind(
marketDataQueue()).to(marketDataExchange()).with(marketDataRoutingKey);
}
// additional code omitted for brevity
}
client 透過 AmqpAdmin
上的 declareQueue()
方法宣告另一個佇列。它將該佇列繫結到 market data exchange,並使用在 properties 檔案中外部化的 routing pattern。
佇列和交換機的 Builder API
版本 1.6 引入了一種方便的 fluent API,用於在使用 Java 配置時配置 Queue
和 Exchange
物件。以下示例展示瞭如何使用它:
@Bean
public Queue queue() {
return QueueBuilder.nonDurable("foo")
.autoDelete()
.exclusive()
.withArgument("foo", "bar")
.build();
}
@Bean
public Exchange exchange() {
return ExchangeBuilder.directExchange("foo")
.autoDelete()
.internal()
.withArgument("foo", "bar")
.build();
}
有關更多資訊,請參閱 org.springframework.amqp.core.QueueBuilder
和 org.springframework.amqp.core.ExchangeBuilder
的 Javadoc。
從版本 2.0 開始,ExchangeBuilder
現在預設建立 durable exchanges,這與各個 AbstractExchange
類上的簡單建構函式保持一致。要使用 builder 建立 non-durable exchange,請在呼叫 .build()
之前使用 .durable(false)
。不再提供無引數的 durable()
方法。
版本 2.2 引入了 fluent API,用於新增“已知”的 exchange 和 queue 引數...
@Bean
public Queue allArgs1() {
return QueueBuilder.nonDurable("all.args.1")
.ttl(1000)
.expires(200_000)
.maxLength(42)
.maxLengthBytes(10_000)
.overflow(Overflow.rejectPublish)
.deadLetterExchange("dlx")
.deadLetterRoutingKey("dlrk")
.maxPriority(4)
.lazy()
.leaderLocator(LeaderLocator.minLeaders)
.singleActiveConsumer()
.build();
}
@Bean
public DirectExchange ex() {
return ExchangeBuilder.directExchange("ex.with.alternate")
.durable(true)
.alternate("alternate")
.build();
}
宣告 Exchanges, Queues 和 Bindings 的集合
您可以將 Declarable
物件(Queue
、Exchange
和 Binding
)的集合包裝在 Declarables
物件中。RabbitAdmin
檢測 application context 中的此類 bean(以及離散的 Declarable
bean),並在建立連線(初始連線和連線失敗後)時將包含的物件宣告到 broker。以下示例展示瞭如何進行配置:
@Configuration
public static class Config {
@Bean
public CachingConnectionFactory cf() {
return new CachingConnectionFactory("localhost");
}
@Bean
public RabbitAdmin admin(ConnectionFactory cf) {
return new RabbitAdmin(cf);
}
@Bean
public DirectExchange e1() {
return new DirectExchange("e1", false, true);
}
@Bean
public Queue q1() {
return new Queue("q1", false, false, true);
}
@Bean
public Binding b1() {
return BindingBuilder.bind(q1()).to(e1()).with("k1");
}
@Bean
public Declarables es() {
return new Declarables(
new DirectExchange("e2", false, true),
new DirectExchange("e3", false, true));
}
@Bean
public Declarables qs() {
return new Declarables(
new Queue("q2", false, false, true),
new Queue("q3", false, false, true));
}
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public Declarables prototypes() {
return new Declarables(new Queue(this.prototypeQueueName, false, false, true));
}
@Bean
public Declarables bs() {
return new Declarables(
new Binding("q2", DestinationType.QUEUE, "e2", "k2", null),
new Binding("q3", DestinationType.QUEUE, "e3", "k3", null));
}
@Bean
public Declarables ds() {
return new Declarables(
new DirectExchange("e4", false, true),
new Queue("q4", false, false, true),
new Binding("q4", DestinationType.QUEUE, "e4", "k4", null));
}
}
在版本 2.1 之前,您可以透過定義 Collection<Declarable> 型別的 bean 來宣告多個 Declarable 例項。這在某些情況下可能導致不良副作用,因為 admin 必須迭代所有 Collection<?> bean。 |
版本 2.2 向 Declarables
添加了 getDeclarablesByType
方法;例如,在宣告監聽器容器 bean 時,這可以作為一種便利。
public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
Declarables mixedDeclarables, MessageListener listener) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueues(mixedDeclarables.getDeclarablesByType(Queue.class).toArray(new Queue[0]));
container.setMessageListener(listener);
return container;
}
條件宣告
預設情況下,application context 中的所有 RabbitAdmin
例項(假設它們具有 auto-startup="true"
)都會宣告所有佇列、交換機和繫結。
從版本 2.1.9 開始,RabbitAdmin
有一個新屬性 explicitDeclarationsOnly
(預設值為 false
);當將其設定為 true
時,admin 將僅宣告明確配置由該 admin 宣告的 bean。
從 1.2 版本開始,您可以有條件地宣告這些元素。當應用程式連線到多個 broker 並需要指定特定元素應在哪些 broker 上宣告時,這特別有用。 |
表示這些元素的類實現了 Declarable
介面,該介面有兩個方法:shouldDeclare()
和 getDeclaringAdmins()
。RabbitAdmin
使用這些方法來確定特定例項是否應實際處理其 Connection
上的宣告。
屬性在名稱空間中作為屬性提供,示例如下:
<rabbit:admin id="admin1" connection-factory="CF1" />
<rabbit:admin id="admin2" connection-factory="CF2" />
<rabbit:admin id="admin3" connection-factory="CF3" explicit-declarations-only="true" />
<rabbit:queue id="declaredByAdmin1AndAdmin2Implicitly" />
<rabbit:queue id="declaredByAdmin1AndAdmin2" declared-by="admin1, admin2" />
<rabbit:queue id="declaredByAdmin1Only" declared-by="admin1" />
<rabbit:queue id="notDeclaredByAllExceptAdmin3" auto-declare="false" />
<rabbit:direct-exchange name="direct" declared-by="admin1, admin2">
<rabbit:bindings>
<rabbit:binding key="foo" queue="bar"/>
</rabbit:bindings>
</rabbit:direct-exchange>
預設情況下,auto-declare 屬性為 true ,如果 declared-by 未提供(或為空),則所有 RabbitAdmin 例項都會宣告物件(只要 admin 的 auto-startup 屬性為 true (預設值),並且 admin 的 explicit-declarations-only 屬性為 false)。 |
同樣,您可以使用基於 Java 的 @Configuration
來實現相同的效果。在以下示例中,元件由 admin1
宣告,但不由 admin2
宣告:
@Bean
public RabbitAdmin admin1() {
return new RabbitAdmin(cf1());
}
@Bean
public RabbitAdmin admin2() {
return new RabbitAdmin(cf2());
}
@Bean
public Queue queue() {
Queue queue = new Queue("foo");
queue.setAdminsThatShouldDeclare(admin1());
return queue;
}
@Bean
public Exchange exchange() {
DirectExchange exchange = new DirectExchange("bar");
exchange.setAdminsThatShouldDeclare(admin1());
return exchange;
}
@Bean
public Binding binding() {
Binding binding = new Binding("foo", DestinationType.QUEUE, exchange().getName(), "foo", null);
binding.setAdminsThatShouldDeclare(admin1());
return binding;
}
關於 id
和 name
屬性的注意事項
<rabbit:queue/>
和 <rabbit:exchange/>
元素上的 name
屬性反映了實體在 broker 中的名稱。對於佇列,如果 name 被省略,則建立一個匿名佇列(請參閱 AnonymousQueue
)。
在版本 2.0 之前,name
也被註冊為 bean 名稱別名(類似於 <bean/>
元素上的 name
)。
這導致了兩個問題:
-
它阻止了宣告具有相同名稱的佇列和交換機。
-
如果別名包含 SpEL 表示式(
#{…}
),則無法解析。
從版本 2.0 開始,如果您宣告這些元素之一時同時帶有 id
和 name
屬性,則 name 不再宣告為 bean 名稱別名。如果您希望宣告具有相同 name 的佇列和交換機,則必須提供 id
。
如果元素僅具有 name
屬性,則沒有變化。bean 仍然可以按 name 引用——例如,在繫結宣告中。但是,如果 name 包含 SpEL,您仍然無法引用它——您必須提供一個 id
用於引用。
AnonymousQueue
通常,當您需要一個唯一命名、獨佔、自動刪除的佇列時,我們建議您使用 AnonymousQueue
而不是 broker 定義的佇列名稱(將 ""
用作 Queue
名稱會導致 broker 生成佇列名稱)。
這是因為:
-
佇列實際上是在與 broker 建立連線時宣告的。這發生在 bean 建立和裝配在一起之後很長時間。使用佇列的 bean 需要知道其名稱。事實上,當應用程式啟動時,broker 可能甚至沒有執行。
-
如果由於某種原因與 broker 的連線丟失,admin 會以相同的名稱重新宣告
AnonymousQueue
。如果使用 broker 宣告的佇列,則佇列名稱會更改。
您可以控制 AnonymousQueue
例項使用的佇列名稱的格式。
預設情況下,佇列名稱以 spring.gen-
為字首,後跟 UUID
的 base64 表示——例如:spring.gen-MRBv9sqISkuCiPfOYfpo4g
。
您可以在建構函式引數中提供 AnonymousQueue.NamingStrategy
實現。以下示例展示瞭如何進行配置:
@Bean
public Queue anon1() {
return new AnonymousQueue();
}
@Bean
public Queue anon2() {
return new AnonymousQueue(new AnonymousQueue.Base64UrlNamingStrategy("something-"));
}
@Bean
public Queue anon3() {
return new AnonymousQueue(AnonymousQueue.UUIDNamingStrategy.DEFAULT);
}
第一個 bean 生成以 spring.gen-
為字首,後跟 UUID
的 base64 表示的佇列名稱——例如:spring.gen-MRBv9sqISkuCiPfOYfpo4g
。第二個 bean 生成以 something-
為字首,後跟 UUID
的 base64 表示的佇列名稱。第三個 bean 僅使用 UUID
(無 base64 轉換)生成名稱——例如:f20c818a-006b-4416-bf91-643590fedb0e
。
base64 編碼使用 RFC 4648 中的“URL 和檔名安全字母表”。末尾的填充字元(=
)被移除。
您可以提供自己的命名策略,從而可以在佇列名稱中包含其他資訊(例如應用程式名稱或客戶端主機)。
使用 XML 配置時,可以指定命名策略。<rabbit:queue>
元素上存在 naming-strategy
屬性,用於實現 AnonymousQueue.NamingStrategy
的 bean 引用。以下示例展示瞭如何以各種方式指定命名策略:
<rabbit:queue id="uuidAnon" />
<rabbit:queue id="springAnon" naming-strategy="uuidNamer" />
<rabbit:queue id="customAnon" naming-strategy="customNamer" />
<bean id="uuidNamer" class="org.springframework.amqp.core.AnonymousQueue.UUIDNamingStrategy" />
<bean id="customNamer" class="org.springframework.amqp.core.AnonymousQueue.Base64UrlNamingStrategy">
<constructor-arg value="custom.gen-" />
</bean>
第一個示例建立諸如 spring.gen-MRBv9sqISkuCiPfOYfpo4g
的名稱。第二個示例建立使用 UUID
的 String 表示的名稱。第三個示例建立諸如 custom.gen-MRBv9sqISkuCiPfOYfpo4g
的名稱。
您還可以提供自己的命名策略 bean。
從版本 2.1 開始,匿名佇列預設宣告時將引數 Queue.X_QUEUE_LEADER_LOCATOR
設定為 client-local
。這確保佇列宣告在應用程式連線到的節點上。在構建例項後,可以透過呼叫 queue.setLeaderLocator(null)
恢復到先前的行為。
恢復自動刪除宣告
通常,RabbitAdmin
僅恢復在 application context 中宣告為 bean 的佇列/交換機/繫結;如果任何此類宣告是自動刪除的,則在連線丟失時會被 broker 移除。重新建立連線後,admin 將重新宣告這些實體。通常,透過呼叫 admin.declareQueue(…)
、admin.declareExchange(…)
和 admin.declareBinding(…)
建立的實體不會被恢復。
從版本 2.4 開始,admin 有一個新屬性 redeclareManualDeclarations
;當設定為 true
時,admin 將在恢復 application context 中的 bean 的同時恢復這些實體。
如果呼叫了 deleteQueue(…)
、deleteExchange(…)
或 removeBinding(…)
,則不會執行單個宣告的恢復。刪除佇列和交換機時,關聯的繫結會從可恢復實體中移除。
最後,呼叫 resetAllManualDeclarations()
將阻止恢復任何先前宣告的實體。