一、什么是 Redis 音讯行列?

字面意思就是寄存音讯的行列。最简略的音讯行列模型包括3个人物:

  • 音讯行列:存储和管理音讯,也被称为音讯署理(Message Broker)
  • 生产者:发送音讯到音讯行列
  • 顾客:从音讯行列获取音讯并处理音讯

微服务Spring Boot 整合Redis 基于Redis的Stream 消息队列 实现异步秒杀下单

使用行列的优点在于 解耦 免除数据之间的耦合性

这儿最好的是使用MQ、RabbitMQ、RocketMQ、Kafka等音讯行列,咱们本节主要介绍 Redis 的音讯行列。

二、Redis 音讯行列 — 根据 Redis List 完结音讯行列

根据List结构模仿音讯行列

音讯行列(Message Queue):字面意思就是寄存音讯的行列。而Redis的list数据结构是一个双向链表,很简略模仿出行列作用。

行列是入口和出口不在一边,咱们能够通过 LPush、RPOP、RPush、LPOP 这些来完结。

留意 : 假如获取 LPOP、RPOP获取音讯假如没有的话,会直接回来null,所以咱们使用堵塞:BLPOP、BRPOP来完结堵塞作用

微服务Spring Boot 整合Redis 基于Redis的Stream 消息队列 实现异步秒杀下单

根据List 结构的音讯行列的优缺点?

优点:

  • 使用Redis存储、不受限于JVM 内存上限
  • 根据Redis 的持久化机制、数据安全性有保障
  • 能够满足音讯有序性

缺点:

  • 无法防止音讯丢掉
  • 只支撑单顾客

三、Redis 音讯行列 — 根据 Pubsub 的音讯行列

PubSub(发布订阅)Redis2.0版别引进的音讯传递模型

顾名思义,顾客能够订阅一个或多个channel,生产者向对应channel发送音讯后,一切订阅者都能收到相关音讯。

Pubsub 常用命令

SUBSCRIBE channel [channel] :订阅一个或多个频道
PUBLISH channel msg :向一个频道发送音讯
PSUBSCRIBE pattern[pattern] :订阅与pattern格式匹配的一切频道

微服务Spring Boot 整合Redis 基于Redis的Stream 消息队列 实现异步秒杀下单

根据PubSub的音讯行列有哪些优缺点? 优点

  • 采用发布订阅模型,支撑多生产、多消费

缺点

  • 不支撑数据持久化
  • 无法防止音讯丢掉
  • 音讯堆积有上限,超出时数据丢掉

四、根据Redis 的Stream 的消费行列

Stream 是 Redis 5.0 引进的一种新数据类型,能够完结一个功用非常完善的音讯行列。

⛅Stream 简略语法

Stream 常用语法:

微服务Spring Boot 整合Redis 基于Redis的Stream 消息队列 实现异步秒杀下单

例如

创立为 users 的音讯行列,并向其间发送一条音讯 使用Redis 主动生成id

微服务Spring Boot 整合Redis 基于Redis的Stream 消息队列 实现异步秒杀下单

读取音讯的方法之一:XRead

微服务Spring Boot 整合Redis 基于Redis的Stream 消息队列 实现异步秒杀下单

使用 XRead 读取一个音讯

微服务Spring Boot 整合Redis 基于Redis的Stream 消息队列 实现异步秒杀下单

XRead 堵塞方法,读取最新的音讯

微服务Spring Boot 整合Redis 基于Redis的Stream 消息队列 实现异步秒杀下单

事务开发中,咱们能够循环的调用XREAD堵塞方法来查询最新音讯,从而完结继续监听行列的作用

微服务Spring Boot 整合Redis 基于Redis的Stream 消息队列 实现异步秒杀下单

留意: 当咱们指定开始ID 为 $ 时,代表读取最新的音讯,假如咱们处理一条音讯的过程中,又有超过1条以上的音讯到达行列,则下次获取的也是只有最新的一条,会出现音讯漏读的问题

STREAM类型音讯行列的XREAD命令特色

  • 音讯可回溯
  • 一个音讯能够被多个顾客读取
  • 能够堵塞读取
  • 有音讯漏读的危险

⚡Stream 的顾客组

顾客组(Consumer Group):将多个顾客划分到一个组中,监听同一个行列。具有下列特色:

微服务Spring Boot 整合Redis 基于Redis的Stream 消息队列 实现异步秒杀下单

创立顾客组:

XGROUP CREATE key groupName ID [MKSTREAM]
  • key:行列称号
  • groupName:顾客组称号
  • ID:开始ID标明,$代表行列中最终一个音讯,0则代表行列中第一个音讯
  • MKSTREAM:行列不存在时主动创立行列

其它常用命令

删除指定的顾客组

XGROUP DESTORY key groupName

给指定的顾客组添加顾客

XGROUP CREATECONSUMER key groupname consumername

删除顾客组中的指定顾客

XGROUP DELCONSUMER key groupname consumername

从顾客组读取音讯:

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
  • group:消费组称号
  • consumer:顾客称号,假如顾客不存在,会主动创立一个顾客
  • count:本次查询的最大数量
  • BLOCK milliseconds:当没有音讯时最长等待时间
  • NOACK:无需手动ACK,获取到音讯后主动确认
  • STREAMS key:指定行列称号
  • ID:获取音讯的开始ID:

>“:从下一个未消费的音讯开始 其它:根据指定id从pending-list中获取已消费但未确认的音讯,例如0,是从pending-list中的第一个音讯开始

顾客监听音讯的基本思路:

微服务Spring Boot 整合Redis 基于Redis的Stream 消息队列 实现异步秒杀下单

STREAM类型音讯行列的XREADGROUP命令特色:

  • 音讯可回溯
  • 能够多顾客争抢音讯,加速消费速度
  • 能够堵塞读取
  • 没有音讯漏读的危险
  • 有音讯确认机制,保证音讯至少被消费一次

三种音讯行列对比

微服务Spring Boot 整合Redis 基于Redis的Stream 消息队列 实现异步秒杀下单

五、根据Redis Stream音讯行列完结异步秒杀

需求:

  • 创立一个Stream类型的音讯行列,名为stream.orders
  • 修正之前的秒杀下单Lua脚本,在认定有抢购资历后,直接向stream.orders中添加音讯,内容包括voucherId、userId、orderId
  • 项目启动时,敞开一个线程使命,测验获取stream.orders中的音讯,完结下单

修正 seckill.lua 脚本

-- 1.3.订单id
local orderId = ARGV[3]
-- 3.6.发送音讯到行列中, XADD stream.orders * k1 v1 k2 v2 ...
redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId)

修正VoucherOrderService

private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
static {
    SECKILL_SCRIPT = new DefaultRedisScript<>();
    SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
    SECKILL_SCRIPT.setResultType(Long.class);
}
private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();
//在类初始化之后履行,因为当这个类初始化好了之后,随时都是有可能要履行的
@PostConstruct
private void init() {
    SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}
/**
     * 使用 Redis音讯行列建立 读行列、编写下订单使命
     */
private class VoucherOrderHandler implements Runnable {
    @Override
    public void run() {
        while (true) {
            try {
                // 1.获取音讯行列中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 >
                List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
                    Consumer.from("g1", "c1"),
                    StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
                    StreamOffset.create("stream.orders", ReadOffset.lastConsumed())
                );
                // 2.判别订单信息是否为空
                if (list == null || list.isEmpty()) {
                    // 假如为null,阐明没有音讯,继续下一次循环
                    continue;
                }
                // 解析数据
                MapRecord<String, Object, Object> record = list.get(0);
                Map<Object, Object> value = record.getValue();
                VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
                // 3.创立订单
                createVoucherOrder(voucherOrder);
                // 4.确认音讯 XACK
                stringRedisTemplate.opsForStream().acknowledge("s1", "g1", record.getId());
            } catch (Exception e) {
                log.error("处理订单反常", e);
                //处理反常音讯 去 Pading-List读取音讯
                handlePendingList();
            }
        }
    }
}
/**
     *  Redis音讯行列出现反常,调用此方法去 Pading—List中从头读取
     */
private void handlePendingList() {
    while (true) {
        try {
            // 1.获取pending-list中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 0
            List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
                Consumer.from("g1", "c1"),
                StreamReadOptions.empty().count(1),
                StreamOffset.create("stream.orders", ReadOffset.from("0"))
            );
            // 2.判别订单信息是否为空
            if (list == null || list.isEmpty()) {
                // 假如为null,阐明没有反常音讯,完毕循环
                break;
            }
            // 解析数据
            MapRecord<String, Object, Object> record = list.get(0);
            Map<Object, Object> value = record.getValue();
            VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
            // 3.创立订单
            createVoucherOrder(voucherOrder);
            // 4.确认音讯 XACK
            stringRedisTemplate.opsForStream().acknowledge("s1", "g1", record.getId());
        } catch (Exception e) {
            log.error("处理pendding订单反常", e);
            try{
                Thread.sleep(20);
            }catch(Exception ee){
                ee.printStackTrace();
            }
        }
    }
}
private void handleVoucherOrder(VoucherOrder voucherOrder) {
    //1.获取用户
    Long userId = voucherOrder.getUserId();
    // 2.创立锁目标
    RLock lock = redissonClient.getLock("lock:order:" + userId);
    // 3.测验获取锁
    boolean isLock = lock.tryLock();
    // 4.判别是否取得锁成功
    if (!isLock) {
        // 获取锁失利,直接回来失利或者重试
        log.error("不允许重复下单!");
        return;
    }
    try {
        //留意:由所以spring的事务是放在threadLocal中,此刻的是多线程,事务会失效
        proxy.createVoucherOrder(voucherOrder);
    } finally {
        // 开释锁
        lock.unlock();
    }
}
// 署理目标
private IVoucherOrderService proxy;
@Override
public Result seckillVoucher(Long voucherId) {
    //获取用户
    Long userId = UserHolder.getUser().getId();
    //生成订单ID
    long orderId = redisIdWorker.nextId("order");
    // 1.履行lua脚本
    Long result = stringRedisTemplate.execute(
        SECKILL_SCRIPT,
        Collections.emptyList(),
        voucherId.toString(), userId.toString(), String.valueOf(orderId)
    );
    int r = result.intValue(); // 转成int
    // 2.判别结果是否为0
    if (r != 0) {
        // 2.1.不为0 ,代表没有购买资历
        return Result.fail(r == 1 ? "库存缺乏" : "不能重复下单");
    }
    //3.获取署理目标
    proxy = (IVoucherOrderService) AopContext.currentProxy();
    //4.回来订单id
    return Result.ok(orderId);
}
@Transactional
public void createVoucherOrder (VoucherOrder voucherOrder){
    // 5.一人一单逻辑
    // 5.1.用户id
    Long userId = voucherOrder.getUserId();
    // 判别是否存在
    int count = query().eq("user_id", userId)
        .eq("voucher_id", voucherOrder.getId()).count();
    // 5.2.判别是否存在
    if (count > 0) {
        // 用户现已购买过了
        log.error("用户现已购买过了");
    }
    //6,扣减库存
    boolean success = seckillVoucherService.update()
        .setSql("stock= stock -1") //set stock = stock -1
        .eq("voucher_id", voucherOrder.getVoucherId()).gt("stock",0).update(); //where id = ? and stock > 0
    // .eq("voucher_id", voucherId).eq("stock",voucher.getStock()).update(); //where id = ? and stock = ?
    if (!success) {
        //扣减库存
        log.error("库存缺乏!");
    }
    save(voucherOrder);
}

六、程序测验

ApiFox 简略测验

微服务Spring Boot 整合Redis 基于Redis的Stream 消息队列 实现异步秒杀下单

请求成功,完结基本测验,下面康复数据库,进行压力测验

Jmeter 压力测验

Jmeter测验

微服务Spring Boot 整合Redis 基于Redis的Stream 消息队列 实现异步秒杀下单

检查Redis

微服务Spring Boot 整合Redis 基于Redis的Stream 消息队列 实现异步秒杀下单

检查MySQL

微服务Spring Boot 整合Redis 基于Redis的Stream 消息队列 实现异步秒杀下单

⛵小结

以上就是【Bug 终结者】对 微服务Spring Boot 整合Redis 根据Redis的Stream 音讯行列 完结异步秒杀下单 的简略介绍,在分布式系统下,高并发的场景下,使用音讯行列来完结秒杀下单,可见功能提升了很大! 在开发中,咱们仍是使用MQ比较多一点的,Redis 音讯行列作为拓宽,本次秒杀下单系列到此就更新完毕啦! 如有需要源码的,可去公众号获取!

假如这篇【文章】有帮助到你,期望能够给【Bug 终结者】点个赞,创作不易,假如有对【后端技能】、【前端范畴】感兴趣的小可爱,也欢迎重视❤️❤️❤️ 【Bug 终结者】❤️❤️❤️,我将会给你带来巨大的【收成与惊喜】!