資料緩衝區和編解碼器

Java NIO 提供了 ByteBuffer,但許多庫在此之上構建了自己的位元組緩衝區 API,特別是對於網路操作,其中重用緩衝區和/或使用直接緩衝區有助於提高效能。例如,Netty 有 ByteBuf 層次結構,Jetty 使用帶有回撥函式的池化位元組緩衝區來釋放,等等。spring-core 模組提供了一組抽象來處理各種位元組緩衝區 API,如下所示:

DataBufferFactory

DataBufferFactory 用於以以下兩種方式之一建立資料緩衝區:

  1. 分配一個新的資料緩衝區,如果容量已知,可以選擇預先指定容量,這更高效,儘管 DataBuffer 的實現可以按需增長和縮小。

  2. 包裝一個現有的 byte[]java.nio.ByteBuffer,它用 DataBuffer 實現裝飾給定的資料,並且不涉及分配。

請注意,WebFlux 應用程式不直接建立 DataBufferFactory,而是透過 ServerHttpResponse 或客戶端的 ClientHttpRequest 訪問它。工廠的型別取決於底層客戶端或伺服器,例如,Reactor Netty 的 NettyDataBufferFactory,其他情況的 DefaultDataBufferFactory

DataBuffer

DataBuffer 介面提供與 java.nio.ByteBuffer 類似的操作,但也帶來了一些額外的優點,其中一些受到了 Netty ByteBuf 的啟發。以下是部分優點列表:

  • 讀寫具有獨立的 position,即無需呼叫 flip() 來在讀寫之間切換。

  • 容量按需擴充套件,如同 java.lang.StringBuilder

  • 透過 PooledDataBuffer 實現的池化緩衝區和引用計數。

  • 將緩衝區視為 java.nio.ByteBufferInputStreamOutputStream

  • 確定給定位元組的索引或最後一個索引。

PooledDataBuffer

ByteBuffer 的 Javadoc 中所述,位元組緩衝區可以是直接的或非直接的。直接緩衝區可以駐留在 Java 堆之外,這消除了本地 I/O 操作的複製需求。這使得直接緩衝區對於透過套接字接收和傳送資料特別有用,但它們的建立和釋放成本也更高,這導致了池化緩衝區的想法。

PooledDataBufferDataBuffer 的一個擴充套件,有助於引用計數,這對於位元組緩衝區池化至關重要。它是如何工作的?當分配一個 PooledDataBuffer 時,引用計數為 1。呼叫 retain() 會增加計數,而呼叫 release() 會減少計數。只要計數大於 0,就保證緩衝區不會被釋放。當計數減少到 0 時,池化緩衝區可以被釋放,這實際上可能意味著為緩衝區保留的記憶體會返回到記憶體池。

請注意,在大多數情況下,最好使用 DataBufferUtils 中的便利方法,它們僅當 DataBufferPooledDataBuffer 的例項時才對其應用釋放或保留操作,而不是直接操作 PooledDataBuffer

DataBufferUtils

DataBufferUtils 提供了許多實用方法來操作資料緩衝區:

  • 將資料緩衝區流合併到單個緩衝區中,可能進行零複製,例如,如果底層位元組緩衝區 API 支援,則透過複合緩衝區實現。

  • InputStream 或 NIO Channel 轉換為 Flux<DataBuffer>,反之,將 Publisher<DataBuffer> 轉換為 OutputStream 或 NIO Channel

  • 如果緩衝區是 PooledDataBuffer 的例項,則提供釋放或保留 DataBuffer 的方法。

  • 從位元組流中跳過或獲取直到達到特定位元組數。

編解碼器 (Codecs)

org.springframework.core.codec 包提供以下策略介面:

  • Encoder 用於將 Publisher<T> 編碼為資料緩衝區流。

  • Decoder 用於將 Publisher<DataBuffer> 解碼為更高級別的物件流。

spring-core 模組提供了 byte[]ByteBufferDataBufferResourceString 的編碼器和解碼器實現。spring-web 模組增加了 Jackson JSON、Jackson Smile、JAXB2、Protocol Buffers 和其他編碼器和解碼器。請參閱 WebFlux 部分的 編解碼器

使用 DataBuffer

處理資料緩衝區時,必須特別小心以確保緩衝區被釋放,因為它們可能是 池化的。我們將使用編解碼器來說明其工作原理,但這些概念更普遍適用。讓我們看看編解碼器在內部必須如何管理資料緩衝區。

一個 Decoder 是在建立更高級別物件之前最後讀取輸入資料緩衝區的,因此它必須按如下方式釋放它們:

  1. 如果 Decoder 只是簡單地讀取每個輸入緩衝區並準備立即釋放它,它可以透過 DataBufferUtils.release(dataBuffer) 來完成。

  2. 如果 Decoder 使用 FluxMono 運算子,例如 flatMapreduce 等在內部預取和快取資料項,或者使用 filterskip 等運算子遺漏項,那麼必須將 doOnDiscard(DataBuffer.class, DataBufferUtils::release) 新增到組合鏈中,以確保此類緩衝區在被丟棄之前被釋放,這可能也是錯誤或取消訊號的結果。

  3. 如果 Decoder 以任何其他方式持有一個或多個數據緩衝區,它必須確保在完全讀取時,或在快取的資料緩衝區被讀取和釋放之前發生錯誤或取消訊號時,它們會被釋放。

請注意,DataBufferUtils#join 提供了一種安全高效的方法,可以將資料緩衝區流聚合到單個數據緩衝區中。同樣,skipUntilByteCounttakeUntilByteCount 是解碼器可以使用的其他安全方法。

一個 Encoder 分配資料緩衝區供其他元件讀取(並釋放)。因此 Encoder 沒有太多事情要做。然而,如果填充緩衝區資料時發生序列化錯誤,Encoder 必須注意釋放資料緩衝區。例如:

  • Java

  • Kotlin

DataBuffer buffer = factory.allocateBuffer();
boolean release = true;
try {
	// serialize and populate buffer..
	release = false;
}
finally {
	if (release) {
		DataBufferUtils.release(buffer);
	}
}
return buffer;
val buffer = factory.allocateBuffer()
var release = true
try {
	// serialize and populate buffer..
	release = false
} finally {
	if (release) {
		DataBufferUtils.release(buffer)
	}
}
return buffer

Encoder 的消費者負責釋放它接收到的資料緩衝區。在 WebFlux 應用程式中,Encoder 的輸出用於寫入 HTTP 伺服器響應或客戶端 HTTP 請求,在這種情況下,釋放資料緩衝區是寫入伺服器響應或客戶端請求的程式碼的責任。

請注意,在 Netty 上執行時,有針對 緩衝區洩漏故障排除 的除錯選項。

© . This site is unofficial and not affiliated with VMware.