xxxxxxxxxx
41private final transient ReentrantLock lock = new ReentrantLock();
2private final PriorityQueue<E> q = new PriorityQueue<E>();
3private Thread leader;
4private final Condition available = lock.newCondition();
lock = new ReentrantLock()
保证多线程操作队列的安全性。
q = new PriorityQueue<E>()
按照到期时间从小到大对元素排序。
Thread leader
等待对队首元素操作的leader线程,该线程将被定时休眠后唤醒,最先获取值返回。
available = lock.newCondition()
与Leader/Followers模式配合,Followers在此等待,当leader离任时某个Follower被唤醒成为新的leader,非公平获取,线程获取元素的返回顺序和调用顺序不一定一致。。
保证只有一个leader线程会被定时唤醒,其余Followers线程在available
上无限休眠,减少不必要的竞争。
非公平获取,线程获取元素的返回顺序和调用顺序不一定一致。
xxxxxxxxxx
141public boolean offer(E e) {
2 final ReentrantLock lock = this.lock;
3 lock.lock();
4 try {
5 q.offer(e);
6 if (q.peek() == e) {
7 leader = null;
8 available.signal();
9 }
10 return true;
11 } finally {
12 lock.unlock();
13 }
14}
available.signal()
:如果e
加入队列后自己为队首元素,证明e
的到期时间早于向前队首元素leader
线程的休眠时间设定为leader
和flowers
线程),让他们尝试获取e
或者将自己的休眠时间更新为e
的到期,保证e
被及时消费。
举例:->当前队首元素
->1s后过期时间5s的新元素e
加入队列,成为队首元素
->如果不手动唤醒线程,元素e
只能在9s被主动唤醒的leader
线程获取,此时e
已经过期4s
->如果手动唤醒等待线程,被唤醒的线程更新自己的定时休眠时间为5s,e
将能在5s后被及时消费。
leader = null
:leader
是对队首元素操作的线程,处于定时休眠状态,具有更高的获取数据优先级。当leader
和flower
线程休眠时,由于二者处于同一个等待队列和柱塞队列,无法保证available.signal()
唤醒的是leader
线程,如果被唤醒的是flower
线程且队首元素e
未到期,该线程发现存在leader
线程后将进入永久休眠(take
方法的第15行),之后只能等待leader
线程休眠时间到后主动唤醒获取队首元素,但由于leader
线程休眠时间是按照e
的过期时间,将导致e
不能及时被消费。
举例:->前队首元素
->1s后过期时间5s的新元素e
加入队列,成为队首元素,唤醒一个等待线程
->如果不将leader标识置空,且被唤醒的线程是之前的flower线程,唤醒线程将再次进入永久休眠,元素e
只 能在9s被主动唤醒的leader
线程获取,此时e
已经过期4s
->如果将leader标识置空,被唤醒的线程将成为新的leader线程,并且更新自己的定时休眠时间为5s,e
将能 在5s后被及时消费。
公平性:由于手动调用了available.signal()
,被唤醒线程不一定是leader
线程,所以是非公平,方法返回顺序不一定和调用顺序一致,
xxxxxxxxxx
351 // 柱塞方式获得元素
2public E take() throws InterruptedException {
3 final ReentrantLock lock = this.lock;
4 lock.lockInterruptibly();
5 try {
6 for (;;) {
7 E first = q.peek();
8 if (first == null)
9 available.await();
10 else {
11 long delay = first.getDelay(NANOSECONDS);
12 if (delay <= 0L)
13 return q.poll();
14 first = null; // don't retain ref while waiting
15 if (leader != null)
16 available.await();
17 else {
18 Thread thisThread = Thread.currentThread();
19 leader = thisThread;
20 try {
21 available.awaitNanos(delay);
22 } finally {
23 if (leader == thisThread)
24 leader = null;
25 }
26 }
27 }
28 }
29 } finally {
30 if (leader == null && q.peek() != null)
31 available.signal();
32 lock.unlock();
33 }
34}
35
leader = null
:24行leader线程定时唤醒后即完成一届任期,如果自己还是leader将自动卸任进入下一轮leader的竞选,以便其他线程有机会成为新的领导线程并被唤醒。
自动连任leader
,修改进入无限期休眠条件为当前leaer
不是自己、定时唤醒后不卸任自动连任、返回前再卸任:
xxxxxxxxxx
261public E take() throws InterruptedException {
2 //...
3 try {
4 //...
5 // 修改进入无限期休眠条件
6 if (leader != null&&leader != Thread.currentThread())
7 available.await();
8 else {
9 Thread thisThread = Thread.currentThread();
10 leader = thisThread;
11 try {
12 available.awaitNanos(delay);
13 } finally {
14 // 不卸任leader,自动连任
15 // if (leader == thisThread)
16 // leader = null;
17 }
18 }
19 } finally {
20 // 返回前卸任leader
21 leader=null;
22 if (q.peek() != null)
23 available.signal();
24 lock.unlock();
25 }
26}