FutureTask
FutureTask
参考资料 :
JDK api 文档
http://liuchangit.com/java/20.html 一个blog充斥着比较好的Future Pattern应用场景.感谢作者. :)
林昊 ---<<分布式java应用 ---基础与实践>>
在现实的应用场景中,经常遇到一个情况:一个查找需要本地和远程的协作. 例如在纵向分表的情况下. 由于其他列存在远程的数据库中,一次查询可能会引发两次查询:本地查询和用远程查询. 为例提高响应的时间,可以使用类似:
public List<Entity> findEntities(SearchContition cond){
//伪代码
SercherThread t = new RemoteSercherThread(cond);
t.start();
.....
}
这样的方法,使用线程去远程查找. 但是这样有一个问题会产生:
在这个模型中并不是One-way的方式,也就是说最终需要将本地搜索和远程搜索的数据结合到一起发送给客户端, 使用runnable的不足是run方法没有放回值.主线程不知道什么时候远程搜索能够完成. 这个时候,可能会使用到一些同步器的机制. 当然你最终可能会选择Future模式:
下面是JDK FutureTask中的JDK简述:
取消的异步计算。利用开始和取消计算的方法、查询计算是否完成的方法和获取计算结果的方法,此类提供了对 Future的基本实现。仅在计算完成时才能获取结果;如果计算尚未完成,则阻塞 get方法。一旦计算完成,就不能再重新开始或取消计算。
FutureTask的特征:
1 一个futureTask可以避免执行两次;
下面是其run方法的源代码:
void innerRun() {
if (!compareAndSetState(0, RUNNING))
return;
try {
runner = Thread.currentThread();
if (getState() == RUNNING) // recheck after setting thread
innerSet(callable.call());
else
releaseShared(0); // cancel
} catch (Throwable ex) {
innerSetException(ex);
}
}
上面的源代码可以看出
!compareAndSetState(0, RUNNING)
1 run方法不允许callable.call()方法执行两次!!! 这是一个重要的特征, 可以避免代码由于并发从而执行了两次.下面是一个实例代码:
代码from 林昊--<<分布式java应用 ---基础与实践>>
public Connection getDBConnection(String key){
if(connectionPool.containsKey(key)){
//connectionPool是一个ConcurrentHashMap
return connectionPool.get(key);
}else{
//create connection
connectionPool.put(key,connection);
return connection;
}
}
上面的数据库连接池虽然是使用了ConcurrentHashMap避免了并发操作.但是,多个细粒度的同步方法合在一起并不能证明就没有并发问题,本质原因是: 虽然单个concurrentHashMap的方法是同步的,但是整体来说这段代码并不是原子性的. 完全有可能出现下面的情况:
当thread_1执行connectionPool.containsKey(key)发现不存在key,进入到else block执行,正在此时,thread_2执行connectionPool.containsKey(key),由于thread_1正在创建connection不持有concurrentHashMap的lock,thread_2进入else block. 最终的结果是connection被创建了两次!!!
[友情提示:有些时候甚至是编译器对code的优化也能够造成线程的一场执行!] 可参考单例的线程问题.
这个时候,就能够使用FutureTask来避免此类问题.
2 从上futureTask 的run方法依然能够查出run方法在futureTask内部是在同一个线程中执行. 也就是说,直接调用futureTask.run()并不会在其内部开启一个thread执行.这也就是为什么大部分时候FutureTask是和ExecuteService一起完成. 当然,也可以写一个实现了Runnable对象来启动一个thread执行这个futureTask的run方法.
FutureTask<Object> futureTask = new FutureTask<Object>(new SubProblemRunner());
//下面的SubProblemWorker是实现了Runnable的类.
new Thread(new SubProblemWorker(futureTask)).start();
Object o = futureTask.get();
下面给出FutureTask$Sync这个类的innerSet(V v) 源代码:
void innerSet(V v) {
for (;;) {
int s = getState();
if (s == RAN)//
return;
if (s == CANCELLED) {
// aggressively release to set runner to null,
// in case we are racing with a cancel request
// that will try to interrupt runner
releaseShared(0);
return;
}
if (compareAndSetState(s, RAN)) {
result = v;
releaseShared(0);
done();
return;
}
}
}
FutureTask中的Sync同样是继承了AbstractQueuedSynchronizer. 前面介绍过, AbstractQueuedSynchronizer的核心是维护state[int]状态.而FutureTask$Sync中使用:
private static final int RUNNING = 1;
/** State value representing that task ran */
private static final int RAN = 2;
/** State value representing that task was cancelled */
private static final int CANCELLED = 4;
来表示任务正在执行,已经执行或者被取消的状态,通过CAS算法:
(compareAndSetState(s, RAN)//s是getState()获取
来设置状态.
下面是一个FutureTask和ExecutorService的代码示例:
ExecutorService exec = Executors.newCachedThreadPool();
futureTask = exec.submit(new Callable<Object>() {
public Object call() {
Object result = searchFromRemoteDB(RemoCond);
return result;
}
});
SearchResult result = serchFromLocalDB(localCond);
Object result = futureTask.get();
[友情提示: 可以使用ThreadPoolExecutor来作为线程池,因为ThreadPoolExecutor得构造方法提供了更为灵活的线程池选项.例如执行策略和锁选用的缓冲数据结构.根据实际情况来制定线程池,使其能够提供更好的服务!].
FutureTask异常处理:
在FutureTask中的innerRun方法中,当程序出现问题的时候:
catch (Throwable ex) {
innerSetException(ex);
}
innerSetException方法:
void innerSetException(Throwable t) {
for (;;) {
int s = getState();
if (s == RAN)
return;
if (s == CANCELLED) {
// aggressively release to set runner to null,
// in case we are racing with a cancel request
// that will try to interrupt runner
releaseShared(0);
return;
}
if (compareAndSetState(s, RAN)) {
exception = t;
result = null;
releaseShared(0);
done();
return;
}
}
}
上面的代码看出当程序抛出异常的时候,类似与innerSet()方法一样, 把异常设置到futureTask中. 在设计线程操作的时候,一定会遇到的问题就是如果工作者线程抛出了异常,怎么把异常设置到调用线程中. 这就需要"子线程持有调用者引用",与此同时,调用者提供setException方法.
从上面的代码还有一个问题引出:
由于FutureTask是一次性的, api并没有提供能够让他能够复用的使用情景. 当task抛出异常的时候,会设置:
compareAndSetState(s, RAN)
那么, 如何高效的复用其设置呢 ?
to be continue...
CountDownLatch
参考资料:
林昊 -----<<分布式java应用 基础与实践>>
JDK api 文档
CountDownLatch 功能:
其用的是一个减数机制, 首先调用new CountDownLatch(int N)构造一个CountDownLatch,注意,CountDownLatch是一次性的,他并不提供任何api给开发者去改变N的大小.线程调用countDownLatch.await()以后, 会进入阻塞状态, 等待其他线程调用countDownLatch.countDown()来完成N-1的操作,并且只有N=0的时候,所有因为调用countDownLatch.await()的线程才能被释放.
下面给出CountdownLatch在JDK文档中的示例程序:
class Driver { // ... void main() throws InterruptedException { CountDownLatch startSignal = new CountDownLatch(1);//计数器设置为一,是一个开关操作即便是Worker有N个, //也照样会在调用startSignal.sawait是阻塞. CountDownLatch doneSignal = new CountDownLatch(N);//计数器设置为N for (int i = 0; i < N; ++i) // create and start threads new Thread(new Worker(startSignal, doneSignal)).start(); doSomethingElse(); // don't let run yet这里是先让线程执行run()方法中的startSignal.await(); startSignal.countDown(); // let all threads proceed当所有 doSomethingElse(); doneSignal.await(); // wait for all to finish } } class Worker implements Runnable { private final CountDownLatch startSignal; private final CountDownLatch doneSignal; Worker(CountDownLatch startSignal, CountDownLatch doneSignal) { this.startSignal = startSignal; this.doneSignal = doneSignal; } public void run() { try { startSignal.await(); doWork(); doneSignal.countDown();//只有N个线程都执行到此步骤的时候 } catch (InterruptedException ex) {} // return; } void doWork() { ... } }
上述的程序一共是N+1个线程在执行, 可以理解为主线程将一个比较苦难的问题交给N个工作者线程完成. 但是,具体要求是:
1 N个工作者线程需要等待主线程完成好准备工作: startSignal.await()来表示阻塞工作者;Main中的doSomeThingElse()表示主线程准备问题;Main中的startSignalCountDown表示问题准备好了,可以让工作者执行问题;
2 N个工作者线程每一个完成一部分,只有N个线程都完成了各自的工作才能把工作的结果交换给主线程并检查器结果正确性.run中的doneSignal.countDown()以及CountDownLatch doneSignal = new CountDownLatch(N)可以说明这个需求;
CoundownLatch countDown方法使用内部类Sync来实现. Sync则是继承自AbstractQueuedSynchronizer. AbstractQueuedSynchronizer在内部是通过维护一个state变量来表示控制的数量.
countDown()方法的实现:
sync.releaseShared(1);
sync.releaseShared方法的实现则是通过无所算法CAS(Compare and set) 通过CAS指令来完成乐观操作:
public boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();//获取AbstractQueuedSynchronizer中的state变量.
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc)) //使用轮询的方式来完成状态的设置.
return nextc == 0;
}
}
}
to be continue...
CopyOnWriteArrayList
参考资料:
JDK 文档
CoppyOnWriteArrayList api 说明:
ArrayList的一个线程安全的变体,其中所有可变操作(add、set 等等)都是通过对底层数组进行一次新的复制来实现的。
这一般需要很大的开销,但是当遍历操作的数量大大超过可变操作的数量时,这种方法可能比其他替代方法更 有效。在不能或不想进行同步遍历,但又需要从并发线程中排除冲突时,它也很有用。“快照”风格的迭代器方法在创建迭代器时使用了对数组状态的引用。此数组在迭代器的生存期内不会更改,因此不可能发生冲突,并且迭代器保证不会抛出 ConcurrentModificationException。创建迭代器以后,迭代器就不会反映列表的添加、移除或者更改。在迭代器上进行的元素更改操作(remove、set 和 add)不受支持。这些方法将抛出 UnsupportedOperationException。
允许使用所有元素,包括 null。
内存一致性效果:当存在其他并发 collection 时,将对象放入 CopyOnWriteArrayList
之前的线程中的操作Happen-Before随后通过另一线程从 CopyOnWriteArrayList
中访问或移除该元素的操作。
解决问题: 但选择使用ArrayList时候, 经常能够在如下示例代码中,在多线程的环境下抛出ConcurrentModificationException:
示例代码 01:
private ReentrantLock lock = new ReentrantLock();
private volatile boolean modified = false;
private Condition modifiedCondition = lock.newCondition();
private ArrayList<String> arrayList = null;
public ArrayListCache(ArrayList<String> list){
this.arrayList = list;
}
public void iterateList(){
lock.lock();
try{
Iterator<String> iterator = arrayList.iterator();
while(! modified){
modifiedCondition.await();
}
while(iterator.hasNext()){
String item = iterator.next();
//doSomeThing
}
}catch(Exception e){
e.printStackTrace();
}finally{
lock.unlock();
}
}
public void addToCache(String item){
lock.lock();
try{
arrayList.add(item);
modified = true;
modifiedCondition.signal();
}finally{
lock.unlock();
}
}
上述代码中,可能会抛出ConcurrentModificationException异常. 这是因为当thread_1在得到当前的arrayList的iterator之后会等待thread_2去调用addToCache方法, 当thread_2更改了list并signal thread_1之后, 由于arrayList被改变, iterator.next()就会抛出异常; 可以给出iterator.next()的源代码如下:
checkForComodification();
try {
E next = get(cursor);
lastRet = cursor++;
return next;
} catch (IndexOutOfBoundsException e) {
checkForComodification();
throw new NoSuchElementException();
}
上述的
checkForComodification();
将会抛出此异常;
在这种情况下, 可以考虑使用CopyOnWriteArrayList来代替ArrayList.更改示例代码:
private ReentrantLock lock = new ReentrantLock();
private volatile boolean modified = false;
private Condition modifiedCondition = lock.newCondition();
private CopyOnWriteArrayList<String> arrayList = null;
public ArrayListCache(ArrayList<String> list){
if(list != null){
this.arrayList = new CopyOnWriteArrayList<String>(list);
}else{
//do ypu code if the parameter is null
}
}
CopyOnWriteArrayList处理更改操作的时候,会先复制一份,也就是说实在一个copy里面进行的,更改之后将会set到原来的引用上,如下式CopyOnWriteArrayList add方法的源代码参考:
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray(); //get the orig arrayList
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
setArray(newElements); //set after update
return true;
} finally {
lock.unlock();
}
to be continue ...
ReentantLock
参考资料:
1 java 并发实践
2 JDK 文档以及源代码
Reentantlock在jdk文档中的介绍如下:
class X {
private final ReentrantLock lock = new ReentrantLock();
// ...
public void m() {
lock.lock(); // block until condition holds
try {
// ... method body
} finally {
lock.unlock()
}
}
}
其完成了java synchorized的语义.
此类的构造方法接受一个可选的公平 参数。当设置为 true
时,在多个线程的争用下,这些锁倾向于将访问权授予等待时间最长的线程。否则此锁将无法保证任何特定访问顺序。与采用默认设置(使用不公平锁)相比,使用公平锁的程序在许多线程访问时表现为很低的总体吞吐量(即速度很慢,常常极其慢),但是在获得锁和保证锁分配的均衡性时差异较小。不过要注意的是,公平锁不能保证线程调度的公平性。因此,使用公平锁的众多线程中的一员可能获得多倍的成功机会,这种情况发生在其他活动线程没有被处理并且目前并未持有锁时。还要注意的是,未定时的 tryLock方法并没有使用公平设置。因为即使其他线程正在等待,只要该锁是可用的,此方法就可以获得成功。
上述事例中,涉及到经常使用的ReentrantLock的方法, lock()和unlock().
其外, ReentantLock 提供了Condition的相关操作.
Condition
将 Object
监视器方法( wait,notify和 notifyAll)分解成截然不同的对象,以便通过将这些对象与任意 Lock
实现组合使用,为每个对象提供多个等待 set(wait-set)
Condition Exsample [from JDK api --- Condition]
class BoundedBuffer {
final Lock lock = new ReentrantLock();
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();
final Object[] items = new Object[100];
int putptr, takeptr, count;
public void put(Object x) throws InterruptedException {
lock.lock();
try {
while (count == items.length)
notFull.await();
items[putptr] = x;
if (++putptr == items.length) putptr = 0;
++count;
notEmpty.signal();
} finally {
lock.unlock();
}
}
public Object take() throws InterruptedException {
lock.lock();
try {
while (count == 0)
notEmpty.await();
Object x = items[takeptr];
if (++takeptr == items.length) takeptr = 0;
--count;
notFull.signal();
return x;
} finally {
lock.unlock();
}
}
}
示例代码 2
针对上面的例子, 如下说明:
1 在调用lock.await()的时候,必须获取lock的锁定(而不是简单的synchronized(notEmpty)). 否则会抛出异常:
java.lang.IllegalMonitorStateException 注意了, 当使用Object中的wait()和signal()或者signalAll()时,一定要确保已经获取了监视对象的锁定. 如下代码就是错的:
synchronized(someObject){
await();// 错误,这里的await需要锁定this 而不是object可以改为someObject.wait()
//signal()同上.
...
}
2 condiionObject.await()必须出现在while(condition)中, 原因有:
2.1 防止虚假唤醒;
2.2 解释代码为【示例代码2】中的while改为if, get方法的notFull.signal()改为notFull.signalAll()
如果有N个线程在notFull这个Condition对象的wait-set中等待【put操作中】, 只有一个活跃线程调用这个condition对象的notifyAll方法, 即是在take操作中的notFull.signal()误写为notFull.signalAll(); 那么,N个线程会同步,依次执行
items[putptr] = x;
假设这个时候putptr已经是MAX_SIZE -2, 线程数N > 2; 那么会抛出相关的越界异常;
上述的的解释可能会比较“不现实”. 但是, while语句的作用就是强制N个等待线程在某种原因唤醒的时候, 依然能够检查等待条件.
3 signal的操作会等待锁的释放,一般写的程序的最后是一种编程习惯.
4 await()和sleep区别, await()将会让线程处于休眠状态释放掉锁. 而sleep将"持锁而睡".
使用conditionObject.await(long time,TimeUnit unit);
api : 此方法会等待相关的conditionObject.signal()或者conditionObject.signalAll()方法, signal或者signalAll在给定的时间time内完成,那么就返回true, 否则返回false. 注意,此方法并不是去获取锁本身,而是"等待通知".当然的,当通知出发此方法的返回时, 当前被唤醒的对象是一定会去获取lock的.
如下代码:[保证ReentrantLock和condition是同一个对象]
public void run() {
lock.lock();
try{
if("thread_1".equals(threadName)){
System.out.println("thread_1 到达...");
//测试代码就不写入while()中,通常情况避免虚假唤醒是要将await至于while(condition)
//中的.
boolean result = condition.await(4, TimeUnit.SECONDS);
if(result){
System.out.println("true");
}else{
System.out.println("false");
}
}else{
System.out.println("thread_2 到达...");
//Thread.sleep(500);
condition.signal();
}
}catch(Exception e){
e.printStackTrace();
}finally{
System.out.println(threadName + "释放锁");
lock.unlock();
}
}
当应用程序在运行期间等待一个执行时间不太确定的操作时候,用此方法是很合适的. 可以用一个变量来改变下次等待的时间,如下伪代码:
.....
while(condition){
try{
boolean waitTime =conditionObject.await(wantedTime,...);
if(waitTime == false) {
//超时
changeWantedTime(...);
}
}
}
使用乐观的tryLock(): 当不成功时候,会迅速返回false而不是进入等待区等待.
if (lock.tryLock()) {
try {
// manipulate protected state
} finally {
lock.unlock();
}
} else {
// perform alternative actions
}
<to be continue...>