0. Background

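We run a big data platform on which Spark reads data from Hive, cleans and transforms it (ETL), and applies further business logic; the resulting data then has to be loaded into TBase (a distributed database from Tencent).
The imports do not run on a fixed schedule, but they are large: each one loads anywhere from a few hundred million to several billion records, sometimes more than ten billion.
If we write with dataset.write, Spark internally writes through a SQL connection over JDBC as well. At this volume that is extremely slow, to the point of being completely unacceptable.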

After some research: TBase is built on PostgreSQL (pgsql), which supports loading a file with COPY.
The syntax is:

COPY table FROM '/mnt/g/file.csv' WITH CSV HEADER;

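This is much more efficient. (With a file path, COPY ... FROM is executed by the database server, which reads the file from its own file system; a client that holds the data itself streams it with COPY ... FROM STDIN instead.)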

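In our tests, around a billion records can be written in roughly half an hour. Write throughput drops once indexes exist and as the table grows, but it remains entirely acceptable.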

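So the plan is: Spark reads from Hive and processes the data, dataset.repartition(num) repartitions it, and the result is written to HDFS as num files. These small files are then loaded into TBase with COPY, in batches, by multiple threads.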

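The number of small HDFS files (nums) ranges from a few thousand to tens of thousands, while the number of connections available for batch loading (connections) cannot be arbitrarily large. Model the files as producers and the database connections as consumers: the producers keep producing, the consumers have limited capacity and cannot keep up, so the work has to block on the consumer side.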

Sharing a real-world producer-consumer scenario

1. Implementation approaches

However a producer-consumer pattern is implemented, with your own locks or with a blocking queue, its core is blocking.
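
As a generic illustration of that blocking (not the loading code itself), here is a minimal sketch built on a bounded BlockingQueue from java.util.concurrent. The class BoundedBufferDemo, the fake file names and the "__STOP__" stop signal are hypothetical, and incrementing a counter stands in for loading a file. put() blocks the producer while the queue is full, and take() blocks a consumer while it is empty.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

class BoundedBufferDemo {
    // Hypothetical stop signal; the fake file names never look like this
    private static final String STOP = "__STOP__";

    // One producer puts `items` fake file names into a queue of `capacity`;
    // `consumers` threads take and "load" them. Returns how many were consumed.
    static int run(int items, int capacity, int consumers) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(capacity);
        AtomicInteger consumed = new AtomicInteger();
        List<Thread> workers = new ArrayList<>();
        for (int c = 0; c < consumers; c++) {
            Thread t = new Thread(() -> {
                try {
                    while (true) {
                        String file = queue.take();      // blocks while the queue is empty
                        if (STOP.equals(file)) {
                            return;
                        }
                        consumed.incrementAndGet();     // stand-in for loading the file
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            t.start();
            workers.add(t);
        }
        for (int i = 0; i < items; i++) {
            queue.put("part-" + i + ".csv");           // blocks while the queue is full
        }
        for (int c = 0; c < consumers; c++) {
            queue.put(STOP);                            // one stop signal per consumer
        }
        for (Thread t : workers) {
            t.join();
        }
        return consumed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("consumed = " + run(33, 10, 4));
    }
}
```

Because the queue is FIFO and every stop signal is enqueued after the last file, all files are consumed before any consumer exits, so run(33, 10, 4) returns 33.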

1.1 Approach 1: the thread pool's built-in blocking queue

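Our batch loading is multithreaded. One way to create a thread pool is through Executors, passing a thread count (Executors.newFixedThreadPool(n)).
This is strongly discouraged in business systems that run online 24/7, but for scheduled jobs on a big data platform it is not strictly off-limits; it depends on your situation.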

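The biggest problem with building a pool through Executors is that underneath it is also a ThreadPoolExecutor, with equal core and maximum thread counts and a LinkedBlockingQueue created without a capacity, so the queue can grow to Integer.MAX_VALUE tasks.
Queued tasks can therefore consume memory without bound until the JVM runs out of memory (OOM).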

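In our specific scenario, though, there are only thousands to tens of thousands of files, so the queue never holds more tasks than that; if the platform has the resources, this is acceptable.
Conveniently, the pool's blocking queue can also serve as the buffer between producers and consumers.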

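import cn.hutool.core.io.FileUtil;
import lombok.SneakyThrows;

import java.io.File;
import java.sql.Connection;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;

public class Test {
    public static void main(String[] args) throws Exception {
        // "path/to/test/files" is a placeholder for the directory holding the exported files
        List<File> fileList = FileUtil.loopFiles(new File("path/to/test/files"));
        // 10 core = 10 max threads; extra tasks wait in an unbounded LinkedBlockingQueue
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        LongAdder longAdder = new LongAdder();
        for (File file : fileList) {
            executorService.execute(new TestRun(file, fileList.size(), longAdder));
        }
        executorService.shutdown();
        executorService.awaitTermination(1, TimeUnit.DAYS);
    }

    public static class TestRun implements Runnable {
        private final File file;
        private final int total;
        private final LongAdder longAdder;

        public TestRun(File file, int total, LongAdder longAdder) {
            this.file = file;
            this.total = total;
            this.longAdder = longAdder;
        }

        @SneakyThrows
        @Override
        public void run() {
            longAdder.increment();
            // ConnectionUtils is the author's own helper (not shown); a connection pool could be used instead.
            // try-with-resources closes (returns) the connection even if loading fails
            try (Connection connection = ConnectionUtils.getConnection()) {
                System.out.println(Thread.currentThread() + " file " + longAdder.longValue() + "/" + total + ": got connection, loading");
                // Simulate the COPY of `file` with a random sleep
                Thread.sleep(ThreadLocalRandom.current().nextInt(1000));
                System.out.println(Thread.currentThread() + " file " + longAdder.longValue() + "/" + total + ": loaded, connection returned");
            }
        }
    }
}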

Output:

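Database driver loaded successfully
Database driver loaded successfully
Database driver loaded successfully
Database driver loaded successfully
Database driver loaded successfully
Database driver loaded successfully
Database driver loaded successfully
Database driver loaded successfully
Database driver loaded successfully
Database driver loaded successfully
Thread[pool-1-thread-5,5,main] file 10/33: got connection, loading
Thread[pool-1-thread-9,5,main] file 10/33: got connection, loading
Thread[pool-1-thread-1,5,main] file 10/33: got connection, loading
Thread[pool-1-thread-2,5,main] file 10/33: got connection, loading
Thread[pool-1-thread-7,5,main] file 10/33: got connection, loading
Thread[pool-1-thread-10,5,main] file 10/33: got connection, loading
Thread[pool-1-thread-6,5,main] file 10/33: got connection, loading
Thread[pool-1-thread-8,5,main] file 10/33: got connection, loading
Thread[pool-1-thread-4,5,main] file 10/33: got connection, loading
Thread[pool-1-thread-3,5,main] file 10/33: got connection, loading
Thread[pool-1-thread-1,5,main] file 10/33: loaded, connection returned
Database driver loaded successfully
Thread[pool-1-thread-1,5,main] file 11/33: got connection, loading
Thread[pool-1-thread-4,5,main] file 11/33: loaded, connection returned
Database driver loaded successfully
.
.
.
Database driver loaded successfully
Thread[pool-1-thread-3,5,main] file 33/33: got connection, loading
Thread[pool-1-thread-9,5,main] file 33/33: loaded, connection returned
Thread[pool-1-thread-8,5,main] file 33/33: loaded, connection returned
Thread[pool-1-thread-6,5,main] file 33/33: loaded, connection returned
Thread[pool-1-thread-7,5,main] file 33/33: loaded, connection returned
Thread[pool-1-thread-10,5,main] file 33/33: loaded, connection returned
Thread[pool-1-thread-5,5,main] file 33/33: loaded, connection returned
Thread[pool-1-thread-4,5,main] file 33/33: loaded, connection returned
Thread[pool-1-thread-3,5,main] file 33/33: loaded, connection returned
Thread[pool-1-thread-2,5,main] file 33/33: loaded, connection returned
Thread[pool-1-thread-1,5,main] file 33/33: loaded, connection returned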

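The longAdder is only there to make the output easier to read: each thread prints the shared counter's current value, so the numbers are not a strict per-file sequence.
We simulate 33 files with a core pool size of 10. At most 10 files are loaded at the same time, and a new file starts only after another has finished loading, which is exactly what we want.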

1.2 Approach 2: blocking queue + CountDownLatch

What is CountDownLatch?

It is a synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.

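A CountDownLatch is initialized with a given count. await() blocks until the current count reaches zero due to invocations of countDown(), after which all waiting threads are released and any subsequent await() calls return immediately. This is a one-shot phenomenon: the count cannot be reset.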

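CountDownLatch is a versatile synchronization tool and can be used for many purposes. A CountDownLatch initialized with a count of 1 serves as a simple on/off latch, or gate: all threads calling await() wait at the gate until a thread calling countDown() opens it. A CountDownLatch initialized to N can make one thread wait until N threads have completed some action, or until some action has been completed N times.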
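
Both uses can be seen in a minimal sketch; the class LatchDemo and its methods are hypothetical, and incrementing a counter stands in for real work. A latch of 1 acts as a start gate, a latch of N makes the calling thread wait for N tasks, and a latch already at zero ignores further countDown() calls and no longer blocks await().

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class LatchDemo {
    // Runs n tasks that all wait on a start gate (count 1), then waits on a done latch (count n).
    // Returns how many tasks had finished when await() returned.
    static int runAll(int n) throws InterruptedException {
        CountDownLatch startGate = new CountDownLatch(1);
        CountDownLatch done = new CountDownLatch(n);
        AtomicInteger finished = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (int i = 0; i < n; i++) {
            pool.execute(() -> {
                try {
                    startGate.await();          // on/off gate: wait until it is opened
                    finished.incrementAndGet(); // stand-in for loading one file
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    done.countDown();           // count down even if the work failed
                }
            });
        }
        startGate.countDown();                  // 1 -> 0 opens the gate for every waiter
        done.await();                           // returns after n countDown() calls
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);
        return finished.get();
    }

    // One-shot: once the count is 0, extra countDown() calls do nothing and await() returns at once
    static long oneShot() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        latch.countDown();
        latch.countDown();                      // no effect, the count stays at 0
        latch.await();                          // does not block
        return latch.getCount();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("finished = " + runAll(33));
        System.out.println("count = " + oneShot());
    }
}
```

Putting countDown() in a finally block is the important design choice: a task that fails still counts down, so the waiting thread is not stuck forever.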

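We wrap a blocking queue into a database connection pool with a fixed size of 10. A file is loaded only after it takes a connection; when none is available, it blocks until another file finishes loading and returns its connection.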

import lombok.extern.slf4j.Slf4j;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.concurrent.LinkedBlockingQueue;

@Slf4j
public class ConnectionQueue {
    private LinkedBlockingQueue<Connection> connections;
    private final int size;

    public ConnectionQueue(int size) {
        // Delegate to the two-argument constructor; "new ConnectionQueue(null, size)" here
        // would build a throwaway object and leave this instance unvalidated
        this(null, size);
    }

    public ConnectionQueue(LinkedBlockingQueue<Connection> connections, int size) {
        if (size <= 0 || size > 100) {
            throw new IllegalArgumentException("size must be between 1 and 100");
        }
        this.connections = connections;
        this.size = size;
    }

    /**
     * Initialize the pool with `size` database connections.
     */
    public void init() {
        if (connections == null) {
            connections = new LinkedBlockingQueue<>(size);
        }
        for (int i = 0; i < size; i++) {
            // ConnectionUtils is the author's own helper (not shown)
            connections.add(ConnectionUtils.getConnection());
        }
    }

    /**
     * Take an idle connection, blocking until one is available.
     */
    public Connection get() throws InterruptedException {
        return connections.take();
    }

    /**
     * Take an idle connection without blocking; returns null if none is available.
     */
    public Connection poll() {
        return connections.poll();
    }

    /**
     * Return an idle connection to the pool.
     */
    public void put(Connection connection) {
        connections.add(connection);
    }

    /**
     * Number of idle connections currently in the pool.
     */
    public int size() {
        return connections.size();
    }

    /**
     * Close all idle connections and clear the pool.
     */
    public void destroy() {
        if (connections == null) {
            return;
        }
        // LinkedBlockingQueue never holds null elements, so no per-element null check is needed
        for (Connection conn : connections) {
            try {
                conn.close();
                log.info("closed connection {}", conn);
            } catch (SQLException e) {
                log.error("failed to close connection", e);
            }
        }
        connections.clear();
    }
}

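A CountDownLatch counts the files at the same time: await() blocks until every task has finished, and only then are resources destroyed and the remaining business logic run.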

import cn.hutool.core.io.FileUtil;
import lombok.SneakyThrows;

import java.io.File;
import java.sql.Connection;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Test {
    public static void main(String[] args) throws Exception {
        List<File> fileList = FileUtil.loopFiles(new File("path/to/test/files"));
        ConnectionQueue connectionQueue = new ConnectionQueue(10);
        connectionQueue.init();
        CountDownLatch countDownLatch = new CountDownLatch(fileList.size());
        ExecutorService executorService = new ThreadPoolExecutor(10,
                10,
                1,
                TimeUnit.MINUTES,
                new LinkedBlockingQueue<>(10),
                (r, executor) -> {
                    // A rejected task never runs its finally block, so do its bookkeeping here:
                    // return its connection and count it down, otherwise await() waits forever
                    if (r instanceof TestRun) {
                        TestRun task = (TestRun) r;
                        connectionQueue.put(task.getConnection());
                        task.getCountDownLatch().countDown();
                    }
                    System.out.println(Thread.currentThread() + " reject countdown");
                });
        for (File file : fileList) {
            try {
                // Blocks until a connection is free, so at most 10 files are loaded at once
                Connection conn = connectionQueue.get();
                executorService.execute(new TestRun(countDownLatch, connectionQueue, fileList, conn));
            } catch (Exception exception) {
                exception.printStackTrace();
            }
        }
        // Wait until every file has been loaded (or rejected), then release resources
        countDownLatch.await();
        executorService.shutdown();
        connectionQueue.destroy();
    }

    public static class TestRun implements Runnable {
        private final CountDownLatch countDownLatch;
        private final ConnectionQueue connectionQueue;
        private final List<File> fileList;
        private final Connection connection;

        public TestRun(CountDownLatch countDownLatch, ConnectionQueue connectionQueue, List<File> fileList, Connection connection) {
            this.countDownLatch = countDownLatch;
            this.connectionQueue = connectionQueue;
            this.fileList = fileList;
            this.connection = connection;
        }

        public CountDownLatch getCountDownLatch() {
            return countDownLatch;
        }

        public Connection getConnection() {
            return connection;
        }

        @SneakyThrows
        @Override
        public void run() {
            try {
                System.out.println(Thread.currentThread() + " file " + countDownLatch.getCount() + "/" + fileList.size() + ": got connection, loading");
                // Simulate the COPY with a random sleep
                Thread.sleep(ThreadLocalRandom.current().nextInt(1000));
                System.out.println(Thread.currentThread() + " file " + countDownLatch.getCount() + "/" + fileList.size() + ": loaded, connection returned");
            } finally {
                // Always return the connection and count down, even if loading fails
                connectionQueue.put(connection);
                countDownLatch.countDown();
            }
        }
    }
}

Output:

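Database driver loaded successfully
Database driver loaded successfully
Database driver loaded successfully
Database driver loaded successfully
Database driver loaded successfully
Database driver loaded successfully
Database driver loaded successfully
Database driver loaded successfully
Database driver loaded successfully
Database driver loaded successfully
Thread[pool-1-thread-1,5,main] file 33/33: got connection, loading
Thread[pool-1-thread-4,5,main] file 33/33: got connection, loading
Thread[pool-1-thread-3,5,main] file 33/33: got connection, loading
Thread[pool-1-thread-2,5,main] file 33/33: got connection, loading
Thread[pool-1-thread-10,5,main] file 33/33: got connection, loading
Thread[pool-1-thread-6,5,main] file 33/33: got connection, loading
Thread[pool-1-thread-7,5,main] file 33/33: got connection, loading
Thread[pool-1-thread-8,5,main] file 33/33: got connection, loading
Thread[pool-1-thread-9,5,main] file 33/33: got connection, loading
Thread[pool-1-thread-5,5,main] file 33/33: got connection, loading
Thread[pool-1-thread-4,5,main] file 33/33: loaded, connection returned
Thread[pool-1-thread-4,5,main] file 32/33: got connection, loading
Thread[pool-1-thread-8,5,main] file 32/33: loaded, connection returned
Thread[pool-1-thread-8,5,main] file 31/33: got connection, loading
Thread[pool-1-thread-8,5,main] file 31/33: loaded, connection returned
Thread[pool-1-thread-8,5,main] file 30/33: got connection, loading
Thread[pool-1-thread-4,5,main] file 30/33: loaded, connection returned
...
Thread[pool-1-thread-2,5,main] file 10/33: got connection, loading
Thread[pool-1-thread-5,5,main] file 10/33: loaded, connection returned
Thread[pool-1-thread-4,5,main] file 9/33: loaded, connection returned
Thread[pool-1-thread-9,5,main] file 8/33: loaded, connection returned
Thread[pool-1-thread-2,5,main] file 7/33: loaded, connection returned
Thread[pool-1-thread-6,5,main] file 6/33: loaded, connection returned
Thread[pool-1-thread-7,5,main] file 5/33: loaded, connection returned
Thread[pool-1-thread-10,5,main] file 4/33: loaded, connection returned
Thread[pool-1-thread-3,5,main] file 3/33: loaded, connection returned
Thread[pool-1-thread-1,5,main] file 2/33: loaded, connection returned
Thread[pool-1-thread-8,5,main] file 1/33: loaded, connection returned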

1.2.1 What happens if the thread pool rejects a task?

Note that the thread pool's rejection policy must be taken into account here.

The JDK provides four rejection policies:

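AbortPolicy: the default; throws RejectedExecutionException
CallerRunsPolicy: as the name says, the calling thread runs the task itself
DiscardOldestPolicy: discards the oldest queued task, then retries the submission
DiscardPolicy: silently discards the task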

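ThreadPoolExecutor's default rejection policy is AbortPolicy, which throws an exception, so the rejected task never reaches its countDown(). The policy therefore has to be overridden: when the queue is full and a new task is rejected, the handler calls countDown(), so that countDownLatch.await() does not wait forever.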

With the default rejection policy, the run looks like this:

[Screenshot: run with the default rejection policy]
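
The effect of a counting-down handler can be reproduced deterministically in a small sketch. It is not the article's loading code: the class LatchRejectDemo is hypothetical, and an extra latch (gate) holds the single worker thread so that, with a queue of capacity 1, the third and later submissions are guaranteed to be rejected. Because the handler counts rejected tasks down, await() still returns.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class LatchRejectDemo {
    // A task that knows the latch it must count down
    static final class LatchTask implements Runnable {
        final CountDownLatch done;
        private final CountDownLatch gate;
        private final AtomicInteger ran;

        LatchTask(CountDownLatch done, CountDownLatch gate, AtomicInteger ran) {
            this.done = done;
            this.gate = gate;
            this.ran = ran;
        }

        @Override
        public void run() {
            try {
                gate.await();              // hold the only worker so later submissions overflow
                ran.incrementAndGet();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                done.countDown();
            }
        }
    }

    // Returns {tasks that ran, tasks that were rejected}
    static int[] run(int tasks) throws InterruptedException {
        CountDownLatch done = new CountDownLatch(tasks);
        CountDownLatch gate = new CountDownLatch(1);
        AtomicInteger ran = new AtomicInteger();
        AtomicInteger rejected = new AtomicInteger();
        // 1 thread + a queue of 1: while the gate is closed, the 3rd and later submissions are rejected
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 1, TimeUnit.MINUTES,
                new LinkedBlockingQueue<>(1),
                (r, executor) -> {
                    rejected.incrementAndGet();
                    // The rejected task will never run, so count it down here or done.await() hangs
                    if (r instanceof LatchTask) {
                        ((LatchTask) r).done.countDown();
                    }
                });
        for (int i = 0; i < tasks; i++) {
            pool.execute(new LatchTask(done, gate, ran));
        }
        gate.countDown();                  // let the accepted tasks finish
        done.await();                      // returns: every task was counted down exactly once
        pool.shutdown();
        return new int[]{ran.get(), rejected.get()};
    }

    public static void main(String[] args) throws InterruptedException {
        int[] r = run(5);
        System.out.println("ran=" + r[0] + " rejected=" + r[1]);
    }
}
```

With 5 tasks, 2 run (one started directly on the worker thread, one taken from the queue) and 3 are rejected, so main prints ran=2 rejected=3. Removing the countDown() from the handler would leave done.await() blocked forever.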

1.3 Approach 3: Semaphore

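In Java, the synchronized keyword and Lock implementations control concurrent access to a resource by letting only one thread into the critical section at a time (read locks excepted). But consider a different scenario: a shared resource that can serve several threads at once, like a restroom with several stalls that several people can use at the same time. For this scenario, a Semaphore is the right tool.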

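A semaphore is typically used to limit the number of threads that can access some (physical or logical) resource. It maintains a set of permits: before accessing the resource, each thread must acquire a permit from the semaphore, which bounds the number of concurrent accesses; when the thread is done, it returns the permit so another thread can acquire it. While at least one permit is available (permits >= 1), acquire() succeeds; when permits <= 0, the thread blocks until a permit is released. A semaphore with a single permit works much like synchronized or a lock.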

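It is like a restroom attendant standing at the door: whenever there are free stalls, the attendant lets in as many people as there are free stalls. Once inside, N people share the N free stalls. To keep two people from competing for the same stall, access to the individual stalls is still synchronized with a lock inside.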

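In our scenario, the shared resource is a pool of N database connections. M files each need a connection from the pool to be loaded, but N is limited and far smaller than M, so the concurrency of access to the pool has to be controlled.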

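Here the semaphore acts as a flow limiter:
Semaphore semaphore = new Semaphore(10);
allows at most 10 tasks in the pool to run in parallel; a new task can start only after another task has finished and returned its permit.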

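import cn.hutool.core.io.FileUtil;

import java.io.File;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class Test {
    public static void main(String[] args) throws Exception {
        List<File> fileList = FileUtil.loopFiles(new File("path/to/test/files"));
        // At most 10 files are loaded concurrently
        Semaphore semaphore = new Semaphore(10);
        ExecutorService executorService = new ThreadPoolExecutor(10,
                10,
                1,
                TimeUnit.MINUTES,
                new LinkedBlockingQueue<>(10));
        AtomicInteger count = new AtomicInteger(1);
        for (File file : fileList) {
            // The submitting (producer) thread blocks here until a permit is free
            semaphore.acquire();
            executorService.execute(() -> {
                try {
                    int subCount = count.getAndIncrement();
                    System.out.println(Thread.currentThread() + " file " + subCount + "/" + fileList.size() + ": got connection, loading");
                    // Simulate the COPY with a random sleep
                    Thread.sleep(ThreadLocalRandom.current().nextInt(1000));
                    System.out.println(Thread.currentThread() + " file " + subCount + "/" + fileList.size() + ": loaded, connection returned");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    // Return the permit so the next file can start
                    semaphore.release();
                }
            });
        }
        System.out.println("shutdown");
        executorService.shutdown();
    }
}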

Our big data framework already has its own connection-pool component, so obtaining a connection from the pool is omitted here.

Log output:

Thread[pool-1-thread-1,5,main] file 1/33: got connection, loading
Thread[pool-1-thread-3,5,main] file 3/33: got connection, loading
Thread[pool-1-thread-4,5,main] file 2/33: got connection, loading
Thread[pool-1-thread-10,5,main] file 5/33: got connection, loading
Thread[pool-1-thread-9,5,main] file 4/33: got connection, loading
Thread[pool-1-thread-8,5,main] file 8/33: got connection, loading
Thread[pool-1-thread-2,5,main] file 9/33: got connection, loading
Thread[pool-1-thread-7,5,main] file 7/33: got connection, loading
Thread[pool-1-thread-6,5,main] file 6/33: got connection, loading
Thread[pool-1-thread-5,5,main] file 10/33: got connection, loading
Thread[pool-1-thread-5,5,main] file 10/33: loaded, connection returned
Thread[pool-1-thread-5,5,main] file 11/33: got connection, loading
Thread[pool-1-thread-3,5,main] file 3/33: loaded, connection returned
...
Thread[pool-1-thread-2,5,main] file 23/33: loaded, connection returned
shutdown
Thread[pool-1-thread-2,5,main] file 33/33: got connection, loading
Thread[pool-1-thread-4,5,main] file 24/33: loaded, connection returned
Thread[pool-1-thread-5,5,main] file 32/33: loaded, connection returned
Thread[pool-1-thread-1,5,main] file 30/33: loaded, connection returned
Thread[pool-1-thread-9,5,main] file 26/33: loaded, connection returned
Thread[pool-1-thread-3,5,main] file 19/33: loaded, connection returned
Thread[pool-1-thread-2,5,main] file 33/33: loaded, connection returned
Thread[pool-1-thread-8,5,main] file 22/33: loaded, connection returned
Thread[pool-1-thread-6,5,main] file 27/33: loaded, connection returned
Thread[pool-1-thread-10,5,main] file 31/33: loaded, connection returned
Thread[pool-1-thread-7,5,main] file 28/33: loaded, connection returned
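
A self-contained variant of this pattern can check that the semaphore really caps concurrency, even with more pool threads than permits. Everything here is illustrative: the class SemaphoreThrottleDemo is hypothetical, and a 5 ms sleep stands in for the COPY of one file. As in the code above, the submitting thread acquires a permit and the task releases it in finally.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class SemaphoreThrottleDemo {
    // Submits `tasks` jobs to a pool of `poolSize` threads but lets at most `permits` run at once.
    // Returns the highest number of jobs observed running at the same time.
    static int run(int tasks, int permits, int poolSize) throws InterruptedException {
        Semaphore semaphore = new Semaphore(permits);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger maxSeen = new AtomicInteger();
        // The semaphore never lets more than `permits` tasks be outstanding,
        // so a queue of that capacity cannot overflow and nothing is rejected
        ThreadPoolExecutor pool = new ThreadPoolExecutor(poolSize, poolSize, 1, TimeUnit.MINUTES,
                new LinkedBlockingQueue<>(permits));
        for (int i = 0; i < tasks; i++) {
            semaphore.acquire();                        // the submitting thread waits for a free permit
            pool.execute(() -> {
                try {
                    maxSeen.accumulateAndGet(running.incrementAndGet(), Math::max);
                    Thread.sleep(5);                    // stand-in for loading one file
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    running.decrementAndGet();          // leave the running set before handing the permit on
                    semaphore.release();
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);
        return maxSeen.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("max concurrent = " + run(40, 3, 8));
    }
}
```

The returned maximum never exceeds the permit count; how close it gets depends on scheduling, so it is not a fixed number.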

1.3.1 If the thread pool's rejection policy is triggered, does Semaphore have a problem?

We know that with CountDownLatch, a rejection that skips countDown() leaves the program blocked forever. Does Semaphore have a corresponding problem?

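If the pool's queue is full and a task is rejected, the Semaphore has already executed acquire() for it, but release() never runs.
Here is a test. Its rejection handler, like the default policy, never calls release(), but unlike the default it does not throw, so the submit loop keeps running: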

import lombok.SneakyThrows;

import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Test {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(20);
        Semaphore semaphore = new Semaphore(10);
        // 5 threads and a queue of only 1, so rejections happen easily
        ExecutorService executorService = new ThreadPoolExecutor(5,
                5,
                1,
                TimeUnit.MINUTES,
                new LinkedBlockingQueue<>(1), (r, executor) -> {
                    // Custom handler (runs on main): does not throw, and does NOT release the permit
                    Random random = new Random();
                    try {
                        Thread.sleep(random.nextInt(1000));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    if (r instanceof TestRun) {
                        ((TestRun) r).getCountDownLatch().countDown();
                        // ((TestRun) r).getSemaphore().release();
                    }
                    System.out.println(Thread.currentThread() + " reject countdown " + semaphore.availablePermits());
                });
        for (int i = 0; i < 30; i++) {
            // One permit per submitted task
            semaphore.acquire();
            Thread.sleep(100);
            executorService.execute(new TestRun(countDownLatch, semaphore));
        }
        // countDownLatch.await();
        System.out.println("done");
        executorService.shutdown();
    }

    public static class TestRun implements Runnable {
        private final CountDownLatch countDownLatch;
        private final Semaphore semaphore;

        public TestRun(CountDownLatch countDownLatch, Semaphore semaphore) {
            this.countDownLatch = countDownLatch;
            this.semaphore = semaphore;
        }

        public CountDownLatch getCountDownLatch() {
            return countDownLatch;
        }

        public Semaphore getSemaphore() {
            return semaphore;
        }

        @SneakyThrows
        @Override
        public void run() {
            // semaphore.acquire();
            Random random = new Random();
            Thread.sleep(random.nextInt(1000));
            countDownLatch.countDown();
            semaphore.release();
            System.out.println(Thread.currentThread() + " start" + " semaphore = " + semaphore.availablePermits());
            System.out.println(Thread.currentThread() + " countdown");
        }
    }
}

Output:

Thread[pool-1-thread-1,5,main] start semaphore = 8
Thread[pool-1-thread-1,5,main] countdown
Thread[pool-1-thread-3,5,main] start semaphore = 5
Thread[pool-1-thread-3,5,main] countdown
Thread[pool-1-thread-2,5,main] start semaphore = 4
Thread[pool-1-thread-2,5,main] countdown
Thread[pool-1-thread-2,5,main] start semaphore = 5
Thread[pool-1-thread-2,5,main] countdown
Thread[pool-1-thread-5,5,main] start semaphore = 6
Thread[pool-1-thread-5,5,main] countdown
Thread[pool-1-thread-1,5,main] start semaphore = 7
Thread[pool-1-thread-1,5,main] countdown
Thread[main,5,main] reject countdown 7
Thread[pool-1-thread-4,5,main] start semaphore = 5
Thread[pool-1-thread-4,5,main] countdown
Thread[pool-1-thread-3,5,main] start semaphore = 5
Thread[pool-1-thread-3,5,main] countdown
Thread[pool-1-thread-4,5,main] start semaphore = 4
Thread[pool-1-thread-4,5,main] countdown
Thread[pool-1-thread-5,5,main] start semaphore = 3
Thread[pool-1-thread-5,5,main] countdown
Thread[pool-1-thread-2,5,main] start semaphore = 3
Thread[pool-1-thread-2,5,main] countdown
Thread[pool-1-thread-1,5,main] start semaphore = 4
Thread[pool-1-thread-1,5,main] countdown
Thread[main,5,main] reject countdown 4
Thread[pool-1-thread-4,5,main] start semaphore = 4
Thread[pool-1-thread-4,5,main] countdown
Thread[pool-1-thread-3,5,main] start semaphore = 4
Thread[pool-1-thread-3,5,main] countdown
Thread[pool-1-thread-5,5,main] start semaphore = 3
Thread[pool-1-thread-5,5,main] countdown
Thread[pool-1-thread-4,5,main] start semaphore = 3
Thread[pool-1-thread-4,5,main] countdown
Thread[pool-1-thread-2,5,main] start semaphore = 2
Thread[pool-1-thread-2,5,main] countdown
Thread[pool-1-thread-3,5,main] start semaphore = 2
Thread[pool-1-thread-3,5,main] countdown
Thread[pool-1-thread-3,5,main] start semaphore = 2
Thread[pool-1-thread-3,5,main] countdown
Thread[pool-1-thread-2,5,main] start semaphore = 3
Thread[pool-1-thread-2,5,main] countdown
Thread[pool-1-thread-4,5,main] start semaphore = 4
Thread[pool-1-thread-4,5,main] countdown
Thread[pool-1-thread-5,5,main] start semaphore = 5
Thread[pool-1-thread-5,5,main] countdown
Thread[pool-1-thread-1,5,main] start semaphore = 6
Thread[pool-1-thread-1,5,main] countdown
Thread[main,5,main] reject countdown 6
done
Thread[pool-1-thread-5,5,main] start semaphore = 4
Thread[pool-1-thread-5,5,main] countdown
Thread[pool-1-thread-2,5,main] start semaphore = 5
Thread[pool-1-thread-2,5,main] countdown
Thread[pool-1-thread-4,5,main] start semaphore = 6
Thread[pool-1-thread-4,5,main] countdown
Thread[pool-1-thread-3,5,main] start semaphore = 7
Thread[pool-1-thread-3,5,main] countdown

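There were 3 rejections, and the semaphore ended at 7 instead of its initial value of 10. So, first, the program still runs to completion; second, concurrency drops.
In the extreme case, as rejections accumulate, the semaphore could fall to 1; it then behaves like a lock or synchronized, and the multithreaded load degenerates into serial execution.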

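A rejection handler runs in the thread that called execute(). The source of CallerRunsPolicy, one of the JDK's rejection policies, shows this: unless the pool is shut down, it simply calls r.run(), where r is the rejected task, so the task runs in the caller's thread, which here is main. The permit that main acquired for the rejected task is therefore never returned, and we only need to override the rejection policy to call release() there to keep the concurrency at its initial value.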
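
A deterministic sketch of that fix, assuming one worker thread and a queue of 1, plus an extra gate latch that keeps the worker busy so that rejections are guaranteed. The class SemaphoreRejectDemo and its parameters are illustrative; releaseOnReject switches between a handler that leaks permits and one that returns them.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class SemaphoreRejectDemo {
    // Submits `tasks` jobs to a 1-thread pool with a queue of 1 and returns the permits left at the end.
    // Requires tasks <= permits: with the gate closed nothing is released, so main must never run out.
    static int run(int permits, int tasks, boolean releaseOnReject) throws InterruptedException {
        if (tasks > permits) {
            throw new IllegalArgumentException("tasks must not exceed permits in this sketch");
        }
        Semaphore semaphore = new Semaphore(permits);
        CountDownLatch gate = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 1, TimeUnit.MINUTES,
                new LinkedBlockingQueue<>(1),
                (r, executor) -> {
                    // Runs on the thread that called execute() (main); r is the rejected task
                    if (releaseOnReject) {
                        semaphore.release();   // return the permit main acquired for r
                    }
                });
        for (int i = 0; i < tasks; i++) {
            semaphore.acquire();
            pool.execute(() -> {
                try {
                    gate.await();          // keep the worker busy so later submissions are rejected
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    semaphore.release();
                }
            });
        }
        gate.countDown();                  // let the two accepted tasks finish
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);
        return semaphore.availablePermits();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("without release: " + run(5, 5, false));
        System.out.println("with release:    " + run(5, 5, true));
    }
}
```

With 5 permits and 5 tasks, 2 tasks are accepted and 3 rejected: without the release in the handler 2 permits remain, with it all 5 come back.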

But what if the semaphore is 0? Will execution block?

1.3.2 What if the initial value is 0?

Semaphore semaphore = new Semaphore(0);

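Then the program blocks forever without running anything, because no permit is available: main's first acquire() never returns.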

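The JDK's Semaphore constructor does not validate its argument; it even accepts a negative number (its Javadoc notes that releases must then occur before any acquire is granted).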

That is because, unlike CountDownLatch, a Semaphore can gain permits at any time: release(int) adds any positive number of permits.
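
Both points can be checked with a short sketch; the class SemaphoreInitDemo and its methods are hypothetical. It uses tryAcquire with a timeout, so the zero-permit case returns false instead of hanging the way acquire() would.

```java
import java.util.Arrays;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

class SemaphoreInitDemo {
    // Semaphore(0): no permit is available, so acquire() would block forever;
    // tryAcquire with a timeout gives up instead
    static boolean zeroPermitAcquired() throws InterruptedException {
        Semaphore zero = new Semaphore(0);
        return zero.tryAcquire(100, TimeUnit.MILLISECONDS);
    }

    // A negative initial value is accepted; releases must lift it above 0 before an acquire succeeds.
    // Returns {initial permits, tried before release (1/0), permits after release, tried after release (1/0)}
    static int[] negativeStart() {
        Semaphore s = new Semaphore(-2);
        int initial = s.availablePermits();          // -2
        boolean before = s.tryAcquire();             // false: -2 - 1 < 0
        s.release(3);                                // -2 + 3 = 1
        int afterRelease = s.availablePermits();     // 1
        boolean after = s.tryAcquire();              // true: 1 - 1 = 0
        return new int[]{initial, before ? 1 : 0, afterRelease, after ? 1 : 0};
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("zero permits acquired: " + zeroPermitAcquired());
        System.out.println(Arrays.toString(negativeStart()));
    }
}
```

This prints "zero permits acquired: false" followed by [-2, 0, 1, 1].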

1.3.3 What if the number of rejections reaches the initial permit count?

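Suppose the initial value is greater than 1, for example 10:
Semaphore semaphore = new Semaphore(10);
and the pool rejects tasks 10 or more times. In theory, the semaphore would then reach 0 or go negative,
and the threads would block.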

But can this actually happen?

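I simulated it many times and never saw it block. To provoke more rejections, I set the pool size to 1 and the semaphore to a value greater than 1, here 4.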

import lombok.SneakyThrows;

import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Test {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(20);
        Semaphore semaphore = new Semaphore(4);
        // A single thread and a queue of 1, so rejections are very likely
        ExecutorService executorService = new ThreadPoolExecutor(1,
                1,
                1,
                TimeUnit.MINUTES,
                new LinkedBlockingQueue<>(1), (r, executor) -> {
                    // Custom handler (runs on main): does not throw, and does NOT release the permit
                    Random random = new Random();
                    try {
                        Thread.sleep(random.nextInt(1000));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    if (r instanceof TestRun) {
                        ((TestRun) r).getCountDownLatch().countDown();
                        // ((TestRun) r).getSemaphore().acquire();
                        // ((TestRun) r).getSemaphore().release();
                    }
                    System.out.println(Thread.currentThread() + " reject countdown " + semaphore.availablePermits());
                });
        for (int i = 0; i < 30; i++) {
            // One permit per submitted task
            semaphore.acquire();
            // Thread.sleep(100);
            executorService.execute(new TestRun(countDownLatch, semaphore));
        }
        // countDownLatch.await();
        System.out.println("done");
        executorService.shutdown();
    }

    public static class TestRun implements Runnable {
        private final CountDownLatch countDownLatch;
        private final Semaphore semaphore;

        public TestRun(CountDownLatch countDownLatch, Semaphore semaphore) {
            this.countDownLatch = countDownLatch;
            this.semaphore = semaphore;
        }

        public CountDownLatch getCountDownLatch() {
            return countDownLatch;
        }

        public Semaphore getSemaphore() {
            return semaphore;
        }

        @SneakyThrows
        @Override
        public void run() {
            // semaphore.acquire();
            Random random = new Random();
            Thread.sleep(random.nextInt(1000));
            countDownLatch.countDown();
            semaphore.release();
            System.out.println(Thread.currentThread() + " start" + " semaphore = " + semaphore.availablePermits());
            System.out.println(Thread.currentThread() + " countdown");
        }
    }
}

Output:

Thread[pool-1-thread-1,5,main] start semaphore = 2
Thread[pool-1-thread-1,5,main] countdown
Thread[main,5,main] reject countdown 2
Thread[pool-1-thread-1,5,main] start semaphore = 1
Thread[pool-1-thread-1,5,main] countdown
Thread[main,5,main] reject countdown 1
Thread[pool-1-thread-1,5,main] start semaphore = 1
Thread[pool-1-thread-1,5,main] countdown
Thread[main,5,main] reject countdown 0
Thread[pool-1-thread-1,5,main] start semaphore = 1
Thread[pool-1-thread-1,5,main] countdown
Thread[pool-1-thread-1,5,main] start semaphore = 1
Thread[pool-1-thread-1,5,main] countdown
Thread[pool-1-thread-1,5,main] start semaphore = 1
Thread[pool-1-thread-1,5,main] countdown
Thread[pool-1-thread-1,5,main] start semaphore = 1
Thread[pool-1-thread-1,5,main] countdown
Thread[pool-1-thread-1,5,main] start semaphore = 1
Thread[pool-1-thread-1,5,main] countdown
Thread[pool-1-thread-1,5,main] start semaphore = 1
Thread[pool-1-thread-1,5,main] countdown
Thread[pool-1-thread-1,5,main] start semaphore = 1
Thread[pool-1-thread-1,5,main] countdown
Thread[pool-1-thread-1,5,main] start semaphore = 1
Thread[pool-1-thread-1,5,main] countdown
Thread[pool-1-thread-1,5,main] start semaphore = 1
Thread[pool-1-thread-1,5,main] countdown
Thread[pool-1-thread-1,5,main] start semaphore = 1
Thread[pool-1-thread-1,5,main] countdown
Thread[pool-1-thread-1,5,main] start semaphore = 1
Thread[pool-1-thread-1,5,main] countdown
Thread[pool-1-thread-1,5,main] start semaphore = 1
Thread[pool-1-thread-1,5,main] countdown
Thread[pool-1-thread-1,5,main] start semaphore = 1
Thread[pool-1-thread-1,5,main] countdown
Thread[pool-1-thread-1,5,main] start semaphore = 1
Thread[pool-1-thread-1,5,main] countdown
Thread[pool-1-thread-1,5,main] start semaphore = 1
Thread[pool-1-thread-1,5,main] countdown
Thread[pool-1-thread-1,5,main] start semaphore = 1
Thread[pool-1-thread-1,5,main] countdown
Thread[pool-1-thread-1,5,main] start semaphore = 1
Thread[pool-1-thread-1,5,main] countdown
Thread[pool-1-thread-1,5,main] start semaphore = 1
Thread[pool-1-thread-1,5,main] countdown
Thread[pool-1-thread-1,5,main] start semaphore = 0
Thread[pool-1-thread-1,5,main] countdown
Thread[pool-1-thread-1,5,main] start semaphore = 1
Thread[pool-1-thread-1,5,main] countdown
Thread[pool-1-thread-1,5,main] start semaphore = 1
Thread[pool-1-thread-1,5,main] countdown
Thread[pool-1-thread-1,5,main] start semaphore = 1
Thread[pool-1-thread-1,5,main] countdown
Thread[pool-1-thread-1,5,main] start semaphore = 1
Thread[pool-1-thread-1,5,main] countdown
done
Thread[pool-1-thread-1,5,main] start semaphore = 1
Thread[pool-1-thread-1,5,main] countdown

The semaphore ends at 1. When I changed the initial value to 3, 5, or 2, it still always ended at 1, and the number of rejections was always the initial value minus 1.

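The reason is that a task is only rejected while the pool's queue is full, and every queued task holds a permit that it returns when it runs. So at every rejection at least one permit is still held by a live task, and the leaked permits can never add up to the initial value. availablePermits() can momentarily read 0 (see "reject countdown 0" and "semaphore = 0" above), but some task always releases a permit afterwards, so main's acquire() eventually returns. With a queue of capacity 1, at most (initial value - 1) permits can leak, which matches the final value of 1 observed above.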


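So, with a bounded work queue as in these tests, as long as the semaphore's initial value is greater than 0 you do not need to worry about the program blocking forever. However, when the rejection policy fires and has not been overridden to call semaphore.release(), concurrency drops.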

2. Summary

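1. When relying directly on the thread pool's queue, watch out for a blocking queue whose capacity is Integer.MAX_VALUE: it can exhaust memory.
2. Here, a semaphore is the simplest and quickest option.
3. Whether you use CountDownLatch or a semaphore, handle the case where the thread pool rejects a task:
if the rejection policy prevents countDown() from running, await() waits forever;
if the rejection policy prevents release() from running, permits are lost; the program does not block, but concurrency drops.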