FTP 入站通道介面卡
FTP入站通道介面卡是一個特殊的監聽器,它連線到FTP伺服器並監聽遠端目錄事件(例如,建立新檔案),此時它會啟動檔案傳輸。以下示例展示瞭如何配置inbound-channel-adapter
<int-ftp:inbound-channel-adapter id="ftpInbound"
channel="ftpChannel"
session-factory="ftpSessionFactory"
auto-create-local-directory="true"
delete-remote-files="true"
filename-pattern="*.txt"
remote-directory="some/remote/path"
remote-file-separator="/"
preserve-timestamp="true"
local-filename-generator-expression="#this.toUpperCase() + '.a'"
scanner="myDirScanner"
local-filter="myFilter"
temporary-file-suffix=".writing"
max-fetch-size="-1"
local-directory=".">
<int:poller fixed-rate="1000"/>
</int-ftp:inbound-channel-adapter>
如上述配置所示,您可以使用inbound-channel-adapter元素配置FTP入站通道介面卡,同時為各種屬性提供值,例如local-directory、filename-pattern(基於簡單的模式匹配,而不是正則表示式),以及對session-factory的引用。
預設情況下,傳輸的檔案與原始檔案具有相同的名稱。如果您想覆蓋此行為,可以設定local-filename-generator-expression屬性,它允許您提供一個SpEL表示式來生成本地檔案的名稱。與出站閘道器和介面卡不同,後者SpEL評估上下文的根物件是Message,此入站介面卡在評估時還沒有訊息,因為它最終會生成以傳輸檔案作為其負載的訊息。因此,SpEL評估上下文的根物件是遠端檔案的原始名稱(一個String)。
入站通道介面卡首先為本地目錄檢索File物件,然後根據輪詢器配置發出每個檔案。從版本5.0開始,當需要檢索新檔案時,現在可以限制從FTP伺服器獲取的檔案數量。當目標檔案非常大或在具有持久檔案列表過濾器的集群系統中執行時,這可能很有用,稍後將討論。為此,請使用max-fetch-size。負值(預設值)表示沒有限制,並且檢索所有匹配的檔案。有關更多資訊,請參閱入站通道介面卡:控制遠端檔案獲取。從版本5.0開始,您還可以透過設定scanner屬性向inbound-channel-adapter提供自定義的DirectoryScanner實現。
從Spring Integration 3.0開始,您可以指定preserve-timestamp屬性(其預設值為false)。當為true時,本地檔案的修改時間戳設定為從伺服器檢索到的值。否則,它設定為當前時間。
從版本4.2開始,您可以指定remote-directory-expression而不是remote-directory,這允許您在每次輪詢時動態確定目錄——例如,remote-directory-expression="@myBean.determineRemoteDir()"。
從版本4.3開始,您可以省略remote-directory和remote-directory-expression屬性。它們預設為null。在這種情況下,根據FTP協議,客戶端工作目錄用作預設遠端目錄。
有時,基於filename-pattern屬性指定的簡單模式進行檔案過濾可能不足。在這種情況下,您可以使用filename-regex屬性指定正則表示式(例如filename-regex=".*\.test$")。此外,如果您需要完全控制,可以使用filter屬性並提供對o.s.i.file.filters.FileListFilter的任何自定義實現的引用,這是一個用於過濾檔案列表的策略介面。此過濾器決定哪些遠端檔案將被檢索。您還可以透過使用CompositeFileListFilter將基於模式的過濾器與其他過濾器(例如AcceptOnceFileListFilter以避免同步已獲取的檔案)結合起來。
AcceptOnceFileListFilter將其狀態儲存在記憶體中。如果您希望狀態在系統重啟後仍然存在,請考慮改用FtpPersistentAcceptOnceFileListFilter。此過濾器將接受的檔名儲存在MetadataStore策略的例項中(參見元資料儲存)。此過濾器根據檔名和遠端修改時間進行匹配。
自版本4.0以來,此過濾器需要ConcurrentMetadataStore。當與共享資料儲存(例如使用RedisMetadataStore的Redis)一起使用時,它允許在多個應用程式或伺服器例項之間共享過濾鍵。
從版本5.0開始,預設情況下,FtpPersistentAcceptOnceFileListFilter與記憶體中的SimpleMetadataStore一起應用於FtpInboundFileSynchronizer。此過濾器也應用於XML配置中的regex或pattern選項,以及Java DSL中的FtpInboundChannelAdapterSpec。任何其他用例都可以透過CompositeFileListFilter(或ChainFileListFilter)進行管理。
前面的討論指的是在檢索檔案之前進行過濾。一旦檔案被檢索,還會對檔案系統上的檔案應用附加過濾器。預設情況下,這是一個AcceptOnceFileListFilter,如前所述,它將狀態保留在記憶體中,並且不考慮檔案的修改時間。除非您的應用程式在處理後刪除檔案,否則預設情況下,介面卡將在應用程式重啟後重新處理磁碟上的檔案。
此外,如果將filter配置為使用FtpPersistentAcceptOnceFileListFilter並且遠端檔案時間戳發生變化(導致其被重新獲取),則預設的本地過濾器不允許處理此新檔案。
有關此過濾器及其使用方式的更多資訊,請參閱遠端持久檔案列表過濾器。
您可以使用local-filter屬性配置本地檔案系統過濾器的行為。從版本4.3.8開始,預設配置了FileSystemPersistentAcceptOnceFileListFilter。此過濾器將接受的檔名和修改時間戳儲存在MetadataStore策略的例項中(參見元資料儲存),並檢測本地檔案修改時間的變化。預設的MetadataStore是SimpleMetadataStore,它將狀態儲存在記憶體中。
自版本4.1.5以來,這些過濾器有一個新屬性(flushOnUpdate),它使它們在每次更新時重新整理元資料儲存(如果儲存實現了Flushable)。
此外,如果您使用分散式MetadataStore(例如Redis),您可以擁有相同介面卡或應用程式的多個例項,並確保每個檔案只處理一次。 |
實際的本地過濾器是一個ChainFileListFilter,它包含一個模式過濾器,用於防止處理正在下載的檔案(基於temporary-file-suffix)和提供的過濾器。檔案將以下綴下載(預設為.writing),並在傳輸完成後將檔案重新命名為其最終名稱,使其對過濾器“可見”。
remote-file-separator屬性允許您配置檔案分隔符,以防預設的'/'不適用於您的特定環境。
有關這些屬性的更多詳細資訊,請參閱模式。
您還應該瞭解FTP入站通道介面卡是一個輪詢消費者。因此,您必須配置一個輪詢器(透過使用全域性預設值或本地子元素)。一旦檔案傳輸完成,將生成一個以java.io.File作為其負載的訊息,並將其傳送到由channel屬性標識的通道。
從版本6.2開始,您可以使用FtpLastModifiedFileListFilter根據上次修改策略過濾FTP檔案。此過濾器可以用age屬性進行配置,以便只有比此值舊的檔案才透過過濾器。age預設為60秒,但您應該選擇一個足夠大的age,以避免過早地拾取檔案(例如,由於網路故障)。有關更多資訊,請參閱其Javadocs。
相比之下,從版本6.5開始,引入了FtpRecentFileListFilter以僅接受不早於給定age的檔案。
更多關於檔案過濾和不完整檔案
有時,剛剛出現在受監控(遠端)目錄中的檔案不完整。通常,此類檔案會以臨時副檔名(例如somefile.txt.writing)寫入,並在寫入過程完成後重新命名。在大多數情況下,您只對完整檔案感興趣,並希望只過濾完整檔案。要處理這些場景,您可以使用filename-pattern、filename-regex和filter屬性提供的過濾支援。以下示例使用自定義過濾器實現
<int-ftp:inbound-channel-adapter
channel="ftpChannel"
session-factory="ftpSessionFactory"
filter="customFilter"
local-directory="file:/my_transfers">
remote-directory="some/remote/path"
<int:poller fixed-rate="1000"/>
</int-ftp:inbound-channel-adapter>
<bean id="customFilter" class="org.example.CustomFilter"/>
入站FTP介面卡的輪詢器配置說明
入站FTP介面卡的工作包括兩個任務
-
與遠端伺服器通訊,以便將檔案從遠端目錄傳輸到本地目錄。
-
對於每個傳輸的檔案,生成一個以該檔案作為負載的訊息,並將其傳送到由“channel”屬性標識的通道。這就是為什麼它們被稱為“通道介面卡”而不是僅僅是“介面卡”。這種介面卡的主要工作是生成訊息以傳送到訊息通道。本質上,第二個任務優先,這樣,如果您的本地目錄已經有一個或多個檔案,它會首先從這些檔案生成訊息。只有在所有本地檔案都已處理後,它才會啟動遠端通訊以檢索更多檔案。
此外,在配置輪詢器上的觸發器時,您應該密切關注max-messages-per-poll屬性。對於所有SourcePollingChannelAdapter例項(包括FTP),其預設值為1。這意味著,一旦處理了一個檔案,它就會等待由您的觸發器配置確定的下一個執行時間。如果您碰巧在local-directory中有一個或多個檔案,它將在啟動與遠端FTP伺服器的通訊之前處理這些檔案。此外,如果max-messages-per-poll設定為1(預設值),它將一次只處理一個檔案,間隔由您的觸發器定義,本質上是“一次輪詢 === 一個檔案”。
對於典型的檔案傳輸用例,您很可能希望獲得相反的行為:在每次輪詢時處理所有檔案,然後才等待下一次輪詢。如果是這種情況,請將max-messages-per-poll設定為-1。然後,在每次輪詢時,介面卡會嘗試生成儘可能多的訊息。換句話說,它會處理本地目錄中的所有內容,然後連線到遠端目錄,將那裡所有可供本地處理的內容傳輸過來。只有此時,輪詢操作才被視為完成,輪詢器才會等待下一次執行時間。
您可以選擇將“max-messages-per-poll”值設定為一個正值,該值表示每次輪詢從檔案建立訊息的上限。例如,值為10表示在每次輪詢時,它會嘗試處理不超過十個檔案。
從故障中恢復
瞭解介面卡的架構很重要。有一個檔案同步器用於獲取檔案,還有一個FileReadingMessageSource用於為每個同步檔案發出訊息。如前所述,涉及兩個過濾器。filter屬性(和模式)指的是遠端(FTP)檔案列表,以避免獲取已獲取的檔案。local-filter由FileReadingMessageSource使用,以確定哪些檔案將作為訊息傳送。
同步器列出遠端檔案並諮詢其過濾器。然後傳輸檔案。如果在檔案傳輸過程中發生IO錯誤,任何已新增到過濾器的檔案都將被刪除,以便它們可以在下一次輪詢時重新獲取。這僅適用於過濾器實現ReversibleFileListFilter(例如AcceptOnceFileListFilter)的情況。
如果在同步檔案後,下游流處理檔案時發生錯誤,則不會自動回滾過濾器,因此預設情況下不會重新處理失敗的檔案。
如果您希望在故障後重新處理此類檔案,可以使用類似於以下配置來方便地從過濾器中刪除失敗的檔案
<int-ftp:inbound-channel-adapter id="ftpAdapter"
session-factory="ftpSessionFactory"
channel="requestChannel"
remote-directory-expression="'/ftpSource'"
local-directory="file:myLocalDir"
auto-create-local-directory="true"
filename-pattern="*.txt">
<int:poller fixed-rate="1000">
<int:transactional synchronization-factory="syncFactory" />
</int:poller>
</int-ftp:inbound-channel-adapter>
<bean id="acceptOnceFilter"
class="org.springframework.integration.file.filters.AcceptOnceFileListFilter" />
<int:transaction-synchronization-factory id="syncFactory">
<int:after-rollback expression="payload.delete()" />
</int:transaction-synchronization-factory>
<bean id="transactionManager"
class="org.springframework.integration.transaction.PseudoTransactionManager" />
上述配置適用於任何ResettableFileListFilter。
從版本5.0開始,入站通道介面卡可以在本地構建與生成的本地檔名相對應的子目錄。這也可以是遠端子路徑。為了能夠根據層次結構支援遞迴地讀取本地目錄進行修改,您現在可以為內部FileReadingMessageSource提供一個基於Files.walk()演算法的新RecursiveDirectoryScanner。有關更多資訊,請參見AbstractInboundFileSynchronizingMessageSource.setScanner()。此外,您現在可以透過使用setUseWatchService()選項將AbstractInboundFileSynchronizingMessageSource切換到基於WatchService的DirectoryScanner。它還配置了所有WatchEventType例項,以響應本地目錄中的任何修改。前面顯示的處理示例基於FileReadingMessageSource.WatchServiceDirectoryScanner的內建功能,以便在檔案從本地目錄刪除(StandardWatchEventKinds.ENTRY_DELETE)時執行ResettableFileListFilter.remove()。有關更多資訊,請參見WatchServiceDirectoryScanner。
使用 Java 配置
以下 Spring Boot 應用程式展示瞭如何使用 Java 配置入站介面卡的示例
@SpringBootApplication
public class FtpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FtpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public SessionFactory<FTPFile> ftpSessionFactory() {
DefaultFtpSessionFactory sf = new DefaultFtpSessionFactory();
sf.setHost("localhost");
sf.setPort(port);
sf.setUsername("foo");
sf.setPassword("foo");
sf.setTestSession(true);
return new CachingSessionFactory<FTPFile>(sf);
}
@Bean
public FtpInboundFileSynchronizer ftpInboundFileSynchronizer() {
FtpInboundFileSynchronizer fileSynchronizer = new FtpInboundFileSynchronizer(ftpSessionFactory());
fileSynchronizer.setDeleteRemoteFiles(false);
fileSynchronizer.setRemoteDirectory("foo");
fileSynchronizer.setFilter(new FtpSimplePatternFileListFilter("*.xml"));
return fileSynchronizer;
}
@Bean
@InboundChannelAdapter(channel = "ftpChannel", poller = @Poller(fixedDelay = "5000"))
public MessageSource<File> ftpMessageSource() {
FtpInboundFileSynchronizingMessageSource source =
new FtpInboundFileSynchronizingMessageSource(ftpInboundFileSynchronizer());
source.setLocalDirectory(new File("ftp-inbound"));
source.setAutoCreateLocalDirectory(true);
source.setLocalFilter(new AcceptOnceFileListFilter<File>());
source.setMaxFetchSize(1);
return source;
}
@Bean
@ServiceActivator(inputChannel = "ftpChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
System.out.println(message.getPayload());
}
};
}
}
使用 Java DSL 進行配置
以下 Spring Boot 應用程式展示瞭如何使用 Java DSL 配置入站介面卡的示例:
@SpringBootApplication
public class FtpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FtpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public IntegrationFlow ftpInboundFlow() {
return IntegrationFlow
.from(Ftp.inboundAdapter(this.ftpSessionFactory)
.preserveTimestamp(true)
.remoteDirectory("foo")
.regexFilter(".*\\.txt$")
.localFilename(f -> f.toUpperCase() + ".a")
.localDirectory(new File("d:\\ftp_files")),
e -> e.id("ftpInboundAdapter")
.autoStartup(true)
.poller(Pollers.fixedDelay(5000)))
.handle(m -> System.out.println(m.getPayload()))
.get();
}
}