一、问题阐明

此篇文章会触及到SpringBoot RocketMQ源码,后面会单独出一篇详细介绍源码

  1. SpringBoot RocketMQ能够从源码中查看到完结了SmartInitializingSingleton接口,在所有Bean注册完结之后就会开端注册,注册运用的是main线程,就会导致假如音讯队列积压很多音讯时注册占用main线程时刻过长,导致SpringBoot不能完全完结初始化(首要影响web无法对外供给接口访问)
  • 源码注册进口:org.apache.rocketmq.spring.autoconfigure.ListenerContainerConfiguration#afterSingletonsInstantiated

SpringBoot RocketMQ消息监听器(RocketMQMessageListener)后置注册

  1. 例如生产环境运用k8s集群布置,当音讯积压过大时无法快速完结发动对外供给安排妥当检查接口,就会导致pod安排妥当检查失败,然后pod不停重启无法完结正常发动(实际上服务在正常处理音讯,消费占用了初始化时长)

本次首要运用RocketMQ音讯监听器后置注册计划,k8s层面也能够处理,可是不在本次评论规模之内

RocketMQ音讯监听器后置注册:便是在SpringBoot发动完结能够正常接纳外部接口恳求时再动态注册音讯加监听器

本篇文章初衷如下

  • 首要评论RocketMQ假如在SpringBoot中后置注册,也便是咱们自己操控注册次序,而不是SpringBoot主动装配完结注册
  • 评论的计划不触及改动源码,而是直接经过正常方法达到后置注册意图

首要处理

  • 不期望在服务发动时就注册音讯监听器,而是服务完全发动后再开端注册音讯监听器

前置文章参阅

  • SpringBoot根底整合RocketMQ前面有发布过两篇文章,根底整合不会再过多阐明,有需求能够参阅下面的文章
  • RocketMQ与SpringBoot整合、核心运用、多租户主动隔离、Java8时刻支撑 – ()
  • RocketMQ与SpringBoot整合进行生产级二次封装 – ()

二、处理计划

  1. 经过上面的源码能够看到,afterSingletonsInstantiated办法完结音讯监听器的注册,所以此刻就需求阻拦音讯监听器的注册,阻拦注册有两个办法
  • 改源码(不属于本篇文章评论规模,一起也不主张,除非公司有专门保护的同事)
  • 经过AOP切片,阻拦办法注册
  1. AOP切片原理
  • 在SpringAOP中,咱们能够经过 @Around 注解切入一个办法,只有咱们自己调用ProceedingJoinPoint的proceed() 办法时才会触发办法本身的调用
  • 所以原理便是经过@Around切入办法,可是不调用proceed()办法,直接回来,就能够完结阻拦的意图

三、代码示例

3.1 AOP阻拦RocketMQMessageListener注册

  • 代码示例如下
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
/**
 * RocketMQ注册阻拦,自行注册,加速容器发动速度
 *
 * @author tianxincoord@163.com
 * @since 2022/8/15
 */
@Component
@Aspect
@Order(1)
@Slf4j
public class RocketMqSkipRegisterAop {
    /**
     * 经过切入afterSingletonsInstantiated阻拦注册
     */
    @Around("execution(* org.apache.rocketmq.spring.autoconfigure.ListenerContainerConfiguration.afterSingletonsInstantiated())")
    public Object init(ProceedingJoinPoint joinPoint) {
        log.info("RocketMQ开启代理,默许注册已越过");
        /// 不调用原生履行办法,直接回来null,越过办法调用
        // Object proceed = joinPoint.proceed();
        // return proceed
        return null;
    }
}

3.2 编写自定义注册类

因为ListenerContainerConfiguration注册类办法被切面阻拦,所以能够仿制一个这个类出来保留里边的办法,然后调整一下参数等信息,根据自己需求,也能够直接运用实例中的代码
示例类代码和ListenerContainerConfiguration根本一致,去掉继承联系和接口完结,当做普通类

import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQReplyListener;
import org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer;
import org.apache.rocketmq.spring.support.RocketMQMessageConverter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.framework.AopProxyUtils;
import org.springframework.aop.scope.ScopedProxyUtils;
import org.springframework.beans.factory.support.BeanDefinitionValidationException;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.core.env.StandardEnvironment;
import org.springframework.util.StringUtils;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
/**
 * RocketMQ队列监听器
 *
 * @author tianxincoord@163.com
 * @since 2022/09/23
 */
public class RocketMqListenerRegistry {
    private final static Logger log = LoggerFactory.getLogger(
            org.apache.rocketmq.spring.autoconfigure.ListenerContainerConfiguration.class);
    private final ConfigurableApplicationContext applicationContext;
    private final AtomicLong counter = new AtomicLong(0);
    private final StandardEnvironment environment;
    private final RocketMQProperties rocketMqProperties;
    private final RocketMQMessageConverter rocketMqMessageConverter;
    public RocketMqListenerRegistry(ConfigurableApplicationContext applicationContext,
                                    RocketMQMessageConverter rocketMqMessageConverter,
                                    StandardEnvironment environment,
                                    RocketMQProperties rocketMqProperties) {
        this.applicationContext = applicationContext;
        this.rocketMqMessageConverter = rocketMqMessageConverter;
        this.environment = environment;
        this.rocketMqProperties = rocketMqProperties;
    }
    public void afterSingletonsInstantiated() {
        Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class)
                .entrySet().stream().filter(entry -> !ScopedProxyUtils.isScopedTarget(entry.getKey()))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
        beans.forEach(this::registerContainer);
    }
    public void registerContainer(String beanName, Object bean) {
        Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);
        if (RocketMQListener.class.isAssignableFrom(bean.getClass()) && RocketMQReplyListener.class.isAssignableFrom(
                bean.getClass())) {
            throw new IllegalStateException(
                    clazz + " cannot be both instance of " + RocketMQListener.class.getName() + " and " + RocketMQReplyListener.class.getName());
        }
        if (!RocketMQListener.class.isAssignableFrom(bean.getClass()) && !RocketMQReplyListener.class.isAssignableFrom(
                bean.getClass())) {
            throw new IllegalStateException(
                    clazz + " is not instance of " + RocketMQListener.class.getName() + " or " + RocketMQReplyListener.class.getName());
        }
        RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class);
        String consumerGroup = this.environment.resolvePlaceholders(annotation.consumerGroup());
        String topic = this.environment.resolvePlaceholders(annotation.topic());
        boolean listenerEnabled = (boolean) rocketMqProperties.getConsumer().getListeners()
                .getOrDefault(consumerGroup, Collections.EMPTY_MAP).getOrDefault(topic, true);
        if (!listenerEnabled) {
            log.debug(
                    "Consumer Listener (group:{},topic:{}) is not enabled by configuration, will ignore initialization.",
                    consumerGroup, topic);
            return;
        }
        validate(annotation);
        String containerBeanName = String.format("%s_%s", DefaultRocketMQListenerContainer.class.getName(),
                counter.incrementAndGet());
        GenericApplicationContext genericApplicationContext = (GenericApplicationContext) applicationContext;
        genericApplicationContext.registerBean(containerBeanName, DefaultRocketMQListenerContainer.class,
                () -> createRocketMQListenerContainer(containerBeanName, bean, annotation));
        DefaultRocketMQListenerContainer container = genericApplicationContext.getBean(containerBeanName,
                DefaultRocketMQListenerContainer.class);
        if (!container.isRunning()) {
            try {
                container.start();
            } catch (Exception e) {
                log.error("Started container failed. {}", container, e);
                throw new RuntimeException(e);
            }
        }
        log.info("Register the listener to container, listenerBeanName:{}, containerBeanName:{}", beanName,
                containerBeanName);
    }
    private DefaultRocketMQListenerContainer createRocketMQListenerContainer(String name, Object bean,
                                                                             RocketMQMessageListener annotation) {
        DefaultRocketMQListenerContainer container = new DefaultRocketMQListenerContainer();
        container.setRocketMQMessageListener(annotation);
        String nameServer = environment.resolvePlaceholders(annotation.nameServer());
        nameServer = StringUtils.isEmpty(nameServer) ? rocketMqProperties.getNameServer() : nameServer;
        String accessChannel = environment.resolvePlaceholders(annotation.accessChannel());
        container.setNameServer(nameServer);
        if (!StringUtils.isEmpty(accessChannel)) {
            container.setAccessChannel(AccessChannel.valueOf(accessChannel));
        }
        container.setTopic(environment.resolvePlaceholders(annotation.topic()));
        String tags = environment.resolvePlaceholders(annotation.selectorExpression());
        if (!StringUtils.isEmpty(tags)) {
            container.setSelectorExpression(tags);
        }
        container.setConsumerGroup(environment.resolvePlaceholders(annotation.consumerGroup()));
        if (RocketMQListener.class.isAssignableFrom(bean.getClass())) {
            container.setRocketMQListener((RocketMQListener) bean);
        } else if (RocketMQReplyListener.class.isAssignableFrom(bean.getClass())) {
            container.setRocketMQReplyListener((RocketMQReplyListener) bean);
        }
        container.setMessageConverter(rocketMqMessageConverter.getMessageConverter());
        container.setName(name);
        return container;
    }
    private void validate(RocketMQMessageListener annotation) {
        if (annotation.consumeMode() == ConsumeMode.ORDERLY && annotation.messageModel() == MessageModel.BROADCASTING) {
            throw new BeanDefinitionValidationException(
                    "Bad annotation definition in @RocketMQMessageListener, messageModel BROADCASTING does not support ORDERLY message!");
        }
    }
}

3.3 发动类发动自定义注册

在发动类中单独在SpringApplication.run(xxx.class, args);之后开端注册。SpringApplication.run履行完结才表示整个SpringBoot服务真正意义上的发动完结

import com.codecoord.rocketmq.config.RocketMqListenerRegistry;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties;
import org.apache.rocketmq.spring.support.RocketMQMessageConverter;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.core.env.StandardEnvironment;
/**
 * RocketMQ发动类
 */
@Slf4j
@SpringBootApplication
public class RocketMqApplication {
    public static void main(String[] args) {
        /*
         * 指定运用的日志框架,否则将会报错
         * RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.InternalThreadLocalMap).
         * RocketMQLog:WARN Please initialize the logger system properly.
         */
        System.setProperty("rocketmq.client.logUseSlf4j", "true");
        ConfigurableApplicationContext run = SpringApplication.run(RocketMqApplication.class, args);
        log.info("SpringBoot RocketMQ服务发动完结");
        ConfigurableEnvironment environment = run.getEnvironment();
        registerRocketMq(environment, run);
    }
    private static void registerRocketMq(ConfigurableEnvironment environment,
                                         ConfigurableApplicationContext applicationContext) {
        try {
            log.info("开端注册RocketMQ");
            RocketMQMessageConverter messageConverter = applicationContext.getBean(RocketMQMessageConverter.class);
            RocketMQProperties mqProperties = applicationContext.getBean(RocketMQProperties.class);
            RocketMqListenerRegistry registry = new RocketMqListenerRegistry(applicationContext, messageConverter, (StandardEnvironment) environment, mqProperties);
            // 手工调用此办法完结原来应该完结的注册调用
            registry.afterSingletonsInstantiated();
            log.info("RocketMQ注册成功");
        } catch (Exception e) {
            // 此处能够整合音讯告知,当注册失败时发送企业微信、钉钉、邮件等告警
            log.error("RocketMQ注册异常", e);
            e.printStackTrace();
        }
    }
}
  • 上述注册也能够单独拆成的组件类,这样乃至能够经过controller接纳指令的方法完结注册,美滋滋

四、代码测试

  1. 发动代码调查注册进程

SpringBoot RocketMQ消息监听器(RocketMQMessageListener)后置注册
2. 经过以上,能够保证服务快速发动,尽快供给web服务来告知外界服务正常

五、特殊阐明

  1. 假如项目是web和任务是别离的,那么上面的问题不存在,因为任务处理服务不需求处理web服务
  2. 欢迎我们留言探讨