我正在为我的应用开发网络。所以我决定试试Square的Retrofit。我看到它们支持简单的回调

@GET("/user/{id}/photo")
void getUserPhoto(@Path("id") int id, Callback<Photo> cb);

和RxJava的Observable

@GET("/user/{id}/photo")
Observable<Photo> getUserPhoto(@Path("id") int id);

乍一看,两者都非常相似,但当它实现时,它就变得有趣了……

而简单的回调实现看起来类似于:

api.getUserPhoto(photoId, new Callback<Photo>() {
    @Override
    public void onSuccess() {
    }
});

这是非常简单直接的。而使用Observable,它很快就会变得冗长且相当复杂。

public Observable<Photo> getUserPhoto(final int photoId) {
    return Observable.create(new Observable.OnSubscribeFunc<Photo>() {
        @Override
        public Subscription onSubscribe(Observer<? super Photo> observer) {
            try {
                observer.onNext(api.getUserPhoto(photoId));
                observer.onCompleted();
            } catch (Exception e) {
                observer.onError(e);
            }

            return Subscriptions.empty();
        }
    }).subscribeOn(Schedulers.threadPoolForIO());
}

但事实并非如此。你仍然需要做这样的事情:

Observable.from(photoIdArray)
        .mapMany(new Func1<String, Observable<Photo>>() {
            @Override
            public Observable<Photo> call(Integer s) {
                return getUserPhoto(s);
            }
        })
        .subscribeOn(Schedulers.threadPoolForIO())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Action1<Photo>() {
            @Override
            public void call(Photo photo) {
                //save photo?
            }
        });

我是不是遗漏了什么?或者在这种情况下使用可观察对象是错误的? 什么时候会/应该更喜欢Observable而不是简单的回调?

更新

使用改造比上面的例子简单得多,就像@Niels在他的回答中或Jake Wharton的例子项目U2020中所展示的那样。但本质上的问题是不变的——什么时候应该使用一种方式或另一种方式?


看起来你在重新发明轮子,你所做的已经在改造中实现了。

举个例子,你可以看看retrofit的RestAdapterTest.java,在那里他们定义了一个接口,将Observable作为返回类型,然后使用它。


Observable的东西已经在Retrofit中完成了,所以代码可以是这样的:

api.getUserPhoto(photoId)
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Action1<Photo>() {
         @Override
            public void call(Photo photo) {
                //save photo?
            }
     });

在getUserPhoto()的情况下,RxJava的优势不是很大。 但让我们再举一个例子,当你为一个用户获取所有照片时,但只有当图像是PNG时,你不能访问JSON来在服务器端进行过滤。

api.getUserPhotos(userId)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.flatMap(new Func1<List<Photo>, Observable<Photo>>() {
    @Override
    public Observable<Photo> call(List<Photo> photos) {
         return Observable.from(photos);
    }
})
.filter(new Func1<Photo, Boolean>() {
    @Override
    public Boolean call(Photo photo) {
         return photo.isPNG();
    }
})
.subscribe(
    new Action1<Photo>() {
    @Override
        public void call(Photo photo) {
            // on main thread; callback for each photo, add them to a list or something.
            list.add(photo)
        }
    }, 
    new Action1<Throwable>() {
    @Override
        public void call(Throwable throwable) {
            // on main thread; something went wrong
            System.out.println("Error! " + throwable);
        }
    }, 
    new Action0() {
        @Override
        public void call() {
            // on main thread; all photo's loaded, time to show the list or something.
        }
    });

现在JSON返回Photo的列表。我们将把它们平面映射到单独的项目。通过这样做,我们将能够使用过滤器方法来忽略非PNG的照片。之后,我们将订阅,并为每张照片获取一个回调,一个errorHandler,以及当所有行都完成时的回调。

TLDR 这里的点是;回调只返回成功和失败的回调;RxJava Observable允许你做映射,减少,过滤和更多的事情。


使用rxjava,你可以用更少的代码做更多的事情。

让我们假设你想在你的应用中实现即时搜索。 通过回调,你担心取消订阅前一个请求并订阅新请求,自己处理方向更改…我认为这是大量的代码和太啰嗦。

用rxjava非常简单。

public class PhotoModel{
  BehaviorSubject<Observable<Photo>> subject = BehaviorSubject.create(...);

  public void setUserId(String id){
   subject.onNext(Api.getUserPhoto(photoId));
  }

  public Observable<Photo> subscribeToPhoto(){
    return Observable.switchOnNext(subject);
  }
}

如果你想实现即时搜索,你只需要监听TextChangeListener并调用photommodel . setuserid (EditText.getText());

在Fragment或activity的onCreate方法中,你订阅了返回photomode . subscribetophoto()的Observable,它会返回一个总是发出由最新的Observable(request)发出的项的Observable。

AndroidObservable.bindFragment(this, photoModel.subscribeToPhoto())
                 .subscribe(new Action1<Photo>(Photo photo){
      //Here you always receive the response of the latest query to the server.
                  });

此外,如果photommodel是一个单例,例如,您不需要担心方向更改,因为不管您何时订阅,BehaviorSubject都会发出最后一个服务器响应。

通过这几行代码,我们实现了即时搜索和处理方向更改。 你认为你可以用更少的代码实现回调吗?我对此表示怀疑。


对于简单的网络来说,RxJava相对于Callback的优势非常有限。简单的getUserPhoto示例:

RxJava:

api.getUserPhoto(photoId)
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Action1<Photo>() {
            @Override
            public void call(Photo photo) {
               // do some stuff with your photo 
            }
     });

回调函数:

api.getUserPhoto(photoId, new Callback<Photo>() {
    @Override
    public void onSuccess(Photo photo, Response response) {
    }
});

RxJava变体并不比Callback变体好多少。现在,让我们忽略错误处理。 让我们来拍一组照片:

RxJava:

api.getUserPhotos(userId)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.flatMap(new Func1<List<Photo>, Observable<Photo>>() {
    @Override
    public Observable<Photo> call(List<Photo> photos) {
         return Observable.from(photos);
    }
})
.filter(new Func1<Photo, Boolean>() {
    @Override
    public Boolean call(Photo photo) {
         return photo.isPNG();
    }
})
.subscribe(
    new Action1<Photo>() {
    @Override
        public void call(Photo photo) {
            list.add(photo)
        }
    });

回调函数:

api.getUserPhotos(userId, new Callback<List<Photo>>() {
    @Override
    public void onSuccess(List<Photo> photos, Response response) {
        List<Photo> filteredPhotos = new ArrayList<Photo>();
        for(Photo photo: photos) {
            if(photo.isPNG()) {
                filteredList.add(photo);
            }
        }
    }
});

现在,RxJava变体仍然不是更小,尽管使用Lambdas它将是更接近Callback变体的getter。 此外,如果您可以访问JSON提要,那么在只显示png的情况下检索所有照片会有点奇怪。只需调整提要,它只显示png。

第一个结论

它不会使您的代码库更小,当您加载一个简单的JSON,您准备在正确的格式。

现在,让我们让事情变得更有趣一点。假设你不仅想要检索userPhoto,而且你有一个instagram的克隆,你想检索2个json: 1. getUserDetails () 2. getUserPhotos ()

您希望并行加载这两个json,当两者都加载时,应该显示页面。 回调变量将变得有点困难:你必须创建2个回调,将数据存储在活动中,如果所有数据都已加载,则显示页面:

回调函数:

api.getUserDetails(userId, new Callback<UserDetails>() {
    @Override
    public void onSuccess(UserDetails details, Response response) {
        this.details = details;
        if(this.photos != null) {
            displayPage();
        }
    }
});

api.getUserPhotos(userId, new Callback<List<Photo>>() {
    @Override
    public void onSuccess(List<Photo> photos, Response response) {
        this.photos = photos;
        if(this.details != null) {
            displayPage();
        }
    }
});

RxJava:

private class Combined {
    UserDetails details;
    List<Photo> photos;
}


Observable.zip(api.getUserDetails(userId), api.getUserPhotos(userId), new Func2<UserDetails, List<Photo>, Combined>() {
            @Override
            public Combined call(UserDetails details, List<Photo> photos) {
                Combined r = new Combined();
                r.details = details;
                r.photos = photos;
                return r;
            }
        }).subscribe(new Action1<Combined>() {
            @Override
            public void call(Combined combined) {
            }
        });

我们正在取得进展!RxJava的代码现在和回调选项一样大。RxJava代码更加健壮; 想想如果我们需要加载第三个JSON(比如最新的视频)会发生什么?RxJava只需要微小的调整,而Callback变体需要在多个地方进行调整(在每次回调时,我们需要检查是否检索了所有数据)。

另一个例子;我们想要创建一个自动完成字段,它使用Retrofit加载数据。 我们不想每次EditText有TextChangedEvent时都做一个webcall。当快速输入时,只有最后一个元素应该触发调用。 在RxJava中,我们可以使用debounce操作符:

inputObservable.debounce(1, TimeUnit.SECONDS).subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                // use Retrofit to create autocompletedata
            }
        });

我不会创建回调变量,但您将理解这是更多的工作。

结论: 当数据作为流发送时,RxJava非常出色。Retrofit Observable同时将所有元素推送到流上。 与Callback相比,这本身并不是特别有用。但是当有多个元素被推送到流上并在不同的时间被推送时,并且您需要做与时间相关的事情时,RxJava使代码更易于维护。


通过其他答案中的样本和结论,我认为对于简单的一两步任务来说,没有太大的区别。然而,Callback是简单而直接的。RxJava比较复杂,对于简单的任务来说太大了。还有第三种解决办法:算盘常用。让我用这三种解决方案实现上面的用例:Callback, RxJava, CompletableFuture(abacsus -common)和Retrolambda:

从网络获取照片并保存/显示在设备上:

// By Callback
api.getUserPhoto(userId, new Callback<Photo>() {
    @Override
    public void onResponse(Call<Photo> call, Response<Photo> response) {
        save(response.body()); // or update view on UI thread.
    }

    @Override
    public void onFailure(Call<Photo> call, Throwable t) {
        // show error message on UI or do something else.
    }
});

// By RxJava
api.getUserPhoto2(userId) //
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(photo -> {
            save(photo); // or update view on UI thread.
        }, error -> {
            // show error message on UI or do something else.
        });

// By Thread pool executor and CompletableFuture.
TPExecutor.execute(() -> api.getUserPhoto(userId))
        .thenRunOnUI((photo, error) -> {
            if (error != null) {
                // show error message on UI or do something else.
            } else {
                save(photo); // or update view on UI thread.
            }
        });

并行加载用户详细信息和照片

// By Callback
// ignored because it's little complicated

// By RxJava
Observable.zip(api.getUserDetails2(userId), api.getUserPhoto2(userId), (details, photo) -> Pair.of(details, photo))
        .subscribe(p -> {
            // Do your task.
        });

// By Thread pool executor and CompletableFuture.
TPExecutor.execute(() -> api.getUserDetails(userId))
          .runOnUIAfterBoth(TPExecutor.execute(() -> api.getUserPhoto(userId)), p -> {
    // Do your task
});

我们通常遵循以下逻辑:

如果它是一个简单的单响应调用,那么Callback或Future更好。 如果一个调用有多个响应(流),或者不同调用之间有复杂的交互(参见@Niels的回答),那么Observables更好。


我个人更喜欢使用Rx来获得api响应的情况下,我必须做过滤,映射或类似的数据或在情况下,我必须做另一个api调用基于之前的调用响应


当你为乐趣、宠物项目、POC或第一个原型创建应用时,你会使用简单的android/java核心类,如回调、异步任务、循环器、线程等。它们使用简单,不需要任何第三方lib集成。当类似的事情可以立即完成时,仅仅为了构建一个小型的不可更改的项目而进行大型库集成是不合逻辑的。

然而,这些就像一把非常锋利的刀。在生产环境中使用这些工具总是很酷,但也会产生一些后果。如果不熟悉Clean编码和SOLID原则,就很难编写安全的并发代码。您必须维护一个适当的体系结构,以促进未来的变更和提高团队生产力。

另一方面,像RxJava、co -routine等并发库已经被尝试和测试了超过十亿次,以帮助编写可用于生产的并发代码。现在再次强调,并不是说使用这些库就不能编写并发代码或抽象出所有并发逻辑。你现在还是。但是现在,它是可见的,并且在整个代码库中,更重要的是在整个开发团队中,执行了一个清晰的并发代码编写模式。

这是使用并发框架而不是处理原始并发的普通旧核心类的主要好处。不过,别误会我的意思。我非常相信限制外部库的依赖关系,但在这种特定情况下,你必须为你的代码库构建一个自定义框架,这是一项耗时的任务,只有在有了丰富的经验之后才能完成。因此,与使用回调等普通类相比,并发框架更受欢迎。


TL 'DR

如果你已经在整个代码库中使用RxJava进行并发编码,只需使用RxJava Observable/Flowable即可。一个明智的问题是我应该使用可观察对象还是可流动对象。 如果不是,继续使用可调用对象。