Intro to Rxjs for Javascript Developers

What is Rxjs

Rxjs, short for Reactive Extensions for JavaScript, is a library built around the reactive pattern of Observables and Observers. An Observable represents a stream that emits data over time, and an observer reacts to each of those events and changes.

It is mainly used for composing synchronous and asynchronous code in JavaScript.

Rxjs is a port of the ReactiveX pattern, which is available for other languages too (C++, Java...).

Why use Rxjs

In my opinion, any developer who wants to work with and manipulate streams and sequences of data should consider learning Rxjs. That applies especially to JavaScript developers, since Rxjs is a popular technology and many companies and teams list it as a requirement for web developer jobs.

Here are some points I found true after using Rxjs for a while:

  • Run both Synchronous & Asynchronous code

  • Iterable objects and Promises can be used within Rxjs

  • Emit multiple events over time

  • A large set of operators to work with standard data types

  • Observe any Object and React upon changes

Check the official website for Reactive Extensions, reactivex.io

And the official docs for Rxjs, rxjs.dev

Rxjs Observable

An Observable is a special kind of object that emits a sequence of events over time; it represents a stream of data. The stream ends with either a complete or an error notification. Once one of them is emitted, the lifecycle of that execution is over and no further values are delivered.

Every Observable needs an observer to be useful. An observer is basically a set of callbacks: one that runs whenever the Observable emits a new value, plus complete and error callbacks for reacting to the complete or error status.

Observables are cold by default, which means they won't run until you subscribe (register) at least one observer, at which point it starts receiving the sequence of data from the Observable.

When you work with Rxjs, most of the time you will be creating (generating) Observables with the help of operators, each of which generates an Observable for a specific operation, for example emitting all array values one after the other using the from operator.

Here is a diagram that shows how an Observable operates in more detail.

Let's create our first Observable. I will be using a basic Node.js application, but Rxjs also works in the browser.

const rxjs = require("rxjs");
//Create an Observable instance 
const myObservable = new rxjs.Observable(subscriber => {
  //TODO: You can Fetch data from the Server API
  //Emit a value(s)
  subscriber.next("Hello");
  subscriber.next("Hello");
  subscriber.next(15);
  subscriber.next({ id: 1 });

  //Finish Execution Successfully
  subscriber.complete()

  //Or you can call error if there are any errors 
  //subscriber.error("Err, No one should use Jquery!");
});

You create an Observable through the Observable class, which takes a callback function. That callback receives the subscriber (observer) as its first argument, so you can emit data to the observer and the observer can react to these notifications (events).

As I said before, Observables are COLD, so they won't run until we subscribe at least one observer.

//We subscribe to myObservable and that's what an observer looks like 
myObservable.subscribe(
  v => console.log("Next: ", v),
  err => console.error(err),
  () => console.log("Completed Successfully!")
); 

The first thing you notice in the observer definition is that it takes three callback functions. The first handles next emissions, so every value emitted from the Observable implementation is caught by this callback.

The second one is the error callback, which catches the error if one gets emitted.

The last function receives the complete status so the observer can react to it.

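Note: the complete and error statuses can only be emitted once per Observable execution. Any next(), error() or complete() call made after the first error() or complete() is ignored, so as far as the observer is concerned the stream has ended, much like hitting a return statement. Other code that follows in the function still runs, though.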

Now run the application. You should get something similar to:

Next:  Hello
Next:  Hello
Next:  15
Next:  { id: 1 }
Completed Successfully!

So each time we call the next method on the subscriber, the value is emitted and caught by the observer's next callback, and the complete status is handled the same way by the complete callback.

Rxjs Subject

Subjects are a special kind of Observable. The key difference is that a Subject plays the roles of both an Observable and an Observer at the same time, which means you can subscribe to it (Observable) and also push values and statuses into it through next(), error() and complete() (Observer).

Here are some key differences between a Subject and an Observable.

  • Subjects are hot: they can emit as soon as they are created, whether or not anyone is listening, while an Observable needs at least one subscriber before it starts emitting values.
  • Subjects don't have a subscriber function like an Observable does. Values and statuses are pushed from outside by calling next(), error() and complete() on the Subject itself, which is what lets it act as both Observable and Observer.
  • All observers of a Subject share the same execution. Instead of running the Observable definition again for every new subscription, the Subject delivers each value to all of its subscribers (observers).

There are three main types of Subjects.

  • BehaviorSubject lets you specify an initial value, and it emits its current value to every observer as soon as it subscribes.
  • ReplaySubject replays the values emitted before an observer subscribed, so late subscribers still receive them.
  • AsyncSubject emits only the last value it received, and only once the subject completes.

Let's see how a regular Subject works.

const mySubject = new rxjs.Subject();
mySubject.next("This value is pushed before subject runs");
mySubject.next("THIS WILL NOT BE RECEIVED");

mySubject.subscribe(
  v => console.log("Next: ", v),
  err => console.error(err),
  () => console.log("Completed Successfully!")
);

mySubject.next("this value gets received");

Running the code above prints:

Next:  this value gets received

We only get the value emitted after subscribing to the subject stream. That's what I mean by Subjects being hot Observables: they emit even when no subscriber is observing their values and statuses.

And that's where the other types of Subjects come in handy.

Rxjs Behavior Subject

A BehaviorSubject allows you to specify an initial value that is emitted to observers when they first subscribe to the subject.

const mySubject = new rxjs.BehaviorSubject("INITIAL");

mySubject.subscribe(
  v => console.log("Next: ", v),
  err => console.error(err),
  () => console.log("Completed Successfully!")
);

mySubject.next("this value gets received");

When running it you get:

Next:  INITIAL
Next:  this value gets received

Notice that we get the INITIAL value even though we subscribe after the Subject was created. The overall behavior of a BehaviorSubject is that it emits the most recently emitted value to any observer that subscribes to the stream. In this case no value was emitted before the subscription, which is why we get the initial value we specified in the constructor.

Rxjs Replay Subject

ReplaySubject allows an observer to get the values emitted before it subscribed to the stream. For example, you could initialize a ReplaySubject, emit three values, and only then subscribe an observer; once it subscribes, it receives all three earlier values in the order of emission.

//Initialize a ReplaySubject 
const mySubject = new rxjs.ReplaySubject();
//We emit two values before we subscribe to the stream
mySubject.next("This value is pushed before subject runs");
mySubject.next("THIS IS RECEIVED");

mySubject.subscribe(
  v => console.log("Next: ", v),
  err => console.error(err),
  () => console.log("Completed Successfully!")
);

mySubject.next("this value gets received");

Running the code prints:

Next:  This value is pushed before subject runs
Next:  THIS IS RECEIVED
Next:  this value gets received

As you can see, we get all the emitted values, even those emitted before we subscribed to the stream. In that sense it looks like a regular cold Observable, but it's actually hot, since it runs without needing an observer to subscribe.

Rxjs Operators

Operators are the sugar of the Rxjs library, since they let you easily create an Observable for a specific operation without defining the whole Observable logic on your own.

So, for example, if you have an array of data and you want to emit all the items in order using an Observable, you can achieve this with the from operator, which takes an array, another iterable such as a string, or a Promise, and returns an Observable.

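const rxjs = require('rxjs');

const myData = ["hi", 15, "icecream"];
//from() returns an Observable that emits each array item in order
const myDataObservable = rxjs.from(myData);
//Log the emitted value (v), not the whole array
myDataObservable.subscribe((v) => console.log("Next: ", v));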

You will get all array items emitted to the subscribed observer(s).

Next:  hi
Next:  15
Next:  icecream

If you use from with a string, you will get a single character emitted at a time.

You can check the full list of available operators, with detailed examples and info about each operator, in the Docs.

Timer Operator

The timer operator is used to create an Observable that emits a value (the number 0 by default) after a period of time specified in milliseconds.

//mapTo is a pipeable operator, so it is imported from rxjs/operators
const { mapTo } = require("rxjs/operators");
//We create a timer observable and use the pipe method to transform what it emits
//The mapTo operator maps whatever value is emitted to a custom value we specify
const timer1 = rxjs.timer(1000).pipe(mapTo("hi"));
//And make sure to subscribe to it
timer1.subscribe(
  v => console.log("Next: ", v),
  err => console.error(err),
  () => console.log("Completed Successfully!")
);

The value hi will be emitted one second after subscribing to the timer Observable.

//After 1 second 
Next:  hi
Completed Successfully!

The timer Observable completes immediately after emitting its single value once the time has elapsed.

Concat Operator 

The concat operator is used for concatenating two or more Observables together. It runs them one after another, waiting for each Observable to complete before subscribing to the next.

//mapTo comes from rxjs/operators
const { mapTo } = require("rxjs/operators");
const timer1 = rxjs.timer(1000).pipe(mapTo("hi"));
const timer2 = rxjs.timer(2000).pipe(mapTo("hello"));

rxjs
  .concat(timer1, timer2)
  .subscribe(
    v => console.log("Next: ", v),
    err => console.error(err),
    () => console.log("Completed Successfully!")
  );

concat runs timer1 first, which takes one second to complete, then moves to timer2, which takes two more seconds to complete, and finally the Observable generated by concat completes.

//After one second 
Next:  hi
//Wait for timer1 to complete then after two seconds 
Next:  hello
Completed Successfully!

Map & MapTo Operators

The map operator maps each emitted value to another value using a function, while mapTo maps every value to the same constant. These operators only transform the data flowing through the pipe method; they don't create a new source of values the way from or timer do.

Building on the previous example with timer1 and timer2, we can map each value emitted by the concat Observable to a custom object instead.

//Timers generation here...
//map is also imported from rxjs/operators
const { map } = require("rxjs/operators");
//We map each emitted value to a new object with an item property holding the actual value 
//and a newly added exists property set to true.
rxjs
  .concat(timer1, timer2)
  .pipe(map(v => ({ item: v, exists: true })))
  .subscribe(
    v => console.log("Next: ", v),
    err => console.error(err),
    () => console.log("Completed Successfully!")
  );

After running you will get the newly generated object for each value that passes through pipe.

//After one second 
Next:  { item: 'hi', exists: true }
//Wait for timer1 to complete then after two seconds 
Next:  { item: 'hello', exists: true }
Completed Successfully!

Those are the operators I find most important to know and use on almost every Observable. You can check the other available operators in the official API Docs.

 

Made With By

Ipenywis Founder, Game/Web Developer, Love Play Games