Appearance
消息转换器
一、概述
消息转换器(Message Converter)是 Spring AMQP 中负责消息序列化和反序列化的组件。它将 Java 对象转换为消息体,或将消息体转换为 Java 对象,是消息传递过程中的关键环节。
1.1 转换器架构
┌─────────────────────────────────────────────────────────────────────┐
│ 消息转换器架构 │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ 发送消息: │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Object │ ──► │ Message │ ──► │ byte[] │ ──► │ RabbitMQ │ │
│ │ (对象) │ │Converter │ │ (消息体) │ │ Server │ │
│ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │
│ │
│ 接收消息: │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ RabbitMQ │ ──► │ byte[] │ ──► │ Message │ ──► │ Object │ │
│ │ Server │ │ (消息体) │ │Converter │ │ (对象) │ │
│ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────┘1.2 内置转换器
text
┌─────────────────────────────────────────────────────────────────────┐
│ 内置消息转换器 │
├───────────────────────┬─────────────────────────────────────────────┤
│ 转换器 │ 说明 │
├───────────────────────┼─────────────────────────────────────────────┤
│ SimpleMessageConverter│ 默认转换器,处理 String、byte[]、Serializable│
├───────────────────────┼─────────────────────────────────────────────┤
│ Jackson2JsonMessage │ JSON 转换器,使用 Jackson │
│ Converter │ 推荐使用 │
├───────────────────────┼─────────────────────────────────────────────┤
│ MarshallingMessage │ XML 转换器,使用 Spring Marshaller │
│ Converter │ 适合 XML 格式 │
├───────────────────────┼─────────────────────────────────────────────┤
│ SerializerMessageConv │ 使用 Spring Serializer │
│ erter │ 支持多种序列化方式 │
└───────────────────────┴─────────────────────────────────────────────┘二、核心知识点
2.1 Jackson2JsonMessageConverter
2.1.1 基础配置
java
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class JsonConverterConfig {
@Bean
public MessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory,
MessageConverter jsonMessageConverter) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMessageConverter(jsonMessageConverter);
return template;
}
}2.1.2 自定义 ObjectMapper
java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class CustomJsonConverterConfig {
@Bean
public MessageConverter jsonMessageConverter() {
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.registerModule(new JavaTimeModule());
objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
objectMapper.disable(SerializationFeature.FAIL_ON_EMPTY_BEANS);
objectMapper.setSerializationInclusion(
com.fasterxml.jackson.annotation.JsonInclude.Include.NON_NULL);
return new Jackson2JsonMessageConverter(objectMapper);
}
}2.1.3 类型映射
java
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class TypeMappingConverterConfig {
@Bean
public Jackson2JsonMessageConverter jsonMessageConverter() {
Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
converter.setTypeIdMappings(java.util.Map.of(
"order", Order.class,
"payment", Payment.class,
"user", User.class
));
converter.setDefaultCharset("UTF-8");
return converter;
}
}2.2 自定义消息转换器
2.2.1 实现接口
java
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;
public class CustomMessageConverter implements MessageConverter {
private static final String CONTENT_TYPE = "application/x-custom";
@Override
public Message toMessage(Object object, MessageProperties messageProperties)
throws MessageConversionException {
if (object == null) {
throw new MessageConversionException("对象不能为空");
}
byte[] body;
String contentType;
if (object instanceof String) {
body = ((String) object).getBytes();
contentType = "text/plain";
} else if (object instanceof byte[]) {
body = (byte[]) object;
contentType = "application/octet-stream";
} else if (object instanceof java.io.Serializable) {
try {
java.io.ByteArrayOutputStream baos = new java.io.ByteArrayOutputStream();
java.io.ObjectOutputStream oos = new java.io.ObjectOutputStream(baos);
oos.writeObject(object);
oos.close();
body = baos.toByteArray();
contentType = "application/x-java-serialized-object";
} catch (Exception e) {
throw new MessageConversionException("序列化失败", e);
}
} else {
throw new MessageConversionException("不支持的对象类型: " + object.getClass());
}
messageProperties.setContentType(contentType);
messageProperties.setContentEncoding("UTF-8");
return new Message(body, messageProperties);
}
@Override
public Object fromMessage(Message message) throws MessageConversionException {
String contentType = message.getMessageProperties().getContentType();
byte[] body = message.getBody();
if (contentType == null) {
return body;
}
switch (contentType) {
case "text/plain":
return new String(body);
case "application/octet-stream":
return body;
case "application/x-java-serialized-object":
try {
java.io.ByteArrayInputStream bais = new java.io.ByteArrayInputStream(body);
java.io.ObjectInputStream ois = new java.io.ObjectInputStream(bais);
return ois.readObject();
} catch (Exception e) {
throw new MessageConversionException("反序列化失败", e);
}
default:
return body;
}
}
}2.2.2 复合转换器
java
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;
import java.util.HashMap;
import java.util.Map;
public class CompositeMessageConverter implements MessageConverter {
private final Map<String, MessageConverter> converters = new HashMap<>();
private final MessageConverter defaultConverter;
public CompositeMessageConverter(MessageConverter defaultConverter) {
this.defaultConverter = defaultConverter;
}
public void addConverter(String contentType, MessageConverter converter) {
converters.put(contentType, converter);
}
@Override
public Message toMessage(Object object, MessageProperties messageProperties)
throws MessageConversionException {
String contentType = messageProperties.getContentType();
if (contentType != null && converters.containsKey(contentType)) {
return converters.get(contentType).toMessage(object, messageProperties);
}
return defaultConverter.toMessage(object, messageProperties);
}
@Override
public Object fromMessage(Message message) throws MessageConversionException {
String contentType = message.getMessageProperties().getContentType();
if (contentType != null && converters.containsKey(contentType)) {
return converters.get(contentType).fromMessage(message);
}
return defaultConverter.fromMessage(message);
}
}2.3 消息类型推断
java
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.stereotype.Component;
@Component
public class TypeInferenceConsumer {
@RabbitListener(queues = "typed.queue")
public void handleTypedMessage(Message message) {
MessageConverter converter = new Jackson2JsonMessageConverter();
Object result = converter.fromMessage(message);
if (result instanceof Order) {
handleOrder((Order) result);
} else if (result instanceof Payment) {
handlePayment((Payment) result);
} else {
System.out.println("未知类型: " + result.getClass());
}
}
private void handleOrder(Order order) {
System.out.println("处理订单: " + order.getOrderId());
}
private void handlePayment(Payment payment) {
System.out.println("处理支付: " + payment.getPaymentId());
}
}2.4 消息头信息
java
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
@Component
public class HeaderMessageService {
private final RabbitTemplate rabbitTemplate;
public HeaderMessageService(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
public void sendWithHeaders(String exchange, String routingKey, Object message) {
Map<String, Object> headers = new HashMap<>();
headers.put("X-Message-Id", UUID.randomUUID().toString());
headers.put("X-Source-System", "order-service");
headers.put("X-Timestamp", System.currentTimeMillis());
headers.put("X-Version", "1.0");
rabbitTemplate.convertAndSend(exchange, routingKey, message, msg -> {
msg.getMessageProperties().setHeaders(headers);
msg.getMessageProperties().setMessageId(UUID.randomUUID().toString());
msg.getMessageProperties().setContentType("application/json");
msg.getMessageProperties().setHeader("__TypeId__", message.getClass().getName());
return msg;
});
}
public void receiveWithHeaders(Message message) {
MessageProperties props = message.getMessageProperties();
String messageId = props.getMessageId();
String contentType = props.getContentType();
Map<String, Object> headers = props.getHeaders();
System.out.println("消息ID: " + messageId);
System.out.println("内容类型: " + contentType);
System.out.println("消息头: " + headers);
Object typeId = headers.get("__TypeId__");
System.out.println("类型标识: " + typeId);
}
}三、代码示例
3.1 完整转换器配置
java
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class CompleteConverterConfig {
@Bean
public ObjectMapper objectMapper() {
ObjectMapper mapper = new ObjectMapper();
mapper.registerModule(new JavaTimeModule());
mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
mapper.disable(SerializationFeature.FAIL_ON_EMPTY_BEANS);
mapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
return mapper;
}
@Bean
public MessageConverter jsonMessageConverter(ObjectMapper objectMapper) {
Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter(objectMapper);
converter.setDefaultCharset("UTF-8");
Map<String, Class<?>> typeIdMappings = new HashMap<>();
typeIdMappings.put("order", Order.class);
typeIdMappings.put("payment", Payment.class);
typeIdMappings.put("user", User.class);
typeIdMappings.put("notification", Notification.class);
converter.setTypeIdMappings(typeIdMappings);
return converter;
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory,
MessageConverter jsonMessageConverter) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMessageConverter(jsonMessageConverter);
return template;
}
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
ConnectionFactory connectionFactory,
MessageConverter jsonMessageConverter) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(jsonMessageConverter);
return factory;
}
}3.2 多格式转换器
java
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.amqp.support.converter.SimpleMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.List;
@Configuration
public class MultiFormatConverterConfig {
@Bean
public MessageConverter multiFormatMessageConverter() {
Jackson2JsonMessageConverter jsonConverter = new Jackson2JsonMessageConverter();
SimpleMessageConverter simpleConverter = new SimpleMessageConverter();
return new MessageConverter() {
@Override
public Message toMessage(Object object, MessageProperties messageProperties)
throws MessageConversionException {
if (object instanceof String || object instanceof byte[]) {
return simpleConverter.toMessage(object, messageProperties);
}
messageProperties.setContentType("application/json");
return jsonConverter.toMessage(object, messageProperties);
}
@Override
public Object fromMessage(Message message) throws MessageConversionException {
String contentType = message.getMessageProperties().getContentType();
if (contentType != null && contentType.contains("json")) {
return jsonConverter.fromMessage(message);
}
return simpleConverter.fromMessage(message);
}
};
}
}3.3 加密转换器
java
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;
import javax.crypto.Cipher;
import javax.crypto.spec.SecretKeySpec;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
public class EncryptedMessageConverter implements MessageConverter {
private final MessageConverter delegate;
private final SecretKeySpec secretKey;
private final String algorithm = "AES";
public EncryptedMessageConverter(MessageConverter delegate, String secret) {
this.delegate = delegate;
this.secretKey = new SecretKeySpec(secret.getBytes(), algorithm);
}
@Override
public Message toMessage(Object object, MessageProperties messageProperties)
throws MessageConversionException {
Message message = delegate.toMessage(object, messageProperties);
byte[] encryptedBody = encrypt(message.getBody());
messageProperties.setContentType("application/x-encrypted");
messageProperties.setHeader("x-encrypted", true);
return new Message(encryptedBody, messageProperties);
}
@Override
public Object fromMessage(Message message) throws MessageConversionException {
Boolean encrypted = (Boolean) message.getMessageProperties()
.getHeaders().get("x-encrypted");
if (encrypted != null && encrypted) {
byte[] decryptedBody = decrypt(message.getBody());
message = new Message(decryptedBody, message.getMessageProperties());
}
return delegate.fromMessage(message);
}
private byte[] encrypt(byte[] data) throws MessageConversionException {
try {
Cipher cipher = Cipher.getInstance(algorithm);
cipher.init(Cipher.ENCRYPT_MODE, secretKey);
return cipher.doFinal(data);
} catch (Exception e) {
throw new MessageConversionException("加密失败", e);
}
}
private byte[] decrypt(byte[] data) throws MessageConversionException {
try {
Cipher cipher = Cipher.getInstance(algorithm);
cipher.init(Cipher.DECRYPT_MODE, secretKey);
return cipher.doFinal(data);
} catch (Exception e) {
throw new MessageConversionException("解密失败", e);
}
}
}四、实际应用场景
4.1 微服务消息传递
java
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.stereotype.Service;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
@Service
public class MicroserviceMessageService {
private final RabbitTemplate rabbitTemplate;
public MicroserviceMessageService(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
public void sendEvent(String exchange, String routingKey, String eventType, Object data) {
Map<String, Object> event = new HashMap<>();
event.put("eventId", UUID.randomUUID().toString());
event.put("eventType", eventType);
event.put("source", "order-service");
event.put("timestamp", LocalDateTime.now());
event.put("data", data);
rabbitTemplate.convertAndSend(exchange, routingKey, event, message -> {
message.getMessageProperties().setHeader("__TypeId__", "event");
message.getMessageProperties().setHeader("X-Event-Type", eventType);
return message;
});
}
public void sendCommand(String exchange, String routingKey, String commandType,
Object payload, String targetService) {
Map<String, Object> command = new HashMap<>();
command.put("commandId", UUID.randomUUID().toString());
command.put("commandType", commandType);
command.put("source", "order-service");
command.put("target", targetService);
command.put("timestamp", LocalDateTime.now());
command.put("payload", payload);
rabbitTemplate.convertAndSend(exchange, routingKey, command, message -> {
message.getMessageProperties().setHeader("__TypeId__", "command");
message.getMessageProperties().setHeader("X-Command-Type", commandType);
message.getMessageProperties().setHeader("X-Target-Service", targetService);
return message;
});
}
}4.2 多版本消息兼容
java
import org.springframework.amqp.core.Message;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConversionException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Map;
public class VersionAwareMessageConverter extends Jackson2JsonMessageConverter {
private final ObjectMapper objectMapper;
public VersionAwareMessageConverter(ObjectMapper objectMapper) {
super(objectMapper);
this.objectMapper = objectMapper;
}
@Override
public Object fromMessage(Message message) throws MessageConversionException {
Map<String, Object> headers = message.getMessageProperties().getHeaders();
String version = (String) headers.get("X-Version");
Object result = super.fromMessage(message);
if (result instanceof Map) {
Map<String, Object> data = (Map<String, Object>) result;
return convertToCurrentVersion(data, version);
}
return result;
}
private Object convertToCurrentVersion(Map<String, Object> data, String version) {
if (version == null || "1.0".equals(version)) {
return convertFromV1(data);
} else if ("2.0".equals(version)) {
return convertFromV2(data);
}
return data;
}
private Object convertFromV1(Map<String, Object> data) {
data.put("version", "2.0");
if (data.containsKey("orderNo")) {
data.put("orderId", data.remove("orderNo"));
}
return data;
}
private Object convertFromV2(Map<String, Object> data) {
return data;
}
}五、常见问题与解决方案
5.1 序列化失败
问题描述: 对象序列化时抛出异常。
解决方案:
java
ObjectMapper mapper = new ObjectMapper();
mapper.disable(SerializationFeature.FAIL_ON_EMPTY_BEANS);
mapper.registerModule(new JavaTimeModule());5.2 类型丢失
问题描述: 反序列化后无法识别原始类型。
解决方案:
java
Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
converter.setTypeIdMappings(Map.of(
"order", Order.class,
"payment", Payment.class
));5.3 日期格式问题
问题描述: 日期类型序列化格式不正确。
解决方案:
java
ObjectMapper mapper = new ObjectMapper();
mapper.registerModule(new JavaTimeModule());
mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);六、最佳实践建议
6.1 转换器选择
text
选择建议:
├── 通用场景:Jackson2JsonMessageConverter
├── 性能敏感:自定义二进制转换器
├── 安全敏感:加密转换器
├── 兼容性要求:版本感知转换器
└── 多格式支持:复合转换器6.2 配置建议
text
配置建议:
├── 统一配置 ObjectMapper
├── 注册类型映射
├── 设置默认字符集
├── 处理日期时间格式
└── 配置忽略未知属性6.3 性能建议
text
性能建议:
├── 复用 ObjectMapper 实例
├── 避免重复创建转换器
├── 控制消息大小
├── 使用高效的序列化格式
└── 缓存类型信息