随着Java 5的到来,Java对并发的重新思考也浮出了水面。这些新思想主要体现在 java.util.concurrent
包上,其中包含了大量用来编写多线程代码的新工具。在后续版本中,这些工具不断得到改进,但其工作方式却依然保持不变,并且直到今天还是对开发人员很有帮助。
我们马上快速过一下java.util.concurrent
中主要的类及相关包,比如atomic和locks包。我们会向你介绍这些类及其适用的情景。你也应该读一下它们的Javadoc,并尝试熟悉整个包——它们使编写并发类容易多了。
代码迁移
如果你还有基于(Java 5之前的)老办法编写的多线程代码,建议你用
java.util.concurrent
重构。按我们的经验,几乎在所有案例中,如果你特意把代码迁移到新的API中,代码就会得以改进。你的努力付出将使代码在清晰性和可靠性上得到极大提升。
请把这次讨论当做并发编程的启动工具,而不是一次研讨会。想要充分利用好java.util.concurrent
,你还需要知道更多的知识。
4.3.1 原子类:java.util.concurrent.atomic
java.util.concurrent.atomic
中有几个名字以Atomic
打头的类。它们的语义基本上和volatile
一样,只是封装在一个API里了,这个API包含为操作提供的适当的原子(要么不做,要做就全做)方法。对于开发人员来说,这是非常简单的避免在共享数据上出现竞争危害1的办法。
1 竞争危害(race hazard)又名竞态条件(race condition)。一个系统或进程的输出,依赖于不受控制事件的出现顺序或时机。例如两个进程都试图修改一个共享内存的内容。在没有并发控制的情况下,最后的结果取决于两个进程的执行顺序与时机,如果发生了并发访问冲突,最后的结果是不正确的。——译者注
在编写这些实现时利用了现代处理器的特性,所以如果能从硬件和操作系统上得到适当的支持,它们可以是非阻塞(无需线程锁)的,而大多数现代系统都能提供这种支持。常见的用法是实现序列号机制,在AtomicInteger
或AtomicLong
上用原子操作getAndIncrement
方法。
要做序列号,该类应该有个nextId
方法,每次调用时肯定能返回一个唯一并且完全增长的数值。这和数据库里序列号的概念很像(所以这个变量叫这个名字)。
来看一段产生序列号的代码:
private final AtomicLong sequenceNumber = new AtomicLong(0);public long nextId { return sequenceNumber.getAndIncrement;}
注意 原子类不是从有相似名称的类继承而来的,所以
AtomicBoolean
不能当Boolean
用,AtomicInteger
也不是Integer
,虽然它确实扩展了Number
。
接下来,我们会检查一下java.util.concurrent
如何对同步模型的核心建模——Lock
接口。
4.3.2 线程锁:java.util.concurrent.locks
块结构同步方式基于锁这样一个简单的概念。这种方式有几个缺点。
- 锁只有一种类型。
- 对被锁住对象的所有同步操作都是一样的作用。
- 在同步代码块或方法开始时取得线程锁。
- 在同步代码块或方法结束时释放线程锁。
- 线程或者得到锁,或者阻塞——没有其他可能。
如果我们要重构对线程锁的支持,有几处可以得到提升。
- 添加不同类型的锁,比如读取锁和写入锁。
- 对锁的阻塞没有限制,即允许在一个方法中上锁,在另一个方法中解锁)。
- 如果线程得不到锁,比如锁由另外一个线程持有,就允许该线程后退或继续执行,或者做点别的事情——运用
tryLock
方法。 - 允许线程尝试取锁,并可以在超过等待时间后放弃。
能实现以上这些的关键就是java.util.concurrent.locks
中的Lock
接口。还有它的两个实现类。
ReentrantLock
——本质上跟用在同步块上那种锁是一样的,但它要稍微灵活点儿。ReentrantReadWriteLock
——在需要读取很多线程而写入很少线程时,用它性能会更好。
块结构并发能实现的所有功能都可以用Lock
接口实现。下面是用ReentrantLock
重写的那个死锁的例子。
代码清单4-4 用ReentrantLock
重写死锁
private final Lock lock = new ReentrantLock;public void propagateUpdate(Update upd_, MicroBlogNode backup_) { //每个线程都先锁住自己的锁 lock.lock; try { System.out.println(ident +": recvd: "+upd_.getUpdateText +" ; backup: "+backup_.getIdent); //调用confirmUpdate知悉其他线程 backup_.confirmUpdate(this, upd_); } finally { lock.unlock; }}public void confirmUpdate(MicroBlogNode other_, Update upd_) { //①尝试锁住其他线程 lock.lock; try{ System.out.println(iden +": recvd confirm: "+upd_.getUpdateText +" from "+ other_.getIdentifier); } finally { lock.unlock; }}
锁住其他线程的尝试①通常都会失败,因为它已经被锁住了(如图4-3所示)。这就是导致死锁出现的原因。
用锁时带上try...finally
把
lock
放在try...finally
块中(释放也在这里)的模式是另外一个好用的小工具。在跟块结构并发相似的情景中它同样很好用。而另一方面,如果需要传递Lock
对象,比如从一个方法中返回,则不能用这个模式。使用Lock
对象可能要比块结构方式强大得多,但有时用它们很难设计出完善的锁定策略。
对付死锁的策略有很多,但你应该特别注意一个不起任何作用的策略。请看下面这段代码中新版的propagateUpdate
方法(假定confirmUpdate
也做出了同样的修改)。在这个例子中,我们用带有超时机制的tryLock
替换了无条件的锁。通过这种办法可以为其他线程提供得到线程锁的机会,从而去除死锁。
代码清单4-5 一次有缺陷的解决死锁问题的尝试
public void propagateUpdate(Update upd_, MicroBlogNode backup_) { boolean acquired = false; while (!acquired) { try { int wait = (int)(Math.random * 10); //尝试与锁定,超时时长随机 acquired = lock.tryLock(wait, TimeUnit.MILLISECONDS); if (acquired) { System.out.println(ident +": recvd: "+upd_.getUpdateText +" ; backup: "+backup_.getIdent); //在其他线程上确认 backup_.confirmUpdate(this, update_); } else { Thread.sleep(wait); } } catch (InterruptedException e) { } finally { //仅在锁定时解锁 if (acquired) lock.unlock; } }}
如果运行代码清单4-5中的代码,你会发现它有时候还是不能解决死锁问题。你能看到“received confirm of update”,但它并不会一直出现,时有时无。
实际上,死锁问题并没有真正解决,因为如果线程取得了第一个锁(在propagateUpdate
中),它才会调用confirmUpdate
,并且在完成之前绝不会释放第一个锁。即使两个线程都能在彼此调用confirmUpdate
之前取得第一个线程锁,它们还是会产生死锁。
如果取得第二个锁的尝试失败,能真正解决问题的办法是让线程释放其持有的第一个锁,再次从头开始等待,从而使其他线程有机会得到完整的锁集合,能走完全程。代码如下所示。
代码清单4-6 修正死锁
public void propagateUpdate(Update upd_, MicroBlogNode backup_) { boolean acquired = false; boolean done = false; while (!done) { int wait = (int)(Math.random * 10); try { acquired = lock.tryLock(wait, TimeUnit.MILLISECONDS); if (acquired) { System.out.println(ident +": recvd: "+upd_.getUpdateText +" ; backup: "+backup_.getIdent); //检查tryConfirmUpdate的返回值 done = backupNode_.tryConfirmUpdate(this, update_); } } catch (InterruptedException e) { } finally { if (acquired) lock.unlock; } //如果done为false,释放锁并等待 if (!done) try { Thread.sleep(wait); } catch (InterruptedException e) { } }}public boolean tryConfirmUpdate(MicroBlogNode other_, Update upd_) { boolean acquired = false; try { int wait = (int)(Math.random * 10); acquired = lock.tryLock(wait, TimeUnit.MILLISECONDS); if (acquired) { long elapsed = System.currentTimeMillis - startTime; System.out.println(ident +": recvd confirm: "+ upd_.getUpdateText +" from "+other_.getIdent +" - took "+ elapsed +" millis"); return true; } } catch (InterruptedException e) { } finally { if (acquired) lock.unlock; } return false; }
这一版会检查tryConfirmUpdate
的返回码。如果为false
,最初的锁被释放。该线程会暂停一段时间,让其他线程有机会获取锁。
把这段代码运行几次,你会发现这两个线程基本上总能走完全程——死锁问题已经被你解决了。你也许想试验试验之前版本中那段代码的不同形式,诸如最原始的、有缺陷的或被改正的。通过对这些代码的演练,你能对锁机制有更深刻的理解,并且开始渐渐地凭直觉避免死锁问题的出现。
为什么那个有缺陷的版本有时候能奏效?
你已经看到了,死锁仍然存在,那是什么原因导致这个版本中的代码有时可以成功呢?代码中附加的复杂性是罪魁祸首。它影响JVM的线程调度器,让它变得更加难以预测。这意味着它有时候能让某个线程(通常是第一个)在其他线程运行之前进入
confirmUpdate
方法并取得第二个锁。这种情况也会发生在原始代码中,只是可能性更低罢了。
我们只是揭开了Lock
各种可能性的面纱——有很多种方法可以产生更加复杂的锁定结构。接下来我们就来讨论其中一个概念——锁存器。
4.3.3 CountDownLatch
CountDownLatch
是一种简单的同步模式,这种模式允许线程在通过同步屏障之前做些少量的准备工作。
为了达到这种效果,在构建新的CountDownLatch
实例时要给它提供一个int
值(计数器)。此外,还有两个用来控制锁存器的方法:countDown
和await
。前者对计数器减1,而后者让调用线程在计数器到0之前一直等待。如果计数器已经为0或更小,则它什么也不做。这个简单的机制使得这种所需准备最少的模式非常容易部署。
在下面的代码中,同一进程内的一组更新处理线程至少必须有一半线程正确初始化(假定更新处理线程的初始化要占用一定时间)之后,才能开始接受系统发送给它们中的任何一个线程的更新。
代码清单4-7 用锁存器辅助初始化
public static class ProcessingThread extends Thread { private final String ident; private final CountDownLatch latch; public ProcessingThread(String ident_, CountDownLatch cdl_) { ident = ident_; latch = cdl_; } public String getIdentifier { return identifier; } //节点初始化 public void initialize { latch.countDown; } public void run { initialize; }}final int quorum = 1 + (int)(MAX_THREADS / 2);final CountDownLatch cdl = new CountDownLatch(quorum);final Set<ProcessingThread> nodes = new HashSet<>;try { for (int i=0; i<MAX_THREADS; i++) { ProcessingThread local = new ProcessingThread("localhost:"+(9000 + i), cdl); nodes.add(local); local.start; } //达到quorum,开始发送更新 cdl.await;} catch (InterruptedException e) {} finally {}
这段代码把锁存器的值设置为quorum
。一旦被初始化的线程达到这个数量,就可以开始处理更新了。每个线程完成初始化后都会马上调用countDown
,所以主线程只需等待quorum
的到来,然后启动(并派发更新,尽管我们没给出那部分代码)。
我们接下来要讨论的是对多线程开发人员来说最有用的类之一:java.util.concurrent
中的ConcurrentHashMap
。
4.3.4 ConcurrentHashMap
ConcurrentHashMap
类是标准HashMap
的并发版本。它改进了Collections
类中提供的synchronizedMap
功能,因为那些方法返回的集合中包含的锁要比需要的多。
图4-7 HashMap
的经典视图
如图4-7所示,传统的HashMap
用hash函数来确定存放键/值对的“桶”,这是该类名字中“Hash”的由来。这意味着多线程处理可以更加简单直接——修改HashMap
时并不需要把整个结构都锁住,只要锁住即将修改的桶就行了。
提示 好的并发
HashMap
实现在读取时不用锁,写入时只需锁住要修改的桶。Java基本上能达到这个标准,但这里还有一些大多数开发人员都无需过多关注的底层细节。
ConcurrentHashMap
类还实现了ConcurrentMap
接口,有些提供了原子操作的新方法:
putIfAbsent
——如果还没有对应键,则把键/值对添加到HashMap
中。remove
——如果对应键存在,且值也与当前状态相等(equal
),则用原子方式移除键值对。replace
——API为HashMap
中原子替换的操作方法提供了两种不同的形式。
比如说,如果你把代码清单4-1中的私有final
域arrivalTime
的类型从HashMap
改成ConcurrentHashMap
,那就可以把synchronized
方法替换成常规的非同步访问。注意代码清单4-8中锁的缺失——根本就没有显式的同步。
代码清单4-8 使用ConcurrentHashMap
public class ExampleMicroBlogTimingNode implements SimpleMicroBlogNode { ... private final Map<Update, Long> arrivalTime = new ConcurrentHashMap <>; ... public void propagateUpdate(Update upd_) { arrivalTime.putIfAbsent(upd_, System.currentTimeMillis); } public boolean confirmUpdateReceived(Update upd_) { return arrivalTime.get(upd_) != null; }}
ConcurrentHashMap
是java.util.concurrent
包中最有用的类之一。它不仅提供了多线程的安全性,并且性能更优,在日常使用中没有严重的缺陷。接下来我们会讨论它的最佳拍档,用于List
的CopyOnWriteArrayList
。
4.3.5 CopyOnWriteArrayList
从名字就能看出来,CopyOnWriteArrayList是标准ArrayList的替代品。CopyOnWriteArrayList通过增加写时复制(copy-on-write)语义来实现线程安全性,也就是说修改列表的任何操作都会创建一个列表底层数组的新复本(如图4-8所示)。这就意味着所有成形的迭代器1都不用担心它们会碰到意料之外的修改。
1 迭代器(iterator)是一个对象,它的工作是遍历并选择序列中的对象,而客户端程序员不必知道或关心该序列底层的结构(也就是不同容器的类型)。——译者注
图4-8 写时复制数组
当快速、一致的数据快照(不同的读取器读到的数据偶尔可能会不一样)比完美的同步以及性能上的突破更重要时,这种共享数据的方法非常理想,并经常出现在非关键任务中。
我们来看一个写时复制的案例。假设有个微博的时间线更新,这是一个典型的非关键任务的例子。每个读取器的性能、自身一致性的快照要比全局的一致性更受欢迎。代码清单4-9表示每个用户时间线视图的持有者类。我们将会在代码清单4-10中用它来演示写时复制操作是如何进行的。
代码清单4-9 写时复制案例
public class MicroBlogTimeline { private final CopyOnWriteArrayList<Update> updates; private final ReentrantLock lock; private final String name; private Iterator<Update> it; //构造方法已省略 public void addUpdate(Update update_) { updates.add(update_); } //设置迭代器 public void prep { it = updates.iterator; } public void printTimeline { //需要在这里锁定 lock.lock; try { if (it != null) { System.out.print(name+ ": "); while (it.hasNext) { Update s = it.next; System.out.print(s+ ", "); } System.out.println; } } finally { lock.unlock; } }}
我们专门设计了这个类来阐明在写时复制语义下的迭代器行为。你需要在输出方法中锁定,以防止输出在两个线程间乱掉,此外你也能看到两个线程各自的状态。
你可以从下面的代码中调用MicroBlogTimeline
类。
代码清单4-10 揭示写时复制行为
final CountDownLatch firstLatch = new CountDownLatch(1);final CountDownLatch secondLatch = new CountDownLatch(1);final Update.Builder ub = new Update.Builder;//①设置初始状态final List<Update> l = new CopyOnWriteArrayList<>;l.add(ub.author(new Author("Ben")).updateText("I like pie").build);l.add(ub.author(new Author("Charles")).updateText( ➥ "I like ham on rye").build);ReentrantLock lock = new ReentrantLock;final MicroBlogTimeline tl1 = new MicroBlogTimeline("TL1", l, lock);final MicroBlogTimeline tl2 = new MicroBlogTimeline("TL2", l, lock);Thread t1 = new Thread { public void run { l.add(ub.author(new Author("Jeffrey")).updateText("I like a lot of things").build); tl1.prep; firstLatch.countDown; //用锁存器严格限制事件的顺序(1) try { secondLatch.await; } catch (InterruptedException e) { } tl1.printTimeline; }};Thread t2 = new Thread{ public void run{ try { //用锁存器严格限制事件的顺序(2) firstLatch.await; l.add(ub.author(new Author("Gavin")).updateText("I like otters").build); tl2.prep; //用锁存器严格限制事件的顺序(3) secondLatch.countDown; } catch (InterruptedException e) { } tl2.printTimeline; }};t1.start;t2.start;
这段代码里有很多辅助的测试代码。但也有很多值得注意的地方:
CountDownLatch
用来严格控制两个线程之间发生的事情。如果用普通的
List
代替CopyOnWriteArrayList
,结果会导致出现ConcurrentModificationException
异常。这也是在两个线程之间共享一个
Lock
对象以控制对共享资源(即STDOUT
)访问的例子。如果用块结构方式写这段代码,会显得更加杂乱。
这段代码的输出如下:
TL2: Update [author=Author [name=Ben], updateText=I like pie, createTime=0], Update [author=Author [name=Charles], updateText=I like ham on rye, createTime=0], Update [author=Author [name=Jeffrey], updateText=I like a lot of things, createTime=0], Update [author=Author [name=Gavin], updateText=I like otters, createTime=0],TL1: Update [author=Author [name=Ben], updateText=I like pie, createTime=0], Update [author=Author [name=Charles], updateText=I like ham on rye, createTime=0], Update [author=Author [name=Jeffrey], updateText=I like a lot of things, createTime=0],
第二行输出(标签为TL1
)漏掉了最后一个更新,就是提到了水獭的那个,尽管按锁存器的意思在列表被修改后tl1
2是可以访问的。这说明了tl1
中所包含的迭代器被tl2
复制,并且最后一个更新对tl1
是不可见的。这就是我们想要展示的写时复制特性。
2 原文为mbex1
,下文同。——译者注
CopyOnWriteArrayList
的性能使用
CopyOnWriteArrayList
类要比使用ConcurrentHashMap
多花点心思,它是HashMap
的即用型并发替代品。这是因为性能问题——写时复制特性意味着如果列表在被读取或遍历时做了修改,那就必须复制整个数组。也就是说如果对列表的修改次数跟读取次数相差不多,这种方式未必能达到较好的性能。但就像我们在第6章一再提到的那样,得到性能优异的代码的唯一可靠的方法就是测试,再测试,并衡量结果。
下一个在并发代码中常用的构件是java.util.concurrent
中的Queue
。它用于在线程之间切换工作元素,并且还是很多灵活可靠的多线程设计的基础。
4.3.6 Queue
队列是一个非常美妙的抽象概念。不,之所以这么说并不是因为我们生活在伦敦这个世界排队之都。为把处理资源分发给工作单位(或者把工作单元分配给处理资源,这取决于你看待问题的方式),队列提供了一种简单又可靠的方式。
Java中有些多线程编程模式在很大程度上都依赖于Queue
实现的线程安全性,所以很有必要充分认识它。Queue
接口被放在了java.util
包中,因为即便在单线程编程中它也是一个重要的模式,但我们的重点是多线程编程,并且假定你已经熟悉队列的基本用法了。
队列经常用来在线程之间传递工作单元,这个模式通常适合用Queue
最简单的并发扩展BlockingQueue
来实现。接下来我们就会重点介绍它。
1.BlockingQueue
BlockingQueue
还有两个特性。
- 在向队列中
put
时,如果队列已满,它会让放入线程等待队列腾出空间。 - 在从队列中
take
时,如果队列为空,会导致取出线程阻塞。
这两个特性非常有用,因为如果一个线程(或线程池)的能力超过了其他线程,比较快的线程就会被强制等待,因此可以对整个系统起到调节作用,如图4-9所示。
图4-9 BlockingQueue
BlockingQueue的两个实现
Java提供了
BlockingQueue
接口的两个基本实现:LinkedBlockingQueue
和ArrayBlockingQueue
。它们的特性稍有不同;比如说,在已知队列的大小而能确定合适的边界时,用ArrayBlockingQueue
非常高效,而LinkedBlockingQueue
在某些情况下则会快一点儿。
2.使用工作单元
Queue
接口全都是泛型的——它们是Queue<E>
,BlockingQueue<E>
,等等依此类推。尽管看起来奇怪,但有时候利用这一点把工作项封装在一个人工容器类内却是明智之举。
比如说,你有一个表示工作单元的MyAwesomeClass
类,想要用多线程方式处理,与其用BlockingQueue<MyAwesomeClass>
不如用BlockingQueue<WorkUnit<MyAwesomeClass>>
。其中WorkUnit
(或QueueObject
,或随你怎么命名这个容器类)是像下面这样的包装接口或类:
public class WorkUnit<T> { private final T workUnit; public T getWork{ return workUnit; } public WorkUnit(T workUnit_) { workUnit = workUnit_; }}
有了这层间接引用,不用牺牲所包含类型(在此即MyAwesomeClass
)在概念上的完整性就可以在这里添加额外的元数据了。
这特别有用。能用上额外元数据的用例很多,下面举几个例子:
- 测试(比如展示一个对象的修改历史)
- 性能指标(比如到达时间或服务质量)
- 运行时系统信息(比如
MyAwesomeClass
实例是如何被排到队列中的)
以后再在这种间接引用里增加元数据可能会非常困难。如果你发现在某些情况下需要更多的元数据,那么要把它们加入到间接引用中可能需要大量的重构工作,而加在WorkUnit
类中就只是个简单的修改。
3.一个BlockingQueue
的例子
我们用一个简单的例子——等着看医生的宠物们——来看看如何使用BlockingQueue
。这个例子中有一个等着让医生给做检查的宠物集合。
代码清单4-11 在Java中对宠物建模
public abstract class Pet { protected final String name; public Pet(String name) { this.name = name; } public abstract void examine;}public class Cat extends Pet { public Cat(String name) { super(name); } public void examine{ System.out.println("Meow!"); }}public class Dog extends Pet public Dog(String name) { super(name); } public void examine{ System.out.println("Woof!"); }}public class Appointment<T> { private final T toBeSeen; public T getPatient{ return toBeSeen; } public Appointment(T incoming) { toBeSeen = incoming; }}
在这个简单的例子中,我们用LinkedBlockingQueue<Appointment<Pet>>
表示兽医的候诊队列,Appointment
起到了WorkUnit
的作用。
兽医对象是由一个队列和一个暂停时间构建的,其中队列是由一个代表接待员的对象提供的预约队列,暂停时间表示兽医在预约之间的停工时间。
我们可以在下面这段代码中建立兽医的模型。在线程运行时,它在一个无限循环中重复调用seePatient
。当然,现实世界中的兽医不可能这样,因为他们晚上和周末要回家,不能一直在办公室等着生病的小动物上门就医。
代码清单4-12 对兽医建模
public class Veterinarian extends Thread { protected final BlockingQueue<Appointment<Pet>> appts; protected String text = ""; protected final int restTime; private boolean shutdown = false; public Veterinarian(BlockingQueue<Appointment<Pet>> lbq, int pause) { appts = lbq; restTime = pause; } public synchronized void shutdown{ shutdown = true; } @Override public void run{ while (!shutdown) { seePatient; try { Thread.sleep(restTime); } catch (InterruptedException e) { shutdown = true; } } } public void seePatient { try { //阻塞take Appointment<Pet> ap = appts.take; Pet patient = ap.getPatient; patient.examine; } catch (InterruptedException e) { shutdown = true; } }}
在seePatient
方法中,线程会从队列中取出预约,并挨个检查对应的宠物,如果当前队列中没有预约等待,则会阻塞。
4.BlockingQueue
的细粒度控制
除了简单的take
和offer
API,BlockingQueue
还提供了另外一种与队列交互的方式,这种方式对队列的控制力度更大,但稍微有点复杂。这就是带有超时的放入或取出操作,它允许线程在遇到问题时可以从与队列的交互中退出来,转而做点儿其他的事情。
实际上,这个功能并不常用,但它偶尔能派上大用场,所以我们要介绍一下。下面的例子还是来自微博。
代码清单4-13 BlockingQueue行为的例子
public abstract class MicroBlogExampleThread extends Thread { protected final BlockingQueue<Update> updates; protected String text = ""; protected final int pauseTime; private boolean shutdown = false; public MicroBlogExampleThread(BlockingQueue<Update> lbq_, int pause_) { updates = lbq_; pauseTime = pause_; } //使线程可以彻底地结束(1) public synchronized void shutdown{ shutdown = true; } @Override public void run{ while (!shutdown){//使线程可以彻底地结束(2) doAction; try { Thread.sleep(pauseTime); } catch (InterruptedException e) { //使线程可以彻底地结束(3) shutdown = true; } }}//由子类实现具体动作public abstract void doAction;}final Update.Builder ub = new Update.Builder;final BlockingQueue<Update> lbq = new LinkedBlockingQueue<>(100);MicroBlogExampleThread t1 = new MicroBlogExampleThread(lbq,10) { public void doAction{ text = text + "X"; Update u = ub.author(new Author("Tallulah")).updateText(text).build; boolean handed = false; try { handed = updates.offer(u,100,TimeUnit.MILLISECONDS); } catch (InterruptedException e) { } if (!handed) System.out.println("Unable to hand off Update to Queue due to timeout"); }};MicroBlogExampleThread t2 = new MicroBlogExampleThread(lbq, 1000) { public void doAction{ Update u = null; try { u = updates.take; } catch (InterruptedException e) { return; } }};t1.start;t2.start;
运行这段代码展示了填充队列的速度有多么快,也表明供给线程的速度超过了提取线程的速度。很快,“Unable to hand off Update to Queue due to timeout”消息就出现了。
这是“相连线程池”中的一种典型的极端状况,当上游的线程池比下游的快,这种情况就会发生。“相连线程池”可能会引发一些问题,比如会导致LinkedBlockingQueue
溢出。另外,如果消费者比生产者多,队列会因此而经常空着。好在Java 7在BlockingQueue
上有了解决办法——TransferQueue
。
5.TransferQueue——Java 7中的新贵
Java 7引入了TransferQueue
。它本质上是多了一项transfer
操作的BlockingQueue
。如果接收线程处于等待状态,该操作会马上把工作项传给它。否则就会阻塞直到取走工作项的线程出现。你可以把这看做“挂号信”选项,即正在处理工作项的线程在交付当前工作项之前不会开始其他工作项的处理工作。这样系统就可以调控上游线程池获取新工作项的速度。
用限定大小的阻塞队列也能达到这种调控效果,但TransferQueue
接口更灵活。此外,用TransferQueue
取代BlockingQueue
的代码性能表现可能会更好。这是因为编写TransferQueue
的实现时已经将现代编译器和处理器的特性考虑在内,执行起来效率更高。聊了这么久性能,不能空口无凭,必须给出测量结果并能证明才行。另外你也应该意识到,Java 7只给出了TransferQueue
的一种实现形式——链表版。
在下面的例子中,你会发现用TransferQueue
代替BlockingQueue
是多么简单。只要对清单4-13中的代码做些简单修改,就可以升级成TransferQueue
,请看这里。
代码清单4-14 用TransferQueue代替BlockingQueue
public abstract class MicroBlogExampleThread extends Thread { protected final TransferQueue<Update> updates; ... public MicroBlogExampleThread(TransferQueue<Update> lbq_, int pause_) { updates = lbq_; pauseTime = pause_; } ...}final TransferQueue<Update> lbq = new LinkedTransferQueue<Update>(100);MicroBlogExampleThread t1 = new MicroBlogExampleThread(lbq, 10) { public void doAction{ ... try { handed = updates.tryTransfer(u, 100, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { } ... }};
到此为止,用来开发多线程应用的主要构件我们都见识过了。接下来该把它们整合到驱动并发代码的引擎(执行器框架)上了。用它们可以对任务进行调度和控制,可以组合高效的并发流处理工作项,从而构建大型多线程应用程序。