CountDownLatch、CyclicBarrier、Semaphore、Exchanger 的详细解析

时间:2021-03-04 13:23:34   收藏:0   阅读:0

转:

CountDownLatch、CyclicBarrier、Semaphore、Exchanger 的详细解析

技术图片
本文主要介绍和对比我们常用的几种并发工具类,主要涉及 CountDownLatchCyclicBarrierSemaphoreExchanger 相关的内容,如果对多线程相关内容不熟悉,可以看笔者之前的一些文章:

  • 《Java并发编程-线程基础》
  • 《总算把线程六种状态的转换说清楚了!》
  • [《[高频面试]解释线程池的各个参数含义》](https://mp.weixin.qq.com/s/mX...
  • 《知道线程池的四种拒绝策略吗?》
  • 《java中常见的六种线程池详解》
  • 《基于synchronized的锁的深度解析》??推荐
  • 《JAVA中常见的阻塞队列详解》
  • 《优雅关闭线程池的方案》

  • 介绍 CountDownLatchCyclicBarrier 两者的使用与区别,他们都是等待多线程完成,是一种并发流程的控制手段,
  • 介绍 SemaphoreExchanger 的使用,semaphore 是信号量,可以用来控制允许的线程数,而 Exchanger 可以用来交换两个线程间的数据。

CountDownLatch

    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

多个线程等待一个线程


    /**
     * @url i-code.onlien
     * 云栖简码
     */
    public static void main(String[] args) throws InterruptedException {
        //模拟跑步比赛,裁判说开始,所有选手开始跑,我们可以使用countDownlatch来实现

        //这里需要等待裁判说开始,所以时等着一个线程
        CountDownLatch countDownLatch = new CountDownLatch(1);

        new Thread(() ->{
            try {
                System.out.println(Thread.currentThread().getName() +"已准备");
                countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName()+"开始跑~~");

        },"选手1").start();
        new Thread(() ->{
            try {
                System.out.println(Thread.currentThread().getName() +"已准备");
                countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName()+"开始跑~~");

        },"选手2").start();

        TimeUnit.SECONDS.sleep(1);
        System.out.println("裁判:预备~~~");
        countDownLatch.countDown();
        System.out.println("裁判:跑~~~");
    }

技术图片

在上述代码中,我们首先创建了一个计数为1 的 CountDownLatch 对象,这代表我们需要等待的线程数,之后再创建了两个线程,用来代表选手线程,同时在选手的线程中我们都调用了 await 方法,让线程进入阻塞状态,直到CountDownLatch的计数为零后再执行后面的内容,在主线程 main 方法中我们等待 1秒后执行 countDown 方法,这个方法就是减一,此时的 N 则为零了,那么选手线程则开始执行后面的内容,整体的输出如上图所示

一个/多个线程等待多个线程


    public static void main(String[] args) throws InterruptedException {
        /**
         * i-code.online
         * 云栖简码 
         */
        //等待的个数
        CountDownLatch countDownLatch = new CountDownLatch(5);

        for (int i = 0; i < 5; i++) {
            new Thread(()->{
                System.out.println(Thread.currentThread().getName() + "从住所出发...");
                try {
                    TimeUnit.SECONDS.sleep((long) (Math.random()*10));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + " 到达目的地-----");
                countDownLatch.countDown();
            },"人员-"+i).start();
        }

        System.out.println("大巴正在等待人员中.....");
        countDownLatch.await();
        System.out.println("-----所有人到齐,出发-----");
    }

技术图片

从上述代码中我们可以看到,定义了一个计数为5的 countDownLatch ,之后通过循环创建五个线程,模拟五个人员,当他们到达指定地点后执行 countDown 方法,对计数减一。主线程相当于是大巴车的线程,执行 await 方法进行阻塞,只有当 N 的值减到0后则执行后面的输出

CountDownLatch 主要方法介绍

public CountDownLatch(int count) {  };
它的构造函数是传入一个参数,该参数 count 是需要倒数的数值。
上面的案例介绍了 CountDownLatch 的使用,但是 CountDownLatch 有个特点,那就是不能够重用,比如已经完成了倒数,那可不可以在下一次继续去重新倒数呢?是可以的,一旦倒数到0 则结束了,无法再次设置循环执行,但是我们实际需求中有很多场景中需要循环来处理,这时候我们可以使用 CyclicBarrier 来实现

CyclicBarrier

技术图片

    public CyclicBarrier(int parties) {
        this(parties, null);
    }

    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

案例

    /*
    CyclicBarrier 与countDownLatch 比较相似,也是等待线程完成,
    不过countDownLatch 是await等待其他的线程通过countDown的数量,达到一定数则执行,
    而 CyclicBarrier 则是直接看await的数量,达到一定数量直接全部执行,
     */
    public static void main(String[] args) {
        //好比情侣约会,不管谁先到都的等另一个,这里就是两个线程,
        CyclicBarrier cyclicBarrier = new CyclicBarrier(2);

        new Thread(() ->{
            System.out.println("快速收拾,出门~~~");
            try {
                TimeUnit.MILLISECONDS.sleep(500);
                System.out.println("到了约会地点等待女朋友前来~~");
                cyclicBarrier.await();
                System.out.println("女朋友到来嗨皮出发~~约会");
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }

        },"男朋友").start();
        new Thread(() ->{
            System.out.println("慢慢收拾,出门~~~");
            try {
                TimeUnit.MILLISECONDS.sleep(5000);
                System.out.println("到了约会地点等待男朋友前来~~");
                cyclicBarrier.await();
                System.out.println("男朋友到来嗨皮出发~~约会");
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
        },"女朋友").start();

    }

技术图片

上面代码,相对简单,创建一个拦截数为2的屏障,之后创建两个线程,调用await方法,只有当调用两次才会触发后面的流程。

    /*
    CyclicBarrier是可重复使用到,也就是每当几个满足是不再等待执行,
    比如公司组织出游,安排了好多辆大把,每坐满一辆就发车,不再等待,类似这种场景,实现如下:
     */

    public static void main(String[] args) {
        //公司人数
        int peopleNum = 2000;
        //每二十五个人一辆车,凑够二十五则发车~
        CyclicBarrier cyclicBarrier = new CyclicBarrier(25,() ->{
            //达到25人出发
            System.out.println("------------25人数凑齐出发------------");
        });

        for (int j = 1; j <= peopleNum; j++) {
            new Thread(new PeopleTask("People-"+j,cyclicBarrier)).start();
        }

    }

    static class PeopleTask implements Runnable{

        private String name;
        private  CyclicBarrier cyclicBarrier;
        public PeopleTask(String name,CyclicBarrier cyclicBarrier){
            this.name = name;
            this.cyclicBarrier = cyclicBarrier;
        }

        @Override
        public void run() {
            System.out.println(name+"从家里出发,正在前往聚合地....");
            try {
                TimeUnit.MILLISECONDS.sleep(((int) Math.random()*1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(name+"到达集合地点,等待其他人..");
            try {
                cyclicBarrier.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }

        }
    }

CyclicBarrier 和 CountDownLatch 的异同

相同点:

不同点:

Semaphore

技术图片

案例

    /*
        Semaphore 是信号量, 可以用来控制线程的并发数,可以协调各个线程,以达到合理的使用公共资源
     */

    public static void main(String[] args) {
        //创建10个容量的线程池
        final ExecutorService service = Executors.newFixedThreadPool(100);
        //设置信号量的值5 ,也就是允许五个线程来执行
        Semaphore s = new Semaphore(5);
        for (int i = 0; i < 100; i++) {
            service.submit(() ->{
                try {
                    s.acquire();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                try {
                    System.out.println("数据库耗时操作"+Thread.currentThread().getName());
                    TimeUnit.MILLISECONDS.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + "正在执行....");
                s.release();
            });
        }

    }
如上代码,创建了一个容量100的线程池,模拟我们程序中大量的线程,添加一百个任务,让线程池执行。创建了一个容量为5的信号量,在线程中我们调用 acquire 来获得信号量的许可,只有获得了才能只能下面的内容不然阻塞。当执行完后释放该许可,通过 release 方法,
    private static int count = 0;
    /*
        Semaphore 中如果我们允许的的许可证数量为1 ,那么它的效果与锁相似。
     */
    public static void main(String[] args) throws InterruptedException {
        final ExecutorService service = Executors.newFixedThreadPool(10);

        Semaphore semaphore = new Semaphore(1);
        for (int i = 0; i < 10000; i++) {
            service.submit(() ->{
                try {
                    semaphore.acquire();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + "执行了");
                count ++;
                semaphore.release();
            });
        }
        service.shutdown();
        TimeUnit.SECONDS.sleep(5);
        System.out.println(count);

    }

其他主要方法介绍

Exchanger

public class ExchangerTest {

    /*
    Exchanger 交换, 用于线程间协作的工具类,可以交换线程间的数据,
    其提供一个同步点,当线程到达这个同步点后进行数据间的交互,遗传算法可以如此来实现,
    以及校对工作也可以如此来实现
     */

    public static void main(String[] args) {
        /*
        模拟 两个工作人员录入记录,为了防止错误,两者录的相同内容,程序仅从校对,看是否有错误不一致的
         */

        //开辟两个容量的线程池
        final ExecutorService service = Executors.newFixedThreadPool(2);

        Exchanger exchanger = new Exchanger<>();

        service.submit(() ->{
            //模拟数据 线程 A的
            InfoMsg infoMsg = new InfoMsg();
            infoMsg.content="这是线程A";
            infoMsg.id ="10001";
            infoMsg.desc = "1";
            infoMsg.message = "message";
            System.out.println("正在执行其他...");
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            try {
                final InfoMsg exchange = exchanger.exchange(infoMsg);
                System.out.println("线程A 交换数据====== 得到"+ exchange);
                if (!exchange.equals(infoMsg)){
                    System.out.println("数据不一致~~请稽核");
                    return;
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        service.submit(() ->{
            //模拟数据 线程 B的
            InfoMsg infoMsg = new InfoMsg();
            infoMsg.content="这是线程B";
            infoMsg.id ="10001";
            infoMsg.desc = "1";
            infoMsg.message = "message";
            System.out.println("正在执行其他...");
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            try {
                final InfoMsg exchange = exchanger.exchange(infoMsg);
                System.out.println("线程B 交换数据====== 得到"+ exchange);
                if (!exchange.equals(infoMsg)){
                    System.out.println("数据不一致~~请稽核");
                    return;
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        service.shutdown();
    }

    static class InfoMsg{
        String id;
        String name;
        String message;
        String content;
        String desc;

        @Override
        public String toString() {
            return "InfoMsg{" +
                    "id=‘" + id + ‘‘‘ +
                    ", name=‘" + name + ‘‘‘ +
                    ", message=‘" + message + ‘‘‘ +
                    ", content=‘" + content + ‘‘‘ +
                    ", desc=‘" + desc + ‘‘‘ +
                    ‘}‘;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
            InfoMsg infoMsg = (InfoMsg) o;
            return Objects.equals(id, infoMsg.id) &&
                    Objects.equals(name, infoMsg.name) &&
                    Objects.equals(message, infoMsg.message) &&
                    Objects.equals(content, infoMsg.content) &&
                    Objects.equals(desc, infoMsg.desc);
        }

        @Override
        public int hashCode() {
            return Objects.hash(id, name, message, content, desc);
        }
    }
}

技术图片

上面代码运行可以看到,当我们线程 A/B 到达同步点即调用 exchange 后进行数据的交换,拿到对方的数据再与自己的数据对比可以做到稽核 的效果


本文由AnonyStar 发布,可转载但需声明原文出处。
欢迎关注微信公账号 :云栖简码 获取更多优质文章
更多文章关注笔者博客 :云栖简码 i-code.online

转:

CountDownLatch、CyclicBarrier、Semaphore、Exchanger 的详细解析

评论(0
© 2014 mamicode.com 版权所有 京ICP备13008772号-2  联系我们:gaon5@hotmail.com
迷上了代码!