動態和執行時整合流
IntegrationFlow
及其所有依賴元件都可以在執行時註冊。在 5.0 版本之前,我們使用 BeanFactory.registerSingleton()
鉤子。從 Spring Framework 5.0
開始,我們使用 instanceSupplier
鉤子進行程式設計式 BeanDefinition
註冊。下面的示例展示瞭如何程式設計式註冊一個 Bean
BeanDefinition beanDefinition =
BeanDefinitionBuilder.genericBeanDefinition((Class<Object>) bean.getClass(), () -> bean)
.getRawBeanDefinition();
((BeanDefinitionRegistry) this.beanFactory).registerBeanDefinition(beanName, beanDefinition);
請注意,在前面的示例中,instanceSupplier
鉤子是 genericBeanDefinition
方法的最後一個引數,在本例中由 lambda 提供。
所有必要的 Bean 初始化和生命週期都將自動完成,與標準上下文配置的 Bean 定義一樣。
為了簡化開發體驗,Spring Integration 引入了 IntegrationFlowContext
來在執行時註冊和管理 IntegrationFlow
例項,如下面的示例所示
@Autowired
private AbstractServerConnectionFactory server1;
@Autowired
private IntegrationFlowContext flowContext;
...
@Test
public void testTcpGateways() {
TestingUtilities.waitListening(this.server1, null);
IntegrationFlow flow = f -> f
.handle(Tcp.outboundGateway(Tcp.netClient("localhost", this.server1.getPort())
.serializer(TcpCodecs.crlf())
.deserializer(TcpCodecs.lengthHeader1())
.id("client1"))
.remoteTimeout(m -> 5000))
.transform(Transformers.objectToString());
IntegrationFlowRegistration theFlow = this.flowContext.registration(flow).register();
assertThat(theFlow.getMessagingTemplate().convertSendAndReceive("foo", String.class), equalTo("FOO"));
}
這在有多個配置選項且必須建立多個類似流的例項時非常有用。為此,我們可以迭代我們的選項並在迴圈中建立和註冊 IntegrationFlow
例項。另一種情況是當我們的資料來源不是基於 Spring 時,我們必須即時建立它。一個這樣的示例是 Reactive Streams 事件源,如下面的示例所示
Flux<Message<?>> messageFlux =
Flux.just("1,2,3,4")
.map(v -> v.split(","))
.flatMapIterable(Arrays::asList)
.map(Integer::parseInt)
.map(GenericMessage<Integer>::new);
QueueChannel resultChannel = new QueueChannel();
IntegrationFlow integrationFlow =
IntegrationFlow.from(messageFlux)
.<Integer, Integer>transform(p -> p * 2)
.channel(resultChannel)
.get();
this.integrationFlowContext.registration(integrationFlow)
.register();
IntegrationFlowRegistrationBuilder
(作為 IntegrationFlowContext.registration()
的結果)可用於為要註冊的 IntegrationFlow
指定 Bean 名稱,控制其 autoStartup
,並註冊非 Spring Integration Bean。通常,這些額外的 Bean 是連線工廠(AMQP、JMS、(S)FTP、TCP/UDP 等)、序列化器和反序列化器,或任何其他所需的支撐元件。
您可以使用 IntegrationFlowRegistration.destroy()
回撥來移除動態註冊的 IntegrationFlow
及其所有依賴 Bean,當您不再需要它們時。有關更多資訊,請參閱 IntegrationFlowContext
Javadoc。
從 5.0.6 版本開始,IntegrationFlow 定義中所有生成的 Bean 名稱都會以字首形式新增 flow ID。我們建議始終指定顯式的 flow ID。否則,會在 IntegrationFlowContext 中啟動一個同步屏障,以生成 IntegrationFlow 的 Bean 名稱並註冊其 Bean。我們對這兩個操作進行同步,以避免當相同的生成 Bean 名稱可能用於不同的 IntegrationFlow 例項時發生競態條件。 |
此外,從 5.0.6 版本開始,註冊構建器 API 有一個新方法:useFlowIdAsPrefix()
。如果您希望宣告同一個流的多個例項,並在流中的元件具有相同 ID 時避免 Bean 名稱衝突,此方法非常有用,如下面的示例所示
private void registerFlows() {
IntegrationFlowRegistration flow1 =
this.flowContext.registration(buildFlow(1234))
.id("tcp1")
.useFlowIdAsPrefix()
.register();
IntegrationFlowRegistration flow2 =
this.flowContext.registration(buildFlow(1235))
.id("tcp2")
.useFlowIdAsPrefix()
.register();
}
private IntegrationFlow buildFlow(int port) {
return f -> f
.handle(Tcp.outboundGateway(Tcp.netClient("localhost", port)
.serializer(TcpCodecs.crlf())
.deserializer(TcpCodecs.lengthHeader1())
.id("client"))
.remoteTimeout(m -> 5000))
.transform(Transformers.objectToString());
}
在這種情況下,第一個流的訊息處理器可以使用 Bean 名稱 tcp1.client.handler
來引用。
當您使用 useFlowIdAsPrefix() 時,需要一個 id 屬性。 |