前言

mysql update的时分到底是锁行仍是锁表?

InnoDb 锁简略分类

    1. 依照数据操作的粒度 1)行级锁:锁住记录行 2)表级锁:锁住整张表
    1. 依照对数据操作的类型 1)读锁(共享锁):针对同一份数据,多个读操作能够一起进行而不会相互影响。 2) 写锁(排它锁):当时操作没有完结之前,它会阻断其他写锁和读锁。
    mysql 在update时会依据where 条件的类型决议锁行仍是锁表 where的过滤条件列,假如用索引,锁行,无法用索引,锁表。依照索引规则,假如能运用索引,锁行,不能运用索引,锁表。 行锁是排他锁,当一条记录已经被一条update句子锁住时会阻断其他的update操作,在高并发场景下,关于热门数据来说会进行频频的更新操作形成其他update操作锁等待超时恳求失败

背景

以旅行支付场景为例,伴随着业务量的添加,系统的并发量会逐渐上升,例如“北京长城休假区”的账户流水会变得十分频频,每次支付或许退款操作都需求去更新一下账户余额,并发较低时并不会有什么问题,但当旅行高峰期到来时并发量上升,数据库更新的时分需求获得数据行锁,在未开释这个行锁之前,其他业务只能是等待。

解决方案

1.支付时异步入账,退款添加一个欠款垫资户

场景剖析

用户支付入款需求给账户加钱时此时商户关于资金的实时性要求不高,寻求精确性,因而能够将账户加款放到异步线程池,到达错峰的目的 可是当用户建议退款时,我们有必要及时而且精确的从账户扣款,因而退款采纳同步进行,退款的订单相关于支付来说量就会少很多,满足要求。可是存在一个问题高并发状态下某一个热门账户余额时刻在变很有可能退款建议时账户余额足够可是实践扣除时由于上一笔支付未入账,形成金额缺乏

解决方法

1.加分布式锁

对特定账户加锁,保证某一刻只要一笔退款恳求获得该账户的操作权 坏处:多个用时一起退款时只要一笔成功,对用户不友好,pass掉

2.新增垫资商户

热门账户添加一个指定透支额度的垫资户,实践账户余额缺乏时从垫资户告贷,然后守时核对垫资户透支额度从实践账户一次性扣款, 推荐

2.兼并恳求

兼并多条需求更新余额的恳求

将一段时间内的恳求,先进行堵塞,兼并各个账户需求更新的金额,一次性处理,然后将成果拆分,唤醒被堵塞的恳求

java高并发热点数据更新

demo完成

/**
 * @Author: xiaokunkun
 * @CreateTime: 2023-04-23  14:37
 * @Description: 兼并更新,能够不捕捉异常报错后外层调用方直接捕获异常业务回滚
 */
@Service
public class CommodityAmountService {
    class Request {
        AcctUpdateDto acctUpdateDto;
        //预留字段 可不运用
        String atomCode;
        //暂定回来成果为true或许false
        CompletableFuture<Boolean> future; // 承受成果
    }
    // 积累恳求(每隔N毫秒批量处理一次)
    LinkedBlockingQueue<Request> queue = new LinkedBlockingQueue<>();
    // 守时使命的完成,N秒钟处理一次数据
    @PostConstruct
    public void init() {
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5);
        scheduledExecutorService.scheduleAtFixedRate(() -> {
            // 1、取出queue的恳求,生成一次兼并更新
            int size = queue.size();
            if (size == 0) {
                return;
            }
            ArrayList<Request> requests = new ArrayList<>();
            for (int i = 0; i < size; i++) {
                //队列出栈
                Request request = queue.poll();
                requests.add(request);
            }
            System.out.println("批量处理数据量:" + size);
            // 2、拼装一个兼并更新 key为账户value为sum(amount)
            Map<String, Long> amountMap = new HashMap<>();
            ArrayList<String> commodityCodes = new ArrayList<>();
            for (Request request : requests) {
                //todo 依据accountNo分组
            }
            for (String key : amountMap.keySet()) {
                Long amount = amountMap.get(key);
                //update mysql
            }
            // 3、将成果响应 分发给每一个独自的用户恳求。由守时使命处理线程 --> n个用户的恳求线程
            for (Request request : requests) {
                // 将成果回来到对应的恳求线程,只要不报错此批次全部回来true,否则false
                request.future.complete(true);
            }}, 0, 1000, TimeUnit.MILLISECONDS);
    }
    @Autowired
    CommodityRemoteService commodityRemoteService;
    // 兼并金额并更新,多个用户恳求
    public Boolean updateMergeAmount(String movieCode)
            throws ExecutionException, InterruptedException {
        // 并非马上建议接口调用,恳求搜集起来,再进行
        Request request = new Request();
        request.atomCode = movieCode;
        // 异步编程:获取异步处理的成果
        CompletableFuture<Boolean> future = new CompletableFuture<>();
        request.future = future;
        queue.add(request);
        return future.get(); // 此处get方法,会堵塞线程运行,直到future有回来
    }
}

测验类:

@Test
//模仿500的并发量
public void updateMerge() {
    AcctCmdDriver acctCmdDriver = new AcctCmdDriver();
    TradeAccntOrderDetail detail = new TradeAccntOrderDetail();
    AcctNoInfo acctNoInfo = new AcctNoInfo();
    OrderConsist consistForOrder = OrderConsist.newInstance("0200_202304", "trade_accnt_merchant_order");
    detail.setConsistForOrder(consistForOrder);
    acctCmdDriver.setDetail(detail);
    acctCmdDriver.setAcctNoInfo(acctNoInfo);
    detail.setAccountCategory(AccountCategoryEnum.MERCHANT);
    detail.setAccountNo("02020001010000262977202304");
    detail.setAmount(6l);
    System.out.println("start build thread" + acctCmdDriver);
    Random rand = new Random();
    for (int i = 1; i <= 500; i++) {
        final String index = "code_" + i;
        Thread thread = new Thread(() -> {
            try {
                System.out.println("amount is:" + detail.getAmount());
                countDownLatch.await();
                Thread.sleep(rand.nextInt(150));
                Boolean res = updateMergeAmountService.mergeUpdate(acctCmdDriver);
                System.out.println("current i" + index + "res:" + res);
            } catch (InterruptedException e) {
                System.out.println("thread error is:" + e);
            }
        });
        thread.start();
        // 启动后,倒计时器倒计数减一,代表又有一个线程就绪了
        countDownLatch.countDown();
    }
}