釋出者確認

有兩種機制可以獲取釋出訊息的結果;在每種情況下,連線工廠必須將 publisherConfirmType 設定為 ConfirmType.CORRELATED。 “傳統”機制是將 confirmAckChannel 設定為訊息通道的 bean 名稱,您可以從該通道非同步檢索確認;否定的確認(negative acks)將傳送到錯誤通道(如果啟用)——請參閱錯誤通道

在版本 3.1 中新增的首選機制是使用關聯資料頭(correlation data header)並透過其 Future<Confirm> 屬性等待結果。這對於批次監聽器特別有用,因為您可以在等待結果之前傳送多條訊息。要使用此技術,請將 useConfirmHeader 屬性設定為 true。以下簡單的應用程式是使用此技術的示例

spring.cloud.stream.bindings.input-in-0.group=someGroup
spring.cloud.stream.bindings.input-in-0.consumer.batch-mode=true

spring.cloud.stream.source=output
spring.cloud.stream.bindings.output-out-0.producer.error-channel-enabled=true

spring.cloud.stream.rabbit.bindings.output-out-0.producer.useConfirmHeader=true
spring.cloud.stream.rabbit.bindings.input-in-0.consumer.auto-bind-dlq=true
spring.cloud.stream.rabbit.bindings.input-in-0.consumer.batch-size=10

spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.publisher-returns=true
@SpringBootApplication
public class Application {

	private static final Logger log = LoggerFactory.getLogger(Application.class);

	public static void main(String[] args) {
		SpringApplication.run(Application.class, args);
	}

	@Autowired
	private StreamBridge bridge;

	@Bean
	Consumer<List<String>> input() {
		return list -> {
			List<MyCorrelationData> results = new ArrayList<>();
			list.forEach(str -> {
				log.info("Received: " + str);
				MyCorrelationData corr = new MyCorrelationData(UUID.randomUUID().toString(), str);
				results.add(corr);
				this.bridge.send("output-out-0", MessageBuilder.withPayload(str.toUpperCase())
						.setHeader(AmqpHeaders.PUBLISH_CONFIRM_CORRELATION, corr)
						.build());
			});
			results.forEach(correlation -> {
				try {
					Confirm confirm = correlation.getFuture().get(10, TimeUnit.SECONDS);
					log.info(confirm + " for " + correlation.getPayload());
					if (correlation.getReturnedMessage() != null) {
						log.error("Message for " + correlation.getPayload() + " was returned ");

						// throw some exception to invoke binder retry/error handling

					}
				}
				catch (InterruptedException e) {
					Thread.currentThread().interrupt();
					throw new IllegalStateException(e);
				}
				catch (ExecutionException | TimeoutException e) {
					throw new IllegalStateException(e);
				}
			});
		};
	}


	@Bean
	public ApplicationRunner runner(BatchingRabbitTemplate template) {
		return args -> IntStream.range(0, 10).forEach(i ->
				template.convertAndSend("input-in-0", "input-in-0.rbgh303", "foo" + i));
	}

	@Bean
	public BatchingRabbitTemplate template(CachingConnectionFactory cf, TaskScheduler taskScheduler) {
		BatchingStrategy batchingStrategy = new SimpleBatchingStrategy(10, 1000000, 1000);
		return new BatchingRabbitTemplate(cf, batchingStrategy, taskScheduler);
	}

}

class MyCorrelationData extends CorrelationData {

	private final String payload;

	MyCorrelationData(String id, String payload) {
		super(id);
		this.payload = payload;
	}

	public String getPayload() {
		return this.payload;
	}

}

如您所見,我們傳送每條訊息,然後等待發布結果。如果訊息無法路由,則在 future 完成之前,關聯資料(correlation data)會填充返回的訊息。

必須為關聯資料(correlation data)提供唯一的 id,以便框架可以執行關聯。

您不能同時設定 useConfirmHeaderconfirmAckChannel,但當 useConfirmHeader 為 true 時,您仍然可以在錯誤通道中接收返回的訊息,不過使用關聯頭(correlation header)會更方便。