在RxJava中,有5种不同的调度程序可供选择:

immediate(): Creates and returns a Scheduler that executes work immediately on the current thread. trampoline(): Creates and returns a Scheduler that queues work on the current thread to be executed after the current work completes. newThread(): Creates and returns a Scheduler that creates a new Thread for each unit of work. computation(): Creates and returns a Scheduler intended for computational work. This can be used for event-loops, processing callbacks and other computational work. Do not perform IO-bound work on this scheduler. Use Schedulers.io() instead. io(): Creates and returns a Scheduler intended for IO-bound work. The implementation is backed by an Executor thread-pool that will grow as needed. This can be used for asynchronously performing blocking IO. Do not perform computational work on this scheduler. Use Schedulers.computation() instead.

问题:

前3个调度器是不言自明的;然而,我对计算和io有点困惑。

什么是“IO-bound work”?它是用于处理流(java.io)和文件(java.nio.files)吗?它用于数据库查询吗?它是用于下载文件还是访问REST api ? computation()和newThread()有什么不同?是不是所有的computation()调用都在一个单独的(后台)线程上,而不是每次都在一个新的(后台)线程上? 为什么在做IO工作时调用computation()不好? 为什么在做计算工作时调用io()不好?


当前回答

最重要的一点是两个调度器。io和Schedulers.computation由无限线程池支持,这与问题中提到的其他线程池不同。如果Executor是用newCachedThreadPool(自动回收线程池)创建的,那么只有Schedulers.from(Executor)共享这个特性。

正如之前的回应和网络上的多篇文章所充分解释的那样,调度器。io和scheduler .computation应使用小心翼翼,因为它们针对其名称中的工作类型进行了优化。但是,在我看来,它们最重要的作用是为响应式流提供真正的并发性。

与新来者的看法相反,响应流本质上不是并发的,而是异步的和顺序的。正因如此,调度器。只有当I/O操作阻塞时(例如:使用阻塞命令如Apache IOUtils FileUtils.readFileAsString(…)),才会冻结调用线程,直到操作完成。

使用异步方法(如Java AsynchronousFileChannel(…))不会在操作期间阻塞调用线程,因此没有必要使用单独的线程。事实上,调度器。IO线程实际上并不适合异步操作,因为它们不会运行事件循环,而且回调永远不会…被称为。

同样的逻辑也适用于数据库访问或远程API调用。不要使用调度器。如果可以使用异步或响应式API进行调用,则可以使用。

回到并发性。您可能无法访问异步或响应式API来异步或并发地执行I/O操作,因此惟一的选择是在单独的线程上分派多个调用。可惜的是,响应式流在其末端是顺序的,但好消息是flatMap()操作符可以在其核心引入并发性。

并发性必须在流构造中构建,通常使用flatMap()操作符。这个功能强大的操作符可以配置为在内部为flatMap()嵌入式Function<T, R>提供多线程上下文。该上下文由Scheduler等多线程调度器提供。io或Scheduler.computation。

在RxJava2调度器和并发性的文章中找到更多细节,在那里您将找到关于如何顺序和并发地使用调度器的代码示例和详细解释。

希望这能有所帮助,

软杰克

其他回答

问得好,我认为文档可以提供更多细节。

io() is backed by an unbounded thread-pool and is the sort of thing you'd use for non-computationally intensive tasks, that is stuff that doesn't put much load on the CPU. So yep interaction with the file system, interaction with databases or services on a different host are good examples. computation() is backed by a bounded thread-pool with size equal to the number of available processors. If you tried to schedule CPU intensive work in parallel across more than the available processors (say using newThread()) then you are up for thread creation overhead and context switching overhead as threads vie for a processor and it's potentially a big performance hit. It's best to leave computation() for CPU intensive work only otherwise you won't get good CPU utilization. It's bad to call io() for computational work for the reason discussed in 2. io() is unbounded and if you schedule a thousand computational tasks on io() in parallel then each of those thousand tasks will each have their own thread and be competing for CPU incurring context switching costs.

最重要的一点是两个调度器。io和Schedulers.computation由无限线程池支持,这与问题中提到的其他线程池不同。如果Executor是用newCachedThreadPool(自动回收线程池)创建的,那么只有Schedulers.from(Executor)共享这个特性。

正如之前的回应和网络上的多篇文章所充分解释的那样,调度器。io和scheduler .computation应使用小心翼翼,因为它们针对其名称中的工作类型进行了优化。但是,在我看来,它们最重要的作用是为响应式流提供真正的并发性。

与新来者的看法相反,响应流本质上不是并发的,而是异步的和顺序的。正因如此,调度器。只有当I/O操作阻塞时(例如:使用阻塞命令如Apache IOUtils FileUtils.readFileAsString(…)),才会冻结调用线程,直到操作完成。

使用异步方法(如Java AsynchronousFileChannel(…))不会在操作期间阻塞调用线程,因此没有必要使用单独的线程。事实上,调度器。IO线程实际上并不适合异步操作,因为它们不会运行事件循环,而且回调永远不会…被称为。

同样的逻辑也适用于数据库访问或远程API调用。不要使用调度器。如果可以使用异步或响应式API进行调用,则可以使用。

回到并发性。您可能无法访问异步或响应式API来异步或并发地执行I/O操作,因此惟一的选择是在单独的线程上分派多个调用。可惜的是,响应式流在其末端是顺序的,但好消息是flatMap()操作符可以在其核心引入并发性。

并发性必须在流构造中构建,通常使用flatMap()操作符。这个功能强大的操作符可以配置为在内部为flatMap()嵌入式Function<T, R>提供多线程上下文。该上下文由Scheduler等多线程调度器提供。io或Scheduler.computation。

在RxJava2调度器和并发性的文章中找到更多细节,在那里您将找到关于如何顺序和并发地使用调度器的代码示例和详细解释。

希望这能有所帮助,

软杰克

这篇博文提供了一个很好的答案

文章中写道:

Schedulers.io()由一个无限的线程池支持。它用于非cpu密集型I/O类型的工作,包括与文件系统的交互、执行网络调用、数据库交互等。此线程池用于异步执行阻塞IO。

Schedulers.computation()由一个有限的线程池支持,其大小为可用处理器的数量。它用于计算或cpu密集型工作,如调整图像大小,处理大型数据集等。注意:当分配的计算线程多于可用内核时,由于上下文切换和线程创建开销,性能会下降,因为线程会争夺处理器时间。

Schedulers.newThread()为每个调度的工作单元创建一个新线程。这个调度程序的开销很大,因为每次都要生成新线程,而且不会发生重用。

Schedulers.from(Executor Executor)创建并返回指定执行程序支持的自定义调度器。要限制线程池中并发线程的数量,请使用Scheduler.from(Executors.newFixedThreadPool(n))。这保证了如果一个任务在所有线程都被占用时被调度,它将被排队。在显式关闭池之前,池中的线程将一直存在。

Main thread或AndroidSchedulers.mainThread()是由RxAndroid扩展库提供给RxJava的。主线程(也称为UI线程)是用户交互发生的地方。应该注意不要重载这个线程,以防止不响应的UI或更糟糕的应用程序不响应(ANR)对话框。

Schedulers.single()是RxJava 2中的新功能。此调度器由单个线程支持,按请求的顺序依次执行任务。

Schedulers.trampoline()由一个参与的工作线程以FIFO(先入先出)方式执行任务。它通常在实现递归时使用,以避免增加调用堆栈。