RXJS — Operators and Subjects

RXJS OPERATORS:

Operators are an essential part of RxJS. They allow complex asynchronous code to be composed in a declarative manner.

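  1. interval operator: A creation function that takes a period in milliseconds as its argument and returns an observable that emits an increasing number (0, 1, 2, and so on) each time the period elapses.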
import { interval } from 'rxjs';

const observable = interval(1000); // argument is the number of milliseconds

// subscribe to the above observable
observable.subscribe({
  next: (val) => console.log(`Demo of interval operator : ${val}`)
});

// outputs
// Demo of interval operator : 0
// Demo of interval operator : 1
// Demo of interval operator : 2
// ...

  2. take operator: A filtering operator that emits only the first count values emitted by the source observable and then completes.

import { interval } from 'rxjs';
import { take } from 'rxjs/operators';

const intervalObservable = interval(1000);
const takeFirstThree = intervalObservable.pipe(take(3));

takeFirstThree.subscribe(x => console.log(x));

// Output
// 0
// 1
// 2
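To make the "then completes" part of take visible, here is a small sketch that uses a synchronous source instead of interval, so the result can be inspected right away. The five-element array and the events log are assumptions made up for this illustration.

```typescript
import { from } from 'rxjs';
import { take } from 'rxjs/operators';

// A synchronous source with five values (illustrative data only).
const source = from([10, 20, 30, 40, 50]);

// Records every notification the observer receives, in order.
const events: string[] = [];

source.pipe(take(3)).subscribe({
  next: (val) => events.push(`next ${val}`),
  complete: () => events.push('complete'),
});

console.log(events);
// [ 'next 10', 'next 20', 'next 30', 'complete' ]
```

The values 40 and 50 never reach the observer: after the third value, take sends the complete notification and stops listening to the source.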

SUBJECTS in RXJS

A Subject is a special kind of Observable whose values can be multicast to many observers. A plain Observable, by contrast, is unicast, i.e. each subscribed observer gets its own independent execution of the Observable.

Code to show Observable is unicast

import { Observable } from 'rxjs';

// Observable is unicast
const observable = new Observable(subscriber => {
  setTimeout(() => { subscriber.next(Math.random()); }, 1000);
});

const a = observable.subscribe(val => console.log(`Observer A got value ${val}`));
const b = observable.subscribe(val => console.log(`Observer B got value ${val}`));

// Observer A got value 0.9349096703197488
// Observer B got value 0.3966789007105411
NOTE: The two outputs are different because each observer triggers its own execution of the Observable, so Math.random() is called once per subscription.
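One way to see the separate executions without relying on random numbers is to count how often the producer function runs. This is only a sketch: the executions counter and the seenByA / seenByB arrays are names invented for the illustration.

```typescript
import { Observable } from 'rxjs';

let executions = 0; // how many times the producer function has run

const observable = new Observable<number>((subscriber) => {
  executions += 1; // runs once for every subscribe() call
  subscriber.next(executions);
  subscriber.complete();
});

const seenByA: number[] = [];
const seenByB: number[] = [];

observable.subscribe((val) => seenByA.push(val));
observable.subscribe((val) => seenByB.push(val));

console.log(executions); // 2
console.log(seenByA);    // [ 1 ]
console.log(seenByB);    // [ 2 ]
```

Two subscriptions mean two runs of the producer, which is exactly what unicast means here.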

Code to show Subject is Multicast

import { Subject } from 'rxjs';

const subject = new Subject<number>();

subject.subscribe({
  next: (val) => console.log(`Observer A: ${val}`)
});
subject.subscribe({
  next: (val) => console.log(`Observer B: ${val}`)
});

subject.next(Math.random());
subject.complete();

// Observer A: 0.801180339885259
// Observer B: 0.801180339885259
NOTE: Both outputs are the same. A Subject can have any number of observers subscribed to it, and every one of them receives the same emitted value, because there is only a single execution.
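Because a Subject is also an observer (it has next, error and complete), it can be passed to subscribe() of a plain Observable to share one execution among several observers. The sketch below assumes a synchronous source that emits one random number; the executions counter is there only to show how often the source runs.

```typescript
import { Observable, Subject } from 'rxjs';

let executions = 0; // counts runs of the source's producer function

// A unicast source: subscribing to it directly would give each observer its own random value.
const source = new Observable<number>((subscriber) => {
  executions += 1;
  subscriber.next(Math.random());
  subscriber.complete();
});

const subject = new Subject<number>();
const seenByA: number[] = [];
const seenByB: number[] = [];

subject.subscribe((val) => seenByA.push(val));
subject.subscribe((val) => seenByB.push(val));

// The subject subscribes to the source once and forwards its values to A and B.
source.subscribe(subject);

console.log(executions);                // 1
console.log(seenByA[0] === seenByB[0]); // true
```

Note that the observers are attached to the subject before the subject is connected to the source. With a synchronous source like this one, an observer attached afterwards would miss the value.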

BehaviorSubject

  1. It stores the latest value emitted to observers.
  2. When a new observer subscribes, it immediately receives the current value.
  3. It is useful for representing values over time.

Code Without BehaviorSubject(Normal subject)

import { Subject } from 'rxjs';

const withoutBS = new Subject();

console.log("without bs start");
withoutBS.next(0); // latest value emitted is 0

// new observer subscribes to the subject
withoutBS.subscribe({
  next: val => console.log(`without BS: ${val}`),
});

withoutBS.next(1);
withoutBS.next(2);
console.log("without BS end");

// without bs start
// without BS: 1
// without BS: 2
// without BS end

NOTE: When the new observer subscribed, the latest value was 0, but it was not delivered to the new observer because it had been emitted before the subscription. This is the drawback of a normal Subject, and it is the reason to use a BehaviorSubject.

Code With BehaviorSubject

import { BehaviorSubject } from 'rxjs';

const withBS = new BehaviorSubject(0);

console.log("with bs start");
withBS.next(0); // latest value emitted

// new observer subscribed to the subject
withBS.subscribe({
  next: val => console.log(`with BS: ${val}`),
});

withBS.next(1);
withBS.next(2);
console.log("with BS end");

// with bs start
// with BS: 0
// with BS: 1
// with BS: 2
// with BS end

NOTE: When the new observer subscribed, the latest value was 0, and this time it was delivered to the new observer immediately.
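The "values over time" idea can be sketched with a BehaviorSubject that holds a changing reading. The temperature name and the numbers are made up for this example. The current value can also be read synchronously through the value property.

```typescript
import { BehaviorSubject } from 'rxjs';

// Hypothetical example: a temperature reading that changes over time, starting at 20.
const temperature = new BehaviorSubject<number>(20);

temperature.next(21);
temperature.next(22);

// The latest value can be read without subscribing.
console.log(temperature.value); // 22

// A late subscriber first receives the current value (22), then later updates.
const seenByLate: number[] = [];
temperature.subscribe((val) => seenByLate.push(val));
temperature.next(23);

console.log(seenByLate); // [ 22, 23 ]
```

The late subscriber never sees 20 or 21: a BehaviorSubject remembers only the single most recent value.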

ReplaySubject

It is similar to BehaviorSubject, but instead of only the latest value it records several past values and replays them to new subscribers.

import { ReplaySubject } from 'rxjs';

const replayObservable = new ReplaySubject(3); // buffers 3 values for new subscribers

replayObservable.subscribe({
  next: (val) => console.log(`Observer A:${val}`),
});

replayObservable.next(1);
replayObservable.next(2);
replayObservable.next(3);
replayObservable.next(4);

// New observer subscribed to the replayObservable
replayObservable.subscribe({
  next: (val) => console.log(`Observer B:${val}`),
});

replayObservable.next(5);

// Observer A:1
// Observer A:2
// Observer A:3
// Observer A:4
// Observer B:2
// Observer B:3
// Observer B:4
// Observer A:5
// Observer B:5

NOTE: The ReplaySubject is created with a buffer of 3 values for new subscribers. Hence, when the second observer subscribed, it immediately received the last three values emitted by replayObservable (2, 3 and 4), and after that both observers received 5.
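To get a feel for how the buffer size changes what a late subscriber receives, the following sketch wraps the experiment in a small helper. lateSubscriberSees is a hypothetical function written for this illustration, not part of RxJS.

```typescript
import { ReplaySubject } from 'rxjs';

// Hypothetical helper: emits 1..4 into a ReplaySubject with the given buffer size,
// then subscribes late and returns the values that were replayed.
function lateSubscriberSees(bufferSize: number): number[] {
  const subject = new ReplaySubject<number>(bufferSize);
  [1, 2, 3, 4].forEach((n) => subject.next(n));

  const seen: number[] = [];
  subject.subscribe((val) => seen.push(val));
  return seen;
}

console.log(lateSubscriberSees(1));  // [ 4 ]
console.log(lateSubscriberSees(3));  // [ 2, 3, 4 ]
console.log(lateSubscriberSees(10)); // [ 1, 2, 3, 4 ]
```

With a buffer of 1 the late subscriber gets only the latest value, much like a BehaviorSubject but without an initial value. A buffer larger than the number of emitted values simply replays everything.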

AsyncSubject

  1. It only emits the last value of the Observable execution and only when the execution completes.

AsyncSubject code without complete

import { AsyncSubject } from 'rxjs';

const asyncSubject = new AsyncSubject();

asyncSubject.subscribe({
  next: (val) => console.log(`Observer A:${val}`),
});

asyncSubject.next(1);
asyncSubject.next(2);
asyncSubject.next(3);

// No output

NOTE: The asyncSubject has not completed yet. Hence, no value is emitted by asyncSubject.

Now look at the code below, which is the same as the code above with just one change.

import { AsyncSubject } from 'rxjs';

const asyncSubject = new AsyncSubject();

asyncSubject.subscribe({
  next: (val) => console.log(`Observer A:${val}`),
});

asyncSubject.next(1);
asyncSubject.next(2);
asyncSubject.next(3);
asyncSubject.complete(); // the only change from the above code

// Observer A:3
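Both cases can be combined in one run, and it is also worth checking what an observer gets when it subscribes after completion. The sketch below collects values into arrays (seenByA and seenByLate are names made up here) so each stage can be inspected.

```typescript
import { AsyncSubject } from 'rxjs';

const asyncSubject = new AsyncSubject<number>();
const seenByA: number[] = [];
const seenByLate: number[] = [];

asyncSubject.subscribe((val) => seenByA.push(val));

asyncSubject.next(1);
asyncSubject.next(2);
asyncSubject.next(3);
console.log(seenByA); // [] (nothing is delivered before completion)

asyncSubject.complete();
console.log(seenByA); // [ 3 ]

// An observer that subscribes after completion still receives the last value.
asyncSubject.subscribe((val) => seenByLate.push(val));
console.log(seenByLate); // [ 3 ]
```

This makes AsyncSubject behave a bit like a resolved promise: the final value stays available to anyone who asks for it later.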
