# ARCHITECTURE.md - Feed Generator Technical Design

---

## SYSTEM OVERVIEW

**Feed Generator** aggregates news content from web sources, enriches it with AI-generated image analysis, and produces articles via an existing Node.js API.

### High-Level Flow

```
Web Sources → Scraper → Image Analyzer → Aggregator → Node API Client → Publisher
     ↓            ↓             ↓              ↓               ↓              ↓
    HTML     NewsArticle  AnalyzedArticle   Prompt    GeneratedArticle    Feed/RSS
```

### Design Goals

1. **Simplicity** - Clear, readable code over cleverness
2. **Modularity** - Each component has ONE responsibility
3. **Type Safety** - Full type coverage, mypy-compliant
4. **Testability** - Every module independently testable
5. **Prototype Speed** - Working system in 3-5 days
6. **Future-Proof** - Easy to migrate to Node.js later

---

## ARCHITECTURE PRINCIPLES

### 1. Pipeline Architecture

**Linear data flow, no circular dependencies.**

```
Input → Transform → Transform → Transform → Output
```

Each stage:
- Takes typed input
- Performs ONE transformation
- Returns typed output
- Can fail explicitly

### 2. Dependency Injection

**Configuration flows top-down, no global state.**

```python
# Main orchestrator
config = Config.from_env()
scraper = NewsScraper(config.scraper)
analyzer = ImageAnalyzer(config.api.openai_key)
client = ArticleAPIClient(config.api.node_api_url)
publisher = FeedPublisher(config.publisher)

# Pass dependencies explicitly
pipeline = Pipeline(scraper, analyzer, client, publisher)
```

### 3. Explicit Error Boundaries

**Each module defines its failure modes.**

```python
# Module A raises ScrapingError
# Module B catches and handles
try:
    articles = scraper.scrape(url)
except ScrapingError as e:
    logger.error(f"Scraping failed: {e}")
    # Decide: retry, skip, or fail
```

---

## MODULE RESPONSIBILITIES

### 1. config.py - Configuration Management

**Purpose**: Centralize all configuration, load from environment.

**Responsibilities**:
- Load configuration from `.env` file
- Validate required settings
- Provide immutable config objects
- NO business logic

**Data Structures**:

```python
@dataclass(frozen=True)
class APIConfig:
    openai_key: str
    node_api_url: str
    timeout_seconds: int

@dataclass(frozen=True)
class ScraperConfig:
    sources: List[str]
    max_articles: int
    timeout_seconds: int

@dataclass(frozen=True)
class Config:
    api: APIConfig
    scraper: ScraperConfig
    log_level: str
```

**Interface**:

```python
def from_env() -> Config:
    """Load and validate configuration from environment."""
```

---

### 2. scraper.py - Web Scraping

**Purpose**: Extract news articles from web sources.

**Responsibilities**:
- HTTP requests to news sites
- HTML parsing with BeautifulSoup
- Extract: title, content, image URLs
- Handle site-specific quirks
- NO image analysis, NO article generation

**Data Structures**:

```python
@dataclass
class NewsArticle:
    title: str
    url: str
    content: str
    image_url: Optional[str]
    published_at: Optional[datetime]
    source: str
```

**Interface**:

```python
class NewsScraper:
    def scrape(self, url: str) -> List[NewsArticle]:
        """Scrape articles from a news source."""

    def scrape_all(self) -> List[NewsArticle]:
        """Scrape all configured sources."""
```

**Error Handling**:
- Raises `ScrapingError` on failure
- Logs warnings for individual article failures
- Returns partial results when possible
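**Example (sketch)**: a minimal illustration of what `scrape()` might look like with requests + BeautifulSoup. The CSS selectors and skip logic are placeholders, not real parsers, and it assumes `NewsArticle`, `ScraperConfig`, and `ScrapingError` from this document are importable.

```python
import logging
from typing import List

import requests
from bs4 import BeautifulSoup

logger = logging.getLogger(__name__)

class NewsScraper:
    def __init__(self, config: "ScraperConfig") -> None:
        self._config = config

    def scrape(self, url: str) -> List["NewsArticle"]:
        """Fetch one source and extract articles; raises ScrapingError on fetch failure."""
        try:
            response = requests.get(url, timeout=self._config.timeout_seconds)
            response.raise_for_status()
        except requests.RequestException as e:
            raise ScrapingError(f"Failed to fetch {url}: {e}") from e

        soup = BeautifulSoup(response.text, "html.parser")
        articles: List["NewsArticle"] = []
        # Generic selectors for illustration only -- real parsers are site-specific.
        for node in soup.select("article")[: self._config.max_articles]:
            title = node.find(["h1", "h2"])
            body = node.find("p")
            image = node.find("img")
            if not title or not body:
                logger.warning("Skipping article without title/body on %s", url)
                continue
            articles.append(
                NewsArticle(
                    title=title.get_text(strip=True),
                    url=url,
                    content=body.get_text(strip=True),
                    image_url=image["src"] if image and image.has_attr("src") else None,
                    published_at=None,  # site-specific date parsing omitted
                    source=url,
                )
            )
        return articles
```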
---

### 3. image_analyzer.py - AI Image Analysis

**Purpose**: Generate descriptions of news images using GPT-4 Vision.

**Responsibilities**:
- Call OpenAI GPT-4 Vision API
- Generate contextual image descriptions
- Handle API rate limits and errors
- NO scraping, NO article generation

**Data Structures**:

```python
@dataclass
class ImageAnalysis:
    image_url: str
    description: str
    confidence: float  # 0.0 to 1.0
    analysis_time: datetime
```

**Interface**:

```python
class ImageAnalyzer:
    def analyze(self, image_url: str, context: str) -> ImageAnalysis:
        """Analyze single image with context."""

    def analyze_batch(
        self, articles: List[NewsArticle]
    ) -> Dict[str, ImageAnalysis]:
        """Analyze multiple images, return dict keyed by URL."""
```

**Error Handling**:
- Raises `ImageAnalysisError` on API failure
- Returns None for individual failures in batch
- Implements retry logic with exponential backoff

---

### 4. aggregator.py - Content Aggregation

**Purpose**: Combine scraped content and image analysis into generation prompts.

**Responsibilities**:
- Merge NewsArticle + ImageAnalysis
- Format prompts for article generation API
- Apply business logic (e.g., skip low-confidence images)
- NO external API calls

**Data Structures**:

```python
@dataclass
class AggregatedContent:
    news: NewsArticle
    image_analysis: Optional[ImageAnalysis]

    def to_generation_prompt(self) -> Dict[str, str]:
        """Convert to format expected by Node API."""
        return {
            "topic": self.news.title,
            "context": self.news.content,
            "image_description": (
                self.image_analysis.description if self.image_analysis else None
            ),
        }
```

**Interface**:

```python
class ContentAggregator:
    def aggregate(
        self,
        articles: List[NewsArticle],
        analyses: Dict[str, ImageAnalysis]
    ) -> List[AggregatedContent]:
        """Combine scraped and analyzed content."""
```

**Business Rules**:
- Skip articles without images if image required
- Skip low-confidence image analyses (< 0.5)
- Limit prompt length to API constraints

---

### 5. article_client.py - Node API Client

**Purpose**: Call existing Node.js article generation API.

**Responsibilities**:
- HTTP POST to Node.js server
- Request/response serialization
- Retry logic for transient failures
- NO content processing, NO publishing

**Data Structures**:

```python
@dataclass
class GeneratedArticle:
    original_news: NewsArticle
    generated_content: str
    metadata: Dict[str, Any]
    generation_time: datetime
```

**Interface**:

```python
class ArticleAPIClient:
    def generate(self, prompt: Dict[str, str]) -> GeneratedArticle:
        """Generate single article."""

    def generate_batch(
        self, prompts: List[Dict[str, str]]
    ) -> List[GeneratedArticle]:
        """Generate multiple articles with rate limiting."""
```

**Error Handling**:
- Raises `APIClientError` on failure
- Implements exponential backoff retry
- Respects API rate limits

---

### 6. publisher.py - Feed Publishing

**Purpose**: Publish generated articles to output channels.

**Responsibilities**:
- Generate RSS/Atom feeds
- Post to WordPress (if configured)
- Write to local files
- NO content generation, NO scraping

**Interface**:

```python
class FeedPublisher:
    def publish_rss(self, articles: List[GeneratedArticle], path: Path) -> None:
        """Generate RSS feed file."""

    def publish_wordpress(self, articles: List[GeneratedArticle]) -> None:
        """Post to WordPress via XML-RPC or REST API."""

    def publish_json(self, articles: List[GeneratedArticle], path: Path) -> None:
        """Write articles as JSON for debugging."""
```

**Output Formats**:
- RSS 2.0 feed
- WordPress posts
- JSON archive
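**Example (sketch)**: a standard-library-only illustration of the two file-based publish methods. It is not the final implementation; the channel title is a placeholder and RSS fields such as `pubDate`/`guid` are omitted for brevity.

```python
import json
import xml.etree.ElementTree as ET
from dataclasses import asdict
from pathlib import Path
from typing import List

class FeedPublisher:
    def publish_json(self, articles: List["GeneratedArticle"], path: Path) -> None:
        """Write articles as a JSON archive for debugging."""
        path.parent.mkdir(parents=True, exist_ok=True)
        payload = [asdict(a) for a in articles]  # dataclass -> dict
        path.write_text(json.dumps(payload, indent=2, default=str), encoding="utf-8")

    def publish_rss(self, articles: List["GeneratedArticle"], path: Path) -> None:
        """Emit a bare-bones RSS 2.0 feed."""
        rss = ET.Element("rss", version="2.0")
        channel = ET.SubElement(rss, "channel")
        ET.SubElement(channel, "title").text = "Feed Generator"  # placeholder title
        for article in articles:
            item = ET.SubElement(channel, "item")
            ET.SubElement(item, "title").text = article.original_news.title
            ET.SubElement(item, "link").text = article.original_news.url
            ET.SubElement(item, "description").text = article.generated_content
        path.parent.mkdir(parents=True, exist_ok=True)
        ET.ElementTree(rss).write(path, encoding="utf-8", xml_declaration=True)
```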
---

## DATA FLOW DETAIL

### Complete Pipeline

```python
def run_pipeline(config: Config) -> None:
    """Execute complete feed generation pipeline."""

    # 1. Initialize components
    scraper = NewsScraper(config.scraper)
    analyzer = ImageAnalyzer(config.api.openai_key)
    aggregator = ContentAggregator()
    client = ArticleAPIClient(config.api.node_api_url)
    publisher = FeedPublisher(config.publisher)

    # 2. Scrape news sources
    logger.info("Scraping news sources...")
    articles: List[NewsArticle] = scraper.scrape_all()
    logger.info(f"Scraped {len(articles)} articles")

    # 3. Analyze images
    logger.info("Analyzing images...")
    analyses: Dict[str, ImageAnalysis] = analyzer.analyze_batch(articles)
    logger.info(f"Analyzed {len(analyses)} images")

    # 4. Aggregate content
    logger.info("Aggregating content...")
    aggregated: List[AggregatedContent] = aggregator.aggregate(articles, analyses)
    logger.info(f"Aggregated {len(aggregated)} items")

    # 5. Generate articles
    logger.info("Generating articles...")
    prompts = [item.to_generation_prompt() for item in aggregated]
    generated: List[GeneratedArticle] = client.generate_batch(prompts)
    logger.info(f"Generated {len(generated)} articles")

    # 6. Publish
    logger.info("Publishing...")
    publisher.publish_rss(generated, Path("output/feed.rss"))
    publisher.publish_json(generated, Path("output/articles.json"))
    logger.info("Pipeline complete!")
```

### Error Handling in Pipeline

```python
def run_pipeline_with_recovery(config: Config) -> None:
    """Pipeline with error recovery at each stage."""

    try:
        # Stage 1: Scraping
        articles = scraper.scrape_all()
        if not articles:
            logger.warning("No articles scraped, exiting")
            return
    except ScrapingError as e:
        logger.error(f"Scraping failed: {e}")
        return  # Cannot proceed without articles

    try:
        # Stage 2: Image Analysis (optional)
        analyses = analyzer.analyze_batch(articles)
    except ImageAnalysisError as e:
        logger.warning(f"Image analysis failed: {e}, proceeding without images")
        analyses = {}  # Continue without image descriptions

    # Stage 3: Aggregation (cannot fail with valid inputs)
    aggregated = aggregator.aggregate(articles, analyses)

    try:
        # Stage 4: Generation
        prompts = [item.to_generation_prompt() for item in aggregated]
        generated = client.generate_batch(prompts)
        if not generated:
            logger.error("No articles generated, exiting")
            return
    except APIClientError as e:
        logger.error(f"Article generation failed: {e}")
        return  # Cannot publish without generated articles

    try:
        # Stage 5: Publishing
        publisher.publish_rss(generated, Path("output/feed.rss"))
        publisher.publish_json(generated, Path("output/articles.json"))
    except PublishingError as e:
        logger.error(f"Publishing failed: {e}")
        # Save to backup location
        publisher.publish_json(generated, Path("backup/articles.json"))
```

---

## INTERFACE CONTRACTS

### Module Input/Output Types

```python
# scraper.py
Input:  str (URL)
Output: List[NewsArticle]
Errors: ScrapingError

# image_analyzer.py
Input:  List[NewsArticle]
Output: Dict[str, ImageAnalysis]  # Keyed by image_url
Errors: ImageAnalysisError

# aggregator.py
Input:  List[NewsArticle], Dict[str, ImageAnalysis]
Output: List[AggregatedContent]
Errors: None (pure transformation)

# article_client.py
Input:  List[Dict[str, str]]  # Prompts
Output: List[GeneratedArticle]
Errors: APIClientError

# publisher.py
Input:  List[GeneratedArticle]
Output: None (side effects: files, API calls)
Errors: PublishingError
```

### Type Safety Guarantees

All interfaces use:
- **Immutable dataclasses** for data structures
- **Explicit Optional** for nullable values
- **Specific exceptions** for error cases
- **Type hints** on all function signatures

```python
# Example: Type-safe interface
def process_article(
    article: NewsArticle,               # Required
    analysis: Optional[ImageAnalysis]   # Nullable
) -> Result[GeneratedArticle, ProcessingError]:  # Explicit result type
    """Type signature guarantees correctness."""
```
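Python has no built-in `Result` type, so the annotation above implies a small helper. A minimal sketch of one possible shape — the `Ok`/`Err` names are illustrative, and a third-party result library could be used instead:

```python
from dataclasses import dataclass
from typing import Generic, TypeVar, Union

T = TypeVar("T")  # success type
E = TypeVar("E")  # error type

@dataclass(frozen=True)
class Ok(Generic[T]):
    value: T

@dataclass(frozen=True)
class Err(Generic[E]):
    error: E

# Result[T, E] is a tagged union; callers branch with isinstance()
# instead of catching exceptions.
Result = Union[Ok[T], Err[E]]

def divide(a: int, b: int) -> Result[float, str]:
    """Toy example: failure is a value, not an exception."""
    if b == 0:
        return Err("division by zero")
    return Ok(a / b)
```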
---

## CONFIGURATION STRATEGY

### Environment Variables

```bash
# Required
OPENAI_API_KEY=sk-...
NODE_API_URL=http://localhost:3000
NEWS_SOURCES=https://example.com/news,https://other.com/feed

# Optional
LOG_LEVEL=INFO
MAX_ARTICLES=10
SCRAPER_TIMEOUT=10
API_TIMEOUT=30
```

### Configuration Hierarchy

```
Default Values  →  Environment Variables  →  CLI Arguments (future)
      ↓                     ↓                         ↓
  config.py             .env file                 argparse
```

### Configuration Validation

```python
@classmethod
def from_env(cls) -> Config:
    """Load with validation."""

    # Required fields
    openai_key = os.getenv("OPENAI_API_KEY")
    if not openai_key:
        raise ValueError("OPENAI_API_KEY required")

    # Validated parsing
    node_api_url = os.getenv("NODE_API_URL", "http://localhost:3000")
    if not node_api_url.startswith(('http://', 'https://')):
        raise ValueError(f"Invalid NODE_API_URL: {node_api_url}")

    # List parsing
    sources_str = os.getenv("NEWS_SOURCES", "")
    sources = [s.strip() for s in sources_str.split(",") if s.strip()]
    if not sources:
        raise ValueError("NEWS_SOURCES required (comma-separated URLs)")

    return cls(...)
```

---

## ERROR HANDLING ARCHITECTURE

### Exception Hierarchy

```python
class FeedGeneratorError(Exception):
    """Base exception - catch-all for system errors."""
    pass

class ScrapingError(FeedGeneratorError):
    """Web scraping failed."""
    pass

class ImageAnalysisError(FeedGeneratorError):
    """GPT-4 Vision analysis failed."""
    pass

class APIClientError(FeedGeneratorError):
    """Node.js API communication failed."""
    pass

class PublishingError(FeedGeneratorError):
    """Feed publishing failed."""
    pass
```

### Retry Strategy

```python
class RetryConfig:
    """Configuration for retry behavior."""
    max_attempts: int = 3
    initial_delay: float = 1.0  # seconds
    backoff_factor: float = 2.0
    max_delay: float = 60.0

def with_retry(config: RetryConfig):
    """Decorator for retryable operations."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(config.max_attempts):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if attempt == config.max_attempts - 1:
                        raise
                    delay = min(
                        config.initial_delay * (config.backoff_factor ** attempt),
                        config.max_delay
                    )
                    logger.warning(f"Retry {attempt+1}/{config.max_attempts} after {delay}s")
                    time.sleep(delay)
        return wrapper
    return decorator
```

### Partial Failure Handling

```python
def scrape_all(self) -> List[NewsArticle]:
    """Scrape all sources, continue on individual failures."""
    all_articles = []

    for source in self._config.sources:
        try:
            articles = self._scrape_source(source)
            all_articles.extend(articles)
            logger.info(f"Scraped {len(articles)} from {source}")
        except ScrapingError as e:
            logger.warning(f"Failed to scrape {source}: {e}")
            # Continue with other sources
            continue

    return all_articles
```
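The decorator sketch above assumes `functools`, `time`, and a module-level `logger` are already in scope. A possible usage, wrapping a single Node API call — the `_post_generation_request` helper is hypothetical:

```python
import functools
import logging
import time
from typing import Any, Dict

import requests

logger = logging.getLogger(__name__)

@with_retry(RetryConfig())
def _post_generation_request(url: str, prompt: Dict[str, str]) -> Dict[str, Any]:
    """One attempt at the Node API; any exception triggers the retry loop above."""
    response = requests.post(url, json=prompt, timeout=30)
    response.raise_for_status()  # raise on 4xx/5xx so with_retry can back off
    return response.json()
```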
---

## TESTING STRATEGY

### Test Pyramid

```
          E2E Tests (1-2)
         /               \
      Integration (5-10)
       /              \
    Unit Tests (20-30)
```

### Unit Test Coverage

Each module has:
- **Happy path tests** - Normal operation
- **Error condition tests** - Each exception type
- **Edge case tests** - Empty inputs, null values, limits
- **Mock external dependencies** - No real HTTP calls

```python
# Example: scraper_test.py
def test_scrape_success():
    """Test successful scraping."""
    # Mock HTTP response
    # Assert correct NewsArticle returned

def test_scrape_timeout():
    """Test timeout handling."""
    # Mock timeout exception
    # Assert ScrapingError raised

def test_scrape_invalid_html():
    """Test malformed HTML handling."""
    # Mock invalid response
    # Assert error or empty result
```

### Integration Test Coverage

Test module interactions:
- Scraper → Aggregator
- Analyzer → Aggregator
- Aggregator → API Client
- End-to-end pipeline

```python
def test_pipeline_integration():
    """Test complete pipeline with mocked external services."""
    config = Config.from_dict(test_config)

    with mock_http_responses():
        with mock_openai_api():
            with mock_node_api():
                result = run_pipeline(config)

    assert len(result) > 0
    assert all(isinstance(a, GeneratedArticle) for a in result)
```

### Test Data Strategy

```
tests/
├── fixtures/
│   ├── sample_news.html          # Mock HTML responses
│   ├── sample_api_response.json
│   └── sample_images.json
└── mocks/
    ├── mock_scraper.py
    ├── mock_analyzer.py
    └── mock_client.py
```

---

## PERFORMANCE CONSIDERATIONS

### Current Targets (V1 Prototype)

- Scraping: 5-10 articles/source in < 30s
- Image analysis: < 5s per image (GPT-4V API latency)
- Article generation: < 10s per article (Node API latency)
- Total pipeline: < 5 minutes for 50 articles

### Bottlenecks Identified

1. **Sequential API calls** - GPT-4V and Node API
2. **Network latency** - HTTP requests
3. **No caching** - Repeated scraping of same sources

### Future Optimizations (V2+)

```python
# Parallel image analysis
async def analyze_batch_parallel(
    self, articles: List[NewsArticle]
) -> Dict[str, ImageAnalysis]:
    """Analyze images in parallel."""
    urls = [a.image_url for a in articles]
    tasks = [self._analyze_async(url) for url in urls]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    return {url: result for url, result in zip(urls, results)
            if not isinstance(result, Exception)}
```

### Caching Strategy (Future)

```python
@dataclass
class CacheConfig:
    scraper_ttl: int = 3600    # 1 hour
    analysis_ttl: int = 86400  # 24 hours

# Redis or simple file-based cache
cache = Cache(config.cache)

def scrape_with_cache(self, url: str) -> List[NewsArticle]:
    """Scrape with TTL-based caching."""
    cached = cache.get(f"scrape:{url}")
    if cached and not cache.is_expired(cached):
        return cached.data

    fresh = self._scrape_source(url)
    cache.set(f"scrape:{url}", fresh, ttl=self._config.cache.scraper_ttl)
    return fresh
```
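The `Cache` class above is future work; a minimal file-based sketch that would satisfy the `get`/`set`/`is_expired` calls in `scrape_with_cache` could look like this. The `CacheEntry` shape and pickle storage are assumptions, not a committed design.

```python
import hashlib
import pickle
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Optional

@dataclass
class CacheEntry:
    data: Any
    stored_at: float
    ttl: int

class Cache:
    """Minimal file-based TTL cache (illustrative only)."""

    def __init__(self, directory: Path = Path(".cache")) -> None:
        self._dir = directory
        self._dir.mkdir(parents=True, exist_ok=True)

    def _path(self, key: str) -> Path:
        # Keys contain URL characters; hash them into safe filenames.
        return self._dir / (hashlib.sha256(key.encode()).hexdigest() + ".pkl")

    def get(self, key: str) -> Optional[CacheEntry]:
        path = self._path(key)
        if not path.exists():
            return None
        return pickle.loads(path.read_bytes())

    def set(self, key: str, data: Any, ttl: int) -> None:
        entry = CacheEntry(data=data, stored_at=time.time(), ttl=ttl)
        self._path(key).write_bytes(pickle.dumps(entry))

    def is_expired(self, entry: CacheEntry) -> bool:
        return time.time() - entry.stored_at > entry.ttl
```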
---

## EXTENSIBILITY POINTS

### Adding New News Sources

```python
# 1. Add source-specific parser
class BBCParser(NewsParser):
    """Parser for BBC News."""

    def parse(self, html: str) -> List[NewsArticle]:
        """Extract articles from BBC HTML."""
        soup = BeautifulSoup(html, 'html.parser')
        # BBC-specific extraction logic
        return articles

# 2. Register parser
scraper.register_parser("bbc.com", BBCParser())

# 3. Add to configuration
# NEWS_SOURCES=...,https://bbc.com/news
```

### Adding Output Formats

```python
# 1. Implement publisher interface
class JSONPublisher(Publisher):
    """Publish articles as JSON."""

    def publish(self, articles: List[GeneratedArticle]) -> None:
        """Write to JSON file."""
        with open(self._path, 'w') as f:
            json.dump([a.to_dict() for a in articles], f, indent=2)

# 2. Use in pipeline
publisher = JSONPublisher(Path("output/feed.json"))
publisher.publish(generated_articles)
```

### Custom Processing Steps

```python
# 1. Implement processor interface
class SEOOptimizer(Processor):
    """Add SEO metadata to articles."""

    def process(self, article: GeneratedArticle) -> GeneratedArticle:
        """Enhance with SEO tags."""
        optimized = article.copy()
        optimized.metadata['keywords'] = extract_keywords(article.content)
        optimized.metadata['description'] = generate_meta_description(article.content)
        return optimized

# 2. Add to pipeline
pipeline.add_processor(SEOOptimizer())
```

---

## MIGRATION PATH TO NODE.JS

### Why Migrate Later?

This Python prototype will eventually be rewritten in Node.js/TypeScript because:

1. **Consistency** - Same stack as article generation API
2. **Maintainability** - One language for entire system
3. **Type safety** - TypeScript strict mode
4. **Integration** - Direct module imports instead of HTTP

### What to Preserve

When migrating:
- ✅ Module structure (same responsibilities)
- ✅ Interface contracts (same types)
- ✅ Configuration format (same env vars)
- ✅ Error handling strategy (same exceptions)
- ✅ Test coverage (same test cases)

### Migration Strategy

```typescript
// 1. Create TypeScript interfaces matching Python dataclasses
interface NewsArticle {
  title: string;
  url: string;
  content: string;
  imageUrl?: string;
}

// 2. Port modules one-by-one
class NewsScraper {
  async scrape(url: string): Promise<NewsArticle[]> {
    // Same logic as Python version
  }
}

// 3. Replace HTTP calls with direct imports
import { generateArticle } from './article-generator';

// Instead of HTTP POST
const article = await generateArticle(prompt);
```

### Lessons to Apply

From this Python prototype to Node.js:
- ✅ Use TypeScript strict mode from day 1
- ✅ Define interfaces before implementation
- ✅ Write tests alongside code
- ✅ Use dependency injection
- ✅ Explicit error types
- ✅ No global state

---

## DEPLOYMENT CONSIDERATIONS

### Development Environment

```bash
# Local development
python -m venv venv
source venv/bin/activate
pip install -r requirements.txt
cp .env.example .env
# Edit .env with API keys
python scripts/run.py
```

### Production Deployment (Future)

```yaml
# docker-compose.yml
version: '3.8'

services:
  feed-generator:
    build: .
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - NODE_API_URL=http://article-api:3000
    volumes:
      - ./output:/app/output
    restart: unless-stopped

  article-api:
    image: node-article-generator:latest
    ports:
      - "3000:3000"
```

### Scheduling

```bash
# Cron job for periodic execution
0 */6 * * * cd /app/feed-generator && venv/bin/python scripts/run.py >> logs/cron.log 2>&1
```

---

## MONITORING & OBSERVABILITY

### Logging Levels

```python
# DEBUG - Detailed execution flow
logger.debug(f"Scraping URL: {url}")

# INFO - Major pipeline stages
logger.info(f"Scraped {len(articles)} articles")

# WARNING - Recoverable errors
logger.warning(f"Failed to scrape {source}, continuing")

# ERROR - Unrecoverable errors
logger.error(f"Pipeline failed: {e}", exc_info=True)
```

### Metrics to Track

```python
@dataclass
class PipelineMetrics:
    """Metrics for pipeline execution."""
    start_time: datetime
    end_time: datetime
    articles_scraped: int
    images_analyzed: int
    articles_generated: int
    articles_published: int
    errors: List[str]

    def duration(self) -> float:
        """Pipeline duration in seconds."""
        return (self.end_time - self.start_time).total_seconds()

    def success_rate(self) -> float:
        """Percentage of articles successfully processed."""
        if self.articles_scraped == 0:
            return 0.0
        return (self.articles_published / self.articles_scraped) * 100
```

### Health Checks

```python
def health_check() -> Dict[str, Any]:
    """Check system health."""
    return {
        "status": "healthy",
        "checks": {
            "openai_api": check_openai_connection(),
            "node_api": check_node_api_connection(),
            "disk_space": check_disk_space(),
        },
        "last_run": get_last_run_metrics(),
    }
```
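The individual check helpers are not specified yet; two possible sketches are shown below. The `/health` endpoint on the Node API and the 500 MB threshold are assumptions, not agreed behavior.

```python
import shutil

import requests

def check_node_api_connection(base_url: str = "http://localhost:3000",
                              timeout: float = 5.0) -> bool:
    """Return True if the Node API responds; the /health path is assumed."""
    try:
        response = requests.get(f"{base_url}/health", timeout=timeout)
        return response.status_code == 200
    except requests.RequestException:
        return False

def check_disk_space(path: str = "output",
                     min_free_bytes: int = 500 * 1024 * 1024) -> bool:
    """Return True if at least ~500 MB is free on the output volume."""
    return shutil.disk_usage(path).free >= min_free_bytes
```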
---

## SECURITY CONSIDERATIONS

### API Key Management

```python
# ❌ NEVER commit API keys
OPENAI_API_KEY = "sk-..."  # FORBIDDEN

# ✅ Use environment variables
api_key = os.getenv("OPENAI_API_KEY")
if not api_key:
    raise ValueError("OPENAI_API_KEY environment variable required")
```

### Input Validation

```python
def validate_url(url: str) -> bool:
    """Validate URL is safe to scrape."""
    parsed = urlparse(url)

    # Must be HTTP/HTTPS
    if parsed.scheme not in ('http', 'https'):
        return False

    # No localhost or private IPs
    if parsed.hostname in ('localhost', '127.0.0.1'):
        return False

    return True
```

### Rate Limiting

```python
class RateLimiter:
    """Simple rate limiter for API calls."""

    def __init__(self, calls_per_minute: int) -> None:
        self._calls_per_minute = calls_per_minute
        self._calls: List[datetime] = []

    def wait_if_needed(self) -> None:
        """Block if rate limit would be exceeded."""
        now = datetime.now()
        minute_ago = now - timedelta(minutes=1)

        # Remove old calls
        self._calls = [c for c in self._calls if c > minute_ago]

        if len(self._calls) >= self._calls_per_minute:
            sleep_time = (self._calls[0] - minute_ago).total_seconds()
            time.sleep(sleep_time)

        self._calls.append(now)
```

---

## KNOWN LIMITATIONS (V1)

### Scraping Limitations

- **Static HTML only** - No JavaScript rendering
- **No anti-bot bypass** - May be blocked by Cloudflare/etc
- **No authentication** - Cannot access paywalled content
- **Site-specific parsing** - Breaks if HTML structure changes

### Analysis Limitations

- **Cost** - GPT-4V API is expensive at scale
- **Latency** - 3-5s per image analysis
- **Rate limits** - OpenAI API quotas
- **No caching** - Re-analyzes same images

### Generation Limitations

- **Dependent on Node API** - Single point of failure
- **No fallback** - If API down, pipeline fails
- **Sequential processing** - One article at a time

### Publishing Limitations

- **Local files only** - No cloud storage
- **No WordPress integration** - RSS only
- **No scheduling** - Manual execution

---

## FUTURE ENHANCEMENTS (Post-V1)

### Phase 2: Robustness
- [ ] Playwright for JavaScript-rendered sites
- [ ] Retry logic with exponential backoff
- [ ] Persistent queue for failed items
- [ ] Health monitoring dashboard

### Phase 3: Performance
- [ ] Async/parallel processing
- [ ] Redis caching layer
- [ ] Connection pooling
- [ ] Batch API requests

### Phase 4: Features
- [ ] WordPress integration
- [ ] Multiple output formats
- [ ] Content filtering rules
- [ ] A/B testing for prompts

### Phase 5: Migration to Node.js
- [ ] Rewrite in TypeScript
- [ ] Direct integration with article generator
- [ ] Shared types/interfaces
- [ ] Unified deployment

---

## DECISION LOG

### Why Python for V1?

**Decision**: Use Python instead of Node.js

**Rationale**:
- Better scraping libraries (BeautifulSoup, requests)
- Simpler OpenAI SDK
- Faster prototyping
- Can be rewritten later

### Why Not Async from Start?

**Decision**: Synchronous code for V1

**Rationale**:
- Simpler to understand and debug
- Performance not critical for prototype
- Can add async in V2

### Why Dataclasses over Dicts?

**Decision**: Use typed dataclasses everywhere

**Rationale**:
- Type safety catches bugs early
- Better IDE support
- Self-documenting code
- Easy to validate

### Why No Database?

**Decision**: File-based storage for V1

**Rationale**:
- Simpler deployment
- No database management
- Sufficient for prototype
- Can add later if needed

---

End of ARCHITECTURE.md