Skip to content

Was ist die RxJS-Pipeline

Die RxJS-Pipeline ist ein Mechanismus, der eine Reihe von Operationen (Operatoren) nacheinander auf ein Observable anwendet. Durch die Verwendung von Pipelines können Datenströme in mehreren Schritten transformiert, gefiltert und kombiniert werden, wodurch der Datenfluss in einem deklarativen Programmierstil gesteuert werden kann.

Grundstruktur der Pipeline

📘 RxJS Official: pipe()

Mit der pipe()-Methode von RxJS wird eine Pipeline aufgebaut. Die Syntax sieht wie folgt aus:

ts
import { Observable } from 'rxjs';
import { map, filter, tap } from 'rxjs';

const source$: Observable<number> = // Ein Observable
source$.pipe(
  // Mehrere Operatoren verketten
  operator1(),
  operator2(),
  operator3(),
  // ...
).subscribe(value => {
  // Ergebnis verarbeiten
});

Praktische Beispiele

Grundlegende Datentransformation

ts
import { of } from 'rxjs';
import { map, filter } from 'rxjs';

// Stream von Zahlen
const numbers$ = of(1, 2, 3, 4, 5);

// Pipeline aufbauen
numbers$.pipe(
  // Nur gerade Zahlen durchlassen
  filter(n => n % 2 === 0),
  // Werte verdoppeln
  map(n => n * 2)
).subscribe(
  value => console.log(`Ergebnis: ${value}`)
);

// Ausgabe:
// Ergebnis: 4
// Ergebnis: 8

Komplexe Datenverarbeitung

ts
import { fromEvent, map, switchMap } from 'rxjs';
import { ajax } from 'rxjs/ajax';

type User = {
  id: number;
  name: string;
  username: string;
  email: string;
};
type Post = {
  userId: number;
  id: number;
  title: string;
  body: string;
};

// DOM-Elemente erstellen
const searchButton = document.createElement('button');
searchButton.innerText = 'Suchen';
document.body.appendChild(searchButton);

const resultBox = document.createElement('div');
resultBox.id = 'results';
document.body.appendChild(resultBox);

// API-Anfrage bei Button-Klick
fromEvent(searchButton, 'click')
  .pipe(
    switchMap(() =>
      // Erster API-Aufruf
      ajax.getJSON<User>('https://jsonplaceholder.typicode.com/users/1').pipe(
        // Zweiter API-Aufruf zum Abrufen der Beiträge des Benutzers
        switchMap((user) => {
          const header = document.createElement('h3');
          header.textContent = `Benutzer: ${user.name}`;
          resultBox.innerHTML = ''; // Vorherige Ergebnisse löschen
          resultBox.appendChild(header);

          return ajax.getJSON<Post[]>(
            `https://jsonplaceholder.typicode.com/posts?userId=${user.id}`
          );
        }),
        // Nur die ersten 3 Beiträge abrufen
        map((posts) => posts.slice(0, 3))
      )
    )
  )
  .subscribe((posts) => {
    // Beiträge auf dem Bildschirm anzeigen
    resultBox.innerHTML += '<h4>Beitragsliste:</h4>';
    posts.forEach((post) => {
      const div = document.createElement('div');
      div.innerHTML = `<strong>${post.title}</strong><p>${post.body}</p>`;
      resultBox.appendChild(div);
    });
  });

Vorteile der Pipeline

Schauen wir uns zunächst imperativ geschriebenen Code an. Wie im Folgenden gezeigt, kann man mit RxJS-Pipelines den Code in eine besser lesbare und wartbare Form umschreiben und dabei die Absicht der Verarbeitung klarer machen.

1. Verbesserung von Lesbarkeit und Wartbarkeit

ts
// Imperative Verarbeitung
const data = [
  { id: 3, active: true },
  { id: 1, active: false },
  { id: 2, active: true }
];

const activeItems = [];
for (const item of data) {
  if (item.active) {
    activeItems.push({ ...item, label: `Item #${item.id}` });
  }
}
activeItems.sort((a, b) => a.id - b.id);

const div1 = document.createElement('div');
div1.innerHTML = '<h3>Imperativer Stil</h3>';
activeItems.forEach(item => {
  const p = document.createElement('p');
  p.textContent = item.label;
  div1.appendChild(p);
});
document.body.appendChild(div1);

⬇️⬇️⬇️

ts
import { of } from 'rxjs';
import { filter, map, toArray } from 'rxjs';

const output = document.createElement('div');
output.innerHTML = '<h3>Verbesserung von Lesbarkeit und Wartbarkeit</h3>';
document.body.appendChild(output);

of(
  { id: 3, active: true },
  { id: 1, active: false },
  { id: 2, active: true }
).pipe(
  filter(item => item.active),
  map(item => ({ ...item, label: `Item #${item.id}` })),
  toArray(),
  map(array => array.sort((a, b) => a.id - b.id))
).subscribe(sorted => {
  sorted.forEach(item => {
    const div = document.createElement('div');
    div.textContent = item.label;
    output.appendChild(div);
  });
});

Mit Pipelines wird der Datenfluss klar und die Neuzuweisung von Variablen sowie die Verwaltung von Zwischenzuständen wird unnötig.

Auch prozeduraler Code kann durch die Verwendung von RxJS-Pipelines in einem deklarativen Stil prägnant geschrieben werden. Im Folgenden finden Sie ein Beispiel dafür.

2. Deklarativer Programmierstil

Pipelines fördern einen deklarativen Stil, der explizit zeigt, "was zu tun ist". Dadurch wird die Absicht des Codes klarer.

ts
// Prozedurale Verarbeitung
const usersList = [
  { status: 'active', firstName: 'Taro', lastName: 'Yamada', email: 'taro@example.com' },
  { status: 'inactive', firstName: 'Hanako', lastName: 'Yamada', email: 'hanako@example.com' },
  { status: 'active', firstName: 'John', lastName: 'Doe', email: 'john@example.com' }
];

const activeUsers2 = [];
for (const user of usersList) {
  if (user.status === 'active') {
    const name = `${user.firstName} ${user.lastName}`;
    activeUsers2.push({ name, email: user.email });
  }
}

const div2 = document.createElement('div');
div2.innerHTML = '<h3>Prozeduraler Stil</h3>';
activeUsers2.forEach(user => {
  const p = document.createElement('p');
  p.textContent = `${user.name} (${user.email})`;
  div2.appendChild(p);
});
document.body.appendChild(div2);

⬇️⬇️⬇️

ts
// Deklarativer Programmierstil
import { from } from 'rxjs';
import { filter, map } from 'rxjs';

const out2 = document.createElement('div');
out2.innerHTML = '<h3>Deklarativer Stil</h3>';
document.body.appendChild(out2);

const users = [
  { status: 'active', firstName: 'Taro', lastName: 'Yamada', email: 'taro@example.com' },
  { status: 'inactive', firstName: 'Hanako', lastName: 'Yamada', email: 'hanako@example.com' },
  { status: 'active', firstName: 'John', lastName: 'Doe', email: 'john@example.com' }
];

from(users).pipe(
  filter(user => user.status === 'active'),
  map(user => ({
    name: `${user.firstName} ${user.lastName}`,
    email: user.email
  }))
).subscribe(user => {
  const div = document.createElement('div');
  div.textContent = `${user.name} (${user.email})`;
  out2.appendChild(div);
});

Lassen Sie uns auch hier prozedural geschriebenen Code mit einer Pipeline umstrukturieren. Komplexe Verarbeitung kann einfach durch Komposition einzelner Operatoren aufgebaut werden.

3. Kompositionsfähigkeit

Mit Pipelines können komplexe Verarbeitungen durch Kombination kleiner Operationen aufgebaut werden.

ts
// Prozedurale (imperative) Verarbeitung
const rawUsers = [
  { firstName: 'Alice', lastName: 'Smith', status: 'active' },
  { firstName: 'Bob', lastName: 'Brown', status: 'inactive' },
  { firstName: 'Carol', lastName: 'Jones', status: 'active' }
];

const activeUsers = [];
for (const user of rawUsers) {
  if (user.status === 'active') {
    const fullName = `${user.firstName} ${user.lastName}`;
    activeUsers.push({ ...user, fullName });
  }
}
activeUsers.sort((a, b) => a.fullName.localeCompare(b.fullName));

const div0 = document.createElement('div');
div0.innerHTML = '<h3>Prozeduraler Stil</h3>';
activeUsers.forEach(user => {
  const p = document.createElement('p');
  p.textContent = user.fullName;
  div0.appendChild(p);
});
document.body.appendChild(div0);

⬇️⬇️⬇️

ts
// Deklarativer Programmierstil
import { from } from 'rxjs';
import { filter, map, toArray } from 'rxjs';

const out3 = document.createElement('div');
out3.innerHTML = '<h3>Kompositionsfähigkeit</h3>';
document.body.appendChild(out3);

const users3 = [
  { firstName: 'Alice', lastName: 'Smith', status: 'active' },
  { firstName: 'Bob', lastName: 'Brown', status: 'inactive' },
  { firstName: 'Carol', lastName: 'Jones', status: 'active' }
];

const filterActive = filter((user: any) => user.status === 'active');
const formatFullName = map((user: any) => ({ ...user, fullName: `${user.firstName} ${user.lastName}` }));
const collectAndSort = toArray();
const sortByName = map((users: any[]) => users.sort((a, b) => a.fullName.localeCompare(b.fullName)));

from(users3).pipe(
  filterActive,
  formatFullName,
  collectAndSort,
  sortByName
).subscribe(users => {
  users.forEach(user => {
    const div = document.createElement('div');
    div.textContent = user.fullName;
    out3.appendChild(div);
  });
});

Pipeline-Optimierungstechniken

1. Wichtigkeit der Operatorreihenfolge

Die Reihenfolge der Operatoren hat großen Einfluss sowohl auf die Performance als auch auf die Funktionalität.

ts
// Ineffizient: map wird auf alle Elemente angewendet
observable$.pipe(
  map(x => expensiveTransformation(x)),
  filter(x => x > 10)
)

// Effizient: filter wird zuerst ausgeführt, reduziert zu transformierende Elemente
observable$.pipe(
  filter(x => x > 10),
  map(x => expensiveTransformation(x))
)

2. Erstellung benutzerdefinierter Pipelines

Komplexe Verarbeitungen können in wiederverwendbare Pipelines extrahiert werden.

ts
import { Observable, pipe } from 'rxjs';
import { filter, map } from 'rxjs';

// Benutzerdefinierte Pipeline-Funktion
export function filterAndTransform<T, R>(
  filterFn: (value: T) => boolean,
  transformFn: (value: T) => R
) {
  return pipe(
    filter(filterFn),
    map(transformFn)
  );
}

// Anwendungsbeispiel
observable$.pipe(
  filterAndTransform(
    x => x > 10,
    x => x * 2
  )
).subscribe(console.log);

Häufige Fehler in Pipelines

1. Falsche Operatorreihenfolge

ts
// ❌ Wenn filter vor debounceTime angewendet wird,
// wird filter bei jeder Eingabe ausgeführt und die Wirkung von debounce verringert
inputEvents$.pipe(
  filter(text => text.length > 2),
  debounceTime(300)
)

// ✅ debounceTime zuerst anwenden
inputEvents$.pipe(
  debounceTime(300),
  filter(text => text.length > 2)
)

2. Seiteneffekte in der Pipeline

ts
// ❌ Seiteneffekte direkt in der Pipeline ausführen
observable$.pipe(
  map(data => {
    // Seiteneffekt (schlechtes Beispiel)
    console.log(data);
    localStorage.setItem('lastData', JSON.stringify(data));
    return data;
  })
)

// ✅ tap-Operator verwenden
observable$.pipe(
  tap(data => {
    console.log(data);
    localStorage.setItem('lastData', JSON.stringify(data));
  }),
  // Datentransformation mit map durchführen
  map(data => transformData(data))
)

Zusammenfassung

Die RxJS-Pipeline ist ein leistungsstarker Mechanismus zur Verwaltung komplexer asynchroner Datenflüsse auf deklarative und kompositionsfähige Weise. Durch angemessenes Design von Pipelines können Lesbarkeit, Wartbarkeit und Wiederverwendbarkeit des Codes erheblich verbessert werden.

Beim Design von Pipelines sollten Sie die folgenden Punkte beachten:

  1. Wählen Sie die effizienteste Operatorreihenfolge
  2. Extrahieren und wiederverwenden Sie gängige Pipeline-Muster
  3. Trennen Sie Seiteneffekte mit dem tap-Operator
  4. Stellen Sie sicher, dass jeder Schritt der Pipeline eine einzige Verantwortung hat

Dieser pipeline-orientierte Ansatz zeigt seine Stärke besonders in Szenarien wie komplexer UI-Ereignisverarbeitung, API-Anfragen und Zustandsverwaltung.

Veröffentlicht unter CC-BY-4.0-Lizenz.