#pragma once #include #include #include #include #include #include #include #include #include #include #include #include #include "IIO.h" using json = nlohmann::json; namespace warfactory { /** * @brief Intra-process IO implementation for development and testing * * IntraIO provides same-process pub/sub communication with zero network overhead. * Perfect for development, debugging, and single-process deployments. * * Features: * - Direct function call communication (zero latency) * - Topic pattern matching with wildcards (e.g., "player:*", "economy:*") * - Low-frequency batching with configurable intervals * - Message replacement for reducible topics (latest-only semantics) * - Comprehensive health monitoring and metrics * - Thread-safe operations * - Pull-based message consumption * * Performance characteristics: * - Publish: ~10-50ns (direct memory copy) * - Subscribe: ~100-500ns (pattern compilation) * - Pull: ~50-200ns (queue operations) * - Zero network serialization overhead */ class IntraIO : public IIO { private: std::shared_ptr logger; mutable std::mutex operationMutex; // Thread safety for all operations // Message storage std::queue messageQueue; std::queue lowFreqMessageQueue; // Subscription management struct Subscription { std::regex pattern; std::string originalPattern; SubscriptionConfig config; std::chrono::high_resolution_clock::time_point lastBatch; std::unordered_map batchedMessages; // For replaceable messages std::vector accumulatedMessages; // For non-replaceable messages }; std::vector highFreqSubscriptions; std::vector lowFreqSubscriptions; // Health monitoring mutable std::atomic totalPublished{0}; mutable std::atomic totalPulled{0}; mutable std::atomic totalDropped{0}; mutable std::chrono::high_resolution_clock::time_point lastHealthCheck; mutable float averageProcessingRate = 0.0f; // Configuration static constexpr size_t DEFAULT_MAX_QUEUE_SIZE = 10000; size_t maxQueueSize = DEFAULT_MAX_QUEUE_SIZE; // Helper methods void logIOStart(); bool matchesPattern(const std::string& topic, const std::regex& pattern) const; std::regex compileTopicPattern(const std::string& pattern) const; void processLowFreqSubscriptions(); void flushBatchedMessages(Subscription& sub); void updateHealthMetrics() const; void enforceQueueLimits(); void logPublish(const std::string& topic, const json& message) const; void logSubscription(const std::string& pattern, bool isLowFreq) const; void logPull(const Message& message) const; public: IntraIO(); virtual ~IntraIO(); // IIO implementation void publish(const std::string& topic, const json& message) override; void subscribe(const std::string& topicPattern, const SubscriptionConfig& config = {}) override; void subscribeLowFreq(const std::string& topicPattern, const SubscriptionConfig& config = {}) override; int hasMessages() const override; Message pullMessage() override; IOHealth getHealth() const override; IOType getType() const override; // Configuration and management void setMaxQueueSize(size_t maxSize); size_t getMaxQueueSize() const; void clearAllMessages(); void clearAllSubscriptions(); // Debug and monitoring json getDetailedMetrics() const; void setLogLevel(spdlog::level::level_enum level); size_t getSubscriptionCount() const; std::vector getActiveTopics() const; // Testing utilities void simulateHighLoad(int messageCount, const std::string& topicPrefix = "test"); void forceProcessLowFreqBatches(); }; } // namespace warfactory