Using RxJS as a Synchronous Transducer Library
JavaScript Arrays are great when you only have a few items, but when you have a large amount of data or want to do complex transformations with lots of map, filter, and reduce method calls, you’ll see a significant slowdown in performance using Array.prototype methods.
Take this simple map example:
initialArray
.map(String)
Even something as simple as a single array map could be made several times faster with transducers. As you’ll see in my performance analysis, the final numbers were incredibly surprising.
You might not even realize how slow something is until you’ve made it faster, but there are also rare situations where you’ll either be working with large arrays of data or handling a lot of transformations and need to improve performance.
Let’s do a simple performance test with 10 million items using Array.prototype.map:
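The original benchmark code isn't reproduced here. A minimal sketch of what such a test might look like, assuming Node.js and a hypothetical helper named timeMap (not from the original article), is:

```javascript
// Hypothetical helper: builds an array of `size` numbers and times
// a single Array.prototype.map call over it.
function timeMap(size) {
  const initialArray = Array.from({ length: size }, (_, index) => index);

  const start = Date.now();
  const result = initialArray.map(String);
  const elapsedMs = Date.now() - start;

  return { result, elapsedMs };
}

// The article's test used 10 million items; a smaller size is used here
// so the sketch runs quickly. Timings vary by machine and engine.
const mapRun = timeMap(100000);
console.log(`map: ${mapRun.elapsedMs}ms for ${mapRun.result.length} items`);
```

Averaging several runs, as described below, smooths out warm-up and garbage-collection noise.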
And then let’s compare that to the native for loop:
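Again, the original code isn't shown, so this is only a sketch of an equivalent for loop test. The helper name timeForLoop is an assumption, and whether the original pushed to an array or preallocated one is unknown:

```javascript
// Hypothetical helper: same input as the map test, but the conversion
// is done with a plain for loop that pushes into a single output array.
function timeForLoop(size) {
  const initialArray = Array.from({ length: size }, (_, index) => index);

  const start = Date.now();
  const result = [];
  for (let i = 0; i < initialArray.length; i++) {
    result.push(String(initialArray[i]));
  }
  const elapsedMs = Date.now() - start;

  return { result, elapsedMs };
}

// The article's test used 10 million items; a smaller size keeps this quick.
const loopRun = timeForLoop(100000);
console.log(`for: ${loopRun.elapsedMs}ms for ${loopRun.result.length} items`);
```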
I ran each test 7 times and pulled the averages. On my overclocked Intel Core i7–4770K, our array method averaged 1281ms while our for loop averaged 323ms. Incredible right? What a performance improvement! But for loops are so 10 years ago. They’re difficult to write and harder to reason about when doing complex transformations.
How do we solve this almost 300% performance difference but still keep our code readable?
Transduction is a way of processing a bunch of logic on each individual item in an array by wrapping it in a recursive set of reducer functions before outputting a value. This is different from how JavaScript Array methods function: there, the entire array is processed by a single map, filter, or reduce call before continuing to the next call, and each step of the way, it’s creating completely new arrays to house that data.
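To make the difference concrete, here is a small plain-JavaScript sketch of the transducer idea. The helper names (mapping, filtering, compose, pushReducer) are illustrative assumptions, not taken from any particular library:

```javascript
// A transducer takes a reducer and returns a new reducer.
const mapping = (transform) => (reducer) => (accumulator, item) =>
  reducer(accumulator, transform(item));

const filtering = (predicate) => (reducer) => (accumulator, item) =>
  predicate(item) ? reducer(accumulator, item) : accumulator;

// Wrap right-to-left so items flow through the steps left-to-right.
const compose = (...fns) => (input) =>
  fns.reduceRight((value, fn) => fn(value), input);

// The innermost reducer: appends each finished item to the output array.
const pushReducer = (array, item) => {
  array.push(item);
  return array;
};

const isEven = (n) => n % 2 === 0;
const double = (n) => n * 2;

// Chained array methods: each call walks a whole array and builds a new one.
const chained = [1, 2, 3, 4].filter(isEven).map(double).map(String);

// Transduced: each item runs through every step before the next item is
// read, and only the final output array is allocated.
const transducer = compose(filtering(isEven), mapping(double), mapping(String));
const transduced = [1, 2, 3, 4].reduce(transducer(pushReducer), []);

console.log(chained, transduced);
```

Both versions produce the same output; the transduced one just gets there in a single pass.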
There are plenty of libraries for handling transduction and quite a few articles on how to use it to replace native Array methods, but I want to go over an experiment I tried in one of my home projects: using RxJS instead of JavaScript’s native Array methods.
I’m a believer that it’s easier and cheaper to learn one thing well than to have to learn a ton of other things. While it’s important to use the right tool for the right job, I’m always exploring ways of using the same tool for a bunch of different jobs because it can minimize a team’s learning cost and significantly reduce a project’s implementation costs.
Like my article on backpressure in RxJS, this implementation is experimental. Normally, I write articles about things I’ve had in production, but these two are about ideas I’ve come up with to expand RxJS’s capabilities.
If you want to go in-depth on Transducers, Eric Elliott has a fantastic article to get you started:
A transducer runs a single item through a bunch of functions just like an RxJS stream. That’s especially relevant now that RxJS implements the transducer pattern with its pipe method and lettable operators, so there might be a way to make these compatible.
I’d make the argument RxJS — if it’s already part of your project — could be used for synchronous transducers in place of array methods, but there are some caveats.
First, for whatever reason, RxJS doesn’t implement the transducer interface like other transducer libraries, so it’s not compatible with them out-of-the-box. Not a big deal if all you’re using is RxJS anyway.
Second, RxJS relies on the subscribe method, which uses a callback. Array methods return arrays, not observables containing the value of an array. So what do you do? Unless you’re using BehaviorSubject, you can’t pull a synchronous value, and even using it, you can’t be sure the value you get is going to be what you expect.
On top of that, RxJS observables don’t make a distinction between asynchronous and synchronous operators. You could create a synchronous wrapper around your logic, but it will only protect you at runtime.
Since RxJS’s BehaviorSubject is close to what we want, let’s look at an implementation using it:
Yuck! This is not something I wanna do over-and-over again in my codebase. And having to remember to unsubscribe after subscribing? And remembering to mergeMap each time? You gotta be kidding me.
Also note, this doesn’t even work. Your value is the original array you passed in, not the one that went through the pipeline. You can only pick up your transduced value by passing in a handler to subscribe, so BehaviorSubject turned out to be pretty worthless.
If we used from, we could mimic the same behavior, get it working, and use less code:
This is a ton cleaner and value is correct now, but it still has the same issues as before: hard to maintain and hard to write (and gross). If it’s only one spot in your codebase, no problem, but more than that? Start thinking of better ways.
What we need is a way to remove our dependency on the subscribe method to make an observable function synchronously and return a value. In this case, we want an array.
To get a value back, we’ll need something like this on our observable:
source$.execute = () => {
  // do transducer logic
  return value
}
While this is a method that doesn’t currently exist in RxJS, it’s definitely something we could add ourselves in our own projects.
RxJS has a similar method to our made-up execute called forEach, but it’s designed similarly to Array.prototype.forEach except that instead of returning undefined, it returns a promise. The promise then method doesn’t return anything, but it does say when your observable is complete. If it errors, catch will trigger. It’s a pretty nifty setup, but not useful when we’re trying to replicate Array's synchronous operations.
Because we’re dealing with an array, there are no values over time. RxJS already handles this and completes the observable when it’s hit the end of the list. If we wrap subscribe in our execute function, we could have it return a synchronous value, similar to how async-await works, without requiring you to rewrite your entire application.
This is how I imagine we’d use execute:
And with this particular implementation, I was able to capture an error when asynchronous code ran:
Notice how our value is undefined, but we’re still able to see an error about its usage sometime after. Because it’s asynchronous, we can’t use try-catch to find the error, but we can log it to the console at runtime so you can be aware something’s wrong.
But this implementation has a flaw wherein you could have an asynchronous observable that never unsubscribes and only logs errors.
Instead, we need to unsubscribe immediately after execution, but we’ll have no way to know our observable has asynchronous operators anymore:
The only other way would be adding a take(1) operator and unsubscribing only after you’ve thrown an error, but since you can’t be sure about memory leaks this way with an observable that never fires, it’s also probably not a good implementation.
I’m thinking auto-unsubscribe is more what you would expect. This way, you’ll never have an instance of memory leaks from using execute even though you’ll miss out on some helpful “you screwed up” errors.
This implementation would’ve been a lot cleaner if I used functional composition and subscribed to the observable myself instead of extending the class, but I wanted to mimic how it might look if it was implemented in RxJS.
This API could be further improved by using from under-the-hood. Just one less thing to import when consuming createExecutableObservable.
Now that we’ve created our transducer logic, let’s run a performance test using the same comparison we did before:
It’s a cleaner API than the for loop, but a whole lot more code than our original map test, so as you’d expect, this would probably take a lot longer. I mean, the native example is just a single method call compared to this monster.
Once I ran the performance test, I was astonished to see an average of 371ms. That’s about 245% faster than our map test (1281ms) and only about 15% slower than our for loop (323ms). I’d argue that’s good enough for me, but probably not good enough if you’re working in embedded systems and have very tight processing constraints.
For the sake of readability, would you rather write a for loop or use your existing knowledge of RxJS transducers? Which would be easier to maintain, especially when it may be another 3–4 years till you see another for loop?
The Real Story
Transducers aren’t all peaches and cream. When you have a very small number of items, transducers are a lot slower than either solution. It’s beyond the scope of this article to find the trade-off point for most machines, but let’s redo those performance tests with an array of 3 items.
- Array Method: 0.33ms
- For Loop: 0.22ms
- Transducer: 3.06ms
All of a sudden, the tables have turned. 3ms for 3 items? That’s an order of magnitude longer than the other two solutions. Are you kidding me? Apparently not.
All that transducer logic comes at a cost. The setup, while fast in larger operations, is extremely slow when working with a small set of data.
This is where “use the right tool for the right job” comes into play. If you’ve got a small array, use the built-in methods. You lose the benefit of those amazing RxJS operators such as tap (just use filter), but you gain an amazing amount of speed and don’t have to jerry-rig the well-documented Array methods with your transducer patterns.
The real dilemma comes when you’re already in an RxJS transducer pipeline. When you’re switch mapping between an observable and a synchronous array execution, which do you use? I think that’s dependent on your team and the processing required. If you need the RxJS operators, use them. If you don’t, you can safely use JavaScript’s native Array methods so long as you don’t have a lot of items or a lot of complex transformations.
Transducers are generic and don’t care about the collection itself, only about the values coming in and out of each transducer (RxJS operator). That means we could theoretically use this same methodology with other collections as well such as iterators.
Let’s use the generator from my previous article on Lossless Backpressure in RxJS, but without the asynchronous backpressure handler:
Pretty cool right? The only reason this works is because RxJS goes through the entire generator’s values, creates an array, and pushes the values one-by-one through the pipeline.
More-complex handling of subjects, like the backpressure iterator function I wrote, doesn’t work like you expect and will keep you up working late into the night. If there’s one flaw in this whole design, it’s the case where you’re not sure if your pipeline is asynchronous.
While RxJS can be used as a transducer library which does result in a significantly faster execution time of large datasets, jerry-rigging the subscribe function definitely isn’t how it’s intended.
I’d love to see if there’s a better way to do this in the future, but this use case simplifies it quite a bit without having to pull down, maintain, and learn another library.
If you’ve got an interest in more topics related to RxJS, you should check out my other articles: