檔案分割器

FileSplitter 在 4.1.2 版本中新增,其名稱空間支援在 4.2 版本中新增。FileSplitter 根據 BufferedReader.readLine() 將文字檔案分割成單獨的行。預設情況下,分割器使用 Iterator 在從檔案中讀取行時逐行發出。將 iterator 屬性設定為 false 會使其在發出訊息之前將所有行讀入記憶體。這樣做的一種用例是,您可能想在傳送任何包含行的訊息之前檢測檔案上的 I/O 錯誤。然而,這隻適用於相對較短的檔案。

入站載荷可以是 FileString(檔案路徑)、InputStreamReader。其他載荷型別將原樣發出。

以下列表顯示了配置 FileSplitter 的可能方式

  • Java DSL

  • Kotlin DSL

  • Java

  • XML

@SpringBootApplication
public class FileSplitterApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(FileSplitterApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public IntegrationFlow fileSplitterFlow() {
        return IntegrationFlow
            .from(Files.inboundAdapter(tmpDir.getRoot())
                 .filter(new ChainFileListFilter<File>()
                        .addFilter(new AcceptOnceFileListFilter<>())
                        .addFilter(new ExpressionFileListFilter<>(
                             new FunctionExpression<File>(f -> "foo.tmp".equals(f.getName()))))))
            .split(Files.splitter()
                     .markers()
                     .charset(StandardCharsets.US_ASCII)
                     .firstLineAsHeader("fileHeader")
                     .applySequence(true))
            .channel(c -> c.queue("fileSplittingResultChannel"))
            .get();
    }

}
@Bean
fun fileSplitterFlow() =
    integrationFlow(
        Files.inboundAdapter(tmpDir.getRoot())
            .filter(
                ChainFileListFilter<File?>()
                    .addFilter(AcceptOnceFileListFilter())
                    .addFilter(ExpressionFileListFilter(FunctionExpression { f: File? -> "foo.tmp" == f!!.name }))
            )
    ) {
        split(
            Files.splitter()
                .markers()
                .charset(StandardCharsets.US_ASCII)
                .firstLineAsHeader("fileHeader")
                .applySequence(true)
        )
        channel { queue("fileSplittingResultChannel") }
    }
@Splitter(inputChannel="toSplitter")
@Bean
public MessageHandler fileSplitter() {
    FileSplitter splitter = new FileSplitter(true, true);
    splitter.setApplySequence(true);
    splitter.setOutputChannel(outputChannel);
    return splitter;
}
<int-file:splitter id="splitter" (1)
    iterator=""                  (2)
    markers=""                   (3)
    markers-json=""              (4)
    apply-sequence=""            (5)
    requires-reply=""            (6)
    charset=""                   (7)
    first-line-as-header=""      (8)
    input-channel=""             (9)
    output-channel=""            (10)
    send-timeout=""              (11)
    auto-startup=""              (12)
    order=""                     (13)
    phase="" />                  (14)
1 分割器的 bean 名稱。
2 設定為 true(預設值)以使用迭代器,或者設定為 false 以在傳送行之前將檔案載入到記憶體中。
3 設定為 true 以在檔案資料之前和之後發出檔案開始和檔案結束標記訊息。標記是載荷為 FileSplitter.FileMarker 的訊息(其 mark 屬性包含 STARTEND 值)。當您在下游流中按順序處理檔案,並且某些行被過濾掉時,可以使用標記。它們使下游處理能夠知道檔案何時已完全處理完畢。此外,一個包含 STARTENDfile_marker 頭會被新增到這些訊息中。END 標記包含行計數。如果檔案為空,則只發出 STARTEND 標記,其 lineCount0。預設值為 false。當設定為 true 時,apply-sequence 預設為 false。另請參閱 markers-json(下一個屬性)。
4 markers 為 true 時,將其設定為 true 可將 FileMarker 物件轉換為 JSON 字串。(底層使用 SimpleJsonSerializer)。
5 設定為 false 可停用在訊息中包含 sequenceSizesequenceNumber 頭。預設值為 true,除非 markerstrue。當為 truemarkerstrue 時,標記會包含在序列中。當為 trueiteratortrue 時,sequenceSize 頭會被設定為 0,因為大小未知。
6 設定為 true 可在檔案沒有行時丟擲 RequiresReplyException。預設值為 false
7 設定讀取文字資料到 String 載荷時使用的字元集名稱。預設值是平臺字元集。
8 第一行作為頭資訊包含在為剩餘行發出的訊息中的頭名稱。自 5.0 版本起。
9 設定用於向分割器傳送訊息的輸入通道。
10 設定訊息傳送到的輸出通道。
11 設定傳送超時。僅當 output-channel 可能阻塞時適用,例如滿的 QueueChannel
12 設定為 false 可停用在上下文重新整理時自動啟動分割器。預設值為 true
13 如果 input-channel<publish-subscribe-channel/>,則設定此端點的順序。
14 設定分割器的啟動階段(當 auto-startuptrue 時使用)。

FileSplitter 還可以將任何基於文字的 InputStream 分割成行。從 4.3 版本開始,當與 FTP 或 SFTP 流式入站通道介面卡,或使用 stream 選項檢索檔案的 FTP 或 SFTP 出站閘道器結合使用時,當檔案完全被消費時,分割器會自動關閉支援該流的會話。有關這些功能的更多資訊,請參見 FTP 流式入站通道介面卡SFTP 流式入站通道介面卡,以及 FTP 出站閘道器SFTP 出站閘道器

使用 Java 配置時,還有一個額外的建構函式可用,如下例所示

public FileSplitter(boolean iterator, boolean markers, boolean markersJson)

markersJson 為 true 時,標記表示為 JSON 字串(使用 SimpleJsonSerializer)。

5.0 版本引入了 firstLineAsHeader 選項,用於指定內容的第一行是頭(例如 CSV 檔案中的列名)。傳遞給此屬性的引數是頭名稱,在該名稱下,第一行作為頭資訊包含在為剩餘行發出的訊息中。此行不包含在序列頭中(如果 applySequence 為 true),也不包含在與 FileMarker.END 關聯的 lineCount 中。注意:從 5.5 版本開始,lineCount 也作為 FileHeaders.LINE_COUNT 包含在 FileMarker.END 訊息的頭中,因為 FileMarker 可以被序列化為 JSON。如果檔案只包含頭行,則該檔案被視為空檔案,因此在分割期間只發出 FileMarker 例項(如果啟用了標記 — 否則,不發出任何訊息)。預設情況下(如果未設定頭名稱),第一行被視為資料,併成為第一個發出的訊息的載荷。

如果您需要更復雜的邏輯來從檔案內容中提取頭(不是第一行,不是行的全部內容,不是某個特定的頭等等),可以考慮在 FileSplitter 之前使用 頭增強器。請注意,已移至頭的行可能會在下游被過濾掉,而不參與正常的內容處理。

分割檔案後的冪等下游處理

apply-sequence 為 true 時,分割器會在 SEQUENCE_NUMBER 頭中新增行號(當 markers 為 true 時,標記被計為行)。行號可以與 冪等接收器 一起使用,以避免在重新啟動後重新處理行。

例如

@Bean
public ConcurrentMetadataStore store() {
    return new ZookeeperMetadataStore();
}

@Bean
public MetadataStoreSelector selector() {
    return new MetadataStoreSelector(
            message -> message.getHeaders().get(FileHeaders.ORIGINAL_FILE, File.class)
                    .getAbsolutePath(),
            message -> message.getHeaders().get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER)
                    .toString(),
            store())
                    .compareValues(
                            (oldVal, newVal) -> Integer.parseInt(oldVal) < Integer.parseInt(newVal));
}

@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
    return new IdempotentReceiverInterceptor(selector());
}

@Bean
public IntegrationFlow flow() {
    ...
    .split(new FileSplitter())
    ...
    .handle("lineHandler", e -> e.advice(idempotentReceiverInterceptor()))
    ...
}