Skip to content

RxJSとは何か?

概要

RxJS(Reactive Extensions for JavaScript)とは、「リアクティブプログラミング」を JavaScript で行うためのライブラリです。

リアクティブプログラミングとは?

リアクティブプログラミングは、データの変化に応じて自動的に更新されるプログラムの作成方法です。 イベント駆動型プログラミングの一種であり、特に非同期なデータストリームの扱いに焦点を当てています。 データの流れ(ストリーム)を中心に考え、その流れに対して反応(リアクション)する方式でプログラムを構築します。

つまり、RxJSはイベントや非同期データの流れ(ストリーム)を、関数型スタイルで扱うためのライブラリです。Observableパターンを利用して、非同期データストリームを扱うための強力なツールを提供します。

Observableとは、イベントや非同期データの流れ(ストリーム)を表現するRxJSの中核的な構成要素です。値が「流れてくる」源であり、購読(subscribe)によって値を受け取ることができます。Observableとは、時間の経過とともに値を発行する「データの流れ(ストリーム)」です。購読(subscribe)することで、その値を受け取ることができます。

TIP

「ストリームってそもそも何?」という方は、ストリームとは? も参照してみてください。

簡単な使用例

ts
import { fromEvent } from 'rxjs';

fromEvent(document, 'click').subscribe(event => {
  console.log('クリックされました:', event);
});

RxJSの基本構成要素

RxJSを使いこなすには、以下の中核的な構成要素を理解することが重要です。

構成要素概要
Observable非同期または時間に沿って発生するデータを表すストリームの源です。
Observer[1]Observableからデータを購読して受け取る側の存在です。
SubscriptionObservableの購読と解除の管理を行います。
Creation FunctionsObservableを作成・結合するための関数群です。
OperatorObservableを変換・制御するための関数群です。
Subject[2]ObservableとObserverの両方の性質を持つ中継器です。
Scheduler[3]Observableの実行タイミングを制御する仕組みです。

これらはそれぞれ独立した機能を持ちながらも連携して動作します。 たとえば、Creation FunctionsでObservableを作成・結合し、Operatorで変換・制御し、Observerが購読し、Schedulerで実行タイミングを制御する、といった形で、全体としてストリーム処理を構成します。

RxJSの構成要素とデータフロー

※ 各構成要素の詳細な使い方や例については、それぞれの専用章で個別に解説します。

構成クラス図

RxJSの利点

利点内容
宣言的コード[4]map, filter などで「何をしたいか」を記述し、forループなどの手続き的記述を避けられる
非同期処理の単純化Promise やコールバックのネストを避け、直感的な流れで書ける
エラー処理.pipe(catchError(...)) などでストリーム中のエラーを統一的に処理可能
キャンセル可能Subscription.unsubscribe() によりストリームの中断が可能
多様なオペレーターdebounceTime, mergeMap, combineLatest など多数の演算子で変換や合成が可能

ユースケース

RxJSは「時間とともに変化するデータ」を扱うあらゆる場面で活躍します。以下に主要な活用分野を紹介します。

リアルタイム通信・ストリーミング

WebSocketやServer-Sent Events(SSE)などのリアルタイム通信を扱う場合、RxJSは特に強力です。

用途説明主要なオペレーター
WebSocket通信チャット、通知、株価更新などwebSocket, filter, map
Server-Sent Eventsサーバーからのプッシュ通知fromEvent, retry
IoTセンサー監視連続的なセンサーデータの処理debounceTime, distinctUntilChanged

簡単な例

ts
import { webSocket } from 'rxjs/webSocket';
import { filter } from 'rxjs';

const socket$ = webSocket('wss://example.com/chat');

socket$.pipe(
  filter(msg => msg.type === 'message')
).subscribe(msg => console.log('新着:', msg.text));

UI/状態管理・フォーム制御

ユーザー入力や状態の変化をリアクティブに扱うことができます。

用途説明主要なオペレーター
入力フォームの制御検索補完、リアルタイムバリデーションdebounceTime, distinctUntilChanged, switchMap
複数フォーム項目の連携依存する入力項目の更新combineLatest, withLatestFrom
コンポーネント間通信イベントバスやカスタム状態管理Subject, share
UIイベント処理クリック、スクロール、ドラッグ&ドロップfromEvent, takeUntil

簡単な例

ts
import { fromEvent, combineLatest } from 'rxjs';
import { debounceTime, map, switchMap } from 'rxjs';

const searchInput = document.querySelector('#search') as HTMLInputElement;
const sortSelect = document.querySelector('#sort') as HTMLInputElement;

const search$ = fromEvent(searchInput, 'input').pipe(
  map(e => (e.target as HTMLInputElement).value)
);

const sort$ = fromEvent(sortSelect, 'change').pipe(
  map(e => (e.target as HTMLSelectElement).value)
);

combineLatest([search$, sort$]).pipe(
  debounceTime(300),
  switchMap(([query, order]) =>
    fetch(`/api/search?q=${query}&sort=${order}`).then(r => r.json())
  )
).subscribe(results => console.log(results));

オフライン対応・PWA

Progressive Web App(PWA)でのオフライン対応やネットワーク状態管理に活用できます。

用途説明主要なオペレーター
ネットワーク状態監視オンライン/オフライン検出fromEvent, merge
オフライン時の再試行接続復帰時の自動再同期retry, retryWhen
キャッシュ制御Service Workerとの連携switchMap, catchError

簡単な例

ts
import { fromEvent, merge } from 'rxjs';
import { map, startWith } from 'rxjs';

const online$ = fromEvent(window, 'online').pipe(map(() => true));
const offline$ = fromEvent(window, 'offline').pipe(map(() => false));

merge(online$, offline$).pipe(
  startWith(navigator.onLine)
).subscribe(isOnline => {
  console.log(isOnline ? 'オンライン' : 'オフライン');
});

AI/ストリーミングAPI

OpenAIなどのストリーミングAPIレスポンスを扱う場合にも最適です。

用途説明主要なオペレーター
トークン逐次出力AIレスポンスのリアルタイム表示concatMap, scan
ストリーミング処理Server-Sent Eventsの処理fromEvent, map
バックエンド統合NestJS(RxJS標準搭載)での利用各種オペレーター

HTTP通信とエラー処理

非同期HTTP通信をエレガントに扱えます。

用途説明主要なオペレーター
APIリクエストRESTful APIとの通信switchMap, mergeMap
エラーハンドリングリトライやフォールバックcatchError, retry
タイムアウト制御応答時間の制限timeout
キャンセル不要なリクエストの中断takeUntil, unsubscribe()

状態管理・アーキテクチャ

アプリケーション全体のアーキテクチャ設計にも活用できます。

用途説明主要なオペレーター
状態管理ライブラリNgRx、Redux-Observableなどscan, share
イベントフロー管理DDD(ドメイン駆動設計)での活用Subject, shareReplay
データレイヤー分離クリーンアーキテクチャ各種オペレーター

TIP

PromiseとRxJSの使い分けについては、PromiseとRxJSの違いも参照してください。

まとめ

RxJSは、非同期およびイベントベースのプログラミングに対する強力なアプローチを提供します。Observableを中心としたデータストリームの考え方は、複雑な非同期処理を扱う際に特に役立ちます。


  1. 実装としては Subscriber クラスが使われます。詳しくは Observer と Subscriber の違い を参照してください。 ↩︎

  2. Subjectは、値を発行するObservableであると同時に、値を受け取るObserverとしても振る舞える特殊な存在です。 ↩︎

  3. Schedulerは、非同期処理の実行タイミングやコンテキストを制御するために使われ、デバッグやパフォーマンス管理にも役立ちます。 ↩︎

    • 宣言的コード: 「どういう結果が欲しいのか」を素直に書くコード
    • 手続き的コード: 「どういう計算を行っていけば欲しい結果が手に入るのか」を書くコード
    ↩︎

Released under the CC-BY-4.0 license.