Caching RxJS streams into Web Storage


Frameworks such as Nest and Angular (among others) use RxJS as an async API for handling HTTP requests.

If you use any of these, you'll often need to cache some requests: for example, hitting an expensive endpoint only if the client's latest request was sent more than a minute ago, or storing data locally forever if it never really changes.

In this article, I want to introduce you to the library cached-observable, and walk through the steps to build it from scratch. I use this library in all my Angular/Nest projects.

The library allows you to cache streams, assign a maximum age (so that the cache expires after a certain amount of time), and store your data in memory, Session Storage, or Local Storage.

Cache Providers

As mentioned, we're going to support three types of storage. To keep things simple, we'll define a unified interface CacheProvider<T>, where T is the type of the value we're caching, and CachePayload<T>, the data structure of each cache entry.

We want to store the following information:

import { Observable } from 'rxjs';

export interface CachePayload<T> {
  value: Observable<T>;
  expiry: number | undefined;
  lastUpdated: number;
}

Let's explain the above:

  • value is the original observable
  • expiry is the timestamp, in milliseconds since the Unix epoch, at which the entry expires, or undefined if it never expires
  • lastUpdated is the timestamp at which the entry was stored; it's there for convenience
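To make the two timestamps concrete, here is a small dependency-free sketch. The helper name payloadTimestamps is hypothetical (it is not part of cached-observable); it mirrors how expiry is derived from an optional maxAge later in the article.

```typescript
// Hypothetical helper (not part of cached-observable): derives the
// timestamp fields of a CachePayload from an optional maxAge in ms.
interface PayloadTimestamps {
  expiry: number | undefined;
  lastUpdated: number;
}

function payloadTimestamps(
  maxAge: number | undefined,
  now: number = Date.now(),
): PayloadTimestamps {
  return {
    lastUpdated: now,
    // no maxAge: the entry never expires
    expiry: maxAge ? now + maxAge : undefined,
  };
}

// an entry stored at t = 1_000 with a one-minute lifetime
const oneMinute = payloadTimestamps(60_000, 1_000);

// an entry that should be kept forever
const forever = payloadTimestamps(undefined, 1_000);
```

Like the article's own code, this treats a maxAge of 0 as "no expiry", because 0 is falsy.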

Now, we want to define what a provider looks like. It will have three methods:

import { CachePayload } from './cache-payload.interface';

export interface CacheProvider<T> {
  get(key: string): CachePayload<T> | undefined;
  set(key: string, value: CachePayload<T>): void;
  unset(key: string): void;
}

Memory Provider

We can now write the implementations; let's start with the in-memory provider.

We can use a Map to store our data by key. NB: a global cache like the one in this example is not a great idea; it's better to let the consumer inject the cache into this function. For simplicity, we'll use a global cache here.

import { CachePayload } from './cache-payload.interface';
import { CacheProvider } from './cache-provider';

// global cache
const cache = new Map<string, CachePayload<unknown>>();

export function memoryCacheProvider<T>(): CacheProvider<T> {
  return {
    get(key: string) {
      return cache.get(key) as CachePayload<T> | undefined;
    },
    set(key: string, value: CachePayload<T>) {
      cache.set(key, value);
    },
    unset(key: string) {
      cache.delete(key);
    },
  };
}

The above is very simple: we wrap the native Map methods behind the CacheProvider interface, with some TypeScript on top.
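The NB above suggests injecting the cache rather than relying on a module-level Map. A minimal sketch of that variant follows; createMemoryProvider and the simplified Provider interface are hypothetical names, and the payload type is left generic so the example runs without RxJS.

```typescript
// Simplified, hypothetical version of CacheProvider: P is the payload type.
interface Provider<P> {
  get(key: string): P | undefined;
  set(key: string, value: P): void;
  unset(key: string): void;
}

// The consumer owns the Map and passes it in, so separate caches
// (e.g. one per test) never see each other's entries.
function createMemoryProvider<P>(cache: Map<string, P>): Provider<P> {
  return {
    get: (key) => cache.get(key),
    set: (key, value) => {
      cache.set(key, value);
    },
    unset: (key) => {
      cache.delete(key);
    },
  };
}

const first = createMemoryProvider(new Map<string, number>());
const second = createMemoryProvider(new Map<string, number>());

first.set('todos', 42);
```

Each test can then create a fresh Map, which avoids entries leaking between tests the way they would with a single global cache.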

Web Storage Providers

The following is going to be slightly more difficult. We need to store our data using Web Storage (Session and Local), which only stores strings. An Observable can't be serialized, so we need to extract its value and store that as JSON.

Setter

Let's start with the setter. We need to subscribe to the Observable, extract its first value, and then replace the Observable with that value before serializing the payload to JSON.

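set(
  key: string,
  payload: CachePayload<T>
) {
  payload.value.pipe(
    // only the first emission is persisted
    take(1),
    // skip falsy values (null and undefined, but also 0, '' and false)
    filter(Boolean)
  ).subscribe((value) => {
    // replace the Observable with the plain value before serializing
    storage.setItem(
      key,
      JSON.stringify({
        ...payload,
        value,
      }),
    );
  });
}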
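One detail worth knowing about the setter: filter(Boolean) drops every falsy value, not only null and undefined. The plain-array sketch below uses the same predicate (with Array.prototype.filter standing in for the RxJS operator) to show which first emissions would never reach Web Storage.

```typescript
// The same predicate the setter passes to RxJS's filter operator.
const emissions: unknown[] = [0, '', false, null, undefined, NaN, 1, 'a', []];

// Only truthy values survive, so a first emission of 0, '' or false
// would not be written to storage.
const persisted = emissions.filter(Boolean);

// A stricter predicate that only skips null and undefined.
const persistedStrict = emissions.filter(
  (value) => value !== null && value !== undefined,
);
```

If falsy results such as 0 or false need to be cached too, a predicate like the stricter one is a possible replacement.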

Getter

Okay, this is not too hard.

  • We get the item, if it exists, parse the JSON string back into an object, wrap the value in an Observable using the of creation operator, and return it
  • If it does not exist, we return undefined

Of course, it's wrapped in a try/catch, because JSON.parse throws on malformed input.

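get(key: string) {
  try {
    const payload = storage.getItem(key);

    // nothing stored under this key
    if (!payload) {
      return;
    }

    const parsed = JSON.parse(payload);

    // wrap the stored value back into an Observable
    return {
      ...parsed,
      value: of(parsed.value),
    };
  } catch (e) {
    // malformed JSON: treat it as a cache miss
    return;
  }
}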
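To see what the setter writes and what the getter reads back, here is a dependency-free sketch of the round trip, with the Observable left out (the value is assumed to be already extracted). It also shows a limitation of storing JSON: values such as Date objects come back as strings.

```typescript
// Shape of an entry once the Observable has been replaced by its value.
interface StoredPayload<T> {
  value: T;
  expiry: number | undefined;
  lastUpdated: number;
}

const entry: StoredPayload<{ title: string; createdAt: Date }> = {
  value: { title: 'Buy milk', createdAt: new Date(0) },
  expiry: 61_000,
  lastUpdated: 1_000,
};

// Roughly what set() passes to storage.setItem.
const serialized = JSON.stringify(entry);

// What get() parses back before wrapping `value` with `of`.
const parsed = JSON.parse(serialized);

// A never-expiring entry: JSON.stringify omits the undefined expiry key.
const foreverParsed = JSON.parse(JSON.stringify({ ...entry, expiry: undefined }));
```

Reading the missing expiry property after JSON.parse yields undefined again, so never-expiring entries survive the round trip; Dates, Maps and class instances, on the other hand, do not.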

Unset

Nothing special to see here; we simply remove the item from the storage with storage.removeItem(key).

And this is the complete snippet:

import { of, filter, take } from 'rxjs';
import { CachePayload } from './cache-payload.interface';
import { CacheProvider } from './cache-provider';

interface WebStorage {
  getItem(key: string): string | null;
  setItem(key: string, value: string): void;
  removeItem(key: string): void;
}

export function storageCacheProvider<T>(
  storage: WebStorage
): CacheProvider<T> {
  return {
    get(key: string) {
      try {
        const payload = storage.getItem(key);

        if (!payload) {
          return;
        }

        const parsed = JSON.parse(payload);

        return {
          ...parsed,
          value: of(parsed.value),
        };
      } catch (e) {
        return;
      }
    },
    set(
      key: string, 
      payload: CachePayload<T>
    ) {
      payload.value.pipe(
        take(1), 
        filter(Boolean)
      ).subscribe((value) => {
        storage.setItem(
          key,
          JSON.stringify({
            ...payload,
            value,
          }),
        );
      });
    },
    unset(key: string) {
      storage.removeItem(key);
    },
  };
}
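Because storageCacheProvider only depends on the small WebStorage interface, it can be exercised outside the browser (for example in Jest under Node) with a Map-backed stand-in. The class name MapStorage is hypothetical; it mirrors the browser's Web Storage contract of returning null for missing keys.

```typescript
// Minimal Web Storage contract; getItem returns null for missing keys.
interface WebStorage {
  getItem(key: string): string | null;
  setItem(key: string, value: string): void;
  removeItem(key: string): void;
}

// Hypothetical in-memory stand-in for sessionStorage/localStorage.
class MapStorage implements WebStorage {
  private readonly items = new Map<string, string>();

  getItem(key: string): string | null {
    // like Web Storage, return null (not undefined) for missing keys
    return this.items.has(key) ? this.items.get(key)! : null;
  }

  setItem(key: string, value: string): void {
    this.items.set(key, value);
  }

  removeItem(key: string): void {
    this.items.delete(key);
  }
}

const storage = new MapStorage();
storage.setItem('todos', JSON.stringify({ value: [1, 2], lastUpdated: 1_000 }));
```

In a test, storageCacheProvider(new MapStorage()) would then behave like the Session or Local Storage provider without needing a browser.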

Now that we're done defining the methods for Web Storage, we need to create providers for the concrete WebStorage implementations, sessionStorage and localStorage:

import { CacheProvider } from './cache-provider';
import { memoryCacheProvider } from './memory-cache-provider';
import { storageCacheProvider } from './storage-cache-provider';

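// declare `window` so this file still compiles where the DOM typings
// are missing (e.g. when it's compiled for Node)
declare const window: any;

export function sessionCacheProvider<T>(): CacheProvider<T> {
  // `window?.sessionStorage` would throw a ReferenceError where `window`
  // is not defined at all, so check with typeof first
  const storage =
    typeof window !== 'undefined' ? window.sessionStorage : undefined;

  return storage ?
    storageCacheProvider<T>(storage) :
    memoryCacheProvider<T>();
}

export function localCacheProvider<T>(): CacheProvider<T> {
  const storage =
    typeof window !== 'undefined' ? window.localStorage : undefined;

  return storage ?
    storageCacheProvider<T>(storage) :
    memoryCacheProvider<T>();
}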

The declare const window line at the top of the file avoids compile errors if this file somehow gets compiled for the Node runtime, which has no window typings; please don't use these providers in Node.
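Checking that window.localStorage exists is not always enough: in some browsers and privacy settings, reading the property or writing to it can throw (for example a SecurityError, or a QuotaExceededError on write). A defensive probe, sketched below with a hypothetical isStorageUsable helper, could decide between the storage provider and the in-memory fallback.

```typescript
interface WebStorage {
  getItem(key: string): string | null;
  setItem(key: string, value: string): void;
  removeItem(key: string): void;
}

// Hypothetical helper: returns true only if a write/remove cycle succeeds.
function isStorageUsable(getStorage: () => WebStorage | undefined): boolean {
  try {
    // accessing window.localStorage itself may throw, hence the getter
    const storage = getStorage();
    if (!storage) {
      return false;
    }
    const probeKey = '__cached_observable_probe__';
    storage.setItem(probeKey, probeKey);
    storage.removeItem(probeKey);
    return true;
  } catch {
    return false;
  }
}

// A storage whose writes always fail, e.g. because the quota is exhausted.
const brokenStorage: WebStorage = {
  getItem: () => null,
  setItem: () => {
    throw new Error('QuotaExceededError');
  },
  removeItem: () => undefined,
};
```

In the browser, the getter passed in would be something like () => window.localStorage.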

It can be pretty simple to extend it to other "providers", such as Redis.
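One caveat: the CacheProvider interface is synchronous, while clients for remote stores such as Redis typically return Promises. A hypothetical asynchronous counterpart might look like the sketch below; the Map-backed implementation only stands in for a real Redis client, whose API is not shown here.

```typescript
// Hypothetical async variant of CacheProvider for remote stores.
interface AsyncCacheProvider<P> {
  get(key: string): Promise<P | undefined>;
  set(key: string, value: P): Promise<void>;
  unset(key: string): Promise<void>;
}

// Stand-in implementation; a real one would call a Redis client instead.
function asyncMapProvider<P>(cache = new Map<string, P>()): AsyncCacheProvider<P> {
  return {
    async get(key) {
      return cache.get(key);
    },
    async set(key, value) {
      cache.set(key, value);
    },
    async unset(key) {
      cache.delete(key);
    },
  };
}

const remote = asyncMapProvider<string>();
const pending = remote.get('todos');
```

Supporting it would also mean cachedObservable waiting for the lookup, for example by converting the Promise into an Observable with RxJS's from, so it is more than a drop-in change.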

Provider Factory

The factory below will return the implementation based on which type the consumer requested:

import { CacheProviderType } from './cache-provider-type';
import { localCacheProvider } from './local-cache-provider';
import { memoryCacheProvider } from './memory-cache-provider';
import { sessionCacheProvider } from './session-cache-provider';

export function cacheProviderFactory<T = unknown>() {
  const memoryCache = memoryCacheProvider<T>();

  return {
    ofType(type: CacheProviderType) {
      switch (type) {
        case CacheProviderType.Memory:
          return memoryCache;

        case CacheProviderType.Persistent:
          return localCacheProvider<T>();

        case CacheProviderType.Session:
          return sessionCacheProvider<T>();
      }
    },
  };
}
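The factory imports CacheProviderType, which the article doesn't show. A plausible definition is sketched below as a const object plus a union type, which supports the same CacheProviderType.Memory syntax; the member names come from the code above, but the string values (and the choice of a const object over a TypeScript enum) are assumptions. The hypothetical describeProvider helper shows how a never check makes the compiler flag any new provider type a switch forgets to handle.

```typescript
// Assumed definition: only the member names appear in the article.
const CacheProviderType = {
  Memory: 'memory',
  Session: 'session',
  Persistent: 'persistent',
} as const;

// Union of the values: 'memory' | 'session' | 'persistent'
type CacheProviderType = (typeof CacheProviderType)[keyof typeof CacheProviderType];

// Hypothetical helper showing an exhaustive switch over the type.
function describeProvider(type: CacheProviderType): string {
  switch (type) {
    case CacheProviderType.Memory:
      return 'in-memory Map';
    case CacheProviderType.Session:
      return 'sessionStorage';
    case CacheProviderType.Persistent:
      return 'localStorage';
    default: {
      // compile-time error here if a new member is not handled above
      const unhandled: never = type;
      throw new Error(`Unknown cache provider type: ${String(unhandled)}`);
    }
  }
}
```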

Cached Observable

We now have everything we need to build our utility.

Let's explore the shape of our function:

function cachedObservable<T = unknown>(
  observable$: Observable<T>,
  key: string,
  maxAge?: number | undefined,
  cacheProviderType = CacheProviderType.Memory,
): Observable<T>

  • observable$ is the input stream; its cached, shared version is what gets returned to the consumer
  • key is a unique identifier for each stream; yep, it really needs to be unique
  • maxAge is optional; if we want the cache to expire after 10 seconds, we pass 10_000 (milliseconds)
  • cacheProviderType is also optional (it defaults to CacheProviderType.Memory); it's the provider type we want to use

We start our function by:

  • instantiating the provider we want to use to store and retrieve the data
  • checking whether the value exists in the cache

const cacheFactory = cacheProviderFactory<T>();
const cache = cacheFactory.ofType(cacheProviderType);
const cached = cache.get(key);

if (cached) {
  if (
    cached.expiry !== undefined 
      && isKeyExpired(cached.expiry)
  ) {
    invalidateCachedObservable(cacheProviderType, key);
  } else {
    return cached.value;
  }
}

// defined somewhere else in the file
function isKeyExpired(expirationDate: number) {
  const currentTimestamp = new Date().getTime();

  return currentTimestamp >= expirationDate;
}

As you can see above:

  • if the cached value exists, AND if it is not expired, we return it
  • otherwise, we remove the stale entry from memory/storage and continue by creating and caching a fresh stream
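The expiry check uses >=, so an entry counts as expired at the exact millisecond of its expiry timestamp. The variant below takes the current time as a parameter (a hypothetical tweak, not the article's signature) so the boundary can be checked deterministically.

```typescript
// Same comparison as isKeyExpired, with the clock injected for testing.
function isKeyExpiredAt(expirationDate: number, now: number): boolean {
  return now >= expirationDate;
}

// An entry stored at t = 1_000 with maxAge = 500 expires at t = 1_500.
const expiry = 1_000 + 500;

const checks = {
  before: isKeyExpiredAt(expiry, 1_499), // still valid
  atBoundary: isKeyExpiredAt(expiry, 1_500), // expired
  after: isKeyExpiredAt(expiry, 2_000), // expired
};
```

This boundary is also why, in the Jest test later on, the third call after 250 + 250 ms creates a fresh stream.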

Let's go on:

const value = observable$.pipe(
  shareReplay({ bufferSize: 1, refCount: true }),
);

// setting expiry
const lastUpdated = +new Date();
const expiry = 
  maxAge ? lastUpdated + maxAge : undefined;

// caching value
cache.set(key, {
  value,
  expiry,
  lastUpdated,
});

// return 
return value;

We use the shareReplay operator so that new subscribers share a single subscription to the source, instead of triggering it again, and receive its latest value.

With the memory provider, that means that if a request is in flight (so the value hasn't arrived yet) and another call is made, the request isn't executed again: the consumer receives the cached stream even though it hasn't emitted yet, and the ReplaySubject that shareReplay creates internally multicasts the resolved value to all its observers.

Afterwards, we store the CachePayload data structure and return the stream we just cached to the consumer.
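The in-flight sharing can also be illustrated without RxJS. The sketch below swaps in a different technique, memoizing the Promise per key, to show the same idea: concurrent callers receive the same pending result and the expensive work runs once. fetchOnce and loader are hypothetical names.

```typescript
// Pending or settled results, keyed like the cached streams.
const inFlight = new Map<string, Promise<number>>();
let loaderCalls = 0;

// Hypothetical expensive request (resolved immediately here for brevity).
function loader(): Promise<number> {
  loaderCalls += 1;
  return Promise.resolve(42);
}

// Returns the existing Promise if one was already started for this key.
function fetchOnce(key: string): Promise<number> {
  const existing = inFlight.get(key);
  if (existing) {
    return existing;
  }
  const request = loader();
  inFlight.set(key, request);
  return request;
}

// Two calls made back to back share one request.
const a = fetchOnce('todos');
const b = fetchOnce('todos');
```

This sketch never expires entries; in the article, expiry and invalidation take care of that, while shareReplay plays the role of the memoized Promise.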

Invalidation

Of course, we also want to offer the ability to invalidate an entry.

It's simple, but the consumer needs to specify which cache provider type the entry was stored with.

export function invalidateCachedObservable(
  cacheType: CacheProviderType,
  key: string,
) {
  const cacheFactory = cacheProviderFactory();
  const cache = cacheFactory.ofType(cacheType);

  return cache.unset(key);
}

Testing

We can use Jest to make sure our cache works as expected.

Let's create some utilities we can use to make our life easier:

const spy = jest.fn(() => 1);

const tick = (ms = 0) => {
  jest.advanceTimersByTime(ms);
};

const createStream = (key: string) => {
  const stream$ = of(1).pipe(
    mergeMap(() => {
      return of(1).pipe(tap(spy));
    }),
  );

  return lastValueFrom(
    cachedObservable(stream$, key)
  );
};

And now, on to the test:

import { lastValueFrom, mergeMap, of, tap } from 'rxjs';
// adjust the path to wherever cachedObservable is defined
import { cachedObservable } from './cached-observable';

describe('cachedObservable', () => {
  // tell Jest to use fake timers (modern fake timers also mock Date)
  jest.useFakeTimers();

  // advance virtual time by <ms> milliseconds
  const tick = (ms = 0) => {
    jest.advanceTimersByTime(ms);
  };

  it('should return the same value', () => {
    const stream = () => of(1);
    const key = '1';

    const createStream = () => {
      return cachedObservable(
        stream(),
        key
      );
    };

    // the second call is served from the cache: same Observable instance
    expect(createStream()).toBe(createStream());
  });

  it('should no longer store the value', async () => {
    const key = '4';

    // we define a spy which is going to
    // be called whenever a new stream
    // gets created;
    // if the instance is cached, it won't call it
    const spy = jest.fn(() => 1);

    // cache a stream, for only 500 ms
    const createStream = () => {
      const stream$ = of(1).pipe(
        mergeMap(() => {
          return of(1).pipe(
            tap(spy)
          );
        }),
      );

      // for simplicity we wrap it into a promise
      return lastValueFrom(
        cachedObservable(stream$, key, 500)
      );
    };

    // create the first stream and advance by 250ms
    await createStream();
    tick(250);

    // create another stream and advance by 250ms
    await createStream();
    tick(250);

    // create another stream after 500ms have passed
    await createStream();

    // we expect spy was called only twice
    expect(spy).toHaveBeenCalledTimes(2);
  });
});

Installing the library

If you're interested in using the library instead of building your own, follow these simple steps:

Installing with NPM

npm i cached-observable --save

Usage

Let's assume we're making an HTTP request, and the HTTP client returns an Observable:

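// a method of an Angular service, where `this.http` is the injected
// HttpClient and `url` points to the todos endpoint
getTodos() {
  const request$ = this.http.get(url);
  const maxAge = 60_000; // one minute

  // the URL doubles as the cache key
  return cachedObservable(request$, url, maxAge, CacheProviderType.Persistent);
}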

If we call this function n times within the first minute, we only make one HTTP request, as long as the first response has already been stored; the following calls are served with data from Local Storage!

I hope this was helpful! Ciao!

