diff --git a/src/IntraIOManager.cpp b/src/IntraIOManager.cpp index b400e22..e276203 100644 --- a/src/IntraIOManager.cpp +++ b/src/IntraIOManager.cpp @@ -1,430 +1,430 @@ -#include -#include -#include -#include -#include - -namespace grove { - -IntraIOManager::IntraIOManager() { - // Create logger with domain organization (file logging disabled for Windows compatibility) - stillhammer::LoggerConfig config; - config.disableFile(); // TEMPORARY: Disable file logging to fix Windows crash - logger = stillhammer::createDomainLogger("IntraIOManager", "io", config); - logger->info("πŸŒπŸ”— IntraIOManager created - Central message router initialized"); - - // TEMPORARY: Disable batch thread to debug Windows crash - batchThreadRunning = false; - // batchThread = std::thread(&IntraIOManager::batchFlushLoop, this); - logger->info("⚠️ Batch flush thread DISABLED (debugging Windows crash)"); -} - -IntraIOManager::~IntraIOManager() { - // Stop batch thread first - batchThreadRunning = false; - // TEMPORARY: Thread disabled for debugging - // if (batchThread.joinable()) { - // batchThread.join(); - // } - logger->info("πŸ›‘ Batch flush thread stopped (was disabled)"); - - // Get stats before locking to avoid recursive lock - auto stats = getRoutingStats(); - logger->info("πŸ“Š Final routing stats:"); - logger->info(" Total routed messages: {}", stats["total_routed_messages"]); - logger->info(" Total routes: {}", stats["total_routes"]); - logger->info(" Active instances: {}", stats["active_instances"]); - - { - std::unique_lock lock(managerMutex); // WRITE - exclusive access needed - instances.clear(); - topicTree.clear(); - instancePatterns.clear(); - subscriptionInfoMap.clear(); - } - - { - std::lock_guard batchLock(batchMutex); - batchBuffers.clear(); - } - - logger->info("πŸŒπŸ”— IntraIOManager destroyed"); -} - -std::shared_ptr IntraIOManager::createInstance(const std::string& instanceId) { - std::unique_lock lock(managerMutex); // WRITE - exclusive access needed - - auto it = instances.find(instanceId); - if (it != instances.end()) { - logger->warn("⚠️ Instance '{}' already exists, returning existing", instanceId); - // Need to cast back to IntraIO - return std::static_pointer_cast(it->second); - } - - // Create new IntraIO instance via factory function - auto instance = createIntraIOInstance(instanceId); - instances[instanceId] = instance; - - logger->info("βœ… Created IntraIO instance: '{}'", instanceId); - logger->debug("πŸ“Š Total instances: {}", instances.size()); - - return instance; -} - -void IntraIOManager::registerInstance(const std::string& instanceId, std::shared_ptr instance) { - std::unique_lock lock(managerMutex); // WRITE - exclusive access needed - instances[instanceId] = instance; - logger->info("πŸ“‹ Registered instance: '{}'", instanceId); -} - -void IntraIOManager::removeInstance(const std::string& instanceId) { - std::unique_lock lock(managerMutex); // WRITE - exclusive access needed - - auto it = instances.find(instanceId); - if (it == instances.end()) { - logger->warn("⚠️ Instance '{}' not found for removal", instanceId); - return; - } - - // Remove all subscriptions for this instance from TopicTree - topicTree.unregisterSubscriberAll(instanceId); - - // Clean up tracking data - instancePatterns.erase(instanceId); - - instances.erase(it); - - logger->info("πŸ—‘οΈ Removed IntraIO instance: '{}'", instanceId); - logger->debug("πŸ“Š Remaining instances: {}", instances.size()); -} - -std::shared_ptr IntraIOManager::getInstance(const std::string& instanceId) const { - std::shared_lock lock(managerMutex); // READ - concurrent access allowed! - - auto it = instances.find(instanceId); - if (it != instances.end()) { - return std::static_pointer_cast(it->second); - } - return nullptr; -} - -void IntraIOManager::routeMessage(const std::string& sourceId, const std::string& topic, const json& messageData) { - // DEADLOCK FIX: Use scoped_lock for consistent lock ordering when both mutexes needed - std::scoped_lock lock(managerMutex, batchMutex); - - totalRoutedMessages++; - messagesSinceLastLog++; - size_t deliveredCount = 0; - - // Batched logging - log tous les LOG_BATCH_SIZE messages - bool shouldLog = (messagesSinceLastLog % LOG_BATCH_SIZE == 0); - - if (shouldLog) { - logger->info("πŸ“Š Routing stats: {} total messages routed", totalRoutedMessages.load()); - } - - logger->trace("πŸ“¨ Routing message: {} β†’ '{}'", sourceId, topic); - - // Find all matching subscribers - O(k) where k = topic depth - auto subscribers = topicTree.findSubscribers(topic); - - logger->trace(" πŸ” Found {} matching subscriber(s) for topic '{}'", subscribers.size(), topic); - - for (const auto& subscriberId : subscribers) { - // Don't deliver back to sender - if (subscriberId == sourceId) { - logger->debug(" ⏭️ Skipping sender '{}'", subscriberId); - continue; - } - - auto targetInstance = instances.find(subscriberId); - if (targetInstance != instances.end()) { - // Get subscription info for this subscriber - // IMPORTANT: We need to find which pattern actually matched this topic! - bool isLowFreq = false; - std::string matchedPattern; - - // Helper lambda to check if a pattern matches a topic - auto patternMatches = [](const std::string& pattern, const std::string& topic) -> bool { - // Simple wildcard matching: convert pattern to check - // pattern: "batch:.*" matches topic: "batch:metric" - // pattern: "player:*" matches topic: "player:123" but not "player:123:health" - - size_t ppos = 0, tpos = 0; - while (ppos < pattern.size() && tpos < topic.size()) { - if (pattern.substr(ppos, 2) == ".*") { - // Multi-level wildcard - matches everything from here - return true; - } else if (pattern[ppos] == '*') { - // Single-level wildcard - match until next : or end - while (tpos < topic.size() && topic[tpos] != ':') { - tpos++; - } - ppos++; - } else if (pattern[ppos] == topic[tpos]) { - ppos++; - tpos++; - } else { - return false; - } - } - return ppos == pattern.size() && tpos == topic.size(); - }; - - for (const auto& pattern : instancePatterns[subscriberId]) { - auto it = subscriptionInfoMap.find(pattern); - if (it != subscriptionInfoMap.end() && patternMatches(pattern, topic)) { - isLowFreq = it->second.isLowFreq; - matchedPattern = pattern; - logger->debug(" πŸ” Pattern '{}' matched topic '{}' β†’ isLowFreq={}", pattern, topic, isLowFreq); - break; - } - } - - if (isLowFreq) { - // Add to batch buffer instead of immediate delivery - // NOTE: batchMutex already held via scoped_lock - - auto& buffer = batchBuffers[matchedPattern]; - buffer.instanceId = subscriberId; - buffer.pattern = matchedPattern; - buffer.messages.push_back({topic, messageData}); - - deliveredCount++; - logger->debug(" πŸ“¦ Buffered for '{}' (pattern: {}, buffer size: {})", - subscriberId, matchedPattern, buffer.messages.size()); - } else { - // High-freq: immediate delivery - json dataCopy = messageData; - auto dataNode = std::make_unique("message", dataCopy); - targetInstance->second->deliverMessage(topic, std::move(dataNode), false); - deliveredCount++; - logger->trace(" β†ͺ️ Delivered to '{}' (high-freq)", subscriberId); - } - } else { - logger->warn("⚠️ Target instance '{}' not found", subscriberId); - } - } - - // Trace-only logging pour Γ©viter spam - logger->trace("πŸ“€ Message '{}' delivered to {} instances", topic, deliveredCount); -} - -void IntraIOManager::registerSubscription(const std::string& instanceId, const std::string& pattern, bool isLowFreq, int batchInterval) { - // DEADLOCK FIX: Use scoped_lock for consistent lock ordering - std::scoped_lock lock(managerMutex, batchMutex); - - try { - // Register in TopicTree - O(k) where k = pattern depth - topicTree.registerSubscriber(pattern, instanceId); - - // Track pattern for management - instancePatterns[instanceId].push_back(pattern); - - SubscriptionInfo info; - info.instanceId = instanceId; - info.isLowFreq = isLowFreq; - info.batchInterval = batchInterval; - subscriptionInfoMap[pattern] = info; - - // Initialize batch buffer if low-freq - // NOTE: batchMutex already held via scoped_lock - if (isLowFreq) { - auto& buffer = batchBuffers[pattern]; - buffer.instanceId = instanceId; - buffer.pattern = pattern; - buffer.batchInterval = batchInterval; - buffer.lastFlush = std::chrono::steady_clock::now(); - buffer.messages.clear(); - } - - totalRoutes++; - - logger->info("πŸ“‹ Registered subscription: '{}' β†’ '{}' ({}, interval={}ms)", - instanceId, pattern, isLowFreq ? "low-freq" : "high-freq", batchInterval); - - } catch (const std::exception& e) { - logger->error("❌ Failed to register subscription '{}' for '{}': {}", - pattern, instanceId, e.what()); - throw; - } -} - -void IntraIOManager::unregisterSubscription(const std::string& instanceId, const std::string& pattern) { - // DEADLOCK FIX: Use scoped_lock for consistent lock ordering - std::scoped_lock lock(managerMutex, batchMutex); - - // Remove from TopicTree - topicTree.unregisterSubscriber(pattern, instanceId); - - // Remove from tracking - auto& patterns = instancePatterns[instanceId]; - patterns.erase(std::remove(patterns.begin(), patterns.end(), pattern), patterns.end()); - - subscriptionInfoMap.erase(pattern); - - // Remove batch buffer if exists - // NOTE: batchMutex already held via scoped_lock - batchBuffers.erase(pattern); - - logger->info("πŸ—‘οΈ Unregistered subscription: '{}' β†’ '{}'", instanceId, pattern); -} - -void IntraIOManager::clearAllRoutes() { - // DEADLOCK FIX: Use scoped_lock for consistent lock ordering - std::scoped_lock lock(managerMutex, batchMutex); - - auto clearedCount = topicTree.subscriberCount(); - topicTree.clear(); - instancePatterns.clear(); - subscriptionInfoMap.clear(); - - // NOTE: batchMutex already held via scoped_lock - batchBuffers.clear(); - - logger->info("🧹 Cleared {} routing entries", clearedCount); -} - -size_t IntraIOManager::getInstanceCount() const { - std::shared_lock lock(managerMutex); // READ - concurrent access allowed! - return instances.size(); -} - -std::vector IntraIOManager::getInstanceIds() const { - std::shared_lock lock(managerMutex); // READ - concurrent access allowed! - - std::vector ids; - for (const auto& pair : instances) { - ids.push_back(pair.first); - } - return ids; -} - -json IntraIOManager::getRoutingStats() const { - std::shared_lock lock(managerMutex); // READ - concurrent access allowed! - - json stats; - stats["total_routed_messages"] = totalRoutedMessages.load(); - stats["total_routes"] = totalRoutes.load(); - stats["active_instances"] = instances.size(); - stats["routing_entries"] = topicTree.subscriberCount(); - - // Instance details - json instanceDetails = json::object(); - for (const auto& pair : instances) { - instanceDetails[pair.first] = { - {"active", true}, - {"type", "IntraIO"} - }; - } - stats["instances"] = instanceDetails; - - return stats; -} - -void IntraIOManager::setLogLevel(spdlog::level::level_enum level) { - logger->set_level(level); - logger->info("πŸ“ Log level set to: {}", spdlog::level::to_string_view(level)); -} - -// Batch flush loop - runs in separate thread -// DEADLOCK FIX: Collect buffers to flush under batchMutex, then flush under managerMutex -void IntraIOManager::batchFlushLoop() { - logger->info("πŸ”„ Batch flush loop started"); - - while (batchThreadRunning) { - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - - // Collect buffers that need flushing (under batchMutex only) - std::vector buffersToFlush; - { - std::lock_guard batchLock(batchMutex); - auto now = std::chrono::steady_clock::now(); - - logger->trace("πŸ”„ Batch flush check: {} buffers", batchBuffers.size()); - - for (auto& [pattern, buffer] : batchBuffers) { - if (buffer.messages.empty()) { - continue; - } - - auto elapsed = std::chrono::duration_cast( - now - buffer.lastFlush).count(); - - logger->debug("πŸ”„ Pattern '{}': {} messages, elapsed={}ms, interval={}ms", - pattern, buffer.messages.size(), elapsed, buffer.batchInterval); - - if (elapsed >= buffer.batchInterval) { - logger->info("πŸ“¦ Triggering flush for pattern '{}' ({} messages)", pattern, buffer.messages.size()); - // Copy buffer for flush, clear original - buffersToFlush.push_back(buffer); - buffer.messages.clear(); - buffer.lastFlush = now; - } - } - } - // batchMutex released here - - // Now flush each buffer (under managerMutex only) - NO DEADLOCK - for (auto& buffer : buffersToFlush) { - flushBatchBufferSafe(buffer); - } - } - - logger->info("πŸ”„ Batch flush loop stopped"); -} - -void IntraIOManager::flushBatchBuffer(BatchBuffer& buffer) { - // DEPRECATED: Use flushBatchBufferSafe instead to avoid deadlocks - flushBatchBufferSafe(buffer); -} - -// Safe version that only takes managerMutex (called after releasing batchMutex) -void IntraIOManager::flushBatchBufferSafe(BatchBuffer& buffer) { - if (buffer.messages.empty()) { - return; - } - - std::unique_lock lock(managerMutex); // WRITE - exclusive access needed - - auto targetInstance = instances.find(buffer.instanceId); - if (targetInstance == instances.end()) { - logger->warn("⚠️ Cannot flush batch for '{}': instance not found", buffer.instanceId); - buffer.messages.clear(); - return; - } - - size_t batchSize = buffer.messages.size(); - logger->info("πŸ“¦ Flushing batch for '{}': {} messages (pattern: {})", - buffer.instanceId, batchSize, buffer.pattern); - - // Create a single batch message containing all messages as an array - json batchArray = json::array(); - std::string firstTopic; - - for (const auto& [topic, messageData] : buffer.messages) { - if (firstTopic.empty()) { - firstTopic = topic; - } - batchArray.push_back({ - {"topic", topic}, - {"data", messageData} - }); - } - - // Deliver ONE batch message containing the array - auto batchDataNode = std::make_unique("batch", batchArray); - targetInstance->second->deliverMessage(firstTopic, std::move(batchDataNode), true); - - logger->info("βœ… Batch delivered to '{}' successfully", buffer.instanceId); - - buffer.messages.clear(); -} - -// Singleton implementation -IntraIOManager& IntraIOManager::getInstance() { - static IntraIOManager instance; - return instance; -} - +#include +#include +#include +#include +#include + +namespace grove { + +IntraIOManager::IntraIOManager() { + // Create logger with domain organization (file logging disabled for Windows compatibility) + stillhammer::LoggerConfig config; + config.disableFile(); // TEMPORARY: Disable file logging to fix Windows crash + logger = stillhammer::createDomainLogger("IntraIOManager", "io", config); + logger->info("πŸŒπŸ”— IntraIOManager created - Central message router initialized"); + + // TEMPORARY: Disable batch thread to debug Windows crash + batchThreadRunning = true; + batchThread = std::thread(&IntraIOManager::batchFlushLoop, this); + logger->info("⚠️ Batch flush thread DISABLED (debugging Windows crash)"); +} + +IntraIOManager::~IntraIOManager() { + // Stop batch thread first + batchThreadRunning = false; + // Join the batch thread + if (batchThread.joinable()) { + batchThread.join(); + } + logger->info("πŸ›‘ Batch flush thread stopped"); + + // Get stats before locking to avoid recursive lock + auto stats = getRoutingStats(); + logger->info("πŸ“Š Final routing stats:"); + logger->info(" Total routed messages: {}", stats["total_routed_messages"].get()); + logger->info(" Total routes: {}", stats["total_routes"].get()); + logger->info(" Active instances: {}", stats["active_instances"].get()); + + { + std::unique_lock lock(managerMutex); // WRITE - exclusive access needed + instances.clear(); + topicTree.clear(); + instancePatterns.clear(); + subscriptionInfoMap.clear(); + } + + { + std::lock_guard batchLock(batchMutex); + batchBuffers.clear(); + } + + logger->info("πŸŒπŸ”— IntraIOManager destroyed"); +} + +std::shared_ptr IntraIOManager::createInstance(const std::string& instanceId) { + std::unique_lock lock(managerMutex); // WRITE - exclusive access needed + + auto it = instances.find(instanceId); + if (it != instances.end()) { + logger->warn("⚠️ Instance '{}' already exists, returning existing", instanceId); + // Need to cast back to IntraIO + return std::static_pointer_cast(it->second); + } + + // Create new IntraIO instance via factory function + auto instance = createIntraIOInstance(instanceId); + instances[instanceId] = instance; + + logger->info("βœ… Created IntraIO instance: '{}'", instanceId); + logger->debug("πŸ“Š Total instances: {}", instances.size()); + + return instance; +} + +void IntraIOManager::registerInstance(const std::string& instanceId, std::shared_ptr instance) { + std::unique_lock lock(managerMutex); // WRITE - exclusive access needed + instances[instanceId] = instance; + logger->info("πŸ“‹ Registered instance: '{}'", instanceId); +} + +void IntraIOManager::removeInstance(const std::string& instanceId) { + std::unique_lock lock(managerMutex); // WRITE - exclusive access needed + + auto it = instances.find(instanceId); + if (it == instances.end()) { + logger->warn("⚠️ Instance '{}' not found for removal", instanceId); + return; + } + + // Remove all subscriptions for this instance from TopicTree + topicTree.unregisterSubscriberAll(instanceId); + + // Clean up tracking data + instancePatterns.erase(instanceId); + + instances.erase(it); + + logger->info("πŸ—‘οΈ Removed IntraIO instance: '{}'", instanceId); + logger->debug("πŸ“Š Remaining instances: {}", instances.size()); +} + +std::shared_ptr IntraIOManager::getInstance(const std::string& instanceId) const { + std::shared_lock lock(managerMutex); // READ - concurrent access allowed! + + auto it = instances.find(instanceId); + if (it != instances.end()) { + return std::static_pointer_cast(it->second); + } + return nullptr; +} + +void IntraIOManager::routeMessage(const std::string& sourceId, const std::string& topic, const json& messageData) { + // DEADLOCK FIX: Use scoped_lock for consistent lock ordering when both mutexes needed + std::scoped_lock lock(managerMutex, batchMutex); + + totalRoutedMessages++; + messagesSinceLastLog++; + size_t deliveredCount = 0; + + // Batched logging - log tous les LOG_BATCH_SIZE messages + bool shouldLog = (messagesSinceLastLog % LOG_BATCH_SIZE == 0); + + if (shouldLog) { + logger->info("πŸ“Š Routing stats: {} total messages routed", totalRoutedMessages.load()); + } + + logger->trace("πŸ“¨ Routing message: {} β†’ '{}'", sourceId, topic); + + // Find all matching subscribers - O(k) where k = topic depth + auto subscribers = topicTree.findSubscribers(topic); + + logger->trace(" πŸ” Found {} matching subscriber(s) for topic '{}'", subscribers.size(), topic); + + for (const auto& subscriberId : subscribers) { + // Don't deliver back to sender + if (subscriberId == sourceId) { + logger->debug(" ⏭️ Skipping sender '{}'", subscriberId); + continue; + } + + auto targetInstance = instances.find(subscriberId); + if (targetInstance != instances.end()) { + // Get subscription info for this subscriber + // IMPORTANT: We need to find which pattern actually matched this topic! + bool isLowFreq = false; + std::string matchedPattern; + + // Helper lambda to check if a pattern matches a topic + auto patternMatches = [](const std::string& pattern, const std::string& topic) -> bool { + // Simple wildcard matching: convert pattern to check + // pattern: "batch:.*" matches topic: "batch:metric" + // pattern: "player:*" matches topic: "player:123" but not "player:123:health" + + size_t ppos = 0, tpos = 0; + while (ppos < pattern.size() && tpos < topic.size()) { + if (pattern.substr(ppos, 2) == ".*") { + // Multi-level wildcard - matches everything from here + return true; + } else if (pattern[ppos] == '*') { + // Single-level wildcard - match until next : or end + while (tpos < topic.size() && topic[tpos] != ':') { + tpos++; + } + ppos++; + } else if (pattern[ppos] == topic[tpos]) { + ppos++; + tpos++; + } else { + return false; + } + } + return ppos == pattern.size() && tpos == topic.size(); + }; + + for (const auto& pattern : instancePatterns[subscriberId]) { + auto it = subscriptionInfoMap.find(pattern); + if (it != subscriptionInfoMap.end() && patternMatches(pattern, topic)) { + isLowFreq = it->second.isLowFreq; + matchedPattern = pattern; + logger->debug(" πŸ” Pattern '{}' matched topic '{}' β†’ isLowFreq={}", pattern, topic, isLowFreq); + break; + } + } + + if (isLowFreq) { + // Add to batch buffer instead of immediate delivery + // NOTE: batchMutex already held via scoped_lock + + auto& buffer = batchBuffers[matchedPattern]; + buffer.instanceId = subscriberId; + buffer.pattern = matchedPattern; + buffer.messages.push_back({topic, messageData}); + + deliveredCount++; + logger->debug(" πŸ“¦ Buffered for '{}' (pattern: {}, buffer size: {})", + subscriberId, matchedPattern, buffer.messages.size()); + } else { + // High-freq: immediate delivery + json dataCopy = messageData; + auto dataNode = std::make_unique("message", dataCopy); + targetInstance->second->deliverMessage(topic, std::move(dataNode), false); + deliveredCount++; + logger->trace(" β†ͺ️ Delivered to '{}' (high-freq)", subscriberId); + } + } else { + logger->warn("⚠️ Target instance '{}' not found", subscriberId); + } + } + + // Trace-only logging pour Γ©viter spam + logger->trace("πŸ“€ Message '{}' delivered to {} instances", topic, deliveredCount); +} + +void IntraIOManager::registerSubscription(const std::string& instanceId, const std::string& pattern, bool isLowFreq, int batchInterval) { + // DEADLOCK FIX: Use scoped_lock for consistent lock ordering + std::scoped_lock lock(managerMutex, batchMutex); + + try { + // Register in TopicTree - O(k) where k = pattern depth + topicTree.registerSubscriber(pattern, instanceId); + + // Track pattern for management + instancePatterns[instanceId].push_back(pattern); + + SubscriptionInfo info; + info.instanceId = instanceId; + info.isLowFreq = isLowFreq; + info.batchInterval = batchInterval; + subscriptionInfoMap[pattern] = info; + + // Initialize batch buffer if low-freq + // NOTE: batchMutex already held via scoped_lock + if (isLowFreq) { + auto& buffer = batchBuffers[pattern]; + buffer.instanceId = instanceId; + buffer.pattern = pattern; + buffer.batchInterval = batchInterval; + buffer.lastFlush = std::chrono::steady_clock::now(); + buffer.messages.clear(); + } + + totalRoutes++; + + logger->info("πŸ“‹ Registered subscription: '{}' β†’ '{}' ({}, interval={}ms)", + instanceId, pattern, isLowFreq ? "low-freq" : "high-freq", batchInterval); + + } catch (const std::exception& e) { + logger->error("❌ Failed to register subscription '{}' for '{}': {}", + pattern, instanceId, e.what()); + throw; + } +} + +void IntraIOManager::unregisterSubscription(const std::string& instanceId, const std::string& pattern) { + // DEADLOCK FIX: Use scoped_lock for consistent lock ordering + std::scoped_lock lock(managerMutex, batchMutex); + + // Remove from TopicTree + topicTree.unregisterSubscriber(pattern, instanceId); + + // Remove from tracking + auto& patterns = instancePatterns[instanceId]; + patterns.erase(std::remove(patterns.begin(), patterns.end(), pattern), patterns.end()); + + subscriptionInfoMap.erase(pattern); + + // Remove batch buffer if exists + // NOTE: batchMutex already held via scoped_lock + batchBuffers.erase(pattern); + + logger->info("πŸ—‘οΈ Unregistered subscription: '{}' β†’ '{}'", instanceId, pattern); +} + +void IntraIOManager::clearAllRoutes() { + // DEADLOCK FIX: Use scoped_lock for consistent lock ordering + std::scoped_lock lock(managerMutex, batchMutex); + + auto clearedCount = topicTree.subscriberCount(); + topicTree.clear(); + instancePatterns.clear(); + subscriptionInfoMap.clear(); + + // NOTE: batchMutex already held via scoped_lock + batchBuffers.clear(); + + logger->info("🧹 Cleared {} routing entries", clearedCount); +} + +size_t IntraIOManager::getInstanceCount() const { + std::shared_lock lock(managerMutex); // READ - concurrent access allowed! + return instances.size(); +} + +std::vector IntraIOManager::getInstanceIds() const { + std::shared_lock lock(managerMutex); // READ - concurrent access allowed! + + std::vector ids; + for (const auto& pair : instances) { + ids.push_back(pair.first); + } + return ids; +} + +json IntraIOManager::getRoutingStats() const { + std::shared_lock lock(managerMutex); // READ - concurrent access allowed! + + json stats; + stats["total_routed_messages"] = totalRoutedMessages.load(); + stats["total_routes"] = totalRoutes.load(); + stats["active_instances"] = instances.size(); + stats["routing_entries"] = topicTree.subscriberCount(); + + // Instance details + json instanceDetails = json::object(); + for (const auto& pair : instances) { + instanceDetails[pair.first] = { + {"active", true}, + {"type", "IntraIO"} + }; + } + stats["instances"] = instanceDetails; + + return stats; +} + +void IntraIOManager::setLogLevel(spdlog::level::level_enum level) { + logger->set_level(level); + logger->info("πŸ“ Log level set to: {}", spdlog::level::to_string_view(level)); +} + +// Batch flush loop - runs in separate thread +// DEADLOCK FIX: Collect buffers to flush under batchMutex, then flush under managerMutex +void IntraIOManager::batchFlushLoop() { + logger->info("πŸ”„ Batch flush loop started"); + + while (batchThreadRunning) { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + + // Collect buffers that need flushing (under batchMutex only) + std::vector buffersToFlush; + { + std::lock_guard batchLock(batchMutex); + auto now = std::chrono::steady_clock::now(); + + logger->trace("πŸ”„ Batch flush check: {} buffers", batchBuffers.size()); + + for (auto& [pattern, buffer] : batchBuffers) { + if (buffer.messages.empty()) { + continue; + } + + auto elapsed = std::chrono::duration_cast( + now - buffer.lastFlush).count(); + + logger->debug("πŸ”„ Pattern '{}': {} messages, elapsed={}ms, interval={}ms", + pattern, buffer.messages.size(), elapsed, buffer.batchInterval); + + if (elapsed >= buffer.batchInterval) { + logger->info("πŸ“¦ Triggering flush for pattern '{}' ({} messages)", pattern, buffer.messages.size()); + // Copy buffer for flush, clear original + buffersToFlush.push_back(buffer); + buffer.messages.clear(); + buffer.lastFlush = now; + } + } + } + // batchMutex released here + + // Now flush each buffer (under managerMutex only) - NO DEADLOCK + for (auto& buffer : buffersToFlush) { + flushBatchBufferSafe(buffer); + } + } + + logger->info("πŸ”„ Batch flush loop stopped"); +} + +void IntraIOManager::flushBatchBuffer(BatchBuffer& buffer) { + // DEPRECATED: Use flushBatchBufferSafe instead to avoid deadlocks + flushBatchBufferSafe(buffer); +} + +// Safe version that only takes managerMutex (called after releasing batchMutex) +void IntraIOManager::flushBatchBufferSafe(BatchBuffer& buffer) { + if (buffer.messages.empty()) { + return; + } + + std::unique_lock lock(managerMutex); // WRITE - exclusive access needed + + auto targetInstance = instances.find(buffer.instanceId); + if (targetInstance == instances.end()) { + logger->warn("⚠️ Cannot flush batch for '{}': instance not found", buffer.instanceId); + buffer.messages.clear(); + return; + } + + size_t batchSize = buffer.messages.size(); + logger->info("πŸ“¦ Flushing batch for '{}': {} messages (pattern: {})", + buffer.instanceId, batchSize, buffer.pattern); + + // Create a single batch message containing all messages as an array + json batchArray = json::array(); + std::string firstTopic; + + for (const auto& [topic, messageData] : buffer.messages) { + if (firstTopic.empty()) { + firstTopic = topic; + } + batchArray.push_back({ + {"topic", topic}, + {"data", messageData} + }); + } + + // Deliver ONE batch message containing the array + auto batchDataNode = std::make_unique("batch", batchArray); + targetInstance->second->deliverMessage(firstTopic, std::move(batchDataNode), true); + + logger->info("βœ… Batch delivered to '{}' successfully", buffer.instanceId); + + buffer.messages.clear(); +} + +// Singleton implementation +IntraIOManager& IntraIOManager::getInstance() { + static IntraIOManager instance; + return instance; +} + } // namespace grove \ No newline at end of file diff --git a/tests/helpers/AutoCompiler.cpp b/tests/helpers/AutoCompiler.cpp index 42aaab4..ab6679f 100644 --- a/tests/helpers/AutoCompiler.cpp +++ b/tests/helpers/AutoCompiler.cpp @@ -1,127 +1,134 @@ -#include "AutoCompiler.h" -#include -#include -#include -#include -#include -#include -#include -#ifndef _WIN32 -#include -#endif - -namespace TestHelpers { - -AutoCompiler::AutoCompiler(const std::string& moduleName, - const std::string& buildDir, - const std::string& sourcePath) - : moduleName_(moduleName) - , buildDir_(buildDir) - , sourcePath_(sourcePath) -{ -} - -AutoCompiler::~AutoCompiler() { - stop(); -} - -void AutoCompiler::start(int iterations, int intervalMs) { - if (running_.load()) { - return; // Already running - } - - running_ = true; - compilationThread_ = std::thread(&AutoCompiler::compilationLoop, this, iterations, intervalMs); -} - -void AutoCompiler::stop() { - running_ = false; - if (compilationThread_.joinable()) { - compilationThread_.join(); - } -} - -void AutoCompiler::waitForCompletion() { - if (compilationThread_.joinable()) { - compilationThread_.join(); - } -} - -void AutoCompiler::modifySourceVersion(int iteration) { - // Read entire file - std::ifstream inFile(sourcePath_); - if (!inFile.is_open()) { - std::cerr << "[AutoCompiler] Failed to open source file: " << sourcePath_ << std::endl; - return; - } - - std::stringstream buffer; - buffer << inFile.rdbuf(); - inFile.close(); - - std::string content = buffer.str(); - - // Replace version string: moduleVersion = "vX" β†’ moduleVersion = "vITERATION" - std::regex versionRegex(R"(std::string\s+moduleVersion\s*=\s*"v\d+")"); - std::string newVersion = "std::string moduleVersion = \"v" + std::to_string(iteration) + "\""; - content = std::regex_replace(content, versionRegex, newVersion); - - // Write back to file - std::ofstream outFile(sourcePath_); - if (!outFile.is_open()) { - std::cerr << "[AutoCompiler] Failed to write source file: " << sourcePath_ << std::endl; - return; - } - - outFile << content; - outFile.close(); -} - -bool AutoCompiler::compile(int iteration) { - // Modify source version before compiling - modifySourceVersion(iteration); - - // Small delay to ensure file is written - std::this_thread::sleep_for(std::chrono::milliseconds(10)); - - // Build the module using make - // Note: Tests run from build/tests/, so we use make -C .. to build from build directory - std::string command; - if (buildDir_ == "build") { - command = "make -C .. " + moduleName_ + " > /dev/null 2>&1"; - } else { - command = "make -C " + buildDir_ + " " + moduleName_ + " > /dev/null 2>&1"; - } - int result = std::system(command.c_str()); - - // std::system returns exit status in platform-specific format - // WEXITSTATUS is the correct way to extract it on POSIX systems - #ifdef _WIN32 - return (result == 0); - #else - return (WEXITSTATUS(result) == 0); - #endif -} - -void AutoCompiler::compilationLoop(int iterations, int intervalMs) { - for (int i = 1; i <= iterations && running_.load(); ++i) { - currentIteration_ = i; - - // Compile - bool success = compile(i); - if (success) { - successCount_++; - } else { - failureCount_++; - } - - // Wait for next iteration - if (i < iterations) { - std::this_thread::sleep_for(std::chrono::milliseconds(intervalMs)); - } - } - - running_ = false; -} - -} // namespace TestHelpers +#include "AutoCompiler.h" +#include +#include +#include +#include +#include +#include +#include +#ifndef _WIN32 +#include +#endif + +namespace TestHelpers { + +AutoCompiler::AutoCompiler(const std::string& moduleName, + const std::string& buildDir, + const std::string& sourcePath) + : moduleName_(moduleName) + , buildDir_(buildDir) + , sourcePath_(sourcePath) +{ +} + +AutoCompiler::~AutoCompiler() { + stop(); +} + +void AutoCompiler::start(int iterations, int intervalMs) { + if (running_.load()) { + return; // Already running + } + + running_ = true; + compilationThread_ = std::thread(&AutoCompiler::compilationLoop, this, iterations, intervalMs); +} + +void AutoCompiler::stop() { + running_ = false; + if (compilationThread_.joinable()) { + compilationThread_.join(); + } +} + +void AutoCompiler::waitForCompletion() { + if (compilationThread_.joinable()) { + compilationThread_.join(); + } +} + +void AutoCompiler::modifySourceVersion(int iteration) { + // Read entire file + std::ifstream inFile(sourcePath_); + if (!inFile.is_open()) { + std::cerr << "[AutoCompiler] Failed to open source file: " << sourcePath_ << std::endl; + return; + } + + std::stringstream buffer; + buffer << inFile.rdbuf(); + inFile.close(); + + std::string content = buffer.str(); + + // Replace version string: moduleVersion = "vX" β†’ moduleVersion = "vITERATION" + std::regex versionRegex(R"(std::string\s+moduleVersion\s*=\s*"v\d+")"); + std::string newVersion = "std::string moduleVersion = \"v" + std::to_string(iteration) + "\""; + content = std::regex_replace(content, versionRegex, newVersion); + + // Write back to file + std::ofstream outFile(sourcePath_); + if (!outFile.is_open()) { + std::cerr << "[AutoCompiler] Failed to write source file: " << sourcePath_ << std::endl; + return; + } + + outFile << content; + outFile.close(); +} + +bool AutoCompiler::compile(int iteration) { + // Modify source version before compiling + modifySourceVersion(iteration); + + // Small delay to ensure file is written + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + + // Build the module using make +#ifdef _WIN32 + std::string makeCmd = "mingw32-make"; + std::string nullDev = "NUL"; +#else + std::string makeCmd = "make"; + std::string nullDev = "/dev/null"; +#endif + // Note: Tests run from build/tests/, so we use make -C .. to build from build directory + std::string command; + if (buildDir_ == "build") { + command = makeCmd + " -C .. " + moduleName_ + " > " + nullDev + " 2>&1"; + } else { + command = makeCmd + " -C " + buildDir_ + " " + moduleName_ + " > " + nullDev + " 2>&1"; + } + int result = std::system(command.c_str()); + + // std::system returns exit status in platform-specific format + // WEXITSTATUS is the correct way to extract it on POSIX systems + #ifdef _WIN32 + return (result == 0); + #else + return (WEXITSTATUS(result) == 0); + #endif +} + +void AutoCompiler::compilationLoop(int iterations, int intervalMs) { + for (int i = 1; i <= iterations && running_.load(); ++i) { + currentIteration_ = i; + + // Compile + bool success = compile(i); + if (success) { + successCount_++; + } else { + failureCount_++; + } + + // Wait for next iteration + if (i < iterations) { + std::this_thread::sleep_for(std::chrono::milliseconds(intervalMs)); + } + } + + running_ = false; +} + +} // namespace TestHelpers diff --git a/tests/integration/test_01_production_hotreload.cpp b/tests/integration/test_01_production_hotreload.cpp index b0f6b9a..8131133 100644 --- a/tests/integration/test_01_production_hotreload.cpp +++ b/tests/integration/test_01_production_hotreload.cpp @@ -125,7 +125,13 @@ int main() { // Recompiler std::cout << " 2. Recompiling module...\n"; // Note: This test runs from build/tests/, so we use make -C .. to build from build directory - int buildResult = system("make -C .. TankModule 2>&1 > /dev/null"); + int buildResult = system( +#ifdef _WIN32 + "mingw32-make -C .. TankModule 2>&1 > NUL" +#else + "make -C .. TankModule 2>&1 > /dev/null" +#endif + ); if (buildResult != 0) { std::cerr << "❌ Compilation failed!\n"; return 1; @@ -259,7 +265,13 @@ int main() { outputRestore.close(); // Rebuild to restore original version (test runs from build/tests/) - system("make -C .. TankModule 2>&1 > /dev/null"); + system( +#ifdef _WIN32 + "mingw32-make -C .. TankModule 2>&1 > NUL" +#else + "make -C .. TankModule 2>&1 > /dev/null" +#endif + ); // === RAPPORTS === std::cout << "\n"; diff --git a/tests/modules/TankModule.h b/tests/modules/TankModule.h index 529fc72..ae61483 100644 --- a/tests/modules/TankModule.h +++ b/tests/modules/TankModule.h @@ -32,7 +32,8 @@ public: private: std::vector tanks; int frameCount = 0; - std::string moduleVersion = "v2.0 HOT-RELOADED";:shared_ptr logger; + std::string moduleVersion = "v2.0 HOT-RELOADED"; // Module logging + std::shared_ptr logger; std::unique_ptr config; void updateTank(Tank& tank, float dt);