通过使用Http,我们调用一个方法来进行网络调用,并返回一个Http可观察对象:
getCustomer() {
return this.http.get('/someUrl').map(res => res.json());
}
如果我们获取这个可观察对象并向其添加多个订阅者:
let network$ = getCustomer();
let subscriber1 = network$.subscribe(...);
let subscriber2 = network$.subscribe(...);
我们要做的是确保这不会导致多个网络请求。
这似乎是一个不寻常的场景,但实际上很常见:例如,如果调用者订阅了可观察对象以显示错误消息,并使用异步管道将其传递给模板,那么我们已经有两个订阅者了。
在RxJs 5中正确的方法是什么?
也就是说,这似乎工作得很好:
getCustomer() {
return this.http.get('/someUrl').map(res => res.json()).share();
}
但是这是RxJs 5中惯用的方法吗,或者我们应该用别的方法来代替?
注意:根据Angular 5的新HttpClient,所有示例中的.map(res => res. JSON())部分现在都是无用的,因为现在默认假设JSON结果。
你可以构建一个简单的类Cacheable<>来帮助管理从多个订阅者的http服务器检索到的数据:
declare type GetDataHandler<T> = () => Observable<T>;
export class Cacheable<T> {
protected data: T;
protected subjectData: Subject<T>;
protected observableData: Observable<T>;
public getHandler: GetDataHandler<T>;
constructor() {
this.subjectData = new ReplaySubject(1);
this.observableData = this.subjectData.asObservable();
}
public getData(): Observable<T> {
if (!this.getHandler) {
throw new Error("getHandler is not defined");
}
if (!this.data) {
this.getHandler().map((r: T) => {
this.data = r;
return r;
}).subscribe(
result => this.subjectData.next(result),
err => this.subjectData.error(err)
);
}
return this.observableData;
}
public resetCache(): void {
this.data = null;
}
public refresh(): void {
this.resetCache();
this.getData();
}
}
使用
声明Cacheable<>对象(假设是服务的一部分):
list: Cacheable<string> = new Cacheable<string>();
和处理程序:
this.list.getHandler = () => {
// get data from server
return this.http.get(url)
.map((r: Response) => r.json() as string[]);
}
从组件调用:
//gets data from server
List.getData().subscribe(…)
您可以有多个组件订阅到它。
更多细节和代码示例在这里:http://devinstance.net/articles/20171021/rxjs-cacheable
更新:Ben Lesh说在5.2.0之后的下一个小版本中,你将能够调用shareplay()来真正地缓存。
以前……
首先,不要使用share()或publishReplay(1). refcount(),它们是相同的,它的问题是,它只在可观察对象处于活动状态时建立连接时共享,如果你在它完成连接后,它会再次创建一个新的可观察对象,翻译,而不是真正的缓存。
Birowski给出了正确的解决方案,即使用ReplaySubject。ReplaySubject将缓存你给它的值(bufferSize),在我们的例子中是1。它不会像share()一样在refCount为零时创建一个新的可观察对象,并且你建立了一个新的连接,这是缓存的正确行为。
这是一个可重用函数
export function cacheable<T>(o: Observable<T>): Observable<T> {
let replay = new ReplaySubject<T>(1);
o.subscribe(
x => replay.next(x),
x => replay.error(x),
() => replay.complete()
);
return replay.asObservable();
}
下面是如何使用它
import { Injectable } from '@angular/core';
import { Http } from '@angular/http';
import { Observable } from 'rxjs/Observable';
import { cacheable } from '../utils/rxjs-functions';
@Injectable()
export class SettingsService {
_cache: Observable<any>;
constructor(private _http: Http, ) { }
refresh = () => {
if (this._cache) {
return this._cache;
}
return this._cache = cacheable<any>(this._http.get('YOUR URL'));
}
}
下面是一个更高级版本的可缓存函数。这个函数允许有自己的查找表+提供自定义查找表的能力。这样的话,你就不用检查了。_cache就像上面的例子。还要注意的是,你传递的不是可观察对象作为第一个参数,而是一个返回可观察对象的函数,这是因为Angular的Http会立即执行,所以通过返回一个延迟执行的函数,如果它已经在缓存中,我们可以决定不调用它。
let cacheableCache: { [key: string]: Observable<any> } = {};
export function cacheable<T>(returnObservable: () => Observable<T>, key?: string, customCache?: { [key: string]: Observable<T> }): Observable<T> {
if (!!key && (customCache || cacheableCache)[key]) {
return (customCache || cacheableCache)[key] as Observable<T>;
}
let replay = new ReplaySubject<T>(1);
returnObservable().subscribe(
x => replay.next(x),
x => replay.error(x),
() => replay.complete()
);
let observable = replay.asObservable();
if (!!key) {
if (!!customCache) {
customCache[key] = observable;
} else {
cacheableCache[key] = observable;
}
}
return observable;
}
用法:
getData() => cacheable(this._http.get("YOUR URL"), "this is key for my cache")
rxjs 5.4.0有一个新的shareereplay方法。
rx-book shareReplay ()
reactivex.io/rxjs没有文档
作者明确地说“这是处理缓存AJAX结果之类的事情的理想选择”
rxjs PR #2443 feat(shaereplay):增加了publishReplay的shaereplay变体
shareereplay返回一个多播源的可观察对象
ReplaySubject。重放主题被错误地从
源,而不是对源的完成。这使得共享重玩
非常适合处理诸如缓存AJAX结果之类的事情
的事情。但是,重复行为不同于分享行为
它不会重复源可观察对象,而是会重复
源可观察对象的值。
上面的大多数答案都适用于不接受输入的http请求。每次你想要使用一些输入进行api调用时,都需要重新创建请求。上面唯一可以处理这个问题的回复是@Arlo的回复。
我已经创建了一个稍微简单的装饰器,您可以使用它将响应共享给每个具有相同输入的调用者。与Arlo的回复不同,它不会对延迟的订阅者重放响应,而是将同时发生的请求作为一个请求来处理。如果目标是重放响应给延迟的观察者(也就是缓存的响应),你可以修改下面的代码并用shaereplay(1)替换share():
https://gist.github.com/OysteinAmundsen/b97a2359292463feb8c0e2270ed6695a
import { finalize, Observable, share } from 'rxjs';
export function SharedObservable(): MethodDecorator {
const obs$ = new Map<string, Observable<any>>();
return (target: any, propertyKey: string | symbol, descriptor: PropertyDescriptor) => {
const originalMethod = descriptor.value;
descriptor.value = function (...args: any[]) {
const key = JSON.stringify(args);
if (!obs$.has(key)) {
// We have no observable for this key yet, so we create one
const res = originalMethod.apply(this, args).pipe(
share(), // Make the observable hot
finalize(() => obs$.delete(key)) // Cleanup when observable is complete
);
obs$.set(key, res);
}
// Return the cached observable
return obs$.get(key);
};
return descriptor;
};
}
用法:
@SharedObservable()
myFunc(id: number): Observable<any> {
return this.http.get<any>(`/api/someUrl/${id}`);
}
RXJS 5.3.0
我对.map(myFunction).publishReplay(1).refCount()不满意
对于多个订阅者,.map()在某些情况下执行myFunction两次(我希望它只执行一次)。一个修复似乎是publishReplay(1).refCount().take(1)
你可以做的另一件事,就是不使用refCount(),让Observable立即热:
let obs = this.http.get('my/data.json').publishReplay(1);
obs.connect();
return obs;
这将启动HTTP请求,而不考虑订阅者。我不确定在HTTP GET完成之前取消订阅是否会取消它。
我认为@ngx-cache/core对于维护http调用的缓存功能是有用的,特别是如果http调用是在浏览器和服务器平台上进行的。
假设我们有以下方法:
getCustomer() {
return this.http.get('/someUrl').map(res => res.json());
}
你可以在第一次执行时使用@ngx-cache/core的Cached装饰器将HTTP调用方法的返回值存储在缓存存储中(存储可以配置,请检查ng-seed/universal的实现)。下一次调用该方法时(无论是在浏览器平台上还是在服务器平台上),将从缓存存储中检索该值。
import { Cached } from '@ngx-cache/core';
...
@Cached('get-customer') // the cache key/identifier
getCustomer() {
return this.http.get('/someUrl').map(res => res.json());
}
还可以使用缓存API使用缓存方法(has、get、set)。
anyclass.ts
...
import { CacheService } from '@ngx-cache/core';
@Injectable()
export class AnyClass {
constructor(private readonly cache: CacheService) {
// note that CacheService is injected into a private property of AnyClass
}
// will retrieve 'some string value'
getSomeStringValue(): string {
if (this.cache.has('some-string'))
return this.cache.get('some-string');
this.cache.set('some-string', 'some string value');
return 'some string value';
}
}
下面是客户端和服务器端缓存的包列表:
@ngx-cache/core:缓存实用程序
@ngx-cache/platform- Browser: SPA/Browser平台实现
@ngx-cache/platform-server:服务器平台实现
@ngx-cache/fs-storage:存储工具(服务器平台需要)