RxJS Subjects in Depth

·
·7 min read
Cover Image for RxJS Subjects in Depth

This post was originally published, by myself, on the Bit blog

In this article, I want to explore the topic of RxJS’s implementation of Subjects, a utility that is increasingly getting awareness and love from the community.

In the past, I have used Subjects in a variety of ways, but sometimes not fully understanding what they are internally and what are the main differences with Observables.

This is what the article will cover:

  • What is a Subject?

  • Multicasting and Unicasting

  • Other types of Subject: AsyncSubject, ReplaySubject, and BehaviorSubject

What is a Subject?

Let’s start with a simple question: what is a Subject? According to Rx’s website:

A Subject is a special type of Observable that allows values to be multicasted to many Observers.

Subjects are like EventEmitters.

If this is unclear, hang on, by the end of the article you’ll have a much clearer understanding of what a Subject is and all the way you can make use of them.

The definition stated by the Rx documentation initially struck me: in fact, I always thought of Subjects as purely a way to both pull and push values using streams. It turns out, I didn’t really know them full well even after having used them daily for about 5 years.

Subject

Subject is a class that internally extends Observable. A Subject is both an Observable and an Observer that allows values to be multicasted to many Observers, unlike Observables, where each subscriber owns an independent execution of the Observable.

That means:

  • you can subscribe to a Subject to pull values from its stream

  • you can feed values to the stream by calling the method next()

  • you can even pass a Subject as an Observer to an Observable: as mentioned above, a Subject is also an Observer, and as a such, it implements the methods next, error and complete

Let’s see a quick example:

const subject$ = new Subject();

// Pull values
subject$.subscribe(
    console.log, 
    null, 
    () => console.log('Complete!')
);

// Push values
subject$.next('Hello World');

// Use Subject as an Observer
const numbers$ = of(1, 2, 3);
numbers$.subscribe(subject$);

/* Output below */

// Hello Word
// 1
// 2
// 3
// Complete!

The Internals of a Subject

Internally, every Subject maintains a registry (as an array) of observers. This is how internally a Subject works, in a nutshell:

  • every time a new observer subscribes, the Subject will store the observer in the observers' array

  • when a new item is emitted (i.e. the method next() was called), the Subject will loop through the observers and emit the same value to each one of them (multicasting). The same will happen when it errors or completes

  • when a Subject completes, all the observers will be automatically unsubscribed

  • when a Subject is unsubscribed, instead, the subscriptions will still be alive. The observers’ array is nullified, but it doesn’t unsubscribe them. If you attempt to emit a value from an unsubscribed Subject, it will actually throw an error. The best course of action should be to complete your subjects when you need to dispose of them and their observers

  • when one of the observers is unsubscribed, it will then be removed from the registry

Multicasting

Passing a Subject as an Observer allows to convert the Observable’s behavior from unicast to multicast. Using a Subject is, indeed, the only way to make an Observable multicast, which means they will share the same execution with multiple Observers.

Wait, though: what does sharing execution actually mean? Let’s see two examples to understand the concept better.

Let’s use the observable interval as an example: we want to create an observable that emits every 1000 ms (1 second), and we want to share the execution with all the subscribers, regardless of when they subscribed.

const subject$ = new Subject<number>();

const observer = {
    next: console.log
};

const observable$ = interval(1000);

// subscribe after 1 second
setTimeout(() => {
    console.log("Subscribing first observer");    
    subject$.subscribe(observer);
}, 1000);

// subscribe after 2 seconds
setTimeout(() => {
    console.log("Subscribing second observer");
    subject$.subscribe(observer);
}, 2000);

// subscribe using subject$ as an observer
observable$.subscribe(subject$);

Let’s summarize the snippet above

  • we create a subject called subject$ and an observer which simply logs the current value after each emission

  • we create an observable that emits every 1 second (using interval)

  • we subscribe respectively after 1 and 2 seconds

  • finally, we use the subject as an observer and subscribe to the interval observable

Let’s see the output:

As you can see in the image above, even if the second observable subscribed after 1 second, the values emitted to the 2 observers are exactly the same. Indeed, they share the same source observable.

Another common example that shows the usefulness of multicasting is subscribing to an observable that executes an HTTP request, a scenario that happens often in frameworks such as Angular: by multicasting the observable, you can avoid executing multiple requests and share the execution with multiple subscribers, that will receive the same value.

AsyncSubject

I personally find AsyncSubject the least known type of Subject, simply because I never really needed it, or more likely I didn’t know I could have needed it.

In short, the AsyncSubject will:

  • emit only once it completes

  • emit only the latest value it received

const asyncSubject$ = new AsyncSubject();

asyncSubject$.next(1);
asyncSubject$.next(2);
asyncSubject$.next(3);

asyncSubject$.subscribe(console.log);

// ... nothing happening!

asyncSubject$.complete();

// 3

As you can see, even if we subscribed, nothing happened until we called the method complete.

ReplaySubject

Before introducing a ReplaySubject, let’s see a common situation where normal Subjects are used:

  • we create a subject

  • somewhere in our app, we start pushing values to the subject, but there’s no subscriber yet

  • at some point, the first observer subscribes

  • we expect the observer to emit the values (all of them? or only the last one?) that were previously pushed through the subject

  • … nothing happening! In fact, a Subject has no memory

const subject$ = new Subject();

// somewhere else in our app
subject.next(/* value */);

// somewhere in our app
subject$.subscribe(/* do something */);

// nothing happening

This is one of the situations when a ReplaySubject can help us: in fact, a Subject will record the values emitted and will push to the observer all the values emitted when a subscribed.

Let’s get back to the question above: does a ReplaySubject replay all the emissions or only the latest one?

Well, by default, the subject will replay all the items emitted, but we can provide an argument called bufferSize. This argument defines the number of emissions that the ReplaySubject should keep in its memory:

const subject$ = new ReplaySubject(1);

subject$.next(1);
subject$.next(2);
subject$.next(3);

subject$.subscribe(console.log);

// Output
// 3

There’s also a second argument that can be passed to ReplaySubject in order to define how long the old values should be stored in memory.

const subject$ = new ReplaySubject(100, 250);

setTimeout(() => subject$.next(1), 50);
setTimeout(() => subject$.next(2), 100);
setTimeout(() => subject$.next(3), 150);
setTimeout(() => subject$.next(4), 200);
setTimeout(() => subject$.next(5), 250);

setTimeout(() => {
    subject$.subscribe(v => console.log('SUBCRIPTION A', v));
}, 200);

setTimeout(() => {
    subject$.subscribe(v => console.log('SUBCRIPTION B', v));
}, 400);
  • we create a ReplaySubject whose bufferSize is 100 and windowTime 250

  • we emit 5 values every 50ms

  • we subscribe the first time after 200ms and the second time after 400ms

Let’s analyze the output:

SUBCRIPTION A 1
SUBCRIPTION A 2
SUBCRIPTION A 3
SUBCRIPTION A 4
SUBCRIPTION A 5
SUBCRIPTION B 4
SUBCRIPTION B 5

The subscription A was able to replay all the items, but the subscription B was only able to replay items 4 and 5, as they were the only ones emitted within the window time specified.

BehaviorSubject

BehaviorSubject is probably the most well-known subclass of Subject. This kind of Subject represents the “current value”.

Interestingly, the Combine framework named it CurrentValueSubject

Similarly to ReplaySubject, it will also replay the current value whenever an observer subscribes to it.

In order to use BehaviorSubject we need to provide a mandatory initial value when this gets instantiated.

const subject$ = new BehaviorSubject(0); // 0 is the initial value

subject$.next(1);

setTimeout(() => {
    subject$.subscribe(console.log);
}, 200);

// 1

Whenever a new value is emitted, the BehaviorSubject will store the value in the property value which can also be publicly accessed.

Final Words

Rx Subjects are quite powerful tools, and like any powerful tool in software engineering, they can also be easily abused. The concept of unicasting and multicasting is a striking distinction that you need to take into account when working with Rx.

Understanding how Subjects work internally can be fairly beneficial to avoid common pitfalls and bugs, but also to understand when you need them and when, instead, you don’t.


Learn more about
RxJsRxJs

Cover Image for A Reactive Enum with Typescript and RxJs
·5 min read·
RxJsRxJs

Typescript's template literals' types allow us to generate dynamic and typed code, together. In this article, I want to show how we can build a dynamic reactive enum with TS and RxJS

Cover Image for Caching RxJS streams into Web Storage
·8 min read·
RxJsRxJs

In this article I will introduce you to a simple utility that allows you to cache RxJS streams in memory or the browser's storage

Cover Image for 5 common mistakes with RxJS
·6 min read·
RxJsRxJs

A list of common mistakes while using RxJS, and explanations on what to do instead

Cover Image for RxJS Patterns: Efficiency and Performance
·10 min read·
RxJsRxJs

A rundown of all RxJS operators and techniques you can leverage to avoid needless computation and make your code snappier and faster

Cover Image for A simple Countdown with RxJS
·5 min read·
RxJsRxJs

In this tutorial, we’re going to build a very simple timer application with only a few lines of code using RxJS