示例應用程式
Spring AMQP 示例專案包含兩個示例應用程式。第一個是簡單的“Hello World”示例,演示了同步和非同步訊息接收。它為理解基本元件提供了極好的起點。第二個示例基於股票交易用例,演示了在實際應用程式中常見的互動型別。在本章中,我們將對每個示例進行快速演練,以便您能夠專注於最重要的元件。這兩個示例都是基於 Maven 的,因此您應該能夠將它們直接匯入到任何支援 Maven 的 IDE(例如 SpringSource Tool Suite)。
“Hello World”示例
“Hello World”示例演示了同步和非同步訊息接收。您可以將 spring-rabbit-helloworld 示例匯入 IDE,然後按照下面的討論進行操作。
同步示例
在 src/main/java 目錄中,導航到 org.springframework.amqp.helloworld 包。開啟 HelloWorldConfiguration 類,注意它在類級別包含 @Configuration 註解,並且在方法級別包含一些 @Bean 註解。這是 Spring 基於 Java 的配置示例。您可以在此處瞭解更多資訊。
以下清單顯示了連線工廠的建立方式
@Bean
public CachingConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory =
new CachingConnectionFactory("localhost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
return connectionFactory;
}
配置中還包含一個 RabbitAdmin 例項,它預設查詢任何型別為 exchange、queue 或 binding 的 bean,然後將它們宣告到代理上。事實上,在 HelloWorldConfiguration 中生成的 helloWorldQueue bean 就是一個示例,因為它是一個 Queue 例項。
以下清單顯示了 helloWorldQueue bean 定義
@Bean
public Queue helloWorldQueue() {
return new Queue(this.helloWorldQueueName);
}
回顧 rabbitTemplate bean 配置,您可以看到它將 helloWorldQueue 的名稱設定為其 queue 屬性(用於接收訊息)和 routingKey 屬性(用於傳送訊息)。
現在我們已經探索了配置,接下來可以檢視實際使用這些元件的程式碼。首先,開啟同一個包中的 Producer 類。它包含一個 main() 方法,其中建立了 Spring ApplicationContext。
以下清單顯示了 main 方法
public static void main(String[] args) {
ApplicationContext context =
new AnnotationConfigApplicationContext(RabbitConfiguration.class);
AmqpTemplate amqpTemplate = context.getBean(AmqpTemplate.class);
amqpTemplate.convertAndSend("Hello World");
System.out.println("Sent: Hello World");
}
在前面的示例中,AmqpTemplate bean 被檢索並用於傳送 Message。由於客戶端程式碼應儘可能依賴介面,因此型別為 AmqpTemplate 而不是 RabbitTemplate。即使在 HelloWorldConfiguration 中建立的 bean 是 RabbitTemplate 的例項,依賴介面意味著此程式碼更具可移植性(您可以獨立於程式碼更改配置)。由於呼叫了 convertAndSend() 方法,模板將其委託給 MessageConverter 例項。在此示例中,它使用預設的 SimpleMessageConverter,但可以為 HelloWorldConfiguration 中定義的 rabbitTemplate bean 提供不同的實現。
現在開啟 Consumer 類。它實際上共享相同的配置基類,這意味著它共享 rabbitTemplate bean。這就是為什麼我們將該模板配置為同時具有 routingKey(用於傳送)和 queue(用於接收)。正如我們在 AmqpTemplate 中描述的那樣,您可以將“routingKey”引數傳遞給傳送方法,將“queue”引數傳遞給接收方法。Consumer 程式碼基本上是 Producer 的映象,呼叫 receiveAndConvert() 而不是 convertAndSend()。
以下清單顯示了 Consumer 的 main 方法
public static void main(String[] args) {
ApplicationContext context =
new AnnotationConfigApplicationContext(RabbitConfiguration.class);
AmqpTemplate amqpTemplate = context.getBean(AmqpTemplate.class);
System.out.println("Received: " + amqpTemplate.receiveAndConvert());
}
如果您執行 Producer,然後執行 Consumer,您應該會在控制檯輸出中看到 Received: Hello World。
非同步示例
同步示例 演練了同步的 Hello World 示例。本節描述了一個稍微高階但功能更強大的選項。只需進行少量修改,Hello World 示例就可以提供非同步接收的示例,也稱為訊息驅動 POJO。事實上,有一個子包正好提供了這一點:org.springframework.amqp.samples.helloworld.async。
同樣,我們從傳送端開始。開啟 ProducerConfiguration 類,注意它建立了一個 connectionFactory 和一個 rabbitTemplate bean。這次,由於配置專門用於訊息傳送端,我們甚至不需要任何佇列定義,並且 RabbitTemplate 只設置了“routingKey”屬性。回想一下,訊息是傳送到交換機而不是直接傳送到佇列的。AMQP 預設交換機是一個沒有名稱的直連交換機。所有佇列都以其名稱作為路由鍵繫結到該預設交換機。這就是為什麼我們在這裡只需要提供路由鍵。
以下清單顯示了 rabbitTemplate 定義
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
template.setRoutingKey(this.helloWorldQueueName);
return template;
}
由於此示例演示了非同步訊息接收,因此生產端設計為持續傳送訊息(如果它是像同步版本那樣的每次執行一條訊息的模型,那麼它實際上是訊息驅動的消費者就不那麼明顯了)。負責持續傳送訊息的元件被定義為 ProducerConfiguration 中的一個內部類。它被配置為每三秒執行一次。
以下清單顯示了該元件
static class ScheduledProducer {
@Autowired
private volatile RabbitTemplate rabbitTemplate;
private final AtomicInteger counter = new AtomicInteger();
@Scheduled(fixedRate = 3000)
public void sendMessage() {
rabbitTemplate.convertAndSend("Hello World " + counter.incrementAndGet());
}
}
您不需要理解所有細節,因為真正的重點應該放在接收端(我們接下來會介紹)。但是,如果您還不熟悉 Spring 任務排程支援,可以在這裡瞭解更多。簡而言之,ProducerConfiguration 中的 postProcessor bean 將任務註冊到排程程式。
現在我們可以轉向接收端。為了強調訊息驅動的 POJO 行為,我們從對訊息作出反應的元件開始。該類名為 HelloWorldHandler,在以下清單中顯示
public class HelloWorldHandler {
public void handleMessage(String text) {
System.out.println("Received: " + text);
}
}
該類是一個 POJO。它不擴充套件任何基類,不實現任何介面,甚至不包含任何匯入。它透過 Spring AMQP MessageListenerAdapter 被“適配”到 MessageListener 介面。然後,您可以將該介面卡配置在 SimpleMessageListenerContainer 上。對於此示例,容器是在 ConsumerConfiguration 類中建立的。您可以在那裡看到被介面卡包裝的 POJO。
以下清單顯示了 listenerContainer 的定義方式
@Bean
public SimpleMessageListenerContainer listenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory());
container.setQueueName(this.helloWorldQueueName);
container.setMessageListener(new MessageListenerAdapter(new HelloWorldHandler()));
return container;
}
SimpleMessageListenerContainer 是一個 Spring 生命週期元件,預設情況下會自動啟動。如果您檢視 Consumer 類,可以看到它的 main() 方法只包含一行引導程式碼來建立 ApplicationContext。Producer 的 main() 方法也是一行引導程式碼,因為其方法帶有 @Scheduled 註解的元件也會自動啟動。您可以按任意順序啟動 Producer 和 Consumer,您應該會看到每隔三秒傳送和接收訊息。
股票交易
股票交易示例演示了比 Hello World 示例 更高階的訊息傳遞場景。然而,配置非常相似,只是稍微複雜一些。由於我們已經詳細介紹了 Hello World 配置,這裡我們重點介紹此示例的不同之處。有一個伺服器將市場資料(股票報價)推送到主題交換機。然後,客戶端可以透過將佇列與路由模式(例如 app.stock.quotes.nasdaq.*)繫結來訂閱市場資料來源。此演示的另一個主要功能是客戶端發起並由伺服器處理的請求-回覆“股票交易”互動。這涉及客戶端在訂單請求訊息本身中傳送的私有 replyTo 佇列。
伺服器的核心配置位於 org.springframework.amqp.rabbit.stocks.config.server 包中的 RabbitServerConfiguration 類中。它擴充套件了 AbstractStockAppRabbitConfiguration。這是定義伺服器和客戶端共用資源的地方,包括市場資料主題交換機(其名稱為 'app.stock.marketdata')和伺服器公開用於股票交易的佇列(其名稱為 'app.stock.request')。在該通用配置檔案中,您還可以看到 JacksonJsonMessageConverter 配置在 RabbitTemplate 上。
伺服器特定配置包括兩件事。首先,它在 RabbitTemplate 上配置了市場資料交換機,這樣它就不需要每次傳送 Message 時都提供該交換機名稱。它透過基配置類中定義的抽象回撥方法完成此操作。以下清單顯示了該方法
public void configureRabbitTemplate(RabbitTemplate rabbitTemplate) {
rabbitTemplate.setExchange(MARKET_DATA_EXCHANGE_NAME);
}
其次,聲明瞭股票請求佇列。在這種情況下,它不需要任何顯式繫結,因為它以其自身的名稱作為路由鍵繫結到預設的無名交換機。如前所述,AMQP 規範定義了這種行為。以下清單顯示了 stockRequestQueue bean 的定義
@Bean
public Queue stockRequestQueue() {
return new Queue(STOCK_REQUEST_QUEUE_NAME);
}
現在您已經看到了伺服器 AMQP 資源的配置,請導航到 src/test/java 目錄下的 org.springframework.amqp.rabbit.stocks 包。在那裡,您可以看到提供了 main() 方法的實際 Server 類。它基於 server-bootstrap.xml 配置檔案建立了一個 ApplicationContext。在那裡,您可以看到釋出虛擬市場資料的計劃任務。該配置依賴於 Spring 的 task 名稱空間支援。引導配置檔案還匯入了其他一些檔案。最有趣的是 server-messaging.xml,它直接位於 src/main/resources 下。在那裡,您可以看到負責處理股票交易請求的 messageListenerContainer bean。最後,檢視 server-handlers.xml 中定義的 serverHandler bean(也位於 'src/main/resources')。該 bean 是 ServerHandler 類的一個例項,是訊息驅動 POJO 的一個很好的例子,它也可以傳送回覆訊息。請注意,它本身不與框架或任何 AMQP 概念耦合。它接受一個 TradeRequest 並返回一個 TradeResponse。以下清單顯示了 handleMessage 方法的定義
public TradeResponse handleMessage(TradeRequest tradeRequest) { ...
}
現在我們已經瞭解了伺服器最重要的配置和程式碼,接下來我們轉向客戶端。最好的起點可能是 org.springframework.amqp.rabbit.stocks.config.client 包中的 RabbitClientConfiguration。注意它聲明瞭兩個沒有提供顯式名稱的佇列。以下清單顯示了兩個佇列的 bean 定義
@Bean
public Queue marketDataQueue() {
return amqpAdmin().declareQueue();
}
@Bean
public Queue traderJoeQueue() {
return amqpAdmin().declareQueue();
}
這些是私有佇列,會自動生成唯一的名稱。第一個生成的佇列由客戶端用於繫結到伺服器公開的市場資料交換。回想一下,在 AMQP 中,消費者與佇列互動,而生產者與交換機互動。將佇列“繫結”到交換機就是告訴代理將訊息從給定交換機傳遞(或路由)到佇列。由於市場資料交換機是主題交換機,因此繫結可以用路由模式表示。RabbitClientConfiguration 使用 Binding 物件執行此操作,並且該物件是使用 BindingBuilder 流式 API 生成的。以下清單顯示了 Binding
@Value("${stocks.quote.pattern}")
private String marketDataRoutingKey;
@Bean
public Binding marketDataBinding() {
return BindingBuilder.bind(
marketDataQueue()).to(marketDataExchange()).with(marketDataRoutingKey);
}
請注意,實際值已外部化到屬性檔案(src/main/resources 下的 client.properties)中,並且我們使用 Spring 的 @Value 註解來注入該值。這通常是一個好主意。否則,該值將被硬編碼到類中,並且在不重新編譯的情況下無法修改。在這種情況下,在更改用於繫結的路由模式時,執行多個客戶端版本要容易得多。我們現在可以嘗試一下。
首先執行 org.springframework.amqp.rabbit.stocks.Server,然後執行 org.springframework.amqp.rabbit.stocks.Client。您應該會看到 NASDAQ 股票的虛擬報價,因為 client.properties 中 'stocks.quote.pattern' 鍵的當前值為 'app.stock.quotes.nasdaq.'。現在,在保持現有 Server 和 Client 執行的情況下,將該屬性值更改為 'app.stock.quotes.nyse.' 並啟動第二個 Client 例項。您應該會看到第一個客戶端仍然接收 NASDAQ 報價,而第二個客戶端接收 NYSE 報價。您可以更改模式以獲取所有股票,甚至單個股票程式碼。
我們探索的最後一個功能是從客戶端角度的請求-回覆互動。回想一下,我們已經看到了接受 TradeRequest 物件並返回 TradeResponse 物件的 ServerHandler。客戶端對應程式碼是 org.springframework.amqp.rabbit.stocks.gateway 包中的 RabbitStockServiceGateway。它委託給 RabbitTemplate 來發送訊息。以下清單顯示了 send 方法
public void send(TradeRequest tradeRequest) {
getRabbitTemplate().convertAndSend(tradeRequest, new MessagePostProcessor() {
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setReplyTo(new Address(defaultReplyToQueue));
try {
message.getMessageProperties().setCorrelationId(
UUID.randomUUID().toString().getBytes("UTF-8"));
}
catch (UnsupportedEncodingException e) {
throw new AmqpException(e);
}
return message;
}
});
}
請注意,在傳送訊息之前,它設定了 replyTo 地址。它提供了由 traderJoeQueue bean 定義(前面顯示)生成的佇列。以下清單顯示了 StockServiceGateway 類本身的 @Bean 定義
@Bean
public StockServiceGateway stockServiceGateway() {
RabbitStockServiceGateway gateway = new RabbitStockServiceGateway();
gateway.setRabbitTemplate(rabbitTemplate());
gateway.setDefaultReplyToQueue(traderJoeQueue());
return gateway;
}
如果您不再執行伺服器和客戶端,請立即啟動它們。嘗試傳送格式為 '100 TCKR' 的請求。在模擬請求“處理”的短暫人工延遲之後,您應該會看到客戶端上出現確認訊息。