R2DBC 支援

Spring Integration 提供通道介面卡,用於透過 R2DBC 驅動程式以響應式方式訪問資料庫來接收和傳送訊息。

專案需要此依賴項

  • Maven

  • Gradle

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-r2dbc</artifactId>
    <version>7.0.0</version>
</dependency>
compile "org.springframework.integration:spring-integration-r2dbc:7.0.0"

R2DBC 入站通道介面卡

R2dbcMessageSource 是一個基於 R2dbcEntityOperations 的可輪詢 MessageSource 實現,它根據 expectSingleResult 選項生成以 FluxMono 作為有效負載的訊息,用於從資料庫中獲取資料。SELECT 查詢可以是靜態提供的,也可以基於 SpEL 表示式,該表示式在每次 receive() 呼叫時進行評估。R2dbcMessageSource.SelectCreator 作為求值上下文的根物件存在,以允許使用 StatementMapper.SelectSpec 流式 API。預設情況下,此通道介面卡將選擇中的記錄對映到 LinkedCaseInsensitiveMap 例項。可以透過提供 payloadType 選項進行自定義,該選項由基於 this.r2dbcEntityOperations.getConverter()EntityRowMapper 在底層使用。updateSql 是可選的,用於標記資料庫中已讀取的記錄,以便在後續輪詢中跳過。UPDATE 操作可以與 BiFunction<DatabaseClient.GenericExecuteSpec, ?, DatabaseClient.GenericExecuteSpec> 一起提供,以根據 SELECT 結果中的記錄將值繫結到 UPDATE

此通道介面卡的典型配置可能如下所示:

@Bean
@InboundChannelAdapter("fromR2dbcChannel")
public R2dbcMessageSource r2dbcMessageSourceSelectMany() {
    R2dbcMessageSource r2dbcMessageSource = new R2dbcMessageSource(this.r2dbcEntityTemplate,
            "SELECT * FROM person WHERE name='Name'");
    r2dbcMessageSource.setPayloadType(Person.class);
    r2dbcMessageSource.setUpdateSql("UPDATE Person SET name='SomeOtherName' WHERE id = :id");
    r2dbcMessageSource.setBindFunction(
				(DatabaseClient.GenericExecuteSpec bindSpec, Person o) -> bindSpec.bind("id", o.getId()));}
    return r2dbcMessageSource;
}

使用 Java DSL,此通道介面卡的配置如下:

@Bean
IntegrationFlow r2dbcDslFlow(R2dbcEntityTemplate r2dbcEntityTemplate) {
    return IntegrationFlow
        .from(R2dbc.inboundChannelAdapter(r2dbcEntityTemplate,
            (selectCreator) ->
                    selectCreator.createSelect("person")
                        .withProjection("*")
                        .withCriteria(Criteria.where("id").is(1)))
                    .expectSingleResult(true)
                    .payloadType(Person.class)
                    .updateSql("UPDATE Person SET id='2' where id = :id")
                    .bindFunction((DatabaseClient.GenericExecuteSpec bindSpec, Person o) ->
                            bindSpec.bind("id", o.getId())),
            e -> e.poller(p -> p.fixedDelay(100)))
        .handle((p, h) -> p)
        .channel(MessageChannels.flux())
        .get();
}

R2DBC 出站通道介面卡

R2dbcMessageHandler 是一個 ReactiveMessageHandler 實現,用於使用提供的 R2dbcEntityOperations 在資料庫中執行 INSERT(預設)、UPDATEDELETE 查詢。R2dbcMessageHandler.Type 可以靜態配置,也可以透過針對請求訊息的 SpEL 表示式進行配置。要執行的查詢可以基於 tableNamevaluescriteria 表示式選項,或者(如果未提供 tableName)整個訊息有效負載被視為 org.springframework.data.relational.core.mapping.Table 實體來執行 SQL。包 org.springframework.data.relational.core.query 作為匯入註冊到 SpEL 求值上下文中,以便直接訪問用於 UPDATEDELETE 查詢的 Criteria 流式 API。valuesExpression 用於 INSERTUPDATE,並且必須求值為 Map 形式的列值對,以針對請求訊息在目標表中執行更改。

此通道介面卡的典型配置可能如下所示:

@Bean
@ServiceActivator(inputChannel = "toR2dbcChannel")
public R2dbcMessageHandler r2dbcMessageHandler(R2dbcEntityTemplate r2dbcEntityTemplate) {
    R2dbcMessageHandler messageHandler = new R2dbcMessageHandler(r2dbcEntityTemplate)
    messageHandler.setValuesExpression(new FunctionExpression<Message<?>>(Message::getPayload));
    messageHandler.setQueryType(R2dbcMessageHandler.Type.UPDATE);
    messageHandler.setCriteriaExpression(
        EXPRESSION_PARSER.parseExpression("T(Criteria).where('id).is(headers.personId)));
    return messageHandler;
}

使用 Java DSL,此通道介面卡的配置如下:

.handleReactive(R2dbc.outboundChannelAdapter(r2dbcEntityTemplate)
        .queryType(R2dbcMessageHandler.Type.UPDATE)
        .tableNameExpression("payload.class.simpleName")
        .criteria((message) -> Criteria.where("id").is(message.getHeaders().get("personId")))
        .values("{age:36}"))
© . This site is unofficial and not affiliated with VMware.