GroveEngine/docs/plans/PLAN_scenario_11_io_system.md
StillHammer 572e133f4e docs: Consolidate all plans into docs/plans/ directory
2025-11-21 19:32:33 +08:00


Scenario 11: IO System Stress Test

Priority: SHOULD HAVE
Phase: 2 (SHOULD HAVE)
Estimated duration: ~5 minutes
Implementation effort: ~4-6 hours


🎯 Objective

Validate that the IntraIO system (intra-process pub/sub) behaves correctly across all of its use cases:

  • Pattern matching with wildcards and regex
  • Multi-module routing (1-to-1, 1-to-many)
  • Message batching and flushing (low-frequency subscriptions)
  • Backpressure and queue overflow
  • Thread safety (concurrent publish/pull)
  • Health monitoring and metrics
  • Subscription lifecycle

Known bug to confirm: IntraIOManager routes each message only to the first matching subscriber, because the payload is handed over with std::move and there is no clone().
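The mechanism behind this known bug can be illustrated in isolation. The sketch below is not the IntraIOManager code: `DataNode`, `Inbox`, `routeByMove`, and `routeByClone` are hypothetical names chosen for the illustration. It only shows why moving a `std::unique_ptr` into the first inbox leaves nothing for the others, and how a `clone()` method could restore 1-to-many delivery.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <string>
#include <vector>

// Hypothetical stand-in for the engine's data node; only clone() matters here.
struct DataNode {
    std::string payload;

    std::unique_ptr<DataNode> clone() const {
        return std::make_unique<DataNode>(*this);
    }
};

// Hypothetical subscriber inbox.
struct Inbox {
    std::vector<std::unique_ptr<DataNode>> messages;
};

// Move-only fan-out: the first std::move leaves `msg` null,
// so every later inbox is skipped.
inline void routeByMove(std::unique_ptr<DataNode> msg, std::vector<Inbox>& inboxes) {
    for (auto& inbox : inboxes) {
        if (msg) {
            inbox.messages.push_back(std::move(msg));
        }
    }
}

// Fan-out with clone(): every inbox except the last receives a copy,
// and the last one takes ownership of the original.
inline void routeByClone(std::unique_ptr<DataNode> msg, std::vector<Inbox>& inboxes) {
    assert(msg != nullptr);
    for (std::size_t i = 0; i < inboxes.size(); ++i) {
        if (i + 1 == inboxes.size()) {
            inboxes[i].messages.push_back(std::move(msg));
        } else {
            inboxes[i].messages.push_back(msg->clone());
        }
    }
}
```

With three inboxes, `routeByMove` fills only the first one, while `routeByClone` delivers one message to each. Handing the original to the last subscriber saves one copy per published message.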


📋 Description

Initial Setup

  1. Create 5 modules that use IntraIO:

    • ProducerModule - Publishes 1000 msg/s on various topics
    • ConsumerModule - Subscribes to several patterns
    • BroadcastModule - Publishes on topics that have multiple subscribers
    • BatchModule - Uses low-frequency subscriptions
    • StressModule - Stress test at 10k msg/s
  2. Configure IntraIOManager to route between the modules

  3. Run 8 different scenarios over 5 minutes

Test Sequence

Test 1: Basic Publish-Subscribe (30s)

  1. ProducerModule publishes 100 messages on "test:basic"
  2. ConsumerModule subscribes to "test:basic"
  3. Verify:
    • 100 messages received
    • FIFO order preserved
    • No message lost

Test 2: Pattern Matching (30s)

  1. ProducerModule publishes on:
    • "player:001:position"
    • "player:001:health"
    • "player:002:position"
    • "enemy:001:position"
  2. ConsumerModule subscribes to the patterns:
    • "player:*" (should match 3 messages)
    • "player:001:*" (should match 2 messages)
    • "*:position" (should match 3 messages)
  3. Verify that the match counts are correct
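The expected match counts above can be checked with a small standalone matcher. This is a sketch under one assumption: `*` matches any run of characters, including the `:` separator, which is what the counts in this test imply. The helpers `patternToRegex` and `countMatches` are hypothetical and do not claim to reproduce IntraIO's actual matching code.

```cpp
#include <cassert>
#include <regex>
#include <string>
#include <vector>

// Converts a topic pattern into a regex: '*' becomes ".*",
// every other character is matched literally.
// Assumption: '*' may span ':' separators.
inline std::regex patternToRegex(const std::string& pattern) {
    assert(!pattern.empty());
    static const std::string special = "\\^$.|?+()[]{}";
    std::string expr;
    for (char c : pattern) {
        if (c == '*') {
            expr += ".*";
        } else {
            if (special.find(c) != std::string::npos) {
                expr += '\\';  // escape regex metacharacters
            }
            expr += c;
        }
    }
    return std::regex(expr);
}

// Counts how many topics match the pattern in full.
inline int countMatches(const std::string& pattern,
                        const std::vector<std::string>& topics) {
    const std::regex re = patternToRegex(pattern);
    int matches = 0;
    for (const auto& topic : topics) {
        if (std::regex_match(topic, re)) {
            ++matches;
        }
    }
    return matches;
}
```

Applied to the four topics of this test, `countMatches` yields 3 for "player:*", 2 for "player:001:*", and 3 for "*:position". If the real implementation restricts `*` to a single segment, "player:*" would match nothing here, so the wildcard semantics are worth pinning down before the thresholds are fixed.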

Test 3: Multi-Module Routing (60s)

  1. ProducerModule publishes 10 messages on "broadcast:data"
  2. ConsumerModule, BroadcastModule, BatchModule, and StressModule all subscribe to "broadcast:*"
  3. Verify:
    • Expected bug: only the first subscriber receives the messages (clone limitation), i.e. 10 deliveries in total instead of 40
    • Log which module receives them
    • Document the bug for a future fix

Test 4: Message Batching (60s)

  1. BatchModule configures a low-frequency subscription:
    • Pattern: "batch:*"
    • Interval: 1000ms
    • replaceable: true
  2. ProducerModule publishes "batch:metric" at 100 Hz (every 10ms)
  3. Verify:
    • BatchModule receives ~1 message per second (only the latest one)
    • Batching works correctly
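The "replaceable" behavior this test expects can be sketched with an explicit clock, which makes the arithmetic easy to follow. `ReplaceableBatcher` and `simulateTest4` are hypothetical names, and the flush policy shown (deliver the newest pending value once the interval has elapsed) is an assumption about what `replaceable: true` means, not the IntraIO implementation.

```cpp
#include <cassert>
#include <vector>

// Hypothetical replaceable low-frequency buffer driven by explicit timestamps (ms).
class ReplaceableBatcher {
public:
    explicit ReplaceableBatcher(long intervalMs) : intervalMs_(intervalMs) {
        assert(intervalMs > 0);
    }

    // Stores the newest value, overwriting whatever was still pending.
    void push(int value) {
        pending_ = value;
        hasPending_ = true;
    }

    // Delivers the pending value once the interval has elapsed.
    // Returns true if something was flushed.
    bool tick(long nowMs, std::vector<int>& delivered) {
        if (!hasPending_ || nowMs - lastFlushMs_ < intervalMs_) {
            return false;
        }
        delivered.push_back(pending_);
        hasPending_ = false;
        lastFlushMs_ = nowMs;
        return true;
    }

private:
    long intervalMs_;
    long lastFlushMs_ = 0;
    int pending_ = 0;
    bool hasPending_ = false;
};

// Replays the test timeline: one message every 10 ms for 3 s,
// with a tick after each publish.
inline std::vector<int> simulateTest4() {
    ReplaceableBatcher batcher(1000);
    std::vector<int> delivered;
    for (int i = 0; i < 300; ++i) {
        const long nowMs = (i + 1) * 10L;
        batcher.push(i);
        batcher.tick(nowMs, delivered);
    }
    return delivered;
}
```

In this simulation, 300 published values collapse into three deliveries (values 99, 199, and 299), one at each 1000 ms boundary. Real timers drift, which is why the test accepts 2 to 4 batches rather than exactly 3.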

Test 5: Backpressure & Queue Overflow (30s)

  1. ProducerModule publishes 50k messages on "stress:flood"
  2. ConsumerModule subscribes but pulls only 100 msg/s
  3. Verify:
    • Queue overflow detected (health.dropping = true)
    • Dropped messages counted (health.droppedMessageCount > 0)
    • System stays stable (no crash)
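The health fields checked here can be pictured with a minimal bounded queue. Everything below is a sketch: `BoundedQueue` is a hypothetical class, the drop-newest policy and the rule for clearing the `dropping` flag are assumptions, and only the field names are taken from this plan.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>

// Hypothetical health snapshot; field names mirror those used in this plan.
struct Health {
    std::size_t queueSize = 0;
    std::size_t maxQueueSize = 0;
    bool dropping = false;
    std::size_t droppedMessageCount = 0;
};

// Bounded FIFO that rejects new messages when full (assumed drop policy).
class BoundedQueue {
public:
    explicit BoundedQueue(std::size_t capacity) : capacity_(capacity) {
        assert(capacity > 0);
    }

    // Returns false and counts a drop when the queue is already full.
    bool push(int message) {
        if (queue_.size() >= capacity_) {
            ++dropped_;
            dropping_ = true;
            return false;
        }
        queue_.push_back(message);
        return true;
    }

    // Returns false when there is nothing to pull.
    bool pull(int& out) {
        if (queue_.empty()) {
            return false;
        }
        out = queue_.front();
        queue_.pop_front();
        dropping_ = false;  // room is available again
        return true;
    }

    Health health() const {
        Health h;
        h.queueSize = queue_.size();
        h.maxQueueSize = capacity_;
        h.dropping = dropping_;
        h.droppedMessageCount = dropped_;
        return h;
    }

private:
    std::size_t capacity_;
    std::deque<int> queue_;
    bool dropping_ = false;
    std::size_t dropped_ = 0;
};
```

With a capacity of 10000 and 50000 pushes and no pulls, the snapshot reports a full queue, `dropping == true`, and 40000 dropped messages. A drop-oldest policy would give the same counters but keep different messages, so the FIFO expectations of other tests depend on which policy IntraIO uses.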

Test 6: Thread Safety (60s)

  1. Start 10 threads that publish simultaneously (1000 messages each)
  2. Start 5 threads that pull simultaneously
  3. Verify:
    • No crash
    • No data corruption
    • Total messages received = total messages sent (or fewer if the queue overflows)
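One detail matters for the "received = sent" check: with several consumers, testing for messages and then pulling in two separate calls leaves a window in which another thread can take the last message. The sketch below uses a hypothetical `SafeQueue` whose `tryPull` does both steps under one lock; it illustrates the idea and is not the IntraIO API.

```cpp
#include <atomic>
#include <cassert>
#include <deque>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical mutex-guarded queue with an atomic check-and-pop.
class SafeQueue {
public:
    void push(int value) {
        std::lock_guard<std::mutex> lock(mutex_);
        queue_.push_back(value);
    }

    // Checks for a message and removes it under the same lock.
    bool tryPull(int& out) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (queue_.empty()) {
            return false;
        }
        out = queue_.front();
        queue_.pop_front();
        return true;
    }

private:
    std::mutex mutex_;
    std::deque<int> queue_;
};

// Runs publishers and consumers concurrently and returns the number of
// messages the consumers received.
inline int runConcurrentTest(int publishers, int perPublisher, int consumers) {
    assert(publishers > 0 && perPublisher >= 0 && consumers > 0);
    SafeQueue queue;
    std::atomic<int> received{0};
    std::atomic<bool> publishing{true};

    std::vector<std::thread> pubThreads;
    for (int t = 0; t < publishers; ++t) {
        pubThreads.emplace_back([&queue, perPublisher] {
            for (int i = 0; i < perPublisher; ++i) {
                queue.push(i);
            }
        });
    }

    std::vector<std::thread> conThreads;
    for (int t = 0; t < consumers; ++t) {
        conThreads.emplace_back([&queue, &received, &publishing] {
            int value = 0;
            for (;;) {
                // Read the flag before polling so a late push is never missed.
                const bool stillPublishing = publishing.load();
                if (queue.tryPull(value)) {
                    ++received;
                } else if (!stillPublishing) {
                    break;
                } else {
                    std::this_thread::yield();
                }
            }
        });
    }

    for (auto& t : pubThreads) t.join();
    publishing = false;
    for (auto& t : conThreads) t.join();
    return received.load();
}
```

Because this queue is unbounded, `runConcurrentTest(10, 1000, 5)` receives every published message; with a bounded queue the count can be lower, which is the "or fewer if the queue overflows" case above.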

Test 7: Health Monitoring (30s)

  1. ProducerModule publishes at different rates:
    • Phase 1: 100 msg/s (normal)
    • Phase 2: 10k msg/s (overload)
    • Phase 3: 100 msg/s (recovery)
  2. Monitor the health metrics:
    • queueSize grows and shrinks as expected
    • averageProcessingRate reflects the actual rate
    • dropping flag is set and cleared at the right moments

Test 8: Subscription Lifecycle (30s)

  1. Create and destroy subscriptions dynamically
  2. Verify:
    • Messages published after unsubscribe are not received
    • Re-subscribe works
    • No subscription leak in IntraIOManager
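The three checks above map onto a small registry model. `SubscriptionRegistry` is a hypothetical class used only to make the expectations concrete, in particular the "no leak" check, which amounts to the live subscription count returning to its previous value. It assumes handle-based unsubscription and exact topic matching, neither of which is confirmed for IntraIOManager.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>

// Hypothetical registry: each subscribe() returns a handle used to unsubscribe.
class SubscriptionRegistry {
public:
    // Registers a topic and returns the handle of the new subscription.
    int subscribe(const std::string& topic) {
        assert(!topic.empty());
        const int id = nextId_++;
        subscriptions_[id] = topic;
        return id;
    }

    // Removes a subscription; returns false if the handle is unknown or stale.
    bool unsubscribe(int id) {
        return subscriptions_.erase(id) > 0;
    }

    // Number of live subscriptions that would receive a message on `topic`
    // (exact match only, to keep the sketch short).
    int deliveriesFor(const std::string& topic) const {
        int count = 0;
        for (const auto& entry : subscriptions_) {
            if (entry.second == topic) {
                ++count;
            }
        }
        return count;
    }

    // Live subscription count, usable as a leak check.
    std::size_t activeCount() const {
        return subscriptions_.size();
    }

private:
    int nextId_ = 1;
    std::map<int, std::string> subscriptions_;
};
```

In this model, subscribing twice to the same topic without unsubscribing in between yields two deliveries per message. A test that re-subscribes while the first subscription is still live should expect that outcome unless the real implementation deduplicates subscriptions.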

🏗️ Implementation

ProducerModule Structure

// ProducerModule.h
class ProducerModule : public IModule {
public:
    void initialize(std::shared_ptr<IDataNode> config) override;
    void process(float deltaTime) override;
    std::shared_ptr<IDataNode> getState() const override;
    void setState(std::shared_ptr<IDataNode> state) override;
    bool isIdle() const override { return true; }

private:
    std::shared_ptr<IIO> io;
    int messageCount = 0;
    float publishRate = 100.0f; // Hz
    float accumulator = 0.0f;

    void publishTestMessages();
};

ConsumerModule Structure

// ConsumerModule.h
class ConsumerModule : public IModule {
public:
    void initialize(std::shared_ptr<IDataNode> config) override;
    void process(float deltaTime) override;
    std::shared_ptr<IDataNode> getState() const override;
    void setState(std::shared_ptr<IDataNode> state) override;
    bool isIdle() const override { return true; }

    // Test helpers
    int getReceivedCount() const { return static_cast<int>(receivedMessages.size()); }
    const std::vector<IIO::Message>& getMessages() const { return receivedMessages; }

private:
    std::shared_ptr<IIO> io;
    std::vector<IIO::Message> receivedMessages;

    void processIncomingMessages();
};

Main Test

// test_11_io_system.cpp
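#include "helpers/TestMetrics.h"
#include "helpers/TestAssertions.h"
#include "helpers/TestReporter.h"
#include <atomic>
#include <chrono>
#include <iostream>
#include <map>
#include <memory>
#include <string>
#include <thread>
#include <vector>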

int main() {
    TestReporter reporter("IO System Stress Test");
    TestMetrics metrics;

    // === SETUP ===
    DebugEngine engine;

    // Load modules
    engine.loadModule("ProducerModule", "build/modules/libProducerModule.so");
    engine.loadModule("ConsumerModule", "build/modules/libConsumerModule.so");
    engine.loadModule("BroadcastModule", "build/modules/libBroadcastModule.so");
    engine.loadModule("BatchModule", "build/modules/libBatchModule.so");
    engine.loadModule("StressModule", "build/modules/libStressModule.so");

    // Initialize through IOFactory
    auto config = createJsonConfig({
        {"transport", "intra"},
        {"instanceId", "test_engine"}
    });

    engine.initializeModule("ProducerModule", config);
    engine.initializeModule("ConsumerModule", config);
    engine.initializeModule("BroadcastModule", config);
    engine.initializeModule("BatchModule", config);
    engine.initializeModule("StressModule", config);

    // ========================================================================
    // TEST 1: Basic Publish-Subscribe
    // ========================================================================
    std::cout << "\n=== TEST 1: Basic Publish-Subscribe ===\n";

    // ConsumerModule subscribe to "test:basic"
    auto consumerIO = engine.getModuleIO("ConsumerModule");
    consumerIO->subscribe("test:basic", {});

    // ProducerModule publishes 100 messages
    auto producerIO = engine.getModuleIO("ProducerModule");
    for (int i = 0; i < 100; i++) {
        auto data = std::make_unique<JsonDataNode>(nlohmann::json{
            {"id", i},
            {"payload", "test_message_" + std::to_string(i)}
        });
        producerIO->publish("test:basic", std::move(data));
    }

    // Run one engine tick so the messages get routed
    engine.update(1.0f/60.0f);

    // Verify reception
    int receivedCount = 0;
    while (consumerIO->hasMessages() > 0) {
        auto msg = consumerIO->pullMessage();
        receivedCount++;

        // Verify FIFO order: ids must arrive as 0, 1, 2, ...
        auto* jsonData = dynamic_cast<JsonDataNode*>(msg.data.get());
        ASSERT_TRUE(jsonData != nullptr, "Payload should be a JsonDataNode");
        int msgId = jsonData->getJsonData()["id"];
        ASSERT_EQ(msgId, receivedCount - 1, "Messages should be in FIFO order");
    }

    ASSERT_EQ(receivedCount, 100, "Should receive all 100 messages");
    reporter.addAssertion("basic_pubsub", receivedCount == 100);
    std::cout << "✓ TEST 1 PASSED: " << receivedCount << " messages received\n";

    // ========================================================================
    // TEST 2: Pattern Matching
    // ========================================================================
    std::cout << "\n=== TEST 2: Pattern Matching ===\n";

    // Subscribe to different patterns
    consumerIO->subscribe("player:*", {});
    consumerIO->subscribe("player:001:*", {});
    consumerIO->subscribe("*:position", {});

    // Publish test messages
    std::vector<std::string> testTopics = {
        "player:001:position",  // Matches all 3 patterns
        "player:001:health",    // Matches pattern 1 and 2
        "player:002:position",  // Matches pattern 1 and 3
        "enemy:001:position"    // Matches pattern 3 only
    };

    for (const auto& topic : testTopics) {
        auto data = std::make_unique<JsonDataNode>(nlohmann::json{{"topic", topic}});
        producerIO->publish(topic, std::move(data));
    }

    engine.update(1.0f/60.0f);

    // Count messages by pattern
    std::map<std::string, int> patternCounts;
    while (consumerIO->hasMessages() > 0) {
        auto msg = consumerIO->pullMessage();
        auto* jsonData = dynamic_cast<JsonDataNode*>(msg.data.get());
        std::string topic = jsonData->getJsonData()["topic"];
        patternCounts[topic]++;
    }

    // Note: Due to pattern overlap, same message might be received multiple times
    std::cout << "Pattern matching results:\n";
    for (const auto& [topic, count] : patternCounts) {
        std::cout << "  " << topic << ": " << count << " times\n";
    }

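    // Every published topic matches at least one pattern, so all four must show up.
    // Per-topic counts depend on whether overlapping subscriptions deliver duplicates.
    bool allTopicsSeen = patternCounts.size() == testTopics.size();
    ASSERT_TRUE(allTopicsSeen, "Every published topic should match at least one pattern");
    reporter.addAssertion("pattern_matching", allTopicsSeen);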
    std::cout << "✓ TEST 2 PASSED\n";

    // ========================================================================
    // TEST 3: Multi-Module Routing (Bug Detection)
    // ========================================================================
    std::cout << "\n=== TEST 3: Multi-Module Routing (1-to-many) ===\n";

    // All modules subscribe to "broadcast:*"
    consumerIO->subscribe("broadcast:*", {});
    auto broadcastIO = engine.getModuleIO("BroadcastModule");
    broadcastIO->subscribe("broadcast:*", {});
    auto batchIO = engine.getModuleIO("BatchModule");
    batchIO->subscribe("broadcast:*", {});
    auto stressIO = engine.getModuleIO("StressModule");
    stressIO->subscribe("broadcast:*", {});

    // Publish 10 broadcast messages
    for (int i = 0; i < 10; i++) {
        auto data = std::make_unique<JsonDataNode>(nlohmann::json{{"broadcast_id", i}});
        producerIO->publish("broadcast:data", std::move(data));
    }

    engine.update(1.0f/60.0f);

    // Check which modules received messages
    int consumerReceived = consumerIO->hasMessages();
    int broadcastReceived = broadcastIO->hasMessages();
    int batchReceived = batchIO->hasMessages();
    int stressReceived = stressIO->hasMessages();

    std::cout << "Broadcast distribution:\n";
    std::cout << "  ConsumerModule: " << consumerReceived << " messages\n";
    std::cout << "  BroadcastModule: " << broadcastReceived << " messages\n";
    std::cout << "  BatchModule: " << batchReceived << " messages\n";
    std::cout << "  StressModule: " << stressReceived << " messages\n";

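    // Expected: only ONE module receives, because the payload is moved to the first subscriber
    int totalReceived = consumerReceived + broadcastReceived + batchReceived + stressReceived;

    if (totalReceived == 10) {
        std::cout << "⚠️  BUG: Only one module received all messages (clone() not implemented)\n";
        reporter.addMetric("broadcast_bug_present", 1.0f);
    } else if (totalReceived == 40) {
        std::cout << "✓ FIXED: All modules received copies (clone() implemented!)\n";
        reporter.addMetric("broadcast_bug_present", 0.0f);
    } else {
        // Neither the known bug nor the fixed behavior: surface it instead of staying silent
        std::cout << "⚠️  UNEXPECTED: " << totalReceived << " messages in total (expected 10 or 40)\n";
    }

    // Drain all four queues so leftover broadcast messages cannot skew later tests
    for (const auto& io : {consumerIO, broadcastIO, batchIO, stressIO}) {
        while (io->hasMessages() > 0) {
            io->pullMessage();
        }
    }

    reporter.addAssertion("multi_module_routing_tested", true);
    std::cout << "✓ TEST 3 COMPLETED (bug documented)\n";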

    // ========================================================================
    // TEST 4: Message Batching
    // ========================================================================
    std::cout << "\n=== TEST 4: Message Batching (Low-Frequency) ===\n";

    // Configure low-freq subscription
    IIO::SubscriptionConfig batchConfig;
    batchConfig.replaceable = true;
    batchConfig.batchInterval = 1000; // 1 second
    batchIO->subscribeLowFreq("batch:*", batchConfig);

    // Publish at 100 Hz for 3 seconds (300 messages)
    auto batchStart = std::chrono::high_resolution_clock::now();
    int batchedPublished = 0;

    for (int sec = 0; sec < 3; sec++) {
        for (int i = 0; i < 100; i++) {
            auto data = std::make_unique<JsonDataNode>(nlohmann::json{
                {"timestamp", batchedPublished},
                {"value", batchedPublished * 0.1f}
            });
            producerIO->publish("batch:metric", std::move(data));
            batchedPublished++;

            // Simulate 10ms interval
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
            engine.update(1.0f/60.0f);
        }
    }

    auto batchEnd = std::chrono::high_resolution_clock::now();
    float batchDuration = std::chrono::duration<float>(batchEnd - batchStart).count();

    // Check how many batched messages received
    int batchesReceived = 0;
    while (batchIO->hasMessages() > 0) {
        auto msg = batchIO->pullMessage();
        batchesReceived++;
    }

    std::cout << "Published: " << batchedPublished << " messages over " << batchDuration << "s\n";
    std::cout << "Received: " << batchesReceived << " batches\n";
    std::cout << "Expected: ~" << static_cast<int>(batchDuration) << " batches (1/second)\n";

    // Should receive ~3 batches (1 per second)
    ASSERT_TRUE(batchesReceived >= 2 && batchesReceived <= 4,
                "Should receive 2-4 batches for 3 seconds");
    reporter.addMetric("batch_count", batchesReceived);
    reporter.addAssertion("batching_works", batchesReceived >= 2);
    std::cout << "✓ TEST 4 PASSED\n";

    // ========================================================================
    // TEST 5: Backpressure & Queue Overflow
    // ========================================================================
    std::cout << "\n=== TEST 5: Backpressure & Queue Overflow ===\n";

    // Subscribe but don't pull
    consumerIO->subscribe("stress:flood", {});

    // Flood with 50k messages
    std::cout << "Publishing 50000 messages...\n";
    for (int i = 0; i < 50000; i++) {
        auto data = std::make_unique<JsonDataNode>(nlohmann::json{{"flood_id", i}});
        producerIO->publish("stress:flood", std::move(data));

        if (i % 10000 == 0) {
            std::cout << "  " << i << " messages published\n";
        }
    }

    engine.update(1.0f/60.0f);

    // Check health
    auto health = consumerIO->getHealth();
    std::cout << "Health status:\n";
    std::cout << "  Queue size: " << health.queueSize << " / " << health.maxQueueSize << "\n";
    std::cout << "  Dropping: " << (health.dropping ? "YES" : "NO") << "\n";
    std::cout << "  Dropped count: " << health.droppedMessageCount << "\n";
    std::cout << "  Processing rate: " << health.averageProcessingRate << " msg/s\n";

    ASSERT_TRUE(health.queueSize > 0, "Queue should have messages");

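    // With 50k messages and no pulls, the queue is expected to overflow
    bool backpressureDetected = health.dropping || health.droppedMessageCount > 0;
    if (backpressureDetected) {
        std::cout << "✓ Backpressure detected correctly\n";
    }
    // Record the outcome either way so a missing detection shows up as a failed assertion
    reporter.addAssertion("backpressure_detected", backpressureDetected);

    reporter.addMetric("queue_size", health.queueSize);
    reporter.addMetric("dropped_messages", health.droppedMessageCount);

    // Drain the flood so Test 6 only counts its own messages
    while (consumerIO->hasMessages() > 0) {
        consumerIO->pullMessage();
    }
    std::cout << "✓ TEST 5 PASSED\n";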

    // ========================================================================
    // TEST 6: Thread Safety
    // ========================================================================
    std::cout << "\n=== TEST 6: Thread Safety (Concurrent Pub/Pull) ===\n";

    std::atomic<int> publishedTotal{0};
    std::atomic<int> receivedTotal{0};
    std::atomic<bool> running{true};

    consumerIO->subscribe("thread:*", {});

    // 10 publisher threads
    std::vector<std::thread> publishers;
    for (int t = 0; t < 10; t++) {
        publishers.emplace_back([&, t]() {
            for (int i = 0; i < 1000; i++) {
                auto data = std::make_unique<JsonDataNode>(nlohmann::json{
                    {"thread", t},
                    {"id", i}
                });
                producerIO->publish("thread:test", std::move(data));
                publishedTotal++;
            }
        });
    }

    // 5 consumer threads
    std::vector<std::thread> consumers;
    for (int t = 0; t < 5; t++) {
        consumers.emplace_back([&]() {
            while (running || consumerIO->hasMessages() > 0) {
                if (consumerIO->hasMessages() > 0) {
                    try {
                        auto msg = consumerIO->pullMessage();
                        receivedTotal++;
                    } catch (...) {
                        std::cerr << "ERROR: Exception during pull\n";
                    }
                }
                std::this_thread::sleep_for(std::chrono::microseconds(100));
            }
        });
    }

    // Wait for publishers
    for (auto& t : publishers) {
        t.join();
    }

    std::cout << "All publishers done: " << publishedTotal << " messages\n";

    // Let consumers finish
    std::this_thread::sleep_for(std::chrono::milliseconds(500));
    running = false;

    for (auto& t : consumers) {
        t.join();
    }

    std::cout << "All consumers done: " << receivedTotal << " messages\n";

    // May have drops, but should be stable.
    // load() passes plain ints to the helpers instead of the std::atomic objects.
    ASSERT_GT(receivedTotal.load(), 0, "Should receive at least some messages");
    reporter.addMetric("concurrent_published", publishedTotal.load());
    reporter.addMetric("concurrent_received", receivedTotal.load());
    reporter.addAssertion("thread_safety", true); // No crash = success
    std::cout << "✓ TEST 6 PASSED (no crashes)\n";

    // ========================================================================
    // TEST 7: Health Monitoring Accuracy
    // ========================================================================
    std::cout << "\n=== TEST 7: Health Monitoring Accuracy ===\n";

    consumerIO->subscribe("health:*", {});

    // Phase 1: Normal load (100 msg/s)
    std::cout << "Phase 1: Normal load (100 msg/s for 2s)\n";
    for (int i = 0; i < 200; i++) {
        auto data = std::make_unique<JsonDataNode>(nlohmann::json{{"phase", 1}});
        producerIO->publish("health:test", std::move(data));
        std::this_thread::sleep_for(std::chrono::milliseconds(10));

        // Pull to keep queue low
        if (consumerIO->hasMessages() > 0) {
            consumerIO->pullMessage();
        }
    }

    auto healthPhase1 = consumerIO->getHealth();
    std::cout << "  Queue: " << healthPhase1.queueSize << ", Dropping: " << (healthPhase1.dropping ? "YES" : "NO") << "\n";

    // Phase 2: Overload (10k msg/s without pulling)
    std::cout << "Phase 2: Overload (10000 msg/s for 1s)\n";
    for (int i = 0; i < 10000; i++) {
        auto data = std::make_unique<JsonDataNode>(nlohmann::json{{"phase", 2}});
        producerIO->publish("health:test", std::move(data));
    }
    engine.update(1.0f/60.0f);

    auto healthPhase2 = consumerIO->getHealth();
    std::cout << "  Queue: " << healthPhase2.queueSize << ", Dropping: " << (healthPhase2.dropping ? "YES" : "NO") << "\n";

    ASSERT_GT(healthPhase2.queueSize, healthPhase1.queueSize,
              "Queue should grow during overload");

    // Phase 3: Recovery (pull all)
    std::cout << "Phase 3: Recovery (pulling all messages)\n";
    int pulled = 0;
    while (consumerIO->hasMessages() > 0) {
        consumerIO->pullMessage();
        pulled++;
    }

    auto healthPhase3 = consumerIO->getHealth();
    std::cout << "  Pulled: " << pulled << " messages\n";
    std::cout << "  Queue: " << healthPhase3.queueSize << ", Dropping: " << (healthPhase3.dropping ? "YES" : "NO") << "\n";

    ASSERT_EQ(healthPhase3.queueSize, 0, "Queue should be empty after pulling all");
    reporter.addAssertion("health_monitoring", true);
    std::cout << "✓ TEST 7 PASSED\n";

    // ========================================================================
    // TEST 8: Subscription Lifecycle
    // ========================================================================
    std::cout << "\n=== TEST 8: Subscription Lifecycle ===\n";

    // Subscribe
    consumerIO->subscribe("lifecycle:test", {});

    // Publish 10 messages
    for (int i = 0; i < 10; i++) {
        auto data = std::make_unique<JsonDataNode>(nlohmann::json{{"id", i}});
        producerIO->publish("lifecycle:test", std::move(data));
    }
    engine.update(1.0f/60.0f);

    int count1 = 0;
    while (consumerIO->hasMessages() > 0) {
        consumerIO->pullMessage();
        count1++;
    }
    ASSERT_EQ(count1, 10, "Should receive 10 messages");

    // Unsubscribe (if API exists - might not be implemented yet)
    // consumerIO->unsubscribe("lifecycle:test");

    // Publish 10 more
    for (int i = 10; i < 20; i++) {
        auto data = std::make_unique<JsonDataNode>(nlohmann::json{{"id", i}});
        producerIO->publish("lifecycle:test", std::move(data));
    }
    engine.update(1.0f/60.0f);

    // If unsubscribe exists, should receive 0. If not, will receive 10.
    int count2 = 0;
    while (consumerIO->hasMessages() > 0) {
        consumerIO->pullMessage();
        count2++;
    }

    std::cout << "After unsubscribe: " << count2 << " messages (0 if unsubscribe works)\n";

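    // Re-subscribe. Caveat: while the unsubscribe() call above stays commented out,
    // this is a second subscription to the same topic. If IntraIO does not
    // deduplicate subscriptions, count3 may come out as 20 instead of 10.
    consumerIO->subscribe("lifecycle:test", {});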

    // Publish 10 more
    for (int i = 20; i < 30; i++) {
        auto data = std::make_unique<JsonDataNode>(nlohmann::json{{"id", i}});
        producerIO->publish("lifecycle:test", std::move(data));
    }
    engine.update(1.0f/60.0f);

    int count3 = 0;
    while (consumerIO->hasMessages() > 0) {
        consumerIO->pullMessage();
        count3++;
    }
    ASSERT_EQ(count3, 10, "Should receive 10 messages after re-subscribe");

    reporter.addAssertion("subscription_lifecycle", true);
    std::cout << "✓ TEST 8 PASSED\n";

    // ========================================================================
    // FINAL REPORT
    // ========================================================================

    metrics.printReport();
    reporter.printFinalReport();

    return reporter.getExitCode();
}

📊 Collected Metrics

| Metric | Description | Threshold |
|---|---|---|
| basic_pubsub | Messages received in the basic test | 100/100 |
| pattern_matching | Pattern matching works | true |
| broadcast_bug_present | 1-to-1 routing bug detected (1.0) or fixed (0.0) | Documentation only |
| batch_count | Number of batches received | 2-4 |
| queue_size | Queue size during the flood | > 0 |
| dropped_messages | Dropped messages detected | >= 0 |
| concurrent_published | Messages published concurrently | 10000 |
| concurrent_received | Messages received concurrently | > 0 |
| health_monitoring | Health metrics are accurate | true |
| subscription_lifecycle | Subscribe/unsubscribe works | true |

Success Criteria

MUST PASS

  1. Basic pub/sub: 100/100 messages in FIFO order
  2. Pattern matching works (wildcards)
  3. Batching reduces the delivery rate (100 msg/s → ~1 msg/s)
  4. Backpressure detected (dropping flag or dropped count)
  5. Thread safety: no crash under concurrency
  6. Health monitoring reflects the actual state
  7. Re-subscribe works

KNOWN BUGS (Documentation)

  1. ⚠️ Multi-module routing: only the first subscriber receives (no clone())
  2. ⚠️ The unsubscribe API may not exist

NICE TO HAVE

  1. Fix for the clone() bug to enable 1-to-many routing
  2. Unsubscribe API implemented
  3. Compression for batching

🐛 Expected Error Cases

| Error | Cause | Action |
|---|---|---|
| Lost messages | Routing bug | WARN - document |
| Pattern does not match | Incorrect regex | FAIL - fix pattern |
| No batching | Config ignored | FAIL - check SubscriptionConfig |
| No backpressure | Health not updated | FAIL - fix IOHealth |
| Crash under concurrency | Race condition | FAIL - add mutex |
| Incorrect queue size | Buggy counter | FAIL - fix queueSize tracking |

📝 Expected Output

================================================================================
TEST: IO System Stress Test
================================================================================

=== TEST 1: Basic Publish-Subscribe ===
✓ TEST 1 PASSED: 100 messages received

=== TEST 2: Pattern Matching ===
Pattern matching results:
  player:001:position: 3 times
  player:001:health: 2 times
  player:002:position: 2 times
  enemy:001:position: 1 times
✓ TEST 2 PASSED

=== TEST 3: Multi-Module Routing (1-to-many) ===
Broadcast distribution:
  ConsumerModule: 10 messages
  BroadcastModule: 0 messages
  BatchModule: 0 messages
  StressModule: 0 messages
⚠️  BUG: Only one module received all messages (clone() not implemented)
✓ TEST 3 COMPLETED (bug documented)

=== TEST 4: Message Batching (Low-Frequency) ===
Published: 300 messages over 3.02s
Received: 3 batches
Expected: ~3 batches (1/second)
✓ TEST 4 PASSED

=== TEST 5: Backpressure & Queue Overflow ===
Publishing 50000 messages...
  0 messages published
  10000 messages published
  20000 messages published
  30000 messages published
  40000 messages published
Health status:
  Queue size: 10000 / 10000
  Dropping: YES
  Dropped count: 40000
  Processing rate: 0.0 msg/s
✓ Backpressure detected correctly
✓ TEST 5 PASSED

=== TEST 6: Thread Safety (Concurrent Pub/Pull) ===
All publishers done: 10000 messages
All consumers done: 9847 messages
✓ TEST 6 PASSED (no crashes)

=== TEST 7: Health Monitoring Accuracy ===
Phase 1: Normal load (100 msg/s for 2s)
  Queue: 2, Dropping: NO
Phase 2: Overload (10000 msg/s for 1s)
  Queue: 9998, Dropping: YES
Phase 3: Recovery (pulling all messages)
  Pulled: 9998 messages
  Queue: 0, Dropping: NO
✓ TEST 7 PASSED

=== TEST 8: Subscription Lifecycle ===
After unsubscribe: 10 messages (0 if unsubscribe works)
✓ TEST 8 PASSED

================================================================================
METRICS
================================================================================
  Basic pub/sub:         100/100
  Batch count:           3
  Queue size:            10000
  Dropped messages:      40000
  Concurrent published:  10000
  Concurrent received:   9847
  Broadcast bug present: 1.0 (not fixed yet)

================================================================================
ASSERTIONS
================================================================================
  ✓ basic_pubsub
  ✓ pattern_matching
  ✓ multi_module_routing_tested
  ✓ batching_works
  ✓ backpressure_detected
  ✓ thread_safety
  ✓ health_monitoring
  ✓ subscription_lifecycle

Result: ✅ PASSED (8/8 tests)

================================================================================

📅 Planning

Day 1 (3h):

  • Implement ProducerModule, ConsumerModule, BroadcastModule
  • Implement BatchModule, StressModule
  • Set up IOFactory for the tests

Day 2 (3h):

  • Implement test_11_io_system.cpp
  • Tests 1-4 (pub/sub, patterns, routing, batching)

Day 3 (2h):

  • Tests 5-8 (backpressure, threads, health, lifecycle)
  • Debugging and validation

Next step: scenario_12_datanode.md