并发工具类

#简介

在并发包java.util.concurrent中提供了一些并发工具类。CountDownLatch、CyclicBarrier和Semaphore工具类提供了一种并发流程控制的手段,而Exchanger工具类则提供了在线程间交换数据的一种手段。

#CountDownLatch(线程计数器)

CountDownLatch允许一个或多个线程等待其他线程完成操作。

案例场景:

解析一个Excel中的多个sheet数据。考虑使用多线程,每个线程解析一个sheet数据,等到所有sheet解析完成,程序需要提示解析完成。

方法1:采用join()方法。join用于让当前执行线程等待join线程执行结束,直到join线程终止后,线程的this.notifyAll()方法会被调用。

public class JoinDemo {
    public static void main(String[] args) throws InterruptedException {
        Thread parser1 = new Thread(() -> {});
        Thread parser2 = new Thread(() -> System.out.println("parser2 finish"));
        parser1.start();
        parser2.start();
        parser1.join();
        parser2.join();
        System.out.println("all parser finish");
    }
}

方法2:采用CountDownLatch

import java.util.concurrent.CountDownLatch;

public class CountDownLatchDemo {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch c = new CountDownLatch(2);
        for (int i = 1; i <= 2; i++) {
            new Thread(() -> {
                System.out.println("parser" + Thread.currentThread().getName() + "正在处理...");
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("parser" + Thread.currentThread().getName() + "处理完毕...");
                c.countDown();
            }, String.valueOf(i)).start();
        }
        c.await();
        System.out.println(Thread.currentThread().getName() + ": all parser finish");
    }
}

结果:

concurrent-utils-countdownlatch.png

CountDownLatch的构造函数传入一个int类型参数作为计数器。当调用countDown()方法时,计数器减一,await()方法会阻塞当前线程直到计数器的值减为0。另外,为了避免某一个线程执行时间过长,可以用await(long time,TimeUnit unit)方法指定等待时间,超过等待时间后就会不再阻塞当前线程。

#CyclicBarrier(循环栅栏)

CyclicBarrier字面意思是可循环使用的屏障。作用是:让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会打开,所有被屏障拦截的线程才会继续允许。

CyclicBarrier默认的构造方法是CyclicBarrier(int parties),参数表示屏障拦截的线程数量,每个线程调用await()方法告诉CyclicBarrier已经到达屏障,然后当前线程被阻塞。下例中当主线程和子线程都到达CyclicBarrier,屏障打开。

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierDemo01 {
    public static void main(String[] args) {
        int N = 2;
        CyclicBarrier c = new CyclicBarrier(N);
        new Thread(() -> {
            try {
                c.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
            System.out.println(1);
        }).start();
        try {
            c.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
        System.out.println(2);
    }
}

同时CyclicBarrier还提供更高级的构造方法CyclicBarrier(int parties, Runnalbe barrierAction),用于再线程到达屏障时,优先执行barrierAction,方便更复杂的业务场景。

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierDemo02 {
    public static void main(String[] args) {
        int N = 2;
        CyclicBarrier c = new CyclicBarrier(N, new MyThread());
        new Thread(() -> {
            try {
                c.await();
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
            System.out.println(1);
        }).start();

        try {
            c.await();
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }
        System.out.println(2);
    }

    static class MyThread implements Runnable {
        @Override
        public void run() {
            System.out.println(3);
        }
    }
}

当所有线程到达CyclicBarrier后,屏障打开,优先执行MyThread,然后再执行其它线程。

CyclicBarrier和CountDownLatch的区别:

CountDownLatch的计数器只能用1次,而CyclicBarrier的计数器可以使用reset()方法重置处理更复杂的业务场景。

#Semaphore(信号量)

Semaphore可以指定多个线程同时访问某个资源,synchronized和ReentrantLock都是一次允许一个线程访问某个资源。

应用场景:用于流量控制。

案例::若一个工厂有5台机器,但是有8个工人,一台机器同时只能被一个工人使用,只有使用完了,其他工人才能继续使用。

import java.util.concurrent.Semaphore;

public class SemaphoreDemo {
    private static final int WORKER_COUNT = 8;
    private static Semaphore s = new Semaphore(5);

    public static void main(String[] args) {
        for (int i = 1; i <= WORKER_COUNT; i++) {
            new Worker(i, s).start();
        }

    }

    static class Worker extends Thread {
        private int num;
        private Semaphore semaphore;

        public Worker(int num, Semaphore semaphore) {
            this.num = num;
            this.semaphore = semaphore;
        }

        @Override
        public void run() {
            try {
                semaphore.acquire();
                System.out.println("Worker" + this.num + " is working on a machine.");
                Thread.sleep(2000);
                System.out.println("Worker" + this.num + " has finished the work and released the machine.");
                semaphore.release();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

运行结果:

concurrent-utils-semaphore.png

使用Semaphore的过程中,首先线程使用Semaphore的acquire()方法获取一个许可证,使用完之后调用release()方法归还许可证。

#Exchanger(线程间交换数据)

Exchanger是一个用于线程间协作的工具类,用于进行线程间的数据交换。它提供一个同步点,在这个同步点,两个线程可以交换彼此的数据。

采用exchange()方法来交换数据:若第一个线程先执行exchange()方法,它会一直等待第二个线程也执行exchange()方法,当两个线程都到达同步点时,这两个线程就可以交换数据,将本线程生产出的数据传递给对方。

案例:队伍的选手交易

import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ExchangerDemo {
    private static final Exchanger<String> EXGR = new Exchanger<>();

    public static void main(String[] args) {
        ExecutorService es = Executors.newCachedThreadPool();
        es.execute(new PlayerExchange("T1", "P1", EXGR));
        es.execute(new PlayerExchange("T2", "P2", EXGR));
        es.execute(new PlayerExchange("T3", "P3", EXGR));
        es.execute(new PlayerExchange("T4", "P4", EXGR));
        es.execute(new PlayerExchange("T5", "P5", EXGR));
        es.execute(new PlayerExchange("T6", "P6", EXGR));
        es.shutdown();
    }

    static class PlayerExchange extends Thread {
        private String teamName;
        private String playernName;
        private Exchanger<String> exchanger;

        public PlayerExchange(String teamName, String playerName, Exchanger<String> exchanger) {
            this.teamName = teamName;
            this.playernName = playerName;
            this.exchanger = exchanger;
        }

        @Override
        public void run() {
            Trade(teamName, playernName, exchanger);
        }

        public static void Trade (String teamName, String playerName, Exchanger<String> exchanger) {
            try {
                System.out.println(teamName + "在交易截止前把" + playerName + "交易出去");
                Thread.sleep((long) Math.random() * 1000);
                System.out.println(teamName + "交易得到" + exchanger.exchange(playerName));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

concurrent-utils-exchanger.png

如果两个线程有一个没有执行exchange()方法,则会一直等待,如果担心有特殊情况发生,避免一直等待,可以使用exchange(V x, long timeout, TimeUnit unit)设置最大等待时长。