協程
Kotlin 協程 是 Kotlin 的輕量級執行緒,允許以命令式方式編寫非阻塞程式碼。在語言層面,掛起函式為非同步操作提供了抽象,而在庫層面,kotlinx.coroutines 提供了諸如 async { }
的函式和諸如 Flow
的型別。
Spring Framework 在以下範圍提供協程支援
-
Spring MVC 和 WebFlux 帶註解
@Controller
中的 Deferred 和 Flow 返回值支援 -
Spring MVC 和 WebFlux 帶註解
@Controller
中的掛起函式支援 -
WebFlux.fn coRouter { } DSL
-
WebFlux
CoWebFilter
-
RSocket 帶
@MessageMapping
註解方法中的掛起函式和Flow
支援 -
RSocketRequester
的擴充套件 -
Spring AOP
依賴
當類路徑中存在 kotlinx-coroutines-core
和 kotlinx-coroutines-reactor
依賴時,會啟用協程支援
build.gradle.kts
dependencies {
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:${coroutinesVersion}")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor:${coroutinesVersion}")
}
支援版本 1.4.0
及以上。
響應式如何轉換為協程?
對於返回值,從響應式 API 到協程 API 的轉換如下
-
fun handler(): Mono<Void>
變為suspend fun handler()
-
fun handler(): Mono<T>
變為suspend fun handler(): T
或suspend fun handler(): T?
,這取決於Mono
是否可以為空(優點是型別更加靜態) -
fun handler(): Flux<T>
變為fun handler(): Flow<T>
對於輸入引數
-
如果不需要惰性求值,
fun handler(mono: Mono<T>)
變為fun handler(value: T)
,因為可以呼叫掛起函式來獲取 value 引數。 -
如果需要惰性求值,
fun handler(mono: Mono<T>)
變為fun handler(supplier: suspend () → T)
或fun handler(supplier: suspend () → T?)
Flow
是協程世界中 Flux
的等價物,適用於熱流或冷流,有限流或無限流,主要區別如下
-
Flow
是基於推送的,而Flux
是推拉混合的 -
透過掛起函式實現背壓
-
Flow
只有一個 掛起collect
方法,運算子作為 擴充套件 實現 -
得益於協程,運算子易於實現
-
擴充套件允許向
Flow
新增自定義運算子 -
收集操作是掛起函式
-
map
運算子 支援非同步操作(無需使用flatMap
),因為它接受一個掛起函式引數
閱讀這篇關於 使用 Spring、協程和 Kotlin Flow 邁向響應式程式設計 的部落格文章,瞭解更多詳細資訊,包括如何使用協程併發執行程式碼。
控制器
這是一個協程 @RestController
的示例。
@RestController
class CoroutinesRestController(client: WebClient, banner: Banner) {
@GetMapping("/suspend")
suspend fun suspendingEndpoint(): Banner {
delay(10)
return banner
}
@GetMapping("/flow")
fun flowEndpoint() = flow {
delay(10)
emit(banner)
delay(10)
emit(banner)
}
@GetMapping("/deferred")
fun deferredEndpoint() = GlobalScope.async {
delay(10)
banner
}
@GetMapping("/sequential")
suspend fun sequential(): List<Banner> {
val banner1 = client
.get()
.uri("/suspend")
.accept(MediaType.APPLICATION_JSON)
.awaitExchange()
.awaitBody<Banner>()
val banner2 = client
.get()
.uri("/suspend")
.accept(MediaType.APPLICATION_JSON)
.awaitExchange()
.awaitBody<Banner>()
return listOf(banner1, banner2)
}
@GetMapping("/parallel")
suspend fun parallel(): List<Banner> = coroutineScope {
val deferredBanner1: Deferred<Banner> = async {
client
.get()
.uri("/suspend")
.accept(MediaType.APPLICATION_JSON)
.awaitExchange()
.awaitBody<Banner>()
}
val deferredBanner2: Deferred<Banner> = async {
client
.get()
.uri("/suspend")
.accept(MediaType.APPLICATION_JSON)
.awaitExchange()
.awaitBody<Banner>()
}
listOf(deferredBanner1.await(), deferredBanner2.await())
}
@GetMapping("/error")
suspend fun error() {
throw IllegalStateException()
}
@GetMapping("/cancel")
suspend fun cancel() {
throw CancellationException()
}
}
使用 @Controller
進行檢視渲染也受支援。
@Controller
class CoroutinesViewController(banner: Banner) {
@GetMapping("/")
suspend fun render(model: Model): String {
delay(10)
model["banner"] = banner
return "index"
}
}
WebFlux.fn
這是一個透過 coRouter { } DSL 定義的協程路由及其相關處理器的示例。
@Configuration
class RouterConfiguration {
@Bean
fun mainRouter(userHandler: UserHandler) = coRouter {
GET("/", userHandler::listView)
GET("/api/user", userHandler::listApi)
}
}
class UserHandler(builder: WebClient.Builder) {
private val client = builder.baseUrl("...").build()
suspend fun listView(request: ServerRequest): ServerResponse =
ServerResponse.ok().renderAndAwait("users", mapOf("users" to
client.get().uri("...").awaitExchange().awaitBody<User>()))
suspend fun listApi(request: ServerRequest): ServerResponse =
ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).bodyAndAwait(
client.get().uri("...").awaitExchange().awaitBody<User>())
}
事務
透過響應式事務管理的程式設計式變體支援協程上的事務。
對於掛起函式,提供了 TransactionalOperator.executeAndAwait
擴充套件。
import org.springframework.transaction.reactive.executeAndAwait
class PersonRepository(private val operator: TransactionalOperator) {
suspend fun initDatabase() = operator.executeAndAwait {
insertPerson1()
insertPerson2()
}
private suspend fun insertPerson1() {
// INSERT SQL statement
}
private suspend fun insertPerson2() {
// INSERT SQL statement
}
}
對於 Kotlin Flow
,提供了 Flow<T>.transactional
擴充套件。
import org.springframework.transaction.reactive.transactional
class PersonRepository(private val operator: TransactionalOperator) {
fun updatePeople() = findPeople().map(::updatePerson).transactional(operator)
private fun findPeople(): Flow<Person> {
// SELECT SQL statement
}
private suspend fun updatePerson(person: Person): Person {
// UPDATE SQL statement
}
}