Apache Kafka 支援
概述
Spring Integration for Apache Kafka 基於 Spring for Apache Kafka 專案。
專案需要此依賴項
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-kafka</artifactId>
<version>7.0.0</version>
</dependency>
compile "org.springframework.integration:spring-integration-kafka:7.0.0"
它提供以下元件:
出站通道介面卡
出站通道介面卡用於將訊息從 Spring Integration 通道釋出到 Apache Kafka 主題。該通道在應用程式上下文中定義,然後連線到向 Apache Kafka 傳送訊息的應用程式。傳送應用程式可以透過使用 Spring Integration 訊息釋出到 Apache Kafka,這些訊息由出站通道介面卡在內部轉換為 Kafka 記錄,如下所示:
-
Spring Integration 訊息的負載用於填充 Kafka 記錄的負載。
-
預設情況下,Spring Integration 訊息的
kafka_messageKey頭用於填充 Kafka 記錄的鍵。
您可以透過 kafka_topic 和 kafka_partitionId 頭分別自定義釋出訊息的目標主題和分割槽。
此外,<int-kafka:outbound-channel-adapter> 提供了透過對出站訊息應用 SpEL 表示式來提取鍵、目標主題和目標分割槽的能力。為此,它支援三對互斥的屬性:
-
topic和topic-expression -
message-key和message-key-expression -
partition-id和partition-id-expression
這些屬性允許您將 topic、message-key 和 partition-id 分別指定為介面卡上的靜態值,或者在執行時根據請求訊息動態評估它們的值。
KafkaHeaders 介面(由 spring-kafka 提供)包含用於與頭互動的常量。messageKey 和 topic 預設頭現在需要 kafka_ 字首。從使用舊版本的舊頭遷移時,您需要在 <int-kafka:outbound-channel-adapter> 上指定 message-key-expression="headers['messageKey']" 和 topic-expression="headers['topic']"。或者,您可以使用 <header-enricher> 或 MessageBuilder 將上游的頭更改為 KafkaHeaders 中的新頭。如果您使用常量值,您還可以使用 topic 和 message-key 在介面卡上配置它們。 |
注意:如果介面卡配置了主題或訊息鍵(無論是常量還是表示式),則使用這些值,並忽略相應的頭。如果您希望頭覆蓋配置,您需要在表示式中配置它,例如:
topic-expression="headers['topic'] != null ? headers['topic'] : 'myTopic'"
介面卡需要一個 KafkaTemplate,而 KafkaTemplate 又需要一個配置適當的 KafkaProducerFactory。
如果提供了 send-failure-channel (sendFailureChannel) 並且收到了 send() 失敗(同步或非同步),則會向該通道傳送一個 ErrorMessage。其負載是一個 KafkaSendFailureException,具有 failedMessage、record (ProducerRecord) 和 cause 屬性。您可以透過設定 error-message-strategy 屬性來覆蓋 DefaultErrorMessageStrategy。
如果提供了 send-success-channel (sendSuccessChannel),則在成功傳送後傳送一個負載型別為 org.apache.kafka.clients.producer.RecordMetadata 的訊息。
如果您的應用程式使用事務,並且使用相同的通道介面卡來發布訊息,其中事務由監聽器容器啟動,以及在沒有現有事務的情況下發布,則必須在 KafkaTemplate 上配置 transactionIdPrefix 以覆蓋容器或事務管理器使用的字首。容器啟動的事務(生產者工廠或事務管理器屬性)使用的字首在所有應用程式例項上必須相同。僅生產者事務使用的字首在所有應用程式例項上必須唯一。 |
您可以配置一個 flushExpression,它必須解析為布林值。如果您使用 linger.ms 和 batch.size Kafka 生產者屬性,在傳送多條訊息後重新整理可能會很有用;該表示式應該在最後一條訊息上評估為 Boolean.TRUE,並且未完成的批次將立即傳送。預設情況下,該表示式在 KafkaIntegrationHeaders.FLUSH 頭 (kafka_flush) 中查詢 Boolean 值。如果值為 true,則會發生重新整理;如果為 false 或頭不存在,則不會發生重新整理。
KafkaProducerMessageHandler.sendTimeoutExpression 的預設值已從 10 秒更改為 delivery.timeout.ms Kafka 生產者屬性 + 5000,以便在超時後將實際的 Kafka 錯誤傳播到應用程式,而不是由該框架生成的超時。為了保持一致性,這已進行了更改,因為您可能會遇到意外行為(Spring 可能會超時傳送,而實際上它最終會成功)。重要提示:該超時預設為 120 秒,因此您可能希望將其縮短以獲得更及時的失敗。
配置
以下示例演示如何為 Apache Kafka 配置出站通道介面卡:
-
Java DSL
-
Java
-
XML
@Bean
public ProducerFactory<Integer, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(KafkaTestUtils.producerProps(embeddedKafka));
}
@Bean
public IntegrationFlow sendToKafkaFlow() {
return f -> f
.splitWith(s -> s.<String>function(p -> Stream.generate(() -> p).limit(101).iterator()))
.publishSubscribeChannel(c -> c
.subscribe(sf -> sf.handle(
kafkaMessageHandler(producerFactory(), TEST_TOPIC1)
.timestampExpression("T(Long).valueOf('1487694048633')"),
e -> e.id("kafkaProducer1")))
.subscribe(sf -> sf.handle(
kafkaMessageHandler(producerFactory(), TEST_TOPIC2)
.timestamp(m -> 1487694048644L),
e -> e.id("kafkaProducer2")))
);
}
@Bean
public DefaultKafkaHeaderMapper mapper() {
return new DefaultKafkaHeaderMapper();
}
private KafkaProducerMessageHandlerSpec<Integer, String, ?> kafkaMessageHandler(
ProducerFactory<Integer, String> producerFactory, String topic) {
return Kafka
.outboundChannelAdapter(producerFactory)
.messageKey(m -> m
.getHeaders()
.get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER))
.headerMapper(mapper())
.partitionId(m -> 10)
.topicExpression("headers[kafka_topic] ?: '" + topic + "'")
.configureKafkaTemplate(t -> t.id("kafkaTemplate:" + topic));
}
@Bean
@ServiceActivator(inputChannel = "toKafka")
public MessageHandler handler() throws Exception {
KafkaProducerMessageHandler<String, String> handler =
new KafkaProducerMessageHandler<>(kafkaTemplate());
handler.setTopicExpression(new LiteralExpression("someTopic"));
handler.setMessageKeyExpression(new LiteralExpression("someKey"));
handler.setSuccessChannel(successes());
handler.setFailureChannel(failures());
return handler;
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress);
// set more properties
return new DefaultKafkaProducerFactory<>(props);
}
<int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter"
kafka-template="template"
auto-startup="false"
channel="inputToKafka"
topic="foo"
sync="false"
message-key-expression="'bar'"
send-failure-channel="failures"
send-success-channel="successes"
error-message-strategy="ems"
partition-id-expression="2">
</int-kafka:outbound-channel-adapter>
<bean id="template" class="org.springframework.kafka.core.KafkaTemplate">
<constructor-arg>
<bean class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="localhost:9092" />
... <!-- more producer properties -->
</map>
</constructor-arg>
</bean>
</constructor-arg>
</bean>
訊息驅動通道介面卡
KafkaMessageDrivenChannelAdapter (<int-kafka:message-driven-channel-adapter>) 使用 spring-kafka 的 KafkaMessageListenerContainer 或 ConcurrentListenerContainer。
此外,還提供了 mode 屬性。它可以接受 record 或 batch 值(預設值:record)。對於 record 模式,每個訊息負載都從單個 ConsumerRecord 轉換而來。對於 batch 模式,負載是所有由消費者輪詢返回的 ConsumerRecord 例項轉換而來的物件列表。與批處理的 @KafkaListener 一樣,KafkaHeaders.RECEIVED_KEY、KafkaHeaders.RECEIVED_PARTITION、KafkaHeaders.RECEIVED_TOPIC 和 KafkaHeaders.OFFSET 頭也都是列表,其位置與負載中的位置相對應。
接收到的訊息會填充某些頭。有關更多資訊,請參閱 KafkaHeaders 類。
Consumer 物件(在 kafka_consumer 頭中)不是執行緒安全的。您只能在介面卡內呼叫監聽器的執行緒上呼叫其方法。如果您將訊息交給另一個執行緒,則不得呼叫其方法。 |
如果提供了 retry-template,則根據其重試策略重試傳遞失敗。如果還提供了 error-channel,則在重試耗盡後,將使用預設的 ErrorMessageSendingRecoverer 作為恢復回撥。您還可以使用 recovery-callback 指定在這種情況下采取的其他操作,或者將其設定為 null 以將最終異常拋給監聽器容器,以便在那裡進行處理。
在構建 ErrorMessage(用於 error-channel 或 recovery-callback)時,您可以透過設定 error-message-strategy 屬性來自定義錯誤訊息。預設情況下,使用 RawRecordHeaderErrorMessageStrategy,以提供對轉換後的訊息以及原始 ConsumerRecord 的訪問。
這種形式的重試是阻塞的,如果所有輪詢記錄的總重試延遲可能超過 max.poll.interval.ms 消費者屬性,則可能導致重新平衡。相反,請考慮向監聽器容器新增一個 DefaultErrorHandler,並配置一個 KafkaErrorSendingMessageRecoverer。 |
配置
以下示例演示如何配置訊息驅動通道介面卡:
-
Java DSL
-
Java
-
XML
@Bean
public IntegrationFlow topic1ListenerFromKafkaFlow() {
return IntegrationFlow
.from(Kafka.messageDrivenChannelAdapter(consumerFactory(),
KafkaMessageDrivenChannelAdapter.ListenerMode.record, TEST_TOPIC1)
.configureListenerContainer(c ->
c.ackMode(AbstractMessageListenerContainer.AckMode.MANUAL)
.id("topic1ListenerContainer"))
.recoveryCallback(new ErrorMessageSendingRecoverer(errorChannel(),
new RawRecordHeaderErrorMessageStrategy()))
.retryTemplate(new RetryTemplate())
.filterInRetry(true))
.filter(Message.class, m ->
m.getHeaders().get(KafkaHeaders.RECEIVED_MESSAGE_KEY, Integer.class) < 101,
f -> f.throwExceptionOnRejection(true))
.<String, String>transform(String::toUpperCase)
.channel(c -> c.queue("listeningFromKafkaResults1"))
.get();
}
@Bean
public KafkaMessageDrivenChannelAdapter<String, String>
adapter(KafkaMessageListenerContainer<String, String> container) {
KafkaMessageDrivenChannelAdapter<String, String> kafkaMessageDrivenChannelAdapter =
new KafkaMessageDrivenChannelAdapter<>(container, ListenerMode.record);
kafkaMessageDrivenChannelAdapter.setOutputChannel(received());
return kafkaMessageDrivenChannelAdapter;
}
@Bean
public KafkaMessageListenerContainer<String, String> container() throws Exception {
ContainerProperties properties = new ContainerProperties(this.topic);
// set more properties
return new KafkaMessageListenerContainer<>(consumerFactory(), properties);
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress);
// set more properties
return new DefaultKafkaConsumerFactory<>(props);
}
<int-kafka:message-driven-channel-adapter
id="kafkaListener"
listener-container="container1"
auto-startup="false"
phase="100"
send-timeout="5000"
mode="record"
retry-template="template"
recovery-callback="callback"
error-message-strategy="ems"
channel="someChannel"
error-channel="errorChannel" />
<bean id="container1" class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
<constructor-arg>
<bean class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="localhost:9092" />
...
</map>
</constructor-arg>
</bean>
</constructor-arg>
<constructor-arg>
<bean class="org.springframework.kafka.listener.config.ContainerProperties">
<constructor-arg name="topics" value="foo" />
</bean>
</constructor-arg>
</bean>
您還可以使用用於 @KafkaListener 註解的容器工廠來建立用於其他目的的 ConcurrentMessageListenerContainer 例項。有關示例,請參閱 Spring for Apache Kafka 文件。
使用 Java DSL 時,容器不必配置為 @Bean,因為 DSL 會將容器註冊為 bean。以下示例演示如何執行此操作:
@Bean
public IntegrationFlow topic2ListenerFromKafkaFlow() {
return IntegrationFlow
.from(Kafka.messageDrivenChannelAdapter(kafkaListenerContainerFactory().createContainer(TEST_TOPIC2),
KafkaMessageDrivenChannelAdapter.ListenerMode.record)
.id("topic2Adapter"))
...
get();
}
請注意,在這種情況下,介面卡被賦予一個 id (topic2Adapter)。容器以 topic2Adapter.container 的名稱註冊到應用程式上下文中。如果介面卡沒有 id 屬性,則容器的 bean 名稱是容器的完全限定類名加上 #n,其中 n 為每個容器遞增。
入站通道介面卡
KafkaMessageSource 提供了一個可輪詢的通道介面卡實現。
配置
-
Java DSL
-
Kotlin
-
Java
-
XML
@Bean
public IntegrationFlow flow(ConsumerFactory<String, String> cf) {
return IntegrationFlow.from(Kafka.inboundChannelAdapter(cf, new ConsumerProperties("myTopic")),
e -> e.poller(Pollers.fixedDelay(5000)))
.handle(System.out::println)
.get();
}
@Bean
fun sourceFlow(cf: ConsumerFactory<String, String>) =
integrationFlow(Kafka.inboundChannelAdapter(cf,
ConsumerProperties(TEST_TOPIC3).also {
it.groupId = "kotlinMessageSourceGroup"
}),
{ poller(Pollers.fixedDelay(100)) }) {
handle { m ->
}
}
@InboundChannelAdapter(channel = "fromKafka", poller = @Poller(fixedDelay = "5000"))
@Bean
public KafkaMessageSource<String, String> source(ConsumerFactory<String, String> cf) {
ConsumerProperties consumerProperties = new ConsumerProperties("myTopic");
consumerProperties.setGroupId("myGroupId");
consumerProperties.setClientId("myClientId");
retunr new KafkaMessageSource<>(cf, consumerProperties);
}
<int-kafka:inbound-channel-adapter
id="adapter1"
consumer-factory="consumerFactory"
consumer-properties="consumerProperties1"
ack-factory="ackFactory"
channel="inbound"
message-converter="converter"
payload-type="java.lang.String"
raw-header="true"
auto-startup="false">
<int:poller fixed-delay="5000"/>
</int-kafka:inbound-channel-adapter>
<bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
<constructor-arg>
<map>
<entry key="max.poll.records" value="1"/>
</map>
</constructor-arg>
</bean>
<bean id="consumerProperties1" class="org.springframework.kafka.listener.ConsumerProperties">
<constructor-arg name="topics" value="topic1"/>
<property name="groupId" value="group"/>
<property name="clientId" value="client"/>
</bean>
有關可用屬性,請參閱 Javadocs。
預設情況下,max.poll.records 必須在消費者工廠中顯式設定,否則如果消費者工廠是 DefaultKafkaConsumerFactory,則會強制設定為 1。您可以將屬性 allowMultiFetch 設定為 true 來覆蓋此行為。
您必須在 max.poll.interval.ms 內輪詢消費者以避免重新平衡。如果您將 allowMultiFetch 設定為 true,則必須處理所有檢索到的記錄,並在 max.poll.interval.ms 內再次輪詢。 |
此介面卡發出的訊息包含一個名為 kafka_remainingRecords 的頭,其中包含上次輪詢後剩餘的記錄數。
從 6.2 版本開始,KafkaMessageSource 支援消費者屬性中提供的 ErrorHandlingDeserializer。DeserializationException 從記錄頭中提取並拋給呼叫者。使用 SourcePollingChannelAdapter 時,此異常被包裝到 ErrorMessage 中併發布到其 errorChannel。有關更多資訊,請參閱 ErrorHandlingDeserializer 文件。
出站閘道器
出站閘道器用於請求/回覆操作。它與大多數 Spring Integration 閘道器的不同之處在於,傳送執行緒不會在閘道器中阻塞,並且回覆在回覆監聽器容器執行緒上處理。如果您的程式碼在同步 訊息閘道器 後面呼叫閘道器,則使用者執行緒將在那裡阻塞,直到收到回覆(或發生超時)。
KafkaProducerMessageHandler 的 sendTimeoutExpression 預設值是 delivery.timeout.ms Kafka 生產者屬性 + 5000,以便在超時後將實際的 Kafka 錯誤傳播到應用程式,而不是由該框架生成的超時。為了保持一致性,這已進行了更改,因為您可能會遇到意外行為(Spring 可能會超時 send(),而實際上它最終會成功)。重要提示:該超時預設為 120 秒,因此您可能希望將其縮短以獲得更及時的失敗。
配置
以下示例演示如何配置閘道器:
-
Java DSL
-
Java
-
XML
@Bean
public IntegrationFlow outboundGateFlow(
ReplyingKafkaTemplate<String, String, String> kafkaTemplate) {
return IntegrationFlow.from("kafkaRequests")
.handle(Kafka.outboundGateway(kafkaTemplate))
.channel("kafkaReplies")
.get();
}
@Bean
@ServiceActivator(inputChannel = "kafkaRequests", outputChannel = "kafkaReplies")
public KafkaProducerMessageHandler<String, String> outGateway(
ReplyingKafkaTemplate<String, String, String> kafkaTemplate) {
return new KafkaProducerMessageHandler<>(kafkaTemplate);
}
<int-kafka:outbound-gateway
id="allProps"
error-message-strategy="ems"
kafka-template="template"
message-key-expression="'key'"
order="23"
partition-id-expression="2"
reply-channel="replies"
reply-timeout="43"
request-channel="requests"
requires-reply="false"
send-success-channel="successes"
send-failure-channel="failures"
send-timeout-expression="44"
sync="true"
timestamp-expression="T(System).currentTimeMillis()"
topic-expression="'topic'"/>
有關可用屬性,請參閱 Javadocs。
請注意,使用的類與 出站通道介面卡 相同,唯一的區別是傳遞給建構函式的 KafkaTemplate 是 ReplyingKafkaTemplate。有關更多資訊,請參閱 Spring for Apache Kafka 文件。
出站主題、分割槽、鍵等以與出站介面卡相同的方式確定。回覆主題的確定方式如下:
-
名為
KafkaHeaders.REPLY_TOPIC的訊息頭(如果存在,則必須具有String或byte[]值)將根據模板的回覆容器訂閱的主題進行驗證。 -
如果模板的
replyContainer僅訂閱了一個主題,則使用該主題。
您還可以指定 KafkaHeaders.REPLY_PARTITION 頭來確定用於回覆的特定分割槽。同樣,這也會根據模板的回覆容器訂閱進行驗證。
或者,您也可以使用類似於以下 bean 的配置:
@Bean
public IntegrationFlow outboundGateFlow() {
return IntegrationFlow.from("kafkaRequests")
.handle(Kafka.outboundGateway(producerFactory(), replyContainer())
.configureKafkaTemplate(t -> t.replyTimeout(30_000)))
.channel("kafkaReplies")
.get();
}
入站閘道器
入站閘道器用於請求/回覆操作。
配置
以下示例演示如何配置入站閘道器:
-
Java DSL
-
Java
-
XML
@Bean
public IntegrationFlow serverGateway(
ConcurrentMessageListenerContainer<Integer, String> container,
KafkaTemplate<Integer, String> replyTemplate) {
return IntegrationFlow
.from(Kafka.inboundGateway(container, replyTemplate)
.replyTimeout(30_000))
.<String, String>transform(String::toUpperCase)
.get();
}
@Bean
public KafkaInboundGateway<Integer, String, String> inboundGateway(
AbstractMessageListenerContainer<Integer, String>container,
KafkaTemplate<Integer, String> replyTemplate) {
KafkaInboundGateway<Integer, String, String> gateway =
new KafkaInboundGateway<>(container, replyTemplate);
gateway.setRequestChannel(requests);
gateway.setReplyChannel(replies);
gateway.setReplyTimeout(30_000);
return gateway;
}
<int-kafka:inbound-gateway
id="gateway1"
listener-container="container1"
kafka-template="template"
auto-startup="false"
phase="100"
request-timeout="5000"
request-channel="nullChannel"
reply-channel="errorChannel"
reply-timeout="43"
message-converter="messageConverter"
payload-type="java.lang.String"
error-message-strategy="ems"
retry-template="retryTemplate"
recovery-callback="recoveryCallback"/>
有關可用屬性,請參閱 Javadocs。
如果提供了 RetryTemplate,則根據其重試策略重試傳遞失敗。如果還提供了 error-channel,則在重試耗盡後,將使用預設的 ErrorMessageSendingRecoverer 作為恢復回撥。您還可以使用 recovery-callback 指定在這種情況下采取的其他操作,或者將其設定為 null 以將最終異常拋給監聽器容器,以便在那裡進行處理。
在構建 ErrorMessage(用於 error-channel 或 recovery-callback)時,您可以透過設定 error-message-strategy 屬性來自定義錯誤訊息。預設情況下,使用 RawRecordHeaderErrorMessageStrategy,以提供對轉換後的訊息以及原始 ConsumerRecord 的訪問。
這種形式的重試是阻塞的,如果所有輪詢記錄的總重試延遲可能超過 max.poll.interval.ms 消費者屬性,則可能導致重新平衡。相反,請考慮向監聽器容器新增一個 DefaultErrorHandler,並配置一個 KafkaErrorSendingMessageRecoverer。 |
以下示例演示如何使用 Java DSL 配置一個簡單的轉換為大寫的功能:
或者,您可以使用類似於以下程式碼的配置來配置一個轉換為大寫的功能:
@Bean
public IntegrationFlow serverGateway() {
return IntegrationFlow
.from(Kafka.inboundGateway(consumerFactory(), containerProperties(),
producerFactory())
.replyTimeout(30_000))
.<String, String>transform(String::toUpperCase)
.get();
}
您還可以使用用於 @KafkaListener 註解的容器工廠來建立用於其他目的的 ConcurrentMessageListenerContainer 例項。有關示例,請參閱 Spring for Apache Kafka 文件 和 訊息驅動通道介面卡。
由 Apache Kafka 主題支援的通道
Spring Integration 具有由 Apache Kafka 主題支援的 MessageChannel 實現以實現永續性。
每個通道都需要一個 KafkaTemplate 用於傳送端,以及一個監聽器容器工廠(用於可訂閱通道)或一個 KafkaMessageSource 用於可輪詢通道。
Java DSL 配置
-
Java DSL
-
Java
-
XML
@Bean
public IntegrationFlow flowWithSubscribable(KafkaTemplate<Integer, String> template,
ConcurrentKafkaListenerContainerFactory<Integer, String> containerFactory) {
return IntegrationFlow.from(...)
...
.channel(Kafka.channel(template, containerFactory, "someTopic1").groupId("group1"))
...
.get();
}
@Bean
public IntegrationFlow flowWithPubSub(KafkaTemplate<Integer, String> template,
ConcurrentKafkaListenerContainerFactory<Integer, String> containerFactory) {
return IntegrationFlow.from(...)
...
.publishSubscribeChannel(pubSub(template, containerFactory),
pubsub -> pubsub
.subscribe(subflow -> ...)
.subscribe(subflow -> ...))
.get();
}
@Bean
public BroadcastCapableChannel pubSub(KafkaTemplate<Integer, String> template,
ConcurrentKafkaListenerContainerFactory<Integer, String> containerFactory) {
return Kafka.publishSubscribeChannel(template, containerFactory, "someTopic2")
.groupId("group2")
.get();
}
@Bean
public IntegrationFlow flowWithPollable(KafkaTemplate<Integer, String> template,
KafkaMessageSource<Integer, String> source) {
return IntegrationFlow.from(...)
...
.channel(Kafka.pollableChannel(template, source, "someTopic3").groupId("group3"))
.handle(..., e -> e.poller(...))
...
.get();
}
/**
* Channel for a single subscriber.
**/
@Bean
SubscribableKafkaChannel pointToPoint(KafkaTemplate<String, String> template,
KafkaListenerContainerFactory<String, String> factory)
SubscribableKafkaChannel channel =
new SubscribableKafkaChannel(template, factory, "topicA");
channel.setGroupId("group1");
return channel;
}
/**
* Channel for multiple subscribers.
**/
@Bean
SubscribableKafkaChannel pubsub(KafkaTemplate<String, String> template,
KafkaListenerContainerFactory<String, String> factory)
SubscribableKafkaChannel channel =
new SubscribableKafkaChannel(template, factory, "topicB", true);
channel.setGroupId("group2");
return channel;
}
/**
* Pollable channel (topic is configured on the source)
**/
@Bean
PollableKafkaChannel pollable(KafkaTemplate<String, String> template,
KafkaMessageSource<String, String> source)
PollableKafkaChannel channel =
new PollableKafkaChannel(template, source);
channel.setGroupId("group3");
return channel;
}
<int-kafka:channel kafka-template="template" id="ptp" topic="ptpTopic" group-id="ptpGroup"
container-factory="containerFactory" />
<int-kafka:pollable-channel kafka-template="template" id="pollable" message-source="source"
group-id = "pollableGroup"/>
<int-kafka:publish-subscribe-channel kafka-template="template" id="pubSub" topic="pubSubTopic"
group-id="pubSubGroup" container-factory="containerFactory" />
訊息轉換
提供了一個 StringJsonMessageConverter。有關更多資訊,請參閱 Spring for Apache Kafka 文件。
當將此轉換器與訊息驅動通道介面卡一起使用時,您可以指定希望將傳入的負載轉換為的型別。這可以透過在介面卡上設定 payload-type 屬性 (payloadType 屬性) 來實現。以下示例演示瞭如何在 XML 配置中進行此操作:
<int-kafka:message-driven-channel-adapter
id="kafkaListener"
listener-container="container1"
auto-startup="false"
phase="100"
send-timeout="5000"
channel="nullChannel"
message-converter="messageConverter"
payload-type="com.example.Thing"
error-channel="errorChannel" />
<bean id="messageConverter"
class="org.springframework.kafka.support.converter.MessagingMessageConverter"/>
以下示例演示瞭如何在 Java 配置中設定介面卡上的 payload-type 屬性 (payloadType 屬性):
@Bean
public KafkaMessageDrivenChannelAdapter<String, String>
adapter(KafkaMessageListenerContainer<String, String> container) {
KafkaMessageDrivenChannelAdapter<String, String> kafkaMessageDrivenChannelAdapter =
new KafkaMessageDrivenChannelAdapter<>(container, ListenerMode.record);
kafkaMessageDrivenChannelAdapter.setOutputChannel(received());
kafkaMessageDrivenChannelAdapter.setMessageConverter(converter());
kafkaMessageDrivenChannelAdapter.setPayloadType(Thing.class);
return kafkaMessageDrivenChannelAdapter;
}
空負載和日誌壓縮“墓碑”記錄
Spring Messaging Message<?> 物件不能有 null 負載。當您使用 Apache Kafka 的端點時,null 負載(也稱為墓碑記錄)由 KafkaNull 型別的負載表示。有關更多資訊,請參閱 Spring for Apache Kafka 文件。
Spring Integration 端點的 POJO 方法可以使用真正的 null 值而不是 KafkaNull。為此,請使用 @Payload(required = false) 標記引數。以下示例演示瞭如何執行此操作:
@ServiceActivator(inputChannel = "fromSomeKafkaInboundEndpoint")
public void in(@Header(KafkaHeaders.RECEIVED_KEY) String key,
@Payload(required = false) Customer customer) {
// customer is null if a tombstone record
...
}
從 KStream 呼叫 Spring Integration 流
您可以使用 MessagingTransformer 從 KStream 呼叫整合流:
@Bean
public KStream<byte[], byte[]> kStream(StreamsBuilder kStreamBuilder,
MessagingTransformer<byte[], byte[], byte[]> transformer) transformer) {
KStream<byte[], byte[]> stream = kStreamBuilder.stream(STREAMING_TOPIC1);
stream.mapValues((ValueMapper<byte[], byte[]>) String::toUpperCase)
...
.transform(() -> transformer)
.to(streamingTopic2);
stream.print(Printed.toSysOut());
return stream;
}
@Bean
@DependsOn("flow")
public MessagingTransformer<byte[], byte[], String> transformer(
MessagingFunction function) {
MessagingMessageConverter converter = new MessagingMessageConverter();
converter.setHeaderMapper(new SimpleKafkaHeaderMapper("*"));
return new MessagingTransformer<>(function, converter);
}
@Bean
public IntegrationFlow flow() {
return IntegrationFlow.from(MessagingFunction.class)
...
.get();
}
當整合流以介面開始時,建立的代理具有流 bean 的名稱,並附加 “.gateway”,因此如果需要,此 bean 名稱可以用作 @Qualifier。
讀/處理/寫場景的效能考慮
許多應用程式從一個主題消費訊息,執行一些處理,然後寫入另一個主題。在大多數情況下,如果“寫”操作失敗,應用程式會希望丟擲異常,以便可以重試傳入請求和/或將其傳送到死信主題。此功能由底層訊息監聽器容器以及配置適當的錯誤處理程式支援。然而,為了支援此功能,我們需要阻塞監聽器執行緒,直到寫入操作成功(或失敗),以便可以將任何異常拋給容器。當消費單個記錄時,這透過在出站介面卡上設定 sync 屬性來實現。但是,當消費批次時,使用 sync 會導致顯著的效能下降,因為應用程式會在生成下一條訊息之前等待每個傳送操作的結果。您還可以執行多次傳送,然後等待這些傳送操作的結果。這透過向訊息處理程式新增 futuresChannel 來實現。要啟用此功能,請將 KafkaIntegrationHeaders.FUTURE_TOKEN 新增到出站訊息中;然後可以使用此功能將 Future 與特定的已傳送訊息關聯起來。以下是您可能如何使用此功能的示例:
@SpringBootApplication
public class FuturesChannelApplication {
public static void main(String[] args) {
SpringApplication.run(FuturesChannelApplication.class, args);
}
@Bean
IntegrationFlow inbound(ConsumerFactory<String, String> consumerFactory, Handler handler) {
return IntegrationFlow.from(Kafka.messageDrivenChannelAdapter(consumerFactory,
ListenerMode.batch, "inTopic"))
.handle(handler)
.get();
}
@Bean
IntegrationFlow outbound(KafkaTemplate<String, String> kafkaTemplate) {
return IntegrationFlow.from(Gate.class)
.enrichHeaders(h -> h
.header(KafkaHeaders.TOPIC, "outTopic")
.headerExpression(KafkaIntegrationHeaders.FUTURE_TOKEN, "headers[id]"))
.handle(Kafka.outboundChannelAdapter(kafkaTemplate)
.futuresChannel("futures"))
.get();
}
@Bean
PollableChannel futures() {
return new QueueChannel();
}
}
@Component
@DependsOn("outbound")
class Handler {
@Autowired
Gate gate;
@Autowired
PollableChannel futures;
public void handle(List<String> input) throws Exception {
System.out.println(input);
input.forEach(str -> this.gate.send(str.toUpperCase()));
for (int i = 0; i < input.size(); i++) {
Message<?> future = this.futures.receive(10000);
((Future<?>) future.getPayload()).get(10, TimeUnit.SECONDS);
}
}
}
interface Gate {
void send(String out);
}