流支援
在許多情況下,應用程式資料是從流中獲取的。不建議將流的引用作為訊息載荷傳送給消費者。相反,訊息是從輸入流讀取的資料建立的,訊息載荷則逐個寫入輸出流。
您需要在專案中包含此依賴項
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
<version>6.4.4</version>
</dependency>
compile "org.springframework.integration:spring-integration-stream:6.4.4"
從流中讀取
Spring Integration 提供了兩種流介面卡。`ByteStreamReadingMessageSource` 和 `CharacterStreamReadingMessageSource` 都實現了 `MessageSource` 介面。透過在 channel-adapter 元素中配置其中之一,可以配置輪詢週期,並且訊息匯流排可以自動檢測並排程它們。位元組流版本需要一個 `InputStream` 作為唯一的建構函式引數,字元流版本需要一個 `Reader` 作為唯一的建構函式引數。`ByteStreamReadingMessageSource` 還接受 'bytesPerMessage' 屬性來確定它嘗試讀取到每個 `Message` 中的位元組數。預設值為 `1024`。以下示例建立了一個輸入流,該輸入流建立的訊息每個包含 2048 位元組。
<bean class="org.springframework.integration.stream.ByteStreamReadingMessageSource">
<constructor-arg ref="someInputStream"/>
<property name="bytesPerMessage" value="2048"/>
</bean>
<bean class="org.springframework.integration.stream.CharacterStreamReadingMessageSource">
<constructor-arg ref="someReader"/>
</bean>
CharacterStreamReadingMessageSource 會將讀取器包裝在 BufferedReader 中(如果它本身不是的話)。您可以在第二個建構函式引數中設定緩衝讀取器使用的緩衝區大小。從 5.0 版本開始,第三個建構函式引數(`blockToDetectEOF`)控制 CharacterStreamReadingMessageSource 的行為。當設定為 false(預設值)時,`receive()` 方法會檢查讀取器是否 ready(),如果不是則返回 null。在這種情況下不會檢測到 EOF(檔案末尾)。當設定為 true 時,`receive()` 方法會阻塞,直到資料可用或在底層流上檢測到 EOF。當檢測到 EOF 時,會發佈一個 `StreamClosedEvent`(應用程式事件)。您可以使用實現 ApplicationListener<StreamClosedEvent> 的 bean 來消費此事件。
為了便於 EOF 檢測,輪詢執行緒會阻塞在 `receive()` 方法中,直到資料到達或檢測到 EOF。 |
一旦檢測到 EOF,輪詢器會在每次輪詢時繼續釋出事件。應用程式監聽器可以停止介面卡以阻止此行為。事件在輪詢執行緒上釋出。停止介面卡會導致執行緒中斷。如果您打算在停止介面卡後執行一些可中斷的任務,您必須要麼在不同的執行緒上執行 `stop()`,要麼為下游活動使用不同的執行緒。請注意,傳送到 QueueChannel 是可中斷的,因此,如果您希望從監聽器傳送訊息,請在停止介面卡之前進行。 |
這有助於“管道化”或將資料重定向到 `stdin`,如下面兩個示例所示
cat myfile.txt | java -jar my.jar
java -jar my.jar < foo.txt
這種方法使得當管道關閉時應用程式能夠停止。
提供了四種方便的工廠方法
public static final CharacterStreamReadingMessageSource stdin() { ... }
public static final CharacterStreamReadingMessageSource stdin(String charsetName) { ... }
public static final CharacterStreamReadingMessageSource stdinPipe() { ... }
public static final CharacterStreamReadingMessageSource stdinPipe(String charsetName) { ... }
寫入流
對於目標流,您可以使用 `ByteStreamWritingMessageHandler` 或 `CharacterStreamWritingMessageHandler` 這兩種實現之一。每種實現都需要一個建構函式引數(位元組流為 `OutputStream`,字元流為 `Writer`),並且每種實現都提供了第二個建構函式,用於新增可選的 'bufferSize'。由於它們最終都實現了 `MessageHandler` 介面,您可以在 `channel-adapter` 配置中引用它們,如通道介面卡中所述。
<bean class="org.springframework.integration.stream.ByteStreamWritingMessageHandler">
<constructor-arg ref="someOutputStream"/>
<constructor-arg value="1024"/>
</bean>
<bean class="org.springframework.integration.stream.CharacterStreamWritingMessageHandler">
<constructor-arg ref="someWriter"/>
</bean>
流名稱空間支援
Spring Integration 定義了一個名稱空間,以減少流相關通道介面卡所需的配置。使用它需要以下 schema 位置
<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns:int-stream="http://www.springframework.org/schema/integration/stream"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:beans="http://www.springframework.org/schema/beans"
xsi:schemaLocation="http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration/stream
https://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd">
以下程式碼片段顯示了支援配置入站通道介面卡的不同配置選項
<int-stream:stdin-channel-adapter id="adapterWithDefaultCharset"/>
<int-stream:stdin-channel-adapter id="adapterWithProvidedCharset" charset="UTF-8"/>
從 5.0 版本開始,您可以設定 `detect-eof` 屬性,該屬性設定 `blockToDetectEOF` 屬性。更多資訊請參見從流中讀取。
要配置出站通道介面卡,您也可以使用名稱空間支援。以下示例展示了出站通道介面卡的不同配置
<int-stream:stdout-channel-adapter id="stdoutAdapterWithDefaultCharset"
channel="testChannel"/>
<int-stream:stdout-channel-adapter id="stdoutAdapterWithProvidedCharset" charset="UTF-8"
channel="testChannel"/>
<int-stream:stderr-channel-adapter id="stderrAdapter" channel="testChannel"/>
<int-stream:stdout-channel-adapter id="newlineAdapter" append-newline="true"
channel="testChannel"/>