并发编程之协作

并发编程领域可以抽象为3个核心问题:分工、协作、互斥。

本文介绍协作。

在多线程环境中,多个线程之间会有依赖关系,例如该任务完成后如何通知后续任务开始。Java JDK中提供了Semaphore、CountDownLatch、CyclicBarrier等工具来完成线程之间的协作。这些工具是针对特定场景抽象出的解决方案,若遇到某些特殊场景无法使用工具的,则需要自己完成线程之间的协作。这时可以使用管程(Monitor)来完成,管程是Java并发领域解决协作问题的核心方法,JDK中线程协作工具的底层实现都是管程。

管程

什么是管程

管程是操作系统中的概念,指的是管理共享变量以及对共享变量的操作过程,让他们支持并发。管程是解决线程同步和互斥的方法。管程与信号量一样,可以解决所有的并发问题。管程和信号量是等价的,所谓等价指的是用管程能够实现信号量,也能用信号量实现管程。管程解决了信号量使用不便的问题。

在管程模型里,共享变量和对共享变量的操作是被封装起来的。进入管程只有一个入口,并且在入口旁边还有一个等待队列。当多个线程同时试图进入管程内部时,只允许一个线程可以获取到互斥锁并进入,其他线程则在入口等待队列中等待。

管程里还引入了条件变量的概念,而且每个条件变量都对应有一个等待队列。假设有个线程T1要执行出队操作,出队操作在队列不为空时才能执行,队列不空这个前提条件就是管程里的条件变量。如果线程T1进入管程后发现队列是空的,就去条件变量对应的等待队列里面等待。线程T1进入条件变量的等待队列后,会释放互斥锁,允许其他线程进入管程。之后另外一个线程T2执行入队操作,入队操作执行成功,队列不为空。线程T1的条件变量已经满足,此时线程T2要通知T1,告诉它条件已经满足。当线程 T1 得到通知后,会从等待队列里面出来,但是出来之后不是马上执行,而是重新进入到入口等待队列里面,获取互斥锁后才能进入管程。

管程中的入口等待队列解决的是线程互斥问题,条件变量解决的是线程同步问题。

Java中的管程

在Java1.5之前,管程是Java提供的唯一并发原语。

Java有两种管程的实现方式:内置的synchronized和并发工具Lock。

内置管程:synchronized配合wait()、notify()、notifyAll()实现管程。synchronized负责线程互斥,被修饰的代码块会在编译期自动生成相关加锁和解锁的代码。wait()、notify()、notifyAll()负责线程同步。我们锁的是哪个对象,后续就应该调用该对象的wait()、notify()方法。另外还需要注意,wait()、notify()方法需要在synchronized同步块中调用。仅支持一个条件变量。

并发包管程:接口Lock、Condition配合实现管程,Lock负责线程互斥,Condition负责线程同步。需要开发人员进行加解锁操作。支持多个条件变量。

内置管程与并发包管程的区别

在Java1.5之前已经有了synchronized管程,为什么在1.5要引入新的管程工具呢。因为新的管程工具引入了新特性,这些新特性解决了内置管程的一些问题:

  1. 防死锁。死锁的产生需要满足4个条件,只要破坏其中的一个条件,就可以解决死锁。并发包管程破坏了不可抢占这个条件。
    响应中断。synchronized的问题是,在持有锁A的情况下,如果获取锁B失败,那么线程就进入阻塞状态,一旦发生死锁,就没有任何机会来唤醒阻塞的线程。Lock锁能够响应中断信号,也就是说当我们给阻塞的线程发送中断信号的时候,能够唤醒它,那它就有机会释放锁A。

    支持超时。如果线程在一段时间之内没有获取到锁,不是进入阻塞状态,而是返回一个错误,那这个线程也有机会释放曾经持有的锁。

    非阻塞获取锁。如果尝试获取锁失败,并不进入阻塞状态,而是直接返回,那这个线程也有机会释放曾经持有的锁。

  2. 支持公平锁。synchronized管程是非公平的,且不支持公平模式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
//Java SDK并发包实现的管程
public class BlockedQueue<T>{
final Lock lock = new ReentrantLock();
// 条件变量:队列不满
final Condition notFull = lock.newCondition();
// 条件变量:队列不空
final Condition notEmpty = lock.newCondition();

// 入队
void enq(T x) {
lock.lock();
try {
while (队列已满){
// 等待队列不满
notFull.await();
}
// 省略入队操作...
//入队后,通知可出队
notEmpty.signal();
}finally {
lock.unlock();
}
}
// 出队
void deq(){
lock.lock();
try {
while (队列已空){
// 等待队列不空
notEmpty.await();
}
// 省略出队操作...
//出队后,通知可入队
notFull.signal();
}finally {
lock.unlock();
}
}
}

//Java内置的管程
class Test {
synchronized void method1(){
while(条件不满足){
try{
wait();
}catch(Exception e){
}
}
//消耗资源
}

synchronized void mothod2(){
//归还资源
notifyAll();
}
}

异步转同步

在编程领域,异步的场景还是挺多的,比如TCP协议本身就是异步的,但我们在使用RPC调用时,大多数调用过程是同步的。这个TCP协议异步转为RPC同步的过程由RPC框架帮助我们完成。下面是Dubbo的异步转同步的实现方式。使用Lock和Condition实现管程,来达到异步转同步的目的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// 创建锁与条件变量
private final Lock lock = new ReentrantLock();
private final Condition done = lock.newCondition();

// 调用方通过该方法等待结果
Object get(int timeout){
long start = System.nanoTime();
lock.lock();
try {
while (!isDone()) {
done.await(timeout);
long cur=System.nanoTime();
if (isDone() || cur-start > timeout){
break;
}
}
} finally {
lock.unlock();
}
if (!isDone()) {
throw new TimeoutException();
}
return returnFromResponse();
}

// RPC结果是否已经返回
boolean isDone() {
return response != null;
}

// RPC结果返回时调用该方法,唤醒同步过程
private void doReceived(Response res) {
lock.lock();
try {
response = res;
if (done != null) {
done.signal();
}
} finally {
lock.unlock();
}
}

信号量

什么是信号量

信号量(Semaphore)是由大名鼎鼎的计算机科学家迪杰斯特拉(Dijkstra)于1965年提出,在这之后的 15 年,信号量一直都是并发编程领域的终结者,直到1980年管程被提出来,我们才有了第二选择。

信号量模型还是很简单的,可以简单概括为:一个计数器,一个等待队列,三个方法。这三个方法分别是:init()、down()和up()。其中down()、up()这两个操作历史上最早称为P操作和V操作。

  • init():设置计数器的初始值。
  • down():计数器减 1。如果此时计数器的值小于0,则当前线程将被阻塞,否则当前线程可以继续执行
  • up():计数器加1。如果此时计数器的值小于或者等于0,说明有等待的线程,唤醒等待队列中的一个线程。

信号量与管程的区别

  1. 临界区线程数量不同:信号量允许多个线程进入临界区,管程只允许一个。
  2. 唤醒线程数量不同:信号量只能唤醒阻塞中的一个线程,管程可以唤醒多个线程去争抢锁。
  3. 是否有条件等待:信号量没有条件变量的概念,管程有条件表里。

Semaphore用来控制并发数量,可用作限流器。当数量为1时,Semaphore就退化为普通的锁。Semaphore不可重入。

1
2
3
4
5
6
7
8
9
10
Semaphore semaphore = new Semaphore(10);
public void run() {
try {
semaphore.acquire();
//do something,控制该资源访问并发数为10
} catch (Exception e) {
e.printStackTrace();
}
semaphore.release();
}

CountDownLatch和CyclicBarrier

CountDownLatch是一个计数器闭锁,通过它可以完成线程等待的功能,即一个或多个线程一直等待,直到其他线程完成特定操作。CountDownLatch描述的是等待线程与被等待线程之间的关系,这两种线程属于不同任务类型。

CyclicBarrier可以理解为循环使用的栅栏,当有特定数量的线程到达栅栏处时,该栅栏就会放行这些线程。放行后栅栏自动重置,拦截下一批特定数量的线程。CyclicBarrier主要是实现了多个线程之间相互等待,描述多个线程内部相互等待的关系。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
//CountDownLatch
final CountDownLatch latch = new CountDownLatch(1);

//线程1
public void run() {
//do something
latch.countDown();
}

//线程2
public void run() {
try {
latch.await();//需等待线程1完成之后再继续
} catch (InterruptedException e) {
e.printStackTrace();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
//CyclicBarrier
public class CyclicBarrierExample {
private static CyclicBarrier barrier = new CyclicBarrier(2);

public static void main(String[] args) {
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < 4; i++) {
executor.execute(() -> {
try {
//因为barrier的值为2,那么就会两个线程为一组通过栅栏
barrier.await();
} catch (Exception e) {
e.printStackTrace();
}
});
}
}
}

CyclicBarrier经常被拿来与CountDownLatch比较。

他们主要有两点不同:

  1. CyclicBarrier会自动重置计数器,可循环使用;CountDownLatch是一次性的。
  2. CyclicBarrier描述的是同种任务线程之间的关系;CountDownLatch描述的是两种不同任务线程之间的关系。