Introduction to RxJS – Extensions for JavaScript Reactive Streams

Reactive Extensions for JavaScript (RxJS) is an alternative to callback-based or Promise-based libraries. It treats every source of events in the same manner, whether it is reading a file, making an HTTP call, clicking a button, or moving the mouse. RxJS is built on the pillars of functional and reactive programming, as well as a few popular design patterns such as Observer and Iterator.

Everything – just like a Stream

To work with RxJS, we should think in terms of streams. A simple stream (or pipeline) approach to handling data has three parts:


Producer is a data source that produces various forms of data to be consumed.
Pipeline is a series of logic blocks that are executed in order when data becomes available. The data (a stream of asynchronous data) is filtered and processed in different ways so that it can be more easily consumed by the Consumer.
Consumer subscribes to (or listens for) the Producer’s events and does something with (consumes) the received data.

The concept of a stream applies to any data point that holds one or more values, from a single integer to bytes of data. Streams originate from a Producer; the data flows through a pipeline and arrives at a Consumer.

For example, a set of operations (filter, map) can occur between the creation of the Producer of the stream (the array) and the Consumer (the function that logs to the console):
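
The Producer, pipeline, and Consumer described above can be sketched with plain array methods. This is only an illustration of the idea and does not use RxJS itself; the names producer, consumer, and received are assumptions made for this example.

```javascript
// Producer: a static array of numbers.
const producer = [1, 2, 3, 4, 5, 6];

// Consumer: records what it receives and logs it to the console.
const received = [];
const consumer = (value) => {
  received.push(value);
  console.log(value);
};

// Pipeline: keep the even numbers, then square them.
producer
  .filter((n) => n % 2 === 0)
  .map((n) => n * n)
  .forEach((value) => consumer(value));
// logs 4, 16, 36
```

The Consumer never sees the odd numbers: the pipeline has already adapted the Producer's output before it arrives.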


We can create streams from static data sources: numbers (or strings), sequences, or arrays. But the power of RxJS is that it can deal with dynamic data sources in exactly the same way.

Components of an Rx Stream


A stream must always have a Producer, that is, a source of data. It is the starting point for any logic performed in RxJS.

In practice, a Producer is created from something that generates events independently (a single value, an array, mouse clicks, a byte stream from a file). In RxJS, we call it an Observable (as it’s able to be observed).

An Observable is responsible only for pushing notifications, which means that it emits events and doesn’t care about how they are consumed.


We also need a Consumer to accept events from the Producer and process them in some specific way. When the Consumer begins listening (subscribing) to the Producer for events, we have a Stream. In RxJS, the Consumer is called an Observer.

With RxJS, streams travel only from the Producer to the Consumer, not the other way around. This means that streams always flow from an Observable to an Observer. In addition, both components are loosely coupled.


Once the Observer begins receiving events from the Observable, what can we do with the data?
>> Within the Data Pipeline.

Data Pipeline

RxJS gives us the ability to manipulate and edit data as it passes from the Producer to the Consumer, using a list of methods (Observable operators). This means that we can adapt the output of the Producer to match the expectations of the Consumer.

This design principle is typically extremely hard to accomplish in large-scale JavaScript applications, but RxJS facilitates this model of design.


Time is the implicit but important factor behind all of this. There’s always an underlying concept of time when manipulating streams. We can build streams that run slower or faster depending on requirements.


Data sources with Rx.Observable

Types of Data
Emitted data

Emitted data is created as a result of some sort of interaction with the system, such as a mouse click, a key press, or a file read.

Some of these produce at most one event: we request data and then receive a response.
=> A Promise can be a good solution.

Others are part of a continuous process.
=> We must treat them as event emitters that produce multiple discrete events at future times.

Static data

This is the data which exists in the system (in memory). For instance, an array or a string.

Interacting with static data is usually like iterating through it. We could think of associative arrays or maps as unordered static data.

Generated data

This data is created periodically or on demand, such as a bell ringing every hour or a Fibonacci sequence generated with ES6 generators.

This kind of sequence can be infinite and too large to store in memory.
=> Each value should be generated dynamically for the client as needed. So we could use the setTimeout() and setInterval() functions to produce values over time, or take(number) to limit the quantity.
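
As a minimal sketch of generated data, the Fibonacci sequence mentioned above can be written as an ES6 generator that computes each value only when it is requested. The take helper here is a hypothetical plain-JavaScript function written for this example, not the RxJS take operator.

```javascript
// Infinite Fibonacci sequence: each value is computed only when requested.
function* fibonacci() {
  let [previous, current] = [0, 1];
  while (true) {
    yield previous;
    [previous, current] = [current, previous + current];
  }
}

// Hypothetical helper: stop after `count` values so the sequence stays finite.
function* take(count, iterable) {
  if (count <= 0) return;
  let taken = 0;
  for (const value of iterable) {
    yield value;
    taken += 1;
    if (taken >= count) return;
  }
}

const firstEight = [...take(8, fibonacci())];
console.log(firstEight); // [0, 1, 1, 2, 3, 5, 8, 13]
```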

Create Observable

Rx.Observable.create is an alias for the Observable constructor, which takes a subscribe function as its argument. So we can create an Observable with the constructor using new:

Or create() method:

An Observable can also be created with methods such as of(), from(), and interval().
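
To make the relationship between the constructor and create() concrete, here is a dependency-free sketch. TinyObservable is a hypothetical, heavily simplified stand-in written for this example; it only mirrors the shape of Rx.Observable and is not how RxJS is implemented.

```javascript
// Hypothetical stand-in for Rx.Observable (simplified for illustration).
class TinyObservable {
  constructor(subscribe) {
    this._subscribe = subscribe; // stored, not run: nothing happens yet
  }
  static create(subscribe) {
    return new TinyObservable(subscribe); // create() just forwards to the constructor
  }
  subscribe(observer) {
    return this._subscribe(observer);
  }
}

const subscribeFn = (observer) => {
  observer.next(1);
  observer.next(2);
  observer.complete();
};

const viaNew = new TinyObservable(subscribeFn);
const viaCreate = TinyObservable.create(subscribeFn);

const log = [];
viaNew.subscribe({ next: (v) => log.push(`new:${v}`), complete: () => log.push('new:done') });
viaCreate.subscribe({ next: (v) => log.push(`create:${v}`), complete: () => log.push('create:done') });
console.log(log);
```

Both Observables behave identically, because in this sketch create() does nothing more than call the constructor.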

Consuming data with Observer

Data that’s emitted and processed through an Observable needs a destination, and that destination is the Observer object.

Subscribe to Observable

Subscribing to an Observable invokes the subscribe function that was passed to the Observable’s constructor:

Each call to observable.subscribe() triggers its own independent setup: it starts an Observable execution and delivers values or events to the Observer of that execution.

Execute Observable

The code inside Observable.create(function subscribe(observer) {...}) represents an Observable execution.

The execution produces multiple values over time. There are three types of values:
– “Next” notification: actual data being delivered to an Observer such as a Number, a String, an Object…
– “Error” notification: a JavaScript error or exception.
– “Complete” notification: signals that the stream has completed, so the Observer can run any final logic.

Nothing else can be delivered once an Error or Complete notification has been delivered, and only one of the two can occur. An Observable is also a lazy computation, because its execution only happens when an Observer subscribes.

So, what is an Observer?
An Observer is just an object with three callbacks, one for each type of notification (next, error, complete) that an Observable may deliver.


For example:

If we don’t provide one of the callbacks, the execution will still happen normally, except some types of notifications will be ignored:

When subscribing to an Observable, we don’t have to attach the callbacks to an Observer object and then pass this object to observable.subscribe(). We can just provide them as arguments:
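
The three ways of supplying callbacks described above can be sketched without RxJS. The subscribe function below is a hypothetical, simplified stand-in written for this example; it also enforces the rule that nothing is delivered after a Complete or Error notification.

```javascript
// Hypothetical, simplified subscribe(): accepts an Observer object or bare callbacks.
function subscribe(producer, nextOrObserver, error, complete) {
  const observer =
    typeof nextOrObserver === 'function'
      ? { next: nextOrObserver, error, complete }
      : nextOrObserver || {};
  let closed = false;
  producer({
    next: (value) => {
      if (!closed && observer.next) observer.next(value);
    },
    error: (err) => {
      if (closed) return;
      closed = true;
      if (observer.error) observer.error(err);
    },
    complete: () => {
      if (closed) return;
      closed = true;
      if (observer.complete) observer.complete();
    },
  });
}

// A producer that (incorrectly) keeps emitting after complete.
const producer = (observer) => {
  observer.next('a');
  observer.next('b');
  observer.complete();
  observer.next('ignored');
};

const log = [];
// 1. Full Observer object.
subscribe(producer, {
  next: (v) => log.push(`full next ${v}`),
  error: (e) => log.push(`full error ${e}`),
  complete: () => log.push('full complete'),
});
// 2. Partial Observer: the complete notification is simply ignored.
subscribe(producer, { next: (v) => log.push(`partial next ${v}`) });
// 3. Callbacks passed directly as arguments.
subscribe(producer, (v) => log.push(`args next ${v}`), undefined, () => log.push('args complete'));
console.log(log);
```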

Dispose Observable Execution with Subscription

An Observable execution may be infinite, so we need a way to cancel it to avoid wasting computation power or memory on unnecessary data: the unsubscribe() method.

When observable.subscribe(observer) is called, the observer gets attached to Observable execution. This call also returns a Subscription object which represents the ongoing execution.
=> call subscription.unsubscribe() to cancel the execution.

We can add one Subscription to another, so that calling unsubscribe() on the parent Subscription also unsubscribes all of its child Subscriptions:
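
The parent and child behaviour can be sketched as follows. TinySubscription is a hypothetical class written for this example; it imitates the add() and unsubscribe() methods of an RxJS Subscription but is not the library's implementation.

```javascript
// Hypothetical, simplified Subscription: a teardown function plus child subscriptions.
class TinySubscription {
  constructor(teardown) {
    this.closed = false;
    this._teardown = teardown;
    this._children = [];
  }
  add(child) {
    if (this.closed) child.unsubscribe(); // parent already cancelled
    else this._children.push(child);
    return child;
  }
  unsubscribe() {
    if (this.closed) return; // unsubscribing twice is a no-op
    this.closed = true;
    if (this._teardown) this._teardown();
    this._children.forEach((child) => child.unsubscribe());
    this._children = [];
  }
}

const log = [];
const parent = new TinySubscription(() => log.push('parent stopped'));
const child = new TinySubscription(() => log.push('child stopped'));
parent.add(child);

parent.unsubscribe(); // cancels the parent and its child
parent.unsubscribe(); // no effect the second time
console.log(log); // ['parent stopped', 'child stopped']
```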

Observable Operators for Data Pipeline

Operators allow us to inject logic into an Observable’s pipeline. An operator is a pure, higher-order function. It never changes the existing Observable instance, but returns a new Observable that continues the chain.


These Operators can be used to inspect, alter, create, or delay events after they leave the Data Source but before they reach the Consumer. In other words, anything in your business logic pipeline is handled by the combination of one or more Operators.

And RxJS Operators are also lazily evaluated.

For example, this Operator takes an Observable and a Function as arguments:

Using the Operator above, we can create a new Observable:
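
A minimal sketch of such an operator is shown here. The name jsamap comes from the text, but its body is an assumption made for this example, and createObservable is a hypothetical stand-in for an Observable that only stores a subscribe function.

```javascript
// Hypothetical stand-in for an Observable: it only stores a subscribe function.
function createObservable(subscribe) {
  return { subscribe };
}

// Assumed body for the operator the text calls jsamap:
// it takes a source Observable and a function, and returns a NEW Observable.
function jsamap(source, transform) {
  return createObservable((observer) =>
    source.subscribe({
      next: (value) => observer.next(transform(value)),
      error: (err) => observer.error(err),
      complete: () => observer.complete(),
    })
  );
}

const source = createObservable((observer) => {
  observer.next(1);
  observer.next(2);
  observer.next(3);
  observer.complete();
});

// Nothing runs until subscribe() is called on the result (lazy evaluation).
const doubled = jsamap(source, (x) => x * 2);

const results = [];
doubled.subscribe({
  next: (value) => results.push(value),
  error: (err) => console.error(err),
  complete: () => console.log(results), // [2, 4, 6]
});
```

The source is left untouched: subscribing to it directly would still deliver 1, 2, 3.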

Instance Operators vs Static Operators
Instance Operator

An Instance Operator is a method on an Observable instance. For example, if the operator jsamap above were an official instance operator, it would be used like this:

Static Operator

A Static Operator is a method attached directly to the Observable class. We usually use it to create a new Observable from scratch (from non-Observable arguments).

Examples of Static Operators include of, from, interval, fromPromise, empty, and merge.

Operator Categories
Creation Operators

We have seen that the create operator can be used to create an Observable. There are other Creation Operators for common cases.

of: emits one or more values given as arguments, immediately one after the other, and then emits a complete notification:

Similar Operators:
range: emits a sequence of numbers within a specified range.
interval: emits sequential numbers at a specified interval of time.
timer: is like interval, but we can specify when the emissions start.
throw: emits an error notification.
empty: emits a complete notification.
never: emits no items to the Observer.

fromEventPattern: converts any addHandler/removeHandler API to an Observable. The addHandler is called when the output Observable is subscribed, and removeHandler is called when the Subscription is unsubscribed.

fromPromise: returns an Observable that just emits the Promise’s resolved value as a next, then completes.

from: creates an Observable from an Array, an array-like object, a Promise, an Iterable object, or an Observable-like object.

repeat: repeats the stream of items emitted by the source Observable at most count times.

Transformation Operators

Transformation Operators transform the items emitted by an Observable and deliver the results through another Observable.

map: passes each source value through a transformation function to get corresponding output values.

scan: scan(accumulator: function, seed: any): Observable
It emits the current accumulation whenever the source emits a value.
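
The difference between scan and a plain reduce can be sketched on an array. scanArray is a hypothetical array-based stand-in written for this example, not the RxJS operator: it returns every intermediate accumulation, which corresponds to what scan would emit over time.

```javascript
// Hypothetical array-based stand-in for scan(): returns every intermediate accumulation.
function scanArray(values, accumulator, seed) {
  const emitted = [];
  let acc = seed;
  for (const value of values) {
    acc = accumulator(acc, value);
    emitted.push(acc);
  }
  return emitted;
}

const sum = (total, n) => total + n;
const running = scanArray([1, 2, 3, 4], sum, 0);
const final = [1, 2, 3, 4].reduce(sum, 0);

console.log(running); // [1, 3, 6, 10]
console.log(final);   // 10
```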

buffer: collects output values as an array, and emits that array only when another Observable emits.

bufferTime: collects output values as an array, similar to buffer, and emits the array periodically at a specified time interval.

The example code for buffer above becomes:

Filtering Operators

Filtering Operators provide techniques for picking values from an Observable and for dealing with back-pressure (so that a fast data source doesn’t overwhelm the stream destination).

filter: filter(select: Function, thisArg: any): Observable
It only emits a value from the source if the value passes the condition in the select Function.

take: emits the first values from the source, up to a given count, then completes.

takeUntil: emits the values from the source Observable until a notifier Observable emits a value, then completes. This is helpful for unsubscribing automatically.

takeWhile: takeWhile(predicate: function(value, index): boolean): Observable
It takes values from the source only while they pass the condition given (predicate returns true). If predicate returns false, it completes.

skip: skips the first values from the source, up to a given count.

skipUntil and skipWhile are similar to takeUntil and takeWhile, but they skip values instead of taking them.
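
How the take and skip family differs from filter can be sketched on a plain array. The functions below are hypothetical array-based stand-ins written for this example; the real operators work on values arriving over time, but they select values in the same way.

```javascript
// Hypothetical array-based stand-ins for the take/skip family.
const take = (values, count) => values.slice(0, count);
const skip = (values, count) => values.slice(count);

function takeWhile(values, predicate) {
  const index = values.findIndex((value, i) => !predicate(value, i));
  return index === -1 ? values.slice() : values.slice(0, index);
}

function skipWhile(values, predicate) {
  const index = values.findIndex((value, i) => !predicate(value, i));
  return index === -1 ? [] : values.slice(index);
}

const source = [1, 2, 3, 4, 1, 2];
const isSmall = (n) => n < 3;

console.log(take(source, 3));            // [1, 2, 3]
console.log(skip(source, 3));            // [4, 1, 2]
console.log(takeWhile(source, isSmall)); // [1, 2]
console.log(skipWhile(source, isSmall)); // [3, 4, 1, 2]
console.log(source.filter(isSmall));     // [1, 2, 1, 2]
```

Unlike filter, takeWhile stops for good the first time the predicate fails, so the later 1 and 2 are never emitted.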

first: first(predicate: function, select: function)
It emits only the first value, or the first value for which the predicate returns true.

debounceTime: emits a value from the source Observable only after a particular time span has passed without another source emission (a value that is followed by another emission within that time span is discarded).

For example:


distinctUntilChanged: only emits when the current value is different from the last.
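
Both debounceTime and distinctUntilChanged can be illustrated with a small simulation. The sketch below is an assumption-laden model, not RxJS code: events carry a hypothetical time field in milliseconds instead of using real timers, and the two functions are array-based stand-ins for the operators.

```javascript
// Hypothetical simulation: each event has a timestamp (ms) instead of using real timers.
function debounceEvents(events, dueTime) {
  return events
    .filter((event, i) => {
      const next = events[i + 1];
      // Keep an event only if the source then stayed silent for at least dueTime
      // (the last event is kept because nothing follows it).
      return next === undefined || next.time - event.time >= dueTime;
    })
    .map((event) => event.value);
}

// Array-based stand-in: drop a value when it equals the one just before it.
function distinctUntilChanged(values) {
  return values.filter((value, i) => i === 0 || value !== values[i - 1]);
}

const keystrokes = [
  { time: 0, value: 'r' },
  { time: 100, value: 'rx' },
  { time: 150, value: 'rxj' },
  { time: 900, value: 'rxjs' },
];

const debounced = debounceEvents(keystrokes, 300);
const distinct = distinctUntilChanged([1, 1, 2, 2, 1, 3, 3]);

console.log(debounced); // ['rxj', 'rxjs']
console.log(distinct);  // [1, 2, 1, 3]
```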

Combination Operators

Combination Operators combine data from multiple Observables.

merge: turns multiple Observables into a single Observable. We can use both Static Operator & Instance Operator:
Rx.Observable.merge(observables: ...ObservableInput, concurrent: number): Observable
merge(input: Observable, concurrent: number): Observable
Here, concurrent is the maximum number of input Observables that are subscribed to concurrently.

concat: emits the Observables’ values sequentially, one Observable after the other.
It is equivalent to merge(...observables, 1).

startWith: emits the given value first, then emits the values from the source Observable.

combineLatest: combineLatest(observables: ...Observable, project: function): Observable
It emits the latest value from all Observables whenever any input Observable emits a value. Notice that the first emission only occurs once every Observable has emitted at least one value.

We can use project function to transform the combined latest values into a new value:

withLatestFrom: source.withLatestFrom(other: Observable, project: Function): Observable
It emits values which are calculated from the latest values of each, but only when the source emits.
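
The difference between combineLatest and withLatestFrom can be sketched with a simulated emission log. This is a dependency-free model written for this example: the log format, the source labels "a" and "b", and both functions are assumptions, not the RxJS operators.

```javascript
// Hypothetical simulation: an ordered log of emissions from two sources, "a" and "b".
const emissions = [
  { from: 'a', value: 1 },
  { from: 'b', value: 'x' },
  { from: 'a', value: 2 },
  { from: 'b', value: 'y' },
];

function combineLatest(log, project) {
  const latest = {};
  const output = [];
  for (const { from, value } of log) {
    latest[from] = value;
    // Emit on every emission, once both sources have produced a value.
    if ('a' in latest && 'b' in latest) output.push(project(latest.a, latest.b));
  }
  return output;
}

function withLatestFrom(log, project) {
  const latest = {};
  const output = [];
  for (const { from, value } of log) {
    latest[from] = value;
    // Emit only when the source "a" emits, and only if "b" already has a value.
    if (from === 'a' && 'b' in latest) output.push(project(latest.a, latest.b));
  }
  return output;
}

const join = (a, b) => `${a}${b}`;
const combined = combineLatest(emissions, join);
const sampled = withLatestFrom(emissions, join);

console.log(combined); // ['1x', '2x', '2y']
console.log(sampled);  // ['2x']
```

Here join plays the role of the project function: it turns the pair of latest values into a single output value.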

forkJoin: forkJoin(...observables, selector : function): Observable
It emits the last emitted value from each Observable once all of them have completed.
Notice that if any of the inner Observables raises an error and we do not catch it correctly, we lose the values of all the other Observables, including those that have already completed.

zip: emits the values that share the same index as an array, once every Observable has emitted a value at that index.
If the last parameter is a function, this function is used to compute the emitted value from the input values (instead of emitting an array of values).
The process continues until at least one inner Observable completes.
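
The index-by-index pairing of zip can be sketched on arrays. The zip function below is a hypothetical array-based stand-in written for this example: it stops at the shortest input, which models the process ending when one inner Observable completes.

```javascript
// Hypothetical array-based stand-in for zip(); an optional last argument projects each group.
function zip(...args) {
  const project = typeof args[args.length - 1] === 'function' ? args.pop() : null;
  if (args.length === 0) return [];
  // Stop at the shortest input, as zip stops when one source completes.
  const length = Math.min(...args.map((values) => values.length));
  const output = [];
  for (let i = 0; i < length; i += 1) {
    const group = args.map((values) => values[i]);
    output.push(project ? project(...group) : group);
  }
  return output;
}

const pairs = zip([1, 2, 3], ['a', 'b']);
const labels = zip([1, 2, 3], ['a', 'b'], (n, s) => `${n}${s}`);

console.log(pairs);  // [[1, 'a'], [2, 'b']]
console.log(labels); // ['1a', '2b']
```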

Error Handling Operators

The Operators provide ways to handle errors and retry logic.

catch: catches an error by returning a new Observable.

retry: restarts the Observable a specific number of times if an error occurs. If no number is given, it restarts forever.

retryWhen: retryWhen(notifier: function(errors: Observable): Observable): Observable
The notifier function receives an Observable of errors as its only parameter.
– If the source Observable calls error -> emit the error that caused it to the Observable returned from notifier.
– If that Observable calls complete or error -> call complete or error on the child subscription.
– Otherwise -> resubscribe to the source Observable.

The code above will retry forever if the error keeps happening. To limit the number of retries, we can use the scan operator to keep track of how many retries have been made, then throw the error once the limit is exceeded.
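
The idea of a bounded retry can be sketched without RxJS. Note that this example swaps in a simpler technique: it counts attempts with a plain closure variable rather than with retryWhen and scan. The retry function and the flaky source are hypothetical, and a "source" here is just a function that takes an observer.

```javascript
// Hypothetical stand-in: a "source" is a function that takes an observer.
function retry(source, maxRetries) {
  return (observer) => {
    let retries = 0;
    const attempt = () =>
      source({
        next: (value) => observer.next(value),
        complete: () => observer.complete(),
        error: (err) => {
          if (retries < maxRetries) {
            retries += 1;
            attempt(); // resubscribe to the source
          } else {
            observer.error(err); // give up and pass the error on
          }
        },
      });
    attempt();
  };
}

// Fails on the first two subscriptions, succeeds on the third.
let attempts = 0;
const flaky = (observer) => {
  attempts += 1;
  if (attempts < 3) {
    observer.error(new Error(`attempt ${attempts} failed`));
  } else {
    observer.next('ok');
    observer.complete();
  }
};

const log = [];
const record = {
  next: (v) => log.push(v),
  error: (e) => log.push(e.message),
  complete: () => log.push('done'),
};

retry(flaky, 5)(record); // succeeds on the third attempt

const alwaysFails = (observer) => observer.error(new Error('still failing'));
retry(alwaysFails, 2)(record); // gives up after 2 retries

console.log(log); // ['ok', 'done', 'still failing']
```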

Multicasting Operators

By default, RxJS Observables are cold (or unicast). Multicasting Operators can make an Observable hot (or multicast), allowing side effects to be shared among multiple Subscribers.

share: returns a new Observable that multicasts (shares) the original Observable among multiple Subscribers.
– As long as there is at least one Subscriber this Observable will be subscribed and emitting data.
– When all Subscribers have unsubscribed it will unsubscribe from the source Observable.

publish: returns a ConnectableObservable that begins emitting items to the Observers that have subscribed to it only when its connect method is called.
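
The difference between a cold source and a published one can be sketched as follows. This is a dependency-free model written for this example: the source is a plain function, and publish is a hypothetical wrapper that only imitates the subscribe-then-connect behaviour of a ConnectableObservable.

```javascript
let executions = 0;
// Cold source (a plain function as a stand-in): each subscription runs the producer again.
const cold = (observer) => {
  executions += 1;
  observer.next(`execution ${executions}`);
};

// Hypothetical publish-like wrapper: observers register first, connect() runs the source once.
function publish(source) {
  const observers = [];
  return {
    subscribe: (observer) => observers.push(observer),
    connect: () => source({ next: (value) => observers.forEach((o) => o.next(value)) }),
  };
}

const coldLog = [];
cold({ next: (v) => coldLog.push(`A got ${v}`) });
cold({ next: (v) => coldLog.push(`B got ${v}`) });
console.log(coldLog); // ['A got execution 1', 'B got execution 2']

const hotLog = [];
const hot = publish(cold);
hot.subscribe({ next: (v) => hotLog.push(`C got ${v}`) });
hot.subscribe({ next: (v) => hotLog.push(`D got ${v}`) });
hot.connect(); // the producer runs once for both observers
console.log(hotLog); // ['C got execution 3', 'D got execution 3']
```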

Utility Operators

These operators provide helpful utilities in your observable toolkit.

do: do(nextOrObserver: Observer|function, error: function, complete: function): Observable
It returns an Observable that is identical to the source. We can use it to perform actions or side effects such as logging.

delay: delays the emission of each item by a specified number of milliseconds.

There are other useful Operators such as delayWhen, timeout, toArray, toPromise that we can find at

Other Operators

– Conditional and Boolean Operators:

  • defaultIfEmpty
  • every
  • find
  • findIndex
  • isEmpty

– Mathematical and Aggregate Operators:

  • count
  • max
  • min
  • reduce

For more details, please visit:

By JavaSampleApproach | March 8, 2018.
