小木箱成长营并发编程系列教程:

并发编程 根底篇(上) android线程那些事

并发编程 根底篇(中) 三大剖析法剖析Handler

并发编程 进步篇(上) Java并发关键字那些事

并发编程 进步篇(下) Java锁安全性那些事

并发编程 高档篇(上) Java内存模型那些事

并发编程 高档篇(下) Java并发BATJ面试之谈

并发编程 实战篇 android下载器完结

Tips: 重视微信大众号小木箱成长营,回复 “并发编程” 可获得并发编程免费思维导图

一、序文

Hello,我是小木箱,欢迎来到小木箱成长营并发编程系列教程,今日将共享并发编程 根底篇(下) 三大剖析法剖析线程池

三大剖析法剖析android线程池首要分为四部分,榜首部分是4W2H剖析线程池,第二部分是MECE剖析线程池,第三部分是SCQA剖析线程池,终究一部分是结语。

其间,4W2H剖析线程池首要围绕线程池提出了6个高价值问题。

其间,MECE剖析线程池首要分为线程池根本操作、线程池生命周期、线程池作业原理、线程池代码事例剖析、线程池的功用优化、线程池留意事项、线程池线程数量确认和线程池事务防劣化8部分。

终究,以SCQA的办法在B站上投放一些来自BATJD等大厂的高频面试题。

并发编程  基础篇(下)  三大分析法分析线程池

假如彻底掌握小木箱成长营并发编程系列教程,那么任何人都能经过高并发相关的技能面试。

二、4W2H剖析线程池

2.1 What: 线程池详细界说

并发编程  基础篇(下)  三大分析法分析线程池

线程池是一种办理和调度线程的机制,线程池能够操控线程的数量,确保线程有效地作业,而且能够重复运用以前创立的线程,然后削减体系的开支。

2.2 Why: 线程池运用原因

并发编程  基础篇(下)  三大分析法分析线程池

假如不运用线程池,每个使命都新开一个线程处理 for循环创立线程,开支太大,咱们期望有固定数量的线程,来履行这1000个线程,就防止了重复创立并毁掉线程所带来的开支问题

2.3 How: 线程池运用办法

2.3.1 线程池API文档

API 简介
isShutdown 判别其时线程的状况是否是SHUTDOWN,即是否调用了shutdown办法
isTerminating 其时线程池的状况是否小于TERMINATED,而且大于等于SHUTDOWN,即其时线程是否现已shutdown而且正在中止。
isTerminated 线程池是否中止完结
awaitTermination 等候直到线程状况变为TERMINATED
finalize 从头Object的办法,当线程池目标被收回的时分调用,在这儿调用shutdown办法中止线程,防止呈现内存走漏
prestartCoreThread 预先发动一个中心线程
prestartAllCoreThreads 预先发动一切的中心线程
remove 从使命行列中移除指定使命
purge 从行列中移除一切的以及撤销的Future使命
getPoolSize 获取线程池中线程的数量,即Worker的数量
getActiveCount 获取线程池中正在履行使命的线程Worker数量
getLargestPoolSize 获取线程池曾经敞开过的最大的线程数量
getTaskCount 获取总的使命数量,该值为每个Worker以及完结的使命数量,以及正在履行的使命数量和行列中的使命数量
getCompletedTaskCount 获取Worker以及履行的使命数量
beforeExecute 使命履行前调用
afterExecute 使命履行后调用
terminated 线程中止时调用,用来收回资源

2.3.2 线程池根底结构

线程池的根底结构分为三部分: 堵塞行列BlockingQueue、中心参数和Worker作业线程。

2.3.2.1 堵塞行列

线程池ThreadLocal是一个堵塞行列BlockingQueue

private final BlockingQueue<Runnable> workQueue;

堵塞行列BlockingQueue首要是用来供给使命缓冲区的功用,作业线程从堵塞行列BlockingQueue中取出使命来履行。

线程池中存放使命用的是offer办法,取出使命用的是poll办法。 堵塞行列BlockingQueue有三种通用战略

直接提交

直接提交,当一个线程提交一个使命的时分,假如没有一个对应的线程来取使命,提交堵塞或许失利。相同的当一个线程取使命的时分,假如没有一个对应的线程来提交使命,取堵塞或许失利。

SynchronousQueue便是这种行列的完结,这种行列一般要求maximumPoolSizes最大线程数量是无界的,防止提交的使命由于offer失利而被回绝履行。

当提交使命的速率大于使命履行的速率的时分,这种行列会导致线程数量无限的添加。

无界行列

无界行列,无界行列完结的例子是LinkedBlockingQueue,当中心线程都处于繁忙的状况的时分, 提交的使命都会添加到无界行列中,不会有超越中心线程数corePoolSize的线程被创立。

这种行列或许适用于使命之间都是独立的,使命的履行都是互不影响的。

例如,在一个web服务器中,这种行列能够用来使短时刻很多的并发恳求变得愈加平滑,当提交使命的速率大于使命履行的速率的时分,这种行列会导致行列空间无限添加。

有界行列

有界行列,有界行列完结的例子是ArrayBlockingQueue,运用该行列配合设置有限的maximumPoolSizes能够防止资源耗尽,这种状况下协谐和操控资源和吞吐量是比较困难的。

行列巨细和maximumPoolSize的设置是比较矛盾的:运用容量大的行列和较少的线程资源会削减CPU、OS资源、线程上下文切换等的消耗,可是会下降体系吞吐量。

假如使命频频的堵塞,例如使命是IO密布的类型,这种状况下体系能够调度更多的线程。运用小容量行列,就要要求maximumPoolSize大一些,然后让CPU保持繁忙的状况,可是或许呈现线程上下文切换频频、线程数量过多调度困难现已创立线程OOM导致资源耗尽的状况,然后下降吞吐量。

SynchronousQueue vs LinkedBlockingQueue vs ArrayBlockingQueue

SynchronousQueue

SynchronousQueue适用于恳求呼应要求无推迟,恳求 并发 量较少的场景

当线程Worker没有从行列取使命的时分,offer回来false,直接敞开线程。当Worker从行列取使命的时分,该使命能够直接提交给Worker履行。

因而,这种线程池不会呈现等候的状况,呼应速度很快。

行列的缺陷是提交使命速度大于使命履行速度时,会导致线程无限添加,因而,运用场景需求是并发量较少的状况。

例如,在OkHttp结构中履行HTTP恳求便是用的这种行列构建的线程池。

LinkedBlockingQueue

LinkedBlockingQueue适用于并发量高,使命之间都是独立的,互不影响的场景。

比方在web服务器中,面临瞬时很多恳求的涌入,能够愈加平滑的处理,然后起到削峰的效果,而且防止线程资源的耗尽。

ArrayBlockingQueue

ArrayBlockingQueue是介于前两者之间的行列,能够协调体系资源和吞吐量之间的平衡。

2.3.2.2 中心参数

一个Worker代表一个作业线程,wrokers存储的是线程池中一切的作业线程。

作业线程的中心参数有如下

private final HashSet<Worker> workers = new HashSet<Worker>();
// ---------------------------------------------------------------
// 曾经敞开过的最大的线程数量
private int largestPoolSize;
// 完结的使命的数量
private long completedTaskCount;
// ---------------------------------------------------------------
private volatile ThreadFactory threadFactory;
private volatile RejectedExecutionHandler handler;
private volatile long keepAliveTime;
private volatile boolean allowCoreThreadTimeOut;
private volatile int corePoolSize;
private volatile int maximumPoolSize;

这几个变量都是用户设置的参数变量,为了确保参数设置的可见性,一切参数都运用volatile润饰。 ThreadFactory是线程创立工厂,供给线程创立和装备的接口,这儿运用的是工厂办法办法,默许的完结是DefaultThreadFactory。

static class DefaultThreadFactory implements ThreadFactory {
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    private final ThreadGroup group;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;
    DefaultThreadFactory() {
        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() :
                              Thread.currentThread().getThreadGroup();
        namePrefix = "pool-" +
                      poolNumber.getAndIncrement() +
                     "-thread-";
    }
    //留意,Runnable r 便是作业线程接口Worker,需求传到Thread中。
    public Thread newThread(Runnable r) {
        Thread t = new Thread(group,r,
                               namePrefix + threadNumber.getAndIncrement(),
                              0);
        if (t.isDaemon())
            t.setDaemon(false);
        if (t.getPriority() != Thread.NORM_PRIORITY)
            t.setPriority(Thread.NORM_PRIORITY);
        return t;
    }
}

默许的线程工厂创立的线程称号为pool-poolNumber-thread-threadNumber,榜首个线程池榜首个线程称号为pool-0-thread-0,第二个线程称号为pool-0-thread-1,第二个线程池榜首个线程称号为pool-1-thread-0,第二个线程称号为pool-1-thread-1,顺次类推。

RejectedExecutionHandler

是当使命 被回绝时的履行接口,供给了4种完结战略,默许采取AbortPolicy战略,假如不设置,线程池在回绝使命的时分会抛出反常。

CallerRunsPolicy

在其时提交线程直接运转该使命。

AbortPolicy

直接抛出RejectedExecutionException反常。

DiscardPolicy

丢掉该使命,什么都不做。

DiscardOldestPolicy

从行列中移除头节点使命,然后再次提交使命到线程池。

keepAliveTime

搁置线程等候使命的超时时刻,线程运用时刻当线程数量超越corePoolSize的时分或许设置了答应中心线程超时的时分,不然线程会一向等候直到有新的使命。

allowCoreThreadTimeOut

答应中心线程超时,默许是false,假如设置为true,则中心线程会运用超时时刻来等候使命。

corePoolSize

中心线程数量,默许状况下中心线程会一向等候直到有新的使命,假如设置了答应中心线程超时,则最小线程数为0。

maximumPoolSize

能够敞开的最大的线程数量。

2.3.2.3 作业线程

2.3.2.1.1 Worker界说

Worker代表一个作业线程,该类完结了Runnable接口,承继自AQS,内部完结了一套简略的锁机制。这儿运用的是署理办法,Worker完结了Runnable,然后轮训使命行列,取出使命履行。详细代码如下:

private final class Worker extends AbstractQueuedSynchronizer implements Runnable
{
    /** Thread this worker is running in.  Null if factory fails. */
    //代表作业线程
    final Thread thread;
    /** Initial task to run.  Possibly null. */
    //该线程履行的榜首个使命
    Runnable firstTask;
    /** Per-thread task counter */
    //该线程现已履行的使命数量
    volatile long completedTasks;
    /**
     * Creates with given first task and thread from ThreadFactory.
     * @param firstTask the first task (null if none)
     */
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        //留意 将其时署理Runnable传递到ThreadFactory作为线程的履行载体。
        this.thread = getThreadFactory().newThread(this);
    }
    /** Delegates main run loop to outer runWorker  */
    //该Runnable作为署理,敞开轮训,从行列中取出提交的Runnable来履行。
    public void run() {
        runWorker(this);
    }
    // Lock methods
    // The value 0 represents the unlocked state.
    // The value 1 represents the locked state.
    //简略的互斥锁,把0批改为1代表获取到锁,把1批改为0代表开释锁。
    protected boolean isHeldExclusively() {
        return getState() != 0;
    }
    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(01)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }
    public void lock()        { acquire(1); }
    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }
}
2.3.2.1.2 Worker创立

那么该怎么创立一个作业线程呢?

创立作业线程首要分为四步:

  1. 判别其时线程的数量是否超越了corePoolSize或许maximumPoolSize,假如超越回来false,假如没有,经过cas添加其时线程的数量。
  2. 创立Worker,在其结构办法中会经过ThreadFactory创立线程,然后将Worker添加到调集中。
  3. 假如Worker创立成功,调用Thread的start办法发动线程,敞开使命轮训。
  4. 假如线程发动失利,处理Worker创立失利的状况,将Worker移除,防止内存走漏,然后测验中止线程池。
private boolean addWorker(Runnable firstTask,boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        //
        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;
        for (;;) {
            //获取其时线程的数量,假如是中心线程办法,线程数量不能大于corePoolSize
            //假如对错中心线程办法,线程数量不能大于maximumPoolSize 不然回来false
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            //运用cas 更新线程Worker的数量,更新成功退出循环
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        //创立Worker,Worker结构函数中会经过ThreadFactory创立Thread
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            //加锁,并发安全操控
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    //线程现已发动 抛出反常
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    //添加到 HashSet<Worker>调集中
                    workers.add(w);
                    //更新其时线程数量 给largestPoolSize赋值
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                //发动线程 敞开使命轮训
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        //假如线程发动失利 会将刚刚添加的Worker移除
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}
//Worker添加失利的处理
 private void addWorkerFailed(Worker w) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            //移除Worker 防止内存走漏
            if (w != null)
                workers.remove(w);
            //削减Worker的数量
            decrementWorkerCount();
            //测验中止线程池
            tryTerminate();
        } finally {
            mainLock.unlock();
        }
    }
2.3.2.1.3 Worker线程原理

Worker线程原理分为七步:

  1. 敞开循环,先履行firstTask,然后调用getTask()从行列中取出使命履行。
  2. 使命履行前会先判别线程池的状况,当线程池的状况是STOP的时分,中止使命线程。
  3. 调用beforeExecute办法。
  4. 调用使命的run办法履行使命。
  5. 调用afterExecute办法。
  6. 使命履行完结后,累加其时Worker履行的使命数量到Wroker的completedTasks变量中。
  7. 循环结束后,线程履行结束,处理后续状况。
 //Worker的run办法中调用runWorker办法敞开轮训
public void run() {
            runWorker(this);       
 }
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        //首要履行firstTask 然后在经过getTask()办法从行列中取出使命履行。
        while (task != null || (task = getTask()) != null) {
            //履行使命前先加锁 能够经过tryLock办法 判别其时Worker线程是否在履行使命
            //假如在履行使命,tryLock回来false,不然,回来true
            w.lock();
            // If pool is stopping,ensure thread is interrupted;
            // if not,ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            //假如线程池是处于STOP的状况,即调用了shutDownNow办法,确保线程是中止的
            if ((runStateAtLeast(ctl.get(),STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(),STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                //hook办法,使命履行前调用beforeExecute,使命履行后调用afterExecute
                //能够经过这两个办法来监控线程池中使命的履行状况
                beforeExecute(wt,task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task,thrown);
                }
            } finally {
                task = null;
                //累加其时Worker现已完结的使命数量
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        //Worker退出时的后续处理
        processWorkerExit(w,completedAbruptly);
    }
}
2.3.2.1.3 Task使命的获取

使命获取首要分为两步:

  1. 线程池并没有差异中心线程和非中心线程,只是确保中心线程的数量。 当线程数量大于中心线程数量,或许设置了中心线程可超时,则经过超时polll办法获取使命,不然经过无限堵塞take办法获取使命。,线程数量等于中心线程数量时,剩下的线程会一向堵塞直到有使命履行,线程数量大于中心线程数量是,非中心线程会在超时时刻之后退出。刚开端创立的中心线程或许会退出,后来创立的非中心线程或许会一向存活到终究。
  2. 当线程池的状况是STOP或许线程池的状况是SHUTDOWN而且行列是空的时分,会回来null,Wroker线程结束履行,削减Worker的数量。
private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        //回来null的状况有两种
        //1.线程池的状况变为STOP。
        //2.线程池的状况是SHUTDOWN,其时行列是空的。
        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
        int wc = workerCountOf(c);
        // Are workers subject to culling?
        //线程池并没有差异中心线程和非中心线程,只是依据其时的线程数量来运用不同的获取使命的办法
        //1.线程数量大于corePoolSize 或许设置了中心线程超时,则运用超时poll办法获取使命
        //2.线程数量等于corePoolSize而且没有设置中心线程超时,运用take办法获取使命
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            //超时,其时的Worker行将退出循环,因而,批改Worker的数量,然后回来null。
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS) :
                workQueue.take();
            //假如不是空的回来,假如是空的,阐明现已超时,设置timeOut为true
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}
2.3.2.1.4 Worker线程退出
  1. 正常退出状况下,在getTask办法中现已调整了线程数量,可是反常退出状况,来不及调整,在这儿需求从头调整线程数量。
  2. 移除Worker,计算总使命数量。
  3. 测验中止线程池,调用tryTerminate()办法。
  4. 假如其时线程状况小于STOP,即RUNNING和SHUTDOWN状况,需求补齐线程数量。假如线程反常退出,直接调用addWorker办法补齐线程;假如线程正常退出,判别其时线程数量是否小于线程池最小线程数量,假如小于,直接补齐,不然,直接回来。正常退出或许是超越中心线程数量的线程获取 使命超时了,这种状况是不需求补齐的。假如最小线程数量为0,可是行列中还有使命,线程池的状况不是STOP,是需求补齐的。
private void processWorkerExit(Worker w,boolean completedAbruptly) {
    //completedAbruptly 代表线程是正常退出 还是反常退出
    //假如线程是正常退出,在getTask办法中现已调整了workerCount
    //假如线程反常退出,需求在这儿调整workerCount
    if (completedAbruptly) // If abrupt,then workerCount wasn't adjusted
        decrementWorkerCount();
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        //把该Worker履行的使命数量累加到总使命数量变量中 然后从调集中移除Worker
        completedTaskCount += w.completedTasks;
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }
    //测验中止线程池,只要终究一个Worker线程履行完,才会中止线程池
    tryTerminate();
    //获取线程状况,假如线程池的状况小于STOP 即RUNNING和SHUTDOWN状况,
    //而且线程是正常退出,计算其时应该存活的最小线程数量,假如min为0,可是行列不是空的,
    //则线程池还需求线程来履行使命,批改min为1
    //假如其时线程数量大于min,则直接回来,不需求补齐线程空缺
    //假如其时线程数量小于min,则调用addWorker补齐线程空缺。
    int c = ctl.get();
    if (runStateLessThan(c,STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null,false);
    }
}
 //测验中止线程池
 final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
           //1.其时线程状况为RUNNING,直接回来,此时未调用shutDown或许shutDownNow办法,不需求中止。
           //2.其时线程状况大于TIDYING,阐明其他Worker现已开端履行terminated()办法,为了确保该法仅            //  履行1次,直接回来。
           //3.其时线程状况为SHUTDOWN而且行列不是空的,直接回来,需求等候行列使命履行完再中止。
            if (isRunning(c) ||
                runStateAtLeast(c,TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;
            //假如worker数量不为0,测验中止其时搁置的线程,即在poll和take中等候的线程,然后让一切线程
            //都退出使命轮训,加速线程池收回进程。
            if (workerCountOf(c) != 0) { // Eligible to terminate
                interruptIdleWorkers(ONLY_ONE);
                return;
            }
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                //更新线程池的状况为TIDYING,调用terminated(),这是一个hook办法,
                //能够在这儿面做一些资源收回的操作,履行完后,设置线程池状况为TERMINATED
                //唤醒在awaitTermination办法上等候的线程。
                if (ctl.compareAndSet(c,ctlOf(TIDYING,0))) {
                    try {
                        terminated();
                    } finally {
                        ctl.set(ctlOf(TERMINATED,0));
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }
//测验中止等候在取出使命的线程,假如onlyOnw为true,只会中止一个。
private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }
2.3.2.1.5 Worker线程中止
  1. 线程池的状况为RUNNING,直接回来,不需求中止。
  2. 线程状况为SHUTDOWN而且行列不是空的,直接回来,需求等候行列使命履行完再中止。
  3. 其时线程状况大于TIDYING,阐明其他Worker现已开端履行terminated()办法,为了确保该办法仅履行一次,直接回来。
  4. 假如worker数量不为0,测验中止其时搁置的线程,即在poll和take中等候的线程,然后让一切线程都退出使命轮训,加速线程池收回进程。
  5. 更新线程池的状况为TIDYING,调用terminated(),这是一个hook办法,能够在这儿面做一些资源收回的操作,履行完后,设置线程池状况为TERMINATED,唤醒在awaitTermination办法上等候的线程。

2.3.3 创立和中止线程池

并发编程  基础篇(下)  三大分析法分析线程池

ExecutorService executorService=Executors.newFixedThreadPool(5);
       // 1.创立线程池
        for(int i=0;i< 10;i++){
           executorService.execute(new Runnable(){
          @Override
          public void run(){
             System.out.println(Thread.currentThread().getName()+"办理事务");     
}
        });
    }
// 2.封闭线程池
        executorService.shutdown();
    // executorService.shutdownNow();
       }
    }

2.3.4 手动创立 vs 自动创立

  • 一般状况下,应该手动创立线程池,由于手动创立能够更好地操控线程池的巨细,以及线程池中线程的生命周期。

  • 自动创立的线程池巨细或许不够,导致线程饥饿,或许线程池中线程的生命周期或许太长,导致体系资源糟蹋。

2.3.5 线程数量装备

一般来说,线程数量的设定要取决于使命的杂乱度和计算机的功用。

假如使命比较杂乱,那么线程数量能够设定的比较多,能够进步程序的并行处理才能,然后进步功率。

可是,假如线程数量设定的太多,或许会导致体系资源运用率过高,然后下降体系的功率。

因而,线程数量的设定应依据使命的杂乱度和计算机的功用来合理设定。

2.3.6 中止线程池

并发编程  基础篇(下)  三大分析法分析线程池

运用shutdown()办法来中止线程池,shutdown()办法会等候线程池中正在履行的使命完结,然后才会中止线程池。

假如需求当即中止线程池,能够运用shutdownNow()办法,shutdownNow()办法会测验中止正在履行的使命,而且回绝接收新的使命。

2.3.7 线程池状况

并发编程  基础篇(下)  三大分析法分析线程池

System.out.println(executorService.isShutdown());
System.out.println(executorService.isTerminated());
//封闭线程池
executorService.shutdown();
System.out.println(executorService.isShutdown());
System.out.println(executorService.isTerminated());

输出成果:

false

false

true

true

线程池的状况有五种,分别是RUNNING、SHUTDOWN、STOP、TIDYING和TERMINATED。

线程池把线程数量和线程状况打包到了一个int变量中,然后运用AtomicInteger原子类来批改和获取该值。一方面能够节约内存,一方面能够削减一个原子性操作,供给功用,究竟原子型操作是不可中止的,阻碍体系的切换上下文。 线程池的5个状况值存储到高3位中,线程数量存储到低29位中。runStateOf办法是用来获取高3位的线程池状况值的,workerCountOf是用来获取低29位的线程池中的线程数量的,ctlOf是把两个值经过或运算打包到一个值中。


private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING,0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;
// Packing and unpacking ctl
private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs,int wc) { return rs | wc; }

RUNNING是运转状况,承受新使命,处理行列中的使命。

SHUTDOWN是封闭状况, 不承受新使命,可是处理行列中的使命。

STOP是中止状况,不承受新使命,不处理行列中的使命,中止正在履行使命的线程。

TIDYING是收拾状况, 一切的使命现已被中止,一切的线程现已履行完变为TERMINATED状况,workerCount为0,线程池之后会调用terminated()扩展hook办法,终究变为TERMINATED状况。

TERMINATED是中止状况,terminated()办法履行完结,在该办法能够做一些资源收回的作业,此时的线程池行列清空,线程终结,资源收回结束。

线程池的状况是不可逆的,一旦进入TERMINATED 状况,便无法重置,必须从头创立一个新的线程池才能提交使命,和线程的运用是相同的。

2.3.8 线程池状况切换

那么,线程池怎么进行状况切换呢?

RUNNING / SHUTDOWN

调用shutdown()办法,或许隐式的在finalize()办法中调用, 线程池的状况变为RUNNING或SHUTDOWN

RUNNING / SHUTDOWN -> STOP

调用shutdownNow(), 线程池的状况由SHUTDOWN变成STOP

SHUTDOWN -> TIDYING

行列是空的,线程数量是0,使命都履行结束, 线程池的状况由SHUTDOWN变成TIDYING

STOP -> TIDYING

线程数量为0, 线程池的状况由STOP变成TIDYING

TIDYING -> TERMINATED

terminated() 履行完结时分, 线程池的状况由TIDYING变成TERMINATED

TERMINATED

awaitTermination() 调用该办法会一向堵塞直到线程池的状况变成TERMINATED

2.4 When: 线程池运用机遇

并发编程  基础篇(下)  三大分析法分析线程池

  1. 短时刻使命:假如需求在运用程序中履行多个短期使命,那么运用线程池能够进步功率并下降资源消耗。
  2. 多用户恳求:假如运用程序需求处理多个用户恳求,而每个恳求需求履行耗时的操作,那么运用线程池能够让运用程序更好地呼运用户恳求。
  3. 并发拜访:假如多个线程需求拜访共享资源,例如数据库或文件体系,那么运用线程池能够防止线程之间的竞争条件,并进步运用程序的吞吐量。
  4. 异步使命:假如运用程序需求履行异步使命,例如下载文件或处理很多数据,那么运用线程池能够让运用程序愈加高效地履行这些使命,而且防止堵塞主线程。

2.5 How Much: 线程池事务价值

并发编程  基础篇(下)  三大分析法分析线程池

由于线程存在两个弊端.榜首个是重复创立线程开支大,第二个是过多的线程会占用太多内存

处理过多的线程会占用太多内存的思路是让这部分线程都保持作业,且能够重复履行使命防止生命周期的损耗

处理过多的线程会占用太多内存的思路是用少数的线程防止内存占用过多

而线程池刚好契合了上述两个优势,而且线程池有以下三个事务价值:

榜首个是复用线程,削减线程频频创立和毁掉带来的体系开支。加速呼应速度;

第二个是合理运用CPU和内存;

第三个是一致的线程办理,防止呈现随意敞开线程导致线程数量过多然后引发OOM。

2.6 Where: 线程池运用场景

并发编程  基础篇(下)  三大分析法分析线程池

在Android开发中,线程池常常被用于以下场景:

  1. 处理网络恳求:Android运用一般需求与服务器进行数据交互,网络恳求一般是一个异步操作,运用线程池能够防止网络恳求堵塞UI线程,保持运用的呼应性。
  2. 处理耗时操作:例如对大文件的读写、仿制、压缩等操作,操作会堵塞UI线程,导致运用卡顿,运用线程池能够将操作放到作业线程中履行。
  3. 并行履行多个使命:当需求一起履行多个使命时,例如下载多个文件,运用线程池能够使使命并行履行,进步功率。
  4. 处理守时使命:当需求履行守时使命时,例如轮询服务器,守时更新UI等,运用线程池能够在守时使命完结后将成果回来到UI线程中。
  5. 处理很多使命行列:例如运用RecyclerView展现很多数据,需求异步加载图片等操作,运用线程池能够办理使命行列,优化体系资源的运用。

综上所述,线程池在Android开发中的运用场景首要是处理网络恳求、处理耗时操作、并行履行多个使命、处理守时使命以及处理很多使命行列等,能够进步运用的功用和呼应速度,一起防止UI线程堵塞和ANR问题。

三、MECE剖析线程池

3.1 线程池的根本操作

并发编程  基础篇(下)  三大分析法分析线程池

线程池的根本操作包含:

3.1.1 创立线程池

运用ThreadPoolExecutor类的结构函数创立线程池。ThreadPoolExecutor是创立线程池的东西类,封装了几种常用线程池的创立办法,常用办法有如下几种:

固定线程线程池(FixedThreadPool)

固定线程线程池界说

线程池中的线程数量固定不变,当有使命提交时,假如线程池中有闲暇线程,则当即运用闲暇线程履行使命;假如没有,则等候有线程闲暇停止。

固定线程线程池原理剖析

创立线程数量固定的线程池,中心线程数corePoolSize和最大线程数maximumPoolSize相同,中心线程不超时,行列是LinkedBlockingQueue,行列巨细没有约束。

固定线程线程池运用场景

合适数据数量固定的数据处理场景,例如百度网盘中的批量文件下载功用,指定五个线程一起下载文件,其他使命都在行列排队。

ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);
固定线程线程池项目运用

相片导出功用,把10000个人员相片加密后导出到U盘里,运用线程池数量为5的固定线程池履行使命,还有一个使命分配线程,首要负责查询数据库,监控线程池和提交使命。

缓存线程池(CachedThreadPool)

缓存线程池界说

线程池中的线程数量能够依据使命的多少自动调整,假如有很多使命提交,则线程池会动态添加线程数量;假如没有使命提交,则线程池会动态削减线程数量。

ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
缓存线程池原理剖析

创立缓存线程池,中心线程数corePoolSize为0,最大线程数maximumPoolSize是Integer.MAX_VALUE,线程超时时刻是60s,行列是SynchronousQueue同步行列。

缓存线程池运用场景

合适对呼应速度要求高,并发少的场景,Okhttp便是用的缓存线程池来处理http恳求的,符合手机上并发恳求少,呼应速度快的要求。

缓存线程池项目运用

运用Okhttp的进程间接运用来了缓存线程池,项目中应该慎重运用该线程池。

守时器线程池(ScheduledThreadPool)

能够在固定的时刻距离或许指定的时刻履行使命,该线程池能够设置固定的线程数量或许可变的线程数量。

ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);

单线程池(SingleThreadExecutor)

单线程线程池界说

线程池中只要一个线程,一切使命都在同一个线程中依照行列次序顺次履行。

单线程线程池原理剖析

创立单线程线程池,中心线程数corePoolSize和最大线程数maximumPoolSize都是1,中心线程不超时,行列是LinkedBlockingQueue,行列巨细没有约束。

单线程线程池运用场景

合适使命并发少,触发频频,使命履行时刻固定的事务场景。

ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
单线程线程池项目运用

人脸辨认成功保存记载是经过一个单线程线程池保存记载的,人脸辨认的距离时刻大于记载保存时刻,因而正常状况下使命不会呈现堵塞在行列的状况。

ForkJoin线程池

该线程池是Java 7引入的一种专门用于处理分治算法的线程池,能够递归地将使命拆分成小使命,并将小使命分配给线程池中的线程履行,然后将小使命的成果合并起来,终究得到大使命的成果。

ForkJoinPool forkJoinPool = new ForkJoinPool();

3.1.2 提交使命

提交使命办法

运用execute、submit办法提交使命到线程池的使命行列中。

fixedThreadPool.execute(new Runnable() {
    public void run() {
        // 履行使命
    }
});
cachedThreadPool.execute(new Runnable() {
    public void run() {
        // 履行使命
    }
});
scheduledThreadPool.schedule(new Runnable() {
    public void run() {
        // 履行使命
    }
}, 5TimeUnit.SECONDS);
singleThreadExecutor.execute(new Runnable() {
    public void run() {
        // 履行使命
    }
});
forkJoinPool.invoke(new RecursiveTask() {
    public Object compute() {
        // 履行使命
    }
});

线程池的 execute() 办法和 submit() 办法都用于向线程池提交使命,可是它们有以下几个差异:

  1. 回来值不同:execute() 办法没有回来值,而 submit() 办法回来一个 Future 目标。
  2. 反常处理不同:execute() 办法没有办法处理使命履行时抛出的反常,而 submit() 办法能够运用回来的 Future 目标处理使命履行时抛出的反常。
  3. 使命类型不同:execute() 办法只能提交 Runnable 类型的使命,而 submit() 办法能够提交 RunnableCallable 类型的使命。
  4. 办法重载:execute() 办法只要一种重载办法,而 submit() 办法有多种重载办法,能够指定回来成果、推迟履行等参数。

因而,当需求获取使命履行成果或许处理使命履行时或许会抛出的反常时,应该运用 submit() 办法;

当不需求获取使命履行成果或许不需求处理使命履行时或许会抛出的反常时,能够运用 execute() 办法。 execute() 办法源码如下:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
     /*
      * 1.假如其时线程数量小于中心线程数,敞开一个新线程,然后把使命作为该线程首个使命来履行
      * 2.假如其时线程数量等于中心线程数,测验添加使命到堵塞行列BlockingQueue
      * 3.假如添加行列失利,即行列已满,敞开一个新线程,然后把使命作为该线程首个使命来履行
      * 4.假如第3步敞开线程失利,即线程数量超越最大线程数,调用RejectedExecutionHandler的       *       *   rejectedExecution办法履行回绝战略。
      */
    //获取线程池状况和线程数量的组合值,这两个值被打包到了一个int中
    int c = ctl.get();
    //workerCountOf 获取Worker的数量,即线程数量。
    //isRunning  获取线程池的状况,判别线程池是否是运转状况
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command,true))
            return;
        //从头获取,多线程环境,该值或许现已发生变化
        c = ctl.get();
    }
    //测验添加到行列
    if (isRunning(c) && workQueue.offer(command)) {
        //从头查看状况值 假如线程现已shutdown 则回绝添加使命
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null,false);
    }
    //以非中心线程添加办法创立线程,假如失利,走回绝战略
    else if (!addWorker(command,false))
        reject(command);
}

3.1.3 封闭线程池

  1. shutdown

运用shutdown办法封闭线程池,等候使命行列中的使命履行结束后再封闭。

调用shutdown办法后的状况转化:RUNNING–>SHUTDOWN–>TIDYING–>TERMINATED

  1. shutdownNow

运用shutdownNow办法当即封闭线程池,会中止正在履行的使命,并回来未履行的使命列表。

3.1.4 动态批改线程池巨细

运用setCorePoolSize和setMaximumPoolSize办法动态批改线程池中的线程数量。

调用shutdownNow办法后的状况转化:RUNNING–>STOP–>TIDYING–>TERMINATED

3.1.5 回绝战略

当使命行列已满而且线程池中的线程数量已到达最大值时,运用setRejectedExecutionHandler办法设置回绝战略来处理无法处理的使命。

3.1.6 监控线程池

运用getActiveCount、getCompletedTaskCount、getTaskCount等办法获取线程池的状况信息。

根本操作能够满意大多数线程池的需求,一起Java线程池还供给了很多高档特性,例如守时使命、线程池工厂等,能够依据详细需求进行选择。

3.2 线程池的生命周期

并发编程  基础篇(下)  三大分析法分析线程池

线程池的生命周期一般包含以下阶段:

  1. 创立:线程池被创立,但还没有开端处理使命。
  2. 发动:线程池被发动,开端承受使命,而且依据装备参数创立指定数量的线程。
  3. 运转:线程池正在运转中,等候接收使命而且履行。
  4. 中止:线程池被中止,一切的使命现已被履行结束,线程池中的一切线程被毁掉。

需求留意的是,在线程池被中止之前,或许会存在一些状况导致线程池被封闭,比方程序发生反常、线程池被自动封闭等状况。此时,线程池中的一切使命或许无法悉数被履行结束,因而在实践运用中需求留意线程池的封闭战略,防止呈现数据丢失等问题。

3.3 线程池的作业原理

线程池是一种多线程处理的机制,线程池答应在运用程序中预先创立必定数量的线程并将它们放在一个池中,线程能够重复运用,以削减线程的创立和毁掉开支。

一句话总结线程池作业原理: 线程池的完结整体流程是一个可装备的出产者顾客模型,然后依据单一的堵塞缓冲行列来完结的

线程池作业原理分为六个进程解说,榜首个进程是初始化线程池,第二个进程是将使命添加到使命行列,第三个进程是查看线程池状况,第四个进程是取出使命并履行,第五个进程是处理使命反常,第六个进程是封闭线程池。

  1. 初始化线程池

创立一个线程池目标,并设置线程池的参数,如线程池的巨细、使命行列的巨细、线程的优先级等。

参阅 #3.1.1 四种线程池的创立办法

  1. 将使命添加到使命行列

当有使命需求履行时,将使命添加到使命行列中。

参阅 #2.3.2.1.2 Worker创立

  1. 查看线程池状况

线程池会周期性地查看自身状况,假如其时线程池中的线程数小于预设的最小线程数,则会创立新的线程。

参阅 #2.3.2.1.3 Worker线程原理

  1. 取出使命并履行

线程池中的线程会不断从使命行列中取出使命并履行。

参阅 #2.3.2.1.4 Task使命的获取

并发编程  基础篇(下)  三大分析法分析线程池

  1. 假如其时线程数量小于中心线程数,敞开新线程,将其时使命做为新线程的榜首个使命来履行。
  2. 假如其时线程数量等于中心线程数,测验添加使命到行列。
  3. 假如添加行列失利,即行列是满的,则以敞开新线程,将其时使命做为新线程的榜首个使命来履行。
  4. 假如新线程敞开失利,即其时线程数量等于最大线程数量,履行回绝战略。
  1. 处理使命反常

假如使命履行进程中发生了反常,线程池能够处理反常并记载反常信息。

参阅 #3.1.5 回绝战略

  1. 封闭线程池

当线程池不再需求运用时,需求将线程池封闭。封闭线程池时,首要需求将使命行列中的使命履行结束,然后再将线程池中的线程封闭。

参阅 #3.1.3 封闭线程池

经过运用线程池,能够优化体系功用,削减线程创立和毁掉的开支,防止线程过多导致体系资源不足的状况,并进步体系的可维护性和可扩展性。

3.4 线程池代码事例剖析

3.4.1 OkHttp

在OKHttp中AsyncCall、Dispatcher和ConnectionPool都是经过线程池进行维护的。

并发编程  基础篇(下)  三大分析法分析线程池

AsyncCall

AsyncCall是一个Runnable接口,能够经过线程池异步履行,下面是run办法

异步恳求的履行流程:

  1. 运用AsyncTimeout监听恳求是否超时,会敞开一个子线程,线程称号Okio Watchdog ,超时后会调用Call的cancel撤销恳求 。AsyncTimeout监听的是http恳求的完好进程,包含dns解析、恳求数据发送、服务器处理、恳求数据读取的整个流程。
  2. 拼装过滤器链,开端履行恳求流程。
  3. 回调恳求成果,告诉Dispatcher恳求现已履行完。
    override fun run() {
          threadName("OkHttp ${redactedUrl()}") {
            var signalledCallback = false
            //运用AsyncTimeout监听恳求是否超时,会敞开一个子线程,线程称号 Okio Watchdog    
            //超时后会调用Call的cancel撤销恳求    
            timeout.enter()
            try {
                //拼装过滤器链 开端履行恳求流程
              val response = getResponseWithInterceptorChain()
              signalledCallback = true
                //回调恳求成果
              responseCallback.onResponse(this@RealCall, response)
            } catch (e: IOException) {
              if (signalledCallback) {
                // Do not signal the callback twice!
                Platform.get().log("Callback failure for ${toLoggableString()}", Platform.INFO, e)
              } else {
                responseCallback.onFailure(this@RealCall, e)
              }
            } catch (t: Throwable) {
              cancel()
              if (!signalledCallback) {
                val canceledException = IOException("canceled due to $t")
                canceledException.addSuppressed(t)
                responseCallback.onFailure(this@RealCall, canceledException)
              }
              throw t
            } finally {
                //恳求履行完结 
              client.dispatcher.finished(this)
            }
          }
        }
      }

Dispatcher

Dispatcher效果

Dispatcher是用来办理衔接和分发恳求的,运用线程池履行异步使命。默许运用的线程池是缓存线程池,能够在构建OkHttpClient的时分经过Dispatcher的结构参数传入自己的线程池。

Dispatcher缓存线程池

缓存线程池运用同步行列,中心线程数为0。

提交使命的时分,假如其时没有线程在取使命就会敞开新线程履行,也便是说假如其时线程都在忙于履行恳求,会马上敞开一个新线程。缓存线程池吞吐量高,呼应速度快,可是并发高的状况下会创立很多线程,占用体系资源。

Dispatcher缓存线程池用处

移动客户端的网络恳求特色是并发量少,大多数状况只要2、3个一起宣布的恳求,可是由于大多恳求都是由用户触发的恳求,因而对呼应速度要求较高。缓存线程池恰好满意了移动端的网络需求特色。 为了防止恳求过多很多创立线程,因而运用两个行列约束异步恳求的数量。一起履行的最大恳求数量是64,假如运用缓存线程池,也就适当于约束了一起运转的最大线程数量是64。相同域名的最大恳求数量是5。

Dispatcher行列界说

readyAsyncCalls

调用enqueue办法提交给Dispatcher的恳求,假如没有提交给线程池履行,那么提交给线程池会从行列中移除。

runningAsyncCalls

正在线程池中履行的异步恳求,还没有履行完,履行完会从行列中移除。

runningSyncCalls

正在履行的同步恳求,还没有履行完,履行完会从该行列中移除。

Dispatcher成员变量
    //一起履行的最大恳求的数量 
    var maxRequests = 64
    //每个域名能够一起履行的最大恳求数量
    var maxRequestsPerHost = 5
      private var executorServiceOrNull: ExecutorService? = null
    //线程池默许运用缓存线程池 
      @get:Synchronized
      @get:JvmName("executorService") val executorService: ExecutorService
        get() {
          if (executorServiceOrNull == null) {
            executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
                SynchronousQueue(), threadFactory("$okHttpName Dispatcher"false))
          }
          return executorServiceOrNull!!
        }
     /** 调用enqueue提交给Dispatcher的恳求,还没有提交到线程池履行 */
      private val readyAsyncCalls = ArrayDeque<AsyncCall>()
      /** 提交到线程池正在履行的异步恳求,还没有履行完 */
      private val runningAsyncCalls = ArrayDeque<AsyncCall>()
      /** Running synchronous calls. Includes canceled calls that haven't finished yet. */
      private val runningSyncCalls = ArrayDeque<RealCall>()
      constructor(executorService: ExecutorService) : this() {
        this.executorServiceOrNull = executorService
      }
Dispatcher异步恳求enqueue
  1. 把AsyncCall添加到待履行行列 。
  2. 非webSocket恳求,从待履行异步行列和已履行异步行列中查找相同host的恳求,把现已存在的call的callsPerHost复制到新call的callsPerHost。这样做的目的是,例如榜首个恳求查找不到相同host的恳求,因而callsPerHost是0,添加到线程池中,callsPerHost变成了1。第2个恳求从以履行行列中查找到了相同host的恳求,复制callsPerHost,新AsyncCall的callsPerHost就变成了1。这样传递下去,新添加的AsyncCall中的callsPerHost便是该host一起履行的恳求数量。
  3. 提交给线程池履行
    internal fun enqueue(call: AsyncCall) {
      synchronized(this) {
        //1.添加到待履行行列  
        readyAsyncCalls.add(call)
        // Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
        // the same host.
        if (!call.call.forWebSocket) {
            //2.不是webSocket恳求 从待履行异步行列和已履行异步行列中查找相同host的恳求
            //把现已存在的call的callsPerHost复制到新call的callsPerHost
          val existingCall = findExistingCallWithHost(call.host)
          if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
        }
      }
        //3.提交给线程池履行
      promoteAndExecute()
    }
promoteAndExecute履行流程
private fun promoteAndExecute(): Boolean {
  this.assertThreadDoesntHoldLock()
  val executableCalls = mutableListOf<AsyncCall>()
  val isRunning: Boolean
  synchronized(this) {
      //1.遍历带履行行列
    val i = readyAsyncCalls.iterator()
    while (i.hasNext()) {
      val asyncCall = i.next()
      //正在履行的恳求数量大于64 直接退出循环
      if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
      //单个域名的最大恳求数量大于5 处理下一个恳求    
      if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.
      //将恳求从待履行行列移除 添加到已履行行列中 
      i.remove()
      //添加相同域名的恳求数量    
      asyncCall.callsPerHost.incrementAndGet()
      //添加到 暂时的可履行行列     
      executableCalls.add(asyncCall)
      runningAsyncCalls.add(asyncCall)
    }
    isRunning = runningCallsCount() > 0
  }
  //遍历可履行行列 提交给线程池履行
  for (i in 0 until executableCalls.size) {
    val asyncCall = executableCalls[i]
    asyncCall.executeOn(executorService)
  }
  //isRunning是同步已履行恳求和异步已履行恳求的使命总量 isRunning>0阐明有使命在履行 =0阐明没有使命在履行
  return isRunning
}

遍历待履行行列,假如已履行行列的使命数量大于64,跳出循环,假如其时asyncCall的相同域名恳求数量大于5,处理下一个恳求。

将恳求从待履行行列移除,添加到已履行行列和暂时可履行行列中,更新asyncCall中的callsPerHost。

遍历暂时可履行行列,把使命添加到线程池中履行。

回来是否有使命在履行

ConnectionPool

ConnectionPool运用署理办法,被署理类是RealConnectionPool,在此根底上供给了一些开发功用。默许的最大约束衔接数是5,保持衔接的最大时长是5分钟。

constructor() : this(55, TimeUnit.MINUTES)
/** Returns the number of idle connections in the pool. */
fun idleConnectionCount(): Int = delegate.idleConnectionCount()
/** Returns total number of connections in the pool. */
fun connectionCount(): Int = delegate.connectionCount()
/** Close and remove all idle connections in the pool. */
//铲除和封闭衔接池中的一切衔接
fun evictAll() {
  delegate.evictAll()
}
//RealConnectionPool
//衔接存活的最长时刻 默许是5分钟
private val keepAliveDurationNs: Long = timeUnit.toNanos(keepAliveDuration)
private val cleanupQueue: TaskQueue = taskRunner.newQueue()
//整理衔接的使命 TaskRunner中运用一个缓存线程池履行改使命
private val cleanupTask = object : Task("$okHttpName ConnectionPool") {
  override fun runOnce() = cleanup(System.nanoTime())
}
/**
 *  运用cas无锁行列存储RealConnection
 */
private val connections = ConcurrentLinkedQueue<RealConnection>()

3.4.2 ThreadLocal

ThreadLocal原理

ThreadLocal经过线程数据阻隔的办法来处理并发数据拜拜访题,每个线程都有自己的数据副本,ThreadLocal的原理图如下

并发编程  基础篇(下)  三大分析法分析线程池

线程数据阻隔的中心是每个Thread目标都有一个归于自己的ThreadLocalMap目标,ThreadLocalMap经过数组完结数据存取,每个数组元素都是一个Entry。

Entry用ThreadLocal做为key,value是咱们要存放的数据。

ThreadLocal

一个ThreadLocal只能存取一品种型的数据,存取多品种型的数据能够运用多个ThreadLocal,也能够把数据封装到同一个目标中。


public class ThreadLocal<T> {
    private final int threadLocalHashCode = nextHashCode();
    //生成ThreadLocal的hashcode的原子计数器
    private static AtomicInteger nextHashCode =
        new AtomicInteger();
    /**
     * The difference between successively generated hash codes - turns
     * implicit sequential thread-local IDs into near-optimally spread
     * multiplicative hash values for power-of-two-sized tables.
       两个ThreadLocal的hash值的距离差
     */
    private static final int HASH_INCREMENT = 0x61c88647;
    /**
     * Returns the next hash code.
       hash值 以HASH_INCREMENT累加
     */
    private static int nextHashCode() {
        return nextHashCode.getAndAdd(HASH_INCREMENT);
    }
}    

ThreadLocal的hash值是经过计数器自增生成的,运用多个ThreadLocal的状况下会呈现hash抵触。

ThreadLocalMap

ThreadLocalMap运用WeakReference,为监听ThreadLocal是否被收回。

ThreadLocal.ThreadLocalMap threadLocals = null;
//当创立子线程的时分,子线程能够得到父线程的inheritableThreadLocals
ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;
//-------------------------------------------------------------

当Entry.get()回来null的时分,阐明ThreadLocal现已被收回,这时就要将Entry中的value引证设置为null,防止呈现内存走漏。

//运用WeakReference监听ThreadLocal是否被收回
static class Entry extends WeakReference<ThreadLocal<?>> {
      /** The value associated with this ThreadLocal. */
      Object value;
      Entry(ThreadLocal<?> k,Object v) {
          super(k);
          value = v;
      }
  }
   //初始化容量 16
   private static final int INITIAL_CAPACITY = 16;
  /**
   * The table,resized as necessary.
   * table.length MUST always be a power of two.
   Entry数组
   */
  private Entry[] table;
  /**
   * The number of entries in the table.
     实践存储的数据量
   */
  private int size = 0;
  /**
   * The next size value at which to resize.
     扩容阈值 默许是size到达容量的2/3时扩容
   */
  private int threshold; // Default to 0

为什么运用WeakReference,为监听ThreadLocal是否被收回。当Entry.get()回来null的时分,阐明ThreadLocal现已被收回,这时就要将Entry中的value引证设置为null,防止呈现内存走漏。

ThreadLocal中心办法

set
  1. 获取其时线程Thread引证,获取或创立Thread中的ThreadLocalMap,调用ThreadLocalMap的set办法。
  2. 让ThreadLocal的hash值和数组长度做与运算得到对应的数组索引index。
  3. 线性勘探法处理hash抵触,假如数组索引方位的Entry是空的,创立一个新的Entry设置到该方位。假如数组索引方位的key和其时ThreadLocal地址相同,用新值更新旧值。不然便是呈现了hash抵触,从其时的i开端向后查找,直到找到一个空的方位停止。在查找的进程中,假如发现ThreadLocal现已被收回,就会调用replaceStaleEntry办法整理对应方位的value数据。
  4. 创立Entry,添加size。
  5. 先整理ThreadLocal现已被收回的Entry,然后判别是否需求扩容。
public void set(T value) {
        //获取其时线程引证
        Thread t = Thread.currentThread();
        //获取其时线程的ThreadLocalMap 假如是空的会创立一个
        ThreadLocalMap map = getMap(t);
        if (map != null)
            //设置值
            map.set(this,value);
        else
            createMap(t,value);
    }
    private void set(ThreadLocal<?> key,Object value) {
            // We don't use a fast path as with get() because it is at
            // least as common to use set() to create new entries as
            // it is to replace existing ones,in which case,a fast
            // path would fail more often than not.
            Entry[] tab = table;
            int len = tab.length;
            //1.对hashcode做与运算 确认ThreadLocal在数组中的索引
            int i = key.threadLocalHashCode & (len-1);
            //2.从其时索引开端向后查找,找到key直接赋值回来,不然找到一个Entry为空的方位,记载i
            //用线性勘探的办法处理hash抵触的问题
            for (Entry e = tab[i];
                 e != null;
                 e = tab[i = nextIndex(i,len)]) {
                ThreadLocal<?> k = e.get();
                //key现已存在 直接替换旧值
                if (k == key) {
                    e.value = value;
                    return;
                }
                //key为空 整理对应的value数据 
                if (k == null) {
                    replaceStaleEntry(key,value,i);
                    return;
                }
            }
            //3.创立一个新的Entry
            tab[i] = new Entry(key,value);
            int sz = ++size;
            //4.先整理key为空的Entry 然后判别是否需求扩容
            if (!cleanSomeSlots(i,sz) && sz >= threshold)
                rehash();
        }
get
  1. 获取其时线程的ThreadLocalMap,调用ThreadLocalMap的getEntry获取Entry。
  2. 依据hashcode和数组长度计算对应的数组索引,假如对应方位的Entry不为空而且key和其时ThreadLocal相同,回来Entry。不然履行第3步。
  3. set数据的时分或许呈现了hash抵触,从数组为i的方位开端向后查找,假如找到了对应的key就回来。假如遇到一个方位Entry为null,阐明后续的方位都是null,因而直接回来null。假如遇到ThreadLocal被收回的状况,调用expungeStaleEntry移除过期的数据。
  4. hash抵触状况下,Entry的查找采用线性查找。因而,在同一个Thread中运用很多ThreadLocal的状况下会比较消耗功用。
public T get() {
    //1.获取其时线程的ThreadLocalMap
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t);
    if (map != null) {
        //获取其时ThreadLocal对应的Entry
        ThreadLocalMap.Entry e = map.getEntry(this);
        if (e != null) {
            @SuppressWarnings("unchecked")
            T result = (T)e.value;
            return result;
        }
    }
    return setInitialValue();
}
//---------------------------------------------
private Entry getEntry(ThreadLocal<?> key) {
    int i = key.threadLocalHashCode & (table.length - 1);
    Entry e = table[i];
    //2.Entry不为空而且key和其时ThreadLocal相同
    if (e != null && e.get() == key)
        return e;
    else
        //3.或许呈现了hash抵触 从i开端向数组后边查找
        return getEntryAfterMiss(key,i,e);
}
//---------------------------------------------
private Entry getEntryAfterMiss(ThreadLocal<?> key,int i,Entry e) {
    Entry[] tab = table;
    int len = tab.length;
    //从i开端循环遍历 当Entry=null的时分 阐明后边的必定都是空的
    while (e != null) {
        ThreadLocal<?> k = e.get();
        if (k == key)
            return e;
        if (k == null)
            expungeStaleEntry(i);
        else
            i = nextIndex(i,len);
        e = tab[i];
    }
    return null;
}
扩容
  1. 当数据量到达数组容量的2/3的时分才会扩容,扩容后容量是之前容量的2倍,扩容后会把旧数组中的数据复制到新数组中,经过hash运算和线性勘探计算元素在新数组中的索引。
  2. 扩容进程是不存在线程安全问题的,由于每个线程都有自己的ThreadLocalMap。
private void resize() {
    Entry[] oldTab = table;
    int oldLen = oldTab.length;
    int newLen = oldLen * 2;
    //扩容速度为原先容量的2倍
    Entry[] newTab = new Entry[newLen];
    int count = 0;
    //把原先的数据复制到新数组中
    for (int j = 0; j < oldLen; ++j) {
        Entry e = oldTab[j];
        if (e != null) {
            ThreadLocal<?> k = e.get();
            if (k == null) {
                e.value = null; // Help the GC
            } else {
                //从头计算hash值 处理hash抵触
                int h = k.threadLocalHashCode & (newLen - 1);
                while (newTab[h] != null)
                    h = nextIndex(h,newLen);
                newTab[h] = e;
                count++;
            }
        }
    }
    //从头设置新的扩容阈值 为数组长度的2/3
    setThreshold(newLen);
    size = count;
    table = newTab;
}
内存泄露

并发编程  基础篇(下)  三大分析法分析线程池

当ThreadLocal的强引证被置为null的时分,或许会被gc收回,此时就无法经过ThreadLocal拜访到它存储的资源了。

可是还存在一条引证链路,从Thread–>ThreadLocalMap–>Entry–>value的引证链,因而在线程没有结束的状况下,就会发送内存走漏。假如运用的是线程池,内存走漏的持续时刻就会比较长。 ThreadLocal现已处理了问题,经过弱引证来监听ThreadLocal是否被收回,被收回的时分,断开value和Entry之间的引证链。

运用场景

多线程下载文件或从设备导入图片的时分,计算每个线程处理的文件数量,失利的数量、成功的数量等。

3.4.3 alpha

关于大型App来说,发动使命多,使命依靠杂乱。保障使命逻辑的单一性,解耦发动使命逻辑,合理运用多核CPU优势,进步线程运转功率是要点重视的问题。

为了运用多核cpu,进步使命履行功率,让单个使命责任愈加清晰,代码愈加高雅进而进步发动速度,咱们会尽或许让这些作业并发进行。

但这些作业之间或许存在前后依靠的联系,咱们又需求想办法确保他们履行次序的正确性。

所以咱们要做的作业是将使命颗粒化,界说好自己的使命,并描绘它依靠的使命,将它添加到Project中。结构会自动并发有序地履行这些使命,并将履行的成果抛出来。

那么怎样对使命进行分类呢?

使命进行分类策阅能够经过Alpha等发动器把发动使命办理起来。详细分为四个进程: 榜首个进程是将发动使命原子化,分为各个使命。

第二个进程是运用有向无环图办理发动使命。前后依靠的使命串行,无依靠的使命线程池化并行。优先级高的使命在前,优先级低的使命在后。

第三个进程是发动使命集中化,分使命区块:中心使命,首要使命,推迟使命,懒加载使命。中心使命在attachBaseContext中履行,首要使命在发动页或主页履行,推迟使命在主页后闲暇时刻履行,懒加载使命在特定的机遇履行。

终究一个进程是发动使命计算化,供给使命的耗时计算和卡口。

使命办理结构图参阅如下:

并发编程  基础篇(下)  三大分析法分析线程池

而阿里巴巴的alpha发动器恰好处理了发动使命办理不合理的事务痛点, 那么发动使命办理不合理的事务痛点详细表现有哪些呢?

事务痛点

发动使命办理不合理的事务痛点详细表现有如下五个特征:

  1. 多线程办理
  2. 使命的优先级
  3. 使命之间的先后联系
  4. 使命是否需求在主线程履行
  5. 多进程处理

源码剖析

为了深入学习阿里巴巴的alpha发动器的原理, 小木箱经过以下流程图带我们认识一下阿里巴巴的alpha发动器的中心常识。

并发编程  基础篇(下)  三大分析法分析线程池

AlphaManager.getInstance(mContext).start()

start判别是否为其时进程和是否能匹配到相关进程使命, 详细参数装备如下:

  • MAIN_PROCESS_MODE : 主进程使命
  • SECONDARY_PROCESS_MODE :非主进程使命
  • ALL_PROCESS_MODE:适用于一切进程的使命
    public void start() {
        Project project = null;
        do {
            //1.是否有为其时进程单独装备的Project,此为最高优先级
            if (mProjectForCurrentProcess != null) {
                project = (Project) mProjectForCurrentProcess;
                break;
            }
            //2.假如其时是主进程,是否有装备主进程Project
            if (AlphaUtils.isInMainProcess(mContext)
                    && mProjectArray.indexOfKey(MAIN_PROCESS_MODE) >= 0) {
                project = (Project) mProjectArray.get(MAIN_PROCESS_MODE);
                break;
            }
            //3.假如对错主进程,是否有装备非主进程的Project
            if (!AlphaUtils.isInMainProcess(mContext)
                    && mProjectArray.indexOfKey(SECONDARY_PROCESS_MODE) >= 0) {
                project = (Project) mProjectArray.get(SECONDARY_PROCESS_MODE);
                break;
            }
            //4.是否有装备适用一切进程的Project
            if (mProjectArray.indexOfKey(ALL_PROCESS_MODE) >= 0) {
                project = (Project) mProjectArray.get(ALL_PROCESS_MODE);
                break;
            }
        } while (false);
        if (project != null) {
            addListeners(project);
            project.start();
        } else {
            AlphaLog.e(AlphaLog.GLOBAL_TAG, "No startup project for current process.");
        }
    }

Where: 装备相关进程使命方位?

    public void addProject(Task project, int mode) {
        if (project == null) {
            throw new IllegalArgumentException("project is null");
        }
        if (mode < MAIN_PROCESS_MODE || mode > ALL_PROCESS_MODE) {
            throw new IllegalArgumentException("No such mode: " + mode);
        }
        if (AlphaUtils.isMatchMode(mContext, mode)) {
            mProjectArray.put(mode, project);
        }
    }
project start
    @Override
    public void start() {
        mStartTask.start();
    }

敞开一个mStartTask?这个mStartTask是之前设置的些使命中榜首个使命吗?

        //Project.java
        private void init() {
        ...
            mProject = new Project();
            mFinishTask = new AnchorTask(false"==AlphaDefaultFinishTask==");
            mFinishTask.setProjectLifecycleCallbacks(mProject);
            mStartTask = new AnchorTask(true"==AlphaDefaultStartTask==");
            mStartTask.setProjectLifecycleCallbacks(mProject);
            mProject.setStartTask(mStartTask);
            mProject.setFinishTask(mFinishTask);
       ...
        }
    private static class AnchorTask extends Task {
        private boolean mIsStartTask = true;
        private OnProjectExecuteListener mExecuteListener;
        public AnchorTask(boolean isStartTask, String name) {
            super(name);
            mIsStartTask = isStartTask;
        }
        public void setProjectLifecycleCallbacks(OnProjectExecuteListener callbacks) {
            mExecuteListener = callbacks;
        }
        @Override
        public void run() {
            if (mExecuteListener != null) {
                if (mIsStartTask) {
                    mExecuteListener.onProjectStart();
                } else {
                    mExecuteListener.onProjectFinish();
                }
            }
        }
    }        

Why: 界说一个开端使命和一个结束使命

履行视点: 一个使命序列必须有一个开端节点和一个结束节点。

出产视点: 多个使命能够一起开端,而且有多个使命能够一起作为结束点

规划准则:

  1. 设置两个节点便利操控整个流程
  2. 标记流程开端和结束,便利使命的监听
AnchorTask父类Task
  1. 界说Runnable,判别是否主线程,并履行这个Runnable,交叉了一些状况的改变
  2. Runnable内部首要是履行了Task.this.run(),并履行了使命自身。
  3. 其间setThreadPriority办法设置了线程优先级,比方THREAD_PRIORITY_DEFAULT线程优先级处理CPU资源竞争问题,不影响Task之间的优先级。
  4. 假如在主线程履行使命,经过Handler(sHandler)将事件传递给主线程履行。
  5. 假如在非主线程履行的使命,经过线程池(sExecutor)履行线程使命。
    public synchronized void start() {
        ...
        switchState(STATE_WAIT);
        if (mInternalRunnable == null) {
            mInternalRunnable = new Runnable() {
                @Override
                public void run() {
                    android.os.Process.setThreadPriority(mThreadPriority);
                    long startTime = System.currentTimeMillis();
                    switchState(STATE_RUNNING);
                    Task.this.run();
                    switchState(STATE_FINISHED);
                    long finishTime = System.currentTimeMillis();
                    recordTime((finishTime - startTime));
                    notifyFinished();
                    recycle();
                }
            };
        }
        if (mIsInUiThread) {
            sHandler.post(mInternalRunnable);
        } else {
            sExecutor.execute(mInternalRunnable);
        }
    }
notifyFinished

mSuccessorList 排序

「紧后使命列表」,也便是接下来要履行的使命列表。所以流程便是先把其时使命之后的使命列表进行一个排序,依据优先级排序。然后按次序履行onPredecessorFinished办法。 假如紧后使命列表为空,也就代表没有后续使命了,那么就会走onTaskFinish回调办法,奉告其时Project现已履行结束。

遍历mSuccessorList列表,履行onPredecessorFinished办法

监听回调onTaskFinish办法

    void notifyFinished() {
        if (!mSuccessorList.isEmpty()) {
            AlphaUtils.sort(mSuccessorList);
            for (Task task : mSuccessorList) {
                task.onPredecessorFinished(this);
            }
        }
        if (!mTaskFinishListeners.isEmpty()) {
            for (OnTaskFinishListener listener : mTaskFinishListeners) {
                listener.onTaskFinish(mName);
            }
            mTaskFinishListeners.clear();
        }
    }

How: 紧后使命怎样加进来?紧后使命怎样排序?

onPredecessorFinished

紧后使命列表是经过after办法完结的

builder.add(Task2).after(Task1),所以after代表Task2要在Task1后边履行,Task2成了Task1的紧后使命。

同理,Task1也就成了Task2的紧前使命。是经过addPredecessor办法,在添赶紧后使命一起也添赶紧前使命。

紧前使命添加了有什么用呢?难不成还倒退回去履行?

假如有多个使命的紧后使命都是一个,比方这种状况:builder.add(Task4).after(Task2,Task3)

Task4是Task2和Task3的紧后使命,所以在Task2履行完之后,还要判别Task3是否履行成功,然后才能履行Task4,这便是紧前使命列表的效果。

onPredecessorFinished便是做这样的作业的。

How: 紧后使命列表的排序是怎么排序呢?

经过getExecutePriority办法获取task履行优先级数字,正序排列,越小使命履行机遇越早。

setExecutePriority办法设置了排序优先级。

  • 各种回调:包含一些task的回调,project的回调。
  • 日志记载:比方耗时时刻的记载,刚才履行使命时分的recordTime办法,便是记载了每个task的耗时。
  • Task脚本化:经过XmlPullParser类来解析xml数据,然后生成Project来装备Project的Task。
  • 规划办法:构建Project的制作者办法,经过传入task称号能够创立Task工厂办法。
    //1、紧后使命添加
    public Builder after(Task task) {
        task.addSuccessor(mCacheTask);
        mFinishTask.removePredecessor(task);
        mIsSetPosition = true;
        return Builder.this;
    }
    void addSuccessor(Task task) {
        task.addPredecessor(this);
        mSuccessorList.add(task);
    }
    //2、紧后使命列表排序 
    public static void sort(List<Task> tasks) {
        if (tasks.size() <= 1) {
            return;
        }
        Collections.sort(tasks, sTaskComparator);
    }    
    private static Comparator<Task> sTaskComparator = new Comparator<Task>() {
        @Override
        public int compare(Task lhs, Task rhs) {
            return lhs.getExecutePriority() - rhs.getExecutePriority();
        }
    };    
    //3、紧后使命履行
    synchronized void onPredecessorFinished(Task beforeTask) {
        if (mPredecessorSet.isEmpty()) {
            return;
        }
        mPredecessorSet.remove(beforeTask);
        if (mPredecessorSet.isEmpty()) {
            start();
        }
    }    

3.4.4 一键暂停和康复下载

百度网盘批量下载文件,怎么完结一键暂停和康复?能够扩展线程池完结一个能够暂停和康复的线程池

线程池能够让一切Worker暂停新使命的履行,可是正在下载的使命并没有被暂停,所以需求在下载使命中处理暂停和康复的状况。

当使命暂停的时分,退出读取数据的循环,封闭衔接。

由于暂停的时刻肯能比较长,为了防止资源占用时刻较长,需求先封闭衔接,循环退出后有两种处理办法。

循环退出即使命履行完,等回复履行的时分从头提交该使命进行断点续传,由于线程池的排队机制,暂停的使命将无法持续履行,而是在行列中排队,不符合需求。

循环退出后,在外层循环判别其时使命是否下载完,假如没有持续断点续传,续传前判别履行是否暂停,假如暂停则等候,当康复履行的时分,唤醒其时线程.。

在beforeExecute办法中将使命添加到调集中,在afterExecute中将使命移除。因而,当暂停线程池的时分,调集中的使命便是正在履行的使命,顺次遍历调用使命的pause办法,当康复线程池的时分,顺次遍历调用使命的resume办法。

public class CustomExecutor extends ThreadPoolExecutor {
    private List<DownLoadTask>  executingTask=new LinkedList<>();
    private volatile  boolean isRunning;
    private ReentrantLock pauseLock=new ReentrantLock();
    private Condition pauseCondition=pauseLock.newCondition();
    public CustomExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue) {
        super(corePoolSize,maximumPoolSize,keepAliveTime,unit,workQueue);
    }
    public void pause(){
        if (!isRunning){
            return;
        }
        synchronized (this){
            Iterator<DownLoadTask> iterator = executingTask.iterator();
            while (iterator.hasNext()){
                DownLoadTask task = iterator.next();
                task.pause();
            }
            isRunning=false;
        }
    }
    public void resume(){
        if (isRunning){
            return;
        }
        pauseLock.lock();
        try {
            pauseCondition.signalAll();
            Iterator<DownLoadTask> iterator = executingTask.iterator();
            while (iterator.hasNext()){
                DownLoadTask task = iterator.next();
                task.resume();
            }
            isRunning=true;
        } finally {
            pauseLock.unlock();
        }
    }
    @Override
    protected void beforeExecute(Thread t,Runnable r) {
        super.beforeExecute(t,r); 
        pauseLock.lock();
        try {
            while (!isRunning){
                pauseCondition.await();
            }
        } catch (InterruptedException e) {
            t.interrupt();
        } finally {
            pauseLock.unlock();
        }
        synchronized (this){
            if (r instanceof DownLoadTask){
                executingTask.add((DownLoadTask) r);
            }
        }
    }
    @Override
    protected void afterExecute(Runnable r,Throwable t) {
        super.afterExecute(r,t);
        synchronized (this){
            if (r instanceof DownLoadTask){
                executingTask.remove(r);
            }
        }
    }
}

3.5 线程池功用优化

  1. 选择合适的线程池巨细:线程池巨细应该依据体系的处理才能、资源约束以及使命的特性来选择。过小的线程池会导致使命排队等候,过大的线程池会导致资源糟蹋和调度开支添加。一般来说,线程池巨细应该设置为处理器中心数的两倍。
  2. 运用合适的行列类型:线程池的使命行列能够是堵塞行列或非堵塞行列。堵塞行列能够防止使命排队等候时的 busy waiting,但会添加体系开支。非堵塞行列能够削减体系开支,但会添加使命排队等候的时刻。选择合适使命特性和处理需求的行列类型能够进步线程池功用。
  3. 防止使命过多和过少:过多的使命会导致线程池过载,过少的使命会导致线程池资源糟蹋。应该依据实践使命需求和体系处理才能来合理分配使命,防止过多或过少的使命。
  4. 合理设置线程池参数:线程池的参数包含中心线程数、最大线程数、线程存活时刻和使命行列巨细等。应该依据体系特性和使命需求来设置这些参数,以进步线程池功用。
  5. 优化使命履行功率:线程池的功用也与使命履行功率有关。能够优化使命代码,防止耗时操作和竞争条件,进步使命履行功率,然后进步线程池功用。
  6. 监控线程池状况:应该定期监控线程池的状况,包含线程池巨细、线程运用状况、使命行列状况等。及时发现问题并进行调整,以确保线程池的功用。

3.6 线程池留意事项

并发编程  基础篇(下)  三大分析法分析线程池

线程池监控

项目中呈现过记载丢失的状况,有一个记载保存进程中卡住,然后导致后边的使命都存储到了堵塞行列BlockingQueue中。

设备重启后,直接导致行列中的记载悉数丢失。

处理办法是对线程池的运转状况进行监控,正常状况下堵塞行列BlockingQueue里应该是没有使命的,当堵塞行列BlockingQueue中的使命数量超越某个阈值后触发反常使命办理机制。

不同使命类型应该运用不同的线程池,不要把一切使命都用同一个线程池履行。首要从使命类型事务场景使命时刻三个维度去考量。

使命类型

使命首要分为CPU 密布IO 密布两品种型,现代计算机IO都是经过DMA直接存储拜访器处理的,处理完结后在给CPU发一个中止,CPU在持续履行。

假如把这两种使命放到一个线程,读取IO文件的时分,线程就需求等候IO完结才能持续履行。

文件读取和数据处理不能并发处理,使命履行时刻就添加了。

事务场景

app中比较常见的有前台使命和后台使命,面向用户操作的是前台使命,前台使命对呼应速度要求比较高,例如用户点击按钮恳求服务器。

后台使命是长时刻在后台运转的使命,例如百度网盘批量下载文件。

假如前台使命用Okhttp异步恳求,后台使命也运用Okhttp异步恳求,适当于都运用Okhttp缓存线程池,或许会导致线程数量很多添加。

这种状况后台下载文件应该自界说线程池,运用Okhttp同步恳求。

使命时刻

时刻长的使命和时刻短的使命不要运用同一个线程池,会导致时刻短的使命不能及时履行。

3.7 线程池线程数量确认

CPU 密布型

线程数能够设置为N+1,N是CPU中心数。多出来一个是为了防止线程缺页中止或其他原因导致的使命暂停,这时分多出来的线程就能够充沛运用CPU的闲暇时刻。

IO 密布型

线程数能够设置为N*2,IO密布型使命不占用CPU,现代计算机都是经过DMA直接内存拜访操控器处理的。在履行这类使命的时分,CPU会有许多闲暇时刻履行其他使命,因而能够多设置一些线程。

通用公式

IO耗时占比越多,线程数量越多。线程数通用公式参阅如下:

线程数 = CPU 中心数 * (1+ IO 耗时/CPU 耗时)

3.8 线程池事务防劣化Lint东西

为什么不主张运用Executors创立线程? 实践开发中,不主张运用Executors创立线程池,有如下三个原因:

  1. 单线程和固定数量线程线程池的堵塞行列BlockingQueue都没有设置巨细,假如有一个使命堵塞,或许会导致行列中的使命无限添加,终究触发oom或许导致使命悉数丢失。

  2. 缓存线程池线程数量无上限,假如使命过多,而且使命履行时刻都很长,或许会导致线程数量无限添加,终究触发oom。

  3. 不能指定使命回绝战略,默许的回绝战略为AbortPolicy,假如不设置或许会导致程序崩溃。

那么该怎么在编译期去发现上述问题呢? Android Lint是Google供给给Android开发者的静态代码查看东西。

运用Lint对Android工程代码进行扫描和查看,能够发现代码潜在的问题,提醒程序员及早批改。

经过lint东西防劣化, 提醒事务运用架构组独享的线程池。

下面就由小木箱带我们完结一下禁止运用Executors创立线程池的Lint东西吧~

public class ThreadPoolDetector extends Detector implements Detector.JavaScanner {
    public static final Issue ISSUE = Issue.create(
            "创立线程池""防止自己创立ThreadPool""请勿直接运用Executors创立线程,主张运用一致的线程池办理东西类",
            Category.PERFORMANCE,
            6,
            Severity.WARNING,
            new Implementation(ThreadPoolDetector.class, Scope.JAVA_FILE_SCOPE)
    );
    @Override
    public List<Class<? extends Node>> getApplicableNodeTypes() {
        return ImmutableList.of(MethodCall.class);
    }
    @Override
    public void visitMethodCall(@NonNull JavaContext context, @NonNull UCall node, @NonNull PsiMethodCallExpression call) {
        if (node.getMethodName().equals("newFixedThreadPool") || node.getMethodName().equals("newCachedThreadPool") || node.getMethodName().equals("newSingleThreadExecutor")) {
            context.report(ISSUE, node, context.getLocation(node), "不主张运用Executors创立线程, 改用ThreadPoolExecutor");
        }
    }
}

四、SCQA剖析线程池

答案未来将上传B站, 请重视B站号: 小木箱成长营

  1. 日常作业中有用到线程池吗?什么是线程池?为什么要运用线程池?
  2. 作业线程Worker 承继 AQS 完结了锁机制,那 ThreadPoolExecutor 都用到了哪些锁?为什么要用锁?
  3. 项目中是怎样运用线程池的?Executors 了解吗?
  4. 线程池有哪些参数?
  5. 线程池的运转原理是什么?
  6. 线程池的履行流程?
  7. 怎么合理装备线程池?
  8. 中心线程能否退出?
  9. 回绝战略有哪些?适用场景是怎样样的?
  10. 运用线程池的进程中遇到过哪些坑或许需求留意的当地?
  11. 怎么监控线程池?
  12. JDK自带的线程池品种有哪些?
  13. 为什么不推荐运用JDK自带的线程池?
  14. 怎么合理设置中心线程数的巨细?
  15. 说说submit和 execute两个办法有什么差异?
  16. shutdownNow() 和 shutdown() 两个办法有什么差异?
  17. 调用了shutdownNow或许shutdown,线程必定会退出么?
  18. 什么是堵塞行列?堵塞行列有哪些?为什么线程池要运用堵塞行列?
  19. 经过 ThreadPoolExecutor 来创立线程池,那中心参数设置多少合适呢?

五、结语

三大剖析法剖析线程池首要分为四部分,榜首部分是4W2H剖析线程池,第二部分是MECE剖析线程池,第三部分是SCQA剖析线程池,终究一部分是结语。

其间,4W2H剖析线程池首要围绕线程池提出了6个高价值问题。

其间,MECE剖析线程池首要分为线程池根本操作、线程池生命周期、线程池作业原理、线程池代码事例剖析、线程池的功用优化、线程池留意事项、线程池线程数量确认和线程池事务防劣化8部分。

线程池学习的重要性是不可忽视的。在现代互联网时代,线程池是一种重要的多线程编程技能,能够进步程序的功用、稳定性和可靠性。因而,学习线程池成为了每一位Android开发工程师的必备技能。

期望经过经过本文线程池学习,能够让您更快的经过职场面试一起也能处理作业中的事务痛点。

假如你觉的小木箱的文章对你有所协助,那么能够重视大众号小木箱成长营。让你的常识和视界得到更宽广的拓宽吧,下一篇将介绍Java并发关键字那些事,相同是并发编程中心内容。 今日就到这儿啦,我是小木箱,咱们下一篇见~

参阅资料:

更快!更高效!异步发动结构Alpha彻底解析 更快!更高效!异步发动结构Alpha彻底解析