WebFlux 支援
WebFlux Spring Integration 模組(spring-integration-webflux
)允許以響應式方式執行 HTTP 請求和處理入站 HTTP 請求。
你需要將此依賴新增到你的專案中
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-webflux</artifactId>
<version>6.4.4</version>
</dependency>
compile "org.springframework.integration:spring-integration-webflux:6.4.4"
在非 Servlet 的伺服器配置情況下,必須包含 io.projectreactor.netty:reactor-netty
依賴。
WebFlux 支援包含以下閘道器實現:WebFluxInboundEndpoint
和 WebFluxRequestExecutingMessageHandler
。該支援完全基於 Spring WebFlux 和 Project Reactor 的基礎。有關更多資訊,請參閱HTTP 支援,因為響應式 HTTP 元件和常規 HTTP 元件共享許多選項。
WebFlux Namespace 支援
Spring Integration 提供了 webflux
namespace 和相應的 Schema 定義。要將其包含在你的配置中,請在你的應用上下文配置檔案中新增以下 namespace 宣告
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-webflux="http://www.springframework.org/schema/integration/webflux"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/webflux
https://www.springframework.org/schema/integration/webflux/spring-integration-webflux.xsd">
...
</beans>
WebFlux 入站元件
從 5.0 版本開始,提供了 WebFluxInboundEndpoint
實現 WebHandler
。此元件類似於基於 MVC 的 HttpRequestHandlingEndpointSupport
,它們透過新提取的 BaseHttpInboundEndpoint
共享一些通用選項。它用於 Spring WebFlux 響應式環境(而非 MVC)。以下示例展示了 WebFlux 端點的簡單實現
-
Java DSL
-
Kotlin DSL
-
Java
-
XML
@Bean
public IntegrationFlow inboundChannelAdapterFlow() {
return IntegrationFlow
.from(WebFlux.inboundChannelAdapter("/reactivePost")
.requestMapping(m -> m.methods(HttpMethod.POST))
.requestPayloadType(ResolvableType.forClassWithGenerics(Flux.class, String.class))
.statusCodeFunction(m -> HttpStatus.ACCEPTED))
.channel(c -> c.queue("storeChannel"))
.get();
}
@Bean
fun inboundChannelAdapterFlow() =
integrationFlow(
WebFlux.inboundChannelAdapter("/reactivePost")
.apply {
requestMapping { m -> m.methods(HttpMethod.POST) }
requestPayloadType(ResolvableType.forClassWithGenerics(Flux::class.java, String::class.java))
statusCodeFunction { m -> HttpStatus.ACCEPTED }
})
{
channel { queue("storeChannel") }
}
@Configuration
@EnableWebFlux
@EnableIntegration
public class ReactiveHttpConfiguration {
@Bean
public WebFluxInboundEndpoint simpleInboundEndpoint() {
WebFluxInboundEndpoint endpoint = new WebFluxInboundEndpoint();
RequestMapping requestMapping = new RequestMapping();
requestMapping.setPathPatterns("/test");
endpoint.setRequestMapping(requestMapping);
endpoint.setRequestChannelName("serviceChannel");
return endpoint;
}
@ServiceActivator(inputChannel = "serviceChannel")
String service() {
return "It works!";
}
}
<int-webflux:inbound-gateway request-channel="requests" path="/sse">
<int-webflux:request-mapping produces="text/event-stream"/>
</int-webflux:inbound-gateway>
配置類似於(示例中提到的)HttpRequestHandlingEndpointSupport
,不同之處在於我們使用 @EnableWebFlux
將 WebFlux 基礎設施新增到我們的整合應用中。此外,WebFluxInboundEndpoint
透過響應式 HTTP 伺服器實現提供的背壓(back-pressure)和按需能力,對下游流執行 sendAndReceive
操作。
回覆部分也是非阻塞的,並且基於內部的 FutureReplyChannel ,該通道被平鋪對映到回覆 Mono ,用於按需解析。 |
你可以使用自定義的 ServerCodecConfigurer
、RequestedContentTypeResolver
甚至 ReactiveAdapterRegistry
配置 WebFluxInboundEndpoint
。後者提供了一種機制,你可以使用它以任何響應式型別返回回覆:Reactor Flux
、RxJava Observable
、Flowable
等。透過這種方式,我們可以使用 Spring Integration 元件實現Server Sent Events 場景,示例如下
-
Java DSL
-
Kotlin DSL
-
Java
-
XML
@Bean
public IntegrationFlow sseFlow() {
return IntegrationFlow
.from(WebFlux.inboundGateway("/sse")
.requestMapping(m -> m.produces(MediaType.TEXT_EVENT_STREAM_VALUE)))
.handle((p, h) -> Flux.just("foo", "bar", "baz"))
.get();
}
@Bean
fun sseFlow() =
integrationFlow(
WebFlux.inboundGateway("/sse")
.requestMapping(m -> m.produces(MediaType.TEXT_EVENT_STREAM_VALUE)))
{
handle { (p, h) -> Flux.just("foo", "bar", "baz") }
}
@Bean
public WebFluxInboundEndpoint webfluxInboundGateway() {
WebFluxInboundEndpoint endpoint = new WebFluxInboundEndpoint();
RequestMapping requestMapping = new RequestMapping();
requestMapping.setPathPatterns("/sse");
requestMapping.setProduces(MediaType.TEXT_EVENT_STREAM_VALUE);
endpoint.setRequestMapping(requestMapping);
endpoint.setRequestChannelName("requests");
return endpoint;
}
<int-webflux:inbound-channel-adapter id="reactiveFullConfig" channel="requests"
path="test1"
auto-startup="false"
phase="101"
request-payload-type="byte[]"
error-channel="errorChannel"
payload-expression="payload"
supported-methods="PUT"
status-code-expression="'202'"
header-mapper="headerMapper"
codec-configurer="codecConfigurer"
reactive-adapter-registry="reactiveAdapterRegistry"
requested-content-type-resolver="requestedContentTypeResolver">
<int-webflux:request-mapping headers="foo"/>
<int-webflux:cross-origin origin="foo" method="PUT"/>
<int-webflux:header name="foo" expression="'foo'"/>
</int-webflux:inbound-channel-adapter>
有關更多可能的配置選項,請參閱請求對映支援和跨源資源共享 (CORS) 支援。
當請求體為空或 payloadExpression
返回 null
時,請求引數(MultiValueMap<String, String>
)將用作目標訊息的 payload
進行處理。
負載驗證
從 5.2 版本開始,WebFluxInboundEndpoint
可以配置 Validator
。與HTTP 支援中的 MVC 驗證不同,它用於在執行回退和 payloadExpression
函式之前,驗證請求透過 HttpMessageReader
轉換為的 Publisher
中的元素。框架無法假定構建最終 payload 後 Publisher
物件可能有多複雜。如果需要限制驗證可見性僅針對最終 payload(或其 Publisher
元素),則驗證應該放在下游而非 WebFlux 端點。更多資訊請參閱 Spring WebFlux 文件。無效的 payload 會被 IntegrationWebExchangeBindException
(WebExchangeBindException
的擴充套件)拒絕,其中包含所有驗證 Errors
。更多資訊請參閱 Spring Framework 參考手冊關於驗證的部分。
WebFlux 出站元件
WebFluxRequestExecutingMessageHandler
(從 5.0 版本開始)的實現類似於 HttpRequestExecutingMessageHandler
。它使用 Spring Framework WebFlux 模組中的 WebClient
。要配置它,請定義一個類似於以下的 bean
-
Java DSL
-
Kotlin DSL
-
Java
-
XML
@Bean
public IntegrationFlow outboundReactive() {
return f -> f
.handle(WebFlux.<MultiValueMap<String, String>>outboundGateway(m ->
UriComponentsBuilder.fromUriString("https://:8080/foo")
.queryParams(m.getPayload())
.build()
.toUri())
.httpMethod(HttpMethod.GET)
.expectedResponseType(String.class));
}
@Bean
fun outboundReactive() =
integrationFlow {
handle(
WebFlux.outboundGateway<MultiValueMap<String, String>>({ m ->
UriComponentsBuilder.fromUriString("https://:8080/foo")
.queryParams(m.getPayload())
.build()
.toUri()
})
.httpMethod(HttpMethod.GET)
.expectedResponseType(String::class.java)
)
}
@ServiceActivator(inputChannel = "reactiveHttpOutRequest")
@Bean
public WebFluxRequestExecutingMessageHandler reactiveOutbound(WebClient client) {
WebFluxRequestExecutingMessageHandler handler =
new WebFluxRequestExecutingMessageHandler("https://:8080/foo", client);
handler.setHttpMethod(HttpMethod.POST);
handler.setExpectedResponseType(String.class);
return handler;
}
<int-webflux:outbound-gateway id="reactiveExample1"
request-channel="requests"
url="https:///test"
http-method-expression="headers.httpMethod"
extract-request-payload="false"
expected-response-type-expression="payload"
charset="UTF-8"
reply-timeout="1234"
reply-channel="replies"/>
<int-webflux:outbound-channel-adapter id="reactiveExample2"
url="https:///example"
http-method="GET"
channel="requests"
charset="UTF-8"
extract-payload="false"
expected-response-type="java.lang.String"
order="3"
auto-startup="false"/>
WebClient
的 exchange()
操作返回一個 Mono<ClientResponse>
,該 Mono 通過幾個 Mono.map()
步驟對映到 AbstractIntegrationMessageBuilder
,作為 WebFluxRequestExecutingMessageHandler
的輸出。與 ReactiveChannel
作為 outputChannel
一起,Mono<ClientResponse>
的評估會延遲,直到下游訂閱為止。否則,它被視為 async
模式,Mono 響應會被適配到 SettableListenableFuture
,用於從 WebFluxRequestExecutingMessageHandler
返回非同步回覆。輸出訊息的目標 payload 取決於 WebFluxRequestExecutingMessageHandler
的配置。setExpectedResponseType(Class<?>)
或 setExpectedResponseTypeExpression(Expression)
標識響應體元素轉換的目標型別。如果 replyPayloadToFlux
設定為 true
,響應體將轉換為一個 Flux
,其中每個元素具有提供的 expectedResponseType
,並且此 Flux
將作為 payload 傳送到下游。之後,你可以使用拆分器以響應式方式迭代此 Flux
。
此外,除了 expectedResponseType
和 replyPayloadToFlux
屬性之外,還可以將 BodyExtractor<?, ClientHttpResponse>
注入到 WebFluxRequestExecutingMessageHandler
中。它可用於低級別訪問 ClientHttpResponse
,以及對主體和 HTTP 訊息頭轉換進行更多控制。Spring Integration 提供 ClientHttpResponseBodyExtractor
作為身份函式,以便(下游)生成整個 ClientHttpResponse
以及任何其他可能的自定義邏輯。
從 5.2 版本開始,WebFluxRequestExecutingMessageHandler
支援響應式 Publisher
、Resource
和 MultiValueMap
型別作為請求訊息 payload。內部使用相應的 BodyInserter
填充到 WebClient.RequestBodySpec
中。當 payload 是響應式 Publisher
時,配置的 publisherElementType
或 publisherElementTypeExpression
可用於確定 publisher 元素型別。表示式必須解析為 Class<?>
、解析為目標 Class<?>
的 String
或 ParameterizedTypeReference
。
從 5.5 版本開始,WebFluxRequestExecutingMessageHandler
暴露了一個 extractResponseBody
標誌(預設為 true
),用於僅返回響應體,或者返回整個 ResponseEntity
作為回覆訊息 payload,這獨立於提供的 expectedResponseType
或 replyPayloadToFlux
。如果 ResponseEntity
中不存在 body,則此標誌被忽略並返回整個 ResponseEntity
。
有關更多可能的配置選項,請參閱HTTP 出站元件。
WebFlux 訊息頭對映
由於 WebFlux 元件完全基於 HTTP 協議,因此 HTTP 訊息頭對映沒有區別。有關更多可能的選項和用於對映訊息頭的元件,請參閱HTTP 訊息頭對映。
WebFlux 請求屬性
從 6.0 版本開始,WebFluxRequestExecutingMessageHandler
可以配置為透過 setAttributeVariablesExpression()
評估請求屬性。此 SpEL 表示式必須在 Map
中進行評估。然後,此 Map 將傳播到 WebClient.RequestBodySpec.attributes(Consumer<Map<String, Object>> attributesConsumer)
HTTP 請求配置回撥。如果需要以鍵值物件的形式將資訊從 Message
傳遞到請求,並且下游過濾器將訪問這些屬性以進行進一步處理,這將很有幫助。