使用 Kafka Binder 進行分割槽
Apache Kafka 原生支援主題分割槽。
有時將資料傳送到特定分割槽會很有優勢——例如,當您希望嚴格按順序處理訊息時(某個特定客戶的所有訊息應傳送到同一分割槽)。
以下示例展示瞭如何配置生產者和消費者端
@SpringBootApplication
public class KafkaPartitionProducerApplication {
private static final Random RANDOM = new Random(System.currentTimeMillis());
private static final String[] data = new String[] {
"foo1", "bar1", "qux1",
"foo2", "bar2", "qux2",
"foo3", "bar3", "qux3",
"foo4", "bar4", "qux4",
};
public static void main(String[] args) {
new SpringApplicationBuilder(KafkaPartitionProducerApplication.class)
.web(false)
.run(args);
}
@Bean
public Supplier<Message<?>> generate() {
return () -> {
String value = data[RANDOM.nextInt(data.length)];
System.out.println("Sending: " + value);
return MessageBuilder.withPayload(value)
.setHeader("partitionKey", value)
.build();
};
}
}
application.yml
spring:
cloud:
stream:
bindings:
generate-out-0:
destination: partitioned.topic
producer:
partition-key-expression: headers['partitionKey']
partition-count: 12
請務必記住,由於 Apache Kafka 原生支援分割槽,除非您像示例中一樣使用自定義分割槽鍵,或者使用涉及載荷(payload)本身的表示式,否則無需依賴上述描述的 binder 分割槽機制。binder 提供的分割槽選擇機制是為那些不支援原生分割槽的中介軟體技術設計的。請注意,在上面的示例中,我們使用了一個名為 partitionKey 的自定義鍵,它將是決定分割槽的關鍵因素,因此在這種情況下,使用 binder 分割槽是合適的。當使用原生 Kafka 分割槽時,即當您不提供 partition-key-expression 時,Apache Kafka 將選擇一個分割槽,預設情況下,它將是記錄鍵的雜湊值對可用分割槽數取模的結果。要向出站記錄新增鍵,請在 spring-messaging Message<?> 中將 KafkaHeaders.KEY 頭部設定為所需的鍵值。預設情況下,如果沒有提供記錄鍵,Apache Kafka 將根據 Apache Kafka 文件中描述的邏輯選擇一個分割槽。 |
必須為主題預置足夠的分割槽,以實現所有消費者組所需的併發度。上述配置最多支援 12 個消費者例項(如果 concurrency 為 2,則支援 6 個;如果 concurrency 為 3,則支援 4 個,依此類推)。通常最好“過度配置”分割槽,以便未來能夠增加消費者或併發度。 |
上述配置使用了預設分割槽策略(key.hashCode() % partitionCount )。根據鍵值的不同,這可能提供,也可能不提供一個均衡的演算法。特別需要注意的是,這種分割槽策略與獨立 Kafka 生產者(例如 Kafka Streams 使用的生產者)的預設策略不同,這意味著相同的鍵值在這些客戶端生產時可能在分割槽之間產生不同的均衡結果。您可以透過使用 partitionSelectorExpression 或 partitionSelectorClass 屬性來覆蓋此預設設定。 |
由於分割槽由 Kafka 原生處理,消費者端無需特殊配置。Kafka 會在例項間分配分割槽。
Kafka 主題的分割槽數可能在執行時發生變化(例如由於管理任務)。之後計算出的分割槽將有所不同(例如,屆時將使用新的分割槽)。從 Spring Cloud Stream 執行時版本 4.0.3 開始將支援分割槽數的變化。另請參見引數 'spring.kafka.producer.properties.metadata.max.age.ms' 來配置更新間隔。由於某些限制,不能使用引用訊息 'payload' 的 'partition-key-expression',在這種情況下該機制將被停用。此整體行為預設是停用的,可以透過配置引數 'producer.dynamicPartitionUpdatesEnabled=true' 來啟用。 |
以下 Spring Boot 應用監聽 Kafka 流,並將每條訊息傳送到的分割槽 ID 列印(到控制檯)出來
@SpringBootApplication
public class KafkaPartitionConsumerApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(KafkaPartitionConsumerApplication.class)
.web(WebApplicationType.NONE)
.run(args);
}
@Bean
public Consumer<Message<String>> listen() {
return message -> {
int partition = (int) message.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION);
System.out.println(message + " received from partition " + partition);
};
}
}
application.yml
spring:
cloud:
stream:
bindings:
listen-in-0:
destination: partitioned.topic
group: myGroup
您可以根據需要新增例項。Kafka 會重新平衡分割槽分配。如果例項數量(或 例項數量 * 併發數
)超過分割槽數量,一些消費者將處於空閒狀態。