Java并发编程那些事儿(六)——Executor框架及线程池

这是并发编程系列的第六篇文章。上一篇介绍了一些比较常用的并发工具类,这篇主要说一下Executor框架及线程池。

什么是线程池

顾名思义,线程池就是存放线程的池子,池子里面存放的是已经创建好的N个线程;Java里面一般用ListSet等容器类来存储线程,实现线程池功能。

为什么要使用线程池

首先,线程的创建和销毁是很耗费时间和资源的一件事情。
其次,线程不能无限制的创建,每个线程都会占用内存资源,而且如果线程过多,线程之间的调度也是一件很消耗系统性能的事情。

线程池带来的好处

  1. 可以做到随用随取,节省因创建线程而花费的时间
  2. 通过设置线程池容量,可以保证创建的线程数量在一个合理范围区间,不会耗光系统资源。

    如何创建线程池

    Java提供了Executor框架,可以让我们简单方便的使用线程池。

举个例子

1
2
3
4
5
6
//创建一个为容量为2的线程池
Executor exec = Executors.newFixedThreadPool(2);
//向线程池提交5个任务
for (int i = 0; i < 5; i++){
exec.execute(new Task());
}

Executors

ExecutorsExecutor框架提供的一个工具类,该工具类提供了一些方法,让我们可以方便的创建各种类型的线程池。比如
newCachedThreadPool() : 创建具有缓存功能的线程池,如果线程数量超过处理需求时,可以对空闲线程进行回收,当线程数量不够时,则新增线程。
newFixedThreadPool():创建固定容量的线程池,一旦创建数量设置好就不会改变。
newScheduledThreadPool(): 创建可以延迟或者定时调度的固定容量的线程池。
newSingleThreadExecutor() :创建只有一个线程的线程池。

以上方法都会返回一个 ExecutorService 接口描述的对象。

Executor

Executor是个接口,只有短短的三行代码。

1
2
3
public interface Executor {
void execute(Runnable command);
}

Executor接口虽然很简单,但可以将任务的提交和执行成功解耦。

对于我们之前的代码 都是通过 new Thread(runnable).start()方式驱动任务执行,有了Executor之后,建议大家都换成Executor方式驱动任务,见上面的例子。

ExecutorService

Executor也是有生命周期的。分为运行,关闭和已终止三种状态。运行态表示可以向线程池提交任务;关闭状态表示不可以提交任务,但是已提交的会被执行;已终止状态表示所有任务都已经执行完毕。

为了能够管理Executor的生命周期,JDK提供了ExecutorService接口,该接口继承自Executor,除了提供跟生命周期相关的shutdown() 等方法外,还提供了一些功能更强大的任务提交方法。

ExecutorService接口的主要方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public interface ExecutorService extends Executor {

//平缓关闭,不在接受新任务,但是会等待所有已提交的任务完成
void shutdown();

//强制关闭,尝试关闭所有正在执行的任务,并且不会执行等待中的任务,同时返回那些等待执行的任务
List<Runnable> shutdownNow();

//执行所有任务,并拿到所有任务的返回结果
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>>
tasks)    throws InterruptedException;

//提交Callable任务,并拿到返回结果
<T> Future<T> submit(Callable<T> task);

//提交Runnable任务,并拿到任务执行的异常信息
Future<?> submit(Runnable task);
}

在实际生产中,基本上都会使用ExecutorService替代Executor

Callable和Runnable

Runnable:我们之前介绍过,用于表示一项任务,该接口只有一个void类型的run方法来描述任务逻辑,换句话说,使用Runnable描述任务逻辑的时候不能有任何返回值。

Callable: Runnable只能表述很少的一部分任务场景,如果任务有返回值,比如多线程搜索,那么Runnable就不能满足要求了。这个时候可以使用Callable接口来描述任务。Callable接口提供了一个call()方法用来描述任务逻辑,该方法可以定义任务的返回值类型。

Future

通过Executor框架执行的任务,有四个阶段,分别是创建,提交,开始和完成。Future就是用来表示任务生命周期的,除此之外,还可以通过Future拿到任务的返回结果,甚至取消任务的执行。

聪明的Future.get()方法

Future接口提供的get()方法是个神奇的方法,它的行为会根据任务状态的不同,作出不同的反应。如果任务已经执行完成,那么会立刻返回任务的执行结果,如果任务正在执行,那么get()方法将会阻塞,直到任务执行完成之后在返回任务的执行结果。如果任务抛出异常,那么get()就会把异常信息封装之后给你。

举个例子

我们实现一个模拟并发搜索功能,来直观感受一下FutureCallable的强大功能。

1
2
3
4
5
6
7
8
9
public class TaskCallable implements Callable<String> {
//模拟搜索任务,每个任务返回String对象
@Override
public String call() throws Exception {
Thread.sleep(new Random().nextInt(10)*100);
String result = Thread.currentThread().getName();
return result;
}
}

提交并执行搜索任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public static void main(String[] args){
ExecutorService exec = Executors.newFixedThreadPool(4);
for (int i = 0; i < 10; i++){
Future<String> future = exec.submit(new TaskCallable());
try {
//通过get方法拿到任务的执行结果
//实际生产不要这样用,因为get()会阻塞。
String result = future.get();
System.out.println(result);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
exec.shutdown();
}

invokeAll()

invokeAll()可以做到,一次性提交全部任务,等全部任务结束后,在返回每个任务的直接结果。还是拿上面的并行搜索举例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
 public static void main(String[] args){
ExecutorService exec = Executors.newFixedThreadPool(4);
//任务列表
List<TaskCallable> tasks = new ArrayList<TaskCallable>();
for (int i = 0; i < 10; i++){
tasks.add(new TaskCallable());
}

try {
//一次提交全部任务
List<Future<String>> futures = exec.invokeAll(tasks);
//所有任务都完成后,返回每个任务的Future
for (Future<String> future : futures){
String result = future.get();
System.out.println(result);
}
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}

System.out.println("主线程正在做一些事情");
exec.shutdown();
}

CompletionService

对于上面介绍的方法,需要等所有的搜索任务都完成才能拿到返回结果。因为每个任务执行的时间不一样,所以invokeAll()方法的执行时间取决于最慢的那个任务。

有的时候我们希望,只要有任务完成,就立刻把搜索结果展示给用户,不必等全部任务执行完成,这样可以大大提高用户的使用体验。

Executor框架中提供的CompletionService就是完成此项工作的。CompletionService借助ExecutorBlockingQueue来实现,当一个任务完成之后,立刻加入到BlockingQueue队列当中。

继续拿上面并行搜索的例子来做演示。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public static void main(String[] args){
ExecutorService exec = Executors.newFixedThreadPool(4);
CompletionService<String> completionService = new ExecutorCompletionService<String>(exec);
for (int i = 0; i < 10; i++){
completionService.submit(new TaskCallable());
}
//关闭Executor
exec.shutdown();
System.out.println("子线程执行搜索任务的时候,主线程可以做一些搜索展示的准备工作..." );
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("搜索展示的准备工作完成,准备获取搜索结果" );
for (int i = 0; i < 10; i++){
try {
//只要有任务完成,会立刻返回,不用等全部任务执行完成
Future<String> future = completionService.take();
String result = future.get();
System.out.println(result);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
}
结束

在实际生产中,建议各位都使用JDK提供的Executor框架进行并发编程。对于执行任务有返回结果的,可以通过CallableFuture来实现;对于任务之间没有任何依赖,只要有任务完成,就可以立刻返回的,可以通过CompletionService来实现。


推荐阅读
1. Java并发编程那些事儿(一) ——任务与线程
2. Java8的Stream流真香,没体验过的永远不知道
3. Awk这件上古神兵你会用了吗
4. 手把手教你搭建一套ELK日志搜索运维平台

-------------本文结束-------------
坚持原创技术分享,您的支持将鼓励我继续创作!
0%