配置 Broker

AMQP規範描述瞭如何使用協議在Broker上配置佇列、交換器和繫結。這些操作(從0.8規範及更高版本均可移植)存在於org.springframework.amqp.core包中的AmqpAdmin介面中。該類的RabbitMQ實現是位於org.springframework.amqp.rabbit.core包中的RabbitAdmin

AmqpAdmin介面基於使用Spring AMQP領域抽象,如下所示:

public interface AmqpAdmin {

    // Exchange Operations

    void declareExchange(Exchange exchange);

    void deleteExchange(String exchangeName);

    // Queue Operations

    Queue declareQueue();

    String declareQueue(Queue queue);

    boolean deleteQueue(String queueName);

    void deleteQueue(String queueName, boolean unused, boolean empty);

    void purgeQueue(String queueName, boolean noWait);

    // Binding Operations

    void declareBinding(Binding binding);

    void removeBinding(Binding binding);

    Properties getQueueProperties(String queueName);

    QueueInformation getQueueInfo(String queueName);

}

另請參閱作用域操作

getQueueProperties()方法返回有關佇列的一些有限資訊(訊息計數和消費者計數)。返回屬性的鍵在RabbitAdmin中以常量形式提供(QUEUE_NAMEQUEUE_MESSAGE_COUNTQUEUE_CONSUMER_COUNT)。getQueueInfo()返回一個方便的QueueInformation資料物件。

無引數的declareQueue()方法在Broker上定義一個自動生成名稱的佇列。此自動生成佇列的附加屬性是exclusive=trueautoDelete=truedurable=false

declareQueue(Queue queue)方法接受一個Queue物件並返回已宣告佇列的名稱。如果提供的Queuename屬性為空String,則Broker將以生成的名稱宣告佇列。該名稱將返回給呼叫者。該名稱也新增到QueueactualName屬性中。您只能透過直接呼叫RabbitAdmin以程式設計方式使用此功能。當在應用程式上下文中宣告性地定義佇列時,如果使用Admin的自動宣告功能,您可以將名稱屬性設定為""(空字串)。然後Broker將建立名稱。從版本2.1開始,監聽器容器可以使用這種型別的佇列。有關更多資訊,請參閱容器和Broker命名佇列

這與AnonymousQueue形成對比,在AnonymousQueue中,框架生成一個唯一的(UUID)名稱,並將durable設定為false,將exclusiveautoDelete設定為true。一個<rabbit:queue/>,如果其name屬性為空(或缺失),則始終會建立一個AnonymousQueue

請參閱AnonymousQueue,以瞭解為什麼AnonymousQueue優於Broker生成的佇列名稱,以及如何控制名稱的格式。從版本2.1開始,匿名佇列預設宣告時帶有引數Queue.X_QUEUE_LEADER_LOCATOR,並設定為client-local。這確保了佇列在應用程式連線到的節點上宣告。宣告式佇列必須具有固定名稱,因為它們可能在上下文中的其他地方被引用——例如在以下示例中所示的監聽器中:

<rabbit:listener-container>
    <rabbit:listener ref="listener" queue-names="#{someQueue.name}" />
</rabbit:listener-container>

此介面的RabbitMQ實現是RabbitAdmin,當使用Spring XML配置時,它類似於以下示例:

<rabbit:connection-factory id="connectionFactory"/>

<rabbit:admin id="amqpAdmin" connection-factory="connectionFactory"/>

CachingConnectionFactory快取模式為CHANNEL(預設)時,RabbitAdmin實現會自動延遲宣告在同一ApplicationContext中宣告的佇列、交換器和繫結。一旦與Broker建立Connection,這些元件就會被宣告。有一些名稱空間功能使得這非常方便——例如,在Stocks示例應用程式中,我們有以下內容:

<rabbit:queue id="tradeQueue"/>

<rabbit:queue id="marketDataQueue"/>

<fanout-exchange name="broadcast.responses"
                 xmlns="http://www.springframework.org/schema/rabbit">
    <bindings>
        <binding queue="tradeQueue"/>
    </bindings>
</fanout-exchange>

<topic-exchange name="app.stock.marketdata"
                xmlns="http://www.springframework.org/schema/rabbit">
    <bindings>
        <binding queue="marketDataQueue" pattern="${stocks.quote.pattern}"/>
    </bindings>
</topic-exchange>

在前面的示例中,我們使用匿名佇列(實際上,內部只是框架生成的帶有名稱的佇列,而不是由Broker生成的)並透過ID引用它們。我們也可以宣告具有顯式名稱的佇列,這些名稱也用作其在上下文中的bean定義的識別符號。以下示例配置了一個具有顯式名稱的佇列:

<rabbit:queue name="stocks.trade.queue"/>
您可以同時提供idname屬性。這允許您透過與佇列名稱無關的ID來引用佇列(例如,在繫結中)。它還允許使用標準Spring功能(例如屬性佔位符和SpEL表示式用於佇列名稱)。當您使用名稱作為bean識別符號時,這些功能不可用。

佇列可以配置額外的引數——例如,x-message-ttl。當您使用名稱空間支援時,它們以引數名稱/引數值對的Map形式提供,這些由<rabbit:queue-arguments>元素定義。以下示例展示瞭如何實現:

<rabbit:queue name="withArguments">
    <rabbit:queue-arguments>
        <entry key="x-dead-letter-exchange" value="myDLX"/>
        <entry key="x-dead-letter-routing-key" value="dlqRK"/>
    </rabbit:queue-arguments>
</rabbit:queue>

預設情況下,引數被假定為字串。對於其他型別的引數,您必須提供型別。以下示例展示瞭如何指定型別:

<rabbit:queue name="withArguments">
    <rabbit:queue-arguments value-type="java.lang.Long">
        <entry key="x-message-ttl" value="100"/>
    </rabbit:queue-arguments>
</rabbit:queue>

當提供混合型別的引數時,您必須為每個條目元素提供型別。以下示例展示瞭如何實現:

<rabbit:queue name="withArguments">
    <rabbit:queue-arguments>
        <entry key="x-message-ttl">
            <value type="java.lang.Long">100</value>
        </entry>
        <entry key="x-dead-letter-exchange" value="myDLX"/>
        <entry key="x-dead-letter-routing-key" value="dlqRK"/>
    </rabbit:queue-arguments>
</rabbit:queue>

從Spring Framework 3.2及更高版本開始,這可以更簡潔地宣告,如下所示:

<rabbit:queue name="withArguments">
    <rabbit:queue-arguments>
        <entry key="x-message-ttl" value="100" value-type="java.lang.Long"/>
        <entry key="x-ha-policy" value="all"/>
    </rabbit:queue-arguments>
</rabbit:queue>

當使用Java配置時,Queue.X_QUEUE_LEADER_LOCATOR引數透過Queue類上的setLeaderLocator()方法作為一流屬性得到支援。從版本2.1開始,匿名佇列預設宣告時此屬性設定為client-local。這確保了佇列在應用程式連線到的節點上宣告。

RabbitMQ Broker不允許宣告具有不匹配引數的佇列。例如,如果一個queue已經存在且沒有time to live引數,而您嘗試使用(例如)key="x-message-ttl" value="100"來宣告它,則會丟擲異常。

預設情況下,當發生任何異常時,RabbitAdmin會立即停止處理所有宣告。這可能導致下游問題,例如監聽器容器因另一個佇列(在出錯的佇列之後定義)未宣告而未能初始化。

可以透過將RabbitAdmin例項上的ignore-declaration-exceptions屬性設定為true來修改此行為。此選項指示RabbitAdmin記錄異常並繼續宣告其他元素。當使用Java配置RabbitAdmin時,此屬性名為ignoreDeclarationExceptions。這是一個全域性設定,適用於所有元素。佇列、交換器和繫結具有一個類似的屬性,僅適用於這些元素。

在版本1.6之前,此屬性僅在通道上發生IOException時生效,例如噹噹前屬性和所需屬性不匹配時。現在,此屬性對任何異常都生效,包括TimeoutException等。

此外,任何宣告異常都會導致釋出DeclarationExceptionEvent,這是一個ApplicationEvent,可以由上下文中的任何ApplicationListener消費。該事件包含對admin、正在宣告的元素和Throwable的引用。

Header交換器

從版本1.3開始,您可以配置HeadersExchange以匹配多個header。您還可以指定是否必須匹配任何或所有header。以下示例展示瞭如何實現:

<rabbit:headers-exchange name="headers-test">
    <rabbit:bindings>
        <rabbit:binding queue="bucket">
            <rabbit:binding-arguments>
                <entry key="foo" value="bar"/>
                <entry key="baz" value="qux"/>
                <entry key="x-match" value="all"/>
            </rabbit:binding-arguments>
        </rabbit:binding>
    </rabbit:bindings>
</rabbit:headers-exchange>

從版本1.6開始,您可以為Exchanges配置一個internal標誌(預設為false),並且這樣的Exchange透過RabbitAdmin(如果應用程式上下文存在)在Broker上正確配置。如果交換器的internal標誌為true,RabbitMQ將不允許客戶端使用該交換器。這對於死信交換器或交換器到交換器的繫結很有用,您不希望釋出者直接使用該交換器。

要了解如何使用Java配置AMQP基礎設施,請檢視Stock示例應用程式,其中有一個@ConfigurationAbstractStockRabbitConfiguration,該類又包含RabbitClientConfigurationRabbitServerConfiguration子類。以下列表顯示了AbstractStockRabbitConfiguration的程式碼:

@Configuration
public abstract class AbstractStockAppRabbitConfiguration {

    @Bean
    public CachingConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory =
            new CachingConnectionFactory("localhost");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        return connectionFactory;
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        template.setMessageConverter(jsonMessageConverter());
        configureRabbitTemplate(template);
        return template;
    }

    @Bean
    public Jackson2JsonMessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public TopicExchange marketDataExchange() {
        return new TopicExchange("app.stock.marketdata");
    }

    // additional code omitted for brevity

}

在Stock應用程式中,伺服器使用以下@Configuration類配置:

@Configuration
public class RabbitServerConfiguration extends AbstractStockAppRabbitConfiguration  {

    @Bean
    public Queue stockRequestQueue() {
        return new Queue("app.stock.request");
    }
}

這是@Configuration類整個繼承鏈的末端。最終結果是TopicExchangeQueue在應用程式啟動時被宣告到Broker。伺服器配置中沒有TopicExchange到佇列的繫結,因為這是在客戶端應用程式中完成的。然而,股票請求佇列會自動繫結到AMQP預設交換器。此行為由規範定義。

客戶端的@Configuration類更有趣一些。它的宣告如下:

@Configuration
public class RabbitClientConfiguration extends AbstractStockAppRabbitConfiguration {

    @Value("${stocks.quote.pattern}")
    private String marketDataRoutingKey;

    @Bean
    public Queue marketDataQueue() {
        return amqpAdmin().declareQueue();
    }

    /**
     * Binds to the market data exchange.
     * Interested in any stock quotes
     * that match its routing key.
     */
    @Bean
    public Binding marketDataBinding() {
        return BindingBuilder.bind(
                marketDataQueue()).to(marketDataExchange()).with(marketDataRoutingKey);
    }

    // additional code omitted for brevity

}

客戶端透過AmqpAdmin上的declareQueue()方法宣告另一個佇列。它將該佇列繫結到市場資料交換器,路由模式在屬性檔案中外部化。

佇列和交換器的構建器API

版本1.6引入了一個方便的流式API,用於在使用Java配置時配置QueueExchange物件。以下示例展示瞭如何使用它:

@Bean
public Queue queue() {
    return QueueBuilder.nonDurable("foo")
        .autoDelete()
        .exclusive()
        .withArgument("foo", "bar")
        .build();
}

@Bean
public Exchange exchange() {
  return ExchangeBuilder.directExchange("foo")
      .autoDelete()
      .internal()
      .withArgument("foo", "bar")
      .build();
}

從版本2.0開始,ExchangeBuilder現在預設建立持久交換器,以與各個AbstractExchange類上的簡單建構函式保持一致。要使用構建器建立非持久交換器,請在呼叫.build()之前使用.durable(false)。不帶引數的durable()方法不再提供。

版本2.2引入了流式API來新增“知名”交換器和佇列引數…

@Bean
public Queue allArgs1() {
    return QueueBuilder.nonDurable("all.args.1")
            .ttl(1000)
            .expires(200_000)
            .maxLength(42)
            .maxLengthBytes(10_000)
            .overflow(Overflow.rejectPublish)
            .deadLetterExchange("dlx")
            .deadLetterRoutingKey("dlrk")
            .maxPriority(4)
            .lazy()
            .leaderLocator(LeaderLocator.minLeaders)
            .singleActiveConsumer()
            .build();
}

@Bean
public DirectExchange ex() {
    return ExchangeBuilder.directExchange("ex.with.alternate")
            .durable(true)
            .alternate("alternate")
            .build();
}

宣告交換器、佇列和繫結的集合

您可以將Declarable物件(QueueExchangeBinding)的集合包裝在Declarables物件中。RabbitAdmin會在應用程式上下文中檢測此類bean(以及離散的Declarable bean),並在建立連線(最初和連線失敗後)時在Broker上宣告包含的物件。以下示例展示瞭如何實現:

@Configuration
public static class Config {

    @Bean
    public CachingConnectionFactory cf() {
        return new CachingConnectionFactory("localhost");
    }

    @Bean
    public RabbitAdmin admin(ConnectionFactory cf) {
        return new RabbitAdmin(cf);
    }

    @Bean
    public DirectExchange e1() {
        return new DirectExchange("e1", false, true);
    }

    @Bean
    public Queue q1() {
        return new Queue("q1", false, false, true);
    }

    @Bean
    public Binding b1() {
        return BindingBuilder.bind(q1()).to(e1()).with("k1");
    }

    @Bean
    public Declarables es() {
        return new Declarables(
                new DirectExchange("e2", false, true),
                new DirectExchange("e3", false, true));
    }

    @Bean
    public Declarables qs() {
        return new Declarables(
                new Queue("q2", false, false, true),
                new Queue("q3", false, false, true));
    }

    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public Declarables prototypes() {
        return new Declarables(new Queue(this.prototypeQueueName, false, false, true));
    }

    @Bean
    public Declarables bs() {
        return new Declarables(
                new Binding("q2", DestinationType.QUEUE, "e2", "k2", null),
                new Binding("q3", DestinationType.QUEUE, "e3", "k3", null));
    }

    @Bean
    public Declarables ds() {
        return new Declarables(
                new DirectExchange("e4", false, true),
                new Queue("q4", false, false, true),
                new Binding("q4", DestinationType.QUEUE, "e4", "k4", null));
    }

}
在2.1版本之前,您可以透過定義Collection<Declarable>型別的bean來宣告多個Declarable例項。在某些情況下,這可能導致不良副作用,因為admin必須遍歷所有Collection<?> bean。

版本2.2為Declarables添加了getDeclarablesByType方法;這可以用作便利,例如在宣告監聽器容器bean時。

public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
        Declarables mixedDeclarables, MessageListener listener) {

    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
    container.setQueues(mixedDeclarables.getDeclarablesByType(Queue.class).toArray(new Queue[0]));
    container.setMessageListener(listener);
    return container;
}

條件宣告

預設情況下,所有佇列、交換器和繫結都由應用程式上下文中的所有RabbitAdmin例項宣告(假設它們具有auto-startup="true")。

從版本2.1.9開始,RabbitAdmin具有一個新屬性explicitDeclarationsOnly(預設值為false);當設定為true時,admin將僅宣告明確配置由該admin宣告的bean。

從1.2版本開始,您可以有條件地宣告這些元素。當應用程式連線到多個Broker並且需要指定特定元素應該在哪個Broker上宣告時,這特別有用。

代表這些元素的類實現了Declarable,它有兩個方法:shouldDeclare()getDeclaringAdmins()RabbitAdmin使用這些方法來確定特定例項是否應該實際處理其Connection上的宣告。

這些屬性在名稱空間中以屬性形式提供,如以下示例所示:

<rabbit:admin id="admin1" connection-factory="CF1" />

<rabbit:admin id="admin2" connection-factory="CF2" />

<rabbit:admin id="admin3" connection-factory="CF3" explicit-declarations-only="true" />

<rabbit:queue id="declaredByAdmin1AndAdmin2Implicitly" />

<rabbit:queue id="declaredByAdmin1AndAdmin2" declared-by="admin1, admin2" />

<rabbit:queue id="declaredByAdmin1Only" declared-by="admin1" />

<rabbit:queue id="notDeclaredByAllExceptAdmin3" auto-declare="false" />

<rabbit:direct-exchange name="direct" declared-by="admin1, admin2">
    <rabbit:bindings>
        <rabbit:binding key="foo" queue="bar"/>
    </rabbit:bindings>
</rabbit:direct-exchange>
預設情況下,auto-declare屬性為true,如果未提供declared-by(或為空),則所有RabbitAdmin例項都宣告該物件(只要admin的auto-startup屬性為true(預設值),並且admin的explicit-declarations-only屬性為false)。

同樣,您可以使用基於Java的@Configuration來實現相同的效果。在以下示例中,元件由admin1宣告,但不由admin2宣告:

@Bean
public RabbitAdmin admin1() {
    return new RabbitAdmin(cf1());
}

@Bean
public RabbitAdmin admin2() {
    return new RabbitAdmin(cf2());
}

@Bean
public Queue queue() {
    Queue queue = new Queue("foo");
    queue.setAdminsThatShouldDeclare(admin1());
    return queue;
}

@Bean
public Exchange exchange() {
    DirectExchange exchange = new DirectExchange("bar");
    exchange.setAdminsThatShouldDeclare(admin1());
    return exchange;
}

@Bean
public Binding binding() {
    Binding binding = new Binding("foo", DestinationType.QUEUE, exchange().getName(), "foo", null);
    binding.setAdminsThatShouldDeclare(admin1());
    return binding;
}

關於idname屬性的注意事項

<rabbit:queue/><rabbit:exchange/>元素上的name屬性反映了實體在Broker中的名稱。對於佇列,如果省略name,則會建立一個匿名佇列(請參閱AnonymousQueue)。

在2.0版本之前,name也被註冊為bean名稱別名(類似於<bean/>元素上的name)。

這導致了兩個問題:

  • 它阻止了宣告具有相同名稱的佇列和交換器。

  • 如果別名包含SpEL表示式(#{…​}),則無法解析。

從版本2.0開始,如果您使用idname屬性宣告其中一個元素,則該名稱不再宣告為bean名稱別名。如果您希望宣告具有相同name的佇列和交換器,則必須提供一個id

如果元素只有一個name屬性,則沒有變化。bean仍然可以透過name引用——例如,在繫結宣告中。但是,如果名稱包含SpEL,您仍然無法引用它——您必須提供一個id以供引用。

AnonymousQueue

通常,當您需要一個唯一命名的、排他的、自動刪除的佇列時,我們建議您使用AnonymousQueue而不是Broker定義的佇列名稱(使用""作為Queue名稱會導致Broker生成佇列名稱)。

這是因為:

  1. 佇列實際上是在與Broker建立連線時宣告的。這比bean建立和連線在一起晚得多。使用佇列的bean需要知道它的名稱。實際上,在應用程式啟動時,Broker甚至可能沒有執行。

  2. 如果與Broker的連線由於某種原因丟失,admin將使用相同的名稱重新宣告AnonymousQueue。如果使用Broker宣告的佇列,佇列名稱將會改變。

您可以控制AnonymousQueue例項使用的佇列名稱的格式。

預設情況下,佇列名稱以spring.gen-為字首,後跟UUID的base64表示——例如:spring.gen-MRBv9sqISkuCiPfOYfpo4g

您可以在建構函式引數中提供AnonymousQueue.NamingStrategy實現。以下示例展示瞭如何實現:

@Bean
public Queue anon1() {
    return new AnonymousQueue();
}

@Bean
public Queue anon2() {
    return new AnonymousQueue(new AnonymousQueue.Base64UrlNamingStrategy("something-"));
}

@Bean
public Queue anon3() {
    return new AnonymousQueue(AnonymousQueue.UUIDNamingStrategy.DEFAULT);
}

第一個bean生成以spring.gen-為字首的佇列名稱,後跟UUID的base64表示——例如:spring.gen-MRBv9sqISkuCiPfOYfpo4g。第二個bean生成以something-為字首的佇列名稱,後跟UUID的base64表示。第三個bean僅使用UUID(無base64轉換)生成名稱——例如,f20c818a-006b-4416-bf91-643590fedb0e

base64編碼使用RFC 4648中的“URL和檔名安全字母表”。尾隨的填充字元(=)將被刪除。

您可以提供自己的命名策略,從而在佇列名稱中包含其他資訊(例如應用程式名稱或客戶端主機)。

您可以在使用XML配置時指定命名策略。naming-strategy屬性存在於<rabbit:queue>元素上,用於實現AnonymousQueue.NamingStrategy的bean引用。以下示例展示瞭如何以各種方式指定命名策略:

<rabbit:queue id="uuidAnon" />

<rabbit:queue id="springAnon" naming-strategy="uuidNamer" />

<rabbit:queue id="customAnon" naming-strategy="customNamer" />

<bean id="uuidNamer" class="org.springframework.amqp.core.AnonymousQueue.UUIDNamingStrategy" />

<bean id="customNamer" class="org.springframework.amqp.core.AnonymousQueue.Base64UrlNamingStrategy">
    <constructor-arg value="custom.gen-" />
</bean>

第一個示例建立諸如spring.gen-MRBv9sqISkuCiPfOYfpo4g的名稱。第二個示例建立帶有UUID的字串表示的名稱。第三個示例建立諸如custom.gen-MRBv9sqISkuCiPfOYfpo4g的名稱。

您也可以提供自己的命名策略bean。

從版本2.1開始,匿名佇列預設宣告時帶有引數Queue.X_QUEUE_LEADER_LOCATOR,並設定為client-local。這確保了佇列在應用程式連線到的節點上宣告。您可以透過在構造例項後呼叫queue.setLeaderLocator(null)來恢復到之前的行為。

恢復自動刪除宣告

通常,RabbitAdmin(es)只恢復在應用程式上下文中宣告為bean的佇列/交換器/繫結;如果任何此類宣告是自動刪除的,則如果連線丟失,它們將被Broker刪除。當重新建立連線時,admin將重新宣告實體。通常,透過呼叫admin.declareQueue(…​)admin.declareExchange(…​)admin.declareBinding(…​)建立的實體將不會被恢復。

從版本2.4開始,admin有一個新屬性redeclareManualDeclarations;當設定為true時,admin除了恢復應用程式上下文中的bean之外,還會恢復這些實體。

如果呼叫了deleteQueue(…​)deleteExchange(…​)removeBinding(…​),則不會執行單個宣告的恢復。當佇列和交換器被刪除時,相關的繫結將從可恢復實體中移除。

最後,呼叫resetAllManualDeclarations()將阻止恢復任何先前宣告的實體。

© . This site is unofficial and not affiliated with VMware.