FTP入站通道介面卡

FTP入站通道介面卡是一種特殊的監聽器,它連線到FTP伺服器並監聽遠端目錄事件(例如,建立了新檔案),並在此時啟動檔案傳輸。以下示例展示瞭如何配置一個inbound-channel-adapter

<int-ftp:inbound-channel-adapter id="ftpInbound"
    channel="ftpChannel"
    session-factory="ftpSessionFactory"
    auto-create-local-directory="true"
    delete-remote-files="true"
    filename-pattern="*.txt"
    remote-directory="some/remote/path"
    remote-file-separator="/"
    preserve-timestamp="true"
    local-filename-generator-expression="#this.toUpperCase() + '.a'"
    scanner="myDirScanner"
    local-filter="myFilter"
    temporary-file-suffix=".writing"
    max-fetch-size="-1"
    local-directory=".">
    <int:poller fixed-rate="1000"/>
</int-ftp:inbound-channel-adapter>

如上所示配置,您可以使用inbound-channel-adapter元素配置FTP入站通道介面卡,同時提供各種屬性的值,例如local-directoryfilename-pattern(基於簡單的模式匹配,而非正則表示式),以及對session-factory的引用。

預設情況下,傳輸的檔案與原始檔案同名。如果您想覆蓋此行為,可以設定local-filename-generator-expression屬性,該屬性允許您提供一個SpEL表示式來生成本地檔案的名稱。與出站閘道器和介面卡不同,後者SpEL評估上下文的根物件是一個Message,此入站介面卡在評估時還沒有訊息,因為最終它會使用傳輸的檔案作為payload生成訊息。因此,SpEL評估上下文的根物件是遠端檔案的原始名稱(一個String)。

入站通道介面卡首先檢索本地目錄的File物件,然後根據輪詢器配置發射每個檔案。從版本5.0開始,您現在可以限制在需要檢索新檔案時從FTP伺服器獲取的檔案數量。當目標檔案非常大或在帶有持久化檔案列表過濾器(稍後討論)的集群系統中執行時,這可能非常有益。為此目的使用max-fetch-size。負值(預設值)表示無限制,將檢索所有匹配的檔案。更多資訊請參閱入站通道介面卡:控制遠端檔案獲取。自版本5.0以來,您還可以透過設定scanner屬性為inbound-channel-adapter提供自定義的DirectoryScanner實現。

從Spring Integration 3.0開始,您可以指定preserve-timestamp屬性(預設為false)。當設定為true時,本地檔案的修改時間戳會被設定為從伺服器獲取的值。否則,它會被設定為當前時間。

從版本4.2開始,您可以指定remote-directory-expression代替remote-directory,允許您在每次輪詢時動態確定目錄 — 例如,remote-directory-expression="@myBean.determineRemoteDir()"

從版本4.3開始,您可以省略remote-directoryremote-directory-expression屬性。它們預設為null。在這種情況下,根據FTP協議,客戶端工作目錄將用作預設的遠端目錄。

有時,基於filename-pattern屬性指定的簡單模式的檔案過濾可能不夠。在這種情況下,您可以使用filename-regex屬性來指定正則表示式(例如filename-regex=".*\.test$")。此外,如果您需要完全控制,可以使用filter屬性並提供對o.s.i.file.filters.FileListFilter的任何自定義實現的引用,這是一個用於過濾檔案列表的策略介面。此過濾器確定要檢索哪些遠端檔案。您還可以透過使用CompositeFileListFilter將基於模式的過濾器與其他過濾器(例如AcceptOnceFileListFilter,以避免同步以前已獲取的檔案)組合使用。

AcceptOnceFileListFilter將其狀態儲存在記憶體中。如果您希望狀態在系統重啟後仍然存在,請考慮使用FtpPersistentAcceptOnceFileListFilter。此過濾器將接受的檔名儲存在MetadataStore策略的例項中(參見元資料儲存)。此過濾器根據檔名和遠端修改時間進行匹配。

自版本4.0起,此過濾器需要ConcurrentMetadataStore。當與共享資料儲存(例如帶有RedisMetadataStore的Redis)一起使用時,它允許在多個應用程式或伺服器例項之間共享過濾器鍵。

從版本5.0開始,帶有記憶體中SimpleMetadataStoreFtpPersistentAcceptOnceFileListFilter預設應用於FtpInboundFileSynchronizer。在XML配置中,此過濾器也適用於regexpattern選項,以及Java DSL中的FtpInboundChannelAdapterSpec。任何其他用例可以透過CompositeFileListFilter(或ChainFileListFilter)進行管理。

前面的討論是指在檢索檔案之前進行過濾。一旦檔案被檢索到,系統會向檔案系統上的檔案應用一個額外的過濾器。預設情況下,這是一個AcceptOnceFileListFilter,如前所述,它在記憶體中保留狀態,並且不考慮檔案的修改時間。除非您的應用程式在處理後刪除檔案,否則介面卡在應用程式重啟後會預設重新處理磁碟上的檔案。

此外,如果您將過濾器配置為使用FtpPersistentAcceptOnceFileListFilter,並且遠端檔案的時間戳發生變化(導致它被重新獲取),預設的本地過濾器將不允許處理這個新檔案。

有關此過濾器及其使用方法的更多資訊,請參閱遠端持久化檔案列表過濾器

您可以使用local-filter屬性配置本地檔案系統過濾器的行為。從版本4.3.8開始,預設配置了FileSystemPersistentAcceptOnceFileListFilter。此過濾器將接受的檔名和修改時間戳儲存在MetadataStore策略的例項中(參見元資料儲存),並檢測本地檔案修改時間的變化。預設的MetadataStoreSimpleMetadataStore,它將狀態儲存在記憶體中。

自版本4.1.5以來,這些過濾器有一個新屬性(flushOnUpdate),該屬性使得它們在每次更新時重新整理元資料儲存(如果儲存實現了Flushable)。

此外,如果您使用分散式MetadataStore(例如Redis),您可以擁有同一介面卡或應用程式的多個例項,並確保每個檔案只處理一次。

實際的本地過濾器是一個CompositeFileListFilter,它包含提供的過濾器和一個模式過濾器,後者用於防止處理正在下載的檔案(基於temporary-file-suffix)。檔案會以這個字尾(預設為.writing)下載,並在傳輸完成後重新命名為最終名稱,使其對過濾器“可見”。

remote-file-separator屬性允許您配置檔案分隔符,以便在預設的'/'不適用於您的特定環境時使用。

有關這些屬性的更多詳細資訊,請參閱schema

您還應該瞭解,FTP入站通道介面卡是一個輪詢消費者。因此,您必須配置一個輪詢器(透過使用全域性預設配置或本地子元素)。檔案傳輸完成後,將生成一個以java.io.File作為payload的訊息,併發送到由channel屬性標識的通道。

從版本6.2開始,您可以使用FtpLastModifiedFileListFilter基於最後修改策略過濾FTP檔案。可以為該過濾器配置一個age屬性,以便只有比該值舊的檔案才能透過過濾器。預設的age為60秒,但您應該選擇一個足夠大的值,以避免過早地獲取檔案(例如,由於網路故障)。請查閱其Javadoc瞭解更多資訊。

更多關於檔案過濾和不完整檔案

有時剛出現在受監控的(遠端)目錄中的檔案是不完整的。通常,這樣的檔案會以臨時副檔名(例如somefile.txt.writing)寫入,然後在寫入過程完成後重新命名。在大多數情況下,您只對完整的檔案感興趣,並希望只過濾完整的檔案。為了處理這些場景,您可以使用filename-patternfilename-regexfilter屬性提供的過濾支援。以下示例使用了自定義過濾器實現

<int-ftp:inbound-channel-adapter
    channel="ftpChannel"
    session-factory="ftpSessionFactory"
    filter="customFilter"
    local-directory="file:/my_transfers">
    remote-directory="some/remote/path"
    <int:poller fixed-rate="1000"/>
</int-ftp:inbound-channel-adapter>

<bean id="customFilter" class="org.example.CustomFilter"/>

入站FTP介面卡的輪詢器配置說明

入站FTP介面卡的任務包括兩個

  1. 與遠端伺服器通訊,以便將檔案從遠端目錄傳輸到本地目錄。

  2. 對於每個傳輸的檔案,生成一個以該檔案為payload的訊息,並將其傳送到由“channel”屬性標識的通道。這就是為什麼它們被稱為“通道介面卡”,而不是僅僅是“介面卡”。這種介面卡的主要工作是生成要傳送到訊息通道的訊息。本質上,第二個任務優先執行,如果您的本地目錄中已經有一個或多個檔案,它會首先從這些檔案生成訊息。只有在所有本地檔案都被處理完畢後,它才會啟動遠端通訊來檢索更多檔案。

此外,在配置輪詢器上的觸發器時,您應該密切注意max-messages-per-poll屬性。對於所有SourcePollingChannelAdapter例項(包括FTP),其預設值為1。這意味著,只要處理完一個檔案,它就會等待由您的觸發器配置確定的下一次執行時間。如果您在local-directory中有意存放了一個或多個檔案,它會在啟動與遠端FTP伺服器的通訊之前處理這些檔案。此外,如果max-messages-per-poll設定為1(預設值),它會按照觸發器定義的間隔一次只處理一個檔案,本質上工作方式是“一次輪詢 === 一個檔案”。

對於典型的檔案傳輸用例,您最有可能想要相反的行為:在每次輪詢時處理儘可能多的檔案,然後才等待下一次輪詢。如果是這種情況,請將max-messages-per-poll設定為-1。然後,在每次輪詢時,介面卡會嘗試生成儘可能多的訊息。換句話說,它會處理本地目錄中的所有內容,然後連線到遠端目錄,將那裡所有可供本地處理的檔案都傳輸過來。只有這樣,輪詢操作才算完成,並且輪詢器才會等待下一次執行時間。

或者,您可以將max-messages-per-poll的值設定為一個正數,表示每次輪詢從檔案建立的訊息數量的上限。例如,值為10表示在每次輪詢時,它嘗試處理的檔案數量不超過十個。

從故障中恢復

理解介面卡的架構非常重要。有一個檔案同步器負責獲取檔案,還有一個FileReadingMessageSource負責為每個同步的檔案傳送訊息。如前所述,涉及到兩個過濾器。filter屬性(和模式)指代遠端(FTP)檔案列表,以避免獲取已經獲取過的檔案。local-filterFileReadingMessageSource使用,以確定哪些檔案將被作為訊息傳送。

同步器列出遠端檔案並諮詢其過濾器。然後檔案被傳輸。如果在檔案傳輸過程中發生IO錯誤,任何已經新增到過濾器的檔案都會被移除,以便在下一次輪詢時可以重新獲取。這隻適用於過濾器實現了ReversibleFileListFilter(例如AcceptOnceFileListFilter)的情況。

如果在同步檔案後,下游流程處理檔案時發生錯誤,過濾器不會自動回滾,因此預設情況下失敗的檔案不會被重新處理。

如果您希望在發生故障後重新處理這些檔案,可以使用類似於以下的配置來方便地從過濾器中移除失敗的檔案

<int-ftp:inbound-channel-adapter id="ftpAdapter"
        session-factory="ftpSessionFactory"
        channel="requestChannel"
        remote-directory-expression="'/ftpSource'"
        local-directory="file:myLocalDir"
        auto-create-local-directory="true"
        filename-pattern="*.txt">
    <int:poller fixed-rate="1000">
        <int:transactional synchronization-factory="syncFactory" />
    </int:poller>
</int-ftp:inbound-channel-adapter>

<bean id="acceptOnceFilter"
    class="org.springframework.integration.file.filters.AcceptOnceFileListFilter" />

<int:transaction-synchronization-factory id="syncFactory">
    <int:after-rollback expression="payload.delete()" />
</int:transaction-synchronization-factory>

<bean id="transactionManager"
    class="org.springframework.integration.transaction.PseudoTransactionManager" />

上述配置適用於任何ResettableFileListFilter

從版本5.0開始,入站通道介面卡可以在本地構建與生成的本地檔名對應的子目錄。這也可以是遠端子路徑。為了能夠根據層次結構支援遞迴讀取本地目錄的修改,您現在可以為內部的FileReadingMessageSource提供一個新的基於Files.walk()演算法的RecursiveDirectoryScanner。更多資訊請參閱AbstractInboundFileSynchronizingMessageSource.setScanner()。此外,您現在可以使用setUseWatchService()選項將AbstractInboundFileSynchronizingMessageSource切換到基於WatchServiceDirectoryScanner。它也配置了所有WatchEventType例項,以對本地目錄中的任何修改作出反應。前面展示的重新處理示例基於FileReadingMessageSource.WatchServiceDirectoryScanner的內建功能,即當檔案從本地目錄中刪除(StandardWatchEventKinds.ENTRY_DELETE)時執行ResettableFileListFilter.remove()。更多資訊請參閱WatchServiceDirectoryScanner

使用Java配置

以下Spring Boot應用程式展示瞭如何使用Java配置配置入站介面卡

@SpringBootApplication
public class FtpJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(FtpJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public SessionFactory<FTPFile> ftpSessionFactory() {
        DefaultFtpSessionFactory sf = new DefaultFtpSessionFactory();
        sf.setHost("localhost");
        sf.setPort(port);
        sf.setUsername("foo");
        sf.setPassword("foo");
        sf.setTestSession(true);
        return new CachingSessionFactory<FTPFile>(sf);
    }

    @Bean
    public FtpInboundFileSynchronizer ftpInboundFileSynchronizer() {
        FtpInboundFileSynchronizer fileSynchronizer = new FtpInboundFileSynchronizer(ftpSessionFactory());
        fileSynchronizer.setDeleteRemoteFiles(false);
        fileSynchronizer.setRemoteDirectory("foo");
        fileSynchronizer.setFilter(new FtpSimplePatternFileListFilter("*.xml"));
        return fileSynchronizer;
    }

    @Bean
    @InboundChannelAdapter(channel = "ftpChannel", poller = @Poller(fixedDelay = "5000"))
    public MessageSource<File> ftpMessageSource() {
        FtpInboundFileSynchronizingMessageSource source =
                new FtpInboundFileSynchronizingMessageSource(ftpInboundFileSynchronizer());
        source.setLocalDirectory(new File("ftp-inbound"));
        source.setAutoCreateLocalDirectory(true);
        source.setLocalFilter(new AcceptOnceFileListFilter<File>());
        source.setMaxFetchSize(1);
        return source;
    }

    @Bean
    @ServiceActivator(inputChannel = "ftpChannel")
    public MessageHandler handler() {
        return new MessageHandler() {

            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                System.out.println(message.getPayload());
            }

        };
    }

}

使用Java DSL配置

以下Spring Boot應用程式展示瞭如何使用Java DSL配置入站介面卡

@SpringBootApplication
public class FtpJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(FtpJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public IntegrationFlow ftpInboundFlow() {
        return IntegrationFlow
            .from(Ftp.inboundAdapter(this.ftpSessionFactory)
                    .preserveTimestamp(true)
                    .remoteDirectory("foo")
                    .regexFilter(".*\\.txt$")
                    .localFilename(f -> f.toUpperCase() + ".a")
                    .localDirectory(new File("d:\\ftp_files")),
                e -> e.id("ftpInboundAdapter")
                    .autoStartup(true)
                    .poller(Pollers.fixedDelay(5000)))
            .handle(m -> System.out.println(m.getPayload()))
            .get();
    }
}

處理不完整資料

請參閱處理不完整資料

提供了FtpSystemMarkerFilePresentFileListFilter用於過濾遠端系統中沒有對應標記檔案的遠端檔案。有關配置資訊,請參閱Javadoc(並瀏覽父類)。