Debezium 支援
Debezium Engine,變更資料捕獲 (CDC) 入站通道介面卡。DebeziumMessageProducer
允許捕獲資料庫變更事件,將它們轉換為訊息並隨後流式傳輸到出站通道。
你需要將 spring integration Debezium 依賴新增到你的專案中
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-debezium</artifactId>
<version>6.4.4</version>
</dependency>
compile "org.springframework.integration:spring-integration-debezium:6.4.4"
你還需要為你的輸入資料庫包含一個 Debezium 聯結器 依賴。例如,要在 PostgreSQL 中使用 Debezium,你需要 postgres Debezium 聯結器
-
Maven
-
Gradle
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-postgres</artifactId>
<version>${debezium-version}</version>
</dependency>
compile "io.debezium:debezium-connector-postgres:{debezium-version}"
將 |
Debezium 入站通道介面卡
Debezium 介面卡需要一個預先配置好的 DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>>
例項。
Debezium-supplier 提供了一個開箱即用的 |
Debezium Java DSL 可以從提供的 |
此外,DebeziumMessageProducer
可以透過以下配置屬性進行調整
-
contentType
- 允許處理JSON
(預設)、AVRO
和PROTOBUF
訊息內容。此contentType
必須
與提供的DebeziumEngine.Builder
配置的SerializationFormat
對齊。 -
enableBatch
- 當設定為false
(預設)時,Debezium 介面卡會為從源資料庫接收到的每個ChangeEvent
資料變更事件傳送新的Message
。如果設定為true
,則介面卡會為從 Debezium 引擎接收到的每批ChangeEvent
傳送一個單獨的Message
。這種 payload 不可序列化,需要自定義的序列化/反序列化實現。 -
enableEmptyPayload
- 啟用對墓碑訊息(即刪除訊息)的支援。在資料庫行刪除時,Debezium 可以傳送一個墓碑變更事件,該事件的 key 與被刪除行的 key 相同,值為Optional.empty
。預設為false
。 -
headerMapper
- 自定義HeaderMapper
實現,用於選擇並轉換ChangeEvent
頭到Message
頭。預設的DefaultDebeziumHeaderMapper
實現提供了setHeaderNamesToMap
的 setter 方法。預設情況下,所有頭都會被對映。 -
taskExecutor
- 為 Debezium 引擎設定自定義TaskExecutor
。
以下程式碼片段演示了此通道介面卡的各種配置
使用 Java 配置
以下 Spring Boot 應用展示瞭如何使用 Java 配置配置入站介面卡
@SpringBootApplication
public class DebeziumJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(DebeziumJavaApplication.class)
.web(WebApplicationType.NONE)
.run(args);
}
@Bean
public MessageChannel debeziumInputChannel() {
return new DirectChannel();
}
@Bean
public MessageProducer debeziumMessageProducer(
DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>> debeziumEngineBuilder,
MessageChannel debeziumInputChannel) {
DebeziumMessageProducer debeziumMessageProducer =
new DebeziumMessageProducer(debeziumEngineBuilder);
debeziumMessageProducer.setOutputChannel(debeziumInputChannel);
return debeziumMessageProducer;
}
@ServiceActivator(inputChannel = "debeziumInputChannel")
public void handler(Message<?> message) {
Object destination = message.getHeaders().get(DebeziumHeaders.DESTINATION); (1)
String key = new String((byte[]) message.getHeaders().get(DebeziumHeaders.KEY)); (2)
String payload = new String((byte[]) message.getPayload()); (3)
System.out.println("KEY: " + key + ", DESTINATION: " + destination + ", PAYLOAD: " + payload);
}
}
1 | 事件所針對的邏輯目標的名稱。通常,目標由 topic.prefix 配置選項、資料庫名稱和表名稱組成。例如:my-topic.inventory.orders 。 |
2 | 包含變更表的 key 的 schema 以及變更行的實際 key。key schema 及其對應的 key payload 都包含變更表中在聯結器建立事件時 PRIMARY KEY (或唯一約束)中每個列的欄位。 |
3 | 與 key 類似,payload 具有 schema 部分和 payload value 部分。schema 部分包含描述 payload value 部分的 Envelope 結構的 schema,包括其巢狀欄位。建立、更新或刪除資料的操作產生的變更事件都具有帶 Envelope 結構的 value payload。 |
|
類似地,我們可以配置 DebeziumMessageProducer
以批處理方式處理傳入的變更事件
@Bean
public MessageProducer debeziumMessageProducer(
DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>> debeziumEngineBuilder,
MessageChannel debeziumInputChannel) {
DebeziumMessageProducer debeziumMessageProducer = new DebeziumMessageProducer(debeziumEngineBuilder);
debeziumMessageProducer.setEnableBatch(true);
debeziumMessageProducer.setOutputChannel(debeziumInputChannel);
return debeziumMessageProducer;
}
@ServiceActivator(inputChannel = "debeziumInputChannel")
public void handler(List<ChangeEvent<Object, Object>> payload) {
System.out.println(payload);
}
Debezium Java DSL 支援
spring-integration-debezium
透過 Debezium
工廠和 DebeziumMessageProducerSpec
實現提供了方便的 Java DSL 流式 API。
Debezium Java DSL 的入站通道介面卡是
DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>> debeziumEngineBuilder = ...
IntegrationFlow.from(
Debezium.inboundChannelAdapter(debeziumEngineBuilder)
.headerNames("special*")
.contentType("application/json")
.enableBatch(false))
.handle(m -> System.out.println(new String((byte[]) m.getPayload())))
或者從原生 Debezium 配置屬性建立一個 DebeziumMessageProducerSpec
例項,並預設使用 JSON
序列化格式。
Properties debeziumConfig = ...
IntegrationFlow
.from(Debezium.inboundChannelAdapter(debeziumConfig))
.handle(m -> System.out.println(new String((byte[]) m.getPayload())))
以下 Spring Boot 應用提供了一個使用 Java DSL 配置入站介面卡的示例
@SpringBootApplication
public class DebeziumJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(DebeziumJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public IntegrationFlow debeziumInbound(
DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>> debeziumEngineBuilder) {
return IntegrationFlow
.from(Debezium
.inboundChannelAdapter(debeziumEngineBuilder)
.headerNames("special*")
.contentType("application/json")
.enableBatch(false))
.handle(m -> System.out.println(new String((byte[]) m.getPayload())))
.get();
}
}