Kafka Queues (Share Consumers)
Starting with version 4.0, Spring for Apache Kafka provides support for Kafka Queues through share consumers, which are part of Apache Kafka 4.0.0 and implement KIP-932 (Queues for Kafka). This feature is currently in early access.
Kafka Queues enable a different consumption model than traditional consumer groups. Instead of the traditional partition-based assignment model, where each partition is exclusively assigned to a single consumer, share consumers can cooperatively consume from the same partitions, with records distributed among the consumers in the share group.
Share Consumer Factory
The ShareConsumerFactory is responsible for creating share consumer instances. Spring Kafka provides the DefaultShareConsumerFactory implementation.
Configuration
You can configure the DefaultShareConsumerFactory in the same way as a regular ConsumerFactory:
@Bean
public ShareConsumerFactory<String, String> shareConsumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-share-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultShareConsumerFactory<>(props);
}
Constructor Options
The DefaultShareConsumerFactory offers several constructor options:
// Basic configuration
new DefaultShareConsumerFactory<>(configs);
// With deserializer suppliers
new DefaultShareConsumerFactory<>(configs, keyDeserializerSupplier, valueDeserializerSupplier);
// With deserializer instances
new DefaultShareConsumerFactory<>(configs, keyDeserializer, valueDeserializer, configureDeserializers);
Deserializer Configuration
You can configure deserializers in several ways:
- Via configuration properties (recommended for simple cases):
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- Via setters:
factory.setKeyDeserializer(new StringDeserializer());
factory.setValueDeserializer(new StringDeserializer());
- Via suppliers (for cases where a new deserializer must be created for each consumer):
factory.setKeyDeserializerSupplier(() -> new StringDeserializer());
factory.setValueDeserializerSupplier(() -> new StringDeserializer());
If your deserializers are already fully configured and should not be reconfigured by the factory, set configureDeserializers to false.
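For example, a minimal sketch using the four-argument constructor shown above, passing pre-configured deserializer instances and disabling reconfiguration by the factory (the property values are illustrative):
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// The deserializer instances are assumed to be fully configured already, so
// configureDeserializers is false and the factory will not reconfigure them.
DefaultShareConsumerFactory<String, String> factory =
        new DefaultShareConsumerFactory<>(props, new StringDeserializer(), new StringDeserializer(), false);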
Lifecycle Listeners
You can add listeners to monitor the lifecycle of share consumers:
factory.addListener(new ShareConsumerFactory.Listener<String, String>() {
@Override
public void consumerAdded(String id, ShareConsumer<String, String> consumer) {
// Called when a new consumer is created
System.out.println("Consumer added: " + id);
}
@Override
public void consumerRemoved(String id, ShareConsumer<String, String> consumer) {
// Called when a consumer is closed
System.out.println("Consumer removed: " + id);
}
});
Share Message Listener Container
ShareKafkaMessageListenerContainer
The ShareKafkaMessageListenerContainer provides a container for share consumers with support for concurrent processing:
@Bean
public ShareKafkaMessageListenerContainer<String, String> container(
ShareConsumerFactory<String, String> shareConsumerFactory) {
ContainerProperties containerProps = new ContainerProperties("my-topic");
containerProps.setGroupId("my-share-group");
ShareKafkaMessageListenerContainer<String, String> container =
new ShareKafkaMessageListenerContainer<>(shareConsumerFactory, containerProps);
container.setupMessageListener(new MessageListener<String, String>() {
@Override
public void onMessage(ConsumerRecord<String, String> record) {
System.out.println("Received: " + record.value());
}
});
return container;
}
Container Properties
The share container supports a subset of the container properties available to regular consumers (a brief sketch of setting them follows the note below):
- topics: array of topic names to subscribe to
- groupId: the share group ID
- clientId: the client ID for the consumer
- kafkaConsumerProperties: additional consumer properties
Note: share consumers do not support …
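As an illustration, here is a brief sketch of configuring these properties, assuming the standard ContainerProperties setters (the client ID value and the extra property are only examples):
ContainerProperties containerProps = new ContainerProperties("my-topic"); // topics to subscribe to
containerProps.setGroupId("my-share-group");   // share group ID
containerProps.setClientId("my-share-client"); // base client ID for the consumers
Properties extraProps = new Properties();
extraProps.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000");
containerProps.setKafkaConsumerProperties(extraProps); // additional consumer properties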
Concurrency
The ShareKafkaMessageListenerContainer supports concurrent processing by creating multiple consumer threads within a single container. Each thread runs its own ShareConsumer instance that participates in the same share group.
Unlike traditional consumer groups, where concurrency involves partition assignment, share consumers rely on Kafka's record-level distribution on the broker. Multiple consumer threads in the same container work together as part of the share group, and the Kafka broker distributes records across all of the consumer instances.
Concurrency is additive across application instances. From the share group's point of view, every consumer thread created by every container instance is a separate member of the group. For example, if application instance 1 and application instance 2 each run a container with the same group ID, all of their consumer threads join the same share group. This means that setting a concurrency of N in a single container adds N members to the share group.
Configuring Concurrency Programmatically
@Bean
public ShareKafkaMessageListenerContainer<String, String> concurrentContainer(
ShareConsumerFactory<String, String> shareConsumerFactory) {
ContainerProperties containerProps = new ContainerProperties("my-topic");
containerProps.setGroupId("my-share-group");
ShareKafkaMessageListenerContainer<String, String> container =
new ShareKafkaMessageListenerContainer<>(shareConsumerFactory, containerProps);
// Set concurrency to create 5 consumer threads
container.setConcurrency(5);
container.setupMessageListener(new MessageListener<String, String>() {
@Override
public void onMessage(ConsumerRecord<String, String> record) {
System.out.println("Received on " + Thread.currentThread().getName() + ": " + record.value());
}
});
return container;
}
Configuring Concurrency via the Factory
You can set a default concurrency at the factory level; it applies to all containers created by that factory:
@Bean
public ShareKafkaListenerContainerFactory<String, String> shareKafkaListenerContainerFactory(
ShareConsumerFactory<String, String> shareConsumerFactory) {
ShareKafkaListenerContainerFactory<String, String> factory =
new ShareKafkaListenerContainerFactory<>(shareConsumerFactory);
// Set default concurrency for all containers created by this factory
factory.setConcurrency(3);
return factory;
}
Per-Listener Concurrency
The concurrency setting can be overridden for an individual listener by using the concurrency attribute:
@Component
public class ConcurrentShareListener {
@KafkaListener(
topics = "high-throughput-topic",
containerFactory = "shareKafkaListenerContainerFactory",
groupId = "my-share-group",
concurrency = "10" // Override factory default
)
public void listen(ConsumerRecord<String, String> record) {
// This listener will use 10 consumer threads
System.out.println("Processing: " + record.value());
}
}
Concurrency Considerations
- Thread safety: each consumer thread has its own ShareConsumer instance and independently manages its own acknowledgments
- Client IDs: each consumer thread receives a unique client ID with a numeric suffix (for example, myContainer-0, myContainer-1, and so on)
- Metrics: metrics from all consumer threads are aggregated and accessible through container.metrics() (see the sketch after this list)
- Lifecycle: all consumer threads start and stop together as a single unit
- Work distribution: the Kafka broker handles record distribution across all consumer instances in the share group
- Explicit acknowledgment: each thread independently manages acknowledgment of its own records; unacknowledged records in one thread do not block the other threads
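For instance, a small sketch of reading the aggregated metrics, assuming metrics() returns a map keyed by client ID as it does for the standard listener containers:
Map<String, Map<MetricName, ? extends Metric>> allMetrics = container.metrics();
allMetrics.forEach((clientId, metricsForClient) ->
        metricsForClient.forEach((name, metric) ->
                // Print every metric reported by every consumer thread
                System.out.println(clientId + " " + name.name() + " = " + metric.metricValue())));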
Concurrency with Explicit Acknowledgment
Concurrency works seamlessly with explicit acknowledgment mode. Each consumer thread tracks and acknowledges its own records independently:
@KafkaListener(
topics = "order-queue",
containerFactory = "explicitShareKafkaListenerContainerFactory",
groupId = "order-processors",
concurrency = "5"
)
public void processOrder(ConsumerRecord<String, String> record, ShareAcknowledgment acknowledgment) {
try {
// Process the order
processOrderLogic(record.value());
acknowledgment.acknowledge(); // ACCEPT
}
catch (RetryableException e) {
acknowledgment.release(); // Will be redelivered
}
catch (Exception e) {
acknowledgment.reject(); // Permanent failure
}
}
Record fetching and distribution behavior: share consumers use a pull model in which each consumer thread calls poll() on its own ShareConsumer. Records acquired by one consumer are not available to the other consumers. When the acquisition lock expires, unacknowledged records automatically return to the "available" state and can be delivered to another consumer.
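If you need to tune how long that acquisition lock is held, KIP-932 defines a group-level configuration for it; the following is only a sketch, assuming the group config name group.share.record.lock.duration.ms and following the same Admin API pattern as the other group-configuration examples in this section:
private void configureRecordLockDuration(String bootstrapServers, String groupId) throws Exception {
    Map<String, Object> adminProps = new HashMap<>();
    adminProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    try (Admin admin = Admin.create(adminProps)) {
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.GROUP, groupId);
        // Assumption: group.share.record.lock.duration.ms is the KIP-932 group config
        // controlling how long acquired records stay locked (value in milliseconds).
        ConfigEntry lockDuration = new ConfigEntry("group.share.record.lock.duration.ms", "60000");
        Map<ConfigResource, Collection<AlterConfigOp>> configs = Map.of(
                configResource, List.of(new AlterConfigOp(lockDuration, AlterConfigOp.OpType.SET))
        );
        admin.incrementalAlterConfigs(configs).all().get();
    }
}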
Annotation-Driven Listeners
@KafkaListener with Share Consumers
You can use @KafkaListener with share consumers by configuring a ShareKafkaListenerContainerFactory:
@Configuration
@EnableKafka
public class ShareConsumerConfig {
@Bean
public ShareConsumerFactory<String, String> shareConsumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultShareConsumerFactory<>(props);
}
@Bean
public ShareKafkaListenerContainerFactory<String, String> shareKafkaListenerContainerFactory(
ShareConsumerFactory<String, String> shareConsumerFactory) {
return new ShareKafkaListenerContainerFactory<>(shareConsumerFactory);
}
}
Then use it in your listener:
@Component
public class ShareMessageListener {
@KafkaListener(
topics = "my-queue-topic",
containerFactory = "shareKafkaListenerContainerFactory",
groupId = "my-share-group"
)
public void listen(ConsumerRecord<String, String> record) {
System.out.println("Received from queue: " + record.value());
// Record is automatically acknowledged with ACCEPT
}
}
Share Group Offset Reset
Unlike regular consumer groups, share groups use a different configuration to control offset reset behavior. You can configure it programmatically:
private void configureShareGroup(String bootstrapServers, String groupId) throws Exception {
Map<String, Object> adminProps = new HashMap<>();
adminProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
try (Admin admin = Admin.create(adminProps)) {
ConfigResource configResource = new ConfigResource(ConfigResource.Type.GROUP, groupId);
ConfigEntry configEntry = new ConfigEntry("share.auto.offset.reset", "earliest");
Map<ConfigResource, Collection<AlterConfigOp>> configs = Map.of(
configResource, List.of(new AlterConfigOp(configEntry, AlterConfigOp.OpType.SET))
);
admin.incrementalAlterConfigs(configs).all().get();
}
}
Record Acknowledgment
Share consumers support two acknowledgment modes that control how records are acknowledged after processing.
Implicit Acknowledgment (Default)
In implicit mode, records are acknowledged automatically based on the processing outcome:
- Successful processing: the record is acknowledged as ACCEPT
- Processing error: the record is acknowledged as REJECT
@Bean
public ShareKafkaListenerContainerFactory<String, String> shareKafkaListenerContainerFactory(
ShareConsumerFactory<String, String> shareConsumerFactory) {
// Implicit mode is the default - no additional configuration needed
return new ShareKafkaListenerContainerFactory<>(shareConsumerFactory);
}
Explicit Acknowledgment
In explicit mode, the application must manually acknowledge each record using the provided ShareAcknowledgment.
There are two ways to configure explicit acknowledgment mode.
Option 1: Using the Kafka client configuration
@Bean
public ShareConsumerFactory<String, String> explicitShareConsumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, "explicit"); // Official Kafka client config
return new DefaultShareConsumerFactory<>(props);
}
Option 2: Using the Spring container configuration
@Bean
public ShareKafkaListenerContainerFactory<String, String> explicitShareKafkaListenerContainerFactory(
ShareConsumerFactory<String, String> shareConsumerFactory) {
ShareKafkaListenerContainerFactory<String, String> factory =
new ShareKafkaListenerContainerFactory<>(shareConsumerFactory);
// Configure acknowledgment mode at container factory level
// true means explicit acknowledgment is required
factory.getContainerProperties().setExplicitShareAcknowledgment(true);
return factory;
}
Acknowledgment Types
Share consumers support three acknowledgment types:
- ACCEPT: the record was processed successfully; mark it as completed
- RELEASE: temporary failure; make the record available for redelivery
- REJECT: permanent failure; do not retry
ShareAcknowledgment API
The ShareAcknowledgment interface provides the methods for explicit acknowledgment:
public interface ShareAcknowledgment {
void acknowledge();
void release();
void reject();
}
Listener Interfaces
Share consumers support dedicated listener interfaces for different use cases.
Basic Message Listener
For simple cases, use the standard MessageListener:
@KafkaListener(topics = "my-topic", containerFactory = "shareKafkaListenerContainerFactory")
public void listen(ConsumerRecord<String, String> record) {
System.out.println("Received: " + record.value());
// Automatically acknowledged in implicit mode
}
AcknowledgingShareConsumerAwareMessageListener
This interface provides access to the ShareConsumer instance and supports optional acknowledgment. Whether the acknowledgment parameter is null depends on the container's acknowledgment mode.
Implicit mode example (acknowledgment is null):
@KafkaListener(
topics = "my-topic",
containerFactory = "shareKafkaListenerContainerFactory" // Implicit mode by default
)
public void listen(ConsumerRecord<String, String> record,
@Nullable ShareAcknowledgment acknowledgment,
ShareConsumer<?, ?> consumer) {
// In implicit mode, acknowledgment is null
System.out.println("Received: " + record.value());
// Access consumer metrics if needed
Map<MetricName, ? extends Metric> metrics = consumer.metrics();
// Record is auto-acknowledged as ACCEPT on success, REJECT on error
}
Explicit mode example (acknowledgment is non-null):
@Component
public class ExplicitAckListener {
@KafkaListener(
topics = "my-topic",
containerFactory = "explicitShareKafkaListenerContainerFactory"
)
public void listen(ConsumerRecord<String, String> record,
@Nullable ShareAcknowledgment acknowledgment,
ShareConsumer<?, ?> consumer) {
// In explicit mode, acknowledgment is non-null
try {
processRecord(record);
acknowledgment.acknowledge(); // ACCEPT
}
catch (RetryableException e) {
acknowledgment.release(); // Will be redelivered
}
catch (Exception e) {
acknowledgment.reject(); // Permanent failure
}
}
private void processRecord(ConsumerRecord<String, String> record) {
// Business logic here
}
}
Acknowledgment Constraints
In explicit acknowledgment mode, the container enforces some important constraints:
- Poll blocking: subsequent polls are blocked until all records from the previous poll have been acknowledged.
- One-time acknowledgment: each record can be acknowledged only once.
- Error handling: if processing throws an exception, the record is automatically acknowledged as REJECT.
In explicit mode, failing to acknowledge records blocks subsequent message processing. Always make sure records are acknowledged on every code path.
Acknowledgment Timeout Detection
To help identify missing acknowledgments, Spring Kafka provides configurable timeout detection. When a record is not acknowledged within the specified timeout, a warning is logged with details of the unacknowledged record.
@Bean
public ShareKafkaListenerContainerFactory<String, String> shareKafkaListenerContainerFactory(
ShareConsumerFactory<String, String> shareConsumerFactory) {
ShareKafkaListenerContainerFactory<String, String> factory =
new ShareKafkaListenerContainerFactory<>(shareConsumerFactory);
// Set acknowledgment timeout (default is 30 seconds)
factory.getContainerProperties().setShareAcknowledgmentTimeout(Duration.ofSeconds(30));
return factory;
}
When a record exceeds the timeout, you will see a warning like this:
WARN: Record not acknowledged within timeout (30 seconds). In explicit acknowledgment mode, you must call ack.acknowledge(), ack.release(), or ack.reject() for every record.
This feature helps developers quickly spot missing acknowledgment calls in their code, preventing the common "Spring Kafka stopped consuming new records" problem caused by forgotten acknowledgments.
Acknowledgment Examples
Mixed Acknowledgment Types
@KafkaListener(topics = "order-processing", containerFactory = "explicitShareKafkaListenerContainerFactory")
public void processOrder(ConsumerRecord<String, String> record, ShareAcknowledgment acknowledgment) {
String orderId = record.key();
String orderData = record.value();
try {
if (isValidOrder(orderData)) {
if (processOrder(orderData)) {
acknowledgment.acknowledge(); // Success - ACCEPT
}
else {
acknowledgment.release(); // Temporary failure - retry later
}
}
else {
acknowledgment.reject(); // Invalid order - don't retry
}
}
catch (Exception e) {
// Exception automatically triggers REJECT
throw e;
}
}
Conditional Acknowledgment
@KafkaListener(topics = "data-validation", containerFactory = "explicitShareKafkaListenerContainerFactory")
public void validateData(ConsumerRecord<String, String> record, ShareAcknowledgment acknowledgment) {
ValidationResult result = validator.validate(record.value());
switch (result.getStatus()) {
case VALID:
acknowledgment.acknowledge(); // ACCEPT - processed successfully
break;
case INVALID_RETRYABLE:
acknowledgment.release(); // RELEASE - make available for redelivery
break;
case INVALID_PERMANENT:
acknowledgment.reject(); // REJECT - permanent failure, do not retry
break;
}
}
Poison Message Protection and Delivery Count
KIP-932 includes broker-side poison message protection to prevent records that cannot be processed from being redelivered endlessly.
How It Works
Each time a record is acquired by a consumer in the share group, the broker increments an internal delivery count. The first acquisition sets the delivery count to 1, and every subsequent acquisition increments it. Once the delivery count reaches the configured limit (default: 5), the record moves to the archived state and is no longer eligible for further delivery attempts.
Configuration
The maximum number of delivery attempts can be configured per share group through the Admin API:
private void configureMaxDeliveryAttempts(String bootstrapServers, String groupId) throws Exception {
Map<String, Object> adminProps = new HashMap<>();
adminProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
try (Admin admin = Admin.create(adminProps)) {
ConfigResource configResource = new ConfigResource(ConfigResource.Type.GROUP, groupId);
// Default is 5, adjust based on your retry tolerance
ConfigEntry maxAttempts = new ConfigEntry("group.share.delivery.attempt.limit", "10");
Map<ConfigResource, Collection<AlterConfigOp>> configs = Map.of(
configResource, List.of(new AlterConfigOp(maxAttempts, AlterConfigOp.OpType.SET))
);
admin.incrementalAlterConfigs(configs).all().get();
}
}
Delivery count is not exposed to applications: the delivery count is maintained internally by the broker and is not exposed to consumer applications. This is a deliberate design decision in KIP-932. The delivery count is an approximation used as a poison message protection mechanism, not a precise redelivery counter, and applications cannot query or access this value through any API. For application-level retry logic, use the acknowledgment types (ACCEPT, RELEASE, REJECT) instead. Once the configured delivery attempt limit is reached, the record is archived and is not delivered again.
Retry Strategy Recommendations
@KafkaListener(topics = "orders", containerFactory = "explicitShareKafkaListenerContainerFactory")
public void processOrder(ConsumerRecord<String, String> record, ShareAcknowledgment ack) {
try {
// Attempt to process the order
orderService.process(record.value());
ack.acknowledge(); // ACCEPT - successfully processed
}
catch (TransientException e) {
// Temporary failure (network issue, service unavailable, etc.)
// Release the record for redelivery
// Broker will retry up to group.share.delivery.attempt.limit times
logger.warn("Transient error processing order, will retry: {}", e.getMessage());
ack.release(); // RELEASE - make available for retry
}
catch (ValidationException e) {
// Permanent semantic error (invalid data format, business rule violation, etc.)
// Do not retry - this record will never succeed
logger.error("Invalid order data, rejecting: {}", e.getMessage());
ack.reject(); // REJECT - permanent failure, do not retry
}
catch (Exception e) {
// Unknown error - typically safer to reject to avoid infinite loops
// But could also release if you suspect it might be transient
logger.error("Unexpected error processing order, rejecting: {}", e.getMessage());
ack.reject(); // REJECT - avoid poison message loops
}
}
The broker's poison message protection ensures that records are not retried endlessly, even if you always use RELEASE on errors; once the delivery attempt limit is reached, they are automatically archived.