Debezium 支援
Debezium 引擎,變更資料捕獲 (CDC) 入站通道介面卡。DebeziumMessageProducer 允許捕獲資料庫變更事件,將其轉換為訊息,然後流式傳輸到出站通道。
您需要將 Spring Integration Debezium 依賴項包含到您的專案中
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-debezium</artifactId>
<version>7.0.0</version>
</dependency>
compile "org.springframework.integration:spring-integration-debezium:7.0.0"
您還需要為您的輸入資料庫包含一個 Debezium 聯結器 依賴項。例如,要將 Debezium 與 PostgreSQL 一起使用,您將需要 PostgreSQL Debezium 聯結器
-
Maven
-
Gradle
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-postgres</artifactId>
<version>${debezium-version}</version>
</dependency>
compile "io.debezium:debezium-connector-postgres:{debezium-version}"
|
將 |
入站 Debezium 通道介面卡
Debezium 介面卡需要一個預配置的 DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>> 例項。
|
debezium-supplier 提供了一個開箱即用的 |
|
Debezium Java DSL 可以從提供的 |
此外,DebeziumMessageProducer 可以透過以下配置屬性進行調整
-
contentType- 允許處理JSON(預設)、AVRO和PROTOBUF訊息內容。contentType必須與為提供的DebeziumEngine.Builder配置的SerializationFormat對齊。 -
enableBatch- 當設定為false(預設)時,Debezium 介面卡將為從源資料庫接收到的每個ChangeEvent資料變更事件傳送新的Message。如果設定為true,則介面卡將為從 Debezium 引擎接收到的每個ChangeEvent批次向下遊傳送單個Message。此類有效負載不可序列化,需要自定義序列化/反序列化實現。 -
enableEmptyPayload- 啟用對墓碑(又稱刪除)訊息的支援。在資料庫行刪除時,Debezium 可以傳送一個墓碑變更事件,該事件具有與已刪除行相同的鍵和Optional.empty的值。預設為false。 -
headerMapper- 自定義HeaderMapper實現,允許選擇和轉換ChangeEvent標頭到Message標頭。預設的DefaultDebeziumHeaderMapper實現提供了一個用於setHeaderNamesToMap的 setter。預設情況下,所有標頭都被對映。 -
taskExecutor- 為 Debezium 引擎設定自定義TaskExecutor。
以下程式碼片段演示了此通道介面卡的各種配置
使用 Java 配置
以下 Spring Boot 應用程式展示瞭如何使用 Java 配置入站介面卡的示例
@SpringBootApplication
public class DebeziumJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(DebeziumJavaApplication.class)
.web(WebApplicationType.NONE)
.run(args);
}
@Bean
public MessageChannel debeziumInputChannel() {
return new DirectChannel();
}
@Bean
public MessageProducer debeziumMessageProducer(
DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>> debeziumEngineBuilder,
MessageChannel debeziumInputChannel) {
DebeziumMessageProducer debeziumMessageProducer =
new DebeziumMessageProducer(debeziumEngineBuilder);
debeziumMessageProducer.setOutputChannel(debeziumInputChannel);
return debeziumMessageProducer;
}
@ServiceActivator(inputChannel = "debeziumInputChannel")
public void handler(Message<?> message) {
Object destination = message.getHeaders().get(DebeziumHeaders.DESTINATION); (1)
String key = new String((byte[]) message.getHeaders().get(DebeziumHeaders.KEY)); (2)
String payload = new String((byte[]) message.getPayload()); (3)
System.out.println("KEY: " + key + ", DESTINATION: " + destination + ", PAYLOAD: " + payload);
}
}
| 1 | 事件預期邏輯目標的名稱。通常,目標由 topic.prefix 配置選項、資料庫名稱和表名稱組成。例如:my-topic.inventory.orders。 |
| 2 | 包含變更表鍵的 schema 和變更行實際鍵的 schema。鍵 schema 及其對應的鍵有效負載都包含一個欄位,用於聯結器建立事件時變更表 PRIMARY KEY(或唯一約束)中的每個列。 |
| 3 | 與鍵一樣,有效負載有一個 schema 部分和一個有效負載值部分。schema 部分包含描述有效負載值部分的 Envelope 結構的 schema,包括其巢狀欄位。用於建立、更新或刪除資料的操作的變更事件都具有帶有 Envelope 結構的有效負載值。 |
|
|
同樣,我們可以配置 DebeziumMessageProducer 以批處理方式處理傳入的變更事件
@Bean
public MessageProducer debeziumMessageProducer(
DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>> debeziumEngineBuilder,
MessageChannel debeziumInputChannel) {
DebeziumMessageProducer debeziumMessageProducer = new DebeziumMessageProducer(debeziumEngineBuilder);
debeziumMessageProducer.setEnableBatch(true);
debeziumMessageProducer.setOutputChannel(debeziumInputChannel);
return debeziumMessageProducer;
}
@ServiceActivator(inputChannel = "debeziumInputChannel")
public void handler(List<ChangeEvent<Object, Object>> payload) {
System.out.println(payload);
}
Debezium Java DSL 支援
spring-integration-debezium 透過 Debezium 工廠和 DebeziumMessageProducerSpec 實現提供了方便的 Java DSL 流式 API。
Debezium Java DSL 的入站通道介面卡是
DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>> debeziumEngineBuilder = ...
IntegrationFlow.from(
Debezium.inboundChannelAdapter(debeziumEngineBuilder)
.headerNames("special*")
.contentType("application/json")
.enableBatch(false))
.handle(m -> System.out.println(new String((byte[]) m.getPayload())))
或者從原生 Debezium 配置屬性建立 DebeziumMessageProducerSpec 例項,並預設使用 JSON 序列化格式。
Properties debeziumConfig = ...
IntegrationFlow
.from(Debezium.inboundChannelAdapter(debeziumConfig))
.handle(m -> System.out.println(new String((byte[]) m.getPayload())))
以下 Spring Boot 應用程式提供了一個使用 Java DSL 配置入站介面卡的示例
@SpringBootApplication
public class DebeziumJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(DebeziumJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public IntegrationFlow debeziumInbound(
DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>> debeziumEngineBuilder) {
return IntegrationFlow
.from(Debezium
.inboundChannelAdapter(debeziumEngineBuilder)
.headerNames("special*")
.contentType("application/json")
.enableBatch(false))
.handle(m -> System.out.println(new String((byte[]) m.getPayload())))
.get();
}
}