コールドObservableとホットObservable
RxJSを使用する上で重要な概念の一つが、「コールドObservable」と「ホットObservable」の区別です。この違いを理解することは、効率的なObservableの使い方を習得するために不可欠です。
なぜCold/Hotの理解が重要か
Cold/Hotの違いを理解していないと、以下のような問題に直面します。
- 意図しない重複実行 - API呼び出しが複数回実行される
- メモリリーク - 購読を適切に管理できない
- パフォーマンス問題 - 不要な処理が繰り返される
- データ不整合 - 期待したデータが受け取れない
Cold vs Hotの違い(比較表)
まず、全体像を把握しましょう。
| 比較項目 | Cold Observable | Hot Observable |
|---|---|---|
| 購読なしでの実行 | 実行されない(購読されて初めて実行) | 実行される(subscribeされなくとも値を流す) |
| データ発行タイミング | subscribe() された時に開始 | 発行側のタイミングで開始(購読と無関係) |
| 実行の再利用 | 毎回新たに実行される | 既存のストリームを複数で共有 |
| データの一貫性 | 各購読で独立した値を受け取る | 途中から購読すると過去の値を受け取れない |
| 主な使用例 | HTTPリクエスト、非同期処理 | UIイベント、WebSocket、リアルタイム通信 |
| 使用場面 | 各処理が独立している場合 | 状態共有、イベントブロードキャスト |
判断基準: 各購読者に対して処理を再実行すべきか?それともストリームを共有すべきか?
Cold vs Hotの判断基準
実際にObservableがColdかHotかを見分けるには、以下の基準で判断できます。
| 判断ポイント | Cold | Hot |
|---|---|---|
| 購読ごとに実行ロジックが再実行されるか? | ✅ 毎回再実行 | ❌ 実行を共有 |
| 購読前にデータが流れているか? | ❌ 購読されるまで待機 | ✅ 購読と無関係に流れる |
| 複数購読で同じデータを受け取るか? | ❌ 独立したデータ | ✅ 同じデータを共有 |
実践的な見分け方
以下のテストで簡単に判断できます。
const observable$ = /* 調べたいObservable */;
observable$.subscribe(/* 購読1 */);
observable$.subscribe(/* 購読2 */);
// ✅ Cold: Observable内のconsole.logが2回実行される
// (購読ごとに実行ロジックが再実行される)
// ✅ Hot: Observable内のconsole.logが1回だけ実行される
// (実行が共有される)具体例:
import { Observable, Subject } from 'rxjs';
// Cold Observable
const cold$ = new Observable(subscriber => {
console.log('Cold: 実行開始');
subscriber.next(Math.random());
});
cold$.subscribe(v => console.log('購読1:', v));
cold$.subscribe(v => console.log('購読2:', v));
// 出力:
// Cold: 実行開始 ← 1回目
// 購読1: 0.123...
// Cold: 実行開始 ← 2回目(再実行される)
// 購読2: 0.456...
// Hot Observable
const hot$ = new Subject();
hot$.subscribe(v => console.log('購読1:', v));
hot$.subscribe(v => console.log('購読2:', v));
hot$.next(1); // データ発行は1回だけ
// 出力:
// 購読1: 1
// 購読2: 1 ← 同じデータを共有Creation Function別 Cold/Hot分類表
すべての主要なCreation Functionについて、Cold/Hotを分類します。これにより、どの関数がどちらのObservableを生成するか一目で分かります。
| カテゴリ | Creation Function | Cold/Hot | 備考 |
|---|---|---|---|
| 基本作成系 | of() | ❄️ Cold | 購読ごとに値を再発行 |
from() | ❄️ Cold | 購読ごとに配列/Promiseを再実行 | |
fromEvent() | ❄️ Cold | 購読ごとに独立したリスナーを追加 [1] | |
interval() | ❄️ Cold | 購読ごとに独立したタイマー | |
timer() | ❄️ Cold | 購読ごとに独立したタイマー | |
| ループ生成系 | range() | ❄️ Cold | 購読ごとに範囲を再生成 |
generate() | ❄️ Cold | 購読ごとにループを再実行 | |
| HTTP通信系 | ajax() | ❄️ Cold | 購読ごとに新しいHTTPリクエスト |
fromFetch() | ❄️ Cold | 購読ごとに新しいFetchリクエスト | |
| 結合系 | concat() | ❄️ Cold | 元のObservableの性質を引き継ぐ [2] |
merge() | ❄️ Cold | 元のObservableの性質を引き継ぐ [2:1] | |
combineLatest() | ❄️ Cold | 元のObservableの性質を引き継ぐ [2:2] | |
zip() | ❄️ Cold | 元のObservableの性質を引き継ぐ [2:3] | |
forkJoin() | ❄️ Cold | 元のObservableの性質を引き継ぐ [2:4] | |
| 選択・分割系 | race() | ❄️ Cold | 元のObservableの性質を引き継ぐ [2:5] |
partition() | ❄️ Cold | 元のObservableの性質を引き継ぐ [2:6] | |
| 条件分岐系 | iif() | ❄️ Cold | 条件により選択されたObservableの性質を引き継ぐ |
defer() | ❄️ Cold | 購読ごとにファクトリ関数を実行 | |
| 制御系 | scheduled() | ❄️ Cold | 元のObservableの性質を引き継ぐ |
using() | ❄️ Cold | 購読ごとにリソースを作成 | |
| Subject系 | new Subject() | 🔥 Hot | 常にHot |
new BehaviorSubject() | 🔥 Hot | 常にHot | |
new ReplaySubject() | 🔥 Hot | 常にHot | |
new AsyncSubject() | 🔥 Hot | 常にHot | |
| WebSocket | webSocket() | 🔥 Hot | WebSocket接続を共有 |
重要な原則
ほぼすべてのCreation FunctionはColdを生成します。 Hotを生成するのは、
- Subject系(Subject, BehaviorSubject, ReplaySubject, AsyncSubject)
- webSocket()
のみです。
コールドObservable
特徴
- 購読されるたびに新しいデータストリームが作成される
- 購読されるまでデータの発行を開始しない(遅延実行)
- すべてのサブスクライバーは、Observableの最初から全データを受け取る
Cold Observableは、subscribeするたびに新しい実行コンテキストが生成されます。 これはHTTPリクエストや非同期処理など、毎回新しい処理が必要な場合に適しています。
コード例
import { Observable } from 'rxjs';
// コールドObservableの例
const cold$ = new Observable<number>(subscriber => {
console.log('データソースの作成 - 新しい購読');
const randomValue = Math.random();
subscriber.next(randomValue);
subscriber.complete();
});
// 1回目の購読
console.log('--- 1回目の購読 ---');
cold$.subscribe(value => console.log('購読者1:', value));
// 2回目の購読(異なるデータが生成される)
console.log('--- 2回目の購読 ---');
cold$.subscribe(value => console.log('購読者2:', value));実行結果
--- 1回目の購読 ---
データソースの作成 - 新しい購読
購読者1: 0.259632...
--- 2回目の購読 ---
データソースの作成 - 新しい購読 ← 再実行される
購読者2: 0.744322... ← 異なる値重要なポイント
購読するたびに「データソースの作成」が実行され、異なる値が生成されます。
よくあるコールドObservable(見分け方)
以下のObservableは通常Coldです。
import { of, from, interval, timer } from 'rxjs';
import { ajax } from 'rxjs/ajax';
// Creation Functions
of(1, 2, 3) // Cold
from([1, 2, 3]) // Cold
from(fetch('/api/data')) // Cold
// 時間オペレーター
interval(1000) // Cold
timer(1000) // Cold
// HTTP リクエスト
ajax('/api/users') // Coldルール
Creation Functions、時間オペレーター、HTTPリクエストは基本的にCold
ホットObservable
特徴
- subscribeされなくとも値を流す(購読の有無に関わらず実行される)
- 購読を開始した時点以降のデータのみを受け取る
- 一つのデータソースが複数のサブスクライバーに共有される
Hot Observableは、ストリームの発行タイミングが購読とは無関係であり、購読者は途中から参加することになります。
コード例
import { Subject } from 'rxjs';
// ホットObservableの例(Subject使用)
const hot$ = new Subject<number>();
// 最初の購読
console.log('--- 購読者1 開始 ---');
hot$.subscribe(value => console.log('購読者1:', value));
// データ発行
hot$.next(1);
hot$.next(2);
// 2番目の購読(後からの購読)
console.log('--- 購読者2 開始 ---');
hot$.subscribe(value => console.log('購読者2:', value));
// さらにデータ発行
hot$.next(3);
hot$.next(4);
hot$.complete();実行結果
--- 購読者1 開始 ---
購読者1: 1
購読者1: 2
--- 購読者2 開始 ---
購読者1: 3
購読者2: 3 ← 購読2は3から参加(1, 2は受け取れない)
購読者1: 4
購読者2: 4重要なポイント
購読者2は途中から参加したため、過去の値(1, 2)は受け取れません。
よくあるホットObservable(見分け方)
以下のObservableは常にHotです。
import { Subject, BehaviorSubject, ReplaySubject } from 'rxjs';
import { webSocket } from 'rxjs/webSocket';
// Subject系(常にHot)
new Subject() // Hot
new BehaviorSubject(0) // Hot
new ReplaySubject(1) // Hot
// WebSocket(常にHot)
webSocket('ws://localhost:8080') // Hotルール
Hotを生成するのはSubject系とwebSocket()のみ
fromEvent()はColdです
fromEvent(button, 'click') はHotと誤解されやすいですが、実際はColdです。各購読ごとに独立したイベントリスナーを追加します。イベント自体は購読と無関係に発生しますが、各購読者は独立したリスナーを持ちます。
コールドObservableをホットに変換する方法
RxJSでは、Cold ObservableをHotに変換する手段として主に以下が使われます。
share()- 簡易ホット化(推奨)shareReplay()- 過去の値をキャッシュしてホット化- 非推奨(RxJS v7で非推奨、v8で削除)multicast()
share()オペレーター
share()は、コールドObservableをホットObservableに変換する最も一般的な方法です。
import { interval } from 'rxjs';
import { share, take } from 'rxjs';
// HTTP呼び出しをシミュレート
const makeHttpRequest = () => {
console.log('HTTP呼び出し実行!');
return interval(1000).pipe(take(3));
};
// ❌ コールドObservable(共有なし)
const cold$ = makeHttpRequest();
cold$.subscribe(val => console.log('購読者1:', val));
cold$.subscribe(val => console.log('購読者2:', val));
// → HTTP呼び出しが2回実行される
// ✅ ホットObservable(share使用)
const shared$ = makeHttpRequest().pipe(share());
shared$.subscribe(val => console.log('共有購読者1:', val));
shared$.subscribe(val => console.log('共有購読者2:', val));
// → HTTP呼び出しは1回だけ、結果を共有実行結果(Cold):
HTTP呼び出し実行! ← 1回目
購読者1: 0
HTTP呼び出し実行! ← 2回目(重複!)
購読者2: 0
...実行結果(Hot):
HTTP呼び出し実行! ← 1回だけ
共有購読者1: 0
共有購読者2: 0 ← 同じストリームを共有
...ユースケース
- 複数コンポーネントで同じAPI結果を使う
- 副作用(HTTP呼び出しなど)の重複を避ける
shareReplay()オペレーター
shareReplay()はshare()の拡張版で、過去の値をキャッシュして新しいサブスクライバーに再生します。
import { interval } from 'rxjs';
import { shareReplay, take } from 'rxjs';
const request$ = interval(1000).pipe(
take(3),
shareReplay(2) // 最後の2つの値をキャッシュ
);
// 1回目の購読
request$.subscribe(val => console.log('購読者1:', val));
// 3.5秒後に2回目の購読(ストリーム完了後)
setTimeout(() => {
console.log('--- 購読者2開始(完了後) ---');
request$.subscribe(val => console.log('購読者2:', val));
}, 3500);実行結果
購読者1: 0
購読者1: 1
購読者1: 2
--- 購読者2開始(完了後) ---
購読者2: 1 ← キャッシュされた値(最後の2つ)
購読者2: 2 ← キャッシュされた値ユースケース
- API結果のキャッシュ
- 初期状態の共有(最新の1件のみキャッシュ)
- 遅延購読者への過去データ提供
shareReplayの注意点
shareReplay()は購読が0になってもキャッシュを保持し続けるため、メモリリークの原因になることがあります。詳しくは Chapter 10: shareReplayの誤用 を参照してください。
multicast()について
NOTE
multicast() は柔軟ですが、RxJS v7で非推奨となり、v8で削除されました。現在は share() や shareReplay() を使用してください。詳しくは share()オペレーター解説 を参照してください。
実践的な例: APIキャッシュサービス
実際のアプリケーションでよくあるパターン:複数のコンポーネントが同じAPIデータを必要とする場合。
import { Observable, of, throwError } from 'rxjs';
import { catchError, shareReplay, delay, tap } from 'rxjs';
// シンプルなキャッシュサービス
class UserService {
private cache$: Observable<User[]> | null = null;
getUsers(): Observable<User[]> {
// キャッシュがあればそれを返す
if (this.cache$) {
console.log('キャッシュから返却');
return this.cache$;
}
// 新しいリクエストを作成しキャッシュ
console.log('新規リクエスト実行');
this.cache$ = this.fetchUsersFromAPI().pipe(
catchError(err => {
this.cache$ = null; // エラー時はキャッシュをクリア
return throwError(() => err);
}),
shareReplay(1) // 最後の結果をキャッシュ
);
return this.cache$;
}
private fetchUsersFromAPI(): Observable<User[]> {
// 実際のAPIリクエストをシミュレート
return of([
{ id: 1, name: '山田太郎' },
{ id: 2, name: '佐藤花子' }
]).pipe(
delay(1000),
tap(() => console.log('APIからデータ受信'))
);
}
clearCache(): void {
this.cache$ = null;
console.log('キャッシュクリア');
}
}
interface User {
id: number;
name: string;
}
// 使用例
const userService = new UserService();
// コンポーネント1: データを要求
userService.getUsers().subscribe(users =>
console.log('コンポーネント1:', users)
);
// コンポーネント2: 2秒後にデータを要求
setTimeout(() => {
userService.getUsers().subscribe(users =>
console.log('コンポーネント2:', users)
);
}, 2000);
// キャッシュをクリアして再度要求
setTimeout(() => {
userService.clearCache();
userService.getUsers().subscribe(users =>
console.log('コンポーネント3:', users)
);
}, 4000);実行結果
新規リクエスト実行
APIからデータ受信
コンポーネント1: [{id: 1, name: '山田太郎'}, {id: 2, name: '佐藤花子'}]
キャッシュから返却 ← API呼び出しなし
コンポーネント2: [{id: 1, name: '山田太郎'}, {id: 2, name: '佐藤花子'}]
キャッシュクリア
新規リクエスト実行 ← 再度API呼び出し
APIからデータ受信
コンポーネント3: [{id: 1, name: '山田太郎'}, {id: 2, name: '佐藤花子'}]ポイント:
shareReplay(1)で最後のレスポンスをキャッシュ- 複数のコンポーネントがデータを共有(API呼び出しは1回のみ)
- エラー時やクリア時は適切にキャッシュを破棄
使用するタイミング
まとめ
コールドObservableとホットObservableを理解し、適切に使い分けることは、効率的なRxJSアプリケーションを構築するための重要なスキルです。
重要なポイント
- Cold Observable: 購読されて初めて動き出すストリーム(購読ごとに独立実行)
- Hot Observable: すでに動いているストリームを共有(複数購読で同じ実行)
- share(): ColdをHotに変換する最も簡単な方法
- shareReplay(): 過去の値をキャッシュしてHotに変換(API結果の共有に便利)
設計判断の基準
- 複数のサブスクライバー間でデータを共有する必要があるか?
- 過去の値をキャッシュし、新しいサブスクライバーに提供する必要があるか?
- 副作用(HTTPリクエストなど)の重複をどのように管理するか?
これらの考慮事項を元に、適切なObservableの種類とオペレーターを選択することで、効率的で堅牢なリアクティブアプリケーションを構築できます。
関連セクション
- share()オペレーター - share()の詳細解説
- shareReplayの誤用 - よくある間違いと対処法
- Subject - HotなSubjectの理解