Kotlin 支援

框架也已改進以支援用於函式的 Kotlin Lambda 表示式,因此您現在可以使用 Kotlin 語言和 Spring Integration 流定義結合進行開發

@Bean
@Transformer(inputChannel = "functionServiceChannel")
fun kotlinFunction(): (String) -> String {
    return { it.toUpperCase() }
}

@Bean
@ServiceActivator(inputChannel = "messageConsumerServiceChannel")
fun kotlinConsumer(): (Message<Any>) -> Unit {
    return { print(it) }
}

@Bean
@InboundChannelAdapter(value = "counterChannel",
        poller = Poller(fixedRate = "10", maxMessagesPerPoll = "1"))
fun kotlinSupplier(): () -> String {
    return { "baz" }
}

Kotlin Coroutines

從 6.0 版本開始,Spring Integration 提供了對 Kotlin Coroutines 的支援。現在,suspend 函式以及 kotlinx.coroutines.Deferredkotlinx.coroutines.flow.Flow 返回型別可用於服務方法

@ServiceActivator(inputChannel = "suspendServiceChannel", outputChannel = "resultChannel")
suspend fun suspendServiceFunction(payload: String) = payload.uppercase()

@ServiceActivator(inputChannel = "flowServiceChannel", outputChannel = "resultChannel", async = "true")
fun flowServiceFunction(payload: String) =
    flow {
        for (i in 1..3) {
            emit("$payload #$i")
        }
    }

框架將其視為 Reactive Streams 互動,並使用 ReactiveAdapterRegistry 轉換為相應的 MonoFlux Reactor 型別。然後,此類函式回覆會在回覆通道(如果是 ReactiveStreamsSubscribableChannel)或相應的回撥中作為 CompletableFuture 的結果進行處理。

預設情況下,具有 Flow 結果的函式在 @ServiceActivator 上不是非同步的,因此 Flow 例項作為回覆訊息的 payload 生成。目標應用程式負責分別將此物件作為協程處理或將其轉換為 Flux

@MessagingGateway 介面方法在 Kotlin 中宣告時,也可以使用 suspend 修飾符標記。框架內部使用 Mono 來透過下游流執行請求-回覆。此類 Mono 結果由內部的 MonoKt.awaitSingleOrNull() API 處理,以滿足被呼叫的閘道器 suspend 函式的 kotlin.coroutines.Continuation 引數

@MessagingGateway(defaultRequestChannel = "suspendRequestChannel")
interface SuspendFunGateway {

    suspend fun suspendGateway(payload: String): String

}

根據 Kotlin 語言要求,必須將此方法作為協程呼叫

@Autowired
private lateinit var suspendFunGateway: SuspendFunGateway

fun someServiceMethod() {
    runBlocking {
        val reply = suspendFunGateway.suspendGateway("test suspend gateway")
    }
}