R2DBC 支援

Spring Integration 提供通道介面卡,用於透過 R2DBC 驅動程式以響應式方式訪問資料庫來接收和傳送訊息。

你需要將此依賴新增到你的專案

  • Maven

  • Gradle

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-r2dbc</artifactId>
    <version>6.4.4</version>
</dependency>
compile "org.springframework.integration:spring-integration-r2dbc:6.4.4"

R2DBC 入站通道介面卡

R2dbcMessageSource 是一個基於 R2dbcEntityOperations 的可輪詢的 MessageSource 實現,根據 expectSingleResult 選項,生成以 FluxMono 作為 Payload 的訊息,用於從資料庫獲取資料。SELECT 查詢可以是靜態提供的,也可以基於 SpEL 表示式,該表示式在每次 receive() 呼叫時都會進行評估。R2dbcMessageSource.SelectCreator 作為評估上下文的根物件存在,允許使用 StatementMapper.SelectSpec 流式 API。預設情況下,此通道介面卡將 SELECT 中的記錄對映到 LinkedCaseInsensitiveMap 例項。可以透過提供 payloadType 選項進行自定義,底層將基於 this.r2dbcEntityOperations.getConverter() 使用 EntityRowMapperupdateSql 是可選的,用於在資料庫中標記已讀記錄,以便後續輪詢時跳過。UPDATE 操作可以提供一個 BiFunction<DatabaseClient.GenericExecuteSpec, ?, DatabaseClient.GenericExecuteSpec>,以便根據 SELECT 結果中的記錄將值繫結到 UPDATE 中。

此通道介面卡的典型配置可能如下所示

@Bean
@InboundChannelAdapter("fromR2dbcChannel")
public R2dbcMessageSource r2dbcMessageSourceSelectMany() {
    R2dbcMessageSource r2dbcMessageSource = new R2dbcMessageSource(this.r2dbcEntityTemplate,
            "SELECT * FROM person WHERE name='Name'");
    r2dbcMessageSource.setPayloadType(Person.class);
    r2dbcMessageSource.setUpdateSql("UPDATE Person SET name='SomeOtherName' WHERE id = :id");
    r2dbcMessageSource.setBindFunction(
				(DatabaseClient.GenericExecuteSpec bindSpec, Person o) -> bindSpec.bind("id", o.getId()));}
    return r2dbcMessageSource;
}

使用 Java DSL,此通道介面卡的配置如下所示

@Bean
IntegrationFlow r2dbcDslFlow(R2dbcEntityTemplate r2dbcEntityTemplate) {
    return IntegrationFlow
        .from(R2dbc.inboundChannelAdapter(r2dbcEntityTemplate,
            (selectCreator) ->
                    selectCreator.createSelect("person")
                        .withProjection("*")
                        .withCriteria(Criteria.where("id").is(1)))
                    .expectSingleResult(true)
                    .payloadType(Person.class)
                    .updateSql("UPDATE Person SET id='2' where id = :id")
                    .bindFunction((DatabaseClient.GenericExecuteSpec bindSpec, Person o) ->
                            bindSpec.bind("id", o.getId())),
            e -> e.poller(p -> p.fixedDelay(100)))
        .handle((p, h) -> p)
        .channel(MessageChannels.flux())
        .get();
}

R2DBC 出站通道介面卡

R2dbcMessageHandler 是一個 ReactiveMessageHandler 實現,用於使用提供的 R2dbcEntityOperations 在資料庫中執行 INSERT(預設)、UPDATEDELETE 查詢。R2dbcMessageHandler.Type 可以靜態配置,或透過針對請求訊息的 SpEL 表示式進行配置。要執行的查詢可以基於 tableNamevaluescriteria 表示式選項,或者(如果未提供 tableName)整個訊息 Payload 被視為一個 org.springframework.data.relational.core.mapping.Table 實體來執行 SQL。包 org.springframework.data.relational.core.query 在 SpEL 評估上下文中註冊為匯入,以便直接訪問 Criteria 流式 API,該 API 用於 UPDATEDELETE 查詢。valuesExpression 用於 INSERTUPDATE,並且必須針對請求訊息評估為 Map,其中包含要對目標表進行更改的列-值對。

此通道介面卡的典型配置可能如下所示

@Bean
@ServiceActivator(inputChannel = "toR2dbcChannel")
public R2dbcMessageHandler r2dbcMessageHandler(R2dbcEntityTemplate r2dbcEntityTemplate) {
    R2dbcMessageHandler messageHandler = new R2dbcMessageHandler(r2dbcEntityTemplate)
    messageHandler.setValuesExpression(new FunctionExpression<Message<?>>(Message::getPayload));
    messageHandler.setQueryType(R2dbcMessageHandler.Type.UPDATE);
    messageHandler.setCriteriaExpression(
        EXPRESSION_PARSER.parseExpression("T(Criteria).where('id).is(headers.personId)));
    return messageHandler;
}

使用 Java DSL,此通道介面卡的配置如下所示

.handleReactive(R2dbc.outboundChannelAdapter(r2dbcEntityTemplate)
        .queryType(R2dbcMessageHandler.Type.UPDATE)
        .tableNameExpression("payload.class.simpleName")
        .criteria((message) -> Criteria.where("id").is(message.getHeaders().get("personId")))
        .values("{age:36}"))