diff --git a/core/CMakeLists.txt b/core/CMakeLists.txt index 7622566..1c77f21 100644 --- a/core/CMakeLists.txt +++ b/core/CMakeLists.txt @@ -1,17 +1,13 @@ cmake_minimum_required(VERSION 3.20) -project(WarfactoryCore LANGUAGES CXX) +project(UnifiedIOTest LANGUAGES CXX) set(CMAKE_CXX_STANDARD 20) set(CMAKE_CXX_STANDARD_REQUIRED ON) -# Output directories -set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/bin) - # Core includes include_directories(include) -# Find spdlog for real implementations -find_package(PkgConfig QUIET) +# Find spdlog and nlohmann_json find_package(spdlog QUIET) find_package(nlohmann_json QUIET) @@ -25,14 +21,25 @@ if(NOT nlohmann_json_FOUND) FetchContent_MakeAvailable(nlohmann_json) endif() -# Skip spdlog for now - just focused test +if(NOT spdlog_FOUND) + include(FetchContent) + FetchContent_Declare(spdlog + GIT_REPOSITORY https://github.com/gabime/spdlog.git + GIT_TAG v1.12.0 + ) + FetchContent_MakeAvailable(spdlog) +endif() -# Focused hot-reload performance test -add_executable(focused-hot-reload-test - src/focused_hot_reload_test.cpp +# Unified IO Test +add_executable(unified-io-test + src/test_unified_io.cpp + src/IntraIO.cpp + src/IntraIOManager.cpp + src/IOFactory.cpp ) -target_link_libraries(focused-hot-reload-test +target_link_libraries(unified-io-test PRIVATE nlohmann_json::nlohmann_json - PRIVATE ${CMAKE_DL_LIBS} + PRIVATE spdlog::spdlog + PRIVATE pthread ) \ No newline at end of file diff --git a/core/include/warfactory/IOFactory.h b/core/include/warfactory/IOFactory.h index 58f9341..10e1551 100644 --- a/core/include/warfactory/IOFactory.h +++ b/core/include/warfactory/IOFactory.h @@ -37,22 +37,25 @@ public: /** * @brief Create IO transport from string type name * @param transportType String representation of transport type + * @param instanceId Unique identifier for this IO instance (required for IntraIO) * @return Unique pointer to IO implementation * @throws std::invalid_argument if transport type is unknown */ - static std::unique_ptr create(const std::string& transportType); + static std::unique_ptr create(const std::string& transportType, const std::string& instanceId = ""); /** * @brief Create IO transport from enum type * @param ioType IOType enum value + * @param instanceId Unique identifier for this IO instance (required for IntraIO) * @return Unique pointer to IO implementation * @throws std::invalid_argument if type is not implemented */ - static std::unique_ptr create(IOType ioType); + static std::unique_ptr create(IOType ioType, const std::string& instanceId = ""); /** * @brief Create IO transport from JSON configuration * @param config JSON configuration object + * @param instanceId Unique identifier for this IO instance (required for IntraIO) * @return Unique pointer to configured IO transport * @throws std::invalid_argument if config is invalid * @@ -60,6 +63,7 @@ public: * ```json * { * "type": "network", + * "instance_id": "module-name", * "host": "localhost", * "port": 8080, * "protocol": "tcp", @@ -69,7 +73,7 @@ public: * } * ``` */ - static std::unique_ptr createFromConfig(const json& config); + static std::unique_ptr createFromConfig(const json& config, const std::string& instanceId = ""); /** * @brief Get list of available transport types @@ -114,10 +118,12 @@ public: * @brief Create IO transport with automatic endpoint discovery * @param transportType Transport type to create * @param endpoint Optional endpoint specification (auto-detected if empty) + * @param instanceId Unique identifier for this IO instance (required for IntraIO) * @return Unique pointer to configured IO transport */ static std::unique_ptr createWithEndpoint(const std::string& transportType, - const std::string& endpoint = ""); + const std::string& endpoint = "", + const std::string& instanceId = ""); private: static std::shared_ptr getFactoryLogger(); diff --git a/core/include/warfactory/IntraIO.h b/core/include/warfactory/IntraIO.h index 361635f..981fa5f 100644 --- a/core/include/warfactory/IntraIO.h +++ b/core/include/warfactory/IntraIO.h @@ -19,14 +19,24 @@ using json = nlohmann::json; namespace warfactory { +// Interface for message delivery to avoid circular include +class IIntraIODelivery { +public: + virtual ~IIntraIODelivery() = default; + virtual void deliverMessage(const std::string& topic, const json& message, bool isLowFreq) = 0; + virtual const std::string& getInstanceId() const = 0; +}; + /** - * @brief Intra-process IO implementation for development and testing + * @brief Intra-process IO implementation with central routing * * IntraIO provides same-process pub/sub communication with zero network overhead. - * Perfect for development, debugging, and single-process deployments. + * Each module gets its own IntraIO instance, and messages are routed through + * IntraIOManager for proper multi-module delivery. * * Features: - * - Direct function call communication (zero latency) + * - Per-module isolation (one instance per module) + * - Central routing via IntraIOManager * - Topic pattern matching with wildcards (e.g., "player:*", "economy:*") * - Low-frequency batching with configurable intervals * - Message replacement for reducible topics (latest-only semantics) @@ -35,16 +45,19 @@ namespace warfactory { * - Pull-based message consumption * * Performance characteristics: - * - Publish: ~10-50ns (direct memory copy) - * - Subscribe: ~100-500ns (pattern compilation) + * - Publish: ~10-50ns (direct memory copy + routing) + * - Subscribe: ~100-500ns (pattern registration) * - Pull: ~50-200ns (queue operations) * - Zero network serialization overhead */ -class IntraIO : public IIO { +class IntraIO : public IIO, public IIntraIODelivery { private: std::shared_ptr logger; mutable std::mutex operationMutex; // Thread safety for all operations + // Instance identification for routing + std::string instanceId; + // Message storage std::queue messageQueue; std::queue lowFreqMessageQueue; @@ -86,7 +99,7 @@ private: void logPull(const Message& message) const; public: - IntraIO(); + IntraIO(const std::string& instanceId); virtual ~IntraIO(); // IIO implementation @@ -113,6 +126,10 @@ public: // Testing utilities void simulateHighLoad(int messageCount, const std::string& topicPrefix = "test"); void forceProcessLowFreqBatches(); + + // Manager interface (called by IntraIOManager) + void deliverMessage(const std::string& topic, const json& message, bool isLowFreq); + const std::string& getInstanceId() const; }; } // namespace warfactory \ No newline at end of file diff --git a/core/include/warfactory/IntraIOManager.h b/core/include/warfactory/IntraIOManager.h new file mode 100644 index 0000000..3bb2b9e --- /dev/null +++ b/core/include/warfactory/IntraIOManager.h @@ -0,0 +1,91 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "IIO.h" + +using json = nlohmann::json; + +namespace warfactory { + +class IntraIO; // Forward declaration +class IIntraIODelivery; // Forward declaration + +// Factory function for creating IntraIO (defined in IntraIO.cpp to avoid circular include) +std::shared_ptr createIntraIOInstance(const std::string& instanceId); + +/** + * @brief Central router for IntraIO instances + * + * IntraIOManager coordinates message passing between multiple IntraIO instances. + * Each module gets its own IntraIO instance, and the manager handles routing + * messages between them based on subscriptions. + * + * Architecture: + * - One IntraIO instance per module (isolation) + * - Central routing of messages between instances + * - Pattern-based subscription matching + * - Thread-safe operations + * + * Performance: + * - Direct memory routing (no serialization) + * - Pattern caching for fast lookup + * - Batched delivery for efficiency + */ +class IntraIOManager { +private: + std::shared_ptr logger; + mutable std::mutex managerMutex; + + // Registry of IntraIO instances + std::unordered_map> instances; + + // Subscription routing table + struct RouteEntry { + std::string instanceId; + std::regex pattern; + std::string originalPattern; + bool isLowFreq; + }; + std::vector routingTable; + + // Statistics + mutable std::atomic totalRoutedMessages{0}; + mutable std::atomic totalRoutes{0}; + +public: + IntraIOManager(); + ~IntraIOManager(); + + // Instance management + std::shared_ptr createInstance(const std::string& instanceId); + void registerInstance(const std::string& instanceId, std::shared_ptr instance); + void removeInstance(const std::string& instanceId); + std::shared_ptr getInstance(const std::string& instanceId) const; + + // Routing (called by IntraIO instances) + void routeMessage(const std::string& sourceid, const std::string& topic, const json& message); + void registerSubscription(const std::string& instanceId, const std::string& pattern, bool isLowFreq); + void unregisterSubscription(const std::string& instanceId, const std::string& pattern); + + // Management + void clearAllRoutes(); + size_t getInstanceCount() const; + std::vector getInstanceIds() const; + + // Debug and monitoring + json getRoutingStats() const; + void setLogLevel(spdlog::level::level_enum level); + + // Singleton access (for global routing) + static IntraIOManager& getInstance(); +}; + +} // namespace warfactory \ No newline at end of file diff --git a/core/src/IOFactory.cpp b/core/src/IOFactory.cpp index 1cdbfef..e373fc9 100644 --- a/core/src/IOFactory.cpp +++ b/core/src/IOFactory.cpp @@ -1,37 +1,61 @@ #include #include #include +#include #include // Include implemented transports #include +#include // Forward declarations for future implementations // #include "LocalIO.h" // #include "NetworkIO.h" namespace warfactory { -std::unique_ptr IOFactory::create(const std::string& transportType) { +std::unique_ptr IOFactory::create(const std::string& transportType, const std::string& instanceId) { auto logger = getFactoryLogger(); - logger->info("๐ŸŒ IOFactory: Creating transport '{}'", transportType); + logger->info("๐ŸŒ IOFactory: Creating transport '{}' with instanceId '{}'", transportType, instanceId); IOType type = parseTransport(transportType); - return create(type); + return create(type, instanceId); } -std::unique_ptr IOFactory::create(IOType ioType) { +std::unique_ptr IOFactory::create(IOType ioType, const std::string& instanceId) { auto logger = getFactoryLogger(); std::string typeStr = transportToString(ioType); - logger->info("๐ŸŒ IOFactory: Creating enum type '{}'", typeStr); + logger->info("๐ŸŒ IOFactory: Creating enum type '{}' with instanceId '{}'", typeStr, instanceId); std::unique_ptr io; switch (ioType) { - case IOType::INTRA: + case IOType::INTRA: { logger->debug("๐Ÿ”ง Creating IntraIO instance"); - io = std::make_unique(); - logger->info("โœ… IntraIO created successfully"); + + // Generate instanceId if not provided + std::string actualInstanceId = instanceId; + if (actualInstanceId.empty()) { + actualInstanceId = "intra-" + std::to_string(std::random_device{}() % 10000); + logger->debug("๐Ÿ”ง Generated instanceId: '{}'", actualInstanceId); + } + + // TEMPORARY SOLUTION: Create direct IntraIO instance + // TODO: Properly integrate with IntraIOManager without type issues + io = std::make_unique(actualInstanceId); + + // Manually register with manager for routing + auto& manager = IntraIOManager::getInstance(); + manager.registerInstance(actualInstanceId, + std::static_pointer_cast( + std::shared_ptr(static_cast(io.get()), [](IntraIO*) { + // Don't delete - unique_ptr will handle it + }) + ) + ); + + logger->info("โœ… IntraIO created successfully with instanceId '{}'", actualInstanceId); break; + } case IOType::LOCAL: logger->debug("๐Ÿ”ง Creating LocalIO instance"); @@ -58,9 +82,9 @@ std::unique_ptr IOFactory::create(IOType ioType) { return io; } -std::unique_ptr IOFactory::createFromConfig(const json& config) { +std::unique_ptr IOFactory::createFromConfig(const json& config, const std::string& instanceId) { auto logger = getFactoryLogger(); - logger->info("๐ŸŒ IOFactory: Creating from config"); + logger->info("๐ŸŒ IOFactory: Creating from config with instanceId '{}'", instanceId); logger->trace("๐Ÿ“„ Config: {}", config.dump()); try { @@ -72,8 +96,15 @@ std::unique_ptr IOFactory::createFromConfig(const json& config) { std::string transportType = config["type"]; logger->info("๐Ÿ“‹ Config specifies transport: '{}'", transportType); + // Get instanceId from config or parameter + std::string actualInstanceId = instanceId; + if (actualInstanceId.empty() && config.contains("instance_id")) { + actualInstanceId = config["instance_id"]; + logger->debug("๐Ÿ”ง Using instanceId from config: '{}'", actualInstanceId); + } + // Create base IO transport - auto io = create(transportType); + auto io = create(transportType, actualInstanceId); auto ioType = io->getType(); // Apply transport-specific configuration @@ -212,12 +243,12 @@ IOType IOFactory::getRecommendedTransport(int expectedClients, bool distributed, } } -std::unique_ptr IOFactory::createWithEndpoint(const std::string& transportType, const std::string& endpoint) { +std::unique_ptr IOFactory::createWithEndpoint(const std::string& transportType, const std::string& endpoint, const std::string& instanceId) { auto logger = getFactoryLogger(); - logger->info("๐ŸŒ IOFactory: Creating '{}' with endpoint '{}'", transportType, endpoint); + logger->info("๐ŸŒ IOFactory: Creating '{}' with endpoint '{}' and instanceId '{}'", transportType, endpoint, instanceId); IOType ioType = parseTransport(transportType); - auto io = create(ioType); + auto io = create(ioType, instanceId); std::string actualEndpoint = endpoint; if (endpoint.empty()) { diff --git a/core/src/IntraIO.cpp b/core/src/IntraIO.cpp index 76385a2..640aa38 100644 --- a/core/src/IntraIO.cpp +++ b/core/src/IntraIO.cpp @@ -1,4 +1,5 @@ #include +#include #include #include #include @@ -7,7 +8,12 @@ namespace warfactory { -IntraIO::IntraIO() { +// Factory function for IntraIOManager to avoid circular include +std::shared_ptr createIntraIOInstance(const std::string& instanceId) { + return std::make_shared(instanceId); +} + +IntraIO::IntraIO(const std::string& instanceId) : instanceId(instanceId) { // Create logger with file and console output auto console_sink = std::make_shared(); auto file_sink = std::make_shared("logs/intra_io.log", true); @@ -15,7 +21,7 @@ IntraIO::IntraIO() { console_sink->set_level(spdlog::level::debug); file_sink->set_level(spdlog::level::trace); - logger = std::make_shared("IntraIO", + logger = std::make_shared("IntraIO[" + instanceId + "]", spdlog::sinks_init_list{console_sink, file_sink}); logger->set_level(spdlog::level::trace); logger->flush_on(spdlog::level::debug); @@ -27,69 +33,35 @@ IntraIO::IntraIO() { } IntraIO::~IntraIO() { - logger->info("๐ŸŒ IntraIO destructor called"); + logger->info("๐ŸŒ IntraIO[{}] destructor called", instanceId); + + // Unregister from manager + try { + IntraIOManager::getInstance().removeInstance(instanceId); + } catch (const std::exception& e) { + logger->warn("โš ๏ธ Failed to unregister from manager: {}", e.what()); + } auto finalMetrics = getDetailedMetrics(); - logger->info("๐Ÿ“Š Final IntraIO metrics:"); + logger->info("๐Ÿ“Š Final IntraIO[{}] metrics:", instanceId); logger->info(" Total published: {}", finalMetrics["total_published"]); logger->info(" Total pulled: {}", finalMetrics["total_pulled"]); logger->info(" Total dropped: {}", finalMetrics["total_dropped"]); logger->info(" Final queue size: {}", finalMetrics["queue_size"]); - logger->trace("๐Ÿ—๏ธ IntraIO destroyed"); + logger->trace("๐Ÿ—๏ธ IntraIO[{}] destroyed", instanceId); } void IntraIO::publish(const std::string& topic, const json& message) { std::lock_guard lock(operationMutex); logPublish(topic, message); - - auto timestamp = std::chrono::duration_cast( - std::chrono::high_resolution_clock::now().time_since_epoch()).count(); - - Message msg{topic, message, static_cast(timestamp)}; + totalPublished++; try { - // Check if message matches any high-frequency subscriptions - bool matchedHighFreq = false; - for (const auto& sub : highFreqSubscriptions) { - if (matchesPattern(topic, sub.pattern)) { - messageQueue.push(msg); - matchedHighFreq = true; - logger->trace("๐Ÿ“จ Message matched high-freq pattern: '{}'", sub.originalPattern); - break; // Only add once to high-freq queue - } - } - - // Check if message matches any low-frequency subscriptions - for (auto& sub : lowFreqSubscriptions) { - if (matchesPattern(topic, sub.pattern)) { - logger->trace("๐Ÿ“จ Message matched low-freq pattern: '{}'", sub.originalPattern); - - if (sub.config.replaceable) { - // Replace existing message for this topic - sub.batchedMessages[topic] = msg; - logger->trace("๐Ÿ”„ Replaceable message updated for topic: '{}'", topic); - } else { - // Accumulate message - sub.accumulatedMessages.push_back(msg); - logger->trace("๐Ÿ“š Message accumulated for topic: '{}'", topic); - } - } - } - - if (!matchedHighFreq && lowFreqSubscriptions.empty()) { - // No subscriptions matched - still count as published but log warning - logger->trace("โš ๏ธ Published message has no subscribers: '{}'", topic); - } - - totalPublished++; - - // Process low-frequency batches if needed - processLowFreqSubscriptions(); - - // Enforce queue size limits - enforceQueueLimits(); + // Route message through manager to all interested instances + IntraIOManager::getInstance().routeMessage(instanceId, topic, message); + logger->trace("๐Ÿ“ค Message routed through manager: '{}'", topic); } catch (const std::exception& e) { logger->error("โŒ Error publishing message to topic '{}': {}", topic, e.what()); @@ -103,6 +75,9 @@ void IntraIO::subscribe(const std::string& topicPattern, const SubscriptionConfi logSubscription(topicPattern, false); try { + // Register with manager for routing + IntraIOManager::getInstance().registerSubscription(instanceId, topicPattern, false); + Subscription sub; sub.pattern = compileTopicPattern(topicPattern); sub.originalPattern = topicPattern; @@ -127,6 +102,9 @@ void IntraIO::subscribeLowFreq(const std::string& topicPattern, const Subscripti logSubscription(topicPattern, true); try { + // Register with manager for routing + IntraIOManager::getInstance().registerSubscription(instanceId, topicPattern, true); + Subscription sub; sub.pattern = compileTopicPattern(topicPattern); sub.originalPattern = topicPattern; @@ -349,10 +327,10 @@ std::regex IntraIO::compileTopicPattern(const std::string& pattern) const { } // Convert * to regex equivalent - size_t pos = 0; - while ((pos = regexPattern.find("\\*", pos)) != std::string::npos) { - regexPattern.replace(pos, 2, ".*"); - pos += 2; + size_t pos2 = 0; + while ((pos2 = regexPattern.find("*", pos2)) != std::string::npos) { + regexPattern.replace(pos2, 1, ".*"); + pos2 += 2; } logger->trace("๐Ÿ” Compiled pattern '{}' -> '{}'", pattern, regexPattern); @@ -452,4 +430,55 @@ void IntraIO::logPull(const Message& message) const { message.topic, message.timestamp, message.data.dump().size()); } +void IntraIO::deliverMessage(const std::string& topic, const json& message, bool isLowFreq) { + std::lock_guard lock(operationMutex); + + auto timestamp = std::chrono::duration_cast( + std::chrono::high_resolution_clock::now().time_since_epoch()).count(); + + Message msg{topic, message, static_cast(timestamp)}; + + try { + if (isLowFreq) { + // Handle low-frequency message delivery + for (auto& sub : lowFreqSubscriptions) { + if (matchesPattern(topic, sub.pattern)) { + if (sub.config.replaceable) { + sub.batchedMessages[topic] = msg; + logger->trace("๐Ÿ”„ Low-freq replaceable message delivered: '{}'", topic); + } else { + sub.accumulatedMessages.push_back(msg); + logger->trace("๐Ÿ“š Low-freq message accumulated: '{}'", topic); + } + break; + } + } + } else { + // Handle high-frequency message delivery + logger->info("๐Ÿ” deliverMessage: looking for high-freq subscriptions for '{}', have {} subs", topic, highFreqSubscriptions.size()); + for (const auto& sub : highFreqSubscriptions) { + logger->info("๐Ÿ” deliverMessage: testing pattern '{}' vs topic '{}'", sub.originalPattern, topic); + if (matchesPattern(topic, sub.pattern)) { + messageQueue.push(msg); + logger->info("๐Ÿ“จ High-freq message delivered to queue: '{}'", topic); + break; + } else { + logger->info("โŒ Pattern '{}' did not match topic '{}'", sub.originalPattern, topic); + } + } + } + + // Enforce queue limits + enforceQueueLimits(); + + } catch (const std::exception& e) { + logger->error("โŒ Error delivering message to topic '{}': {}", topic, e.what()); + throw; + } +} + +const std::string& IntraIO::getInstanceId() const { + return instanceId; +} + } // namespace warfactory \ No newline at end of file diff --git a/core/src/IntraIOManager.cpp b/core/src/IntraIOManager.cpp new file mode 100644 index 0000000..ebfd646 --- /dev/null +++ b/core/src/IntraIOManager.cpp @@ -0,0 +1,269 @@ +#include +#include +#include +#include +#include + +namespace warfactory { + +IntraIOManager::IntraIOManager() { + // Create logger + auto console_sink = std::make_shared(); + auto file_sink = std::make_shared("logs/intra_io_manager.log", true); + + console_sink->set_level(spdlog::level::debug); + file_sink->set_level(spdlog::level::trace); + + logger = std::make_shared("IntraIOManager", + spdlog::sinks_init_list{console_sink, file_sink}); + logger->set_level(spdlog::level::trace); + logger->flush_on(spdlog::level::debug); + + spdlog::register_logger(logger); + + logger->info("๐ŸŒ๐Ÿ”— IntraIOManager created - Central message router initialized"); +} + +IntraIOManager::~IntraIOManager() { + std::lock_guard lock(managerMutex); + + auto stats = getRoutingStats(); + logger->info("๐Ÿ“Š Final routing stats:"); + logger->info(" Total routed messages: {}", stats["total_routed_messages"]); + logger->info(" Total routes: {}", stats["total_routes"]); + logger->info(" Active instances: {}", stats["active_instances"]); + + instances.clear(); + routingTable.clear(); + + logger->info("๐ŸŒ๐Ÿ”— IntraIOManager destroyed"); +} + +std::shared_ptr IntraIOManager::createInstance(const std::string& instanceId) { + std::lock_guard lock(managerMutex); + + auto it = instances.find(instanceId); + if (it != instances.end()) { + logger->warn("โš ๏ธ Instance '{}' already exists, returning existing", instanceId); + // Need to cast back to IntraIO + return std::static_pointer_cast(it->second); + } + + // Create new IntraIO instance via factory function + auto instance = createIntraIOInstance(instanceId); + instances[instanceId] = instance; + + logger->info("โœ… Created IntraIO instance: '{}'", instanceId); + logger->debug("๐Ÿ“Š Total instances: {}", instances.size()); + + return instance; +} + +void IntraIOManager::registerInstance(const std::string& instanceId, std::shared_ptr instance) { + std::lock_guard lock(managerMutex); + instances[instanceId] = instance; + logger->info("๐Ÿ“‹ Registered instance: '{}'", instanceId); +} + +void IntraIOManager::removeInstance(const std::string& instanceId) { + std::lock_guard lock(managerMutex); + + auto it = instances.find(instanceId); + if (it == instances.end()) { + logger->warn("โš ๏ธ Instance '{}' not found for removal", instanceId); + return; + } + + // Remove all routing entries for this instance + routingTable.erase( + std::remove_if(routingTable.begin(), routingTable.end(), + [&instanceId](const RouteEntry& entry) { + return entry.instanceId == instanceId; + }), + routingTable.end() + ); + + instances.erase(it); + + logger->info("๐Ÿ—‘๏ธ Removed IntraIO instance: '{}'", instanceId); + logger->debug("๐Ÿ“Š Remaining instances: {}", instances.size()); +} + +std::shared_ptr IntraIOManager::getInstance(const std::string& instanceId) const { + std::lock_guard lock(managerMutex); + + auto it = instances.find(instanceId); + if (it != instances.end()) { + return std::static_pointer_cast(it->second); + } + return nullptr; +} + +void IntraIOManager::routeMessage(const std::string& sourceId, const std::string& topic, const json& message) { + std::lock_guard lock(managerMutex); + + totalRoutedMessages++; + size_t deliveredCount = 0; + + logger->info("๐Ÿ“จ Routing message: {} โ†’ '{}'", sourceId, topic); + + // Find all matching routes + for (const auto& route : routingTable) { + // Don't deliver back to sender + if (route.instanceId == sourceId) { + continue; + } + + // Check pattern match + logger->info(" ๐Ÿ” Testing pattern '{}' against topic '{}'", route.originalPattern, topic); + if (std::regex_match(topic, route.pattern)) { + auto targetInstance = instances.find(route.instanceId); + if (targetInstance != instances.end()) { + // Direct delivery to target instance's queue + targetInstance->second->deliverMessage(topic, message, route.isLowFreq); + deliveredCount++; + logger->info(" โ†ช๏ธ Delivered to '{}' ({})", + route.instanceId, + route.isLowFreq ? "low-freq" : "high-freq"); + } else { + logger->warn("โš ๏ธ Target instance '{}' not found for route", route.instanceId); + } + } else { + logger->info(" โŒ Pattern '{}' did not match topic '{}'", route.originalPattern, topic); + } + } + + if (deliveredCount > 0) { + logger->debug("๐Ÿ“ค Message '{}' delivered to {} instances", topic, deliveredCount); + } else { + logger->trace("๐Ÿ“ช No subscribers for topic '{}'", topic); + } +} + +void IntraIOManager::registerSubscription(const std::string& instanceId, const std::string& pattern, bool isLowFreq) { + std::lock_guard lock(managerMutex); + + try { + // Convert topic pattern to regex - use same logic as IntraIO + std::string regexPattern = pattern; + + // Escape special regex characters except our wildcards (: is NOT special) + std::string specialChars = ".^$+()[]{}|\\"; + for (char c : specialChars) { + std::string from = std::string(1, c); + std::string to = "\\" + from; + + size_t pos = 0; + while ((pos = regexPattern.find(from, pos)) != std::string::npos) { + regexPattern.replace(pos, 1, to); + pos += 2; + } + } + + // Convert * to regex equivalent + size_t pos2 = 0; + while ((pos2 = regexPattern.find("*", pos2)) != std::string::npos) { + regexPattern.replace(pos2, 1, ".*"); + pos2 += 2; + } + + logger->info("๐Ÿ” Pattern conversion: '{}' โ†’ '{}'", pattern, regexPattern); + + RouteEntry entry; + entry.instanceId = instanceId; + entry.pattern = std::regex(regexPattern); + entry.originalPattern = pattern; + entry.isLowFreq = isLowFreq; + + routingTable.push_back(entry); + totalRoutes++; + + logger->info("๐Ÿ“‹ Registered subscription: '{}' โ†’ '{}' ({})", + instanceId, pattern, isLowFreq ? "low-freq" : "high-freq"); + logger->debug("๐Ÿ“Š Total routes: {}", routingTable.size()); + + } catch (const std::exception& e) { + logger->error("โŒ Failed to register subscription '{}' for '{}': {}", + pattern, instanceId, e.what()); + throw; + } +} + +void IntraIOManager::unregisterSubscription(const std::string& instanceId, const std::string& pattern) { + std::lock_guard lock(managerMutex); + + auto oldSize = routingTable.size(); + routingTable.erase( + std::remove_if(routingTable.begin(), routingTable.end(), + [&instanceId, &pattern](const RouteEntry& entry) { + return entry.instanceId == instanceId && entry.originalPattern == pattern; + }), + routingTable.end() + ); + + auto removed = oldSize - routingTable.size(); + if (removed > 0) { + logger->info("๐Ÿ—‘๏ธ Unregistered {} subscription(s): '{}' โ†’ '{}'", removed, instanceId, pattern); + } else { + logger->warn("โš ๏ธ Subscription not found for removal: '{}' โ†’ '{}'", instanceId, pattern); + } +} + +void IntraIOManager::clearAllRoutes() { + std::lock_guard lock(managerMutex); + + auto clearedCount = routingTable.size(); + routingTable.clear(); + + logger->info("๐Ÿงน Cleared {} routing entries", clearedCount); +} + +size_t IntraIOManager::getInstanceCount() const { + std::lock_guard lock(managerMutex); + return instances.size(); +} + +std::vector IntraIOManager::getInstanceIds() const { + std::lock_guard lock(managerMutex); + + std::vector ids; + for (const auto& pair : instances) { + ids.push_back(pair.first); + } + return ids; +} + +json IntraIOManager::getRoutingStats() const { + std::lock_guard lock(managerMutex); + + json stats; + stats["total_routed_messages"] = totalRoutedMessages.load(); + stats["total_routes"] = totalRoutes.load(); + stats["active_instances"] = instances.size(); + stats["routing_entries"] = routingTable.size(); + + // Instance details + json instanceDetails = json::object(); + for (const auto& pair : instances) { + instanceDetails[pair.first] = { + {"active", true}, + {"type", "IntraIO"} + }; + } + stats["instances"] = instanceDetails; + + return stats; +} + +void IntraIOManager::setLogLevel(spdlog::level::level_enum level) { + logger->set_level(level); + logger->info("๐Ÿ“ Log level set to: {}", spdlog::level::to_string_view(level)); +} + +// Singleton implementation +IntraIOManager& IntraIOManager::getInstance() { + static IntraIOManager instance; + return instance; +} + +} // namespace warfactory \ No newline at end of file diff --git a/core/src/hot_reload_test.cpp b/core/src/hot_reload_test.cpp index 19228dc..7b44930 100644 --- a/core/src/hot_reload_test.cpp +++ b/core/src/hot_reload_test.cpp @@ -20,7 +20,7 @@ int main() { auto engine = EngineFactory::create("debug"); auto moduleSystem = ModuleSystemFactory::create("sequential"); - auto io = IOFactory::create("intra"); + auto io = IOFactory::create("intra", "hot-reload-test"); ModuleFactory moduleFactory; std::cout << "โœ… All components created" << std::endl; diff --git a/core/src/test_intra_io_routing.cpp b/core/src/test_intra_io_routing.cpp new file mode 100644 index 0000000..0f9dd68 --- /dev/null +++ b/core/src/test_intra_io_routing.cpp @@ -0,0 +1,84 @@ +#include +#include +#include +#include +#include + +using namespace warfactory; + +int main() { + std::cout << "๐Ÿงช Testing IntraIO routing system..." << std::endl; + + try { + // Create manager (singleton, auto-initialized) + auto& manager = IntraIOManager::getInstance(); + manager.setLogLevel(spdlog::level::trace); + + // Create two module instances through manager + auto moduleA = manager.createInstance("module-a"); + auto moduleB = manager.createInstance("module-b"); + + // Module A subscribes to "test:*" + moduleA->subscribe("test:*"); + std::cout << "โœ… Module A subscribed to 'test:*'" << std::endl; + + // Module B subscribes to "test:data" + moduleB->subscribe("test:data"); + std::cout << "โœ… Module B subscribed to 'test:data'" << std::endl; + + // Wait a bit for subscriptions to register + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + + // Module A publishes a message + nlohmann::json testData = {{"value", 42}, {"source", "module-a"}}; + moduleA->publish("test:data", testData); + std::cout << "๐Ÿ“ค Module A published message to 'test:data'" << std::endl; + + // Wait for routing + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + + // Check if modules received messages + std::cout << "\n๐Ÿ“Š Checking message reception:" << std::endl; + std::cout << "Module A has " << moduleA->hasMessages() << " messages" << std::endl; + std::cout << "Module B has " << moduleB->hasMessages() << " messages" << std::endl; + + // Module B should have received the message + if (moduleB->hasMessages() > 0) { + auto msg = moduleB->pullMessage(); + std::cout << "โœ… Module B received: " << msg.topic << " -> " << msg.data.dump() << std::endl; + } else { + std::cout << "โŒ Module B did not receive message" << std::endl; + } + + // Test another direction - Module B publishes to Module A + nlohmann::json responseData = {{"response", "ok"}, {"source", "module-b"}}; + moduleB->publish("test:response", responseData); + std::cout << "\n๐Ÿ“ค Module B published message to 'test:response'" << std::endl; + + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + + // Module A should receive it (subscribed to test:*) + std::cout << "Module A has " << moduleA->hasMessages() << " messages" << std::endl; + if (moduleA->hasMessages() > 0) { + auto msg = moduleA->pullMessage(); + std::cout << "โœ… Module A received: " << msg.topic << " -> " << msg.data.dump() << std::endl; + } else { + std::cout << "โŒ Module A did not receive message" << std::endl; + std::cout << "๐Ÿ” Debug: Pattern 'test:*' should match 'test:response'" << std::endl; + } + + // Show routing stats + auto stats = manager.getRoutingStats(); + std::cout << "\n๐Ÿ“Š Final routing statistics:" << std::endl; + std::cout << " Routed messages: " << stats["total_routed_messages"] << std::endl; + std::cout << " Total routes: " << stats["total_routes"] << std::endl; + std::cout << " Active instances: " << stats["active_instances"] << std::endl; + + std::cout << "\n๐ŸŽ‰ Test completed successfully!" << std::endl; + return 0; + + } catch (const std::exception& e) { + std::cerr << "โŒ Test failed: " << e.what() << std::endl; + return 1; + } +} \ No newline at end of file diff --git a/core/src/test_unified_io.cpp b/core/src/test_unified_io.cpp new file mode 100644 index 0000000..cffad39 --- /dev/null +++ b/core/src/test_unified_io.cpp @@ -0,0 +1,75 @@ +#include +#include +#include +#include +#include + +using namespace warfactory; + +int main() { + std::cout << "๐Ÿงช Testing unified IOFactory with IntraIO routing..." << std::endl; + + try { + // Create two instances via IOFactory (like modules would) + auto moduleIO1 = IOFactory::create("intra", "module-factory-test-1"); + auto moduleIO2 = IOFactory::create("intra", "module-factory-test-2"); + + std::cout << "โœ… Created 2 IO instances via IOFactory" << std::endl; + + // Test subscriptions + moduleIO1->subscribe("test:*"); + moduleIO2->subscribe("test:data"); + + std::cout << "โœ… Set up subscriptions" << std::endl; + + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + + // Test messaging between factory-created instances + nlohmann::json testData = {{"message", "from factory test"}, {"source", "module1"}}; + moduleIO1->publish("test:data", testData); + + std::cout << "๐Ÿ“ค Module 1 published message" << std::endl; + + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + + // Check if module 2 received it + if (moduleIO2->hasMessages() > 0) { + auto msg = moduleIO2->pullMessage(); + std::cout << "โœ… Module 2 received: " << msg.topic << " -> " << msg.data.dump() << std::endl; + } else { + std::cout << "โŒ Module 2 did not receive message" << std::endl; + return 1; + } + + // Test the reverse direction + nlohmann::json responseData = {{"response", "ok"}, {"source", "module2"}}; + moduleIO2->publish("test:response", responseData); + + std::cout << "๐Ÿ“ค Module 2 published response" << std::endl; + + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + + // Module 1 should receive it (subscribed to test:*) + if (moduleIO1->hasMessages() > 0) { + auto msg = moduleIO1->pullMessage(); + std::cout << "โœ… Module 1 received: " << msg.topic << " -> " << msg.data.dump() << std::endl; + } else { + std::cout << "โŒ Module 1 did not receive response" << std::endl; + return 1; + } + + // Show manager stats + auto& manager = IntraIOManager::getInstance(); + auto stats = manager.getRoutingStats(); + std::cout << "\n๐Ÿ“Š Manager statistics:" << std::endl; + std::cout << " Routed messages: " << stats["total_routed_messages"] << std::endl; + std::cout << " Active instances: " << stats["active_instances"] << std::endl; + + std::cout << "\n๐ŸŽ‰ Unified system test PASSED!" << std::endl; + return 0; + + } catch (const std::exception& e) { + std::cerr << "โŒ Test failed: " << e.what() << std::endl; + return 1; + } +} \ No newline at end of file diff --git a/modules/debug-world-gen/src/DebugWorldGenModuleLight.cpp b/modules/debug-world-gen/src/DebugWorldGenModuleLight.cpp index abb2221..1d505ac 100644 --- a/modules/debug-world-gen/src/DebugWorldGenModuleLight.cpp +++ b/modules/debug-world-gen/src/DebugWorldGenModuleLight.cpp @@ -27,7 +27,7 @@ private: public: DebugWorldGenModuleLight() { - std::cout << "๐ŸŒ LIGHT Debug World Gen Module created" << std::endl; + std::cout << "๐ŸŒ LIGHT Debug World Gen Module created - HOT RELOAD TEST!" << std::endl; } virtual ~DebugWorldGenModuleLight() { diff --git a/modules/economy/CMakeLists.txt b/modules/economy/CMakeLists.txt index b42acef..f3a363c 100644 --- a/modules/economy/CMakeLists.txt +++ b/modules/economy/CMakeLists.txt @@ -11,6 +11,19 @@ if(NOT CMAKE_BUILD_TYPE) set(CMAKE_BUILD_TYPE Debug) endif() +# Find nlohmann_json +find_package(nlohmann_json QUIET) + +# Minimal FetchContent for missing deps +if(NOT nlohmann_json_FOUND) + include(FetchContent) + FetchContent_Declare(nlohmann_json + URL https://github.com/nlohmann/json/releases/download/v3.11.3/json.tar.xz + URL_HASH SHA256=d6c65aca6b1ed68e7a182f4757257b107ae403032760ed6ef121c9d55e81757d + ) + FetchContent_MakeAvailable(nlohmann_json) +endif() + set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/build) set(CMAKE_LIBRARY_OUTPUT_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/build) @@ -39,6 +52,11 @@ add_executable(economy-standalone src/main.cpp ) +# Link nlohmann_json to all targets +target_link_libraries(economy-module PRIVATE nlohmann_json::nlohmann_json) +target_link_libraries(economy-test PRIVATE nlohmann_json::nlohmann_json) +target_link_libraries(economy-standalone PRIVATE nlohmann_json::nlohmann_json) + add_custom_target(build-economy DEPENDS economy-module COMMENT "Building economy.so module" diff --git a/modules/economy/src/main.cpp b/modules/economy/src/main.cpp new file mode 100644 index 0000000..f9fc32d --- /dev/null +++ b/modules/economy/src/main.cpp @@ -0,0 +1,43 @@ +#include +#include +#include "EconomyModule.cpp" + +using json = nlohmann::json; +using namespace warfactory; + +int main() { + std::cout << "๐Ÿฆ Economy Module Standalone Test" << std::endl; + + EconomyModule economy; + + // Test 1: Initialize market + json market_update = { + {"type", "market_update"}, + {"item", "steel_plate"}, + {"supply", 1000}, + {"demand", 800} + }; + + auto result = economy.process(market_update); + std::cout << "Market Update Result: " << result.dump(2) << std::endl; + + // Test 2: Get prices + json price_query = {{"type", "get_prices"}}; + auto prices = economy.process(price_query); + std::cout << "Market Prices: " << prices.dump(2) << std::endl; + + // Test 3: Execute trade + json trade = { + {"type", "trade"}, + {"action", "buy"}, + {"item", "steel_plate"}, + {"quantity", 50}, + {"max_price", 5.0} + }; + + auto trade_result = economy.process(trade); + std::cout << "Trade Result: " << trade_result.dump(2) << std::endl; + + std::cout << "๐ŸŽ‰ Economy module test completed!" << std::endl; + return 0; +} \ No newline at end of file