Caching RxJS streams into Web Storage
Learn how to cache RxJS streams in your browser's storage
RxJS is a powerful Reactive Functional Programming library that helps us tame asynchronous programming with JavaScript and TypeScript.
Important frameworks such as Nest and Angular (among others) use RxJS to handle asynchronous communication, for example HTTP requests.
As developers, we often need to cache requests (or computationally heavy calculations) in memory or in the user's browser (using LocalStorage or SessionStorage): for example, fetching from an expensive endpoint only if the client's latest request was sent more than 1 minute ago, or storing data locally forever if it never really changes.
In this article, I want to introduce you to a library I wrote to handle this sort of situation, named cached-observable.
I will walk through the steps to build it from scratch.
The library allows you to cache streams, assign a maximum age (so that the cache expires after a certain amount of time), and store your data in memory, SessionStorage, or LocalStorage.
I personally use this library in all my Rx-powered codebases.
Cache Providers
As we've said, we're going to use three different types of "storage".
To keep this simple, we're going to define a unified interface CacheProvider<T>, with T being the type of the value we're caching, and CachePayload being the data structure of each entry in the cache.
We want to store the following information:
import { Observable } from 'rxjs';

export interface CachePayload<T> {
  value: Observable<T>;
  expiry: number | undefined;
  lastUpdated: number;
}
Let's explain the above:
- value is the original Observable
- expiry is the value, in milliseconds, of the timestamp when the cache will expire
- lastUpdated is there for convenience
Now, we want to define what a cache provider looks like. It will have three methods:
import { CachePayload } from './cache-payload.interface';

export interface CacheProvider<T> {
  get(key: string): CachePayload<T> | undefined;
  set(key: string, value: CachePayload<T>): void;
  unset(key: string): void;
}
Memory Provider
We can now write the implementations; let's start with the in-memory provider.
The in-memory provider caches data in memory until one of the following events happens:
- in a browser, when the page is refreshed
- in NodeJS, when the process restarts
- when it's manually cleared by the user
To store the data, we can use a Map indexed by a unique key provided by the consumer.
NB: we're going to make a global cache, but it's not a great idea. It's best if it's injected into this function by its consumer. For simplicity's sake, we use a global cache.
import { CachePayload } from './cache-payload.interface';
import { CacheProvider } from './cache-provider';

// global cache
const cache = new Map<string, CachePayload<unknown>>();

export function memoryCacheProvider<T>(): CacheProvider<T> {
  return {
    get(key: string) {
      return cache.get(key) as CachePayload<T> | undefined;
    },
    set(
      key: string,
      value: CachePayload<T>
    ) {
      cache.set(key, value);
    },
    unset(key: string) {
      cache.delete(key);
    },
  };
}
The above is very simple: we wrap the native Map methods and add some TypeScript on top, so that our implementation adheres to the CacheProvider interface.
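As mentioned in the note above, injecting the cache is preferable to a global one. Here is a minimal sketch of what that could look like. The function name injectedMemoryCacheProvider and the ObservableLike type are hypothetical: the latter is only a stand-in for the RxJS Observable so the snippet runs without dependencies.

```typescript
// Stand-in for an RxJS Observable (assumption: only the shape matters here).
interface ObservableLike<T> {
  subscribe(next: (value: T) => void): unknown;
}

interface CachePayload<T> {
  value: ObservableLike<T>;
  expiry: number | undefined;
  lastUpdated: number;
}

interface CacheProvider<T> {
  get(key: string): CachePayload<T> | undefined;
  set(key: string, value: CachePayload<T>): void;
  unset(key: string): void;
}

// Hypothetical variant of memoryCacheProvider: the Map is passed in by the
// caller instead of living in module scope.
function injectedMemoryCacheProvider<T>(
  cacheMap: Map<string, CachePayload<T>>,
): CacheProvider<T> {
  return {
    get: (key) => cacheMap.get(key),
    set: (key, value) => {
      cacheMap.set(key, value);
    },
    unset: (key) => {
      cacheMap.delete(key);
    },
  };
}

// Two providers backed by different Maps do not see each other's entries.
const providerA = injectedMemoryCacheProvider(new Map<string, CachePayload<number>>());
const providerB = injectedMemoryCacheProvider(new Map<string, CachePayload<number>>());

const todosEntry: CachePayload<number> = {
  value: { subscribe: (next) => next(1) },
  expiry: undefined,
  lastUpdated: 0,
};
providerA.set('todos', todosEntry);
```

With this shape, a test can hand each provider a fresh Map and never worry about state leaking between test cases.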
Web Storage Providers
Storing our streams' data in Web Storage is going to be slightly harder.
Problem: the data needs to be serializable; therefore, it has to be extracted from the Observable and stored as stringified JSON.
Caching our Observable's data into Web Storage using a Setter
Let's start from the setter function.
We need to:
- subscribe to the Observable
- extract its value
- replace the Observable with the extracted value and store the payload as JSON
set(
  key: string,
  payload: CachePayload<T>
) {
  payload.value.pipe(
    take(1),
    filter(Boolean)
  ).subscribe((value) => {
    storage.setItem(
      key,
      JSON.stringify({
        ...payload,
        value,
      }),
    );
  });
}
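One thing worth knowing about the setter above: filter(Boolean) lets a value through only if it is truthy, so a falsy first emission (0, an empty string, false, null) never gets written to storage. Here is a quick sketch of how that predicate behaves. It uses a plain array's filter rather than the RxJS operator, purely so that it runs without dependencies; the predicate is the same.

```typescript
// Sample emissions an Observable might produce (illustrative values only).
const candidates: unknown[] = [0, '', false, null, undefined, [], {}, 'todos', 42];

// Keeps a value only when Boolean(value) is true.
// Empty arrays and empty objects are truthy, so they pass.
const persisted = candidates.filter(Boolean);
```

If your stream can legitimately emit 0 or false, you would probably want a stricter predicate that only rejects null and undefined.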
Getting data from Web Storage to an Observable's Stream
Okay, this is not too hard:
- we get the value from Web Storage using the provided unique key
- we parse the string as JSON
- we wrap the value in an Observable using the of creation operator
- we return the payload containing the resulting Observable
- if the entry does not exist, we return undefined
Of course, we wrap the code block in a try/catch because JSON.parse throws when the stored string is not valid JSON.
get(key: string) {
  try {
    const payload = storage.getItem(key);
    if (!payload) {
      return;
    }
    const parsed = JSON.parse(payload);
    return {
      ...parsed,
      value: of(parsed.value),
    };
  } catch (e) {
    return;
  }
}
Removing a value from the cache
We simply remove the item from the storage. This is the complete snippet:
import { of, filter, take } from 'rxjs';
import { CachePayload } from './cache-payload.interface';
import { CacheProvider } from './cache-provider';

interface WebStorage {
  // returns null when the key is missing
  getItem(key: string): string | null;
  setItem(key: string, value: string): void;
  removeItem(key: string): void;
}

export function storageCacheProvider<T>(
  storage: WebStorage
): CacheProvider<T> {
  return {
    get(key: string) {
      try {
        const payload = storage.getItem(key);
        if (!payload) {
          return;
        }
        const parsed = JSON.parse(payload);
        // wrap the stored value back into an Observable
        return {
          ...parsed,
          value: of(parsed.value),
        };
      } catch (e) {
        // invalid JSON: behave as a cache miss
        return;
      }
    },
    set(
      key: string,
      payload: CachePayload<T>
    ) {
      payload.value.pipe(
        take(1),
        filter(Boolean)
      ).subscribe((value) => {
        // store the emitted value in place of the Observable
        storage.setItem(
          key,
          JSON.stringify({
            ...payload,
            value,
          }),
        );
      });
    },
    unset(key: string) {
      storage.removeItem(key);
    },
  };
}
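To see what actually ends up in storage, here is a small sketch that runs outside a browser. createFakeStorage is a hypothetical Map-backed stand-in for LocalStorage or SessionStorage, and I'm assuming getItem returns null for a missing key, as the browser's Storage API does.

```typescript
// Same three methods the provider relies on.
interface WebStorage {
  getItem(key: string): string | null;
  setItem(key: string, value: string): void;
  removeItem(key: string): void;
}

// Hypothetical stand-in, handy for unit tests outside a browser.
function createFakeStorage(): WebStorage {
  const items = new Map<string, string>();
  return {
    getItem: (key) => items.get(key) ?? null,
    setItem: (key, value) => {
      items.set(key, value);
    },
    removeItem: (key) => {
      items.delete(key);
    },
  };
}

const fakeStorage = createFakeStorage();

// What the setter writes once the Observable has emitted [1, 2, 3]:
// the emitted value replaces the Observable, then everything is stringified.
const emittedValue = [1, 2, 3];
fakeStorage.setItem(
  'todos',
  JSON.stringify({ value: emittedValue, expiry: undefined, lastUpdated: 1000 }),
);

const rawEntry = fakeStorage.getItem('todos');
// JSON.stringify omits properties whose value is undefined, so expiry is
// absent from the stored string and reads back as undefined.
const parsedEntry = rawEntry === null ? undefined : JSON.parse(rawEntry);
const missingEntry = fakeStorage.getItem('unknown-key'); // null, not a string
```

Note that an entry stored without a maximum age round-trips correctly: expiry disappears from the JSON and comes back as undefined, which the expiry check treats as "never expires".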
Now that we're done defining the methods for Web Storage, we need to create the providers for the actual implementations of WebStorage:
import { CacheProvider } from './cache-provider';
import { memoryCacheProvider } from './memory-cache-provider';
import { storageCacheProvider } from './storage-cache-provider';

// declared so the file also compiles where DOM typings are missing (e.g. Node)
declare const window: any;

// typeof is safe when window is not defined; window?.x would throw a ReferenceError
const hasWindow = typeof window !== 'undefined';

export function sessionCacheProvider<T>(): CacheProvider<T> {
  const storage = hasWindow ? window.sessionStorage : undefined;
  return storage ?
    storageCacheProvider(storage) :
    memoryCacheProvider();
}

export function localCacheProvider<T>(): CacheProvider<T> {
  const storage = hasWindow ? window.localStorage : undefined;
  return storage ?
    storageCacheProvider(storage) :
    memoryCacheProvider();
}
Note the declare statement at the top of the snippet: it keeps the compiler happy when DOM typings are missing, for instance if this file somehow gets compiled for Node. These providers are meant for the browser, so please don't use them in Node.
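If you want the fallback to the in-memory provider to be robust, the storage lookup itself can be isolated in a small helper. The sketch below is one way to do it; resolveStorage and StorageScope are hypothetical names, and the global scope is passed in as a parameter so the behaviour can be checked without a browser.

```typescript
// Only the part of the global scope this sketch cares about.
interface StorageScope {
  sessionStorage?: unknown;
  localStorage?: unknown;
}

// Hypothetical helper: returns the requested storage, or undefined when it
// is unavailable, so the caller can fall back to the in-memory provider.
function resolveStorage(
  scope: StorageScope | undefined,
  kind: 'sessionStorage' | 'localStorage',
): unknown {
  try {
    // Reading the property can throw in browsers where the user has
    // blocked site data, hence the try/catch.
    return scope?.[kind] ?? undefined;
  } catch {
    return undefined;
  }
}

// In real code the scope would come from a guarded lookup such as:
//   typeof window !== 'undefined' ? window : undefined
// typeof is safe on undeclared identifiers, while window?.x is not.
const fakeBrowserScope: StorageScope = { sessionStorage: { marker: 'session' } };
const foundInBrowser = resolveStorage(fakeBrowserScope, 'sessionStorage');
const foundInNode = resolveStorage(undefined, 'sessionStorage');
const notProvided = resolveStorage(fakeBrowserScope, 'localStorage');
```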
It can be pretty simple to extend it to other "providers", such as Redis.
Provider Factory
The factory below will return the implementation based on which type the consumer requested:
import { CacheProviderType } from './cache-provider-type';
import { localCacheProvider } from './local-cache-provider';
import { memoryCacheProvider } from './memory-cache-provider';
import { sessionCacheProvider } from './session-cache-provider';

export function cacheProviderFactory<T = unknown>() {
  const memoryCache = memoryCacheProvider<T>();
  return {
    ofType(type: CacheProviderType) {
      switch (type) {
        case CacheProviderType.Memory:
          return memoryCache;
        case CacheProviderType.Persistent:
          return localCacheProvider<T>();
        case CacheProviderType.Session:
          return sessionCacheProvider<T>();
      }
    },
  };
}
Cached Observable
We finally have everything we need to build our utility.
Let's explore the shape of our function:
function cachedObservable<T = unknown>(
  observable$: Observable<T>,
  key: string,
  maxAge?: number | undefined,
  cacheProviderType = CacheProviderType.Memory,
): Observable<T>
- observable$ is the stream we receive as input. If cached, this is what gets returned to the consumer
- key is a unique identifier for each stream; yep, it needs to be unique
- maxAge is an optional parameter; if we want the cache to expire after 10 seconds, we would pass 10_000
- cacheProviderType is also optional; it's the provider type we want to use
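Since the key has to be unique per stream, it helps to derive it consistently. The sketch below shows one possible approach for HTTP requests: build the key from the URL and its query parameters. buildCacheKey is a hypothetical helper, not part of the library.

```typescript
// Hypothetical helper: derives a stable cache key from a URL and its
// query parameters. Sorting makes the key independent of parameter order.
function buildCacheKey(
  url: string,
  params: Record<string, string | number> = {},
): string {
  const query = Object.keys(params)
    .sort()
    .map((k) => `${encodeURIComponent(k)}=${encodeURIComponent(String(params[k]))}`)
    .join('&');
  return query ? `${url}?${query}` : url;
}

const keyA = buildCacheKey('/api/todos', { page: 2, user: 'ann' });
const keyB = buildCacheKey('/api/todos', { user: 'ann', page: 2 });
const keyC = buildCacheKey('/api/todos');
```

Here keyA and keyB are identical even though the parameters were given in a different order, so both calls would hit the same cache entry.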
We start our function by:
- instantiating the provider we want to use to store and retrieve the data
- checking if the value exists in the cache
const cacheFactory = cacheProviderFactory<T>();
const cache = cacheFactory.ofType(cacheProviderType);
const cached = cache.get(key);

if (cached) {
  if (
    cached.expiry !== undefined
    && isKeyExpired(cached.expiry)
  ) {
    invalidateCachedObservable(cacheProviderType, key);
  } else {
    return cached.value;
  }
}

// defined somewhere else in the file
function isKeyExpired(expirationDate: number) {
  const currentTimestamp = new Date().getTime();
  return currentTimestamp >= expirationDate;
}
As you can see above:
- if the cached value exists AND it has not expired, we return it to the consumer
- if it exists but has expired, we remove it from the cache and continue with storing a fresh value
- if it does not exist, we simply continue with storing the value
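To make the expiry arithmetic concrete, here is a small worked sketch. It mirrors the article's logic (the expiry is the time of storing plus maxAge, and an entry is expired once the current time reaches it), but passes the current time in explicitly so the results are easy to verify. computeExpiry and isExpiredAt are illustrative names, and timestamps are assumed to be epoch milliseconds.

```typescript
function computeExpiry(maxAge: number | undefined, now: number): number | undefined {
  // A truthiness check means both undefined and 0 yield "never expires".
  return maxAge ? now + maxAge : undefined;
}

function isExpiredAt(expiry: number | undefined, now: number): boolean {
  // Entries without an expiry are kept until explicitly invalidated.
  return expiry !== undefined && now >= expiry;
}

const storedAt = 1_000_000;
const expiresAt = computeExpiry(60_000, storedAt); // 1_060_000

const expiredAfter59s = isExpiredAt(expiresAt, storedAt + 59_000); // false
const expiredAfter60s = isExpiredAt(expiresAt, storedAt + 60_000); // true
const expiryForZeroMaxAge = computeExpiry(0, storedAt); // undefined
const expiredWithoutMaxAge = isExpiredAt(
  computeExpiry(undefined, storedAt),
  storedAt + 1_000_000_000,
); // false
```

Notice that a maxAge of 0 behaves like no maxAge at all, so it cannot be used to mean "expire immediately".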
Let's go on:
const value = observable$.pipe(
  shareReplay({ bufferSize: 1, refCount: true }),
);

// setting expiry
const lastUpdated = +new Date();
const expiry =
  maxAge ? lastUpdated + maxAge : undefined;

// caching value
cache.set(key, {
  value,
  expiry,
  lastUpdated,
});

// return
return value;
We use the shareReplay operator so that new subscribers share a single subscription to the source.
That means, if we have an in-flight request (hence, our cache is still missing the value) and another request gets made, instead of executing it again, the consumer will receive the cached stream even if it hasn't emitted a value yet. The underlying subject will multicast the resolved value to all its observers.
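If the multicasting part feels abstract, the following hand-rolled sketch illustrates the idea. To be clear, this is not how RxJS implements shareReplay, and it ignores completion, errors and refCount; shareLast is a hypothetical stand-in that only shows the two behaviours we rely on: the work starts once, and the last value is replayed to late subscribers.

```typescript
type Listener<T> = (value: T) => void;
// A producer receives an emit callback and starts the underlying work.
type Producer<T> = (emit: Listener<T>) => void;

function shareLast<T>(producer: Producer<T>) {
  const listeners: Listener<T>[] = [];
  let started = false;
  let last: { value: T } | undefined;

  return {
    subscribe(listener: Listener<T>): void {
      listeners.push(listener);
      if (last) {
        listener(last.value); // late subscriber: replay the buffered value
      }
      if (!started) {
        started = true; // only the first subscriber triggers the work
        producer((value) => {
          last = { value };
          listeners.forEach((l) => l(value));
        });
      }
    },
  };
}

let requestCount = 0;
let resolveRequest: Listener<string> = () => {};
const sharedRequest = shareLast<string>((emit) => {
  requestCount += 1; // simulates starting an HTTP request
  resolveRequest = emit; // lets us "complete" the request later
});

const receivedValues: string[] = [];
sharedRequest.subscribe((v) => receivedValues.push(`a:${v}`));
sharedRequest.subscribe((v) => receivedValues.push(`b:${v}`)); // joins the in-flight request
resolveRequest('todos');
sharedRequest.subscribe((v) => receivedValues.push(`c:${v}`)); // replayed from the buffer
```

Three subscribers, one simulated request: the second subscriber joins the request while it is still in flight, and the third receives the buffered result.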
Afterwards, we set the CachePayload data structure and return the value we just cached to the consumer.
Invalidation
Of course, we also want to offer the ability to invalidate an entry.
It's simple, but we need the consumer to tell us which cache provider type to use.
export function invalidateCachedObservable(
  cacheType: CacheProviderType,
  key: string,
) {
  const cacheFactory = cacheProviderFactory();
  const cache = cacheFactory.ofType(cacheType);
  return cache.unset(key);
}
Testing
We can use Jest to make sure our cache works as expected.
Let's create some utilities we can use to make our life easier:
const spy = jest.fn(() => 1);

const tick = (ms = 0) => {
  jest.advanceTimersByTime(ms);
};

const createStream = (key: string) => {
  const stream$ = of(1).pipe(
    mergeMap(() => {
      return of(1).pipe(tap(spy));
    }),
  );
  return lastValueFrom(
    cachedObservable(stream$, key)
  );
};
Now, let's write a Jest test to check our implementation works correctly:
import { lastValueFrom, mergeMap, of, tap } from 'rxjs';
// cachedObservable is imported from the module under test

describe('cachedObservable', () => {
  // tell Jest to use fake timers
  jest.useFakeTimers();

  // advance virtual time by <ms> milliseconds
  const tick = (ms = 0) => {
    jest.advanceTimersByTime(ms);
  };

  it('should return the same value', () => {
    const stream = () => of(1);
    const key = '1';
    const createStream = () => {
      return cachedObservable(
        stream(),
        key
      );
    };
    expect(createStream()).toBe(createStream());
  });

  it('should no longer store the value', async () => {
    const key = '4';
    // we define a spy which is going to
    // be called whenever a new stream
    // gets created
    // if the instance is cached, it won't call it
    const spy = jest.fn(() => 1);

    // cache a stream, for only 500 ms
    const createStream = () => {
      const stream$ = of(1).pipe(
        mergeMap(() => {
          return of(1).pipe(
            tap(spy)
          );
        }),
      );
      // for simplicity we wrap it into a promise
      return lastValueFrom(
        cachedObservable(stream$, key, 500)
      );
    };

    // create the first stream and advance by 250ms
    await createStream();
    tick(250);

    // create another stream and advance by 250ms
    await createStream();
    tick(250);

    // create another stream after 500ms have passed
    await createStream();

    // we expect spy was called only twice
    expect(spy).toHaveBeenCalledTimes(2);
  });
});
Installing the library
If you're interested in using the library instead of building your own, follow these simple steps:
Installing with NPM
npm i cached-observable --save
Usage
Let's assume we're making an HTTP request, and the HTTP client returns an Observable:
function getTodos() {
  const request$ = this.http.get(url);
  const maxAge = 60_000;
  return cachedObservable(request$, url, maxAge, CacheProviderType.Persistent);
}
If we call this function n times in the first minute, we're only making one HTTP request; the following calls will be served with data stored in local storage!
I hope this was helpful! Ciao!