配置 Broker

AMQP 規範描述瞭如何在 broker 上配置佇列、交換機和繫結。這些操作(可從 0.8 規範及更高版本移植)存在於 org.springframework.amqp.core 包中的 AmqpAdmin 介面中。該類的 RabbitMQ 實現是位於 org.springframework.amqp.rabbit.core 包中的 RabbitAdmin

AmqpAdmin 介面基於使用 Spring AMQP 領域抽象,如下所示:

public interface AmqpAdmin {

    // Exchange Operations

    void declareExchange(Exchange exchange);

    void deleteExchange(String exchangeName);

    // Queue Operations

    Queue declareQueue();

    String declareQueue(Queue queue);

    void deleteQueue(String queueName);

    void deleteQueue(String queueName, boolean unused, boolean empty);

    void purgeQueue(String queueName, boolean noWait);

    // Binding Operations

    void declareBinding(Binding binding);

    void removeBinding(Binding binding);

    Properties getQueueProperties(String queueName);

}

另請參閱 Scoped Operations

getQueueProperties() 方法返回有關佇列的一些有限資訊(訊息計數和消費者計數)。返回屬性的鍵在 RabbitAdmin 中作為常量提供(QUEUE_NAMEQUEUE_MESSAGE_COUNTQUEUE_CONSUMER_COUNT)。RabbitMQ REST APIQueueInfo 物件中提供了更多資訊。

無引數的 declareQueue() 方法在 broker 上定義一個自動生成名稱的佇列。此自動生成佇列的附加屬性是 exclusive=trueautoDelete=truedurable=false

declareQueue(Queue queue) 方法接受一個 Queue 物件並返回已宣告佇列的名稱。如果提供的 Queuename 屬性為空 String,則 broker 會使用生成的名稱宣告佇列。該名稱將返回給呼叫者。該名稱也會新增到 QueueactualName 屬性中。只有直接呼叫 RabbitAdmin 才能以程式設計方式使用此功能。當在 application context 中以宣告方式定義佇列時,使用 admin 進行自動宣告時,可以將 name 屬性設定為 ""(空字串)。然後 broker 會建立名稱。從版本 2.1 開始,監聽器容器可以使用此型別的佇列。有關更多資訊,請參閱 Containers and Broker-Named queues

這與 AnonymousQueue 不同,AnonymousQueue 中框架生成一個唯一的(UUID)名稱,並將 durable 設定為 falseexclusiveautoDelete 設定為 true。帶有空(或缺失)name 屬性的 <rabbit:queue/> 總是建立一個 AnonymousQueue

請參閱 AnonymousQueue,瞭解為何 AnonymousQueue 優先於 broker 生成的佇列名稱,以及如何控制名稱的格式。從版本 2.1 開始,匿名佇列預設宣告時將引數 Queue.X_QUEUE_LEADER_LOCATOR 設定為 client-local。這確保佇列宣告在應用程式連線到的節點上。宣告式佇列必須具有固定名稱,因為它們可能在上下文中的其他地方被引用,例如以下示例中所示的監聽器:

<rabbit:listener-container>
    <rabbit:listener ref="listener" queue-names="#{someQueue.name}" />
</rabbit:listener-container>

此介面的 RabbitMQ 實現是 RabbitAdmin,當使用 Spring XML 配置時,示例如下:

<rabbit:connection-factory id="connectionFactory"/>

<rabbit:admin id="amqpAdmin" connection-factory="connectionFactory"/>

CachingConnectionFactory 的快取模式是 CHANNEL(預設)時,RabbitAdmin 實現會自動延遲宣告在同一個 ApplicationContext 中宣告的佇列、交換機和繫結。一旦開啟與 broker 的連線,這些元件就會被宣告。有一些名稱空間特性使得這非常方便——例如,在 Stocks 示例應用程式中,我們有以下內容:

<rabbit:queue id="tradeQueue"/>

<rabbit:queue id="marketDataQueue"/>

<fanout-exchange name="broadcast.responses"
                 xmlns="http://www.springframework.org/schema/rabbit">
    <bindings>
        <binding queue="tradeQueue"/>
    </bindings>
</fanout-exchange>

<topic-exchange name="app.stock.marketdata"
                xmlns="http://www.springframework.org/schema/rabbit">
    <bindings>
        <binding queue="marketDataQueue" pattern="${stocks.quote.pattern}"/>
    </bindings>
</topic-exchange>

在前面的示例中,我們使用匿名佇列(實際上,在內部,只是由框架而非 broker 生成名稱的佇列),並按 ID 引用它們。我們還可以宣告具有顯式名稱的佇列,這些名稱也用作它們在上下文中的 bean 定義的識別符號。以下示例配置了一個具有顯式名稱的佇列:

<rabbit:queue name="stocks.trade.queue"/>
您可以同時提供 idname 屬性。這使您可以透過獨立於佇列名稱的 ID 來引用佇列(例如,在繫結中)。它還允許使用標準的 Spring 功能(例如,用於佇列名稱的屬性佔位符和 SpEL 表示式)。當您使用 name 作為 bean 識別符號時,這些功能不可用。

佇列可以使用附加引數進行配置——例如 x-message-ttl。當您使用名稱空間支援時,它們以引數名/引數值對的 Map 形式提供,這透過 <rabbit:queue-arguments> 元素定義。以下示例展示瞭如何進行配置:

<rabbit:queue name="withArguments">
    <rabbit:queue-arguments>
        <entry key="x-dead-letter-exchange" value="myDLX"/>
        <entry key="x-dead-letter-routing-key" value="dlqRK"/>
    </rabbit:queue-arguments>
</rabbit:queue>

預設情況下,引數被假定為字串型別。對於其他型別的引數,您必須提供型別。以下示例展示瞭如何指定型別:

<rabbit:queue name="withArguments">
    <rabbit:queue-arguments value-type="java.lang.Long">
        <entry key="x-message-ttl" value="100"/>
    </rabbit:queue-arguments>
</rabbit:queue>

提供混合型別引數時,必須為每個 entry 元素提供型別。以下示例展示瞭如何進行配置:

<rabbit:queue name="withArguments">
    <rabbit:queue-arguments>
        <entry key="x-message-ttl">
            <value type="java.lang.Long">100</value>
        </entry>
        <entry key="x-dead-letter-exchange" value="myDLX"/>
        <entry key="x-dead-letter-routing-key" value="dlqRK"/>
    </rabbit:queue-arguments>
</rabbit:queue>

使用 Spring Framework 3.2 及更高版本,可以更簡潔地宣告,如下所示:

<rabbit:queue name="withArguments">
    <rabbit:queue-arguments>
        <entry key="x-message-ttl" value="100" value-type="java.lang.Long"/>
        <entry key="x-ha-policy" value="all"/>
    </rabbit:queue-arguments>
</rabbit:queue>

當您使用 Java 配置時,Queue.X_QUEUE_LEADER_LOCATOR 引數透過 Queue 類上的 setLeaderLocator() 方法作為一流屬性得到支援。從版本 2.1 開始,匿名佇列預設宣告時將此屬性設定為 client-local。這確保佇列宣告在應用程式連線到的節點上。

RabbitMQ broker 不允許宣告具有不匹配引數的佇列。例如,如果一個 queue 已經存在且沒有 time to live 引數,而您嘗試使用(例如)key="x-message-ttl" value="100" 來宣告它,將會丟擲異常。

預設情況下,當發生任何異常時,RabbitAdmin 會立即停止處理所有宣告。這可能導致下游問題,例如監聽器容器因另一個佇列(在出錯的佇列之後定義)未宣告而初始化失敗。

可以透過在 RabbitAdmin 例項上將 ignore-declaration-exceptions 屬性設定為 true 來修改此行為。此選項指示 RabbitAdmin 記錄異常並繼續宣告其他元素。使用 Java 配置 RabbitAdmin 時,此屬性稱為 ignoreDeclarationExceptions。這是一個全域性設定,適用於所有元素。佇列、交換機和繫結具有僅適用於這些元素的類似屬性。

在版本 1.6 之前,此屬性僅在通道上發生 IOException 時才生效,例如當前屬性與期望屬性不匹配時。現在,此屬性對任何異常都生效,包括 TimeoutException 及其他異常。

此外,任何宣告異常都會導致釋出 DeclarationExceptionEvent,它是一個 ApplicationEvent,可由上下文中的任何 ApplicationListener 消費。該事件包含對 admin、正在宣告的元素和 Throwable 的引用。

Header 交換機

從版本 1.3 開始,您可以配置 HeadersExchange 以匹配多個 header。您還可以指定是否必須匹配任意或所有 header。以下示例展示瞭如何進行配置:

<rabbit:headers-exchange name="headers-test">
    <rabbit:bindings>
        <rabbit:binding queue="bucket">
            <rabbit:binding-arguments>
                <entry key="foo" value="bar"/>
                <entry key="baz" value="qux"/>
                <entry key="x-match" value="all"/>
            </rabbit:binding-arguments>
        </rabbit:binding>
    </rabbit:bindings>
</rabbit:headers-exchange>

從版本 1.6 開始,您可以配置帶有 internal 標誌(預設為 false)的 Exchanges,並且此類 Exchange 透過 RabbitAdmin(如果存在於 application context 中)在 Broker 上正確配置。如果交換機的 internal 標誌為 true,RabbitMQ 不允許客戶端使用該交換機。這對於死信交換機或交換機到交換機的繫結非常有用,在這種情況下,您不希望釋出者直接使用該交換機。

要了解如何使用 Java 配置 AMQP 基礎設施,請檢視 Stock 示例應用程式,其中有一個 @ConfigurationAbstractStockRabbitConfiguration,它又包含 RabbitClientConfigurationRabbitServerConfiguration 子類。以下列表顯示了 AbstractStockRabbitConfiguration 的程式碼:

@Configuration
public abstract class AbstractStockAppRabbitConfiguration {

    @Bean
    public CachingConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory =
            new CachingConnectionFactory("localhost");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        return connectionFactory;
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        template.setMessageConverter(jsonMessageConverter());
        configureRabbitTemplate(template);
        return template;
    }

    @Bean
    public Jackson2JsonMessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public TopicExchange marketDataExchange() {
        return new TopicExchange("app.stock.marketdata");
    }

    // additional code omitted for brevity

}

在 Stock 應用程式中,伺服器使用以下 @Configuration 類進行配置:

@Configuration
public class RabbitServerConfiguration extends AbstractStockAppRabbitConfiguration  {

    @Bean
    public Queue stockRequestQueue() {
        return new Queue("app.stock.request");
    }
}

這是 @Configuration 類整個繼承鏈的末端。最終結果是 TopicExchangeQueue 在應用程式啟動時被宣告到 broker。server 配置中沒有將 TopicExchange 繫結到佇列,因為這在 client 應用程式中完成。然而,stock request queue 會自動繫結到 AMQP 預設交換機。此行為由規範定義。

client @Configuration 類更有趣一些。其宣告如下:

@Configuration
public class RabbitClientConfiguration extends AbstractStockAppRabbitConfiguration {

    @Value("${stocks.quote.pattern}")
    private String marketDataRoutingKey;

    @Bean
    public Queue marketDataQueue() {
        return amqpAdmin().declareQueue();
    }

    /**
     * Binds to the market data exchange.
     * Interested in any stock quotes
     * that match its routing key.
     */
    @Bean
    public Binding marketDataBinding() {
        return BindingBuilder.bind(
                marketDataQueue()).to(marketDataExchange()).with(marketDataRoutingKey);
    }

    // additional code omitted for brevity

}

client 透過 AmqpAdmin 上的 declareQueue() 方法宣告另一個佇列。它將該佇列繫結到 market data exchange,並使用在 properties 檔案中外部化的 routing pattern。

佇列和交換機的 Builder API

版本 1.6 引入了一種方便的 fluent API,用於在使用 Java 配置時配置 QueueExchange 物件。以下示例展示瞭如何使用它:

@Bean
public Queue queue() {
    return QueueBuilder.nonDurable("foo")
        .autoDelete()
        .exclusive()
        .withArgument("foo", "bar")
        .build();
}

@Bean
public Exchange exchange() {
  return ExchangeBuilder.directExchange("foo")
      .autoDelete()
      .internal()
      .withArgument("foo", "bar")
      .build();
}

從版本 2.0 開始,ExchangeBuilder 現在預設建立 durable exchanges,這與各個 AbstractExchange 類上的簡單建構函式保持一致。要使用 builder 建立 non-durable exchange,請在呼叫 .build() 之前使用 .durable(false)。不再提供無引數的 durable() 方法。

版本 2.2 引入了 fluent API,用於新增“已知”的 exchange 和 queue 引數...

@Bean
public Queue allArgs1() {
    return QueueBuilder.nonDurable("all.args.1")
            .ttl(1000)
            .expires(200_000)
            .maxLength(42)
            .maxLengthBytes(10_000)
            .overflow(Overflow.rejectPublish)
            .deadLetterExchange("dlx")
            .deadLetterRoutingKey("dlrk")
            .maxPriority(4)
            .lazy()
            .leaderLocator(LeaderLocator.minLeaders)
            .singleActiveConsumer()
            .build();
}

@Bean
public DirectExchange ex() {
    return ExchangeBuilder.directExchange("ex.with.alternate")
            .durable(true)
            .alternate("alternate")
            .build();
}

宣告 Exchanges, Queues 和 Bindings 的集合

您可以將 Declarable 物件(QueueExchangeBinding)的集合包裝在 Declarables 物件中。RabbitAdmin 檢測 application context 中的此類 bean(以及離散的 Declarable bean),並在建立連線(初始連線和連線失敗後)時將包含的物件宣告到 broker。以下示例展示瞭如何進行配置:

@Configuration
public static class Config {

    @Bean
    public CachingConnectionFactory cf() {
        return new CachingConnectionFactory("localhost");
    }

    @Bean
    public RabbitAdmin admin(ConnectionFactory cf) {
        return new RabbitAdmin(cf);
    }

    @Bean
    public DirectExchange e1() {
        return new DirectExchange("e1", false, true);
    }

    @Bean
    public Queue q1() {
        return new Queue("q1", false, false, true);
    }

    @Bean
    public Binding b1() {
        return BindingBuilder.bind(q1()).to(e1()).with("k1");
    }

    @Bean
    public Declarables es() {
        return new Declarables(
                new DirectExchange("e2", false, true),
                new DirectExchange("e3", false, true));
    }

    @Bean
    public Declarables qs() {
        return new Declarables(
                new Queue("q2", false, false, true),
                new Queue("q3", false, false, true));
    }

    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public Declarables prototypes() {
        return new Declarables(new Queue(this.prototypeQueueName, false, false, true));
    }

    @Bean
    public Declarables bs() {
        return new Declarables(
                new Binding("q2", DestinationType.QUEUE, "e2", "k2", null),
                new Binding("q3", DestinationType.QUEUE, "e3", "k3", null));
    }

    @Bean
    public Declarables ds() {
        return new Declarables(
                new DirectExchange("e4", false, true),
                new Queue("q4", false, false, true),
                new Binding("q4", DestinationType.QUEUE, "e4", "k4", null));
    }

}
在版本 2.1 之前,您可以透過定義 Collection<Declarable> 型別的 bean 來宣告多個 Declarable 例項。這在某些情況下可能導致不良副作用,因為 admin 必須迭代所有 Collection<?> bean。

版本 2.2 向 Declarables 添加了 getDeclarablesByType 方法;例如,在宣告監聽器容器 bean 時,這可以作為一種便利。

public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
        Declarables mixedDeclarables, MessageListener listener) {

    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
    container.setQueues(mixedDeclarables.getDeclarablesByType(Queue.class).toArray(new Queue[0]));
    container.setMessageListener(listener);
    return container;
}

條件宣告

預設情況下,application context 中的所有 RabbitAdmin 例項(假設它們具有 auto-startup="true")都會宣告所有佇列、交換機和繫結。

從版本 2.1.9 開始,RabbitAdmin 有一個新屬性 explicitDeclarationsOnly(預設值為 false);當將其設定為 true 時,admin 將僅宣告明確配置由該 admin 宣告的 bean。

從 1.2 版本開始,您可以有條件地宣告這些元素。當應用程式連線到多個 broker 並需要指定特定元素應在哪些 broker 上宣告時,這特別有用。

表示這些元素的類實現了 Declarable 介面,該介面有兩個方法:shouldDeclare()getDeclaringAdmins()RabbitAdmin 使用這些方法來確定特定例項是否應實際處理其 Connection 上的宣告。

屬性在名稱空間中作為屬性提供,示例如下:

<rabbit:admin id="admin1" connection-factory="CF1" />

<rabbit:admin id="admin2" connection-factory="CF2" />

<rabbit:admin id="admin3" connection-factory="CF3" explicit-declarations-only="true" />

<rabbit:queue id="declaredByAdmin1AndAdmin2Implicitly" />

<rabbit:queue id="declaredByAdmin1AndAdmin2" declared-by="admin1, admin2" />

<rabbit:queue id="declaredByAdmin1Only" declared-by="admin1" />

<rabbit:queue id="notDeclaredByAllExceptAdmin3" auto-declare="false" />

<rabbit:direct-exchange name="direct" declared-by="admin1, admin2">
    <rabbit:bindings>
        <rabbit:binding key="foo" queue="bar"/>
    </rabbit:bindings>
</rabbit:direct-exchange>
預設情況下,auto-declare 屬性為 true,如果 declared-by 未提供(或為空),則所有 RabbitAdmin 例項都會宣告物件(只要 admin 的 auto-startup 屬性為 true(預設值),並且 admin 的 explicit-declarations-only 屬性為 false)。

同樣,您可以使用基於 Java 的 @Configuration 來實現相同的效果。在以下示例中,元件由 admin1 宣告,但不由 admin2 宣告:

@Bean
public RabbitAdmin admin1() {
    return new RabbitAdmin(cf1());
}

@Bean
public RabbitAdmin admin2() {
    return new RabbitAdmin(cf2());
}

@Bean
public Queue queue() {
    Queue queue = new Queue("foo");
    queue.setAdminsThatShouldDeclare(admin1());
    return queue;
}

@Bean
public Exchange exchange() {
    DirectExchange exchange = new DirectExchange("bar");
    exchange.setAdminsThatShouldDeclare(admin1());
    return exchange;
}

@Bean
public Binding binding() {
    Binding binding = new Binding("foo", DestinationType.QUEUE, exchange().getName(), "foo", null);
    binding.setAdminsThatShouldDeclare(admin1());
    return binding;
}

關於 idname 屬性的注意事項

<rabbit:queue/><rabbit:exchange/> 元素上的 name 屬性反映了實體在 broker 中的名稱。對於佇列,如果 name 被省略,則建立一個匿名佇列(請參閱 AnonymousQueue)。

在版本 2.0 之前,name 也被註冊為 bean 名稱別名(類似於 <bean/> 元素上的 name)。

這導致了兩個問題:

  • 它阻止了宣告具有相同名稱的佇列和交換機。

  • 如果別名包含 SpEL 表示式(#{…​}),則無法解析。

從版本 2.0 開始,如果您宣告這些元素之一時同時帶有 idname 屬性,則 name 不再宣告為 bean 名稱別名。如果您希望宣告具有相同 name 的佇列和交換機,則必須提供 id

如果元素僅具有 name 屬性,則沒有變化。bean 仍然可以按 name 引用——例如,在繫結宣告中。但是,如果 name 包含 SpEL,您仍然無法引用它——您必須提供一個 id 用於引用。

AnonymousQueue

通常,當您需要一個唯一命名、獨佔、自動刪除的佇列時,我們建議您使用 AnonymousQueue 而不是 broker 定義的佇列名稱(將 "" 用作 Queue 名稱會導致 broker 生成佇列名稱)。

這是因為:

  1. 佇列實際上是在與 broker 建立連線時宣告的。這發生在 bean 建立和裝配在一起之後很長時間。使用佇列的 bean 需要知道其名稱。事實上,當應用程式啟動時,broker 可能甚至沒有執行。

  2. 如果由於某種原因與 broker 的連線丟失,admin 會以相同的名稱重新宣告 AnonymousQueue。如果使用 broker 宣告的佇列,則佇列名稱會更改。

您可以控制 AnonymousQueue 例項使用的佇列名稱的格式。

預設情況下,佇列名稱以 spring.gen- 為字首,後跟 UUID 的 base64 表示——例如:spring.gen-MRBv9sqISkuCiPfOYfpo4g

您可以在建構函式引數中提供 AnonymousQueue.NamingStrategy 實現。以下示例展示瞭如何進行配置:

@Bean
public Queue anon1() {
    return new AnonymousQueue();
}

@Bean
public Queue anon2() {
    return new AnonymousQueue(new AnonymousQueue.Base64UrlNamingStrategy("something-"));
}

@Bean
public Queue anon3() {
    return new AnonymousQueue(AnonymousQueue.UUIDNamingStrategy.DEFAULT);
}

第一個 bean 生成以 spring.gen- 為字首,後跟 UUID 的 base64 表示的佇列名稱——例如:spring.gen-MRBv9sqISkuCiPfOYfpo4g。第二個 bean 生成以 something- 為字首,後跟 UUID 的 base64 表示的佇列名稱。第三個 bean 僅使用 UUID(無 base64 轉換)生成名稱——例如:f20c818a-006b-4416-bf91-643590fedb0e

base64 編碼使用 RFC 4648 中的“URL 和檔名安全字母表”。末尾的填充字元(=)被移除。

您可以提供自己的命名策略,從而可以在佇列名稱中包含其他資訊(例如應用程式名稱或客戶端主機)。

使用 XML 配置時,可以指定命名策略。<rabbit:queue> 元素上存在 naming-strategy 屬性,用於實現 AnonymousQueue.NamingStrategy 的 bean 引用。以下示例展示瞭如何以各種方式指定命名策略:

<rabbit:queue id="uuidAnon" />

<rabbit:queue id="springAnon" naming-strategy="uuidNamer" />

<rabbit:queue id="customAnon" naming-strategy="customNamer" />

<bean id="uuidNamer" class="org.springframework.amqp.core.AnonymousQueue.UUIDNamingStrategy" />

<bean id="customNamer" class="org.springframework.amqp.core.AnonymousQueue.Base64UrlNamingStrategy">
    <constructor-arg value="custom.gen-" />
</bean>

第一個示例建立諸如 spring.gen-MRBv9sqISkuCiPfOYfpo4g 的名稱。第二個示例建立使用 UUID 的 String 表示的名稱。第三個示例建立諸如 custom.gen-MRBv9sqISkuCiPfOYfpo4g 的名稱。

您還可以提供自己的命名策略 bean。

從版本 2.1 開始,匿名佇列預設宣告時將引數 Queue.X_QUEUE_LEADER_LOCATOR 設定為 client-local。這確保佇列宣告在應用程式連線到的節點上。在構建例項後,可以透過呼叫 queue.setLeaderLocator(null) 恢復到先前的行為。

恢復自動刪除宣告

通常,RabbitAdmin 僅恢復在 application context 中宣告為 bean 的佇列/交換機/繫結;如果任何此類宣告是自動刪除的,則在連線丟失時會被 broker 移除。重新建立連線後,admin 將重新宣告這些實體。通常,透過呼叫 admin.declareQueue(…​)admin.declareExchange(…​)admin.declareBinding(…​) 建立的實體不會被恢復。

從版本 2.4 開始,admin 有一個新屬性 redeclareManualDeclarations;當設定為 true 時,admin 將在恢復 application context 中的 bean 的同時恢復這些實體。

如果呼叫了 deleteQueue(…​)deleteExchange(…​)removeBinding(…​),則不會執行單個宣告的恢復。刪除佇列和交換機時,關聯的繫結會從可恢復實體中移除。

最後,呼叫 resetAllManualDeclarations() 將阻止恢復任何先前宣告的實體。