非同步請求
DeferredResult
一旦在 Servlet 容器中啟用了非同步請求處理功能,控制器方法就可以用 DeferredResult
包裝任何支援的控制器方法返回值,如下例所示
-
Java
-
Kotlin
@GetMapping("/quotes")
@ResponseBody
public DeferredResult<String> quotes() {
DeferredResult<String> deferredResult = new DeferredResult<>();
// Save the deferredResult somewhere..
return deferredResult;
}
// From some other thread...
deferredResult.setResult(result);
@GetMapping("/quotes")
@ResponseBody
fun quotes(): DeferredResult<String> {
val deferredResult = DeferredResult<String>()
// Save the deferredResult somewhere..
return deferredResult
}
// From some other thread...
deferredResult.setResult(result)
控制器可以從不同的執行緒非同步產生返回值,例如,響應外部事件 (JMS 訊息)、計劃任務或其他事件。
Callable
控制器可以用 java.util.concurrent.Callable
包裝任何支援的返回值,如下例所示
-
Java
-
Kotlin
@PostMapping
public Callable<String> processUpload(final MultipartFile file) {
return () -> "someView";
}
@PostMapping
fun processUpload(file: MultipartFile) = Callable<String> {
// ...
"someView"
}
然後可以透過配置的 AsyncTaskExecutor
執行給定任務來獲取返回值。
處理流程
以下是 Servlet 非同步請求處理的非常簡潔的概覽
-
透過呼叫
request.startAsync()
可以將ServletRequest
置於非同步模式。這樣做主要的效果是 Servlet (以及任何過濾器) 可以退出,但響應保持開啟狀態以允許稍後完成處理。 -
呼叫
request.startAsync()
會返回AsyncContext
,您可以使用它來進一步控制非同步處理。例如,它提供了dispatch
方法,類似於 Servlet API 中的 forward,不同之處在於它允許應用程式在 Servlet 容器執行緒上恢復請求處理。 -
ServletRequest
提供了對當前DispatcherType
的訪問,您可以使用它來區分處理初始請求、非同步分派、forward 和其他分派型別。
DeferredResult
處理流程如下
-
控制器返回一個
DeferredResult
並將其儲存在某個記憶體佇列或列表中,以便可以訪問它。 -
Spring MVC 呼叫
request.startAsync()
。 -
同時,
DispatcherServlet
和所有配置的過濾器退出請求處理執行緒,但響應保持開啟狀態。 -
應用程式從某個執行緒設定
DeferredResult
,然後 Spring MVC 將請求分派回 Servlet 容器。 -
DispatcherServlet
再次被呼叫,處理流程繼續進行,使用非同步產生的返回值。
Callable
處理流程如下
-
控制器返回一個
Callable
。 -
Spring MVC 呼叫
request.startAsync()
並將Callable
提交給AsyncTaskExecutor
以在單獨的執行緒中進行處理。 -
同時,
DispatcherServlet
和所有過濾器退出 Servlet 容器執行緒,但響應保持開啟狀態。 -
最終
Callable
產生結果,Spring MVC 將請求分派回 Servlet 容器以完成處理。 -
DispatcherServlet
再次被呼叫,處理流程繼續進行,使用從Callable
非同步產生的返回值。
有關更多背景和上下文,您還可以閱讀在 Spring MVC 3.2 中引入非同步請求處理支援的部落格文章。
異常處理
使用 DeferredResult
時,您可以選擇呼叫 setResult
或 setErrorResult
並帶上異常。在兩種情況下,Spring MVC 都會將請求分派回 Servlet 容器以完成處理。然後,它被視為控制器方法返回了給定值,或者視為產生了給定異常。異常會透過常規的異常處理機制(例如,呼叫 @ExceptionHandler
方法)進行處理。
使用 Callable
時,會發生類似的處理邏輯,主要區別在於結果是從 Callable
返回,或者由它丟擲異常。
攔截
HandlerInterceptor
例項可以是 AsyncHandlerInterceptor
型別,以在啟動非同步處理的初始請求上接收 afterConcurrentHandlingStarted
回撥(而不是 postHandle
和 afterCompletion
)。
HandlerInterceptor
實現還可以註冊一個 CallableProcessingInterceptor
或 DeferredResultProcessingInterceptor
,以更深入地整合到非同步請求的生命週期中(例如,處理超時事件)。有關更多詳細資訊,請參見AsyncHandlerInterceptor
的 javadoc。
DeferredResult
提供了 onTimeout(Runnable)
和 onCompletion(Runnable)
回撥。有關更多詳細資訊,請參見DeferredResult
的 javadoc。Callable
可以替換為 WebAsyncTask
,後者暴露了超時和完成回撥的附加方法。
非同步 Spring MVC 與 WebFlux 對比
Servlet API 最初設計用於在 Filter-Servlet 鏈中進行單次傳遞。非同步請求處理允許應用程式退出 Filter-Servlet 鏈但保留響應處於開啟狀態以供進一步處理。Spring MVC 的非同步支援就是圍繞這一機制構建的。當控制器返回 DeferredResult
時,Filter-Servlet 鏈被退出,Servlet 容器執行緒被釋放。稍後,當 DeferredResult
被設定時,會進行一次 ASYNC
分派(到相同的 URL),在此期間控制器再次被對映,但不是呼叫它,而是使用 DeferredResult
值(就像控制器返回了它一樣)來恢復處理。
相比之下,Spring WebFlux 既不是基於 Servlet API 構建的,也不需要這樣的非同步請求處理功能,因為它從設計上就是非同步的。非同步處理內置於所有框架契約中,並在請求處理的所有階段都得到內在支援。
從程式設計模型的角度來看,Spring MVC 和 Spring WebFlux 都支援將非同步和響應式型別作為控制器方法的返回值。Spring MVC 甚至支援流式傳輸,包括響應式背壓。然而,對響應的單次寫入仍然是阻塞的(並在單獨的執行緒上執行),這與 WebFlux 不同,WebFlux 依賴非阻塞 I/O,並且每次寫入都不需要額外的執行緒。
另一個根本區別是,Spring MVC 不支援在控制器方法引數中使用非同步或響應式型別(例如,@RequestBody
、@RequestPart
等),也沒有明確支援將非同步和響應式型別作為模型屬性。Spring WebFlux 支援所有這些。
最後,從配置的角度來看,必須在Servlet 容器級別啟用非同步請求處理功能。
HTTP 流式傳輸
您可以使用 DeferredResult
和 Callable
處理單個非同步返回值。如果您想產生多個非同步值並將其寫入響應,該怎麼辦?本節描述瞭如何做到這一點。
物件
您可以使用 ResponseBodyEmitter
返回值產生物件流,其中每個物件都使用HttpMessageConverter
進行序列化並寫入響應,如下例所示
-
Java
-
Kotlin
@GetMapping("/events")
public ResponseBodyEmitter handle() {
ResponseBodyEmitter emitter = new ResponseBodyEmitter();
// Save the emitter somewhere..
return emitter;
}
// In some other thread
emitter.send("Hello once");
// and again later on
emitter.send("Hello again");
// and done at some point
emitter.complete();
@GetMapping("/events")
fun handle() = ResponseBodyEmitter().apply {
// Save the emitter somewhere..
}
// In some other thread
emitter.send("Hello once")
// and again later on
emitter.send("Hello again")
// and done at some point
emitter.complete()
您還可以將 ResponseBodyEmitter
用作 ResponseEntity
中的 body,允許您自定義響應的狀態和頭部。
當 emitter
丟擲 IOException
(例如,遠端客戶端斷開連線)時,應用程式無需負責清理連線,也不應呼叫 emitter.complete
或 emitter.completeWithError
。相反,servlet 容器會自動啟動 AsyncListener
錯誤通知,Spring MVC 在此通知中呼叫 completeWithError
。這個呼叫進而嚮應用程式執行最後一次 ASYNC
分派,在此期間 Spring MVC 呼叫配置的異常解析器並完成請求。
SSE
SseEmitter
(ResponseBodyEmitter
的子類)提供了對Server-Sent Events 的支援,其中從伺服器傳送的事件按照 W3C SSE 規範進行格式化。要從控制器生成 SSE 流,返回 SseEmitter
,如下例所示
-
Java
-
Kotlin
@GetMapping(path="/events", produces=MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter handle() {
SseEmitter emitter = new SseEmitter();
// Save the emitter somewhere..
return emitter;
}
// In some other thread
emitter.send("Hello once");
// and again later on
emitter.send("Hello again");
// and done at some point
emitter.complete();
@GetMapping("/events", produces = [MediaType.TEXT_EVENT_STREAM_VALUE])
fun handle() = SseEmitter().apply {
// Save the emitter somewhere..
}
// In some other thread
emitter.send("Hello once")
// and again later on
emitter.send("Hello again")
// and done at some point
emitter.complete()
雖然 SSE 是流式傳輸到瀏覽器的主要選項,但請注意 Internet Explorer 不支援 Server-Sent Events。考慮使用 Spring 的WebSocket 訊息以及SockJS 回退傳輸(包括 SSE),這些傳輸目標廣泛的瀏覽器。
另請參閱上一節關於異常處理的說明。
原始資料
有時,跳過訊息轉換並直接流式傳輸到響應的 OutputStream
非常有用(例如,用於檔案下載)。您可以使用 StreamingResponseBody
返回值型別來實現此目的,如下例所示
-
Java
-
Kotlin
@GetMapping("/download")
public StreamingResponseBody handle() {
return new StreamingResponseBody() {
@Override
public void writeTo(OutputStream outputStream) throws IOException {
// write...
}
};
}
@GetMapping("/download")
fun handle() = StreamingResponseBody {
// write...
}
您可以使用 StreamingResponseBody
作為 ResponseEntity
中的 body,以自定義響應的狀態和頭部。
響應式型別
Spring MVC 支援在控制器中使用響應式客戶端庫(另請閱讀 WebFlux 部分中的響應式庫)。這包括來自 spring-webflux
的 WebClient
以及其他庫,例如 Spring Data 響應式資料倉庫。在這種情況下,能夠從控制器方法返回響應式型別非常方便。
響應式返回值的處理方式如下
-
單值 promise 會被適配,類似於使用
DeferredResult
。示例包括Mono
(Reactor) 或Single
(RxJava)。 -
具有流式媒體型別(例如
application/x-ndjson
或text/event-stream
)的多值流會被適配,類似於使用ResponseBodyEmitter
或SseEmitter
。示例包括Flux
(Reactor) 或Observable
(RxJava)。應用程式還可以返回Flux<ServerSentEvent>
或Observable<ServerSentEvent>
。 -
具有任何其他媒體型別(例如
application/json
)的多值流會被適配,類似於使用DeferredResult<List<?>>
。
Spring MVC 透過來自 spring-core 的 ReactiveAdapterRegistry 支援 Reactor 和 RxJava,這使得它可以適配來自多個響應式庫的型別。 |
對於流式傳輸到響應,支援響應式背壓,但對響應的寫入仍然是阻塞的,並透過配置的 AsyncTaskExecutor
在單獨的執行緒上執行,以避免阻塞上游源,例如從 WebClient
返回的 Flux
。
上下文傳播
通常透過 java.lang.ThreadLocal
傳播上下文。這對於在同一執行緒上的處理是透明工作的,但對於跨多個執行緒的非同步處理需要額外的工作。Micrometer Context Propagation 庫簡化了跨執行緒以及跨上下文機制(如 ThreadLocal
值、Reactor 上下文、GraphQL Java 上下文等)的上下文傳播。
如果 Micrometer Context Propagation 存在於類路徑上,當控制器方法返回響應式型別(如 Flux
或 Mono
)時,所有已註冊了 io.micrometer.ThreadLocalAccessor
的 ThreadLocal
值都會以鍵值對的形式寫入 Reactor Context
,使用由 ThreadLocalAccessor
分配的鍵。
對於其他非同步處理場景,您可以直接使用 Context Propagation 庫。例如
// Capture ThreadLocal values from the main thread ...
ContextSnapshot snapshot = ContextSnapshot.captureAll();
// On a different thread: restore ThreadLocal values
try (ContextSnapshot.Scope scope = snapshot.setThreadLocals()) {
// ...
}
提供了以下現成的 ThreadLocalAccessor
實現
-
LocaleContextThreadLocalAccessor
— 透過LocaleContextHolder
傳播LocaleContext
-
RequestAttributesThreadLocalAccessor
— 透過RequestContextHolder
傳播RequestAttributes
以上實現不會自動註冊。您需要在啟動時透過 ContextRegistry.getInstance()
進行註冊。
有關更多詳細資訊,請參見 Micrometer Context Propagation 庫的文件。
斷開連線
Servlet API 不提供遠端客戶端斷開連線時的任何通知。因此,在流式傳輸到響應時,無論是透過SseEmitter 還是響應式型別,定期傳送資料非常重要,因為如果客戶端已斷開連線,寫入會失敗。傳送的資料可以採用空的(僅註釋)SSE 事件形式,或者任何其他對方需要解釋為心跳並忽略的資料。
或者,考慮使用具有內建心跳機制的 Web 訊息解決方案(例如基於 WebSocket 的 STOMP 或帶有SockJS 的 WebSocket)。
配置
必須在 Servlet 容器級別啟用非同步請求處理功能。MVC 配置也暴露了一些非同步請求的選項。
Servlet 容器
Filter 和 Servlet 宣告有一個 asyncSupported
標誌,需要設定為 true
以啟用非同步請求處理。此外,應宣告 Filter mapping 來處理 ASYNC
型別的 jakarta.servlet.DispatchType
。
在 Java 配置中,當您使用 AbstractAnnotationConfigDispatcherServletInitializer
初始化 Servlet 容器時,這會自動完成。
在 web.xml
配置中,您可以將 <async-supported>true</async-supported>
新增到 DispatcherServlet
和 Filter
宣告中,並將 <dispatcher>ASYNC</dispatcher>
新增到 filter mapping 中。
Spring MVC
MVC 配置暴露了以下非同步請求處理選項
-
Java 配置:在
WebMvcConfigurer
上使用configureAsyncSupport
回撥。 -
XML 名稱空間:在
<mvc:annotation-driven>
下使用<async-support>
元素。
您可以配置以下內容
-
非同步請求的預設超時值取決於底層的 Servlet 容器,除非明確設定。
-
用於在使用響應式型別進行流式傳輸時進行阻塞寫入以及執行從控制器方法返回的
Callable
例項的AsyncTaskExecutor
。預設使用的 executor 不適合在負載下的生產環境中使用。 -
DeferredResultProcessingInterceptor
實現和CallableProcessingInterceptor
實現。
注意,你也可以在 DeferredResult
、ResponseBodyEmitter
和 SseEmitter
上設定預設的超時值。對於 Callable
,你可以使用 WebAsyncTask
來提供超時值。