Kotlin 支援
該框架也已得到改進,支援 Kotlin Lambda 表示式用於函式,因此您現在可以使用 Kotlin 語言和 Spring Integration 流定義的組合
@Bean
@Transformer(inputChannel = "functionServiceChannel")
fun kotlinFunction(): (String) -> String {
return { it.toUpperCase() }
}
@Bean
@ServiceActivator(inputChannel = "messageConsumerServiceChannel")
fun kotlinConsumer(): (Message<Any>) -> Unit {
return { print(it) }
}
@Bean
@InboundChannelAdapter(value = "counterChannel",
poller = Poller(fixedRate = "10", maxMessagesPerPoll = "1"))
fun kotlinSupplier(): () -> String {
return { "baz" }
}
Kotlin 協程
從 6.0 版本開始,Spring Integration 提供對 Kotlin 協程的支援。現在,suspend 函式以及 kotlinx.coroutines.Deferred 和 kotlinx.coroutines.flow.Flow 返回型別可用於服務方法
@ServiceActivator(inputChannel = "suspendServiceChannel", outputChannel = "resultChannel")
suspend fun suspendServiceFunction(payload: String) = payload.uppercase()
@ServiceActivator(inputChannel = "flowServiceChannel", outputChannel = "resultChannel", async = "true")
fun flowServiceFunction(payload: String) =
flow {
for (i in 1..3) {
emit("$payload #$i")
}
}
該框架將其視為 Reactive Streams 互動,並使用 ReactiveAdapterRegistry 轉換為相應的 Mono 和 Flux reactor 型別。然後,如果回覆通道是 ReactiveStreamsSubscribableChannel,則在回覆通道中處理此類函式回覆,或者作為相應回撥中 CompletableFuture 的結果進行處理。
預設情況下,帶有 Flow 結果的函式在 @ServiceActivator 上不是 async 的,因此 Flow 例項作為回覆訊息負載生成。目標應用程式負責將此物件作為協程處理或相應地將其轉換為 Flux。 |
當在 Kotlin 中宣告時,@MessagingGateway 介面方法也可以使用 suspend 修飾符標記。框架在內部利用 Mono 使用下游流執行請求-回覆。此 Mono 結果在內部由 MonoKt.awaitSingleOrNull() API 處理,以滿足閘道器被呼叫的 suspend 函式的 kotlin.coroutines.Continuation 引數
@MessagingGateway(defaultRequestChannel = "suspendRequestChannel")
interface SuspendFunGateway {
suspend fun suspendGateway(payload: String): String
}
根據 Kotlin 語言要求,此方法必須作為協程呼叫
@Autowired
private lateinit var suspendFunGateway: SuspendFunGateway
fun someServiceMethod() {
runBlocking {
val reply = suspendFunGateway.suspendGateway("test suspend gateway")
}
}