// ======================================== // FICHIER: AutoProcessor.js // RESPONSABILITÉ: Mode AUTO - Traitement Batch Google Sheets // FONCTIONNALITÉS: Processing queue, scheduling, monitoring // ======================================== const { logSh } = require('../ErrorReporting'); const { handleModularWorkflow } = require('../Main'); const { readInstructionsData } = require('../BrainConfig'); /** * PROCESSEUR MODE AUTO * Traitement automatique et séquentiel des lignes Google Sheets */ class AutoProcessor { constructor(options = {}) { this.config = { batchSize: options.batchSize || 5, // Lignes par batch delayBetweenItems: options.delayBetweenItems || 2000, // 2s entre chaque ligne delayBetweenBatches: options.delayBetweenBatches || 30000, // 30s entre batches maxRetries: options.maxRetries || 3, startRow: options.startRow || 2, endRow: options.endRow || null, // null = jusqu'à la fin autoMode: options.autoMode || 'standardEnhancement', // Config par défaut monitoringPort: options.monitoringPort || 3001, ...options }; this.processingQueue = []; this.processedItems = []; this.failedItems = []; this.state = { isProcessing: false, isPaused: false, currentItem: null, startTime: null, lastActivity: null, totalProcessed: 0, totalErrors: 0 }; this.stats = { itemsQueued: 0, itemsProcessed: 0, itemsFailed: 0, averageProcessingTime: 0, totalProcessingTime: 0, startTime: Date.now(), lastProcessedAt: null }; this.monitoringServer = null; this.processingInterval = null; this.isRunning = false; } // ======================================== // DÉMARRAGE ET ARRÊT // ======================================== /** * Démarre le processeur AUTO complet */ async start() { if (this.isRunning) { logSh('⚠️ AutoProcessor déjà en cours d\'exécution', 'WARNING'); return; } logSh('🤖 Démarrage AutoProcessor...', 'INFO'); try { // 1. Charger la queue depuis Google Sheets await this.loadProcessingQueue(); // 2. Serveur de monitoring (lecture seule) await this.startMonitoringServer(); // 3. Démarrer le traitement this.startProcessingLoop(); // 4. Monitoring périodique this.startHealthMonitoring(); this.isRunning = true; this.state.startTime = Date.now(); logSh(`✅ AutoProcessor démarré: ${this.stats.itemsQueued} éléments en queue`, 'INFO'); logSh(`📊 Monitoring sur http://localhost:${this.config.monitoringPort}`, 'INFO'); } catch (error) { logSh(`❌ Erreur démarrage AutoProcessor: ${error.message}`, 'ERROR'); await this.stop(); throw error; } } /** * Arrête le processeur AUTO */ async stop() { if (!this.isRunning) return; logSh('🛑 Arrêt AutoProcessor...', 'INFO'); try { // Marquer comme en arrêt this.isRunning = false; // Arrêter la boucle de traitement if (this.processingInterval) { clearInterval(this.processingInterval); this.processingInterval = null; } // Attendre la fin du traitement en cours if (this.state.isProcessing) { logSh('⏳ Attente fin traitement en cours...', 'INFO'); await this.waitForCurrentProcessing(); } // Arrêter monitoring if (this.healthInterval) { clearInterval(this.healthInterval); this.healthInterval = null; } // Arrêter serveur monitoring if (this.monitoringServer) { await new Promise((resolve) => { this.monitoringServer.close(() => resolve()); }); this.monitoringServer = null; } // Sauvegarder progression await this.saveProgress(); logSh('✅ AutoProcessor arrêté', 'INFO'); } catch (error) { logSh(`⚠️ Erreur arrêt AutoProcessor: ${error.message}`, 'WARNING'); } } // ======================================== // CHARGEMENT QUEUE // ======================================== /** * Charge la queue de traitement depuis Google Sheets */ async loadProcessingQueue() { logSh('📋 Chargement queue depuis Google Sheets...', 'INFO'); try { // Restaurer progression si disponible - TEMPORAIREMENT DÉSACTIVÉ // const savedProgress = await this.loadProgress(); // const processedRows = new Set(savedProgress?.processedRows || []); const processedRows = new Set(); // Ignore la progression sauvegardée // Scanner les lignes disponibles let currentRow = this.config.startRow; let consecutiveEmptyRows = 0; const maxEmptyRows = 5; // Arrêt après 5 lignes vides consécutives while (currentRow <= (this.config.endRow || 10)) { // 🔧 LIMITE MAX POUR ÉVITER BOUCLE INFINIE // Vérifier limite max si définie if (this.config.endRow && currentRow > this.config.endRow) { break; } try { // Tenter de lire la ligne const csvData = await readInstructionsData(currentRow); if (!csvData || !csvData.mc0) { // Ligne vide ou invalide consecutiveEmptyRows++; if (consecutiveEmptyRows >= maxEmptyRows) { logSh(`🛑 Arrêt scan après ${maxEmptyRows} lignes vides consécutives à partir de la ligne ${currentRow - maxEmptyRows + 1}`, 'INFO'); break; } } else { // Ligne valide trouvée consecutiveEmptyRows = 0; // Ajouter à la queue si pas déjà traitée if (!processedRows.has(currentRow)) { this.processingQueue.push({ rowNumber: currentRow, data: csvData, attempts: 0, status: 'pending', addedAt: Date.now() }); } else { logSh(`⏭️ Ligne ${currentRow} déjà traitée, ignorée`, 'DEBUG'); } } } catch (error) { // Erreur de lecture = ligne probablement vide consecutiveEmptyRows++; if (consecutiveEmptyRows >= maxEmptyRows) { break; } } currentRow++; } this.stats.itemsQueued = this.processingQueue.length; logSh(`📊 Queue chargée: ${this.stats.itemsQueued} éléments (lignes ${this.config.startRow}-${currentRow - 1})`, 'INFO'); if (this.stats.itemsQueued === 0) { logSh('⚠️ Aucun élément à traiter trouvé', 'WARNING'); } } catch (error) { logSh(`❌ Erreur chargement queue: ${error.message}`, 'ERROR'); throw error; } } // ======================================== // BOUCLE DE TRAITEMENT // ======================================== /** * Démarre la boucle principale de traitement */ startProcessingLoop() { if (this.processingQueue.length === 0) { logSh('⚠️ Queue vide, pas de traitement à démarrer', 'WARNING'); return; } logSh('🔄 Démarrage boucle de traitement...', 'INFO'); // Traitement immédiat du premier batch setTimeout(() => { this.processNextBatch(); }, 1000); // Puis traitement périodique this.processingInterval = setInterval(() => { if (!this.state.isProcessing && !this.state.isPaused) { this.processNextBatch(); } }, this.config.delayBetweenBatches); } /** * Traite le prochain batch d'éléments */ async processNextBatch() { if (this.state.isProcessing || this.state.isPaused || !this.isRunning) { return; } // Vérifier s'il reste des éléments const pendingItems = this.processingQueue.filter(item => item.status === 'pending'); if (pendingItems.length === 0) { logSh('✅ Tous les éléments ont été traités', 'INFO'); await this.completeProcessing(); return; } // Prendre le prochain batch const batchItems = pendingItems.slice(0, this.config.batchSize); logSh(`🚀 Traitement batch: ${batchItems.length} éléments`, 'INFO'); this.state.isProcessing = true; this.state.lastActivity = Date.now(); try { // Traiter chaque élément du batch séquentiellement for (const item of batchItems) { if (!this.isRunning) break; // Arrêt demandé await this.processItem(item); // Délai entre éléments if (this.config.delayBetweenItems > 0) { await this.sleep(this.config.delayBetweenItems); } } logSh(`✅ Batch terminé: ${batchItems.length} éléments traités`, 'INFO'); } catch (error) { logSh(`❌ Erreur traitement batch: ${error.message}`, 'ERROR'); } finally { this.state.isProcessing = false; this.state.currentItem = null; } } /** * Traite un élément individuel */ async processItem(item) { const startTime = Date.now(); this.state.currentItem = item; logSh(`🎯 Traitement ligne ${item.rowNumber}: ${item.data.mc0}`, 'INFO'); try { item.status = 'processing'; item.attempts++; item.startedAt = startTime; // Configuration de traitement automatique const processingConfig = { rowNumber: item.rowNumber, selectiveStack: this.config.autoMode, adversarialMode: 'light', humanSimulationMode: 'lightSimulation', patternBreakingMode: 'standardPatternBreaking', source: `auto_processor_row_${item.rowNumber}` }; // Exécution du workflow modulaire const result = await handleModularWorkflow(processingConfig); const duration = Date.now() - startTime; // Succès item.status = 'completed'; item.completedAt = Date.now(); item.duration = duration; item.result = { stats: result.stats, success: true }; this.processedItems.push(item); this.stats.itemsProcessed++; this.stats.totalProcessingTime += duration; this.stats.averageProcessingTime = Math.round(this.stats.totalProcessingTime / this.stats.itemsProcessed); this.stats.lastProcessedAt = Date.now(); logSh(`✅ Ligne ${item.rowNumber} terminée (${duration}ms) - ${result.stats.totalModifications || 0} modifications`, 'INFO'); } catch (error) { const duration = Date.now() - startTime; // Échec item.status = 'failed'; item.failedAt = Date.now(); item.duration = duration; item.error = error.message; this.stats.totalErrors++; logSh(`❌ Échec ligne ${item.rowNumber} (tentative ${item.attempts}/${this.config.maxRetries}): ${error.message}`, 'ERROR'); // Retry si possible if (item.attempts < this.config.maxRetries) { logSh(`🔄 Retry programmé pour ligne ${item.rowNumber}`, 'INFO'); item.status = 'pending'; // Remettre en queue } else { logSh(`💀 Ligne ${item.rowNumber} abandonnée après ${item.attempts} tentatives`, 'WARNING'); this.failedItems.push(item); this.stats.itemsFailed++; } } // Sauvegarder progression périodiquement if (this.stats.itemsProcessed % 5 === 0) { await this.saveProgress(); } } // ======================================== // SERVEUR MONITORING // ======================================== /** * Démarre le serveur de monitoring (lecture seule) */ async startMonitoringServer() { const express = require('express'); const app = express(); app.use(express.json()); // Page de status principale app.get('/', (req, res) => { res.send(this.generateStatusPage()); }); // API status JSON app.get('/api/status', (req, res) => { res.json(this.getDetailedStatus()); }); // API stats JSON app.get('/api/stats', (req, res) => { res.json({ success: true, stats: { ...this.stats }, queue: { total: this.processingQueue.length, pending: this.processingQueue.filter(i => i.status === 'pending').length, processing: this.processingQueue.filter(i => i.status === 'processing').length, completed: this.processingQueue.filter(i => i.status === 'completed').length, failed: this.processingQueue.filter(i => i.status === 'failed').length }, timestamp: new Date().toISOString() }); }); // Actions de contrôle (limitées) app.post('/api/pause', (req, res) => { this.pauseProcessing(); res.json({ success: true, message: 'Traitement mis en pause' }); }); app.post('/api/resume', (req, res) => { this.resumeProcessing(); res.json({ success: true, message: 'Traitement repris' }); }); // 404 pour autres routes app.use('*', (req, res) => { res.status(404).json({ success: false, error: 'Route non trouvée', mode: 'AUTO', message: 'Interface de monitoring en lecture seule' }); }); // Démarrage serveur return new Promise((resolve, reject) => { try { this.monitoringServer = app.listen(this.config.monitoringPort, '0.0.0.0', () => { logSh(`📊 Serveur monitoring démarré sur http://localhost:${this.config.monitoringPort}`, 'DEBUG'); resolve(); }); this.monitoringServer.on('error', (error) => { reject(error); }); } catch (error) { reject(error); } }); } /** * Génère la page de status HTML */ generateStatusPage() { const uptime = Math.floor((Date.now() - this.stats.startTime) / 1000); const progress = this.stats.itemsQueued > 0 ? Math.round((this.stats.itemsProcessed / this.stats.itemsQueued) * 100) : 0; const pendingCount = this.processingQueue.filter(i => i.status === 'pending').length; const completedCount = this.processingQueue.filter(i => i.status === 'completed').length; const failedCount = this.processingQueue.filter(i => i.status === 'failed').length; return `
Traitement Automatique Google Sheets