入站通道介面卡:輪詢多個伺服器和目錄
從 5.0.7 版本開始,提供了 RotatingServerAdvice;當將其配置為輪詢器建議時,入站介面卡可以輪詢多個伺服器和目錄。配置建議並將其新增到輪詢器的建議鏈中,這與正常操作無異。一個 DelegatingSessionFactory 用於選擇伺服器,更多資訊請參閱 委託會話工廠。該建議的配置由 RotationPolicy.KeyDirectory 物件的列表組成。
示例
@Bean
public RotatingServerAdvice advice() {
List<RotationPolicy.KeyDirectory> keyDirectories = new ArrayList<>();
keyDirectories.add(new RotationPolicy.KeyDirectory("one", "foo"));
keyDirectories.add(new RotationPolicy.KeyDirectory("one", "bar"));
keyDirectories.add(new RotationPolicy.KeyDirectory("two", "baz"));
keyDirectories.add(new RotationPolicy.KeyDirectory("two", "qux"));
keyDirectories.add(new RotationPolicy.KeyDirectory("three", "fiz"));
keyDirectories.add(new RotationPolicy.KeyDirectory("three", "buz"));
return new RotatingServerAdvice(delegatingSf(), keyDirectories);
}
此建議將輪詢伺服器 one 上的目錄 foo,直到沒有新檔案存在,然後移至伺服器 two 上的目錄 bar,接著是目錄 baz,依此類推。
此預設行為可透過 fair 建構函式引數進行修改
公平
@Bean
public RotatingServerAdvice advice() {
...
return new RotatingServerAdvice(delegatingSf(), keyDirectories, true);
}
在這種情況下,無論前一次輪詢是否返回檔案,建議都將移至下一個伺服器/目錄。
或者,您可以提供自己的 RotationPolicy 以根據需要重新配置訊息源
policy
public interface RotationPolicy {
void beforeReceive(MessageSource<?> source);
void afterReceive(boolean messageReceived, MessageSource<?> source);
}
和
custom
@Bean
public RotatingServerAdvice advice() {
return new RotatingServerAdvice(myRotationPolicy());
}
local-filename-generator-expression 屬性(同步器上的 localFilenameGeneratorExpression)現在可以包含 #remoteDirectory 變數。這使得從不同目錄檢索的檔案可以下載到本地相似的目錄中
@Bean
public IntegrationFlow flow() {
return IntegrationFlow.from(Sftp.inboundAdapter(sf())
.filter(new SftpPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "rotate"))
.localDirectory(new File(tmpDir))
.localFilenameExpression("#remoteDirectory + T(java.io.File).separator + #root")
.remoteDirectory("."),
e -> e.poller(Pollers.fixedDelay(1).advice(advice())))
.channel(MessageChannels.queue("files"))
.get();
}
使用此建議時,請勿在輪詢器上配置 TaskExecutor;更多資訊請參閱 訊息源的條件輪詢器。 |
此外,當並非所有獲取的檔案都在單個輪詢週期內處理,但 SessionFactory 可能會輪換到不同的會話時,還可以使用便捷的 AbstractRemoteFileStreamingMessageSource.clearFetchedCache() API。