連線與資源管理
雖然我們在上一節中描述的 AMQP 模型是通用的,適用於所有實現,但當我們談到資源管理時,細節就取決於 Broker 的具體實現。因此,在本節中,我們專注於只存在於我們的“spring-rabbit”模組中的程式碼,因為目前只支援 RabbitMQ 實現。
用於管理與 RabbitMQ Broker 連線的核心元件是 ConnectionFactory
介面。ConnectionFactory
實現的職責是提供 org.springframework.amqp.rabbit.connection.Connection
的例項,它是 com.rabbitmq.client.Connection
的包裝類。
選擇連線工廠
有三種連線工廠可供選擇
-
PooledChannelConnectionFactory
-
ThreadChannelConnectionFactory
-
CachingConnectionFactory
前兩種在版本 2.3 中新增。
對於大多數用例,應使用 CachingConnectionFactory
。如果您想確保嚴格的訊息順序而無需使用 Scoped Operations,則可以使用 ThreadChannelConnectionFactory
。PooledChannelConnectionFactory
與 CachingConnectionFactory
類似,因為它使用單個連線和通道池。它的實現更簡單,但不支援關聯的釋出者確認。
所有這三種工廠都支援簡單的釋出者確認。
配置 RabbitTemplate
使用 單獨連線 時,從版本 2.3.2 開始,您可以配置釋出連線工廠為不同的型別。預設情況下,釋出工廠與主工廠型別相同,並且設定在主工廠上的任何屬性也會傳播到釋出工廠。
從版本 3.1 開始,AbstractConnectionFactory
包含 connectionCreatingBackOff
屬性,該屬性在連線模組中支援指數退避策略。目前,createChannel()
方法的行為支援處理達到 channelMax
限制時發生的異常,實現了基於嘗試次數和間隔的退避策略。
PooledChannelConnectionFactory
該工廠管理單個連線和兩個通道池,基於 Apache Pool2。一個池用於事務性通道,另一個用於非事務性通道。這些池是配置預設為 GenericObjectPool
的池;提供回撥以配置這些池;有關更多資訊,請參閱 Apache 文件。
要使用此工廠,必須將 Apache commons-pool2
jar 放在類路徑上。
@Bean
PooledChannelConnectionFactory pcf() throws Exception {
ConnectionFactory rabbitConnectionFactory = new ConnectionFactory();
rabbitConnectionFactory.setHost("localhost");
PooledChannelConnectionFactory pcf = new PooledChannelConnectionFactory(rabbitConnectionFactory);
pcf.setPoolConfigurer((pool, tx) -> {
if (tx) {
// configure the transactional pool
}
else {
// configure the non-transactional pool
}
});
return pcf;
}
ThreadChannelConnectionFactory
該工廠管理單個連線和兩個 ThreadLocal
,一個用於事務性通道,另一個用於非事務性通道。此工廠確保同一執行緒上的所有操作都使用同一通道(只要它保持開啟狀態)。這有助於實現嚴格的訊息順序,而無需 Scoped Operations。為避免記憶體洩漏,如果您的應用程式使用許多生命週期短暫的執行緒,則必須呼叫工廠的 closeThreadChannel()
方法釋放通道資源。從版本 2.3.7 開始,執行緒可以將其通道轉移到另一個執行緒。有關更多資訊,請參閱 多執行緒環境下的嚴格訊息順序。
CachingConnectionFactory
提供的第三種實現是 CachingConnectionFactory
,它預設建立一個可供應用程式共享的單個連線代理。由於 AMQP 訊息傳遞的“工作單元”實際上是一個“通道”(在某些方面,這類似於 JMS 中連線和會話之間的關係),因此連線共享成為可能。連線例項提供了一個 createChannel
方法。CachingConnectionFactory
實現支援快取這些通道,並根據它們是否具有事務性來維護獨立的通道快取。建立 CachingConnectionFactory
例項時,您可以透過建構函式提供“hostname”。您還應該提供“username”和“password”屬性。要配置通道快取的大小(預設為 25),您可以呼叫 setChannelCacheSize()
方法。
從版本 1.3 開始,您可以配置 CachingConnectionFactory
同時快取連線和通道。在這種情況下,每次呼叫 createConnection()
都會建立一個新連線(或從快取中檢索一個空閒連線)。關閉連線會將其返回到快取(如果尚未達到快取大小)。在此類連線上建立的通道也會被快取。在某些環境中,例如從 HA 叢集消費時,結合負載均衡器連線到不同的叢集成員,以及其他情況下,使用單獨的連線可能很有用。要快取連線,請將 cacheMode
設定為 CacheMode.CONNECTION
。
這不會限制連線的數量。相反,它指定允許快取多少個空閒的開啟連線。 |
從版本 1.5.5 開始,提供了一個名為 connectionLimit
的新屬性。設定此屬性後,它會限制允許的總連線數。設定後,如果達到限制,將使用 channelCheckoutTimeLimit
等待連線變為空閒。如果超出時間,將丟擲 AmqpTimeoutException
。
當快取模式為 此外,在撰寫本文時, |
重要的是要理解,快取大小(預設情況下)不是限制,而只是可以快取的通道數量。例如,快取大小為 10,實際上可以使用任意數量的通道。如果使用了超過 10 個通道並且它們都返回到快取,則 10 個進入快取。其餘的將被物理關閉。
從版本 1.6 開始,預設通道快取大小從 1 增加到 25。在高流量、多執行緒環境中,小快取意味著通道建立和關閉的速度很快。增加預設快取大小可以避免這種開銷。您應該透過 RabbitMQ 管理 UI 監控正在使用的通道,如果您看到大量通道被建立和關閉,請考慮進一步增加快取大小。快取僅在按需增長(以適應應用程式的併發要求),因此此更改不會影響現有的低流量應用程式。
從版本 1.4.2 開始,CachingConnectionFactory
有一個名為 channelCheckoutTimeout
的屬性。當此屬性大於零時,channelCacheSize
成為可以在連線上建立的通道數量的限制。如果達到限制,呼叫執行緒將阻塞,直到通道可用或達到此超時時間,此時將丟擲 AmqpTimeoutException
。
框架內使用的通道(例如,RabbitTemplate )可靠地返回到快取。如果您在框架外部建立通道(例如,透過直接訪問連線並呼叫 createChannel() ),則必須可靠地(也許在 finally 塊中)關閉它們,以避免通道耗盡。 |
以下示例展示瞭如何建立新連線
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("somehost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
Connection connection = connectionFactory.createConnection();
使用 XML 時,配置可能如下例所示
<bean id="connectionFactory"
class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<constructor-arg value="somehost"/>
<property name="username" value="guest"/>
<property name="password" value="guest"/>
</bean>
框架的單元測試程式碼中還有一個 SingleConnectionFactory 實現。它比 CachingConnectionFactory 更簡單,因為它不快取通道,但由於其效能和彈性不足,不適合在簡單測試之外的實際應用中使用。如果您因某些原因需要實現自己的 ConnectionFactory ,AbstractConnectionFactory 基類可能是一個不錯的起點。 |
可以使用 rabbit 名稱空間快速方便地建立 ConnectionFactory
,如下所示
<rabbit:connection-factory id="connectionFactory"/>
在大多數情況下,這種方法是首選,因為框架可以為您選擇最佳預設值。建立的例項是 CachingConnectionFactory
。請記住,通道的預設快取大小是 25。如果您希望快取更多通道,請透過設定 'channelCacheSize' 屬性設定一個更大的值。在 XML 中,它將如下所示
<bean id="connectionFactory"
class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<constructor-arg value="somehost"/>
<property name="username" value="guest"/>
<property name="password" value="guest"/>
<property name="channelCacheSize" value="50"/>
</bean>
此外,使用名稱空間,您可以新增 'channel-cache-size' 屬性,如下所示
<rabbit:connection-factory
id="connectionFactory" channel-cache-size="50"/>
預設快取模式是 CHANNEL
,但您可以將其配置為快取連線。在以下示例中,我們使用 connection-cache-size
<rabbit:connection-factory
id="connectionFactory" cache-mode="CONNECTION" connection-cache-size="25"/>
您可以使用名稱空間提供 host 和 port 屬性,如下所示
<rabbit:connection-factory
id="connectionFactory" host="somehost" port="5672"/>
或者,如果在叢集環境中執行,可以使用 addresses 屬性,如下所示
<rabbit:connection-factory
id="connectionFactory" addresses="host1:5672,host2:5672" address-shuffle-mode="RANDOM"/>
有關 address-shuffle-mode
的資訊,請參閱 連線到叢集。
以下示例使用一個自定義執行緒工廠,將執行緒名稱加上 rabbitmq-
字首
<rabbit:connection-factory id="multiHost" virtual-host="/bar" addresses="host1:1234,host2,host3:4567"
thread-factory="tf"
channel-cache-size="10" username="user" password="password" />
<bean id="tf" class="org.springframework.scheduling.concurrent.CustomizableThreadFactory">
<constructor-arg value="rabbitmq-" />
</bean>
連線命名
從版本 1.7 開始,提供了 ConnectionNameStrategy
,用於注入到 AbstractionConnectionFactory
中。生成的名稱用於目標 RabbitMQ 連線的應用程式特定標識。如果 RabbitMQ 伺服器支援,連線名稱將顯示在管理 UI 中。此值不必是唯一的,並且不能用作連線識別符號(例如,在 HTTP API 請求中)。此值應該是人類可讀的,並且是 ClientProperties
下的 connection_name
鍵的一部分。您可以使用簡單的 Lambda,如下所示
connectionFactory.setConnectionNameStrategy(connectionFactory -> "MY_CONNECTION");
ConnectionFactory
引數可用於透過某些邏輯區分目標連線名稱。預設情況下,使用 AbstractConnectionFactory
的 beanName
、表示物件的十六進位制字串以及內部計數器來生成 connection_name
。<rabbit:connection-factory>
名稱空間元件也提供了 connection-name-strategy
屬性。
SimplePropertyValueConnectionNameStrategy
的實現將連線名稱設定為應用程式屬性。您可以將其宣告為 @Bean
並將其注入到連線工廠中,如下例所示
@Bean
public SimplePropertyValueConnectionNameStrategy cns() {
return new SimplePropertyValueConnectionNameStrategy("spring.application.name");
}
@Bean
public ConnectionFactory rabbitConnectionFactory(ConnectionNameStrategy cns) {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
...
connectionFactory.setConnectionNameStrategy(cns);
return connectionFactory;
}
該屬性必須存在於應用程式上下文的 Environment
中。
使用 Spring Boot 及其自動配置的連線工廠時,您只需宣告 ConnectionNameStrategy 的 @Bean 。Boot 會自動檢測該 Bean 並將其注入到工廠中。 |
阻塞的連線和資源約束
連線可能因 Broker 的 記憶體警報 而被阻塞。從版本 2.0 開始,org.springframework.amqp.rabbit.connection.Connection
可以提供 com.rabbitmq.client.BlockedListener
例項,以接收連線阻塞和取消阻塞事件的通知。此外,AbstractConnectionFactory
透過其內部的 BlockedListener
實現,分別發出 ConnectionBlockedEvent
和 ConnectionUnblockedEvent
。這些事件使您能夠提供應用程式邏輯,以適當響應 Broker 上的問題並(例如)採取一些糾正措施。
當應用程式配置了單個 CachingConnectionFactory 時(Spring Boot 自動配置預設如此),當連線被 Broker 阻塞時,應用程式將停止工作。而當連線被 Broker 阻塞時,它的任何客戶端都會停止工作。如果我們在同一個應用程式中有生產者和消費者,當生產者阻塞連線(因為 Broker 上沒有資源了)並且消費者無法釋放它們(因為連線被阻塞)時,我們可能會遇到死鎖。為了緩解這個問題,我們建議使用另一個具有相同選項的獨立 CachingConnectionFactory 例項——一個用於生產者,一個用於消費者。對於在消費者執行緒上執行的事務性生產者來說,使用單獨的 CachingConnectionFactory 是不可能的,因為它們應該重用與消費者事務關聯的 Channel 。 |
從版本 2.0.2 開始,RabbitTemplate
有一個配置選項,可以在不使用事務的情況下自動使用第二個連線工廠。有關更多資訊,請參閱 使用單獨的連線。釋出者連線的 ConnectionNameStrategy
與主策略相同,並在呼叫方法的結果後附加 .publisher
。
從版本 1.7.7 開始,提供了 AmqpResourceNotAvailableException
,當 SimpleConnection.createChannel()
無法建立 Channel
時會丟擲此異常(例如,因為達到 channelMax
限制且快取中沒有可用通道)。您可以在 RetryPolicy
中使用此異常,在退避後恢復操作。
配置底層客戶端連線工廠
CachingConnectionFactory
使用 Rabbit 客戶端的 ConnectionFactory
例項。透過在 CachingConnectionFactory
上設定等效屬性時,許多配置屬性(例如 host
、port
、userName
、password
、requestedHeartBeat
和 connectionTimeout
)會透傳下去。要設定其他屬性(例如 clientProperties
),您可以定義一個 Rabbit 工廠例項,並透過使用 CachingConnectionFactory
的適當建構函式提供對其的引用。使用名稱空間(如前所述)時,需要在 connection-factory
屬性中提供對已配置工廠的引用。為了方便起見,提供了一個工廠 Bean,以幫助在 Spring 應用程式上下文中配置連線工廠,如 下一節 中所述。
<rabbit:connection-factory
id="connectionFactory" connection-factory="rabbitConnectionFactory"/>
4.0.x 客戶端預設啟用自動恢復。儘管與此功能相容,Spring AMQP 有其自己的恢復機制,並且客戶端恢復功能通常不需要。我們建議停用 amqp-client 自動恢復,以避免在 Broker 可用但連線尚未恢復時出現 AutoRecoverConnectionNotCurrentlyOpenException 例項。您可能會注意到此異常,例如,當在 RabbitTemplate 中配置了 RetryTemplate 時,即使故障轉移到叢集中的另一個 Broker 時也是如此。由於自動恢復連線會按計時器恢復,使用 Spring AMQP 的恢復機制可能會更快地恢復連線。從版本 1.7.1 開始,除非您明確建立自己的 RabbitMQ 連線工廠並將其提供給 CachingConnectionFactory ,否則 Spring AMQP 將停用 amqp-client 自動恢復。由 RabbitConnectionFactoryBean 建立的 RabbitMQ ConnectionFactory 例項也預設停用此選項。 |
RabbitConnectionFactoryBean
和配置 SSL
從版本 1.4 開始,提供了一個方便的 RabbitConnectionFactoryBean
,透過依賴注入方便地配置底層客戶端連線工廠的 SSL 屬性。其他 setter 方法委託給底層工廠。以前,您必須以程式設計方式配置 SSL 選項。以下示例展示瞭如何配置 RabbitConnectionFactoryBean
@Bean
RabbitConnectionFactoryBean rabbitConnectionFactory() {
RabbitConnectionFactoryBean factoryBean = new RabbitConnectionFactoryBean();
factoryBean.setUseSSL(true);
factoryBean.setSslPropertiesLocation(new ClassPathResource("secrets/rabbitSSL.properties"));
return factoryBean;
}
@Bean
CachingConnectionFactory connectionFactory(ConnectionFactory rabbitConnectionFactory) {
CachingConnectionFactory ccf = new CachingConnectionFactory(rabbitConnectionFactory);
ccf.setHost("...");
// ...
return ccf;
}
spring.rabbitmq.ssl.enabled:true
spring.rabbitmq.ssl.keyStore=...
spring.rabbitmq.ssl.keyStoreType=jks
spring.rabbitmq.ssl.keyStorePassword=...
spring.rabbitmq.ssl.trustStore=...
spring.rabbitmq.ssl.trustStoreType=jks
spring.rabbitmq.ssl.trustStorePassword=...
spring.rabbitmq.host=...
...
<rabbit:connection-factory id="rabbitConnectionFactory"
connection-factory="clientConnectionFactory"
host="${host}"
port="${port}"
virtual-host="${vhost}"
username="${username}" password="${password}" />
<bean id="clientConnectionFactory"
class="org.springframework.amqp.rabbit.connection.RabbitConnectionFactoryBean">
<property name="useSSL" value="true" />
<property name="sslPropertiesLocation" value="classpath:secrets/rabbitSSL.properties"/>
</bean>
有關配置 SSL 的資訊,請參閱 RabbitMQ 文件。省略 keyStore
和 trustStore
配置以透過 SSL 連線而無需證書驗證。下一個示例展示瞭如何提供金鑰庫和信任庫配置。
sslPropertiesLocation
屬性是一個指向包含以下鍵的屬性檔案的 Spring Resource
keyStore=file:/secret/keycert.p12
trustStore=file:/secret/trustStore
keyStore.passPhrase=secret
trustStore.passPhrase=secret
keyStore
和 truststore
是指向儲存檔案的 Spring Resource
。通常,此屬性檔案由作業系統保護,應用程式具有讀取許可權。
從 Spring AMQP 1.5 版本開始,您可以直接在工廠 Bean 上設定這些屬性。如果同時提供了離散屬性和 sslPropertiesLocation
,則後者的屬性會覆蓋離散值。
從版本 2.0 開始,伺服器證書預設經過驗證,因為它更安全。如果您出於某種原因希望跳過此驗證,請將工廠 Bean 的 skipServerCertificateValidation 屬性設定為 true 。從版本 2.1 開始,RabbitConnectionFactoryBean 現在預設呼叫 enableHostnameVerification() 。要恢復到以前的行為,請將 enableHostnameVerification 屬性設定為 false 。 |
從版本 2.2.5 開始,工廠 Bean 預設將始終使用 TLS v1.2;之前,在某些情況下使用 v1.1,在其他情況下使用 v1.2(取決於其他屬性)。如果您出於某種原因需要使用 v1.1,請設定 sslAlgorithm 屬性:setSslAlgorithm("TLSv1.1") 。 |
連線到叢集
要連線到叢集,請在 CachingConnectionFactory
上配置 addresses
屬性
@Bean
public CachingConnectionFactory ccf() {
CachingConnectionFactory ccf = new CachingConnectionFactory();
ccf.setAddresses("host1:5672,host2:5672,host3:5672");
return ccf;
}
從版本 3.0 開始,底層連線工廠將在建立新連線時嘗試連線到隨機選擇的地址。要恢復到從第一個到最後一個的先前行為,請將 addressShuffleMode
屬性設定為 AddressShuffleMode.NONE
。
從版本 2.3 開始,添加了 INORDER
混洗模式,這意味著連線建立後第一個地址會移到末尾。如果您希望在所有節點上消費所有分片,並且使用 CacheMode.CONNECTION
和適當的併發性,則可以考慮將此模式與 RabbitMQ Sharding Plugin 一起使用。
@Bean
public CachingConnectionFactory ccf() {
CachingConnectionFactory ccf = new CachingConnectionFactory();
ccf.setAddresses("host1:5672,host2:5672,host3:5672");
ccf.setAddressShuffleMode(AddressShuffleMode.INORDER);
return ccf;
}
路由連線工廠
從版本 1.3 開始,引入了 AbstractRoutingConnectionFactory
。此工廠提供了一種機制,可以配置多個 ConnectionFactories
的對映,並在執行時透過某個 lookupKey
確定目標 ConnectionFactory
。通常,實現會檢查執行緒繫結的上下文。為了方便起見,Spring AMQP 提供了 SimpleRoutingConnectionFactory
,它從 SimpleResourceHolder
獲取當前執行緒繫結的 lookupKey
。以下示例展示瞭如何在 XML 和 Java 中配置 SimpleRoutingConnectionFactory
<bean id="connectionFactory"
class="org.springframework.amqp.rabbit.connection.SimpleRoutingConnectionFactory">
<property name="targetConnectionFactories">
<map>
<entry key="#{connectionFactory1.virtualHost}" ref="connectionFactory1"/>
<entry key="#{connectionFactory2.virtualHost}" ref="connectionFactory2"/>
</map>
</property>
</bean>
<rabbit:template id="template" connection-factory="connectionFactory" />
public class MyService {
@Autowired
private RabbitTemplate rabbitTemplate;
public void service(String vHost, String payload) {
SimpleResourceHolder.bind(rabbitTemplate.getConnectionFactory(), vHost);
rabbitTemplate.convertAndSend(payload);
SimpleResourceHolder.unbind(rabbitTemplate.getConnectionFactory());
}
}
使用後解綁資源非常重要。有關更多資訊,請參閱 AbstractRoutingConnectionFactory
的 JavaDoc。
從版本 1.4 開始,RabbitTemplate
支援 SpEL 表示式屬性 sendConnectionFactorySelectorExpression
和 receiveConnectionFactorySelectorExpression
,它們在每次 AMQP 協議互動操作(send
、sendAndReceive
、receive
或 receiveAndReply
)時進行評估,解析為提供的 AbstractRoutingConnectionFactory
的 lookupKey
值。您可以在表示式中使用 Bean 引用,例如 @vHostResolver.getVHost(#root)
。對於 send
操作,要傳送的訊息是根評估物件。對於 receive
操作,queueName
是根評估物件。
路由演算法如下:如果選擇器表示式為 null
或評估結果為 null
,或者提供的 ConnectionFactory
不是 AbstractRoutingConnectionFactory
的例項,則一切如常,依賴於提供的 ConnectionFactory
實現。如果評估結果不為 null
,但對於該 lookupKey
沒有目標 ConnectionFactory
並且 AbstractRoutingConnectionFactory
配置為 lenientFallback = true
,也會發生同樣的情況。對於 AbstractRoutingConnectionFactory
,它確實會根據 determineCurrentLookupKey()
回退到其路由實現。但是,如果 lenientFallback = false
,則會丟擲 IllegalStateException
。
名稱空間支援還在 <rabbit:template>
元件上提供了 send-connection-factory-selector-expression
和 receive-connection-factory-selector-expression
屬性。
此外,從版本 1.4 開始,您可以在監聽器容器中配置路由連線工廠。在這種情況下,佇列名稱列表將用作查詢鍵。例如,如果您將容器配置為監聽 setQueueNames("thing1", "thing2")
,則查詢鍵將是 [thing1,thing]
(注意鍵中沒有空格)。
從版本 1.6.9 開始,您可以使用監聽器容器上的 setLookupKeyQualifier
方法向查詢鍵新增限定符。這樣做可以實現,例如,監聽名稱相同但位於不同虛擬主機(您將為每個虛擬主機設定一個連線工廠)中的佇列。
例如,使用查詢鍵限定符 thing1
和監聽佇列 thing2
的容器,您可以註冊目標連線工廠的查詢鍵可以是 thing1[thing2]
。
目標連線工廠(如果提供預設連線工廠,也包括預設連線工廠)對於釋出者確認和返回必須具有相同的設定。請參閱 釋出者確認和返回。 |
從版本 2.4.4 開始,可以停用此驗證。如果您遇到確認和返回之間的值需要不相等的情況,可以使用 AbstractRoutingConnectionFactory#setConsistentConfirmsReturns
關閉驗證。請注意,新增到 AbstractRoutingConnectionFactory
的第一個連線工廠將決定 confirms
和 returns
的通用值。
如果您遇到某些訊息需要檢查確認/返回而其他訊息不需要的情況,這可能很有用。例如
@Bean
public RabbitTemplate rabbitTemplate() {
final com.rabbitmq.client.ConnectionFactory cf = new com.rabbitmq.client.ConnectionFactory();
cf.setHost("localhost");
cf.setPort(5672);
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(cf);
cachingConnectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
PooledChannelConnectionFactory pooledChannelConnectionFactory = new PooledChannelConnectionFactory(cf);
final Map<Object, ConnectionFactory> connectionFactoryMap = new HashMap<>(2);
connectionFactoryMap.put("true", cachingConnectionFactory);
connectionFactoryMap.put("false", pooledChannelConnectionFactory);
final AbstractRoutingConnectionFactory routingConnectionFactory = new SimpleRoutingConnectionFactory();
routingConnectionFactory.setConsistentConfirmsReturns(false);
routingConnectionFactory.setDefaultTargetConnectionFactory(pooledChannelConnectionFactory);
routingConnectionFactory.setTargetConnectionFactories(connectionFactoryMap);
final RabbitTemplate rabbitTemplate = new RabbitTemplate(routingConnectionFactory);
final Expression sendExpression = new SpelExpressionParser().parseExpression(
"messageProperties.headers['x-use-publisher-confirms'] ?: false");
rabbitTemplate.setSendConnectionFactorySelectorExpression(sendExpression);
}
這樣,帶有頭 x-use-publisher-confirms: true
的訊息將透過快取連線傳送,您可以確保訊息的投遞。有關確保訊息投遞的更多資訊,請參閱 釋出者確認和返回。
佇列親和性與 LocalizedQueueConnectionFactory
在叢集中使用 HA 佇列時,為了獲得最佳效能,您可能希望連線到主佇列所在的物理 Broker。CachingConnectionFactory
可以配置多個 Broker 地址。這是為了實現故障轉移,客戶端會根據配置的 AddressShuffleMode
順序嘗試連線。LocalizedQueueConnectionFactory
使用管理外掛提供的 REST API 來確定哪個節點是該佇列的主節點。然後,它會建立一個連線到該節點的 CachingConnectionFactory
(或從快取中檢索一個)。如果連線失敗,會確定新的主節點,並且消費者會連線到它。LocalizedQueueConnectionFactory
配置了一個預設連線工廠,以防無法確定佇列的物理位置,在這種情況下,它會正常連線到叢集。
LocalizedQueueConnectionFactory
是一個 RoutingConnectionFactory
,SimpleMessageListenerContainer
使用佇列名稱作為查詢鍵,如上文 路由連線工廠 中所述。
因此(由於使用佇列名稱進行查詢),只有當容器配置為監聽單個佇列時,才能使用 LocalizedQueueConnectionFactory 。 |
必須在每個節點上啟用 RabbitMQ 管理外掛。 |
此連線工廠旨在用於長連線,例如 SimpleMessageListenerContainer 使用的連線。由於在建立連線之前呼叫 REST API 的開銷,它不適用於短連線使用,例如與 RabbitTemplate 一起使用。此外,對於釋出操作,佇列是未知的,並且訊息無論如何都會發布到所有叢集成員,因此查詢節點的邏輯價值不大。 |
以下示例配置展示瞭如何配置這些工廠
@Autowired
private ConfigurationProperties props;
@Bean
public CachingConnectionFactory defaultConnectionFactory() {
CachingConnectionFactory cf = new CachingConnectionFactory();
cf.setAddresses(this.props.getAddresses());
cf.setUsername(this.props.getUsername());
cf.setPassword(this.props.getPassword());
cf.setVirtualHost(this.props.getVirtualHost());
return cf;
}
@Bean
public LocalizedQueueConnectionFactory queueAffinityCF(
@Qualifier("defaultConnectionFactory") ConnectionFactory defaultCF) {
return new LocalizedQueueConnectionFactory(defaultCF,
StringUtils.commaDelimitedListToStringArray(this.props.getAddresses()),
StringUtils.commaDelimitedListToStringArray(this.props.getAdminUris()),
StringUtils.commaDelimitedListToStringArray(this.props.getNodes()),
this.props.getVirtualHost(), this.props.getUsername(), this.props.getPassword(),
false, null);
}
請注意,前三個引數是 addresses
、adminUris
和 nodes
的陣列。它們是位置引數,即當容器嘗試連線到佇列時,它使用管理 API 確定哪個節點是該佇列的主節點,並連線到與該節點在同一陣列位置的地址。
從版本 3.0 開始,不再使用 RabbitMQ 的 http-client 來訪問 Rest API。預設情況下,如果類路徑中存在 spring-webflux ,則使用 Spring Webflux 的 WebClient ;否則,使用 RestTemplate 。 |
將 WebFlux
新增到類路徑
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
</dependency>
compile 'org.springframework.amqp:spring-rabbit'
您還可以透過實現 LocalizedQueueConnectionFactory.NodeLocator
並重寫其 createClient
、restCall
方法,以及可選的 close
方法來使用其他 REST 技術。
lqcf.setNodeLocator(new NodeLocator<MyClient>() {
@Override
public MyClient createClient(String userName, String password) {
...
}
@Override
public HashMap<String, Object> restCall(MyClient client, URI uri) {
...
});
});
框架提供了 WebFluxNodeLocator
和 RestTemplateNodeLocator
,預設設定如上所述。
Publisher Confirms 和 Returns
透過將 CachingConnectionFactory
屬性 publisherConfirmType
設定為 ConfirmType.CORRELATED
並將 publisherReturns
屬性設定為 'true',可以支援 confirmed (帶關聯) 和 returned 訊息。
設定這些選項後,工廠建立的 Channel
例項將包裝在 PublisherCallbackChannel
中,用於促進回撥。獲取此類 channel 後,客戶端可以向 Channel
註冊一個 PublisherCallbackChannel.Listener
。PublisherCallbackChannel
實現包含將 confirm 或 return 路由到相應監聽器的邏輯。這些功能將在以下章節中進一步解釋。
另請參閱 關聯的 Publisher Confirms 和 Returns,以及 作用域操作 中的 simplePublisherConfirms
。
更多背景資訊,請參閱 RabbitMQ 團隊題為 Introducing Publisher Confirms 的部落格文章。 |
Connection 和 Channel Listeners
連線工廠支援註冊 ConnectionListener
和 ChannelListener
實現。這允許您接收連線和 channel 相關事件的通知。(RabbitAdmin
在建立連線時使用 ConnectionListener
來執行宣告 - 更多資訊請參見 Exchange、Queue 和 Binding 的自動宣告)。以下列表顯示了 ConnectionListener
介面定義
@FunctionalInterface
public interface ConnectionListener {
void onCreate(Connection connection);
default void onClose(Connection connection) {
}
default void onShutDown(ShutdownSignalException signal) {
}
}
從版本 2.0 開始,org.springframework.amqp.rabbit.connection.Connection
物件可以提供 com.rabbitmq.client.BlockedListener
例項,以便在連線被阻塞和解除阻塞時收到通知。以下示例顯示了 ChannelListener 介面定義
@FunctionalInterface
public interface ChannelListener {
void onCreate(Channel channel, boolean transactional);
default void onShutDown(ShutdownSignalException signal) {
}
}
有關何時可能需要註冊 ChannelListener
的一個場景,請參見 釋出是非同步的 — 如何檢測成功和失敗。
記錄 Channel 關閉事件
版本 1.5 引入了一種機制,允許使用者控制日誌記錄級別。
AbstractConnectionFactory
使用預設策略記錄 channel 關閉,如下所示
-
正常的 channel 關閉 (200 OK) 不會記錄日誌。
-
如果 channel 因被動佇列宣告失敗而關閉,則以 DEBUG 級別記錄日誌。
-
如果 channel 因獨佔消費者條件導致
basic.consume
被拒絕而關閉,則以 DEBUG 級別記錄日誌 (自 3.1 版起,之前是 INFO)。 -
所有其他情況都以 ERROR 級別記錄日誌。
要修改此行為,您可以在 CachingConnectionFactory
的 closeExceptionLogger
屬性中注入自定義的 ConditionalExceptionLogger
。
此外,AbstractConnectionFactory.DefaultChannelCloseLogger
現在是公共的,允許對其進行子類化。
另請參閱 消費者事件。
執行時快取屬性
從版本 1.6 開始,CachingConnectionFactory
現在透過 getCacheProperties()
方法提供快取統計資訊。這些統計資訊可用於調整快取以最佳化生產環境。例如,高水位標記可用於確定是否應增加快取大小。如果它等於快取大小,您可能需要考慮進一步增加。下表描述了 CacheMode.CHANNEL
的屬性
屬性 | 含義 |
---|---|
connectionName |
由 |
channelCacheSize |
當前配置的最大允許空閒 channel 數。 |
localPort |
連線的本地埠(如果可用)。這可用於與 RabbitMQ 管理 UI 上的連線和 channel 相關聯。 |
idleChannelsTx |
當前空閒(快取)的事務性 channel 數。 |
idleChannelsNotTx |
當前空閒(快取)的非事務性 channel 數。 |
idleChannelsTxHighWater |
曾經併發空閒(快取)的最大事務性 channel 數。 |
idleChannelsNotTxHighWater |
曾經併發空閒(快取)的最大非事務性 channel 數。 |
下表描述了 CacheMode.CONNECTION
的屬性
屬性 | 含義 |
---|---|
connectionName:<localPort> |
由 |
openConnections |
代表與 broker 連線的連線物件數。 |
channelCacheSize |
當前配置的最大允許空閒 channel 數。 |
connectionCacheSize |
當前配置的最大允許空閒連線數。 |
idleConnections |
當前空閒的連線數。 |
idleConnectionsHighWater |
曾經併發空閒的最大連線數。 |
idleChannelsTx:<localPort> |
此連線當前空閒(快取)的事務性 channel 數。您可以使用屬性名稱中的 |
idleChannelsNotTx:<localPort> |
此連線當前空閒(快取)的非事務性 channel 數。屬性名稱中的 |
idleChannelsTxHighWater:<localPort> |
曾經併發空閒(快取)的最大事務性 channel 數。屬性名稱中的 localPort 部分可用於與 RabbitMQ 管理 UI 上的連線和 channel 相關聯。 |
idleChannelsNotTxHighWater:<localPort> |
曾經併發空閒(快取)的最大非事務性 channel 數。您可以使用屬性名稱中的 |
還包括 cacheMode
屬性 (CHANNEL
或 CONNECTION
)。

RabbitMQ 自動連線/拓撲恢復
自 Spring AMQP 的第一個版本以來,該框架在發生 broker 故障時提供了自己的連線和 channel 恢復機制。此外,如 配置 Broker 中所述,RabbitAdmin
在重新建立連線時會重新宣告任何基礎設施 bean (佇列等)。因此,它不依賴於現在由 amqp-client
庫提供的自動恢復。預設情況下,amqp-client
啟用了自動恢復。這兩種恢復機制之間存在一些不相容性,因此預設情況下,Spring 將底層 RabbitMQ connectionFactory
上的 automaticRecoveryEnabled
屬性設定為 false
。即使該屬性為 true
,Spring 也透過立即關閉任何已恢復的連線來有效停用它。
預設情況下,連線失敗後僅會重新宣告定義為 bean 的元素(佇列、交換器、繫結)。有關如何更改此行為,請參閱 恢復自動刪除宣告。 |