前文了解了线程通讯方法中的CountDownLatch, Condition,ReentrantLock以及CyclicBarrier,接下来咱们持续了解其他的线程间通讯方法。

Phaser

Phaser是JDK1.7中引入的一种功能上和CycliBarrier和CountDownLatch相似的同步东西,相对这两者而言其用法更加灵活,一起Phaser也支撑重用。

在Phaser中将需求协作完结的任务分成多个阶段,每个阶段的参加者可指定,参加者能够随时注册并参加到某个阶段或者撤销参加本阶段。以选修课考试为例,阐明Phaser的作业逻辑,假设现有选修课3门,政治,前史,地舆,各选修人数分别为20,10,10.按Phaser实现考试逻辑如下:

  • 第一阶段考政治,一共应有9名同学参加考试,在考试开端时,8位同学开端答题,别的一位同学未到,考试半途,最后一位同学进入,开端考试,一切同学答题完结后,政治考试完毕
  • 第二阶段考前史,一共9名同学参考考试,在考试完毕前,3名同学弃考,则实践参加考试有6名同学,一切同学答题完结后,前史考试完毕
  • 第三阶段考地舆,一共9名同学参加考试,半途无意外,一切同学答题完结后,地舆考试完毕

至此选修课考试的三个阶段均完结,所以选修课考试这个任务完毕,其中第一阶段中晚到参考考试的同学说的便是参加者能够随时注册并参加到某个阶段,第二阶段中弃考的同学说的便是参加者能够随时撤销参加本阶段,当一切参加本阶段的参加者均撤销,则意味着该阶段完结。

在Phaser中,针对一个阶段而言,每一个参加者都被称为一个party,能够经过构造函数指定参加者数量,也能够经过register使parties(party的总和)自增,当当时阶段的一切参加者等于parties的数量时,此刻phase自增1,进入下一个阶段,回调onAdvance方法

Phaser提供的中心函数如下所示:

函数称号 描述 补白
register() 注册一个party,使得parties+1 /
bulkRegister(int parties) 批量注册party,使得parties变为已有个数与传入参数之和 /
arriveAndDeregister() 当时任务已完结,使parties计数减1,不会构成堵塞 /
arriveAndAwaitAdvance() 已抵达履行点,线程堵塞,等候下一阶段唤醒持续履行 /
awaitAdvance(int phase) 参数是一个已完结的阶段编号,通常以已完结任务的arrive或者arriveAndDeregister函数的返回值作为取值,如果传入参数的阶段编号和当时阶段编号相同,则在此处等候,如果不同或者Phaser已经是terminated状况,则当即返回 /
arrive() 抵达当时阶段,不等候其他参加者抵达 /

arriveAndAwaitAdvance

以上述政治考试为例,学习Phaser根本使用

public static void main(String[] args) {
    // 创立Phaser
    Phaser phaser = new Phaser(){
        @Override
        protected boolean onAdvance(int phase, int registeredParties) {
            switch (phase) {
                case 0:
                    System.out.println("政治考试完结");
                    break;
                case 1:
                    System.out.println("前史考试完结");
                    break;
                case 2:
                    System.out.println("地舆考试完结");
                    break;
            }
            // 如果抵达某一阶段,Phaser中参加者为0,则会毁掉该Phaser
            return super.onAdvance(phase, registeredParties);
        }
    };
    IntStream.range(1,10).forEach(number->{
        phaser.register();
        Thread student= new Thread(()->{
            System.out.println("学生"+number+"arrive advance");
            // 等候其他线程,此刻block
            phaser.arriveAndAwaitAdvance();
            System.out.println("学生"+number+"政治开端答题");
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("学生"+number+"政治交卷");
            // 考试完结,撤销计数,参加者减1
            phaser.arriveAndDeregister();
            System.out.println("Phaser is terminated :" +phaser.isTerminated());
        });
        student.start();
    });
    System.out.println("Phaser is terminated :" +phaser.isTerminated());
}

输出如下:

线程间通信方式(3)

从上面能够看出,Phaser中经过arriveAndAwaitAdvance堵塞当时线程,当一切线程抵达堵塞栅门时,唤醒等候线程持续履行,从而抵达线程间同步协作。

awaitAdvance

有时候,当Phaser 在当时阶段完毕时,咱们需求兜底做一些策略,比如说资源的开释,状况的查看上报等,此刻就需求用到awaitAdvance,awaitAdvance承受一个阶段编号,如果当时阶段编号和传入的持平,则会进入等候状况,等到一切参加者都抵达该阶段栅门时,被唤醒。实例代码如下:

public static class ThreadA implements Runnable {
    private Phaser phaser;
    public ThreadA(Phaser phaser) {
        this.phaser = phaser;
    }
    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + " start ");
        phaser.arriveAndAwaitAdvance();
        System.out.println(Thread.currentThread().getName() + " end " );
    }
}
public static class ThreadB implements Runnable {
    private Phaser phaser;
    public ThreadB(Phaser phaser) {
        this.phaser = phaser;
    }
    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + " start " );
        phaser.arriveAndAwaitAdvance();
        System.out.println(Thread.currentThread().getName() + " end ");
    }
}
public static class ThreadC implements Runnable {
    private Phaser phaser;
    public ThreadC(Phaser phaser) {
        this.phaser = phaser;
    }
    @Override
    public void run() {
            System.out.println(Thread.currentThread().getName() + " start ");
            System.out.println(Thread.currentThread().getName() + " phaser.getPhase()=" + phaser.getPhase());
            phaser.awaitAdvance(0);
            System.out.println(Thread.currentThread().getName() + " end ");
    }
}
public static class ThreadD implements Runnable {
    private Phaser phaser;
    public ThreadD(Phaser phaser) {
        this.phaser = phaser;
    }
    @Override
    public void run() {
        try {
            System.out.println(Thread.currentThread().getName() + " begin sleep");
            Thread.sleep(5000);
            System.out.println(Thread.currentThread().getName() + " sleep completed ");
            phaser.arriveAndAwaitAdvance();
            System.out.println(Thread.currentThread().getName() + " end ");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
public static void main(String[] args) {
    // 声明Phaser
    Phaser phaser = new Phaser(3) {
        @Override
        protected boolean onAdvance(int phase, int registeredParties) {
            System.out.println("Phaser arrived at :"+phase);
            return super.onAdvance(phase, registeredParties);
        }
    };
    Thread t1 = new Thread(new ThreadA(phaser));
    Thread t2 = new Thread(new ThreadB(phaser));
    Thread t3 = new Thread(new ThreadC(phaser));
    Thread t4 = new Thread(new ThreadD(phaser));
    t1.setName("ThreadA");
    t2.setName("ThreadB");
    t3.setName("ThreadC");
    t4.setName("ThreadD");
    t1.start();
    t2.start();
    t3.start();
    t4.start();
}

如上代码所示,声明Phaser有三个参加者ThreadA,ThreadB,ThreadD,在三个参加者都履行到arriveAndAwaitAdvance之前,ThreadC 堵塞等候,当三个参加者都履行到arriveAndAwaitAdvance后,回调onAdvance方法,此刻被堵塞的参加者被唤醒履行,之后ThreadC被唤醒持续履行,运转结果如下:

线程间通信方式(3)

Exchanger

Exchanger用于两个线程之间的通讯,无论哪个线程先调用Exchanger,都会等候别的一个线程调用时进行数据交换,示例代码如下:

private static Exchanger<String> exchanger = new Exchanger<>();
public static void main(String[] args) {
    new Thread(()->{
        try {
            System.out.println(Thread.currentThread().getName()+" sleep start");
            Thread.sleep(10000);
            System.out.println(Thread.currentThread().getName()+" sleep end");
            System.out.println(Thread.currentThread().getName()+" send data to Exchanger");
            String aa = exchanger.exchange("data from Thread1");
            System.out.println(Thread.currentThread().getName() + "   "+aa);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }, "Thread1").start();
    new Thread(()->{
        try {
            System.out.println(Thread.currentThread().getName()+" send data to Exchanger");
            String bb = exchanger.exchange("data from Thread2");
            System.out.println(Thread.currentThread().getName() + "   "+bb);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }, "Thread2").start();
}

运转输出如下:

线程间通信方式(3)

总结

结合前文,咱们一共学习了种线程间通讯方法,主要有:

  1. Object.wait/Object.notify/Object.notifyAll + synchronized
  2. Semaphore(信号量)
  3. CountDownLatch
  4. CyclicBarrier
  5. Condition+ReentrantLock
  6. Phaser
  7. Exchanger

我们日常开发中可灵活使用,针对各通讯方法比较见下表:

通讯方法 使用场景 是否可重用 子任务反常处理 补白
Object.wait/Object.notify/Object.notifyAll + synchronized 大多数线程通讯场景 依靠开发者保护,在finally块中完结开释,防止死锁 /
Semaphore(信号量) 通知唤醒类线程间通讯场景 依靠开发者保护,在finally块中开释信号量,防止死锁 /
CountDownLatch 串行多线程运转场景 不加处理的话,子任务产生反常导致退出,则一切等候的线程都会一致等候,直到超时时刻来临 /
CyclicBarrier 聚合类线程通讯场景 不加处理的话,如果在一切线程都抵达屏障堕入堵塞前,如果有线程产生反常导致未抵达栅门提前退出,则一切等候在栅门都会以BrokenBarrierException或InterruptedException反常退出 /
Condition+ReentrantLock 大多数线程通讯场景 依靠开发者保护,在finally块中完结开释,防止死锁 /
Phaser 适用CountDownLatch与CyclicBarrier组合场景 依靠开发者保护,在finally块中撤销参加者,防止死锁 /
Exchanger 线程间数据交换场景 依靠开发者保护,保证两个线程状况正常,并行运转 /