BlockingQueue---DelayQueue
参考资料:
林昊 --- <<分布式java应用 基础与实践>>
JDK API 文档
BlockingQueue:
在使用ThreadPoolExecutor构造器时, 有一个参数就是BlockingQueue<Runnable>. JDK的并发包中,有N个BlockingQueue的实现 :
DelayQueue: Delayed是一个无界的阻塞队列.不允许将NULL放入其中. 它的并发是使用了ReentrantLock和对应的一个condition来完成. 其外,DelayQueue的队列头部放置的是延迟期满了最久的一个元素. 如果元素的getDelay()方法返回0或者小于0的时候才能将其出列. DelayQueue中的元素都是实现了Delayed接口,在JDK中, 而Delayed接口继承了Comparable, 在使用DelayQueue之前就需要自己编写一个Delayed的实现类,JDK中,可参考FutureTask<T>中的私有内部类ScheduledFutureTask来给出思路:
public int compareTo(Delayed other) {
if (other == this) // compare zero ONLY if same object
return 0;
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
long d = (getDelay(TimeUnit.NANOSECONDS) -
other.getDelay(TimeUnit.NANOSECONDS));
return (d == 0)? 0 : ((d < 0)? -1 : 1);
}
getDelay:
public long getDelay(TimeUnit unit) {
long d = unit.convert(time - now(), TimeUnit.NANOSECONDS);
return d;
}
now()函数的实现:
private static final long NANO_ORIGIN = System.nanoTime();
/**
* Returns nanosecond time offset by origin
*/
final long now() {
return System.nanoTime() - NANO_ORIGIN;
}
其中,time为给定的延迟时间. 上面的comparaTo方法中,其实在可以使用:
public int compareTo(Delayed other) {
if (other == this) // compare zero ONLY if same object
return 0;
long d = (getDelay(TimeUnit.NANOSECONDS) -
other.getDelay(TimeUnit.NANOSECONDS));
return (d == 0)? 0 : ((d < 0)? -1 : 1);
}
作为简单参考实现.
api介绍:
peek: 获取但不移除此队列的头部;如果此队列为空,则返回 null。与 poll 不同,如果队列中没有到期元素可用,则此方法返回下一个将到期的元素. 其源代码如下:
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return q.peek();
} finally {
lock.unlock();
}
}
注意了, q 是一个PriorityQueue.在分类"数据结构,算法"的系列中,笔者介绍过PriorityQueue时一个基于Comparator或者Comparable的基于数据结构"堆"的一种实现.也就是说,PriorityQueue是允许排序的,这个排序就是根据进入DelayQueue的元素的geyDelay()【Delayed接口中的方法】方法获取. 进一步看q.peek():
if (size == 0)
return null;
return (E) queue[0];
可见其返回的是queue[0].
take 和 poll的api:
这两个api都是移除queue[0]但是做法却是不一样的:
take():
if (first == null) {
available.await();
} else {
long delay = first.getDelay(TimeUnit.NANOSECONDS);
if (delay > 0) {
long tl = available.awaitNanos(delay);//avaliable 是ReentrantLock在此类中的一个condition
take的话如果queue[0]没有到期的话,就会在condition条件中阻塞直到queue[0]的延迟期到达;
poll:
E first = q.peek();
if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
return null;
当queue[0]的延迟期没到(first.getDelay(....) > 0),那么就返回null表示现在没有可取对象.
add/put/offer api:
add 和put都是 利用offer来添加一个元素,如果队列满的话就进行相应的扩容:
int oldCapacity = queue.length;
// Double size if small; else grow by 50%
int newCapacity = ((oldCapacity < 64)?
((oldCapacity + 1) * 2):
((oldCapacity / 2) * 3));
if (newCapacity < 0) // overflow
newCapacity = Integer.MAX_VALUE;
if (newCapacity < minCapacity)
newCapacity = minCapacity;
queue = Arrays.copyOf(queue, newCapacity);
这里先给出一个网上比较好的关于DelayQueue的应用.感谢blog主人的无私分享.
http://ideasforjava.javaeye.com/blog/657384
to be continue..
并发编程之状态的依赖(一)
参考资料:
Doug Lea -------------<<java并发编程 设计原则与模式>>
场景 1 : 当一条线程的执行过程中的错误时来源于其他对象的临时错误或者这个错误是某个时间临时出现的,是可重试的.那么,当错误的时候, 可以稍微等待一段时间并使用轮询的方式重新尝试这个操作.
实例代码: 当远程连接服务器的时候,可能会接受到RimeOutException:
long delayTime = 5000 +(long)(Math.random()*5000);
for(;;){
//基于可恢复异常的轮询操作.
try{
//do your remote connection
}catch(TimeOutException e){
Thread.sleep(delayTime);//注意,sleep是含有锁的睡眠,可能在某些场景不合适.
delayTime = .....//重新定义一个更合适时间
}
}
注意: 上述的操作中,sleep操作可能在某种场景是不合适的:比如说这段代码是在某个高并发的时刻持有某个频繁会被请求的锁.那么,这个sleep会造成休眠时间内很多现成休眠.这个代价是比较大的.
上述的操作有一种要求,要求客户方会抛出相关的timeOutException.试想:
1 倘若接受不到这个异常
2 调用的方法根本不会抛出此异常
3 可能从线程的执行到抛出异常的时间比正常处理的时间还要多,不希望等到真正的超市抛出时候才处理.
这个时候, 自定义的超市实现可能会比较好,下面的代码来自互联网, 笔者觉得此方法是一种思路,所以将其中的部分代码截出, 感谢原作者提供思路:
<代码来自互联网>
public class TimeoutThread extends Thread{
/**
* 计时器超时时间
*/
private long timeout;
/**
* 计时是否被取消
*/
private boolean isCanceled = false;
/**
* 当计时器超时时抛出的异常
*/
private TimeoutException timeoutException;
/**
* 构造器
* @param timeout 指定超时的时间
*/
public TimeoutThread(long timeout,TimeoutException timeoutErr) {
super();
this.timeout = timeout;
this.timeoutException = timeoutErr;
//设置本线程为守护线程
this.setDaemon(true);
}
/**
* 取消计时
*/
public synchronized void cancel()
{
isCanceled = true;
}
/**
* 启动超时计时器
*/
public void run()
{
try {
Thread.sleep(timeout);
if(!isCanceled)
throw timeoutException;
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
使用:
TimeoutThread t = new TimeoutThread(5000,new TimeoutException("超时"));
try{
t.start();
.....要检测超时的程序段....
t.cancel();
}catch (TimeoutException e)
{
...对超时的处理...
}
场景二: 子线程在执行的时候如果抛出异常, 希望把异常传递给一个专门负责处理异常的"异常处理器". 也可以传回给主线程来进行处理.
示例代码摘自《java 并发编程 设计原则与模式》
异常时候只通知一个线程
interface ServerWithException {
void service() throws ServiceException;
}
interface ServiceExceptionHandler {
void handle(ServiceException e);
}
class ServerImpl implements ServerWithException {
public void service() throws ServiceException {}
}
class HandlerImpl implements ServiceExceptionHandler {
public void handle(ServiceException e) {}
}
class HandledService implements ServerWithException {
final ServerWithException server = new ServerImpl();
final ServiceExceptionHandler handler = new HandlerImpl();
public void service() { // no throw clause
try {
server.service();
}
catch (ServiceException e) {
handler.handle(e);
}
}
}
下面是基于事件监听的方式方处理"异常通知合作伙伴"的方式:
class ExceptionEvent extends java.util.EventObject {
public final Throwable theException;
public ExceptionEvent(Object src, Throwable ex) {
super(src);
theException = ex;
}
}
class ExceptionEventListener { // Incomplete
public void exceptionOccured(ExceptionEvent ee) {
// ... respond to exception...
}
}
class ServiceIssuingExceptionEvent { // Incomplete
// ...
private final CopyOnWriteArrayList handlers =
new CopyOnWriteArrayList();
public void addHandler(ExceptionEventListener h) {
handlers.add(h);
}
public void service() {
// ...
boolean failed = true;
if (failed) {
Throwable ex = new ServiceException();
ExceptionEvent ee = new ExceptionEvent(this, ex);
for (Iterator it = handlers.iterator(); it.hasNext();) {
ExceptionEventListener l =
(ExceptionEventListener)(it.next());
l.exceptionOccured(ee);
}
}
}
}
FutureTask
FutureTask
参考资料 :
JDK api 文档
http://liuchangit.com/java/20.html 一个blog充斥着比较好的Future Pattern应用场景.感谢作者. :)
林昊 ---<<分布式java应用 ---基础与实践>>
在现实的应用场景中,经常遇到一个情况:一个查找需要本地和远程的协作. 例如在纵向分表的情况下. 由于其他列存在远程的数据库中,一次查询可能会引发两次查询:本地查询和用远程查询. 为例提高响应的时间,可以使用类似:
public List<Entity> findEntities(SearchContition cond){
//伪代码
SercherThread t = new RemoteSercherThread(cond);
t.start();
.....
}
这样的方法,使用线程去远程查找. 但是这样有一个问题会产生:
在这个模型中并不是One-way的方式,也就是说最终需要将本地搜索和远程搜索的数据结合到一起发送给客户端, 使用runnable的不足是run方法没有放回值.主线程不知道什么时候远程搜索能够完成. 这个时候,可能会使用到一些同步器的机制. 当然你最终可能会选择Future模式:
下面是JDK FutureTask中的JDK简述:
取消的异步计算。利用开始和取消计算的方法、查询计算是否完成的方法和获取计算结果的方法,此类提供了对 Future的基本实现。仅在计算完成时才能获取结果;如果计算尚未完成,则阻塞 get方法。一旦计算完成,就不能再重新开始或取消计算。
FutureTask的特征:
1 一个futureTask可以避免执行两次;
下面是其run方法的源代码:
void innerRun() {
if (!compareAndSetState(0, RUNNING))
return;
try {
runner = Thread.currentThread();
if (getState() == RUNNING) // recheck after setting thread
innerSet(callable.call());
else
releaseShared(0); // cancel
} catch (Throwable ex) {
innerSetException(ex);
}
}
上面的源代码可以看出
!compareAndSetState(0, RUNNING)
1 run方法不允许callable.call()方法执行两次!!! 这是一个重要的特征, 可以避免代码由于并发从而执行了两次.下面是一个实例代码:
代码from 林昊--<<分布式java应用 ---基础与实践>>
public Connection getDBConnection(String key){
if(connectionPool.containsKey(key)){
//connectionPool是一个ConcurrentHashMap
return connectionPool.get(key);
}else{
//create connection
connectionPool.put(key,connection);
return connection;
}
}
上面的数据库连接池虽然是使用了ConcurrentHashMap避免了并发操作.但是,多个细粒度的同步方法合在一起并不能证明就没有并发问题,本质原因是: 虽然单个concurrentHashMap的方法是同步的,但是整体来说这段代码并不是原子性的. 完全有可能出现下面的情况:
当thread_1执行connectionPool.containsKey(key)发现不存在key,进入到else block执行,正在此时,thread_2执行connectionPool.containsKey(key),由于thread_1正在创建connection不持有concurrentHashMap的lock,thread_2进入else block. 最终的结果是connection被创建了两次!!!
[友情提示:有些时候甚至是编译器对code的优化也能够造成线程的一场执行!] 可参考单例的线程问题.
这个时候,就能够使用FutureTask来避免此类问题.
2 从上futureTask 的run方法依然能够查出run方法在futureTask内部是在同一个线程中执行. 也就是说,直接调用futureTask.run()并不会在其内部开启一个thread执行.这也就是为什么大部分时候FutureTask是和ExecuteService一起完成. 当然,也可以写一个实现了Runnable对象来启动一个thread执行这个futureTask的run方法.
FutureTask<Object> futureTask = new FutureTask<Object>(new SubProblemRunner());
//下面的SubProblemWorker是实现了Runnable的类.
new Thread(new SubProblemWorker(futureTask)).start();
Object o = futureTask.get();
下面给出FutureTask$Sync这个类的innerSet(V v) 源代码:
void innerSet(V v) {
for (;;) {
int s = getState();
if (s == RAN)//
return;
if (s == CANCELLED) {
// aggressively release to set runner to null,
// in case we are racing with a cancel request
// that will try to interrupt runner
releaseShared(0);
return;
}
if (compareAndSetState(s, RAN)) {
result = v;
releaseShared(0);
done();
return;
}
}
}
FutureTask中的Sync同样是继承了AbstractQueuedSynchronizer. 前面介绍过, AbstractQueuedSynchronizer的核心是维护state[int]状态.而FutureTask$Sync中使用:
private static final int RUNNING = 1;
/** State value representing that task ran */
private static final int RAN = 2;
/** State value representing that task was cancelled */
private static final int CANCELLED = 4;
来表示任务正在执行,已经执行或者被取消的状态,通过CAS算法:
(compareAndSetState(s, RAN)//s是getState()获取
来设置状态.
下面是一个FutureTask和ExecutorService的代码示例:
ExecutorService exec = Executors.newCachedThreadPool();
futureTask = exec.submit(new Callable<Object>() {
public Object call() {
Object result = searchFromRemoteDB(RemoCond);
return result;
}
});
SearchResult result = serchFromLocalDB(localCond);
Object result = futureTask.get();
[友情提示: 可以使用ThreadPoolExecutor来作为线程池,因为ThreadPoolExecutor得构造方法提供了更为灵活的线程池选项.例如执行策略和锁选用的缓冲数据结构.根据实际情况来制定线程池,使其能够提供更好的服务!].
FutureTask异常处理:
在FutureTask中的innerRun方法中,当程序出现问题的时候:
catch (Throwable ex) {
innerSetException(ex);
}
innerSetException方法:
void innerSetException(Throwable t) {
for (;;) {
int s = getState();
if (s == RAN)
return;
if (s == CANCELLED) {
// aggressively release to set runner to null,
// in case we are racing with a cancel request
// that will try to interrupt runner
releaseShared(0);
return;
}
if (compareAndSetState(s, RAN)) {
exception = t;
result = null;
releaseShared(0);
done();
return;
}
}
}
上面的代码看出当程序抛出异常的时候,类似与innerSet()方法一样, 把异常设置到futureTask中. 在设计线程操作的时候,一定会遇到的问题就是如果工作者线程抛出了异常,怎么把异常设置到调用线程中. 这就需要"子线程持有调用者引用",与此同时,调用者提供setException方法.
从上面的代码还有一个问题引出:
由于FutureTask是一次性的, api并没有提供能够让他能够复用的使用情景. 当task抛出异常的时候,会设置:
compareAndSetState(s, RAN)
那么, 如何高效的复用其设置呢 ?
to be continue...
CopyOnWriteArrayList
参考资料:
JDK 文档
CoppyOnWriteArrayList api 说明:
ArrayList的一个线程安全的变体,其中所有可变操作(add、set 等等)都是通过对底层数组进行一次新的复制来实现的。
这一般需要很大的开销,但是当遍历操作的数量大大超过可变操作的数量时,这种方法可能比其他替代方法更 有效。在不能或不想进行同步遍历,但又需要从并发线程中排除冲突时,它也很有用。“快照”风格的迭代器方法在创建迭代器时使用了对数组状态的引用。此数组在迭代器的生存期内不会更改,因此不可能发生冲突,并且迭代器保证不会抛出 ConcurrentModificationException。创建迭代器以后,迭代器就不会反映列表的添加、移除或者更改。在迭代器上进行的元素更改操作(remove、set 和 add)不受支持。这些方法将抛出 UnsupportedOperationException。
允许使用所有元素,包括 null。
内存一致性效果:当存在其他并发 collection 时,将对象放入 CopyOnWriteArrayList
之前的线程中的操作Happen-Before随后通过另一线程从 CopyOnWriteArrayList
中访问或移除该元素的操作。
解决问题: 但选择使用ArrayList时候, 经常能够在如下示例代码中,在多线程的环境下抛出ConcurrentModificationException:
示例代码 01:
private ReentrantLock lock = new ReentrantLock();
private volatile boolean modified = false;
private Condition modifiedCondition = lock.newCondition();
private ArrayList<String> arrayList = null;
public ArrayListCache(ArrayList<String> list){
this.arrayList = list;
}
public void iterateList(){
lock.lock();
try{
Iterator<String> iterator = arrayList.iterator();
while(! modified){
modifiedCondition.await();
}
while(iterator.hasNext()){
String item = iterator.next();
//doSomeThing
}
}catch(Exception e){
e.printStackTrace();
}finally{
lock.unlock();
}
}
public void addToCache(String item){
lock.lock();
try{
arrayList.add(item);
modified = true;
modifiedCondition.signal();
}finally{
lock.unlock();
}
}
上述代码中,可能会抛出ConcurrentModificationException异常. 这是因为当thread_1在得到当前的arrayList的iterator之后会等待thread_2去调用addToCache方法, 当thread_2更改了list并signal thread_1之后, 由于arrayList被改变, iterator.next()就会抛出异常; 可以给出iterator.next()的源代码如下:
checkForComodification();
try {
E next = get(cursor);
lastRet = cursor++;
return next;
} catch (IndexOutOfBoundsException e) {
checkForComodification();
throw new NoSuchElementException();
}
上述的
checkForComodification();
将会抛出此异常;
在这种情况下, 可以考虑使用CopyOnWriteArrayList来代替ArrayList.更改示例代码:
private ReentrantLock lock = new ReentrantLock();
private volatile boolean modified = false;
private Condition modifiedCondition = lock.newCondition();
private CopyOnWriteArrayList<String> arrayList = null;
public ArrayListCache(ArrayList<String> list){
if(list != null){
this.arrayList = new CopyOnWriteArrayList<String>(list);
}else{
//do ypu code if the parameter is null
}
}
CopyOnWriteArrayList处理更改操作的时候,会先复制一份,也就是说实在一个copy里面进行的,更改之后将会set到原来的引用上,如下式CopyOnWriteArrayList add方法的源代码参考:
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray(); //get the orig arrayList
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
setArray(newElements); //set after update
return true;
} finally {
lock.unlock();
}
to be continue ...
ReentantLock
参考资料:
1 java 并发实践
2 JDK 文档以及源代码
Reentantlock在jdk文档中的介绍如下:
class X {
private final ReentrantLock lock = new ReentrantLock();
// ...
public void m() {
lock.lock(); // block until condition holds
try {
// ... method body
} finally {
lock.unlock()
}
}
}
其完成了java synchorized的语义.
此类的构造方法接受一个可选的公平 参数。当设置为 true
时,在多个线程的争用下,这些锁倾向于将访问权授予等待时间最长的线程。否则此锁将无法保证任何特定访问顺序。与采用默认设置(使用不公平锁)相比,使用公平锁的程序在许多线程访问时表现为很低的总体吞吐量(即速度很慢,常常极其慢),但是在获得锁和保证锁分配的均衡性时差异较小。不过要注意的是,公平锁不能保证线程调度的公平性。因此,使用公平锁的众多线程中的一员可能获得多倍的成功机会,这种情况发生在其他活动线程没有被处理并且目前并未持有锁时。还要注意的是,未定时的 tryLock方法并没有使用公平设置。因为即使其他线程正在等待,只要该锁是可用的,此方法就可以获得成功。
上述事例中,涉及到经常使用的ReentrantLock的方法, lock()和unlock().
其外, ReentantLock 提供了Condition的相关操作.
Condition
将 Object
监视器方法( wait,notify和 notifyAll)分解成截然不同的对象,以便通过将这些对象与任意 Lock
实现组合使用,为每个对象提供多个等待 set(wait-set)
Condition Exsample [from JDK api --- Condition]
class BoundedBuffer {
final Lock lock = new ReentrantLock();
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();
final Object[] items = new Object[100];
int putptr, takeptr, count;
public void put(Object x) throws InterruptedException {
lock.lock();
try {
while (count == items.length)
notFull.await();
items[putptr] = x;
if (++putptr == items.length) putptr = 0;
++count;
notEmpty.signal();
} finally {
lock.unlock();
}
}
public Object take() throws InterruptedException {
lock.lock();
try {
while (count == 0)
notEmpty.await();
Object x = items[takeptr];
if (++takeptr == items.length) takeptr = 0;
--count;
notFull.signal();
return x;
} finally {
lock.unlock();
}
}
}
示例代码 2
针对上面的例子, 如下说明:
1 在调用lock.await()的时候,必须获取lock的锁定(而不是简单的synchronized(notEmpty)). 否则会抛出异常:
java.lang.IllegalMonitorStateException 注意了, 当使用Object中的wait()和signal()或者signalAll()时,一定要确保已经获取了监视对象的锁定. 如下代码就是错的:
synchronized(someObject){
await();// 错误,这里的await需要锁定this 而不是object可以改为someObject.wait()
//signal()同上.
...
}
2 condiionObject.await()必须出现在while(condition)中, 原因有:
2.1 防止虚假唤醒;
2.2 解释代码为【示例代码2】中的while改为if, get方法的notFull.signal()改为notFull.signalAll()
如果有N个线程在notFull这个Condition对象的wait-set中等待【put操作中】, 只有一个活跃线程调用这个condition对象的notifyAll方法, 即是在take操作中的notFull.signal()误写为notFull.signalAll(); 那么,N个线程会同步,依次执行
items[putptr] = x;
假设这个时候putptr已经是MAX_SIZE -2, 线程数N > 2; 那么会抛出相关的越界异常;
上述的的解释可能会比较“不现实”. 但是, while语句的作用就是强制N个等待线程在某种原因唤醒的时候, 依然能够检查等待条件.
3 signal的操作会等待锁的释放,一般写的程序的最后是一种编程习惯.
4 await()和sleep区别, await()将会让线程处于休眠状态释放掉锁. 而sleep将"持锁而睡".
使用conditionObject.await(long time,TimeUnit unit);
api : 此方法会等待相关的conditionObject.signal()或者conditionObject.signalAll()方法, signal或者signalAll在给定的时间time内完成,那么就返回true, 否则返回false. 注意,此方法并不是去获取锁本身,而是"等待通知".当然的,当通知出发此方法的返回时, 当前被唤醒的对象是一定会去获取lock的.
如下代码:[保证ReentrantLock和condition是同一个对象]
public void run() {
lock.lock();
try{
if("thread_1".equals(threadName)){
System.out.println("thread_1 到达...");
//测试代码就不写入while()中,通常情况避免虚假唤醒是要将await至于while(condition)
//中的.
boolean result = condition.await(4, TimeUnit.SECONDS);
if(result){
System.out.println("true");
}else{
System.out.println("false");
}
}else{
System.out.println("thread_2 到达...");
//Thread.sleep(500);
condition.signal();
}
}catch(Exception e){
e.printStackTrace();
}finally{
System.out.println(threadName + "释放锁");
lock.unlock();
}
}
当应用程序在运行期间等待一个执行时间不太确定的操作时候,用此方法是很合适的. 可以用一个变量来改变下次等待的时间,如下伪代码:
.....
while(condition){
try{
boolean waitTime =conditionObject.await(wantedTime,...);
if(waitTime == false) {
//超时
changeWantedTime(...);
}
}
}
使用乐观的tryLock(): 当不成功时候,会迅速返回false而不是进入等待区等待.
if (lock.tryLock()) {
try {
// manipulate protected state
} finally {
lock.unlock();
}
} else {
// perform alternative actions
}
<to be continue...>