Debezium 支援

Debezium 引擎,變更資料捕獲 (CDC) 入站通道介面卡。DebeziumMessageProducer 允許捕獲資料庫變更事件,將其轉換為訊息,然後流式傳輸到出站通道。

您需要將 Spring Integration Debezium 依賴項包含到您的專案中

  • Maven

  • Gradle

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-debezium</artifactId>
    <version>7.0.0</version>
</dependency>
compile "org.springframework.integration:spring-integration-debezium:7.0.0"

您還需要為您的輸入資料庫包含一個 Debezium 聯結器 依賴項。例如,要將 Debezium 與 PostgreSQL 一起使用,您將需要 PostgreSQL Debezium 聯結器

  • Maven

  • Gradle

<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-connector-postgres</artifactId>
    <version>${debezium-version}</version>
</dependency>
compile "io.debezium:debezium-connector-postgres:{debezium-version}"

debezium-version 替換為與正在使用的 spring-integration-debezium 版本相容的版本。

入站 Debezium 通道介面卡

Debezium 介面卡需要一個預配置的 DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>> 例項。

debezium-supplier 提供了一個開箱即用的 DebeziumEngine.Builder Spring Boot 自動配置,以及一個方便的 DebeziumProperties 配置抽象。

Debezium Java DSL 可以從提供的 DebeziumEngine.Builder 以及從普通的 Debezium 配置(例如 java.util.Properties)建立 DebeziumMessageProducer 例項。後者對於一些具有特定配置和序列化格式的常見用例非常有用。

此外,DebeziumMessageProducer 可以透過以下配置屬性進行調整

  • contentType - 允許處理 JSON(預設)、AVROPROTOBUF 訊息內容。contentType 必須與為提供的 DebeziumEngine.Builder 配置的 SerializationFormat 對齊。

  • enableBatch - 當設定為 false(預設)時,Debezium 介面卡將為從源資料庫接收到的每個 ChangeEvent 資料變更事件傳送新的 Message。如果設定為 true,則介面卡將為從 Debezium 引擎接收到的每個 ChangeEvent 批次向下遊傳送單個 Message。此類有效負載不可序列化,需要自定義序列化/反序列化實現。

  • enableEmptyPayload - 啟用對墓碑(又稱刪除)訊息的支援。在資料庫行刪除時,Debezium 可以傳送一個墓碑變更事件,該事件具有與已刪除行相同的鍵和 Optional.empty 的值。預設為 false

  • headerMapper - 自定義 HeaderMapper 實現,允許選擇和轉換 ChangeEvent 標頭到 Message 標頭。預設的 DefaultDebeziumHeaderMapper 實現提供了一個用於 setHeaderNamesToMap 的 setter。預設情況下,所有標頭都被對映。

  • taskExecutor - 為 Debezium 引擎設定自定義 TaskExecutor

以下程式碼片段演示了此通道介面卡的各種配置

使用 Java 配置

以下 Spring Boot 應用程式展示瞭如何使用 Java 配置入站介面卡的示例

@SpringBootApplication
public class DebeziumJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(DebeziumJavaApplication.class)
                .web(WebApplicationType.NONE)
                .run(args);
    }

    @Bean
    public MessageChannel debeziumInputChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageProducer debeziumMessageProducer(
            DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>> debeziumEngineBuilder,
            MessageChannel debeziumInputChannel) {

        DebeziumMessageProducer debeziumMessageProducer =
            new DebeziumMessageProducer(debeziumEngineBuilder);
        debeziumMessageProducer.setOutputChannel(debeziumInputChannel);
        return debeziumMessageProducer;
    }

    @ServiceActivator(inputChannel = "debeziumInputChannel")
    public void handler(Message<?> message) {

        Object destination = message.getHeaders().get(DebeziumHeaders.DESTINATION); (1)

        String key = new String((byte[]) message.getHeaders().get(DebeziumHeaders.KEY)); (2)

        String payload = new String((byte[]) message.getPayload()); (3)

        System.out.println("KEY: " + key + ", DESTINATION: " + destination + ", PAYLOAD: " + payload);
    }

}
1 事件預期邏輯目標的名稱。通常,目標由 topic.prefix 配置選項、資料庫名稱和表名稱組成。例如:my-topic.inventory.orders
2 包含變更表鍵的 schema 和變更行實際鍵的 schema。鍵 schema 及其對應的鍵有效負載都包含一個欄位,用於聯結器建立事件時變更表 PRIMARY KEY(或唯一約束)中的每個列。
3 與鍵一樣,有效負載有一個 schema 部分和一個有效負載值部分。schema 部分包含描述有效負載值部分的 Envelope 結構的 schema,包括其巢狀欄位。用於建立、更新或刪除資料的操作的變更事件都具有帶有 Envelope 結構的有效負載值。

key.converter.schemas.enable=false 和/或 value.converter.schemas.enable=false 允許分別停用鍵或有效負載中的訊息內 schema 內容。

同樣,我們可以配置 DebeziumMessageProducer 以批處理方式處理傳入的變更事件

@Bean
public MessageProducer debeziumMessageProducer(
        DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>> debeziumEngineBuilder,
        MessageChannel debeziumInputChannel) {

    DebeziumMessageProducer debeziumMessageProducer = new DebeziumMessageProducer(debeziumEngineBuilder);
	debeziumMessageProducer.setEnableBatch(true);
    debeziumMessageProducer.setOutputChannel(debeziumInputChannel);
    return debeziumMessageProducer;
}

@ServiceActivator(inputChannel = "debeziumInputChannel")
public void handler(List<ChangeEvent<Object, Object>> payload) {
    System.out.println(payload);
}

Debezium Java DSL 支援

spring-integration-debezium 透過 Debezium 工廠和 DebeziumMessageProducerSpec 實現提供了方便的 Java DSL 流式 API。

Debezium Java DSL 的入站通道介面卡是

 DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>>   debeziumEngineBuilder = ...
 IntegrationFlow.from(
    Debezium.inboundChannelAdapter(debeziumEngineBuilder)
        .headerNames("special*")
        .contentType("application/json")
        .enableBatch(false))
    .handle(m -> System.out.println(new String((byte[]) m.getPayload())))

或者從原生 Debezium 配置屬性建立 DebeziumMessageProducerSpec 例項,並預設使用 JSON 序列化格式。

 Properties debeziumConfig = ...
 IntegrationFlow
    .from(Debezium.inboundChannelAdapter(debeziumConfig))
    .handle(m -> System.out.println(new String((byte[]) m.getPayload())))

以下 Spring Boot 應用程式提供了一個使用 Java DSL 配置入站介面卡的示例

@SpringBootApplication
public class DebeziumJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(DebeziumJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public IntegrationFlow debeziumInbound(
        DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>> debeziumEngineBuilder) {

        return IntegrationFlow
                .from(Debezium
                        .inboundChannelAdapter(debeziumEngineBuilder)
					    .headerNames("special*")
					    .contentType("application/json")
					    .enableBatch(false))
                .handle(m -> System.out.println(new String((byte[]) m.getPayload())))
                .get();
    }

}
© . This site is unofficial and not affiliated with VMware.