通过使用Http,我们调用一个方法来进行网络调用,并返回一个Http可观察对象:

getCustomer() {
    return this.http.get('/someUrl').map(res => res.json());
}

如果我们获取这个可观察对象并向其添加多个订阅者:

let network$ = getCustomer();

let subscriber1 = network$.subscribe(...);
let subscriber2 = network$.subscribe(...);

我们要做的是确保这不会导致多个网络请求。

这似乎是一个不寻常的场景,但实际上很常见:例如,如果调用者订阅了可观察对象以显示错误消息,并使用异步管道将其传递给模板,那么我们已经有两个订阅者了。

在RxJs 5中正确的方法是什么?

也就是说,这似乎工作得很好:

getCustomer() {
    return this.http.get('/someUrl').map(res => res.json()).share();
}

但是这是RxJs 5中惯用的方法吗,或者我们应该用别的方法来代替?

注意:根据Angular 5的新HttpClient,所有示例中的.map(res => res. JSON())部分现在都是无用的,因为现在默认假设JSON结果。


当前回答

我把问题打了星号,但我会试着试一下。

//this will be the shared observable that 
//anyone can subscribe to, get the value, 
//but not cause an api request
let customer$ = new Rx.ReplaySubject(1);

getCustomer().subscribe(customer$);

//here's the first subscriber
customer$.subscribe(val => console.log('subscriber 1: ' + val));

//here's the second subscriber
setTimeout(() => {
  customer$.subscribe(val => console.log('subscriber 2: ' + val));  
}, 1000);

function getCustomer() {
  return new Rx.Observable(observer => {
    console.log('api request');
    setTimeout(() => {
      console.log('api response');
      observer.next('customer object');
      observer.complete();
    }, 500);
  });
}

这是证据:)

这里只有一个要点:getCustomer().subscribe(customer$)

我们不是订阅getCustomer()的api响应,我们是订阅一个ReplaySubject,它是可观察的,它也可以订阅一个不同的可观察对象,并且(这很重要)持有它最后发出的值并重新发布给它的任何(ReplaySubject的)订阅者。

其他回答

根据这篇文章

事实证明,通过添加publishReplay(1)和refCount,我们可以轻松地将缓存添加到可观察对象中。

所以在if语句里面

.publishReplay(1)
.refCount();

. map(…)

您是否试过运行已经拥有的代码?

因为你是根据getJSON()的承诺构造Observable,所以网络请求是在任何人订阅之前发出的。最终的承诺由所有订阅者共享。

var promise = jQuery.getJSON(requestUrl); // network call is executed now
var o = Rx.Observable.fromPromise(promise); // just wraps it in an observable
o.subscribe(...); // does not trigger network call
o.subscribe(...); // does not trigger network call
// ...

RXJS 5.3.0

我对.map(myFunction).publishReplay(1).refCount()不满意

对于多个订阅者,.map()在某些情况下执行myFunction两次(我希望它只执行一次)。一个修复似乎是publishReplay(1).refCount().take(1)

你可以做的另一件事,就是不使用refCount(),让Observable立即热:

let obs = this.http.get('my/data.json').publishReplay(1);
obs.connect();
return obs;

这将启动HTTP请求,而不考虑订阅者。我不确定在HTTP GET完成之前取消订阅是否会取消它。

你可以构建一个简单的类Cacheable<>来帮助管理从多个订阅者的http服务器检索到的数据:

declare type GetDataHandler<T> = () => Observable<T>;

export class Cacheable<T> {

    protected data: T;
    protected subjectData: Subject<T>;
    protected observableData: Observable<T>;
    public getHandler: GetDataHandler<T>;

    constructor() {
      this.subjectData = new ReplaySubject(1);
      this.observableData = this.subjectData.asObservable();
    }

    public getData(): Observable<T> {
      if (!this.getHandler) {
        throw new Error("getHandler is not defined");
      }
      if (!this.data) {
        this.getHandler().map((r: T) => {
          this.data = r;
          return r;
        }).subscribe(
          result => this.subjectData.next(result),
          err => this.subjectData.error(err)
        );
      }
      return this.observableData;
    }

    public resetCache(): void {
      this.data = null;
    }

    public refresh(): void {
      this.resetCache();
      this.getData();
    }

}

使用

声明Cacheable<>对象(假设是服务的一部分):

list: Cacheable<string> = new Cacheable<string>();

和处理程序:

this.list.getHandler = () => {
// get data from server
return this.http.get(url)
.map((r: Response) => r.json() as string[]);
}

从组件调用:

//gets data from server
List.getData().subscribe(…)

您可以有多个组件订阅到它。

更多细节和代码示例在这里:http://devinstance.net/articles/20171021/rxjs-cacheable

我认为@ngx-cache/core对于维护http调用的缓存功能是有用的,特别是如果http调用是在浏览器和服务器平台上进行的。

假设我们有以下方法:

getCustomer() {
  return this.http.get('/someUrl').map(res => res.json());
}

你可以在第一次执行时使用@ngx-cache/core的Cached装饰器将HTTP调用方法的返回值存储在缓存存储中(存储可以配置,请检查ng-seed/universal的实现)。下一次调用该方法时(无论是在浏览器平台上还是在服务器平台上),将从缓存存储中检索该值。

import { Cached } from '@ngx-cache/core';

...

@Cached('get-customer') // the cache key/identifier
getCustomer() {
  return this.http.get('/someUrl').map(res => res.json());
}

还可以使用缓存API使用缓存方法(has、get、set)。

anyclass.ts

...
import { CacheService } from '@ngx-cache/core';

@Injectable()
export class AnyClass {
  constructor(private readonly cache: CacheService) {
    // note that CacheService is injected into a private property of AnyClass
  }

  // will retrieve 'some string value'
  getSomeStringValue(): string {
    if (this.cache.has('some-string'))
      return this.cache.get('some-string');

    this.cache.set('some-string', 'some string value');
    return 'some string value';
  }
}

下面是客户端和服务器端缓存的包列表:

@ngx-cache/core:缓存实用程序 @ngx-cache/platform- Browser: SPA/Browser平台实现 @ngx-cache/platform-server:服务器平台实现 @ngx-cache/fs-storage:存储工具(服务器平台需要)