Concurrency in Java

Table of Contents

1 Java并发简介

并发是同时运行多个任务的能力。这里主要讨论在Java中如何使用多线程来实现并发。

本文主要参考:
The Java™ Tutorials > Essential Classes > Concurrency
Java核心技术 卷I 基础知识(原书第9版),第14章 多线程

2 线程基本知识

2.1 创建线程

一般地,下面两种方法可创建一个Java线程。
方法一:实现Runnable接口。它的写法有很多,如:

public class HelloRunnable implements Runnable {  // 直接实现Runnable接口

  public void run() {
    System.out.println("Hello from a thread!");
  }

  public static void main(String args[]) {
    Thread thread = new Thread(new HelloRunnable());
    thread.start();
  }
}

也可以这样:

public class HelloRunnable {
  public static void main(String[] args) {
    Thread thread = new Thread(new Runnable() {
      @Override
      public void run() {
        System.out.println("Hello from a thread!");
      }
    });
    thread.start();
  }
}

还可以用Lambda Expressions:

public class HelloRunnable {
  public static void main(String[] args) {
    Thread thread = new Thread(() -> {   // 这是Lambda Expressions
      System.out.println("Hello from a thread!");
    });
    thread.start();
  }
}

方法二:扩展Thread类,重载其run方法:

public class HelloThread extends Thread {

  public void run() {
    System.out.println("Hello from a thread!");
  }

  public static void main(String args[]) {
    Thread thread = new HelloThread();
    thread.start();
  }
}

参考:
Defining and Starting a Thread: https://docs.oracle.com/javase/tutorial/essential/concurrency/runthread.html

2.1.1 main结束不会导致其他线程提前退出(所有“non-daemon”线程结束后JVM才结束)

在Java中,线程有“non-daemon”线程和“daemon”线程之分。默认地,用户创建的线程为“non-daemon”线程,但可以通过setDaemon()来改变。系统创建一些线程,比如GC线程是“daemon”线程。

JVM在满足下面两个情况之一时退出:(1)方法System.exit()被调用;(2)所有“non-daemon”线程结束。

When a Java Virtual Machine starts up, there is usually a single non-daemon thread (which typically calls the method named main of some designated class). The Java Virtual Machine continues to execute threads until either of the following occurs:

  1. The exit method of class Runtime has been called and the security manager has permitted the exit operation to take place.
  2. All threads that are not daemon threads have died, either by returning from the call to the run method or by throwing an exception that propagates beyond the run method.

摘自:http://docs.oracle.com/javase/8/docs/api/java/lang/Thread.html

下面是一个测试实例。

// 如果没有其他同步的需求,你不需要在main中使用thread1.join()来等待子线程结束。当所有“non-daemon”线程结束后JVM才会结束。
public class ThreadTest implements Runnable {

    public void run() {
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("New thread end!");
    }

    public static void main(String args[]) throws InterruptedException {
        Thread thread1 = new Thread(new ThreadTest());
        thread1.start();

        System.out.println("Main thread end!");
    }
}

上面程序会输出:

Main thread end!
New thread end!

在上面Java实例中,main线程很快会结束,但这不会导致正在执行的thread1结束(你会看到子线程总输出“New thread end!”)。
这点和C/C++中不一样的,如下面C++程序中,main结束会导致子线程退出(子线程打印的“New thread end!”不会输出):

// file test.cpp.
// g++ -std=c++11 -pthread test.cpp -o test
#include <iostream>
#include <thread>

void hello() {
  std::this_thread::sleep_for(std::chrono::seconds(10));
  std::cout<<"New thread end!\n";
}

int main() {
  std::thread t(hello);

  std::cout<<"Main thread end!\n";     // main结束会导致子线程退出,可以使用t.join()显式地等待子线程。
}

2.2 线程的6个状态

A thread can be in one of the following states:

NEW
A thread that has not yet started is in this state.
RUNNABLE
A thread executing in the Java virtual machine is in this state. (处于RUNABLE状态的线程可能正在执行,也可能正在等待CPU为它分配执行时间)
BLOCKED
A thread that is blocked waiting for a monitor lock is in this state.
WAITING
A thread that is waiting indefinitely for another thread to perform a particular action is in this state. (处于WAITING状态的线程会一直等待,直到其他线程唤醒它,比如Process类的waitFor()方法会使当前线程进入 WAITING 状态)
TIMED_WAITING
A thread that is waiting for another thread to perform an action for up to a specified waiting time is in this state. (过指定的一段时间后,会由系统自动唤醒,比如Process类的waitFor(long timeout, TimeUnit unit)方法会使当前线程进入 TIMED_WAITING 状态)
TERMINATED
A thread that has exited is in this state.

java_6_thread_states.png

Figure 1: Java线程状态转换图

参考:
https://docs.oracle.com/javase/7/docs/api/java/lang/Thread.State.html
http://www.uml-diagrams.org/examples/java-6-thread-state-machine-diagram-example.html

2.2.1 BLOCKED VS. WATING

In the BLOCKED state, a thread is about to enter a synchronized block, but there is another thread currently running inside a synchronized block on the same object. The first thread must then wait for the second thread to exit its block.
In the WAITING state, a thread is waiting for a signal from another thread. This happens typically by calling Object.wait(), or Thread.join(). The thread will then remain in this state until another thread calls Object.notify().

3 线程同步

3.1 Intrinsic Lock(synchronized关键字)

Java从语言角度实现了 Monitor ,可以认为Java中每一个对象都有一个内部的锁(Intrinsic Lock)。

3.1.1 synchronized修饰方法

用synchronized修饰方法,相当于用“对象的内部锁”保护整个方法,这样该方法同时只能有一个线程进入。

用synchronized修饰方法的用法如下:

public synchronized void method() {
  // somethings
}

相当于下面用synchronized修饰代码块:

public void method() {
  synchronized(this) {
    // somethings
  }
}

显然,如果一个类有多个synchronized修饰的方法,那么同时只能有一个线程进入这些方法中的一个。
参考:http://stackoverflow.com/questions/3160023/do-two-synchronized-methods-execute-simultaneously

3.1.2 synchronized修饰代码块

用synchronized修饰代码块的用法如下:

synchronized(obj1) {
  // somethings (critical section)
}

当线程进入上面代码块中时,会获得“对象obj1的内部锁”,这样同时只有一个线程进入synchronized修饰的代码块中。

synchronized修饰代码块可以提供细粒度的并行控制。如:

public class MsLunch {
    private long c1 = 0;
    private long c2 = 0;
    private Object lock1 = new Object();
    private Object lock2 = new Object();

    public void inc1() {
        synchronized(lock1) {
            c1++;
        }
    }

    public void inc2() {
        synchronized(lock2) {
            c2++;
        }
    }
}

参考:https://docs.oracle.com/javase/tutorial/essential/concurrency/locksync.html

3.1.3 wait, notify, notifyAll(附阻塞队列的简单实现)


Table 1: Java中每个对象都有wait, notify, notifyAll
method 说明
wait 使当前线程进入等待状态(该方法只能在synchronized方法或代码块中调用),且会释放对应的Intrinsic Lock
notify 随机唤醒一个在该对象上调用wait方法的线程(只能在synchronized方法或代码块中调用)
notifyAll 唤醒所有在该对象上调用wait方法的线程(只能在synchronized方法或代码块中调用)

synchronized/wait/notify的简单实例:

public class BlockingQueue<T> {

    private Queue<T> queue = new LinkedList<T>();
    private int capacity;

    public BlockingQueue(int capacity) {
        this.capacity = capacity;
    }

    public synchronized void put(T element) throws InterruptedException {
        while(queue.size() == capacity) {
            wait();
        }

        queue.add(element);
        notify(); // notifyAll() for multiple producer/consumer threads
    }

    public synchronized T take() throws InterruptedException {
        while(queue.isEmpty()) {
            wait();
        }

        T item = queue.remove();
        notify(); // notifyAll() for multiple producer/consumer threads
        return item;
    }
}

参考:
http://stackoverflow.com/questions/2536692/a-simple-scenario-using-wait-and-notify-in-java
https://docs.oracle.com/javase/7/docs/api/java/lang/Object.html

3.2 ReentrantLock

一般地,Intrinsic Lock(即使用synchronized关键字)可以满足大部分对锁的需求。

和Intrinsic Lock相比,ReentrantLock(在Java SE 5.0中引入)的功能更加丰富,比如:(1) ReentrantLock支持tryLock()操作;(2)ReentrantLock支持“公平锁”。

用ReentrantLock保护代码块的基本结构如下:

private ReentrantLock myLock = new ReentrantLock();

public void fun1() {
    // some code
    myLock.lock();
    try {
        // critical section.
    } finally {
        myLock.unlock();         // 千万不要忘记unlock()。放入finally子句中,有异常抛出时也会执行unlock()。
    }
    // some code
}

上面结构确保任何时刻只有一个线程进入临界区。一旦一个线程得到了锁对象,其他任何线程都无法通过lock语句。当其他线程调用lock时,它们被阻塞,直到第一个线程释放锁对象。

说明1:ReentrantLock从其“名字”上看,叫“可重入锁”(注:Intrinsic Lock也是可重入锁,“可重入锁”比“不可重入锁”更加安全)。“可重入锁”的意思是同一个线程可以重复地获得已经持有的锁。锁保持一个“持有计数”来跟踪对lock方法的嵌套调用。线程在每一次调用lock都要调用unlock来释放锁。所以, 被一个可重入锁保护的代码可以调用另一个使用相同锁的方法。
说明2:pthread的互斥锁默认为“不可重入的”(如果线程获得了锁,当这个线程再次获得同一个锁时会死锁),不过可以通过设置PTHREAD_MUTEX_RECURSIVE来使其变为可重入锁,详情可参考 man pthread_mutexattr_init

3.3 ReentrantReadWriteLock(读/写锁)

如果对数据进行“读”操作的线程远多于对其进行“写”操作的线程时,读/写锁可以提供更好的并发性。

下面是使用读/写锁的必要步骤:

1. Construct a ReentrantReadWriteLock object:
    private ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();

2. Extract the read and write locks:
    private Lock readLock = rwl.readLock();          // 多个读操作可以共用一个“读锁”,“读锁”会排斥写操作。
    private Lock writeLock = rwl.writeLock();        // “写锁”会排斥其他的读操作和写操作。

3. Use the read lock in all accessors:
    public double getTotalBalance()
    {
    readLock.lock();
    try { . . . }
    finally { readLock.unlock(); }
    }

4. Use the write lock in all mutators:
    public void transfer(. . .)
    {
    writeLock.lock();
    try { . . . }
    finally { writeLock.unlock(); }
    }

3.4 StampedLock(Java 8中引入)

Java 8中引入了 StampedLock ,提供了 writeLock()readLock() 方法,可以用来代替 ReentrantReadWriteLock。除此外, StampedLock提供了 tryOptimisticRead() 方法,用它可实现“乐观读”,当读操作多于写操作时(这往往是真实的场景),可以得到比ReentrantReadWriteLock更好的性能。

参考:Phaser And StampedLock Concurrency Synchronizers, by Dr Heinz M. Kabutz

3.4.1 tryOptimisticRead应用实例:实现乐观读

用tryOptimisticRead实现乐观读的基本过程为:先尝试读,读完检查有没有写操作发生,如果有则获取悲观锁(读锁)并重新读。 其代码片断如下:

long stamp = lock.tryOptimisticRead(); /* 通过tryOptimisticRead得到一个票据,不会阻塞 */

// 在这里把需要用到的那些变量一一读取到对应的本地变量(如var1, var2, ...)中。

if(! lock.validate(stamp)){            /* 验证票据的有效性,若无效则不能使用var1, var2, ... */
    long stamp = lock.readLock();      /* 当票据无效时,使用readLock获得读锁,再重新读取 */
    try {
        // 重新把需要用到的那些变量一一读取到对应的本地变量(如var1, var2, ...)中。
    } finally {
        lock.unlock(stamp);
    }
}

// 这里可以安全地使用本地变量var1, var2, ...,它们肯定是系统中某时刻一致的数据。

注1:当排它锁(写锁)已经被其它线程占有时, lock.tryOptimisticRead() 会返回0。
注2: lock.validate(stamp) 在下面两种情况下会返回为false:(1)stamp为0;(2)获得stamp和调用 lock.validate(stamp) 的时间间隔内,排它锁(写锁)被其它线程获得。
注3:如果系统中读操作远大于写操作(这往往是真实的场景),则排它锁(写锁)很少被占有,这样上面代码中的 lock.validate(stamp) 绝大部分时候都会返回true, 则上面代码中基本上不会用到锁(读锁),这样可以大大提升性能。 但是,如果系统中写操作非常多, lock.validate(stamp) 经常返回false,则StampedLock对提升性能帮忙不大。

下面是 tryOptimisticRead() 的应用实例。

import java.util.concurrent.locks.StampedLock;

class Point {
    private double x, y;
    private final StampedLock sl = new StampedLock();

    void move(double deltaX, double deltaY) {
        long stamp = sl.writeLock();
        try {
            x += deltaX;
            y += deltaY;
        } finally {
            sl.unlock(stamp);
        }
    }

    /* 计算到原点的距离。下面版本是“悲观读”(和使用ReentrantReadWriteLock类似) */
    double distanceFromOriginPessimisticVersion() {
        long stamp = sl.readLock();
        try {
            double currentX = x, currentY = y;
            return Math.sqrt(currentX * currentX + currentY * currentY);
        } finally {
            sl.unlock(stamp);
        }
    }

    /* 计算到原点的距离。下面是利用tryOptimisticRead实现的“乐观读”版本 */
    double distanceFromOrigin() {
        long stamp = sl.tryOptimisticRead();
        double currentX = x, currentY = y;
        if (!sl.validate(stamp)) {
            stamp = sl.readLock();
            try {
                currentX = x;
                currentY = y;
            } finally {
                sl.unlock(stamp);
            }
        }
        return Math.sqrt(currentX * currentX + currentY * currentY);
    }
}

上面代码中, distanceFromOriginPessimisticVersion() 是一种悲观用法,而 distanceFromOrigin() 则是乐观用法。当读操作远大于写操作时, distanceFromOrigin() 可获得更好的性能。

3.4.2 StampedLock是不可重入的

StampedLock是不可重入的。

StampedLocks are designed for use as internal utilities in the development of thread-safe components. Their use relies on knowledge of the internal properties of the data, objects, and methods they are protecting. They are not reentrant, so locked bodies should not call other unknown methods that may try to re-acquire locks (although you may pass a stamp to other methods that can use or convert it).

3.5 volatile字段

volatile仅能提供“很弱的”同步。相关知识点,请参考Java内存模型。

3.5.1 Double-Checked Locking using Volatile

// Works under JDK5 and later. This does not work under JDK4 and earlier.
class Foo {
      private volatile Helper helper = null;
      public Helper getHelper() {
          if (helper == null) {
              synchronized(this) {
                  if (helper == null)
                      helper = new Helper();
              }
          }
          return helper;
      }
}

参考:http://www.cs.umd.edu/~pugh/java/memoryModel/DoubleCheckedLocking.html

4 阻塞队列(Blocking Queue)

对于许多线程问题,可以通过使用一个或多个队列以优雅且安全的方式将其形式化。生产者线程向队列插入元素,消费者线程则取出它们。使用队列,可以安全地从一个线程向另一个线程传递数据。

阻塞队列(BlockingQueue)扩展了Queue,增加了可阻塞的插入(put)和获取(take)操作。如果队列是空的,一个获取(take)操作会一直阻塞直到队列中存在可用元素;如果队列是满的(对于有界队列),插入(put)操作会一直阻塞直到队列中存在可用空间。阻塞队列在生产者——消费者设计中非常有用。 在节 3.1.3 中有阻塞队列的一个基本实现。

The java.util.concurrent package has the following implementations of the BlockingQueue interface:

  • ArrayBlockingQueue
  • DelayQueue
  • LinkedBlockingDeque
  • LinkedBlockingQueue
  • LinkedTransferQueue
  • PriorityBlockingQueue
  • SynchronousQueue

参考:
JAVA并发编程实践,5.3 阻塞队列和生产者—消费者模式
Java核心技术 卷I 基础知识(原书第9版),14.6 阻塞队列

4.1 阻塞队列实例:寻找目录及其所有子目录中包含某关键字的文件

下面是一个实例展示了如何使用阻塞队列来控制一组线程。程序在一个目录及它的所有子目录中搜索所有文件,打印出包含指定关键字的行。

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.Scanner;

/**
 * @version 1.01 2012-01-26
 * @author Cay Horstmann
 */
public class BlockingQueueTest
{
  public static void main(String[] args)
  {
    Scanner in = new Scanner(System.in);
    System.out.print("Enter base directory (e.g. /usr/local/jdk1.6.0/src): ");
    String directory = in.nextLine();
    System.out.print("Enter keyword (e.g. volatile): ");
    String keyword = in.nextLine();

    final int FILE_QUEUE_SIZE = 10;
    final int SEARCH_THREADS = 100;

    BlockingQueue<File> queue = new
      ArrayBlockingQueue<>(FILE_QUEUE_SIZE);

    FileEnumerationTask enumerator = new
      FileEnumerationTask(queue, new File(directory));
    new Thread(enumerator).start();
    for (int i = 1; i <= SEARCH_THREADS; i++)
      new Thread(new SearchTask(queue, keyword)).start();
  }
}

/**
 * This task enumerates all files in a directory and its subdirectories.
 */
class FileEnumerationTask implements Runnable
{
  public static File DUMMY = new File("");
  private BlockingQueue<File> queue;
  private File startingDirectory;

  /**
   * Constructs a FileEnumerationTask.
   * @param queue the blocking queue to which the enumerated files are added
   * @param startingDirectory the directory in which to start the enumeration
   */
  public FileEnumerationTask(BlockingQueue<File> queue, File startingDirectory)
  {
    this.queue = queue;
    this.startingDirectory = startingDirectory;
  }

  public void run()
  {
    try {
      enumerate(startingDirectory);
      queue.put(DUMMY);
    } catch (InterruptedException e) {
    }
  }

  /**
   * Recursively enumerates all files in a given directory and its subdirectories.
   * @param directory the directory in which to start
   */
  public void enumerate(File directory) throws InterruptedException
  {
    File[] files = directory.listFiles();
    for (File file : files) {
      if (file.isDirectory()) enumerate(file);
      else queue.put(file);
    }
  }
}
/**
 * This task searches files for a given keyword.
 */
class SearchTask implements Runnable
{
  private BlockingQueue<File> queue;
  private String keyword;

  /**
   * Constructs a SearchTask.
   * @param queue the queue from which to take files
   * @param keyword the keyword to look for
   */
  public SearchTask(BlockingQueue<File> queue, String keyword)
  {
    this.queue = queue;
    this.keyword = keyword;
  }

  public void run()
  {
    try {
      boolean done = false;
      while (!done) {
        File file = queue.take();
        if (file == FileEnumerationTask.DUMMY) {
            queue.put(file);
            done = true;
        } else search(file);
      }
    } catch (IOException e) {
      e.printStackTrace();
    } catch (InterruptedException e) {
    }
  }

  /**
   * Searches a file for a given keyword and prints all matching lines.
   * @param file the file to search
   */
  public void search(File file) throws IOException
  {
    Scanner in = new Scanner(new FileInputStream(file));
    int lineNumber = 0;
    while (in.hasNextLine()) {
      lineNumber++;
      String line = in.nextLine();
      if (line.contains(keyword))
        System.out.printf("%s:%d:%s%n", file.getPath(), lineNumber, line);
    }
    in.close();
  }
}

在上面程序中,生产者线程枚举在所在子目录下的所有文件并把它们放到一个阻塞队列中。同时启动了大量搜索线程。每个搜索线程从队列中取出一个文件,打开它,打印所有包含该关键字的行,然后取出下一个文件。在这个程序中我们使用了一个小技巧在工作结束后终止这个应用程序:为了发出完成信号,生产者线程放置一个虚拟对象(DUMMY)到队列中(这就像行李传送带上放一个写着“最后一个包”的虚拟包)。当搜索线程取到这个虚拟对象时,将其放回并终止。

注意:这里不需要显式的线程同步。在这个应用程序中,我们使用阻塞队列作为一种同步机制。

5 Liveness

5.1 死锁(deadlock)

死锁是指两个或两个以上的线程在执行过程中,因争夺资源而造成的一种互相等待的现象。

5.2 饥饿(Starvation)与活锁(Livelock)

饥饿(Starvation)描述了一个线程由于得不到足够的资源而不能执行程序的现象。 当共享资源由一个“贪婪”的线程长时间占用时,其它线程可能出现“饥饿”。 例如,假设一个对象有一个同步的方法,需要很长时间才能返回。如果一个线程经常执行这个方法,其他需要同样访问这个资源的线程就有可能出现阻塞的情况。

活锁(Livelock)是指线程1可以使用资源,但它很礼貌,让其他线程先使用资源;而线程2也可以使用资源,但它也很礼貌,也让其他线程先使用资源。这样你让我,我让你,最后两个线程都无法使用资源。例如,马路中间有条小桥,只能容纳一辆车经过,桥两头开来两辆车A和B,A比较礼貌,示意B先过,B也比较礼貌,示意A先过,结果两人一直谦让谁也过不去。

参考:
Java - Thread Livelock: http://www.logicbig.com/tutorials/core-java-tutorial/java-multi-threading/thread-livelock/
Starvation and Livelock: http://docs.oracle.com/javase/tutorial/essential/concurrency/starvelive.html
《现代操作系统(原书第3版)》,6.7.3 活锁,6.7.4 饥饿

6 Callable, Future, FutureTask

我们知道,Runnable封装一个异步运行的任务,可以把它想象为一个没有参数、没有返回值的异步方法。

Callable与Runnable类似,但Callable有返回值。 Callable接口是一个参数化的类型,只有一个方法call,如下所示:

public interface Callable<V>
{
    V call() throws Exception;
}

类型参数是返回值的类型。如Callable<Integer>表示“一个最终返回Integer对象的异步计算”。Callable常和Future一起使用。

Future保存着异步计算的结果(可以理解为Future是一个“占位符”,计算完成后的结果会保存在Future中)。 Java中Future接口如下,它有5个方法:

public interface Future<V> {
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
}
Table 2: Future接口中的方法
Future<V>接口中的方法 方法说明
V get() Waits if necessary for the computation to complete, and then retrieves its result.
V get(long timeout, TimeUnit unit) Waits if necessary for at most the given time for the computation to complete, and then retrieves its result, if available.
boolean cancel(boolean mayInterruptIfRunning) Attempts to cancel execution of this task.
boolean isCancelled() Returns true if this task was cancelled before it completed normally.
boolean isDone() Returns true if this task completed.

调用Future接口中无参数的get方法会阻塞当前线程,直到计算完成。

6.1 FutureTask

FutureTask包装器将Callable转换成Future和Runnable,它同时实现二者的接口。 这样,我们可以方便地指定一个Callable对象给新线程了(我们知道“用new Thread创建线程时,只能指定Runnable对象”,有了FutureTask包装器,就可以指定Callable对象给新线程;后文将说明如何使用线程池,由于线程池可以直接提交Callable对象,所以使用线程池时一般不需要FutureTask包装器)。

FutureTask使用方法大致如下:

Callable<Integer> myComputation = . . .;
FutureTask<Integer> task = new FutureTask<Integer>(myComputation);
Thread t = new Thread(task);         // it's a Runnable
t.start();
...
Integer result = task.get();         // it's a Future

6.2 实例:计算目录及其所有子目录中包含某关键字的文件个数

下面例子将计算目录及其所有子目录中包含某关键字的文件个数。

import java.io.*;
import java.util.*;
import java.util.concurrent.*;
/**
 * @version 1.01 2012-01-26
 * @author Cay Horstmann
 */
public class FutureTest
{
  public static void main(String[] args)
  {
    Scanner in = new Scanner(System.in);
    System.out.print("Enter base directory (e.g. /usr/local/jdk5.0/src): ");
    String directory = in.nextLine();
    System.out.print("Enter keyword (e.g. volatile): ");
    String keyword = in.nextLine();

    MatchCounter counter = new MatchCounter(new File(directory), keyword);
    FutureTask<Integer> task = new FutureTask<>(counter);
    Thread t = new Thread(task);
    t.start();
    try {
      System.out.println(task.get() + " matching files.");
    } catch (ExecutionException e) {
      e.printStackTrace();
    } catch (InterruptedException e) {
    }
  }
}

/**
 * This task counts the files in a directory and its subdirectories that contain a given keyword.
 */
class MatchCounter implements Callable<Integer>
{
  private File directory;
  private String keyword;
  private int count;

  /**
   * Constructs a MatchCounter.
   * @param directory the directory in which to start the search
   * @param keyword the keyword to look for
   */
  public MatchCounter(File directory, String keyword)
  {
    this.directory = directory;
    this.keyword = keyword;
  }

  public Integer call()
  {
    count = 0;
    try {
      File[] files = directory.listFiles();
      List<Future<Integer>> results = new ArrayList<>();

      for (File file : files)
        if (file.isDirectory()) {
          // 对于每一个子目录,产生一个新的MatchCounter并为它启动一个线程。
          MatchCounter counter = new MatchCounter(file, keyword);
          FutureTask<Integer> task = new FutureTask<>(counter);
          results.add(task);
          Thread t = new Thread(task);
          t.start();
        } else {
          if (search(file)) count++;
        }

      for (Future<Integer> result : results)
        try {
          count += result.get();         // 每一次对get的调用都会发生阻塞直到对应的call返回,结果可获得为止。
                                         // 当然线程是并行运行的,因此,很可能在大致相同的时刻所有结果都可获得。
        } catch (ExecutionException e) {
          e.printStackTrace();
        }
    } catch (InterruptedException e) {
    }
    return count;
  }

  /**
   * Searches a file for a given keyword.
   * @param file the file to search
   * @return true if the keyword is contained in the file
   */
  public boolean search(File file)
  {
    try {
      Scanner in = new Scanner(file);
      boolean found = false;
      while (!found && in.hasNextLine()) {
        String line = in.nextLine();
        if (line.contains(keyword)) found = true;
      }
      in.close();
      return found;
    } catch (IOException e) {
      return false;
    }
  }
}

说明:这个实例不是很好,因为它为每个目录产生一个线程,这样可能产生大量的生命期很短的线程。可以用“线程池”来避免大量生命期很短的线程,后文将介绍如何用“线程池”实现和这个实现相同的功能。

6.3 Futures and promises

Futures and promises are pretty similar concepts, the difference is that a future is a read-only container for a result that does not yet exist, while a promise can be written (normally only once). The Java 8 CompletableFuture can be thought of as promises.

参考:http://stackoverflow.com/questions/14541975/difference-between-future-and-promise

6.4 CompletableFuture(Java 8中引入)

6.4.1 为什么需要CompletableFuture

Future是在Java 5中引入的类,用来描述一个异步计算的结果。你可以调用 isDone() 方法来检查计算是否完成,或者使用 get() 阻塞调用线程,直到计算完成并返回结果。不过,这两种获取Future结果的方式都不优雅,前一种属于“轮询”,后一种会“阻塞”。轮询的方式会消耗CPU资源,而阻塞的方式和我们的异步编程的初衷相违背。难道不能用观察者模式当计算结果完成后通知监听者来进行下一步处理?

很多语言,比如Node.js采用回调方式实现异步编程。一些Java库也实现了相应的观察者模式使得Future计算结果完成后自动进行下一步处理,如Google guava提供 ListenableFuture 以及辅助类 Futures 等,下面介绍Google guava中这两个类的使用:

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;

// Test with Google guava 24.0
public class ListenableFutureTest {

    public static void main(String[] args) throws InterruptedException {
        // ExecutorService返回普通的Future,ListeningExecutorService才能返回ListenableFuture
        // 通过MoreExecutors.listeningDecorator可以将ExecutorService转换为ListeningExecutorService
        ListeningExecutorService pool = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));

        final ListenableFuture<Integer> listenableFuture = pool.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                TimeUnit.SECONDS.sleep(3);          // 模拟耗时的任务
                return 100;
            }
        });

        // // 方式一:使用ListenableFuture的addListener方法
        // listenableFuture.addListener(new Runnable() {
        //     @Override
        //     public void run() {                   // 当listenableFuture结束后会调用它
        //         try {
        //             System.out.println("Get listenable future's result: " + listenableFuture.get());
        //         } catch (InterruptedException e) {
        //             e.printStackTrace();
        //         } catch (ExecutionException e) {
        //             e.printStackTrace();
        //         }
        //     }
        // }, MoreExecutors.directExecutor());

        // 方式二(代码更清晰,推荐):使用Futures.addCallback
        Futures.addCallback(listenableFuture, new FutureCallback<Integer>() {
            @Override
            public void onSuccess(Integer result) { // 当listenableFuture成功时,会调用它
                System.out.println("Get listenable future's result: " + result);
            }

            @Override
            public void onFailure(Throwable t) {    // 当listenableFuture失败时,会调用它
                t.printStackTrace();
            }
        }, MoreExecutors.directExecutor());

        pool.shutdown();
    }
}

运行上面程序,会输出:

Get listenable future's result: 100

6.4.2 CompletableFuture基本实例

在Java 8中引入了 CompletableFuture ,包含了Google guava中类ListenableFuture的功能(还包含一些其他功能)。这样,不用第三方库也能编写优雅的异步执行代码了。

下面是CompletableFuture简单实例:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class CompletableFutureTest {

    private static int myTask() {
        try {
            TimeUnit.SECONDS.sleep(3);           // 模拟耗时的任务
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return 100;
    }

    public static void main(String[] args) throws InterruptedException, ExecutionException {

        CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> myTask());
        CompletableFuture<Integer> f = completableFuture.thenApply(i -> i * 2).thenApply(i -> i + 1);

        System.out.println("Get final result: " + f.get());   // Get final result: 201
    }

}

注1:当completableFuture完成后,才会执行 completableFuture.thenApply 中参数指定的方法(这个过程类似于注册回调函数)。
注2: thenApply 会返回CompletableFuture对象, thenApply 可以连用(上面例子中有两个 thenApply 连用)。

除方法 thenApply 外,还有其它一些方法也可实现当completableFuture完成后执行相关动作的功能。如表 3 所示。

Table 3: Adding an Action to a CompletableFuture<T> Object
方法 参数 参数助记 说明
thenApply Function<? super T, ? extends U> T -> U Apply a function to the result
thenCompose Function<? super T, ? extends CompletionStage<U>> T -> CompletableFuture<U> Invoke the function on the result and execute the returned future.
handle BiFunction<? super T, Throwable, ? extends U> (T, Throwable) -> U Process the result or error.
thenAccept Consumer<? super T> T -> void Like thenApply, but with void result.
whenComplete BiConsumer<? super T, ? super Throwable> (T, Throwable) -> void Like handle, but with void result.
thenRun Runnable Runnable Execute the Runnable with void result.

注:表 3 中每个方法都存在Async版本(如 thenApply 还存在Async版本 thenApplyAsync(Function<? super T,? extends U> fn, Executor executor) ),不过没有把它们展示在此表中。

6.4.3 CompletableFuture: Combine Multiple Futures

CompletableFuture中提供了一些方法可以组合多个futures,如 CompletableFuture.allOf 会得到一个组合的future,当所有的futures都完成后,这个组合的future才会完成。如:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class CompletableFutureTest {

    private static int myTask() {
        try {
            TimeUnit.SECONDS.sleep(3);             // 模拟耗时的任务
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return 100;
    }

    public static void main(String[] args) throws InterruptedException, ExecutionException {

        CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> myTask());
        CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> myTask());
        CompletableFuture<Integer> completableFuture3 = CompletableFuture.supplyAsync(() -> myTask());

        CompletableFuture<Void> combinedFuture =
               CompletableFuture.allOf(completableFuture1, completableFuture2, completableFuture3);

        combinedFuture.get();     // 当3个CompletableFuture都结束后,才会返回

        Integer sum = completableFuture1.get() + completableFuture2.get() + completableFuture3.get();
        System.out.println("Get final result: " + sum);   // Get final result: 300
    }

}

CompletableFuture还提供了其它一些组合多个future的方式,如表 4 所示。

Table 4: Combining Multiple Composition Objects
方法 参数助记 说明
thenCombine CompletableFuture<U>, (T, U) -> V Execute both and combine the results with the given function.
thenAcceptBoth CompletableFuture<U>, (T, U) -> void Like thenCombine, but with void result.
runAfterBoth CompletableFuture<?>, Runnable Execute the runnable after both complete.
applyToEither CompletableFuture<T>, T -> V When a result is available from one or the other, pass it to the given function.
acceptEither CompletableFuture<T>, T -> void Like applyToEither, but with void result.
runAfterEither CompletableFuture<?>, Runnable Execute the runnable after one or the other completes.
static allOf CompletableFuture<?>… Complete with void result after all given futures complete.
static anyOf CompletableFuture<?>… Complete with void result after any of the given futures completes.

7 Executors(可方便构建“线程池”)

构建新的线程需要一定的代价,如果程序中创建大量的生命期很短的线程,应该使用线程池。 一个线程池中包含许多准备运行的空闲线程。将Runnable对象交给线程池,就会有一个线程调用run方法。当run方法退出时,线程不会死亡,而是在线程池中准备为下一个请求提供服务。

执行器(Executor)类有许多静态工厂方法用来构建线程池,如表 5 所示。

Table 5: Executors Factory Methods
Method Description
ExecutorService newCachedThreadPool() 必要时创建新线程,空闲线程会被保留60秒
ExecutorService newFixedThreadPool(int threads) 该池包含固定数量的线程,空闲线程会一直被保留
ExecutorService newSingleThreadExecutor() 只有一个线程的“池”,该线程顺序执行每一个提交的任务
ScheduledExecutorService newScheduledThreadPool(int threads) 用于预定执行而构造的固定线程池,是java.util.Timer的替代
ScheduledExecutorService newSingleThreadScheduledExecutor() 用于预定执行而构建的单线程“池”

7.1 线程池

通过newCachedThreadPool/newFixedThreadPool/newSingleThreadExecutor创建了线程池后,可以用下面的submit方法之一将一个Runnable对象或者Callable对象提交给ExecutorService:

// 使用submit方法可向线程池提交任务,线程池会在方便的时候尽早地安排线程来执行提交的任务。
Future<?> submit(Runnable task)            // 对返回的Future对象调用get方法时,会在完成的时候简单地返回null
Future<T> submit(Runnable task, T result)  // 对返回的Future对象调用get方法时,会在完成的时候返回result
Future<T> submit(Callable<T> task)         // 对返回的Future对象调用get方法时,会在完成的时候返回Callable对象的call方法的返回值

该线程池会在方便的时候尽早地执行提交的任务。调用submit时,会得到一个Future对象,可用来查询该任务的状态。

当不再需要提交任务时,可以调用 shutdown 来关闭线程池。被关闭的线程池不再接受新的任务,当所有任务都完成后,线程池中线程会死亡。另一种方法是调用 shutdownNow ,它会取消尚未开始的所有任务并试图中断正在运行的线程。

说明:用newScheduledThreadPool/newSingleThreadScheduledExecutor创建的线程池,通过调用schedule等方法可以在预定时间执行任务。

7.1.1 线程池核心类ThreadPoolExecutor

java.uitl.concurrent.ThreadPoolExecutor 类是线程池中最核心的类(后方将介绍,newCachedThreadPool/newFixedThreadPool/newSingleThreadExecutor等创建线程池的方法其实都是调用的ThreadPoolExecutor来实现的)。前面介绍过,我们使用表 5 中的方法即可方便地创建线程池,一般无需直接使用ThreadPoolExecutor类来创建线程池。不过,为了更好地理解线程池,有必要了解一下ThreadPoolExecutor类。

ThreadPoolExecutor提供了四个构造函数:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         threadFactory, defaultHandler);
}

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          RejectedExecutionHandler handler) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), handler);
}

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

可以发现,前三个构造函数都是调用最后一个构造函数来实现的。ThreadPoolExecutor构造函数的参数说明:

  • corePoolSize:核心池的大小。在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,从这2个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建corePoolSize个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;
  • maximumPoolSize:线程池最大线程数,它表示在线程池中最多能创建多少个线程;
  • keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了allowCoreThreadTimeOut(true)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;
  • unit:参数keepAliveTime的时间单位;
  • workQueue:一个阻塞队列,用来存储等待执行的任务;
  • threadFactory:执行程序创建新线程时使用的工厂;
  • handler - 由于超出线程范围和队列容量而使执行被阻塞时所使用的处理程序。

说明:newCachedThreadPool/newFixedThreadPool/newSingleThreadExecutor等创建线程池的方法其实都是调用的ThreadPoolExecutor来实现的。如:

// source code of newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

// source code of newCachedThreadPool
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}
7.1.1.1 获取线程池的基本属性

下面是获取线程池的基本属性的例子:

// file ThreadPoolAttrTest.java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

public class ThreadPoolAttrTest {

    public static void main(String[] args) {

        ExecutorService executor = Executors.newFixedThreadPool(3);
        ThreadPoolExecutor tpe = (ThreadPoolExecutor) executor;

        // 线程池中当前线程的数量
        System.out.println("PoolSize\t:" + tpe.getPoolSize());
        // 线程池中,保持激活的最小work数量,默认为
        System.out.println("CorePoolSize\t:" + tpe.getCorePoolSize());
        // 在线程池中同时出现的,最大线程数量
        System.out.println("LargestPoolSize\t:" + tpe.getLargestPoolSize());
        // 线程池允许的最大线程数量
        System.out.println("MaximumPoolSize\t:" + tpe.getMaximumPoolSize());
        // 对已提交的一个大概的任务数量
        System.out.println("TaskCount\t:" + tpe.getTaskCount());
        // 正在执行的任务数量
        System.out.println("ActiveCount\t:" + tpe.getActiveCount());
        // 执行完的任务数量
        System.out.println("CompletedTaskCount\t:" + tpe.getCompletedTaskCount());
        // 等待的任务数,处于等待执行的任务,会被放在队列当中
        System.out.println("Queue Size\t:" + tpe.getQueue().size());

    }
}

测试上面程序,输出:

PoolSize	:0
CorePoolSize	:3
LargestPoolSize	:0
MaximumPoolSize	:3
TaskCount	:0
ActiveCount	:0
CompletedTaskCount	:0
Queue Size	:0

7.2 Callable, Future, Executor的用法举例

下面是一个Callable, Future, Executor的用法举例。

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class MyCallable implements Callable<String> {

    @Override
    public String call() throws Exception {
        // 实现业务逻辑,这里简单地返回线程名字
        Thread.sleep(1000);
        return Thread.currentThread().getName();
    }

    public static void main(String args[]) {
        ExecutorService executor = Executors.newFixedThreadPool(3);   // 创建包含3个线程的线程池。

        List<Future<String>> list = new ArrayList<Future<String>>();

        // Create MyCallable instance
        Callable<String> callable = new MyCallable();
        for(int i=0; i< 10; i++) {
            Future<String> future = executor.submit(callable);        // 向线程池提交任务。
            list.add(future);            // 把future保存到列表中,以便后面引用。
        }

        for(Future<String> fut : list) {
            try {
                // print the return value of Future, notice the output delay in console
                // because Future.get() waits for task to get completed
                String ret = fut.get();   // fut.get()会阻塞直到对应call方法返回,且call方法的返回值就是get()返回值。
                System.out.println(ret);
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }

        executor.shutdown();
    }
}

上面程序的测试输出如下(你可能得到不一样的结果):

pool-1-thread-1
pool-1-thread-2
pool-1-thread-3
pool-1-thread-1
pool-1-thread-3
pool-1-thread-2
pool-1-thread-2
pool-1-thread-1
pool-1-thread-3
pool-1-thread-3

7.3 线程池实例:计算目录及其所有子目录中包含某关键字的文件个数

这个例子和前面介绍的例子功能相同(即计算目录及其所有子目录中包含某关键字的文件个数),只是用线程池实现。

import java.io.*;
import java.util.*;
import java.util.concurrent.*;

/**
 * @version 1.01 2012-01-26
 * @author Cay Horstmann
 */
public class ThreadPoolTest
{
  public static void main(String[] args) throws Exception
  {
    Scanner in = new Scanner(System.in);
    System.out.print("Enter base directory (e.g. /usr/local/jdk5.0/src): ");
    String directory = in.nextLine();
    System.out.print("Enter keyword (e.g. volatile): ");
    String keyword = in.nextLine();

    ExecutorService pool = Executors.newCachedThreadPool();
    MatchCounter counter = new MatchCounter(new File(directory), keyword, pool);
    Future<Integer> result = pool.submit(counter);

    try {
      System.out.println(result.get() + " matching files.");
    } catch (ExecutionException e) {
      e.printStackTrace();
    } catch (InterruptedException e) {
    }
    pool.shutdown();

    int largestPoolSize = ((ThreadPoolExecutor) pool).getLargestPoolSize();
    System.out.println("largest pool size=" + largestPoolSize);
  }
}

/**
 * This task counts the files in a directory and its subdirectories that contain a given keyword.
 */
class MatchCounter implements Callable<Integer>
{
  private File directory;
  private String keyword;
  private ExecutorService pool;
  private int count;

  /**
   * Constructs a MatchCounter.
   * @param directory the directory in which to start the search
   * @param keyword the keyword to look for
   * @param pool the thread pool for submitting subtasks
   */
  public MatchCounter(File directory, String keyword, ExecutorService pool)
  {
    this.directory = directory;
    this.keyword = keyword;
    this.pool = pool;
  }

  public Integer call()
  {
    count = 0;
    try {
      File[] files = directory.listFiles();
      List<Future<Integer>> results = new ArrayList<>();

      for (File file : files) {
        if (file.isDirectory()) {
          MatchCounter counter = new MatchCounter(file, keyword, pool);
          Future<Integer> result = pool.submit(counter);
          results.add(result);
        }
        else {
          if (search(file)) count++;
        }
      }

      for (Future<Integer> result : results) {
        try {
          count += result.get();
        } catch (ExecutionException e) {
          e.printStackTrace();
        }
      }
    }
    catch (InterruptedException e) {
    }
    return count;
  }

  /**
   * Searches a file for a given keyword.
   * @param file the file to search
   * @return true if the keyword is contained in the file
   */
  public boolean search(File file)
  {
    try {
      Scanner in = new Scanner(file);
      boolean found = false;
      while (!found && in.hasNextLine()) {
        String line = in.nextLine();
        if (line.contains(keyword)) found = true;
      }
      in.close();
      return found;
    } catch (IOException e) {
      return false;
    }
  }
}

7.4 Fork/Join框架

The fork/join framework is an implementation of the ExecutorService interface that helps you take advantage of multiple processors. It is designed for work that can be broken into smaller pieces recursively. The goal is to use all the available processing power to enhance the performance of your application.

Fork/Join是Java 7中提供的一个用于并行执行任务的框架,它是一个“把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果”的框架。

下面是使用Fork/Join框架的基本步骤:
第一步,定义一个ForkJoinTask(它是Future接口的一个实现,同时还增加了其它的方法)任务,它提供在任务中执行fork()和join()操作的机制,通常情况下我们不需要直接继承ForkJoinTask类,而只需要继承它的子类,Fork/Join框架提供了以下两个常用子类:(1) RecursiveAction:用于没有返回结果的任务(即不关心任务的返回值);(2) RecursiveTask:用于有返回结果的任务。
在定义自己的ForkJoinTask时,你需要实现compute()方法,它用来定义如何分割以及合并任务。一般地,compute()方法的实现类似于下面的伪代码:

if (my portion of the work is small enough)
  do the work directly
else
  split my work into two pieces                   // fork 步骤
  invoke the two pieces and wait for the results  // join 步骤

第二步,通过ForkJoinPool(它是ExecutorService接口的一个实现)提交前面定义的任务即可。ForkJoinPool中有三种方法来提交ForkJoinTask任务:

public void execute(ForkJoinTask<?> task)                // 提交任务(异步地)。
public <T> T invoke(ForkJoinTask<T> task)                // 执行任务,且等待执行完成。返回任务的compute()方法的返回值。
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task)  // 提交任务(异步地)。返回ForkJoinTask,在返回的ForkJoinTask上调用get()方法可等待任务结束并得到任务的compute()方法的返回值。

参考:
Oracle Fork/Join Tutorial
Fork and Join: Java Can Excel at Painless Parallel Programming Too!

7.4.1 Fork/Join简单实例:计算连续自然数之和

连续自然数之和的计算有现成的解析公式,这里仅是演示Fork/Join框架的基本使用。

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class CalcTask extends RecursiveTask<Long> {

    private static final long THRESHOLD = 100;
    private long start;
    private long end;

    public CalcTask(long start, long end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        long sum = 0;
        if ((end - start) < THRESHOLD) {
            // 如果任务比较小,则直接计算。
            for (long i = start; i <= end; i++) {
                sum += i;
            }
        } else {
            long middle = (start + end) / 2;
            CalcTask leftTask = new CalcTask(start, middle);
            CalcTask rightTask = new CalcTask(middle + 1, end);

            // 分割为两个子任务。
            leftTask.fork();
            rightTask.fork();

            // 等待子任务执行完成,并得到子任务的结果。
            long leftResult = leftTask.join();     // 对某个子任务执行join。当前线程无法继续了(因为要等待子任务的结果),
                                                   // 当前线程保存了主任务相关信息后,就能抽身为其他任务提供服务了,
                                                   // 一旦所依赖的子任务解决了,主任务就可以接着往下走。
            long rigthResult = rightTask.join();

            // 合并子任务的结果。
            sum = leftResult + rigthResult;
        }
        return sum;
    }

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ForkJoinPool forkJoinPool = new ForkJoinPool();          // or: ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();  // need Java 8
        Long sum = forkJoinPool.invoke(new CalcTask(0, 1000));   // or: Long sum = forkJoinPool.submit(new CalcTask(0, 1000)).get();
        System.out.print(sum);     // 会输出500500
    }
}
7.4.1.1 “计算连续自然数之和”的普通ExecutorService实现

对于上面的“计算连续自然数之和”这个例子,不使用Fork/Join,使用普通的ExecutorService也可以实现。代码如下:

import java.util.*;
import java.util.concurrent.*;
import java.util.Arrays;

public class Sums implements Callable<Long> {

    private final long start;
    private final long end;

    Sums(long start, long end) {
        this.start = start;
        this.end = end;
    }

    @Override
    public Long call() {
        long sum = 0;
        for (long i = start; i <= end; i++) {
            sum = sum + i;
        }
        return sum;
    }

    public static void main(String[] args) throws Exception {

        ExecutorService executor = Executors.newFixedThreadPool(4);
        List<Future<Long>> results = executor.invokeAll(Arrays.asList(new Sums(0, 200),
                new Sums(201, 400),
                new Sums(401, 600),
                new Sums(601, 800),
                new Sums(801, 1000)));   // 分为5个子任务
        executor.shutdown();

        long sum = 0;
        for (Future<Long> result : results) {
            sum += result.get();
        }
        System.out.println(sum);    // 会输出500500
    }
}

7.4.2 Fork/Join比普通ExecutorService的好处在于——它采用了Work stealing算法,可充分利用线程资源

我们先简单地回顾一下普通的ExecutorService模型(普通的线程池模型)的工作方式。
以银行的工作模式为例,假设银行的一个员工同时只能为一个顾客提供服务,顾客比较多时可能需要排队等候。而且,银行员工非常地有耐心,他永远不会要正在享受服务的顾客离开,即使正在享受服务的顾客在等待其他事情发生时,银行员工也会一直等待这个顾客。如图 2 所示。

java_concurrency_customers_waiting_for_their_turn.png

Figure 2: Customers waiting for their turn

如果我们用普通的ExecutorService模型(普通的线程池模型)来表示图 2 所示模型的话,可以对应地表示为图 3 所示。

java_concurrency_threads_and_tasks.png

Figure 3: 普通ExecutorService模型的Threads and Tasks

现成考虑一个稍微复杂的情况。Empolyee 1正在给Customer 1提供服务,但是Customer 1需要从Customer 6中得到一些信息。这时,Customer 1叫Customer 6来银行,Customer 6排队等待空闲的银行员工给它提供服务。如图 4 所示。

java_concurrency_old_customer_waiting_for_new_customer.png

Figure 4: Old Customer waiting for New Customer

前面提到过,银行员工是非常地有耐心的,所以Empolyee 1一直被Customer 1占用着,直到Customer 6被其它银行员工服务过,Customer 1才能得到Customer 6的相应信息,Empolyee 1才有机会完成对Customer 1的服务。 显然,这种工作方式有如下明显的缺点:如果全部的员工都在给客户提供服务,且所有客户都在等待其它事件发生,这时,银行员工将一直被占用着,无法给其它新客户提供服务,出现了死锁!比如,前面例子中,又假设Customer 2需要从Customer 7中得到一些信息,Customer 3需要从Customer 8中得到一些信息,这时,Empolyee 1/2/3分别被Customer 1/2/3占用着,而Customer 6/7/8一直不会被服务到,从而Customer 1/2/3的服务永远不会结束,出现了死锁。如果增加银行员工数量,也仅仅是减少了死锁发生的可能,不能彻底解决问题。

上面的死锁场景在一些递归算法中是很常见的,比如求解汉诺塔问题或者递归地遍历树结构(如求目录的总大小)。下面是一个死锁的例子:

// 这是一个使用 ExecutorService 处理不当导致死锁的例子!
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SumTask implements Callable<Long> {

    private static final long THRESHOLD = 100;
    private ExecutorService pool;

    private final long start;
    private final long end;

    SumTask(long start, long end, ExecutorService pool) {
        this.start = start;
        this.end = end;
        this.pool = pool;
    }

    @Override
    public Long call() {
        long sum = 0;
        if ((end - start) < THRESHOLD) {
            for (long i = start; i <= end; i++) {
                sum = sum + i;
            }
        } else {
            long middle = (start + end) / 2;
            SumTask leftTask = new SumTask(start, middle, pool);
            SumTask rightTask = new SumTask(middle + 1, end, pool);

            Future<Long> leftResult = pool.submit(leftTask);
            Future<Long> rigthResult = pool.submit(rightTask);

            try {
                sum = leftResult.get() + rigthResult.get();       // 当前线程会一直等待,直到 leftTask 和 rightTask 计算完成!
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
        return sum;
    }

    public static void main(String[] args) throws Exception {

        ExecutorService executor = Executors.newFixedThreadPool(4);
        Future<Long> result = executor.submit(new SumTask(0, 1000, executor));

        long sum = result.get();     // 这个例子中线程数(为4)太少,会死锁!!!
                                     // 把线程数设为较大数(比如20)后,在这个例子中不会死锁。
                                     // 但没有彻底解决问题,当任务量更大时(如计算0到1000000000000之和)还可能死锁。

        System.out.println(sum);

        executor.shutdown();
    }
}

参考:
Java Fork Join Example by Albert Attard
New and Improved CLR 4 Thread Pool Engine

7.4.2.1 Fork/Join框架的任务窃取(Work stealing)算法

前面介绍过,传统ExecutorService模型有个重要的缺点:当一个任务依赖于其它子任务时,当前线程无法为其它新任务提供服务,会一直等待直到所依赖的子任务被执行完。

Fork/Join框架采用了任务窃取(Work stealing)算法,目的是为了解决普通ExecutorService模型中线程被任务(任务在等待子任务的执行结果)长期占用的问题。

Fork/Join的线程池模型如图 5 所示。

java_concurrency_threads_and_their_queues.png

Figure 5: Threads and their Queues(Started tasks中用来存放“已经开始但是依赖于子任务的结果而无法继续执行”的任务)

每个线程都有它自己的任务队列(还有一个全局的任务队列没有画出来)。假设Thread 1正在执行Task 1,而Task 1可分为两个子任务Task 2和Task 3。即Task 1的主要代码逻辑为:

Task2.fork();                 // 执行这个fork()时,会把 Task 2 加入到线程自己的任务队列LIFO中
Task3.fork();                 // 执行这个fork()时,会把 Task 3 加入到线程自己的任务队列LIFO中

task2Result = Task2.join();   // 执行这个join()时,会把 Task 1 加入到Started tasks中。这时当前线程抽身出来了,可执行其它任务了
task3Result = Task3.join();

......

上面过程如图 6 所示。

java_concurrency_add_subtasks.png

Figure 6: Add subtasks Task 2 and Task 3

Thread 1空闲出来后,在自己的LIFO任务队列中找到最新的任务(这个例子中是Task 3),开始执行它。其它空闲线程(如Thread 2)可以从比较忙的线程(如Thread 1)的任务队列中“窃取”最旧的任务(比如从Thread 1的任务队列把Task 2“窃取”过来自己执行。这就是任务窃取(Work stealing)算法的基本过程。如图 7 所示。

java_concurrency_thread_2_steals_task_2.png

Figure 7: Thread 2 steals Task 2

说明:线程自己的任务队列一般设计为LIFO队列(后进先出双端队列),且自己执行LIFO中最新的任务,别人窃取执行LIFO中最旧的任务。根据局部性原理(principle of locality),这样做可以获得更好的性能。

8 Synchronizers(用于线程同步的辅助类,优先用它们同步线程)

java.util.concurrent中提供了一些类用来帮助人们管理相互合作的多个线程,参见表 6如果你发现的表 6 中有你想要实现的行为模式,你应该直接使用它们,而不要使用锁(如Intrinsic Lock,ReentrantLock等等)来“重新”实现(即不要重复造轮子)。

Table 6: Synchronizers in Java
它能做什么 备注
CyclicBarrier 允许线程集等待直到指定数目的线程到达一个公共屏障  
CountDownLatch 允许线程集等待直到计数器减为0  
Phaser 和CyclicBarrier, CountDownLatch类似,但灵活性更高 Java SE 7中引入
Exchanger 允许两个线程在要交换的对象准备好时交换对象  
Semaphore 允许线程集等待直到被允许继续运行为止  
SynchronousQueue 允许一个线程把对象交给另一个线程 它是BlockingQueue接口的一种实现

8.1 CountDownLatch(倒计时门栓)

一个倒计时门栓让一个线程集等待直到计数变为0。倒计时门栓是一次性的,一旦计数为0,就不能再重用了。调用countDownLatch.countDown()可以减少门栓的一个计数;调用countDownLatch.await()时线程会等待,直到countDownLatch中的计数到0之后就会恢复执行。

下面是使用两个倒计时门栓用作线程同步的一个实例(摘自:https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CountDownLatch.html ):
Here is a pair of classes in which a group of worker threads use two countdown latches:

  1. The first is a start signal that prevents any worker from proceeding until the driver is ready for them to proceed;
  2. The second is a completion signal that allows the driver to wait until all workers have completed.
import java.util.concurrent.CountDownLatch;

class Driver {
    private static final int N = 10;

    public static void main(String args[]) throws InterruptedException {
        CountDownLatch startSignal = new CountDownLatch(1);
        CountDownLatch doneSignal = new CountDownLatch(N);

        for (int i = 0; i < N; ++i) // create and start threads
            new Thread(new Worker(startSignal, doneSignal)).start();

        doSomethingElse1();       // don't let run yet
        startSignal.countDown();  // let all threads proceed
        doSomethingElse2();
        doneSignal.await();       // wait for all to finish
        doSomethingElse3();
    }

    private static void doSomethingElse1() {  }
    private static void doSomethingElse2() {  }
    private static void doSomethingElse3() {  }
}

class Worker implements Runnable {
    private final CountDownLatch startSignal;
    private final CountDownLatch doneSignal;

    Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
        this.startSignal = startSignal;
        this.doneSignal = doneSignal;
    }

    public void run() {
        try {
            startSignal.await();        // 阻止线程运行,直到倒计时门栓startSignal的计数为零时,才会恢复运行。
            doWork();
            doneSignal.countDown();
        } catch (InterruptedException ex) {
        }
    }

    void doWork() {  }
}

8.2 CyclicBarrier(循环屏障)

CyclicBarrier 是一个同步辅助类,它可以使一组线程互相等待,直到到达某个公共屏障点(common barrier point)后又开始运行。

下面是如何使用它的一些细节。首先,构造一个循环屏障,并给出参与的线程数:

CyclicBarrier barrier = new CyclicBarrier(nthreads);

每一个线程完成一些准备工作后,在循环屏障上调用await:

public void run() {
  doWork();
  barrier.await();
  // ...
}

这样当所有线程都准备完毕(调用await后),才又开始运行。

8.2.1 CyclicBarrier实例:几个运动员都准备好后一起开始跑步

import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

/**
 * Java Program to demonstrate how to use CyclicBarrier, Its used when number
 * of threads needs to wait for each other before starting again.
 */
public class CyclicBarrierTest {
    public static void main(String args[]) throws InterruptedException, BrokenBarrierException {
        CyclicBarrier barrier = new CyclicBarrier(4);

        Player first = new Player(barrier, "Player1");
        Player second = new Player(barrier, "Player2");
        Player third = new Player(barrier, "Player3");
        Player fourth = new Player(barrier, "Player4");

        first.start();
        second.start();
        third.start();
        fourth.start();
    }
}

class Player extends Thread {
    private CyclicBarrier barrier;

    public Player(CyclicBarrier barrier, String name) {
        super(name);
        this.barrier = barrier;
    }

    @Override
    public void run() {
        try {
            Thread.sleep(1000 * (new Random()).nextInt(8));
            System.out.println(Thread.currentThread().getName() + " is ready.");
            barrier.await();             // 在所有参与者(线程)都已经在此 barrier 上调用 await 方法之前,将一直等待。
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }

        // 所有的线程调用barrier.await后,才会执行到这里。
        System.out.println(Thread.currentThread().getName() + " has started running.");
        // Other code.
    }
}

运行上面程序,其输出为:

Player1 is ready.
Player4 is ready.
Player2 is ready.
Player3 is ready.
Player3 has started running.
Player1 has started running.
Player4 has started running.
Player2 has started running.

你得到的输出可能会不一样,但一定是所有Player都准备好后,才又开始运行。

8.3 Exchanger(交换器)

Exchanger类允许在2个线程间定义同步点,当2个线程到达这个点,他们相互交换数据类型,使用第一个线程的数据类型变成第二个的,然后第二个线程的数据类型变成第一个的。

具体来说, 当线程A调用Exchange对象的exchange()方法后,它会陷入阻塞状态,直到线程B也调用了exchange()方法,然后以线程安全的方式交换数据,之后线程A和B继续运行。

下面是Exchanger的一个实例(摘自:http://www.javacodex.com/Concurrency/Exchanger-Example ):

import java.util.concurrent.*;

public class ExchangerExample {

    public static void main(String[] args) {

        Exchanger<String> exchanger = new Exchanger<String>();

        Thread t1 = new MyThread(exchanger, "I like coffee.");
        Thread t2 = new MyThread(exchanger, "I like tea");
        t1.start();
        t2.start();
    }
}

class MyThread extends Thread {

    Exchanger<String> exchanger;
    String message;

    MyThread(Exchanger<String> exchanger, String message) {
        this.exchanger = exchanger;
        this.message = message;
    }

    public void run() {
        try {
            System.out.println(this.getName() + " message: " + message);

            // exchange messages
            message = exchanger.exchange(message);

            System.out.println(this.getName() + " message: " + message);
        } catch (Exception e) {
        }
    }
}

运行上面程序的输出(你可能得到顺序不一样的输出)为:

Thread-1 message: I like tea
Thread-0 message: I like coffee.
Thread-0 message: I like tea
Thread-1 message: I like coffee.

9 Atomic Variables and Nonblocking Synchronization

参考:
Java Concurrency in Practice, Chapter 15. Atomic Variables and Nonblocking Synchronization

9.1 原子变量

原子变量(Atomic Variables)和volatile变量有相同的“happens-before”原则,也就是说对一个原子变量的“写操作”会“先行发生”于后面对这个原子变量的“读操作”。原子变量和volatile变量相比的优点在于原子变量支持原子操作(比如AtomicInteger类有原子的compareAndSet等方法),而volatile变量不能直接提供类似compareAndSet方法的原子性保证。

一般来说,原子变量比锁要更加轻量级。原子变量是实现高性能并发,非阻塞同步的基本工具。

下面是不使用原子变量实现多线程安全的计数器:

// 多线程安全计数器
class SynchronizedCounter {
    private int c = 0;

    public synchronized void increment() {
        c++;
    }

    public synchronized void decrement() {
        c--;
    }

    public synchronized int value() {
        return c;
    }
}

用原子变量实现的多线程安全计数器:

// 用原子变量实现的多线程安全计数器
import java.util.concurrent.atomic.AtomicInteger;

class AtomicCounter {
    private AtomicInteger c = new AtomicInteger(0);

    public void increment() {
        c.incrementAndGet();
    }

    public void decrement() {
        c.decrementAndGet();
    }

    public int value() {
        return c.get();
    }
}

9.2 非阻塞同步(乐观的并发策略)

互斥同步(Mutual Exclusion&Synchronization)是常见的一种并发正确性保障手段。同步是指在多个线程并发访问共享数据时,保证共享数据在同一个时刻只被一个(或者是一些,使用信号量的时候)线程使用。而互斥是实现同步的一种手段,临界区(Critical Section)、互斥量(Mutex)和信号量(Semaphore)都是主要的互斥实现方式。因此,在这4个字里面,互斥是因,同步是果;互斥是方法,同步是目的。

互斥同步最主要的问题就是进行线程阻塞和唤醒所带来的性能问题,因此这种同步也称为阻塞同步(Blocking Synchronization)。从处理问题的方式上说, 互斥同步属于一种悲观的并发策略 , 总是认为只要不去做正确的同步措施(例如加锁),那就肯定会出现问题,无论共享数据是否真的会出现竞争,它都要进行加锁(这里讨论的是概念模型,实际上虚拟机会优化掉很大一部分不必要的加锁)、用户态核心态转换、维护锁计数器和检查是否有被阻塞的线程需要唤醒等操作。随着硬件指令集的发展,我们有了另外一个选择:基于冲突检测的乐观并发策略,通俗地说,就是先进行操作,如果没有其他线程争用共享数据,那操作就成功了; 如果共享数据有争用,产生了冲突,那就再采取其他的补偿措施(最常见的补偿措施就是不断地重试,直到成功为止),这种乐观的并发策略的许多实现都不需要把线程挂起,因此这种同步操作称为非阻塞同步(Non-Blocking Synchronization)。

为什么说使用乐观并发策略需要“硬件指令集的发展”才能进行呢? 因为我们需要操作和冲突检测这两个步骤具备原子性,靠什么来保证呢?如果这里再使用互斥同步来保证就失去意义了,所以我们只能靠硬件来完成这件事情,硬件保证一个从语义上看起来需要多次操作的行为只通过一条处理器指令就能完成,这类指令常用的有:

  • 测试并设置(Test-and-Set)。
  • 获取并增加(Fetch-and-Increment)。
  • 交换(Swap)。
  • 比较并交换(Compare-and-Swap,下文称CAS)。
  • 加载链接/条件存储(Load-Linked/Store-Conditional,下文称LL/SC)。

其中,前面的3条是20世纪就已经存在于大多数指令集之中的处理器指令,后面的两条是现代处理器新增的,而且这两条指令的目的和功能是类似的。在IA64、x86指令集中有 cmpxchg 指令完成CAS功能,在sparc-TSO也有casa指令实现,而在ARM和PowerPC架构下,则需要使用一对ldrex/strex指令来完成LL/SC的功能。

参考:深入理解Java虚拟机——JVM高级特性与最佳实践(第2版),13.2.2 线程安全的实现方法

9.2.1 Compare and Swap (CAS)

CAS指令需要有3个操作数,分别是内存位置(在Java中可以简单理解为变量的内存地址,用V表示)、旧的预期值(用A表示)和新值(用B表示)。CAS指令执行时,当且仅当V符合旧预期值A时,处理器用新值B更新V的值,否则它就不执行更新,但是无论是否更新了V的值,都会返回V的旧值,上述的处理过程是一个原子操作。

在JDK 1.5之后,Java程序中才可以使用CAS操作,该操作由sun.misc.Unsafe类里面的compareAndSwapInt()和compareAndSwapLong()等几个方法包装提供,虚拟机在内部对这些方法做了特殊处理,即时编译出来的结果就是一条平台相关的处理器CAS指令,没有方法调用的过程,或者可以认为是无条件内联进去了,这种被虚拟机特殊处理的方法称为固有函数(Intrinsics),类似的固有函数还有Math.sin()等。

下面是CAS的一个模拟实现:

public class SimulatedCAS {
    private int value;

    public synchronized int get() { return value; }

    public synchronized int compareAndSwap(int expectedValue, int newValue) {
        int oldValue = value;
        if (oldValue == expectedValue)
            value = newValue;
        return oldValue;
    }

    public synchronized boolean compareAndSet(int expectedValue, int newValue) {
         return (expectedValue == compareAndSwap(expectedValue, newValue));
    }
}

9.2.2 Lock-free algorithm, non-blocking algorithm

An algorithm is called non-blocking if failure or suspension of any thread cannot cause failure or suspension of another thread; an algorithm is called lock-free if, at each step, some thread can make progress. Algorithms that use CAS exclusively for coordination between threads can, if constructed correctly, be both non-blocking and lock-free.

摘自: Java Concurrency in Practice, Chapter 15. Atomic Variables and Nonblocking Synchronization

很多时候,名称“Lock-free algorithm”和“Non-blocking algorithm”是从不同的角度描述同一个东西,即不直接使用锁,使用CAS操作来实现的算法。

9.2.2.1 实例:Lock-free Stack

下面是Non-blocking Stack(或者称为Lock-free Stack)的一个实现:

import java.util.concurrent.atomic.AtomicReference;

public class ConcurrentStack <E> {
    AtomicReference<Node<E>> top = new AtomicReference<Node<E>>();

    public void push(E item) {
        Node<E> newHead = new Node<E>(item);
        Node<E> oldHead;
        do {
            oldHead = top.get();
            newHead.next = oldHead;
        } while (!top.compareAndSet(oldHead, newHead));
    }

    public E pop() {
        Node<E> oldHead;
        Node<E> newHead;
        do {
            oldHead = top.get();
            if (oldHead == null)
                return null;
            newHead = oldHead.next;
        } while (!top.compareAndSet(oldHead, newHead));
        return oldHead.item;
    }

    private static class Node <E> {
        public final E item;
        public Node<E> next;

        public Node(E item) {
            this.item = item;
        }
    }
}

Author: cig01

Created: <2013-12-01 Sun 00:00>

Last updated: <2018-04-20 Fri 19:22>

Creator: Emacs 25.3.1 (Org mode 9.1.4)