What Does mergeMap
Do?
Let’s take the example of “click to start a timeout”. In vanilla JavaScript, you would write it like this:
document.addEventListener("click", event => {setTimeout(() => {console.log("timeout")}, 1000)})
If you understand that, you can understand mergeMap
. mergeMap
starts
with source #1 (the document clicks) then inside the callback, starts
source #2 (the timeout). The reason mergeMap
can be confusing in RxJS is
due to the APIs that wrap around those behaviors. RxJS is a pattern that
follows rules on how sources talk to each other which makes your code flexible and expressive
at the cost of mental overhead of learning the patterns.
Refactoring to Functions
Let’s explore mergeMap
by refactoring the above example into an
RxJS API. First, wrap source #1 (the document clicks) in a function that
receives a callback:
const source1 = callback => {document.addEventListener("click", callback)}
Now invoke source1 with the exact same callback as our first example:
source1(event => {setTimeout(() => {console.log("timeout")}, 1000)})
This has the same behavior as the code above. The only change is that
we’ve wrapped our addEventListener
in a function. So let’s do the same by
wrapping our setTimeout
in a function:
const source2 = callback => {setTimeout(callback, 1000)}
With both source1
and source2
defined as functions, we can express our example
like so:
source1(event => {source2(() => {console.log("timeout")})})
Making a mergeMap
Operator
Time for some copy/paste! Grab the entire previous block of code
and drop it inside a function called mergeMap
:
const mergeMap = /* But what goes here? */ => {source1(event => {source2(() => {console.log("timeout")})})}
Looking at the above example, you’ll see that we need to make 2 things into
arguments of our mergeMap
function:
source1
source2
So let’s add them in like so:
const mergeMap = (source1, source2) => {source1(event => {source2(() => {console.log("timeout") //AH, I'M TRAPPED!})})}
You’ll notice there’s one more thing trapped inside our function: the callback! We definitely want to be able to customize the callback, so let’s pull that out too:
const mergeMap = (source1, source2, callback) => {source1(event => {source2(callback)})}
Using our new mergeMap
function would look like this:
mergeMap(source1, source2, () => {console.log("timeout")})
This is NOT the RxJS API you might be familiar with, but we’re almost there.
Customizing the Second Source
Looking back at our implementation so far, you’ll see that we left a callback implementation (the one that sets up source #2) locked inside our function.
const mergeMap = (source1, source2, callback) => {source1(event => {source2(callback) //What about this behavior?})}
Let’s extract that as well:
const mergeMap = (source1, customize, callback) => {source1(event => {const custom = customize(event)custom(callback)})}
And the implementation:
mergeMap(source1,event => source2,() => {console.log("timeout")})
Now we have all the pieces in place. mergeMap
is expressed in a way that
we can pass in source1
, customize how source2
is made, and then pass in
the callback
to source2
.
Refactoring to an RxJS API
RxJS uses pipe
and subscribe
instead of invoking functions like we did above.
The example above written in RxJS would look something like this:
source1.pipe(mergeMap(event => source2)).subscribe(()=>{console.log("timeout")})
We can definitely add in this API to our current setup. So keep that codeblock above in mind as we do the following refactors.
Creating a New Source
In RxJS, Operators ALWAYS return a new source. In our simple implementation, a “source” can be defined as a function with a callback argument. This refactor is pretty straight-forward. We move the callback from the last argument to instead be the first argument of a returned function. That’s a lot of words to say that you’re doing this:
//beforeconst mergeMap = (source1, customize, callback) => {//afterconst mergeMap = (source1, customize) => callback => {
That syntax might be unfamiliar, so let’s soften the impact and name our newSource
:
const mergeMap = (source1, customize) => {const newSource = callback => {source1(event => {const custom = customize(event)custom(callback)})}return newSource}
Using mergeMap
now looks like this:
const newSource = mergeMap(source1, source2)newSource(() => {console.log("timeout")})
Adding subscribe
Now we can tack on a .subscribe
by having it reference itself. Remember, in
our demo, calling the source function is the same as calling subscribe
. So
to make it match RxJS’s API, we’ll do something a little silly:
newSource.subscribe = newSource
Yes, we’re making a property on a function reference the function itself. Kinda dumb, but the goal is to match an API, not be smart:
const newSource = mergeMap(source1, source2)newSource.subscribe = newSourcenewSource.subscribe(() => {console.log("timeout")})
Adding a .pipe
Function
RxJS uses a fluent API, so our sources need the .pipe
function attached to
them. We’ll first define pipe
:
const pipe = function(fn) {return fn(this)}
Then we can attach our pipe to any/all of the sources:
source1.pipe = pipe
With .pipe
attached to a source, pipe
is expecting a fn
that
will receive the source, so we have to re-arrange the arguments of mergeMap
to support that:
//beforeconst mergeMap = (source1, customize) => {//afterconst mergeMap = customize => source1 => {
Again, if the curry syntax is unfamiliar, we can write it out long-hand. Our fn
will have to wrap all the newSource
behavior from before:
const mergeMap = customize => {const fn = source1 => {const newSource = callback => {source1(event => {const custom = customize(event)custom(callback)})}return newSource}return fn}
Once you’ve become more accustomed to currying arguments, the following is the exact same code as above. Whether it’s “more readable” is in the eye of the beholder. It’s definitely much shorter:
const mergeMap = customize => source1 => callback =>source1(event => {const custom = customize(event)custom(callback)})
With mergeMap
refactored, we’ve matched the API of RxJS:
const newSource = source1.pipe(mergeMap(event => source2))newSource.subscribe = newSourcenewSource.subscribe(() => {console.log("timeout")})
Functions as Building Blocks
It’s fun to see what we can write with “just functions”, so let’s remove all the RxJS properties we added and only use our functions.
Let’s compare the original:
document.addEventListener("click", event => {setTimeout(() => {console.log("timeout")}, 1000)})
To our new batch of functions:
const source1 = callback => {document.addEventListener("click", callback)}const source2 = callback => {setTimeout(callback, 1000)}const mergeMap = customize => source1 => callback =>source1(event => {const custom = customize(event)custom(callback)})const mergeSource2 = mergeMap(event => source2)const newSource = mergeSource2(source1)newSource(() => {console.log("timeout")})
We took the time to separate each piece of the original into reusable functions. This approach enables us to use functions as building blocks of functionality and build even more features around them. RxJS is a library that defines patterns around these functions so they can all interoperate, but that doesn’t mean you’re stuck doing things the RxJS-way. All the concepts in RxJS can be expressed in functions that you can dig into yourself.
Endnotes:
I know RxJS has more features than my trivial explanation and example including unsubscribe/complete, errors, scheduling, etc. The RxJS contributors have done incredible, inspirational work on the project. I also think stripping away all the features and approaching them from the ground up is great teaching tool.