通过使用Http,我们调用一个方法来进行网络调用,并返回一个Http可观察对象:

getCustomer() {
    return this.http.get('/someUrl').map(res => res.json());
}

如果我们获取这个可观察对象并向其添加多个订阅者:

let network$ = getCustomer();

let subscriber1 = network$.subscribe(...);
let subscriber2 = network$.subscribe(...);

我们要做的是确保这不会导致多个网络请求。

这似乎是一个不寻常的场景,但实际上很常见:例如,如果调用者订阅了可观察对象以显示错误消息,并使用异步管道将其传递给模板,那么我们已经有两个订阅者了。

在RxJs 5中正确的方法是什么?

也就是说,这似乎工作得很好:

getCustomer() {
    return this.http.get('/someUrl').map(res => res.json()).share();
}

但是这是RxJs 5中惯用的方法吗,或者我们应该用别的方法来代替?

注意:根据Angular 5的新HttpClient,所有示例中的.map(res => res. JSON())部分现在都是无用的,因为现在默认假设JSON结果。


当前回答

根据@Cristian的建议,这是一种适用于HTTP可观察对象的方法,它只发射一次,然后就完成了:

getCustomer() {
    return this.http.get('/someUrl')
        .map(res => res.json()).publishLast().refCount();
}

其他回答

我把问题打了星号,但我会试着试一下。

//this will be the shared observable that 
//anyone can subscribe to, get the value, 
//but not cause an api request
let customer$ = new Rx.ReplaySubject(1);

getCustomer().subscribe(customer$);

//here's the first subscriber
customer$.subscribe(val => console.log('subscriber 1: ' + val));

//here's the second subscriber
setTimeout(() => {
  customer$.subscribe(val => console.log('subscriber 2: ' + val));  
}, 1000);

function getCustomer() {
  return new Rx.Observable(observer => {
    console.log('api request');
    setTimeout(() => {
      console.log('api response');
      observer.next('customer object');
      observer.complete();
    }, 500);
  });
}

这是证据:)

这里只有一个要点:getCustomer().subscribe(customer$)

我们不是订阅getCustomer()的api响应,我们是订阅一个ReplaySubject,它是可观察的,它也可以订阅一个不同的可观察对象,并且(这很重要)持有它最后发出的值并重新发布给它的任何(ReplaySubject的)订阅者。

上面的大多数答案都适用于不接受输入的http请求。每次你想要使用一些输入进行api调用时,都需要重新创建请求。上面唯一可以处理这个问题的回复是@Arlo的回复。

我已经创建了一个稍微简单的装饰器,您可以使用它将响应共享给每个具有相同输入的调用者。与Arlo的回复不同,它不会对延迟的订阅者重放响应,而是将同时发生的请求作为一个请求来处理。如果目标是重放响应给延迟的观察者(也就是缓存的响应),你可以修改下面的代码并用shaereplay(1)替换share():

https://gist.github.com/OysteinAmundsen/b97a2359292463feb8c0e2270ed6695a

import { finalize, Observable, share } from 'rxjs';

export function SharedObservable(): MethodDecorator {
  const obs$ = new Map<string, Observable<any>>();
  return (target: any, propertyKey: string | symbol, descriptor: PropertyDescriptor) => {
    const originalMethod = descriptor.value;
    descriptor.value = function (...args: any[]) {
      const key = JSON.stringify(args);
      if (!obs$.has(key)) {
        // We have no observable for this key yet, so we create one
        const res = originalMethod.apply(this, args).pipe(
          share(), // Make the observable hot
          finalize(() => obs$.delete(key)) // Cleanup when observable is complete
        );
        obs$.set(key, res);
      }
      // Return the cached observable
      return obs$.get(key);
    };
    return descriptor;
  };
}

用法:

@SharedObservable()
myFunc(id: number): Observable<any> {
  return this.http.get<any>(`/api/someUrl/${id}`);
}

只需在map之后和任何订阅之前调用share()。

在我的例子中,我有一个通用服务(RestClientService.ts),它进行其余调用,提取数据,检查错误,并将可观察对象返回到具体的实现服务(f.ex)。: ContractClientService.ts),最后这个具体的实现将可观察对象返回给de ContractComponent。Ts,这个订阅来更新视图。

RestClientService.ts:

export abstract class RestClientService<T extends BaseModel> {

      public GetAll = (path: string, property: string): Observable<T[]> => {
        let fullPath = this.actionUrl + path;
        let observable = this._http.get(fullPath).map(res => this.extractData(res, property));
        observable = observable.share();  //allows multiple subscribers without making again the http request
        observable.subscribe(
          (res) => {},
          error => this.handleError2(error, "GetAll", fullPath),
          () => {}
        );
        return observable;
      }

  private extractData(res: Response, property: string) {
    ...
  }
  private handleError2(error: any, method: string, path: string) {
    ...
  }

}

ContractService.ts:

export class ContractService extends RestClientService<Contract> {
  private GET_ALL_ITEMS_REST_URI_PATH = "search";
  private GET_ALL_ITEMS_PROPERTY_PATH = "contract";
  public getAllItems(): Observable<Contract[]> {
    return this.GetAll(this.GET_ALL_ITEMS_REST_URI_PATH, this.GET_ALL_ITEMS_PROPERTY_PATH);
  }

}

ContractComponent.ts:

export class ContractComponent implements OnInit {

  getAllItems() {
    this.rcService.getAllItems().subscribe((data) => {
      this.items = data;
   });
  }

}

你可以构建一个简单的类Cacheable<>来帮助管理从多个订阅者的http服务器检索到的数据:

declare type GetDataHandler<T> = () => Observable<T>;

export class Cacheable<T> {

    protected data: T;
    protected subjectData: Subject<T>;
    protected observableData: Observable<T>;
    public getHandler: GetDataHandler<T>;

    constructor() {
      this.subjectData = new ReplaySubject(1);
      this.observableData = this.subjectData.asObservable();
    }

    public getData(): Observable<T> {
      if (!this.getHandler) {
        throw new Error("getHandler is not defined");
      }
      if (!this.data) {
        this.getHandler().map((r: T) => {
          this.data = r;
          return r;
        }).subscribe(
          result => this.subjectData.next(result),
          err => this.subjectData.error(err)
        );
      }
      return this.observableData;
    }

    public resetCache(): void {
      this.data = null;
    }

    public refresh(): void {
      this.resetCache();
      this.getData();
    }

}

使用

声明Cacheable<>对象(假设是服务的一部分):

list: Cacheable<string> = new Cacheable<string>();

和处理程序:

this.list.getHandler = () => {
// get data from server
return this.http.get(url)
.map((r: Response) => r.json() as string[]);
}

从组件调用:

//gets data from server
List.getData().subscribe(…)

您可以有多个组件订阅到它。

更多细节和代码示例在这里:http://devinstance.net/articles/20171021/rxjs-cacheable

我们要做的是确保这不会导致多个网络请求。

我个人最喜欢使用异步方法来调用网络请求。方法本身不返回值,而是更新同一服务中的BehaviorSubject,组件将订阅该服务。

现在为什么使用一个行为主体而不是一个可观察对象?因为,

在订阅时,BehaviorSubject返回最后一个值,而常规可观察对象只有在接收到onnext时才会触发。 如果您想在非可观察代码(没有订阅)中检索BehaviorSubject的最后一个值,您可以使用getValue()方法。

例子:

customer.service.ts

public customers$: BehaviorSubject<Customer[]> = new BehaviorSubject([]);

public async getCustomers(): Promise<void> {
    let customers = await this.httpClient.post<LogEntry[]>(this.endPoint, criteria).toPromise();
    if (customers) 
        this.customers$.next(customers);
}

然后,在任何需要的地方,我们都可以订阅客户$。

public ngOnInit(): void {
    this.customerService.customers$
    .subscribe((customers: Customer[]) => this.customerList = customers);
}

或者您可能想直接在模板中使用它

<li *ngFor="let customer of customerService.customers$ | async"> ... </li>

所以现在,在再次调用getCustomers之前,数据都保留在客户$ BehaviorSubject中。

如果想要刷新这些数据,该怎么办呢?只需要打电话给getCustomers()

public async refresh(): Promise<void> {
    try {
      await this.customerService.getCustomers();
    } 
    catch (e) {
      // request failed, handle exception
      console.error(e);
    }
}

使用此方法,我们不必在后续网络调用之间显式地保留数据,因为它由BehaviorSubject处理。

PS:通常当一个组件被销毁时,摆脱订阅是一个很好的实践,为此你可以使用这个答案中建议的方法。