Kafka 佇列(共享消費者)

從 4.0 版本開始,Spring for Apache Kafka 透過共享消費者提供了對 Kafka 佇列的支援,共享消費者是 Apache Kafka 4.0.0 的一部分,並實現了 KIP-932(Kafka 佇列)。此功能目前處於早期訪問階段。

與傳統消費者組相比,Kafka 佇列啟用了不同的消費模型。傳統的基於分割槽的分配模型中,每個分割槽都獨佔分配給一個消費者,而共享消費者可以協作消費相同的分割槽,記錄會在共享組的消費者之間進行分配。

共享消費者工廠

ShareConsumerFactory 負責建立共享消費者例項。Spring Kafka 提供了 DefaultShareConsumerFactory 實現。

配置

您可以像配置常規 ConsumerFactory 一樣配置 DefaultShareConsumerFactory

@Bean
public ShareConsumerFactory<String, String> shareConsumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-share-group");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    return new DefaultShareConsumerFactory<>(props);
}

建構函式選項

DefaultShareConsumerFactory 提供了多種建構函式選項

// Basic configuration
new DefaultShareConsumerFactory<>(configs);

// With deserializer suppliers
new DefaultShareConsumerFactory<>(configs, keyDeserializerSupplier, valueDeserializerSupplier);

// With deserializer instances
new DefaultShareConsumerFactory<>(configs, keyDeserializer, valueDeserializer, configureDeserializers);

反序列化器配置

您可以透過多種方式配置反序列化器

  1. 透過配置屬性(推薦用於簡單情況)

    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  2. 透過 Setters:

    factory.setKeyDeserializer(new StringDeserializer());
    factory.setValueDeserializer(new StringDeserializer());
  3. 透過 Suppliers(用於需要為每個消費者建立反序列化器的情況)

    factory.setKeyDeserializerSupplier(() -> new StringDeserializer());
    factory.setValueDeserializerSupplier(() -> new StringDeserializer());

如果您的反序列化器已完全配置且不應由工廠重新配置,請將 configureDeserializers 設定為 false

生命週期監聽器

您可以新增監聽器來監控共享消費者的生命週期

factory.addListener(new ShareConsumerFactory.Listener<String, String>() {
    @Override
    public void consumerAdded(String id, ShareConsumer<String, String> consumer) {
        // Called when a new consumer is created
        System.out.println("Consumer added: " + id);
    }

    @Override
    public void consumerRemoved(String id, ShareConsumer<String, String> consumer) {
        // Called when a consumer is closed
        System.out.println("Consumer removed: " + id);
    }
});

共享訊息監聽器容器

ShareKafkaMessageListenerContainer

ShareKafkaMessageListenerContainer 為共享消費者提供了一個容器,支援併發處理

@Bean
public ShareKafkaMessageListenerContainer<String, String> container(
        ShareConsumerFactory<String, String> shareConsumerFactory) {

    ContainerProperties containerProps = new ContainerProperties("my-topic");
    containerProps.setGroupId("my-share-group");

    ShareKafkaMessageListenerContainer<String, String> container =
        new ShareKafkaMessageListenerContainer<>(shareConsumerFactory, containerProps);

    container.setupMessageListener(new MessageListener<String, String>() {
        @Override
        public void onMessage(ConsumerRecord<String, String> record) {
            System.out.println("Received: " + record.value());
        }
    });

    return container;
}

容器屬性

共享容器支援常規消費者可用的容器屬性的子集

  • topics: 要訂閱的主題名稱陣列

  • groupId: 共享組 ID

  • clientId: 消費者的客戶端 ID

  • kafkaConsumerProperties: 額外的消費者屬性

共享消費者不支援

  • 顯式分割槽分配 (TopicPartitionOffset)

  • 主題模式

  • 手動偏移量管理

併發性

ShareKafkaMessageListenerContainer 透過在單個容器中建立多個消費者執行緒來支援併發處理。每個執行緒都執行自己的 ShareConsumer 例項,該例項參與同一個共享組。

與傳統消費者組中併發涉及分割槽分配不同,共享消費者利用 Kafka 在代理上的記錄級別分發。這意味著同一容器中的多個消費者執行緒作為共享組的一部分協同工作,Kafka 代理將記錄分發到所有消費者例項。

併發性在應用程式例項之間是累加的

從共享組的角度來看,每個 ShareConsumer 例項都是一個獨立的成員,無論它在哪裡執行。在單個容器中設定 concurrency=3 會建立 3 個共享組成員。如果您執行多個具有相同共享組 ID 的應用程式例項,它們的所有消費者執行緒將合併到一個池中。

例如: * 應用程式例項 1:concurrency=3 → 3 個共享組成員 * 應用程式例項 2:concurrency=3 → 3 個共享組成員 * 總計:6 個共享組成員可供代理分發記錄

這意味著在單個容器中設定 concurrency=5 在操作上等同於執行 5 個獨立的應用程式例項,每個例項的 concurrency=1(都使用相同的 group.id)。Kafka 代理對所有消費者例項一視同仁,並將記錄分發到整個池中。

以程式設計方式配置併發性

@Bean
public ShareKafkaMessageListenerContainer<String, String> concurrentContainer(
        ShareConsumerFactory<String, String> shareConsumerFactory) {

    ContainerProperties containerProps = new ContainerProperties("my-topic");
    containerProps.setGroupId("my-share-group");

    ShareKafkaMessageListenerContainer<String, String> container =
        new ShareKafkaMessageListenerContainer<>(shareConsumerFactory, containerProps);

    // Set concurrency to create 5 consumer threads
    container.setConcurrency(5);

    container.setupMessageListener(new MessageListener<String, String>() {
        @Override
        public void onMessage(ConsumerRecord<String, String> record) {
            System.out.println("Received on " + Thread.currentThread().getName() + ": " + record.value());
        }
    });

    return container;
}

透過工廠配置併發性

您可以在工廠級別設定預設併發性,該設定適用於該工廠建立的所有容器

@Bean
public ShareKafkaListenerContainerFactory<String, String> shareKafkaListenerContainerFactory(
        ShareConsumerFactory<String, String> shareConsumerFactory) {

    ShareKafkaListenerContainerFactory<String, String> factory =
        new ShareKafkaListenerContainerFactory<>(shareConsumerFactory);

    // Set default concurrency for all containers created by this factory
    factory.setConcurrency(3);

    return factory;
}

每個監聽器的併發性

可以使用 concurrency 屬性覆蓋每個監聽器的併發設定

@Component
public class ConcurrentShareListener {

    @KafkaListener(
        topics = "high-throughput-topic",
        containerFactory = "shareKafkaListenerContainerFactory",
        groupId = "my-share-group",
        concurrency = "10"  // Override factory default
    )
    public void listen(ConsumerRecord<String, String> record) {
        // This listener will use 10 consumer threads
        System.out.println("Processing: " + record.value());
    }
}

併發性注意事項

  • 執行緒安全:每個消費者執行緒都有自己的 ShareConsumer 例項並獨立管理其自身的確認

  • 客戶端 ID:每個消費者執行緒都會收到一個帶有數字字尾的唯一客戶端 ID(例如,myContainer-0myContainer-1 等)

  • 指標:來自所有消費者執行緒的指標會被聚合並透過 container.metrics() 訪問

  • 生命週期:所有消費者執行緒作為一個單元同時啟動和停止

  • 工作分配:Kafka 代理處理共享組中所有消費者例項的記錄分發

  • 顯式確認:每個執行緒獨立管理其記錄的確認;一個執行緒中未確認的記錄不會阻塞其他執行緒

顯式確認下的併發性

併發性與顯式確認模式無縫協作。每個消費者執行緒獨立跟蹤並確認其自己的記錄

@KafkaListener(
    topics = "order-queue",
    containerFactory = "explicitShareKafkaListenerContainerFactory",
    groupId = "order-processors",
    concurrency = "5"
)
public void processOrder(ConsumerRecord<String, String> record, ShareAcknowledgment acknowledgment) {
    try {
        // Process the order
        processOrderLogic(record.value());
        acknowledgment.acknowledge(); // ACCEPT
    }
    catch (RetryableException e) {
        acknowledgment.release(); // Will be redelivered
    }
    catch (Exception e) {
        acknowledgment.reject(); // Permanent failure
    }
}

記錄獲取和分發行為

共享消費者使用拉取模型,其中每個消費者執行緒呼叫 poll() 從代理獲取記錄。當消費者進行輪詢時,代理的共享分割槽領導者

  • 選擇處於“可用”狀態的記錄

  • 將它們移動到“已獲取”狀態,並帶有時間限制的獲取鎖(預設為 30 秒,可透過 group.share.record.lock.duration.ms 配置)

  • 為了提高效率,傾向於返回完整的記錄批次

  • max.poll.records 應用為軟限制,這意味著即使超過此值,也會獲取完整的記錄批次

當記錄被一個消費者獲取時,其他消費者無法訪問它們。當獲取鎖到期時,未確認的記錄會自動返回“可用”狀態,並可以交付給另一個消費者。

代理使用 group.share.partition.max.record.locks 限制每個分割槽可獲取的記錄數量。一旦達到此限制,後續的輪詢將暫時不返回任何記錄,直到鎖過期。

對併發性的影響

  • 每個消費者執行緒獨立輪詢,每次輪詢可能獲取不同數量的記錄

  • 記錄線上程間的分發取決於輪詢時間和批次可用性

  • 多個執行緒增加了可用於獲取記錄的消費者池

  • 在訊息量少或分割槽單一的情況下,記錄可能會集中在少數執行緒上

  • 對於長時間執行的工作負載,分發往往更均勻

配置

  • 每個執行緒獨立輪詢和處理記錄

  • 確認約束適用於每個執行緒(一個執行緒中未確認的記錄不會阻塞其他執行緒)

  • 併發設定必須大於 0,並且在容器執行時不能更改

註解驅動的監聽器

帶有共享消費者的 @KafkaListener

您可以透過配置 ShareKafkaListenerContainerFactory 來使用帶有共享消費者的 @KafkaListener

@Configuration
@EnableKafka
public class ShareConsumerConfig {

    @Bean
    public ShareConsumerFactory<String, String> shareConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultShareConsumerFactory<>(props);
    }

    @Bean
    public ShareKafkaListenerContainerFactory<String, String> shareKafkaListenerContainerFactory(
            ShareConsumerFactory<String, String> shareConsumerFactory) {
        return new ShareKafkaListenerContainerFactory<>(shareConsumerFactory);
    }
}

然後在您的監聽器中使用它

@Component
public class ShareMessageListener {

    @KafkaListener(
        topics = "my-queue-topic",
        containerFactory = "shareKafkaListenerContainerFactory",
        groupId = "my-share-group"
    )
    public void listen(ConsumerRecord<String, String> record) {
        System.out.println("Received from queue: " + record.value());
        // Record is automatically acknowledged with ACCEPT
    }
}

共享組偏移量重置

與常規消費者組不同,共享組使用不同的配置來控制偏移量重置行為。您可以透過程式設計方式配置此項

private void configureShareGroup(String bootstrapServers, String groupId) throws Exception {
    Map<String, Object> adminProps = new HashMap<>();
    adminProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

    try (Admin admin = Admin.create(adminProps)) {
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.GROUP, groupId);
        ConfigEntry configEntry = new ConfigEntry("share.auto.offset.reset", "earliest");

        Map<ConfigResource, Collection<AlterConfigOp>> configs = Map.of(
            configResource, List.of(new AlterConfigOp(configEntry, AlterConfigOp.OpType.SET))
        );

        admin.incrementalAlterConfigs(configs).all().get();
    }
}

記錄確認

共享消費者支援兩種確認模式,控制記錄在處理後如何確認。

隱式確認(預設)

在隱式模式下,記錄根據處理結果自動確認

成功處理:記錄被確認為 ACCEPT 處理錯誤:記錄被確認為 REJECT

@Bean
public ShareKafkaListenerContainerFactory<String, String> shareKafkaListenerContainerFactory(
    ShareConsumerFactory<String, String> shareConsumerFactory) {
    // Implicit mode is the default - no additional configuration needed
    return new ShareKafkaListenerContainerFactory<>(shareConsumerFactory);
}

顯式確認

在顯式模式下,應用程式必須使用提供的 ShareAcknowledgment 手動確認每個記錄。

有兩種方式配置顯式確認模式

選項 1:使用 Kafka 客戶端配置

@Bean
public ShareConsumerFactory<String, String> explicitShareConsumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, "explicit"); // Official Kafka client config
    return new DefaultShareConsumerFactory<>(props);
}

選項 2:使用 Spring 容器配置

@Bean
public ShareKafkaListenerContainerFactory<String, String> explicitShareKafkaListenerContainerFactory(
    ShareConsumerFactory<String, String> shareConsumerFactory) {

    ShareKafkaListenerContainerFactory<String, String> factory =
        new ShareKafkaListenerContainerFactory<>(shareConsumerFactory);

    // Configure acknowledgment mode at container factory level
    // true means explicit acknowledgment is required
    factory.getContainerProperties().setExplicitShareAcknowledgment(true);

    return factory;
}

配置優先順序

當兩種配置方法都使用時,Spring Kafka 遵循以下優先順序順序(從高到低)

  1. 容器屬性containerProperties.setExplicitShareAcknowledgment(true/false)

  2. 消費者配置ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG("implicit" 或 "explicit")

  3. 預設值false(隱式確認)

確認型別

共享消費者支援三種確認型別

ACCEPT: Record processed successfully, mark as completed
RELEASE: Temporary failure, make record available for redelivery
REJECT: Permanent failure, do not retry

ShareAcknowledgment API

ShareAcknowledgment 介面提供了顯式確認的方法

public interface ShareAcknowledgment {
    void acknowledge();
    void release();
    void reject();
}

監聽器介面

共享消費者支援針對不同用例的專用監聽器介面

基本訊息監聽器

對於簡單情況,使用標準 MessageListener

@KafkaListener(topics = "my-topic", containerFactory = "shareKafkaListenerContainerFactory")
public void listen(ConsumerRecord<String, String> record) {
    System.out.println("Received: " + record.value());
    // Automatically acknowledged in implicit mode
}

AcknowledgingShareConsumerAwareMessageListener

此介面提供對 ShareConsumer 例項的訪問,並支援可選的確認。確認引數可為空,具體取決於容器的確認模式

隱式模式示例(確認為空)
@KafkaListener(
    topics = "my-topic",
    containerFactory = "shareKafkaListenerContainerFactory"  // Implicit mode by default
)
public void listen(ConsumerRecord<String, String> record,
                  @Nullable ShareAcknowledgment acknowledgment,
                  ShareConsumer<?, ?> consumer) {

    // In implicit mode, acknowledgment is null
    System.out.println("Received: " + record.value());

    // Access consumer metrics if needed
    Map<MetricName, ? extends Metric> metrics = consumer.metrics();

    // Record is auto-acknowledged as ACCEPT on success, REJECT on error
}
顯式模式示例(確認非空)
@Component
public class ExplicitAckListener {
    @KafkaListener(
        topics = "my-topic",
        containerFactory = "explicitShareKafkaListenerContainerFactory"
    )
    public void listen(ConsumerRecord<String, String> record,
                      @Nullable ShareAcknowledgment acknowledgment,
                      ShareConsumer<?, ?> consumer) {

        // In explicit mode, acknowledgment is non-null
        try {
            processRecord(record);
            acknowledgment.acknowledge(); // ACCEPT
        }
		catch (RetryableException e) {
            acknowledgment.release(); // Will be redelivered
        }
		catch (Exception e) {
            acknowledgment.reject(); // Permanent failure
        }
    }

    private void processRecord(ConsumerRecord<String, String> record) {
        // Business logic here
    }
}

確認約束

在顯式確認模式下,容器強制執行重要的約束

Poll Blocking: Subsequent polls are blocked until all records from the previous poll are acknowledged.
One-time Acknowledgment: Each record can only be acknowledged once.
Error Handling: If processing throws an exception, the record is automatically acknowledged as `REJECT`.
在顯式模式下,如果未能確認記錄,將阻塞後續的訊息處理。請務必確保在所有程式碼路徑中都確認記錄。

確認超時檢測

為了幫助識別缺失的確認,Spring Kafka 提供了可配置的超時檢測。當記錄在指定超時時間內未被確認時,會記錄一條警告,其中包含未確認記錄的詳細資訊。

@Bean
public ShareKafkaListenerContainerFactory<String, String> shareKafkaListenerContainerFactory(
    ShareConsumerFactory<String, String> shareConsumerFactory) {
    ShareKafkaListenerContainerFactory<String, String> factory =
        new ShareKafkaListenerContainerFactory<>(shareConsumerFactory);

    // Set acknowledgment timeout (default is 30 seconds)
    factory.getContainerProperties().setShareAcknowledgmentTimeout(Duration.ofSeconds(30));

    return factory;
}

當記錄超過超時時間時,您會看到如下警告

WARN: Record not acknowledged within timeout (30 seconds).
In explicit acknowledgment mode, you must call ack.acknowledge(), ack.release(),
or ack.reject() for every record.

此功能有助於開發人員快速識別程式碼中缺少確認呼叫的情況,從而防止因忘記確認而導致的常見問題“Spring Kafka 不再消費新記錄”。

確認示例

混合確認模式

@KafkaListener(topics = "order-processing", containerFactory = "explicitShareKafkaListenerContainerFactory")
    public void processOrder(ConsumerRecord<String, String> record, ShareAcknowledgment acknowledgment) {
        String orderId = record.key();
        String orderData = record.value();
        try {
            if (isValidOrder(orderData)) {
                if (processOrder(orderData)) {
                    acknowledgment.acknowledge(); // Success - ACCEPT
                }
                else {
                    acknowledgment.release(); // Temporary failure - retry later
                }
            }
            else {
                acknowledgment.reject(); // Invalid order - don't retry
            }
        }
        catch (Exception e) {
            // Exception automatically triggers REJECT
            throw e;
        }
}

條件確認

@KafkaListener(topics = "data-validation", containerFactory = "explicitShareKafkaListenerContainerFactory")
public void validateData(ConsumerRecord<String, String> record, ShareAcknowledgment acknowledgment) {
    ValidationResult result = validator.validate(record.value());
    switch (result.getStatus()) {
        case VALID:
            acknowledgment.acknowledge(AcknowledgeType.ACCEPT);
            break;
        case INVALID_RETRYABLE:
            acknowledgment.acknowledge(AcknowledgeType.RELEASE);
            break;
        case INVALID_PERMANENT:
            acknowledgment.acknowledge(AcknowledgeType.REJECT);
            break;
    }
}

毒藥訊息保護和遞送計數

KIP-932 包含了代理端的毒藥訊息保護,以防止無法處理的記錄被無休止地重新遞送。

工作原理

每當共享組中的消費者獲取記錄時,代理都會遞增內部遞送計數。第一次獲取將遞送計數設定為 1,每次後續獲取都會遞增。當遞送計數達到配置的限制(預設:5)時,記錄會進入已歸檔狀態,並且不再有資格進行額外的遞送嘗試。

配置

最大遞送嘗試次數可以透過 Admin API 為每個共享組配置

private void configureMaxDeliveryAttempts(String bootstrapServers, String groupId) throws Exception {
    Map<String, Object> adminProps = new HashMap<>();
    adminProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

    try (Admin admin = Admin.create(adminProps)) {
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.GROUP, groupId);

        // Default is 5, adjust based on your retry tolerance
        ConfigEntry maxAttempts = new ConfigEntry("group.share.delivery.attempt.limit", "10");

        Map<ConfigResource, Collection<AlterConfigOp>> configs = Map.of(
            configResource, List.of(new AlterConfigOp(maxAttempts, AlterConfigOp.OpType.SET))
        );

        admin.incrementalAlterConfigs(configs).all().get();
    }
}

遞送計數不暴露給應用程式

遞送計數由代理內部維護,並且不暴露給消費者應用程式。這是 KIP-932 中的有意設計決策。遞送計數是近似值,用作毒藥訊息保護機制,而不是精確的重遞送計數器。應用程式無法透過任何 API 查詢或訪問此值。

對於應用程式級別的重試邏輯,請使用確認型別

  • RELEASE - 使記錄可用於重遞送(計入遞送計數)

  • REJECT - 標記為永久失敗(不會導致重遞送)

  • ACCEPT - 成功處理(不會導致重遞送)

一旦達到 group.share.delivery.attempt.limit,代理會自動防止無休止的重遞送,將記錄移至已歸檔狀態。

重試策略建議

@KafkaListener(topics = "orders", containerFactory = "explicitShareKafkaListenerContainerFactory")
public void processOrder(ConsumerRecord<String, String> record, ShareAcknowledgment ack) {
    try {
        // Attempt to process the order
        orderService.process(record.value());
        ack.acknowledge(); // ACCEPT - successfully processed
    }
    catch (TransientException e) {
        // Temporary failure (network issue, service unavailable, etc.)
        // Release the record for redelivery
        // Broker will retry up to group.share.delivery.attempt.limit times
        logger.warn("Transient error processing order, will retry: {}", e.getMessage());
        ack.release(); // RELEASE - make available for retry
    }
    catch (ValidationException e) {
        // Permanent semantic error (invalid data format, business rule violation, etc.)
        // Do not retry - this record will never succeed
        logger.error("Invalid order data, rejecting: {}", e.getMessage());
        ack.reject(); // REJECT - permanent failure, do not retry
    }
    catch (Exception e) {
        // Unknown error - typically safer to reject to avoid infinite loops
        // But could also release if you suspect it might be transient
        logger.error("Unexpected error processing order, rejecting: {}", e.getMessage());
        ack.reject(); // REJECT - avoid poison message loops
    }
}

代理的毒藥訊息保護確保即使您始終對錯誤使用 RELEASE,記錄也不會無休止地重試。它們在達到遞送嘗試限制後會自動歸檔。

與常規消費者的區別

共享消費者與常規消費者在幾個關鍵方面有所不同

  1. 無分割槽分配:共享消費者不能被分配特定分割槽

  2. 無主題模式:共享消費者不支援訂閱主題模式

  3. 協作消費:同一共享組中的多個消費者可以同時消費同一分割槽

  4. 記錄級別確認:支援帶有 ACCEPTRELEASEREJECT 型別的顯式確認

  5. 不同的組管理:共享組使用不同的協調器協議

  6. 無批處理:共享消費者單獨處理記錄,而不是批次處理

  7. 代理端重試管理:遞送計數跟蹤和毒藥訊息保護由代理管理,不暴露給應用程式

限制和注意事項

當前限制

  • 預覽版:此功能處於預覽模式,未來版本可能會更改

  • 無訊息轉換器:共享消費者尚不支援訊息轉換器

  • 無批處理監聽器:共享消費者不支援批處理

  • 輪詢約束:在顯式確認模式下,未確認的記錄會阻塞每個消費者執行緒中的後續輪詢

© . This site is unofficial and not affiliated with VMware.