通过使用Http,我们调用一个方法来进行网络调用,并返回一个Http可观察对象:
getCustomer() {
return this.http.get('/someUrl').map(res => res.json());
}
如果我们获取这个可观察对象并向其添加多个订阅者:
let network$ = getCustomer();
let subscriber1 = network$.subscribe(...);
let subscriber2 = network$.subscribe(...);
我们要做的是确保这不会导致多个网络请求。
这似乎是一个不寻常的场景,但实际上很常见:例如,如果调用者订阅了可观察对象以显示错误消息,并使用异步管道将其传递给模板,那么我们已经有两个订阅者了。
在RxJs 5中正确的方法是什么?
也就是说,这似乎工作得很好:
getCustomer() {
return this.http.get('/someUrl').map(res => res.json()).share();
}
但是这是RxJs 5中惯用的方法吗,或者我们应该用别的方法来代替?
注意:根据Angular 5的新HttpClient,所有示例中的.map(res => res. JSON())部分现在都是无用的,因为现在默认假设JSON结果。
rxjs 5.4.0(2017-05-09)增加了对shareereplay的支持。
为什么使用共享回放?
当您有不希望在多个订阅者之间执行的副作用或繁重的计算时,通常需要使用shareReplay。在您知道流的后期订阅者需要访问先前发出的值的情况下,它可能也很有价值。这种在订阅上重放价值的能力是share和shareereplay的区别所在。
你可以很容易地修改一个angular服务来使用它,并返回一个带有缓存结果的可观察对象,它只会进行一次http调用(假设第一次调用成功)。
Angular服务示例
这是一个非常简单的客户服务,使用共享回放。
customer.service.ts
import { shareReplay } from 'rxjs/operators';
import { Observable } from 'rxjs';
import { HttpClient } from '@angular/common/http';
@Injectable({providedIn: 'root'})
export class CustomerService {
private readonly _getCustomers: Observable<ICustomer[]>;
constructor(private readonly http: HttpClient) {
this._getCustomers = this.http.get<ICustomer[]>('/api/customers/').pipe(shareReplay());
}
getCustomers() : Observable<ICustomer[]> {
return this._getCustomers;
}
}
export interface ICustomer {
/* ICustomer interface fields defined here */
}
注意,构造函数中的赋值可以移动到getCustomers方法,但由于HttpClient返回的可观察对象是“冷的”,因此在构造函数中这样做是可以接受的,因为http调用只会在订阅的第一个调用中进行。
此外,这里还假设初始返回的数据在应用程序实例的生命周期内不会过时。
rxjs 5.4.0有一个新的shareereplay方法。
rx-book shareReplay ()
reactivex.io/rxjs没有文档
作者明确地说“这是处理缓存AJAX结果之类的事情的理想选择”
rxjs PR #2443 feat(shaereplay):增加了publishReplay的shaereplay变体
shareereplay返回一个多播源的可观察对象
ReplaySubject。重放主题被错误地从
源,而不是对源的完成。这使得共享重玩
非常适合处理诸如缓存AJAX结果之类的事情
的事情。但是,重复行为不同于分享行为
它不会重复源可观察对象,而是会重复
源可观察对象的值。
rxjs 5.4.0(2017-05-09)增加了对shareereplay的支持。
为什么使用共享回放?
当您有不希望在多个订阅者之间执行的副作用或繁重的计算时,通常需要使用shareReplay。在您知道流的后期订阅者需要访问先前发出的值的情况下,它可能也很有价值。这种在订阅上重放价值的能力是share和shareereplay的区别所在。
你可以很容易地修改一个angular服务来使用它,并返回一个带有缓存结果的可观察对象,它只会进行一次http调用(假设第一次调用成功)。
Angular服务示例
这是一个非常简单的客户服务,使用共享回放。
customer.service.ts
import { shareReplay } from 'rxjs/operators';
import { Observable } from 'rxjs';
import { HttpClient } from '@angular/common/http';
@Injectable({providedIn: 'root'})
export class CustomerService {
private readonly _getCustomers: Observable<ICustomer[]>;
constructor(private readonly http: HttpClient) {
this._getCustomers = this.http.get<ICustomer[]>('/api/customers/').pipe(shareReplay());
}
getCustomers() : Observable<ICustomer[]> {
return this._getCustomers;
}
}
export interface ICustomer {
/* ICustomer interface fields defined here */
}
注意,构造函数中的赋值可以移动到getCustomers方法,但由于HttpClient返回的可观察对象是“冷的”,因此在构造函数中这样做是可以接受的,因为http调用只会在订阅的第一个调用中进行。
此外,这里还假设初始返回的数据在应用程序实例的生命周期内不会过时。
我写了一个缓存类,
/**
* Caches results returned from given fetcher callback for given key,
* up to maxItems results, deletes the oldest results when full (FIFO).
*/
export class StaticCache
{
static cachedData: Map<string, any> = new Map<string, any>();
static maxItems: number = 400;
static get(key: string){
return this.cachedData.get(key);
}
static getOrFetch(key: string, fetcher: (string) => any): any {
let value = this.cachedData.get(key);
if (value != null){
console.log("Cache HIT! (fetcher)");
return value;
}
console.log("Cache MISS... (fetcher)");
value = fetcher(key);
this.add(key, value);
return value;
}
static add(key, value){
this.cachedData.set(key, value);
this.deleteOverflowing();
}
static deleteOverflowing(): void {
if (this.cachedData.size > this.maxItems) {
this.deleteOldest(this.cachedData.size - this.maxItems);
}
}
/// A Map object iterates its elements in insertion order — a for...of loop returns an array of [key, value] for each iteration.
/// However that seems not to work. Trying with forEach.
static deleteOldest(howMany: number): void {
//console.debug("Deleting oldest " + howMany + " of " + this.cachedData.size);
let iterKeys = this.cachedData.keys();
let item: IteratorResult<string>;
while (howMany-- > 0 && (item = iterKeys.next(), !item.done)){
//console.debug(" Deleting: " + item.value);
this.cachedData.delete(item.value); // Deleting while iterating should be ok in JS.
}
}
static clear(): void {
this.cachedData = new Map<string, any>();
}
}
由于我们使用它的方式不同,所以它都是静态的,但是可以随意地将它变成一个正常的类和服务。我不确定angular是否会一直保持一个实例(对于Angular2来说是新的)。
我是这样使用它的:
let httpService: Http = this.http;
function fetcher(url: string): Observable<any> {
console.log(" Fetching URL: " + url);
return httpService.get(url).map((response: Response) => {
if (!response) return null;
if (typeof response.json() !== "array")
throw new Error("Graph REST should return an array of vertices.");
let items: any[] = graphService.fromJSONarray(response.json(), httpService);
return array ? items : items[0];
});
}
// If data is a link, return a result of a service call.
if (this.data[verticesLabel][name]["link"] || this.data[verticesLabel][name]["_type"] == "link")
{
// Make an HTTP call.
let url = this.data[verticesLabel][name]["link"];
let cachedObservable: Observable<any> = StaticCache.getOrFetch(url, fetcher);
if (!cachedObservable)
throw new Error("Failed loading link: " + url);
return cachedObservable;
}
我认为可能有更聪明的方法,使用一些可观察的技巧,但这对我的目的来说已经很好了。
你可以构建一个简单的类Cacheable<>来帮助管理从多个订阅者的http服务器检索到的数据:
declare type GetDataHandler<T> = () => Observable<T>;
export class Cacheable<T> {
protected data: T;
protected subjectData: Subject<T>;
protected observableData: Observable<T>;
public getHandler: GetDataHandler<T>;
constructor() {
this.subjectData = new ReplaySubject(1);
this.observableData = this.subjectData.asObservable();
}
public getData(): Observable<T> {
if (!this.getHandler) {
throw new Error("getHandler is not defined");
}
if (!this.data) {
this.getHandler().map((r: T) => {
this.data = r;
return r;
}).subscribe(
result => this.subjectData.next(result),
err => this.subjectData.error(err)
);
}
return this.observableData;
}
public resetCache(): void {
this.data = null;
}
public refresh(): void {
this.resetCache();
this.getData();
}
}
使用
声明Cacheable<>对象(假设是服务的一部分):
list: Cacheable<string> = new Cacheable<string>();
和处理程序:
this.list.getHandler = () => {
// get data from server
return this.http.get(url)
.map((r: Response) => r.json() as string[]);
}
从组件调用:
//gets data from server
List.getData().subscribe(…)
您可以有多个组件订阅到它。
更多细节和代码示例在这里:http://devinstance.net/articles/20171021/rxjs-cacheable