通过使用Http,我们调用一个方法来进行网络调用,并返回一个Http可观察对象:

getCustomer() {
    return this.http.get('/someUrl').map(res => res.json());
}

如果我们获取这个可观察对象并向其添加多个订阅者:

let network$ = getCustomer();

let subscriber1 = network$.subscribe(...);
let subscriber2 = network$.subscribe(...);

我们要做的是确保这不会导致多个网络请求。

这似乎是一个不寻常的场景,但实际上很常见:例如,如果调用者订阅了可观察对象以显示错误消息,并使用异步管道将其传递给模板,那么我们已经有两个订阅者了。

在RxJs 5中正确的方法是什么?

也就是说,这似乎工作得很好:

getCustomer() {
    return this.http.get('/someUrl').map(res => res.json()).share();
}

但是这是RxJs 5中惯用的方法吗,或者我们应该用别的方法来代替?

注意:根据Angular 5的新HttpClient,所有示例中的.map(res => res. JSON())部分现在都是无用的,因为现在默认假设JSON结果。


当前回答

这是.publishReplay (1) .refCount ();或.publishLast () .refCount ();因为Angular Http的可观察对象在请求后完成。

这个简单的类缓存结果,因此您可以多次订阅.value,并且只发出一个请求。你也可以使用.reload()来发出新的请求并发布数据。

你可以这样使用它:

let res = new RestResource(() => this.http.get('inline.bundleo.js'));

res.status.subscribe((loading)=>{
    console.log('STATUS=',loading);
});

res.value.subscribe((value) => {
  console.log('VALUE=', value);
});

来源是:

export class RestResource {

  static readonly LOADING: string = 'RestResource_Loading';
  static readonly ERROR: string = 'RestResource_Error';
  static readonly IDLE: string = 'RestResource_Idle';

  public value: Observable<any>;
  public status: Observable<string>;
  private loadStatus: Observer<any>;

  private reloader: Observable<any>;
  private reloadTrigger: Observer<any>;

  constructor(requestObservableFn: () => Observable<any>) {
    this.status = Observable.create((o) => {
      this.loadStatus = o;
    });

    this.reloader = Observable.create((o: Observer<any>) => {
      this.reloadTrigger = o;
    });

    this.value = this.reloader.startWith(null).switchMap(() => {
      if (this.loadStatus) {
        this.loadStatus.next(RestResource.LOADING);
      }
      return requestObservableFn()
        .map((res) => {
          if (this.loadStatus) {
            this.loadStatus.next(RestResource.IDLE);
          }
          return res;
        }).catch((err)=>{
          if (this.loadStatus) {
            this.loadStatus.next(RestResource.ERROR);
          }
          return Observable.of(null);
        });
    }).publishReplay(1).refCount();
  }

  reload() {
    this.reloadTrigger.next(null);
  }

}

其他回答

我写了一个缓存类,

/**
 * Caches results returned from given fetcher callback for given key,
 * up to maxItems results, deletes the oldest results when full (FIFO).
 */
export class StaticCache
{
    static cachedData: Map<string, any> = new Map<string, any>();
    static maxItems: number = 400;

    static get(key: string){
        return this.cachedData.get(key);
    }

    static getOrFetch(key: string, fetcher: (string) => any): any {
        let value = this.cachedData.get(key);

        if (value != null){
            console.log("Cache HIT! (fetcher)");
            return value;
        }

        console.log("Cache MISS... (fetcher)");
        value = fetcher(key);
        this.add(key, value);
        return value;
    }

    static add(key, value){
        this.cachedData.set(key, value);
        this.deleteOverflowing();
    }

    static deleteOverflowing(): void {
        if (this.cachedData.size > this.maxItems) {
            this.deleteOldest(this.cachedData.size - this.maxItems);
        }
    }

    /// A Map object iterates its elements in insertion order — a for...of loop returns an array of [key, value] for each iteration.
    /// However that seems not to work. Trying with forEach.
    static deleteOldest(howMany: number): void {
        //console.debug("Deleting oldest " + howMany + " of " + this.cachedData.size);
        let iterKeys = this.cachedData.keys();
        let item: IteratorResult<string>;
        while (howMany-- > 0 && (item = iterKeys.next(), !item.done)){
            //console.debug("    Deleting: " + item.value);
            this.cachedData.delete(item.value); // Deleting while iterating should be ok in JS.
        }
    }

    static clear(): void {
        this.cachedData = new Map<string, any>();
    }

}

由于我们使用它的方式不同,所以它都是静态的,但是可以随意地将它变成一个正常的类和服务。我不确定angular是否会一直保持一个实例(对于Angular2来说是新的)。

我是这样使用它的:

            let httpService: Http = this.http;
            function fetcher(url: string): Observable<any> {
                console.log("    Fetching URL: " + url);
                return httpService.get(url).map((response: Response) => {
                    if (!response) return null;
                    if (typeof response.json() !== "array")
                        throw new Error("Graph REST should return an array of vertices.");
                    let items: any[] = graphService.fromJSONarray(response.json(), httpService);
                    return array ? items : items[0];
                });
            }

            // If data is a link, return a result of a service call.
            if (this.data[verticesLabel][name]["link"] || this.data[verticesLabel][name]["_type"] == "link")
            {
                // Make an HTTP call.
                let url = this.data[verticesLabel][name]["link"];
                let cachedObservable: Observable<any> = StaticCache.getOrFetch(url, fetcher);
                if (!cachedObservable)
                    throw new Error("Failed loading link: " + url);
                return cachedObservable;
            }

我认为可能有更聪明的方法,使用一些可观察的技巧,但这对我的目的来说已经很好了。

RXJS 5.3.0

我对.map(myFunction).publishReplay(1).refCount()不满意

对于多个订阅者,.map()在某些情况下执行myFunction两次(我希望它只执行一次)。一个修复似乎是publishReplay(1).refCount().take(1)

你可以做的另一件事,就是不使用refCount(),让Observable立即热:

let obs = this.http.get('my/data.json').publishReplay(1);
obs.connect();
return obs;

这将启动HTTP请求,而不考虑订阅者。我不确定在HTTP GET完成之前取消订阅是否会取消它。

你可以简单地使用ngx-cacheable!它更适合你的场景。

使用这个的好处 它只调用rest API一次,缓存响应并为接下来的请求返回相同的响应。 在创建/更新/删除操作后,可以根据需要调用API。

那么,你的服务等级应该是这样的

import { Injectable } from '@angular/core';
import { Cacheable, CacheBuster } from 'ngx-cacheable';

const customerNotifier = new Subject();

@Injectable()
export class customersService {

    // relieves all its caches when any new value is emitted in the stream using notifier
    @Cacheable({
        cacheBusterObserver: customerNotifier,
        async: true
    })
    getCustomer() {
        return this.http.get('/someUrl').map(res => res.json());
    }

    // notifies the observer to refresh the data
    @CacheBuster({
        cacheBusterNotifier: customerNotifier
    })
    addCustomer() {
        // some code
    }

    // notifies the observer to refresh the data
    @CacheBuster({
        cacheBusterNotifier: customerNotifier
    })
    updateCustomer() {
        // some code
    }
}

这里有更多的参考链接。

您是否试过运行已经拥有的代码?

因为你是根据getJSON()的承诺构造Observable,所以网络请求是在任何人订阅之前发出的。最终的承诺由所有订阅者共享。

var promise = jQuery.getJSON(requestUrl); // network call is executed now
var o = Rx.Observable.fromPromise(promise); // just wraps it in an observable
o.subscribe(...); // does not trigger network call
o.subscribe(...); // does not trigger network call
// ...

伟大的答案。

或者你可以这样做:

这是rxjs的最新版本。我使用5.5.7版本的RxJS

import {share} from "rxjs/operators";

this.http.get('/someUrl').pipe(share());