Executor

并发API引入了ExecutorService作为一个在程序中直接使用Thread的高层次的替换方案。Executors支持运行异步任务,通常管理一个线程池,这样一来我们就不需要手动去创建新的线程。在不断地处理任务的过程中,线程池内部线程将会得到复用,因此,在我们可以使用一个executor service来运行和我们想在我们整个程序中执行的一样多的并发任务。

下面是使用executors的第一个代码示例:

ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(() -> {
String threadName = Thread.currentThread().getName();
System.out.println("Hello " + threadName);
});
// => Hello pool-1-thread-1

Executors类提供了便利的工厂方法来创建不同类型的 executor services。在这个示例中我们使用了一个单线程线程池的 executor。

代码运行的结果类似于上一个示例,但是当运行代码时,你会注意到一个很大的差别:Java进程从没有停止!Executors必须显式的停止-否则它们将持续监听新的任务。

ExecutorService提供了两个方法来达到这个目的——shutdwon()会等待正在执行的任务执行完而shutdownNow()会终止所有正在执行的任务并立即关闭execuotr。

这是我喜欢的通常关闭executors的方式:

try {
    System.out.println("attempt to shutdown executor");
    executor.shutdown();
    executor.awaitTermination(5, TimeUnit.SECONDS);
    }
catch (InterruptedException e) {
    System.err.println("tasks interrupted");
}
finally {
    if (!executor.isTerminated()) {
        System.err.println("cancel non-finished tasks");
    }
    executor.shutdownNow();
    System.out.println("shutdown finished");
}

executor通过等待指定的时间让当前执行的任务终止来“温柔的”关闭executor。在等待最长5分钟的时间后,execuote最终会通过中断所有的正在执行的任务关闭。

CallableFuture

除了Runnable,executor还支持另一种类型的任务——Callable。Callables也是类似于runnables的函数接口,不同之处在于,Callable返回一个值。

下面的lambda表达式定义了一个callable:在休眠一分钟后返回一个整数。

Callable<Integer> task = () -> {
    try {
        TimeUnit.SECONDS.sleep(1);
        return 123;
    }
    catch (InterruptedException e) {
        throw new IllegalStateException("task interrupted", e);
    }
};

Callbale也可以像runnbales一样提交给 executor services。但是callables的结果怎么办?因为submit()不会等待任务完成,executor service不能直接返回callable的结果。不过,executor 可以返回一个Future类型的结果,它可以用来在稍后某个时间取出实际的结果。

ExecutorService executor = Executors.newFixedThreadPool(1);
Future<Integer> future = executor.submit(task);
System.out.println("future done? " + future.isDone());
Integer result = future.get();
System.out.println("future done? " + future.isDone());
System.out.print("result: " + result);

在将callable提交给exector之后,我们先通过调用isDone()来检查这个future是否已经完成执行。我十分确定这会发生什么,因为在返回那个整数之前callable会休眠一分钟、

在调用get()方法时,当前线程会阻塞等待,直到callable在返回实际的结果123之前执行完成。现在future执行完毕,我们可以在控制台看到如下的结果:

future done? false
future done? true
result: 123

Future与底层的executor service紧密的结合在一起。记住,如果你关闭executor,所有的未中止的future都会抛出异常。

executor.shutdownNow();
future.get();

你可能注意到我们这次创建executor的方式与上一个例子稍有不同。我们使用newFixedThreadPool(1)来创建一个单线程线程池的 execuot service。 这等同于使用newSingleThreadExecutor不过使用第二种方式我们可以稍后通过简单的传入一个比1大的值来增加线程池的大小。

超时

任何future.get()调用都会阻塞,然后等待直到callable中止。在最糟糕的情况下,一个callable持续运行——因此使你的程序将没有响应。我们可以简单的传入一个时长来避免这种情况。

    ExecutorService executor = Executors.newFixedThreadPool(1);
    Future<Integer> future = executor.submit(() -> {
    try {
        TimeUnit.SECONDS.sleep(2);
        return 123;
    }
    catch (InterruptedException e) {
        throw new IllegalStateException("task interrupted", e);
    }
});
    future.get(1, TimeUnit.SECONDS);

运行上面的代码将会产生一个TimeoutException

Exception in thread "main" java.util.concurrent.TimeoutException
    at java.util.concurrent.FutureTask.get(FutureTask.java:205)

你可能已经猜到俄为什么会排除这个异常。我们指定的最长等待时间为1分钟,而这个callable在返回结果之前实际需要两分钟。

invokeAll

Executors支持通过invokeAll()一次批量提交多个callable。这个方法结果一个callable的集合,然后返回一个future的列表。

ExecutorService executor = Executors.newWorkStealingPool();
List<Callable<String>> callables = Arrays.asList(
        () -> "task1",
        () -> "task2",
        () -> "task3");
executor.invokeAll(callables)
    .stream()
    .map(future -> {
        try {
            return future.get();
        }
        catch (Exception e) {
            throw new IllegalStateException(e);
        }
    })
    .forEach(System.out::println);

在这个例子中,我们利用Java8中的函数流(stream)来处理invokeAll()调用返回的所有future。我们首先将每一个future映射到它的返回值,然后将每个值打印到控制台。如果你还不属性stream,可以阅读 数据流 部分。

invokeAny

批量提交callable的另一种方式就是invokeAny(),它的工作方式与invokeAll()稍有不同。在等待future对象的过程中,这个方法将会阻塞直到第一个callable中止然后返回这一个callable的结果。

为了测试这种行为,我们利用这个帮助方法来模拟不同执行时间的callable。这个方法返回一个callable,这个callable休眠指定 的时间直到返回给定的结果。

Callable<String> callable(String result, long sleepSeconds) {
    return () -> {
        TimeUnit.SECONDS.sleep(sleepSeconds);
        return result;
    };
}

我们利用这个方法创建一组callable,这些callable拥有不同的执行时间,从1分钟到3分钟。通过invokeAny()将这些callable提交给一个executor,返回最快的callable的字符串结果-在这个例子中为任务2:

ExecutorService executor = Executors.newWorkStealingPool();
List<Callable<String>> callables = Arrays.asList(
callable("task1", 2),
callable("task2", 1),
callable("task3", 3));
String result = executor.invokeAny(callables);
System.out.println(result);
// => task2

上面这个例子又使用了另一种方式来创建executor——调用newWorkStealingPool()。这个工厂方法是Java8引入的,返回一个ForkJoinPool类型的 executor,它的工作方法与其他常见的execuotr稍有不同。与使用一个固定大小的线程池不同,ForkJoinPools使用一个并行因子数来创建,默认值为主机CPU的可用核心数。

ForkJoinPools 在Java7时引入,将会在这个系列后面的教程中详细讲解。让我们深入了解一下 scheduled executors 来结束本次教程。