code_money_guji's Blog

Happy coding

BlockingQueue---DelayQueue

code_money_guji posted @ 2011年2月25日 10:10 in Concurrency in java , 3450 阅读

参考资料:

         林昊   --- <<分布式java应用  基础与实践>>

         JDK API 文档

 

BlockingQueue:

      在使用ThreadPoolExecutor构造器时, 有一个参数就是BlockingQueue<Runnable>. JDK的并发包中,有N个BlockingQueue的实现 :

     

       DelayQueue: Delayed是一个无界的阻塞队列.不允许将NULL放入其中. 它的并发是使用了ReentrantLock和对应的一个condition来完成. 其外,DelayQueue的队列头部放置的是延迟期满了最久的一个元素. 如果元素的getDelay()方法返回0或者小于0的时候才能将其出列. DelayQueue中的元素都是实现了Delayed接口,在JDK中, 而Delayed接口继承了Comparable, 在使用DelayQueue之前就需要自己编写一个Delayed的实现类,JDK中,可参考FutureTask<T>中的私有内部类ScheduledFutureTask来给出思路:

 

  public int compareTo(Delayed other) {
            if (other == this) // compare zero ONLY if same object
                return 0;
            if (other instanceof ScheduledFutureTask) {
                ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
                long diff = time - x.time;
                if (diff < 0)
                    return -1;
                else if (diff > 0)
                    return 1;
                else if (sequenceNumber < x.sequenceNumber)
                    return -1;
                else
                    return 1;
            }
            long d = (getDelay(TimeUnit.NANOSECONDS) -
                      other.getDelay(TimeUnit.NANOSECONDS));
            return (d == 0)? 0 : ((d < 0)? -1 : 1);
        }


getDelay:

 public long getDelay(TimeUnit unit) {
            long d = unit.convert(time - now(), TimeUnit.NANOSECONDS);
            return d;
        }

now()函数的实现:
 private static final long NANO_ORIGIN = System.nanoTime();

    /**
     * Returns nanosecond time offset by origin
     */
    final long now() {
        return System.nanoTime() - NANO_ORIGIN;
    }

其中,time为给定的延迟时间. 上面的comparaTo方法中,其实在可以使用:

public int compareTo(Delayed other) {
            if (other == this) // compare zero ONLY if same object
                return 0;
          
           long d = (getDelay(TimeUnit.NANOSECONDS) -
                      other.getDelay(TimeUnit.NANOSECONDS));
            return (d == 0)? 0 : ((d < 0)? -1 : 1);
        }

作为简单参考实现.

api介绍:

   peek: 获取但不移除此队列的头部;如果此队列为空,则返回 null。与 poll 不同,如果队列中没有到期元素可用,则此方法返回下一个将到期的元素. 其源代码如下:

 

 public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.peek();
        } finally {
            lock.unlock();
        }
    }

 注意了,  q 是一个PriorityQueue.在分类"数据结构,算法"的系列中,笔者介绍过PriorityQueue时一个基于Comparator或者Comparable的基于数据结构"堆"的一种实现.也就是说,PriorityQueue是允许排序的,这个排序就是根据进入DelayQueue的元素的geyDelay()【Delayed接口中的方法】方法获取. 进一步看q.peek():

   if (size == 0)
            return null;
        return (E) queue[0];

可见其返回的是queue[0]. 

take 和 poll的api:

这两个api都是移除queue[0]但是做法却是不一样的:

take():

if (first == null) {
                    available.await();
                } else {
                    long delay =  first.getDelay(TimeUnit.NANOSECONDS);
                    if (delay > 0) {
                        long tl = available.awaitNanos(delay);//avaliable 是ReentrantLock在此类中的一个condition

take的话如果queue[0]没有到期的话,就会在condition条件中阻塞直到queue[0]的延迟期到达;

 

poll:

 E first = q.peek();
            if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
                return null;

当queue[0]的延迟期没到(first.getDelay(....) > 0),那么就返回null表示现在没有可取对象.

 

add/put/offer api:

add 和put都是 利用offer来添加一个元素,如果队列满的话就进行相应的扩容:

int oldCapacity = queue.length;
        // Double size if small; else grow by 50%
        int newCapacity = ((oldCapacity < 64)?
                           ((oldCapacity + 1) * 2):
                           ((oldCapacity / 2) * 3));
        if (newCapacity < 0) // overflow
            newCapacity = Integer.MAX_VALUE;
        if (newCapacity < minCapacity)
            newCapacity = minCapacity;
        queue = Arrays.copyOf(queue, newCapacity);

 

这里先给出一个网上比较好的关于DelayQueue的应用.感谢blog主人的无私分享.

http://ideasforjava.javaeye.com/blog/657384

to be continue..

 

Avatar_small
seo service UK 说:
2023年12月06日 22:38

This Is Very Nice and Wonderful Keep Writing Such type Of Wonderful blogs


登录 *


loading captcha image...
(输入验证码)
or Ctrl+Enter