連線和資源管理
儘管我們在上一節中描述的 AMQP 模型是通用的,適用於所有實現,但當涉及到資源管理時,具體細節取決於 Broker 的實現。因此,在本節中,我們專注於“spring-rabbit”模組中存在的程式碼,因為目前 RabbitMQ 是唯一受支援的實現。
用於管理與 RabbitMQ Broker 連線的核心元件是 ConnectionFactory 介面。ConnectionFactory 實現的職責是提供 org.springframework.amqp.rabbit.connection.Connection 的例項,它是 com.rabbitmq.client.Connection 的包裝器。
選擇連線工廠
有三種連線工廠可供選擇
-
PooledChannelConnectionFactory -
ThreadChannelConnectionFactory -
CachingConnectionFactory
前兩個是在版本 2.3 中新增的。
對於大多數用例,應使用 CachingConnectionFactory。如果您想在不需要使用 作用域操作 的情況下確保嚴格的訊息排序,則可以使用 ThreadChannelConnectionFactory。PooledChannelConnectionFactory 與 CachingConnectionFactory 類似,因為它使用單個連線和通道池。它的實現更簡單,但不支援關聯的釋出者確認。
所有三個工廠都支援簡單的釋出者確認。
從版本 2.3.2 開始,當配置 RabbitTemplate 以使用 單獨連線 時,您現在可以將釋出連線工廠配置為不同的型別。預設情況下,釋出工廠的型別與主工廠相同,並且在主工廠上設定的任何屬性也會傳播到釋出工廠。
從版本 3.1 開始,AbstractConnectionFactory 包含 connectionCreatingBackOff 屬性,該屬性支援連線模組中的退避策略。目前,createChannel() 的行為中支援處理達到 channelMax 限制時發生的異常,實現基於嘗試次數和間隔的退避策略。
PooledChannelConnectionFactory
該工廠管理一個連線和兩個基於 Apache Pool2 的通道池。一個池用於事務性通道,另一個用於非事務性通道。這些池是具有預設配置的 GenericObjectPool;提供了一個回撥來配置池;有關更多資訊,請參閱 Apache 文件。
要使用此工廠,classpath 中必須包含 Apache commons-pool2 jar。
@Bean
PooledChannelConnectionFactory pcf() throws Exception {
ConnectionFactory rabbitConnectionFactory = new ConnectionFactory();
rabbitConnectionFactory.setHost("localhost");
PooledChannelConnectionFactory pcf = new PooledChannelConnectionFactory(rabbitConnectionFactory);
pcf.setPoolConfigurer((pool, tx) -> {
if (tx) {
// configure the transactional pool
}
else {
// configure the non-transactional pool
}
});
return pcf;
}
ThreadChannelConnectionFactory
該工廠管理一個連線和兩個 ThreadLocal,一個用於事務性通道,另一個用於非事務性通道。該工廠確保同一執行緒上的所有操作都使用相同的通道(只要它保持開啟狀態)。這有助於實現嚴格的訊息排序,而無需 作用域操作。為避免記憶體洩漏,如果您的應用程式使用許多短生命週期的執行緒,則必須呼叫工廠的 closeThreadChannel() 來釋放通道資源。從版本 2.3.7 開始,一個執行緒可以將其通道傳輸到另一個執行緒。有關更多資訊,請參見 多執行緒環境中的嚴格訊息排序。
CachingConnectionFactory
提供的第三個實現是 CachingConnectionFactory,它預設建立一個可以由應用程式共享的單個連線代理。連線共享是可能的,因為 AMQP 訊息傳遞的“工作單元”實際上是“通道”(在某些方面,這類似於 JMS 中連線和會話之間的關係)。連線例項提供 createChannel 方法。CachingConnectionFactory 實現支援這些通道的快取,並且它根據通道是否是事務性來維護單獨的通道快取。建立 CachingConnectionFactory 例項時,可以透過建構函式提供“主機名”。您還應該提供“使用者名稱”和“密碼”屬性。要配置通道快取的大小(預設為 25),可以呼叫 setChannelCacheSize() 方法。
從版本 1.3 開始,您可以配置 CachingConnectionFactory 以快取連線以及僅快取通道。在這種情況下,每次呼叫 createConnection() 都會建立一個新連線(或從快取中檢索一個空閒連線)。關閉連線會將其返回到快取中(如果未達到快取大小)。在此類連線上建立的通道也會被快取。在某些環境中,使用單獨的連線可能很有用,例如從 HA 叢集消費,結合負載均衡器連線到不同的叢集成員等等。要快取連線,請將 cacheMode 設定為 CacheMode.CONNECTION。
| 這不限制連線的數量。相反,它指定允許有多少空閒的開放連線。 |
從版本 1.5.5 開始,提供了一個名為 connectionLimit 的新屬性。設定此屬性後,它會限制允許的總連線數。設定後,如果達到限制,則使用 channelCheckoutTimeLimit 等待連線變為空閒。如果超過時間,則丟擲 AmqpTimeoutException。
|
當快取模式為 此外,在撰寫本文時, |
重要的是要理解快取大小(預設情況下)不是限制,而僅僅是可以快取的通道數量。例如,如果快取大小為 10,則實際上可以使用任意數量的通道。如果使用了超過 10 個通道,並且它們都返回到快取中,則有 10 個通道進入快取。其餘的通道會被物理關閉。
從版本 1.6 開始,預設通道快取大小已從 1 增加到 25。在高併發、多執行緒環境中,小快取意味著通道以高速率建立和關閉。增加預設快取大小可以避免這種開銷。您應該透過 RabbitMQ 管理 UI 監控正在使用的通道,如果看到許多通道正在建立和關閉,請考慮進一步增加快取大小。快取僅在按需時增長(以適應應用程式的併發要求),因此此更改不會影響現有的低吞吐量應用程式。
從版本 1.4.2 開始,CachingConnectionFactory 有一個名為 channelCheckoutTimeout 的屬性。當此屬性大於零時,channelCacheSize 成為可以在連線上建立的通道數量的限制。如果達到限制,呼叫執行緒會阻塞,直到通道可用或達到此超時,在這種情況下會丟擲 AmqpTimeoutException。
框架內使用的通道(例如 RabbitTemplate)會可靠地返回到快取中。如果您在框架外部建立通道(例如,透過直接訪問連線並呼叫 createChannel()),則必須可靠地將其返回(透過關閉),可能在 finally 塊中,以避免通道耗盡。 |
以下示例顯示如何建立新 connection
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("somehost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
Connection connection = connectionFactory.createConnection();
使用 XML 時,配置可能如下例所示
<bean id="connectionFactory"
class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<constructor-arg value="somehost"/>
<property name="username" value="guest"/>
<property name="password" value="guest"/>
</bean>
還有一個 SingleConnectionFactory 實現,僅在框架的單元測試程式碼中可用。它比 CachingConnectionFactory 更簡單,因為它不快取通道,但由於其效能和彈性不足,不適合在簡單測試之外的實際使用。如果出於某種原因需要實現自己的 ConnectionFactory,AbstractConnectionFactory 基類可以提供一個很好的起點。 |
可以使用 Rabbit 名稱空間快速方便地建立 ConnectionFactory,如下所示
<rabbit:connection-factory id="connectionFactory"/>
在大多數情況下,此方法更可取,因為框架可以為您選擇最佳預設值。建立的例項是 CachingConnectionFactory。請記住,通道的預設快取大小為 25。如果您希望快取更多通道,請透過設定“channelCacheSize”屬性來設定更大的值。在 XML 中,它看起來像這樣
<bean id="connectionFactory"
class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<constructor-arg value="somehost"/>
<property name="username" value="guest"/>
<property name="password" value="guest"/>
<property name="channelCacheSize" value="50"/>
</bean>
此外,使用名稱空間,您可以新增“channel-cache-size”屬性,如下所示
<rabbit:connection-factory
id="connectionFactory" channel-cache-size="50"/>
預設快取模式是 CHANNEL,但您可以將其配置為快取連線。在以下示例中,我們使用 connection-cache-size
<rabbit:connection-factory
id="connectionFactory" cache-mode="CONNECTION" connection-cache-size="25"/>
您可以使用名稱空間提供主機和埠屬性,如下所示
<rabbit:connection-factory
id="connectionFactory" host="somehost" port="5672"/>
或者,如果在叢集環境中執行,您可以使用地址屬性,如下所示
<rabbit:connection-factory
id="connectionFactory" addresses="host1:5672,host2:5672" address-shuffle-mode="RANDOM"/>
有關 address-shuffle-mode 的資訊,請參見 連線到叢集。
以下示例使用自定義執行緒工廠,該工廠將執行緒名稱字首為 rabbitmq-
<rabbit:connection-factory id="multiHost" virtual-host="/bar" addresses="host1:1234,host2,host3:4567"
thread-factory="tf"
channel-cache-size="10" username="user" password="password" />
<bean id="tf" class="org.springframework.scheduling.concurrent.CustomizableThreadFactory">
<constructor-arg value="rabbitmq-" />
</bean>
命名連線
從版本 1.7 開始,提供了 ConnectionNameStrategy 用於注入 AbstractionConnectionFactory。生成的名稱用於目標 RabbitMQ 連線的應用程式特定標識。如果 RabbitMQ 伺服器支援,連線名稱會顯示在管理 UI 中。此值不必是唯一的,不能用作連線識別符號——例如,在 HTTP API 請求中。此值應該是人類可讀的,並且是 ClientProperties 中 connection_name 鍵的一部分。您可以使用簡單的 Lambda,如下所示
connectionFactory.setConnectionNameStrategy(connectionFactory -> "MY_CONNECTION");
ConnectionFactory 引數可用於透過某些邏輯區分目標連線名稱。預設情況下,AbstractConnectionFactory 的 beanName、表示物件的十六進位制字串和內部計數器用於生成 connection_name。<rabbit:connection-factory> 名稱空間元件也提供了 connection-name-strategy 屬性。
SimplePropertyValueConnectionNameStrategy 的實現將連線名稱設定為應用程式屬性。您可以將其宣告為 @Bean 並將其注入連線工廠,如下例所示
@Bean
public SimplePropertyValueConnectionNameStrategy cns() {
return new SimplePropertyValueConnectionNameStrategy("spring.application.name");
}
@Bean
public ConnectionFactory rabbitConnectionFactory(ConnectionNameStrategy cns) {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
...
connectionFactory.setConnectionNameStrategy(cns);
return connectionFactory;
}
該屬性必須存在於應用程式上下文的 Environment 中。
使用 Spring Boot 及其自動配置的連線工廠時,您只需宣告 ConnectionNameStrategy @Bean。Spring Boot 會自動檢測 bean 並將其連線到工廠中。 |
阻塞的連線和資源限制
連線可能會因 Broker 的互動而阻塞,這對應於 記憶體警報。從版本 2.0 開始,org.springframework.amqp.rabbit.connection.Connection 可以提供 com.rabbitmq.client.BlockedListener 例項,以便在連線阻塞和解除阻塞事件時收到通知。此外,AbstractConnectionFactory 透過其內部 BlockedListener 實現分別發出 ConnectionBlockedEvent 和 ConnectionUnblockedEvent。這些允許您提供應用程式邏輯,以適當響應 Broker 上的問題並(例如)採取一些糾正措施。
當應用程式配置為單個 CachingConnectionFactory 時(Spring Boot 自動配置預設如此),當連線被 Broker 阻塞時,應用程式將停止工作。當它被 Broker 阻塞時,其任何客戶端都會停止工作。如果我們在同一個應用程式中有生產者和消費者,當生產者阻塞連線(因為 Broker 上不再有資源)並且消費者無法釋放它們(因為連線被阻塞)時,我們可能會陷入死鎖。為了緩解這個問題,我們建議再建立一個單獨的 CachingConnectionFactory 例項,具有相同的選項——一個用於生產者,一個用於消費者。對於在消費者執行緒上執行的事務性生產者,單獨的 CachingConnectionFactory 是不可能的,因為它們應該重用與消費者事務關聯的 Channel。 |
從版本 2.0.2 開始,RabbitTemplate 有一個配置選項,可以在不使用事務的情況下自動使用第二個連線工廠。有關更多資訊,請參見 使用單獨連線。釋出者連線的 ConnectionNameStrategy 與主策略相同,並在呼叫方法的結果後附加 .publisher。
從版本 1.7.7 開始,提供了一個 AmqpResourceNotAvailableException,當 SimpleConnection.createChannel() 無法建立 Channel 時(例如,因為達到 channelMax 限制並且快取中沒有可用通道),將丟擲此異常。您可以在 RetryPolicy 中使用此異常,以在一些退避後恢復操作。
配置底層客戶端連線工廠
CachingConnectionFactory 使用 Rabbit 客戶端 ConnectionFactory 的例項。當在 CachingConnectionFactory 上設定等效屬性時,許多配置屬性會傳遞(例如 host、port、userName、password、requestedHeartBeat 和 connectionTimeout)。要設定其他屬性(例如 clientProperties),您可以定義 Rabbit 工廠的例項,並透過使用 CachingConnectionFactory 的相應建構函式提供對其的引用。使用名稱空間時(如前所述),您需要提供對 connection-factory 屬性中配置的工廠的引用。為方便起見,提供了一個工廠 bean,以幫助在 Spring 應用程式上下文中配置連線工廠,如 下一節 所述。
<rabbit:connection-factory
id="connectionFactory" connection-factory="rabbitConnectionFactory"/>
4.0.x 客戶端預設啟用自動恢復。雖然與此功能相容,但 Spring AMQP 有自己的恢復機制,通常不需要客戶端恢復功能。我們建議停用 amqp-client 自動恢復,以避免在 Broker 可用但連線尚未恢復時出現 AutoRecoverConnectionNotCurrentlyOpenException 例項。例如,當 RabbitTemplate 中配置了 RetryTemplate 時,即使在叢集中故障轉移到另一個 Broker,您也可能會注意到此異常。由於自動恢復連線在計時器上恢復,因此使用 Spring AMQP 的恢復機制可以更快地恢復連線。從版本 1.7.1 開始,Spring AMQP 停用 amqp-client 自動恢復,除非您明確建立自己的 RabbitMQ 連線工廠並將其提供給 CachingConnectionFactory。RabbitConnectionFactoryBean 建立的 RabbitMQ ConnectionFactory 例項也預設停用此選項。 |
RabbitConnectionFactoryBean 和配置 SSL
從版本 1.4 開始,提供了一個方便的 RabbitConnectionFactoryBean,可以透過依賴注入方便地配置底層客戶端連線工廠上的 SSL 屬性。其他設定器委託給底層工廠。以前,您必須以程式設計方式配置 SSL 選項。以下示例顯示如何配置 RabbitConnectionFactoryBean
-
Java
-
XML
@Bean
RabbitConnectionFactoryBean rabbitConnectionFactory() {
RabbitConnectionFactoryBean factoryBean = new RabbitConnectionFactoryBean();
factoryBean.setUseSSL(true);
factoryBean.setSslPropertiesLocation(new ClassPathResource("secrets/rabbitSSL.properties"));
return factoryBean;
}
@Bean
CachingConnectionFactory connectionFactory(ConnectionFactory rabbitConnectionFactory) {
CachingConnectionFactory ccf = new CachingConnectionFactory(rabbitConnectionFactory);
ccf.setHost("...");
// ...
return ccf;
}
<bean id="rabbitConnectionFactory"
class="org.springframework.amqp.rabbit.connection.RabbitConnectionFactoryBean">
<property name="useSSL" value="true" />
<property name="sslPropertiesLocation" value="classpath:secrets/rabbitSSL.properties"/>
</bean>
<rabbit:connection-factory id="connectionFactory"
connection-factory="rabbitConnectionFactory"
host="${host}"
port="${port}"
virtual-host="${vhost}"
username="${username}" password="${password}" />
Spring Boot 應用程式檔案(.yaml 或 .properties)
-
屬性
-
YAML
spring.rabbitmq.host=...
spring.rabbitmq.ssl.keyStoreType=jks
spring.rabbitmq.ssl.trustStoreType=jks
spring.rabbitmq.ssl.keyStore=...
spring.rabbitmq.ssl.trustStore=...
spring.rabbitmq.ssl.trustStorePassword=...
spring.rabbitmq.ssl.keyStorePassword=...
spring.rabbitmq.ssl.enabled=true
spring:
rabbitmq:
host: ...
ssl:
keyStoreType: jks
trustStoreType: jks
keyStore: ...
trustStore: ...
trustStorePassword: ...
keyStorePassword: ...
enabled: true
有關配置 SSL 的資訊,請參見 RabbitMQ 文件。省略 keyStore 和 trustStore 配置可在不驗證證書的情況下透過 SSL 連線。下一個示例顯示如何提供金鑰和信任庫配置。
sslPropertiesLocation 屬性是一個 Spring Resource,指向一個包含以下鍵的屬性檔案
keyStore=file:/secret/keycert.p12
trustStore=file:/secret/trustStore
keyStore.passPhrase=secret
trustStore.passPhrase=secret
keyStore 和 truststore 是指向儲存的 Spring Resources。通常,此屬性檔案由作業系統保護,應用程式具有讀取訪問許可權。
從 Spring AMQP 1.5 版開始,您可以直接在工廠 bean 上設定這些屬性。如果同時提供了離散屬性和 sslPropertiesLocation,則後者中的屬性會覆蓋離散值。
從版本 2.0 開始,預設情況下會驗證伺服器證書,因為它更安全。如果出於某種原因要跳過此驗證,請將工廠 bean 的 skipServerCertificateValidation 屬性設定為 true。從版本 2.1 開始,RabbitConnectionFactoryBean 現在預設呼叫 enableHostnameVerification()。要恢復到以前的行為,請將 enableHostnameVerification 屬性設定為 false。 |
從版本 2.2.5 開始,工廠 bean 預設將始終使用 TLS v1.2;以前,在某些情況下它使用 v1.1,在其他情況下使用 v1.2(取決於其他屬性)。如果出於某種原因需要使用 v1.1,請設定 sslAlgorithm 屬性:setSslAlgorithm("TLSv1.1")。 |
連線到叢集
要連線到叢集,請在 CachingConnectionFactory 上配置 addresses 屬性
@Bean
public CachingConnectionFactory ccf() {
CachingConnectionFactory ccf = new CachingConnectionFactory();
ccf.setAddresses("host1:5672,host2:5672,host3:5672");
return ccf;
}
從版本 3.0 開始,每當建立新連線時,底層連線工廠都會透過選擇隨機地址來嘗試連線到主機。要恢復到以前的從第一個到最後一個嘗試連線的行為,請將 addressShuffleMode 屬性設定為 AddressShuffleMode.NONE。
從版本 2.3 開始,添加了 INORDER 洗牌模式,這意味著在建立連線後,第一個地址會移到末尾。如果您希望使用 RabbitMQ 分片外掛 與 CacheMode.CONNECTION 和適當的併發性從所有節點的所有分片進行消費,則可能希望使用此模式。
@Bean
public CachingConnectionFactory ccf() {
CachingConnectionFactory ccf = new CachingConnectionFactory();
ccf.setAddresses("host1:5672,host2:5672,host3:5672");
ccf.setAddressShuffleMode(AddressShuffleMode.INORDER);
return ccf;
}
路由連線工廠
從版本 1.3 開始,引入了 AbstractRoutingConnectionFactory。該工廠提供了一種機制,用於配置多個 ConnectionFactories 的對映,並根據執行時的一些 lookupKey 確定目標 ConnectionFactory。通常,實現會檢查執行緒繫結的上下文。為了方便起見,Spring AMQP 提供了 SimpleRoutingConnectionFactory,它從 SimpleResourceHolder 獲取當前執行緒繫結的 lookupKey。以下示例展示瞭如何在 XML 和 Java 中配置 SimpleRoutingConnectionFactory
<bean id="connectionFactory"
class="org.springframework.amqp.rabbit.connection.SimpleRoutingConnectionFactory">
<property name="targetConnectionFactories">
<map>
<entry key="#{connectionFactory1.virtualHost}" ref="connectionFactory1"/>
<entry key="#{connectionFactory2.virtualHost}" ref="connectionFactory2"/>
</map>
</property>
</bean>
<rabbit:template id="template" connection-factory="connectionFactory" />
public class MyService {
@Autowired
private RabbitTemplate rabbitTemplate;
public void service(String vHost, String payload) {
SimpleResourceHolder.bind(rabbitTemplate.getConnectionFactory(), vHost);
rabbitTemplate.convertAndSend(payload);
SimpleResourceHolder.unbind(rabbitTemplate.getConnectionFactory());
}
}
使用後解除繫結資源很重要。有關更多資訊,請參閱 AbstractRoutingConnectionFactory 的 JavaDoc。
從版本 1.4 開始,RabbitTemplate 支援 SpEL sendConnectionFactorySelectorExpression 和 receiveConnectionFactorySelectorExpression 屬性,這些屬性在每個 AMQP 協議互動操作(send、sendAndReceive、receive 或 receiveAndReply)上進行評估,解析為提供的 AbstractRoutingConnectionFactory 的 lookupKey 值。您可以在表示式中使用 bean 引用,例如 @vHostResolver.getVHost(#root)。對於 send 操作,要傳送的訊息是根評估物件。對於 receive 操作,queueName 是根評估物件。
路由演算法如下:如果選擇器表示式為 null 或被評估為 null,或者提供的 ConnectionFactory 不是 AbstractRoutingConnectionFactory 的例項,則一切照舊,依賴於提供的 ConnectionFactory 實現。如果評估結果不為 null,但該 lookupKey 沒有目標 ConnectionFactory,並且 AbstractRoutingConnectionFactory 配置為 lenientFallback = true,也會發生同樣的情況。在 AbstractRoutingConnectionFactory 的情況下,它會根據 determineCurrentLookupKey() 回退到其 routing 實現。但是,如果 lenientFallback = false,則會丟擲 IllegalStateException。
名稱空間支援還為 <rabbit:template> 元件提供了 send-connection-factory-selector-expression 和 receive-connection-factory-selector-expression 屬性。
此外,從版本 1.4 開始,您可以在監聽器容器中配置路由連線工廠。在這種情況下,佇列名稱列表用作查詢鍵。例如,如果使用 setQueueNames("thing1", "thing2") 配置容器,則查詢鍵為 [thing1,thing]"(請注意鍵中沒有空格)。
從版本 1.6.9 開始,您可以使用監聽器容器上的 setLookupKeyQualifier 為查詢鍵新增限定符。這樣做可以實現,例如,偵聽同名但在不同虛擬主機中的佇列(在這種情況下,您將為每個佇列擁有一個連線工廠)。
例如,如果查詢鍵限定符為 thing1,並且容器正在偵聽佇列 thing2,則您可以註冊目標連線工廠的查詢鍵可以是 thing1[thing2]。
| 目標(以及提供的預設)連線工廠必須具有相同的釋出者確認和返回設定。請參閱 釋出者確認和返回。 |
從版本 2.4.4 開始,可以停用此驗證。如果您在某些情況下需要確認和返回的值不相等,可以使用 AbstractRoutingConnectionFactory#setConsistentConfirmsReturns 關閉驗證。請注意,新增到 AbstractRoutingConnectionFactory 的第一個連線工廠將確定 confirms 和 returns 的一般值。
如果您遇到某些訊息需要檢查確認/返回而其他訊息不需要的情況,這可能會很有用。例如
@Bean
public RabbitTemplate rabbitTemplate() {
final com.rabbitmq.client.ConnectionFactory cf = new com.rabbitmq.client.ConnectionFactory();
cf.setHost("localhost");
cf.setPort(5672);
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(cf);
cachingConnectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
PooledChannelConnectionFactory pooledChannelConnectionFactory = new PooledChannelConnectionFactory(cf);
final Map<Object, ConnectionFactory> connectionFactoryMap = new HashMap<>(2);
connectionFactoryMap.put("true", cachingConnectionFactory);
connectionFactoryMap.put("false", pooledChannelConnectionFactory);
final AbstractRoutingConnectionFactory routingConnectionFactory = new SimpleRoutingConnectionFactory();
routingConnectionFactory.setConsistentConfirmsReturns(false);
routingConnectionFactory.setDefaultTargetConnectionFactory(pooledChannelConnectionFactory);
routingConnectionFactory.setTargetConnectionFactories(connectionFactoryMap);
final RabbitTemplate rabbitTemplate = new RabbitTemplate(routingConnectionFactory);
final Expression sendExpression = new SpelExpressionParser().parseExpression(
"messageProperties.headers['x-use-publisher-confirms'] ?: false");
rabbitTemplate.setSendConnectionFactorySelectorExpression(sendExpression);
}
這樣,帶有 x-use-publisher-confirms: true 頭部的訊息將透過快取連線傳送,您可以確保訊息傳遞。有關確保訊息傳遞的更多資訊,請參閱 釋出者確認和返回。
佇列親和性和 LocalizedQueueConnectionFactory
在叢集中使用 HA 佇列時,為了獲得最佳效能,您可能希望連線到主佇列所在的物理 Broker。CachingConnectionFactory 可以配置多個 Broker 地址。這是為了故障轉移,客戶端根據配置的 AddressShuffleMode 順序嘗試連線。LocalizedQueueConnectionFactory 使用管理外掛提供的 REST API 來確定哪個節點是佇列的主節點。然後,它建立一個(或從快取中檢索)CachingConnectionFactory,該工廠僅連線到該節點。如果連線失敗,則確定新的主節點,並且消費者連線到該節點。LocalizedQueueConnectionFactory 配置了一個預設連線工廠,以防無法確定佇列的物理位置,在這種情況下,它會像正常一樣連線到叢集。
LocalizedQueueConnectionFactory 是一個 RoutingConnectionFactory,SimpleMessageListenerContainer 使用佇列名稱作為查詢鍵,如上文 路由連線工廠 中所述。
因此(使用佇列名稱進行查詢),LocalizedQueueConnectionFactory 只能在容器配置為偵聽單個佇列時使用。 |
| 每個節點都必須啟用 RabbitMQ 管理外掛。 |
此連線工廠用於長期連線,例如 SimpleMessageListenerContainer 使用的連線。它不適用於短期連線,例如與 RabbitTemplate 一起使用,因為在建立連線之前呼叫 REST API 會產生開銷。此外,對於釋出操作,佇列是未知的,並且訊息無論如何都會發布到所有叢集成員,因此查詢節點的邏輯價值不大。 |
以下配置示例顯示瞭如何配置工廠
@Autowired
private ConfigurationProperties props;
@Bean
public CachingConnectionFactory defaultConnectionFactory() {
CachingConnectionFactory cf = new CachingConnectionFactory();
cf.setAddresses(this.props.getAddresses());
cf.setUsername(this.props.getUsername());
cf.setPassword(this.props.getPassword());
cf.setVirtualHost(this.props.getVirtualHost());
return cf;
}
@Bean
public LocalizedQueueConnectionFactory queueAffinityCF(
@Qualifier("defaultConnectionFactory") ConnectionFactory defaultCF) {
return new LocalizedQueueConnectionFactory(defaultCF,
StringUtils.commaDelimitedListToStringArray(this.props.getAddresses()),
StringUtils.commaDelimitedListToStringArray(this.props.getAdminUris()),
StringUtils.commaDelimitedListToStringArray(this.props.getNodes()),
this.props.getVirtualHost(), this.props.getUsername(), this.props.getPassword(),
false, null);
}
請注意,前三個引數是 addresses、adminUris 和 nodes 的陣列。這些是位置性的,即當容器嘗試連線到佇列時,它使用管理 API 來確定哪個節點是佇列的主節點,並連線到與該節點在同一陣列位置的地址。
從版本 3.0 開始,RabbitMQ http-client 不再用於訪問 Rest API。相反,預設情況下,如果 spring-webflux 在類路徑上,則使用 Spring Webflux 中的 WebClient;否則使用 RestTemplate。 |
要將 WebFlux 新增到類路徑
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
</dependency>
compile 'org.springframework.amqp:spring-rabbit'
您還可以透過實現 LocalizedQueueConnectionFactory#setNodeLocator 並重寫其 createClient、restCall 和(可選)close 方法來使用其他 REST 技術。
lqcf.setNodeLocator(new NodeLocator<MyClient>() {
@Override
public MyClient createClient(String userName, String password) {
...
}
@Override
public Map<String, Object> restCall(MyClient client, String baseUri, String vhost, String queue) throws URISyntaxException {
...
}
});
該框架提供了 WebFluxNodeLocator 和 RestTemplateNodeLocator,預設值如上所述。
釋出者確認和返回
透過將 CachingConnectionFactory 屬性 publisherConfirmType 設定為 ConfirmType.CORRELATED 並將 publisherReturns 屬性設定為“true”,支援已確認(帶關聯)和已返回的訊息。
當設定了這些選項後,工廠建立的 Channel 例項將包裝在 PublisherCallbackChannel 中,用於促進回撥。當獲得這樣的通道時,客戶端可以向 Channel 註冊一個 PublisherCallbackChannel.Listener。PublisherCallbackChannel 實現包含將確認或返回路由到相應監聽器的邏輯。這些功能將在以下部分中進一步解釋。
另請參閱 關聯的釋出者確認和返回 和 作用域操作 中的 simplePublisherConfirms。
| 有關更多背景資訊,請參閱 RabbitMQ 團隊的部落格文章 Introducing Publisher Confirms。 |
連線和通道監聽器
連線工廠支援註冊 ConnectionListener 和 ChannelListener 實現。這允許您接收連線和通道相關事件的通知。(RabbitAdmin 使用 ConnectionListener 在建立連線時執行宣告——有關更多資訊,請參見 交換機、佇列和繫結的自動宣告)。以下列表顯示了 ConnectionListener 介面定義
@FunctionalInterface
public interface ConnectionListener {
void onCreate(Connection connection);
default void onClose(Connection connection) {
}
default void onShutDown(ShutdownSignalException signal) {
}
}
從版本 2.0 開始,org.springframework.amqp.rabbit.connection.Connection 物件可以提供 com.rabbitmq.client.BlockedListener 例項,以便在連線阻塞和解除阻塞事件時收到通知。以下示例顯示了 ChannelListener 介面定義
@FunctionalInterface
public interface ChannelListener {
void onCreate(Channel channel, boolean transactional);
default void onShutDown(ShutdownSignalException signal) {
}
}
有關您可能希望註冊 ChannelListener 的一種場景,請參見 釋出是非同步的——如何檢測成功和失敗。
日誌通道關閉事件
版本 1.5 引入了一種機制,允許使用者控制日誌級別。
AbstractConnectionFactory 使用預設策略記錄通道關閉,如下所示
-
正常的通道關閉(200 OK)不會被記錄。
-
如果通道因被動佇列宣告失敗而關閉,則以 DEBUG 級別記錄。
-
如果通道因排他消費者條件導致
basic.consume被拒絕而關閉,則以 DEBUG 級別記錄(從 3.1 開始,以前是 INFO)。 -
所有其他都以 ERROR 級別記錄。
要修改此行為,您可以將自定義 ConditionalExceptionLogger 注入 CachingConnectionFactory 的 closeExceptionLogger 屬性中。
此外,AbstractConnectionFactory.DefaultChannelCloseLogger 現在是公共的,允許其子類化。
另請參閱 消費者事件。
執行時快取屬性
從版本 1.6 開始,CachingConnectionFactory 現在透過 getCacheProperties() 方法提供快取統計資訊。這些統計資訊可用於調整快取在生產中的最佳化。例如,高水位線可用於確定是否應增加快取大小。如果它等於快取大小,您可能需要考慮進一步增加。下表描述了 CacheMode.CHANNEL 屬性
| 財產 | 含義 |
|---|---|
connectionName |
由 |
channelCacheSize |
當前配置的允許空閒的最大通道數。 |
localPort |
連線的本地埠(如果可用)。這可用於與 RabbitMQ 管理 UI 上的連線和通道關聯。 |
idleChannelsTx |
當前空閒(快取)的事務性通道數。 |
idleChannelsNotTx |
當前空閒(快取)的非事務性通道數。 |
idleChannelsTxHighWater |
同時空閒(快取)的事務性通道的最大數量。 |
idleChannelsNotTxHighWater |
同時空閒(快取)的非事務性通道的最大數量。 |
下表描述了 CacheMode.CONNECTION 屬性
| 財產 | 含義 |
|---|---|
connectionName:<localPort> |
由 |
openConnections |
表示與 Broker 連線的連線物件數。 |
channelCacheSize |
當前配置的允許空閒的最大通道數。 |
connectionCacheSize |
當前配置的允許空閒的最大連線數。 |
idleConnections |
當前空閒的連線數。 |
idleConnectionsHighWater |
同時空閒的最大連線數。 |
idleChannelsTx:<localPort> |
此連線當前空閒(快取)的事務性通道數。您可以使用屬性名稱的 |
idleChannelsNotTx:<localPort> |
此連線當前空閒(快取)的非事務性通道數。屬性名稱的 |
idleChannelsTxHighWater:<localPort> |
同時空閒(快取)的事務性通道的最大數量。屬性名稱的 localPort 部分可用於與 RabbitMQ 管理 UI 上的連線和通道相關聯。 |
idleChannelsNotTxHighWater:<localPort> |
同時空閒(快取)的非事務性通道的最大數量。您可以使用屬性名稱的 |
還包括 cacheMode 屬性(CHANNEL 或 CONNECTION)。
RabbitMQ 自動連線/拓撲恢復
自 Spring AMQP 的第一個版本以來,該框架在 Broker 發生故障時提供了自己的連線和通道恢復。此外,如 配置 Broker 中所述,當連線重新建立時,RabbitAdmin 會重新宣告任何基礎設施 bean(佇列等)。因此,它不依賴於 amqp-client 庫現在提供的 自動恢復。amqp-client 預設啟用自動恢復。兩種恢復機制之間存在一些不相容性,因此,Spring 預設將底層 RabbitMQ connectionFactory 上的 automaticRecoveryEnabled 屬性設定為 false。即使該屬性為 true,Spring 也會透過立即關閉任何已恢復的連線來有效停用它。
| 預設情況下,只有被定義為 bean 的元素(佇列、交換器、繫結)會在連線失敗後重新宣告。有關如何更改該行為的資訊,請參閱 恢復自動刪除宣告。 |