使用 Kafka 繫結器進行分割槽

Apache Kafka 原生支援主題分割槽。

有時將資料傳送到特定分割槽是有利的——例如,當您想嚴格排序訊息處理時(特定客戶的所有訊息都應傳送到同一分割槽)。

以下示例展示瞭如何配置生產者和消費者端。

@SpringBootApplication
public class KafkaPartitionProducerApplication {

    private static final Random RANDOM = new Random(System.currentTimeMillis());

    private static final String[] data = new String[] {
            "foo1", "bar1", "qux1",
            "foo2", "bar2", "qux2",
            "foo3", "bar3", "qux3",
            "foo4", "bar4", "qux4",
            };

    public static void main(String[] args) {
        new SpringApplicationBuilder(KafkaPartitionProducerApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public Supplier<Message<?>> generate() {
        return () -> {
            String value = data[RANDOM.nextInt(data.length)];
            System.out.println("Sending: " + value);
            return MessageBuilder.withPayload(value)
                    .setHeader("partitionKey", value)
                    .build();
        };
    }

}
application.yml
spring:
  cloud:
    stream:
      bindings:
        generate-out-0:
          destination: partitioned.topic
          producer:
            partition-key-expression: headers['partitionKey']
            partition-count: 12
需要注意的是,由於 Apache Kafka 原生支援分割槽,除非您像示例中那樣使用自定義分割槽鍵或涉及有效負載本身的表示式,否則無需依賴上述繫結器分割槽。繫結器提供的分割槽選擇旨在用於不支援原生分割槽的中介軟體技術。請注意,在上述示例中我們使用了一個名為 partitionKey 的自定義鍵,它將成為分割槽的重要決定因素,因此在這種情況下使用繫結器分割槽是合適的。當使用原生 Kafka 分割槽時,即當您不提供 partition-key-expression 時,Apache Kafka 將選擇一個分割槽,預設情況下,該分割槽將是記錄鍵的雜湊值除以可用分割槽數的餘數。要向出站記錄新增鍵,請在 spring-messaging Message<?> 中將 KafkaHeaders.KEY 頭設定為所需的鍵值。預設情況下,當未提供記錄鍵時,Apache Kafka 將根據 Apache Kafka 文件中描述的邏輯選擇一個分割槽。
主題必須配置足夠的分割槽,以實現所有消費者組所需的併發性。上述配置最多支援 12 個消費者例項(如果它們的 concurrency 為 2,則為 6 個;如果它們的併發性為 3,則為 4 個,依此類推)。通常最好“過度配置”分割槽,以允許未來增加消費者或併發性。
前面的配置使用預設分割槽 (key.hashCode() % partitionCount)。這可能提供也可能不提供一個適當平衡的演算法,具體取決於鍵值。特別需要注意的是,此分割槽策略與獨立 Kafka 生產者(例如 Kafka Streams 使用的生產者)使用的預設策略不同,這意味著由這些客戶端生成時,相同的鍵值在不同分割槽上的平衡方式可能不同。您可以透過使用 partitionSelectorExpressionpartitionSelectorClass 屬性來覆蓋此預設值。

由於分割槽由 Kafka 原生處理,因此消費者端不需要特殊配置。Kafka 在例項之間分配分割槽。

Kafka 主題的分割槽數可能在執行時發生變化(例如,由於管理任務)。此後,計算出的分割槽將有所不同(例如,將使用新分割槽)。自 Spring Cloud Stream 4.0.3 起,將支援分割槽數的執行時更改。另請參閱引數“spring.kafka.producer.properties.metadata.max.age.ms”以配置更新間隔。由於某些限制,無法使用引用訊息“payload”的“partition-key-expression”,在這種情況下該機制將被停用。總體行為預設停用,可以透過配置引數“producer.dynamicPartitionUpdatesEnabled=true”啟用。

以下 Spring Boot 應用程式監聽 Kafka 流並列印(到控制檯)每條訊息到達的分割槽 ID

@SpringBootApplication
public class KafkaPartitionConsumerApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(KafkaPartitionConsumerApplication.class)
            .web(WebApplicationType.NONE)
            .run(args);
    }

    @Bean
    public Consumer<Message<String>> listen() {
        return message -> {
            int partition = (int) message.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION);
            System.out.println(message + " received from partition " + partition);
        };
    }

}
application.yml
spring:
  cloud:
    stream:
      bindings:
        listen-in-0:
          destination: partitioned.topic
          group: myGroup

您可以根據需要新增例項。Kafka 會重新平衡分割槽分配。如果例項數(或 例項數 * 併發數)超過分割槽數,則某些消費者將處於空閒狀態。

© . This site is unofficial and not affiliated with VMware.