我想知道两者的区别 CompletableFuture,Future和Observable RxJava。

我所知道的都是异步的,但是

Future.get()阻塞线程

CompletableFuture给出了回调方法

RxJava可观察对象——类似于CompletableFuture,但有其他好处(不确定)

例如:如果客户端需要进行多个服务调用,当我们使用Futures (Java)时,Future.get()将按顺序执行…我想知道它在RxJava中是如何更好的。

文档http://reactivex.io/intro.html说

使用Futures来优化组合有条件异步执行流是很困难的(或者不可能,因为每个请求的延迟在运行时是不同的)。当然,这是可以做到的,但很快就会变得复杂(因此容易出错),或者过早地阻塞Future.get(),这就消除了异步执行的好处。

真的很想知道RxJava如何解决这个问题。我发现从文档中很难理解。


当前回答

所有三个接口都用于将值从生产者传输到消费者。消费者可以分为两类:

同步:消费者进行阻塞调用,当值准备好时返回 异步:当值准备好时,调用使用者的回调方法

此外,通信接口在其他方面也有所不同:

能够传输单个值或多个值 如果有多个值,则支持或不支持反压

结果是:

未来使用同步接口传输单个值 CompletableFuture使用同步和异步接口传递单个值 Rx使用带背压的异步接口传输多个值

此外,所有这些通信设施都支持传输异常。但情况并非总是如此。例如,BlockingQueue就不是。

其他回答

期货

期货是在Java 5(2004)中引入的。它们基本上是一个尚未完成的操作结果的占位符。一旦操作完成,Future将包含该结果。例如,一个操作可以是一个提交给ExecutorService的Runnable或Callable实例。操作的提交者可以使用Future对象来检查操作是否为done(),或者使用阻塞的get()方法等待它完成。

例子:

/**
* A task that sleeps for a second, then returns 1
**/
public static class MyCallable implements Callable<Integer> {

    @Override
    public Integer call() throws Exception {
        Thread.sleep(1000);
        return 1;
    }

}

public static void main(String[] args) throws Exception{
    ExecutorService exec = Executors.newSingleThreadExecutor();
    Future<Integer> f = exec.submit(new MyCallable());

    System.out.println(f.isDone()); //False

    System.out.println(f.get()); //Waits until the task is done, then prints 1
}

CompletableFutures

CompletableFutures were introduced in Java 8 (2014). They are in fact an evolution of regular Futures, inspired by Google's Listenable Futures, part of the Guava library. They are Futures that also allow you to string tasks together in a chain. You can use them to tell some worker thread to "go do some task X, and when you're done, go do this other thing using the result of X". Using CompletableFutures, you can do something with the result of the operation without actually blocking a thread to wait for the result. Here's a simple example:

/**
* A supplier that sleeps for a second, and then returns one
**/
public static class MySupplier implements Supplier<Integer> {

    @Override
    public Integer get() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            //Do nothing
        }
        return 1;
    }
}

/**
* A (pure) function that adds one to a given Integer
**/
public static class PlusOne implements Function<Integer, Integer> {

    @Override
    public Integer apply(Integer x) {
        return x + 1;
    }
}

public static void main(String[] args) throws Exception {
    ExecutorService exec = Executors.newSingleThreadExecutor();
    CompletableFuture<Integer> f = CompletableFuture.supplyAsync(new MySupplier(), exec);
    System.out.println(f.isDone()); // False
    CompletableFuture<Integer> f2 = f.thenApply(new PlusOne());
    System.out.println(f2.get()); // Waits until the "calculation" is done, then prints 2
}

RxJava

RxJava是由Netflix创建的整个响应式编程库。乍一看,它似乎与Java 8的流类似。是的,但它更强大。

与Futures类似,RxJava可以用来将一堆同步或异步操作串在一起,以创建一个处理管道。与Futures是一次性使用的不同,RxJava工作在零个或多个项目的流上。包括包含无限项的永不结束的流。由于拥有丰富得令人难以置信的操作符集,它也更加灵活和强大。

与Java 8的流不同,RxJava也有一个反压机制,这允许它处理处理管道的不同部分在不同的线程中以不同的速率运行的情况。

RxJava的缺点是,尽管有可靠的文档,但由于涉及到范式转换,它是一个具有挑战性的库。Rx代码也可能是调试的噩梦,特别是在涉及多个线程的情况下,如果需要反向压力,情况会更糟。

如果你想深入了解它,在官方网站上有一整页的各种教程,加上官方文档和Javadoc。你也可以看看一些视频,比如这个视频,它简要介绍了Rx,还讨论了Rx和期货之间的区别。

附加功能:Java 9反应流

Java 9的活性流又名流API是由各种活性流库(如RxJava 2、Akka Streams和Vertx)实现的一组接口。它们允许这些反应库相互连接,同时保留所有重要的反压力。

我从0.9开始使用Rx Java,现在是1.3.2,很快就会迁移到2。x我在一个私人项目中使用这个,我已经工作了8年。

如果没有这个库,我就不会再编程了。一开始我是持怀疑态度的,但这是你需要创造的一种完全不同的心态。一开始很困难。有时我一看就是几个小时。哈哈

这只是一个实践的问题,真正了解流程(又名可观察对象和观察者的契约),一旦你达到了那里,你就会讨厌这样做。

对我来说,这个库并没有什么缺点。

用例: 我有一个监视器视图,包含9个仪表(cpu, mem,网络等…)在启动视图时,视图将自己订阅到一个系统监视器类,该类返回一个可观察对象(interval),其中包含9米的所有数据。 它将每秒钟向视图推送一个新的结果(所以不是轮询!!) 该可观察对象使用平面映射同时(异步!)从9个不同的来源获取数据,并将结果压缩到您的视图将在onNext()上获得的新模型中。

你要怎么用期货,完全期权等等来做呢?好运!:)

Rx Java为我解决了编程中的许多问题,并在某种程度上使编程变得更简单……

优点:

Statelss !!! (important thing to mention, most important maybe) Thread management out of the box Build sequences that have their own lifecycle Everything are observables so chaining is easy Less code to write Single jar on classpath (very lightweight) Highly concurrent No callback hell anymore Subscriber based (tight contract between consumer and producer) Backpressure strategies (circuit breaker a like) Splendid error handling and recovering Very nice documentation (marbles <3) Complete control Many more ...

缺点: -难以测试

Java的Future是一个占位符,用来保存将来用阻塞API完成的事情。您必须使用它的isDone()方法定期轮询它,以检查该任务是否已完成。当然,您可以实现自己的异步代码来管理轮询逻辑。但是,这会导致更多的样板代码和调试开销。

Java的CompletableFuture是由Scala的Future所创新的。它携带一个内部回调方法。一旦完成,回调方法将被触发,并告诉线程应该执行下游操作。这就是为什么它有thenApply方法来对包装在CompletableFuture中的对象做进一步的操作。

RxJava's Observable is an enhanced version of CompletableFuture. It allows you to handle the backpressure. In the thenApply method (and even with its brothers thenApplyAsync) we mentioned above, this situation might happen: the downstream method wants to call an external service that might become unavailable sometimes. In this case, the CompleteableFuture will fail completely and you will have to handle the error by yourself. However, Observable allows you to handle the backpressure and continue the execution once the external service to become available.

此外,还有一个类似Observable的接口:Flowable。它们的设计目的各不相同。通常,Flowable专门用于处理冷的和非定时的操作,而Observable专门用于处理需要即时响应的执行。官方文件见:https://github.com/ReactiveX/RxJava#backpressure

所有三个接口都用于将值从生产者传输到消费者。消费者可以分为两类:

同步:消费者进行阻塞调用,当值准备好时返回 异步:当值准备好时,调用使用者的回调方法

此外,通信接口在其他方面也有所不同:

能够传输单个值或多个值 如果有多个值,则支持或不支持反压

结果是:

未来使用同步接口传输单个值 CompletableFuture使用同步和异步接口传递单个值 Rx使用带背压的异步接口传输多个值

此外,所有这些通信设施都支持传输异常。但情况并非总是如此。例如,BlockingQueue就不是。

与普通Future相比,CompletableFuture的主要优点是它利用了极其强大的流API,并为你提供了回调处理程序来连接你的任务,如果你使用普通Future,这是绝对不存在的。除了提供异步架构外,CompletableFuture是处理计算量大的map-reduce任务的一种方式,而不用太担心应用程序的性能。