通过使用Http,我们调用一个方法来进行网络调用,并返回一个Http可观察对象:
getCustomer() {
return this.http.get('/someUrl').map(res => res.json());
}
如果我们获取这个可观察对象并向其添加多个订阅者:
let network$ = getCustomer();
let subscriber1 = network$.subscribe(...);
let subscriber2 = network$.subscribe(...);
我们要做的是确保这不会导致多个网络请求。
这似乎是一个不寻常的场景,但实际上很常见:例如,如果调用者订阅了可观察对象以显示错误消息,并使用异步管道将其传递给模板,那么我们已经有两个订阅者了。
在RxJs 5中正确的方法是什么?
也就是说,这似乎工作得很好:
getCustomer() {
return this.http.get('/someUrl').map(res => res.json()).share();
}
但是这是RxJs 5中惯用的方法吗,或者我们应该用别的方法来代替?
注意:根据Angular 5的新HttpClient,所有示例中的.map(res => res. JSON())部分现在都是无用的,因为现在默认假设JSON结果。
RXJS 5.3.0
我对.map(myFunction).publishReplay(1).refCount()不满意
对于多个订阅者,.map()在某些情况下执行myFunction两次(我希望它只执行一次)。一个修复似乎是publishReplay(1).refCount().take(1)
你可以做的另一件事,就是不使用refCount(),让Observable立即热:
let obs = this.http.get('my/data.json').publishReplay(1);
obs.connect();
return obs;
这将启动HTTP请求,而不考虑订阅者。我不确定在HTTP GET完成之前取消订阅是否会取消它。
您选择的实现将取决于是否希望unsubscribe()取消您的HTTP请求。
在任何情况下,TypeScript装饰器都是标准化行为的好方法。这是我写的:
@CacheObservableArgsKey
getMyThing(id: string): Observable<any> {
return this.http.get('things/'+id);
}
装饰器定义:
/**
* Decorator that replays and connects to the Observable returned from the function.
* Caches the result using all arguments to form a key.
* @param target
* @param name
* @param descriptor
* @returns {PropertyDescriptor}
*/
export function CacheObservableArgsKey(target: Object, name: string, descriptor: PropertyDescriptor) {
const originalFunc = descriptor.value;
const cacheMap = new Map<string, any>();
descriptor.value = function(this: any, ...args: any[]): any {
const key = args.join('::');
let returnValue = cacheMap.get(key);
if (returnValue !== undefined) {
console.log(`${name} cache-hit ${key}`, returnValue);
return returnValue;
}
returnValue = originalFunc.apply(this, args);
console.log(`${name} cache-miss ${key} new`, returnValue);
if (returnValue instanceof Observable) {
returnValue = returnValue.publishReplay(1);
returnValue.connect();
}
else {
console.warn('CacheHttpArgsKey: value not an Observable cannot publishReplay and connect', returnValue);
}
cacheMap.set(key, returnValue);
return returnValue;
};
return descriptor;
}
rxjs 5.4.0有一个新的shareereplay方法。
rx-book shareReplay ()
reactivex.io/rxjs没有文档
作者明确地说“这是处理缓存AJAX结果之类的事情的理想选择”
rxjs PR #2443 feat(shaereplay):增加了publishReplay的shaereplay变体
shareereplay返回一个多播源的可观察对象
ReplaySubject。重放主题被错误地从
源,而不是对源的完成。这使得共享重玩
非常适合处理诸如缓存AJAX结果之类的事情
的事情。但是,重复行为不同于分享行为
它不会重复源可观察对象,而是会重复
源可观察对象的值。
上面的大多数答案都适用于不接受输入的http请求。每次你想要使用一些输入进行api调用时,都需要重新创建请求。上面唯一可以处理这个问题的回复是@Arlo的回复。
我已经创建了一个稍微简单的装饰器,您可以使用它将响应共享给每个具有相同输入的调用者。与Arlo的回复不同,它不会对延迟的订阅者重放响应,而是将同时发生的请求作为一个请求来处理。如果目标是重放响应给延迟的观察者(也就是缓存的响应),你可以修改下面的代码并用shaereplay(1)替换share():
https://gist.github.com/OysteinAmundsen/b97a2359292463feb8c0e2270ed6695a
import { finalize, Observable, share } from 'rxjs';
export function SharedObservable(): MethodDecorator {
const obs$ = new Map<string, Observable<any>>();
return (target: any, propertyKey: string | symbol, descriptor: PropertyDescriptor) => {
const originalMethod = descriptor.value;
descriptor.value = function (...args: any[]) {
const key = JSON.stringify(args);
if (!obs$.has(key)) {
// We have no observable for this key yet, so we create one
const res = originalMethod.apply(this, args).pipe(
share(), // Make the observable hot
finalize(() => obs$.delete(key)) // Cleanup when observable is complete
);
obs$.set(key, res);
}
// Return the cached observable
return obs$.get(key);
};
return descriptor;
};
}
用法:
@SharedObservable()
myFunc(id: number): Observable<any> {
return this.http.get<any>(`/api/someUrl/${id}`);
}