Java线程池

时间:2020-07-01 09:30:08   收藏:0   阅读:59

Java线程池

基本概念

特点:

线程池核心设计与实现

线程池运行机制

生命周期管理


任务执行机制

任务调度

流程:

  1. 检查线程池运行状态。若不是RUNNING,则直接拒绝。
  2. workerCount<corePoolSize,则创建并启动一个线程并执行新提交的任务。
  3. workerCount>=corePoolSize,且线程池的阻塞队列未满,则任务添加到阻塞队列中。
  4. workerCount>=corePoolSize并且worker<maximumPoolSize,则线程池的阻塞队列已满,创建并启动一个线程来执行新提交的任务。
  5. workerCount>=maximumPoolSize,并且线程池内的阻塞队列已满,则根据拒绝策略来处理该任务。(默认抛出异常)

任务调度流程:
技术图片

任务缓冲

阻塞队列(Blocking Queue):

阻塞队列:
技术图片

常见的阻塞队列:

名称 描述
ArrayBlockingQueue 一个用数组实现的有界阻塞队列,按照FIFO对任务排序,支持公平锁和非公平锁
LinkedBlockingQueue 基于链表结构的阻塞队列,按照FIFO排序任务.可以设置容量,不设置时为无界阻塞队列,最大长度为Integer.MAX_VALUE,newFixedThreadPool使用该队列.
DelayQueue 延迟获取的无界队列,指定多久才能从队列中获取当前元素,只有延时期后才能从队列中获取元素,newScheduledThreadPool线程池使用该队列.
PriorityBlockingQueue 支持线程优先级的无界队列,默认自然序排列,可自定义实现compareTo()方法指定元素的排序规则,不保证同一优先级的先后顺序
SynchronousQueue 不存储元素的阻塞队列,每个插入操作必须等到另外一个线程调用移出操作,否则插入操作处于阻塞状态。支持公平锁和非公平锁。newCachedThreadPool线程池使用该队列,新任务到来时创建线程,有空闲线程则重复使用,线程空闲60秒会被回收。
LinkedTransferQueue 链表结构组成的无界阻塞队列,额外有transfer()tryTransfer()方法
LinkedBlockingDeque 链表结构组成的双向阻塞队列,队列的头尾均可添加和移出元素,可减低锁的竞争

任务申请

两种任务执行方式:

tryTask()方法实现线程从阻塞队列中获取任务:

  1. 获取线程池状态及线程数量
  2. 线程池是否停止,若停止,则返回null(线程空闲)
  3. 若没有停止,则检查线程数是否过多。若过多,则返回null。
  4. 若没有过多,则检查该线程是否为可回收线程,若是,则限时获取任务。
  5. 若否,则阻塞获取任务。
  6. 任务申请完成。

技术图片

任务拒绝

当线程池的任务缓存队列已满,并且线程池中的线程数达到最大值时,需要拒绝新提交的任务。

可用实现接口RejectExecutionHadler来定制拒绝策略,或者使用四种已有策略:

技术图片


线程的管理

Worker线程

为了管理线程的状态并维护线程的生命周期,设计工作线程Worker

final Thread thread; // 当前Worker持有的线程
Runnable firstTask; // 初始化线程的任务(可为null)
volatile long completedTasks; // 完成的任务数

Worker执行任务的模型:
技术图片

注:

线程的状态:

技术图片

Worker线程增加

新增线程流程:
技术图片

Worker线程回收

线程的销毁:
技术图片

Worker线程执行任务

执行任务流程:
技术图片

线程池的创建

可以通过ThreadPoolExecutor来创建。

构造函数:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue)

参数:

任务执行

线程池执行流程(对应于execute()方法)

  1. 提交一个任务,线程池里存活的核心线程数小于corePoolSie时,线程池创建一个核心线程处理提交的任务。
  2. 若线程池核心线程数已满(线程数等于corePoolSie),新提交的任务被放在任务队列workQueue排队等待执行。
  3. 当线程池里面的存活线程数达到corePoolSize时,并且任务队列workQueue已满,则判断是否到达maximumPoolSize(即是否达到最大线程数)。若没有,则创建非核心线程执行提交的任务。
  4. 若线程数达到maxmumPoolSize时,新的任务采用拒绝策略处理。

技术图片

四种拒绝策略

线程池的异常处理

示例:

// 1. 通过try...catch捕获异常
try {
    System.out.println(3/0);
} catch (Exception e){
    System.out.println(e.getMessage());
}

// 2. sumbit执行,Future.get接收异常
Future<?> future = threadPool.submit(() -> {
    System.out.println(3 / 0); // 产生异常
});

try {
    future.get(); // 接收异常
} catch (InterruptedException e) {
    e.printStackTrace();
} catch (ExecutionException e) {
    e.printStackTrace();
}

// 3. 设置Thread.UncaughtExceptionHandler处理未检测的异常
ExecutorService threadPool = Executors.newFixedThreadPool(1, r -> {
    Thread t = new Thread(r);
    t.setUncaughtExceptionHandler((t1, e) -> {
        System.out.println(t1.getName() + " 抛出异常 " + e);
    });
    return t;
});

// 4. 重写ThreadPoolExecutor.afterExecute方法,处理传递的异常引用
class ExtendExecutor extends ThreadPoolExecutor {
    public ExtendExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        if (t == null && r instanceof Future<?>){
            try {
                Object result = ((Future<?>) r).get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
        if (t != null)
            System.out.println(t);
    }
}

常用的线程池

newFixedThreadPool

特点:

工作机制:

  1. 任务被提交.
  2. 检查线程池中线程数是否小于coreSize.若小于,则创建核心线程执行任务.
  3. 若不小于,则将任务加入阻塞队列,等待被取出执行.
  4. 若队列容量一直增加,可能导致OOM.

异常:
使用无界队列可能导致内存飙升.
若一个线程获取任务后,任务的执行时间比较长,导致队列的任务堆积过多,内存不足后抛出OOM.

使用场景:
适用于CPU密集型任务,确保CPU被工作线程使用的情况下,尽可能少的分配线程,适用与执行长期任务.

newCachedThreadPool

特点:

提交的任务速度大于处理任务的速度时,每次提交一个任务,就会创建一个线程.
空闲60秒的线程会被终止,长时间保持空闲的CachedThreadPool不会占用资源.

工作机制:

  1. 提交任务.
  2. 若没有核心线程,则任务直接加到SynchronousQueue队列.
  3. 判断是否有空闲线程,若有,则取出任务执行.
  4. 若没有,则新建一个线程执行.
  5. 执行完任务的线程,还可以存活60秒.若存活期间接收到任务,可以继续存活;否则被销毁.

使用场景:
并发执行大量短期的小任务.

newSingleThreadExecutor

特点:

工作机制:

  1. 提交任务.
  2. 线程池是否有一个线程,若没有,则新建线程执行.
  3. 若有,则将任务加到阻塞队列.
  4. 唯一的线程执行完一个任务后,再从队列中取出任务.

使用场景:
串行执行任务的场景,一个任务接着一个任务地执行.

newScheduledThreadPool

特点:

工作机制:

  1. 添加一个任务.
  2. 线程池中的线程从DelayQueue中取任务.
  3. 线程从DelayQueue中获取time大于等于当前时间的任务.
  4. 执行完后修改这个任务的time为下次被执行的时间.
  5. 将该任务放回DelayQueue队列中.

使用场景:
周期性执行任务的场景,需要限制线程数量的场景.


线程池状态

运行状态 描述
RUNNING 能够接受新提交的任务,也能处理阻塞队列中的任务
SHUTDOWN 关闭状态。不再接受新提交的任务,但可以继续处理阻塞队列中保存的任务
STOP 不能接受新任务,不处理队列中的任务,会中断正在处理任务的线程
TIDYING 所有任务都已终止,workCount为0
TERMINATED terminated()执行完后进入该状态

线程池生命周期:
技术图片

参考:

评论(0
© 2014 mamicode.com 版权所有 京ICP备13008772号-2  联系我们:gaon5@hotmail.com
迷上了代码!