SFTP 流式入站通道介面卡
版本 4.3 引入了流式入站通道介面卡。此介面卡生成一個載荷型別為 InputStream 的訊息,允許您在不寫入本地檔案系統的情況下獲取檔案。由於會話保持開啟狀態,因此消費應用程式負責在檔案被消費後關閉會話。會話在 closeableResource 頭部 (IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE) 中提供。標準框架元件(例如 FileSplitter 和 StreamTransformer)會自動關閉會話。有關這些元件的更多資訊,請參閱檔案分割器和流轉換器。以下示例展示瞭如何配置 SFTP 流式入站通道介面卡
<int-sftp:inbound-streaming-channel-adapter id="ftpInbound"
channel="ftpChannel"
session-factory="sessionFactory"
filename-pattern="*.txt"
filename-regex=".*\.txt"
filter="filter"
filter-expression="@myFilterBean.check(#root)"
remote-file-separator="/"
comparator="comparator"
max-fetch-size="1"
remote-directory-expression="'foo/bar'">
<int:poller fixed-rate="1000" />
</int-sftp:inbound-streaming-channel-adapter>
您只能使用 filename-pattern、filename-regex、filter 或 filter-expression 中的一個。
從版本 5.0 開始,預設情況下,SftpStreamingMessageSource 介面卡透過使用基於記憶體 SimpleMetadataStore 的 SftpPersistentAcceptOnceFileListFilter 來防止遠端檔案重複。預設情況下,此過濾器也會與檔名模式(或正則表示式)一起應用。如果需要允許重複,可以使用 AcceptAllFileListFilter。您可以透過使用 CompositeFileListFilter(或 ChainFileListFilter)來處理任何其他用例。後面展示的 Java 配置展示了一種在處理後刪除遠端檔案以避免重複的技術。 |
有關 SftpPersistentAcceptOnceFileListFilter 及其使用方式的更多資訊,請參閱遠端持久檔案列表過濾器。
您可以使用 max-fetch-size 屬性來限制每次輪詢時(如果需要獲取檔案)獲取的檔案數量。在叢集環境中執行時,將其設定為 1 並使用持久過濾器。有關更多資訊,請參閱入站通道介面卡:控制遠端檔案獲取。
介面卡將遠端目錄和檔名放在頭部(分別為 FileHeaders.REMOTE_DIRECTORY 和 FileHeaders.REMOTE_FILE)。從版本 5.0 開始,FileHeaders.REMOTE_FILE_INFO 頭部提供額外的遠端檔案資訊(採用 JSON 格式)。如果您將 SftpStreamingMessageSource 上的 fileInfoJson 屬性設定為 false,則頭部包含一個 SftpFileInfo 物件。您可以使用 SftpFileInfo.getFileInfo() 方法訪問底層 SftpClient 提供的 SftpClient.DirEntry 物件。當您使用 XML 配置時,fileInfoJson 屬性不可用,但您可以透過將 SftpStreamingMessageSource 注入到您的配置類之一中來設定它。另請參閱遠端檔案資訊。
使用 Java 配置
以下 Spring Boot 應用程式展示瞭如何使用 Java 配置入站介面卡的示例
@SpringBootApplication
public class SftpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(SftpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
@InboundChannelAdapter(channel = "stream")
public MessageSource<InputStream> ftpMessageSource() {
SftpStreamingMessageSource messageSource = new SftpStreamingMessageSource(template());
messageSource.setRemoteDirectory("sftpSource/");
messageSource.setFilter(new AcceptAllFileListFilter<>());
messageSource.setMaxFetchSize(1);
return messageSource;
}
@Bean
@Transformer(inputChannel = "stream", outputChannel = "data")
public org.springframework.integration.transformer.Transformer transformer() {
return new StreamTransformer("UTF-8");
}
@Bean
public SftpRemoteFileTemplate template() {
return new SftpRemoteFileTemplate(sftpSessionFactory());
}
@ServiceActivator(inputChannel = "data", adviceChain = "after")
@Bean
public MessageHandler handle() {
return System.out::println;
}
@Bean
public ExpressionEvaluatingRequestHandlerAdvice after() {
ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
advice.setOnSuccessExpression(
"@template.remove(headers['file_remoteDirectory'] + '/' + headers['file_remoteFile'])");
advice.setPropagateEvaluationFailures(true);
return advice;
}
}
請注意,在此示例中,轉換器下游的訊息處理器有一個 advice,它在處理後刪除遠端檔案。