期货
期货是在Java 5(2004)中引入的。它们基本上是一个尚未完成的操作结果的占位符。一旦操作完成,Future将包含该结果。例如,一个操作可以是一个提交给ExecutorService的Runnable或Callable实例。操作的提交者可以使用Future对象来检查操作是否为done(),或者使用阻塞的get()方法等待它完成。
例子:
/**
* A task that sleeps for a second, then returns 1
**/
public static class MyCallable implements Callable<Integer> {
@Override
public Integer call() throws Exception {
Thread.sleep(1000);
return 1;
}
}
public static void main(String[] args) throws Exception{
ExecutorService exec = Executors.newSingleThreadExecutor();
Future<Integer> f = exec.submit(new MyCallable());
System.out.println(f.isDone()); //False
System.out.println(f.get()); //Waits until the task is done, then prints 1
}
CompletableFutures
CompletableFutures were introduced in Java 8 (2014). They are in fact an evolution of regular Futures, inspired by Google's Listenable Futures, part of the Guava library. They are Futures that also allow you to string tasks together in a chain. You can use them to tell some worker thread to "go do some task X, and when you're done, go do this other thing using the result of X". Using CompletableFutures, you can do something with the result of the operation without actually blocking a thread to wait for the result. Here's a simple example:
/**
* A supplier that sleeps for a second, and then returns one
**/
public static class MySupplier implements Supplier<Integer> {
@Override
public Integer get() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
//Do nothing
}
return 1;
}
}
/**
* A (pure) function that adds one to a given Integer
**/
public static class PlusOne implements Function<Integer, Integer> {
@Override
public Integer apply(Integer x) {
return x + 1;
}
}
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newSingleThreadExecutor();
CompletableFuture<Integer> f = CompletableFuture.supplyAsync(new MySupplier(), exec);
System.out.println(f.isDone()); // False
CompletableFuture<Integer> f2 = f.thenApply(new PlusOne());
System.out.println(f2.get()); // Waits until the "calculation" is done, then prints 2
}
RxJava
RxJava是由Netflix创建的整个响应式编程库。乍一看,它似乎与Java 8的流类似。是的,但它更强大。
与Futures类似,RxJava可以用来将一堆同步或异步操作串在一起,以创建一个处理管道。与Futures是一次性使用的不同,RxJava工作在零个或多个项目的流上。包括包含无限项的永不结束的流。由于拥有丰富得令人难以置信的操作符集,它也更加灵活和强大。
与Java 8的流不同,RxJava也有一个反压机制,这允许它处理处理管道的不同部分在不同的线程中以不同的速率运行的情况。
RxJava的缺点是,尽管有可靠的文档,但由于涉及到范式转换,它是一个具有挑战性的库。Rx代码也可能是调试的噩梦,特别是在涉及多个线程的情况下,如果需要反向压力,情况会更糟。
如果你想深入了解它,在官方网站上有一整页的各种教程,加上官方文档和Javadoc。你也可以看看一些视频,比如这个视频,它简要介绍了Rx,还讨论了Rx和期货之间的区别。
附加功能:Java 9反应流
Java 9的活性流又名流API是由各种活性流库(如RxJava 2、Akka Streams和Vertx)实现的一组接口。它们允许这些反应库相互连接,同时保留所有重要的反压力。