資料緩衝區和編解碼器

Java NIO 提供了 ByteBuffer,但許多庫在此基礎上構建了自己的位元組緩衝區 API,特別是對於網路操作,重用緩衝區和/或使用直接緩衝區對效能有利。例如,Netty 有 ByteBuf 層次結構,Undertow 使用 XNIO,Jetty 使用帶釋放回調的池化位元組緩衝區,等等。spring-core 模組提供了一組抽象來處理各種位元組緩衝區 API,如下所示

DataBufferFactory

DataBufferFactory 用於透過以下兩種方式之一建立資料緩衝區

  1. 分配一個新的資料緩衝區,如果容量已知,可以選擇預先指定,這樣效率更高,儘管 DataBuffer 的實現可以按需增長和收縮。

  2. 包裝現有的 byte[]java.nio.ByteBuffer,這將使用 DataBuffer 實現裝飾給定資料,且不涉及分配。

注意,WebFlux 應用不直接建立 DataBufferFactory,而是透過 ServerHttpResponse 或客戶端的 ClientHttpRequest 訪問它。工廠的型別取決於底層的客戶端或伺服器,例如,Reactor Netty 使用 NettyDataBufferFactory,其他則使用 DefaultDataBufferFactory

DataBuffer

DataBuffer 介面提供與 java.nio.ByteBuffer 類似的操作,但也帶來了一些額外的優勢,其中一些受到了 Netty ByteBuf 的啟發。以下是部分優勢列表

  • 具有獨立的讀寫位置,即在讀寫之間切換時不需要呼叫 flip()

  • 容量按需擴充套件,類似於 java.lang.StringBuilder

  • 透過 PooledDataBuffer 進行池化緩衝區和引用計數。

  • 將緩衝區視為 java.nio.ByteBufferInputStreamOutputStream

  • 確定給定位元組的索引或最後一個索引。

PooledDataBuffer

正如 ByteBuffer 的 Javadoc 中所解釋的,位元組緩衝區可以是直接的或非直接的。直接緩衝區可能駐留在 Java 堆之外,這消除了原生 I/O 操作中的複製需求。這使得直接緩衝區特別適合透過 socket 接收和傳送資料,但它們的建立和釋放成本也更高,從而產生了池化緩衝區的想法。

PooledDataBufferDataBuffer 的一個擴充套件,它有助於引用計數,這對於位元組緩衝區池化至關重要。它是如何工作的?當分配一個 PooledDataBuffer 時,引用計數為 1。呼叫 retain() 會增加計數,而呼叫 release() 會減少計數。只要計數大於 0,緩衝區就保證不會被釋放。當計數減少到 0 時,池化緩衝區可以被釋放,這實際上意味著為緩衝區保留的記憶體被返還給記憶體池。

注意,在大多數情況下,最好不要直接操作 PooledDataBuffer,而是使用 DataBufferUtils 中的便利方法,這些方法僅當 DataBufferPooledDataBuffer 的例項時才對其應用 release 或 retain 操作。

DataBufferUtils

DataBufferUtils 提供了許多操作資料緩衝區的工具方法

  • 將資料緩衝區流連線成一個單獨的緩衝區,如果底層位元組緩衝區 API 支援,可能實現零複製,例如透過組合緩衝區。

  • InputStream 或 NIO Channel 轉換為 Flux<DataBuffer>,反之,將 Publisher<DataBuffer> 轉換為 OutputStream 或 NIO Channel

  • 如果緩衝區是 PooledDataBuffer 的例項,則釋放或 retain DataBuffer 的方法。

  • 從位元組流中跳過或獲取指定位元組數。

編解碼器

org.springframework.core.codec 包提供以下策略介面

  • Encoder 用於將 Publisher<T> 編碼為資料緩衝區流。

  • Decoder 用於將 Publisher<DataBuffer> 解碼為更高級別的物件流。

spring-core 模組提供了 byte[]ByteBufferDataBufferResourceString 的編碼器和解碼器實現。spring-web 模組增加了 Jackson JSON、Jackson Smile、JAXB2、Protocol Buffers 以及其他編碼器和解碼器。詳見 WebFlux 部分的編解碼器

使用 DataBuffer

使用資料緩衝區時,必須特別注意確保緩衝區被釋放,因為它們可能是池化的。我們將使用編解碼器來說明這一點,但這些概念更具普遍性。讓我們看看編解碼器內部必須如何管理資料緩衝區。

Decoder 是最後一個讀取輸入資料緩衝區的,在建立更高級別物件之前,因此它必須如下釋放它們

  1. 如果 Decoder 只是簡單地讀取每個輸入緩衝區並準備立即釋放它,它可以透過 DataBufferUtils.release(dataBuffer) 來做到。

  2. 如果 Decoder 使用 FluxMono 運算子,如 flatMapreduce 等,這些運算子在內部預取和快取資料項,或者使用 filterskip 等丟棄某些項的運算子,則必須在組合鏈中新增 doOnDiscard(DataBuffer.class, DataBufferUtils::release),以確保這些緩衝區在被丟棄之前被釋放,即使是由於錯誤或取消訊號導致。

  3. 如果 Decoder 以任何其他方式持有一個或多個數據緩衝區,它必須確保在完全讀取後釋放它們,或者在快取的資料緩衝區被讀取和釋放之前發生錯誤或取消訊號時釋放它們。

注意,DataBufferUtils#join 提供了一種安全高效的方式將資料緩衝區流聚合成一個單獨的資料緩衝區。類似地,skipUntilByteCounttakeUntilByteCount 是供解碼器使用的其他安全方法。

Encoder 分配資料緩衝區供其他人讀取(和釋放)。因此 Encoder 不需要做太多工作。但是,如果在用資料填充緩衝區時發生序列化錯誤,Encoder 必須注意釋放該資料緩衝區。例如

  • Java

  • Kotlin

DataBuffer buffer = factory.allocateBuffer();
boolean release = true;
try {
	// serialize and populate buffer..
	release = false;
}
finally {
	if (release) {
		DataBufferUtils.release(buffer);
	}
}
return buffer;
val buffer = factory.allocateBuffer()
var release = true
try {
	// serialize and populate buffer..
	release = false
} finally {
	if (release) {
		DataBufferUtils.release(buffer)
	}
}
return buffer

Encoder 的消費者負責釋放其接收的資料緩衝區。在 WebFlux 應用中,Encoder 的輸出用於寫入 HTTP 伺服器響應或客戶端 HTTP 請求,在這種情況下,釋放資料緩衝區的責任在於寫入伺服器響應或客戶端請求的程式碼。

注意,在 Netty 上執行時,有一些用於排查緩衝區洩露的除錯選項。