Cómo Crear un Observable
Un Observable define un "flujo de datos", y hay una amplia variedad de formas de crear uno. RxJS proporciona una variedad de medios para crear Observables personalizados o generar fácilmente Observables desde eventos, arrays, respuestas HTTP, etc.
Esta sección proporciona una descripción completa de cómo crear Observables en RxJS, desde la sintaxis básica hasta las aplicaciones prácticas.
Clasificación de Métodos de Creación de Observable
La siguiente es una lista de los principales métodos de creación por categoría.
| Categoría | Métodos Principales | Descripción |
|---|---|---|
| Creación Personalizada | new Observable() | Alta flexibilidad pero requiere más código. Limpieza manual requerida |
| Creation Functions | of(), from(), fromEvent(), interval(), timer(), ajax(), fromFetch(), scheduled() | Funciones de generación basadas en datos, eventos y tiempo comúnmente utilizadas |
| Special Creation Functions | defer(), range(), generate(), iif() | Orientadas a control, generación orientada a bucles, cambio condicional, etc. |
| Special Observables | EMPTY, NEVER, throwError() | Para finalización, sin acción, y emisión de errores |
| Familia Subject | Subject, BehaviorSubject | Observable especial que funciona como observador y emisor |
| Conversión de Callback | bindCallback(), bindNodeCallback() | Convertir funciones basadas en callback a Observable |
| Control de Recursos | using() | Realizar control de recursos al mismo tiempo que se suscribe a Observable |
| WebSocket | webSocket() | Manejar comunicación WebSocket como Observable bidireccional |
Creación Personalizada
new Observable()
El método más básico es usar el constructor Observable directamente. Este método es más flexible cuando se desea definir lógica Observable personalizada. Es posible un control de comportamiento de grano fino a través de llamadas explícitas a next, error y complete.
import { Observable } from 'rxjs';
const observable$ = new Observable<number>(subscriber => {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
setTimeout(() => {
subscriber.next(4);
subscriber.complete();
}, 1000);
});
observable$.subscribe({
next: value => console.log('Valor:', value),
error: err => console.error('Error:', err),
complete: () => console.log('Completo')
});
// Salida:
// Valor: 1
// Valor: 2
// Valor: 3
// Valor: 4
// CompletoCAUTION
Si usa new Observable(), debe escribir la liberación explícita de recursos (proceso de limpieza) usted mismo.
const obs$ = new Observable(subscriber => {
const id = setInterval(() => subscriber.next(Date.now()), 1000);
return () => {
clearInterval(id); // Liberación explícita de recursos
};
});Por otro lado, las creation functions integradas de RxJS como fromEvent() e interval() tienen procesos de limpieza apropiados internamente.
const click$ = fromEvent(document, 'click');
const timer$ = interval(1000);Usan addEventListener o setInterval internamente y están diseñadas para que RxJS llame automáticamente a removeEventListener o clearInterval() cuando se hace unsubscribe().
Tenga en cuenta que incluso si el proceso de limpieza está implementado dentro de RxJS, ese proceso no se ejecutará a menos que se llame a unsubscribe().
const subscription = observable$.subscribe({
// Omitido...
});
subscription.unsubscribe(); // 👈- No importa qué método use para crear un Observable, asegúrese de adquirir el hábito de
unsubscribe()cuando ya no lo necesite. - Olvidar cancelar la suscripción mantendrá los escuchadores de eventos y temporizadores en ejecución, causando fugas de memoria y efectos secundarios inesperados.
Creation Functions
Para una creación de Observable más concisa y específica de la aplicación, RxJS proporciona "Creation Functions". Estas se pueden usar para simplificar el código para casos de uso repetidos.
NOTE
En la documentación oficial de RxJS, estas se categorizan como "Creation Functions". Anteriormente (RxJS 5.x ~ 6) se llamaban "creation operators", pero desde RxJS 7, "Creation Functions" es el término oficial.
of()
La Creation Function de Observable más simple que emite múltiples valores uno a la vez en secuencia.
import { of } from 'rxjs';
const values$ = of(1, 2, 3, 4, 5);
values$.subscribe({
next: value => console.log('Valor:', value),
error: err => console.error('Error:', err),
complete: () => console.log('Completo')
});
// Salida: Valor: 1, Valor: 2, Valor: 3, Valor: 4, Valor: 5, CompletoIMPORTANT
Diferencia entre of() y from()
of([1, 2, 3])→ emite un solo array.from([1, 2, 3])→ emite valores individuales1,2,3en secuencia.
Tenga en cuenta que esto a menudo se confunde.
TIP
Para uso detallado y ejemplos prácticos, consulte página de detalle de of().
from()
Genera un Observable a partir de una estructura de datos existente como un array, Promise o iterable.
import { from } from 'rxjs';
// Crear desde array
const array$ = from([1, 2, 3]);
array$.subscribe({
next: value => console.log('Valor del array:', value),
error: err => console.error('Error:', err),
complete: () => console.log('Completo')
});
// Crear desde Promise
const promise$ = from(Promise.resolve('Resultado de Promise'));
promise$.subscribe({
next: value => console.log('Resultado de Promise:', value),
error: err => console.error('Error:', err),
complete: () => console.log('Completo')
});
// Crear desde iterable
const iterable$ = from(new Set([1, 2, 3]));
iterable$.subscribe({
next: value => console.log('Valor iterable:', value),
error: err => console.error('Error:', err),
complete: () => console.log('Completo')
});
// Salida:
// Valor del array: 1
// Valor del array: 2
// Valor del array: 3
// Completo
// Valor iterable: 1
// Valor iterable: 2
// Valor iterable: 3
// Completo
// Resultado de Promise: Resultado de Promise
// CompletoTIP
Para uso detallado y ejemplos prácticos, consulte página de detalle de from().
fromEvent()
Función para manejar fuentes de eventos como eventos DOM como un Observable.
import { fromEvent } from 'rxjs';
const clicks$ = fromEvent(document, 'click');
clicks$.subscribe({
next: event => console.log('Evento de clic:', event),
error: err => console.error('Error:', err),
complete: () => console.log('Completo')
});
// Salida:
// Evento de clic: PointerEvent {isTrusted: true, pointerId: 1, width: 1, height: 1, pressure: 0, …}CAUTION
Tenga en cuenta los objetivos de eventos admitidos
fromEvent()admite elementos DOM del navegador (implementación de EventTarget), EventEmitter de Node.js y objetivos de eventos similares a jQuery.- Múltiples suscripciones pueden agregar múltiples escuchadores de eventos.
👉 Para ejemplos más detallados de utilización de flujos de eventos, consulte Flujo de Eventos.
TIP
Para uso detallado y ejemplos prácticos, consulte página de detalle de fromEvent().
interval(), timer()
📘 RxJS Oficial: interval, 📘 RxJS Oficial: timer
Esta función se usa cuando se desea emitir valores continuamente a intervalos regulares o cuando se necesita control de tiempo.
import { interval, timer } from 'rxjs';
// Emitir valores cada segundo
const interval$ = interval(1000);
interval$.subscribe({
next: value => console.log('Intervalo:', value),
error: err => console.error('Error:', err),
complete: () => console.log('Completo')
});
// Iniciar después de 3 segundos, luego emitir valores cada segundo
const timer$ = timer(3000, 1000);
timer$.subscribe({
next: value => console.log('Temporizador:', value),
error: err => console.error('Error:', err),
complete: () => console.log('Completo')
});
// Salida:
// Intervalo: 0
// Intervalo: 1
// Intervalo: 2
// Temporizador: 0
// Intervalo: 3
// Temporizador: 1
// Intervalo: 4
// Temporizador: 2
// .
// .interval() y timer() se usan frecuentemente para procesamiento controlado por tiempo, especialmente adecuados para animación, sondeo y retrasos de eventos asíncronos.
CAUTION
Tenga en cuenta que Cold Observable
interval()ytimer()son Cold Observable y se ejecutan independientemente para cada suscripción.- Puede considerar hacerlos Hot con
share()u otros métodos si es necesario.
Para detalles, consulte la sección "Cold Observable y Hot Observable".
TIP
Para uso detallado y ejemplos prácticos, consulte página de detalle de interval() y página de detalle de timer().
ajax()
Función para manejo asíncrono de resultados de comunicación HTTP como Observable.
import { ajax } from 'rxjs/ajax';
const api$ = ajax.getJSON('https://jsonplaceholder.typicode.com/todos/1');
api$.subscribe({
next: response => console.log('Respuesta de API:', response),
error: error => console.error('Error de API:', error),
complete: () => console.log('API completa')
});
// Salida:
// Respuesta de API: {userId: 1, id: 1, title: 'delectus aut autem', completed: false}
// API completaNOTE
RxJS ajax usa XMLHttpRequest internamente. Por otro lado, RxJS también tiene un operador llamado fromFetch, que usa la API Fetch para hacer solicitudes HTTP.
TIP
Para uso detallado y ejemplos prácticos, consulte página de detalle de ajax(). Para una descripción general de las funciones de comunicación HTTP, consulte Creation Functions de Comunicación HTTP.
fromFetch()
fromFetch() envuelve la API Fetch y le permite tratar solicitudes HTTP como Observables. Es similar a ajax(), pero más moderno y ligero.
import { fromFetch } from 'rxjs/fetch';
import { switchMap } from 'rxjs';
const api$ = fromFetch('https://jsonplaceholder.typicode.com/todos/1');
api$.pipe(
switchMap(response => response.json())
).subscribe({
next: data => console.log('Datos:', data),
error: err => console.error('Error:', err),
complete: () => console.log('Completo')
});
// Salida:
// Datos: {completed: false, id: 1, title: "delectus aut autem", userId: 1}
// CompletoNOTE
Debido a que fromFetch() usa la API Fetch, a diferencia de ajax(), la inicialización de configuración de solicitudes y la conversión .json() de respuestas deben hacerse manualmente. También se requiere un manejo de errores apropiado y verificación de estado HTTP.
TIP
Para uso detallado y ejemplos prácticos, consulte página de detalle de fromFetch(). Para una descripción general de las funciones de comunicación HTTP, consulte Creation Functions de Comunicación HTTP.
scheduled()
scheduled() es una función que le permite especificar explícitamente un scheduler para funciones publicadas como of() y from(). Use esta función cuando desee controlar el tiempo de ejecución sincrónica o asíncrona en detalle.
import { scheduled, asyncScheduler } from 'rxjs';
const observable$ = scheduled([1, 2, 3], asyncScheduler);
observable$.subscribe({
next: val => console.log('Valor:', val),
complete: () => console.log('Completo')
});
// La ejecución es asíncrona
// Salida:
// Valor: 1
// Valor: 2
// Valor: 3
// CompletoNOTE
scheduled() permite que las funciones sincrónicas existentes (ej. of(), from()) funcionen de manera asíncrona. Esto es útil para pruebas y optimización de rendimiento de UI donde se requiere control de procesamiento asíncrono.
TIP
Para uso detallado y ejemplos prácticos, consulte página de detalle de scheduled(). Para una descripción general de funciones de control, consulte Creation Functions de Control.
defer()
Se usa cuando se desea retrasar la generación de un Observable hasta el momento de la suscripción.
import { defer, of } from 'rxjs';
const random$ = defer(() => of(Math.random()));
random$.subscribe(value => console.log('1ro:', value));
random$.subscribe(value => console.log('2do:', value));
// Salida:
// 1ro: 0.123456789
// 2do: 0.987654321NOTE
defer() es útil cuando se desea crear un nuevo Observable en cada suscripción. Se puede lograr evaluación perezosa.
TIP
Para uso detallado y ejemplos prácticos, consulte página de detalle de defer().
range()
Genera un valor entero continuo en el rango especificado como un Observable.
import { range } from 'rxjs';
const numbers$ = range(1, 5);
numbers$.subscribe({
next: value => console.log('Número:', value),
complete: () => console.log('Completo')
});
// Salida:
// Número: 1
// Número: 2
// Número: 3
// Número: 4
// Número: 5
// CompletoTIP
Para uso detallado y ejemplos prácticos, consulte página de detalle de range().
generate()
Genera Observable como una estructura de bucle. Permite un control fino sobre valores iniciales, condiciones, aumentos/disminuciones y salida de valores.
import { generate } from 'rxjs';
const fibonacci$ = generate({
initialState: [0, 1],
condition: ([, b]) => b < 100,
iterate: ([a, b]) => [b, a + b],
resultSelector: ([a]) => a
});
fibonacci$.subscribe({
next: value => console.log('Fibonacci:', value),
complete: () => console.log('Completo')
});
// Salida:
// Fibonacci: 0
// Fibonacci: 1
// Fibonacci: 1
// Fibonacci: 2
// Fibonacci: 3
// Fibonacci: 5
// Fibonacci: 8
// Fibonacci: 13
// Fibonacci: 21
// Fibonacci: 34
// Fibonacci: 55
// Fibonacci: 89
// CompletoTIP
Para uso detallado y ejemplos prácticos, consulte página de detalle de generate().
iif()
Use esta función cuando desee cambiar Observable por ramificación condicional.
import { iif, of, EMPTY } from 'rxjs';
const condition = true;
const iif$ = iif(() => condition, of('La condición es verdadera'), EMPTY);
iif$.subscribe({
next: val => console.log('iif:', val),
complete: () => console.log('Completo')
});
// Salida:
// iif: La condición es verdadera
// CompletoNOTE
iif() puede cambiar dinámicamente el Observable a devolver dependiendo de las condiciones. Esto es útil para el control de flujo.
Special Observables
EMPTY, NEVER, throwError()
📘 RxJS Oficial: EMPTY, 📘 RxJS Oficial: NEVER, 📘 RxJS Oficial: throwError
RxJS también proporciona Observables especiales que son útiles para el control de ejecución, manejo de excepciones y aprendizaje.
import { EMPTY, throwError, NEVER } from 'rxjs';
// Observable que se completa inmediatamente
const empty$ = EMPTY;
empty$.subscribe({
next: () => console.log('Esto no se muestra'),
complete: () => console.log('Se completa inmediatamente')
});
// Observable que emite un error
const error$ = throwError(() => new Error('Error ocurrido'));
error$.subscribe({
next: () => console.log('Esto no se muestra'),
error: err => console.error('Error:', err.message),
complete: () => console.log('Completo')
});
// Observable que no emite nada y no se completa
const never$ = NEVER;
never$.subscribe({
next: () => console.log('Esto no se muestra'),
complete: () => console.log('Esto tampoco se muestra')
});
// Salida:
// Se completa inmediatamente
// Error: Error ocurridoIMPORTANT
Principalmente para propósitos de control, verificación y aprendizaje
EMPTY,NEVERythrowError()se usan para control de flujo, validación de manejo de excepciones, o propósitos de aprendizaje, no para flujos de datos normales.
Familia Subject
Subject, BehaviorSubject, etc.
📘 RxJS Oficial: Subject, 📘 RxJS Oficial: BehaviorSubject
Observable que puede emitir su propio valor, adecuado para multicast y compartir estado.
import { Subject } from 'rxjs';
const subject$ = new Subject<number>();
// Usar como Observer
subject$.subscribe(value => console.log('Observador 1:', value));
subject$.subscribe(value => console.log('Observador 2:', value));
// Usar como Observable
subject$.next(1);
subject$.next(2);
subject$.next(3);
// Salida:
// Observador 1: 1
// Observador 2: 1
// Observador 1: 2
// Observador 2: 2
// Observador 1: 3
// Observador 2: 3IMPORTANT
Subject tiene las propiedades tanto de Observable como de Observer. Múltiples suscriptores pueden compartir el mismo flujo de datos (multicast).
TIP
Para detalles sobre varios tipos de Subject (BehaviorSubject, ReplaySubject, AsyncSubject), consulte Subject y Multicast.
Conversión de Callback
bindCallback()
Una función que permite que las funciones asíncronas basadas en callback se traten como Observable.
import { bindCallback } from 'rxjs';
// Función basada en callback (estilo legado)
function asyncFunction(value: number, callback: (result: number) => void) {
setTimeout(() => callback(value * 2), 1000);
}
// Convertir a Observable
const asyncFunction$ = bindCallback(asyncFunction);
const observable$ = asyncFunction$(5);
observable$.subscribe({
next: result => console.log('Resultado:', result),
complete: () => console.log('Completo')
});
// Salida:
// Resultado: 10
// CompletoTIP
bindCallback() es útil para convertir APIs basadas en callback heredadas a Observable.
bindNodeCallback()
📘 RxJS Oficial: bindNodeCallback
Una función especializada para convertir funciones basadas en callback en estilo Node.js (error-first callback) a Observable.
import { bindNodeCallback } from 'rxjs';
// Función de callback estilo Node.js (error-first callback)
function readFile(path: string, callback: (err: Error | null, data: string) => void) {
if (path === 'valid.txt') {
callback(null, 'contenido del archivo');
} else {
callback(new Error('Archivo no encontrado'), '');
}
}
// Convertir a Observable
const readFile$ = bindNodeCallback(readFile);
readFile$('valid.txt').subscribe({
next: data => console.log('Datos:', data),
error: err => console.error('Error:', err.message),
complete: () => console.log('Completo')
});
// Salida:
// Datos: contenido del archivo
// CompletoDiferencia entre bindCallback() y bindNodeCallback()
Ejemplo: Objetivo de bindCallback()
// Callback general (solo éxito)
function getData(cb: (data: string) => void) {
cb('éxito');
}→ Use bindCallback() para callbacks simples de "devolver solo un valor".
Ejemplo: Objetivo de bindNodeCallback() (estilo Node.js)
// Error-first callback
function readFile(path: string, cb: (err: Error | null, data: string) => void) {
if (path === 'valid.txt') cb(null, 'contenido del archivo');
else cb(new Error('no encontrado'), '');
}→ Si usa bindNodeCallback(), los errores se notificarán como errores de Observable.
NOTE
Cómo usar
- bindNodeCallback() si el primer argumento del callback es "error o no"
- bindCallback() para un callback simple de "devolver solo un valor"
Control de Recursos
using()
using() funciona para asociar la creación y liberación de recursos con el ciclo de vida del Observable. Es útil en combinación con procesos que requieren limpieza manual, como WebSockets, escuchadores de eventos y recursos externos.
import { using, interval, Subscription } from 'rxjs';
const resource$ = using(
() => new Subscription(() => console.log('Recurso liberado')),
() => interval(1000)
);
const sub = resource$.subscribe(value => console.log('Valor:', value));
// Cancelar suscripción después de unos segundos
setTimeout(() => sub.unsubscribe(), 3500);
// Salida:
// Valor: 0
// Valor: 1
// Valor: 2
// Recurso liberadoIMPORTANT
using() es útil para hacer coincidir el alcance de un recurso con la suscripción del Observable. Se llama automáticamente un proceso de limpieza explícito cuando se hace unsubscribe().
TIP
Para uso detallado y ejemplos prácticos, consulte página de detalle de using(). Para una descripción general de funciones de control, consulte Creation Functions de Control.
WebSocket()
El módulo rxjs/webSocket de RxJS proporciona una función webSocket() que permite que WebSocket se trate como un Observable/Observer.
import { webSocket } from 'rxjs/webSocket';
const socket$ = webSocket('wss://echo.websocket.org');
socket$.subscribe({
next: msg => console.log('Recibido:', msg),
error: err => console.error('Error:', err),
complete: () => console.log('Completo')
});
// Enviar mensaje (como Observer)
socket$.next('¡Hola WebSocket!');IMPORTANT
webSocket() es un híbrido Observable/Observer que permite comunicación bidireccional. Es útil para comunicación en tiempo real porque las conexiones WebSocket, envío y recepción se pueden manejar fácilmente como Observable.
Resumen
Hay una amplia variedad de formas de crear Observables en RxJS, y es importante elegir el método apropiado para su aplicación.
- Si necesita procesamiento personalizado, use
new Observable() of(),from(),fromEvent(), etc. para manejar datos y eventos existentesajax()ofromFetch()para comunicación HTTP- Familia
Subjectpara compartir datos entre múltiples suscriptores
Al usarlos apropiadamente, puede aprovechar al máximo la flexibilidad de RxJS.