Skip to content

オペレーター選択の迷い

RxJSには100種類以上のオペレーターがあり、どれを使えば良いか迷うことは誰もが経験する困難です。このページでは、実践的な選択基準とフローチャートを提供します。

100以上のオペレーターから選ぶ基準

問題:選択肢が多すぎる

typescript
// 配列を変換したい... map? scan? reduce? toArray?
// 複数のAPIを呼びたい... mergeMap? switchMap? concatMap? exhaustMap?
// 値をフィルタしたい... filter? take? first? distinctUntilChanged?
// 複数のストリームを結合したい... merge? combineLatest? zip? forkJoin?

解決策:カテゴリ + 目的で絞り込む

より詳細な選択フローチャート

以下のフローチャートは、具体的な目的に応じてオペレーターを選ぶ手順を示しています。

1. 変換オペレーター(Transformation)

いつ使う? データの形を変えたい、非同期処理を呼びたい

オペレーター用途よくあるユースケース
map値を1:1で変換プロパティ取得、計算、型変換
scan累積処理(中間値を流す)カウンター、合計、履歴
reduce累積処理(最終値のみ)配列の合計、最大値
mergeMap非同期処理を並列実行複数API並列呼び出し
switchMap非同期処理を切り替え検索API(最新のみ)
concatMap非同期処理を順次実行順序が重要な処理
exhaustMap実行中は新しい処理を無視連打防止(送信ボタン)

実践例:ユースケース別の選択

ユースケース1: プロパティを取得

typescript
import { of } from 'rxjs';
import { map } from 'rxjs';

interface User { id: number; name: string; }

of({ id: 1, name: 'Alice' }).pipe(
  map(user => user.name) // 値を1:1で変換 → map
).subscribe(name => console.log(name)); // 'Alice'

ユースケース2: カウンター

typescript
import { fromEvent } from 'rxjs';
import { scan } from 'rxjs';

const button = document.querySelector('button')!;

fromEvent(button, 'click').pipe(
  scan(count => count + 1, 0) // 累積処理 → scan
).subscribe(count => console.log(`クリック回数: ${count}`));

ユースケース3: 検索API呼び出し

typescript
import { fromEvent } from 'rxjs';
import { debounceTime, map, switchMap } from 'rxjs';

const searchInput = document.querySelector('input')!;

fromEvent(searchInput, 'input').pipe(
  debounceTime(300),
  map(e => (e.target as HTMLInputElement).value),
  switchMap(query => searchAPI(query)) // 最新のみ → switchMap
).subscribe(results => console.log(results));

2. フィルタリングオペレーター(Filtering)

いつ使う?

値を取捨選択したい、タイミングを制御したい

オペレーター用途よくあるユースケース
filter条件に合う値のみ通す偶数のみ、非null値のみ
take最初のN個だけ最初の5件取得
first最初の1つだけ初回値の取得
distinctUntilChanged前回と異なる値のみ重複除外
debounceTime一定時間経過後に発火検索入力(入力完了後)
throttleTime一定間隔で間引くスクロールイベント

実践例:ユースケース別の選択

ユースケース1: 偶数のみ取得

typescript
import { of } from 'rxjs';
import { filter } from 'rxjs';

of(1, 2, 3, 4, 5).pipe(
  filter(n => n % 2 === 0) // 条件に合う値のみ → filter
).subscribe(console.log); // 2, 4

ユースケース2: 検索入力の最適化

typescript
import { fromEvent } from 'rxjs';
import { debounceTime, distinctUntilChanged, map } from 'rxjs';

const input = document.querySelector('input')!;

fromEvent(input, 'input').pipe(
  debounceTime(300),              // 入力完了を待つ → debounceTime
  map(e => (e.target as HTMLInputElement).value),
  distinctUntilChanged()          // 重複除外 → distinctUntilChanged
).subscribe(query => console.log('検索:', query));

ユースケース3: スクロールイベントの間引き

typescript
import { fromEvent } from 'rxjs';
import { throttleTime } from 'rxjs';

fromEvent(window, 'scroll').pipe(
  throttleTime(200) // 200msごとに1回のみ → throttleTime
).subscribe(() => console.log('スクロール位置:', window.scrollY));

3. 結合オペレーター(Combination)

いつ使う?

複数のストリームを組み合わせたい

オペレーター用途よくあるユースケース
merge複数のストリームを並行複数のイベント監視
combineLatest全ての最新値を組み合わせフォームバリデーション
zip対応する値をペア化2つのAPIの結果を対応付け
forkJoin全て完了後に結果を配列で複数APIの並列実行
withLatestFromメインストリーム + 補助値イベント + 現在の状態

実践例:ユースケース別の選択

ユースケース1: 複数のイベントを監視

typescript
import { fromEvent, merge } from 'rxjs';

const clicks$ = fromEvent(document, 'click');
const keypresses$ = fromEvent(document, 'keypress');

merge(clicks$, keypresses$).pipe() // 並行監視 → merge
  .subscribe(() => console.log('何かのイベント発生'));

ユースケース2: フォームバリデーション

typescript
import { combineLatest } from 'rxjs';
import { map } from 'rxjs';

const email$ = getFormControl('email');
const password$ = getFormControl('password');

combineLatest([email$, password$]).pipe( // 全ての最新値 → combineLatest
  map(([email, password]) => email.length > 0 && password.length > 7)
).subscribe(isValid => console.log('フォーム有効:', isValid));

ユースケース3: 複数APIの並列実行

typescript
import { forkJoin } from 'rxjs';

forkJoin({
  user: getUserAPI(),
  posts: getPostsAPI(),
  comments: getCommentsAPI()
}).subscribe(({ user, posts, comments }) => { // 全完了待ち → forkJoin
  console.log('全データ取得完了', { user, posts, comments });
});

よく使うオペレーター20選

以下は実務で最も頻繁に使用されるオペレーターです。まずはこの20個を習得しましょう。

🥇 最頻出(必須)

  1. map - 値を変換
  2. filter - 条件でフィルタ
  3. switchMap - 検索など、最新のみ必要
  4. tap - デバッグ、副作用
  5. take - 最初のN個
  6. first - 最初の1つ
  7. catchError - エラー処理
  8. takeUntil - 購読解除

🥈 頻出(よく使う)

  1. mergeMap - 並列非同期処理
  2. debounceTime - 入力完了待ち
  3. distinctUntilChanged - 重複除外
  4. combineLatest - 複数値の組み合わせ
  5. startWith - 初期値を設定
  6. scan - 累積処理
  7. shareReplay - 結果をキャッシュ

🥉 よく使う(知っておくべき)

  1. concatMap - 順次処理
  2. throttleTime - イベント間引き
  3. withLatestFrom - 補助値の取得
  4. forkJoin - 複数API待ち合わせ
  5. retry - リトライ処理

switchMap vs mergeMap vs concatMap vs exhaustMap

この4つは最も混同しやすいオペレーターです。違いを明確に理解しましょう。

比較表

オペレーター実行方法前の処理新しい処理使いどころ
switchMap切り替えキャンセル即座に開始検索、オートコンプリート
mergeMap並列実行継続即座に開始ファイルアップロード、分析
concatMap順次実行完了を待つ待機後に開始順序が重要な処理
exhaustMap実行中は無視継続無視ボタン連打防止

Marble Diagram での比較

外側: ----A----B----C----|

内側: A → --1--2|
      B → --3--4|
      C → --5--6|

switchMap:  ----1--3--5--6|  (Aは2の前にキャンセル、Bは4の前にキャンセル)
mergeMap:   ----1-23-45-6|   (全て並列実行)
concatMap:  ----1--2--3--4--5--6|  (順次実行)
exhaustMap: ----1--2|            (B、Cは無視される)

実践例:同じ処理での4つの違い

状況: ボタンクリックごとにAPI(1秒かかる)を呼ぶ。ユーザーが0.5秒ごとにクリック。

switchMap - 検索に最適

typescript
import { fromEvent } from 'rxjs';
import { switchMap } from 'rxjs';

fromEvent(button, 'click').pipe(
  switchMap(() => searchAPI()) // 最新のみ実行、古いリクエストはキャンセル
).subscribe(result => console.log(result));

// 0.0秒: クリック1 → API1開始
// 0.5秒: クリック2 → API1キャンセル、API2開始
// 1.0秒: クリック3 → API2キャンセル、API3開始
// 2.0秒: API3完了 → 結果表示(API3のみ)

💡 使いどころ

  • 検索・オートコンプリート: 最新の入力値だけが必要
  • タブ切り替え: 表示中のタブのデータだけ必要
  • ページネーション: 最新のページだけ表示

mergeMap - 並列処理に最適

typescript
import { fromEvent } from 'rxjs';
import { mergeMap } from 'rxjs';

fromEvent(button, 'click').pipe(
  mergeMap(() => uploadFileAPI()) // 全て並列実行
).subscribe(result => console.log(result));

// 0.0秒: クリック1 → API1開始
// 0.5秒: クリック2 → API2開始(API1継続)
// 1.0秒: クリック3 → API3開始(API1, API2継続)
// 1.0秒: API1完了 → 結果表示
// 1.5秒: API2完了 → 結果表示
// 2.0秒: API3完了 → 結果表示

💡 使いどころ

  • ファイルアップロード: 複数ファイルを同時アップロード
  • 分析・ログ送信: 独立した処理を並列実行
  • 通知システム: 複数の通知を同時処理

concatMap - 順序が重要な処理に最適

typescript
import { fromEvent } from 'rxjs';
import { concatMap } from 'rxjs';

fromEvent(button, 'click').pipe(
  concatMap(() => updateDatabaseAPI()) // 順次実行(前の完了を待つ)
).subscribe(result => console.log(result));

// 0.0秒: クリック1 → API1開始
// 0.5秒: クリック2 → 待機(キューに追加)
// 1.0秒: クリック3 → 待機(キューに追加)
// 1.0秒: API1完了 → 結果表示、API2開始
// 2.0秒: API2完了 → 結果表示、API3開始
// 3.0秒: API3完了 → 結果表示

💡 使いどころ

  • データベース更新: 順序が重要な書き込み処理
  • トランザクション: 前の処理結果を次で使う
  • アニメーション: 順番に実行したい処理

exhaustMap - 連打防止に最適

typescript
import { fromEvent } from 'rxjs';
import { exhaustMap } from 'rxjs';

fromEvent(button, 'click').pipe(
  exhaustMap(() => submitFormAPI()) // 実行中は新しいリクエストを無視
).subscribe(result => console.log(result));

// 0.0秒: クリック1 → API1開始
// 0.5秒: クリック2 → 無視(API1実行中)
// 1.0秒: クリック3 → 無視(API1実行中)
// 1.0秒: API1完了 → 結果表示
// 1.5秒: クリック4 → API4開始(前回完了済み)

💡 使いどころ

  • 送信ボタン: 二重送信防止
  • ログイン処理: 連打によるエラー防止
  • 決済処理: 重複実行を防ぐ

選択フローチャート

実践での判断基準

ステップ1: 何を達成したいか明確にする

typescript
// ❌ 悪い例:とりあえずmergeMapを使う
observable$.pipe(
  mergeMap(value => someAPI(value))
);

// ✅ 良い例:目的を明確にしてから選ぶ
// 目的: ユーザーの検索入力に対して、最新の結果のみ表示したい
// → 古いリクエストはキャンセルすべき → switchMap
searchInput$.pipe(
  switchMap(query => searchAPI(query))
);

ステップ2: パフォーマンスを考慮する

debounceTime vs throttleTime の選択

typescript
// 検索入力: ユーザーが入力を"完了"してから実行
searchInput$.pipe(
  debounceTime(300), // 300ms入力がなければ実行
  switchMap(query => searchAPI(query))
);

// スクロール: 一定間隔で実行(高頻度すぎるのを防ぐ)
scroll$.pipe(
  throttleTime(200), // 200msごとに1回のみ実行
  tap(() => loadMoreItems())
);

ステップ3: エラー処理を組み込む

typescript
import { of } from 'rxjs';
import { catchError, retry, switchMap } from 'rxjs';

searchInput$.pipe(
  debounceTime(300),
  switchMap(query =>
    searchAPI(query).pipe(
      retry(2),                          // 2回までリトライ
      catchError(err => {
        console.error('検索エラー:', err);
        return of([]);                   // 空配列を返す
      })
    )
  )
).subscribe(results => console.log(results));

ステップ4: メモリリークを防ぐ

typescript
import { Subject } from 'rxjs';
import { switchMap, takeUntil } from 'rxjs';

class SearchComponent {
  private destroy$ = new Subject<void>();

  ngOnInit() {
    searchInput$.pipe(
      debounceTime(300),
      switchMap(query => searchAPI(query)),
      takeUntil(this.destroy$)           // コンポーネント破棄時に解除
    ).subscribe(results => console.log(results));
  }

  ngOnDestroy() {
    this.destroy$.next();
    this.destroy$.complete();
  }
}

理解度チェックリスト

以下の質問に答えられるか確認してください。

markdown
## 基本理解
- [ ] オペレーターをカテゴリ(変換、フィルタリング、結合)で分類できる
- [ ] よく使うオペレーター20選のうち、10個以上を説明できる
- [ ] switchMap, mergeMap, concatMap, exhaustMapの違いを説明できる

## 実践的な選択
- [ ] 検索機能に適したオペレーターを選べる(switchMap + debounceTime)
- [ ] 複数APIの並列呼び出しに適したオペレーターを選べる(forkJoin or mergeMap)
- [ ] フォームバリデーションに適したオペレーターを選べる(combineLatest)

## パフォーマンス
- [ ] debounceTimeとthrottleTimeの使い分けができる
- [ ] 高頻度イベントの最適化方法を知っている
- [ ] メモリリークを防ぐパターンを実装できる

## エラー処理
- [ ] catchErrorとretryを組み合わせて使える
- [ ] エラー時のフォールバック処理を実装できる
- [ ] エラーをユーザーにフィードバックできる

次のステップ

オペレーター選択を理解したら、次はタイミングと順序を学びましょう。

タイミングと順序の理解(準備中) - いつ値が流れるのか、同期vs非同期の理解

関連ページ

🎯 練習問題

問題1: 適切なオペレーターを選択

以下のシナリオに最適なオペレーターを選んでください。

  1. ユーザーが検索ボックスに入力 → API呼び出し
  2. ボタンクリックで複数のファイルをアップロード
  3. フォームの全フィールドが有効か判定
  4. 送信ボタンの連打を防止
解答例

1. 検索ボックス → API呼び出し

typescript
searchInput$.pipe(
  debounceTime(300),      // 入力完了を待つ
  distinctUntilChanged(), // 重複除外
  switchMap(query => searchAPI(query)) // 最新のみ
).subscribe(results => displayResults(results));

理由

検索は最新の結果のみ必要なのでswitchMap。入力完了を待つのでdebounceTime


2. 複数ファイルをアップロード

typescript
fromEvent(uploadButton, 'click').pipe(
  mergeMap(() => {
    const files = getSelectedFiles();
    return forkJoin(files.map(file => uploadFileAPI(file)));
  })
).subscribe(results => console.log('全ファイルアップロード完了', results));

理由

複数ファイルを並列アップロードするのでforkJoin。独立した処理なのでmergeMapでも可。


3. フォーム全フィールドの有効性判定

typescript
combineLatest([
  emailField$,
  passwordField$,
  agreeTerms$
]).pipe(
  map(([email, password, agreed]) =>
    email.valid && password.valid && agreed
  )
).subscribe(isValid => submitButton.disabled = !isValid);

理由

全フィールドの最新値を組み合わせるのでcombineLatest


4. 送信ボタンの連打防止

typescript
fromEvent(submitButton, 'click').pipe(
  exhaustMap(() => submitFormAPI())
).subscribe(result => console.log('送信完了', result));

理由

実行中の処理を保護し、新しいクリックを無視するのでexhaustMap

問題2: switchMapとmergeMapの選択

以下のコードはmergeMapを使っていますが、問題があります。修正してください。

typescript
searchInput$.pipe(
  debounceTime(300),
  mergeMap(query => searchAPI(query))
).subscribe(results => displayResults(results));
解答例
typescript
searchInput$.pipe(
  debounceTime(300),
  switchMap(query => searchAPI(query)) // mergeMap → switchMap
).subscribe(results => displayResults(results));

問題点

  • mergeMapだと、全ての検索リクエストが並列実行される
  • ユーザーが「a」→「ab」→「abc」と入力すると、3つのリクエストが全て実行される
  • 古いリクエスト(「a」の結果)が後から返ってきて、最新の結果を上書きする可能性がある

修正理由

  • switchMapを使えば、新しい検索が開始されると古いリクエストはキャンセルされる
  • 常に最新の検索結果のみが表示される

問題3: 実践シナリオ

以下の要件を満たすコードを書いてください。

要点

  • ユーザーがボタンをクリック
  • 3つのAPI(ユーザー情報、投稿一覧、コメント一覧)を並列で取得
  • 全て完了したらデータを表示
  • エラーが発生したら空のデータを返す
  • コンポーネント破棄時に購読を解除
解答例
typescript
import { fromEvent, forkJoin, of, Subject } from 'rxjs';
import { switchMap, catchError, takeUntil } from 'rxjs';

class DataComponent {
  private destroy$ = new Subject<void>();
  private button = document.querySelector('button')!;

  ngOnInit() {
    fromEvent(this.button, 'click').pipe(
      switchMap(() =>
        forkJoin({
          user: this.getUserAPI().pipe(
            catchError(() => of(null))
          ),
          posts: this.getPostsAPI().pipe(
            catchError(() => of([]))
          ),
          comments: this.getCommentsAPI().pipe(
            catchError(() => of([]))
          )
        })
      ),
      takeUntil(this.destroy$)
    ).subscribe(({ user, posts, comments }) => {
      console.log('データ取得完了', { user, posts, comments });
    });
  }

  ngOnDestroy() {
    this.destroy$.next();
    this.destroy$.complete();
  }

  private getUserAPI() { /* ... */ }
  private getPostsAPI() { /* ... */ }
  private getCommentsAPI() { /* ... */ }
}

ポイント

  • forkJoinで3つのAPIを並列実行し、全完了を待つ
  • 各APIにcatchErrorでエラー時のフォールバック値を設定
  • switchMapでボタンクリックごとに新しいリクエストに切り替え
  • takeUntilでコンポーネント破棄時に自動解除

Released under the CC-BY-4.0 license.