FutureTask(未来任务) 源码解析

时间:2021-07-26 16:46:03   收藏:0   阅读:0

FutureTask(未来任务)

一、前情回顾(重要)

首先我们先回顾一下多线程创建的方式

  1. 直接继承Thread方式
  2. 实现Runnable 方式
  3. 实现Callable方式
  4. 线程池方式

这四种方式主要分为两类:没返回值的(1,2) 有返回值的(3,4)

没返回值的相信已经烂熟于心了。这次我们讲讲有返回值的,下面先给出3,4的两种创建多线程的示例:

1.实现Callable方式

public class CallableTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
		//创建任务
        FutureTask task = new FutureTask(() -> {
            System.out.println(Thread.currentThread().getName() + " is running");
            return 200;
        });
        
        //创建线程
        new Thread(task,"A").start();
		//获得任务的返回值
        System.out.println(task.get());
    }
}

/**
执行结果:
	A is running
	200
**/

提问?

1.Thread为什么能接收FutureTask?
    
2.为什么线程能执行Callable实现的call方法?

2.线程池方式

//实现Runnable
class RunnableDemo implements Runnable{
    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + " runnable is running");
    }
}

//实现Callable
class CallableDemo implements Callable{

    @Override
    public Object call() throws Exception {
        System.out.println(Thread.currentThread().getName() + " callable is running");
        return 200;
    }
}

public class ThreadPoolTest {
    public static void main(String[] args) throws Exception {
        //创建线程池
        ExecutorService threadPool = Executors.newFixedThreadPool(5);
		
        //提交并获得任务
        Future callableTask = threadPool.submit(new CallableDemo());
        Future runnableTask = threadPool.submit(new RunnableDemo(), 400);
		
        //根据任务获得返回值
        System.out.println(callableTask.get());
        System.out.println(runnableTask.get());
		
        //关闭线程池
        threadPool.shutdown();
    }
}

/**
	pool-1-thread-1 callable is running
    200
    pool-1-thread-2 runnable is running
    400
**/

提问?

1. 为何线程池中能传入Callable 或 Runnable的实现?
    
2. submit的方法返回的Future类是什么?

3. 为何也都可以根据任务获取返回值,且Runnable也有返回值?

你在学习以上两种方式创建多线程的时候是否会有这些疑问呢,根据上面的几个提问,来一点点获取线索,相信自己最终可以破案。

3.获取线索

我们首先要找出 实现Callable方式线程池方式 两者的共同点。

public interface ExecutorService extends Executor{
    <T> Future<T> submit(Callable<T> task);
    <T> Future<T> submit(Runnable task, T result);
    Future<?> submit(Runnable task);
}

不难发现,ExecutorService是一个接口,那么就直接去看他的一个实现类

public abstract class AbstractExecutorService implements ExecutorService {
    
    //提交Runnable实现 不带任务返回值
    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }
	//提交Runnable实现 带任务返回值
    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }
	
    //提交Callable实现 
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }
}

有三个submit() 的重载方法,其中有个共同的方法就是newTaskFor()submit的传参的值原封不动的传了进去,且返回了个 RunnableFuture ,那么直接来看这个newTaskFor()

    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }

    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }

惊奇的发现它内部也是 new FutureTask()

那么接下来就来到了我们今天的主题 解密 FutureTask

二、继承与实现

public class FutureTask<V> implements RunnableFuture<V> 

public interface RunnableFuture<V> extends Runnable, Future<V> 

FutureTask的实现关系来看,它实际上是实现了 RunnableFuture 这两个接口

这里也就解释了之前的一个问题:

问:Thread为什么能接收FutureTask?
    
答:因为FutureTask实现了Runnable接口。

三、构造方法

    //接收Callable实现的构造
	public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        //赋值 Callable的实现 
        this.callable = callable;
        //将当前任务标记为 NEW(任务尚未执行)
        this.state = NEW;       // ensure visibility of callable
    }

	//接收Runnable实现的构造
    public FutureTask(Runnable runnable, V result) {
        //将Runnable包装成Callable
        this.callable = Executors.callable(runnable, result);
        //将当前任务标记为 NEW(任务尚未执行)
        this.state = NEW;       // ensure visibility of callable
    }

接收Runnable实现的构造中,使用了适配器模式Runnable包装成Callable

    public static <T> Callable<T> callable(Runnable task, T result) {
        if (task == null)
            throw new NullPointerException();
        //使用的是适配器模式将runnable转换为了 callable接口,外部线程 通过get获取当前任务执行结果时
    	//结果可能为null 也可以能为 传进来的result值
        return new RunnableAdapter<T>(task, result);
    }

	//实现了Callable,并实现了
    static final class RunnableAdapter<T> implements Callable<T> {
        final Runnable task;
        final T result;
        RunnableAdapter(Runnable task, T result) {
            //将runnable的实现赋值给task
            this.task = task;
            this.result = result;
        }
        public T call() {
            task.run();
            return result;
        }
    }

RunnableAdapter实现了Callable 并在实现call()方法中调用了Runnable实现的run()方法。这样就成功将Runnable 包装成了 Callable

四、成员变量及内部类

    //当前任务的状态
	private volatile int state;
	
	//以下7个对应任务有哪些状态
	
	//当前任务尚未执行
    private static final int NEW          = 0;
	//当前任务正在结束,一种临界状态
    private static final int COMPLETING   = 1;
	//当前任务正常结束,且有正常的返回值
    private static final int NORMAL       = 2;
	//当前任务异常结束,且会向上抛异常
    private static final int EXCEPTIONAL  = 3;
	//当前任务被取消
    private static final int CANCELLED    = 4;
	//当前任务中断中...
    private static final int INTERRUPTING = 5;
	//当前任务已中断, 中断就是个标记,并不代表已经结束了。
    private static final int INTERRUPTED  = 6;

	//构造方法中传进来的 runnable/callable, 
	//callable直接赋值
	//runnable 经过 RunnableAdapter包装成Callable后赋值
    private Callable<V> callable;
	
	//存放任务的执行结果,有两种结果
	//正常情况下,任务正常执行结束, outcome保存 callable的call方法的返回值
	//异常情况下,callable的 call方法向上抛出异常,outcome保存异常
    private Object outcome;
	
	//当前任务被线程执行期间,保存当前执行任务的线程的对象引用,有且仅有一个线程。
    private volatile Thread runner;
	
	//因为会有很多线程来get当前任务的结果, 所以这里使用了链表,来存放这些线程。
    private volatile WaitNode waiters;

    static final class WaitNode {
        volatile Thread thread;
        volatile WaitNode next;
        WaitNode() { thread = Thread.currentThread(); }
    }

五、成员方法

看下面的代码时,需要规定一个场景

场景 : 多线程并发

1.执行入口run()方法

为什么说run()是执行入口?

当我们启动线程时,最基本的就是new Thread().start()
start()内部实际上又调用了 native start0() 方法
通过操作系统为我们创建一个线程,并调用run() 方法。
最终会执行 target.run() 也就会调用Runnable实现的run()方法 (详情查看Thread源码)

由于FutureTaskRunnable的实现,因此该 run()方法是执行入口。

    public void run() {
        
        /**
        	判断当前任务状态是否为未执行
        	state != NEW  
            
            线程池中的所有线程会争抢该task任务,
            通过CAS控制并发,赋值runner属性,也就意味着只有一个线程能够抢到该任务去执行
        	!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread())
        **/
        //该判断的目的主要是为了保证只能有一个线程来执行该任务。
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        
        //执行到这里,当前task一定是NEW状态,且只有一个当前线程抢占成功,runner属性是当前线程的引用。
        try {
            //callable就是程序员自己实现的callable/通过装饰后的runnable(RunnableAdapter)
            Callable<V> c = callable;
            
            //相当于是一个DCL
            //条件一:c != null  防止空指针
            //条件二:state==NEW 防止被其它线程Cancel了,所以再次确认下任务状态  
            if (c != null && state == NEW) {
                
                //执行call的返回结果的引用
                V result;
                
                //true  表示callable.call 执行成功 未抛出异常
                //false 表示callable.call 执行失败 抛出异常
                boolean ran;
                try {
                    //执行程序员实现的业务逻辑
                    result = c.call();
                    //若没抛异常,则设置为true
                    ran = true;
                } catch (Throwable ex) {
      				//若抛异常,则设置为false,返回值也置为null
                    result = null;
                    ran = false;
                    //TODO 后面会说... (透露:会将异常信息设置到outcome中)
                    setException(ex);
                }
                //如果为true 代表执行成功,
                if (ran)
                    //TODO 后面会说...  (透露:会将成功返回值结果设置到outcome中)
                    set(result);
            }
        } finally {
			//将执行当前任务的线程引用置为null, 表示该任务没有线程对其进行执行了
            runner = null;
            
            // 判断当前任务状态是不是中断状态
            int s = state;
            if (s >= INTERRUPTING)
                //如果是中断状态,那么该任务会一直死循环在该处,直到由外部对其进行cancel 或者 unpark
                handlePossibleCancellationInterrupt(s);
        }
    }

下面是 run()中调用的方法

    protected void set(V v) {
        
        //使用CAS方式,设置当前任务状态完成中...
        //有没有可能失败呢? 外部线程等不及了,直接在set执行CAS前,将task取消了
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            //赋值正常返回值
            outcome = v;
            //结果赋值结束后,马上会将当前任务状态修改为 NORMAL状态 表示正结束
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            //TODO 该方法后面再说...
            finishCompletion();
        }
    }
    protected void setException(Throwable t) {
        //使用CAS方式,设置当前任务状态完成中...
        //有没有可能失败呢? 外部线程等不及了,直接在setException执行CAS前,将task取消了
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
           //赋值异常信息
            outcome = t;
            //将当前任务状态修改为 EXCEPTIONAL状态 表示异常状态
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
            //TODO 该方法后面再说...
            finishCompletion();
        }
    }
    private void handlePossibleCancellationInterrupt(int s) {
		
        //若任务状态是中断中... 则一直在此死循环,且一直释放CPU,直到该状态不是中断中...
        if (s == INTERRUPTING)
            while (state == INTERRUPTING)
                Thread.yield(); // wait out pending interrupt

        // assert state == INTERRUPTED;

        // We want to clear any interrupt we may have received from
        // cancel(true).  However, it is permissible to use interrupts
        // as an independent mechanism for a task to communicate with
        // its caller, and there is no way to clear only the
        // cancellation interrupt.
        //
        // Thread.interrupted();
    }

2.获取任务结果get()方法

get()方法有两种,一种是不带超时的,一种是带超时的。这里我们主要看不带超时的。

    public V get() throws InterruptedException, ExecutionException {
        //获得当前任务状态
        int s = state;
        
        //条件成立:当前任务状态为 NEW,COMPLETING 表示当前任务还未执行或正在完成中..
        if (s <= COMPLETING)
            
            //所有调用get()方法的线程 会被阻塞在此
            //直到会得到结果(任务的状态返回值) 或者 向上抛出中断异常
            s = awaitDone(false, 0L);
        
        //返回 结果或者抛异常
        return report(s);
    }
    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        
        //timed = false
        //deadline = 0L
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        
        //引用当前线程 封装成WaitNode对象
        WaitNode q = null;
        
        //表示当前get()的线程有没有入队
        boolean queued = false;
        
        //自旋
        for (;;) {
            //条件成立: 当前执行get的线程被interrupt 叫醒
            if (Thread.interrupted()) {
                //将引用当前线程的WaitNode对象 移除队,且在移除的过程中同样会移除其它q=null的节点对象
                removeWaiter(q);
                //向上抛出中断的异常
                throw new InterruptedException();
            }
			
            //获取当前任务的状态
            int s = state;
            
            //条件成立:说明当前任务已经有结果了,可能是好结果,也可以能使抛出异常的结果
            if (s > COMPLETING) {
                
                //条件成立:说明当前线程已经创建过WaitNode节点,
                if (q != null)
                    
                    //此时由于得到了结果,需要将q.thread = null  
                    //helpGC 或者 当有线程执行removeWaiter方法时 会主动将该节点移除 
                    q.thread = null      
                    
                    
                //直接返回当前任务的状态
                return s;
            }
            
        //条件成立:说明当前任务正在完成中,还差赋值和设置成NORMAL或者EXCEPTIONAL两步
            else if (s == COMPLETING) // cannot time out yet
       //由于任务快要完成了,因此主动释放CPU,进行下一次CPU的抢占,目的是尽量让执行任务的线程去抢到CPU
                Thread.yield();
            
            //条件成立: 第一次自旋,当前任务是NEW状态,当前线程还未创建WaitNode对象
            else if (q == null)
                
                //为当前线程创建WaitNode对象
                q = new WaitNode();
            
      //条件成立:第二次自旋,当前任务是NEW状态,当前线程已经创建WaitNode对象,但是WaitNode对象还为入队
            else if (!queued)
                
                //头插法入队
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
     //条件成立:第三次自旋 当前任务是NEW状态,由于规定timed=false,则不会进入这个判断,会进入下面的else        //也就意味着 如果是带超时的会走当前判断, 不带超时的会走下一个else判断
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                LockSupport.parkNanos(this, nanos);
            }
            
            //不带超时的
            //当前get操作的线程就会被park,线程会进入waiting状态 休眠
            //除非有其它线程将其unpark(thread) 唤醒 或者 将当前线程中断 才会进入下一次自旋
            else
                LockSupport.park(this);
        }
    }

返回给用户结果的方法,该结果可能是好结果,也可能是抛异常的结果

    private V report(int s) throws ExecutionException 
        //outcome 保存的是Callable运行结束的结果,可能是正常的好结果,也可以能是异常的信息
        Object x = outcome;

		//条件成立:当前任务状态正常结束
        if (s == NORMAL)
            //直接返回好结果
            return (V)x;

		//条件成立:任务被取消 或 中断
        if (s >= CANCELLED)
            //抛出异常
            throw new CancellationException();
		//执行到这 说明任务状态为EXCEPTIONAL 程序员的Callable实现的代码有bug了
        throw new ExecutionException((Throwable)x);
    }

3.finishCompletion()方法

    private void finishCompletion() {
        // assert state > COMPLETING;
        
        //循环Waiters链表
        for (WaitNode q; (q = waiters) != null;) {
            
            //使用CAS设置Waiters为null,是因为防止外部线程使用cancel()方法取消当前任务
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                for (;;) {
                    
                   	//获取当前Node节点封装的thread
                    Thread t = q.thread;
                    
                    //条件成立:说明当前下次呢很难过不会为null
                    if (t != null) {
                        
                        //help GC
                        q.thread = null;
                       	//唤醒get()阻塞的的线程
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    //如果到链表末尾了就退出循环
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }
		//可扩展的点
        done();
		
        //将callable 设置为null  help GC
        callable = null;        // to reduce footprint
    }

4.取消任务 cancel()方法

    public boolean cancel(boolean mayInterruptIfRunning) {
        
		//也就意味着 当前任务状态如果是NEW 不允许被cancel 将状态改为INTERRUPTING 或者CANCELLED
        if (!(state == NEW &&
              UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            return false;
        
        //执行到这就说明 当前任务是从非NEW状态 被改为了INTERRUPTING 或CANCELLED 的
        //mayInterruptIfRunning 
        //        true: INTERRUPTING  false:CANCELLED
        try {   
            //如果是true 也就是状态改成了INTERRUPTING
            if (mayInterruptIfRunning) {
                try {
                    //将执行当前任务的线程取出来
                    Thread t = runner;
                   	//判断是否有线程执行该任务
                    if (t != null)
                        //有的话  则中断该线程
                        t.interrupt();
                } finally { // final state
                    //最终将该任务的状态设置为 INTERRUPTED
                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                }
            }
        } finally {
            //然后唤醒所有执行get()阻塞的线程
            finishCompletion();
        }
        return true;
    }
评论(0
© 2014 mamicode.com 版权所有 京ICP备13008772号-2  联系我们:gaon5@hotmail.com
迷上了代码!