How to not create your RxJS Observables
Last week a friend and colleague of mine was stuck with a problem. An NgRx effect was subscribed to a stream of WebSocket messages, but the effect didn't receive any messages, even though we could see that the server was sending them and that they reached the client.
The problem wasn't the Effect, but the WebSocket stream, which was initialized incorrectly.
The use case was to only establish the WebSocket connection for the users that had enough permissions to start a process. The WebSocket was created to report the progress of this process to the rest of the users.
The simplified version looks like this:
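A minimal sketch of that setup, assuming a hypothetical ProgressService class and a Subject (socketMessages$) standing in for the real WebSocket stream. The subscribe call plays the role of the NgRx Effect, and none of these names come from the original code base:

```typescript
import { Observable, Subject, of } from 'rxjs';

interface Progress {
  status: string;
}

// Hypothetical stand-in for the WebSocket message stream.
const socketMessages$ = new Subject<Progress>();

class ProgressService {
  // Placeholder stream that exists from the start.
  stream$: Observable<Progress> = of({ status: 'idle' });

  connect(userCanStartProcess: boolean): void {
    if (userCanStartProcess) {
      // Reassigning the property creates a new reference.
      // Anything already subscribed keeps listening to the old one.
      this.stream$ = socketMessages$;
    }
  }
}

const service = new ProgressService();
const received: Progress[] = [];

// The "Effect" subscribes while stream$ still points at of({ status: 'idle' }).
service.stream$.subscribe(progress => received.push(progress));

// Later the connection is established and the server reports progress.
service.connect(true);
socketMessages$.next({ status: 'running' });

// received only holds the placeholder value; the 'running' message is lost.
```

The subscriber holds on to the Observable it was given at subscription time, so swapping the property afterwards has no effect on it.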
This doesn't work because stream$ is reassigned to the "real" WebSocket stream after the Effect was initialized.
When the WebSocket stream emits a new progress value, the Effect doesn't receive the update because it is still listening to of({ status: 'idle' }).
So how do we solve this? With RxJS, the answer to that question is usually to wrap the Observable inside another Observable.
Simplified reproduction
To reproduce this in a simple way, I created two streams. One stream listens for "a" keydown events, the second listens for "b" keydown events. At first we're interested in the "a" events, and when the button (toggle) is clicked, we only want to receive the "b" events.
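The two key streams and the toggle could be set up roughly as follows. This is a sketch under assumptions: keydown$ and toggle$ are Subjects standing in for the DOM events so the snippet runs outside a browser (in a page they would typically come from fromEvent on the document and on the button), and keyStream is a hypothetical helper:

```typescript
import { Observable, Subject } from 'rxjs';
import { filter, map } from 'rxjs/operators';

// Stand-in for the keydown events of the page (assumption).
const keydown$ = new Subject<{ key: string }>();

// Hypothetical helper: a stream that only emits when the given key is pressed.
const keyStream = (key: string): Observable<string> =>
  keydown$.pipe(
    map(event => event.key),
    filter(pressed => pressed === key),
  );

const a$ = keyStream('a');
const b$ = keyStream('b');

// Stand-in for the click events of the toggle button (assumption).
const toggle$ = new Subject<void>();
```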
Implementation One: The Imperative Way
To stay in the imperative world we can re-create this if statement inside an outer Observable.
So, we start off by using the "a"-events and when the button is clicked, we switch the inner Observable to return the "b"-event stream.
We use an RxJS Subject to recreate the toggle.
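A minimal sketch of this approach, assuming a$ and b$ are Subjects standing in for the two keydown streams and useB$ is a hypothetical Subject that the button's click handler would push into:

```typescript
import { Subject } from 'rxjs';
import { startWith, switchMap } from 'rxjs/operators';

// Stand-ins for the "a" and "b" keydown streams (assumption).
const a$ = new Subject<string>();
const b$ = new Subject<string>();

// Hypothetical Subject mirroring the toggle state: false = "a", true = "b".
const useB$ = new Subject<boolean>();

// The outer Observable keeps a stable reference;
// the if statement now lives inside switchMap.
const stream$ = useB$.pipe(
  startWith(false),
  switchMap(useB => {
    if (useB) {
      return b$;
    }
    return a$;
  }),
);

const received: string[] = [];
stream$.subscribe(value => received.push(value));

a$.next('a'); // received, "a" is the active inner stream
b$.next('b'); // ignored
useB$.next(true); // what the click handler would do
a$.next('a'); // ignored, switchMap unsubscribed from a$
b$.next('b'); // received
```

Because the subscriber only ever sees stream$, switching the inner Observable no longer requires anyone to resubscribe.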
While this works, we can do better.
Implementation Two: Let's Think In Streams
Instead of re-creating a new stream with a Subject, why not re-use a stream?
The toggle$ stream is exactly what we need to switch between the two streams, and it's already there!
This version doesn't take the "a"-stream into account; it just creates the "b"-stream when the toggle emits a value. For our use case this was perfect, but if needed we can provide an initial value.
With an initial value
By using the startWith operator, we can start off the stream with a single "a" value.
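A small sketch of this variant, again assuming Subjects as stand-ins for the toggle clicks and the "b" keydown stream:

```typescript
import { Subject } from 'rxjs';
import { startWith, switchMap } from 'rxjs/operators';

// Stand-ins for the button clicks and the "b" keydown stream (assumption).
const toggle$ = new Subject<void>();
const b$ = new Subject<string>();

const stream$ = toggle$.pipe(
  switchMap(() => b$),
  // Emitted once, immediately on subscription, before any toggle click.
  startWith('a'),
);

const received: string[] = [];
stream$.subscribe(value => received.push(value));

toggle$.next(); // the button is clicked
b$.next('b'); // received after the initial "a"
```

Note that startWith is placed after switchMap, so the initial value is emitted as-is rather than being fed through the switch.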
With an initial stream
Or, if you're interested in the "a"-stream, you can use the concat function in combination with the takeUntil operator.
This will handle all streams sequentially.
For our code this means that it will first emit all the "a" events, and when the toggle is clicked it switches to the "b" events.
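The sequence described above can be sketched as follows, assuming Subjects as stand-ins for the two keydown streams and the toggle clicks:

```typescript
import { Subject, concat } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

// Stand-ins for the key streams and the button clicks (assumption).
const a$ = new Subject<string>();
const b$ = new Subject<string>();
const toggle$ = new Subject<void>();

// concat subscribes to b$ only after the first stream completes,
// and takeUntil completes the "a" part as soon as the toggle emits.
const stream$ = concat(a$.pipe(takeUntil(toggle$)), b$);

const received: string[] = [];
stream$.subscribe(value => received.push(value));

a$.next('a'); // received
b$.next('b'); // ignored, concat has not subscribed to b$ yet
toggle$.next(); // completes the "a" part, concat moves on to b$
a$.next('a'); // ignored
b$.next('b'); // received
```

Unlike the switchMap variants, this one switches only once: after the first toggle, further clicks have no effect on the stream.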
Conclusion
By wrapping the Observable (the inner Observable) in another Observable (the outer Observable), the reference to the Observable remains the same. Inside the outer Observable we provide a way to switch between the two Observables. This way, in our case, the NgRx Effect works as intended.
There are multiple operators that provide a solution to different use cases; try the RxJS Operator Decision Tree to find your solution.
You can play around with the code in this Blitz.