我们在前面的讨论中一直把工作任务当成抽象的单元。然而有个细节需要注意,我们一直没有提到的是这些单元要比Thread
小——它们提供的方法把计算任务包含在一个工作单元中,无需为每个单元启动新的线程。这样处理多线程代码通常效率更高,因为免除了为每个单元启动Thread
的开销。执行代码的线程是重用的,处理完一个任务后会继续处理新的工作单元。
虽然复杂一些,但你可以实现线程池、工人与管理者模式和执行者等开发人员最常用的模式。我们接下来要密切关注可以对任务(Callable
、Future
和FutureTask
)和执行者建模的类和接口,特别是ScheduledThreadPoolExecutor
。
4.4.1 任务建模
我们的终极目标是不用为调度每个任务或工作单元而启动新线程。归根结底,就是要把它们做成可以调用(通常由执行者调用)的代码,而不是直接可运行的线程。
我们来看对任务建模的三种办法——Callable
和Future
接口以及FutureTask
类。
1.Callable
接口
Callable
接口是一个非常常见的概念,代表了一段可以调用并返回结果的代码。尽管这种做法很直接,但实际上它的作用微妙而又强大,用它可以创建出一些特别实用的模式。
Callable
的典型用法是匿名实现类。这段代码的最后一行把s
赋值为out.toString
:
final MyObject out = getSampleObject;Callable<String> cb = new Callable<String> { public String call throws Exception { return out.toString; }};String s = cb.call;
可以把Callable
的匿名实现类当做对单一抽象方法call
的递延调用,该实现必须提供这个方法。
Callable
是SAM类型(“单一抽象方法”的缩写,有时会这样称呼它)的示例——这是Java 7把函数作为一等类型最可行的办法。在后续章节讨论非Java语言时还会遇到它们,那时我们还会进一步讨论把函数作为值或一等类型的概念。
2.Future
接口
Future
接口用来表示异步任务,是还没有完成的任务给出的未来结果。我们在第2章介绍NIO.2和异步I/O时提过。
下面是Future
中的主要方法。
get
——用来获取结果。如果结果还没准备好,get
会被阻塞直到它能取得结果。还有一个可以设置超时的版本,这个版本永远不会阻塞。cancel
——在运算结束前取消。isDone
——调用者用它来判断运算是否结束。
下面这段代码(找素数)展示了Future
的用法:
Future<Long> fut = getNthPrime(1_000_000_000);Long result = null;while (result == null) { try { result = fut.get(60, TimeUnit.SECONDS); } catch (TimeoutException tox) { } System.out.println(/"Still not found the billionth prime!/");}System.out.println(/"Found it: /"+ result.longValue);
在这段代码中,你应该想象一下返回Future
的getNthPrime
在某个后台线程或多个线程上运行的情景,也有可能是在执行者框架上运行。即便使用先进的硬件,这种运算可能也需要很长时间——你最后还是要用Future
的cancel
方法。
3.FutureTask
类
FutureTask
是Future
接口的常用实现类,它也实现了Runnable
接口。这意味着FutureTask
可以由执行者调度,这一点很关键。它对外提供的方法基本上就是Future
和Runnable
接口的组合:get
、cancel
、isDone
、isCancelled
和run
,最后一个方法通常都是由执行者调用,你基本不需要直接调用它。
FutureTask
还提供了两个很方便的构造器:一个以Callable
为参数,另一个以Runnable
为参数。这些类之间的关联表明对于任务建模的办法非常灵活,允许你基于FutureTask
的Runnable
特性(因为它实现了Runnable
接口),把任务写成Callable
,然后封装进一个由执行者调度并在必要时可以取消的FutureTask
。
4.4.2 ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor
(以下简称STPE)是线程池类中的重中之重——它功能多样,广受欢迎。STPE接收任务,并把它们安排给线程池里的线程。
- 线程池的大小可以预定义,也可自适应。
- 所安排的任务可以定期执行,也可只运行一次。
- STPE扩展了
ThreadPoolExecutor
类(很相似,但不具备定期调度能力)。
和java.util.concurrent
中的工具类相结合的STPE线程池是大中型多线程应用程序最常见的模式之一,这些工具类包括我们在前面已经见过的ConcurrentHashMap
、CopyOnWriteArrayList
和BlockingQueue
等。
STPE不过是通过Executors
类的工厂方法轻易获取的众多执行者之一。使用这些工厂方法很方便,开发人员通过它们可以轻易获取典型配置,需要时还可以开放完整的接口方法。
下面的代码是一个定期读取的例子。这是newScheduledThreadPool
的常见用法:msgReader
对象被安排poll
一个队列,从队列中的WorkUnit
对象里取得工作项,然后输出。
代码清单4-15 STPE定期读取
private ScheduledExecutorService stpe;private ScheduledFuture<?> hndl;//取消时需要private BlockingQueue<WorkUnit<String>> lbq = new LinkedBlockingQueue<>;private void run{ stpe = Executors.newScheduledThreadPool(2);//执行者的工厂方法 final Runnable msgReader = new Runnable{ public void run{ String nextMsg = lbq.poll.getWork; if (nextMsg != null) System.out.println(/"Msg recvd: /"+ nextMsg); } }; hndl = stpe.scheduleAtFixedRate(msgReader, 10, 10, TimeUnit.MILLISECONDS);}public void cancel { final ScheduledFuture<?> myHndl = hndl; stpe.schedule(new Runnable { public void run { myHndl.cancel(true); }//取消时需要 }, 10, TimeUnit.MILLISECONDS);}
在这个例子中,STPE每隔10毫秒就唤醒一个线程,让它尝试poll
一个队列。如果读取返回null
(因为队列当前为空),则什么也不会发生,线程回去继续睡大觉。如果收到了一个工作单元,则线程会输出该工作单元的内容。
用Callable调用的代表性问题
形式简单的
Callable
、FutureTask
及相关类存在几个问题——尤其在涉及类型系统时。要明白这一点,可以想想怎么才能满足一个未知方法可能出现的所有方法签名。Callable
只能用于没有参数的方法。要满足所有可能性,你需要Callable
的不同变体。在Java中,你可以通过指定模型系统内的方法签名来解决这个问题。但你在本书第三部分会见到,动态语言不能用这种静态视图来约束。我们将会返回来重点讨论这种类型系统之间的不匹配。现在你只要注意到,虽然
Callable
很有用,但要用它构建一个通用框架来对线程执行进行建模还是有点儿限制得太死了。
现在我们要转向Java 7重点突出的框架之一——用于轻量级并发的分支/合并(fork/join)框架。这个框架比我们在本节中见到的执行者在处理并发问题方面更加高效,要达到这点绝非易事。