WebFlux 支援
WebFlux Spring Integration 模組 (spring-integration-webflux) 允許以響應式方式執行 HTTP 請求並處理入站 HTTP 請求。
專案需要此依賴項
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-webflux</artifactId>
<version>7.0.0</version>
</dependency>
compile "org.springframework.integration:spring-integration-webflux:7.0.0"
在非 Servlet 伺服器配置的情況下,必須包含 io.projectreactor.netty:reactor-netty 依賴。
WebFlux 支援包括以下閘道器實現:WebFluxInboundEndpoint 和 WebFluxRequestExecutingMessageHandler。此支援完全基於 Spring WebFlux 和 Project Reactor 基礎。有關更多資訊,請參閱HTTP 支援,因為許多選項在響應式和常規 HTTP 元件之間是共享的。
WebFlux 名稱空間支援
Spring Integration 提供了 webflux 名稱空間和相應的模式定義。要將其包含在您的配置中,請在應用程式上下文配置檔案中新增以下名稱空間宣告:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-webflux="http://www.springframework.org/schema/integration/webflux"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/webflux
https://www.springframework.org/schema/integration/webflux/spring-integration-webflux.xsd">
...
</beans>
WebFlux 入站元件
從 5.0 版本開始,提供了 WebHandler 的 WebFluxInboundEndpoint 實現。此元件類似於基於 MVC 的 HttpRequestHandlingEndpointSupport,它透過新提取的 BaseHttpInboundEndpoint 共享一些常用選項。它用於 Spring WebFlux 響應式環境(而不是 MVC)。以下示例展示了 WebFlux 端點的簡單實現:
-
Java DSL
-
Kotlin DSL
-
Java
-
XML
@Bean
public IntegrationFlow inboundChannelAdapterFlow() {
return IntegrationFlow
.from(WebFlux.inboundChannelAdapter("/reactivePost")
.requestMapping(m -> m.methods(HttpMethod.POST))
.requestPayloadType(ResolvableType.forClassWithGenerics(Flux.class, String.class))
.statusCodeFunction(m -> HttpStatus.ACCEPTED))
.channel(c -> c.queue("storeChannel"))
.get();
}
@Bean
fun inboundChannelAdapterFlow() =
integrationFlow(
WebFlux.inboundChannelAdapter("/reactivePost")
.apply {
requestMapping { m -> m.methods(HttpMethod.POST) }
requestPayloadType(ResolvableType.forClassWithGenerics(Flux::class.java, String::class.java))
statusCodeFunction { m -> HttpStatus.ACCEPTED }
})
{
channel { queue("storeChannel") }
}
@Configuration
@EnableWebFlux
@EnableIntegration
public class ReactiveHttpConfiguration {
@Bean
public WebFluxInboundEndpoint simpleInboundEndpoint() {
WebFluxInboundEndpoint endpoint = new WebFluxInboundEndpoint();
RequestMapping requestMapping = new RequestMapping();
requestMapping.setPathPatterns("/test");
endpoint.setRequestMapping(requestMapping);
endpoint.setRequestChannelName("serviceChannel");
return endpoint;
}
@ServiceActivator(inputChannel = "serviceChannel")
String service() {
return "It works!";
}
}
<int-webflux:inbound-gateway request-channel="requests" path="/sse">
<int-webflux:request-mapping produces="text/event-stream"/>
</int-webflux:inbound-gateway>
此配置與 HttpRequestHandlingEndpointSupport(在示例之前提到)類似,不同之處在於我們使用 @EnableWebFlux 將 WebFlux 基礎設施新增到我們的整合應用程式中。此外,WebFluxInboundEndpoint 透過響應式 HTTP 伺服器實現提供的背壓、按需能力執行到下游流的 sendAndReceive 操作。
回覆部分也是非阻塞的,並且基於內部的 FutureReplyChannel,該通道被扁平對映到回覆 Mono 以實現按需解析。 |
您可以為 WebFluxInboundEndpoint 配置自定義的 ServerCodecConfigurer、RequestedContentTypeResolver,甚至 ReactiveAdapterRegistry。後者提供了一種機制,您可以使用它將回復作為任何響應式型別返回:Reactor Flux、RxJava Observable、Flowable 等。透過這種方式,我們可以使用 Spring Integration 元件實現伺服器傳送事件場景,如下例所示:
-
Java DSL
-
Kotlin DSL
-
Java
-
XML
@Bean
public IntegrationFlow sseFlow() {
return IntegrationFlow
.from(WebFlux.inboundGateway("/sse")
.requestMapping(m -> m.produces(MediaType.TEXT_EVENT_STREAM_VALUE)))
.handle((p, h) -> Flux.just("foo", "bar", "baz"))
.get();
}
@Bean
fun sseFlow() =
integrationFlow(
WebFlux.inboundGateway("/sse")
.requestMapping(m -> m.produces(MediaType.TEXT_EVENT_STREAM_VALUE)))
{
handle { (p, h) -> Flux.just("foo", "bar", "baz") }
}
@Bean
public WebFluxInboundEndpoint webfluxInboundGateway() {
WebFluxInboundEndpoint endpoint = new WebFluxInboundEndpoint();
RequestMapping requestMapping = new RequestMapping();
requestMapping.setPathPatterns("/sse");
requestMapping.setProduces(MediaType.TEXT_EVENT_STREAM_VALUE);
endpoint.setRequestMapping(requestMapping);
endpoint.setRequestChannelName("requests");
return endpoint;
}
<int-webflux:inbound-channel-adapter id="reactiveFullConfig" channel="requests"
path="test1"
auto-startup="false"
phase="101"
request-payload-type="byte[]"
error-channel="errorChannel"
payload-expression="payload"
supported-methods="PUT"
status-code-expression="'202'"
header-mapper="headerMapper"
codec-configurer="codecConfigurer"
reactive-adapter-registry="reactiveAdapterRegistry"
requested-content-type-resolver="requestedContentTypeResolver">
<int-webflux:request-mapping headers="foo"/>
<int-webflux:cross-origin origin="foo" method="PUT"/>
<int-webflux:header name="foo" expression="'foo'"/>
</int-webflux:inbound-channel-adapter>
有關更多可能的配置選項,請參閱請求對映支援和跨域資源共享 (CORS) 支援。
當請求體為空或 payloadExpression 返回 null 時,請求引數 (MultiValueMap<String, String>) 用作要處理的目標訊息的 payload。
負載驗證
從 5.2 版本開始,WebFluxInboundEndpoint 可以配置 Validator。與 HTTP 支援中的 MVC 驗證不同,它用於在執行回退和 payloadExpression 函式之前,驗證 Publisher 中的元素(請求已透過 HttpMessageReader 轉換為 Publisher)。框架無法假設在構建最終負載後 Publisher 物件會多麼複雜。如果需要限制驗證可見性以準確地針對最終負載(或其 Publisher 元素),則驗證應該在 WebFlux 端點下游進行。有關更多資訊,請參閱 Spring WebFlux 文件。無效負載將被 IntegrationWebExchangeBindException(WebExchangeBindException 的擴充套件)拒絕,其中包含所有驗證 Errors。有關驗證的更多資訊,請參閱 Spring Framework 參考手冊。
WebFlux 出站元件
WebFluxRequestExecutingMessageHandler(從 5.0 版本開始)的實現類似於 HttpRequestExecutingMessageHandler。它使用 Spring Framework WebFlux 模組中的 WebClient。要配置它,請定義一個類似於以下的 bean:
-
Java DSL
-
Kotlin DSL
-
Java
-
XML
@Bean
public IntegrationFlow outboundReactive() {
return f -> f
.handle(WebFlux.<MultiValueMap<String, String>>outboundGateway(m ->
UriComponentsBuilder.fromUriString("https://:8080/foo")
.queryParams(m.getPayload())
.build()
.toUri())
.httpMethod(HttpMethod.GET)
.expectedResponseType(String.class));
}
@Bean
fun outboundReactive() =
integrationFlow {
handle(
WebFlux.outboundGateway<MultiValueMap<String, String>>({ m ->
UriComponentsBuilder.fromUriString("https://:8080/foo")
.queryParams(m.getPayload())
.build()
.toUri()
})
.httpMethod(HttpMethod.GET)
.expectedResponseType(String::class.java)
)
}
@ServiceActivator(inputChannel = "reactiveHttpOutRequest")
@Bean
public WebFluxRequestExecutingMessageHandler reactiveOutbound(WebClient client) {
WebFluxRequestExecutingMessageHandler handler =
new WebFluxRequestExecutingMessageHandler("https://:8080/foo", client);
handler.setHttpMethod(HttpMethod.POST);
handler.setExpectedResponseType(String.class);
return handler;
}
<int-webflux:outbound-gateway id="reactiveExample1"
request-channel="requests"
url="https:///test"
http-method-expression="headers.httpMethod"
extract-request-payload="false"
expected-response-type-expression="payload"
charset="UTF-8"
reply-timeout="1234"
reply-channel="replies"/>
<int-webflux:outbound-channel-adapter id="reactiveExample2"
url="https:///example"
http-method="GET"
channel="requests"
charset="UTF-8"
extract-payload="false"
expected-response-type="java.lang.String"
order="3"
auto-startup="false"/>
WebClient 的 exchange() 操作返回一個 Mono<ClientResponse>,它被對映(透過使用多個 Mono.map() 步驟)到 AbstractIntegrationMessageBuilder 作為 WebFluxRequestExecutingMessageHandler 的輸出。與作為 outputChannel 的 ReactiveChannel 一起,Mono<ClientResponse> 的評估被推遲,直到進行下游訂閱。否則,它被視為 async 模式,並且 Mono 響應被適配為 SettableListenableFuture,用於從 WebFluxRequestExecutingMessageHandler 進行非同步回覆。輸出訊息的目標負載取決於 WebFluxRequestExecutingMessageHandler 的配置。setExpectedResponseType(Class<?>) 或 setExpectedResponseTypeExpression(Expression) 標識響應體元素轉換的目標型別。如果 replyPayloadToFlux 設定為 true,則響應體將轉換為 Flux,每個元素都具有提供的 expectedResponseType,並且此 Flux 作為負載傳送到下游。之後,您可以使用拆分器以響應式方式迭代此 Flux。
此外,可以將 BodyExtractor<?, ClientHttpResponse> 注入到 WebFluxRequestExecutingMessageHandler 中,而不是 expectedResponseType 和 replyPayloadToFlux 屬性。它可用於對 ClientHttpResponse 進行低階訪問,並更好地控制正文和 HTTP 頭部的轉換。Spring Integration 提供了 ClientHttpResponseBodyExtractor 作為標識函式,用於生成(下游)整個 ClientHttpResponse 和任何其他可能的自定義邏輯。
從 5.2 版本開始,WebFluxRequestExecutingMessageHandler 支援響應式 Publisher、Resource 和 MultiValueMap 型別作為請求訊息負載。內部使用相應的 BodyInserter 填充到 WebClient.RequestBodySpec 中。當負載是響應式 Publisher 時,可以使用配置的 publisherElementType 或 publisherElementTypeExpression 來確定釋出者元素型別。表示式必須解析為 Class<?>、解析為目標 Class<?> 的 String 或 ParameterizedTypeReference。
從 5.5 版本開始,WebFluxRequestExecutingMessageHandler 暴露了一個 extractResponseBody 標誌(預設為 true),用於僅返回響應體,或返回整個 ResponseEntity 作為回覆訊息負載,而與提供的 expectedResponseType 或 replyPayloadToFlux 無關。如果 ResponseEntity 中不存在正文,則此標誌將被忽略並返回整個 ResponseEntity。
有關更多可能的配置選項,請參閱HTTP 出站元件。
WebFlux 頭部對映
由於 WebFlux 元件完全基於 HTTP 協議,因此 HTTP 頭部對映沒有區別。有關更多可能的選項和用於對映頭部的元件,請參閱HTTP 頭部對映。
WebFlux 請求屬性
從 6.0 版本開始,WebFluxRequestExecutingMessageHandler 可以配置為透過 setAttributeVariablesExpression() 評估請求屬性。此 SpEL 表示式必須在 Map 中進行評估。然後,此 Map 將傳播到 WebClient.RequestBodySpec.attributes(Consumer<Map<String, Object>> attributesConsumer) HTTP 請求配置回撥。如果需要以鍵值物件的形式將資訊從 Message 傳遞到請求,並且下游過濾器將訪問這些屬性進行進一步處理,這將很有幫助。