seo-generator-server/lib/shared/QueueProcessor.js
StillHammer a2ffe7fec5 Refactor batch processing system with shared QueueProcessor base class
• Created QueueProcessor base class for shared queue management, retry logic, and persistence
• Refactored BatchProcessor to extend QueueProcessor (385→142 lines, 63% reduction)
• Created BatchController with comprehensive API endpoints for batch operations
• Added Digital Ocean templates integration with caching
• Integrated batch endpoints into ManualServer with proper routing
• Fixed infinite recursion bug in queue status calculations
• Eliminated ~400 lines of duplicate code across processors
• Maintained backward compatibility with existing test interfaces

Architecture benefits:
- Single source of truth for queue processing logic
- Simplified maintenance and bug fixes
- Clear separation between AutoProcessor (production) and BatchProcessor (R&D)
- Extensible design for future processor types

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-09-19 02:04:48 +08:00

759 lines
21 KiB
JavaScript

// ========================================
// QUEUE PROCESSOR - CLASSE COMMUNE
// Responsabilité: Logique partagée de queue, retry, persistance
// ========================================
const { logSh } = require('../ErrorReporting');
const { tracer } = require('../trace');
const { handleModularWorkflow } = require('../Main');
const { readInstructionsData } = require('../BrainConfig');
const fs = require('fs').promises;
const path = require('path');
/**
 * QUEUE PROCESSOR BASE
 *
 * Shared base class for row-queue processing with retry logic and optional
 * JSON persistence (config, status and queue files). Concrete processors
 * extend it and may override `processRow` / `buildRowConfig`.
 *
 * Queue items are plain objects that stay in `this.queue` for their whole
 * life; only their `status` changes:
 * 'pending' -> 'processing' -> 'completed' | 'error' (retryable) | 'failed'.
 */
class QueueProcessor {
  /**
   * @param {object} [options]
   * @param {string} [options.name] - Label used in logs and trace names.
   * @param {string} [options.configPath] - JSON config file (optional).
   * @param {string} [options.statusPath] - JSON status file (optional).
   * @param {string} [options.queuePath] - JSON queue file (optional).
   * @param {object} [options.config] - Overrides merged over the defaults.
   */
  constructor(options = {}) {
    this.name = options.name || 'QueueProcessor';
    this.configPath = options.configPath;
    this.statusPath = options.statusPath;
    this.queuePath = options.queuePath;

    // Default configuration; caller-provided values win.
    this.config = {
      selective: 'standardEnhancement',
      adversarial: 'light',
      humanSimulation: 'none',
      patternBreaking: 'none',
      intensity: 1.0,
      rowRange: { start: 2, end: 10 },
      saveIntermediateSteps: false,
      maxRetries: 3,
      delayBetweenItems: 1000,
      batchSize: 1,
      ...options.config
    };

    // Processor state
    this.isRunning = false;
    this.isPaused = false;
    this.currentRow = null;
    this.queue = [];
    this.processedItems = [];
    this.failedItems = [];
    // True while a processQueue() loop is alive; prevents a second,
    // concurrent loop when resume()/start() is called mid-item.
    this.isLoopActive = false;

    // Metrics
    this.startTime = null;
    this.processedCount = 0;
    this.errorCount = 0;

    // Detailed stats
    this.stats = {
      itemsQueued: 0,
      itemsProcessed: 0,
      itemsFailed: 0,
      averageProcessingTime: 0,
      totalProcessingTime: 0,
      startTime: Date.now(),
      lastProcessedAt: null
    };

    // Optional callbacks
    this.onStatusUpdate = null;
    this.onProgress = null;
    this.onError = null;
    this.onComplete = null;
    this.onItemProcessed = null;
  }

  // ========================================
  // INITIALIZATION
  // ========================================

  /**
   * Loads the configuration and prepares the queue.
   * @throws Rethrows any error raised by the two steps after logging it.
   */
  async initialize() {
    try {
      await this.loadConfig();
      await this.initializeQueue();
      logSh(`🎯 ${this.name} initialisé`, 'DEBUG');
    } catch (error) {
      logSh(`❌ Erreur initialisation ${this.name}: ${error.message}`, 'ERROR');
      throw error;
    }
  }

  /**
   * Merges the JSON config file (if any) over the current configuration.
   * Never throws: an unreadable or invalid file keeps the current values.
   */
  async loadConfig() {
    if (!this.configPath) return;

    let configData;
    try {
      configData = await fs.readFile(this.configPath, 'utf8');
    } catch {
      logSh(`⚠️ Configuration non trouvée pour ${this.name}, utilisation valeurs par défaut`, 'WARNING');
      return;
    }

    // Parse separately so a corrupt file is not reported as "not found".
    try {
      this.config = { ...this.config, ...JSON.parse(configData) };
      logSh(`📋 Configuration ${this.name} chargée`, 'DEBUG');
    } catch (error) {
      logSh(`⚠️ Configuration ${this.name} invalide (${error.message}), utilisation valeurs par défaut`, 'WARNING');
    }
  }

  /**
   * Creates the config directory and writes default config/status files
   * when they do not exist yet. Errors are logged, not thrown.
   */
  async initializeFiles() {
    if (!this.configPath) return;
    try {
      const configDir = path.dirname(this.configPath);
      await fs.mkdir(configDir, { recursive: true });

      // Create the default config when missing
      try {
        await fs.access(this.configPath);
      } catch {
        await fs.writeFile(this.configPath, JSON.stringify(this.config, null, 2));
        logSh(`📝 Configuration ${this.name} par défaut créée`, 'DEBUG');
      }

      // Create the default status when missing
      if (this.statusPath) {
        const defaultStatus = this.getDefaultStatus();
        try {
          await fs.access(this.statusPath);
        } catch {
          await fs.writeFile(this.statusPath, JSON.stringify(defaultStatus, null, 2));
          logSh(`📊 Status ${this.name} par défaut créé`, 'DEBUG');
        }
      }
    } catch (error) {
      logSh(`❌ Erreur initialisation fichiers ${this.name}: ${error.message}`, 'ERROR');
    }
  }

  // ========================================
  // QUEUE MANAGEMENT
  // ========================================

  /**
   * Restores the persisted queue when available, otherwise populates a
   * fresh one from `config.rowRange`. Errors are logged, not thrown.
   */
  async initializeQueue() {
    try {
      if (this.queuePath) {
        try {
          const queueData = await fs.readFile(this.queuePath, 'utf8');
          const savedQueue = JSON.parse(queueData);
          if (savedQueue.queue && Array.isArray(savedQueue.queue)) {
            this.queue = savedQueue.queue;
            this.processedCount = savedQueue.processedCount || 0;
            this.stats.itemsQueued = this.queue.length;
            // Items saved mid-flight would otherwise never be picked again.
            this.recoverInterruptedItems();
            logSh(`📊 Queue ${this.name} restaurée: ${this.queue.length} éléments`, 'DEBUG');
          }
        } catch (error) {
          // A missing file is expected on first run; anything else is worth a trace.
          if (!error || error.code !== 'ENOENT') {
            const reason = error && error.message ? error.message : String(error);
            logSh(`⚠️ Queue ${this.name} illisible (${reason}), recréation`, 'WARNING');
          }
        }
      }

      // Nothing restored: build the queue
      if (this.queue.length === 0) {
        await this.populateQueue();
      }
    } catch (error) {
      logSh(`❌ Erreur initialisation queue ${this.name}: ${error.message}`, 'ERROR');
    }
  }

  /**
   * Re-arms items left in 'processing' state by an interrupted run.
   * The interrupted attempt still counts: an item that already used all
   * its attempts becomes 'failed', the others go back to 'pending'.
   * @returns {number} Number of items whose status was changed.
   */
  recoverInterruptedItems() {
    let recovered = 0;
    for (const item of this.queue) {
      if (item.status !== 'processing') continue;
      const maxAttempts = item.maxAttempts || this.config.maxRetries;
      item.status = item.attempts < maxAttempts ? 'pending' : 'failed';
      recovered++;
    }
    return recovered;
  }

  /**
   * Rebuilds the queue with one pending item per row of `config.rowRange`
   * (inclusive) and persists it.
   * @throws Rethrows unexpected errors after logging them.
   */
  async populateQueue() {
    try {
      this.queue = [];
      const { start, end } = this.config.rowRange;
      for (let rowNumber = start; rowNumber <= end; rowNumber++) {
        this.queue.push({
          rowNumber,
          status: 'pending',
          attempts: 0,
          maxAttempts: this.config.maxRetries,
          error: null,
          result: null,
          startTime: null,
          endTime: null,
          addedAt: Date.now()
        });
      }
      await this.saveQueue();
      this.stats.itemsQueued = this.queue.length;
      logSh(`📋 Queue ${this.name} populée: ${this.queue.length} lignes (${start} à ${end})`, 'INFO');
    } catch (error) {
      logSh(`❌ Erreur population queue ${this.name}: ${error.message}`, 'ERROR');
      throw error;
    }
  }

  /**
   * Rebuilds the queue by reading rows through `readInstructionsData`,
   * from `config.startRow` (default 2) to `config.endRow` (default 50).
   * A row is queued when its data has a truthy `mc0`; the scan stops after
   * 5 consecutive empty or unreadable rows.
   * @throws Rethrows unexpected errors after logging them.
   */
  async populateQueueFromSheets() {
    try {
      this.queue = [];
      const firstRow = this.config.startRow || 2;
      const lastRow = this.config.endRow || 50;
      const maxEmptyRows = 5;
      let consecutiveEmptyRows = 0;

      for (let currentRow = firstRow; currentRow <= lastRow; currentRow++) {
        try {
          const csvData = await readInstructionsData(currentRow);
          if (!csvData || !csvData.mc0) {
            consecutiveEmptyRows++;
            if (consecutiveEmptyRows >= maxEmptyRows) {
              logSh(`🛑 Arrêt scan après ${maxEmptyRows} lignes vides consécutives`, 'INFO');
              break;
            }
          } else {
            consecutiveEmptyRows = 0;
            this.queue.push({
              rowNumber: currentRow,
              data: csvData,
              status: 'pending',
              attempts: 0,
              maxAttempts: this.config.maxRetries,
              error: null,
              result: null,
              startTime: null,
              endTime: null,
              addedAt: Date.now()
            });
          }
        } catch (error) {
          // An unreadable row counts as empty, but leave a trace of why.
          const reason = error && error.message ? error.message : String(error);
          logSh(`⚠️ Lecture ligne ${currentRow} impossible: ${reason}`, 'WARNING');
          consecutiveEmptyRows++;
          if (consecutiveEmptyRows >= maxEmptyRows) {
            break;
          }
        }
      }

      await this.saveQueue();
      this.stats.itemsQueued = this.queue.length;
      logSh(`📊 Queue ${this.name} chargée depuis Sheets: ${this.stats.itemsQueued} éléments`, 'INFO');
    } catch (error) {
      logSh(`❌ Erreur chargement queue depuis Sheets: ${error.message}`, 'ERROR');
      throw error;
    }
  }

  /**
   * Persists the queue to `queuePath` (no-op without a path).
   * Write errors are logged, not thrown.
   */
  async saveQueue() {
    if (!this.queuePath) return;
    try {
      const queueData = {
        queue: this.queue,
        processedCount: this.processedCount,
        lastUpdate: new Date().toISOString()
      };
      await fs.writeFile(this.queuePath, JSON.stringify(queueData, null, 2));
    } catch (error) {
      logSh(`❌ Erreur sauvegarde queue ${this.name}: ${error.message}`, 'ERROR');
    }
  }

  // ========================================
  // MAIN CONTROLS
  // ========================================

  /**
   * Starts processing in the background and returns the current status.
   * @returns {Promise<object>} Status snapshot (see getStatus).
   * @throws {Error} When the processor is already running, or when the
   *   queue cannot be prepared (the running flag is rolled back first).
   */
  async start() {
    return tracer.run(`${this.name}.start`, async () => {
      if (this.isRunning) {
        throw new Error(`${this.name} est déjà en cours`);
      }
      logSh(`🚀 Démarrage ${this.name}`, 'INFO');
      this.isRunning = true;
      this.isPaused = false;
      this.startTime = new Date();
      this.processedCount = 0;
      this.errorCount = 0;
      // Per-run history: time estimates divide the elapsed time of this run
      // by the number of items completed during this run.
      this.processedItems = [];
      this.failedItems = [];

      try {
        await this.loadConfig();
        if (this.queue.length === 0) {
          await this.populateQueue();
        }
        await this.updateStatus();
      } catch (error) {
        // Do not leave the processor stuck in "already running".
        this.isRunning = false;
        throw error;
      }

      // Fire-and-forget: the loop reports its own failures.
      this.processQueue().catch(error => {
        logSh(`❌ Erreur traitement queue ${this.name}: ${error.message}`, 'ERROR');
        return this.handleError(error);
      });
      return this.getStatus();
    });
  }

  /**
   * Requests a stop. An item already being processed finishes first; the
   * loop exits before picking the next one.
   * @returns {Promise<object>} Status snapshot.
   */
  async stop() {
    return tracer.run(`${this.name}.stop`, async () => {
      logSh(`🛑 Arrêt ${this.name}`, 'INFO');
      this.isRunning = false;
      this.isPaused = false;
      this.currentRow = null;
      await this.updateStatus();
      return this.getStatus();
    });
  }

  /**
   * Pauses processing after the item currently in flight.
   * @returns {Promise<object>} Status snapshot.
   * @throws {Error} When nothing is running.
   */
  async pause() {
    return tracer.run(`${this.name}.pause`, async () => {
      if (!this.isRunning) {
        throw new Error(`Aucun traitement ${this.name} en cours`);
      }
      logSh(`⏸️ Mise en pause ${this.name}`, 'INFO');
      this.isPaused = true;
      await this.updateStatus();
      return this.getStatus();
    });
  }

  /**
   * Resumes a paused run.
   * @returns {Promise<object>} Status snapshot.
   * @throws {Error} When the processor is not paused.
   */
  async resume() {
    return tracer.run(`${this.name}.resume`, async () => {
      if (!this.isRunning || !this.isPaused) {
        throw new Error(`Aucun traitement ${this.name} en pause`);
      }
      logSh(`▶️ Reprise ${this.name}`, 'INFO');
      this.isPaused = false;
      await this.updateStatus();

      // No-op when the previous loop has not noticed the pause yet.
      this.processQueue().catch(error => {
        logSh(`❌ Erreur reprise traitement ${this.name}: ${error.message}`, 'ERROR');
        return this.handleError(error);
      });
      return this.getStatus();
    });
  }

  // ========================================
  // QUEUE PROCESSING
  // ========================================

  /**
   * Processing loop: handles pending and retryable items one at a time
   * until none is left or the processor is stopped/paused.
   * Only one loop runs at a time; a call made while a loop is alive
   * returns immediately and the running loop carries on.
   */
  async processQueue() {
    return tracer.run(`${this.name}.processQueue`, async () => {
      if (this.isLoopActive) {
        return;
      }
      this.isLoopActive = true;
      try {
        while (this.isRunning && !this.isPaused) {
          const nextItem = this.queue.find(item => item.status === 'pending' ||
            (item.status === 'error' && item.attempts < item.maxAttempts));
          if (!nextItem) {
            logSh(`✅ Traitement ${this.name} terminé`, 'INFO');
            await this.complete();
            break;
          }
          await this.processItem(nextItem);
          if (this.config.delayBetweenItems > 0) {
            await this.sleep(this.config.delayBetweenItems);
          }
        }
      } finally {
        this.isLoopActive = false;
      }
    });
  }

  /**
   * Runs one attempt on a queue item and records the outcome on it.
   * Listener callbacks run after the outcome is recorded, so a failing
   * listener can neither turn a success into a retry nor skip the final
   * status/queue save.
   * @param {object} item - Queue item (mutated in place).
   */
  async processItem(item) {
    return tracer.run(`${this.name}.processItem`, async () => {
      logSh(`🔄 Traitement ${this.name} ligne ${item.rowNumber} (tentative ${item.attempts + 1}/${item.maxAttempts})`, 'INFO');
      this.currentRow = item.rowNumber;
      item.status = 'processing';
      item.startTime = new Date().toISOString();
      item.attempts++;
      await this.updateStatus();
      await this.saveQueue();

      let succeeded = false;
      let result;
      let failure;
      try {
        result = await this.processRow(item.rowNumber, item.data);
        succeeded = true;
      } catch (error) {
        failure = error;
      }

      if (succeeded) {
        item.status = 'completed';
        item.result = result;
        item.endTime = new Date().toISOString();
        item.error = null;
        this.processedCount++;
        this.processedItems.push(item);

        const duration = Date.now() - new Date(item.startTime).getTime();
        this.stats.itemsProcessed++;
        this.stats.totalProcessingTime += duration;
        this.stats.averageProcessingTime = Math.round(this.stats.totalProcessingTime / this.stats.itemsProcessed);
        this.stats.lastProcessedAt = Date.now();
        logSh(`${this.name} ligne ${item.rowNumber} traitée avec succès (${duration}ms)`, 'INFO');

        this.notify('onItemProcessed', item, result);
        if (this.onProgress) {
          this.notify('onProgress', item, this.getProgress());
        }
      } else {
        item.error = {
          message: failure && failure.message ? failure.message : String(failure),
          stack: failure ? failure.stack : undefined,
          timestamp: new Date().toISOString()
        };
        if (item.attempts >= item.maxAttempts) {
          item.status = 'failed';
          this.errorCount++;
          this.failedItems.push(item);
          logSh(`${this.name} ligne ${item.rowNumber} échouée définitivement après ${item.attempts} tentatives`, 'ERROR');
        } else {
          item.status = 'error';
          logSh(`⚠️ ${this.name} ligne ${item.rowNumber} échouée, retry possible`, 'WARNING');
        }
        this.notify('onError', item, failure);
      }

      this.currentRow = null;
      await this.updateStatus();
      await this.saveQueue();
    });
  }

  /**
   * Processes one row through `handleModularWorkflow`.
   * Meant to be overridden by subclasses.
   * @param {number} rowNumber
   * @param {object|null} [data] - Row data attached to the queue item, if any.
   * @returns {Promise<*>} Whatever the workflow returns.
   */
  async processRow(rowNumber, data = null) {
    const rowConfig = this.buildRowConfig(rowNumber, data);
    logSh(`🎯 Configuration ${this.name} ligne ${rowNumber}: ${JSON.stringify(rowConfig)}`, 'DEBUG');
    const result = await handleModularWorkflow(rowConfig);
    logSh(`📊 Résultat ${this.name} ligne ${rowNumber}: ${result ? 'SUCCESS' : 'FAILED'}`, 'INFO');
    return result;
  }

  /**
   * Builds the workflow configuration for a row from the processor config.
   * May be overridden by subclasses.
   * @param {number} rowNumber
   * @param {object|null} [data]
   * @returns {object}
   */
  buildRowConfig(rowNumber, data = null) {
    return {
      rowNumber,
      source: `${this.name.toLowerCase()}_row_${rowNumber}`,
      selectiveStack: this.config.selective,
      adversarialMode: this.config.adversarial,
      humanSimulationMode: this.config.humanSimulation,
      patternBreakingMode: this.config.patternBreaking,
      intensity: this.config.intensity,
      saveIntermediateSteps: this.config.saveIntermediateSteps,
      data
    };
  }

  // ========================================
  // STATE MANAGEMENT
  // ========================================

  /**
   * Invokes an optional listener, isolating the processor from listener
   * failures (they are logged as warnings).
   * @param {string} callbackName - e.g. 'onProgress'.
   * @param {...*} args - Arguments forwarded to the listener.
   */
  notify(callbackName, ...args) {
    const callback = this[callbackName];
    if (typeof callback !== 'function') return;
    try {
      callback(...args);
    } catch (error) {
      const reason = error && error.message ? error.message : String(error);
      logSh(`⚠️ Erreur callback ${callbackName} ${this.name}: ${reason}`, 'WARNING');
    }
  }

  /**
   * @param {string} status
   * @returns {number} Number of queue items currently in that status.
   */
  countByStatus(status) {
    return this.queue.filter(item => item.status === status).length;
  }

  /**
   * @returns {number} Items that are neither completed nor failed.
   */
  countRemainingItems() {
    return this.queue.filter(item => item.status !== 'completed' && item.status !== 'failed').length;
  }

  /**
   * Derives the overall state label from the flags and the queue counts.
   * @returns {'idle'|'paused'|'running'|'completed'}
   */
  resolveStatusLabel(completedItems, failedItems, totalItems) {
    if (this.isRunning) {
      return this.isPaused ? 'paused' : 'running';
    }
    if (totalItems > 0 && completedItems + failedItems === totalItems) {
      return 'completed';
    }
    return 'idle';
  }

  /**
   * Writes the status file (when configured) and notifies `onStatusUpdate`.
   * Write errors are logged, not thrown.
   */
  async updateStatus() {
    const status = this.getStatus();
    if (this.statusPath) {
      try {
        await fs.writeFile(this.statusPath, JSON.stringify(status, null, 2));
      } catch (error) {
        logSh(`❌ Erreur mise à jour status ${this.name}: ${error.message}`, 'ERROR');
      }
    }
    this.notify('onStatusUpdate', status);
  }

  /**
   * @returns {object} Snapshot of the current status; `progress` is the
   *   rounded percentage of items that are completed or failed.
   */
  getStatus() {
    const completedItems = this.countByStatus('completed');
    const failedItems = this.countByStatus('failed');
    const totalItems = this.queue.length;
    const progress = totalItems > 0 ? ((completedItems + failedItems) / totalItems) * 100 : 0;

    return {
      status: this.resolveStatusLabel(completedItems, failedItems, totalItems),
      currentRow: this.currentRow,
      totalRows: totalItems,
      completedRows: completedItems,
      failedRows: failedItems,
      progress: Math.round(progress),
      startTime: this.startTime ? this.startTime.toISOString() : null,
      estimatedEnd: this.estimateCompletionTime(),
      errors: this.queue.filter(item => item.error).map(item => ({
        rowNumber: item.rowNumber,
        error: item.error,
        attempts: item.attempts
      })),
      lastResult: this.getLastResult(),
      config: this.config,
      queue: this.queue,
      stats: this.stats
    };
  }

  /**
   * Detailed progress report. Computed directly (no call to getStatus) to
   * avoid mutual recursion.
   *
   * Counts come from the queue item statuses, because processed items stay
   * in the queue. Time averages divide the elapsed time of the current run
   * by the number of items completed during this run.
   * @returns {object}
   */
  getProgress() {
    const elapsed = this.startTime ? new Date() - this.startTime : 0;
    const completedRows = this.countByStatus('completed');
    const failedRows = this.countByStatus('failed');
    const totalRows = this.queue.length;
    const remainingRows = totalRows - completedRows - failedRows;

    const sessionCompleted = this.processedItems.length;
    const lastProcessed = sessionCompleted > 0 ? this.processedItems[sessionCompleted - 1] : null;
    const avgTimePerRow = sessionCompleted > 0 ? elapsed / sessionCompleted : 0;
    const estimatedRemaining = avgTimePerRow * remainingRows;

    return {
      status: this.resolveStatusLabel(completedRows, failedRows, totalRows),
      currentRow: this.currentRow,
      totalRows: totalRows,
      completedRows: completedRows,
      failedRows: failedRows,
      progress: totalRows > 0 ? Math.round((completedRows / totalRows) * 100) : 0,
      startTime: this.startTime ? this.startTime.toISOString() : null,
      estimatedEnd: null, // Computed separately to avoid recursion
      errors: this.queue
        .filter(item => item.status === 'failed')
        .map(item => ({ row: item.rowNumber, error: item.error })),
      lastResult: lastProcessed ? lastProcessed.result : null,
      config: this.config,
      queue: this.queue,
      stats: {
        itemsQueued: this.queue.length,
        itemsProcessed: completedRows,
        itemsFailed: failedRows,
        averageProcessingTime: avgTimePerRow,
        totalProcessingTime: elapsed,
        startTime: this.startTime ? this.startTime.getTime() : null,
        lastProcessedAt: lastProcessed ? lastProcessed.endTime : null
      },
      metrics: {
        elapsedTime: elapsed,
        avgTimePerRow: avgTimePerRow,
        estimatedRemaining: estimatedRemaining,
        completionPercentage: totalRows > 0 ? (completedRows / totalRows) * 100 : 0,
        throughput: sessionCompleted > 0 && elapsed > 0 ? (sessionCompleted / (elapsed / 1000 / 60)) : 0
      }
    };
  }

  /**
   * Estimates the end time from the average duration of the items
   * completed during the current run and the number of unfinished items.
   * @returns {string|null} ISO timestamp, or null when not running, paused,
   *   nothing completed yet, or nothing left to do.
   */
  estimateCompletionTime() {
    if (!this.startTime || !this.isRunning || this.isPaused) {
      return null;
    }
    // Computed directly (no call to getProgress) to avoid recursion.
    const sessionCompleted = this.processedItems.length;
    if (sessionCompleted === 0) {
      return null;
    }
    const elapsed = new Date() - this.startTime;
    const estimatedRemaining = (elapsed / sessionCompleted) * this.countRemainingItems();
    if (estimatedRemaining > 0) {
      return new Date(Date.now() + estimatedRemaining).toISOString();
    }
    return null;
  }

  /**
   * @returns {{rowNumber: number, result: *, endTime: string}|null} The
   *   last completed item in queue order, or null when none is completed.
   */
  getLastResult() {
    const completedItems = this.queue.filter(item => item.status === 'completed');
    if (completedItems.length === 0) return null;
    const lastItem = completedItems[completedItems.length - 1];
    return {
      rowNumber: lastItem.rowNumber,
      result: lastItem.result,
      endTime: lastItem.endTime
    };
  }

  /**
   * @returns {object} Status written to a freshly created status file.
   */
  getDefaultStatus() {
    return {
      status: 'idle',
      currentRow: null,
      totalRows: 0,
      progress: 0,
      startTime: null,
      estimatedEnd: null,
      errors: [],
      lastResult: null,
      config: this.config
    };
  }

  // ========================================
  // ERROR HANDLING
  // ========================================

  /**
   * Handles an error that escaped the processing loop: stops the
   * processor, publishes the status and notifies `onError(null, error)`.
   * @param {Error} error
   */
  async handleError(error) {
    logSh(`💥 Erreur critique ${this.name}: ${error.message}`, 'ERROR');
    this.isRunning = false;
    this.isPaused = false;
    await this.updateStatus();
    this.notify('onError', null, error);
  }

  /**
   * Marks the run as finished and notifies `onComplete` with the status.
   */
  async complete() {
    logSh(`🏁 Traitement ${this.name} terminé`, 'INFO');
    this.isRunning = false;
    this.isPaused = false;
    this.currentRow = null;
    await this.updateStatus();
    if (this.onComplete) {
      this.notify('onComplete', this.getStatus());
    }
  }

  // ========================================
  // UTILITIES
  // ========================================

  /**
   * @param {number} ms - Delay in milliseconds.
   * @returns {Promise<void>} Resolves after the delay.
   */
  async sleep(ms) {
    return new Promise(resolve => setTimeout(resolve, ms));
  }

  /**
   * Drops the queue and per-run history, then repopulates from
   * `config.rowRange`.
   */
  async resetQueue() {
    logSh(`🔄 Reset de la queue ${this.name}`, 'INFO');
    this.queue = [];
    this.processedCount = 0;
    this.errorCount = 0;
    this.processedItems = [];
    this.failedItems = [];
    await this.populateQueue();
    await this.updateStatus();
  }

  /**
   * Replaces all listeners at once; omitted ones are cleared.
   * @param {object} [callbacks]
   */
  setCallbacks({ onStatusUpdate, onProgress, onError, onComplete, onItemProcessed } = {}) {
    this.onStatusUpdate = onStatusUpdate;
    this.onProgress = onProgress;
    this.onError = onError;
    this.onComplete = onComplete;
    this.onItemProcessed = onItemProcessed;
  }
}
// ============= EXPORTS =============
// CommonJS named export of the QueueProcessor base class.
module.exports = { QueueProcessor };