在公司的实践事务场景中,初始化数据是个避不过去的论题。比方项目上线初期,将数据库的产品数据同步到搜索引擎 ElasticSearcgh 、缓存 Redis 或许其它的数据库。

在这种初始化同步的场景下,假如运用单线程同步,几千万或许上亿的数据或许需求同步很长时刻,而且其中一些细节知识点也很值得考究。怎么处理大数据深分页问题?怎么防止应用内存溢出等。

数据同步方针

一句话能够概述,产品数据全量初始化使命需求快速履行完结,其非有必要稳定。

  1. 防止 MySQL 大数据量读取形成的深分页问题。
  2. 初始化使命需求再短时刻内同步大批量数据。
  3. 躲避因读取过多数据量而形成内存溢出问题。

MySQL 深分页

让咱们先来评论深分页问题,这是一个经久不衰的问题。假如你还不了解 MySQL 深分页是什么,请持续阅读。

MySQL 大数据量深分页问题是指当运用 MySQL 数据库进行大数据量分页查询时,例如从一个包含数百万条记载的表中查询终究一小部分数据时,会呈现功能问题。

具体来说,当运用 LIMIT 和 OFFSET 关键字来分页查询时,MySQL 会将查询结果集中的一切记载都加载到内存中,然后再将指定的分页数据回来给客户端。这种办法在处理小数据集时是有用的,但关于大数据集来说,它会导致很多的内存运用和功能问题。

当 OFFSET 数量很大时,MySQL 有必要扫描并越过一切之前的记载,这会导致十分慢的查询速度。此外,当运用 InnoDB 存储引擎时,跟着 OFFSET 增加,I/O 操作会变得愈加频频,由于 MySQL 有必要从磁盘中读取更多的数据页。

因而,当需求进行大数据量深分页查询时,应该防止运用 OFFSET 和 LIMIT,而是运用类似于“根据游标的分页”或“分段查询”等技术来处理功能问题。

1. 书签相关

关于 LIMIT 深分页问题,中心在于 OFFSET 值,它会 导致 MySQL 扫描很多不需求的记载行然后扔掉掉。

咱们能够先运用书签 记载获取上次取数据的位置,下次就能够直接从该位置开端扫描,这样能够 防止运用 OFFEST。

假定需求查询 3000000 行数据后的第 1 条记载,查询能够这么写。

select * from product where id < 3000000 limit 1;

由于咱们的方针是同步一切产品库的数据,所以每次取 5000 条,记住最大的 ID,下次以该 ID 充任查询条件。

经过该办法完美处理深分页形成的功能损耗,也就是上文说的分段查询。

2. 流式查询

上面的分段查询办法在咱们流程中真的能够处理一切问题么?

直接得出结论:假如没有分表,该办法能够有用处理深度分页的问题。然而,一旦进行了分表,该办法将不再适用。这是由于产品表的分片键不是 ID 字段,假如履行此 SQL 语句,将会查询一切产品分表。

那么,就到了处理深分页的主角上台了,那就是流式查询。

流式查询经过在MySQL服务器端运用游标进行数据的逐行读取,能够防止一次性将一切数据加载到内存中,然后节省内存空间并提高查询功率。

简单来说,当你调用 MySQL 流式查询时,就像建立了一个“管道”一样,能够源源不断地将数据传输到客户端。

可是,流式查询不是银弹,当流式查询数据没有完全传输完时,当时的数据库衔接是独占的,无法被其它线程所运用

假如说需求频频进行流式查询操作,能够独自拆一个数据库衔接池,不要和正常事务逻辑共用一个衔接池。

出产消费模型

1. 模型界说

简单来说,出产者-顾客并发模型是由两类线程构成:

  • 出产者线程:“出产”产品,并把产品放到一个行列里;
  • 顾客线程:“消费”产品。

有了这个行列,出产者就只需求重视出产,而不必管顾客的消费行为,更不必等待顾客线程履行完;顾客也只管消费,不必管出产者是怎么出产的,更不必等着出产者出产。

出产者生成一定量的数据放到缓冲区中,然后重复此过程;与此同时,顾客也在缓冲区耗费这些数据。出产者和顾客之间有必要保持同步,要确保出产者不会在缓冲区满时放入数据,顾客也不会在缓冲区空时耗费数据。

如何实现快速同步亿级商品数据至 Elasticsearch?

2. 堵塞行列完成

网上有很多种完成模式,根据 wait() / notify() 或许 await() / signal() 等等,这里直接经过堵塞行列充任产品容器。

import lombok.SneakyThrows;
​
import java.util.concurrent.LinkedBlockingQueue;
​
public class ProductSyncExecutor {
​
  private static final LinkedBlockingQueue<Object> blockingQueue = new LinkedBlockingQueue<>(10);
​
  public static void main(String[] args) {
    Thread product1 = new Thread(new ProductProduce());
    Thread product2 = new Thread(new ProductProduce());
    Thread product3 = new Thread(new ProductProduce());
    Thread consumer1 = new Thread(new ProductConsumer());
    Thread consumer2 = new Thread(new ProductConsumer());
    Thread consumer3 = new Thread(new ProductConsumer());
    product1.setName("Thread-Product-1");
    product1.start();
    product2.setName("Thread-Product-2");
    product2.start();
    product3.setName("Thread-Product-3");
    product3.start();
    consumer1.setName("Thread-Consumer-1");
    consumer1.start();
    consumer2.setName("Thread-Consumer-2");
    consumer2.start();
    consumer3.setName("Thread-Consumer-3");
    consumer3.start();
   }
​
  public static class ProductProduce implements Runnable {
​
    @SneakyThrows
    @Override
    public void run() {
      while (true) {
        blockingQueue.put(new Object());
        System.out.println("【出产者:" + Thread.currentThread().getName()
            + "】放入一个产品,现容量:" + blockingQueue.size());
        Thread.sleep(100);
       }
     }
   }
​
  public static class ProductConsumer implements Runnable {
​
    @SneakyThrows
    @Override
    public void run() {
      while (true) {
        blockingQueue.take();
        System.out.println("【顾客:" + Thread.currentThread().getName()
            + "】消费了一个产品,现容量:" + blockingQueue.size());
        Thread.sleep(200);
       }
     }
   }
}

流式查询&并发编程

上述内容现已聊清楚了,产品数据从数据库查出来需求运用流式查询,防止一次性把数据读入形成的 OOM 以及大数据量情况下的深分页慢查询。

其次,经过出产者-顾客并发模型,由流式查询充任出产者,顾客由线程池内的消费线程组成。

终究套用到产品大数据量同步三方数据库的代码就变成以下这种形状。

1. 开发流式查询

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Options;
import org.apache.ibatis.annotations.ResultType;
import org.apache.ibatis.annotations.Select;
import org.apache.ibatis.mapping.ResultSetType;
import org.apache.ibatis.session.ResultHandler;
import org.opengoofy.congomall.biz.product.infrastructure.dao.entity.ProductSkuDO;
​
/**
 * 产品 SKU 耐久层
 */
public interface ProductSkuMapper extends BaseMapper<ProductSkuDO> {
  
  /**
   * 经过流式查询的办法获取一切产品 SKU
   */
  @Options(resultSetType = ResultSetType.FORWARD_ONLY, fetchSize = Integer.MIN_VALUE)
  @ResultType(ProductSkuDO.class)
  @Select("SELECT * FROM product_sku WHERE del_flag = '0'")
  void listAllProductSkuStreamQuery(ResultHandler<ProductSkuDO> handler);
}

2. 并发同步程序

事务流程如下:

  1. 界说堵塞行列为 LinkedBlockingQueue,并设置最大容量,防止同步过程中由于消费缓慢形成内存溢出;
  2. 流式查询履行回来数据插入到 blockingQueueCachePool 堵塞行列容器中;
  3. 假如堵塞行列容器已满,则堵塞出产者线程,等待顾客线程消费堵塞容器;
  4. 判别当时堵塞容器容量是否大于最小同步数量,假如大于则主张同步,不满足则越过本次流程;
  5. 假如流式查询现已将一切数据遍历一遍,那么 listAllProductSkuStreamQuery 流程就结束了。

Q:为什么会有兜底,将终究缓冲的使命履行操作?

A:为了防止频频调用 ElasticSearch,所以将每次同步最大数据设置为 5000。所以,当终究 一条记载履行完后,或许当时堵塞容器里的数量不足五千,那么就或许不会同步。

余下的这些产品数据,会由兜底使命统一取出并同步 ElasticSearch。

Q:线程池参数怎么设置?

A:中心线程数:CPU 中心线程数 / 0.2,最大线程数:中心线程数 + (中心线程数 / 2),堵塞行列:SynchronousQueue,回绝策略:CallerRunsPolicy,运用当时主线程运行线程池使命。

线程池运用了 Hippo4j,能够在实践事务运行中去评价这个参数是否合理,假如合理,那么就不必变。不合理的话,比方 CPU 飙升,履行时刻长等,再下降或许增加线程数。

多线程并发代码如下:

import com.alibaba.fastjson.JSON;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.opengoofy.congomall.biz.product.infrastructure.dao.entity.ProductSkuDO;
import org.opengoofy.congomall.biz.product.infrastructure.dao.mapper.ProductSkuMapper;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.GetMapping;
​
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.Collectors;
​
/**
 * 初始化产品使命,经过并发编程完结出产-消费模型,到达快速同步的作用
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class InitializeProductJobHandlerTwo {
  
  /**
   * 产品 SKU 耐久层
   */
  private final ProductSkuMapper productSkuMapper;
  
  /**
   * Hippo4j 线程池,履行同步程序
   */
  private final ThreadPoolExecutor productSkuInitSyncThreadPoolExecutor;
  
  /**
   * 单次同步 ElasticSearch 数量
   */
  private static final Integer MAX_SYNC_SIZE = 5000;
  
  /**
   * 堵塞行列最大容量,相当于一个缓冲池巨细
   */
  private static final Integer MAX_POOL_SIZE = 200000;
​
  /**
   * 记载同步
   */
  private static final AtomicInteger COUNT_NUM = new AtomicInteger(0);
  
  /**
   * 记载实践同步数量
   */
  private static final LongAdder SYNC_SUM = new LongAdder();
  
  @GetMapping("/init/product")
  public void execute() throws Exception {
    BlockingQueue<ProductSkuDO> blockingQueueCachePool = new LinkedBlockingQueue<>(MAX_POOL_SIZE);
    productSkuMapper.listAllProductSkuStreamQuery(resultContext -> {
      // 记载流式查询总数量
      COUNT_NUM.incrementAndGet();
      // 每次向缓冲池增加 MAX_SYNC_SIZE 记载
      try {
        blockingQueueCachePool.put(resultContext.getResultObject());
       } catch (Exception ex) {
        log.error("产品SKU根底数据初始化流程, 增加堵塞行列缓冲池失利, 数据记载: {}",
            JSON.toJSONString(resultContext.getResultObject()), ex);
       }
      // 防止恳求方针数据库(ElasticSearch 或其它)次数过多,所以主张每次 MAX_SYNC_SIZE 条数,虽然或许不行这个数
      if (blockingQueueCachePool.size() >= MAX_SYNC_SIZE) {
        productSkuInitSyncThreadPoolExecutor.execute(() -> executeTask(blockingQueueCachePool));
       }
     });
    // 兜底,将终究缓冲的使命履行
    productSkuInitSyncThreadPoolExecutor.execute(() -> lastOnceExecuteTask(blockingQueueCachePool));
   }
  
  private void executeTask(BlockingQueue<ProductSkuDO> blockingQueueCachePool) {
    List<ProductSkuDO> copyList = new ArrayList<>(MAX_SYNC_SIZE);
    try {
      int drainTo = blockingQueueCachePool.drainTo(copyList, MAX_SYNC_SIZE);
      if (drainTo > 0) {
        // 此处决定向何处同步数据
        // ......
        SYNC_SUM.add(drainTo);
       }
     } catch (Exception ex) {
      log.error("产品SKU根底数据初始化流程履行失利", ex);
     }
   }
  
  private void lastOnceExecuteTask(BlockingQueue<ProductSkuDO> blockingQueueCachePool) {
    List<ProductSkuDO> lastProductSkus = blockingQueueCachePool.stream().parallel().collect(Collectors.toList());
    try {
      SYNC_SUM.add(lastProductSkus.size());
      // 此处决定向何处同步数据
      // ......
     } catch (Exception ex) {
      log.error("产品SKU根底数据初始化流程履行终究一次同步失利", ex);
     }
   }
}

3. 使命进展监控

经过上述代码,现已能够完结咱们最初定的数据初始化同步方针。可是,总觉得缺点什么?

上亿条数据,明显不是短时刻内能够同步完结的。那同步过程中,进展从哪里查看?假如不知道进展的话,总感觉心里没底。

这种同步进展能够经过向 Redis 这种中间件写入自增指令,或许经过守时线程池固守时刻内打印同步进展,相对来说后者更容易些。

第二个版别更健壮的数据同步程序如下:

import com.alibaba.fastjson.JSON;
import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.handler.annotation.XxlJob;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.opengoofy.congomall.biz.product.infrastructure.dao.entity.ProductSkuDO;
import org.opengoofy.congomall.biz.product.infrastructure.dao.mapper.ProductSkuMapper;
import org.springframework.stereotype.Component;
​
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.Collectors;
​
/**
 * 初始化产品使命,经过并发编程完结出产-消费模型,到达快速同步的作用
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class InitializeProductJobHandler extends IJobHandler {
  
  /**
   * 产品 SKU 耐久层
   */
  private final ProductSkuMapper productSkuMapper;
  
  /**
   * Hippo4j 线程池,履行同步程序
   */
  private final ThreadPoolExecutor productSkuInitSyncThreadPoolExecutor;
  
  /**
   * 单次同步 ElasticSearch 数量
   */
  private static final Integer MAX_SYNC_SIZE = 5000;
  
  /**
   * 堵塞行列最大容量,相当于一个缓冲池巨细
   */
  private static final Integer MAX_POOL_SIZE = 200000;
  
  /**
   * 记载开端时刻
   */
  private static Long START_TIME = 0L;
  
  /**
   * 记载同步
   */
  private static final AtomicInteger COUNT_NUM = new AtomicInteger(0);
  
  /**
   * 记载实践同步数量
   */
  private static final LongAdder SYNC_SUM = new LongAdder();
  
  /**
   * 打印输出监控守时器
   */
  private static final ScheduledExecutorService SCHEDULED_EXECUTOR = Executors.newSingleThreadScheduledExecutor();
  
  @XxlJob(value = "demoJobHandler")
  @Override
  public void execute() throws Exception {
    // 守时打印履行进展
    printPoolAndScheduledInfo();
    // 履行产品 SKU 同步程序
    executeProductSkuSync();
    // 释放守时器、同步线程池资源
    shutdownPoolAndPrintCountSize();
   }
  
  void executeProductSkuSync() {
    BlockingQueue<ProductSkuDO> blockingQueueCachePool = new LinkedBlockingQueue<>(MAX_POOL_SIZE);
    productSkuMapper.listAllProductSkuStreamQuery(resultContext -> {
      // 记载流式查询总数量
      COUNT_NUM.incrementAndGet();
      // 每次向缓冲池增加 MAX_SYNC_SIZE 记载
      try {
        blockingQueueCachePool.put(resultContext.getResultObject());
       } catch (Exception ex) {
        log.error("产品SKU根底数据初始化流程, 增加堵塞行列缓冲池失利, 数据记载: {}",
            JSON.toJSONString(resultContext.getResultObject()), ex);
       }
      // 防止恳求方针数据库(ElasticSearch 或其它)次数过多,所以主张每次 MAX_SYNC_SIZE 条数,虽然或许不行这个数
      if (blockingQueueCachePool.size() >= MAX_SYNC_SIZE) {
        productSkuInitSyncThreadPoolExecutor.execute(() -> executeTask(blockingQueueCachePool));
       }
     });
    // 兜底,将终究缓冲的使命履行
    productSkuInitSyncThreadPoolExecutor.execute(() -> lastOnceExecuteTask(blockingQueueCachePool));
   }
  
  private void executeTask(BlockingQueue<ProductSkuDO> blockingQueueCachePool) {
    List<ProductSkuDO> copyList = new ArrayList<>(MAX_SYNC_SIZE);
    try {
      int drainTo = blockingQueueCachePool.drainTo(copyList, MAX_SYNC_SIZE);
      if (drainTo > 0) {
        // 此处决定向何处同步数据
        // ......
        SYNC_SUM.add(drainTo);
       }
     } catch (Exception ex) {
      log.error("产品SKU根底数据初始化流程履行失利", ex);
     }
   }
  
  private void lastOnceExecuteTask(BlockingQueue<ProductSkuDO> blockingQueueCachePool) {
    List<ProductSkuDO> lastProductSkus = blockingQueueCachePool.stream().parallel().collect(Collectors.toList());
    try {
      SYNC_SUM.add(lastProductSkus.size());
      // 此处决定向何处同步数据
      // ......
     } catch (Exception ex) {
      log.error("产品SKU根底数据初始化流程履行终究一次同步失利", ex);
     }
   }
  
  private void printPoolAndScheduledInfo() {
    START_TIME = System.currentTimeMillis();
    SCHEDULED_EXECUTOR.scheduleAtFixedRate(() -> {
      log.info("产品SKU根底数据初始化流程, 当时已同步总数量: {}", COUNT_NUM.get());
      log.info("产品SKU根底数据初始化流程, 线程池状态打印, 当时活动线程数: {}, 当时排队使命数: {}, 履行完结线程数: {}, 线程池使命总数: {}",
          productSkuInitSyncThreadPoolExecutor.getActiveCount(),
          productSkuInitSyncThreadPoolExecutor.getQueue().size(),
          productSkuInitSyncThreadPoolExecutor.getCompletedTaskCount(),
          productSkuInitSyncThreadPoolExecutor.getTaskCount());
     }, 30, 10, TimeUnit.SECONDS);
   }
  
  private void shutdownPoolAndPrintCountSize() {
    // 封闭守时器线程池
    SCHEDULED_EXECUTOR.shutdown();
    // 封闭数据同步线程池
    productSkuInitSyncThreadPoolExecutor.shutdown();
    while (true) {
      if (SCHEDULED_EXECUTOR.isTerminated() && productSkuInitSyncThreadPoolExecutor.isTerminated()) {
        log.info("产品SKU根底数据初始化流程, 总条数: {}, 同步成功数: {}, 同步履行总耗时: {}",
            COUNT_NUM.get(),
            SYNC_SUM.longValue(),
            System.currentTimeMillis() - START_TIME);
        break;
       }
     }
   }
}

仔细的同学或许发现,除了增加守时使命线程池外,还增加调用 shutdownPoolAndPrintCountSize 封闭线程池资源的流程。运用过且不会再运用的资源,及时封闭可释放系统相关资源,这是个很好的编码习惯。

4. 功能报告

之前出产环境经过文章中的流程跑过,大约一千多万的数据两分钟左右就搞定了。

咱们在进行运用上述程序的时候,服务器资源、线程数量以及 ElasticSearch 配置都是功能考量点,每一个步骤都会影响终究完结时刻,需求不断模仿各种参数进行逐步骤优。

文末总结

在本文中,咱们评论了怎么快速同步亿级产品数据到三方数据库的问题。

为了完成高效的数据同步,咱们采用了并发编程和处理深分页问题的办法。具体来说,咱们经过运用线程池和多线程技术,对数据同步过程进行了并发处理,然后提高了同步功率。同时,为了处理深分页问题,咱们采用了 MySQL 流式查询技术,防止了一次性将很多数据加载到内存中的问题,提高了查询功率和减少了内存占用。

总之,经过合理的并发编程和深度分页技术,咱们能够在处理很多数据时提高功率和功能,并有用地处理数据同步的问题。这些办法和技术能够在处理其他类似的大规模数据问题时发挥重要作用。

近期文章精选

  • # 线程池这样用,架构师看了都说好

  • # 美团:某动态线程池框架是官方开源的么?

  • # 发现一个商城开源项目,谷粒商郊外又多了个挑选