SynchronousQueue 原理
这篇文章主要讲述SynchronousQueue数据结构及在线程池中的应用。
1. 概述
在生产者和消费者的线程模型下,生产者线程生产数据(事件,任务等),消费者线程消费数据。通常情况下,它们需要一个中间的数据结构来交换数据,如队列。生成者线程将数据存入队列,若队列满了则阻塞生产者线程,等待消费者线程唤醒;消费者线程从队列取数据,若队列为空则阻塞消费者线程,等待生产者线程唤醒。在这种场景下,如果生产数据过快,队列中会堆积很多数据,如果是无界队列的话,可能会造成问题。现在提供了另外一种选择,即SynchronousQueue,它内部没有容量,生产者线程和消费者线程进行一一匹配,亲手(handoff)交换数据。
SynchronousQueue是一种特殊的阻塞队列,它没有实际的容量,任意线程(生产者线程或者消费者线程,生产类型的操作如put/offer,消费类型的操作如poll/take)都会等待知道获得数据或者交付完成数据才会返回,一个生产者线程的使命是将线程附着着的数据交付给一个消费者线程,而一个消费者线程则是等待一个生产者线程的数据。它们会在匹配到互补线程的时候就会做数据交易,比如生产者线程遇到消费者线程时,或者消费者线程遇到生产者线程时,一个生产者线程就会将数据交付给消费者线程,然后共同退出。[1]
SynchronousQueue的一个使用场景是在线程池里。Executors.newCachedThreadPool()就使用了SynchronousQueue,这个线程池根据需要(新任务到来时)创建新的线程,如果有空闲线程则会重复使用,线程空闲了60秒后会被回收。因为SynchronousQueue没有容量,因此offer和take会一直阻塞(如果操作允许超时且超时时间为0,没有匹配的线程时则直接返回不回阻塞),直到有另一个线程已经准备好参与到交付过程中。在线程池中,提交的任务尽量是执行时间短、不会被阻塞,这样才能保证线程的重复使用,否则就会在短时间内生成大量的线程,引起内存被耗尽的问题(一个线程会申请一个线程栈)。
在对SynchronousQueue进一步分析前,先对SynchronousQueue的基本结构及对外接口的接口(方法)做了一个简单的描述,在后面的部分会依赖这些知识点。
SynchronousQueue内部逻辑封装在Transferer的实现类中,该类只有一个transfer方法,put/offer及poll/take都是用它来实现,它的声明如下:
1 | abstract static class Transferer<E> { |
SynchronousQueue中的put/offer操作主要是生产数据,put操作主要是数据交给消费者,没有匹配的消费者则会调用LockSupport.park阻塞自己,直到有消费者线程唤醒且进行匹配,可以被中断,所以要判断是正常返回还是中断返回;而offer操作主要是尝试将数据交给消费者,如果有匹配的消费者则Transferer.transfer返回数据本身,否则直接返回null,offer根据返回的结果进行判断是是否成功,该方法不会阻塞自己。线程池中使用了offer操作,offer成功表示有消费者线程可以重复使用,否则新建一个线程来运行该任务,方法定义如下:
1 |
|
SynchronousQueue中的poll/take主要是消费数据,take操作是从队列中获取数据,调用该方法会被阻塞直到被向生产者线程唤醒,也可以调用interrupted()中断返回,返回之前需要判断是正常返回还是中断返回;不带参数的poll方法尝试获取数据,如果有数据则返回成功,否则返回失败,不会阻塞调用者线程;在带超时参数的poll方法中,如果没有数据,则会将调用线程睡眠指定时间。线程池中获取数据使用了带超时参数的poll操作,超时时间为60S,如果60S都没有数据,则唤醒该线程重新获取数据,如果仍然没有数据,则释放回收该线程。
1 |
|
SynchronousQueue有两个版本的Transferer实现,一种为公平模式,一种为非公平模式,公平模式的实现类为TransferQueue,它使用队列来作为存储结构,请求匹配的线程总是先尝试跟队列尾部的线程进行匹配,如果失败再将请求的线程添加到队列尾部,而非公平模式的实现类为TransferStack,它使用一个stack来作为存储结构,请求匹配的线程总是试图与栈顶线程进行匹配,失败则添加到栈顶。下面针对SynchronousQueue的两个版本进行分析。
2. 图解TransferStack
非公平模式底层的数据结构是TransferStack,它实际是一个LIFO的栈结构,用head指针指向栈顶,根据栈的LIFO特点,在非公平模式下,匹配操作总是与栈顶元素进行,即与最后一个入栈的元素而不是第一个元素,它的不公平性主要体现在这里。TransferStack及head定义如下:
1 | /** Dual stack */ |
现在以图形的方式来展示TransferStack的工作原理,SynchronousQueue为生产及消费数据提供了阻塞及不阻塞两种接口,两者都是阻塞方式是最复杂的一种情况,现在就以它为例。
1 | /** 生产数据 **/ |
在实例中SNode的内存布局如下所示:
操作序列如下:
1、TransferStack初始状态
栈顶元素head初始状态指向NULL。
2、生产者线程t1执行 put(task1)操作
TransferStack栈中没有元素,构造SNode结点SNode1,结点内容如下:
- mode : 1,表示为数据模式;
- item : task1,表示需要传递给消费者的数据;
- waiter : t1,表示生产者线程;
- match : null,表示没有匹配的结点;
- next : null,表示下一个结点。
将SNode1结点压入TransferStack栈中,调用LockSupport.park方法阻塞t1线程,等待消费者线程唤醒。
3、生产者线程t2执行 put(task2)操作
生产者线程t2执行的操作是生产数据,与TransferStack栈顶元素的mode是一致的,构造新的结点SNode2,并将该结点压入栈首,内容如下:
- mode : 1,表示为数据模式;
- item : task2,表示需要传递给消费者的数据;
- waiter : t2,表示生产者线程;
- match : null,表示没有匹配的结点;
- next : SNode1,表示下一个结点。
最后阻塞t2线程,等待消费者线程唤醒。
4、消费者线程t3执行 take()操作
消费者线程t3执行REQUEST操作,与TransferStack栈顶元素的mode是互补(FULFILLING)的,此时也会构造一个新结点SNode3,加入到栈中,内容如下:
- mode : 3,DATA与FULFILLING取或操作,即1 | 2 = 3, 表示正在进行匹配操作;
- item : null,请求(REQUEST)操作没有数据;
- waiter : t3,表示生产者线程;
- match : null,表示没有匹配的结点;
- next : SNode2,表示下一个结点。
SNode3压入栈顶之后,同时尝试修改与之匹配的SNode2结点,将SNode2中的match字段修改为SNode3,表示与之匹配的结点,此时栈的状态如下所示:
完成匹配操作之后,t3线程LockSupport.unpark方法唤醒t2线程,并将SNode3及SNode2弹出栈,其状态如下所示:
最后消费者t3线程拿到生产者线程生产的数据task2,生产者线程t2被唤醒继续执行后续流程。
5、消费者线程t4执行 take()操作
与第4步的操作类似,此时执行的互补(FULFILLING)操作,新构建一个结点SNode4,并压入栈中,其状态如下所示:
最后将两个结点弹出栈,此时栈中没有结点,回到初始状态,如第1步所示,同时t4线程拿到t1线程的数据task1。
3. 图解TransferQueue
公平模式底层的数据结构是TransferQueue,它是一个队列结构。head引用指向队列头结点,而tail指向尾结点。它的公平体现在匹配的操作是从队列的第一个元素开始进行的。下面是TransferQueue的定义。
1 | /** Dual Queue */ |
QNode的内存布局如下所示:
以上章的操作顺序来请求数据。
1、TransferQueue初始状态
构建一个Dummy Node压入队列,当head,tail指向同一个结点表示队列为空。
2、生产者线程t1执行 put(task1)操作
TransferQueue队列为空,构造QNode结点QNode1,结点内容如下:
- isData : true,表示为数据模式;
- item : task1,表示需要传递给消费者的数据;
- waiter : t1,表示生产者线程;
- next : null,表示下一结点为空。
将QNode1结点压入TransferQueue队尾,调用LockSupport.park方法阻塞t1线程,等待消费者线程唤醒。
3、生产者线程t2执行 put(task2)操作
生产者线程t2执行的操作是生产数据,与TransferQueue队首元素的isData是一样的,表示模式是一致的,构造新的结点PNode2,并将该结点压入队尾,内容如下:
- isData : true,表示为数据模式;
- item : task2,表示需要传递给消费者的数据;
- waiter : t2,表示生产者线程;
- next : null,表示下一结点为空。
最后阻塞t2线程,等待消费者线程唤醒。
4、消费者线程t3执行 take()操作
消费者线程t3执行REQUEST操作,跟TransferQueue队首元素的模式是互补(FULFILLING)的,此时与TransferStack的操作不一样,不用生成新结点压入队列。它执行了以下操作:1)将PNode1(队首)元素设置为参数e,在这里是null,完成数据的交接(如果是put操作的话,传递的就是真实的数据);2)将head指针指向PNode1,PNode1作为新的Dummy Node(等于将PNode1移除队列);3)唤醒PNode1对应的线程,即t1。
最后t3从PNode1中拿出数据task1并返回,生产者线程t1被唤醒继续执行后续流程。
5、消费者线程t4执行 take()操作
与第4步的操作类似,此时执行的互补(FULFILLING)操作,队列状态如下所示:
回到初始状态,如第1步所示,同时t4线程拿到t2线程的数据task2,t2线程被唤醒。
4. 代码分析
现在以TransferStack为例,分析SynchronousQueue的流程。
1 | /** |
代码主要分为三种情况:
如果当前的栈是空的,或者包含与请求节点模式相同的节点,那么就将这个请求的节点作为新的栈顶节点,等待被下一个请求的节点匹配,最后会返回匹配节点的数据或者null,如果被取消则会返回null。
如果当前栈不为空,并且请求的节点和当前栈顶节点模式互补,那么将这个请求的节点的模式变为FULFILLING,然后将其压入栈中,和互补的节点进行匹配,完成匹配之后将两个节点一起弹出,并且返回交易的数据。
如果栈顶已经存在一个模式为FULFILLING的节点,说明栈顶的节点正在进行匹配,那么就帮助这个栈顶节点快速完成匹配,然后继续匹配。
主要方法说明:
casHead : 通过CAS操作将nh设置为新的栈顶结点;
1
2
3
4boolean casHead(SNode h, SNode nh) {
return h == head &&
UNSAFE.compareAndSwapObject(this, headOffset, h, nh);
}awaitFulfill : 自旋或阻塞一个节点,直到找到一个匹配的结点;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33SNode awaitFulfill(SNode s, boolean timed, long nanos) {
// 计算超时时间
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Thread w = Thread.currentThread();
int spins = (shouldSpin(s) ?
(timed ? maxTimedSpins : maxUntimedSpins) : 0);
for (;;) {
if (w.isInterrupted())
s.tryCancel();
SNode m = s.match;
if (m != null)
return m;
if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
s.tryCancel();
continue;
}
}
// 自旋
if (spins > 0)
// 如果是cpu是多核,则进行自旋
spins = shouldSpin(s) ? (spins-1) : 0;
else if (s.waiter == null)
s.waiter = w; // establish waiter so can park next iter
// 如果没有设置超时,则直接阻塞调用者线程
else if (!timed)
LockSupport.park(this);
// 如果设置超时时间,则设置阻塞时间
else if (nanos > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanos);
}
}tryMatch : 尝试匹配结点,如果匹配成功则唤醒结点对应的线程
1
2
3
4
5
6
7
8
9
10
11
12boolean tryMatch(SNode s) {
if (match == null &&
UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
Thread w = waiter;
if (w != null) { // waiters need at most one unpark
waiter = null;
LockSupport.unpark(w);
}
return true;
}
return match == s;
}
5. 线程池应用
SynchronousQueue的一个使用场景是线程池,使用它的目的就是保证“对于提交的任务,如果有空闲线程,则使用空闲线程来处理;否则新建一个线程来处理任务”。
1 | public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { |
首先看下线程池的任务提交流程:
1 | public void execute(Runnable command) { |
提交任务有在三个步骤:
- 如果当前工作线程数小于“核心线程数”,则创建一个工作线程来执行task,在这里,由于“核心线程数”等于0,会跳过这个步骤,执行第2步;
- 将任务加入到工作队列(SynchronousQueue)中,如果添加成功表示将任务交付给工作线程了,如果没有成功则执行第3步;
- 如果加入到工作队列失败,会尝试创建一个工作线程来执行任务,如果工作线程数小于最大线程数(Integer.MAX_VALUE),正常情况下,工作线程会创建成功,如果创建失败则会执行“拒绝策略”。
结合SynchronousQueue,我们来分析下线程池的执行流程:
- 由于“核心线程数”等于0,会跳过第1个步骤;
- 在第2步中,提交任务采用的是offer操作,在上面的内容我们提到,offer尝试将数据(任务)交给匹配的线程:
- 如果有匹配的工作者线程,交付成功;
- 如果匹配不成功,返回false,不会阻塞调用者线程;
在这里分为两种情况,1)刚开始,没有工作线程,SynchronousQueue队列为空,offer操作失败,继续执行第3步;2)执行一段时间后,SynchronousQueue中有工作线程,数据交付成功,直接返回;
- 交付失败后,会尝试新建一个工作线程来执行任务,由于最大线程数设置为Integer.MAX_VALUE,线程都会创建成功,而不会被拒绝。在这里,线程数没有做限制,存在线程创建过多导致内存溢出的风险。
分析了提交任务(offer),再来看获取任务的流程,获取任务的流程在工作线程的执行代码中,工作线程一直会从SynchronousQueue中获取任务,如果空闲时间超过60S则会回收该工作线程。我们来看下工作线程中获取任务的代码:
1 | private Runnable getTask() { |
获取任务的关键代码主要是这行语句:
1 | workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) |
这行语句有如下功能:获取任务,如果没有匹配的操作,则会阻塞60S,这里的60S就是线程空闲时间;如果有匹配的操作,则直接获取交付的数据。在这里,如果是超时返回的话,该工作线程会退出,且工作线程的数量会减1。
通过上面的分析,我们可以看出,提交任务使用不阻塞的offer方法,获取任务使用带超时的阻塞方法poll。不管offer成功与否,总能保证有工作线程去马上去执行任务,如果没有可复用的工作线程,则会创建一个新的工作线程来执行,另外SynchronousQueue只会阻塞工作线程,即队列中只会有工作线程,不会有提交任务的线程。
6. 总结
这篇文章分析了SynchronousQueue的底层数据结构及在线程池中的运用,并对相关代码进行了分析,希望能够对想了解SynchronousQueue的同学有所帮助。
参考: