并发编程之分工

并发编程领域可以抽象为3个核心问题:分工、协作、互斥。

本文介绍分工。

所谓分工,类似于现实中完成一个项目,项目经理要拆分任务,安排合适的成员去完成。在并发编程领域,线程就是成员,分工直接决定了并发程序的性能。Java并发包里的Executor、Fork/Join、Future本质上都是一种分工方法。除此之外,并发编程领域还总结了一些设计模式,基本上都是和分工方法相关的,例如生产者-消费者、Thread-Per-Message、Worker Thread模式等都是用来指导如何分工的。

线程池

虽然在Java语言中创建线程看上去就像创建一个对象一样简单,只需要new Thread()就可以了,但实际上创建线程远不是创建一个对象那么简单。创建对象,仅仅是在JVM的堆里分配一块内存而已;而创建一个线程,却需要调用操作系统内核的API,然后操作系统要为线程分配一系列的资源,这个成本就很高了,所以线程是一个重量级的对象,应该避免频繁创建和销毁。我们可以使用线程池来避免创建销毁线程的开销。

Java线程池采用的是生产者-消费者模式。线程池的使用方是生产者,线程池本身是消费者。Java提供的线程池相关的工具类中,最核心的是ThreadPoolExecutor。

ThreadPoolExecutor共有7个参数可设置

  • corePoolSize 线程池核心线程数
  • maximumPoolSize 线程池最大线程池
  • keepAliveTime 空闲线程存活时间
  • unit 存活时间单位
  • workQueue 工作队列
  • threadFactory 创建线程的工厂
  • handler 拒绝策略
1
2
3
4
5
6
7
8
9
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
//...
}
1
2
3
4
5
6
7
/* 线程池的生产者-消费者模式示意图
*
* 任务生产 |---------------| 多线程消费任务
* -------> task task task ------------>
* 投入队列 |---------------|
*
*/

同时满足以下三个条件,线程池会开始回收线程:

  1. 线程数量大于corePoolSize;
  2. 任务队列为空;
  3. 线程等待任务超过keepAliveTime。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
//ThreadPoolExecutor.java
//Class ThreadPoolExecutor
//execute的参数是Runnable,如果准备执行Callable,Callable会被包装成Runnable再调用该方法
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();

//线程池中的线程数量小于corePoolSize,此时新增线程
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}

//线程数量已达到corePoolSize,此时将任务放入队列,不新增线程
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}

//队列已满,此时再次新增线程
else if (!addWorker(command, false))
reject(command);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
//ThreadPoolExecutor.java
//Class Worker
//线程池消费者,一个Worker对应一个线程
//Worker的工作是从队列拿任务并消费,然后不断重复这个循环
//如果Worker跳出这个循环,说明线程的run()方法结束,线程要销毁
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock();
boolean completedAbruptly = true;
try {
//从队列拿任务
//若该线程需要销毁,则会跳出这个while循环
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
//对于接收Runnable任务的线程池而言,这里的task.run就是运行任务本身
//对于Callable而言,run的是Futuretask。Futuretask将Callable包装成了Runnable
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
//FutureTask.java
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
//这里包装了用户的Callable任务
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
runner = null;
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
//FutureTask.java
private void finishCompletion() {
//waiters是一个线程安全的栈,里面保存了正在等待future结果的线程
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
//唤醒在future.get等待着的线程
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
done();
callable = null; // to reduce footprint
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
//FutureTask.java
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}

int s = state;
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
else if (q == null)
q = new WaitNode();
else if (!queued)
//若任务还在执行中,则将该线程作为等待节点压入栈中
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
//阻塞线程,等待FutureTask完成
LockSupport.park(this);
}
}

在FutureTask的set(result)中,除了改变任务的状态外,还唤醒了在等待的节点。

设置线程数量

对于IO密集型:CPU逻辑核数*[]

对于CPU密集型:CPU逻辑核数+1

如何界定一个程序是IO密集还是CPU,如何测量IO耗时与CPU耗时的比值,这些数据不是很容易就能拿到。再者,就算我们给出了理论值,这个值也仅供参考,实际上多少线程数是合理的还是需要在环境中进行压测。

工程上比较常用的经验值:2*CPU逻辑核数+1

任务分工

CompletionStage接口负责多任务的分工,用于异步执行中的阶段处理,该接口是Java8新增的一个接口,其大量用在Lambda表达式计算过程中。CompletableFuture是该接口的官方实现,目前只有这一个实现类。

在Java8之前,我们常使用Future来进行异步计算。那么在已经有Future的情况下,为什么要引入CompletionStage呢。

Future的局限性:Future的结果需要手动通过get()获取,以进行下一步操作。

CompletableFuture可以定义多个任务之间的关系,上一步的结果可以自动传递给下一步的任务。

CompletableFuture对象的创建主要通过supplyAsync(Supplier<U>)runAsync(Runnable)这两个静态方法完成。这两个方法都是异步的,会在新线程中运行任务,不会阻塞调用线程。这两个方法还有一个带Executor参数的版本,可以传入自定义的线程池。

任务关系

任务之间的关系大致可总结为以下三类:串行关系、并行关系、汇聚关系。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
串行关系(先执行任务A,再执行任务B)
--A--> --B-->


汇聚关系(先执行任务AB,C需要等待AB)
AND汇聚:C需要等待AB都执行完成
OR汇聚:AB中任意一个完成,C即可开始执行
--A-->|
|--C-->
--B-->|


并行关系(任务AB同时运行)
--A-->
--B-->

根据任务之间的依赖关系,CompletableFuture中的方法大致可分为以下三类。为什么没有并行关系的方法呢,因为只要在这些方法后面加上Async,就可以实现并行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
串行关系:
thenApply(Function)
thenAccept(Consumer)
thenRun(Runnable)
thenCompose(Function)

AND汇聚关系:
thenCombine(CompletionStage, BiFunction)
thenAcceptBoth(CompletionStage, Consumer)
runAfterBoth(CompletionStage, Runnable)

OR 汇聚关系:
applyToEither(CompletionStage, Function)
acceptEither(CompletionStage, Consumer)
runAfterEither(CompletionStage, Runnable)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
CompletableFuture<Void> f1 = CompletableFuture.runAsync(() -> {
try {
Thread.sleep(1000);
System.out.println("runAsync: " + Thread.currentThread().getId());
} catch (Exception e) {
e.printStackTrace();
}
});

//thenRun()具体在哪个线程中运行
//取决于调用thenRun()时,runAsync()中的任务状态
//当f1 runAsync()中的任务在执行时,thenRun()中的任务会在相同的异步线程中运行
//当f1 runAsync()中的任务结束时,thenRun()中的任务会在调用线程中运行
f1.thenRun(() ->
System.out.println("thenRun: " + Thread.currentThread().getId()));

//不带Executor参数的异步回调
//会从ForkJoinPool.commonPool()获取线程来运行任务
//若是带Executor参数的异步回调
//会从Executor线程池获取线程来执行
f1.thenRunAsync(() ->
System.out.println("thenRun: " + Thread.currentThread().getId()));

任务批处理

CompletableFuture中还提供了批量处理任务的方法

1
2
3
4
5
//当所有的CompletableFuture都执行完后执行计算
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)

//最快的那个CompletableFuture执行完之后执行计算
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)

异常处理

在CompletableFuture调用链中,前面的任务抛出异常,后面的任务就不会被执行,Future将以异常状态结束,即调用get()的时候会抛异常。

1
2
3
4
5
6
7
8
CompletableFuture.supplyAsync(() -> {
throw new RuntimeException();
}).thenApply(result -> {
//不会执行
return "thenApply";
}).thenAccept(result -> {
//不会执行
});

CompletableFuture提供了exceptionally()handle()whenComplete()等多个方法来对任务进行扫尾处理,这三个方法都可以感知到异常。

其中exceptionally(Function)只有在抛异常时会被调用。handle(BiFunction)whenComplete(BiConsumer)会在任务结束时被调用(不管以什么方式结束)。

如果在exceptionally(Function)handle(BiFunction)中对异常进行处理,那么后续调用get()join()就不会抛出异常。如果只有whenComplete(BiConsumer),后续还是会抛出异常。

1
2
3
4
5
6
7
.whenComplete((s, e) -> {
//s是上一步的结果,若无结果,则是null
//e是异常信息,若无异常,则是null
})
.handle((s, e) -> {
//同whenComplete
})

获取结果

CompletableFuture实现了Future接口,提供get()方法来获取结果。除了get()外,CompletableFuture还提供了join()getNow()join()可能会抛出unchecked异常。getNow()无论如何都会返回一个结果,如果任务完成,就返回任务的结果;如果未完成,就返回指定的结果。

批量任务

CompletionService接口解耦了任务的生产和消费,该接口将线程池Executor和阻塞队列BlockingQueue功能融合在了一起,能够让批量异步任务的管理更简单。ExecutorCompletionService是CompletionService接口的官方实现。
CompletionService能够让异步任务的执行结果有序化,先执行完的先进入阻塞队列,先进行后续处理,避免无谓的等待。

RPC框架Dubbo中有一种叫做Forking的集群模式,这种集群模式下,支持并行地调用多个服务,只要有一个成功返回结果,整个服务就可以返回了。这个特性也可以借助CompletionService来实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//CompletionService使用例子
ExecutorService executor = Executors.newFixedThreadPool(3);
CompletionService<Integer> cs = new ExecutorCompletionService<>(executor);

//批量提交任务
cs.submit(()->task(1));
cs.submit(()->task(2));
cs.submit(()->task(3));

//按照执行完成的顺序取结果
for (int i=0; i<3; i++) {
Integer r = cs.take().get();
//其他业务逻辑
}