Skip to content

buffer - 別のObservableのタイミングで値をまとめる

bufferオペレーターは、別のObservableが値を発行するまでソースObservableの値を蓄積し、そのタイミングで蓄積した値を配列として一括出力します。 時間や個数ではなく、外部のイベントやシグナルに応じてバッファリングを制御したい場合に便利です。

🔰 基本構文と使い方

ts
import { interval, fromEvent } from 'rxjs';
import { buffer } from 'rxjs';

// 100msごとに値を発行
const source$ = interval(100);

// クリックイベントをトリガーとして使用
const clicks$ = fromEvent(document, 'click');

source$.pipe(
  buffer(clicks$)
).subscribe(bufferedValues => {
  console.log('クリックまでに蓄積された値:', bufferedValues);
});

// 出力例(クリックするたびに出力):
// クリックまでに蓄積された値: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
// クリックまでに蓄積された値: [11, 12, 13, 14, 15, 16, 17]
// ...
  • clicks$が値を発行するたびに、それまでに蓄積された値が配列として出力されます。
  • バッファの区切りを外部のObservableで制御できるのが特徴です。

🌐 RxJS公式ドキュメント - buffer

💡 典型的な活用パターン

  • ユーザーアクションをトリガーにしたバッチ処理
  • 外部シグナルに基づくデータ収集と送信
  • 動的な区切りでのイベントグループ化
  • WebSocketやAPIの接続確立時にまとめて送信

🔍 bufferTime / bufferCount との違い

オペレーター区切りのタイミング用途
buffer別のObservableの発行イベント駆動型の制御
bufferTime一定時間時間ベースのバッチ処理
bufferCount一定個数個数ベースのバッチ処理
ts
import { interval, timer } from 'rxjs';
import { buffer } from 'rxjs';

const source$ = interval(100);
// 1秒ごとにトリガー
const trigger$ = timer(1000, 1000);

source$.pipe(
  buffer(trigger$)
).subscribe(values => {
  console.log('1秒ごとの値:', values);
});

// 出力:
// 1秒ごとの値: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
// 1秒ごとの値: [10, 11, 12, 13, 14, 15, 16, 17, 18, 19]

🧠 実践コード例(UI付き)

ボタンクリックをトリガーに、それまでのマウス移動イベントをまとめて記録する例です。

ts
import { fromEvent } from 'rxjs';
import { map, buffer } from 'rxjs';

// ボタンと出力エリアを作成
const button = document.createElement('button');
button.textContent = 'マウス移動を記録';
document.body.appendChild(button);

const output = document.createElement('div');
output.style.marginTop = '10px';
document.body.appendChild(output);

// マウス移動イベント
const mouseMoves$ = fromEvent<MouseEvent>(document, 'mousemove').pipe(
  map(event => ({ x: event.clientX, y: event.clientY }))
);

// ボタンクリックをトリガーに
const clicks$ = fromEvent(button, 'click');

mouseMoves$.pipe(
  buffer(clicks$)
).subscribe(positions => {
  const message = `検出されたイベント数: ${positions.length}件`;
  console.log(message);
  console.log('座標データ:', positions.slice(0, 5)); // 最初の5件のみ表示
  output.textContent = message;
});
  • ボタンをクリックするまでのマウス移動がすべてバッファに蓄積されます。
  • クリック時にまとめて処理されるため、任意のタイミングでのバッチ処理が可能です。

🎯 複数のトリガーを使った高度な例

複数のトリガーObservableを組み合わせることで、より柔軟な制御が可能です。

ts
import { interval, merge, fromEvent, timer } from 'rxjs';
import { buffer, mapTo } from 'rxjs';

const source$ = interval(100);

// 複数のトリガー: クリックまたは5秒経過
const clicks$ = fromEvent(document, 'click').pipe(mapTo('click'));
const fiveSeconds$ = timer(5000, 5000).pipe(mapTo('timer'));
const trigger$ = merge(clicks$, fiveSeconds$);

source$.pipe(
  buffer(trigger$)
).subscribe(values => {
  console.log(`バッファ出力 (${values.length}個):`, values);
});

⚠️ 注意点

メモリリークに注意

bufferは次のトリガーまで値を蓄積し続けるため、トリガーが長時間発生しない場合、メモリを圧迫する可能性があります。

ts
// 悪い例: トリガーが発生しない可能性がある
const neverTrigger$ = fromEvent(document.querySelector('.non-existent'), 'click');

source$.pipe(
  buffer(neverTrigger$) // トリガーが発生せず、無限にバッファが蓄積される
).subscribe();

対策:

  • bufferTimebufferCountと組み合わせて最大バッファサイズを制限
  • タイムアウト処理を追加
ts
import { interval, fromEvent, timer, race } from 'rxjs';
import { buffer } from 'rxjs';

const source$ = interval(100);

// 複数のトリガー: クリックまたは5秒経過
const clicks$ = fromEvent(document, 'click');
const timeout$ = timer(10000); // 最大10秒でタイムアウト

source$.pipe(
  buffer(race(clicks$, timeout$)) // どちらか早い方で発行
).subscribe(values => {
  console.log('バッファ:', values);
});

📚 関連オペレーター

  • bufferTime - 時間ベースでバッファリング
  • bufferCount - 個数ベースでバッファリング
  • bufferToggle - 開始・終了のObservableでバッファリング制御
  • bufferWhen - 動的なクロージング条件でバッファリング
  • window - バッファの代わりにObservableを返す

まとめ

bufferオペレーターは、外部のObservableをトリガーとして値をまとめて処理するための強力なツールです。時間や個数ではなく、イベント駆動型のバッチ処理を実現できます。ただし、トリガーが発生しない場合のメモリリークには注意が必要です。

Released under the CC-BY-4.0 license.