💡 実装のベストプラクティス
1. 効率的なデータ差分配信
/**
 * Streams per-symbol data to subscribers as diffs against the last snapshot
 * that was broadcast, so unchanged fields are not re-sent.
 */
class EfficientDataStreaming {
  constructor() {
    // symbol -> shallow copy of the data object that was last broadcast.
    this.lastSnapshot = new Map();
    // symbol -> collection of socket-like objects exposing `readyState` and
    // `send(string)`. Nothing in this class populates it; callers do.
    this.subscribers = new Map();
  }

  /**
   * Diffs `newData` against the stored snapshot for `symbol` and, when at
   * least one field changed, broadcasts the diff and replaces the snapshot.
   *
   * @param {string} symbol - Key identifying the data stream.
   * @param {Object} newData - Current field values; `null`/`undefined` is ignored.
   */
  updateData(symbol, newData) {
    const lastData = this.lastSnapshot.get(symbol);
    const diff = this.calculateDiff(lastData, newData);
    if (!diff.hasChanges) {
      return;
    }
    this.broadcastDiff(symbol, diff);
    // Keep a copy rather than the caller's object: if the caller mutates and
    // resubmits the same object, comparing it with itself would hide every change.
    this.lastSnapshot.set(symbol, Object.assign({}, newData));
  }

  /**
   * Collects the own enumerable fields of `newData` whose value is not
   * strictly equal to the same field in `oldData`.
   *
   * Comparison is by `!==`, so nested objects/arrays count as changed unless
   * they are the same reference. Fields present only in `oldData` are not
   * reported.
   *
   * @param {Object|undefined} oldData - Previous snapshot, if any.
   * @param {Object} newData - Current field values.
   * @returns {{hasChanges: boolean, changes: Object}} Changed fields and a flag.
   */
  calculateDiff(oldData, newData) {
    const changes = {};
    let hasChanges = false;
    // Object.entries throws on null/undefined; treat "no data" as "no changes".
    if (newData == null) {
      return { hasChanges, changes };
    }
    for (const [key, value] of Object.entries(newData)) {
      if (!oldData || oldData[key] !== value) {
        changes[key] = value;
        hasChanges = true;
      }
    }
    return { hasChanges, changes };
  }

  /**
   * Sends a `diff` message for `symbol` to every subscriber whose socket is open.
   *
   * @param {string} symbol - Key identifying the data stream.
   * @param {{changes: Object}} diff - Result of `calculateDiff`.
   */
  broadcastDiff(symbol, diff) {
    // Numeric value of the OPEN ready state defined by the WebSocket standard.
    // Using the literal avoids requiring a global `WebSocket` binding, which is
    // not guaranteed to exist in server-side runtimes.
    const OPEN = 1;
    const subscribers = this.subscribers.get(symbol) || [];
    const message = {
      type: 'diff',
      symbol,
      changes: diff.changes,
      timestamp: Date.now()
    };
    // Serialize at most once, and only if some subscriber can receive it.
    let payload = null;
    subscribers.forEach((ws) => {
      if (ws.readyState !== OPEN) {
        return;
      }
      if (payload === null) {
        payload = JSON.stringify(message);
      }
      ws.send(payload);
    });
  }
}
2. 適応的品質制御
/**
 * Chooses a delivery quality level per client from reported network
 * statistics and applies that level when sending data.
 *
 * `processDataForQuality(data, quality)` and `sendToWebSocket(clientId, data)`
 * are called by `sendDataToClient` but are not defined in this class as shown;
 * they must be supplied elsewhere (e.g. by a subclass).
 */
class AdaptiveQualityControl {
  /**
   * @param {string} [defaultQuality='medium'] - Quality level name used by
   *   `sendDataToClient` for clients that have no stored profile yet.
   * @throws {RangeError} If `defaultQuality` is not a key of `qualityLevels`.
   */
  constructor(defaultQuality = 'medium') {
    // clientId -> { quality: 'high' | 'medium' | 'low' }
    this.clientProfiles = new Map();
    // NOTE(review): `updateRate` is presumably milliseconds between updates — confirm.
    this.qualityLevels = {
      high: { updateRate: 100, compression: 'none' },
      medium: { updateRate: 500, compression: 'gzip' },
      low: { updateRate: 2000, compression: 'heavy' }
    };
    if (!Object.prototype.hasOwnProperty.call(this.qualityLevels, defaultQuality)) {
      throw new RangeError(`Unknown default quality level: ${defaultQuality}`);
    }
    this.defaultQuality = defaultQuality;
  }

  /**
   * Classifies the client's connection and stores the resulting quality name
   * on its profile (creating the profile if needed).
   *
   * A stat that is missing or NaN fails every comparison below, so a client
   * with no usable stats is classified as 'high'.
   *
   * @param {*} clientId - Key identifying the client.
   * @param {{latency: number, bandwidth: number}} networkStats - Measured stats.
   *   NOTE(review): units are not shown here (presumably ms and bits or bytes
   *   per second) — confirm against the producer of these stats.
   * @returns {{updateRate: number, compression: string}} Settings for the chosen level.
   */
  adjustQualityForClient(clientId, networkStats) {
    const profile = this.clientProfiles.get(clientId) || {};
    if (networkStats.latency > 1000 || networkStats.bandwidth < 1000000) {
      profile.quality = 'low';
    } else if (networkStats.latency > 500 || networkStats.bandwidth < 5000000) {
      profile.quality = 'medium';
    } else {
      profile.quality = 'high';
    }
    this.clientProfiles.set(clientId, profile);
    return this.qualityLevels[profile.quality];
  }

  /**
   * Processes `data` for the client's quality level and sends the result.
   *
   * @param {*} clientId - Key identifying the client.
   * @param {*} data - Payload handed to `processDataForQuality`.
   */
  sendDataToClient(clientId, data) {
    const profile = this.clientProfiles.get(clientId);
    // A client that was never passed to adjustQualityForClient has no profile;
    // fall back to the configured default instead of dereferencing undefined.
    const levelName = profile && profile.quality ? profile.quality : this.defaultQuality;
    const quality = this.qualityLevels[levelName];
    const processedData = this.processDataForQuality(data, quality);
    this.sendToWebSocket(clientId, processedData);
  }
}
3. 高頻度データバッファリング
/**
 * Buffers data points per key and periodically flushes each key's points as
 * one aggregated record.
 *
 * `calculateAverage(dataPoints)` and `sendAggregatedData(key, aggregated)` are
 * called during a flush but are not defined in this class as shown; they must
 * be supplied elsewhere (e.g. by a subclass).
 */
class HighFrequencyDataBuffer {
  /**
   * Creates the buffer and immediately starts the periodic flush timer.
   * Call `stopFlushing()` when the buffer is no longer needed.
   *
   * @param {number} [flushInterval=100] - Delay between flushes, passed to
   *   `setInterval` (milliseconds).
   */
  constructor(flushInterval = 100) {
    // key -> array of { data, timestamp } collected since the last flush.
    this.buffer = new Map();
    this.flushInterval = flushInterval;
    // Handle of the running flush timer, or null when stopped.
    this.flushTimer = null;
    this.startFlushing();
  }

  /**
   * Appends a data point for `key`, stamped with the current time.
   *
   * @param {*} key - Buffer key.
   * @param {*} data - Value to buffer.
   */
  addData(key, data) {
    if (!this.buffer.has(key)) {
      this.buffer.set(key, []);
    }
    this.buffer.get(key).push({
      data,
      timestamp: Date.now()
    });
  }

  /**
   * Starts the periodic flush. Does nothing if a flush timer is already
   * running, so repeated calls cannot stack duplicate timers.
   */
  startFlushing() {
    if (this.flushTimer !== null) {
      return;
    }
    // Keep the handle: without it the interval can never be cleared and keeps
    // the process (and this instance) alive indefinitely.
    this.flushTimer = setInterval(() => {
      this.flushBuffer();
    }, this.flushInterval);
  }

  /**
   * Stops the periodic flush. Buffered points are left in place; call
   * `flushBuffer()` first to send them. Safe to call when already stopped.
   */
  stopFlushing() {
    if (this.flushTimer === null) {
      return;
    }
    clearInterval(this.flushTimer);
    this.flushTimer = null;
  }

  /**
   * Aggregates and sends the buffered points of every non-empty key, then
   * resets that key's buffer to an empty array.
   */
  flushBuffer() {
    for (const [key, dataPoints] of this.buffer.entries()) {
      if (dataPoints.length > 0) {
        const aggregated = this.aggregateDataPoints(dataPoints);
        this.sendAggregatedData(key, aggregated);
        this.buffer.set(key, []);
      }
    }
  }

  /**
   * Summarizes a batch of buffered points.
   *
   * @param {Array<{data: *, timestamp: number}>} dataPoints - Points in
   *   insertion order; must be non-empty (the first and last elements are read).
   * @returns {{count: number, latest: *, average: *, timeRange: {start: number, end: number}}}
   *   Point count, the last point's data, the result of `calculateAverage`,
   *   and the timestamps of the first and last points.
   */
  aggregateDataPoints(dataPoints) {
    return {
      count: dataPoints.length,
      latest: dataPoints[dataPoints.length - 1].data,
      average: this.calculateAverage(dataPoints),
      timeRange: {
        start: dataPoints[0].timestamp,
        end: dataPoints[dataPoints.length - 1].timestamp
      }
    };
  }
}
4. 接続プール管理
/**
 * Tracks WebSocket connections by id, groups them by subscribed topic, and
 * broadcasts messages to every open connection in a topic.
 */
class WebSocketConnectionPool {
  /**
   * @param {number} [maxConnections=10000] - Maximum number of tracked connections.
   */
  constructor(maxConnections = 10000) {
    // connectionId -> { ws, topics: Set, lastActivity, messageCount }
    this.connections = new Map();
    // topic -> Set of connectionIds subscribed to that topic
    this.connectionGroups = new Map();
    this.maxConnections = maxConnections;
  }

  /**
   * Registers a connection and subscribes it to `topics`.
   *
   * If `connectionId` is already registered, the old registration is removed
   * first so its previous topic memberships do not linger and so replacing a
   * connection is not counted against the limit.
   *
   * @param {*} connectionId - Unique key for the connection.
   * @param {Object} ws - Socket-like object exposing `readyState` and `send(string)`.
   * @param {Iterable} [topics=[]] - Topics to subscribe to; duplicates are ignored.
   * @throws {Error} 'Connection limit reached' when the pool is full.
   */
  addConnection(connectionId, ws, topics = []) {
    if (this.connections.has(connectionId)) {
      this.removeConnection(connectionId);
    }
    if (this.connections.size >= this.maxConnections) {
      throw new Error('Connection limit reached');
    }
    const topicSet = new Set(topics);
    this.connections.set(connectionId, {
      ws,
      topics: topicSet,
      lastActivity: Date.now(),
      messageCount: 0
    });
    topicSet.forEach((topic) => {
      if (!this.connectionGroups.has(topic)) {
        this.connectionGroups.set(topic, new Set());
      }
      this.connectionGroups.get(topic).add(connectionId);
    });
  }

  /**
   * Unregisters a connection and drops it from every topic group it belonged
   * to; groups left empty are deleted. The socket itself is not closed.
   *
   * @param {*} connectionId - Key passed to `addConnection`.
   * @returns {boolean} True if the connection was registered.
   */
  removeConnection(connectionId) {
    const conn = this.connections.get(connectionId);
    if (!conn) {
      return false;
    }
    conn.topics.forEach((topic) => {
      const group = this.connectionGroups.get(topic);
      if (!group) {
        return;
      }
      group.delete(connectionId);
      if (group.size === 0) {
        this.connectionGroups.delete(topic);
      }
    });
    this.connections.delete(connectionId);
    return true;
  }

  /**
   * Sends `message` (serialized once as JSON) to every open connection
   * subscribed to `topic`. A connection whose `send` throws is removed from
   * the pool; connections that are not open are skipped but kept.
   *
   * @param {*} topic - Topic key.
   * @param {*} message - JSON-serializable payload.
   * @returns {number} Number of connections the message was sent to (0 when
   *   the topic has no subscribers).
   */
  broadcastToTopic(topic, message) {
    // Numeric value of the OPEN ready state defined by the WebSocket standard.
    // Using the literal avoids requiring a global `WebSocket` binding, which is
    // not guaranteed to exist in server-side runtimes.
    const OPEN = 1;
    const connections = this.connectionGroups.get(topic);
    if (!connections) {
      return 0;
    }
    const messageStr = JSON.stringify(message);
    let successCount = 0;
    // Deleting the current entry from a Set while iterating it with forEach is
    // well-defined, so removeConnection may be called from inside the loop.
    connections.forEach((connectionId) => {
      const conn = this.connections.get(connectionId);
      if (!conn || conn.ws.readyState !== OPEN) {
        return;
      }
      try {
        conn.ws.send(messageStr);
        conn.messageCount++;
        conn.lastActivity = Date.now();
        successCount++;
      } catch (error) {
        // Best effort: a socket that fails to send is treated as dead.
        this.removeConnection(connectionId);
      }
    });
    return successCount;
  }
}
この包括的なデータ配信システムアーキテクチャにより、高性能でスケーラブルなリアルタイムデータ配信を実現できます。