This is page 3 of 3. Use http://codebase.md/tokidoo/crawl4ai-rag-mcp?lines=true&page={x} to view the full context.
# Directory Structure
```
├── .dockerignore
├── .env.example
├── .gitattributes
├── .gitignore
├── Caddyfile
├── crawled_pages.sql
├── docker-compose.yml
├── Dockerfile
├── knowledge_graphs
│ ├── ai_hallucination_detector.py
│ ├── ai_script_analyzer.py
│ ├── hallucination_reporter.py
│ ├── knowledge_graph_validator.py
│ ├── parse_repo_into_neo4j.py
│ ├── query_knowledge_graph.py
│ └── test_script.py
├── LICENSE
├── pyproject.toml
├── README.md
├── searxng
│ ├── limiter.toml
│ └── settings.yml
├── src
│ ├── crawl4ai_mcp.py
│ └── utils.py
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/src/crawl4ai_mcp.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | MCP server for web crawling with Crawl4AI.
3 |
4 | This server provides tools to crawl websites using Crawl4AI, automatically detecting
5 | the appropriate crawl method based on URL type (sitemap, txt file, or regular webpage).
6 | Also includes AI hallucination detection and repository parsing tools using Neo4j knowledge graphs.
7 | """
8 | from mcp.server.fastmcp import FastMCP, Context
9 | from sentence_transformers import CrossEncoder
10 | from contextlib import asynccontextmanager
11 | from collections.abc import AsyncIterator
12 | from dataclasses import dataclass
13 | from typing import List, Dict, Any, Optional, Union
14 | from urllib.parse import urlparse, urldefrag
15 | from xml.etree import ElementTree
16 | from dotenv import load_dotenv
17 | from supabase import Client
18 | from pathlib import Path
19 | import requests
20 | import asyncio
21 | import json
22 | import os
23 | import re
24 | import concurrent.futures
25 | import sys
26 | import time
27 |
28 | from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, CacheMode, MemoryAdaptiveDispatcher
29 |
30 | # Add knowledge_graphs folder to path for importing knowledge graph modules
31 | knowledge_graphs_path = Path(__file__).resolve().parent.parent / 'knowledge_graphs'
32 | sys.path.append(str(knowledge_graphs_path))
33 |
34 | from utils import (
35 | get_supabase_client,
36 | add_documents_to_supabase,
37 | search_documents,
38 | extract_code_blocks,
39 | generate_code_example_summary,
40 | add_code_examples_to_supabase,
41 | update_source_info,
42 | extract_source_summary,
43 | search_code_examples
44 | )
45 |
46 | # Import knowledge graph modules
47 | from knowledge_graph_validator import KnowledgeGraphValidator
48 | from parse_repo_into_neo4j import DirectNeo4jExtractor
49 | from ai_script_analyzer import AIScriptAnalyzer
50 | from hallucination_reporter import HallucinationReporter
51 |
52 | # Load environment variables from the project root .env file
53 | project_root = Path(__file__).resolve().parent.parent
54 | dotenv_path = project_root / '.env'
55 |
56 | # Force override of existing environment variables
57 | load_dotenv(dotenv_path, override=True)
58 |
59 | # Helper functions for Neo4j validation and error handling
60 | def validate_neo4j_connection() -> bool:
61 | """Check if Neo4j environment variables are configured."""
62 | return all([
63 | os.getenv("NEO4J_URI"),
64 | os.getenv("NEO4J_USER"),
65 | os.getenv("NEO4J_PASSWORD")
66 | ])
67 |
68 | def format_neo4j_error(error: Exception) -> str:
69 | """Format Neo4j connection errors for user-friendly messages."""
70 | error_str = str(error).lower()
71 | if "authentication" in error_str or "unauthorized" in error_str:
72 | return "Neo4j authentication failed. Check NEO4J_USER and NEO4J_PASSWORD."
73 | elif "connection" in error_str or "refused" in error_str or "timeout" in error_str:
74 | return "Cannot connect to Neo4j. Check NEO4J_URI and ensure Neo4j is running."
75 | elif "database" in error_str:
76 | return "Neo4j database error. Check if the database exists and is accessible."
77 | else:
78 | return f"Neo4j error: {str(error)}"
79 |
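# Illustrative sketch (not one of the original helpers): how the two helpers above might
# be combined to probe a Neo4j instance before enabling the knowledge graph tools.
# Assumes the official `neo4j` driver is installed; `verify_connectivity()` is available
# in recent driver versions. The helper name is hypothetical.
def _example_check_neo4j() -> str:
    """Return a human-readable status string for the configured Neo4j instance."""
    if not validate_neo4j_connection():
        return "Neo4j not configured - set NEO4J_URI, NEO4J_USER and NEO4J_PASSWORD"
    try:
        from neo4j import GraphDatabase
        driver = GraphDatabase.driver(
            os.getenv("NEO4J_URI"),
            auth=(os.getenv("NEO4J_USER"), os.getenv("NEO4J_PASSWORD")),
        )
        driver.verify_connectivity()
        driver.close()
        return "Neo4j connection OK"
    except Exception as e:
        return format_neo4j_error(e)
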
80 | def validate_script_path(script_path: str) -> Dict[str, Any]:
81 | """Validate script path and return error info if invalid."""
82 | if not script_path or not isinstance(script_path, str):
83 | return {"valid": False, "error": "Script path is required"}
84 |
85 | if not os.path.exists(script_path):
86 | return {"valid": False, "error": f"Script not found: {script_path}"}
87 |
88 | if not script_path.endswith('.py'):
89 | return {"valid": False, "error": "Only Python (.py) files are supported"}
90 |
91 | try:
92 | # Check if file is readable
93 | with open(script_path, 'r', encoding='utf-8') as f:
94 | f.read(1) # Read first character to test
95 | return {"valid": True}
96 | except Exception as e:
97 | return {"valid": False, "error": f"Cannot read script file: {str(e)}"}
98 |
99 | def validate_github_url(repo_url: str) -> Dict[str, Any]:
100 | """Validate GitHub repository URL."""
101 | if not repo_url or not isinstance(repo_url, str):
102 | return {"valid": False, "error": "Repository URL is required"}
103 |
104 | repo_url = repo_url.strip()
105 |
106 | # Basic GitHub URL validation
107 | if not ("github.com" in repo_url.lower() or repo_url.endswith(".git")):
108 | return {"valid": False, "error": "Please provide a valid GitHub repository URL"}
109 |
110 | # Check URL format
111 | if not (repo_url.startswith("https://") or repo_url.startswith("git@")):
112 | return {"valid": False, "error": "Repository URL must start with https:// or git@"}
113 |
114 | return {"valid": True, "repo_name": repo_url.split('/')[-1].replace('.git', '')}
115 |
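# Illustrative sketch (not part of the original module): expected behaviour of the
# validators above on a few sample inputs. The sample path and URLs are made up.
def _example_validation() -> None:
    """Print validation results for a hypothetical script path and repository URLs."""
    print(validate_script_path("examples/my_agent.py"))
    # -> {"valid": False, "error": "Script not found: ..."} unless that file actually exists
    print(validate_github_url("https://github.com/unclecode/crawl4ai.git"))
    # -> {"valid": True, "repo_name": "crawl4ai"}
    print(validate_github_url("ftp://example.com/repo"))
    # -> {"valid": False, "error": "Please provide a valid GitHub repository URL"}
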
116 | # Create a dataclass for our application context
117 | @dataclass
118 | class Crawl4AIContext:
119 | """Context for the Crawl4AI MCP server."""
120 | crawler: AsyncWebCrawler
121 | supabase_client: Client
122 | reranking_model: Optional[CrossEncoder] = None
123 | knowledge_validator: Optional[Any] = None # KnowledgeGraphValidator when available
124 | repo_extractor: Optional[Any] = None # DirectNeo4jExtractor when available
125 |
126 | @asynccontextmanager
127 | async def crawl4ai_lifespan(server: FastMCP) -> AsyncIterator[Crawl4AIContext]:
128 | """
129 | Manages the Crawl4AI client lifecycle.
130 |
131 | Args:
132 | server: The FastMCP server instance
133 |
134 | Yields:
135 | Crawl4AIContext: The context containing the Crawl4AI crawler and Supabase client
136 | """
137 | # Create browser configuration
138 | browser_config = BrowserConfig(
139 | headless=True,
140 | verbose=False
141 | )
142 |
143 | # Initialize the crawler
144 | crawler = AsyncWebCrawler(config=browser_config)
145 | await crawler.__aenter__()
146 |
147 | # Initialize Supabase client
148 | supabase_client = get_supabase_client()
149 |
150 | # Initialize cross-encoder model for reranking if enabled
151 | reranking_model = None
152 | if os.getenv("USE_RERANKING", "false") == "true":
153 | try:
154 | reranking_model = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2")
155 | except Exception as e:
156 | print(f"Failed to load reranking model: {e}")
157 | reranking_model = None
158 |
159 | # Initialize Neo4j components if configured and enabled
160 | knowledge_validator = None
161 | repo_extractor = None
162 |
163 | # Check if knowledge graph functionality is enabled
164 | knowledge_graph_enabled = os.getenv("USE_KNOWLEDGE_GRAPH", "false") == "true"
165 |
166 | if knowledge_graph_enabled:
167 | neo4j_uri = os.getenv("NEO4J_URI")
168 | neo4j_user = os.getenv("NEO4J_USER")
169 | neo4j_password = os.getenv("NEO4J_PASSWORD")
170 |
171 | if neo4j_uri and neo4j_user and neo4j_password:
172 | try:
173 | print("Initializing knowledge graph components...")
174 |
175 | # Initialize knowledge graph validator
176 | knowledge_validator = KnowledgeGraphValidator(neo4j_uri, neo4j_user, neo4j_password)
177 | await knowledge_validator.initialize()
178 | print("✓ Knowledge graph validator initialized")
179 |
180 | # Initialize repository extractor
181 | repo_extractor = DirectNeo4jExtractor(neo4j_uri, neo4j_user, neo4j_password)
182 | await repo_extractor.initialize()
183 | print("✓ Repository extractor initialized")
184 |
185 | except Exception as e:
186 | print(f"Failed to initialize Neo4j components: {format_neo4j_error(e)}")
187 | knowledge_validator = None
188 | repo_extractor = None
189 | else:
190 | print("Neo4j credentials not configured - knowledge graph tools will be unavailable")
191 | else:
192 | print("Knowledge graph functionality disabled - set USE_KNOWLEDGE_GRAPH=true to enable")
193 |
194 | try:
195 | yield Crawl4AIContext(
196 | crawler=crawler,
197 | supabase_client=supabase_client,
198 | reranking_model=reranking_model,
199 | knowledge_validator=knowledge_validator,
200 | repo_extractor=repo_extractor
201 | )
202 | finally:
203 | # Clean up all components
204 | await crawler.__aexit__(None, None, None)
205 | if knowledge_validator:
206 | try:
207 | await knowledge_validator.close()
208 | print("✓ Knowledge graph validator closed")
209 | except Exception as e:
210 | print(f"Error closing knowledge validator: {e}")
211 | if repo_extractor:
212 | try:
213 | await repo_extractor.close()
214 | print("✓ Repository extractor closed")
215 | except Exception as e:
216 | print(f"Error closing repository extractor: {e}")
217 |
218 | # Initialize FastMCP server
219 | mcp = FastMCP(
220 | "mcp-crawl4ai-rag",
221 | description="MCP server for RAG and web crawling with Crawl4AI",
222 | lifespan=crawl4ai_lifespan,
223 | host=os.getenv("HOST", "0.0.0.0"),
224 |     port=int(os.getenv("PORT", "8051"))
225 | )
226 |
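# Illustrative sketch (not part of the original module): the environment variables this
# server reads, gathered in one place for debugging a deployment. The helper name and
# return shape are illustrative only; every variable listed is read elsewhere in this file.
def _example_config_report() -> Dict[str, Any]:
    """Report which optional features are enabled based on the current environment."""
    return {
        "host": os.getenv("HOST", "0.0.0.0"),
        "port": int(os.getenv("PORT", "8051")),
        "use_reranking": os.getenv("USE_RERANKING", "false") == "true",
        "use_hybrid_search": os.getenv("USE_HYBRID_SEARCH", "false") == "true",
        "use_agentic_rag": os.getenv("USE_AGENTIC_RAG", "false") == "true",
        "use_knowledge_graph": os.getenv("USE_KNOWLEDGE_GRAPH", "false") == "true",
        "neo4j_configured": validate_neo4j_connection(),
        "searxng_url": os.getenv("SEARXNG_URL"),
        "max_rag_workers": int(os.getenv("MAX_RAG_WORKERS", "5")),
    }
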
227 | def rerank_results(model: CrossEncoder, query: str, results: List[Dict[str, Any]], content_key: str = "content") -> List[Dict[str, Any]]:
228 | """
229 | Rerank search results using a cross-encoder model.
230 |
231 | Args:
232 | model: The cross-encoder model to use for reranking
233 | query: The search query
234 | results: List of search results
235 | content_key: The key in each result dict that contains the text content
236 |
237 | Returns:
238 | Reranked list of results
239 | """
240 | if not model or not results:
241 | return results
242 |
243 | try:
244 | # Extract content from results
245 | texts = [result.get(content_key, "") for result in results]
246 |
247 | # Create pairs of [query, document] for the cross-encoder
248 | pairs = [[query, text] for text in texts]
249 |
250 | # Get relevance scores from the cross-encoder
251 | scores = model.predict(pairs)
252 |
253 | # Add scores to results and sort by score (descending)
254 | for i, result in enumerate(results):
255 | result["rerank_score"] = float(scores[i])
256 |
257 | # Sort by rerank score
258 | reranked = sorted(results, key=lambda x: x.get("rerank_score", 0), reverse=True)
259 |
260 | return reranked
261 | except Exception as e:
262 | print(f"Error during reranking: {e}")
263 | return results
264 |
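# Illustrative sketch (not part of the original module): calling rerank_results with the
# same cross-encoder the lifespan loads. Downloads model weights on first use; the toy
# documents and function name are made up.
def _example_rerank() -> List[Dict[str, Any]]:
    """Rerank two toy documents against a query and return them best-first."""
    model = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2")
    docs = [
        {"content": "Crawl4AI is an async web crawler for LLM pipelines."},
        {"content": "A recipe for sourdough bread with a long cold proof."},
    ]
    return rerank_results(model, "python web crawling library", docs)
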
265 | def is_sitemap(url: str) -> bool:
266 | """
267 | Check if a URL is a sitemap.
268 |
269 | Args:
270 | url: URL to check
271 |
272 | Returns:
273 | True if the URL is a sitemap, False otherwise
274 | """
275 | return url.endswith('sitemap.xml') or 'sitemap' in urlparse(url).path
276 |
277 | def is_txt(url: str) -> bool:
278 | """
279 | Check if a URL is a text file.
280 |
281 | Args:
282 | url: URL to check
283 |
284 | Returns:
285 | True if the URL is a text file, False otherwise
286 | """
287 | return url.endswith('.txt')
288 |
289 | def parse_sitemap(sitemap_url: str) -> List[str]:
290 | """
291 | Parse a sitemap and extract URLs.
292 |
293 | Args:
294 | sitemap_url: URL of the sitemap
295 |
296 | Returns:
297 | List of URLs found in the sitemap
298 | """
299 |     resp = requests.get(sitemap_url, timeout=30)  # avoid hanging indefinitely on unresponsive hosts
300 | urls = []
301 |
302 | if resp.status_code == 200:
303 | try:
304 | tree = ElementTree.fromstring(resp.content)
305 | urls = [loc.text for loc in tree.findall('.//{*}loc')]
306 | except Exception as e:
307 | print(f"Error parsing sitemap XML: {e}")
308 |
309 | return urls
310 |
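# Illustrative sketch (not part of the original module): how the URL helpers above feed
# the crawl-strategy choice that smart_crawl_url makes further down. The sample URL is
# hypothetical; parse_sitemap performs a network request and returns [] on failure.
def _example_pick_strategy(url: str = "https://example.com/sitemap.xml") -> str:
    """Return which crawl strategy the helpers above would select for a URL."""
    if is_txt(url):
        return "text_file"          # fetched directly as markdown
    if is_sitemap(url):
        urls = parse_sitemap(url)   # network call; empty list if the sitemap cannot be parsed
        return f"sitemap ({len(urls)} URLs)"
    return "webpage"                # recursive internal-link crawl
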
311 | def smart_chunk_markdown(text: str, chunk_size: int = 5000) -> List[str]:
312 | """Split text into chunks, respecting code blocks and paragraphs."""
313 | chunks = []
314 | start = 0
315 | text_length = len(text)
316 |
317 | while start < text_length:
318 | # Calculate end position
319 | end = start + chunk_size
320 |
321 | # If we're at the end of the text, just take what's left
322 | if end >= text_length:
323 | chunks.append(text[start:].strip())
324 | break
325 |
326 | # Try to find a code block boundary first (```)
327 | chunk = text[start:end]
328 | code_block = chunk.rfind('```')
329 | if code_block != -1 and code_block > chunk_size * 0.3:
330 | end = start + code_block
331 |
332 | # If no code block, try to break at a paragraph
333 | elif '\n\n' in chunk:
334 | # Find the last paragraph break
335 | last_break = chunk.rfind('\n\n')
336 | if last_break > chunk_size * 0.3: # Only break if we're past 30% of chunk_size
337 | end = start + last_break
338 |
339 | # If no paragraph break, try to break at a sentence
340 | elif '. ' in chunk:
341 | # Find the last sentence break
342 | last_period = chunk.rfind('. ')
343 | if last_period > chunk_size * 0.3: # Only break if we're past 30% of chunk_size
344 | end = start + last_period + 1
345 |
346 | # Extract chunk and clean it up
347 | chunk = text[start:end].strip()
348 | if chunk:
349 | chunks.append(chunk)
350 |
351 | # Move start position for next chunk
352 | start = end
353 |
354 | return chunks
355 |
356 | def extract_section_info(chunk: str) -> Dict[str, Any]:
357 | """
358 | Extracts headers and stats from a chunk.
359 |
360 | Args:
361 | chunk: Markdown chunk
362 |
363 | Returns:
364 | Dictionary with headers and stats
365 | """
366 | headers = re.findall(r'^(#+)\s+(.+)$', chunk, re.MULTILINE)
367 | header_str = '; '.join([f'{h[0]} {h[1]}' for h in headers]) if headers else ''
368 |
369 | return {
370 | "headers": header_str,
371 | "char_count": len(chunk),
372 | "word_count": len(chunk.split())
373 | }
374 |
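# Illustrative sketch (not part of the original module): smart_chunk_markdown and
# extract_section_info working together on a toy document. chunk_size is shrunk so the
# short sample text actually splits; the helper name is hypothetical.
def _example_chunking() -> List[Dict[str, Any]]:
    """Chunk a toy markdown document and return per-chunk metadata."""
    doc = "# Intro\n\nSome text.\n\n## Details\n\n" + "More text. " * 80
    chunks = smart_chunk_markdown(doc, chunk_size=300)
    return [dict(extract_section_info(c), chunk_index=i) for i, c in enumerate(chunks)]
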
375 | def process_code_example(args):
376 | """
377 | Process a single code example to generate its summary.
378 | This function is designed to be used with concurrent.futures.
379 |
380 | Args:
381 | args: Tuple containing (code, context_before, context_after)
382 |
383 | Returns:
384 | The generated summary
385 | """
386 | code, context_before, context_after = args
387 | return generate_code_example_summary(code, context_before, context_after)
388 |
389 | @mcp.tool()
390 | async def search(ctx: Context, query: str, return_raw_markdown: bool = False, num_results: int = 6, batch_size: int = 20, max_concurrent: int = 10, max_rag_workers: int = 5) -> str:
391 | """
392 | Comprehensive search tool that integrates SearXNG search with scraping and RAG functionality.
393 | Optionally, use `return_raw_markdown=true` to return raw markdown for more detailed analysis.
394 |
395 | This tool performs a complete search, scrape, and RAG workflow:
396 | 1. Searches SearXNG with the provided query, obtaining up to `num_results` URLs
397 | 2. Scrapes all returned URLs using the existing scraping functionality
398 | 3. Chunks the scraped markdown and stores the embeddings in Supabase
399 | 4. Runs a RAG query per URL (or returns raw markdown) and returns organized results with metadata
400 |
401 | Args:
402 | query: The search query for SearXNG
403 | return_raw_markdown: If True, skip embedding/RAG and return raw markdown content (default: False)
404 | num_results: Number of search results to return from SearXNG (default: 6)
405 | batch_size: Batch size for database operations (default: 20)
406 | max_concurrent: Maximum concurrent browser sessions for scraping (default: 10)
407 | max_rag_workers: Maximum concurrent RAG query workers for parallel processing (default: 5)
408 |
409 | Returns:
410 | JSON string with search results, or raw markdown of each URL if `return_raw_markdown=true`
411 | """
412 | start_time = time.time()
413 |
414 | try:
415 | # Step 1: Environment validation - check if SEARXNG_URL is configured
416 | searxng_url = os.getenv("SEARXNG_URL")
417 | if not searxng_url:
418 | return json.dumps({
419 | "success": False,
420 | "error": "SEARXNG_URL environment variable is not configured. Please set it to your SearXNG instance URL."
421 | }, indent=2)
422 |
423 | searxng_url = searxng_url.rstrip('/') # Remove trailing slash
424 | search_endpoint = f"{searxng_url}/search"
425 |
426 | # Get optional configuration
427 | user_agent = os.getenv("SEARXNG_USER_AGENT", "MCP-Crawl4AI-RAG-Server/1.0")
428 | timeout = int(os.getenv("SEARXNG_TIMEOUT", "30"))
429 | default_engines = os.getenv("SEARXNG_DEFAULT_ENGINES", "")
430 |
431 | # Step 2: SearXNG request - make HTTP GET request with parameters
432 | headers = {
433 | "User-Agent": user_agent,
434 | "Accept": "application/json"
435 | }
436 |
437 | # Prepare request parameters
438 | params = {
439 | "q": query,
440 | "format": "json",
441 | "categories": "general",
442 | "limit": num_results # SearXNG uses 'limit'
443 | }
444 |
445 | # Add engines if specified
446 | if default_engines:
447 | params["engines"] = default_engines
448 |
449 | print(f"Making SearXNG request to: {search_endpoint}")
450 | print(f"Parameters: {params}")
451 |
452 | # Make the HTTP request to SearXNG
453 | try:
454 | response = requests.get(
455 | search_endpoint,
456 | params=params,
457 | headers=headers,
458 | timeout=timeout
459 | )
460 | response.raise_for_status() # Raise exception for HTTP errors
461 |
462 | except requests.exceptions.Timeout:
463 | return json.dumps({
464 | "success": False,
465 | "error": f"SearXNG request timed out after {timeout} seconds. Check your SearXNG instance."
466 | }, indent=2)
467 | except requests.exceptions.ConnectionError:
468 | return json.dumps({
469 | "success": False,
470 | "error": f"Cannot connect to SearXNG at {searxng_url}. Check the URL and ensure SearXNG is running."
471 | }, indent=2)
472 | except requests.exceptions.HTTPError as e:
473 | return json.dumps({
474 | "success": False,
475 | "error": f"SearXNG HTTP error: {e}. Check your SearXNG configuration."
476 | }, indent=2)
477 | except Exception as e:
478 | return json.dumps({
479 | "success": False,
480 | "error": f"SearXNG request failed: {str(e)}"
481 | }, indent=2)
482 |
483 | # Step 3: Response parsing - extract URLs from SearXNG JSON response
484 | try:
485 | search_data = response.json()
486 | except json.JSONDecodeError as e:
487 | return json.dumps({
488 | "success": False,
489 | "error": f"Invalid JSON response from SearXNG: {str(e)}"
490 | }, indent=2)
491 |
492 | # Extract results from response
493 | results = search_data.get("results", [])
494 | if not results:
495 | return json.dumps({
496 | "success": False,
497 | "query": query,
498 | "error": "No search results returned from SearXNG"
499 | }, indent=2)
500 |
501 | # Step 4: URL filtering - limit to num_results and validate URLs
502 | valid_urls = []
503 | for result in results[:num_results]:
504 | url = result.get("url", "").strip()
505 | if url and url.startswith(("http://", "https://")):
506 | valid_urls.append(url)
507 |
508 | if not valid_urls:
509 | return json.dumps({
510 | "success": False,
511 | "query": query,
512 | "error": "No valid URLs found in search results"
513 | }, indent=2)
514 |
515 | print(f"Found {len(valid_urls)} valid URLs to process")
516 |
517 | # Step 5: Content processing - use existing scrape_urls function
518 | try:
519 | # Use the existing scrape_urls function to scrape all URLs
520 | scrape_result_str = await scrape_urls(ctx, valid_urls, max_concurrent, batch_size)
521 | scrape_result = json.loads(scrape_result_str)
522 |
523 | if not scrape_result.get("success", False):
524 | return json.dumps({
525 | "success": False,
526 | "query": query,
527 | "searxng_results": valid_urls,
528 | "error": f"Scraping failed: {scrape_result.get('error', 'Unknown error')}"
529 | }, indent=2)
530 |
531 | except Exception as e:
532 | return json.dumps({
533 | "success": False,
534 | "query": query,
535 | "searxng_results": valid_urls,
536 | "error": f"Scraping error: {str(e)}"
537 | }, indent=2)
538 |
539 | # Step 6: Results processing based on return_raw_markdown flag
540 | results_data = {}
541 | processed_urls = 0
542 |
543 | if return_raw_markdown:
544 | # Raw markdown mode - just return scraped content without RAG
545 | # Get content from database for each URL
546 | supabase_client = ctx.request_context.lifespan_context.supabase_client
547 |
548 | for url in valid_urls:
549 | try:
550 | # Query the database for content from this URL
551 | result = supabase_client.from_('crawled_pages')\
552 | .select('content')\
553 | .eq('url', url)\
554 | .execute()
555 |
556 | if result.data:
557 | # Combine all chunks for this URL
558 | content_chunks = [row['content'] for row in result.data]
559 | combined_content = '\n\n'.join(content_chunks)
560 | results_data[url] = combined_content
561 | processed_urls += 1
562 | else:
563 | results_data[url] = "No content found"
564 |
565 | except Exception as e:
566 | results_data[url] = f"Error retrieving content: {str(e)}"
567 |
568 | else:
569 | # RAG mode - perform RAG query for each URL with parallel processing
570 | import asyncio
571 |
572 | # Prepare RAG query tasks
573 | async def process_rag_query_for_url(url: str):
574 | """Process RAG query for a single URL."""
575 | url_start_time = time.time()
576 | print(f"Processing RAG query for URL: {url}")
577 |
578 | try:
579 | # Extract source_id from URL for RAG filtering
580 | parsed_url = urlparse(url)
581 | source_id = parsed_url.netloc or parsed_url.path
582 |
583 | # Validate source_id
584 | if not source_id or source_id.strip() == "":
585 | print(f"Warning: Empty source_id for URL {url}, using fallback")
586 | source_id = "unknown_source"
587 |
588 | print(f"Using source_id: '{source_id}' for URL: {url}")
589 |
590 | # Perform RAG query with timeout protection (30 second timeout)
591 | try:
592 | rag_result_str = await asyncio.wait_for(
593 | perform_rag_query(ctx, query, source_id, match_count=5),
594 | timeout=30.0
595 | )
596 | print(f"RAG query completed for {url} in {time.time() - url_start_time:.2f}s")
597 | except asyncio.TimeoutError:
598 | print(f"RAG query timeout for URL: {url}")
599 | return url, "RAG query timed out after 30 seconds"
600 |
601 | # Parse JSON with error handling
602 | try:
603 | rag_result = json.loads(rag_result_str)
604 | except json.JSONDecodeError as e:
605 | print(f"JSON decode error for URL {url}: {e}")
606 | return url, f"JSON parsing error: {str(e)}"
607 |
608 | if rag_result.get("success", False) and rag_result.get("results"):
609 | # Format RAG results for this URL
610 | formatted_results = []
611 | for result in rag_result["results"]:
612 | formatted_results.append({
613 | "content": result.get("content", ""),
614 | "similarity": result.get("similarity", 0),
615 | "metadata": result.get("metadata", {})
616 | })
617 | print(f"Successfully processed RAG results for {url}: {len(formatted_results)} results")
618 | return url, formatted_results
619 | else:
620 | error_msg = rag_result.get("error", "No RAG results found")
621 | print(f"No RAG results for {url}: {error_msg}")
622 | return url, f"No relevant results: {error_msg}"
623 |
624 | except Exception as e:
625 | print(f"Unexpected error processing URL {url}: {str(e)}")
626 | return url, f"RAG query error: {str(e)}"
627 |
628 | # Use provided max_rag_workers or get from environment or use default
629 | if max_rag_workers is None:
630 | max_rag_workers = int(os.getenv("MAX_RAG_WORKERS", "5"))
631 |
632 | # Create tasks for parallel execution with semaphore for rate limiting
633 | semaphore = asyncio.Semaphore(max_rag_workers)
634 |
635 | async def rate_limited_rag_query(url):
636 | async with semaphore:
637 | return await process_rag_query_for_url(url)
638 |
639 | # Execute all RAG queries in parallel
640 | rag_tasks = [rate_limited_rag_query(url) for url in valid_urls]
641 | rag_results = await asyncio.gather(*rag_tasks)
642 |
643 | # Process results
644 | for url, result in rag_results:
645 | results_data[url] = result
646 | if isinstance(result, list): # Successfully got results
647 | processed_urls += 1
648 |
649 | # Calculate processing statistics
650 | processing_time = time.time() - start_time
651 |
652 | # Step 7: Format final results according to specification
653 | return json.dumps({
654 | "success": True,
655 | "query": query,
656 | "searxng_results": valid_urls,
657 | "mode": "raw_markdown" if return_raw_markdown else "rag_query",
658 | "results": results_data,
659 | "summary": {
660 | "urls_found": len(results),
661 | "urls_scraped": len(valid_urls),
662 | "urls_processed": processed_urls,
663 | "processing_time_seconds": round(processing_time, 2)
664 | },
665 | "performance": {
666 | "num_results": num_results,
667 | "batch_size": batch_size,
668 | "max_concurrent": max_concurrent,
669 | "max_rag_workers": max_rag_workers,
670 | "searxng_endpoint": search_endpoint
671 | }
672 | }, indent=2)
673 |
674 | except Exception as e:
675 | processing_time = time.time() - start_time
676 | return json.dumps({
677 | "success": False,
678 | "query": query,
679 | "error": f"Search operation failed: {str(e)}",
680 | "processing_time_seconds": round(processing_time, 2)
681 | }, indent=2)
682 |
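# Illustrative sketch (not part of the original module): how a caller might unpack the
# JSON string returned by the `search` tool above. `payload` is a stand-in for whatever
# the MCP client received; the key names match the return statements in `search`.
def _example_read_search_result(payload: str) -> None:
    """Print a one-line summary per URL from a `search` tool response."""
    data = json.loads(payload)
    if not data.get("success"):
        print(f"search failed: {data.get('error')}")
        return
    print(f"mode={data['mode']} urls_scraped={data['summary']['urls_scraped']}")
    for url, result in data["results"].items():
        if isinstance(result, list):          # rag_query mode: list of matching chunks
            print(f"{url}: {len(result)} relevant chunks")
        else:                                 # raw_markdown mode, or an error string
            print(f"{url}: {len(result)} characters")
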
683 | @mcp.tool()
684 | async def scrape_urls(ctx: Context, url: Union[str, List[str]], max_concurrent: int = 10, batch_size: int = 20, return_raw_markdown: bool = False) -> str:
685 | """
686 | Scrape **one or more URLs** and store their contents as embedding chunks in Supabase.
687 | Optionally, use `return_raw_markdown=true` to return raw markdown content without storing.
688 |
689 | By default the content is chunked, embedded, and stored in Supabase for later retrieval via the
690 | perform_rag_query tool; with `return_raw_markdown=True`, raw markdown is returned directly instead.
691 |
692 | Args:
693 | url: URL to scrape, or list of URLs for batch processing
694 | max_concurrent: Maximum number of concurrent browser sessions for multi-URL mode (default: 10)
695 | batch_size: Size of batches for database operations (default: 20)
696 | return_raw_markdown: If True, skip database storage and return raw markdown content (default: False)
697 |
698 | Returns:
699 | Summary of the scraping operation and storage in Supabase, or raw markdown content if requested
700 | """
701 | start_time = time.time()
702 |
703 | try:
704 | # Input validation and type detection
705 | if isinstance(url, str):
706 | # Single URL - convert to list for unified processing
707 | urls_to_process = [url]
708 | elif isinstance(url, list):
709 | # Multiple URLs
710 | if not url:
711 | return json.dumps({
712 | "success": False,
713 | "error": "URL list cannot be empty"
714 | }, indent=2)
715 |
716 | # Validate all URLs are strings and remove duplicates
717 | validated_urls = []
718 | for i, u in enumerate(url):
719 | if not isinstance(u, str):
720 | return json.dumps({
721 | "success": False,
722 | "error": f"URL at index {i} must be a string, got {type(u).__name__}"
723 | }, indent=2)
724 | if u.strip(): # Only add non-empty URLs
725 | validated_urls.append(u.strip())
726 |
727 | if not validated_urls:
728 | return json.dumps({
729 | "success": False,
730 | "error": "No valid URLs found in the list"
731 | }, indent=2)
732 |
733 | # Remove duplicates while preserving order
734 | seen = set()
735 | urls_to_process = []
736 | for u in validated_urls:
737 | if u not in seen:
738 | seen.add(u)
739 | urls_to_process.append(u)
740 | else:
741 | return json.dumps({
742 | "success": False,
743 | "error": f"URL must be a string or list of strings, got {type(url).__name__}"
744 | }, indent=2)
745 |
746 | # Get context components
747 | crawler = ctx.request_context.lifespan_context.crawler
748 | supabase_client = ctx.request_context.lifespan_context.supabase_client
749 |
750 | # Always use unified processing (handles both single and multiple URLs seamlessly)
751 | return await _process_multiple_urls(
752 | crawler, supabase_client, urls_to_process,
753 | max_concurrent, batch_size, start_time, return_raw_markdown
754 | )
755 |
756 | except Exception as e:
757 | processing_time = time.time() - start_time
758 | return json.dumps({
759 | "success": False,
760 | "url": url if isinstance(url, str) else f"[{len(url)} URLs]" if isinstance(url, list) else str(url),
761 | "error": str(e),
762 | "processing_time_seconds": round(processing_time, 2)
763 | }, indent=2)
764 |
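# Illustrative sketch (not part of the original module): the input normalisation that
# scrape_urls performs above (accept a single string or a list, drop blanks, de-duplicate
# while preserving order), extracted as a standalone helper for clarity.
def _example_normalize_urls(url: Union[str, List[str]]) -> List[str]:
    """Return a clean, ordered, de-duplicated list of URLs from either input form."""
    candidates = [url] if isinstance(url, str) else list(url)
    cleaned = [u.strip() for u in candidates if isinstance(u, str) and u.strip()]
    return list(dict.fromkeys(cleaned))  # dict preserves insertion order in Python 3.7+
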
765 |
766 | async def _process_multiple_urls(
767 | crawler: AsyncWebCrawler,
768 | supabase_client: Client,
769 | urls: List[str],
770 | max_concurrent: int,
771 | batch_size: int,
772 | start_time: float,
773 | return_raw_markdown: bool = False
774 | ) -> str:
775 | """
776 | Process one or more URLs using batch crawling and enhanced error handling.
777 |
778 | This function seamlessly handles both single URL and multiple URL scenarios,
779 | maintaining backward compatibility for single URL inputs while providing
780 | enhanced performance and error handling for all cases.
781 |
782 | Args:
783 | crawler: AsyncWebCrawler instance
784 | supabase_client: Supabase client
785 | urls: List of URLs to process (can be single URL or multiple)
786 | max_concurrent: Maximum concurrent browser sessions
787 | batch_size: Batch size for database operations
788 | start_time: Start time for performance tracking
789 |
790 | Returns:
791 | JSON string with crawl results (single URL format for 1 URL, multi format for multiple)
792 | """
793 | try:
794 | # Batch crawl all URLs using existing infrastructure
795 | crawl_results = await crawl_batch(crawler, urls, max_concurrent=max_concurrent)
796 |
797 | # Raw markdown mode - return immediately without storing
798 | if return_raw_markdown:
799 | results = {}
800 | total_content_length = 0
801 | urls_processed = 0
802 |
803 | for original_url in urls:
804 | # Find matching result
805 | crawl_result = None
806 | for cr in crawl_results:
807 | if cr['url'] == original_url:
808 | crawl_result = cr
809 | break
810 |
811 | if crawl_result and crawl_result.get('markdown'):
812 | results[original_url] = crawl_result['markdown']
813 | total_content_length += len(crawl_result['markdown'])
814 | urls_processed += 1
815 | else:
816 | results[original_url] = "No content retrieved"
817 |
818 | # Calculate processing time
819 | processing_time = time.time() - start_time
820 |
821 | return json.dumps({
822 | "success": True,
823 | "mode": "raw_markdown",
824 | "results": results,
825 | "summary": {
826 | "urls_processed": urls_processed,
827 | "total_content_length": total_content_length,
828 | "processing_time_seconds": round(processing_time, 2)
829 | }
830 | }, indent=2)
831 |
832 | # Initialize tracking variables for normal (database storage) mode
833 | all_urls = []
834 | all_chunk_numbers = []
835 | all_contents = []
836 | all_metadatas = []
837 | all_url_to_full_document = {}
838 |
839 | # Track sources and processing results
840 | source_content_map = {}
841 | source_word_counts = {}
842 |
843 | # Track individual URL results for detailed reporting
844 | url_results = []
845 | successful_urls = 0
846 | failed_urls = 0
847 | total_chunks = 0
848 | total_content_length = 0
849 | total_word_count = 0
850 | errors = []
851 |
852 | # Process each crawl result
853 | for original_url in urls:
854 | # Find matching result
855 | crawl_result = None
856 | for cr in crawl_results:
857 | if cr['url'] == original_url:
858 | crawl_result = cr
859 | break
860 |
861 | if crawl_result and crawl_result.get('markdown'):
862 | # Successful crawl
863 | try:
864 | # Extract source_id
865 | parsed_url = urlparse(original_url)
866 | source_id = parsed_url.netloc or parsed_url.path
867 |
868 | # Chunk the content
869 | chunks = smart_chunk_markdown(crawl_result['markdown'])
870 |
871 | # Store content for source summary generation
872 | if source_id not in source_content_map:
873 | source_content_map[source_id] = crawl_result['markdown'][:5000]
874 | source_word_counts[source_id] = 0
875 |
876 | url_word_count = 0
877 |
878 | # Process chunks
879 | for i, chunk in enumerate(chunks):
880 | all_urls.append(original_url)
881 | all_chunk_numbers.append(i)
882 | all_contents.append(chunk)
883 |
884 | # Extract metadata
885 | meta = extract_section_info(chunk)
886 | meta["chunk_index"] = i
887 | meta["url"] = original_url
888 | meta["source"] = source_id
889 | meta["crawl_type"] = "multi_url"
890 | meta["crawl_time"] = str(asyncio.current_task().get_coro().__name__)
891 | all_metadatas.append(meta)
892 |
893 | # Accumulate word counts
894 | chunk_word_count = meta.get("word_count", 0)
895 | url_word_count += chunk_word_count
896 | source_word_counts[source_id] += chunk_word_count
897 | total_word_count += chunk_word_count
898 |
899 | # Store full document mapping
900 | all_url_to_full_document[original_url] = crawl_result['markdown']
901 |
902 | # Track successful URL result
903 | url_results.append({
904 | "url": original_url,
905 | "success": True,
906 | "chunks_stored": len(chunks),
907 | "content_length": len(crawl_result['markdown']),
908 | "word_count": url_word_count,
909 | "source_id": source_id
910 | })
911 |
912 | successful_urls += 1
913 | total_chunks += len(chunks)
914 | total_content_length += len(crawl_result['markdown'])
915 |
916 | except Exception as e:
917 | # Error processing successful crawl
918 | error_detail = {
919 | "url": original_url,
920 | "error": str(e),
921 | "phase": "processing"
922 | }
923 | errors.append(error_detail)
924 | url_results.append({
925 | "url": original_url,
926 | "success": False,
927 | "error": str(e)
928 | })
929 | failed_urls += 1
930 | else:
931 | # Failed crawl
932 | error_msg = "No content retrieved"
933 | error_detail = {
934 | "url": original_url,
935 | "error": error_msg,
936 | "phase": "crawl"
937 | }
938 | errors.append(error_detail)
939 | url_results.append({
940 | "url": original_url,
941 | "success": False,
942 | "error": error_msg
943 | })
944 | failed_urls += 1
945 |
946 | # Update source information in parallel (if any successful crawls)
947 | if source_content_map:
948 | with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
949 | source_summary_args = [(source_id, content) for source_id, content in source_content_map.items()]
950 | source_summaries = list(executor.map(lambda args: extract_source_summary(args[0], args[1]), source_summary_args))
951 |
952 | for (source_id, _), summary in zip(source_summary_args, source_summaries):
953 | word_count = source_word_counts.get(source_id, 0)
954 | update_source_info(supabase_client, source_id, summary, word_count)
955 |
956 | # Add documentation chunks to Supabase in batches (if any)
957 | if all_contents:
958 | add_documents_to_supabase(
959 | supabase_client,
960 | all_urls,
961 | all_chunk_numbers,
962 | all_contents,
963 | all_metadatas,
964 | all_url_to_full_document,
965 | batch_size=batch_size
966 | )
967 |
968 | # Process code examples from all successful documents (if enabled)
969 | total_code_examples = 0
970 | extract_code_examples_enabled = os.getenv("USE_AGENTIC_RAG", "false") == "true"
971 | if extract_code_examples_enabled and crawl_results:
972 | code_urls = []
973 | code_chunk_numbers = []
974 | code_examples = []
975 | code_summaries = []
976 | code_metadatas = []
977 |
978 | # Extract code blocks from all successful documents
979 | for doc in crawl_results:
980 | if doc.get('markdown'):
981 | source_url = doc['url']
982 | md = doc['markdown']
983 | code_blocks = extract_code_blocks(md)
984 |
985 | if code_blocks:
986 | # Process code examples in parallel
987 | with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
988 | summary_args = [(block['code'], block['context_before'], block['context_after'])
989 | for block in code_blocks]
990 | summaries = list(executor.map(process_code_example, summary_args))
991 |
992 | # Prepare code example data
993 | parsed_url = urlparse(source_url)
994 | source_id = parsed_url.netloc or parsed_url.path
995 |
996 | for i, (block, summary) in enumerate(zip(code_blocks, summaries)):
997 | code_urls.append(source_url)
998 | code_chunk_numbers.append(len(code_examples))
999 | code_examples.append(block['code'])
1000 | code_summaries.append(summary)
1001 |
1002 | code_meta = {
1003 | "chunk_index": len(code_examples) - 1,
1004 | "url": source_url,
1005 | "source": source_id,
1006 | "char_count": len(block['code']),
1007 | "word_count": len(block['code'].split())
1008 | }
1009 | code_metadatas.append(code_meta)
1010 |
1011 | # Add all code examples to Supabase
1012 | if code_examples:
1013 | add_code_examples_to_supabase(
1014 | supabase_client,
1015 | code_urls,
1016 | code_chunk_numbers,
1017 | code_examples,
1018 | code_summaries,
1019 | code_metadatas,
1020 | batch_size=batch_size
1021 | )
1022 | total_code_examples = len(code_examples)
1023 |
1024 | # Calculate processing time
1025 | processing_time = time.time() - start_time
1026 |
1027 | # Return format based on number of URLs (maintain backward compatibility)
1028 | if len(urls) == 1:
1029 | # Single URL mode - return legacy-compatible format
1030 | single_url_result = url_results[0] if url_results else None
1031 | if single_url_result and single_url_result["success"]:
1032 | # Get the first crawl result for links information
1033 | first_crawl_result = None
1034 | for cr in crawl_results:
1035 | if cr['url'] == urls[0]:
1036 | first_crawl_result = cr
1037 | break
1038 |
1039 | return json.dumps({
1040 | "success": True,
1041 | "url": urls[0],
1042 | "chunks_stored": single_url_result.get("chunks_stored", 0),
1043 | "code_examples_stored": total_code_examples,
1044 | "content_length": single_url_result.get("content_length", 0),
1045 | "total_word_count": single_url_result.get("word_count", 0),
1046 | "source_id": single_url_result.get("source_id", ""),
1047 | "links_count": {
1048 | "internal": len(first_crawl_result.get("links", {}).get("internal", [])) if first_crawl_result else 0,
1049 | "external": len(first_crawl_result.get("links", {}).get("external", [])) if first_crawl_result else 0
1050 | }
1051 | }, indent=2)
1052 | else:
1053 | # Single URL failed
1054 | error_msg = single_url_result.get("error", "No content retrieved") if single_url_result else "No content retrieved"
1055 | return json.dumps({
1056 | "success": False,
1057 | "url": urls[0],
1058 | "error": error_msg
1059 | }, indent=2)
1060 | else:
1061 | # Multiple URLs mode - return comprehensive results
1062 | return json.dumps({
1063 | "success": True,
1064 | "mode": "multi_url",
1065 | "summary": {
1066 | "total_urls": len(urls),
1067 | "successful_urls": successful_urls,
1068 | "failed_urls": failed_urls,
1069 | "total_chunks_stored": total_chunks,
1070 | "total_code_examples_stored": total_code_examples,
1071 | "total_content_length": total_content_length,
1072 | "total_word_count": total_word_count,
1073 | "sources_updated": len(source_content_map),
1074 | "processing_time_seconds": round(processing_time, 2)
1075 | },
1076 | "results": url_results,
1077 | "errors": errors if errors else [],
1078 | "performance": {
1079 | "max_concurrent": max_concurrent,
1080 | "batch_size": batch_size,
1081 | "average_time_per_url": round(processing_time / len(urls), 2) if urls else 0
1082 | }
1083 | }, indent=2)
1084 |
1085 | except Exception as e:
1086 | processing_time = time.time() - start_time
1087 | if len(urls) == 1:
1088 | # Single URL error - return legacy-compatible format
1089 | return json.dumps({
1090 | "success": False,
1091 | "url": urls[0],
1092 | "error": str(e)
1093 | }, indent=2)
1094 | else:
1095 | # Multiple URLs error
1096 | return json.dumps({
1097 | "success": False,
1098 | "mode": "multi_url",
1099 | "error": str(e),
1100 | "summary": {
1101 | "total_urls": len(urls),
1102 | "processing_time_seconds": round(processing_time, 2)
1103 | }
1104 | }, indent=2)
1105 |
1106 | @mcp.tool()
1107 | async def smart_crawl_url(ctx: Context, url: str, max_depth: int = 3, max_concurrent: int = 10, chunk_size: int = 5000, return_raw_markdown: bool = False, query: Optional[List[str]] = None, max_rag_workers: int = 5) -> str:
1108 | """
1109 | Intelligently crawl a URL based on its type and store content in Supabase.
1110 | Enhanced with raw markdown return and RAG query capabilities.
1111 |
1112 | This tool automatically detects the URL type and applies the appropriate crawling method:
1113 | - For sitemaps: Extracts and crawls all URLs in parallel
1114 | - For text files (llms.txt): Directly retrieves the content
1115 | - For regular webpages: Recursively crawls internal links up to the specified depth
1116 |
1117 | All crawled content is chunked and stored in Supabase for later retrieval and querying.
1118 |
1119 | Args:
1120 | url: URL to crawl (can be a regular webpage, sitemap.xml, or .txt file)
1121 | max_depth: Maximum recursion depth for regular URLs (default: 3)
1122 | max_concurrent: Maximum number of concurrent browser sessions (default: 10)
1123 | chunk_size: Maximum size of each content chunk in characters (default: 5000)
1124 | return_raw_markdown: If True, return raw markdown content instead of just storing (default: False)
1125 | query: List of queries to perform RAG search on crawled content (default: None)
1126 | max_rag_workers: Maximum concurrent RAG query workers for parallel processing (default: 5)
1127 |
1128 | Returns:
1129 | JSON string with crawl summary, raw markdown (if requested), or RAG query results
1130 | """
1131 | try:
1132 | # Get the crawler from the context
1133 | crawler = ctx.request_context.lifespan_context.crawler
1134 | supabase_client = ctx.request_context.lifespan_context.supabase_client
1135 |
1136 | # Determine the crawl strategy
1137 | crawl_results = []
1138 | crawl_type = None
1139 |
1140 | if is_txt(url):
1141 | # For text files, use simple crawl
1142 | crawl_results = await crawl_markdown_file(crawler, url)
1143 | crawl_type = "text_file"
1144 | elif is_sitemap(url):
1145 | # For sitemaps, extract URLs and crawl in parallel
1146 | sitemap_urls = parse_sitemap(url)
1147 | if not sitemap_urls:
1148 | return json.dumps({
1149 | "success": False,
1150 | "url": url,
1151 | "error": "No URLs found in sitemap"
1152 | }, indent=2)
1153 | crawl_results = await crawl_batch(crawler, sitemap_urls, max_concurrent=max_concurrent)
1154 | crawl_type = "sitemap"
1155 | else:
1156 | # For regular URLs, use recursive crawl
1157 | crawl_results = await crawl_recursive_internal_links(crawler, [url], max_depth=max_depth, max_concurrent=max_concurrent)
1158 | crawl_type = "webpage"
1159 |
1160 | if not crawl_results:
1161 | return json.dumps({
1162 | "success": False,
1163 | "url": url,
1164 | "error": "No content found"
1165 | }, indent=2)
1166 |
1167 | # Raw markdown mode - return immediately without storing
1168 | if return_raw_markdown:
1169 | results = {}
1170 | total_content_length = 0
1171 |
1172 | for doc in crawl_results:
1173 | results[doc['url']] = doc['markdown']
1174 | total_content_length += len(doc['markdown'])
1175 |
1176 | return json.dumps({
1177 | "success": True,
1178 | "mode": "raw_markdown",
1179 | "crawl_type": crawl_type,
1180 | "results": results,
1181 | "summary": {
1182 | "pages_crawled": len(crawl_results),
1183 | "total_content_length": total_content_length
1184 | }
1185 | }, indent=2)
1186 |
1187 | # Process results and store in Supabase for default and query modes
1188 | urls = []
1189 | chunk_numbers = []
1190 | contents = []
1191 | metadatas = []
1192 | chunk_count = 0
1193 |
1194 | # Track sources and their content
1195 | source_content_map = {}
1196 | source_word_counts = {}
1197 |
1198 | # Process documentation chunks
1199 | for doc in crawl_results:
1200 | source_url = doc['url']
1201 | md = doc['markdown']
1202 | chunks = smart_chunk_markdown(md, chunk_size=chunk_size)
1203 |
1204 | # Extract source_id
1205 | parsed_url = urlparse(source_url)
1206 | source_id = parsed_url.netloc or parsed_url.path
1207 |
1208 | # Store content for source summary generation
1209 | if source_id not in source_content_map:
1210 | source_content_map[source_id] = md[:5000] # Store first 5000 chars
1211 | source_word_counts[source_id] = 0
1212 |
1213 | for i, chunk in enumerate(chunks):
1214 | urls.append(source_url)
1215 | chunk_numbers.append(i)
1216 | contents.append(chunk)
1217 |
1218 | # Extract metadata
1219 | meta = extract_section_info(chunk)
1220 | meta["chunk_index"] = i
1221 | meta["url"] = source_url
1222 | meta["source"] = source_id
1223 | meta["crawl_type"] = crawl_type
1224 | meta["crawl_time"] = str(asyncio.current_task().get_coro().__name__)
1225 | metadatas.append(meta)
1226 |
1227 | # Accumulate word count
1228 | source_word_counts[source_id] += meta.get("word_count", 0)
1229 |
1230 | chunk_count += 1
1231 |
1232 | # Create url_to_full_document mapping
1233 | url_to_full_document = {}
1234 | for doc in crawl_results:
1235 | url_to_full_document[doc['url']] = doc['markdown']
1236 |
1237 | # Update source information for each unique source FIRST (before inserting documents)
1238 | with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
1239 | source_summary_args = [(source_id, content) for source_id, content in source_content_map.items()]
1240 | source_summaries = list(executor.map(lambda args: extract_source_summary(args[0], args[1]), source_summary_args))
1241 |
1242 | for (source_id, _), summary in zip(source_summary_args, source_summaries):
1243 | word_count = source_word_counts.get(source_id, 0)
1244 | update_source_info(supabase_client, source_id, summary, word_count)
1245 |
1246 | # Add documentation chunks to Supabase (AFTER sources exist)
1247 | batch_size = 20
1248 | add_documents_to_supabase(supabase_client, urls, chunk_numbers, contents, metadatas, url_to_full_document, batch_size=batch_size)
1249 |
1250 | # Extract and process code examples from all documents only if enabled
1251 | code_examples = []
1252 | extract_code_examples_enabled = os.getenv("USE_AGENTIC_RAG", "false") == "true"
1253 | if extract_code_examples_enabled:
1254 | all_code_blocks = []
1255 | code_urls = []
1256 | code_chunk_numbers = []
1257 | code_summaries = []
1258 | code_metadatas = []
1259 |
1260 | # Extract code blocks from all documents
1261 | for doc in crawl_results:
1262 | source_url = doc['url']
1263 | md = doc['markdown']
1264 | code_blocks = extract_code_blocks(md)
1265 |
1266 | if code_blocks:
1267 | # Process code examples in parallel
1268 | with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
1269 | # Prepare arguments for parallel processing
1270 | summary_args = [(block['code'], block['context_before'], block['context_after'])
1271 | for block in code_blocks]
1272 |
1273 | # Generate summaries in parallel
1274 | summaries = list(executor.map(process_code_example, summary_args))
1275 |
1276 | # Prepare code example data
1277 | parsed_url = urlparse(source_url)
1278 | source_id = parsed_url.netloc or parsed_url.path
1279 |
1280 | for i, (block, summary) in enumerate(zip(code_blocks, summaries)):
1281 | code_urls.append(source_url)
1282 | code_chunk_numbers.append(len(code_examples)) # Use global code example index
1283 | code_examples.append(block['code'])
1284 | code_summaries.append(summary)
1285 |
1286 | # Create metadata for code example
1287 | code_meta = {
1288 | "chunk_index": len(code_examples) - 1,
1289 | "url": source_url,
1290 | "source": source_id,
1291 | "char_count": len(block['code']),
1292 | "word_count": len(block['code'].split())
1293 | }
1294 | code_metadatas.append(code_meta)
1295 |
1296 | # Add all code examples to Supabase
1297 | if code_examples:
1298 | add_code_examples_to_supabase(
1299 | supabase_client,
1300 | code_urls,
1301 | code_chunk_numbers,
1302 | code_examples,
1303 | code_summaries,
1304 | code_metadatas,
1305 | batch_size=batch_size
1306 | )
1307 |
1308 | # Query mode - perform RAG queries on all crawled URLs with parallel processing
1309 | if query and len(query) > 0:
1310 | results = {}
1311 | total_rag_queries = 0
1312 |
1313 | # Prepare all RAG query tasks for parallel execution
1314 | async def process_single_rag_query(doc_url: str, q: str, source_id: str):
1315 | """Process a single RAG query for a URL and query combination."""
1316 | try:
1317 | # Perform RAG query using existing function
1318 | rag_result_str = await perform_rag_query(ctx, q, source_id, match_count=5)
1319 | rag_result = json.loads(rag_result_str)
1320 |
1321 | if rag_result.get("success", False) and rag_result.get("results"):
1322 | # Format RAG results for this URL and query
1323 | formatted_results = []
1324 | for result in rag_result["results"]:
1325 | formatted_results.append({
1326 | "content": result.get("content", ""),
1327 | "similarity": result.get("similarity", 0),
1328 | "metadata": result.get("metadata", {})
1329 | })
1330 | return doc_url, q, formatted_results
1331 | else:
1332 | return doc_url, q, "No relevant results found"
1333 |
1334 | except Exception as e:
1335 | return doc_url, q, f"RAG query error: {str(e)}"
1336 |
1337 | # Use provided max_rag_workers or get from environment or use default
1338 | if max_rag_workers is None:
1339 | max_rag_workers = int(os.getenv("MAX_RAG_WORKERS", "5"))
1340 |
1341 | # Create semaphore for rate limiting
1342 | semaphore = asyncio.Semaphore(max_rag_workers)
1343 |
1344 | async def rate_limited_query(doc_url, q, source_id):
1345 | async with semaphore:
1346 | return await process_single_rag_query(doc_url, q, source_id)
1347 |
1348 | # Build list of all query tasks
1349 | query_tasks = []
1350 | for doc in crawl_results:
1351 | doc_url = doc['url']
1352 | # Extract source_id from URL for RAG filtering
1353 | parsed_url = urlparse(doc_url)
1354 | source_id = parsed_url.netloc or parsed_url.path
1355 |
1356 | results[doc_url] = {}
1357 |
1358 | for q in query:
1359 | query_tasks.append(rate_limited_query(doc_url, q, source_id))
1360 |
1361 | # Execute all queries in parallel
1362 | query_results = await asyncio.gather(*query_tasks)
1363 |
1364 | # Process results
1365 | for doc_url, q, result in query_results:
1366 | results[doc_url][q] = result
1367 | total_rag_queries += 1
1368 |
1369 | return json.dumps({
1370 | "success": True,
1371 | "mode": "rag_query",
1372 | "crawl_type": crawl_type,
1373 | "results": results,
1374 | "summary": {
1375 | "pages_crawled": len(crawl_results),
1376 | "queries_processed": len(query),
1377 | "total_rag_queries": total_rag_queries,
1378 | "max_rag_workers": max_rag_workers
1379 | }
1380 | }, indent=2)
1381 |
1382 | # Default mode - return crawl statistics as before
1383 | return json.dumps({
1384 | "success": True,
1385 | "url": url,
1386 | "crawl_type": crawl_type,
1387 | "pages_crawled": len(crawl_results),
1388 | "chunks_stored": chunk_count,
1389 | "code_examples_stored": len(code_examples),
1390 | "sources_updated": len(source_content_map),
1391 | "urls_crawled": [doc['url'] for doc in crawl_results][:5] + (["..."] if len(crawl_results) > 5 else [])
1392 | }, indent=2)
1393 | except Exception as e:
1394 | return json.dumps({
1395 | "success": False,
1396 | "url": url,
1397 | "error": str(e)
1398 | }, indent=2)
1399 |
1400 | @mcp.tool()
1401 | async def get_available_sources(ctx: Context) -> str:
1402 | """
1403 | Get all available sources from the sources table.
1404 |
1405 | This tool returns a list of all unique sources (domains) that have been crawled and stored
1406 | in the database, along with their summaries and statistics. This is useful for discovering
1407 | what content is available for querying.
1408 |
1409 | Always use this tool before calling the RAG query or code example query tool
1410 | with a specific source filter!
1411 |
1412 | Args:
1413 |         None
1414 |
1415 | Returns:
1416 | JSON string with the list of available sources and their details
1417 | """
1418 | try:
1419 | # Get the Supabase client from the context
1420 | supabase_client = ctx.request_context.lifespan_context.supabase_client
1421 |
1422 | # Query the sources table directly
1423 | result = supabase_client.from_('sources')\
1424 | .select('*')\
1425 | .order('source_id')\
1426 | .execute()
1427 |
1428 | # Format the sources with their details
1429 | sources = []
1430 | if result.data:
1431 | for source in result.data:
1432 | sources.append({
1433 | "source_id": source.get("source_id"),
1434 | "summary": source.get("summary"),
1435 | "total_words": source.get("total_words"),
1436 | "created_at": source.get("created_at"),
1437 | "updated_at": source.get("updated_at")
1438 | })
1439 |
1440 | return json.dumps({
1441 | "success": True,
1442 | "sources": sources,
1443 | "count": len(sources)
1444 | }, indent=2)
1445 | except Exception as e:
1446 | return json.dumps({
1447 | "success": False,
1448 | "error": str(e)
1449 | }, indent=2)
1450 |
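# Illustrative sketch (not part of the original module): the recommended flow of calling
# get_available_sources before perform_rag_query, written as a small helper. `ctx` must be
# a live request Context; the helper name is hypothetical.
async def _example_query_first_source(ctx: Context, query: str) -> str:
    """Run a RAG query against the first available source, or report that none exist."""
    sources = json.loads(await get_available_sources(ctx))
    if not sources.get("success") or not sources.get("sources"):
        return json.dumps({"success": False, "error": "No sources crawled yet"}, indent=2)
    source_id = sources["sources"][0]["source_id"]
    return await perform_rag_query(ctx, query, source=source_id, match_count=5)
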
1451 | @mcp.tool()
1452 | async def perform_rag_query(ctx: Context, query: str, source: Optional[str] = None, match_count: int = 5) -> str:
1453 | """
1454 | Perform a RAG (Retrieval Augmented Generation) query on the stored content.
1455 |
1456 | This tool searches the vector database for content relevant to the query and returns
1457 | the matching documents. Optionally filter by source domain.
1458 | Get the source by using the get_available_sources tool before calling this search!
1459 |
1460 | Args:
1461 | query: The search query
1462 | source: Optional source domain to filter results (e.g., 'example.com')
1463 | match_count: Maximum number of results to return (default: 5)
1464 |
1465 | Returns:
1466 | JSON string with the search results
1467 | """
1468 | import asyncio
1469 |
1470 | query_start_time = time.time()
1471 |
1472 | try:
1473 | print(f"Starting RAG query: '{query}' with source filter: '{source}'")
1474 |
1475 | # Input validation
1476 | if not query or not query.strip():
1477 | return json.dumps({
1478 | "success": False,
1479 | "error": "Query cannot be empty"
1480 | }, indent=2)
1481 |
1482 | if match_count <= 0:
1483 | match_count = 5
1484 | elif match_count > 50: # Reasonable limit
1485 | match_count = 50
1486 |
1487 | # Validate and sanitize source filter
1488 | if source:
1489 | source = source.strip()
1490 | if not source:
1491 | source = None
1492 | elif len(source) > 200: # Reasonable limit
1493 | return json.dumps({
1494 | "success": False,
1495 | "error": "Source filter too long (max 200 characters)"
1496 | }, indent=2)
1497 |
1498 | # Get the Supabase client from the context
1499 | supabase_client = ctx.request_context.lifespan_context.supabase_client
1500 |
1501 | if not supabase_client:
1502 | return json.dumps({
1503 | "success": False,
1504 | "error": "Database client not available"
1505 | }, indent=2)
1506 |
1507 | # Check if hybrid search is enabled
1508 | use_hybrid_search = os.getenv("USE_HYBRID_SEARCH", "false") == "true"
1509 |
1510 | # Prepare source filter if source is provided and not empty
1511 | # The source parameter should be the source_id (domain) not full URL
1512 | if source:
1513 | print(f"[DEBUG] Using source filter: '{source}'")
1514 |
1515 | results = []
1516 |
1517 | if use_hybrid_search:
1518 | print("[DEBUG] Using hybrid search mode")
1519 | try:
1520 | # Hybrid search: combine vector and keyword search with timeout protection
1521 |
1522 | # 1. Get vector search results with timeout (15 seconds)
1523 | print("[DEBUG] Executing vector search...")
1524 | try:
1525 | vector_results = await asyncio.wait_for(
1526 | asyncio.get_event_loop().run_in_executor(
1527 | None,
1528 | lambda: search_documents(
1529 | client=supabase_client,
1530 | query=query,
1531 | match_count=match_count * 2, # Get double to have room for filtering
1532 | source_id_filter=source # Use source_id_filter instead of filter_metadata
1533 | )
1534 | ),
1535 | timeout=15.0
1536 | )
1537 | print(f"Vector search completed: {len(vector_results)} results")
1538 | except asyncio.TimeoutError:
1539 | print("Vector search timed out, falling back to keyword search only")
1540 | vector_results = []
1541 | except Exception as e:
1542 | print(f"Vector search failed: {e}, falling back to keyword search only")
1543 | vector_results = []
1544 |
1545 | # 2. Get keyword search results with timeout (10 seconds)
1546 | print("Executing keyword search...")
1547 | try:
1548 | keyword_query = supabase_client.from_('crawled_pages')\
1549 | .select('id, url, chunk_number, content, metadata, source_id')\
1550 | .ilike('content', f'%{query}%')
1551 |
1552 | # Apply source filter if provided
1553 | if source:
1554 | keyword_query = keyword_query.eq('source_id', source)
1555 |
1556 | # Execute keyword search with timeout
1557 | keyword_response = await asyncio.wait_for(
1558 | asyncio.get_event_loop().run_in_executor(
1559 | None,
1560 | lambda: keyword_query.limit(match_count * 2).execute()
1561 | ),
1562 | timeout=10.0
1563 | )
1564 | keyword_results = keyword_response.data if keyword_response.data else []
1565 | print(f"Keyword search completed: {len(keyword_results)} results")
1566 | except asyncio.TimeoutError:
1567 | print("Keyword search timed out")
1568 | keyword_results = []
1569 | except Exception as e:
1570 | print(f"Keyword search failed: {e}")
1571 | keyword_results = []
1572 |
1573 | # 3. Combine results with preference for items appearing in both
1574 | if vector_results or keyword_results:
1575 | seen_ids = set()
1576 | combined_results = []
1577 |
1578 | # First, add items that appear in both searches (these are the best matches)
1579 | vector_ids = {r.get('id') for r in vector_results if r.get('id')}
1580 | for kr in keyword_results:
1581 | if kr['id'] in vector_ids and kr['id'] not in seen_ids:
1582 | # Find the vector result to get similarity score
1583 | for vr in vector_results:
1584 | if vr.get('id') == kr['id']:
1585 | # Boost similarity score for items in both results
1586 | vr['similarity'] = min(1.0, vr.get('similarity', 0) * 1.2)
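     |                                     # Worked example: a vector similarity of 0.8 is boosted to min(1.0, 0.8 * 1.2) = 0.96,
     |                                     # whereas keyword-only matches added further below keep the default similarity of 0.5.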
1587 | combined_results.append(vr)
1588 | seen_ids.add(kr['id'])
1589 | break
1590 |
1591 | # Then add remaining vector results (semantic matches without exact keyword)
1592 | for vr in vector_results:
1593 | if vr.get('id') and vr['id'] not in seen_ids and len(combined_results) < match_count:
1594 | combined_results.append(vr)
1595 | seen_ids.add(vr['id'])
1596 |
1597 | # Finally, add pure keyword matches if we still need more results
1598 | for kr in keyword_results:
1599 | if kr['id'] not in seen_ids and len(combined_results) < match_count:
1600 | # Convert keyword result to match vector result format
1601 | combined_results.append({
1602 | 'id': kr['id'],
1603 | 'url': kr['url'],
1604 | 'chunk_number': kr['chunk_number'],
1605 | 'content': kr['content'],
1606 | 'metadata': kr['metadata'],
1607 | 'source_id': kr['source_id'],
1608 | 'similarity': 0.5 # Default similarity for keyword-only matches
1609 | })
1610 | seen_ids.add(kr['id'])
1611 |
1612 | # Use combined results
1613 | results = combined_results[:match_count]
1614 | print(f"Hybrid search combined: {len(results)} final results")
1615 | else:
1616 | print("No results from either vector or keyword search")
1617 | results = []
1618 |
1619 | except Exception as e:
1620 | print(f"Hybrid search failed: {e}, falling back to vector search")
1621 | use_hybrid_search = False
1622 |
1623 | if not use_hybrid_search:
1624 | print("[DEBUG] Using vector search only")
1625 | try:
1626 | # Standard vector search with timeout protection (20 seconds)
1627 | results = await asyncio.wait_for(
1628 | asyncio.get_event_loop().run_in_executor(
1629 | None,
1630 | lambda: search_documents(
1631 | client=supabase_client,
1632 | query=query,
1633 | match_count=match_count,
1634 | source_id_filter=source # Use source_id_filter instead of filter_metadata
1635 | )
1636 | ),
1637 | timeout=20.0
1638 | )
1639 | print(f"Vector search completed: {len(results)} results")
1640 | except asyncio.TimeoutError:
1641 | print("Vector search timed out")
1642 | return json.dumps({
1643 | "success": False,
1644 | "query": query,
1645 | "error": "Search query timed out after 20 seconds. Try reducing match_count or simplifying the query."
1646 | }, indent=2)
1647 | except Exception as e:
1648 | print(f"Vector search failed: {e}")
1649 | return json.dumps({
1650 | "success": False,
1651 | "query": query,
1652 | "error": f"Database search failed: {str(e)}"
1653 | }, indent=2)
1654 |
1655 | # Apply reranking if enabled and we have results
1656 | use_reranking = os.getenv("USE_RERANKING", "false") == "true"
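     |         # rerank_results re-scores the hits with the CrossEncoder reranking model held in the lifespan
     |         # context and attaches a "rerank_score" to each result; it runs in an executor with a timeout
     |         # because the scoring call is synchronous and can be slow.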
1657 | if use_reranking and results and ctx.request_context.lifespan_context.reranking_model:
1658 | try:
1659 | print("Applying reranking...")
1660 | reranked_results = await asyncio.wait_for(
1661 | asyncio.get_event_loop().run_in_executor(
1662 | None,
1663 | lambda: rerank_results(
1664 | ctx.request_context.lifespan_context.reranking_model,
1665 | query,
1666 | results,
1667 | content_key="content"
1668 | )
1669 | ),
1670 | timeout=10.0
1671 | )
1672 | results = reranked_results
1673 | print("Reranking completed")
1674 | except asyncio.TimeoutError:
1675 | print("Reranking timed out, using original results")
1676 | except Exception as e:
1677 | print(f"Reranking failed: {e}, using original results")
1678 |
1679 | # Format the results
1680 | formatted_results = []
1681 | for result in results:
1682 | try:
1683 | formatted_result = {
1684 | "url": result.get("url", ""),
1685 | "content": result.get("content", ""),
1686 | "metadata": result.get("metadata", {}),
1687 | "similarity": result.get("similarity", 0.0)
1688 | }
1689 | # Include rerank score if available
1690 | if "rerank_score" in result:
1691 | formatted_result["rerank_score"] = result["rerank_score"]
1692 | formatted_results.append(formatted_result)
1693 | except Exception as e:
1694 | print(f"Error formatting result: {e}")
1695 | continue
1696 |
1697 | processing_time = time.time() - query_start_time
1698 | print(f"RAG query completed in {processing_time:.2f}s with {len(formatted_results)} results")
1699 |
1700 | return json.dumps({
1701 | "success": True,
1702 | "query": query,
1703 | "source_filter": source,
1704 | "search_mode": "hybrid" if use_hybrid_search else "vector",
1705 | "reranking_applied": use_reranking and ctx.request_context.lifespan_context.reranking_model is not None,
1706 | "results": formatted_results,
1707 | "count": len(formatted_results),
1708 | "processing_time_seconds": round(processing_time, 2)
1709 | }, indent=2)
1710 |
1711 | except Exception as e:
1712 | processing_time = time.time() - query_start_time
1713 | print(f"RAG query failed after {processing_time:.2f}s: {e}")
1714 | return json.dumps({
1715 | "success": False,
1716 | "query": query,
1717 | "source_filter": source,
1718 | "error": f"Search operation failed: {str(e)}",
1719 | "processing_time_seconds": round(processing_time, 2)
1720 | }, indent=2)
1721 |
1722 | @mcp.tool()
1723 | async def search_code_examples(ctx: Context, query: str, source_id: Optional[str] = None, match_count: int = 5) -> str:
1724 | """
1725 | Search for code examples relevant to the query.
1726 |
1727 | This tool searches the vector database for code examples relevant to the query and returns
1728 | the matching examples with their summaries. Optionally filter by source_id.
1729 |     Always call the get_available_sources tool first to obtain a valid source_id for filtering.
1730 | 
1731 |     Note: this tool only works when code example extraction is enabled (USE_AGENTIC_RAG=true); otherwise it returns an error.
1732 |
1733 | Args:
1734 | query: The search query
1735 | source_id: Optional source ID to filter results (e.g., 'example.com')
1736 | match_count: Maximum number of results to return (default: 5)
1737 |
1738 | Returns:
1739 | JSON string with the search results
1740 | """
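     |     # Illustrative call (values are examples only):
     |     #   search_code_examples(ctx, query="vector search helper", source_id="example.com", match_count=3)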
1741 | # Check if code example extraction is enabled
1742 | extract_code_examples_enabled = os.getenv("USE_AGENTIC_RAG", "false") == "true"
1743 | if not extract_code_examples_enabled:
1744 | return json.dumps({
1745 | "success": False,
1746 | "error": "Code example extraction is disabled. Perform a normal RAG search."
1747 | }, indent=2)
1748 |
1749 | try:
1750 | # Get the Supabase client from the context
1751 | supabase_client = ctx.request_context.lifespan_context.supabase_client
1752 |
1753 | # Check if hybrid search is enabled
1754 | use_hybrid_search = os.getenv("USE_HYBRID_SEARCH", "false") == "true"
1755 |
1756 | # Prepare filter if source is provided and not empty
1757 | filter_metadata = None
1758 | if source_id and source_id.strip():
1759 | filter_metadata = {"source": source_id}
1760 |
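     |         # The vector path passes the source as metadata ({"source": source_id}) to utils.search_code_examples,
     |         # while the hybrid keyword path filters on the source_id column directly.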
1761 | if use_hybrid_search:
1762 | # Hybrid search: combine vector and keyword search
1763 |
1764 | # Import the search function from utils
1765 | from utils import search_code_examples as search_code_examples_impl
1766 |
1767 | # 1. Get vector search results (get more to account for filtering)
1768 | vector_results = search_code_examples_impl(
1769 | client=supabase_client,
1770 | query=query,
1771 | match_count=match_count * 2, # Get double to have room for filtering
1772 | filter_metadata=filter_metadata
1773 | )
1774 |
1775 | # 2. Get keyword search results using ILIKE on both content and summary
1776 | keyword_query = supabase_client.from_('code_examples')\
1777 | .select('id, url, chunk_number, content, summary, metadata, source_id')\
1778 | .or_(f'content.ilike.%{query}%,summary.ilike.%{query}%')
1779 |
1780 | # Apply source filter if provided
1781 | if source_id and source_id.strip():
1782 | keyword_query = keyword_query.eq('source_id', source_id)
1783 |
1784 | # Execute keyword search
1785 | keyword_response = keyword_query.limit(match_count * 2).execute()
1786 | keyword_results = keyword_response.data if keyword_response.data else []
1787 |
1788 | # 3. Combine results with preference for items appearing in both
1789 | seen_ids = set()
1790 | combined_results = []
1791 |
1792 | # First, add items that appear in both searches (these are the best matches)
1793 | vector_ids = {r.get('id') for r in vector_results if r.get('id')}
1794 | for kr in keyword_results:
1795 | if kr['id'] in vector_ids and kr['id'] not in seen_ids:
1796 | # Find the vector result to get similarity score
1797 | for vr in vector_results:
1798 | if vr.get('id') == kr['id']:
1799 | # Boost similarity score for items in both results
1800 | vr['similarity'] = min(1.0, vr.get('similarity', 0) * 1.2)
1801 | combined_results.append(vr)
1802 | seen_ids.add(kr['id'])
1803 | break
1804 |
1805 | # Then add remaining vector results (semantic matches without exact keyword)
1806 | for vr in vector_results:
1807 | if vr.get('id') and vr['id'] not in seen_ids and len(combined_results) < match_count:
1808 | combined_results.append(vr)
1809 | seen_ids.add(vr['id'])
1810 |
1811 | # Finally, add pure keyword matches if we still need more results
1812 | for kr in keyword_results:
1813 | if kr['id'] not in seen_ids and len(combined_results) < match_count:
1814 | # Convert keyword result to match vector result format
1815 | combined_results.append({
1816 | 'id': kr['id'],
1817 | 'url': kr['url'],
1818 | 'chunk_number': kr['chunk_number'],
1819 | 'content': kr['content'],
1820 | 'summary': kr['summary'],
1821 | 'metadata': kr['metadata'],
1822 | 'source_id': kr['source_id'],
1823 | 'similarity': 0.5 # Default similarity for keyword-only matches
1824 | })
1825 | seen_ids.add(kr['id'])
1826 |
1827 | # Use combined results
1828 | results = combined_results[:match_count]
1829 |
1830 | else:
1831 | # Standard vector search only
1832 | from utils import search_code_examples as search_code_examples_impl
1833 |
1834 | results = search_code_examples_impl(
1835 | client=supabase_client,
1836 | query=query,
1837 | match_count=match_count,
1838 | filter_metadata=filter_metadata
1839 | )
1840 |
1841 | # Apply reranking if enabled
1842 | use_reranking = os.getenv("USE_RERANKING", "false") == "true"
1843 | if use_reranking and ctx.request_context.lifespan_context.reranking_model:
1844 | results = rerank_results(ctx.request_context.lifespan_context.reranking_model, query, results, content_key="content")
1845 |
1846 | # Format the results
1847 | formatted_results = []
1848 | for result in results:
1849 | formatted_result = {
1850 | "url": result.get("url"),
1851 | "code": result.get("content"),
1852 | "summary": result.get("summary"),
1853 | "metadata": result.get("metadata"),
1854 | "source_id": result.get("source_id"),
1855 | "similarity": result.get("similarity")
1856 | }
1857 | # Include rerank score if available
1858 | if "rerank_score" in result:
1859 | formatted_result["rerank_score"] = result["rerank_score"]
1860 | formatted_results.append(formatted_result)
1861 |
1862 | return json.dumps({
1863 | "success": True,
1864 | "query": query,
1865 | "source_filter": source_id,
1866 | "search_mode": "hybrid" if use_hybrid_search else "vector",
1867 | "reranking_applied": use_reranking and ctx.request_context.lifespan_context.reranking_model is not None,
1868 | "results": formatted_results,
1869 | "count": len(formatted_results)
1870 | }, indent=2)
1871 | except Exception as e:
1872 | return json.dumps({
1873 | "success": False,
1874 | "query": query,
1875 | "error": str(e)
1876 | }, indent=2)
1877 |
1878 | @mcp.tool()
1879 | async def check_ai_script_hallucinations(ctx: Context, script_path: str) -> str:
1880 | """
1881 | Check an AI-generated Python script for hallucinations using the knowledge graph.
1882 |
1883 | This tool analyzes a Python script for potential AI hallucinations by validating
1884 | imports, method calls, class instantiations, and function calls against a Neo4j
1885 | knowledge graph containing real repository data.
1886 |
1887 | The tool performs comprehensive analysis including:
1888 | - Import validation against known repositories
1889 | - Method call validation on classes from the knowledge graph
1890 | - Class instantiation parameter validation
1891 | - Function call parameter validation
1892 | - Attribute access validation
1893 |
1894 | Args:
1895 | script_path: Absolute path to the Python script to analyze
1896 |
1897 | Returns:
1898 | JSON string with hallucination detection results, confidence scores, and recommendations
1899 | """
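     |     # Note: script_path must point to a file reachable from the MCP server process, because the script
     |     # is read and AST-parsed locally by AIScriptAnalyzer once validate_script_path accepts it.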
1900 | try:
1901 | # Check if knowledge graph functionality is enabled
1902 | knowledge_graph_enabled = os.getenv("USE_KNOWLEDGE_GRAPH", "false") == "true"
1903 | if not knowledge_graph_enabled:
1904 | return json.dumps({
1905 | "success": False,
1906 | "error": "Knowledge graph functionality is disabled. Set USE_KNOWLEDGE_GRAPH=true in environment."
1907 | }, indent=2)
1908 |
1909 | # Get the knowledge validator from context
1910 | knowledge_validator = ctx.request_context.lifespan_context.knowledge_validator
1911 |
1912 | if not knowledge_validator:
1913 | return json.dumps({
1914 | "success": False,
1915 | "error": "Knowledge graph validator not available. Check Neo4j configuration in environment variables."
1916 | }, indent=2)
1917 |
1918 | # Validate script path
1919 | validation = validate_script_path(script_path)
1920 | if not validation["valid"]:
1921 | return json.dumps({
1922 | "success": False,
1923 | "script_path": script_path,
1924 | "error": validation["error"]
1925 | }, indent=2)
1926 |
1927 | # Step 1: Analyze script structure using AST
1928 | analyzer = AIScriptAnalyzer()
1929 | analysis_result = analyzer.analyze_script(script_path)
1930 |
1931 | if analysis_result.errors:
1932 | print(f"Analysis warnings for {script_path}: {analysis_result.errors}")
1933 |
1934 | # Step 2: Validate against knowledge graph
1935 | validation_result = await knowledge_validator.validate_script(analysis_result)
1936 |
1937 | # Step 3: Generate comprehensive report
1938 | reporter = HallucinationReporter()
1939 | report = reporter.generate_comprehensive_report(validation_result)
1940 |
1941 | # Format response with comprehensive information
1942 | return json.dumps({
1943 | "success": True,
1944 | "script_path": script_path,
1945 | "overall_confidence": validation_result.overall_confidence,
1946 | "validation_summary": {
1947 | "total_validations": report["validation_summary"]["total_validations"],
1948 | "valid_count": report["validation_summary"]["valid_count"],
1949 | "invalid_count": report["validation_summary"]["invalid_count"],
1950 | "uncertain_count": report["validation_summary"]["uncertain_count"],
1951 | "not_found_count": report["validation_summary"]["not_found_count"],
1952 | "hallucination_rate": report["validation_summary"]["hallucination_rate"]
1953 | },
1954 | "hallucinations_detected": report["hallucinations_detected"],
1955 | "recommendations": report["recommendations"],
1956 | "analysis_metadata": {
1957 | "total_imports": report["analysis_metadata"]["total_imports"],
1958 | "total_classes": report["analysis_metadata"]["total_classes"],
1959 | "total_methods": report["analysis_metadata"]["total_methods"],
1960 | "total_attributes": report["analysis_metadata"]["total_attributes"],
1961 | "total_functions": report["analysis_metadata"]["total_functions"]
1962 | },
1963 | "libraries_analyzed": report.get("libraries_analyzed", [])
1964 | }, indent=2)
1965 |
1966 | except Exception as e:
1967 | return json.dumps({
1968 | "success": False,
1969 | "script_path": script_path,
1970 | "error": f"Analysis failed: {str(e)}"
1971 | }, indent=2)
1972 |
1973 | @mcp.tool()
1974 | async def query_knowledge_graph(ctx: Context, command: str) -> str:
1975 | """
1976 | Query and explore the Neo4j knowledge graph containing repository data.
1977 |
1978 | This tool provides comprehensive access to the knowledge graph for exploring repositories,
1979 | classes, methods, functions, and their relationships. Perfect for understanding what data
1980 | is available for hallucination detection and debugging validation results.
1981 |
1982 | **⚠️ IMPORTANT: Always start with the `repos` command first!**
1983 | Before using any other commands, run `repos` to see what repositories are available
1984 | in your knowledge graph. This will help you understand what data you can explore.
1985 |
1986 | ## Available Commands:
1987 |
1988 | **Repository Commands:**
1989 | - `repos` - **START HERE!** List all repositories in the knowledge graph
1990 | - `explore <repo_name>` - Get detailed overview of a specific repository
1991 |
1992 | **Class Commands:**
1993 | - `classes` - List all classes across all repositories (limited to 20)
1994 | - `classes <repo_name>` - List classes in a specific repository
1995 | - `class <class_name>` - Get detailed information about a specific class including methods and attributes
1996 |
1997 | **Method Commands:**
1998 | - `method <method_name>` - Search for methods by name across all classes
1999 | - `method <method_name> <class_name>` - Search for a method within a specific class
2000 |
2001 | **Custom Query:**
2002 | - `query <cypher_query>` - Execute a custom Cypher query (results limited to 20 records)
2003 |
2004 | ## Knowledge Graph Schema:
2005 |
2006 | **Node Types:**
2007 | - Repository: `(r:Repository {name: string})`
2008 | - File: `(f:File {path: string, module_name: string})`
2009 | - Class: `(c:Class {name: string, full_name: string})`
2010 | - Method: `(m:Method {name: string, params_list: [string], params_detailed: [string], return_type: string, args: [string]})`
2011 | - Function: `(func:Function {name: string, params_list: [string], params_detailed: [string], return_type: string, args: [string]})`
2012 | - Attribute: `(a:Attribute {name: string, type: string})`
2013 |
2014 | **Relationships:**
2015 | - `(r:Repository)-[:CONTAINS]->(f:File)`
2016 | - `(f:File)-[:DEFINES]->(c:Class)`
2017 | - `(c:Class)-[:HAS_METHOD]->(m:Method)`
2018 | - `(c:Class)-[:HAS_ATTRIBUTE]->(a:Attribute)`
2019 | - `(f:File)-[:DEFINES]->(func:Function)`
2020 |
2021 | ## Example Workflow:
2022 | ```
2023 | 1. repos # See what repositories are available
2024 | 2. explore pydantic-ai # Explore a specific repository
2025 | 3. classes pydantic-ai # List classes in that repository
2026 | 4. class Agent # Explore the Agent class
2027 | 5. method run_stream # Search for run_stream method
2028 | 6. method __init__ Agent # Find Agent constructor
2029 | 7. query "MATCH (c:Class)-[:HAS_METHOD]->(m:Method) WHERE m.name = 'run' RETURN c.name, m.name LIMIT 5"
2030 | ```
2031 |
2032 | Args:
2033 | command: Command string to execute (see available commands above)
2034 |
2035 | Returns:
2036 | JSON string with query results, statistics, and metadata
2037 | """
2038 | try:
2039 | # Check if knowledge graph functionality is enabled
2040 | knowledge_graph_enabled = os.getenv("USE_KNOWLEDGE_GRAPH", "false") == "true"
2041 | if not knowledge_graph_enabled:
2042 | return json.dumps({
2043 | "success": False,
2044 | "error": "Knowledge graph functionality is disabled. Set USE_KNOWLEDGE_GRAPH=true in environment."
2045 | }, indent=2)
2046 |
2047 | # Get Neo4j driver from context
2048 | repo_extractor = ctx.request_context.lifespan_context.repo_extractor
2049 | if not repo_extractor or not repo_extractor.driver:
2050 | return json.dumps({
2051 | "success": False,
2052 | "error": "Neo4j connection not available. Check Neo4j configuration in environment variables."
2053 | }, indent=2)
2054 |
2055 | # Parse command
2056 | command = command.strip()
2057 | if not command:
2058 | return json.dumps({
2059 | "success": False,
2060 | "command": "",
2061 | "error": "Command cannot be empty. Available commands: repos, explore <repo>, classes [repo], class <name>, method <name> [class], query <cypher>"
2062 | }, indent=2)
2063 |
2064 | parts = command.split()
2065 | cmd = parts[0].lower()
2066 | args = parts[1:] if len(parts) > 1 else []
2067 |
2068 | async with repo_extractor.driver.session() as session:
2069 | # Route to appropriate handler
2070 | if cmd == "repos":
2071 | return await _handle_repos_command(session, command)
2072 | elif cmd == "explore":
2073 | if not args:
2074 | return json.dumps({
2075 | "success": False,
2076 | "command": command,
2077 | "error": "Repository name required. Usage: explore <repo_name>"
2078 | }, indent=2)
2079 | return await _handle_explore_command(session, command, args[0])
2080 | elif cmd == "classes":
2081 | repo_name = args[0] if args else None
2082 | return await _handle_classes_command(session, command, repo_name)
2083 | elif cmd == "class":
2084 | if not args:
2085 | return json.dumps({
2086 | "success": False,
2087 | "command": command,
2088 | "error": "Class name required. Usage: class <class_name>"
2089 | }, indent=2)
2090 | return await _handle_class_command(session, command, args[0])
2091 | elif cmd == "method":
2092 | if not args:
2093 | return json.dumps({
2094 | "success": False,
2095 | "command": command,
2096 | "error": "Method name required. Usage: method <method_name> [class_name]"
2097 | }, indent=2)
2098 | method_name = args[0]
2099 | class_name = args[1] if len(args) > 1 else None
2100 | return await _handle_method_command(session, command, method_name, class_name)
2101 | elif cmd == "query":
2102 | if not args:
2103 | return json.dumps({
2104 | "success": False,
2105 | "command": command,
2106 | "error": "Cypher query required. Usage: query <cypher_query>"
2107 | }, indent=2)
2108 | cypher_query = " ".join(args)
2109 | return await _handle_query_command(session, command, cypher_query)
2110 | else:
2111 | return json.dumps({
2112 | "success": False,
2113 | "command": command,
2114 | "error": f"Unknown command '{cmd}'. Available commands: repos, explore <repo>, classes [repo], class <name>, method <name> [class], query <cypher>"
2115 | }, indent=2)
2116 |
2117 | except Exception as e:
2118 | return json.dumps({
2119 | "success": False,
2120 | "command": command,
2121 | "error": f"Query execution failed: {str(e)}"
2122 | }, indent=2)
2123 |
2124 |
2125 | async def _handle_repos_command(session, command: str) -> str:
2126 | """Handle 'repos' command - list all repositories"""
2127 | query = "MATCH (r:Repository) RETURN r.name as name ORDER BY r.name"
2128 | result = await session.run(query)
2129 |
2130 | repos = []
2131 | async for record in result:
2132 | repos.append(record['name'])
2133 |
2134 | return json.dumps({
2135 | "success": True,
2136 | "command": command,
2137 | "data": {
2138 | "repositories": repos
2139 | },
2140 | "metadata": {
2141 | "total_results": len(repos),
2142 | "limited": False
2143 | }
2144 | }, indent=2)
2145 |
2146 |
2147 | async def _handle_explore_command(session, command: str, repo_name: str) -> str:
2148 | """Handle 'explore <repo>' command - get repository overview"""
2149 | # Check if repository exists
2150 | repo_check_query = "MATCH (r:Repository {name: $repo_name}) RETURN r.name as name"
2151 | result = await session.run(repo_check_query, repo_name=repo_name)
2152 | repo_record = await result.single()
2153 |
2154 | if not repo_record:
2155 | return json.dumps({
2156 | "success": False,
2157 | "command": command,
2158 | "error": f"Repository '{repo_name}' not found in knowledge graph"
2159 | }, indent=2)
2160 |
2161 | # Get file count
2162 | files_query = """
2163 | MATCH (r:Repository {name: $repo_name})-[:CONTAINS]->(f:File)
2164 | RETURN count(f) as file_count
2165 | """
2166 | result = await session.run(files_query, repo_name=repo_name)
2167 | file_count = (await result.single())['file_count']
2168 |
2169 | # Get class count
2170 | classes_query = """
2171 | MATCH (r:Repository {name: $repo_name})-[:CONTAINS]->(f:File)-[:DEFINES]->(c:Class)
2172 | RETURN count(DISTINCT c) as class_count
2173 | """
2174 | result = await session.run(classes_query, repo_name=repo_name)
2175 | class_count = (await result.single())['class_count']
2176 |
2177 | # Get function count
2178 | functions_query = """
2179 | MATCH (r:Repository {name: $repo_name})-[:CONTAINS]->(f:File)-[:DEFINES]->(func:Function)
2180 | RETURN count(DISTINCT func) as function_count
2181 | """
2182 | result = await session.run(functions_query, repo_name=repo_name)
2183 | function_count = (await result.single())['function_count']
2184 |
2185 | # Get method count
2186 | methods_query = """
2187 | MATCH (r:Repository {name: $repo_name})-[:CONTAINS]->(f:File)-[:DEFINES]->(c:Class)-[:HAS_METHOD]->(m:Method)
2188 | RETURN count(DISTINCT m) as method_count
2189 | """
2190 | result = await session.run(methods_query, repo_name=repo_name)
2191 | method_count = (await result.single())['method_count']
2192 |
2193 | return json.dumps({
2194 | "success": True,
2195 | "command": command,
2196 | "data": {
2197 | "repository": repo_name,
2198 | "statistics": {
2199 | "files": file_count,
2200 | "classes": class_count,
2201 | "functions": function_count,
2202 | "methods": method_count
2203 | }
2204 | },
2205 | "metadata": {
2206 | "total_results": 1,
2207 | "limited": False
2208 | }
2209 | }, indent=2)
2210 |
2211 |
2212 | async def _handle_classes_command(session, command: str, repo_name: str = None) -> str:
2213 | """Handle 'classes [repo]' command - list classes"""
2214 | limit = 20
2215 |
2216 | if repo_name:
2217 | query = """
2218 | MATCH (r:Repository {name: $repo_name})-[:CONTAINS]->(f:File)-[:DEFINES]->(c:Class)
2219 | RETURN c.name as name, c.full_name as full_name
2220 | ORDER BY c.name
2221 | LIMIT $limit
2222 | """
2223 | result = await session.run(query, repo_name=repo_name, limit=limit)
2224 | else:
2225 | query = """
2226 | MATCH (c:Class)
2227 | RETURN c.name as name, c.full_name as full_name
2228 | ORDER BY c.name
2229 | LIMIT $limit
2230 | """
2231 | result = await session.run(query, limit=limit)
2232 |
2233 | classes = []
2234 | async for record in result:
2235 | classes.append({
2236 | 'name': record['name'],
2237 | 'full_name': record['full_name']
2238 | })
2239 |
2240 | return json.dumps({
2241 | "success": True,
2242 | "command": command,
2243 | "data": {
2244 | "classes": classes,
2245 | "repository_filter": repo_name
2246 | },
2247 | "metadata": {
2248 | "total_results": len(classes),
2249 | "limited": len(classes) >= limit
2250 | }
2251 | }, indent=2)
2252 |
2253 |
2254 | async def _handle_class_command(session, command: str, class_name: str) -> str:
2255 | """Handle 'class <name>' command - explore specific class"""
2256 | # Find the class
2257 | class_query = """
2258 | MATCH (c:Class)
2259 | WHERE c.name = $class_name OR c.full_name = $class_name
2260 | RETURN c.name as name, c.full_name as full_name
2261 | LIMIT 1
2262 | """
2263 | result = await session.run(class_query, class_name=class_name)
2264 | class_record = await result.single()
2265 |
2266 | if not class_record:
2267 | return json.dumps({
2268 | "success": False,
2269 | "command": command,
2270 | "error": f"Class '{class_name}' not found in knowledge graph"
2271 | }, indent=2)
2272 |
2273 | actual_name = class_record['name']
2274 | full_name = class_record['full_name']
2275 |
2276 | # Get methods
2277 | methods_query = """
2278 | MATCH (c:Class)-[:HAS_METHOD]->(m:Method)
2279 | WHERE c.name = $class_name OR c.full_name = $class_name
2280 | RETURN m.name as name, m.params_list as params_list, m.params_detailed as params_detailed, m.return_type as return_type
2281 | ORDER BY m.name
2282 | """
2283 | result = await session.run(methods_query, class_name=class_name)
2284 |
2285 | methods = []
2286 | async for record in result:
2287 | # Use detailed params if available, fall back to simple params
2288 | params_to_use = record['params_detailed'] or record['params_list'] or []
2289 | methods.append({
2290 | 'name': record['name'],
2291 | 'parameters': params_to_use,
2292 | 'return_type': record['return_type'] or 'Any'
2293 | })
2294 |
2295 | # Get attributes
2296 | attributes_query = """
2297 | MATCH (c:Class)-[:HAS_ATTRIBUTE]->(a:Attribute)
2298 | WHERE c.name = $class_name OR c.full_name = $class_name
2299 | RETURN a.name as name, a.type as type
2300 | ORDER BY a.name
2301 | """
2302 | result = await session.run(attributes_query, class_name=class_name)
2303 |
2304 | attributes = []
2305 | async for record in result:
2306 | attributes.append({
2307 | 'name': record['name'],
2308 | 'type': record['type'] or 'Any'
2309 | })
2310 |
2311 | return json.dumps({
2312 | "success": True,
2313 | "command": command,
2314 | "data": {
2315 | "class": {
2316 | "name": actual_name,
2317 | "full_name": full_name,
2318 | "methods": methods,
2319 | "attributes": attributes
2320 | }
2321 | },
2322 | "metadata": {
2323 | "total_results": 1,
2324 | "methods_count": len(methods),
2325 | "attributes_count": len(attributes),
2326 | "limited": False
2327 | }
2328 | }, indent=2)
2329 |
2330 |
2331 | async def _handle_method_command(session, command: str, method_name: str, class_name: str = None) -> str:
2332 | """Handle 'method <name> [class]' command - search for methods"""
2333 | if class_name:
2334 | query = """
2335 | MATCH (c:Class)-[:HAS_METHOD]->(m:Method)
2336 | WHERE (c.name = $class_name OR c.full_name = $class_name)
2337 | AND m.name = $method_name
2338 | RETURN c.name as class_name, c.full_name as class_full_name,
2339 | m.name as method_name, m.params_list as params_list,
2340 | m.params_detailed as params_detailed, m.return_type as return_type, m.args as args
2341 | """
2342 | result = await session.run(query, class_name=class_name, method_name=method_name)
2343 | else:
2344 | query = """
2345 | MATCH (c:Class)-[:HAS_METHOD]->(m:Method)
2346 | WHERE m.name = $method_name
2347 | RETURN c.name as class_name, c.full_name as class_full_name,
2348 | m.name as method_name, m.params_list as params_list,
2349 | m.params_detailed as params_detailed, m.return_type as return_type, m.args as args
2350 | ORDER BY c.name
2351 | LIMIT 20
2352 | """
2353 | result = await session.run(query, method_name=method_name)
2354 |
2355 | methods = []
2356 | async for record in result:
2357 | # Use detailed params if available, fall back to simple params
2358 | params_to_use = record['params_detailed'] or record['params_list'] or []
2359 | methods.append({
2360 | 'class_name': record['class_name'],
2361 | 'class_full_name': record['class_full_name'],
2362 | 'method_name': record['method_name'],
2363 | 'parameters': params_to_use,
2364 | 'return_type': record['return_type'] or 'Any',
2365 | 'legacy_args': record['args'] or []
2366 | })
2367 |
2368 | if not methods:
2369 | return json.dumps({
2370 | "success": False,
2371 | "command": command,
2372 | "error": f"Method '{method_name}'" + (f" in class '{class_name}'" if class_name else "") + " not found"
2373 | }, indent=2)
2374 |
2375 | return json.dumps({
2376 | "success": True,
2377 | "command": command,
2378 | "data": {
2379 | "methods": methods,
2380 | "class_filter": class_name
2381 | },
2382 | "metadata": {
2383 | "total_results": len(methods),
2384 | "limited": len(methods) >= 20 and not class_name
2385 | }
2386 | }, indent=2)
2387 |
2388 |
2389 | async def _handle_query_command(session, command: str, cypher_query: str) -> str:
2390 | """Handle 'query <cypher>' command - execute custom Cypher query"""
2391 | try:
2392 |         # Execute the query; results are truncated to 20 records below to keep responses manageable
2393 | result = await session.run(cypher_query)
2394 |
2395 | records = []
2396 | count = 0
2397 | async for record in result:
2398 | records.append(dict(record))
2399 | count += 1
2400 | if count >= 20: # Limit results to prevent overwhelming responses
2401 | break
2402 |
2403 | return json.dumps({
2404 | "success": True,
2405 | "command": command,
2406 | "data": {
2407 | "query": cypher_query,
2408 | "results": records
2409 | },
2410 | "metadata": {
2411 | "total_results": len(records),
2412 | "limited": len(records) >= 20
2413 | }
2414 | }, indent=2)
2415 |
2416 | except Exception as e:
2417 | return json.dumps({
2418 | "success": False,
2419 | "command": command,
2420 | "error": f"Cypher query error: {str(e)}",
2421 | "data": {
2422 | "query": cypher_query
2423 | }
2424 | }, indent=2)
2425 |
2426 |
2427 | @mcp.tool()
2428 | async def parse_github_repository(ctx: Context, repo_url: str) -> str:
2429 | """
2430 | Parse a GitHub repository into the Neo4j knowledge graph.
2431 |
2432 | This tool clones a GitHub repository, analyzes its Python files, and stores
2433 | the code structure (classes, methods, functions, imports) in Neo4j for use
2434 | in hallucination detection. The tool:
2435 |
2436 | - Clones the repository to a temporary location
2437 | - Analyzes Python files to extract code structure
2438 | - Stores classes, methods, functions, and imports in Neo4j
2439 | - Provides detailed statistics about the parsing results
2440 | - Automatically handles module name detection for imports
2441 |
2442 | Args:
2443 | repo_url: GitHub repository URL (e.g., 'https://github.com/user/repo.git')
2444 |
2445 | Returns:
2446 | JSON string with parsing results, statistics, and repository information
2447 | """
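     |     # The repo name returned by validate_github_url doubles as the Repository node name in Neo4j,
     |     # which is how the statistics query below locates the freshly parsed graph.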
2448 | try:
2449 | # Check if knowledge graph functionality is enabled
2450 | knowledge_graph_enabled = os.getenv("USE_KNOWLEDGE_GRAPH", "false") == "true"
2451 | if not knowledge_graph_enabled:
2452 | return json.dumps({
2453 | "success": False,
2454 | "error": "Knowledge graph functionality is disabled. Set USE_KNOWLEDGE_GRAPH=true in environment."
2455 | }, indent=2)
2456 |
2457 | # Get the repository extractor from context
2458 | repo_extractor = ctx.request_context.lifespan_context.repo_extractor
2459 |
2460 | if not repo_extractor:
2461 | return json.dumps({
2462 | "success": False,
2463 | "error": "Repository extractor not available. Check Neo4j configuration in environment variables."
2464 | }, indent=2)
2465 |
2466 | # Validate repository URL
2467 | validation = validate_github_url(repo_url)
2468 | if not validation["valid"]:
2469 | return json.dumps({
2470 | "success": False,
2471 | "repo_url": repo_url,
2472 | "error": validation["error"]
2473 | }, indent=2)
2474 |
2475 | repo_name = validation["repo_name"]
2476 |
2477 | # Parse the repository (this includes cloning, analysis, and Neo4j storage)
2478 | print(f"Starting repository analysis for: {repo_name}")
2479 | await repo_extractor.analyze_repository(repo_url)
2480 | print(f"Repository analysis completed for: {repo_name}")
2481 |
2482 | # Query Neo4j for statistics about the parsed repository
2483 | async with repo_extractor.driver.session() as session:
2484 | # Get comprehensive repository statistics
2485 | stats_query = """
2486 | MATCH (r:Repository {name: $repo_name})
2487 | OPTIONAL MATCH (r)-[:CONTAINS]->(f:File)
2488 | OPTIONAL MATCH (f)-[:DEFINES]->(c:Class)
2489 | OPTIONAL MATCH (c)-[:HAS_METHOD]->(m:Method)
2490 | OPTIONAL MATCH (f)-[:DEFINES]->(func:Function)
2491 | OPTIONAL MATCH (c)-[:HAS_ATTRIBUTE]->(a:Attribute)
2492 | WITH r,
2493 | count(DISTINCT f) as files_count,
2494 | count(DISTINCT c) as classes_count,
2495 | count(DISTINCT m) as methods_count,
2496 | count(DISTINCT func) as functions_count,
2497 | count(DISTINCT a) as attributes_count
2498 |
2499 | // Get some sample module names
2500 | OPTIONAL MATCH (r)-[:CONTAINS]->(sample_f:File)
2501 | WITH r, files_count, classes_count, methods_count, functions_count, attributes_count,
2502 | collect(DISTINCT sample_f.module_name)[0..5] as sample_modules
2503 |
2504 | RETURN
2505 | r.name as repo_name,
2506 | files_count,
2507 | classes_count,
2508 | methods_count,
2509 | functions_count,
2510 | attributes_count,
2511 | sample_modules
2512 | """
2513 |
2514 | result = await session.run(stats_query, repo_name=repo_name)
2515 | record = await result.single()
2516 |
2517 | if record:
2518 | stats = {
2519 | "repository": record['repo_name'],
2520 | "files_processed": record['files_count'],
2521 | "classes_created": record['classes_count'],
2522 | "methods_created": record['methods_count'],
2523 | "functions_created": record['functions_count'],
2524 | "attributes_created": record['attributes_count'],
2525 | "sample_modules": record['sample_modules'] or []
2526 | }
2527 | else:
2528 | return json.dumps({
2529 | "success": False,
2530 | "repo_url": repo_url,
2531 | "error": f"Repository '{repo_name}' not found in database after parsing"
2532 | }, indent=2)
2533 |
2534 | return json.dumps({
2535 | "success": True,
2536 | "repo_url": repo_url,
2537 | "repo_name": repo_name,
2538 | "message": f"Successfully parsed repository '{repo_name}' into knowledge graph",
2539 | "statistics": stats,
2540 | "ready_for_validation": True,
2541 | "next_steps": [
2542 | "Repository is now available for hallucination detection",
2543 | f"Use check_ai_script_hallucinations to validate scripts against {repo_name}",
2544 | "The knowledge graph contains classes, methods, and functions from this repository"
2545 | ]
2546 | }, indent=2)
2547 |
2548 | except Exception as e:
2549 | return json.dumps({
2550 | "success": False,
2551 | "repo_url": repo_url,
2552 | "error": f"Repository parsing failed: {str(e)}"
2553 | }, indent=2)
2554 |
2555 | async def crawl_markdown_file(crawler: AsyncWebCrawler, url: str) -> List[Dict[str, Any]]:
2556 | """
2557 | Crawl a .txt or markdown file.
2558 |
2559 | Args:
2560 | crawler: AsyncWebCrawler instance
2561 | url: URL of the file
2562 |
2563 | Returns:
2564 | List of dictionaries with URL and markdown content
2565 | """
2566 | crawl_config = CrawlerRunConfig()
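     |     # Default run config: unlike the batch helpers below, this single-file fetch keeps crawl4ai's
     |     # default cache behavior instead of forcing CacheMode.BYPASS.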
2567 |
2568 | result = await crawler.arun(url=url, config=crawl_config)
2569 | if result.success and result.markdown:
2570 | return [{'url': url, 'markdown': result.markdown}]
2571 | else:
2572 | print(f"Failed to crawl {url}: {result.error_message}")
2573 | return []
2574 |
2575 | async def crawl_batch(crawler: AsyncWebCrawler, urls: List[str], max_concurrent: int = 10) -> List[Dict[str, Any]]:
2576 | """
2577 | Batch crawl multiple URLs in parallel.
2578 |
2579 | Args:
2580 | crawler: AsyncWebCrawler instance
2581 | urls: List of URLs to crawl
2582 | max_concurrent: Maximum number of concurrent browser sessions
2583 |
2584 | Returns:
2585 |         List of dictionaries with URL, markdown content, and extracted links
2586 | """
2587 | crawl_config = CrawlerRunConfig(cache_mode=CacheMode.BYPASS, stream=False)
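     |     # MemoryAdaptiveDispatcher limits concurrency: max_session_permit caps simultaneous browser sessions,
     |     # and the memory_threshold_percent / check_interval settings let it back off when system memory runs
     |     # high (per crawl4ai's dispatcher behavior).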
2588 | dispatcher = MemoryAdaptiveDispatcher(
2589 | memory_threshold_percent=70.0,
2590 | check_interval=1.0,
2591 | max_session_permit=max_concurrent
2592 | )
2593 |
2594 | results = await crawler.arun_many(urls=urls, config=crawl_config, dispatcher=dispatcher)
2595 | return [{'url': r.url, 'markdown': r.markdown, 'links': r.links} for r in results if r.success and r.markdown]
2596 |
2597 | async def crawl_recursive_internal_links(crawler: AsyncWebCrawler, start_urls: List[str], max_depth: int = 3, max_concurrent: int = 10) -> List[Dict[str, Any]]:
2598 | """
2599 | Recursively crawl internal links from start URLs up to a maximum depth.
2600 |
2601 | Args:
2602 | crawler: AsyncWebCrawler instance
2603 | start_urls: List of starting URLs
2604 | max_depth: Maximum recursion depth
2605 | max_concurrent: Maximum number of concurrent browser sessions
2606 |
2607 | Returns:
2608 | List of dictionaries with URL and markdown content
2609 | """
2610 | run_config = CrawlerRunConfig(cache_mode=CacheMode.BYPASS, stream=False)
2611 | dispatcher = MemoryAdaptiveDispatcher(
2612 | memory_threshold_percent=70.0,
2613 | check_interval=1.0,
2614 | max_session_permit=max_concurrent
2615 | )
2616 |
2617 | visited = set()
2618 |
2619 | def normalize_url(url):
2620 | return urldefrag(url)[0]
2621 |
2622 | current_urls = set([normalize_url(u) for u in start_urls])
2623 | results_all = []
2624 |
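     |     # Breadth-first by depth: each pass crawls every not-yet-visited URL at the current level, then the
     |     # internal links collected from successful pages form the next level, up to max_depth iterations.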
2625 | for depth in range(max_depth):
2626 | urls_to_crawl = [normalize_url(url) for url in current_urls if normalize_url(url) not in visited]
2627 | if not urls_to_crawl:
2628 | break
2629 |
2630 | results = await crawler.arun_many(urls=urls_to_crawl, config=run_config, dispatcher=dispatcher)
2631 | next_level_urls = set()
2632 |
2633 | for result in results:
2634 | norm_url = normalize_url(result.url)
2635 | visited.add(norm_url)
2636 |
2637 | if result.success and result.markdown:
2638 | results_all.append({'url': result.url, 'markdown': result.markdown})
2639 | for link in result.links.get("internal", []):
2640 | next_url = normalize_url(link["href"])
2641 | if next_url not in visited:
2642 | next_level_urls.add(next_url)
2643 |
2644 | current_urls = next_level_urls
2645 |
2646 | return results_all
2647 |
2648 | async def main():
2649 | transport = os.getenv("TRANSPORT", "sse")
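     |     # TRANSPORT=sse (the default) serves the MCP server over HTTP/SSE; any other value falls back to stdio.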
2650 | if transport == 'sse':
2651 | # Run the MCP server with sse transport
2652 | await mcp.run_sse_async()
2653 | else:
2654 | # Run the MCP server with stdio transport
2655 | await mcp.run_stdio_async()
2656 |
2657 | if __name__ == "__main__":
2658 | asyncio.run(main())
```