#
tokens: 44118/50000 2/207 files (page 21/35)
lines: off (toggle) GitHub
raw markdown copy
This is page 21 of 35. Use http://codebase.md/dicklesworthstone/llm_gateway_mcp_server?page={x} to view the full context.

# Directory Structure

```
├── .cursorignore
├── .env.example
├── .envrc
├── .gitignore
├── additional_features.md
├── check_api_keys.py
├── completion_support.py
├── comprehensive_test.py
├── docker-compose.yml
├── Dockerfile
├── empirically_measured_model_speeds.json
├── error_handling.py
├── example_structured_tool.py
├── examples
│   ├── __init__.py
│   ├── advanced_agent_flows_using_unified_memory_system_demo.py
│   ├── advanced_extraction_demo.py
│   ├── advanced_unified_memory_system_demo.py
│   ├── advanced_vector_search_demo.py
│   ├── analytics_reporting_demo.py
│   ├── audio_transcription_demo.py
│   ├── basic_completion_demo.py
│   ├── cache_demo.py
│   ├── claude_integration_demo.py
│   ├── compare_synthesize_demo.py
│   ├── cost_optimization.py
│   ├── data
│   │   ├── sample_event.txt
│   │   ├── Steve_Jobs_Introducing_The_iPhone_compressed.md
│   │   └── Steve_Jobs_Introducing_The_iPhone_compressed.mp3
│   ├── docstring_refiner_demo.py
│   ├── document_conversion_and_processing_demo.py
│   ├── entity_relation_graph_demo.py
│   ├── filesystem_operations_demo.py
│   ├── grok_integration_demo.py
│   ├── local_text_tools_demo.py
│   ├── marqo_fused_search_demo.py
│   ├── measure_model_speeds.py
│   ├── meta_api_demo.py
│   ├── multi_provider_demo.py
│   ├── ollama_integration_demo.py
│   ├── prompt_templates_demo.py
│   ├── python_sandbox_demo.py
│   ├── rag_example.py
│   ├── research_workflow_demo.py
│   ├── sample
│   │   ├── article.txt
│   │   ├── backprop_paper.pdf
│   │   ├── buffett.pdf
│   │   ├── contract_link.txt
│   │   ├── legal_contract.txt
│   │   ├── medical_case.txt
│   │   ├── northwind.db
│   │   ├── research_paper.txt
│   │   ├── sample_data.json
│   │   └── text_classification_samples
│   │       ├── email_classification.txt
│   │       ├── news_samples.txt
│   │       ├── product_reviews.txt
│   │       └── support_tickets.txt
│   ├── sample_docs
│   │   └── downloaded
│   │       └── attention_is_all_you_need.pdf
│   ├── sentiment_analysis_demo.py
│   ├── simple_completion_demo.py
│   ├── single_shot_synthesis_demo.py
│   ├── smart_browser_demo.py
│   ├── sql_database_demo.py
│   ├── sse_client_demo.py
│   ├── test_code_extraction.py
│   ├── test_content_detection.py
│   ├── test_ollama.py
│   ├── text_classification_demo.py
│   ├── text_redline_demo.py
│   ├── tool_composition_examples.py
│   ├── tournament_code_demo.py
│   ├── tournament_text_demo.py
│   ├── unified_memory_system_demo.py
│   ├── vector_search_demo.py
│   ├── web_automation_instruction_packs.py
│   └── workflow_delegation_demo.py
├── LICENSE
├── list_models.py
├── marqo_index_config.json.example
├── mcp_protocol_schema_2025-03-25_version.json
├── mcp_python_lib_docs.md
├── mcp_tool_context_estimator.py
├── model_preferences.py
├── pyproject.toml
├── quick_test.py
├── README.md
├── resource_annotations.py
├── run_all_demo_scripts_and_check_for_errors.py
├── storage
│   └── smart_browser_internal
│       ├── locator_cache.db
│       ├── readability.js
│       └── storage_state.enc
├── test_client.py
├── test_connection.py
├── TEST_README.md
├── test_sse_client.py
├── test_stdio_client.py
├── tests
│   ├── __init__.py
│   ├── conftest.py
│   ├── integration
│   │   ├── __init__.py
│   │   └── test_server.py
│   ├── manual
│   │   ├── test_extraction_advanced.py
│   │   └── test_extraction.py
│   └── unit
│       ├── __init__.py
│       ├── test_cache.py
│       ├── test_providers.py
│       └── test_tools.py
├── TODO.md
├── tool_annotations.py
├── tools_list.json
├── ultimate_mcp_banner.webp
├── ultimate_mcp_logo.webp
├── ultimate_mcp_server
│   ├── __init__.py
│   ├── __main__.py
│   ├── cli
│   │   ├── __init__.py
│   │   ├── __main__.py
│   │   ├── commands.py
│   │   ├── helpers.py
│   │   └── typer_cli.py
│   ├── clients
│   │   ├── __init__.py
│   │   ├── completion_client.py
│   │   └── rag_client.py
│   ├── config
│   │   └── examples
│   │       └── filesystem_config.yaml
│   ├── config.py
│   ├── constants.py
│   ├── core
│   │   ├── __init__.py
│   │   ├── evaluation
│   │   │   ├── base.py
│   │   │   └── evaluators.py
│   │   ├── providers
│   │   │   ├── __init__.py
│   │   │   ├── anthropic.py
│   │   │   ├── base.py
│   │   │   ├── deepseek.py
│   │   │   ├── gemini.py
│   │   │   ├── grok.py
│   │   │   ├── ollama.py
│   │   │   ├── openai.py
│   │   │   └── openrouter.py
│   │   ├── server.py
│   │   ├── state_store.py
│   │   ├── tournaments
│   │   │   ├── manager.py
│   │   │   ├── tasks.py
│   │   │   └── utils.py
│   │   └── ums_api
│   │       ├── __init__.py
│   │       ├── ums_database.py
│   │       ├── ums_endpoints.py
│   │       ├── ums_models.py
│   │       └── ums_services.py
│   ├── exceptions.py
│   ├── graceful_shutdown.py
│   ├── services
│   │   ├── __init__.py
│   │   ├── analytics
│   │   │   ├── __init__.py
│   │   │   ├── metrics.py
│   │   │   └── reporting.py
│   │   ├── cache
│   │   │   ├── __init__.py
│   │   │   ├── cache_service.py
│   │   │   ├── persistence.py
│   │   │   ├── strategies.py
│   │   │   └── utils.py
│   │   ├── cache.py
│   │   ├── document.py
│   │   ├── knowledge_base
│   │   │   ├── __init__.py
│   │   │   ├── feedback.py
│   │   │   ├── manager.py
│   │   │   ├── rag_engine.py
│   │   │   ├── retriever.py
│   │   │   └── utils.py
│   │   ├── prompts
│   │   │   ├── __init__.py
│   │   │   ├── repository.py
│   │   │   └── templates.py
│   │   ├── prompts.py
│   │   └── vector
│   │       ├── __init__.py
│   │       ├── embeddings.py
│   │       └── vector_service.py
│   ├── tool_token_counter.py
│   ├── tools
│   │   ├── __init__.py
│   │   ├── audio_transcription.py
│   │   ├── base.py
│   │   ├── completion.py
│   │   ├── docstring_refiner.py
│   │   ├── document_conversion_and_processing.py
│   │   ├── enhanced-ums-lookbook.html
│   │   ├── entity_relation_graph.py
│   │   ├── excel_spreadsheet_automation.py
│   │   ├── extraction.py
│   │   ├── filesystem.py
│   │   ├── html_to_markdown.py
│   │   ├── local_text_tools.py
│   │   ├── marqo_fused_search.py
│   │   ├── meta_api_tool.py
│   │   ├── ocr_tools.py
│   │   ├── optimization.py
│   │   ├── provider.py
│   │   ├── pyodide_boot_template.html
│   │   ├── python_sandbox.py
│   │   ├── rag.py
│   │   ├── redline-compiled.css
│   │   ├── sentiment_analysis.py
│   │   ├── single_shot_synthesis.py
│   │   ├── smart_browser.py
│   │   ├── sql_databases.py
│   │   ├── text_classification.py
│   │   ├── text_redline_tools.py
│   │   ├── tournament.py
│   │   ├── ums_explorer.html
│   │   └── unified_memory_system.py
│   ├── utils
│   │   ├── __init__.py
│   │   ├── async_utils.py
│   │   ├── display.py
│   │   ├── logging
│   │   │   ├── __init__.py
│   │   │   ├── console.py
│   │   │   ├── emojis.py
│   │   │   ├── formatter.py
│   │   │   ├── logger.py
│   │   │   ├── panels.py
│   │   │   ├── progress.py
│   │   │   └── themes.py
│   │   ├── parse_yaml.py
│   │   ├── parsing.py
│   │   ├── security.py
│   │   └── text.py
│   └── working_memory_api.py
├── unified_memory_system_technical_analysis.md
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/ultimate_mcp_server/utils/display.py:
--------------------------------------------------------------------------------

```python
"""Display utilities for the Ultimate MCP Server.

This module contains reusable display functions for formatting and
presenting results from Ultimate MCP Server operations using Rich.
"""

import json
import os
import time

# --- Filesystem Tool Display Helper ---
from pathlib import Path
from typing import Any, Dict, List, Optional, Union

from rich import box
from rich.console import (
    Capture,  # For capturing table output
    Console,
)
from rich.markup import escape
from rich.panel import Panel
from rich.pretty import pretty_repr
from rich.rule import Rule
from rich.syntax import Syntax
from rich.table import Table
from rich.tree import Tree

# Import the console for consistent styling
from ultimate_mcp_server.utils.logging.console import console

from ..exceptions import ToolError, ToolInputError

# Restore direct tool import
from ..tools.filesystem import list_directory
from .logging.logger import get_logger  # <-- ADD specific import

try:
    # Import only the exception needed by safe_tool_call
    from ..tools.filesystem import ProtectionTriggeredError
except ImportError:
    # Filesystem tools may be absent in a minimal install; provide a stand-in
    # so code that raises/catches ProtectionTriggeredError keeps working.
    class ProtectionTriggeredError(Exception):
        """Fallback used when the filesystem tool package is unavailable."""

        def __init__(self, message, context=None):
            super().__init__(message)
            # Normalize a missing context to an empty dict for safe access.
            self.context = {} if context is None else context

def extract_and_parse_content(result: Any) -> Dict[str, Any]:
    """
    Normalize a tool result into parsed data, decoding JSON when present.

    Accepts TextContent-like objects (anything with a ``.text`` attribute),
    lists of such objects (only the first is inspected), plain strings, and
    dictionaries.

    Args:
        result: Result object that might be TextContent, list, dict, etc.

    Returns:
        Dictionary with parsed data or error information (a non-dict JSON
        payload, e.g. an array, is returned as-is).
    """
    # MCP responses often arrive as a list; only the first entry is used.
    if isinstance(result, list):
        if not result:
            return {"error": "Empty result list"}
        result = result[0]

    # Dictionaries need no further processing.
    if isinstance(result, dict):
        return result

    # Pull the textual payload out of whatever remains.
    if hasattr(result, 'text'):
        raw = result.text
    elif isinstance(result, str):
        raw = result
    else:
        raw = str(result)

    if not raw:
        return {"error": "Empty content"}

    # Prefer JSON; fall back to returning the raw text on parse failure.
    try:
        return json.loads(raw)
    except json.JSONDecodeError:
        return {"raw_text": raw, "error": "Not valid JSON"}


def display_text_content_result(
    title: str, 
    result: Any, 
    console_instance: Optional[Console] = None
):
    """
    Display results from TextContent objects more reliably, which is useful for demos.
    This function is more forgiving with different formats and provides better handling
    for TextContent objects that might contain JSON strings.

    The result is first normalized via extract_and_parse_content(), then rendered
    with a special-cased layout for known payload shapes (Q&A pairs, entities,
    summaries), falling back to a generic JSON panel. Execution stats
    (model/provider/cost/tokens/processing_time) are appended when present.

    Args:
        title: Title to display for this result section
        result: Result object from an Ultimate MCP Server tool call (often a TextContent)
        console_instance: Optional console instance to use (defaults to shared console)
    """
    # Use provided console or default to shared console
    output = console_instance or console
    
    # Display section title
    output.print(Rule(f"[bold blue]{escape(title)}[/bold blue]"))
    
    # Extract and parse content
    parsed_data = extract_and_parse_content(result)
    
    # Check for extraction errors.
    # "error" + "raw_text" means the payload was text but not valid JSON;
    # "error" alone means extraction itself failed (e.g. empty result list).
    if "error" in parsed_data and "raw_text" in parsed_data:
        # Error parsing JSON, display as text
        output.print(Panel(
            escape(parsed_data["raw_text"]),
            title="[bold]Result Text[/bold]",
            border_style="green"
        ))
        return
    elif "error" in parsed_data and "raw_text" not in parsed_data:
        # Other error
        output.print(f"[red]{escape(parsed_data['error'])}[/red]")
        return
    
    # Display based on content type
    if isinstance(parsed_data, dict):
        # Special handling for QA pairs
        if "qa_pairs" in parsed_data and isinstance(parsed_data["qa_pairs"], list):
            qa_pairs = parsed_data["qa_pairs"]
            # Render each pair as numbered "Qn: ... / An: ..." lines in one panel.
            output.print(Panel(
                "\n".join([f"[bold]Q{i+1}:[/bold] {escape(pair.get('question', 'N/A'))}\n[bold]A{i+1}:[/bold] {escape(pair.get('answer', 'N/A'))}" 
                        for i, pair in enumerate(qa_pairs)]),
                title="[bold]Q&A Pairs[/bold]", 
                border_style="blue"
            ))
        # Special handling for entities
        elif "entities" in parsed_data:
            entities_data = parsed_data["entities"]
            if isinstance(entities_data, dict):
                # If it's a dict with entity types as keys
                entity_count = 0
                entity_table = Table(box=box.ROUNDED)
                entity_table.add_column("Type", style="cyan")
                entity_table.add_column("Entity", style="white")
                
                for entity_type, entities in entities_data.items():
                    if entities:
                        for entity in entities:
                            # Entities may be plain strings or dicts with a 'text' field.
                            entity_text = entity if isinstance(entity, str) else entity.get('text', str(entity))
                            entity_table.add_row(entity_type, escape(entity_text))
                            entity_count += 1
                
                if entity_count > 0:
                    output.print(entity_table)
                else:
                    output.print("[yellow]No entities found in the document.[/yellow]")
            else:
                # If it's some other format, just show the raw data
                output.print(Panel(
                    escape(json.dumps(entities_data, indent=2)),
                    title="[bold]Entities Data[/bold]",
                    border_style="blue"
                ))
        # Summary
        elif "summary" in parsed_data and isinstance(parsed_data["summary"], str):
            output.print(Panel(
                escape(parsed_data["summary"]),
                title="[bold]Generated Summary[/bold]",
                border_style="green"
            ))
        # Generic JSON display for other data
        else:
            # Filter out stats fields for cleaner display (they are rendered
            # separately by _display_stats below)
            display_data = {k: v for k, v in parsed_data.items() 
                           if k not in ["model", "provider", "cost", "tokens", "processing_time"]}
            
            # Only show JSON panel if we have data to display
            if display_data:
                output.print(Panel(
                    escape(json.dumps(display_data, indent=2)),
                    title="[bold]Result Data[/bold]",
                    border_style="blue"
                ))
        
        # Display stats if available
        if any(k in parsed_data for k in ["model", "provider", "cost", "tokens", "processing_time"]):
            _display_stats(parsed_data, output)
    else:
        # For other types (arrays, etc.) — extract_and_parse_content may return
        # a non-dict when the payload was a JSON array or scalar.
        output.print(Panel(
            escape(json.dumps(parsed_data, indent=2)),
            title="[bold]Result Data[/bold]",
            border_style="blue"
        ))


def _display_input_data(input_data: Dict, output: Console):
    """
    Render the pieces of an input payload with consistent formatting.

    Handles input text snippets, JSON schemas, search queries, and embedding
    vectors, choosing an appropriate Rich panel for each section present.

    Args:
        input_data: Dictionary containing input data to display. May include keys
                   like 'text', 'json_schema', 'query', and 'embeddings'.
        output: Rich Console instance to use for printing formatted output.
    """
    # Input text: show at most the first 500 characters.
    if "text" in input_data:
        full_text = input_data["text"]
        snippet = full_text[:500] + ("..." if len(full_text) > 500 else "")
        output.print(Panel(
            escape(snippet),
            title="[cyan]Input Text Snippet[/cyan]",
            border_style="dim blue"
        ))

    # JSON schema: pretty-print with syntax highlighting.
    if input_data.get("json_schema"):
        try:
            rendered_schema = json.dumps(input_data["json_schema"], indent=2)
            output.print(Panel(
                Syntax(rendered_schema, "json", theme="default", line_numbers=False),
                title="[cyan]Input Schema[/cyan]",
                border_style="dim blue"
            ))
        except Exception as e:
            output.print(f"[red]Could not display schema: {escape(str(e))}[/red]")

    # Search query (used by search-style demos).
    if "query" in input_data:
        output.print(Panel(
            escape(input_data["query"]),
            title="[cyan]Search Query[/cyan]",
            border_style="dim blue"
        ))

    # Embeddings: report dimensionality and a small sample of the first vector.
    if "embeddings" in input_data:
        vectors = input_data["embeddings"]
        if isinstance(vectors, list) and len(vectors) > 0:
            first = vectors[0]
            dims = len(first) if isinstance(first, (list, tuple)) else "unknown"
            sample_str = str(first[:3]) + "..." if isinstance(first, (list, tuple)) else str(first)
            output.print(Panel(
                f"[cyan]Dimensions:[/cyan] {dims}\n[cyan]Sample:[/cyan] {escape(sample_str)}",
                title="[cyan]Embedding Sample[/cyan]",
                border_style="dim blue"
            ))


def _parse_and_display_output(result: Any, output: Console):
    """
    Normalize a raw result object and hand it off for display.

    The result may be a list (only the first element is used), an object
    exposing a ``.text`` attribute containing JSON, or a plain dictionary.
    Anything else is reported as an unexpected type. The normalized dictionary
    is then rendered via ``_display_result_content``.

    Args:
        result: The result object to parse and display.
        output: Rich Console instance used for rendering.
    """
    # Lists: only the first element is displayed.
    if isinstance(result, list) and result:
        result = result[0]

    if hasattr(result, 'text'):
        # TextContent-style object: attempt to decode its JSON payload.
        payload = result.text
        try:
            normalized = json.loads(payload)
        except json.JSONDecodeError:
            normalized = {"error": "Failed to parse JSON", "raw_text": payload}
    elif isinstance(result, dict):
        # Already structured; pass through unchanged.
        normalized = result
    else:
        normalized = {"error": f"Unexpected result type: {type(result)}"}

    # Delegate actual rendering to the content dispatcher.
    _display_result_content(normalized, output)


def _display_result_content(parsed_result: Dict, output: Console):
    """
    Route each recognized section of a parsed result to its display helper.

    Inspects the dictionary for known keys (errors, extracted JSON, vector
    search hits, tables, key-value pairs, schemas, entities, embeddings) and
    dispatches each present section to the matching ``_display_*`` function,
    finishing with execution statistics.

    Args:
        parsed_result: Dictionary of parsed result data.
        output: Rich Console instance used for rendering.
    """
    # Errors short-circuit everything else.
    if parsed_result.get("error"):
        _display_error(parsed_result, output)
        return

    # Extracted JSON payload.
    data = parsed_result.get("data")
    if data is not None:
        _display_json_data(data, "Extracted JSON Data", output)

    # Vector search hits.
    hits = parsed_result.get("results")
    if isinstance(hits, list):
        _display_vector_results(hits, output)

    # Extracted tables.
    if parsed_result.get("tables"):
        _display_tables(parsed_result["tables"], output)

    # Key-value pairs (either key name is accepted).
    if "key_value_pairs" in parsed_result or "pairs" in parsed_result:
        pairs = parsed_result.get("key_value_pairs", parsed_result.get("pairs", {}))
        _display_key_value_pairs(pairs, output)

    # Inferred semantic schema.
    if parsed_result.get("schema"):
        _display_json_data(parsed_result["schema"], "Inferred Semantic Schema", output)

    # Extracted entities.
    if parsed_result.get("entities"):
        _display_entities(parsed_result["entities"], output)

    # Embedding summary.
    if parsed_result.get("embeddings"):
        _display_embeddings_info(
            parsed_result["embeddings"],
            parsed_result.get("model", "unknown"),
            output,
        )

    # Execution statistics, when present.
    _display_stats(parsed_result, output)


def _display_error(result: Dict, output: Console):
    """
    Render error details inside a red-bordered panel.

    Shows the error message and, when available, the raw text output that
    produced it (useful for debugging).

    Args:
        result: Dictionary with an 'error' key and optionally 'raw_text'.
        output: Rich Console instance used for rendering.
    """
    body = f"[red]Error:[/red] {escape(result['error'])}"
    raw = result.get("raw_text")
    if raw:
        # Append the raw output to aid debugging of the failed tool call.
        body += f"\n\n[yellow]Raw Text Output:[/yellow]\n{escape(raw)}"
    output.print(Panel(
        body,
        title="[bold red]Tool Error[/bold red]",
        border_style="red"
    ))


def _display_json_data(data: Any, title: str, output: Console):
    """
    Pretty-print arbitrary data as syntax-highlighted JSON in a titled panel.

    Serialization failures are reported inline rather than raised.

    Args:
        data: Any JSON-serializable structure.
        title: Heading shown above the JSON content.
        output: Rich Console instance used for rendering.
    """
    try:
        serialized = json.dumps(data, indent=2)
        output.print(Panel(
            Syntax(serialized, "json", theme="default", line_numbers=True, word_wrap=True),
            title=f"[bold green]{title}[/bold green]",
            border_style="green"
        ))
    except Exception as e:
        # Non-serializable data (or a rendering failure) degrades to a message.
        output.print(f"[red]Could not display JSON data: {escape(str(e))}[/red]")


def _display_vector_results(results: List[Dict], output: Console):
    """
    Render vector search hits as a Rich table.

    Columns are derived from the first hit: ID and score always appear,
    followed by one column per metadata key found on the first result, and a
    truncated text snippet.

    Args:
        results: List of hit dictionaries, each typically holding 'id',
                'similarity' or 'score', an optional 'metadata' dict, and 'text'.
        output: Rich Console instance used for rendering.
    """
    table = Table(title="[bold green]Vector Search Results[/bold green]", box=box.ROUNDED)

    if not results:
        output.print("[yellow]No vector search results to display[/yellow]")
        return

    # The column layout follows the shape of the first hit.
    template = results[0]
    table.add_column("ID", style="cyan")
    table.add_column("Score", style="green", justify="right")

    # One extra column per metadata key on the first result.
    meta_columns = []
    if isinstance(template.get("metadata"), dict):
        meta_columns = list(template["metadata"].keys())
        for key in meta_columns:
            table.add_column(key.capitalize(), style="magenta")

    table.add_column("Text", style="white")

    for hit in results:
        cells = [
            escape(str(hit.get("id", ""))),
            f"{hit.get('similarity', hit.get('score', 0.0)):.4f}",
        ]
        if meta_columns:
            meta = hit.get("metadata", {})
            cells.extend(escape(str(meta.get(key, ""))) for key in meta_columns)
        # Truncate long text to keep rows readable.
        text = hit.get("text", "")
        cells.append(escape(text[:80] + ("..." if len(text) > 80 else "")))
        table.add_row(*cells)

    output.print(table)


def _display_tables(tables: List[Dict], output: Console):
    """
    Render extracted tables in JSON and Markdown form.

    Each table gets a rule with its title, then panels for whichever of the
    'json', 'markdown', and 'metadata' representations it carries.

    Args:
        tables: List of table dictionaries; 'title', 'json', 'markdown', and
               'metadata' fields are all optional.
        output: Rich Console instance used for rendering.
    """
    for index, info in enumerate(tables):
        heading = info.get('title', f'Table {index+1}')
        output.print(Rule(f"[green]Extracted: {escape(heading)}[/green]"))

        # JSON representation, when present.
        if info.get("json"):
            try:
                as_json = json.dumps(info["json"], indent=2)
                output.print(Panel(
                    Syntax(as_json, "json", theme="default", line_numbers=False, word_wrap=True),
                    title="[bold]JSON Format[/bold]",
                    border_style="dim green"
                ))
            except Exception as e:
                output.print(f"[red]Could not display table JSON: {escape(str(e))}[/red]")

        # Markdown representation, when present.
        if info.get("markdown"):
            output.print(Panel(
                Syntax(info["markdown"], "markdown", theme="default"),
                title="[bold]Markdown Format[/bold]",
                border_style="dim green"
            ))

        # Associated metadata, when present.
        if info.get("metadata"):
            try:
                as_meta = json.dumps(info["metadata"], indent=2)
                output.print(Panel(
                    Syntax(as_meta, "json", theme="default", line_numbers=False),
                    title="[bold]Metadata[/bold]",
                    border_style="dim green"
                ))
            except Exception as e:
                output.print(f"[red]Could not display metadata: {escape(str(e))}[/red]")


def _display_key_value_pairs(pairs: Union[Dict, List], output: Console):
    """
    Render key-value pairs as a two-column Rich table.

    Accepts either a flat dictionary or a list of dictionaries; the table is
    only printed when at least one row was added.

    Args:
        pairs: Dictionary of key-value pairs, or a list of such dictionaries.
        output: Rich Console instance used for rendering.
    """
    table = Table(title="[bold green]Extracted Key-Value Pairs[/bold green]", box=box.ROUNDED)
    table.add_column("Key", style="magenta")
    table.add_column("Value", style="white")

    if isinstance(pairs, dict):
        for key, value in pairs.items():
            table.add_row(escape(str(key)), escape(str(value)))
    elif isinstance(pairs, list):
        # A list may contain several dicts; non-dict entries are ignored.
        for entry in pairs:
            if isinstance(entry, dict):
                for key, value in entry.items():
                    table.add_row(escape(str(key)), escape(str(value)))

    # Skip printing an empty table.
    if table.row_count > 0:
        output.print(table)


def _display_entities(entities: List[Dict], output: Console):
    """
    Render named entities as a Rich table.

    Shows each entity's type, text, a truncated context snippet, and its
    confidence score ('N/A' when no score is present).

    Args:
        entities: List of entity dictionaries, typically containing 'type',
                 'text', 'context', and 'score' fields.
        output: Rich Console instance used for rendering.
    """
    table = Table(title="[bold green]Extracted Entities[/bold green]", box=box.ROUNDED)
    table.add_column("Type", style="cyan")
    table.add_column("Text", style="white")
    table.add_column("Context", style="dim")
    table.add_column("Score", style="green", justify="right")

    for item in entities:
        # Trim long context strings to keep the table compact.
        context = item.get("context", "")
        snippet = context[:50] + ("..." if len(context) > 50 else "")
        score = item.get('score')
        table.add_row(
            escape(item.get("type", "N/A")),
            escape(item.get("text", "N/A")),
            escape(snippet),
            f"{score:.2f}" if score is not None else "N/A",
        )

    output.print(table)


def _display_embeddings_info(embeddings: List, model: str, output: Console):
    """
    Render a summary table describing a batch of embeddings.

    Reports the model name, vector count, dimensionality, and a few sample
    values from the first vector. Does nothing for empty or non-list input.

    Args:
        embeddings: List of embedding vectors (typically lists of floats).
        model: Name of the embedding model that produced the vectors.
        output: Rich Console instance used for rendering.
    """
    if not isinstance(embeddings, list) or len(embeddings) == 0:
        return

    # Dimensionality is inferred from the first vector only.
    first = embeddings[0]
    dimensionality = len(first) if isinstance(first, (list, tuple)) else "unknown"

    summary = Table(title="[bold green]Embedding Information[/bold green]", box=box.MINIMAL)
    summary.add_column("Property", style="cyan")
    summary.add_column("Value", style="white")

    summary.add_row("Model", escape(model))
    summary.add_row("Count", str(len(embeddings)))
    summary.add_row("Dimensions", str(dimensionality))

    # Preview the first few values; rounding can fail for non-numeric
    # entries, in which case the raw values are shown instead.
    if isinstance(first, (list, tuple)) and len(first) > 0:
        head = first[:3]
        try:
            preview = str([round(v, 6) for v in head]) + "..."
        except (TypeError, ValueError):
            preview = str(head) + "..."
        summary.add_row("Sample Values", escape(preview))

    output.print(summary)


def _display_stats(result: Dict, output: Console):
    """
    Render execution statistics (provider, model, cost, tokens, timing).

    Only the statistics actually present in `result` are shown; when none
    of the recognized keys exist the function prints nothing at all.

    Args:
        result: Dictionary that may contain 'provider', 'model', 'cost',
                'tokens', and/or 'processing_time'.
        output: Rich Console instance to use for printing formatted output.
    """
    recognized = ("model", "provider", "cost", "tokens", "processing_time")
    if not any(key in result for key in recognized):
        return

    stats_table = Table(title="Execution Stats", box=box.MINIMAL, show_header=False)
    stats_table.add_column("Metric", style="cyan")
    stats_table.add_column("Value", style="white")

    if "provider" in result:
        stats_table.add_row("Provider", escape(result.get("provider", "N/A")))

    if "model" in result:
        stats_table.add_row("Model", escape(result.get("model", "N/A")))

    if "cost" in result:
        stats_table.add_row("Cost", f"${result.get('cost', 0.0):.6f}")

    if "tokens" in result:
        token_info = result.get("tokens", {})
        # Only the structured dict form is rendered here.
        if isinstance(token_info, dict):
            stats_table.add_row(
                "Tokens (In/Out/Total)",
                f"{token_info.get('input', 0)} / {token_info.get('output', 0)} / {token_info.get('total', 0)}",
            )

    if "processing_time" in result:
        stats_table.add_row("Processing Time", f"{result.get('processing_time', 0.0):.3f}s")

    if stats_table.row_count > 0:
        output.print(stats_table)

    # Blank line separates the stats from subsequent output.
    output.print()


# Specialized display functions for different demo types

def display_embedding_generation_results(results_data: Dict, output: Optional[Console] = None):
    """
    Render a comparison table of embedding generation results per model.

    Each row shows a model's name, vector dimensionality, generation time,
    cost, a truncated sample of its embedding values, and success status,
    making it easy to weigh performance, cost, and quality across
    providers at a glance.

    Args:
        results_data: Dictionary expected to hold a 'models' list. Each
                      entry may carry 'name', 'dimensions', 'time', 'cost',
                      'embedding_sample', and 'success' fields.
        output: Optional Rich Console instance; defaults to the shared
                console when omitted.

    Note:
        A warning line is printed instead of a table when 'models' is
        missing or empty.
    """
    display = output or console

    model_entries = results_data.get("models")
    if not model_entries:
        display.print("[yellow]No embedding results to display[/yellow]")
        return

    table = Table(title="Embedding Generation Results", box=box.ROUNDED, show_header=True)
    table.add_column("Model", style="magenta")
    table.add_column("Dimensions", style="cyan", justify="right")
    table.add_column("Gen Time (s)", style="yellow", justify="right")
    table.add_column("Cost ($)", style="green", justify="right")
    table.add_column("Sample Values", style="dim")
    table.add_column("Status", style="white")

    for entry in model_entries:
        status_cell = "[green]Success[/green]" if entry.get("success") else "[red]Failed[/red]"

        # Show a truncated sample of the embedding when one was recorded.
        sample = entry.get("embedding_sample")
        sample_cell = "N/A" if sample is None else escape(str(sample) + "...")

        table.add_row(
            escape(entry.get("name", "Unknown")),
            str(entry.get("dimensions", "-")),
            f"{entry.get('time', 0.0):.3f}",
            f"{entry.get('cost', 0.0):.6f}",
            sample_cell,
            status_cell,
        )

    display.print(table)
    display.print()

def display_vector_similarity_results(similarity_data: Dict, output: Optional[Console] = None):
    """
    Render semantic similarity scores between text pairs as a table.

    Each row shows a truncated snippet of both texts in a pair alongside
    their similarity score (4 decimal places). Numpy scalar scores are
    unwrapped to plain Python numbers before formatting.

    Args:
        similarity_data: Dictionary expected to hold a 'pairs' list; each
                         pair should include 'text1', 'text2', and 'score'.
        output: Optional Rich Console instance; defaults to the shared
                console when omitted.

    Note:
        A warning line is printed instead of a table when 'pairs' is
        missing, empty, or not a list.
    """
    display = output or console

    pairs = similarity_data.get("pairs", [])
    if not pairs or not isinstance(pairs, list) or len(pairs) == 0:
        display.print("[yellow]No similarity data to display[/yellow]")
        return

    table = Table(title="Semantic Similarity Scores", box=box.ROUNDED, show_header=True)
    table.add_column("Text 1 Snippet", style="white")
    table.add_column("Text 2 Snippet", style="white")
    table.add_column("Similarity Score", style="green", justify="right")

    for pair in pairs:
        # Truncate long texts to 50 characters with an ellipsis marker.
        raw_one = pair.get("text1", "")
        raw_two = pair.get("text2", "")
        snippet_one = raw_one[:50] + ("..." if len(raw_one) > 50 else "")
        snippet_two = raw_two[:50] + ("..." if len(raw_two) > 50 else "")

        score = pair.get("score", 0.0)
        # Numpy scalars expose .item(); unwrap to a native Python number.
        try:
            if hasattr(score, 'item'):
                score = score.item()
        except (AttributeError, TypeError):
            pass

        table.add_row(
            escape(snippet_one),
            escape(snippet_two),
            f"{score:.4f}",
        )

    display.print(table)
    display.print()


def display_analytics_metrics(metrics_data: Dict, output: Optional[Console] = None):
    """
    Render analytics metrics as Rich tables.

    An overview table lists the raw request counts; every entry under
    'request_distributions' then gets its own breakdown table with
    per-category counts and percentages of the total.

    Args:
        metrics_data: Dictionary containing analytics metrics data.
        output: Optional Rich Console instance; the shared default console
                is used when omitted.
    """
    output = output or console

    # Nothing meaningful to show without a metrics dict.
    if not metrics_data or not isinstance(metrics_data, dict):
        output.print("[yellow]No analytics metrics data to display[/yellow]")
        return

    output.print(Rule("[bold blue]Analytics Metrics[/bold blue]"))

    overview = Table(title="[bold]Metrics Overview[/bold]", box=box.ROUNDED)
    overview.add_column("Metric", style="cyan")
    overview.add_column("Count", style="green", justify="right")
    overview.add_column("Details", style="dim")

    if "request_counts" in metrics_data:
        for metric_name, metric_count in metrics_data["request_counts"].items():
            # Humanize snake_case metric names for display.
            overview.add_row(metric_name.replace("_", " ").title(), str(metric_count), "")

    output.print(overview)

    if "request_distributions" in metrics_data:
        for group_name, distribution in metrics_data["request_distributions"].items():
            breakdown = Table(
                title=f"[bold]{group_name.replace('_', ' ').title()} Distribution[/bold]",
                box=box.SIMPLE
            )
            breakdown.add_column("Category", style="cyan")
            breakdown.add_column("Count", style="green", justify="right")
            breakdown.add_column("Percentage", style="yellow", justify="right")

            total = sum(distribution.values())
            for category, count in distribution.items():
                # Guard against a zero total when computing the share.
                share = (count / total) * 100 if total > 0 else 0
                breakdown.add_row(category, str(count), f"{share:.1f}%")

            output.print(breakdown)

# --- Tournament Display Functions ---

def display_tournament_status(status_data: Dict[str, Any], output: Optional[Console] = None):
    """
    Display tournament status with better formatting using Rich.

    Renders a table with the tournament status (color-coded), round
    progress, completion percentage, and creation/update timestamps when
    present. While the tournament is RUNNING, a static progress bar is
    also drawn as a quick visual cue.

    Args:
        status_data: Dictionary with tournament status information. May
                     contain 'status', 'current_round', 'total_rounds',
                     'created_at', and 'updated_at'.
        output: Optional console to use (defaults to shared console)
    """
    # Use provided console or default
    display = output or console

    # Extract status information
    status = status_data.get("status", "UNKNOWN")
    current_round = status_data.get("current_round", 0)
    total_rounds = status_data.get("total_rounds", 0)

    # Calculate progress percentage, guarding against division by zero
    if total_rounds > 0:
        progress = (current_round / total_rounds) * 100
    else:
        progress = 0

    # Create status table with improved formatting
    status_table = Table(box=box.SIMPLE, show_header=False, expand=False)
    status_table.add_column("Metric", style="cyan")
    status_table.add_column("Value", style="white")

    # Add status row with color based on status value
    status_color = "green" if status == "COMPLETED" else "yellow" if status == "RUNNING" else "red"
    status_table.add_row("Status", f"[bold {status_color}]{status}[/bold {status_color}]")

    # Add rounds progress
    status_table.add_row("Round", f"{current_round}/{total_rounds}")

    # Add progress percentage
    status_table.add_row("Progress", f"[green]{progress:.1f}%[/green]")

    def _format_timestamp(value: Any) -> str:
        # Timestamps may be present but None (e.g. null in serialized JSON);
        # coerce to str before formatting to avoid an AttributeError.
        return str(value or "N/A").replace("T", " ").split(".")[0]

    # Add timestamps if available
    if "created_at" in status_data:
        status_table.add_row("Created", _format_timestamp(status_data.get("created_at")))
    if "updated_at" in status_data:
        status_table.add_row("Last Updated", _format_timestamp(status_data.get("updated_at")))

    display.print(status_table)

    # Add progress bar visual for better UX
    if status == "RUNNING":
        from rich.progress import BarColumn, Progress, TextColumn
        progress_bar = Progress(
            TextColumn("[progress.description]{task.description}"),
            BarColumn(),
            TextColumn("[progress.percentage]{task.percentage:>3.0f}%")
        )

        with progress_bar:
            # Just show the bar visualization, don't actually wait/update
            progress_bar.add_task("Tournament Progress", total=100, completed=progress)

def display_tournament_results(results_data: Dict[str, Any], output: Optional[Console] = None):
    """
    Display tournament results with better formatting using Rich.

    Renders a summary table (name, type, final status, rounds, storage
    path), a roster of the models used — split into provider/model when
    the id is 'provider:model' — and, when present, execution statistics
    via `_display_stats`.

    Args:
        results_data: Dictionary with tournament results
        output: Optional console to use (defaults to shared console)
    """
    display = output or console

    display.print(Rule("[bold blue]Tournament Results[/bold blue]"))

    # Tournament configuration is nested under 'config'; read it once.
    config = results_data.get('config', {})

    summary = Table(
        title="[bold green]Final Results Summary[/bold green]",
        box=box.ROUNDED,
        show_header=False,
        expand=False,
    )
    summary.add_column("Metric", style="cyan", no_wrap=True)
    summary.add_column("Value", style="white")

    summary.add_row("Tournament Name", escape(config.get('name', 'N/A')))
    summary.add_row("Tournament Type", escape(config.get('tournament_type', 'N/A')))
    summary.add_row("Final Status", f"[bold green]{escape(results_data.get('status', 'N/A'))}[/bold green]")
    summary.add_row("Total Rounds", str(config.get('rounds', 'N/A')))

    storage_path = results_data.get("storage_path")
    summary.add_row("Storage Path", escape(storage_path) if storage_path else "[dim]Not available[/dim]")

    display.print(summary)

    # Roster of participating models, if configured.
    models = config.get('models', [])
    if models:
        roster = Table(title="[bold]Models Used[/bold]", box=box.SIMPLE, show_header=True)
        roster.add_column("Provider", style="magenta")
        roster.add_column("Model", style="blue")

        for model_config in models:
            model_id = model_config.get('model_id', 'N/A')
            if ':' in model_id:
                # Ids of the form 'provider:model' split into two columns.
                provider, model = model_id.split(':', 1)
                roster.add_row(provider, model)
            else:
                roster.add_row("Unknown", model_id)

        display.print(roster)

    # Append execution stats when any of the known keys are present.
    if any(key in results_data for key in ["processing_time", "cost", "tokens"]):
        _display_stats(results_data, display)

def display_completion_result(
    console: Console, 
    result: Any, 
    title: str = "Completion Result"
):
    """
    Display a completion result with stats.

    Prints the completion text inside a green-bordered panel, then a
    stats table listing whichever of input/output/total tokens, cost,
    and processing time exist as attributes on the result object.

    Args:
        console: Rich console to print to
        result: Completion result to display
        title: Title for the result panel
    """
    # Completion text goes in a panel of its own.
    console.print(Panel(
        result.text.strip(),
        title=title,
        border_style="green",
        expand=False
    ))

    stats_table = Table(title="Completion Stats", show_header=False, box=None)
    stats_table.add_column("Metric", style="green")
    stats_table.add_column("Value", style="white")

    # (attribute, label, formatter) — a row is added only when the
    # attribute exists on the result object.
    row_specs = [
        ("input_tokens", "Input Tokens", str),
        ("output_tokens", "Output Tokens", str),
        ("total_tokens", "Total Tokens", str),
        ("cost", "Cost", lambda v: f"${v:.6f}"),
        ("processing_time", "Processing Time", lambda v: f"{v:.3f}s"),
    ]
    for attr, label, fmt in row_specs:
        if hasattr(result, attr):
            stats_table.add_row(label, fmt(getattr(result, attr)))

    console.print(stats_table)

def display_cache_stats(
    stats: Dict[str, Any], 
    stats_log: Optional[Dict[int, Dict[str, int]]] = None,
    console: Optional[Console] = None
):
    """
    Display cache statistics in a formatted table.

    The main table covers enablement, persistence, get/hit/miss/set
    counters, the computed hit rate, and any estimated cost/time savings.
    When a multi-stage `stats_log` is supplied, a second table tracks how
    the counters evolved across demo steps.

    Args:
        stats: Cache statistics dictionary
        stats_log: Optional log of statistics at different stages
        console: Rich console to print to (creates one if None)
    """
    if console is None:
        from ultimate_mcp_server.utils.logging.console import console

    summary = Table(title="Cache Statistics", box=box.SIMPLE)
    summary.add_column("Metric", style="cyan")
    summary.add_column("Value", style="white")

    # Enabled / persistence flags rendered as colored labels.
    summary.add_row(
        "Cache Enabled",
        "[green]Yes[/green]" if stats.get("enabled", False) else "[red]No[/red]"
    )
    summary.add_row(
        "Persistence",
        "[green]Enabled[/green]" if stats.get("persistence", False) else "[yellow]Disabled[/yellow]"
    )

    # Raw counters live under the nested 'stats' key.
    counters = stats.get("stats", {})
    summary.add_row("Total Gets", str(counters.get("get_count", 0)))
    summary.add_row("Cache Hits", str(counters.get("hit_count", 0)))
    summary.add_row("Cache Misses", str(counters.get("miss_count", 0)))
    summary.add_row("Total Sets", str(counters.get("set_count", 0)))

    # Hit rate = hits / gets, guarding against division by zero.
    total_gets = counters.get("get_count", 0)
    total_hits = counters.get("hit_count", 0)
    rate = (total_hits / total_gets) * 100 if total_gets > 0 else 0
    summary.add_row("Hit Rate", f"{rate:.1f}%")

    # Estimated savings, when the caller provided them.
    if "savings" in stats:
        savings = stats["savings"]
        if isinstance(savings, dict):
            if "cost" in savings:
                summary.add_row("Cost Savings", f"${savings['cost']:.6f}")
            if "time" in savings:
                summary.add_row("Time Savings", f"{savings['time']:.3f}s")

    console.print(summary)

    # Per-stage history needs at least two snapshots to be interesting.
    if stats_log and len(stats_log) > 1:
        history = Table(title="Cache Changes During Demo", box=box.SIMPLE)
        for header, style in (
            ("Stage", "cyan"),
            ("Gets", "white"),
            ("Hits", "green"),
            ("Misses", "yellow"),
            ("Sets", "blue"),
        ):
            history.add_column(header, style=style)

        for step, snapshot in sorted(stats_log.items()):
            history.add_row(
                f"Step {step}",
                str(snapshot.get("get_count", 0)),
                str(snapshot.get("hit_count", 0)),
                str(snapshot.get("miss_count", 0)),
                str(snapshot.get("set_count", 0))
            )

        console.print(history)

def parse_and_display_result(
    title: str, 
    input_data: Dict, 
    result: Any,
    console: Optional[Console] = None
):
    """
    Parse and display extraction results.

    Dispatches on the keys present in `result` to pick a rendering:
    'json' (direct JSON extraction), 'formats'/'metadata' (table
    extraction), or 'extracted_data' (schema inference / entity data).
    Errors are shown first and short-circuit the display; when no known
    key matches, the whole result object is dumped as JSON for debugging.
    Performance metrics (provider, model, tokens, cost, processing time)
    are appended whenever present.

    Args:
        title: Title for the display
        input_data: Input data used for the extraction. NOTE(review): not
                    rendered by this function; presumably kept for API
                    compatibility with callers — confirm before removing.
        result: Extraction result
        console: Rich console to print to (creates one if None)
    """
    if console is None:
        from ultimate_mcp_server.utils.logging.console import console
    
    console.print(Rule(f"[bold blue]{title}[/bold blue]"))
    
    # Check for errors first
    if "error" in result and result["error"]:
        console.print(f"[bold red]Error:[/bold red] {result['error']}")
        if "raw_text" in result:
            console.print(Panel(result["raw_text"], title="Raw Response", border_style="red"))
        return
    
    # Display the extracted data based on expected keys for different demos
    extracted_data_displayed = False
    
    # 1. JSON Extraction (expects 'json' key)
    if "json" in result and isinstance(result["json"], dict):
        data = result["json"]
        json_str = json.dumps(data, indent=2)
        syntax = Syntax(json_str, "json", theme="monokai", line_numbers=True)
        console.print(Panel(syntax, title="Extracted JSON Data", border_style="green"))
        extracted_data_displayed = True
        
    # 2. Table Extraction (expects 'formats' and 'metadata')
    elif "formats" in result and isinstance(result["formats"], dict):
        formats = result["formats"]
        # JSON and Markdown renderings are attempted independently, so a
        # failure in one does not suppress the other.
        if "json" in formats and formats["json"]:
            try:
                _display_json_data(formats["json"], "Extracted Table (JSON)", console)
                extracted_data_displayed = True
            except Exception as e:
                console.print(f"[red]Error displaying table JSON: {e}[/red]")
        if "markdown" in formats and formats["markdown"]:
            try:
                console.print(Panel(Syntax(formats["markdown"], "markdown", theme="default"), title="Extracted Table (Markdown)", border_style="dim green"))
                extracted_data_displayed = True # Even if JSON fails, MD might succeed
            except Exception as e:
                 console.print(f"[red]Error displaying table Markdown: {e}[/red]")
        if "metadata" in result and result["metadata"]:
             try:
                _display_json_data(result["metadata"], "Table Metadata", console)
             except Exception as e:
                console.print(f"[red]Error displaying table metadata: {e}[/red]")
                
    # 3. Schema Inference / Entity Extraction (expects 'extracted_data')
    elif "extracted_data" in result:
        data = result["extracted_data"]
        # Check if it looks like entity data (dict with list values)
        is_entity_data = False
        if isinstance(data, dict):
            is_entity_data = all(isinstance(v, list) for v in data.values()) 
            
        if is_entity_data: 
            # Simplified entity display for this function
            entity_table = Table(title="[bold green]Extracted Entities[/bold green]", box=box.ROUNDED)
            entity_table.add_column("Category", style="cyan")
            entity_table.add_column("Value", style="white")
            for category, items in data.items():
                for item in items:
                     # Dict entities are shown by their 'name'; anything else stringified.
                     entity_text = str(item.get('name', item)) if isinstance(item, dict) else str(item)
                     entity_table.add_row(category, escape(entity_text))
            if entity_table.row_count > 0:
                console.print(entity_table)
                extracted_data_displayed = True
            else:
                 console.print("[yellow]No entities found.[/yellow]")
                 extracted_data_displayed = True # Still counts as displayed
        else:
            # Assume other 'extracted_data' is generic JSON
            try:
                _display_json_data(data, "Extracted Data", console)
                extracted_data_displayed = True
            except Exception as e:
                console.print(f"[red]Error displaying extracted data: {e}[/red]")
                
    # Fallback if no specific keys matched
    if not extracted_data_displayed:
        console.print("[yellow]Could not find expected data keys (json, formats, extracted_data) in result.[/yellow]")
        # Optionally display the whole result as JSON for debugging
        try:
            full_result_json = json.dumps(result, indent=2, default=str) # Use default=str for non-serializable items
            console.print(Panel(Syntax(full_result_json, "json", theme="monokai", line_numbers=False), title="[dim]Full Result Object[/dim]", border_style="dim"))
        except Exception:
            pass # Ignore if full result can't be serialized

    # Display performance metrics
    if any(k in result for k in ["tokens", "cost", "processing_time"]):
        metrics_table = Table(title="Performance Metrics", box=None)
        metrics_table.add_column("Metric", style="cyan")
        metrics_table.add_column("Value", style="white")
        
        # Add provider and model info
        if "provider" in result:
            metrics_table.add_row("Provider", result["provider"])
        if "model" in result:
            metrics_table.add_row("Model", result["model"])
        
        # Add token usage
        if "tokens" in result:
            tokens = result["tokens"]
            if isinstance(tokens, dict):
                # One row per token category (input/output/total/...).
                for token_type, count in tokens.items():
                    metrics_table.add_row(f"{token_type.title()} Tokens", str(count))
            else:
                metrics_table.add_row("Total Tokens", str(tokens))
        
        # Add cost and timing
        if "cost" in result:
            metrics_table.add_row("Cost", f"${result['cost']:.6f}")
        if "processing_time" in result:
            metrics_table.add_row("Processing Time", f"{result['processing_time']:.3f}s")
        
        console.print(metrics_table)

def display_table_data(table_data: List[Dict], console: Console):
    """
    Display tabular data extracted from text.

    Renders the rows as a Rich table (columns taken from the first row's
    keys) and then echoes the same data as syntax-highlighted JSON for
    reference.

    Args:
        table_data: List of dictionaries representing table rows
        console: Rich console to print to
    """
    if not table_data:
        console.print("[yellow]No table data found[/yellow]")
        return

    rendered = Table(box=box.SIMPLE)

    # Column headers come from the first row's keys.
    headers = list(table_data[0].keys())
    for header in headers:
        rendered.add_column(str(header), style="cyan")

    # Missing cells in later rows fall back to empty strings.
    for record in table_data:
        rendered.add_row(*(str(record.get(header, "")) for header in headers))

    console.print(rendered)

    # Also display as JSON for reference
    payload = json.dumps(table_data, indent=2)
    highlighted = Syntax(payload, "json", theme="monokai", line_numbers=True)
    console.print(Panel(highlighted, title="Table Data (JSON)", border_style="blue"))

def display_key_value_pairs(pairs: List[Dict], console: Console):
    """
    Display key-value pairs extracted from text.

    Renders each pair as a row of a borderless table, wrapped in a
    green panel. Prints a warning line when the list is empty.

    Args:
        pairs: List of dictionaries with 'key' and 'value' fields
        console: Rich console to print to
    """
    if not pairs:
        console.print("[yellow]No key-value pairs found[/yellow]")
        return

    inner = Table(box=None)
    inner.add_column("Key", style="green")
    inner.add_column("Value", style="white")

    for entry in pairs:
        inner.add_row(entry.get("key", ""), entry.get("value", ""))

    console.print(Panel(inner, title="Extracted Key-Value Pairs", border_style="green")) 



logger = get_logger(__name__) # Module-level logger, named after this module's import path

async def safe_tool_call(tool_func, args_dict, description=""):
    """
    Helper function to safely call an async tool function and display results/errors.

    This function wraps an async tool function call and handles common error patterns
    (ToolError, ProtectionTriggeredError, generic exceptions) and formats successful
    outputs for various common tool result structures using Rich.

    Args:
        tool_func: The asynchronous tool function to call.
        args_dict: A dictionary of arguments to pass to the tool function.
        description: A description of the tool call for display purposes.

    Returns:
        A dictionary containing:
        - 'success': Boolean indicating if the call was successful (no errors/protection).
        - 'result': The raw result from the tool function.
        - 'error' (optional): Error message if an error occurred.
        - 'details' (optional): Additional error details.
        - 'protection_triggered' (optional): Boolean, true if deletion protection was triggered.
        - 'context' (optional): Context dictionary from ProtectionTriggeredError.
    """
    tool_name = tool_func.__name__
    call_desc = description or f"Calling [bold magenta]{tool_name}[/bold magenta]"
    # Use pretty_repr for args for better complex type display
    args_str = ", ".join(f"{k}=[yellow]{pretty_repr(v)}[/yellow]" for k, v in args_dict.items())
    console.print(Panel(f"{call_desc}\nArgs: {args_str}", title="Tool Call", border_style="blue", expand=False))

    start_time = time.monotonic()
    try:
        # Directly await the function
        result = await tool_func(**args_dict)
        duration = time.monotonic() - start_time

        # Check for error/protection structure (often returned by @with_error_handling)
        is_error = isinstance(result, dict) and (result.get("error") is not None or result.get("isError") is True)
        is_protection_triggered = isinstance(result, dict) and result.get("protectionTriggered") is True

        if is_protection_triggered:
            error_msg = result.get("error", "Protection triggered, reason unspecified.")
            context = result.get("details", {}).get("context", {})  # Context might be nested
            console.print(Panel(
                f"[bold yellow]🛡️ Protection Triggered![/bold yellow]\n"
                f"Message: {escape(error_msg)}\n"
                f"Context: {pretty_repr(context)}",
                title=f"Result: {tool_name} (Blocked)",
                border_style="yellow",
                subtitle=f"Duration: {duration:.3f}s"
            ))
            return {"success": False, "protection_triggered": True, "result": result, "error": error_msg, "context": context}

        elif is_error:
            error_msg = result.get("error", "Unknown error occurred.")
            error_code = result.get("error_code", "UNKNOWN_ERROR")
            error_type = result.get("error_type", "ERROR")
            details = result.get("details", None)

            logger.debug(f"Error response structure from {tool_name}: {pretty_repr(result)}")

            error_content = f"[bold red]Error ({error_code})[/bold red]\n"
            error_content += f"Type: {error_type}\n"
            error_content += f"Message: {escape(str(error_msg))}"
            if details:
                error_content += f"\nDetails:\n{pretty_repr(details)}"
            else:
                error_content += "\nDetails: N/A"

            console.print(Panel(
                error_content,
                title=f"Result: {tool_name} (Failed)",
                border_style="red",
                subtitle=f"Duration: {duration:.3f}s"
            ))
            return {"success": False, "error": error_msg, "details": details, "result": result, "error_code": error_code}

        else:
            # Successful result - display nicely
            output_content = ""
            if isinstance(result, dict):
                # Common success patterns
                if "message" in result:
                    output_content += f"Message: [green]{escape(result['message'])}[/green]\n"
                if "path" in result:
                    output_content += f"Path: [cyan]{escape(str(result['path']))}[/cyan]\n"
                if "size" in result and not any(k in result for k in ["content", "files"]):  # Avoid printing size if content/files also present
                    # Only print size if it's the primary info (like in write_file result)
                    output_content += f"Size: [yellow]{result['size']}[/yellow] bytes\n"
                if "created" in result and isinstance(result['created'], bool):
                    output_content += f"Created: {'Yes' if result['created'] else 'No (already existed)'}\n"
                # Handle 'diff' from edit_file
                if "diff" in result and result.get("diff") not in ["No changes detected after applying edits.", "No edits provided.", None, ""]:
                    diff_content = result['diff']
                    # FIX: render the Syntax object through a console capture.
                    # Interpolating it into an f-string would only embed its
                    # repr (e.g. "<rich.syntax.Syntax object at 0x...>"),
                    # not the highlighted diff.
                    diff_syntax = Syntax(diff_content, 'diff', theme='monokai', background_color='default')
                    with Capture(console) as capture:
                        console.print(diff_syntax)
                    output_content += f"Diff:\n{capture.get()}\n"
                # Handle 'matches' from search_files
                if "matches" in result and "pattern" in result:
                    output_content += f"Search Matches ({len(result['matches'])} for pattern '{result['pattern']}'):\n"
                    rel_base = Path(result.get("path", "."))
                    output_content += "\n".join(f"- [cyan]{escape(os.path.relpath(m, rel_base))}[/cyan]" for m in result['matches'][:20])
                    if len(result['matches']) > 20:
                        output_content += "\n- ... (more matches)"
                    if result.get("warnings"):
                        output_content += "\n[yellow]Warnings:[/yellow]\n" + "\n".join(f"- {escape(w)}" for w in result['warnings']) + "\n"
                # Handle 'entries' from list_directory
                elif "entries" in result and "path" in result:
                    output_content += f"Directory Listing for [cyan]{escape(str(result['path']))}[/cyan]:\n"
                    table = Table(show_header=True, header_style="bold magenta", box=None)
                    table.add_column("Name", style="cyan", no_wrap=True)
                    table.add_column("Type", style="green")
                    table.add_column("Info", style="yellow")
                    for entry in result.get('entries', []):
                        name = entry.get('name', '?')
                        etype = entry.get('type', 'unknown')
                        info_str = ""
                        if etype == 'file' and 'size' in entry:
                            info_str += f"{entry['size']} bytes"
                        elif etype == 'symlink' and 'symlink_target' in entry:
                            info_str += f"-> {escape(str(entry['symlink_target']))}"
                        if 'error' in entry:
                            info_str += f" [red](Error: {escape(entry['error'])})[/red]"
                        icon = "📄" if etype == "file" else "📁" if etype == "directory" else "🔗" if etype == "symlink" else "❓"
                        table.add_row(f"{icon} {escape(name)}", etype, info_str)
                    with Capture(console) as capture:  # Use Capture from rich.console
                        console.print(table)
                    output_content += capture.get()
                    if result.get("warnings"):
                        output_content += "\n[yellow]Warnings:[/yellow]\n" + "\n".join(f"- {escape(w)}" for w in result['warnings']) + "\n"
                # Handle 'tree' from directory_tree
                elif "tree" in result and "path" in result:
                    output_content += f"Directory Tree for [cyan]{escape(str(result['path']))}[/cyan]:\n"

                    # Local helper function to build the rich tree recursively
                    def build_rich_tree_display(parent_node, children):
                        for item in children:
                            name = item.get("name", "?")
                            item_type = item.get("type", "unknown")
                            info = ""
                            if "size" in item:
                                size_bytes = item['size']
                                if size_bytes < 1024:
                                    info += f" ({size_bytes}b)"
                                elif size_bytes < 1024 * 1024:
                                    info += f" ({size_bytes/1024:.1f}KB)"
                                else:
                                    info += f" ({size_bytes/(1024*1024):.1f}MB)"
                            if "target" in item:
                                info += f" → {escape(item['target'])}"
                            if "error" in item:
                                info += f" [red](Error: {escape(item['error'])})[/red]"

                            if item_type == "directory":
                                node = parent_node.add(f"📁 [bold cyan]{escape(name)}[/bold cyan]{info}")
                                if "children" in item:
                                    build_rich_tree_display(node, item["children"])
                            elif item_type == "file":
                                icon = "📄"  # Default icon
                                ext = os.path.splitext(name)[1].lower()
                                if ext in ['.jpg', '.png', '.gif', '.bmp', '.jpeg', '.svg']:
                                    icon = "🖼️"
                                elif ext in ['.mp3', '.wav', '.ogg', '.flac']:
                                    icon = "🎵"
                                elif ext in ['.mp4', '.avi', '.mov', '.mkv']:
                                    icon = "🎬"
                                elif ext in ['.py', '.js', '.java', '.c', '.cpp', '.go', '.rs']:
                                    icon = "📜"
                                elif ext in ['.json', '.xml', '.yaml', '.yml']:
                                    icon = "📋"
                                elif ext in ['.zip', '.tar', '.gz', '.7z', '.rar']:
                                    icon = "📦"
                                elif ext in ['.md', '.txt', '.doc', '.docx', '.pdf']:
                                    icon = "📝"
                                parent_node.add(f"{icon} [green]{escape(name)}[/green]{info}")
                            elif item_type == "symlink":
                                parent_node.add(f"🔗 [magenta]{escape(name)}[/magenta]{info}")
                            elif item_type == "info":
                                parent_node.add(f"ℹ️ [dim]{escape(name)}[/dim]")
                            elif item_type == "error":
                                parent_node.add(f"❌ [red]{escape(name)}[/red]{info}")
                            else:
                                parent_node.add(f"❓ [yellow]{escape(name)}[/yellow]{info}")

                    rich_tree_root = Tree(f"📁 [bold cyan]{escape(os.path.basename(result['path']))}[/bold cyan]")
                    build_rich_tree_display(rich_tree_root, result["tree"])
                    with Capture(console) as capture:  # Use Capture from rich.console
                        console.print(rich_tree_root)
                    output_content += capture.get()
                # Handle 'directories' from list_allowed_directories
                elif "directories" in result and "count" in result:
                    output_content += f"Allowed Directories ({result['count']}):\n"
                    output_content += "\n".join(f"- [green]{escape(d)}[/green]" for d in result['directories']) + "\n"
                # Handle 'files' from read_multiple_files
                elif "files" in result and "succeeded" in result:
                    # FIX: .get() for 'failed' — this branch only guarantees
                    # 'files' and 'succeeded' are present.
                    output_content += f"Read Results: [green]{result['succeeded']} succeeded[/green], [red]{result.get('failed', 0)} failed[/red]\n"
                    for file_res in result.get('files', []):
                        path_str = escape(str(file_res.get('path', 'N/A')))
                        if file_res.get('success'):
                            size_info = f" ({file_res.get('size', 'N/A')}b)" if 'size' in file_res else ""
                            # Use preview if available, else content snippet
                            content_display = file_res.get('preview', file_res.get('content', ''))
                            output_content += f"- [green]Success[/green]: [cyan]{path_str}[/cyan]{size_info}\n  Content: '{escape(str(content_display))}'\n"
                        else:
                            output_content += f"- [red]Failed[/red]: [cyan]{path_str}[/cyan]\n  Error: {escape(str(file_res.get('error', 'Unknown')))}\n"
                # Handle 'content' block (from read_file)
                elif "content" in result and "path" in result:  # More specific check for read_file
                    # Check if content is list of blocks (MCP format) or simple string/bytes
                    content_data = result["content"]
                    preview_content = ""
                    if isinstance(content_data, list) and content_data and "text" in content_data[0]:
                        # Assumes MCP text block format
                        preview_content = "\n".join([escape(block.get("text", "")) for block in content_data if block.get("type") == "text"])
                    elif isinstance(content_data, str):
                        # Simple string content
                        preview_content = escape(content_data[:1000] + ('...' if len(content_data) > 1000 else ''))  # Limit preview
                    elif isinstance(content_data, bytes):
                        # Handle bytes (e.g., hex preview)
                        try:
                            import binascii
                            hex_preview = binascii.hexlify(content_data[:64]).decode('ascii')  # Preview first 64 bytes
                            preview_content = f"[dim]Binary Content (Hex Preview):[/dim]\n{hex_preview}{'...' if len(content_data) > 64 else ''}"
                        except Exception:
                            preview_content = "[dim]Binary Content (Preview unavailable)[/dim]"

                    if preview_content:  # Only add if we have something to show
                        output_content += f"Content ({result.get('size', 'N/A')} bytes):\n{preview_content}\n"
                    elif 'size' in result:  # If no content preview but size exists
                        output_content += f"Size: [yellow]{result['size']}[/yellow] bytes\n"

                # Handle 'modified' from get_file_info
                elif "name" in result and "modified" in result:
                    output_content += f"File Info for [cyan]{escape(result['name'])}[/cyan]:\n"
                    info_table = Table(show_header=False, box=None)
                    info_table.add_column("Property", style="blue")
                    info_table.add_column("Value", style="yellow")
                    skip_keys = {"success", "message", "path", "name"}
                    for k, v in result.items():
                        if k not in skip_keys:
                            info_table.add_row(escape(k), pretty_repr(v))
                    with Capture(console) as capture:  # Use Capture from rich.console
                        console.print(info_table)
                    output_content += capture.get()

                # Fallback for other dictionaries
                else:
                    excluded_keys = {'content', 'tree', 'entries', 'matches', 'files', 'success', 'message'}
                    display_dict = {k: v for k, v in result.items() if k not in excluded_keys}
                    if display_dict:
                        output_content += "Result Data:\n" + pretty_repr(display_dict) + "\n"
                    elif not output_content:  # If nothing else was printed
                        output_content = "[dim](Tool executed successfully, no specific output format matched)[/dim]"

            # Handle non-dict results (should be rare)
            else:
                output_content = escape(str(result))

            console.print(Panel(
                output_content,
                title=f"Result: {tool_name} (Success)",
                border_style="green",
                subtitle=f"Duration: {duration:.3f}s"
            ))
            return {"success": True, "result": result}

    except ProtectionTriggeredError as pte:
        duration = time.monotonic() - start_time
        logger.warning(f"Protection triggered calling {tool_name}: {pte}", exc_info=True)  # Use logger from display.py
        console.print(Panel(
            f"[bold yellow]🛡️ Protection Triggered![/bold yellow]\n"
            f"Message: {escape(str(pte))}\n"
            f"Context: {pretty_repr(pte.context)}",
            title=f"Result: {tool_name} (Blocked)",
            border_style="yellow",
            subtitle=f"Duration: {duration:.3f}s"
        ))
        return {"success": False, "protection_triggered": True, "error": str(pte), "context": pte.context, "result": None}
    except (ToolInputError, ToolError) as tool_err:
        duration = time.monotonic() - start_time
        error_code = getattr(tool_err, 'error_code', 'TOOL_ERROR')
        details = getattr(tool_err, 'details', None)  # Use getattr with default None
        logger.error(f"Tool Error calling {tool_name}: {tool_err} ({error_code})", exc_info=True, extra={'details': details})  # Use logger from display.py

        error_content = f"[bold red]{type(tool_err).__name__} ({error_code})[/bold red]\n"
        error_content += f"Message: {escape(str(tool_err))}"
        if details:
            error_content += f"\nDetails:\n{pretty_repr(details)}"
        else:
            error_content += "\nDetails: N/A"

        error_content += f"\n\nFunction: [yellow]{tool_name}[/yellow]"
        error_content += f"\nArguments: [dim]{pretty_repr(args_dict)}[/dim]"  # Use pretty_repr for args

        console.print(Panel(
            error_content,
            title=f"Result: {tool_name} (Failed)",
            border_style="red",
            subtitle=f"Duration: {duration:.3f}s"
        ))
        return {"success": False, "error": str(tool_err), "details": details, "error_code": error_code, "result": None}
    except Exception as e:
        duration = time.monotonic() - start_time
        logger.critical(f"Unexpected Exception calling {tool_name}: {e}", exc_info=True)  # Use logger from display.py
        console.print(Panel(
            f"[bold red]Unexpected Error ({type(e).__name__})[/bold red]\n"
            f"{escape(str(e))}",
            title=f"Result: {tool_name} (Critical Failure)",
            border_style="red",
            subtitle=f"Duration: {duration:.3f}s"
        ))
        # Include basic args in details for unexpected errors too
        return {"success": False, "error": f"Unexpected: {str(e)}", "details": {"type": type(e).__name__, "args": args_dict}, "result": None}

# --- Async Rich Directory Tree Builder ---
# RESTORED ASYNC VERSION

async def _build_rich_directory_tree_recursive(
    path: Path, 
    tree_node: Tree, 
    depth: int, 
    max_depth: int
):
    """
    Recursive helper to build a Rich Tree using async list_directory.

    This function is a recursive helper for generating a Rich Tree representation
    of a directory structure using the async list_directory tool. It traverses the
    directory tree and adds nodes for each file, directory, and symlink encountered,
    with appropriate icons and styling based on file types.

    Args:
        path: The current directory path (Path object) to display.
        tree_node: The parent Tree node to add child nodes to.
        depth: The current recursion depth in the directory tree.
        max_depth: The maximum depth to traverse, preventing excessive recursion.

    Note:
        This function uses different icons for different file types and includes
        size information for files and target information for symlinks when available.
    """
    if depth >= max_depth:
        tree_node.add("📁 [dim]...(max depth reached)[/dim]")
        return

    # Extension -> icon lookup (replaces a long elif chain); "📄" is the default.
    icon_by_ext = {}
    for exts, icon in (
        (('.jpg', '.png', '.gif', '.bmp', '.jpeg', '.svg'), "🖼️"),
        (('.mp3', '.wav', '.ogg', '.flac'), "🎵"),
        (('.mp4', '.avi', '.mov', '.mkv'), "🎬"),
        (('.py', '.js', '.java', '.c', '.cpp', '.go', '.rs'), "📜"),
        (('.json', '.xml', '.yaml', '.yml'), "📋"),
        (('.zip', '.tar', '.gz', '.7z', '.rar'), "📦"),
        (('.md', '.txt', '.doc', '.docx', '.pdf'), "📝"),
    ):
        for ext in exts:
            icon_by_ext[ext] = icon

    def _format_size(size_bytes):
        # Human-readable size suffix; same thresholds/format as the
        # directory_tree display in safe_tool_call.
        if size_bytes < 1024:
            return f" ({size_bytes}b)"
        if size_bytes < 1024 * 1024:
            return f" ({size_bytes/1024:.1f}KB)"
        return f" ({size_bytes/(1024*1024):.1f}MB)"

    try:
        # Call the async list_directory tool
        list_result = await list_directory(path=str(path))

        # Handle potential errors from the list_directory call itself
        if isinstance(list_result, dict) and list_result.get("error"):
            error_msg = list_result.get("error", "Unknown listing error")
            tree_node.add(f"❌ [red]Error listing: {escape(error_msg)}[/red]")
            return

        # Ensure result structure is as expected
        if not isinstance(list_result, dict) or "entries" not in list_result:
            tree_node.add(f"❓ [yellow]Unexpected result format from list_directory for {escape(str(path))}[/yellow]")
            logger.warning(f"Unexpected list_directory result for {path}: {list_result}")
            return

        entries = sorted(list_result.get("entries", []), key=lambda x: x.get("name", ""))

        for item in entries:
            name = item.get("name", "?")
            item_type = item.get("type", "unknown")
            entry_error = item.get("error")
            item_path = path / name

            # Handle entry-specific errors
            if entry_error:
                tree_node.add(f"❌ [red]{escape(name)} - Error: {escape(entry_error)}[/red]")
                continue

            info = ""
            # Use size reported by list_directory
            if "size" in item and item_type == "file":
                info += _format_size(item['size'])
            # Use symlink_target reported by list_directory
            if item_type == "symlink" and "symlink_target" in item:
                info += f" → {escape(str(item['symlink_target']))}"

            if item_type == "directory":
                # Add node for directory, then recurse into it
                dir_node = tree_node.add(f"📁 [bold cyan]{escape(name)}[/bold cyan]{info}")
                await _build_rich_directory_tree_recursive(item_path, dir_node, depth + 1, max_depth)
            elif item_type == "file":
                icon = icon_by_ext.get(os.path.splitext(name)[1].lower(), "📄")
                tree_node.add(f"{icon} [green]{escape(name)}[/green]{info}")
            elif item_type == "symlink":
                tree_node.add(f"🔗 [magenta]{escape(name)}[/magenta]{info}")
            # Handle potential info/error items from list_directory (less common than directory_tree)
            elif item_type == "info":
                tree_node.add(f"ℹ️ [dim]{escape(name)}[/dim]")
            elif item_type == "error":
                tree_node.add(f"❌ [red]{escape(name)}[/red]{info}")
            else:  # Handle unknown type
                tree_node.add(f"❓ [yellow]{escape(name)}[/yellow]{info}")

    except Exception as e:
        # Catch unexpected errors during the process for this path
        logger.error(f"Unexpected error building tree for {path}: {e}", exc_info=True)
        tree_node.add(f"❌ [bold red]Failed to process: {escape(str(path))} ({type(e).__name__})[/bold red]")

async def generate_rich_directory_tree(path: Union[str, Path], max_depth: int = 3) -> Tree:
    """
    Build a `rich.Tree` visualization of a directory using async filesystem tools.

    The starting path is first validated with a `list_directory` probe; if that
    probe reports an error (missing path, not a directory, access failure), the
    returned tree contains a single error node. Otherwise the tree is populated
    recursively up to `max_depth`.

    Args:
        path: The starting directory path (string or Path object).
        max_depth: The maximum depth to traverse.

    Returns:
        A `rich.Tree` object representing the directory structure.
    """
    root_dir = Path(path)
    root_tree = Tree(f"📁 [bold cyan]{escape(root_dir.name)}[/bold cyan]")

    # Probe the root path before recursing; any failure short-circuits with an
    # error node attached to the root.
    try:
        probe = await list_directory(path=str(root_dir))
        if isinstance(probe, dict) and probe.get("error"):
            error_msg = probe['error']
            # Distinguish "bad path" errors from other access problems.
            if "Not a directory" in error_msg or "No such file or directory" in error_msg:
                root_tree.add(f"❌ [red]{escape(error_msg)}: {escape(str(root_dir))}[/red]")
            else:
                root_tree.add(f"❌ [red]Error accessing root path: {escape(error_msg)}[/red]")
            return root_tree  # Return tree containing only the error node
        # If list_directory did not report an error, assume it's a directory.
    except Exception as e:
        logger.error(f"Error during initial check for {root_dir}: {e}", exc_info=True)
        root_tree.add(f"❌ [bold red]Failed initial check: {escape(str(root_dir))} ({type(e).__name__})[/bold red]")
        return root_tree

    # Initial check passed — build the tree recursively.
    await _build_rich_directory_tree_recursive(root_dir, root_tree, depth=0, max_depth=max_depth)
    return root_tree

# --- Cost Tracking Utility ---

class CostTracker:
    """
    Tracks and aggregates API call costs and token usage across multiple LLM operations.

    Maintains a nested mapping ``{provider: {model: stats}}`` where each stats dict
    accumulates ``cost``, ``input_tokens``, ``output_tokens``, ``total_tokens``, and
    ``calls``. Cost/token data can be ingested from structured result objects
    (attribute access, e.g. CompletionResult) or from dictionaries (e.g. tool
    results), making the tracker usable across heterogeneous API response formats.

    Features:
    - Detailed tracking by provider and model
    - Support for input and output token counts
    - Optional cost limit monitoring (``exceeds_limit()``)
    - Rich console visualization of cost summaries (``display_summary()``)

    Usage example:
    ```python
    tracker = CostTracker(limit=5.0)  # Set $5.00 cost limit

    tracker.add_call(completion_result)
    tracker.add_call(summarization_result)

    if tracker.exceeds_limit():
        print("Cost limit exceeded!")

    tracker.display_summary(console)
    ```

    Attributes:
        data: Nested dictionary storing cost and token data organized by provider and model.
        limit: Optional cost limit in USD to monitor usage against.
    """

    def __init__(self, limit: Optional[float] = None):
        """
        Initialize a new cost tracker with an optional spending limit.

        Args:
            limit: Optional maximum cost limit in USD. If provided, the tracker
                  can report when costs exceed this threshold using exceeds_limit().
        """
        # {provider: {model: {cost, input_tokens, output_tokens, total_tokens, calls}}}
        self.data: Dict[str, Dict[str, Dict[str, Any]]] = {}
        self.limit: Optional[float] = limit  # Cost limit in USD

    def _ensure_entry(self, provider: str, model: str) -> Dict[str, Any]:
        """
        Return the mutable stats dict for (provider, model), creating it if absent.

        Consolidates the per-model stats initialization that was previously
        duplicated across add_call(), record_call(), and add_custom_cost().
        """
        return self.data.setdefault(provider, {}).setdefault(
            model,
            {
                "cost": 0.0,
                "input_tokens": 0,
                "output_tokens": 0,
                "total_tokens": 0,
                "calls": 0,
            },
        )

    def _accumulate(
        self,
        provider: str,
        model: str,
        cost: float,
        input_tokens: int,
        output_tokens: int,
        total_tokens: int,
    ) -> None:
        """Fold one call's cost/token figures into the (provider, model) stats entry."""
        stats = self._ensure_entry(provider, model)
        stats["cost"] += cost
        stats["input_tokens"] += input_tokens
        stats["output_tokens"] += output_tokens
        stats["total_tokens"] += total_tokens
        stats["calls"] += 1

    @property
    def total_cost(self) -> float:
        """
        Get the total accumulated cost across all providers and models.

        Returns:
            float: The sum of all tracked costs in USD.
        """
        return sum(
            model_data.get("cost", 0.0)
            for provider_data in self.data.values()
            for model_data in provider_data.values()
        )

    def exceeds_limit(self) -> bool:
        """
        Check if the current total cost exceeds the specified limit.

        Returns:
            bool: True if a limit is set and the total cost meets or exceeds it,
                  False otherwise (including when no limit is set).
        """
        return self.limit is not None and self.total_cost >= self.limit

    def add_call(self, result: Any, provider: Optional[str] = None, model: Optional[str] = None):
        """
        Add cost and token data from an API call result to the tracker.

        Extracts cost and token information from either object attributes (e.g.
        CompletionResult) or dictionary keys (e.g. tool results). A ``None`` cost
        value in a dict result is treated as 0.0 (previously this raised
        TypeError from ``float(None)``). Dict results may carry token counts
        either nested under ``"tokens"`` (``input``/``output``/``total``) or as
        flat top-level ``"input_tokens"``/``"output_tokens"`` keys.

        Args:
            result: The API call result containing cost and token information.
                   Can be an object with attributes or a dictionary.
            provider: Optional provider name override. If not specified, extracted
                     from the result when available.
            model: Optional model name override. If not specified, extracted
                  from the result when available.

        Example:
            ```python
            summarization_result = await summarize_document(...)
            tracker.add_call(summarization_result)

            tracker.add_call(custom_result, provider="openai", model="gpt-4o")
            ```
        """
        cost = 0.0
        input_tokens = 0
        output_tokens = 0
        total_tokens = 0

        if isinstance(result, dict):
            # Dictionary-shaped results (tool outputs, stats dicts).
            raw_cost = result.get("cost")
            cost = float(raw_cost) if raw_cost is not None else 0.0  # Guard explicit None
            provider = result.get("provider") or provider  # Fall back to caller override
            model = result.get("model") or model
            tokens_data = result.get("tokens", {})
            if isinstance(tokens_data, dict):
                input_tokens = int(tokens_data.get("input", 0))
                output_tokens = int(tokens_data.get("output", 0))
                total_tokens = int(tokens_data.get("total", 0))
            elif isinstance(tokens_data, (int, float)):  # 'tokens' may be a bare total
                total_tokens = int(tokens_data)
            # Fallback: honor flat top-level token keys when nothing nested was found.
            if input_tokens == 0 and output_tokens == 0:
                input_tokens = int(result.get("input_tokens", 0) or 0)
                output_tokens = int(result.get("output_tokens", 0) or 0)
        else:
            # Object-shaped results (attribute access, e.g. CompletionResult).
            if getattr(result, "cost", None) is not None:
                cost = float(result.cost)
            if getattr(result, "provider", None):
                provider = result.provider
            if getattr(result, "model", None):
                model = result.model
            if getattr(result, "input_tokens", None) is not None:
                input_tokens = int(result.input_tokens)
            if getattr(result, "output_tokens", None) is not None:
                output_tokens = int(result.output_tokens)
            if getattr(result, "total_tokens", None) is not None:
                total_tokens = int(result.total_tokens)

        if total_tokens == 0 and (input_tokens > 0 or output_tokens > 0):
            total_tokens = input_tokens + output_tokens  # Derive total when not provided

        # Defaults when provider/model could not be determined from result or overrides.
        provider = provider or "UnknownProvider"
        model = model or "UnknownModel"

        self._accumulate(provider, model, cost, input_tokens, output_tokens, total_tokens)

    def record_call(self, provider: str, model: str, input_tokens: int, output_tokens: int, cost: float):
        """
        Directly record a call with explicit token counts and cost.

        Allows manual tracking of API calls with explicit parameter values, useful
        when the cost information isn't available in a structured result object.

        Args:
            provider: The provider name (e.g., "openai", "anthropic")
            model: The model name (e.g., "gpt-4", "claude-3-opus")
            input_tokens: Number of input (prompt) tokens
            output_tokens: Number of output (completion) tokens
            cost: The cost of the API call in USD

        Example:
            ```python
            tracker.record_call(
                provider="openai",
                model="gpt-4o",
                input_tokens=1500,
                output_tokens=350,
                cost=0.03
            )
            ```
        """
        self._accumulate(
            provider, model, cost, input_tokens, output_tokens, input_tokens + output_tokens
        )

    def add_custom_cost(self, description: str, provider: str, model: str, cost: float,
                        input_tokens: int = 0, output_tokens: int = 0):
        """
        Add a custom cost entry with an optional description.

        Useful for tracking costs that aren't directly tied to a specific API call,
        such as batch processing fees, infrastructure costs, or estimated costs.
        The entry is stored under a synthetic model name ``"<model> (<description>)"``.

        Args:
            description: A descriptive label for this cost entry (e.g., "Batch Processing")
            provider: The provider name or service category
            model: The model name or service type
            cost: The cost amount in USD
            input_tokens: Optional input token count (default: 0)
            output_tokens: Optional output token count (default: 0)

        Example:
            ```python
            tracker.add_custom_cost(
                "Batch Processing", "openai", "gpt-4-turbo", 0.25,
                input_tokens=5000, output_tokens=1200
            )
            ```
        """
        # Fold the description into the model name so it shows up in summaries.
        custom_model = f"{model} ({description})"
        self._accumulate(
            provider, custom_model, cost, input_tokens, output_tokens,
            input_tokens + output_tokens,
        )

    def display_summary(self, console_instance: Optional["Console"] = None, title: str = "Total Demo Cost Summary"):
        """
        Display a formatted summary of all tracked costs and token usage in a Rich console table.

        The report shows costs broken down by provider and model, call counts,
        input/output/total token counts, provider subtotals (when a provider has
        more than one model), grand totals in the table footer, and progress
        against the cost limit if one is set.

        Args:
            console_instance: Optional Rich Console instance to use for display.
                             If not provided, uses the default module-level console.
            title: Custom title for the summary report.
                  Defaults to "Total Demo Cost Summary".
        """
        output = console_instance or console  # Use provided or default console

        output.print(Rule(f"[bold blue]{escape(title)}[/bold blue]"))

        if not self.data:
            output.print("[yellow]No cost data tracked.[/yellow]")
            return

        summary_table = Table(
            title="[bold]API Call Costs & Tokens[/bold]",
            box=box.ROUNDED,
            show_footer=True,
            footer_style="bold"
        )
        summary_table.add_column("Provider", style="cyan", footer="Grand Total")
        summary_table.add_column("Model", style="magenta")
        summary_table.add_column("Calls", style="blue", justify="right", footer=" ")  # Placeholder footer
        summary_table.add_column("Input Tokens", style="yellow", justify="right", footer=" ")
        summary_table.add_column("Output Tokens", style="yellow", justify="right", footer=" ")
        summary_table.add_column("Total Tokens", style="bold yellow", justify="right", footer=" ")
        summary_table.add_column("Total Cost ($)", style="bold green", justify="right", footer=" ")

        grand_total_cost = 0.0
        grand_total_calls = 0
        grand_total_input = 0
        grand_total_output = 0
        grand_total_tokens = 0

        sorted_providers = sorted(self.data.keys())
        for provider in sorted_providers:
            provider_total_cost = 0.0
            provider_total_calls = 0
            provider_total_input = 0
            provider_total_output = 0
            provider_total_tokens = 0

            sorted_models = sorted(self.data[provider].keys())
            num_models = len(sorted_models)

            for i, model in enumerate(sorted_models):
                stats = self.data[provider][model]
                provider_total_cost += stats['cost']
                provider_total_calls += stats['calls']
                provider_total_input += stats['input_tokens']
                provider_total_output += stats['output_tokens']
                provider_total_tokens += stats['total_tokens']

                # Display provider only on the first row for that provider
                provider_display = escape(provider) if i == 0 else ""

                summary_table.add_row(
                    provider_display,
                    escape(model),
                    str(stats['calls']),
                    f"{stats['input_tokens']:,}",
                    f"{stats['output_tokens']:,}",
                    f"{stats['total_tokens']:,}",
                    f"{stats['cost']:.6f}"
                )

            # Add provider subtotal row if more than one model for the provider
            if num_models > 1:
                summary_table.add_row(
                    "[dim]Subtotal[/dim]",
                    f"[dim]{provider}[/dim]",
                    f"[dim]{provider_total_calls:,}[/dim]",
                    f"[dim]{provider_total_input:,}[/dim]",
                    f"[dim]{provider_total_output:,}[/dim]",
                    f"[dim]{provider_total_tokens:,}[/dim]",
                    f"[dim]{provider_total_cost:.6f}[/dim]",
                    style="dim",
                    end_section=(provider != sorted_providers[-1])  # Separator unless last provider
                )
            elif provider != sorted_providers[-1]:
                # Add separator if only one model but not the last provider
                summary_table.add_row(end_section=True)

            grand_total_cost += provider_total_cost
            grand_total_calls += provider_total_calls
            grand_total_input += provider_total_input
            grand_total_output += provider_total_output
            grand_total_tokens += provider_total_tokens

        # Update footer values in place (rich exposes columns as a list)
        summary_table.columns[2].footer = f"{grand_total_calls:,}"
        summary_table.columns[3].footer = f"{grand_total_input:,}"
        summary_table.columns[4].footer = f"{grand_total_output:,}"
        summary_table.columns[5].footer = f"{grand_total_tokens:,}"
        summary_table.columns[6].footer = f"{grand_total_cost:.6f}"

        output.print(summary_table)

        # Display cost limit information if set
        if self.limit is not None:
            limit_color = "green" if self.total_cost < self.limit else "red"
            output.print(f"[{limit_color}]Cost limit: ${self.limit:.2f} | Current usage: ${self.total_cost:.2f} ({(self.total_cost/self.limit*100):.1f}%)[/{limit_color}]")

        output.print()  # Add a blank line after the table

    def display_costs(self, console: Optional["Console"] = None, title: str = "Total Demo Cost Summary"):
        """
        Alias for display_summary for backward compatibility.

        Provides a backward-compatible interface for legacy code that calls
        display_costs() instead of display_summary().

        Args:
            console: Console instance to use for display
            title: Custom title for the summary report

        Returns:
            Same as display_summary()
        """
        return self.display_summary(console_instance=console, title=title)
```

--------------------------------------------------------------------------------
/ultimate_mcp_server/tools/local_text_tools.py:
--------------------------------------------------------------------------------

```python
# ultimate_mcp_server/tools/local_text_tools.py
"""
Standalone, secure wrappers around local CLI text-processing utilities (rg, awk, sed, jq)
for the Ultimate MCP Server framework.

This module provides controlled execution of common command-line text tools within
a defined workspace, incorporating enhanced security checks, resource limits, performance
optimizations like caching and concurrency control, and robust error handling.

Key Features:
*   Standalone Functions: Tools exposed as individual async functions.
*   Workspace Confinement: All file/directory operations strictly enforced within WORKSPACE_DIR.
*   Security Hardening: Validates arguments against shell metacharacters, subshells,
    redirection, path traversal, and specific unsafe flags (e.g., `sed -i`). Uses `prctl`
    and `setsid` on Linux for further sandboxing.
*   Resource Limiting: Applies CPU time and memory limits (Unix only).
*   Input Flexibility: Handles input via stdin (`input_data`) or file/directory targets
    specified within the command arguments (`args_str`). Stdin size is capped.
*   Standardized Output: Returns consistent `ToolResult` TypedDict with stdout, stderr,
    exit_code, success status, timing, and truncation info. Output is truncated.
*   Command Integrity: Checks command availability and checksums (lazily, with re-verification).
    Enforces minimum versions.
*   Performance: Includes per-tool concurrency limits and optional disk-based caching
    of identical command invocations.
*   LLM-Friendly: Detailed docstrings, structured errors with codes, optional streaming modes,
    and a `dry_run` option enhance usability for AI agents.
"""

import asyncio
import hashlib
import json
import os
import random
import re
import shlex
import shutil
import sys
import textwrap
import time
from dataclasses import (
    dataclass,
)  # Keep field for potential future use, though not used for update now
from enum import Enum
from pathlib import Path
from typing import (
    AsyncIterator,
    Dict,
    List,
    Optional,
    Sequence,
    Tuple,
    TypedDict,
    cast,
)

import aiofiles  # Needed for async checksum

from ultimate_mcp_server.exceptions import ToolExecutionError, ToolInputError
from ultimate_mcp_server.tools.base import with_error_handling, with_tool_metrics
from ultimate_mcp_server.utils import get_logger

logger = get_logger("ultimate_mcp_server.tools.local_text")

# Conditional import for resource limiting and sandboxing
try:
    import resource  # type: ignore [import-not-found]

    HAS_RESOURCE = True
except ImportError:
    HAS_RESOURCE = False
    logger.debug("`resource` module not found (likely non-Unix). Resource limits disabled.")

try:
    import prctl  # type: ignore [import-not-found]

    HAS_PRCTL = True
except ImportError:
    HAS_PRCTL = False
    logger.debug("`prctl` module not found (likely non-Linux). Advanced sandboxing disabled.")

# --------------------------------------------------------------------------- #
# Configuration (Loaded from Environment or Defaults)
# --------------------------------------------------------------------------- #

#: Maximum bytes returned in stdout / stderr before truncation
MAX_OUTPUT_BYTES = int(os.getenv("MCP_TEXT_MAX_OUTPUT", "1_000_000"))  # 1 MiB default
#: Maximum bytes accepted via stdin (`input_data`)
MAX_INPUT_BYTES = int(os.getenv("MCP_TEXT_MAX_INPUT", "25_000_000"))  # 25 MiB default
#: Maximum seconds a command may run before being terminated
DEFAULT_TIMEOUT = float(os.getenv("MCP_TEXT_TIMEOUT", "30"))
#: Workspace root – **all file/directory arguments must resolve inside this tree**
# Resolution is fail-safe: any problem with MCP_TEXT_WORKSPACE falls back to the
# current directory so module import never aborts on a bad value.
try:
    WORKSPACE_DIR_STR = os.getenv("MCP_TEXT_WORKSPACE", ".")
    WORKSPACE_DIR = Path(WORKSPACE_DIR_STR).resolve()
    if not WORKSPACE_DIR.is_dir():
        logger.warning(
            f"MCP_TEXT_WORKSPACE ('{WORKSPACE_DIR_STR}' -> '{WORKSPACE_DIR}') is not a directory. Defaulting to current."
        )
        WORKSPACE_DIR = Path(".").resolve()
except Exception as e:
    logger.error(
        f"Error resolving MCP_TEXT_WORKSPACE ('{WORKSPACE_DIR_STR}'): {e}. Defaulting to current."
    )
    WORKSPACE_DIR = Path(".").resolve()
logger.info(f"LocalTextTools workspace confined to: {WORKSPACE_DIR}")

#: Disk cache directory for command results
CACHE_DIR_STR = os.getenv("MCP_TEXT_CACHE_DIR", "~/.cache/ultimate_mcp_server/local_text_tools")
CACHE_DIR = Path(CACHE_DIR_STR).expanduser().resolve()
CACHE_ENABLED = os.getenv("MCP_TEXT_CACHE_ENABLED", "true").lower() == "true"
CACHE_MAX_SIZE_MB = int(os.getenv("MCP_TEXT_CACHE_MAX_MB", "500"))
CACHE_MAX_AGE_DAYS = int(os.getenv("MCP_TEXT_CACHE_MAX_AGE_DAYS", "7"))

# Create the cache directory eagerly at import time; if that fails, disable
# caching entirely rather than failing on every later cache access.
if CACHE_ENABLED:
    try:
        CACHE_DIR.mkdir(parents=True, exist_ok=True)
        logger.info(f"Using command invocation cache directory: {CACHE_DIR}")
    except OSError as e:
        logger.error(
            f"Failed to create command cache directory {CACHE_DIR}: {e}. Caching disabled."
        )
        CACHE_ENABLED = False  # Disable cache if directory fails

# Concurrency limits per command (max simultaneous subprocesses of each tool);
# each is overridable via its MCP_TEXT_CONCURRENCY_* environment variable.
DEFAULT_CONCURRENCY = 4
CONCURRENCY_LIMITS = {
    "rg": int(os.getenv("MCP_TEXT_CONCURRENCY_RG", "8")),
    "awk": int(os.getenv("MCP_TEXT_CONCURRENCY_AWK", str(DEFAULT_CONCURRENCY))),
    "sed": int(os.getenv("MCP_TEXT_CONCURRENCY_SED", str(DEFAULT_CONCURRENCY))),
    "jq": int(os.getenv("MCP_TEXT_CONCURRENCY_JQ", str(DEFAULT_CONCURRENCY))),
}

# Forbidden shell metacharacters (pre-compiled set for efficiency)
_FORBIDDEN_CHARS_SET = frozenset("&;`|><$()")


# --------------------------------------------------------------------------- #
# Error Codes Enum
# --------------------------------------------------------------------------- #
class ToolErrorCode(str, Enum):
    """Machine-readable error codes for local text tools.

    Subclasses ``str`` so the codes serialize directly (e.g. into JSON tool
    results) without needing ``.value`` access.
    """

    PATH_TRAVERSAL = "PATH_TRAVERSAL"  # Argument appears to escape the workspace (e.g. '..')
    ABS_PATH_FORBIDDEN = "ABS_PATH_FORBIDDEN"  # Absolute paths rejected
    WORKSPACE_VIOLATION = "WORKSPACE_VIOLATION"  # Resolved path outside WORKSPACE_DIR
    FORBIDDEN_FLAG = "FORBIDDEN_FLAG"  # Disallowed flag (e.g. `sed -i`)
    FORBIDDEN_CHAR = "FORBIDDEN_CHAR"  # Shell metacharacter found in arguments
    CMD_SUBSTITUTION = "CMD_SUBSTITUTION"  # Often related to $() or ``
    INVALID_ARGS = "INVALID_ARGS"  # Arguments failed parsing/validation
    INPUT_TOO_LARGE = "INPUT_TOO_LARGE"  # stdin exceeded the configured size cap
    INVALID_JSON_INPUT = "INVALID_JSON_INPUT"  # Input was not valid JSON
    CMD_NOT_FOUND = "CMD_NOT_FOUND"  # Executable not found on PATH
    TIMEOUT = "TIMEOUT"  # Command exceeded its time budget
    EXEC_ERROR = "EXEC_ERROR"  # Process failed to start or crashed
    COMMUNICATION_ERROR = "COMMUNICATION_ERROR"  # Failure communicating with the subprocess
    CHECKSUM_MISMATCH = "CHECKSUM_MISMATCH"  # Executable changed since verification
    VERSION_TOO_OLD = "VERSION_TOO_OLD"  # Tool older than the required minimum version
    UNEXPECTED_FAILURE = "UNEXPECTED_FAILURE"  # Catch-all for unclassified errors
    CACHE_ERROR = "CACHE_ERROR"  # Problem reading/writing the result cache


# --------------------------------------------------------------------------- #
# Result Schema (TypedDict)
# --------------------------------------------------------------------------- #
class ToolResult(TypedDict, total=False):
    """Standardized result structure for local text tool executions.

    ``total=False`` makes every key optional: producers populate only the
    fields relevant to the outcome (e.g. ``error``/``error_code`` on failure,
    ``dry_run_cmdline`` only when ``dry_run`` was requested).
    """

    stdout: Optional[str]  # Decoded standard output (potentially truncated)
    stderr: Optional[str]  # Decoded standard error (potentially truncated)
    exit_code: Optional[int]  # Process exit code
    success: bool  # True if execution considered successful (depends on tool/retcode)
    error: Optional[str]  # Human-readable error message if success is False
    error_code: Optional[ToolErrorCode]  # Machine-readable error code if success is False
    duration: float  # Execution duration in seconds
    stdout_truncated: bool  # True if stdout was truncated
    stderr_truncated: bool  # True if stderr was truncated
    cached_result: bool  # True if this result was served from cache
    dry_run_cmdline: Optional[List[str]]  # Populated only if dry_run=True


# --------------------------------------------------------------------------- #
# Command Metadata & Discovery
# --------------------------------------------------------------------------- #


@dataclass(slots=True, frozen=True)  # Make truly immutable
class CommandMeta:
    """Metadata for a command-line tool.

    Frozen so instances can be shared safely across coroutines; dynamic fields
    (checksum, mtime, version) are updated by replacing the whole entry in
    the module-level metadata dict rather than by in-place mutation.
    """

    name: str  # Command name as invoked (e.g. "rg")
    path: Optional[Path] = None  # Store the resolved absolute path
    checksum: Optional[str] = None  # SHA-256 checksum of the executable (calculated lazily)
    mtime: Optional[float] = None  # Last modification time (for checksum re-verification)
    version: Optional[tuple[int, ...]] = None  # Parsed version tuple (e.g., (13, 0, 1))
    forbidden_flags: frozenset[str] = frozenset()  # Flags disallowed for security
    readonly: bool = True  # True if the command should not modify the filesystem
    min_version: Optional[tuple[int, ...]] = None  # Minimum required version tuple


# Store CommandMeta objects, keyed by command name
_COMMAND_METADATA: Dict[str, CommandMeta] = {
    "rg": CommandMeta("rg", min_version=(13, 0, 0)),  # Example: Require ripgrep >= 13.0.0
    "awk": CommandMeta(
        "awk", forbidden_flags=frozenset({"-i", "--in-place"})
    ),  # AWK in-place is less common but exists (gawk)
    "sed": CommandMeta("sed", forbidden_flags=frozenset({"-i", "--in-place"})),
    "jq": CommandMeta("jq", min_version=(1, 6)),  # Example: Require jq >= 1.6
}
_COMMAND_VERSIONS_CACHE: Dict[str, Optional[tuple[int, ...]]] = {}  # Cache parsed versions
_checksum_lock = asyncio.Lock()  # Lock for lazy checksum calculation
_version_lock = asyncio.Lock()  # Lock for lazy version checking


async def _calculate_sha256sum_async(path: Path, chunk: int = 262_144) -> str:
    """Asynchronously calculates SHA256 checksum using aiofiles."""
    h = hashlib.sha256()
    try:
        async with aiofiles.open(path, "rb") as fh:
            while True:
                blk = await fh.read(chunk)
                if not blk:
                    break
                h.update(blk)
        return h.hexdigest()
    except OSError as e:
        logger.error(f"Failed to calculate SHA256 for {path}: {e}")
        return "error_calculating_checksum"


async def _get_command_checksum(meta: CommandMeta) -> Optional[str]:
    """Lazily calculates and caches the command checksum, verifying mtime.

    The executable's SHA-256 is computed on first use and cached on the
    CommandMeta entry along with the file's mtime. On later calls the mtime is
    re-checked; if it changed (or stat fails), the checksum is recalculated.
    Because CommandMeta is frozen, an updated copy replaces the whole entry in
    ``_COMMAND_METADATA`` instead of mutating in place.

    Args:
        meta: Command metadata; must have ``path`` set for a checksum to exist.

    Returns:
        The hex checksum string, or None if the command path is unknown.
        (On read failure the sentinel from ``_calculate_sha256sum_async`` is
        stored and returned.)
    """
    global _COMMAND_METADATA  # Allow modification (replacing entry)
    if not meta.path:
        return None  # Cannot checksum if path unknown

    async with _checksum_lock:
        # Re-fetch meta inside lock in case it was updated by another coroutine
        meta_locked = _COMMAND_METADATA.get(meta.name)
        if not meta_locked or not meta_locked.path:
            return None

        needs_recalc = False
        current_mtime = None
        try:
            # Use asyncio.to_thread for potentially blocking stat
            stat_res = await asyncio.to_thread(meta_locked.path.stat)
            current_mtime = stat_res.st_mtime
            if (
                meta_locked.checksum is None
                or meta_locked.mtime is None
                or current_mtime != meta_locked.mtime
            ):
                needs_recalc = True
        except OSError as e:
            logger.warning(
                f"Could not stat {meta_locked.path} for checksum verification: {e}. Recalculating."
            )
            needs_recalc = True  # Force recalc if stat fails

        if needs_recalc:
            logger.debug(
                f"Calculating checksum for {meta.name} (mtime changed or first calculation)..."
            )
            new_checksum = await _calculate_sha256sum_async(meta_locked.path)
            # CommandMeta is frozen: publish an updated copy rather than mutating.
            _COMMAND_METADATA[meta.name] = CommandMeta(
                name=meta_locked.name,
                path=meta_locked.path,
                checksum=new_checksum,
                mtime=current_mtime,  # Store the mtime when checksum was calculated
                forbidden_flags=meta_locked.forbidden_flags,
                readonly=meta_locked.readonly,
                min_version=meta_locked.min_version,
            )
            logger.debug(f"Updated checksum for {meta.name}: {new_checksum[:12]}...")
            return new_checksum
        else:
            # Return cached checksum
            return meta_locked.checksum


async def _parse_version(cmd_path: Path) -> Optional[tuple[int, ...]]:
    """Runs '<tool> --version' and parses semantic version tuple."""
    try:
        proc = await asyncio.create_subprocess_exec(
            str(cmd_path),
            "--version",
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout_b, stderr_b = await asyncio.wait_for(proc.communicate(), timeout=5.0)
        if proc.returncode != 0:
            logger.warning(
                f"Command '{cmd_path.name} --version' failed with code {proc.returncode}: {stderr_b.decode(errors='ignore')[:100]}"
            )
            return None

        output = stdout_b.decode(errors="ignore").strip()
        # Common version patterns (adjust as needed for specific tools)
        match = re.search(r"(\d+)\.(\d+)(?:\.(\d+))?", output)
        if match:
            major = int(match.group(1))
            minor = int(match.group(2))
            patch = int(match.group(3) or 0)  # Default patch to 0
            return (major, minor, patch)
        else:
            logger.warning(f"Could not parse version from '{cmd_path.name}' output: {output[:100]}")
            return None
    except asyncio.TimeoutError:
        logger.warning(f"Timeout getting version for '{cmd_path.name}'.")
        return None
    except Exception as e:
        logger.error(f"Error getting version for '{cmd_path.name}': {e}")
        return None


async def _check_command_version(meta: CommandMeta) -> None:
    """
    Verify that a command satisfies its minimum version requirement.

    The parsed version is cached in ``_COMMAND_VERSIONS_CACHE`` (a failed parse
    is cached as ``None``) so each tool is probed at most once. If the version
    cannot be determined, the check is skipped with a warning rather than
    blocking execution.

    Args:
        meta: Command metadata; the check is a no-op when ``meta.path`` or
              ``meta.min_version`` is unset.

    Raises:
        ToolExecutionError: With code ``VERSION_TOO_OLD`` when the detected
            version is older than ``meta.min_version``.
    """
    global _COMMAND_VERSIONS_CACHE  # Allow modification
    if not meta.path or not meta.min_version:
        return  # Skip check if no path or no minimum defined

    async with _version_lock:
        # Check cache first
        if meta.name in _COMMAND_VERSIONS_CACHE:
            actual_version = _COMMAND_VERSIONS_CACHE[meta.name]
        else:
            # Parse version if not cached
            actual_version = await _parse_version(meta.path)
            _COMMAND_VERSIONS_CACHE[meta.name] = actual_version  # Cache result (even None)

    if actual_version is None:
        logger.warning(
            f"Could not determine version for '{meta.name}'. Skipping minimum version check."
        )
        return

    if actual_version < meta.min_version:
        actual_str = ".".join(map(str, actual_version))
        required_str = ".".join(map(str, meta.min_version))
        raise ToolExecutionError(
            f"Command '{meta.name}' version ({actual_str}) is older than required minimum ({required_str}). Please update.",
            error_code=ToolErrorCode.VERSION_TOO_OLD,
            details={"required": required_str, "actual": actual_str},
        )
    else:
        # BUGFIX: was ''.join(...), which rendered (13, 0, 1) as "1301" in the log.
        logger.debug(
            f"Version check passed for {meta.name} (found {'.'.join(map(str, actual_version))}, required >= {'.'.join(map(str, meta.min_version))})"
        )


def _initial_command_discovery() -> None:
    """Locate each registered command on PATH and record its resolved path.

    Rebuilds the global ``_COMMAND_METADATA`` mapping in place: commands that
    are found get their resolved path and mtime recorded (checksum and version
    stay lazy), while missing ones keep their static configuration with all
    dynamic fields cleared. A single warning lists any missing commands.
    """
    global _COMMAND_METADATA  # Replaced wholesale below
    not_found: list[str] = []
    rebuilt = {}

    for name, meta in _COMMAND_METADATA.items():
        located = shutil.which(name)
        if located is None:
            not_found.append(name)
            # Preserve static config but clear any dynamic fields.
            rebuilt[name] = CommandMeta(
                name=meta.name,
                path=None,
                checksum=None,
                mtime=None,
                version=None,
                forbidden_flags=meta.forbidden_flags,
                readonly=meta.readonly,
                min_version=meta.min_version,
            )
            continue

        full_path = Path(located).resolve()
        mtime_value = None
        try:
            # Synchronous stat is acceptable here; this runs once at import time.
            mtime_value = full_path.stat().st_mtime
        except OSError as e:
            logger.warning(f"Could not stat {full_path} during initial discovery: {e}")

        rebuilt[name] = CommandMeta(
            name=meta.name,
            path=full_path,
            checksum=None,  # Computed lazily later
            mtime=mtime_value,
            version=None,  # Parsed lazily later
            forbidden_flags=meta.forbidden_flags,
            readonly=meta.readonly,
            min_version=meta.min_version,
        )
        logger.debug(f"{name}: found at {full_path}")

    _COMMAND_METADATA = rebuilt

    if not_found:
        logger.warning(
            "Missing local text CLI tools: %s. Corresponding functions will fail.",
            ", ".join(not_found),
        )


# Run discovery once at module load so resolved command paths are available
# immediately; checksum/version probing remains lazy and happens on first use.
_initial_command_discovery()


# --------------------------------------------------------------------------- #
# Argument Validation Placeholder
# --------------------------------------------------------------------------- #


def _validate_arguments(cmd_name: str, argv: List[str]) -> None:
    """
    Validates command arguments for security and workspace compliance.
    (Placeholder implementation - refine based on specific security needs)

    Checks applied to each argument, in order:
      * forbidden flags declared in the command's metadata,
      * forbidden shell metacharacters, with per-command relaxations for
        rg, jq, awk and sed (their query/script languages legitimately use
        some of the characters blocked for other commands),
      * backtick / "$(" command-substitution patterns,
      * path safety for arguments that look like paths: absolute paths and
        anything normalising or resolving outside WORKSPACE_DIR is rejected.

    Args:
        cmd_name: Name of the command whose metadata drives validation.
        argv: Tokenized argument list to validate.

    Raises:
        ToolInputError: On the first violating argument; details["error_code"]
            identifies the specific violation.
    """
    meta = _COMMAND_METADATA.get(cmd_name)
    if not meta:
        # This should generally not happen if called after discovery
        raise ToolInputError(
            f"Metadata not found for command '{cmd_name}' during validation.",
            param_name="cmd_name",
            details={"unexpected_failure": True},
        )

    forbidden_flags = meta.forbidden_flags

    for i, arg in enumerate(argv):
        # Check for forbidden flags
        if arg in forbidden_flags:
            raise ToolInputError(
                f"Forbidden flag '{arg}' is not allowed for command '{cmd_name}'.",
                param_name="args_str",
                details={
                    "argument": arg,
                    "index": i,
                    "command": cmd_name,
                    "error_code": ToolErrorCode.FORBIDDEN_FLAG.value,
                },
            )

        # Command-specific validation rules
        if cmd_name == "rg":
            # For rg, allow regex metacharacters ( (), |, ?, +, *, { }, [, ], ^, $, . )
            forbidden_chars = {
                char for char in arg if char in _FORBIDDEN_CHARS_SET and char not in "()|"
            }
            if forbidden_chars:
                raise ToolInputError(
                    f"Argument '{arg}' contains forbidden shell metacharacter(s): {', '.join(sorted(forbidden_chars))}",
                    param_name="args_str",
                    details={
                        "argument": arg,
                        "index": i,
                        "forbidden_chars": sorted(list(forbidden_chars)),
                        "error_code": ToolErrorCode.FORBIDDEN_CHAR.value,
                    },
                )

            # Basic check for command substitution patterns (can be complex)
            if "`" in arg or "$(" in arg:
                raise ToolInputError(
                    f"Argument '{arg}' seems to contain command substitution, which is forbidden.",
                    param_name="args_str",
                    details={
                        "argument": arg,
                        "index": i,
                        "error_code": ToolErrorCode.CMD_SUBSTITUTION.value,
                    },
                )

            continue  # Skip further checks for rg

        elif cmd_name == "jq":
            # For jq, allow (, ), |, > and < characters as they're essential for the query language
            forbidden_chars = {
                char for char in arg if char in _FORBIDDEN_CHARS_SET and char not in "()|}><"
            }
            if forbidden_chars:
                raise ToolInputError(
                    f"Argument '{arg}' contains forbidden shell metacharacter(s): {', '.join(sorted(forbidden_chars))}",
                    param_name="args_str",
                    details={
                        "argument": arg,
                        "index": i,
                        "forbidden_chars": sorted(list(forbidden_chars)),
                        "error_code": ToolErrorCode.FORBIDDEN_CHAR.value,
                    },
                )

            # Basic check for command substitution patterns (can be complex)
            if "`" in arg or "$(" in arg:
                raise ToolInputError(
                    f"Argument '{arg}' seems to contain command substitution, which is forbidden.",
                    param_name="args_str",
                    details={
                        "argument": arg,
                        "index": i,
                        "error_code": ToolErrorCode.CMD_SUBSTITUTION.value,
                    },
                )

            continue  # Skip further checks for jq

        elif cmd_name == "awk":
            # For awk, allow $ character (for field references), / (for regex patterns),
            # and other characters needed for awk scripts like (, ), ;
            # Allow '>' and '<' for comparisons in AWK
            forbidden_chars = {
                char for char in arg if char in _FORBIDDEN_CHARS_SET and char not in "$/();{}<>"
            }

            # Special check for file redirection (> followed by a string in quotes)
            # Pattern detects constructs like: print $1 > "file.txt" or print $1 > 'file.txt'
            if re.search(r'>\s*["\']', arg) or re.search(r'print.*>\s*["\']', arg):
                raise ToolInputError(
                    f"Argument '{arg}' appears to contain file redirection, which is forbidden.",
                    param_name="args_str",
                    details={
                        "argument": arg,
                        "index": i,
                        "error_code": ToolErrorCode.FORBIDDEN_CHAR.value,
                    },
                )

            if forbidden_chars:
                raise ToolInputError(
                    f"Argument '{arg}' contains forbidden shell metacharacter(s): {', '.join(sorted(forbidden_chars))}",
                    param_name="args_str",
                    details={
                        "argument": arg,
                        "index": i,
                        "forbidden_chars": sorted(list(forbidden_chars)),
                        "error_code": ToolErrorCode.FORBIDDEN_CHAR.value,
                    },
                )

            # Basic check for command substitution patterns (can be complex)
            if "`" in arg or "$(" in arg:
                raise ToolInputError(
                    f"Argument '{arg}' seems to contain command substitution, which is forbidden.",
                    param_name="args_str",
                    details={
                        "argument": arg,
                        "index": i,
                        "error_code": ToolErrorCode.CMD_SUBSTITUTION.value,
                    },
                )

            # Don't treat awk patterns like /pattern/ as absolute paths
            if arg.startswith("/") and not (arg.count("/") >= 2 and arg[1:].find("/") > 0):
                # Still check for absolute paths that aren't regex patterns
                # A regex pattern would typically have at least one more / after the first character
                try:
                    # Resolve the path relative to the workspace *without* accessing filesystem yet
                    # Use os.path.normpath and os.path.join for basic checks before full resolve
                    norm_path = os.path.normpath(os.path.join(str(WORKSPACE_DIR), arg))
                    # NOTE(review): this plain startswith prefix test can accept
                    # a sibling directory whose name merely shares WORKSPACE_DIR
                    # as a string prefix (e.g. /ws vs /ws2). The generic branch
                    # below additionally verifies with resolve()+is_relative_to,
                    # which this branch lacks — confirm whether intentional.
                    if not norm_path.startswith(str(WORKSPACE_DIR)):
                        raise ToolInputError(
                            f"Path traversal or absolute path '{arg}' is forbidden.",
                            param_name="args_str",
                            details={
                                "argument": arg,
                                "index": i,
                                "error_code": ToolErrorCode.PATH_TRAVERSAL.value,
                            },
                        )
                except Exception as e:
                    logger.error(f"Error checking path '{arg}': {e}")
                    raise ToolInputError(
                        f"Invalid path argument '{arg}'.",
                        param_name="args_str",
                        details={
                            "argument": arg,
                            "index": i,
                            "error_code": ToolErrorCode.INVALID_ARGS.value,
                        },
                    ) from e

            continue  # Skip further checks for awk

        elif cmd_name == "sed":
            # For sed, allow / (for regex patterns), as well as |, (, ) for sed expressions
            forbidden_chars = {
                char for char in arg if char in _FORBIDDEN_CHARS_SET and char not in "/|()"
            }
            if forbidden_chars:
                raise ToolInputError(
                    f"Argument '{arg}' contains forbidden shell metacharacter(s): {', '.join(sorted(forbidden_chars))}",
                    param_name="args_str",
                    details={
                        "argument": arg,
                        "index": i,
                        "forbidden_chars": sorted(list(forbidden_chars)),
                        "error_code": ToolErrorCode.FORBIDDEN_CHAR.value,
                    },
                )

            # Basic check for command substitution patterns (can be complex)
            if "`" in arg or "$(" in arg:
                raise ToolInputError(
                    f"Argument '{arg}' seems to contain command substitution, which is forbidden.",
                    param_name="args_str",
                    details={
                        "argument": arg,
                        "index": i,
                        "error_code": ToolErrorCode.CMD_SUBSTITUTION.value,
                    },
                )

            # Don't treat sed patterns like /pattern/ as absolute paths
            if (
                arg.startswith("/")
                and arg != "/"
                and "/ " not in arg
                and not (arg.count("/") >= 2 and arg[1:].find("/") > 0)
            ):
                # Still check for absolute paths that aren't regex patterns
                try:
                    # Resolve the path relative to the workspace *without* accessing filesystem yet
                    # Use os.path.normpath and os.path.join for basic checks before full resolve
                    norm_path = os.path.normpath(os.path.join(str(WORKSPACE_DIR), arg))
                    # NOTE(review): same startswith prefix caveat as the awk
                    # branch above — no resolve()+is_relative_to follow-up here.
                    if not norm_path.startswith(str(WORKSPACE_DIR)):
                        raise ToolInputError(
                            f"Path traversal or absolute path '{arg}' is forbidden.",
                            param_name="args_str",
                            details={
                                "argument": arg,
                                "index": i,
                                "error_code": ToolErrorCode.PATH_TRAVERSAL.value,
                            },
                        )
                except Exception as e:
                    logger.error(f"Error checking path '{arg}': {e}")
                    raise ToolInputError(
                        f"Invalid path argument '{arg}'.",
                        param_name="args_str",
                        details={
                            "argument": arg,
                            "index": i,
                            "error_code": ToolErrorCode.INVALID_ARGS.value,
                        },
                    ) from e

            continue  # Skip further checks for sed

        # Standard checks for all other commands
        # Check for forbidden characters
        if any(char in _FORBIDDEN_CHARS_SET for char in arg):
            # Be more specific about which char if possible
            found_chars = {char for char in arg if char in _FORBIDDEN_CHARS_SET}
            raise ToolInputError(
                f"Argument '{arg}' contains forbidden shell metacharacter(s): {', '.join(sorted(found_chars))}",
                param_name="args_str",
                details={
                    "argument": arg,
                    "index": i,
                    "forbidden_chars": sorted(list(found_chars)),
                    "error_code": ToolErrorCode.FORBIDDEN_CHAR.value,
                },
            )

        # Basic check for command substitution patterns (can be complex)
        if "`" in arg or "$(" in arg:
            raise ToolInputError(
                f"Argument '{arg}' seems to contain command substitution, which is forbidden.",
                param_name="args_str",
                details={
                    "argument": arg,
                    "index": i,
                    "error_code": ToolErrorCode.CMD_SUBSTITUTION.value,
                },
            )

        # --- Path Validation ---
        # Heuristic: Does the argument look like a path that needs checking?
        # This is tricky. We might only check args that aren't flags (don't start with '-')
        # or args known to take paths for specific commands.
        # For simplicity here, we check any arg that contains '/' or could be a filename.
        # More robust: parse args properly (difficult) or check based on tool context.
        potential_path = False
        if not arg.startswith("-") and (os.sep in arg or "." in arg or Path(arg).suffix):
            potential_path = True
            # Allow '-' as a special argument representing stdin/stdout
            if arg == "-":
                potential_path = False

        if potential_path:
            try:
                # Disallow absolute paths
                if Path(arg).is_absolute():
                    raise ToolInputError(
                        f"Absolute paths like '{arg}' are forbidden. Use paths relative to the workspace.",
                        param_name="args_str",
                        details={
                            "argument": arg,
                            "index": i,
                            "error_code": ToolErrorCode.ABS_PATH_FORBIDDEN.value,
                        },
                    )

                # Resolve the path relative to the workspace *without* accessing filesystem yet
                # Use os.path.normpath and os.path.join for basic checks before full resolve
                norm_path = os.path.normpath(os.path.join(str(WORKSPACE_DIR), arg))

                # Check for path traversal using normpath result
                if not norm_path.startswith(str(WORKSPACE_DIR)):
                    # Check specifically for '..' components that might escape
                    if ".." in Path(arg).parts:
                        raise ToolInputError(
                            f"Path traversal ('..') is forbidden in argument '{arg}'.",
                            param_name="args_str",
                            details={
                                "argument": arg,
                                "index": i,
                                "error_code": ToolErrorCode.PATH_TRAVERSAL.value,
                            },
                        )
                    else:
                        # Generic workspace violation if normpath doesn't match prefix (e.g., symlinks handled later)
                        raise ToolInputError(
                            f"Argument '{arg}' resolves outside the allowed workspace '{WORKSPACE_DIR}'.",
                            param_name="args_str",
                            details={
                                "argument": arg,
                                "index": i,
                                "resolved_norm": norm_path,
                                "error_code": ToolErrorCode.WORKSPACE_VIOLATION.value,
                            },
                        )

                # More robust check: Resolve the path fully and check again
                # This *does* access the filesystem but ensures symlinks are handled
                resolved_arg_path = (WORKSPACE_DIR / arg).resolve()
                if not resolved_arg_path.is_relative_to(WORKSPACE_DIR):
                    raise ToolInputError(
                        f"Argument '{arg}' resolves outside the allowed workspace '{WORKSPACE_DIR}' (checked after resolving symlinks).",
                        param_name="args_str",
                        details={
                            "argument": arg,
                            "index": i,
                            "resolved_absolute": str(resolved_arg_path),
                            "error_code": ToolErrorCode.WORKSPACE_VIOLATION.value,
                        },
                    )

            except OSError as e:
                # Ignore errors resolving paths that might not exist yet (e.g., output files for some tools)
                # But log a warning. More strict validation could forbid non-existent input paths.
                logger.debug(
                    f"Could not fully resolve potential path argument '{arg}': {e}. Assuming OK if basic checks passed."
                )
            except ToolInputError:
                raise  # Re-raise our specific validation errors
            except Exception as e:
                logger.error(f"Unexpected error validating argument '{arg}': {e}", exc_info=True)
                raise ToolInputError(
                    f"Unexpected error during validation of argument '{arg}'.",
                    param_name="args_str",
                    details={
                        "argument": arg,
                        "index": i,
                        "error_code": ToolErrorCode.UNEXPECTED_FAILURE.value,
                    },
                ) from e


# --------------------------------------------------------------------------- #
# Invocation Caching (Disk-based LRU-like)
# --------------------------------------------------------------------------- #


def _get_cache_key(cmd_name: str, argv: Sequence[str], input_data_bytes: Optional[bytes]) -> str:
    """Creates a hash key based on command, args, and input data bytes."""
    hasher = hashlib.sha256()
    hasher.update(cmd_name.encode())
    for arg in argv:
        hasher.update(arg.encode())
    if input_data_bytes is not None:
        hasher.update(b"\x00\x01")  # Separator for input data
        hasher.update(input_data_bytes)
    else:
        hasher.update(b"\x00\x00")  # Separator for no input data
    return hasher.hexdigest()


def _get_cache_path(key: str) -> Path:
    """Map a cache key to its on-disk JSON file, sharded by 2-char prefix."""
    # Layout: CACHE_DIR / <first two hex chars of key> / <key>.json
    return CACHE_DIR / key[:2] / f"{key}.json"


# --- Async Cache Get/Put using asyncio.to_thread for OS file IO ---
async def _cache_get_async(key: str) -> Optional[ToolResult]:
    """Asynchronously gets a result from the disk cache."""
    if not CACHE_ENABLED:
        return None
    cache_path = _get_cache_path(key)
    try:
        if await asyncio.to_thread(cache_path.exists):
            # Check cache entry age
            stat_res = await asyncio.to_thread(cache_path.stat)
            age_seconds = time.time() - stat_res.st_mtime
            if age_seconds > (CACHE_MAX_AGE_DAYS * 24 * 3600):
                logger.debug(
                    f"Cache entry {key[:8]} expired (age {age_seconds:.0f}s > {CACHE_MAX_AGE_DAYS}d). Removing."
                )
                try:
                    await asyncio.to_thread(cache_path.unlink)
                except OSError as e:
                    logger.warning(f"Failed to remove expired cache file {cache_path}: {e}")
                return None

            # Read cached data (aiofiles is okay for read/write content)
            async with aiofiles.open(cache_path, mode="r", encoding="utf-8") as f:
                content = await f.read()
            data = json.loads(content)
            # --- Type Check and Reconstruction ---
            required_keys = {
                "stdout": None,
                "stderr": None,
                "exit_code": None,
                "success": False,
                "duration": 0.0,
                "stdout_truncated": False,
                "stderr_truncated": False,
                "error": None,
                "error_code": None,
                "cached_result": True,  # Set flag here
            }
            validated_data = {}
            for k, default_val in required_keys.items():
                validated_data[k] = data.get(k, default_val)

            # Ensure error_code is None or a valid ToolErrorCode enum member
            raw_error_code = validated_data.get("error_code")
            if raw_error_code is not None:
                try:
                    validated_data["error_code"] = ToolErrorCode(raw_error_code)
                except ValueError:
                    logger.warning(
                        f"Invalid error_code '{raw_error_code}' found in cache for key {key[:8]}. Setting to None."
                    )
                    validated_data["error_code"] = None

            logger.info(f"Cache HIT for key {key[:8]}.")
            # Use cast to satisfy type checker, assuming validation ensures structure
            return cast(ToolResult, validated_data)
        else:
            # logger.debug(f"Cache MISS for key {key[:8]}.")
            return None
    except (OSError, json.JSONDecodeError, TypeError) as e:
        logger.warning(f"Cache read error for key {key[:8]}: {e}. Treating as miss.")
        # Attempt to remove potentially corrupt file
        try:
            if await asyncio.to_thread(cache_path.exists):
                await asyncio.to_thread(cache_path.unlink)
        except OSError:
            pass
        return None
    except Exception as e:
        logger.error(f"Unexpected cache get error for key {key[:8]}: {e}", exc_info=True)
        return None


async def _cache_put_async(key: str, result: ToolResult):
    """Asynchronously puts a result into the disk cache.

    Best-effort: any serialization or filesystem error is logged and
    swallowed so caching can never break the invocation itself. Occasionally
    (~1% of writes) schedules a background LRU cleanup pass.
    """
    if not CACHE_ENABLED:
        return
    cache_path = _get_cache_path(key)
    try:
        # Ensure parent directory exists
        await asyncio.to_thread(cache_path.parent.mkdir, parents=True, exist_ok=True)

        # Write data as JSON; work on a copy so the caller's dict is untouched.
        result_to_write = result.copy()
        result_to_write.pop("cached_result", None)  # Don't store cache flag itself
        # Ensure enum is converted to string for JSON
        if result_to_write.get("error_code") is not None:
            result_to_write["error_code"] = result_to_write["error_code"].value

        json_data = json.dumps(result_to_write, indent=2)  # Pretty print for readability
        async with aiofiles.open(cache_path, mode="w", encoding="utf-8") as f:
            await f.write(json_data)
        logger.debug(f"Cache PUT successful for key {key[:8]}.")

        # Trigger cleanup check periodically (simple random approach)
        # NOTE(review): the created task's reference is not retained; per the
        # asyncio docs a fire-and-forget task may be garbage-collected before
        # completing — confirm whether a strong reference should be kept.
        if random.random() < 0.01:  # ~1% chance on write
            asyncio.create_task(_cleanup_cache_lru())
    except (OSError, TypeError, json.JSONDecodeError) as e:
        logger.error(f"Cache write error for key {key[:8]}: {e}")
    except Exception as e:
        logger.error(f"Unexpected cache put error for key {key[:8]}: {e}", exc_info=True)


# --- Cache Cleanup (LRU-like based on mtime) ---
async def _cleanup_cache_lru():
    """Removes cache files exceeding max size or age."""
    if not CACHE_ENABLED:
        return
    logger.debug("Running cache cleanup...")
    try:
        files: List[Tuple[Path, float, int]] = []
        total_size = 0
        now = time.time()
        max_age_seconds = CACHE_MAX_AGE_DAYS * 24 * 3600
        max_size_bytes = CACHE_MAX_SIZE_MB * 1024 * 1024

        # Use aiofiles.os.walk (designed for async iteration)
        # Note: aiofiles.os.walk might still be less performant than scandir + to_thread for large dirs
        async for root, _, filenames in aiofiles.os.walk(CACHE_DIR):
            for filename in filenames:
                if filename.endswith(".json"):
                    filepath = Path(root) / filename
                    try:
                        stat_res = await asyncio.to_thread(filepath.stat)
                        files.append((filepath, stat_res.st_mtime, stat_res.st_size))
                        total_size += stat_res.st_size
                    except OSError:
                        continue  # Skip files we can't stat

        # Sort by modification time (oldest first)
        files.sort(key=lambda x: x[1])

        removed_count = 0
        removed_size = 0
        current_total_size = total_size  # Keep track of size as we remove

        # Remove files based on age or if total size exceeds limit
        for filepath, mtime, size in files:
            age = now - mtime
            # Check size limit against potentially reduced total size
            over_size_limit = current_total_size > max_size_bytes
            over_age_limit = age > max_age_seconds

            if over_age_limit or over_size_limit:
                try:
                    await asyncio.to_thread(filepath.unlink)
                    removed_count += 1
                    removed_size += size
                    current_total_size -= size
                    logger.debug(
                        f"Cache cleanup removed: {filepath.name} (Reason: {'Age' if over_age_limit else 'Size'})"
                    )
                except OSError as e:
                    logger.warning(f"Cache cleanup failed to remove {filepath}: {e}")
            # Optimization: If we are no longer over the size limit, we only need to continue checking for age limit
            elif not over_size_limit:
                # We must continue iterating to check older files for age limit.
                pass

        if removed_count > 0:
            logger.info(
                f"Cache cleanup complete. Removed {removed_count} files ({removed_size / (1024 * 1024):.1f} MB). Current size: {current_total_size / (1024 * 1024):.1f} MB"
            )
        else:
            logger.debug(
                f"Cache cleanup complete. No files removed. Current size: {current_total_size / (1024 * 1024):.1f} MB"
            )

    except Exception as e:
        logger.error(f"Error during cache cleanup: {e}", exc_info=True)


# --------------------------------------------------------------------------- #
# Resource Limiting Function (Module Scope)
# --------------------------------------------------------------------------- #


def _limit_resources(timeout: float, cmd_name: Optional[str] = None) -> None:
    """Sets resource limits for the child process (Unix only), potentially command-specific.

    Applies, best-effort (every failure is logged, never raised):
      * RLIMIT_CPU capped at timeout + 1s,
      * RLIMIT_AS from MCP_TEXT_RLIMIT_AS[_HARD] env vars (2 GiB soft default),
      * RLIMIT_CORE = 0 (no core dumps),
      * RLIMIT_NPROC raised (never lowered) per env vars, with larger values
        for 'rg',
      * prctl NO_NEW_PRIVS on Linux when python-prctl is available,
      * os.setsid() to detach from the controlling terminal session.

    Args:
        timeout: Command timeout in seconds; drives the CPU-time limit.
        cmd_name: Optional command name for command-specific NPROC limits.
    """
    try:
        if HAS_RESOURCE:
            # CPU seconds (add 1s buffer)
            cpu_limit = int(timeout) + 1
            resource.setrlimit(resource.RLIMIT_CPU, (cpu_limit, cpu_limit))

            # Address-space (bytes) - Virtual Memory
            soft_as_str = os.getenv("MCP_TEXT_RLIMIT_AS", "2_147_483_648")  # 2GiB default
            hard_as_str = os.getenv(
                "MCP_TEXT_RLIMIT_AS_HARD", str(int(soft_as_str) + 100 * 1024 * 1024)
            )  # +100MiB buffer default
            try:
                soft_as = int(soft_as_str)
                hard_as = int(hard_as_str)
                if soft_as > 0 and hard_as > 0:  # Allow disabling with 0 or negative
                    resource.setrlimit(resource.RLIMIT_AS, (soft_as, hard_as))
                    logger.debug(
                        f"Applied resource limit: AS={soft_as / (1024**3):.1f}GiB (soft), {hard_as / (1024**3):.1f}GiB (hard)"
                    )
            except ValueError:
                logger.warning(
                    "Invalid value for MCP_TEXT_RLIMIT_AS or MCP_TEXT_RLIMIT_AS_HARD. Skipping AS limit."
                )
            except resource.error as e:
                logger.warning(
                    f"Failed to set AS limit: {e}. Limit might be too low or too high for the system."
                )

            # No core dumps
            try:
                resource.setrlimit(resource.RLIMIT_CORE, (0, 0))
            except resource.error as e:
                logger.warning(f"Failed to disable core dumps: {e}")  # May fail in containers

            # --- RLIMIT_NPROC --------------------------------------------------
            try:
                # honour optional global opt-out
                if os.getenv("MCP_TEXT_RLIMIT_NPROC_DISABLE", "").lower() == "true":
                    logger.debug("RLIMIT_NPROC: disabled via env flag")
                else:
                    soft_req = int(os.getenv("MCP_TEXT_RLIMIT_NPROC_SOFT", "4096"))
                    hard_req = int(os.getenv("MCP_TEXT_RLIMIT_NPROC_HARD", "8192"))
                    if cmd_name == "rg":
                        # ripgrep spawns worker threads; give it more headroom.
                        soft_req = int(os.getenv("MCP_TEXT_RLIMIT_NPROC_SOFT_RG", "16384"))
                        hard_req = int(os.getenv("MCP_TEXT_RLIMIT_NPROC_HARD_RG", "32768"))

                    cur_soft, cur_hard = resource.getrlimit(resource.RLIMIT_NPROC)

                    # translate "unlimited"/-1
                    req_soft = cur_soft if soft_req <= 0 else soft_req
                    req_hard = cur_hard if hard_req <= 0 else hard_req

                    # never *lower* an existing limit
                    new_soft = max(cur_soft, req_soft)
                    new_hard = max(cur_hard, req_hard)

                    # only call setrlimit if anything actually changes
                    if (new_soft, new_hard) != (cur_soft, cur_hard):
                        resource.setrlimit(resource.RLIMIT_NPROC, (new_soft, new_hard))
                        logger.debug(
                            f"RLIMIT_NPROC set to (soft={new_soft}, hard={new_hard}) "
                            f"(was {cur_soft}/{cur_hard}) for {cmd_name}"
                        )
            except (ValueError, resource.error) as e:
                logger.warning(f"RLIMIT_NPROC not applied: {e}")

            logger.debug(f"Applied resource limits: CPU={cpu_limit}s")  # Summary log

        if HAS_PRCTL and sys.platform == "linux":
            try:
                # Prevent privilege escalation
                prctl.set_no_new_privs(True)
                logger.debug("Applied prctl NO_NEW_PRIVS.")
            except Exception as e:  # prctl might raise various errors
                logger.warning(f"Failed to set prctl NO_NEW_PRIVS: {e}")

        # Run in new session to isolate from controlling terminal (if any)
        # This helps ensure signals (like Ctrl+C) don't propagate unexpectedly.
        if sys.platform != "win32" and hasattr(os, "setsid"):
            try:
                os.setsid()
                logger.debug("Process started in new session ID.")
            except OSError as e:
                logger.warning(f"Failed to call setsid: {e}")  # May fail if already session leader

    except Exception as exc:
        # Catch-all for any unexpected issue during limit setting
        logger.warning(f"Failed to apply resource limits/sandboxing: {exc}")


# --------------------------------------------------------------------------- #
# Helper Functions
# --------------------------------------------------------------------------- #


def _truncate(data: bytes) -> tuple[str, bool]:
    """Decode *data* as UTF-8, capping it at MAX_OUTPUT_BYTES.

    Returns the decoded (possibly shortened) text together with a flag
    telling the caller whether anything was cut off.
    """
    was_cut = len(data) > MAX_OUTPUT_BYTES
    if was_cut:
        # Slice first, decode second: errors="replace" cleans up any
        # multibyte character that the byte-level cut may have split.
        text = data[:MAX_OUTPUT_BYTES].decode("utf-8", errors="replace")
        # Marker appended after decoding so it cannot corrupt the text.
        text += "\n... (output truncated)"
    else:
        text = data.decode("utf-8", errors="replace")
    return text, was_cut


def _is_json_or_json_lines(text: str) -> bool:
    """
    True  → text is a single JSON document        (json.loads ok)
    True  → text is newline-delimited JSON lines  (all lines load)
    False → otherwise
    """
    try:
        json.loads(text)
        return True                     # one big doc
    except json.JSONDecodeError:
        pass

    ok = True
    for ln in text.splitlines():
        ln = ln.strip()
        if not ln:                      # skip blanks
            continue
        try:
            json.loads(ln)
        except json.JSONDecodeError:
            ok = False
            break
    return ok


# --------------------------------------------------------------------------- #
# Secure Async Executor Core (with Caching, Checksum, Version Checks)
# --------------------------------------------------------------------------- #
# Dictionary of asyncio.Semaphore objects, one per command.
# Created once at import time; each semaphore caps the number of concurrent
# subprocesses for its command per the CONCURRENCY_LIMITS table.
_SEMAPHORES = {cmd: asyncio.Semaphore(limit) for cmd, limit in CONCURRENCY_LIMITS.items()}


async def _run_command_secure(
    cmd_name: str,
    args_str: str,
    *,
    input_data: Optional[str] = None,
    is_file_target: bool = False,
    is_dir_target: bool = False,
    timeout: float = DEFAULT_TIMEOUT,
    dry_run: bool = False,
) -> ToolResult:
    """Securely executes a validated local command with enhancements.

    Pipeline: metadata lookup → version/checksum verification → argument
    parsing and validation → optional dry run → cache lookup → concurrency
    gating → sandboxed subprocess execution → result normalization/caching.

    Raises:
        ToolInputError: invalid arguments, oversized stdin, or validation failure.
        ToolExecutionError: missing command, timeout, or unexpected failure.
    """

    # 1. Get Command Metadata and Path
    meta = _COMMAND_METADATA.get(cmd_name)
    if not meta or not meta.path:
        raise ToolExecutionError(
            f"Command '{cmd_name}' is not available or configured.",
            error_code=ToolErrorCode.CMD_NOT_FOUND,
        )

    # Version/checksum checks run before semaphore. This is generally okay,
    # potential for minor concurrent hashing/version checks on very first simultaneous calls.
    # 2. Check Minimum Version (async, cached)
    try:
        await _check_command_version(meta)
    except ToolExecutionError as e:
        # Propagate version error directly
        raise e

    # 3. Verify Checksum (async, lazy, cached)
    try:
        await _get_command_checksum(meta)  # Verifies if mtime changed, re-calcs if needed
        # Optionally, compare against a known-good checksum if available/needed
        # if meta.checksum != EXPECTED_CHECKSUMS[cmd_name]: raise ToolExecutionError(...)
    except ToolExecutionError as e:
        raise e  # Propagate checksum errors

    # 4. Parse Arguments using shlex
    try:
        argv: List[str] = shlex.split(args_str, posix=True)
    except ValueError as exc:
        raise ToolInputError(
            "Invalid quoting or escaping in args_str.",
            param_name="args_str",
            details={"error_code": ToolErrorCode.INVALID_ARGS.value},
        ) from exc

    # 5. Validate Arguments (Security & Workspace) using the placeholder/real validator
    try:
        _validate_arguments(cmd_name, argv)
    except ToolInputError as e:
        # Add command context to validation error
        raise ToolInputError(
            f"Argument validation failed for '{cmd_name}': {e}",
            param_name="args_str",
            details=e.details,
        ) from e

    # 6. Handle Dry Run
    cmd_path_str = str(meta.path)
    cmdline: List[str] = [cmd_path_str, *argv]
    if dry_run:
        logger.info(f"Dry run: Command validated successfully: {shlex.join(cmdline)}")
        # Ensure ToolResult structure is fully populated for dry run
        return ToolResult(
            success=True,
            dry_run_cmdline=cmdline,  # Return the command list
            stdout=None,
            stderr=None,
            exit_code=None,
            error=None,
            error_code=None,
            duration=0.0,
            stdout_truncated=False,
            stderr_truncated=False,
            cached_result=False,
        )

    # 7. Stdin Size Check and Encoding (Optimization)
    input_bytes: Optional[bytes] = None
    input_len = 0
    if input_data is not None:
        # NOTE: errors="ignore" silently drops unencodable characters;
        # acceptable here because downstream tools expect UTF-8 text.
        input_bytes = input_data.encode("utf-8", errors="ignore")
        input_len = len(input_bytes)
        if input_len > MAX_INPUT_BYTES:
            raise ToolInputError(
                f"input_data exceeds maximum allowed size of {MAX_INPUT_BYTES / (1024 * 1024):.1f} MB.",
                param_name="input_data",
                details={
                    "limit_bytes": MAX_INPUT_BYTES,
                    "actual_bytes": input_len,
                    "error_code": ToolErrorCode.INPUT_TOO_LARGE.value,
                },
            )

    # 8. Prepare for Caching
    cache_key = _get_cache_key(cmd_name, argv, input_bytes)
    cached_result = await _cache_get_async(cache_key)
    if cached_result:
        # Ensure cached_result flag is set correctly (should be done by _cache_get_async)
        cached_result["cached_result"] = True
        return cached_result

    # 9. Acquire Concurrency Semaphore
    semaphore = _SEMAPHORES.get(cmd_name)
    if not semaphore:
        logger.error(
            f"Internal error: No semaphore found for command '{cmd_name}'. Using fallback limit 1."
        )
        # Should not happen if CONCURRENCY_LIMITS is correct.
        # NOTE: this per-call fallback semaphore does not serialize across
        # concurrent calls (each call gets its own); it only preserves the API.
        semaphore = asyncio.Semaphore(1)  # Fallback to concurrency of 1

    async with semaphore:
        # 10. Redacted Logging
        redacted_argv = [
            re.sub(r"(?i)(--?(?:password|token|key|secret)=)\S+", r"\1********", arg)
            for arg in argv
        ]
        log_payload = {
            "event": "execute_local_text_tool",
            "command": cmd_name,
            "args": redacted_argv,
            "input_source": "stdin"
            if input_data is not None
            else ("file" if is_file_target else ("dir" if is_dir_target else "args_only")),
            "timeout_s": timeout,
            "cache_key_prefix": cache_key[:8] if cache_key else None,
        }
        # Using extra assumes logger is configured to handle it
        logger.info(json.dumps(log_payload), extra={"json_fields": log_payload})

        # 11. Resource Limit Setup (Unix only) - Uses module-level function.
        # Bug fix: asyncio/subprocess raises ValueError when ANY non-None
        # preexec_fn is supplied on Windows, so the platform check must decide
        # whether to pass a callable at all, not live inside the callable.
        if sys.platform != "win32":

            def preexec_fn_to_use():
                # Pass cmd_name for command-specific limits
                _limit_resources(timeout, cmd_name=cmd_name)
        else:
            preexec_fn_to_use = None

        # 12. Minimal Sanitized Environment
        safe_env = {"PATH": os.getenv("PATH", "/usr/bin:/bin:/usr/local/bin")}  # Add common paths
        # Preserve locale settings for correct text processing
        for safe_var in ["LANG", "LC_ALL", "LC_CTYPE", "LC_MESSAGES", "LC_COLLATE"]:
            if safe_var in os.environ:
                safe_env[safe_var] = os.environ[safe_var]
        # Add HOME for tools that might need it for config (like awk finding extensions)
        if "HOME" in os.environ:
            safe_env["HOME"] = os.environ["HOME"]
        # Add TMPDIR as some tools use it
        if "TMPDIR" in os.environ:
            safe_env["TMPDIR"] = os.environ["TMPDIR"]
        elif os.getenv("TEMP"):  # Windows variant
            safe_env["TEMP"] = os.getenv("TEMP")  # type: ignore[assignment]

        # 13. Launch Subprocess
        t0 = time.perf_counter()
        proc: Optional[asyncio.subprocess.Process] = None
        result: Optional[ToolResult] = None  # Ensure result is defined

        try:
            proc = await asyncio.create_subprocess_exec(
                *cmdline,
                stdin=asyncio.subprocess.PIPE
                if input_bytes is not None
                else asyncio.subprocess.DEVNULL,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
                env=safe_env,
                limit=MAX_OUTPUT_BYTES * 2,  # Limit buffer size slightly larger than max output
                preexec_fn=preexec_fn_to_use,  # None on Windows (see above)
                # cwd=str(WORKSPACE_DIR) # Generally not needed if paths are validated relative to WORKSPACE_DIR
            )

            # 14. Communicate and Handle Timeout
            try:
                stdout_b, stderr_b = await asyncio.wait_for(
                    proc.communicate(input=input_bytes),
                    timeout=timeout,
                )
            except asyncio.TimeoutError as e:
                logger.warning(
                    f"Command '{cmd_name}' timed out after {timeout}s. Terminating.",
                    extra={"command": cmd_name, "timeout": timeout},
                )
                if proc and proc.returncode is None:
                    try:
                        proc.terminate()
                        # Give it a moment to terminate gracefully
                        await asyncio.wait_for(proc.wait(), timeout=1.0)
                    except asyncio.TimeoutError:
                        logger.warning(
                            f"Process {proc.pid} did not terminate gracefully after 1s, killing.",
                            extra={"pid": proc.pid},
                        )
                        try:
                            proc.kill()
                        except ProcessLookupError:
                            pass  # Already gone
                        await proc.wait()  # Wait for kill to complete
                    except ProcessLookupError:
                        pass  # Already gone
                    except Exception as term_err:
                        logger.error(
                            f"Error terminating process {proc.pid}: {term_err}",
                            extra={"pid": proc.pid},
                        )

                raise ToolExecutionError(
                    f"'{cmd_name}' exceeded timeout of {timeout}s.",
                    error_code=ToolErrorCode.TIMEOUT,
                    details={"timeout": timeout},
                ) from e
            except (BrokenPipeError, ConnectionResetError) as comm_err:
                # Handle cases where the process exits before consuming all input/producing output
                exit_code_on_comm_err = proc.returncode if proc else -1
                logger.warning(
                    f"Communication error with '{cmd_name}' (process likely exited early): {comm_err}. Exit code: {exit_code_on_comm_err}",
                    extra={
                        "command": cmd_name,
                        "error": str(comm_err),
                        "exit_code": exit_code_on_comm_err,
                    },
                )
                # Read any remaining output before raising
                stdout_b = b""
                stderr_b = b""
                try:
                    # Use readexactly or read with limit to avoid indefinite block
                    if proc and proc.stdout:
                        stdout_b = await asyncio.wait_for(
                            proc.stdout.read(MAX_OUTPUT_BYTES * 2), timeout=0.5
                        )
                    if proc and proc.stderr:
                        stderr_b = await asyncio.wait_for(
                            proc.stderr.read(MAX_OUTPUT_BYTES * 2), timeout=0.5
                        )
                except asyncio.TimeoutError:
                    pass
                except Exception as read_err:
                    logger.warning(f"Error reading remaining output after comm error: {read_err}")

                # Continue processing with potentially partial output, but mark as communication error
                # Let success be determined by exit code if available, otherwise assume failure
                duration = time.perf_counter() - t0
                exit_code = (
                    proc.returncode if proc and proc.returncode is not None else -1
                )  # Use -1 if unknown
                stdout, stdout_truncated = _truncate(stdout_b)
                stderr, stderr_truncated = _truncate(stderr_b)
                retcode_ok_map = RETCODE_OK.get(cmd_name, {0})
                success = exit_code in retcode_ok_map

                # Construct result indicating communication error
                result = ToolResult(
                    stdout=stdout,
                    stderr=stderr,
                    exit_code=exit_code,
                    success=success,  # May be true if exit code was OK despite pipe error
                    error=f"Communication error with '{cmd_name}': {comm_err}",
                    error_code=ToolErrorCode.COMMUNICATION_ERROR,
                    duration=round(duration, 3),
                    stdout_truncated=stdout_truncated,
                    stderr_truncated=stderr_truncated,
                    cached_result=False,
                )
                # Do not cache communication errors
                return result  # Return immediately, skip normal success/cache path

            except Exception as comm_err:  # Catch other potential communicate errors
                exit_code_on_comm_err = proc.returncode if proc else -1
                raise ToolExecutionError(
                    f"Communication error with '{cmd_name}': {comm_err}",
                    error_code=ToolErrorCode.COMMUNICATION_ERROR,
                    details={"exit_code": exit_code_on_comm_err},
                ) from comm_err

            # 15. Process Results
            duration = time.perf_counter() - t0
            exit_code = proc.returncode
            stdout, stdout_truncated = _truncate(stdout_b)
            stderr, stderr_truncated = _truncate(stderr_b)

            # Use normalization table for success check
            retcode_ok_map = RETCODE_OK.get(cmd_name, {0})  # Default to {0} if cmd not in map
            success = exit_code in retcode_ok_map
            error_message: Optional[str] = None
            error_code: Optional[ToolErrorCode] = None

            if not success:
                error_message = f"Command '{cmd_name}' failed with exit code {exit_code}."
                # Attempt to map common exit codes to specific error types if possible
                # e.g., if exit code 2 for rg means regex error -> ToolErrorCode.INVALID_ARGS
                error_code = ToolErrorCode.EXEC_ERROR  # Default execution error
                if stderr:
                    error_message += (
                        f" Stderr: '{textwrap.shorten(stderr, 150, placeholder='...')}'"
                    )

            # 16. Construct ToolResult TypedDict
            result = ToolResult(
                stdout=stdout,
                stderr=stderr,
                exit_code=exit_code,
                success=success,
                error=error_message,
                error_code=error_code,
                duration=round(duration, 3),
                stdout_truncated=stdout_truncated,
                stderr_truncated=stderr_truncated,
                cached_result=False,  # Will be set by caller if cached
            )

            # 17. Cache the result if successful and not too large? (optional check)
            # Consider not caching extremely large successful results if space is a concern
            # current logic caches all successful results
            if success:
                await _cache_put_async(cache_key, result)

            return result

        except (ToolInputError, ToolExecutionError) as e:
            # Propagate specific errors we raised
            raise e
        except FileNotFoundError as e:
            # Specifically catch if the command itself isn't found at exec time
            logger.error(
                f"Command '{cmd_name}' not found at path: {cmdline[0]}. Ensure it's installed and in PATH.",
                exc_info=True,
            )
            raise ToolExecutionError(
                f"Command '{cmd_name}' executable not found.",
                error_code=ToolErrorCode.CMD_NOT_FOUND,
            ) from e
        except PermissionError as e:
            logger.error(
                f"Permission denied executing command '{cmd_name}' at path: {cmdline[0]}.",
                exc_info=True,
            )
            raise ToolExecutionError(
                f"Permission denied executing '{cmd_name}'. Check file permissions.",
                error_code=ToolErrorCode.EXEC_ERROR,
            ) from e
        except Exception as e:
            # Catch unexpected errors during setup/execution
            logger.critical(f"Unexpected error running command '{cmd_name}': {e}", exc_info=True)
            raise ToolExecutionError(
                f"Unexpected failure executing '{cmd_name}': {e}",
                error_code=ToolErrorCode.UNEXPECTED_FAILURE,
            ) from e
        finally:
            # Ensure process is cleaned up if an exception occurred after creation
            if proc and proc.returncode is None:
                logger.warning(
                    f"Process {proc.pid} for '{cmd_name}' still running after exception, attempting kill."
                )
                try:
                    proc.kill()
                    await proc.wait()
                except ProcessLookupError:
                    pass
                except Exception as final_kill_err:
                    logger.error(
                        f"Error killing process {proc.pid} in finally block: {final_kill_err}"
                    )


# --------------------------------------------------------------------------- #
# Retcode Normalization
# --------------------------------------------------------------------------- #
# Maps command name -> set of exit codes treated as success.
# NOTE: looked up lazily inside _run_command_secure at call time, so defining
# this table after that function is safe; commands absent here default to {0}.
RETCODE_OK: Dict[str, set[int]] = {
    "rg": {0, 1},  # 0 = matches found, 1 = no matches found (both OK for searching)
    "jq": {0},  # 0 = success. Other codes (e.g., 4 for no matches with -e) indicate issues.
    "awk": {0},  # 0 = success
    "sed": {0},  # 0 = success
}

# --------------------------------------------------------------------------- #
# Public Tool Functions (Standalone Wrappers)
# --------------------------------------------------------------------------- #


def _require_single_source(
    cmd: str,
    *,
    input_data: Optional[str],
    input_file: Optional[bool],
    input_dir: Optional[bool],
) -> None:
    """Validates that exactly one input source mode is indicated."""
    # Convert bool flags to 0 or 1 for summing. None becomes 0.
    modes = [
        input_data is not None,
        input_file is True,  # Explicit True check
        input_dir is True,  # Explicit True check
    ]
    num_modes = sum(modes)
    if num_modes == 0:
        # Use a clearer error message
        raise ToolInputError(
            f"For '{cmd}', you must provide exactly one input: 'input_data' OR 'input_file=True' OR 'input_dir=True'.",
            param_name="inputs",
            details={"error_code": ToolErrorCode.INVALID_ARGS.value},
        )
    elif num_modes > 1:
        raise ToolInputError(
            f"For '{cmd}', specify exactly one input mode: provide 'input_data' OR set 'input_file=True' OR set 'input_dir=True'. Found multiple.",
            param_name="inputs",
            details={"error_code": ToolErrorCode.INVALID_ARGS.value},
        )


# --- run_ripgrep ---
@with_tool_metrics
@with_error_handling
async def run_ripgrep(
    args_str: str,
    *,
    input_data: Optional[str] = None,
    input_file: Optional[bool] = False,  # Default to False for clarity
    input_dir: Optional[bool] = False,  # Default to False for clarity
    timeout: float = DEFAULT_TIMEOUT,
    dry_run: bool = False,
) -> ToolResult:
    """
    Executes the 'rg' (ripgrep) command for fast text pattern searching within the secure workspace.

    Searches recursively through directories or specified files (relative to the workspace)
    for lines matching a regular expression or fixed string pattern.

    Input Handling:
    - `input_data`: Provide text directly via stdin. Omit `input_file`/`input_dir` or set to `False`. Do *not* include a path in `args_str`. Max size controlled by MCP_TEXT_MAX_INPUT.
    - `input_file=True`: Indicates `args_str` contains target file path(s). Omit `input_data`/`input_dir` or set to `False`. Path(s) must be relative to the workspace and specified in `args_str`. Example: `args_str="'pattern' path/to/file.txt"`
    - `input_dir=True`: Indicates `args_str` contains target directory path(s). Omit `input_data`/`input_file` or set to `False`. Path(s) must be relative to the workspace and specified in `args_str`. Example: `args_str="'pattern' path/to/dir"`

    Common `rg` Arguments (include in `args_str`, use workspace-relative paths):
      `'pattern'`: Regex pattern or fixed string.
      `path`: Workspace-relative file or directory path(s).
      `-i`, `--ignore-case`: Case-insensitive search.
      `-v`, `--invert-match`: Select non-matching lines.
      `-l`, `--files-with-matches`: List files containing matches.
      `-c`, `--count`: Count matches per file.
      `-A NUM`, `-B NUM`, `-C NUM`: Context control.
      `--json`: JSON output format.
      `-t type`: Filter by file type (e.g., `py`, `md`).
      `-g glob`: Include/exclude files/dirs by glob (e.g., `-g '*.py' -g '!temp/'`).
      `-o, --only-matching`: Print only matching parts.
      `--follow`: Follow symbolic links (targets must also be within workspace).
      `--no-filename`, `-N`: Suppress filename/line numbers.

    Security: Enforces workspace boundary `"{get_workspace_dir()}"`. Blocks absolute paths, path traversal ('..'), unsafe flags, and shell metacharacters. Limited resources (CPU, Memory, Processes).

    Exit Codes & Success Field:
    - `0`: Matches found -> `success: True`, `error: null`
    - `1`: No matches found -> `success: True`, `error: null` (NOTE: Considered success by this tool wrapper)
    - `2+`: Error occurred -> `success: False`, `error: "..."`, `error_code: EXEC_ERROR`

    Args:
        args_str: Command-line arguments for `rg` (pattern, flags, workspace-relative paths).
        input_data: String data to pipe to `rg` via stdin. Omit/False for `input_file`/`input_dir`.
        input_file: Set to True if `args_str` includes target file path(s). Omit/False for `input_data`/`input_dir`.
        input_dir: Set to True if `args_str` includes target directory path(s). Omit/False for `input_data`/`input_file`.
        timeout: Max execution time in seconds.
        dry_run: If True, validate args and return command line without execution.

    Returns:
        ToolResult: Dictionary containing execution results or dry run info.
                    Includes stdout, stderr, exit_code, success, error, error_code, duration,
                    truncation flags, and cached_result status.

    Raises:
        ToolInputError: For invalid arguments, security violations, or incorrect input mode usage.
        ToolExecutionError: If `rg` is not found, times out, fails version/checksum checks, or fails unexpectedly.
    """
    # Enforce exactly one of input_data / input_file / input_dir before delegating.
    _require_single_source("rg", input_data=input_data, input_file=input_file, input_dir=input_dir)
    # All validation, sandboxing, caching, and execution happen in the shared runner.
    return await _run_command_secure(
        "rg",
        args_str,
        input_data=input_data,
        is_file_target=input_file,  # Pass the boolean flag directly
        is_dir_target=input_dir,  # Pass the boolean flag directly
        timeout=timeout,
        dry_run=dry_run,
    )


# --- run_awk ---
@with_tool_metrics
@with_error_handling
async def run_awk(
    args_str: str,
    *,
    input_data: Optional[str] = None,
    input_file: Optional[bool] = False,  # Default to False for clarity
    timeout: float = DEFAULT_TIMEOUT,
    dry_run: bool = False,
) -> ToolResult:
    """
    Executes the 'awk' command for pattern scanning and field-based text processing within the secure workspace.

    Input Handling:
    - `input_data`: Provide text directly via stdin. Omit `input_file` or set to `False`. Do *not* include filename in `args_str`. Max size controlled by MCP_TEXT_MAX_INPUT.
    - `input_file=True`: Indicates `args_str` contains target file path(s). Omit `input_data` or set to `False`. Path(s) must be relative to the workspace and specified in `args_str`. Example: `args_str="'{print $1}' path/data.log"`

    Common `awk` Arguments (include in `args_str`, use workspace-relative paths):
      `'program'`: The AWK script (e.g., `'{ print $1, $3 }'`).
      `filename(s)`: Workspace-relative input filename(s).
      `-F fs`: Define input field separator (e.g., `-F ','`).
      `-v var=value`: Assign variable.

    Security: Enforces workspace boundary `"{get_workspace_dir()}"`. Blocks unsafe flags (`-i`/`--in-place` gawk), shell characters, absolute paths, traversal. Limited resources.

    Args:
        args_str: Command-line arguments for `awk` (script, flags, workspace-relative paths).
        input_data: String data to pipe to `awk` via stdin. Omit/False for `input_file`.
        input_file: Set to True if `args_str` includes target file path(s). Omit/False for `input_data`.
        timeout: Max execution time in seconds.
        dry_run: If True, validate args and return command line without execution.

    Returns:
        ToolResult: Dictionary with results. `success` is True only if exit code is 0.

    Raises:
        ToolInputError: For invalid arguments, security violations, or incorrect input mode usage.
        ToolExecutionError: If `awk` is not found, times out, fails version/checksum checks, or fails unexpectedly.
    """
    # awk has no directory mode, so input_dir is hard-wired to False here.
    _require_single_source("awk", input_data=input_data, input_file=input_file, input_dir=False)
    # All validation, sandboxing, caching, and execution happen in the shared runner.
    return await _run_command_secure(
        "awk",
        args_str,
        input_data=input_data,
        is_file_target=input_file,
        is_dir_target=False,  # awk typically doesn't take dir targets directly
        timeout=timeout,
        dry_run=dry_run,
    )


# --- run_sed ---
@with_tool_metrics
@with_error_handling
async def run_sed(
    args_str: str,
    *,
    input_data: Optional[str] = None,
    input_file: Optional[bool] = False,  # Default to False for clarity
    timeout: float = DEFAULT_TIMEOUT,
    dry_run: bool = False,
) -> ToolResult:
    """
    Executes the 'sed' (Stream Editor) command for line-by-line text transformations within the secure workspace.

    Performs substitutions, deletions, insertions based on patterns. **In-place editing (`-i`) is disabled.**

    Input Handling:
    - `input_data`: Provide text directly via stdin. Omit `input_file` or set to `False`. Do *not* include filename in `args_str`. Max size controlled by MCP_TEXT_MAX_INPUT.
    - `input_file=True`: Indicates `args_str` contains target file path. Omit `input_data` or set to `False`. Path must be relative to the workspace and specified in `args_str`. Example: `args_str="'s/ERROR/WARN/g' path/app.log"`

    Common `sed` Arguments (include in `args_str`, use workspace-relative paths):
      `'script'`: The `sed` script or command (e.g., `'s/foo/bar/g'`, `'/^DEBUG/d'`).
      `filename`: Workspace-relative input filename.
      `-e script`: Add multiple scripts.
      `-f script-file`: Read commands from a workspace-relative file.
      `-n`: Suppress automatic printing.
      `-E` or `-r`: Use extended regular expressions.

    Security: Enforces workspace boundary `"{get_workspace_dir()}"`. Blocks `-i` flag, shell characters, absolute paths, traversal. Limited resources.

    Args:
        args_str: Command-line arguments for `sed` (script, flags, workspace-relative path).
        input_data: String data to pipe to `sed` via stdin. Omit/False for `input_file`.
        input_file: Set to True if `args_str` includes target file path. Omit/False for `input_data`.
        timeout: Max execution time in seconds.
        dry_run: If True, validate args and return command line without execution.

    Returns:
        ToolResult: Dictionary with results. `success` is True only if exit code is 0.

    Raises:
        ToolInputError: For invalid arguments, security violations (like using -i), or incorrect input mode usage.
        ToolExecutionError: If `sed` is not found, times out, fails version/checksum checks, or fails unexpectedly.
    """
    # sed works on stdin or a file, never a directory, so input_dir is fixed False.
    _require_single_source("sed", input_data=input_data, input_file=input_file, input_dir=False)
    # All validation, sandboxing, caching, and execution happen in the shared runner.
    return await _run_command_secure(
        "sed",
        args_str,
        input_data=input_data,
        is_file_target=input_file,
        is_dir_target=False,  # sed operates on files or stdin
        timeout=timeout,
        dry_run=dry_run,
    )


# --- run_jq ---
@with_tool_metrics
@with_error_handling
async def run_jq(
    args_str: str,
    *,
    input_data: Optional[str] = None,
    input_file: Optional[bool] = False,  # Default to False for clarity
    timeout: float = DEFAULT_TIMEOUT,
    dry_run: bool = False,
) -> ToolResult:
    """
    Executes the 'jq' command for querying, filtering, and transforming JSON data within the secure workspace.

    Essential for extracting values, filtering arrays/objects, or restructuring JSON
    provided via stdin (`input_data`) or from a file (`input_file=True`).

    Input Handling:
    - `input_data`: Provide valid JSON text directly via stdin. Omit `input_file` or set to `False`. Do *not* include filename in `args_str`. Max size controlled by MCP_TEXT_MAX_INPUT.
    - `input_file=True`: Indicates `args_str` contains target JSON file path. Omit `input_data` or set to `False`. Path must be relative to the workspace and specified in `args_str`. Example: `args_str="'.items[].name' data.json"`

    Common `jq` Arguments (include in `args_str`, use workspace-relative paths):
      `'filter'`: The `jq` filter expression (e.g., `'.users[] | select(.active==true)'`).
      `filename`: Workspace-relative input JSON filename.
      `-c`: Compact output.
      `-r`: Raw string output (no JSON quotes).
      `-s`: Slurp mode (read input stream into an array).
      `--arg name value`: Define string variable.
      `--argjson name json_value`: Define JSON variable.

    Security: Enforces workspace boundary `"{get_workspace_dir()}"`. Blocks unsafe flags, shell characters, absolute paths, traversal. Limited resources. Validates `input_data` is JSON before execution.

    Args:
        args_str: Command-line arguments for `jq` (filter, flags, workspace-relative path).
        input_data: String containing valid JSON data to pipe to `jq` via stdin. Omit/False for `input_file`.
        input_file: Set to True if `args_str` includes target JSON file path. Omit/False for `input_data`.
        timeout: Max execution time in seconds.
        dry_run: If True, validate args and return command line without execution.

    Returns:
        ToolResult: Dictionary with results. `success` is True only if exit code is 0.

    Raises:
        ToolInputError: If `input_data` is not valid JSON, arguments are invalid, security violations occur, or incorrect input mode usage.
        ToolExecutionError: If `jq` is not found, times out, fails version/checksum checks, or fails unexpectedly.
    """
    # jq works on stdin or a file, never a directory, so input_dir is fixed False.
    _require_single_source("jq", input_data=input_data, input_file=input_file, input_dir=False)
    # Extra check for jq: validate input_data is JSON before running
    # (fails fast with a clear error instead of a cryptic jq parse failure).
    if input_data is not None and not _is_json_or_json_lines(input_data):
        raise ToolInputError(
            "input_data is not valid JSON or JSON-Lines",
            param_name="input_data",
            details={"error_code": ToolErrorCode.INVALID_JSON_INPUT.value},
        )

    # All validation, sandboxing, caching, and execution happen in the shared runner.
    return await _run_command_secure(
        "jq",
        args_str,
        input_data=input_data,
        is_file_target=input_file,
        is_dir_target=False,  # jq operates on files or stdin
        timeout=timeout,
        dry_run=dry_run,
    )


# --------------------------------------------------------------------------- #
# Streaming Core Executor
# --------------------------------------------------------------------------- #


async def _run_command_stream(
    cmd_name: str,
    args_str: str,
    *,
    input_data: Optional[str] = None,
    is_file_target: bool = False,
    is_dir_target: bool = False,
    timeout: float = DEFAULT_TIMEOUT,
) -> AsyncIterator[str]:
    """Securely execute a vetted command and stream its stdout line by line.

    Mirrors the non-streaming executor's security pipeline (metadata lookup,
    version/checksum verification, shlex parsing, argument validation,
    sanitized environment, resource limits) but yields decoded stdout lines
    as they arrive instead of buffering the whole output.

    Args:
        cmd_name: Key into ``_COMMAND_METADATA`` naming the command to run.
        args_str: Raw argument string; shlex-split (POSIX rules) and validated
            via ``_validate_arguments`` before use.
        input_data: Optional text piped to the child's stdin (UTF-8 encoded,
            rejected if larger than ``MAX_INPUT_BYTES``).
        is_file_target: True when ``args_str`` targets a file. Within this
            function it only affects the ``input_source`` field of the log entry.
        is_dir_target: True when ``args_str`` targets a directory (logging only).
        timeout: Wall-clock budget in seconds for the whole operation; checked
            explicitly inside the stdout read loop and again via ``wait_for``
            on process exit (with a +5s grace buffer).

    Yields:
        str: Each decoded stdout line. Once ``MAX_OUTPUT_BYTES`` of stdout has
        been read, a single ``"...(stdout truncated)\\n"`` marker is yielded and
        subsequent lines are dropped (the pipe is still drained so the child
        does not block). Read failures yield a ``##STREAM_STDOUT_READ_ERROR``
        marker line and end the stream.

    Raises:
        ToolInputError: Bad quoting/escaping, failed argument validation, or
            oversized ``input_data``.
        ToolExecutionError: Command missing/unverifiable, timeout, permission
            failure, unexpected errors, or a non-OK exit code per
            ``RETCODE_OK``. Note the exit-code error is raised only *after*
            all stdout lines have been yielded, so consumers observe it at the
            end of iteration, not at the start.
    """

    # 1. Get Command Metadata and Path
    meta = _COMMAND_METADATA.get(cmd_name)
    if not meta or not meta.path:
        raise ToolExecutionError(
            f"Command '{cmd_name}' is not available or configured.",
            error_code=ToolErrorCode.CMD_NOT_FOUND,
        )

    # 2. Version & Checksum Checks (Same as non-streaming)
    await _check_command_version(meta)
    await _get_command_checksum(meta)  # Verification happens here

    # 3. Parse & Validate Arguments
    # ValueError from shlex (unbalanced quotes) and ToolInputError from the
    # validator are reported separately so the caller sees the precise cause.
    try:
        argv: List[str] = shlex.split(args_str, posix=True)
        _validate_arguments(cmd_name, argv)
    except ValueError as exc:
        raise ToolInputError(
            "Invalid quoting or escaping in args_str.",
            param_name="args_str",
            details={"error_code": ToolErrorCode.INVALID_ARGS.value},
        ) from exc
    except ToolInputError as e:
        raise ToolInputError(
            f"Argument validation failed for '{cmd_name}' stream: {e}",
            param_name="args_str",
            details=e.details,
        ) from e

    # 4. Prepare command line and logging
    # Secrets passed as --password=/--token=/--key=/--secret= are masked in logs.
    cmd_path_str = str(meta.path)
    cmdline: List[str] = [cmd_path_str, *argv]
    redacted_argv = [
        re.sub(r"(?i)(--?(?:password|token|key|secret)=)\S+", r"\1********", arg) for arg in argv
    ]
    log_payload = {
        "event": "execute_local_text_tool_stream",
        "command": cmd_name,
        "args": redacted_argv,
        "input_source": "stdin"
        if input_data is not None
        else ("file" if is_file_target else ("dir" if is_dir_target else "args_only")),
        "timeout_s": timeout,
    }
    logger.info(json.dumps(log_payload), extra={"json_fields": log_payload})

    # 5. Resource limits & Env (Same as non-streaming)
    def preexec_fn_to_use():
        return _limit_resources(timeout) if sys.platform != "win32" else None

    # Minimal, allow-listed environment: PATH plus locale/home/tmp variables only.
    safe_env = {"PATH": os.getenv("PATH", "/usr/bin:/bin:/usr/local/bin")}
    for safe_var in ["LANG", "LC_ALL", "LC_CTYPE", "LC_MESSAGES", "LC_COLLATE"]:
        if safe_var in os.environ:
            safe_env[safe_var] = os.environ[safe_var]
    if "HOME" in os.environ:
        safe_env["HOME"] = os.environ["HOME"]
    if "TMPDIR" in os.environ:
        safe_env["TMPDIR"] = os.environ["TMPDIR"]
    elif os.getenv("TEMP"):
        safe_env["TEMP"] = os.getenv("TEMP")  # type: ignore[assignment]

    input_bytes: Optional[bytes] = None
    if input_data is not None:
        input_bytes = input_data.encode("utf-8", errors="ignore")
        # Check size *before* starting process
        if len(input_bytes) > MAX_INPUT_BYTES:
            raise ToolInputError(
                f"input_data exceeds maximum allowed size of {MAX_INPUT_BYTES / (1024 * 1024):.1f} MB for streaming.",
                param_name="input_data",
                details={
                    "limit_bytes": MAX_INPUT_BYTES,
                    "actual_bytes": len(input_bytes),
                    "error_code": ToolErrorCode.INPUT_TOO_LARGE.value,
                },
            )

    # 6. Launch Subprocess and Stream Output
    proc: Optional[asyncio.subprocess.Process] = None
    stderr_lines: List[str] = []  # Collect stderr lines
    start_time = time.monotonic()

    try:
        proc = await asyncio.create_subprocess_exec(
            *cmdline,
            stdin=asyncio.subprocess.PIPE
            if input_bytes is not None
            else asyncio.subprocess.DEVNULL,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,  # Capture stderr
            env=safe_env,
            limit=MAX_OUTPUT_BYTES * 2,  # asyncio StreamReader buffer limit
            # NOTE(review): the wrapper function object itself is passed here;
            # the child invokes it, which calls _limit_resources(timeout) on
            # POSIX. On Windows a non-None preexec_fn is rejected by the
            # subprocess machinery — presumably this path is POSIX-only; confirm.
            preexec_fn=preexec_fn_to_use,
        )

        # --- Define Helper Coroutines for IO ---
        # stdin writing and stderr draining run as concurrent tasks so the
        # child never deadlocks on a full pipe while we read stdout below.
        async def write_stdin_task(proc: asyncio.subprocess.Process) -> None:
            """Writes input data to stdin if provided and closes stdin."""
            if input_bytes is not None and proc.stdin:
                try:
                    proc.stdin.write(input_bytes)
                    await proc.stdin.drain()
                except (BrokenPipeError, ConnectionResetError) as e:
                    # Child may legitimately exit before consuming all stdin.
                    logger.warning(
                        f"Stream: Error writing to stdin for '{cmd_name}' (pid {proc.pid}): {e}. Process might have exited."
                    )
                except Exception as e:
                    logger.error(
                        f"Stream: Unexpected error writing to stdin for '{cmd_name}' (pid {proc.pid}): {e}",
                        exc_info=True,
                    )
                finally:
                    if proc.stdin:
                        try:
                            proc.stdin.close()
                        except Exception:
                            pass  # Ignore errors closing stdin already closed/broken
            elif proc.stdin:
                # Close stdin immediately if no input_data
                try:
                    proc.stdin.close()
                except Exception:
                    pass

        async def read_stderr_task(proc: asyncio.subprocess.Process, lines_list: List[str]) -> None:
            """Reads stderr lines into the provided list."""
            if not proc.stderr:
                return
            stderr_bytes_read = 0
            while True:
                try:
                    # Read with a timeout? readline() could block if process hangs writing stderr
                    line_bytes = await proc.stderr.readline()
                    if not line_bytes:
                        break  # End of stream
                    stderr_bytes_read += len(line_bytes)
                    # Basic truncation within stderr collection to prevent memory issues
                    if stderr_bytes_read > MAX_OUTPUT_BYTES * 1.1:  # Allow slightly more for marker
                        if not lines_list or not lines_list[-1].endswith("...(stderr truncated)"):
                            lines_list.append("...(stderr truncated)")
                        continue  # Stop appending lines but keep reading to drain pipe
                    lines_list.append(line_bytes.decode("utf-8", errors="replace"))
                except Exception as e:
                    logger.warning(
                        f"Stream: Error reading stderr line for '{cmd_name}' (pid {proc.pid}): {e}"
                    )
                    lines_list.append(f"##STREAM_STDERR_READ_ERROR: {e}\n")
                    break  # Stop reading stderr on error

        # --- Create IO Tasks ---
        stdin_writer = asyncio.create_task(write_stdin_task(proc))
        stderr_reader = asyncio.create_task(read_stderr_task(proc, stderr_lines))

        # --- Yield stdout lines (Inlined async generator logic) ---
        stdout_line_count = 0
        stdout_bytes_read = 0
        stdout_truncated = False
        if proc.stdout:
            while True:
                # Check timeout explicitly within the loop
                if time.monotonic() - start_time > timeout:
                    raise asyncio.TimeoutError()

                try:
                    # Use readline with a timeout? Less efficient but prevents hangs.
                    # Or rely on overall wait_for timeout below.
                    line_bytes = await proc.stdout.readline()
                    if not line_bytes:
                        break  # End of stdout stream

                    stdout_bytes_read += len(line_bytes)
                    if stdout_bytes_read > MAX_OUTPUT_BYTES:
                        if not stdout_truncated:  # Add marker only once
                            yield "...(stdout truncated)\n"
                            stdout_truncated = True
                        continue  # Stop yielding lines but keep reading to drain pipe

                    if not stdout_truncated:
                        # Decode and yield the line
                        yield line_bytes.decode("utf-8", errors="replace")
                        stdout_line_count += 1

                except asyncio.TimeoutError as e:  # Catch timeout during readline if applied
                    raise asyncio.TimeoutError() from e  # Re-raise to be caught by outer handler
                except Exception as e:
                    logger.warning(
                        f"Stream: Error reading stdout line for '{cmd_name}' (pid {proc.pid}): {e}"
                    )
                    # Yield an error marker in the stream
                    yield f"##STREAM_STDOUT_READ_ERROR: {e}\n"
                    break  # Stop reading stdout on error
        else:
            logger.warning(
                f"Stream: Process for '{cmd_name}' has no stdout stream.",
                extra={"command": cmd_name},
            )

        # --- Wait for process and stderr/stdin tasks to complete ---
        # Wait for the process itself, applying the main timeout
        try:
            # Wait slightly longer than the timeout to allow for cleanup/exit signals
            exit_code = await asyncio.wait_for(proc.wait(), timeout=timeout + 5.0)
        except asyncio.TimeoutError as e:
            # If proc.wait() times out, it means the process didn't exit within timeout+5s
            # Re-raise the specific timeout error defined earlier
            raise ToolExecutionError(
                f"'{cmd_name}' stream process failed to exit within timeout of {timeout}s (+5s buffer).",
                error_code=ToolErrorCode.TIMEOUT,
                details={"timeout": timeout},
            ) from e

        # Ensure IO tasks have finished (they should have if process exited, but wait briefly)
        try:
            await asyncio.wait_for(stdin_writer, timeout=1.0)
            await asyncio.wait_for(stderr_reader, timeout=1.0)
        except asyncio.TimeoutError:
            logger.warning(
                f"Stream: IO tasks for '{cmd_name}' (pid {proc.pid}) did not complete quickly after process exit."
            )

        # --- Check final status ---
        # Per-command acceptable exit codes (e.g. rg's "no match") come from RETCODE_OK.
        duration = time.monotonic() - start_time
        retcode_ok_map = RETCODE_OK.get(cmd_name, {0})
        success = exit_code in retcode_ok_map
        stderr_full = "".join(stderr_lines)

        if not success:
            stderr_snip = textwrap.shorten(stderr_full.strip(), 150, placeholder="...")
            error_msg = f"Command '{cmd_name}' stream finished with failure exit code {exit_code}. Stderr: '{stderr_snip}'"
            logger.warning(
                error_msg,
                extra={"command": cmd_name, "exit_code": exit_code, "stderr": stderr_full},
            )
            # Raise error *after* iteration completes to signal failure to the caller
            raise ToolExecutionError(
                error_msg,
                error_code=ToolErrorCode.EXEC_ERROR,
                details={"exit_code": exit_code, "stderr": stderr_full},
            )
        else:
            logger.info(
                f"Stream: '{cmd_name}' (pid {proc.pid}) finished successfully in {duration:.3f}s (code={exit_code}, stdout_lines={stdout_line_count}, truncated={stdout_truncated})"
            )

    except asyncio.TimeoutError as e:
        # This catches the explicit timeout check within the stdout loop or the wait_for on proc.wait()
        # Escalation: terminate -> wait 1s -> kill -> wait.
        logger.warning(
            f"Command '{cmd_name}' stream timed out after ~{timeout}s. Terminating.",
            extra={"command": cmd_name, "timeout": timeout},
        )
        if proc and proc.returncode is None:
            try:
                proc.terminate()
                await asyncio.wait_for(proc.wait(), timeout=1.0)
            except asyncio.TimeoutError:
                logger.warning(f"Killing unresponsive stream process {proc.pid}")
                proc.kill()
                await proc.wait()
            except ProcessLookupError:
                pass
            except Exception as term_err:
                logger.error(f"Error killing stream process {proc.pid}: {term_err}")
        # Raise the standard execution error for timeout
        raise ToolExecutionError(
            f"'{cmd_name}' stream exceeded timeout of {timeout}s.",
            error_code=ToolErrorCode.TIMEOUT,
            details={"timeout": timeout},
        ) from e
    except (ToolInputError, ToolExecutionError) as e:
        # Propagate specific errors raised during setup or validation
        raise e
    except FileNotFoundError as e:
        logger.error(
            f"Stream: Command '{cmd_name}' not found at path: {cmdline[0]}. Ensure it's installed and in PATH.",
            exc_info=True,
        )
        raise ToolExecutionError(
            f"Stream: Command '{cmd_name}' executable not found.",
            error_code=ToolErrorCode.CMD_NOT_FOUND,
        ) from e
    except PermissionError as e:
        logger.error(
            f"Stream: Permission denied executing command '{cmd_name}' at path: {cmdline[0]}.",
            exc_info=True,
        )
        raise ToolExecutionError(
            f"Stream: Permission denied executing '{cmd_name}'. Check file permissions.",
            error_code=ToolErrorCode.EXEC_ERROR,
        ) from e
    except Exception as e:
        logger.error(
            f"Stream: Unexpected error during command stream '{cmd_name}': {e}", exc_info=True
        )
        raise ToolExecutionError(
            f"Unexpected failure during '{cmd_name}' stream: {e}",
            error_code=ToolErrorCode.UNEXPECTED_FAILURE,
        ) from e
    finally:
        # Final cleanup check for the process in case of unexpected exit from the try block
        # (e.g. the consumer abandoned the generator mid-iteration).
        if proc and proc.returncode is None:
            logger.warning(
                f"Stream: Process {proc.pid} for '{cmd_name}' still running after stream processing finished unexpectedly, killing."
            )
            try:
                proc.kill()
                await proc.wait()
            except ProcessLookupError:
                pass
            except Exception as final_kill_err:
                logger.error(
                    f"Stream: Error killing process {proc.pid} in finally block: {final_kill_err}"
                )


# --- run_ripgrep_stream ---
@with_tool_metrics
@with_error_handling
async def run_ripgrep_stream(
    args_str: str,
    *,
    input_data: Optional[str] = None,
    input_file: Optional[bool] = False,
    input_dir: Optional[bool] = False,
    timeout: float = DEFAULT_TIMEOUT,
) -> AsyncIterator[str]:
    """
    Streaming variant of `run_ripgrep`: yields `rg` stdout one line at a time.

    Intended for searches whose output would be too large to buffer. All
    argument semantics and security enforcement are identical to `run_ripgrep`.

    **Note:** The yielded stream carries only stdout. Failure conditions
    (timeout, or an exit code outside the accepted set — 'no match' exit 1 is
    OK for rg) surface as a `ToolExecutionError` raised *after* the last line
    has been yielded, or immediately if setup/validation fails. Wrap the
    iteration in `try...finally` if you need cleanup on failure.

    Args:
        args_str: Command-line arguments for `rg`.
        input_data: String data piped via stdin. Mutually exclusive with the flags below.
        input_file: True when `args_str` names target file path(s).
        input_dir: True when `args_str` names target directory path(s).
        timeout: Overall time budget in seconds.

    Yields:
        str: One line of `rg` stdout per iteration.

    Raises:
        ToolInputError: Invalid arguments or security violations during setup.
        ToolExecutionError: Execution failure (timeout, bad exit code > 1) or
                            streaming I/O errors; raised after iteration ends.
    """
    _require_single_source(
        "rg (stream)", input_data=input_data, input_file=input_file, input_dir=input_dir
    )
    # Delegate to the shared secure streaming executor and relay its lines.
    line_stream = _run_command_stream(
        "rg",
        args_str,
        input_data=input_data,
        is_file_target=input_file,
        is_dir_target=input_dir,
        timeout=timeout,
    )
    async for out_line in line_stream:
        yield out_line


# --- run_awk_stream ---
@with_tool_metrics
@with_error_handling
async def run_awk_stream(
    args_str: str,
    *,
    input_data: Optional[str] = None,
    input_file: Optional[bool] = False,
    timeout: float = DEFAULT_TIMEOUT,
) -> AsyncIterator[str]:
    """
    Streaming variant of `run_awk`: yields `awk` stdout one line at a time.

    Argument semantics and security enforcement match `run_awk`. Failures
    (timeout, non-zero exit) raise `ToolExecutionError` after iteration ends.

    Args:
        args_str: Command-line arguments for `awk`.
        input_data: String data piped via stdin. Mutually exclusive with `input_file`.
        input_file: True when `args_str` names target file path(s).
        timeout: Overall time budget in seconds.

    Yields:
        str: One line of `awk` stdout per iteration.

    Raises:
        ToolInputError: Invalid arguments or security violations.
        ToolExecutionError: Execution failure or streaming I/O errors; raised after iteration.
    """
    _require_single_source(
        "awk (stream)", input_data=input_data, input_file=input_file, input_dir=False
    )
    # Relay lines from the shared secure streaming executor.
    line_stream = _run_command_stream(
        "awk",
        args_str,
        input_data=input_data,
        is_file_target=input_file,
        is_dir_target=False,
        timeout=timeout,
    )
    async for out_line in line_stream:
        yield out_line


# --- run_sed_stream ---
@with_tool_metrics
@with_error_handling
async def run_sed_stream(
    args_str: str,
    *,
    input_data: Optional[str] = None,
    input_file: Optional[bool] = False,
    timeout: float = DEFAULT_TIMEOUT,
) -> AsyncIterator[str]:
    """
    Streaming variant of `run_sed`: yields `sed` stdout one line at a time.

    Argument semantics and security enforcement match `run_sed`. Failures
    (timeout, non-zero exit) raise `ToolExecutionError` after iteration ends.

    Args:
        args_str: Command-line arguments for `sed`.
        input_data: String data piped via stdin. Mutually exclusive with `input_file`.
        input_file: True when `args_str` names the target file path.
        timeout: Overall time budget in seconds.

    Yields:
        str: One line of `sed` stdout per iteration.

    Raises:
        ToolInputError: Invalid arguments or security violations.
        ToolExecutionError: Execution failure or streaming I/O errors; raised after iteration.
    """
    _require_single_source(
        "sed (stream)", input_data=input_data, input_file=input_file, input_dir=False
    )
    # Relay lines from the shared secure streaming executor.
    line_stream = _run_command_stream(
        "sed",
        args_str,
        input_data=input_data,
        is_file_target=input_file,
        is_dir_target=False,
        timeout=timeout,
    )
    async for out_line in line_stream:
        yield out_line


# --- run_jq_stream ---
@with_tool_metrics
@with_error_handling
async def run_jq_stream(
    args_str: str,
    *,
    input_data: Optional[str] = None,
    input_file: Optional[bool] = False,
    timeout: float = DEFAULT_TIMEOUT,
) -> AsyncIterator[str]:
    """
    Streaming variant of `run_jq`: yields `jq` stdout one line at a time.

    Argument semantics and security enforcement match `run_jq`. Output lines
    are often complete JSON values (especially with `-c`). Failures (timeout,
    non-zero exit) raise `ToolExecutionError` after iteration ends.

    Args:
        args_str: Command-line arguments for `jq`.
        input_data: Valid JSON text piped via stdin. Mutually exclusive with `input_file`.
        input_file: True when `args_str` names the target JSON file path.
        timeout: Overall time budget in seconds.

    Yields:
        str: One line of `jq` stdout per iteration.

    Raises:
        ToolInputError: Non-JSON `input_data`, invalid arguments, or security violations.
        ToolExecutionError: Execution failure or streaming I/O errors; raised after iteration.
    """
    _require_single_source(
        "jq (stream)", input_data=input_data, input_file=input_file, input_dir=False
    )
    # Reject non-JSON stdin up front so we never launch jq on bad input.
    if input_data is not None and not _is_json_or_json_lines(input_data):
        raise ToolInputError(
            "input_data is not valid JSON or JSON-Lines",
            param_name="input_data",
            details={"error_code": ToolErrorCode.INVALID_JSON_INPUT.value},
        )

    # Relay lines from the shared secure streaming executor.
    line_stream = _run_command_stream(
        "jq",
        args_str,
        input_data=input_data,
        is_file_target=input_file,
        is_dir_target=False,
        timeout=timeout,
    )
    async for out_line in line_stream:
        yield out_line


# --------------------------------------------------------------------------- #
# Public API Exports
# --------------------------------------------------------------------------- #


def get_workspace_dir() -> str:
    """Return the absolute path of the workspace directory this module enforces."""
    workspace_path = WORKSPACE_DIR
    return str(workspace_path)


# Public API of this module. Consumers should import only these names;
# everything else (executors, validators, metadata tables) is internal.
__all__ = [
    # Standard (buffered) execution tools
    "run_ripgrep",
    "run_awk",
    "run_sed",
    "run_jq",
    # Streaming variants (yield stdout lines; errors raised after iteration)
    "run_ripgrep_stream",
    "run_awk_stream",
    "run_sed_stream",
    "run_jq_stream",
    # Configuration info
    "get_workspace_dir",
    # Types re-exported for consumers
    "ToolResult",
    "ToolErrorCode",
]

```
Page 21/35FirstPrevNextLast