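Message Converters

I. Overview

A message converter (MessageConverter) is the Spring AMQP component responsible for serializing and deserializing messages. On the send path it turns a Java object into a message body; on the receive path it turns the message body back into a Java object. Every message that passes through RabbitTemplate or a listener container goes through this step.

1.1 Converter Architecture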

┌───────────────────────────────────────────────────────────────────┐
│                  Message converter architecture                   │
├───────────────────────────────────────────────────────────────────┤
│                                                                   │
│  Sending a message:                                               │
│  ┌──────────┐     ┌──────────┐     ┌──────────┐     ┌──────────┐  │
│  │  Object  │ ──► │ Message  │ ──► │  byte[]  │ ──► │ RabbitMQ │  │
│  │ (object) │     │Converter │     │  (body)  │     │  Server  │  │
│  └──────────┘     └──────────┘     └──────────┘     └──────────┘  │
│                                                                   │
│  Receiving a message:                                             │
│  ┌──────────┐     ┌──────────┐     ┌──────────┐     ┌──────────┐  │
│  │ RabbitMQ │ ──► │  byte[]  │ ──► │ Message  │ ──► │  Object  │  │
│  │  Server  │     │  (body)  │     │Converter │     │ (object) │  │
│  └──────────┘     └──────────┘     └──────────┘     └──────────┘  │
│                                                                   │
└───────────────────────────────────────────────────────────────────┘
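
The two directions in the diagram can be sketched with plain JDK types. The BodyCodec interface and Utf8StringCodec class below are hypothetical names used only for illustration, not Spring AMQP APIs; a real MessageConverter also reads and writes MessageProperties such as the content type.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical two-way codec mirroring the send and receive paths in the diagram.
interface BodyCodec<T> {
    byte[] toBody(T object);   // send path: object -> byte[]
    T fromBody(byte[] body);   // receive path: byte[] -> object
}

// Encodes strings with an explicit charset so both sides agree on the bytes.
final class Utf8StringCodec implements BodyCodec<String> {
    @Override
    public byte[] toBody(String object) {
        return object.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String fromBody(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }
}

final class CodecRoundTripDemo {
    public static void main(String[] args) {
        BodyCodec<String> codec = new Utf8StringCodec();
        byte[] body = codec.toBody("order-created");
        System.out.println(body.length + " bytes -> " + codec.fromBody(body));
    }
}
```

Both sides must agree on the encoding, which is why the sketch names the charset explicitly instead of relying on the platform default.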

1.2 Built-in Converters

text
┌────────────────────────────────────────────────────────────────────────────────┐
│                          Built-in message converters                           │
├──────────────────────────────┬─────────────────────────────────────────────────┤
│ Converter                    │ Description                                     │
├──────────────────────────────┼─────────────────────────────────────────────────┤
│ SimpleMessageConverter       │ Default; handles String, byte[], Serializable   │
├──────────────────────────────┼─────────────────────────────────────────────────┤
│ Jackson2JsonMessageConverter │ JSON converter backed by Jackson; recommended   │
├──────────────────────────────┼─────────────────────────────────────────────────┤
│ MarshallingMessageConverter  │ XML converter backed by a Spring Marshaller     │
├──────────────────────────────┼─────────────────────────────────────────────────┤
│ SerializerMessageConverter   │ Uses Spring Serializer; pluggable serialization │
└──────────────────────────────┴─────────────────────────────────────────────────┘

II. Core Topics

2.1 Jackson2JsonMessageConverter

2.1.1 Basic Configuration

java
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class JsonConverterConfig {
    
    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }
    
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory,
                                         MessageConverter jsonMessageConverter) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMessageConverter(jsonMessageConverter);
        return template;
    }
}

2.1.2 Custom ObjectMapper

java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class CustomJsonConverterConfig {
    
    @Bean
    public MessageConverter jsonMessageConverter() {
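        ObjectMapper objectMapper = new ObjectMapper();
        
        // Support java.time types such as LocalDateTime
        objectMapper.registerModule(new JavaTimeModule());
        
        // Write dates as ISO-8601 strings rather than numeric timestamps
        objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
        
        // Do not fail on beans that expose no serializable properties
        objectMapper.disable(SerializationFeature.FAIL_ON_EMPTY_BEANS);
        
        // Omit null-valued properties from the JSON output
        objectMapper.setSerializationInclusion(
            com.fasterxml.jackson.annotation.JsonInclude.Include.NON_NULL);
        
        return new Jackson2JsonMessageConverter(objectMapper);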
    }
}

2.1.3 Type Mapping

java
import org.springframework.amqp.support.converter.DefaultJackson2JavaTypeMapper;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.Map;

@Configuration
public class TypeMappingConverterConfig {
    
    @Bean
    public Jackson2JsonMessageConverter jsonMessageConverter() {
        Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
        
        // Id-to-class mappings live on the type mapper, not on the converter itself.
        // The short ids travel in the __TypeId__ header, so producers and consumers
        // do not need to share fully qualified class names.
        DefaultJackson2JavaTypeMapper typeMapper = new DefaultJackson2JavaTypeMapper();
        typeMapper.setIdClassMapping(Map.of(
            "order", Order.class,
            "payment", Payment.class,
            "user", User.class
        ));
        converter.setJavaTypeMapper(typeMapper);
        
        converter.setDefaultCharset("UTF-8");
        
        return converter;
    }
}
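
The idea behind id-to-class mapping can be sketched without Spring. TypeIdRegistry below is a hypothetical class, not part of Spring AMQP: the wire carries a short logical id, and each side resolves it to its own class. Rejecting unmapped ids is a design choice of this sketch, not a statement about how Spring's type mapper behaves.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical registry: maps logical type ids to classes and back.
final class TypeIdRegistry {
    private final Map<String, Class<?>> idToClass = new HashMap<>();
    private final Map<Class<?>, String> classToId = new HashMap<>();

    void register(String id, Class<?> type) {
        idToClass.put(id, type);
        classToId.put(type, id);
    }

    // Producer side: falls back to the class name when no alias is registered.
    String idFor(Class<?> type) {
        return classToId.getOrDefault(type, type.getName());
    }

    // Consumer side: unknown ids are rejected instead of being loaded by name.
    Class<?> classFor(String id) {
        Class<?> type = idToClass.get(id);
        if (type == null) {
            throw new IllegalArgumentException("Unmapped type id: " + id);
        }
        return type;
    }
}

final class TypeIdRegistryDemo {
    // Stand-in payload classes for the example.
    static final class Order {}
    static final class Payment {}

    public static void main(String[] args) {
        TypeIdRegistry registry = new TypeIdRegistry();
        registry.register("order", Order.class);
        registry.register("payment", Payment.class);
        System.out.println(registry.idFor(Order.class));
        System.out.println(registry.classFor("payment").getSimpleName());
    }
}
```

With aliases in place, a consumer can move or rename its classes without breaking producers, as long as the logical ids stay stable.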

2.2 Custom Message Converters

2.2.1 Implementing the Interface

java
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;

public class CustomMessageConverter implements MessageConverter {
    
    @Override
    public Message toMessage(Object object, MessageProperties messageProperties) 
            throws MessageConversionException {
        
        if (object == null) {
            throw new MessageConversionException("Object must not be null");
        }
        
        byte[] body;
        String contentType;
        
        if (object instanceof String) {
            // Encode with an explicit charset instead of the platform default
            body = ((String) object).getBytes(java.nio.charset.StandardCharsets.UTF_8);
            contentType = "text/plain";
            messageProperties.setContentEncoding("UTF-8");
        } else if (object instanceof byte[]) {
            body = (byte[]) object;
            contentType = "application/octet-stream";
        } else if (object instanceof java.io.Serializable) {
            java.io.ByteArrayOutputStream baos = new java.io.ByteArrayOutputStream();
            // try-with-resources flushes and closes the stream even on failure
            try (java.io.ObjectOutputStream oos = new java.io.ObjectOutputStream(baos)) {
                oos.writeObject(object);
            } catch (Exception e) {
                throw new MessageConversionException("Serialization failed", e);
            }
            body = baos.toByteArray();
            contentType = "application/x-java-serialized-object";
        } else {
            throw new MessageConversionException("Unsupported object type: " + object.getClass());
        }
        
        messageProperties.setContentType(contentType);
        
        return new Message(body, messageProperties);
    }
    
    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        String contentType = message.getMessageProperties().getContentType();
        byte[] body = message.getBody();
        
        if (contentType == null) {
            return body;
        }
        
        switch (contentType) {
            case "text/plain":
                // Decode with the same charset used in toMessage
                return new String(body, java.nio.charset.StandardCharsets.UTF_8);
            case "application/octet-stream":
                return body;
            case "application/x-java-serialized-object":
                // Caution: Java deserialization of untrusted input is a known
                // attack vector; only use this branch with trusted producers
                try (java.io.ObjectInputStream ois = new java.io.ObjectInputStream(
                        new java.io.ByteArrayInputStream(body))) {
                    return ois.readObject();
                } catch (Exception e) {
                    throw new MessageConversionException("Deserialization failed", e);
                }
            default:
                // Unknown content types are passed through as raw bytes
                return body;
        }
    }
}
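
Because the converter above deserializes Java objects, it may be worth restricting which classes can be read. The following is a minimal JDK-only sketch (Java 9 or later) using ObjectInputFilter; the class names and the allow-list pattern are assumptions made for this example and would need to match your real payload types.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InvalidClassException;
import java.io.ObjectInputFilter;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

final class FilteredDeserializer {
    // Allow only java.lang and java.util classes; "!*" rejects everything else.
    // The pattern is an assumption for this sketch.
    private static final ObjectInputFilter FILTER =
            ObjectInputFilter.Config.createFilter("java.lang.*;java.util.*;!*");

    static byte[] serialize(Serializable value) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
            oos.writeObject(value);
        }
        return baos.toByteArray();
    }

    static Object deserialize(byte[] body) throws IOException, ClassNotFoundException {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(body))) {
            ois.setObjectInputFilter(FILTER); // set before the first readObject()
            return ois.readObject();
        }
    }
}

final class FilteredDeserializerDemo {
    // A payload type outside the allow-list, used to show the rejection path.
    static final class Untrusted implements Serializable {
        private static final long serialVersionUID = 1L;
    }

    public static void main(String[] args) throws Exception {
        ArrayList<String> allowed = new ArrayList<>(List.of("a", "b"));
        System.out.println(FilteredDeserializer.deserialize(
                FilteredDeserializer.serialize(allowed)));
        try {
            FilteredDeserializer.deserialize(
                    FilteredDeserializer.serialize(new Untrusted()));
        } catch (InvalidClassException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

A rejected class surfaces as an InvalidClassException, which a converter could wrap in its own conversion exception.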

2.2.2 Composite Converter

java
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;
import java.util.HashMap;
import java.util.Map;

public class CompositeMessageConverter implements MessageConverter {
    
    private final Map<String, MessageConverter> converters = new HashMap<>();
    private final MessageConverter defaultConverter;
    
    public CompositeMessageConverter(MessageConverter defaultConverter) {
        this.defaultConverter = defaultConverter;
    }
    
    public void addConverter(String contentType, MessageConverter converter) {
        converters.put(contentType, converter);
    }
    
    @Override
    public Message toMessage(Object object, MessageProperties messageProperties) 
            throws MessageConversionException {
        
        String contentType = messageProperties.getContentType();
        
        if (contentType != null && converters.containsKey(contentType)) {
            return converters.get(contentType).toMessage(object, messageProperties);
        }
        
        return defaultConverter.toMessage(object, messageProperties);
    }
    
    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        String contentType = message.getMessageProperties().getContentType();
        
        if (contentType != null && converters.containsKey(contentType)) {
            return converters.get(contentType).fromMessage(message);
        }
        
        return defaultConverter.fromMessage(message);
    }
}
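
One thing to watch with exact-match lookups like the one above: a content type header may carry parameters or different casing, for example "application/json; charset=UTF-8", and would then miss a converter registered under "application/json". A small normalization step, sketched here with a hypothetical ContentTypeKeys helper, is one way to handle that.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

final class ContentTypeKeys {
    // Reduces "Application/JSON; charset=UTF-8" to "application/json".
    static String normalize(String contentType) {
        if (contentType == null) {
            return null;
        }
        int semicolon = contentType.indexOf(';');
        String mediaType = semicolon >= 0 ? contentType.substring(0, semicolon) : contentType;
        return mediaType.trim().toLowerCase(Locale.ROOT);
    }
}

final class ContentTypeKeysDemo {
    public static void main(String[] args) {
        // Stand-in for the converter registry; values are just labels here.
        Map<String, String> converters = new HashMap<>();
        converters.put("application/json", "json-converter");

        String header = "application/json; charset=UTF-8";
        // The raw header misses the registry key
        System.out.println(converters.containsKey(header));
        // The normalized header finds it
        System.out.println(converters.get(ContentTypeKeys.normalize(header)));
    }
}
```

The same normalization would be applied both when registering a converter and when looking one up.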

2.3 Message Type Inference

java
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.stereotype.Component;

@Component
public class TypeInferenceConsumer {
    
    private final MessageConverter converter;
    
    // Reuse the application's configured converter (with its type mappings)
    // instead of creating a new converter for every message
    public TypeInferenceConsumer(MessageConverter converter) {
        this.converter = converter;
    }
    
    @RabbitListener(queues = "typed.queue")
    public void handleTypedMessage(Message message) {
        // The target class is resolved from the type information in the headers
        Object result = converter.fromMessage(message);
        
        if (result instanceof Order) {
            handleOrder((Order) result);
        } else if (result instanceof Payment) {
            handlePayment((Payment) result);
        } else {
            System.out.println("Unknown type: " + (result == null ? null : result.getClass()));
        }
    }
    
    private void handleOrder(Order order) {
        System.out.println("Handling order: " + order.getOrderId());
    }
    
    private void handlePayment(Payment payment) {
        System.out.println("Handling payment: " + payment.getPaymentId());
    }
}
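
As the number of payload types grows, the instanceof chain can be replaced by a lookup table. The sketch below uses only JDK types; PayloadDispatcher is a hypothetical helper, not a Spring AMQP class.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical dispatcher: routes a converted payload to the handler
// registered for its class.
final class PayloadDispatcher {
    private final Map<Class<?>, Consumer<Object>> handlers = new LinkedHashMap<>();

    <T> void register(Class<T> type, Consumer<? super T> handler) {
        // Class.cast keeps the conversion checked at runtime
        handlers.put(type, payload -> handler.accept(type.cast(payload)));
    }

    // Returns true when a handler accepted the payload,
    // false for null payloads or unregistered types.
    boolean dispatch(Object payload) {
        if (payload == null) {
            return false;
        }
        for (Map.Entry<Class<?>, Consumer<Object>> entry : handlers.entrySet()) {
            if (entry.getKey().isInstance(payload)) {
                entry.getValue().accept(payload);
                return true;
            }
        }
        return false;
    }
}

final class PayloadDispatcherDemo {
    public static void main(String[] args) {
        PayloadDispatcher dispatcher = new PayloadDispatcher();
        dispatcher.register(String.class, s -> System.out.println("text: " + s));
        dispatcher.register(Integer.class, i -> System.out.println("number: " + i));
        dispatcher.dispatch("hello");
        dispatcher.dispatch(42);
        // No handler is registered for Double, so this reports false
        System.out.println(dispatcher.dispatch(3.14));
    }
}
```

Handlers are checked in registration order, so more specific types should be registered before their supertypes.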

2.4 Message Headers

java
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

@Component
public class HeaderMessageService {
    
    private final RabbitTemplate rabbitTemplate;
    
    public HeaderMessageService(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }
    
    public void sendWithHeaders(String exchange, String routingKey, Object message) {
        // Use one id for both the custom header and the messageId property
        String messageId = UUID.randomUUID().toString();
        
        Map<String, Object> headers = new HashMap<>();
        headers.put("X-Message-Id", messageId);
        headers.put("X-Source-System", "order-service");
        headers.put("X-Timestamp", System.currentTimeMillis());
        headers.put("X-Version", "1.0");
        
        rabbitTemplate.convertAndSend(exchange, routingKey, message, msg -> {
            MessageProperties props = msg.getMessageProperties();
            // The post-processor runs after conversion, so the converter has
            // already set the content type and the __TypeId__ header; add the
            // custom headers one by one without overwriting them
            headers.forEach(props::setHeader);
            props.setMessageId(messageId);
            return msg;
        });
    }
    
    public void receiveWithHeaders(Message message) {
        MessageProperties props = message.getMessageProperties();
        
        String messageId = props.getMessageId();
        String contentType = props.getContentType();
        Map<String, Object> headers = props.getHeaders();
        
        System.out.println("Message ID: " + messageId);
        System.out.println("Content type: " + contentType);
        System.out.println("Headers: " + headers);
        
        // __TypeId__ is the header the JSON converter uses to carry type information
        Object typeId = headers.get("__TypeId__");
        System.out.println("Type id: " + typeId);
    }
}

III. Code Examples

3.1 Complete Converter Configuration

java
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;

@Configuration
public class CompleteConverterConfig {
    
    @Bean
    public ObjectMapper objectMapper() {
        ObjectMapper mapper = new ObjectMapper();
        
        mapper.registerModule(new JavaTimeModule());
        
        mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
        mapper.disable(SerializationFeature.FAIL_ON_EMPTY_BEANS);
        mapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
        
        mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
        
        return mapper;
    }
    
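    @Bean
    public MessageConverter jsonMessageConverter(ObjectMapper objectMapper) {
        Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter(objectMapper);
        
        converter.setDefaultCharset("UTF-8");
        
        Map<String, Class<?>> idClassMapping = new HashMap<>();
        idClassMapping.put("order", Order.class);
        idClassMapping.put("payment", Payment.class);
        idClassMapping.put("user", User.class);
        idClassMapping.put("notification", Notification.class);
        
        // Id-to-class mappings are configured on the type mapper, not on the converter
        org.springframework.amqp.support.converter.DefaultJackson2JavaTypeMapper typeMapper =
            new org.springframework.amqp.support.converter.DefaultJackson2JavaTypeMapper();
        typeMapper.setIdClassMapping(idClassMapping);
        converter.setJavaTypeMapper(typeMapper);
        
        return converter;
    }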
    
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory,
                                         MessageConverter jsonMessageConverter) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMessageConverter(jsonMessageConverter);
        return template;
    }
    
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
            ConnectionFactory connectionFactory,
            MessageConverter jsonMessageConverter) {
        
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(jsonMessageConverter);
        
        return factory;
    }
}

3.2 Multi-Format Converter

java
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.amqp.support.converter.SimpleMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.List;

@Configuration
public class MultiFormatConverterConfig {
    
    @Bean
    public MessageConverter multiFormatMessageConverter() {
        Jackson2JsonMessageConverter jsonConverter = new Jackson2JsonMessageConverter();
        SimpleMessageConverter simpleConverter = new SimpleMessageConverter();
        
        return new MessageConverter() {
            @Override
            public Message toMessage(Object object, MessageProperties messageProperties) 
                    throws MessageConversionException {
                
                if (object instanceof String || object instanceof byte[]) {
                    return simpleConverter.toMessage(object, messageProperties);
                }
                
                messageProperties.setContentType("application/json");
                return jsonConverter.toMessage(object, messageProperties);
            }
            
            @Override
            public Object fromMessage(Message message) throws MessageConversionException {
                String contentType = message.getMessageProperties().getContentType();
                
                if (contentType != null && contentType.contains("json")) {
                    return jsonConverter.fromMessage(message);
                }
                
                return simpleConverter.fromMessage(message);
            }
        };
    }
}

3.3 Encrypting Converter

java
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;
import javax.crypto.Cipher;
import javax.crypto.spec.SecretKeySpec;
import java.nio.charset.StandardCharsets;

public class EncryptedMessageConverter implements MessageConverter {
    
    private static final String ENCRYPTED_HEADER = "x-encrypted";
    // Header name chosen for this example; it carries the delegate's content type
    private static final String ORIGINAL_CONTENT_TYPE_HEADER = "x-original-content-type";
    
    private final MessageConverter delegate;
    private final SecretKeySpec secretKey;
    // "AES" alone typically resolves to AES/ECB/PKCS5Padding on the default JDK
    // provider; prefer an authenticated mode such as AES/GCM/NoPadding in production
    private final String algorithm = "AES";
    
    public EncryptedMessageConverter(MessageConverter delegate, String secret) {
        this.delegate = delegate;
        // The secret must encode to exactly 16, 24, or 32 bytes to be a valid AES key
        this.secretKey = new SecretKeySpec(secret.getBytes(StandardCharsets.UTF_8), algorithm);
    }
    
    @Override
    public Message toMessage(Object object, MessageProperties messageProperties) 
            throws MessageConversionException {
        
        Message message = delegate.toMessage(object, messageProperties);
        MessageProperties props = message.getMessageProperties();
        
        byte[] encryptedBody = encrypt(message.getBody());
        
        // Remember the delegate's content type so the receiver can restore it
        props.setHeader(ORIGINAL_CONTENT_TYPE_HEADER, props.getContentType());
        props.setContentType("application/x-encrypted");
        props.setContentLength(encryptedBody.length);
        props.setHeader(ENCRYPTED_HEADER, true);
        
        return new Message(encryptedBody, props);
    }
    
    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        MessageProperties props = message.getMessageProperties();
        
        if (Boolean.TRUE.equals(props.getHeaders().get(ENCRYPTED_HEADER))) {
            byte[] decryptedBody = decrypt(message.getBody());
            
            // Restore the original content type; otherwise the delegate
            // (for example a JSON converter) would not recognize the body
            Object originalContentType = props.getHeaders().get(ORIGINAL_CONTENT_TYPE_HEADER);
            if (originalContentType != null) {
                props.setContentType(originalContentType.toString());
            }
            props.setContentLength(decryptedBody.length);
            
            message = new Message(decryptedBody, props);
        }
        
        return delegate.fromMessage(message);
    }
    
    private byte[] encrypt(byte[] data) throws MessageConversionException {
        try {
            Cipher cipher = Cipher.getInstance(algorithm);
            cipher.init(Cipher.ENCRYPT_MODE, secretKey);
            return cipher.doFinal(data);
        } catch (Exception e) {
            throw new MessageConversionException("Encryption failed", e);
        }
    }
    
    private byte[] decrypt(byte[] data) throws MessageConversionException {
        try {
            Cipher cipher = Cipher.getInstance(algorithm);
            cipher.init(Cipher.DECRYPT_MODE, secretKey);
            return cipher.doFinal(data);
        } catch (Exception e) {
            throw new MessageConversionException("Decryption failed", e);
        }
    }
}
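
For stronger protection than the bare "AES" transformation, an authenticated mode such as AES-GCM is a common choice. The sketch below is JDK-only and uses a hypothetical AesGcmBodyCipher class; the body layout (IV followed by ciphertext and tag) is an assumption of this example, and both sides would have to agree on it.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.SecureRandom;
import java.util.Arrays;
import javax.crypto.Cipher;
import javax.crypto.spec.GCMParameterSpec;
import javax.crypto.spec.SecretKeySpec;

final class AesGcmBodyCipher {
    private static final String TRANSFORMATION = "AES/GCM/NoPadding";
    private static final int IV_BYTES = 12;   // commonly recommended GCM nonce size
    private static final int TAG_BITS = 128;  // authentication tag length

    private final SecretKeySpec key;
    private final SecureRandom random = new SecureRandom();

    // keyBytes must be 16, 24, or 32 bytes long (AES-128/192/256).
    AesGcmBodyCipher(byte[] keyBytes) {
        this.key = new SecretKeySpec(keyBytes, "AES");
    }

    // Output layout: [12-byte IV][ciphertext + tag]; a fresh IV is drawn per message.
    byte[] encrypt(byte[] plain) throws GeneralSecurityException {
        byte[] iv = new byte[IV_BYTES];
        random.nextBytes(iv);
        Cipher cipher = Cipher.getInstance(TRANSFORMATION);
        cipher.init(Cipher.ENCRYPT_MODE, key, new GCMParameterSpec(TAG_BITS, iv));
        byte[] cipherText = cipher.doFinal(plain);
        return ByteBuffer.allocate(iv.length + cipherText.length)
                .put(iv).put(cipherText).array();
    }

    // Fails with a GeneralSecurityException if the body was modified in transit.
    byte[] decrypt(byte[] body) throws GeneralSecurityException {
        if (body.length < IV_BYTES) {
            throw new GeneralSecurityException("Body is shorter than the IV");
        }
        byte[] iv = Arrays.copyOfRange(body, 0, IV_BYTES);
        Cipher cipher = Cipher.getInstance(TRANSFORMATION);
        cipher.init(Cipher.DECRYPT_MODE, key, new GCMParameterSpec(TAG_BITS, iv));
        return cipher.doFinal(body, IV_BYTES, body.length - IV_BYTES);
    }
}

final class AesGcmBodyCipherDemo {
    public static void main(String[] args) throws Exception {
        byte[] keyBytes = new byte[32];            // demo key only; load real keys securely
        new SecureRandom().nextBytes(keyBytes);
        AesGcmBodyCipher cipher = new AesGcmBodyCipher(keyBytes);
        byte[] body = cipher.encrypt("{\"orderId\":1}".getBytes(StandardCharsets.UTF_8));
        System.out.println(new String(cipher.decrypt(body), StandardCharsets.UTF_8));
    }
}
```

Unlike ECB, this mode never produces the same ciphertext twice for the same plaintext and detects tampering on decryption.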

IV. Practical Scenarios

4.1 Messaging Between Microservices

java
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.stereotype.Service;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

@Service
public class MicroserviceMessageService {
    
    private final RabbitTemplate rabbitTemplate;
    
    public MicroserviceMessageService(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }
    
    public void sendEvent(String exchange, String routingKey, String eventType, Object data) {
        Map<String, Object> event = new HashMap<>();
        event.put("eventId", UUID.randomUUID().toString());
        event.put("eventType", eventType);
        event.put("source", "order-service");
        event.put("timestamp", LocalDateTime.now());
        event.put("data", data);
        
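        rabbitTemplate.convertAndSend(exchange, routingKey, event, message -> {
            // "event" is a logical type id, not a class name: consumers that resolve
            // types from __TypeId__ must map it explicitly, or conversion will fail
            message.getMessageProperties().setHeader("__TypeId__", "event");
            message.getMessageProperties().setHeader("X-Event-Type", eventType);
            return message;
        });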
    }
    
    public void sendCommand(String exchange, String routingKey, String commandType, 
                           Object payload, String targetService) {
        Map<String, Object> command = new HashMap<>();
        command.put("commandId", UUID.randomUUID().toString());
        command.put("commandType", commandType);
        command.put("source", "order-service");
        command.put("target", targetService);
        command.put("timestamp", LocalDateTime.now());
        command.put("payload", payload);
        
        rabbitTemplate.convertAndSend(exchange, routingKey, command, message -> {
            // "command" is a logical type id, not a class name: consumers that resolve
            // types from __TypeId__ must map it explicitly, or conversion will fail
            message.getMessageProperties().setHeader("__TypeId__", "command");
            message.getMessageProperties().setHeader("X-Command-Type", commandType);
            message.getMessageProperties().setHeader("X-Target-Service", targetService);
            return message;
        });
    }
}

4.2 Multi-Version Message Compatibility

java
import org.springframework.amqp.core.Message;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConversionException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Map;

public class VersionAwareMessageConverter extends Jackson2JsonMessageConverter {
    
    public VersionAwareMessageConverter(ObjectMapper objectMapper) {
        super(objectMapper);
    }
    
    @Override
    @SuppressWarnings("unchecked")
    public Object fromMessage(Message message) throws MessageConversionException {
        Map<String, Object> headers = message.getMessageProperties().getHeaders();
        // Header values are not guaranteed to be String instances
        Object versionHeader = headers.get("X-Version");
        String version = versionHeader != null ? versionHeader.toString() : null;
        
        Object result = super.fromMessage(message);
        
        // Only untyped (Map) payloads are migrated; typed payloads pass through
        if (result instanceof Map) {
            Map<String, Object> data = (Map<String, Object>) result;
            return convertToCurrentVersion(data, version);
        }
        
        return result;
    }
    
    private Object convertToCurrentVersion(Map<String, Object> data, String version) {
        if (version == null || "1.0".equals(version)) {
            return convertFromV1(data);
        } else if ("2.0".equals(version)) {
            return convertFromV2(data);
        }
        
        return data;
    }
    
    private Object convertFromV1(Map<String, Object> data) {
        data.put("version", "2.0");
        if (data.containsKey("orderNo")) {
            data.put("orderId", data.remove("orderNo"));
        }
        return data;
    }
    
    private Object convertFromV2(Map<String, Object> data) {
        return data;
    }
}
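
When more than two versions exist, the per-version methods can be generalized into a chain of upgrade steps. The sketch below is JDK-only; PayloadUpgrader is a hypothetical helper, and the 2.0 to 3.0 step is invented purely to show chaining.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical upgrader: each step lifts a payload from one schema version to the next.
final class PayloadUpgrader {
    private final Map<String, UnaryOperator<Map<String, Object>>> steps = new LinkedHashMap<>();
    private final Map<String, String> nextVersion = new HashMap<>();

    void addStep(String from, String to, UnaryOperator<Map<String, Object>> step) {
        steps.put(from, step);
        nextVersion.put(from, to);
    }

    // Applies steps until no further step is registered for the current version.
    Map<String, Object> upgrade(Map<String, Object> data, String version) {
        Map<String, Object> current = new HashMap<>(data); // leave the caller's map untouched
        String v = version == null ? "1.0" : version;      // assumption: a missing header means 1.0
        while (steps.containsKey(v)) {
            current = steps.get(v).apply(current);
            v = nextVersion.get(v);
        }
        current.put("version", v);
        return current;
    }
}

final class PayloadUpgraderDemo {
    public static void main(String[] args) {
        PayloadUpgrader upgrader = new PayloadUpgrader();
        // 1.0 -> 2.0 mirrors the orderNo -> orderId rename in the converter above
        upgrader.addStep("1.0", "2.0", m -> {
            if (m.containsKey("orderNo")) {
                m.put("orderId", m.remove("orderNo"));
            }
            return m;
        });
        // 2.0 -> 3.0 is invented for this sketch: it adds a default status
        upgrader.addStep("2.0", "3.0", m -> {
            m.putIfAbsent("status", "NEW");
            return m;
        });

        Map<String, Object> v1 = new HashMap<>();
        v1.put("orderNo", "A-1001");
        System.out.println(upgrader.upgrade(v1, "1.0"));
    }
}
```

Each step only needs to know about its immediate predecessor, so adding a new version means adding one step rather than touching every older conversion.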

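V. Common Problems and Solutions

5.1 Serialization Failure

Problem: an exception is thrown while an object is being serialized. Common causes are beans with no serializable properties and java.time fields when the JavaTimeModule is not registered.

Solution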

java
ObjectMapper mapper = new ObjectMapper();
mapper.disable(SerializationFeature.FAIL_ON_EMPTY_BEANS);
mapper.registerModule(new JavaTimeModule());

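5.2 Type Information Lost

Problem: after deserialization the original type cannot be identified.

Solution

java
Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
DefaultJackson2JavaTypeMapper typeMapper = new DefaultJackson2JavaTypeMapper();
typeMapper.setIdClassMapping(Map.of(
    "order", Order.class,
    "payment", Payment.class
));
converter.setJavaTypeMapper(typeMapper);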

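5.3 Date Format Problems

Problem: date and time values are serialized in an unexpected format, for example as numeric timestamps instead of ISO-8601 strings.

Solution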

java
ObjectMapper mapper = new ObjectMapper();
mapper.registerModule(new JavaTimeModule());
mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
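
To make the difference between the two wire formats concrete, the JDK-only sketch below renders the same LocalDateTime as ISO-8601 text and as epoch milliseconds. DateWireFormats is a hypothetical helper; the exact output Jackson produces depends on its configuration and registered modules, so treat this as an illustration of the formats rather than of Jackson's behavior.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

final class DateWireFormats {
    // ISO-8601 text: readable and unambiguous across services.
    static String toIso(LocalDateTime value) {
        return DateTimeFormatter.ISO_LOCAL_DATE_TIME.format(value);
    }

    static LocalDateTime fromIso(String text) {
        return LocalDateTime.parse(text, DateTimeFormatter.ISO_LOCAL_DATE_TIME);
    }

    // Numeric alternative: epoch milliseconds, which needs an agreed zone (UTC here).
    static long toEpochMillisUtc(LocalDateTime value) {
        return value.toInstant(ZoneOffset.UTC).toEpochMilli();
    }
}

final class DateWireFormatsDemo {
    public static void main(String[] args) {
        LocalDateTime createdAt = LocalDateTime.of(2024, 1, 15, 10, 30, 0);
        System.out.println(DateWireFormats.toIso(createdAt));            // 2024-01-15T10:30:00
        System.out.println(DateWireFormats.toEpochMillisUtc(createdAt)); // numeric form
    }
}
```

The text form survives being read by a human or by a service in another language; the numeric form is compact but only meaningful once both sides agree on the time zone.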

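VI. Best Practices

6.1 Choosing a Converter

text
Recommendations:
├── General purpose: Jackson2JsonMessageConverter
├── Performance sensitive: custom binary converter
├── Security sensitive: encrypting converter
├── Compatibility requirements: version-aware converter
└── Multiple formats: composite converter

6.2 Configuration

text
Configuration recommendations:
├── Configure the ObjectMapper in one place
├── Register type mappings
├── Set the default charset
├── Handle date and time formats
└── Ignore unknown properties

6.3 Performance

text
Performance recommendations:
├── Reuse ObjectMapper instances
├── Avoid creating converters repeatedly
├── Keep message size under control
├── Use an efficient serialization format
└── Cache type information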
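
Keeping message size under control can be as simple as checking the converted body before it is sent. The sketch below is JDK-only; BodySizeGuard is a hypothetical helper and the 64 KiB limit in the demo is an arbitrary value chosen for illustration.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical guard: rejects bodies larger than a configured limit.
final class BodySizeGuard {
    private final int maxBytes;

    BodySizeGuard(int maxBytes) {
        if (maxBytes <= 0) {
            throw new IllegalArgumentException("maxBytes must be positive");
        }
        this.maxBytes = maxBytes;
    }

    // Returns the body unchanged, or throws when it exceeds the limit.
    byte[] check(byte[] body) {
        if (body.length > maxBytes) {
            throw new IllegalArgumentException(
                    "Message body is " + body.length + " bytes, limit is " + maxBytes);
        }
        return body;
    }
}

final class BodySizeGuardDemo {
    public static void main(String[] args) {
        BodySizeGuard guard = new BodySizeGuard(64 * 1024); // arbitrary demo limit
        byte[] body = "{\"orderId\":1}".getBytes(StandardCharsets.UTF_8);
        System.out.println(guard.check(body).length + " bytes accepted");
    }
}
```

A check like this could sit inside a custom converter's toMessage method, so oversized payloads fail at the producer instead of slowing down the broker and consumers.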

VII. Related Links