#pragma once #include #include #include #include #include #include #include #include #include #include #include namespace aissia { using json = nlohmann::json; /** * @brief Synchronous request/response bridge over IIO pub/sub * * Allows tools to make blocking requests to modules and wait for responses. * Each request gets a unique correlation ID to match responses. * * Usage: * auto response = bridge.request("scheduler:query", {"action": "get_current_task"}, 1000); */ class IOBridge { public: explicit IOBridge(grove::IIO* io) : m_io(io) { m_logger = spdlog::get("IOBridge"); if (!m_logger) { m_logger = spdlog::stdout_color_mt("IOBridge"); } } /** * @brief Send a request and wait for response * * @param topic Topic to publish request to * @param request Request data (will add correlation_id) * @param timeoutMs Timeout in milliseconds * @return Response data or error JSON */ json request(const std::string& topic, const json& request, int timeoutMs = 5000) { std::string correlationId = generateCorrelationId(); // Create response promise { std::lock_guard lock(m_mutex); m_pendingRequests[correlationId] = std::make_shared(); } // Build request with correlation ID auto requestNode = std::make_unique("request"); requestNode->setString("correlation_id", correlationId); // Copy all fields from input request for (auto& [key, value] : request.items()) { if (value.is_string()) { requestNode->setString(key, value.get()); } else if (value.is_number_integer()) { requestNode->setInt(key, value.get()); } else if (value.is_number_float()) { requestNode->setDouble(key, value.get()); } else if (value.is_boolean()) { requestNode->setBool(key, value.get()); } } // Publish request m_io->publish(topic, std::move(requestNode)); m_logger->debug("Request sent to {} with correlation_id={}", topic, correlationId); // Wait for response auto pending = m_pendingRequests[correlationId]; std::unique_lock lock(pending->mutex); bool received = pending->cv.wait_for(lock, std::chrono::milliseconds(timeoutMs), [&] { return pending->hasResponse; }); // Cleanup { std::lock_guard globalLock(m_mutex); m_pendingRequests.erase(correlationId); } if (!received) { m_logger->warn("Request to {} timed out after {}ms", topic, timeoutMs); return {{"error", "timeout"}, {"message", "Request timed out"}}; } return pending->response; } /** * @brief Process incoming response messages * * Call this from the service's process loop to handle responses * that come back from modules. * * @param topic The topic the message was received on * @param data The message data */ void handleResponse(const std::string& topic, const grove::IDataNode& data) { std::string correlationId = data.getString("correlation_id", ""); if (correlationId.empty()) { return; // Not a response to our request } std::shared_ptr pending; { std::lock_guard lock(m_mutex); auto it = m_pendingRequests.find(correlationId); if (it == m_pendingRequests.end()) { m_logger->debug("Received response for unknown correlation_id={}", correlationId); return; } pending = it->second; } // Convert IDataNode to JSON json response; // Extract common fields - this is a simplified conversion // For complex nested data, we'd need to traverse the tree auto* jsonNode = dynamic_cast(&data); if (jsonNode) { response = jsonNode->getJsonData(); } else { // Fallback: extract known field types response["correlation_id"] = correlationId; // Add more fields as needed based on what modules return } // Signal the waiting thread { std::lock_guard lock(pending->mutex); pending->response = response; pending->hasResponse = true; } pending->cv.notify_one(); m_logger->debug("Response received for correlation_id={}", correlationId); } /** * @brief Subscribe to response topics * * Call this during initialization to listen for responses */ void subscribeToResponses() { if (!m_io) return; grove::SubscriptionConfig config; // Subscribe to all response topics m_io->subscribe("scheduler:response", config); m_io->subscribe("monitoring:response", config); m_io->subscribe("storage:response", config); m_io->subscribe("voice:response", config); m_io->subscribe("tool:response", config); } private: struct PendingRequest { std::mutex mutex; std::condition_variable cv; json response; bool hasResponse = false; }; grove::IIO* m_io; std::shared_ptr m_logger; std::mutex m_mutex; std::unordered_map> m_pendingRequests; std::atomic m_requestCounter{0}; std::string generateCorrelationId() { auto now = std::chrono::steady_clock::now().time_since_epoch().count(); return "req_" + std::to_string(now) + "_" + std::to_string(m_requestCounter++); } }; } // namespace aissia