1.问题介绍

Springboot项目中运用RabbitMQ作为音讯中间件运用过程中,有时候咱们会运用java目标作为传输的音讯,默认情况下允许运用java目标作为传输的音讯,假如强制性的运用Java目标作为音讯(不运用json东西的情况下),会由于序列化和反序列化导出运行时反常, 但是却有着严格的条件限制:

  1. 生产者和顾客的Java目标有必要相同
  2. Java目标的类路径(包名)有必要相同

假如在项目中开发者和顾客属于同一个项目则没有问题,假如开发者和顾客属于不同项目则第2个条件则比较麻烦,当然也有处理办法。比方实体类作为根底包(微服务/多模块项目中叫做根底模块)引进生产者和顾客项目中等等。

还有一种办法是经过在生产者中将java目标转为json字符串,在顾客中将json字符串转回java目标。这种办法有许多弊端,首先要添加代码量(互相转化),还有便是针对不同的目标需要添加转化的代码。

2.处理办法

音讯序列化(Jackson2)

针对音讯生产者,咱们都是用RabbitTemplate建议音讯,在RabbitTemplate目标中有一个办法专门用来完成Java目标的序列化,

@SpringBootTest
public class MqTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void producertTest(){
     // setMessageConverter 办法便是修正序列化
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        User user = new User();
        user.setName("张三");
        user.setAge(23);
        rabbitTemplate.convertAndSend(MqConstant.Exchange,"",user);
    }
}

留意: 上面这种序列化方法太low了,假如针对多个建议音讯或许在不同的地方运用,每次都还要去修正序列化,所以,咱们就将RabbitTemplate装备成一个Bean,咱们每次去依靠注入的也便是这个Bean。

@Configuration
public class RabbitMQConfig {
    @Bean
    @SuppressWarnings("all")
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        // 在注入Bean的时候有必要将衔接工厂装配进来,由于这是自己定义的一个Bean
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        // setMessageConverter 办法运用jackson2去完成序列化
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        return rabbitTemplate;
    }
}

音讯反序列化(Jackson2)

顾客便是针对 @RabbitListener 这个注解反序列化,就要去完成RabbitListenerConfigurer 接口。

@Configuration
public class MyRabbitListenerConfigurer implements RabbitListenerConfigurer{
    @Bean
    public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
        DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
        // 这儿的转化器设置完成了 经过 @Payload 注解 主动反序列化message body
        factory.setMessageConverter(new MappingJackson2MessageConverter());
        return factory;
    }
    @Override
    public void configureRabbitListeners(RabbitListenerEndpointRegistrar rabbitListenerEndpointRegistrar) {
        rabbitListenerEndpointRegistrar.setMessageHandlerMethodFactory(myHandlerMethodFactory());
    }
}

在反序列化中,你能够运用 @Payload 注解直接将音讯转化成Java目标,例如下面.

@Service
public class RabbitService {
    @RabbitListener(queues = MQConstant.QUEUE)
    public void message(@Payload User message){
        System.out.println(message);
    }
}

在后面还有许多注解能够运用,例如获取音讯的id,获取音讯的头。这些都是在运用spring boot整合RabbitMQ结构中最常用的方法

留意: 完成序列化和反序列化都是完成 MessageConverter 接口 ,结果一个严重的问题出现了,假如你是经过第三方json结构去封装的音讯序列化,那么你就要特别留意了。

完成序列化运用的是:org.springframework.amqp.support.converter 包下的 MessageConverter

完成反序列化运用的是:org.springframework.messaging.converter 包下的MessageConverter

音讯序列化(FastJson2)

Fastjson便是一个第三方结构,之所以在项目常见到它,便是由于他的解析速度快的惊人(他们官方自己说的),而且在这次项目中咱们运用的是Fastjson2版本,解析速度更不用说了。

下面咱们要在spring boot中运用第三方结构完成RabbitMQ的序列化问题,那么就要先去找FastJson中是否有这个完成 org.springframework.amqp.support.converter 包下的 MessageConverter 接口 或 承继 AbstractMessageConverter 抽象类的完成类,发现没有,那咱们就要去完成。

  1. 导入fastJson的依靠
<dependency>
    <groupId>com.alibaba.fastjson2</groupId>
    <artifactId>fastjson2-extension-spring5</artifactId>
    <version>2.0.34</version>
</dependency>
  1. 扫除spring boot的默认Jackson依靠,也能够不扫除,个人习惯
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
    <exclusions>
        <exclusion>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-json</artifactId>
        </exclusion>
    </exclusions>
</dependency>
  1. 完成 MessageConverter 接口或许AbstractMessageConverter抽象类
import com.alibaba.fastjson2.JSON;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;
public class FastjsonMessageConverter implements MessageConverter {
    @Override
    public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
        byte[] bytes = JSON.toJSONBytes(object);
        messageProperties.setContentType("application/json");
        messageProperties.setContentEncoding("UTF-8");
        messageProperties.setContentLength(bytes.length);
        return new Message(bytes, messageProperties);
    }
    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        byte[] bytes = message.getBody();
        return JSON.parseObject(bytes, Object.class);
    }
}
import com.alibaba.fastjson2.JSON;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.AbstractMessageConverter;
import org.springframework.amqp.support.converter.MessageConversionException;
public class FastjsonMessageConverter extends AbstractMessageConverter {
    @Override
    protected Message createMessage(Object o, MessageProperties messageProperties) {
        byte[] bytes = JSON.toJSONBytes(o);
        messageProperties.setContentType("application/json");
        messageProperties.setContentEncoding("UTF-8");
        messageProperties.setContentLength(bytes.length);
        return new Message(bytes, messageProperties);
    }
    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        byte[] bytes = message.getBody();
        return JSON.parseObject(bytes, Object.class);
    }
}

其实 AbstractMessageConverter 抽象类 也是完成了MessageConverter接口,承继AbstractMessageConverter去完成,首要是由于AbstractMessageConverter帮咱们主动生成了音讯的id,看看源码就知道。

spring boot整合RabbitMQ,使用fastjson2实现序列化

音讯反序列化(FastJson2)

运用FastJson处理mq反序列化,其实与上面jackson的反序列化一模一样,只是将 MappingJackson2MessageConverter 改成MappingFastJsonMessageConverter即可

@Configuration
public class MyRabbitListenerConfigurer implements RabbitListenerConfigurer {
    @Bean
    public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
        DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
        // 这儿的转化器设置完成了 经过 @Payload 注解 主动反序列化message body
        factory.setMessageConverter(new MappingFastJsonMessageConverter());
        return factory;
    }
    @Override
    public void configureRabbitListeners(RabbitListenerEndpointRegistrar rabbitListenerEndpointRegistrar) {
        rabbitListenerEndpointRegistrar.setMessageHandlerMethodFactory(myHandlerMethodFactory());
    }
}

提示: 假如你运用其他的json库,也都是这样完成序列化问题