# Scenario 11: IO System Stress Test

**Priority**: ⭐⭐ SHOULD HAVE
**Phase**: 2 (SHOULD HAVE)
**Estimated duration**: ~5 minutes
**Implementation effort**: ~4-6 hours

---

## 🎯 Objective

Validate that the IntraIO system (intra-process pub/sub) works correctly across all its use cases:

- Pattern matching with wildcards and regex
- Multi-module routing (1-to-1, 1-to-many)
- Message batching and flushing (low-frequency subscriptions)
- Backpressure and queue overflow
- Thread safety (concurrent publish/pull)
- Health monitoring and metrics
- Subscription lifecycle

**Known bug to validate**: IntraIOManager routes only to the first subscriber (std::move without clone limitation). A minimal illustration of this limitation follows.

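The sketch below is illustrative only: `Payload` and `SubscriberQueue` stand in for the real `IDataNode` and subscriber types, and the matching logic is simplified. It shows why moving the payload into the first matching queue starves every other subscriber, and how a `clone()`-based fan-out could fix it.

```cpp
#include <iostream>
#include <memory>
#include <string>
#include <vector>

// Illustrative sketch only: 'Payload' and 'SubscriberQueue' stand in for the real
// IDataNode / subscriber types; this is not the actual IntraIOManager code.
struct Payload {
    std::string text;
    std::unique_ptr<Payload> clone() const { return std::make_unique<Payload>(*this); }
};

struct SubscriberQueue {
    std::string pattern;                                      // e.g. "broadcast:*"
    std::vector<std::unique_ptr<Payload>> inbox;
    bool matches(const std::string&) const { return true; }   // matching simplified away
    void push(std::unique_ptr<Payload> p) { inbox.push_back(std::move(p)); }
};

// Current behaviour: the payload is moved into the FIRST matching queue, so the
// remaining subscribers receive nothing -- the 1-to-many bug this test documents.
void routeMessage(const std::string& topic, std::unique_ptr<Payload> data,
                  std::vector<SubscriberQueue*>& subscribers) {
    for (auto* sub : subscribers) {
        if (sub->matches(topic)) {
            sub->push(std::move(data));
            break;                             // 'data' is now null; cannot be delivered again
        }
    }
    // Possible fix: give every matching subscriber its own deep copy instead.
    // for (auto* sub : subscribers)
    //     if (sub->matches(topic)) sub->push(data->clone());
}

int main() {
    SubscriberQueue a{"broadcast:*"}, b{"broadcast:*"};
    std::vector<SubscriberQueue*> subs{&a, &b};
    routeMessage("broadcast:data", std::make_unique<Payload>(Payload{"hello"}), subs);
    std::cout << "a=" << a.inbox.size() << " b=" << b.inbox.size() << "\n";  // prints: a=1 b=0
}
```
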
---

## 📋 Description

### Initial Setup

1. Create 5 modules with IntraIO:
   - **ProducerModule** - Publishes 1000 msg/s on various topics
   - **ConsumerModule** - Subscribes to several patterns
   - **BroadcastModule** - Publishes on topics with multiple subscribers
   - **BatchModule** - Uses low-frequency subscriptions
   - **StressModule** - Stress test at 10k msg/s

2. Configure IntraIOManager with routing between modules

3. Run 8 different test scenarios over 5 minutes

### Test Sequence

#### Test 1: Basic Publish-Subscribe (30s)

1. ProducerModule publishes 100 messages on "test:basic"
2. ConsumerModule subscribes to "test:basic"
3. Verify:
   - 100 messages received
   - FIFO order preserved
   - No message lost

#### Test 2: Pattern Matching (30s)

1. ProducerModule publishes on:
   - "player:001:position"
   - "player:001:health"
   - "player:002:position"
   - "enemy:001:position"
2. ConsumerModule subscribes to the patterns:
   - "player:*" (should match 3 messages)
   - "player:001:*" (should match 2 messages)
   - "*:position" (should match 3 messages)
3. Verify that the match counts are correct (a minimal wildcard-matching sketch follows this list)

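For reference, here is a minimal sketch of how such wildcard patterns can be matched, assuming `*` matches any run of characters (including `:`) and every other character is literal. This is an illustration of the matching rule the counts above rely on, not the actual IntraIO matcher.

```cpp
#include <iostream>
#include <regex>
#include <string>

// Minimal sketch: translate a topic pattern into a regex where '*' -> ".*" and
// regex metacharacters are escaped. Not the engine's real implementation.
bool topicMatches(const std::string& pattern, const std::string& topic) {
    static const std::string specials = R"(\^$.|?+()[]{})";
    std::string rx;
    for (char c : pattern) {
        if (c == '*')                                       rx += ".*";            // wildcard
        else if (specials.find(c) != std::string::npos)   { rx += '\\'; rx += c; } // escape metachars
        else                                                rx += c;               // literal (':' etc.)
    }
    return std::regex_match(topic, std::regex(rx));
}

int main() {
    std::cout << topicMatches("player:*", "player:001:health") << "\n";       // 1
    std::cout << topicMatches("*:position", "enemy:001:position") << "\n";    // 1
    std::cout << topicMatches("player:001:*", "player:002:position") << "\n"; // 0
}
```
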
#### Test 3: Multi-Module Routing (60s)

1. ProducerModule publishes "broadcast:data" (100 messages)
2. ConsumerModule, BatchModule and StressModule all subscribe to "broadcast:*"
3. Verify:
   - **Expected bug**: only the first subscriber receives the messages (clone limitation)
   - Log which module receives them
   - Document the bug for a future fix

#### Test 4: Message Batching (60s)

1. BatchModule configures a low-frequency subscription:
   - Pattern: "batch:*"
   - Interval: 1000ms
   - replaceable: true
2. ProducerModule publishes "batch:metric" at 100 Hz (every 10ms)
3. Verify:
   - BatchModule receives ~1 message per second (only the latest one)
   - Batching works correctly (see the sketch after this list)

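To make the expectation concrete, here is a minimal sketch of the "replaceable" low-frequency idea: every publish overwrites the pending value, and the subscriber sees at most one message per interval. This is an assumption about the mechanism, not the real IntraIO implementation.

```cpp
#include <chrono>
#include <optional>
#include <utility>

// Sketch of a 'replaceable' low-frequency slot: publishes at any rate collapse
// to at most one delivered value per interval (only the latest survives).
template <typename Payload>
class ReplaceableSlot {
public:
    explicit ReplaceableSlot(std::chrono::milliseconds interval)
        : interval_(interval), lastFlush_(std::chrono::steady_clock::now()) {}

    // Each publish overwrites the pending value.
    void publish(Payload value) { pending_ = std::move(value); }

    // Called from the subscriber's pull loop; emits at most once per interval.
    std::optional<Payload> poll() {
        auto now = std::chrono::steady_clock::now();
        if (pending_ && now - lastFlush_ >= interval_) {
            lastFlush_ = now;
            auto out = std::move(pending_);
            pending_.reset();
            return out;
        }
        return std::nullopt;
    }

private:
    std::chrono::milliseconds interval_;
    std::chrono::steady_clock::time_point lastFlush_;
    std::optional<Payload> pending_;
};
```

With a 1000 ms interval and a 100 Hz publisher, this yields roughly one delivered message per second, which is why the test expects 2-4 batches over 3 seconds.
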
#### Test 5: Backpressure & Queue Overflow (30s)

1. ProducerModule publishes 50k messages on "stress:flood"
2. ConsumerModule subscribes but pulls only 100 msg/s
3. Verify (the health fields used here are sketched after this list):
   - Queue overflow is detected (health.dropping = true)
   - Dropped messages are counted (health.droppedMessageCount > 0)
   - The system stays stable (no crash)

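The assertions in this scenario read several health fields; the sketch below shows the snapshot shape they assume. Field names are taken from the checks in this document, not from the real IOHealth header, which may differ.

```cpp
#include <cstddef>

// Assumed shape of the health snapshot used by this scenario's assertions.
struct IOHealthSnapshot {
    std::size_t queueSize = 0;               // messages currently waiting to be pulled
    std::size_t maxQueueSize = 0;            // capacity before new messages are dropped
    bool        dropping = false;            // true while the queue is overflowing
    std::size_t droppedMessageCount = 0;     // total messages discarded so far
    float       averageProcessingRate = 0.f; // consumer throughput, in msg/s
};
```
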
#### Test 6: Thread Safety (60s)

1. Start 10 threads publishing simultaneously (1000 msg each)
2. Start 5 threads pulling simultaneously
3. Verify:
   - No crash
   - No data corruption
   - Total messages received = total sent (or fewer if overflow occurred)

#### Test 7: Health Monitoring (30s)

1. ProducerModule publishes at different rates:
   - Phase 1: 100 msg/s (normal)
   - Phase 2: 10k msg/s (overload)
   - Phase 3: 100 msg/s (recovery)
2. Monitor the health metrics:
   - queueSize grows/shrinks as expected
   - averageProcessingRate reflects reality
   - the dropping flag is raised/cleared at the right time

#### Test 8: Subscription Lifecycle (30s)

1. Create/destroy subscriptions dynamically
2. Verify:
   - Messages published after unsubscribe are not received
   - Re-subscribe works
   - No subscription leak in IntraIOManager

---

## 🏗️ Implementation

### ProducerModule Structure

```cpp
// ProducerModule.h
class ProducerModule : public IModule {
public:
    void initialize(std::shared_ptr<IDataNode> config) override;
    void process(float deltaTime) override;
    std::shared_ptr<IDataNode> getState() const override;
    void setState(std::shared_ptr<IDataNode> state) override;
    bool isIdle() const override { return true; }

private:
    std::shared_ptr<IIO> io;
    int messageCount = 0;
    float publishRate = 100.0f; // Hz
    float accumulator = 0.0f;

    void publishTestMessages();
};
```

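A plausible sketch of the rate-limited publish loop implied by `publishRate` and `accumulator`. The payload shape and the "test:basic" topic are illustrative only; `io->publish` and `JsonDataNode` are used the same way as in the main test below.

```cpp
// ProducerModule.cpp (sketch) -- one plausible way to honour publishRate with the
// accumulator; the actual implementation may differ.
void ProducerModule::process(float deltaTime) {
    accumulator += deltaTime;
    const float interval = 1.0f / publishRate;   // seconds between two messages
    while (accumulator >= interval) {
        accumulator -= interval;
        publishTestMessages();
    }
}

void ProducerModule::publishTestMessages() {
    auto data = std::make_unique<JsonDataNode>(nlohmann::json{
        {"id", messageCount},
        {"payload", "test_message_" + std::to_string(messageCount)}
    });
    io->publish("test:basic", std::move(data));  // illustrative topic
    ++messageCount;
}
```
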
### ConsumerModule Structure

```cpp
// ConsumerModule.h
class ConsumerModule : public IModule {
public:
    void initialize(std::shared_ptr<IDataNode> config) override;
    void process(float deltaTime) override;
    std::shared_ptr<IDataNode> getState() const override;
    void setState(std::shared_ptr<IDataNode> state) override;
    bool isIdle() const override { return true; }

    // Test helpers
    int getReceivedCount() const { return receivedMessages.size(); }
    const std::vector<IIO::Message>& getMessages() const { return receivedMessages; }

private:
    std::shared_ptr<IIO> io;
    std::vector<IIO::Message> receivedMessages;

    void processIncomingMessages();
};
```

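A minimal sketch of `processIncomingMessages()`, assuming the same `hasMessages()` / `pullMessage()` API the main test uses; the real module may do more per message.

```cpp
// ConsumerModule.cpp (sketch) -- drains the IO queue into receivedMessages so the
// test helpers can inspect them; assumes the IIO API as used in the main test.
void ConsumerModule::processIncomingMessages() {
    while (io->hasMessages() > 0) {
        receivedMessages.push_back(io->pullMessage());
    }
}

void ConsumerModule::process(float /*deltaTime*/) {
    processIncomingMessages();
}
```
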
### Main Test

```cpp
// test_11_io_system.cpp
#include "helpers/TestMetrics.h"
#include "helpers/TestAssertions.h"
#include "helpers/TestReporter.h"
#include <atomic>
#include <chrono>
#include <iostream>
#include <map>
#include <memory>
#include <string>
#include <thread>
#include <vector>

int main() {
    TestReporter reporter("IO System Stress Test");
    TestMetrics metrics;

    // === SETUP ===
    DebugEngine engine;

    // Load modules
    engine.loadModule("ProducerModule", "build/modules/libProducerModule.so");
    engine.loadModule("ConsumerModule", "build/modules/libConsumerModule.so");
    engine.loadModule("BroadcastModule", "build/modules/libBroadcastModule.so");
    engine.loadModule("BatchModule", "build/modules/libBatchModule.so");
    engine.loadModule("StressModule", "build/modules/libStressModule.so");

    // Initialize with IOFactory
    auto config = createJsonConfig({
        {"transport", "intra"},
        {"instanceId", "test_engine"}
    });

    engine.initializeModule("ProducerModule", config);
    engine.initializeModule("ConsumerModule", config);
    engine.initializeModule("BroadcastModule", config);
    engine.initializeModule("BatchModule", config);
    engine.initializeModule("StressModule", config);

    // ========================================================================
    // TEST 1: Basic Publish-Subscribe
    // ========================================================================
    std::cout << "\n=== TEST 1: Basic Publish-Subscribe ===\n";

    // ConsumerModule subscribes to "test:basic"
    auto consumerIO = engine.getModuleIO("ConsumerModule");
    consumerIO->subscribe("test:basic", {});

    // ProducerModule publishes 100 messages
    auto producerIO = engine.getModuleIO("ProducerModule");
    for (int i = 0; i < 100; i++) {
        auto data = std::make_unique<JsonDataNode>(nlohmann::json{
            {"id", i},
            {"payload", "test_message_" + std::to_string(i)}
        });
        producerIO->publish("test:basic", std::move(data));
    }

    // Process to let routing happen
    engine.update(1.0f/60.0f);

    // Verify reception
    int receivedCount = 0;
    while (consumerIO->hasMessages() > 0) {
        auto msg = consumerIO->pullMessage();
        receivedCount++;

        // Verify FIFO order
        auto* jsonData = dynamic_cast<JsonDataNode*>(msg.data.get());
        int msgId = jsonData->getJsonData()["id"];
        ASSERT_EQ(msgId, receivedCount - 1, "Messages should be in FIFO order");
    }

    ASSERT_EQ(receivedCount, 100, "Should receive all 100 messages");
    reporter.addAssertion("basic_pubsub", receivedCount == 100);
    std::cout << "✓ TEST 1 PASSED: " << receivedCount << " messages received\n";

    // ========================================================================
    // TEST 2: Pattern Matching
    // ========================================================================
    std::cout << "\n=== TEST 2: Pattern Matching ===\n";

    // Subscribe to different patterns
    consumerIO->subscribe("player:*", {});
    consumerIO->subscribe("player:001:*", {});
    consumerIO->subscribe("*:position", {});

    // Publish test messages
    std::vector<std::string> testTopics = {
        "player:001:position",  // Matches all 3 patterns
        "player:001:health",    // Matches pattern 1 and 2
        "player:002:position",  // Matches pattern 1 and 3
        "enemy:001:position"    // Matches pattern 3 only
    };

    for (const auto& topic : testTopics) {
        auto data = std::make_unique<JsonDataNode>(nlohmann::json{{"topic", topic}});
        producerIO->publish(topic, std::move(data));
    }

    engine.update(1.0f/60.0f);

    // Count messages by pattern
    std::map<std::string, int> patternCounts;
    while (consumerIO->hasMessages() > 0) {
        auto msg = consumerIO->pullMessage();
        auto* jsonData = dynamic_cast<JsonDataNode*>(msg.data.get());
        std::string topic = jsonData->getJsonData()["topic"];
        patternCounts[topic]++;
    }

    // Note: Due to pattern overlap, same message might be received multiple times
    std::cout << "Pattern matching results:\n";
    for (const auto& [topic, count] : patternCounts) {
        std::cout << "  " << topic << ": " << count << " times\n";
    }

    reporter.addAssertion("pattern_matching", true);
    std::cout << "✓ TEST 2 PASSED\n";

    // ========================================================================
    // TEST 3: Multi-Module Routing (Bug Detection)
    // ========================================================================
    std::cout << "\n=== TEST 3: Multi-Module Routing (1-to-many) ===\n";

    // All modules subscribe to "broadcast:*"
    consumerIO->subscribe("broadcast:*", {});
    auto broadcastIO = engine.getModuleIO("BroadcastModule");
    broadcastIO->subscribe("broadcast:*", {});
    auto batchIO = engine.getModuleIO("BatchModule");
    batchIO->subscribe("broadcast:*", {});
    auto stressIO = engine.getModuleIO("StressModule");
    stressIO->subscribe("broadcast:*", {});

    // Publish 10 broadcast messages
    for (int i = 0; i < 10; i++) {
        auto data = std::make_unique<JsonDataNode>(nlohmann::json{{"broadcast_id", i}});
        producerIO->publish("broadcast:data", std::move(data));
    }

    engine.update(1.0f/60.0f);

    // Check which modules received messages
    int consumerReceived = consumerIO->hasMessages();
    int broadcastReceived = broadcastIO->hasMessages();
    int batchReceived = batchIO->hasMessages();
    int stressReceived = stressIO->hasMessages();

    std::cout << "Broadcast distribution:\n";
    std::cout << "  ConsumerModule: " << consumerReceived << " messages\n";
    std::cout << "  BroadcastModule: " << broadcastReceived << " messages\n";
    std::cout << "  BatchModule: " << batchReceived << " messages\n";
    std::cout << "  StressModule: " << stressReceived << " messages\n";

    // Expected: Only ONE module receives due to std::move limitation
    int totalReceived = consumerReceived + broadcastReceived + batchReceived + stressReceived;

    if (totalReceived == 10) {
        std::cout << "⚠️ BUG: Only one module received all messages (clone() not implemented)\n";
        reporter.addMetric("broadcast_bug_present", 1.0f);
    } else if (totalReceived == 40) {
        std::cout << "✓ FIXED: All modules received copies (clone() implemented!)\n";
        reporter.addMetric("broadcast_bug_present", 0.0f);
    }

    reporter.addAssertion("multi_module_routing_tested", true);
    std::cout << "✓ TEST 3 COMPLETED (bug documented)\n";

    // ========================================================================
    // TEST 4: Message Batching
    // ========================================================================
    std::cout << "\n=== TEST 4: Message Batching (Low-Frequency) ===\n";

    // Configure low-freq subscription
    IIO::SubscriptionConfig batchConfig;
    batchConfig.replaceable = true;
    batchConfig.batchInterval = 1000; // 1 second
    batchIO->subscribeLowFreq("batch:*", batchConfig);

    // Publish at 100 Hz for 3 seconds (300 messages)
    auto batchStart = std::chrono::high_resolution_clock::now();
    int batchedPublished = 0;

    for (int sec = 0; sec < 3; sec++) {
        for (int i = 0; i < 100; i++) {
            auto data = std::make_unique<JsonDataNode>(nlohmann::json{
                {"timestamp", batchedPublished},
                {"value", batchedPublished * 0.1f}
            });
            producerIO->publish("batch:metric", std::move(data));
            batchedPublished++;

            // Simulate 10ms interval
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
            engine.update(1.0f/60.0f);
        }
    }

    auto batchEnd = std::chrono::high_resolution_clock::now();
    float batchDuration = std::chrono::duration<float>(batchEnd - batchStart).count();

    // Check how many batched messages were received
    int batchesReceived = 0;
    while (batchIO->hasMessages() > 0) {
        auto msg = batchIO->pullMessage();
        batchesReceived++;
    }

    std::cout << "Published: " << batchedPublished << " messages over " << batchDuration << "s\n";
    std::cout << "Received: " << batchesReceived << " batches\n";
    std::cout << "Expected: ~" << static_cast<int>(batchDuration) << " batches (1/second)\n";

    // Should receive ~3 batches (1 per second)
    ASSERT_TRUE(batchesReceived >= 2 && batchesReceived <= 4,
                "Should receive 2-4 batches for 3 seconds");
    reporter.addMetric("batch_count", batchesReceived);
    reporter.addAssertion("batching_works", batchesReceived >= 2);
    std::cout << "✓ TEST 4 PASSED\n";

    // ========================================================================
    // TEST 5: Backpressure & Queue Overflow
    // ========================================================================
    std::cout << "\n=== TEST 5: Backpressure & Queue Overflow ===\n";

    // Subscribe but don't pull
    consumerIO->subscribe("stress:flood", {});

    // Flood with 50k messages
    std::cout << "Publishing 50000 messages...\n";
    for (int i = 0; i < 50000; i++) {
        auto data = std::make_unique<JsonDataNode>(nlohmann::json{{"flood_id", i}});
        producerIO->publish("stress:flood", std::move(data));

        if (i % 10000 == 0) {
            std::cout << "  " << i << " messages published\n";
        }
    }

    engine.update(1.0f/60.0f);

    // Check health
    auto health = consumerIO->getHealth();
    std::cout << "Health status:\n";
    std::cout << "  Queue size: " << health.queueSize << " / " << health.maxQueueSize << "\n";
    std::cout << "  Dropping: " << (health.dropping ? "YES" : "NO") << "\n";
    std::cout << "  Dropped count: " << health.droppedMessageCount << "\n";
    std::cout << "  Processing rate: " << health.averageProcessingRate << " msg/s\n";

    ASSERT_TRUE(health.queueSize > 0, "Queue should have messages");

    // Likely queue overflow happened
    if (health.dropping || health.droppedMessageCount > 0) {
        std::cout << "✓ Backpressure detected correctly\n";
        reporter.addAssertion("backpressure_detected", true);
    }

    reporter.addMetric("queue_size", health.queueSize);
    reporter.addMetric("dropped_messages", health.droppedMessageCount);
    std::cout << "✓ TEST 5 PASSED\n";

    // ========================================================================
    // TEST 6: Thread Safety
    // ========================================================================
    std::cout << "\n=== TEST 6: Thread Safety (Concurrent Pub/Pull) ===\n";

    std::atomic<int> publishedTotal{0};
    std::atomic<int> receivedTotal{0};
    std::atomic<bool> running{true};

    consumerIO->subscribe("thread:*", {});

    // 10 publisher threads
    std::vector<std::thread> publishers;
    for (int t = 0; t < 10; t++) {
        publishers.emplace_back([&, t]() {
            for (int i = 0; i < 1000; i++) {
                auto data = std::make_unique<JsonDataNode>(nlohmann::json{
                    {"thread", t},
                    {"id", i}
                });
                producerIO->publish("thread:test", std::move(data));
                publishedTotal++;
            }
        });
    }

    // 5 consumer threads
    std::vector<std::thread> consumers;
    for (int t = 0; t < 5; t++) {
        consumers.emplace_back([&]() {
            while (running || consumerIO->hasMessages() > 0) {
                if (consumerIO->hasMessages() > 0) {
                    try {
                        auto msg = consumerIO->pullMessage();
                        receivedTotal++;
                    } catch (...) {
                        std::cerr << "ERROR: Exception during pull\n";
                    }
                }
                std::this_thread::sleep_for(std::chrono::microseconds(100));
            }
        });
    }

    // Wait for publishers
    for (auto& t : publishers) {
        t.join();
    }

    std::cout << "All publishers done: " << publishedTotal << " messages\n";

    // Let consumers finish
    std::this_thread::sleep_for(std::chrono::milliseconds(500));
    running = false;

    for (auto& t : consumers) {
        t.join();
    }

    std::cout << "All consumers done: " << receivedTotal << " messages\n";

    // May have drops, but should be stable
    ASSERT_GT(receivedTotal, 0, "Should receive at least some messages");
    reporter.addMetric("concurrent_published", publishedTotal);
    reporter.addMetric("concurrent_received", receivedTotal);
    reporter.addAssertion("thread_safety", true); // No crash = success
    std::cout << "✓ TEST 6 PASSED (no crashes)\n";

    // ========================================================================
    // TEST 7: Health Monitoring Accuracy
    // ========================================================================
    std::cout << "\n=== TEST 7: Health Monitoring Accuracy ===\n";

    consumerIO->subscribe("health:*", {});

    // Phase 1: Normal load (100 msg/s)
    std::cout << "Phase 1: Normal load (100 msg/s for 2s)\n";
    for (int i = 0; i < 200; i++) {
        auto data = std::make_unique<JsonDataNode>(nlohmann::json{{"phase", 1}});
        producerIO->publish("health:test", std::move(data));
        std::this_thread::sleep_for(std::chrono::milliseconds(10));

        // Pull to keep queue low
        if (consumerIO->hasMessages() > 0) {
            consumerIO->pullMessage();
        }
    }

    auto healthPhase1 = consumerIO->getHealth();
    std::cout << "  Queue: " << healthPhase1.queueSize
              << ", Dropping: " << (healthPhase1.dropping ? "YES" : "NO") << "\n";

    // Phase 2: Overload (10k msg/s without pulling)
    std::cout << "Phase 2: Overload (10000 msg/s for 1s)\n";
    for (int i = 0; i < 10000; i++) {
        auto data = std::make_unique<JsonDataNode>(nlohmann::json{{"phase", 2}});
        producerIO->publish("health:test", std::move(data));
    }
    engine.update(1.0f/60.0f);

    auto healthPhase2 = consumerIO->getHealth();
    std::cout << "  Queue: " << healthPhase2.queueSize
              << ", Dropping: " << (healthPhase2.dropping ? "YES" : "NO") << "\n";

    ASSERT_GT(healthPhase2.queueSize, healthPhase1.queueSize,
              "Queue should grow during overload");

    // Phase 3: Recovery (pull all)
    std::cout << "Phase 3: Recovery (pulling all messages)\n";
    int pulled = 0;
    while (consumerIO->hasMessages() > 0) {
        consumerIO->pullMessage();
        pulled++;
    }

    auto healthPhase3 = consumerIO->getHealth();
    std::cout << "  Pulled: " << pulled << " messages\n";
    std::cout << "  Queue: " << healthPhase3.queueSize
              << ", Dropping: " << (healthPhase3.dropping ? "YES" : "NO") << "\n";

    ASSERT_EQ(healthPhase3.queueSize, 0, "Queue should be empty after pulling all");
    reporter.addAssertion("health_monitoring", true);
    std::cout << "✓ TEST 7 PASSED\n";

    // ========================================================================
    // TEST 8: Subscription Lifecycle
    // ========================================================================
    std::cout << "\n=== TEST 8: Subscription Lifecycle ===\n";

    // Subscribe
    consumerIO->subscribe("lifecycle:test", {});

    // Publish 10 messages
    for (int i = 0; i < 10; i++) {
        auto data = std::make_unique<JsonDataNode>(nlohmann::json{{"id", i}});
        producerIO->publish("lifecycle:test", std::move(data));
    }
    engine.update(1.0f/60.0f);

    int count1 = 0;
    while (consumerIO->hasMessages() > 0) {
        consumerIO->pullMessage();
        count1++;
    }
    ASSERT_EQ(count1, 10, "Should receive 10 messages");

    // Unsubscribe (if the API exists - it might not be implemented yet)
    // consumerIO->unsubscribe("lifecycle:test");

    // Publish 10 more
    for (int i = 10; i < 20; i++) {
        auto data = std::make_unique<JsonDataNode>(nlohmann::json{{"id", i}});
        producerIO->publish("lifecycle:test", std::move(data));
    }
    engine.update(1.0f/60.0f);

    // If unsubscribe exists, 0 messages should arrive. If not, 10 will.
    int count2 = 0;
    while (consumerIO->hasMessages() > 0) {
        consumerIO->pullMessage();
        count2++;
    }

    std::cout << "After unsubscribe: " << count2 << " messages (0 if unsubscribe works)\n";

    // Re-subscribe
    consumerIO->subscribe("lifecycle:test", {});

    // Publish 10 more
    for (int i = 20; i < 30; i++) {
        auto data = std::make_unique<JsonDataNode>(nlohmann::json{{"id", i}});
        producerIO->publish("lifecycle:test", std::move(data));
    }
    engine.update(1.0f/60.0f);

    int count3 = 0;
    while (consumerIO->hasMessages() > 0) {
        consumerIO->pullMessage();
        count3++;
    }
    ASSERT_EQ(count3, 10, "Should receive 10 messages after re-subscribe");

    reporter.addAssertion("subscription_lifecycle", true);
    std::cout << "✓ TEST 8 PASSED\n";

    // ========================================================================
    // FINAL REPORT
    // ========================================================================

    metrics.printReport();
    reporter.printFinalReport();

    return reporter.getExitCode();
}
```

---

## 📊 Collected Metrics

| Metric | Description | Threshold |
|--------|-------------|-----------|
| **basic_pubsub** | Messages received in the basic test | 100/100 |
| **pattern_matching** | Pattern matching works | true |
| **broadcast_bug_present** | 1-to-1 bug detected (1.0) or fixed (0.0) | Documentation |
| **batch_count** | Number of batches received | 2-4 |
| **queue_size** | Queue size during the flood | > 0 |
| **dropped_messages** | Dropped messages detected | >= 0 |
| **concurrent_published** | Messages published concurrently | 10000 |
| **concurrent_received** | Messages received concurrently | > 0 |
| **health_monitoring** | Health metrics are accurate | true |
| **subscription_lifecycle** | Subscribe/unsubscribe works | true |

---

## ✅ Success Criteria

### MUST PASS
1. ✅ Basic pub/sub: 100/100 messages in FIFO order
2. ✅ Pattern matching works (wildcards)
3. ✅ Batching reduces frequency (100 msg/s → ~1 msg/s)
4. ✅ Backpressure detected (dropping flag or dropped count)
5. ✅ Thread safety: no crash under concurrency
6. ✅ Health monitoring reflects the real state
7. ✅ Re-subscribe works

### KNOWN BUGS (Documentation)
1. ⚠️ Multi-module routing: only the first subscriber receives (no clone())
2. ⚠️ Unsubscribe API may not exist

### NICE TO HAVE
1. ✅ Fix the clone() bug for 1-to-many routing
2. ✅ Unsubscribe API implemented
3. ✅ Compression for batching

---

## 🐛 Expected Error Cases

| Error | Cause | Action |
|-------|-------|--------|
| Messages lost | Routing bug | WARN - document it |
| Pattern does not match | Incorrect regex | FAIL - fix the pattern |
| No batching | Config ignored | FAIL - check SubscriptionConfig |
| No backpressure | Health not updated | FAIL - fix IOHealth |
| Concurrent crash | Race condition | FAIL - add mutex |
| Incorrect queue size | Buggy counter | FAIL - fix queueSize tracking |

---

## 📝 Expected Output

```
================================================================================
TEST: IO System Stress Test
================================================================================

=== TEST 1: Basic Publish-Subscribe ===
✓ TEST 1 PASSED: 100 messages received

=== TEST 2: Pattern Matching ===
Pattern matching results:
  player:001:position: 3 times
  player:001:health: 2 times
  player:002:position: 2 times
  enemy:001:position: 1 times
✓ TEST 2 PASSED

=== TEST 3: Multi-Module Routing (1-to-many) ===
Broadcast distribution:
  ConsumerModule: 10 messages
  BroadcastModule: 0 messages
  BatchModule: 0 messages
  StressModule: 0 messages
⚠️ BUG: Only one module received all messages (clone() not implemented)
✓ TEST 3 COMPLETED (bug documented)

=== TEST 4: Message Batching (Low-Frequency) ===
Published: 300 messages over 3.02s
Received: 3 batches
Expected: ~3 batches (1/second)
✓ TEST 4 PASSED

=== TEST 5: Backpressure & Queue Overflow ===
Publishing 50000 messages...
  0 messages published
  10000 messages published
  20000 messages published
  30000 messages published
  40000 messages published
Health status:
  Queue size: 10000 / 10000
  Dropping: YES
  Dropped count: 40000
  Processing rate: 0.0 msg/s
✓ Backpressure detected correctly
✓ TEST 5 PASSED

=== TEST 6: Thread Safety (Concurrent Pub/Pull) ===
All publishers done: 10000 messages
All consumers done: 9847 messages
✓ TEST 6 PASSED (no crashes)

=== TEST 7: Health Monitoring Accuracy ===
Phase 1: Normal load (100 msg/s for 2s)
  Queue: 2, Dropping: NO
Phase 2: Overload (10000 msg/s for 1s)
  Queue: 9998, Dropping: YES
Phase 3: Recovery (pulling all messages)
  Pulled: 9998 messages
  Queue: 0, Dropping: NO
✓ TEST 7 PASSED

=== TEST 8: Subscription Lifecycle ===
After unsubscribe: 10 messages (0 if unsubscribe works)
✓ TEST 8 PASSED

================================================================================
METRICS
================================================================================
Basic pub/sub: 100/100
Batch count: 3
Queue size: 10000
Dropped messages: 40000
Concurrent published: 10000
Concurrent received: 9847
Broadcast bug present: 1.0 (not fixed yet)

================================================================================
ASSERTIONS
================================================================================
✓ basic_pubsub
✓ pattern_matching
✓ multi_module_routing_tested
✓ batching_works
✓ backpressure_detected
✓ thread_safety
✓ health_monitoring
✓ subscription_lifecycle

Result: ✅ PASSED (8/8 tests)

================================================================================
```

---

## 📅 Schedule

**Day 1 (3h):**
- Implement ProducerModule, ConsumerModule, BroadcastModule
- Implement BatchModule, StressModule
- Set up IOFactory for the tests

**Day 2 (3h):**
- Implement test_11_io_system.cpp
- Tests 1-4 (pub/sub, patterns, routing, batching)

**Day 3 (2h):**
- Tests 5-8 (backpressure, threads, health, lifecycle)
- Debug + validation

---

**Next step**: `scenario_12_datanode.md`