What Does Pipe Do Anyway?
Let’s take a quick look at the most common RxJS example. This code will log out
MouseEvents
from clicking on the documuent:
import { fromEvent } from "rxjs"fromEvent(document, "click").subscribe(value => {console.log(value)})
So what happens when we add a pipe
into the mix:
fromEvent(document, "click").pipe() //what happens here?.subscribe(value => {console.log(value)})
As it turns out, our MouseEvent
s are still logged out. This is the exact same behavior
as before. But why?
Pipe
Returns an Observable
To demonstrate, the code belows shows that pipe
returns its own observable:
const observable = fromEvent()console.log(observable === observable.pipe()) //true
So What’s an Operator
?
An operator
is a function you pass into a pipe
. And pipe
returns its own observable. So let’s think about what that means:
- An
operator
has the original observable as the first argument - An
operator
returns an observable
This most basic operator
we can write looks like this:
const operator = observable => {//return the original observablereturn observable}fromEvent(document, "click").pipe(operator) //our operator only passes the observable through.subscribe(value => {console.log(value)})
Observable In, Observable Out
Since returning the original observable does nothing, let’s try returning a different observable. Pay special attention to the following:
- The
click
observable never calls subscribe! It’s simply ignored by the operator - We
subscribe
to thehi
observable
In fact:
console.log(click.pipe(() => hi) === hi) //true!
This isn’t at all what we want, but it proves “Observable in, Observable out”
Always subscribe
to the Original Observable Inside of an Operator
The previous examples were simply to prove a point: Operators receive the original Observable return an Observable.
But the purpose of operators is to subscribe
to the original Observable then change the behavior of the observer:
The simplest example I can think of involves subscribing
and logging out “hi”.
const operator = observable => {observable.subscribe(value => {console.log("hi")})return observable}
With this operator in place, our demo will log out both "hi"
and the MouseEvent
.
The Operator
’s True Purpose: Intercepting Values from Observables
Herein lies the secret sauce of operators:
- Create a new Observable inside the Operator
subscribe
to the original Observable- Pass different values to
next
const operator = observable => {const newObservable = { //1. Create a new Observablesubscribe: next => {observable.subscribe(value => { //2. Subscribe to the originalnext("hi") //3. Pass a different value to `next`})}}return newObservable}
This opens the door to do anything inside an operator!
❗️ RxJS has APIs for creating new Observables (e.g.,
new Observable
). It’s important to use the API instead of the plain object we wrote above to handle completion, errors, and many more cases.
Adding Arguments to Operators
Let’s extract the "hi"
from our previous example to be an argument in our operator:
//beforeconst operator = observable => {...next("hi")//afterconst operator = message => observable => {...next(message)
Now we can pass "hi"
as the first argument to our operator.
observable.pipe(operator("hi"))
operator(message)
creates a function
that’s passed back to pipe
which then passes in the Observable. If this is unfamiliar, it may help to
see it written out in long-form, then refactored step-by-step:
Long Version
observable.pipe(observable => {const newOperator = operator("hi")return newOperator(observable)})
Medium Version
observable.pipe(observable => {return operator("hi")(observable)})
Short Version
observable.pipe(operator("hi"))
All three versions are the same. It only depends on your exposure to these coding patterns for which version is the most comfortable to you. I’d recommend becoming familiar with the short version, because that’s what all the RxJS docs use.
You now have unlimited customization options. You can pass in values, functions, observables, or anything you want to customize how your new Observable will behave. The power is in your hands! 💪
Pipe Internals from Scratch
Let’s strip down our RxJS patterns to the bare minimum required to “push”
values to a next
function. Here’s our next
function:
const next = value => {console.log(value)}
Next, we’ll create a barebones Observable; an Object with a subscribe
method
which takes next
as a function and invokes it:
const observable = {subscribe: next => {next("hello")}}
Finally, invoke subscribe
with next
and you should see “hello” in the console:
Here Comes the Pipe
[Insert “ceci n’est pas une pipe” joke here]
The pipe
method will sit in-between the Observable
and the Observer
allowing
us to operate on what happens between the beginning and the end:
observable.pipe(/*do something awesome*/).subscribe(next)
To create a pipe
method, we need to pass the Observable
itself (AKA this
in JavaScript)
down through the pipe
so it has access to the internals:
pipe(operator){operator(this)}
We can drop that pipe
method directly on to the Observable
:
const observable = {subscribe: next => {next("hello")},pipe(operator){return operator(this)}}
Let’s create an operator
that does nothing:
const operator = observable => {return observable}
And now drop the operator
into the pipe
:
observable.pipe(operator).subscribe(next)
You’ll see that we get the same "hello"
output as before. The Observable
is going in the function and out the function unchanged:
pipe
Can Take Multiple Operators
If you’ve seen many pipe
demos, you’ve probably seen:
.pipe(map, filter, scan)
Multiple arguments is simply an API choice for convenience by the RxJS team. If they would have
limited pipe
to one argument, you would have to chain pipe
like this:
.pipe(map).pipe(filter).pipe(scan)
To enable multiple operators in our demo, we have to treat them as an Array. We can use
the ...
array syntax to pull in every operator as an Array. Then use reduce
on that
Array to apply each operator to the observable:
pipe(...operators) {return operators.reduce((observable, operator) => {return operator(observable)}, this)}
Now we’re free to pass in as many operators as we want: