Skip to content

mergeAll - 全ての内部Observableを並行に平坦化

mergeAll オペレーターは、Higher-order Observable(Observable of Observables)を受け取り、 全ての内部Observableを並行に購読して、値を平坦化します。

🔰 基本構文と使い方

ts
import { fromEvent, interval } from 'rxjs';
import { map, mergeAll, take } from 'rxjs';

const clicks$ = fromEvent(document, 'click');

// クリックごとに新しいカウンターを開始(Higher-order Observable)
const higherOrder$ = clicks$.pipe(
  map(() => interval(1000).pipe(take(3)))
);

// 全てのカウンターを並行に購読
higherOrder$
  .pipe(mergeAll())
  .subscribe(x => console.log(x));

// 出力(クリックを3回した場合):
// 0(1つ目のカウンター)
// 1(1つ目のカウンター)
// 0(2つ目のカウンター)← 並行実行
// 2(1つ目のカウンター)
// 1(2つ目のカウンター)
// 0(3つ目のカウンター)← 並行実行
// ...
  • Higher-order Observableから発行される各内部Observableを並行に購読
  • 全ての内部Observableからの値を単一のストリームに結合
  • 並行購読数を制限することも可能(mergeAll(2) = 最大2つまで並行)

🌐 RxJS公式ドキュメント - mergeAll

💡 典型的な活用パターン

  • 複数のAPI呼び出しを並行実行
  • ユーザーアクションごとに独立したストリームを開始
  • WebSocketやEventSourceなど複数のリアルタイム接続を統合

🧠 実践コード例

入力変更のたびにAPI呼び出し(模擬)を並行実行する例

ts
import { fromEvent, of } from 'rxjs';
import { map, mergeAll, delay, debounceTime } from 'rxjs';

const input = document.createElement('input');
input.placeholder = '検索キーワードを入力';
document.body.appendChild(input);

const output = document.createElement('div');
document.body.appendChild(output);

// 入力イベントをデバウンス
const search$ = fromEvent(input, 'input').pipe(
  debounceTime(300),
  map((e) => (e.target as HTMLInputElement).value)
);

// Higher-order Observable: 各入力値に対して模擬API呼び出し
const results$ = search$.pipe(
  map(query =>
    // 模擬API呼び出し(500msの遅延)
    of(`結果: "${query}"`).pipe(delay(500))
  ),
  mergeAll() // 全てのAPI呼び出しを並行実行
);

results$.subscribe(result => {
  const item = document.createElement('div');
  item.textContent = result;
  output.prepend(item);
});
  • ユーザーが素早く入力を変更しても、全てのAPI呼び出しが並行実行されます
  • 古い検索結果が新しい結果の後に表示される可能性があります(順序保証なし)

🔄 関連オペレーター

オペレーター説明
mergeMapmap + mergeAll の短縮形(よく使われる)
concatAll内部Observableを順番に購読(前の完了を待つ)
switchAll新しい内部Observableに切り替え(古いものをキャンセル)
exhaustAll実行中は新しい内部Observableを無視

⚠️ 注意点

並行購読数の制限

並行購読数を制限しないと、パフォーマンス問題が発生する可能性があります。

ts
// 並行購読数を2つまでに制限
higherOrder$.pipe(
  mergeAll(2) // 最大2つまで並行実行
).subscribe();

順序保証なし

mergeAll は並行実行のため、値の順序は保証されません。 順序が重要な場合は concatAll を使用してください。

Released under the CC-BY-4.0 license.