GroveEngine/include/grove/IntraIO.h
StillHammer 1b7703f07b feat(IIO)!: BREAKING CHANGE - Callback-based message dispatch
## Breaking Change

IIO API redesigned from manual pull+if-forest to callback dispatch.
All modules must update their subscribe() calls to pass handlers.

### Before (OLD API)
```cpp
io->subscribe("input:mouse");

void process(...) {
    while (io->hasMessages()) {
        auto msg = io->pullMessage();
        if (msg.topic == "input:mouse") {
            handleMouse(msg);
        } else if (msg.topic == "input:keyboard") {
            handleKeyboard(msg);
        }
    }
}
```

### After (NEW API)
```cpp
io->subscribe("input:mouse", [this](const Message& msg) {
    handleMouse(msg);
});

void process(...) {
    while (io->hasMessages()) {
        io->pullAndDispatch();  // Callbacks invoked automatically
    }
}
```

## Changes

**Core API (include/grove/IIO.h)**
- Added: `using MessageHandler = std::function<void(const Message&)>`
- Changed: `subscribe()` now requires `MessageHandler` callback parameter
- Changed: `subscribeLowFreq()` now requires `MessageHandler` callback
- Removed: `pullMessage()`
- Added: `pullAndDispatch()` - pulls and auto-dispatches to handlers

**Implementation (src/IntraIO.cpp)**
- Store callbacks in `Subscription.handler`
- `pullAndDispatch()` matches topic against ALL subscriptions (not just first)
- Fixed: Regex pattern compilation supports both wildcards (*) and regex (.*)
- Performance: ~1000 msg/s throughput (unchanged from before)
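
The dispatch behavior described above can be sketched as follows. This is a minimal illustration, not the code in `src/IntraIO.cpp`: `SketchMessage`, `SketchHandler`, and `SketchBus` are hypothetical stand-ins for the real types, patterns are given directly as regexes, and `pullAndDispatch()` returns the number of handlers it invoked (the real method returns `void`) so the behavior is observable.

```cpp
#include <cassert>
#include <functional>
#include <queue>
#include <regex>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for grove's Message; not the real definition.
struct SketchMessage {
    std::string topic;
    std::string payload;
};

using SketchHandler = std::function<void(const SketchMessage&)>;

class SketchBus {
public:
    // Register a handler for every topic that fully matches the given regex.
    void subscribe(const std::string& regexPattern, SketchHandler handler) {
        assert(handler && "subscribe() needs a callable handler");
        subscriptions_.push_back({std::regex(regexPattern), std::move(handler)});
    }

    void publish(SketchMessage msg) { queue_.push(std::move(msg)); }

    int hasMessages() const { return static_cast<int>(queue_.size()); }

    // Pop one message and invoke EVERY matching handler.
    // Returns how many handlers ran (0 if the queue was empty).
    int pullAndDispatch() {
        if (queue_.empty()) return 0;
        SketchMessage msg = std::move(queue_.front());
        queue_.pop();
        int invoked = 0;
        for (const auto& sub : subscriptions_) {
            if (std::regex_match(msg.topic, sub.pattern)) {
                sub.handler(msg);  // no early return: keep scanning
                ++invoked;
            }
        }
        return invoked;
    }

private:
    struct Subscription {
        std::regex pattern;
        SketchHandler handler;
    };
    std::vector<Subscription> subscriptions_;
    std::queue<SketchMessage> queue_;
};
```

With one subscription on `input:mouse` and another on `input:.*`, a single `input:mouse` message reaches both handlers, which is the case the early-return bug used to miss.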

**Files Updated**
- 31 test/module files migrated to callback API (via parallel agents)
- 8 documentation files updated (DEVELOPER_GUIDE, USER_GUIDE, module READMEs)

## Bugs Fixed During Migration

1. **pullAndDispatch() early return bug**: Was only calling FIRST matching handler
   - Fix: Loop through ALL subscriptions, invoke all matching handlers

2. **Regex pattern compilation bug**: Pattern "player:.*" failed to match
   - Fix: Detect ".*" in pattern → use as regex, otherwise escape and convert wildcards
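
A rough sketch of the pattern rule from fix 2, assuming the default ECMAScript grammar of `std::regex`. The function names `compileTopicPatternSketch` and `topicMatches` are hypothetical, and the real `compileTopicPattern()` may escape a different set of characters or handle edge cases this sketch ignores.

```cpp
#include <cassert>
#include <regex>
#include <string>

// Sketch only: patterns containing ".*" are used as regexes unchanged;
// anything else is escaped and each '*' wildcard becomes ".*".
std::regex compileTopicPatternSketch(const std::string& pattern) {
    assert(!pattern.empty() && "empty patterns are not handled in this sketch");

    // Already a regex: pass it through as written.
    if (pattern.find(".*") != std::string::npos) {
        return std::regex(pattern);
    }

    // Characters that carry meaning in an ECMAScript regex.
    static const std::string meta = R"re(\^$.|?+()[]{})re";

    std::string escaped;
    for (char c : pattern) {
        if (c == '*') {
            escaped += ".*";  // wildcard: match any run of characters
        } else {
            if (meta.find(c) != std::string::npos) escaped += '\\';
            escaped += c;
        }
    }
    return std::regex(escaped);
}

// A topic matches only if the whole topic string matches the pattern.
bool topicMatches(const std::string& topic, const std::regex& compiled) {
    return std::regex_match(topic, compiled);
}
```

Under this rule `player:*` and `player:.*` compile to the same expression, so both match `player:move`, while a literal pattern such as `a.b` no longer matches `axb` because the dot is escaped.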

## Testing

- test_11_io_system: PASSED (IIO pub/sub, pattern matching, batching)
- test_threaded_module_system: 6/6 PASSED
- test_threaded_stress: 5/5 PASSED (50 modules, 100x reload, concurrent ops)
- test_12_datanode: PASSED
- 10 TopicTree scenarios: 10/10 PASSED
- benchmark_e2e: ~1000 msg/s throughput

Total: 23+ tests passing

## Performance Impact

No performance regression from callback dispatch:
- IIO throughput: ~1000 msg/s (same as before)
- ThreadedModuleSystem: Speedup ~1.0x (barrier pattern expected)

## Migration Guide

For all modules using IIO:

1. Update subscribe() calls to include handler lambda
2. Replace pullMessage() loops with pullAndDispatch()
3. Move topic-specific logic from if-forest into callbacks

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-01-19 14:19:27 +07:00

#pragma once
#include <memory>
#include <string>
#include <queue>
#include <unordered_map>
#include <unordered_set>
#include <vector>
#include <regex>
#include <mutex>
#include <chrono>
#include <atomic>
#include <spdlog/spdlog.h>
#include <nlohmann/json.hpp>
#include "IIO.h"
using json = nlohmann::json;

namespace grove {

// Interface for message delivery to avoid circular include
class IIntraIODelivery {
public:
    virtual ~IIntraIODelivery() = default;
    virtual void deliverMessage(const std::string& topic, std::unique_ptr<IDataNode> message, bool isLowFreq) = 0;
    virtual const std::string& getInstanceId() const = 0;
};

/**
 * @brief Intra-process IO implementation with central routing
 *
 * IntraIO provides same-process pub/sub communication with zero network overhead.
 * Each module gets its own IntraIO instance, and messages are routed through
 * IntraIOManager for proper multi-module delivery.
 *
 * Features:
 * - Per-module isolation (one instance per module)
 * - Central routing via IntraIOManager
 * - Topic pattern matching with wildcards (e.g., "player:*", "economy:*")
 * - Low-frequency batching with configurable intervals
 * - Message replacement for reducible topics (latest-only semantics)
 * - Comprehensive health monitoring and metrics
 * - Thread-safe operations
 * - Pull-driven consumption: pullAndDispatch() invokes the subscription callbacks
 *
 * Performance characteristics:
 * - Publish: ~10-50ns (direct memory copy + routing)
 * - Subscribe: ~100-500ns (pattern registration)
 * - Pull: ~50-200ns (queue operations)
 * - Zero network serialization overhead
 */
class IntraIO : public IIO, public IIntraIODelivery {
private:
    std::shared_ptr<spdlog::logger> logger;
    mutable std::mutex operationMutex; // Thread safety for all operations

    // Instance identification for routing
    std::string instanceId;

    // Message storage
    std::queue<Message> messageQueue;
    std::queue<Message> lowFreqMessageQueue;

    // Subscription management
    struct Subscription {
        std::regex pattern;
        std::string originalPattern;
        MessageHandler handler; // Callback for this subscription
        SubscriptionConfig config;
        std::chrono::high_resolution_clock::time_point lastBatch;
        std::unordered_map<std::string, Message> batchedMessages; // For replaceable messages
        std::vector<Message> accumulatedMessages; // For non-replaceable messages

        // Default constructor
        Subscription() = default;

        // Move-only (Message contains unique_ptr, handler is copyable)
        Subscription(Subscription&&) = default;
        Subscription& operator=(Subscription&&) = default;
        Subscription(const Subscription&) = delete;
        Subscription& operator=(const Subscription&) = delete;
    };

    std::vector<Subscription> highFreqSubscriptions;
    std::vector<Subscription> lowFreqSubscriptions;

    // Health monitoring
    mutable std::atomic<size_t> totalPublished{0};
    mutable std::atomic<size_t> totalPulled{0};
    mutable std::atomic<size_t> totalDropped{0};
    mutable std::chrono::high_resolution_clock::time_point lastHealthCheck;
    mutable float averageProcessingRate = 0.0f;

    // Configuration
    static constexpr size_t DEFAULT_MAX_QUEUE_SIZE = 10000;
    size_t maxQueueSize = DEFAULT_MAX_QUEUE_SIZE;

    // Helper methods
    void logIOStart();
    bool matchesPattern(const std::string& topic, const std::regex& pattern) const;
    std::regex compileTopicPattern(const std::string& pattern) const;
    void processLowFreqSubscriptions();
    void flushBatchedMessages(Subscription& sub);
    void updateHealthMetrics() const;
    void enforceQueueLimits();
    void logPublish(const std::string& topic, const IDataNode& message) const;
    void logSubscription(const std::string& pattern, bool isLowFreq) const;
    void logPull(const Message& message) const;

public:
    IntraIO(const std::string& instanceId);
    virtual ~IntraIO();

    // IIO implementation
    void publish(const std::string& topic, std::unique_ptr<IDataNode> message) override;
    void subscribe(const std::string& topicPattern, MessageHandler handler, const SubscriptionConfig& config = {}) override;
    void subscribeLowFreq(const std::string& topicPattern, MessageHandler handler, const SubscriptionConfig& config = {}) override;
    int hasMessages() const override;
    void pullAndDispatch() override;
    IOHealth getHealth() const override;
    IOType getType() const override;

    // Configuration and management
    void setMaxQueueSize(size_t maxSize);
    size_t getMaxQueueSize() const;
    void clearAllMessages();
    void clearAllSubscriptions();

    // Debug and monitoring
    json getDetailedMetrics() const;
    void setLogLevel(spdlog::level::level_enum level);
    size_t getSubscriptionCount() const;
    std::vector<std::string> getActiveTopics() const;

    // Testing utilities
    void simulateHighLoad(int messageCount, const std::string& topicPrefix = "test");
    void forceProcessLowFreqBatches();

    // Manager interface (called by IntraIOManager)
    void deliverMessage(const std::string& topic, std::unique_ptr<IDataNode> message, bool isLowFreq) override;
    const std::string& getInstanceId() const override;
};

} // namespace grove
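
The class comment above mentions low-frequency batching with latest-only replacement. One way to picture that behavior is the sketch below. It is an assumption-laden illustration, not the IntraIO implementation: `LatestOnlyBatcher` is a hypothetical class, payloads are plain strings instead of `IDataNode`, the caller passes the current time explicitly so the behavior is deterministic, and `std::chrono::steady_clock` is used here where the header uses `high_resolution_clock`.

```cpp
#include <cassert>
#include <chrono>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

// Hypothetical sketch of interval batching where a newer message on the
// same topic replaces the older one until the next flush.
class LatestOnlyBatcher {
public:
    using Clock = std::chrono::steady_clock;
    using Batch = std::vector<std::pair<std::string, std::string>>;

    LatestOnlyBatcher(std::chrono::milliseconds interval, Clock::time_point start)
        : interval_(interval), lastFlush_(start) {
        assert(interval.count() > 0 && "batch interval must be positive");
    }

    // Keep only the newest payload per topic.
    void add(const std::string& topic, std::string payload) {
        pending_[topic] = std::move(payload);
    }

    // Once the interval has elapsed, hand back the pending (topic, payload)
    // pairs and start a new batch; before that, return an empty batch.
    Batch flushIfDue(Clock::time_point now) {
        Batch out;
        if (now - lastFlush_ < interval_) return out;
        for (auto& entry : pending_) {
            out.emplace_back(entry.first, std::move(entry.second));
        }
        pending_.clear();
        lastFlush_ = now;
        return out;
    }

private:
    std::chrono::milliseconds interval_;
    Clock::time_point lastFlush_;
    std::unordered_map<std::string, std::string> pending_;
};
```

In this sketch, two `economy:price` updates added within one interval produce a single entry at flush time, carrying the second payload.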