RxJSとは何か?
概要
RxJS(Reactive Extensions for JavaScript)とは、「リアクティブプログラミング」を JavaScript で行うためのライブラリです。
リアクティブプログラミングとは?
リアクティブプログラミングは、データの変化に応じて自動的に更新されるプログラムの作成方法です。 イベント駆動型プログラミングの一種であり、特に非同期なデータストリームの扱いに焦点を当てています。 データの流れ(ストリーム)を中心に考え、その流れに対して反応(リアクション)する方式でプログラムを構築します。
つまり、RxJSはイベントや非同期データの流れ(ストリーム)を、関数型スタイルで扱うためのライブラリです。Observableパターンを利用して、非同期データストリームを扱うための強力なツールを提供します。
Observableとは、イベントや非同期データの流れ(ストリーム)を表現するRxJSの中核的な構成要素です。値が「流れてくる」源であり、購読(subscribe)によって値を受け取ることができます。Observableとは、時間の経過とともに値を発行する「データの流れ(ストリーム)」です。購読(subscribe)することで、その値を受け取ることができます。
TIP
「ストリームってそもそも何?」という方は、ストリームとは? も参照してみてください。
簡単な使用例
import { fromEvent } from 'rxjs';
fromEvent(document, 'click').subscribe(event => {
console.log('クリックされました:', event);
});RxJSの基本構成要素
RxJSを使いこなすには、以下の中核的な構成要素を理解することが重要です。
| 構成要素 | 概要 |
|---|---|
Observable | 非同期または時間に沿って発生するデータを表すストリームの源です。 |
Observer[1] | Observableからデータを購読して受け取る側の存在です。 |
Subscription | Observableの購読と解除の管理を行います。 |
Creation Functions | Observableを作成・結合するための関数群です。 |
Operator | Observableを変換・制御するための関数群です。 |
Subject[2] | ObservableとObserverの両方の性質を持つ中継器です。 |
Scheduler[3] | Observableの実行タイミングを制御する仕組みです。 |
これらはそれぞれ独立した機能を持ちながらも連携して動作します。 たとえば、Creation FunctionsでObservableを作成・結合し、Operatorで変換・制御し、Observerが購読し、Schedulerで実行タイミングを制御する、といった形で、全体としてストリーム処理を構成します。
RxJSの構成要素とデータフロー
※ 各構成要素の詳細な使い方や例については、それぞれの専用章で個別に解説します。
構成クラス図
RxJSの利点
| 利点 | 内容 |
|---|---|
| 宣言的コード[4] | map, filter などで「何をしたいか」を記述し、forループなどの手続き的記述を避けられる |
| 非同期処理の単純化 | Promise やコールバックのネストを避け、直感的な流れで書ける |
| エラー処理 | .pipe(catchError(...)) などでストリーム中のエラーを統一的に処理可能 |
| キャンセル可能 | Subscription.unsubscribe() によりストリームの中断が可能 |
| 多様なオペレーター | debounceTime, mergeMap, combineLatest など多数の演算子で変換や合成が可能 |
ユースケース
RxJSは「時間とともに変化するデータ」を扱うあらゆる場面で活躍します。以下に主要な活用分野を紹介します。
リアルタイム通信・ストリーミング
WebSocketやServer-Sent Events(SSE)などのリアルタイム通信を扱う場合、RxJSは特に強力です。
| 用途 | 説明 | 主要なオペレーター |
|---|---|---|
| WebSocket通信 | チャット、通知、株価更新など | webSocket, filter, map |
| Server-Sent Events | サーバーからのプッシュ通知 | fromEvent, retry |
| IoTセンサー監視 | 連続的なセンサーデータの処理 | debounceTime, distinctUntilChanged |
簡単な例
import { webSocket } from 'rxjs/webSocket';
import { filter } from 'rxjs';
const socket$ = webSocket('wss://example.com/chat');
socket$.pipe(
filter(msg => msg.type === 'message')
).subscribe(msg => console.log('新着:', msg.text));UI/状態管理・フォーム制御
ユーザー入力や状態の変化をリアクティブに扱うことができます。
フレームワークとの関係
現代のフロントエンドフレームワーク(Angular Signals、React hooks、Vue Composition API、Svelte Runesなど)は、それぞれ独自のリアクティブシステムを提供しています。RxJSはフレームワーク非依存のライブラリとして、これらと併用・使い分けが可能です。フレームワーク固有の仕組みとRxJSの統合については、Chapter 15「フレームワークとの統合」(準備中)で詳しく解説します。
| 用途 | 説明 | 主要なオペレーター |
|---|---|---|
| 入力フォームの制御 | 検索補完、リアルタイムバリデーション | debounceTime, distinctUntilChanged, switchMap |
| 複数フォーム項目の連携 | 依存する入力項目の更新 | combineLatest, withLatestFrom |
| コンポーネント間通信 | イベントバスやカスタム状態管理 | Subject, share |
| UIイベント処理 | クリック、スクロール、ドラッグ&ドロップ | fromEvent, takeUntil |
簡単な例
import { fromEvent, combineLatest } from 'rxjs';
import { debounceTime, map, switchMap } from 'rxjs';
const searchInput = document.querySelector('#search') as HTMLInputElement;
const sortSelect = document.querySelector('#sort') as HTMLInputElement;
const search$ = fromEvent(searchInput, 'input').pipe(
map(e => (e.target as HTMLInputElement).value)
);
const sort$ = fromEvent(sortSelect, 'change').pipe(
map(e => (e.target as HTMLSelectElement).value)
);
combineLatest([search$, sort$]).pipe(
debounceTime(300),
switchMap(([query, order]) =>
fetch(`/api/search?q=${query}&sort=${order}`).then(r => r.json())
)
).subscribe(results => console.log(results));オフライン対応・PWA
Progressive Web App(PWA)でのオフライン対応やネットワーク状態管理に活用できます。
| 用途 | 説明 | 主要なオペレーター |
|---|---|---|
| ネットワーク状態監視 | オンライン/オフライン検出 | fromEvent, merge |
| オフライン時の再試行 | 接続復帰時の自動再同期 | retry, retryWhen |
| キャッシュ制御 | Service Workerとの連携 | switchMap, catchError |
簡単な例
import { fromEvent, merge } from 'rxjs';
import { map, startWith } from 'rxjs';
const online$ = fromEvent(window, 'online').pipe(map(() => true));
const offline$ = fromEvent(window, 'offline').pipe(map(() => false));
merge(online$, offline$).pipe(
startWith(navigator.onLine)
).subscribe(isOnline => {
console.log(isOnline ? 'オンライン' : 'オフライン');
});AI/ストリーミングAPI
OpenAIなどのストリーミングAPIレスポンスを扱う場合にも最適です。
| 用途 | 説明 | 主要なオペレーター |
|---|---|---|
| トークン逐次出力 | AIレスポンスのリアルタイム表示 | concatMap, scan |
| ストリーミング処理 | Server-Sent Eventsの処理 | fromEvent, map |
| バックエンド統合 | NestJS(RxJS標準搭載)での利用 | 各種オペレーター |
HTTP通信とエラー処理
非同期HTTP通信をエレガントに扱えます。
| 用途 | 説明 | 主要なオペレーター |
|---|---|---|
| APIリクエスト | RESTful APIとの通信 | switchMap, mergeMap |
| エラーハンドリング | リトライやフォールバック | catchError, retry |
| タイムアウト制御 | 応答時間の制限 | timeout |
| キャンセル | 不要なリクエストの中断 | takeUntil, unsubscribe() |
状態管理・アーキテクチャ
アプリケーション全体のアーキテクチャ設計にも活用できます。
| 用途 | 説明 | 主要なオペレーター |
|---|---|---|
| 状態管理ライブラリ | NgRx、Redux-Observableなど | scan, share |
| イベントフロー管理 | DDD(ドメイン駆動設計)での活用 | Subject, shareReplay |
| データレイヤー分離 | クリーンアーキテクチャ | 各種オペレーター |
TIP
PromiseとRxJSの使い分けについては、PromiseとRxJSの違いも参照してください。
まとめ
RxJSは、非同期およびイベントベースのプログラミングに対する強力なアプローチを提供します。Observableを中心としたデータストリームの考え方は、複雑な非同期処理を扱う際に特に役立ちます。
実装としては Subscriber クラスが使われます。詳しくは Observer と Subscriber の違い を参照してください。 ↩︎
Subjectは、値を発行するObservableであると同時に、値を受け取るObserverとしても振る舞える特殊な存在です。 ↩︎
Schedulerは、非同期処理の実行タイミングやコンテキストを制御するために使われ、デバッグやパフォーマンス管理にも役立ちます。 ↩︎
↩︎- 宣言的コード: 「どういう結果が欲しいのか」を素直に書くコード
- 手続き的コード: 「どういう計算を行っていけば欲しい結果が手に入るのか」を書くコード