通过使用Http,我们调用一个方法来进行网络调用,并返回一个Http可观察对象:
getCustomer() {
return this.http.get('/someUrl').map(res => res.json());
}
如果我们获取这个可观察对象并向其添加多个订阅者:
let network$ = getCustomer();
let subscriber1 = network$.subscribe(...);
let subscriber2 = network$.subscribe(...);
我们要做的是确保这不会导致多个网络请求。
这似乎是一个不寻常的场景,但实际上很常见:例如,如果调用者订阅了可观察对象以显示错误消息,并使用异步管道将其传递给模板,那么我们已经有两个订阅者了。
在RxJs 5中正确的方法是什么?
也就是说,这似乎工作得很好:
getCustomer() {
return this.http.get('/someUrl').map(res => res.json()).share();
}
但是这是RxJs 5中惯用的方法吗,或者我们应该用别的方法来代替?
注意:根据Angular 5的新HttpClient,所有示例中的.map(res => res. JSON())部分现在都是无用的,因为现在默认假设JSON结果。
更新:Ben Lesh说在5.2.0之后的下一个小版本中,你将能够调用shareplay()来真正地缓存。
以前……
首先,不要使用share()或publishReplay(1). refcount(),它们是相同的,它的问题是,它只在可观察对象处于活动状态时建立连接时共享,如果你在它完成连接后,它会再次创建一个新的可观察对象,翻译,而不是真正的缓存。
Birowski给出了正确的解决方案,即使用ReplaySubject。ReplaySubject将缓存你给它的值(bufferSize),在我们的例子中是1。它不会像share()一样在refCount为零时创建一个新的可观察对象,并且你建立了一个新的连接,这是缓存的正确行为。
这是一个可重用函数
export function cacheable<T>(o: Observable<T>): Observable<T> {
let replay = new ReplaySubject<T>(1);
o.subscribe(
x => replay.next(x),
x => replay.error(x),
() => replay.complete()
);
return replay.asObservable();
}
下面是如何使用它
import { Injectable } from '@angular/core';
import { Http } from '@angular/http';
import { Observable } from 'rxjs/Observable';
import { cacheable } from '../utils/rxjs-functions';
@Injectable()
export class SettingsService {
_cache: Observable<any>;
constructor(private _http: Http, ) { }
refresh = () => {
if (this._cache) {
return this._cache;
}
return this._cache = cacheable<any>(this._http.get('YOUR URL'));
}
}
下面是一个更高级版本的可缓存函数。这个函数允许有自己的查找表+提供自定义查找表的能力。这样的话,你就不用检查了。_cache就像上面的例子。还要注意的是,你传递的不是可观察对象作为第一个参数,而是一个返回可观察对象的函数,这是因为Angular的Http会立即执行,所以通过返回一个延迟执行的函数,如果它已经在缓存中,我们可以决定不调用它。
let cacheableCache: { [key: string]: Observable<any> } = {};
export function cacheable<T>(returnObservable: () => Observable<T>, key?: string, customCache?: { [key: string]: Observable<T> }): Observable<T> {
if (!!key && (customCache || cacheableCache)[key]) {
return (customCache || cacheableCache)[key] as Observable<T>;
}
let replay = new ReplaySubject<T>(1);
returnObservable().subscribe(
x => replay.next(x),
x => replay.error(x),
() => replay.complete()
);
let observable = replay.asObservable();
if (!!key) {
if (!!customCache) {
customCache[key] = observable;
} else {
cacheableCache[key] = observable;
}
}
return observable;
}
用法:
getData() => cacheable(this._http.get("YOUR URL"), "this is key for my cache")
编辑:从2021年开始,正确的方法是使用RxJs原生提出的shareReplay操作符。详见下面的回答。
缓存数据,如果可用,则返回此数据,否则发出HTTP请求。
import {Injectable} from '@angular/core';
import {Http, Headers} from '@angular/http';
import {Observable} from 'rxjs/Observable';
import 'rxjs/add/observable/of'; //proper way to import the 'of' operator
import 'rxjs/add/operator/share';
import 'rxjs/add/operator/map';
import {Data} from './data';
@Injectable()
export class DataService {
private url: string = 'https://cors-test.appspot.com/test';
private data: Data;
private observable: Observable<any>;
constructor(private http: Http) {}
getData() {
if(this.data) {
// if `data` is available just return it as `Observable`
return Observable.of(this.data);
} else if(this.observable) {
// if `this.observable` is set then the request is in progress
// return the `Observable` for the ongoing request
return this.observable;
} else {
// example header (not necessary)
let headers = new Headers();
headers.append('Content-Type', 'application/json');
// create the request, store the `Observable` for subsequent subscribers
this.observable = this.http.get(this.url, {
headers: headers
})
.map(response => {
// when the cached data is available we don't need the `Observable` reference anymore
this.observable = null;
if(response.status == 400) {
return "FAILURE";
} else if(response.status == 200) {
this.data = new Data(response.json());
return this.data;
}
// make it shared so more than one subscriber can get the result
})
.share();
return this.observable;
}
}
}
砰砰作响的例子
这篇文章https://blog.thoughtram.io/angular/2018/03/05/advanced-caching-with-rxjs.html是一个很好的解释如何缓存共享播放。
你可以简单地使用ngx-cacheable!它更适合你的场景。
使用这个的好处
它只调用rest API一次,缓存响应并为接下来的请求返回相同的响应。
在创建/更新/删除操作后,可以根据需要调用API。
那么,你的服务等级应该是这样的
import { Injectable } from '@angular/core';
import { Cacheable, CacheBuster } from 'ngx-cacheable';
const customerNotifier = new Subject();
@Injectable()
export class customersService {
// relieves all its caches when any new value is emitted in the stream using notifier
@Cacheable({
cacheBusterObserver: customerNotifier,
async: true
})
getCustomer() {
return this.http.get('/someUrl').map(res => res.json());
}
// notifies the observer to refresh the data
@CacheBuster({
cacheBusterNotifier: customerNotifier
})
addCustomer() {
// some code
}
// notifies the observer to refresh the data
@CacheBuster({
cacheBusterNotifier: customerNotifier
})
updateCustomer() {
// some code
}
}
这里有更多的参考链接。
rxjs 5.4.0有一个新的shareereplay方法。
rx-book shareReplay ()
reactivex.io/rxjs没有文档
作者明确地说“这是处理缓存AJAX结果之类的事情的理想选择”
rxjs PR #2443 feat(shaereplay):增加了publishReplay的shaereplay变体
shareereplay返回一个多播源的可观察对象
ReplaySubject。重放主题被错误地从
源,而不是对源的完成。这使得共享重玩
非常适合处理诸如缓存AJAX结果之类的事情
的事情。但是,重复行为不同于分享行为
它不会重复源可观察对象,而是会重复
源可观察对象的值。
只需在map之后和任何订阅之前调用share()。
在我的例子中,我有一个通用服务(RestClientService.ts),它进行其余调用,提取数据,检查错误,并将可观察对象返回到具体的实现服务(f.ex)。: ContractClientService.ts),最后这个具体的实现将可观察对象返回给de ContractComponent。Ts,这个订阅来更新视图。
RestClientService.ts:
export abstract class RestClientService<T extends BaseModel> {
public GetAll = (path: string, property: string): Observable<T[]> => {
let fullPath = this.actionUrl + path;
let observable = this._http.get(fullPath).map(res => this.extractData(res, property));
observable = observable.share(); //allows multiple subscribers without making again the http request
observable.subscribe(
(res) => {},
error => this.handleError2(error, "GetAll", fullPath),
() => {}
);
return observable;
}
private extractData(res: Response, property: string) {
...
}
private handleError2(error: any, method: string, path: string) {
...
}
}
ContractService.ts:
export class ContractService extends RestClientService<Contract> {
private GET_ALL_ITEMS_REST_URI_PATH = "search";
private GET_ALL_ITEMS_PROPERTY_PATH = "contract";
public getAllItems(): Observable<Contract[]> {
return this.GetAll(this.GET_ALL_ITEMS_REST_URI_PATH, this.GET_ALL_ITEMS_PROPERTY_PATH);
}
}
ContractComponent.ts:
export class ContractComponent implements OnInit {
getAllItems() {
this.rcService.getAllItems().subscribe((data) => {
this.items = data;
});
}
}