SFTP 流式入站通道介面卡

4.3 版本引入了流式入站通道介面卡。此介面卡生成負載型別為 InputStream 的訊息,允許您在不寫入本地檔案系統的情況下抓取檔案。由於 session 保持開啟狀態,因此使用方應用程式負責在檔案消費後關閉 session。session 會在 closeableResource 頭部 (IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE) 中提供。標準框架元件,如 FileSplitterStreamTransformer,會自動關閉 session。有關這些元件的更多資訊,請參見檔案分割器流轉換器。以下示例展示瞭如何配置 SFTP 流式入站通道介面卡。

<int-sftp:inbound-streaming-channel-adapter id="ftpInbound"
            channel="ftpChannel"
            session-factory="sessionFactory"
            filename-pattern="*.txt"
            filename-regex=".*\.txt"
            filter="filter"
            filter-expression="@myFilterBean.check(#root)"
            remote-file-separator="/"
            comparator="comparator"
            max-fetch-size="1"
            remote-directory-expression="'foo/bar'">
        <int:poller fixed-rate="1000" />
</int-sftp:inbound-streaming-channel-adapter>

您只能使用 filename-patternfilename-regexfilterfilter-expression 中的一個。

從 5.0 版本開始,SftpStreamingMessageSource 介面卡預設使用基於記憶體 SimpleMetadataStoreSftpPersistentAcceptOnceFileListFilter 來防止遠端檔案重複。預設情況下,此過濾器也與檔名模式(或正則表示式)一起應用。如果需要允許重複,可以使用 AcceptAllFileListFilter。您可以使用 CompositeFileListFilter(或 ChainFileListFilter)來處理任何其他用例。稍後展示的 Java 配置展示了一種在處理後刪除遠端檔案以避免重複的技術。

有關 SftpPersistentAcceptOnceFileListFilter 的更多資訊及其使用方法,請參見遠端持久化檔案列表過濾器

您可以使用 max-fetch-size 屬性來限制每次輪詢時需要抓取的檔案數量。在叢集環境中執行時,將其設定為 1 並使用持久化過濾器。有關更多資訊,請參見入站通道介面卡:控制遠端檔案抓取

介面卡會將遠端目錄和檔名分別放入頭部(FileHeaders.REMOTE_DIRECTORYFileHeaders.REMOTE_FILE)。從 5.0 版本開始,FileHeaders.REMOTE_FILE_INFO 頭部提供了額外的遠端檔案資訊(JSON 格式)。如果您將 SftpStreamingMessageSource 上的 fileInfoJson 屬性設定為 false,則頭部包含一個 SftpFileInfo 物件。您可以透過使用 SftpFileInfo.getFileInfo() 方法訪問底層 SftpClient 提供的 SftpClient.DirEntry 物件。在使用 XML 配置時,fileInfoJson 屬性不可用,但您可以透過將 SftpStreamingMessageSource 注入到您的配置類中來設定它。另請參見遠端檔案資訊

使用 Java 配置進行配置

以下 Spring Boot 應用程式展示瞭如何使用 Java 配置入站介面卡的示例。

@SpringBootApplication
public class SftpJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(SftpJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    @InboundChannelAdapter(channel = "stream")
    public MessageSource<InputStream> ftpMessageSource() {
        SftpStreamingMessageSource messageSource = new SftpStreamingMessageSource(template());
        messageSource.setRemoteDirectory("sftpSource/");
        messageSource.setFilter(new AcceptAllFileListFilter<>());
        messageSource.setMaxFetchSize(1);
        return messageSource;
    }

    @Bean
    @Transformer(inputChannel = "stream", outputChannel = "data")
    public org.springframework.integration.transformer.Transformer transformer() {
        return new StreamTransformer("UTF-8");
    }

    @Bean
    public SftpRemoteFileTemplate template() {
        return new SftpRemoteFileTemplate(sftpSessionFactory());
    }

    @ServiceActivator(inputChannel = "data", adviceChain = "after")
    @Bean
    public MessageHandler handle() {
        return System.out::println;
    }

    @Bean
    public ExpressionEvaluatingRequestHandlerAdvice after() {
        ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
        advice.setOnSuccessExpression(
                "@template.remove(headers['file_remoteDirectory'] + '/' +  headers['file_remoteFile'])");
        advice.setPropagateEvaluationFailures(true);
        return advice;
    }

}

注意,在此示例中,轉換器下游的訊息處理器有一個 `advice`,用於在處理後刪除遠端檔案。