An Rx library implementation of data streams
Observables are collections of values that Observers can subscribe to in order to be notified of new data. They work as a push system, meaning that the data producer decides when to send data to the data consumer.
They can be viewed as a generalization of functions with zero arguments that can return multiple values. In that way, subscribing to an observable is akin to calling a function.
Synchronicity
Subscribing to an observable is a synchronous operation, but unlike functions, observables can "return" multiple values over time.
import { Observable } from 'rxjs';
const foo = new Observable((subscriber) => {
  console.log('Hello');
  subscriber.next(42);
  subscriber.next(100); // "return" another value
  subscriber.next(200); // "return" yet another
});
The values can also be delivered asynchronously:
import { Observable } from 'rxjs';
const foo = new Observable((subscriber) => {
  console.log('Hello');
  subscriber.next(42);
  subscriber.next(100);
  subscriber.next(200);
  setTimeout(() => {
    subscriber.next(300); // happens asynchronously
  }, 1000);
});
Unlike calling a function, which returns one value synchronously, subscribing to an observable can deliver any number of values, either synchronously or asynchronously.
Using Observables
Creation
The constructor of an observable takes one argument, which is the subscribe function:
import { Observable } from 'rxjs';
const observable = new Observable(subscriber => {
  subscriber.next('Hello');
  subscriber.next('World');
  subscriber.complete();
});
observable.subscribe({
  next(value) { console.log('Received value:', value); },
  error(err) { console.error('Error occurred:', err); },
  complete() { console.log('Stream complete'); }
});
They can also be created using special creation functions:
const observable = of(<values>);
creates an observable emitting a sequence of values
const observable = from([array/promise/iterable]);
converts an array, promise, or iterable into an observable
const observable = fromEvent(button, 'click');
converts DOM events into an observable
const observable = interval(<milliseconds>);
emits a sequence of numbers starting at 0, one per specified interval
Subscribing
observable.subscribe((x) => console.log(x));
"Subscribing to an observable is like calling a function, providing callbacks where the data will be delivered to."
observable.subscribe returns an object, the Subscription, which represents the ongoing execution.
Execution
The code inside the Observable's subscribe function represents an Observable execution: a lazy computation that runs separately for each observer that subscribes and can produce multiple values over time.
Three possible types of values:
next notification (sends a value)
error notification (sends a JavaScript error or exception)
complete notification (doesn't send a value)
The error and complete notifications can each be delivered only once, and only one of the two can occur in a given execution; nothing else can be delivered afterwards.
Disposal
Disposing of the observable execution is useful when we want to cancel it before it completes, particularly for aborting an infinite execution in finite time.
To cancel the execution, call the unsubscribe() method on the Subscription object.
Example
import { Observable } from 'rxjs';
const observable = new Observable(function subscribe(subscriber) {
  // Keep track of the interval resource
  const intervalId = setInterval(() => {
    subscriber.next('hi');
  }, 1000);
  // Provide a way of canceling and disposing the interval resource
  return function unsubscribe() {
    clearInterval(intervalId);
  };
});