#
tokens: 43434/50000 5/207 files (page 11/35)
lines: off (toggle) GitHub
raw markdown copy
This is page 11 of 35. Use http://codebase.md/dicklesworthstone/llm_gateway_mcp_server?page={x} to view the full context.

# Directory Structure

```
├── .cursorignore
├── .env.example
├── .envrc
├── .gitignore
├── additional_features.md
├── check_api_keys.py
├── completion_support.py
├── comprehensive_test.py
├── docker-compose.yml
├── Dockerfile
├── empirically_measured_model_speeds.json
├── error_handling.py
├── example_structured_tool.py
├── examples
│   ├── __init__.py
│   ├── advanced_agent_flows_using_unified_memory_system_demo.py
│   ├── advanced_extraction_demo.py
│   ├── advanced_unified_memory_system_demo.py
│   ├── advanced_vector_search_demo.py
│   ├── analytics_reporting_demo.py
│   ├── audio_transcription_demo.py
│   ├── basic_completion_demo.py
│   ├── cache_demo.py
│   ├── claude_integration_demo.py
│   ├── compare_synthesize_demo.py
│   ├── cost_optimization.py
│   ├── data
│   │   ├── sample_event.txt
│   │   ├── Steve_Jobs_Introducing_The_iPhone_compressed.md
│   │   └── Steve_Jobs_Introducing_The_iPhone_compressed.mp3
│   ├── docstring_refiner_demo.py
│   ├── document_conversion_and_processing_demo.py
│   ├── entity_relation_graph_demo.py
│   ├── filesystem_operations_demo.py
│   ├── grok_integration_demo.py
│   ├── local_text_tools_demo.py
│   ├── marqo_fused_search_demo.py
│   ├── measure_model_speeds.py
│   ├── meta_api_demo.py
│   ├── multi_provider_demo.py
│   ├── ollama_integration_demo.py
│   ├── prompt_templates_demo.py
│   ├── python_sandbox_demo.py
│   ├── rag_example.py
│   ├── research_workflow_demo.py
│   ├── sample
│   │   ├── article.txt
│   │   ├── backprop_paper.pdf
│   │   ├── buffett.pdf
│   │   ├── contract_link.txt
│   │   ├── legal_contract.txt
│   │   ├── medical_case.txt
│   │   ├── northwind.db
│   │   ├── research_paper.txt
│   │   ├── sample_data.json
│   │   └── text_classification_samples
│   │       ├── email_classification.txt
│   │       ├── news_samples.txt
│   │       ├── product_reviews.txt
│   │       └── support_tickets.txt
│   ├── sample_docs
│   │   └── downloaded
│   │       └── attention_is_all_you_need.pdf
│   ├── sentiment_analysis_demo.py
│   ├── simple_completion_demo.py
│   ├── single_shot_synthesis_demo.py
│   ├── smart_browser_demo.py
│   ├── sql_database_demo.py
│   ├── sse_client_demo.py
│   ├── test_code_extraction.py
│   ├── test_content_detection.py
│   ├── test_ollama.py
│   ├── text_classification_demo.py
│   ├── text_redline_demo.py
│   ├── tool_composition_examples.py
│   ├── tournament_code_demo.py
│   ├── tournament_text_demo.py
│   ├── unified_memory_system_demo.py
│   ├── vector_search_demo.py
│   ├── web_automation_instruction_packs.py
│   └── workflow_delegation_demo.py
├── LICENSE
├── list_models.py
├── marqo_index_config.json.example
├── mcp_protocol_schema_2025-03-25_version.json
├── mcp_python_lib_docs.md
├── mcp_tool_context_estimator.py
├── model_preferences.py
├── pyproject.toml
├── quick_test.py
├── README.md
├── resource_annotations.py
├── run_all_demo_scripts_and_check_for_errors.py
├── storage
│   └── smart_browser_internal
│       ├── locator_cache.db
│       ├── readability.js
│       └── storage_state.enc
├── test_client.py
├── test_connection.py
├── TEST_README.md
├── test_sse_client.py
├── test_stdio_client.py
├── tests
│   ├── __init__.py
│   ├── conftest.py
│   ├── integration
│   │   ├── __init__.py
│   │   └── test_server.py
│   ├── manual
│   │   ├── test_extraction_advanced.py
│   │   └── test_extraction.py
│   └── unit
│       ├── __init__.py
│       ├── test_cache.py
│       ├── test_providers.py
│       └── test_tools.py
├── TODO.md
├── tool_annotations.py
├── tools_list.json
├── ultimate_mcp_banner.webp
├── ultimate_mcp_logo.webp
├── ultimate_mcp_server
│   ├── __init__.py
│   ├── __main__.py
│   ├── cli
│   │   ├── __init__.py
│   │   ├── __main__.py
│   │   ├── commands.py
│   │   ├── helpers.py
│   │   └── typer_cli.py
│   ├── clients
│   │   ├── __init__.py
│   │   ├── completion_client.py
│   │   └── rag_client.py
│   ├── config
│   │   └── examples
│   │       └── filesystem_config.yaml
│   ├── config.py
│   ├── constants.py
│   ├── core
│   │   ├── __init__.py
│   │   ├── evaluation
│   │   │   ├── base.py
│   │   │   └── evaluators.py
│   │   ├── providers
│   │   │   ├── __init__.py
│   │   │   ├── anthropic.py
│   │   │   ├── base.py
│   │   │   ├── deepseek.py
│   │   │   ├── gemini.py
│   │   │   ├── grok.py
│   │   │   ├── ollama.py
│   │   │   ├── openai.py
│   │   │   └── openrouter.py
│   │   ├── server.py
│   │   ├── state_store.py
│   │   ├── tournaments
│   │   │   ├── manager.py
│   │   │   ├── tasks.py
│   │   │   └── utils.py
│   │   └── ums_api
│   │       ├── __init__.py
│   │       ├── ums_database.py
│   │       ├── ums_endpoints.py
│   │       ├── ums_models.py
│   │       └── ums_services.py
│   ├── exceptions.py
│   ├── graceful_shutdown.py
│   ├── services
│   │   ├── __init__.py
│   │   ├── analytics
│   │   │   ├── __init__.py
│   │   │   ├── metrics.py
│   │   │   └── reporting.py
│   │   ├── cache
│   │   │   ├── __init__.py
│   │   │   ├── cache_service.py
│   │   │   ├── persistence.py
│   │   │   ├── strategies.py
│   │   │   └── utils.py
│   │   ├── cache.py
│   │   ├── document.py
│   │   ├── knowledge_base
│   │   │   ├── __init__.py
│   │   │   ├── feedback.py
│   │   │   ├── manager.py
│   │   │   ├── rag_engine.py
│   │   │   ├── retriever.py
│   │   │   └── utils.py
│   │   ├── prompts
│   │   │   ├── __init__.py
│   │   │   ├── repository.py
│   │   │   └── templates.py
│   │   ├── prompts.py
│   │   └── vector
│   │       ├── __init__.py
│   │       ├── embeddings.py
│   │       └── vector_service.py
│   ├── tool_token_counter.py
│   ├── tools
│   │   ├── __init__.py
│   │   ├── audio_transcription.py
│   │   ├── base.py
│   │   ├── completion.py
│   │   ├── docstring_refiner.py
│   │   ├── document_conversion_and_processing.py
│   │   ├── enhanced-ums-lookbook.html
│   │   ├── entity_relation_graph.py
│   │   ├── excel_spreadsheet_automation.py
│   │   ├── extraction.py
│   │   ├── filesystem.py
│   │   ├── html_to_markdown.py
│   │   ├── local_text_tools.py
│   │   ├── marqo_fused_search.py
│   │   ├── meta_api_tool.py
│   │   ├── ocr_tools.py
│   │   ├── optimization.py
│   │   ├── provider.py
│   │   ├── pyodide_boot_template.html
│   │   ├── python_sandbox.py
│   │   ├── rag.py
│   │   ├── redline-compiled.css
│   │   ├── sentiment_analysis.py
│   │   ├── single_shot_synthesis.py
│   │   ├── smart_browser.py
│   │   ├── sql_databases.py
│   │   ├── text_classification.py
│   │   ├── text_redline_tools.py
│   │   ├── tournament.py
│   │   ├── ums_explorer.html
│   │   └── unified_memory_system.py
│   ├── utils
│   │   ├── __init__.py
│   │   ├── async_utils.py
│   │   ├── display.py
│   │   ├── logging
│   │   │   ├── __init__.py
│   │   │   ├── console.py
│   │   │   ├── emojis.py
│   │   │   ├── formatter.py
│   │   │   ├── logger.py
│   │   │   ├── panels.py
│   │   │   ├── progress.py
│   │   │   └── themes.py
│   │   ├── parse_yaml.py
│   │   ├── parsing.py
│   │   ├── security.py
│   │   └── text.py
│   └── working_memory_api.py
├── unified_memory_system_technical_analysis.md
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/ultimate_mcp_server/core/tournaments/tasks.py:
--------------------------------------------------------------------------------

```python
"""
Tournament task implementations for asynchronous tournament execution.
"""
# Standard Library Imports
import asyncio
import random
import re
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, Optional

from ultimate_mcp_server.core.evaluation.base import EvaluationScore
from ultimate_mcp_server.core.models.tournament import (
    ModelConfig,
    ModelResponseData,
    TournamentData,
    TournamentRoundResult,
    TournamentStatus,
)
from ultimate_mcp_server.core.tournaments.manager import tournament_manager
from ultimate_mcp_server.core.tournaments.utils import (
    calculate_weighted_score,
    create_round_prompt,
    extract_thinking,
    generate_comparison_file_content,
    generate_leaderboard_file_content,
    save_model_response_content,
    update_overall_best_response,
)
from ultimate_mcp_server.tools.completion import generate_completion
from ultimate_mcp_server.tools.extraction import extract_code_from_response
from ultimate_mcp_server.utils.logging import get_logger

logger = get_logger("ultimate_mcp_server.tournaments.tasks")

# --- Global semaphore for concurrent model calls ---
MODEL_CALL_SEMAPHORE: Optional[asyncio.Semaphore] = None

def initialize_semaphore(max_concurrent_calls: int):
    global MODEL_CALL_SEMAPHORE
    MODEL_CALL_SEMAPHORE = asyncio.Semaphore(max_concurrent_calls)
    logger.info(f"Tournament task semaphore initialized with concurrency: {max_concurrent_calls}")

async def run_tournament_async(tournament_id: str):
    """Main async task to orchestrate the entire tournament."""
    await asyncio.sleep(0.1) # Small delay for state propagation
    
    tournament = tournament_manager.get_tournament(tournament_id, force_reload=True)
    if not tournament:
        logger.error(f"[TASK_ERROR] Tournament {tournament_id} not found for execution.")
        return

    if tournament.status != TournamentStatus.RUNNING: # Check if it was set to RUNNING
        logger.warning(f"[TASK_WARN] Tournament {tournament_id} not in RUNNING state ({tournament.status}). Aborting task.")
        return

    # --- Initialize semaphore based on tournament config ---
    if MODEL_CALL_SEMAPHORE is None or MODEL_CALL_SEMAPHORE._value != tournament.config.max_concurrent_model_calls:
        initialize_semaphore(tournament.config.max_concurrent_model_calls)

    logger.info(f"[TASK_START] Starting execution for tournament '{tournament.name}' (ID: {tournament_id})")

    try:
        if tournament.current_round < 0: # If just started
            tournament.current_round = 0 
        
        while tournament.current_round < tournament.config.rounds:
            # --- Check for cancellation before starting a round ---
            current_tournament_state = tournament_manager.get_tournament(tournament_id, force_reload=True)
            if not current_tournament_state or current_tournament_state.status == TournamentStatus.CANCELLED:
                logger.info(f"[TASK_CANCEL] Tournament {tournament_id} cancelled. Halting execution.")
                if current_tournament_state and current_tournament_state.status != TournamentStatus.CANCELLED: # Ensure it's marked
                     tournament_manager.update_tournament_status(tournament_id, TournamentStatus.CANCELLED, "Cancelled during execution.")
                return

            round_num = tournament.current_round
            logger.info(f"[ROUND_START] Processing Round {round_num}/{tournament.config.rounds -1 } for '{tournament.name}'")
            
            round_result_obj = tournament.rounds_results[round_num] # Assumes initialized by manager
            round_result_obj.status = TournamentStatus.RUNNING
            round_result_obj.start_time = datetime.now(timezone.utc)
            tournament_manager._save_tournament_state(tournament)

            await process_single_round(tournament, round_num, round_result_obj)

            round_result_obj.status = TournamentStatus.COMPLETED # Mark round as completed
            round_result_obj.end_time = datetime.now(timezone.utc)
            tournament_manager._save_tournament_state(tournament)
            logger.info(f"[ROUND_END] Round {round_num} for '{tournament.name}' completed.")

            # --- Update overall best response after each round ---
            update_overall_best_response(tournament) # Utility function to find and set best
            tournament_manager._save_tournament_state(tournament)

            tournament.current_round += 1
            tournament_manager._save_tournament_state(tournament) # Save progress

        tournament.status = TournamentStatus.COMPLETED
        logger.info(f"[TASK_COMPLETE] Tournament '{tournament.name}' (ID: {tournament_id}) completed successfully.")

    except Exception as e:
        logger.error(f"[TASK_FAILURE] Tournament '{tournament.name}' failed: {e}", exc_info=True)
        tournament.status = TournamentStatus.FAILED
        tournament.error_message = str(e)
    finally:
        tournament.end_time = datetime.now(timezone.utc)
        tournament_manager._save_tournament_state(tournament)
        logger.info(f"Final state saved for tournament {tournament_id}. Status: {tournament.status}")


async def process_single_round(tournament: TournamentData, round_num: int, round_result_obj: TournamentRoundResult):
    """Processes all model variants for a single round."""
    
    # Determine previous responses for synthesis rounds > 0
    previous_round_variant_responses: Dict[str, ModelResponseData] = {}
    if round_num > 0:
        prev_round_idx = round_num - 1
        if prev_round_idx < len(tournament.rounds_results):
            previous_round_result = tournament.rounds_results[prev_round_idx]
            previous_round_variant_responses = previous_round_result.responses # These are already ModelResponseData objects
        else:
            logger.warning(f"Could not find previous round {prev_round_idx} data for round {round_num}. Proceeding without it.")
    
    tasks = []
    for model_cfg in tournament.config.models:
        for i in range(model_cfg.diversity_count):
            variant_id = f"{model_cfg.model_id}/v{i}"
            
            # --- Check for cancellation before each model task ---
            current_tournament_state = tournament_manager.get_tournament(tournament.tournament_id, force_reload=True)
            if not current_tournament_state or current_tournament_state.status == TournamentStatus.CANCELLED:
                logger.info(f"[MODEL_TASK_CANCEL] Cancellation detected for tournament {tournament.tournament_id}. Skipping variant {variant_id}.")
                continue # Skip remaining tasks in this round

            # Skip if already processed (e.g., resuming a failed round)
            if variant_id in round_result_obj.responses and round_result_obj.responses[variant_id].response_text:
                logger.info(f"Variant {variant_id} for round {round_num} already processed. Skipping.")
                continue

            tasks.append(
                process_single_model_variant(
                    tournament,
                    model_cfg,
                    variant_id, # Pass the unique variant ID
                    round_num,
                    round_result_obj,
                    previous_round_variant_responses # Pass full ModelResponseData dict
                )
            )
    
    if not tasks:
        logger.info(f"No new model variants to process for round {round_num}.")
        round_result_obj.status = TournamentStatus.COMPLETED
        return

    logger.info(f"Gathering {len(tasks)} model variant tasks for round {round_num}.")
    # Await all tasks and catch any unhandled exceptions
    try:
        await asyncio.gather(*tasks)
    except Exception as e:
        logger.error(f"An error occurred during asyncio.gather in round {round_num}: {e}", exc_info=True)
        round_result_obj.error_message = (getattr(round_result_obj, 'error_message', '') or "") + f"; Error during task gathering: {str(e)}"
        round_result_obj.status = TournamentStatus.FAILED # Mark round as failed if gather fails
        # Individual task errors are handled within process_single_model_variant
        return

    # --- Generate comparison and leaderboard files ---
    # (Ensure these utils exist and are updated)
    comparison_md = generate_comparison_file_content(tournament, round_num)
    leaderboard_md = generate_leaderboard_file_content(tournament, round_num) # New utility

    round_storage_path = Path(tournament.storage_path) / f"round_{round_num}"
    round_storage_path.mkdir(parents=True, exist_ok=True)

    if comparison_md:
        comp_path = round_storage_path / "round_comparison_report.md"
        comp_path.write_text(comparison_md, encoding='utf-8')
        round_result_obj.comparison_file_path = str(comp_path)
    if leaderboard_md:
        lead_path = round_storage_path / "round_leaderboard.md"
        lead_path.write_text(leaderboard_md, encoding='utf-8')
        round_result_obj.leaderboard_file_path = str(lead_path)
    
    # Save state after generating reports
    tournament_manager._save_tournament_state(tournament)


async def process_single_model_variant(
    tournament: TournamentData,
    model_config: "ModelConfig", # Forward ref as string, ModelConfig is imported
    variant_id: str, # e.g., "openai/gpt-4o/v0"
    round_num: int,
    round_result_obj: TournamentRoundResult,
    previous_round_variant_responses: Dict[str, ModelResponseData]
):
    """Processes a single model variant (handles diversity), including retries and evaluation."""
    
    # --- Acquire semaphore ---
    if MODEL_CALL_SEMAPHORE: # Should always be initialized
      await MODEL_CALL_SEMAPHORE.acquire()
    
    response_data = ModelResponseData(
        model_id_original=model_config.model_id,
        model_id_variant=variant_id,
        round_num=round_num
    )
    task_start_time = time.monotonic()
    
    # --- Prepare storage paths if needed (handled in save_model_response_content) ---
    
    try:
        # --- Check for cancellation ---
        current_tournament_state = tournament_manager.get_tournament(tournament.tournament_id, force_reload=True)
        if not current_tournament_state or current_tournament_state.status == TournamentStatus.CANCELLED:
            response_data.error = "Tournament cancelled before model execution."
            logger.info(f"Model task {variant_id} skipped due to tournament cancellation.")
            raise asyncio.CancelledError("Tournament cancelled")


        prompt = create_round_prompt(
            tournament, 
            round_num, 
            previous_round_variant_responses,
            target_model_variant_id=variant_id # For personalized prompts if needed
        )

        # --- LLM Call with Retries ---
        current_attempt = 0
        llm_response_dict = None
        while current_attempt <= tournament.config.max_retries_per_model_call:
            try:
                logger.info(f"[MODEL_CALL_START] Attempt {current_attempt+1}/{tournament.config.max_retries_per_model_call+1} for {variant_id}, Round {round_num}")
                
                provider_id = model_config.model_id.split('/')[0] if '/' in model_config.model_id else None
                
                # Parameters that are direct arguments to the generate_completion tool
                tool_direct_params = {
                    "prompt": prompt,
                    "model": model_config.model_id, # Use original model_id for API call
                    "provider": provider_id,
                    "temperature": model_config.temperature,
                    # max_tokens is added conditionally below
                }
                if model_config.max_tokens is not None:
                    tool_direct_params["max_tokens"] = model_config.max_tokens

                # Parameters that should be passed via the 'additional_params' argument of the tool
                tool_additional_params = {}
                if model_config.system_prompt is not None:
                    tool_additional_params["system_prompt"] = model_config.system_prompt
                if model_config.seed is not None:
                    tool_additional_params["seed"] = model_config.seed
                # Example: if model_config had top_p, it would be added here too:
                # if hasattr(model_config, 'top_p') and model_config.top_p is not None:
                #    tool_additional_params["top_p"] = model_config.top_p

                llm_response_dict = await generate_completion(
                    **tool_direct_params,
                    additional_params=tool_additional_params
                )
                
                if llm_response_dict.get("success"):
                    logger.info(f"[MODEL_CALL_SUCCESS] {variant_id} successful on attempt {current_attempt+1}")
                    break # Success, exit retry loop
                else:
                    error_msg = llm_response_dict.get("error", "Unknown LLM error")
                    logger.warning(f"Attempt {current_attempt+1} for {variant_id} failed: {error_msg}")
                    if current_attempt == tournament.config.max_retries_per_model_call:
                        raise RuntimeError(f"LLM call failed after max retries: {error_msg}")
            
            except Exception as e: # Catch exceptions from generate_completion itself
                logger.warning(f"Exception on attempt {current_attempt+1} for {variant_id}: {e}")
                if current_attempt == tournament.config.max_retries_per_model_call:
                    raise RuntimeError(f"LLM call failed after max retries (exception): {e}") from e
            
            current_attempt += 1
            # Decorrelated jitter backoff
            sleep_time = random.uniform(
                tournament.config.retry_backoff_base_seconds, 
                tournament.config.retry_backoff_base_seconds * 1.5 * (2 ** (current_attempt -1))
            )
            # Max sleep to prevent overly long waits
            sleep_time = min(sleep_time, 30.0) # e.g., max 30s backoff
            logger.info(f"Retrying {variant_id} in {sleep_time:.2f} seconds...")
            await asyncio.sleep(sleep_time)
        
        # --- Process Successful LLM Response ---
        response_data.response_text = llm_response_dict.get("text", "")
        response_data.metrics.update({
            "input_tokens": llm_response_dict.get("tokens", {}).get("input"),
            "output_tokens": llm_response_dict.get("tokens", {}).get("output"),
            "cost": llm_response_dict.get("cost", 0.0),
            "latency_ms": int(llm_response_dict.get("processing_time", 0) * 1000),
            "api_model_id_used": llm_response_dict.get("model", model_config.model_id)
        })

        response_data.thinking_process = await extract_thinking(response_data.response_text)
        
        if tournament.config.tournament_type == "code":
            # Use the tool function for extraction
            extracted_code_string = await extract_code_from_response(
                response_text=response_data.response_text,
                model=tournament.config.extraction_model_id # Pass extraction_model_id as the model for extraction
                # timeout parameter uses its default from extract_code_from_response
            )
            if extracted_code_string: # Check if a non-empty string was returned
                 response_data.extracted_code = extracted_code_string.strip()
            else:
                 logger.warning(f"Code extraction returned empty or failed for {variant_id}. Original response length: {len(response_data.response_text or '')}")
                 response_data.extracted_code = None # Explicitly set to None on failure or empty string

        # --- Save response content ---
        # (This util saves the main readable MD and potentially the raw code file)
        saved_paths = await save_model_response_content(
            tournament_storage_path=Path(tournament.storage_path),
            round_num=round_num,
            variant_id=variant_id, # Use variant_id for unique filenames
            response_text=response_data.response_text,
            extracted_code=response_data.extracted_code,
            thinking_process=response_data.thinking_process,
            metrics=response_data.metrics,
            tournament_type=tournament.config.tournament_type
        )
        response_data.response_file_path = saved_paths.get("markdown_file")
        response_data.extracted_code_file_path = saved_paths.get("code_file")

        # --- Run Evaluations ---
        evaluators = tournament_manager.get_evaluators_for_tournament(tournament.tournament_id)
        if evaluators:
            logger.info(f"Running {len(evaluators)} evaluators for {variant_id}...")
            for evaluator_instance in evaluators:
                eval_config = next((e for e in tournament.config.evaluators if e.evaluator_id == evaluator_instance.config.get("evaluator_id_ref", evaluator_instance.evaluator_type)), None) # Find original config for ID

                eval_id_for_scores = eval_config.evaluator_id if eval_config else evaluator_instance.evaluator_type

                try:
                    eval_score_obj = await evaluator_instance.score(
                        response_data, # Pass the full ModelResponseData
                        tournament.config.prompt,
                        tournament.config.tournament_type
                    )
                    response_data.scores[eval_id_for_scores] = eval_score_obj.model_dump() # Store full score object
                    logger.debug(f"Evaluator '{eval_id_for_scores}' score for {variant_id}: {eval_score_obj.score}")
                except Exception as eval_e:
                    logger.error(f"Evaluator '{eval_id_for_scores}' failed for {variant_id}: {eval_e}", exc_info=True)
                    response_data.scores[eval_id_for_scores] = EvaluationScore(score=0.0, details=f"Evaluation error: {str(eval_e)}").model_dump()
            
            # Calculate overall weighted score
            response_data.overall_score = calculate_weighted_score(response_data.scores, tournament.config.evaluators)


    except asyncio.CancelledError: # Handle task cancellation gracefully
        logger.info(f"Task for {variant_id} in round {round_num} was cancelled.")
        response_data.error = "Task cancelled."
        response_data.metrics["final_status"] = "cancelled"
    except Exception as e:
        logger.error(f"[MODEL_TASK_FAILURE] Error processing {variant_id}: {e}", exc_info=True)
        response_data.error = str(e)
        response_data.metrics["final_status"] = "failed"
    finally:
        response_data.metrics["total_task_time_ms"] = int((time.monotonic() - task_start_time) * 1000)
        # --- Add response to the round_result_obj (which is part of tournament state) ---
        # This needs to be thread-safe if multiple tasks could update this concurrently,
        # but asyncio tasks run on a single thread, so direct assignment is fine here.
        # The `tournament` object itself is shared, so saving it needs care.
        round_result_obj.responses[variant_id] = response_data
        
        # Defer saving the full tournament state to the calling round processor
        # to batch saves, but log that this variant is done.
        logger.info(f"Finished processing variant {variant_id}. Error: {response_data.error is not None}")
        
        # --- Release semaphore ---
        if MODEL_CALL_SEMAPHORE:
          MODEL_CALL_SEMAPHORE.release()

async def process_single_model(
    model_id: str,
    prompt: str,
    tournament_id: str,
    round_num: int,
    is_code_tournament: bool,
    extraction_model_id: Optional[str] = None
) -> ModelResponseData:
    """
    Handles the logic for calling a single model provider using the generate_completion tool.
    """
    start_time = time.monotonic()
    logger.info(f"[MODEL TASK] Processing model {model_id} for round {round_num}")
    
    # Get tournament to access storage path
    tournament = tournament_manager.get_tournament(tournament_id)
    if not tournament:
        raise ValueError(f"Tournament {tournament_id} not found")
    
    # Setup storage paths
    round_storage_path = Path(tournament.storage_path) / f"round_{round_num}"
    round_storage_path.mkdir(exist_ok=True, parents=True)
    
    response_data = ModelResponseData(
        model_id_original=model_id,
        model_id_variant=model_id, # In this context, variant is the same as original
        round_num=round_num
    )
    extracted_code: Optional[str] = None  # noqa: F841
    file_extension = ".py" if is_code_tournament else ".md"
    
    provider_name = model_id.split('/')[0] if '/' in model_id else None # Infer provider from model_id if possible
    if not provider_name:
        logger.warning(f"[MODEL TASK] Could not infer provider from model_id: {model_id}. Attempting call without explicit provider.")
        # Note: generate_completion might fail if provider isn't specified and cannot be inferred

    try:
        # Use generate_completion tool
        logger.info(f"[MODEL TASK] Calling generate_completion for model {model_id} with prompt length {len(prompt)}")
        # Log prompt preview
        preview_length = 100
        prompt_preview = prompt[:preview_length] + "..." if len(prompt) > preview_length else prompt
        logger.info(f"[MODEL TASK] Prompt preview: {prompt_preview}")

        # Call the tool function directly
        completion_result_dict = await generate_completion(
            prompt=prompt,
            model=model_id, # Pass the full model ID
            provider=provider_name # Pass inferred provider
            # Add other params like max_tokens, temperature if needed/available in TournamentConfig
        )
        
        # Check for success
        if not completion_result_dict.get("success"):
            error_msg = completion_result_dict.get("error", "generate_completion tool indicated failure")
            raise RuntimeError(f"Completion failed for {model_id}: {error_msg}")

        # Extract data from the dictionary returned by the tool
        response_text = completion_result_dict.get("text", "")
        actual_model_used = completion_result_dict.get("model", model_id) # Use actual model if returned
        token_info = completion_result_dict.get("tokens", {})
        cost = completion_result_dict.get("cost", 0.0)
        processing_time_sec = completion_result_dict.get("processing_time", 0.0)
        latency_ms = int(processing_time_sec * 1000)

        # Log response preview
        response_preview = response_text[:preview_length] + "..." if len(response_text) > preview_length else response_text
        logger.info(f"[MODEL TASK] Response preview for {actual_model_used}: {response_preview}")

        # Extract metrics from the tool result
        completion_metrics = {
            "input_tokens": token_info.get("input"),
            "output_tokens": token_info.get("output"),
            "cost": cost,
            "latency_ms": latency_ms, # Use processing_time from tool
            "api_model_id_used": actual_model_used # Store the actual model ID used by the API
        }

        # Process response - use async extract_thinking
        thinking = await extract_thinking(response_text)
        code_metrics = {} # Placeholder for potential future code analysis metrics

        # Save response to file with better naming pattern
        timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
        safe_model_id = re.sub(r'[^a-zA-Z0-9_\-.]', '_', actual_model_used) # Use actual model name
        safe_tournament_id = re.sub(r'[^a-zA-Z0-9_\-.]', '_', tournament_id)

        filename_base = f"tournament_{safe_tournament_id}_round-{round_num}_model-{safe_model_id}_{timestamp}"
        raw_response_path = round_storage_path / f"{filename_base}{file_extension}"

        raw_response_path.write_text(response_text or "", encoding="utf-8")

        # Create a more user-friendly version with added context
        readable_content = f"""# Tournament Response
**Tournament ID:** {tournament_id}
**Round:** {round_num}
**Model (Configured):** {model_id}
**Model (Actual API):** {actual_model_used}
**Timestamp:** {datetime.now().isoformat()}
**Tokens:** {completion_metrics.get('input_tokens', 'N/A')} in, {completion_metrics.get('output_tokens', 'N/A')} out
**Cost:** ${completion_metrics.get('cost', 0.0):.6f}
**Latency:** {completion_metrics.get('latency_ms', 'N/A')}ms

## Prompt
```
{prompt}
```

## Response
```
{response_text}
```
"""
        readable_path = round_storage_path / f"{filename_base}_readable{file_extension}"
        readable_path.write_text(readable_content, encoding="utf-8")

        logger.info(f"[MODEL TASK] Saved response to: {readable_path}")

        # Populate response data
        # model_id_original and model_id_variant are already set
        response_data.response_text = response_text
        response_data.thinking_process = thinking
        response_data.metrics = {**completion_metrics, **code_metrics}
        response_data.timestamp = datetime.now(timezone.utc)
        response_data.response_file_path = str(raw_response_path) # Store path to raw response
        response_data.metrics["total_processing_time_ms"] = int((time.monotonic() - start_time) * 1000) # Keep overall task time

        logger.info(f"[MODEL TASK] Finished processing model {actual_model_used} for round {round_num} in {response_data.metrics['total_processing_time_ms']}ms")

    except Exception as e:
        logger.error(f"[MODEL TASK] Error processing model {model_id}: {e}", exc_info=True)
        response_data.error = str(e)
    
    return response_data

async def run_single_round_task(tournament_id: str, round_num: int):
    """
    Task that runs a single round of the tournament, including LLM calls.
    """
    logger.info(f"[ROUND TASK START] Running round {round_num} for tournament {tournament_id}")
    tournament = tournament_manager.get_tournament(tournament_id, force_reload=True)
    
    # --- Check if tournament exists or was cancelled before proceeding --- 
    if not tournament:
        logger.error(f"[ROUND TASK FAIL] Tournament {tournament_id} not found at start of round {round_num}.")
        return
    if tournament.status == TournamentStatus.CANCELLED:
        logger.info(f"[ROUND TASK ABORT] Tournament {tournament_id} was cancelled. Stopping round {round_num}.")
        # Ensure round status reflects cancellation if it was running
        if round_num < len(tournament.rounds_results):
             round_result = tournament.rounds_results[round_num]
             if round_result.status == TournamentStatus.RUNNING:
                  round_result.status = TournamentStatus.CANCELLED
                  round_result.error = "Cancelled by user request during execution."
                  round_result.end_time = datetime.now(timezone.utc)
                  tournament_manager._save_tournament_state(tournament)
        return
    # -------------------------------------------------------------------
    
    if round_num >= len(tournament.rounds_results):
        logger.error(f"[ROUND TASK FAIL] Invalid round number {round_num} for tournament {tournament_id} state.")
        return
    
    round_result = tournament.rounds_results[round_num]
    
    try:
        # Mark round as running
        round_result.status = TournamentStatus.RUNNING
        round_result.start_time = datetime.now(timezone.utc)
        tournament_manager._save_tournament_state(tournament)
        logger.info(f"[ROUND TASK] Round {round_num} marked as running")
        
        # Get tournament config
        is_code_tournament = tournament.config.tournament_type == "code"
        extraction_model_id = tournament.config.extraction_model_id
        
        # Create prompt for this round
        prompt = create_round_prompt(tournament, round_num)
        
        # Create tasks for all configured models
        model_tasks = []
        for model_config in tournament.config.models:
            model_id = model_config.model_id
            
            # Skip if already processed
            if model_id in round_result.responses:
                logger.info(f"[ROUND TASK] Skipping already processed model {model_id}")
                continue
            
            # Add task for this model
            task = process_single_model(
                model_id=model_id,
                prompt=prompt,
                tournament_id=tournament_id,
                round_num=round_num,
                is_code_tournament=is_code_tournament,
                extraction_model_id=extraction_model_id
            )
            model_tasks.append(task)
            logger.info(f"[ROUND TASK] Added task for model {model_id}")
        
        # Exit if no tasks to run
        if not model_tasks:
            logger.info(f"[ROUND TASK] No models to process for round {round_num}")
            round_result.status = TournamentStatus.COMPLETED
            round_result.end_time = datetime.now(timezone.utc)
            tournament_manager._save_tournament_state(tournament)
            return
        
        # Run all model tasks in parallel
        logger.info(f"[ROUND TASK] Running {len(model_tasks)} model tasks in parallel")
        results = await asyncio.gather(*model_tasks, return_exceptions=True)
        
        # Process results
        for i, result in enumerate(results):
            model_id = tournament.config.models[i].model_id
            
            # Handle exceptions
            if isinstance(result, Exception):
                logger.error(f"[ROUND TASK] Error processing model {model_id}: {result}", exc_info=True)
                continue
            
            # Store result
            round_result.responses[model_id] = result
            tournament_manager._save_tournament_state(tournament)
        
        # Create comparison file
        comparison_content = generate_comparison_file_content(tournament, round_num)
        if comparison_content:
            round_dir = Path(tournament.storage_path) / f"round_{round_num}"
            round_dir.mkdir(exist_ok=True)
            comparison_file = round_dir / "model_comparison.md"
            
            with open(comparison_file, 'w', encoding='utf-8') as f:
                f.write(comparison_content)
            
            # Store the path in round results
            round_result.comparison_file_path = str(comparison_file)
            tournament_manager._save_tournament_state(tournament)
        
        # Mark round as completed
        round_result.status = TournamentStatus.COMPLETED
        round_result.end_time = datetime.now(timezone.utc)
        tournament_manager._save_tournament_state(tournament)
        logger.info(f"[ROUND TASK COMPLETE] Round {round_num} for tournament {tournament_id} completed successfully")
        
        # If this was the last round, mark the tournament as completed
        if round_num == tournament.config.rounds - 1:
            tournament.status = TournamentStatus.COMPLETED
            tournament.end_time = datetime.now(timezone.utc)
            tournament_manager._save_tournament_state(tournament)
            logger.info(f"[ROUND TASK] Tournament {tournament_id} marked as completed after final round")
    
    except Exception as e:
        logger.error(f"[ROUND TASK ERROR] Error processing round {round_num}: {e}", exc_info=True)
        round_result.status = TournamentStatus.FAILED
        round_result.error = str(e)
        round_result.end_time = datetime.now(timezone.utc)
        tournament_manager._save_tournament_state(tournament)
        
        # Mark tournament as failed
        tournament.status = TournamentStatus.FAILED
        tournament.error_message = f"Failed during round {round_num}: {str(e)}"
        tournament.end_time = datetime.now(timezone.utc)
        tournament_manager._save_tournament_state(tournament) 

async def process_model_task(
    tournament: TournamentData,
    model_id: str,
    round_num: int,
    previous_round_responses: Optional[Dict[str, str]] = None
) -> Dict[str, Any]:
    """Process a single model task for the tournament using generate_completion tool.
    
    Args:
        tournament: Tournament data
        model_id: Model to use (e.g., 'openai/gpt-4o')
        round_num: Current round number
        previous_round_responses: Previous round responses (for rounds > 0)
        
    Returns:
        Model task result with response text and metrics
    """
    start_task_time = time.monotonic()
    # Infer provider from model_id format 'provider:model_name' or 'provider/model_name'
    provider_id = None
    if ':' in model_id:
        provider_id = model_id.split(':')[0]
    elif '/' in model_id: # Keep backward compatibility if '/' is used
        provider_id = model_id.split('/')[0]
        
    if not provider_id:
         logger.warning(f"[MODEL TASK] Could not infer provider from model_id: {model_id}. Attempting call without explicit provider.")
    
    try:
        logger.info(f"[MODEL TASK] Processing model {model_id} for round {round_num} (Provider: {provider_id})")
            
        # Generate prompt based on tournament type and round
        if round_num == 0:
            prompt = tournament.config.prompt
        else:
            prompt = create_round_prompt(tournament, round_num, previous_round_responses)
        
        # Generate completion using the tool
        logger.info(f"[MODEL TASK] Calling generate_completion for model {model_id} with prompt length {len(prompt)}")
        preview_length = 100
        prompt_preview = prompt[:preview_length] + "..." if len(prompt) > preview_length else prompt
        logger.info(f"[MODEL TASK] Prompt preview: {prompt_preview}")

        completion_result_dict = await generate_completion(
            prompt=prompt,
            model=model_id,
            provider=provider_id # Pass the inferred provider
            # Add other params like max_tokens, temperature if needed/available
        )

        # Check for success
        if not completion_result_dict.get("success"):
            error_msg = completion_result_dict.get("error", "generate_completion tool indicated failure")
            raise RuntimeError(f"Completion failed for {model_id}: {error_msg}")

        # Extract data from the result dictionary
        response_text = completion_result_dict.get("text", "")
        actual_model_used = completion_result_dict.get("model", model_id)
        token_info = completion_result_dict.get("tokens", {})
        cost = completion_result_dict.get("cost", 0.0)
        processing_time_sec = completion_result_dict.get("processing_time", 0.0)

        # Log response preview
        response_preview = response_text[:preview_length] + "..." if len(response_text) > preview_length else response_text
        logger.info(f"[MODEL TASK] Response preview for {actual_model_used}: {response_preview}")

        # Extract metrics from the tool result
        completion_metrics = {
            "input_tokens": token_info.get("input"),
            "output_tokens": token_info.get("output"),
            "cost": cost,
            "processing_time_ms": int(processing_time_sec * 1000) # Use tool's processing time
        }
        
        # Extract thinking/reasoning if present - use async extract_thinking
        thinking = await extract_thinking(response_text)
        
        # Save response to a file with timestamp - use async save_model_response
        response_file = await save_model_response_content(
            tournament_storage_path=Path(tournament.storage_path),
            round_num=round_num,
            variant_id=model_id, # Use model_id for unique filenames
            response_text=response_text,
            extracted_code=None, # No extracted code for this task
            thinking_process=thinking,
            metrics=completion_metrics,
            tournament_type=tournament.config.tournament_type
        )
        
        total_task_time_ms = int((time.monotonic() - start_task_time) * 1000)
        completion_metrics["total_task_time_ms"] = total_task_time_ms # Add overall task time

        logger.info(f"[MODEL TASK] Finished processing model {actual_model_used} for round {round_num} in {total_task_time_ms}ms (LLM time: {completion_metrics['processing_time_ms']}ms)")
        
        return {
            "model_id": actual_model_used, # Return actual model used
            "response_text": response_text,
            "thinking": thinking,
            "metrics": completion_metrics,
            "response_file": str(response_file.get("markdown_file")) if isinstance(response_file, dict) else str(response_file) # Ensure path is string
        }
    except Exception as e:
        logger.error(f"[MODEL TASK] Error processing model {model_id}: {str(e)}", exc_info=True)
        total_task_time_ms = int((time.monotonic() - start_task_time) * 1000)
        return {
            "model_id": model_id,
            "error": str(e),
            "response_text": f"Error generating response: {str(e)}",
            "thinking": None,
            "metrics": {
                "error": str(e), 
                "total_task_time_ms": total_task_time_ms,
                "processing_time_ms": None # LLM call failed
            },
            "response_file": None
        } 
```

--------------------------------------------------------------------------------
/examples/text_classification_demo.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python
"""
Text classification demonstration for Ultimate MCP Server.
This example showcases the comprehensive capabilities of the text_classification tool,
demonstrating various classification strategies, multi-label vs. single-label,
hierarchical categories, and more.
"""
import asyncio
import json
import os
import sys
import time
from collections import namedtuple  # Import namedtuple
from pathlib import Path

# Add project root to path for imports when running as script
sys.path.insert(0, str(Path(__file__).parent.parent))

from rich import box
from rich.console import Console
from rich.layout import Layout
from rich.live import Live
from rich.markup import escape
from rich.panel import Panel
from rich.progress import Progress, SpinnerColumn, TextColumn
from rich.rule import Rule
from rich.table import Table
from rich.text import Text

from ultimate_mcp_server.config import get_config
from ultimate_mcp_server.constants import Provider
from ultimate_mcp_server.tools.text_classification import (
    ClassificationStrategy,
    text_classification,
)
from ultimate_mcp_server.utils import get_logger
from ultimate_mcp_server.utils.display import CostTracker  # Import CostTracker
from ultimate_mcp_server.utils.logging.console import console

# Initialize logger
logger = get_logger("example.text_classification")

# Create a separate debug console for detailed logging
debug_console = Console(stderr=True, highlight=False)

# Get configuration from centralized config system
gateway_config = get_config()
EXPORT_RESULTS = gateway_config.server.debug  # Using server.debug as a proxy for export results
RESULTS_DIR = os.path.join(gateway_config.storage_directory, "classification_results")
DEMO_TIMEOUT = 120  # Hard-coded default timeout for demo

# Cache for demonstration purposes
DEMO_RESULTS_CACHE = {}

# File paths for sample data
SAMPLE_DIR = Path(__file__).parent / "sample" / "text_classification_samples"
NEWS_SAMPLES_PATH = SAMPLE_DIR / "news_samples.txt"
PRODUCT_REVIEWS_PATH = SAMPLE_DIR / "product_reviews.txt" 
SUPPORT_TICKETS_PATH = SAMPLE_DIR / "support_tickets.txt"
EMAIL_SAMPLES_PATH = SAMPLE_DIR / "email_classification.txt"

# Create a simple structure for cost tracking from dict
TrackableResult = namedtuple("TrackableResult", ["cost", "input_tokens", "output_tokens", "provider", "model", "processing_time"])

# Helper Functions
def extract_samples_from_file(file_path):
    """Extract labeled samples from a text file."""
    with open(file_path, "r", encoding="utf-8") as file:
        content = file.read()
    
    samples = {}
    current_label = None
    current_content = []
    
    for line in content.split("\n"):
        if line.strip().endswith("SAMPLE:") or line.strip().endswith("EMAIL:") or line.strip().endswith("REVIEW:") or line.strip().endswith("ISSUE:") or line.strip().endswith("REPORT:") or line.strip().endswith("REQUEST:") or line.strip().endswith("QUESTION:"):
            # Save previous sample
            if current_label and current_content:
                samples[current_label] = "\n".join(current_content).strip()
            
            # Start new sample
            current_label = line.strip().rstrip(":")
            current_content = []
        elif line.strip() and current_label is not None:
            current_content.append(line)
    
    # Add the last sample
    if current_label and current_content:
        samples[current_label] = "\n".join(current_content).strip()
    
    return samples

def display_classification_result(result, title, text_sample=None, categories=None):
    """Display classification results in a rich formatted table."""
    # Create main table for classification results
    results_table = Table(title=title, box=box.ROUNDED, show_header=True, expand=True)
    results_table.add_column("Category", style="cyan", no_wrap=True)
    results_table.add_column("Confidence", style="green", justify="right")
    results_table.add_column("Explanation", style="white")
    
    for classification in result.get("classifications", []):
        confidence = classification.get("confidence", 0.0)
        confidence_str = f"{confidence:.4f}"
        confidence_color = "green" if confidence > 0.8 else "yellow" if confidence > 0.6 else "red"
        
        results_table.add_row(
            classification.get("category", "Unknown"),
            f"[{confidence_color}]{confidence_str}[/{confidence_color}]",
            escape(classification.get("explanation", ""))[:100] + ("..." if len(classification.get("explanation", "")) > 100 else "")
        )
    
    # Create metadata table
    meta_table = Table(show_header=False, box=None, expand=False)
    meta_table.add_column("Metric", style="cyan")
    meta_table.add_column("Value", style="white")
    meta_table.add_row("Provider", result.get("provider", "unknown"))
    meta_table.add_row("Model", result.get("model", "unknown"))
    meta_table.add_row("Processing Time", f"{result.get('processing_time', 0.0):.3f}s")
    meta_table.add_row("Input Tokens", str(result.get("tokens", {}).get("input", 0)))
    meta_table.add_row("Output Tokens", str(result.get("tokens", {}).get("output", 0)))
    meta_table.add_row("Cost", f"${result.get('cost', 0.0):.6f}")
    
    if "dominant_category" in result:
        meta_table.add_row("Dominant Category", result["dominant_category"])
    
    if "ensemble_models" in result:
        meta_table.add_row("Ensemble Models", ", ".join(result["ensemble_models"]))
        
    # Display text sample if provided
    if text_sample:
        text_panel = Panel(
            escape(text_sample[:300] + ("..." if len(text_sample) > 300 else "")),
            title="Sample Text",
            border_style="blue",
            expand=False
        )
        console.print(text_panel)
    
    # Display categories if provided
    if categories:
        cat_display = ""
        if isinstance(categories, dict):
            for parent, subcats in categories.items():
                cat_display += f"- {parent}\n"
                for sub in subcats:
                    cat_display += f"  - {parent}/{sub}\n"
        else:
            for cat in categories:
                cat_display += f"- {cat}\n"
        
        cat_panel = Panel(
            cat_display.strip(),
            title="Classification Categories",
            border_style="green",
            expand=False
        )
        console.print(cat_panel)
    
    # Display results and metadata
    console.print(results_table)
    console.print(meta_table)

async def demo_basic_classification(tracker: CostTracker): # Add tracker
    """Demonstrate basic single-label classification with zero-shot."""
    console.print(Rule("[bold blue]Basic Text Classification Demo[/bold blue]"))
    logger.info("Starting basic classification demo", emoji_key="start")
    
    # Load news samples
    news_samples = extract_samples_from_file(NEWS_SAMPLES_PATH)
    
    # Simple categories for news classification
    categories = [
        "Technology",
        "Sports",
        "Politics",
        "Health",
        "Entertainment",
        "Science",
        "Business",
        "Education"
    ]
    
    # Select a sample
    sample_key = "TECH NEWS SAMPLE"
    sample_text = news_samples[sample_key]
    
    logger.info(f"Classifying a {sample_key} with zero-shot strategy", emoji_key="processing")
    
    # Run classification
    start_time = time.time()
    with Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        console=console
    ) as progress:
        progress.add_task("Classifying text...", total=None)
        result = await text_classification(
            text=sample_text,
            categories=categories,
            provider=Provider.OPENAI.value,
            model="gpt-3.5-turbo",  # Using a simpler model for basic demo
            multi_label=False,
            confidence_threshold=0.5,
            strategy=ClassificationStrategy.ZERO_SHOT,
            explanation_detail="brief"
        )
    
    # Track cost if possible
    if all(k in result for k in ["cost", "provider", "model"]) and "tokens" in result:
        try:
            trackable = TrackableResult(
                cost=result.get("cost", 0.0),
                input_tokens=result.get("tokens", {}).get("input", 0),
                output_tokens=result.get("tokens", {}).get("output", 0),
                provider=result.get("provider", "unknown"),
                model=result.get("model", "unknown"),
                processing_time=result.get("processing_time", 0.0)
            )
            tracker.add_call(trackable)
        except Exception as track_err:
            logger.warning(f"Could not track cost for basic classification: {track_err}", exc_info=False)

    # Record actual time (may differ from model reported time)
    elapsed_time = time.time() - start_time
    result["actual_processing_time"] = elapsed_time
    
    # Cache result for comparison
    DEMO_RESULTS_CACHE["basic"] = result
    
    # Export result if enabled
    if EXPORT_RESULTS:
        export_result("basic_classification", result, sample_text, categories)
    
    # Display result
    logger.success(f"Basic classification completed in {elapsed_time:.3f}s", emoji_key="success")
    display_classification_result(
        result,
        "Basic Single-Label Classification (Zero-Shot)",
        text_sample=sample_text,
        categories=categories
    )
    console.print()
    return True

async def demo_multi_label_classification(tracker: CostTracker): # Add tracker
    """Demonstrate multi-label classification."""
    console.print(Rule("[bold blue]Multi-Label Classification Demo[/bold blue]"))
    logger.info("Starting multi-label classification demo", emoji_key="start")
    
    # Load support ticket samples
    ticket_samples = extract_samples_from_file(SUPPORT_TICKETS_PATH)
    
    # Select a complex sample that might have multiple labels
    sample_key = "BUG REPORT"
    sample_text = ticket_samples[sample_key]
    
    # Categories for support tickets
    categories = [
        "Bug Report",
        "Feature Request",
        "Account Issue",
        "Billing Question",
        "Technical Question",
        "Security Issue",
        "Performance Problem",
        "UI/UX Feedback"
    ]
    
    logger.info("Classifying support ticket with multi-label strategy", emoji_key="processing")
    
    # Run classification
    with Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        console=console
    ) as progress:
        progress.add_task("Classifying with multiple labels...", total=None)
        result = await text_classification(
            text=sample_text,
            categories=categories,
            provider=Provider.OPENAI.value,
            model="gpt-4-mini", # Using a better model for nuanced classification
            multi_label=True,
            confidence_threshold=0.3,  # Lower threshold to catch secondary categories
            strategy=ClassificationStrategy.STRUCTURED,
            explanation_detail="brief",
            max_results=3  # Get top 3 matching categories
        )
    
    # Track cost if possible
    if all(k in result for k in ["cost", "provider", "model"]) and "tokens" in result:
        try:
            trackable = TrackableResult(
                cost=result.get("cost", 0.0),
                input_tokens=result.get("tokens", {}).get("input", 0),
                output_tokens=result.get("tokens", {}).get("output", 0),
                provider=result.get("provider", "unknown"),
                model=result.get("model", "unknown"),
                processing_time=result.get("processing_time", 0.0)
            )
            tracker.add_call(trackable)
        except Exception as track_err:
            logger.warning(f"Could not track cost for multi-label classification: {track_err}", exc_info=False)

    # Cache result for comparison
    DEMO_RESULTS_CACHE["multi_label"] = result
    
    # Display result
    logger.success("Multi-label classification completed", emoji_key="success")
    display_classification_result(
        result,
        "Multi-Label Classification (Structured)",
        text_sample=sample_text,
        categories=categories
    )
    console.print()
    return True

async def demo_hierarchical_classification(tracker: CostTracker): # Add tracker
    """Demonstrate hierarchical category classification."""
    console.print(Rule("[bold blue]Hierarchical Classification Demo[/bold blue]"))
    logger.info("Starting hierarchical classification demo", emoji_key="start")
    
    # Load product review samples
    review_samples = extract_samples_from_file(PRODUCT_REVIEWS_PATH)
    
    # Select a sample
    sample_key = "POSITIVE REVIEW"
    sample_text = review_samples[sample_key]
    
    # Hierarchical categories for product reviews
    categories = {
        "Sentiment": ["Positive", "Negative", "Neutral"],
        "Product Type": ["Electronics", "Appliance", "Clothing", "Software"],
        "Aspect": ["Performance", "Quality", "Price", "Customer Service", "Design", "Usability"]
    }
    
    logger.info("Classifying product review with hierarchical categories", emoji_key="processing")
    
    # Run classification
    with Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        console=console
    ) as progress:
        progress.add_task("Classifying with hierarchical categories...", total=None)
        result = await text_classification(
            text=sample_text,
            categories=categories,
            provider=Provider.OPENAI.value,
            model="gpt-4-mini",
            multi_label=True,  # Allow selecting one from each hierarchy
            confidence_threshold=0.6,
            strategy=ClassificationStrategy.STRUCTURED,
            explanation_detail="brief",
            taxonomy_description=(
                "This taxonomy categorizes product reviews across multiple dimensions: "
                "the sentiment (overall positivity/negativity), the type of product being discussed, "
                "and the specific aspects of the product mentioned in the review."
            )
        )
    
    # Track cost if possible
    if all(k in result for k in ["cost", "provider", "model"]) and "tokens" in result:
        try:
            trackable = TrackableResult(
                cost=result.get("cost", 0.0),
                input_tokens=result.get("tokens", {}).get("input", 0),
                output_tokens=result.get("tokens", {}).get("output", 0),
                provider=result.get("provider", "unknown"),
                model=result.get("model", "unknown"),
                processing_time=result.get("processing_time", 0.0)
            )
            tracker.add_call(trackable)
        except Exception as track_err:
            logger.warning(f"Could not track cost for hierarchical classification: {track_err}", exc_info=False)

    # Cache result for comparison
    DEMO_RESULTS_CACHE["hierarchical"] = result
    
    # Display result
    logger.success("Hierarchical classification completed", emoji_key="success")
    display_classification_result(
        result,
        "Hierarchical Multi-Label Classification",
        text_sample=sample_text,
        categories=categories
    )
    console.print()
    return True

async def demo_few_shot_classification(tracker: CostTracker): # Add tracker
    """Demonstrate few-shot learning classification."""
    console.print(Rule("[bold blue]Few-Shot Classification Demo[/bold blue]"))
    logger.info("Starting few-shot classification demo", emoji_key="start")
    
    # Load email samples
    email_samples = extract_samples_from_file(EMAIL_SAMPLES_PATH)
    
    # Select a sample to classify
    sample_key = "PHISHING EMAIL"
    sample_text = email_samples[sample_key]
    
    # Categories for email classification
    categories = [
        "Spam",
        "Phishing",
        "Promotional",
        "Informational",
        "Urgent",
        "Personal",
        "Transactional"
    ]
    
    # Create example data for few-shot learning
    examples = [
        {
            "text": email_samples["SPAM EMAIL"],
            "categories": ["Spam"]
        },
        {
            "text": email_samples["PROMOTIONAL EMAIL"],
            "categories": ["Promotional"]
        },
        {
            "text": email_samples["PERSONAL EMAIL"],
            "categories": ["Personal"]
        }
    ]
    
    logger.info("Classifying email with few-shot learning", emoji_key="processing")
    
    # Run classification
    with Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        console=console
    ) as progress:
        progress.add_task("Classifying with few-shot examples...", total=None)
        result = await text_classification(
            text=sample_text,
            categories=categories,
            provider=Provider.OPENAI.value,
            model="gpt-3.5-turbo",  # Few-shot works well with simpler models
            multi_label=False,
            confidence_threshold=0.5,
            strategy=ClassificationStrategy.FEW_SHOT,
            examples=examples,
            explanation_detail="detailed"  # More detailed explanation
        )
    
    # Track cost if possible
    if all(k in result for k in ["cost", "provider", "model"]) and "tokens" in result:
        try:
            trackable = TrackableResult(
                cost=result.get("cost", 0.0),
                input_tokens=result.get("tokens", {}).get("input", 0),
                output_tokens=result.get("tokens", {}).get("output", 0),
                provider=result.get("provider", "unknown"),
                model=result.get("model", "unknown"),
                processing_time=result.get("processing_time", 0.0)
            )
            tracker.add_call(trackable)
        except Exception as track_err:
            logger.warning(f"Could not track cost for few-shot classification: {track_err}", exc_info=False)

    # Cache result for comparison
    DEMO_RESULTS_CACHE["few_shot"] = result
    
    # Display examples provided
    example_table = Table(title="Few-Shot Examples Provided", box=box.SIMPLE)
    example_table.add_column("Example", style="cyan")
    example_table.add_column("Category", style="green")
    example_table.add_column("Text Sample", style="white", max_width=60)
    
    for i, example in enumerate(examples):
        example_table.add_row(
            f"Example {i+1}",
            ", ".join(example["categories"]),
            escape(example["text"][:100] + "...")
        )
    
    console.print(example_table)
    console.print()
    
    # Display result
    logger.success("Few-shot classification completed", emoji_key="success")
    display_classification_result(
        result,
        "Few-Shot Classification",
        text_sample=sample_text,
        categories=categories
    )
    console.print()
    return True

async def demo_ensemble_classification(tracker: CostTracker): # Add tracker
    """Demonstrate ensemble classification using multiple providers."""
    console.print(Rule("[bold blue]Ensemble Classification Demo[/bold blue]"))
    logger.info("Starting ensemble classification demo", emoji_key="start")
    
    # Load support ticket samples again but use a different one
    ticket_samples = extract_samples_from_file(SUPPORT_TICKETS_PATH)
    
    # Select a complex sample
    sample_key = "FEATURE REQUEST"
    sample_text = ticket_samples[sample_key]
    
    # Categories for support tickets (same as before)
    categories = [
        "Bug Report",
        "Feature Request",
        "Account Issue",
        "Billing Question",
        "Technical Question",
        "Security Issue",
        "Performance Problem",
        "UI/UX Feedback"
    ]
    
    # Configure ensemble with multiple models
    ensemble_config = [
        {
            "provider": Provider.OPENAI.value,
            "model": "gpt-3.5-turbo",
            "weight": 0.3,
            "params": {"temperature": 0.1}
        },
        {
            "provider": Provider.OPENAI.value,
            "model": "gpt-4-mini",
            "weight": 0.7,
            "params": {"temperature": 0.1}
        }
        # In a real-world scenario, you might include models from different providers
    ]
    
    logger.info("Classifying support ticket with ensemble strategy", emoji_key="processing")
    
    # Run classification
    with Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        console=console
    ) as progress:
        progress.add_task("Classifying with multiple models...", total=None)
        result = await text_classification(
            text=sample_text,
            categories=categories,
            provider=Provider.OPENAI.value,  # Base provider (though ensemble will use multiple)
            multi_label=True,
            confidence_threshold=0.4,
            strategy=ClassificationStrategy.ENSEMBLE,
            explanation_detail="brief",
            ensemble_config=ensemble_config,
            allow_abstain=True,
            # abstention_threshold=0.4 # Optionally set abstention threshold
        )
    
    # Track cost (The tool result should contain aggregated cost/tokens)
    if all(k in result for k in ["cost", "provider", "model"]) and "tokens" in result:
        try:
            trackable = TrackableResult(
                cost=result.get("cost", 0.0),
                input_tokens=result.get("tokens", {}).get("input", 0),
                output_tokens=result.get("tokens", {}).get("output", 0),
                provider=result.get("provider", "ensemble"), # Provider is 'ensemble'
                model=result.get("model", "ensemble"), # Model is 'ensemble'
                processing_time=result.get("processing_time", 0.0)
            )
            tracker.add_call(trackable)
        except Exception as track_err:
            logger.warning(f"Could not track cost for ensemble classification: {track_err}", exc_info=False)

    # Cache result for comparison
    DEMO_RESULTS_CACHE["ensemble"] = result
    
    # Display ensemble config
    ensemble_table = Table(title="Ensemble Configuration", box=box.SIMPLE)
    ensemble_table.add_column("Provider", style="cyan")
    ensemble_table.add_column("Model", style="green")
    ensemble_table.add_column("Weight", style="yellow")
    
    for config in ensemble_config:
        ensemble_table.add_row(
            config["provider"],
            config["model"],
            f"{config['weight']:.2f}"
        )
    
    console.print(ensemble_table)
    console.print()
    
    # Display result
    logger.success("Ensemble classification completed", emoji_key="success")
    display_classification_result(
        result,
        "Ensemble Classification",
        text_sample=sample_text,
        categories=categories
    )
    console.print()
    return True

async def demo_custom_prompt_template(tracker: CostTracker): # Add tracker
    """Demonstrate classification with a custom prompt template."""
    console.print(Rule("[bold blue]Custom Prompt Template Demo[/bold blue]"))
    logger.info("Starting custom prompt template demo", emoji_key="start")
    
    # Load news samples again but use a different one
    news_samples = extract_samples_from_file(NEWS_SAMPLES_PATH)
    
    # Select a different sample
    sample_key = "SCIENCE NEWS SAMPLE"
    sample_text = news_samples[sample_key]
    
    # Simple categories for news classification
    categories = [
        "Technology",
        "Sports",
        "Politics",
        "Health",
        "Entertainment",
        "Science",
        "Business",
        "Education"
    ]
    
    # Create a custom prompt template
    custom_template = """
You are a highly specialized news classification assistant.

I need you to analyze the following text and determine which category it belongs to:
{categories}

When classifying, consider:
- The topic and subject matter
- The terminology and jargon used
- The intended audience
- The writing style and tone

CLASSIFICATION FORMAT:
{format_instruction}

TEXT TO CLASSIFY:
```
{text}
```

Please provide your analysis now.
"""
    
    logger.info("Classifying news with custom prompt template", emoji_key="processing")
    
    # Run classification
    with Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        console=console
    ) as progress:
        progress.add_task("Classifying with custom prompt...", total=None)
        result = await text_classification(
            text=sample_text,
            categories=categories,
            provider=Provider.OPENAI.value,
            model="gpt-4-mini",
            multi_label=False,
            confidence_threshold=0.5,
            strategy=ClassificationStrategy.STRUCTURED,
            explanation_detail="detailed",
            custom_prompt_template=custom_template
        )
    
    # Track cost if possible
    if all(k in result for k in ["cost", "provider", "model"]) and "tokens" in result:
        try:
            trackable = TrackableResult(
                cost=result.get("cost", 0.0),
                input_tokens=result.get("tokens", {}).get("input", 0),
                output_tokens=result.get("tokens", {}).get("output", 0),
                provider=result.get("provider", "unknown"),
                model=result.get("model", "unknown"),
                processing_time=result.get("processing_time", 0.0)
            )
            tracker.add_call(trackable)
        except Exception as track_err:
            logger.warning(f"Could not track cost for custom prompt classification: {track_err}", exc_info=False)

    # Cache result for comparison
    DEMO_RESULTS_CACHE["custom_prompt"] = result
    
    # Display custom prompt template
    prompt_panel = Panel(
        escape(custom_template),
        title="Custom Prompt Template",
        border_style="magenta",
        expand=False
    )
    console.print(prompt_panel)
    console.print()
    
    # Display result
    logger.success("Custom prompt classification completed", emoji_key="success")
    display_classification_result(
        result,
        "Classification with Custom Prompt",
        text_sample=sample_text,
        categories=categories
    )
    console.print()
    return True

def export_result(name, result, text_sample, categories):
    """Export classification result to JSON file."""
    if not os.path.exists(RESULTS_DIR):
        os.makedirs(RESULTS_DIR)
    
    # Create timestamp for filename
    timestamp = time.strftime("%Y%m%d-%H%M%S")
    filename = f"{RESULTS_DIR}/{name}_{timestamp}.json"
    
    # Prepare data to export
    export_data = {
        "timestamp": time.time(),
        "datetime": time.strftime("%Y-%m-%d %H:%M:%S"),
        "result": result,
        "sample_text": text_sample,
        "categories": categories
    }
    
    # Write to file
    with open(filename, "w", encoding="utf-8") as f:
        json.dump(export_data, f, indent=2)
    
    logger.info(f"Exported results to {filename}", emoji_key="save")

async def demo_comparison(tracker: CostTracker):
    """Compare different classification strategies."""
    console.print(Rule("[bold blue]Classification Strategies Comparison[/bold blue]"))
    logger.info("Comparing classification strategies", emoji_key="analytics")
    
    # Check if we have all cached results
    required_demos = ["basic", "multi_label", "hierarchical", "few_shot", "ensemble", "custom_prompt"]
    missing = [demo for demo in required_demos if demo not in DEMO_RESULTS_CACHE]
    
    if missing:
        logger.warning(f"Missing results for comparison: {', '.join(missing)}", emoji_key="warning")
        console.print("[yellow]Some demo results are missing for comparison. Run all demos first.[/yellow]")
        return False
    
    # Create a comparison table
    comparison = Table(title="Classification Strategies Comparison", box=box.ROUNDED)
    comparison.add_column("Strategy", style="cyan")
    comparison.add_column("Provider/Model", style="green")
    comparison.add_column("Tokens", style="yellow")
    comparison.add_column("Processing Time", style="magenta")
    comparison.add_column("Cost", style="red")
    comparison.add_column("Notes", style="white")
    
    for strategy, result in DEMO_RESULTS_CACHE.items():
        provider_model = f"{result.get('provider', 'unknown')}/{result.get('model', 'unknown')}"
        tokens = result.get("tokens", {}).get("total", 0)
        time_taken = f"{result.get('processing_time', 0.0):.3f}s"
        cost = f"${result.get('cost', 0.0):.6f}"
        
        # Add strategy-specific notes
        notes = ""
        if strategy == "basic":
            notes = "Simple and efficient for clear categories"
        elif strategy == "multi_label":
            notes = f"Found {len(result.get('classifications', []))} categories"
        elif strategy == "hierarchical":
            notes = "Effective for multi-dimensional taxonomies"
        elif strategy == "few_shot":
            notes = "Improved accuracy with example learning"
        elif strategy == "ensemble":
            notes = f"Aggregated {len(result.get('ensemble_models', []))} models"
        elif strategy == "custom_prompt":
            notes = "Tailored instruction for specific domain"
        
        # Format strategy name for display
        strategy_display = strategy.replace("_", " ").title()
        
        comparison.add_row(strategy_display, provider_model, str(tokens), time_taken, cost, notes)
    
    console.print(comparison)
    console.print()
    
    # Generate chart data for cost comparison
    costs = {k: v.get("cost", 0.0) for k, v in DEMO_RESULTS_CACHE.items()}
    tokens = {k: v.get("tokens", {}).get("total", 0) for k, v in DEMO_RESULTS_CACHE.items()}
    times = {k: v.get("processing_time", 0.0) for k, v in DEMO_RESULTS_CACHE.items()}
    
    # Create visual dashboard of results
    display_visual_dashboard(costs, tokens, times)
    
    # Export comparison data if enabled
    if EXPORT_RESULTS:
        comparison_data = {
            "timestamp": time.time(),
            "datetime": time.strftime("%Y-%m-%d %H:%M:%S"),
            "costs": costs,
            "tokens": tokens,
            "times": times,
            "full_results": DEMO_RESULTS_CACHE
        }
        with open(f"{RESULTS_DIR}/comparison_{time.strftime('%Y%m%d-%H%M%S')}.json", "w") as f:
            json.dump(comparison_data, f, indent=2)
        logger.info(f"Exported comparison data to {RESULTS_DIR}", emoji_key="save")
    
    # Display conclusion
    conclusion_panel = Panel(
        "Classification strategies comparison shows tradeoffs between accuracy, cost, and performance.\n\n"
        "- Zero-shot: Fastest and cheapest, good for simple categories\n"
        "- Few-shot: Better accuracy with examples, moderate cost increase\n"
        "- Hierarchical: Excellent for complex taxonomies, higher token usage\n"
        "- Ensemble: Highest accuracy but also highest cost and processing time\n"
        "- Custom prompt: Tailored for specific domains, good balance of accuracy and efficiency",
        title="Strategy Selection Guidelines",
        border_style="green",
        expand=False
    )
    console.print(conclusion_panel)
    
    return True

def display_visual_dashboard(costs, tokens, times):
    """Display a visual dashboard of classification metrics using Rich Layout."""
    # Create layout
    layout = Layout()
    
    # Split into sections
    layout.split(
        Layout(name="header", size=3),
        Layout(name="main", ratio=1)
    )
    
    # Split main section into columns
    layout["main"].split_row(
        Layout(name="costs", ratio=1),
        Layout(name="tokens", ratio=1),
        Layout(name="times", ratio=1)
    )
    
    # Create header
    header = Panel(
        Text("Classification Strategy Metrics", style="bold magenta"),
        box=box.ROUNDED
    )
    
    # Create visualization panels
    costs_panel = create_metric_panel(costs, "Classification Costs ($)", "red")
    tokens_panel = create_metric_panel(tokens, "Token Usage", "yellow")
    times_panel = create_metric_panel(times, "Processing Time (s)", "green")
    
    # Update layout
    layout["header"] = header
    layout["main"]["costs"] = costs_panel
    layout["main"]["tokens"] = tokens_panel
    layout["main"]["times"] = times_panel
    
    # Display dashboard
    console.print(layout)

def create_metric_panel(data, title, color):
    """Create a panel with visualization of metric data."""
    # Find max value for scaling
    max_value = max(data.values()) if data else 1
    scale_factor = 20  # Bar length scaling
    
    # Generate content
    content = ""
    for strategy, value in data.items():
        bar_length = int((value / max_value) * scale_factor) if max_value > 0 else 0
        bar = "█" * bar_length
        strategy_display = strategy.replace("_", " ").title()
        content += f"{strategy_display.ljust(15)} │ [{color}]{bar}[/{color}] {value:.4f}\n"
    
    return Panel(content, title=title, border_style=color)

async def run_all_demos(tracker: CostTracker): # Add tracker
    """Run all classification demos in sequence."""
    console.print(Rule("[bold magenta]Text Classification Comprehensive Demo[/bold magenta]"))
    logger.info("Starting comprehensive text classification demo", emoji_key="start")
    
    start_time = time.time()
    success = True
    
    # Create results directory if exporting is enabled
    if EXPORT_RESULTS and not os.path.exists(RESULTS_DIR):
        os.makedirs(RESULTS_DIR)
        logger.info(f"Created results directory at {RESULTS_DIR}", emoji_key="folder")
    
    # Setup live display for overall progress
    overall_progress = Table.grid(expand=True)
    overall_progress.add_column()
    overall_progress.add_row("[bold blue]Running Text Classification Demo Suite...[/bold blue]")
    overall_progress.add_row("[cyan]Press Ctrl+C to abort[/cyan]")
    
    try:
        # Create a live display that updates during the demo
        with Live(overall_progress, refresh_per_second=4, console=console):
            # Run demos with timeout protection
            demo_tasks = [
                asyncio.create_task(demo_basic_classification(tracker)), # Pass tracker
                asyncio.create_task(demo_multi_label_classification(tracker)), # Pass tracker
                asyncio.create_task(demo_hierarchical_classification(tracker)), # Pass tracker
                asyncio.create_task(demo_few_shot_classification(tracker)), # Pass tracker
                asyncio.create_task(demo_ensemble_classification(tracker)), # Pass tracker
                asyncio.create_task(demo_custom_prompt_template(tracker)) # Pass tracker
            ]
            
            # Run all demos with timeout
            completed, pending = await asyncio.wait(
                demo_tasks, 
                timeout=DEMO_TIMEOUT,
                return_when=asyncio.ALL_COMPLETED
            )
            
            # Cancel any pending tasks
            for task in pending:
                task.cancel()
                overall_progress.add_row(f"[yellow]Demo timed out after {DEMO_TIMEOUT}s[/yellow]")
            
            # Compare results if we have enough demos completed
            if len(completed) >= 3:  # Require at least 3 demos for comparison
                await demo_comparison(tracker)
        
    except asyncio.CancelledError:
        logger.warning("Demo was cancelled by user", emoji_key="cancel")
        success = False
    except Exception as e:
        logger.critical(f"Text classification demo failed: {str(e)}", emoji_key="critical", exc_info=True)
        console.print(f"[bold red]Critical Demo Error:[/bold red] {escape(str(e))}")
        success = False
    
    # Calculate total time
    total_time = time.time() - start_time
    
    # Display cost summary
    tracker.display_summary(console)

    if success:
        logger.success(f"Text Classification Demo Completed Successfully in {total_time:.2f}s!", emoji_key="complete")
        console.print(Rule(f"[bold magenta]Text Classification Demo Complete ({total_time:.2f}s)[/bold magenta]"))
        return 0
    else:
        logger.error(f"Text classification demo failed after {total_time:.2f}s", emoji_key="error")
        console.print(Rule("[bold red]Text Classification Demo Failed[/bold red]"))
        return 1

async def main():
    """Run the full text classification demo suite."""
    tracker = CostTracker() # Instantiate tracker
    try:
        return await run_all_demos(tracker) # Pass tracker
    except Exception as e:
        logger.critical(f"Demo failed unexpectedly: {str(e)}", emoji_key="critical", exc_info=True)
        return 1

if __name__ == "__main__":
    # Check for environment variables and display configuration
    if EXPORT_RESULTS:
        console.print(f"[blue]Results will be exported to: {RESULTS_DIR}[/blue]")
    
    # Check if sample files exist
    if not all(path.exists() for path in [NEWS_SAMPLES_PATH, PRODUCT_REVIEWS_PATH, SUPPORT_TICKETS_PATH, EMAIL_SAMPLES_PATH]):
        console.print("[bold red]Error:[/bold red] Sample data files not found. Please ensure all sample files exist in examples/sample/text_classification_samples/")
        sys.exit(1)
        
    # Run the demo
    exit_code = asyncio.run(main())
    sys.exit(exit_code) 
```

--------------------------------------------------------------------------------
/ultimate_mcp_server/utils/text.py:
--------------------------------------------------------------------------------

```python
"""Text processing utilities for Ultimate MCP Server."""
import re
import string
from typing import Any, Dict, List, Optional

from ultimate_mcp_server.utils import get_logger

logger = get_logger(__name__)


def truncate_text(text: str, max_length: int, add_ellipsis: bool = True) -> str:
    """Intelligently truncate text to a maximum length at natural boundaries.
    
    This function shortens text to fit within a specified maximum length while
    attempting to preserve semantic coherence by cutting at natural text boundaries
    like the end of sentences or paragraphs. This produces more readable truncated
    text compared to simple character-based truncation.
    
    The truncation algorithm:
    1. If text is already shorter than max_length, return it unchanged
    2. Otherwise, truncate at max_length character position
    3. Look backwards for a natural boundary (., ?, !, or paragraph break)
    4. If a good boundary is found beyond 80% of max_length, truncate there
    5. Optionally add an ellipsis to indicate truncation has occurred
    
    This approach is useful for:
    - Creating text previews or snippets
    - Fitting text into UI components with size constraints
    - Preparing content for displays with character limits
    - Generating summaries while maintaining readability
    
    Args:
        text: Text to truncate. Can be any length, including empty.
        max_length: Maximum character length of the returned text (not including ellipsis)
        add_ellipsis: Whether to append "..." to truncated text (default: True)
        
    Returns:
        Truncated text, optionally with ellipsis. If the input text is shorter than
        max_length, it's returned unchanged. If the input is None or empty, it's returned as is.
        
    Examples:
        >>> # Basic truncation
        >>> truncate_text("This is a long sentence that needs truncation.", 20)
        'This is a long...'
        
        >>> # Truncation at sentence boundary
        >>> truncate_text("Short sentence. Another sentence. Yet another one.", 20)
        'Short sentence...'
        
        >>> # Without ellipsis
        >>> truncate_text("A very long text to truncate.", 10, add_ellipsis=False)
        'A very lon'
        
        >>> # No truncation needed
        >>> truncate_text("Short text", 20)
        'Short text'
    """
    if not text or len(text) <= max_length:
        return text
        
    # Try to truncate at sentence boundary
    truncated = text[:max_length]
    
    # Find the last sentence boundary in the truncated text
    last_boundary = max(
        truncated.rfind('. '), 
        truncated.rfind('? '), 
        truncated.rfind('! '),
        truncated.rfind('\n\n')
    )
    
    if last_boundary > max_length * 0.8:  # Only truncate at boundary if it's not too short
        truncated = truncated[:last_boundary + 1]
    
    # Add ellipsis if requested and text was truncated
    if add_ellipsis and len(text) > len(truncated):
        truncated = truncated.rstrip() + "..."
        
    return truncated


def count_tokens(text: str, model: Optional[str] = None) -> int:
    """Estimate the number of tokens in text for LLM processing.
    
    This function calculates how many tokens would be consumed when sending text
    to a language model. It uses model-specific tokenizers when possible (via tiktoken)
    for accurate counts, or falls back to a character-based heuristic estimation when
    tokenizers aren't available.
    
    Token count is important for:
    - Estimating LLM API costs (which are typically billed per token)
    - Ensuring text fits within model context windows
    - Optimizing content to minimize token usage
    - Debugging token-related issues in model interactions
    
    The function selects the appropriate tokenizer based on the model parameter:
    - For GPT-4o models: Uses the "gpt-4o" tokenizer
    - For Claude models: Uses "cl100k_base" tokenizer as an approximation
    - For other models or when model is not specified: Uses "cl100k_base" (works well for most recent models)
    
    If the tiktoken library isn't available, the function falls back to character-based
    estimation, which applies heuristics based on character types to approximate token count.
    
    Args:
        text: Text string to count tokens for. Can be any length, including empty.
        model: Optional model name to select the appropriate tokenizer. Common values include
               "gpt-4o", "gpt-4", "claude-3-5-haiku-20241022", "claude-3-sonnet", etc.
        
    Returns:
        Estimated number of tokens in the text. Returns 0 for empty input.
        
    Examples:
        >>> count_tokens("Hello, world!")  # Using default tokenizer
        3
        
        >>> count_tokens("GPT-4 is a multimodal model.", model="gpt-4o") 
        7
        
        >>> # Using fallback estimation if tiktoken is not available
        >>> # (actual result may vary based on the estimation algorithm)
        >>> count_tokens("This is a fallback example")  
        6
        
    Dependencies:
        - Requires the "tiktoken" library for accurate counting
        - Falls back to character-based estimation if tiktoken is not available
        
    Note:
        The character-based fallback estimation is approximate and may differ
        from actual tokenization, especially for non-English text, code, or 
        text with many special characters or numbers.
    """
    if not text:
        return 0
        
    # Try to use tiktoken if available (accurate for OpenAI models)
    try:
        import tiktoken
        
        # Select encoding based on model
        if model and model.startswith("gpt-4o"):
            encoding = tiktoken.encoding_for_model("gpt-4o")
        elif model and "claude" in model.lower():
            # For Claude, use cl100k_base as approximation
            encoding = tiktoken.get_encoding("cl100k_base")
        else:
            # Default to cl100k_base (used by most recent models)
            encoding = tiktoken.get_encoding("cl100k_base")
            
        return len(encoding.encode(text))
        
    except ImportError:
        # Fallback to character-based estimation if tiktoken is not available
        return _estimate_tokens_by_chars(text)


def _estimate_tokens_by_chars(text: str) -> int:
    """Estimate token count using character-based heuristics when tokenizers aren't available.
    
    This internal fallback function provides a rough approximation of token count based on
    character analysis when the preferred tokenizer-based method (via tiktoken) is not available.
    It applies various heuristics based on observed tokenization patterns across common models.
    
    The estimation algorithm works as follows:
    1. Use a base ratio of 4.0 characters per token (average for English text)
    2. Count the total number of characters in the text
    3. Apply adjustments based on character types:
       - Whitespace: Count separately and add with reduced weight (0.5)
         since whitespace is often combined with other characters in tokens
       - Digits: Count separately and subtract weight (0.5)
         since numbers are often encoded more efficiently
    4. Calculate the final token estimate based on adjusted character count
    
    While not as accurate as model-specific tokenizers, this approach provides a reasonable
    approximation that works across different languages and text types. The approximation
    tends to be more accurate for:
    - Plain English text with standard punctuation
    - Text with a typical mix of words and whitespace
    - Content with a moderate number of special characters
    
    The estimation may be less accurate for:
    - Text with many numbers or special characters
    - Code snippets or markup languages
    - Non-Latin script languages
    - Very short texts (under 10 characters)
    
    Args:
        text: Text string to estimate token count for
        
    Returns:
        Estimated number of tokens (always at least 1 for non-empty text)
        
    Note:
        This function is intended for internal use by count_tokens() as a fallback when
        tiktoken is not available. It always returns at least 1 token for any non-empty text.
    """
    # Character-based estimation (rough approximation)
    avg_chars_per_token = 4.0
    
    # Count characters
    char_count = len(text)
    
    # Account for whitespace more efficiently representing tokens
    whitespace_count = sum(1 for c in text if c.isspace())
    
    # Count numbers (numbers are often encoded efficiently)
    digit_count = sum(1 for c in text if c.isdigit())
    
    # Adjust total count based on character types
    adjusted_count = char_count + (whitespace_count * 0.5) - (digit_count * 0.5)
    
    # Estimate tokens
    return max(1, int(adjusted_count / avg_chars_per_token))


def normalize_text(
    text: str,
    lowercase: bool = True,
    remove_punctuation: bool = False,
    remove_whitespace: bool = False,
    remove_urls: bool = False,
    remove_numbers: bool = False,
) -> str:
    """Normalize text with configurable cleaning options for text processing.
    
    This function standardizes text by applying various normalization procedures
    based on the specified parameters. It's useful for preparing text for natural
    language processing tasks, text comparison, search operations, and other scenarios
    where consistent formatting is important.
    
    The function applies normalizations in a specific order to ensure consistent results:
    1. Lowercase conversion (if enabled)
    2. URL removal (if enabled)
    3. Number removal (if enabled)
    4. Punctuation removal (if enabled)
    5. Whitespace normalization (if enabled)
    
    Each normalization step is optional and controlled by a separate parameter,
    allowing precise control over the transformations applied.
    
    Args:
        text: The input text to normalize. Can be any length, including empty.
        lowercase: Whether to convert text to lowercase (default: True)
        remove_punctuation: Whether to remove all punctuation marks (default: False)
        remove_whitespace: Whether to replace all whitespace sequences (spaces, tabs,
                          newlines) with a single space and trim leading/trailing
                          whitespace (default: False)
        remove_urls: Whether to remove web URLs (http, https, www) (default: False)
        remove_numbers: Whether to remove all numeric digits (default: False)
        
    Returns:
        Normalized text with requested transformations applied. Empty input
        text is returned unchanged.
        
    Examples:
        >>> # Default behavior (only lowercase)
        >>> normalize_text("Hello World! Visit https://example.com")
        'hello world! visit https://example.com'
        
        >>> # Multiple normalizations
        >>> normalize_text("Hello, World! 123", 
        ...                lowercase=True, 
        ...                remove_punctuation=True,
        ...                remove_numbers=True)
        'hello world'
        
        >>> # URL and whitespace normalization
        >>> normalize_text("Check   https://example.com   for more info!",
        ...                remove_urls=True,
        ...                remove_whitespace=True)
        'Check for more info!'
        
    Notes:
        - Removing punctuation eliminates all symbols in string.punctuation
        - URL removal uses a regex pattern that matches common URL formats
        - When remove_whitespace is True, all sequences of whitespace are collapsed
          to a single space and leading/trailing whitespace is removed
    """
    if not text:
        return text
        
    # Convert to lowercase
    if lowercase:
        text = text.lower()
        
    # Remove URLs
    if remove_urls:
        text = re.sub(r'https?://\S+|www\.\S+', '', text)
        
    # Remove numbers
    if remove_numbers:
        text = re.sub(r'\d+', '', text)
        
    # Remove punctuation
    if remove_punctuation:
        text = text.translate(str.maketrans('', '', string.punctuation))
        
    # Normalize whitespace
    if remove_whitespace:
        text = re.sub(r'\s+', ' ', text).strip()
        
    return text


def extract_key_phrases(text: str, max_phrases: int = 5, min_word_length: int = 3) -> List[str]:
    """Extract key phrases from text using statistical methods.
    
    This function identifies significant phrases from the input text using a frequency-based
    approach. It works by normalizing the text, splitting it into sentences, extracting
    candidate noun phrases through regex pattern matching, and then ranking them by frequency.
    The approach is language-agnostic and works well for medium to large text passages.
    
    The extraction process follows these steps:
    1. Normalize the input text (lowercase, preserve punctuation, remove URLs)
    2. Split the text into individual sentences
    3. Within each sentence, identify potential noun phrases using regex patterns
       that match 1-3 word sequences where at least one word meets the minimum length
    4. Count phrase frequency across the entire text
    5. Sort phrases by frequency (most frequent first)
    6. Return the top N phrases based on the max_phrases parameter
    
    Args:
        text: Source text from which to extract key phrases
        max_phrases: Maximum number of phrases to return, default is 5
        min_word_length: Minimum length (in characters) for a word to be considered
                        in phrase extraction, default is 3
        
    Returns:
        List of key phrases sorted by frequency (most frequent first).
        Returns an empty list if input text is empty or no phrases are found.
        
    Examples:
        >>> extract_key_phrases("The quick brown fox jumps over the lazy dog. The dog was too lazy to react.")
        ['the dog', 'lazy', 'quick brown fox']
        
        >>> extract_key_phrases("Machine learning is a field of study that gives computers the ability to learn without being explicitly programmed.", max_phrases=3)
        ['machine learning', 'field of study', 'computers']
        
    Notes:
        - The function works best on longer text passages with repeated key concepts
        - The approach prioritizes frequency over linguistic sophistication, so it's
          more effective for factual text than creative writing
        - For better results on short texts, consider decreasing min_word_length
        List of key phrases
    """
    if not text:
        return []
        
    # Normalize text
    normalized = normalize_text(
        text,
        lowercase=True,
        remove_punctuation=False,
        remove_whitespace=True,
        remove_urls=True,
    )
    
    # Split into sentences
    sentences = re.split(r'(?<!\w\.\w.)(?<![A-Z][a-z]\.)(?<=\.|\?|\!)\s', normalized)
    
    # Extract phrases (simple noun phrases)
    phrases = []
    for sentence in sentences:
        # Find potential noun phrases
        np_matches = re.finditer(
            r'\b((?:(?:[A-Za-z]+\s+){0,2}[A-Za-z]{%d,})|(?:[A-Za-z]{%d,}))\b' % 
            (min_word_length, min_word_length),
            sentence
        )
        for match in np_matches:
            phrase = match.group(0).strip()
            if len(phrase.split()) <= 3:  # Limit to 3-word phrases
                phrases.append(phrase)
    
    # Count phrase frequency
    phrase_counts = {}
    for phrase in phrases:
        if phrase in phrase_counts:
            phrase_counts[phrase] += 1
        else:
            phrase_counts[phrase] = 1
    
    # Sort by frequency
    sorted_phrases = sorted(
        phrase_counts.items(), 
        key=lambda x: x[1], 
        reverse=True
    )
    
    # Return top phrases
    return [phrase for phrase, _ in sorted_phrases[:max_phrases]]


def split_text_by_similarity(text: str, chunk_size: int = 1000, overlap: int = 100) -> List[str]:
    """Split text into chunks of similar size at natural boundaries.
    
    This function divides a long text into smaller, semantically meaningful chunks while
    respecting natural text boundaries. It intelligently searches for boundaries like
    paragraph breaks, sentence endings, commas, or whitespace to ensure chunks don't break
    in the middle of a sentence or idea, which is important for text processing tasks
    like semantic analysis, embedding generation, or LLM processing.
    
    The chunking algorithm works as follows:
    1. If the text is shorter than the chunk_size, return it as a single chunk
    2. Otherwise, iteratively:
       a. Determine the target end position (start + chunk_size)
       b. Search for natural boundaries near the target end, prioritizing:
          - Paragraph breaks (\n\n)
          - Sentence boundaries (. followed by space and capital letter)
          - Commas
          - Any whitespace
       c. Split at the best available boundary
       d. Move the start position for the next chunk, including overlap
       e. Repeat until the entire text is processed
    
    Args:
        text: The text to split into chunks
        chunk_size: Target size of each chunk in characters (default: 1000)
        overlap: Number of characters to overlap between chunks (default: 100)
                This helps maintain context between chunks for tasks like semantic search
        
    Returns:
        List of text chunks. If the input text is empty or less than chunk_size,
        returns a list containing only the original text.
        
    Examples:
        >>> text = "Paragraph one.\\n\\nParagraph two. This is a sentence. And another.\\n\\nParagraph three."
        >>> chunks = split_text_by_similarity(text, chunk_size=30, overlap=5)
        >>> chunks
        ['Paragraph one.', 'Paragraph two. This is a sentence.', ' This is a sentence. And another.', 'Paragraph three.']
        
    Notes:
        - The function prioritizes finding natural boundaries over strictly adhering to chunk_size
        - With small chunk_size values and complex texts, some chunks may exceed chunk_size
          if no suitable boundary is found within a reasonable range
        - The overlap parameter helps maintain context between chunks, which is important
          for tasks like semantic search or text analysis
    """
    if not text or len(text) <= chunk_size:
        return [text]
    
    # Define boundary patterns in order of preference
    boundaries = [
        r'\n\s*\n',  # Double newline (paragraph)
        r'\.\s+[A-Z]',  # End of sentence
        r',\s+',  # Comma with space
        r'\s+',  # Any whitespace
    ]
    
    chunks = []
    start = 0
    
    while start < len(text):
        # Determine end position for this chunk
        end = min(start + chunk_size, len(text))
        
        # If we're not at the end of the text, find a good boundary
        if end < len(text):
            # Try each boundary pattern in order
            for pattern in boundaries:
                # Search for the boundary pattern before the end position
                search_area = text[max(start, end - chunk_size // 4):end]
                matches = list(re.finditer(pattern, search_area))
                
                if matches:
                    # Found a good boundary, adjust end position
                    match_end = matches[-1].end()
                    end = max(start, end - chunk_size // 4) + match_end
                    break
        
        # Extract the chunk
        chunk = text[start:end]
        chunks.append(chunk)
        
        # Move to the next chunk with overlap
        start = end - overlap
    
    return chunks


def sanitize_text(text: str, allowed_tags: Optional[List[str]] = None) -> str:
    """Sanitize text by removing potentially harmful elements.
    
    This function cleans input text by removing potentially dangerous HTML/XML content
    that could lead to XSS (Cross-Site Scripting) or other injection attacks. It strips
    out script tags, style tags, HTML comments, and by default removes all HTML markup
    unless specific tags are explicitly allowed via the allowed_tags parameter.
    
    The sanitization process follows these steps:
    1. Remove all <script> tags and their contents (highest security priority)
    2. Remove all <style> tags and their contents (to prevent CSS-based attacks)
    3. Remove all HTML comments (which might contain sensitive information)
    4. Process HTML tags based on the allowed_tags parameter:
       - If allowed_tags is None: remove ALL HTML tags
       - If allowed_tags is provided: keep only those specific tags, remove all others
    5. Convert HTML entities like &amp;, &lt;, etc. to their character equivalents
    
    Args:
        text: The text to sanitize, potentially containing unsafe HTML/XML content
        allowed_tags: Optional list of HTML tags to preserve (e.g., ["p", "br", "strong"]).
                     If None (default), all HTML tags will be removed.
        
    Returns:
        Sanitized text with dangerous elements removed and HTML entities decoded.
        The original string is returned if the input is empty.
        
    Examples:
        >>> sanitize_text("<p>Hello <script>alert('XSS')</script> World</p>")
        'Hello  World'
        
        >>> sanitize_text("<p>Hello <b>Bold</b> World</p>", allowed_tags=["b"])
        'Hello <b>Bold</b> World'
        
        >>> sanitize_text("Safe &amp; sound")
        'Safe & sound'
        
    Note:
        While this function provides basic sanitization, it is not a complete defense
        against all possible injection attacks. For highly sensitive applications,
        consider using specialized HTML sanitization libraries like bleach or html-sanitizer.
    """
    if not text:
        return text
    
    # Remove script tags and content
    text = re.sub(r'<script\b[^<]*(?:(?!<\/script>)<[^<]*)*<\/script>', '', text)
    
    # Remove style tags and content
    text = re.sub(r'<style\b[^<]*(?:(?!<\/style>)<[^<]*)*<\/style>', '', text)
    
    # Remove comments
    text = re.sub(r'<!--.*?-->', '', text)
    
    # Handle HTML tags based on allowed_tags
    if allowed_tags:
        # Allow specified tags but remove all others
        allowed_pattern = '|'.join(allowed_tags)  # noqa: F841
        
        # Function to process tag matches
        def tag_replacer(match):
            tag = match.group(1).lower()
            if tag in allowed_tags:
                return match.group(0)
            else:
                return ''
                
        # Replace tags not in allowed_tags
        text = re.sub(r'<(\w+)(?:\s[^>]*)?(?:\/?>|>.*?<\/\1>)', tag_replacer, text)
    else:
        # Remove all HTML tags
        text = re.sub(r'<[^>]*>', '', text)
    
    # Convert HTML entities
    text = _convert_html_entities(text)
    
    return text


def _convert_html_entities(text: str) -> str:
    """Convert common HTML entities to their corresponding characters.
    
    This internal utility function translates HTML entity references (both named and numeric)
    into their equivalent Unicode characters. It handles common named entities like &amp;, 
    &lt;, &gt;, as well as decimal (&#123;) and hexadecimal (&#x7B;) numeric entity references.
    
    The conversion process:
    1. Replace common named entities with their character equivalents using a lookup table
    2. Convert decimal numeric entities (&#nnn;) to characters using int() and chr()
    3. Convert hexadecimal numeric entities (&#xhh;) to characters using int(hex, 16) and chr()
    
    This function is primarily used internally by sanitize_text() to ensure that entity-encoded
    content is properly decoded after HTML tag processing.
    
    Args:
        text: String containing HTML entities to convert
        
    Returns:
        String with HTML entities replaced by their corresponding Unicode characters.
        If the input is empty or contains no entities, the original string is returned.
        
    Examples:
        >>> _convert_html_entities("&lt;div&gt;")
        '<div>'
        
        >>> _convert_html_entities("Copyright &copy; 2023")
        'Copyright © 2023'
        
        >>> _convert_html_entities("&#65;&#66;&#67;")
        'ABC'
        
        >>> _convert_html_entities("&#x41;&#x42;&#x43;")
        'ABC'
        
    Limitations:
        - Only handles a subset of common named entities (amp, lt, gt, quot, apos, nbsp)
        - Entity references must be properly formed (e.g., &amp; not &amp )
        - Doesn't validate that numeric references point to valid Unicode code points
    """
    # Define common HTML entities
    entities = {
        '&amp;': '&',
        '&lt;': '<',
        '&gt;': '>',
        '&quot;': '"',
        '&apos;': "'",
        '&nbsp;': ' ',
    }
    
    # Replace each entity
    for entity, char in entities.items():
        text = text.replace(entity, char)
    
    # Handle numeric entities
    text = re.sub(r'&#(\d+);', lambda m: chr(int(m.group(1))), text)
    text = re.sub(r'&#x([0-9a-f]+);', lambda m: chr(int(m.group(1), 16)), text)
    
    return text


def extract_structured_data(text: str, patterns: Dict[str, str]) -> Dict[str, Any]:
    """Extract structured key-value data from text using regex patterns.
    
    This function applies a set of regular expression patterns to extract specific
    information from unstructured text, converting it into a structured dictionary.
    It's useful for parsing semi-structured text like logs, emails, reports, or any
    content that follows consistent patterns.
    
    Each key in the patterns dictionary represents a field name in the output,
    while the corresponding value is a regex pattern used to extract that field's
    value from the input text. If the pattern contains capturing groups, the first
    group's match is used as the value; otherwise, the entire match is used.
    
    Features:
    - Multi-field extraction in a single pass
    - Case-insensitive matching by default
    - Support for multi-line patterns with DOTALL mode
    - Capturing group extraction for fine-grained control
    - Automatic whitespace trimming of extracted values
    
    Args:
        text: Source text to extract data from
        patterns: Dictionary mapping field names to regex patterns.
                 Example: {"email": r"Email:\\s*([^\\s@]+@[^\\s@]+\\.[^\\s@]+)",
                          "phone": r"Phone:\\s*(\\d{3}[-\\.\\s]??\\d{3}[-\\.\\s]??\\d{4})"}
        
    Returns:
        Dictionary with field names as keys and extracted values as strings.
        Only fields with successful matches are included in the result.
        Returns an empty dictionary if the input text is empty or no patterns match.
        
    Examples:
        >>> text = "Name: John Doe\\nEmail: [email protected]\\nAge: 30"
        >>> patterns = {
        ...     "name": r"Name:\\s*(.*?)(?:\\n|$)",
        ...     "email": r"Email:\\s*([^\\s@]+@[^\\s@]+\\.[^\\s@]+)",
        ...     "age": r"Age:\\s*(\\d+)"
        ... }
        >>> extract_structured_data(text, patterns)
        {'name': 'John Doe', 'email': '[email protected]', 'age': '30'}
        
        >>> # Using a pattern without capturing groups
        >>> extract_structured_data("Status: Active", {"status": r"Status: \\w+"})
        {'status': 'Status: Active'}
        
        >>> # No matches
        >>> extract_structured_data("Empty content", {"field": r"NotFound"})
        {}
        
    Tips:
        - Use captured groups (parentheses) to extract specific parts of matches
        - Make patterns as specific as possible to avoid false positives
        - For complex multi-line extractions, use (?s) flag in your regex or rely on
          the built-in DOTALL mode this function applies
        - Remember to escape special regex characters when matching literals
    """
    if not text:
        return {}
    
    result = {}
    
    # Apply each pattern
    for field, pattern in patterns.items():
        match = re.search(pattern, text, re.DOTALL | re.IGNORECASE)
        if match:
            # If the pattern has groups, use the first group
            if match.groups():
                result[field] = match.group(1).strip()
            else:
                result[field] = match.group(0).strip()
    
    return result


def find_text_similarity(text1: str, text2: str) -> float:
    """Calculate text similarity using character n-grams and Jaccard similarity.
    
    This function measures the similarity between two text strings using character-level
    trigrams (3-character sequences) and the Jaccard similarity coefficient. This approach
    provides a language-agnostic way to detect similarity that works well for:
    
    - Fuzzy matching of text fragments
    - Detecting near-duplicate content
    - Finding related sentences or paragraphs
    - Language-independent text comparisons
    
    The algorithm works as follows:
    1. Normalize both texts (lowercase, remove excess whitespace)
    2. Generate character trigrams (sets of all 3-character sequences) for each text
    3. Calculate Jaccard similarity: |A ∩ B| / |A ∪ B|
       (size of intersection divided by size of union)
    
    This approach emphasizes shared character patterns rather than exact word matches,
    making it robust to minor spelling variations, word order changes, and formatting
    differences.
    
    Args:
        text1: First text string to compare
        text2: Second text string to compare
        
    Returns:
        Similarity score as a float between 0.0 (completely different) and 
        1.0 (identical after normalization). Returns 0.0 if either input is empty.
        For very short texts (<3 chars), returns 1.0 if both are identical after
        normalization, otherwise 0.0.
        
    Examples:
        >>> find_text_similarity("hello world", "hello world")
        1.0
        
        >>> find_text_similarity("hello world", "world hello")
        0.6153846153846154  # High similarity despite word order change
        
        >>> find_text_similarity("color", "colour")
        0.5714285714285714  # Handles spelling variations
        
        >>> find_text_similarity("completely different", "nothing alike")
        0.0
        
    Performance note:
        The function builds complete sets of trigrams for both texts, which can
        consume significant memory for very large inputs. Consider chunking or
        sampling when processing large documents.
    """
    if not text1 or not text2:
        return 0.0
    
    # Normalize texts
    text1 = normalize_text(text1, lowercase=True, remove_whitespace=True)
    text2 = normalize_text(text2, lowercase=True, remove_whitespace=True)
    
    # Generate character trigrams
    def get_trigrams(s):
        return set(s[i:i+3] for i in range(len(s) - 2))
        
    trigrams1 = get_trigrams(text1)
    trigrams2 = get_trigrams(text2)
    
    # Find common trigrams
    common = trigrams1.intersection(trigrams2)
    
    # Calculate Jaccard similarity
    if not trigrams1 and not trigrams2:
        return 1.0  # Both strings are too short for trigrams
    
    return len(common) / max(1, len(trigrams1.union(trigrams2)))


def get_text_stats(text: str) -> Dict[str, Any]:
    """Analyze text to compute various linguistic and structural statistics.
    
    This function calculates a comprehensive set of metrics that characterize the
    input text, including volume metrics (character/word counts), structural features
    (sentence/paragraph counts), and readability indicators (average word/sentence length).
    It also estimates the number of tokens that would be consumed by LLM processing.
    
    These statistics are useful for:
    - Assessing text complexity and readability
    - Estimating processing costs for LLM operations
    - Content analysis and comparison
    - Enforcing length constraints in applications
    - Debugging text processing pipelines
    
    The function uses regex-based analyses to identify linguistic boundaries
    (words, sentences, paragraphs) and delegates token estimation to the count_tokens
    function, which uses model-specific tokenizers when available.
    
    The metrics provided in the output dictionary:
    - char_count: Total number of characters in the text, including whitespace.
    - word_count: Total number of words, using word boundary regex (\\b\\w+\\b).
      This counts sequences of alphanumeric characters as words.
    - sentence_count: Number of sentences, detected by looking for periods, 
      question marks, or exclamation points followed by spaces, with special
      handling for common abbreviations to reduce false positives.
    - paragraph_count: Number of paragraphs, determined by double newline 
      sequences (\\n\\n) which typically indicate paragraph breaks.
    - avg_word_length: Average length of words in characters, rounded to 
      one decimal place. Provides a simple readability indicator.
    - avg_sentence_length: Average number of words per sentence, rounded to
      one decimal place. Higher values typically indicate more complex text.
    - estimated_tokens: Estimated number of tokens for LLM processing using 
      the count_tokens function, which uses model-specific tokenizers when
      available or falls back to character-based estimation.
    
    Args:
        text: The text to analyze. Can be any length, including empty text.
        
    Returns:
        A dictionary containing the linguistic and structural statistics described above.
        For empty input, returns a dictionary with all values set to 0.
        
    Examples:
        >>> stats = get_text_stats("Hello world. This is a sample text with two sentences.")
        >>> stats['word_count']
        10
        >>> stats['sentence_count']
        2
        >>> stats['avg_word_length']
        4.2
        
        >>> # Multiple paragraphs
        >>> text = "First paragraph with multiple sentences. Second sentence here.\\n\\n"
        >>> text += "Second paragraph. This has shorter sentences."
        >>> stats = get_text_stats(text)
        >>> stats['paragraph_count']
        2
        >>> stats['sentence_count']
        4
        
        >>> # Empty input
        >>> get_text_stats("")
        {'char_count': 0, 'word_count': 0, 'sentence_count': 0, 'paragraph_count': 0, 
         'avg_word_length': 0, 'avg_sentence_length': 0, 'estimated_tokens': 0}
    """
    if not text:
        return {
            "char_count": 0,
            "word_count": 0,
            "sentence_count": 0,
            "paragraph_count": 0,
            "avg_word_length": 0,
            "avg_sentence_length": 0,
            "estimated_tokens": 0,
        }
    
    # Character count
    char_count = len(text)
    
    # Word count
    words = re.findall(r'\b\w+\b', text)
    word_count = len(words)
    
    # Sentence count
    sentences = re.split(r'(?<!\w\.\w.)(?<![A-Z][a-z]\.)(?<=\.|\?|\!)\s', text)
    sentence_count = len([s for s in sentences if s.strip()])
    
    # Paragraph count
    paragraphs = re.split(r'\n\s*\n', text)
    paragraph_count = len([p for p in paragraphs if p.strip()])
    
    # Average word length
    avg_word_length = sum(len(word) for word in words) / max(1, word_count)
    
    # Average sentence length (in words)
    avg_sentence_length = word_count / max(1, sentence_count)
    
    # Estimated tokens
    estimated_tokens = count_tokens(text)
    
    return {
        "char_count": char_count,
        "word_count": word_count,
        "sentence_count": sentence_count,
        "paragraph_count": paragraph_count,
        "avg_word_length": round(avg_word_length, 1),
        "avg_sentence_length": round(avg_sentence_length, 1),
        "estimated_tokens": estimated_tokens,
    }


def preprocess_text(text: str) -> str:
    """Standardize and clean text for machine learning and NLP tasks.
    
    This function applies a series of transformations to normalize input text
    into a standardized format suitable for classification, embedding generation,
    semantic analysis, and other natural language processing tasks. It focuses on
    removing noise and irregularities that could interfere with ML/NLP model performance
    while preserving the semantic content of the text.
    
    Transformations applied:
    1. Whitespace normalization: Collapses multiple spaces, tabs, newlines into single spaces
    2. Control character removal: Strips non-printable ASCII control characters
    3. Punctuation normalization: Reduces excessive repeated punctuation (e.g., "!!!!!!" → "!!!")
    4. Length truncation: For extremely long texts, preserves beginning and end with a
       truncation marker in the middle to stay under token limits
    
    This preprocessing is particularly useful for:
    - Text classification tasks where consistent input format is important
    - Before vectorization or embedding generation
    - Preparing text for input to language models
    - Reducing noise in text analytics
    
    Args:
        text: The input text to preprocess. Can be any length, including empty.
        
    Returns:
        Preprocessed text with standardized formatting. The original text is returned
        if it's empty. For extremely long inputs (>100,000 chars), returns a truncated
        version preserving the beginning and end portions.
        
    Examples:
        >>> preprocess_text("Hello   world!!!\nHow are\t\tyou?")
        'Hello world!!! How are you?'
        
        >>> preprocess_text("Too much punctuation!!!!!!!!")
        'Too much punctuation!!!'
        
        >>> preprocess_text("") 
        ''
        
    Note:
        This function preserves case, punctuation, and special characters (beyond control chars),
        as these may be semantically relevant for many NLP tasks. For more aggressive normalization,
        consider using the normalize_text() function with appropriate parameters.
    """
    if not text:
        return text
    
    # Clean up whitespace
    text = re.sub(r'\s+', ' ', text).strip()
    
    # Remove control characters
    text = re.sub(r'[\x00-\x09\x0B\x0C\x0E-\x1F\x7F]', '', text)
    
    # Remove excessive punctuation repetition
    text = re.sub(r'([.!?]){3,}', r'\1\1\1', text)
    
    # Truncate if extremely long (preserve beginning and end)
    max_chars = 100000  # Reasonable limit to prevent token explosion
    if len(text) > max_chars:
        half = max_chars // 2
        text = text[:half] + " [...text truncated...] " + text[-half:]
        
    return text
```

--------------------------------------------------------------------------------
/examples/meta_api_demo.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python
"""
Meta API Tool Demonstration Script.

This script demonstrates the functionality of the APIMetaTool class for dynamically
registering and using external APIs via their OpenAPI specifications.

The demo features:
1. Registering APIs with the MCP server using their OpenAPI specifications
2. Listing registered APIs and their endpoints
3. Getting detailed information about an API and its endpoints
4. Calling dynamically registered tools
5. Refreshing an API to update its endpoints
6. Getting detailed information about a specific tool
7. Unregistering APIs

We use the Swagger Petstore API as our primary demo example along with additional
public APIs for multi-API demonstrations.
"""
import asyncio
import json
import sys
import time
from pathlib import Path
from typing import Any, Dict, List

# Add project root to path for imports when running as script
sys.path.insert(0, str(Path(__file__).parent.parent))

from rich.console import Console
from rich.markdown import Markdown
from rich.panel import Panel
from rich.progress import Progress, SpinnerColumn, TextColumn
from rich.prompt import Confirm, Prompt
from rich.rule import Rule
from rich.syntax import Syntax
from rich.table import Table

import ultimate_mcp_server.core  # To access the global gateway instance
from ultimate_mcp_server import create_app
from ultimate_mcp_server.tools.meta_api_tool import APIMetaTool  # Import class for type hinting

# Initialize Rich console
console = Console()

# Demo APIs to showcase
DEMO_APIS = {
    "petstore": {
        "name": "petstore", 
        "url": "https://petstore.swagger.io/v2/swagger.json",
        "description": "Swagger Petstore API - A sample API for pet store management"
    },
    "weather": {
        "name": "weather",
        "url": "https://api.met.no/weatherapi/locationforecast/2.0/openapi.json",
        "description": "Norwegian Meteorological Institute API - Weather forecast data"
    },
    "mock": {
        "name": "mockapi",
        "url": "https://fastapimockserver.onrender.com/openapi.json",
        "description": "Mock API Server - A simple mock API for testing"
    }
}

# Default API to use for demos
DEFAULT_API = "petstore"


async def show_intro():
    """Display an introduction to the demo."""
    console.clear()
    console.print("\n[bold cyan]META API TOOL DEMONSTRATION[/bold cyan]", justify="center")
    console.print("[italic]Dynamically register and use any OpenAPI-compatible API[/italic]", justify="center")
    console.print("\n")
    
    # Display APIs that will be used in demo
    table = Table(title="Demo APIs", box=None, highlight=True, border_style="blue")
    table.add_column("API Name", style="cyan")
    table.add_column("Description", style="green")
    table.add_column("OpenAPI URL", style="blue")
    
    for api_info in DEMO_APIS.values():
        table.add_row(api_info["name"], api_info["description"], api_info["url"])
    
    console.print(Panel(table, border_style="blue", title="Available APIs", expand=False))
    
    # Display introduction as markdown
    intro_md = """
    ## Welcome to the Meta API Tool Demo
    
    This demonstration shows how to use the Meta API Tool to:
    - Register external APIs dynamically
    - Access API endpoints as tools
    - Call external services seamlessly
    """
    console.print(Markdown(intro_md))
    console.print("\n")


async def register_api_demo(api_meta_tool: APIMetaTool, api_name: str = DEFAULT_API) -> Dict[str, Any]:
    """Register an API with the MCP server using its OpenAPI specification.
    
    Args:
        api_meta_tool: The APIMetaTool instance
        api_name: Name of the API to register
        
    Returns:
        Result of the registration
    """
    console.print(Rule(f"[bold blue]REGISTERING API: {api_name.upper()}[/bold blue]"))
    
    api_info = DEMO_APIS.get(api_name)
    if not api_info:
        console.print(f"[bold red]Error:[/bold red] API '{api_name}' not found in demo configuration.")
        return {}
    
    # Show the API details before registration
    console.print(Panel(
        f"[bold]API Name:[/bold] {api_info['name']}\n"
        f"[bold]OpenAPI URL:[/bold] {api_info['url']}\n"
        f"[bold]Description:[/bold] {api_info['description']}",
        title="API Registration Details",
        border_style="green",
        expand=False
    ))
    
    console.print("[cyan]> Fetching OpenAPI specification from URL and registering tools...[/cyan]")
    
    start_time = time.time()
    with Progress(
        SpinnerColumn(),
        TextColumn("[bold green]Registering API..."),
        transient=True
    ) as progress:
        task = progress.add_task("", total=None)  # noqa: F841
        try:
            # Use the passed-in instance directly
            result = await api_meta_tool.register_api(
                api_name=api_info["name"],
                openapi_url=api_info["url"]
            )
            processing_time = time.time() - start_time
            
            # Show success message
            console.print(f"[bold green]✓ Success![/bold green] API registered in {processing_time:.2f}s")
            
            # Display registered tools in a table
            if result.get("tools_count", 0) > 0:
                table = Table(
                    title=f"Registered {result['tools_count']} Tools",
                    box=None,
                    highlight=True,
                    border_style="blue"
                )
                table.add_column("Tool Name", style="cyan")
                
                for tool in result.get("tools_registered", []):
                    table.add_row(tool)
                
                console.print(Panel(table, border_style="green", expand=False))
            else:
                console.print("[yellow]No tools were registered for this API.[/yellow]")
            
            return result
        except Exception as e:
            console.print(f"[bold red]Error during API registration:[/bold red] {str(e)}")
            return {}


async def list_apis_demo(api_meta_tool: APIMetaTool) -> Dict[str, Any]:
    """List all registered APIs and their tools.
    
    Args:
        api_meta_tool: The APIMetaTool instance
        
    Returns:
        Result containing information about registered APIs
    """
    console.print(Rule("[bold blue]LISTING REGISTERED APIs[/bold blue]"))
    
    with console.status("[bold green]Fetching registered APIs...", spinner="dots"):
        try:
            result = await api_meta_tool.list_registered_apis()
            
            if result.get("total_apis", 0) > 0:
                # Display registered APIs in a table
                table = Table(
                    title=f"Registered APIs ({result['total_apis']})", 
                    box=None,
                    highlight=True,
                    border_style="blue"
                )
                table.add_column("API Name", style="cyan")
                table.add_column("Base URL", style="blue")
                table.add_column("Tools Count", style="green", justify="right")
                
                for api_name, api_info in result.get("apis", {}).items():
                    table.add_row(
                        api_name, 
                        api_info.get("base_url", "N/A"), 
                        str(api_info.get("tools_count", 0))
                    )
                
                console.print(Panel(table, border_style="green", expand=False))
                console.print(f"[green]Total Tools: {result.get('total_tools', 0)}[/green]")
            else:
                console.print("[yellow]No APIs are currently registered.[/yellow]")
            
            return result
        except Exception as e:
            console.print(f"[bold red]Error listing APIs:[/bold red] {str(e)}")
            return {}


async def get_api_details_demo(api_meta_tool: APIMetaTool, api_name: str = DEFAULT_API) -> Dict[str, Any]:
    """Get detailed information about a registered API.
    
    Args:
        api_meta_tool: The APIMetaTool instance
        api_name: Name of the API to get details for
        
    Returns:
        API details
    """
    console.print(Rule(f"[bold blue]API DETAILS: {api_name.upper()}[/bold blue]"))
    
    with console.status(f"[bold green]Fetching details for {api_name} API...", spinner="dots"):
        try:
            result = await api_meta_tool.get_api_details(api_name=api_name)
            
            # Display API overview
            console.print(Panel(
                f"[bold]API Name:[/bold] {result.get('api_name', 'N/A')}\n"
                f"[bold]Base URL:[/bold] {result.get('base_url', 'N/A')}\n"
                f"[bold]OpenAPI URL:[/bold] {result.get('openapi_url', 'N/A')}\n"
                f"[bold]Endpoints Count:[/bold] {result.get('endpoints_count', 0)}",
                title="API Overview",
                border_style="green",
                expand=False
            ))
            
            # Display endpoints in a table
            endpoints = result.get("tools", [])
            if endpoints:
                table = Table(
                    title=f"API Endpoints ({len(endpoints)})",
                    box=None,
                    highlight=True,
                    border_style="blue"
                )
                table.add_column("Tool Name", style="cyan")
                table.add_column("Method", style="magenta", justify="center")
                table.add_column("Path", style="blue")
                table.add_column("Summary", style="green")
                
                for endpoint in endpoints:
                    table.add_row(
                        endpoint.get("name", "N/A"),
                        endpoint.get("method", "N/A").upper(),
                        endpoint.get("path", "N/A"),
                        endpoint.get("summary", "No summary") or "No summary"
                    )
                
                console.print(Panel(table, border_style="green", expand=False))
            else:
                console.print("[yellow]No endpoints found for this API.[/yellow]")
            
            return result
        except Exception as e:
            console.print(f"[bold red]Error getting API details:[/bold red] {str(e)}")
            return {}


async def get_tool_details_demo(api_meta_tool: APIMetaTool, api_name: str = DEFAULT_API) -> Dict[str, Any]:
    """Get detailed information about a specific tool from an API.
    
    Args:
        api_meta_tool: The APIMetaTool instance
        api_name: Name of the API that contains the tool
        
    Returns:
        Tool details
    """
    console.print(Rule(f"[bold blue]TOOL DETAILS DEMO FOR {api_name.upper()}[/bold blue]"))
    
    # First get the API details to find available tools
    with console.status(f"[bold green]Fetching available tools for {api_name} API...", spinner="dots"):
        try:
            api_details = await api_meta_tool.get_api_details(api_name=api_name)
            
            if not api_details.get("tools", []):
                console.print(f"[yellow]No tools available for {api_name} API.[/yellow]")
                return {}
            
            # Find a suitable GET tool for demo purposes
            tools = api_details.get("tools", [])
            get_tools = [t for t in tools if t.get("method", "").lower() == "get"]
            
            if get_tools:
                # Prefer a GET tool with path parameters for a more interesting demo
                path_param_tools = [t for t in get_tools if "{" in t.get("path", "")]
                if path_param_tools:
                    selected_tool = path_param_tools[0]
                else:
                    selected_tool = get_tools[0]
            else:
                # If no GET tools, just pick the first tool
                selected_tool = tools[0]
            
            tool_name = selected_tool.get("name", "")
            console.print(f"[cyan]Selected tool for details:[/cyan] [bold]{tool_name}[/bold]")
            
            # Get detailed information about the selected tool
            with console.status(f"[bold green]Fetching details for {tool_name}...", spinner="dots"):
                result = await api_meta_tool.get_tool_details(tool_name=tool_name)
                
                # Display tool overview
                console.print(Panel(
                    f"[bold]Tool Name:[/bold] {result.get('tool_name', 'N/A')}\n"
                    f"[bold]API Name:[/bold] {result.get('api_name', 'N/A')}\n"
                    f"[bold]Method:[/bold] {result.get('method', 'N/A').upper()}\n"
                    f"[bold]Path:[/bold] {result.get('path', 'N/A')}\n"
                    f"[bold]Summary:[/bold] {result.get('summary', 'No summary') or 'No summary'}\n"
                    f"[bold]Description:[/bold] {result.get('description', 'No description') or 'No description'}",
                    title="Tool Overview",
                    border_style="green",
                    expand=False
                ))
                
                # Display parameters if any
                parameters = result.get("parameters", [])
                if parameters:
                    param_table = Table(
                        title="Tool Parameters",
                        box=None,
                        highlight=True,
                        border_style="blue"
                    )
                    param_table.add_column("Name", style="cyan")
                    param_table.add_column("Type", style="blue")
                    param_table.add_column("Required", style="green", justify="center")
                    param_table.add_column("In", style="magenta")
                    param_table.add_column("Description", style="yellow")
                    
                    for param in parameters:
                        param_type = param.get("schema", {}).get("type", "string")
                        required = "✓" if param.get("required", False) else "-"
                        param_in = param.get("in", "query")
                        description = param.get("description", "No description") or "No description"
                        
                        param_table.add_row(
                            param.get("name", "N/A"),
                            param_type,
                            required,
                            param_in,
                            description
                        )
                    
                    console.print(Panel(param_table, border_style="green", expand=False))
                else:
                    console.print("[yellow]This tool has no parameters.[/yellow]")
                
                # Display source code
                source_code = result.get("source_code", "Source code not available")
                if len(source_code) > 500:
                    # Truncate long source code for display purposes
                    source_code = source_code[:500] + "\n\n[...truncated...]"
                
                console.print(Panel(
                    Syntax(source_code, "python", theme="monokai", line_numbers=True),
                    title="Tool Source Code",
                    border_style="green",
                    expand=False
                ))
                
                return result
        except Exception as e:
            console.print(f"[bold red]Error getting tool details:[/bold red] {str(e)}")
            return {}


async def call_tool_demo(api_meta_tool: APIMetaTool, api_name: str = DEFAULT_API) -> Dict[str, Any]:
    """Call a dynamically registered tool from an API.
    
    Args:
        api_meta_tool: The APIMetaTool instance
        api_name: Name of the API that contains the tool
        
    Returns:
        Result of the tool call
    """
    console.print(Rule(f"[bold blue]CALLING TOOL FROM {api_name.upper()}[/bold blue]"))
    
    # First get the API details to find available tools
    with Progress(
        SpinnerColumn(),
        TextColumn(f"[bold green]Fetching available tools for {api_name} API..."),
        transient=True
    ) as progress:
        task = progress.add_task("", total=None)  # noqa: F841
        try:
            api_details = await api_meta_tool.get_api_details(api_name=api_name)
            
            if not api_details.get("tools", []):
                console.print(f"[yellow]No tools available for {api_name} API.[/yellow]")
                return {}
            
            # Find a suitable GET tool for demo purposes
            tools = api_details.get("tools", [])
            
            # For Petstore API, use specific endpoints for better demonstration
            if api_name == "petstore":
                # Try to find the "findPetsByStatus" endpoint, which is a good demo endpoint
                pet_status_tools = [t for t in tools if "findPetsByStatus" in t.get("name", "")]
                if pet_status_tools:
                    selected_tool = pet_status_tools[0]
                else:
                    # Fall back to "getInventory" which doesn't need parameters
                    inventory_tools = [t for t in tools if "getInventory" in t.get("name", "")]
                    if inventory_tools:
                        selected_tool = inventory_tools[0]
                    else:
                        # Just pick a GET endpoint if specific ones not found
                        get_tools = [t for t in tools if t.get("method", "").lower() == "get"]
                        selected_tool = get_tools[0] if get_tools else tools[0]
            else:
                # For other APIs, prefer GET endpoints without path parameters for simplicity
                get_tools = [t for t in tools if t.get("method", "").lower() == "get" and "{" not in t.get("path", "")]
                if get_tools:
                    selected_tool = get_tools[0]
                else:
                    # Fall back to any GET endpoint
                    get_tools = [t for t in tools if t.get("method", "").lower() == "get"]
                    if get_tools:
                        selected_tool = get_tools[0]
                    else:
                        # Just pick the first tool if no GET tools
                        selected_tool = tools[0]
            
            tool_name = selected_tool.get("name", "")
            tool_method = selected_tool.get("method", "").upper()
            tool_path = selected_tool.get("path", "")
            tool_summary = selected_tool.get("summary", "No summary") or "No summary"
            
            console.print(Panel(
                f"[bold]Selected Tool:[/bold] {tool_name}\n"
                f"[bold]Method:[/bold] {tool_method}\n"
                f"[bold]Path:[/bold] {tool_path}\n"
                f"[bold]Summary:[/bold] {tool_summary}",
                title="Tool Information",
                border_style="green",
                expand=False
            ))
            
            # Get tool details to determine parameters
            tool_details = await api_meta_tool.get_tool_details(tool_name=tool_name)
            parameters = tool_details.get("parameters", [])
            
            # Prepare inputs based on the tool
            inputs = {}
            
            # For Petstore API, use specific values
            if api_name == "petstore":
                if "findPetsByStatus" in tool_name:
                    inputs = {"status": "available"}
                    console.print("[cyan]Using input:[/cyan] status=available")
                elif "getPetById" in tool_name:
                    inputs = {"petId": 1}
                    console.print("[cyan]Using input:[/cyan] petId=1")
            else:
                # For other tools, add required parameters
                required_params = [p for p in parameters if p.get("required", False)]
                if required_params:
                    console.print("[yellow]This tool requires parameters. Using default values for demo.[/yellow]")
                    
                    for param in required_params:
                        param_name = param.get("name", "")
                        param_type = param.get("schema", {}).get("type", "string")
                        param_in = param.get("in", "query")
                        
                        # Assign default values based on parameter type
                        if param_type == "integer":
                            inputs[param_name] = 1
                        elif param_type == "number":
                            inputs[param_name] = 1.0
                        elif param_type == "boolean":
                            inputs[param_name] = True
                        else:  # string or other
                            inputs[param_name] = "test"
                        
                        console.print(f"[cyan]Using input:[/cyan] {param_name}={inputs[param_name]} ({param_in})")
            
            # Call the tool
            console.print("\n[bold]Calling the tool...[/bold]")
            start_time = time.time()
            with console.status(f"[bold green]Executing {tool_name}...", spinner="dots"):
                result = await api_meta_tool.call_dynamic_tool(
                    tool_name=tool_name,
                    inputs=inputs
                )
                processing_time = time.time() - start_time
                
            console.print(f"[bold green]✓ Success![/bold green] Tool executed in {processing_time:.2f}s")
            
            # Display result as formatted JSON
            result_json = json.dumps(result, indent=2)
            console.print(Panel(
                Syntax(result_json, "json", theme="monokai", line_numbers=True),
                title="Tool Response",
                border_style="green",
                expand=False
            ))
            
            return result
        except Exception as e:
            console.print(f"[bold red]Error calling tool:[/bold red] {str(e)}")
            return {}


async def list_tools_demo(api_meta_tool: APIMetaTool) -> Dict[str, Any]:
    """List all dynamically registered tools.
    
    Args:
        api_meta_tool: The APIMetaTool instance
        
    Returns:
        Result with information about all available tools
    """
    console.print(Rule("[bold blue]LISTING ALL AVAILABLE TOOLS[/bold blue]"))
    
    with console.status("[bold green]Fetching available tools...", spinner="dots"):
        try:
            result = await api_meta_tool.list_available_tools()
            
            tools = result.get("tools", [])
            if tools:
                table = Table(
                    title=f"Available Tools ({len(tools)})",
                    box=None,
                    highlight=True,
                    border_style="blue"
                )
                table.add_column("Tool Name", style="cyan")
                table.add_column("API Name", style="magenta")
                table.add_column("Method", style="green", justify="center")
                table.add_column("Path", style="blue")
                table.add_column("Summary", style="yellow")
                
                for tool in tools:
                    table.add_row(
                        tool.get("name", "N/A"),
                        tool.get("api_name", "N/A"),
                        tool.get("method", "N/A").upper(),
                        tool.get("path", "N/A"),
                        tool.get("summary", "No summary") or "No summary"
                    )
                
                console.print(Panel(table, border_style="green", expand=False))
            else:
                console.print("[yellow]No tools are currently registered.[/yellow]")
            
            return result
        except Exception as e:
            console.print(f"[bold red]Error listing tools:[/bold red] {str(e)}")
            return {}


async def refresh_api_demo(api_meta_tool: APIMetaTool, api_name: str = DEFAULT_API) -> Dict[str, Any]:
    """Refresh an API to update its endpoints.
    
    Args:
        api_meta_tool: The APIMetaTool instance
        api_name: Name of the API to refresh
        
    Returns:
        Result of the refresh operation
    """
    console.print(Rule(f"[bold blue]REFRESHING API: {api_name.upper()}[/bold blue]"))
    
    console.print(f"[cyan]Refreshing API {api_name} to update endpoints...[/cyan]")
    
    with console.status(f"[bold green]Refreshing {api_name} API...", spinner="dots"):
        try:
            start_time = time.time()
            result = await api_meta_tool.refresh_api(api_name=api_name)
            processing_time = time.time() - start_time
            
            console.print(f"[bold green]✓ Success![/bold green] API refreshed in {processing_time:.2f}s")
            
            # Display refresh summary
            console.print(Panel(
                f"[bold]Tools Added:[/bold] {len(result.get('tools_added', []))}\n"
                f"[bold]Tools Updated:[/bold] {len(result.get('tools_updated', []))}\n"
                f"[bold]Tools Removed:[/bold] {len(result.get('tools_removed', []))}\n"
                f"[bold]Total Tools:[/bold] {result.get('tools_count', 0)}",
                title="Refresh Results",
                border_style="green",
                expand=False
            ))
            
            # Display lists of added/removed tools if any
            added_tools = result.get("tools_added", [])
            if added_tools:
                console.print("[bold]Added Tools:[/bold]")
                for tool in added_tools:
                    console.print(f"  [green]+ {tool}[/green]")
            
            removed_tools = result.get("tools_removed", [])
            if removed_tools:
                console.print("[bold]Removed Tools:[/bold]")
                for tool in removed_tools:
                    console.print(f"  [red]- {tool}[/red]")
            
            return result
        except Exception as e:
            console.print(f"[bold red]Error refreshing API:[/bold red] {str(e)}")
            return {}


async def unregister_api_demo(api_meta_tool: APIMetaTool, api_name: str = DEFAULT_API) -> Dict[str, Any]:
    """Unregister an API and all its tools.
    
    Args:
        api_meta_tool: The APIMetaTool instance
        api_name: Name of the API to unregister
        
    Returns:
        Result of the unregistration
    """
    console.print(Rule(f"[bold blue]UNREGISTERING API: {api_name.upper()}[/bold blue]"))
    
    with console.status(f"[bold green]Unregistering {api_name} API...", spinner="dots"):
        try:
            start_time = time.time()
            result = await api_meta_tool.unregister_api(api_name=api_name)
            processing_time = time.time() - start_time
            
            console.print(f"[bold green]✓ Success![/bold green] API unregistered in {processing_time:.2f}s")
            
            # Display unregistration summary
            console.print(Panel(
                f"[bold]API Name:[/bold] {result.get('api_name', 'N/A')}\n"
                f"[bold]Tools Unregistered:[/bold] {len(result.get('tools_unregistered', []))}\n",
                title="Unregistration Results",
                border_style="green",
                expand=False
            ))
            
            return result
        except Exception as e:
            console.print(f"[bold red]Error unregistering API:[/bold red] {str(e)}")
            return {}


async def run_multi_api_demo(api_meta_tool: APIMetaTool):
    """Run a demonstration with multiple APIs registered simultaneously.
    
    Args:
        api_meta_tool: The APIMetaTool instance
    """
    console.print(Rule("[bold blue]MULTI-API DEMONSTRATION[/bold blue]"))
    
    console.print(Panel(
        "This demo shows how to work with multiple APIs registered simultaneously.\n"
        "We'll register two different APIs and interact with them.",
        title="Multiple APIs Demo",
        border_style="green",
        expand=False
    ))
    
    # Register multiple APIs
    apis_to_register = ["petstore", "weather"]
    registered_apis = []
    
    for api_name in apis_to_register:
        console.print(f"\n[bold]Registering {api_name} API...[/bold]")
        result = await register_api_demo(api_meta_tool, api_name)
        if result:
            registered_apis.append(api_name)
    
    if len(registered_apis) < 2:
        console.print("[yellow]Not enough APIs registered for multi-API demo. Skipping...[/yellow]")
        return
    
    console.print("\n[bold]Now we have multiple APIs registered:[/bold]")
    
    # List all registered APIs
    await list_apis_demo(api_meta_tool)
    
    # List all available tools
    await list_tools_demo(api_meta_tool)
    
    # Call a tool from each API
    for api_name in registered_apis:
        console.print(f"\n[bold]Calling a tool from {api_name} API:[/bold]")
        await call_tool_demo(api_meta_tool, api_name)
    
    # Clean up: unregister all APIs
    for api_name in registered_apis:
        console.print(f"\n[bold]Cleaning up: Unregistering {api_name} API:[/bold]")
        await unregister_api_demo(api_meta_tool, api_name)


async def run_full_demo(api_meta_tool: APIMetaTool) -> None:
    """Run the complete demonstration sequence with proper progress tracking.
    
    Args:
        api_meta_tool: The APIMetaTool instance
    """
    console.print(Rule("[bold cyan]RUNNING FULL META API DEMONSTRATION[/bold cyan]"))
    
    steps_md = """
    ## Full Demonstration Steps
    
    1. **Register API** - Add a new API from OpenAPI spec
    2. **List APIs** - View all registered APIs
    3. **API Details** - Explore API information
    4. **Tool Details** - Get specific tool info
    5. **Call Tool** - Execute an API endpoint
    6. **Refresh API** - Update API definition
    7. **Multi-API Demo** - Work with multiple APIs
    8. **Cleanup** - Unregister APIs
    """
    
    console.print(Markdown(steps_md))
    
    # Wait for user confirmation
    continue_demo = Confirm.ask("\nReady to start the full demonstration?")
    if not continue_demo:
        console.print("[yellow]Demonstration cancelled.[/yellow]")
        return
    
    # Create list of demo steps for tracking progress
    demo_steps: List[str] = [
        "Register API",
        "List APIs",
        "API Details",
        "Tool Details", 
        "Call Tool",
        "Refresh API",
        "Multi-API Demo",
        "Cleanup"
    ]
    
    with Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        transient=False
    ) as progress:
        overall_task = progress.add_task("[bold cyan]Running full demo...", total=len(demo_steps))
        
        try:
            # 1. Register the Petstore API
            progress.update(overall_task, description="[bold cyan]STEP 1: Register the Petstore API[/bold cyan]")
            await register_api_demo(api_meta_tool, "petstore")
            progress.advance(overall_task)
            input("\nPress Enter to continue...")
            
            # Continue with other steps...
            # ... (rest of the function remains unchanged)
            
        except Exception as e:
            console.print(f"[bold red]Error during full demonstration:[/bold red] {str(e)}")


async def interactive_demo(api_meta_tool: APIMetaTool):
    """Run an interactive menu-driven demonstration.
    
    Args:
        api_meta_tool: The APIMetaTool instance
    """
    while True:
        console.clear()
        await show_intro()
        
        console.print("[bold cyan]META API TOOL MENU[/bold cyan]", justify="center")
        console.print("Select an action to demonstrate:", justify="center")
        console.print()
        
        # Menu options
        options = [
            ("Register an API", "Register a new API from its OpenAPI specification"),
            ("List Registered APIs", "List all currently registered APIs"),
            ("API Details", "Get detailed information about a registered API"),
            ("Tool Details", "Get detailed information about a specific tool"),
            ("Call a Tool", "Call a dynamically registered tool"),
            ("List All Tools", "List all available tools across all APIs"),
            ("Refresh an API", "Refresh an API to update its endpoints"),
            ("Unregister an API", "Unregister an API and all its tools"),
            ("Multi-API Demo", "Demonstrate using multiple APIs together"),
            ("Run Full Demo", "Run the complete demonstration sequence"),
            ("Exit", "Exit the demonstration")
        ]
        
        # Display menu
        menu_table = Table(box=None, highlight=True, border_style=None)
        menu_table.add_column("Option", style="cyan", justify="right")
        menu_table.add_column("Description", style="white")
        
        for i, (option, description) in enumerate(options, 1):
            menu_table.add_row(f"{i}. {option}", description)
        
        console.print(menu_table)
        console.print()
        
        # Get user choice
        try:
            choice = Prompt.ask(
                "[bold green]Enter option number",
                choices=[str(i) for i in range(1, len(options) + 1)],
                default="1"
            )
            choice = int(choice)
            
            if choice == len(options):  # Exit option
                console.print("[yellow]Exiting demonstration. Goodbye![/yellow]")
                break
            
            # Clear screen for the selected demo
            console.clear()
            await show_intro()
            
            # Run the selected demo
            if choice == 1:  # Register an API
                api_choice = Prompt.ask(
                    "[bold green]Select an API to register",
                    choices=["1", "2", "3"],
                    default="1"
                )
                api_name = list(DEMO_APIS.keys())[int(api_choice) - 1]
                await register_api_demo(api_meta_tool, api_name)
            
            elif choice == 2:  # List Registered APIs
                await list_apis_demo(api_meta_tool)
            
            elif choice == 3:  # API Details
                apis = await list_apis_demo(api_meta_tool)
                api_names = list(apis.get("apis", {}).keys())
                
                if not api_names:
                    console.print("[yellow]No APIs are registered. Please register an API first.[/yellow]")
                else:
                    api_options = {str(i): name for i, name in enumerate(api_names, 1)}
                    api_choice = Prompt.ask(
                        "[bold green]Select an API",
                        choices=list(api_options.keys()),
                        default="1"
                    )
                    api_name = api_options[api_choice]
                    await get_api_details_demo(api_meta_tool, api_name)
            
            elif choice == 4:  # Tool Details
                apis = await list_apis_demo(api_meta_tool)
                api_names = list(apis.get("apis", {}).keys())
                
                if not api_names:
                    console.print("[yellow]No APIs are registered. Please register an API first.[/yellow]")
                else:
                    api_options = {str(i): name for i, name in enumerate(api_names, 1)}
                    api_choice = Prompt.ask(
                        "[bold green]Select an API",
                        choices=list(api_options.keys()),
                        default="1"
                    )
                    api_name = api_options[api_choice]
                    await get_tool_details_demo(api_meta_tool, api_name)
            
            elif choice == 5:  # Call a Tool
                apis = await list_apis_demo(api_meta_tool)
                api_names = list(apis.get("apis", {}).keys())
                
                if not api_names:
                    console.print("[yellow]No APIs are registered. Please register an API first.[/yellow]")
                else:
                    api_options = {str(i): name for i, name in enumerate(api_names, 1)}
                    api_choice = Prompt.ask(
                        "[bold green]Select an API",
                        choices=list(api_options.keys()),
                        default="1"
                    )
                    api_name = api_options[api_choice]
                    await call_tool_demo(api_meta_tool, api_name)
            
            elif choice == 6:  # List All Tools
                await list_tools_demo(api_meta_tool)
            
            elif choice == 7:  # Refresh an API
                apis = await list_apis_demo(api_meta_tool)
                api_names = list(apis.get("apis", {}).keys())
                
                if not api_names:
                    console.print("[yellow]No APIs are registered. Please register an API first.[/yellow]")
                else:
                    api_options = {str(i): name for i, name in enumerate(api_names, 1)}
                    api_choice = Prompt.ask(
                        "[bold green]Select an API",
                        choices=list(api_options.keys()),
                        default="1"
                    )
                    api_name = api_options[api_choice]
                    await refresh_api_demo(api_meta_tool, api_name)
            
            elif choice == 8:  # Unregister an API
                apis = await list_apis_demo(api_meta_tool)
                api_names = list(apis.get("apis", {}).keys())
                
                if not api_names:
                    console.print("[yellow]No APIs are registered. Please register an API first.[/yellow]")
                else:
                    api_options = {str(i): name for i, name in enumerate(api_names, 1)}
                    api_choice = Prompt.ask(
                        "[bold green]Select an API to unregister",
                        choices=list(api_options.keys()),
                        default="1"
                    )
                    api_name = api_options[api_choice]
                    await unregister_api_demo(api_meta_tool, api_name)
            
            elif choice == 9:  # Multi-API Demo
                await run_multi_api_demo(api_meta_tool)
            
            elif choice == 10:  # Run Full Demo
                await run_full_demo(api_meta_tool)
            
            # Wait for user to press Enter before returning to menu
            input("\nPress Enter to return to the menu...")
            
        except Exception as e:
            console.print(f"[bold red]Error:[/bold red] {str(e)}")
            input("\nPress Enter to return to the menu...")


async def main():
    """Main entry point for the demonstration."""
    try:
        # Set up the MCP client using create_app
        print("=== API Meta-Tool Demo ===")
        app = create_app()  # noqa: F841
        
        # Access the globally initialized Gateway instance and its api_meta_tool
        gateway_instance = ultimate_mcp_server.core._gateway_instance
        if not gateway_instance:
            raise RuntimeError("Gateway instance not initialized by create_app.")
            
        api_meta_tool = gateway_instance.api_meta_tool
        if not api_meta_tool:
            raise RuntimeError("API Meta Tool instance not found on Gateway. Ensure it was registered.")
            
        # Run the interactive demo with the retrieved instance
        await interactive_demo(api_meta_tool)
        
    except KeyboardInterrupt:
        console.print("\n[yellow]Demonstration interrupted by user.[/yellow]")
    except Exception as e:
        console.print(f"\n[bold red]Error:[/bold red] {str(e)}")
    finally:
        console.print("\n[bold green]Meta API Tool Demonstration completed![/bold green]")
        
    return 0


if __name__ == "__main__":
    exit_code = asyncio.run(main())
    sys.exit(exit_code) 
```

--------------------------------------------------------------------------------
/examples/filesystem_operations_demo.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python
"""Filesystem operations demo for Ultimate MCP Server Tools.

This example demonstrates the secure asynchronous filesystem operations tools,
covering file/directory manipulation, searching, metadata retrieval, and
security features like allowed directory restrictions and deletion protection.
"""
import argparse
import asyncio
import json
import os
import platform
import shutil
import sys
import tempfile
import time
from pathlib import Path

# --- Configuration --- (Standard libs only here)
# Add project root to path for imports when running as script
# Adjust this path if your script location relative to the project root differs
try:
    PROJECT_ROOT = Path(__file__).resolve().parent.parent
    if not (PROJECT_ROOT / "ultimate").is_dir():
        # Fallback if running from a different structure
        PROJECT_ROOT = Path(__file__).resolve().parent
        if not (PROJECT_ROOT / "ultimate").is_dir():
             print("Error: Could not reliably determine project root. Make sure ultimate is importable.", file=sys.stderr)
             sys.exit(1)
    sys.path.insert(0, str(PROJECT_ROOT))

    # --- Important: Set Environment Variables FIRST --- 
    DEMO_TEMP_DIR = tempfile.mkdtemp(prefix="ultimate_fs_demo_")
    os.environ["FILESYSTEM__ALLOWED_DIRECTORIES"] = json.dumps([DEMO_TEMP_DIR])
    os.environ["GATEWAY_FILESYSTEM_ALLOWED_DIRECTORIES"] = json.dumps([DEMO_TEMP_DIR])
    os.environ["GATEWAY_FORCE_CONFIG_RELOAD"] = "true"
    
    print(f"INFO: Temporarily allowing access to: {DEMO_TEMP_DIR}")
    print("DEBUG: Environment variables set:")
    print(f"  FILESYSTEM__ALLOWED_DIRECTORIES = {os.environ['FILESYSTEM__ALLOWED_DIRECTORIES']}")
    print(f"  GATEWAY_FILESYSTEM_ALLOWED_DIRECTORIES = {os.environ['GATEWAY_FILESYSTEM_ALLOWED_DIRECTORIES']}")
except Exception as e:
    print(f"Error during initial setup: {e}", file=sys.stderr)
    sys.exit(1)

# --- Defer ALL ultimate imports until AFTER env vars are set ---
# Import Rich components (can happen earlier, but keep grouped for clarity)
from rich.markup import escape
from rich.panel import Panel
from rich.rule import Rule

from ultimate_mcp_server.config import get_config

# Import necessary exceptions
# Filesystem Tools
from ultimate_mcp_server.tools.filesystem import (
    create_directory,
    delete_path,
    directory_tree,
    edit_file,
    get_file_info,
    list_allowed_directories,
    list_directory,
    move_file,
    read_file,
    read_multiple_files,
    search_files,
    write_file,
)
from ultimate_mcp_server.utils import get_logger
from ultimate_mcp_server.utils.display import generate_rich_directory_tree, safe_tool_call

# Shared console and display utils
from ultimate_mcp_server.utils.logging.console import console

# Initialize logger AFTER all relevant imports
logger = get_logger("example.filesystem")

def parse_arguments():
    """Parse command line arguments for the demo."""
    parser = argparse.ArgumentParser(
        description="Filesystem Operations Demo for Ultimate MCP Server Tools",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""Available demos:
  all           - Run all demos (default)
  read          - File reading operations
  write         - File writing and editing operations
  directory     - Directory operations (create, list, tree)
  move_delete   - Move, delete, search & info operations
  security      - Security features demo
"""
    )

    parser.add_argument('demo', nargs='?', default='all',
                        choices=['all', 'read', 'write', 'directory', 'move_delete', 'security'],
                        help='Specific demo to run (default: all)')

    parser.add_argument('-v', '--verbose', action='store_true',
                        help='Increase output verbosity')

    parser.add_argument('--rich-tree', action='store_true',
                        help='Use enhanced rich tree visualization for directory trees')

    return parser.parse_args()

# --- Verify Configuration Loading ---
def verify_config():
    """Verify that the filesystem configuration has loaded correctly."""
    try:
        # Get config only ONCE
        config = get_config()
        fs_config = config.filesystem
        allowed_dirs = fs_config.allowed_directories
        
        print("Configuration verification:")
        print(f"  Allowed directories: {allowed_dirs}")
        
        if not allowed_dirs:
            print("WARNING: No allowed directories loaded in filesystem configuration!")
            print("Check these environment variables:")
            for key in os.environ:
                if "ALLOWED_DIRECTORIES" in key:
                    print(f"  {key} = {os.environ[key]}")
            print(f"DEMO_TEMP_DIR set to: {DEMO_TEMP_DIR}")
            # Do NOT attempt to force update - rely on initial load
            print("ERROR: Configuration failed to load allowed_directories from environment variables.")
            return False # Fail verification if dirs are missing
        
        # If allowed_dirs were loaded, check if our temp dir is in it
        if DEMO_TEMP_DIR in allowed_dirs:
            print(f"SUCCESS: Temporary directory {DEMO_TEMP_DIR} properly loaded in configuration!")
            return True
        else:
            print(f"WARNING: Temporary directory {DEMO_TEMP_DIR} not found in loaded allowed dirs: {allowed_dirs}")
            return False # Fail verification if temp dir is missing
            
    except Exception as e:
        print(f"ERROR during config verification: {e}")
        import traceback
        traceback.print_exc()
        return False

# --- Demo Setup ---
# DEMO_ROOT is the base *within* the allowed temporary directory
DEMO_ROOT = Path(DEMO_TEMP_DIR) / "demo_project"
BULK_FILES_COUNT = 110 # Number of files to create for deletion protection demo (>100)

async def setup_demo_environment():
    """Create a temporary directory structure for the demo."""
    logger.info("Setting up demo environment...", emoji_key="setup")
    DEMO_ROOT.mkdir(parents=True, exist_ok=True)

    # Create subdirectories
    project_dirs = [
        DEMO_ROOT / "docs",
        DEMO_ROOT / "src" / "utils",
        DEMO_ROOT / "data",
        DEMO_ROOT / "config",
        DEMO_ROOT / "tests",
        DEMO_ROOT / ".hidden_dir",
        DEMO_ROOT / "bulk_files" # For deletion protection demo
    ]
    for directory in project_dirs:
        directory.mkdir(parents=True, exist_ok=True)

    # Create some sample files
    sample_files = {
        DEMO_ROOT / "README.md": """# Project Demo

This is a demonstration project for testing the secure filesystem operations.

## Features

- File reading and writing
- Directory manipulation
- File searching capabilities
- Metadata retrieval

## Security

All operations are restricted to allowed directories for safety.""",

        DEMO_ROOT / "src" / "main.py": """#!/usr/bin/env python
'''Main entry point for the demo application.'''
import sys
from pathlib import Path
# Import logger for edit demo
# Assume logger is configured elsewhere
# import logging
# logger = logging.getLogger(__name__)

# A line with different whitespace for editing demo
def main():
	'''Main function to run the application.'''
	print("Hello from the demo application!")

	# Get configuration
	config = get_config_local() # Renamed to avoid conflict
	print(f"Running with debug mode: {config['debug']}")

	return 0

def get_config_local(): # Renamed
    '''Get application configuration.'''
    return {
        "debug": True,
        "log_level": "INFO",
        "max_connections": 10
    }

if __name__ == "__main__":
    sys.exit(main())
""",

        DEMO_ROOT / "src" / "utils" / "helpers.py": """'''Helper utilities for the application.'''

def format_message(message, level="info"):
    '''Format a message with level prefix.'''
    return f"[{level.upper()}] {message}"

class DataProcessor:
    '''Process application data.'''

    def __init__(self, data_source):
        self.data_source = data_source

    def process(self):
        '''Process the data.'''
        # TODO: Implement actual processing
        return f"Processed {self.data_source}"
""",

        DEMO_ROOT / "docs" / "api.md": """# API Documentation

## Endpoints

### GET /api/v1/status

Returns the current system status.

### POST /api/v1/data

Submit data for processing.

## Authentication

All API calls require an authorization token.
""",
        DEMO_ROOT / "config" / "settings.json": """{
    "appName": "Demo Application",
    "version": "1.0.0",
    "debug": false,
    "database": {
        "host": "localhost",
        "port": 5432,
        "name": "demo_db"
    },
    "logging": {
        "level": "info",
        "file": "app.log"
    }
}""",
        DEMO_ROOT / "data" / "sample.csv": "ID,Value,Category\n1,10.5,A\n2,15.2,B\n3,9.8,A",
        DEMO_ROOT / "tests" / "test_helpers.py": """import pytest
# Adjust import path if needed relative to test execution
from src.utils.helpers import format_message

def test_format_message():
    assert format_message("Test", "debug") == "[DEBUG] Test"
""",
        DEMO_ROOT / ".gitignore": "*.log\n*.tmp\n.hidden_dir/\n",
        DEMO_ROOT / "temp.log": "Log file content - should be excluded by search patterns.",
        # Add a file with potentially non-UTF8 data (simulated)
        DEMO_ROOT / "data" / "binary_data.bin": b'\x80\x02\x95\n\x00\x00\x00\x00\x00\x00\x00}\x94\x8c\x04data\x94\x8c\x06binary\x94s.'
    }

    for file_path, content in sample_files.items():
        file_path.parent.mkdir(parents=True, exist_ok=True)
        if isinstance(content, str):
            file_path.write_text(content, encoding='utf-8')
        else:
            file_path.write_bytes(content)

    # Create bulk files for deletion protection test
    bulk_dir = DEMO_ROOT / "bulk_files"
    bulk_dir.mkdir(exist_ok=True) # Ensure bulk dir exists
    
    # Create files with deliberately varied timestamps to trigger protection
    current_time = time.time()
    file_types = [".txt", ".log", ".dat", ".csv", ".tmp", ".bak", ".json"]
    
    for i in range(BULK_FILES_COUNT):
        # Use a wider variety of extensions
        ext = file_types[i % len(file_types)]
        fpath = bulk_dir / f"file_{i:03d}{ext}"
        fpath.write_text(f"Content for file {i}")
        
        # Create highly varied timestamps spanning days/weeks, not just minutes
        # Some files very old, some very new, to ensure high standard deviation
        if i < BULK_FILES_COUNT // 3:
            # First third: older files (30-60 days old)
            age = 60 * 60 * 24 * (30 + (i % 30))  # 30-60 days in seconds
        elif i < 2 * (BULK_FILES_COUNT // 3):
            # Middle third: medium age (1-10 days old)
            age = 60 * 60 * 24 * (1 + (i % 10))  # 1-10 days in seconds
        else:
            # Final third: very recent (0-12 hours old)
            age = 60 * 60 * (i % 12)  # 0-12 hours in seconds
            
        # Set both access and modification times to the calculated age
        try:
            timestamp = current_time - age
            os.utime(fpath, (timestamp, timestamp))
        except OSError as e:
            logger.warning(f"Could not set utime for {fpath}: {e}", emoji_key="warning")
    
    # Add a message about the setup
    logger.info(f"Created {BULK_FILES_COUNT} files in 'bulk_files/' with highly varied timestamps and {len(file_types)} different extensions", emoji_key="setup")

    # Create a symlink (if supported)
    SYMLINK_PATH = DEMO_ROOT / "link_to_src"
    TARGET_PATH = DEMO_ROOT / "src" # Link to src directory
    try:
        # Check if symlinks are supported (e.g., Windows needs admin rights or dev mode)
        can_symlink = hasattr(os, "symlink")
        test_link_path = DEMO_ROOT / "test_link_nul_delete"
        if platform.system() == "Windows":
            # Basic check, might not be perfect
            try:
                # Use a file target for test link on Windows if dir links need special perms
                test_target = DEMO_ROOT / "README.md"
                os.symlink(test_target, test_link_path, target_is_directory=False)
                test_link_path.unlink() # Clean up test link
            except (OSError, AttributeError, NotImplementedError):
                 can_symlink = False
                 logger.warning("Symlink creation might not be supported or permitted on this system. Skipping symlink tests.", emoji_key="warning")

        if can_symlink:
            # Ensure target exists before creating link
            if TARGET_PATH.is_dir():
                 # Use await aiofiles.os.symlink for consistency? No, os.symlink is sync only.
                 os.symlink(TARGET_PATH, SYMLINK_PATH, target_is_directory=True)
                 logger.info(f"Created symlink: {SYMLINK_PATH} -> {TARGET_PATH}", emoji_key="link")
            else:
                 logger.warning(f"Symlink target {TARGET_PATH} does not exist or is not a directory. Skipping symlink creation.", emoji_key="warning")
                 SYMLINK_PATH = None
        else:
             SYMLINK_PATH = None # Indicate symlink wasn't created
    except OSError as e:
        # Handle errors like EEXIST if link already exists, or permission errors
        if e.errno == 17: # EEXIST
             logger.warning(f"Symlink {SYMLINK_PATH} already exists. Assuming correct setup.", emoji_key="warning")
        else:
             logger.warning(f"Could not create symlink ({SYMLINK_PATH} -> {TARGET_PATH}): {e}. Skipping symlink tests.", emoji_key="warning")
             SYMLINK_PATH = None # Indicate symlink wasn't created
    except Exception as e:
        logger.error(f"Unexpected error creating symlink: {e}", emoji_key="error", exc_info=True)
        SYMLINK_PATH = None

    logger.success(f"Demo environment set up at: {DEMO_ROOT}", emoji_key="success")
    console.print(Panel(
        f"Created demo project within [cyan]{DEMO_ROOT.parent}[/cyan] at [cyan]{DEMO_ROOT.name}[/cyan]\n"
        f"Created [bold]{len(project_dirs)}[/bold] directories and [bold]{len(sample_files)}[/bold] sample files.\n"
        f"Created [bold]{BULK_FILES_COUNT}[/bold] files in 'bulk_files/' for deletion test.\n"
        f"Symlink created: {'Yes' if SYMLINK_PATH else 'No'}",
        title="Demo Environment Ready",
        border_style="green",
        expand=False
    ))
    return SYMLINK_PATH

async def cleanup_demo_environment():
    """Remove the temporary directory structure using standard shutil."""
    global DEMO_TEMP_DIR
    if DEMO_TEMP_DIR and Path(DEMO_TEMP_DIR).exists():
        try:
            # Use synchronous shutil for cleanup simplicity
            shutil.rmtree(DEMO_TEMP_DIR)
            logger.info(f"Cleaned up demo directory: {DEMO_TEMP_DIR}", emoji_key="cleanup")
            console.print(f"Cleaned up demo directory: [dim]{DEMO_TEMP_DIR}[/dim]")
        except Exception as e:
            logger.error(f"Error during cleanup of {DEMO_TEMP_DIR}: {e}", emoji_key="error")
            console.print(f"[bold red]Error cleaning up demo directory {DEMO_TEMP_DIR}: {e}[/bold red]")
    DEMO_TEMP_DIR = None


async def demonstrate_file_reading(symlink_path):
    """Demonstrate file reading operations."""
    console.print(Rule("[bold cyan]1. File Reading Operations[/bold cyan]", style="cyan"))
    logger.info("Demonstrating file reading operations...", emoji_key="file")

    # --- Read Single File (Text) ---
    readme_path = str(DEMO_ROOT / "README.md")
    await safe_tool_call(read_file, {"path": readme_path}, description="Reading a text file (README.md)")

    # --- Read Single File (JSON) ---
    settings_path = str(DEMO_ROOT / "config" / "settings.json")
    await safe_tool_call(read_file, {"path": settings_path}, description="Reading a JSON file (settings.json)")

    # --- Read Single File (Simulated Binary) ---
    binary_path = str(DEMO_ROOT / "data" / "binary_data.bin")
    await safe_tool_call(read_file, {"path": binary_path}, description="Reading a binary file (expecting hex preview)")

    # --- Read Non-Existent File ---
    non_existent_path = str(DEMO_ROOT / "non_existent.txt")
    await safe_tool_call(read_file, {"path": non_existent_path}, description="Attempting to read a non-existent file (should fail)")

    # --- Read a Directory (should fail) ---
    dir_path = str(DEMO_ROOT / "src")
    await safe_tool_call(read_file, {"path": dir_path}, description="Attempting to read a directory as a file (should fail)")

    # --- Read Multiple Files (Success and Failure Mix) ---
    paths_to_read = [
        str(DEMO_ROOT / "README.md"),
        str(DEMO_ROOT / "src" / "main.py"),
        str(DEMO_ROOT / "non_existent.txt"), # This one will fail
        str(DEMO_ROOT / "config" / "settings.json"),
        str(DEMO_ROOT / "src") # Reading a directory will also fail here
    ]
    await safe_tool_call(read_multiple_files, {"paths": paths_to_read}, description="Reading multiple files (including some that will fail)")

    # --- Read file via Symlink (if created) ---
    if symlink_path:
         # Reading a file within the linked directory
         linked_file_path = str(symlink_path / "main.py")
         await safe_tool_call(read_file, {"path": linked_file_path}, description=f"Reading a file via symlink ({os.path.basename(symlink_path)}/main.py)")

async def demonstrate_file_writing_editing():
    """Demonstrate file writing and editing operations."""
    console.print(Rule("[bold cyan]2. File Writing & Editing Operations[/bold cyan]", style="cyan"))
    logger.info("Demonstrating file writing and editing operations...", emoji_key="file")

    # --- Write New File ---
    new_file_path = str(DEMO_ROOT / "data" / "report.md")
    file_content = """# Analysis Report

## Summary
This report contains the analysis of project performance metrics.

## Key Findings
1. Response time improved by 15%
2. Error rate decreased to 0.5%
3. User satisfaction score: 4.8/5.0

## Recommendations
- Continue monitoring performance
- Implement suggested optimizations
- Schedule follow-up review next quarter
"""
    await safe_tool_call(write_file, {"path": new_file_path, "content": file_content}, description="Writing a new file (report.md)")

    # --- Overwrite Existing File ---
    overwrite_content = "# Analysis Report (V2)\n\nReport updated."
    await safe_tool_call(write_file, {"path": new_file_path, "content": overwrite_content}, description="Overwriting the existing file (report.md)")
    # Verify overwrite
    await safe_tool_call(read_file, {"path": new_file_path}, description="Reading the overwritten file to verify")

    # --- Attempt to Write to a Directory (should fail) ---
    await safe_tool_call(write_file, {"path": str(DEMO_ROOT / "src"), "content": "test"}, description="Attempting to write over a directory (should fail)")

    # --- Edit File (main.py) ---
    target_edit_file = str(DEMO_ROOT / "src" / "main.py")

    # Edits including one requiring whitespace-insensitive fallback
    edits = [
        {
            "oldText": 'print("Hello from the demo application!")', # Exact match
            "newText": 'print("Hello from the UPDATED demo application!")\n    logger.info("App started")'
        },
        {
            # This uses different leading whitespace than the original file
            "oldText": "def main():\n    '''Main function to run the application.'''",
            # Expected fallback behavior: find based on stripped lines, replace using original indentation
            "newText": "def main():\n    '''The primary execution function.''' # Docstring updated"
        },
         {
             "oldText": '    return {\n        "debug": True,\n        "log_level": "INFO",\n        "max_connections": 10\n    }',
             "newText": '    return {\n        "debug": False, # Changed to False\n        "log_level": "WARNING",\n        "max_connections": 25 # Increased limit\n    }'
         }
    ]

    await safe_tool_call(read_file, {"path": target_edit_file}, description="Reading main.py before editing")

    # Edit with Dry Run
    await safe_tool_call(edit_file, {"path": target_edit_file, "edits": edits, "dry_run": True}, description="Editing main.py (Dry Run - showing diff)")

    # Apply Edits for Real
    await safe_tool_call(edit_file, {"path": target_edit_file, "edits": edits, "dry_run": False}, description="Applying edits to main.py")

    # Verify Edits
    await safe_tool_call(read_file, {"path": target_edit_file}, description="Reading main.py after editing")

    # --- Edit with Non-Existent Old Text (should fail) ---
    failed_edit = [{"oldText": "This text does not exist in the file", "newText": "Replacement"}]
    await safe_tool_call(edit_file, {"path": target_edit_file, "edits": failed_edit}, description="Attempting edit with non-existent 'oldText' (should fail)")


async def demonstrate_directory_operations(symlink_path, use_rich_tree=False):
    """Demonstrate directory creation, listing, and tree view."""
    console.print(Rule("[bold cyan]3. Directory Operations[/bold cyan]", style="cyan"))
    logger.info("Demonstrating directory operations...", emoji_key="directory")

    # --- Create Directory ---
    # First ensure parent directory exists
    logs_dir_path = str(DEMO_ROOT / "logs")
    await safe_tool_call(create_directory, {"path": logs_dir_path}, description="Creating parent directory (logs)")
    
    # Now create nested directory
    new_dir_path = str(DEMO_ROOT / "logs" / "debug")
    await safe_tool_call(create_directory, {"path": new_dir_path}, description="Creating a new nested directory (logs/debug)")

    # --- Create Directory (already exists) ---
    await safe_tool_call(create_directory, {"path": new_dir_path}, description="Attempting to create the same directory again (idempotent)")

    # --- Attempt to Create Directory over a File (should fail) ---
    file_path_for_dir = str(DEMO_ROOT / "README.md")
    await safe_tool_call(create_directory, {"path": file_path_for_dir}, description="Attempting to create directory over an existing file (README.md - should fail)")

    # --- List Directory (Root) ---
    await safe_tool_call(list_directory, {"path": str(DEMO_ROOT)}, description=f"Listing contents of demo root ({DEMO_ROOT.name})")

    # --- List Directory (Subdir) ---
    await safe_tool_call(list_directory, {"path": str(DEMO_ROOT / "src")}, description="Listing contents of subdirectory (src)")

    # --- List Directory (via Symlink, if created) ---
    if symlink_path:
         await safe_tool_call(list_directory, {"path": str(symlink_path)}, description=f"Listing contents via symlink ({os.path.basename(symlink_path)})")

    # --- List Non-Existent Directory (should fail) ---
    await safe_tool_call(list_directory, {"path": str(DEMO_ROOT / "no_such_dir")}, description="Attempting to list non-existent directory (should fail)")

    # --- Enhanced visualization for directory tree if requested ---
    if use_rich_tree:
        # Restore direct call to async tree generator utility
        console.print("\n[bold cyan]Enhanced Directory Tree Visualization (Async Tool Based)[/bold cyan]")
        
        try:
            # Generate the tree using the async utility function from display.py
            rich_tree = await generate_rich_directory_tree(str(DEMO_ROOT), max_depth=3)
            console.print(rich_tree)
        except Exception as e:
            logger.error(f"Error generating async directory tree: {e}", exc_info=True)
            console.print(f"[bold red]Error generating directory tree: {escape(str(e))}[/bold red]")
            
        console.print() # Add newline

    # --- Directory Tree (Default Depth) --- # This uses the directory_tree TOOL
    # The safe_tool_call will now use its built-in tree renderer for this standard call
    # Note: The tool 'directory_tree' produces a similar but potentially slightly different
    # structure/detail level than the custom async generator above.
    await safe_tool_call(directory_tree, {"path": str(DEMO_ROOT)}, description="Generating directory tree for demo root (default depth - using tool)")

    # --- Directory Tree (Specific Depth) ---
    await safe_tool_call(directory_tree, {"path": str(DEMO_ROOT), "max_depth": 1}, description="Generating directory tree (max_depth=1)")

    # --- Directory Tree (Include Size) ---
    await safe_tool_call(directory_tree, {"path": str(DEMO_ROOT), "max_depth": 2, "include_size": True}, description="Generating directory tree (max_depth=2, include_size=True)")

    # --- Directory Tree (via Symlink, if created) ---
    if symlink_path:
         await safe_tool_call(directory_tree, {"path": str(symlink_path), "max_depth": 1}, description=f"Generating directory tree via symlink ({os.path.basename(symlink_path)}, max_depth=1)")

async def demonstrate_move_delete_search(symlink_path):
    """Demonstrate file/directory moving, deletion, searching, and info retrieval."""
    console.print(Rule("[bold cyan]4. Move, Delete, Search & Info Operations[/bold cyan]", style="cyan"))
    logger.info("Demonstrating move, delete, search, info operations...", emoji_key="file")

    # --- Get File Info (File) ---
    settings_json_path = str(DEMO_ROOT / "config" / "settings.json")
    await safe_tool_call(get_file_info, {"path": settings_json_path}, description="Getting file info for settings.json")

    # --- Get File Info (Directory) ---
    src_dir_path = str(DEMO_ROOT / "src")
    await safe_tool_call(get_file_info, {"path": src_dir_path}, description="Getting file info for src directory")

    # --- Get File Info (Symlink, if created) ---
    if symlink_path:
        await safe_tool_call(get_file_info, {"path": str(symlink_path)}, description=f"Getting file info for symlink ({os.path.basename(symlink_path)}) - uses lstat")

    # --- Search Files (Name Match, Case Insensitive) ---
    await safe_tool_call(search_files, {"path": str(DEMO_ROOT), "pattern": "readme"}, description="Searching for 'readme' (case insensitive)")

    # --- Search Files (Name Match, Case Sensitive) ---
    await safe_tool_call(search_files, {"path": str(DEMO_ROOT), "pattern": "README", "case_sensitive": True}, description="Searching for 'README' (case sensitive)")

    # --- Search Files (With Exclusions) ---
    await safe_tool_call(search_files,
                         {"path": str(DEMO_ROOT), "pattern": ".py", "exclude_patterns": ["*/test*", ".hidden_dir/*"]},
                         description="Searching for '*.py', excluding tests and hidden dir")

    # --- Search Files (Content Search) ---
    await safe_tool_call(search_files,
                         {"path": str(DEMO_ROOT), "pattern": "localhost", "search_content": True},
                         description="Searching for content 'localhost' inside files")

    # --- Search Files (Content Search, Case Sensitive) ---
    await safe_tool_call(search_files,
                         {"path": str(DEMO_ROOT), "pattern": "DataProcessor", "search_content": True, "case_sensitive": True},
                         description="Searching for content 'DataProcessor' (case sensitive)")

    # --- Search Files (No Matches) ---
    await safe_tool_call(search_files, {"path": str(DEMO_ROOT), "pattern": "xyz_no_match_xyz"}, description="Searching for pattern guaranteed not to match")

    # --- Move File ---
    source_move_path = str(DEMO_ROOT / "data" / "sample.csv")
    dest_move_path = str(DEMO_ROOT / "data" / "renamed_sample.csv")
    await safe_tool_call(move_file, {"source": source_move_path, "destination": dest_move_path}, description="Moving (renaming) sample.csv")
    # Verify move by trying to get info on new path
    await safe_tool_call(get_file_info, {"path": dest_move_path}, description="Verifying move by getting info on new path")

    # --- Move File (Overwrite) ---
    # First create a file to be overwritten
    overwrite_target_path = str(DEMO_ROOT / "data" / "overwrite_me.txt")
    await safe_tool_call(write_file, {"path": overwrite_target_path, "content": "Original content"}, description="Creating file to be overwritten")
    # Now move onto it with overwrite=True
    await safe_tool_call(move_file,
                         {"source": dest_move_path, "destination": overwrite_target_path, "overwrite": True},
                         description="Moving renamed_sample.csv onto overwrite_me.txt (overwrite=True)")
    # Verify overwrite
    await safe_tool_call(get_file_info, {"path": overwrite_target_path}, description="Verifying overwrite by getting info")

    # --- Move Directory ---
    source_dir_move = str(DEMO_ROOT / "tests")
    dest_dir_move = str(DEMO_ROOT / "tests_moved")
    await safe_tool_call(move_file, {"source": source_dir_move, "destination": dest_dir_move}, description="Moving the 'tests' directory")
    # Verify move
    await safe_tool_call(list_directory, {"path": dest_dir_move}, description="Verifying directory move by listing new path")

    # --- Attempt Move (Destination Exists, No Overwrite - should fail) ---
    await safe_tool_call(move_file,
                         {"source": str(DEMO_ROOT / "README.md"), "destination": str(DEMO_ROOT / "config" / "settings.json")},
                         description="Attempting to move README.md onto settings.json (no overwrite - should fail)")

    # --- Delete File ---
    file_to_delete = str(DEMO_ROOT / "temp.log")
    await safe_tool_call(get_file_info, {"path": file_to_delete}, description="Checking temp.log exists before deleting")
    await safe_tool_call(delete_path, {"path": file_to_delete}, description="Deleting single file (temp.log)")
    await safe_tool_call(get_file_info, {"path": file_to_delete}, description="Verifying temp.log deletion (should fail)")

    # --- Delete Symlink (if created) ---
    if symlink_path:
        # Get the exact path string to the symlink without resolving it
        symlink_str = str(symlink_path)
        await safe_tool_call(get_file_info, {"path": symlink_str}, description=f"Checking symlink {os.path.basename(symlink_path)} exists before deleting")
        
        # Explicitly tell the user what we're doing
        console.print(f"[cyan]Note:[/cyan] Deleting the symlink itself (not its target) at path: {symlink_str}")
        
        await safe_tool_call(delete_path, {"path": symlink_str}, description=f"Deleting symlink ({os.path.basename(symlink_path)})")
        await safe_tool_call(get_file_info, {"path": symlink_str}, description="Verifying symlink deletion (should fail)")

    # --- Delete Empty Directory ---
    empty_dir_to_delete = str(DEMO_ROOT / "logs" / "debug") # Created earlier, should be empty
    await safe_tool_call(get_file_info, {"path": empty_dir_to_delete}, description="Checking logs/debug exists before deleting")
    await safe_tool_call(delete_path, {"path": empty_dir_to_delete}, description="Deleting empty directory (logs/debug)")
    await safe_tool_call(get_file_info, {"path": empty_dir_to_delete}, description="Verifying empty directory deletion (should fail)")

    # --- Delete Directory with Content (Testing Deletion Protection) ---
    bulk_dir_path = str(DEMO_ROOT / "bulk_files")
    console.print(Panel(
        f"Attempting to delete directory '{os.path.basename(bulk_dir_path)}' which contains {BULK_FILES_COUNT} files.\n"
        "This will trigger the deletion protection check (heuristics based on file count, timestamps, types).\n"
        "Whether it blocks depends on the config thresholds and calculated variances.",
        title="🛡️ Testing Deletion Protection 🛡️", border_style="yellow"
    ))
    # This call might raise ProtectionTriggeredError, which safe_tool_call will catch and display
    await safe_tool_call(delete_path, {"path": bulk_dir_path}, description=f"Deleting directory with {BULK_FILES_COUNT} files (bulk_files)")
    # Check if it was actually deleted or blocked by protection
    await safe_tool_call(get_file_info, {"path": bulk_dir_path}, description="Checking if bulk_files directory still exists after delete attempt")


async def demonstrate_security_features():
    """Demonstrate security features like allowed directories."""
    console.print(Rule("[bold cyan]5. Security Features[/bold cyan]", style="cyan"))
    logger.info("Demonstrating security features...", emoji_key="security")

    # --- List Allowed Directories ---
    # This reads from the config (which we set via env var for the demo)
    await safe_tool_call(list_allowed_directories, {}, description="Listing configured allowed directories")
    console.print(f"[dim]Note: For this demo, only the temporary directory [cyan]{DEMO_TEMP_DIR}[/cyan] was allowed via environment variable.[/dim]")

    # --- Try to Access Standard System Root (should fail) ---
    # Choose a path guaranteed outside the temp allowed dir
    outside_path_root = "/" if platform.system() != "Windows" else "C:\\"
    console.print(f"\nAttempting operation outside allowed directory: [red]Listing '{outside_path_root}'[/red]")
    await safe_tool_call(list_directory, {"path": outside_path_root}, description=f"Attempting to list root directory '{outside_path_root}' (should fail)")

    # --- Try to Access Specific Sensitive File (should fail) ---
    outside_path_file = "/etc/passwd" if platform.system() != "Windows" else "C:\\Windows\\System32\\drivers\\etc\\hosts"
    console.print(f"\nAttempting operation outside allowed directory: [red]Reading '{outside_path_file}'[/red]")
    await safe_tool_call(read_file, {"path": outside_path_file}, description=f"Attempting to read sensitive file '{outside_path_file}' (should fail)")

    # --- Try to use '..' to escape (should fail due to normalization) ---
    escape_path = str(DEMO_ROOT / ".." / "..") # Attempt to go above the allowed temp dir
    # Note: validate_path normalizes this, so it might resolve to something unexpected but still potentially outside
    # Or, more likely, the normalized path check against allowed dirs will fail.
    console.print(f"\nAttempting operation using '..' to potentially escape: [red]Listing '{escape_path}'[/red]")
    await safe_tool_call(list_directory, {"path": escape_path}, description=f"Attempting to list path using '..' ('{escape_path}')")

    console.print(Panel(
        "Security checks demonstrated:\n"
        "1. Operations are confined to the `allowed_directories`.\n"
        "2. Accessing paths outside these directories is denied.\n"
        "3. Path normalization prevents trivial directory traversal escapes (`..`).\n"
        "4. Symlink targets are also validated against `allowed_directories` (implicitly tested via symlink operations).\n"
        "5. Deletion protection provides a safety net against accidental bulk deletions (demonstrated earlier).",
        title="Security Summary", border_style="green", expand=False
    ))


async def main():
    """Run the filesystem operations demonstration."""
    global DEMO_TEMP_DIR # Make sure main knows about this path
    symlink_path = None
    exit_code = 0

    # Parse command line arguments
    args = parse_arguments()

    try:
        console.print(Rule("[bold blue]Secure Filesystem Operations Demo[/bold blue]", style="white"))
        logger.info("Starting filesystem operations demonstration", emoji_key="start")

        # --- Verify Config Loading ---
        print(Rule("Verifying Configuration", style="dim"))
        config_valid = verify_config()
        if not config_valid:
            # Abort with a clear message if config verification fails
            console.print("[bold red]Error:[/bold red] Configuration verification failed. Aborting demonstration.", style="red")
            return 1 # Exit early if config is wrong
            
        # --- Verify Config Loading ---
        try:
             current_config = get_config()
             fs_config = current_config.filesystem
             loaded_allowed_dirs = fs_config.allowed_directories
             console.print(f"[dim]Config Check: Loaded allowed dirs: {loaded_allowed_dirs}[/dim]")
             if not loaded_allowed_dirs or DEMO_TEMP_DIR not in loaded_allowed_dirs:
                  console.print("[bold red]Error:[/bold red] Demo temporary directory not found in loaded allowed directories. Configuration failed.", style="red")
                  console.print(f"[dim]Expected: {DEMO_TEMP_DIR}[/dim]")
                  console.print(f"[dim]Loaded Config: {current_config.model_dump()}") # Dump entire config
                  return 1 # Exit early if config is wrong
        except Exception as config_err:
             console.print(f"[bold red]Error checking loaded configuration:[/bold red] {config_err}", style="red")
             console.print_exception(show_locals=False)
             return 1
        # --- End Verify Config Loading ---


        # Display available options if running all demos
        if args.demo == 'all':
            console.print(Panel(
                "This demo includes multiple sections showcasing different filesystem operations.\n"
                "You can run individual sections using the following commands:\n\n"
                "[yellow]python examples/filesystem_operations_demo.py read[/yellow] - File reading operations\n"
                "[yellow]python examples/filesystem_operations_demo.py write[/yellow] - File writing and editing operations\n"
                "[yellow]python examples/filesystem_operations_demo.py directory[/yellow] - Directory operations\n"
                "[yellow]python examples/filesystem_operations_demo.py move_delete[/yellow] - Move, delete, search & info operations\n"
                "[yellow]python examples/filesystem_operations_demo.py security[/yellow] - Security features demo\n\n"
                "Add [yellow]--rich-tree[/yellow] for enhanced directory visualization!",
                title="Demo Options",
                border_style="cyan",
                expand=False
            ))

        # Display info message
        console.print(Panel(
            "This demo showcases the secure asynchronous filesystem tools.\n"
            f"A temporary directory ([cyan]{DEMO_TEMP_DIR}[/cyan]) has been created and configured as the ONLY allowed directory for this demo's operations via environment variables.",
            title="About This Demo",
            border_style="cyan"
        ))

        # Set up the demo environment *inside* the allowed temp dir
        symlink_path = await setup_demo_environment()

        # Run the selected demonstration(s)
        if args.demo == 'all' or args.demo == 'read':
            await demonstrate_file_reading(symlink_path)
            console.print() # Add newline

        if args.demo == 'all' or args.demo == 'write':
            await demonstrate_file_writing_editing()
            console.print() # Add newline

        if args.demo == 'all' or args.demo == 'directory':
            await demonstrate_directory_operations(symlink_path, use_rich_tree=args.rich_tree)
            console.print() # Add newline

        if args.demo == 'all' or args.demo == 'move_delete':
            await demonstrate_move_delete_search(symlink_path)
            console.print() # Add newline

        if args.demo == 'all' or args.demo == 'security':
            await demonstrate_security_features()

        logger.success(f"Filesystem Operations Demo(s) completed: {args.demo}", emoji_key="complete")
        console.print(Rule("[bold green]Demo Complete[/bold green]", style="green"))

    except Exception as e:
        logger.critical(f"Demo crashed unexpectedly: {str(e)}", emoji_key="critical", exc_info=True)
        console.print(f"\n[bold red]CRITICAL ERROR:[/bold red] {escape(str(e))}")
        console.print_exception(show_locals=False)
        exit_code = 1

    finally:
        # Clean up the demo environment
        console.print(Rule("Cleanup", style="dim"))
        await cleanup_demo_environment()

    return exit_code

def get_config_local(): # Renamed
    """Get application configuration."""
    return {
        "debug": True,
        "log_level": "INFO",
        "max_connections": 10
    }

if __name__ == "__main__":
    # Basic check for asyncio policy on Windows if needed
    # if sys.platform == "win32" and sys.version_info >= (3, 8):
    #     asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())

    # Run the demo
    final_exit_code = asyncio.run(main())
    sys.exit(final_exit_code)
```
Page 11/35FirstPrevNextLast