NingG +

Java并发:concurrent 包

21.7 新类库中的构件

Java SE5的 java.util.concurrent 包中引入了大量设计用来解决并发问题的新类。学习这些“工具类”就可以专注于自己想要实现的功能而不是线程的同步、死锁等一堆令人头疼的细节。这一小节内容非常多,建议的学习方法是:

嗯,上面总结了一下学习这个小节的步骤(其实是因为太多了。。。。。我不想全看 T_T),那么我们就把目录摘出来看看吧。

一、前言

下面是21.7小节的目录。嗯,发现一共是7个构件,现在从文档出发,逐个浏览一下:

下面我们先简单的“望文生义”一下,然后再逐个击破:)

二、代码来了

下面给每个构件都写个小例子,然后总结一下它们产生的原因和最佳使用场景。go go go!!

1. CountDownLatch

CountDownLatch 被用于:

  1. 主线程等待多个子线程执行结束后
  2. 主线程再执行

因此,具体使用过程中:

  1. 主线程:定义 CountDownLatch 需要等待的子线程个数
  2. 子线程:调整 CountDownLatch 的剩余线程数
  3. 主线程:countDownLatch.await() 阻塞等待子线程执行结束

从上面过程可以看出:主线程定义 CountDownLatch,子线程调整 CountDownLatch,主线程在 CountDownLatch 上保持等待状态。

文档也太详细了吧:

A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.

A CountDownLatch is initialized with a given count. The await methods block until the current count reaches zero due to invocations of the countDown() method, after which all waiting threads are released and any subsequent invocations of await return immediately. This is a one-shot phenomenon – the count cannot be reset. If you need a version that resets the count, consider using a CyclicBarrier【和 CyclicBarrier 的区别】.

A CountDownLatch is a versatile(多功能的) synchronization tool and can be used for a number of purposes. A CountDownLatch initialized with a count of one serves as a simple on/off latch, or gate: all threads invoking await wait at the gate until it is opened by a thread invoking countDown(). A CountDownLatch initialized to N can be used to make one thread wait until N threads have completed some action, or some action has been completed N times.【这里是使用场景:count=1为开关;count=N 重复 N 次】

A useful property of a CountDownLatch is that it doesn’t require that threads calling countDown wait for the count to reach zero before proceeding, it simply prevents any thread from proceeding past an await until all threads could pass.

同时文档提供了演示代码:

 Here is a pair of classes in which a group of worker threads use two countdown latches:
 
 1.The first is a start signal that prevents any worker from proceeding until the driver is ready for them to proceed;
 2.The second is a completion signal that allows the driver to wait until all workers have completed.
  
 class Driver { // ...
	void main() throws InterruptedException {
	  CountDownLatch startSignal = new CountDownLatch(1);
	  CountDownLatch doneSignal = new CountDownLatch(N);
 
	  for (int i = 0; i < N; ++i) // create and start threads
		new Thread(new Worker(startSignal, doneSignal)).start();
 
	  doSomethingElse();            // don't let run yet
	  startSignal.countDown();      // let all threads proceed
	  doSomethingElse();
	  doneSignal.await();           // wait for all to finish
	}
  }
 
  class Worker implements Runnable {
	private final CountDownLatch startSignal;
	private final CountDownLatch doneSignal;
	Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
	   this.startSignal = startSignal;
	   this.doneSignal = doneSignal;
	}
	public void run() {
	   try {
		 startSignal.await();
		 doWork();
		 doneSignal.countDown();
	   } catch (InterruptedException ex) {} // return;
	}
 
	void doWork() { ... }
  }

文档已经够清晰了,这里就不多废话了。

2. CyclicBarrier

使用 CyclicBarrier,多个子线程之间相互等待,具体操作:

  1. 主线程:定义 CyclicBarrier 需要等待的子线程个数
  2. 子线程:调用 CyclicBarrier.await() 等待其他线程

直译为循环栅栏,通过它可以让一组线程全部到达某个状态后再同时执行,也就是说假如有5个线程协作完成一个任务,那么只有当每个线程都完成了各自的任务(都到达终点),才能继续运行(开始领奖)。循环的意思是当所有等待线程都被释放(也就是所有线程完成各自的任务,整个程序开始继续执行)以后,CyclicBarrier 可以被重用。而上面的 CountDownLatch 只能用一次。

所有线程达到指定的状态后,一起继续执行:

package top.ningg.java.concurrent;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;

public class TestOfCyclicBarrier {

    public static void main(String[] args) {
        // 说明:
        // 1. 创建 CyclicBarrier 时,可以指定一个 Runnable 的任务;
        // 2. 所有线程都到齐后,先执行这个任务,之后才会继续执行;
        CyclicBarrier cyclicBarrier = new CyclicBarrier(5);
        for (int index = 0; index < 5; index++) {
            Thread newThread = new Thread(new TaskOfCyclicBarrier(cyclicBarrier));
            newThread.start();
        }
    }

}

class TaskOfCyclicBarrier implements Runnable {

    private CyclicBarrier cyclicBarrier;

    public TaskOfCyclicBarrier(CyclicBarrier cyclicBarrier) {
        this.cyclicBarrier = cyclicBarrier;
    }

    @Override
    public void run() {
        System.out.println("等待所有人到齐后,开始开会...");
        try {
            TimeUnit.SECONDS.sleep(3);
            cyclicBarrier.await();
            System.out.println("开始开会...");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
    }
}

CyclicBarrier 和 CountDownLatch 之间的差异:

  1. CyclicBarrier是多个线程 互相等待,CountDownLatch是一个线程 等待多个线程
  2. CyclicBarrier可以重复使用,而CountDownLatch 一次有效

3. DelayQueue

DelayQueue 就是一个无界队列,是用 PriorityQueue 实现的 BlockingQueue,如果要使用 DelayQueue,其中的元素必须实现 Delayed 接口,Delayed 接口有2个方法需要重写:compareTo()和 getDelay()方法。因为使用的是优先队列,所以需要确定元素之间的优先级,那么重写 compareTo()就很明显了,又为了满足 DelayQueue 的特性(每次队头是延期到期时间最长的元素),那么就需要知道元素的到期时间,而这个时间就是通过 getDelay()获取的。

延迟到期时间最长:这个刚看的时候还挺迷糊的,现在知道了。就是到期之后保存时间最长的元素。比如2个元素,在10:00:00这个时间点都到期了,但是 A 元素到期后保存时间为2分钟,B 元素到期后保存时间为1分钟,那么优先级最高的肯定是 A 元素了(本质来说,这个 order 是通过小顶堆维护的,所以获取延迟到期时间最长元素的时间复杂度为 O(lgN))。

写了一个例子,但是因为输出有点问题,就看了一下 DelayQueue 的源码,发现里面的实现是委托给 PriorityQueue 的,于是写了篇文章跟了下 PriorityQueue 的基本操作( 也是 DelayQueue 的基本操作),结合文档和源码和我给的例子,应该就非常 easy 了:PriorityQueue 源码剖析

4. PriorityBlockingQueue

  1. BlockingQueue:线程安全
    • ArrayBlockingQueue:
    • LinkedBlockingQueue:
  2. Queue:非线程安全

哈哈,前面刚看完 PriorityQueue 的源码,这里就遇到了 PriorityBlockingQueue,其实 PriorityBlockingQueue就是用 PriorityQueue 实现的 BlockingQueue,所以没啥可说的。写了个例子低空掠过:

package concurrency;

import java.util.Random;
import java.util.concurrent.PriorityBlockingQueue;

class Leader implements Comparable {
	private String name;
	private int degree;

	public Leader(String name, int degree) {
		this.name = name;
		this.degree = degree;
	}

	@Override
	public int compareTo(Object o) {
		Leader leader = (Leader) o;
		return leader.degree - this.degree;
	}

	public String getName() {
		return name;
	}

	public void setName(String name) {
		this.name = name;
	}

	public int getDegree() {
		return degree;
	}

	public void setDegree(int degree) {
		this.degree = degree;
	}

}

public class WhoGoFirst {

	// 通过随机数给领导分级别
	private static PriorityBlockingQueue<Leader> leaders = new PriorityBlockingQueue<Leader>();

	public static void watchFilm(Leader leader) {
		leaders.add(leader);
	}

	public static void goFirst(PriorityBlockingQueue<Leader> leaders) {
		try {
			while (!leaders.isEmpty()) {
				Leader leader = leaders.take();
				System.out.println("级别: " + leader.getDegree() + "的 " + leader.getName() + " 正在撤离...");
			}
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}

	public static void main(String[] args) {
		Random random = new Random();
		for (int i = 1; i <= 10; i++) {
			watchFilm(new Leader("leader " + i, random.nextInt(10)));
		}

		System.out.println("所有领导已经就坐,开始播放电影:速度与激情7...");

		System.out.println("着火了!!!");

		goFirst(leaders);

	}
}/*output:	
所有领导已经就坐,开始播放电影:速度与激情7...
着火了!!!
级别: 8 leader 3 正在撤离...
级别: 7 leader 8 正在撤离...
级别: 6 leader 4 正在撤离...
级别: 6 leader 9 正在撤离...
级别: 6 leader 2 正在撤离...
级别: 5 leader 5 正在撤离...
级别: 4 leader 6 正在撤离...
级别: 4 leader 7 正在撤离...
级别: 2 leader 10 正在撤离...
级别: 0 leader 1 正在撤离...
*/

5. ScheduledExcutor

这个小节讲的是定时触发任务,知道 crontab 的应该都不陌生。看完以后我 google 了一下,发现几个类似功能的类,先知道有这几个东西,用到了再具体看文档吧。

给个随便搜到的资料:几种任务调度的 Java 实现方法与比较

对于举例子的 ScheduledThreadPoolExecutor,大概看下源码,本质是使用 DelayWorkQueue 实现的 BlockingQueue。其中 DelayWorkQueue 和 DelayQueue 类似,不过没有复用 DelayQueue 中用到的 PriorityQueue,而是自己捯饬了一个新的小(大)顶堆。看来concurrent 也不是100%完美的代码呀,哈哈哈。

public class ScheduledThreadPoolExecutorTest {
	public static void main(String[] args) {
		ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1);
		BusinessTask task = new BusinessTask();
		//1秒后开始执行任务,以后每隔2秒执行一次
		executorService.scheduleWithFixedDelay(task, 1000, 2000,TimeUnit.MILLISECONDS);
	}

	private static class BusinessTask implements Runnable{
		@Override
		public void run() { 
			System.out.println("任务开始...");
			// doBusiness();
			System.out.println("任务结束...");
		}
	}
}

嗯,这个例子虽然简单,但是我想说几点:

但是为什么 doBusiness() 抛出异常就会中止定时任务的执行呢?看文档就知道了:

Creates and executes a periodic action that becomes enabled first after the given initial delay, and subsequently with the given delay between the termination of one execution and the commencement of the next. If any execution of the task encounters an exception, subsequent executions are suppressed. Otherwise, the task will only terminate via cancellation or termination of the executor.

简单翻译就是:

创建并执行一个在给定初始延迟后首次启用的定期操作,随后,在每一次执行终止和下一次执行开始之间都存在给定的延迟。如果任务的任一执行遇到异常,就会取消后续执行。否则,只能通过执行程序的取消或终止方法来终止该任务。

所以上面的例子应该改成下面这样:

 public class ScheduledThreadPoolExecutorTest {
	 public static void main(String[] args) {
		 ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1);
		 BusinessTask task = new BusinessTask();
		 //1秒后开始执行任务,以后每隔2秒执行一次
		 executorService.scheduleWithFixedDelay(task, 1000, 2000,TimeUnit.MILLISECONDS);
	 }
 
	 private static class BusinessTask implements Runnable{
		 @Override
		 public void run() { 
			 //捕获所有的异常,保证定时任务能够继续执行
			 try{
				 System.out.println("任务开始...");
				 // doBusiness();
				 System.out.println("任务结束...");
			 }catch (Throwable e) {
				 // solve the exception problem
			 }
		 }
	 }
 }

6. Semaphore

Semaphore 是一个计数信号量,平常的锁(来自 concurrent.locks 或者内建的 synchronized)再任何时刻都只能允许一个任务访问一项资源,但是 Semaphore 允许 N 个任务同时访问这个资源。你还可以将信号量看做是在向外分发使用资源的“许可证”,尽管实际上没有使用任何许可证对象。

总结来说,一般的锁是保证一个资源只能被一个任务访问;Semaphore 是保证一堆资源可以同时有多个任务访问。举个例子,现在有一个厕所,5个坑位,如果使用 synchronized 的话,同步厕所就只能让1个人进入,浪费了4个坑位;稍微往前一步是使用 BlockingQueue(如果你用 synchronized 来同步5个坑位就很复杂多了),再往前一步,concurrent 提供了 Semaphore ,它通过 acquire()和 release()来保证资源的分发使用。

7. Exchanger

终于来到21.7小节的最后一个构件了!!!!

这个构件很简单,是为了让两个任务交换对象,当两个任务进入 Exchanger 提供的“栅栏”时,他们各自拥有一个对象,当它们离开时,都拥有了之前由对方拥有的对象。为什么要有这么个东西呢?考虑下面的场景:

一个任务在创建对象,这些对象的生产/销毁代价都非常高。上面 Semaphore 的例子还算靠谱,因为我用完了资源并没有销毁,直接还给资源池了,然后立马可以被复用。但是如果两个线程需要知晓对方的工作状态信息,就可以用 Exchanger 交换各自的工作状态。

参考来源

Top