Giancarlo Buomprisco

Giancarlo Buomprisco

·9 min read

Caching RxJS streams into Web Storage

Learn how to cache RxJS streams in your browser's storage

RxJS is a powerful Reactive Functional Programming library that helps us tame asynchronous programming with Javascript and Typescript.

Important frameworks such as Nest and Angular (among others) use RxJS as the tool to handle asynchronous communication, such as for handling HTTP requests.

As a user, it can be very common having to cache some requests (or computationally heavy calculations) in memory or in the user's browsers (using LocalStorage or SessionStorage): for example, fetching from your expensive endpoint only if the latest request sent by the client was over 1 minute ago - or store data locally forever if this never really changes.

In this article, I want to introduce you a library I wrote to handle this sort of situations named cached-observable.

I will walk through the steps to build it from scratch.

The library allows you to cache streams, assign a maximum age (so that the caching expires after a certain amount of time), and store your data in either: memory, SessionStorage, or LocalStorage.

I personally use this library in all my Rx-powered codebases.

Cache Providers

As we've said, we're going to be using three different types of "storage" types.

To make this simple, we're going to define a unified interface CacheProvider<T> with T being the type of the value we're caching, and CachePayload being the data structure of the entry into the cache.

We want to store the following information:

export interface CachePayload<T> {
  value: Observable<T>;
  expiry: number | undefined;
  lastUpdated: number;
}

Let's explain the above:

  • value is the original Observable
  • expiry is the value, in milliseconds, of the timestamp when the cache will expire
  • lastUpdated is there for convenience

Now, we want to define what a cache provider looks like. It will have three methods:

import { CachePayload } from './cache-payload.interface';

export interface CacheProvider<T> {
  get(key: string): CachePayload<T> | undefined;
  set(key: string, value: CachePayload<T>): void;
  unset(key: string): void;
}

Memory Provider

We can now write the implementations; let's start with the in-memory provider.

The in-memory provider helps us cache data in memory, until one of the following events happen:

  • in a browser, when the page is refreshed
  • in NodeJS, when the process restarts
  • when it's manually cleared by the user

To store the data locally, can use a Map data structure to store our data indexed by a unique key provided by the consumer.

NB: we're going to make a global cache, but it's not a great idea. It's best if it's injected into this function by its consumer. For simplicity's sake, we use a global cache.

import { CachePayload } from './cache-payload.interface';
import { CacheProvider } from './cache-provider';

// global cache
const cache = new Map<string, CachePayload<unknown>>();

export function memoryCacheProvider<T>(): CacheProvider<T> {
  return {
    get(key: string) {
      return cache.get(key) as CachePayload<T> | undefined;
    },
    set(
      key: string,
      value: CachePayload<T>
    ) {
      cache.set(key, value);
    },
    unset(key: string) {
      cache.delete(key);
    },
  };
}

The above is very simple: we implement the native Map methods using the CacheProvider interface, and some Typescript on top. We do it such that our implementation adheres to the CacheProvider interfaces.

Web Storage Providers

Storing our streams' data in Web Storage is going to be slightly harder.

Problem: the data needs to serializable, and therefore, to be extracted from the Observable and stored as stringified JSON.

Caching our Observable's data into Web Storage using a Setter

Let's start from the setter function.

We need to:

  • subscribe to the Observable
  • extract its value
  • replace the Observable with the original JSON
set(
  key: string,
  payload: CachePayload<T>
) {
  payload.value.pipe(
    take(1),
    filter(Boolean)
  ).subscribe((value) => {
    storage.setItem(
      key,
      JSON.stringify({
        ...payload,
        value,
      }),
    );
  });
}

Getting data from Web Storage to an Observable's Stream

Okay, this is not too hard.

  • we get the value from Local Storage using the provided unique key
  • we parse it from a string to a JSON
  • we wrap the value in an Observable using the of creational operator
  • we return the resulting observable
  • if it does not exist, we return undefined

Of course, we wrap the code block in a try/catch because JSON.parse can go very wrong.

try {
  const payload = storage.getItem(key);

  if (!payload) {
    return;
  }

  const parsed = JSON.parse(payload);

  return {
    ...parsed,
    value: of(parsed.value),
  };
  } catch (e) {
    return;
  }
}

Removing a value from the cache

We simply remove the item from the storage. This is the complete snippet:

import { of, filter, take } from 'rxjs';
import { CachePayload } from './cache-payload.interface';
import { CacheProvider } from './cache-provider';

interface WebStorage {
  getItem(key: string): string;
  setItem(key: string, value: string): void;
  removeItem(key: string): void;
}

export function storageCacheProvider<T>(
  storage: WebStorage
): CacheProvider<T> {
  return {
    get(key: string) {
      try {
        const payload = storage.getItem(key);

        if (!payload) {
          return;
        }

        const parsed = JSON.parse(payload);

        return {
          ...parsed,
          value: of(parsed.value),
        };
      } catch (e) {
        return;
      }
    },
    set(
      key: string,
      payload: CachePayload<T>
    ) {
      payload.value.pipe(
        take(1),
        filter(Boolean)
      ).subscribe((value) => {
        storage.setItem(
          key,
          JSON.stringify({
            ...payload,
            value,
          }),
        );
      });
    },
    unset(key: string) {
      storage.removeItem(key);
    },
  };
}

Now that we're done defining the methods for Web Storage, we need to create the providers for the actual implementations of WebStorage:

import { CacheProvider } from './cache-provider';
import { memoryCacheProvider } from './memory-cache-provider';
import { storageCacheProvider } from './storage-cache-provider';

// use this as this file is being rejected by Node
declare const window: any;

export function sessionCacheProvider<T>(): CacheProvider<T> {
  const storage = window?.sessionStorage;

  return storage ?
    storageCacheProvider(storage) :
    memoryCacheProvider();
}

export function localCacheProvider<T>(): CacheProvider<T> {
  const storage = window?.localStorage;

  return storage ?
    storageCacheProvider(storage) :
    memoryCacheProvider();
}

You can find below some magic at the top to avoid errors if this somehow gets compiled by the Node runtime; please don't use these providers in Node.

It can be pretty simple to extend it to other "providers", such as Redis.

Provider Factory

The factory below will return the implementation based on which type the consumer requested:

import { CacheProviderType } from './cache-provider-type';
import { localCacheProvider } from './local-cache-provider';
import { memoryCacheProvider } from './memory-cache-provider';
import { sessionCacheProvider } from './session-cache-provider';

export function cacheProviderFactory<T = unknown>() {
  const memoryCache = memoryCacheProvider<T>();

  return {
    ofType(type: CacheProviderType) {
      switch (type) {
        case CacheProviderType.Memory:
          return memoryCache;

        case CacheProviderType.Persistent:
          return localCacheProvider<T>();

        case CacheProviderType.Session:
          return sessionCacheProvider<T>();
      }
    },
  };
}

Cached Observable

We finally have everything we need to finally build our utility.

Let's explore the shape of our functions

function cachedObservable<T = unknown>(
  observable$: Observable<T>,
  key: string,
  maxAge?: number | undefined,
  cacheProviderType = CacheProviderType.Memory,
): Observable<T>
  • observable$ - is the stream we receive in input. If cached, this is what gets returned back to the consumer
  • key - is a unique identifier for each stream; yep, it needs to be unique
  • maxAge this is an optional parameter; if we want the cache to expire after 10 seconds, we would pass 10_000
  • cacheProviderType - also optional; it's the provider type we want to use

We start our function by:

  • instantiating the provider type we want to use to store/retrieve the data from
  • checking if the value exists in the cache
const cacheFactory = cacheProviderFactory<T>();
const cache = cacheFactory.ofType(cacheProviderType);
const cached = cache.get(key);

if (cached) {
  if (
    cached.expiry !== undefined
      && isKeyExpired(cached.expiry)
  ) {
    invalidateCachedObservable(cacheProviderType, key);
  } else {
    return cached.value;
  }
}

// defined somewhere else in the file
function isKeyExpired(expirationDate: number) {
  const currentTimestamp = new Date().getTime();

  return currentTimestamp >= expirationDate;
}

As you can see above:

  • if the cached value exists, AND if it is not expired, we return it to the consumer
  • if it does not exist, we remove the value from memory and continue with storing the value

Let's go on:

const value = observable$.pipe(
  shareReplay({ bufferSize: 1, refCount: true }),
);

// setting expiry
const lastUpdated = +new Date();

const expiry =
  maxAge ? lastUpdated + maxAge : undefined;

// caching value
cache.set(key, {
  value,
  expiry,
  lastUpdated,
});

// return
return value;

We use the operator shareReplay such that new subscription will reuse the same subscription.

That means, if we have an in-flight request (hence, our cache is still missing the value) and another request gets made, instead of executing it, the consumer will receive the cached stream even if it hasn't yet emitted a value. The subject will multicast the resolved value to all its observers.

Aferwards, we set the CachePayload data structure and return the value we just cached to the consumer.

Invalidation

Of course, we also want to offer the ability to invalidate an entry.

It's simple, but we need the consumer to provide which cache provider type to use.

export function invalidateCachedObservable(
  cacheType: CacheProviderType,
  key: string,
) {
  const cacheFactory = cacheProviderFactory();
  const cache = cacheFactory.ofType(cacheType);

  return cache.unset(key);
}

Testing

We can use Jest to make sure our cache works as expected.

Let's create some utilities we can use to make our life easier:

const spy = jest.fn(() => 1);

const tick = (ms = 0) => {
  jest.advanceTimersByTime(ms);
};

const createStream = (key: string) => {
  const stream$ = of(1).pipe(
    mergeMap(() => {
      return of(1).pipe(tap(spy));
    }),
  );

  return lastValueFrom(
    cachedObservable(stream$, key)
  );
};

Now, let's write a Jest test to check our implementation works correctly:

describe('cachedObservable', () =>
  // tell Jest to use fake timers,
  jest.useFakeTimers();

  // advance virtual time by <ms> milliseconds
  const tick = (ms = 0) => {
    jest.advanceTimersByTime(ms);
  };

  it('should return the same value', () => {
    const stream = () => of(1);
    const key = '1';

    const createStream = () => {
      return cachedObservable(
        stream(),
        key
      );
    };

    expect(createStream()).toBe(createStream());
  });

  it('should no longer store the value', async () => {
    const key = '4';

    // we define a spy which is going to
    // be called whenever a new stream
    // ges created
    // if the instance is cached, it won't call it
    const spy = jest.fn(() => 1);

    // cache a stream, for only 500 ms
    const createStream = () => {
      const stream$ = of(1).pipe(
          mergeMap(() => {
              return of(1).pipe(
                tap(spy)
              );
          }),
      );

      // for simplicity we wrap it into a promise
      return lastValueFrom(
        cachedObservable(stream$, key, 500)
      );
    };

    // create the first stream and advance by 250ms
    await createStream();
    await tick(250);

    // create another stream and advance by 250ms
    await createStream();
    await tick(250);

    // create another stream after 500ms have passed
    await createStream();

    // we expect spy was called only twice
    expect(spy).toHaveBeenCalledTimes(2);

    return true;
});

Installing the library

If you're interested in using the library instead of building your own, follow these simples step:

Installing with NPM

npm i cached-observable --save

Usage

Let's assume we're making an HTTP requests, and the HTTP client returns an Observable:

function getTodos() {
  const request$ = this.http.get(url);
  const maxAge = 60_000;

  return cachedObservable(request$, url, maxAge, CacheProviderType.Persistent);
}

If we call this functions n times in the first minute, we're only making one HTTP request; the following requests will be served with data stored in the local storage!

I hope this was helpful! Ciao!


Learn more about
RxJsRxJs