多线程获取结果还在使用Future轮询吗?CompletionService快来了解下吧。

时间:2020-07-10 09:19:49   收藏:0   阅读:62
背景

二胖上次写完参数校验(《二胖写参数校验的坎坷之路》)之后,领导一直不给他安排其他开发任务,就一直让他看看代码熟悉业务。二胖每天上班除了偶尔跟坐在隔壁的前端小姐姐聊聊天,就是看看这些
枯燥无味的业务代码,无聊的一匹。虽然二胖已是久经职场的老油条了,但是看到同事们的周报都写的满满的,而自己的周报,就一两行,熟悉了什么功能。心里还是慌得一匹,毕竟公司不养闲人啊。于是乎二胖终于鼓起勇气为了向领导表明自己的上进心,主动向领导要开发任务。领导一看这小伙子这么有上进心,于是就到任务看板里面挑了一个业务逻辑比较简单的任务分配给了二胖。二胖拿到这个任务屁颠屁颠的回到座位。任务比较简单,就是通过爬虫去爬取某些卖机票(某猪、某携、某团等)的网站的一些机票,然后保存到数据库。

同步入库

二胖拿到任务,三下五除二就把任务完成了。

 public static void main(String[] args) throws InterruptedException {
        String mouZhuFlightPrice = getMouZhuFlightPrice();
        String mouXieFlightPrice = getMouXieFlightPrice();
        String mouTuanFlightPrice = getMouTuanFlightPrice();
        saveDb(mouZhuFlightPrice);
        saveDb(mouXieFlightPrice);
        saveDb(mouTuanFlightPrice);
    }

    /**
     * 模拟请求某猪网站 爬取机票信息
     *
     *
     * @return
     * @throws InterruptedException
     */
    public static String getMouZhuFlightPrice() throws InterruptedException {
        // 模拟请求某猪网站 爬取机票信息
        Thread.sleep(10000);
        return "获取到某猪网站的机票信息了";
    }

    /**
     * 模拟请求某携网站 爬取机票信息
     *
     * @return
     * @throws InterruptedException
     */
    public static String getMouXieFlightPrice() throws InterruptedException {
        // 模拟请求某携网站 爬取机票信息
        Thread.sleep(5000);
        return "获取到某携网站的机票信息了";
    }

    /**
     * 模拟请求团网站 爬取机票信息
     *
     * @return
     * @throws InterruptedException
     */
    public static String getMouTuanFlightPrice() throws InterruptedException {
        // 模拟请求某团网站 爬取机票信息
        Thread.sleep(3000);
        return "获取到某团网站的机票信息了";
    }

    /**
     * 保存DB
     *
     * @param flightPriceList
     */
    public static void saveDb(String flightPriceList) {
            // 解析字符串 进行异步入库
    }

这次二胖学乖了,任务完成了先去找下坐他对面的技术大拿(看他那发际线就知道了)同事“二狗”让二狗大拿帮忙指点一二,看看代码是否还能有优化的地方。毕竟领导对代码的性能、以及代码的优雅是有要求的。领导多次在部门的周会上提到让我们多看看“二狗”写的代码,学习下人家写代码的优雅、抽象、封装等等。二狗大概的瞄了下二胖写的代码,提出了个小小的建议“这个代码可以采用多线程来优化下哦,你看某猪这个网站耗时是拿到结果需要10s,其他的耗时都比它短,先有结果的我们可以先处理的,不需要等到大家都返回了再来处理的”。

轮循futureList获取结果

幸好二胖对多线程了解一点点,于是乎采用future的方式来实现。二胖使用一个List来保存每个任务返回的Future,然后去轮询这些Future,直到每个Future都已完成。由于需要先完成的任务需要先执行,且不希望出现因为排在前面的任务阻塞导致后面先完成的任务的结果没有及时获取的情况,所以在调用get方式时,需要将超时时间设置为0

  public static void main(String[] args) {
        int taskSize = 3;
        Future<String> mouZhuFlightPriceFuture = executor.submit(() -> getMouZhuFlightPrice());
        Future<String> mouXieFlightPriceFuture = executor.submit(() -> getMouXieFlightPrice());
        Future<String> mouTuanFlightPriceFuture = executor.submit(() -> getMouTuanFlightPrice());
        List<Future<String>> futureList = new ArrayList<>();
        futureList.add(mouZhuFlightPriceFuture);
        futureList.add(mouXieFlightPriceFuture);
        futureList.add(mouTuanFlightPriceFuture);
        // 轮询,获取完成任务的返回结果
        while (taskSize > 0) {
            for (Future<String> future : futureList) {
                String result = null;
                try {
                    result = future.get(0, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                    taskSize--;
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    taskSize--;
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    // 超时异常需要忽略,因为我们设置了等待时间为0,只要任务没有完成,就会报该异常
                }
                // 任务已经完成
                if (result != null) {
                    System.out.println("result=" + result);
                    // 从future列表中删除已经完成的任务
                    futureList.remove(future);
                    taskSize--;
                    // 此处必须break,否则会抛出并发修改异常。(也可以通过将futureList声明为CopyOnWriteArrayList类型解决)
                    break; // 进行下一次while循环
                }
            }
        }
    }

上述代码有两个小细节需要注意下:

  final static ExecutorService executor = new ThreadPoolExecutor(6, 6,
            0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        Future<String> mouZhuFlightPriceFuture = executor.submit(() -> getMouZhuFlightPrice());
        Future<String> mouXieFlightPriceFuture = executor.submit(() -> getMouXieFlightPrice());
        Future<String> mouTuanFlightPriceFuture = executor.submit(() -> getMouTuanFlightPrice());

        // 创建阻塞队列
        BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<>(3);
        executor.execute(() -> run(mouZhuFlightPriceFuture, blockingQueue));
        executor.execute(() -> run(mouXieFlightPriceFuture, blockingQueue));
        executor.execute(() -> run(mouTuanFlightPriceFuture, blockingQueue));
        // 异步保存所有机票价格
        for (int i = 0; i < 3; i++) {
            String result = blockingQueue.take();
            System.out.println(result);
            saveDb(result);
        }
    }

    private static void run(Future<String> flightPriceFuture, BlockingQueue<String> blockingQueue) {
        try {
            blockingQueue.put(flightPriceFuture.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }

如果向Executor提交了一组计算任务,并且希望在计算完成后获得结果,那么可以保留与每个任务关联的Future,然后反复使用get方法,同时将参数timeout指定为0,从而通过轮询来判断任务是否完成。这种方法虽然可行,但却有些繁琐。幸运的是,还有一种更好的方法:完成服务CompletionService。

  final static ExecutorService executor = new ThreadPoolExecutor(6, 6,
            0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletionService completionService = new ExecutorCompletionService(executor);
        completionService.submit(() -> getMouZhuFlightPrice());
        completionService.submit(() -> getMouXieFlightPrice());
        completionService.submit(() -> getMouTuanFlightPrice());
        for (int i = 0; i < 3; i++) {
            String result = (String)completionService.take().get();
            System.out.println(result);
            saveDb(result);
        }
    }

当我们使用了CompletionService不用遍历future列表,也不需要去自定义队列了,代码变得简洁了。下面我们就来分析下CompletionService实现的原理吧。

CompletionService 介绍

结束

评论(0
© 2014 mamicode.com 版权所有 京ICP备13008772号-2  联系我们:gaon5@hotmail.com
迷上了代码!