Mécanisme de Multicasting
Le multicasting est une technique permettant de diffuser efficacement un flux de données d'un Observable à plusieurs souscripteurs (Observers). Dans RxJS, cela peut être réalisé via des Subject ou des opérateurs.
Qu'est-ce que le Multicasting
Un Observable ordinaire (Cold Observable) crée un nouveau flux de données chaque fois qu'il est souscrit. Cela signifie que lorsqu'il y a plusieurs souscripteurs, le même traitement est exécuté plusieurs fois.
L'utilisation du multicasting permet d'exécuter la source de données une seule fois et de distribuer le résultat à plusieurs souscripteurs. Ceci est particulièrement important dans les cas suivants :
- Ne pas vouloir appeler des requêtes HTTP/API en double
- Vouloir exécuter une seule fois des opérations coûteuses (calculs ou effets de bord)
- Partager l'état de l'application entre plusieurs composants
Patterns de base du Multicasting
Multicast de base utilisant un Subject
import { Observable, Subject } from 'rxjs';
import { tap } from 'rxjs';
// Source de données (Cold Observable)
function createDataSource(): Observable<number> {
return new Observable<number>(observer => {
console.log('Source de données : connexion');
// Logique de génération de données (opération coûteuse supposée)
const id = setInterval(() => {
const value = Math.round(Math.random() * 100);
console.log(`Source de données : génération de valeur -> ${value}`);
observer.next(value);
}, 1000);
// Fonction de nettoyage
return () => {
console.log('Source de données : déconnexion');
clearInterval(id);
};
});
}
// Implémentation du multicast
function multicast() {
// Source de données originale
const source$ = createDataSource().pipe(
tap(value => console.log(`Traitement source : ${value}`))
);
// Subject pour le multicasting
const subject = new Subject<number>();
// Connecter la source au Subject
const subscription = source$.subscribe(subject);
// Plusieurs souscripteurs s'abonnent au Subject
console.log('Observer 1 début de souscription');
const subscription1 = subject.subscribe(value => console.log(`Observer 1: ${value}`));
// Ajouter un autre souscripteur après 3 secondes
setTimeout(() => {
console.log('Observer 2 début de souscription');
const subscription2 = subject.subscribe(value => console.log(`Observer 2: ${value}`));
// Terminer toutes les souscriptions après 5 secondes
setTimeout(() => {
console.log('Fin de toutes les souscriptions');
subscription.unsubscribe();
subscription1.unsubscribe();
subscription2.unsubscribe();
}, 5000);
}, 3000);
}
// Exécution
multicast();Résultat d'exécution
Source de données : connexion
Observer 1 début de souscription
Source de données : génération de valeur -> 71
Traitement source : 71
Observer 1: 71
Source de données : génération de valeur -> 79
Traitement source : 79
Observer 1: 79
Source de données : génération de valeur -> 63
Traitement source : 63
Observer 1: 63
Observer 2 début de souscription
Source de données : génération de valeur -> 49
Traitement source : 49
Observer 1: 49
Observer 2: 49
Source de données : génération de valeur -> 94
Traitement source : 94
Observer 1: 94
Observer 2: 94
Source de données : génération de valeur -> 89
Traitement source : 89
Observer 1: 89
Observer 2: 89
Source de données : génération de valeur -> 10
Traitement source : 10
Observer 1: 10
Observer 2: 10
Source de données : génération de valeur -> 68
Traitement source : 68
Observer 1: 68
Observer 2: 68
Fin de toutes les souscriptions
Source de données : déconnexionOpérateurs de Multicast
RxJS fournit des opérateurs dédiés pour implémenter le multicasting.
Opérateur share()
📘 Documentation officielle RxJS : share()
L'opérateur le plus simple pour implémenter le multicast. En interne, il combine multicast() et refCount().
import { interval } from 'rxjs';
import { take, share, tap } from 'rxjs';
// Observable qui compte par intervalles
const source$ = interval(1000).pipe(
take(5),
tap(value => console.log(`Source : ${value}`)),
share() // Activer le multicasting
);
// Premier souscripteur
console.log('Observer 1 début de souscription');
const subscription1 = source$.subscribe(value => console.log(`Observer 1: ${value}`));
// Ajouter un deuxième souscripteur après 2.5 secondes
setTimeout(() => {
console.log('Observer 2 début de souscription');
const subscription2 = source$.subscribe(value => console.log(`Observer 2: ${value}`));
// Désabonner le souscripteur 1 après 5 secondes
setTimeout(() => {
console.log('Observer 1 désabonnement');
subscription1.unsubscribe();
}, 2500);
}, 2500);Résultat d'exécution
Observer 1 début de souscription
Source : 0
Observer 1: 0
Observer 2 début de souscription
Source : 1
Observer 1: 1
Observer 2: 1
Source : 2
Observer 1: 2
Observer 2: 2
Source : 3
Observer 1: 3
Observer 2: 3
Observer 1 désabonnement
Source : 4
Observer 2: 4Contrôle détaillé de share()
Au lieu de refCount(), depuis RxJS 7, vous pouvez contrôler le comportement plus clairement en passant des options à share().
import { interval } from 'rxjs';
import { take, share, tap } from 'rxjs';
const source$ = interval(1000).pipe(
take(6),
tap((value) => console.log(`Source : ${value}`)),
share({
resetOnError: true,
resetOnComplete: true,
resetOnRefCountZero: true,
})
);
// Premier souscripteur
console.log('Observer 1 début de souscription');
const subscription1 = source$.subscribe((value) =>
console.log(`Observer 1: ${value}`)
);
// Ajouter un deuxième souscripteur après 2.5 secondes
setTimeout(() => {
console.log('Observer 2 début de souscription');
const subscription2 = source$.subscribe((value) =>
console.log(`Observer 2: ${value}`)
);
setTimeout(() => {
console.log('Observer 1 désabonnement');
subscription1.unsubscribe();
}, 1500);
}, 2500);Résultat d'exécution
Observer 1 début de souscription
Source : 0
Observer 1: 0
Source : 1
Observer 1: 1
Observer 2 début de souscription
Source : 2
Observer 1: 2
Observer 2: 2
Source : 3
Observer 1: 3
Observer 2: 3
Observer 1 désabonnement
Source : 4
Observer 2: 4
Source : 5
Observer 2: 5Cette méthode permet de contrôler clairement le comportement lors de la fin du flux ou lorsque le nombre de souscripteurs atteint zéro.
Opérateur shareReplay()
📘 Documentation officielle RxJS : shareReplay()
Similaire à share(), mais mémorise un nombre spécifié de valeurs passées et les fournit également aux souscripteurs qui arrivent plus tard.
import { interval } from 'rxjs';
import { take, shareReplay, tap } from 'rxjs';
// Utilisation de shareReplay (taille de buffer 2)
const source$ = interval(1000).pipe(
take(5),
tap(value => console.log(`Source : ${value}`)),
shareReplay(2) // Bufferiser les 2 dernières valeurs
);
// Premier souscripteur
console.log('Observer 1 début de souscription');
source$.subscribe(value => console.log(`Observer 1: ${value}`));
// Ajouter un deuxième souscripteur après 3.5 secondes
setTimeout(() => {
console.log('Observer 2 début de souscription - reçoit les 2 dernières valeurs');
source$.subscribe(value => console.log(`Observer 2: ${value}`));
}, 3500);Résultat d'exécution
Observer 1 début de souscription
Source : 0
Observer 1: 0
Source : 1
Observer 1: 1
Observer 2 début de souscription - reçoit les 2 dernières valeurs
Observer 2: 0
Observer 2: 1
Source : 2
Observer 1: 2
Observer 2: 2
Source : 3
Observer 1: 3
Observer 2: 3
Source : 4
Observer 1: 4
Observer 2: 4Timing et Cycle de vie dans le Multicasting
Il est important de comprendre le cycle de vie d'un flux multicast. Particulièrement lors de l'utilisation de l'opérateur share(), il faut faire attention aux comportements suivants :
- Premier souscripteur :
share()démarre la connexion à l'Observable source lorsque la première souscription a lieu. - Tous les souscripteurs se désabonnent : Si la configuration
share({ resetOnRefCountZero: true })est présente, la connexion à la source est désactivée lorsque le nombre de souscripteurs atteint zéro. - Complétion ou erreur : Par défaut,
share()réinitialise son état interne lorsque complete ou error se produit (si resetOnComplete/resetOnError est true). - Réabonnement : Lorsqu'une souscription a lieu après la réinitialisation du flux, il est reconstruit comme un nouveau Observable.
Ainsi, les options de share() contrôlent le timing de démarrage, arrêt et régénération du flux en fonction du nombre de souscriptions et de l'état de complétion.
Cas d'usage pratiques
Partage de requêtes API
Exemple d'évitement de requêtes dupliquées au même endpoint API.
import { Observable, of, throwError } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { map, catchError, shareReplay, tap } from 'rxjs';
// Simulation de service API
class UserService {
private cache = new Map<string, Observable<any>>();
getUser(id: string): Observable<any> {
// Retourner du cache si disponible
if (this.cache.has(id)) {
console.log(`Récupération de l'utilisateur ID ${id} depuis le cache`);
return this.cache.get(id)!;
}
// Créer une nouvelle requête
console.log(`Récupération de l'utilisateur ID ${id} depuis l'API`);
const request$ = ajax.getJSON(`https://jsonplaceholder.typicode.com/users/${id}`).pipe(
tap(response => console.log('Réponse API:', response)),
catchError(error => {
console.error('Erreur API:', error);
// Supprimer du cache
this.cache.delete(id);
return throwError(() => new Error('Échec de la récupération de l\'utilisateur'));
}),
// Partager avec shareReplay (mettre en cache la valeur même après complétion)
shareReplay(1)
);
// Enregistrer dans le cache
this.cache.set(id, request$);
return request$;
}
}
// Exemple d'utilisation
const userService = new UserService();
// Plusieurs composants demandent les mêmes données utilisateur
console.log('Composant 1 : demande de données utilisateur');
userService.getUser('1').subscribe(user => {
console.log('Composant 1 : réception des données utilisateur', user);
});
// Un autre composant demande les mêmes données un peu plus tard
setTimeout(() => {
console.log('Composant 2 : demande des mêmes données utilisateur');
userService.getUser('1').subscribe(user => {
console.log('Composant 2 : réception des données utilisateur', user);
});
}, 1000);
// Demande d'un autre utilisateur
setTimeout(() => {
console.log('Composant 3 : demande de données d\'un autre utilisateur');
userService.getUser('2').subscribe(user => {
console.log('Composant 3 : réception des données utilisateur', user);
});
}, 2000);Résultat d'exécution
Composant 1 : demande de données utilisateur
Récupération de l'utilisateur ID 1 depuis l'API
Réponse API: {id: 1, name: 'Leanne Graham', username: 'Bret', email: 'Sincere@april.biz', address: {…}, …}
Composant 1 : réception des données utilisateur {id: 1, name: 'Leanne Graham', username: 'Bret', email: 'Sincere@april.biz', address: {…}, …}
Composant 2 : demande des mêmes données utilisateur
Récupération de l'utilisateur ID 1 depuis le cache
Composant 2 : réception des données utilisateur {id: 1, name: 'Leanne Graham', username: 'Bret', email: 'Sincere@april.biz', address: {…}, …}
Composant 3 : demande de données d'un autre utilisateur
Récupération de l'utilisateur ID 2 depuis l'API
Réponse API: {id: 2, name: 'Ervin Howell', username: 'Antonette', email: 'Shanna@melissa.tv', address: {…}, …}
Composant 3 : réception des données utilisateur {id: 2, name: 'Ervin Howell', username: 'Antonette', email: 'Shanna@melissa.tv', address: {…}, …}Patterns de conception du Multicasting
Observable Singleton
Pattern de partage d'un observable unique dans toute l'application.
import { Subject } from 'rxjs';
// Gestion d'état global de l'application
class AppState {
// Instance singleton
private static instance: AppState;
// Flux de notifications global
private notificationsSubject = new Subject<string>();
// Observable public (lecture seule)
readonly notifications$ = this.notificationsSubject.asObservable();
// Accès singleton
static getInstance(): AppState {
if (!AppState.instance) {
AppState.instance = new AppState();
}
return AppState.instance;
}
// Méthode pour envoyer une notification
notify(message: string): void {
this.notificationsSubject.next(message);
}
}
// Exemple d'utilisation
const appState = AppState.getInstance();
// Surveiller les notifications (depuis plusieurs composants)
appState.notifications$.subscribe((msg) =>
console.log('Composant A:', msg)
);
appState.notifications$.subscribe((msg) =>
console.log('Composant B:', msg)
);
// Envoyer une notification
appState.notify('Mise à jour système disponible');Résultat d'exécution
Composant A: Mise à jour système disponible
Composant B: Mise à jour système disponibleRésumé
Le multicasting est une technique importante pour améliorer l'efficacité et les performances des applications RxJS. Les points principaux sont les suivants :
- Le multicasting permet de partager une source de données unique entre plusieurs souscripteurs
- Peut être implémenté avec des opérateurs comme
share(),shareReplay(),publish() - Peut éviter les requêtes API dupliquées et optimiser les traitements à coût de calcul élevé
- Utile pour la gestion d'état et la communication entre composants
En choisissant une stratégie de multicast appropriée, vous pouvez améliorer la réactivité et l'efficacité de l'application tout en réduisant la quantité de code et en améliorant la maintenabilité.