資料緩衝區和編解碼器
Java NIO 提供了 ByteBuffer
,但許多庫在此基礎上構建了自己的位元組緩衝區 API,特別是對於網路操作,重用緩衝區和/或使用直接緩衝區對效能有利。例如,Netty 有 ByteBuf
層次結構,Undertow 使用 XNIO,Jetty 使用帶釋放回調的池化位元組緩衝區,等等。spring-core
模組提供了一組抽象來處理各種位元組緩衝區 API,如下所示
-
DataBufferFactory
抽象了資料緩衝區的建立。 -
DataBuffer
表示一個位元組緩衝區,它可以是池化的。 -
DataBufferUtils
為資料緩衝區提供了工具方法。 -
編解碼器 將資料緩衝區流解碼或編碼為更高級別的物件。
DataBufferFactory
DataBufferFactory
用於透過以下兩種方式之一建立資料緩衝區
-
分配一個新的資料緩衝區,如果容量已知,可以選擇預先指定,這樣效率更高,儘管
DataBuffer
的實現可以按需增長和收縮。 -
包裝現有的
byte[]
或java.nio.ByteBuffer
,這將使用DataBuffer
實現裝飾給定資料,且不涉及分配。
注意,WebFlux 應用不直接建立 DataBufferFactory
,而是透過 ServerHttpResponse
或客戶端的 ClientHttpRequest
訪問它。工廠的型別取決於底層的客戶端或伺服器,例如,Reactor Netty 使用 NettyDataBufferFactory
,其他則使用 DefaultDataBufferFactory
。
DataBuffer
DataBuffer
介面提供與 java.nio.ByteBuffer
類似的操作,但也帶來了一些額外的優勢,其中一些受到了 Netty ByteBuf
的啟發。以下是部分優勢列表
-
具有獨立的讀寫位置,即在讀寫之間切換時不需要呼叫
flip()
。 -
容量按需擴充套件,類似於
java.lang.StringBuilder
。 -
透過
PooledDataBuffer
進行池化緩衝區和引用計數。 -
將緩衝區視為
java.nio.ByteBuffer
、InputStream
或OutputStream
。 -
確定給定位元組的索引或最後一個索引。
PooledDataBuffer
正如 ByteBuffer 的 Javadoc 中所解釋的,位元組緩衝區可以是直接的或非直接的。直接緩衝區可能駐留在 Java 堆之外,這消除了原生 I/O 操作中的複製需求。這使得直接緩衝區特別適合透過 socket 接收和傳送資料,但它們的建立和釋放成本也更高,從而產生了池化緩衝區的想法。
PooledDataBuffer
是 DataBuffer
的一個擴充套件,它有助於引用計數,這對於位元組緩衝區池化至關重要。它是如何工作的?當分配一個 PooledDataBuffer
時,引用計數為 1。呼叫 retain()
會增加計數,而呼叫 release()
會減少計數。只要計數大於 0,緩衝區就保證不會被釋放。當計數減少到 0 時,池化緩衝區可以被釋放,這實際上意味著為緩衝區保留的記憶體被返還給記憶體池。
注意,在大多數情況下,最好不要直接操作 PooledDataBuffer
,而是使用 DataBufferUtils
中的便利方法,這些方法僅當 DataBuffer
是 PooledDataBuffer
的例項時才對其應用 release 或 retain 操作。
DataBufferUtils
DataBufferUtils
提供了許多操作資料緩衝區的工具方法
-
將資料緩衝區流連線成一個單獨的緩衝區,如果底層位元組緩衝區 API 支援,可能實現零複製,例如透過組合緩衝區。
-
將
InputStream
或 NIOChannel
轉換為Flux<DataBuffer>
,反之,將Publisher<DataBuffer>
轉換為OutputStream
或 NIOChannel
。 -
如果緩衝區是
PooledDataBuffer
的例項,則釋放或 retainDataBuffer
的方法。 -
從位元組流中跳過或獲取指定位元組數。
編解碼器
org.springframework.core.codec
包提供以下策略介面
-
Encoder
用於將Publisher<T>
編碼為資料緩衝區流。 -
Decoder
用於將Publisher<DataBuffer>
解碼為更高級別的物件流。
spring-core
模組提供了 byte[]
、ByteBuffer
、DataBuffer
、Resource
和 String
的編碼器和解碼器實現。spring-web
模組增加了 Jackson JSON、Jackson Smile、JAXB2、Protocol Buffers 以及其他編碼器和解碼器。詳見 WebFlux 部分的編解碼器。
使用 DataBuffer
使用資料緩衝區時,必須特別注意確保緩衝區被釋放,因為它們可能是池化的。我們將使用編解碼器來說明這一點,但這些概念更具普遍性。讓我們看看編解碼器內部必須如何管理資料緩衝區。
Decoder
是最後一個讀取輸入資料緩衝區的,在建立更高級別物件之前,因此它必須如下釋放它們
-
如果
Decoder
只是簡單地讀取每個輸入緩衝區並準備立即釋放它,它可以透過DataBufferUtils.release(dataBuffer)
來做到。 -
如果
Decoder
使用Flux
或Mono
運算子,如flatMap
、reduce
等,這些運算子在內部預取和快取資料項,或者使用filter
、skip
等丟棄某些項的運算子,則必須在組合鏈中新增doOnDiscard(DataBuffer.class, DataBufferUtils::release)
,以確保這些緩衝區在被丟棄之前被釋放,即使是由於錯誤或取消訊號導致。 -
如果
Decoder
以任何其他方式持有一個或多個數據緩衝區,它必須確保在完全讀取後釋放它們,或者在快取的資料緩衝區被讀取和釋放之前發生錯誤或取消訊號時釋放它們。
注意,DataBufferUtils#join
提供了一種安全高效的方式將資料緩衝區流聚合成一個單獨的資料緩衝區。類似地,skipUntilByteCount
和 takeUntilByteCount
是供解碼器使用的其他安全方法。
Encoder
分配資料緩衝區供其他人讀取(和釋放)。因此 Encoder
不需要做太多工作。但是,如果在用資料填充緩衝區時發生序列化錯誤,Encoder
必須注意釋放該資料緩衝區。例如
-
Java
-
Kotlin
DataBuffer buffer = factory.allocateBuffer();
boolean release = true;
try {
// serialize and populate buffer..
release = false;
}
finally {
if (release) {
DataBufferUtils.release(buffer);
}
}
return buffer;
val buffer = factory.allocateBuffer()
var release = true
try {
// serialize and populate buffer..
release = false
} finally {
if (release) {
DataBufferUtils.release(buffer)
}
}
return buffer
Encoder
的消費者負責釋放其接收的資料緩衝區。在 WebFlux 應用中,Encoder
的輸出用於寫入 HTTP 伺服器響應或客戶端 HTTP 請求,在這種情況下,釋放資料緩衝區的責任在於寫入伺服器響應或客戶端請求的程式碼。
注意,在 Netty 上執行時,有一些用於排查緩衝區洩露的除錯選項。