通过使用Http,我们调用一个方法来进行网络调用,并返回一个Http可观察对象:
getCustomer() {
return this.http.get('/someUrl').map(res => res.json());
}
如果我们获取这个可观察对象并向其添加多个订阅者:
let network$ = getCustomer();
let subscriber1 = network$.subscribe(...);
let subscriber2 = network$.subscribe(...);
我们要做的是确保这不会导致多个网络请求。
这似乎是一个不寻常的场景,但实际上很常见:例如,如果调用者订阅了可观察对象以显示错误消息,并使用异步管道将其传递给模板,那么我们已经有两个订阅者了。
在RxJs 5中正确的方法是什么?
也就是说,这似乎工作得很好:
getCustomer() {
return this.http.get('/someUrl').map(res => res.json()).share();
}
但是这是RxJs 5中惯用的方法吗,或者我们应该用别的方法来代替?
注意:根据Angular 5的新HttpClient,所有示例中的.map(res => res. JSON())部分现在都是无用的,因为现在默认假设JSON结果。
这是.publishReplay (1) .refCount ();或.publishLast () .refCount ();因为Angular Http的可观察对象在请求后完成。
这个简单的类缓存结果,因此您可以多次订阅.value,并且只发出一个请求。你也可以使用.reload()来发出新的请求并发布数据。
你可以这样使用它:
let res = new RestResource(() => this.http.get('inline.bundleo.js'));
res.status.subscribe((loading)=>{
console.log('STATUS=',loading);
});
res.value.subscribe((value) => {
console.log('VALUE=', value);
});
来源是:
export class RestResource {
static readonly LOADING: string = 'RestResource_Loading';
static readonly ERROR: string = 'RestResource_Error';
static readonly IDLE: string = 'RestResource_Idle';
public value: Observable<any>;
public status: Observable<string>;
private loadStatus: Observer<any>;
private reloader: Observable<any>;
private reloadTrigger: Observer<any>;
constructor(requestObservableFn: () => Observable<any>) {
this.status = Observable.create((o) => {
this.loadStatus = o;
});
this.reloader = Observable.create((o: Observer<any>) => {
this.reloadTrigger = o;
});
this.value = this.reloader.startWith(null).switchMap(() => {
if (this.loadStatus) {
this.loadStatus.next(RestResource.LOADING);
}
return requestObservableFn()
.map((res) => {
if (this.loadStatus) {
this.loadStatus.next(RestResource.IDLE);
}
return res;
}).catch((err)=>{
if (this.loadStatus) {
this.loadStatus.next(RestResource.ERROR);
}
return Observable.of(null);
});
}).publishReplay(1).refCount();
}
reload() {
this.reloadTrigger.next(null);
}
}
我写了一个缓存类,
/**
* Caches results returned from given fetcher callback for given key,
* up to maxItems results, deletes the oldest results when full (FIFO).
*/
export class StaticCache
{
static cachedData: Map<string, any> = new Map<string, any>();
static maxItems: number = 400;
static get(key: string){
return this.cachedData.get(key);
}
static getOrFetch(key: string, fetcher: (string) => any): any {
let value = this.cachedData.get(key);
if (value != null){
console.log("Cache HIT! (fetcher)");
return value;
}
console.log("Cache MISS... (fetcher)");
value = fetcher(key);
this.add(key, value);
return value;
}
static add(key, value){
this.cachedData.set(key, value);
this.deleteOverflowing();
}
static deleteOverflowing(): void {
if (this.cachedData.size > this.maxItems) {
this.deleteOldest(this.cachedData.size - this.maxItems);
}
}
/// A Map object iterates its elements in insertion order — a for...of loop returns an array of [key, value] for each iteration.
/// However that seems not to work. Trying with forEach.
static deleteOldest(howMany: number): void {
//console.debug("Deleting oldest " + howMany + " of " + this.cachedData.size);
let iterKeys = this.cachedData.keys();
let item: IteratorResult<string>;
while (howMany-- > 0 && (item = iterKeys.next(), !item.done)){
//console.debug(" Deleting: " + item.value);
this.cachedData.delete(item.value); // Deleting while iterating should be ok in JS.
}
}
static clear(): void {
this.cachedData = new Map<string, any>();
}
}
由于我们使用它的方式不同,所以它都是静态的,但是可以随意地将它变成一个正常的类和服务。我不确定angular是否会一直保持一个实例(对于Angular2来说是新的)。
我是这样使用它的:
let httpService: Http = this.http;
function fetcher(url: string): Observable<any> {
console.log(" Fetching URL: " + url);
return httpService.get(url).map((response: Response) => {
if (!response) return null;
if (typeof response.json() !== "array")
throw new Error("Graph REST should return an array of vertices.");
let items: any[] = graphService.fromJSONarray(response.json(), httpService);
return array ? items : items[0];
});
}
// If data is a link, return a result of a service call.
if (this.data[verticesLabel][name]["link"] || this.data[verticesLabel][name]["_type"] == "link")
{
// Make an HTTP call.
let url = this.data[verticesLabel][name]["link"];
let cachedObservable: Observable<any> = StaticCache.getOrFetch(url, fetcher);
if (!cachedObservable)
throw new Error("Failed loading link: " + url);
return cachedObservable;
}
我认为可能有更聪明的方法,使用一些可观察的技巧,但这对我的目的来说已经很好了。
RXJS 5.3.0
我对.map(myFunction).publishReplay(1).refCount()不满意
对于多个订阅者,.map()在某些情况下执行myFunction两次(我希望它只执行一次)。一个修复似乎是publishReplay(1).refCount().take(1)
你可以做的另一件事,就是不使用refCount(),让Observable立即热:
let obs = this.http.get('my/data.json').publishReplay(1);
obs.connect();
return obs;
这将启动HTTP请求,而不考虑订阅者。我不确定在HTTP GET完成之前取消订阅是否会取消它。
你可以简单地使用ngx-cacheable!它更适合你的场景。
使用这个的好处
它只调用rest API一次,缓存响应并为接下来的请求返回相同的响应。
在创建/更新/删除操作后,可以根据需要调用API。
那么,你的服务等级应该是这样的
import { Injectable } from '@angular/core';
import { Cacheable, CacheBuster } from 'ngx-cacheable';
const customerNotifier = new Subject();
@Injectable()
export class customersService {
// relieves all its caches when any new value is emitted in the stream using notifier
@Cacheable({
cacheBusterObserver: customerNotifier,
async: true
})
getCustomer() {
return this.http.get('/someUrl').map(res => res.json());
}
// notifies the observer to refresh the data
@CacheBuster({
cacheBusterNotifier: customerNotifier
})
addCustomer() {
// some code
}
// notifies the observer to refresh the data
@CacheBuster({
cacheBusterNotifier: customerNotifier
})
updateCustomer() {
// some code
}
}
这里有更多的参考链接。