接收批次訊息

對於 RabbitMQ Binder,消費者繫結處理的批次訊息有兩種型別

生產者建立的批次訊息

通常,如果生產者繫結設定了 batch-enabled=true(參見 Rabbit 生產者屬性),或者訊息是由 BatchingRabbitTemplate 建立的,則批次訊息中的元素會作為對監聽器方法的單獨呼叫返回。從 3.0 版本開始,如果將 spring.cloud.stream.bindings.<name>.consumer.batch-mode 設定為 true,則任何此類批次訊息都可以作為 List<?> 呈現給監聽器方法。

消費者端批次處理

從 3.1 版本開始,可以將消費者配置為將多個入站訊息組裝成批次訊息,然後作為已轉換負載的 List<?> 呈現給應用程式。下面的簡單應用演示瞭如何使用此技術

spring.cloud.stream.bindings.input-in-0.group=someGroup

spring.cloud.stream.bindings.input-in-0.consumer.batch-mode=true

spring.cloud.stream.rabbit.bindings.input-in-0.consumer.enable-batching=true
spring.cloud.stream.rabbit.bindings.input-in-0.consumer.batch-size=10
spring.cloud.stream.rabbit.bindings.input-in-0.consumer.receive-timeout=200
@SpringBootApplication
public class Application {

	public static void main(String[] args) {
		SpringApplication.run(Application.class, args);
	}

	@Bean
	Consumer<List<Thing>> input() {
		return list -> {
			System.out.println("Received " + list.size());
			list.forEach(thing -> {
				System.out.println(thing);

				// ...

			});
		};
	}

	@Bean
	public ApplicationRunner runner(RabbitTemplate template) {
		return args -> {
			template.convertAndSend("input-in-0.someGroup", "{\"field\":\"value1\"}");
			template.convertAndSend("input-in-0.someGroup", "{\"field\":\"value2\"}");
		};
	}

	public static class Thing {

		private String field;

		public Thing() {
		}

		public Thing(String field) {
			this.field = field;
		}

		public String getField() {
			return this.field;
		}

		public void setField(String field) {
			this.field = field;
		}

		@Override
		public String toString() {
			return "Thing [field=" + this.field + "]";
		}

	}

}
Received 2
Thing [field=value1]
Thing [field=value2]

批次訊息中的訊息數量由 batch-sizereceive-timeout 屬性指定;如果 receive-timeout 超時且沒有新訊息到達,則會交付一個“短”批次訊息。

消費者端批次處理僅支援 container-type=simple(預設值)。

如果您希望檢查消費者端批次訊息的頭部,您應該消費 Message<List<?>>;頭部是 List<Map<String, Object>>,位於頭部 AmqpInboundChannelAdapter.CONSOLIDATED_HEADERS 中,其中每個負載元素的頭部位於相應的索引位置。再次,這裡有一個簡單的例子

@SpringBootApplication
public class Application {

	public static void main(String[] args) {
		SpringApplication.run(Application.class, args);
	}

	@Bean
	Consumer<Message<List<Thing>>> input() {
		return msg -> {
			List<Thing> things = msg.getPayload();
			System.out.println("Received " + things.size());
			List<Map<String, Object>> headers =
					(List<Map<String, Object>>) msg.getHeaders().get(AmqpInboundChannelAdapter.CONSOLIDATED_HEADERS);
			for (int i = 0; i < things.size(); i++) {
				System.out.println(things.get(i) + " myHeader=" + headers.get(i).get("myHeader"));

				// ...

			}
		};
	}

	@Bean
	public ApplicationRunner runner(RabbitTemplate template) {
		return args -> {
			template.convertAndSend("input-in-0.someGroup", "{\"field\":\"value1\"}", msg -> {
				msg.getMessageProperties().setHeader("myHeader", "headerValue1");
				return msg;
			});
			template.convertAndSend("input-in-0.someGroup", "{\"field\":\"value2\"}", msg -> {
				msg.getMessageProperties().setHeader("myHeader", "headerValue2");
				return msg;
			});
		};
	}

	public static class Thing {

		private String field;

		public Thing() {
		}

		public Thing(String field) {
			this.field = field;
		}

		public String getfield() {
			return this.field;
		}

		public void setfield(String field) {
			this.field = field;
		}

		@Override
		public String toString() {
			return "Thing [field=" + this.field + "]";
		}

	}

}
Received 2
Thing [field=value1] myHeader=headerValue1
Thing [field=value2] myHeader=headerValue2