RocketMQ与SpringBoot整合进行生产级二次封装

零、文章前言阐明

前置把握:SpringBoot根底运用、RocketMQ和SpringBoot的整合运用

  • 首要运用参阅第二节
  • 中心运用参阅第一篇文章

文章难度:四颗星

代码不难,重点是封装的思维优先级需求windows许可证即将过期怎么办领会

优先级调度算法同观念欢迎大家在评论区一同评论学习,没有对优先级英文错之分,每个体系事务特性不同,适合体系的才是最泛型编程好的~

源码地址优先级排序gitee.com/tianxincode… 文章只会阐明中心Windows代码,其他的根底整合配置和多环境自动隔离参阅源码即可

RocketMQ与SpringBoot整合进行出产级二次封装

一、为什么要二次封装

为了不发生歧义,文章中说到的二次封装均是依据原始运用办法的封装,而非源码级别的二次封装

换句话说:假如都需求对源码进行封装了,那么阐明公司事务规划都到一定程度了,二次封装这种东西现已不需求评论了,封装现已是一个giti一致

  1. 首先清晰一点:不进行二次封装彻底不影响RocketMQwindows许可证即将过期怎么办的运用,能够挑选二次封装和不挑选二次封装
    • 二次封装能够供给更多的功用和更简练的运用办法
    • 假如一个封装搞得比原始运用办法更杂乱,那么就失去了二次封装的意义
    • Q1:二次封装可不能够不要?彻底能够,彻底不影响正常运用
    • Q2:泛型list和一般list有什么区别二次封装有没有必要?仁者见仁智者见智的问题,假如觉得没有必要那么这篇文章能够越过~
  2. ORM结构中典型的一个二次封装优先级排序c语言结构便是MyBat设计模式面试题isPlusgitlab(简称MP),后者是对MyBatis原生运用的添加,不运用MP直接运用MyBatis可不能够?彻底优先级排序能够,那为什么要用二次封装后的MP?
    • 场景泛型方法的定义和使用:大部分的数据库操设计模式的6大原则作,无外乎Cwindows7怎样重设计模式及其应用场景装体系RUD,那么最常用的比方(依设计模式有哪些据称号就泛型方法能够知道这个方优先级越小越优先吗法做什么用,就没有必要再二次阐明晰):updatwi泛型的作用ndows更新有必要吗e优先级调度算法ById泛型方法的定义和使用、batchUpdate、windows系统dele设计模式及其应用场景t设计模式之禅eById、saveOrUpdate、batchInsert。关于上面这5个操作,改优先级排序动的仅仅表字段和表规划方式面试题规划方式的团体教育活动计划,剩余的语法都是设计模式面试题相同
    • 不封装:直接运用MyBatis彻底能够自己完成上面办法的功用,可是每个表都需求写一遍自己上面的办法,假设有100张表,那么就会多出495个(下面阐明)优先级排序c言语重复功用代码,并且一切代码都是冗余的
    • 封装后:由设计模式封装者供给上面5个办法的公共完成,然后一切需求运用上面功用的Service只需求承继封装好的类就自然的具有了上面的5大功用,那么代码的冗余量就从100张设计模式之禅表*5个办法==设计模式一般用来解决什么问题500,去掉封装的5个,节省了495的代码冗余量
  3. 所以二次封装是为了更便利用、更简练、愈加适用于体系,量身打造能够大大提高开发功率。就如上面的5个办法,彻底重复性的东西为规划方式一般用来处理什么问题什么要浪费开发giti轮胎时刻来做这些冗余的工作呢?

1.1 二次封装不同观念

让咱们以一个生活中的蛋炒饭开个头

原始结构比方供给了原材料:厨具、鸡蛋,米饭等食材、菜谱

  1. 关于结构的运用一般有以下两种办法
  • 第一种:依据菜谱来进优先级是什么意思行做饭(运用原生的办法调用),洗菜、做饭、刷碗、打扫不管啥入参自己管
  • 第二种:找一个优先级行列人来学会这道菜(担任二次封装的人)的多种做法(封装大部分事务场景)并做成一种点餐式的泛型办法的界说和泛型的作用运用服务,谁想吃哪种类型的蛋炒饭直接点餐(调用封Git装好的办法)就能够吃上香馥馥的蛋Windows炒饭

问题:github哪种计划更优先级队列好? 答案:两种各有各的优势(在说废话,哈哈~)

  1. 第一种:原生办法
    • 长处:能够依照各种办法灵敏调用,比方每个人都运用Rocke优先级排序tMQTempgithub中文官网网页late设计模式一般用来解决什么问题原生send发送音讯,想要发giticomfort是什么轮胎送什么类型的音讯就发送什么类型的音讯,比较随意
    • 缺陷泛型是什么代码大量冗余,从构建参数目标、发送音讯、消费音讯、反常处理、日志记载、反常重试啥啥的都是自己w设计模式之禅indows体系搞,每个消费行列就会呈现优先级最高的运算符上面设计模式有哪些一切的过程。比方现在windows10激活密gitlab有一个订单处理github中文官网网页中心A接纳来自各种类型订单,此刻如B、优先级调度算法C两个原始订单来历想让A处理订单,那么B和Cwindows10都需求依Windows照A的要求进行调用,代码会冗余
  2. 第二种:点餐式服务
    • 长处:封优先级队列装了大部分一致的办法调用,比方 发送音讯、反常处理、日志记载等等都是重复的,封装后点优先级表餐的泛型list和一般list有什么区别人不win泛型通配符dows11有必要晋级吗需求再联系这一部分要怎样处理,只需求告知点餐服务要不要进行泛型办法反常、要不反常重试等windows7怎么重装系统gitlab,那么此刻关于点餐人来说只需求 付钱(调用服务)吃饭(消费音讯),除此之外windows10啥也不必管,设计模式面试题悉数由点餐服务供给者完成泛型办法一切上述两个过程外的其它windows10激活密钥操作
    • 缺陷优先级行列无法满足一切点餐人的要求,有的人喜欢味道重一点,有的泛型list和普通list有什么区别喜欢味道淡一点。可是这个缺陷彻底能够处理,比方点餐服务供给了自界说厨房(回来原始发送目标),此刻调用者能够依照第一种办法进行运用
  3. 选哪giti种?
    • 个人而言:事务体系杂乱的giti优先挑选第二种简单事务的挑选第一种(优先级是什么意思尽量采优先级表型编程用封装,后续保护方规划方式之禅便)。关于一个杂乱的体系,本身事务级的代码就现已很多了,成果还要每个人giticomfort是什么轮胎giticomfort是什么轮胎理悉数相同的东西,顾客越多代码冗余github泛型方法多。假如一个体系泛型的优点仅仅为了运用MQ来进行事务分离,消费windows体系者也不多,那么能够挑选最快的办法,可是泛型是什么最终会挑选第git设计模式的集体教学活动方案教程二种,假如事务跟着时刻添加越杂乱,越晚改成第二种花费的价值越大!
    • 第一种就比方此设计模式一般用来解决什么问题刻咱们要直接操作内存设计模式的两大主题,原生操作就比方C++或C,能够直接操作内存,可是一起用完后还要自己写各种反常处理和释放内存;代码封装就比方Java,咱们只需求告知Java咱们要运用内存,然后设计模式面试题用完就不必管
  4. 企业中,泛型是什么事务功用产出是一级优先优先级越小越优先吗级,在此之上才干有更高档的东西。技能服务于事务,而不是事务服务于技能git教程比方现在3giticomfort是什么轮胎0个人的体系,咱们要运用缓存加快访问,那么咱们是挑选 内部缓设计模式及其应用场景存(直设计模式的两大主题接用调集或许map存起来)仍是用Redis?
    • 内部缓存和Redis能不优先级队列能达到意图?能
    • 哪个更便利更快?内部缓存!内部目标就很快完成
    • 假如事务开展迟早会转为Redis这种专业的缓存中间件,就比方事务开展前第一种,泛型list和普通list有什么区别windows11有必要晋级吗务开展后挑选第二种,可是关于大部分优先级最高的运算符事务体系来说功用添加是很快的,特别是产品搭档上一分钟提需求下一分钟就要上线这windows10种(开个打趣~),所以咱们在引用一个技能需不需求进行二次封装时需求技能担任人对事务添加有一个预判。主张是都进行windwindows许可证即将过期怎么办ows体系封装一下

1.2 封装的抽离点

  1. 关于二次封规划方式的两大主题装,其间最首要的便是找出该结构在日优先级表常运用中所呈现的大部分涉及到的操作,然后找优先级出改动操作和不改动操作
  2. RocketMQ日常运用主优先级排序要场景为例:
    • 发送音讯git教程阶段:预备需求发送的音讯、发送音giti轮胎讯、记载原始音讯日志、发送失利处理、可靠性处理优先级行列
    • 消费git优先级是什么意思ee音讯泛型办法阶段:记载接纳音讯日优先级英文志、业优先级务处理、事务日志记载、反常处理、泛型是什么反常重试、反常告知、死信处理
  3. 提取改动点和不改动点(能够抽取为优先级排序c语言公共处理windows11有必要升级吗的场景)
    • 发送音讯windows怎么激活优先级是什么意思阶段:
      • 改动点:预备需求发送的音讯
      • 不改动点:发送音讯、记载原始音讯日志、发送失利处泛型擦除理、设计模式的集体教学活动方案可靠性处理
    • 消费音讯阶段
      • 改动点:事务处理、事务日志记载
      • 不改动点:记载接纳音讯日志、反常处理、反常重试、反常告知、死信处理
  4. 从上能够看到,规划方式关于RocketMQ的运用,大部分场景都是能够抽离成一个公共的办法处理,只需事务级的规划方式的两大主题需求自优先级调度算法己处理,所以假泛型list和普通list有什么区别如咱们把不改动场景抽取后,每个搭档只需求写自己事务相关部分即可
  5. 抽取后的杂乱度:关于新加一个顾客,只需求处github中文官网网页理业泛型办法务相关三个场景(预备需求发送的音讯、事务处理、事务日志记载)优先级行列,剩余的九个场景,只需求封装一次就能够。优先级越小越优先吗需求现在就几十个顾客,能够想想一些减少了多少代码冗余

1.3 规划方式的运用

  1. 要封装出一个好的笼统层,【规划方式】主张好好领会和学习一下
  2. 规划方式关于用优先级是什么意设计模式之禅不到的人来说比较虚幻,关于用的到的人来说,这windows体系个真牛X

二、二次封装中心关键

2.1 二次封装中心点

2.1.1 封装首要评论点

  1. 关于设计模式之禅RocketMQ或许说优先级排序关于整个MQ体系来说(不管是Rabb泛型的作用itMQ、RocketMQ、Ka泛型办法fka)等封装的中心首要有两个规划方式面试题:发送音讯、消费音讯者两个场景
  2. 关于RocketMQ咱们首要评论三个当地:RocketMQTe泛型是什么mplate封装、RocketMQListener封装和播送github中文官网网页g设计模式及其应用场景itee音讯的封装
  3. 播送音讯是散布式体系中一起让优先级c言语一切节点都干一件工作的一个好的办法,假如用不到忽略播送音讯即可

2.1.2 发送/消费的几种音讯实体

  1. RocketMQ发送音讯关于不同的运用来说,大部分挑优先级最高的运算符选下面的几种发送音讯类型
    • A、发优先级表送Json目标,比方Fastjson的JSONObwindows10激活密钥ject
    • B、直接发送转Json后的String目标
    • C、依据事务封装对应实体类
    • D、泛型list和一般list有什么区别直接运用原生MessageExt接github永久回家地址
  2. 怎样挑选?怎样挑选才是最优先级调度算git命令优?
    • 上面哪一种都能够达到意图,假如要一致封装就有必要要有一个标准
    • 怎样挑选只需求回答这个问题:在不看音讯发送者的状况泛型优先级是什么意思下,顾客怎样知道发送者发送的音讯意义?泛型擦除
    • 比方现在有一个订单优先级排序音讯,假如咱们不看音讯发送者,怎样windows7怎样重装优先级体系知道发送者给顾客发送哪些字段
      • A、B、D能够吗?一定不能够!JSON目标和String目标,假如咱们不看音讯发送者不或许知道到底发送了啥,这点我相信没有能够评论的当泛型擦除地,由于类型决议了这优先级调度算法个操作不或许
      • C能够吗?能够!此刻不需求看音讯发送者,只需求优先级看顾客的实体类点优先级调度算法进去,有哪些事务字段一览无余
      • 或许有杠要抬了,有看实体类的功夫,我看音讯发送者都看完了
        • 魂灵拷问1:假如音讯优先级是什么意思发送者和消费泛型者不在一个体系怎样看?邪魅一笑,不搭档务线或许没github中文官网网页代码权规划方式及其运用场景限吧?散布优先级排序c言语式体系彻底独泛型编程立或许吧?
        • 魂灵拷问2:假如现在需求一个功用,假如某些有必要要的字段音讯发送者假如没有给的话需求校验,一般String和JSONObject怎样完成?换成实体类呢?
  3. 依据上述评论点,封装主张依据实体类来,实体类不管是排查问题、新人了解体系代码、信息校验等String和JSON设计模式属于行为型的是Object无法像实体类相同轻松胜任

2.2 RocketMQTempwindows是什么意思late封装

2.泛型的作用2.1优先级调度算法 封装根底实体类

  1. 根底消wigitindows10息实体类封装了除了事务音讯外一切其他公共字段,首要看下面代码优先级表中的规划方式的两大主题字段和注释
  2. 根底笼优先级统音讯实体,包括根底的音讯优先级越小越优先吗、依据自己的事务音讯设置更多的字段
    • 其间也能够包括一切windows10顾客或许用得到的办法等,比方做些数据的加解密
package com.codecoord.rocketmq.domain;
import lombok.Data;
import java.time.LocalDateTime;
import java.util.UUID;
/**
 * 根底音讯实体,包括根底的音讯
 * 依据自己的事务音讯设置更多的字段
 *
 * @author tianxincoord@163.com
 * @since 2022/6/16
 */
@Data
public abstract class BaseMqMessage {
    /**
     * 事务键,用于RocketMQ控制台检查消费状况
     */
    protected String key;
    /**
     * 发送音讯来历,用于排查问题
     */
    protected String source = "";
    /**
     * 发送时刻
     */
    protected LocalDateTime sendTime = LocalDateTime.now();
    /**
     * 盯梢id,用于slf4j等日志记载盯梢id,便利查询事务链
     */
    protected String traceId = UUID.randomUUID().toString();
    /**
     * 重试次数,用于判断重试次数,超越重试次数发送反常正告
     */
    protected Integer retryTimes = 0;
}
  1. 有了此根底笼统实体类,那么剩余的一切事务音讯实体只需求承继此基类,然后在自己事务类中包括自己需求设计模式的集体教学活动方案的字泛型是什么段即可,由于这些公共字段不管是向上转型仍是向下转型,子类和父类都能够看得到

2.2.2 RocketMQ规划方式面试题Tempgit教程late

  1. Rockewindows7怎样重装体系tMQTemplate发送音讯的代码假如不封装,咱们发送音讯需求这样
    • String destination = topic + “:” + tag;
    • template.syncSend(destinwindows是什么意思ation, message);
  2. 每个人发优先级表送音讯泛型的长处都要自己处理这个冒泛型号,直接传入top优先级最高的运算符ic和tag不香吗?依照抽离改动点中的改动点,只需消规划方式面试题息是改动的,除此之外的其他规设计模式面试题则交给封装类
  3. RocketMQTemplate首要封装发送泛型list和一般list有什么区别音讯的日志、反常的处理、音讯规划方式23方式介绍key设置、等等其他配置
  4. 封装代码类如下,下面包括了首要发送办法,更多优先级表自己添加即可
    • 这儿便是windows是什么意思音讯发giti轮胎送的点餐机器,一起也供给了封装优先级是什么意思办法也供给原始RocketMQTemplate供运用
    • 此处仅仅供给windows系统一种办法,生设计模式的两大主题泛型办法产中依照项目组商议决议
package com.codecoord.rocketmq.template;
import com.alibaba.fastjson.JSONObject;
import com.codecoord.rocketmq.constant.RocketMqSysConstant;
import com.codecoord.rocketmq.domain.BaseMqMessage;
import com.codecoord.rocketmq.util.JsonUtil;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
 * RocketMQ模板类
 *
 * @author tianxincoord@163.com
 * @since 2022/4/15
 */
@Component
public class RocketMqTemplate {
    private static final Logger LOGGER = LoggerFactory.getLogger(RocketMqTemplate.class);
    @Resource(name = "rocketMQTemplate")
    private RocketMQTemplate template;
    /**
     * 获取模板,假如封装的办法不够供给原生的运用办法
     */
    public RocketMQTemplate getTemplate() {
        return template;
    }
    /**
     * 构建意图地
     */
    public String buildDestination(String topic, String tag) {
        return topic + RocketMqSysConstant.DELIMITER + tag;
    }
    /**
     * 发送同步音讯
     */
    public <T extends BaseMqMessage> SendResult send(String topic, String tag, T message) {
        // 留意分隔符
        return send(topic + RocketMqSysConstant.DELIMITER + tag, message);
    }
    public <T extends BaseMqMessage> SendResult send(String destination, T message) {
        // 设置事务键,此处依据公共的参数进行处理
        // 更多的其它根底事务处理...
        Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getKey()).build();
        SendResult sendResult = template.syncSend(destination, sendMessage);
        // 此处为了便利检查给日志转了json,依据挑选挑选日志记载办法,例如ELK采集
        LOGGER.info("[{}]同步音讯[{}]发送成果[{}]", destination, JsonUtil.toJson(message), JSONObject.toJSON(sendResult));
        return sendResult;
    }
    /**
     * 发送推迟音讯
     */
    public <T extends BaseMqMessage> SendResult send(String topic, String tag, T message, int delayLevel) {
        return send(topic + RocketMqSysConstant.DELIMITER + tag, message, delayLevel);
    }
    public <T extends BaseMqMessage> SendResult send(String destination, T message, int delayLevel) {
        Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getKey()).build();
        SendResult sendResult = template.syncSend(destination, sendMessage, 3000, delayLevel);
        LOGGER.info("[{}]推迟等级[{}]音讯[{}]发送成果[{}]", destination, delayLevel, JsonUtil.toJson(message), JsonUtil.toJson(sendResult));
        return sendResult;
    }
}
  1. 这个类是最根底windows7旗舰版的原始封装类,相当于饭馆供给的点餐服务。上面供给无事务特性的发送,比方想要发送日志音讯或许动态发送音讯意图场景

3.2.3 增强RocketMQTemplate

  1. 以订单处理中心来说优先级排序,改动点仅仅仅仅单号等事务数据不相同,发往订单处理中心的音讯不管是topic仍是tag等等其实彻底都相同,那么此刻能够依据事务来gitee添加封装
  2. 增强原始功用需求留意下面两个点
    • 一切父类规划方式有哪些能呈现的当giti泛型擦除,子类都能呈现:也便是子类具有功用 >= 父类 ,比方Java的优先级List,只需入参是List的地泛型办法的界说和泛型的作用运用方,传ArrayList和Linkwindows10激活密钥edList彻底泛型擦除能够
    • 设计模式属于行为型的是强功用不能改动原泛型通配符始功用的行为:比方父类有一个办法say是说windows更新有必要吗话,成果子类覆写了sgiticomfort是什么轮胎ay改泛型办法成了行为是吃饭,然后当调用者调用sa泛型的效果y的时分得到了一个彻底预期外的规划方式的团体教育活动计划成果
  3. 就以订单中心音讯发规划方式一般用来处理什么问题送为例,泛型的效果封装OrderMgiteeessageTemplate承继自RocketMqTemplate,此刻泛型的效果前者就具有了封装父类的所泛型的效果有根底办法,具有了一切父类的功用。然后能够在前者添加本身事务特性的发送方规划方式的团体教育活动计划法,比方发送订单泛型编程处理音讯
package com.codecoord.rocketmq.template;
import com.codecoord.rocketmq.constant.RocketMqBizConstant;
import com.codecoord.rocketmq.domain.RocketMqEntityMessage;
import org.apache.rocketmq.client.producer.SendResult;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import javax.validation.constraints.NotNull;
import java.time.LocalDate;
import java.time.LocalDateTime;
/**
 * 订单类发送音讯模板东西类
 *
 * @author tianxincode@163.com
 * @since 2022/6/16
 */
@Component
public class OrderMessageTemplate extends RocketMqTemplate {
    /// 假如不采用承继也能够直接注入运用
    /* @Resource
    private RocketMqTemplate rocketMqTemplate; */
    /**
     * 入参只需求传入是哪个订单号和事务体音讯即可,其他操作依据需求处理
     * 这样关于调用者而言,能够愈加简化调用
     */
    public SendResult sendOrderPaid(@NotNull String orderId, String body) {
        RocketMqEntityMessage message = new RocketMqEntityMessage();
        message.setKey(orderId);
        message.setSource("订单支付");
        message.setMessage(body);
        // 这两个字段仅仅为了测验
        message.setBirthday(LocalDate.now());
        message.setTradeTime(LocalDateTime.now());
        return send(RocketMqBizConstant.SOURCE_TOPIC, RocketMqBizConstant.ORDER_PAID_TAG, message);
    }
}
  • 此刻关于调用者只需求 orderMessageTwindows怎样激活emplate.sendOrderPaid(“O设计模式23模式介绍001″, “xxx”);就能够把音讯发送到订单处理中心
  1. 封装后泛型的优点,假如现在有10个订单来历,现在需求调整音讯发送格式,假如不进行封装那么10个规划方式来历发送的当地都需求改;假如进行了二次封装,windows怎样激活只需求改sendOrderPaid办法即可,并且还不会犯错,此刻优势就体现出来了

2.3 RocketMQListener封装

  1. RocketMQList设计模式之禅ener是gitlab消费音讯的中心,一起也涉及到更多的操作,比方:根底日志记载、反常处理、音讯重试、正告告知等等等
  2. 泛型是什么照抽离改动点,RocketMQListener只应该处理与本身事务相关的,除此之外的其它应该交给父类,子类只需求告知父类要不要反常处理泛型办法的界说和运用、要不要重试等等,点餐giti轮胎式服务
  3. 封装音讯消费的笼统类
    • 留意泛型限定为标准根底音讯类,这样能到顾客的一定有一致的标准类BaseMq泛型Message
    • 下面简单封装示例
package com.codecoord.rocketmq.listener;
import com.codecoord.rocketmq.constant.RocketMqSysConstant;
import com.codecoord.rocketmq.domain.BaseMqMessage;
import com.codecoord.rocketmq.template.RocketMqTemplate;
import com.codecoord.rocketmq.util.JsonUtil;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import javax.annotation.Resource;
import java.time.Instant;
import java.util.Objects;
/**
 * 笼统音讯监听器,封装了一切公共处理事务,如
 * 1、根底日志记载
 * 2、反常处理
 * 3、音讯重试
 * 4、正告告知
 * 5、....
 *
 * @author tianxincoord@163.com
 * @since 2022/4/17
 */
public abstract class BaseMqMessageListener<T extends BaseMqMessage> {
    /**
     * 这儿的日志记载器是哪个子类的就会被哪个子类的类进行初始化
     */
    protected final Logger logger = LoggerFactory.getLogger(this.getClass());
    @Resource
    private RocketMqTemplate rocketMqTemplate;
    /**
     * 音讯者称号
     *
     * @return 顾客称号
     */
    protected abstract String consumerName();
    /**
     * 音讯处理
     *
     * @param message 待处理音讯
     * @throws Exception 消费反常
     */
    protected abstract void handleMessage(T message) throws Exception;
    /**
     * 超越重试次数音讯,需求启用isRetry
     *
     * @param message 待处理音讯
     */
    protected abstract void overMaxRetryTimesMessage(T message);
    /**
     * 是否过滤音讯,例如某些
     *
     * @param message 待处理音讯
     * @return true: 本次音讯被过滤,false:不过滤
     */
    protected boolean isFilter(T message) {
        return false;
    }
    /**
     * 是否反常时重复发送
     *
     * @return true: 音讯重试,false:不重试
     */
    protected abstract boolean isRetry();
    /**
     * 消费反常时是否抛出反常
     *
     * @return true: 抛出反常,false:消费反常(假如没有敞开重试则音讯会被自动ack)
     */
    protected abstract boolean isThrowException();
    /**
     * 最大重试次数
     *
     * @return 最大重试次数,默许10次
     */
    protected int maxRetryTimes() {
        return 10;
    }
    /**
     * isRetry敞开时,从头入队推迟时刻
     *
     * @return -1:当即入队重试
     */
    protected int retryDelayLevel() {
        return -1;
    }
    /**
     * 由父类来完成根底的日志和调配,下面的仅仅供给一个思路
     */
    public void dispatchMessage(T message) {
        MDC.put(RocketMqSysConstant.TRACE_ID, message.getTraceId());
        // 根底日志记载被父类处理了
        logger.info("[{}]顾客收到音讯[{}]", consumerName(), JsonUtil.toJson(message));
        if (isFilter(message)) {
            logger.info("音讯不满足消费条件,已过滤");
            return;
        }
        // 超越最大重试次数时调用子类办法处理
        if (message.getRetryTimes() > maxRetryTimes()) {
            overMaxRetryTimesMessage(message);
            return;
        }
        try {
            long start = Instant.now().toEpochMilli();
            handleMessage(message);
            long end = Instant.now().toEpochMilli();
            logger.info("音讯消费成功,耗时[{}ms]", (end - start));
        } catch (Exception e) {
            logger.error("音讯消费反常", e);
            // 是捕获反常仍是抛出,由子类决议
            if (isThrowException()) {
                throw new RuntimeException(e);
            }
            if (isRetry()) {
                // 获取子类RocketMQMessageListener注解拿到topic和tag
                RocketMQMessageListener annotation = this.getClass().getAnnotation(RocketMQMessageListener.class);
                if (Objects.nonNull(annotation)) {
                    message.setSource(message.getSource() + "音讯重试");
                    message.setRetryTimes(message.getRetryTimes() + 1);
                    SendResult sendResult;
                    try {
                        // 假如音讯发送不成功,则再次从头发送,假如发送反常则抛出由MQ再次处理(反常时不走推迟音讯)
                        // 此处捕获之后,相当于此条音讯被音讯完成然后从头发送新的音讯
                        sendResult = rocketMqTemplate.send(annotation.topic(), annotation.selectorExpression(), message, retryDelayLevel());
                    } catch (Exception ex) {
                        throw new RuntimeException(ex);
                    }
                    // 发送失利的处理便是不进行ACK,由RocketMQ重试
                    if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                        throw new RuntimeException("重试音讯发送失利");
                    }
                }
            }
        }
    }
}
  1. 封装消费最终类
    • 留意:收到的音讯windowswindows许可证即将过期怎么办10是先委派给父类,父类进行调度管理
package com.codecoord.rocketmq.listener;
import com.codecoord.rocketmq.constant.RocketMqBizConstant;
import com.codecoord.rocketmq.domain.RocketMqEntityMessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
/**
 * 实体类消费监听器,在完成RocketMQListener中间还加了一层BaseMqMessageListener来处理根底事务音讯
 *
 * @author tianxincoord@163.com
 * @since 2022/5/12
 */
@Slf4j
@Component
@RocketMQMessageListener(
        topic = RocketMqBizConstant.SOURCE_TOPIC,
        consumerGroup = RocketMqBizConstant.SOURCE_GROUP,
        selectorExpression = RocketMqBizConstant.SOURCE_TAG,
        // 指定顾客线程数,默许64,出产中请留意配置,防止过大或许过小
        consumeThreadMax = 5
)
public class RocketEntityMessageListener extends BaseMqMessageListener<RocketMqEntityMessage>
                                         implements RocketMQListener<RocketMqEntityMessage> {
    /**
     * 此处仅仅阐明封装的思维,更多仍是要依据事务操作决议
     * 内功心法有了,无论什么招式都能够发挥最大威力
     */
    @Override
    protected String consumerName() {
        return "RocketMQ二次封装音讯顾客";
    }
    @Override
    public void onMessage(RocketMqEntityMessage message) {
        // 留意,此刻这儿没有直接处理事务,而是先委派给父类做根底操作,然后父类做完根底操作后会调用子类的实践处理类型
        super.dispatchMessage(message);
    }
    @Override
    protected void handleMessage(RocketMqEntityMessage message) throws Exception {
        // 此刻这儿才是最终的事务处理,代码只需求处理资源类封闭反常,其他的能够交给父类重试
        System.out.println("事务音讯处理");
    }
    @Override
    protected void overMaxRetryTimesMessage(RocketMqEntityMessage message) {
        // 当超越指定重试次数音讯时此处办法会被调用
        // 出产中能够进行回退或其他事务操作
    }
    @Override
    protected boolean isRetry() {
        return false;
    }
    @Override
    protected int maxRetryTimes() {
        // 指定需求的重试次数,超越重试次数overMaxRetryTimesMessage会被调用
        return 5;
    }
    @Override
    protected boolean isThrowException() {
        // 是否抛出反常,到消费反常时是被父类阻拦处理仍是直接抛出反常
        return false;
    }
}
  1. 封装后关于子类来说,只需求告知父类要不要做就具有了最开端说的一切功用,简化了运用,windows体系此刻子类顾客只需求专注于自己的事务中心规划方式处理泛型方法的定义和使用就能够了

2.4 播送音讯的运用场景

  1. 运用场景:多租户或许服务有内部缓存需求改写状况下假如需优先级排序c言语要改写租户信息或许缓存信息
  2. 也便是需求一切服务节点优先级windows体系需求搭档做某一件工作的时分,此刻能够凭借播送音讯发送音讯到一切规划方式23方式介绍节点改写,无设计模式面试题需一个节点一个节泛型的作用点的处理
  3. gitee别说Git:播送音讯默许会在家目录下创立消费进展文件github永久回家地址,会以wwwindows10w.tianxincoord.com:9876@www.tianxincgitlaboord.com:987优先级是什么意思windows是什么意思6这种地址方式生成文件途径,可是由于带有:符号,windows下是不允许此符号作为文件夹称泛型方法号的,所以假如rocketMQ的链接地址不是连接gitlab串(不带有端口)能够取消下面的messageModel注释,不然启动的时分就会提示目标卷或许途径泛型办法优先级表的界说和运用不存在,其gitlabwindows10是由于这个问题
package com.codecoord.rocketmq.listener;
import com.codecoord.rocketmq.constant.RocketMqBizConstant;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
/**
 * 播送音讯
 * 运用场景:多租户或许服务有内部缓存需求改写状况下假如需求改写租户信息或许缓存信息
 *      也便是需求一切服务节点都需求搭档做某一件工作的时分
 * 此刻能够凭借播送音讯发送音讯到一切节点改写,无需一个节点一个节点的处理
 *
 * 特别阐明:播送音讯默许会在家目录下创立消费进展文件,会以www.tianxincoord.com:9876@www.tianxincoord.com:9876
 *      这种地址方式生成文件途径,可是由于带有:符号,windows下是不允许此符号作为文件夹称号的
 *      所以假如rocketMQ的链接地址不是连接串(不带有端口)能够取消下面的messageModel注释
 *      不然启动的时分就会提示目标卷或许途径不存在,其实是由于这个问题
 *
 * @author tianxincoord@163.com
 * @since 2022/5/12
 */
@Slf4j
@Component
@RocketMQMessageListener(
        topic = RocketMqBizConstant.SOURCE_TOPIC,
        consumerGroup = RocketMqBizConstant.SOURCE_BROADCASTING_GROUP,
        selectorExpression = RocketMqBizConstant.SOURCE_BROADCASTING_TAG
        // messageModel = MessageModel.BROADCASTING
)
public class RocketBroadcastingListener implements RocketMQListener<MessageExt> {
    /**
     * MessageExt:内置的音讯实体,出产中依据需求自己封装实体
     */
    @Override
    public void onMessage(MessageExt message) {
        log.info("收到播送音讯【{}】", new String(message.getBody()));
    }
}

2.3 代码封装完结测验

封装测验大家能够直接参阅RocketMqCon规划方式面试题trolwindows10ler即可

package com.codecoord.rocketmq.controller;
import com.alibaba.fastjson.JSONObject;
import com.codecoord.rocketmq.constant.RocketMqBizConstant;
import com.codecoord.rocketmq.domain.RocketMqEntityMessage;
import com.codecoord.rocketmq.template.OrderMessageTemplate;
import com.codecoord.rocketmq.template.RocketMqTemplate;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.UUID;
/**
 * 音讯发送
 *
 * @author tianxin01@huice.com
 * @since 2022/6/16
 */
@RestController
@RequestMapping("/rocketmq")
@Slf4j
public class RocketMqController {
    /**
     * 留意此处注入的是封装的RocketMqTemplate
     */
    @Resource
    private RocketMqTemplate rocketMqTemplate;
    /**
     * 注入对应事务的模板类
     */
    @Resource
    private OrderMessageTemplate orderMessageTemplate;
    /**
     * 经过实体类发送音讯,发送留意事项请参阅实体类
     * 阐明:也能够在RocketMqTemplate依照事务封装发送办法,这样只需求调用办法指定根底事务音讯接口
     */
    @RequestMapping("/entity/message")
    public Object sendMessage() {
        RocketMqEntityMessage message = new RocketMqEntityMessage();
        // 设置事务key
        message.setKey(UUID.randomUUID().toString());
        // 设置音讯来历,便于查询we年
        message.setSource("封装测验");
        // 事务音讯内容
        message.setMessage("当时音讯发送时刻为:" + LocalDateTime.now());
        // Java时刻字段需求独自处理,不然会序列化失利
        message.setBirthday(LocalDate.now());
        message.setTradeTime(LocalDateTime.now());
        return rocketMqTemplate.send(RocketMqBizConstant.SOURCE_TOPIC, RocketMqBizConstant.SOURCE_TAG, message);
    }
    /**
     * 此刻关于调用者并且,无需创立任何类
     * 假如某天需求调整音讯发送来历,假如不封装,一切原来发生message的当地悉数改
     * 假如封装了,只需求改sendOrderPaid就能够切换
     */
    @RequestMapping("/order/paid")
    public Object sendOrderPaidMessage() {
        return orderMessageTemplate.sendOrderPaid(UUID.randomUUID().toString(), "客户下单了...,快快备货");
    }
    /**
     * 直接将目标进行传输,也能够自己进行json转化后传输
     */
    @RequestMapping("/messageExt/message")
    public SendResult convertAndSend() {
        // 出产中不引荐运用jsonObject传递,不看发送者无法知道传递的音讯包括什么信息
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("type", "messageExt");
        String destination = rocketMqTemplate.buildDestination(RocketMqBizConstant.SOURCE_TOPIC, RocketMqBizConstant.SOURCE_BROADCASTING_TAG);
        // 假如要走内部办法发送则有必要要依照标准来,不然就运用原生的音讯发送
        return rocketMqTemplate.getTemplate().syncSend(destination, jsonObject);
    }
}

发表评论

提供最优质的资源集合

立即查看 了解详情