通过使用Http,我们调用一个方法来进行网络调用,并返回一个Http可观察对象:

getCustomer() {
    return this.http.get('/someUrl').map(res => res.json());
}

如果我们获取这个可观察对象并向其添加多个订阅者:

let network$ = getCustomer();

let subscriber1 = network$.subscribe(...);
let subscriber2 = network$.subscribe(...);

我们要做的是确保这不会导致多个网络请求。

这似乎是一个不寻常的场景,但实际上很常见:例如,如果调用者订阅了可观察对象以显示错误消息,并使用异步管道将其传递给模板,那么我们已经有两个订阅者了。

在RxJs 5中正确的方法是什么?

也就是说,这似乎工作得很好:

getCustomer() {
    return this.http.get('/someUrl').map(res => res.json()).share();
}

但是这是RxJs 5中惯用的方法吗,或者我们应该用别的方法来代替?

注意:根据Angular 5的新HttpClient,所有示例中的.map(res => res. JSON())部分现在都是无用的,因为现在默认假设JSON结果。


当前回答

rxjs 5.4.0(2017-05-09)增加了对shareereplay的支持。

为什么使用共享回放? 当您有不希望在多个订阅者之间执行的副作用或繁重的计算时,通常需要使用shareReplay。在您知道流的后期订阅者需要访问先前发出的值的情况下,它可能也很有价值。这种在订阅上重放价值的能力是share和shareereplay的区别所在。

你可以很容易地修改一个angular服务来使用它,并返回一个带有缓存结果的可观察对象,它只会进行一次http调用(假设第一次调用成功)。

Angular服务示例

这是一个非常简单的客户服务,使用共享回放。

customer.service.ts

import { shareReplay } from 'rxjs/operators';
import { Observable } from 'rxjs';
import { HttpClient } from '@angular/common/http';

@Injectable({providedIn: 'root'})
export class CustomerService {

    private readonly _getCustomers: Observable<ICustomer[]>;

    constructor(private readonly http: HttpClient) {
        this._getCustomers = this.http.get<ICustomer[]>('/api/customers/').pipe(shareReplay());
    }
    
    getCustomers() : Observable<ICustomer[]> {
        return this._getCustomers;
    }
}

export interface ICustomer {
  /* ICustomer interface fields defined here */
}

注意,构造函数中的赋值可以移动到getCustomers方法,但由于HttpClient返回的可观察对象是“冷的”,因此在构造函数中这样做是可以接受的,因为http调用只会在订阅的第一个调用中进行。

此外,这里还假设初始返回的数据在应用程序实例的生命周期内不会过时。

其他回答

更新:Ben Lesh说在5.2.0之后的下一个小版本中,你将能够调用shareplay()来真正地缓存。

以前……

首先,不要使用share()或publishReplay(1). refcount(),它们是相同的,它的问题是,它只在可观察对象处于活动状态时建立连接时共享,如果你在它完成连接后,它会再次创建一个新的可观察对象,翻译,而不是真正的缓存。

Birowski给出了正确的解决方案,即使用ReplaySubject。ReplaySubject将缓存你给它的值(bufferSize),在我们的例子中是1。它不会像share()一样在refCount为零时创建一个新的可观察对象,并且你建立了一个新的连接,这是缓存的正确行为。

这是一个可重用函数

export function cacheable<T>(o: Observable<T>): Observable<T> {
  let replay = new ReplaySubject<T>(1);
  o.subscribe(
    x => replay.next(x),
    x => replay.error(x),
    () => replay.complete()
  );
  return replay.asObservable();
}

下面是如何使用它

import { Injectable } from '@angular/core';
import { Http } from '@angular/http';
import { Observable } from 'rxjs/Observable';
import { cacheable } from '../utils/rxjs-functions';

@Injectable()
export class SettingsService {
  _cache: Observable<any>;
  constructor(private _http: Http, ) { }

  refresh = () => {
    if (this._cache) {
      return this._cache;
    }
    return this._cache = cacheable<any>(this._http.get('YOUR URL'));
  }
}

下面是一个更高级版本的可缓存函数。这个函数允许有自己的查找表+提供自定义查找表的能力。这样的话,你就不用检查了。_cache就像上面的例子。还要注意的是,你传递的不是可观察对象作为第一个参数,而是一个返回可观察对象的函数,这是因为Angular的Http会立即执行,所以通过返回一个延迟执行的函数,如果它已经在缓存中,我们可以决定不调用它。

let cacheableCache: { [key: string]: Observable<any> } = {};
export function cacheable<T>(returnObservable: () => Observable<T>, key?: string, customCache?: { [key: string]: Observable<T> }): Observable<T> {
  if (!!key && (customCache || cacheableCache)[key]) {
    return (customCache || cacheableCache)[key] as Observable<T>;
  }
  let replay = new ReplaySubject<T>(1);
  returnObservable().subscribe(
    x => replay.next(x),
    x => replay.error(x),
    () => replay.complete()
  );
  let observable = replay.asObservable();
  if (!!key) {
    if (!!customCache) {
      customCache[key] = observable;
    } else {
      cacheableCache[key] = observable;
    }
  }
  return observable;
}

用法:

getData() => cacheable(this._http.get("YOUR URL"), "this is key for my cache")

您选择的实现将取决于是否希望unsubscribe()取消您的HTTP请求。

在任何情况下,TypeScript装饰器都是标准化行为的好方法。这是我写的:

  @CacheObservableArgsKey
  getMyThing(id: string): Observable<any> {
    return this.http.get('things/'+id);
  }

装饰器定义:

/**
 * Decorator that replays and connects to the Observable returned from the function.
 * Caches the result using all arguments to form a key.
 * @param target
 * @param name
 * @param descriptor
 * @returns {PropertyDescriptor}
 */
export function CacheObservableArgsKey(target: Object, name: string, descriptor: PropertyDescriptor) {
  const originalFunc = descriptor.value;
  const cacheMap = new Map<string, any>();
  descriptor.value = function(this: any, ...args: any[]): any {
    const key = args.join('::');

    let returnValue = cacheMap.get(key);
    if (returnValue !== undefined) {
      console.log(`${name} cache-hit ${key}`, returnValue);
      return returnValue;
    }

    returnValue = originalFunc.apply(this, args);
    console.log(`${name} cache-miss ${key} new`, returnValue);
    if (returnValue instanceof Observable) {
      returnValue = returnValue.publishReplay(1);
      returnValue.connect();
    }
    else {
      console.warn('CacheHttpArgsKey: value not an Observable cannot publishReplay and connect', returnValue);
    }
    cacheMap.set(key, returnValue);
    return returnValue;
  };

  return descriptor;
}

我们要做的是确保这不会导致多个网络请求。

我个人最喜欢使用异步方法来调用网络请求。方法本身不返回值,而是更新同一服务中的BehaviorSubject,组件将订阅该服务。

现在为什么使用一个行为主体而不是一个可观察对象?因为,

在订阅时,BehaviorSubject返回最后一个值,而常规可观察对象只有在接收到onnext时才会触发。 如果您想在非可观察代码(没有订阅)中检索BehaviorSubject的最后一个值,您可以使用getValue()方法。

例子:

customer.service.ts

public customers$: BehaviorSubject<Customer[]> = new BehaviorSubject([]);

public async getCustomers(): Promise<void> {
    let customers = await this.httpClient.post<LogEntry[]>(this.endPoint, criteria).toPromise();
    if (customers) 
        this.customers$.next(customers);
}

然后,在任何需要的地方,我们都可以订阅客户$。

public ngOnInit(): void {
    this.customerService.customers$
    .subscribe((customers: Customer[]) => this.customerList = customers);
}

或者您可能想直接在模板中使用它

<li *ngFor="let customer of customerService.customers$ | async"> ... </li>

所以现在,在再次调用getCustomers之前,数据都保留在客户$ BehaviorSubject中。

如果想要刷新这些数据,该怎么办呢?只需要打电话给getCustomers()

public async refresh(): Promise<void> {
    try {
      await this.customerService.getCustomers();
    } 
    catch (e) {
      // request failed, handle exception
      console.error(e);
    }
}

使用此方法,我们不必在后续网络调用之间显式地保留数据,因为它由BehaviorSubject处理。

PS:通常当一个组件被销毁时,摆脱订阅是一个很好的实践,为此你可以使用这个答案中建议的方法。

我把问题打了星号,但我会试着试一下。

//this will be the shared observable that 
//anyone can subscribe to, get the value, 
//but not cause an api request
let customer$ = new Rx.ReplaySubject(1);

getCustomer().subscribe(customer$);

//here's the first subscriber
customer$.subscribe(val => console.log('subscriber 1: ' + val));

//here's the second subscriber
setTimeout(() => {
  customer$.subscribe(val => console.log('subscriber 2: ' + val));  
}, 1000);

function getCustomer() {
  return new Rx.Observable(observer => {
    console.log('api request');
    setTimeout(() => {
      console.log('api response');
      observer.next('customer object');
      observer.complete();
    }, 500);
  });
}

这是证据:)

这里只有一个要点:getCustomer().subscribe(customer$)

我们不是订阅getCustomer()的api响应,我们是订阅一个ReplaySubject,它是可观察的,它也可以订阅一个不同的可观察对象,并且(这很重要)持有它最后发出的值并重新发布给它的任何(ReplaySubject的)订阅者。

你可以简单地使用ngx-cacheable!它更适合你的场景。

使用这个的好处 它只调用rest API一次,缓存响应并为接下来的请求返回相同的响应。 在创建/更新/删除操作后,可以根据需要调用API。

那么,你的服务等级应该是这样的

import { Injectable } from '@angular/core';
import { Cacheable, CacheBuster } from 'ngx-cacheable';

const customerNotifier = new Subject();

@Injectable()
export class customersService {

    // relieves all its caches when any new value is emitted in the stream using notifier
    @Cacheable({
        cacheBusterObserver: customerNotifier,
        async: true
    })
    getCustomer() {
        return this.http.get('/someUrl').map(res => res.json());
    }

    // notifies the observer to refresh the data
    @CacheBuster({
        cacheBusterNotifier: customerNotifier
    })
    addCustomer() {
        // some code
    }

    // notifies the observer to refresh the data
    @CacheBuster({
        cacheBusterNotifier: customerNotifier
    })
    updateCustomer() {
        // some code
    }
}

这里有更多的参考链接。