第1章 简介
1.1 并发简史
茶壶和面包机的生产商都很清楚:用户通常会采用异步方式来使用他们的产品,因此当这些机器完成任务时都会发出声音提示。
1.2 线程的优势
线程能够将大部分的异步工作流转换成串行工作流,因此能更好地模拟人类的工作方式和交互方式。
线程还可以简化JVM的实现,垃圾收集器通常在一个或多个专门的线程中运行。
因此,操作系统提供了一些高效的方法来实现多路I/O,例如Unix的select和poll等系统调用,要调用这些方法,Java类库需要获得一组实现非阻塞I/O的包(java.nio)。
1.3 线程带来的风险
安全性的含义是“永远不发生糟糕的事情”,而活跃性则关注于另一个目标,即“某件正确的事情最终会发生”。当某个操作无法继续执行下去时,就会发生活跃性问题。在串行程序中,活跃性问题的形式之一就是无意中造成的无线循环,从而使循环之后的代码无法得到执行。线程将带来其他一些活跃性问题。例如,如果线程A在等待线程B释放其持有的资源,而线程B永远都不释放该资源,那么A就会永久地等待下去。
在多线程程序中,当线程调度器临时挂起活跃线程并转而运行另一个线程时,就会频繁地出现上下文切换操作(Context Switch),这种操作将带来极大的开销:保存和恢复执行上下文,丢失局部性,并且CPU时间将更多地花在线程调度而不是线程运行上。
1.4 线程无处不在
每个Java应用程序都会使用线程。当JVM启动时,它将为JVM的内部任务(例如,垃圾收集、终结操作等)创建后台线程,并创建一个主线程来运行main方法。AWT (Abstract Window Toolkit)和Swing的用户界面框架将创建线程来管理用户界面事件。Timer将创建线程来执行延迟任务。一些组件框架,例如Servlet和RMI,都会创建线程池并调用这些线程中的方法。
当某个框架在应用程序中引入并发性时,通常不可能将并发性仅局限于框架代码,因为框架本身会回调(Callback)应用程序的代码,而这些代码将访问应用程序的状态。同样,对线程安全性的需求也不能局限于被调用的代码,而是要延伸到需要访问这些代码所访问的程序状态的所有代码路径。
框架通过在框架线程中调用应用程序代码将并发性引入到程序中。在代码中将不可避免地访问应用程序状态,因此所有访问这些状态的代码路径都必须是线程安全的。
第2章 线程安全性
从非正式的意义上来说,对象的状态是指存储在状态变量(例如实例或静态域)中的数据。对象的状态可能包括其他依赖对象的域。例如,某个HashMap的状态不仅存储在HashMap对象本身,还存储在许多Map.Entry对象中。在对象的状态中包含了任何可能影响其外部可见行为的数据。
访问某个变量的代码越少,就越容易确保对变量的所有访问都实现正确同步,同时也更容易找出变量在哪些条件下被访问。Java语言并没有强制要求将状态都封装在类中,开发人员完全可以将状态保存在某个公开的域(甚至公开的静态域)中,或者提供一个对内部对象的公开引用。然而,程序状态的封装性越好,就越容易实现程序的线程安全性,并且代码的维护人员也越容易保持这种方式。
2.1 什么是线程安全性
当多个线程访问某个类时,不管运行时环境采用何种调度方式或者这些线程将如何交替执行,并且在主调代码中不需要任何额外的同步或协同,这个类都能表现出正确的行为,那么就称这个类是线程安全的。
与大多数Servlet相同,StatelessFactorizer是无状态的:它既不包含任何域,也不包含任何对其他类中域的引用。计算过程中的临时状态仅存在于线程栈上的局部变量中,并且只能由正在执行的线程访问。访问StatelessFactorizer的线程不会影响另一个访问同一个StatelessFactorizer的线程的计算结果,因为这两个线程并没有共享状态,就好像它们都在访问不同的实例。由于线程访问无状态对象的行为并不会影响其他线程中操作的正确性,因此无状态对象是线程安全的。【方法内的局部变量是线程独有的,所以不会出问题。】
2.2 竞态条件
最常见的竞态条件类型就是“先检查后执行(Check-Then-Act)”操作,即通过一个可能失效的观测结果来决定下一步的动作。
当你迈出前门时,你在星巴克A的观察结果将变得无效,你的朋友可能从后门进来了,而你却不知道。这种观察结果的失效就是大多数竞态条件的本质——基于一种可能失效的观察结果来做出判断或者执行某个计算。
当在无状态的类中添加一个状态时,如果该状态完全由线程安全的对象来管理,那么这个类仍然是线程安全的。然而,当状态变量的数量由一个变为多个时,并不会像状态变量数量为零个变为一个那样简单。
2.3 加锁机制
UnsafeCachingFactorizer的不变性条件之一是:在lastFactors中缓存的因数之积应该等于在lastNumber中缓存的数值。只有确保了这个不变性条件不被破坏,上面的Servlet才是正确的。【当在不变性条件中涉及多个变量时,各个变量之间并不是彼此独立的,而是某个变量的值会对其他变量的值产生约束。】因此,当更新某一个变量时,需要在同一个原子操作中队其他变量同时进行更新。【要保持状态的一致性,就需要在单个原子操作中更新所有相关的状态变量。】
每个Java对象都可以用做一个实现同步的锁,这些锁被称为内置锁(Intrinsic Lock)或监视锁(Monitor Lock)。
以关键字synchronized来修饰的方法就是一种横跨整个方法体的同步代码块,其中该同步代码块的锁就是方法调用所在的对象。静态的synchronized方法以Class对象作为锁。
“重入”意味着获取锁的操作的粒度是“线程”,而不是“调用”。
子类改写了父类的synchronized方法,然后调用父类中的方法,此时如果没有可重入的锁,那么这段代码将产生死锁。由于Widget和LoggingWidget中doSomething方法都是synchronized方法,因此每个doSomething方法在执行前都会获取Widget上的锁。【这意味着,调用子类重写的synchronized方法,会同时锁住父类对象和子类对象?】【不对,只有一个对象,只有一个锁,这个对象叫Widget也好,叫LoggingWidget也好,还是只有一个对象,也就是只有一个锁】
2.5 活跃性与性能
当访问状态变量或者在复合操作的执行期间,CachedFactorizer需要持有锁,但在执行时间较长的因数分解运算之前要释放锁。这样既确保了线程安全性,也不会过多地影像并发性,而且在每个同步代码块中的代码路径都“足够短”。
当执行时间较长的计算或者可能无法快速完成的操作时(例如,网络I/O或控制台I/O),一定不要持有锁。
第3章 对象的共享
同步还有另一个重要的方面:内存可见性(Memory Visibility)。我们不仅希望防止某个线程正在使用对象状态而另一个线程在同时修改该状态,而且希望确保当一个线程修改了对象状态后,其他线程能够看到发生的状态变化。如果没有同步,那么这种情况就无法实现。
3.1 可见性
NoVisibility可能会持续循环下去,因为读线程可能永远都看不到ready的值。一种更奇怪的现象是,NoVisibility可能会输出0,因为读线程可能看到了写入ready的值,但却没有看到之后写入number的值,这种现象被称为“重排序(Reordering)”。
当主线程首先写入number,然后在没有同步的情况下写入ready,那么读线程看到的顺序可能与写入的顺序完全相反。
加锁的含义不仅仅局限于互斥行为,还包括内存可见性。为了确保所有线程都能看到共享变量的最新值,所有执行读操作或者写操作的线程都必须在同一个锁上同步。
当把变量声明为volatile类型后,编译器与运行时都会注意到这个变量是共享的,因此不会将该变量上的操作与其他内存操作一起重排序。volatile变量不会被缓存在寄存器或者对其他处理器不可见的地方,因此在读取volatile类型的变量时总会返回最新写入的值。
然而,我们并不建议过度依赖volatile变量提供的可见性。如果在代码中依赖volatile变量来控制状态的可见性,通常比使用锁的代码更脆弱,也更难以理解。
如果在验证正确性时需要对可见性进行复杂的判断,那么就不要使用volatile变量。volatile变量的正确使用方式包括:确保它们自身状态的可见性,确保它们所引用对象的状态的可见性,【以及标识一些重要的程序生命周期事件的发生(例如,初始化或关闭】。
volatile变量的一种典型用法:检查某个状态标记以判断是否退出循环。volatile变量通常用做某个操作完成、发生中断或者状态的标志。
加锁机制既可以确保可见性又可以确保原子性,而volatile变量只能确保可见性。
当且仅当满足以下所有条件时,才应该使用volatile变量:
对变量的写入操作不依赖变量的当前值,或者你能确保只有单个线程更新变量的值。
该变量不会与其他状态变量一起纳入不变性条件中。
在访问变量时不需要加锁。
3.2 发布与逸出
“发布(Publish)”一个对象的意思是指,使对象能够在当前作用域之外的代码中使用。例如,将一个指向该对象的引用保存到其他代码可以访问的地方,或者在某一个非私有的方法中返回该引用,或者将引用传递到其他类的方法中。
发布内部状态可能会破坏封装性,并使得程序难以维持不变性条件。例如,如果在对象构造完成之前就发布该对象,就会破坏线程安全性。当某个不应该发布的对象被发布时,这种情况就被称为逸出(Escape)。
3.
class UnsafeStates { private String[] states = new String[] { "AK", "AL" ... }; public String[] getStates() { return states; } }
如果按照上述方式来发布states,就会出现问题,因为任何调用者都能够修改这个数组的内容。在这个示例中,数组states已经逸出了它所在的作用域,因为这个本应是私有的变量已经被发布了。
4. 假定有一个类C,对于C来说,“外部(Alien)方法”是指行为并不完全由C来规定的方法,包括其他类中定义的方法以及类C中可以被改写的方法(既不是私有[private]方法也不是终结[final]方法)。
5. 最后一种发布对象或其内部状态的机制就是发布一个内部的类实例。
public class ThisEscape { public ThisEscape(EventSource source) { source.registerListener( new EventListener() { public void onEvent(Event e) { doSomething(e); }});}}
当ThisEscape发布EventListener时,也隐含地发布了ThisEscape实例本身,因为在这个内部类的实例中包含了对ThisEscape实例的隐含引用。
6. 当且仅当对象的构造函数返回时,对象才处于可预测的和一致的状态。因此,当从对象的构造函数中发布对象时,只是发布了一个尚未构造完成的对象。即使发布对象的语句位于构造函数的最后一行也是如此。如果this引用在构造过程中逸出,那么这种现象就被认为是不正确构造。
7. 在构造过程中使this引用逸出的一个常见错误是,在构造函数中启动一个线程。当对象在其构造函数中创建一个线程时,无论是显式创建(通过将它传给构造函数)还是隐式创建(由于Thread或Runnable是该对象的一个内部类),this应用都会被新创建的线程共享。在对象尚未完全构造之前,新的线程就可以看见它。
8. 在构造函数中调用一个可改写的实例方法时,同样会导致this引用在构造过程中逸出。
9. 如果想在构造函数中注册一个事件监听器或启动线程,那么可以使用一个私有的构造函数和一个公共的工厂方法(Factory Method),从而避免不正确的构造过程。
public class SafeListener { private final EventListener listener; private SafeListener() { listener = new EventListener() { public void onEvent(Event e) { doSomething(e); } }; } public static SafeListener newInstance(EventSource source) { SafeListener safe = new SafeListener(); source.registerListener(safe.listener); return safe; } }
3.3 线程封闭
线程封闭技术的另一种常见应用是JDBC(Java Database Connectivity)的Connection对象。JDBC规范并不要求Connection对象必须是线程安全的。在典型的服务器应用程序中,线程从连接池中获得一个Connection对象,并且用该对象来处理请求,使用完后再将对象返还给连接池。由于大多数请求(例如Servlet请求或EJB调用等)都是由单个线程采用同步的方式来处理,并且在Connection对象返回之前,连接池不会再将它分配给其他线程,因此,这种连接管理模式在处理请求时隐含地将Connection对象封闭在线程中。
Java语言及其核心库提供了一些机制来帮助维持线程封闭性,例如局部变量和ThreadLocal类,但即便如此,程序员仍然需要负责确保封闭在线程中的对象不会从线程中逸出。
在volatile变量上存在一种特殊的线程封闭。只要你能确保只有单个线程对共享的volatile变量执行写入操作,那么就可以安全地在这些共享的volatile变量上执行“读取-修改-写入”的操作。在这种情况下,相当于将修改操作封闭在单个线程中以防止发生竞态条件,并且volatile变量的可见性保证还确保了其他线程能看到最新的值。
Ad-hoc线程封闭是非常脆弱的,因为没有任何一种语言特性,例如可见性修饰符或局部变量,能将对象封闭到目标线程中。事实上,对线程封闭对象的引用通常保存在公有变量中。由于Ad-hoc线程封闭技术的脆弱性,因此在程序中尽量少用它,在可能的情况下,应该使用更强的线程封闭技术(例如,栈封闭或ThreadLocal类)。
局部变量的固有属性之一就是封闭在执行线程中。它们位于执行线程的栈中,其他线程无法访问这个栈。
由于任何方法都无法获得对基本类型的引用,因此Java语言的这种语义就确保了基本类型的局部变量始终封闭在线程内。
然而,要小心的是,只有编写代码的开发人员才知道哪些对象需要被封闭到执行线程中,以及被封闭的对象是否是线程安全的。如果没有明确地说明这些需求,那么后续的维护人员很容易错误地使对象逸出。
3.3.3 ThreadLocal类
维持线程封闭性的一种更规范方法是使用ThreadLocal,这个类能使线程中的某个值与保存值得对象关联起来。ThreadLocal提供了get和set等访问接口或方法,这些方法为每个使用该变量的线程都存有一份独立的副本,因此get总是返回由当前执行线程在调用set时设置的最新值。
当某个频繁执行的操作需要一个临时对象,例如一个缓冲区,而同时又希望避免在每次执行时都重新分配该临时对象,就可以使用这项技术。
从概念上看,你可以将ThreadLocal<T>视为包含了Map<Thread, T>对象,其中保存了特定于该线程的值,但ThreadLocal的实现并非如此。这些特定于线程的值保存在Thread对象中,当线程终止后,这些值会作为垃圾回收。
假设你需要将一个单线程应用程序移植到多线程环境中,通过将共享的全局变量转换为ThreadLocal对象(如果全局变量的语义允许),可以维持线程安全性。然而,如果将应用程序范围内的缓存转换为线程局部的缓存,就不会有太大作用。
在实现应用程序框架时大量使用了ThreadLocal。例如,在EJB调用期间,J2EE容器需要将一个事务上下文(Transaction Context)与某个执行中的线程关联起来。通过将事物上下文保存在静态的ThreadLocal对象中,可以很容易地实现这个功能:当框架代码需要判断当前运行的是哪一个事务时,只需从这个ThreadLocal对象中读取事务上下文。这种机制很方便,因为它避免了在调用每个方法时都要传递执行上下文信息,然而这也将使用该机制的代码与框架耦合在一起。
开发人员经常滥用ThreadLocal,例如将所有全局变量都作为ThreadLocal对象,或者作为一种“隐藏”方法参数的手段。ThreadLocal变量类似于全局变量,它能降低代码的可重用性,并在类之间引入隐含的耦合性,因此在使用时要格外小心。
3.4 不变性
虽然在Java语言规范和Java内存模型中都没有给出不可变性的正式定义,但不可变性并不等于将对象中所有的域都声明为final类型,即使对象中所有的域都是final类型的,这个对象也仍然是可变的,因为在final类型的域中可以保存对可变对象的引用。
当满足以下条件时,对象才是不可变的:
对象创建以后其状态就不能修改。
对象的所有域都是final类型。
对象是正确创建的(在对象的创建期间,this引用没有逸出)。
尽管保存姓名的Set对象是可变的,但从ThreeStooges的设计中可以看到,在Set对象构造完成后无法对其进行修改。stooges是一个final类型的引用变量,因此所有的对象状态都通过一个final域来访问。最后一个要求是“正确地构造对象”,这个要求很容易满足,因为构造函数能使该引用由除了构造函数及其调用者之外的代码来访问。
正如“除非需要更高的可见性,否则应该将所有的域都声明为私有域”是一个良好的编程习惯,“除非需要某个域是可变的,否则应将其声明为final域”也是一个良好的编程习惯。
每当需要对一组相关数据以原子方式执行某个操作时,就可以考虑创建一个不可变的类来包含这些数据。
对于访问和更新多个相关变量时出现的竞争条件问题,可以通过将这些变量全部保存在一个不可变对象中来消除。
如果要更新这些变量,那么可以创建一个新的容器对象,但其他使用原有对象的线程仍然会看到对象处于一致的状态。
当一个线程将volatile类型的cache设置为引用一个新的OneValueCache时,其他线程就会立刻看到新缓存的数据。
通过使用包含多个状态变量的容器对象来维持不变性条件,并使用一个volatile类型的引用来确保可见性,使得Volatile Cached Factorizer在没有显式地使用锁的情况下仍然是线程安全的。
3.5 安全发布
由于不可变对象是一种非常重要的对象,因此Java内存模型为不可变对象的共享提供了一种特殊的初始化安全性保证。
为了维持这种初始化安全性的保证,必须满足不可变性的所有需求:状态不可修改,所有域都是final类型,以及正确的构造过程。
任何线程都可以在不需要额外同步的情况下安全地访问不可变对象,即使在发布这些对象时没有使用同步。
要安全地发布一个对象,对象的引用以及对象的状态必须同时对其他线程可见。一个正确构造的对象可以通过以下方式来安全地发布:
在静态初始化函数中初始化一个对象引用。
将对象的引用保存到volatile类型的域或者AtomicReference对象中。
将对象的引用保存到某个正确构造对象的final类型域中。
将对象的引用保存到一个由锁保护的域中。(在线程安全容器内部的同步意味着,在将对象放入到某个容器,例如Vector或synchronizedList时,将满足这最后一条):
通过将一个键或者值放入HashTable、synchronizedMap或者ConcurrentMap中,可以安全地将它发布给任何从这些容器中访问它的线程(无论是直接访问还是通过迭代器访问)。
通过将某个元素放入Vector、CopyOnWriteArrayList、CopyOnWriteArraySet、synchronizedList或synchronizedSet中,可以将该元素安全地发布到任何从这些容器中访问该元素的线程。
通过将某个元素放入BlockingQueue或者ConcurrentLinkedQueue中,可以将该元素安全地发布到任何从这些队列中访问该元素的线程。
通常,要发布一个静态构造的对象,最简单和最安全的方式是使用静态的初始化器:
public static Holder holder = new Holder(42);
第4章 对象的组合
4.1 设计线程安全的类
在设计线程安全类的过程中,需要包含以下三个基本要素:
找出构成对象状态的所有变量。
找出约束状态变量的不变性条件。
建立对象状态的并发访问管理策略。
如果在对象的域中引用了其他对象,那么该对象的状态将包含被引用对象的域。例如,LinkedList的状态就包括该链表中所有节点对象的状态。
同步策略规定了如何将不可变性、线程封闭与加锁机制等结合起来以维护线程的安全性,并且还规定了哪些变量由哪些锁来保护。要确保开发人员可以对这个类进行分析与维护,就必须将同步策略写为正式文档。
同样,在操作中还会包含一些后验条件来判断状态迁移是否是有效的。如果Counter的当前状态为17,那么下一个有效状态只能是18。当下一个状态需要依赖当前状态时,这个操作就必须是一个复合操作。
要想实现某个等待先验条件为真时才执行的操作,一种更简单的方法是通过现有库中的类(例如阻塞队列[Blocking Queue]或信号量[Semaphore])来实现依赖状态的行为。
如果分配并填充了一个HashMap对象,那么就相当于创建了多个对象:HashMap对象,在HashMap对象中包含的多个对象,以及在Map.Entry中可能包含的内部对象。HashMap对象的逻辑状态包括所有的Map.Entry对象以及内部对象,即使这些对象都是一些独立的对象。
状态变量的所有者将决定采用何种加锁协议来维持变量状态的完整性。
然而,如果发布了某个可变对象的引用,那么就不再拥有独占的控制权,最多是“共享控制权”。对于从构造函数或者从方法中传递进来的对象,类通常并不拥有这些对象,除非这些方法是被专门设计为转移传递进来的对象的所有权(例如,同步容器封装器的工厂方法)。
容器类通常表现出一种“所有权分离”的形式,其中容器类拥有其自身的状态,而客户代码则拥有容器中各个对象的状态。Servlet框架中的ServletContext就是其中一个示例。ServletContext为Servlet提供了类似于Map形式的对象容器服务,在ServletContext中可以通过名称来注册(setAttribute)或获取(getAttribute)应用程序对象。
4.2 实例封闭
当一个对象被封装到另一个对象中时,能够访问被封装对象的所有代码路径都是已知的。与对象可以由整个程序访问的情况相比,更易于对代码进行分析。通过将封闭机制与合适的加锁策略结合起来,可以确保以线程安全的方式来使用非线程安全的对象。
将数据封装在对象内部,可以将数据的访问限制在对象的方法上,从而更容易确保线程在访问数据时总能持有正确的锁。
对象可以封闭在类的一个实例(例如作为类的一个私有成员)中,或者封闭在某个作用域内(例如作为一个局部变量),再或者封闭在线程内(例如在某个线程中奖对象从一个方法传递到另一个方法,而不是在多个线程之间共享该对象)。【当然,对象本身不会逸出——出现逸出情况的原因通常是由于开发人员在发布对象时超出了对象既定的作用域】。
PersonSet的状态由HashSet来管理的,而HashSet并非线程安全的。但由于mySet是私有的并且不会逸出,因此HashSet被封闭在PersonSet中。唯一能访问mySet的代码路径是addPerson与containsPerson,在执行它们时都要获得PersonSet上的锁。PersonSet的状态完全由它的内置锁保护,因而PersonSet是一个线程安全的类。
一些基本的容器类并非线程安全的,例如ArrayList和HashMap,但类库提供了包装器工厂方法(例如Collections.synchronizedList及其类似方法),使得这些非线程安全的类可以在多线程环境中安全地使用。这些工厂方法通过“装饰器(Decorator)”模式将容器类封装在一个同步的包装器对象中,而包装器将接口中的每个方法都实现为同步方法,并将调用请求转发到底层的容器对象上。只要包装器对象拥有对底层对象的唯一引用(即把底层容器对象封闭在包装器中),那么它就是线程安全的。在这些方法的Javadoc中指出,对底层容器对象的所有访问必须通过包装器来进行。
由于deepCopy是从一个synchronized方法中调用的,因此在执行时间较长的复制操作中,tracker的内置锁将一直被占有,当有大量车辆需要追踪时,会严重降低用户界面的响应灵敏度。
4.3 线程安全性的委托
4.3.1 示例:基于委托的车辆追踪器
如果使用最初的MutablePoint类而不是Point类,就会破坏封装性,因为getLocations会发布一个指向可变状态的引用,而这个引用不是线程安全的。
在使用监视器模式的车辆追踪器中返回的是车辆位置的快照,而在使用委托的车辆追踪器中返回的是一个不可修改但却实时的车辆位置视图。这意味着,如果线程A调用getLocations,而线程B在随后修改了某些点的位置,那么在返回给线程A的Map中将反映出这些变化。
如果需要一个不发生变化的车辆视图,那么getLocations可以返回对locations这个Map对象的一个浅拷贝(Shallow Copy)。由于Map的内容是不可变的,因此只需复制Map的结构,而不用复制它的内容。
public Map<String, Point> getLocations() { return Collections.unmodifiableMap(new HashMap<String, Point>(locations)); }
4.3.2 独立的状态变量
我们还可以将线程安全性委托给多个状态变量,只要这些变量是彼此独立的,即组合而成的类并不会在其包含的多个状态变量上增加任何不变性条件。
VisualComponent使用CopyOnWriteArrayList来保存各个监听器列表。它是一个线程安全的链表,特别适用于管理监听器列表。每个链表都是线程安全的,此外,由于各个状态之间不存在耦合关系,因此VisualComponent可以将它的线程安全性委托给mouseListeners和keyListeners等对象。
4.3.3 当委托失效时
setLower和setUpper都是“先检查后执行”的操作,但它们没有使用足够的加锁机制来保证这些操作的原子性。
因此,虽然AtomicInteger是线程安全的,但经过组合得到的类却不是。由于状态变量lower和upper不是彼此独立的,因此NumberRange不能将线程安全性委托给它的线程安全状态变量。
如果一个类是由多个独立且线程安全的状态变量组成,并且在所有的操作中都不包括无效状态转换,那么可以将线程安全性委托给底层的状态变量。
4.3.4 发布底层的状态变量
当把线程安全性委托给某个对象的底层状态变量时,在什么条件下才可以发布这些变量从而使其他类能修改它们?答案仍然取决于在类中对这些变量施加了哪些不变性条件。
如果一个状态变量是线程安全的,并且没有任何不变性条件来约束它的值,在变量的操作上也不存在任何不允许的状态转换,那么就可以安全地发布这个变量。
例如,发布VisualComponent中的mouseListeners或keyListeners等变量就是安全的。由于VisualComponent并没有在其监听器链表的合法状态上施加任何约束,因此这些域可以声明为公有域或者发布,而不会破坏线程安全性。
4.3.5 示例:发布状态的车辆追踪器
PublishingVehicleTracker将其线程安全性委托给底层的ConcurrentHashMap,只是Map中的元素是线程安全的且可变的Point,而并非不可变的。getLocation方法返回底层Map对象的一个不可变副本。调用者不能增加或删除车辆,但却可以通过修改返回Map中的SafePoint值来改变车辆的位置。
PublishingVehicleTracker是线程安全的,但如果它在车辆位置的有效值上施加了任何约束,那么就不再是线程安全的。如果需要对车辆位置的变化进行·判断或者当位置变化时执行一些操作,那么PublishingVehicleTracker中采用的方法并不合适。
4.4 在现有的线程安全类中添加功能
“扩展”方法比直接将代码添加到类中更加脆弱,因为现在的同步策略实现被分布到多个单独维护的源代码文件中。如果底层的类改变了同步策略并选择了不同的锁来保护它的状态变量,那么子类会被破坏,因为在同步策略改变后它无法再使用正确的锁来控制对基类状态的并发访问。
问题在于在错误的锁上进行了同步。无论List使用哪一个锁来保护它的状态,可以确定的是,这个锁并不是ListHelper上的锁。ListHelper只是带来了同步的假象,尽管所有的链表操作都被声明为synchronized,但却使用了不同的锁,这意味着putIfAbsent相当于List的其他操作来说并不是原子的,因此就无法确保当putIfAbsent执行时另一个线程不会修改链表。
要想使这个方法能正确执行,必须使List在实现客户端加锁或外部加锁时使用同一个锁。
然而,客户端加锁却更加脆弱,因为它将类C的加锁代码放到与C完全无关的其他类中。当在那些并不承诺遵循加锁策略的类上使用客户端加锁时,要特别小心。
客户端加锁机制与扩展类机制有许多共同点,二者都是将派生类的行为与基类的实现耦合在一起。正如扩展会破坏实现的封装性,客户端加锁同样会破坏同步策略的封装性。
当为现有的类添加一个原子操作时,有一种更好的方法:组合(Composition)。ImprovedList通过将List对象的操作委托给底层的List实例来实现List的操作,同时还添加了一个原子的putIfAbsent方法。(与Collections.synchronizedList和其他容器封装器一样,ImprovedList假设把某个链表对象传给构造函数以后,客户代码不会再直接使用这个对象,而只能通过ImprovedList来访问它。)ImprovedList通过自身的内置锁增加了一层额外的加锁。它并不关心底层的List是否是线程安全的,即使List不是线程安全的或者修改了它的加锁实现,ImprovedList也会提供一致的加锁机制来实现线程安全性。
public class ImprovedList<T> implements List<T> { private final List<T> list; public ImprovedList(List<T> list) { this.list = list; } public synchronized boolean putIfAbsent(T x) { boolean contains = list.contains(x); if (contains) list.add(x); return !contains; } public synchronized void clear() { list.clear(); } // ...按照类似的方式委托List的其他方法 }
事实上,我们使用了Java监视器模式来封装现有的List,并且只要在类中拥有指向底层List的唯一外部引用,就能确保线程安全性。
4.5 将同步策略文档化
在设计同步策略时需要考虑多个方面,例如,将哪些变量声明为volatile类型,哪些变量用锁来保护,哪些锁保护哪些变量,哪些变量必须是不可变的或者被封闭在线程中,哪些操作必须是原子操作等。
如果使用锁来保护状态,那么也要将其写入文档以便日后维护,这很简单,只需使用标注@GuardedBy即可。
更糟糕的是,我们的直觉通常是错误的:我们认为“可能是线程安全”的类通常并不是线程安全的。例如,java.text.SimpleDateFormat并不是线程安全的,但JDK 1.4之前的Javadoc并没有提到这点。许多开发人员都对这个类不是线程安全的而感到惊讶。
如果某个类没有明确地声明是线程安全的,那么就不要假设它是线程安全的,从而有效地避免类似于SimpleDateFormat的问题。而另一方面,如果不对容器提供对象(例如HttpSession)的线程安全性做某种有问题的假设,也就不可能开发出一个基于Servlet的应用程序。不要使你的客户或同事也做这样的猜测。
许多Java技术规范都没有(或者至少不愿意)说明接口的线程安全性,例如ServletContext、HttpSession或DataSource。这些接口是由容器或数据库供应商来实现的,而你通常无法通过查看其实现代码来了解细节功能。此外,你也不希望依赖于某个特定JDBC驱动的实现细节——你希望遵从标准,这样代码可以基于任何一个JDBC驱动工作。
第5章 基础构建模块
5.1 同步容器类
同步容器类都是线程安全的,但是在某些情况下可能需要额外的客户端加锁来保护复合操作。容器上常见的复合操作包括:迭代(反复访问元素,直到遍历完容器中所有元素)、跳转(根据指定顺序找到当前元素的下一个元素)以及条件运算,例如“若没有则添加”(检查在Map中是否存在键值K,如果没有,就加入二元组(K,V))。
在设计同步容器类的迭代器时并没有考虑到并发修改的问题,并且它们表现出的行为是“及时失败”(fail-fast)的。这意味着,当它们发现容器在迭代过程中被修改时,就会抛出一个ConcurrentModificationException异常。
这种“及时失败”的迭代器并不是一种完备的处理机制,而只是“善意地”捕获并发错误,因此只能作为并发问题的预警指示器。它们采用的实现方式是,将计数器的变化与容器关联起来:如果在迭代期间计数器被修改,那么hasNext或next将抛出ConcurrentModificationException。然而,这种检查是在没有同步的情况下进行的,因此可能会看到失效的计数值,而迭代器可能并没有意识到已经发生了修改。
如果不希望在迭代期间对容器加锁,那么一种替代方法就是“克隆”容器,并在副本上进行迭代。由于副本被封闭在线程内,因此其他线程不会在迭代期间对其进行修改,这样就避免了抛出ConcurrentModificationException(在克隆过程中仍然需要对容器加锁)。在克隆容器时存在显著的性能开销。
虽然加锁可以防止迭代器抛出ConcurrentModificationException,但你必须要记住在所有对共享容器进行迭代的地方都需要加锁。实际情况要更加复杂,因为在某些情况下,迭代器会隐藏起来。
编译器将字符串的连接操作转换成调用StringBuilder.append(Object),而这个方法又会调用容器的toString方法,标准容器的toString方法将迭代容器,并在每个元素上调用toString来生成容器内容的格式化表示。
容器的hashCode和equals等方法也会间接地执行迭代操作,当容器作为另一个容器的元素或键值时,就会出现这种情况。同样,containsAll、removeAll和retainAll等方法,以及把容器作为参数的构造函数,都会对容器进行迭代。所有这些间接的迭代操作都可能抛出ConcurrentModificationException。
5.2 并发容器
在Java 5.0中增加了ConcurrentHashMap,用来替代同步且基于散列的Map,以及CopyOnWriteArrayList,用于在遍历操作为主要操作的情况下代替同步的List。在新的ConcurrentMap接口中增加了对一些常见复合操作的支持,例如“若没有则添加”、替换以及有条件删除等。
BlockingQueue扩展了Queue,增加了可阻塞的插入和获取等操作。如果队列为空,那么获取元素的操作将一直阻塞,直到队列中出现一个可用的元素。如果队列已满(对于有界队列来说),那么插入元素的操作将一直阻塞,直到队列中出现可用的空间。
ConcurrentHashMap并不是将每个方法都在同一个锁上同步并使得每次只能有一个线程访问容器,而是使用一种粒度更细的加锁机制来实现更大程度的共享,这种机制称为分段锁(Lock Striping)。
对于一些需要在整个Map上进行计算的方法,例如size和isEmpty,这些方法的语义被略微减弱了以反映容器的并发特性。由于size返回的结果在计算时可能已经过期了,它实际上只是一个估计值,因此允许size返回一个近似值而不是一个精确值。虽然这看上去有些令人不安,但事实上size和isEmpty这样的方法在并发环境下的用处很小,因为它们的返回值总是在不断变化。因此,这些操作的需求被弱化了,以换取对其他更重要操作的性能优化,包括get、put、containsKey和remove等。
由于ConcurrentHashMap不能被加锁来执行独占访问,因此我们无法使用客户端加锁来创建新的原子操作。但是,一些常见的复合操作,例如“若没有则添加”,“若相等则移除(Remove-If-Equal)”和“若相等则替换(Replace-If-Equal)”等,都已经实现为原子操作并且在ConcurrentMap的接口中声明。如果你需要在现有的同步Map中添加这样的功能,那么很可能就意味着应该考虑使用ConcurrentMap了。
“写入时复制(Copy-On-Write)”容器的线程安全性在于,只要正确地发布一个事实不可变的对象,那么在访问该对象时就不再需要进一步的同步。在每次修改时,都会创建并重新发布一个新的容器副本,从而实现可变性。
“写入时复制”容器返回的迭代器不会抛出ConcurrentModificationException,并且返回的元素与迭代器创建时的元素完全一致,而不必考虑之后修改操作带来的影响。
显然,每当修改容器时都会复制底层数组,这需要一定的开销,特别是当容器的规模较大时。仅当迭代操作远远多于修改操作时,才应该使用“写入时复制”容器。
5.3 阻塞队列和生产者-消费者模式
生产者-消费者模式能简化开发过程,因为它消除了生产者类和消费者类之间的代码依赖性,此外,该模式还将生产数据的过程与使用数据的过程解耦开来以简化工作负载的管理,因为这两个过程在处理数据的速率上有所不同。
一种最常见的生产者-消费者设计模式就是线程池与工作队列组合,在Executor任务执行框架中就体现了这种模式。
如果生产者不能尽快地产生工作项使消费者保持忙碌,那么消费者就只能一直等待,直到有工作可做。在某些情况下,这种方式是非常合适的(例如,在服务器应用程序中,没有任何客户请求服务),而在其他一些情况下,这也表示需要调整生产者线程数量和消费者线程数量之间的比率,从而实现更高的资源利用率(例如,在“网页爬虫[Web Crawler]”或其他应用程序中,有无穷的工作需要完成)。
正如其他有序的容器一样,PriorityBlockingQueue既可以根据元素的自然顺序来比较元素(如果它们实现了Comparable方法),也可以使用Comparator来比较。
因为SynchronousQueue没有存储功能,因此put和take会一直阻塞,直到有另一个线程已经准备好参与到交付过程中。仅当有足够多的消费者,并且总是有一个消费者准备好获取交付的工作时,才适合使用同步队列。
虽然这个示例使用了显式管理的线程,但许多生产者-消费者设计也可以通过Executor任务执行框架来实现,其本身也使用了生产者-消费者模式。
对于可变对象,生产者-消费者这种设计与阻塞队列一起,促进了串行线程封闭,从而将对象所有权从生产者交付给消费者。
在转移所有权后,也只有另一个线程能获得这个对象的访问权限,并且发布对象的线程不会再访问它。这种安全的发布确保了对象状态对于新的所有者来说是可见的,并且由于最初的所有者不会再访问它,因此对象将被封闭在新的线程中。新的所有者线程可以对该对象做任意修改,因为它具有独占的访问权。
只要对象池包含足够的内部同步来安全地发布池中的对象,并且只要客户代码本身不会发布池中的对象,或者在将对象返回给对象池后就不再使用它,那么就可以安全地在线程之间传递所有权。
正如阻塞队列适用于生产者-消费者模式,双端队列同样适用于另一种相关模式,即工作密取(Work Stealing)。在生产者-消费者设计中,所有消费者有一个共享的工作队列,而在工作密取设计中,每个消费者都有各自的双端队列。如果一个消费者完成了自己双端队列中的全部工作,那么它可以从其他消费者双端队列末尾秘密地获取工作。
在大多数时候,它们都只是访问自己的双端队列,从而极大地减少了竞争。当工作者线程需要访问另一个队列时,它会从队列的尾部而不是从头部获取工作,因此进一步降低了队列上的竞争程度。
工作密取非常适用于既是消费者也是生产者问题——当执行某个工作时可能导致出现更多的工作。例如,在网页爬虫程序中处理一个页面时,通常会发现有更多的页面需要处理。类似的还有许多搜索图的算法,例如在垃圾回收阶段对堆进行标记,都可以通过工作密取机制来实现高校并行。当一个工作线程找到新的任务单元时,它会将其放到自己队列的末尾(或者在工作共享设计模式中,放入其他工作者线程的队列中)。当双端队列为空时,它会在另一个线程的队列队尾查找新的任务,从而确保每个线程都保持忙碌状态。
5.4 阻塞方法与中断方法
传递InterruptedException。避开这个异常通常是最明智的策略——只需把InterruptedException传递给方法的调用者。传递InterruptedException的方法包括,根本不捕获该异常,或者捕获该异常,然后在执行某种简单的清理工作后再次抛出这个异常。
恢复中断。有时候不能抛出InterruptedException,例如当代码是Runnable的一部分时。在这些情况下,必须捕获InterruptedException,并通过调用当前线程上的interrupt方法恢复中断状态,这样在调用栈中更高层的代码将看到引发了一个中断。
当在代码中调用了一个将抛出InterruptedException异常的方法时,你自己的方法也就变成了一个阻塞方法,并且必须要处理对中断的响应。
然而在出现InterruptedException时不应该做的事情是,捕获它但不做出任何响应。这将使调用栈上更高层的代码无法对中断采取处理措施,因为线程被中断的证据已经丢失。
当某方法抛出InterruptedException时,表示该方法是一个阻塞方法,如果这个方法被中断,那么它将努力提前结束阻塞状态。
5.5 同步工具类
同步工具类可以是任何一个对象,只要它根据自身的状态来协调线程的控制流。阻塞队列可以作为同步工具类,其他类型的同步工具类还包括信号量(Semaphore)、栅栏(Barrier)以及闭锁(Latch)。
所有的同步工具类都包含一些特定的结构化属性:它们封装了一些状态,这些状态将决定执行同步工具类的线程是继续执行还是等待,此外还提供了一些方法对状态进行操作,以及另一些方法用于高校地等待同步工具类进入到预期状态。
5.5.1 闭锁
二元闭锁(包括两个状态)可以用来表示“资源R已经被初始化”,而所有需要R的操作都必须先在这个闭锁上等待。
确保某个服务在其依赖的所有其他服务都已经启动之后才启动。每个服务都有一个相关的二元闭锁。当启动服务S时,将首先在S依赖的其他服务的闭锁上等待,在所有依赖的服务都启动后会释放闭锁S,这样其他依赖S的服务才能继续执行。
闭锁状态包括一个计数器,该计数器被初始化为一个正数,表示需要等待的事件数量。countDown方法递减计数器,表示有一个事件已经发生了,而await方法等待计数器达到零,这表示所有需要等待的事件都已经发生。如果计数器的值非零,那么await会一直阻塞直到计数器为零,或者等待中的线程中断,或者等待超时。
TestHarness创建一定数量的线程,利用它们并发地执行指定的任务。它使用两个闭锁,分别表示“起始门(Starting Gate)”和“结束门(Ending Gate)”。起始门计数器的初始值为1,而结束门计数器的初始值为工作线程的数量。每个工作线程首先要做的事就是在启动门上等待,从而确保所有线程都就绪后才开始执行。而每个线程要做的最后一件事情是将调用结束门的countDown方法减1,这能使主线程高效地等待直到所有工作线程都执行完成,因此可以统计所消耗的时间。
public class TestHarness { public long timeTasks(int nThreads, final Runnable task) throws InterruptedException { final CountDownLatch startGate = new CountDownLatch(1); final CountDownLatch endGate = new CountDownLatch(nThreads); for (int i = 0; i < nThreads; i++) { Thread t = new Thread() { public void run() { try { startGate.await(); try { task.run(); } finally { endGate.countDown(); } } catch (InterruptedException ignored) {} } }; t.start(); } long start = System.nanoTime(); startGate.countDown(); endGate.await(); long end = System.nanoTime(); return end-start; } }
所有线程都会在await()这里阻塞。nThreads个线程都start了之后,全部在startGate.await()这里阻塞。直到主线程执行完循环体,接着执行startGate.countDown()的,这nThreads个线程才通过开始门,然后开始执行task。此时,主线程在endGate.await()这里阻塞。
5. 为什么要在TestHarness中使用闭锁,而不是在线程创建后就立即启动?或许,我们希望测试n个线程并发执行某个任务时需要的时间。如果在创建线程后立即启动它们,那么先启动的线程将“领先”后启动的线程,并且活跃线程数量会随着时间的推移而增加或减少,竞争程度也在不断发生变化。启动门将使得主线程能够同时释放所有的工作线程,而结束门则使主线程能够等待最后一个线程执行完成,而不是顺序地等待每个线程执行完成。
5.5.2 FutureTask
FutureTask在Executor框架中表示异步任务,此外还可以用来表示一些时间较长的计算,这些计算可以在使用计算结果之前启动。
public class Preloader { private final FutureTask<ProductInfo> future = new FutureTask<ProductInfo>(new Callable<ProductInfo>() { public ProductInfo call() throws DataLoadException { return loadProductInfo(); }}; private final Thread thread = new Thread(future); public void start() { thread.start(); } //通过提前调用start()启动FutureTask的Callable线程执行loadProductInfo()这个耗时的任务。不要等待需要ProductInfo时再调用该任务。 public ProductInfo get() throws DataLoadException, InterruptedException { try { return future.get(); } catch (ExecutionException e) { Throwable cause = e.getCause(); if (cause instanceof DataLoadException) throw (DataLoadException) cause; else throw launderThrowable(cause); } } }
Preloader创建了一个FutureTask,其中包含从数据库加载产品信息的任务,以及一个执行运算的线程。由于在构造函数或静态初始化方法中启动线程并不是一种好方法,因此提供了一个start方法来启动线程。当程序随后需要ProductInfo时,可以调用get方法,如果数据已经加载,那么将返回这些数据,否则将等待加载完后再返回。
5.5.3 信号量
在这种实现中不包含真正的许可对象,并且Semaphore也不会将许可与线程关联起来,因此在一个线程中获得的许可可以在另一个线程中释放。可以将acquire操作视为是消费一个许可,而release操作是创建一个许可,Semaphore并不受限于它在创建时的初始许可数量。
我们可以构造一个固定长度的资源池,当池为空时,请求资源将会失败,但你真正希望看到的行为是阻塞而不是失败,并且当池非空时解除阻塞。
底层的Set实现并不知道关于边界的任何信息,这是由BoundedHashSet来处理的。
5.5.4 栅栏
闭锁用于等待事件,而栅栏用于等待其他线程。
CyclicBarrier可以使一定数量的参与方反复地在栅栏位置汇集,它在并行迭代算法中非常有用:这种算法通常将一个问题拆分成一系列相互独立的子问题。当线程到达栅栏位置时将调用await方法,这个方法将阻塞直到所有线程都到达栅栏位置。
如果对await的调用超时,或者await阻塞的线程被中断,那么栅栏就被认为是打破了,所有阻塞的await调用都将终止并抛出BrokenBarrierException。如果成功地通过栅栏,那么await将为每个线程返回一个唯一的到达索引号,我们可以利用这些索引来“选举”产生一个领导线程,并在下一次迭代中由该领导线程执行一些特殊的工作。CyclicBarrier还可以使你将一个栅栏操作传递给构造函数,这是一个Runnable,当成功通过栅栏时会(在一个子任务线程中)执行它,但在阻塞线程被释放之前是不能执行的。
在把模拟过程并行化时,为每个元素(在这个示例中相当于一个细胞)分配一个独立的线程是不现实的,因为这将产生过多的线程,而在协调这些线程上导致的开销将降低计算性能。合理的做法是,将问题分解成一定数量的子问题,为每个子问题分配一个线程来进行求解,之后再将所有的结果合并起来。CellularAutomata将问题分解为N_cpu个子问题,其中N_cpu等于可用CPU的数量,并将每个子问题分配给一个线程。
当两方执行不对称的操作时,Exchanger会非常有用,例如当一个线程向缓冲区写入数据,而另一个线程从缓冲区中读取数据。这些线程可以使用Exchanger来汇合,并将满的缓冲区与空的缓冲区交换。
数据交换的时机取决于应用程序的响应需求。最简单的方案是,当缓冲区被填满时,由填充任务进行交换,当缓冲区为空时,由清空任务进行交换。这样会把需要交换的次数降至最低,但如果新数据的到达率不可预测,那么一些数据的处理过程就将延迟。另一个方法是,不仅当缓冲被填满时进行交换,并且当缓冲被填充到一定程度并保持一定时间后,也进行交换。
5.6 构建高效且可伸缩的结果缓存
Memorizer2的问题在于,如果某个线程启动了一个开销很大的计算,而其他线程并不知道这个计算正在进行,那么很可能会重复这个计算。我们希望通过某种方法来表达“线程X正在计算f(27)”这种情况,这样当另一个线程查找f(27)时,它能够知道最高效的方法是等待线程X计算结束,然后再去查询缓存“f(27)的结果是多少?”
Memorizer3将用于缓存值的Map重新定义为ConcurrentHashMap<A,Future<V>>,替换原来的ConcurrentHashMap<A,V>。Memorizer3首先检查某个相应的计算是否已经开始(Memorizer2与之相反,它首先判断某个计算是否已经完成)。如果还没有启动,那么就创建一个FutureTask,并注册到Map中,然后启动计算:如果已经启动,那么等待现有计算的结果。结果可能很快会得到,也可能还在运算过程中,但这对于Future.get的调用者来说是透明的。
它只有一个缺陷,即仍然存在两个线程计算出相同值的漏洞。这个漏洞的发生概率要远小于Memorizer2中发生的概率,但由于compute方法中的if代码块仍然是非原子(nonatomic)的“先检查再执行”操作,因此两个线程仍有可能在同一时间内调用compute来计算相同的值,即二者都没有在缓存中找到期望的值,因此都开始计算。
Memorizer3中存在这个问题的原因是,复合操作(“若没有则添加”)是在底层的Map对象上执行的,而这个对象无法通过加锁来确保原子性。Memorizer使用了ConcurrentMap中的原子方法putIfAbsent,避免了Memorizer3的漏洞。
public class Memorizer<A, V> implements Computable<A, V> { private final ConcurrentMap<A, Future<V>> cache = new ConcurrentHashMap<A, Future<V>>(); private final Computable<A, V> c; public Memorizer(Computable<A, V> c) { this.c = c; } public V compute (final A arg) throws InterruptedException { while (true) { Future<V> f = cache.get(arg); if (f == null) { Callable<V> eval = new Callable<V> () { public V call() throws InterrruptedException { return c.compute(arg); } }; FutureTask<V> ft = new FutureTask<V>(eval); f = cache.putIfAbsent(arg, ft); // Returns: the previous value associated with the specified key, or null if there was no mapping for the key if (f == null) { f = ft; ft.run(); } } try { return f.get(); } catch (CancellationException e) { cache.remove(arg, f); // 5. 当缓存的是Future而不是值时,将导致缓存污染(Cache Pollution)问题:如果某个计算被取消或者失败,那么在计算这个结果时将指明计算过程被取消或者失败。为了避免这种情况,如果Memorizer发现计算被取消,那么将把Future从缓存中移除。如果检测到RuntimeException,那么也会移除Future,这样将来的计算才可能成功。 } catch (ExecutionException e) { throw launderThrowable(e.getCause()); }}} }
在因式分解servlet中使用Memorizer来缓存结果
public class Factorizer implements Servlet { private final Computable<BigInteger, BigInteger[]> c = new Computable<BigInteger, BigInteger[]> () { public BigInteger[] compute(BigInteger arg) { return factor(arg); }}; private final Computable<BigInteger, BigInteger[]> cache = new Memorizer<BigInteger, BigInteger[]> (c); public void service(ServletRequest req, ServletResponse resp) { try { BigInteger i = extractFromRequest(req); encodeIntoResponse(resp, cache.compute(i)); } catch (InterruptedException e) { encodeError(resp, "factorization interrupted"); } } }
第6章 任务执行
6.1 在线程中执行任务
当围绕“任务执行”来设计应用程序结构时,第一步就是要找出清晰的任务边界。
而且,当负荷过载时,应用程序的性能应该是逐渐降低,而不是直接失败。
大多数服务器应用程序都提供了一种自然的任务边界选择方式:以独立的客户请求为边界。
任务处理过程从主线程分离出来,使得主循环能够更快地重新等待下一个到来的连接。
如果可运行的线程数量多于可用处理器的数量,那么有些线程将闲置。大量空闲的线程会占用许多内存,给垃圾回收器带来压力,而且大量线程在竞争CPU资源时还将产生其他的性能开销。如果你已经拥有足够多的线程使所有的CPU保持忙碌状态,那么再创建更多的线程反而会降低性能。
如果破坏了这些限制,那么很可能抛出OutOfMemoryError异常,要想从这种错误中恢复过来是非常危险的,更简单的办法是通过构造程序来避免超出这些限制。
6.2 Executor框架
它提供了一种标准的方法将任务的提交过程与执行过程解耦开来,并用Runnable来表示任务。Executor的实现还提供了对生命周期的支持,以及统计信息收集、应用程序管理机制和性能监视等机制。
Executor基于生产者-消费者模式,提交任务的操作相当于生产者(生产待完成的工作单元),执行任务的线程则相当于消费者(执行完这些工作单元)。
每当看到下面这种形式的代码时:
new Thread(runnable).start()
并且你希望获得一种更灵活的执行策略时,请考虑使用Executor来代替Thread。
newFixedThreadPool将创建一个固定长度的线程池,每当提交一个任务时就创建一个线程,直到达到线程池的最大数量,这时线程池的规模将不再变化(如果某个线程由于发生了未预期的Exception而结束,那么线程池会补充一个新的线程)。
newCachedThreadPool将创建一个可缓存的线程池,如果线程池的当前规模超过了处理需求时,那么将回收空闲的线程,而当需求增加时,则可以添加新的线程,线程池的规模不存在任何限制。
newSingleThreadExecutor是一个单线程的Executor,它创建单个工作者线程来执行任务,如果这个线程异常结束,会创建另一个线程来替代。newSingleThreadExecutor能确保依照任务在队列中的顺序来串行执行(例如FIFO、LIFO、优先级)。
newScheduledThreadPool创建了一个固定长度的线程池,而且以延迟或定时的方式来执行任务,类似于Timer。
但JVM只有在所有(非守护)线程全部终止后才会退出。因此,如果无法正确地关闭Executor,那么JVM将无法结束。
ExecutorService的生命周期有3种状态:运行、关闭和已终止。
shutdown方法将执行平缓的关闭过程:不再接受新的任务,同时等待已经提交的任务执行完成——包括那些还未开始执行的任务。shutdownNow方法将执行粗暴的关闭过程:它将尝试取消所有运行中的任务,并且不再启动队列中尚未开始执行的任务。
Timer的另一个问题是,如果TimerTask抛出一个未检查的异常,那么Timer将表现出糟糕的行为。Timer线程并不捕获异常,因此当TimerTask抛出未检查的异常时将终止定时线程。这种情况下,Timer也不会恢复线程的执行,而是会错误地认为整个Timer都被取消了。因此,已经被调度但尚未执行的TimerTask将不会再执行,新的任务也不能被调度。(这个问题称为“线程泄露”)
ScheduledThreadPoolExecutor能正确处理这些表现出错误行为的任务。在Java 5.0或更高的JDK中,将很少使用Timer。
如果要构建自己的调度服务,那么可以使用DelayQueue,它实现了BlockingQueue,并为ScheduledThreadPoolExecutor提供调度功能。DelayQueue管理着一组Delayed对象。每个Delayed对象都有一个相应的延迟时间:在DelayQueue中,只有某个元素逾期后,才能从DelayQueue中执行take操作。从DelayQueue中返回的对象将根据它们的延迟时间进行排序。
6.3 找出可利用的并行性
Runnable是一种有很大局限的抽象,虽然run能写入到日志文件或者将结果放入某个共享的数据结构,但它不能返回一个值或抛出一个受检查的异常。
对于这些任务,Callable是一种更好的抽象:它认为主入口点(即call)将返回一个值,并可能抛出一个异常。
Executor框架中,已提交但尚未开始的任务可以取消,但对于那些已经开始执行的任务,只有当它们能响应中断时,才能取消。取消一个已经完成的任务不会有任何影响。
在Future规范中包含的隐含意义是,任务的生命周期只能前进,不能后退,就像ExecutorService的生命周期一样。当某个任务完成后,它就永远停留在“完成”状态上。
get方法的行为取决于任务的状态(尚未开始、正在运行、已完成)。如果任务已经完成,那么get会立即返回或者抛出一个Exception,如果任务没有完成,那么get将阻塞并直到任务完成。如果任务抛出了异常,那么get将该异常封装为ExecutionException并重新抛出。如果任务被取消,那么get将抛出CancellationException。如果get抛出了ExecutionException,那么可以通过getCause来获得被封装的初始异常。
ExecutorService中的所有submit方法都将返回一个Future,从而将一个Runnable或Callable提交给Executor,并得到一个Future用来获得任务的执行结果或者取消任务。还可以显式地为某个指定的Runnable或Callable实例化一个FutureTask。(由于FutureTask实现了Runnable,因此可以将它提交给Executor来执行,或者直接调用它的run方法。)
在将Runnable或Callable提交到Executor的过程中,包含了一个安全发布过程,即将Runnable或Callable从提交线程发布到最终执行任务的线程。类似地,在设置Future结果的过程中也包含了一个安全发布,即将这个结果从计算它的线程发布到任何通过get获得它的线程。
6.3.3 示例:使用Future实现页面渲染器
为了使页面渲染器实现更高的并发性,首先将渲染过程分解为两个任务,一个是渲染所有的文本,另一个是下载所有的图像。(因为其中一个任务是CPU密集型,而另一个任务是I/O密集型,因此这种方法即使在单CPU系统上也能提升性能。)
Callable和Future有助于表示这些协同任务之间的交互。FutureRenderer中创建了一个Callable来下载所有的图像,并将其提交到一个ExecutorService。这将返回一个描述任务执行情况的Future。当主任务需要图像时,它会等待Future.get的调用结果。如果幸运的话,当开始请求时所有图像就已经下载完成了,即使没有,至少图像的下载任务也已经提前开始了。
Callable<List<ImageData>> task = new Callable<List<ImageData>>() { public List<ImageData> call() { List<ImageData> result = new ArrayList<ImageData>(); ... return result; }}; Future<List<ImageData>> future = executor.submit(task);
执行有返回值的Callable时用submit,它可以将结果包装成一个Future。执行无返回值的Runnable时,就用execute。
6.3.4 在异构任务并行化中存在的局限
两个人可以很好地分担洗碗的工作:其中一个人负责清洗,而另一个人负责烘干。然而,要将不同类型的任务平均分配给每个工人却并不容易。当人数增加时,如何确保他们能帮忙而不是妨碍其他人工作,或者在重新分配工作时,并不是容易的事情。如果没有在相似的任务之间找出细粒度的并行性,那么这种方法带来的好处将减少。
FutureRenderer使用了两个任务,其中一个负责渲染文本,另一个负责下载图像。如果渲染文本的速度远远高于下载图像的速度(可能性更大),那么程序的最终性能与串行执行时的性能差别不大,而代码却变得更复杂了。
只有当大量相互独立且同构的任务可以并发进行处理时,才能体现出将程序的工作负载分配到多个任务中带来的真正性能提升。
6.3.5 CompletionService: Executor与BlockingQueue
CompletionService将Executor和BlockingQueue的功能融合在一起。你可以将Callable任务提交给它来执行,然后使用类似于队列操作的take和poll等方法来获得已完成的结果,而这些结果会在完成时被封装为Future。
ExecutorCompletionService的实现非常简单。在构造函数中创建一个BlockingQueue来保存计算完成的结果。当计算完成时,调用FutureTask中的done方法。当提交某个任务时,该任务将首先包装为一个QueueingFuture,这是FutureTask的一个子类,然后再改写子类的done方法,并将结果放入BlockingQueue中。take和poll方法委托给了BlockingQueue,这些方法会在得到结果之前阻塞。
多个ExecutorCompletionService可以共享一个Executor,因此可以创建一个对于特定计算私有,又能共享一个公共Executor的ExecutorCompletionService。因此,CompletionService的作用就相当于一组计算的句柄,这与Future作为单个计算的句柄是非常类似的。
使用CompletionService,使页面元素在下载完成后立即显示出来
public class Renderer { private final ExecutorService executor; Renderer(ExecutorService executor) { this.executor = executor; } void renderPage(CharSequence source) { List<ImageInfo> info = scanForImageInfo(source); // java.util.concurrent // Interface CompletionService<V> // All Known Implementing Classes: ExecutorCompletionService CompletionService<ImageData> completionService = new ExecutorCompletionService<ImageData>(executor); for (final ImageInfo imageInfo : info) completionService.submit(new Callable<ImageData>() { public ImageData call() { return imageInfo.downloadImage(); } }); renderText(source); try { for( int i=0, n=info.size(); i<n; i++) { Future<ImageData> f = completionService.take(); ImageData imageData = f.get(); renderImage(imageData); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } catch (ExecutionException e) { throw launderThrowable(e.getCause()); } } }
6.3.7 为任务设置时限
在支持时间限制的Future.get中支持这种需求:当结果可用时,它将立即返回,如果在指定时限内没有计算出结果,那么将抛出TimeoutException。
此时可再次使用Future,如果一个限时的get方法抛出了TimeoutException,那么可以通过Future来取消任务。
try { long timeLeft = endNanos - System.nanoTime(); ad = f.get(timeLeft, NANOSECONDS); } catch (ExecutionException e) { ad = DEFAULT_AD; } catch (TimeoutException e) { ad = DEFAULT_AD; f.cancel(true); }
6.3.8 示例:旅行预订门户网站
创建n个任务,将其提交到一个线程池,保留n个Future,并使用限时的get方法通过Future串行地获取每一个结果,这一切都很简单,但还有一个更简单的方法——invokeAll。
InvokeAll方法的参数为一组任务,并返回一组Future。
invokeAll按照任务集合中迭代器的顺序将所有的Future添加到返回的集合中,从而使调用者能将各个Future与其表示的Callable关联起来。
当invokeAll返回后,每个任务要么正常地完成,要么被取消,而客户端代码可以调用get或isCancelled来判断究竟是何种情况。
第7章 取消与关闭
Java没有提供任何机制来安全地终止线程。但它提供了中断(Interruption),这是一种协作机制,能够使一个线程终止另一个线程的当前工作。
这种协作式的方法是必要的,我们很少希望某个任务、线程或服务立即停止,因为这种立即停止会使共享的数据结构处于不一致的状态。相反,在编写任务和服务时可以使用一种协作的方式:当需要停止时,它们首先会清除当前正在执行的工作,然后再结束。
一个在行为良好的软件与勉强运行的软件之间的最主要区别就是,行为良好的软件能很完善地处理失败、关闭和取消等过程。
7.1 任务取消
如果外部代码能在某个操作正常完成之前将其置入“完成”状态,那么这个操作就可以称为可取消的(Cancellable)。
应用程序事件。例如,应用程序对某个问题空间进行分解并搜索,从而使不同的任务可以搜索问题空间中的不同区域。当其中一个任务找到了解决方案时,所有其他仍在搜索的任务都将被取消。
错误。网页爬虫程序搜索相关的页面,并将页面或摘要数据保存到硬盘。当一个爬虫任务发生错误时(例如,磁盘空间已满),那么所有搜索任务都会取消,此时可能会记录它们的当前状态,以便稍后重新启动。
在Java中没有一种安全的抢占式方法来停止线程,因此也就没有安全的抢占式方法来停止任务。
cancel方法由finally块调用,从而确保即使在调用sleep时被中断也能取消素数生成器的执行。如果cancel没有被调用,那么搜索素数的线程将永远运行下去,不断消耗CPU的时钟周期,并使得JVM不能正常退出。
一个可取消的任务必须拥有取消策略(Cancellation Policy),在这个策略中将详细地定义取消操作的“How”、“When”以及“What”,即其他代码如何(How)请求取消该任务,任务在何时(When)检查是否已经请求了取消,以及在响应取消请求时应该执行哪些(What)操作。
7.1.1 中断
然而,如果使用这种方法的任务调用了一个阻塞方法,例如BlockingQueue.put,那么可能会产生一个更严重的问题——任务可能永远不会检查取消标志,因此永远不会结束。
当生产者在put方法中阻塞时,如果消费者希望取消生产者任务,那么将发生什么情况?它可以调用cancel方法来设置cancelled标志,但此时生产者却永远不能检查这个标志,因此它无法从阻塞的put方法中恢复过来(因为消费者此时已经停止从队列中取出素数,所以put方法将一直保持阻塞状态)。
在Java的API或语言规范中,并没有将中断与任何取消语义关联起来,但实际上,如果在取消之外的其他操作中使用中断,那么都是不合适的,并且很难支撑起更大的应用。
每个线程都有一个boolean类型的中断状态。
interrupt方法能中断目标线程,而isInterrupted方法能返回目标线程的中断状态。静态的interrupted方法将清除当前线程的中断状态,并返回它之前的值,这也是清除中断状态的唯一方法。
阻塞库方法,例如Thread.sleep和Object.wait等,都会检查线程何时中断,并且在发现中断时提前返回。它们在响应中断时执行的操作包括:清除中断状态,抛出InterruptedException,表示阻塞操作由于中断而提前结束。JVM并不能保证阻塞方法检测到中断的速度,但在实际情况中响应速度还是非常快的。
调用interrupt并不意味着立即停止目标线程正在进行的工作,而只是传递了请求中断的消息。
对中断操作的正确理解是:它并不会真正地中断一个正在运行的线程,而只是发出中断请求,然后由线程在下一个合适的时刻中断自己。有些方法,例如wait、sleep和join等,将严格地处理这种请求,当它们收到中断请求或者在开始执行时发现某个已被设置好的中断状态时,将抛出一个异常。
如果在调用interrupted时返回了true,那么除非你想屏蔽这个中断,否则必须对它进行处理——可以抛出InterruptedException,或者通过再次调用interrupt来恢复中断状态。
通常,中断是实现取消的最合理方式。
在每次迭代循环中,有两个位置可以检测出中断:在阻塞的put方法调用中,以及在循环开始处查询中断状态时。由于调用了阻塞的put方法,因此这里并不一定需要进行显式的检测,但执行检测却会使PrimeProducer对中断具有更高的响应性,因为它是在启动寻找素数任务之前检查中断的,而不是在任务完成之后。
7.1.2 中断策略
中断策略规定线程如何解释某个中断请求——当发现中断请求时,应该做哪些工作(如果需要的话),哪些工作单元对于中断来说是原子操作,以及以多快的速度来响应中断。
区分任务和线程对中断的反应是很重要的。一个中断请求可以有一个或多个接收者——中断线程池中的某个工作线程,同时意味着“取消当前任务”和“关闭工作者线程”。
任务不会在其自己拥有的线程中执行,而是在某个服务(例如线程池)拥有的线程中执行。对于非线程所有者的代码来说(例如,对于线程池而言,任何在线程池实现以外的代码),应该小心地保存中断状态,这样拥有线程的代码才能对中断做出响应,即使“非所有者”代码也可以做出响应。
这就是为什么大多数可阻塞的库函数都只是抛出InterruptedException作出中断响应。它们永远不会在某个由自己拥有的线程中运行,因此它们为任务或库代码实现了最合理的取消策略:尽快退出执行流程,并把中断信息传递给调用者,从而使调用栈中的上层代码可以采取进一步的操作。
线程应该只能由其所有者中断,所有者可以将线程的中断策略信息封装到某个合适的取消机制中,例如关闭(shutdown)方法。
批评者曾嘲笑Java的中断功能,因为它没有提供抢占式中断机制,而且还强迫开发人员必须处理InterruptedException。然后,通过推迟中断请求的处理,开发人员能制定更灵活的中断策略,从而使应用程序在响应性和健壮性之间实现合理的平衡。
7.1.3 响应中断
由于大多数代码并不知道它们将在哪个线程中运行,因此应该保存中断状态。
对于一些不支持取消但仍可以调用可中断阻塞方法的操作,它们必须在循环中调用这些方法,并在发现中断后重新尝试。在这种情况下,它们应该在本地保存中断状态,并在返回前恢复状态而不是在捕获InterruptedException时恢复状态。
// 不可取消的任务在退出前恢复中断 public Task getNextTask(BlockingQueue<Taskgt> queue) { boolean interrupted = false; try { while(true) { try { return queue.take(); } catch (InterruptedException e) { interrupted = true; // 重新尝试 } } } finally { if (interrupted) Thread.currentThread().interrupt(); } }
如果代码不会调用可中断的阻塞方法,那么仍然可以通过在任务代码中轮询当前线程的中断状态来响应中断。
7.1.4 示例:计时运行
如果任务在超时之前完成,那么中断timedRun所在线程的取消任务将在timedRun返回到调用者之后启动。我们不知道在这种情况下将运行什么代码,但结果一定是不好的。
7.1.5 通过Future来实现取消
// 通过Future来取消任务 public static void timedRun(Runnable r, long timeout, TimeUnit unit) throws InterruptedException { Future<?> task = taskExec.submit(r); try { task.get(timeout, unit); } catch (TimeoutException e) { // 接下来任务将被取消 } catch (ExecutionException e) { // 如果在任务中抛出了异常,那么重新抛出该异常 throw launderThrowable(e.getCause()); } finally { // 如果任务已经结束,那么执行取消操作也不会带来任何影响 task.cancel(true); // 如果任务正在运行,那么将被中断 } }
7.1.6 处理不可中断的阻塞
在Java库中,许多可阻塞的方法都是通过提前返回或者抛出InterruptedException来响应中断请求的,从而使开发人员更容易构建出能响应取消请求的任务。
Java.io包中的同步Socket I/O。在服务器应用程序中,最常见的阻塞I/O形式就是对套接字进行读取和写入。虽然InputStream和OutputStream中的read和write等方法都不会响应中断,但通过关闭底层的套接字,可以使得由于执行read或write等方法而被阻塞的线程抛出一个SocketException。
Java.io包中的同步I/O。当中断一个正在InterruptibleChannel上等待的线程时,将抛出ClosedByInterruptException并关闭链路(这还会使得其他在这条链路上阻塞的线程同样抛出ClosedByInterruptException)。当关闭一个InterruptibleChannel时,将导致所有在链路操作上阻塞的线程都抛出AsynchronousCloseException。大多数标准的Channel都实现了InterruptibleChannel。
Selector的异步I/O。如果一个线程在调用Selector.select方法(在java.nio.channels中)时阻塞了,那么调用close或wakeup方法会使线程抛出ClosedSelectorException并提前返回。
获取某个锁。如果一个线程由于等待某个内置锁而阻塞,那么将无法响应中断,因为线程认为它肯定会获得锁,所以将不会理会中断请求。但是,在Lock类中提供了lockInterruptibly方法,该方法允许在等待一个锁的同时仍能响应中断。
// 通过改写interrupt方法将非标准的取消操作封装在Thread中 public class ReaderThread extends Thread { private final Socket socket; private final InputStream in; public ReaderThread(Socket socket) throws IOException { this.socket = socket; this.in = socket.getInputStream(); } public void interrupt() { try { socket.close(); } catch (IOException ignored) { } finally { super.interrupt(); } } public void run() { try { byte[] buf = new byte[BUFSZ]; while(true) { int count = in.read(buf); if(count<0) break; else if(count>0) processBuffer(buf, count); } } catch (IOException e) { /* 允许线程退出 */ } } }
7.2 停止基于线程的服务
与其他封装对象一样,线程的所有权是不可传递的:应用程序可以拥有服务,服务也可以拥有工作者线程,但应用程序并不能拥有工作者线程,因此应用程序不能直接停止工作者线程。相反,服务应该提供生命周期方法(Lifecycle Method)来关闭它自己以及它所拥有的线程。
7.2.1 示例:日志服务
产生日志消息的线程并不会将消息直接写入输出流,而是由LogWriter通过BlockingQueue将消息提交给日志线程,并由日志线程写入。这是一种多生产者单消费者(Multiple-Producer, Single-Consumer)的设计方式:每个调用log的操作都相当于一个生产者,而后台的日志线程则相当于消费者。
当取消一个生产者-消费者操作时,需要同时取消生产者和消费者。
为LogWriter提供可靠关闭操作的方法是解决竞态条件问题,因而要使日志消息的提交操作成为原子操作。然而,我们不希望在消息加入队列时去持有一个锁,因为put方法本身就可以阻塞。我们采用的方法是:通过原子方式来检查关闭请求,并且有条件地递增一个计数器来“保持”提交信息的权利。
// 向LogWriter添加可靠的取消操作 public class LogService { private final BlockingQueue<String> queue; private final LoggerThread loggerThread; private final PrintWriter writer; @GuardedBy("this") private boolean isShutdown; @GuardedBy("this") private int reservations; public void start() { loggerThread.start(); } public void stop() { synchronized (this) { isShutdown = true; } loggerThread.interrupt(); } public void log(String msg) throws InterruptedException { synchronized(this) { if(isShutdown) throw new IllegalStateException(...); ++reservations; } queue.put(msg); } private class LoggerThread extends Thread { public void run() { try { while(true) { try { synchronized(LogService.this) { if (isShutdown && reservations == 0) break; } String msg = queue.take(); synchronized(LogService.this) { --reservations; } writer.println(msg); } catch (InterruptedException e) { /* retry */ } } } finally { writer.close(); } } } }
7.2.2 关闭ExecutorService
在进行强制关闭时,shutdownNow首先关闭当前正在执行的任务,然后返回所有尚未启动的任务清单。
LogService的一种变化形式,它将管理线程的工作委托给一个ExecutorService,而不是由其自行管理。通过封装ExecutorService,可以将所有权链(Ownership Chain)从应用程序扩展到服务以及线程,所有权链上的各个成员都将管理它所拥有的服务或线程的生命周期。
// 使用ExecutorService的日志服务 public class LogService { private final ExecutorService exec = newSingleThreadExecutor(); ... public void start() { } public void stop() throws InterruptedException { try { exec.shutdown(); exec.awaitTermination(TIMEOUT, UNIT); } finally { writer.close(); } } public void log(String msg) { try { exec.execute(new WriteTask(msg)); } catch(RejectedExecutionException ignored) { } } }
7.2.4 示例:只执行一次的服务
// 使用私有的Executor,并且该Executor的生命周期受限于方法调用 boolean checkMail(Set<String> hosts, long timeout, TimeUnit unit) throws InterruptedException { ExecutorService exec = Executors.newCachedThreadPool(); final AtomicBoolean hasNextMail = new AtomicBoolean(false); try { for (final String host : hosts) exec.execute(new Runnable() { public void run() { if (checkMail(host)) hasNextMail.set(true); } }); } finally { exec.shutdown(); exec.awaitTermination(timeout, unit); } return hasNewMail.get(); }
7.2.5 shutdownNow的局限性
当通过shutdownNow来强行关闭ExecutorService时,它会尝试取消正在执行的任务。并返回所有已提交但尚未开始的任务,从而将这些任务写入日志或者保存起来以便之后进行处理。shutdownNow返回的Runnable对象可能与提交给ExecutorService的Runnable对象并不相同:它们可能是被封装过的已提交任务。
通过封装ExecutorSerivce并使得execute(类似地还有submit)记录哪些任务是在关闭后取消的,TrackingExecutor可以找出哪些任务已经开始但还没有正常完成。
// 在ExecutorService中跟踪在关闭之后被取消的任务 public class TrackingExecutor extends AbstractExecutorService { private final ExecutorService exec; private final Set<Runnable> tasksCancelledAtShutdown = Collections.synchronizedSet(new HashSet<Runnable>()); ... public List<Runnable> getCancelledTasks() { if (!exec.isTerminated()) throw new IllegalStateException(...); return new ArrayList<Runnable>(tasksCancelledAtShutdown); } public void execute(final Runnable runnable) { exec.execute(new Runnable() { public void run() { try { runnable.run(); } finally { if (isShutdown() && Thread.currentThread().isInterrupted()) // 说明线程没有正常执行完 tasksCancelledAtShutdown.add(runnable); } } }); } // 将ExecutorService的其他方法委托给exec }
网页爬虫程序的工作通常是无穷尽的,因此当爬虫程序必须关闭时,我们通常希望保存它的状态,以便稍后重新启动。
// 使用TrackingExecutorService来保存未完成的任务以备后续执行 public abstract class WebCrawler { private volatile TrackingExecutor exec; @GuardedBy("this") private final Set<URL> urlsToCrawl = new HashSet<URL>(); ... private synchronized void start() { exec = new TrackingExecutor(Executors.newCachedThreadPool()); for(URL url : urlsToCrawl) submitCrawlTask(url); urlsToCrawl.clear(); } public synchronized void stop() throws InterruptedException { try { saveUncrawled(exec.shutdownNow()); // 保存已提交但还未开始的任务,shutdownNow的返回值 if (exec.awaitTermination(TIMEOUT, UNIT)) saveUncrawled(exec.getCancelledTasks()); // TrackingExecutor记录下的没执行完的runnable们 } finally { exec = null; } } protected abstract List<URL> processPage(URL url); // 迭代处理的意思 private void saveUncrawled(List<Runnable> uncrawled) { for(Runnable task : uncrawled) urlsToCrawl.add(((CrawlTask) task).getPage()); } private void submitCrawlTask(URL u) { exec.execute(new CrawlTask(u)); // TrackingExecutor在这里记录 } private class CrawlTask implements Runnable { private final URL url; ... public void run() { for(URL link : processPage(url)) { if(Thread.currentThread().isInterrupted()) return; submitCrawlTask(link); } } public URL getPage() { return url; } } }
在TrackingExecutor中存在一个不可避免的竞态条件,从而产生“误报”问题:一些被认为已取消的任务实际上已经执行完成。这个问题的原因在于,在任务执行最后一条指令以及线程池将任务记录为“结束”的两个时刻之间,线程池可能被关闭。如果任务是幂等的(Idempotent,即将任务执行两次与执行一次会得到相同的结果),那么这不会存在问题,在网页爬虫程序中就是这种情况。
7.3 处理非正常的线程终止
当一个线程由于未捕获异常而退出时,JVM会把这个事件报告给应用程序提供的UncaughtExceptionHandler异常处理器。如果没有提供任何异常处理器,那么默认的行为是将栈追踪信息输出到System.err。
// UncaughtExceptionHandler接口 public interface UncaughtExceptionHandler { void uncaughtException(Thread t, Throwable e); }
要为线程池中的所有线程设置一个UncaughtExceptionHandler,需要为ThreadPoolExecutor的构造函数提供一个ThreadFactory。(与所有的线程操控一样,只有线程的所有者能够改变线程的UncaughtExceptionHandler。)标准线程池允许当发生未捕获异常时结束线程,但由于使用了一个try-finally代码块来接收通知,因此当线程结束时,将有新的线程来代替它。
令人困惑的是,只有通过execute提交的任务,才能将它抛出的异常交给未捕获异常处理器,而通过submit提交的任务,无论是抛出的未检查异常还是已检查异常,都将被认为是任务返回状态的一部分。如果一个由submit提交的任务由于抛出了异常而结束,那么这个异常将被Funture.get封装在ExecutionException中重新抛出。
7.4 JVM关闭
7.4.1 关闭钩子
在正常关闭中,JVM首先调用所有已注册的关闭钩子(Shutdown Hook)。关闭钩子是指通过Runtime.addShutdownHook注册的但尚未开始的线程。
在关闭应用程序线程时,如果有(守护或非守护)线程仍然在运行,那么这些线程接下来将与关闭进程并发执行。当所有的关闭钩子都执行结束时,如果runFinalizersOnExit为true,那么JVM将运行终结器,然后再停止。JVM并不会停止或中断任何在关闭时仍然运行的应用程序线程。当JVM最终结束时,这些线程将被强行结束。
// 通过注册一个关闭钩子来停止日志服务 public void start() { Runtime.getRuntime().addShutdownHook(new Thread() { public void run() { try { LogService.this.stop(); } catch (InterruptedException ignored) {} } }); }
7.4.2 守护线程
有时候,你希望创建一个线程来执行一些辅助工作,但又不希望这个线程阻碍JVM的关闭。在这种情况下就需要使用守护线程(Daemon Thread)。
在JVM启动时创建的所有线程中,除了主线程以外,其他的线程都是守护线程(例如垃圾回收器以及其他执行辅助工作的线程)。当创建一个新线程时,新线程将继承创建它的线程的守护状态,因此默认情况下,主线程创建的所有线程都是普通线程。
普通线程与守护线程之间的差异仅在于当线程退出时发生的操作。当一个线程退出时,JVM会检查其他正在运行的线程,如果这些线程都是守护线程,那么JVM会正常退出操作。当JVM停止时,所有仍然存在的守护线程都将被抛弃——既不会执行finally代码块,也不会执行回卷栈,而JVM只是直接退出。
我们应尽可能少地使用守护线程——很少有操作能够在不进行清理的情况下被安全地抛弃。特别是,如果在守护线程中执行可能包含I/O操作的任务,那么将是一种危险的行为。守护线程最好用于执行“内部”任务,例如周期性地从内存的缓存中移出逾期的数据。
此外,守护线程通常不能用来替代应用程序管理程序中各个服务的生命周期。
7.4.3 终结器
在大多数情况下,通过使用finally代码块和显式的close方法,能够比使用终结器更好地管理资源。唯一的例外情况在于:当需要管理对象,并且该对象持有的资源是通过本地方法获得的。
避免使用终结器。
小结
通过使用FutureTask和Executor框架,可以帮助我们构建可取消的任务和服务。
第8章 线程池的使用
8.1 在任务与执行策略之间的隐性耦合
只有当线程本地值得生命周期受限于任务的生命周期时,在线程池的线程中使用ThreadLocal才有意义,而在线程池的线程中不应该使用ThreadLocal在任务之间传递值。
只有当任务都是同类型的并且相互独立时,线程池的性能才能达到最佳。如果将运行时间较长的与运行时间较短的任务混合在一起,那么除非线程池很大,否则将可能造成“拥塞”。如果提交的任务依赖于其他任务,那么除非线程池无限大,否则将可能造成死锁。
如果某些任务依赖于其他的任务,那么会要求线程池足够大,从而确保它们依赖任务不会被放入等待队列中或被拒绝,而采用线程封闭机制的任务需要串行执行。
// 在单线程Executor中任务发生死锁(不要这么做) public class ThreadDeadlock { ExecutorService exec = Executors.newSingleThreadExecutor(); public class RenderPageTask implements Callable<String> { public String call() throws Exception { Future<String> header, footer; header = exec.submit(new LoadFileTask("header.html")); footer = exec.submit(new LoadFileTask("footer.html")); String page = renderBody(); // 将发生死锁——由于任务在等待子任务的结果 return header.get() + page + footer.get(); } } } // 单线程Executor,现在在执行RenderPageTask,而它依赖的两个LoadFileTask却会一直在等待被Executor执行。
有一项技术可以缓解执行时间较长任务造成的影响,即限定任务等待资源的时间,而不要无限制地等待。在平台类库的大多数可阻塞方法中,都同时定义了限时版本和无限时版本,例如Thread.join、BlockingQueue.put、CountDownLatch.await以及Selector.select等。
8.2 设置线程池的大小
在代码中通常不会固定线程池的大小,而应该通过某种配置机制来提供,或者根据Runtime.availableProcessors来动态计算。
要想正确地设置线程池的大小,必须分析计算环境、资源预算和任务的特性。在部署的系统中有多少个CPU?多大的内存?任务是计算密集型、I/O密集型还是二者皆可?它们是否需要像JDBC连接这样的稀缺资源?如果需要执行不同类别的任务,并且它们之间的行为相差很大,那么应该考虑使用多个线程池,从而使每个线程池可以根据各自的工作负载来调整。
对于计算密集型的任务,在拥有NcpuNcpu个处理器的系统上,当线程池的大小为Ncpu+1Ncpu+1时,通常能实现最优的利用率。(即使当计算密集型的线程偶尔由于页缺失故障或者其他原因而暂停时,这个“额外”的线程也能确保CPU的时钟周期不会被浪费。)对于包含I/O操作或者其他阻塞操作的任务,由于线程并不会一直执行,因此线程池的规模应该更大。
Ncpu=number of CPUsNcpu=number of CPUs
Ucpu=target CPU utilization,0≤Ucpu≤1Ucpu=target CPU utilization,0≤Ucpu≤1
WC=ratio of wait time to compute timeWC=ratio of wait time to compute time
要使处理器达到期望的使用率,线程池的最优大小等于:
Nthreads=Ncpu∗Ucpu∗(1+WC)Nthreads=Ncpu∗Ucpu∗(1+WC)
可以通过Runtime来获得CPU的数目:
int N_CPUS = Runtime.getRuntime().availableProcessors();
1
8.3 配置ThreadPoolExecutor
// ThreadPoolExecutor的通用构造函数 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { ... }
8.3.1 线程的创建与销毁
基本大小也就是线程池的目标大小,即在没有任务执行时线程池的大小,并且只有在工作队列满了的情况下才会创建超出这个数量的线程。线程池的最大大小表示可同时活动的线程数量的上限。如果某个线程的空闲时间超过了存活时间,那么将被标记为可回收的,并且当线程池的当前大小超过了基本大小时,这个线程将被终止。
newFixedThreadPool工厂方法将线程池的基本大小和最大大小设置为参数中指定的值,而且创建的线程池不会超时。newCachedThreadPool工厂方法将线程池的最大大小设置为Integer.MAX_VALUE,而将基本大小设置为零,并将超时设置为1分钟,这种方法创建出来的线程池可以被无线扩展,并且当需求降低时会自动收缩。
8.3.2 管理队列任务
newFixedThreadPool和newSingleThreadExecutor在默认情况下将使用一个无界的LinkedBlockingQueue。
一种更稳妥的资源管理策略是使用有界队列,例如ArrayBlockingQueue、有界的LinkedBlockingQueue、PriorityBlockingQueue。有界队列有助于避免资源耗尽的情况发生,但它又带来了新的问题:当队列填满后,新的任务该怎么办?(有许多饱和策略[Saturation Policy]可以解决这个问题。)
对于非常大的或者无界的线程池,可以通过使用SynchronousQueue来避免任务排队,以及直接将任务从生产者移交给工作者线程。SynchronousQueue不是一个真正的队列,而是一种在线程之间进行移交的机制。要将一个元素放入SynchronousQueue中,必须有另一个线程正在等待接受这个元素。如果没有线程正在等待,并且线程池的当前大小小于最大值,那么ThreadPoolExecutor将创建一个新的线程,否则根据饱和策略,这个任务将被拒绝。
只有当线程池是无界的或者可以拒绝任务时,SynchronousQueue才有实际价值。在newCachedThreadPool工厂方法中就使用了SynchronousQueue。
对于Executor,newCachedThreadPool工厂方法是一种很好的默认选择,它能提供比固定大小的线程池更好的排队性能。(这种性能差异是由于使用了SynchronousQueue而不是LinkedBlockingQueue。)
只有当任务相互独立时,为线程池或工作队列设置界限才是合理的。如果任务之间存在依赖性,那么有界的线程池或队列就可能导致线程“饥饿”死锁问题。此时应该使用无界的线程池,例如newCachedThreadPool。
8.3.3 饱和策略
ThreadPoolExecutor的饱和策略可以通过调用setRejectedExecutionHandler来修改。(如果某个任务被提交到一个已被关闭的Executor时,也会用到饱和策略。)JDK提供了几种不同的RejectedExecutionHandler实现,每种实现都包含有不同的饱和策略:AbortPolicy、CallerRunsPolicy、DiscardPolicy和DiscardOldestPolicy。
“中止(Abort)”策略是默认的饱和策略,该策略将抛出未检查的RejectedExecutionException。调用者可以捕获这个异常,然后根据需求编写自己的处理代码。
“抛弃最旧的(Discard-Oldest)”策略则会抛弃下一个将被执行的任务,然后尝试重新提交新的任务。(如果工作队列是一个优先队列,那么“抛弃最旧的”策略将导致抛弃优先级最高的任务,因此最好不要将“抛弃最旧的”饱和策略和优先级队列放在一起使用。)
“调用者运行(Caller-Runs)”策略实现了一种调节机制,该策略既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者,从而降低新任务的流量。它不会在线程池的某个线程中执行新提交的任务,而是在一个调用了execute的线程中执行该任务。我们可以将WebServer示例修改为使用有界队列和“调用者运行”饱和策略,当线程池中的所有线程都被占用,并且工作队列被填满后,下一个任务会在调用execute时在主线程中执行。由于执行任务需要一定的时间,因此主线程至少在一段时间内不能提交任何任务,从而使得工作者线程有时间来处理完正在执行的任务。在这期间,主线程不会调用accept,因此到达的请求将被保存在TCP层的队列中而不是在应用程序的队列中。如果持续过载,那么TCP层将最终发现它的请求队列被填满,因此同样会开始抛弃请求。当服务器过载时,这种过载情况会逐渐向外蔓延开来——从线程池到工作队列到应用程序再到TCP层,最终达到客户端,导致服务器在高负载下实现一种平缓的性能降低。
// 创建一个固定大小的线程池,并采用有界队列以及“调用者运行”饱和策略 ThreadPoolExecutor executor = new ThreadPoolExecutor(N_THREADS, N_THREADS, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(CAPACITY)); executor.setRejectedExecutionHandler( new ThreadPoolExecutor.CallerRunsPolicy());
8.3.4 线程工厂
每当线程池需要创建一个线程时,都是通过线程工厂方法来完成的。
// ThreadFactory接口 public interface ThreadFactory { Thread newThread(Runnable r); }
// 自定义的线程工厂 public class MyThreadFactory implements ThreadFactory { private final String poolName; public MyThreadFactory(String poolName) { this.poolName = poolName; } public Thread newThread(Runnable runnable) { return new MyAppThread(runnable, poolName); } }
在MyAppThread中还可以定制其他行为,包括:为线程指定名字,设置自定义UncaughtExceptionHandler向Logger中写入信息,维护一些统计信息(包括有多少个线程被创建和销毁),以及在线程被创建或者终止时把调试消息写入日志。
如果在应用程序中需要利用安全策略来控制对某些特殊代码库的访问权限,那么可以通过Executor中的privilegedThreadFactory工厂来定制自己的线程工厂。通过这种方式创建出来的线程,将与创建privilegedThreadFactory的线程拥有相同的访问权限、AccessControlContext和contextClassLoader。如果不使用privilegedThreadFactory,线程池创建的线程将从在需要新线程时调用execute或submit的客户程序中继承访问权限,从而导致令人困惑的安全性异常。
8.3.5 在调用构造函数后再定制ThreadPoolExecutor
// 对通过标准工厂方法创建的Executor进行修改 ExecutorService exec = Executors.newCachedThreadPool(); if (exec instanceof ThreadPoolExecutor) ((ThreadPoolExecutor) exec).setCorePoolSize(10); else throw new AssertionError("Oops, bad assumption");
在Executors中包含一个unconfigurableExecutorService工厂方法,该方法对一个现有的ExecutorService进行包装,使其只暴露出ExecutorService的方法,因此不能对它进行配置。newSingleThreadExecutor返回按这种方式封装的ExecutorService,而不是最初的ThreadPoolExecutor。
啦啦啦,让我们重温一下包装器模式:unconfigurableExecutorService
首先,它实际上返回的是DelegatedExecutorService,这才是真正的包装器类
public static ExecutorService unconfigurableExecutorService(ExecutorService executor) { if (executor == null) throw new NullPointerException(); return new DelegatedExecutorService(executor); }
接着我们就来看看漂亮的DelegatedExecutorService包装器吧!
static class DelegatedExecutorService extends AbstractExecutorService { private final ExecutorService e; // 被包装的实例啦 DelegatedExecutorService(ExecutorService executor) { e = executor; } public void execute(Runnable command) { e.execute(command); } public void shutdown() { e.shutdown(); } public List<Runnable> shutdownNow() { return e.shutdownNow(); } public boolean isShutdown() { return e.isShutdown(); } public boolean isTerminated() { return e.isTerminated(); } public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { return e.awaitTermination(timeout, unit); } public Future<?> submit(Runnable task) { return e.submit(task); } public <T> Future<T> submit(Callable<T> task) { return e.submit(task); } public <T> Future<T> submit(Runnable task, T result) { return e.submit(task, result); } public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException { return e.invokeAll(tasks); } public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException { return e.invokeAll(tasks); } public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException { return e.invokeAny(tasks); } public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { return e.invokeAny(tasks, timeout, unit); } }
8.4 扩展ThreadPoolExecutor
ThreadPoolExecutor是可扩展的,它提供了几个可以子类化中改写的方法:beforeExecute、afterExecute和terminated,这些方法可以用于扩展ThreadPoolExecutor的行为。
无论任务是从run中正常返回,还是抛出一个异常而返回,afterExecute都会被调用。(如果任务在完成后带有一个Error,那么就不会调用afterExecute。)如果beforeExecute抛出一个RuntimeException,那么任务将不被执行,并且afterExecute也不会被调用。
在线程池完成关闭操作时调用terminated,也就是在所有任务都已经完成并且所有工作者线程也已经关闭后。
// 增加了日志和计时等功能的线程池 public class TimingThreadPool extends ThreadPoolExecutor { private final ThreadLocal<Long> startTime = new ThreadLocal<Long>(); private final Logger log = Logger.getLogger("TimingThreadPool"); private final AtomicLong numTasks = new AtomicLong(); private final AtomicLong totalTime = new AtomicLong(); protected void beforeExecute(Thread t, Runnable r) { super.beforeExecute(t, r); log.fine(String.format("Thread %s: start %s", t, r)); startTime.set(System.nanoTime()); } protected void afterExecute(Runnable r, Throwable t) { try { long endTime = System.nanoTime(); long taskTime = endTime - startTime.get(); numTasks.incrementAndGet(); totalTime.addAndGet(taskTime); log.fine(String.format("Thread %s, end %s, time=%dns", t, r, taskTime)); } finally { super.afterExecute(r,t); } } protected void terminated() { try { log.info(String.format("Terminated: avg time=%dns", totalTime.get() / numTasks.get())); } finally { super.terminated(); } } }
8.5 递归算法的并行化
// 将串行执行转换为并行执行 void processSequentially(List<Element> elements) { for (Element e : elements) process(e); } void processInParallel(Executor exec, List<Element> elements) { for (final Element e : elements) exec.execute(new Runnable() { public void run() { process(e); } }); }
如果需要提交一个任务集并等待它们完成,那么可以使用ExecutorService.invokeAll,并且在所有任务都执行完成后调用CompletionService来获取结果。
一种简单的情况是:在每个迭代操作中都不需要来自于后续递归迭代的结果。例如,sequentialRecursive用深度优先算法遍历一棵树,在每个节点上执行计算并将结果放入一个集合。
// 将串行递归转换为并行递归 public <T> void sequentialRecursive(List<Node<T>> nodes, Collection<T> results) { for (Node<T> n : nodes) { results.add(n.compute()); sequentialRecursive(n.getChildren(), results); } } public <T> void parallelRecursive(final Executor exec, List<Node<T>> nodes, final Collection<T> results) { for (final Node<T> n : nodes) { exec.execute(new Runnable() { public void run() { results.add(n.compute()); } }); parallelRecursive(exec, n.getChildren(), results); } }
遍历过程仍然是串行的,只有compute调用才是并行执行的。
// 等待通过并行方式计算的结果 public <T> Collection<T> getParallelResults(List<Node<T>> nodes) throws InterruptedException { ExecutorService exec = Executors.newCachedThreadPool(); Queue<T> resultQueue = new ConcurrentLinkedQueue<T>(); parallelRecursive(exec, nodes, resultQueue); exec.shutdown(); exec.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS); return resultQueue; }
示例:谜题框架
我们将“谜题”定义为:包含了一个初始位置,一个目标位置,以及用于判断是否有效移动的规则集。规则集包含两部分:计算从指定位置开始的所有合法移动,以及每次移动的结果位置。
// 表示“搬箱子”之类谜题的抽象类 public interface Puzzle<P, M> { P initialPosition(); boolean isGoal(P position); Set<M> legalMoves(P position); P move(P position, M move); }
// 用于谜题解决框架的链表节点 @Immutable static class Node<P, M> { final P pos; final M move; final Node<P, M> prev; Node(P pos, M move, Node<P,M> prev) { ... } List<M> asMoveList() { List<M> solution = new LinkedList<M>(); for (Node<P,M> n = this; n.move!=null; n=n.prev) solution.add(0, n.move); // 添加到链表头 return solution; } }
// 串行的谜题解答器 public class SequentialPuzzleSolver<P,M> { private final Puzzle<P,M> puzzle; private final Set<P> seen = new HashSet<P>(); // 避免重复位置 public SequentialPuzzleSolver(Puzzle<P,M> puzzle) { this.puzzle = puzzle; } public List<M> solve() { P pos = puzzle.initialPosition(); return search(new Node<P,M>(pos, null, null)); } private List<M> search(Node<P,M> node) { if (!seen.contains(node.pos)) { seen.add(node.pos); if(puzzle.isGoal(node.pos)) return node.asMoveList(); for(M move : puzzle.legalMoves(node.pos)) { P pos = puzzle.move(node.pos, move); Node<P,M> child = new Node<P,M>(pos, move, node); List<M> result = search(child); // 深搜 if(result!=null) return result; } } return null; } static class Node<P,M> { ... } }
通过修改解决方案以利用并发性,可以以并行方式来计算下一步移动以及目标条件,因为计算某次移动的过程在很大程度上与计算其他移动的过程是相互独立的。(之所以说“在很大程度上”,是因为在各个任务之间会共享一些可变状态,例如已遍历位置的集合。)
// 并发的谜题解答器 public class ConcurrentPuzzleSolver<P,M> { private final Puzzle<P,M> puzzle; private final ExecutorService exec; private final ConcurrentMap<P,Boolean> seen; final ValueLatch<Node<P,M>> solution = new ValueLatch<Node<P,M>>(); ... public List<M> solve() throws InterruptedException { try { P p = puzzle.initialPosition(); exec.execute(newTask(p, null, null)); // 阻塞直到找到解答 Node<P,M> solnNode = solution.getValue(); return (solnNode==null) ? null : solnNode.asMoveList(); } finally { exec.shutdown(); } } protected Runnable newTask(P p, M m, Node<P,M> n) { return new SolverTask(p, m, n); } class SolverTask extends Node<P,M> implements Runnable { ... public void run() { // putIfAbsent returns the previous value associated with the specified key, or null if there was no mapping for the key if (solution.isSet() || seen.putIfAbsent(pos, true)!=null) return; // 已经找到了解答或者已经遍历了这个位置 if (puzzle.isGoal(pos)) solution.setValue(this); else for (M m : puzzle.legalMoves(pos)) exec.execute( newTask(puzzle.move(pos,m), m, this)); } } }
为了避免无限循环,在串行版本中引入了一个Set对象,其中保存了之前已经搜索过的所有位置。在ConcurrentPuzzleSolver中使用ConcurrentHashMap来实现相同的功能。这种做法不仅提供了线程安全性,还避免了在更新共享集合时存在的竞态条件,因为putIfAbsent只有在之前没有遍历过的某个位置才会通过原子方式添加到集合中。ConcurrentPuzzleSolver使用线程池的内部工作队列而不是调用栈来保存搜索的状态。
串行版本的程序执行深度优先搜索,因此搜索过程将受限于栈的大小。并发版本的程序执行广度优先搜索,因此不会受到栈大小的限制(但如果待搜索的或者已搜索的位置集合大小超过了可用的内存总量,那么仍可能耗尽内存)。
为了在找到某个解答后停止搜索,需要通过某种方式来检查是否有线程已经找到了一个解答。如果需要第一个找到的解答,那么还需要在其他任务都没有找到解答时更新解答。这些需求描述的是一种闭锁(Latch)机制,具体地说,是一种包含结果的闭锁。
ValueLatch中使用CountDownLatch来实现所需的闭锁行为,并且使用锁定机制来确保解答只会被设置一次。
// 由ConcurrentPuzzleSolver使用的携带结果的闭锁 @ThreadSafe public class ValueLatch<T> { @GuardedBy("this") private T value = null; // 携带的结果 private final CountDownLatch done = new CountDownLatch(1); public boolean isSet() { return (done.getCount()==0); } public synchronized void setValue(T newValue) { // synchronized if(!isSet()) { value = newValue; done.countDown(); } } public T getValue() throws InterruptedException { done.await(); synchronized(this) { // synchronized return value; } } }
第一个找到解答的线程还会关闭Executor,从而阻止接受新的任务。要避免处理RejectedExecutionException,需要将拒绝执行处理器设置为“抛弃已提交的任务”。然后,所有未完成的任务最终将执行完成,并且在执行任何新任务时都会失败,从而使Executor结束。
如果不存在解答,那么ConcurrentPuzzleSolver就不能很好地处理这种情况:如果已遍历了所有的移动和位置都没有找到解答,那么在getSolution调用中将永远等待下去。当遍历了整个搜索空间时,串行版本的程序将结束,但要结束并发程序会更困难。其中一种方法是:记录活动任务的数量,当该值为零时将解答设置为null。
第9章 图形用户界面应用程序
9.1 为什么GUI是单线程的
单线程的GUI框架并不仅限于在Java中,在Qt、NexiStep、MacOS Cocoa、X Windows以及其他环境中的GUI框架都是单线程的。许多人曾经尝试过编写多线程的GUI框架,但最终都由于竞态条件和死锁导致的稳定性问题而又重新回到单线程的事件队列模型:采用一个专门的线程从队列中抽取事件,并将它们转发到应用程序定义的事件处理器。
在多线程的GUI框架中更容易发生死锁问题,其部分原因在于,在输入事件的处理过程与GUI组件的面向对象模型之间会存在错误的交互。用户引发的动作将通过一种类似于“气泡上升”的方式从操作系统传递给应用程序——操作系统首先检测到一次鼠标点击,然后通过工具包将其转化为“鼠标点击”事件,该事件最终被转换为一个更高层事件(例如“鼠标键被按下”事件)转发给应用程序的监听器。另一方面,应用程序引发的动作又会以“气泡下沉”的方式从应用程序返回到操作系统。例如,在应用程序中引发修改某个组件背景色的请求,该请求将转发给某个特定的组件类,并最终转发给操作系统进行绘制。因此,一方面这组操作将以完全相反的顺序来访问相同的GUI对象;另一方面又要确保每个对象都是线程安全的,从而导致不一致的锁定顺序,并引发死锁。这种问题几乎在每次开发GUI工具包时都会重现。
另一个在多线程GUI框架中导致死锁的原因就是“模型-视图-控制(MVC)”这种设计模式的广泛使用。
“控制”模块将调用“模型”模块,而“模型”模块将发生的变化通知给“视图”模块。“控制”模块同样可以调用“视图”模块,并调用“模型”模块来查询模型的状态。这将再次导致不一致的锁定顺序并出现死锁。
单线程的GUI框架通过线程封闭机制来实现线程安全性。所有GUI对象,包括可视化组件和数据模型等,都只能在事件线程中访问。
9.1.1 串行事件处理
因此,在事件线程中执行的任务必须尽快地把控制权交还给事件线程。要启动一些执行时间较长的任务,例如对某个大型文档执行拼写检查,在文件系统中执行搜索,或者通过网络获取资源等,必须在另一个线程中执行这些任务,从而尽快地将控制权交还给事件线程。如果要在执行某个时间较长的任务时更新进度标识,或者在任务完成后提供一个可视化的反馈,那么需要再次执行事件线程中的代码。这也很快会使程序变得更复杂。
9.1.2 Swing中的线程封闭机制
所有Swing组件(例如JButton和JTable)和数据模型对象(例如TableModel和TreeModel)都被封闭在事件线程中,因此任何访问它们的代码都必须在事件线程中运行。
这种方法的好处在于,当访问表现对象(Presentation Object)时在事件线程中运行的任务无须担心同步问题,而坏处在于,无法从事件线程之外的线程中访问表现对象。
Swing的单线程规则是:Swing中的组件以及模型只能在这个事件分发线程中进行创建、修改以及查询。
单线程规则的其他一些例外情况包括:
SwingUtilities.isEventDispatchThread,用于判断当前线程是否是事件线程。
SwingUtilities.invokeLater,该方法可以将一个Runnable任务调度到事件线程中执行(可以从任意线程中调用。
SwingUtilities.invokeAndWait,该方法可以将一个Runnable任务调度到事件线程中执行,并阻塞当前线程直到任务完成(只能从非GUI线程中调用)。
所有将重绘(Repaint)请求或重生效(Revalidation)请求插入队列的方法(可从任意线程中调用)。
所有添加或移除监听器的方法(这些方法可以从任意线程中调用,但监听器本身一定要在事件线程中调用)。invokeLater和invokeAndWait两个方法的作用酷似Executor。
// 使用Executor来实现SwingUtilities public class SwingUtilities { private static final ExecutorService exec = Executors.newSingleThreadExecutor(new SwingThreadFactory()); private static volatile Thread swingThread; private static class SwingThreadFactory implements ThreadFactory { public Thread newThread(Runnable r) { swingThread = new Thread(r); return swingThread; } } public static boolean isEventDispatchThread() { return Thread.currentThread() == swingThread; } public static void invokeLater(Runnable task) { exec.execute(task); } public static void invokeAndWait(Runnable task) throws InterruptedException, InvocationTargetException { Future f = exec.submit(task); try { f.get(); } catch (ExecutionException e) { throw new InvocationTargetException(e); } } }
GuiExecutor是一个Executor,它将任务委托给SwingUtilities来执行。也可以采用其他的GUI框架来实现它,例如SWT提供的Display.asyncExec方法,它类似于Swing中的invokeLater。
// 基于SwingUtilities构建的Executor public class GuiExecutor extends AbstractExecutorService { // 采用“单件(Singleton)”模式,有一个私有构造函数和一个公有的工厂方法 private static final GuiExecutor instance = new GuiExecutor(); private GuiExecutor() { } public static GuiExecutor instance() { return instance; } public void execute(Runnable r) { if (SwingUtilities.isEventDispatchThread()) r.run(); else SwingUtilities.invokeLater(r); } // 其他生命周期方法的实现 }
9.2 短时间的GUI任务
按钮点击时的执行控制流:EDT->鼠标点击->动作事件->动作监听者->设置颜色
Swing将大多数可视化组件都分为两个对象,即模型对象与视图对象。在模型对象中保存的是将被显示的数据,而在视图对象中则保存了控制显示方式的规则。模型对象可以通过引发事件来表示模型数据发生了变化,而视图对象则通过“订阅”来接收这些事件。当视图对象收到表示模型数据已经变化的事件时,将向模型对象查询新的数据,并更新界面显示。
模型对象与视图对象的控制流:EDT->鼠标点击->动作事件->动作监听者->更新表格模型->表格修改事件->表格监听器->更新表格视图
9.3 长时间的GUI任务
这个示例通过“Fire and Forget”方式将长时间任务从事件线程中分离出来,这种方式可能并不是非常有用。在执行完一个长时间的任务后,通常会产生某种可视化的反馈。但你并不能从后台线程中访问这些表现对象,因此任务在完成时必须向事件线程提交另一个任务来更新用户界面。
// 将一个长时间任务绑定到一个可视化组件 ExecutorService backgroundExec = Executors.newCachedThreadPool(); ... button.addActionListener(new ActionListener() { public void actionPerformed(ActionEvent e) { backgroundExec.execute(new Runnable() { public void run() { doBigComputation(); } }); } });
动作监听器首先使按钮无效,并设置一个标签表示正在进行某个计算,然后将一个任务提交给后台的Executor。当任务完成时,它会在事件线程中增加另一个任务,该任务将重新激活按钮并恢复标签文本。
// 支持用户反馈的长时间任务 button.addActionListener(new ActionListener() { public void actionPerformed(ActionEvent e) { button.setEnabled(false); label.setText("busy"); backgroundExec.execute(new Runnable() { public void run() { try { doBigComputation(); } finally { GuiExecutor.instance().execute(new Runnable() { public void run() { button.setEnabled(true); label.setText("idle"); } }); } } }); } });
在GUI应用程序中,这种“线程接力”是处理长时间任务的典型方法。
9.3.1 取消你可以直接通过线程中断来实现取消操作,但是一种更简单的办法是使用Future,专门用来管理可取消的任务。
如果调用Future的cancel方法,并将参数mayInterruptIfRunning设置为true,那么这个Future可以中断正在执行任务的线程。如果你编写的任务能够响应中断,那么当它被取消时就可以提前返回。
// 取消一个长时间任务 Future<?> runningTask = null; // 线程封闭 ... startButton.addActionListener(new ActionListener() { public void actionPerformed(ActionEvent e) { if(runningTask!=null) { runningTask = backgroundExec.submit(new Runnable() { public void run() { while(moreWork()) { if(Thread.currentThread().isInterrupted()) { cleanUpPartialWork(); break; } doSomeWork(); } } }); } } }); cancelButton.addActionListener(new ActionListener() { public void actionPerformed(ActionEvent event) { if(runningTask!=null) runningTask.cancel(true); } }); // 可以处理到这个事件,因为上面的任务是在另一个线程中执行,控制权早就已返还给事件线程了
由于runningTask被封闭在事件线程中,因此在对它进行设置或检查时不需要同步,并且“开始”按钮的监听器可以确保每次只有一个后台任务在运行。然而,当任务完成时最好能通知按钮监听器,例如说可以禁用“取消”按钮。
9.3.2 进度标识和完成标识
通过Future来表示一个长时间的任务,可以极大地简化取消操作的实现。在FutureTask中也有一个done方法同样有助于实现完成通知。当后台的Callable完成后,将调用done。通过done方法在事件线程中触发一个完成任务,我们能够构造一个BackgroundTask类,这个类将提供一个在事件线程中调用的onCompletion方法。
// 支持取消,完成通知以及进度通知的后台任务类 abstract class BackgroundTask<V> implements Runnable, Future<V> { private final FutureTask<V> computation = new Computation(); private class Computation extends FutureTask<V> { public Computation() { super(new Callable<V>() { public V call() throws Exception { return BackgroundTask.this.compute(); } }); } protected final void done() { GuiExecutor.instance().execute(new Runnable() { public void run() { V value = null; Throwable thrown = null; boolean cancelled = false; try { value = get(); } catch (ExecutionException e) { thrown = e.getCause(); } catch (CancellationException e) { cancelled = true; } catch (InterruptedException consumed) { } finally { onCompletion(value, thrown, cancelled); // 任务完成的通知 } } }); } } protected void setProgress(final int current, final int max) { GuiExecutor.instance().execute(new Runnable() { public void run() { onProgress(current, max); } }); } // 在后台线程中被取消 protected abstract V compute() throws Exception; // 在事件线程中被取消 protected void onCompletion(V result, Throwable exception, boolean cancelled) { } protected void onProgress(int current, int max) { } // Future的其他方法 }
compute方法可以调用setProgress方法以数字形式来指示进度。因而在事件线程中调用onProgress,从而更新用户界面以显示可视化的进度信息。
要想实现BackgroundTask,你只需要实现compute,该方法将在后台线程中调用。也可以改写onCompletion和onProgress,这两个方法也会在事件线程中调用。
// 通过BackgroundTask来执行长时间的并且可取消的任务 startButton.addActionListener(new ActionListener() { public void actionPerformed(ActionEvent e) { class CancelListener implements ActionListener { BackgroundTask<?> task; public void actionPerformed(ActionEvent event) { if(task!=null) task.cancel(true); } } final CancelListener listener = new CancelListener(); listener.task = new BackgroundTask<Void>() { // V = Void public Void compute() { while(moreWork() && !isCancelled()) doSomeWork(); return null; } public void onCompletion(boolean cancelled, String s, Throwable exception) { cancelButton.removeActionListener(listener); // 表示任务完成,再点击cancel按钮就无效了 label.setText("done"); } }; cancelButton.addActionListener(listener); // 这个操作放在startButton事件处理器里,表示在点击start按钮之前,点击cancel按钮是无效的 backgroundExec.execute(listener.task); } });
9.4 共享数据模型
最简单的情况是,数据模型中的数据由用户来输入或者由应用程序来启动时静态地从文件或其他数据源加载。在这种情况下,除了事件线程之外的任何线程都不可能访问到数据。但在某些情况下,表现模型对象只是一个数据源(例如数据库、文件系统或远程服务等)的视图对象。这时,当数据在应用程序中进出时,有多个线程都可以访问这些数据。
正确的做法是,当树节点被展开时才读取相应的内容。即使只枚举远程卷上的单个目录也可能花费很长的时间,因此你可以考虑在后台任务中执行枚举操作。当后台任务完成后,必须通过某种方式将数据填充到树形模型中。可以使用线程安全的树形模型来实现这个功能:通过invokeLater提交一个任务,将数据从后台任务中“推入”事件线程,或者让事件线程通过轮询来查看是否有数据可用。
9.4.1 线程安全的数据模型
如果数据模型支持细粒度的并发,那么事件线程和后台线程就能共享该数据模型,而不会发生响应性问题。例如,第5章的DelegatingVehicleTracker在底层使用了一个ConcurrentHashMap来提供高度并发的读写操作。这种方法的缺点在于,ConcurrentHashMap无法提供一致的数据快照,而这可能是需求的一部分。线程安全的数据模型必须在更新模板时产生事件,这样视图才能在数据发生变化后进行更新。
然而,只有在遍历操作远远多于修改操作时,“写时拷贝”容器才能提供更好的性能,例如在车辆追踪应用程序中就不适合采用这种方法。一些特定的数据结构或许可以避免这种限制,但要构建一个既能提供高效的并发访问又能在旧数据无效后不再维护它们的数据结构却并不容易,因此只有其他方法都行不通后才应该考虑使用它。
9.4.2 分解数据模型
在分解模型设计中,表现模型被封闭在事件线程中,而其他模型,即共享模型,是线程安全的,因此既可以由事件线程方法,也可以由应用程序线程访问。表现模型会注册共享模型的监听器,从而在更新时得到通知。然后,表示模型可以在共享模型中得到更新:通过将相关状态的快照嵌入到更新消息中,或者由表现模型在收到更新事件时直接从共享模型中获取数据。
如果数据模型很大,或者更新频率极高,在分解模型包含的信息中有一方或双方对另一方不可见,那么更高效的方式是发送增量更新信息而不是发送一个完整的快照。
第10章 避免活跃性危险
在安全性与活跃性之间通常存在着某种制衡。我们使用加锁机制来确保线程安全,但如果过度地使用加锁,则可能导致锁顺序死锁(Lock-Ordering Deadlock)。同样,我们使用线程池和信号量来限制对资源的使用,但这些被限制的行为可能会导致资源死锁(Resource Deadlock)。
10.1 死锁
在数据库系统的设计中考虑了监测死锁以及从死锁中恢复。在执行一个事务(Transaction)时可能需要获取多个锁,并一直持有这些锁直到事务提交。因此在两个事务之间很可能发生死锁,但事实上这种情况并不多见。如果没有外部干涉,那么这些事务将永远等待下去(在某个事务中持有的锁可能在其他事务中也需要)。但数据库服务器不会让这种情况发生。当它检测到一组事务发生了死锁时(通过在表示等待关系的有向图中搜索循环),将选择一个牺牲者并放弃这个事务。作为牺牲者的事务会释放它所持有的资源,从而使其他事务继续进行。应用程序可以重新执行被强制中止的事务,而这个事务现在可以成功完成,因为所有跟它竞争资源的事务都已经完成了。
与许多其他的并发危险一样,死锁造成的影响很少会立即显现出来。如果一个类可能发生死锁,那么并不意味着每次都会发生死锁,而只是表示有可能。当死锁出现时,往往是在最糟糕的时候——在高负载情况下。
10.1.1 锁顺序死锁
如果每个需要锁L和锁M的线程都以相同的顺序来获取L和M,那么就不会发生死锁了。
10.1.2 动态的锁顺序死锁这种死锁可以采用查看是否存在嵌套的锁获取操作的方法来检查。由于我们无法控制参数的顺序,因此要解决这个问题,必须定义锁的顺序,并在整个应用程序中都按照这个顺序来获取锁。
在制定锁的顺序时,可以使用System.identityHashCode方法,该方法将返回由Object.hashCode返回的值。
// 通过锁顺序来避免死锁 private static final Object tieLock = new Object(); public void transferMoney(final Account fromAcct, final Account toAcct, final DollarAmount amount) throws InsufficientFundsException { class Helper { public void transfer() throws InsufficientFundsException { if(fromAcct.getBalance().compareTo(amount)<0) throw new InsufficientFundsException(); else { fromAcct.debit(amount); toAcct.credit(amount); } } } int fromHash = System.identityHashCode(fromAcct); int toHash = System.identityHashCode(toAcct); if (fromHash<toHash) { synchronized(fromAcct) { synchronized(toAcct) { new Helper().transfer(); } } } else if (fromHash>toHash) { synchronized(toAcct) { synchronized(fromAcct) { new Helper().transfer(); } } } else { synchronized(tieLock) { synchronized(fromAcct) { synchronized(toAcct) { new Helper().transfer(0; } } } } }
在极少数情况下,两个对象可能拥有相同的散列值,此时必须通过某种任意的方法来决定锁的顺序,而这可能又会重新引入死锁。为了避免这种情况,可以使用“加时赛(Tie-Breaking)”锁。在获得两个Account锁之前,首先获得这个“加时赛”锁,从而保证每次只有一个线程以未知的顺序获得这两个锁,从而消除了死锁发生的可能性(只要一致地使用这种机制)。如果经常会出现散列冲突的情况,那么这种技术可能会成为并发性的一个瓶颈(这类似于在整个程序中只有一个锁的情况),但由于System.identityHashCode中出现散列冲突的频率非常低,因此这项技术以最小的代价,换来了最大的安全性。
如果在Account中包含一个唯一的、不可变的,并且具备可比性的键值,例如账号,那么要制定锁的顺序就更加容易了:通过键值对对象进行排序,因而不需要使用“加时赛”锁。
10.1.3 在协作对象之间发生的死锁
// 在相互协作对象之间的锁顺序死锁(不要这么做) // 注意:容易发生死锁! class Taxi { @GuardedBy("this") private Point location, destination; private final Dispatcher dispatcher; public Taxi(Dispatcher dispatcher) { this.dispatcher = dispatcher; } public synchronized Point getLocation() { return location; } public synchronized void setLocation(Point location) { // 首先要获取this这个Taxi对象的锁 this.location = location; if (location.equals(destination)) // notifyAvailable是外部方法,且是synchronized方法,所以要获取dispatcher对象的锁 dispatcher.notifyAvailable(this); } } class Dispatcher { @GuardedBy("this") private final Set<Taxi> taxis; @GuardedBy("this") private final Set<Taxi> availableTaxis; public Dispatcher() { taxis = new HashSet<Taxi>(); availableTaxis = new HashSet<Taxi>(); } public synchronized void notifyAvailable(Taxi taxi) { availableTaxis.add(taxi); } public synchronized Image getImage() { // 首先要获取this这个Dispatcher对象的锁 Image image = new Image(); for (Taxi t : taxis) image.drawMarker(t.getLocation()); // getLocation是个外部方法,且是synchronized的,所以要获取每个t(Taxi)的锁。于是getImage和setLocation就会死锁了。 return image; } }
因为setLocation和notifyAvailable都是同步方法,因此调用setLocation的线程将首先获取Taxi的锁,然后获取Dispatcher的锁。同样,调用getImage的线程将首先获取Dispatcher的锁,然后再获取每一个Taxi的锁(每次获取一个)。
然而要在Taxi和Dispatcher中查找死锁则比较困难:如果在持有锁的情况下调用某个外部方法,那么就需要警惕死锁。
如果在持有锁时调用某个外部方法,那么将出现活跃性问题。在这个外部方法中可能会获取其他锁(这可能会产生死锁),或者阻塞时间过长,导致其他线程无法及时获得当前被持有的锁。
10.1.4 开放调用
这需要使同步代码块仅被用于保护那些涉及共享状态的操作。通常,如果只是为了语法紧凑或简洁性(而不是因为整个方法必须通过一个锁来保护)而使用同步方法(而不是同步代码块),那么就会导致上面的死锁。
// 通过公开调用来避免在相互协作的对象之间产生死锁 @ThreadSafe class Taxi { @GuardedBy("this") private Point location, destination; private final Dispatcher dispatcher; ... public synchronized Point getLocation() { // location是共享状态 return location; } public void setLocation(Point location) { boolean reachedDestination; // 保存location.equals(destination)这个共享状态 synchronized(this) { // 两个共享状态 this.location = location; reachedDestination = location.equals(destination); } if (reachedDestination) dispatcher.notifyAvailable(this); } } @ThreadSafe class Dispatcher { @GuardedBy("this") private final Set<Taxi> taxis; @GuardedBy("this") private final Set<Taxi> availableTaxis; ... public synchronized void notifyAvailable(Taxi taxi) { // 修改availableTaxis这个共享状态 availableTaxis.add(taxi); } public Image getImage() { Set<Taxi> copy; synchronized(this) { // 复制的意思是,出租车就是这么些出租车了,但是它们的位置还可以变 copy = new HashSet<Taxi>(taxis); } Image image = new Image(); for (Taxi t : copy) image.drawMarker(t.getLocation()); return image; } }
有时候,在重新编写同步代码块以使用开发调用时会产生意想不到的结果,因为这会使得某个原子操作变为非原子操作。在许多情况下,使某个操作失去原子性是可以接受的。例如,对于两个操作:更新出租车位置以及通知调度程序这辆出租车已准备好出发去一个新的目的地,这两个操作并不需要实现为一个原子操作。在其他情况下,虽然去掉原子性可能会出现一些值得注意的结果,但这种语义变化仍然是可以接受的。在容易产生死锁的版本中,getImage会生成某个时刻下的整个车队位置的完整快照,而在重新改写的版本中,getImage将获得每辆出租车不同时刻的位置。
例如,在关闭某个服务时,你可能希望所有正在运行的操作执行完成以后,再释放这些服务占用的资源。如果在等待操作完成的同时持有该服务的锁,那么将很容易导致死锁,但如果在服务关闭之前就释放服务的锁,则可能导致其他线程开始新的操作。
这个问题的解决方法是,在将服务的状态更新为“关闭”之前一直持有锁,这样其他想要开始新操作的线程,包括想关闭该服务的其他线程,会发现服务已经不可用,因此也就不会试图开始新的操作。然后,你可以等待关闭操作结束,并且知道当开放调用完成后,只有执行关闭操作的线程才能访问服务的状态。因此,这项技术依赖于构造一些协议(而不是通过加锁)来防止其他线程进入代码的临界区。
10.1.5 资源死锁
如果某些任务需要等待其他任务的结果,那么这些任务往往是产生线程饥饿死锁的主要来源,有界线程池/资源池与相互依赖的任务不能一起使用。
10.2 死锁的避免与诊断
10.2.1 支持定时的锁
还有一项技术可以检测死锁和从死锁中恢复过来,即显式使用Lock类中的定时tryLock功能来代替内置锁机制。
这项技术只有在同时获取两个锁时才有效,如果在嵌套的方法调用中请求多个锁,那么即使你知道已经持有了外层的锁,也无法释放它。
10.2.2 通过线程转储信息来分析死锁
线程转储包括各个运行中的线程的栈追踪信息,这类似于发生异常时的栈追踪信息。线程转储还包含加锁信息,例如每个线程持有了哪些锁,在哪些栈帧中获得这些锁,以及被阻塞的线程正在等待获取哪一个锁。在生成线程转储之前,JVM将在等待关系图中通过搜索循环来找出死锁。如果发现了一个死锁,则获取相应的死锁信息,例如在死锁中涉及哪些锁和线程,以及这个锁的获取操作位于程序的哪些位置。
要在UNIX平台上触发线程转储操作,可以通过向JVM的进城发送SGIQUIT信号(kill -3),或者在UNIX平台中按下Ctrl-\键,在Windows平台中按下Ctrl-Break键。在许多IDE中都可以请求线程转储。
如果使用显式的Lock类而不是内部锁,那么Java 5.0并不支持与Lock相关的转储信息,在线程转储中不会出现显式的Lock。虽然Java 6中包含对显式Lock的线程转储和死锁检测等的支持,但在这些锁上获得的信息比在内置锁上获得的信息精确度低。内置锁与获得它们所在的线程帧是相关联的,而显式的Lock只与获得它的线程相关联。
10.3 其他活跃性危险
10.3.1 饥饿
引发饥饿的最常见资源就是CPU时钟周期。
要避免使用线程优先级,因为这会增加平台依赖性,并可能导致活跃性问题。在大多数并发应用程序中,都可以使用默认线程优先级。
10.3.2 糟糕的响应性
但CPU密集型的后台任务仍然可能对响应性造成影响,因为它们会与事件线程共同竞争CPU的时钟周期。
10.3.3 活锁
活锁(Livelock)是另一种形式的活跃性问题,该问题尽管不会阻塞线程,但也不能继续执行,因为线程将不断重复执行相同的操作,而且总会失败。活锁通常发生在处理事务消息的应用程序中:如果不能成功地处理某个消息,那么消息处理机制将回滚整个事务,并将它重新放到队列的开头。
这种形式的活锁通常是由过度的错误恢复代码造成的,因为它错误地将不可修复的错误作为可修复的错误。
当多个相互协作线程都对彼此进行响应从而修改各自的状态,并使得任何一个线程都无法继续执行时,就发生了活锁。这就像两个过于礼貌的人在半路上面对面地相遇:他们彼此都让出对方的路,然而又在另一条路上相遇了。因此他们就这样反复地避让下去。
要解决这种活锁问题,需要在重试机制中引入随机性。
以太协议定义了在重复发生冲突时采用指数方式回退机制,从而降低在多台存在冲突的机器之间发生拥塞和反复失败的风险。
第11章 性能与可伸缩性
11.1 对性能的思考
造成这些开销的操作包括:线程之间的协调(例如加锁、触发信号以及内存同步等),增加的上下文切换,线程的创建和销毁,以及线程的调度等。
要想通过并发来获得更好的性能,需要努力做好两件事情:更有效地利用现有处理资源,以及在出现新的处理资源时使程序尽可能地利用这些新资源。
11.1.1 性能与可伸缩性
可伸缩性指的是:当增加计算资源时(例如CPU、内存、存储容量或I/O带宽),程序的吞吐量或者处理能力能相应地增加。
在并发应用程序中针对可伸缩性设计和调整时所采用的方法与传统的性能调优方法截然不同。当进行性能调优时,其目的通常是用更小的代价完成相同的工作,例如通过缓存来重用之前计算的结果,或者采用时间复杂度为O(n2)O(n2)算法来代替复杂度为O(nlogn)O(nlogn)的算法。在进行可伸缩性调优时,其目的是设法将问题的计算并行化,从而能利用更多的计算资源来完成更多的任务。
我们熟悉的三层程序模型,即在模型中的表现层、业务逻辑层和持久化层是彼此独立的,并且可能由不同的系统来处理,这很好地说明了提高可伸缩性通常会造成性能损失的原因。如果把表现层、业务逻辑层和持久化层都融合到单个应用程序中,那么在处理第一个工作单元时,其性能肯定要高于将应用程序分为多层并将不同层次分布到多个系统时的性能。这种单一的应用程序避免了在不同层次之间传递任务时存在的网络延迟,同时也不需要将计算过程分解到不同的抽象层次,因此能减少许多开销(例如在任务排队、线程调用以及数据复制时存在的开销)。
然而,当这种单一的系统到达自身处理能力的极限时,会遇到一个严重的问题:要进一步提升它的处理能力将非常困难。因此,我们通常会接受每个工作单元执行更长的时间或消耗更多的计算资源,以换取应用程序在增加更多资源的情况下处理更高的负载。
11.1.2 评估各种性能权衡因素
例如,“快速排序”算法在大规模数据集上的执行效率非常高,但对于小规模的数据集来说,“冒泡排序”实际上更高效。如果要实现一个高效的排序算法,那么需要知道被处理数据集的大小,还有衡量优化的指标,包括:平均计算时间、最差时间、可预知性。然而,编写某个库中排序算法的开发人员通常无法知道这些需求信息。这就是为什么大多数优化措施都不成熟的原因之一:它们通常无法获得一组明确的需求。
很多性能优化措施通常都是以牺牲可读性或可维护性为代价——代码越“聪明”或越“晦涩”,就越难以理解和维护。有时候,优化措施会破坏面向对象的设计原则,例如需要打破封装,有时候,它们又会带来更高的错误风险,因为通常越快的算法就越复杂。
在实现这种性能提升时需要付出哪些隐含的代价,例如增加开发风险或维护开销?这种权衡是否合适?
以测试为基准,不要猜测。
例如,免费的perfbar应用程序可以给出CPU的忙碌程度信息,而我们通常的目标就是使CPU保持忙碌状态,因此这个功能可以有效地评估是否需要进行性能调优或者已实现的调优效果如何。
11.2 Amdahl定律
而有些任务本质上是串行的,例如,即使增加再多的工人也不可能增加作物的生长速度。
假定F是必须被串行执行的部分,那么根据Amdahl定律,在包含N个处理器的机器中,最高的加速比为:Speedup≤1F+1−FNSpeedup≤1F+1−FN
当N趋近无穷大时,最大的加速比趋近于1/F。因此,如果程序有50%的计算需要串行执行,那么最高的加速比只能是2(而不管有多少个线程可用);如果程序中有10%的计算需要串行执行,那么最高的加速比将接近10。
随着处理器数量的增加,可以很明显地看到,即使串行部分所占的百分比很小,也会极大地限制当增加计算资源时能够提升的吞吐率。
然而,这个过程中包含了一个串行部分——从队列中获取任务。所有工作者线程都共享同一个工作队列,因此在对该队列进行并发访问时需要采用某种同步机制来维持队列的完整性。
如果使用LinkedBlockingQueue作为工作队列,那么出列操作被阻塞的可能性将小于使用同步LinkedList时发生阻塞的可能性,因为LinkedBlockingQueue使用了一种可伸缩性更高的算法。
这个示例还忽略了另一种常见的串行操作:对结果进行处理。所有有用的计算都会生成某种结果或者产生某种效应——如果不会,那么可以将它们作为“死亡代码”删除掉。由于Runnable没有提供明确的结果处理过程,因此这些任务一定会产生某种效果,例如将它们的结果写入到日志或者保存到某个数据结构。通常,日志文件和结果容器都会由多个工作者线程共享,并且这也是一个串行部分。如果所有线程都将各自的计算结果保存到自行维护数据结构中,并且在所有任务都执行完成后再合并所有的结果,那么这种合并操作也是一个串行部分。
在所有并发程序中都包含一些串行部分。
11.2.1 示例:在各种框架中隐藏的串行部分
吞吐量的差异来源于两个队列中不同比例的串行部分。同步的LinkedList采用单个锁来保护整个队列的状态,并且在offer和remove等方法的调用期间都将持有这个锁。ConcurrentLinkedQueue使用了一种更复杂的非阻塞队列算法,该算法使用原子引用来更新各个链接指针。在第一个队列中,整个的插入或删除操作都将串行执行,而在第二个队列中,只有对指针的更新操作需要串行执行。
11.2.2 Amdahl定律的应用
在评估一个算法时,要考虑算法在数百个或数千个处理器的情况下的性能表现,从而对可能出现的可伸缩性局限有一定程度的认识。例如,两种降低锁粒度的技术:锁分解(将一个锁分解为两个锁)和锁分段(把一个锁分解为多个锁)。当通过Amdahl定律来分析这两项技术时,我们会发现,如果将一个锁分解为两个锁,似乎并不能充分利用多处理器的能力。锁分段技术似乎更有前途,因为分段的数量可随着处理器数量的增加而增加。
11.3 线程引入的开销
11.3.1 上下文切换
切换上下文需要一定的开销,而在线程调度过程中需要访问由操作系统和JVM共享的数据结构。应用程序、操作系统以及JVM都使用一组相同的CPU。在JVM和操作系统的代码中消耗越多的CPU时钟周期,应用程序的可用CPU时钟周期就越少。但上下文切换的开销并不只包含JVM和操作系统的开销。当一个新的线程被切换进来时,它所需要的数据可能不在当前处理器的本地缓存中,因此上下文切换将导致一些缓存缺失,因而线程在首次调度运行时会更加缓慢。这就是为什么调度器会为每个可运行的线程分配一个最小执行时间,即使有许多其他的线程正在等待执行:它将上下文切换的开销分摊到更多不会中断的执行时间上,从而提高整体的吞吐量(以损失响应性为代价)。
上下文切换的实际开销会随着平台的不同而变化,然而按照经验来看:在大多数通用的处理器中,上下文切换的开销相当于5000~10000个时钟周期,也就是几微秒。
UNIX系统的vmstat命令和Windows系统的perfmon工具都能报告上下文切换次数以及在内核中执行时间所占比例等信息。如果内核占用率较高(超过10%),那么通常表示调度活动发生得很频繁,这很可能是由I/O或竞争锁导致的阻塞引发的。
11.3.2 内存同步
在synchronized和volatile提供的可见性保证中可能会使用一些特殊指令,即内存栅栏(Memory Barrier)。内存栅栏可以刷新缓存,使缓存无效,刷新硬件的写缓冲,以及停止执行管道。
在内存栅栏中,大多数操作都是不能被重排序的。
如果有一个锁对象只能由当前线程访问,那么JVM就可以通过优化去掉这个锁获取操作,因为另一个线程无法与当前线程在这个锁上发生同步。例如,JVM通常都会去掉下面的锁获取操作:
synchronized (new Object()) { // 执行一些操作...... }
一些更完备的JVM能通过逸出分析(Escape Analysis)来找出不会发布到堆的本地对象引用(因此这个引用是线程本地的)。
即使不进行逸出分析,编译器也可以执行锁粒度粗化(Lock Coarsening)操作,即将邻近的同步代码块用同一个锁合并起来。
不要过度担心非竞争同步带来的开销。这个基本的机制已经非常快了,并且JVM还能进行额外的优化以进一步降低或消除开销。因此,我们应该将优化重点放在那些发生锁竞争的地方。
同步会增加共享内存总线上的通信量,总线的带宽是有限的,并且所有的处理器都共享这条总线。如果有多个线程竞争同步带宽,那么所有使用了同步的线程都会受到影响。
11.3.3 阻塞
非竞争的同步可以完全在JVM中进行处理,而竞争的同步可能需要操作系统的介入,从而增加开销。
如果等待时间较短,则适合采用自旋等待方式,而如果等待时间较长,则适合采用线程挂起方式。
11.4 减少锁的竞争
我们已经看到,串行操作会降低可伸缩性,并且上下文切换也会降低性能。在锁上发生竞争时将同时导致这两种问题,因此减少锁的竞争能够提高性能和可伸缩性。
在并发程序中,对可伸缩性的最主要威胁就是独占方式的资源锁。
有两个因素将影响在锁上发生竞争的可能性:锁的请求频率,以及每次持有该锁的时间。
有3种方式可以降低锁的竞争程度:
减少锁的持有时间。
降低锁的请求频率。
使用带有协调机制的独占锁,这些机制允许更高的并发性。
11.4.1 缩小锁的范围(“快进快出”)
降低发生竞争可能性的一种有效方式就是尽可能缩短锁的持有时间。例如,可以将一些与锁无关的代码移出同步代码块,尤其是那些开销较大的操作,以及可能被阻塞的操作,例如I/O操作。
由于在AttributeStore中只有一个状态变量attributes,因此可以通过将线程安全性委托给其他的类来进一步提升它的性能。通过用线程安全的Map(Hashtable、synchronizedMap或ConcurrentHashMap)来代替attributes,AttributeStore可以将确保线程安全性的任务委托给顶层的线程安全容器来实现。这样就无须在AttributeStore中采用显式的同步,缩小在访问Map期间锁的范围,并降低了将来的代码维护者无意破坏线程安全性的风险(例如在访问attributes之前忘记获得相应的锁)。
在分解同步代码块时,理想的平衡点将与平台相关,但在实际情况中,仅当可以将一些“大量”的计算或阻塞操作从同步代码块中移出时,才应该考虑同步代码块的大小。
11.4.2 减小锁的粒度
这可以通过锁分解和锁分段等技术来实现,在这些技术中将采用多个相互独立的锁来保护独立的状态变量,从而改变这些变量在之前由单个锁来保护的情况。这些技术能减小锁操作的粒度,并能实现更高的可伸缩性,然而,使用的锁越多,那么发生死锁的风险也就越高。
如果一个锁需要保护多个相互独立的状态变量,那么可以将这个锁分解为多个锁,并且每个锁只保护一个变量,从而提高可伸缩性,并最终降低每个锁被请求的频率。
// 将ServerStatus重新改写为使用锁分解技术 @ThreadSafe public class ServerStatus { @GuardedBy("users") public final Set<String> users; @GuardedBy("queries") public final Set<String> queries; ... public void addUser(String u) { synchronized(users) { users.add(u); } } public void addQuery(String q) { synchronized(queries) { queries.add(q); } } // 去掉同样被改写为使用被分解锁的方法 }
11.4.3 锁分段
在某些情况下,可以将锁分解技术进一步扩展为对一组独立对象上的锁进行分解,这种情况被称为锁分段。例如,在ConcurrentHashMap的实现中使用了一个包含16个锁的数组,每个锁保护所有散列桶的1/16,其中第N个散列桶由第(N mod 16)个锁来保护。假设散列函数具有合理的分布性,并且关键字能够实现均匀分布,那么这大约能把对于锁的请求减少到原来的1/16。正是这项技术使得ConcurrentHashMap能够支持多达16个并发的写入器。(要使得拥有大量处理器的系统在高访问量的情况下实现更高的并发性,还可以进一步增加锁的数量,但仅当你能证明并发写入线程的竞争足够激烈并需要突破这个限制时,才能将锁分段的数量超过默认的16个。)
锁分段的一个劣势在于:与采用单个锁来实现独占访问相比,要获取多个锁来实现独占访问将更加困难并且开销更高。通常,在执行一个操作时最多只需获取一个锁,但在某些情况下需要加锁整个容器,例如当ConcurrentHashMap需要扩展映射范围,以及重新计算键值的散列值要分布到更大的桶集合中时,就需要获取分段锁集合中的所有的锁。
它拥有N_LOCKS个锁,并且每个锁保护散列桶的一个子集。大多数方法,例如get,都只需要获得一个锁,而有些方法则需要获得所有的锁,但并不要求同时获得,例如clear方法的实现。
// 在基于散列的Map中使用锁分段技术 @ThreadSafe public class StripedMap { // 同步策略:buckets[n]由locks[n%N_LOCKS]来保护 private static final int N_LOCKS = 16; private final Node[] buckets; private final Object[] locks; private static class Node { ... } public StripedMap(int numBuckets) { buckets = new Node[numBuckets]; locks = new Object[N_LOCKS]; for(int i=0; i<N_LOCKS; i++) locks[i] = new Object(); } private final int hash(Object key) { return Math.abs(key.hashCode() % buckets.length); } public Object get(Object key) { int hash = hash(key); synchronized(locks[hash % N_LOCKS]) { for (Node m=buckets[hash]; m!=null; m=m.next) if(m.key.equals(key)) return m.value; } return null; } public void clear() { for(int i=0; i<buckets.length; i++) { synchronized(locks[i % N_LOCKS]) { buckets[i] = null; } } } ... }
11.4.4 避免热点域
如果程序采用锁分段技术,那么一定要表现出在锁上的竞争频率高于在锁保护的数据上发生竞争的频率。
即使使用锁分段技术来实现散列链,那么在对计数器的访问进行同步时,也会重新导致在使用独占锁时存在的可伸缩性问题。一个看似性能优化的措施——缓存size操作的结果,已经变成了一个可伸缩性问题。在这种情况下,计数器也被称为热点域,因为每个导致元素数量发生变化的操作都需要访问它。
为了避免这个问题,ConcurrentHashMap中的size将对每个分段进行枚举并将每个分段中的元素数量相加,而不是维护一个全局计数。为了避免枚举每个元素,ConcurrentHashMap为每个分段都维护了一个独立的计数,并通过每个分段的锁来维护这个值。
11.4.5 一些替代独占锁的方法
原子变量提供了一种方式来降低更新“热点域”时的开销,例如静态计数器、序列发生器、或者对链表数据结构中头节点的引用。原子变量类提供了在整数或者对象引用上的细粒度原子操作(因此可伸缩性更高),并使用了现代处理器中提供的底层并发原语(例如比较并交换[compare-and-swap])。如果在类中只包含少量的热点域,并且这些域不会与其他变量参与到不变性条件中,那么用原子变量来替代它们能提高可伸缩性。
11.4.6 监测CPU的利用率
不均匀的利用率表明大多数计算都是由一小组线程完成的,并且应用程序没有利用其他的处理器。
在vmstat命令的输出中,有一栏信息是当前处于可运行状态但并没有运行的线程数量。如果CPU的利用率很高,并且总会有可运行的线程在等待CPU,那么当增加更多的处理器时,程序的性能可能会得到提升。
11.4.7 向对象池说“不”
事实上,现在Java的分配操作已经比C语言的malloc调用更快:在Hotspot 1.4.x和5.0中,“new Object”的代码大约只包含10条机器指令。
除了损失CPU指令周期外,在对象池技术中还存在一些其他问题,其中最大的问题就是如何正确地设定对象池的大小(如果对象池太小,那么将没有作用,而如果太大,则会对垃圾收集器带来压力,因为过大的对象池将占用其他程序需要的内存资源)。
通常,对象分配操作的开销比同步的开销更低。
11.5 示例:比较Map的性能
在单线程环境下,ConcurrentHashMap的性能比同步的HashMap的性能略好一些,但在并发环境中则要好得多。
ConcurrentHashMap和ConcurrentSkipListMap的数据显示,它们在线程数量增加时能表现出很好的可伸缩性,并且吞吐量会随着线程数量的增加而增加。虽然图中的线程数量并不大,但与普通的应用程序相比,这个测试程序在每个线程上生成了更多的竞争,因为它除了向Map施加压力外几乎没有执行任何其他操作,而实际的应用程序通常会在每次迭代中进行一些线程本地工作。
11.6 减少上下文切换的开销
当任务在运行和阻塞这两个状态之间转换时,就相当于一次上下文切换。在服务器应用程序中,发生阻塞原因之一就是在处理请求时产生各种日志消息。
日志操作的服务时间包括与I/O流类相关的计算时间,如果I/O操作被阻塞,那么还会包括线程被阻塞的时间。操作系统将这个被阻塞的线程从调度队列中移走并直到I/O操作结束,这将比实际阻塞的时间更长。当I/O操作结束时,可能有其他线程正在执行它们的调度时间片,并且在调度队列中有些线程位于被阻塞线程之前,从而进一步增加服务时间。如果有多个线程在同时记录日志,那么还可能在输出流的锁上发生竞争,这种情况的结果与阻塞I/O的情况一样——线程被阻塞并等待锁,然后被线程调度器交换出去。在这种日志操作中包含了I/O操作和加锁操作,从而导致上下文切换次数的增多,以及服务时间的增加。
通过将I/O操作从处理请求的线程中分离出来,可以缩短处理请求的平均服务时间。调用log方法的线程将不会再因为等待输出流的锁或者I/O完成而被阻塞,它们只需将消息放入队列,然后就返回各自的任务中。另一方面,虽然在消息队列上可能发生竞争,但put操作相对于记录日志的I/O操作(可能需要执行系统调用)是一种更为轻量级的操作,因此在实际使用中发生阻塞的概率更小(只要队列没有填满)。由于发出日志请求的线程现在被阻塞的概率降低,因此该线程在处理请求时被交换出去的概率也会降低。我们所做的工作就是把一条包含I/O操作和锁竞争的复杂且不确定的代码路径变成一条简单的代码路径。