1. 为什么要⽤线程池?

池化技术想必⼤家已经屡⻅不鲜了,线程池、数据库连接池、Http 连接池等等都是对这个思想的应⽤。池化技术主要是为了减少每次获取资源的消耗,提⾼对资源的利⽤率。

线程池提供了⼀种限制和管理资源(包括执⾏⼀个任务)的能力。 每个线程池还维护⼀些基本统计信息,例如已完成任务的数量。
这⾥借⽤《Java 并发编程的艺术》来说⼀下使⽤线程池的好处:

  • 降低资源消耗。通过重复利⽤已创建的线程降低线程创建和销毁造成的消耗。
  • 提⾼响应速度。当任务到达时,任务可以不需要等到线程创建就能⽴即执⾏。
  • 提⾼线程的可管理性。线程是稀缺资源,如果⽆限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使⽤线程池可以进⾏统⼀的分配,调优和监控。

2. 实现 Runnable 接⼝和 Callable 接⼝的区别

Runnable⾃ Java 1.0 以来⼀直存在,但Callable仅在 Java 1.5 中引⼊,⽬的就是为了来处理Runnable不⽀持的⽤例。Runnable接⼝不会返回结果或抛出检查异常,但是Callable接⼝可以。所以,如果任务不需要返回结果或抛出异常推荐使⽤Runnable接⼝,这样代码看起来会更加简洁。

工具类Executors可以实现将Runnable对象转换成Callable对象。(Executors.callable(Runnable task)Executors.callable(Runnable task, Object result))。

Runnable.java:

@FunctionalInterface
public interface Runnable {
    /**
     * 被线程执⾏,没有返回值也⽆法抛出异常
     */
    public abstract void run();
}

Callable.java

@FunctionalInterface
public interface Callable < V > {
    /**
     * 计算结果,或在⽆法这样做时抛出异常。
     * @return 计算得出的结果
     * @throws 如果⽆法计算结果,则抛出异常
     */
    V call() throws Exception;
}

3. 执⾏ execute()⽅法和 submit()⽅法的区别是什么呢?

  1. execute()⽅法⽤于提交不需要返回值的任务,所以⽆法判断任务是否被线程池执⾏成功与否;
  2. submit()⽅法⽤于提交需要返回值的任务。线程池会返回⼀个Future类型的对象,通过这个Future对象可以判断任务是否执⾏成功,并且可以通过Futureget()⽅法来获取返回值,get()⽅法会阻塞当前线程直到任务完成,⽽使⽤get(long timeout, TimeUnit unit)⽅法则会阻塞当前线程⼀段时间后⽴即返回,这时候有可能任务没有执⾏完。

我们以AbstractExecutorService接⼝中的⼀个submit⽅法为例⼦来看看源代码:

public Future << ? > submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture < Void > ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}

上⾯⽅法调⽤的newTaskFor⽅法返回了⼀个FutureTask对象。

protected < T > RunnableFuture < T > newTaskFor(Runnable runnable, T value) {
    return new FutureTask < T > (runnable, value);
}

我们再来看看execute()⽅法:

public void execute(Runnable command) {
    ...
}

4. 如何创建线程池

《阿⾥巴巴 Java 开发⼿册》中强制线程池不允许使⽤Executors去创建,⽽是通过ThreadPoolExecutor的⽅式,这样的处理⽅式让写的同学更加明确线程池的运⾏规则,规避资源耗尽的⻛险

Executors返回线程池对象的弊端如下:

  • FixedThreadPoolSingleThreadExecutor:允许请求的队列⻓度为Integer.MAX_VALUE,可能堆积⼤量的请求,从⽽导致 OOM。
  • CachedThreadPoolScheduledThreadPool:允许创建的线程数量为Integer.MAX_VALUE,可能会创建⼤量线程,从⽽导致 OOM。

⽅式⼀:通过构造⽅法实现

file

⽅式⼆:通过 Executor 框架的⼯具类 Executors 来实现

我们可以创建三种类型的ThreadPoolExecutor

  • FixedThreadPool:该⽅法返回⼀个固定线程数量的线程池。该线程池中的线程数量始终不变。当有⼀个新的任务提交时,线程池中若有空闲线程,则⽴即执⾏。若没有,则新的任务会被暂存在⼀个任务队列中,待有线程空闲时,便处理在任务队列中的任务。
  • SingleThreadExecutor:⽅法返回⼀个只有⼀个线程的线程池。若多余⼀个任务被提交到该线程池,任务会被保存在⼀个任务队列中,待线程空闲,按先⼊先出的顺序执⾏队列中的任务。
  • CachedThreadPool:该⽅法返回⼀个可根据实际情况调整线程数量的线程池。线程池的线程数量不确定,但若有空闲线程可以复⽤,则会优先使⽤可复⽤的线程。若所有线程均在⼯作,⼜有新的任务提交,则会创建新的线程处理任务。所有线程在当前任务执⾏完毕后,将返回线程池进⾏复⽤。

对应Executors⼯具类中的⽅法如图所示:

file

5. ThreadPoolExecutor 类分析

ThreadPoolExecutor类中提供的四个构造⽅法。我们来看最⻓的那个,其余三个都是在这个构造⽅法的基础上产⽣(其他⼏个构造⽅法说⽩点都是给定某些默认参数的构造⽅法,⽐如默认制定拒绝策略是什么),这⾥就不贴代码讲了,⽐较简单。

/**
 * ⽤给定的初始参数创建⼀个新的ThreadPoolExecutor。
 */
public ThreadPoolExecutor(int corePoolSize,
    int maximumPoolSize,
    long keepAliveTime,
    TimeUnit unit,
    BlockingQueue < Runnable > workQueue,
    ThreadFactory threadFactory,
    RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

下⾯这些对创建⾮常重要,在后⾯使⽤线程池的过程中你⼀定会⽤到!所以,务必拿着⼩本本记清楚。

5.1. ThreadPoolExecutor 构造函数重要参数分析

ThreadPoolExecutor 3 个最重要的参数:

  • corePoolSize:核⼼线程数定义了最⼩可以同时运⾏的线程数量。
  • maximumPoolSize:当队列中存放的任务达到队列容量的时候,当前可以同时运⾏的线程数量变为最⼤线程数。
  • workQueue:当新任务来的时候会先判断当前运⾏的线程数量是否达到核⼼线程数,如果达到的话,新任务就会被存放在队列中。
    ThreadPoolExecutor其他常⻅参数:
  • keepAliveTime:当线程池中的线程数量⼤于corePoolSize的时候,如果这时没有新的任务提交,核⼼线程外的线程不会⽴即销毁,⽽是会等待,直到等待的时间超过了keepAliveTime才会被回收销毁。
  • unitkeepAliveTime参数的时间单位。
  • threadFactoryexecutor创建新线程的时候会⽤到。
  • handler:饱和策略。关于饱和策略下⾯单独介绍⼀下。

5.2. ThreadPoolExecutor 饱和策略

ThreadPoolExecutor饱和策略定义:
如果当前同时运⾏的线程数量达到最⼤线程数量并且队列也已经被放满了任务时,ThreadPoolTaskExecutor定义⼀些策略:

  • ThreadPoolExecutor.AbortPolicy:抛出RejectedExecutionException来拒绝新任务的处理。
  • ThreadPoolExecutor.CallerRunsPolicy:调用执行自己的线程运行任务,也就是直接在调用execute方法的线程中运行(run)被拒绝的任务,如果执行程序已关闭,则会丢弃该任务。因此这种策略会降低对于新任务提交速度,影响程序的整体性能。如果您的应用程序可以承受此延迟并且你要求任何一个任务请求都要被执行的话,你可以选择这个策略。
  • ThreadPoolExecutor.DiscardPolicy:不处理新任务,直接丢弃掉。
  • ThreadPoolExecutor.DiscardOldestPolicy:此策略将丢弃最早的未处理的任务请求。

举个例⼦:Spring 通过ThreadPoolTaskExecutor或者我们直接通过ThreadPoolExecutor的构造函数创建线程池的时候,当我们不指定RejectedExecutionHandler饱和策略的话来配置线程池的时候默认使⽤的是 ThreadPoolExecutor.AbortPolicy。在默认情况下,ThreadPoolExecutor将抛出RejectedExecutionException来拒绝新来的任务,这代表你将丢失对这个任务的处理。对于可伸缩的应⽤程序,建议使⽤ThreadPoolExecutor.CallerRunsPolicy。当最⼤池被填满时,此策略为我们提供可伸缩队列。(这个直接查看ThreadPoolExecutor的构造函数源码就可以看出,⽐较简单的原因,这⾥就不贴代码了)

6. 一个简单的线程池 Demo

为了让大家更清楚上面的面试题中的一些概念,我写了一个简单的线程池 Demo。首先创建一个Runnable接口的实现类(当然也可以是Callable接口,我们上面也说了两者的区别)。
MyRunnable.java:

import java.util.Date;

/**
 * 这是一个简单的Runnable类,需要大约5秒钟来执行其任务。
 * @author shuang.kou
 */
public class MyRunnable implements Runnable {

    private String command;

    public MyRunnable(String s) {
        this.command = s;
    }

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + " Start. Time = " + new Date());
        processCommand();
        System.out.println(Thread.currentThread().getName() + " End. Time = " + new Date());
    }

    private void processCommand() {
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Override
    public String toString() {
        return this.command;
    }
}

编写测试程序,我们这里以阿里巴巴推荐的使用ThreadPoolExecutor构造函数自定义参数的方式来创建线程池。

ThreadPoolExecutorDemo.java:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolExecutorDemo {

    private static final int CORE_POOL_SIZE = 5;
    private static final int MAX_POOL_SIZE = 10;
    private static final int QUEUE_CAPACITY = 100;
    private static final Long KEEP_ALIVE_TIME = 1L;
    public static void main(String[] args) {

        //使用阿里巴巴推荐的创建线程池的方式
        //通过ThreadPoolExecutor构造函数自定义参数创建
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                CORE_POOL_SIZE,
                MAX_POOL_SIZE,
                KEEP_ALIVE_TIME,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(QUEUE_CAPACITY),
                new ThreadPoolExecutor.CallerRunsPolicy());

        for (int i = 0; i < 10; i++) {
            //创建WorkerThread对象(WorkerThread类实现了Runnable 接口)
            Runnable worker = new MyRunnable("" + i);
            //执行Runnable
            executor.execute(worker);
        }
        //终止线程池
        executor.shutdown();
        while (!executor.isTerminated()) {
        }
        System.out.println("Finished all threads");
    }
}

可以看到我们上面的代码指定了:

  • corePoolSize:核心线程数为 5
  • maximumPoolSize:最大线程数 10
  • keepAliveTime:等待时间为 1L
  • unit:等待时间的单位为TimeUnit.SECONDS
  • workQueue:任务队列为ArrayBlockingQueue,并且容量为 100
  • handler:饱和策略为CallerRunsPolicy

Output:

pool-1-thread-3 Start. Time = Sun Apr 12 11:14:37 CST 2020
pool-1-thread-5 Start. Time = Sun Apr 12 11:14:37 CST 2020
pool-1-thread-2 Start. Time = Sun Apr 12 11:14:37 CST 2020
pool-1-thread-1 Start. Time = Sun Apr 12 11:14:37 CST 2020
pool-1-thread-4 Start. Time = Sun Apr 12 11:14:37 CST 2020
pool-1-thread-3 End. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-4 End. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-1 End. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-5 End. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-1 Start. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-2 End. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-5 Start. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-4 Start. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-3 Start. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-2 Start. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-1 End. Time = Sun Apr 12 11:14:47 CST 2020
pool-1-thread-4 End. Time = Sun Apr 12 11:14:47 CST 2020
pool-1-thread-5 End. Time = Sun Apr 12 11:14:47 CST 2020
pool-1-thread-3 End. Time = Sun Apr 12 11:14:47 CST 2020
pool-1-thread-2 End. Time = Sun Apr 12 11:14:47 CST 2020

7. 线程池原理分析

承接 4.6 节,我们通过代码输出结果可以看出:线程池每次会同时执⾏ 5 个任务,这 5 个任务执⾏完之后,剩余的 5 个任务才会被执⾏。 ⼤家可以先通过上⾯讲解的内容,分析⼀下到底是咋回事?(⾃⼰独⽴思考⼀会)

现在,我们就分析上⾯的输出内容来简单分析⼀下线程池原理。

为了搞懂线程池的原理,我们需要⾸先分析⼀下execute⽅法。在 4.6 节中的 Demo 中我们使⽤executor.execute(worker)来提交⼀个任务到线程池中去,这个⽅法⾮常重要,下⾯我们来看看它的源码:

// 存放线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

private static int workerCountOf(int c) {
    return c & CAPACITY;
}

private final BlockingQueue<Runnable> workQueue;

public void execute(Runnable command) {
    // 如果任务为null,则抛出异常。
    if (command == null)
        throw new NullPointerException();
    // ctl 中保存的线程池当前的一些状态信息
    int c = ctl.get();

    //  下面会涉及到 3 步 操作
    // 1.首先判断当前线程池中执行的任务数量是否小于 corePoolSize
    // 如果小于的话,通过addWorker(command, true)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 2.如果当前执行的任务数量大于等于 corePoolSize 的时候就会走到这里
    // 通过 isRunning 方法判断线程池状态,线程池处于 RUNNING 状态并且队列可以加入任务,该任务才会被加入进去
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 再次获取线程池状态,如果线程池状态不是 RUNNING 状态就需要从任务队列中移除任务,并尝试判断线程是否全部执行完毕。同时执行拒绝策略。
        if (!isRunning(recheck) && remove(command))
            reject(command);
            // 如果当前线程池为空就新创建一个线程并执行。
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    //3. 通过addWorker(command, false)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。
    //如果addWorker(command, false)执行失败,则通过reject()执行相应的拒绝策略的内容。
    else if (!addWorker(command, false))
        reject(command);
}

通过下图可以更好的对上⾯这 3 步做⼀个展示,下图是我为了省事直接从⽹上找到,原地址不明。
file
现在,让我们在回到 4.6 节我们写的 Demo, 现在应该是不是很容易就可以搞懂它的原理了呢?没搞懂的话,也没关系,可以看看我的分析:

我们在代码中模拟了 10 个任务,我们配置的核⼼线程数为 5 、等待队列容量为 100 ,所以
每次只可能存在 5 个任务同时执⾏,剩下的 5 个任务会被放到等待队列中去。当前的 5 个
任务之⾏完成后,才会之⾏剩下的 5 个任务。

最后修改日期: 2021年11月30日

留言

撰写回覆或留言

发布留言必须填写的电子邮件地址不会公开。