RocketMQ与SpringBoot整合、核心使用、多租户自动隔离、Java8时间支持

一、文章核心内容

通过篇文章,将会用到以下的内容项,可以在生产中直接使用

  • RocketMQ与SpringBoot的整合以及基础的应用配置

  • RocketMQTjson怎么读emplate模板类的各种设计模式的集体教学活动方案使用,如顺序消息、异步消息、响应消http://192.168.1.1登录息、单向消息、指定消息k设计模式及其应用场景ey等等

  • R源码编辑器ocketMQ多租户/环境自动隔离topicjson格式怎么打开/group/tag,如只需实体类型要配置基础的topic、group、tag,部署到dev、test、pr实体类od环境json自动隔离,只需要写一次配置;多租户同样适用

  • Java时间模块支持,默认如果RocketMQMessageListener用实体类接收消息时,字段不支持LocalDate/Locahttp 500lDateTime类型,发送会报错,需要增加单独处理

  • 启动日志报Rock源码交易平台etMQLog:WARN Pl源码编程器ease initialize thjson格式e l实体类ogger system properly错误解决

    文章预告

  • 下一篇文章分享在企业级项目中RocketMQ的二次封装,达到核JSON心代码抽离,实现类只需要关注业务模块,日志记录,自动重试啥啥的全都扔给抽象层

  • 抽象层源码精灵永久兑换码json怎么读解耦、复用的一大有效手段,需要结合业务、设计模式、实战经验,抽象http://www.baidu.com出一个适合自身项目的抽象层

    特殊说明

  • 这篇文章分享的是高级部分,并非基础RocketMQ,所以RocketMQ基础使用、Rhttp代理ocketMQ控制台等基础部分不会过多说明

二、核心使用

2.http://www.baidu.com1 与SpringBoot整合

整体项目结构如下,文实体类是什么章未列出所有代码,未提及的设计模式请参考下面源代码

  • 文章源码:gitee.comjson数据/tianxincode…

RocketMQ与SpringBoot整合、核心使用、多租户自动隔离、Java8时间支持

  1. 添加maven设计模式有哪些依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.0.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.codecoord</groupId>
    <artifactId>springboot-rocketmq</artifactId>
    <version>1.0</version>
    <name>springboot-rocketmq</name>
    <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.76</version>
        </dependency>
        <!-- RocketMQ核心依赖 -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.2.1</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
  1. 配置文件增加服务器配置
    • 核心的配置在application.yml配置注释中有说明
server:
  port: 8888
spring:
  application:
    name: springboot-rocketmq
rocketmq:
  # 多个NameServer,host:port;host:port,RocketMQProperties
  name-server: 127.0.0.1:9876
  producer:
    # 发送同一类消息的设置为同一个group,保证唯一
    group: springboot_producer_group
    # 发送消息失败重试次数,默认2
    retryTimesWhenSendFailed: 2
    # 异步消息重试此处,默认2
    retryTimesWhenSendAsyncFailed: 2
    # 发送消息超时时间,默认3000
    sendMessageTimeout: 3000
    # 消息最大长度,默认1024 * 1024 * 4(默认4M)
    maxMessageSize: 4096
    # 压缩消息阈值,默认4k(1024 * 4)
    compressMessageBodyThreshold: 4096
    # 是否在内部发送失败时重试另一个broker,默认false
    retryNextServer: false
    # access-key
    #accessKey: xxx
    # secret-key
    #secretKey: xxx
    # 是否启用消息跟踪,默认false
    enableMsgTrace: false
    # 消息跟踪主题的名称值。如果不进行配置,可以使用默认的跟踪主题名称
    customizedTraceTopic: RMQ_SYS_TRACE_TOPIC
  consumer:
    # 配置指定group是否启动监听器 group1.topic1 = false
    listeners:
      # key:group名称
      rocketmq_source_code_group:
        # value:{key: topic名称: value: true/false}
        rocketmq_source_code: true
    # 指定消费组
    group: springboot_consumer_group
    # 指定topic,启动时就会注册
    #topic: springboot_consumer_xxx
    # 一次拉取消息最大值,注意是拉取消息的最大值而非消费最大值
    pull-batch-size: 10
    # 其他配置参考属性类
logging:
  level:
    io.netty: ERROR
    # 关闭RocketmqClient info日志,不然每隔几秒就会打印broker信息
    RocketmqClient: error
  1. 新建启动类
    • 解决启动RocketMQLog:WARN Please initialize the logger system properly.报错
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
 * RocketMQ启动类
 */
@SpringBootApplication
public class RocketMqApplication {
    public static void main(String[] args) {
        /*
         * 指定使用的日志框架,否则将会告警
         * RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.InternalThreadLocalMap).
         * RocketMQLog:WARN Please initialize the logger system properly.
         */
        System.setProperty("rocketmq.client.logUseSlf4j", "true");
        SpringApplication.run(RocketMqApplication.class, args);
    }
}
  1. 以上完成,和SpringBoot整合已经完成,基本上大部分框架与SpringBoot整合都只需要以下三步
    • 添加pom
    • applihttp协议cation.yml配置
    • 启动类(如果需要开启注解)

2.2 项目基础数据类

  1. 新建消息实体类,LocalDate和LocalDateTime类型默认不支持,需要单独处理,参考后面配置,因为LocalDate和LocalDateTime日常使用中用的较多
import com.codecoord.rocketmq.config.RocketMqConfig;
import lombok.Data;
import java.time.LocalDate;
import java.time.LocalDateTime;
@Data
public class RocketMqMessage {
    private Long id;
    private String message;
    private String version;
    /**
     * LocalDate和LocalDateTime默认不支持,需要单独处理
     * {@link RocketMqConfig}
     */
    private LocalDate currentDate;
    private LocalDateTime currentDateTime;
}
  1. 新增常量类,包含主要源码编辑器topic等信息
    • 生产环境中一但上线90%情况下不会去更改原有的tag或者topic消息信息,所json数据以可以将其定义为常量类,方便全局引用,避免出错
public interface RocketMqBizConstant {
    String SOURCE_TOPIC = "rocketmq_source_code_topic";
    String SOURCE_GROUP = "rocketmq_source_code_group";
    String SOURCE_TAG = "rocketmq_source_code_tag";
}
  • 也可以在配置文件中定义,然后通过 ${} 表达式注入,这json解析种方式适用于需要更改不同配置信息让队列生效情况
# 配置文件中配置组信息
service:
  topic: rocketmq_source_code_topic
  group: rocketmq_source_code_group
  tag: rocketmq_source_code_tag
  1. 新增延迟等级常量类,延迟等级和基础RocketMQ中的延迟等级一致
public interface RocketMqDelayLevel {
    int ONE_SECOND = 1;
    int FIVE_SECOND = 2;
    int TEN_SECOND = 3;
    int THIRTY_SECOND = 4;
    int ONE_MINUTE = 5;
    int TWO_MINUTE = 6;
    int THREE_MINUTE = 7;
    int FOUR_MINUTE = 8;
    int FIVE_MINUTE = 9;
    int SIX_MINUTE = 10;
    int SEVEN_MINUTE = 11;
    int EIGHT_MINUTE = 12;
    int NINE_MINUTE = 13;
    int TEN_MINUTE = 14;
    int TWENTY_MINUTE = 15;
    int THIRTY_MINUTE = 16;
    int ONE_HOUR = 17;
    int TWO_HOUR = 18;
}

2.3 RocketMQ配置

  1. 配置跨json解析域,此配置可选,是为了方便网页版apipost请求的时候处理浏览器跨域问题,如果不需要测试可以不配置
import org.springframework.stereotype.Component;
import org.springframework.web.servlet.config.annotation.CorsRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
/**
 * web拦截器配置,可以使用网页版本的apipost
 */
@Component
public class WebConfig implements WebMvcConfigurer {
    /**
     * 跨域配置方式一
     */
    @Override
    public void addCorsMappings(CorsRegistry registry) {
        registry.addMapping("/**")
                .allowedOrigins("*")
                .allowedHeaders("*")
                .allowedMethods("*")
                // .allowedMethods("GET", "POST", "PUT", "DELETE", "OPTIONS")
                .maxAge(3600L);
    }
    /**
     * 跨域配置方式二
     */
    /*@Bean
    public CorsFilter corsFilter() {
        UrlBasedCorsConfigurationSource source = new UrlBasedCorsConfigurationSource();
        CorsConfiguration configuration = new CorsConfiguration();
        configuration.setAllowCredentials(true);
        configuration.setMaxAge(3600L);
        // 配置允许的原始ip、头、方法
        configuration.addAllowedOrigin("*");
        configuration.addAllowedHeader("*");
        configuration.addAllowedMethod("*");
        source.registerCorsConfiguration("/**", configuration);
        return new CorsFilter(source);
    }*/
}
  1. 重要:RocketMQ序列化器处理,增加Java8时间类型处理,配置完成之后可以在实httpwatch体类中使用Java8时间类型作为属性
    • 未配置出现错误参考第四节
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.apache.rocketmq.spring.support.RocketMQMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.messaging.converter.CompositeMessageConverter;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.converter.MessageConverter;
import java.util.List;
/**
 * RocketMQ序列化器处理
 */
@Configuration
public class RocketMqConfig {
    /**
     * 解决RocketMQ Jackson不支持Java时间类型配置
     * 源码参考:{@link org.apache.rocketmq.spring.autoconfigure.MessageConverterConfiguration}
     */
    @Bean
    @Primary
    public RocketMQMessageConverter createRocketMQMessageConverter() {
        RocketMQMessageConverter converter = new RocketMQMessageConverter();
        CompositeMessageConverter compositeMessageConverter = (CompositeMessageConverter) converter.getMessageConverter();
        List<MessageConverter> messageConverterList = compositeMessageConverter.getConverters();
        for (MessageConverter messageConverter : messageConverterList) {
            if (messageConverter instanceof MappingJackson2MessageConverter) {
                MappingJackson2MessageConverter jackson2MessageConverter = (MappingJackson2MessageConverter) messageConverter;
                ObjectMapper objectMapper = jackson2MessageConverter.getObjectMapper();
                // 增加Java8时间模块支持,实体类可以传递LocalDate/LocalDateTime
                objectMapper.registerModules(new JavaTimeModule());
            }
        }
        return converter;
    }
}

2源码之家.4 RocketMQTemplate发送消息

  1. SpringBoot中使用RocketMQTemplate模板类进行消息发送
    • org.apache.rocketmq.spring.corejson是什么意思.RocketMQTemplate
  2. 多种发送消息方式示例
    • 注意:发送带key消息是通过设置请求头完成
    • org.apache.rocketmq.spring.support.RocketMQHeaders源码中的图片
    • org.apache.rocketmq.spring.suppohttp://www.baidu.comrt.RocketMQUtil#convertToSpringMhttpwatchessage(MessageExt)
      • 需要注意目的地发送格式为 topic:tag
    • 下面不httpwatch同消息间用了注释,实体类的作用需要测试哪种发送消息方式打开注释即可
import com.alibaba.fastjson.JSONObject;
import com.codecoord.rocketmq.constant.RocketMqBizConstant;
import com.codecoord.rocketmq.constant.RocketMqDelayLevel;
import com.codecoord.rocketmq.domain.RocketMqMessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.time.LocalDate;
import java.time.LocalDateTime;
@RestController
@RequestMapping("/rocketmq")
@Slf4j
public class RocketMqController {
    @Resource(name = "rocketMQTemplate")
    private RocketMQTemplate rocketMqTemplate;
    /**
     * 通过实体类发送消息,发送注意事项请参考实体类
     * http://localhost:8888/rocketmq/entity/message
     */
    @RequestMapping("/entity/message")
    public Object sendMessage() {
        // 目的:topic:tag,如果不指定则发往配置的默认地址
        String destination = RocketMqBizConstant.SOURCE_TOPIC + ":" + RocketMqBizConstant.SOURCE_TAG;
        RocketMqMessage message = new RocketMqMessage();
        message.setId(System.currentTimeMillis());
        message.setMessage("当前消息发送时间为:" + LocalDateTime.now());
        // Java时间字段需要单独处理,否则会序列化失败
        message.setCurrentDate(LocalDate.now());
        message.setCurrentDateTime(LocalDateTime.now());
        message.setVersion("1.0");
        /// 发送同步消息,消息成功发送到Broker时才返回,message可以入参批量消息
        // 通过SendResult来处理发送结果
        // SendResult sendResult = rocketMqTemplate.syncSend(destination, message);
        /// 发送时指定业务key
        /*Message<RocketMqMessage> buildMessage = MessageBuilder.withPayload(message)
                // 设置keys
                .setHeader(RocketMQHeaders.KEYS, message.getId())
                .build();
        SendResult sendResult = rocketMqTemplate.syncSend(destination, buildMessage);*/
        /// 发送延迟消息
        Message<RocketMqMessage> buildMessage = MessageBuilder.withPayload(message).build();
        SendResult sendResult = rocketMqTemplate.syncSend(destination, buildMessage, 3000, RocketMqDelayLevel.FIVE_SECOND);
        /// 发送同步有序消息,需要指定hashKey,可以用业务唯一键
        // rocketMqTemplate.syncSendOrderly(destination, message, message.getId().toString());
        /// 发送异步消息,消息发送后及时返回,然后通过回调方法通知
        // rocketMqTemplate.asyncSend(destination, message, new SendCallback() {
        //     @Override
        //     public void onSuccess(SendResult sendResult) {
        //         log.info("消息发送成功【{}】", JSONObject.toJSONString(sendResult));
        //     }
        //
        //     @Override
        //     public void onException(Throwable e) {
        //         log.error("消息发送失败【{}】", e.getMessage());
        //     }
        // });
        /// 发送异步有序消息,需要指定hashKey,可以用业务唯一键
        // rocketMqTemplate.asyncSendOrderly(destination, message, message.getId().toString(), new SendCallback() {
        //     @Override
        //     public void onSuccess(SendResult sendResult) {
        //         log.info("消息发送成功【{}】", JSONObject.toJSONString(sendResult));
        //     }
        //
        //     @Override
        //     public void onException(Throwable e) {
        //         log.error("消息发送失败【{}】", e.getMessage());
        //     }
        // });
        /// 发送单向消息
        // rocketMqTemplate.sendOneWay(destination, message);
        /// 发送单向有序消息,通过MessageBuilder构建
        // Message<RocketMqMessage> buildMessage = MessageBuilder.withPayload(message).build();
        // rocketMqTemplate.sendOneWayOrderly(destination, buildMessage, message.getId().toString());
        /// 发送和接收回调消息,需要实现 RocketMQReplyListener 监听器类才可以,否则将会超时错误
        // rocketMqTemplate.sendAndReceive(destination, message, new RocketMQLocalRequestCallback<String>() {
        //     @Override
        //     public void onSuccess(String message) {
        //         log.info("消息发送成功,消息类型【{}】", message);
        //     }
        //
        //     @Override
        //     public void onException(Throwable e) {
        //         log.error("消息发送失败", e);
        //     }
        // });
        /// 调用抽象类方法发送,最终也是syncSend
        // rocketMqTemplate.convertAndSend(destination, "convertAndSend");
        // 转换消息和发送,底层使用的是syncSend(destination, message),将会被RocketEntityMessageListener消费
        // Message<RocketMqMessage> buildMessage = MessageBuilder.withPayload(message)
        //         // 设置请求头
        //         .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON_VALUE)
        //         .build();
        // 将会被RocketEntityMessageListener03消费
        // Message<Object> buildMessage = MessageBuilder.withPayload(new Object()).build();
        // rocketMqTemplate.send(destination, buildMessage);
        /// 发送批量消息,批量消息最终转为单挑进行发送
        // List<Message<String>> msgList = new ArrayList<>();
        // for (int i = 0; i < 10; i++) {
        //     msgList.add(MessageBuilder.withPayload("消息:" + i).build());
        // }
        // rocketMqTemplate.syncSend(destination, msgList);
        return message;
    }
    /**
     * 直接将对象进行传输,也可以自己进行json转化后传输
     */
    @RequestMapping("/messageExt/message")
    public SendResult convertAndSend() {
        String destination = "rocketmq_source_code_topic:rocketmq_source_code_ext_tag";
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("type", "messageExt");
        return rocketMqTemplate.syncSend(destination, jsonObject);
    }
}
  1. 需要测试哪种发送方式,把其他方式注释打开需要的发送方式注释即可,上面发送延迟消息

2.5 RocketMQListener消费

  1. 消息消费需要使用注解RocketMQMessajson数据gejsonobjectListener进行监听
    • electorExpression = “*”:指定t源码之家ag,*表示监听所有tag
    • consumerGroup:指定消费组
      • 一般可以将consumerGroup和selectorExpression设源码时代置为一样
    • topic:指定topic,topic至关重要,通常表示某一类业务或者平台,例如订单topic、仓储topic
  2. 监听器注解类需要实现RocketMQListener接口,泛型为发送消息源码1688时的类型
    • org.apache.rocketmq.spring.core.RocketMQListener
  3. 监听器实例
import com.alibaba.fastjson.JSONObject;
import com.codecoord.rocketmq.constant.RocketMqBizConstant;
import com.codecoord.rocketmq.domain.RocketMqMessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
/**
 * 实体类消费监听器
 */
@Slf4j
@Component
@RocketMQMessageListener(
        topic = RocketMqBizConstant.SOURCE_TOPIC,
        consumerGroup = RocketMqBizConstant.SOURCE_GROUP,
        selectorExpression = RocketMqBizConstant.SOURCE_TAG
)
public class RocketEntityMessageListener implements RocketMQListener<RocketMqMessage> {
    /**
     * 普通消息
     */
    @Override
    public void onMessage(RocketMqMessage message) {
        log.info("收到消息【{}】", JSONObject.toJSON(message));
        try {
            // 方法执行完成之后会自动进行进行ack,后续会分享源码解读
            TimeUnit.SECONDS.sleep(3);
            // 出现异常,将会自动进入重试队列
            // int ex = 10 / 0;
        } catch (InterruptedException e) {
            log.error(e.getMessage());
        }
        log.info("休眠了3s后消费完成");
    }
}
  1. 通过表达式注入
@RocketMQMessageListener(
        topic = "${service.topic}",
        consumerGroup = "${service.group}",
        selectorExpression = "${service.tag}"
)

2.6 发送消息测试

  1. RocketMqController已经提供了消息发设计模式属于行为型的是送测试接口,通过接口发送后,即可自动发送到指定的topic:tag,特别注意是发送到 topic:tag,而不是 group:tag
  2. 发送之后通过RocketMQ控制台查看

三、消费配置

3.1 配置线程数

  1. RocketMQMessjsonobjectageListener默认是64个线程并发json解析消息,一些时候需要源码1688单线程消费,所以可以配置 consumeThreadMax 参数指定并发消费json是什么意思线程数,避免太大源码中的图片导致资源不够
@RocketMQMessageListener(
        topic = RocketMqBizConstant.SOURCE_TOPIC,
        consumerGroup = RocketMqBizConstant.SOURCE_GROUP,
        selectorExpression = RocketMqBizConstant.SOURCE_TAG,
        // 指定消费者线程数,默认64,生产中请注意配置,避免过大或者过小
        consumeThreadMax = 5
)

3.2 消息确认

  1. RocketMQ在SpringBoot中,当方法执行成功且未抛出异常时会自动进行确认,如果出现异常将会进行重试
    • 源码类:org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer.DefaultMessageListenerConcurrently
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
    for (MessageExt messageExt : msgs) {
        log.debug("received msg: {}", messageExt);
        try {
            long now = System.currentTimeMillis();
            handleMessage(messageExt);
            long costTime = System.currentTimeMillis() - now;
            log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
        } catch (Exception e) {
            // 只要出现Exception异常,将会进行重试
            log.warn("consume message failed. messageId:{}, topic:{}, reconsumeTimes:{}", messageExt.getMsgId(), messageExt.getTopic(), messageExt.getReconsumeTimes(), e);
            context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume);
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
    }
	// 如果没有出现异常则消费成功
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
  1. 其中delayLevelWhenNextConsume指定重试频率
    • -1:不重试,直接放入死信队列
    • 0:broker控制重设计模式有哪些试频率
    • >0:客户端控制设计模式属于行为型的是重试频率

3.3 WARhttpclientN No appenders could be found for logger

  1. 引入SpringBoot之后不在启动类中做日志配置情况下将会有以下警告
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.InternalThreadLocalMap).
RocketMQLog:WARN Please initialize the logger system properly.
  1. 需要在启动json格式类中启动前设置环境变量 rocketmq.client.logHTTPUseSlf4j 为 true 明确指定RocketMQ的日志框架
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class RocketMqApplication {
    public static void main(String[] args) {
        /*
         * 指定使用的日志框架,否则将会报错
         * RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.InternalThreadLocalMap).
         * RocketMQLog:WARN Please initialize the logger system properly.
         */
        System.setProperty("rocketmq.client.logUseSlf4j", "true");
        SpringApplication.run(RocketMqApplication.class, args);
    }
}

四、Java8时间类型支持

2.3节有提到,在此处做详细说明

  1. RocketMQ内置使用的转换器是RocketMQMessageConverter
    • org.apache.rocketmq.sp设计模式ring.autoconfigure.MessageConverterConfiguration
  2. 转换JSON时使用的是MappingJackson2MessageConverter,但是其不支持Java的时间类型,比如LocalDate
  3. json数据消息实体中存在上面的时间https和http的区别类型字段时源码网站将会报以下错误
java.lang.RuntimeException: cannot convert message to class com.codecoord.rocketmq.domain.RocketMqMessage
  at org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer.doConvertMessage(DefaultRocketMQListenerContainer.java:486) ~[rocketmq-spring-boot-2.2.1.jar:2.2.1]
  at org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer.handleMessage(DefaultRocketMQListenerContainer.java:399) ~[rocketmq-spring-boot-2.2.1.jar:2.2.1]
  at org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer.access$100(DefaultRocketMQListenerContainer.java:71) ~[rocketmq-spring-boot-2.2.1.jar:2.2.1]
  at org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer$DefaultMessageListenerConcurrently.consumeMessage(DefaultRocketMQListenerContainer.java:359) ~[rocketmq-spring-boot-2.2.1.jar:2.2.1]
  at org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService$ConsumeRequest.run(ConsumeMessageConcurrentlyService.java:392) [rocketmq-client-4.9.1.jar:4.9.1]
  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_231]
  at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) [na:1.8.0_231]
  at java.util.concurrent.FutureTask.run(FutureTask.java) [na:1.8.0_231]
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_231]
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_231]
  at java.lang.Thread.run(Thread.java:748) [na:1.8.0_231]
Caused by: org.springframework.messaging.converter.MessageConversionException: Could not read JSON: Expected array or string.
  1. 所以需要自定义消息转换器,将MappingJacksoHTTPn2MessageConverter进行替换,然后添加支持时间模块
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.apache.rocketmq.spring.support.RocketMQMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.messaging.converter.CompositeMessageConverter;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.converter.MessageConverter;
import java.util.List;
/**
 * 序列化器处理
 */
@Configuration
public class RocketMqConfig {
    /**
     * 解决RocketMQ Jackson不支持Java时间类型配置
     * 源码参考:{@link org.apache.rocketmq.spring.autoconfigure.MessageConverterConfiguration}
     */
    @Bean
    @Primary
    public RocketMQMessageConverter createRocketMQMessageConverter() {
        RocketMQMessageConverter converter = new RocketMQMessageConverter();
        CompositeMessageConverter compositeMessageConverter = (CompositeMessageConverter) converter.getMessageConverter();
        List<MessageConverter> messageConverterList = compositeMessageConverter.getConverters();
        for (MessageConverter messageConverter : messageConverterList) {
            if (messageConverter instanceof MappingJackson2MessageConverter) {
                MappingJackson2MessageConverter jackson2MessageConverter = (MappingJackson2MessageConverter) messageConverter;
                ObjectMapper objectMapper = jackson2MessageConverter.getObjectMapper();
                objectMapper.registerModules(new JavaTimeModule());
            }
        }
        return converter;
    }
}

五、多环境自动隔离

5.1 环境隔离说明

  1. 从上面代码中可以看到,topic等源码时代信息通过代码常量方式维护,此时如果不对环境json进行隔离,将会导致消息被错误的环境消费
  2. 其中一种解决方案是通过$Value注入,但是这个需要每个环境都维护自己的topic等信息
  3. 我们可以进行环境区分自动隔离,比如dev、test、prod等不同环境只需要简单配置一个选项,所有的消息将被自动隔离源码网站,这样当协作人员需要单独测试的时候较为方便
  4. 环境隔离利用BeanPostProcessor的postProcessBeforeInitialization在监听器实例初始化前把对应topic修改

5.2 环境隔离配置

多环境自动隔离这部分实用性五颗星

  1. 配置文件中增加属性配置
# 自定义属性
system:
  environment:
    # 隔离环境名称,拼接到topic后,xxx_topic_tianxin,默认空字符串
    name: tianxin
    # 启动隔离,会自动在topic上拼接激活的配置文件,达到自动隔离的效果
    # 默认为true,配置类:EnvironmentIsolationConfig
    isolation: true
  1. 配置Bean修改配置,自动隔离核心原理
    • Spring生命周期中BeanPostProcessor在实体类是什么类初始化前在postProcessBefor源码编程器eInitia设计模式的两大主题lization中搞事情
    • 监听器类脑设计模式面试题袋上面有@Compo源码编辑器下载nent注解,所以一定会被Spring收HTTP拾到容器中
    • 所以可以在初始化前,把监听器类的t设计模式opic/group/tag神不知鬼不觉给改掉,然后实例http 500化的时候用的http://www.baidu.com就是改后的值
    • 可以选择隔离topic/group/tag或者全部隔离,如果是多租户那就拼接多租户信息即可
import org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.context.annotation.Configuration;
import org.springframework.lang.NonNull;
import org.springframework.util.StringUtils;
/**
 * RocketMQ多环境隔离配置
 * 原理:对于每个配置的Bean在实例化前,拿到Bean的监听器注解把group或者topic改掉
 */
@Configuration
public class EnvironmentIsolationConfig implements BeanPostProcessor {
    @Value("${system.environment.isolation:true}")
    private boolean enabledIsolation;
    @Value("${system.environment.name:''}")
    private String environmentName;
    @Override
    public Object postProcessBeforeInitialization(@NonNull Object bean,
                                                  @NonNull String beanName) throws BeansException {
        // DefaultRocketMQListenerContainer是监听器实现类
        if (bean instanceof DefaultRocketMQListenerContainer) {
            DefaultRocketMQListenerContainer container = (DefaultRocketMQListenerContainer) bean;
            // 开启消息隔离情况下获取隔离配置,此处隔离topic,根据自己的需求隔离group或者tag
            if (enabledIsolation && StringUtils.hasText(environmentName)) {
                container.setTopic(String.join("_", container.getTopic(), environmentName));
            }
            return container;
        }
        return bean;
    }
}
  1. 启动项目,从控制台中可以看到当开启隔离的时候会自动在topic后面加上隔离名称
  2. 提示:如果按照topic隔离,请提前创建好topic或者开启broker的topic自动创建功能

发表评论

提供最优质的资源集合

立即查看 了解详情