SFTP 入站通道介面卡

SFTP 入站通道介面卡是一種特殊的監聽器,它連線到伺服器並監聽遠端目錄事件(例如建立新檔案),此時它會啟動檔案傳輸。以下示例展示瞭如何配置 SFTP 入站通道介面卡。

<int-sftp:inbound-channel-adapter id="sftpAdapterAutoCreate"
              session-factory="sftpSessionFactory"
            channel="requestChannel"
            filename-pattern="*.txt"
            remote-directory="/foo/bar"
            preserve-timestamp="true"
            local-directory="file:target/foo"
            auto-create-local-directory="true"
            local-filename-generator-expression="#this.toUpperCase() + '.a'"
            scanner="myDirScanner"
            local-filter="myFilter"
            temporary-file-suffix=".writing"
            max-fetch-size="-1"
            delete-remote-files="false">
        <int:poller fixed-rate="1000"/>
</int-sftp:inbound-channel-adapter>

上述配置示例展示瞭如何為各種屬性提供值,包括以下內容:

  • local-directory:檔案將被傳輸到的位置。

  • remote-directory:檔案將被傳輸的遠端源目錄。

  • session-factory:對我們之前配置的 bean 的引用。

預設情況下,傳輸的檔案與原始檔案具有相同的名稱。如果您想覆蓋此行為,可以設定 local-filename-generator-expression 屬性,該屬性允許您提供一個 SpEL 表示式來生成本地檔案的名稱。與出站閘道器和介面卡不同,在出站閘道器和介面卡中,SpEL 評估上下文的根物件是 Message,此入站介面卡在評估時還沒有訊息,因為它最終會生成以傳輸檔案作為其 payload 的訊息。因此,SpEL 評估上下文的根物件是遠端檔案的原始名稱(一個 String)。

入站通道介面卡首先將檔案檢索到本地目錄,然後根據輪詢器配置發出每個檔案。從版本 5.0 開始,當需要新的檔案檢索時,您可以限制從 SFTP 伺服器獲取的檔案數量。當目標檔案較大或在具有持久檔案列表過濾器的集群系統中執行時,這可能是有益的,本節稍後將討論。為此,請使用 max-fetch-size。負值(預設值)表示沒有限制,並且檢索所有匹配的檔案。有關更多資訊,請參閱入站通道介面卡:控制遠端檔案獲取。從版本 5.0 開始,您還可以透過設定 scanner 屬性為 inbound-channel-adapter 提供自定義的 DirectoryScanner 實現。

從 Spring Integration 3.0 開始,您可以指定 preserve-timestamp 屬性(預設值為 false)。當為 true 時,本地檔案的修改時間戳被設定為從伺服器檢索到的值。否則,它被設定為當前時間。

從版本 4.2 開始,您可以指定 remote-directory-expression 而不是 remote-directory,這允許您在每次輪詢時動態確定目錄 — 例如,remote-directory-expression="@myBean.determineRemoteDir()"

有時,基於 filename-pattern 屬性指定的簡單模式進行檔案過濾可能不足。在這種情況下,您可以使用 filename-regex 屬性來指定正則表示式,例如 filename-regex=".*\.test$"。如果您需要完全控制,可以使用 filter 屬性來提供對 org.springframework.integration.file.filters.FileListFilter 自定義實現的引用,這是一個用於過濾檔案列表的策略介面。此過濾器決定哪些遠端檔案被檢索。您還可以透過使用 CompositeFileListFilter 將基於模式的過濾器與其他過濾器(例如 AcceptOnceFileListFilter,以避免同步以前已獲取的檔案)結合起來。

AcceptOnceFileListFilter 將其狀態儲存在記憶體中。如果您希望該狀態在系統重新啟動後仍然存在,請考慮使用 SftpPersistentAcceptOnceFileListFilter。此過濾器將接受的檔名儲存在 MetadataStore 策略的例項中(請參閱元資料儲存)。此過濾器根據檔名和遠端修改時間進行匹配。

自版本 4.0 以來,此過濾器需要一個 ConcurrentMetadataStore。當與共享資料儲存(例如使用 RedisMetadataStoreRedis)一起使用時,這允許在多個應用程式或伺服器例項之間共享過濾器鍵。

從版本 5.0 開始,預設情況下,對於 SftpInboundFileSynchronizer,應用帶有記憶體中 SimpleMetadataStoreSftpPersistentAcceptOnceFileListFilter。此過濾器也與 XML 配置中的 regexpattern 選項以及 Java DSL 中的 SftpInboundChannelAdapterSpec 一起應用。您可以透過使用 CompositeFileListFilter(或 ChainFileListFilter)來處理任何其他用例。

以上討論指的是在檢索檔案之前進行過濾。檔案檢索後,會有一個額外的過濾器應用於檔案系統上的檔案。預設情況下,這是一個 AcceptOnceFileListFilter,如本節所述,它將狀態保留在記憶體中,並且不考慮檔案的修改時間。除非您的應用程式在處理後刪除檔案,否則介面卡預設會在應用程式重新啟動後重新處理磁碟上的檔案。

此外,如果您將 filter 配置為使用 SftpPersistentAcceptOnceFileListFilter 並且遠端檔案時間戳發生更改(導致其被重新獲取),則預設的本地過濾器不允許處理此新檔案。

有關此過濾器及其使用方式的更多資訊,請參閱遠端持久檔案列表過濾器

您可以使用 local-filter 屬性來配置本地檔案系統過濾器的行為。從版本 4.3.8 開始,預設配置了一個 FileSystemPersistentAcceptOnceFileListFilter。此過濾器將接受的檔名和修改時間戳儲存在 MetadataStore 策略的例項中(請參閱元資料儲存),並檢測本地檔案修改時間的變化。預設的 MetadataStore 是一個將狀態儲存在記憶體中的 SimpleMetadataStore

自版本 4.1.5 以來,這些過濾器有一個名為 flushOnUpdate 的新屬性,它導致它們在每次更新時重新整理元資料儲存(如果儲存實現了 Flushable)。

此外,如果您使用分散式 MetadataStore(例如Redis 元資料儲存),您可以擁有同一介面卡或應用程式的多個例項,並確保只有一個例項處理一個檔案。

實際的本地過濾器是一個 ChainFileListFilter,它包含一個模式過濾器,用於防止處理正在下載的檔案(基於 temporary-file-suffix)和提供的過濾器。檔案以此後綴(預設為 .writing)下載,並在傳輸完成後將檔案重新命名為最終名稱,使其對過濾器“可見”。

有關這些屬性的更多詳細資訊,請參閱模式

SFTP 入站通道介面卡是一個輪詢消費者。因此,您必須配置一個輪詢器(可以是全域性預設的,也可以是本地元素)。一旦檔案被傳輸到本地目錄,就會生成一個以 java.io.File 為其 payload 型別併發送到由 channel 屬性標識的通道的訊息。

從版本 6.2 開始,您可以使用 SftpLastModifiedFileListFilter 根據最後修改策略過濾 SFTP 檔案。此過濾器可以配置一個 age 屬性,以便只有早於此值的檔案才能透過過濾器。年齡預設為 60 秒,但您應該選擇一個足夠大的年齡,以避免過早地獲取檔案(例如,由於網路故障)。有關更多資訊,請參閱其 Javadoc。

相反,從版本 6.5 開始,引入了 SftpRecentFileListFilter 以僅接受不早於提供 age 的檔案。

更多關於檔案過濾和大型檔案

有時,剛出現在受監視(遠端)目錄中的檔案不完整。通常,此類檔案以某種臨時副檔名(例如名為 something.txt.writing 的檔案上的 .writing)寫入,然後在寫入過程完成後重新命名。在大多數情況下,開發人員只對完整的檔案感興趣,並希望只過濾這些檔案。為了處理這些情況,您可以使用 filename-patternfilename-regexfilter 屬性提供的過濾支援。如果您需要自定義過濾器實現,您可以透過設定 filter 屬性在介面卡中包含一個引用。以下示例展示瞭如何實現:

<int-sftp:inbound-channel-adapter id="sftpInbondAdapter"
            channel="receiveChannel"
            session-factory="sftpSessionFactory"
            filter="customFilter"
            local-directory="file:/local-test-dir"
            remote-directory="/remote-test-dir">
        <int:poller fixed-rate="1000" max-messages-per-poll="10" task-executor="executor"/>
</int-sftp:inbound-channel-adapter>

<bean id="customFilter" class="org.foo.CustomFilter"/>

從故障中恢復

您應該瞭解介面卡的架構。檔案同步器獲取檔案,FileReadingMessageSource 為每個同步的檔案發出訊息。如前面討論的,涉及兩個過濾器。filter 屬性(和模式)指遠端(SFTP)檔案列表,以避免獲取已經獲取的檔案。FileReadingMessageSource 使用 local-filter 來確定哪些檔案將作為訊息傳送。

同步器列出遠端檔案並諮詢其過濾器。然後傳輸檔案。如果在檔案傳輸過程中發生 IO 錯誤,則已新增到過濾器的任何檔案都將被刪除,以便它們可以在下一次輪詢時重新獲取。這僅適用於過濾器實現了 ReversibleFileListFilter(例如 AcceptOnceFileListFilter)的情況。

如果同步檔案後,在下游流程處理檔案時發生錯誤,則不會自動回滾過濾器,因此預設情況下不會重新處理失敗的檔案。

如果您希望在故障後重新處理此類檔案,您可以使用類似於以下內容的配置來方便地從過濾器中刪除失敗的檔案:

<int-sftp:inbound-channel-adapter id="sftpAdapter"
        session-factory="sftpSessionFactory"
        channel="requestChannel"
        remote-directory-expression="'/sftpSource'"
        local-directory="file:myLocalDir"
        auto-create-local-directory="true"
        filename-pattern="*.txt">
    <int:poller fixed-rate="1000">
        <int:transactional synchronization-factory="syncFactory" />
    </int:poller>
</int-sftp:inbound-channel-adapter>

<bean id="acceptOnceFilter"
    class="org.springframework.integration.file.filters.AcceptOnceFileListFilter" />

<int:transaction-synchronization-factory id="syncFactory">
    <int:after-rollback expression="payload.delete()" />
</int:transaction-synchronization-factory>

<bean id="transactionManager"
    class="org.springframework.integration.transaction.PseudoTransactionManager" />

上述配置適用於任何 ResettableFileListFilter

從版本 5.0 開始,入站通道介面卡可以根據生成的本地檔名在本地構建子目錄。這也可以是一個遠端子路徑。為了能夠根據層次結構支援遞迴讀取本地目錄以進行修改,您現在可以使用基於 Files.walk() 演算法的新 RecursiveDirectoryScanner 向內部 FileReadingMessageSource 提供服務。有關更多資訊,請參閱AbstractInboundFileSynchronizingMessageSource.setScanner()。此外,您現在可以透過使用 setUseWatchService() 選項將 AbstractInboundFileSynchronizingMessageSource 切換到基於 WatchServiceDirectoryScanner。它還配置了所有 WatchEventType 例項,以對本地目錄中的任何修改作出反應。前面顯示的回處理示例基於 FileReadingMessageSource.WatchServiceDirectoryScanner 的內建功能,該功能在檔案從本地目錄刪除 (StandardWatchEventKinds.ENTRY_DELETE) 時使用 ResettableFileListFilter.remove()。有關更多資訊,請參閱WatchServiceDirectoryScanner

使用 Java 配置

以下 Spring Boot 應用程式展示瞭如何使用 Java 配置入站介面卡的示例

@SpringBootApplication
public class SftpJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(SftpJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public SessionFactory<SftpClient.DirEntry> sftpSessionFactory() {
        DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
        factory.setHost("localhost");
        factory.setPort(port);
        factory.setUser("foo");
        factory.setPassword("foo");
        factory.setAllowUnknownKeys(true);
        factory.setTestSession(true);
        return new CachingSessionFactory<>(factory);
    }

    @Bean
    public SftpInboundFileSynchronizer sftpInboundFileSynchronizer() {
        SftpInboundFileSynchronizer fileSynchronizer = new SftpInboundFileSynchronizer(sftpSessionFactory());
        fileSynchronizer.setDeleteRemoteFiles(false);
        fileSynchronizer.setRemoteDirectory("foo");
        fileSynchronizer.setFilter(new SftpSimplePatternFileListFilter("*.xml"));
        return fileSynchronizer;
    }

    @Bean
    @InboundChannelAdapter(channel = "sftpChannel", poller = @Poller(fixedDelay = "5000"))
    public MessageSource<File> sftpMessageSource() {
        SftpInboundFileSynchronizingMessageSource source =
                new SftpInboundFileSynchronizingMessageSource(sftpInboundFileSynchronizer());
        source.setLocalDirectory(new File("sftp-inbound"));
        source.setAutoCreateLocalDirectory(true);
        source.setLocalFilter(new AcceptOnceFileListFilter<File>());
        source.setMaxFetchSize(1);
        return source;
    }

    @Bean
    @ServiceActivator(inputChannel = "sftpChannel")
    public MessageHandler handler() {
        return new MessageHandler() {

            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                System.out.println(message.getPayload());
            }

        };
    }

}

使用 Java DSL 進行配置

以下 Spring Boot 應用程式展示瞭如何使用 Java DSL 配置入站介面卡的示例:

@SpringBootApplication
public class SftpJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(SftpJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public IntegrationFlow sftpInboundFlow() {
        return IntegrationFlow
            .from(Sftp.inboundAdapter(this.sftpSessionFactory)
                    .preserveTimestamp(true)
                    .remoteDirectory("foo")
                    .regexFilter(".*\\.txt$")
                    .localFilenameExpression("#this.toUpperCase() + '.a'")
                    .localDirectory(new File("sftp-inbound")),
                 e -> e.id("sftpInboundAdapter")
                    .autoStartup(true)
                    .poller(Pollers.fixedDelay(5000)))
            .handle(m -> System.out.println(m.getPayload()))
            .get();
    }
}

處理不完整資料

請參閱處理不完整資料

提供了 SftpSystemMarkerFilePresentFileListFilter 來過濾遠端系統上沒有相應標記檔案的遠端檔案。有關配置資訊,請參閱Javadoc

© . This site is unofficial and not affiliated with VMware.