協程

Kotlin 協程 是 Kotlin 的輕量級執行緒,允許以命令式方式編寫非阻塞程式碼。在語言方面,掛起函式提供了非同步操作的抽象,而在庫方面,kotlinx.coroutines 提供了諸如 async { } 的函式和諸如 Flow 的型別。

Spring Framework 在以下範圍提供協程支援

  • 在 Spring MVC 和 WebFlux 的 @Controller 註解中支援 DeferredFlow 返回值

  • 在 Spring MVC 和 WebFlux 的 @Controller 註解中支援掛起函式

  • WebFlux 客戶端伺服器 函式式 API 的擴充套件。

  • WebFlux.fn coRouter { } DSL

  • WebFlux CoWebFilter

  • 在 RSocket @MessageMapping 註解方法中支援掛起函式和 Flow

  • RSocketRequester 的擴充套件

  • Spring AOP

依賴項

kotlinx-coroutines-corekotlinx-coroutines-reactor 依賴項在類路徑中時,協程支援被啟用

build.gradle.kts

dependencies {

	implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:${coroutinesVersion}")
	implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor:${coroutinesVersion}")
}

支援 1.4.0 及以上版本。

響應式如何轉換為協程?

對於返回值,從響應式到協程 API 的轉換如下

  • fun handler(): Mono<Void> 變為 suspend fun handler()

  • fun handler(): Mono<T> 變為 suspend fun handler(): Tsuspend fun handler(): T?,取決於 Mono 是否可以為空(優點是具有更強的靜態型別)

  • fun handler(): Flux<T> 變為 fun handler(): Flow<T>

對於輸入引數

  • 如果不需要惰性,fun handler(mono: Mono<T>) 變為 fun handler(value: T),因為可以呼叫掛起函式來獲取值引數。

  • 如果需要惰性,fun handler(mono: Mono<T>) 變為 fun handler(supplier: suspend () → T)fun handler(supplier: suspend () → T?)

Flow 是協程世界中的 Flux 等價物,適用於熱或冷流,有限或無限流,主要區別如下

  • Flow 是推式的,而 Flux 是推拉混合的

  • 背壓透過掛起函式實現

  • Flow 只有一個 單個掛起 collect 方法,並且運算子作為 擴充套件 實現

  • 得益於協程,運算子易於實現

  • 擴充套件允許向 Flow 新增自定義運算子

  • 收集操作是掛起函式

  • map 運算子 支援非同步操作(無需 flatMap),因為它接受一個掛起函式引數

閱讀這篇關於 使用 Spring、協程和 Kotlin Flow 走向響應式 的部落格文章,瞭解更多細節,包括如何使用協程併發執行程式碼。

控制器

這是一個協程 @RestController 的示例。

@RestController
class CoroutinesRestController(client: WebClient, banner: Banner) {

	@GetMapping("/suspend")
	suspend fun suspendingEndpoint(): Banner {
		delay(10)
		return banner
	}

	@GetMapping("/flow")
	fun flowEndpoint() = flow {
		delay(10)
		emit(banner)
		delay(10)
		emit(banner)
	}

	@GetMapping("/deferred")
	fun deferredEndpoint() = GlobalScope.async {
		delay(10)
		banner
	}

	@GetMapping("/sequential")
	suspend fun sequential(): List<Banner> {
		val banner1 = client
				.get()
				.uri("/suspend")
				.accept(MediaType.APPLICATION_JSON)
				.awaitExchange()
				.awaitBody<Banner>()
		val banner2 = client
				.get()
				.uri("/suspend")
				.accept(MediaType.APPLICATION_JSON)
				.awaitExchange()
				.awaitBody<Banner>()
		return listOf(banner1, banner2)
	}

	@GetMapping("/parallel")
	suspend fun parallel(): List<Banner> = coroutineScope {
		val deferredBanner1: Deferred<Banner> = async {
			client
					.get()
					.uri("/suspend")
					.accept(MediaType.APPLICATION_JSON)
					.awaitExchange()
					.awaitBody<Banner>()
		}
		val deferredBanner2: Deferred<Banner> = async {
			client
					.get()
					.uri("/suspend")
					.accept(MediaType.APPLICATION_JSON)
					.awaitExchange()
					.awaitBody<Banner>()
		}
		listOf(deferredBanner1.await(), deferredBanner2.await())
	}

	@GetMapping("/error")
	suspend fun error() {
		throw IllegalStateException()
	}

	@GetMapping("/cancel")
	suspend fun cancel() {
		throw CancellationException()
	}

}

也支援使用 @Controller 進行檢視渲染。

@Controller
class CoroutinesViewController(banner: Banner) {

	@GetMapping("/")
	suspend fun render(model: Model): String {
		delay(10)
		model["banner"] = banner
		return "index"
	}
}

WebFlux.fn

這是一個透過 coRouter { } DSL 定義的協程路由器及其相關處理程式的示例。

@Configuration
class RouterConfiguration {

	@Bean
	fun mainRouter(userHandler: UserHandler) = coRouter {
		GET("/", userHandler::listView)
		GET("/api/user", userHandler::listApi)
	}
}
class UserHandler(builder: WebClient.Builder) {

	private val client = builder.baseUrl("...").build()

	suspend fun listView(request: ServerRequest): ServerResponse =
			ServerResponse.ok().renderAndAwait("users", mapOf("users" to
			client.get().uri("...").awaitExchange().awaitBody<User>()))

	suspend fun listApi(request: ServerRequest): ServerResponse =
				ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).bodyAndAwait(
				client.get().uri("...").awaitExchange().awaitBody<User>())
}

事務

協程上的事務透過響應式事務管理以程式設計方式支援。

對於掛起函式,提供了 TransactionalOperator.executeAndAwait 擴充套件。

import org.springframework.transaction.reactive.executeAndAwait

class PersonRepository(private val operator: TransactionalOperator) {

	suspend fun initDatabase() = operator.executeAndAwait {
		insertPerson1()
		insertPerson2()
	}

	private suspend fun insertPerson1() {
		// INSERT SQL statement
	}

	private suspend fun insertPerson2() {
		// INSERT SQL statement
	}
}

對於 Kotlin Flow,提供了 Flow<T>.transactional 擴充套件。

import org.springframework.transaction.reactive.transactional

class PersonRepository(private val operator: TransactionalOperator) {

	fun updatePeople() = findPeople().map(::updatePerson).transactional(operator)

	private fun findPeople(): Flow<Person> {
		// SELECT SQL statement
	}

	private suspend fun updatePerson(person: Person): Person {
		// UPDATE SQL statement
	}
}

上下文傳播

Spring 應用程式 使用 Micrometer 進行可觀測性支援。對於追蹤支援,當前的觀察結果透過 ThreadLocal 傳播給阻塞程式碼,或者透過 Reactor Context 傳播給響應式管道。但當前的觀察結果也需要在掛起函式的執行上下文中可用。如果沒有這一點,當前的“traceId”將不會自動新增到協程的日誌語句中。

PropagationContextElement 運算子通常確保 Micrometer 上下文傳播庫 與 Kotlin 協程一起工作。

它需要 io.micrometer:context-propagation 依賴項,以及可選的 org.jetbrains.kotlinx:kotlinx-coroutines-reactor。透過呼叫 Hooks.enableAutomaticContextPropagation() 可以啟用 CoroutinesUtils#invokeSuspendingFunction(Spring 用於將協程適配到 Reactor FluxMono)的自動上下文傳播。

應用程式也可以顯式使用 PropagationContextElement 來使用上下文傳播機制增強 CoroutineContext

fun main() {
	runBlocking(Dispatchers.IO + PropagationContextElement()) {
		waitAndLog()
	}
}

suspend fun waitAndLog() {
	delay(10)
	logger.info("Suspending function with traceId")
}

在這裡,假設 Micrometer Tracing 已配置,生成的日誌語句將顯示當前的“traceId”,併為您的應用程式解鎖更好的可觀測性。

© . This site is unofficial and not affiliated with VMware.