執行緒安全
當使用併發訊息監聽器容器時,所有消費者執行緒都會呼叫單個監聽器例項。因此,監聽器需要是執行緒安全的,並且最好使用無狀態監聽器。如果無法使您的監聽器執行緒安全,或者新增同步會顯著降低併發帶來的好處,您可以使用以下幾種技術之一:
-
使用
n個容器,concurrency=1,並使用原型作用域的MessageListenerbean,以便每個容器都有自己的例項(在使用@KafkaListener時無法實現)。 -
將狀態儲存在
ThreadLocal<?>例項中。 -
讓單例監聽器委託給在
SimpleThreadScope(或類似作用域)中宣告的 bean。
為了方便清理執行緒狀態(針對上述列表中的第二項和第三項),從版本 2.2 開始,監聽器容器在每個執行緒退出時釋出 ConsumerStoppedEvent。您可以使用 ApplicationListener 或 @EventListener 方法來消費這些事件,以移除 ThreadLocal<?> 例項或從作用域中 remove() 執行緒作用域的 bean。請注意,SimpleThreadScope 不會銷燬具有銷燬介面(如 DisposableBean)的 bean,因此您應該自行 destroy() 例項。
| 預設情況下,應用程式上下文的事件多播器在呼叫執行緒上呼叫事件監聽器。如果您將多播器更改為使用非同步執行器,執行緒清理將無效。 |
關於虛擬執行緒和併發訊息監聽器容器的特別說明
由於底層庫類仍然使用 synchronized 塊進行執行緒協調的某些限制,應用程式在使用虛擬執行緒與併發訊息監聽器容器時需要謹慎。當啟用虛擬執行緒時,如果併發數超過可用的平臺執行緒數,虛擬執行緒很可能會被固定在平臺執行緒上,並可能出現競態條件。因此,隨著 Spring for Apache Kafka 所使用的第三方庫發展以完全支援虛擬執行緒,建議將訊息監聽器容器的併發數保持等於或小於平臺執行緒數。這樣,應用程式可以避免執行緒之間以及虛擬執行緒被固定在平臺執行緒上的任何競態條件。