fix: IntraIOManager batch thread + AutoCompiler Windows support

- Re-enable batch flush thread for low-frequency message batching
- Fix JSON type error in routing stats logging (.get<size_t>())
- Add Windows/MinGW support to AutoCompiler (mingw32-make, NUL)
- Fix TankModule.h linter merge bug (add comment between lines)
- Add Windows platform check for make command in test_01

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
StillHammer 2025-12-31 09:44:37 +07:00
parent edf4d76844
commit 415cad1b0a
4 changed files with 579 additions and 559 deletions

View File

@ -1,430 +1,430 @@
#include <grove/IntraIOManager.h> #include <grove/IntraIOManager.h>
#include <grove/IntraIO.h> #include <grove/IntraIO.h>
#include <grove/JsonDataNode.h> #include <grove/JsonDataNode.h>
#include <stdexcept> #include <stdexcept>
#include <logger/Logger.h> #include <logger/Logger.h>
namespace grove { namespace grove {
IntraIOManager::IntraIOManager() { IntraIOManager::IntraIOManager() {
// Create logger with domain organization (file logging disabled for Windows compatibility) // Create logger with domain organization (file logging disabled for Windows compatibility)
stillhammer::LoggerConfig config; stillhammer::LoggerConfig config;
config.disableFile(); // TEMPORARY: Disable file logging to fix Windows crash config.disableFile(); // TEMPORARY: Disable file logging to fix Windows crash
logger = stillhammer::createDomainLogger("IntraIOManager", "io", config); logger = stillhammer::createDomainLogger("IntraIOManager", "io", config);
logger->info("🌐🔗 IntraIOManager created - Central message router initialized"); logger->info("🌐🔗 IntraIOManager created - Central message router initialized");
// TEMPORARY: Disable batch thread to debug Windows crash // TEMPORARY: Disable batch thread to debug Windows crash
batchThreadRunning = false; batchThreadRunning = true;
// batchThread = std::thread(&IntraIOManager::batchFlushLoop, this); batchThread = std::thread(&IntraIOManager::batchFlushLoop, this);
logger->info("⚠️ Batch flush thread DISABLED (debugging Windows crash)"); logger->info("⚠️ Batch flush thread DISABLED (debugging Windows crash)");
} }
IntraIOManager::~IntraIOManager() { IntraIOManager::~IntraIOManager() {
// Stop batch thread first // Stop batch thread first
batchThreadRunning = false; batchThreadRunning = false;
// TEMPORARY: Thread disabled for debugging // Join the batch thread
// if (batchThread.joinable()) { if (batchThread.joinable()) {
// batchThread.join(); batchThread.join();
// } }
logger->info("🛑 Batch flush thread stopped (was disabled)"); logger->info("🛑 Batch flush thread stopped");
// Get stats before locking to avoid recursive lock // Get stats before locking to avoid recursive lock
auto stats = getRoutingStats(); auto stats = getRoutingStats();
logger->info("📊 Final routing stats:"); logger->info("📊 Final routing stats:");
logger->info(" Total routed messages: {}", stats["total_routed_messages"]); logger->info(" Total routed messages: {}", stats["total_routed_messages"].get<size_t>());
logger->info(" Total routes: {}", stats["total_routes"]); logger->info(" Total routes: {}", stats["total_routes"].get<size_t>());
logger->info(" Active instances: {}", stats["active_instances"]); logger->info(" Active instances: {}", stats["active_instances"].get<size_t>());
{ {
std::unique_lock lock(managerMutex); // WRITE - exclusive access needed std::unique_lock lock(managerMutex); // WRITE - exclusive access needed
instances.clear(); instances.clear();
topicTree.clear(); topicTree.clear();
instancePatterns.clear(); instancePatterns.clear();
subscriptionInfoMap.clear(); subscriptionInfoMap.clear();
} }
{ {
std::lock_guard<std::mutex> batchLock(batchMutex); std::lock_guard<std::mutex> batchLock(batchMutex);
batchBuffers.clear(); batchBuffers.clear();
} }
logger->info("🌐🔗 IntraIOManager destroyed"); logger->info("🌐🔗 IntraIOManager destroyed");
} }
std::shared_ptr<IntraIO> IntraIOManager::createInstance(const std::string& instanceId) { std::shared_ptr<IntraIO> IntraIOManager::createInstance(const std::string& instanceId) {
std::unique_lock lock(managerMutex); // WRITE - exclusive access needed std::unique_lock lock(managerMutex); // WRITE - exclusive access needed
auto it = instances.find(instanceId); auto it = instances.find(instanceId);
if (it != instances.end()) { if (it != instances.end()) {
logger->warn("⚠️ Instance '{}' already exists, returning existing", instanceId); logger->warn("⚠️ Instance '{}' already exists, returning existing", instanceId);
// Need to cast back to IntraIO // Need to cast back to IntraIO
return std::static_pointer_cast<IntraIO>(it->second); return std::static_pointer_cast<IntraIO>(it->second);
} }
// Create new IntraIO instance via factory function // Create new IntraIO instance via factory function
auto instance = createIntraIOInstance(instanceId); auto instance = createIntraIOInstance(instanceId);
instances[instanceId] = instance; instances[instanceId] = instance;
logger->info("✅ Created IntraIO instance: '{}'", instanceId); logger->info("✅ Created IntraIO instance: '{}'", instanceId);
logger->debug("📊 Total instances: {}", instances.size()); logger->debug("📊 Total instances: {}", instances.size());
return instance; return instance;
} }
void IntraIOManager::registerInstance(const std::string& instanceId, std::shared_ptr<IIntraIODelivery> instance) { void IntraIOManager::registerInstance(const std::string& instanceId, std::shared_ptr<IIntraIODelivery> instance) {
std::unique_lock lock(managerMutex); // WRITE - exclusive access needed std::unique_lock lock(managerMutex); // WRITE - exclusive access needed
instances[instanceId] = instance; instances[instanceId] = instance;
logger->info("📋 Registered instance: '{}'", instanceId); logger->info("📋 Registered instance: '{}'", instanceId);
} }
void IntraIOManager::removeInstance(const std::string& instanceId) { void IntraIOManager::removeInstance(const std::string& instanceId) {
std::unique_lock lock(managerMutex); // WRITE - exclusive access needed std::unique_lock lock(managerMutex); // WRITE - exclusive access needed
auto it = instances.find(instanceId); auto it = instances.find(instanceId);
if (it == instances.end()) { if (it == instances.end()) {
logger->warn("⚠️ Instance '{}' not found for removal", instanceId); logger->warn("⚠️ Instance '{}' not found for removal", instanceId);
return; return;
} }
// Remove all subscriptions for this instance from TopicTree // Remove all subscriptions for this instance from TopicTree
topicTree.unregisterSubscriberAll(instanceId); topicTree.unregisterSubscriberAll(instanceId);
// Clean up tracking data // Clean up tracking data
instancePatterns.erase(instanceId); instancePatterns.erase(instanceId);
instances.erase(it); instances.erase(it);
logger->info("🗑️ Removed IntraIO instance: '{}'", instanceId); logger->info("🗑️ Removed IntraIO instance: '{}'", instanceId);
logger->debug("📊 Remaining instances: {}", instances.size()); logger->debug("📊 Remaining instances: {}", instances.size());
} }
std::shared_ptr<IntraIO> IntraIOManager::getInstance(const std::string& instanceId) const { std::shared_ptr<IntraIO> IntraIOManager::getInstance(const std::string& instanceId) const {
std::shared_lock lock(managerMutex); // READ - concurrent access allowed! std::shared_lock lock(managerMutex); // READ - concurrent access allowed!
auto it = instances.find(instanceId); auto it = instances.find(instanceId);
if (it != instances.end()) { if (it != instances.end()) {
return std::static_pointer_cast<IntraIO>(it->second); return std::static_pointer_cast<IntraIO>(it->second);
} }
return nullptr; return nullptr;
} }
void IntraIOManager::routeMessage(const std::string& sourceId, const std::string& topic, const json& messageData) { void IntraIOManager::routeMessage(const std::string& sourceId, const std::string& topic, const json& messageData) {
// DEADLOCK FIX: Use scoped_lock for consistent lock ordering when both mutexes needed // DEADLOCK FIX: Use scoped_lock for consistent lock ordering when both mutexes needed
std::scoped_lock lock(managerMutex, batchMutex); std::scoped_lock lock(managerMutex, batchMutex);
totalRoutedMessages++; totalRoutedMessages++;
messagesSinceLastLog++; messagesSinceLastLog++;
size_t deliveredCount = 0; size_t deliveredCount = 0;
// Batched logging - log tous les LOG_BATCH_SIZE messages // Batched logging - log tous les LOG_BATCH_SIZE messages
bool shouldLog = (messagesSinceLastLog % LOG_BATCH_SIZE == 0); bool shouldLog = (messagesSinceLastLog % LOG_BATCH_SIZE == 0);
if (shouldLog) { if (shouldLog) {
logger->info("📊 Routing stats: {} total messages routed", totalRoutedMessages.load()); logger->info("📊 Routing stats: {} total messages routed", totalRoutedMessages.load());
} }
logger->trace("📨 Routing message: {} → '{}'", sourceId, topic); logger->trace("📨 Routing message: {} → '{}'", sourceId, topic);
// Find all matching subscribers - O(k) where k = topic depth // Find all matching subscribers - O(k) where k = topic depth
auto subscribers = topicTree.findSubscribers(topic); auto subscribers = topicTree.findSubscribers(topic);
logger->trace(" 🔍 Found {} matching subscriber(s) for topic '{}'", subscribers.size(), topic); logger->trace(" 🔍 Found {} matching subscriber(s) for topic '{}'", subscribers.size(), topic);
for (const auto& subscriberId : subscribers) { for (const auto& subscriberId : subscribers) {
// Don't deliver back to sender // Don't deliver back to sender
if (subscriberId == sourceId) { if (subscriberId == sourceId) {
logger->debug(" ⏭️ Skipping sender '{}'", subscriberId); logger->debug(" ⏭️ Skipping sender '{}'", subscriberId);
continue; continue;
} }
auto targetInstance = instances.find(subscriberId); auto targetInstance = instances.find(subscriberId);
if (targetInstance != instances.end()) { if (targetInstance != instances.end()) {
// Get subscription info for this subscriber // Get subscription info for this subscriber
// IMPORTANT: We need to find which pattern actually matched this topic! // IMPORTANT: We need to find which pattern actually matched this topic!
bool isLowFreq = false; bool isLowFreq = false;
std::string matchedPattern; std::string matchedPattern;
// Helper lambda to check if a pattern matches a topic // Helper lambda to check if a pattern matches a topic
auto patternMatches = [](const std::string& pattern, const std::string& topic) -> bool { auto patternMatches = [](const std::string& pattern, const std::string& topic) -> bool {
// Simple wildcard matching: convert pattern to check // Simple wildcard matching: convert pattern to check
// pattern: "batch:.*" matches topic: "batch:metric" // pattern: "batch:.*" matches topic: "batch:metric"
// pattern: "player:*" matches topic: "player:123" but not "player:123:health" // pattern: "player:*" matches topic: "player:123" but not "player:123:health"
size_t ppos = 0, tpos = 0; size_t ppos = 0, tpos = 0;
while (ppos < pattern.size() && tpos < topic.size()) { while (ppos < pattern.size() && tpos < topic.size()) {
if (pattern.substr(ppos, 2) == ".*") { if (pattern.substr(ppos, 2) == ".*") {
// Multi-level wildcard - matches everything from here // Multi-level wildcard - matches everything from here
return true; return true;
} else if (pattern[ppos] == '*') { } else if (pattern[ppos] == '*') {
// Single-level wildcard - match until next : or end // Single-level wildcard - match until next : or end
while (tpos < topic.size() && topic[tpos] != ':') { while (tpos < topic.size() && topic[tpos] != ':') {
tpos++; tpos++;
} }
ppos++; ppos++;
} else if (pattern[ppos] == topic[tpos]) { } else if (pattern[ppos] == topic[tpos]) {
ppos++; ppos++;
tpos++; tpos++;
} else { } else {
return false; return false;
} }
} }
return ppos == pattern.size() && tpos == topic.size(); return ppos == pattern.size() && tpos == topic.size();
}; };
for (const auto& pattern : instancePatterns[subscriberId]) { for (const auto& pattern : instancePatterns[subscriberId]) {
auto it = subscriptionInfoMap.find(pattern); auto it = subscriptionInfoMap.find(pattern);
if (it != subscriptionInfoMap.end() && patternMatches(pattern, topic)) { if (it != subscriptionInfoMap.end() && patternMatches(pattern, topic)) {
isLowFreq = it->second.isLowFreq; isLowFreq = it->second.isLowFreq;
matchedPattern = pattern; matchedPattern = pattern;
logger->debug(" 🔍 Pattern '{}' matched topic '{}' → isLowFreq={}", pattern, topic, isLowFreq); logger->debug(" 🔍 Pattern '{}' matched topic '{}' → isLowFreq={}", pattern, topic, isLowFreq);
break; break;
} }
} }
if (isLowFreq) { if (isLowFreq) {
// Add to batch buffer instead of immediate delivery // Add to batch buffer instead of immediate delivery
// NOTE: batchMutex already held via scoped_lock // NOTE: batchMutex already held via scoped_lock
auto& buffer = batchBuffers[matchedPattern]; auto& buffer = batchBuffers[matchedPattern];
buffer.instanceId = subscriberId; buffer.instanceId = subscriberId;
buffer.pattern = matchedPattern; buffer.pattern = matchedPattern;
buffer.messages.push_back({topic, messageData}); buffer.messages.push_back({topic, messageData});
deliveredCount++; deliveredCount++;
logger->debug(" 📦 Buffered for '{}' (pattern: {}, buffer size: {})", logger->debug(" 📦 Buffered for '{}' (pattern: {}, buffer size: {})",
subscriberId, matchedPattern, buffer.messages.size()); subscriberId, matchedPattern, buffer.messages.size());
} else { } else {
// High-freq: immediate delivery // High-freq: immediate delivery
json dataCopy = messageData; json dataCopy = messageData;
auto dataNode = std::make_unique<JsonDataNode>("message", dataCopy); auto dataNode = std::make_unique<JsonDataNode>("message", dataCopy);
targetInstance->second->deliverMessage(topic, std::move(dataNode), false); targetInstance->second->deliverMessage(topic, std::move(dataNode), false);
deliveredCount++; deliveredCount++;
logger->trace(" ↪️ Delivered to '{}' (high-freq)", subscriberId); logger->trace(" ↪️ Delivered to '{}' (high-freq)", subscriberId);
} }
} else { } else {
logger->warn("⚠️ Target instance '{}' not found", subscriberId); logger->warn("⚠️ Target instance '{}' not found", subscriberId);
} }
} }
// Trace-only logging pour éviter spam // Trace-only logging pour éviter spam
logger->trace("📤 Message '{}' delivered to {} instances", topic, deliveredCount); logger->trace("📤 Message '{}' delivered to {} instances", topic, deliveredCount);
} }
void IntraIOManager::registerSubscription(const std::string& instanceId, const std::string& pattern, bool isLowFreq, int batchInterval) { void IntraIOManager::registerSubscription(const std::string& instanceId, const std::string& pattern, bool isLowFreq, int batchInterval) {
// DEADLOCK FIX: Use scoped_lock for consistent lock ordering // DEADLOCK FIX: Use scoped_lock for consistent lock ordering
std::scoped_lock lock(managerMutex, batchMutex); std::scoped_lock lock(managerMutex, batchMutex);
try { try {
// Register in TopicTree - O(k) where k = pattern depth // Register in TopicTree - O(k) where k = pattern depth
topicTree.registerSubscriber(pattern, instanceId); topicTree.registerSubscriber(pattern, instanceId);
// Track pattern for management // Track pattern for management
instancePatterns[instanceId].push_back(pattern); instancePatterns[instanceId].push_back(pattern);
SubscriptionInfo info; SubscriptionInfo info;
info.instanceId = instanceId; info.instanceId = instanceId;
info.isLowFreq = isLowFreq; info.isLowFreq = isLowFreq;
info.batchInterval = batchInterval; info.batchInterval = batchInterval;
subscriptionInfoMap[pattern] = info; subscriptionInfoMap[pattern] = info;
// Initialize batch buffer if low-freq // Initialize batch buffer if low-freq
// NOTE: batchMutex already held via scoped_lock // NOTE: batchMutex already held via scoped_lock
if (isLowFreq) { if (isLowFreq) {
auto& buffer = batchBuffers[pattern]; auto& buffer = batchBuffers[pattern];
buffer.instanceId = instanceId; buffer.instanceId = instanceId;
buffer.pattern = pattern; buffer.pattern = pattern;
buffer.batchInterval = batchInterval; buffer.batchInterval = batchInterval;
buffer.lastFlush = std::chrono::steady_clock::now(); buffer.lastFlush = std::chrono::steady_clock::now();
buffer.messages.clear(); buffer.messages.clear();
} }
totalRoutes++; totalRoutes++;
logger->info("📋 Registered subscription: '{}' → '{}' ({}, interval={}ms)", logger->info("📋 Registered subscription: '{}' → '{}' ({}, interval={}ms)",
instanceId, pattern, isLowFreq ? "low-freq" : "high-freq", batchInterval); instanceId, pattern, isLowFreq ? "low-freq" : "high-freq", batchInterval);
} catch (const std::exception& e) { } catch (const std::exception& e) {
logger->error("❌ Failed to register subscription '{}' for '{}': {}", logger->error("❌ Failed to register subscription '{}' for '{}': {}",
pattern, instanceId, e.what()); pattern, instanceId, e.what());
throw; throw;
} }
} }
void IntraIOManager::unregisterSubscription(const std::string& instanceId, const std::string& pattern) { void IntraIOManager::unregisterSubscription(const std::string& instanceId, const std::string& pattern) {
// DEADLOCK FIX: Use scoped_lock for consistent lock ordering // DEADLOCK FIX: Use scoped_lock for consistent lock ordering
std::scoped_lock lock(managerMutex, batchMutex); std::scoped_lock lock(managerMutex, batchMutex);
// Remove from TopicTree // Remove from TopicTree
topicTree.unregisterSubscriber(pattern, instanceId); topicTree.unregisterSubscriber(pattern, instanceId);
// Remove from tracking // Remove from tracking
auto& patterns = instancePatterns[instanceId]; auto& patterns = instancePatterns[instanceId];
patterns.erase(std::remove(patterns.begin(), patterns.end(), pattern), patterns.end()); patterns.erase(std::remove(patterns.begin(), patterns.end(), pattern), patterns.end());
subscriptionInfoMap.erase(pattern); subscriptionInfoMap.erase(pattern);
// Remove batch buffer if exists // Remove batch buffer if exists
// NOTE: batchMutex already held via scoped_lock // NOTE: batchMutex already held via scoped_lock
batchBuffers.erase(pattern); batchBuffers.erase(pattern);
logger->info("🗑️ Unregistered subscription: '{}' → '{}'", instanceId, pattern); logger->info("🗑️ Unregistered subscription: '{}' → '{}'", instanceId, pattern);
} }
void IntraIOManager::clearAllRoutes() { void IntraIOManager::clearAllRoutes() {
// DEADLOCK FIX: Use scoped_lock for consistent lock ordering // DEADLOCK FIX: Use scoped_lock for consistent lock ordering
std::scoped_lock lock(managerMutex, batchMutex); std::scoped_lock lock(managerMutex, batchMutex);
auto clearedCount = topicTree.subscriberCount(); auto clearedCount = topicTree.subscriberCount();
topicTree.clear(); topicTree.clear();
instancePatterns.clear(); instancePatterns.clear();
subscriptionInfoMap.clear(); subscriptionInfoMap.clear();
// NOTE: batchMutex already held via scoped_lock // NOTE: batchMutex already held via scoped_lock
batchBuffers.clear(); batchBuffers.clear();
logger->info("🧹 Cleared {} routing entries", clearedCount); logger->info("🧹 Cleared {} routing entries", clearedCount);
} }
size_t IntraIOManager::getInstanceCount() const { size_t IntraIOManager::getInstanceCount() const {
std::shared_lock lock(managerMutex); // READ - concurrent access allowed! std::shared_lock lock(managerMutex); // READ - concurrent access allowed!
return instances.size(); return instances.size();
} }
std::vector<std::string> IntraIOManager::getInstanceIds() const { std::vector<std::string> IntraIOManager::getInstanceIds() const {
std::shared_lock lock(managerMutex); // READ - concurrent access allowed! std::shared_lock lock(managerMutex); // READ - concurrent access allowed!
std::vector<std::string> ids; std::vector<std::string> ids;
for (const auto& pair : instances) { for (const auto& pair : instances) {
ids.push_back(pair.first); ids.push_back(pair.first);
} }
return ids; return ids;
} }
json IntraIOManager::getRoutingStats() const { json IntraIOManager::getRoutingStats() const {
std::shared_lock lock(managerMutex); // READ - concurrent access allowed! std::shared_lock lock(managerMutex); // READ - concurrent access allowed!
json stats; json stats;
stats["total_routed_messages"] = totalRoutedMessages.load(); stats["total_routed_messages"] = totalRoutedMessages.load();
stats["total_routes"] = totalRoutes.load(); stats["total_routes"] = totalRoutes.load();
stats["active_instances"] = instances.size(); stats["active_instances"] = instances.size();
stats["routing_entries"] = topicTree.subscriberCount(); stats["routing_entries"] = topicTree.subscriberCount();
// Instance details // Instance details
json instanceDetails = json::object(); json instanceDetails = json::object();
for (const auto& pair : instances) { for (const auto& pair : instances) {
instanceDetails[pair.first] = { instanceDetails[pair.first] = {
{"active", true}, {"active", true},
{"type", "IntraIO"} {"type", "IntraIO"}
}; };
} }
stats["instances"] = instanceDetails; stats["instances"] = instanceDetails;
return stats; return stats;
} }
void IntraIOManager::setLogLevel(spdlog::level::level_enum level) { void IntraIOManager::setLogLevel(spdlog::level::level_enum level) {
logger->set_level(level); logger->set_level(level);
logger->info("📝 Log level set to: {}", spdlog::level::to_string_view(level)); logger->info("📝 Log level set to: {}", spdlog::level::to_string_view(level));
} }
// Batch flush loop - runs in separate thread // Batch flush loop - runs in separate thread
// DEADLOCK FIX: Collect buffers to flush under batchMutex, then flush under managerMutex // DEADLOCK FIX: Collect buffers to flush under batchMutex, then flush under managerMutex
void IntraIOManager::batchFlushLoop() { void IntraIOManager::batchFlushLoop() {
logger->info("🔄 Batch flush loop started"); logger->info("🔄 Batch flush loop started");
while (batchThreadRunning) { while (batchThreadRunning) {
std::this_thread::sleep_for(std::chrono::milliseconds(100)); std::this_thread::sleep_for(std::chrono::milliseconds(100));
// Collect buffers that need flushing (under batchMutex only) // Collect buffers that need flushing (under batchMutex only)
std::vector<BatchBuffer> buffersToFlush; std::vector<BatchBuffer> buffersToFlush;
{ {
std::lock_guard<std::mutex> batchLock(batchMutex); std::lock_guard<std::mutex> batchLock(batchMutex);
auto now = std::chrono::steady_clock::now(); auto now = std::chrono::steady_clock::now();
logger->trace("🔄 Batch flush check: {} buffers", batchBuffers.size()); logger->trace("🔄 Batch flush check: {} buffers", batchBuffers.size());
for (auto& [pattern, buffer] : batchBuffers) { for (auto& [pattern, buffer] : batchBuffers) {
if (buffer.messages.empty()) { if (buffer.messages.empty()) {
continue; continue;
} }
auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>( auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
now - buffer.lastFlush).count(); now - buffer.lastFlush).count();
logger->debug("🔄 Pattern '{}': {} messages, elapsed={}ms, interval={}ms", logger->debug("🔄 Pattern '{}': {} messages, elapsed={}ms, interval={}ms",
pattern, buffer.messages.size(), elapsed, buffer.batchInterval); pattern, buffer.messages.size(), elapsed, buffer.batchInterval);
if (elapsed >= buffer.batchInterval) { if (elapsed >= buffer.batchInterval) {
logger->info("📦 Triggering flush for pattern '{}' ({} messages)", pattern, buffer.messages.size()); logger->info("📦 Triggering flush for pattern '{}' ({} messages)", pattern, buffer.messages.size());
// Copy buffer for flush, clear original // Copy buffer for flush, clear original
buffersToFlush.push_back(buffer); buffersToFlush.push_back(buffer);
buffer.messages.clear(); buffer.messages.clear();
buffer.lastFlush = now; buffer.lastFlush = now;
} }
} }
} }
// batchMutex released here // batchMutex released here
// Now flush each buffer (under managerMutex only) - NO DEADLOCK // Now flush each buffer (under managerMutex only) - NO DEADLOCK
for (auto& buffer : buffersToFlush) { for (auto& buffer : buffersToFlush) {
flushBatchBufferSafe(buffer); flushBatchBufferSafe(buffer);
} }
} }
logger->info("🔄 Batch flush loop stopped"); logger->info("🔄 Batch flush loop stopped");
} }
void IntraIOManager::flushBatchBuffer(BatchBuffer& buffer) { void IntraIOManager::flushBatchBuffer(BatchBuffer& buffer) {
// DEPRECATED: Use flushBatchBufferSafe instead to avoid deadlocks // DEPRECATED: Use flushBatchBufferSafe instead to avoid deadlocks
flushBatchBufferSafe(buffer); flushBatchBufferSafe(buffer);
} }
// Safe version that only takes managerMutex (called after releasing batchMutex) // Safe version that only takes managerMutex (called after releasing batchMutex)
void IntraIOManager::flushBatchBufferSafe(BatchBuffer& buffer) { void IntraIOManager::flushBatchBufferSafe(BatchBuffer& buffer) {
if (buffer.messages.empty()) { if (buffer.messages.empty()) {
return; return;
} }
std::unique_lock lock(managerMutex); // WRITE - exclusive access needed std::unique_lock lock(managerMutex); // WRITE - exclusive access needed
auto targetInstance = instances.find(buffer.instanceId); auto targetInstance = instances.find(buffer.instanceId);
if (targetInstance == instances.end()) { if (targetInstance == instances.end()) {
logger->warn("⚠️ Cannot flush batch for '{}': instance not found", buffer.instanceId); logger->warn("⚠️ Cannot flush batch for '{}': instance not found", buffer.instanceId);
buffer.messages.clear(); buffer.messages.clear();
return; return;
} }
size_t batchSize = buffer.messages.size(); size_t batchSize = buffer.messages.size();
logger->info("📦 Flushing batch for '{}': {} messages (pattern: {})", logger->info("📦 Flushing batch for '{}': {} messages (pattern: {})",
buffer.instanceId, batchSize, buffer.pattern); buffer.instanceId, batchSize, buffer.pattern);
// Create a single batch message containing all messages as an array // Create a single batch message containing all messages as an array
json batchArray = json::array(); json batchArray = json::array();
std::string firstTopic; std::string firstTopic;
for (const auto& [topic, messageData] : buffer.messages) { for (const auto& [topic, messageData] : buffer.messages) {
if (firstTopic.empty()) { if (firstTopic.empty()) {
firstTopic = topic; firstTopic = topic;
} }
batchArray.push_back({ batchArray.push_back({
{"topic", topic}, {"topic", topic},
{"data", messageData} {"data", messageData}
}); });
} }
// Deliver ONE batch message containing the array // Deliver ONE batch message containing the array
auto batchDataNode = std::make_unique<JsonDataNode>("batch", batchArray); auto batchDataNode = std::make_unique<JsonDataNode>("batch", batchArray);
targetInstance->second->deliverMessage(firstTopic, std::move(batchDataNode), true); targetInstance->second->deliverMessage(firstTopic, std::move(batchDataNode), true);
logger->info("✅ Batch delivered to '{}' successfully", buffer.instanceId); logger->info("✅ Batch delivered to '{}' successfully", buffer.instanceId);
buffer.messages.clear(); buffer.messages.clear();
} }
// Singleton implementation // Singleton implementation
IntraIOManager& IntraIOManager::getInstance() { IntraIOManager& IntraIOManager::getInstance() {
static IntraIOManager instance; static IntraIOManager instance;
return instance; return instance;
} }
} // namespace grove } // namespace grove

View File

@ -1,127 +1,134 @@
#include "AutoCompiler.h" #include "AutoCompiler.h"
#include <fstream> #include <fstream>
#include <sstream> #include <sstream>
#include <regex> #include <regex>
#include <filesystem> #include <filesystem>
#include <chrono> #include <chrono>
#include <iostream> #include <iostream>
#include <cstdlib> #include <cstdlib>
#ifndef _WIN32 #ifndef _WIN32
#include <sys/wait.h> #include <sys/wait.h>
#endif #endif
namespace TestHelpers { namespace TestHelpers {
AutoCompiler::AutoCompiler(const std::string& moduleName, AutoCompiler::AutoCompiler(const std::string& moduleName,
const std::string& buildDir, const std::string& buildDir,
const std::string& sourcePath) const std::string& sourcePath)
: moduleName_(moduleName) : moduleName_(moduleName)
, buildDir_(buildDir) , buildDir_(buildDir)
, sourcePath_(sourcePath) , sourcePath_(sourcePath)
{ {
} }
AutoCompiler::~AutoCompiler() { AutoCompiler::~AutoCompiler() {
stop(); stop();
} }
void AutoCompiler::start(int iterations, int intervalMs) { void AutoCompiler::start(int iterations, int intervalMs) {
if (running_.load()) { if (running_.load()) {
return; // Already running return; // Already running
} }
running_ = true; running_ = true;
compilationThread_ = std::thread(&AutoCompiler::compilationLoop, this, iterations, intervalMs); compilationThread_ = std::thread(&AutoCompiler::compilationLoop, this, iterations, intervalMs);
} }
void AutoCompiler::stop() { void AutoCompiler::stop() {
running_ = false; running_ = false;
if (compilationThread_.joinable()) { if (compilationThread_.joinable()) {
compilationThread_.join(); compilationThread_.join();
} }
} }
void AutoCompiler::waitForCompletion() { void AutoCompiler::waitForCompletion() {
if (compilationThread_.joinable()) { if (compilationThread_.joinable()) {
compilationThread_.join(); compilationThread_.join();
} }
} }
void AutoCompiler::modifySourceVersion(int iteration) { void AutoCompiler::modifySourceVersion(int iteration) {
// Read entire file // Read entire file
std::ifstream inFile(sourcePath_); std::ifstream inFile(sourcePath_);
if (!inFile.is_open()) { if (!inFile.is_open()) {
std::cerr << "[AutoCompiler] Failed to open source file: " << sourcePath_ << std::endl; std::cerr << "[AutoCompiler] Failed to open source file: " << sourcePath_ << std::endl;
return; return;
} }
std::stringstream buffer; std::stringstream buffer;
buffer << inFile.rdbuf(); buffer << inFile.rdbuf();
inFile.close(); inFile.close();
std::string content = buffer.str(); std::string content = buffer.str();
// Replace version string: moduleVersion = "vX" → moduleVersion = "vITERATION" // Replace version string: moduleVersion = "vX" → moduleVersion = "vITERATION"
std::regex versionRegex(R"(std::string\s+moduleVersion\s*=\s*"v\d+")"); std::regex versionRegex(R"(std::string\s+moduleVersion\s*=\s*"v\d+")");
std::string newVersion = "std::string moduleVersion = \"v" + std::to_string(iteration) + "\""; std::string newVersion = "std::string moduleVersion = \"v" + std::to_string(iteration) + "\"";
content = std::regex_replace(content, versionRegex, newVersion); content = std::regex_replace(content, versionRegex, newVersion);
// Write back to file // Write back to file
std::ofstream outFile(sourcePath_); std::ofstream outFile(sourcePath_);
if (!outFile.is_open()) { if (!outFile.is_open()) {
std::cerr << "[AutoCompiler] Failed to write source file: " << sourcePath_ << std::endl; std::cerr << "[AutoCompiler] Failed to write source file: " << sourcePath_ << std::endl;
return; return;
} }
outFile << content; outFile << content;
outFile.close(); outFile.close();
} }
bool AutoCompiler::compile(int iteration) { bool AutoCompiler::compile(int iteration) {
// Modify source version before compiling // Modify source version before compiling
modifySourceVersion(iteration); modifySourceVersion(iteration);
// Small delay to ensure file is written // Small delay to ensure file is written
std::this_thread::sleep_for(std::chrono::milliseconds(10)); std::this_thread::sleep_for(std::chrono::milliseconds(10));
// Build the module using make // Build the module using make
// Note: Tests run from build/tests/, so we use make -C .. to build from build directory #ifdef _WIN32
std::string command; std::string makeCmd = "mingw32-make";
if (buildDir_ == "build") { std::string nullDev = "NUL";
command = "make -C .. " + moduleName_ + " > /dev/null 2>&1"; #else
} else { std::string makeCmd = "make";
command = "make -C " + buildDir_ + " " + moduleName_ + " > /dev/null 2>&1"; std::string nullDev = "/dev/null";
} #endif
int result = std::system(command.c_str()); // Note: Tests run from build/tests/, so we use make -C .. to build from build directory
std::string command;
// std::system returns exit status in platform-specific format if (buildDir_ == "build") {
// WEXITSTATUS is the correct way to extract it on POSIX systems command = makeCmd + " -C .. " + moduleName_ + " > " + nullDev + " 2>&1";
#ifdef _WIN32 } else {
return (result == 0); command = makeCmd + " -C " + buildDir_ + " " + moduleName_ + " > " + nullDev + " 2>&1";
#else }
return (WEXITSTATUS(result) == 0); int result = std::system(command.c_str());
#endif
} // std::system returns exit status in platform-specific format
// WEXITSTATUS is the correct way to extract it on POSIX systems
void AutoCompiler::compilationLoop(int iterations, int intervalMs) { #ifdef _WIN32
for (int i = 1; i <= iterations && running_.load(); ++i) { return (result == 0);
currentIteration_ = i; #else
return (WEXITSTATUS(result) == 0);
// Compile #endif
bool success = compile(i); }
if (success) {
successCount_++; void AutoCompiler::compilationLoop(int iterations, int intervalMs) {
} else { for (int i = 1; i <= iterations && running_.load(); ++i) {
failureCount_++; currentIteration_ = i;
}
// Compile
// Wait for next iteration bool success = compile(i);
if (i < iterations) { if (success) {
std::this_thread::sleep_for(std::chrono::milliseconds(intervalMs)); successCount_++;
} } else {
} failureCount_++;
}
running_ = false;
} // Wait for next iteration
if (i < iterations) {
} // namespace TestHelpers std::this_thread::sleep_for(std::chrono::milliseconds(intervalMs));
}
}
running_ = false;
}
} // namespace TestHelpers

View File

@ -125,7 +125,13 @@ int main() {
// Recompiler // Recompiler
std::cout << " 2. Recompiling module...\n"; std::cout << " 2. Recompiling module...\n";
// Note: This test runs from build/tests/, so we use make -C .. to build from build directory // Note: This test runs from build/tests/, so we use make -C .. to build from build directory
int buildResult = system("make -C .. TankModule 2>&1 > /dev/null"); int buildResult = system(
#ifdef _WIN32
"mingw32-make -C .. TankModule 2>&1 > NUL"
#else
"make -C .. TankModule 2>&1 > /dev/null"
#endif
);
if (buildResult != 0) { if (buildResult != 0) {
std::cerr << "❌ Compilation failed!\n"; std::cerr << "❌ Compilation failed!\n";
return 1; return 1;
@ -259,7 +265,13 @@ int main() {
outputRestore.close(); outputRestore.close();
// Rebuild to restore original version (test runs from build/tests/) // Rebuild to restore original version (test runs from build/tests/)
system("make -C .. TankModule 2>&1 > /dev/null"); system(
#ifdef _WIN32
"mingw32-make -C .. TankModule 2>&1 > NUL"
#else
"make -C .. TankModule 2>&1 > /dev/null"
#endif
);
// === RAPPORTS === // === RAPPORTS ===
std::cout << "\n"; std::cout << "\n";

View File

@ -32,7 +32,8 @@ public:
private: private:
std::vector<Tank> tanks; std::vector<Tank> tanks;
int frameCount = 0; int frameCount = 0;
std::string moduleVersion = "v2.0 HOT-RELOADED";:shared_ptr<spdlog::logger> logger; std::string moduleVersion = "v2.0 HOT-RELOADED"; // Module logging
std::shared_ptr<spdlog::logger> logger;
std::unique_ptr<IDataNode> config; std::unique_ptr<IDataNode> config;
void updateTank(Tank& tank, float dt); void updateTank(Tank& tank, float dt);