前言

SpringBoot 集成 RabbitMQ 公司老迈觉得运用注解太繁琐了,而且不能动态生成行列所以让我研讨是否能够动态绑定,所以就有了这个事情。打工人便是命苦没办法,硬着头皮直接就上了,接下来进入主题吧。

需求思路剖析

依据老迈的需求,大致分为运用装备文件进行装备,然后代码动态产生行列,交换机,生产者,顾客,以及假如装备了死信行列则动态绑定死信行列。由此得出一切的这些都是依据装备进行操作。然后百度有无代码创立就完事了。

装备文件思路剖析

问百度 RabbItMQ 支撑代码创立行列,交换机,以及两者之间绑定的代码,依据这些资料得出以下装备,下面示例装备只给出常用装备,其他装备后面会有个装备类

spring:
  rabbitmq:
    # 动态创立和绑定行列、交换机的装备
    modules:
      - routing-key: 路由KEY
        producer: 生产者
        consumer: 顾客
        autoAck: 是否主动ACK
        queue: 行列
          name: 行列称号
          dead-letter-exchange: 死信行列交换机
          dead-letter-routing-key: 死信行列路由KEY
          arguments: 行列其他参数,此装备支撑RabbitMQ的扩展装备
            # 1分钟(测验),单位毫秒
            x-message-ttl: 3000 # 推迟行列
        exchange: 交换机
          name: 交换机称号
      ..... 省略其他装备

从这儿开端便是界说中心代码模块需求的类,能够从这儿开端越过,直接看中心装备逻辑和中心代码,后续需求了解详细类的功用再回来看

装备类完成

装备有了,接下来便是创立Java目标把装备目标化了,因为支撑多个所以用的是调集接纳装备

/**
 * 绑定装备根底类
 * @author FJW
 * @version 1.0
 * @date 2023年04月11日 14:58
 */
@Data
@Configuration
@ConfigurationProperties("spring.rabbitmq")
public class RabbitModuleProperties {
    /**
     * 模块装备
     */
    List<ModuleProperties> modules;
}

对应YML的装备类

/**
 * YML装备类
 * @author FJW
 * @version 1.0
 * @date 2023年04月11日 17:16
 */
@Data
public class ModuleProperties {
    /**
     * 路由Key
     */
    private String routingKey;
    /**
     * 生产者
     */
    private String producer;
    /**
     * 顾客
     */
    private String consumer;
    /**
     * 主动承认
     */
    private Boolean autoAck = true;
    /**
     * 行列信息
     */
    private Queue queue;
    /**
     * 交换机信息
     */
    private Exchange exchange;
    /**
     * 交换机信息类
     */
    @Data
    public static class Exchange {
        /**
         * 交换机类型
         * 默许主题交换机
         */
        private RabbitExchangeTypeEnum type = RabbitExchangeTypeEnum.TOPIC;
        /**
         * 交换机称号
         */
        private String name;
        /**
         * 是否耐久化
         * 默许true耐久化,重启音讯不会丢失
         */
        private boolean durable = true;
        /**
         * 当一切队绑定列均不在运用时,是否主动删去交换机
         * 默许false,不主动删去
         */
        private boolean autoDelete = false;
        /**
         * 交换机其他参数
         */
        private Map<String, Object> arguments;
    }
    /**
     * 行列信息类
     */
    @Data
    public static class Queue {
        /**
         * 行列称号
         */
        private String name;
        /**
         * 是否耐久化
         */
        private boolean durable = true; // 默许true耐久化,重启音讯不会丢失
        /**
         * 是否具有排他性
         */
        private boolean exclusive = false; // 默许false,可多个顾客消费同一个行列
        /**
         * 当顾客均断开衔接,是否主动删去行列
         */
        private boolean autoDelete = false; // 默许false,不主动删去,避免顾客断开行列丢掉音讯
        /**
         * 绑定死信行列的交换机称号
         */
        private String deadLetterExchange;
        /**
         * 绑定死信行列的路由key
         */
        private String deadLetterRoutingKey;
        /**
         * 交换机其他参数
         */
        private Map<String, Object> arguments;
    }
}

生产者&顾客,这儿只需求界说个接口,后续会有完成类进行完成

生产者

/**
 * 生产者接口
 * @author FJW
 * @version 1.0
 * @date 2023年04月11日 13:52
 */
public interface ProducerService {
    /**
     * 发送音讯
     * @param message
     */
    void send(Object message);
}

顾客, 这儿需求继承 RabbitMQ 的顾客接口,后续会直接把此接口给动态绑定到 RabbitMQ 中


/**
 * 顾客接口
 * @author FJW
 * @version 1.0
 * @date 2023年04月11日 13:52
 */
public interface ConsumerService extends ChannelAwareMessageListener {
}

重试处理器


/**
 * 重试处理器
 * @author FJW
 * @version 1.0
 * @date 2023年04月19日 16:40
 */
public interface CustomRetryListener {
    /**
     * 最终一次重试失利回调
     * @param context
     * @param callback
     * @param throwable
     * @param <E>
     * @param <T>
     */
    public <E extends Throwable, T> void lastRetry(RetryContext context, RetryCallback<T,E> callback, Throwable throwable);
    /**
     * 每次失利的回调
     * @param context
     * @param callback
     * @param throwable
     * @param <E>
     * @param <T>
     */
    public <E extends Throwable, T> void onRetry(RetryContext context, RetryCallback<T,E> callback, Throwable throwable);
}

常量枚举界说

交换机类型枚举

    /**
     * 交换机类型枚举
     * @author FJW
     * @version 1.0
     * @date 2023年04月11日 15:19
     */
    public enum  RabbitExchangeTypeEnum {
        /**
         * 直连交换机
         * <p>
         * 依据routing-key精准匹配行列(最常运用)
         */
        DIRECT,
        /**
         * 主题交换机
         * <p>
         * 依据routing-key含糊匹配行列,*匹配任意一个字符,#匹配0个或多个字符
         */
        TOPIC,
        /**
         * 扇形交换机
         * <p>
         * 直接分发给一切绑定的行列,忽略routing-key,用于播送音讯
         */
        FANOUT,
        /**
         * 头交换机
         * <p>
         * 相似直连交换机,不同于直连交换机的路由规则建立在头属性上而不是routing-key(运用较少)
         */
        HEADERS;
    }

行列,交换机,路由 常量枚举

    /**
     * 行列,交换机。路由 常量枚举
     * @author FJW
     * @version 1.0
     * @date 2023年04月18日 16:39
     */
    public enum  RabbitEnum {
        QUEUE("xxx.{}.queue", "行列称号"),
        EXCHANGE("xxx.{}.exchange", "交换机称号"),
        ROUTER_KEY("xxx.{}.key", "路由称号"),
        ;
        RabbitEnum(String value, String desc) {
            this.value = value;
            this.desc = desc;
        }
        @Getter
        private String value;
        @Getter
        private String desc;
    }

中心代码

生产者完成类封装

/**
 * 生产者完成类
 * @author FJW
 * @version 1.0
 * @date 2023年04月18日 14:32
 */
@Slf4j
public class AbsProducerService implements ProducerService {
    @Resource
    private RabbitTemplate rabbitTemplate;
    /**
     * 交换机
     */
    private String exchange;
    /**
     * 路由
     */
    private String routingKey;
    @Override
    public void send(Object msg) {
        MessagePostProcessor messagePostProcessor = (message) -> {
            MessageProperties messageProperties = message.getMessageProperties();
            messageProperties.setMessageId(IdUtil.randomUUID());
            messageProperties.setTimestamp(new Date());
            return message;
        };
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setContentEncoding("UTF-8");
        messageProperties.setContentType("text/plain");
        String data = JSONUtil.toJsonStr(msg);
        Message message = new Message(data.getBytes(StandardCharsets.UTF_8), messageProperties);
        rabbitTemplate.convertAndSend(this.exchange, this.routingKey, message, messagePostProcessor);
    }
    public void setExchange(String exchange) {
        this.exchange = exchange;
    }
    public void setRoutingKey(String routingKey) {
        this.routingKey = routingKey;
    }
}

顾客完成类封装

/**
 * @author FJW
 * @version 1.0
 * @date 2023年04月18日 17:53
 */
@Slf4j
public abstract class AbsConsumerService<T> implements ConsumerService {
    private Class<T> clazz = (Class<T>) new TypeToken<T>(getClass()) {}.getRawType();
    /**
     * 音讯
     */
    private Message message;
    /**
     * 通道
     */
    private Channel channel;
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        this.message = message;
        this.channel = channel;
        String body = new String(message.getBody());
        onConsumer(genObject(body));
    }
    /**
     * 依据反射获取泛型
     * @param body
     * @return
     */
    private T genObject(String body) throws JsonProcessingException, IllegalAccessException, InstantiationException {
        try {
            ObjectMapper mapper = new ObjectMapper();
            return mapper.readValue(body, clazz);
        }catch (Exception e) {
            log.error("MQ转发层过错,请检查泛型是否与实践类型匹配, 指定的泛型是: {}", clazz.getName(), e);
        }
        return clazz.newInstance();
    }
    /**
     * 扩展消费办法,对音讯进行封装
     * @param data
     * @throws IOException
     */
    public void  onConsumer(T data) throws IOException {
        log.error("未对此办法进行完成: {}", data);
    }
    /**
     * 承认音讯
     */
    protected void ack() throws IOException {
        ack(Boolean.FALSE);
    }
    /**
     * 回绝音讯
     */
    protected void nack() throws IOException {
        nack(Boolean.FALSE, Boolean.FALSE);
    }
    /**
     * 回绝音讯
     */
    protected void basicReject() throws IOException {
        basicReject(Boolean.FALSE);
    }
    /**
     * 回绝音讯
     * @param multiple 当时 DeliveryTag 的音讯是否承认一切 true 是, false 否
     */
    protected void basicReject(Boolean multiple) throws IOException {
        this.channel.basicReject(this.message.getMessageProperties().getDeliveryTag(), multiple);
    }
    /**
     * 是否主动承认
     * @param multiple 当时 DeliveryTag 的音讯是否承认一切 true 是, false 否
     */
    protected void ack(Boolean multiple) throws IOException {
        this.channel.basicAck(this.message.getMessageProperties().getDeliveryTag(), multiple);
    }
    /**
     * 回绝音讯
     * @param multiple 当时 DeliveryTag 的音讯是否承认一切 true 是, false 否
     * @param requeue 当时 DeliveryTag 音讯是否重回行列 true 是 false 否
     */
    protected void nack(Boolean multiple, Boolean requeue) throws IOException {
        this.channel.basicNack(this.message.getMessageProperties().getDeliveryTag(), multiple, requeue);
    }
}

音讯监听工厂类完成,此完成非常重要,此处的代码便是绑定顾客的中心代码


/**
 * MQ详细音讯监听器工厂
 * @author FJW
 * @version 1.0
 * @date 2023年04月18日 10:48
 */
@Data
@Slf4j
@Builder
public class ConsumerContainerFactory implements FactoryBean<SimpleMessageListenerContainer> {
    /**
     * MQ衔接工厂
     */
    private ConnectionFactory connectionFactory;
    /**
     * 操作MQ管理器
     */
    private AmqpAdmin amqpAdmin;
    /**
     * 行列
     */
    private Queue queue;
    /**
     * 交换机
     */
    private Exchange exchange;
    /**
     * 顾客
     */
    private ConsumerService consumer;
    /**
     * 重试回调
     */
    private CustomRetryListener retryListener;
    /**
     * 最大重试次数
     */
    private final Integer maxAttempts = 5;
    /**
     * 是否主动承认
     */
    private Boolean autoAck;
    @Override
    public SimpleMessageListenerContainer getObject() throws Exception {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setAmqpAdmin(amqpAdmin);
        container.setConnectionFactory(connectionFactory);
        container.setQueues(queue);
        container.setPrefetchCount(20);
        container.setConcurrentConsumers(20);
        container.setMaxConcurrentConsumers(100);
        container.setDefaultRequeueRejected(Boolean.FALSE);
        container.setAdviceChain(createRetry());
        container.setAcknowledgeMode(autoAck ? AcknowledgeMode.AUTO : AcknowledgeMode.MANUAL);
        if (Objects.nonNull(consumer)) {
            container.setMessageListener(consumer);
        }
        return container;
    }
    /**
     * 装备重试
     * @return
     */
    private Advice createRetry() {
        RetryTemplate retryTemplate = new RetryTemplate();
        retryTemplate.registerListener(new RetryListener() {
            @Override
            public <T, E extends Throwable> boolean open(RetryContext context, RetryCallback<T, E> callback) {
                // 第一次重试调用
                return true;
            }
            @Override
            public <T, E extends Throwable> void close(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) {
            }
            @Override
            public <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) {
                if (Objects.nonNull(retryListener)) {
                    retryListener.onRetry(context, callback, throwable);
                    if (maxAttempts.equals(context.getRetryCount())) {
                        retryListener.lastRetry(context, callback, throwable);
                    }
                }
            }
        });
        retryTemplate.setRetryPolicy(new SimpleRetryPolicy(maxAttempts));
        retryTemplate.setBackOffPolicy(genExponentialBackOffPolicy());
        return RetryInterceptorBuilder.stateless()
                .retryOperations(retryTemplate).recoverer(new RejectAndDontRequeueRecoverer()).build();
    }
    /**
     * 设置过期时间
     * @return
     */
    private BackOffPolicy genExponentialBackOffPolicy() {
        ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
        // 重试距离基数(秒)
        backOffPolicy.setInitialInterval(5000);
        // 从重试的第一次至最终一次,最大时间距离(秒)
        backOffPolicy.setMaxInterval(86400000);
        // 重试指数
        backOffPolicy.setMultiplier(1);
        return backOffPolicy;
    }
    @Override
    public Class<?> getObjectType() {
        return SimpleMessageListenerContainer.class;
    }
}

中心装备类

/**
 * RabbitMQ 全局装备,SpringBoot 发动后会回调此类
 * @author FJW
 * @version 1.0
 * @date 2023年04月11日 13:55
 */
@Slf4j
public class RabbitMqConfig implements SmartInitializingSingleton {
    /**
     * MQ链接工厂
     */
    private ConnectionFactory connectionFactory;
    /**
     * MQ操作管理器
     */
    private AmqpAdmin amqpAdmin;
    /**
     * YML装备
     */
    private RabbitModuleProperties rabbitModuleProperties;
    @Autowired
    public RabbitMqConfig(AmqpAdmin amqpAdmin, RabbitModuleProperties rabbitModuleProperties, ConnectionFactory connectionFactory) {
        this.amqpAdmin = amqpAdmin;
        this.rabbitModuleProperties = rabbitModuleProperties;
        this.connectionFactory = connectionFactory;
    }
    @Override
    public void afterSingletonsInstantiated() {
        StopWatch stopWatch = StopWatch.create("MQ");
        stopWatch.start();
        log.debug("初始化MQ装备");
        List<ModuleProperties> modules = rabbitModuleProperties.getModules();
        if (CollUtil.isEmpty(modules)) {
            log.warn("未装备MQ");
            return;
        }
        for (ModuleProperties module : modules) {
            try {
                Queue queue = genQueue(module);
                Exchange exchange = genQueueExchange(module);
                queueBindExchange(queue, exchange, module);
                bindProducer(module);
                bindConsumer(queue, exchange, module);
            } catch (Exception e) {
                log.error("初始化失利", e);
            }
        }
        stopWatch.stop();
        log.info("初始化MQ装备成功耗时: {}ms", stopWatch.getTotal(TimeUnit.MILLISECONDS));
    }
    /**
     * 绑定生产者
     * @param module
     */
    private void bindProducer(ModuleProperties module) {
        try {
            AbsProducerService producerService = SpringUtil.getBean(module.getProducer());
            producerService.setExchange(module.getExchange().getName());
            producerService.setRoutingKey(module.getRoutingKey());
            log.debug("绑定生产者: {}", module.getProducer());
        } catch (Exception e) {
            log.warn("无法在容器中找到该生产者[{}],若需求此生产者则需求做详细完成", module.getConsumer());
        }
    }
    /**
     * 绑定顾客
     * @param queue
     * @param exchange
     * @param module
     */
    private void bindConsumer(Queue queue, Exchange exchange, ModuleProperties module) {
        CustomRetryListener customRetryListener = null;
        try {
            customRetryListener = SpringUtil.getBean(module.getRetry());
        }catch (Exception e) {
            log.debug("无法在容器中找到该重试类[{}],若需求重试则需求做详细完成", module.getRetry());
        }
        try {
            ConsumerContainerFactory factory = ConsumerContainerFactory.builder()
                    .connectionFactory(connectionFactory)
                    .queue(queue)
                    .exchange(exchange)
                    .consumer(SpringUtil.getBean(module.getConsumer()))
                    .retryListener(customRetryListener)
                    .autoAck(module.getAutoAck())
                    .amqpAdmin(amqpAdmin)
                    .build();
            SimpleMessageListenerContainer container = factory.getObject();
            if (Objects.nonNull(container)) {
                container.start();
            }
            log.debug("绑定顾客: {}", module.getConsumer());
        } catch (Exception e) {
            log.warn("无法在容器中找到该顾客[{}],若需求此顾客则需求做详细完成", module.getConsumer());
        }
    }
    /**
     * 行列绑定交换机
     * @param queue
     * @param exchange
     * @param module
     */
    private void queueBindExchange(Queue queue, Exchange exchange, ModuleProperties module) {
        log.debug("初始化交换机: {}", module.getExchange().getName());
        String queueName = module.getQueue().getName();
        String exchangeName = module.getExchange().getName();
        module.setRoutingKey(StrUtil.format(RabbitEnum.ROUTER_KEY.getValue(), module.getRoutingKey()));
        String routingKey = module.getRoutingKey();
        Binding binding = new Binding(queueName,
                Binding.DestinationType.QUEUE,
                exchangeName,
                routingKey,
                null);
        amqpAdmin.declareQueue(queue);
        amqpAdmin.declareExchange(exchange);
        amqpAdmin.declareBinding(binding);
        log.debug("行列绑定交换机: 行列: {}, 交换机: {}", queueName, exchangeName);
    }
    /**
     * 创立交换机
     * @param module
     * @return
     */
    private Exchange genQueueExchange(ModuleProperties module) {
        ModuleProperties.Exchange exchange = module.getExchange();
        RabbitExchangeTypeEnum exchangeType = exchange.getType();
        exchange.setName(StrUtil.format(RabbitEnum.EXCHANGE.getValue(), exchange.getName()));
        String exchangeName = exchange.getName();
        Boolean isDurable = exchange.isDurable();
        Boolean isAutoDelete = exchange.isAutoDelete();
        Map<String, Object> arguments = exchange.getArguments();
        return getExchangeByType(exchangeType, exchangeName, isDurable, isAutoDelete, arguments);
    }
    /**
     * 依据类型生成交换机
     * @param exchangeType
     * @param exchangeName
     * @param isDurable
     * @param isAutoDelete
     * @param arguments
     * @return
     */
    private Exchange getExchangeByType(RabbitExchangeTypeEnum exchangeType, String exchangeName, Boolean isDurable, Boolean isAutoDelete, Map<String, Object> arguments) {
        AbstractExchange exchange = null;
        switch (exchangeType) {
            // 直连交换机
            case DIRECT:
                exchange = new DirectExchange(exchangeName, isDurable, isAutoDelete, arguments);
                break;
            // 主题交换机
            case TOPIC:
                exchange = new TopicExchange(exchangeName, isDurable, isAutoDelete, arguments);
                break;
            //扇形交换机
            case FANOUT:
                exchange = new FanoutExchange(exchangeName, isDurable, isAutoDelete, arguments);
                break;
            // 头交换机
            case HEADERS:
                exchange = new HeadersExchange(exchangeName, isDurable, isAutoDelete, arguments);
                break;
            default:
                log.warn("未匹配到交换机类型");
                break;
        }
        return exchange;
    }
    /**
     * 创立行列
     * @param module
     * @return
     */
    private Queue genQueue(ModuleProperties module) {
        ModuleProperties.Queue queue = module.getQueue();
        queue.setName(StrUtil.format(RabbitEnum.QUEUE.getValue(), queue.getName()));
        log.debug("初始化行列: {}", queue.getName());
        Map<String, Object> arguments = queue.getArguments();
        if (MapUtil.isEmpty(arguments)) {
            arguments = new HashMap<>();
        }
        // 转化ttl的类型为long
        if (arguments.containsKey("x-message-ttl")) {
            arguments.put("x-message-ttl", Convert.toLong(arguments.get("x-message-ttl")));
        }
        // 绑定死信行列
        String deadLetterExchange = queue.getDeadLetterExchange();
        String deadLetterRoutingKey = queue.getDeadLetterRoutingKey();
        if (StrUtil.isNotBlank(deadLetterExchange) && StrUtil.isNotBlank(deadLetterRoutingKey)) {
            deadLetterExchange = StrUtil.format(RabbitEnum.EXCHANGE.getValue(), deadLetterExchange);
            deadLetterRoutingKey = StrUtil.format(RabbitEnum.ROUTER_KEY.getValue(), deadLetterRoutingKey);
            arguments.put("x-dead-letter-exchange", deadLetterExchange);
            arguments.put("x-dead-letter-routing-key", deadLetterRoutingKey);
            log.debug("绑定死信行列: 交换机: {}, 路由: {}", deadLetterExchange, deadLetterRoutingKey);
        }
        return new Queue(queue.getName(), queue.isDurable(), queue.isExclusive(), queue.isAutoDelete(), arguments);
    }
}

以上装备完成后,开端RUN代码

运用示例

这儿装备了推迟行列和死信行列,开RUN

spring:
  rabbitmq:
    # 动态创立和绑定行列、交换机的装备
    modules:
      # 正常行列
      - routing-key: test
        consumer: testConsumerService
        producer: testProducerService
        autoAck: false
        queue:
          name: test
          dead-letter-exchange: dead
          dead-letter-routing-key: dead
          arguments:
            # 1分钟(测验),单位毫秒
            x-message-ttl: 3000
        exchange:
          name: test
      # 死信行列
      - routing-key: dead
        consumer: deadConsumerService
        producer: deadProducerService
        autoAck: false
        queue:
          name: dead
        exchange:
          name: dead

项目发动部分日志

优雅封装RabbitMQ实现动态队列、动态生产者,动态消费者绑定

MQ管理端查看是否绑定成功

交换机

优雅封装RabbitMQ实现动态队列、动态生产者,动态消费者绑定

行列

优雅封装RabbitMQ实现动态队列、动态生产者,动态消费者绑定

由此能够得出代码是正常的,都有

生产者发送音讯

优雅封装RabbitMQ实现动态队列、动态生产者,动态消费者绑定

PostMan

优雅封装RabbitMQ实现动态队列、动态生产者,动态消费者绑定

顾客,因为测验死信行列,所以这儿回绝消费了

优雅封装RabbitMQ实现动态队列、动态生产者,动态消费者绑定

顾客消费信息

优雅封装RabbitMQ实现动态队列、动态生产者,动态消费者绑定

搞定收工,接下来还有什么问题等到详细生产上去发现了,横竖需求完成了,假如你们有方面需求此文章仍是个不错的参考,各位看客记住留下赞哦