FTP 流式入站通道介面卡

版本 4.3 引入了流式入站通道介面卡。此介面卡生成的Payload型別為 InputStream 的訊息,允許在不寫入本地檔案系統的情況下獲取檔案。由於會話保持開啟狀態,消費應用程式負責在檔案被消費後關閉會話。會話在 closeableResource 訊息頭 (IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE) 中提供。標準框架元件,例如 FileSplitterStreamTransformer,會自動關閉會話。有關這些元件的更多資訊,請參閱 檔案分割器流轉換器。以下示例展示瞭如何配置 inbound-streaming-channel-adapter

<int-ftp:inbound-streaming-channel-adapter id="ftpInbound"
            channel="ftpChannel"
            session-factory="sessionFactory"
            filename-pattern="*.txt"
            filename-regex=".*\.txt"
            filter="filter"
            filter-expression="@myFilterBean.check(#root)"
            remote-file-separator="/"
            comparator="comparator"
            max-fetch-size="1"
            remote-directory-expression="'foo/bar'">
        <int:poller fixed-rate="1000" />
</int-ftp:inbound-streaming-channel-adapter>

只能指定 filename-pattern, filename-regex, filterfilter-expression 中的一個。

從版本 5.0 開始,預設情況下,FtpStreamingMessageSource 介面卡使用基於記憶體 SimpleMetadataStoreFtpPersistentAcceptOnceFileListFilter 來防止遠端檔案重複。預設情況下,此過濾器也適用於檔名模式(或正則表示式)。如果需要允許重複,可以使用 AcceptAllFileListFilter。任何其他用例可以透過 CompositeFileListFilter(或 ChainFileListFilter)處理。Java 配置(文件後面部分)展示了一種在處理後刪除遠端檔案以避免重複的技術。

有關 FtpPersistentAcceptOnceFileListFilter 及其使用方式的更多資訊,請參閱 遠端持久檔案列表過濾器

使用 max-fetch-size 屬性來限制每次輪詢時(需要獲取檔案時)獲取的檔案數量。在叢集環境中執行時,將其設定為 1 並使用持久過濾器。有關更多資訊,請參閱 入站通道介面卡:控制遠端檔案獲取

介面卡分別將遠端目錄和檔名放在 FileHeaders.REMOTE_DIRECTORYFileHeaders.REMOTE_FILE 訊息頭中。從版本 5.0 開始,FileHeaders.REMOTE_FILE_INFO 訊息頭提供了額外的遠端檔案資訊(預設以 JSON 格式表示)。如果將 FtpStreamingMessageSource 上的 fileInfoJson 屬性設定為 false,則訊息頭包含一個 FtpFileInfo 物件。可以透過底層 Apache Net 庫提供的 FTPFile 物件,使用 FtpFileInfo.getFileInfo() 方法進行訪問。在使用 XML 配置時,fileInfoJson 屬性不可用,但可以透過將 FtpStreamingMessageSource 注入到配置類中來設定它。另請參閱 遠端檔案資訊

從版本 5.1 開始,comparator 的泛型型別是 FTPFile。之前是 AbstractFileInfo<FTPFile>。這是因為排序現在在處理過程中的更早階段執行,即在過濾和應用 maxFetch 之前。

使用 Java 配置進行配置

以下 Spring Boot 應用程式展示瞭如何使用 Java 配置來配置入站介面卡的示例

@SpringBootApplication
public class FtpJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(FtpJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    @InboundChannelAdapter(channel = "stream")
    public MessageSource<InputStream> ftpMessageSource() {
        FtpStreamingMessageSource messageSource = new FtpStreamingMessageSource(template());
        messageSource.setRemoteDirectory("ftpSource/");
        messageSource.setFilter(new AcceptAllFileListFilter<>());
        messageSource.setMaxFetchSize(1);
        return messageSource;
    }

    @Bean
    @Transformer(inputChannel = "stream", outputChannel = "data")
    public org.springframework.integration.transformer.Transformer transformer() {
        return new StreamTransformer("UTF-8");
    }

    @Bean
    public FtpRemoteFileTemplate template() {
        return new FtpRemoteFileTemplate(ftpSessionFactory());
    }

    @ServiceActivator(inputChannel = "data", adviceChain = "after")
    @Bean
    public MessageHandler handle() {
        return System.out::println;
    }

    @Bean
    public ExpressionEvaluatingRequestHandlerAdvice after() {
        ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
        advice.setOnSuccessExpression(
                "@template.remove(headers['file_remoteDirectory'] + headers['file_remoteFile'])");
        advice.setPropagateEvaluationFailures(true);
        return advice;
    }

}

注意,在此示例中,轉換器下游的訊息處理器包含一個 advice,用於在處理後刪除遠端檔案。