執行緒和非同步消費者
非同步消費者涉及多個不同的執行緒。
當 RabbitMQ 客戶端傳送新訊息時,SimpleMessageListenerContainer 中配置的 TaskExecutor 的執行緒用於呼叫 MessageListener。如果未配置,則使用 SimpleAsyncTaskExecutor。如果您使用執行緒池執行器,則需要確保執行緒池大小足以處理配置的併發量。對於 DirectMessageListenerContainer,MessageListener 直接在 RabbitMQ 客戶端執行緒上呼叫。在這種情況下,taskExecutor 用於監視消費者的任務。
當使用預設的 SimpleAsyncTaskExecutor 時,用於呼叫監聽器的執行緒,監聽器容器的 beanName 將用作 threadNamePrefix。這對於日誌分析很有用。我們通常建議始終在日誌附加程式配置中包含執行緒名稱。當透過容器的 taskExecutor 屬性明確提供 TaskExecutor 時,它將原樣使用,不進行修改。建議您使用類似的技術來命名由自定義 TaskExecutor bean 定義建立的執行緒,以幫助在日誌訊息中識別執行緒。 |
CachingConnectionFactory 中配置的 Executor 在建立連線時會傳遞給 RabbitMQ 客戶端,其執行緒用於將新訊息傳遞到監聽器容器。如果未配置,客戶端將使用內部執行緒池執行器,每個連線的執行緒池大小(撰寫本文時)為 Runtime.getRuntime().availableProcessors() * 2。
如果您有大量工廠或正在使用 CacheMode.CONNECTION,您可能需要考慮使用共享的 ThreadPoolTaskExecutor,並確保其具有足夠的執行緒來滿足您的工作負載。
對於 DirectMessageListenerContainer,您需要確保連線工廠配置了一個任務執行器,該執行器具有足夠的執行緒來支援所有使用該工廠的監聽器容器所需的併發量。預設執行緒池大小(撰寫本文時)為 Runtime.getRuntime().availableProcessors() * 2。 |
RabbitMQ 客戶端使用 ThreadFactory 為低階 I/O (socket) 操作建立執行緒。要修改此工廠,您需要配置底層的 RabbitMQ ConnectionFactory,如配置底層客戶端連線工廠中所述。