Controle van asynchrone verwerking
Schedulers in RxJS zijn een belangrijk mechanisme voor het controleren van de timing en uitvoeringscontext van asynchrone verwerking. In dit hoofdstuk leggen we uit hoe je asynchrone verwerking kunt controleren met behulp van schedulers.
De rol van schedulers
Schedulers vervullen drie belangrijke rollen:
| Rol | Beschrijving |
|---|---|
| Controle van uitvoeringstiming | Bepalen wanneer taken worden uitgevoerd |
| Beheer van uitvoeringscontext | Bepalen in welke thread of uitvoeringsomgeving taken worden uitgevoerd |
| Taakprioritering | Beheren van de uitvoeringsvolgorde van meerdere taken |
Begrip van synchrone en asynchrone verwerking
Standaardgedrag (synchrone uitvoering)
RxJS-operators worden standaard waar mogelijk synchron uitgevoerd.
import { of } from 'rxjs';
import { map } from 'rxjs';
console.log('Start uitvoering');
of(1, 2, 3)
.pipe(
map((x) => {
console.log(`map: ${x}`);
return x * 2;
})
)
.subscribe((x) => console.log(`subscribe: ${x}`));
console.log('Einde uitvoering');
// Output:
// Start uitvoering
// map: 1
// subscribe: 2
// map: 2
// subscribe: 4
// map: 3
// subscribe: 6
// Einde uitvoeringAsynchroon maken met schedulers
Door schedulers te gebruiken, kun je verwerking asynchroon maken.
import { of, asyncScheduler } from 'rxjs';
import { observeOn } from 'rxjs';
console.log('Start uitvoering');
of(1, 2, 3)
.pipe(
observeOn(asyncScheduler)
)
.subscribe(x => console.log(`subscribe: ${x}`));
console.log('Einde uitvoering');
// Output:
// Start uitvoering
// Einde uitvoering
// subscribe: 1
// subscribe: 2
// subscribe: 3Operators die schedulers gebruiken
observeOn operator
observeOn wijzigt de uitvoeringscontext van de stream. Het laat waarden uitgeven met de opgegeven scheduler.
import { interval, animationFrameScheduler } from 'rxjs';
import { take, observeOn } from 'rxjs';
// Gebruiksvoorbeeld voor animatie
interval(16)
.pipe(
take(10),
observeOn(animationFrameScheduler)
)
.subscribe(() => {
// Uitvoeren gesynchroniseerd met animatieframe
updateAnimation();
});
function updateAnimation() {
// Animatie-update verwerking
}TIP
Voor een gedetailleerde uitleg, praktische voorbeelden en belangrijke aandachtspunten van de observeOn operator, zie de observeOn operator pagina.
subscribeOn operator
subscribeOn controleert de timing van het starten van de stream-subscriptie.
import { of, asyncScheduler } from 'rxjs';
import { subscribeOn, tap } from 'rxjs';
console.log('Voor subscribe start');
of('Taak uitvoeren')
.pipe(
tap(() => console.log('Taak start')),
subscribeOn(asyncScheduler)
)
.subscribe(value => console.log(value));
console.log('Na subscribe start');
// Output:
// Voor subscribe start
// Na subscribe start
// Taak start
// Taak uitvoerenTIP
Voor een gedetailleerde uitleg, praktische voorbeelden en het verschil met observeOn, zie de subscribeOn operator pagina.
Praktische voorbeelden van asynchrone verwerking
API-verzoek controle
import { from, queueScheduler } from 'rxjs';
import { mergeMap, observeOn, tap } from 'rxjs';
interface ApiRequest {
endpoint: string;
id: number;
}
const requests: ApiRequest[] = [
{ endpoint: '/users', id: 1 },
{ endpoint: '/posts', id: 1 },
{ endpoint: '/comments', id: 1 },
];
// Verzoeken in wachtrij plaatsen en op volgorde verwerken
from(requests)
.pipe(
observeOn(queueScheduler),
tap((req) => console.log(`Toegevoegd aan wachtrij: ${req.endpoint}`)),
mergeMap(
(req) =>
// Simulatie van daadwerkelijk API-verzoek
new Promise((resolve) => {
setTimeout(() => {
resolve(`Resultaat van ${req.endpoint}/${req.id}`);
}, 1000);
})
)
)
.subscribe((result) => console.log(`Voltooid: ${result}`));
// Output:
// Toegevoegd aan wachtrij: /users
// Toegevoegd aan wachtrij: /posts
// Toegevoegd aan wachtrij: /comments
// Voltooid: Resultaat van /users/1
// Voltooid: Resultaat van /posts/1
// Voltooid: Resultaat van /comments/1Voorkomen van UI-thread blokkering
Bij het verwerken van grote hoeveelheden data gebruik je schedulers om te voorkomen dat de UI-thread wordt geblokkeerd.
import { from, asapScheduler } from 'rxjs';
import { observeOn, bufferCount } from 'rxjs';
const largeDataSet = Array.from({ length: 10000 }, (_, i) => i);
// Batch-grootte
const batchSize = 100;
// Totaal aantal batches berekenen
const totalBatches = Math.ceil(largeDataSet.length / batchSize);
// Batch teller
let batchIndex = 0;
from(largeDataSet)
.pipe(
bufferCount(100), // Groepeer per 100 items
observeOn(asapScheduler) // Zo snel mogelijk, maar zonder UI te blokkeren
)
.subscribe((batch) => {
batchIndex++;
processBatch(batch, batchIndex, totalBatches);
});
function processBatch(
batch: number[],
batchIndex: number,
totalBatches: number
) {
// Batch data verwerking
const processed = batch.map((n) => n * 2);
console.log(
`Batch ${batchIndex} van ${totalBatches} voltooid: ${processed.length} items verwerkt.`
);
}
// Output:
// Batch 1 van 100 voltooid: 100 items verwerkt.
// Batch 2 van 100 voltooid: 100 items verwerkt.
// ...
// ...
// Batch 100 van 100 voltooid: 100 items verwerkt.Prestatie-optimalisatie en debugging
Testen met schedulers
import { TestScheduler } from 'rxjs/testing';
import { delay } from 'rxjs';
import { beforeEach, describe, expect, it } from 'vitest';
describe('Test van asynchrone verwerking', () => {
let scheduler: TestScheduler;
beforeEach(() => {
scheduler = new TestScheduler((actual, expected) => {
expect(actual).toEqual(expected);
});
});
it('Test van delay operator', () => {
scheduler.run(({ cold, expectObservable }) => {
const source = cold('a-b-c|');
const expected = '1000ms a-b-(c|)';
const result = source.pipe(delay(1000, scheduler));
expectObservable(result).toBe(expected);
});
});
});Log-output voor debugging
import { of, asyncScheduler } from 'rxjs';
import { tap, observeOn } from 'rxjs';
console.log('Start');
of(1, 2, 3)
.pipe(
tap(value => console.log(`[Voor scheduler・synchroon] Waarde: ${value}`)),
observeOn(asyncScheduler), // Gebruik asyncScheduler
tap(value => console.log(`[Na scheduler・asynchroon] Waarde: ${value}`))
)
.subscribe();
console.log('Einde');
// Daadwerkelijke output:
// Start
// [Voor scheduler・synchroon] Waarde: 1
// [Voor scheduler・synchroon] Waarde: 2
// [Voor scheduler・synchroon] Waarde: 3
// Einde
// [Na scheduler・asynchroon] Waarde: 1
// [Na scheduler・asynchroon] Waarde: 2
// [Na scheduler・asynchroon] Waarde: 3Bij gebruik van asyncScheduler kun je het verwachte asynchrone gedrag bevestigen. queueScheduler gebruikt de microtask queue en wordt daarom uitgevoerd tijdens de uitvoering van synchrone code, terwijl asyncScheduler intern setTimeout gebruikt en dus volledig asynchroon wordt uitgevoerd.
Voorbeeld dat verschillen in scheduler-gedrag toont
Dit voorbeeld toont de verschillen in uitvoeringstiming tussen verschillende schedulers.
import { of, queueScheduler, asyncScheduler, asapScheduler } from 'rxjs';
import { observeOn } from 'rxjs';
console.log('1: Start');
// Synchrone verwerking
of('sync').subscribe(value => console.log(`2: ${value}`));
// queueScheduler (microtask)
of('queue')
.pipe(observeOn(queueScheduler))
.subscribe(value => console.log(`3: ${value}`));
// asapScheduler (microtask)
of('asap')
.pipe(observeOn(asapScheduler))
.subscribe(value => console.log(`4: ${value}`));
// asyncScheduler (macrotask)
of('async')
.pipe(observeOn(asyncScheduler))
.subscribe(value => console.log(`5: ${value}`));
Promise.resolve().then(() => console.log('6: Promise'));
console.log('7: Einde');
// Daadwerkelijke uitvoervolgorde:
// 1: Start
// 2: sync
// 7: Einde
// 3: queue
// 4: asap
// 6: Promise
// 5: asyncBest practices
Gebruik schedulers alleen wanneer nodig: Gebruik geen schedulers als het standaard synchrone gedrag voldoende is
Kies de juiste scheduler: Kies de optimale scheduler voor het gebruik
- Animatie:
animationFrameScheduler - Voorkomen UI-blokkering:
asapScheduler - Wachtrij-verwerking:
queueScheduler - Asynchrone verwerking:
asyncScheduler
- Animatie:
Prestaties monitoren: Monitor altijd de impact van scheduler-gebruik op prestaties
Zorg voor testbaarheid: Schrijf tests voor asynchrone verwerking met
TestScheduler
Veelgemaakte fouten en oplossingen
Overmatig asynchroon maken
// ❌ Onnodige asynchronisatie
of(1, 2, 3)
.pipe(
observeOn(asyncScheduler),
map(x => x * 2),
observeOn(asyncScheduler), // Dubbele asynchronisatie
filter(x => x > 3)
)
.subscribe();
// ✅ Alleen asynchroon maken waar nodig
of(1, 2, 3)
.pipe(
map(x => x * 2),
filter(x => x > 3),
observeOn(asyncScheduler) // Samen asynchroon maken op het einde
)
.subscribe();Verkeerd gebruik van schedulers
// ❌ Verkeerd gebruik
interval(1000)
.pipe(
subscribeOn(animationFrameScheduler) // Heeft geen effect op interval
)
.subscribe();
// ✅ Correct gebruik
interval(1000, animationFrameScheduler) // Specificeer scheduler bij creatie
.subscribe();Samenvatting
Schedulers zijn een krachtig hulpmiddel voor het nauwkeurig controleren van asynchrone verwerking in RxJS. Door ze correct te gebruiken, kun je prestatie-optimalisatie, het voorkomen van UI-thread blokkering en eenvoudiger testen realiseren. Echter, overmatig asynchroon maken kan prestaties juist verslechteren, dus het is belangrijk om ze alleen te gebruiken wanneer nodig.
In de volgende sectie leggen we de specifieke soorten schedulers en hun gebruik in detail uit.