Photo by Cris Ovalle on Unsplash

Using RxJS as a Synchronous Transducer Library

Kevin Ghadyani

JavaScript Arrays are great when you only have a few items, but when you have a large amount of data or want to do complex transformations with lots of map, filter, and reduce method calls, you’ll see a significant slowdown in performance using Array.prototype methods.

Take this simple map example:

initialArray
.map(String)

Even something as simple as a single array map could be made orders of magnitude faster with transducers. As you’ll see in my performance analysis, the final numbers were incredibly surprising.

You might not even realize how slow something is until you’ve made it faster, but there are also rare situations where you’ll either be working with large arrays of data or handling a lot of transformations and need to improve performance.

Let’s do a simple performance test with 10 million items using Array.prototype.map:

And then let’s compare that to the native for loop:

I ran each test 7 times and pulled the averages. On my overclocked Intel Core i7–4770K, our array method averaged 1281ms while our for loop averaged 323ms. Incredible right? What a performance improvement! But for loops are so 10 years ago. They’re difficult to write and harder to reason about when doing complex transformations.

How do we solve this almost 300% performance difference but still keep our code readable?

Transduction is a way to of processing a bunch of logic on each individual item in an array by wrapping it in a recursive set of reducer functions before outputting a value. This is different from how JavaScript Array methods function because each item in the array is processed with a single map, filter, or reduce method before continuing to the next, and each step of the way, it’s creating completely new arrays to house that data.

There are plenty of libraries for handling transduction and a quite a few articles on how to do it to replace native Array methods, but I want to go over an experiment I tried in one of my home projects which was using RxJS instead of JavaScript’s native Array methods.

I’m a believer that it’s easier and cheaper to learn one thing well than to have to learn a ton of other things. While its important to use the right tool for the right job, I’m always exploring ways of using the same tool for a bunch of different jobs because it can minimize a team’s learning cost and significantly reduce a project’s implementation costs.

Like my article on backpressure in RxJS, this implementation is experimental. Normally, I write articles about things I’ve had in production, but these two are about ideas I’ve come up with to expand RxJS’s capabilities.

If you want to go in-depth on Transducers, Eric Elliot has a fantastic article to get you started:

A transducer runs a single item through a bunch of functions just like an RxJS stream, and that’s especially relevant now as RxJS implements the transducer pattern with their pipe method and lettable operators there might be a way to make these compatible.

I’d make the argument RxJS — if it’s already part of your project — could be used for synchronous transducers in place of array methods, but there are some caveats.

First, for whatever reason, RxJS doesn’t implement the transducer interface like other transducer libraries, so it’s not compatible with them out-of-the-box. Not a big deal if all you’re using is RxJS anyway.

Second, RxJS relies on the subscribe method which uses a callback. Array methods return arrays, not observables containing the value of an array. So what do you do? Unless you’re using BehaviorSubject, you can’t pull a synchronous value and even using it, you can’t be sure the value you get is going to be what you expect.

On top of that, RxJS observables don’t make a distinction between asynchronous and synchronous operators. You could create a synchronous wrapper around your logic, but it will only protect you at runtime.

Since RxJS’s BehaviorSubject is close to what we want, let’s look at an implementation using it:

Yuck! This is not something I wanna do over-and-over again in my codebase. And having to remember to unsubscribe after subscribing? And remembering to mergeMap each time? You gotta be kidding me.

Also note, this doesn’t even work. Your value is the original array you passed in, not the one that went through the pipeline. You can only pickup your transduced value by passing in a handler to subscribe so BehaviorSubject turned out to be pretty worthless.

If we used from, we could mimic the same behavior, get it working, and use less code:

This is a ton cleaner and value is correct now, but it still has the same issues as before: hard to maintain and hard to write (and gross). If it’s only one spot in your codebase, no problem, but more than that? Start thinking of better ways.

What we need is a way to remove our dependency on the subscribe method to make an observable function synchronously and return a value. In this case, we want an array.

To get a value back, we’ll need something like this on our observable:

source$
.execute = () => {
// do transducer logic

return value
}

While this is a method that doesn’t currently exist in RxJS, it’s definitely something we could add ourselves in our own projects.

RxJS has a similar method to our made-up execute called forEach, but it’s designed similarly to Array.prototype.forEach except that instead of returning undefined, it returns a promise. The promise then method doesn’t return anything, but it does say when your observable is complete. If it errors, catch will trigger. It’s a pretty nifty setup, but not useful when we’re trying to replicate Array's synchronous operations.

Because we’re dealing with an array, there are no values over time. RxJS already handles this and completes the observable when it’s hit the end of the list. If we wrap subscribe in our execute function, we could have it return a synchronous value similar to how async-await works without it requiring you rewrite your entire application.

This is how I imagine we’d use execute:

And with this particular implementation, I was able to capture an error when asynchronous code ran:

Notice how our value is undefined, but we’re still able to see an error about its usage sometime after. Because it’s asynchronous, we can’t use try-catch to find the error, but we can log it to the console at runtime so you can be aware something’s wrong.

But this implementation has a flaw wherein you could have an asynchronous observable that never unsubscribes and only logs errors.

Instead, we need to unsubscribe immediately after execution, but we’ll have no way to know our observable has asynchronous operators anymore:

The only other way would be adding a take(1) operator and unsubscribing only after you’ve thrown an error, but since you can’t be sure about memory leaks this way with an observable that never fires, it’s also probably not a good implementation.

I’m thinking auto-unsubscribe is more what you would expect. This way, you’ll never have an instance of memory leaks from using execute even though you’ll miss out on some helpful “you screwed up” errors.

This implementation would’ve been a lot cleaner if I used functional composition and subscribed to the observable myself instead of extending the class, but I wanted to mimic how it might look if it was implemented in RxJS.

This API could be further improved by using from under-the-hood. Just one less thing to import when consuming createExecutableObservable.

Now that we’ve created our transducer logic, let’s run a performance test using the same comparison we did before:

It’s cleaner API over the for loop, but a whole lot more code than or original map test so as you’d expect, this would probably take a lot longer. I mean, the native example is just a single method call compared to this monster.

Once I ran the performance test, I was astonished to see an average of 371ms. That’s about 240% faster than our map test and only 13% slower than our for loop. I’d argue that’s good enough for me, but probably not good enough if you’re working in embedded systems and have very tight processing constraints.

For the sake of readability, would you rather write a for loop or use your existing knowledge of RxJS transducers? Which would be easier to maintain especially when it may be another 3–4 years till you see another for loop?

The Real Story

Transducers aren’t all peaches and cream. When you have a very small number of items, transducers are a lot slower than either solution. It’s beyond the scope of this article to find the trade-off point for most machines, but let’s redo those performance tests with an array of 3 items.

  • Array Method: 0.33ms
  • For Loop: 0.22ms
  • Transducer: 3.06ms

All of a sudden, the tables have turned. 3ms for 3 items? That’s an order of magnitude longer than the other two solutions. Are you kidding me? Apparently not.

All that transducer logic comes at a cost. The setup, while fast in larger operations, it extremely slow when working with a small set of data.

This is where “use the right tool for the right job” comes into play. If you’ve got a small array, use the built-in methods. You lose the benefit of those amazing RxJS operators such as tap (just use filter), but you gain an amazing amount of speed and don’t have to jerry-rig the well-documented Array methods with your transducer patterns.

The real dilemma comes when you’re already in an RxJS tranducer pipeline. When you’re switch mapping between an observable and a synchronous array execution, which do you use? I think that’s dependent on your team and the processing required. If you need the RxJS operators, use them. If you don’t, you can safely use JavaScripts native Array methods so long as you don’t have a lot of items or a lot of complex transformations.

Transducers are generic and don’t care about the collection itself, only about the values coming in and out of each transducer (RxJS operator). That means we could theoretically use this same methodology with other collections as well such as iterators.

Let’s use the generator from my previous article on Lossless Backpressure in RxJS, but without the asynchronous backpressure handler:

Pretty cool right? The only reason this works is because RxJS goes through the entire generator’s values, creates and array, and pushes the values one-by-one through the pipeline.

More-complex handling of subjects, like the backpressure iterator function I wrote, don’t work like you expect and will keep you up working late into the night. If there’s one flaw in this whole design, it’s the case where you’re not sure if your pipeline is asynchronous.

While RxJS can be used as a transducer library which does result in a significantly faster execution time of large datasets, jerry-rigging the subscribe function definitely isn’t how it’s intended.

I’d love to see if there’s a better way to do this in the future, but this use case simplifies it quite a bit without having to pull down, maintain, and learn another library.

If you’ve got an interest in more topics related to RxJS, you should checkout my other articles: