Skip to content

Was ist RxJS?

Übersicht

RxJS (Reactive Extensions for JavaScript) ist eine Bibliothek zur Implementierung von "reaktiver Programmierung" in JavaScript.

Was ist reaktive Programmierung?

Reaktive Programmierung ist eine Methode zur Erstellung von Programmen, die automatisch aktualisiert werden, wenn sich Daten ändern. Sie ist eine Form der ereignisgesteuerten Programmierung und konzentriert sich insbesondere auf den Umgang mit asynchronen Datenströmen. Programme werden konstruiert, indem man sich auf den Datenfluss (Stream) konzentriert und darauf reagiert (Reaktion).

Mit anderen Worten: RxJS ist eine Bibliothek zur Behandlung von Ereignissen und asynchronen Datenströmen (Streams) im funktionalen Stil. Sie nutzt das Observable-Muster und bietet leistungsstarke Werkzeuge für den Umgang mit asynchronen Datenströmen.

Observable ist ein zentrales Konstrukt von RxJS, das Ereignisse oder asynchrone Datenströme (Streams) repräsentiert. Es ist die Quelle, aus der Werte "fließen", und durch Abonnieren (subscribe) können diese Werte empfangen werden. Ein Observable ist ein "Datenfluss (Stream)", der im Laufe der Zeit Werte emittiert. Durch Abonnieren (subscribe) können diese Werte empfangen werden.

TIP

Wenn Sie sich fragen "Was ist überhaupt ein Stream?", lesen Sie auch Was ist ein Stream?.

Einfaches Verwendungsbeispiel

ts
import { fromEvent } from 'rxjs';

fromEvent(document, 'click').subscribe(event => {
  console.log('Geklickt:', event);
});

Grundlegende Bausteine von RxJS

Um RxJS zu beherrschen, ist es wichtig, die folgenden Kernkomponenten zu verstehen.

KomponenteÜbersicht
ObservableDie Quelle eines Streams, der asynchrone oder zeitlich auftretende Daten darstellt.
Observer[1]Die Instanz, die Daten von einem Observable abonniert und empfängt.
SubscriptionVerwaltet das Abonnieren und Abmelden von Observables.
Creation FunctionsEine Gruppe von Funktionen zum Erstellen und Kombinieren von Observables.
OperatorEine Gruppe von Funktionen zum Transformieren und Steuern von Observables.
Subject[2]Ein Vermittler, der sowohl Observable- als auch Observer-Eigenschaften besitzt.
Scheduler[3]Ein Mechanismus zur Steuerung des Ausführungszeitpunkts von Observables.

Diese Komponenten haben jeweils eigenständige Funktionen, arbeiten aber zusammen. Beispielsweise wird ein Observable mit Creation Functions erstellt/kombiniert, mit Operatoren transformiert/gesteuert, vom Observer abonniert und der Ausführungszeitpunkt mit Scheduler gesteuert, um die gesamte Stream-Verarbeitung zu bilden.

Komponenten und Datenfluss von RxJS

※ Detaillierte Verwendung und Beispiele für jede Komponente werden in den jeweiligen Kapiteln separat erläutert.

Klassendiagramm der Komponenten

Vorteile von RxJS

VorteilInhalt
Deklarativer Code[4]Mit map, filter etc. wird beschrieben "was erreicht werden soll", prozedurale Beschreibungen wie for-Schleifen werden vermieden
Vereinfachung asynchroner VerarbeitungVermeidung von Verschachtelungen bei Promise oder Callbacks, intuitiver Ablauf
FehlerbehandlungEinheitliche Fehlerbehandlung im Stream mit .pipe(catchError(...))
AbbrechbarkeitUnterbrechung des Streams durch Subscription.unsubscribe() möglich
Vielfältige OperatorenTransformation und Komposition mit zahlreichen Operatoren wie debounceTime, mergeMap, combineLatest

Anwendungsfälle

RxJS glänzt in allen Situationen, in denen "sich zeitlich verändernde Daten" behandelt werden. Im Folgenden werden die wichtigsten Anwendungsbereiche vorgestellt.

Echtzeit-Kommunikation & Streaming

Bei Echtzeit-Kommunikation wie WebSocket oder Server-Sent Events (SSE) ist RxJS besonders leistungsstark.

VerwendungBeschreibungHauptoperatoren
WebSocket-KommunikationChat, Benachrichtigungen, Aktienkursaktualisierungen etc.webSocket, filter, map
Server-Sent EventsPush-Benachrichtigungen vom ServerfromEvent, retry
IoT-Sensor-ÜberwachungVerarbeitung kontinuierlicher SensordatendebounceTime, distinctUntilChanged

Einfaches Beispiel

ts
import { webSocket } from 'rxjs/webSocket';
import { filter } from 'rxjs';

const socket$ = webSocket('wss://example.com/chat');

socket$.pipe(
  filter(msg => msg.type === 'message')
).subscribe(msg => console.log('Neu:', msg.text));

UI/Zustandsverwaltung & Formularsteuerung

Benutzereingaben und Zustandsänderungen können reaktiv behandelt werden.

Beziehung zu Frameworks

Moderne Frontend-Frameworks (Angular Signals, React hooks, Vue Composition API, Svelte Runes etc.) bieten jeweils eigene reaktive Systeme. RxJS ist als framework-unabhängige Bibliothek konzipiert und kann mit diesen kombiniert oder alternativ verwendet werden. Die Integration mit framework-spezifischen Mechanismen wird in Kapitel 15 "Integration mit Frameworks" (in Vorbereitung) ausführlich erläutert.

VerwendungBeschreibungHauptoperatoren
Eingabeformular-SteuerungSuchvervollständigung, Echtzeit-ValidierungdebounceTime, distinctUntilChanged, switchMap
Verknüpfung mehrerer FormularfelderAktualisierung abhängiger EingabefeldercombineLatest, withLatestFrom
Kommunikation zwischen KomponentenEvent-Bus oder benutzerdefinierte ZustandsverwaltungSubject, share
UI-EreignisverarbeitungKlick, Scroll, Drag & DropfromEvent, takeUntil

Einfaches Beispiel

ts
import { fromEvent, combineLatest } from 'rxjs';
import { debounceTime, map, switchMap } from 'rxjs';

const searchInput = document.querySelector('#search') as HTMLInputElement;
const sortSelect = document.querySelector('#sort') as HTMLInputElement;

const search$ = fromEvent(searchInput, 'input').pipe(
  map(e => (e.target as HTMLInputElement).value)
);

const sort$ = fromEvent(sortSelect, 'change').pipe(
  map(e => (e.target as HTMLSelectElement).value)
);

combineLatest([search$, sort$]).pipe(
  debounceTime(300),
  switchMap(([query, order]) =>
    fetch(`/api/search?q=${query}&sort=${order}`).then(r => r.json())
  )
).subscribe(results => console.log(results));

Offline-Unterstützung & PWA

Kann für Offline-Unterstützung und Netzwerkstatusverwaltung in Progressive Web Apps (PWA) genutzt werden.

VerwendungBeschreibungHauptoperatoren
Netzwerkstatus-ÜberwachungOnline/Offline-ErkennungfromEvent, merge
Wiederholung bei OfflineAutomatische Neusynchronisation bei Verbindungswiederherstellungretry, retryWhen
Cache-SteuerungIntegration mit Service WorkerswitchMap, catchError

Einfaches Beispiel

ts
import { fromEvent, merge } from 'rxjs';
import { map, startWith } from 'rxjs';

const online$ = fromEvent(window, 'online').pipe(map(() => true));
const offline$ = fromEvent(window, 'offline').pipe(map(() => false));

merge(online$, offline$).pipe(
  startWith(navigator.onLine)
).subscribe(isOnline => {
  console.log(isOnline ? 'Online' : 'Offline');
});

KI/Streaming-API

Auch für Streaming-API-Antworten wie OpenAI optimal geeignet.

VerwendungBeschreibungHauptoperatoren
Token-schrittweise AusgabeEchtzeit-Anzeige von KI-AntwortenconcatMap, scan
Streaming-VerarbeitungVerarbeitung von Server-Sent EventsfromEvent, map
Backend-IntegrationNutzung in NestJS (mit RxJS standardmäßig integriert)Verschiedene Operatoren

HTTP-Kommunikation und Fehlerbehandlung

Elegante Behandlung asynchroner HTTP-Kommunikation.

VerwendungBeschreibungHauptoperatoren
API-AnfragenKommunikation mit RESTful APIswitchMap, mergeMap
FehlerbehandlungWiederholung und FallbackcatchError, retry
Timeout-SteuerungBegrenzung der Antwortzeittimeout
AbbruchUnterbrechung unnötiger AnfragentakeUntil, unsubscribe()

Zustandsverwaltung & Architektur

Kann auch für das Architekturdesign der gesamten Anwendung verwendet werden.

VerwendungBeschreibungHauptoperatoren
ZustandsverwaltungsbibliothekenNgRx, Redux-Observable etc.scan, share
Event-Flow-ManagementNutzung in DDD (Domain-Driven Design)Subject, shareReplay
Datenschicht-TrennungClean ArchitectureVerschiedene Operatoren

TIP

Zur Unterscheidung zwischen Promise und RxJS siehe auch Unterschied zwischen Promise und RxJS.

Zusammenfassung

RxJS bietet einen leistungsstarken Ansatz für asynchrone und ereignisbasierte Programmierung. Das Konzept des Datenflusses mit Observable im Zentrum ist besonders nützlich beim Umgang mit komplexen asynchronen Verarbeitungen.


  1. Als Implementierung wird die Subscriber-Klasse verwendet. Weitere Details finden Sie unter Unterschied zwischen Observer und Subscriber. ↩︎

  2. Subject ist eine spezielle Entität, die sowohl als Observable, das Werte emittiert, als auch als Observer, der Werte empfängt, fungieren kann. ↩︎

  3. Scheduler wird verwendet, um den Zeitpunkt und Kontext der asynchronen Verarbeitung zu steuern und ist auch für Debugging und Performance-Management nützlich. ↩︎

    • Deklarativer Code: Code, der direkt beschreibt "welches Ergebnis gewünscht wird"
    • Prozeduraler Code: Code, der beschreibt "welche Berechnungen durchgeführt werden müssen, um das gewünschte Ergebnis zu erhalten"
    ↩︎

Veröffentlicht unter CC-BY-4.0-Lizenz.