xxxxxxxxxx41private final transient ReentrantLock lock = new ReentrantLock();2private final PriorityQueue<E> q = new PriorityQueue<E>();3private Thread leader;4private final Condition available = lock.newCondition();
lock = new ReentrantLock()保证多线程操作队列的安全性。
q = new PriorityQueue<E>()按照到期时间从小到大对元素排序。
Thread leader等待对队首元素操作的leader线程,该线程将被定时休眠后唤醒,最先获取值返回。
available = lock.newCondition()与Leader/Followers模式配合,Followers在此等待,当leader离任时某个Follower被唤醒成为新的leader,非公平获取,线程获取元素的返回顺序和调用顺序不一定一致。。
保证只有一个leader线程会被定时唤醒,其余Followers线程在available上无限休眠,减少不必要的竞争。
非公平获取,线程获取元素的返回顺序和调用顺序不一定一致。
xxxxxxxxxx141public boolean offer(E e) {2 final ReentrantLock lock = this.lock;3 lock.lock();4 try {5 q.offer(e);6 if (q.peek() == e) {7 leader = null;8 available.signal();9 }10 return true;11 } finally {12 lock.unlock();13 }14}
available.signal():如果e加入队列后自己为队首元素,证明e的到期时间早于向前队首元素leader线程的休眠时间设定为leader和flowers线程),让他们尝试获取e或者将自己的休眠时间更新为e的到期,保证e被及时消费。
举例:->当前队首元素
->1s后过期时间5s的新元素e加入队列,成为队首元素
->如果不手动唤醒线程,元素e只能在9s被主动唤醒的leader线程获取,此时e已经过期4s
->如果手动唤醒等待线程,被唤醒的线程更新自己的定时休眠时间为5s,e将能在5s后被及时消费。
leader = null:leader是对队首元素操作的线程,处于定时休眠状态,具有更高的获取数据优先级。当leader和flower线程休眠时,由于二者处于同一个等待队列和柱塞队列,无法保证available.signal()唤醒的是leader线程,如果被唤醒的是flower线程且队首元素e未到期,该线程发现存在leader线程后将进入永久休眠(take方法的第15行),之后只能等待leader线程休眠时间到后主动唤醒获取队首元素,但由于leader线程休眠时间是按照e的过期时间,将导致e不能及时被消费。
举例:->前队首元素
->1s后过期时间5s的新元素e加入队列,成为队首元素,唤醒一个等待线程
->如果不将leader标识置空,且被唤醒的线程是之前的flower线程,唤醒线程将再次进入永久休眠,元素e只 能在9s被主动唤醒的leader线程获取,此时e已经过期4s
->如果将leader标识置空,被唤醒的线程将成为新的leader线程,并且更新自己的定时休眠时间为5s,e将能 在5s后被及时消费。
公平性:由于手动调用了available.signal(),被唤醒线程不一定是leader线程,所以是非公平,方法返回顺序不一定和调用顺序一致,
xxxxxxxxxx351 // 柱塞方式获得元素2public E take() throws InterruptedException {3 final ReentrantLock lock = this.lock;4 lock.lockInterruptibly();5 try {6 for (;;) {7 E first = q.peek();8 if (first == null)9 available.await();10 else {11 long delay = first.getDelay(NANOSECONDS);12 if (delay <= 0L)13 return q.poll();14 first = null; // don't retain ref while waiting15 if (leader != null)16 available.await();17 else {18 Thread thisThread = Thread.currentThread();19 leader = thisThread;20 try {21 available.awaitNanos(delay);22 } finally {23 if (leader == thisThread)24 leader = null;25 }26 }27 }28 }29 } finally {30 if (leader == null && q.peek() != null)31 available.signal();32 lock.unlock();33 }34}35
leader = null:24行leader线程定时唤醒后即完成一届任期,如果自己还是leader将自动卸任进入下一轮leader的竞选,以便其他线程有机会成为新的领导线程并被唤醒。
自动连任leader,修改进入无限期休眠条件为当前leaer不是自己、定时唤醒后不卸任自动连任、返回前再卸任:
xxxxxxxxxx261public E take() throws InterruptedException {2 //...3 try {4 //...5 // 修改进入无限期休眠条件6 if (leader != null&&leader != Thread.currentThread())7 available.await();8 else {9 Thread thisThread = Thread.currentThread();10 leader = thisThread;11 try {12 available.awaitNanos(delay);13 } finally {14 // 不卸任leader,自动连任15 // if (leader == thisThread)16 // leader = null;17 }18 }19 } finally {20 // 返回前卸任leader21 leader=null;22 if (q.peek() != null)23 available.signal();24 lock.unlock();25 }26}