How to Create an Observable
An Observable defines a "stream of data," and there are a wide variety of ways to create one. RxJS provides a variety of means to create custom Observables or to easily generate Observables from events, arrays, HTTP responses, etc.
This section provides a comprehensive overview of how to create Observables in RxJS, from basic syntax to practical applications.
Classification of Observable Creation Methods
The following is a list of the main creation methods by category.
| Category | Main Methods | Description |
|---|---|---|
| Custom Creation | new Observable() | High flexibility but requires more code. Manual cleanup required |
| Creation Functions | of(), from(), fromEvent(), interval(), timer(), ajax(), fromFetch(), scheduled() | Commonly used data, event, and time-based generation functions |
| Special Creation Functions | defer(), range(), generate(), iif() | Control-oriented, loop-oriented generation, conditional switching, etc. |
| Special Observables | EMPTY, NEVER, throwError() | For completion, no action, and error emission |
| Subject Family | Subject, BehaviorSubject | Special Observable that functions as both observer and sender |
| Callback Conversion | bindCallback(), bindNodeCallback() | Convert callback-based functions to Observable |
| Resource Control | using() | Perform resource control at the same time as subscribing to Observable |
| WebSocket | webSocket() | Handle WebSocket communication as bidirectional Observable |
Custom Creation
new Observable()
The most basic method is to use the Observable constructor directly. This method is most flexible when you want to define custom Observable logic. Fine-grained behavior control is possible through explicit next, error, and complete calls.
import { Observable } from 'rxjs';
const observable$ = new Observable<number>(subscriber => {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
setTimeout(() => {
subscriber.next(4);
subscriber.complete();
}, 1000);
});
observable$.subscribe({
next: value => console.log('Value:', value),
error: err => console.error('Error:', err),
complete: () => console.log('Complete')
});
// Output:
// Value: 1
// Value: 2
// Value: 3
// Value: 4
// CompleteCAUTION
If you use new Observable(), you must write the explicit resource release (cleanup process) yourself.
const obs$ = new Observable(subscriber => {
const id = setInterval(() => subscriber.next(Date.now()), 1000);
return () => {
clearInterval(id); // Explicit resource release
};
});On the other hand, RxJS built-in creation functions such as fromEvent() and interval() have appropriate cleanup processes inside.
const click$ = fromEvent(document, 'click');
const timer$ = interval(1000);They use addEventListener or setInterval internally and are designed so that RxJS automatically calls removeEventListener or clearInterval() when unsubscribe().
Note that even if the cleanup process is implemented inside RxJS, that process will not be executed unless unsubscribe() is called.
const subscription = observable$.subscribe({
// Omitted...
});
subscription.unsubscribe(); // 👈- No matter which method you use to create an Observable, be sure to get into the habit of
unsubscribe()when you no longer need it. - Forgetting to unsubscribe will keep event listeners and timers running, causing memory leaks and unexpected side effects.
Creation Functions
For more concise and application-specific Observable creation, RxJS provides "Creation Functions". These can be used to simplify code for repeated use cases.
NOTE
In the official RxJS documentation, these are categorized as "Creation Functions". Previously (RxJS 5.x ~ 6) they were called "creation operators," but since RxJS 7, "Creation Functions" is the official term.
of()
The simplest Observable creation function that issues multiple values one at a time in sequence.
import { of } from 'rxjs';
const values$ = of(1, 2, 3, 4, 5);
values$.subscribe({
next: value => console.log('Value:', value),
error: err => console.error('Error:', err),
complete: () => console.log('Complete')
});
// Output: Value: 1, Value: 2, Value: 3, Value: 4, Value: 5, CompleteIMPORTANT
Difference between of() and from()
of([1, 2, 3])→ issues a single array.from([1, 2, 3])→ issues individual values1,2,3in sequence.
Note that this is often confused.
TIP
For detailed usage and practical examples, see of() detail page.
from()
Generates an Observable from an existing data structure such as an array, Promise, or iterable.
import { from } from 'rxjs';
// Create from array
const array$ = from([1, 2, 3]);
array$.subscribe({
next: value => console.log('Array value:', value),
error: err => console.error('Error:', err),
complete: () => console.log('Complete')
});
// Create from Promise
const promise$ = from(Promise.resolve('Promise result'));
promise$.subscribe({
next: value => console.log('Promise result:', value),
error: err => console.error('Error:', err),
complete: () => console.log('Complete')
});
// Create from iterable
const iterable$ = from(new Set([1, 2, 3]));
iterable$.subscribe({
next: value => console.log('Iterable value:', value),
error: err => console.error('Error:', err),
complete: () => console.log('Complete')
});
// Output:
// Array value: 1
// Array value: 2
// Array value: 3
// Complete
// Iterable value: 1
// Iterable value: 2
// Iterable value: 3
// Complete
// Promise result: Promise result
// CompleteTIP
For detailed usage and practical examples, see from() detail page.
fromEvent()
Function to handle event sources such as DOM events as an Observable.
import { fromEvent } from 'rxjs';
const clicks$ = fromEvent(document, 'click');
clicks$.subscribe({
next: event => console.log('Click event:', event),
error: err => console.error('Error:', err),
complete: () => console.log('Complete')
});
// Output:
// Click event: PointerEvent {isTrusted: true, pointerId: 1, width: 1, height: 1, pressure: 0, …}CAUTION
Note the supported event targets
fromEvent()supports browser DOM elements (EventTarget implementation), Node.js EventEmitter, and jQuery-like event targets.- Multiple subscriptions may add multiple event listeners.
👉 For more detailed examples of event stream utilization, see Event Streaming.
TIP
For detailed usage and practical examples, see fromEvent() detail page.
interval(), timer()
📘 RxJS Official: interval, 📘 RxJS Official: timer
This function is used when you want to issue values continuously at regular intervals or when you need time control.
import { interval, timer } from 'rxjs';
// Issue values every second
const interval$ = interval(1000);
interval$.subscribe({
next: value => console.log('Interval:', value),
error: err => console.error('Error:', err),
complete: () => console.log('Complete')
});
// Start after 3 seconds, then issue values every second
const timer$ = timer(3000, 1000);
timer$.subscribe({
next: value => console.log('Timer:', value),
error: err => console.error('Error:', err),
complete: () => console.log('Complete')
});
// Output:
// Interval: 0
// Interval: 1
// Interval: 2
// Timer: 0
// Interval: 3
// Timer: 1
// Interval: 4
// Timer: 2
// .
// .interval() and timer() are frequently used for time-controlled processing, especially suitable for animation, polling, and asynchronous event delays.
CAUTION
Note that Cold Observable
interval()andtimer()are Cold Observable and are executed independently for each subscription.- You can consider making them Hot with
share()or other methods if necessary.
For details, see the "Cold Observable and Hot Observable" section.
TIP
For detailed usage and practical examples, see interval() detail page and timer() detail page.
ajax()
Function for asynchronous handling of HTTP communication results as Observable.
import { ajax } from 'rxjs/ajax';
const api$ = ajax.getJSON('https://jsonplaceholder.typicode.com/todos/1');
api$.subscribe({
next: response => console.log('API response:', response),
error: error => console.error('API error:', error),
complete: () => console.log('API complete')
});
// Output:
// API response: {userId: 1, id: 1, title: 'delectus aut autem', completed: false}
// API completeNOTE
RxJS ajax uses XMLHttpRequest internally. On the other hand, RxJS also has an operator called fromFetch, which uses the Fetch API to make HTTP requests.
TIP
For detailed usage and practical examples, see ajax() detail page. For an overview of HTTP communication functions, see HTTP Communication Creation Functions.
fromFetch()
fromFetch() wraps the Fetch API and allows you to treat HTTP requests as Observables. It is similar to ajax(), but more modern and lightweight.
import { fromFetch } from 'rxjs/fetch';
import { switchMap } from 'rxjs';
const api$ = fromFetch('https://jsonplaceholder.typicode.com/todos/1');
api$.pipe(
switchMap(response => response.json())
).subscribe({
next: data => console.log('Data:', data),
error: err => console.error('Error:', err),
complete: () => console.log('Complete')
});
// Output:
// Data: {completed: false, id: 1, title: "delectus aut autem", userId: 1}
// CompleteNOTE
Because fromFetch() uses the Fetch API, unlike ajax(), initialization of request settings and .json() conversion of responses must be done manually. Proper error handling and HTTP status checking are also required.
TIP
For detailed usage and practical examples, see fromFetch() detail page. For an overview of HTTP communication functions, see HTTP Communication Creation Functions.
scheduled()
scheduled() is a function that allows you to explicitly specify a scheduler for published functions such as of() and from(). Use this function when you want to control the timing of synchronous or asynchronous execution in detail.
import { scheduled, asyncScheduler } from 'rxjs';
const observable$ = scheduled([1, 2, 3], asyncScheduler);
observable$.subscribe({
next: val => console.log('Value:', val),
complete: () => console.log('Complete')
});
// Execution is asynchronous
// Output:
// Value: 1
// Value: 2
// Value: 3
// CompleteNOTE
scheduled() allows existing synchronous functions (e.g. of(), from()) to work asynchronously. This is useful for testing and UI performance optimization where asynchronous processing control is required.
TIP
For detailed usage and practical examples, see scheduled() detail page. For an overview of control functions, see Control Creation Functions.
defer()
It is used when you want to delay the generation of an Observable until subscription time.
import { defer, of } from 'rxjs';
const random$ = defer(() => of(Math.random()));
random$.subscribe(value => console.log('1st:', value));
random$.subscribe(value => console.log('2nd:', value));
// Output:
// 1st: 0.123456789
// 2nd: 0.987654321NOTE
defer() is useful when you want to create a new Observable on each subscription. You can achieve lazy evaluation.
TIP
For detailed usage and practical examples, see defer() detail page.
range()
Generates a continuous integer value in the specified range as an Observable.
import { range } from 'rxjs';
const numbers$ = range(1, 5);
numbers$.subscribe({
next: value => console.log('Number:', value),
complete: () => console.log('Complete')
});
// Output:
// Number: 1
// Number: 2
// Number: 3
// Number: 4
// Number: 5
// CompleteTIP
For detailed usage and practical examples, see range() detail page.
generate()
Generates Observable like a loop structure. Allows fine control over initial values, conditions, increases/decreases, and output of values.
import { generate } from 'rxjs';
const fibonacci$ = generate({
initialState: [0, 1],
condition: ([, b]) => b < 100,
iterate: ([a, b]) => [b, a + b],
resultSelector: ([a]) => a
});
fibonacci$.subscribe({
next: value => console.log('Fibonacci:', value),
complete: () => console.log('Complete')
});
// Output:
// Fibonacci: 0
// Fibonacci: 1
// Fibonacci: 1
// Fibonacci: 2
// Fibonacci: 3
// Fibonacci: 5
// Fibonacci: 8
// Fibonacci: 13
// Fibonacci: 21
// Fibonacci: 34
// Fibonacci: 55
// Fibonacci: 89
// CompleteTIP
For detailed usage and practical examples, see generate() detail page.
iif()
Use this function when you want to switch Observable by conditional branching.
import { iif, of, EMPTY } from 'rxjs';
const condition = true;
const iif$ = iif(() => condition, of('Condition is true'), EMPTY);
iif$.subscribe({
next: val => console.log('iif:', val),
complete: () => console.log('Complete')
});
// Output:
// iif: Condition is true
// CompleteNOTE
iif() can dynamically switch the Observable to return depending on conditions. This is useful for flow control.
Special Observables
EMPTY, NEVER, throwError()
📘 RxJS Official: EMPTY, 📘 RxJS Official: NEVER, 📘 RxJS Official: throwError
RxJS also provides special Observables that are useful for execution control, exception handling, and learning.
import { EMPTY, throwError, NEVER } from 'rxjs';
// Observable that completes immediately
const empty$ = EMPTY;
empty$.subscribe({
next: () => console.log('This is not displayed'),
complete: () => console.log('Completes immediately')
});
// Observable that issues an error
const error$ = throwError(() => new Error('Error occurred'));
error$.subscribe({
next: () => console.log('This is not displayed'),
error: err => console.error('Error:', err.message),
complete: () => console.log('Complete')
});
// Observable that does not issue anything and does not complete
const never$ = NEVER;
never$.subscribe({
next: () => console.log('This is not displayed'),
complete: () => console.log('This is also not displayed')
});
// Output:
// Completes immediately
// Error: Error occurredIMPORTANT
Mainly for control, verification, and learning purposes
EMPTY,NEVER, andthrowError()are used for flow control, exception handling validation, or learning purposes, not for normal data streams.
Subject Family
Subject, BehaviorSubject, etc.
📘 RxJS Official: Subject, 📘 RxJS Official: BehaviorSubject
Observable that can issue its own value, suitable for multicast and state sharing.
import { Subject } from 'rxjs';
const subject$ = new Subject<number>();
// Use as Observer
subject$.subscribe(value => console.log('Observer 1:', value));
subject$.subscribe(value => console.log('Observer 2:', value));
// Use as Observable
subject$.next(1);
subject$.next(2);
subject$.next(3);
// Output:
// Observer 1: 1
// Observer 2: 1
// Observer 1: 2
// Observer 2: 2
// Observer 1: 3
// Observer 2: 3IMPORTANT
Subject has the properties of both Observable and Observer. Multiple subscribers can share the same data stream (multicast).
TIP
For details on various Subject types (BehaviorSubject, ReplaySubject, AsyncSubject), see Subject and Multicast.
Callback Conversion
bindCallback()
A function that allows callback-based asynchronous functions to be treated as Observable.
import { bindCallback } from 'rxjs';
// Callback-based function (legacy style)
function asyncFunction(value: number, callback: (result: number) => void) {
setTimeout(() => callback(value * 2), 1000);
}
// Convert to Observable
const asyncFunction$ = bindCallback(asyncFunction);
const observable$ = asyncFunction$(5);
observable$.subscribe({
next: result => console.log('Result:', result),
complete: () => console.log('Complete')
});
// Output:
// Result: 10
// CompleteTIP
bindCallback() is useful for converting legacy callback-based APIs to Observable.
bindNodeCallback()
📘 RxJS Official: bindNodeCallback
A function specialized for converting callback-based functions in Node.js style (error-first callback) to Observable.
import { bindNodeCallback } from 'rxjs';
// Node.js style callback function (error-first callback)
function readFile(path: string, callback: (err: Error | null, data: string) => void) {
if (path === 'valid.txt') {
callback(null, 'file content');
} else {
callback(new Error('File not found'), '');
}
}
// Convert to Observable
const readFile$ = bindNodeCallback(readFile);
readFile$('valid.txt').subscribe({
next: data => console.log('Data:', data),
error: err => console.error('Error:', err.message),
complete: () => console.log('Complete')
});
// Output:
// Data: file content
// CompleteDifference between bindCallback() and bindNodeCallback()
Example: Target of bindCallback()
// General callback (success only)
function getData(cb: (data: string) => void) {
cb('success');
}→ Use bindCallback() for simple "return only a value" callbacks.
Example: Target of bindNodeCallback() (Node.js style)
// Error-first callback
function readFile(path: string, cb: (err: Error | null, data: string) => void) {
if (path === 'valid.txt') cb(null, 'file content');
else cb(new Error('not found'), '');
}→ If you use bindNodeCallback(), errors will be notified as Observable errors.
NOTE
How to use
- bindNodeCallback() if the first argument of the callback is "error or not"
- bindCallback() for a simple "return only a value" callback
Resource Control
using()
using() functions to associate the creation and release of resources with the Observable's lifecycle. It is useful in combination with processes that require manual cleanup, such as WebSockets, event listeners, and external resources.
import { using, interval, Subscription } from 'rxjs';
const resource$ = using(
() => new Subscription(() => console.log('Resource released')),
() => interval(1000)
);
const sub = resource$.subscribe(value => console.log('Value:', value));
// Unsubscribe after a few seconds
setTimeout(() => sub.unsubscribe(), 3500);
// Output:
// Value: 0
// Value: 1
// Value: 2
// Resource releasedIMPORTANT
using() is useful for matching the scope of a resource with the Observable's subscription. An explicit cleanup process is automatically called when unsubscribe() is done.
TIP
For detailed usage and practical examples, see using() detail page. For an overview of control functions, see Control Creation Functions.
WebSocket()
The rxjs/webSocket module of RxJS provides a webSocket() function that allows WebSocket to be treated as an Observable/Observer.
import { webSocket } from 'rxjs/webSocket';
const socket$ = webSocket('wss://echo.websocket.org');
socket$.subscribe({
next: msg => console.log('Received:', msg),
error: err => console.error('Error:', err),
complete: () => console.log('Complete')
});
// Send message (as Observer)
socket$.next('Hello WebSocket!');IMPORTANT
webSocket() is an Observable/Observer hybrid that allows two-way communication. It is useful for real-time communication because WebSocket connections, sending, and receiving can be easily handled as Observable.
Summary
There are a wide variety of ways to create Observables in RxJS, and it is important to choose the appropriate method for your application.
- If you need custom processing, use
new Observable() of(),from(),fromEvent(), etc. for handling existing data and eventsajax()orfromFetch()for HTTP communicationSubjectfamily for sharing data among multiple subscribers
By using them appropriately, you can take full advantage of the flexibility of RxJS.