Giancarlo Buomprisco

Giancarlo Buomprisco

·9 min read

RxJS Patterns: Efficiency and Performance

A rundown of all RxJS operators and techniques you can leverage to avoid needless computation and make your code snappier and faster

This post was originally published, by myself, on the Bit blog

RxJS is a library that helps make it easier to compose asynchronous or callback-based code, as described on its home page.

But did you know that RxJS could also help to make your applications **more efficient and performant **thanks to its powerful operators?

In this article, I want to share the most useful operators and techniques that help avoid unnecessary and repeated computations that in turn will make your app faster and more efficient, and all we need is some RxJS magic.

Notice: the examples below are simplified and may not be complete snippets.

Filtering

Filtering operators allow us to filter events from the stream that we want to disregard and avoid sending them to the observable’s subscribers. If we filter the events soon enough in the pipeline, we avoid passing them down to other operators and to the subscription callback.

Of course, this is especially important if the pipeline is doing heavy-computations or HTTP requests.

These operators are primarily used for logic rather than performance reasons, although they’re still useful to avoid computing needless tasks.

The questions you can ask yourself when writing a stream is: should I pass all items through or can I skip them at some point in the pipeline?

filter

The simplest operator to avoid needless computation (among other things) is filter.

If you are already familiar with the method Array.prototype.filter, then you’ll probably know its usage already: we pass a predicate as a parameter to the operator, and if it returns true for the event being streamed, the event will be passed through the pipeline, otherwise, it will be discarded.

const numbers$ = of(1, 2, 3, 4, 5);
const predicate = (n) => n <= 2;

numbers$
    .pipe(
        filter(predicate)
    )
    .subscribe(console.log);
// will log 1,2

distinctUntilChanged

Another type of filtering operator is distinctUntilChanged.

This operator will compare the current value with the previous value of the source Observable, and if these are different, it will then pass the item through. In short, it works just like filter, but will compare the previous and the current value.

A very common scenario that we can use in order to explain how this operator works is receiving inputs from a text-input and skip all the events whose value is unchanged from the previous.

const textChanges$ = fromEvent(textElement, 'input');

textChanges$
    .pipe(
        filter(Boolean),
        distinctUntilChanged()
    )
    .subscribe(console.log);

But this only works for primitive values. If you want to pass a more powerful equality predicate, then you can supply a function and compare manually the previous and current values.

const stream$ = /* some Rx stream with objects */
const isEqual = require('lodash/isEqual');

stream$
    .pipe(
        distinctUntilChanged(isEqual)
    )
    .subscribe(/**/);

Debouncing and Throttling

Debouncing and throttling are techniques used to batch events emitted within a time window in a single emission.

These two techniques are sometimes used and referred to interchangeably even though they achieve similar things in different ways.

throttleTime

The operators throttleTime is used to only emit the first item received within the time window specified in milliseconds, then wait again for the entire amount of the time window before a subsequent event can be emitted.

const textChanges$ = fromEvent(textElement, 'input');

textChanges$
    .pipe(
        filter(Boolean),
        distinctUntilChanged(),
        throttleTime(1000)
    )
    .subscribe(console.log);

Let’s explain this with a simple visual representation:

time in ms : 0---500---1000---1500---2000
events     : _a_____b__________c__d___e__

Which events emitted? a and c! The first event a was collected within the time frame 0 and 1000, the second event b was skipped as it got emitted within the window. Then c emitted, and d and e were filtered.

debounceTime

Contrary to throttleTime, the operator debounceTime is used to only emit the latest item received within the time window specified in milliseconds. Just like throttling, debouncing will wait for the time windows before a new event can be emitted.

const textChanges$ = fromEvent(textElement, 'input');

textChanges$
    .pipe(
        filter(Boolean),
        distinctUntilChanged(),
        debounceTime(1000)
    )
    .subscribe(console.log);

Let’s repeat the same representation used with throttleTime to understand the difference:

time in ms : 0---500---1000---1500---2000
events     : _a_____b__________c__d___e__

In this scenario, only b and e emitted.

How do throttling and debouncing help?

These operators are used to delay and batch the execution of repeated events within a time frame.

They help in various situations where we want to avoid useless execution of commands or expensive operations like HTTP requests.

Imagine user-input changes that trigger requests to a server for each change: if we didn’t debounce it, we would not only spam our service but also degrade the UX for our users. In my experience, debouncing every 250 to 500 ms is the sweet spot to ensure smooth user experience.

Canceling

Canceling subscriptions is an important and too often neglected task that I see very often when reviewing PRs.

Canceling is not only important to reduce useless computations and avoid memory leaks, but also, more importantly, to prevent possible bugs in our applications.

Unsubscribing

The easiest, imperative way to cancel subscriptions is to simply call the unsubscribe method that every subscription object should have implemented.

const inputs$ = fromEvent(element, 'input');
const subscription = inputs.subscribe(/*some work*/);

subscription.unsubscribe();

While this is a totally effective and working example for unsubscribing streams, it is usually not considered a best-practice. In fact, Rx provides powerful operators that can help us achieve the same, but in a more declarative and reactive way.

takeUntil

My favorite way of unsubscribing from streams is takeUntil. This operator will allow you to unsubscribe a stream when the stream passed as input to the operator emits an event. Oh, that seems too complicated, but it’s actually not.

Let’s see an example:

  • we have two observables that emit with an interval of respectively 1000 ms (1 second) and 100 ms

  • the stream that emits every 100ms will unsubscribe when the other stream emits, which will happen every 1 second

// emit every 1 second
const slow$ = interval(1000);

// emit every 100 ms
const fast$ = interval(100).pipe(
    takeUntil(slow$)
);

fast$.subscribe({
    next(n) {
        console.log(n);
    },
    complete() {
        console.log('I am unsubscribed!');
    }
});

slow$.subscribe();

Which will produce the following output:

0
1
2
3
4
5
6
7
8
I am unsubscribed!

takeWhile

This operator is very useful to unsubscribe streams based on their own value. One of the ways I needed to use this operator is to stop certain timers once they reach a certain number of iterations. For instance, a countdown timer.

In the following example, I want to stop a timer once it iterates for 5 times. —

  • the takeWhile operator accepts a predicate function whose parameter is the current value of the stream

  • if the predicate is truthy, it will keep emitting values; if it’s falsy, then it will unsubscribe the stream

const stream$ = interval(1000).pipe(
    takeWhile(n => n < 5)
);

stream$.subscribe({
    next(n) {
        console.log(n)
    },
    complete() {
        console.log('I am unsubscribed!')
    }
});

Which will produce the following output:

0
1
2
3
4
I am unsubscribed!

switchMap

The operator switchMap is commonly used for flattening a stream of observables.

What you may know, is that it has a peculiar behavior: on each emission, instead of maintaining more than one inner observables, it will complete the previous inner observable and then emit the new one.

That is, if we have an in-flight HTTP request, it will be canceled when another emission takes place. Of course, depending on the observable type you use, you’d have different teardown effects.

In the example below, I created a simple snippet that will issue requests when the user inputs values in a form. The request will query Github’s API for repositories and render them on screen.

At the end of the article, we will revisit this example by adding a powerful caching mechanism, and the link to the code.

Batching

In situations where you repeatedly perform a very expensive operation within a very small time frame, such as re-rendering a DOM tree on updates from a stream, batching can help collect updates and render them at once.

The first time I employed this technique was when working with Angular.js: on every update from the back-end, the digest cycle was called so many times it was clocking up the application.

That’s when I thought: why not batch the updates in an array, and then update only every 1 or 2 seconds? And in order to do this, we can use the buffer or bufferTime operators (or, the other operators in the buffering family).

bufferTime

The operator bufferTime is a shortcut for the operator buffer that accepts an amount of time in milliseconds and will batch the stream every n milliseconds in an array.

For example, in the following example we mock up a stream that emits every 500 milliseconds. The buffer will be set at 2 seconds. That means, we collect 4 updates within a 2-second window.

this.stream$ = interval(500);

this.data$ = this.stream$.pipe(
    bufferTime(2000),
    filter((items) => items.length > 0),
    scan((acc, items) => {
        return [...acc, ...items];
    }, [])
);

this.data$.subscribe((items) => {
    /* expensive operation here */

    console.log('re-render!');
});

Let’s summarise the above:

  • we have a stream that emits events with data (in the example, it’s simply an interval with numbers)

  • we batch the events every 2 seconds with bufferTime(2000)

  • we filter all the events that are empty; in our example makes little sense as we always have values, but this is something you may want to do in a real application as sometimes you’ll be receiving data that doesn’t need to be re-rendered

  • we use the scan operator that works just like Array.prototype.reduce. We collect events and group them all in an array — in case we are working with a large list we want to re-render

There are two things to notice when using this technique:

  • test performance! buffering will keep working under the hood collecting events: only use this if rendering, or another expensive operation, is a real bottleneck in your application

  • because of the above, do remember to unsubscribe the stream when not needed

Caching

Every application needs some level of caching for improving efficiency and performance. One of the most common situations where caching is important is HTTP requests, and we can leverage RxJS to make this very easily.

For example, we may not only want to return the cached response, but we may also want to stop any in-flight requests that return the same values, assuming of course that the requests are idempotent.

In the below example, we will extend our repositories search application with a cache. The cache works in this way:

  • we create a Map to store the name and the result of the request

  • when we are about to issue the request, we check if we have a cached result. If not, we go on and execute the request

  • when the request is executed, we place the observable itself in a cache, and we use the operator shareReplay(1) to store in memory the latest 1 emissions. When the observable (in the cache) is subscribed again, it will yield its result rather than issue the request again

The example above is simplified and doesn’t take into account error-handling and such. As a result, if you use it as inspiration for your code, you may want to handle more situations.

If you want to see this in action, please visit the following Stackblitz link.

Takeaways

RxJS is an incredibly powerful tool. While its learning curve might be steep at first, learning how to use it is a great skill to add to your personal developer arsenal.

While it can easily make your code more declarative and readable, it can also help us improve the efficiency and performance of our applications with just a few lines of code added to our streams.

The techniques above are an exhaustive, yet far from complete, list of techniques that use RxJS operators to make your code efficient and performant.


Learn more about
RxJsRxJs