Java并发编程那些事儿(五)——闭锁、栅栏、信号量

这是并发编程系列的第五篇文章。上一篇介绍了线程间的通信问题,这篇主要说一下JDK提供的比较常用的三个并发工具类,闭锁,栅栏,和信号量。

闭锁——CountDownLatch

闭锁类似于一道大门,所有的线程都在大门外等候,当大门打开时,所有线程一起开工。

CountDownLatch提供了一个构造函数,可以传入一个整数作为参数,表示初始计数器。每调用一次countDown()方法时,计数器减一,当计数器减到0时,表示大门开放。可以把CountDownLatch想象成赛车的倒计时计数器,当计数器为0时,所有赛车加速驶出。

下面我们通过开发一个简单的压力测试小工具来演示CountDownLatch的使用。

压力测试小工具
在测试一个系统的处理能力的时候,往往要求测试工具能够模拟出多个客户端同一时刻对服务端发起请求的情况,以此来判断系统的抗压能力。下面的代码借助CountDownLatch实现了该功能,并统计出最后一条请求的完成时间,以此来判断系统的最大TPS

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class Task implements Runnable {
//开始计数器
private final CountDownLatch startGate;
//结束计数器
private final CountDownLatch endGate;

public Task(CountDownLatch startGate, CountDownLatch endGate){
this.startGate = startGate;
this.endGate = endGate;
}

@Override
public void run() {
try {
//创建好的线程在此处等待
startGate.await();
System.out.println(Thread.currentThread().getName() + "开始执行" + System.currentTimeMillis());
//模拟真正的任务执行
Thread.sleep(new Random().nextInt(10)*100);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
//线程执行完,对结束计数器减一
endGate.countDown();
}
}
}

测试程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
* 每个线程创建好之后,进行等待并不执行,所有线程都创建完毕之后,在一起开始执行。
* 每个线程执行结束后,都对endGate自动减一,这样主线程会等所有线程操作完成后,进行时间统计。
*/

public class CountDownLatchDemo {
private static CountDownLatch startGate = new CountDownLatch(1);
private static CountDownLatch endGate = new CountDownLatch(4);

public static void main(String[] args) throws InterruptedException {
for (int i = 0 ; i < 4; i++){
Thread thread = new Thread(new Task(startGate,endGate));
thread.start();
}
long startTime = System.nanoTime();
System.out.println("所有线程开始执行");
startGate.countDown();
endGate.await();
long endTime = System.nanoTime();
System.out.println("并行执行任务共花费" + (endTime - startTime));
}
}

栅栏——CyclicBarrier

栅栏和闭锁有相似的地方,他们都会执行等待,直到某个条件发生。栅栏的特点是用于线程间互相等待,而闭锁更多是用于等待某个事件,此外栅栏CyclicBarrier可以通过reset()方法进行重用,而CountDownLatch则不可以。

举个例子
前端页面展示的内容往往是后台从多个渠道拿过来的数据,这个时候我们可以启动多个线程,每个线程计算一份数据,但每份数据的计算量都不一样,返回时间也不一样,但我们必须等所有数据都返回后,在一起返回给前端做展示。这个时候就可以使用CyclicBarrier帮忙。

CyclicBarrier代码演示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class CounterTask implements Runnable {
private CyclicBarrier cyclicBarrier;
public CounterTask(CyclicBarrier cyclicBarrier) {
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "start");
try {
//模拟计算时间
Thread.sleep(new Random().nextInt(10)*100);
System.out.println(Thread.currentThread().getName()+ "complete!");
//执行完,等待其它线程
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}

测试程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
* 模拟后台计算数据
* 当多组数据都计算完成之后
* 才能向前台展示。
*/
public class CyclicBarrierDemo {
public static void main(String[] args){
CyclicBarrier cyclicBarrier = new CyclicBarrier(5, new Runnable() {
@Override
public void run() {
System.out.println("任务完成,可以向前端展示了");
}
});

for (int i = 0; i < 5; i++){
Thread thread = new Thread(new CounterTask(cyclicBarrier));
thread.start();
}
}
}

上面的代码我们看到,除了定义的计算任务以外,还可以通过构造函数向栅栏提供一个匿名Runnable,表示当计数器为0时,执行的任务。上面代码已经做了演示说明,不再做过多的解释。

信号量-Semaphore

之前介绍的锁,无论是内置锁synchronized还是现实锁Lock,都是只允许一个线程访问共享资源,而信号量可以允许N个线程同时访问共享资源。

可以把信号量想象成一张张许可证,拿到许可证的线程就可以操作共享资源,可以通过acquire方法获得许可证,通过release方法归还许可证。如果没有许可证可用,那么acquire方法将阻塞。

信号量一般用作实现资源池,当资源池没有资源可用时,申请资源的线程将阻塞,直到线程池有可用资源。

举个例子

利用信号量实现一个限制客户端登陆人数的功能。
LoginTask

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class LoginTask implements Runnable {
private Semaphore semaphore;
private int userId;

public LoginTask(int userId, Semaphore semaphore) {
this.userId = userId;
this.semaphore = semaphore;
}

@Override
public void run() {
try {
//获取许可
semaphore.acquire();
System.out.println("第" + userId + "号用户登陆");
//模拟登陆时长
Thread.sleep(new Random().nextInt(10)*2000);
System.out.println("第" + userId + "号用户推出");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
//退出后释放资源
semaphore.release();
}
}
}

模拟用户登陆

1
2
3
4
5
6
7
8
9
10
public class SemaphoreLoginDemo {
public static void main(String[] args){
//创建五个许可资源
Semaphore semaphore = new Semaphore(5);
for (int i = 0; i < 10; i++){
Thread thread = new Thread(new LoginTask(i,semaphore));
thread.start();
}
}
}

上面的代码大家会看到,最初有5个用户登陆,后面只有当用户退出后,才会有用户登陆进去。

结束

这篇文章介绍了,并发编程中经常会用到的三个工具类,并用代码示例做了演示说明。下一篇会介绍JDK提供的Executor框架及线程池技术。


推荐阅读
1. Java并发编程那些事儿(一) ——任务与线程
2. Java8的Stream流真香,没体验过的永远不知道
3. Awk这件上古神兵你会用了吗
4. 手把手教你搭建一套ELK日志搜索运维平台

-------------本文结束-------------
坚持原创技术分享,您的支持将鼓励我继续创作!
0%