通过使用Http,我们调用一个方法来进行网络调用,并返回一个Http可观察对象:

getCustomer() {
    return this.http.get('/someUrl').map(res => res.json());
}

如果我们获取这个可观察对象并向其添加多个订阅者:

let network$ = getCustomer();

let subscriber1 = network$.subscribe(...);
let subscriber2 = network$.subscribe(...);

我们要做的是确保这不会导致多个网络请求。

这似乎是一个不寻常的场景,但实际上很常见:例如,如果调用者订阅了可观察对象以显示错误消息,并使用异步管道将其传递给模板,那么我们已经有两个订阅者了。

在RxJs 5中正确的方法是什么?

也就是说,这似乎工作得很好:

getCustomer() {
    return this.http.get('/someUrl').map(res => res.json()).share();
}

但是这是RxJs 5中惯用的方法吗,或者我们应该用别的方法来代替?

注意:根据Angular 5的新HttpClient,所有示例中的.map(res => res. JSON())部分现在都是无用的,因为现在默认假设JSON结果。


当前回答

这是.publishReplay (1) .refCount ();或.publishLast () .refCount ();因为Angular Http的可观察对象在请求后完成。

这个简单的类缓存结果,因此您可以多次订阅.value,并且只发出一个请求。你也可以使用.reload()来发出新的请求并发布数据。

你可以这样使用它:

let res = new RestResource(() => this.http.get('inline.bundleo.js'));

res.status.subscribe((loading)=>{
    console.log('STATUS=',loading);
});

res.value.subscribe((value) => {
  console.log('VALUE=', value);
});

来源是:

export class RestResource {

  static readonly LOADING: string = 'RestResource_Loading';
  static readonly ERROR: string = 'RestResource_Error';
  static readonly IDLE: string = 'RestResource_Idle';

  public value: Observable<any>;
  public status: Observable<string>;
  private loadStatus: Observer<any>;

  private reloader: Observable<any>;
  private reloadTrigger: Observer<any>;

  constructor(requestObservableFn: () => Observable<any>) {
    this.status = Observable.create((o) => {
      this.loadStatus = o;
    });

    this.reloader = Observable.create((o: Observer<any>) => {
      this.reloadTrigger = o;
    });

    this.value = this.reloader.startWith(null).switchMap(() => {
      if (this.loadStatus) {
        this.loadStatus.next(RestResource.LOADING);
      }
      return requestObservableFn()
        .map((res) => {
          if (this.loadStatus) {
            this.loadStatus.next(RestResource.IDLE);
          }
          return res;
        }).catch((err)=>{
          if (this.loadStatus) {
            this.loadStatus.next(RestResource.ERROR);
          }
          return Observable.of(null);
        });
    }).publishReplay(1).refCount();
  }

  reload() {
    this.reloadTrigger.next(null);
  }

}

其他回答

这是.publishReplay (1) .refCount ();或.publishLast () .refCount ();因为Angular Http的可观察对象在请求后完成。

这个简单的类缓存结果,因此您可以多次订阅.value,并且只发出一个请求。你也可以使用.reload()来发出新的请求并发布数据。

你可以这样使用它:

let res = new RestResource(() => this.http.get('inline.bundleo.js'));

res.status.subscribe((loading)=>{
    console.log('STATUS=',loading);
});

res.value.subscribe((value) => {
  console.log('VALUE=', value);
});

来源是:

export class RestResource {

  static readonly LOADING: string = 'RestResource_Loading';
  static readonly ERROR: string = 'RestResource_Error';
  static readonly IDLE: string = 'RestResource_Idle';

  public value: Observable<any>;
  public status: Observable<string>;
  private loadStatus: Observer<any>;

  private reloader: Observable<any>;
  private reloadTrigger: Observer<any>;

  constructor(requestObservableFn: () => Observable<any>) {
    this.status = Observable.create((o) => {
      this.loadStatus = o;
    });

    this.reloader = Observable.create((o: Observer<any>) => {
      this.reloadTrigger = o;
    });

    this.value = this.reloader.startWith(null).switchMap(() => {
      if (this.loadStatus) {
        this.loadStatus.next(RestResource.LOADING);
      }
      return requestObservableFn()
        .map((res) => {
          if (this.loadStatus) {
            this.loadStatus.next(RestResource.IDLE);
          }
          return res;
        }).catch((err)=>{
          if (this.loadStatus) {
            this.loadStatus.next(RestResource.ERROR);
          }
          return Observable.of(null);
        });
    }).publishReplay(1).refCount();
  }

  reload() {
    this.reloadTrigger.next(null);
  }

}

根据这篇文章

事实证明,通过添加publishReplay(1)和refCount,我们可以轻松地将缓存添加到可观察对象中。

所以在if语句里面

.publishReplay(1)
.refCount();

. map(…)

只需在map之后和任何订阅之前调用share()。

在我的例子中,我有一个通用服务(RestClientService.ts),它进行其余调用,提取数据,检查错误,并将可观察对象返回到具体的实现服务(f.ex)。: ContractClientService.ts),最后这个具体的实现将可观察对象返回给de ContractComponent。Ts,这个订阅来更新视图。

RestClientService.ts:

export abstract class RestClientService<T extends BaseModel> {

      public GetAll = (path: string, property: string): Observable<T[]> => {
        let fullPath = this.actionUrl + path;
        let observable = this._http.get(fullPath).map(res => this.extractData(res, property));
        observable = observable.share();  //allows multiple subscribers without making again the http request
        observable.subscribe(
          (res) => {},
          error => this.handleError2(error, "GetAll", fullPath),
          () => {}
        );
        return observable;
      }

  private extractData(res: Response, property: string) {
    ...
  }
  private handleError2(error: any, method: string, path: string) {
    ...
  }

}

ContractService.ts:

export class ContractService extends RestClientService<Contract> {
  private GET_ALL_ITEMS_REST_URI_PATH = "search";
  private GET_ALL_ITEMS_PROPERTY_PATH = "contract";
  public getAllItems(): Observable<Contract[]> {
    return this.GetAll(this.GET_ALL_ITEMS_REST_URI_PATH, this.GET_ALL_ITEMS_PROPERTY_PATH);
  }

}

ContractComponent.ts:

export class ContractComponent implements OnInit {

  getAllItems() {
    this.rcService.getAllItems().subscribe((data) => {
      this.items = data;
   });
  }

}

rxjs 5.4.0有一个新的shareereplay方法。

rx-book shareReplay () reactivex.io/rxjs没有文档

作者明确地说“这是处理缓存AJAX结果之类的事情的理想选择”

rxjs PR #2443 feat(shaereplay):增加了publishReplay的shaereplay变体

shareereplay返回一个多播源的可观察对象 ReplaySubject。重放主题被错误地从 源,而不是对源的完成。这使得共享重玩 非常适合处理诸如缓存AJAX结果之类的事情 的事情。但是,重复行为不同于分享行为 它不会重复源可观察对象,而是会重复 源可观察对象的值。

我们要做的是确保这不会导致多个网络请求。

我个人最喜欢使用异步方法来调用网络请求。方法本身不返回值,而是更新同一服务中的BehaviorSubject,组件将订阅该服务。

现在为什么使用一个行为主体而不是一个可观察对象?因为,

在订阅时,BehaviorSubject返回最后一个值,而常规可观察对象只有在接收到onnext时才会触发。 如果您想在非可观察代码(没有订阅)中检索BehaviorSubject的最后一个值,您可以使用getValue()方法。

例子:

customer.service.ts

public customers$: BehaviorSubject<Customer[]> = new BehaviorSubject([]);

public async getCustomers(): Promise<void> {
    let customers = await this.httpClient.post<LogEntry[]>(this.endPoint, criteria).toPromise();
    if (customers) 
        this.customers$.next(customers);
}

然后,在任何需要的地方,我们都可以订阅客户$。

public ngOnInit(): void {
    this.customerService.customers$
    .subscribe((customers: Customer[]) => this.customerList = customers);
}

或者您可能想直接在模板中使用它

<li *ngFor="let customer of customerService.customers$ | async"> ... </li>

所以现在,在再次调用getCustomers之前,数据都保留在客户$ BehaviorSubject中。

如果想要刷新这些数据,该怎么办呢?只需要打电话给getCustomers()

public async refresh(): Promise<void> {
    try {
      await this.customerService.getCustomers();
    } 
    catch (e) {
      // request failed, handle exception
      console.error(e);
    }
}

使用此方法,我们不必在后续网络调用之间显式地保留数据,因为它由BehaviorSubject处理。

PS:通常当一个组件被销毁时,摆脱订阅是一个很好的实践,为此你可以使用这个答案中建议的方法。