code_money_guji's Blog

Happy coding

FutureTask

code_money_guji posted @ 2011年2月24日 03:04 in Concurrency in java , 3025 阅读

FutureTask

参考资料 :

             JDK api 文档

             http://liuchangit.com/java/20.html 一个blog充斥着比较好的Future Pattern应用场景.感谢作者. :)

            林昊 ---<<分布式java应用 ---基础与实践>>

 

在现实的应用场景中,经常遇到一个情况:一个查找需要本地和远程的协作. 例如在纵向分表的情况下. 由于其他列存在远程的数据库中,一次查询可能会引发两次查询:本地查询和用远程查询. 为例提高响应的时间,可以使用类似:

 public List<Entity> findEntities(SearchContition cond){
      //伪代码
      SercherThread t = new RemoteSercherThread(cond); 
      t.start();
      
      .....     

  }

这样的方法,使用线程去远程查找. 但是这样有一个问题会产生:

在这个模型中并不是One-way的方式,也就是说最终需要将本地搜索和远程搜索的数据结合到一起发送给客户端, 使用runnable的不足是run方法没有放回值.主线程不知道什么时候远程搜索能够完成. 这个时候,可能会使用到一些同步器的机制. 当然你最终可能会选择Future模式:

下面是JDK FutureTask中的JDK简述:

取消的异步计算。利用开始和取消计算的方法、查询计算是否完成的方法和获取计算结果的方法,此类提供了对 Future的基本实现。仅在计算完成时才能获取结果;如果计算尚未完成,则阻塞 get方法。一旦计算完成,就不能再重新开始或取消计算

FutureTask的特征:

1 一个futureTask可以避免执行两次;

下面是其run方法的源代码:

void innerRun() {
            if (!compareAndSetState(0, RUNNING))
                return;
            try {
                runner = Thread.currentThread();
                if (getState() == RUNNING) // recheck after setting thread
                    innerSet(callable.call());
                else
                    releaseShared(0); // cancel
            } catch (Throwable ex) {
                innerSetException(ex);
            }
        }

上面的源代码可以看出

!compareAndSetState(0, RUNNING)

1 run方法不允许callable.call()方法执行两次!!! 这是一个重要的特征, 可以避免代码由于并发从而执行了两次.下面是一个实例代码:

代码from 林昊--<<分布式java应用 ---基础与实践>>

public Connection getDBConnection(String key){
     if(connectionPool.containsKey(key)){
       //connectionPool是一个ConcurrentHashMap
         return connectionPool.get(key);
     }else{
        //create connection
            connectionPool.put(key,connection);
            return connection;
       }
  

}

上面的数据库连接池虽然是使用了ConcurrentHashMap避免了并发操作.但是,多个细粒度的同步方法合在一起并不能证明就没有并发问题,本质原因是: 虽然单个concurrentHashMap的方法是同步的,但是整体来说这段代码并不是原子性的. 完全有可能出现下面的情况:

当thread_1执行connectionPool.containsKey(key)发现不存在key,进入到else block执行,正在此时,thread_2执行connectionPool.containsKey(key),由于thread_1正在创建connection不持有concurrentHashMap的lock,thread_2进入else block. 最终的结果是connection被创建了两次!!!

[友情提示:有些时候甚至是编译器对code的优化也能够造成线程的一场执行!] 可参考单例的线程问题.

这个时候,就能够使用FutureTask来避免此类问题.

 

2 从上futureTask 的run方法依然能够查出run方法在futureTask内部是在同一个线程中执行. 也就是说,直接调用futureTask.run()并不会在其内部开启一个thread执行.这也就是为什么大部分时候FutureTask是和ExecuteService一起完成. 当然,也可以写一个实现了Runnable对象来启动一个thread执行这个futureTask的run方法.

               FutureTask<Object> futureTask = new FutureTask<Object>(new SubProblemRunner());
               //下面的SubProblemWorker是实现了Runnable的类.
 	    new Thread(new SubProblemWorker(futureTask)).start();
		
	    Object o = futureTask.get();
	   

下面给出FutureTask$Sync这个类的innerSet(V v) 源代码:

   void innerSet(V v) {
	    for (;;) {
		int s = getState();
		if (s == RAN)// 
		    return;
                if (s == CANCELLED) {
		    // aggressively release to set runner to null,
		    // in case we are racing with a cancel request
		    // that will try to interrupt runner
                    releaseShared(0);
                    return;
                }
		if (compareAndSetState(s, RAN)) {
                    result = v;
                    releaseShared(0);
                    done();
		    return;
                }
            }
        }

FutureTask中的Sync同样是继承了AbstractQueuedSynchronizer. 前面介绍过, AbstractQueuedSynchronizer的核心是维护state[int]状态.而FutureTask$Sync中使用:

        private static final int RUNNING   = 1;
        /** State value representing that task ran */
        private static final int RAN       = 2;
        /** State value representing that task was cancelled */
        private static final int CANCELLED = 4;

来表示任务正在执行,已经执行或者被取消的状态,通过CAS算法:

(compareAndSetState(s, RAN)//s是getState()获取

来设置状态.

下面是一个FutureTask和ExecutorService的代码示例:

ExecutorService exec = Executors.newCachedThreadPool();
	 
futureTask = exec.submit(new Callable<Object>() {
	 public Object call() {
	 
	  Object result = searchFromRemoteDB(RemoCond);
	 
	            return result;
	        }
	    });

    SearchResult result = serchFromLocalDB(localCond);
   Object result = futureTask.get(); 
	 

[友情提示: 可以使用ThreadPoolExecutor来作为线程池,因为ThreadPoolExecutor得构造方法提供了更为灵活的线程池选项.例如执行策略和锁选用的缓冲数据结构.根据实际情况来制定线程池,使其能够提供更好的服务!].

 

FutureTask异常处理:

在FutureTask中的innerRun方法中,当程序出现问题的时候:

catch (Throwable ex) {
                innerSetException(ex);
            }

innerSetException方法:
 void innerSetException(Throwable t) {
        for (;;) {
        int s = getState();
        if (s == RAN)
            return;
                if (s == CANCELLED) {
            // aggressively release to set runner to null,
            // in case we are racing with a cancel request
            // that will try to interrupt runner
                    releaseShared(0);
                    return;
                }
        if (compareAndSetState(s, RAN)) {
                    exception = t;
                    result = null;
                    releaseShared(0);
                    done();
            return;
                }
        }
        }


上面的代码看出当程序抛出异常的时候,类似与innerSet()方法一样, 把异常设置到futureTask中. 在设计线程操作的时候,一定会遇到的问题就是如果工作者线程抛出了异常,怎么把异常设置到调用线程中. 这就需要"子线程持有调用者引用",与此同时,调用者提供setException方法.

从上面的代码还有一个问题引出:

 由于FutureTask是一次性的, api并没有提供能够让他能够复用的使用情景. 当task抛出异常的时候,会设置:

compareAndSetState(s, RAN)

那么, 如何高效的复用其设置呢 ?

 

to be continue...


登录 *


loading captcha image...
(输入验证码)
or Ctrl+Enter