資料緩衝區和編解碼器
Java NIO 提供了 ByteBuffer,但許多庫在此之上構建了自己的位元組緩衝區 API,特別是對於網路操作,其中重用緩衝區和/或使用直接緩衝區有助於提高效能。例如,Netty 有 ByteBuf 層次結構,Jetty 使用帶有回撥函式的池化位元組緩衝區來釋放,等等。spring-core 模組提供了一組抽象來處理各種位元組緩衝區 API,如下所示:
-
DataBufferFactory抽象了資料緩衝區的建立。 -
DataBuffer表示一個位元組緩衝區,它可以是 池化的。 -
DataBufferUtils提供資料緩衝區的實用方法。 -
編解碼器 (Codecs) 將資料緩衝區流解碼或編碼為更高級別的物件。
DataBufferFactory
DataBufferFactory 用於以以下兩種方式之一建立資料緩衝區:
-
分配一個新的資料緩衝區,如果容量已知,可以選擇預先指定容量,這更高效,儘管
DataBuffer的實現可以按需增長和縮小。 -
包裝一個現有的
byte[]或java.nio.ByteBuffer,它用DataBuffer實現裝飾給定的資料,並且不涉及分配。
請注意,WebFlux 應用程式不直接建立 DataBufferFactory,而是透過 ServerHttpResponse 或客戶端的 ClientHttpRequest 訪問它。工廠的型別取決於底層客戶端或伺服器,例如,Reactor Netty 的 NettyDataBufferFactory,其他情況的 DefaultDataBufferFactory。
DataBuffer
DataBuffer 介面提供與 java.nio.ByteBuffer 類似的操作,但也帶來了一些額外的優點,其中一些受到了 Netty ByteBuf 的啟發。以下是部分優點列表:
-
讀寫具有獨立的 position,即無需呼叫
flip()來在讀寫之間切換。 -
容量按需擴充套件,如同
java.lang.StringBuilder。 -
透過
PooledDataBuffer實現的池化緩衝區和引用計數。 -
將緩衝區視為
java.nio.ByteBuffer、InputStream或OutputStream。 -
確定給定位元組的索引或最後一個索引。
PooledDataBuffer
如 ByteBuffer 的 Javadoc 中所述,位元組緩衝區可以是直接的或非直接的。直接緩衝區可以駐留在 Java 堆之外,這消除了本地 I/O 操作的複製需求。這使得直接緩衝區對於透過套接字接收和傳送資料特別有用,但它們的建立和釋放成本也更高,這導致了池化緩衝區的想法。
PooledDataBuffer 是 DataBuffer 的一個擴充套件,有助於引用計數,這對於位元組緩衝區池化至關重要。它是如何工作的?當分配一個 PooledDataBuffer 時,引用計數為 1。呼叫 retain() 會增加計數,而呼叫 release() 會減少計數。只要計數大於 0,就保證緩衝區不會被釋放。當計數減少到 0 時,池化緩衝區可以被釋放,這實際上可能意味著為緩衝區保留的記憶體會返回到記憶體池。
請注意,在大多數情況下,最好使用 DataBufferUtils 中的便利方法,它們僅當 DataBuffer 是 PooledDataBuffer 的例項時才對其應用釋放或保留操作,而不是直接操作 PooledDataBuffer。
DataBufferUtils
DataBufferUtils 提供了許多實用方法來操作資料緩衝區:
-
將資料緩衝區流合併到單個緩衝區中,可能進行零複製,例如,如果底層位元組緩衝區 API 支援,則透過複合緩衝區實現。
-
將
InputStream或 NIOChannel轉換為Flux<DataBuffer>,反之,將Publisher<DataBuffer>轉換為OutputStream或 NIOChannel。 -
如果緩衝區是
PooledDataBuffer的例項,則提供釋放或保留DataBuffer的方法。 -
從位元組流中跳過或獲取直到達到特定位元組數。
編解碼器 (Codecs)
org.springframework.core.codec 包提供以下策略介面:
-
Encoder用於將Publisher<T>編碼為資料緩衝區流。 -
Decoder用於將Publisher<DataBuffer>解碼為更高級別的物件流。
spring-core 模組提供了 byte[]、ByteBuffer、DataBuffer、Resource 和 String 的編碼器和解碼器實現。spring-web 模組增加了 Jackson JSON、Jackson Smile、JAXB2、Protocol Buffers 和其他編碼器和解碼器。請參閱 WebFlux 部分的 編解碼器。
使用 DataBuffer
處理資料緩衝區時,必須特別小心以確保緩衝區被釋放,因為它們可能是 池化的。我們將使用編解碼器來說明其工作原理,但這些概念更普遍適用。讓我們看看編解碼器在內部必須如何管理資料緩衝區。
一個 Decoder 是在建立更高級別物件之前最後讀取輸入資料緩衝區的,因此它必須按如下方式釋放它們:
-
如果
Decoder只是簡單地讀取每個輸入緩衝區並準備立即釋放它,它可以透過DataBufferUtils.release(dataBuffer)來完成。 -
如果
Decoder使用Flux或Mono運算子,例如flatMap、reduce等在內部預取和快取資料項,或者使用filter、skip等運算子遺漏項,那麼必須將doOnDiscard(DataBuffer.class, DataBufferUtils::release)新增到組合鏈中,以確保此類緩衝區在被丟棄之前被釋放,這可能也是錯誤或取消訊號的結果。 -
如果
Decoder以任何其他方式持有一個或多個數據緩衝區,它必須確保在完全讀取時,或在快取的資料緩衝區被讀取和釋放之前發生錯誤或取消訊號時,它們會被釋放。
請注意,DataBufferUtils#join 提供了一種安全高效的方法,可以將資料緩衝區流聚合到單個數據緩衝區中。同樣,skipUntilByteCount 和 takeUntilByteCount 是解碼器可以使用的其他安全方法。
一個 Encoder 分配資料緩衝區供其他元件讀取(並釋放)。因此 Encoder 沒有太多事情要做。然而,如果填充緩衝區資料時發生序列化錯誤,Encoder 必須注意釋放資料緩衝區。例如:
-
Java
-
Kotlin
DataBuffer buffer = factory.allocateBuffer();
boolean release = true;
try {
// serialize and populate buffer..
release = false;
}
finally {
if (release) {
DataBufferUtils.release(buffer);
}
}
return buffer;
val buffer = factory.allocateBuffer()
var release = true
try {
// serialize and populate buffer..
release = false
} finally {
if (release) {
DataBufferUtils.release(buffer)
}
}
return buffer
Encoder 的消費者負責釋放它接收到的資料緩衝區。在 WebFlux 應用程式中,Encoder 的輸出用於寫入 HTTP 伺服器響應或客戶端 HTTP 請求,在這種情況下,釋放資料緩衝區是寫入伺服器響應或客戶端請求的程式碼的責任。
請注意,在 Netty 上執行時,有針對 緩衝區洩漏故障排除 的除錯選項。