Angular's async pipe and performance - a code walk-through

August 27, 2020

The Angular async pipe is the cornerstone of making applications performant. How exactly does it work? Let’s walk through the code together to understand it.

If you have ever searched for articles about Angular applications and performance, you have read about OnPush change detection. A lot of people jump right into using it. However, I see a tendency to manipulate data through object mutation and to rely on the Default change detection strategy. When an application is built on object mutation, switching to OnPush change detection usually breaks it. There are usually two solutions: one is to revert to the Default change detection strategy, the other is to inject the ChangeDetectorRef into every component where a subscription occurs and call its markForCheck() method at the end of the callback function.

Reverting to the Default change detection strategy in these cases won’t improve performance, and injecting the ChangeDetectorRef into every component is cumbersome. However, you can avoid both with RxJS and the async pipe.

Data composition is important

I often see code that subscribes to an Observable inside a component and saves the result into a class member property. You might be familiar with this structure as well:

// ...
data: Data[] = [];

constructor(private http: HttpClient) {}

ngOnInit(): void {
  // Type the response so that the result can be assigned to data
  this.http.get<Data[]>(`some/url`).subscribe(result => {
    this.data = result;
  })
}

Instead of assigning to the data property of your component class, you could (and in my humble opinion should) use the async pipe in your template to subscribe to the observable. It also handles unsubscribing for you.

<!-- template -->
{{ data$ | async }}

// ts

data$ = this.http.get(`some/url`)

How does the async pipe work?

@Pipe({ name: "async", pure: false })
export class AsyncPipe implements OnDestroy, PipeTransform {
  // ...
}

The Angular async pipe is not pure. Whenever a pipe has internal state, the pure property of the @Pipe() decorator config should be set to false. This means that the transform() method of the pipe gets invoked on every change detection cycle. Since the async pipe usually deals with Observable or Promise inputs, the pipe itself has internal state for storing the last value. In addition, to properly handle the teardown logic and to avoid memory leaks, the Subscription, the source (_obj) and the SubscriptionStrategy are stored on the instance as well.

// ...
  private _latestValue: any = null;

  private _subscription: SubscriptionLike|Promise<any>|null = null;
  private _obj: Observable<any>|Promise<any>|EventEmitter<any>|null = null;
  private _strategy: SubscriptionStrategy = null!;

  constructor(private _ref: ChangeDetectorRef) {}

//...
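The effect of pure: false described above can be illustrated without Angular. The following is a minimal sketch, not Angular's implementation: Binding and CountingPipe are hypothetical names, and the caching rule is a simplification of how a pure pipe is skipped while its input stays the same.

```typescript
// Framework-free model; `Binding` is a hypothetical stand-in for a template binding.
interface PipeLike<I, O> {
  transform(input: I): O;
}

class CountingPipe implements PipeLike<number, number> {
  calls = 0;

  transform(input: number): number {
    this.calls++;
    return input * 2;
  }
}

class Binding<I, O> {
  private last: { input: I; output: O } | null = null;

  constructor(
    private readonly pipe: PipeLike<I, O>,
    private readonly pure: boolean
  ) {}

  // Called once per simulated change detection cycle.
  check(input: I): O {
    // A pure pipe is skipped while its input is unchanged.
    if (this.pure && this.last !== null && this.last.input === input) {
      return this.last.output;
    }
    const output = this.pipe.transform(input);
    this.last = { input, output };
    return output;
  }
}

const purePipe = new CountingPipe();
const impurePipe = new CountingPipe();
const pureBinding = new Binding(purePipe, true);
const impureBinding = new Binding(impurePipe, false);

// Three cycles with the same input value.
for (let cycle = 0; cycle < 3; cycle++) {
  pureBinding.check(21);
  impureBinding.check(21);
}

console.log(purePipe.calls, impurePipe.calls); // 1 3
```

The impure variant runs on every cycle, which is why the real transform() has to stay cheap: it only compares references and returns a cached value.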

As you can see, the ChangeDetectorRef is injected into every async pipe instance, but more on that later. First, let’s check the SubscriptionStrategy interface. The classes that implement this interface must have the following methods: createSubscription, dispose and onDestroy. The first creates the subscription, while dispose and onDestroy handle the teardown logic so that memory leaks can be avoided.

interface SubscriptionStrategy {
  createSubscription(
    async: Observable<any> | Promise<any>,
    updateLatestValue: any
  ): SubscriptionLike | Promise<any>
  dispose(subscription: SubscriptionLike | Promise<any>): void
  onDestroy(subscription: SubscriptionLike | Promise<any>): void
}

class ObservableStrategy implements SubscriptionStrategy {
  createSubscription(
    async: Observable<any>,
    updateLatestValue: any
  ): SubscriptionLike {
    return async.subscribe({
      next: updateLatestValue,
      error: (e: any) => {
        throw e
      },
    })
  }

  dispose(subscription: SubscriptionLike): void {
    subscription.unsubscribe()
  }

  onDestroy(subscription: SubscriptionLike): void {
    subscription.unsubscribe()
  }
}

class PromiseStrategy implements SubscriptionStrategy {
  createSubscription(
    async: Promise<any>,
    updateLatestValue: (v: any) => any
  ): Promise<any> {
    return async.then(updateLatestValue, e => {
      throw e
    })
  }

  dispose(subscription: Promise<any>): void {}

  onDestroy(subscription: Promise<any>): void {}
}

const _promiseStrategy = new PromiseStrategy()
const _observableStrategy = new ObservableStrategy()

// ... Pipe class declaration

The ObservableStrategy and the PromiseStrategy classes are wrappers around the logic that needs to be handled. While the dispose and onDestroy methods of the Promise strategy are empty, the Observable strategy calls .unsubscribe() in both of them. The onDestroy method never gets called in the async_pipe.ts file, however; it is the dispose method that handles unsubscribing.

@Pipe({ name: "async", pure: false })
export class AsyncPipe implements OnDestroy, PipeTransform {
  // ...

  ngOnDestroy(): void {
    if (this._subscription) {
      this._dispose()
    }
  }

  // ...

  private _dispose(): void {
    this._strategy.dispose(this._subscription!)
    this._latestValue = null
    this._subscription = null
    this._obj = null
  }

  // ...
}

As shown above, the async pipe implements the OnDestroy lifecycle hook, and if there is a subscription stored in the instance, it calls the internal _dispose() method. This method calls dispose on the internally stored _strategy, and sets everything to null. When this occurs, the JS engine’s garbage collector will deal with the rest.
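To see what the teardown achieves, here is a small framework-free sketch. MiniSubject is a hypothetical, hand-rolled stand-in for an RxJS Subject, and teardownStrategy only mirrors the shape of the observable strategy; neither is part of Angular or RxJS.

```typescript
// `MiniSubject` is a hypothetical, hand-rolled stand-in for an RxJS Subject.
interface Unsubscribable {
  unsubscribe(): void;
}

class MiniSubject<T> {
  private listeners: Array<(value: T) => void> = [];

  subscribe(next: (value: T) => void): Unsubscribable {
    this.listeners.push(next);
    return {
      unsubscribe: () => {
        this.listeners = this.listeners.filter(l => l !== next);
      },
    };
  }

  emit(value: T): void {
    this.listeners.slice().forEach(l => l(value));
  }

  get listenerCount(): number {
    return this.listeners.length;
  }
}

// Mirrors the shape of the observable strategy: create, then dispose.
const teardownStrategy = {
  createSubscription<T>(
    source: MiniSubject<T>,
    update: (value: T) => void
  ): Unsubscribable {
    return source.subscribe(update);
  },
  dispose(subscription: Unsubscribable): void {
    subscription.unsubscribe();
  },
};

const source = new MiniSubject<number>();
const received: number[] = [];

const subscription = teardownStrategy.createSubscription(source, v => {
  received.push(v);
});
source.emit(1); // delivered
const countWhileSubscribed = source.listenerCount;

teardownStrategy.dispose(subscription);
source.emit(2); // ignored: no listener is left
const countAfterDispose = source.listenerCount;

console.log(received, countWhileSubscribed, countAfterDispose); // [ 1 ] 1 0
```

Once the listener is removed, the source no longer holds a reference to the callback, so nothing keeps the destroyed pipe alive.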

// ...
  transform<T>(obj: null): null;
  transform<T>(obj: undefined): undefined;
  transform<T>(obj: Observable<T>|null|undefined): T|null;
  transform<T>(obj: Promise<T>|null|undefined): T|null;
  transform(obj: Observable<any>|Promise<any>|null|undefined): any {
    if (!this._obj) {
      if (obj) {
        this._subscribe(obj);
      }
      return this._latestValue;
    }

    if (obj !== this._obj) {
      this._dispose();
      return this.transform(obj as any);
    }

    return this._latestValue;
  }

// ...

The transform() method always returns the internally stored _latestValue, therefore an async pipe yields null until its source emits a value. The first time the method gets called with a parameter that is neither null nor undefined, a subscription occurs. The internal _subscribe method handles a couple of things. It saves the reference of the pipe’s target, then selects the proper strategy for it via the Angular internal ɵisPromise and ɵisObservable helper functions.

  private _subscribe(obj: Observable<any>|Promise<any>|EventEmitter<any>): void {
    this._obj = obj;
    this._strategy = this._selectStrategy(obj);
    this._subscription = this._strategy.createSubscription(
        obj, (value: Object) => this._updateLatestValue(obj, value));
  }

  private _selectStrategy(obj: Observable<any>|Promise<any>|EventEmitter<any>): any {
    if (ɵisPromise(obj)) {
      return _promiseStrategy;
    }

    if (ɵisObservable(obj)) {
      return _observableStrategy;
    }

    throw invalidPipeArgumentError(AsyncPipe, obj);
  }

Finally, it saves the subscription by creating it with the createSubscription method, providing the internal _updateLatestValue method as the callback. This method checks whether the internally stored source and the source that produced the value are the same reference. If they are, the _latestValue is updated, and the ChangeDetectorRef’s markForCheck() method gets called, which marks the view and its ancestors as dirty so that they are checked in the next change detection run. This is the part that makes RxJS and the async pipe work with the OnPush change detection strategy.

  private _updateLatestValue(async: any, value: Object): void {
    if (async === this._obj) {
      this._latestValue = value;
      this._ref.markForCheck();
    }
  }
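The reference check matters most for sources that cannot be cancelled, such as a Promise that resolves after the binding has moved on. Below is a minimal sketch of the guard in isolation; ValueHolder and track are hypothetical names, and a counter stands in for the ChangeDetectorRef.

```typescript
// `ValueHolder` is a hypothetical reduction of the pipe to the guard alone.
class ValueHolder {
  latestValue: unknown = null;
  markForCheckCalls = 0;
  private current: object | null = null;

  // Start tracking a source and hand back the callback it should invoke.
  track(source: object): (value: unknown) => void {
    this.current = source;
    this.latestValue = null;
    return value => this.updateLatestValue(source, value);
  }

  private updateLatestValue(source: object, value: unknown): void {
    // Same reference check as `async === this._obj` in the pipe.
    if (source === this.current) {
      this.latestValue = value;
      this.markForCheckCalls++; // stands in for this._ref.markForCheck()
    }
  }
}

const holder = new ValueHolder();
const firstSource = { id: "first" };
const secondSource = { id: "second" };

const resolveFirst = holder.track(firstSource);
const resolveSecond = holder.track(secondSource); // the binding now points elsewhere

resolveFirst("stale"); // arrives late, its source is no longer current: dropped
const afterStale = holder.latestValue;

resolveSecond("fresh"); // current source: stored and marked
console.log(afterStale, holder.latestValue, holder.markForCheckCalls); // null fresh 1
```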

That is not all, since the pipe’s target can be a new Observable instance as well. Observables are objects, so the pipe compares them by reference. Therefore, whenever you assign a new observable to the bound member property, the next transform() call detects the change and calls itself once more.

// ...
  transform(obj: Observable<any>|Promise<any>|null|undefined): any {
    if (!this._obj) {
      if (obj) {
        this._subscribe(obj);
      }
      return this._latestValue;
    }

    if (obj !== this._obj) {
      this._dispose();
      return this.transform(obj as any);
    }

    return this._latestValue;
  }

// ...

You’ll notice, when there is an existing subscription, the internally stored and target Observables are checked against each other, and if they differ by reference, the old (internally stored) Observable gets disposed, and the transform method gets called recursively to create a new subscription.
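The whole flow can be replayed with a reduced model of the pipe. This is a sketch under simplifying assumptions: MiniSubject and MiniAsyncPipe are hypothetical names, only one source type is supported, and a counter replaces the ChangeDetectorRef.

```typescript
interface Unsubscribable {
  unsubscribe(): void;
}

// Hypothetical stand-in for an RxJS Subject.
class MiniSubject<T> {
  private listeners: Array<(value: T) => void> = [];

  subscribe(next: (value: T) => void): Unsubscribable {
    this.listeners.push(next);
    return {
      unsubscribe: () => {
        this.listeners = this.listeners.filter(l => l !== next);
      },
    };
  }

  emit(value: T): void {
    this.listeners.slice().forEach(l => l(value));
  }

  get listenerCount(): number {
    return this.listeners.length;
  }
}

// Reduced model of the pipe: same branching as transform() above.
class MiniAsyncPipe<T> {
  markForCheckCalls = 0;
  private latestValue: T | null = null;
  private source: MiniSubject<T> | null = null;
  private subscription: Unsubscribable | null = null;

  transform(source: MiniSubject<T> | null): T | null {
    if (!this.source) {
      if (source) {
        this.subscribe(source);
      }
      return this.latestValue;
    }
    if (source !== this.source) {
      this.dispose();
      return this.transform(source); // runs the first branch for the new source
    }
    return this.latestValue;
  }

  private subscribe(source: MiniSubject<T>): void {
    this.source = source;
    this.subscription = source.subscribe(value => {
      if (source === this.source) {
        this.latestValue = value;
        this.markForCheckCalls++;
      }
    });
  }

  private dispose(): void {
    if (this.subscription) {
      this.subscription.unsubscribe();
    }
    this.latestValue = null;
    this.subscription = null;
    this.source = null;
  }
}

const first$ = new MiniSubject<string>();
const second$ = new MiniSubject<string>();
const pipe = new MiniAsyncPipe<string>();

const log: Array<string | null> = [];
log.push(pipe.transform(first$)); // null: subscribed, nothing emitted yet
first$.emit("a");
log.push(pipe.transform(first$)); // "a"
log.push(pipe.transform(second$)); // null: old subscription disposed, new one created
second$.emit("b");
log.push(pipe.transform(second$)); // "b"

console.log(log, first$.listenerCount, second$.listenerCount);
```

After the swap, first$ has no listeners left and second$ has exactly one, so replacing the bound observable does not leak the old subscription.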

Example in action

Let’s create a component with two timers. One timer should emit every 2 seconds, and it should use the async pipe, the other one should emit every second, but it should use object mutation. For now, let’s use the default change detection strategy.

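import { Component } from "@angular/core"
import { timer } from "rxjs"

@Component({
  selector: "app-test",
  template: `
    <div>Timer 1: {{ timer1$ | async }}</div>
    <div>Timer 2: {{ timer2 }}</div>
  `,
})
export class TestComponent {
  // Subscribed and unsubscribed by the async pipe
  timer1$ = timer(0, 2000)

  // Updated by mutating the component instance
  timer2 = 0

  constructor() {
    timer(0, 1000).subscribe(count => {
      this.timer2 = count
    })
  }
}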

When using the Default change detection strategy, you can see that timer2 gets increased by 1 every second, and timer1$ with the async pipe gets increased by 1 every two seconds. Now let’s switch to OnPush change detection, by adding changeDetection: ChangeDetectionStrategy.OnPush to the component decorator.

Now the timer2 binding gets increased by 2 every 2 seconds, and timer1$ behaves the same way as before, namely, it gets increased by 1 every 2 seconds. Why does timer2 get updated when timer1$ emits? Because the async pipe calls markForCheck(), so the whole component template gets checked, including the timer2 binding. If you comment out the {{ timer1$ | async }} part from the template, you can observe that nothing gets updated.
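The behaviour of the two timers can be reproduced with a toy model. FakeView and simulate are hypothetical names and this is not how Angular's change detector is implemented; the sketch also assumes a single change detection pass per simulated second.

```typescript
// `FakeView` is a hypothetical model, not Angular's change detector.
interface ComponentState {
  timer1Latest: number | null; // value cached by the async pipe
  timer2: number; // plain field mutated in a subscribe callback
}

class FakeView {
  renderedTimer2 = 0; // what the DOM currently shows for timer2
  private dirty = true; // a new view is always checked once

  constructor(
    private readonly state: ComponentState,
    private readonly onPush: boolean
  ) {}

  markForCheck(): void {
    this.dirty = true;
  }

  // One change detection pass over this view.
  tick(): void {
    if (this.onPush && !this.dirty) {
      return; // OnPush view that nobody marked: bindings are skipped
    }
    this.renderedTimer2 = this.state.timer2;
    this.dirty = false;
  }
}

function simulate(onPush: boolean, seconds: number): number[] {
  const state: ComponentState = { timer1Latest: null, timer2: 0 };
  const view = new FakeView(state, onPush);
  const rendered: number[] = [];

  for (let second = 0; second <= seconds; second++) {
    state.timer2 = second; // timer(0, 1000): mutation only
    if (second % 2 === 0) {
      state.timer1Latest = second / 2; // timer(0, 2000) through the async pipe
      view.markForCheck();
    }
    view.tick(); // simplified: one pass per simulated second
    rendered.push(view.renderedTimer2);
  }
  return rendered;
}

console.log(simulate(false, 4)); // [ 0, 1, 2, 3, 4 ]
console.log(simulate(true, 4)); // [ 0, 0, 2, 2, 4 ]
```

With OnPush, the mutated timer2 only reaches the screen on the seconds when the async pipe marks the view, which matches the jumps of 2 described above.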

Conclusion

Using the async pipe and understanding how it works allows us to write better-performing applications. When you use OnPush change detection, Angular can work more efficiently, because it can skip checking a component until its inputs change, an event fires in its template, or it is explicitly marked for check. In these cases, RxJS and data composition can help you make reactive and performant applications.


Written by Balázs Tápai.
I believe in quality software development. I make software to make people's lives better. A well written and well tested codebase helps you and the developers that come after you.
You can follow me on Twitter or Github.