BlockingQueue---DelayQueue
参考资料:
林昊 --- <<分布式java应用 基础与实践>>
JDK API 文档
BlockingQueue:
在使用ThreadPoolExecutor构造器时, 有一个参数就是BlockingQueue<Runnable>. JDK的并发包中,有N个BlockingQueue的实现 :
DelayQueue: Delayed是一个无界的阻塞队列.不允许将NULL放入其中. 它的并发是使用了ReentrantLock和对应的一个condition来完成. 其外,DelayQueue的队列头部放置的是延迟期满了最久的一个元素. 如果元素的getDelay()方法返回0或者小于0的时候才能将其出列. DelayQueue中的元素都是实现了Delayed接口,在JDK中, 而Delayed接口继承了Comparable, 在使用DelayQueue之前就需要自己编写一个Delayed的实现类,JDK中,可参考FutureTask<T>中的私有内部类ScheduledFutureTask来给出思路:
public int compareTo(Delayed other) {
if (other == this) // compare zero ONLY if same object
return 0;
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
long d = (getDelay(TimeUnit.NANOSECONDS) -
other.getDelay(TimeUnit.NANOSECONDS));
return (d == 0)? 0 : ((d < 0)? -1 : 1);
}
getDelay:
public long getDelay(TimeUnit unit) {
long d = unit.convert(time - now(), TimeUnit.NANOSECONDS);
return d;
}
now()函数的实现:
private static final long NANO_ORIGIN = System.nanoTime();
/**
* Returns nanosecond time offset by origin
*/
final long now() {
return System.nanoTime() - NANO_ORIGIN;
}
其中,time为给定的延迟时间. 上面的comparaTo方法中,其实在可以使用:
public int compareTo(Delayed other) {
if (other == this) // compare zero ONLY if same object
return 0;
long d = (getDelay(TimeUnit.NANOSECONDS) -
other.getDelay(TimeUnit.NANOSECONDS));
return (d == 0)? 0 : ((d < 0)? -1 : 1);
}
作为简单参考实现.
api介绍:
peek: 获取但不移除此队列的头部;如果此队列为空,则返回 null。与 poll 不同,如果队列中没有到期元素可用,则此方法返回下一个将到期的元素. 其源代码如下:
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return q.peek();
} finally {
lock.unlock();
}
}
注意了, q 是一个PriorityQueue.在分类"数据结构,算法"的系列中,笔者介绍过PriorityQueue时一个基于Comparator或者Comparable的基于数据结构"堆"的一种实现.也就是说,PriorityQueue是允许排序的,这个排序就是根据进入DelayQueue的元素的geyDelay()【Delayed接口中的方法】方法获取. 进一步看q.peek():
if (size == 0)
return null;
return (E) queue[0];
可见其返回的是queue[0].
take 和 poll的api:
这两个api都是移除queue[0]但是做法却是不一样的:
take():
if (first == null) {
available.await();
} else {
long delay = first.getDelay(TimeUnit.NANOSECONDS);
if (delay > 0) {
long tl = available.awaitNanos(delay);//avaliable 是ReentrantLock在此类中的一个condition
take的话如果queue[0]没有到期的话,就会在condition条件中阻塞直到queue[0]的延迟期到达;
poll:
E first = q.peek();
if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
return null;
当queue[0]的延迟期没到(first.getDelay(....) > 0),那么就返回null表示现在没有可取对象.
add/put/offer api:
add 和put都是 利用offer来添加一个元素,如果队列满的话就进行相应的扩容:
int oldCapacity = queue.length;
// Double size if small; else grow by 50%
int newCapacity = ((oldCapacity < 64)?
((oldCapacity + 1) * 2):
((oldCapacity / 2) * 3));
if (newCapacity < 0) // overflow
newCapacity = Integer.MAX_VALUE;
if (newCapacity < minCapacity)
newCapacity = minCapacity;
queue = Arrays.copyOf(queue, newCapacity);
这里先给出一个网上比较好的关于DelayQueue的应用.感谢blog主人的无私分享.
http://ideasforjava.javaeye.com/blog/657384
to be continue..
2023年12月06日 22:38
This Is Very Nice and Wonderful Keep Writing Such type Of Wonderful blogs