執行緒安全

當使用併發訊息監聽器容器時,所有消費者執行緒都會呼叫同一個監聽器例項。因此,監聽器需要是執行緒安全的,並且最好使用無狀態監聽器。如果無法使監聽器執行緒安全,或者新增同步會顯著降低增加併發性的好處,可以使用以下幾種技術之一:

  • 使用 `n` 個 `concurrency=1` 的容器,並結合原型範圍(prototype scoped)的 `MessageListener` bean,這樣每個容器都會有自己的例項(在使用 `@KafkaListener` 時無法做到這一點)。

  • 將狀態儲存在 `ThreadLocal<?>` 例項中。

  • 讓單例監聽器委託給在 `SimpleThreadScope`(或類似範圍)中宣告的 bean。

為了方便清理執行緒狀態(針對前述列表中的第二項和第三項),從 2.2 版本開始,監聽器容器在每個執行緒退出時會發佈一個 `ConsumerStoppedEvent`。您可以使用 `ApplicationListener` 或 `@EventListener` 方法來消費這些事件,以移除 `ThreadLocal<?>` 例項或從範圍中 `remove()` 執行緒範圍的 bean。請注意,`SimpleThreadScope` 不會銷燬具有銷燬介面(如 `DisposableBean`)的 bean,因此您應該自己呼叫 `destroy()` 例項。

預設情況下,應用上下文的事件多播器(event multicaster)在呼叫執行緒上呼叫事件監聽器。如果您更改多播器使用非同步執行器,執行緒清理將無效。

關於虛擬執行緒和併發訊息監聽器容器的特別說明

由於底層庫類線上程協調方面仍然使用 `synchronized` 塊存在某些限制,因此在使用併發訊息監聽器容器與虛擬執行緒時,應用需要謹慎。啟用虛擬執行緒後,如果併發數超過可用的平臺執行緒數,虛擬執行緒很可能被固定在平臺執行緒上,並且可能出現競態條件。因此,隨著 Spring for Apache Kafka 使用的第三方庫逐漸演進以完全支援虛擬執行緒,建議將訊息監聽器容器的併發數保持等於或少於平臺執行緒數。透過這種方式,應用可以避免執行緒之間的競態條件以及虛擬執行緒被固定在平臺執行緒上的情況。