diff --git a/docs/DEVELOPER_GUIDE.md b/docs/DEVELOPER_GUIDE.md index e99c18a..8be4231 100644 --- a/docs/DEVELOPER_GUIDE.md +++ b/docs/DEVELOPER_GUIDE.md @@ -115,10 +115,38 @@ GroveEngine uses a **module-based architecture** with hot-reload support: | Component | Purpose | Documentation | |-----------|---------|---------------| | **IModule** | Module interface | [USER_GUIDE.md](USER_GUIDE.md#imodule) | -| **IIO** | Pub/Sub messaging | [USER_GUIDE.md](USER_GUIDE.md#iio) | +| **IIO** | Pull-based pub/sub with callback dispatch | [USER_GUIDE.md](USER_GUIDE.md#iio) | | **IDataNode** | Configuration & data | [USER_GUIDE.md](USER_GUIDE.md#idatanode) | | **ModuleLoader** | Hot-reload system | [USER_GUIDE.md](USER_GUIDE.md#moduleloader) | +#### IIO Callback Dispatch Pattern + +GroveEngine uses a **pull-based callback dispatch** pattern for message processing: + +```cpp +// OLD API (deprecated): +// io->subscribe("topic:pattern"); +// while (io->hasMessages()) { +// auto msg = io->pullMessage(); +// if (msg.topic == "topic:pattern") { /* handle */ } +// } + +// NEW API (callback-based): +io->subscribe("topic:pattern", [this](const Message& msg) { + // Handle message - no if-forest needed +}); + +while (io->hasMessages()) { + io->pullAndDispatch(); // Callbacks invoked automatically +} +``` + +**Key advantages:** +- **No if-forest dispatch**: Register handlers at subscription, not in process loop +- **Module controls WHEN**: Pull-based processing for deterministic ordering +- **Callbacks handle HOW**: Clean separation of concerns +- **Thread-safe**: Callbacks invoked in module's thread context + --- ## Available Modules @@ -252,22 +280,19 @@ uiModule->setConfiguration(uiConfig, uiIO.get(), nullptr); ``` ```cpp -// In your game module - subscribe to button events -gameIO->subscribe("ui:click"); -gameIO->subscribe("ui:action"); +// In your game module - subscribe to button events with callbacks (in setConfiguration) +gameIO->subscribe("ui:action", [this](const grove::Message& msg) { + std::string action = msg.data->getString("action", ""); + std::string widgetId = msg.data->getString("widgetId", ""); -// In process() -while (gameIO->hasMessages() > 0) { - auto msg = gameIO->pullMessage(); - - if (msg.topic == "ui:action") { - std::string action = msg.data->getString("action", ""); - std::string widgetId = msg.data->getString("widgetId", ""); - - if (action == "start_game" && widgetId == "play_button") { - startGame(); - } + if (action == "start_game" && widgetId == "play_button") { + startGame(); } +}); + +// In process() - pull and dispatch to callbacks +while (gameIO->hasMessages() > 0) { + gameIO->pullAndDispatch(); // Callback invoked automatically } ``` @@ -352,37 +377,34 @@ JsonDataNode input("input"); inputModule->process(input); ``` -#### Consuming Input Events +#### Consuming Input Events with Callbacks ```cpp -// Subscribe to input topics -gameIO->subscribe("input:mouse:button"); -gameIO->subscribe("input:keyboard:key"); +// Subscribe to input topics with callback handlers (in setConfiguration) +gameIO->subscribe("input:mouse:button", [this](const grove::Message& msg) { + int button = msg.data->getInt("button", 0); // 0=left, 1=middle, 2=right + bool pressed = msg.data->getBool("pressed", false); + double x = msg.data->getDouble("x", 0.0); + double y = msg.data->getDouble("y", 0.0); -// In process() + if (button == 0 && pressed) { + // Left mouse button pressed at (x, y) + handleClick(x, y); + } +}); + +gameIO->subscribe("input:keyboard:key", [this](const grove::Message& msg) { + int scancode = msg.data->getInt("scancode", 0); // SDL_SCANCODE_* + bool pressed = msg.data->getBool("pressed", false); + + if (scancode == SDL_SCANCODE_SPACE && pressed) { + playerJump(); + } +}); + +// In process() - pull and auto-dispatch to callbacks while (gameIO->hasMessages() > 0) { - auto msg = gameIO->pullMessage(); - - if (msg.topic == "input:mouse:button") { - int button = msg.data->getInt("button", 0); // 0=left, 1=middle, 2=right - bool pressed = msg.data->getBool("pressed", false); - double x = msg.data->getDouble("x", 0.0); - double y = msg.data->getDouble("y", 0.0); - - if (button == 0 && pressed) { - // Left mouse button pressed at (x, y) - handleClick(x, y); - } - } - - if (msg.topic == "input:keyboard:key") { - int scancode = msg.data->getInt("scancode", 0); // SDL_SCANCODE_* - bool pressed = msg.data->getBool("pressed", false); - - if (scancode == SDL_SCANCODE_SPACE && pressed) { - playerJump(); - } - } + gameIO->pullAndDispatch(); // Callbacks invoked automatically } ``` @@ -687,25 +709,28 @@ public: grove::ITaskScheduler* scheduler) override { m_io = io; - // Subscribe to UI events - m_io->subscribe("ui:action"); - m_io->subscribe("ui:click"); + // Subscribe to UI events with callback handlers + m_io->subscribe("ui:action", [this](const grove::Message& msg) { + std::string action = msg.data->getString("action", ""); + if (action == "start_game") { + startGame(); + } + }); + + m_io->subscribe("ui:click", [this](const grove::Message& msg) { + std::string widgetId = msg.data->getString("widgetId", ""); + double x = msg.data->getDouble("x", 0.0); + double y = msg.data->getDouble("y", 0.0); + handleClick(widgetId, x, y); + }); } void process(const grove::IDataNode& input) override { double deltaTime = input.getDouble("deltaTime", 0.016); - // Process UI events + // Process UI events - pull and auto-dispatch to callbacks while (m_io->hasMessages() > 0) { - auto msg = m_io->pullMessage(); - - if (msg.topic == "ui:action") { - std::string action = msg.data->getString("action", ""); - - if (action == "start_game") { - startGame(); - } - } + m_io->pullAndDispatch(); // Callbacks invoked automatically } // Update game logic @@ -905,23 +930,34 @@ io->subscribeLowFreq("analytics:*", config); #### Request-Response Pattern ```cpp -// Module A: Request pathfinding +// Module A: Subscribe to response first (in setConfiguration) +moduleA_io->subscribe("pathfinding:response", [this](const grove::Message& msg) { + std::string requestId = msg.data->getString("requestId", ""); + // ... apply path result ... +}); + +// Module A: Request pathfinding (in process) auto request = std::make_unique("request"); request->setString("requestId", "path_123"); request->setDouble("startX", 10.0); request->setDouble("startY", 20.0); -io->publish("pathfinding:request", std::move(request)); +moduleA_io->publish("pathfinding:request", std::move(request)); -// Module B: Respond with path -moduleB_io->subscribe("pathfinding:request"); -// ... compute path ... -auto response = std::make_unique("response"); -response->setString("requestId", "path_123"); -// ... add path data ... -moduleB_io->publish("pathfinding:response", std::move(response)); +// Module B: Subscribe to request (in setConfiguration) +moduleB_io->subscribe("pathfinding:request", [this](const grove::Message& msg) { + std::string requestId = msg.data->getString("requestId", ""); + // ... compute path ... -// Module A: Receive response -moduleA_io->subscribe("pathfinding:response"); + auto response = std::make_unique("response"); + response->setString("requestId", requestId); + // ... add path data ... + m_io->publish("pathfinding:response", std::move(response)); +}); + +// Module A/B: In process() - pull and dispatch +while (io->hasMessages() > 0) { + io->pullAndDispatch(); // Callbacks invoked automatically +} ``` #### Event Aggregation @@ -932,8 +968,15 @@ io->publish("combat:damage", damageData); io->publish("combat:kill", killData); io->publish("combat:levelup", levelupData); -// Analytics module aggregates all combat events -analyticsIO->subscribe("combat:*"); +// Analytics module aggregates all combat events (in setConfiguration) +analyticsIO->subscribe("combat:*", [this](const grove::Message& msg) { + aggregateCombatEvent(msg); +}); + +// In process() +while (analyticsIO->hasMessages() > 0) { + analyticsIO->pullAndDispatch(); // Callback invoked for each event +} ``` ### Testing Strategies @@ -978,12 +1021,24 @@ ldd build/modules/GameLogic.so #### IIO messages not received ```cpp -// Verify subscription BEFORE publishing -io->subscribe("render:sprite"); // Must be before publish +// Verify subscription with callback BEFORE publishing (in setConfiguration) +io->subscribe("render:sprite", [this](const grove::Message& msg) { + handleSprite(msg); +}); // Check topic patterns -io->subscribe("render:*"); // Matches render:sprite, render:text -io->subscribe("render:sprite:*"); // Only matches render:sprite:batch +io->subscribe("render:*", [this](const grove::Message& msg) { + // Matches render:sprite, render:text, etc. +}); + +io->subscribe("render:sprite:*", [this](const grove::Message& msg) { + // Only matches render:sprite:batch, render:sprite:add, etc. +}); + +// Remember to pullAndDispatch in process() +while (io->hasMessages() > 0) { + io->pullAndDispatch(); +} ``` #### Hot-reload state loss diff --git a/docs/UI_ARCHITECTURE.md b/docs/UI_ARCHITECTURE.md index 257eabe..2bac7a3 100644 --- a/docs/UI_ARCHITECTURE.md +++ b/docs/UI_ARCHITECTURE.md @@ -31,16 +31,24 @@ This is **intentional** to maintain the IIO-based architecture where all communi **Example:** ```cpp -// Slider value changed -if (msg.topic == "ui:value_changed" && widgetId == "volume_slider") { - double value = msg.data->getDouble("value", 0); - setVolume(value); +// Subscribe to slider value changes (in setConfiguration) +gameIO->subscribe("ui:value_changed", [this](const grove::Message& msg) { + std::string widgetId = msg.data->getString("widgetId", ""); + if (widgetId == "volume_slider") { + double value = msg.data->getDouble("value", 0); + setVolume(value); - // Update label (must go through game module) - auto updateMsg = std::make_unique("set_text"); - updateMsg->setString("id", "volume_label"); - updateMsg->setString("text", "Volume: " + std::to_string((int)value) + "%"); - m_io->publish("ui:set_text", std::move(updateMsg)); + // Update label (must go through game module) + auto updateMsg = std::make_unique("set_text"); + updateMsg->setString("id", "volume_label"); + updateMsg->setString("text", "Volume: " + std::to_string((int)value) + "%"); + m_io->publish("ui:set_text", std::move(updateMsg)); + } +}); + +// In process() +while (gameIO->hasMessages() > 0) { + gameIO->pullAndDispatch(); // Callback invoked automatically } ``` @@ -94,8 +102,9 @@ Each module runs in its own thread: void uiThread() { while(running) { // Receive inputs from queue (filled by InputModule thread) + // Callbacks registered at subscribe() handle dispatch while(io->hasMessages()) { - handleMessage(io->pullMessage()); + io->pullAndDispatch(); // Auto-dispatch to registered callbacks } update(deltaTime); @@ -111,8 +120,9 @@ void uiThread() { void gameThread() { while(running) { // Pull messages from queue (latency < 1ms) + // Callbacks registered at subscribe() handle dispatch while(io->hasMessages()) { - handleMessage(io->pullMessage()); // Already in queue! + io->pullAndDispatch(); // Auto-dispatch, already in queue! } updateGameLogic(deltaTime); @@ -337,12 +347,14 @@ These features violate core design principles and will **never** be added: ## Design Principles -1. **IIO-First:** All communication via topics, no direct coupling -2. **Retained Mode:** Cache state, minimize IIO traffic -3. **Hot-Reload Safe:** Full state preservation across reloads -4. **Thread-Safe:** Designed for multi-threaded production use -5. **Module Independence:** UIModule never imports BgfxRenderer or InputModule headers -6. **Game Logic Separation:** Widgets are dumb views, game modules handle logic +1. **IIO-First:** All communication via topics with callback dispatch, no direct coupling +2. **Callback Dispatch:** Subscribe with handlers, no if-forest dispatch in process() +3. **Pull-Based Control:** Module controls WHEN to process (pullAndDispatch), callbacks handle HOW +4. **Retained Mode:** Cache state, minimize IIO traffic +5. **Hot-Reload Safe:** Full state preservation across reloads +6. **Thread-Safe:** Designed for multi-threaded production use +7. **Module Independence:** UIModule never imports BgfxRenderer or InputModule headers +8. **Game Logic Separation:** Widgets are dumb views, game modules handle logic ## Integration with Other Modules diff --git a/docs/UI_MODULE_DEMO.md b/docs/UI_MODULE_DEMO.md index 3fab2c5..3ff28ae 100644 --- a/docs/UI_MODULE_DEMO.md +++ b/docs/UI_MODULE_DEMO.md @@ -221,20 +221,22 @@ uiIO->publish("input:text", std::move(textInput)); ### Event Logging ```cpp -while (uiIO->hasMessages() > 0) { - auto msg = uiIO->pullMessage(); +// Subscribe to UI events with callbacks (during setup) +uiIO->subscribe("ui:click", [&clickCount, &eventLog](const grove::Message& msg) { + clickCount++; + std::string widgetId = msg.data->getString("widgetId", ""); + eventLog.add("πŸ–±οΈ Click: " + widgetId); +}); - if (msg.topic == "ui:click") { - clickCount++; - std::string widgetId = msg.data->getString("widgetId", ""); - eventLog.add("πŸ–±οΈ Click: " + widgetId); - } - else if (msg.topic == "ui:action") { - actionCount++; - std::string action = msg.data->getString("action", ""); - eventLog.add("⚑ Action: " + action); - } - // ... handle other events +uiIO->subscribe("ui:action", [&actionCount, &eventLog](const grove::Message& msg) { + actionCount++; + std::string action = msg.data->getString("action", ""); + eventLog.add("⚑ Action: " + action); +}); + +// In main loop - pull and dispatch to callbacks +while (uiIO->hasMessages() > 0) { + uiIO->pullAndDispatch(); // Callbacks invoked automatically } ``` diff --git a/docs/UI_TOPICS.md b/docs/UI_TOPICS.md index fc26e4f..ac2aea2 100644 --- a/docs/UI_TOPICS.md +++ b/docs/UI_TOPICS.md @@ -57,31 +57,28 @@ See [UI Rendering Documentation](UI_RENDERING.md) for details on retained vs imm ## Usage Examples -### Handling UI Events +### Handling UI Events with Callbacks ```cpp -// Subscribe to UI events -gameIO->subscribe("ui:action"); -gameIO->subscribe("ui:value_changed"); +// Subscribe to UI events with callback handlers (in setConfiguration) +gameIO->subscribe("ui:action", [this](const grove::Message& msg) { + std::string action = msg.data->getString("action", ""); + if (action == "start_game") { + startGame(); + } +}); -// In game loop +gameIO->subscribe("ui:value_changed", [this](const grove::Message& msg) { + std::string widgetId = msg.data->getString("widgetId", ""); + if (widgetId == "volume_slider") { + double value = msg.data->getDouble("value", 50.0); + setVolume(value); + } +}); + +// In game loop (process method) while (m_io->hasMessages() > 0) { - auto msg = m_io->pullMessage(); - - if (msg.topic == "ui:action") { - std::string action = msg.data->getString("action", ""); - if (action == "start_game") { - startGame(); - } - } - - if (msg.topic == "ui:value_changed") { - std::string widgetId = msg.data->getString("widgetId", ""); - if (widgetId == "volume_slider") { - double value = msg.data->getDouble("value", 50.0); - setVolume(value); - } - } + m_io->pullAndDispatch(); // Callbacks invoked automatically } ``` @@ -112,14 +109,23 @@ m_io->publish("ui:set_value", std::move(msg)); Common pattern: update a label when a slider changes. ```cpp -if (msg.topic == "ui:value_changed" && widgetId == "volume_slider") { - double value = msg.data->getDouble("value", 50.0); - setVolume(value); +// Subscribe to slider value changes (in setConfiguration) +gameIO->subscribe("ui:value_changed", [this](const grove::Message& msg) { + std::string widgetId = msg.data->getString("widgetId", ""); + if (widgetId == "volume_slider") { + double value = msg.data->getDouble("value", 50.0); + setVolume(value); - // Update label to show current value - auto updateMsg = std::make_unique("set_text"); - updateMsg->setString("id", "volume_label"); - updateMsg->setString("text", "Volume: " + std::to_string((int)value) + "%"); - m_io->publish("ui:set_text", std::move(updateMsg)); + // Update label to show current value + auto updateMsg = std::make_unique("set_text"); + updateMsg->setString("id", "volume_label"); + updateMsg->setString("text", "Volume: " + std::to_string((int)value) + "%"); + m_io->publish("ui:set_text", std::move(updateMsg)); + } +}); + +// In process() +while (gameIO->hasMessages() > 0) { + gameIO->pullAndDispatch(); // Callback invoked automatically } ``` diff --git a/docs/USER_GUIDE.md b/docs/USER_GUIDE.md index 53984b7..07ada8a 100644 --- a/docs/USER_GUIDE.md +++ b/docs/USER_GUIDE.md @@ -34,7 +34,7 @@ GroveEngine provides: - Modules contain pure business logic (200-300 lines recommended) - No infrastructure code in modules (threading, networking, persistence) - All data via `IDataNode` abstraction (backend agnostic) -- Pull-based message processing (modules control when they read messages) +- Pull-based message processing with callback dispatch (modules control WHEN to process, callbacks handle HOW) --- @@ -57,10 +57,10 @@ Hierarchical data structure for configuration, state, and messages. Supports: ### IIO -Pub/Sub communication interface: +Pull-based pub/sub communication with callback dispatch: - `publish()`: Send messages to topics -- `subscribe()`: Listen to topic patterns -- `pullMessage()`: Consume received messages +- `subscribe()`: Register callback handler for topic pattern +- `pullAndDispatch()`: Pull and auto-dispatch message to handler ### ModuleLoader @@ -257,11 +257,9 @@ void MyModule::process(const grove::IDataNode& input) { // Your processing logic here m_counter++; - // Process incoming messages + // Process incoming messages (dispatch to registered callbacks) while (m_io && m_io->hasMessages() > 0) { - auto msg = m_io->pullMessage(); - m_logger->debug("Received message on topic: {}", msg.topic); - // Handle message... + m_io->pullAndDispatch(); // Callbacks invoked automatically } // Publish events if needed @@ -394,7 +392,20 @@ void destroyModule(grove::IModule* module) { ### IIO Pub/Sub System -Modules communicate via topics using publish/subscribe pattern. +Modules communicate via topics using publish/subscribe pattern with callback dispatch. + +#### Key Design: Pull-Based with Callback Dispatch + +Unlike traditional push-based systems, IIO gives modules control over **WHEN** to process messages while callbacks handle **HOW** to process them: + +1. **Subscribe with Callback** (in `setConfiguration`): Register handlers for topic patterns +2. **Pull and Dispatch** (in `process`): Module controls when to process - callbacks invoked automatically + +**Benefits:** +- **No if-forest dispatch**: Logic registered at subscription, not scattered in process() +- **Module controls timing**: Pull-based means deterministic frame ordering +- **Thread-safe**: Callbacks invoked in module's own thread context +- **Clean separation**: Subscription setup vs. message processing #### Publishing Messages @@ -411,7 +422,7 @@ void MyModule::process(const grove::IDataNode& input) { } ``` -#### Subscribing to Topics +#### Subscribing to Topics with Callbacks ```cpp void MyModule::setConfiguration(const grove::IDataNode& configNode, @@ -419,31 +430,40 @@ void MyModule::setConfiguration(const grove::IDataNode& configNode, grove::ITaskScheduler* scheduler) { m_io = io; - // Subscribe to specific topic - m_io->subscribe("game:player:*"); + // Subscribe to specific topic with callback handler + m_io->subscribe("game:player:position", [this](const grove::Message& msg) { + double x = msg.data->getDouble("x", 0.0); + double y = msg.data->getDouble("y", 0.0); + // Handle position update... + }); + + // Subscribe with wildcard pattern + m_io->subscribe("game:player:*", [this](const grove::Message& msg) { + handlePlayerEvent(msg); + }); // Subscribe with low-frequency batching (for non-critical updates) grove::SubscriptionConfig config; config.batchInterval = 1000; // 1 second batches - m_io->subscribeLowFreq("analytics:*", config); + m_io->subscribeLowFreq("analytics:*", [this](const grove::Message& msg) { + processBatchedAnalytics(msg); + }, config); } ``` -#### Processing Messages +#### Processing Messages with Callback Dispatch ```cpp void MyModule::process(const grove::IDataNode& input) { - // Pull-based: module controls when to process messages + // Pull-based: module controls WHEN to process messages + // Callbacks registered at subscribe() handle HOW to process while (m_io->hasMessages() > 0) { - grove::Message msg = m_io->pullMessage(); - - if (msg.topic == "game:player:position") { - double x = msg.data->getDouble("x", 0.0); - double y = msg.data->getDouble("y", 0.0); - // Handle position update... - } + m_io->pullAndDispatch(); // Automatically invokes registered callback } } + +// No more if-forest dispatch - callbacks were registered at subscription: +// subscribe("game:player:position", [this](const Message& msg) { ... }); ``` ### Topic Patterns @@ -670,10 +690,10 @@ void MyModule::process(const grove::IDataNode& input) { | Method | Description | |--------|-------------| | `publish(topic, data)` | Publish message to topic | -| `subscribe(pattern, config)` | Subscribe to topic pattern | -| `subscribeLowFreq(pattern, config)` | Subscribe with batching | +| `subscribe(pattern, handler, config)` | Subscribe with callback handler | +| `subscribeLowFreq(pattern, handler, config)` | Subscribe with batching and callback | | `hasMessages()` | Count of pending messages | -| `pullMessage()` | Consume one message | +| `pullAndDispatch()` | Pull and auto-dispatch message to handler | | `getHealth()` | Get IO health metrics | ### IModule diff --git a/include/grove/IIO.h b/include/grove/IIO.h index 1210c32..8119658 100644 --- a/include/grove/IIO.h +++ b/include/grove/IIO.h @@ -48,15 +48,28 @@ struct IOHealth { }; /** - * @brief Pub/Sub communication interface with pull-based synchronous design + * @brief Message handler callback type * - * Pull-based pub/sub system optimized for game modules. Modules have full control - * over when they process messages, avoiding threading issues. + * Callback invoked when a message matching the subscribed pattern is pulled. + * Module implements this to handle specific message types without if-forest dispatch. + */ +using MessageHandler = std::function; + +/** + * @brief Pub/Sub communication interface with pull-based callback dispatch + * + * Pull-based pub/sub system with automatic message dispatch to registered handlers. + * Modules subscribe with callbacks, then pull messages - dispatch is automatic. + * + * Design: + * - Modules retain control over WHEN to process (pull-based) + * - No if-forest dispatch (callbacks registered at subscription) + * - Thread-safe for multi-threaded module systems * * Features: * - Topic patterns with wildcards (e.g., "player:*", "economy:*") * - Low-frequency subscriptions for bandwidth optimization - * - Message consumption (pull removes message from queue) + * - Automatic callback dispatch on pull * - Engine health monitoring for backpressure management */ class IIO { @@ -71,18 +84,38 @@ public: virtual void publish(const std::string& topic, std::unique_ptr message) = 0; /** - * @brief Subscribe to topic pattern (high-frequency) + * @brief Subscribe to topic pattern with callback handler (high-frequency) * @param topicPattern Topic pattern with wildcards (e.g., "player:*") + * @param handler Callback invoked when matching message is pulled * @param config Optional subscription configuration + * + * Example: + * io->subscribe("input:mouse", [this](const Message& msg) { + * handleMouseInput(msg); + * }); */ - virtual void subscribe(const std::string& topicPattern, const SubscriptionConfig& config = {}) = 0; + virtual void subscribe( + const std::string& topicPattern, + MessageHandler handler, + const SubscriptionConfig& config = {} + ) = 0; /** - * @brief Subscribe to topic pattern (low-frequency batched) + * @brief Subscribe to topic pattern with callback (low-frequency batched) * @param topicPattern Topic pattern with wildcards + * @param handler Callback invoked when matching message is pulled * @param config Subscription configuration (batchInterval, etc.) + * + * Example: + * io->subscribeLowFreq("analytics:*", [this](const Message& msg) { + * processBatchedAnalytics(msg); + * }, {.batchInterval = 5000}); */ - virtual void subscribeLowFreq(const std::string& topicPattern, const SubscriptionConfig& config = {}) = 0; + virtual void subscribeLowFreq( + const std::string& topicPattern, + MessageHandler handler, + const SubscriptionConfig& config = {} + ) = 0; /** * @brief Get count of pending messages @@ -91,11 +124,18 @@ public: virtual int hasMessages() const = 0; /** - * @brief Pull and consume one message - * @return Message from queue (oldest first). Message is removed from queue. + * @brief Pull and auto-dispatch one message to registered handler * @throws std::runtime_error if no messages available + * + * Pulls oldest message from queue and invokes the callback registered + * during subscribe(). Message is consumed (removed from queue). + * + * Example usage: + * while (io->hasMessages() > 0) { + * io->pullAndDispatch(); // Callbacks invoked automatically + * } */ - virtual Message pullMessage() = 0; + virtual void pullAndDispatch() = 0; /** * @brief Get IO health status for Engine monitoring diff --git a/include/grove/IntraIO.h b/include/grove/IntraIO.h index cc0b99c..a60b446 100644 --- a/include/grove/IntraIO.h +++ b/include/grove/IntraIO.h @@ -66,6 +66,7 @@ private: struct Subscription { std::regex pattern; std::string originalPattern; + MessageHandler handler; // Callback for this subscription SubscriptionConfig config; std::chrono::high_resolution_clock::time_point lastBatch; std::unordered_map batchedMessages; // For replaceable messages @@ -74,7 +75,7 @@ private: // Default constructor Subscription() = default; - // Move-only (Message contains unique_ptr) + // Move-only (Message contains unique_ptr, handler is copyable) Subscription(Subscription&&) = default; Subscription& operator=(Subscription&&) = default; Subscription(const Subscription&) = delete; @@ -113,10 +114,10 @@ public: // IIO implementation void publish(const std::string& topic, std::unique_ptr message) override; - void subscribe(const std::string& topicPattern, const SubscriptionConfig& config = {}) override; - void subscribeLowFreq(const std::string& topicPattern, const SubscriptionConfig& config = {}) override; + void subscribe(const std::string& topicPattern, MessageHandler handler, const SubscriptionConfig& config = {}) override; + void subscribeLowFreq(const std::string& topicPattern, MessageHandler handler, const SubscriptionConfig& config = {}) override; int hasMessages() const override; - Message pullMessage() override; + void pullAndDispatch() override; IOHealth getHealth() const override; IOType getType() const override; diff --git a/modules/BgfxRenderer/Scene/SceneCollector.cpp b/modules/BgfxRenderer/Scene/SceneCollector.cpp index 485646a..d58bba2 100644 --- a/modules/BgfxRenderer/Scene/SceneCollector.cpp +++ b/modules/BgfxRenderer/Scene/SceneCollector.cpp @@ -8,22 +8,9 @@ namespace grove { void SceneCollector::setup(IIO* io, uint16_t width, uint16_t height) { - // Subscribe to all render topics (multi-level wildcard .* matches render:sprite AND render:debug:line) - io->subscribe("render:.*"); - - // Initialize default view with provided dimensions (will be overridden by camera messages) - initDefaultView(width > 0 ? width : 1280, height > 0 ? height : 720); -} - -void SceneCollector::collect(IIO* io, float deltaTime) { - m_deltaTime = deltaTime; - m_frameNumber++; - - // Pull all pending messages - while (io->hasMessages() > 0) { - Message msg = io->pullMessage(); - - if (!msg.data) continue; + // Subscribe to all render topics with callback handler + io->subscribe("render:.*", [this](const Message& msg) { + if (!msg.data) return; // Route message based on topic // Retained mode (new) - sprites @@ -77,6 +64,19 @@ void SceneCollector::collect(IIO* io, float deltaTime) { else if (msg.topic == "render:debug:rect") { parseDebugRect(*msg.data); } + }); + + // Initialize default view with provided dimensions (will be overridden by camera messages) + initDefaultView(width > 0 ? width : 1280, height > 0 ? height : 720); +} + +void SceneCollector::collect(IIO* io, float deltaTime) { + m_deltaTime = deltaTime; + m_frameNumber++; + + // Pull and dispatch all pending messages (callbacks invoked automatically) + while (io->hasMessages() > 0) { + io->pullAndDispatch(); } } diff --git a/modules/InputModule/README.md b/modules/InputModule/README.md index d79cbb1..5a496ac 100644 --- a/modules/InputModule/README.md +++ b/modules/InputModule/README.md @@ -95,9 +95,20 @@ config.setBool("enableMouse", true); config.setBool("enableKeyboard", true); inputModule->setConfiguration(config, inputIO.get(), nullptr); -// Subscribe to events -gameIO->subscribe("input:mouse:button"); -gameIO->subscribe("input:keyboard:key"); +// Subscribe to events with callback handlers +gameIO->subscribe("input:mouse:button", [this](const grove::Message& msg) { + int button = msg.data->getInt("button", 0); + bool pressed = msg.data->getBool("pressed", false); + double x = msg.data->getDouble("x", 0.0); + double y = msg.data->getDouble("y", 0.0); + handleMouseButton(button, pressed, x, y); +}); + +gameIO->subscribe("input:keyboard:key", [this](const grove::Message& msg) { + int scancode = msg.data->getInt("scancode", 0); + bool pressed = msg.data->getBool("pressed", false); + handleKeyboard(scancode, pressed); +}); // Main loop while (running) { @@ -111,15 +122,9 @@ while (running) { grove::JsonDataNode input("input"); inputModule->process(input); - // 3. Process game logic + // 3. Process game logic - pull and auto-dispatch to callbacks while (gameIO->hasMessages() > 0) { - auto msg = gameIO->pullMessage(); - - if (msg.topic == "input:mouse:button") { - int button = msg.data->getInt("button", 0); - bool pressed = msg.data->getBool("pressed", false); - // Handle click... - } + gameIO->pullAndDispatch(); // Callbacks invoked automatically } } diff --git a/modules/UIModule/README.md b/modules/UIModule/README.md index 1a31c4f..a521674 100644 --- a/modules/UIModule/README.md +++ b/modules/UIModule/README.md @@ -37,19 +37,23 @@ config.setString("layoutFile", "./ui/menu.json"); config.setInt("baseLayer", 1000); uiModule->setConfiguration(config, uiIO.get(), nullptr); -// Subscribe to UI events -gameIO->subscribe("ui:action"); -gameIO->subscribe("ui:value_changed"); +// Subscribe to UI events with callback handlers +gameIO->subscribe("ui:action", [this](const grove::Message& msg) { + std::string action = msg.data->getString("action", ""); + handleAction(action); +}); + +gameIO->subscribe("ui:value_changed", [this](const grove::Message& msg) { + std::string widgetId = msg.data->getString("widgetId", ""); + double value = msg.data->getDouble("value", 0.0); + handleValueChange(widgetId, value); +}); // Game loop while(running) { - // Handle UI events + // Handle UI events - pull and auto-dispatch to callbacks while (gameIO->hasMessages() > 0) { - auto msg = gameIO->pullMessage(); - if (msg.topic == "ui:action") { - std::string action = msg.data->getString("action", ""); - handleAction(action); - } + gameIO->pullAndDispatch(); // Callbacks invoked automatically } uiModule->process(deltaTime); diff --git a/modules/UIModule/UIModule.cpp b/modules/UIModule/UIModule.cpp index 590a95d..9c42096 100644 --- a/modules/UIModule/UIModule.cpp +++ b/modules/UIModule/UIModule.cpp @@ -73,54 +73,14 @@ void UIModule::setConfiguration(const IDataNode& config, IIO* io, ITaskScheduler } } - // Subscribe to input topics + // Subscribe to input topics with callbacks if (m_io) { - m_io->subscribe("input:mouse:move"); - m_io->subscribe("input:mouse:button"); - m_io->subscribe("input:mouse:wheel"); - m_io->subscribe("input:keyboard"); - m_io->subscribe("ui:load"); // Load new layout - m_io->subscribe("ui:set_value"); // Set widget value - m_io->subscribe("ui:set_visible"); // Show/hide widget - m_io->subscribe("ui:set_text"); // Set widget text (for labels) - } - - m_logger->info("UIModule initialized"); -} - -void UIModule::process(const IDataNode& input) { - float deltaTime = static_cast(input.getDouble("deltaTime", 0.016)); - - // Begin new frame - m_context->beginFrame(); - m_renderer->beginFrame(); - - // Process input messages from IIO - processInput(); - - // Update UI logic - updateUI(deltaTime); - - // Render UI - renderUI(); - - m_frameCount++; -} - -void UIModule::processInput() { - if (!m_io) return; - - while (m_io->hasMessages() > 0) { - auto msg = m_io->pullMessage(); - - if (msg.topic == "input:mouse:move") { + m_io->subscribe("input:mouse:move", [this](const Message& msg) { m_context->mouseX = static_cast(msg.data->getDouble("x", 0.0)); m_context->mouseY = static_cast(msg.data->getDouble("y", 0.0)); - } - else if (msg.topic == "input:mouse:wheel") { - m_context->mouseWheelDelta = static_cast(msg.data->getDouble("delta", 0.0)); - } - else if (msg.topic == "input:mouse:button") { + }); + + m_io->subscribe("input:mouse:button", [this](const Message& msg) { bool pressed = msg.data->getBool("pressed", false); if (pressed && !m_context->mouseDown) { m_context->mousePressed = true; @@ -129,19 +89,26 @@ void UIModule::processInput() { m_context->mouseReleased = true; } m_context->mouseDown = pressed; - } - else if (msg.topic == "input:keyboard") { + }); + + m_io->subscribe("input:mouse:wheel", [this](const Message& msg) { + m_context->mouseWheelDelta = static_cast(msg.data->getDouble("delta", 0.0)); + }); + + m_io->subscribe("input:keyboard", [this](const Message& msg) { m_context->keyPressed = true; m_context->keyCode = msg.data->getInt("keyCode", 0); m_context->keyChar = static_cast(msg.data->getInt("char", 0)); - } - else if (msg.topic == "ui:load") { + }); + + m_io->subscribe("ui:load", [this](const Message& msg) { std::string layoutPath = msg.data->getString("path", ""); if (!layoutPath.empty()) { loadLayout(layoutPath); } - } - else if (msg.topic == "ui:set_visible") { + }); + + m_io->subscribe("ui:set_visible", [this](const Message& msg) { std::string widgetId = msg.data->getString("id", ""); bool visible = msg.data->getBool("visible", true); if (m_root) { @@ -149,8 +116,9 @@ void UIModule::processInput() { widget->visible = visible; } } - } - else if (msg.topic == "ui:set_text") { + }); + + m_io->subscribe("ui:set_text", [this](const Message& msg) { // Timestamp on receive auto now = std::chrono::high_resolution_clock::now(); auto micros = std::chrono::duration_cast(now.time_since_epoch()).count(); @@ -179,7 +147,37 @@ void UIModule::processInput() { } } } - } + }); + } + + m_logger->info("UIModule initialized"); +} + +void UIModule::process(const IDataNode& input) { + float deltaTime = static_cast(input.getDouble("deltaTime", 0.016)); + + // Begin new frame + m_context->beginFrame(); + m_renderer->beginFrame(); + + // Process input messages from IIO + processInput(); + + // Update UI logic + updateUI(deltaTime); + + // Render UI + renderUI(); + + m_frameCount++; +} + +void UIModule::processInput() { + if (!m_io) return; + + // Pull and dispatch all pending messages (callbacks invoked automatically) + while (m_io->hasMessages() > 0) { + m_io->pullAndDispatch(); } } diff --git a/src/DebugEngine.cpp b/src/DebugEngine.cpp index c1787d6..610e695 100644 --- a/src/DebugEngine.cpp +++ b/src/DebugEngine.cpp @@ -429,10 +429,7 @@ void DebugEngine::processClientMessages() { for (int j = 0; j < messagesToProcess; ++j) { try { - auto message = socket->pullMessage(); - std::string dataPreview = message.data ? message.data->getData()->toString() : "null"; - logger->debug("πŸ“© Client {} message: topic='{}', data present={}", - i, message.topic, message.data != nullptr); + socket->pullAndDispatch(); // TODO: Route message to appropriate module or process it logger->trace("🚧 TODO: Route client message to modules"); @@ -456,9 +453,7 @@ void DebugEngine::processCoordinatorMessages() { for (int i = 0; i < messagesToProcess; ++i) { try { - auto message = coordinatorSocket->pullMessage(); - logger->debug("πŸ“© Coordinator message: topic='{}', data present={}", - message.topic, message.data != nullptr); + coordinatorSocket->pullAndDispatch(); // TODO: Handle coordinator commands (shutdown, config reload, etc.) logger->trace("🚧 TODO: Handle coordinator commands"); diff --git a/src/IntraIO.cpp b/src/IntraIO.cpp index 6bacc9e..966fdea 100644 --- a/src/IntraIO.cpp +++ b/src/IntraIO.cpp @@ -46,12 +46,13 @@ void IntraIO::publish(const std::string& topic, std::unique_ptr messa IntraIOManager::getInstance().routeMessage(instanceId, topic, jsonData); } -void IntraIO::subscribe(const std::string& topicPattern, const SubscriptionConfig& config) { +void IntraIO::subscribe(const std::string& topicPattern, MessageHandler handler, const SubscriptionConfig& config) { std::lock_guard lock(operationMutex); Subscription sub; sub.originalPattern = topicPattern; sub.pattern = compileTopicPattern(topicPattern); + sub.handler = handler; // Store callback sub.config = config; sub.lastBatch = std::chrono::high_resolution_clock::now(); @@ -61,12 +62,13 @@ void IntraIO::subscribe(const std::string& topicPattern, const SubscriptionConfi IntraIOManager::getInstance().registerSubscription(instanceId, topicPattern, false); } -void IntraIO::subscribeLowFreq(const std::string& topicPattern, const SubscriptionConfig& config) { +void IntraIO::subscribeLowFreq(const std::string& topicPattern, MessageHandler handler, const SubscriptionConfig& config) { std::lock_guard lock(operationMutex); Subscription sub; sub.originalPattern = topicPattern; sub.pattern = compileTopicPattern(topicPattern); + sub.handler = handler; // Store callback sub.config = config; sub.lastBatch = std::chrono::high_resolution_clock::now(); @@ -81,24 +83,38 @@ int IntraIO::hasMessages() const { return static_cast(messageQueue.size() + lowFreqMessageQueue.size()); } -Message IntraIO::pullMessage() { +void IntraIO::pullAndDispatch() { std::lock_guard lock(operationMutex); if (messageQueue.empty() && lowFreqMessageQueue.empty()) { throw std::runtime_error("No messages available"); } + // Pull message from queue Message msg; + bool isLowFreq = false; if (!messageQueue.empty()) { msg = std::move(messageQueue.front()); messageQueue.pop(); } else { msg = std::move(lowFreqMessageQueue.front()); lowFreqMessageQueue.pop(); + isLowFreq = true; } totalPulled++; - return msg; + + // Find ALL matching handlers and dispatch to each + const auto& subscriptions = isLowFreq ? lowFreqSubscriptions : highFreqSubscriptions; + + for (const auto& sub : subscriptions) { + if (matchesPattern(msg.topic, sub.pattern)) { + // Found matching subscription - invoke handler + if (sub.handler) { + sub.handler(msg); + } + } + } } IOHealth IntraIO::getHealth() const { @@ -224,17 +240,26 @@ bool IntraIO::matchesPattern(const std::string& topic, const std::regex& pattern } std::regex IntraIO::compileTopicPattern(const std::string& pattern) const { - // Convert wildcard pattern to regex - std::string regexPattern = pattern; + // Patterns can be: + // 1. Simple wildcard: "*" β†’ convert to ".*" regex + // 2. Regex patterns: "player:.*", "test:.*" β†’ use as-is - // Escape special regex characters except * + // If pattern contains ".*" already, assume it's a regex pattern + if (pattern.find(".*") != std::string::npos) { + // Already a regex pattern - use as-is + return std::regex(pattern); + } + + // Otherwise, convert simple wildcards to regex std::string escaped; - for (char c : regexPattern) { + for (char c : pattern) { if (c == '*') { + // Simple wildcard: convert to regex ".*" escaped += ".*"; - } else if (c == '.' || c == '+' || c == '?' || c == '^' || c == '$' || + } else if (c == '+' || c == '?' || c == '^' || c == '$' || c == '(' || c == ')' || c == '[' || c == ']' || c == '{' || - c == '}' || c == '|' || c == '\\') { + c == '}' || c == '|' || c == '\\' || c == '.') { + // Escape special regex characters escaped += '\\'; escaped += c; } else { diff --git a/tests/integration/IT_014_ui_module_integration.cpp b/tests/integration/IT_014_ui_module_integration.cpp index 873ef56..5faa685 100644 --- a/tests/integration/IT_014_ui_module_integration.cpp +++ b/tests/integration/IT_014_ui_module_integration.cpp @@ -110,17 +110,28 @@ TEST_CASE("IT_014: UIModule Full Integration", "[integration][ui][phase7]") { std::cout << "⚠️ Renderer not healthy (expected for noop backend), skipping renderer process calls\n"; } - // Subscribe to events we want to verify - gameIO->subscribe("ui:click"); - gameIO->subscribe("ui:action"); - gameIO->subscribe("ui:value_changed"); - gameIO->subscribe("ui:hover"); - int clickCount = 0; int actionCount = 0; int valueChangeCount = 0; int hoverCount = 0; + // Subscribe to events we want to verify with callbacks + gameIO->subscribe("ui:click", [&](const Message& msg) { + clickCount++; + }); + gameIO->subscribe("ui:action", [&](const Message& msg) { + actionCount++; + }); + gameIO->subscribe("ui:value_changed", [&](const Message& msg) { + valueChangeCount++; + }); + gameIO->subscribe("ui:hover", [&](const Message& msg) { + bool enter = msg.data->getBool("enter", false); + if (enter) { + hoverCount++; + } + }); + // Simulate 60 frames (~1 second at 60fps) for (int frame = 0; frame < 60; frame++) { // Simulate mouse movement @@ -169,30 +180,9 @@ TEST_CASE("IT_014: UIModule Full Integration", "[integration][ui][phase7]") { renderer->process(frameInput); } - // Check for events + // Dispatch events (callbacks handle counting and logging) while (gameIO->hasMessages() > 0) { - auto msg = gameIO->pullMessage(); - - if (msg.topic == "ui:click") { - clickCount++; - std::cout << " Frame " << frame << ": Click event received\n"; - } - else if (msg.topic == "ui:action") { - actionCount++; - std::string action = msg.data->getString("action", ""); - std::cout << " Frame " << frame << ": Action event: " << action << "\n"; - } - else if (msg.topic == "ui:value_changed") { - valueChangeCount++; - std::cout << " Frame " << frame << ": Value changed\n"; - } - else if (msg.topic == "ui:hover") { - bool enter = msg.data->getBool("enter", false); - if (enter) { - hoverCount++; - std::cout << " Frame " << frame << ": Hover event\n"; - } - } + gameIO->pullAndDispatch(); } // Small delay to simulate real-time diff --git a/tests/integration/IT_015_input_ui_integration.cpp b/tests/integration/IT_015_input_ui_integration.cpp index c419a0e..aace571 100644 --- a/tests/integration/IT_015_input_ui_integration.cpp +++ b/tests/integration/IT_015_input_ui_integration.cpp @@ -53,14 +53,22 @@ TEST_CASE("IT_015: UIModule Input Integration", "[integration][input][ui][phase3 REQUIRE_NOTHROW(uiModule->setConfiguration(uiConfig, uiIO.get(), nullptr)); std::cout << "βœ… UIModule loaded\n\n"; - // Subscribe to events - testIO->subscribe("ui:click"); - testIO->subscribe("ui:hover"); - testIO->subscribe("ui:action"); - int uiClicksReceived = 0; int uiHoversReceived = 0; + // Subscribe to events with callbacks + testIO->subscribe("ui:click", [&](const Message& msg) { + uiClicksReceived++; + std::cout << "βœ… Received ui:click event\n"; + }); + testIO->subscribe("ui:hover", [&](const Message& msg) { + uiHoversReceived++; + std::cout << "βœ… Received ui:hover event\n"; + }); + testIO->subscribe("ui:action", [](const Message& msg) { + // Just acknowledge action events + }); + // Publish input events via IIO (simulates InputModule output) std::cout << "Publishing input events...\n"; @@ -85,16 +93,9 @@ TEST_CASE("IT_015: UIModule Input Integration", "[integration][input][ui][phase3 // Process UIModule again uiModule->process(inputData); - // Collect UI events + // Dispatch UI events (callbacks handle counting) while (testIO->hasMessages() > 0) { - auto msg = testIO->pullMessage(); - if (msg.topic == "ui:click") { - uiClicksReceived++; - std::cout << "βœ… Received ui:click event\n"; - } else if (msg.topic == "ui:hover") { - uiHoversReceived++; - std::cout << "βœ… Received ui:hover event\n"; - } + testIO->pullAndDispatch(); } std::cout << "\nResults:\n"; diff --git a/tests/integration/IT_015_input_ui_integration_minimal.cpp b/tests/integration/IT_015_input_ui_integration_minimal.cpp index 5cd978c..ff34f48 100644 --- a/tests/integration/IT_015_input_ui_integration_minimal.cpp +++ b/tests/integration/IT_015_input_ui_integration_minimal.cpp @@ -24,15 +24,26 @@ TEST_CASE("IT_015_Minimal: IIO Message Publishing", "[integration][input][ui][mi auto publisher = ioManager.createInstance("publisher"); auto subscriber = ioManager.createInstance("subscriber"); - // Subscribe to input events - subscriber->subscribe("input:mouse:move"); - subscriber->subscribe("input:mouse:button"); - subscriber->subscribe("input:keyboard:key"); - int mouseMoveCount = 0; int mouseButtonCount = 0; int keyboardKeyCount = 0; + // Subscribe to input events with callbacks + subscriber->subscribe("input:mouse:move", [&](const Message& msg) { + mouseMoveCount++; + int x = msg.data->getInt("x", 0); + int y = msg.data->getInt("y", 0); + std::cout << "βœ… Received input:mouse:move (" << x << ", " << y << ")\n"; + }); + subscriber->subscribe("input:mouse:button", [&](const Message& msg) { + mouseButtonCount++; + std::cout << "βœ… Received input:mouse:button\n"; + }); + subscriber->subscribe("input:keyboard:key", [&](const Message& msg) { + keyboardKeyCount++; + std::cout << "βœ… Received input:keyboard:key\n"; + }); + // Publish input events std::cout << "Publishing input events...\n"; @@ -56,24 +67,9 @@ TEST_CASE("IT_015_Minimal: IIO Message Publishing", "[integration][input][ui][mi keyData->setBool("pressed", true); publisher->publish("input:keyboard:key", std::move(keyData)); - // Collect messages + // Dispatch messages to trigger callbacks while (subscriber->hasMessages() > 0) { - auto msg = subscriber->pullMessage(); - - if (msg.topic == "input:mouse:move") { - mouseMoveCount++; - int x = msg.data->getInt("x", 0); - int y = msg.data->getInt("y", 0); - std::cout << "βœ… Received input:mouse:move (" << x << ", " << y << ")\n"; - } - else if (msg.topic == "input:mouse:button") { - mouseButtonCount++; - std::cout << "βœ… Received input:mouse:button\n"; - } - else if (msg.topic == "input:keyboard:key") { - keyboardKeyCount++; - std::cout << "βœ… Received input:keyboard:key\n"; - } + subscriber->pullAndDispatch(); } // Verify diff --git a/tests/integration/test_11_io_system.cpp b/tests/integration/test_11_io_system.cpp index e147be4..b25e6e0 100644 --- a/tests/integration/test_11_io_system.cpp +++ b/tests/integration/test_11_io_system.cpp @@ -225,8 +225,13 @@ int main() { // ======================================================================== std::cout << "=== TEST 1: Basic Publish-Subscribe ===\n"; - // Consumer subscribes to "test:basic" - consumerIO->subscribe("test:basic"); + // Count received messages + int receivedCount = 0; + + // Consumer subscribes to "test:basic" with callback + consumerIO->subscribe("test:basic", [&](const Message& msg) { + receivedCount++; + }); // Publish 100 messages for (int i = 0; i < 100; i++) { @@ -240,11 +245,9 @@ int main() { // Process to allow routing std::this_thread::sleep_for(std::chrono::milliseconds(10)); - // Count received messages - int receivedCount = 0; + // Dispatch messages to trigger callbacks while (consumerIO->hasMessages() > 0) { - auto msg = consumerIO->pullMessage(); - receivedCount++; + consumerIO->pullAndDispatch(); } ASSERT_EQ(receivedCount, 100, "Should receive all 100 messages"); @@ -258,8 +261,15 @@ int main() { // ======================================================================== std::cout << "=== TEST 2: Pattern Matching ===\n"; - // Subscribe to patterns - consumerIO->subscribe("player:.*"); + // Count player messages (should match 3 of 4) + int playerMsgCount = 0; + + // Subscribe to patterns with callback + consumerIO->subscribe("player:.*", [&](const Message& msg) { + if (msg.topic.find("player:") == 0) { + playerMsgCount++; + } + }); // Publish test messages std::vector testTopics = { @@ -276,13 +286,9 @@ int main() { std::this_thread::sleep_for(std::chrono::milliseconds(10)); - // Count player messages (should match 3 of 4) - int playerMsgCount = 0; + // Dispatch messages to trigger callbacks while (consumerIO->hasMessages() > 0) { - auto msg = consumerIO->pullMessage(); - if (msg.topic.find("player:") == 0) { - playerMsgCount++; - } + consumerIO->pullAndDispatch(); } std::cout << " Pattern 'player:.*' matched " << playerMsgCount << " messages\n"; @@ -297,11 +303,25 @@ int main() { std::cout << "=== TEST 3: Multi-Module Routing (1-to-many) ===\n"; std::cout << " Testing for known bug: std::move limitation in routing\n"; - // All modules subscribe to "broadcast:.*" - consumerIO->subscribe("broadcast:.*"); - broadcastIO->subscribe("broadcast:.*"); - batchIO->subscribe("broadcast:.*"); - stressIO->subscribe("broadcast:.*"); + // Track received messages per module + int consumerReceived = 0; + int broadcastReceived = 0; + int batchReceived = 0; + int stressReceived = 0; + + // All modules subscribe to "broadcast:.*" with callbacks + consumerIO->subscribe("broadcast:.*", [&](const Message& msg) { + consumerReceived++; + }); + broadcastIO->subscribe("broadcast:.*", [&](const Message& msg) { + broadcastReceived++; + }); + batchIO->subscribe("broadcast:.*", [&](const Message& msg) { + batchReceived++; + }); + stressIO->subscribe("broadcast:.*", [&](const Message& msg) { + stressReceived++; + }); // Publish 10 broadcast messages for (int i = 0; i < 10; i++) { @@ -311,11 +331,11 @@ int main() { std::this_thread::sleep_for(std::chrono::milliseconds(10)); - // Check which modules received messages - int consumerReceived = consumerIO->hasMessages(); - int broadcastReceived = broadcastIO->hasMessages(); - int batchReceived = batchIO->hasMessages(); - int stressReceived = stressIO->hasMessages(); + // Dispatch messages to all subscribers + while (consumerIO->hasMessages() > 0) consumerIO->pullAndDispatch(); + while (broadcastIO->hasMessages() > 0) broadcastIO->pullAndDispatch(); + while (batchIO->hasMessages() > 0) batchIO->pullAndDispatch(); + while (stressIO->hasMessages() > 0) stressIO->pullAndDispatch(); std::cout << " Broadcast distribution:\n"; std::cout << " ConsumerModule: " << consumerReceived << " messages\n"; @@ -340,21 +360,25 @@ int main() { reporter.addAssertion("multi_module_routing_tested", true); std::cout << "βœ“ TEST 3 COMPLETED (bug documented)\n\n"; - // Clean up for next test - while (consumerIO->hasMessages() > 0) consumerIO->pullMessage(); - while (broadcastIO->hasMessages() > 0) broadcastIO->pullMessage(); - while (batchIO->hasMessages() > 0) batchIO->pullMessage(); - while (stressIO->hasMessages() > 0) stressIO->pullMessage(); + // Clean up for next test (already dispatched, so just clear any remaining) + while (consumerIO->hasMessages() > 0) consumerIO->pullAndDispatch(); + while (broadcastIO->hasMessages() > 0) broadcastIO->pullAndDispatch(); + while (batchIO->hasMessages() > 0) batchIO->pullAndDispatch(); + while (stressIO->hasMessages() > 0) stressIO->pullAndDispatch(); // ======================================================================== // TEST 4: Low-Frequency Subscriptions (Batching) // ======================================================================== std::cout << "=== TEST 4: Low-Frequency Subscriptions ===\n"; + int batchesReceived = 0; + SubscriptionConfig batchConfig; batchConfig.replaceable = true; batchConfig.batchInterval = 1000; // 1 second - batchIO->subscribeLowFreq("batch:.*", batchConfig); + batchIO->subscribeLowFreq("batch:.*", [&](const Message& msg) { + batchesReceived++; + }, batchConfig); std::cout << " Publishing 100 messages over 2 seconds...\n"; int batchPublished = 0; @@ -375,10 +399,8 @@ int main() { // Check batched messages std::this_thread::sleep_for(std::chrono::milliseconds(100)); - int batchesReceived = 0; while (batchIO->hasMessages() > 0) { - auto msg = batchIO->pullMessage(); - batchesReceived++; + batchIO->pullAndDispatch(); } std::cout << " Published: " << batchPublished << " messages over " << batchDuration << "s\n"; @@ -397,7 +419,9 @@ int main() { // ======================================================================== std::cout << "=== TEST 5: Backpressure & Queue Overflow ===\n"; - consumerIO->subscribe("stress:flood"); + consumerIO->subscribe("stress:flood", [](const Message& msg) { + // Just consume the message (counting not needed for this test) + }); std::cout << " Publishing 10000 messages without pulling...\n"; for (int i = 0; i < 10000; i++) { @@ -421,19 +445,21 @@ int main() { std::cout << "βœ“ TEST 5 PASSED\n\n"; // Clean up queue - while (consumerIO->hasMessages() > 0) consumerIO->pullMessage(); + while (consumerIO->hasMessages() > 0) consumerIO->pullAndDispatch(); // ======================================================================== // TEST 6: Thread Safety (Concurrent Pub/Pull) // ======================================================================== std::cout << "=== TEST 6: Thread Safety ===\n"; - consumerIO->subscribe("thread:.*"); - std::atomic publishedTotal{0}; std::atomic receivedTotal{0}; std::atomic running{true}; + consumerIO->subscribe("thread:.*", [&](const Message& msg) { + receivedTotal++; + }); + std::cout << " Launching 5 publisher threads...\n"; std::vector publishers; for (int t = 0; t < 5; t++) { @@ -457,8 +483,7 @@ int main() { while (running || consumerIO->hasMessages() > 0) { if (consumerIO->hasMessages() > 0) { try { - auto msg = consumerIO->pullMessage(); - receivedTotal++; + consumerIO->pullAndDispatch(); } catch (...) { // Expected: may have race conditions } diff --git a/tests/integration/test_13_cross_system.cpp b/tests/integration/test_13_cross_system.cpp index d3a933d..281fbef 100644 --- a/tests/integration/test_13_cross_system.cpp +++ b/tests/integration/test_13_cross_system.cpp @@ -79,11 +79,28 @@ int main() { // Load config tree->loadConfigFile("gameplay.json"); - // Player subscribes to config changes - playerIO->subscribe("config:gameplay:changed"); + // Track config change events + std::atomic configChangedEvents{0}; + + // Player subscribes to config changes with callback + playerIO->subscribe("config:gameplay:changed", [&](const Message& msg) { + configChangedEvents++; + // Read new config from tree + auto configRoot = tree->getConfigRoot(); + auto gameplay = configRoot->getChild("gameplay"); + if (gameplay) { + std::string difficulty = gameplay->getString("difficulty"); + double hpMult = gameplay->getDouble("hpMultiplier"); + + std::cout << " PlayerModule received config change: difficulty=" << difficulty + << ", hpMult=" << hpMult << "\n"; + + ASSERT_EQ(difficulty, "hard", "Difficulty should be updated"); + ASSERT_TRUE(std::abs(hpMult - 1.5) < 0.001, "HP multiplier should be updated"); + } + }); // Setup reload callback for ConfigWatcher - std::atomic configChangedEvents{0}; tree->onTreeReloaded([&]() { std::cout << " β†’ Config reloaded, publishing event...\n"; auto data = std::make_unique("configChange", nlohmann::json{ @@ -111,24 +128,9 @@ int main() { std::this_thread::sleep_for(std::chrono::milliseconds(10)); - // Check if player received message - if (playerIO->hasMessages() > 0) { - auto msg = playerIO->pullMessage(); - configChangedEvents++; - - // Read new config from tree - auto configRoot = tree->getConfigRoot(); - auto gameplay = configRoot->getChild("gameplay"); - if (gameplay) { - std::string difficulty = gameplay->getString("difficulty"); - double hpMult = gameplay->getDouble("hpMultiplier"); - - std::cout << " PlayerModule received config change: difficulty=" << difficulty - << ", hpMult=" << hpMult << "\n"; - - ASSERT_EQ(difficulty, "hard", "Difficulty should be updated"); - ASSERT_TRUE(std::abs(hpMult - 1.5) < 0.001, "HP multiplier should be updated"); - } + // Dispatch player messages (callback handles verification) + while (playerIO->hasMessages() > 0) { + playerIO->pullAndDispatch(); } auto reloadEnd = std::chrono::high_resolution_clock::now(); @@ -166,23 +168,12 @@ int main() { std::cout << " Data saved to disk\n"; - // Economy subscribes to player events FIRST - economyIO->subscribe("player:*"); - - // Then publish level up event - auto levelUpData = std::make_unique("levelUp", nlohmann::json{ - {"event", "level_up"}, - {"newLevel", 6}, - {"goldBonus", 500} - }); - playerIO->publish("player:level_up", std::move(levelUpData)); - - std::this_thread::sleep_for(std::chrono::milliseconds(10)); - - // Economy processes message int messagesReceived = 0; - while (economyIO->hasMessages() > 0) { - auto msg = economyIO->pullMessage(); + int syncErrors = 0; // Will be used in TEST 3 + + // Economy subscribes to player events with callback + // This callback will be reused across TEST 2 and TEST 3 + economyIO->subscribe("player:*", [&](const Message& msg) { messagesReceived++; std::cout << " EconomyModule received: " << msg.topic << "\n"; @@ -195,10 +186,38 @@ int main() { if (profileData) { int gold = profileData->getInt("gold"); std::cout << " Player gold: " << gold << "\n"; - ASSERT_EQ(gold, 1000, "Gold should match saved value"); + + // For TEST 2: verify initial gold + if (msg.topic == "player:level_up") { + ASSERT_EQ(gold, 1000, "Gold should match saved value"); + } + + // For TEST 3: verify synchronization + if (msg.topic == "player:gold:updated") { + int msgGold = msg.data->getInt("gold"); + if (msgGold != gold) { + std::cerr << " SYNC ERROR: msg=" << msgGold << " data=" << gold << "\n"; + syncErrors++; + } + } } } } + }); + + // Then publish level up event + auto levelUpData = std::make_unique("levelUp", nlohmann::json{ + {"event", "level_up"}, + {"newLevel", 6}, + {"goldBonus", 500} + }); + playerIO->publish("player:level_up", std::move(levelUpData)); + + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + + // Dispatch economy messages (callback handles verification) + while (economyIO->hasMessages() > 0) { + economyIO->pullAndDispatch(); } ASSERT_EQ(messagesReceived, 1, "Should receive 1 player event"); @@ -211,7 +230,8 @@ int main() { // ======================================================================== std::cout << "\n=== TEST 3: Multi-Module State Synchronization ===\n"; - int syncErrors = 0; + // Note: syncErrors is already declared earlier and captured by the economyIO callback + // The callback will automatically verify synchronization for "player:gold:updated" messages for (int i = 0; i < 10; i++) { // Update gold in DataNode using read-only access @@ -236,27 +256,9 @@ int main() { std::this_thread::sleep_for(std::chrono::milliseconds(5)); - // Economy verifies synchronization - if (economyIO->hasMessages() > 0) { - auto msg = economyIO->pullMessage(); - int msgGold = msg.data->getInt("gold"); - - // Read from DataNode using read-only access - auto dataRoot = tree->getDataRoot(); - if (dataRoot) { - auto playerCheck = dataRoot->getChildReadOnly("player"); - if (playerCheck) { - auto profileCheck = playerCheck->getChildReadOnly("profile"); - if (profileCheck) { - int dataGold = profileCheck->getInt("gold"); - - if (msgGold != dataGold) { - std::cerr << " SYNC ERROR: msg=" << msgGold << " data=" << dataGold << "\n"; - syncErrors++; - } - } - } - } + // Dispatch economy messages (callback will verify synchronization) + while (economyIO->hasMessages() > 0) { + economyIO->pullAndDispatch(); } } @@ -274,12 +276,16 @@ int main() { auto runtimeRoot = tree->getRuntimeRoot(); - // Subscribe to metrics with low-frequency + int snapshotsReceived = 0; + + // Subscribe to metrics with low-frequency and callback SubscriptionConfig metricsConfig; metricsConfig.replaceable = true; metricsConfig.batchInterval = 1000; // 1 second - playerIO->subscribeLowFreq("metrics:*", metricsConfig); + playerIO->subscribeLowFreq("metrics:*", [&](const Message& msg) { + snapshotsReceived++; + }, metricsConfig); // Publish 20 metrics over 2 seconds for (int i = 0; i < 20; i++) { @@ -295,10 +301,8 @@ int main() { // Check batched messages std::this_thread::sleep_for(std::chrono::milliseconds(200)); - int snapshotsReceived = 0; while (playerIO->hasMessages() > 0) { - playerIO->pullMessage(); - snapshotsReceived++; + playerIO->pullAndDispatch(); } std::cout << "Snapshots received: " << snapshotsReceived << " (expected ~2 due to batching)\n"; diff --git a/tests/integration/test_22_bgfx_sprites_headless.cpp b/tests/integration/test_22_bgfx_sprites_headless.cpp index 21ec99d..0118131 100644 --- a/tests/integration/test_22_bgfx_sprites_headless.cpp +++ b/tests/integration/test_22_bgfx_sprites_headless.cpp @@ -54,8 +54,20 @@ TEST_CASE("IIO sprite message routing between modules", "[bgfx][integration]") { auto gameIO = ioManager.createInstance("test_game_module"); auto rendererIO = ioManager.createInstance("test_renderer_module"); - // Renderer subscribes to render topics - rendererIO->subscribe("render:*"); + int messageCount = 0; + bool firstMessageVerified = false; + + // Renderer subscribes to render topics with callback + rendererIO->subscribe("render:*", [&](const Message& msg) { + messageCount++; + if (messageCount == 1) { + // Verify first message + REQUIRE(msg.topic == "render:sprite"); + REQUIRE(msg.data != nullptr); + REQUIRE_THAT(msg.data->getDouble("x"), WithinAbs(100.0, 0.01)); + firstMessageVerified = true; + } + }); // Game module publishes sprites via IIO for (int i = 0; i < 3; ++i) { @@ -71,14 +83,14 @@ TEST_CASE("IIO sprite message routing between modules", "[bgfx][integration]") { gameIO->publish("render:sprite", std::move(spriteData)); } - // Messages should be routed to renderer - REQUIRE(rendererIO->hasMessages() == 3); + // Dispatch messages to trigger callbacks + while (rendererIO->hasMessages() > 0) { + rendererIO->pullAndDispatch(); + } - // Pull and verify first message - auto msg1 = rendererIO->pullMessage(); - REQUIRE(msg1.topic == "render:sprite"); - REQUIRE(msg1.data != nullptr); - REQUIRE_THAT(msg1.data->getDouble("x"), WithinAbs(100.0, 0.01)); + // Verify we received all 3 messages + REQUIRE(messageCount == 3); + REQUIRE(firstMessageVerified); // Cleanup rendererIO->clearAllMessages(); diff --git a/tests/integration/test_threaded_real_modules.cpp b/tests/integration/test_threaded_real_modules.cpp index 159fdfb..052f51e 100644 --- a/tests/integration/test_threaded_real_modules.cpp +++ b/tests/integration/test_threaded_real_modules.cpp @@ -129,12 +129,34 @@ int main() { std::cout << "\n=== Phase 4: Setup IIO Subscriptions ===\n"; - testIO->subscribe("ui:click"); - testIO->subscribe("ui:action"); - testIO->subscribe("ui:value_changed"); - testIO->subscribe("ui:hover"); - testIO->subscribe("render:sprite"); - testIO->subscribe("render:text"); + int uiClickCount = 0; + int uiActionCount = 0; + int uiValueChangeCount = 0; + int uiHoverCount = 0; + int renderSpriteCount = 0; + int renderTextCount = 0; + + testIO->subscribe("ui:click", [&](const Message& msg) { + uiClickCount++; + }); + testIO->subscribe("ui:action", [&](const Message& msg) { + uiActionCount++; + }); + testIO->subscribe("ui:value_changed", [&](const Message& msg) { + uiValueChangeCount++; + }); + testIO->subscribe("ui:hover", [&](const Message& msg) { + bool enter = msg.data->getBool("enter", false); + if (enter) { + uiHoverCount++; + } + }); + testIO->subscribe("render:sprite", [&](const Message& msg) { + renderSpriteCount++; + }); + testIO->subscribe("render:text", [&](const Message& msg) { + renderTextCount++; + }); std::cout << " βœ“ Subscribed to UI events (click, action, value_changed, hover)\n"; std::cout << " βœ“ Subscribed to render events (sprite, text)\n"; @@ -145,13 +167,6 @@ int main() { std::cout << "\n=== Phase 5: Run Parallel Processing (100 frames) ===\n"; - int uiClickCount = 0; - int uiActionCount = 0; - int uiValueChangeCount = 0; - int uiHoverCount = 0; - int renderSpriteCount = 0; - int renderTextCount = 0; - for (int frame = 0; frame < 100; frame++) { // Simulate mouse input at specific frames if (frame == 10) { @@ -182,41 +197,9 @@ int main() { // Process all modules in parallel system->processModules(1.0f / 60.0f); - // Collect IIO messages from modules + // Dispatch IIO messages from modules (callbacks handle counting) while (testIO->hasMessages() > 0) { - auto msg = testIO->pullMessage(); - - if (msg.topic == "ui:click") { - uiClickCount++; - if (frame < 30) { - std::cout << " Frame " << frame << ": UI click event\n"; - } - } - else if (msg.topic == "ui:action") { - uiActionCount++; - if (frame < 30) { - std::string action = msg.data->getString("action", ""); - std::cout << " Frame " << frame << ": UI action '" << action << "'\n"; - } - } - else if (msg.topic == "ui:value_changed") { - uiValueChangeCount++; - } - else if (msg.topic == "ui:hover") { - bool enter = msg.data->getBool("enter", false); - if (enter) { - uiHoverCount++; - if (frame < 30) { - std::cout << " Frame " << frame << ": UI hover event\n"; - } - } - } - else if (msg.topic == "render:sprite") { - renderSpriteCount++; - } - else if (msg.topic == "render:text") { - renderTextCount++; - } + testIO->pullAndDispatch(); } if ((frame + 1) % 20 == 0) { diff --git a/tests/integration/test_threaded_simple_real.cpp b/tests/integration/test_threaded_simple_real.cpp index 29b77cf..c79e988 100644 --- a/tests/integration/test_threaded_simple_real.cpp +++ b/tests/integration/test_threaded_simple_real.cpp @@ -41,13 +41,10 @@ public: void process(const IDataNode& input) override { processCount++; - // Check for incoming messages + // Pull and auto-dispatch incoming messages if (io && !subscribeTopic.empty()) { while (io->hasMessages() > 0) { - auto msg = io->pullMessage(); - if (msg.topic == subscribeTopic) { - logger->info("{}: Received message on '{}'", name, subscribeTopic); - } + io->pullAndDispatch(); // Callback invoked automatically } } @@ -63,9 +60,11 @@ public: void setConfiguration(const IDataNode& configNode, IIO* ioLayer, ITaskScheduler* scheduler) override { io = ioLayer; - // Subscribe if needed + // Subscribe with callback handler if (io && !subscribeTopic.empty()) { - io->subscribe(subscribeTopic); + io->subscribe(subscribeTopic, [this](const Message& msg) { + logger->info("{}: Received message on '{}'", name, msg.topic); + }); logger->info("{}: Subscribed to '{}'", name, subscribeTopic); } diff --git a/tests/modules/BatchModule.cpp b/tests/modules/BatchModule.cpp index e927263..b95841a 100644 --- a/tests/modules/BatchModule.cpp +++ b/tests/modules/BatchModule.cpp @@ -15,17 +15,10 @@ BatchModule::~BatchModule() { void BatchModule::process(const IDataNode& input) { if (!io) return; - // Pull batched messages (should be low-frequency) + // Pull and dispatch batched messages (callbacks invoked automatically) while (io->hasMessages() > 0) { try { - auto msg = io->pullMessage(); - batchCount++; - - bool verbose = input.getBool("verbose", false); - if (verbose) { - std::cout << "[BatchModule] Received batch #" << batchCount - << " on topic: " << msg.topic << std::endl; - } + io->pullAndDispatch(); } catch (const std::exception& e) { std::cerr << "[BatchModule] Error pulling message: " << e.what() << std::endl; } @@ -39,6 +32,13 @@ void BatchModule::setConfiguration(const IDataNode& configNode, IIO* ioPtr, ITas this->scheduler = schedulerPtr; config = std::make_unique("config", nlohmann::json::object()); + + // Subscribe to all messages with callback that counts batches + if (io) { + io->subscribe("*", [this](const Message& msg) { + batchCount++; + }); + } } const IDataNode& BatchModule::getConfiguration() { diff --git a/tests/modules/BroadcastModule.cpp b/tests/modules/BroadcastModule.cpp index fc25c35..821e988 100644 --- a/tests/modules/BroadcastModule.cpp +++ b/tests/modules/BroadcastModule.cpp @@ -15,17 +15,10 @@ BroadcastModule::~BroadcastModule() { void BroadcastModule::process(const IDataNode& input) { if (!io) return; - // Pull all available messages + // Pull and dispatch all available messages (callbacks invoked automatically) while (io->hasMessages() > 0) { try { - auto msg = io->pullMessage(); - receivedCount++; - - bool verbose = input.getBool("verbose", false); - if (verbose) { - std::cout << "[BroadcastModule] Received message #" << receivedCount - << " on topic: " << msg.topic << std::endl; - } + io->pullAndDispatch(); } catch (const std::exception& e) { std::cerr << "[BroadcastModule] Error pulling message: " << e.what() << std::endl; } @@ -39,6 +32,13 @@ void BroadcastModule::setConfiguration(const IDataNode& configNode, IIO* ioPtr, this->scheduler = schedulerPtr; config = std::make_unique("config", nlohmann::json::object()); + + // Subscribe to all messages with callback that counts them + if (io) { + io->subscribe("*", [this](const Message& msg) { + receivedCount++; + }); + } } const IDataNode& BroadcastModule::getConfiguration() { diff --git a/tests/modules/ConsumerModule.cpp b/tests/modules/ConsumerModule.cpp index d70a1df..e16c5ff 100644 --- a/tests/modules/ConsumerModule.cpp +++ b/tests/modules/ConsumerModule.cpp @@ -15,18 +15,10 @@ ConsumerModule::~ConsumerModule() { void ConsumerModule::process(const IDataNode& input) { if (!io) return; - // Pull all available messages + // Pull and dispatch all available messages (callbacks invoked automatically) while (io->hasMessages() > 0) { try { - auto msg = io->pullMessage(); - receivedCount++; - - // Optionally log message details - bool verbose = input.getBool("verbose", false); - if (verbose) { - std::cout << "[ConsumerModule] Received message #" << receivedCount - << " on topic: " << msg.topic << std::endl; - } + io->pullAndDispatch(); } catch (const std::exception& e) { std::cerr << "[ConsumerModule] Error pulling message: " << e.what() << std::endl; } @@ -41,6 +33,13 @@ void ConsumerModule::setConfiguration(const IDataNode& configNode, IIO* ioPtr, I // Store config config = std::make_unique("config", nlohmann::json::object()); + + // Subscribe to all messages with callback that counts them + if (io) { + io->subscribe("*", [this](const Message& msg) { + receivedCount++; + }); + } } const IDataNode& ConsumerModule::getConfiguration() { diff --git a/tests/modules/EconomyModule.cpp b/tests/modules/EconomyModule.cpp index 3c1d163..5a7bd17 100644 --- a/tests/modules/EconomyModule.cpp +++ b/tests/modules/EconomyModule.cpp @@ -12,11 +12,11 @@ EconomyModule::~EconomyModule() { } void EconomyModule::process(const IDataNode& input) { - // Process incoming messages from IO - if (io && io->hasMessages() > 0) { - auto msg = io->pullMessage(); - playerEventsProcessed++; - handlePlayerEvent(msg.topic, msg.data.get()); + // Pull and dispatch all pending messages (callbacks invoked automatically) + if (io) { + while (io->hasMessages() > 0) { + io->pullAndDispatch(); + } } } @@ -29,9 +29,12 @@ void EconomyModule::setConfiguration(const IDataNode& configNode, IIO* ioPtr, IT // Store config config = std::make_unique("config", nlohmann::json::object()); - // Subscribe to player events + // Subscribe to player events with callback if (io) { - io->subscribe("player:*"); + io->subscribe("player:*", [this](const Message& msg) { + playerEventsProcessed++; + handlePlayerEvent(msg.topic, msg.data.get()); + }); } } diff --git a/tests/modules/IOStressModule.cpp b/tests/modules/IOStressModule.cpp index b307b1e..da57463 100644 --- a/tests/modules/IOStressModule.cpp +++ b/tests/modules/IOStressModule.cpp @@ -15,11 +15,10 @@ IOStressModule::~IOStressModule() { void IOStressModule::process(const IDataNode& input) { if (!io) return; - // Pull all available messages (high-frequency consumer) + // Pull and dispatch all available messages (high-frequency consumer) while (io->hasMessages() > 0) { try { - auto msg = io->pullMessage(); - receivedCount++; + io->pullAndDispatch(); } catch (const std::exception& e) { std::cerr << "[IOStressModule] Error pulling message: " << e.what() << std::endl; } @@ -33,6 +32,13 @@ void IOStressModule::setConfiguration(const IDataNode& configNode, IIO* ioPtr, I this->scheduler = schedulerPtr; config = std::make_unique("config", nlohmann::json::object()); + + // Subscribe to all messages with callback that counts them + if (io) { + io->subscribe("*", [this](const Message& msg) { + receivedCount++; + }); + } } const IDataNode& IOStressModule::getConfiguration() { diff --git a/tests/modules/MetricsModule.cpp b/tests/modules/MetricsModule.cpp index 812f79c..2e5f29c 100644 --- a/tests/modules/MetricsModule.cpp +++ b/tests/modules/MetricsModule.cpp @@ -22,10 +22,11 @@ void MetricsModule::process(const IDataNode& input) { accumulator = 0.0f; } - // Process incoming messages from IO - if (io && io->hasMessages() > 0) { - auto msg = io->pullMessage(); - std::cout << "[MetricsModule] Received: " << msg.topic << std::endl; + // Pull and dispatch all pending messages (callbacks invoked automatically) + if (io) { + while (io->hasMessages() > 0) { + io->pullAndDispatch(); + } } } @@ -38,9 +39,11 @@ void MetricsModule::setConfiguration(const IDataNode& configNode, IIO* ioPtr, IT // Store config config = std::make_unique("config", nlohmann::json::object()); - // Subscribe to economy events + // Subscribe to economy events with callback if (io) { - io->subscribe("economy:*"); + io->subscribe("economy:*", [this](const Message& msg) { + std::cout << "[MetricsModule] Received: " << msg.topic << std::endl; + }); } } diff --git a/tests/modules/PlayerModule.cpp b/tests/modules/PlayerModule.cpp index e467254..78219b6 100644 --- a/tests/modules/PlayerModule.cpp +++ b/tests/modules/PlayerModule.cpp @@ -12,12 +12,10 @@ PlayerModule::~PlayerModule() { } void PlayerModule::process(const IDataNode& input) { - // Process incoming messages from IO - if (io && io->hasMessages() > 0) { - auto msg = io->pullMessage(); - - if (msg.topic.find("config:") == 0) { - handleConfigChange(); + // Pull and dispatch all pending messages (callbacks invoked automatically) + if (io) { + while (io->hasMessages() > 0) { + io->pullAndDispatch(); } } } @@ -31,9 +29,11 @@ void PlayerModule::setConfiguration(const IDataNode& configNode, IIO* ioPtr, ITa // Store config config = std::make_unique("config", nlohmann::json::object()); - // Subscribe to config changes + // Subscribe to config changes with callback if (io) { - io->subscribe("config:gameplay:changed"); + io->subscribe("config:gameplay:changed", [this](const Message& msg) { + handleConfigChange(); + }); } } diff --git a/tests/modules/TestControllerModule.cpp b/tests/modules/TestControllerModule.cpp index 1b86880..0d329b4 100644 --- a/tests/modules/TestControllerModule.cpp +++ b/tests/modules/TestControllerModule.cpp @@ -29,16 +29,39 @@ public: std::cout << "[TestController] Initializing...\n"; - // Subscribe to UI events + // Subscribe to UI events with callbacks if (m_io) { - m_io->subscribe("ui:click"); - m_io->subscribe("ui:action"); - m_io->subscribe("ui:value_changed"); - m_io->subscribe("ui:text_changed"); - m_io->subscribe("ui:text_submit"); - m_io->subscribe("ui:hover"); - m_io->subscribe("ui:focus_gained"); - m_io->subscribe("ui:focus_lost"); + m_io->subscribe("ui:click", [this](const grove::Message& msg) { + handleClick(*msg.data); + }); + + m_io->subscribe("ui:action", [this](const grove::Message& msg) { + handleAction(*msg.data); + }); + + m_io->subscribe("ui:value_changed", [this](const grove::Message& msg) { + handleValueChanged(*msg.data); + }); + + m_io->subscribe("ui:text_changed", [this](const grove::Message& msg) { + handleTextChanged(*msg.data); + }); + + m_io->subscribe("ui:text_submit", [this](const grove::Message& msg) { + handleTextSubmit(*msg.data); + }); + + m_io->subscribe("ui:hover", [this](const grove::Message& msg) { + handleHover(*msg.data); + }); + + m_io->subscribe("ui:focus_gained", [this](const grove::Message& msg) { + handleFocusGained(*msg.data); + }); + + m_io->subscribe("ui:focus_lost", [this](const grove::Message& msg) { + handleFocusLost(*msg.data); + }); } std::cout << "[TestController] Subscribed to UI events\n"; @@ -49,34 +72,9 @@ public: m_frameCount++; - // Process incoming UI events + // Pull and dispatch all pending messages (callbacks invoked automatically) while (m_io->hasMessages() > 0) { - auto msg = m_io->pullMessage(); - - if (msg.topic == "ui:click") { - handleClick(*msg.data); - } - else if (msg.topic == "ui:action") { - handleAction(*msg.data); - } - else if (msg.topic == "ui:value_changed") { - handleValueChanged(*msg.data); - } - else if (msg.topic == "ui:text_changed") { - handleTextChanged(*msg.data); - } - else if (msg.topic == "ui:text_submit") { - handleTextSubmit(*msg.data); - } - else if (msg.topic == "ui:hover") { - handleHover(*msg.data); - } - else if (msg.topic == "ui:focus_gained") { - handleFocusGained(*msg.data); - } - else if (msg.topic == "ui:focus_lost") { - handleFocusLost(*msg.data); - } + m_io->pullAndDispatch(); } // Simulate some game logic diff --git a/tests/visual/test_26_ui_buttons.cpp b/tests/visual/test_26_ui_buttons.cpp index 8cb2a7a..a127159 100644 --- a/tests/visual/test_26_ui_buttons.cpp +++ b/tests/visual/test_26_ui_buttons.cpp @@ -80,10 +80,31 @@ int main(int argc, char* argv[]) { std::cout << "IIO Manager setup complete\n"; - // Subscribe to UI events to see button clicks - uiIO->subscribe("ui:click"); - uiIO->subscribe("ui:hover"); - uiIO->subscribe("ui:action"); + // Subscribe to UI events to see button clicks with callbacks + uiIO->subscribe("ui:click", [](const Message& msg) { + std::string widgetId = msg.data->getString("widgetId", ""); + std::cout << " [UI EVENT] Click: " << widgetId << "\n"; + }); + uiIO->subscribe("ui:hover", [](const Message& msg) { + std::string widgetId = msg.data->getString("widgetId", ""); + bool enter = msg.data->getBool("enter", false); + if (enter && !widgetId.empty()) { + std::cout << " [UI EVENT] Hover: " << widgetId << "\n"; + } + }); + bool running = true; // Will be captured by callback + + uiIO->subscribe("ui:action", [&running](const Message& msg) { + std::string action = msg.data->getString("action", ""); + std::string widgetId = msg.data->getString("widgetId", ""); + std::cout << " [UI EVENT] Action: " << action << " (from " << widgetId << ")\n"; + + // Handle quit action + if (action == "app:quit") { + std::cout << "\nQuit button clicked - exiting!\n"; + running = false; + } + }); // ======================================== // Load BgfxRenderer module @@ -186,7 +207,7 @@ int main(int argc, char* argv[]) { std::cout << "\nMove mouse over buttons and click them!\n"; std::cout << "Press ESC to exit or wait 30 seconds\n\n"; - bool running = true; + // running is already declared above with callbacks uint32_t frameCount = 0; Uint32 startTime = SDL_GetTicks(); const Uint32 testDuration = 30000; // 30 seconds @@ -234,32 +255,9 @@ int main(int argc, char* argv[]) { running = false; } - // Check for UI events + // Dispatch UI events (callbacks handle logging and quit action) while (uiIO->hasMessages() > 0) { - auto msg = uiIO->pullMessage(); - - if (msg.topic == "ui:click") { - std::string widgetId = msg.data->getString("widgetId", ""); - std::cout << " [UI EVENT] Click: " << widgetId << "\n"; - } - else if (msg.topic == "ui:hover") { - std::string widgetId = msg.data->getString("widgetId", ""); - bool enter = msg.data->getBool("enter", false); - if (enter && !widgetId.empty()) { - std::cout << " [UI EVENT] Hover: " << widgetId << "\n"; - } - } - else if (msg.topic == "ui:action") { - std::string action = msg.data->getString("action", ""); - std::string widgetId = msg.data->getString("widgetId", ""); - std::cout << " [UI EVENT] Action: " << action << " (from " << widgetId << ")\n"; - - // Handle quit action - if (action == "app:quit") { - std::cout << "\nQuit button clicked - exiting!\n"; - running = false; - } - } + uiIO->pullAndDispatch(); } // ======================================== diff --git a/tests/visual/test_30_input_module.cpp b/tests/visual/test_30_input_module.cpp index db4caa4..5376a61 100644 --- a/tests/visual/test_30_input_module.cpp +++ b/tests/visual/test_30_input_module.cpp @@ -131,11 +131,72 @@ int main(int argc, char* argv[]) { // Subscribe to input events // ======================================== - testIO->subscribe("input:mouse:move"); - testIO->subscribe("input:mouse:button"); - testIO->subscribe("input:mouse:wheel"); - testIO->subscribe("input:keyboard:key"); - testIO->subscribe("input:keyboard:text"); + // Track last mouse move to avoid spam + int lastMouseX = -1; + int lastMouseY = -1; + + testIO->subscribe("input:mouse:move", [&](const Message& msg) { + int x = msg.data->getInt("x", 0); + int y = msg.data->getInt("y", 0); + + // Only print if position changed (reduce spam) + if (x != lastMouseX || y != lastMouseY) { + std::cout << "[MOUSE MOVE] x=" << std::setw(4) << x + << ", y=" << std::setw(4) << y << "\n"; + lastMouseX = x; + lastMouseY = y; + } + }); + + testIO->subscribe("input:mouse:button", [](const Message& msg) { + int button = msg.data->getInt("button", 0); + bool pressed = msg.data->getBool("pressed", false); + int x = msg.data->getInt("x", 0); + int y = msg.data->getInt("y", 0); + + const char* buttonNames[] = { "LEFT", "MIDDLE", "RIGHT" }; + const char* buttonName = (button >= 0 && button < 3) ? buttonNames[button] : "UNKNOWN"; + + std::cout << "[MOUSE BUTTON] " << buttonName + << " " << (pressed ? "PRESSED" : "RELEASED") + << " at (" << x << ", " << y << ")\n"; + }); + + testIO->subscribe("input:mouse:wheel", [](const Message& msg) { + double delta = msg.data->getDouble("delta", 0.0); + std::cout << "[MOUSE WHEEL] delta=" << delta + << " (" << (delta > 0 ? "UP" : "DOWN") << ")\n"; + }); + + testIO->subscribe("input:keyboard:key", [](const Message& msg) { + int scancode = msg.data->getInt("scancode", 0); + bool pressed = msg.data->getBool("pressed", false); + bool repeat = msg.data->getBool("repeat", false); + bool shift = msg.data->getBool("shift", false); + bool ctrl = msg.data->getBool("ctrl", false); + bool alt = msg.data->getBool("alt", false); + + const char* keyName = SDL_GetScancodeName(static_cast(scancode)); + + std::cout << "[KEYBOARD KEY] " << keyName + << " " << (pressed ? "PRESSED" : "RELEASED"); + + if (repeat) std::cout << " (REPEAT)"; + if (shift || ctrl || alt) { + std::cout << " ["; + if (shift) std::cout << "SHIFT "; + if (ctrl) std::cout << "CTRL "; + if (alt) std::cout << "ALT"; + std::cout << "]"; + } + + std::cout << "\n"; + }); + + testIO->subscribe("input:keyboard:text", [](const Message& msg) { + std::string text = msg.data->getString("text", ""); + std::cout << "[KEYBOARD TEXT] \"" << text << "\"\n"; + }); std::cout << "Subscribed to all input topics\n"; std::cout << "========================================\n\n"; @@ -148,10 +209,6 @@ int main(int argc, char* argv[]) { uint32_t frameCount = 0; uint32_t lastTime = SDL_GetTicks(); - // Track last mouse move to avoid spam - int lastMouseX = -1; - int lastMouseY = -1; - while (running) { frameCount++; @@ -174,68 +231,9 @@ int main(int argc, char* argv[]) { grove::JsonDataNode input("input"); inputModule->process(input); - // 3. Process IIO messages from InputModule + // 3. Dispatch IIO messages from InputModule (callbacks handle printing) while (testIO->hasMessages() > 0) { - auto msg = testIO->pullMessage(); - - if (msg.topic == "input:mouse:move") { - int x = msg.data->getInt("x", 0); - int y = msg.data->getInt("y", 0); - - // Only print if position changed (reduce spam) - if (x != lastMouseX || y != lastMouseY) { - std::cout << "[MOUSE MOVE] x=" << std::setw(4) << x - << ", y=" << std::setw(4) << y << "\n"; - lastMouseX = x; - lastMouseY = y; - } - } - else if (msg.topic == "input:mouse:button") { - int button = msg.data->getInt("button", 0); - bool pressed = msg.data->getBool("pressed", false); - int x = msg.data->getInt("x", 0); - int y = msg.data->getInt("y", 0); - - const char* buttonNames[] = { "LEFT", "MIDDLE", "RIGHT" }; - const char* buttonName = (button >= 0 && button < 3) ? buttonNames[button] : "UNKNOWN"; - - std::cout << "[MOUSE BUTTON] " << buttonName - << " " << (pressed ? "PRESSED" : "RELEASED") - << " at (" << x << ", " << y << ")\n"; - } - else if (msg.topic == "input:mouse:wheel") { - double delta = msg.data->getDouble("delta", 0.0); - std::cout << "[MOUSE WHEEL] delta=" << delta - << " (" << (delta > 0 ? "UP" : "DOWN") << ")\n"; - } - else if (msg.topic == "input:keyboard:key") { - int scancode = msg.data->getInt("scancode", 0); - bool pressed = msg.data->getBool("pressed", false); - bool repeat = msg.data->getBool("repeat", false); - bool shift = msg.data->getBool("shift", false); - bool ctrl = msg.data->getBool("ctrl", false); - bool alt = msg.data->getBool("alt", false); - - const char* keyName = SDL_GetScancodeName(static_cast(scancode)); - - std::cout << "[KEYBOARD KEY] " << keyName - << " " << (pressed ? "PRESSED" : "RELEASED"); - - if (repeat) std::cout << " (REPEAT)"; - if (shift || ctrl || alt) { - std::cout << " ["; - if (shift) std::cout << "SHIFT "; - if (ctrl) std::cout << "CTRL "; - if (alt) std::cout << "ALT"; - std::cout << "]"; - } - - std::cout << "\n"; - } - else if (msg.topic == "input:keyboard:text") { - std::string text = msg.data->getString("text", ""); - std::cout << "[KEYBOARD TEXT] \"" << text << "\"\n"; - } + testIO->pullAndDispatch(); } // 4. Cap at ~60 FPS diff --git a/tests/visual/test_3buttons_minimal.cpp b/tests/visual/test_3buttons_minimal.cpp index 3719432..350e7cb 100644 --- a/tests/visual/test_3buttons_minimal.cpp +++ b/tests/visual/test_3buttons_minimal.cpp @@ -48,9 +48,15 @@ int main(int argc, char* argv[]) { auto uiIO = IntraIOManager::getInstance().createInstance("ui"); auto rendererIO = IntraIOManager::getInstance().createInstance("renderer"); - gameIO->subscribe("ui:hover"); - gameIO->subscribe("ui:click"); - gameIO->subscribe("ui:action"); + gameIO->subscribe("ui:hover", [](const Message& msg) { + // Hover events (not logged to avoid spam) + }); + gameIO->subscribe("ui:click", [&logger](const Message& msg) { + logger->info("πŸ–±οΈ BOUTON CLICKΓ‰!"); + }); + gameIO->subscribe("ui:action", [&logger](const Message& msg) { + logger->info("πŸ–±οΈ ACTION!"); + }); // Initialize BgfxRenderer WITH 3 TEXTURES loaded via config auto renderer = std::make_unique(); @@ -176,12 +182,9 @@ int main(int argc, char* argv[]) { } } - // Check for UI events + // Dispatch UI events (callbacks handle logging) while (gameIO->hasMessages() > 0) { - auto msg = gameIO->pullMessage(); - if (msg.topic == "ui:action") { - logger->info("πŸ–±οΈ BOUTON CLICKΓ‰!"); - } + gameIO->pullAndDispatch(); } // Update modules diff --git a/tests/visual/test_single_button.cpp b/tests/visual/test_single_button.cpp index fd474a0..ccb53fb 100644 --- a/tests/visual/test_single_button.cpp +++ b/tests/visual/test_single_button.cpp @@ -45,10 +45,22 @@ int main(int argc, char* argv[]) { auto uiIO = IntraIOManager::getInstance().createInstance("ui"); auto gameIO = IntraIOManager::getInstance().createInstance("game"); - // Subscribe to UI events for logging - gameIO->subscribe("ui:hover"); - gameIO->subscribe("ui:click"); - gameIO->subscribe("ui:action"); + // Subscribe to UI events for logging with callbacks + gameIO->subscribe("ui:hover", [&logger](const Message& msg) { + std::string widgetId = msg.data->getString("widgetId", ""); + bool enter = msg.data->getBool("enter", false); + logger->info("[UI EVENT] HOVER {} widget '{}'", + enter ? "ENTER" : "LEAVE", widgetId); + }); + gameIO->subscribe("ui:click", [&logger](const Message& msg) { + std::string widgetId = msg.data->getString("widgetId", ""); + logger->info("[UI EVENT] CLICK on widget '{}'", widgetId); + }); + gameIO->subscribe("ui:action", [&logger](const Message& msg) { + std::string action = msg.data->getString("action", ""); + std::string widgetId = msg.data->getString("widgetId", ""); + logger->info("[UI EVENT] ACTION '{}' from widget '{}'", action, widgetId); + }); // Initialize BgfxRenderer auto renderer = std::make_unique(); @@ -176,25 +188,9 @@ int main(int argc, char* argv[]) { } } - // Check for UI events + // Dispatch UI events (callbacks handle logging) while (gameIO->hasMessages() > 0) { - auto msg = gameIO->pullMessage(); - - if (msg.topic == "ui:hover") { - std::string widgetId = msg.data->getString("widgetId", ""); - bool enter = msg.data->getBool("enter", false); - logger->info("[UI EVENT] HOVER {} widget '{}'", - enter ? "ENTER" : "LEAVE", widgetId); - } - else if (msg.topic == "ui:click") { - std::string widgetId = msg.data->getString("widgetId", ""); - logger->info("[UI EVENT] CLICK on widget '{}'", widgetId); - } - else if (msg.topic == "ui:action") { - std::string action = msg.data->getString("action", ""); - std::string widgetId = msg.data->getString("widgetId", ""); - logger->info("[UI EVENT] ACTION '{}' from widget '{}'", action, widgetId); - } + gameIO->pullAndDispatch(); } JsonDataNode input("input");