FTP 流式入站通道介面卡
4.3 版引入了流式入站通道介面卡。此介面卡生成一個載荷型別為 InputStream 的訊息,允許在不寫入本地檔案系統的情況下獲取檔案。由於會話保持開放,因此消費應用程式負責在檔案被消費後關閉會話。會話在 closeableResource 頭部 (IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE) 中提供。標準框架元件(例如 FileSplitter 和 StreamTransformer)會自動關閉會話。有關這些元件的更多資訊,請參閱 檔案分割器 和 流式轉換器。以下示例展示瞭如何配置 inbound-streaming-channel-adapter
<int-ftp:inbound-streaming-channel-adapter id="ftpInbound"
channel="ftpChannel"
session-factory="sessionFactory"
filename-pattern="*.txt"
filename-regex=".*\.txt"
filter="filter"
filter-expression="@myFilterBean.check(#root)"
remote-file-separator="/"
comparator="comparator"
max-fetch-size="1"
remote-directory-expression="'foo/bar'">
<int:poller fixed-rate="1000" />
</int-ftp:inbound-streaming-channel-adapter>
filename-pattern、filename-regex、filter 或 filter-expression 中只允許使用一個。
從 5.0 版開始,預設情況下,FtpStreamingMessageSource 介面卡會基於記憶體中的 SimpleMetadataStore 使用 FtpPersistentAcceptOnceFileListFilter 來防止遠端檔案重複。預設情況下,此過濾器也應用於檔名模式(或正則表示式)。如果需要允許重複,可以使用 AcceptAllFileListFilter。任何其他用例可以透過 CompositeFileListFilter(或 ChainFileListFilter)處理。Java 配置(本文件稍後)展示了一種在處理後刪除遠端檔案以避免重複的技術。 |
有關 FtpPersistentAcceptOnceFileListFilter 及其使用方式的更多資訊,請參閱 遠端持久檔案列表過濾器。
當需要獲取檔案時,使用 max-fetch-size 屬性來限制每次輪詢時獲取的檔案數量。在叢集環境中執行時,將其設定為 1 並使用持久化過濾器。有關更多資訊,請參閱 入站通道介面卡:控制遠端檔案獲取。
介面卡將遠端目錄和檔名分別放在 FileHeaders.REMOTE_DIRECTORY 和 FileHeaders.REMOTE_FILE 頭部中。從 5.0 版開始,FileHeaders.REMOTE_FILE_INFO 頭部提供額外的遠端檔案資訊(預設以 JSON 表示)。如果將 FtpStreamingMessageSource 上的 fileInfoJson 屬性設定為 false,則該頭部包含一個 FtpFileInfo 物件。可以透過使用 FtpFileInfo.getFileInfo() 方法訪問底層 Apache Net 庫提供的 FTPFile 物件。當您使用 XML 配置時,fileInfoJson 屬性不可用,但您可以透過將 FtpStreamingMessageSource 注入到您的一個配置類中來設定它。另請參閱 遠端檔案資訊。
從 5.1 版開始,comparator 的泛型型別為 FTPFile。以前是 AbstractFileInfo<FTPFile>。這是因為排序現在在處理的早期(在過濾和應用 maxFetch 之前)執行。
使用 Java 配置進行配置
以下 Spring Boot 應用程式展示瞭如何使用 Java 配置入站介面卡的示例
@SpringBootApplication
public class FtpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FtpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
@InboundChannelAdapter(channel = "stream")
public MessageSource<InputStream> ftpMessageSource() {
FtpStreamingMessageSource messageSource = new FtpStreamingMessageSource(template());
messageSource.setRemoteDirectory("ftpSource/");
messageSource.setFilter(new AcceptAllFileListFilter<>());
messageSource.setMaxFetchSize(1);
return messageSource;
}
@Bean
@Transformer(inputChannel = "stream", outputChannel = "data")
public org.springframework.integration.transformer.Transformer transformer() {
return new StreamTransformer("UTF-8");
}
@Bean
public FtpRemoteFileTemplate template() {
return new FtpRemoteFileTemplate(ftpSessionFactory());
}
@ServiceActivator(inputChannel = "data", adviceChain = "after")
@Bean
public MessageHandler handle() {
return System.out::println;
}
@Bean
public ExpressionEvaluatingRequestHandlerAdvice after() {
ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
advice.setOnSuccessExpression(
"@template.remove(headers['file_remoteDirectory'] + headers['file_remoteFile'])");
advice.setPropagateEvaluationFailures(true);
return advice;
}
}
請注意,在此示例中,轉換器下游的訊息處理器有一個 advice,它在處理後刪除遠端檔案。