接收批次訊息
使用 RabbitMQ 繫結器時,消費者繫結處理兩種型別的批次
生產者建立的批次
通常,如果生產者繫結啟用了 batch-enabled=true(請參閱 Rabbit 生產者屬性),或者訊息由 BatchingRabbitTemplate 建立,則批次的元素將作為對偵聽器方法的單獨呼叫返回。從 3.0 版本開始,如果將 spring.cloud.stream.bindings.<name>.consumer.batch-mode 設定為 true,則任何此類批次都可以作為 List<?> 呈現在偵聽器方法中。
消費者端批處理
從 3.1 版本開始,消費者可以配置為將多個入站訊息組裝成一個批次,該批次作為已轉換有效負載的 List<?> 呈現給應用程式。以下簡單應用程式演示瞭如何使用此技術
spring.cloud.stream.bindings.input-in-0.group=someGroup
spring.cloud.stream.bindings.input-in-0.consumer.batch-mode=true
spring.cloud.stream.rabbit.bindings.input-in-0.consumer.enable-batching=true
spring.cloud.stream.rabbit.bindings.input-in-0.consumer.batch-size=10
spring.cloud.stream.rabbit.bindings.input-in-0.consumer.receive-timeout=200
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
Consumer<List<Thing>> input() {
return list -> {
System.out.println("Received " + list.size());
list.forEach(thing -> {
System.out.println(thing);
// ...
});
};
}
@Bean
public ApplicationRunner runner(RabbitTemplate template) {
return args -> {
template.convertAndSend("input-in-0.someGroup", "{\"field\":\"value1\"}");
template.convertAndSend("input-in-0.someGroup", "{\"field\":\"value2\"}");
};
}
public static class Thing {
private String field;
public Thing() {
}
public Thing(String field) {
this.field = field;
}
public String getField() {
return this.field;
}
public void setField(String field) {
this.field = field;
}
@Override
public String toString() {
return "Thing [field=" + this.field + "]";
}
}
}
Received 2
Thing [field=value1]
Thing [field=value2]
批次中的訊息數量由 batch-size 和 receive-timeout 屬性指定;如果在沒有新訊息的情況下 receive-timeout 超時,則會交付一個“短”批次。
消費者端批處理僅支援 container-type=simple(預設值)。 |
如果您希望檢查消費者端批處理訊息的頭部,您應該消費 Message<List<?>>;頭部是 AmqpInboundChannelAdapter.CONSOLIDATED_HEADERS 頭部中的 List<Map<String, Object>>,其中每個有效負載元素的頭部位於相應的索引中。同樣,這裡有一個簡單的示例
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
Consumer<Message<List<Thing>>> input() {
return msg -> {
List<Thing> things = msg.getPayload();
System.out.println("Received " + things.size());
List<Map<String, Object>> headers =
(List<Map<String, Object>>) msg.getHeaders().get(AmqpInboundChannelAdapter.CONSOLIDATED_HEADERS);
for (int i = 0; i < things.size(); i++) {
System.out.println(things.get(i) + " myHeader=" + headers.get(i).get("myHeader"));
// ...
}
};
}
@Bean
public ApplicationRunner runner(RabbitTemplate template) {
return args -> {
template.convertAndSend("input-in-0.someGroup", "{\"field\":\"value1\"}", msg -> {
msg.getMessageProperties().setHeader("myHeader", "headerValue1");
return msg;
});
template.convertAndSend("input-in-0.someGroup", "{\"field\":\"value2\"}", msg -> {
msg.getMessageProperties().setHeader("myHeader", "headerValue2");
return msg;
});
};
}
public static class Thing {
private String field;
public Thing() {
}
public Thing(String field) {
this.field = field;
}
public String getfield() {
return this.field;
}
public void setfield(String field) {
this.field = field;
}
@Override
public String toString() {
return "Thing [field=" + this.field + "]";
}
}
}
Received 2
Thing [field=value1] myHeader=headerValue1
Thing [field=value2] myHeader=headerValue2