執行緒處理與非同步消費者

非同步消費者涉及多種不同的執行緒。

配置在 SimpleMessageListenerContainer 中的 TaskExecutor 的執行緒用於在 RabbitMQ Client 傳遞新訊息時呼叫 MessageListener。如果未配置,則使用 SimpleAsyncTaskExecutor。如果您使用執行緒池執行器,需要確保執行緒池大小足以處理配置的併發數。對於 DirectMessageListenerContainerMessageListener 會直接在 RabbitMQ Client 執行緒上被呼叫。在這種情況下,taskExecutor 用於監視消費者的任務。

使用預設的 SimpleAsyncTaskExecutor 時,監聽器被呼叫的執行緒的 threadNamePrefix 會使用監聽器容器的 beanName。這對於日誌分析很有用。我們通常建議在日誌 appender 配置中始終包含執行緒名稱。當透過容器上的 taskExecutor 屬性專門提供了 TaskExecutor 時,它將按原樣使用,不做修改。建議您使用類似的技術來命名由自定義 TaskExecutor bean 定義建立的執行緒,以便於在日誌訊息中識別執行緒。

在建立連線時,配置在 CachingConnectionFactory 中的 Executor 會傳遞給 RabbitMQ Client,其執行緒用於將新訊息傳遞給監聽器容器。如果未配置,客戶端會使用內部執行緒池執行器,每個連線的執行緒池大小(在撰寫本文時)為 Runtime.getRuntime().availableProcessors() * 2

如果您有大量工廠或正在使用 CacheMode.CONNECTION,您可能需要考慮使用一個共享的 ThreadPoolTaskExecutor,它應具備足夠的執行緒來滿足您的工作負載。

對於 DirectMessageListenerContainer,您需要確保連線工廠配置的 task executor 具有足夠的執行緒來支援所有使用該工廠的監聽器容器所需的併發數。預設執行緒池大小(在撰寫本文時)為 Runtime.getRuntime().availableProcessors() * 2

RabbitMQ client 使用 ThreadFactory 建立用於低階 I/O(套接字)操作的執行緒。要修改此工廠,您需要配置底層的 RabbitMQ ConnectionFactory,如配置底層客戶端連線工廠中所述。