首页 » Java程序员修炼之道 » Java程序员修炼之道全文在线阅读

《Java程序员修炼之道》10.6 Clojure并发

关灯直达底部

Java的状态模型从根本上来说是基于对象可变思想的。正如第4章中所提及的,这会直接导致并发代码的安全问题。在某一线程修改对象的状态时,为了防止其他线程看到对象的中间(即不一致)状态,需要引入相当复杂的锁策略。这些策略理解难,调试难,测试更难。

Clojure的并发概念在某些方面不像Java中那么底层。比如说,由Clojure运行时管理线程池的使用(开发人员在这方面几乎或根本不能控制)看起来可能有点奇怪。但是让平台(此处即Clojure运行时)细致地做好内务工作的好处在于,开发人员可以专注于更重要的任务,比如总体设计。

Clojure的指导思想是默认把线程彼此隔开,这种实现并发安全的办法由来已久。假定“没有共享资源”的基线和采用不可变值使Clojure避开了很多Java所面临的问题,从而可以专注于为并发编程安全地共享状态的方法。

注意 为了帮助提升安全性,Clojure的运行时提供了线程协调机制,我们强烈建议你使用这些机制,而不是用Java的惯例或构造自己的并发结构。

实际上,Clojure用不同的方法实现了不同的并发模型:未来式(future)、并行调用(pcall)、引用形式(ref)和代理(agent)。且听我们一一道来,先从最简单的开始。

10.6.1 未来式与并行调用

第一个也是最明显的一个状态分享办法就是不分享。实际上,我们一直使用的Clojure结构var本质上是不可以共享的。如果两个不同的线程继承了名字相同的var,并在线程里重新绑定了它,那绑定只在这些线程内部可见,绝不可能被其他线程共享。

可以利用Clojure跟Java的紧密结合启动新线程,也就是说在Clojure中写Java并发代码非常容易。但其中有些抽象在Clojure中有更干净的形式。比如对于第4章介绍的Java 未来式(Future),Clojure提供了非常干净的方式。代码清单10-8是个简单的例子。

代码清单10-8 Clojure中的Future

user=> (def simple-future  (future (do    (println /"Line 0/")    (Thread/sleep 10000)    (println /"Line 1/")    (Thread/sleep 10000)    (println /"Line 2/"))))#/'user/simple-futureLine 0  //马上开始执行user=> (future-done? simple-future)user=> falseLine 1user=> @simple-future //解引用导致阻塞Line 2niluser=>    

这段代码用(future)建立了一个Future。创建之后它马上就开始在后台线程中运行,所以在Clojure REPL中看到了输出Line 0(然后是Line 1)——代码已经开始在另一个线程上运行了。接着可以用(future-done?)来检查代码是否已经运行完,这个调用是非阻塞的。然而对future的解引用会阻塞调用线程,直到函数完成。

这实际上是Clojure对Java Future的一个瘦封装,语法更干净。Clojure还提供了对并发程序员非常有帮助的辅助形式。有个简单的函数是(pcalls),可以接受数量可变的零参函数,让它们并发执行。它们在运行时管理的线程池上执行,并返回一个懒序列结果。试图访问序列中的任何还没完成的元素会导致访问线程被阻塞。

代码清单10-9建立了一个单参函数(wait-with-for)。它用了一个类似10.3.2节介绍过的loop形式。可以用它创建一些零参函数(wait-1)(wait-2)等,并把它们传给(pcalls)

代码清单10-9 Clojure中的并行调用

user=> (defn wait-with-for [limit]  (let [counter 1]    (loop [ctr counter]    (Thread/sleep 500)    (println (str /"Ctr=/" ctr))    (if (< ctr limit)        (recur (inc ctr))        ctr))))#/'user/wait-with-foruser=> (defn wait-1  (wait-with-for 1))user=> #/'user/wait-1user=> (defn wait-2  (wait-with-for 2))user=> #/'user/wait-2user=> (defn wait-3  (wait-with-for 3))user=> #/'user/wait-3user=> (def wait-seq (pcalls wait-1 wait-2 wait-3))#/'user/wait-seqCtr=1Ctr=1Ctr=1Ctr=2Ctr=2Ctr=3user=> (first wait-seq)1user=> (first (next wait-seq))2  

因为线程睡眠值只有500毫秒,等待函数很快就能完成。通过调整超时(比如延迟到10秒),很容易验证由(pcalls)返回的懒序列wait-seq是否有上面描述的那种阻塞行为。

对于不需要共享状态的情况,这种简单的多线程结构挺好,但在很多应用中,不同的处理线程都要在运行过程中相互通信。Clojure有几个模型可以处理这种情况,接下来我们先看看其中的一个:借助(ref)形式实现的状态共享。

10.6.2 ref形式

ref是Clojure在线程间共享状态的办法。它们基于运行时提供的一个模型,在这个模型中,状态的改变要能被多个线程见到。该模型在符号和值之间引入了一个额外的中间层。也就是说,符号绑定到值的引用上,而不是直接绑到值上。这个系统基本上是事务化的,并且由Clojure运行时进行协调。如图10-6所示。

图10-6 软件事务内存

这一中间层意味着改变或更新ref之前必须把它放在一个事务中。当事务完成的时候,或者全变了,或者什么也没变。这跟数据库中的事务是类似的。

这可能有点抽象了,所以我们来看一个模拟ATM的例子。在Java中,要对所有敏感数据加锁保护。代码清单10-10是一个简单的自动提款机模型,包括锁。

代码清单10-10 Java中的ATM模型

public class Account {    private double balance = 0;    private final String name;    private final Lock lock = new ReentrantLock;    public Account(String name_, double initialBal_){        name = name_;        balance = initialBal_;    }    public synchronized double getBalance{        return balance;    }    public synchronized void debit(double debitAmt_) {        balance -= debitAmt_;    }    public String getName {        return name;    }    public String toString {        return /"Account [balance=/" + balance + /", name=/" + name + /"]/";    }    public Lock getLock {        return lock;    }}public class Debitter implements Runnable {    private final Account acc;    private final CountDownLatch cdl;    public Debitter(Account account_, CountDownLatch cdl_) {        acc = account_;        cdl = cdl_;    }    public void run {        double bal = acc.getBalance;        Lock lk = acc.getLock;        while (bal > 0) {            try {                Thread.sleep(1);            } catch (InterruptedException e) { }            lk.lock; //能在acc上同步            bal = acc.getBalance; //必须重新取得余额            if (bal > 0) {                acc.debit(1);                bal--;            }            lk.unlock;        }        cdl.countDown;    }}Account myAcc = new Account(/"Test Account/", 500 * NUM_THREADS);CountDownLatch stopl = new CountDownLatch(NUM_THREADS);for (int i=0; i<NUM_THREADS; i++) {    new Thread(new Debitter(myAcc, stopl)).start;}stopl.await;System.out.println(myAcc);  

再来看看用Clojure怎么写。先来个单线程版本。然后我们再开发一个并发版本跟单线程版本比较,这样并发代码应该更容易理解。

代码清单10-11是单线程版本。

代码清单10-11 Clojure中的简单ATM模型

(defn make-new-acc [account-name opening-balance]    {:name account-name :bal opening-balance})(defn loop-and-debit [account]    (loop [acc account]      (let [balance (:bal acc) my-name (:name acc)]          (Thread/sleep 1)          (if (&gt; balance 0)            (recur (make-new-acc my-name (dec balance))) //用循环/递归代替Java中的while            acc     ))))(loop-and-debit (make-new-acc /"Ben/" 5000))  

这段代码跟Java版比起来非常紧凑。必须承认,这是单线程的,但还是比Java的代码少了很多。运行代码会得到期望的结果:一个余额为0的acc映射。现在我们看看并发形式。

要让这段代码并行,需要引入ref。它们是用(ref)形式创建的,并且类型为clojure.lang.Ref的JVM对象。通常建立时会带一个保存状态的映射,此外还需要(dosync)形式来设置事务。在事务之内,还要用到(alter)形式来修改ref。使用ref的多线程ATM函数如代码清单10-12所示。

代码清单10-12 多线程ATM

(defn make-new-acc [account-name opening-balance]    (ref {:name account-name :bal opening-balance}))(defn alter-acc [acc new-name new-balance]    (assoc acc :bal new-balance :name new-name)) //必须返回值,而不是引用(defn loop-and-debit [account]    (loop [acc account]      (let [balance (:bal @acc)             my-name (:name @acc)]         (Thread/sleep 1)         (if (> balance 0)           (recur (dosync (alter acc alter-acc my-name (dec balance)) acc))           acc     ))))(def my-acc (make-new-acc /"Ben/" 5000))(defn my-loop  (let [the-acc my-acc]    (loop-and-debit the-acc)))(pcalls my-loop my-loop my-loop my-loop my-loop)  

就像注释中说的,对值进行操作的(alter-acc)函数必须返回一个值。所操作的值是对当前事务中线程可见的本地值,这称为事务内的值。返回的值是在变更函数返回之后的ref新值。在退出 (dosync)所定义的事务块之前,这个值对外界是不可见的。

与此同时,其他事务可能像这个一样也在进行。如果是这样,Clojure STM系统会进行跟踪,并且只允许那些自开始以来已经提交过的事务组成的事务提交。如果不一致,它会回滚,并且可能在得到更新过的状态后再次尝试。

如果事务做了任何会产生副作用的事情(比如日志文件或其他输出),这个重试行为可能会引发问题。让事务化部分在函数式编程中(即没有副作用)尽可能地保持简单纯粹是你的责任。

对于某些多线程方式而言,这种持乐观态度的事务行为看起来可能是相当重量级的做法。有些并发应用只需偶尔在线程间进行通信,并且是以相当不对称的风格。幸运的是,Clojure提供了另外一种更好地体现“过后就忘“原则的并发机制,这也是我们下一节的主题。

10.6.3 代理

代理是Clojure中异步的、面向消息的并发原语。Clojure代理不是共享状态,而是属于另外一个线程的一点儿状态,但它会从另外一个线程中接收消息(以函数的形式)。这乍看起来可能是个奇怪的想法,尽管遇到过Scala的actor之后这种感觉可能会少一点。

“我离它们太远了,只能把礼物装进包裹寄给它们,”她想,“这也太滑稽了,给自己的双脚送礼物还需要邮寄!地址写起来就更有趣了!”

——《爱丽丝梦游仙境》,刘易斯•卡罗尔

应用到代理上的函数在代理的线程上运行。这个线程是由Clojure运行时管理的,在一个程序员通常无法访问的线程池里。运行时还会保证代理中那些可以被外界看到的值是孤立的和原子的。这就是说用户代码只会见到状态修改之前或之后的代理值。

代码清单10-13是个简单的代理例子,跟用来讨论future的例子类似。

代码清单10-13 Clojure代理

(defn wait-and-log [coll str-to-add]    (do (Thread/sleep 10000)        (let [my-coll (conj coll str-to-add)]            (Thread/sleep 10000)            (conj my-coll str-to-add))))(def str-coll (agent ))(send str-coll wait-and-log /"foo/")@str-coll  

send调用派发了一个(wait-and-log)调用给代理,通过使用REPL解引用,结果就像承诺的那样,你绝不会看到代理的中间状态——只有最后的状态出现了(字符串/"foo/"被添加了两次)。

实际上,代码清单10-13上的(send)调用很容易让人联想到爱丽丝的脚的地址。刘易斯•卡罗尔很可能是用Clojure代码写的地址:

爱丽丝的右脚收    壁炉前的毛毯上      靠近挡板      (带去爱丽丝的爱)  

在你认为一个人的脚是身体的有机组成时,这的确挺怪异的。同样,发消息给Clojure管理的线程池中一个线程上的代理看起来也挺怪异的,两个线程还共享一个地址空间。但你目前多次遇到的一个并发主题就是如果它能让用法更加简单清晰,额外的复杂性可能是件好事。