連線與資源管理

雖然我們在上一節中描述的 AMQP 模型是通用的,適用於所有實現,但當我們談到資源管理時,細節就取決於 Broker 的具體實現。因此,在本節中,我們專注於只存在於我們的“spring-rabbit”模組中的程式碼,因為目前只支援 RabbitMQ 實現。

用於管理與 RabbitMQ Broker 連線的核心元件是 ConnectionFactory 介面。ConnectionFactory 實現的職責是提供 org.springframework.amqp.rabbit.connection.Connection 的例項,它是 com.rabbitmq.client.Connection 的包裝類。

選擇連線工廠

有三種連線工廠可供選擇

  • PooledChannelConnectionFactory

  • ThreadChannelConnectionFactory

  • CachingConnectionFactory

前兩種在版本 2.3 中新增。

對於大多數用例,應使用 CachingConnectionFactory。如果您想確保嚴格的訊息順序而無需使用 Scoped Operations,則可以使用 ThreadChannelConnectionFactoryPooledChannelConnectionFactoryCachingConnectionFactory 類似,因為它使用單個連線和通道池。它的實現更簡單,但不支援關聯的釋出者確認。

所有這三種工廠都支援簡單的釋出者確認。

配置 RabbitTemplate 使用 單獨連線 時,從版本 2.3.2 開始,您可以配置釋出連線工廠為不同的型別。預設情況下,釋出工廠與主工廠型別相同,並且設定在主工廠上的任何屬性也會傳播到釋出工廠。

從版本 3.1 開始,AbstractConnectionFactory 包含 connectionCreatingBackOff 屬性,該屬性在連線模組中支援指數退避策略。目前,createChannel() 方法的行為支援處理達到 channelMax 限制時發生的異常,實現了基於嘗試次數和間隔的退避策略。

PooledChannelConnectionFactory

該工廠管理單個連線和兩個通道池,基於 Apache Pool2。一個池用於事務性通道,另一個用於非事務性通道。這些池是配置預設為 GenericObjectPool 的池;提供回撥以配置這些池;有關更多資訊,請參閱 Apache 文件。

要使用此工廠,必須將 Apache commons-pool2 jar 放在類路徑上。

@Bean
PooledChannelConnectionFactory pcf() throws Exception {
    ConnectionFactory rabbitConnectionFactory = new ConnectionFactory();
    rabbitConnectionFactory.setHost("localhost");
    PooledChannelConnectionFactory pcf = new PooledChannelConnectionFactory(rabbitConnectionFactory);
    pcf.setPoolConfigurer((pool, tx) -> {
        if (tx) {
            // configure the transactional pool
        }
        else {
            // configure the non-transactional pool
        }
    });
    return pcf;
}

ThreadChannelConnectionFactory

該工廠管理單個連線和兩個 ThreadLocal,一個用於事務性通道,另一個用於非事務性通道。此工廠確保同一執行緒上的所有操作都使用同一通道(只要它保持開啟狀態)。這有助於實現嚴格的訊息順序,而無需 Scoped Operations。為避免記憶體洩漏,如果您的應用程式使用許多生命週期短暫的執行緒,則必須呼叫工廠的 closeThreadChannel() 方法釋放通道資源。從版本 2.3.7 開始,執行緒可以將其通道轉移到另一個執行緒。有關更多資訊,請參閱 多執行緒環境下的嚴格訊息順序

CachingConnectionFactory

提供的第三種實現是 CachingConnectionFactory,它預設建立一個可供應用程式共享的單個連線代理。由於 AMQP 訊息傳遞的“工作單元”實際上是一個“通道”(在某些方面,這類似於 JMS 中連線和會話之間的關係),因此連線共享成為可能。連線例項提供了一個 createChannel 方法。CachingConnectionFactory 實現支援快取這些通道,並根據它們是否具有事務性來維護獨立的通道快取。建立 CachingConnectionFactory 例項時,您可以透過建構函式提供“hostname”。您還應該提供“username”和“password”屬性。要配置通道快取的大小(預設為 25),您可以呼叫 setChannelCacheSize() 方法。

從版本 1.3 開始,您可以配置 CachingConnectionFactory 同時快取連線和通道。在這種情況下,每次呼叫 createConnection() 都會建立一個新連線(或從快取中檢索一個空閒連線)。關閉連線會將其返回到快取(如果尚未達到快取大小)。在此類連線上建立的通道也會被快取。在某些環境中,例如從 HA 叢集消費時,結合負載均衡器連線到不同的叢集成員,以及其他情況下,使用單獨的連線可能很有用。要快取連線,請將 cacheMode 設定為 CacheMode.CONNECTION

這不會限制連線的數量。相反,它指定允許快取多少個空閒的開啟連線。

從版本 1.5.5 開始,提供了一個名為 connectionLimit 的新屬性。設定此屬性後,它會限制允許的總連線數。設定後,如果達到限制,將使用 channelCheckoutTimeLimit 等待連線變為空閒。如果超出時間,將丟擲 AmqpTimeoutException

當快取模式為 CONNECTION 時,不支援佇列等的自動宣告(請參閱 自動宣告交換、佇列和繫結)。

此外,在撰寫本文時,amqp-client 庫預設會為每個連線建立一個固定執行緒池(預設大小:Runtime.getRuntime().availableProcessors() * 2 個執行緒)。使用大量連線時,應考慮在 CachingConnectionFactory 上設定自定義 executor。這樣,所有連線都可以使用同一個執行器,並且其執行緒可以共享。執行器的執行緒池應該無界或根據預期用途進行適當設定(通常,每個連線至少一個執行緒)。如果在每個連線上建立多個通道,池大小會影響併發性,因此可變(或簡單的快取)執行緒池執行器將是最合適的。

重要的是要理解,快取大小(預設情況下)不是限制,而只是可以快取的通道數量。例如,快取大小為 10,實際上可以使用任意數量的通道。如果使用了超過 10 個通道並且它們都返回到快取,則 10 個進入快取。其餘的將被物理關閉。

從版本 1.6 開始,預設通道快取大小從 1 增加到 25。在高流量、多執行緒環境中,小快取意味著通道建立和關閉的速度很快。增加預設快取大小可以避免這種開銷。您應該透過 RabbitMQ 管理 UI 監控正在使用的通道,如果您看到大量通道被建立和關閉,請考慮進一步增加快取大小。快取僅在按需增長(以適應應用程式的併發要求),因此此更改不會影響現有的低流量應用程式。

從版本 1.4.2 開始,CachingConnectionFactory 有一個名為 channelCheckoutTimeout 的屬性。當此屬性大於零時,channelCacheSize 成為可以在連線上建立的通道數量的限制。如果達到限制,呼叫執行緒將阻塞,直到通道可用或達到此超時時間,此時將丟擲 AmqpTimeoutException

框架內使用的通道(例如,RabbitTemplate)可靠地返回到快取。如果您在框架外部建立通道(例如,透過直接訪問連線並呼叫 createChannel()),則必須可靠地(也許在 finally 塊中)關閉它們,以避免通道耗盡。

以下示例展示瞭如何建立新連線

CachingConnectionFactory connectionFactory = new CachingConnectionFactory("somehost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");

Connection connection = connectionFactory.createConnection();

使用 XML 時,配置可能如下例所示

<bean id="connectionFactory"
      class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
    <constructor-arg value="somehost"/>
    <property name="username" value="guest"/>
    <property name="password" value="guest"/>
</bean>
框架的單元測試程式碼中還有一個 SingleConnectionFactory 實現。它比 CachingConnectionFactory 更簡單,因為它不快取通道,但由於其效能和彈性不足,不適合在簡單測試之外的實際應用中使用。如果您因某些原因需要實現自己的 ConnectionFactoryAbstractConnectionFactory 基類可能是一個不錯的起點。

可以使用 rabbit 名稱空間快速方便地建立 ConnectionFactory,如下所示

<rabbit:connection-factory id="connectionFactory"/>

在大多數情況下,這種方法是首選,因為框架可以為您選擇最佳預設值。建立的例項是 CachingConnectionFactory。請記住,通道的預設快取大小是 25。如果您希望快取更多通道,請透過設定 'channelCacheSize' 屬性設定一個更大的值。在 XML 中,它將如下所示

<bean id="connectionFactory"
      class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
    <constructor-arg value="somehost"/>
    <property name="username" value="guest"/>
    <property name="password" value="guest"/>
    <property name="channelCacheSize" value="50"/>
</bean>

此外,使用名稱空間,您可以新增 'channel-cache-size' 屬性,如下所示

<rabbit:connection-factory
    id="connectionFactory" channel-cache-size="50"/>

預設快取模式是 CHANNEL,但您可以將其配置為快取連線。在以下示例中,我們使用 connection-cache-size

<rabbit:connection-factory
    id="connectionFactory" cache-mode="CONNECTION" connection-cache-size="25"/>

您可以使用名稱空間提供 host 和 port 屬性,如下所示

<rabbit:connection-factory
    id="connectionFactory" host="somehost" port="5672"/>

或者,如果在叢集環境中執行,可以使用 addresses 屬性,如下所示

<rabbit:connection-factory
    id="connectionFactory" addresses="host1:5672,host2:5672" address-shuffle-mode="RANDOM"/>

有關 address-shuffle-mode 的資訊,請參閱 連線到叢集

以下示例使用一個自定義執行緒工廠,將執行緒名稱加上 rabbitmq- 字首

<rabbit:connection-factory id="multiHost" virtual-host="/bar" addresses="host1:1234,host2,host3:4567"
    thread-factory="tf"
    channel-cache-size="10" username="user" password="password" />

<bean id="tf" class="org.springframework.scheduling.concurrent.CustomizableThreadFactory">
    <constructor-arg value="rabbitmq-" />
</bean>

AddressResolver

從版本 2.1.15 開始,您現在可以使用 AddressResolver 來解析連線地址。這將覆蓋 addresseshost/port 屬性的任何設定。

連線命名

從版本 1.7 開始,提供了 ConnectionNameStrategy,用於注入到 AbstractionConnectionFactory 中。生成的名稱用於目標 RabbitMQ 連線的應用程式特定標識。如果 RabbitMQ 伺服器支援,連線名稱將顯示在管理 UI 中。此值不必是唯一的,並且不能用作連線識別符號(例如,在 HTTP API 請求中)。此值應該是人類可讀的,並且是 ClientProperties 下的 connection_name 鍵的一部分。您可以使用簡單的 Lambda,如下所示

connectionFactory.setConnectionNameStrategy(connectionFactory -> "MY_CONNECTION");

ConnectionFactory 引數可用於透過某些邏輯區分目標連線名稱。預設情況下,使用 AbstractConnectionFactorybeanName、表示物件的十六進位制字串以及內部計數器來生成 connection_name<rabbit:connection-factory> 名稱空間元件也提供了 connection-name-strategy 屬性。

SimplePropertyValueConnectionNameStrategy 的實現將連線名稱設定為應用程式屬性。您可以將其宣告為 @Bean 並將其注入到連線工廠中,如下例所示

@Bean
public SimplePropertyValueConnectionNameStrategy cns() {
    return new SimplePropertyValueConnectionNameStrategy("spring.application.name");
}

@Bean
public ConnectionFactory rabbitConnectionFactory(ConnectionNameStrategy cns) {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
    ...
    connectionFactory.setConnectionNameStrategy(cns);
    return connectionFactory;
}

該屬性必須存在於應用程式上下文的 Environment 中。

使用 Spring Boot 及其自動配置的連線工廠時,您只需宣告 ConnectionNameStrategy@Bean。Boot 會自動檢測該 Bean 並將其注入到工廠中。

阻塞的連線和資源約束

連線可能因 Broker 的 記憶體警報 而被阻塞。從版本 2.0 開始,org.springframework.amqp.rabbit.connection.Connection 可以提供 com.rabbitmq.client.BlockedListener 例項,以接收連線阻塞和取消阻塞事件的通知。此外,AbstractConnectionFactory 透過其內部的 BlockedListener 實現,分別發出 ConnectionBlockedEventConnectionUnblockedEvent。這些事件使您能夠提供應用程式邏輯,以適當響應 Broker 上的問題並(例如)採取一些糾正措施。

當應用程式配置了單個 CachingConnectionFactory 時(Spring Boot 自動配置預設如此),當連線被 Broker 阻塞時,應用程式將停止工作。而當連線被 Broker 阻塞時,它的任何客戶端都會停止工作。如果我們在同一個應用程式中有生產者和消費者,當生產者阻塞連線(因為 Broker 上沒有資源了)並且消費者無法釋放它們(因為連線被阻塞)時,我們可能會遇到死鎖。為了緩解這個問題,我們建議使用另一個具有相同選項的獨立 CachingConnectionFactory 例項——一個用於生產者,一個用於消費者。對於在消費者執行緒上執行的事務性生產者來說,使用單獨的 CachingConnectionFactory 是不可能的,因為它們應該重用與消費者事務關聯的 Channel

從版本 2.0.2 開始,RabbitTemplate 有一個配置選項,可以在不使用事務的情況下自動使用第二個連線工廠。有關更多資訊,請參閱 使用單獨的連線。釋出者連線的 ConnectionNameStrategy 與主策略相同,並在呼叫方法的結果後附加 .publisher

從版本 1.7.7 開始,提供了 AmqpResourceNotAvailableException,當 SimpleConnection.createChannel() 無法建立 Channel 時會丟擲此異常(例如,因為達到 channelMax 限制且快取中沒有可用通道)。您可以在 RetryPolicy 中使用此異常,在退避後恢復操作。

配置底層客戶端連線工廠

CachingConnectionFactory 使用 Rabbit 客戶端的 ConnectionFactory 例項。透過在 CachingConnectionFactory 上設定等效屬性時,許多配置屬性(例如 hostportuserNamepasswordrequestedHeartBeatconnectionTimeout)會透傳下去。要設定其他屬性(例如 clientProperties),您可以定義一個 Rabbit 工廠例項,並透過使用 CachingConnectionFactory 的適當建構函式提供對其的引用。使用名稱空間(如前所述)時,需要在 connection-factory 屬性中提供對已配置工廠的引用。為了方便起見,提供了一個工廠 Bean,以幫助在 Spring 應用程式上下文中配置連線工廠,如 下一節 中所述。

<rabbit:connection-factory
      id="connectionFactory" connection-factory="rabbitConnectionFactory"/>
4.0.x 客戶端預設啟用自動恢復。儘管與此功能相容,Spring AMQP 有其自己的恢復機制,並且客戶端恢復功能通常不需要。我們建議停用 amqp-client 自動恢復,以避免在 Broker 可用但連線尚未恢復時出現 AutoRecoverConnectionNotCurrentlyOpenException 例項。您可能會注意到此異常,例如,當在 RabbitTemplate 中配置了 RetryTemplate 時,即使故障轉移到叢集中的另一個 Broker 時也是如此。由於自動恢復連線會按計時器恢復,使用 Spring AMQP 的恢復機制可能會更快地恢復連線。從版本 1.7.1 開始,除非您明確建立自己的 RabbitMQ 連線工廠並將其提供給 CachingConnectionFactory,否則 Spring AMQP 將停用 amqp-client 自動恢復。由 RabbitConnectionFactoryBean 建立的 RabbitMQ ConnectionFactory 例項也預設停用此選項。

RabbitConnectionFactoryBean 和配置 SSL

從版本 1.4 開始,提供了一個方便的 RabbitConnectionFactoryBean,透過依賴注入方便地配置底層客戶端連線工廠的 SSL 屬性。其他 setter 方法委託給底層工廠。以前,您必須以程式設計方式配置 SSL 選項。以下示例展示瞭如何配置 RabbitConnectionFactoryBean

Java
@Bean
RabbitConnectionFactoryBean rabbitConnectionFactory() {
    RabbitConnectionFactoryBean factoryBean = new RabbitConnectionFactoryBean();
    factoryBean.setUseSSL(true);
    factoryBean.setSslPropertiesLocation(new ClassPathResource("secrets/rabbitSSL.properties"));
    return factoryBean;
}

@Bean
CachingConnectionFactory connectionFactory(ConnectionFactory rabbitConnectionFactory) {
    CachingConnectionFactory ccf = new CachingConnectionFactory(rabbitConnectionFactory);
    ccf.setHost("...");
    // ...
    return ccf;
}
Boot application.properties
spring.rabbitmq.ssl.enabled:true
spring.rabbitmq.ssl.keyStore=...
spring.rabbitmq.ssl.keyStoreType=jks
spring.rabbitmq.ssl.keyStorePassword=...
spring.rabbitmq.ssl.trustStore=...
spring.rabbitmq.ssl.trustStoreType=jks
spring.rabbitmq.ssl.trustStorePassword=...
spring.rabbitmq.host=...
...
XML
<rabbit:connection-factory id="rabbitConnectionFactory"
    connection-factory="clientConnectionFactory"
    host="${host}"
    port="${port}"
    virtual-host="${vhost}"
    username="${username}" password="${password}" />

<bean id="clientConnectionFactory"
        class="org.springframework.amqp.rabbit.connection.RabbitConnectionFactoryBean">
    <property name="useSSL" value="true" />
    <property name="sslPropertiesLocation" value="classpath:secrets/rabbitSSL.properties"/>
</bean>

有關配置 SSL 的資訊,請參閱 RabbitMQ 文件。省略 keyStoretrustStore 配置以透過 SSL 連線而無需證書驗證。下一個示例展示瞭如何提供金鑰庫和信任庫配置。

sslPropertiesLocation 屬性是一個指向包含以下鍵的屬性檔案的 Spring Resource

keyStore=file:/secret/keycert.p12
trustStore=file:/secret/trustStore
keyStore.passPhrase=secret
trustStore.passPhrase=secret

keyStoretruststore 是指向儲存檔案的 Spring Resource。通常,此屬性檔案由作業系統保護,應用程式具有讀取許可權。

從 Spring AMQP 1.5 版本開始,您可以直接在工廠 Bean 上設定這些屬性。如果同時提供了離散屬性和 sslPropertiesLocation,則後者的屬性會覆蓋離散值。

從版本 2.0 開始,伺服器證書預設經過驗證,因為它更安全。如果您出於某種原因希望跳過此驗證,請將工廠 Bean 的 skipServerCertificateValidation 屬性設定為 true。從版本 2.1 開始,RabbitConnectionFactoryBean 現在預設呼叫 enableHostnameVerification()。要恢復到以前的行為,請將 enableHostnameVerification 屬性設定為 false
從版本 2.2.5 開始,工廠 Bean 預設將始終使用 TLS v1.2;之前,在某些情況下使用 v1.1,在其他情況下使用 v1.2(取決於其他屬性)。如果您出於某種原因需要使用 v1.1,請設定 sslAlgorithm 屬性:setSslAlgorithm("TLSv1.1")

連線到叢集

要連線到叢集,請在 CachingConnectionFactory 上配置 addresses 屬性

@Bean
public CachingConnectionFactory ccf() {
    CachingConnectionFactory ccf = new CachingConnectionFactory();
    ccf.setAddresses("host1:5672,host2:5672,host3:5672");
    return ccf;
}

從版本 3.0 開始,底層連線工廠將在建立新連線時嘗試連線到隨機選擇的地址。要恢復到從第一個到最後一個的先前行為,請將 addressShuffleMode 屬性設定為 AddressShuffleMode.NONE

從版本 2.3 開始,添加了 INORDER 混洗模式,這意味著連線建立後第一個地址會移到末尾。如果您希望在所有節點上消費所有分片,並且使用 CacheMode.CONNECTION 和適當的併發性,則可以考慮將此模式與 RabbitMQ Sharding Plugin 一起使用。

@Bean
public CachingConnectionFactory ccf() {
    CachingConnectionFactory ccf = new CachingConnectionFactory();
    ccf.setAddresses("host1:5672,host2:5672,host3:5672");
    ccf.setAddressShuffleMode(AddressShuffleMode.INORDER);
    return ccf;
}

路由連線工廠

從版本 1.3 開始,引入了 AbstractRoutingConnectionFactory。此工廠提供了一種機制,可以配置多個 ConnectionFactories 的對映,並在執行時透過某個 lookupKey 確定目標 ConnectionFactory。通常,實現會檢查執行緒繫結的上下文。為了方便起見,Spring AMQP 提供了 SimpleRoutingConnectionFactory,它從 SimpleResourceHolder 獲取當前執行緒繫結的 lookupKey。以下示例展示瞭如何在 XML 和 Java 中配置 SimpleRoutingConnectionFactory

<bean id="connectionFactory"
      class="org.springframework.amqp.rabbit.connection.SimpleRoutingConnectionFactory">
    <property name="targetConnectionFactories">
        <map>
            <entry key="#{connectionFactory1.virtualHost}" ref="connectionFactory1"/>
            <entry key="#{connectionFactory2.virtualHost}" ref="connectionFactory2"/>
        </map>
    </property>
</bean>

<rabbit:template id="template" connection-factory="connectionFactory" />
public class MyService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void service(String vHost, String payload) {
        SimpleResourceHolder.bind(rabbitTemplate.getConnectionFactory(), vHost);
        rabbitTemplate.convertAndSend(payload);
        SimpleResourceHolder.unbind(rabbitTemplate.getConnectionFactory());
    }

}

使用後解綁資源非常重要。有關更多資訊,請參閱 AbstractRoutingConnectionFactoryJavaDoc

從版本 1.4 開始,RabbitTemplate 支援 SpEL 表示式屬性 sendConnectionFactorySelectorExpressionreceiveConnectionFactorySelectorExpression,它們在每次 AMQP 協議互動操作(sendsendAndReceivereceivereceiveAndReply)時進行評估,解析為提供的 AbstractRoutingConnectionFactorylookupKey 值。您可以在表示式中使用 Bean 引用,例如 @vHostResolver.getVHost(#root)。對於 send 操作,要傳送的訊息是根評估物件。對於 receive 操作,queueName 是根評估物件。

路由演算法如下:如果選擇器表示式為 null 或評估結果為 null,或者提供的 ConnectionFactory 不是 AbstractRoutingConnectionFactory 的例項,則一切如常,依賴於提供的 ConnectionFactory 實現。如果評估結果不為 null,但對於該 lookupKey 沒有目標 ConnectionFactory 並且 AbstractRoutingConnectionFactory 配置為 lenientFallback = true,也會發生同樣的情況。對於 AbstractRoutingConnectionFactory,它確實會根據 determineCurrentLookupKey() 回退到其路由實現。但是,如果 lenientFallback = false,則會丟擲 IllegalStateException

名稱空間支援還在 <rabbit:template> 元件上提供了 send-connection-factory-selector-expressionreceive-connection-factory-selector-expression 屬性。

此外,從版本 1.4 開始,您可以在監聽器容器中配置路由連線工廠。在這種情況下,佇列名稱列表將用作查詢鍵。例如,如果您將容器配置為監聽 setQueueNames("thing1", "thing2"),則查詢鍵將是 [thing1,thing](注意鍵中沒有空格)。

從版本 1.6.9 開始,您可以使用監聽器容器上的 setLookupKeyQualifier 方法向查詢鍵新增限定符。這樣做可以實現,例如,監聽名稱相同但位於不同虛擬主機(您將為每個虛擬主機設定一個連線工廠)中的佇列。

例如,使用查詢鍵限定符 thing1 和監聽佇列 thing2 的容器,您可以註冊目標連線工廠的查詢鍵可以是 thing1[thing2]

目標連線工廠(如果提供預設連線工廠,也包括預設連線工廠)對於釋出者確認和返回必須具有相同的設定。請參閱 釋出者確認和返回

從版本 2.4.4 開始,可以停用此驗證。如果您遇到確認和返回之間的值需要不相等的情況,可以使用 AbstractRoutingConnectionFactory#setConsistentConfirmsReturns 關閉驗證。請注意,新增到 AbstractRoutingConnectionFactory 的第一個連線工廠將決定 confirmsreturns 的通用值。

如果您遇到某些訊息需要檢查確認/返回而其他訊息不需要的情況,這可能很有用。例如

@Bean
public RabbitTemplate rabbitTemplate() {
    final com.rabbitmq.client.ConnectionFactory cf = new com.rabbitmq.client.ConnectionFactory();
    cf.setHost("localhost");
    cf.setPort(5672);

    CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(cf);
    cachingConnectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);

    PooledChannelConnectionFactory pooledChannelConnectionFactory = new PooledChannelConnectionFactory(cf);

    final Map<Object, ConnectionFactory> connectionFactoryMap = new HashMap<>(2);
    connectionFactoryMap.put("true", cachingConnectionFactory);
    connectionFactoryMap.put("false", pooledChannelConnectionFactory);

    final AbstractRoutingConnectionFactory routingConnectionFactory = new SimpleRoutingConnectionFactory();
    routingConnectionFactory.setConsistentConfirmsReturns(false);
    routingConnectionFactory.setDefaultTargetConnectionFactory(pooledChannelConnectionFactory);
    routingConnectionFactory.setTargetConnectionFactories(connectionFactoryMap);

    final RabbitTemplate rabbitTemplate = new RabbitTemplate(routingConnectionFactory);

    final Expression sendExpression = new SpelExpressionParser().parseExpression(
            "messageProperties.headers['x-use-publisher-confirms'] ?: false");
    rabbitTemplate.setSendConnectionFactorySelectorExpression(sendExpression);
}

這樣,帶有頭 x-use-publisher-confirms: true 的訊息將透過快取連線傳送,您可以確保訊息的投遞。有關確保訊息投遞的更多資訊,請參閱 釋出者確認和返回

佇列親和性與 LocalizedQueueConnectionFactory

在叢集中使用 HA 佇列時,為了獲得最佳效能,您可能希望連線到主佇列所在的物理 Broker。CachingConnectionFactory 可以配置多個 Broker 地址。這是為了實現故障轉移,客戶端會根據配置的 AddressShuffleMode 順序嘗試連線。LocalizedQueueConnectionFactory 使用管理外掛提供的 REST API 來確定哪個節點是該佇列的主節點。然後,它會建立一個連線到該節點的 CachingConnectionFactory(或從快取中檢索一個)。如果連線失敗,會確定新的主節點,並且消費者會連線到它。LocalizedQueueConnectionFactory 配置了一個預設連線工廠,以防無法確定佇列的物理位置,在這種情況下,它會正常連線到叢集。

LocalizedQueueConnectionFactory 是一個 RoutingConnectionFactorySimpleMessageListenerContainer 使用佇列名稱作為查詢鍵,如上文 路由連線工廠 中所述。

因此(由於使用佇列名稱進行查詢),只有當容器配置為監聽單個佇列時,才能使用 LocalizedQueueConnectionFactory
必須在每個節點上啟用 RabbitMQ 管理外掛。
此連線工廠旨在用於長連線,例如 SimpleMessageListenerContainer 使用的連線。由於在建立連線之前呼叫 REST API 的開銷,它不適用於短連線使用,例如與 RabbitTemplate 一起使用。此外,對於釋出操作,佇列是未知的,並且訊息無論如何都會發布到所有叢集成員,因此查詢節點的邏輯價值不大。

以下示例配置展示瞭如何配置這些工廠

@Autowired
private ConfigurationProperties props;

@Bean
public CachingConnectionFactory defaultConnectionFactory() {
    CachingConnectionFactory cf = new CachingConnectionFactory();
    cf.setAddresses(this.props.getAddresses());
    cf.setUsername(this.props.getUsername());
    cf.setPassword(this.props.getPassword());
    cf.setVirtualHost(this.props.getVirtualHost());
    return cf;
}

@Bean
public LocalizedQueueConnectionFactory queueAffinityCF(
        @Qualifier("defaultConnectionFactory") ConnectionFactory defaultCF) {
    return new LocalizedQueueConnectionFactory(defaultCF,
            StringUtils.commaDelimitedListToStringArray(this.props.getAddresses()),
            StringUtils.commaDelimitedListToStringArray(this.props.getAdminUris()),
            StringUtils.commaDelimitedListToStringArray(this.props.getNodes()),
            this.props.getVirtualHost(), this.props.getUsername(), this.props.getPassword(),
            false, null);
}

請注意,前三個引數是 addressesadminUrisnodes 的陣列。它們是位置引數,即當容器嘗試連線到佇列時,它使用管理 API 確定哪個節點是該佇列的主節點,並連線到與該節點在同一陣列位置的地址。

從版本 3.0 開始,不再使用 RabbitMQ 的 http-client 來訪問 Rest API。預設情況下,如果類路徑中存在 spring-webflux,則使用 Spring Webflux 的 WebClient;否則,使用 RestTemplate

WebFlux 新增到類路徑

Maven
<dependency>
  <groupId>org.springframework.amqp</groupId>
  <artifactId>spring-rabbit</artifactId>
</dependency>
Gradle
compile 'org.springframework.amqp:spring-rabbit'

您還可以透過實現 LocalizedQueueConnectionFactory.NodeLocator 並重寫其 createClientrestCall 方法,以及可選的 close 方法來使用其他 REST 技術。

lqcf.setNodeLocator(new NodeLocator<MyClient>() {

    @Override
    public MyClient createClient(String userName, String password) {
        ...
    }

    @Override
    public HashMap<String, Object> restCall(MyClient client, URI uri) {
        ...
    });

});

框架提供了 WebFluxNodeLocatorRestTemplateNodeLocator,預設設定如上所述。

Publisher Confirms 和 Returns

透過將 CachingConnectionFactory 屬性 publisherConfirmType 設定為 ConfirmType.CORRELATED 並將 publisherReturns 屬性設定為 'true',可以支援 confirmed (帶關聯) 和 returned 訊息。

設定這些選項後,工廠建立的 Channel 例項將包裝在 PublisherCallbackChannel 中,用於促進回撥。獲取此類 channel 後,客戶端可以向 Channel 註冊一個 PublisherCallbackChannel.ListenerPublisherCallbackChannel 實現包含將 confirm 或 return 路由到相應監聽器的邏輯。這些功能將在以下章節中進一步解釋。

另請參閱 關聯的 Publisher Confirms 和 Returns,以及 作用域操作 中的 simplePublisherConfirms

更多背景資訊,請參閱 RabbitMQ 團隊題為 Introducing Publisher Confirms 的部落格文章。

Connection 和 Channel Listeners

連線工廠支援註冊 ConnectionListenerChannelListener 實現。這允許您接收連線和 channel 相關事件的通知。(RabbitAdmin 在建立連線時使用 ConnectionListener 來執行宣告 - 更多資訊請參見 Exchange、Queue 和 Binding 的自動宣告)。以下列表顯示了 ConnectionListener 介面定義

@FunctionalInterface
public interface ConnectionListener {

    void onCreate(Connection connection);

    default void onClose(Connection connection) {
    }

    default void onShutDown(ShutdownSignalException signal) {
    }

}

從版本 2.0 開始,org.springframework.amqp.rabbit.connection.Connection 物件可以提供 com.rabbitmq.client.BlockedListener 例項,以便在連線被阻塞和解除阻塞時收到通知。以下示例顯示了 ChannelListener 介面定義

@FunctionalInterface
public interface ChannelListener {

    void onCreate(Channel channel, boolean transactional);

    default void onShutDown(ShutdownSignalException signal) {
    }

}

有關何時可能需要註冊 ChannelListener 的一個場景,請參見 釋出是非同步的 — 如何檢測成功和失敗

記錄 Channel 關閉事件

版本 1.5 引入了一種機制,允許使用者控制日誌記錄級別。

AbstractConnectionFactory 使用預設策略記錄 channel 關閉,如下所示

  • 正常的 channel 關閉 (200 OK) 不會記錄日誌。

  • 如果 channel 因被動佇列宣告失敗而關閉,則以 DEBUG 級別記錄日誌。

  • 如果 channel 因獨佔消費者條件導致 basic.consume 被拒絕而關閉,則以 DEBUG 級別記錄日誌 (自 3.1 版起,之前是 INFO)。

  • 所有其他情況都以 ERROR 級別記錄日誌。

要修改此行為,您可以在 CachingConnectionFactorycloseExceptionLogger 屬性中注入自定義的 ConditionalExceptionLogger

此外,AbstractConnectionFactory.DefaultChannelCloseLogger 現在是公共的,允許對其進行子類化。

另請參閱 消費者事件

執行時快取屬性

從版本 1.6 開始,CachingConnectionFactory 現在透過 getCacheProperties() 方法提供快取統計資訊。這些統計資訊可用於調整快取以最佳化生產環境。例如,高水位標記可用於確定是否應增加快取大小。如果它等於快取大小,您可能需要考慮進一步增加。下表描述了 CacheMode.CHANNEL 的屬性

表 1. CacheMode.CHANNEL 的快取屬性
屬性 含義
connectionName

ConnectionNameStrategy 生成的連線名稱。

channelCacheSize

當前配置的最大允許空閒 channel 數。

localPort

連線的本地埠(如果可用)。這可用於與 RabbitMQ 管理 UI 上的連線和 channel 相關聯。

idleChannelsTx

當前空閒(快取)的事務性 channel 數。

idleChannelsNotTx

當前空閒(快取)的非事務性 channel 數。

idleChannelsTxHighWater

曾經併發空閒(快取)的最大事務性 channel 數。

idleChannelsNotTxHighWater

曾經併發空閒(快取)的最大非事務性 channel 數。

下表描述了 CacheMode.CONNECTION 的屬性

表 2. CacheMode.CONNECTION 的快取屬性
屬性 含義
connectionName:<localPort>

ConnectionNameStrategy 生成的連線名稱。

openConnections

代表與 broker 連線的連線物件數。

channelCacheSize

當前配置的最大允許空閒 channel 數。

connectionCacheSize

當前配置的最大允許空閒連線數。

idleConnections

當前空閒的連線數。

idleConnectionsHighWater

曾經併發空閒的最大連線數。

idleChannelsTx:<localPort>

此連線當前空閒(快取)的事務性 channel 數。您可以使用屬性名稱中的 localPort 部分與 RabbitMQ 管理 UI 上的連線和 channel 相關聯。

idleChannelsNotTx:<localPort>

此連線當前空閒(快取)的非事務性 channel 數。屬性名稱中的 localPort 部分可用於與 RabbitMQ 管理 UI 上的連線和 channel 相關聯。

idleChannelsTxHighWater:<localPort>

曾經併發空閒(快取)的最大事務性 channel 數。屬性名稱中的 localPort 部分可用於與 RabbitMQ 管理 UI 上的連線和 channel 相關聯。

idleChannelsNotTxHighWater:<localPort>

曾經併發空閒(快取)的最大非事務性 channel 數。您可以使用屬性名稱中的 localPort 部分與 RabbitMQ 管理 UI 上的連線和 channel 相關聯。

還包括 cacheMode 屬性 (CHANNELCONNECTION)。

cacheStats
圖 1. JVisualVM 示例

RabbitMQ 自動連線/拓撲恢復

自 Spring AMQP 的第一個版本以來,該框架在發生 broker 故障時提供了自己的連線和 channel 恢復機制。此外,如 配置 Broker 中所述,RabbitAdmin 在重新建立連線時會重新宣告任何基礎設施 bean (佇列等)。因此,它不依賴於現在由 amqp-client 庫提供的自動恢復。預設情況下,amqp-client 啟用了自動恢復。這兩種恢復機制之間存在一些不相容性,因此預設情況下,Spring 將底層 RabbitMQ connectionFactory 上的 automaticRecoveryEnabled 屬性設定為 false。即使該屬性為 true,Spring 也透過立即關閉任何已恢復的連線來有效停用它。

預設情況下,連線失敗後僅會重新宣告定義為 bean 的元素(佇列、交換器、繫結)。有關如何更改此行為,請參閱 恢復自動刪除宣告