連線和資源管理

儘管我們在上一節中描述的 AMQP 模型是通用的,適用於所有實現,但當涉及到資源管理時,具體細節取決於 Broker 的實現。因此,在本節中,我們專注於“spring-rabbit”模組中存在的程式碼,因為目前 RabbitMQ 是唯一受支援的實現。

用於管理與 RabbitMQ Broker 連線的核心元件是 ConnectionFactory 介面。ConnectionFactory 實現的職責是提供 org.springframework.amqp.rabbit.connection.Connection 的例項,它是 com.rabbitmq.client.Connection 的包裝器。

選擇連線工廠

有三種連線工廠可供選擇

  • PooledChannelConnectionFactory

  • ThreadChannelConnectionFactory

  • CachingConnectionFactory

前兩個是在版本 2.3 中新增的。

對於大多數用例,應使用 CachingConnectionFactory。如果您想在不需要使用 作用域操作 的情況下確保嚴格的訊息排序,則可以使用 ThreadChannelConnectionFactoryPooledChannelConnectionFactoryCachingConnectionFactory 類似,因為它使用單個連線和通道池。它的實現更簡單,但不支援關聯的釋出者確認。

所有三個工廠都支援簡單的釋出者確認。

從版本 2.3.2 開始,當配置 RabbitTemplate 以使用 單獨連線 時,您現在可以將釋出連線工廠配置為不同的型別。預設情況下,釋出工廠的型別與主工廠相同,並且在主工廠上設定的任何屬性也會傳播到釋出工廠。

從版本 3.1 開始,AbstractConnectionFactory 包含 connectionCreatingBackOff 屬性,該屬性支援連線模組中的退避策略。目前,createChannel() 的行為中支援處理達到 channelMax 限制時發生的異常,實現基於嘗試次數和間隔的退避策略。

PooledChannelConnectionFactory

該工廠管理一個連線和兩個基於 Apache Pool2 的通道池。一個池用於事務性通道,另一個用於非事務性通道。這些池是具有預設配置的 GenericObjectPool;提供了一個回撥來配置池;有關更多資訊,請參閱 Apache 文件。

要使用此工廠,classpath 中必須包含 Apache commons-pool2 jar。

@Bean
PooledChannelConnectionFactory pcf() throws Exception {
    ConnectionFactory rabbitConnectionFactory = new ConnectionFactory();
    rabbitConnectionFactory.setHost("localhost");
    PooledChannelConnectionFactory pcf = new PooledChannelConnectionFactory(rabbitConnectionFactory);
    pcf.setPoolConfigurer((pool, tx) -> {
        if (tx) {
            // configure the transactional pool
        }
        else {
            // configure the non-transactional pool
        }
    });
    return pcf;
}

ThreadChannelConnectionFactory

該工廠管理一個連線和兩個 ThreadLocal,一個用於事務性通道,另一個用於非事務性通道。該工廠確保同一執行緒上的所有操作都使用相同的通道(只要它保持開啟狀態)。這有助於實現嚴格的訊息排序,而無需 作用域操作。為避免記憶體洩漏,如果您的應用程式使用許多短生命週期的執行緒,則必須呼叫工廠的 closeThreadChannel() 來釋放通道資源。從版本 2.3.7 開始,一個執行緒可以將其通道傳輸到另一個執行緒。有關更多資訊,請參見 多執行緒環境中的嚴格訊息排序

CachingConnectionFactory

提供的第三個實現是 CachingConnectionFactory,它預設建立一個可以由應用程式共享的單個連線代理。連線共享是可能的,因為 AMQP 訊息傳遞的“工作單元”實際上是“通道”(在某些方面,這類似於 JMS 中連線和會話之間的關係)。連線例項提供 createChannel 方法。CachingConnectionFactory 實現支援這些通道的快取,並且它根據通道是否是事務性來維護單獨的通道快取。建立 CachingConnectionFactory 例項時,可以透過建構函式提供“主機名”。您還應該提供“使用者名稱”和“密碼”屬性。要配置通道快取的大小(預設為 25),可以呼叫 setChannelCacheSize() 方法。

從版本 1.3 開始,您可以配置 CachingConnectionFactory 以快取連線以及僅快取通道。在這種情況下,每次呼叫 createConnection() 都會建立一個新連線(或從快取中檢索一個空閒連線)。關閉連線會將其返回到快取中(如果未達到快取大小)。在此類連線上建立的通道也會被快取。在某些環境中,使用單獨的連線可能很有用,例如從 HA 叢集消費,結合負載均衡器連線到不同的叢集成員等等。要快取連線,請將 cacheMode 設定為 CacheMode.CONNECTION

這不限制連線的數量。相反,它指定允許有多少空閒的開放連線。

從版本 1.5.5 開始,提供了一個名為 connectionLimit 的新屬性。設定此屬性後,它會限制允許的總連線數。設定後,如果達到限制,則使用 channelCheckoutTimeLimit 等待連線變為空閒。如果超過時間,則丟擲 AmqpTimeoutException

當快取模式為 CONNECTION 時,不支援佇列等的自動宣告(請參閱 交換機、佇列和繫結的自動宣告)。

此外,在撰寫本文時,amqp-client 庫預設會為每個連線建立一個固定執行緒池(預設大小:Runtime.getRuntime().availableProcessors() * 2 個執行緒)。當使用大量連線時,您應該考慮在 CachingConnectionFactory 上設定自定義 executor。然後,所有連線都可以使用相同的執行器,並且可以共享其執行緒。執行器的執行緒池應該是無界的,或者根據預期用途適當設定(通常,每個連線至少一個執行緒)。如果每個連線上建立多個通道,則池大小會影響併發性,因此可變(或簡單的快取)執行緒池執行器是最合適的。

重要的是要理解快取大小(預設情況下)不是限制,而僅僅是可以快取的通道數量。例如,如果快取大小為 10,則實際上可以使用任意數量的通道。如果使用了超過 10 個通道,並且它們都返回到快取中,則有 10 個通道進入快取。其餘的通道會被物理關閉。

從版本 1.6 開始,預設通道快取大小已從 1 增加到 25。在高併發、多執行緒環境中,小快取意味著通道以高速率建立和關閉。增加預設快取大小可以避免這種開銷。您應該透過 RabbitMQ 管理 UI 監控正在使用的通道,如果看到許多通道正在建立和關閉,請考慮進一步增加快取大小。快取僅在按需時增長(以適應應用程式的併發要求),因此此更改不會影響現有的低吞吐量應用程式。

從版本 1.4.2 開始,CachingConnectionFactory 有一個名為 channelCheckoutTimeout 的屬性。當此屬性大於零時,channelCacheSize 成為可以在連線上建立的通道數量的限制。如果達到限制,呼叫執行緒會阻塞,直到通道可用或達到此超時,在這種情況下會丟擲 AmqpTimeoutException

框架內使用的通道(例如 RabbitTemplate)會可靠地返回到快取中。如果您在框架外部建立通道(例如,透過直接訪問連線並呼叫 createChannel()),則必須可靠地將其返回(透過關閉),可能在 finally 塊中,以避免通道耗盡。

以下示例顯示如何建立新 connection

CachingConnectionFactory connectionFactory = new CachingConnectionFactory("somehost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");

Connection connection = connectionFactory.createConnection();

使用 XML 時,配置可能如下例所示

<bean id="connectionFactory"
      class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
    <constructor-arg value="somehost"/>
    <property name="username" value="guest"/>
    <property name="password" value="guest"/>
</bean>
還有一個 SingleConnectionFactory 實現,僅在框架的單元測試程式碼中可用。它比 CachingConnectionFactory 更簡單,因為它不快取通道,但由於其效能和彈性不足,不適合在簡單測試之外的實際使用。如果出於某種原因需要實現自己的 ConnectionFactoryAbstractConnectionFactory 基類可以提供一個很好的起點。

可以使用 Rabbit 名稱空間快速方便地建立 ConnectionFactory,如下所示

<rabbit:connection-factory id="connectionFactory"/>

在大多數情況下,此方法更可取,因為框架可以為您選擇最佳預設值。建立的例項是 CachingConnectionFactory。請記住,通道的預設快取大小為 25。如果您希望快取更多通道,請透過設定“channelCacheSize”屬性來設定更大的值。在 XML 中,它看起來像這樣

<bean id="connectionFactory"
      class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
    <constructor-arg value="somehost"/>
    <property name="username" value="guest"/>
    <property name="password" value="guest"/>
    <property name="channelCacheSize" value="50"/>
</bean>

此外,使用名稱空間,您可以新增“channel-cache-size”屬性,如下所示

<rabbit:connection-factory
    id="connectionFactory" channel-cache-size="50"/>

預設快取模式是 CHANNEL,但您可以將其配置為快取連線。在以下示例中,我們使用 connection-cache-size

<rabbit:connection-factory
    id="connectionFactory" cache-mode="CONNECTION" connection-cache-size="25"/>

您可以使用名稱空間提供主機和埠屬性,如下所示

<rabbit:connection-factory
    id="connectionFactory" host="somehost" port="5672"/>

或者,如果在叢集環境中執行,您可以使用地址屬性,如下所示

<rabbit:connection-factory
    id="connectionFactory" addresses="host1:5672,host2:5672" address-shuffle-mode="RANDOM"/>

有關 address-shuffle-mode 的資訊,請參見 連線到叢集

以下示例使用自定義執行緒工廠,該工廠將執行緒名稱字首為 rabbitmq-

<rabbit:connection-factory id="multiHost" virtual-host="/bar" addresses="host1:1234,host2,host3:4567"
    thread-factory="tf"
    channel-cache-size="10" username="user" password="password" />

<bean id="tf" class="org.springframework.scheduling.concurrent.CustomizableThreadFactory">
    <constructor-arg value="rabbitmq-" />
</bean>

地址解析器

從版本 2.1.15 開始,您現在可以使用 AddressResolver 來解析連線地址。這將覆蓋 addresseshost/port 屬性的任何設定。

命名連線

從版本 1.7 開始,提供了 ConnectionNameStrategy 用於注入 AbstractionConnectionFactory。生成的名稱用於目標 RabbitMQ 連線的應用程式特定標識。如果 RabbitMQ 伺服器支援,連線名稱會顯示在管理 UI 中。此值不必是唯一的,不能用作連線識別符號——例如,在 HTTP API 請求中。此值應該是人類可讀的,並且是 ClientPropertiesconnection_name 鍵的一部分。您可以使用簡單的 Lambda,如下所示

connectionFactory.setConnectionNameStrategy(connectionFactory -> "MY_CONNECTION");

ConnectionFactory 引數可用於透過某些邏輯區分目標連線名稱。預設情況下,AbstractConnectionFactorybeanName、表示物件的十六進位制字串和內部計數器用於生成 connection_name<rabbit:connection-factory> 名稱空間元件也提供了 connection-name-strategy 屬性。

SimplePropertyValueConnectionNameStrategy 的實現將連線名稱設定為應用程式屬性。您可以將其宣告為 @Bean 並將其注入連線工廠,如下例所示

@Bean
public SimplePropertyValueConnectionNameStrategy cns() {
    return new SimplePropertyValueConnectionNameStrategy("spring.application.name");
}

@Bean
public ConnectionFactory rabbitConnectionFactory(ConnectionNameStrategy cns) {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
    ...
    connectionFactory.setConnectionNameStrategy(cns);
    return connectionFactory;
}

該屬性必須存在於應用程式上下文的 Environment 中。

使用 Spring Boot 及其自動配置的連線工廠時,您只需宣告 ConnectionNameStrategy @Bean。Spring Boot 會自動檢測 bean 並將其連線到工廠中。

阻塞的連線和資源限制

連線可能會因 Broker 的互動而阻塞,這對應於 記憶體警報。從版本 2.0 開始,org.springframework.amqp.rabbit.connection.Connection 可以提供 com.rabbitmq.client.BlockedListener 例項,以便在連線阻塞和解除阻塞事件時收到通知。此外,AbstractConnectionFactory 透過其內部 BlockedListener 實現分別發出 ConnectionBlockedEventConnectionUnblockedEvent。這些允許您提供應用程式邏輯,以適當響應 Broker 上的問題並(例如)採取一些糾正措施。

當應用程式配置為單個 CachingConnectionFactory 時(Spring Boot 自動配置預設如此),當連線被 Broker 阻塞時,應用程式將停止工作。當它被 Broker 阻塞時,其任何客戶端都會停止工作。如果我們在同一個應用程式中有生產者和消費者,當生產者阻塞連線(因為 Broker 上不再有資源)並且消費者無法釋放它們(因為連線被阻塞)時,我們可能會陷入死鎖。為了緩解這個問題,我們建議再建立一個單獨的 CachingConnectionFactory 例項,具有相同的選項——一個用於生產者,一個用於消費者。對於在消費者執行緒上執行的事務性生產者,單獨的 CachingConnectionFactory 是不可能的,因為它們應該重用與消費者事務關聯的 Channel

從版本 2.0.2 開始,RabbitTemplate 有一個配置選項,可以在不使用事務的情況下自動使用第二個連線工廠。有關更多資訊,請參見 使用單獨連線。釋出者連線的 ConnectionNameStrategy 與主策略相同,並在呼叫方法的結果後附加 .publisher

從版本 1.7.7 開始,提供了一個 AmqpResourceNotAvailableException,當 SimpleConnection.createChannel() 無法建立 Channel 時(例如,因為達到 channelMax 限制並且快取中沒有可用通道),將丟擲此異常。您可以在 RetryPolicy 中使用此異常,以在一些退避後恢復操作。

配置底層客戶端連線工廠

CachingConnectionFactory 使用 Rabbit 客戶端 ConnectionFactory 的例項。當在 CachingConnectionFactory 上設定等效屬性時,許多配置屬性會傳遞(例如 hostportuserNamepasswordrequestedHeartBeatconnectionTimeout)。要設定其他屬性(例如 clientProperties),您可以定義 Rabbit 工廠的例項,並透過使用 CachingConnectionFactory 的相應建構函式提供對其的引用。使用名稱空間時(如前所述),您需要提供對 connection-factory 屬性中配置的工廠的引用。為方便起見,提供了一個工廠 bean,以幫助在 Spring 應用程式上下文中配置連線工廠,如 下一節 所述。

<rabbit:connection-factory
      id="connectionFactory" connection-factory="rabbitConnectionFactory"/>
4.0.x 客戶端預設啟用自動恢復。雖然與此功能相容,但 Spring AMQP 有自己的恢復機制,通常不需要客戶端恢復功能。我們建議停用 amqp-client 自動恢復,以避免在 Broker 可用但連線尚未恢復時出現 AutoRecoverConnectionNotCurrentlyOpenException 例項。例如,當 RabbitTemplate 中配置了 RetryTemplate 時,即使在叢集中故障轉移到另一個 Broker,您也可能會注意到此異常。由於自動恢復連線在計時器上恢復,因此使用 Spring AMQP 的恢復機制可以更快地恢復連線。從版本 1.7.1 開始,Spring AMQP 停用 amqp-client 自動恢復,除非您明確建立自己的 RabbitMQ 連線工廠並將其提供給 CachingConnectionFactoryRabbitConnectionFactoryBean 建立的 RabbitMQ ConnectionFactory 例項也預設停用此選項。

RabbitConnectionFactoryBean 和配置 SSL

從版本 1.4 開始,提供了一個方便的 RabbitConnectionFactoryBean,可以透過依賴注入方便地配置底層客戶端連線工廠上的 SSL 屬性。其他設定器委託給底層工廠。以前,您必須以程式設計方式配置 SSL 選項。以下示例顯示如何配置 RabbitConnectionFactoryBean

  • Java

  • XML

@Bean
RabbitConnectionFactoryBean rabbitConnectionFactory() {
    RabbitConnectionFactoryBean factoryBean = new RabbitConnectionFactoryBean();
    factoryBean.setUseSSL(true);
    factoryBean.setSslPropertiesLocation(new ClassPathResource("secrets/rabbitSSL.properties"));
    return factoryBean;
}

@Bean
CachingConnectionFactory connectionFactory(ConnectionFactory rabbitConnectionFactory) {
    CachingConnectionFactory ccf = new CachingConnectionFactory(rabbitConnectionFactory);
    ccf.setHost("...");
    // ...
    return ccf;
}
<bean id="rabbitConnectionFactory"
        class="org.springframework.amqp.rabbit.connection.RabbitConnectionFactoryBean">
    <property name="useSSL" value="true" />
    <property name="sslPropertiesLocation" value="classpath:secrets/rabbitSSL.properties"/>
</bean>

<rabbit:connection-factory id="connectionFactory"
    connection-factory="rabbitConnectionFactory"
    host="${host}"
    port="${port}"
    virtual-host="${vhost}"
    username="${username}" password="${password}" />

Spring Boot 應用程式檔案(.yaml.properties

  • 屬性

  • YAML

spring.rabbitmq.host=...
spring.rabbitmq.ssl.keyStoreType=jks
spring.rabbitmq.ssl.trustStoreType=jks
spring.rabbitmq.ssl.keyStore=...
spring.rabbitmq.ssl.trustStore=...
spring.rabbitmq.ssl.trustStorePassword=...
spring.rabbitmq.ssl.keyStorePassword=...
spring.rabbitmq.ssl.enabled=true
spring:
  rabbitmq:
    host: ...
    ssl:
      keyStoreType: jks
      trustStoreType: jks
      keyStore: ...
      trustStore: ...
      trustStorePassword: ...
      keyStorePassword: ...
      enabled: true

有關配置 SSL 的資訊,請參見 RabbitMQ 文件。省略 keyStoretrustStore 配置可在不驗證證書的情況下透過 SSL 連線。下一個示例顯示如何提供金鑰和信任庫配置。

sslPropertiesLocation 屬性是一個 Spring Resource,指向一個包含以下鍵的屬性檔案

keyStore=file:/secret/keycert.p12
trustStore=file:/secret/trustStore
keyStore.passPhrase=secret
trustStore.passPhrase=secret

keyStoretruststore 是指向儲存的 Spring Resources。通常,此屬性檔案由作業系統保護,應用程式具有讀取訪問許可權。

從 Spring AMQP 1.5 版開始,您可以直接在工廠 bean 上設定這些屬性。如果同時提供了離散屬性和 sslPropertiesLocation,則後者中的屬性會覆蓋離散值。

從版本 2.0 開始,預設情況下會驗證伺服器證書,因為它更安全。如果出於某種原因要跳過此驗證,請將工廠 bean 的 skipServerCertificateValidation 屬性設定為 true。從版本 2.1 開始,RabbitConnectionFactoryBean 現在預設呼叫 enableHostnameVerification()。要恢復到以前的行為,請將 enableHostnameVerification 屬性設定為 false
從版本 2.2.5 開始,工廠 bean 預設將始終使用 TLS v1.2;以前,在某些情況下它使用 v1.1,在其他情況下使用 v1.2(取決於其他屬性)。如果出於某種原因需要使用 v1.1,請設定 sslAlgorithm 屬性:setSslAlgorithm("TLSv1.1")

連線到叢集

要連線到叢集,請在 CachingConnectionFactory 上配置 addresses 屬性

@Bean
public CachingConnectionFactory ccf() {
    CachingConnectionFactory ccf = new CachingConnectionFactory();
    ccf.setAddresses("host1:5672,host2:5672,host3:5672");
    return ccf;
}

從版本 3.0 開始,每當建立新連線時,底層連線工廠都會透過選擇隨機地址來嘗試連線到主機。要恢復到以前的從第一個到最後一個嘗試連線的行為,請將 addressShuffleMode 屬性設定為 AddressShuffleMode.NONE

從版本 2.3 開始,添加了 INORDER 洗牌模式,這意味著在建立連線後,第一個地址會移到末尾。如果您希望使用 RabbitMQ 分片外掛CacheMode.CONNECTION 和適當的併發性從所有節點的所有分片進行消費,則可能希望使用此模式。

@Bean
public CachingConnectionFactory ccf() {
    CachingConnectionFactory ccf = new CachingConnectionFactory();
    ccf.setAddresses("host1:5672,host2:5672,host3:5672");
    ccf.setAddressShuffleMode(AddressShuffleMode.INORDER);
    return ccf;
}

路由連線工廠

從版本 1.3 開始,引入了 AbstractRoutingConnectionFactory。該工廠提供了一種機制,用於配置多個 ConnectionFactories 的對映,並根據執行時的一些 lookupKey 確定目標 ConnectionFactory。通常,實現會檢查執行緒繫結的上下文。為了方便起見,Spring AMQP 提供了 SimpleRoutingConnectionFactory,它從 SimpleResourceHolder 獲取當前執行緒繫結的 lookupKey。以下示例展示瞭如何在 XML 和 Java 中配置 SimpleRoutingConnectionFactory

<bean id="connectionFactory"
      class="org.springframework.amqp.rabbit.connection.SimpleRoutingConnectionFactory">
    <property name="targetConnectionFactories">
        <map>
            <entry key="#{connectionFactory1.virtualHost}" ref="connectionFactory1"/>
            <entry key="#{connectionFactory2.virtualHost}" ref="connectionFactory2"/>
        </map>
    </property>
</bean>

<rabbit:template id="template" connection-factory="connectionFactory" />
public class MyService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void service(String vHost, String payload) {
        SimpleResourceHolder.bind(rabbitTemplate.getConnectionFactory(), vHost);
        rabbitTemplate.convertAndSend(payload);
        SimpleResourceHolder.unbind(rabbitTemplate.getConnectionFactory());
    }

}

使用後解除繫結資源很重要。有關更多資訊,請參閱 AbstractRoutingConnectionFactoryJavaDoc

從版本 1.4 開始,RabbitTemplate 支援 SpEL sendConnectionFactorySelectorExpressionreceiveConnectionFactorySelectorExpression 屬性,這些屬性在每個 AMQP 協議互動操作(sendsendAndReceivereceivereceiveAndReply)上進行評估,解析為提供的 AbstractRoutingConnectionFactorylookupKey 值。您可以在表示式中使用 bean 引用,例如 @vHostResolver.getVHost(#root)。對於 send 操作,要傳送的訊息是根評估物件。對於 receive 操作,queueName 是根評估物件。

路由演算法如下:如果選擇器表示式為 null 或被評估為 null,或者提供的 ConnectionFactory 不是 AbstractRoutingConnectionFactory 的例項,則一切照舊,依賴於提供的 ConnectionFactory 實現。如果評估結果不為 null,但該 lookupKey 沒有目標 ConnectionFactory,並且 AbstractRoutingConnectionFactory 配置為 lenientFallback = true,也會發生同樣的情況。在 AbstractRoutingConnectionFactory 的情況下,它會根據 determineCurrentLookupKey() 回退到其 routing 實現。但是,如果 lenientFallback = false,則會丟擲 IllegalStateException

名稱空間支援還為 <rabbit:template> 元件提供了 send-connection-factory-selector-expressionreceive-connection-factory-selector-expression 屬性。

此外,從版本 1.4 開始,您可以在監聽器容器中配置路由連線工廠。在這種情況下,佇列名稱列表用作查詢鍵。例如,如果使用 setQueueNames("thing1", "thing2") 配置容器,則查詢鍵為 [thing1,thing]"(請注意鍵中沒有空格)。

從版本 1.6.9 開始,您可以使用監聽器容器上的 setLookupKeyQualifier 為查詢鍵新增限定符。這樣做可以實現,例如,偵聽同名但在不同虛擬主機中的佇列(在這種情況下,您將為每個佇列擁有一個連線工廠)。

例如,如果查詢鍵限定符為 thing1,並且容器正在偵聽佇列 thing2,則您可以註冊目標連線工廠的查詢鍵可以是 thing1[thing2]

目標(以及提供的預設)連線工廠必須具有相同的釋出者確認和返回設定。請參閱 釋出者確認和返回

從版本 2.4.4 開始,可以停用此驗證。如果您在某些情況下需要確認和返回的值不相等,可以使用 AbstractRoutingConnectionFactory#setConsistentConfirmsReturns 關閉驗證。請注意,新增到 AbstractRoutingConnectionFactory 的第一個連線工廠將確定 confirmsreturns 的一般值。

如果您遇到某些訊息需要檢查確認/返回而其他訊息不需要的情況,這可能會很有用。例如

@Bean
public RabbitTemplate rabbitTemplate() {
    final com.rabbitmq.client.ConnectionFactory cf = new com.rabbitmq.client.ConnectionFactory();
    cf.setHost("localhost");
    cf.setPort(5672);

    CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(cf);
    cachingConnectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);

    PooledChannelConnectionFactory pooledChannelConnectionFactory = new PooledChannelConnectionFactory(cf);

    final Map<Object, ConnectionFactory> connectionFactoryMap = new HashMap<>(2);
    connectionFactoryMap.put("true", cachingConnectionFactory);
    connectionFactoryMap.put("false", pooledChannelConnectionFactory);

    final AbstractRoutingConnectionFactory routingConnectionFactory = new SimpleRoutingConnectionFactory();
    routingConnectionFactory.setConsistentConfirmsReturns(false);
    routingConnectionFactory.setDefaultTargetConnectionFactory(pooledChannelConnectionFactory);
    routingConnectionFactory.setTargetConnectionFactories(connectionFactoryMap);

    final RabbitTemplate rabbitTemplate = new RabbitTemplate(routingConnectionFactory);

    final Expression sendExpression = new SpelExpressionParser().parseExpression(
            "messageProperties.headers['x-use-publisher-confirms'] ?: false");
    rabbitTemplate.setSendConnectionFactorySelectorExpression(sendExpression);
}

這樣,帶有 x-use-publisher-confirms: true 頭部的訊息將透過快取連線傳送,您可以確保訊息傳遞。有關確保訊息傳遞的更多資訊,請參閱 釋出者確認和返回

佇列親和性和 LocalizedQueueConnectionFactory

在叢集中使用 HA 佇列時,為了獲得最佳效能,您可能希望連線到主佇列所在的物理 Broker。CachingConnectionFactory 可以配置多個 Broker 地址。這是為了故障轉移,客戶端根據配置的 AddressShuffleMode 順序嘗試連線。LocalizedQueueConnectionFactory 使用管理外掛提供的 REST API 來確定哪個節點是佇列的主節點。然後,它建立一個(或從快取中檢索)CachingConnectionFactory,該工廠僅連線到該節點。如果連線失敗,則確定新的主節點,並且消費者連線到該節點。LocalizedQueueConnectionFactory 配置了一個預設連線工廠,以防無法確定佇列的物理位置,在這種情況下,它會像正常一樣連線到叢集。

LocalizedQueueConnectionFactory 是一個 RoutingConnectionFactorySimpleMessageListenerContainer 使用佇列名稱作為查詢鍵,如上文 路由連線工廠 中所述。

因此(使用佇列名稱進行查詢),LocalizedQueueConnectionFactory 只能在容器配置為偵聽單個佇列時使用。
每個節點都必須啟用 RabbitMQ 管理外掛。
此連線工廠用於長期連線,例如 SimpleMessageListenerContainer 使用的連線。它不適用於短期連線,例如與 RabbitTemplate 一起使用,因為在建立連線之前呼叫 REST API 會產生開銷。此外,對於釋出操作,佇列是未知的,並且訊息無論如何都會發布到所有叢集成員,因此查詢節點的邏輯價值不大。

以下配置示例顯示瞭如何配置工廠

@Autowired
private ConfigurationProperties props;

@Bean
public CachingConnectionFactory defaultConnectionFactory() {
    CachingConnectionFactory cf = new CachingConnectionFactory();
    cf.setAddresses(this.props.getAddresses());
    cf.setUsername(this.props.getUsername());
    cf.setPassword(this.props.getPassword());
    cf.setVirtualHost(this.props.getVirtualHost());
    return cf;
}

@Bean
public LocalizedQueueConnectionFactory queueAffinityCF(
        @Qualifier("defaultConnectionFactory") ConnectionFactory defaultCF) {
    return new LocalizedQueueConnectionFactory(defaultCF,
            StringUtils.commaDelimitedListToStringArray(this.props.getAddresses()),
            StringUtils.commaDelimitedListToStringArray(this.props.getAdminUris()),
            StringUtils.commaDelimitedListToStringArray(this.props.getNodes()),
            this.props.getVirtualHost(), this.props.getUsername(), this.props.getPassword(),
            false, null);
}

請注意,前三個引數是 addressesadminUrisnodes 的陣列。這些是位置性的,即當容器嘗試連線到佇列時,它使用管理 API 來確定哪個節點是佇列的主節點,並連線到與該節點在同一陣列位置的地址。

從版本 3.0 開始,RabbitMQ http-client 不再用於訪問 Rest API。相反,預設情況下,如果 spring-webflux 在類路徑上,則使用 Spring Webflux 中的 WebClient;否則使用 RestTemplate

要將 WebFlux 新增到類路徑

Maven
<dependency>
  <groupId>org.springframework.amqp</groupId>
  <artifactId>spring-rabbit</artifactId>
</dependency>
Gradle
compile 'org.springframework.amqp:spring-rabbit'

您還可以透過實現 LocalizedQueueConnectionFactory#setNodeLocator 並重寫其 createClientrestCall 和(可選)close 方法來使用其他 REST 技術。

lqcf.setNodeLocator(new NodeLocator<MyClient>() {

    @Override
    public MyClient createClient(String userName, String password) {
        ...
    }

    @Override
    public Map<String, Object> restCall(MyClient client, String baseUri, String vhost, String queue) throws URISyntaxException {
        ...
    }

});

該框架提供了 WebFluxNodeLocatorRestTemplateNodeLocator,預設值如上所述。

釋出者確認和返回

透過將 CachingConnectionFactory 屬性 publisherConfirmType 設定為 ConfirmType.CORRELATED 並將 publisherReturns 屬性設定為“true”,支援已確認(帶關聯)和已返回的訊息。

當設定了這些選項後,工廠建立的 Channel 例項將包裝在 PublisherCallbackChannel 中,用於促進回撥。當獲得這樣的通道時,客戶端可以向 Channel 註冊一個 PublisherCallbackChannel.ListenerPublisherCallbackChannel 實現包含將確認或返回路由到相應監聽器的邏輯。這些功能將在以下部分中進一步解釋。

另請參閱 關聯的釋出者確認和返回作用域操作 中的 simplePublisherConfirms

有關更多背景資訊,請參閱 RabbitMQ 團隊的部落格文章 Introducing Publisher Confirms

連線和通道監聽器

連線工廠支援註冊 ConnectionListenerChannelListener 實現。這允許您接收連線和通道相關事件的通知。(RabbitAdmin 使用 ConnectionListener 在建立連線時執行宣告——有關更多資訊,請參見 交換機、佇列和繫結的自動宣告)。以下列表顯示了 ConnectionListener 介面定義

@FunctionalInterface
public interface ConnectionListener {

    void onCreate(Connection connection);

    default void onClose(Connection connection) {
    }

    default void onShutDown(ShutdownSignalException signal) {
    }

}

從版本 2.0 開始,org.springframework.amqp.rabbit.connection.Connection 物件可以提供 com.rabbitmq.client.BlockedListener 例項,以便在連線阻塞和解除阻塞事件時收到通知。以下示例顯示了 ChannelListener 介面定義

@FunctionalInterface
public interface ChannelListener {

    void onCreate(Channel channel, boolean transactional);

    default void onShutDown(ShutdownSignalException signal) {
    }

}

有關您可能希望註冊 ChannelListener 的一種場景,請參見 釋出是非同步的——如何檢測成功和失敗

日誌通道關閉事件

版本 1.5 引入了一種機制,允許使用者控制日誌級別。

AbstractConnectionFactory 使用預設策略記錄通道關閉,如下所示

  • 正常的通道關閉(200 OK)不會被記錄。

  • 如果通道因被動佇列宣告失敗而關閉,則以 DEBUG 級別記錄。

  • 如果通道因排他消費者條件導致 basic.consume 被拒絕而關閉,則以 DEBUG 級別記錄(從 3.1 開始,以前是 INFO)。

  • 所有其他都以 ERROR 級別記錄。

要修改此行為,您可以將自定義 ConditionalExceptionLogger 注入 CachingConnectionFactorycloseExceptionLogger 屬性中。

此外,AbstractConnectionFactory.DefaultChannelCloseLogger 現在是公共的,允許其子類化。

另請參閱 消費者事件

執行時快取屬性

從版本 1.6 開始,CachingConnectionFactory 現在透過 getCacheProperties() 方法提供快取統計資訊。這些統計資訊可用於調整快取在生產中的最佳化。例如,高水位線可用於確定是否應增加快取大小。如果它等於快取大小,您可能需要考慮進一步增加。下表描述了 CacheMode.CHANNEL 屬性

表 1. CacheMode.CHANNEL 的快取屬性
財產 含義
connectionName

ConnectionNameStrategy 生成的連線名稱。

channelCacheSize

當前配置的允許空閒的最大通道數。

localPort

連線的本地埠(如果可用)。這可用於與 RabbitMQ 管理 UI 上的連線和通道關聯。

idleChannelsTx

當前空閒(快取)的事務性通道數。

idleChannelsNotTx

當前空閒(快取)的非事務性通道數。

idleChannelsTxHighWater

同時空閒(快取)的事務性通道的最大數量。

idleChannelsNotTxHighWater

同時空閒(快取)的非事務性通道的最大數量。

下表描述了 CacheMode.CONNECTION 屬性

表 2. CacheMode.CONNECTION 的快取屬性
財產 含義
connectionName:<localPort>

ConnectionNameStrategy 生成的連線名稱。

openConnections

表示與 Broker 連線的連線物件數。

channelCacheSize

當前配置的允許空閒的最大通道數。

connectionCacheSize

當前配置的允許空閒的最大連線數。

idleConnections

當前空閒的連線數。

idleConnectionsHighWater

同時空閒的最大連線數。

idleChannelsTx:<localPort>

此連線當前空閒(快取)的事務性通道數。您可以使用屬性名稱的 localPort 部分來與 RabbitMQ 管理 UI 上的連線和通道相關聯。

idleChannelsNotTx:<localPort>

此連線當前空閒(快取)的非事務性通道數。屬性名稱的 localPort 部分可用於與 RabbitMQ 管理 UI 上的連線和通道相關聯。

idleChannelsTxHighWater:<localPort>

同時空閒(快取)的事務性通道的最大數量。屬性名稱的 localPort 部分可用於與 RabbitMQ 管理 UI 上的連線和通道相關聯。

idleChannelsNotTxHighWater:<localPort>

同時空閒(快取)的非事務性通道的最大數量。您可以使用屬性名稱的 localPort 部分來與 RabbitMQ 管理 UI 上的連線和通道相關聯。

還包括 cacheMode 屬性(CHANNELCONNECTION)。

cacheStats
圖 1. JVisualVM 示例

RabbitMQ 自動連線/拓撲恢復

自 Spring AMQP 的第一個版本以來,該框架在 Broker 發生故障時提供了自己的連線和通道恢復。此外,如 配置 Broker 中所述,當連線重新建立時,RabbitAdmin 會重新宣告任何基礎設施 bean(佇列等)。因此,它不依賴於 amqp-client 庫現在提供的 自動恢復amqp-client 預設啟用自動恢復。兩種恢復機制之間存在一些不相容性,因此,Spring 預設將底層 RabbitMQ connectionFactory 上的 automaticRecoveryEnabled 屬性設定為 false。即使該屬性為 true,Spring 也會透過立即關閉任何已恢復的連線來有效停用它。

預設情況下,只有被定義為 bean 的元素(佇列、交換器、繫結)會在連線失敗後重新宣告。有關如何更改該行為的資訊,請參閱 恢復自動刪除宣告
© . This site is unofficial and not affiliated with VMware.