JDBC 元件的 Java DSL
7.0 版本引入了 JDBC 模組中通道介面卡的 Java DSL API。核心 Java DSL 類(通常是起點)是一個 org.springframework.integration.jdbc.dsl.Jdbc 工廠。它提供了一個不言自明的方法來啟動目標通道介面卡或閘道器的配置。現成的通道介面卡的標準 IntegrationComponentSpec 實現是
-
JdbcInboundChannelAdapterSpec 擴充套件 MessageSourceSpec<JdbcInboundChannelAdapterSpec, JdbcPollingChannelAdapter> -
JdbcOutboundChannelAdapterSpec 擴充套件 MessageHandlerSpec<JdbcOutboundChannelAdapterSpec, JdbcMessageHandler> -
JdbcOutboundGatewaySpec 擴充套件 MessageHandlerSpec<JdbcOutboundGatewaySpec, JdbcOutboundGateway> -
JdbcStoredProcInboundChannelAdapterSpec 擴充套件 MessageSourceSpec<JdbcStoredProcInboundChannelAdapterSpec, StoredProcPollingChannelAdapter> -
JdbcStoredProcOutboundChannelAdapterSpec 擴充套件 MessageHandlerSpec<JdbcStoredProcOutboundChannelAdapterSpec, StoredProcMessageHandler> -
JdbcStoredProcOutboundGatewaySpec 擴充套件 MessageHandlerSpec<JdbcStoredProcOutboundGatewaySpec, StoredProcOutboundGateway>
此外,還提供了一個 StoredProcExecutorSpec,這是一個方便的、類似構建器的元件,用於 StoredProcExecutor 的建立和配置。
以下是一些如何使用 Jdbc 工廠配置 IntegrationFlow 的示例
@Bean
public DataSource h2DataSource() {
return new EmbeddedDatabaseBuilder()
.setType(EmbeddedDatabaseType.H2)
.addScripts("classpath:dsl-h2.sql", "classpath:h2-stored-procedures.sql")
.build();
}
@Bean
public IntegrationFlow outboundFlow(DataSource h2DataSource) {
return flow -> flow
.handle(Jdbc.outboundAdapter(h2DataSource,
"insert into outbound (id, status, name) values (1, 0, ?)")
.preparedStatementSetter((ps, requestMessage) ->
ps.setObject(1, requestMessage.getPayload()))
.usePayloadAsParameterSource(false)
.keysGenerated(false));
}
@Bean
public IntegrationFlow storedProcInboundFlow(DataSource h2DataSource) {
return IntegrationFlow.from(Jdbc.storedProcInboundAdapter(h2DataSource)
.expectSingleResult(true)
.configurerStoredProcExecutor(configurer -> configurer
.ignoreColumnMetaData(true)
.isFunction(false)
.storedProcedureName("GET_PRIME_NUMBERS")
.procedureParameter(new ProcedureParameter("beginRange", 1, null))
.procedureParameter(new ProcedureParameter("endRange", 10, null))
.sqlParameter(new SqlParameter("beginRange", Types.INTEGER))
.sqlParameter(new SqlParameter("endRange", Types.INTEGER))
.returningResultSetRowMapper("out", new PrimeMapper())
),
e -> e.poller(p -> p.trigger(new OnlyOnceTrigger())))
.channel(c -> c.queue("storedProcInboundPollerChannel"))
.get();
}