Java多线程批量操作,居然有人不做事务控制?

前语

公司业务中遇到一个需求,需求一起修正最多约5万条数据,并且还不支持批量或异步修正操作。于是只能写个for循环操作,但操作耗时太长,只能一步一步寻找其他解决方案。

详细操作如下:

一、循环操作的代码

先写一个最简略的for循环代码,看看耗时状况怎么样。

/***
 * 一条一条顺次对50000条数据进行更新操作
 * 耗时:2m27s,1m54s
 */
@Test
void updateStudent() {
    List<Student> allStudents = studentMapper.getAll();
    allStudents.forEach(s -> {
        //更新教师信息
        String teacher = s.getTeacher();
        String newTeacher = "TNO_" + new Random().nextInt(100);
        s.setTeacher(newTeacher);
        studentMapper.update(s);
    });
}

循环修正整体耗时约 1分54秒,且代码中没有手动业务操控应该是主动业务提交,所以每次操作业务都会提交所以操作比较慢,咱们先对代码中增加手动业务操控,看查询功率怎样。

二、运用手动业务的操作代码

修正后的代码如下:

@Autowired
private DataSourceTransactionManager dataSourceTransactionManager;
@Autowired
private TransactionDefinition transactionDefinition;
/**
 * 因为希望更新操作 一次性完结,需求手动操控增加业务
 * 耗时:24s
 * 从测验结果能够看出,增加业务后插入数据的功率有明显的提高
 */
@Test
void updateStudentWithTrans() {
    List<Student> allStudents = studentMapper.getAll();
    TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
    try {
        allStudents.forEach(s -> {
            //更新教师信息
            String teacher = s.getTeacher();
            String newTeacher = "TNO_" + new Random().nextInt(100);
            s.setTeacher(newTeacher);
            studentMapper.update(s);
        });
        dataSourceTransactionManager.commit(transactionStatus);
    } catch (Throwable e) {
        dataSourceTransactionManager.rollback(transactionStatus);
        throw e;
    }
}

增加手动业务操操控后,整体耗时约 24秒,这相关于主动业务提交的代码,快了约5倍,关于大量循环数据库提交操作,增加手动业务能够有用提高操作功率。

三、测验多线程进行数据修正

增加数据库手动业务后操作功率有明细提高,但还是比较长,接下来测验多线程提交看是不是能够再快一些。

先增加一个Service将批量修正操作整合一下,详细代码如下:

StudentServiceImpl.java

@Service
public class StudentServiceImpl implements StudentService {
    @Autowired
    private StudentMapper studentMapper;
    @Autowired
    private DataSourceTransactionManager dataSourceTransactionManager;
    @Autowired
    private TransactionDefinition transactionDefinition;
    @Override
    public void updateStudents(List<Student> students, CountDownLatch threadLatch) {
        TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
        System.out.println("子线程:" + Thread.currentThread().getName());
        try {
            students.forEach(s -> {
                // 更新教师信息
                // String teacher = s.getTeacher();
                String newTeacher = "TNO_" + new Random().nextInt(100);
                s.setTeacher(newTeacher);
                studentMapper.update(s);
            });
            dataSourceTransactionManager.commit(transactionStatus);
            threadLatch.countDown();
        } catch (Throwable e) {
            e.printStackTrace();
            dataSourceTransactionManager.rollback(transactionStatus);
        }
    }
}

批量测验代码,咱们采用了多线程进行提交,修正后测验代码如下:

@Autowired
private DataSourceTransactionManager dataSourceTransactionManager;
@Autowired
private TransactionDefinition transactionDefinition;
@Autowired
private StudentService studentService;
/**
 * 对用户而言,27s 任是一个较长的时刻,咱们测验用多线程的方法来经行修正操作看能否加快处理速度
 * 估计创立10个线程,每个线程进行5000条数据修正操作
 * 耗时计算
 * 1 线程数:1      耗时:25s
 * 2 线程数:2      耗时:14s
 * 3 线程数:5      耗时:15s
 * 4 线程数:10     耗时:15s
 * 5 线程数:100    耗时:15s
 * 6 线程数:200    耗时:15s
 * 7 线程数:500    耗时:17s
 * 8 线程数:1000    耗时:19s
 * 8 线程数:2000    耗时:23s
 * 8 线程数:5000    耗时:29s
 */
@Test
void updateStudentWithThreads() {
    //查询总数据
    List<Student> allStudents = studentMapper.getAll();
    // 线程数量
    final Integer threadCount = 100;
    //每个线程处理的数据量
    final Integer dataPartionLength = (allStudents.size() + threadCount - 1) / threadCount;
    // 创立多线程处理使命
    ExecutorService studentThreadPool = Executors.newFixedThreadPool(threadCount);
    CountDownLatch threadLatchs = new CountDownLatch(threadCount);
    for (int i = 0; i < threadCount; i++) {
        // 每个线程处理的数据
        List<Student> threadDatas = allStudents.stream()
                .skip(i * dataPartionLength).limit(dataPartionLength).collect(Collectors.toList());
        studentThreadPool.execute(() -> {
            studentService.updateStudents(threadDatas, threadLatchs);
        });
    }
    try {
        // 倒计时锁设置超时时刻 30s
        threadLatchs.await(30, TimeUnit.SECONDS);
    } catch (Throwable e) {
        e.printStackTrace();
    }
    System.out.println("主线程完结");
}

多线程提交修正时,咱们测验了不同线程数对提交速度的影响,详细能够看下面表格,

Java多线程批量操作,居然有人不做事务控制?

多线程修正50000条数据时 不同线程数耗时比照(秒)

依据表格,咱们线程数增大提交速度并非一向增大,在当前状况下约在2-5个线程数时,提交速度最快(实践线程数还是需求依据服务器装备实践测验)。

四、根据两个CountDownLatch操控多线程业务提交

因为多线程提交时,每个线程业务时独自的,无法确保一致性,咱们测验给多线程增加业务操控,来确保每个线程都是在插入数据完结后在提交业务,

这里咱们运用两个 CountDownLatch 来操控主线程与子线程业务提交,并设置了超时时刻为 30 秒。咱们对代码进行了一点修正:

@Override
public void updateStudentsThread(List<Student> students, CountDownLatch threadLatch, CountDownLatch mainLatch, StudentTaskError taskStatus) {
    TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
    System.out.println("子线程:" + Thread.currentThread().getName());
    try {
        students.forEach(s -> {
            // 更新教师信息
            // String teacher = s.getTeacher();
            String newTeacher = "TNO_" + new Random().nextInt(100);
            s.setTeacher(newTeacher);
            studentMapper.update(s);
        });
    } catch (Throwable e) {
        taskStatus.setIsError();
    } finally {
        threadLatch.countDown(); // 切换到主线程履行
    }
    try {
        mainLatch.await();  //等候主线程履行
    } catch (Throwable e) {
        taskStatus.setIsError();
    }
    // 判别是否有过错,如有过错 就回滚业务
    if (taskStatus.getIsError()) {
        dataSourceTransactionManager.rollback(transactionStatus);
    } else {
        dataSourceTransactionManager.commit(transactionStatus);
    }
}
/**
 * 因为每个线程都是独自的业务,需求增加对线程业务的统一操控
 * 咱们这边运用两个 CountDownLatch 对子线程的业务进行操控
 */
@Test
void updateStudentWithThreadsAndTrans() {
    //查询总数据
    List<Student> allStudents = studentMapper.getAll();
    // 线程数量
    final Integer threadCount = 4;
    //每个线程处理的数据量
    final Integer dataPartionLength = (allStudents.size() + threadCount - 1) / threadCount;
    // 创立多线程处理使命
    ExecutorService studentThreadPool = Executors.newFixedThreadPool(threadCount);
    CountDownLatch threadLatchs = new CountDownLatch(threadCount); // 用于计算子线程提交数量
    CountDownLatch mainLatch = new CountDownLatch(1); // 用于判别主线程是否提交
    StudentTaskError taskStatus = new StudentTaskError(); // 用于判别子线程使命是否有过错
    for (int i = 0; i < threadCount; i++) {
        // 每个线程处理的数据
        List<Student> threadDatas = allStudents.stream()
                .skip(i * dataPartionLength).limit(dataPartionLength)
                .collect(Collectors.toList());
        studentThreadPool.execute(() -> {
            studentService.updateStudentsThread(threadDatas, threadLatchs, mainLatch, taskStatus);
        });
    }
    try {
        // 倒计时锁设置超时时刻 30s
        boolean await = threadLatchs.await(30, TimeUnit.SECONDS);
        if (!await) { // 等候超时,业务回滚
            taskStatus.setIsError();
        }
    } catch (Throwable e) {
        e.printStackTrace();
        taskStatus.setIsError();
    }
    mainLatch.countDown(); // 切换到子线程履行
    studentThreadPool.shutdown(); //封闭线程池
    System.out.println("主线程完结");
}

本想再次测验一下不同线程数对履行功率的影响时,发现当线程数超越10个时,履行时就报错。详细过错内容如下:

Exception in thread "pool-1-thread-2" org.springframework.transaction.CannotCreateTransactionException: Could not open JDBC Connection for transaction; nested exception is java.sql.SQLTransientConnectionException: HikariPool-1 - Connection is not available, request timed out after 30055ms.
 at org.springframework.jdbc.datasource.DataSourceTransactionManager.doBegin(DataSourceTransactionManager.java:309)
 at org.springframework.transaction.support.AbstractPlatformTransactionManager.startTransaction(AbstractPlatformTransactionManager.java:400)
 at org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:373)
 at com.example.springbootmybatis.service.Impl.StudentServiceImpl.updateStudentsThread(StudentServiceImpl.java:58)
 at com.example.springbootmybatis.StudentTest.lambda$updateStudentWithThreadsAndTrans$3(StudentTest.java:164)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)
Caused by: java.sql.SQLTransientConnectionException: HikariPool-1 - Connection is not available, request timed out after 30055ms.
 at com.zaxxer.hikari.pool.HikariPool.createTimeoutException(HikariPool.java:696)
 at com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:197)
 at com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:162)
 at com.zaxxer.hikari.HikariDataSource.getConnection(HikariDataSource.java:128)
 at org.springframework.jdbc.datasource.DataSourceTransactionManager.doBegin(DataSourceTransactionManager.java:265)
 ... 7 more

过错的大致意思时,不能为数据库业务翻开jdbc Connection,衔接在30s的时分超时了。因为前面启动的十个线程需求等候主线程完结后才能提交,所以一向占用衔接未开释,造成后边的进程创立衔接超时。

看过错日志中过错的来历是HikariPool,咱们来重新装备一下这个衔接池的参数,将最大衔接数修正为100,详细装备如下:

# 衔接池中答应的最小衔接数。缺省值:10
spring.datasource.hikari.minimum-idle=10
# 衔接池中答应的最大衔接数。缺省值:10
spring.datasource.hikari.maximum-pool-size=100
# 主动提交
spring.datasource.hikari.auto-commit=true
# 一个衔接idle状况的最大时长(毫秒),超时则被开释(retired),缺省:10分钟
spring.datasource.hikari.idle-timeout=30000
# 一个衔接的生命时长(毫秒),超时并且没被运用则被开释(retired),缺省:30分钟,主张设置比数据库超时时长少30秒
spring.datasource.hikari.max-lifetime=1800000
# 等候衔接池分配衔接的最大时长(毫秒),超越这个时长还没可用的衔接则发生SQLException, 缺省:30秒

再次履行测验发现没有报错,修正线程数为20又履行了一下,同样履行成功了。

五、根据TransactionStatus调集来操控多线程业务提交

在同事引荐下咱们运用业务调集来进行多线程业务操控,主要代码如下

@Service
public class StudentsTransactionThread {
    @Autowired
    private StudentMapper studentMapper;
    @Autowired
    private StudentService studentService;
    @Autowired
    private PlatformTransactionManager transactionManager;
    List<TransactionStatus> transactionStatuses = Collections.synchronizedList(new ArrayList<TransactionStatus>());
    @Transactional(propagation = Propagation.REQUIRED, rollbackFor = {Exception.class})
    public void updateStudentWithThreadsAndTrans() throws InterruptedException {
        //查询总数据
        List<Student> allStudents = studentMapper.getAll();
        // 线程数量
        final Integer threadCount = 2;
        //每个线程处理的数据量
        final Integer dataPartionLength = (allStudents.size() + threadCount - 1) / threadCount;
        // 创立多线程处理使命
        ExecutorService studentThreadPool = Executors.newFixedThreadPool(threadCount);
        CountDownLatch threadLatchs = new CountDownLatch(threadCount);
        AtomicBoolean isError = new AtomicBoolean(false);
        try {
            for (int i = 0; i < threadCount; i++) {
                // 每个线程处理的数据
                List<Student> threadDatas = allStudents.stream()
                        .skip(i * dataPartionLength).limit(dataPartionLength).collect(Collectors.toList());
                studentThreadPool.execute(() -> {
                    try {
                        try {
                            studentService.updateStudentsTransaction(transactionManager, transactionStatuses, threadDatas);
                        } catch (Throwable e) {
                            e.printStackTrace();
                            isError.set(true);
                        }finally {
                            threadLatchs.countDown();
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                        isError.set(true);
                    }
                });
            }
            // 倒计时锁设置超时时刻 30s
            boolean await = threadLatchs.await(30, TimeUnit.SECONDS);
            // 判别是否超时
            if (!await) {
                isError.set(true);
            }
        } catch (Throwable e) {
            e.printStackTrace();
            isError.set(true);
        }
        if (!transactionStatuses.isEmpty()) {
            if (isError.get()) {
                transactionStatuses.forEach(s -> transactionManager.rollback(s));
            } else {
                transactionStatuses.forEach(s -> transactionManager.commit(s));
            }
        }
        System.out.println("主线程完结");
    }
}
@Override
@Transactional(propagation = Propagation.REQUIRED, rollbackFor = {Exception.class})
public void updateStudentsTransaction(PlatformTransactionManager transactionManager, List<TransactionStatus> transactionStatuses, List<Student> students) {
    // 运用这种方法将业务状况都放在同一个业务里面
    DefaultTransactionDefinition def = new DefaultTransactionDefinition();
    def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW); // 事物阻隔等级,开启新业务,这样会比较安全些。
    TransactionStatus status = transactionManager.getTransaction(def); // 取得业务状况
    transactionStatuses.add(status);
    students.forEach(s -> {
        // 更新教师信息
        // String teacher = s.getTeacher();
        String newTeacher = "TNO_" + new Random().nextInt(100);
        s.setTeacher(newTeacher);
        studentMapper.update(s);
    });
    System.out.println("子线程:" + Thread.currentThread().getName());
}

因为这个中方法去前面方法相同,需求等候线程履行完结后才会提交业务,所有任会占用Jdbc衔接池,假如线程数量超越衔接池最大数量会发生衔接超时。所以在运用过程中任要操控线程数量,

六、运用union衔接多个select完成批量update

有些状况写不支持,批量update,但支持insert 多条数据,这个时分可测验将需求更新的数据拼接成多条select 句子,然后运用union 衔接起来,再运用update 关联这个数据进行update,详细代码演示如下:

update student,(
 (select  1 as id,'teacher_A' as teacher) union
 (select  2 as id,'teacher_A' as teacher) union
 (select  3 as id,'teacher_A' as teacher) union
 (select  4 as id,'teacher_A' as teacher)
    /* ....more data ... */
    ) as new_teacher
set
 student.teacher=new_teacher.teacher
where
 student.id=new_teacher.id

这种方法在Mysql 数据库没有装备allowMultiQueries=true也能够完成批量更新。

总结

  • 关于大批量数据库操作,运用手动业务提交能够很多程度上提高操作功率
  • 多线程对数据库进行操作时,并非线程数越多操作时刻越快,按上述示例大约在2-5个线程时操作时刻最快。
  • 关于多线程堵塞业务提交时,线程数量不能过多。
  • 假如能有办法完成批量更新那是最好