訊息轉換器

AmqpTemplate 還定義了幾個用於傳送和接收訊息的方法,這些方法委託給 MessageConverterMessageConverter 為每個方向提供一個方法:一個用於轉換**到** Message,另一個用於轉換**從** Message。注意,在轉換為 Message 時,除了物件外,還可以提供屬性。object 引數通常對應於 Message 體。以下清單顯示了 MessageConverter 介面定義:

public interface MessageConverter {

    Message toMessage(Object object, MessageProperties messageProperties)
            throws MessageConversionException;

    Object fromMessage(Message message) throws MessageConversionException;

}

AmqpTemplate 上相關的訊息傳送方法比我們之前討論的方法更簡單,因為它們不需要 Message 例項。相反,MessageConverter 負責透過將提供的物件轉換為 Message 體的位元組陣列,然後新增任何提供的 MessageProperties 來“建立”每個 Message。以下清單顯示了各種方法的定義:

void convertAndSend(Object message) throws AmqpException;

void convertAndSend(String routingKey, Object message) throws AmqpException;

void convertAndSend(String exchange, String routingKey, Object message)
    throws AmqpException;

void convertAndSend(Object message, MessagePostProcessor messagePostProcessor)
    throws AmqpException;

void convertAndSend(String routingKey, Object message,
    MessagePostProcessor messagePostProcessor) throws AmqpException;

void convertAndSend(String exchange, String routingKey, Object message,
    MessagePostProcessor messagePostProcessor) throws AmqpException;

在接收端,只有兩個方法:一個接受佇列名稱,另一個依賴於模板的“queue”屬性已設定。以下清單顯示了這兩個方法的定義:

Object receiveAndConvert() throws AmqpException;

Object receiveAndConvert(String queueName) throws AmqpException;
非同步消費者中提到的 MessageListenerAdapter 也使用 MessageConverter

SimpleMessageConverter

MessageConverter 策略的預設實現稱為 SimpleMessageConverter。如果您沒有明確配置替代方案,RabbitTemplate 例項將使用此轉換器。它處理基於文字的內容、序列化的 Java 物件和位元組陣列。

Message 轉換

如果輸入 Message 的內容型別以 "text" 開頭(例如,"text/plain"),它還會檢查 content-encoding 屬性,以確定將 Message 體位元組陣列轉換為 Java String 時使用的字元集。如果輸入 Message 上沒有設定 content-encoding 屬性,則預設使用 UTF-8 字元集。如果您需要覆蓋此預設設定,可以配置一個 SimpleMessageConverter 例項,設定其 defaultCharset 屬性,並將其注入到 RabbitTemplate 例項中。

如果輸入 Message 的 content-type 屬性值設定為 "application/x-java-serialized-object",SimpleMessageConverter 會嘗試將位元組陣列反序列化(重構)為 Java 物件。雖然這對於簡單的原型設計可能很有用,但我們不建議依賴 Java 序列化,因為它會導致生產者和消費者之間緊密耦合。當然,這也排除了在任何一方使用非 Java 系統。鑑於 AMQP 是一個線級協議,因這些限制而失去大部分優勢將是不幸的。在接下來的兩節中,我們將探討一些無需依賴 Java 序列化即可傳遞豐富領域物件內容的替代方案。

對於所有其他內容型別,SimpleMessageConverter 直接返回 Message 體內容作為位元組陣列。

請參閱Java 反序列化以獲取重要資訊。

轉換為 Message

當從任意 Java Object 轉換為 Message 時,SimpleMessageConverter 同樣處理位元組陣列、字串和可序列化例項。它將每個轉換為位元組(對於位元組陣列,無需轉換),並相應地設定 content-type 屬性。如果要轉換的 Object 與這些型別之一不匹配,則 Message 體為 null。

SerializerMessageConverter

此轉換器類似於 SimpleMessageConverter,不同之處在於它可以配置其他 Spring Framework 的 SerializerDeserializer 實現,用於 application/x-java-serialized-object 轉換。

請參閱Java 反序列化以獲取重要資訊。

Jackson2JsonMessageConverter

本節介紹如何使用 Jackson2JsonMessageConverterMessage 之間進行轉換。它包含以下部分:

轉換為 Message

正如上一節所述,通常不建議依賴 Java 序列化。一種更靈活且可跨不同語言和平臺移植的常見替代方案是 JSON(JavaScript Object Notation)。該轉換器可以在任何 RabbitTemplate 例項上配置,以覆蓋其對 SimpleMessageConverter 預設的用法。Jackson2JsonMessageConverter 使用 com.fasterxml.jackson 2.x 庫。以下示例配置了 Jackson2JsonMessageConverter

<bean class="org.springframework.amqp.rabbit.core.RabbitTemplate">
    <property name="connectionFactory" ref="rabbitConnectionFactory"/>
    <property name="messageConverter">
        <bean class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter">
            <!-- if necessary, override the DefaultClassMapper -->
            <property name="classMapper" ref="customClassMapper"/>
        </bean>
    </property>
</bean>

如上所示,Jackson2JsonMessageConverter 預設使用 DefaultClassMapper。型別資訊被新增到(並從)MessageProperties 中檢索。如果入站訊息在 MessageProperties 中不包含型別資訊,但您知道預期的型別,則可以使用 defaultType 屬性配置靜態型別,如下例所示:

<bean id="jsonConverterWithDefaultType"
      class="o.s.amqp.support.converter.Jackson2JsonMessageConverter">
    <property name="classMapper">
        <bean class="org.springframework.amqp.support.converter.DefaultClassMapper">
            <property name="defaultType" value="thing1.PurchaseOrder"/>
        </bean>
    </property>
</bean>

此外,您可以提供從 TypeId header 中的值到自定義對映。以下示例顯示瞭如何進行操作:

@Bean
public Jackson2JsonMessageConverter jsonMessageConverter() {
    Jackson2JsonMessageConverter jsonConverter = new Jackson2JsonMessageConverter();
    jsonConverter.setClassMapper(classMapper());
    return jsonConverter;
}

@Bean
public DefaultClassMapper classMapper() {
    DefaultClassMapper classMapper = new DefaultClassMapper();
    Map<String, Class<?>> idClassMapping = new HashMap<>();
    idClassMapping.put("thing1", Thing1.class);
    idClassMapping.put("thing2", Thing2.class);
    classMapper.setIdClassMapping(idClassMapping);
    return classMapper;
}

現在,如果傳送系統將 header 設定為 thing1,轉換器將建立一個 Thing1 物件,依此類推。有關從非 Spring 應用轉換訊息的完整討論,請參閱從非 Spring 應用接收 JSON示例應用。

從版本 2.4.3 開始,如果 supportedMediaType 包含 charset 引數,轉換器將不再新增 contentEncoding 訊息屬性;此引數也用於編碼。添加了一個新方法 setSupportedMediaType

String utf16 = "application/json; charset=utf-16";
converter.setSupportedContentType(MimeTypeUtils.parseMimeType(utf16));

Message 轉換

入站訊息根據傳送系統新增到 header 中的型別資訊轉換為物件。

從版本 2.4.3 開始,如果不存在 contentEncoding 訊息屬性,轉換器將嘗試在 contentType 訊息屬性中檢測 charset 引數並使用它。如果兩者都不存在,如果 supportedMediaType 包含 charset 引數,它將被用於解碼,最終回退到 defaultCharset 屬性。添加了一個新方法 setSupportedMediaType

String utf16 = "application/json; charset=utf-16";
converter.setSupportedContentType(MimeTypeUtils.parseMimeType(utf16));

在 1.6 版本之前,如果不存在型別資訊,轉換將失敗。從 1.6 版本開始,如果型別資訊丟失,轉換器將使用 Jackson 預設值(通常是 map)轉換 JSON。

此外,從版本 1.6 開始,當您使用 @RabbitListener 註解(在方法上)時,推斷出的型別資訊會被新增到 MessageProperties 中。這使得轉換器能夠轉換為目標方法的引數型別。這僅適用於只有一個引數且沒有註解或只有一個帶有 @Payload 註解的引數的情況。型別為 Message 的引數在分析期間會被忽略。

預設情況下,推斷出的型別資訊將覆蓋入站的 TypeId 和傳送系統建立的相關 header。這使得接收系統可以自動轉換為不同的領域物件。這僅適用於引數型別是具體型別(非抽象或介面)或來自 java.util 包的情況。在所有其他情況下,將使用 TypeId 和相關 header。在某些情況下,您可能希望覆蓋預設行為並始終使用 TypeId 資訊。例如,假設您有一個 @RabbitListener 方法,它接受一個 Thing1 引數,但訊息包含一個 Thing2,它是 Thing1 的子類(並且是具體的)。推斷出的型別將不正確。為了處理這種情況,將 Jackson2JsonMessageConverter 上的 TypePrecedence 屬性設定為 TYPE_ID,而不是預設的 INFERRED。(該屬性實際上在轉換器的 DefaultJackson2JavaTypeMapperDefaultClassMapper)上,但為了方便,轉換器提供了 setter)。如果您注入了自定義型別對映器,則應在該對映器上設定屬性。
Message 轉換時,傳入的 MessageProperties.getContentType() 必須符合 JSON 規範(使用 contentType.contains("json") 進行檢查)。從版本 2.2 開始,如果沒有 contentType 屬性,或者其值為預設值 application/octet-stream,則假定為 application/json。要恢復到之前的行為(返回未轉換的 byte[]),請將轉換器的 assumeSupportedContentType 屬性設定為 false。如果內容型別不受支援,將發出一條 WARN 級別的日誌訊息 Could not convert incoming message with content-type […​],並按原樣返回 message.getBody() - 作為 byte[]。因此,為了滿足消費者端 Jackson2JsonMessageConverter 的要求,生產者必須新增 contentType 訊息屬性 - 例如,設定為 application/jsontext/x-json,或者使用 Jackson2JsonMessageConverter,它會自動設定該 header。以下清單顯示了許多轉換器呼叫:
@RabbitListener
public void thing1(Thing1 thing1) {...}

@RabbitListener
public void thing1(@Payload Thing1 thing1, @Header("amqp_consumerQueue") String queue) {...}

@RabbitListener
public void thing1(Thing1 thing1, o.s.amqp.core.Message message) {...}

@RabbitListener
public void thing1(Thing1 thing1, o.s.messaging.Message<Foo> message) {...}

@RabbitListener
public void thing1(Thing1 thing1, String bar) {...}

@RabbitListener
public void thing1(Thing1 thing1, o.s.messaging.Message<?> message) {...}

在前面清單中的前四種情況下,轉換器嘗試轉換為 Thing1 型別。第五個示例無效,因為我們無法確定哪個引數應接收訊息負載。對於第六個示例,由於泛型型別是 WildcardType,Jackson 的預設值適用。

但是,您可以建立一個自定義轉換器,並使用 targetMethod 訊息屬性來決定將 JSON 轉換為哪種型別。

只有當 @RabbitListener 註解宣告在方法級別時,才能實現此型別推斷。對於類級別的 @RabbitListener,轉換後的型別用於選擇要呼叫的 @RabbitHandler 方法。因此,基礎設施提供了 targetObject 訊息屬性,您可以在自定義轉換器中使用它來確定型別。
從版本 1.6.11 開始,Jackson2JsonMessageConverter 以及因此而來的 DefaultJackson2JavaTypeMapper (DefaultClassMapper) 提供了 trustedPackages 選項,以克服序列化 Gadgets 漏洞。預設情況下,為了向後相容,Jackson2JsonMessageConverter 信任所有包 - 也就是說,它對該選項使用 *

從版本 2.4.7 開始,可以將轉換器配置為在 Jackson 反序列化訊息體返回 null 後返回 Optional.empty()。這使得 @RabbitListener 可以透過兩種方式接收 null 負載:

@RabbitListener(queues = "op.1")
void listen(@Payload(required = false) Thing payload) {
    handleOptional(payload); // payload might be null
}

@RabbitListener(queues = "op.2")
void listen(Optional<Thing> optional) {
    handleOptional(optional.orElse(this.emptyThing));
}

要啟用此功能,請將 setNullAsOptionalEmpty 設定為 true;當設定為 false(預設值)時,轉換器將回退到原始訊息體(byte[])。

@Bean
Jackson2JsonMessageConverter converter() {
    Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
    converter.setNullAsOptionalEmpty(true);
    return converter;
}

反序列化抽象類

在版本 2.2.8 之前,如果 @RabbitListener 的推斷型別是抽象類(包括介面),轉換器會回退到查詢 header 中的型別資訊,如果存在,則使用該資訊;如果不存在,它會嘗試建立抽象類。這導致了問題,當使用了配置了自定義反序列化器以處理抽象類的自定義 ObjectMapper,但傳入訊息具有無效的型別 header 時。

從版本 2.2.8 開始,預設保留以前的行為。如果您有這樣一個自定義 ObjectMapper,並且希望忽略型別 header 並始終使用推斷型別進行轉換,請將 alwaysConvertToInferredType 設定為 true。這是為了向後相容並避免在(使用標準 ObjectMapper)轉換會失敗時嘗試轉換的開銷。

使用 Spring Data Projection 介面

從版本 2.2 開始,您可以將 JSON 轉換為 Spring Data Projection 介面,而不是具體型別。這允許對資料進行非常有選擇性和低耦合的繫結,包括從 JSON 文件內部的多個位置查詢值。例如,可以將以下介面定義為訊息負載型別:

interface SomeSample {

  @JsonPath({ "$.username", "$.user.name" })
  String getUsername();

}
@RabbitListener(queues = "projection")
public void projection(SomeSample in) {
    String username = in.getUsername();
    ...
}

訪問器方法將預設用於將屬性名稱作為欄位在接收到的 JSON 文件中查詢。@JsonPath 表示式允許自定義值查詢,甚至可以定義多個 JSON path 表示式,從多個位置查詢值,直到某個表示式返回實際值。

要啟用此功能,請在訊息轉換器上將 useProjectionForInterfaces 設定為 true。您還必須將 spring-data:spring-data-commonscom.jayway.jsonpath:json-path 新增到類路徑中。

當用作 @RabbitListener 方法的引數時,介面型別會自動作為普通型別傳遞給轉換器。

使用 RabbitTemplateMessage 轉換

如前所述,型別資訊透過訊息 header 傳遞,以幫助轉換器從訊息進行轉換。這在大多數情況下都能正常工作。然而,當使用泛型型別時,它只能轉換簡單物件和已知的“容器”物件(列表、陣列和 map)。從版本 2.0 開始,Jackson2JsonMessageConverter 實現了 SmartMessageConverter,這使得它可以與接受 ParameterizedTypeReference 引數的新 RabbitTemplate 方法一起使用。這允許轉換複雜的泛型型別,如下例所示:

Thing1<Thing2<Cat, Hat>> thing1 =
    rabbitTemplate.receiveAndConvert(new ParameterizedTypeReference<Thing1<Thing2<Cat, Hat>>>() { });
從版本 2.1 開始,AbstractJsonMessageConverter 類已被刪除。它不再是 Jackson2JsonMessageConverter 的基類。它已被 AbstractJackson2MessageConverter 替換。

MarshallingMessageConverter

另一個選項是 MarshallingMessageConverter。它委託給 Spring OXM 庫的 MarshallerUnmarshaller 策略介面的實現。您可以在此處閱讀更多關於該庫的資訊。在配置方面,通常只需要提供建構函式引數,因為大多數 Marshaller 的實現也實現了 Unmarshaller。以下示例顯示瞭如何配置 MarshallingMessageConverter

<bean class="org.springframework.amqp.rabbit.core.RabbitTemplate">
    <property name="connectionFactory" ref="rabbitConnectionFactory"/>
    <property name="messageConverter">
        <bean class="org.springframework.amqp.support.converter.MarshallingMessageConverter">
            <constructor-arg ref="someImplemenationOfMarshallerAndUnmarshaller"/>
        </bean>
    </property>
</bean>

Jackson2XmlMessageConverter

此類在版本 2.1 中引入,可用於將訊息從 XML 和轉換為 XML。

Jackson2XmlMessageConverterJackson2JsonMessageConverter 具有相同的基類:AbstractJackson2MessageConverter

引入 AbstractJackson2MessageConverter 類是為了替換已刪除的類:AbstractJsonMessageConverter

Jackson2XmlMessageConverter 使用 com.fasterxml.jackson 2.x 庫。

您可以像使用 Jackson2JsonMessageConverter 一樣使用它,只是它支援 XML 而不是 JSON。以下示例配置了 Jackson2JsonMessageConverter

<bean id="xmlConverterWithDefaultType"
        class="org.springframework.amqp.support.converter.Jackson2XmlMessageConverter">
    <property name="classMapper">
        <bean class="org.springframework.amqp.support.converter.DefaultClassMapper">
            <property name="defaultType" value="foo.PurchaseOrder"/>
        </bean>
    </property>
</bean>

有關更多資訊,請參閱Jackson2JsonMessageConverter

從版本 2.2 開始,如果沒有 contentType 屬性,或者其值為預設值 application/octet-stream,則假定為 application/xml。要恢復到之前的行為(返回未轉換的 byte[]),請將轉換器的 assumeSupportedContentType 屬性設定為 false

ContentTypeDelegatingMessageConverter

此類在版本 1.4.2 中引入,允許根據 MessageProperties 中的內容型別屬性委託給特定的 MessageConverter。預設情況下,如果不存在 contentType 屬性或其值與所有配置的轉換器都不匹配,則委託給 SimpleMessageConverter。以下示例配置了 ContentTypeDelegatingMessageConverter

<bean id="contentTypeConverter" class="ContentTypeDelegatingMessageConverter">
    <property name="delegates">
        <map>
            <entry key="application/json" value-ref="jsonMessageConverter" />
            <entry key="application/xml" value-ref="xmlMessageConverter" />
        </map>
    </property>
</bean>

Java 反序列化

本節介紹如何反序列化 Java 物件。

從不受信任的源反序列化 Java 物件時可能存在漏洞。

如果您接受來自不受信任源的 content-typeapplication/x-java-serialized-object 的訊息,則應考慮配置允許反序列化的包和類。這適用於 SimpleMessageConverterSerializerMessageConverter,無論它是隱式配置還是透過配置使用 DefaultDeserializer

預設情況下,允許列表為空,這意味著不會反序列化任何類。

您可以設定模式列表,例如 thing1.thing1.thing2.Cat.MySafeClass

模式按順序檢查,直到找到匹配項。如果沒有匹配項,則丟擲 SecurityException

您可以使用這些轉換器上的 allowedListPatterns 屬性設定模式。或者,如果您信任所有訊息發起者,可以將環境變數 SPRING_AMQP_DESERIALIZATION_TRUST_ALL 或系統屬性 spring.amqp.deserialization.trust.all 設定為 true

訊息屬性轉換器

MessagePropertiesConverter 策略介面用於在 Rabbit Client BasicProperties 和 Spring AMQP MessageProperties 之間進行轉換。預設實現(DefaultMessagePropertiesConverter)通常足以滿足大多數目的,但如果需要,您可以實現自己的轉換器。預設屬性轉換器會將型別為 LongStringBasicProperties 元素轉換為 String 例項,當其大小不大於 1024 位元組時。較大的 LongString 例項不會被轉換(參見下一段)。此限制可以透過建構函式引數覆蓋。

從版本 1.6 開始,預設情況下,DefaultMessagePropertiesConverter 將長於長字串限制(預設值:1024)的 header 保留為 LongString 例項。您可以透過 getBytes[]toString()getStream() 方法訪問其內容。

以前,DefaultMessagePropertiesConverter 會將此類 header“轉換”為 DataInputStream(實際上它只是引用了 LongString 例項的 DataInputStream)。在輸出時,此 header 未被轉換(除了轉換為 String,例如透過呼叫 stream 的 toString() 轉換為 java.io.DataInputStream@1d057a39)。

大的入站 LongString header 現在在輸出時也能正確“轉換”(預設情況下)。

提供了一個新的建構函式,允許您配置轉換器,使其與以前一樣工作。以下清單顯示了該方法的 Javadoc 註釋和宣告:

/**
 * Construct an instance where LongStrings will be returned
 * unconverted or as a java.io.DataInputStream when longer than this limit.
 * Use this constructor with 'true' to restore pre-1.6 behavior.
 * @param longStringLimit the limit.
 * @param convertLongLongStrings LongString when false,
 * DataInputStream when true.
 * @since 1.6
 */
public DefaultMessagePropertiesConverter(int longStringLimit, boolean convertLongLongStrings) { ... }

同樣從版本 1.6 開始,向 MessageProperties 中添加了一個名為 correlationIdString 的新屬性。以前,在轉換為和從 RabbitMQ 客戶端使用的 BasicProperties 時,會執行不必要的 byte[] <→ String 轉換,因為 MessageProperties.correlationId 是一個 byte[],而 BasicProperties 使用一個 String。(最終,RabbitMQ 客戶端使用 UTF-8 將 String 轉換為位元組以放入協議訊息中)。

為了提供最大的向後相容性,向 DefaultMessagePropertiesConverter 中添加了一個名為 correlationIdPolicy 的新屬性。它接受一個 DefaultMessagePropertiesConverter.CorrelationIdPolicy 列舉引數。預設情況下,它設定為 BYTES,這複製了以前的行為。

對於入站訊息:

  • STRING:僅對映 correlationIdString 屬性

  • BYTES:僅對映 correlationId 屬性

  • BOTH:對映兩個屬性

對於出站訊息:

  • STRING:僅對映 correlationIdString 屬性

  • BYTES:僅對映 correlationId 屬性

  • BOTH:考慮兩個屬性,其中 String 屬性優先

同樣從版本 1.6 開始,入站 deliveryMode 屬性不再對映到 MessageProperties.deliveryMode。它被對映到 MessageProperties.receivedDeliveryMode。此外,入站 userId 屬性不再對映到 MessageProperties.userId。它被對映到 MessageProperties.receivedUserId。這些更改是為了避免如果同一個 MessageProperties 物件用於出站訊息時意外傳播這些屬性。

從版本 2.2 開始,DefaultMessagePropertiesConverter 會將任何值型別為 Class<?> 的自定義 header 使用 getName() 而不是 toString() 進行轉換;這避免了消費端應用程式需要從 toString() 表示形式中解析類名。對於滾動升級,您可能需要更改您的消費者,使其能夠理解兩種格式,直到所有生產者都升級完成。