This is page 2 of 2. Use http://codebase.md/coleam00/mcp-crawl4ai-rag?page={x} to view the full context.

# Directory Structure

```
├── .dockerignore
├── .env.example
├── .gitattributes
├── .gitignore
├── crawled_pages.sql
├── Dockerfile
├── knowledge_graphs
│   ├── ai_hallucination_detector.py
│   ├── ai_script_analyzer.py
│   ├── hallucination_reporter.py
│   ├── knowledge_graph_validator.py
│   ├── parse_repo_into_neo4j.py
│   ├── query_knowledge_graph.py
│   └── test_script.py
├── LICENSE
├── pyproject.toml
├── README.md
├── src
│   ├── crawl4ai_mcp.py
│   └── utils.py
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/src/crawl4ai_mcp.py:
--------------------------------------------------------------------------------

```python
"""
MCP server for web crawling with Crawl4AI.

This server provides tools to crawl websites using Crawl4AI, automatically detecting
the appropriate crawl method based on URL type (sitemap, txt file, or regular webpage).

Also includes AI hallucination detection and repository parsing tools using Neo4j knowledge graphs.
"""
from mcp.server.fastmcp import FastMCP, Context
from sentence_transformers import CrossEncoder
from contextlib import asynccontextmanager
from collections.abc import AsyncIterator
from dataclasses import dataclass
from typing import List, Dict, Any, Optional
from urllib.parse import urlparse, urldefrag
from xml.etree import ElementTree
from dotenv import load_dotenv
from supabase import Client
from pathlib import Path
import requests
import asyncio
import json
import os
import re
import concurrent.futures
import sys

from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, CacheMode, MemoryAdaptiveDispatcher

# Add knowledge_graphs folder to path for importing knowledge graph modules
knowledge_graphs_path = Path(__file__).resolve().parent.parent / 'knowledge_graphs'
sys.path.append(str(knowledge_graphs_path))

from utils import (
    get_supabase_client,
    add_documents_to_supabase,
    search_documents,
    extract_code_blocks,
    generate_code_example_summary,
    add_code_examples_to_supabase,
    update_source_info,
    extract_source_summary,
    search_code_examples
)

# Import knowledge graph modules
from knowledge_graph_validator import KnowledgeGraphValidator
from parse_repo_into_neo4j import DirectNeo4jExtractor
from ai_script_analyzer import AIScriptAnalyzer
from hallucination_reporter import HallucinationReporter

# Load environment variables from the project root .env file
project_root = Path(__file__).resolve().parent.parent
dotenv_path = project_root / '.env'

# Force override of existing environment variables
load_dotenv(dotenv_path, override=True)

# Helper functions for Neo4j validation and error handling
def validate_neo4j_connection() -> bool:
    """Check if Neo4j environment variables are configured."""
    return all([
        os.getenv("NEO4J_URI"),
        os.getenv("NEO4J_USER"),
        os.getenv("NEO4J_PASSWORD")
    ])

def format_neo4j_error(error: Exception) -> str:
    """Format Neo4j connection errors for user-friendly messages."""
    error_str = str(error).lower()
    if "authentication" in error_str or "unauthorized" in error_str:
        return "Neo4j authentication failed. Check NEO4J_USER and NEO4J_PASSWORD."
    elif "connection" in error_str or "refused" in error_str or "timeout" in error_str:
        return "Cannot connect to Neo4j. Check NEO4J_URI and ensure Neo4j is running."
    elif "database" in error_str:
        return "Neo4j database error. Check if the database exists and is accessible."
else: return f"Neo4j error: {str(error)}" def validate_script_path(script_path: str) -> Dict[str, Any]: """Validate script path and return error info if invalid.""" if not script_path or not isinstance(script_path, str): return {"valid": False, "error": "Script path is required"} if not os.path.exists(script_path): return {"valid": False, "error": f"Script not found: {script_path}"} if not script_path.endswith('.py'): return {"valid": False, "error": "Only Python (.py) files are supported"} try: # Check if file is readable with open(script_path, 'r', encoding='utf-8') as f: f.read(1) # Read first character to test return {"valid": True} except Exception as e: return {"valid": False, "error": f"Cannot read script file: {str(e)}"} def validate_github_url(repo_url: str) -> Dict[str, Any]: """Validate GitHub repository URL.""" if not repo_url or not isinstance(repo_url, str): return {"valid": False, "error": "Repository URL is required"} repo_url = repo_url.strip() # Basic GitHub URL validation if not ("github.com" in repo_url.lower() or repo_url.endswith(".git")): return {"valid": False, "error": "Please provide a valid GitHub repository URL"} # Check URL format if not (repo_url.startswith("https://") or repo_url.startswith("git@")): return {"valid": False, "error": "Repository URL must start with https:// or git@"} return {"valid": True, "repo_name": repo_url.split('/')[-1].replace('.git', '')} # Create a dataclass for our application context @dataclass class Crawl4AIContext: """Context for the Crawl4AI MCP server.""" crawler: AsyncWebCrawler supabase_client: Client reranking_model: Optional[CrossEncoder] = None knowledge_validator: Optional[Any] = None # KnowledgeGraphValidator when available repo_extractor: Optional[Any] = None # DirectNeo4jExtractor when available @asynccontextmanager async def crawl4ai_lifespan(server: FastMCP) -> AsyncIterator[Crawl4AIContext]: """ Manages the Crawl4AI client lifecycle. 
Args: server: The FastMCP server instance Yields: Crawl4AIContext: The context containing the Crawl4AI crawler and Supabase client """ # Create browser configuration browser_config = BrowserConfig( headless=True, verbose=False ) # Initialize the crawler crawler = AsyncWebCrawler(config=browser_config) await crawler.__aenter__() # Initialize Supabase client supabase_client = get_supabase_client() # Initialize cross-encoder model for reranking if enabled reranking_model = None if os.getenv("USE_RERANKING", "false") == "true": try: reranking_model = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2") except Exception as e: print(f"Failed to load reranking model: {e}") reranking_model = None # Initialize Neo4j components if configured and enabled knowledge_validator = None repo_extractor = None # Check if knowledge graph functionality is enabled knowledge_graph_enabled = os.getenv("USE_KNOWLEDGE_GRAPH", "false") == "true" if knowledge_graph_enabled: neo4j_uri = os.getenv("NEO4J_URI") neo4j_user = os.getenv("NEO4J_USER") neo4j_password = os.getenv("NEO4J_PASSWORD") if neo4j_uri and neo4j_user and neo4j_password: try: print("Initializing knowledge graph components...") # Initialize knowledge graph validator knowledge_validator = KnowledgeGraphValidator(neo4j_uri, neo4j_user, neo4j_password) await knowledge_validator.initialize() print("✓ Knowledge graph validator initialized") # Initialize repository extractor repo_extractor = DirectNeo4jExtractor(neo4j_uri, neo4j_user, neo4j_password) await repo_extractor.initialize() print("✓ Repository extractor initialized") except Exception as e: print(f"Failed to initialize Neo4j components: {format_neo4j_error(e)}") knowledge_validator = None repo_extractor = None else: print("Neo4j credentials not configured - knowledge graph tools will be unavailable") else: print("Knowledge graph functionality disabled - set USE_KNOWLEDGE_GRAPH=true to enable") try: yield Crawl4AIContext( crawler=crawler, supabase_client=supabase_client, reranking_model=reranking_model, knowledge_validator=knowledge_validator, repo_extractor=repo_extractor ) finally: # Clean up all components await crawler.__aexit__(None, None, None) if knowledge_validator: try: await knowledge_validator.close() print("✓ Knowledge graph validator closed") except Exception as e: print(f"Error closing knowledge validator: {e}") if repo_extractor: try: await repo_extractor.close() print("✓ Repository extractor closed") except Exception as e: print(f"Error closing repository extractor: {e}") # Initialize FastMCP server mcp = FastMCP( "mcp-crawl4ai-rag", description="MCP server for RAG and web crawling with Crawl4AI", lifespan=crawl4ai_lifespan, host=os.getenv("HOST", "0.0.0.0"), port=os.getenv("PORT", "8051") ) def rerank_results(model: CrossEncoder, query: str, results: List[Dict[str, Any]], content_key: str = "content") -> List[Dict[str, Any]]: """ Rerank search results using a cross-encoder model. 
Args: model: The cross-encoder model to use for reranking query: The search query results: List of search results content_key: The key in each result dict that contains the text content Returns: Reranked list of results """ if not model or not results: return results try: # Extract content from results texts = [result.get(content_key, "") for result in results] # Create pairs of [query, document] for the cross-encoder pairs = [[query, text] for text in texts] # Get relevance scores from the cross-encoder scores = model.predict(pairs) # Add scores to results and sort by score (descending) for i, result in enumerate(results): result["rerank_score"] = float(scores[i]) # Sort by rerank score reranked = sorted(results, key=lambda x: x.get("rerank_score", 0), reverse=True) return reranked except Exception as e: print(f"Error during reranking: {e}") return results def is_sitemap(url: str) -> bool: """ Check if a URL is a sitemap. Args: url: URL to check Returns: True if the URL is a sitemap, False otherwise """ return url.endswith('sitemap.xml') or 'sitemap' in urlparse(url).path def is_txt(url: str) -> bool: """ Check if a URL is a text file. Args: url: URL to check Returns: True if the URL is a text file, False otherwise """ return url.endswith('.txt') def parse_sitemap(sitemap_url: str) -> List[str]: """ Parse a sitemap and extract URLs. Args: sitemap_url: URL of the sitemap Returns: List of URLs found in the sitemap """ resp = requests.get(sitemap_url) urls = [] if resp.status_code == 200: try: tree = ElementTree.fromstring(resp.content) urls = [loc.text for loc in tree.findall('.//{*}loc')] except Exception as e: print(f"Error parsing sitemap XML: {e}") return urls def smart_chunk_markdown(text: str, chunk_size: int = 5000) -> List[str]: """Split text into chunks, respecting code blocks and paragraphs.""" chunks = [] start = 0 text_length = len(text) while start < text_length: # Calculate end position end = start + chunk_size # If we're at the end of the text, just take what's left if end >= text_length: chunks.append(text[start:].strip()) break # Try to find a code block boundary first (```) chunk = text[start:end] code_block = chunk.rfind('```') if code_block != -1 and code_block > chunk_size * 0.3: end = start + code_block # If no code block, try to break at a paragraph elif '\n\n' in chunk: # Find the last paragraph break last_break = chunk.rfind('\n\n') if last_break > chunk_size * 0.3: # Only break if we're past 30% of chunk_size end = start + last_break # If no paragraph break, try to break at a sentence elif '. ' in chunk: # Find the last sentence break last_period = chunk.rfind('. ') if last_period > chunk_size * 0.3: # Only break if we're past 30% of chunk_size end = start + last_period + 1 # Extract chunk and clean it up chunk = text[start:end].strip() if chunk: chunks.append(chunk) # Move start position for next chunk start = end return chunks def extract_section_info(chunk: str) -> Dict[str, Any]: """ Extracts headers and stats from a chunk. Args: chunk: Markdown chunk Returns: Dictionary with headers and stats """ headers = re.findall(r'^(#+)\s+(.+)$', chunk, re.MULTILINE) header_str = '; '.join([f'{h[0]} {h[1]}' for h in headers]) if headers else '' return { "headers": header_str, "char_count": len(chunk), "word_count": len(chunk.split()) } def process_code_example(args): """ Process a single code example to generate its summary. This function is designed to be used with concurrent.futures. 
Args: args: Tuple containing (code, context_before, context_after) Returns: The generated summary """ code, context_before, context_after = args return generate_code_example_summary(code, context_before, context_after) @mcp.tool() async def crawl_single_page(ctx: Context, url: str) -> str: """ Crawl a single web page and store its content in Supabase. This tool is ideal for quickly retrieving content from a specific URL without following links. The content is stored in Supabase for later retrieval and querying. Args: ctx: The MCP server provided context url: URL of the web page to crawl Returns: Summary of the crawling operation and storage in Supabase """ try: # Get the crawler from the context crawler = ctx.request_context.lifespan_context.crawler supabase_client = ctx.request_context.lifespan_context.supabase_client # Configure the crawl run_config = CrawlerRunConfig(cache_mode=CacheMode.BYPASS, stream=False) # Crawl the page result = await crawler.arun(url=url, config=run_config) if result.success and result.markdown: # Extract source_id parsed_url = urlparse(url) source_id = parsed_url.netloc or parsed_url.path # Chunk the content chunks = smart_chunk_markdown(result.markdown) # Prepare data for Supabase urls = [] chunk_numbers = [] contents = [] metadatas = [] total_word_count = 0 for i, chunk in enumerate(chunks): urls.append(url) chunk_numbers.append(i) contents.append(chunk) # Extract metadata meta = extract_section_info(chunk) meta["chunk_index"] = i meta["url"] = url meta["source"] = source_id meta["crawl_time"] = str(asyncio.current_task().get_coro().__name__) metadatas.append(meta) # Accumulate word count total_word_count += meta.get("word_count", 0) # Create url_to_full_document mapping url_to_full_document = {url: result.markdown} # Update source information FIRST (before inserting documents) source_summary = extract_source_summary(source_id, result.markdown[:5000]) # Use first 5000 chars for summary update_source_info(supabase_client, source_id, source_summary, total_word_count) # Add documentation chunks to Supabase (AFTER source exists) add_documents_to_supabase(supabase_client, urls, chunk_numbers, contents, metadatas, url_to_full_document) # Extract and process code examples only if enabled extract_code_examples = os.getenv("USE_AGENTIC_RAG", "false") == "true" if extract_code_examples: code_blocks = extract_code_blocks(result.markdown) if code_blocks: code_urls = [] code_chunk_numbers = [] code_examples = [] code_summaries = [] code_metadatas = [] # Process code examples in parallel with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor: # Prepare arguments for parallel processing summary_args = [(block['code'], block['context_before'], block['context_after']) for block in code_blocks] # Generate summaries in parallel summaries = list(executor.map(process_code_example, summary_args)) # Prepare code example data for i, (block, summary) in enumerate(zip(code_blocks, summaries)): code_urls.append(url) code_chunk_numbers.append(i) code_examples.append(block['code']) code_summaries.append(summary) # Create metadata for code example code_meta = { "chunk_index": i, "url": url, "source": source_id, "char_count": len(block['code']), "word_count": len(block['code'].split()) } code_metadatas.append(code_meta) # Add code examples to Supabase add_code_examples_to_supabase( supabase_client, code_urls, code_chunk_numbers, code_examples, code_summaries, code_metadatas ) return json.dumps({ "success": True, "url": url, "chunks_stored": len(chunks), 
"code_examples_stored": len(code_blocks) if code_blocks else 0, "content_length": len(result.markdown), "total_word_count": total_word_count, "source_id": source_id, "links_count": { "internal": len(result.links.get("internal", [])), "external": len(result.links.get("external", [])) } }, indent=2) else: return json.dumps({ "success": False, "url": url, "error": result.error_message }, indent=2) except Exception as e: return json.dumps({ "success": False, "url": url, "error": str(e) }, indent=2) @mcp.tool() async def smart_crawl_url(ctx: Context, url: str, max_depth: int = 3, max_concurrent: int = 10, chunk_size: int = 5000) -> str: """ Intelligently crawl a URL based on its type and store content in Supabase. This tool automatically detects the URL type and applies the appropriate crawling method: - For sitemaps: Extracts and crawls all URLs in parallel - For text files (llms.txt): Directly retrieves the content - For regular webpages: Recursively crawls internal links up to the specified depth All crawled content is chunked and stored in Supabase for later retrieval and querying. Args: ctx: The MCP server provided context url: URL to crawl (can be a regular webpage, sitemap.xml, or .txt file) max_depth: Maximum recursion depth for regular URLs (default: 3) max_concurrent: Maximum number of concurrent browser sessions (default: 10) chunk_size: Maximum size of each content chunk in characters (default: 1000) Returns: JSON string with crawl summary and storage information """ try: # Get the crawler from the context crawler = ctx.request_context.lifespan_context.crawler supabase_client = ctx.request_context.lifespan_context.supabase_client # Determine the crawl strategy crawl_results = [] crawl_type = None if is_txt(url): # For text files, use simple crawl crawl_results = await crawl_markdown_file(crawler, url) crawl_type = "text_file" elif is_sitemap(url): # For sitemaps, extract URLs and crawl in parallel sitemap_urls = parse_sitemap(url) if not sitemap_urls: return json.dumps({ "success": False, "url": url, "error": "No URLs found in sitemap" }, indent=2) crawl_results = await crawl_batch(crawler, sitemap_urls, max_concurrent=max_concurrent) crawl_type = "sitemap" else: # For regular URLs, use recursive crawl crawl_results = await crawl_recursive_internal_links(crawler, [url], max_depth=max_depth, max_concurrent=max_concurrent) crawl_type = "webpage" if not crawl_results: return json.dumps({ "success": False, "url": url, "error": "No content found" }, indent=2) # Process results and store in Supabase urls = [] chunk_numbers = [] contents = [] metadatas = [] chunk_count = 0 # Track sources and their content source_content_map = {} source_word_counts = {} # Process documentation chunks for doc in crawl_results: source_url = doc['url'] md = doc['markdown'] chunks = smart_chunk_markdown(md, chunk_size=chunk_size) # Extract source_id parsed_url = urlparse(source_url) source_id = parsed_url.netloc or parsed_url.path # Store content for source summary generation if source_id not in source_content_map: source_content_map[source_id] = md[:5000] # Store first 5000 chars source_word_counts[source_id] = 0 for i, chunk in enumerate(chunks): urls.append(source_url) chunk_numbers.append(i) contents.append(chunk) # Extract metadata meta = extract_section_info(chunk) meta["chunk_index"] = i meta["url"] = source_url meta["source"] = source_id meta["crawl_type"] = crawl_type meta["crawl_time"] = str(asyncio.current_task().get_coro().__name__) metadatas.append(meta) # Accumulate word count 
source_word_counts[source_id] += meta.get("word_count", 0) chunk_count += 1 # Create url_to_full_document mapping url_to_full_document = {} for doc in crawl_results: url_to_full_document[doc['url']] = doc['markdown'] # Update source information for each unique source FIRST (before inserting documents) with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor: source_summary_args = [(source_id, content) for source_id, content in source_content_map.items()] source_summaries = list(executor.map(lambda args: extract_source_summary(args[0], args[1]), source_summary_args)) for (source_id, _), summary in zip(source_summary_args, source_summaries): word_count = source_word_counts.get(source_id, 0) update_source_info(supabase_client, source_id, summary, word_count) # Add documentation chunks to Supabase (AFTER sources exist) batch_size = 20 add_documents_to_supabase(supabase_client, urls, chunk_numbers, contents, metadatas, url_to_full_document, batch_size=batch_size) # Extract and process code examples from all documents only if enabled extract_code_examples_enabled = os.getenv("USE_AGENTIC_RAG", "false") == "true" if extract_code_examples_enabled: all_code_blocks = [] code_urls = [] code_chunk_numbers = [] code_examples = [] code_summaries = [] code_metadatas = [] # Extract code blocks from all documents for doc in crawl_results: source_url = doc['url'] md = doc['markdown'] code_blocks = extract_code_blocks(md) if code_blocks: # Process code examples in parallel with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor: # Prepare arguments for parallel processing summary_args = [(block['code'], block['context_before'], block['context_after']) for block in code_blocks] # Generate summaries in parallel summaries = list(executor.map(process_code_example, summary_args)) # Prepare code example data parsed_url = urlparse(source_url) source_id = parsed_url.netloc or parsed_url.path for i, (block, summary) in enumerate(zip(code_blocks, summaries)): code_urls.append(source_url) code_chunk_numbers.append(len(code_examples)) # Use global code example index code_examples.append(block['code']) code_summaries.append(summary) # Create metadata for code example code_meta = { "chunk_index": len(code_examples) - 1, "url": source_url, "source": source_id, "char_count": len(block['code']), "word_count": len(block['code'].split()) } code_metadatas.append(code_meta) # Add all code examples to Supabase if code_examples: add_code_examples_to_supabase( supabase_client, code_urls, code_chunk_numbers, code_examples, code_summaries, code_metadatas, batch_size=batch_size ) return json.dumps({ "success": True, "url": url, "crawl_type": crawl_type, "pages_crawled": len(crawl_results), "chunks_stored": chunk_count, "code_examples_stored": len(code_examples), "sources_updated": len(source_content_map), "urls_crawled": [doc['url'] for doc in crawl_results][:5] + (["..."] if len(crawl_results) > 5 else []) }, indent=2) except Exception as e: return json.dumps({ "success": False, "url": url, "error": str(e) }, indent=2) @mcp.tool() async def get_available_sources(ctx: Context) -> str: """ Get all available sources from the sources table. This tool returns a list of all unique sources (domains) that have been crawled and stored in the database, along with their summaries and statistics. This is useful for discovering what content is available for querying. Always use this tool before calling the RAG query or code example query tool with a specific source filter! 
Args: ctx: The MCP server provided context Returns: JSON string with the list of available sources and their details """ try: # Get the Supabase client from the context supabase_client = ctx.request_context.lifespan_context.supabase_client # Query the sources table directly result = supabase_client.from_('sources')\ .select('*')\ .order('source_id')\ .execute() # Format the sources with their details sources = [] if result.data: for source in result.data: sources.append({ "source_id": source.get("source_id"), "summary": source.get("summary"), "total_words": source.get("total_words"), "created_at": source.get("created_at"), "updated_at": source.get("updated_at") }) return json.dumps({ "success": True, "sources": sources, "count": len(sources) }, indent=2) except Exception as e: return json.dumps({ "success": False, "error": str(e) }, indent=2) @mcp.tool() async def perform_rag_query(ctx: Context, query: str, source: str = None, match_count: int = 5) -> str: """ Perform a RAG (Retrieval Augmented Generation) query on the stored content. This tool searches the vector database for content relevant to the query and returns the matching documents. Optionally filter by source domain. Get the source by using the get_available_sources tool before calling this search! Args: ctx: The MCP server provided context query: The search query source: Optional source domain to filter results (e.g., 'example.com') match_count: Maximum number of results to return (default: 5) Returns: JSON string with the search results """ try: # Get the Supabase client from the context supabase_client = ctx.request_context.lifespan_context.supabase_client # Check if hybrid search is enabled use_hybrid_search = os.getenv("USE_HYBRID_SEARCH", "false") == "true" # Prepare filter if source is provided and not empty filter_metadata = None if source and source.strip(): filter_metadata = {"source": source} if use_hybrid_search: # Hybrid search: combine vector and keyword search # 1. Get vector search results (get more to account for filtering) vector_results = search_documents( client=supabase_client, query=query, match_count=match_count * 2, # Get double to have room for filtering filter_metadata=filter_metadata ) # 2. Get keyword search results using ILIKE keyword_query = supabase_client.from_('crawled_pages')\ .select('id, url, chunk_number, content, metadata, source_id')\ .ilike('content', f'%{query}%') # Apply source filter if provided if source and source.strip(): keyword_query = keyword_query.eq('source_id', source) # Execute keyword search keyword_response = keyword_query.limit(match_count * 2).execute() keyword_results = keyword_response.data if keyword_response.data else [] # 3. 
Combine results with preference for items appearing in both seen_ids = set() combined_results = [] # First, add items that appear in both searches (these are the best matches) vector_ids = {r.get('id') for r in vector_results if r.get('id')} for kr in keyword_results: if kr['id'] in vector_ids and kr['id'] not in seen_ids: # Find the vector result to get similarity score for vr in vector_results: if vr.get('id') == kr['id']: # Boost similarity score for items in both results vr['similarity'] = min(1.0, vr.get('similarity', 0) * 1.2) combined_results.append(vr) seen_ids.add(kr['id']) break # Then add remaining vector results (semantic matches without exact keyword) for vr in vector_results: if vr.get('id') and vr['id'] not in seen_ids and len(combined_results) < match_count: combined_results.append(vr) seen_ids.add(vr['id']) # Finally, add pure keyword matches if we still need more results for kr in keyword_results: if kr['id'] not in seen_ids and len(combined_results) < match_count: # Convert keyword result to match vector result format combined_results.append({ 'id': kr['id'], 'url': kr['url'], 'chunk_number': kr['chunk_number'], 'content': kr['content'], 'metadata': kr['metadata'], 'source_id': kr['source_id'], 'similarity': 0.5 # Default similarity for keyword-only matches }) seen_ids.add(kr['id']) # Use combined results results = combined_results[:match_count] else: # Standard vector search only results = search_documents( client=supabase_client, query=query, match_count=match_count, filter_metadata=filter_metadata ) # Apply reranking if enabled use_reranking = os.getenv("USE_RERANKING", "false") == "true" if use_reranking and ctx.request_context.lifespan_context.reranking_model: results = rerank_results(ctx.request_context.lifespan_context.reranking_model, query, results, content_key="content") # Format the results formatted_results = [] for result in results: formatted_result = { "url": result.get("url"), "content": result.get("content"), "metadata": result.get("metadata"), "similarity": result.get("similarity") } # Include rerank score if available if "rerank_score" in result: formatted_result["rerank_score"] = result["rerank_score"] formatted_results.append(formatted_result) return json.dumps({ "success": True, "query": query, "source_filter": source, "search_mode": "hybrid" if use_hybrid_search else "vector", "reranking_applied": use_reranking and ctx.request_context.lifespan_context.reranking_model is not None, "results": formatted_results, "count": len(formatted_results) }, indent=2) except Exception as e: return json.dumps({ "success": False, "query": query, "error": str(e) }, indent=2) @mcp.tool() async def search_code_examples(ctx: Context, query: str, source_id: str = None, match_count: int = 5) -> str: """ Search for code examples relevant to the query. This tool searches the vector database for code examples relevant to the query and returns the matching examples with their summaries. Optionally filter by source_id. Get the source_id by using the get_available_sources tool before calling this search! Use the get_available_sources tool first to see what sources are available for filtering. 
Args: ctx: The MCP server provided context query: The search query source_id: Optional source ID to filter results (e.g., 'example.com') match_count: Maximum number of results to return (default: 5) Returns: JSON string with the search results """ # Check if code example extraction is enabled extract_code_examples_enabled = os.getenv("USE_AGENTIC_RAG", "false") == "true" if not extract_code_examples_enabled: return json.dumps({ "success": False, "error": "Code example extraction is disabled. Perform a normal RAG search." }, indent=2) try: # Get the Supabase client from the context supabase_client = ctx.request_context.lifespan_context.supabase_client # Check if hybrid search is enabled use_hybrid_search = os.getenv("USE_HYBRID_SEARCH", "false") == "true" # Prepare filter if source is provided and not empty filter_metadata = None if source_id and source_id.strip(): filter_metadata = {"source": source_id} if use_hybrid_search: # Hybrid search: combine vector and keyword search # Import the search function from utils from utils import search_code_examples as search_code_examples_impl # 1. Get vector search results (get more to account for filtering) vector_results = search_code_examples_impl( client=supabase_client, query=query, match_count=match_count * 2, # Get double to have room for filtering filter_metadata=filter_metadata ) # 2. Get keyword search results using ILIKE on both content and summary keyword_query = supabase_client.from_('code_examples')\ .select('id, url, chunk_number, content, summary, metadata, source_id')\ .or_(f'content.ilike.%{query}%,summary.ilike.%{query}%') # Apply source filter if provided if source_id and source_id.strip(): keyword_query = keyword_query.eq('source_id', source_id) # Execute keyword search keyword_response = keyword_query.limit(match_count * 2).execute() keyword_results = keyword_response.data if keyword_response.data else [] # 3. 
Combine results with preference for items appearing in both seen_ids = set() combined_results = [] # First, add items that appear in both searches (these are the best matches) vector_ids = {r.get('id') for r in vector_results if r.get('id')} for kr in keyword_results: if kr['id'] in vector_ids and kr['id'] not in seen_ids: # Find the vector result to get similarity score for vr in vector_results: if vr.get('id') == kr['id']: # Boost similarity score for items in both results vr['similarity'] = min(1.0, vr.get('similarity', 0) * 1.2) combined_results.append(vr) seen_ids.add(kr['id']) break # Then add remaining vector results (semantic matches without exact keyword) for vr in vector_results: if vr.get('id') and vr['id'] not in seen_ids and len(combined_results) < match_count: combined_results.append(vr) seen_ids.add(vr['id']) # Finally, add pure keyword matches if we still need more results for kr in keyword_results: if kr['id'] not in seen_ids and len(combined_results) < match_count: # Convert keyword result to match vector result format combined_results.append({ 'id': kr['id'], 'url': kr['url'], 'chunk_number': kr['chunk_number'], 'content': kr['content'], 'summary': kr['summary'], 'metadata': kr['metadata'], 'source_id': kr['source_id'], 'similarity': 0.5 # Default similarity for keyword-only matches }) seen_ids.add(kr['id']) # Use combined results results = combined_results[:match_count] else: # Standard vector search only from utils import search_code_examples as search_code_examples_impl results = search_code_examples_impl( client=supabase_client, query=query, match_count=match_count, filter_metadata=filter_metadata ) # Apply reranking if enabled use_reranking = os.getenv("USE_RERANKING", "false") == "true" if use_reranking and ctx.request_context.lifespan_context.reranking_model: results = rerank_results(ctx.request_context.lifespan_context.reranking_model, query, results, content_key="content") # Format the results formatted_results = [] for result in results: formatted_result = { "url": result.get("url"), "code": result.get("content"), "summary": result.get("summary"), "metadata": result.get("metadata"), "source_id": result.get("source_id"), "similarity": result.get("similarity") } # Include rerank score if available if "rerank_score" in result: formatted_result["rerank_score"] = result["rerank_score"] formatted_results.append(formatted_result) return json.dumps({ "success": True, "query": query, "source_filter": source_id, "search_mode": "hybrid" if use_hybrid_search else "vector", "reranking_applied": use_reranking and ctx.request_context.lifespan_context.reranking_model is not None, "results": formatted_results, "count": len(formatted_results) }, indent=2) except Exception as e: return json.dumps({ "success": False, "query": query, "error": str(e) }, indent=2) @mcp.tool() async def check_ai_script_hallucinations(ctx: Context, script_path: str) -> str: """ Check an AI-generated Python script for hallucinations using the knowledge graph. This tool analyzes a Python script for potential AI hallucinations by validating imports, method calls, class instantiations, and function calls against a Neo4j knowledge graph containing real repository data. 
The tool performs comprehensive analysis including: - Import validation against known repositories - Method call validation on classes from the knowledge graph - Class instantiation parameter validation - Function call parameter validation - Attribute access validation Args: ctx: The MCP server provided context script_path: Absolute path to the Python script to analyze Returns: JSON string with hallucination detection results, confidence scores, and recommendations """ try: # Check if knowledge graph functionality is enabled knowledge_graph_enabled = os.getenv("USE_KNOWLEDGE_GRAPH", "false") == "true" if not knowledge_graph_enabled: return json.dumps({ "success": False, "error": "Knowledge graph functionality is disabled. Set USE_KNOWLEDGE_GRAPH=true in environment." }, indent=2) # Get the knowledge validator from context knowledge_validator = ctx.request_context.lifespan_context.knowledge_validator if not knowledge_validator: return json.dumps({ "success": False, "error": "Knowledge graph validator not available. Check Neo4j configuration in environment variables." }, indent=2) # Validate script path validation = validate_script_path(script_path) if not validation["valid"]: return json.dumps({ "success": False, "script_path": script_path, "error": validation["error"] }, indent=2) # Step 1: Analyze script structure using AST analyzer = AIScriptAnalyzer() analysis_result = analyzer.analyze_script(script_path) if analysis_result.errors: print(f"Analysis warnings for {script_path}: {analysis_result.errors}") # Step 2: Validate against knowledge graph validation_result = await knowledge_validator.validate_script(analysis_result) # Step 3: Generate comprehensive report reporter = HallucinationReporter() report = reporter.generate_comprehensive_report(validation_result) # Format response with comprehensive information return json.dumps({ "success": True, "script_path": script_path, "overall_confidence": validation_result.overall_confidence, "validation_summary": { "total_validations": report["validation_summary"]["total_validations"], "valid_count": report["validation_summary"]["valid_count"], "invalid_count": report["validation_summary"]["invalid_count"], "uncertain_count": report["validation_summary"]["uncertain_count"], "not_found_count": report["validation_summary"]["not_found_count"], "hallucination_rate": report["validation_summary"]["hallucination_rate"] }, "hallucinations_detected": report["hallucinations_detected"], "recommendations": report["recommendations"], "analysis_metadata": { "total_imports": report["analysis_metadata"]["total_imports"], "total_classes": report["analysis_metadata"]["total_classes"], "total_methods": report["analysis_metadata"]["total_methods"], "total_attributes": report["analysis_metadata"]["total_attributes"], "total_functions": report["analysis_metadata"]["total_functions"] }, "libraries_analyzed": report.get("libraries_analyzed", []) }, indent=2) except Exception as e: return json.dumps({ "success": False, "script_path": script_path, "error": f"Analysis failed: {str(e)}" }, indent=2) @mcp.tool() async def query_knowledge_graph(ctx: Context, command: str) -> str: """ Query and explore the Neo4j knowledge graph containing repository data. This tool provides comprehensive access to the knowledge graph for exploring repositories, classes, methods, functions, and their relationships. Perfect for understanding what data is available for hallucination detection and debugging validation results. 
**⚠️ IMPORTANT: Always start with the `repos` command first!** Before using any other commands, run `repos` to see what repositories are available in your knowledge graph. This will help you understand what data you can explore. ## Available Commands: **Repository Commands:** - `repos` - **START HERE!** List all repositories in the knowledge graph - `explore <repo_name>` - Get detailed overview of a specific repository **Class Commands:** - `classes` - List all classes across all repositories (limited to 20) - `classes <repo_name>` - List classes in a specific repository - `class <class_name>` - Get detailed information about a specific class including methods and attributes **Method Commands:** - `method <method_name>` - Search for methods by name across all classes - `method <method_name> <class_name>` - Search for a method within a specific class **Custom Query:** - `query <cypher_query>` - Execute a custom Cypher query (results limited to 20 records) ## Knowledge Graph Schema: **Node Types:** - Repository: `(r:Repository {name: string})` - File: `(f:File {path: string, module_name: string})` - Class: `(c:Class {name: string, full_name: string})` - Method: `(m:Method {name: string, params_list: [string], params_detailed: [string], return_type: string, args: [string]})` - Function: `(func:Function {name: string, params_list: [string], params_detailed: [string], return_type: string, args: [string]})` - Attribute: `(a:Attribute {name: string, type: string})` **Relationships:** - `(r:Repository)-[:CONTAINS]->(f:File)` - `(f:File)-[:DEFINES]->(c:Class)` - `(c:Class)-[:HAS_METHOD]->(m:Method)` - `(c:Class)-[:HAS_ATTRIBUTE]->(a:Attribute)` - `(f:File)-[:DEFINES]->(func:Function)` ## Example Workflow: ``` 1. repos # See what repositories are available 2. explore pydantic-ai # Explore a specific repository 3. classes pydantic-ai # List classes in that repository 4. class Agent # Explore the Agent class 5. method run_stream # Search for run_stream method 6. method __init__ Agent # Find Agent constructor 7. query "MATCH (c:Class)-[:HAS_METHOD]->(m:Method) WHERE m.name = 'run' RETURN c.name, m.name LIMIT 5" ``` Args: ctx: The MCP server provided context command: Command string to execute (see available commands above) Returns: JSON string with query results, statistics, and metadata """ try: # Check if knowledge graph functionality is enabled knowledge_graph_enabled = os.getenv("USE_KNOWLEDGE_GRAPH", "false") == "true" if not knowledge_graph_enabled: return json.dumps({ "success": False, "error": "Knowledge graph functionality is disabled. Set USE_KNOWLEDGE_GRAPH=true in environment." }, indent=2) # Get Neo4j driver from context repo_extractor = ctx.request_context.lifespan_context.repo_extractor if not repo_extractor or not repo_extractor.driver: return json.dumps({ "success": False, "error": "Neo4j connection not available. Check Neo4j configuration in environment variables." }, indent=2) # Parse command command = command.strip() if not command: return json.dumps({ "success": False, "command": "", "error": "Command cannot be empty. 
Available commands: repos, explore <repo>, classes [repo], class <name>, method <name> [class], query <cypher>" }, indent=2) parts = command.split() cmd = parts[0].lower() args = parts[1:] if len(parts) > 1 else [] async with repo_extractor.driver.session() as session: # Route to appropriate handler if cmd == "repos": return await _handle_repos_command(session, command) elif cmd == "explore": if not args: return json.dumps({ "success": False, "command": command, "error": "Repository name required. Usage: explore <repo_name>" }, indent=2) return await _handle_explore_command(session, command, args[0]) elif cmd == "classes": repo_name = args[0] if args else None return await _handle_classes_command(session, command, repo_name) elif cmd == "class": if not args: return json.dumps({ "success": False, "command": command, "error": "Class name required. Usage: class <class_name>" }, indent=2) return await _handle_class_command(session, command, args[0]) elif cmd == "method": if not args: return json.dumps({ "success": False, "command": command, "error": "Method name required. Usage: method <method_name> [class_name]" }, indent=2) method_name = args[0] class_name = args[1] if len(args) > 1 else None return await _handle_method_command(session, command, method_name, class_name) elif cmd == "query": if not args: return json.dumps({ "success": False, "command": command, "error": "Cypher query required. Usage: query <cypher_query>" }, indent=2) cypher_query = " ".join(args) return await _handle_query_command(session, command, cypher_query) else: return json.dumps({ "success": False, "command": command, "error": f"Unknown command '{cmd}'. Available commands: repos, explore <repo>, classes [repo], class <name>, method <name> [class], query <cypher>" }, indent=2) except Exception as e: return json.dumps({ "success": False, "command": command, "error": f"Query execution failed: {str(e)}" }, indent=2) async def _handle_repos_command(session, command: str) -> str: """Handle 'repos' command - list all repositories""" query = "MATCH (r:Repository) RETURN r.name as name ORDER BY r.name" result = await session.run(query) repos = [] async for record in result: repos.append(record['name']) return json.dumps({ "success": True, "command": command, "data": { "repositories": repos }, "metadata": { "total_results": len(repos), "limited": False } }, indent=2) async def _handle_explore_command(session, command: str, repo_name: str) -> str: """Handle 'explore <repo>' command - get repository overview""" # Check if repository exists repo_check_query = "MATCH (r:Repository {name: $repo_name}) RETURN r.name as name" result = await session.run(repo_check_query, repo_name=repo_name) repo_record = await result.single() if not repo_record: return json.dumps({ "success": False, "command": command, "error": f"Repository '{repo_name}' not found in knowledge graph" }, indent=2) # Get file count files_query = """ MATCH (r:Repository {name: $repo_name})-[:CONTAINS]->(f:File) RETURN count(f) as file_count """ result = await session.run(files_query, repo_name=repo_name) file_count = (await result.single())['file_count'] # Get class count classes_query = """ MATCH (r:Repository {name: $repo_name})-[:CONTAINS]->(f:File)-[:DEFINES]->(c:Class) RETURN count(DISTINCT c) as class_count """ result = await session.run(classes_query, repo_name=repo_name) class_count = (await result.single())['class_count'] # Get function count functions_query = """ MATCH (r:Repository {name: $repo_name})-[:CONTAINS]->(f:File)-[:DEFINES]->(func:Function) RETURN 
count(DISTINCT func) as function_count """ result = await session.run(functions_query, repo_name=repo_name) function_count = (await result.single())['function_count'] # Get method count methods_query = """ MATCH (r:Repository {name: $repo_name})-[:CONTAINS]->(f:File)-[:DEFINES]->(c:Class)-[:HAS_METHOD]->(m:Method) RETURN count(DISTINCT m) as method_count """ result = await session.run(methods_query, repo_name=repo_name) method_count = (await result.single())['method_count'] return json.dumps({ "success": True, "command": command, "data": { "repository": repo_name, "statistics": { "files": file_count, "classes": class_count, "functions": function_count, "methods": method_count } }, "metadata": { "total_results": 1, "limited": False } }, indent=2) async def _handle_classes_command(session, command: str, repo_name: str = None) -> str: """Handle 'classes [repo]' command - list classes""" limit = 20 if repo_name: query = """ MATCH (r:Repository {name: $repo_name})-[:CONTAINS]->(f:File)-[:DEFINES]->(c:Class) RETURN c.name as name, c.full_name as full_name ORDER BY c.name LIMIT $limit """ result = await session.run(query, repo_name=repo_name, limit=limit) else: query = """ MATCH (c:Class) RETURN c.name as name, c.full_name as full_name ORDER BY c.name LIMIT $limit """ result = await session.run(query, limit=limit) classes = [] async for record in result: classes.append({ 'name': record['name'], 'full_name': record['full_name'] }) return json.dumps({ "success": True, "command": command, "data": { "classes": classes, "repository_filter": repo_name }, "metadata": { "total_results": len(classes), "limited": len(classes) >= limit } }, indent=2) async def _handle_class_command(session, command: str, class_name: str) -> str: """Handle 'class <name>' command - explore specific class""" # Find the class class_query = """ MATCH (c:Class) WHERE c.name = $class_name OR c.full_name = $class_name RETURN c.name as name, c.full_name as full_name LIMIT 1 """ result = await session.run(class_query, class_name=class_name) class_record = await result.single() if not class_record: return json.dumps({ "success": False, "command": command, "error": f"Class '{class_name}' not found in knowledge graph" }, indent=2) actual_name = class_record['name'] full_name = class_record['full_name'] # Get methods methods_query = """ MATCH (c:Class)-[:HAS_METHOD]->(m:Method) WHERE c.name = $class_name OR c.full_name = $class_name RETURN m.name as name, m.params_list as params_list, m.params_detailed as params_detailed, m.return_type as return_type ORDER BY m.name """ result = await session.run(methods_query, class_name=class_name) methods = [] async for record in result: # Use detailed params if available, fall back to simple params params_to_use = record['params_detailed'] or record['params_list'] or [] methods.append({ 'name': record['name'], 'parameters': params_to_use, 'return_type': record['return_type'] or 'Any' }) # Get attributes attributes_query = """ MATCH (c:Class)-[:HAS_ATTRIBUTE]->(a:Attribute) WHERE c.name = $class_name OR c.full_name = $class_name RETURN a.name as name, a.type as type ORDER BY a.name """ result = await session.run(attributes_query, class_name=class_name) attributes = [] async for record in result: attributes.append({ 'name': record['name'], 'type': record['type'] or 'Any' }) return json.dumps({ "success": True, "command": command, "data": { "class": { "name": actual_name, "full_name": full_name, "methods": methods, "attributes": attributes } }, "metadata": { "total_results": 1, "methods_count": 
len(methods), "attributes_count": len(attributes), "limited": False } }, indent=2) async def _handle_method_command(session, command: str, method_name: str, class_name: str = None) -> str: """Handle 'method <name> [class]' command - search for methods""" if class_name: query = """ MATCH (c:Class)-[:HAS_METHOD]->(m:Method) WHERE (c.name = $class_name OR c.full_name = $class_name) AND m.name = $method_name RETURN c.name as class_name, c.full_name as class_full_name, m.name as method_name, m.params_list as params_list, m.params_detailed as params_detailed, m.return_type as return_type, m.args as args """ result = await session.run(query, class_name=class_name, method_name=method_name) else: query = """ MATCH (c:Class)-[:HAS_METHOD]->(m:Method) WHERE m.name = $method_name RETURN c.name as class_name, c.full_name as class_full_name, m.name as method_name, m.params_list as params_list, m.params_detailed as params_detailed, m.return_type as return_type, m.args as args ORDER BY c.name LIMIT 20 """ result = await session.run(query, method_name=method_name) methods = [] async for record in result: # Use detailed params if available, fall back to simple params params_to_use = record['params_detailed'] or record['params_list'] or [] methods.append({ 'class_name': record['class_name'], 'class_full_name': record['class_full_name'], 'method_name': record['method_name'], 'parameters': params_to_use, 'return_type': record['return_type'] or 'Any', 'legacy_args': record['args'] or [] }) if not methods: return json.dumps({ "success": False, "command": command, "error": f"Method '{method_name}'" + (f" in class '{class_name}'" if class_name else "") + " not found" }, indent=2) return json.dumps({ "success": True, "command": command, "data": { "methods": methods, "class_filter": class_name }, "metadata": { "total_results": len(methods), "limited": len(methods) >= 20 and not class_name } }, indent=2) async def _handle_query_command(session, command: str, cypher_query: str) -> str: """Handle 'query <cypher>' command - execute custom Cypher query""" try: # Execute the query with a limit to prevent overwhelming responses result = await session.run(cypher_query) records = [] count = 0 async for record in result: records.append(dict(record)) count += 1 if count >= 20: # Limit results to prevent overwhelming responses break return json.dumps({ "success": True, "command": command, "data": { "query": cypher_query, "results": records }, "metadata": { "total_results": len(records), "limited": len(records) >= 20 } }, indent=2) except Exception as e: return json.dumps({ "success": False, "command": command, "error": f"Cypher query error: {str(e)}", "data": { "query": cypher_query } }, indent=2) @mcp.tool() async def parse_github_repository(ctx: Context, repo_url: str) -> str: """ Parse a GitHub repository into the Neo4j knowledge graph. This tool clones a GitHub repository, analyzes its Python files, and stores the code structure (classes, methods, functions, imports) in Neo4j for use in hallucination detection. 
The tool: - Clones the repository to a temporary location - Analyzes Python files to extract code structure - Stores classes, methods, functions, and imports in Neo4j - Provides detailed statistics about the parsing results - Automatically handles module name detection for imports Args: ctx: The MCP server provided context repo_url: GitHub repository URL (e.g., 'https://github.com/user/repo.git') Returns: JSON string with parsing results, statistics, and repository information """ try: # Check if knowledge graph functionality is enabled knowledge_graph_enabled = os.getenv("USE_KNOWLEDGE_GRAPH", "false") == "true" if not knowledge_graph_enabled: return json.dumps({ "success": False, "error": "Knowledge graph functionality is disabled. Set USE_KNOWLEDGE_GRAPH=true in environment." }, indent=2) # Get the repository extractor from context repo_extractor = ctx.request_context.lifespan_context.repo_extractor if not repo_extractor: return json.dumps({ "success": False, "error": "Repository extractor not available. Check Neo4j configuration in environment variables." }, indent=2) # Validate repository URL validation = validate_github_url(repo_url) if not validation["valid"]: return json.dumps({ "success": False, "repo_url": repo_url, "error": validation["error"] }, indent=2) repo_name = validation["repo_name"] # Parse the repository (this includes cloning, analysis, and Neo4j storage) print(f"Starting repository analysis for: {repo_name}") await repo_extractor.analyze_repository(repo_url) print(f"Repository analysis completed for: {repo_name}") # Query Neo4j for statistics about the parsed repository async with repo_extractor.driver.session() as session: # Get comprehensive repository statistics stats_query = """ MATCH (r:Repository {name: $repo_name}) OPTIONAL MATCH (r)-[:CONTAINS]->(f:File) OPTIONAL MATCH (f)-[:DEFINES]->(c:Class) OPTIONAL MATCH (c)-[:HAS_METHOD]->(m:Method) OPTIONAL MATCH (f)-[:DEFINES]->(func:Function) OPTIONAL MATCH (c)-[:HAS_ATTRIBUTE]->(a:Attribute) WITH r, count(DISTINCT f) as files_count, count(DISTINCT c) as classes_count, count(DISTINCT m) as methods_count, count(DISTINCT func) as functions_count, count(DISTINCT a) as attributes_count // Get some sample module names OPTIONAL MATCH (r)-[:CONTAINS]->(sample_f:File) WITH r, files_count, classes_count, methods_count, functions_count, attributes_count, collect(DISTINCT sample_f.module_name)[0..5] as sample_modules RETURN r.name as repo_name, files_count, classes_count, methods_count, functions_count, attributes_count, sample_modules """ result = await session.run(stats_query, repo_name=repo_name) record = await result.single() if record: stats = { "repository": record['repo_name'], "files_processed": record['files_count'], "classes_created": record['classes_count'], "methods_created": record['methods_count'], "functions_created": record['functions_count'], "attributes_created": record['attributes_count'], "sample_modules": record['sample_modules'] or [] } else: return json.dumps({ "success": False, "repo_url": repo_url, "error": f"Repository '{repo_name}' not found in database after parsing" }, indent=2) return json.dumps({ "success": True, "repo_url": repo_url, "repo_name": repo_name, "message": f"Successfully parsed repository '{repo_name}' into knowledge graph", "statistics": stats, "ready_for_validation": True, "next_steps": [ "Repository is now available for hallucination detection", f"Use check_ai_script_hallucinations to validate scripts against {repo_name}", "The knowledge graph contains classes, methods, and 
functions from this repository" ] }, indent=2) except Exception as e: return json.dumps({ "success": False, "repo_url": repo_url, "error": f"Repository parsing failed: {str(e)}" }, indent=2) async def crawl_markdown_file(crawler: AsyncWebCrawler, url: str) -> List[Dict[str, Any]]: """ Crawl a .txt or markdown file. Args: crawler: AsyncWebCrawler instance url: URL of the file Returns: List of dictionaries with URL and markdown content """ crawl_config = CrawlerRunConfig() result = await crawler.arun(url=url, config=crawl_config) if result.success and result.markdown: return [{'url': url, 'markdown': result.markdown}] else: print(f"Failed to crawl {url}: {result.error_message}") return [] async def crawl_batch(crawler: AsyncWebCrawler, urls: List[str], max_concurrent: int = 10) -> List[Dict[str, Any]]: """ Batch crawl multiple URLs in parallel. Args: crawler: AsyncWebCrawler instance urls: List of URLs to crawl max_concurrent: Maximum number of concurrent browser sessions Returns: List of dictionaries with URL and markdown content """ crawl_config = CrawlerRunConfig(cache_mode=CacheMode.BYPASS, stream=False) dispatcher = MemoryAdaptiveDispatcher( memory_threshold_percent=70.0, check_interval=1.0, max_session_permit=max_concurrent ) results = await crawler.arun_many(urls=urls, config=crawl_config, dispatcher=dispatcher) return [{'url': r.url, 'markdown': r.markdown} for r in results if r.success and r.markdown] async def crawl_recursive_internal_links(crawler: AsyncWebCrawler, start_urls: List[str], max_depth: int = 3, max_concurrent: int = 10) -> List[Dict[str, Any]]: """ Recursively crawl internal links from start URLs up to a maximum depth. Args: crawler: AsyncWebCrawler instance start_urls: List of starting URLs max_depth: Maximum recursion depth max_concurrent: Maximum number of concurrent browser sessions Returns: List of dictionaries with URL and markdown content """ run_config = CrawlerRunConfig(cache_mode=CacheMode.BYPASS, stream=False) dispatcher = MemoryAdaptiveDispatcher( memory_threshold_percent=70.0, check_interval=1.0, max_session_permit=max_concurrent ) visited = set() def normalize_url(url): return urldefrag(url)[0] current_urls = set([normalize_url(u) for u in start_urls]) results_all = [] for depth in range(max_depth): urls_to_crawl = [normalize_url(url) for url in current_urls if normalize_url(url) not in visited] if not urls_to_crawl: break results = await crawler.arun_many(urls=urls_to_crawl, config=run_config, dispatcher=dispatcher) next_level_urls = set() for result in results: norm_url = normalize_url(result.url) visited.add(norm_url) if result.success and result.markdown: results_all.append({'url': result.url, 'markdown': result.markdown}) for link in result.links.get("internal", []): next_url = normalize_url(link["href"]) if next_url not in visited: next_level_urls.add(next_url) current_urls = next_level_urls return results_all async def main(): transport = os.getenv("TRANSPORT", "sse") if transport == 'sse': # Run the MCP server with sse transport await mcp.run_sse_async() else: # Run the MCP server with stdio transport await mcp.run_stdio_async() if __name__ == "__main__": asyncio.run(main()) ```
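The server above gates every optional capability behind environment variables read with `os.getenv` (`USE_RERANKING`, `USE_HYBRID_SEARCH`, `USE_AGENTIC_RAG`, `USE_KNOWLEDGE_GRAPH`, the `NEO4J_*` credentials, plus `HOST`, `PORT`, and `TRANSPORT`). The snippet below is a small standalone sketch, not part of the repository, that reports which of those features would be active for the current environment using the same string comparisons the server performs.

```python
# Standalone sketch: report which optional features of crawl4ai_mcp.py would be
# enabled for the current environment. The flag names mirror the os.getenv() calls
# in the server; this script itself is illustrative and not part of the repository.
import os

from dotenv import load_dotenv

load_dotenv()  # like the server, read a .env file if one is present

FLAGS = ["USE_RERANKING", "USE_HYBRID_SEARCH", "USE_AGENTIC_RAG", "USE_KNOWLEDGE_GRAPH"]

def report_features() -> None:
    for flag in FLAGS:
        enabled = os.getenv(flag, "false") == "true"  # same comparison the server uses
        print(f"{flag}: {'enabled' if enabled else 'disabled'}")
    if os.getenv("USE_KNOWLEDGE_GRAPH", "false") == "true":
        missing = [v for v in ("NEO4J_URI", "NEO4J_USER", "NEO4J_PASSWORD") if not os.getenv(v)]
        if missing:
            print(f"Knowledge graph enabled but missing: {', '.join(missing)}")
    print(f"Transport: {os.getenv('TRANSPORT', 'sse')} on {os.getenv('HOST', '0.0.0.0')}:{os.getenv('PORT', '8051')}")

if __name__ == "__main__":
    report_features()
```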
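All of the crawl helpers (`crawl_single_page`, `crawl_markdown_file`, `crawl_batch`, `crawl_recursive_internal_links`) follow the same Crawl4AI pattern: open an `AsyncWebCrawler` with a headless `BrowserConfig`, run URLs with `CacheMode.BYPASS`, and read `result.markdown` on success. A minimal standalone sketch of that pattern, assuming `crawl4ai` and its browser dependencies are installed:

```python
# Minimal sketch of the crawl pattern used throughout crawl4ai_mcp.py:
# open an AsyncWebCrawler, run a single URL with caching bypassed, read .markdown.
# Uses only the crawl4ai APIs imported by the server; assumes crawl4ai is installed
# and its browser setup has been completed.
import asyncio

from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, CacheMode

async def fetch_markdown(url: str) -> str:
    browser_config = BrowserConfig(headless=True, verbose=False)
    async with AsyncWebCrawler(config=browser_config) as crawler:
        result = await crawler.arun(url=url, config=CrawlerRunConfig(cache_mode=CacheMode.BYPASS))
        return result.markdown if result.success else ""

if __name__ == "__main__":
    print(asyncio.run(fetch_markdown("https://example.com"))[:500])
```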
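`smart_chunk_markdown` prefers to break at code fences, then paragraph breaks, then sentence ends, but only once past 30% of the chunk size, and `extract_section_info` records the headers and character/word counts stored as chunk metadata. A usage sketch, assuming the project's dependencies are installed and `src` is on `PYTHONPATH` so the module's own `from utils import ...` resolves:

```python
# Usage sketch for the chunking helpers, run e.g. as:  PYTHONPATH=src python chunk_demo.py
# Importing the module pulls in its heavy dependencies (mcp, crawl4ai, supabase, ...),
# so this assumes the full environment from pyproject.toml is installed.
from crawl4ai_mcp import smart_chunk_markdown, extract_section_info

markdown = "# Title\n\nIntro paragraph.\n\n## Usage\n\n" + ("Some detail. " * 200)

chunks = smart_chunk_markdown(markdown, chunk_size=500)
for i, chunk in enumerate(chunks):
    meta = extract_section_info(chunk)
    print(i, meta["headers"], meta["char_count"], meta["word_count"])
```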
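When `USE_HYBRID_SEARCH=true`, both `perform_rag_query` and `search_code_examples` merge the two result sets in a fixed priority order rather than interleaving by score: items found by both vector and keyword search come first (with a boosted similarity), then remaining vector matches, then keyword-only matches with a default similarity of 0.5. The sketch below restates that merge as a standalone function with toy data; it is a simplified illustration, not the server's code path.

```python
# Simplified, standalone illustration of the hybrid-search merge order used by
# perform_rag_query and search_code_examples.
from typing import Any, Dict, List

def merge_hybrid_results(vector_results: List[Dict[str, Any]],
                         keyword_results: List[Dict[str, Any]],
                         match_count: int) -> List[Dict[str, Any]]:
    seen_ids = set()
    combined: List[Dict[str, Any]] = []
    vector_by_id = {r["id"]: r for r in vector_results if r.get("id")}

    # 1. Items present in both result sets, with a 1.2x similarity boost (capped at 1.0)
    for kr in keyword_results:
        vr = vector_by_id.get(kr["id"])
        if vr and kr["id"] not in seen_ids:
            combined.append(dict(vr, similarity=min(1.0, vr.get("similarity", 0) * 1.2)))
            seen_ids.add(kr["id"])

    # 2. Remaining vector-only matches
    for vr in vector_results:
        if vr.get("id") and vr["id"] not in seen_ids and len(combined) < match_count:
            combined.append(vr)
            seen_ids.add(vr["id"])

    # 3. Keyword-only matches, given a neutral default similarity
    for kr in keyword_results:
        if kr["id"] not in seen_ids and len(combined) < match_count:
            combined.append(dict(kr, similarity=0.5))
            seen_ids.add(kr["id"])

    return combined[:match_count]

# Toy data: ids 1 and 2 come from vector search, ids 2 and 3 from keyword search.
vector = [{"id": 1, "similarity": 0.9}, {"id": 2, "similarity": 0.7}]
keyword = [{"id": 2}, {"id": 3}]
print(merge_hybrid_results(vector, keyword, match_count=3))
# -> id 2 first (boosted to 0.84), then id 1 (0.9), then id 3 (0.5)
```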
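Reranking, when enabled, scores `[query, content]` pairs with the same `cross-encoder/ms-marco-MiniLM-L-6-v2` model loaded in the lifespan and sorts results by the resulting `rerank_score`. A minimal sketch of that step, assuming `sentence-transformers` is installed (the model is downloaded on first use):

```python
# Minimal sketch of the reranking step: score (query, passage) pairs with the same
# cross-encoder model the server loads when USE_RERANKING=true, then sort descending.
from sentence_transformers import CrossEncoder

model = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2")

query = "how do I parse a sitemap"
results = [
    {"content": "parse_sitemap() fetches sitemap.xml and extracts <loc> URLs."},
    {"content": "The reranking model is optional and controlled by USE_RERANKING."},
]

scores = model.predict([[query, r["content"]] for r in results])
for r, s in zip(results, scores):
    r["rerank_score"] = float(s)

for r in sorted(results, key=lambda x: x["rerank_score"], reverse=True):
    print(round(r["rerank_score"], 3), r["content"])
```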