首页 » Java程序员修炼之道 » Java程序员修炼之道全文在线阅读

《Java程序员修炼之道》4.5 分支/合并框架

关灯直达底部

就像第6章要讨论的一样,处理器的速度(或者更准确地说是CPU上的晶体管数量)最近几年增长迅猛。由此产生的副作用就是处理器等待I/O操作变成了家常便饭。这表明我们能够更好地利用计算机的处理能力。分支/合并框架就可以解决这个问题——这也是Java 7对并发领域新做出的最大贡献。

分支/合并框架完全是为了实现线程池中任务的自动调度,并且这种调度对用户来说是透明的。为了达到这种效果,必须按用户指定的方式对任务进行分解。在很多应用程序中,对于分支/合并框架来说都可以很自然地把其中的任务分成“小型”和“大型”任务。

我们来快速浏览一些与分支/合并框架相关的重要事实和基本原理。

  • 引入了一种新的执行者服务,称为ForkJoinPool
  • ForkJoinPool服务处理一种比线程“更小”的并发单元ForkJoinTask
  • ForkJoinTask是一种由ForkJoinPool以更轻量化的方式所调度的抽象。
  • 通常使用两种任务(尽管两种都表示为ForkJoinTask实例):
    • “小型”任务是那种无需处理器耗费太多时间就可以直接执行的任务。
    • “大型”任务是那种需要在直接执行前进行分解(还可能不止一次)的任务。
  • 提供了支持大型任务分解的基本方法,它还有自动调度和重新调度的能力。

这个框架的关键特性之一就是这些轻量的任务都能够生成新的ForkJoinTask实例,而这些实例将仍由执行它们父任务的线程池来安排调度。这就是分而治之。

我们会通过一个简单的例子告诉你如何使用分支/合并框架,然后简短介绍“工作窃取”这个特性,最后讨论一下那些适用于并行处理技术的特性。使用分支/合并框架最好的办法就是从例子入手。

4.5.1 一个简单的分支/合并例子

我们为说明分支/合并框架而设定了这样的应用场景:有一个数组,里面存放不同时间到达的微博更新,我们想按到达时间对它们排序,以便为用户生成时间线,就像在代码清单4-9中生成的那个一样。

我们会用MergeSort的变体实现多线程排序。代码清单4-16中用到了ForkJoinTask的特定子类RecursiveAction。因为它明显可以独立完成任务(对这些更新的排序能当即完成),而且具备递归处理能力(递归特别适合做排序),所以用RecursiveAction会比用通用的ForkJoinTask更简单。

MicroBlogUpdateSorter类用Update对象的compareTo方法对更新列表排序。compute方法(超类RecursiveAction中的抽象方法,必须实现)基本上是按创建时间对微博更新数组排序。

代码清单4-16 用RecursiveAction排序

public class MicroBlogUpdateSorter extends RecursiveAction {  private static final int SMALL_ENOUGH = 32;// 串行排序项只有32个或更少  private final Update updates;  private final int start, end;  private final Update result;  public MicroBlogUpdateSorter(Update updates_) {    this(updates_, 0, updates_.length);  }  public MicroBlogUpdateSorter(Update upds_, int startPos_, int endPos_) {    start = startPos_;    end = endPos_;    updates = upds_;    result = new Update[updates.length];  }  private void merge(MicroBlogUpdateSorter left_, MicroBlogUpdateSorter right_) {    int i = 0;    int lCt = 0;    int rCt = 0;    while (lCt < left_.size && rCt < right_.size) {      result[i++] = (left_.result[lCt].compareTo(right_.result[rCt]) < 0)        ? left_.result[lCt++]        : right_.result[rCt++];    }    while (lCt < left_.size) result[i++] = left_.result[lCt++];    while (rCt < right_.size) result[i++] = right_.result[rCt++];  }  public int size {    return end - start;  }  public Update getResult {    return result;  }  //RecursiveAction中声明的方法  @Override  protected void compute {    if (size < SMALL_ENOUGH) {      System.arraycopy(updates, start, result, 0, size);      Arrays.sort(result, 0, size);    } else {      int mid = size / 2;      MicroBlogUpdateSorter left = new MicroBlogUpdateSorter(updates, start, start + mid);      MicroBlogUpdateSorter right = new MicroBlogUpdateSorter(updates, start + mid, end);      invokeAll(left, right);      merge(left, right)    }  }}  

要使用这个排序器,你可以用下面这样的代码驱动它,生成一些包含由X组成的字符串的更新,并打乱它们的顺序,之后再传给排序器。最终得到重新排序后的更新。

代码清单4-17 使用递归排序器

List<Update> lu = new ArrayList<Update>;String text = /"/";final Update.Builder ub = new Update.Builder;final Author a = new Author(/"Tallulah/");for (int i=0; i<256; i++) {  text = text + /"X/";  long now = System.currentTimeMillis;  lu.add(ub.author(a).updateText(text).createTime(now).build);  try {    Thread.sleep(1);  } catch (InterruptedException e) {  }}Collections.shuffle(lu);Update updates = lu.toArray(new Update[0]);//传入空数组,省掉空间分配MicroBlogUpdateSorter sorter = new MicroBlogUpdateSorter(updates);ForkJoinPool pool = new ForkJoinPool(4);pool.invoke(sorter);for (Update u: sorter.getResult) {  System.out.println(u);}  

TimSort

随着Java 7的到来,默认的数组排序算法已经变了。以前是以QuickSort的形式,但到了Java 7时代则变成了“TimSort”——MergeSort和插入排序的混合体。TimSort最初是Tim Peters为Python开发的,而且从2.3版(2002)开始就是Python中的默认排序算法了。

如果想看看TimSort在Java 7中存在的证据,可以给清单4-16中的代码传入一个null数组。对数组排序时,由于数组尺寸太小,会调用Array.sort方法,该方法会抛出空指针异常,在输出的异常信息里就能看到TimSort类。

4.5.2 ForkJoinTask与工作窃取

ForkJoinTaskRecursiveAction的超类。它是从动作中返回结果的泛型类型,所以RecursiveAction扩展了ForkJoinTask<Void>。这使得ForkJoinTask非常适合用MapReduce1 方式返回数据集中归结出的结果。

1 MapReduce是Google提出的一个软件架构,用于大规模数据集(大于1TB)的并行运算。当前的软件实现是指定一个Map(映射)函数,用来把一组键值对映射成一组新的键值对,指定并发的Reduce(化简)函数,用来保证所有映射的键值对中的每一个元素都共享相同的键组。——译者注

ForkJoinTasksForkJoinPool调度安排,ForkJoinPool是专为这些轻量任务设计的新型执行者服务。该服务维护每个线程的任务列表,并且当某个任务完成时,它能把挂在满负荷线程上的任务重新安排到空闲线程上去。

采用这种“工作窃取”的算法是为了解决大小不同的任务所导致的调度问题。大小不同的任务所需的运行时间通常也会有很大差别。比如说,某个线程的运行队列中都是小任务,而另外一个全是大任务。如果小任务的运行速度比大任务快五倍,只处理小任务的线程很可能在处理大任务的线程完成之前就处于空闲状态了。

Java 7实现的工作窃取机制精准地解决了这个问题,并且在分支/合并框架工作的整个生命周期中使线程池中的所有线程都有用武之地。工作窃取完全是自动的,你什么也不用做就能享受到它带来的好处。不需要手工干预,而是由运行环境承担更多工作帮助开发人员管理并发,这在Java 7中已经不是什么新鲜事了。

4.5.3 并行问题

分支/合并框架的确对我们的帮助很大,但在实际中,并不是每个问题都能像4.5.1节中那样轻易地简化成多线程MergeSort

这里是一些可以用分支/合并方法解决的问题:

  • 模拟大量简单对象的运动,比如粒子效果;
  • 日志文件分析;
  • 从输入中计数的数据操作,比如mapreduce操作。

从另一个角度来说,图4-10中这个被分解的问题正是分支/合并框架可以解决的。

图4-10 分支与合并

用下面这个列表检查问题及其子任务是一个切实有效的方法,它可以确定是否能用分支/合并来解决这个问题。

  • 问题的子任务是否无需与其他子任务有显式的协作或同步也可以工作?

  • 子任务是不是不会对数据进行修改,只是经过计算得出些结果(它们是不是函数程序员称为“纯粹的”函数的函数)?

  • 对于子任务来说,分而治之是不是很自然的事?子任务是不是会创建更多的子任务,而且它们要比派生出它们的任务粒度更细?

对于前面这些提问,如果答案是肯定的,或者“大体如此,但有临界情况”,那你的问题很可能适合用分支/合并的方式解决。反过来,如果答案是“可能吧”或者“算不上”,你就会发现分支/合并帮不上什么忙,可能用其他的同步方式更合适。

注意 前面的检查列表是测试某个问题(比如在Hadoop和NoSQL数据库中常见的那种)能否很好地用分支/合并方式解决的有效方法。

想设计出优秀的多线程算法并不容易,分支/合并方法也不能面面俱到。在适用的领域,它的用处很广。其实归根结底,你必须要确定你的问题是否适合这个框架,如果不适合,你只能在性能卓越的java.util.concurrent工具箱上构建自己的解决方案。

在下一节中,我们会详细讨论经常被误解的Java内存模型(Java Memory Model,JMM)。很多Java程序员都知道JMM,并且在没有经过正式介绍的情况下按自己的理解写代码。如果你觉得这是在说你,那么接下来的内容会帮助你重新认识JMM,并且帮你打下扎实的基础。JMM这个话题相当有深度,所以如果你急着进入下一章,可以先跳过它。