Skip to content

mergeScan - 非同期処理を伴う累積

mergeScanオペレーターは、ストリームの各値に対して非同期の累積処理を実行します。 scanmergeMapを組み合わせたような動作で、累積値を保持しながら、各値を新しいObservableに変換し、その結果を次の累積処理に使用します。

🔰 基本構文と使い方

ts
import { interval, of } from 'rxjs';
import { mergeScan, take,  } from 'rxjs';

interval(1000).pipe(
  take(5),
  mergeScan((acc, curr) => {
    // 各値に対して非同期処理(ここでは即座に返す)
    return of(acc + curr);
  }, 0)
).subscribe(console.log);

// 出力: 0, 1, 3, 6, 10
  • accは累積値、currは現在の値です。
  • 累積関数はObservableを返す必要があります。
  • 各値の処理結果は累積されていきます。

🌐 RxJS公式ドキュメント - mergeScan

💡 典型的な活用パターン

  • APIレスポンスを累積して集計する
  • 前の結果に基づいて次のAPIリクエストを実行する
  • リアルタイムデータの非同期累積処理
  • ページネーションで複数ページのデータを累積取得

📊 scanとの違い

オペレーター累積関数の戻り値ユースケース
scan直接値を返す同期的な累積処理
mergeScanObservableを返す非同期的な累積処理
ts
// scan - 同期処理
source$.pipe(
  scan((acc, curr) => acc + curr, 0)
)

// mergeScan - 非同期処理
source$.pipe(
  mergeScan((acc, curr) => of(acc + curr).pipe(delay(100)), 0)
)

🧠 実践コード例(API累積取得)

ボタンをクリックするたびに、前回の結果に新しいデータを追加していく例です。

ts
import { fromEvent, of } from 'rxjs';
import { mergeScan, delay, take, map } from 'rxjs';

// ボタン作成
const button = document.createElement('button');
button.textContent = 'データ取得';
document.body.appendChild(button);

// 出力エリア作成
const output = document.createElement('div');
output.style.marginTop = '10px';
document.body.appendChild(output);

// ダミーAPI(遅延してデータを返す)
const fetchData = (page: number) => {
  return of(`データ${page}`).pipe(delay(500));
};

// クリックイベントで累積取得
fromEvent(button, 'click').pipe(
  take(5), // 最大5回
  mergeScan((accumulated, _, index) => {
    const page = index + 1;
    console.log(`ページ${page}を取得中...`);

    // 前回までの累積データに新しいデータを追加
    return fetchData(page).pipe(
      map(newData => [...accumulated, newData])
    );
  }, [] as string[])
).subscribe((allData) => {
  output.innerHTML = `
    <div>取得済みデータ:</div>
    <ul>${allData.map(d => `<li>${d}</li>`).join('')}</ul>
  `;
});
  • 各クリックで非同期にデータを取得します。
  • 前回までの結果(accumulated)に新しいデータを追加していきます。
  • 累積結果がリアルタイムで更新されます。

🎯 実用例:並行制御付きの累積処理

mergeScanにはconcurrentパラメーターがあり、同時実行数を制御できます。

ts
import { interval, of } from 'rxjs';
import { mergeScan, take, delay } from 'rxjs';

interface RequestLog {
  total: number;
  logs: string[];
}

interval(200).pipe(
  take(10),
  mergeScan((acc, curr) => {
    const timestamp = new Date().toLocaleTimeString();
    console.log(`リクエスト${curr}開始: ${timestamp}`);

    // 各リクエストは1秒かかる
    return of({
      total: acc.total + 1,
      logs: [...acc.logs, `リクエスト${curr}完了: ${timestamp}`]
    }).pipe(delay(1000));
  }, { total: 0, logs: [] } as RequestLog, 2) // 同時実行数2
).subscribe((result) => {
  console.log(`累計: ${result.total}件`);
  console.log(result.logs[result.logs.length - 1]);
});
  • concurrent: 2により、最大2つまで同時にリクエストを実行します。
  • 3つ目以降のリクエストは、前のリクエストが完了するまで待機します。

⚠️ 注意点

1. エラーハンドリング

累積関数内でエラーが発生すると、ストリーム全体が停止します。

ts
source$.pipe(
  mergeScan((acc, curr) => {
    return apiCall(curr).pipe(
      map(result => acc + result),
      catchError(err => {
        console.error('エラー発生:', err);
        // 累積値を維持して続行
        return of(acc);
      })
    );
  }, 0)
)

2. メモリ管理

累積値が大きくなりすぎないよう注意

ts
// 悪い例:無制限に累積
mergeScan((acc, curr) => of([...acc, curr]), [])

// 良い例:最新N件のみ保持
mergeScan((acc, curr) => {
  const newAcc = [...acc, curr];
  return of(newAcc.slice(-100)); // 最新100件のみ
}, [])

3. 同期的な処理ならscanを使う

非同期処理が不要な場合は、シンプルなscanを使いましょう。

ts
// mergeScanは不要
source$.pipe(
  mergeScan((acc, curr) => of(acc + curr), 0)
)

// scanで十分
source$.pipe(
  scan((acc, curr) => acc + curr, 0)
)

🔗 関連オペレーター

  • scan - 同期的な累積処理
  • reduce - 完了時のみ最終累積値を出力
  • mergeMap - 非同期マッピング(累積なし)
  • expand - 再帰的な展開処理

Released under the CC-BY-4.0 license.