diff --git a/src/IntraIOManager.cpp b/src/IntraIOManager.cpp index a12a2b0..d0da86f 100644 --- a/src/IntraIOManager.cpp +++ b/src/IntraIOManager.cpp @@ -166,6 +166,7 @@ void IntraIOManager::routeMessage(const std::string& sourceId, const std::string if (it != subscriptionInfoMap.end() && patternMatches(pattern, topic)) { isLowFreq = it->second.isLowFreq; matchedPattern = pattern; + logger->debug(" 🔍 Pattern '{}' matched topic '{}' → isLowFreq={}", pattern, topic, isLowFreq); break; } } @@ -180,7 +181,8 @@ void IntraIOManager::routeMessage(const std::string& sourceId, const std::string buffer.messages.push_back({topic, messageData}); deliveredCount++; - logger->trace(" 📦 Buffered for '{}' (low-freq batch)", subscriberId); + logger->debug(" 📦 Buffered for '{}' (pattern: {}, buffer size: {})", + subscriberId, matchedPattern, buffer.messages.size()); } else { // High-freq: immediate delivery json dataCopy = messageData; @@ -327,6 +329,8 @@ void IntraIOManager::batchFlushLoop() { std::lock_guard batchLock(batchMutex); auto now = std::chrono::steady_clock::now(); + logger->trace("🔄 Batch flush check: {} buffers", batchBuffers.size()); + for (auto& [pattern, buffer] : batchBuffers) { if (buffer.messages.empty()) { continue; @@ -335,7 +339,11 @@ void IntraIOManager::batchFlushLoop() { auto elapsed = std::chrono::duration_cast( now - buffer.lastFlush).count(); + logger->debug("🔄 Pattern '{}': {} messages, elapsed={}ms, interval={}ms", + pattern, buffer.messages.size(), elapsed, buffer.batchInterval); + if (elapsed >= buffer.batchInterval) { + logger->info("📦 Triggering flush for pattern '{}' ({} messages)", pattern, buffer.messages.size()); flushBatchBuffer(buffer); buffer.lastFlush = now; } @@ -360,7 +368,8 @@ void IntraIOManager::flushBatchBuffer(BatchBuffer& buffer) { } size_t batchSize = buffer.messages.size(); - logger->debug("📦 Flushing batch for '{}': {} messages", buffer.instanceId, batchSize); + logger->info("📦 Flushing batch for '{}': {} messages (pattern: {})", + buffer.instanceId, batchSize, buffer.pattern); // Create a single batch message containing all messages as an array json batchArray = json::array(); @@ -380,6 +389,8 @@ void IntraIOManager::flushBatchBuffer(BatchBuffer& buffer) { auto batchDataNode = std::make_unique("batch", batchArray); targetInstance->second->deliverMessage(firstTopic, std::move(batchDataNode), true); + logger->info("✅ Batch delivered to '{}' successfully", buffer.instanceId); + buffer.messages.clear(); }