Observables are collections of data, streams of data, or event sources that emit values over time. By subscribing to one (a subscription), you can react asynchronously to each new or updated value. The emitted data can be of any type.
Example in real life
Imagine a YouTube live stream:
- The streamer (Observable) emits data (video frames).
- Viewers (Subscribers) receive the data in real time.
- If a viewer leaves the stream (unsubscribe), they stop receiving new updates.
Reminder
- Pull Systems: the Consumer requests data (`function`, iterator).
- Push Systems: the Producer sends data (`Promise`, Observable).
- Observables are better than Promises when you need multiple values over time.
- Observables are lazy, just like functions: they execute only when subscribed to, as a function executes only when called.
- Subscribing to an Observable is like calling a function, but it can return multiple values.
- Observables can be synchronous or asynchronous.
Pull vs. Push Systems
Pull and Push are two ways in which data flows from a Producer (who creates data) to a Consumer (who receives data).
System | Producer (Data Source) | Consumer (Data Receiver) |
---|---|---|
Pull | Passive: produces data only when asked | Active: requests data when needed |
Push | Active: produces data automatically | Passive: reacts when data arrives |
Pull
The Consumer asks for data when needed. The Producer only provides it when requested.
🧑🏽‍💻 Examples of Pull Systems
- Functions
  - A function only runs when you call it.
    function getData() {
      return 42; // Only returns a value when called
    }
    console.log(getData()); // 42
- Iterators & Generators (`function*`)
  - You manually call `.next()` to "pull" the next value.
    function* generator() {
      yield 1;
      yield 2;
      yield 3;
    }
    const iter = generator();
    console.log(iter.next().value); // 1
    console.log(iter.next().value); // 2
Push
The Producer sends data whenever it wants, and the Consumer reacts to it automatically.
🧑🏽‍💻 Examples of Push Systems
- Promises
  - A Promise resolves and sends data when it's ready, without the consumer asking.
    const promise = new Promise((resolve) => {
      setTimeout(() => resolve(42), 1000);
    });
    promise.then((value) => console.log(value)); // 42 (after 1 second)
- Observables
  - Observables are push-based: they can emit multiple values over time to the consumers that subscribe to them.
    import { Observable } from 'rxjs';
    const observable = new Observable(subscriber => {
      subscriber.next(1);
      subscriber.next(2);
      setTimeout(() => subscriber.next(3), 1000);
    });
    observable.subscribe(value => console.log(value));
  - Output
    1
    2
    (after 1 second) 3
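To make the contrast concrete, here is a minimal sketch that delivers the same three values both ways, without RxJS. The `produce` helper is hypothetical and exists only for this illustration. In the pull half the consumer drives the loop; in the push half the producer calls the consumer's callback.

```javascript
const values = [1, 2, 3];

// Pull: the consumer decides when each value is produced.
const iterator = values[Symbol.iterator]();
const pulled = [];
let step = iterator.next(); // the consumer asks for the first value
while (!step.done) {
  pulled.push(step.value);
  step = iterator.next(); // the consumer asks again
}

// Push: the producer decides when to deliver; the consumer only supplies a callback.
// `produce` is a hypothetical helper written for this sketch.
function produce(onValue) {
  for (const value of values) {
    onValue(value); // the producer calls the consumer
  }
}

const pushed = [];
produce((value) => pushed.push(value));

console.log(pulled); // [ 1, 2, 3 ]
console.log(pushed); // [ 1, 2, 3 ]
```

Both arrays end up identical; what differs is which side controls the timing.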
Observables vs Functions & Promises
Observables are often misunderstood, so let's compare them:
Feature | Function | Promise | Observable |
---|---|---|---|
Returns | Single value | Single value (async) | Multiple values (sync or async) |
Lazy Execution | Yes | No (the executor runs as soon as the Promise is created) | Yes |
Can be canceled | No | No | Yes (unsubscribe) |
Push/Pull? | Pull | Push | Push |
Example | getData() | fetchData().then() | observable.subscribe() |
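Laziness is the row that is easiest to check in code. The sketch below uses plain JavaScript and records what runs, and when, in an illustrative `log` array. A function body runs only when called, while a Promise executor runs during construction, before anyone calls `.then()`.

```javascript
const log = [];

// A function is lazy: defining it runs nothing.
function lazyWork() {
  log.push("function body ran");
  return 42;
}

// A Promise is eager: the executor runs as soon as the Promise is constructed,
// even though nobody has called .then() yet.
const promise = new Promise((resolve) => {
  log.push("promise executor ran");
  resolve(42);
});

console.log(log); // [ 'promise executor ran' ]

lazyWork(); // only now does the function body run
console.log(log); // [ 'promise executor ran', 'function body ran' ]

// The resolved value is still delivered later, once a consumer attaches.
promise.then((value) => console.log("resolved with", value));
```

An Observable behaves like the function here: its producer code does not run until `subscribe()` is called.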
Observable as Generalized Function
Observables are similar to functions, but instead of returning a single value, they return a stream of values over time.
🧑🏽‍💻 Function Example
function foo() {
  console.log("Hello");
  return 42;
}
console.log(foo());
console.log(foo());
✨ Function Output
Hello
42
Hello
42
🧑🏽‍💻 Same behavior, but using an Observable
import { Observable } from 'rxjs';
const foo = new Observable(subscriber => {
  console.log("Hello");
  subscriber.next(42);
});
foo.subscribe(value => console.log(value));
foo.subscribe(value => console.log(value));
✨ Output
Hello
42
Hello
42
- Just like functions, an Observable does nothing until subscribed to (lazy execution).
- Each `subscribe()` call triggers a new execution, like calling a function multiple times.
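One way to see why both points hold is to build a toy version. The `TinyObservable` class below is a hypothetical teaching sketch, not the RxJS implementation: it stores the producer function and calls it once per `subscribe()`. It leaves out completion, errors, and unsubscription.

```javascript
// TinyObservable is a hypothetical teaching class, not the RxJS implementation.
class TinyObservable {
  constructor(producer) {
    this.producer = producer; // stored, not executed: this is the lazy part
  }

  subscribe(onNext) {
    // Each subscribe call runs the producer again, like calling a function.
    this.producer({ next: onNext });
  }
}

const output = [];
const foo = new TinyObservable((subscriber) => {
  output.push("Hello");
  subscriber.next(42);
});

console.log(output.length); // 0, nothing has run yet

foo.subscribe((value) => output.push(value));
foo.subscribe((value) => output.push(value));

console.log(output); // [ 'Hello', 42, 'Hello', 42 ]
```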
Observables Can Emit Multiple Values
Unlike functions, Observables can push multiple values over time.
🧑🏽‍💻 Example:
import { Observable } from 'rxjs';
const foo = new Observable(subscriber => {
  console.log("Hello");
  subscriber.next(42);
  subscriber.next(100);
  subscriber.next(200);
});
console.log("before");
foo.subscribe(value => console.log(value));
console.log("after");
Output:
before
Hello
42
100
200
after
- The function version cannot return multiple values, but an Observable can.
Observables Can Be Asynchronous
Observables can emit values asynchronously, like Promises.
Example:
import { Observable } from 'rxjs';
const foo = new Observable(subscriber => {
  console.log("Hello");
  subscriber.next(42);
  subscriber.next(100);
  setTimeout(() => {
    subscriber.next(300); // happens asynchronously
  }, 1000);
});
console.log("before");
foo.subscribe(value => console.log(value));
console.log("after");
Output:
before
Hello
42
100
after
(1 second later) 300
- The first values are synchronous, but 300 is emitted asynchronously.
Key Parts of an Observable
Observables have four key parts:
- Creation: You create an Observable that will emit data.
- Subscription: Someone listens to (or watches) the Observable.
- Execution: The Observable starts emitting values.
- Unsubscription: The listener stops listening to the Observable.
The examples below show these four parts in code:
🧑🏽‍💻 Example 1: Basic Observable
import { Observable } from 'rxjs';
// 🔹 Step 1: Create an Observable (CREATION)
const myObservable = new Observable(observer => {
  observer.next('Hello');
  observer.next('World');
  observer.complete(); // 🔹 Execution ends
});
// 🔹 Step 2: Subscribe to the Observable (SUBSCRIPTION)
const subscription = myObservable.subscribe({
  next: value => console.log(value), // 🔹 Execution: Receive emitted values
  complete: () => console.log('Observable completed'), // 🔹 Execution: No more data
});
// 🔹 Step 3: Unsubscribe (UNSUBSCRIPTION) - Not necessary here since the Observable completes on its own
Explanation:
- Creation: `new Observable(observer => {...})` defines an Observable that emits values (`Hello` and `World`).
- Subscription: `subscribe({...})` starts listening for emitted values.
- Execution: The Observable emits two values (`Hello`, `World`) before calling `complete()`.
- Unsubscription: No need to manually unsubscribe here because the Observable completes automatically.
🧑🏽‍💻 Example 2: Observable with Asynchronous Execution & Unsubscription
import { Observable } from 'rxjs';
// 🔹 Step 1: Create an Observable (CREATION)
const observable = new Observable(subscriber => {
  subscriber.next(1);
  subscriber.next(9);
  subscriber.next(9);
  // 🔹 Emit a value asynchronously after 1 second
  const timeoutId = setTimeout(() => {
    subscriber.next(7);
    subscriber.complete(); // 🔹 Execution ends
  }, 1000);
  // 🔹 Step 4: Teardown logic (UNSUBSCRIPTION), run when the subscriber unsubscribes
  return () => {
    clearTimeout(timeoutId); // Stop async execution if unsubscribed early
    console.log('Unsubscribed: Cleanup done');
  };
});
console.log('Before subscribing');
// 🔹 Step 2: Subscribe to the Observable (SUBSCRIPTION)
const subscription = observable.subscribe({
  next: x => console.log('Received:', x), // 🔹 Execution: Process each value
  error: err => console.error('Error occurred:', err),
  complete: () => console.log('Observable completed'),
});
console.log('After subscribing');
// 🔹 Step 3: Unsubscribe after 500ms (before the last value is emitted)
setTimeout(() => {
  subscription.unsubscribe();
}, 500);
✨ Output
Before subscribing
Received: 1
Received: 9
Received: 9
After subscribing
Unsubscribed: Cleanup done
Explanation:
- Creation: The Observable emits three values synchronously (`1, 9, 9`) and schedules one more value (`7`) after a delay.
- Subscription: The subscriber receives values through its `next()` handler.
- Execution: Three values are emitted immediately, and the last value is scheduled for 1 second later.
- Unsubscription: The subscriber unsubscribes after 500ms, before the last value is emitted. The teardown function clears the timer, so `7` and the completion notification are never delivered.
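The link between `unsubscribe()` and the function returned by the producer can be sketched without RxJS. The `subscribeTo` helper below is hypothetical and heavily simplified (no `complete` or `error` handling). It only shows the idea: the producer returns a cleanup function, and unsubscribing runs it and blocks further values.

```javascript
// `subscribeTo` is a hypothetical helper for this sketch, not an RxJS function.
function subscribeTo(producer, onNext) {
  let closed = false;
  const teardown = producer({
    next(value) {
      if (!closed) onNext(value); // ignore values after unsubscribe
    },
  });
  return {
    unsubscribe() {
      if (closed) return; // unsubscribing twice is a no-op
      closed = true;
      if (typeof teardown === "function") teardown(); // run the producer's cleanup
    },
  };
}

const received = [];
const subscription = subscribeTo(
  (subscriber) => {
    subscriber.next(1); // delivered synchronously
    const timeoutId = setTimeout(() => subscriber.next(7), 1000);
    return () => {
      clearTimeout(timeoutId); // cancel the pending emission
      received.push("cleanup");
    };
  },
  (value) => received.push(value)
);

subscription.unsubscribe(); // unsubscribe before the timer fires
console.log(received); // [ 1, 'cleanup' ]
```

Because the timer is cleared during teardown, the value `7` is never produced at all, rather than being produced and then ignored.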