流支援

在許多情況下,應用程式資料是從流中獲取的。不建議將流的引用作為訊息負載傳送給消費者。相反,訊息是從輸入流中讀取的資料建立的,並且訊息負載逐一寫入輸出流。

專案需要此依賴項

  • Maven

  • Gradle

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-stream</artifactId>
    <version>7.0.0</version>
</dependency>
compile "org.springframework.integration:spring-integration-stream:7.0.0"

從流中讀取

Spring Integration 為流提供了兩個介面卡。ByteStreamReadingMessageSourceCharacterStreamReadingMessageSource 都實現了 MessageSource。透過在 channel-adapter 元素中配置其中一個,可以配置輪詢週期,並且訊息匯流排可以自動檢測和排程它們。位元組流版本需要一個 InputStream,字元流版本需要一個 Reader 作為唯一的建構函式引數。ByteStreamReadingMessageSource 還接受 'bytesPerMessage' 屬性來確定它嘗試讀取到每個 Message 中的位元組數。預設值為 1024。以下示例建立了一個輸入流,該輸入流建立的每個訊息包含 2048 位元組

<bean class="org.springframework.integration.stream.inbound.ByteStreamReadingMessageSource">
  <constructor-arg ref="someInputStream"/>
  <property name="bytesPerMessage" value="2048"/>
</bean>

<bean class="org.springframework.integration.stream.inbound.CharacterStreamReadingMessageSource">
  <constructor-arg ref="someReader"/>
</bean>

CharacterStreamReadingMessageSource 將讀取器包裝在 BufferedReader 中(如果它還不是)。您可以在第二個建構函式引數中設定緩衝讀取器使用的緩衝區大小。從 5.0 版本開始,第三個建構函式引數(blockToDetectEOF)控制 CharacterStreamReadingMessageSource 的行為。當為 false(預設值)時,receive() 方法檢查讀取器是否 ready(),如果不就返回 null。在這種情況下不會檢測到 EOF(檔案結束)。當為 true 時,receive() 方法會阻塞,直到資料可用或在底層流上檢測到 EOF。當檢測到 EOF 時,會發佈一個 StreamClosedEvent(應用程式事件)。您可以使用實現 ApplicationListener<StreamClosedEvent> 的 bean 來消費此事件。

為了方便 EOF 檢測,輪詢器執行緒會在 receive() 方法中阻塞,直到資料到達或檢測到 EOF。
一旦檢測到 EOF,輪詢器會繼續在每次輪詢時釋出事件。應用程式監聽器可以停止介面卡以防止這種情況。事件在輪詢器執行緒上釋出。停止介面卡會導致執行緒中斷。如果您打算在停止介面卡後執行一些可中斷的任務,您必須在不同的執行緒上執行 stop(),或者為這些下游活動使用不同的執行緒。請注意,傳送到 QueueChannel 是可中斷的,因此,如果您希望從監聽器傳送訊息,請在停止介面卡之前進行。

這有助於“管道”或將資料重定向到 stdin,如下面兩個示例所示

cat myfile.txt | java -jar my.jar
java -jar my.jar < foo.txt

這種方法允許應用程式在管道關閉時停止。

提供了四個方便的工廠方法

public static final CharacterStreamReadingMessageSource stdin() { ... }

public static final CharacterStreamReadingMessageSource stdin(String charsetName) { ... }

public static final CharacterStreamReadingMessageSource stdinPipe() { ... }

public static final CharacterStreamReadingMessageSource stdinPipe(String charsetName) { ... }

寫入流

對於目標流,您可以使用以下兩種實現之一:ByteStreamWritingMessageHandlerCharacterStreamWritingMessageHandler。每個都需要一個建構函式引數(位元組流為 OutputStream,字元流為 Writer),並且每個都提供一個新增可選“bufferSize”的第二個建構函式。由於這兩者最終都實現了 MessageHandler 介面,您可以從 channel-adapter 配置中引用它們,如 通道介面卡 中所述。

<bean class="org.springframework.integration.stream.outbound.ByteStreamWritingMessageHandler">
  <constructor-arg ref="someOutputStream"/>
  <constructor-arg value="1024"/>
</bean>

<bean class="org.springframework.integration.stream.outbound.CharacterStreamWritingMessageHandler">
  <constructor-arg ref="someWriter"/>
</bean>

流名稱空間支援

Spring Integration 定義了一個名稱空間,以減少流相關通道介面卡所需的配置。需要以下模式位置才能使用它

<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns:int-stream="http://www.springframework.org/schema/integration/stream"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:beans="http://www.springframework.org/schema/beans"
  xsi:schemaLocation="http://www.springframework.org/schema/beans
      https://www.springframework.org/schema/beans/spring-beans.xsd
      http://www.springframework.org/schema/integration/stream
      https://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd">

以下程式碼片段顯示了支援的不同配置選項,用於配置入站通道介面卡

<int-stream:stdin-channel-adapter id="adapterWithDefaultCharset"/>

<int-stream:stdin-channel-adapter id="adapterWithProvidedCharset" charset="UTF-8"/>

從 5.0 版本開始,您可以設定 detect-eof 屬性,它會設定 blockToDetectEOF 屬性。有關更多資訊,請參閱 從流中讀取

要配置出站通道介面卡,您也可以使用名稱空間支援。以下示例顯示了出站通道介面卡的不同配置

<int-stream:stdout-channel-adapter id="stdoutAdapterWithDefaultCharset"
    channel="testChannel"/>

<int-stream:stdout-channel-adapter id="stdoutAdapterWithProvidedCharset" charset="UTF-8"
    channel="testChannel"/>

<int-stream:stderr-channel-adapter id="stderrAdapter" channel="testChannel"/>

<int-stream:stdout-channel-adapter id="newlineAdapter" append-newline="true"
    channel="testChannel"/>
© . This site is unofficial and not affiliated with VMware.