Caching RxJS streams into Web Storage
Frameworks such as Nest and Angular (among others) use RxJS as an async API for handling HTTP requests.
If you're a user of any of these, it may be common that you need to cache some requests; for example, fetching from your expensive endpoint only if the latest request sent by the client was over 1 minute ago - or store data locally forever if this never really changes.
In this article, I want to introduce you to the library cached-observable, and walk through the steps to build it from scratch. I use this library in all my Angular/Nest projects.
The library allows you to cache streams, assign a maximum age (so that the caching expires after a certain amount of time), and store your data in either: memory, Session Storage, or Local Storage.
Cache Providers
As we've said, we're going to be using three different types of "storage" types. To make this simple, we're going to define a unified interface CacheProvider<T>
with T
being the type of the value we're caching, and CachePayload
being the data structure of the entry into the cache.
We want to store the following information:
export interface CachePayload<T> {
value: Observable<T>;
expiry: number | undefined;
lastUpdated: number;
}
Let's explain the above:
value
is the original observableexpiry
is the value, in milliseconds, of the timestamp when the cache will expirelastUpdated
is there for convenience
Now, we want to define what a provider looks like. It will have three methods:
import { CachePayload } from './cache-payload.interface';
export interface CacheProvider<T> {
get(key: string): CachePayload<T> | undefined;
set(key: string, value: CachePayload<T>): void;
unset(key: string): void;
}
Memory Provider
We can now write the implementations; let's start with the in-memory
provider.
We can use a Map
data structure to store our data by its key. NB: it's not a great idea to make a global cache like in this example, but it's best if it's injected into this function by its consumer. For simplicity, we'll use a global cache.
import { CachePayload } from './cache-payload.interface';
import { CacheProvider } from './cache-provider';
// global cache
const cache = new Map<string, CachePayload<unknown>>();
export function memoryCacheProvider<T>(): CacheProvider<T> {
return {
get(key: string) {
return cache.get(key) as CachePayload<T> | undefined;
},
set(key: string, value: CachePayload<T>) {
cache.set(key, value);
},
unset(key: string) {
cache.delete(key);
},
};
}
The above is very simple - we implement the native Map
methods using the CacheProvider
interface, and some Typescript on top.
Web Storage Providers
The following is going to be slightly more difficult. We need to store our data using Web Storage (Session and Local), but our data needs to serializable, and therefore, to be extracted from the Observable and stored as JSON.
Setter
Let's start from the setter function. We need to subscribe to the Observable
, extract its value, and then replace the Observable
with the original JSON.
set(
key: string,
payload: CachePayload<T>
) {
payload.value.pipe(
take(1),
filter(Boolean)
).subscribe((value) => {
storage.setItem(
key,
JSON.stringify({
...payload,
value,
}),
);
});
}
Getter
Okay, this is not too hard.
- We get the value, if it exists, parse it from a string to a JSON, we wrap the value in an
Observable
using theof
creational operator, and return it - If it does not exist, we return undefined
Of course, it's wrapped in a try/catch
because JSON.parse can go wrong.
try {
const payload = storage.getItem(key);
if (!payload) {
return;
}
const parsed = JSON.parse(payload);
return {
...parsed,
value: of(parsed.value),
};
} catch (e) {
return;
}
}
Unset
Nothing special to see; we simply remove the item from the storage:
And this is the complete snippet:
import { of, filter, take } from 'rxjs';
import { CachePayload } from './cache-payload.interface';
import { CacheProvider } from './cache-provider';
interface WebStorage {
getItem(key: string): string;
setItem(key: string, value: string): void;
removeItem(key: string): void;
}
export function storageCacheProvider<T>(
storage: WebStorage
): CacheProvider<T> {
return {
get(key: string) {
try {
const payload = storage.getItem(key);
if (!payload) {
return;
}
const parsed = JSON.parse(payload);
return {
...parsed,
value: of(parsed.value),
};
} catch (e) {
return;
}
},
set(
key: string,
payload: CachePayload<T>
) {
payload.value.pipe(
take(1),
filter(Boolean)
).subscribe((value) => {
storage.setItem(
key,
JSON.stringify({
...payload,
value,
}),
);
});
},
unset(key: string) {
storage.removeItem(key);
},
};
}
Now that we're done defining the methods for Web Storage, we need to create the providers for the actual implementations of WebStorage:
import { CacheProvider } from './cache-provider';
import { memoryCacheProvider } from './memory-cache-provider';
import { storageCacheProvider } from './storage-cache-provider';
// use this as this file is being rejected by Node
declare const window: any;
export function sessionCacheProvider<T>(): CacheProvider<T> {
const storage = window?.sessionStorage;
return storage ?
storageCacheProvider(storage) :
memoryCacheProvider();
}
export function localCacheProvider<T>(): CacheProvider<T> {
const storage = window?.localStorage;
return storage ?
storageCacheProvider(storage) :
memoryCacheProvider();
}
You can find below some magic at the top to avoid errors if this somehow gets compiled by the Node runtime; please don't use these providers in Node.
It can be pretty simple to extend it to other "providers", such as Redis.
Provider Factory
The factory below will return the implementation based on which type the consumer requested:
import { CacheProviderType } from './cache-provider-type';
import { localCacheProvider } from './local-cache-provider';
import { memoryCacheProvider } from './memory-cache-provider';
import { sessionCacheProvider } from './session-cache-provider';
export function cacheProviderFactory<T = unknown>() {
const memoryCache = memoryCacheProvider<T>();
return {
ofType(type: CacheProviderType) {
switch (type) {
case CacheProviderType.Memory:
return memoryCache;
case CacheProviderType.Persistent:
return localCacheProvider<T>();
case CacheProviderType.Session:
return sessionCacheProvider<T>();
}
},
};
}
Cached Observable
We finally have everything we need to finally build our utility.
Let's explore the shape of our functions
function cachedObservable<T = unknown>(
observable$: Observable<T>,
key: string,
maxAge?: number | undefined,
cacheProviderType = CacheProviderType.Memory,
): Observable<T>
observable$
- is the stream we receive in input. If cached, this is what gets returned back to the consumerkey
- is a unique identifier for each stream; yep, it needs to be uniquemaxAge
this is an optional parameter; if we want the cache to expire after 10 seconds, we would pass10_000
cacheProviderType
- also optional; it's the provider type we want to use
We start our function by:
- instantiating the provider type we want to use to store/retrieve the data from
- checking if the value exists in the cache
const cacheFactory = cacheProviderFactory<T>();
const cache = cacheFactory.ofType(cacheProviderType);
const cached = cache.get(key);
if (cached) {
if (
cached.expiry !== undefined
&& isKeyExpired(cached.expiry)
) {
invalidateCachedObservable(cacheProviderType, key);
} else {
return cached.value;
}
}
// defined somewhere else in the file
function isKeyExpired(expirationDate: number) {
const currentTimestamp = new Date().getTime();
return currentTimestamp >= expirationDate;
}
As you can see above:
- if the cached value exists, AND if it is not expired, we return it
- otherwise, we clean up the value from memory/storage, and continue with storing the value
Let's go on:
const value = observable$.pipe(
shareReplay({ bufferSize: 1, refCount: true }),
);
// setting expiry
const lastUpdated = +new Date();
const expiry =
maxAge ? lastUpdated + maxAge : undefined;
// caching value
cache.set(key, {
value,
expiry,
lastUpdated,
});
// return
return value;
We use the operator shareReplay
such that new subscription will reuse the same subscription.
That means, if we have an in-flight request (hence, our cache is still missing the value) and another request gets made, instead of executing it, the consumer will receive the cached stream even if it hasn't yet emitted a value. The subject will multicast the resolved value to all its observers.
Aferwards, we set the CachePayload
data structure and return the value we just cached to the consumer.
Invalidation
Of course, we also want to offer the ability to invalidate an entry.
It's simple, but we need the consumer to provide which cache provider type to use.
export function invalidateCachedObservable(
cacheType: CacheProviderType,
key: string,
) {
const cacheFactory = cacheProviderFactory();
const cache = cacheFactory.ofType(cacheType);
return cache.unset(key);
}
Testing
We can use Jest to make sure our cache works as expected.
Let's create some utilities we can use to make our life easier:
const spy = jest.fn(() => 1);
const tick = (ms = 0) => {
jest.advanceTimersByTime(ms);
};
const createStream = (key: string) => {
const stream$ = of(1).pipe(
mergeMap(() => {
return of(1).pipe(tap(spy));
}),
);
return lastValueFrom(
cachedObservable(stream$, key)
);
};
And now, on to the test:
describe('cachedObservable', () =>
// tell Jest to use fake timers,
jest.useFakeTimers();
// advance virtual time by <ms> milliseconds
const tick = (ms = 0) => {
jest.advanceTimersByTime(ms);
};
it('should return the same value', () => {
const stream = () => of(1);
const key = '1';
const createStream = () => {
return cachedObservable(
stream(),
key
);
};
expect(createStream()).toBe(createStream());
});
it('should no longer store the value', async () => {
const key = '4';
// we define a spy which is going to
// be called whenever a new stream
// ges created
// if the instance is cached, it won't call it
const spy = jest.fn(() => 1);
// cache a stream, for only 500 ms
const createStream = () => {
const stream$ = of(1).pipe(
mergeMap(() => {
return of(1).pipe(
tap(spy)
);
}),
);
// for simplicity we wrap it into a promise
return lastValueFrom(
cachedObservable(stream$, key, 500)
);
};
// create the first stream and advance by 250ms
await createStream();
await tick(250);
// create another stream and advance by 250ms
await createStream();
await tick(250);
// create another stream after 500ms have passed
await createStream();
// we expect spy was called only twice
expect(spy).toHaveBeenCalledTimes(2);
return true;
});
Installing the library
If you're interested in using the library instead of building your own, follow these simples step:
Installing with NPM
npm i cached-observable --save
Usage
Let's assume we're making an HTTP requests, and the HTTP client returns an Observable:
function getTodos() {
const request$ = this.http.get(url);
const maxAge = 60_000;
return cachedObservable(request$, url, maxAge, CacheProviderType.Persistent);
}
If we call this functions n
times in the first minute, we're only making one HTTP request; the following requests will be served with data stored in the local storage!
I hope this was helpful! Ciao!