Was ist RxJS?
Übersicht
RxJS (Reactive Extensions for JavaScript) ist eine Bibliothek zur Implementierung von "reaktiver Programmierung" in JavaScript.
Was ist reaktive Programmierung?
Reaktive Programmierung ist eine Methode zur Erstellung von Programmen, die automatisch aktualisiert werden, wenn sich Daten ändern. Sie ist eine Form der ereignisgesteuerten Programmierung und konzentriert sich insbesondere auf den Umgang mit asynchronen Datenströmen. Programme werden konstruiert, indem man sich auf den Datenfluss (Stream) konzentriert und darauf reagiert (Reaktion).
Mit anderen Worten: RxJS ist eine Bibliothek zur Behandlung von Ereignissen und asynchronen Datenströmen (Streams) im funktionalen Stil. Sie nutzt das Observable-Muster und bietet leistungsstarke Werkzeuge für den Umgang mit asynchronen Datenströmen.
Observable ist ein zentrales Konstrukt von RxJS, das Ereignisse oder asynchrone Datenströme (Streams) repräsentiert. Es ist die Quelle, aus der Werte "fließen", und durch Abonnieren (subscribe) können diese Werte empfangen werden. Ein Observable ist ein "Datenfluss (Stream)", der im Laufe der Zeit Werte emittiert. Durch Abonnieren (subscribe) können diese Werte empfangen werden.
TIP
Wenn Sie sich fragen "Was ist überhaupt ein Stream?", lesen Sie auch Was ist ein Stream?.
Einfaches Verwendungsbeispiel
import { fromEvent } from 'rxjs';
fromEvent(document, 'click').subscribe(event => {
console.log('Geklickt:', event);
});Grundlegende Bausteine von RxJS
Um RxJS zu beherrschen, ist es wichtig, die folgenden Kernkomponenten zu verstehen.
| Komponente | Übersicht |
|---|---|
Observable | Die Quelle eines Streams, der asynchrone oder zeitlich auftretende Daten darstellt. |
Observer[1] | Die Instanz, die Daten von einem Observable abonniert und empfängt. |
Subscription | Verwaltet das Abonnieren und Abmelden von Observables. |
Creation Functions | Eine Gruppe von Funktionen zum Erstellen und Kombinieren von Observables. |
Operator | Eine Gruppe von Funktionen zum Transformieren und Steuern von Observables. |
Subject[2] | Ein Vermittler, der sowohl Observable- als auch Observer-Eigenschaften besitzt. |
Scheduler[3] | Ein Mechanismus zur Steuerung des Ausführungszeitpunkts von Observables. |
Diese Komponenten haben jeweils eigenständige Funktionen, arbeiten aber zusammen. Beispielsweise wird ein Observable mit Creation Functions erstellt/kombiniert, mit Operatoren transformiert/gesteuert, vom Observer abonniert und der Ausführungszeitpunkt mit Scheduler gesteuert, um die gesamte Stream-Verarbeitung zu bilden.
Komponenten und Datenfluss von RxJS
※ Detaillierte Verwendung und Beispiele für jede Komponente werden in den jeweiligen Kapiteln separat erläutert.
Klassendiagramm der Komponenten
Vorteile von RxJS
| Vorteil | Inhalt |
|---|---|
| Deklarativer Code[4] | Mit map, filter etc. wird beschrieben "was erreicht werden soll", prozedurale Beschreibungen wie for-Schleifen werden vermieden |
| Vereinfachung asynchroner Verarbeitung | Vermeidung von Verschachtelungen bei Promise oder Callbacks, intuitiver Ablauf |
| Fehlerbehandlung | Einheitliche Fehlerbehandlung im Stream mit .pipe(catchError(...)) |
| Abbrechbarkeit | Unterbrechung des Streams durch Subscription.unsubscribe() möglich |
| Vielfältige Operatoren | Transformation und Komposition mit zahlreichen Operatoren wie debounceTime, mergeMap, combineLatest |
Anwendungsfälle
RxJS glänzt in allen Situationen, in denen "sich zeitlich verändernde Daten" behandelt werden. Im Folgenden werden die wichtigsten Anwendungsbereiche vorgestellt.
Echtzeit-Kommunikation & Streaming
Bei Echtzeit-Kommunikation wie WebSocket oder Server-Sent Events (SSE) ist RxJS besonders leistungsstark.
| Verwendung | Beschreibung | Hauptoperatoren |
|---|---|---|
| WebSocket-Kommunikation | Chat, Benachrichtigungen, Aktienkursaktualisierungen etc. | webSocket, filter, map |
| Server-Sent Events | Push-Benachrichtigungen vom Server | fromEvent, retry |
| IoT-Sensor-Überwachung | Verarbeitung kontinuierlicher Sensordaten | debounceTime, distinctUntilChanged |
Einfaches Beispiel
import { webSocket } from 'rxjs/webSocket';
import { filter } from 'rxjs';
const socket$ = webSocket('wss://example.com/chat');
socket$.pipe(
filter(msg => msg.type === 'message')
).subscribe(msg => console.log('Neu:', msg.text));UI/Zustandsverwaltung & Formularsteuerung
Benutzereingaben und Zustandsänderungen können reaktiv behandelt werden.
Beziehung zu Frameworks
Moderne Frontend-Frameworks (Angular Signals, React hooks, Vue Composition API, Svelte Runes etc.) bieten jeweils eigene reaktive Systeme. RxJS ist als framework-unabhängige Bibliothek konzipiert und kann mit diesen kombiniert oder alternativ verwendet werden. Die Integration mit framework-spezifischen Mechanismen wird in Kapitel 15 "Integration mit Frameworks" (in Vorbereitung) ausführlich erläutert.
| Verwendung | Beschreibung | Hauptoperatoren |
|---|---|---|
| Eingabeformular-Steuerung | Suchvervollständigung, Echtzeit-Validierung | debounceTime, distinctUntilChanged, switchMap |
| Verknüpfung mehrerer Formularfelder | Aktualisierung abhängiger Eingabefelder | combineLatest, withLatestFrom |
| Kommunikation zwischen Komponenten | Event-Bus oder benutzerdefinierte Zustandsverwaltung | Subject, share |
| UI-Ereignisverarbeitung | Klick, Scroll, Drag & Drop | fromEvent, takeUntil |
Einfaches Beispiel
import { fromEvent, combineLatest } from 'rxjs';
import { debounceTime, map, switchMap } from 'rxjs';
const searchInput = document.querySelector('#search') as HTMLInputElement;
const sortSelect = document.querySelector('#sort') as HTMLInputElement;
const search$ = fromEvent(searchInput, 'input').pipe(
map(e => (e.target as HTMLInputElement).value)
);
const sort$ = fromEvent(sortSelect, 'change').pipe(
map(e => (e.target as HTMLSelectElement).value)
);
combineLatest([search$, sort$]).pipe(
debounceTime(300),
switchMap(([query, order]) =>
fetch(`/api/search?q=${query}&sort=${order}`).then(r => r.json())
)
).subscribe(results => console.log(results));Offline-Unterstützung & PWA
Kann für Offline-Unterstützung und Netzwerkstatusverwaltung in Progressive Web Apps (PWA) genutzt werden.
| Verwendung | Beschreibung | Hauptoperatoren |
|---|---|---|
| Netzwerkstatus-Überwachung | Online/Offline-Erkennung | fromEvent, merge |
| Wiederholung bei Offline | Automatische Neusynchronisation bei Verbindungswiederherstellung | retry, retryWhen |
| Cache-Steuerung | Integration mit Service Worker | switchMap, catchError |
Einfaches Beispiel
import { fromEvent, merge } from 'rxjs';
import { map, startWith } from 'rxjs';
const online$ = fromEvent(window, 'online').pipe(map(() => true));
const offline$ = fromEvent(window, 'offline').pipe(map(() => false));
merge(online$, offline$).pipe(
startWith(navigator.onLine)
).subscribe(isOnline => {
console.log(isOnline ? 'Online' : 'Offline');
});KI/Streaming-API
Auch für Streaming-API-Antworten wie OpenAI optimal geeignet.
| Verwendung | Beschreibung | Hauptoperatoren |
|---|---|---|
| Token-schrittweise Ausgabe | Echtzeit-Anzeige von KI-Antworten | concatMap, scan |
| Streaming-Verarbeitung | Verarbeitung von Server-Sent Events | fromEvent, map |
| Backend-Integration | Nutzung in NestJS (mit RxJS standardmäßig integriert) | Verschiedene Operatoren |
HTTP-Kommunikation und Fehlerbehandlung
Elegante Behandlung asynchroner HTTP-Kommunikation.
| Verwendung | Beschreibung | Hauptoperatoren |
|---|---|---|
| API-Anfragen | Kommunikation mit RESTful API | switchMap, mergeMap |
| Fehlerbehandlung | Wiederholung und Fallback | catchError, retry |
| Timeout-Steuerung | Begrenzung der Antwortzeit | timeout |
| Abbruch | Unterbrechung unnötiger Anfragen | takeUntil, unsubscribe() |
Zustandsverwaltung & Architektur
Kann auch für das Architekturdesign der gesamten Anwendung verwendet werden.
| Verwendung | Beschreibung | Hauptoperatoren |
|---|---|---|
| Zustandsverwaltungsbibliotheken | NgRx, Redux-Observable etc. | scan, share |
| Event-Flow-Management | Nutzung in DDD (Domain-Driven Design) | Subject, shareReplay |
| Datenschicht-Trennung | Clean Architecture | Verschiedene Operatoren |
TIP
Zur Unterscheidung zwischen Promise und RxJS siehe auch Unterschied zwischen Promise und RxJS.
Zusammenfassung
RxJS bietet einen leistungsstarken Ansatz für asynchrone und ereignisbasierte Programmierung. Das Konzept des Datenflusses mit Observable im Zentrum ist besonders nützlich beim Umgang mit komplexen asynchronen Verarbeitungen.
Als Implementierung wird die Subscriber-Klasse verwendet. Weitere Details finden Sie unter Unterschied zwischen Observer und Subscriber. ↩︎
Subject ist eine spezielle Entität, die sowohl als Observable, das Werte emittiert, als auch als Observer, der Werte empfängt, fungieren kann. ↩︎
Scheduler wird verwendet, um den Zeitpunkt und Kontext der asynchronen Verarbeitung zu steuern und ist auch für Debugging und Performance-Management nützlich. ↩︎
↩︎- Deklarativer Code: Code, der direkt beschreibt "welches Ergebnis gewünscht wird"
- Prozeduraler Code: Code, der beschreibt "welche Berechnungen durchgeführt werden müssen, um das gewünschte Ergebnis zu erhalten"