什么时候我应该存储订阅实例和调用unsubscribe()在ngOnDestroy生命周期,什么时候我可以简单地忽略它们?

保存所有订阅会给组件代码带来很多麻烦。

HTTP客户端指南忽略这样的订阅:

getHeroes() {
  this.heroService.getHeroes()
                  .subscribe(
                     heroes => this.heroes = heroes,
                     error =>  this.errorMessage = <any>error);
}

同时,《航路指南》指出:

最终,我们会航行到别的地方。路由器将从DOM中移除这个组件并销毁它。在那之前,我们得把自己弄干净。具体来说,我们必须在Angular销毁该组件之前取消订阅。如果不这样做,可能会产生内存泄漏。 我们在ngOnDestroy方法中取消订阅我们的可观察对象。

private sub: any;

ngOnInit() {
  this.sub = this.route.params.subscribe(params => {
     let id = +params['id']; // (+) converts string 'id' to a number
     this.service.getHero(id).then(hero => this.hero = hero);
   });
}

ngOnDestroy() {
  this.sub.unsubscribe();
}

当前回答

DisposeBag

这个想法的灵感来自RxSwift的DisposeBag,所以我决定开发一个类似但简单的结构。

DisposeBag是一个数据结构,它包含对所有打开订阅的引用。它促进了组件中订阅的处理,同时为我们提供了api来跟踪打开订阅的状态。

优势

非常简单的API,使您的代码看起来简单和小。 提供用于跟踪开放订阅状态的API(允许您显示不确定的进度条) 没有依赖注入/包。

使用

在组件:

@Component({
  selector: 'some-component',
  templateUrl: './some-component.component.html',
  changeDetection: ChangeDetectionStrategy.OnPush
})
export class SomeComponent implements OnInit, OnDestroy {

  public bag = new DisposeBag()
  
  constructor(private _change: ChangeDetectorRef) {
  }

  ngOnInit(): void {

    // an observable that takes some time to finish such as an api call.
    const aSimpleObservable = of(0).pipe(delay(5000))

    // this identifier allows us to track the progress for this specific subscription (very useful in template)
    this.bag.subscribe("submission", aSimpleObservable, () => { 
      this._change.markForCheck() // trigger UI change
     })
  }

  ngOnDestroy(): void {
    // never forget to add this line.
    this.bag.destroy()
  }
}

在模板:


<!-- will be shown as long as the submission subscription is open -->
<span *ngIf="bag.inProgress('submission')">submission in progress</span>

<!-- will be shown as long as there's an open subscription in the bag  -->
<span *ngIf="bag.hasInProgress">some subscriptions are still in progress</span>

实现

import { Observable, Observer, Subject, Subscription, takeUntil } from "rxjs";


/**
 * This class facilitates the disposal of the subscription in our components.
 * instead of creating _unsubscribeAll and lots of boilerplates to create different variables for Subscriptions; 
 * you can just easily use subscribe(someStringIdentifier, observable, observer). then you can use bag.inProgress() with
 * the same someStringIdentifier on you html or elsewhere to determine the state of the ongoing subscription.
 *
 *  don't forget to add onDestroy() { this.bag.destroy() }
 * 
 *  Author: Hamidreza Vakilian (hvakilian1@gmail.com)
 * @export
 * @class DisposeBag
 */
export class DisposeBag {
    private _unsubscribeAll: Subject<any> = new Subject<any>();

    private subscriptions = new Map<string, Subscription>()


    /**
     * this method automatically adds takeUntil to your observable, adds it to a private map.
     * this method enables inProgress to work. don't forget to add onDestroy() { this.bag.destroy() }
     *
     * @template T
     * @param {string} id
     * @param {Observable<T>} obs
     * @param {Partial<Observer<T>>} observer
     * @return {*}  {Subscription}
     * @memberof DisposeBag
     */
    public subscribe<T>(id: string, obs: Observable<T>, observer: Partial<Observer<T>> | ((value: T) => void)): Subscription {
        if (id.isEmpty()) {
            throw new Error('disposable.subscribe is called with invalid id')
        }
        if (!obs) {
            throw new Error('disposable.subscribe is called with an invalid observable')
        }

        /* handle the observer */
        let subs: Subscription
        if (typeof observer === 'function') {
            subs = obs.pipe(takeUntil(this._unsubscribeAll)).subscribe(observer)
        } else if (typeof observer === 'object') {
            subs = obs.pipe(takeUntil(this._unsubscribeAll)).subscribe(observer)
        } else {
            throw new Error('disposable.subscribe is called with an invalid observer')
        }

        /* unsubscribe from the last possible subscription if in progress. */
        let possibleSubs = this.subscriptions.get(id)
        if (possibleSubs && !possibleSubs.closed) {
            console.info(`Disposebag: a subscription with id=${id} was disposed and replaced.`)
            possibleSubs.unsubscribe()
        }

        /* store the reference in the map */
        this.subscriptions.set(id, subs)

        return subs
    }


    /**
     * Returns true if any of the registered subscriptions is in progress.
     *
     * @readonly
     * @type {boolean}
     * @memberof DisposeBag
     */
    public get hasInProgress(): boolean {
        return Array.from(this.subscriptions.values()).reduce(
            (prev, current: Subscription) => { 
                return prev || !current.closed }
            , false)
    }

    /**
     * call this from your template or elsewhere to determine the state of each subscription.
     *
     * @param {string} id
     * @return {*} 
     * @memberof DisposeBag
     */
    public inProgress(id: string) {
        let possibleSubs = this.subscriptions.get(id)
        if (possibleSubs) {
            return !possibleSubs.closed
        } else {
            return false
        }
    }


    /**
     * Never forget to call this method in your onDestroy() method of your components.
     *
     * @memberof DisposeBag
     */
    public destroy() {
        this._unsubscribeAll.next(null);
        this._unsubscribeAll.complete();
    }
}

其他回答

基于:使用类继承来钩子到Angular 2的组件生命周期

另一种通用方法:

导出抽象类UnsubscribeOnDestroy实现OnDestroy { protected d$: Subject<any>; 构造函数(){ 这一点。d$ = new Subject<void>(); const f = this.ngOnDestroy; 这一点。ngOnDestroy = () => { f (); this.d $ . next (); this.d .complete美元(); }; } public ngOnDestroy() { / /空操作 } }

并使用:

@ component ({ 选择器:“my-comp”, 模板:“ }) 导出类RsvpFormSaveComponent扩展UnsubscribeOnDestroy实现OnInit { 构造函数(){ 超级(); } ngOnInit(): void { Observable.of (bla) .takeUntil (this.d $) .subscribe(val => console.log(val)); } }

订阅类有一个有趣的特性:

表示一个一次性资源,比如Observable的执行。订阅有一个重要的方法,即unsubscribe,它不接受参数,只处理订阅所持有的资源。 此外,可以通过add()方法将订阅分组在一起,该方法将把一个子订阅附加到当前订阅。当订阅未订阅时,它的所有子(及其孙辈)也将被取消订阅。

您可以创建一个聚合订阅对象,对所有订阅进行分组。 为此,您可以创建一个空的Subscription,并使用它的add()方法向其添加订阅。当组件被销毁时,只需要取消订阅聚合订阅。

@Component({ ... })
export class SmartComponent implements OnInit, OnDestroy {
  private subscriptions = new Subscription();

  constructor(private heroService: HeroService) {
  }

  ngOnInit() {
    this.subscriptions.add(this.heroService.getHeroes().subscribe(heroes => this.heroes = heroes));
    this.subscriptions.add(/* another subscription */);
    this.subscriptions.add(/* and another subscription */);
    this.subscriptions.add(/* and so on */);
  }

  ngOnDestroy() {
    this.subscriptions.unsubscribe();
  }
}

我尝试了seangwright的解决方案(编辑3)

这对timer或interval创建的Observable不起作用。

然而,我用另一种方法让它工作:

import { Component, OnDestroy, OnInit } from '@angular/core';
import 'rxjs/add/operator/takeUntil';
import { Subject } from 'rxjs/Subject';
import { Subscription } from 'rxjs/Subscription';
import 'rxjs/Rx';

import { MyThingService } from '../my-thing.service';

@Component({
   selector: 'my-thing',
   templateUrl: './my-thing.component.html'
})
export class MyThingComponent implements OnDestroy, OnInit {
   private subscriptions: Array<Subscription> = [];

  constructor(
     private myThingService: MyThingService,
   ) { }

  ngOnInit() {
    const newSubs = this.myThingService.getThings()
        .subscribe(things => console.log(things));
    this.subscriptions.push(newSubs);
  }

  ngOnDestroy() {
    for (const subs of this.subscriptions) {
      subs.unsubscribe();
   }
 }
}

根据@seangwright的回答,我写了一个抽象类来处理组件中“无限”的可观察对象的订阅:

import { OnDestroy } from '@angular/core';
import { Subscription } from 'rxjs/Subscription';
import { Subject } from 'rxjs/Subject';
import { Observable } from 'rxjs/Observable';
import { PartialObserver } from 'rxjs/Observer';

export abstract class InfiniteSubscriberComponent implements OnDestroy {
  private onDestroySource: Subject<any> = new Subject();

  constructor() {}

  subscribe(observable: Observable<any>): Subscription;

  subscribe(
    observable: Observable<any>,
    observer: PartialObserver<any>
  ): Subscription;

  subscribe(
    observable: Observable<any>,
    next?: (value: any) => void,
    error?: (error: any) => void,
    complete?: () => void
  ): Subscription;

  subscribe(observable: Observable<any>, ...subscribeArgs): Subscription {
    return observable
      .takeUntil(this.onDestroySource)
      .subscribe(...subscribeArgs);
  }

  ngOnDestroy() {
    this.onDestroySource.next();
    this.onDestroySource.complete();
  }
}

要使用它,只需在你的angular组件中扩展它,并调用subscribe()方法,如下所示:

this.subscribe(someObservable, data => doSomething());

它还像往常一样接受错误和完整的回调,接受一个观察者对象,或者根本不接受回调。如果你在子组件中也实现了这个方法,记得调用super.ngOnDestroy()。

在这里可以找到Ben Lesh的另一篇参考文章:RxJS: Don’t Unsubscribe。

你不需要有一堆订阅和取消手动订阅。使用Subject和takeUntil组合来像boss一样处理订阅:

import { Subject } from "rxjs"
import { takeUntil } from "rxjs/operators"

@Component({
  moduleId: __moduleName,
  selector: "my-view",
  templateUrl: "../views/view-route.view.html"
})
export class ViewRouteComponent implements OnInit, OnDestroy {
  componentDestroyed$: Subject<boolean> = new Subject()

  constructor(private titleService: TitleService) {}

  ngOnInit() {
    this.titleService.emitter1$
      .pipe(takeUntil(this.componentDestroyed$))
      .subscribe((data: any) => { /* ... do something 1 */ })

    this.titleService.emitter2$
      .pipe(takeUntil(this.componentDestroyed$))
      .subscribe((data: any) => { /* ... do something 2 */ })

    //...

    this.titleService.emitterN$
      .pipe(takeUntil(this.componentDestroyed$))
      .subscribe((data: any) => { /* ... do something N */ })
  }

  ngOnDestroy() {
    this.componentDestroyed$.next(true)
    this.componentDestroyed$.complete()
  }
}

@acumartini在评论中提出了另一种方法,使用takeWhile而不是takeUntil。你可能更喜欢它,但要注意,这样你的Observable的执行就不会在组件的ngDestroy上被取消(例如,当你进行耗时的计算或等待来自服务器的数据时)。方法没有这个缺陷,它会导致立即取消请求。感谢@AlexChe在评论中的详细解释。

代码如下:

@Component({
  moduleId: __moduleName,
  selector: "my-view",
  templateUrl: "../views/view-route.view.html"
})
export class ViewRouteComponent implements OnInit, OnDestroy {
  alive: boolean = true

  constructor(private titleService: TitleService) {}

  ngOnInit() {
    this.titleService.emitter1$
      .pipe(takeWhile(() => this.alive))
      .subscribe((data: any) => { /* ... do something 1 */ })

    this.titleService.emitter2$
      .pipe(takeWhile(() => this.alive))
      .subscribe((data: any) => { /* ... do something 2 */ })

    // ...

    this.titleService.emitterN$
      .pipe(takeWhile(() => this.alive))
      .subscribe((data: any) => { /* ... do something N */ })
  }

  ngOnDestroy() {
    this.alive = false
  }
}