Skip to content

RxJSのパイプラインとは

RxJSのパイプラインは、Observableに対して一連の操作(オペレーター)を順番に適用する仕組みです。パイプラインを使うことで、データストリームを複数の段階で変換、フィルタリング、結合することができ、宣言的なプログラミングスタイルでデータの流れをコントロールできます。

パイプラインの基本構造

RxJSのpipe()メソッドを使って、パイプラインを構築します。構文は以下のようになります。

ts
import { Observable } from 'rxjs';
import { map, filter, tap } from 'rxjs/operators';

const source$: Observable<number> = // 何らかのObservable
source$.pipe(
  // 複数のオペレーターをチェーン
  operator1(),
  operator2(),
  operator3(),
  // ...
).subscribe(value => {
  // 結果を処理
});

実用的な例

基本的なデータ変換

ts
import { of } from 'rxjs';
import { map, filter } from 'rxjs/operators';

// 数値のストリーム
const numbers$ = of(1, 2, 3, 4, 5);

// パイプラインを構築
numbers$.pipe(
  // 偶数のみを通過させる
  filter(n => n % 2 === 0),
  // 値を2倍にする
  map(n => n * 2)
).subscribe(
  value => console.log(`結果: ${value}`)
);

// 出力:
// 結果: 4
// 結果: 8

複雑なデータ処理

ts
import { fromEvent, map, switchMap } from 'rxjs';
import { ajax } from 'rxjs/ajax';

type User = {
  id: number;
  name: string;
  username: string;
  email: string;
};
type Post = {
  userId: number;
  id: number;
  title: string;
  body: string;
};

// DOM要素を作成
const searchButton = document.createElement('button');
searchButton.innerText = '検索';
document.body.appendChild(searchButton);

const resultBox = document.createElement('div');
resultBox.id = 'results';
document.body.appendChild(resultBox);

// ボタンクリック時にAPIリクエスト
fromEvent(searchButton, 'click')
  .pipe(
    switchMap(() =>
      // 最初のAPI呼び出し
      ajax.getJSON<User>('https://jsonplaceholder.typicode.com/users/1').pipe(
        // ユーザーの投稿を取得する2つ目のAPI呼び出し
        switchMap((user) => {
          const header = document.createElement('h3');
          header.textContent = `ユーザー: ${user.name}`;
          resultBox.innerHTML = ''; // 前回の結果をクリア
          resultBox.appendChild(header);

          return ajax.getJSON<Post[]>(
            `https://jsonplaceholder.typicode.com/posts?userId=${user.id}`
          );
        }),
        // 最初の3つの投稿だけを取得
        map((posts) => posts.slice(0, 3))
      )
    )
  )
  .subscribe((posts) => {
    // 画面に投稿を表示する処理
    resultBox.innerHTML += '<h4>投稿一覧:</h4>';
    posts.forEach((post) => {
      const div = document.createElement('div');
      div.innerHTML = `<strong>${post.title}</strong><p>${post.body}</p>`;
      resultBox.appendChild(div);
    });
  });

パイプラインの利点

まずは命令的に書かれたコードを見てみましょう。次に示すように、RxJS のパイプラインを使えば、処理の意図を明確にしながら、より読みやすく保守しやすい形に書き換えることができます。

1. 可読性と保守性の向上

ts
// 命令的スタイルでの処理
const data = [
  { id: 3, active: true },
  { id: 1, active: false },
  { id: 2, active: true }
];

const activeItems = [];
for (const item of data) {
  if (item.active) {
    activeItems.push({ ...item, label: `Item #${item.id}` });
  }
}
activeItems.sort((a, b) => a.id - b.id);

const div1 = document.createElement('div');
div1.innerHTML = '<h3>命令的スタイル</h3>';
activeItems.forEach(item => {
  const p = document.createElement('p');
  p.textContent = item.label;
  div1.appendChild(p);
});
document.body.appendChild(div1);

⬇️⬇️⬇️

ts
import { of } from 'rxjs';
import { filter, map, toArray } from 'rxjs/operators';

const output = document.createElement('div');
output.innerHTML = '<h3>可読性と保守性の向上</h3>';
document.body.appendChild(output);

of(
  { id: 3, active: true },
  { id: 1, active: false },
  { id: 2, active: true }
).pipe(
  filter(item => item.active),
  map(item => ({ ...item, label: `Item #${item.id}` })),
  toArray(),
  map(array => array.sort((a, b) => a.id - b.id))
).subscribe(sorted => {
  sorted.forEach(item => {
    const div = document.createElement('div');
    div.textContent = item.label;
    output.appendChild(div);
  });
});

パイプラインを使うと、データの流れが明確になり、変数の再代入や中間状態の管理が不要になります。

上記のような手続き型のコードも、RxJS のパイプラインを使うことで、宣言的なスタイルで簡潔に記述できます。以下にその例を示します。

2. 宣言的なプログラミングスタイル

パイプラインは「何をするか」を明示的に示す宣言的なスタイルを促進します。これにより、コードの意図がより明確になります。

ts
// 手続き型スタイルでの処理
const usersList = [
  { status: 'active', firstName: 'Taro', lastName: 'Yamada', email: 'taro@example.com' },
  { status: 'inactive', firstName: 'Hanako', lastName: 'Yamada', email: 'hanako@example.com' },
  { status: 'active', firstName: 'John', lastName: 'Doe', email: 'john@example.com' }
];

const activeUsers2 = [];
for (const user of usersList) {
  if (user.status === 'active') {
    const name = `${user.firstName} ${user.lastName}`;
    activeUsers2.push({ name, email: user.email });
  }
}

const div2 = document.createElement('div');
div2.innerHTML = '<h3>手続き型スタイル</h3>';
activeUsers2.forEach(user => {
  const p = document.createElement('p');
  p.textContent = `${user.name} (${user.email})`;
  div2.appendChild(p);
});
document.body.appendChild(div2);

⬇️⬇️⬇️

ts
// 宣言的なプログラミングスタイル
import { from } from 'rxjs';
import { filter, map } from 'rxjs/operators';

const out2 = document.createElement('div');
out2.innerHTML = '<h3>宣言的なスタイル</h3>';
document.body.appendChild(out2);

const users = [
  { status: 'active', firstName: 'Taro', lastName: 'Yamada', email: 'taro@example.com' },
  { status: 'inactive', firstName: 'Hanako', lastName: 'Yamada', email: 'hanako@example.com' },
  { status: 'active', firstName: 'John', lastName: 'Doe', email: 'john@example.com' }
];

from(users).pipe(
  filter(user => user.status === 'active'),
  map(user => ({
    name: `${user.firstName} ${user.lastName}`,
    email: user.email
  }))
).subscribe(user => {
  const div = document.createElement('div');
  div.textContent = `${user.name} (${user.email})`;
  out2.appendChild(div);
});

こちらも同様に、手続き的に処理を記述していたコードを、パイプラインで再構成してみましょう。個別の演算子を合成して複雑な処理をシンプルに構築できます。

3. 合成可能性

パイプラインを使うと、小さな操作を組み合わせて複雑な処理を構築できます。

ts
// 手続き型(命令的)スタイルの処理
const rawUsers = [
  { firstName: 'Alice', lastName: 'Smith', status: 'active' },
  { firstName: 'Bob', lastName: 'Brown', status: 'inactive' },
  { firstName: 'Carol', lastName: 'Jones', status: 'active' }
];

const activeUsers = [];
for (const user of rawUsers) {
  if (user.status === 'active') {
    const fullName = `${user.firstName} ${user.lastName}`;
    activeUsers.push({ ...user, fullName });
  }
}
activeUsers.sort((a, b) => a.fullName.localeCompare(b.fullName));

const div0 = document.createElement('div');
div0.innerHTML = '<h3>手続き型スタイル</h3>';
activeUsers.forEach(user => {
  const p = document.createElement('p');
  p.textContent = user.fullName;
  div0.appendChild(p);
});
document.body.appendChild(div0);

⬇️⬇️⬇️

ts
// 宣言的なプログラミングスタイル
import { from } from 'rxjs';
import { filter, map, toArray } from 'rxjs/operators';

const out3 = document.createElement('div');
out3.innerHTML = '<h3>合成可能性</h3>';
document.body.appendChild(out3);

const users3 = [
  { firstName: 'Alice', lastName: 'Smith', status: 'active' },
  { firstName: 'Bob', lastName: 'Brown', status: 'inactive' },
  { firstName: 'Carol', lastName: 'Jones', status: 'active' }
];

const filterActive = filter((user: any) => user.status === 'active');
const formatFullName = map((user: any) => ({ ...user, fullName: `${user.firstName} ${user.lastName}` }));
const collectAndSort = toArray();
const sortByName = map((users: any[]) => users.sort((a, b) => a.fullName.localeCompare(b.fullName)));

from(users3).pipe(
  filterActive,
  formatFullName,
  collectAndSort,
  sortByName
).subscribe(users => {
  users.forEach(user => {
    const div = document.createElement('div');
    div.textContent = user.fullName;
    out3.appendChild(div);
  });
});

パイプラインの最適化テクニック

1. オペレーターの順序の重要性

オペレーターの順序は、パフォーマンスと機能の両方に大きな影響を与えます。

ts
// 非効率: mapが全ての要素に適用される
observable$.pipe(
  map(x => expensiveTransformation(x)),
  filter(x => x > 10)
)

// 効率的: filterが先に実行され、変換する要素が減る
observable$.pipe(
  filter(x => x > 10),
  map(x => expensiveTransformation(x))
)

2. カスタムパイプラインの作成

複雑な処理を再利用可能なパイプラインに抽出できます。

ts
import { Observable, pipe } from 'rxjs';
import { filter, map } from 'rxjs/operators';

// カスタムパイプライン関数
export function filterAndTransform<T, R>(
  filterFn: (value: T) => boolean,
  transformFn: (value: T) => R
) {
  return pipe(
    filter(filterFn),
    map(transformFn)
  );
}

// 使用例
observable$.pipe(
  filterAndTransform(
    x => x > 10,
    x => x * 2
  )
).subscribe(console.log);

パイプラインでよくある間違い

1. オペレーターの順序間違い

ts
// ❌ debounceTimeより先にfilterを適用すると、
// 入力ごとにfilterが実行され、debounceの効果が減少
inputEvents$.pipe(
  filter(text => text.length > 2),
  debounceTime(300)
)

// ✅ 先にdebounceTimeを適用する
inputEvents$.pipe(
  debounceTime(300),
  filter(text => text.length > 2)
)

2. パイプライン内での副作用

ts
// ❌ パイプライン内で副作用を直接実行
observable$.pipe(
  map(data => {
    // 副作用(良くない例)
    console.log(data);
    localStorage.setItem('lastData', JSON.stringify(data));
    return data;
  })
)

// ✅ tapオペレーターを使用する
observable$.pipe(
  tap(data => {
    console.log(data);
    localStorage.setItem('lastData', JSON.stringify(data));
  }),
  // データ変換はmapで行う
  map(data => transformData(data))
)

まとめ

RxJSのパイプラインは、複雑な非同期データフローを宣言的かつ合成可能な方法で管理するための強力なメカニズムです。パイプラインを適切に設計することで、コードの可読性、保守性、再利用性を大幅に向上させることができます。

パイプラインの設計時には、以下のポイントを意識すると良いでしょう。

  1. 最も効率的なオペレーターの順序を選ぶ
  2. 共通のパイプラインパターンを抽出して再利用する
  3. 副作用はtapオペレーターで分離する
  4. パイプラインの各ステップが単一の責任を持つようにする

このようなパイプライン指向のアプローチは、特に複雑なUIイベント処理、APIリクエスト、状態管理などのシナリオで威力を発揮します。

Released under the CC-BY-4.0 license.