示例應用
Spring AMQP Samples 專案包含兩個示例應用。第一個是簡單的“Hello World”示例,演示了同步和非同步訊息接收。它為理解基本元件提供了一個很好的起點。第二個示例基於一個股票交易用例,用於演示現實世界應用中常見的互動型別。在本章中,我們將快速介紹每個示例,以便您可以重點關注最重要的元件。這兩個示例都是基於 Maven 的,因此您可以直接將它們匯入到任何支援 Maven 的 IDE(例如 SpringSource Tool Suite)。
“Hello World”示例
“Hello World”示例演示了同步和非同步訊息接收。您可以將 spring-rabbit-helloworld
示例匯入到 IDE 中,然後按照下面的討論進行操作。
同步示例
在 src/main/java
目錄中,導航到 org.springframework.amqp.helloworld
包。開啟 HelloWorldConfiguration
類,注意它在類級別包含 @Configuration
註解,並在方法級別包含一些 @Bean
註解。這是 Spring 基於 Java 配置的一個示例。您可以在此處閱讀更多相關內容。
以下列表顯示了連線工廠是如何建立的
@Bean
public CachingConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory =
new CachingConnectionFactory("localhost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
return connectionFactory;
}
該配置還包含一個 RabbitAdmin
例項,它預設會查詢型別為 exchange、queue 或 binding 的任何 bean,然後將它們宣告在 broker 上。實際上,在 HelloWorldConfiguration
中生成的 helloWorldQueue
bean 就是一個例子,因為它是一個 Queue
例項。
以下列表顯示了 helloWorldQueue
bean 的定義
@Bean
public Queue helloWorldQueue() {
return new Queue(this.helloWorldQueueName);
}
回顧 rabbitTemplate
bean 的配置,您可以看到它將 helloWorldQueue
的名稱設定為其 queue
屬性(用於接收訊息)和 routingKey
屬性(用於傳送訊息)。
現在我們已經探索了配置,接下來可以看看實際使用這些元件的程式碼。首先,開啟同一個包中的 Producer
類。它包含一個 main()
方法,在該方法中建立 Spring ApplicationContext
。
以下列表顯示了 main
方法
public static void main(String[] args) {
ApplicationContext context =
new AnnotationConfigApplicationContext(RabbitConfiguration.class);
AmqpTemplate amqpTemplate = context.getBean(AmqpTemplate.class);
amqpTemplate.convertAndSend("Hello World");
System.out.println("Sent: Hello World");
}
在前面的示例中,檢索並使用 AmqpTemplate
bean 傳送訊息。由於客戶端程式碼應儘可能依賴介面,因此型別是 AmqpTemplate
而不是 RabbitTemplate
。即使在 HelloWorldConfiguration
中建立的 bean 是 RabbitTemplate
的例項,依賴介面意味著此程式碼更具可移植性(您可以在不更改程式碼的情況下獨立更改配置)。由於呼叫了 convertAndSend()
方法,模板會委託給其 MessageConverter
例項。在本例中,它使用了預設的 SimpleMessageConverter
,但可以向 HelloWorldConfiguration
中定義的 rabbitTemplate
bean 提供不同的實現。
現在開啟 Consumer
類。它實際上共享相同的配置基類,這意味著它共享 rabbitTemplate
bean。這就是為什麼我們將該模板同時配置了 routingKey
(用於傳送)和 queue
(用於接收)。正如我們在 AmqpTemplate 中所述,您也可以將“routingKey”引數傳遞給傳送方法,將“queue”引數傳遞給接收方法。Consumer
程式碼基本上是 Producer
的映象,呼叫 receiveAndConvert()
而不是 convertAndSend()
。
以下列表顯示了 Consumer
的 main 方法
public static void main(String[] args) {
ApplicationContext context =
new AnnotationConfigApplicationContext(RabbitConfiguration.class);
AmqpTemplate amqpTemplate = context.getBean(AmqpTemplate.class);
System.out.println("Received: " + amqpTemplate.receiveAndConvert());
}
如果您執行 Producer
,然後執行 Consumer
,您應該在控制檯輸出中看到 Received: Hello World
。
非同步示例
同步示例 部分詳細介紹了同步的 Hello World 示例。本節將介紹一個稍微更高階但功能強大得多的選項。透過一些修改,Hello World 示例可以提供一個非同步接收的示例,也稱為訊息驅動的 POJO。事實上,有一個子包正好提供了這個功能:org.springframework.amqp.samples.helloworld.async
。
同樣,我們從傳送端開始。開啟 ProducerConfiguration
類,注意它建立了一個 connectionFactory
和一個 rabbitTemplate
bean。這次,由於配置專用於訊息傳送端,我們甚至不需要任何佇列定義,並且 RabbitTemplate
只設置了 routingKey
屬性。回想一下,訊息是傳送到交換機而不是直接傳送到佇列。AMQP 預設交換機是一個沒有名稱的 direct exchange。所有佇列都繫結到該預設交換機,並以其名稱作為路由鍵。這就是為什麼我們只需要在此處提供路由鍵的原因。
以下列表顯示了 rabbitTemplate
的定義
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
template.setRoutingKey(this.helloWorldQueueName);
return template;
}
由於此示例演示了非同步訊息接收,傳送端被設計為持續傳送訊息(如果它像同步版本那樣是每執行一次傳送一條訊息的模型,那麼它實際上是一個訊息驅動消費者的事實就不會那麼明顯了)。負責持續傳送訊息的元件在 ProducerConfiguration
中定義為一個內部類。它被配置為每三秒執行一次。
以下列表顯示了該元件
static class ScheduledProducer {
@Autowired
private volatile RabbitTemplate rabbitTemplate;
private final AtomicInteger counter = new AtomicInteger();
@Scheduled(fixedRate = 3000)
public void sendMessage() {
rabbitTemplate.convertAndSend("Hello World " + counter.incrementAndGet());
}
}
您不需要理解所有細節,因為真正的重點應該放在接收端(我們將在下一節介紹)。但是,如果您還不熟悉 Spring 任務排程支援,可以在此處瞭解更多資訊。簡而言之,ProducerConfiguration
中的 postProcessor
bean 會將任務註冊到排程器中。
現在我們可以轉向接收端。為了強調訊息驅動的 POJO 行為,我們從響應訊息的元件開始。該類名為 HelloWorldHandler
,如下所示
public class HelloWorldHandler {
public void handleMessage(String text) {
System.out.println("Received: " + text);
}
}
該類是一個 POJO。它不繼承任何基類,不實現任何介面,甚至不包含任何 import。它透過 Spring AMQP 的 MessageListenerAdapter
被“適配”到 MessageListener
介面。然後,您可以在 SimpleMessageListenerContainer
上配置該介面卡。對於此示例,容器是在 ConsumerConfiguration
類中建立的。您可以在那裡看到 POJO 被封裝在介面卡中。
以下列表顯示了 listenerContainer
的定義方式
@Bean
public SimpleMessageListenerContainer listenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory());
container.setQueueName(this.helloWorldQueueName);
container.setMessageListener(new MessageListenerAdapter(new HelloWorldHandler()));
return container;
}
SimpleMessageListenerContainer
是一個 Spring 生命週期元件,預設情況下會自動啟動。如果您檢視 Consumer
類,可以看到其 main()
方法只不過是一行用於建立 ApplicationContext
的引導程式碼。Producer
的 main()
方法也是一行引導程式碼,因為其方法帶有 @Scheduled
註解的元件也會自動啟動。您可以按任意順序啟動 Producer
和 Consumer
,並且應該會看到每三秒傳送和接收一次訊息。
股票交易
股票交易示例演示了比Hello World 示例 更高階的訊息場景。然而,配置非常相似,只是稍微複雜一些。由於我們已經詳細介紹了 Hello World 配置,這裡我們將重點關注此示例的不同之處。有一個伺服器將市場資料(股票報價)推送到一個 topic exchange。然後,客戶端可以透過繫結一個帶有路由模式(例如,app.stock.quotes.nasdaq.*
)的佇列來訂閱市場資料饋送。此演示的另一個主要特性是由客戶端發起並由伺服器處理的請求-回覆“股票交易”互動。這涉及客戶端在訂單請求訊息本身中傳送的一個私有 replyTo
佇列。
伺服器的核心配置位於 org.springframework.amqp.rabbit.stocks.config.server
包中的 RabbitServerConfiguration
類中。它繼承了 AbstractStockAppRabbitConfiguration
。這裡定義了伺服器和客戶端共有的資源,包括市場資料 topic exchange(其名稱為 app.stock.marketdata
)以及伺服器為股票交易暴露的佇列(其名稱為 app.stock.request
)。在該通用配置檔案中,您還可以看到在 RabbitTemplate
上配置了一個 Jackson2JsonMessageConverter
。
伺服器特定的配置包含兩部分。首先,它在 RabbitTemplate
上配置市場資料交換機,以便在每次呼叫傳送訊息時無需提供該交換機名稱。它在基配置類中定義的一個抽象回撥方法中完成此操作。以下列表顯示了該方法
public void configureRabbitTemplate(RabbitTemplate rabbitTemplate) {
rabbitTemplate.setExchange(MARKET_DATA_EXCHANGE_NAME);
}
其次,聲明瞭股票請求佇列。在這種情況下,它不需要任何顯式繫結,因為它繫結到預設的無名交換機,並以其自己的名稱作為路由鍵。如前所述,AMQP 規範定義了此行為。以下列表顯示了 stockRequestQueue
bean 的定義
@Bean
public Queue stockRequestQueue() {
return new Queue(STOCK_REQUEST_QUEUE_NAME);
}
現在您已經看到了伺服器 AMQP 資源的配置,請導航到 src/test/java
目錄下的 org.springframework.amqp.rabbit.stocks
包。在那裡,您可以看到實際的 Server
類,它提供了一個 main()
方法。它基於 server-bootstrap.xml
配置檔案建立 ApplicationContext
。在那裡,您可以看到釋出模擬市場資料的計劃任務。該配置依賴於 Spring 的 task
名稱空間支援。引導配置檔案還匯入了其他一些檔案。最有趣的是 server-messaging.xml
,它直接位於 src/main/resources
下。在那裡,您可以看到負責處理股票交易請求的 messageListenerContainer
bean。最後,看看在 server-handlers.xml
(也在 src/main/resources
中)中定義的 serverHandler
bean。該 bean 是 ServerHandler
類的一個例項,是訊息驅動 POJO 的一個很好的例子,它也可以傳送回覆訊息。請注意,它本身並不與框架或任何 AMQP 概念耦合。它接受一個 TradeRequest
並返回一個 TradeResponse
。以下列表顯示了 handleMessage
方法的定義
public TradeResponse handleMessage(TradeRequest tradeRequest) { ...
}
現在我們已經看到了伺服器最重要的配置和程式碼,可以轉向客戶端了。最好的起點可能是在 org.springframework.amqp.rabbit.stocks.config.client
包中的 RabbitClientConfiguration
。注意它聲明瞭兩個佇列,但沒有提供顯式名稱。以下列表顯示了這兩個佇列的 bean 定義
@Bean
public Queue marketDataQueue() {
return amqpAdmin().declareQueue();
}
@Bean
public Queue traderJoeQueue() {
return amqpAdmin().declareQueue();
}
這些是私有佇列,會自動生成唯一的名稱。第一個生成的佇列被客戶端用來繫結到伺服器暴露的市場資料交換機。回想一下,在 AMQP 中,消費者與佇列互動,而生產者與交換機互動。佇列與交換機的“繫結”就是告訴 broker 將來自給定交換機的訊息傳送(或路由)到佇列。由於市場資料交換機是一個 topic exchange,繫結可以用路由模式表示。RabbitClientConfiguration
使用 Binding
物件完成了這一點,並且該物件是使用 BindingBuilder
流式 API 生成的。以下列表顯示了 Binding
@Value("${stocks.quote.pattern}")
private String marketDataRoutingKey;
@Bean
public Binding marketDataBinding() {
return BindingBuilder.bind(
marketDataQueue()).to(marketDataExchange()).with(marketDataRoutingKey);
}
注意,實際值已外部化到屬性檔案(src/main/resources
下的 client.properties
)中,並且我們使用 Spring 的 @Value
註解來注入該值。這通常是一個好主意。否則,該值將被硬編碼在類中,並且在不重新編譯的情況下無法修改。在這種情況下,更容易在更改用於繫結的路由模式時執行客戶端的多個版本。我們現在可以嘗試一下。
首先執行 org.springframework.amqp.rabbit.stocks.Server
,然後執行 org.springframework.amqp.rabbit.stocks.Client
。您應該會看到 NASDAQ 股票的模擬報價,因為 client.properties
中與 stocks.quote.pattern
鍵關聯的當前值是 app.stock.quotes.nasdaq.*
。現在,在保持現有 Server
和 Client
執行的同時,將該屬性值更改為 app.stock.quotes.nyse.*
並啟動第二個 Client
例項。您應該看到第一個客戶端仍在接收 NASDAQ 報價,而第二個客戶端接收 NYSE 報價。您也可以更改模式以獲取所有股票,甚至是單個股票程式碼。
我們將探討的最後一個特性是客戶端角度的請求-回覆互動。回想一下,我們已經看到了接受 TradeRequest
物件並返回 TradeResponse
物件的 ServerHandler
。客戶端對應的程式碼是 org.springframework.amqp.rabbit.stocks.gateway
包中的 RabbitStockServiceGateway
。它委託給 RabbitTemplate
來發送訊息。以下列表顯示了 send
方法
public void send(TradeRequest tradeRequest) {
getRabbitTemplate().convertAndSend(tradeRequest, new MessagePostProcessor() {
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setReplyTo(new Address(defaultReplyToQueue));
try {
message.getMessageProperties().setCorrelationId(
UUID.randomUUID().toString().getBytes("UTF-8"));
}
catch (UnsupportedEncodingException e) {
throw new AmqpException(e);
}
return message;
}
});
}
注意,在傳送訊息之前,它設定了 replyTo
地址。它提供了由 traderJoeQueue
bean 定義(前面已介紹)生成的佇列。以下列表顯示了 StockServiceGateway
類本身的 @Bean
定義
@Bean
public StockServiceGateway stockServiceGateway() {
RabbitStockServiceGateway gateway = new RabbitStockServiceGateway();
gateway.setRabbitTemplate(rabbitTemplate());
gateway.setDefaultReplyToQueue(traderJoeQueue());
return gateway;
}
如果您沒有執行伺服器和客戶端,請立即啟動它們。嘗試傳送格式為“100 TCKR”的請求。在短暫的模擬請求“處理”的人工延遲之後,您應該在客戶端看到確認訊息。