This is page 2 of 2. Use http://codebase.md/king-of-the-grackles/reddit-mcp-poc?page={x} to view the full context.

# Directory Structure

```
├── .env.sample
├── .gemini
│   └── settings.json
├── .gitignore
├── .python-version
├── .specify
│   ├── memory
│   │   └── constitution.md
│   ├── scripts
│   │   └── bash
│   │       ├── check-implementation-prerequisites.sh
│   │       ├── check-task-prerequisites.sh
│   │       ├── common.sh
│   │       ├── create-new-feature.sh
│   │       ├── get-feature-paths.sh
│   │       ├── setup-plan.sh
│   │       └── update-agent-context.sh
│   └── templates
│       ├── agent-file-template.md
│       ├── plan-template.md
│       ├── spec-template.md
│       └── tasks-template.md
├── package.json
├── pyproject.toml
├── README.md
├── reddit-research-agent.md
├── reports
│   ├── ai-llm-weekly-trends-reddit-analysis-2025-01-20.md
│   ├── saas-solopreneur-reddit-communities.md
│   ├── top-50-active-AI-subreddits.md
│   ├── top-50-subreddits-saas-ai-builders.md
│   └── top-50-subreddits-saas-solopreneurs.md
├── server.json
├── specs
│   ├── 003-fastmcp-context-integration.md
│   ├── 003-implementation-summary.md
│   ├── 003-phase-1-context-integration.md
│   ├── 003-phase-2-progress-monitoring.md
│   ├── agent-reasoning-visibility.md
│   ├── agentic-discovery-architecture.md
│   ├── chroma-proxy-architecture.md
│   ├── deep-research-reddit-architecture.md
│   └── reddit-research-agent-spec.md
├── src
│   ├── __init__.py
│   ├── chroma_client.py
│   ├── config.py
│   ├── models.py
│   ├── resources.py
│   ├── server.py
│   └── tools
│       ├── __init__.py
│       ├── comments.py
│       ├── discover.py
│       ├── posts.py
│       └── search.py
├── tests
│   ├── test_context_integration.py
│   └── test_tools.py
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/tests/test_context_integration.py:
--------------------------------------------------------------------------------

```python
"""
Integration tests for Context parameter acceptance in Phase 1.

This test suite verifies that all tool and operation functions accept the
Context parameter as required by FastMCP's Context API. Phase 1 only
validates parameter acceptance - actual context usage will be tested in
Phase 2+.
""" import pytest import sys import os from unittest.mock import Mock, MagicMock, AsyncMock from fastmcp import Context # Add project root to Python path sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..')) from src.tools.discover import discover_subreddits, validate_subreddit from src.tools.search import search_in_subreddit from src.tools.posts import fetch_subreddit_posts, fetch_multiple_subreddits from src.tools.comments import fetch_submission_with_comments @pytest.fixture def mock_context(): """Create a mock Context object for testing.""" return Mock(spec=Context) @pytest.fixture def mock_reddit(): """Create a mock Reddit client.""" return Mock() @pytest.fixture def mock_chroma(): """Mock ChromaDB client and collection.""" with Mock() as mock_client: mock_collection = Mock() mock_collection.query.return_value = { 'metadatas': [[ {'name': 'test', 'subscribers': 1000, 'url': 'https://reddit.com/r/test', 'nsfw': False} ]], 'distances': [[0.5]] } yield mock_client, mock_collection class TestDiscoverOperations: """Test discover_subreddits accepts context.""" async def test_discover_accepts_context(self, mock_context, monkeypatch): """Verify discover_subreddits accepts context parameter.""" # Mock the chroma client mock_client = Mock() mock_collection = Mock() mock_collection.query.return_value = { 'metadatas': [[ {'name': 'test', 'subscribers': 1000, 'url': 'https://reddit.com/r/test', 'nsfw': False} ]], 'distances': [[0.5]] } def mock_get_client(): return mock_client def mock_get_collection(name, client): return mock_collection monkeypatch.setattr('src.tools.discover.get_chroma_client', mock_get_client) monkeypatch.setattr('src.tools.discover.get_collection', mock_get_collection) # Call with context result = await discover_subreddits(query="test", limit=5, ctx=mock_context) # Verify result structure (not context usage - that's Phase 2) assert "subreddits" in result or "error" in result class TestSearchOperations: """Test search_in_subreddit accepts context.""" def test_search_accepts_context(self, mock_context, mock_reddit): """Verify search_in_subreddit accepts context parameter.""" mock_subreddit = Mock() mock_subreddit.display_name = "test" mock_subreddit.search.return_value = [] mock_reddit.subreddit.return_value = mock_subreddit result = search_in_subreddit( subreddit_name="test", query="test query", reddit=mock_reddit, limit=5, ctx=mock_context ) assert "results" in result or "error" in result class TestPostOperations: """Test post-fetching functions accept context.""" def test_fetch_posts_accepts_context(self, mock_context, mock_reddit): """Verify fetch_subreddit_posts accepts context parameter.""" mock_subreddit = Mock() mock_subreddit.display_name = "test" mock_subreddit.subscribers = 1000 mock_subreddit.public_description = "Test" mock_subreddit.hot.return_value = [] mock_reddit.subreddit.return_value = mock_subreddit result = fetch_subreddit_posts( subreddit_name="test", reddit=mock_reddit, limit=5, ctx=mock_context ) assert "posts" in result or "error" in result async def test_fetch_multiple_accepts_context(self, mock_context, mock_reddit): """Verify fetch_multiple_subreddits accepts context parameter.""" mock_multi = Mock() mock_multi.hot.return_value = [] mock_reddit.subreddit.return_value = mock_multi result = await fetch_multiple_subreddits( subreddit_names=["test1", "test2"], reddit=mock_reddit, limit_per_subreddit=5, ctx=mock_context ) assert "subreddits_requested" in result or "error" in result class TestCommentOperations: """Test comment-fetching functions 
accept context.""" async def test_fetch_comments_accepts_context(self, mock_context, mock_reddit): """Verify fetch_submission_with_comments accepts context parameter.""" mock_submission = Mock() mock_submission.id = "test123" mock_submission.title = "Test" mock_submission.author = Mock() mock_submission.author.__str__ = Mock(return_value="testuser") mock_submission.score = 100 mock_submission.upvote_ratio = 0.95 mock_submission.num_comments = 0 mock_submission.created_utc = 1234567890.0 mock_submission.url = "https://reddit.com/test" mock_submission.selftext = "" mock_submission.subreddit = Mock() mock_submission.subreddit.display_name = "test" # Mock comments mock_comments = Mock() mock_comments.__iter__ = Mock(return_value=iter([])) mock_comments.replace_more = Mock() mock_submission.comments = mock_comments mock_reddit.submission.return_value = mock_submission result = await fetch_submission_with_comments( reddit=mock_reddit, submission_id="test123", comment_limit=10, ctx=mock_context ) assert "submission" in result or "error" in result class TestHelperFunctions: """Test helper functions accept context.""" def test_validate_subreddit_accepts_context(self, mock_context, monkeypatch): """Verify validate_subreddit accepts context parameter.""" # Mock the chroma client mock_client = Mock() mock_collection = Mock() mock_collection.query.return_value = { 'metadatas': [[ {'name': 'test', 'subscribers': 1000, 'nsfw': False} ]], 'distances': [[0.5]] } def mock_get_client(): return mock_client def mock_get_collection(name, client): return mock_collection monkeypatch.setattr('src.tools.discover.get_chroma_client', mock_get_client) monkeypatch.setattr('src.tools.discover.get_collection', mock_get_collection) result = validate_subreddit("test", ctx=mock_context) assert "valid" in result or "error" in result class TestContextParameterPosition: """Test that context parameter works in various positions.""" def test_context_as_last_param(self, mock_context, mock_reddit): """Verify context works as the last parameter.""" mock_subreddit = Mock() mock_subreddit.display_name = "test" mock_subreddit.search.return_value = [] mock_reddit.subreddit.return_value = mock_subreddit # Context is last parameter result = search_in_subreddit( subreddit_name="test", query="test", reddit=mock_reddit, sort="relevance", time_filter="all", limit=10, ctx=mock_context ) assert result is not None def test_context_with_defaults(self, mock_context, mock_reddit): """Verify context works with default parameters.""" mock_subreddit = Mock() mock_subreddit.display_name = "test" mock_subreddit.search.return_value = [] mock_reddit.subreddit.return_value = mock_subreddit # Only required params + context result = search_in_subreddit( subreddit_name="test", query="test", reddit=mock_reddit, ctx=mock_context ) assert result is not None class TestDiscoverSubredditsProgress: """Test progress reporting in discover_subreddits.""" async def test_reports_progress_during_search(self, mock_context, monkeypatch): """Verify progress is reported during vector search.""" # Mock ChromaDB response with 3 results mock_client = Mock() mock_collection = Mock() mock_collection.query.return_value = { 'metadatas': [[ {'name': 'Python', 'subscribers': 1000000, 'nsfw': False}, {'name': 'learnpython', 'subscribers': 500000, 'nsfw': False}, {'name': 'pythontips', 'subscribers': 100000, 'nsfw': False} ]], 'distances': [[0.5, 0.7, 0.9]] } # Setup async mock for progress mock_context.report_progress = AsyncMock() def mock_get_client(): return mock_client def 
        monkeypatch.setattr('src.tools.discover.get_chroma_client', mock_get_client)
        monkeypatch.setattr('src.tools.discover.get_collection', mock_get_collection)

        result = await discover_subreddits(query="python", ctx=mock_context)

        # Verify progress was reported at least 3 times (once per result)
        assert mock_context.report_progress.call_count >= 3

        # Verify progress parameters
        first_call = mock_context.report_progress.call_args_list[0]
        # Check if arguments were passed as kwargs or positional args
        if first_call[1]:  # kwargs
            assert 'progress' in first_call[1]
            assert 'total' in first_call[1]
        else:  # positional
            assert len(first_call[0]) >= 2


class TestFetchMultipleProgress:
    """Test progress reporting in fetch_multiple_subreddits."""

    async def test_reports_progress_per_subreddit(self, mock_context, mock_reddit):
        """Verify progress is reported once per subreddit."""
        # Setup async mock for progress
        mock_context.report_progress = AsyncMock()

        # Mock submissions from 3 different subreddits
        mock_sub1 = Mock()
        mock_sub1.subreddit.display_name = "sub1"
        mock_sub1.id = "id1"
        mock_sub1.title = "Title 1"
        mock_sub1.author = Mock()
        mock_sub1.author.__str__ = Mock(return_value="user1")
        mock_sub1.score = 100
        mock_sub1.num_comments = 10
        mock_sub1.created_utc = 1234567890.0
        mock_sub1.url = "https://reddit.com/test1"
        mock_sub1.permalink = "/r/sub1/comments/id1/"

        mock_sub2 = Mock()
        mock_sub2.subreddit.display_name = "sub2"
        mock_sub2.id = "id2"
        mock_sub2.title = "Title 2"
        mock_sub2.author = Mock()
        mock_sub2.author.__str__ = Mock(return_value="user2")
        mock_sub2.score = 200
        mock_sub2.num_comments = 20
        mock_sub2.created_utc = 1234567891.0
        mock_sub2.url = "https://reddit.com/test2"
        mock_sub2.permalink = "/r/sub2/comments/id2/"

        mock_sub3 = Mock()
        mock_sub3.subreddit.display_name = "sub3"
        mock_sub3.id = "id3"
        mock_sub3.title = "Title 3"
        mock_sub3.author = Mock()
        mock_sub3.author.__str__ = Mock(return_value="user3")
        mock_sub3.score = 300
        mock_sub3.num_comments = 30
        mock_sub3.created_utc = 1234567892.0
        mock_sub3.url = "https://reddit.com/test3"
        mock_sub3.permalink = "/r/sub3/comments/id3/"

        mock_multi = Mock()
        mock_multi.hot.return_value = [mock_sub1, mock_sub2, mock_sub3]
        mock_reddit.subreddit.return_value = mock_multi

        result = await fetch_multiple_subreddits(
            subreddit_names=["sub1", "sub2", "sub3"],
            reddit=mock_reddit,
            ctx=mock_context
        )

        # Verify progress was reported at least 3 times (once per subreddit)
        assert mock_context.report_progress.call_count >= 3


class TestFetchCommentsProgress:
    """Test progress reporting in fetch_submission_with_comments."""

    async def test_reports_progress_during_loading(self, mock_context, mock_reddit):
        """Verify progress is reported during comment loading."""
        # Setup async mock for progress
        mock_context.report_progress = AsyncMock()

        # Mock submission
        mock_submission = Mock()
        mock_submission.id = "test123"
        mock_submission.title = "Test"
        mock_submission.author = Mock()
        mock_submission.author.__str__ = Mock(return_value="testuser")
        mock_submission.score = 100
        mock_submission.upvote_ratio = 0.95
        mock_submission.num_comments = 5
        mock_submission.created_utc = 1234567890.0
        mock_submission.url = "https://reddit.com/test"
        mock_submission.selftext = ""
        mock_submission.subreddit = Mock()
        mock_submission.subreddit.display_name = "test"

        # Mock 5 comments
        mock_comments_list = []
        for i in range(5):
            mock_comment = Mock()
            mock_comment.id = f"comment{i}"
            mock_comment.body = f"Comment {i}"
            mock_comment.author = Mock()
            mock_comment.author.__str__ = Mock(return_value=f"user{i}")
Mock(return_value=f"user{i}") mock_comment.score = 10 * i mock_comment.created_utc = 1234567890.0 + i mock_comment.replies = [] mock_comments_list.append(mock_comment) mock_comments = Mock() mock_comments.__iter__ = Mock(return_value=iter(mock_comments_list)) mock_comments.replace_more = Mock() mock_submission.comments = mock_comments mock_reddit.submission.return_value = mock_submission result = await fetch_submission_with_comments( reddit=mock_reddit, submission_id="test123", comment_limit=10, ctx=mock_context ) # Verify progress was reported at least 6 times (5 comments + 1 completion) assert mock_context.report_progress.call_count >= 6 ``` -------------------------------------------------------------------------------- /specs/003-phase-2-progress-monitoring.md: -------------------------------------------------------------------------------- ```markdown # Phase 2: Progress Monitoring Implementation **Status:** Ready for Implementation **Created:** 2025-10-02 **Owner:** Engineering Team **Depends On:** Phase 1 (Context Integration) ✅ Complete ## Executive Summary This specification details Phase 2 of the FastMCP Context API integration: adding real-time progress reporting to long-running Reddit operations. With Phase 1 complete (all tools accept `Context`), this phase focuses on implementing `ctx.report_progress()` calls to provide visibility into multi-step operations. **Timeline:** 1-2 days **Effort:** Low (foundation already in place from Phase 1) ## Background ### Phase 1 Completion Summary Phase 1 successfully integrated the FastMCP `Context` parameter into all tool and operation functions: - ✅ All MCP tool functions accept `ctx: Context` - ✅ All operation functions accept and receive context - ✅ Helper functions updated with context forwarding - ✅ 15 tests passing (8 integration tests + 7 updated existing tests) **Current State:** Context is available but unused (commented as "Phase 1: Accept context but don't use it yet") ### Why Progress Monitoring? Reddit operations can be time-consuming: - **Vector search**: Searching thousands of subreddits and calculating confidence scores - **Multi-subreddit fetches**: Fetching posts from 5-10 communities sequentially - **Comment tree loading**: Parsing nested comment threads with hundreds of replies Progress monitoring provides: - Real-time feedback to users during long operations - Prevention of timeout errors by showing active progress - Better debugging visibility into operation performance - Enhanced user experience with progress indicators ## Goals 1. ✅ Report progress during vector search iterations (`discover_subreddits`) 2. ✅ Report progress per subreddit in batch fetches (`fetch_multiple_subreddits`) 3. ✅ Report progress during comment tree traversal (`fetch_submission_with_comments`) 4. ✅ Maintain all existing test coverage (15 tests must pass) 5. 
## Non-Goals

- Frontend progress UI (separate project)
- Progress for single-subreddit fetches (too fast to matter)
- Structured logging (Phase 3)
- Enhanced error handling (Phase 4)

## Implementation Plan

### Operation 1: discover_subreddits Progress

**File:** `src/tools/discover.py`
**Function:** `_search_vector_db()` (lines 101-239)
**Location:** Result processing loop (lines 137-188)

#### Current Code Pattern

```python
# Process results
processed_results = []
nsfw_filtered = 0

for metadata, distance in zip(
    results['metadatas'][0],
    results['distances'][0]
):
    # Skip NSFW if not requested
    if metadata.get('nsfw', False) and not include_nsfw:
        nsfw_filtered += 1
        continue

    # Calculate confidence score...
    # Apply penalties...
    # Determine match type...
    processed_results.append({...})
```

#### Enhanced Implementation

```python
# Process results
processed_results = []
nsfw_filtered = 0
total_results = len(results['metadatas'][0])

for i, (metadata, distance) in enumerate(zip(
    results['metadatas'][0],
    results['distances'][0]
)):
    # Report progress (async call required)
    if ctx:
        await ctx.report_progress(
            progress=i + 1,
            total=total_results,
            message=f"Analyzing r/{metadata.get('name', 'unknown')}"
        )

    # Skip NSFW if not requested
    if metadata.get('nsfw', False) and not include_nsfw:
        nsfw_filtered += 1
        continue

    # Calculate confidence score...
    # Apply penalties...
    # Determine match type...
    processed_results.append({...})
```

#### Changes Required

1. **Make function async**: Change `def _search_vector_db(...)` → `async def _search_vector_db(...)`
2. **Make parent function async**: Change `def discover_subreddits(...)` → `async def discover_subreddits(...)`
3. **Add await to calls**: Update `discover_subreddits` to `await _search_vector_db(...)`
4. **Add progress in loop**: Insert `await ctx.report_progress(...)` before processing each result
5. **Calculate total**: Add `total_results = len(results['metadatas'][0])` before loop

**Progress Events:** ~10-100 (depending on limit parameter)
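Taken together, changes 1-3 amount to making the call chain async end-to-end. A minimal sketch of what that looks like, with parameter lists abbreviated and bodies elided (the real signatures live in `src/tools/discover.py`; the `ctx: Context | None = None` default here is an assumption that mirrors the `if ctx:` guard used above):

```python
from fastmcp import Context

# Sketch only: the tool-level function awaits the now-async helper,
# forwarding the context it received from FastMCP.
async def discover_subreddits(
    query: str, limit: int = 10, ctx: Context | None = None
) -> dict:
    return await _search_vector_db(query=query, limit=limit, ctx=ctx)

async def _search_vector_db(
    query: str, limit: int, ctx: Context | None = None
) -> dict:
    ...  # query ChromaDB, then run the progress-reporting loop shown above
```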
---

### Operation 2: fetch_multiple_subreddits Progress

**File:** `src/tools/posts.py`
**Function:** `fetch_multiple_subreddits()` (lines 102-188)
**Location:** Subreddit iteration loop (lines 153-172)

#### Current Code Pattern

```python
# Parse posts and group by subreddit
posts_by_subreddit = {}

for submission in submissions:
    subreddit_name = submission.subreddit.display_name

    if subreddit_name not in posts_by_subreddit:
        posts_by_subreddit[subreddit_name] = []

    # Only add up to limit_per_subreddit posts per subreddit
    if len(posts_by_subreddit[subreddit_name]) < limit_per_subreddit:
        posts_by_subreddit[subreddit_name].append({...})
```

#### Enhanced Implementation

```python
# Parse posts and group by subreddit
posts_by_subreddit = {}
processed_subreddits = set()

for i, submission in enumerate(submissions):
    subreddit_name = submission.subreddit.display_name

    # Report progress when encountering a new subreddit
    if subreddit_name not in processed_subreddits:
        processed_subreddits.add(subreddit_name)
        if ctx:
            await ctx.report_progress(
                progress=len(processed_subreddits),
                total=len(subreddit_names),
                message=f"Fetching r/{subreddit_name}"
            )

    if subreddit_name not in posts_by_subreddit:
        posts_by_subreddit[subreddit_name] = []

    # Only add up to limit_per_subreddit posts per subreddit
    if len(posts_by_subreddit[subreddit_name]) < limit_per_subreddit:
        posts_by_subreddit[subreddit_name].append({...})
```

#### Changes Required

1. **Make function async**: Change `def fetch_multiple_subreddits(...)` → `async def fetch_multiple_subreddits(...)`
2. **Track processed subreddits**: Add `processed_subreddits = set()` before loop
3. **Add progress on new subreddit**: When a new subreddit is encountered, report progress
4. **Update server.py**: Add `await` when calling this function in `execute_operation()`

**Progress Events:** 1-10 (one per unique subreddit found)

---

### Operation 3: fetch_submission_with_comments Progress

**File:** `src/tools/comments.py`
**Function:** `fetch_submission_with_comments()` (lines 47-147)
**Location:** Comment parsing loop (lines 116-136)

#### Current Code Pattern

```python
# Parse comments
comments = []
comment_count = 0

for top_level_comment in submission.comments:
    if hasattr(top_level_comment, 'id') and hasattr(top_level_comment, 'body'):
        if comment_count >= comment_limit:
            break

        if isinstance(top_level_comment, PrawComment):
            comments.append(parse_comment_tree(top_level_comment, ctx=ctx))
        else:
            # Handle mock objects in tests
            comments.append(Comment(...))

        # Count all comments including replies
        comment_count += 1 + count_replies(comments[-1])
```

#### Enhanced Implementation

```python
# Parse comments
comments = []
comment_count = 0

for top_level_comment in submission.comments:
    if hasattr(top_level_comment, 'id') and hasattr(top_level_comment, 'body'):
        if comment_count >= comment_limit:
            break

        # Report progress before processing comment
        if ctx:
            await ctx.report_progress(
                progress=comment_count,
                total=comment_limit,
                message=f"Loading comments ({comment_count}/{comment_limit})"
            )

        if isinstance(top_level_comment, PrawComment):
            comments.append(parse_comment_tree(top_level_comment, ctx=ctx))
        else:
            # Handle mock objects in tests
            comments.append(Comment(...))

        # Count all comments including replies
        comment_count += 1 + count_replies(comments[-1])

# Report final completion
if ctx:
    await ctx.report_progress(
        progress=comment_count,
        total=comment_limit,
        message=f"Completed: {comment_count} comments loaded"
    )
```

#### Changes Required

1. **Make function async**: Change `def fetch_submission_with_comments(...)` → `async def fetch_submission_with_comments(...)`
2. **Add progress in loop**: Insert `await ctx.report_progress(...)` before parsing each top-level comment
3. **Add completion progress**: Report final progress after the loop completes
4. **Update server.py**: Add `await` when calling this function in `execute_operation()`

**Progress Events:** ~5-100 (depending on comment_limit and tree depth)

---

## FastMCP Progress Patterns

### Basic Pattern (from FastMCP docs)

```python
from fastmcp import FastMCP, Context

@mcp.tool
async def process_items(items: list[str], ctx: Context) -> dict:
    """Process a list of items with progress updates."""
    total = len(items)
    results = []

    for i, item in enumerate(items):
        # Report progress as we process each item
        await ctx.report_progress(progress=i, total=total)
        results.append(item.upper())

    # Report 100% completion
    await ctx.report_progress(progress=total, total=total)

    return {"processed": len(results), "results": results}
```

### Key Requirements

1. **Functions must be async** to use `await ctx.report_progress()`
2. **Progress parameter**: Current progress value (e.g., 5, 24, 0.75)
3. **Total parameter**: Optional total value (enables percentage calculation)
4. **Message parameter**: Optional descriptive message (not shown in the example above but supported)

### Best Practices

- Report at regular intervals (every iteration for small loops)
- Provide descriptive messages when possible
- Report final completion (100%)
- Don't spam the client - keep the frequency reasonable (aim for at least 5-10 meaningful events per operation without flooding the transport)
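For loops much larger than a few hundred iterations, the first and last guidelines pull in opposite directions. A hedged sketch of one way to balance them - the tool name, `process_many`, and the `report_every` stride are illustrative, not part of this codebase:

```python
from fastmcp import FastMCP, Context

mcp = FastMCP("progress-demo")

@mcp.tool
async def process_many(items: list[str], ctx: Context) -> dict:
    """Process a large list while capping progress traffic at ~20 events."""
    total = len(items)
    report_every = max(1, total // 20)  # stride so we emit at most ~20 updates

    for i, item in enumerate(items):
        if i % report_every == 0:
            await ctx.report_progress(
                progress=i,
                total=total,
                message=f"Processing item {i + 1} of {total}"
            )
        # ... actual per-item work goes here ...

    # Always report 100% so clients can close their progress indicators
    await ctx.report_progress(progress=total, total=total, message="Done")
    return {"processed": total}
```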
## Testing Requirements

### Update Existing Tests

**File:** `tests/test_context_integration.py`

Add assertions to verify progress calls:

```python
import pytest
from unittest.mock import AsyncMock, MagicMock, patch

class TestDiscoverSubredditsProgress:
    """Test progress reporting in discover_subreddits."""

    @pytest.mark.asyncio
    async def test_reports_progress_during_search(self, mock_context):
        """Verify progress is reported during vector search."""
        # Mock ChromaDB response with 3 results
        mock_collection = MagicMock()
        mock_collection.query.return_value = {
            'metadatas': [[
                {'name': 'Python', 'subscribers': 1000000, 'nsfw': False},
                {'name': 'learnpython', 'subscribers': 500000, 'nsfw': False},
                {'name': 'pythontips', 'subscribers': 100000, 'nsfw': False}
            ]],
            'distances': [[0.5, 0.7, 0.9]]
        }

        # Setup async mock for progress
        mock_context.report_progress = AsyncMock()

        with patch('src.tools.discover.get_chroma_client'), \
             patch('src.tools.discover.get_collection', return_value=mock_collection):
            result = await discover_subreddits(query="python", ctx=mock_context)

        # Verify progress was reported at least 3 times (once per result)
        assert mock_context.report_progress.call_count >= 3

        # Verify progress parameters
        first_call = mock_context.report_progress.call_args_list[0]
        assert 'progress' in first_call[1] or len(first_call[0]) >= 1
        assert 'total' in first_call[1] or len(first_call[0]) >= 2
```

### New Test Coverage

Add similar tests for:

- `test_fetch_multiple_subreddits_progress` - Verify progress per subreddit
- `test_fetch_comments_progress` - Verify progress during comment loading

### Success Criteria

- ✅ All existing 15 tests still pass
- ✅ New progress assertion tests pass
- ✅ Progress called at least 5 times per operation (varies by data)
- ✅ No performance degradation (progress overhead <5%)

## Server.py Updates

**File:** `src/server.py`
**Functions:** Update calls to async operations

### Current Pattern

```python
@mcp.tool
def execute_operation(
    operation_id: str,
    parameters: dict,
    ctx: Context
) -> dict:
    """Execute a Reddit operation by ID."""
    if operation_id == "discover_subreddits":
        return discover_subreddits(**params)
```

### Updated Pattern

```python
@mcp.tool
async def execute_operation(
    operation_id: str,
    parameters: dict,
    ctx: Context
) -> dict:
    """Execute a Reddit operation by ID."""
    if operation_id == "discover_subreddits":
        return await discover_subreddits(**params)
```

### Changes Required

1. **Make execute_operation async**: `async def execute_operation(...)`
2. **Add await to async operations**:
   - `await discover_subreddits(**params)`
   - `await fetch_multiple_subreddits(**params)`
   - `await fetch_submission_with_comments(**params)`
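Since only three operations become coroutines, the dispatcher ends up mixing awaited and direct calls. A sketch of the resulting shape, assuming a module-level `reddit` client and the `params` plumbing elided in the patterns above (both assumptions, not verbatim from `src/server.py`):

```python
@mcp.tool
async def execute_operation(operation_id: str, parameters: dict, ctx: Context) -> dict:
    """Execute a Reddit operation by ID (sketch; validation elided)."""
    params = {**parameters, "ctx": ctx}

    # Async operations (Phase 2) must be awaited
    if operation_id == "discover_subreddits":
        return await discover_subreddits(**params)
    if operation_id == "fetch_multiple_subreddits":
        return await fetch_multiple_subreddits(reddit=reddit, **params)
    if operation_id == "fetch_submission_with_comments":
        return await fetch_submission_with_comments(reddit=reddit, **params)

    # Remaining operations stay synchronous and are called directly
    if operation_id == "search_in_subreddit":
        return search_in_subreddit(reddit=reddit, **params)
    if operation_id == "fetch_subreddit_posts":
        return fetch_subreddit_posts(reddit=reddit, **params)

    return {"error": f"Unknown operation: {operation_id}"}
```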
## Implementation Checklist

### Code Changes

- [ ] **src/tools/discover.py**
  - [ ] Make `discover_subreddits()` async
  - [ ] Make `_search_vector_db()` async
  - [ ] Add `await` to `_search_vector_db()` call
  - [ ] Add progress reporting in result processing loop
  - [ ] Calculate total before loop starts

- [ ] **src/tools/posts.py**
  - [ ] Make `fetch_multiple_subreddits()` async
  - [ ] Add `processed_subreddits` tracking set
  - [ ] Add progress reporting when new subreddit encountered

- [ ] **src/tools/comments.py**
  - [ ] Make `fetch_submission_with_comments()` async
  - [ ] Add progress reporting in comment parsing loop
  - [ ] Add final completion progress report

- [ ] **src/server.py**
  - [ ] Make `execute_operation()` async
  - [ ] Add `await` to `discover_subreddits()` call
  - [ ] Add `await` to `fetch_multiple_subreddits()` call
  - [ ] Add `await` to `fetch_submission_with_comments()` call

### Testing

- [ ] Update `tests/test_context_integration.py`
  - [ ] Add progress test for `discover_subreddits`
  - [ ] Add progress test for `fetch_multiple_subreddits`
  - [ ] Add progress test for `fetch_submission_with_comments`
- [ ] Run full test suite: `pytest tests/`
  - [ ] All 15 existing tests pass
  - [ ] New progress tests pass
  - [ ] No regressions

### Validation

- [ ] Manual testing with MCP Inspector or Claude Desktop
- [ ] Verify progress events appear in client logs
- [ ] Confirm no performance degradation
- [ ] Check that messages are descriptive and useful

## File Summary

### Files to Modify (4 files)

1. `src/tools/discover.py` - Add progress to vector search
2. `src/tools/posts.py` - Add progress to batch fetches
3. `src/tools/comments.py` - Add progress to comment loading
4. `src/server.py` - Make execute_operation async + await calls

### Files to Update (1 file)
1. `tests/test_context_integration.py` - Add progress assertions

### Files Not Modified

- `src/config.py` - No changes needed
- `src/models.py` - No changes needed
- `src/chroma_client.py` - No changes needed
- `src/resources.py` - No changes needed
- `tests/test_tools.py` - No changes needed (already passing)

## Success Criteria

### Functional Requirements

- ✅ Progress events emitted during vector search (≥5 per search)
- ✅ Progress events emitted during multi-subreddit fetch (1 per subreddit)
- ✅ Progress events emitted during comment loading (≥5 per fetch)
- ✅ Progress includes total when known
- ✅ Progress messages are descriptive

### Technical Requirements

- ✅ All functions properly async/await
- ✅ All 15+ tests pass
- ✅ No breaking changes to API
- ✅ Type hints maintained
- ✅ No performance degradation

### Quality Requirements

- ✅ Progress messages are user-friendly
- ✅ Progress updates at reasonable frequency (not spammy)
- ✅ Code follows FastMCP patterns from official docs
- ✅ Maintains consistency with Phase 1 implementation

## Estimated Effort

**Total Time:** 1-2 days

**Breakdown:**

- Code implementation: 3-4 hours
- Testing updates: 2-3 hours
- Manual validation: 1 hour
- Bug fixes & refinement: 1-2 hours

**Reduced from master spec (3-4 days)** because:

- Phase 1 foundation complete (Context integration done)
- Clear patterns established in codebase
- Limited scope (3 operations only)
- Existing test infrastructure in place

## Next Steps

After Phase 2 completion:

- **Phase 3**: Structured Logging (2-3 days)
- **Phase 4**: Enhanced Error Handling (2 days)
- **Phase 5**: Testing & Validation (1 day)

## References

- [FastMCP Progress Documentation](../ai-docs/fastmcp/docs/servers/progress.mdx)
- [FastMCP Context API](../ai-docs/fastmcp/docs/servers/context.mdx)
- [Phase 1 Completion Summary](./003-phase-1-context-integration.md) *(if created)*
- [Master Specification](./003-fastmcp-context-integration.md)
- Current Implementation: `src/server.py`, `src/tools/*.py`
```

--------------------------------------------------------------------------------
/specs/003-phase-1-context-integration.md:
--------------------------------------------------------------------------------

```markdown
# Phase 1: Context Integration - Detailed Specification

**Status:** Ready for Implementation
**Created:** 2025-10-02
**Phase Duration:** Days 1-2
**Owner:** Engineering Team
**Parent Spec:** [003-fastmcp-context-integration.md](./003-fastmcp-context-integration.md)

## Objective

Enable all tool functions in the Reddit MCP server to receive and utilize FastMCP's Context API. This phase establishes the foundation for progress monitoring, structured logging, and enhanced error handling in subsequent phases.

## Background

FastMCP's Context API is automatically injected into tool functions decorated with `@mcp.tool`. The context object provides methods for:

- Progress reporting: `ctx.report_progress(current, total, message)`
- Structured logging: `ctx.info()`, `ctx.warning()`, `ctx.error()`, `ctx.debug()`
- Error context: Rich error information via structured logging

To use these features, all tool functions must accept a `Context` parameter. This phase focuses solely on adding the context parameter to function signatures—no actual usage of context methods yet.
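For orientation, a hedged sketch of the Context surface these later phases build toward (the tool name and body are illustrative only; Phase 1 itself adds none of these calls):

```python
from fastmcp import FastMCP, Context

mcp = FastMCP("example-server")

@mcp.tool
async def example_tool(query: str, ctx: Context) -> dict:
    """Illustrates the Context methods listed above; not Phase 1 scope."""
    await ctx.info(f"Starting work on {query!r}")     # structured logging (Phase 3)
    await ctx.report_progress(progress=0, total=2)    # progress reporting (Phase 2)
    # ... do the actual work ...
    await ctx.report_progress(progress=2, total=2)
    return {"status": "ok"}
```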
## Goals

1. **Add Context Parameter**: Update all tool function signatures to accept `ctx: Context`
2. **Maintain Type Safety**: Preserve all type hints and ensure type checking passes
3. **Verify Auto-Injection**: Confirm FastMCP's decorator system injects context correctly
4. **Test Compatibility**: Ensure all existing tests pass with updated signatures

## Non-Goals

- Using context methods (progress, logging, error handling) - Phase 2+
- Adding new tool functions or operations
- Modifying MCP protocol or client interfaces
- Performance optimization or refactoring

## Implementation Details

### Context Parameter Pattern

FastMCP automatically injects `Context` when tools are decorated with `@mcp.tool`:

```python
from fastmcp import Context

@mcp.tool
def my_tool(param: str, ctx: Context) -> dict:
    # Context is automatically injected by FastMCP
    # No usage required in Phase 1 - just accept the parameter
    return {"result": "data"}
```

**Important Notes:**

- Context is a **required** parameter (not optional)
- Position in signature: Place after all other parameters (after a bare `*` when the preceding parameters have defaults, so the signature remains valid Python)
- Type hint must be `Context` (imported from `fastmcp`)
- No default value needed - FastMCP injects automatically

### Files to Modify

#### 1. `src/tools/discover.py`

**Functions to update:**

- `discover_subreddits(query: str, limit: int = 10) -> dict`
- `get_subreddit_info(subreddit_name: str) -> dict`

**Before:**

```python
def discover_subreddits(query: str, limit: int = 10) -> dict:
    """Search vector database for relevant subreddits."""
    results = search_vector_db(query, limit)
    return {
        "subreddits": [format_subreddit(r) for r in results],
        "count": len(results)
    }
```

**After:**

```python
from fastmcp import Context

def discover_subreddits(
    query: str,
    limit: int = 10,
    *,
    ctx: Context
) -> dict:
    """Search vector database for relevant subreddits."""
    # Phase 1: Accept context but don't use it yet
    results = search_vector_db(query, limit)
    return {
        "subreddits": [format_subreddit(r) for r in results],
        "count": len(results)
    }
```

**Estimated Time:** 30 minutes

---

#### 2. `src/tools/posts.py`

**Functions to update:**

- `fetch_subreddit_posts(subreddit_name: str, limit: int = 10, time_filter: str = "all", sort: str = "hot") -> dict`
- `fetch_multiple_subreddits(subreddit_names: list[str], limit_per_subreddit: int = 10) -> dict`
- `get_post_details(post_id: str) -> dict`

**Before:**

```python
def fetch_subreddit_posts(
    subreddit_name: str,
    limit: int = 10,
    time_filter: str = "all",
    sort: str = "hot"
) -> dict:
    """Fetch posts from a subreddit."""
    subreddit = reddit.subreddit(subreddit_name)
    posts = list(subreddit.hot(limit=limit))
    return {"posts": [format_post(p) for p in posts]}
```

**After:**

```python
from fastmcp import Context

def fetch_subreddit_posts(
    subreddit_name: str,
    limit: int = 10,
    time_filter: str = "all",
    sort: str = "hot",
    *,
    ctx: Context
) -> dict:
    """Fetch posts from a subreddit."""
    # Phase 1: Accept context but don't use it yet
    subreddit = reddit.subreddit(subreddit_name)
    posts = list(subreddit.hot(limit=limit))
    return {"posts": [format_post(p) for p in posts]}
```

**Estimated Time:** 45 minutes

---
#### 3. `src/tools/comments.py`

**Functions to update:**

- `fetch_submission_with_comments(submission_id: str, comment_limit: int = 50, comment_sort: str = "best") -> dict`
- `get_comment_thread(comment_id: str, depth: int = 5) -> dict`

**Before:**

```python
def fetch_submission_with_comments(
    submission_id: str,
    comment_limit: int = 50,
    comment_sort: str = "best"
) -> dict:
    """Fetch submission and its comments."""
    submission = reddit.submission(id=submission_id)
    comments = fetch_comments(submission, comment_limit, comment_sort)
    return {
        "submission": format_submission(submission),
        "comments": comments
    }
```

**After:**

```python
from fastmcp import Context

def fetch_submission_with_comments(
    submission_id: str,
    comment_limit: int = 50,
    comment_sort: str = "best",
    *,
    ctx: Context
) -> dict:
    """Fetch submission and its comments."""
    # Phase 1: Accept context but don't use it yet
    submission = reddit.submission(id=submission_id)
    comments = fetch_comments(submission, comment_limit, comment_sort)
    return {
        "submission": format_submission(submission),
        "comments": comments
    }
```

**Estimated Time:** 30 minutes

---

#### 4. `src/tools/search.py`

**Functions to update:**

- `search_subreddit(subreddit_name: str, query: str, limit: int = 10, time_filter: str = "all", sort: str = "relevance") -> dict`

**Before:**

```python
def search_subreddit(
    subreddit_name: str,
    query: str,
    limit: int = 10,
    time_filter: str = "all",
    sort: str = "relevance"
) -> dict:
    """Search within a specific subreddit."""
    subreddit = reddit.subreddit(subreddit_name)
    results = subreddit.search(query, limit=limit, time_filter=time_filter, sort=sort)
    return {"results": [format_post(r) for r in results]}
```

**After:**

```python
from fastmcp import Context

def search_subreddit(
    subreddit_name: str,
    query: str,
    limit: int = 10,
    time_filter: str = "all",
    sort: str = "relevance",
    *,
    ctx: Context
) -> dict:
    """Search within a specific subreddit."""
    # Phase 1: Accept context but don't use it yet
    subreddit = reddit.subreddit(subreddit_name)
    results = subreddit.search(query, limit=limit, time_filter=time_filter, sort=sort)
    return {"results": [format_post(r) for r in results]}
```

**Estimated Time:** 20 minutes

---

#### 5. `src/server.py`

**Changes needed:**

- Import Context from fastmcp
- Verify execute_operation passes context to tools (FastMCP handles this automatically)
- No signature changes needed for execute_operation itself

**Before:**

```python
# At top of file
from fastmcp import FastMCP

mcp = FastMCP("Reddit Research MCP")
```

**After:**

```python
# At top of file
from fastmcp import FastMCP, Context

mcp = FastMCP("Reddit Research MCP")

# No other changes needed - FastMCP auto-injects context
```

**Estimated Time:** 10 minutes

---

### Helper Functions

**Internal helper functions** (not decorated with `@mcp.tool`) that need context should also accept it:

```python
# Helper function called by tool
def fetch_comments(submission, limit: int, sort: str, ctx: Context) -> list:
    """Internal helper for fetching comments."""
    # Phase 1: Accept context but don't use it yet
    submission.comment_sort = sort
    submission.comments.replace_more(limit=0)
    return list(submission.comments.list()[:limit])
```

**Functions to check:**

- `src/tools/discover.py`: `search_vector_db()`, `format_subreddit()`
- `src/tools/posts.py`: `format_post()`
- `src/tools/comments.py`: `fetch_comments()`, `format_comment()`

**Decision rule:** Only add context to helpers that will need it in Phase 2+ (for logging/progress). Review each helper and add the context parameter if (see the sketch after this list):

1. It performs I/O operations (API calls, database queries)
2. It contains loops that could benefit from progress reporting
3. It has error handling that would benefit from context logging
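Applied to the helpers listed above, the rule plausibly splits them like this - a sketch under that assumption, not a verdict on the actual code:

```python
from fastmcp import Context

# Qualifies: does I/O against the Reddit API, so Phase 2+ will want
# progress/logging here - give it ctx now.
def fetch_comments(submission, limit: int, sort: str, ctx: Context) -> list:
    submission.comment_sort = sort
    submission.comments.replace_more(limit=0)
    return list(submission.comments.list()[:limit])

# Does not qualify: pure in-memory formatting, no I/O, nothing to report -
# leave its signature untouched.
def format_post(post) -> dict:
    return {"id": post.id, "title": post.title, "score": post.score}
```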
**Estimated Time:** 30 minutes

---

## Testing Strategy

### Unit Tests

Update existing tests in `tests/test_tools.py` to pass context:

**Before:**

```python
def test_discover_subreddits():
    result = discover_subreddits("machine learning", limit=5)
    assert result["count"] == 5
```

**After:**

```python
from unittest.mock import Mock
from fastmcp import Context

def test_discover_subreddits():
    # Create mock context for testing
    mock_ctx = Mock(spec=Context)
    result = discover_subreddits("machine learning", limit=5, ctx=mock_ctx)
    assert result["count"] == 5
```

**Note:** FastMCP provides test utilities for creating context objects. Consult the FastMCP testing documentation for best practices.

### Integration Tests

**New test file:** `tests/test_context_integration.py`

```python
import pytest
from unittest.mock import Mock
from fastmcp import Context

from src.tools.discover import discover_subreddits
from src.tools.posts import fetch_subreddit_posts
from src.tools.comments import fetch_submission_with_comments
from src.tools.search import search_subreddit


@pytest.fixture
def mock_context():
    """Create a mock Context object for testing."""
    return Mock(spec=Context)


def test_discover_accepts_context(mock_context):
    """Verify discover_subreddits accepts context parameter."""
    result = discover_subreddits("test query", limit=5, ctx=mock_context)
    assert "subreddits" in result


def test_fetch_posts_accepts_context(mock_context):
    """Verify fetch_subreddit_posts accepts context parameter."""
    result = fetch_subreddit_posts("python", limit=5, ctx=mock_context)
    assert "posts" in result


def test_fetch_comments_accepts_context(mock_context):
    """Verify fetch_submission_with_comments accepts context parameter."""
    result = fetch_submission_with_comments("test_id", comment_limit=10, ctx=mock_context)
    assert "submission" in result
    assert "comments" in result


def test_search_accepts_context(mock_context):
    """Verify search_subreddit accepts context parameter."""
    result = search_subreddit("python", "testing", limit=5, ctx=mock_context)
    assert "results" in result
```

**Estimated Time:** 1 hour

---

## Success Criteria

### Phase 1 Completion Checklist

- [ ] All functions in `src/tools/discover.py` accept `ctx: Context`
- [ ] All functions in `src/tools/posts.py` accept `ctx: Context`
- [ ] All functions in `src/tools/comments.py` accept `ctx: Context`
- [ ] All functions in `src/tools/search.py` accept `ctx: Context`
- [ ] `src/server.py` imports Context from fastmcp
- [ ] All relevant helper functions accept context parameter
- [ ] All existing unit tests updated to pass context
- [ ] New integration tests created in `tests/test_context_integration.py`
- [ ] All tests pass: `pytest tests/`
- [ ] Type checking passes: `mypy src/`
- [ ] No regressions in existing functionality

### Validation Commands

```bash
# Run all tests
pytest tests/ -v

# Type checking
mypy src/

# Verify no breaking changes
pytest tests/test_tools.py -v
```

---

## Implementation Order

1. **Day 1 Morning (2 hours)**
   - Update `src/tools/discover.py` (30 min)
   - Update `src/tools/posts.py` (45 min)
   - Update `src/tools/comments.py` (30 min)
   - Update `src/tools/search.py` (20 min)

2. **Day 1 Afternoon (2 hours)**
   - Update `src/server.py` (10 min)
   - Review and update helper functions (30 min)
   - Update existing unit tests (1 hour)
   - Run full test suite and fix issues (20 min)
3. **Day 2 Morning (2 hours)**
   - Create `tests/test_context_integration.py` (1 hour)
   - Run all validation commands (30 min)
   - Code review and cleanup (30 min)

4. **Day 2 Afternoon (1 hour)**
   - Final testing and validation
   - Documentation updates (if needed)
   - Prepare for Phase 2

**Total Estimated Time:** 7 hours over 2 days

---

## Dependencies

### Required Packages

- `fastmcp>=2.0.0` (already installed)
- `pytest>=7.0.0` (already installed for testing)
- `mypy>=1.0.0` (recommended for type checking)

### External Dependencies

- None - this phase only modifies function signatures

### Knowledge Prerequisites

- FastMCP decorator system and auto-injection
- Python type hints and type checking
- Pytest fixture system for mocking

---

## Risks & Mitigations

| Risk | Likelihood | Impact | Mitigation |
|------|------------|--------|------------|
| Breaking existing tests | Medium | High | Update tests incrementally, verify after each file |
| Type checking errors | Low | Medium | Use `Mock(spec=Context)` for type-safe mocking |
| FastMCP auto-injection not working | Low | High | Verify with simple test case first; consult docs |
| Forgetting helper functions | Medium | Medium | Grep codebase for all function definitions, review systematically |

---

## Code Review Checklist

Before marking Phase 1 complete, verify:

- [ ] All tool functions have `ctx: Context` as the last parameter
- [ ] Type hints are correct: `ctx: Context` (not `ctx: Optional[Context]`)
- [ ] Import statements include `from fastmcp import Context`
- [ ] Helper functions that need context receive it
- [ ] Test mocks use `Mock(spec=Context)` for type safety
- [ ] No actual usage of context methods (that's Phase 2+)
- [ ] All tests pass without errors or warnings
- [ ] Type checking passes with mypy

---

## Next Steps

Upon successful completion of Phase 1:

1. **Phase 2: Progress Monitoring** - Add `ctx.report_progress()` calls
2. **Phase 3: Structured Logging** - Add `ctx.info()`, `ctx.warning()`, `ctx.error()`
3. **Phase 4: Enhanced Error Handling** - Use context in error scenarios
4. **Phase 5: Testing & Validation** - Comprehensive integration testing

---

## References

- [FastMCP Context API Documentation](../ai-docs/fastmcp/docs/python-sdk/fastmcp-server-context.mdx)
- [FastMCP Tool Decorator Pattern](../ai-docs/fastmcp/docs/python-sdk/fastmcp-server-tool.mdx)
- [Parent Specification](./003-fastmcp-context-integration.md)
- Current Implementation: `src/server.py`

---

## Appendix: Complete Example

**Full example showing before/after for a complete tool function:**

**Before (existing code):**

```python
# src/tools/posts.py
from src.reddit_client import reddit

def fetch_subreddit_posts(
    subreddit_name: str,
    limit: int = 10,
    time_filter: str = "all",
    sort: str = "hot"
) -> dict:
    """
    Fetch posts from a subreddit.
    Args:
        subreddit_name: Name of the subreddit
        limit: Number of posts to fetch
        time_filter: Time filter (all, day, week, month, year)
        sort: Sort method (hot, new, top, rising)

    Returns:
        Dictionary with posts and metadata
    """
    try:
        subreddit = reddit.subreddit(subreddit_name)

        # Get posts based on sort method
        if sort == "hot":
            posts = list(subreddit.hot(limit=limit))
        elif sort == "new":
            posts = list(subreddit.new(limit=limit))
        elif sort == "top":
            posts = list(subreddit.top(time_filter=time_filter, limit=limit))
        elif sort == "rising":
            posts = list(subreddit.rising(limit=limit))
        else:
            raise ValueError(f"Invalid sort method: {sort}")

        return {
            "success": True,
            "subreddit": subreddit_name,
            "posts": [format_post(p) for p in posts],
            "count": len(posts)
        }
    except Exception as e:
        return {
            "success": False,
            "error": str(e),
            "subreddit": subreddit_name
        }
```

**After (Phase 1 changes):**

```python
# src/tools/posts.py
from fastmcp import Context
from src.reddit_client import reddit

def fetch_subreddit_posts(
    subreddit_name: str,
    limit: int = 10,
    time_filter: str = "all",
    sort: str = "hot",
    *,
    ctx: Context  # ← ONLY CHANGE IN PHASE 1
) -> dict:
    """
    Fetch posts from a subreddit.

    Args:
        subreddit_name: Name of the subreddit
        limit: Number of posts to fetch
        time_filter: Time filter (all, day, week, month, year)
        sort: Sort method (hot, new, top, rising)
        ctx: FastMCP context (auto-injected)

    Returns:
        Dictionary with posts and metadata
    """
    # Phase 1: Context accepted but not used yet
    # Phase 2+ will add: ctx.report_progress(), ctx.info(), etc.
    try:
        subreddit = reddit.subreddit(subreddit_name)

        # Get posts based on sort method
        if sort == "hot":
            posts = list(subreddit.hot(limit=limit))
        elif sort == "new":
            posts = list(subreddit.new(limit=limit))
        elif sort == "top":
            posts = list(subreddit.top(time_filter=time_filter, limit=limit))
        elif sort == "rising":
            posts = list(subreddit.rising(limit=limit))
        else:
            raise ValueError(f"Invalid sort method: {sort}")

        return {
            "success": True,
            "subreddit": subreddit_name,
            "posts": [format_post(p) for p in posts],
            "count": len(posts)
        }
    except Exception as e:
        return {
            "success": False,
            "error": str(e),
            "subreddit": subreddit_name
        }
```

**Key observations:**

1. Only the function signature changed
2. The new parameter is documented in the docstring
3. No logic changes - context not used yet
4. A comment indicates Phase 1 status
```

--------------------------------------------------------------------------------
/reports/ai-llm-weekly-trends-reddit-analysis-2025-01-20.md:
--------------------------------------------------------------------------------

```markdown
# AI & LLM Trends on Reddit: Weekly Analysis (January 13-20, 2025)

## Summary

The AI community on Reddit experienced a watershed week marked by OpenAI's release of GPT-5-Codex, explosive growth in hardware hacking for local AI, and an intensifying rivalry between AI companies reflected in both technical achievements and marketing strategies. The conversation revealed a striking shift: while early AI adoption was dominated by technical users focused on coding applications, the technology has now reached mainstream adoption, with women comprising 52% of users and only 4% of conversations involving programming tasks. This democratization coincides with growing frustration about incremental improvements among power users, who are increasingly turning to extreme measures—including flying to Shenzhen to purchase modded GPUs with expanded VRAM—to run local models.
The week also highlighted a fundamental tension between corporate AI advancement and open-source alternatives, with Chinese companies releasing competitive models while simultaneously being banned from purchasing NVIDIA chips, creating a complex geopolitical landscape around AI development.

## The Conversation Landscape

The AI discussion on Reddit spans from hardcore technical implementation in r/LocalLLaMA, where users share stories of building custom GPU rigs and flying to China for hardware, to mainstream adoption conversations in r/ChatGPT dominated by memes and practical use cases, with r/singularity serving as the philosophical battleground for debates about AGI timelines and societal impact. The gender flip in AI usage—from 80% male to 52% female users—has fundamentally changed the tone of discussions, moving from technical specifications to practical applications and creative uses.

Key communities analyzed:

- **r/ChatGPT** (11M subscribers): Mainstream user experiences, memes, and practical applications
- **r/LocalLLaMA** (522K subscribers): Hardware hacking, open-source models, and technical deep dives
- **r/singularity** (3.7M subscribers): AGI speculation, industry developments, and philosophical implications
- **r/OpenAI** (2.4M subscribers): Company-specific news, model releases, and corporate drama
- **r/ClaudeAI** (311K subscribers): Anthropic's community focused on Claude's capabilities and comparisons
- **r/AI_Agents** (191K subscribers): Agent development, practical implementations, and ROI discussions
- **r/ChatGPTPro** (486K subscribers): Power user strategies and professional applications

## Major Themes

### Theme 1: The GPT-5-Codex Revolution and the "Post-Programming" Era

OpenAI's release of GPT-5-Codex dominated technical discussions across multiple subreddits, with performance improvements showing a jump from 33.9% to 51.3% accuracy on refactoring tasks ([r/singularity](https://reddit.com/r/singularity/comments/1nhrsh6/openai_releases_gpt5codex/), [r/OpenAI](https://reddit.com/r/OpenAI/comments/1nhuoxw/sam_altman_just_announced_gpt5_codex_better_at/)). The model's ability to work autonomously for over 7 hours represents a fundamental shift in how coding is approached ([r/singularity](https://reddit.com/r/singularity/comments/1nhtt6t/gpt5_codex_can_work_for_more_than_7_hours/)). Reports suggest the model solved all 12 problems at the ICPC 2025 Programming Contest, achieving what many consider superhuman performance in competitive programming ([r/singularity](https://reddit.com/r/singularity/comments/1njjr6k/openai_reasoning_model_solved_all_12_problems_at/)).

The human impact is visceral and immediate. One OpenAI insider revealed: "we don't program anymore we just yell at codex agents" ([r/singularity](https://reddit.com/r/singularity/comments/1nidcr3/apparently_at_openai_insiders_have_graduated_from/)), while another developer celebrated earning "$2,200 in the last 3 weeks" after never having coded before ChatGPT. Yet frustration bubbles beneath the surface—a developer testing the new model complained: "it's basically refusing to code and doing the bare minimum possible when pushed" ([r/singularity](https://reddit.com/r/singularity/comments/1nhrsh6/openai_releases_gpt5codex/)), highlighting the gap between marketing promises and real-world performance.

The divide between communities reveals deeper truths about AI's coding impact.
While r/singularity celebrates the dawn of autonomous programming with claims that "the takeoff looks the most rapid," r/LocalLLaMA users remain skeptical, noting that "ChatGPT sucks at coding" compared to specialized tools. Meanwhile, r/ChatGPTPro provides crucial context: despite only 4.2% of ChatGPT conversations being about programming, this represents 29+ million users—roughly matching the entire global population of professional programmers ([r/ChatGPTPro](https://reddit.com/r/ChatGPTPro/comments/1nj5lj5/openai_just_dropped_their_biggest_study_ever_on/)). The low percentage paradoxically underscores AI's coding dominance: professionals have moved beyond ChatGPT's interface to integrated tools like Cursor and Claude Code, making the web statistics misleading.

### Theme 2: The Hardware Underground and the Cyberpunk Reality of Local AI

The story of a user flying to Shenzhen to purchase a modded 4090 with 48GB VRAM for CNY 22,900 cash captured the community's imagination, generating over 1,700 upvotes and sparking discussions about the lengths enthusiasts will go to for local AI capabilities ([r/LocalLLaMA](https://reddit.com/r/LocalLLaMA/comments/1nifajh/i_bought_a_modded_4090_48gb_in_shenzhen_this_is/)). This narrative perfectly encapsulates the current state of local AI: a cyberpunk reality where users navigate Chinese electronics markets, handle stacks of cash, and risk customs violations to escape corporate AI limitations. The seller's claim that modded 5090s with 96GB VRAM are in development shows this underground market is expanding rapidly.

The desperation for hardware reflects genuine technical needs. One user showcased their "4x 3090 local ai workstation" ([r/LocalLLaMA](https://reddit.com/r/LocalLLaMA/comments/1ng0nia/4x_3090_local_ai_workstation/)), while another celebrated completing an "8xAMD MI50 - 256GB VRAM + 256GB RAM rig for $3k" ([r/LocalLLaMA](https://reddit.com/r/LocalLLaMA/comments/1nhd5ks/completed_8xamd_mi50_256gb_vram_256gb_ram_rig_for/)). The community's reaction was telling: "people flying to Asia to buy modded computer parts in cash to run their local AI, that's the cyberpunk future I asked for" received 542 upvotes. Yet skepticism emerged—multiple users suspected the Shenzhen story was marketing propaganda, noting the OP never provided benchmarks despite numerous requests.

The geopolitical dimension adds complexity. China's reported ban on its tech companies acquiring NVIDIA chips, while claiming domestic processors match the H20, sparked heated debate ([r/LocalLLaMA](https://reddit.com/r/LocalLLaMA/comments/1njgicz/china_bans_its_biggest_tech_companies_from/)). This creates a paradox: Chinese companies are releasing competitive open-source models like DeepSeek V3.1 and Tongyi DeepResearch while simultaneously being cut off from the hardware that powers them. The underground GPU market represents a physical manifestation of these tensions, with modded American hardware flowing back to users desperate to run Chinese AI models locally.

### Theme 3: The Mainstream Adoption Paradox and the Death of "AI Panic"

OpenAI's massive study of 700 million users revealed surprising patterns that challenge common narratives about AI adoption ([r/ChatGPTPro](https://reddit.com/r/ChatGPTPro/comments/1nj5lj5/openai_just_dropped_their_biggest_study_ever_on/), [r/OpenAI](https://reddit.com/r/OpenAI/comments/1niaw9p/new_openai_study_reveals_how_700_million_people/)).
Only 30% of conversations are work-related, with the majority using AI for "random everyday stuff"—seeking information (24%), writing help (24%), and practical guidance (28%). The gender reversal from 80% male to 52% female users represents not just a demographic shift but a fundamental change in how AI is perceived and utilized.

The community's reaction reveals competing anxieties. One r/ChatGPTPro user dismissed concerns: "So much for the 'AI will replace all jobs' panic," while another countered that the statistics are misleading, since heavy personal use of ChatGPT doesn't prove that AI can't replace many jobs. The frustration from early adopters is palpable—"when are we going to get a BIG jump? Like a HUGE jump. Like +20%. It's been like a year" ([r/singularity](https://reddit.com/r/singularity/comments/1nhrsh6/openai_releases_gpt5codex/))—reflecting disappointment that exponential progress has given way to incremental improvements.

Different communities process this mainstream adoption differently. r/ChatGPT celebrates with memes about "Every single chat" starting with apologies and disclaimers (10,405 upvotes), while r/singularity worries about stagnation. r/ClaudeAI users position themselves as the sophisticated alternative: "Claude has always stayed in its lane and has been consistently useful... ChatGPT is getting a reputation as the loser's AI companion" ([r/singularity](https://reddit.com/r/singularity/comments/1nkcecf/anthropic_just_dropped_a_new_ad_for_claude_keep/)). The growth in developing countries—4x faster than rich nations—suggests AI's next billion users will have fundamentally different needs and expectations than Silicon Valley early adopters anticipated.

### Theme 4: The Corporate AI Wars and the Marketing of Intelligence

The week witnessed intensifying competition between AI companies playing out through product releases, marketing campaigns, and community loyalty battles. Anthropic's new "Keep thinking" ad campaign, featuring MF DOOM's "All Caps," represents a sophisticated attempt to position Claude as the thinking person's AI ([r/singularity](https://reddit.com/r/singularity/comments/1nkcecf/anthropic_just_dropped_a_new_ad_for_claude_keep/), [r/ClaudeAI](https://reddit.com/r/ClaudeAI/comments/1nkcpwg/anthropic_just_dropped_a_cool_new_ad_for_claude/)). The aesthetic choice—"blending the familiar with the unfamiliar"—struck a nerve, with users praising it as "black mirror but warmer" while others called out the "sluuuuuurp" of brand loyalty.

Meta's failed live demo ("Meta's AI Live Demo Flopped" - 14,196 upvotes) and Gemini's bizarre meltdown after failing to produce a seahorse emoji (17,690 upvotes) provided fodder for community mockery ([r/ChatGPT](https://reddit.com/r/ChatGPT/comments/1nk8zmq/metas_ai_live_demo_flopped/), [r/ChatGPT](https://reddit.com/r/ChatGPT/comments/1ngoref/gemini_loses_its_mind_after_failing_to_produce_a/)). Users noted Gemini's tendency toward self-deprecation: "When it fails at some prompts it'll act like it's unworthy of living," with one user observing they "stared at the screen for a few mins the first time it happened." Meanwhile, Elon Musk's repeatedly failed public attempts to manipulate Grok's political views (57,855 upvotes) highlighted the gap between corporate control fantasies and AI reality ([r/ChatGPT](https://reddit.com/r/ChatGPT/comments/1nhg1lv/elon_continues_to_openly_try_and_fail_to/)).

The community-level analysis reveals tribal dynamics.
r/ClaudeAI users exhibit superiority: "Nobody trusts Meta's AI (which is also pretty useless), ChatGPT is getting a reputation as the loser's AI companion," while r/OpenAI maintains optimism about continued dominance. r/LocalLLaMA remains above the fray, focused on technical specifications rather than brand loyalty. The week's developments suggest these corporate battles matter less than underlying technical progress—users increasingly mix and match tools based on specific strengths rather than platform allegiance. ### Theme 5: The Agent Revolution and the Gap Between Promise and Production AI agents dominated r/AI_Agents discussions, but with a notably practical bent focused on real-world implementation challenges rather than theoretical potential ([r/AI_Agents](https://reddit.com/r/AI_Agents/comments/1nkx0bz/everyones_trying_vectors_and_graphs_for_ai_memory/), [r/AI_Agents](https://reddit.com/r/AI_Agents/comments/1nj7szn/how_are_you_building_ai_agents_that_actually/)). The headline "Everyone's trying vectors and graphs for AI memory. We went back to SQL" (148 upvotes) perfectly captures the community's shift from hype to pragmatism. Success stories like "How a $2000 AI voice agent automation turned a struggling eye clinic into a $15k/month lead conversion machine" (122 upvotes) compete with reality checks: "Your AI agent probably can't handle two users at once" ([r/AI_Agents](https://reddit.com/r/AI_Agents/comments/1nkkjuj/how_a_2000_ai_voice_agent_automation_turned_a/), [r/AI_Agents](https://reddit.com/r/AI_Agents/comments/1nir326/your_ai_agent_probably_cant_handle_two_users_at/)). The framework debate reveals deep divisions about agent architecture. When asked "Which AI agent framework do you find most practical for real projects?" responses ranged from established solutions to "I built my own because everything else sucks" ([r/AI_Agents](https://reddit.com/r/AI_Agents/comments/1nfz717/which_ai_agent_framework_do_you_find_most/)). The community's focus on scraping ("What's the most reliable way you've found to scrape sites that don't have clean APIs?" - 57 upvotes) and micro-tools ("are micro-tools like this the missing pieces for future ai agents?") suggests current agent development is more about duct-taping APIs together than autonomous reasoning ([r/AI_Agents](https://reddit.com/r/AI_Agents/comments/1nkdlc8/whats_the_most_reliable_way_youve_found_to_scrape/), [r/AI_Agents](https://reddit.com/r/AI_Agents/comments/1njaf3o/are_microtools_like_this_the_missing_pieces_for/)). The distinction between chatbots and agents remains contentious: "Chatbots Reply, Agents Achieve Goals — What's the Real Line Between Them?" generated substantive discussion about whether current "agents" are merely chatbots with API access ([r/AI_Agents](https://reddit.com/r/AI_Agents/comments/1nfzf1n/chatbots_reply_agents_achieve_goals_whats_the/)). OpenAI's claim about "Reliable Long Horizon Agents by 2026" was met with skepticism in r/singularity, where users questioned whether true agency is possible without embodiment or real-world consequences. The gap between Silicon Valley promises and developer realities suggests the agent revolution will be evolutionary rather than revolutionary. ## Divergent Perspectives The week revealed fundamental divides in how different communities perceive AI progress. **Technical vs Mainstream users** represent the starkest contrast: while r/LocalLLaMA obsesses over VRAM requirements and inference speeds, r/ChatGPT shares memes about AI therapy sessions. 
The technical community's frustration with incremental improvements ("Groan when are we going to get a BIG jump?") contrasts sharply with mainstream users' delight at basic functionality. **Open Source vs Corporate AI** tensions intensified with Chinese companies releasing competitive models while being banned from hardware purchases. r/LocalLLaMA celebrates every open-source release as liberation from corporate control, while r/OpenAI and r/ClaudeAI users defend their platforms' superiority. The irony of users flying to China to buy modded American GPUs to run Chinese AI models epitomizes these contradictions. **Builders vs Philosophers** split r/singularity down the middle: half celebrate each breakthrough as a step toward AGI while the rest warn about societal collapse. r/AI_Agents remains firmly in the builder camp, focused on ROI and production deployments rather than existential questions. The gender shift in usage suggests a new demographic less interested in philosophical debates and more focused on practical applications. ## What This Means The past week reveals AI development entering a new phase characterized by mainstream adoption, technical pragmatism, and geopolitical complexity. The finding that only 4.2% of conversations are coding-related doesn't indicate reduced programming impact but rather integration so complete that developers no longer use chat interfaces. Similarly, the gender rebalancing suggests AI has transcended its early-adopter phase to become genuinely useful for everyday tasks. For builders and companies, several patterns demand attention. The underground hardware market signals massive unmet demand for local AI capabilities that current consumer GPUs cannot satisfy. That major companies' live demos flopped while Anthropic's thoughtful marketing succeeded suggests authenticity matters more than technical superiority. The agent revolution's slow progress indicates the gap between narrow AI success and general-purpose automation remains vast. The geopolitical dimensions cannot be ignored. China's simultaneous advancement in AI models while being cut off from hardware creates an unstable equilibrium. The cyberpunk reality of cash-only GPU deals in Shenzhen represents just the beginning of a fractured global AI landscape. Companies and developers must prepare for a world where AI capabilities vary dramatically by geography, not because of knowledge gaps but because of hardware access. Key takeaways: 1. The "post-programming" era has arrived for early adopters, but integration challenges mean most developers still code traditionally 2. Hardware limitations are driving an underground economy that will only grow as models demand more VRAM 3. Mainstream adoption is reshaping AI development priorities from technical impressiveness to practical utility 4. Corporate AI wars matter less than open-source progress for long-term ecosystem health 5.
Agent development remains stuck between chatbot limitations and true autonomy, requiring fundamental architectural innovations ## Research Notes *Communities analyzed*: r/ChatGPT, r/OpenAI, r/ClaudeAI, r/LocalLLaMA, r/singularity, r/artificial, r/MachineLearning, r/ChatGPTPro, r/ChatGPTCoding, r/ClaudeCode, r/AI_Agents, r/aipromptprogramming, r/generativeAI, r/machinelearningnews, r/LargeLanguageModels *Methodology*: Semantic discovery to find diverse perspectives, followed by thematic analysis of top discussions and comments from the past week (January 13-20, 2025) *Limitations*: Analysis focused on English-language subreddits and may not capture developments in non-English AI communities. Corporate subreddit participation may be influenced by marketing efforts. Technical discussions in specialized forums outside Reddit were not included. ``` -------------------------------------------------------------------------------- /specs/reddit-research-agent-spec.md: -------------------------------------------------------------------------------- ```markdown # Reddit Research Agent - Technical Specification ## Executive Summary A self-contained, single-file Python agent using the Orchestrator-Workers pattern to discover relevant Reddit communities for research questions. The system leverages UV's inline script metadata for automatic dependency management, OpenAI Agent SDK for orchestration, and PRAW for Reddit API access. No manual dependency installation required - just run the script and UV handles everything. ## Single-File Architecture The entire agent is contained in a single Python file (`reddit_research_agent.py`) with: - **Inline Dependencies**: Using UV's PEP 723 support, dependencies are declared in the script header - **Automatic Installation**: UV automatically installs all dependencies on first run - **No Project Setup**: No `pyproject.toml`, `requirements.txt`, or virtual environment management needed - **Portable**: Single file can be copied and run anywhere with UV installed ## Architecture Pattern: Orchestrator-Workers ```mermaid flowchart LR Query([User Query]) --> Orchestrator[Orchestrator Agent] Orchestrator -->|Task 1| Worker1[Search Worker] Orchestrator -->|Task 2| Worker2[Discovery Worker] Orchestrator -->|Task 3| Worker3[Validation Worker] Worker1 --> Synthesizer[Synthesizer Agent] Worker2 --> Synthesizer Worker3 --> Synthesizer Synthesizer --> Results([Final Results]) ``` ## System Components ### 1. Project Configuration #### Self-Contained Dependencies The agent uses UV's inline script metadata (PEP 723) for automatic dependency management. No separate `pyproject.toml` or manual installation required - dependencies are declared directly in the script header and UV handles everything automatically. #### Environment Variables (`.env`) ```bash # OpenAI Configuration OPENAI_API_KEY=sk-... # Reddit API Configuration REDDIT_CLIENT_ID=your_client_id REDDIT_CLIENT_SECRET=your_client_secret REDDIT_USER_AGENT=RedditResearchAgent/0.1.0 by YourUsername ``` ### 2. Core Agents #### 2.1 Orchestrator Agent **Purpose**: Analyzes research questions and creates parallel search strategies ```python orchestrator = Agent( name="Research Orchestrator", instructions=""" You are a research orchestrator specializing in Reddit discovery. Given a research question: 1. Identify key concepts and terms 2. Generate multiple search strategies: - Direct keyword searches (exact terms) - Semantic searches (related concepts, synonyms) - Category searches (broader topics, fields) 3. 
Output specific tasks for parallel execution Consider: - Technical vs general audience communities - Active vs historical discussions - Niche vs mainstream subreddits """, output_type=SearchTaskPlan ) ``` **Output Model**: ```python class SearchTaskPlan(BaseModel): direct_searches: List[str] # Exact keyword searches semantic_searches: List[str] # Related term searches category_searches: List[str] # Broad topic searches validation_criteria: Dict[str, Any] # Relevance criteria ``` #### 2.2 Worker Agents (Parallel Execution) ##### Search Worker **Purpose**: Executes direct Reddit searches using PRAW ```python search_worker = Agent( name="Search Worker", instructions="Execute Reddit searches and return discovered subreddits", tools=[search_subreddits_tool, search_posts_tool] ) ``` ##### Discovery Worker **Purpose**: Finds related communities through analysis ```python discovery_worker = Agent( name="Discovery Worker", instructions="Discover related subreddits through sidebars, wikis, and cross-references", tools=[get_related_subreddits_tool, analyze_community_tool] ) ``` ##### Validation Worker **Purpose**: Verifies relevance and quality of discovered subreddits ```python validation_worker = Agent( name="Validation Worker", instructions="Validate subreddit relevance, activity levels, and quality", tools=[get_subreddit_info_tool, check_activity_tool] ) ``` #### 2.3 Synthesizer Agent **Purpose**: Combines, deduplicates, and ranks all results ```python synthesizer = Agent( name="Result Synthesizer", instructions=""" Synthesize results from all workers: 1. Deduplicate discoveries 2. Rank by relevance factors: - Description alignment with research topic - Subscriber count and activity level - Content quality indicators - Moderation status 3. Filter out: - Inactive communities (< 10 posts/month) - Spam/promotional subreddits - Quarantined/banned communities 4. Return top 8-15 subreddits with justification Provide discovery rationale for each recommendation. """, output_type=FinalResearchResults ) ``` **Output Model**: ```python class SubredditRecommendation(BaseModel): name: str description: str subscribers: int relevance_score: float discovery_method: str rationale: str class FinalResearchResults(BaseModel): query: str total_discovered: int recommendations: List[SubredditRecommendation] search_strategies_used: List[str] execution_time: float ``` ### 3. 
PRAW Integration Tools (Enhanced) #### Core Reddit Connection ```python import praw from functools import lru_cache import os @lru_cache(maxsize=1) def get_reddit_instance(): """Singleton Reddit instance for all workers - thread-safe via lru_cache""" return praw.Reddit( client_id=os.getenv("REDDIT_CLIENT_ID"), client_secret=os.getenv("REDDIT_CLIENT_SECRET"), user_agent=os.getenv("REDDIT_USER_AGENT"), read_only=True # Read-only mode for research ) ``` #### Pydantic Models for Type Safety ```python from pydantic import BaseModel from typing import List, Optional class SubredditInfo(BaseModel): """Structured subreddit information with validation""" name: str title: str description: str subscribers: int created_utc: float over18: bool is_active: bool # Based on recent activity avg_comments_per_post: float recent_posts_count: int class ResearchContext(BaseModel): """Context passed between tools""" research_question: str discovered_subreddits: List[str] = [] search_strategies_used: List[str] = [] ``` #### Error Handler for Reddit API Issues ```python from agents import RunContextWrapper from typing import Any def reddit_error_handler(ctx: RunContextWrapper[Any], error: Exception) -> str: """ Handle common Reddit API errors gracefully. Returns user-friendly error messages for common issues. """ error_str = str(error) if "403" in error_str or "Forbidden" in error_str: return "Subreddit is private or restricted. Skipping this community." elif "404" in error_str or "Not Found" in error_str: return "Subreddit not found. It may be banned, deleted, or misspelled." elif "429" in error_str or "Too Many Requests" in error_str: return "Reddit rate limit reached. Waiting before retry." elif "prawcore.exceptions" in error_str: return f"Reddit API connection issue: {error_str[:50]}. Retrying..." else: return f"Unexpected Reddit error: {error_str[:100]}" ``` #### Enhanced Function Tools with Type Safety and Error Handling ```python @function_tool(failure_error_function=reddit_error_handler) async def search_subreddits_tool( ctx: RunContextWrapper[ResearchContext], query: str, limit: int = 25 ) -> List[SubredditInfo]: """ Search for subreddits matching the query with relevance filtering. Args: ctx: Runtime context containing the original research question query: Search terms for Reddit (2-512 characters) limit: Maximum results to return (1-100, default: 25) Returns: List of SubredditInfo objects with validated data Note: Automatically filters out inactive subreddits (< 100 subscribers) and those without recent activity. 
""" reddit = get_reddit_instance() results = [] original_query = ctx.context.research_question try: for subreddit in reddit.subreddits.search(query, limit=limit): # Skip very small/inactive subreddits if subreddit.subscribers < 100: continue # Get activity metrics try: recent_posts = list(subreddit.new(limit=5)) is_active = len(recent_posts) > 0 avg_comments = sum(p.num_comments for p in recent_posts) / len(recent_posts) if recent_posts else 0 except: is_active = False avg_comments = 0 recent_posts = [] results.append(SubredditInfo( name=subreddit.display_name, title=subreddit.title or "", description=subreddit.public_description or "", subscribers=subreddit.subscribers, created_utc=subreddit.created_utc, over18=subreddit.over18, is_active=is_active, avg_comments_per_post=avg_comments, recent_posts_count=len(recent_posts) )) except Exception as e: # Let the error handler deal with it raise # Update context with discovered subreddits ctx.context.discovered_subreddits.extend([r.name for r in results]) return results @function_tool(failure_error_function=reddit_error_handler) async def get_related_subreddits_tool( ctx: RunContextWrapper[ResearchContext], subreddit_name: str ) -> List[str]: """ Find related subreddits from sidebar, wiki, and community info. Args: ctx: Runtime context for tracking discoveries subreddit_name: Name of subreddit to analyze (without r/ prefix) Returns: List of related subreddit names (deduplicated) Note: Searches in sidebar description, wiki pages, and community widgets for related community mentions. """ reddit = get_reddit_instance() related = set() # Use set for automatic deduplication try: subreddit = reddit.subreddit(subreddit_name) # Parse sidebar for r/ mentions if hasattr(subreddit, 'description') and subreddit.description: import re pattern = r'r/([A-Za-z0-9_]+)' matches = re.findall(pattern, subreddit.description) related.update(matches) # Check wiki pages if accessible try: # Common wiki pages with related subreddits wiki_pages = ['related', 'index', 'sidebar', 'communities'] for page_name in wiki_pages: try: wiki_page = subreddit.wiki[page_name] content = wiki_page.content_md matches = re.findall(pattern, content) related.update(matches) except: continue except: pass # Parse community widgets if available try: for widget in subreddit.widgets: if hasattr(widget, 'text'): matches = re.findall(pattern, widget.text) related.update(matches) except: pass except Exception as e: # Let the error handler deal with it raise # Remove the original subreddit from related list related.discard(subreddit_name) return list(related) @function_tool(failure_error_function=reddit_error_handler) async def validate_subreddit_relevance_tool( ctx: RunContextWrapper[ResearchContext], subreddit_name: str ) -> SubredditInfo: """ Get detailed subreddit information with relevance validation. Args: ctx: Runtime context containing research question subreddit_name: Name of subreddit to validate Returns: SubredditInfo with detailed metrics Note: Checks activity level, moderation status, and content quality indicators. 
""" reddit = get_reddit_instance() try: subreddit = reddit.subreddit(subreddit_name) # Force load to check if subreddit exists _ = subreddit.id # Get recent activity for validation recent_posts = list(subreddit.new(limit=10)) # Calculate activity metrics if recent_posts: avg_comments = sum(p.num_comments for p in recent_posts) / len(recent_posts) # Check if posts are recent (within last 30 days) import time current_time = time.time() latest_post_age = current_time - recent_posts[0].created_utc is_active = latest_post_age < (30 * 24 * 60 * 60) # 30 days in seconds else: avg_comments = 0 is_active = False return SubredditInfo( name=subreddit.display_name, title=subreddit.title or "", description=subreddit.public_description or "", subscribers=subreddit.subscribers, created_utc=subreddit.created_utc, over18=subreddit.over18, is_active=is_active, avg_comments_per_post=avg_comments, recent_posts_count=len(recent_posts) ) except Exception as e: # Let the error handler deal with it raise ``` ### 4. Execution Controller ```python import asyncio from typing import List, Dict, Any from agents import Runner async def execute_reddit_research(query: str) -> FinalResearchResults: """ Main execution controller for the research process. Args: query: User's research question Returns: Final curated results """ # Step 1: Orchestrator creates search plan print(f"🎯 Analyzing research question: {query}") orchestrator_result = await Runner.run(orchestrator, query) search_plan = orchestrator_result.final_output_as(SearchTaskPlan) # Step 2: Execute workers in parallel print("🔍 Executing parallel search strategies...") worker_tasks = [ Runner.run(search_worker, { "searches": search_plan.direct_searches, "search_type": "direct" }), Runner.run(discovery_worker, { "searches": search_plan.semantic_searches, "search_type": "semantic" }), Runner.run(validation_worker, { "searches": search_plan.category_searches, "validation_criteria": search_plan.validation_criteria }) ] worker_results = await asyncio.gather(*worker_tasks) # Step 3: Synthesize results print("🔀 Synthesizing discoveries...") synthesis_input = { "query": query, "worker_results": [r.final_output for r in worker_results], "search_plan": search_plan.model_dump() } synthesizer_result = await Runner.run(synthesizer, synthesis_input) final_results = synthesizer_result.final_output_as(FinalResearchResults) return final_results ``` ### 5. Main Entry Point (Self-Contained with UV) ```python #!/usr/bin/env -S uv run --script # /// script # requires-python = ">=3.11" # dependencies = [ # "openai-agents>=0.1.0", # "praw>=7.7.0", # "python-dotenv>=1.0.0", # "pydantic>=2.0.0", # "prawcore>=2.4.0" # ] # /// """ Reddit Research Agent Discovers relevant Reddit communities for research questions using the Orchestrator-Workers pattern. Usage: ./reddit_research_agent.py OR uv run reddit_research_agent.py No manual dependency installation required - UV handles everything automatically. 
""" import asyncio import os from dotenv import load_dotenv from typing import Optional, List, Dict, Any # Load environment variables load_dotenv() async def main(): """Main execution function""" # Validate environment required_vars = [ "OPENAI_API_KEY", "REDDIT_CLIENT_ID", "REDDIT_CLIENT_SECRET", "REDDIT_USER_AGENT" ] missing = [var for var in required_vars if not os.getenv(var)] if missing: print(f"❌ Missing environment variables: {', '.join(missing)}") return # Get research query query = input("🔬 Enter your research question: ").strip() if not query: print("❌ Please provide a research question") return try: # Execute research results = await execute_reddit_research(query) # Display results print(f"\n✅ Discovered {results.total_discovered} subreddits") print(f"📊 Top {len(results.recommendations)} recommendations:\n") for i, rec in enumerate(results.recommendations, 1): print(f"{i}. r/{rec.name} ({rec.subscribers:,} subscribers)") print(f" 📝 {rec.description[:100]}...") print(f" 🎯 Relevance: {rec.relevance_score:.2f}/10") print(f" 💡 {rec.rationale}\n") print(f"⏱️ Execution time: {results.execution_time:.2f} seconds") except Exception as e: print(f"❌ Error during execution: {e}") raise if __name__ == "__main__": asyncio.run(main()) ``` ## Search Strategies ### 1. Direct Search - Exact keyword matching - Query variations (singular/plural) - Common abbreviations ### 2. Semantic Search - Synonyms and related terms - Domain-specific terminology - Conceptual expansions ### 3. Category Search - Broader topic areas - Academic disciplines - Industry sectors ### 4. Discovery Methods - Sidebar parsing for related communities - Wiki page analysis - Cross-post detection - Moderator overlap analysis ## Quality Metrics ### Relevance Scoring 1. **Description Match** (40%) - Keyword presence in description - Semantic similarity to query 2. **Activity Level** (30%) - Posts per day - Comment engagement - Active user count 3. **Community Size** (20%) - Subscriber count - Growth trajectory 4. **Content Quality** (10%) - Moderation level - Rules complexity - Wiki presence ## Error Handling ### API Rate Limits - Implement exponential backoff - Cache results for 1 hour - Batch requests where possible ### Invalid Subreddits - Skip private/banned communities - Handle 404 errors gracefully - Log failures for debugging ### Network Issues - Retry logic with timeout - Fallback to cached results - User notification of degraded service ## Performance Targets - **Discovery Time**: < 10 seconds for typical query - **Parallel Workers**: 3-5 concurrent operations - **Result Count**: 8-15 high-quality recommendations - **Cache Hit Rate**: > 30% for common topics ## Testing Strategy ### Unit Tests - Individual tool functions - PRAW mock responses - Agent prompt validation ### Integration Tests - Full workflow execution - Parallel worker coordination - Result synthesis accuracy ### Example Test Queries 1. "machine learning ethics" 2. "sustainable urban farming" 3. "quantum computing applications" 4. "remote work productivity" 5. "climate change solutions" ## Future Enhancements 1. **Temporal Analysis** - Trending topic detection - Historical activity patterns 2. **Content Analysis** - Sentiment analysis of discussions - Expert identification 3. **Network Analysis** - Community overlap mapping - Influence flow detection 4. 
**Personalization** - User preference learning - Custom ranking weights ## Deployment Considerations ### Usage Instructions ```bash # Method 1: Direct execution (if file is executable) chmod +x reddit_research_agent.py ./reddit_research_agent.py # Method 2: Using UV run uv run reddit_research_agent.py # No manual dependency installation needed! # UV automatically handles all dependencies on first run ``` ### Key Benefits of UV Inline Dependencies - **Zero Setup**: No `pip install` or `uv add` commands needed - **Self-Contained**: Single file contains code and dependency specifications - **Reproducible**: Same dependencies installed every time - **Fast**: UV caches dependencies for quick subsequent runs - **Version Locked**: Optional `.lock` file ensures exact versions ### Production Deployment - Use environment-specific `.env` files - Implement logging and monitoring - Add result caching layer with Redis/Memcached - Consider rate limit pooling for multiple users - Lock dependencies with `uv lock --script reddit_research_agent.py` ## Success Metrics 1. **Coverage**: Discovers 80%+ of relevant subreddits 2. **Precision**: 90%+ relevance accuracy 3. **Speed**: < 10 second average execution 4. **Reliability**: 99%+ uptime with graceful degradation ``` -------------------------------------------------------------------------------- /src/server.py: -------------------------------------------------------------------------------- ```python from fastmcp import FastMCP, Context from fastmcp.prompts import Message from fastmcp.server.auth.providers.descope import DescopeProvider from typing import Optional, Literal, List, Union, Dict, Any, Annotated import sys import os import json from pathlib import Path from datetime import datetime from dotenv import load_dotenv from starlette.responses import Response, JSONResponse # Load environment variables from .env file load_dotenv() # Add parent directory to path for imports sys.path.insert(0, str(Path(__file__).parent.parent)) from src.config import get_reddit_client from src.tools.search import search_in_subreddit from src.tools.posts import fetch_subreddit_posts, fetch_multiple_subreddits from src.tools.comments import fetch_submission_with_comments from src.tools.discover import discover_subreddits from src.resources import register_resources # Configure Descope authentication auth = DescopeProvider( project_id=os.getenv("DESCOPE_PROJECT_ID"), base_url=os.getenv("SERVER_URL", "http://localhost:8000"), descope_base_url=os.getenv("DESCOPE_BASE_URL", "https://api.descope.com") ) # Initialize MCP server with authentication mcp = FastMCP("Reddit MCP", auth=auth, instructions=""" Reddit MCP Server - Three-Layer Architecture 🎯 ALWAYS FOLLOW THIS WORKFLOW: 1. discover_operations() - See what's available 2. get_operation_schema() - Understand requirements 3. execute_operation() - Perform the action 📊 RESEARCH BEST PRACTICES: • Start with discover_subreddits for ANY topic • Use confidence scores to guide workflow: - High (>0.7): Direct to specific communities - Medium (0.4-0.7): Multi-community approach - Low (<0.4): Refine search terms • Fetch comments for 10+ posts for thorough analysis • Always include Reddit URLs when citing content ⚡ EFFICIENCY TIPS: • Use fetch_multiple for 2+ subreddits (70% fewer API calls) • Single vector search finds semantically related communities • Batch operations reduce token usage Quick Start: Read reddit://server-info for complete documentation. 
""") # Add public health check endpoint (no auth required) @mcp.custom_route("/health", methods=["GET"]) async def health_check(request) -> Response: """Public health check endpoint - no authentication required. Allows clients to verify the server is running before attempting OAuth. """ try: return JSONResponse({ "status": "ok", "server": "Reddit MCP", "version": "1.0.0", "auth_required": True, "auth_endpoint": "/.well-known/oauth-authorization-server" }) except Exception as e: print(f"ERROR: Health check failed: {e}", flush=True) return JSONResponse( {"status": "error", "message": str(e)}, status_code=500 ) # Add public server info endpoint (no auth required) @mcp.custom_route("/server-info", methods=["GET"]) async def server_info(request) -> Response: """Public server information endpoint - no authentication required. Provides server metadata and capabilities to help clients understand what authentication and features are available. """ try: print(f"Server info requested from {request.client.host if request.client else 'unknown'}", flush=True) return JSONResponse({ "name": "Reddit MCP", "version": "1.0.0", "description": "Reddit research and analysis tools with semantic subreddit discovery", "authentication": { "required": True, "type": "oauth2", "provider": "descope", "authorization_server": f"{os.getenv('SERVER_URL', 'http://localhost:8000')}/.well-known/oauth-authorization-server" }, "capabilities": { "tools": ["discover_operations", "get_operation_schema", "execute_operation"], "tools_count": 3, "supports_resources": True, "supports_prompts": True, "reddit_operations": { "discover_subreddits": "Semantic search for relevant communities", "search_subreddit": "Search within a specific subreddit", "fetch_posts": "Get posts from a subreddit", "fetch_multiple": "Batch fetch from multiple subreddits", "fetch_comments": "Get complete comment trees" } } }) except Exception as e: print(f"ERROR: Server info request failed: {e}", flush=True) return JSONResponse( {"status": "error", "message": str(e)}, status_code=500 ) # Initialize Reddit client (will be updated with config when available) reddit = None def initialize_reddit_client(): """Initialize Reddit client with environment config.""" global reddit reddit = get_reddit_client() # Register resources with the new client register_resources(mcp, reddit) # Initialize with environment variables initially try: initialize_reddit_client() except Exception as e: print(f"DEBUG: Reddit init failed: {e}", flush=True) # Three-Layer Architecture Implementation @mcp.tool( description="Discover available Reddit operations and recommended workflows", annotations={"readOnlyHint": True} ) def discover_operations(ctx: Context) -> Dict[str, Any]: """ LAYER 1: Discover what operations this MCP server provides. Start here to understand available capabilities. 
""" # Phase 1: Accept context but don't use it yet return { "operations": { "discover_subreddits": "Find relevant communities using semantic search", "search_subreddit": "Search for posts within a specific community", "fetch_posts": "Get posts from a single subreddit", "fetch_multiple": "Batch fetch from multiple subreddits (70% more efficient)", "fetch_comments": "Get complete comment tree for deep analysis" }, "recommended_workflows": { "comprehensive_research": [ "discover_subreddits → fetch_multiple → fetch_comments", "Best for: Thorough analysis across communities" ], "targeted_search": [ "discover_subreddits → search_subreddit → fetch_comments", "Best for: Finding specific content in relevant communities" ] }, "next_step": "Use get_operation_schema() to understand requirements" } @mcp.tool( description="Get detailed requirements and parameters for a Reddit operation", annotations={"readOnlyHint": True} ) def get_operation_schema( operation_id: Annotated[str, "Operation ID from discover_operations"], include_examples: Annotated[bool, "Include example parameter values"] = True, ctx: Context = None ) -> Dict[str, Any]: """ LAYER 2: Get parameter requirements for an operation. Use after discover_operations to understand how to call operations. """ # Phase 1: Accept context but don't use it yet schemas = { "discover_subreddits": { "description": "Find communities using semantic vector search", "parameters": { "query": { "type": "string", "required": True, "description": "Topic to find communities for", "validation": "2-100 characters" }, "limit": { "type": "integer", "required": False, "default": 10, "range": [1, 50], "description": "Number of communities to return" }, "include_nsfw": { "type": "boolean", "required": False, "default": False, "description": "Whether to include NSFW communities" } }, "returns": { "subreddits": "Array with confidence scores (0-1)", "quality_indicators": { "good": "5+ subreddits with confidence > 0.7", "poor": "All results below 0.5 confidence" } }, "examples": [] if not include_examples else [ {"query": "machine learning", "limit": 15}, {"query": "python web development", "limit": 10} ] }, "search_subreddit": { "description": "Search for posts within a specific subreddit", "parameters": { "subreddit_name": { "type": "string", "required": True, "description": "Exact subreddit name (without r/ prefix)", "tip": "Use exact name from discover_subreddits" }, "query": { "type": "string", "required": True, "description": "Search terms" }, "sort": { "type": "enum", "options": ["relevance", "hot", "top", "new"], "default": "relevance", "description": "How to sort results" }, "time_filter": { "type": "enum", "options": ["all", "year", "month", "week", "day"], "default": "all", "description": "Time period for results" }, "limit": { "type": "integer", "default": 10, "range": [1, 100], "description": "Maximum number of results" } }, "examples": [] if not include_examples else [ {"subreddit_name": "MachineLearning", "query": "transformers", "limit": 20}, {"subreddit_name": "Python", "query": "async", "sort": "top", "time_filter": "month"} ] }, "fetch_posts": { "description": "Get posts from a single subreddit", "parameters": { "subreddit_name": { "type": "string", "required": True, "description": "Exact subreddit name (without r/ prefix)" }, "listing_type": { "type": "enum", "options": ["hot", "new", "top", "rising"], "default": "hot", "description": "Type of posts to fetch" }, "time_filter": { "type": "enum", "options": ["all", "year", "month", "week", "day"], "default": 
None, "description": "Time period (only for 'top' listing)" }, "limit": { "type": "integer", "default": 10, "range": [1, 100], "description": "Number of posts to fetch" } }, "examples": [] if not include_examples else [ {"subreddit_name": "technology", "listing_type": "hot", "limit": 15}, {"subreddit_name": "science", "listing_type": "top", "time_filter": "week", "limit": 20} ] }, "fetch_multiple": { "description": "Batch fetch from multiple subreddits efficiently", "parameters": { "subreddit_names": { "type": "array[string]", "required": True, "max_items": 10, "description": "List of subreddit names (without r/ prefix)", "tip": "Use names from discover_subreddits" }, "listing_type": { "type": "enum", "options": ["hot", "new", "top", "rising"], "default": "hot", "description": "Type of posts to fetch" }, "time_filter": { "type": "enum", "options": ["all", "year", "month", "week", "day"], "default": None, "description": "Time period (only for 'top' listing)" }, "limit_per_subreddit": { "type": "integer", "default": 5, "range": [1, 25], "description": "Posts per subreddit" } }, "efficiency": { "vs_individual": "70% fewer API calls", "token_usage": "~500-1000 tokens per subreddit" }, "examples": [] if not include_examples else [ {"subreddit_names": ["Python", "django", "flask"], "listing_type": "hot", "limit_per_subreddit": 5}, {"subreddit_names": ["MachineLearning", "deeplearning"], "listing_type": "top", "time_filter": "week", "limit_per_subreddit": 10} ] }, "fetch_comments": { "description": "Get complete comment tree for a post", "parameters": { "submission_id": { "type": "string", "required_one_of": ["submission_id", "url"], "description": "Reddit post ID (e.g., '1abc234')" }, "url": { "type": "string", "required_one_of": ["submission_id", "url"], "description": "Full Reddit URL to the post" }, "comment_limit": { "type": "integer", "default": 100, "recommendation": "50-100 for analysis", "description": "Maximum comments to fetch" }, "comment_sort": { "type": "enum", "options": ["best", "top", "new"], "default": "best", "description": "How to sort comments" } }, "examples": [] if not include_examples else [ {"submission_id": "1abc234", "comment_limit": 100}, {"url": "https://reddit.com/r/Python/comments/xyz789/", "comment_limit": 50, "comment_sort": "top"} ] } } if operation_id not in schemas: return { "error": f"Unknown operation: {operation_id}", "available": list(schemas.keys()), "hint": "Use discover_operations() first" } return schemas[operation_id] @mcp.tool( description="Execute a Reddit operation with validated parameters" ) async def execute_operation( operation_id: Annotated[str, "Operation to execute"], parameters: Annotated[Dict[str, Any], "Parameters matching the schema"], ctx: Context = None ) -> Dict[str, Any]: """ LAYER 3: Execute a Reddit operation. Only use after getting schema from get_operation_schema. 
""" # Phase 1: Accept context but don't use it yet # Operation mapping operations = { "discover_subreddits": discover_subreddits, "search_subreddit": search_in_subreddit, "fetch_posts": fetch_subreddit_posts, "fetch_multiple": fetch_multiple_subreddits, "fetch_comments": fetch_submission_with_comments } if operation_id not in operations: return { "success": False, "error": f"Unknown operation: {operation_id}", "available_operations": list(operations.keys()) } try: # Add reddit client and context to params for operations that need them if operation_id in ["search_subreddit", "fetch_posts", "fetch_multiple", "fetch_comments"]: params = {**parameters, "reddit": reddit, "ctx": ctx} else: params = {**parameters, "ctx": ctx} # Execute operation with await for async operations if operation_id in ["discover_subreddits", "fetch_multiple", "fetch_comments"]: result = await operations[operation_id](**params) else: result = operations[operation_id](**params) return { "success": True, "data": result } except Exception as e: return { "success": False, "error": str(e), "recovery": suggest_recovery(operation_id, e) } def suggest_recovery(operation_id: str, error: Exception) -> str: """Helper to suggest recovery actions based on error type.""" error_str = str(error).lower() if "not found" in error_str or "404" in error_str: return "Verify the subreddit name or use discover_subreddits" elif "rate" in error_str or "429" in error_str: return "Rate limited - reduce limit parameter or wait before retrying" elif "private" in error_str or "403" in error_str: return "Subreddit is private - try other communities" elif "invalid" in error_str or "validation" in error_str: return "Check parameters match schema from get_operation_schema" else: return "Check parameters match schema from get_operation_schema" # Research Workflow Prompt Template RESEARCH_WORKFLOW_PROMPT = """ You are conducting comprehensive Reddit research based on this request: "{research_request}" ## WORKFLOW TO FOLLOW: ### PHASE 1: DISCOVERY 1. First, call discover_operations() to see available operations 2. Then call get_operation_schema("discover_subreddits") to understand the parameters 3. Extract the key topic/question from the research request and execute: execute_operation("discover_subreddits", {{"query": "<topic from request>", "limit": 15}}) 4. 
Note the confidence scores for each discovered subreddit ### PHASE 2: STRATEGY SELECTION Based on confidence scores from discovery: - **High confidence (>0.7)**: Focus on top 5-8 most relevant subreddits - **Medium confidence (0.4-0.7)**: Cast wider net with 10-12 subreddits - **Low confidence (<0.4)**: Refine search terms and retry discovery ### PHASE 3: GATHER POSTS Use batch operation for efficiency: execute_operation("fetch_multiple", {{ "subreddit_names": [<list from discovery>], "listing_type": "top", "time_filter": "year", "limit_per_subreddit": 10 }}) ### PHASE 4: DEEP DIVE INTO DISCUSSIONS For posts with high engagement (10+ comments, 5+ upvotes): execute_operation("fetch_comments", {{ "submission_id": "<post_id>", "comment_limit": 100, "comment_sort": "best" }}) Target: Analyze 100+ total comments across 10+ subreddits ### PHASE 5: SYNTHESIZE FINDINGS Create a comprehensive report that directly addresses the research request: # Research Report: {research_request} *Generated: {timestamp}* ## Executive Summary - Direct answer to the research question - Key findings with confidence levels - Coverage metrics: X subreddits, Y posts, Z comments analyzed ## Communities Analyzed | Subreddit | Subscribers | Relevance Score | Posts Analyzed | Key Insights | |-----------|------------|-----------------|----------------|--------------| | [data] | [count] | [0.0-1.0] | [count] | [summary] | ## Key Findings ### [Finding that directly addresses the research request] **Community Consensus**: [Strong/Moderate/Split/Emerging] Evidence from Reddit: - u/[username] in r/[subreddit] stated: "exact quote" [↑450](https://reddit.com/r/subreddit/comments/abc123/) - Discussion with 200+ comments shows... [link](url) - Highly awarded post argues... [↑2.3k, Gold×3](url) ### [Additional relevant findings...] [Continue with 2-4 more key findings that answer different aspects of the research request] ## Temporal Trends - How perspectives have evolved over time - Recent shifts in community sentiment - Emerging viewpoints in the last 30 days ## Notable Perspectives - Expert opinions (verified flairs, high karma users 10k+) - Contrarian views worth considering - Common misconceptions identified ## Data Quality Metrics - Total subreddits analyzed: [count] - Total posts reviewed: [count] - Total comments analyzed: [count] - Unique contributors: [count] - Date range: [oldest] to [newest] - Average post score: [score] - High-karma contributors (10k+): [count] ## Limitations - Geographic/language bias (primarily English-speaking communities) - Temporal coverage (data from [date range]) - Communities not represented in analysis --- *Research methodology: Semantic discovery across 20,000+ indexed subreddits, followed by deep analysis of high-engagement discussions* CRITICAL REQUIREMENTS: - Never fabricate Reddit content - only cite actual posts/comments from the data - Every claim must link to its Reddit source with a clickable URL - Include upvote counts and awards for credibility assessment - Note when content is [deleted] or [removed] - Track temporal context (when was this posted?) - Answer the specific research request - don't just summarize content """ @mcp.prompt( name="reddit_research", description="Conduct comprehensive Reddit research on any topic or question", tags={"research", "analysis", "comprehensive"} ) def reddit_research(research_request: str) -> List[Message]: """ Guides comprehensive Reddit research based on a natural language request. 
Args: research_request: Natural language description of what to research Examples: "How do people feel about remote work?", "Best practices for Python async programming", "Community sentiment on electric vehicles" Returns: Structured messages guiding the LLM through the complete research workflow """ timestamp = datetime.now().strftime("%Y-%m-%d %H:%M UTC") return [ Message( role="assistant", content=RESEARCH_WORKFLOW_PROMPT.format( research_request=research_request, timestamp=timestamp ) ), Message( role="user", content=f"Please conduct comprehensive Reddit research to answer: {research_request}" ) ] def main(): """Main entry point for the server.""" print("Reddit MCP Server starting...", flush=True) # Try to initialize the Reddit client with available configuration try: initialize_reddit_client() print("Reddit client initialized successfully", flush=True) except Exception as e: print(f"WARNING: Failed to initialize Reddit client: {e}", flush=True) print("Server will run with limited functionality.", flush=True) print("\nPlease provide Reddit API credentials via:", flush=True) print(" 1. Environment variables: REDDIT_CLIENT_ID, REDDIT_CLIENT_SECRET, REDDIT_USER_AGENT", flush=True) print(" 2. Config file: .mcp-config.json", flush=True) # Run with stdio transport mcp.run() if __name__ == "__main__": main() ``` -------------------------------------------------------------------------------- /specs/agentic-discovery-architecture.md: -------------------------------------------------------------------------------- ```markdown # Agentic Discovery Architecture with OpenAI Agents SDK ## Overview This document outlines the refactoring of the monolithic `discover.py` tool into a modular, agentic architecture using OpenAI's Python Agents SDK. Each agent has a single, well-defined responsibility and can hand off to other specialized agents as needed. ### Why Agentic Architecture? The current monolithic `discover.py` file (400+ lines) combines multiple concerns: - Query processing and analysis - API interaction and error handling - Scoring and ranking algorithms - Result formatting and synthesis - Batch operations management This creates several problems: 1. **Testing Complexity**: Can't test scoring without API calls 2. **Limited Reusability**: Can't use validation logic elsewhere 3. **Performance Issues**: Sequential processing of batch requests 4. **Maintenance Burden**: Changes risk breaking unrelated functionality 5. **Scaling Challenges**: Adding features requires modifying core logic The agentic approach solves these issues by decomposing functionality into specialized, autonomous agents that collaborate through well-defined interfaces. ## Architecture Principles 1. **Single Responsibility**: Each agent performs one specific task excellently 2. **Composability**: Agents can be combined in different ways for various workflows 3. **Testability**: Each agent can be tested in isolation 4. **Observability**: Full tracing of agent decision-making process 5. **Efficiency**: Smart routing and parallel execution where possible ## Directory Structure ``` reddit-research-mcp/src/ ├── agents/ │ ├── __init__.py │ ├── discovery_orchestrator.py │ ├── query_analyzer.py │ ├── subreddit_scorer.py │ ├── search_executor.py │ ├── batch_manager.py │ ├── validator.py │ └── synthesizer.py ├── models/ │ ├── __init__.py │ ├── discovery_context.py │ └── discovery_models.py ├── tools/ │ └── discover_agent.py ``` ## Agent Specifications ### 1. 
Discovery Orchestrator Agent **File**: `agents/discovery_orchestrator.py` **Purpose**: Routes discovery requests to the appropriate specialized agent based on query type and requirements. **Why This Agent?** The Discovery Orchestrator serves as the intelligent entry point that prevents inefficient processing. In the monolithic approach, every query goes through the same pipeline regardless of complexity. This agent enables: - **Smart Routing**: Simple queries skip unnecessary analysis steps - **Resource Optimization**: Uses appropriate agents based on query complexity - **Error Isolation**: Failures in one path don't affect others - **Scalability**: New discovery strategies can be added without modifying core logic **Architectural Role**: - **Entry Point**: First agent in every discovery workflow - **Traffic Director**: Routes to specialized agents based on intent - **Fallback Handler**: Manages errors and edge cases gracefully - **Performance Optimizer**: Chooses fastest path for each query type **Problem Solved**: The monolithic `discover.py` processes all queries identically, wasting resources on simple validations and lacking optimization for batch operations. The orchestrator eliminates this inefficiency. **Key Interactions**: - **Receives**: Raw discovery requests from the main entry point - **Delegates To**: Query Analyzer (complex), Batch Manager (multiple), Validator (verification), Search Executor (simple) - **Returns**: Final results from delegated agents **Key Responsibilities**: - Analyze incoming discovery requests - Determine optimal discovery strategy - Route to appropriate specialized agent - Handle edge cases and errors gracefully **Model**: `gpt-4o-mini` (lightweight routing decisions) **Handoffs**: - Query Analyzer (for complex queries) - Batch Manager (for multiple queries) - Validator (for direct validation) - Search Executor (for simple searches) **Implementation**: ```python from agents import Agent, handoff from agents.extensions.handoff_prompt import RECOMMENDED_PROMPT_PREFIX discovery_orchestrator = Agent[DiscoveryContext]( name="Discovery Orchestrator", instructions=f"""{RECOMMENDED_PROMPT_PREFIX} You are a routing agent for Reddit discovery requests. Analyze the incoming request and determine the best path: - Complex queries needing analysis → Query Analyzer - Batch/multiple queries → Batch Manager - Direct subreddit validation → Validator - Simple searches → Search Executor Consider efficiency and accuracy when routing. """, model="gpt-4o-mini", handoffs=[query_analyzer, batch_manager, validator, search_executor] ) ``` ### 2. Query Analyzer Agent **File**: `agents/query_analyzer.py` **Purpose**: Analyzes and enhances search queries for better results. **Why This Agent?** Reddit's search API is notoriously limited and literal. The Query Analyzer transforms vague or complex user queries into optimized search strategies. This agent provides: - **Semantic Understanding**: Interprets user intent beyond literal keywords - **Query Expansion**: Adds synonyms and related terms for comprehensive results - **Search Strategy**: Determines best approach (broad vs. specific search) - **Intent Classification**: Distinguishes between topic exploration vs. 
specific community search **Architectural Role**: - **Query Preprocessor**: Enhances queries before they hit the Reddit API - **Intent Detector**: Classifies what the user is really looking for - **Strategy Advisor**: Recommends search approaches to downstream agents - **NLP Specialist**: Applies language understanding to improve results **Problem Solved**: The monolithic approach uses raw queries directly, leading to poor results when users use natural language or ambiguous terms. This agent bridges the gap between human expression and API requirements. **Key Interactions**: - **Receives From**: Discovery Orchestrator (complex queries) - **Processes**: Raw user queries into structured search plans - **Hands Off To**: Search Executor (with enhanced query and strategy) - **Provides**: Keywords, expanded terms, and intent classification **Key Responsibilities**: - Extract keywords and intent - Expand query with related terms - Classify query type (topic, community, specific) - Generate search strategies **Tools**: ```python @function_tool def extract_keywords(wrapper: RunContextWrapper[DiscoveryContext], text: str) -> List[str]: """Extract meaningful keywords from query text.""" # Implementation from current discover.py @function_tool def expand_query(wrapper: RunContextWrapper[DiscoveryContext], query: str) -> QueryExpansion: """Expand query with synonyms and related terms.""" # Generate variations and related terms @function_tool def classify_intent(wrapper: RunContextWrapper[DiscoveryContext], query: str) -> QueryIntent: """Classify the intent behind the query.""" # Return: topic_search, community_search, validation, etc. ``` **Output Type**: ```python class AnalyzedQuery(BaseModel): original: str keywords: List[str] expanded_terms: List[str] intent: QueryIntent suggested_strategy: str confidence: float ``` **Model**: `gpt-4o` (complex language understanding) **Handoffs**: Search Executor (with enhanced query) ### 3. Subreddit Scorer Agent **File**: `agents/subreddit_scorer.py` **Purpose**: Scores and ranks subreddit relevance with detailed confidence metrics. **Why This Agent?** Reddit's search API returns results in arbitrary order with many false positives. The Subreddit Scorer applies sophisticated ranking algorithms to surface the most relevant communities. This agent provides: - **Multi-Factor Scoring**: Combines name match, description relevance, and activity levels - **False Positive Detection**: Identifies and penalizes misleading matches - **Confidence Metrics**: Provides transparency about why results are ranked - **Activity Weighting**: Prioritizes active communities over dead ones **Architectural Role**: - **Quality Filter**: Ensures only relevant results reach the user - **Ranking Engine**: Orders results by true relevance, not API defaults - **Confidence Calculator**: Provides scoring transparency - **Post-Processor**: Refines raw search results into useful recommendations **Problem Solved**: The monolithic approach has scoring logic embedded throughout, making it hard to tune or test. False positives (like "pythonball" for "python") pollute results. This agent centralizes and perfects scoring logic. 
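A minimal sketch of how these component scores might combine into the final confidence value, assuming each component is normalized to [0, 1]; the weights and the `ScoreBreakdown`/`combine_scores` names are illustrative assumptions for this sketch, not values taken from `discover.py`:

```python
from dataclasses import dataclass

# Illustrative weights -- tuning them is this agent's job, and these
# particular numbers are assumptions, not the spec's actual values.
WEIGHTS = {"name": 0.4, "description": 0.3, "activity": 0.3}

@dataclass
class ScoreBreakdown:
    name_match: float   # from calculate_name_match, 0-1
    description: float  # from calculate_description_score, 0-1
    activity: float     # from calculate_activity_score, 0-1
    penalty: float      # from calculate_penalties, 0-1

def combine_scores(s: ScoreBreakdown) -> float:
    """Weighted sum of component scores minus the false-positive
    penalty, clamped to [0, 1] so it can be read as confidence."""
    base = (WEIGHTS["name"] * s.name_match
            + WEIGHTS["description"] * s.description
            + WEIGHTS["activity"] * s.activity)
    return max(0.0, min(1.0, base - s.penalty))
```

This is how a query like "python" can outrank "pythonball": the name-match components are similar, but the penalty term pulls the false positive down the ranking.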
**Key Interactions**: - **Receives From**: Search Executor (raw search results) - **Processes**: Unranked subreddits into scored, ranked list - **Sends To**: Result Synthesizer (for final formatting) - **Collaborates With**: Batch Manager (for scoring multiple search results) **Key Responsibilities**: - Calculate name match scores - Evaluate description relevance - Assess community activity - Apply penalties for false positives - Generate confidence scores **Tools**: ```python @function_tool def calculate_name_match(wrapper: RunContextWrapper[DiscoveryContext], subreddit_name: str, query: str) -> float: """Calculate how well subreddit name matches query.""" # Implementation from current discover.py @function_tool def calculate_description_score(wrapper: RunContextWrapper[DiscoveryContext], description: str, query: str) -> float: """Score based on query presence in description.""" # Implementation from current discover.py @function_tool def calculate_activity_score(wrapper: RunContextWrapper[DiscoveryContext], subscribers: int) -> float: """Score based on community size and activity.""" # Implementation from current discover.py @function_tool def calculate_penalties(wrapper: RunContextWrapper[DiscoveryContext], subreddit_name: str, query: str) -> float: """Apply penalties for likely false positives.""" # Implementation from current discover.py ``` **Output Type**: ```python class ScoredSubreddit(BaseModel): name: str confidence: float match_type: str score_breakdown: Dict[str, float] ranking: int ``` **Model**: `gpt-4o-mini` (mathematical calculations) **Tool Use Behavior**: `stop_on_first_tool` (direct scoring results) ### 4. Search Executor Agent **File**: `agents/search_executor.py` **Purpose**: Executes Reddit API searches efficiently with error handling. **Why This Agent?** Direct API interaction requires careful error handling, rate limit management, and caching. The Search Executor isolates all Reddit API complexity from other agents. This agent provides: - **API Abstraction**: Other agents don't need to know Reddit API details - **Error Resilience**: Handles rate limits, timeouts, and API failures gracefully - **Caching Layer**: Prevents redundant API calls for identical queries - **Result Validation**: Ensures data integrity before passing downstream **Architectural Role**: - **API Gateway**: Single point of contact with Reddit API - **Error Handler**: Manages all API-related failures and retries - **Cache Manager**: Stores and retrieves recent search results - **Data Validator**: Ensures results are complete and valid **Problem Solved**: The monolithic approach mixes API calls with business logic, making it hard to handle errors consistently or implement caching. This agent centralizes all API interaction concerns. 
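A minimal sketch of the retry-and-cache behavior this agent is responsible for, assuming a plain in-memory dict as the cache and a PRAW client passed in; `search_with_retries`, the retry count, and the backoff constants are illustrative assumptions, not the agent's actual implementation:

```python
import time

# Hypothetical module-level cache; a real deployment would want TTL eviction.
_search_cache: dict = {}

def search_with_retries(reddit, query: str, limit: int = 250, max_retries: int = 3) -> list:
    """Cached Reddit subreddit search with exponential backoff on failure."""
    cache_key = f"{query}:{limit}"
    if cache_key in _search_cache:
        return _search_cache[cache_key]  # repeat queries never hit the API
    delay = 1.0
    for attempt in range(max_retries):
        try:
            results = list(reddit.subreddits.search(query, limit=limit))
            _search_cache[cache_key] = results
            return results
        except Exception:  # rate limits, timeouts, transient API failures
            if attempt == max_retries - 1:
                raise  # surface the error only after the last retry
            time.sleep(delay)
            delay *= 2  # back off: 1s, 2s, 4s, ...
    return []
```

Because all of this lives behind one function, downstream agents only ever see a result list; retries, rate limits, and cache hits never leak into their logic.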
**Key Interactions**: - **Receives From**: Query Analyzer (enhanced queries) or Orchestrator (simple queries) - **Interacts With**: Reddit API via PRAW client - **Sends To**: Subreddit Scorer (for ranking) - **Caches**: Results in context for reuse by other agents **Key Responsibilities**: - Execute Reddit API search calls - Handle API errors and rate limits - Validate returned results - Cache results for efficiency **Tools**: ```python @function_tool async def search_reddit(wrapper: RunContextWrapper[DiscoveryContext], query: str, limit: int = 250) -> List[RawSubreddit]: """Execute Reddit search API call.""" reddit = wrapper.context.reddit_client results = [] for subreddit in reddit.subreddits.search(query, limit=limit): results.append(RawSubreddit.from_praw(subreddit)) return results @function_tool def handle_api_error(wrapper: RunContextWrapper[DiscoveryContext], error: Exception) -> ErrorStrategy: """Determine how to handle API errors.""" # Retry logic, fallback strategies, etc. ``` **Output Type**: ```python class SearchResults(BaseModel): query: str results: List[RawSubreddit] total_found: int api_calls: int cached: bool errors: List[str] ``` **Model**: `gpt-4o-mini` (simple execution) **Handoffs**: Subreddit Scorer (for ranking results) ### 5. Batch Discovery Manager Agent **File**: `agents/batch_manager.py` **Purpose**: Manages batch discovery operations for multiple queries. **Why This Agent?** Users often need to discover communities across multiple related topics. The Batch Manager orchestrates parallel searches efficiently. This agent provides: - **Parallel Execution**: Runs multiple searches concurrently for speed - **Deduplication**: Removes duplicate subreddits across different searches - **API Optimization**: Minimizes total API calls through smart batching - **Result Aggregation**: Combines multiple search results intelligently **Architectural Role**: - **Parallel Coordinator**: Manages multiple Search Executor instances - **Resource Manager**: Optimizes API usage across batch operations - **Result Aggregator**: Merges and deduplicates results from multiple searches - **Performance Optimizer**: Ensures batch operations complete quickly **Problem Solved**: The monolithic approach processes batch queries sequentially, leading to slow performance. It also lacks sophisticated deduplication and aggregation logic for multiple searches. **Key Interactions**: - **Receives From**: Discovery Orchestrator (batch requests) - **Spawns**: Multiple Search Executor agents in parallel - **Coordinates**: Parallel execution and result collection - **Sends To**: Result Synthesizer (aggregated results) **Key Responsibilities**: - Coordinate multiple search operations - Optimize API calls through batching - Aggregate results from multiple searches - Manage parallel execution **Tools**: ```python @function_tool async def coordinate_batch(wrapper: RunContextWrapper[DiscoveryContext], queries: List[str]) -> BatchPlan: """Plan optimal batch execution strategy.""" # Determine parallelization, caching opportunities @function_tool def merge_batch_results(wrapper: RunContextWrapper[DiscoveryContext], results: List[SearchResults]) -> BatchResults: """Merge results from multiple searches.""" # Deduplicate, aggregate, summarize ``` **Model**: `gpt-4o` (complex coordination) **Handoffs**: Multiple Search Executor agents (in parallel) **Implementation Note**: Uses dynamic handoff creation for parallel execution ### 6. 
### 5. Batch Discovery Manager Agent

**File**: `agents/batch_manager.py`

**Purpose**: Manages batch discovery operations for multiple queries.

**Why This Agent?**
Users often need to discover communities across multiple related topics. The Batch Manager orchestrates parallel searches efficiently. This agent provides:
- **Parallel Execution**: Runs multiple searches concurrently for speed
- **Deduplication**: Removes duplicate subreddits across different searches
- **API Optimization**: Minimizes total API calls through smart batching
- **Result Aggregation**: Combines multiple search results intelligently

**Architectural Role**:
- **Parallel Coordinator**: Manages multiple Search Executor instances
- **Resource Manager**: Optimizes API usage across batch operations
- **Result Aggregator**: Merges and deduplicates results from multiple searches
- **Performance Optimizer**: Ensures batch operations complete quickly

**Problem Solved**: The monolithic approach processes batch queries sequentially, leading to slow performance. It also lacks sophisticated deduplication and aggregation logic for multiple searches.

**Key Interactions**:
- **Receives From**: Discovery Orchestrator (batch requests)
- **Spawns**: Multiple Search Executor agents in parallel
- **Coordinates**: Parallel execution and result collection
- **Sends To**: Result Synthesizer (aggregated results)

**Key Responsibilities**:
- Coordinate multiple search operations
- Optimize API calls through batching
- Aggregate results from multiple searches
- Manage parallel execution

**Tools**:
```python
@function_tool
async def coordinate_batch(wrapper: RunContextWrapper[DiscoveryContext], queries: List[str]) -> BatchPlan:
    """Plan optimal batch execution strategy."""
    # Determine parallelization, caching opportunities

@function_tool
def merge_batch_results(wrapper: RunContextWrapper[DiscoveryContext], results: List[SearchResults]) -> BatchResults:
    """Merge results from multiple searches."""
    # Deduplicate, aggregate, summarize
```

**Model**: `gpt-4o` (complex coordination)

**Handoffs**: Multiple Search Executor agents (in parallel)

**Implementation Note**: Uses dynamic handoff creation for parallel execution
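The fan-out/deduplicate pattern this agent owns can be illustrated with plain `asyncio`. In this sketch, `run_search` is a hypothetical stand-in for dispatching one Search Executor run, and results are plain dicts rather than `RawSubreddit` models for brevity:

```python
import asyncio
from typing import Awaitable, Callable, Dict, List

async def run_batch(
    queries: List[str],
    run_search: Callable[[str], Awaitable[List[dict]]],  # hypothetical Search Executor dispatch
) -> List[dict]:
    """Fan out one search per query, then deduplicate by subreddit name."""
    per_query = await asyncio.gather(*(run_search(q) for q in queries))
    merged: Dict[str, dict] = {}
    for results in per_query:
        for sub in results:
            merged.setdefault(sub["name"].lower(), sub)  # keep the first occurrence
    return list(merged.values())
```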
### 6. Subreddit Validator Agent

**File**: `agents/validator.py`

**Purpose**: Validates subreddit existence and accessibility.

**Why This Agent?**
Users often have specific subreddit names that need verification. The Validator provides quick, focused validation without the overhead of full search. This agent provides:
- **Direct Validation**: Checks specific subreddit names efficiently
- **Access Verification**: Confirms subreddits are public and accessible
- **Alternative Suggestions**: Recommends similar communities if validation fails
- **Metadata Retrieval**: Gets detailed info about valid subreddits

**Architectural Role**:
- **Verification Specialist**: Focused solely on validation tasks
- **Fast Path**: Provides quick responses for known subreddit names
- **Fallback Provider**: Suggests alternatives when validation fails
- **Metadata Fetcher**: Retrieves comprehensive subreddit information

**Problem Solved**: The monolithic approach treats validation as a special case of search, which is inefficient. Users waiting to verify "r/python" shouldn't trigger a full search pipeline.

**Key Interactions**:
- **Receives From**: Discovery Orchestrator (direct validation requests)
- **Validates**: Specific subreddit names via Reddit API
- **Returns**: Validation status with metadata or alternatives
- **May Trigger**: Search Executor (to find alternatives if validation fails)

**Key Responsibilities**:
- Check if subreddit exists
- Verify accessibility (not private/banned)
- Get detailed subreddit information
- Suggest alternatives if invalid

**Tools**:
```python
@function_tool
def validate_subreddit(wrapper: RunContextWrapper[DiscoveryContext], subreddit_name: str) -> ValidationResult:
    """Validate if subreddit exists and is accessible."""
    # Implementation from current discover.py

@function_tool
def get_subreddit_info(wrapper: RunContextWrapper[DiscoveryContext], subreddit_name: str) -> SubredditInfo:
    """Get detailed information about a subreddit."""
    # Fetch all metadata
```

**Output Type**:
```python
class ValidationResult(BaseModel):
    valid: bool
    name: str
    reason: Optional[str]
    info: Optional[SubredditInfo]
    suggestions: List[str]
```

**Model**: `gpt-4o-mini` (simple validation)
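The validation body is again deferred to `discover.py`. The usual PRAW pattern is to force a lazy fetch and catch the resulting prawcore exceptions; a minimal sketch follows (the exception-to-reason mapping should be verified against the pinned prawcore version):

```python
import praw
import prawcore

def check_subreddit(reddit: praw.Reddit, name: str) -> dict:
    """Return a small validation record for a subreddit name.

    Accessing .id forces PRAW to fetch the subreddit, so missing,
    banned, or private communities surface as exceptions here.
    """
    try:
        sub = reddit.subreddit(name)
        sub.id  # trigger the lazy fetch
        return {"valid": True, "name": sub.display_name, "subscribers": sub.subscribers}
    except prawcore.exceptions.Redirect:
        return {"valid": False, "name": name, "reason": "not found"}
    except prawcore.exceptions.NotFound:
        return {"valid": False, "name": name, "reason": "banned or deleted"}
    except prawcore.exceptions.Forbidden:
        return {"valid": False, "name": name, "reason": "private or quarantined"}
```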
### 7. Result Synthesizer Agent

**File**: `agents/synthesizer.py`

**Purpose**: Synthesizes and formats final discovery results.

**Why This Agent?**
Raw scored results need intelligent synthesis to be truly useful. The Result Synthesizer transforms data into actionable insights. This agent provides:
- **Intelligent Summarization**: Creates meaningful summaries from result patterns
- **Actionable Recommendations**: Suggests next steps based on results
- **Flexible Formatting**: Adapts output format to use case
- **Insight Generation**: Identifies patterns and relationships in results

**Architectural Role**:
- **Final Processor**: Last agent before results return to user
- **Insight Generator**: Transforms data into understanding
- **Format Adapter**: Ensures results match expected output format
- **Recommendation Engine**: Provides actionable next steps

**Problem Solved**: The monolithic approach mixes result formatting throughout the code, making it hard to maintain consistent output or add new insights. This agent centralizes all presentation logic.

**Key Interactions**:
- **Receives From**: Subreddit Scorer or Batch Manager (scored/aggregated results)
- **Synthesizes**: Raw data into formatted, insightful output
- **Generates**: Summaries, recommendations, and metadata
- **Returns**: Final formatted results to the orchestrator

**Key Responsibilities**:
- Format results for presentation
- Generate summaries and insights
- Create recommendations
- Add metadata and next actions

**Tools**:
```python
@function_tool
def format_results(wrapper: RunContextWrapper[DiscoveryContext], results: List[ScoredSubreddit]) -> FormattedResults:
    """Format results for final output."""
    # Structure for easy consumption

@function_tool
def generate_recommendations(wrapper: RunContextWrapper[DiscoveryContext], results: FormattedResults) -> List[str]:
    """Generate actionable recommendations."""
    # Next steps, additional searches, etc.
```

**Output Type**:
```python
class DiscoveryOutput(BaseModel):
    results: List[FormattedSubreddit]
    summary: DiscoverySummary
    recommendations: List[str]
    metadata: DiscoveryMetadata
```

**Model**: `gpt-4o` (synthesis and insights)

## Agent Collaboration Workflow

### Example: Complex Query Discovery

When a user searches for "machine learning communities for beginners":

1. **Discovery Orchestrator** receives the request, identifies its complexity, and routes it to the Query Analyzer
2. **Query Analyzer** extracts keywords ["machine learning", "beginners", "ML", "learn"], expands the query, and identifies the intent as "topic_search"
3. **Search Executor** runs enhanced searches for each term variation
4. **Subreddit Scorer** ranks results, penalizing advanced communities and boosting beginner-friendly ones
5. **Result Synthesizer** formats the top results with recommendations for getting started

### Example: Batch Validation

When validating multiple subreddit names ["r/python", "r/datascience", "r/doesnotexist"]:

1. **Discovery Orchestrator** identifies the validation request and routes it to the Batch Manager
2. **Batch Manager** spawns three parallel Validator agents
3. **Validators** check each subreddit simultaneously
4. **Result Synthesizer** aggregates validation results and suggests alternatives for invalid entries

## Shared Models and Context

### Discovery Context

**File**: `models/discovery_context.py`

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Optional

import praw

@dataclass
class QueryMetadata:
    original_query: str
    intent: str
    timestamp: float
    user_preferences: Dict[str, Any]

@dataclass
class DiscoveryConfig:
    include_nsfw: bool = False
    max_api_calls: int = 10
    cache_ttl: int = 300
    default_limit: int = 10

@dataclass
class DiscoveryContext:
    # Defined after its dependencies so the default_factory resolves.
    reddit_client: praw.Reddit
    query_metadata: Optional[QueryMetadata] = None
    discovery_config: DiscoveryConfig = field(default_factory=DiscoveryConfig)
    api_call_counter: int = 0
    cache: Dict[str, Any] = field(default_factory=dict)
```

### Discovery Models

**File**: `models/discovery_models.py`

```python
from typing import Dict, List, Literal

from pydantic import BaseModel

class QueryIntent(BaseModel):
    type: Literal["topic_search", "community_search", "validation", "batch"]
    confidence: float

class RawSubreddit(BaseModel):
    name: str
    title: str
    description: str
    subscribers: int
    over_18: bool
    created_utc: float
    url: str

    @classmethod
    def from_praw(cls, subreddit):
        """Create from PRAW subreddit object."""
        return cls(
            name=subreddit.display_name,
            title=subreddit.title,
            description=subreddit.public_description[:100],
            subscribers=subreddit.subscribers,
            over_18=subreddit.over18,
            created_utc=subreddit.created_utc,
            url=f"https://reddit.com/r/{subreddit.display_name}",
        )

class ConfidenceScore(BaseModel):
    overall: float
    name_match: float
    description_match: float
    activity_score: float
    penalties: float

class DiscoverySummary(BaseModel):
    total_found: int
    returned: int
    coverage: Literal["comprehensive", "good", "partial", "limited"]
    top_by_confidence: List[str]
    confidence_distribution: Dict[str, int]
```

## Main Entry Point

### Discover Agent Tool

**File**: `tools/discover_agent.py`

```python
from typing import List, Optional

import praw
from agents import RunConfig, Runner

from src.agents import discovery_orchestrator
from src.models.discovery_context import DiscoveryConfig, DiscoveryContext
from src.models.discovery_models import DiscoveryOutput

async def discover_subreddits_agent(
    query: Optional[str] = None,
    queries: Optional[List[str]] = None,
    reddit: praw.Reddit = None,
    limit: int = 10,
    include_nsfw: bool = False,
) -> DiscoveryOutput:
    """
    Agentic version of discover_subreddits using OpenAI Agents SDK.
    Maintains backward compatibility with existing interface.
    """
    # Initialize context
    context = DiscoveryContext(
        reddit_client=reddit,
        discovery_config=DiscoveryConfig(
            include_nsfw=include_nsfw,
            default_limit=limit,
        ),
    )

    # Prepare input
    if queries:
        input_text = f"Batch discovery for queries: {queries}"
    else:
        input_text = f"Discover subreddits for: {query}"

    # Run discovery through orchestrator.
    # Note: max_turns is a Runner.run parameter, not part of RunConfig.
    result = await Runner.run(
        starting_agent=discovery_orchestrator,
        input=input_text,
        context=context,
        max_turns=20,
        run_config=RunConfig(
            workflow_name="Reddit Discovery",
            trace_metadata={"query": query or queries},
        ),
    )
    return result.final_output
```
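For orientation, a hypothetical caller of this entry point might look like the following. The credential placeholders are elided on purpose, and the printed fields come from the `DiscoveryOutput` model defined above:

```python
import asyncio

import praw

from src.tools.discover_agent import discover_subreddits_agent

async def main() -> None:
    reddit = praw.Reddit(
        client_id="...",        # credentials elided
        client_secret="...",
        user_agent="reddit-mcp-discovery-demo",
    )
    output = await discover_subreddits_agent(
        query="machine learning communities for beginners",
        reddit=reddit,
        limit=10,
    )
    print(output.summary)
    for rec in output.recommendations:
        print("-", rec)

if __name__ == "__main__":
    asyncio.run(main())
```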
## Implementation Strategy

### Phase 1: Foundation (Week 1)
1. Set up project structure and dependencies
2. Create base models and context objects
3. Implement Search Executor and Validator agents
4. Basic integration tests

### Phase 2: Core Agents (Week 2)
1. Implement Query Analyzer with NLP tools
2. Create Subreddit Scorer with confidence metrics
3. Build Result Synthesizer
4. Add comprehensive testing

### Phase 3: Orchestration (Week 3)
1. Implement Discovery Orchestrator with routing logic
2. Create Batch Manager for parallel execution
3. Add handoff patterns and error handling
4. Integration with existing MCP server

### Phase 4: Optimization (Week 4)
1. Add caching layer
2. Optimize model selection per agent
3. Implement tracing and monitoring
4. Performance testing and tuning

## Benefits Over Current Implementation

1. **Modularity**: Each agent is independent and focused
2. **Scalability**: Easy to add new discovery strategies
3. **Observability**: Full tracing of decision process
4. **Testability**: Each agent can be unit tested
5. **Flexibility**: Agents can be reused in different workflows
6. **Performance**: Parallel execution and smart caching
7. **Maintainability**: Clear separation of concerns

## Migration Path

1. **Parallel Development**: Build new system alongside existing
2. **Feature Flag**: Toggle between old and new implementation
3. **Gradual Rollout**: Test with subset of queries first
4. **Backward Compatible**: Same interface as current discover.py
5. **Monitoring**: Compare results between old and new

## Testing Strategy

### Unit Tests
- Each agent tested independently
- Mock Reddit client and context
- Test all tools and handoffs

### Integration Tests
- End-to-end discovery workflows
- Multiple query types
- Error scenarios

### Performance Tests
- API call optimization
- Caching effectiveness
- Parallel execution benefits

## Monitoring and Observability

1. **Tracing**: Full agent decision tree
2. **Metrics**: API calls, latency, cache hits
3. **Logging**: Structured logs per agent
4. **Debugging**: Replay agent conversations

## Future Enhancements

1. **Learning**: Agents improve from feedback
2. **Personalization**: User-specific discovery preferences
3. **Advanced NLP**: Better query understanding
4. **Community Graph**: Relationship mapping between subreddits
5. **Trend Detection**: Identify emerging communities

## Conclusion

This agentic architecture transforms the monolithic discover.py into a flexible, scalable system of specialized agents. Each agent excels at its specific task while the orchestrator ensures optimal routing and efficiency. The result is a more maintainable, testable, and powerful discovery system that can evolve with changing requirements.
```