使用 RabbitMQ Binder 進行分割槽
RabbitMQ 原生不支援分割槽。
有時,將資料傳送到特定分割槽是有益的——例如,當您希望嚴格按順序處理訊息時,某個特定客戶的所有訊息都應該傳送到同一個分割槽。
RabbitMessageChannelBinder
透過為每個分割槽繫結一個佇列到目標交換器來提供分割槽功能。
以下 Java 和 YAML 示例展示瞭如何配置生產者
生產者
@SpringBootApplication
public class RabbitPartitionProducerApplication {
private static final Random RANDOM = new Random(System.currentTimeMillis());
private static final String[] data = new String[] {
"abc1", "def1", "qux1",
"abc2", "def2", "qux2",
"abc3", "def3", "qux3",
"abc4", "def4", "qux4",
};
public static void main(String[] args) {
new SpringApplicationBuilder(RabbitPartitionProducerApplication.class)
.web(false)
.run(args);
}
@Bean
public Supplier<Message<?>> generate() {
return () -> {
String value = data[RANDOM.nextInt(data.length)];
System.out.println("Sending: " + value);
return MessageBuilder.withPayload(value)
.setHeader("partitionKey", value)
.build();
};
}
}
application.yml
spring:
cloud:
stream:
bindings:
generate-out-0:
destination: partitioned.destination
producer:
partitioned: true
partition-key-expression: headers['partitionKey']
partition-count: 2
required-groups:
- myGroup
前面的示例中的配置使用了預設分割槽( 只有當您需要在部署生產者時建立/配置消費者佇列,才需要 |
以下配置配置並建立一個 topic 交換器

以下佇列繫結到該交換器

以下繫結將佇列關聯到交換器

以下 Java 和 YAML 示例繼續前面的示例,展示瞭如何配置消費者
消費者
@SpringBootApplication
public class RabbitPartitionConsumerApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(RabbitPartitionConsumerApplication.class)
.web(false)
.run(args);
}
@Bean
public Consumer<Message<String>> listen() {
return message -> {
String queue =- message.getHeaders().get(AmqpHeaders.CONSUMER_QUEUE);
System.out.println(in + " received from queue " + queue);
};
}
}
application.yml
spring:
cloud:
stream:
bindings:
listen-in-0:
destination: partitioned.destination
group: myGroup
consumer:
partitioned: true
instance-index: 0
RabbitMessageChannelBinder 不支援動態伸縮。每個分割槽必須至少有一個消費者。消費者的 instanceIndex 用於指示消費的是哪個分割槽。像 Cloud Foundry 這樣的平臺每個 instanceIndex 只能有一個例項。 |