Types of schedulers and how to use them
RxJS provides multiple schedulers for different applications. Each scheduler has its own execution timing and characteristics and can be used appropriately to optimize application performance and behavior.
Classification of schedulers
RxJS schedulers fall into three main categories.
- Macro Task: executed in the next task queue in the event loop
- Micro-task: executed immediately after the current task is completed and before the next task starts
- Synchronous processing: immediate execution
For more information, please refer to Task and Scheduler Basics for details.
Major schedulers
asyncScheduler
Features
- Internal implementation: uses setTimeout
- Execution timing: macro tasks
- Usage: General asynchronous processing, time-lapse processing
import { of, asyncScheduler } from 'rxjs';
import { observeOn } from 'rxjs';
console.log('1: Start');
of('Asynchronous processing')
.pipe(observeOn(asyncScheduler))
.subscribe(value => console.log(`3: ${value}`));
console.log('2: End');
// Output:
// 1: Start
// 2: End
// 3: Asynchronous processingUse Cases
import { asyncScheduler, map, observeOn, of } from "rxjs";
function heavyComputation(value: number): number {
// Simulate heavy computation
let result = value;
for (let i = 0; i < 1000000; i++) {
result = Math.sin(result);
}
return result;
}
of(1, 2, 3)
.pipe(
observeOn(asyncScheduler),
map(value => heavyComputation(value))
)
.subscribe(result => {
console.log(`Calculation results: ${result}`);
});queueScheduler
Features
- Internal implementation: micro task queue
- Execution timing: within the current task (appears synchronous)
- Usage: Task queuing, recursion optimization
import { of, queueScheduler } from 'rxjs';
import { observeOn } from 'rxjs';
console.log('1: Start');
of('Queue processing')
.pipe(observeOn(queueScheduler))
.subscribe(value => console.log(`2: ${value}`));
console.log('3: End');
// Output:
// 1: Start
// 2: Queue processing
// 3: EndUse Cases
import { Observable, of, queueScheduler } from 'rxjs';
import { observeOn, expand, take, map } from 'rxjs';
// Optimize recursive processing
function fibonacci(n: number): Observable<number> {
return of([0, 1]).pipe(
observeOn(queueScheduler),
expand(([a, b]) => of([b, a + b])),
map(([a]) => a),
take(n)
);
}
fibonacci(10).subscribe(value => console.log(value));asapScheduler
Features
- Internal implementation: Promise.resolve().then() or setImmediate
- Execution timing: microtasks
- Use: For asynchronous execution as soon as possible
import { of, asapScheduler } from 'rxjs';
import { observeOn } from 'rxjs';
console.log('1: Start');
of('ASAPProcessing')
.pipe(observeOn(asapScheduler))
.subscribe(value => console.log(`3: ${value}`));
console.log('2: End');
// Output:
// 1: Start
// 2: End
// 3: ASAPProcessingUse Cases
import { fromEvent, asapScheduler } from 'rxjs';
import { observeOn, map } from 'rxjs';
// Optimization of mouse movement events
fromEvent(document, 'mousemove')
.pipe(
observeOn(asapScheduler),
map(event => ({
x: (event as MouseEvent).clientX,
y: (event as MouseEvent).clientY
}))
)
.subscribe(position => {
// UIUpdate processing of
updateCursor(position);
});animationFrameScheduler
Features
- Internal implementation: requestAnimationFrame
- Execution timing: before next screen rendering
- Use: Animation, drawing process for 60fps
Example of a simple rotation animation
import { animationFrameScheduler, interval } from 'rxjs';
import { take, map } from 'rxjs';
// HTMLCreating elements
const box = document.createElement('div');
box.style.width = '100px';
box.style.height = '100px';
box.style.backgroundColor = 'blue';
box.style.position = 'absolute';
box.style.top = '100px';
box.style.left = '100px';
document.body.appendChild(box);
// Setting up animations
let rotation = 0;
// 60fpsIn2Seconds of animation
interval(0, animationFrameScheduler)
.pipe(
take(120), // 60fps × 2Seconds = 120Frames
map(() => {
rotation += 3; // 1Every frame3Degrees of rotation
return rotation;
})
)
.subscribe(angle => {
// DOMActual rotation of element
box.style.transform = `rotate(${angle}deg)`;
});Why animationFrameScheduler is needed
The animationFrameScheduler performs synchronously with the browser's drawing cycle, which offers the following advantages
- Smooth Animation: Because processing is performed in sync with the browser's rendering timing (typically 60 fps), you can achieve smooth animations with no choppiness. 2.
- Efficient resource use: When the browser deactivates the tab, the execution of requestAnimationFrame is automatically paused to avoid unnecessary CPU usage. 3.
- Anti-flickering: Ensures computation is completed before the screen is drawn, preventing screen flickering and displaying incomplete frames.
The following is a comparison of setInterval and animationFrameScheduler.
import { animationFrameScheduler, interval, map } from "rxjs";
// ❌ setIntervalInefficient animation using
let position = 0;
const intervalId = setInterval(() => {
position += 1;
element.style.transform = `translateX(${position}px)`;
}, 16); // Approx.60fps
// Problems:
// - Out of sync with browser rendering timing
// - Continues to run even in background tabs
// - Cannot guarantee accurate60fpscannot be guaranteed
// ✅ animationFrameSchedulerEfficient animation using
interval(0, animationFrameScheduler)
.pipe(
map(() => {
position += 1;
return position;
})
)
.subscribe(pos => {
element.style.transform = `translateX(${pos}px)`;
});
// Advantages
// - Synchronized with browser rendering timing
// - Automatically paused in background tabs
// - Stable60fpsStable animationsExample of mouse-following animation
import { fromEvent, animationFrameScheduler, interval } from 'rxjs';
import { withLatestFrom, observeOn, map } from 'rxjs';
// Create circles that follow
const circle = document.createElement('div');
circle.style.width = '30px';
circle.style.height = '30px';
circle.style.borderRadius = '50%';
circle.style.backgroundColor = 'red';
circle.style.position = 'fixed';
circle.style.pointerEvents = 'none'; // Transparent to mouse events
document.body.appendChild(circle);
// Current and target positions
let currentX = 0;
let currentY = 0;
let targetX = 0;
let targetY = 0;
// Monitor mouse movement events
const mouseMove$ = fromEvent<MouseEvent>(document, 'mousemove')
.pipe(
map(event => ({
x: event.clientX,
y: event.clientY
}))
);
// Animation loop
interval(0, animationFrameScheduler)
.pipe(
withLatestFrom(mouseMove$),
map(([_, mousePos]) => mousePos)
)
.subscribe(({ x, y }) => {
// Set mouse position as target
targetX = x;
targetY = y;
// Gradual movement (easing) from current position to target position々Gradual movement from current position to target position (easing)
currentX += (targetX - currentX) * 0.1;
currentY += (targetY - currentY) * 0.1;
// DOMUpdate element
circle.style.left = `${currentX - 15}px`; // Adjust to center position
circle.style.top = `${currentY - 15}px`;
});Guide to using the scheduler
Comparison by execution timing
import { of, asyncScheduler } from 'rxjs';
import { observeOn } from 'rxjs';
console.log('1: Start');
of('Asynchronous processing')
.pipe(observeOn(asyncScheduler))
.subscribe(value => console.log(`3: ${value}`));
console.log('2: End');
// Output:
// 1: Start
// 2: End
// 3: Asynchronous processingSelection Criteria by Application
Practical use examples
Processing large amounts of data
import { from, queueScheduler } from 'rxjs';
import { mergeMap, observeOn, tap } from 'rxjs';
interface ApiRequest {
endpoint: string;
id: number;
}
const requests: ApiRequest[] = [
{ endpoint: '/users', id: 1 },
{ endpoint: '/posts', id: 1 },
{ endpoint: '/comments', id: 1 },
];
// リクエストをキューに入れて順番に処理
from(requests)
.pipe(
observeOn(queueScheduler),
tap((req) => console.log(`キューに追加: ${req.endpoint}`)),
mergeMap(
(req) =>
// 実際のAPIリクエストのシミュレーション
new Promise((resolve) => {
setTimeout(() => {
resolve(`${req.endpoint}/${req.id} のresult`);
}, 1000);
})
)
)
.subscribe((result) => console.log(`completed: ${result}`));Scheduler utilization with exponential backoff
For more advanced control, exponential backoff can be implemented by combining retryWhen and asyncScheduler.
import { throwError, timer, of } from 'rxjs';
import { retry, mergeMap } from 'rxjs';
function fetchDataWithBackoff(id: number) {
return of(id).pipe(
mergeMap(() => {
const random = Math.random();
if (random > 0.9) {
return of({ id, data: 'success' });
}
return throwError(() => new Error('Temporary error'));
})
);
}
fetchDataWithBackoff(1)
.pipe(
// RxJS 7.3+ 推奨: retry({ count, delay }) 形式
retry({
count: 3, // Max.3回まで再試行
delay: (error, retryCount) => {
// 指数バックオフ: 1秒, 2秒, 4秒...
const delayTime = Math.pow(2, retryCount - 1) * 1000;
console.log(`🔄 リトライ ${retryCount}回目 (${delayTime}ms後)`);
// timer は内部的に asyncScheduler を使用
return timer(delayTime);
}
})
)
.subscribe({
next: result => console.log('✅ 成功:', result),
error: error => {
console.log('❌ Max.リトライ数に到達');
console.log('❌ 最終Error:', error.message);
}
});
// OutputEx.:
// 🔄 リトライ 1回目 (1000ms後)
// 🔄 リトライ 2回目 (2000ms後)
// 🔄 リトライ 3回目 (4000ms後)
// ❌ Max.リトライ数に到達
// ❌ 最終Error: Temporary errorWhen explicitly specifying an asyncScheduler
Explicitly specifying a specific scheduler allows for more flexible control, such as replacing it with TestScheduler during testing.
```ts
import { of, asyncScheduler } from 'rxjs';
import { observeOn } from 'rxjs';
console.log('1: Start');
of('Asynchronous processing')
.pipe(observeOn(asyncScheduler))
.subscribe(value => console.log(`3: ${value}`));
console.log('2: End');
// Output:
// 1: Start
// 2: End
// 3: Asynchronous processingDetailed implementation patterns and debugging methods for retry processing are described in the retry and catchError page.
- Detailed usage of the retry operator
- Combination patterns with catchError
- Debugging techniques for retries (tracking the number of attempts, logging, etc.)
Performance Impact
Scheduler overhead
import { of, asyncScheduler } from 'rxjs';
import { observeOn } from 'rxjs';
console.log('1: Start');
of('Asynchronous processing')
.pipe(observeOn(asyncScheduler))
.subscribe(value => console.log(`3: ${value}`));
console.log('2: End');
// Output:
// 1: Start
// 2: End
// 3: Asynchronous processingSummary
The choice of scheduler has a significant impact on the performance and responsiveness of an application. Understanding the characteristics of each scheduler and using them in appropriate situations will ensure efficient and smooth operation. As a general guideline,
- For general asynchronous processing, use
asyncScheduler. - queueScheduler` for recursive processing and synchronous queuing
asapSchedulerfor fast response times- animationFrameScheduler` for animation
for animation.