This is page 14 of 45. Use http://codebase.md/dicklesworthstone/llm_gateway_mcp_server?lines=true&page={x} to view the full context.
# Directory Structure
```
├── .cursorignore
├── .env.example
├── .envrc
├── .gitignore
├── additional_features.md
├── check_api_keys.py
├── completion_support.py
├── comprehensive_test.py
├── docker-compose.yml
├── Dockerfile
├── empirically_measured_model_speeds.json
├── error_handling.py
├── example_structured_tool.py
├── examples
│ ├── __init__.py
│ ├── advanced_agent_flows_using_unified_memory_system_demo.py
│ ├── advanced_extraction_demo.py
│ ├── advanced_unified_memory_system_demo.py
│ ├── advanced_vector_search_demo.py
│ ├── analytics_reporting_demo.py
│ ├── audio_transcription_demo.py
│ ├── basic_completion_demo.py
│ ├── cache_demo.py
│ ├── claude_integration_demo.py
│ ├── compare_synthesize_demo.py
│ ├── cost_optimization.py
│ ├── data
│ │ ├── sample_event.txt
│ │ ├── Steve_Jobs_Introducing_The_iPhone_compressed.md
│ │ └── Steve_Jobs_Introducing_The_iPhone_compressed.mp3
│ ├── docstring_refiner_demo.py
│ ├── document_conversion_and_processing_demo.py
│ ├── entity_relation_graph_demo.py
│ ├── filesystem_operations_demo.py
│ ├── grok_integration_demo.py
│ ├── local_text_tools_demo.py
│ ├── marqo_fused_search_demo.py
│ ├── measure_model_speeds.py
│ ├── meta_api_demo.py
│ ├── multi_provider_demo.py
│ ├── ollama_integration_demo.py
│ ├── prompt_templates_demo.py
│ ├── python_sandbox_demo.py
│ ├── rag_example.py
│ ├── research_workflow_demo.py
│ ├── sample
│ │ ├── article.txt
│ │ ├── backprop_paper.pdf
│ │ ├── buffett.pdf
│ │ ├── contract_link.txt
│ │ ├── legal_contract.txt
│ │ ├── medical_case.txt
│ │ ├── northwind.db
│ │ ├── research_paper.txt
│ │ ├── sample_data.json
│ │ └── text_classification_samples
│ │ ├── email_classification.txt
│ │ ├── news_samples.txt
│ │ ├── product_reviews.txt
│ │ └── support_tickets.txt
│ ├── sample_docs
│ │ └── downloaded
│ │ └── attention_is_all_you_need.pdf
│ ├── sentiment_analysis_demo.py
│ ├── simple_completion_demo.py
│ ├── single_shot_synthesis_demo.py
│ ├── smart_browser_demo.py
│ ├── sql_database_demo.py
│ ├── sse_client_demo.py
│ ├── test_code_extraction.py
│ ├── test_content_detection.py
│ ├── test_ollama.py
│ ├── text_classification_demo.py
│ ├── text_redline_demo.py
│ ├── tool_composition_examples.py
│ ├── tournament_code_demo.py
│ ├── tournament_text_demo.py
│ ├── unified_memory_system_demo.py
│ ├── vector_search_demo.py
│ ├── web_automation_instruction_packs.py
│ └── workflow_delegation_demo.py
├── LICENSE
├── list_models.py
├── marqo_index_config.json.example
├── mcp_protocol_schema_2025-03-25_version.json
├── mcp_python_lib_docs.md
├── mcp_tool_context_estimator.py
├── model_preferences.py
├── pyproject.toml
├── quick_test.py
├── README.md
├── resource_annotations.py
├── run_all_demo_scripts_and_check_for_errors.py
├── storage
│ └── smart_browser_internal
│ ├── locator_cache.db
│ ├── readability.js
│ └── storage_state.enc
├── test_client.py
├── test_connection.py
├── TEST_README.md
├── test_sse_client.py
├── test_stdio_client.py
├── tests
│ ├── __init__.py
│ ├── conftest.py
│ ├── integration
│ │ ├── __init__.py
│ │ └── test_server.py
│ ├── manual
│ │ ├── test_extraction_advanced.py
│ │ └── test_extraction.py
│ └── unit
│ ├── __init__.py
│ ├── test_cache.py
│ ├── test_providers.py
│ └── test_tools.py
├── TODO.md
├── tool_annotations.py
├── tools_list.json
├── ultimate_mcp_banner.webp
├── ultimate_mcp_logo.webp
├── ultimate_mcp_server
│ ├── __init__.py
│ ├── __main__.py
│ ├── cli
│ │ ├── __init__.py
│ │ ├── __main__.py
│ │ ├── commands.py
│ │ ├── helpers.py
│ │ └── typer_cli.py
│ ├── clients
│ │ ├── __init__.py
│ │ ├── completion_client.py
│ │ └── rag_client.py
│ ├── config
│ │ └── examples
│ │ └── filesystem_config.yaml
│ ├── config.py
│ ├── constants.py
│ ├── core
│ │ ├── __init__.py
│ │ ├── evaluation
│ │ │ ├── base.py
│ │ │ └── evaluators.py
│ │ ├── providers
│ │ │ ├── __init__.py
│ │ │ ├── anthropic.py
│ │ │ ├── base.py
│ │ │ ├── deepseek.py
│ │ │ ├── gemini.py
│ │ │ ├── grok.py
│ │ │ ├── ollama.py
│ │ │ ├── openai.py
│ │ │ └── openrouter.py
│ │ ├── server.py
│ │ ├── state_store.py
│ │ ├── tournaments
│ │ │ ├── manager.py
│ │ │ ├── tasks.py
│ │ │ └── utils.py
│ │ └── ums_api
│ │ ├── __init__.py
│ │ ├── ums_database.py
│ │ ├── ums_endpoints.py
│ │ ├── ums_models.py
│ │ └── ums_services.py
│ ├── exceptions.py
│ ├── graceful_shutdown.py
│ ├── services
│ │ ├── __init__.py
│ │ ├── analytics
│ │ │ ├── __init__.py
│ │ │ ├── metrics.py
│ │ │ └── reporting.py
│ │ ├── cache
│ │ │ ├── __init__.py
│ │ │ ├── cache_service.py
│ │ │ ├── persistence.py
│ │ │ ├── strategies.py
│ │ │ └── utils.py
│ │ ├── cache.py
│ │ ├── document.py
│ │ ├── knowledge_base
│ │ │ ├── __init__.py
│ │ │ ├── feedback.py
│ │ │ ├── manager.py
│ │ │ ├── rag_engine.py
│ │ │ ├── retriever.py
│ │ │ └── utils.py
│ │ ├── prompts
│ │ │ ├── __init__.py
│ │ │ ├── repository.py
│ │ │ └── templates.py
│ │ ├── prompts.py
│ │ └── vector
│ │ ├── __init__.py
│ │ ├── embeddings.py
│ │ └── vector_service.py
│ ├── tool_token_counter.py
│ ├── tools
│ │ ├── __init__.py
│ │ ├── audio_transcription.py
│ │ ├── base.py
│ │ ├── completion.py
│ │ ├── docstring_refiner.py
│ │ ├── document_conversion_and_processing.py
│ │ ├── enhanced-ums-lookbook.html
│ │ ├── entity_relation_graph.py
│ │ ├── excel_spreadsheet_automation.py
│ │ ├── extraction.py
│ │ ├── filesystem.py
│ │ ├── html_to_markdown.py
│ │ ├── local_text_tools.py
│ │ ├── marqo_fused_search.py
│ │ ├── meta_api_tool.py
│ │ ├── ocr_tools.py
│ │ ├── optimization.py
│ │ ├── provider.py
│ │ ├── pyodide_boot_template.html
│ │ ├── python_sandbox.py
│ │ ├── rag.py
│ │ ├── redline-compiled.css
│ │ ├── sentiment_analysis.py
│ │ ├── single_shot_synthesis.py
│ │ ├── smart_browser.py
│ │ ├── sql_databases.py
│ │ ├── text_classification.py
│ │ ├── text_redline_tools.py
│ │ ├── tournament.py
│ │ ├── ums_explorer.html
│ │ └── unified_memory_system.py
│ ├── utils
│ │ ├── __init__.py
│ │ ├── async_utils.py
│ │ ├── display.py
│ │ ├── logging
│ │ │ ├── __init__.py
│ │ │ ├── console.py
│ │ │ ├── emojis.py
│ │ │ ├── formatter.py
│ │ │ ├── logger.py
│ │ │ ├── panels.py
│ │ │ ├── progress.py
│ │ │ └── themes.py
│ │ ├── parse_yaml.py
│ │ ├── parsing.py
│ │ ├── security.py
│ │ └── text.py
│ └── working_memory_api.py
├── unified_memory_system_technical_analysis.md
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/examples/text_classification_demo.py:
--------------------------------------------------------------------------------
```python
1 | #!/usr/bin/env python
2 | """
3 | Text classification demonstration for Ultimate MCP Server.
4 | This example showcases the comprehensive capabilities of the text_classification tool,
5 | demonstrating various classification strategies, multi-label vs. single-label,
6 | hierarchical categories, and more.
7 | """
8 | import asyncio
9 | import json
10 | import os
11 | import sys
12 | import time
13 | from collections import namedtuple # Import namedtuple
14 | from pathlib import Path
15 |
16 | # Add project root to path for imports when running as script
17 | sys.path.insert(0, str(Path(__file__).parent.parent))
18 |
19 | from rich import box
20 | from rich.console import Console
21 | from rich.layout import Layout
22 | from rich.live import Live
23 | from rich.markup import escape
24 | from rich.panel import Panel
25 | from rich.progress import Progress, SpinnerColumn, TextColumn
26 | from rich.rule import Rule
27 | from rich.table import Table
28 | from rich.text import Text
29 |
30 | from ultimate_mcp_server.config import get_config
31 | from ultimate_mcp_server.constants import Provider
32 | from ultimate_mcp_server.tools.text_classification import (
33 | ClassificationStrategy,
34 | text_classification,
35 | )
36 | from ultimate_mcp_server.utils import get_logger
37 | from ultimate_mcp_server.utils.display import CostTracker # Import CostTracker
38 | from ultimate_mcp_server.utils.logging.console import console
39 |
40 | # Initialize logger
41 | logger = get_logger("example.text_classification")
42 |
43 | # Create a separate debug console for detailed logging
44 | debug_console = Console(stderr=True, highlight=False)
45 |
46 | # Get configuration from centralized config system
47 | gateway_config = get_config()
48 | EXPORT_RESULTS = gateway_config.server.debug # Using server.debug as a proxy for export results
49 | RESULTS_DIR = os.path.join(gateway_config.storage_directory, "classification_results")
50 | DEMO_TIMEOUT = 120 # Hard-coded default timeout for demo
51 |
52 | # Cache for demonstration purposes
53 | DEMO_RESULTS_CACHE = {}
54 |
55 | # File paths for sample data
56 | SAMPLE_DIR = Path(__file__).parent / "sample" / "text_classification_samples"
57 | NEWS_SAMPLES_PATH = SAMPLE_DIR / "news_samples.txt"
58 | PRODUCT_REVIEWS_PATH = SAMPLE_DIR / "product_reviews.txt"
59 | SUPPORT_TICKETS_PATH = SAMPLE_DIR / "support_tickets.txt"
60 | EMAIL_SAMPLES_PATH = SAMPLE_DIR / "email_classification.txt"
61 |
62 | # Create a simple structure for cost tracking from dict
63 | TrackableResult = namedtuple("TrackableResult", ["cost", "input_tokens", "output_tokens", "provider", "model", "processing_time"])
64 |
65 | # Helper Functions
def extract_samples_from_file(file_path):
    """Extract labeled samples from a text file.

    The file contains sections introduced by header lines that end with one
    of several sentinel suffixes (e.g. "TECH NEWS SAMPLE:", "SPAM EMAIL:").
    All non-blank lines between one header and the next become that header's
    sample text; the trailing ":" is stripped from the header to form the
    label.

    Args:
        file_path: Path (str or Path) of the sample file to parse.

    Returns:
        Dict mapping label -> sample text (whitespace-stripped).
    """
    # str.endswith accepts a tuple, replacing a six-way chained `or`.
    header_suffixes = ("SAMPLE:", "EMAIL:", "REVIEW:", "ISSUE:", "REPORT:", "REQUEST:", "QUESTION:")

    with open(file_path, "r", encoding="utf-8") as file:
        content = file.read()

    samples = {}
    current_label = None
    current_content = []

    for line in content.split("\n"):
        stripped = line.strip()  # hoisted: original called line.strip() up to 8 times per line
        if stripped.endswith(header_suffixes):
            # A new header starts: save the sample accumulated so far.
            if current_label and current_content:
                samples[current_label] = "\n".join(current_content).strip()
            current_label = stripped.rstrip(":")
            current_content = []
        elif stripped and current_label is not None:
            current_content.append(line)

    # Flush the final sample (no trailing header triggers the save above).
    if current_label and current_content:
        samples[current_label] = "\n".join(current_content).strip()

    return samples
92 |
def display_classification_result(result, title, text_sample=None, categories=None):
    """Render a classification result to the console with rich tables.

    Prints, in order: a sample-text panel (when *text_sample* is given), a
    category panel (when *categories* is given), the per-category results
    table, and a metadata table (provider, model, tokens, cost).
    """
    results_table = Table(title=title, box=box.ROUNDED, show_header=True, expand=True)
    results_table.add_column("Category", style="cyan", no_wrap=True)
    results_table.add_column("Confidence", style="green", justify="right")
    results_table.add_column("Explanation", style="white")

    for entry in result.get("classifications", []):
        score = entry.get("confidence", 0.0)
        if score > 0.8:
            score_color = "green"
        elif score > 0.6:
            score_color = "yellow"
        else:
            score_color = "red"
        explanation = entry.get("explanation", "")
        # Truncate long explanations to keep the table readable.
        shown = escape(explanation)[:100] + ("..." if len(explanation) > 100 else "")
        results_table.add_row(
            entry.get("category", "Unknown"),
            f"[{score_color}]{score:.4f}[/{score_color}]",
            shown,
        )

    meta_table = Table(show_header=False, box=None, expand=False)
    meta_table.add_column("Metric", style="cyan")
    meta_table.add_column("Value", style="white")
    token_info = result.get("tokens", {})
    meta_rows = [
        ("Provider", result.get("provider", "unknown")),
        ("Model", result.get("model", "unknown")),
        ("Processing Time", f"{result.get('processing_time', 0.0):.3f}s"),
        ("Input Tokens", str(token_info.get("input", 0))),
        ("Output Tokens", str(token_info.get("output", 0))),
        ("Cost", f"${result.get('cost', 0.0):.6f}"),
    ]
    if "dominant_category" in result:
        meta_rows.append(("Dominant Category", result["dominant_category"]))
    if "ensemble_models" in result:
        meta_rows.append(("Ensemble Models", ", ".join(result["ensemble_models"])))
    for metric, value in meta_rows:
        meta_table.add_row(metric, value)

    if text_sample:
        console.print(Panel(
            escape(text_sample[:300] + ("..." if len(text_sample) > 300 else "")),
            title="Sample Text",
            border_style="blue",
            expand=False,
        ))

    if categories:
        cat_display = ""
        if isinstance(categories, dict):
            # Hierarchical taxonomy: list each parent with its subcategories.
            for parent, subcats in categories.items():
                cat_display += f"- {parent}\n"
                for sub in subcats:
                    cat_display += f"  - {parent}/{sub}\n"
        else:
            for cat in categories:
                cat_display += f"- {cat}\n"
        console.print(Panel(
            cat_display.strip(),
            title="Classification Categories",
            border_style="green",
            expand=False,
        ))

    console.print(results_table)
    console.print(meta_table)
async def demo_basic_classification(tracker: CostTracker):
    """Demonstrate basic single-label classification with zero-shot.

    Loads the news samples, classifies one article against a flat category
    list, records the call's cost on *tracker*, caches the result for the
    final comparison, and renders the outcome to the console.
    """
    console.print(Rule("[bold blue]Basic Text Classification Demo[/bold blue]"))
    logger.info("Starting basic classification demo", emoji_key="start")

    news_samples = extract_samples_from_file(NEWS_SAMPLES_PATH)

    # Flat, single-level category list for news articles.
    categories = [
        "Technology", "Sports", "Politics", "Health",
        "Entertainment", "Science", "Business", "Education",
    ]

    sample_key = "TECH NEWS SAMPLE"
    sample_text = news_samples[sample_key]

    logger.info(f"Classifying a {sample_key} with zero-shot strategy", emoji_key="processing")

    started = time.time()
    with Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        console=console,
    ) as progress:
        progress.add_task("Classifying text...", total=None)
        result = await text_classification(
            text=sample_text,
            categories=categories,
            provider=Provider.OPENAI.value,
            model="gpt-3.5-turbo",  # a simpler model suffices for the basic demo
            multi_label=False,
            confidence_threshold=0.5,
            strategy=ClassificationStrategy.ZERO_SHOT,
            explanation_detail="brief",
        )

    # Fold the call's cost/token usage into the shared tracker when present.
    if all(k in result for k in ["cost", "provider", "model"]) and "tokens" in result:
        try:
            tracker.add_call(TrackableResult(
                cost=result.get("cost", 0.0),
                input_tokens=result.get("tokens", {}).get("input", 0),
                output_tokens=result.get("tokens", {}).get("output", 0),
                provider=result.get("provider", "unknown"),
                model=result.get("model", "unknown"),
                processing_time=result.get("processing_time", 0.0),
            ))
        except Exception as track_err:
            logger.warning(f"Could not track cost for basic classification: {track_err}", exc_info=False)

    # Wall-clock time may differ from the model-reported processing time.
    elapsed_time = time.time() - started
    result["actual_processing_time"] = elapsed_time

    DEMO_RESULTS_CACHE["basic"] = result

    if EXPORT_RESULTS:
        export_result("basic_classification", result, sample_text, categories)

    logger.success(f"Basic classification completed in {elapsed_time:.3f}s", emoji_key="success")
    display_classification_result(
        result,
        "Basic Single-Label Classification (Zero-Shot)",
        text_sample=sample_text,
        categories=categories,
    )
    console.print()
    return True
244 |
async def demo_multi_label_classification(tracker: CostTracker):
    """Demonstrate multi-label classification.

    Classifies one support ticket against the ticket taxonomy, allowing
    several labels per ticket (lower threshold + max_results=3), records
    cost on *tracker*, caches the result, and renders the outcome.
    """
    console.print(Rule("[bold blue]Multi-Label Classification Demo[/bold blue]"))
    logger.info("Starting multi-label classification demo", emoji_key="start")

    ticket_samples = extract_samples_from_file(SUPPORT_TICKETS_PATH)

    # A complex sample likely to fit more than one category.
    sample_key = "BUG REPORT"
    sample_text = ticket_samples[sample_key]

    categories = [
        "Bug Report", "Feature Request", "Account Issue", "Billing Question",
        "Technical Question", "Security Issue", "Performance Problem", "UI/UX Feedback",
    ]

    logger.info("Classifying support ticket with multi-label strategy", emoji_key="processing")

    with Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        console=console,
    ) as progress:
        progress.add_task("Classifying with multiple labels...", total=None)
        result = await text_classification(
            text=sample_text,
            categories=categories,
            provider=Provider.OPENAI.value,
            # NOTE(review): "gpt-4-mini" may be a typo for "gpt-4o-mini" — confirm model id.
            model="gpt-4-mini",  # a better model for nuanced classification
            multi_label=True,
            confidence_threshold=0.3,  # lower threshold to catch secondary categories
            strategy=ClassificationStrategy.STRUCTURED,
            explanation_detail="brief",
            max_results=3,  # top 3 matching categories
        )

    # Fold the call's cost/token usage into the shared tracker when present.
    if all(k in result for k in ["cost", "provider", "model"]) and "tokens" in result:
        try:
            tracker.add_call(TrackableResult(
                cost=result.get("cost", 0.0),
                input_tokens=result.get("tokens", {}).get("input", 0),
                output_tokens=result.get("tokens", {}).get("output", 0),
                provider=result.get("provider", "unknown"),
                model=result.get("model", "unknown"),
                processing_time=result.get("processing_time", 0.0),
            ))
        except Exception as track_err:
            logger.warning(f"Could not track cost for multi-label classification: {track_err}", exc_info=False)

    DEMO_RESULTS_CACHE["multi_label"] = result

    logger.success("Multi-label classification completed", emoji_key="success")
    display_classification_result(
        result,
        "Multi-Label Classification (Structured)",
        text_sample=sample_text,
        categories=categories,
    )
    console.print()
    return True
318 |
async def demo_hierarchical_classification(tracker: CostTracker):
    """Demonstrate hierarchical category classification.

    Classifies one product review against a dict-shaped taxonomy (several
    independent dimensions), records cost on *tracker*, caches the result,
    and renders the outcome.
    """
    console.print(Rule("[bold blue]Hierarchical Classification Demo[/bold blue]"))
    logger.info("Starting hierarchical classification demo", emoji_key="start")

    review_samples = extract_samples_from_file(PRODUCT_REVIEWS_PATH)

    sample_key = "POSITIVE REVIEW"
    sample_text = review_samples[sample_key]

    # Each top-level key is an independent classification dimension.
    categories = {
        "Sentiment": ["Positive", "Negative", "Neutral"],
        "Product Type": ["Electronics", "Appliance", "Clothing", "Software"],
        "Aspect": ["Performance", "Quality", "Price", "Customer Service", "Design", "Usability"],
    }

    logger.info("Classifying product review with hierarchical categories", emoji_key="processing")

    with Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        console=console,
    ) as progress:
        progress.add_task("Classifying with hierarchical categories...", total=None)
        result = await text_classification(
            text=sample_text,
            categories=categories,
            provider=Provider.OPENAI.value,
            model="gpt-4-mini",
            multi_label=True,  # allow selecting one from each hierarchy
            confidence_threshold=0.6,
            strategy=ClassificationStrategy.STRUCTURED,
            explanation_detail="brief",
            taxonomy_description=(
                "This taxonomy categorizes product reviews across multiple dimensions: "
                "the sentiment (overall positivity/negativity), the type of product being discussed, "
                "and the specific aspects of the product mentioned in the review."
            ),
        )

    # Fold the call's cost/token usage into the shared tracker when present.
    if all(k in result for k in ["cost", "provider", "model"]) and "tokens" in result:
        try:
            tracker.add_call(TrackableResult(
                cost=result.get("cost", 0.0),
                input_tokens=result.get("tokens", {}).get("input", 0),
                output_tokens=result.get("tokens", {}).get("output", 0),
                provider=result.get("provider", "unknown"),
                model=result.get("model", "unknown"),
                processing_time=result.get("processing_time", 0.0),
            ))
        except Exception as track_err:
            logger.warning(f"Could not track cost for hierarchical classification: {track_err}", exc_info=False)

    DEMO_RESULTS_CACHE["hierarchical"] = result

    logger.success("Hierarchical classification completed", emoji_key="success")
    display_classification_result(
        result,
        "Hierarchical Multi-Label Classification",
        text_sample=sample_text,
        categories=categories,
    )
    console.print()
    return True
391 |
async def demo_few_shot_classification(tracker: CostTracker):
    """Demonstrate few-shot learning classification.

    Supplies three labeled example emails alongside the text to classify,
    shows the examples in a table, records cost on *tracker*, caches the
    result, and renders the outcome.
    """
    console.print(Rule("[bold blue]Few-Shot Classification Demo[/bold blue]"))
    logger.info("Starting few-shot classification demo", emoji_key="start")

    email_samples = extract_samples_from_file(EMAIL_SAMPLES_PATH)

    sample_key = "PHISHING EMAIL"
    sample_text = email_samples[sample_key]

    categories = [
        "Spam", "Phishing", "Promotional", "Informational",
        "Urgent", "Personal", "Transactional",
    ]

    # Labeled examples drawn from the same sample file.
    examples = [
        {"text": email_samples["SPAM EMAIL"], "categories": ["Spam"]},
        {"text": email_samples["PROMOTIONAL EMAIL"], "categories": ["Promotional"]},
        {"text": email_samples["PERSONAL EMAIL"], "categories": ["Personal"]},
    ]

    logger.info("Classifying email with few-shot learning", emoji_key="processing")

    with Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        console=console,
    ) as progress:
        progress.add_task("Classifying with few-shot examples...", total=None)
        result = await text_classification(
            text=sample_text,
            categories=categories,
            provider=Provider.OPENAI.value,
            model="gpt-3.5-turbo",  # few-shot works well with simpler models
            multi_label=False,
            confidence_threshold=0.5,
            strategy=ClassificationStrategy.FEW_SHOT,
            examples=examples,
            explanation_detail="detailed",  # more detailed explanation
        )

    # Fold the call's cost/token usage into the shared tracker when present.
    if all(k in result for k in ["cost", "provider", "model"]) and "tokens" in result:
        try:
            tracker.add_call(TrackableResult(
                cost=result.get("cost", 0.0),
                input_tokens=result.get("tokens", {}).get("input", 0),
                output_tokens=result.get("tokens", {}).get("output", 0),
                provider=result.get("provider", "unknown"),
                model=result.get("model", "unknown"),
                processing_time=result.get("processing_time", 0.0),
            ))
        except Exception as track_err:
            logger.warning(f"Could not track cost for few-shot classification: {track_err}", exc_info=False)

    DEMO_RESULTS_CACHE["few_shot"] = result

    # Show which examples the model was primed with.
    example_table = Table(title="Few-Shot Examples Provided", box=box.SIMPLE)
    example_table.add_column("Example", style="cyan")
    example_table.add_column("Category", style="green")
    example_table.add_column("Text Sample", style="white", max_width=60)
    for idx, example in enumerate(examples):
        example_table.add_row(
            f"Example {idx+1}",
            ", ".join(example["categories"]),
            escape(example["text"][:100] + "..."),
        )
    console.print(example_table)
    console.print()

    logger.success("Few-shot classification completed", emoji_key="success")
    display_classification_result(
        result,
        "Few-Shot Classification",
        text_sample=sample_text,
        categories=categories,
    )
    console.print()
    return True
496 |
async def demo_ensemble_classification(tracker: CostTracker):
    """Demonstrate ensemble classification using multiple providers.

    Runs the ENSEMBLE strategy over a weighted two-model configuration,
    shows the configuration table, records the aggregated cost on
    *tracker*, caches the result, and renders the outcome.
    """
    console.print(Rule("[bold blue]Ensemble Classification Demo[/bold blue]"))
    logger.info("Starting ensemble classification demo", emoji_key="start")

    ticket_samples = extract_samples_from_file(SUPPORT_TICKETS_PATH)

    sample_key = "FEATURE REQUEST"
    sample_text = ticket_samples[sample_key]

    categories = [
        "Bug Report", "Feature Request", "Account Issue", "Billing Question",
        "Technical Question", "Security Issue", "Performance Problem", "UI/UX Feedback",
    ]

    # Weighted model mix; a real deployment might span several providers.
    ensemble_config = [
        {
            "provider": Provider.OPENAI.value,
            "model": "gpt-3.5-turbo",
            "weight": 0.3,
            "params": {"temperature": 0.1},
        },
        {
            "provider": Provider.OPENAI.value,
            "model": "gpt-4-mini",
            "weight": 0.7,
            "params": {"temperature": 0.1},
        },
    ]

    logger.info("Classifying support ticket with ensemble strategy", emoji_key="processing")

    with Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        console=console,
    ) as progress:
        progress.add_task("Classifying with multiple models...", total=None)
        result = await text_classification(
            text=sample_text,
            categories=categories,
            provider=Provider.OPENAI.value,  # base provider; the ensemble uses several
            multi_label=True,
            confidence_threshold=0.4,
            strategy=ClassificationStrategy.ENSEMBLE,
            explanation_detail="brief",
            ensemble_config=ensemble_config,
            allow_abstain=True,
            # abstention_threshold=0.4  # optionally set abstention threshold
        )

    # The tool result carries aggregated cost/tokens across the ensemble.
    if all(k in result for k in ["cost", "provider", "model"]) and "tokens" in result:
        try:
            tracker.add_call(TrackableResult(
                cost=result.get("cost", 0.0),
                input_tokens=result.get("tokens", {}).get("input", 0),
                output_tokens=result.get("tokens", {}).get("output", 0),
                provider=result.get("provider", "ensemble"),  # provider reported as 'ensemble'
                model=result.get("model", "ensemble"),  # model reported as 'ensemble'
                processing_time=result.get("processing_time", 0.0),
            ))
        except Exception as track_err:
            logger.warning(f"Could not track cost for ensemble classification: {track_err}", exc_info=False)

    DEMO_RESULTS_CACHE["ensemble"] = result

    # Show the model mix that produced the result.
    ensemble_table = Table(title="Ensemble Configuration", box=box.SIMPLE)
    ensemble_table.add_column("Provider", style="cyan")
    ensemble_table.add_column("Model", style="green")
    ensemble_table.add_column("Weight", style="yellow")
    for config in ensemble_config:
        ensemble_table.add_row(config["provider"], config["model"], f"{config['weight']:.2f}")
    console.print(ensemble_table)
    console.print()

    logger.success("Ensemble classification completed", emoji_key="success")
    display_classification_result(
        result,
        "Ensemble Classification",
        text_sample=sample_text,
        categories=categories,
    )
    console.print()
    return True
604 |
async def demo_custom_prompt_template(tracker: CostTracker):
    """Demonstrate classification with a custom prompt template.

    Injects a hand-written prompt (with {categories}, {format_instruction}
    and {text} placeholders) into the STRUCTURED strategy, shows the
    template, records cost on *tracker*, caches the result, and renders
    the outcome.
    """
    console.print(Rule("[bold blue]Custom Prompt Template Demo[/bold blue]"))
    logger.info("Starting custom prompt template demo", emoji_key="start")

    news_samples = extract_samples_from_file(NEWS_SAMPLES_PATH)

    sample_key = "SCIENCE NEWS SAMPLE"
    sample_text = news_samples[sample_key]

    categories = [
        "Technology", "Sports", "Politics", "Health",
        "Entertainment", "Science", "Business", "Education",
    ]

    # Placeholders are filled in by the tool: {categories}, {format_instruction}, {text}.
    custom_template = """
You are a highly specialized news classification assistant.

I need you to analyze the following text and determine which category it belongs to:
{categories}

When classifying, consider:
- The topic and subject matter
- The terminology and jargon used
- The intended audience
- The writing style and tone

CLASSIFICATION FORMAT:
{format_instruction}

TEXT TO CLASSIFY:
```
{text}
```

Please provide your analysis now.
"""

    logger.info("Classifying news with custom prompt template", emoji_key="processing")

    with Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        console=console,
    ) as progress:
        progress.add_task("Classifying with custom prompt...", total=None)
        result = await text_classification(
            text=sample_text,
            categories=categories,
            provider=Provider.OPENAI.value,
            model="gpt-4-mini",
            multi_label=False,
            confidence_threshold=0.5,
            strategy=ClassificationStrategy.STRUCTURED,
            explanation_detail="detailed",
            custom_prompt_template=custom_template,
        )

    # Fold the call's cost/token usage into the shared tracker when present.
    if all(k in result for k in ["cost", "provider", "model"]) and "tokens" in result:
        try:
            tracker.add_call(TrackableResult(
                cost=result.get("cost", 0.0),
                input_tokens=result.get("tokens", {}).get("input", 0),
                output_tokens=result.get("tokens", {}).get("output", 0),
                provider=result.get("provider", "unknown"),
                model=result.get("model", "unknown"),
                processing_time=result.get("processing_time", 0.0),
            ))
        except Exception as track_err:
            logger.warning(f"Could not track cost for custom prompt classification: {track_err}", exc_info=False)

    DEMO_RESULTS_CACHE["custom_prompt"] = result

    # Show the exact template that was used.
    console.print(Panel(
        escape(custom_template),
        title="Custom Prompt Template",
        border_style="magenta",
        expand=False,
    ))
    console.print()

    logger.success("Custom prompt classification completed", emoji_key="success")
    display_classification_result(
        result,
        "Classification with Custom Prompt",
        text_sample=sample_text,
        categories=categories,
    )
    console.print()
    return True
712 |
def export_result(name, result, text_sample, categories):
    """Export a classification result to a timestamped JSON file.

    Args:
        name: Short identifier used as the filename prefix (e.g. "basic").
        result: Classification result dictionary to persist.
        text_sample: The text that was classified, stored for reference.
        categories: The category set used for the classification run.
    """
    # Ensure the output directory exists (no-op if it is already there)
    os.makedirs(RESULTS_DIR, exist_ok=True)

    # Timestamped filename so repeated runs never overwrite each other
    timestamp = time.strftime("%Y%m%d-%H%M%S")
    filename = f"{RESULTS_DIR}/{name}_{timestamp}.json"

    # Bundle the result with enough context to interpret it later
    export_data = {
        "timestamp": time.time(),
        "datetime": time.strftime("%Y-%m-%d %H:%M:%S"),
        "result": result,
        "sample_text": text_sample,
        "categories": categories
    }

    with open(filename, "w", encoding="utf-8") as f:
        json.dump(export_data, f, indent=2)

    # Bug fix: the previous message was an f-string that interpolated nothing
    # and printed a literal "(unknown)"; report the actual path written.
    logger.info(f"Exported results to {filename}", emoji_key="save")
736 |
async def demo_comparison(tracker: CostTracker):
    """Compare different classification strategies."""
    console.print(Rule("[bold blue]Classification Strategies Comparison[/bold blue]"))
    logger.info("Comparing classification strategies", emoji_key="analytics")

    # Every demo must have cached its result before we can compare them.
    required_demos = ["basic", "multi_label", "hierarchical", "few_shot", "ensemble", "custom_prompt"]
    missing = [name for name in required_demos if name not in DEMO_RESULTS_CACHE]
    if missing:
        logger.warning(f"Missing results for comparison: {', '.join(missing)}", emoji_key="warning")
        console.print("[yellow]Some demo results are missing for comparison. Run all demos first.[/yellow]")
        return False

    # Fixed notes for strategies whose description does not depend on results.
    static_notes = {
        "basic": "Simple and efficient for clear categories",
        "hierarchical": "Effective for multi-dimensional taxonomies",
        "few_shot": "Improved accuracy with example learning",
        "custom_prompt": "Tailored instruction for specific domain",
    }

    comparison = Table(title="Classification Strategies Comparison", box=box.ROUNDED)
    for header, style in (
        ("Strategy", "cyan"),
        ("Provider/Model", "green"),
        ("Tokens", "yellow"),
        ("Processing Time", "magenta"),
        ("Cost", "red"),
        ("Notes", "white"),
    ):
        comparison.add_column(header, style=style)

    for strategy, result in DEMO_RESULTS_CACHE.items():
        # Strategy-specific notes; two of them report counts from the result.
        if strategy == "multi_label":
            notes = f"Found {len(result.get('classifications', []))} categories"
        elif strategy == "ensemble":
            notes = f"Aggregated {len(result.get('ensemble_models', []))} models"
        else:
            notes = static_notes.get(strategy, "")

        comparison.add_row(
            strategy.replace("_", " ").title(),
            f"{result.get('provider', 'unknown')}/{result.get('model', 'unknown')}",
            str(result.get("tokens", {}).get("total", 0)),
            f"{result.get('processing_time', 0.0):.3f}s",
            f"${result.get('cost', 0.0):.6f}",
            notes,
        )

    console.print(comparison)
    console.print()

    # Per-strategy metric series for the visual dashboard and optional export.
    costs = {k: v.get("cost", 0.0) for k, v in DEMO_RESULTS_CACHE.items()}
    tokens = {k: v.get("tokens", {}).get("total", 0) for k, v in DEMO_RESULTS_CACHE.items()}
    times = {k: v.get("processing_time", 0.0) for k, v in DEMO_RESULTS_CACHE.items()}

    display_visual_dashboard(costs, tokens, times)

    # Optionally persist the comparison alongside the raw cached results.
    if EXPORT_RESULTS:
        comparison_data = {
            "timestamp": time.time(),
            "datetime": time.strftime("%Y-%m-%d %H:%M:%S"),
            "costs": costs,
            "tokens": tokens,
            "times": times,
            "full_results": DEMO_RESULTS_CACHE,
        }
        with open(f"{RESULTS_DIR}/comparison_{time.strftime('%Y%m%d-%H%M%S')}.json", "w") as f:
            json.dump(comparison_data, f, indent=2)
        logger.info(f"Exported comparison data to {RESULTS_DIR}", emoji_key="save")

    conclusion_panel = Panel(
        "Classification strategies comparison shows tradeoffs between accuracy, cost, and performance.\n\n"
        "- Zero-shot: Fastest and cheapest, good for simple categories\n"
        "- Few-shot: Better accuracy with examples, moderate cost increase\n"
        "- Hierarchical: Excellent for complex taxonomies, higher token usage\n"
        "- Ensemble: Highest accuracy but also highest cost and processing time\n"
        "- Custom prompt: Tailored for specific domains, good balance of accuracy and efficiency",
        title="Strategy Selection Guidelines",
        border_style="green",
        expand=False
    )
    console.print(conclusion_panel)

    return True
826 |
def display_visual_dashboard(costs, tokens, times):
    """Render a three-panel Rich dashboard of classification metrics."""
    root = Layout()

    # Header row on top, three metric columns underneath.
    root.split(
        Layout(name="header", size=3),
        Layout(name="main", ratio=1),
    )
    root["main"].split_row(
        Layout(name="costs", ratio=1),
        Layout(name="tokens", ratio=1),
        Layout(name="times", ratio=1),
    )

    root["header"] = Panel(
        Text("Classification Strategy Metrics", style="bold magenta"),
        box=box.ROUNDED,
    )

    # One bar-chart panel per metric series.
    for section, data, title, color in (
        ("costs", costs, "Classification Costs ($)", "red"),
        ("tokens", tokens, "Token Usage", "yellow"),
        ("times", times, "Processing Time (s)", "green"),
    ):
        root["main"][section] = create_metric_panel(data, title, color)

    console.print(root)
864 |
def create_metric_panel(data, title, color):
    """Build a Panel containing a simple horizontal bar chart for one metric."""
    bar_scale = 20  # maximum bar width in characters
    peak = max(data.values()) if data else 1

    lines = []
    for strategy, value in data.items():
        # Scale each bar relative to the largest value in the series.
        width = int((value / peak) * bar_scale) if peak > 0 else 0
        label = strategy.replace("_", " ").title()
        lines.append(f"{label.ljust(15)} │ [{color}]{'█' * width}[/{color}] {value:.4f}\n")

    return Panel("".join(lines), title=title, border_style=color)
880 |
async def run_all_demos(tracker: CostTracker): # Add tracker
    """Run all classification demos and report overall success.

    NOTE(review): despite the original "in sequence" phrasing, the six demos
    are launched concurrently via asyncio.create_task and awaited together
    below — confirm whether sequential execution was intended.

    Args:
        tracker: Shared CostTracker each demo records its LLM costs into.

    Returns:
        Process-style exit code: 0 on success, 1 on failure/cancellation.
    """
    console.print(Rule("[bold magenta]Text Classification Comprehensive Demo[/bold magenta]"))
    logger.info("Starting comprehensive text classification demo", emoji_key="start")

    start_time = time.time()
    success = True

    # Create results directory if exporting is enabled
    if EXPORT_RESULTS and not os.path.exists(RESULTS_DIR):
        os.makedirs(RESULTS_DIR)
        logger.info(f"Created results directory at {RESULTS_DIR}", emoji_key="folder")

    # Static grid shown inside the Live display while the demos run
    overall_progress = Table.grid(expand=True)
    overall_progress.add_column()
    overall_progress.add_row("[bold blue]Running Text Classification Demo Suite...[/bold blue]")
    overall_progress.add_row("[cyan]Press Ctrl+C to abort[/cyan]")

    try:
        # Create a live display that updates during the demo
        with Live(overall_progress, refresh_per_second=4, console=console):
            # Launch every demo task; they begin executing immediately
            demo_tasks = [
                asyncio.create_task(demo_basic_classification(tracker)), # Pass tracker
                asyncio.create_task(demo_multi_label_classification(tracker)), # Pass tracker
                asyncio.create_task(demo_hierarchical_classification(tracker)), # Pass tracker
                asyncio.create_task(demo_few_shot_classification(tracker)), # Pass tracker
                asyncio.create_task(demo_ensemble_classification(tracker)), # Pass tracker
                asyncio.create_task(demo_custom_prompt_template(tracker)) # Pass tracker
            ]

            # Wait for all demos, bounded by DEMO_TIMEOUT seconds overall
            completed, pending = await asyncio.wait(
                demo_tasks,
                timeout=DEMO_TIMEOUT,
                return_when=asyncio.ALL_COMPLETED
            )

            # Cancel any tasks still pending after the timeout
            for task in pending:
                task.cancel()
                overall_progress.add_row(f"[yellow]Demo timed out after {DEMO_TIMEOUT}s[/yellow]")

            # Compare results if we have enough demos completed.
            # NOTE(review): tasks that raised still count as "completed" here —
            # asyncio.wait does not re-raise their exceptions.
            if len(completed) >= 3: # Require at least 3 demos for comparison
                await demo_comparison(tracker)

    except asyncio.CancelledError:
        logger.warning("Demo was cancelled by user", emoji_key="cancel")
        success = False
    except Exception as e:
        logger.critical(f"Text classification demo failed: {str(e)}", emoji_key="critical", exc_info=True)
        console.print(f"[bold red]Critical Demo Error:[/bold red] {escape(str(e))}")
        success = False

    # Calculate total time
    total_time = time.time() - start_time

    # Display cost summary accumulated by the tracker across all demos
    tracker.display_summary(console)

    if success:
        logger.success(f"Text Classification Demo Completed Successfully in {total_time:.2f}s!", emoji_key="complete")
        console.print(Rule(f"[bold magenta]Text Classification Demo Complete ({total_time:.2f}s)[/bold magenta]"))
        return 0
    else:
        logger.error(f"Text classification demo failed after {total_time:.2f}s", emoji_key="error")
        console.print(Rule("[bold red]Text Classification Demo Failed[/bold red]"))
        return 1
951 |
async def main():
    """Entry point: set up cost tracking and execute the full demo suite."""
    cost_tracker = CostTracker()
    try:
        # run_all_demos returns a process-style exit code (0 or 1).
        return await run_all_demos(cost_tracker)
    except Exception as e:
        logger.critical(f"Demo failed unexpectedly: {str(e)}", emoji_key="critical", exc_info=True)
        return 1
960 |
if __name__ == "__main__":
    # Surface the export configuration up front so users know where results
    # will land (EXPORT_RESULTS / RESULTS_DIR are module-level settings).
    if EXPORT_RESULTS:
        console.print(f"[blue]Results will be exported to: {RESULTS_DIR}[/blue]")

    # Abort early with a clear error if any of the four sample data files
    # the demos read from is missing.
    if not all(path.exists() for path in [NEWS_SAMPLES_PATH, PRODUCT_REVIEWS_PATH, SUPPORT_TICKETS_PATH, EMAIL_SAMPLES_PATH]):
        console.print("[bold red]Error:[/bold red] Sample data files not found. Please ensure all sample files exist in examples/sample/text_classification_samples/")
        sys.exit(1)

    # Run the async demo suite and propagate its exit code to the shell.
    exit_code = asyncio.run(main())
    sys.exit(exit_code)
```
--------------------------------------------------------------------------------
/ultimate_mcp_server/utils/text.py:
--------------------------------------------------------------------------------
```python
1 | """Text processing utilities for Ultimate MCP Server."""
import re
import string
from collections import Counter
from typing import Any, Dict, List, Optional

from ultimate_mcp_server.utils import get_logger
7 |
8 | logger = get_logger(__name__)
9 |
10 |
def truncate_text(text: str, max_length: int, add_ellipsis: bool = True) -> str:
    """Truncate text to a maximum length, preferring natural boundaries.

    The algorithm:
    1. If text is already within max_length, return it unchanged.
    2. Otherwise cut at the max_length character position.
    3. Look backwards for the last sentence boundary ('. ', '? ', '! ')
       or paragraph break ('\\n\\n') in the cut text.
    4. If that boundary lies in the final 20% of the cut (index >
       max_length * 0.8), shorten to just after it; otherwise keep the
       plain character cut.
    5. Optionally append "..." when truncation actually occurred.

    Useful for previews, UI snippets, and character-limited displays.

    Args:
        text: Text to truncate. May be empty or None (returned as-is).
        max_length: Maximum character length of the returned text
            (not including the ellipsis).
        add_ellipsis: Whether to append "..." to truncated text (default: True).

    Returns:
        The truncated text, optionally with an ellipsis appended.

    Examples:
        >>> # Plain character cut (no boundary in the last 20%)
        >>> truncate_text("This is a long sentence that needs truncation.", 20)
        'This is a long sente...'

        >>> # Boundary cut: the kept '.' plus the ellipsis yields "...."
        >>> truncate_text("One two three. Four five six. Seven.", 33)
        'One two three. Four five six....'

        >>> # Without ellipsis
        >>> truncate_text("A very long text to truncate.", 10, add_ellipsis=False)
        'A very lon'

        >>> # No truncation needed
        >>> truncate_text("Short text", 20)
        'Short text'

    Note:
        The previous docstring showed word-level trimming (e.g.
        'This is a long...') that the code never performed; the examples
        above reflect actual behavior. When a sentence boundary is kept,
        its period plus the ellipsis produces four dots — retained here
        to avoid changing observable behavior.
    """
    if not text or len(text) <= max_length:
        return text

    # Hard cut at the character limit first
    truncated = text[:max_length]

    # Find the last sentence/paragraph boundary in the truncated text
    last_boundary = max(
        truncated.rfind('. '),
        truncated.rfind('? '),
        truncated.rfind('! '),
        truncated.rfind('\n\n')
    )

    # Only prefer the boundary if it is within the final 20% of the cut,
    # so we never throw away most of the allowed length.
    if last_boundary > max_length * 0.8:
        truncated = truncated[:last_boundary + 1]

    # Add ellipsis if requested and text was actually shortened
    if add_ellipsis and len(text) > len(truncated):
        truncated = truncated.rstrip() + "..."

    return truncated
80 |
81 |
def count_tokens(text: str, model: Optional[str] = None) -> int:
    """Estimate how many LLM tokens *text* would consume.

    When the tiktoken library is installed, a real tokenizer is used:
    the "gpt-4o" tokenizer for gpt-4o-family models, and "cl100k_base"
    both for Claude models (as an approximation — there is no official
    tiktoken vocabulary for Claude) and as the general default for other
    or unspecified models. Without tiktoken, falls back to the
    character-based heuristic in _estimate_tokens_by_chars().

    Token counts matter for API cost estimation, context-window checks,
    and debugging model interactions.

    Args:
        text: Text to measure. Empty input yields 0.
        model: Optional model name used to pick the tokenizer, e.g.
            "gpt-4o", "claude-3-sonnet".

    Returns:
        Estimated number of tokens in the text.

    Note:
        The heuristic fallback is approximate and may diverge from real
        tokenization for non-English text, code, or digit-heavy content.
    """
    if not text:
        return 0

    try:
        import tiktoken
    except ImportError:
        # tiktoken unavailable: use the character-based approximation
        return _estimate_tokens_by_chars(text)

    if model is not None and model.startswith("gpt-4o"):
        # Exact tokenizer for the gpt-4o family
        encoding = tiktoken.encoding_for_model("gpt-4o")
    else:
        # Claude models and everything else: cl100k_base works well as an
        # approximation/default for most recent models.
        encoding = tiktoken.get_encoding("cl100k_base")

    return len(encoding.encode(text))
155 |
156 |
157 | def _estimate_tokens_by_chars(text: str) -> int:
158 | """Estimate token count using character-based heuristics when tokenizers aren't available.
159 |
160 | This internal fallback function provides a rough approximation of token count based on
161 | character analysis when the preferred tokenizer-based method (via tiktoken) is not available.
162 | It applies various heuristics based on observed tokenization patterns across common models.
163 |
164 | The estimation algorithm works as follows:
165 | 1. Use a base ratio of 4.0 characters per token (average for English text)
166 | 2. Count the total number of characters in the text
167 | 3. Apply adjustments based on character types:
168 | - Whitespace: Count separately and add with reduced weight (0.5)
169 | since whitespace is often combined with other characters in tokens
170 | - Digits: Count separately and subtract weight (0.5)
171 | since numbers are often encoded more efficiently
172 | 4. Calculate the final token estimate based on adjusted character count
173 |
174 | While not as accurate as model-specific tokenizers, this approach provides a reasonable
175 | approximation that works across different languages and text types. The approximation
176 | tends to be more accurate for:
177 | - Plain English text with standard punctuation
178 | - Text with a typical mix of words and whitespace
179 | - Content with a moderate number of special characters
180 |
181 | The estimation may be less accurate for:
182 | - Text with many numbers or special characters
183 | - Code snippets or markup languages
184 | - Non-Latin script languages
185 | - Very short texts (under 10 characters)
186 |
187 | Args:
188 | text: Text string to estimate token count for
189 |
190 | Returns:
191 | Estimated number of tokens (always at least 1 for non-empty text)
192 |
193 | Note:
194 | This function is intended for internal use by count_tokens() as a fallback when
195 | tiktoken is not available. It always returns at least 1 token for any non-empty text.
196 | """
197 | # Character-based estimation (rough approximation)
198 | avg_chars_per_token = 4.0
199 |
200 | # Count characters
201 | char_count = len(text)
202 |
203 | # Account for whitespace more efficiently representing tokens
204 | whitespace_count = sum(1 for c in text if c.isspace())
205 |
206 | # Count numbers (numbers are often encoded efficiently)
207 | digit_count = sum(1 for c in text if c.isdigit())
208 |
209 | # Adjust total count based on character types
210 | adjusted_count = char_count + (whitespace_count * 0.5) - (digit_count * 0.5)
211 |
212 | # Estimate tokens
213 | return max(1, int(adjusted_count / avg_chars_per_token))
214 |
215 |
def normalize_text(
    text: str,
    lowercase: bool = True,
    remove_punctuation: bool = False,
    remove_whitespace: bool = False,
    remove_urls: bool = False,
    remove_numbers: bool = False,
) -> str:
    """Normalize text with configurable cleaning options.

    Standardizes text for NLP tasks, comparison, or search. Normalizations
    are applied in a fixed order so results are deterministic:

    1. Lowercase conversion (if enabled)
    2. URL removal (if enabled)
    3. Number removal (if enabled)
    4. Punctuation removal (if enabled)
    5. Whitespace normalization (if enabled)

    Args:
        text: The input text to normalize. May be empty.
        lowercase: Convert text to lowercase (default: True — note this is
            ON by default).
        remove_punctuation: Strip all characters in string.punctuation
            (default: False).
        remove_whitespace: Collapse all whitespace runs to a single space
            and trim leading/trailing whitespace (default: False).
        remove_urls: Remove web URLs (http, https, www) (default: False).
        remove_numbers: Remove all digit runs (default: False).

    Returns:
        The normalized text. Empty input is returned unchanged.

    Examples:
        >>> # Default behavior (only lowercase)
        >>> normalize_text("Hello World! Visit https://example.com")
        'hello world! visit https://example.com'

        >>> # Multiple normalizations; whitespace collapse cleans up the
        >>> # gaps left by removed numbers/punctuation
        >>> normalize_text("Hello, World! 123",
        ...                remove_punctuation=True,
        ...                remove_numbers=True,
        ...                remove_whitespace=True)
        'hello world'

        >>> # URL and whitespace normalization, preserving case
        >>> normalize_text("Check https://example.com for more info!",
        ...                lowercase=False,
        ...                remove_urls=True,
        ...                remove_whitespace=True)
        'Check for more info!'

    Notes:
        - Removal steps can leave stray spaces behind (e.g. where a URL or
          number used to be); enable remove_whitespace to clean those up.
        - The previous docstring examples omitted the default lowercasing
          and the leftover spacing; they are corrected above. The code
          itself is unchanged.
    """
    if not text:
        return text

    # Convert to lowercase
    if lowercase:
        text = text.lower()

    # Remove URLs (http/https schemes and bare www. hosts)
    if remove_urls:
        text = re.sub(r'https?://\S+|www\.\S+', '', text)

    # Remove digit runs
    if remove_numbers:
        text = re.sub(r'\d+', '', text)

    # Remove punctuation via a single C-level translate pass
    if remove_punctuation:
        text = text.translate(str.maketrans('', '', string.punctuation))

    # Collapse whitespace runs and trim the ends
    if remove_whitespace:
        text = re.sub(r'\s+', ' ', text).strip()

    return text
303 |
304 |
def extract_key_phrases(text: str, max_phrases: int = 5, min_word_length: int = 3) -> List[str]:
    """Extract key phrases from text using frequency-based ranking.

    The extraction process:
    1. Normalize the input (lowercase, keep punctuation, strip URLs,
       collapse whitespace).
    2. Split the text into sentences.
    3. Within each sentence, match candidate phrases: 1-3 word alphabetic
       sequences where the final (or only) word has at least
       min_word_length characters.
    4. Count phrase frequency across the whole text.
    5. Return the max_phrases most frequent phrases, most frequent first
       (ties keep first-seen order).

    Works best on longer factual passages where key concepts repeat; for
    short texts, consider lowering min_word_length.

    Args:
        text: Source text from which to extract key phrases.
        max_phrases: Maximum number of phrases to return (default: 5).
        min_word_length: Minimum length in characters for the anchoring
            word of a candidate phrase (default: 3).

    Returns:
        List of key phrases sorted by frequency (most frequent first).
        Empty list for empty input or when no candidates are found.
    """
    if not text:
        return []

    # Normalize text (keep punctuation so sentence splitting still works)
    normalized = normalize_text(
        text,
        lowercase=True,
        remove_punctuation=False,
        remove_whitespace=True,
        remove_urls=True,
    )

    # Split into sentences (lookbehinds avoid splitting on abbreviations)
    sentences = re.split(r'(?<!\w\.\w.)(?<![A-Z][a-z]\.)(?<=\.|\?|\!)\s', normalized)

    # Extract candidate phrases (simple noun-phrase-like sequences)
    phrases = []
    for sentence in sentences:
        np_matches = re.finditer(
            r'\b((?:(?:[A-Za-z]+\s+){0,2}[A-Za-z]{%d,})|(?:[A-Za-z]{%d,}))\b' %
            (min_word_length, min_word_length),
            sentence
        )
        for match in np_matches:
            phrase = match.group(0).strip()
            if len(phrase.split()) <= 3:  # Limit to 3-word phrases
                phrases.append(phrase)

    # Rank by frequency. Counter.most_common is stable for ties (first-seen
    # first), matching the stable sort the previous manual count used.
    return [phrase for phrase, _ in Counter(phrases).most_common(max_phrases)]
392 |
393 |
def split_text_by_similarity(text: str, chunk_size: int = 1000, overlap: int = 100) -> List[str]:
    """Split text into chunks of similar size at natural boundaries.

    Divides a long text into smaller chunks while trying to respect natural
    boundaries (paragraph breaks, sentence ends, commas, whitespace) so
    chunks don't break mid-sentence — useful for embedding generation,
    semantic search, or feeding LLM context windows.

    Algorithm:
    1. If the text fits in chunk_size, return it as a single chunk.
    2. Otherwise repeatedly:
       a. Target end = start + chunk_size.
       b. Within the last quarter of the prospective chunk, look for the
          best boundary, in preference order: paragraph break, sentence
          end, comma, any whitespace.
       c. Cut there (or at the hard limit if no boundary is found).
       d. Step forward, keeping `overlap` characters of context.

    Args:
        text: The text to split into chunks.
        chunk_size: Target size of each chunk in characters (default: 1000).
        overlap: Characters of context shared between consecutive chunks
            (default: 100). Should be well below chunk_size so each step
            makes forward progress.

    Returns:
        List of text chunks. For empty input or text shorter than
        chunk_size, a single-element list containing the original text.

    Examples:
        >>> split_text_by_similarity("abcdefghij", chunk_size=4, overlap=1)
        ['abcd', 'defg', 'ghij']

    Notes:
        - Boundary quality is preferred over exact chunk_size adherence.
        - Bug fix: the loop previously never terminated once the final
          chunk reached the end of the text — `start = end - overlap`
          moved the cursor back before the end, so the tail chunk was
          re-appended forever whenever overlap > 0. The explicit break
          below ends the loop after the final chunk is emitted.
    """
    if not text or len(text) <= chunk_size:
        return [text]

    # Boundary patterns in order of preference
    boundaries = [
        r'\n\s*\n',     # Double newline (paragraph)
        r'\.\s+[A-Z]',  # End of sentence
        r',\s+',        # Comma with space
        r'\s+',         # Any whitespace
    ]

    chunks = []
    start = 0

    while start < len(text):
        # Hard end position for this chunk
        end = min(start + chunk_size, len(text))

        # If not at the end of the text, pull the cut back to the best
        # natural boundary within the last quarter of the chunk.
        if end < len(text):
            for pattern in boundaries:
                search_start = max(start, end - chunk_size // 4)
                matches = list(re.finditer(pattern, text[search_start:end]))
                if matches:
                    end = search_start + matches[-1].end()
                    break

        chunks.append(text[start:end])

        # Terminate once the final chunk has been emitted (see Notes).
        if end >= len(text):
            break

        # Move to the next chunk, retaining `overlap` characters of context
        start = end - overlap

    return chunks
479 |
480 |
def sanitize_text(text: str, allowed_tags: Optional[List[str]] = None) -> str:
    """Sanitize text by removing potentially harmful HTML/XML elements.

    This function cleans input text by removing potentially dangerous HTML/XML
    content that could lead to XSS (Cross-Site Scripting) or other injection
    attacks. It strips script tags, style tags, and HTML comments, and by
    default removes all HTML markup unless specific tags are explicitly
    allowed via the ``allowed_tags`` parameter.

    The sanitization process follows these steps:
    1. Remove all <script> tags and their contents (highest security priority)
    2. Remove all <style> tags and their contents (to prevent CSS-based attacks)
    3. Remove all HTML comments (which might contain sensitive information)
    4. Process HTML tags based on the allowed_tags parameter:
       - If allowed_tags is None: remove ALL HTML tags
       - If allowed_tags is provided: keep only those tags, strip all others
         while preserving their inner text
    5. Convert HTML entities like &amp;, &lt;, etc. to their character equivalents

    Args:
        text: The text to sanitize, potentially containing unsafe HTML/XML content
        allowed_tags: Optional list of HTML tags to preserve (e.g., ["p", "br", "strong"]).
            Matching is case-insensitive. If None (default), all HTML tags are removed.

    Returns:
        Sanitized text with dangerous elements removed and HTML entities decoded.
        The original string is returned if the input is empty.

    Examples:
        >>> sanitize_text("<p>Hello <script>alert('XSS')</script> World</p>")
        'Hello  World'

        >>> sanitize_text("<p>Hello <b>Bold</b> World</p>", allowed_tags=["b"])
        'Hello <b>Bold</b> World'

        >>> sanitize_text("Safe &amp; sound")
        'Safe & sound'

    Note:
        While this function provides basic sanitization, it is not a complete defense
        against all possible injection attacks. For highly sensitive applications,
        consider using specialized HTML sanitization libraries like bleach or html-sanitizer.
    """
    if not text:
        return text

    # Remove script tags and content. IGNORECASE so `<SCRIPT>` cannot bypass
    # the filter (regex matching is case-sensitive by default).
    text = re.sub(r'<script\b[^<]*(?:(?!<\/script>)<[^<]*)*<\/script>', '', text, flags=re.IGNORECASE)

    # Remove style tags and content (same case-insensitivity rationale).
    text = re.sub(r'<style\b[^<]*(?:(?!<\/style>)<[^<]*)*<\/style>', '', text, flags=re.IGNORECASE)

    # Remove comments. DOTALL so comments spanning multiple lines are caught.
    text = re.sub(r'<!--.*?-->', '', text, flags=re.DOTALL)

    # Handle HTML tags based on allowed_tags
    if allowed_tags:
        # Normalize once so membership checks are case-insensitive.
        allowed = {tag.lower() for tag in allowed_tags}

        def tag_replacer(match):
            # Keep the tag markup only when explicitly allowed; the tag's
            # inner text is preserved either way because we match individual
            # opening/closing/self-closing tags, never whole elements.
            return match.group(0) if match.group(1).lower() in allowed else ''

        # Strip disallowed tags (opening, closing, and self-closing forms)
        # without touching the text content between them.
        text = re.sub(r'<\/?(\w+)(?:\s[^>]*)?\/?>', tag_replacer, text)
    else:
        # Remove all HTML tags
        text = re.sub(r'<[^>]*>', '', text)

    # Convert HTML entities after tag processing so encoded content is decoded.
    text = _convert_html_entities(text)

    return text
557 |
558 |
559 | def _convert_html_entities(text: str) -> str:
560 | """Convert common HTML entities to their corresponding characters.
561 |
562 | This internal utility function translates HTML entity references (both named and numeric)
563 | into their equivalent Unicode characters. It handles common named entities like &,
564 | <, >, as well as decimal ({) and hexadecimal ({) numeric entity references.
565 |
566 | The conversion process:
567 | 1. Replace common named entities with their character equivalents using a lookup table
568 | 2. Convert decimal numeric entities (&#nnn;) to characters using int() and chr()
569 | 3. Convert hexadecimal numeric entities (&#xhh;) to characters using int(hex, 16) and chr()
570 |
571 | This function is primarily used internally by sanitize_text() to ensure that entity-encoded
572 | content is properly decoded after HTML tag processing.
573 |
574 | Args:
575 | text: String containing HTML entities to convert
576 |
577 | Returns:
578 | String with HTML entities replaced by their corresponding Unicode characters.
579 | If the input is empty or contains no entities, the original string is returned.
580 |
581 | Examples:
582 | >>> _convert_html_entities("<div>")
583 | '<div>'
584 |
585 | >>> _convert_html_entities("Copyright © 2023")
586 | 'Copyright © 2023'
587 |
588 | >>> _convert_html_entities("ABC")
589 | 'ABC'
590 |
591 | >>> _convert_html_entities("ABC")
592 | 'ABC'
593 |
594 | Limitations:
595 | - Only handles a subset of common named entities (amp, lt, gt, quot, apos, nbsp)
596 | - Entity references must be properly formed (e.g., & not & )
597 | - Doesn't validate that numeric references point to valid Unicode code points
598 | """
599 | # Define common HTML entities
600 | entities = {
601 | '&': '&',
602 | '<': '<',
603 | '>': '>',
604 | '"': '"',
605 | ''': "'",
606 | ' ': ' ',
607 | }
608 |
609 | # Replace each entity
610 | for entity, char in entities.items():
611 | text = text.replace(entity, char)
612 |
613 | # Handle numeric entities
614 | text = re.sub(r'&#(\d+);', lambda m: chr(int(m.group(1))), text)
615 | text = re.sub(r'&#x([0-9a-f]+);', lambda m: chr(int(m.group(1), 16)), text)
616 |
617 | return text
618 |
619 |
def extract_structured_data(text: str, patterns: Dict[str, str]) -> Dict[str, Any]:
    """Pull named fields out of unstructured text via regex patterns.

    Each entry of ``patterns`` maps an output field name to a regular
    expression. For every pattern that matches somewhere in ``text``, the
    resulting dictionary gains that field: if the pattern defines capturing
    groups, the first group's text is used; otherwise the whole match is.
    Extracted values are whitespace-trimmed. Matching is performed with
    ``re.DOTALL | re.IGNORECASE``, so patterns can span lines and are
    case-insensitive.

    Args:
        text: Source text to extract data from.
        patterns: Mapping of field name -> regex pattern. Example:
            {"email": r"Email:\\s*([^\\s@]+@[^\\s@]+\\.[^\\s@]+)"}

    Returns:
        Dict of field name -> extracted string. Fields whose pattern did not
        match are omitted; an empty dict is returned for empty input or when
        nothing matches.

    Examples:
        >>> extract_structured_data("Status: Active", {"status": r"Status: \\w+"})
        {'status': 'Status: Active'}

        >>> extract_structured_data("Empty content", {"field": r"NotFound"})
        {}
    """
    if not text:
        return {}

    flags = re.DOTALL | re.IGNORECASE
    extracted: Dict[str, Any] = {}

    for field_name, regex in patterns.items():
        found = re.search(regex, text, flags)
        if found is None:
            continue
        # First capturing group wins when present; otherwise the full match.
        raw_value = found.group(1) if found.groups() else found.group(0)
        extracted[field_name] = raw_value.strip()

    return extracted
692 |
693 |
def find_text_similarity(text1: str, text2: str) -> float:
    """Calculate text similarity using character n-grams and Jaccard similarity.

    This function measures the similarity between two text strings using
    character-level trigrams (3-character sequences) and the Jaccard
    similarity coefficient. This provides a language-agnostic way to detect
    similarity that works well for fuzzy matching, near-duplicate detection,
    and finding related sentences or paragraphs.

    The algorithm:
    1. Normalize both texts (lowercase, collapse whitespace)
    2. Generate the set of character trigrams for each text
    3. Return the Jaccard similarity: |A ∩ B| / |A ∪ B|

    Because it compares shared character patterns rather than exact words,
    the score is robust to minor spelling variations, word-order changes,
    and formatting differences.

    Args:
        text1: First text string to compare
        text2: Second text string to compare

    Returns:
        Similarity score between 0.0 (completely different) and 1.0
        (identical after normalization). Returns 0.0 if either input is
        empty. For texts too short to form trigrams (<3 chars after
        normalization), returns 1.0 only when both normalized texts are
        identical, otherwise 0.0.

    Performance note:
        Complete trigram sets are built for both texts, which can consume
        significant memory for very large inputs; consider chunking or
        sampling for large documents.
    """
    if not text1 or not text2:
        return 0.0

    # Normalize so comparison ignores case and whitespace noise.
    text1 = normalize_text(text1, lowercase=True, remove_whitespace=True)
    text2 = normalize_text(text2, lowercase=True, remove_whitespace=True)

    # Generate character trigrams
    def get_trigrams(s):
        return set(s[i:i+3] for i in range(len(s) - 2))

    trigrams1 = get_trigrams(text1)
    trigrams2 = get_trigrams(text2)

    if not trigrams1 and not trigrams2:
        # Both inputs are too short (<3 chars) to form any trigram. Fall back
        # to exact comparison as documented: identical -> 1.0, else 0.0.
        # (Previously this branch returned 1.0 even for different strings.)
        return 1.0 if text1 == text2 else 0.0

    # Jaccard similarity: shared trigrams over all distinct trigrams.
    common = trigrams1.intersection(trigrams2)
    return len(common) / max(1, len(trigrams1.union(trigrams2)))
766 |
767 |
def get_text_stats(text: str) -> Dict[str, Any]:
    """Compute volume, structure, and readability statistics for a text.

    Reports character/word counts, sentence and paragraph counts, average
    word and sentence lengths (rounded to one decimal place), and an
    estimated LLM token count via ``count_tokens``. Useful for assessing
    complexity, estimating LLM processing cost, and enforcing length
    constraints.

    Metric definitions:
    - char_count: total characters, including whitespace.
    - word_count: matches of the word-boundary regex ``\\b\\w+\\b``.
    - sentence_count: splits on sentence-ending punctuation followed by a
      space, with lookbehinds that reduce false positives from common
      abbreviations.
    - paragraph_count: segments separated by blank lines (``\\n\\n``-style).
    - avg_word_length / avg_sentence_length: simple readability indicators.
    - estimated_tokens: delegated to ``count_tokens`` (model-aware when a
      tokenizer is available).

    Args:
        text: The text to analyze; may be empty.

    Returns:
        Dict with the metrics above; all zeros for empty input.

    Examples:
        >>> get_text_stats("")["word_count"]
        0
    """
    if not text:
        return {
            "char_count": 0,
            "word_count": 0,
            "sentence_count": 0,
            "paragraph_count": 0,
            "avg_word_length": 0,
            "avg_sentence_length": 0,
            "estimated_tokens": 0,
        }

    # Tokenize once; every derived metric below reuses these lists.
    words = re.findall(r'\b\w+\b', text)
    sentence_parts = re.split(r'(?<!\w\.\w.)(?<![A-Z][a-z]\.)(?<=\.|\?|\!)\s', text)
    paragraph_parts = re.split(r'\n\s*\n', text)

    word_count = len(words)
    # Blank fragments from the splits are not real sentences/paragraphs.
    sentence_count = sum(1 for part in sentence_parts if part.strip())
    paragraph_count = sum(1 for part in paragraph_parts if part.strip())

    total_word_chars = sum(len(word) for word in words)

    return {
        "char_count": len(text),
        "word_count": word_count,
        "sentence_count": sentence_count,
        "paragraph_count": paragraph_count,
        # max(1, ...) guards against division by zero on degenerate input.
        "avg_word_length": round(total_word_chars / max(1, word_count), 1),
        "avg_sentence_length": round(word_count / max(1, sentence_count), 1),
        "estimated_tokens": count_tokens(text),
    }
878 |
879 |
def preprocess_text(text: str) -> str:
    """Normalize raw text into a clean, compact form for ML/NLP pipelines.

    Applies, in order:
    1. Whitespace normalization — collapse runs of spaces/tabs/newlines to
       single spaces and trim the ends.
    2. Control-character removal — strip non-printable ASCII control chars.
    3. Punctuation capping — reduce runs of 3+ sentence punctuation to three
       (e.g. "!!!!!!" -> "!!!").
    4. Length truncation — for inputs over 100,000 characters, keep the head
       and tail with a truncation marker between them to limit token usage.

    Case, punctuation, and non-control special characters are preserved,
    since they may be semantically relevant; use normalize_text() for more
    aggressive normalization.

    Args:
        text: The input text to preprocess; may be empty.

    Returns:
        The cleaned (and possibly truncated) text. Empty input is returned
        unchanged.

    Examples:
        >>> preprocess_text("Too much punctuation!!!!!!!!")
        'Too much punctuation!!!'
    """
    if not text:
        return text

    # Collapse whitespace runs first, then drop any remaining control chars
    # (tab/newline are already gone, having been folded into spaces).
    cleaned = re.sub(r'\s+', ' ', text).strip()
    cleaned = re.sub(r'[\x00-\x09\x0B\x0C\x0E-\x1F\x7F]', '', cleaned)

    # Cap excessive repeated sentence punctuation at three characters.
    cleaned = re.sub(r'([.!?]){3,}', r'\1\1\1', cleaned)

    # Guard against token explosion: keep both ends of pathological inputs.
    max_chars = 100000
    if len(cleaned) > max_chars:
        half = max_chars // 2
        cleaned = cleaned[:half] + " [...text truncated...] " + cleaned[-half:]

    return cleaned
```
--------------------------------------------------------------------------------
/examples/meta_api_demo.py:
--------------------------------------------------------------------------------
```python
1 | #!/usr/bin/env python
2 | """
3 | Meta API Tool Demonstration Script.
4 |
5 | This script demonstrates the functionality of the APIMetaTool class for dynamically
6 | registering and using external APIs via their OpenAPI specifications.
7 |
8 | The demo features:
9 | 1. Registering APIs with the MCP server using their OpenAPI specifications
10 | 2. Listing registered APIs and their endpoints
11 | 3. Getting detailed information about an API and its endpoints
12 | 4. Calling dynamically registered tools
13 | 5. Refreshing an API to update its endpoints
14 | 6. Getting detailed information about a specific tool
15 | 7. Unregistering APIs
16 |
17 | We use the Swagger Petstore API as our primary demo example along with additional
18 | public APIs for multi-API demonstrations.
19 | """
20 | import asyncio
21 | import json
22 | import sys
23 | import time
24 | from pathlib import Path
25 | from typing import Any, Dict, List
26 |
27 | # Add project root to path for imports when running as script
28 | sys.path.insert(0, str(Path(__file__).parent.parent))
29 |
30 | from rich.console import Console
31 | from rich.markdown import Markdown
32 | from rich.panel import Panel
33 | from rich.progress import Progress, SpinnerColumn, TextColumn
34 | from rich.prompt import Confirm, Prompt
35 | from rich.rule import Rule
36 | from rich.syntax import Syntax
37 | from rich.table import Table
38 |
39 | import ultimate_mcp_server.core # To access the global gateway instance
40 | from ultimate_mcp_server import create_app
41 | from ultimate_mcp_server.tools.meta_api_tool import APIMetaTool # Import class for type hinting
42 |
43 | # Initialize Rich console
44 | console = Console()
45 |
# Demo APIs to showcase. Each entry maps a short lookup key to the name the
# API is registered under, the URL of its OpenAPI specification, and a
# human-readable description shown in the intro table.
DEMO_APIS = {
    "petstore": {
        "name": "petstore",
        "url": "https://petstore.swagger.io/v2/swagger.json",
        "description": "Swagger Petstore API - A sample API for pet store management"
    },
    "weather": {
        "name": "weather",
        "url": "https://api.met.no/weatherapi/locationforecast/2.0/openapi.json",
        "description": "Norwegian Meteorological Institute API - Weather forecast data"
    },
    # NOTE(review): the dict key "mock" differs from the registered name
    # "mockapi" — confirm this mismatch is intentional.
    "mock": {
        "name": "mockapi",
        "url": "https://fastapimockserver.onrender.com/openapi.json",
        "description": "Mock API Server - A simple mock API for testing"
    }
}

# Default key (into DEMO_APIS) used when a demo function is called without
# an explicit api_name argument.
DEFAULT_API = "petstore"
67 |
68 |
async def show_intro():
    """Clear the screen and print the demo banner, API table, and intro text."""
    console.clear()
    console.print("\n[bold cyan]META API TOOL DEMONSTRATION[/bold cyan]", justify="center")
    console.print("[italic]Dynamically register and use any OpenAPI-compatible API[/italic]", justify="center")
    console.print("\n")

    # Build the table of demo APIs that the rest of the script can use.
    api_table = Table(title="Demo APIs", box=None, highlight=True, border_style="blue")
    for header, column_style in (("API Name", "cyan"), ("Description", "green"), ("OpenAPI URL", "blue")):
        api_table.add_column(header, style=column_style)

    for entry in DEMO_APIS.values():
        api_table.add_row(entry["name"], entry["description"], entry["url"])

    console.print(Panel(api_table, border_style="blue", title="Available APIs", expand=False))

    # Render the introduction blurb as markdown.
    intro_md = """
## Welcome to the Meta API Tool Demo

This demonstration shows how to use the Meta API Tool to:
- Register external APIs dynamically
- Access API endpoints as tools
- Call external services seamlessly
    """
    console.print(Markdown(intro_md))
    console.print("\n")
98 |
99 |
async def register_api_demo(api_meta_tool: APIMetaTool, api_name: str = DEFAULT_API) -> Dict[str, Any]:
    """Register one of the demo APIs from its OpenAPI specification.

    Args:
        api_meta_tool: The APIMetaTool instance used to perform registration
        api_name: Key into DEMO_APIS identifying which API to register

    Returns:
        The registration result dict, or an empty dict on failure
    """
    console.print(Rule(f"[bold blue]REGISTERING API: {api_name.upper()}[/bold blue]"))

    api_info = DEMO_APIS.get(api_name)
    if api_info is None:
        console.print(f"[bold red]Error:[/bold red] API '{api_name}' not found in demo configuration.")
        return {}

    # Summarize what is about to be registered before doing any network work.
    registration_summary = (
        f"[bold]API Name:[/bold] {api_info['name']}\n"
        f"[bold]OpenAPI URL:[/bold] {api_info['url']}\n"
        f"[bold]Description:[/bold] {api_info['description']}"
    )
    console.print(Panel(
        registration_summary,
        title="API Registration Details",
        border_style="green",
        expand=False
    ))

    console.print("[cyan]> Fetching OpenAPI specification from URL and registering tools...[/cyan]")

    started_at = time.time()
    with Progress(
        SpinnerColumn(),
        TextColumn("[bold green]Registering API..."),
        transient=True
    ) as progress:
        task = progress.add_task("", total=None)  # noqa: F841
        try:
            registration = await api_meta_tool.register_api(
                api_name=api_info["name"],
                openapi_url=api_info["url"]
            )
            elapsed = time.time() - started_at

            console.print(f"[bold green]✓ Success![/bold green] API registered in {elapsed:.2f}s")

            if registration.get("tools_count", 0) > 0:
                # One row per newly registered tool.
                tools_table = Table(
                    title=f"Registered {registration['tools_count']} Tools",
                    box=None,
                    highlight=True,
                    border_style="blue"
                )
                tools_table.add_column("Tool Name", style="cyan")
                for registered_tool in registration.get("tools_registered", []):
                    tools_table.add_row(registered_tool)
                console.print(Panel(tools_table, border_style="green", expand=False))
            else:
                console.print("[yellow]No tools were registered for this API.[/yellow]")

            return registration
        except Exception as e:
            console.print(f"[bold red]Error during API registration:[/bold red] {str(e)}")
            return {}
168 |
169 |
async def list_apis_demo(api_meta_tool: APIMetaTool) -> Dict[str, Any]:
    """List every registered API together with its tool count.

    Args:
        api_meta_tool: The APIMetaTool instance to query

    Returns:
        The listing result dict, or an empty dict on failure
    """
    console.print(Rule("[bold blue]LISTING REGISTERED APIs[/bold blue]"))

    with console.status("[bold green]Fetching registered APIs...", spinner="dots"):
        try:
            listing = await api_meta_tool.list_registered_apis()

            if listing.get("total_apis", 0) > 0:
                summary_table = Table(
                    title=f"Registered APIs ({listing['total_apis']})",
                    box=None,
                    highlight=True,
                    border_style="blue"
                )
                summary_table.add_column("API Name", style="cyan")
                summary_table.add_column("Base URL", style="blue")
                summary_table.add_column("Tools Count", style="green", justify="right")

                for registered_name, info in listing.get("apis", {}).items():
                    summary_table.add_row(
                        registered_name,
                        info.get("base_url", "N/A"),
                        str(info.get("tools_count", 0))
                    )

                console.print(Panel(summary_table, border_style="green", expand=False))
                console.print(f"[green]Total Tools: {listing.get('total_tools', 0)}[/green]")
            else:
                console.print("[yellow]No APIs are currently registered.[/yellow]")

            return listing
        except Exception as e:
            console.print(f"[bold red]Error listing APIs:[/bold red] {str(e)}")
            return {}
213 |
214 |
async def get_api_details_demo(api_meta_tool: APIMetaTool, api_name: str = DEFAULT_API) -> Dict[str, Any]:
    """Display the overview and endpoint list for one registered API.

    Args:
        api_meta_tool: The APIMetaTool instance to query
        api_name: Name of the registered API to inspect

    Returns:
        The API details dict, or an empty dict on failure
    """
    console.print(Rule(f"[bold blue]API DETAILS: {api_name.upper()}[/bold blue]"))

    with console.status(f"[bold green]Fetching details for {api_name} API...", spinner="dots"):
        try:
            details = await api_meta_tool.get_api_details(api_name=api_name)

            # High-level summary panel first.
            overview = (
                f"[bold]API Name:[/bold] {details.get('api_name', 'N/A')}\n"
                f"[bold]Base URL:[/bold] {details.get('base_url', 'N/A')}\n"
                f"[bold]OpenAPI URL:[/bold] {details.get('openapi_url', 'N/A')}\n"
                f"[bold]Endpoints Count:[/bold] {details.get('endpoints_count', 0)}"
            )
            console.print(Panel(overview, title="API Overview", border_style="green", expand=False))

            endpoints = details.get("tools", [])
            if not endpoints:
                console.print("[yellow]No endpoints found for this API.[/yellow]")
                return details

            # One row per endpoint/tool exposed by the API.
            endpoint_table = Table(
                title=f"API Endpoints ({len(endpoints)})",
                box=None,
                highlight=True,
                border_style="blue"
            )
            endpoint_table.add_column("Tool Name", style="cyan")
            endpoint_table.add_column("Method", style="magenta", justify="center")
            endpoint_table.add_column("Path", style="blue")
            endpoint_table.add_column("Summary", style="green")

            for endpoint_info in endpoints:
                endpoint_table.add_row(
                    endpoint_info.get("name", "N/A"),
                    endpoint_info.get("method", "N/A").upper(),
                    endpoint_info.get("path", "N/A"),
                    endpoint_info.get("summary", "No summary") or "No summary"
                )

            console.print(Panel(endpoint_table, border_style="green", expand=False))
            return details
        except Exception as e:
            console.print(f"[bold red]Error getting API details:[/bold red] {str(e)}")
            return {}
272 |
273 |
async def get_tool_details_demo(api_meta_tool: APIMetaTool, api_name: str = DEFAULT_API) -> Dict[str, Any]:
    """Get detailed information about a specific tool from an API.

    Picks one tool from the named API (preferring a GET endpoint with path
    parameters, for a more interesting demo), fetches its details, and
    renders its overview, parameters, and (truncated) source code.

    Args:
        api_meta_tool: The APIMetaTool instance
        api_name: Name of the API that contains the tool

    Returns:
        Tool details dict from get_tool_details, or an empty dict on error
        or when the API exposes no tools.
    """
    console.print(Rule(f"[bold blue]TOOL DETAILS DEMO FOR {api_name.upper()}[/bold blue]"))

    # First get the API details to find available tools.
    # NOTE(review): this status spinner stays active for the entire block
    # below, including a second nested console.status — confirm rich handles
    # nested status displays as intended.
    with console.status(f"[bold green]Fetching available tools for {api_name} API...", spinner="dots"):
        try:
            api_details = await api_meta_tool.get_api_details(api_name=api_name)

            if not api_details.get("tools", []):
                console.print(f"[yellow]No tools available for {api_name} API.[/yellow]")
                return {}

            # Find a suitable GET tool for demo purposes
            tools = api_details.get("tools", [])
            get_tools = [t for t in tools if t.get("method", "").lower() == "get"]

            if get_tools:
                # Prefer a GET tool with path parameters for a more interesting demo
                # ("{" in the path indicates a templated parameter like /pet/{id}).
                path_param_tools = [t for t in get_tools if "{" in t.get("path", "")]
                if path_param_tools:
                    selected_tool = path_param_tools[0]
                else:
                    selected_tool = get_tools[0]
            else:
                # If no GET tools, just pick the first tool
                selected_tool = tools[0]

            tool_name = selected_tool.get("name", "")
            console.print(f"[cyan]Selected tool for details:[/cyan] [bold]{tool_name}[/bold]")

            # Get detailed information about the selected tool
            with console.status(f"[bold green]Fetching details for {tool_name}...", spinner="dots"):
                result = await api_meta_tool.get_tool_details(tool_name=tool_name)

            # Display tool overview
            console.print(Panel(
                f"[bold]Tool Name:[/bold] {result.get('tool_name', 'N/A')}\n"
                f"[bold]API Name:[/bold] {result.get('api_name', 'N/A')}\n"
                f"[bold]Method:[/bold] {result.get('method', 'N/A').upper()}\n"
                f"[bold]Path:[/bold] {result.get('path', 'N/A')}\n"
                f"[bold]Summary:[/bold] {result.get('summary', 'No summary') or 'No summary'}\n"
                f"[bold]Description:[/bold] {result.get('description', 'No description') or 'No description'}",
                title="Tool Overview",
                border_style="green",
                expand=False
            ))

            # Display parameters if any
            parameters = result.get("parameters", [])
            if parameters:
                param_table = Table(
                    title="Tool Parameters",
                    box=None,
                    highlight=True,
                    border_style="blue"
                )
                param_table.add_column("Name", style="cyan")
                param_table.add_column("Type", style="blue")
                param_table.add_column("Required", style="green", justify="center")
                param_table.add_column("In", style="magenta")
                param_table.add_column("Description", style="yellow")

                for param in parameters:
                    # Each param is assumed to follow the OpenAPI parameter
                    # object shape (name/in/required/schema) — values are
                    # defaulted defensively when absent.
                    param_type = param.get("schema", {}).get("type", "string")
                    required = "✓" if param.get("required", False) else "-"
                    param_in = param.get("in", "query")
                    description = param.get("description", "No description") or "No description"

                    param_table.add_row(
                        param.get("name", "N/A"),
                        param_type,
                        required,
                        param_in,
                        description
                    )

                console.print(Panel(param_table, border_style="green", expand=False))
            else:
                console.print("[yellow]This tool has no parameters.[/yellow]")

            # Display source code
            source_code = result.get("source_code", "Source code not available")
            if len(source_code) > 500:
                # Truncate long source code for display purposes
                source_code = source_code[:500] + "\n\n[...truncated...]"

            console.print(Panel(
                Syntax(source_code, "python", theme="monokai", line_numbers=True),
                title="Tool Source Code",
                border_style="green",
                expand=False
            ))

            return result
        except Exception as e:
            console.print(f"[bold red]Error getting tool details:[/bold red] {str(e)}")
            return {}
380 |
381 |
def _select_demo_tool(api_name: str, tools: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Pick a tool from `tools` suitable for a demo call.

    For the Petstore API: prefer "findPetsByStatus" (simple query param),
    then "getInventory" (needs no params), then any GET endpoint, then the
    first tool. For other APIs: prefer GET endpoints without path
    parameters, then any GET endpoint, then the first tool.
    """
    def _first_get(candidates):
        # First GET endpoint among candidates, or None.
        gets = [t for t in candidates if t.get("method", "").lower() == "get"]
        return gets[0] if gets else None

    if api_name == "petstore":
        for marker in ("findPetsByStatus", "getInventory"):
            named = [t for t in tools if marker in t.get("name", "")]
            if named:
                return named[0]
        return _first_get(tools) or tools[0]

    simple_gets = [
        t for t in tools
        if t.get("method", "").lower() == "get" and "{" not in t.get("path", "")
    ]
    if simple_gets:
        return simple_gets[0]
    return _first_get(tools) or tools[0]


def _build_demo_inputs(api_name: str, tool_name: str, parameters: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Build demo input values for `tool_name`, printing each chosen input.

    Petstore tools get hand-picked values; other tools get
    type-appropriate defaults for each *required* parameter.
    """
    inputs: Dict[str, Any] = {}

    if api_name == "petstore":
        if "findPetsByStatus" in tool_name:
            inputs = {"status": "available"}
            console.print("[cyan]Using input:[/cyan] status=available")
        elif "getPetById" in tool_name:
            inputs = {"petId": 1}
            console.print("[cyan]Using input:[/cyan] petId=1")
        # Other petstore tools intentionally get no inputs (matches prior behavior).
        return inputs

    required_params = [p for p in parameters if p.get("required", False)]
    if required_params:
        console.print("[yellow]This tool requires parameters. Using default values for demo.[/yellow]")

    type_defaults = {"integer": 1, "number": 1.0, "boolean": True}
    for param in required_params:
        param_name = param.get("name", "")
        param_type = param.get("schema", {}).get("type", "string")
        param_in = param.get("in", "query")
        inputs[param_name] = type_defaults.get(param_type, "test")  # "test" for string/other
        console.print(f"[cyan]Using input:[/cyan] {param_name}={inputs[param_name]} ({param_in})")

    return inputs


async def call_tool_demo(api_meta_tool: APIMetaTool, api_name: str = DEFAULT_API) -> Dict[str, Any]:
    """Call a dynamically registered tool from an API.

    Selects a suitable demo tool, builds default inputs for any required
    parameters, executes the tool, and displays the JSON response.

    Args:
        api_meta_tool: The APIMetaTool instance
        api_name: Name of the API that contains the tool

    Returns:
        Result of the tool call (empty dict on error)
    """
    console.print(Rule(f"[bold blue]CALLING TOOL FROM {api_name.upper()}[/bold blue]"))

    try:
        # BUG FIX: the original kept this Progress display open for the whole
        # function (under a "Fetching available tools" label) and later opened
        # console.status() inside it. Rich allows only one active live display
        # at a time, so that nested status raised a LiveError. The Progress
        # context now closes as soon as the API details are fetched.
        with Progress(
            SpinnerColumn(),
            TextColumn(f"[bold green]Fetching available tools for {api_name} API..."),
            transient=True
        ) as progress:
            progress.add_task("", total=None)
            api_details = await api_meta_tool.get_api_details(api_name=api_name)

        tools = api_details.get("tools", [])
        if not tools:
            console.print(f"[yellow]No tools available for {api_name} API.[/yellow]")
            return {}

        selected_tool = _select_demo_tool(api_name, tools)

        tool_name = selected_tool.get("name", "")
        tool_method = selected_tool.get("method", "").upper()
        tool_path = selected_tool.get("path", "")
        tool_summary = selected_tool.get("summary", "No summary") or "No summary"

        console.print(Panel(
            f"[bold]Selected Tool:[/bold] {tool_name}\n"
            f"[bold]Method:[/bold] {tool_method}\n"
            f"[bold]Path:[/bold] {tool_path}\n"
            f"[bold]Summary:[/bold] {tool_summary}",
            title="Tool Information",
            border_style="green",
            expand=False
        ))

        # Get tool details to determine parameters
        tool_details = await api_meta_tool.get_tool_details(tool_name=tool_name)
        parameters = tool_details.get("parameters", [])

        inputs = _build_demo_inputs(api_name, tool_name, parameters)

        # Call the tool (safe to open a status display now: no Progress is live)
        console.print("\n[bold]Calling the tool...[/bold]")
        start_time = time.time()
        with console.status(f"[bold green]Executing {tool_name}...", spinner="dots"):
            result = await api_meta_tool.call_dynamic_tool(
                tool_name=tool_name,
                inputs=inputs
            )
        processing_time = time.time() - start_time

        console.print(f"[bold green]✓ Success![/bold green] Tool executed in {processing_time:.2f}s")

        # Display result as formatted JSON
        result_json = json.dumps(result, indent=2)
        console.print(Panel(
            Syntax(result_json, "json", theme="monokai", line_numbers=True),
            title="Tool Response",
            border_style="green",
            expand=False
        ))

        return result
    except Exception as e:
        console.print(f"[bold red]Error calling tool:[/bold red] {str(e)}")
        return {}
518 |
519 |
async def list_tools_demo(api_meta_tool: APIMetaTool) -> Dict[str, Any]:
    """List every dynamically registered tool across all APIs.

    Args:
        api_meta_tool: The APIMetaTool instance

    Returns:
        Result with information about all available tools (empty dict on error)
    """
    console.print(Rule("[bold blue]LISTING ALL AVAILABLE TOOLS[/bold blue]"))

    with console.status("[bold green]Fetching available tools...", spinner="dots"):
        try:
            listing = await api_meta_tool.list_available_tools()

            registered = listing.get("tools", [])
            if not registered:
                console.print("[yellow]No tools are currently registered.[/yellow]")
                return listing

            overview = Table(
                title=f"Available Tools ({len(registered)})",
                box=None,
                highlight=True,
                border_style="blue"
            )
            overview.add_column("Tool Name", style="cyan")
            overview.add_column("API Name", style="magenta")
            overview.add_column("Method", style="green", justify="center")
            overview.add_column("Path", style="blue")
            overview.add_column("Summary", style="yellow")

            for entry in registered:
                overview.add_row(
                    entry.get("name", "N/A"),
                    entry.get("api_name", "N/A"),
                    entry.get("method", "N/A").upper(),
                    entry.get("path", "N/A"),
                    entry.get("summary", "No summary") or "No summary"
                )

            console.print(Panel(overview, border_style="green", expand=False))
            return listing
        except Exception as e:
            console.print(f"[bold red]Error listing tools:[/bold red] {str(e)}")
            return {}
566 |
567 |
async def refresh_api_demo(api_meta_tool: APIMetaTool, api_name: str = DEFAULT_API) -> Dict[str, Any]:
    """Refresh an API to update its endpoints.

    Args:
        api_meta_tool: The APIMetaTool instance
        api_name: Name of the API to refresh

    Returns:
        Result of the refresh operation (empty dict on error)
    """
    console.print(Rule(f"[bold blue]REFRESHING API: {api_name.upper()}[/bold blue]"))

    console.print(f"[cyan]Refreshing API {api_name} to update endpoints...[/cyan]")

    with console.status(f"[bold green]Refreshing {api_name} API...", spinner="dots"):
        try:
            started = time.time()
            refresh_result = await api_meta_tool.refresh_api(api_name=api_name)
            elapsed = time.time() - started

            console.print(f"[bold green]✓ Success![/bold green] API refreshed in {elapsed:.2f}s")

            # Summarize counts of added/updated/removed tools.
            summary = (
                f"[bold]Tools Added:[/bold] {len(refresh_result.get('tools_added', []))}\n"
                f"[bold]Tools Updated:[/bold] {len(refresh_result.get('tools_updated', []))}\n"
                f"[bold]Tools Removed:[/bold] {len(refresh_result.get('tools_removed', []))}\n"
                f"[bold]Total Tools:[/bold] {refresh_result.get('tools_count', 0)}"
            )
            console.print(Panel(
                summary,
                title="Refresh Results",
                border_style="green",
                expand=False
            ))

            # Enumerate the individual added and removed tools, if any.
            for heading, key, line_template in (
                ("Added Tools:", "tools_added", "  [green]+ {0}[/green]"),
                ("Removed Tools:", "tools_removed", "  [red]- {0}[/red]"),
            ):
                names = refresh_result.get(key, [])
                if names:
                    console.print(f"[bold]{heading}[/bold]")
                    for name in names:
                        console.print(line_template.format(name))

            return refresh_result
        except Exception as e:
            console.print(f"[bold red]Error refreshing API:[/bold red] {str(e)}")
            return {}
618 |
619 |
async def unregister_api_demo(api_meta_tool: APIMetaTool, api_name: str = DEFAULT_API) -> Dict[str, Any]:
    """Unregister an API and all its tools.

    Args:
        api_meta_tool: The APIMetaTool instance
        api_name: Name of the API to unregister

    Returns:
        Result of the unregistration (empty dict on error)
    """
    console.print(Rule(f"[bold blue]UNREGISTERING API: {api_name.upper()}[/bold blue]"))

    with console.status(f"[bold green]Unregistering {api_name} API...", spinner="dots"):
        try:
            started = time.time()
            outcome = await api_meta_tool.unregister_api(api_name=api_name)
            elapsed = time.time() - started

            console.print(f"[bold green]✓ Success![/bold green] API unregistered in {elapsed:.2f}s")

            # Show which API was removed and how many tools went with it.
            console.print(Panel(
                f"[bold]API Name:[/bold] {outcome.get('api_name', 'N/A')}\n"
                f"[bold]Tools Unregistered:[/bold] {len(outcome.get('tools_unregistered', []))}\n",
                title="Unregistration Results",
                border_style="green",
                expand=False
            ))

            return outcome
        except Exception as e:
            console.print(f"[bold red]Error unregistering API:[/bold red] {str(e)}")
            return {}
653 |
654 |
async def run_multi_api_demo(api_meta_tool: APIMetaTool):
    """Run a demonstration with multiple APIs registered simultaneously.

    Args:
        api_meta_tool: The APIMetaTool instance
    """
    console.print(Rule("[bold blue]MULTI-API DEMONSTRATION[/bold blue]"))

    console.print(Panel(
        "This demo shows how to work with multiple APIs registered simultaneously.\n"
        "We'll register two different APIs and interact with them.",
        title="Multiple APIs Demo",
        border_style="green",
        expand=False
    ))

    # Register each candidate API; keep only the ones that succeeded
    # (register_api_demo returns an empty dict on failure).
    registered: List[str] = []
    for candidate in ("petstore", "weather"):
        console.print(f"\n[bold]Registering {candidate} API...[/bold]")
        if await register_api_demo(api_meta_tool, candidate):
            registered.append(candidate)

    # Guard: the demo only makes sense with at least two APIs live.
    if len(registered) < 2:
        console.print("[yellow]Not enough APIs registered for multi-API demo. Skipping...[/yellow]")
        return

    console.print("\n[bold]Now we have multiple APIs registered:[/bold]")

    # Show the registered APIs and the combined tool list.
    await list_apis_demo(api_meta_tool)
    await list_tools_demo(api_meta_tool)

    # Exercise one tool from each registered API.
    for name in registered:
        console.print(f"\n[bold]Calling a tool from {name} API:[/bold]")
        await call_tool_demo(api_meta_tool, name)

    # Clean up: unregister everything we registered.
    for name in registered:
        console.print(f"\n[bold]Cleaning up: Unregistering {name} API:[/bold]")
        await unregister_api_demo(api_meta_tool, name)
702 |
703 |
async def run_full_demo(api_meta_tool: APIMetaTool) -> None:
    """Run the complete demonstration sequence with proper progress tracking.

    Args:
        api_meta_tool: The APIMetaTool instance
    """
    console.print(Rule("[bold cyan]RUNNING FULL META API DEMONSTRATION[/bold cyan]"))

    steps_md = """
## Full Demonstration Steps

1. **Register API** - Add a new API from OpenAPI spec
2. **List APIs** - View all registered APIs
3. **API Details** - Explore API information
4. **Tool Details** - Get specific tool info
5. **Call Tool** - Execute an API endpoint
6. **Refresh API** - Update API definition
7. **Multi-API Demo** - Work with multiple APIs
8. **Cleanup** - Unregister APIs
"""

    console.print(Markdown(steps_md))

    # Guard clause: bail out unless the user confirms.
    if not Confirm.ask("\nReady to start the full demonstration?"):
        console.print("[yellow]Demonstration cancelled.[/yellow]")
        return

    # Step names drive the progress bar total.
    demo_steps: List[str] = [
        "Register API",
        "List APIs",
        "API Details",
        "Tool Details",
        "Call Tool",
        "Refresh API",
        "Multi-API Demo",
        "Cleanup",
    ]

    with Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        transient=False
    ) as progress:
        overall = progress.add_task("[bold cyan]Running full demo...", total=len(demo_steps))

        try:
            # 1. Register the Petstore API
            progress.update(overall, description="[bold cyan]STEP 1: Register the Petstore API[/bold cyan]")
            await register_api_demo(api_meta_tool, "petstore")
            progress.advance(overall)
            input("\nPress Enter to continue...")

            # Continue with other steps...
            # ... (rest of the function remains unchanged)

        except Exception as e:
            console.print(f"[bold red]Error during full demonstration:[/bold red] {str(e)}")
764 |
765 |
async def _prompt_api_selection(api_meta_tool: APIMetaTool, prompt_text: str = "[bold green]Select an API"):
    """List registered APIs and ask the user to pick one by number.

    Extracted helper: this list-then-prompt sequence was previously
    duplicated inline for five different menu options.

    Args:
        api_meta_tool: The APIMetaTool instance
        prompt_text: Prompt shown to the user (rich markup allowed)

    Returns:
        The selected API name, or None if no APIs are registered
        (a warning is printed in that case).
    """
    apis = await list_apis_demo(api_meta_tool)
    api_names = list(apis.get("apis", {}).keys())

    if not api_names:
        console.print("[yellow]No APIs are registered. Please register an API first.[/yellow]")
        return None

    api_options = {str(i): name for i, name in enumerate(api_names, 1)}
    api_choice = Prompt.ask(
        prompt_text,
        choices=list(api_options.keys()),
        default="1"
    )
    return api_options[api_choice]


async def interactive_demo(api_meta_tool: APIMetaTool):
    """Run an interactive menu-driven demonstration.

    Args:
        api_meta_tool: The APIMetaTool instance
    """
    while True:
        console.clear()
        await show_intro()

        console.print("[bold cyan]META API TOOL MENU[/bold cyan]", justify="center")
        console.print("Select an action to demonstrate:", justify="center")
        console.print()

        # Menu options: (label, description); the last entry is always Exit.
        options = [
            ("Register an API", "Register a new API from its OpenAPI specification"),
            ("List Registered APIs", "List all currently registered APIs"),
            ("API Details", "Get detailed information about a registered API"),
            ("Tool Details", "Get detailed information about a specific tool"),
            ("Call a Tool", "Call a dynamically registered tool"),
            ("List All Tools", "List all available tools across all APIs"),
            ("Refresh an API", "Refresh an API to update its endpoints"),
            ("Unregister an API", "Unregister an API and all its tools"),
            ("Multi-API Demo", "Demonstrate using multiple APIs together"),
            ("Run Full Demo", "Run the complete demonstration sequence"),
            ("Exit", "Exit the demonstration")
        ]

        # Display menu
        menu_table = Table(box=None, highlight=True, border_style=None)
        menu_table.add_column("Option", style="cyan", justify="right")
        menu_table.add_column("Description", style="white")

        for i, (option, description) in enumerate(options, 1):
            menu_table.add_row(f"{i}. {option}", description)

        console.print(menu_table)
        console.print()

        # Get user choice
        try:
            choice = int(Prompt.ask(
                "[bold green]Enter option number",
                choices=[str(i) for i in range(1, len(options) + 1)],
                default="1"
            ))

            if choice == len(options):  # Exit option
                console.print("[yellow]Exiting demonstration. Goodbye![/yellow]")
                break

            # Clear screen for the selected demo
            console.clear()
            await show_intro()

            # Run the selected demo
            if choice == 1:  # Register an API
                # CHANGED: choices derived from DEMO_APIS instead of the
                # hard-coded ["1", "2", "3"], so the prompt stays correct
                # if the set of demo APIs ever changes.
                api_choice = Prompt.ask(
                    "[bold green]Select an API to register",
                    choices=[str(i) for i in range(1, len(DEMO_APIS) + 1)],
                    default="1"
                )
                api_name = list(DEMO_APIS.keys())[int(api_choice) - 1]
                await register_api_demo(api_meta_tool, api_name)

            elif choice == 2:  # List Registered APIs
                await list_apis_demo(api_meta_tool)

            elif choice == 3:  # API Details
                api_name = await _prompt_api_selection(api_meta_tool)
                if api_name:
                    await get_api_details_demo(api_meta_tool, api_name)

            elif choice == 4:  # Tool Details
                api_name = await _prompt_api_selection(api_meta_tool)
                if api_name:
                    await get_tool_details_demo(api_meta_tool, api_name)

            elif choice == 5:  # Call a Tool
                api_name = await _prompt_api_selection(api_meta_tool)
                if api_name:
                    await call_tool_demo(api_meta_tool, api_name)

            elif choice == 6:  # List All Tools
                await list_tools_demo(api_meta_tool)

            elif choice == 7:  # Refresh an API
                api_name = await _prompt_api_selection(api_meta_tool)
                if api_name:
                    await refresh_api_demo(api_meta_tool, api_name)

            elif choice == 8:  # Unregister an API
                api_name = await _prompt_api_selection(
                    api_meta_tool, "[bold green]Select an API to unregister"
                )
                if api_name:
                    await unregister_api_demo(api_meta_tool, api_name)

            elif choice == 9:  # Multi-API Demo
                await run_multi_api_demo(api_meta_tool)

            elif choice == 10:  # Run Full Demo
                await run_full_demo(api_meta_tool)

            # Wait for user to press Enter before returning to menu
            input("\nPress Enter to return to the menu...")

        except Exception as e:
            console.print(f"[bold red]Error:[/bold red] {str(e)}")
            input("\nPress Enter to return to the menu...")
931 |
932 |
async def main():
    """Main entry point for the demonstration.

    Builds the app, retrieves the globally-initialized Gateway and its
    APIMetaTool, then hands control to the interactive menu.

    Returns:
        Process exit code (always 0; errors are reported, not raised).
    """
    try:
        # Set up the MCP client using create_app
        print("=== API Meta-Tool Demo ===")
        app = create_app()  # noqa: F841

        # Access the globally initialized Gateway instance and its api_meta_tool
        gateway = ultimate_mcp_server.core._gateway_instance
        if not gateway:
            raise RuntimeError("Gateway instance not initialized by create_app.")

        meta_tool = gateway.api_meta_tool
        if not meta_tool:
            raise RuntimeError("API Meta Tool instance not found on Gateway. Ensure it was registered.")

        # Run the interactive demo with the retrieved instance
        await interactive_demo(meta_tool)

    except KeyboardInterrupt:
        console.print("\n[yellow]Demonstration interrupted by user.[/yellow]")
    except Exception as e:
        console.print(f"\n[bold red]Error:[/bold red] {str(e)}")
    finally:
        console.print("\n[bold green]Meta API Tool Demonstration completed![/bold green]")

    return 0
960 |
961 |
if __name__ == "__main__":
    # Run the async entry point and propagate its exit code to the shell.
    sys.exit(asyncio.run(main()))
```
--------------------------------------------------------------------------------
/examples/filesystem_operations_demo.py:
--------------------------------------------------------------------------------
```python
1 | #!/usr/bin/env python
2 | """Filesystem operations demo for Ultimate MCP Server Tools.
3 |
4 | This example demonstrates the secure asynchronous filesystem operations tools,
5 | covering file/directory manipulation, searching, metadata retrieval, and
6 | security features like allowed directory restrictions and deletion protection.
7 | """
8 | import argparse
9 | import asyncio
10 | import json
11 | import os
12 | import platform
13 | import shutil
14 | import sys
15 | import tempfile
16 | import time
17 | from pathlib import Path
18 |
19 | # --- Configuration --- (Standard libs only here)
20 | # Add project root to path for imports when running as script
21 | # Adjust this path if your script location relative to the project root differs
try:
    # Locate the project root: assume the script lives one level below it,
    # falling back to the script's own directory if that fails.
    PROJECT_ROOT = Path(__file__).resolve().parent.parent
    if not (PROJECT_ROOT / "ultimate").is_dir():
        # Fallback if running from a different structure
        PROJECT_ROOT = Path(__file__).resolve().parent
        if not (PROJECT_ROOT / "ultimate").is_dir():
            print("Error: Could not reliably determine project root. Make sure ultimate is importable.", file=sys.stderr)
            sys.exit(1)
    sys.path.insert(0, str(PROJECT_ROOT))

    # --- Important: Set Environment Variables FIRST ---
    # These must be exported BEFORE the deferred ultimate_mcp_server imports
    # further down this file (see the "Defer ALL ultimate imports" note),
    # so the server picks up the temp dir as an allowed directory.
    DEMO_TEMP_DIR = tempfile.mkdtemp(prefix="ultimate_fs_demo_")
    os.environ["FILESYSTEM__ALLOWED_DIRECTORIES"] = json.dumps([DEMO_TEMP_DIR])
    os.environ["GATEWAY_FILESYSTEM_ALLOWED_DIRECTORIES"] = json.dumps([DEMO_TEMP_DIR])
    os.environ["GATEWAY_FORCE_CONFIG_RELOAD"] = "true"

    print(f"INFO: Temporarily allowing access to: {DEMO_TEMP_DIR}")
    print("DEBUG: Environment variables set:")
    print(f"  FILESYSTEM__ALLOWED_DIRECTORIES = {os.environ['FILESYSTEM__ALLOWED_DIRECTORIES']}")
    print(f"  GATEWAY_FILESYSTEM_ALLOWED_DIRECTORIES = {os.environ['GATEWAY_FILESYSTEM_ALLOWED_DIRECTORIES']}")
except Exception as e:
    print(f"Error during initial setup: {e}", file=sys.stderr)
    sys.exit(1)
45 |
46 | # --- Defer ALL ultimate imports until AFTER env vars are set ---
47 | # Import Rich components (can happen earlier, but keep grouped for clarity)
48 | from rich.markup import escape
49 | from rich.panel import Panel
50 | from rich.rule import Rule
51 |
52 | from ultimate_mcp_server.config import get_config
53 |
54 | # Import necessary exceptions
55 | # Filesystem Tools
56 | from ultimate_mcp_server.tools.filesystem import (
57 | create_directory,
58 | delete_path,
59 | directory_tree,
60 | edit_file,
61 | get_file_info,
62 | list_allowed_directories,
63 | list_directory,
64 | move_file,
65 | read_file,
66 | read_multiple_files,
67 | search_files,
68 | write_file,
69 | )
70 | from ultimate_mcp_server.utils import get_logger
71 | from ultimate_mcp_server.utils.display import generate_rich_directory_tree, safe_tool_call
72 |
73 | # Shared console and display utils
74 | from ultimate_mcp_server.utils.logging.console import console
75 |
76 | # Initialize logger AFTER all relevant imports
77 | logger = get_logger("example.filesystem")
78 |
def parse_arguments():
    """Parse command line arguments for the demo.

    Returns:
        argparse.Namespace with `demo`, `verbose`, and `rich_tree` attributes.
    """
    arg_parser = argparse.ArgumentParser(
        description="Filesystem Operations Demo for Ultimate MCP Server Tools",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""Available demos:
  all          - Run all demos (default)
  read         - File reading operations
  write        - File writing and editing operations
  directory    - Directory operations (create, list, tree)
  move_delete  - Move, delete, search & info operations
  security     - Security features demo
"""
    )

    # Positional demo selector, defaulting to running everything.
    arg_parser.add_argument(
        'demo',
        nargs='?',
        default='all',
        choices=['all', 'read', 'write', 'directory', 'move_delete', 'security'],
        help='Specific demo to run (default: all)',
    )

    arg_parser.add_argument(
        '-v', '--verbose',
        action='store_true',
        help='Increase output verbosity',
    )

    arg_parser.add_argument(
        '--rich-tree',
        action='store_true',
        help='Use enhanced rich tree visualization for directory trees',
    )

    return arg_parser.parse_args()
105 |
106 | # --- Verify Configuration Loading ---
def verify_config():
    """Verify that the filesystem configuration has loaded correctly.

    Returns:
        True if DEMO_TEMP_DIR appears in the loaded allowed directories,
        False otherwise (details are printed either way).
    """
    try:
        # Load configuration exactly once.
        cfg = get_config()
        allowed = cfg.filesystem.allowed_directories

        print("Configuration verification:")
        print(f"  Allowed directories: {allowed}")

        if not allowed:
            # Nothing loaded: dump the relevant env vars to aid debugging,
            # then fail verification. Do NOT attempt to force an update -
            # we rely on the initial load.
            print("WARNING: No allowed directories loaded in filesystem configuration!")
            print("Check these environment variables:")
            for key in os.environ:
                if "ALLOWED_DIRECTORIES" in key:
                    print(f"  {key} = {os.environ[key]}")
            print(f"DEMO_TEMP_DIR set to: {DEMO_TEMP_DIR}")
            print("ERROR: Configuration failed to load allowed_directories from environment variables.")
            return False

        # Directories loaded: verify our temp dir is among them.
        if DEMO_TEMP_DIR in allowed:
            print(f"SUCCESS: Temporary directory {DEMO_TEMP_DIR} properly loaded in configuration!")
            return True

        print(f"WARNING: Temporary directory {DEMO_TEMP_DIR} not found in loaded allowed dirs: {allowed}")
        return False

    except Exception as e:
        print(f"ERROR during config verification: {e}")
        import traceback
        traceback.print_exc()
        return False
142 |
# --- Demo Setup ---
# DEMO_ROOT is the base *within* the allowed temporary directory
DEMO_ROOT = Path(DEMO_TEMP_DIR) / "demo_project"
# Kept above 100 so the bulk-deletion protection demo has enough files to trigger.
BULK_FILES_COUNT = 110 # Number of files to create for deletion protection demo (>100)
147 |
148 | async def setup_demo_environment():
149 | """Create a temporary directory structure for the demo."""
150 | logger.info("Setting up demo environment...", emoji_key="setup")
151 | DEMO_ROOT.mkdir(parents=True, exist_ok=True)
152 |
153 | # Create subdirectories
154 | project_dirs = [
155 | DEMO_ROOT / "docs",
156 | DEMO_ROOT / "src" / "utils",
157 | DEMO_ROOT / "data",
158 | DEMO_ROOT / "config",
159 | DEMO_ROOT / "tests",
160 | DEMO_ROOT / ".hidden_dir",
161 | DEMO_ROOT / "bulk_files" # For deletion protection demo
162 | ]
163 | for directory in project_dirs:
164 | directory.mkdir(parents=True, exist_ok=True)
165 |
166 | # Create some sample files
167 | sample_files = {
168 | DEMO_ROOT / "README.md": """# Project Demo
169 |
170 | This is a demonstration project for testing the secure filesystem operations.
171 |
172 | ## Features
173 |
174 | - File reading and writing
175 | - Directory manipulation
176 | - File searching capabilities
177 | - Metadata retrieval
178 |
179 | ## Security
180 |
181 | All operations are restricted to allowed directories for safety.""",
182 |
183 | DEMO_ROOT / "src" / "main.py": """#!/usr/bin/env python
184 | '''Main entry point for the demo application.'''
185 | import sys
186 | from pathlib import Path
187 | # Import logger for edit demo
188 | # Assume logger is configured elsewhere
189 | # import logging
190 | # logger = logging.getLogger(__name__)
191 |
192 | # A line with different whitespace for editing demo
193 | def main():
194 | '''Main function to run the application.'''
195 | print("Hello from the demo application!")
196 |
197 | # Get configuration
198 | config = get_config_local() # Renamed to avoid conflict
199 | print(f"Running with debug mode: {config['debug']}")
200 |
201 | return 0
202 |
203 | def get_config_local(): # Renamed
204 | '''Get application configuration.'''
205 | return {
206 | "debug": True,
207 | "log_level": "INFO",
208 | "max_connections": 10
209 | }
210 |
211 | if __name__ == "__main__":
212 | sys.exit(main())
213 | """,
214 |
215 | DEMO_ROOT / "src" / "utils" / "helpers.py": """'''Helper utilities for the application.'''
216 |
217 | def format_message(message, level="info"):
218 | '''Format a message with level prefix.'''
219 | return f"[{level.upper()}] {message}"
220 |
221 | class DataProcessor:
222 | '''Process application data.'''
223 |
224 | def __init__(self, data_source):
225 | self.data_source = data_source
226 |
227 | def process(self):
228 | '''Process the data.'''
229 | # TODO: Implement actual processing
230 | return f"Processed {self.data_source}"
231 | """,
232 |
233 | DEMO_ROOT / "docs" / "api.md": """# API Documentation
234 |
235 | ## Endpoints
236 |
237 | ### GET /api/v1/status
238 |
239 | Returns the current system status.
240 |
241 | ### POST /api/v1/data
242 |
243 | Submit data for processing.
244 |
245 | ## Authentication
246 |
247 | All API calls require an authorization token.
248 | """,
249 | DEMO_ROOT / "config" / "settings.json": """{
250 | "appName": "Demo Application",
251 | "version": "1.0.0",
252 | "debug": false,
253 | "database": {
254 | "host": "localhost",
255 | "port": 5432,
256 | "name": "demo_db"
257 | },
258 | "logging": {
259 | "level": "info",
260 | "file": "app.log"
261 | }
262 | }""",
263 | DEMO_ROOT / "data" / "sample.csv": "ID,Value,Category\n1,10.5,A\n2,15.2,B\n3,9.8,A",
264 | DEMO_ROOT / "tests" / "test_helpers.py": """import pytest
265 | # Adjust import path if needed relative to test execution
266 | from src.utils.helpers import format_message
267 |
268 | def test_format_message():
269 | assert format_message("Test", "debug") == "[DEBUG] Test"
270 | """,
271 | DEMO_ROOT / ".gitignore": "*.log\n*.tmp\n.hidden_dir/\n",
272 | DEMO_ROOT / "temp.log": "Log file content - should be excluded by search patterns.",
273 | # Add a file with potentially non-UTF8 data (simulated)
274 | DEMO_ROOT / "data" / "binary_data.bin": b'\x80\x02\x95\n\x00\x00\x00\x00\x00\x00\x00}\x94\x8c\x04data\x94\x8c\x06binary\x94s.'
275 | }
276 |
277 | for file_path, content in sample_files.items():
278 | file_path.parent.mkdir(parents=True, exist_ok=True)
279 | if isinstance(content, str):
280 | file_path.write_text(content, encoding='utf-8')
281 | else:
282 | file_path.write_bytes(content)
283 |
284 | # Create bulk files for deletion protection test
285 | bulk_dir = DEMO_ROOT / "bulk_files"
286 | bulk_dir.mkdir(exist_ok=True) # Ensure bulk dir exists
287 |
288 | # Create files with deliberately varied timestamps to trigger protection
289 | current_time = time.time()
290 | file_types = [".txt", ".log", ".dat", ".csv", ".tmp", ".bak", ".json"]
291 |
292 | for i in range(BULK_FILES_COUNT):
293 | # Use a wider variety of extensions
294 | ext = file_types[i % len(file_types)]
295 | fpath = bulk_dir / f"file_{i:03d}{ext}"
296 | fpath.write_text(f"Content for file {i}")
297 |
298 | # Create highly varied timestamps spanning days/weeks, not just minutes
299 | # Some files very old, some very new, to ensure high standard deviation
300 | if i < BULK_FILES_COUNT // 3:
301 | # First third: older files (30-60 days old)
302 | age = 60 * 60 * 24 * (30 + (i % 30)) # 30-60 days in seconds
303 | elif i < 2 * (BULK_FILES_COUNT // 3):
304 | # Middle third: medium age (1-10 days old)
305 | age = 60 * 60 * 24 * (1 + (i % 10)) # 1-10 days in seconds
306 | else:
307 | # Final third: very recent (0-12 hours old)
308 | age = 60 * 60 * (i % 12) # 0-12 hours in seconds
309 |
310 | # Set both access and modification times to the calculated age
311 | try:
312 | timestamp = current_time - age
313 | os.utime(fpath, (timestamp, timestamp))
314 | except OSError as e:
315 | logger.warning(f"Could not set utime for {fpath}: {e}", emoji_key="warning")
316 |
317 | # Add a message about the setup
318 | logger.info(f"Created {BULK_FILES_COUNT} files in 'bulk_files/' with highly varied timestamps and {len(file_types)} different extensions", emoji_key="setup")
319 |
320 | # Create a symlink (if supported)
321 | SYMLINK_PATH = DEMO_ROOT / "link_to_src"
322 | TARGET_PATH = DEMO_ROOT / "src" # Link to src directory
323 | try:
324 | # Check if symlinks are supported (e.g., Windows needs admin rights or dev mode)
325 | can_symlink = hasattr(os, "symlink")
326 | test_link_path = DEMO_ROOT / "test_link_nul_delete"
327 | if platform.system() == "Windows":
328 | # Basic check, might not be perfect
329 | try:
330 | # Use a file target for test link on Windows if dir links need special perms
331 | test_target = DEMO_ROOT / "README.md"
332 | os.symlink(test_target, test_link_path, target_is_directory=False)
333 | test_link_path.unlink() # Clean up test link
334 | except (OSError, AttributeError, NotImplementedError):
335 | can_symlink = False
336 | logger.warning("Symlink creation might not be supported or permitted on this system. Skipping symlink tests.", emoji_key="warning")
337 |
338 | if can_symlink:
339 | # Ensure target exists before creating link
340 | if TARGET_PATH.is_dir():
341 | # Use await aiofiles.os.symlink for consistency? No, os.symlink is sync only.
342 | os.symlink(TARGET_PATH, SYMLINK_PATH, target_is_directory=True)
343 | logger.info(f"Created symlink: {SYMLINK_PATH} -> {TARGET_PATH}", emoji_key="link")
344 | else:
345 | logger.warning(f"Symlink target {TARGET_PATH} does not exist or is not a directory. Skipping symlink creation.", emoji_key="warning")
346 | SYMLINK_PATH = None
347 | else:
348 | SYMLINK_PATH = None # Indicate symlink wasn't created
349 | except OSError as e:
350 | # Handle errors like EEXIST if link already exists, or permission errors
351 | if e.errno == 17: # EEXIST
352 | logger.warning(f"Symlink {SYMLINK_PATH} already exists. Assuming correct setup.", emoji_key="warning")
353 | else:
354 | logger.warning(f"Could not create symlink ({SYMLINK_PATH} -> {TARGET_PATH}): {e}. Skipping symlink tests.", emoji_key="warning")
355 | SYMLINK_PATH = None # Indicate symlink wasn't created
356 | except Exception as e:
357 | logger.error(f"Unexpected error creating symlink: {e}", emoji_key="error", exc_info=True)
358 | SYMLINK_PATH = None
359 |
360 | logger.success(f"Demo environment set up at: {DEMO_ROOT}", emoji_key="success")
361 | console.print(Panel(
362 | f"Created demo project within [cyan]{DEMO_ROOT.parent}[/cyan] at [cyan]{DEMO_ROOT.name}[/cyan]\n"
363 | f"Created [bold]{len(project_dirs)}[/bold] directories and [bold]{len(sample_files)}[/bold] sample files.\n"
364 | f"Created [bold]{BULK_FILES_COUNT}[/bold] files in 'bulk_files/' for deletion test.\n"
365 | f"Symlink created: {'Yes' if SYMLINK_PATH else 'No'}",
366 | title="Demo Environment Ready",
367 | border_style="green",
368 | expand=False
369 | ))
370 | return SYMLINK_PATH
371 |
async def cleanup_demo_environment():
    """Remove the temporary demo directory tree using standard shutil."""
    global DEMO_TEMP_DIR
    # Nothing to do if setup never ran or the directory is already gone.
    if not (DEMO_TEMP_DIR and Path(DEMO_TEMP_DIR).exists()):
        return
    try:
        # Synchronous removal is sufficient here; cleanup needs no async I/O.
        shutil.rmtree(DEMO_TEMP_DIR)
        logger.info(f"Cleaned up demo directory: {DEMO_TEMP_DIR}", emoji_key="cleanup")
        console.print(f"Cleaned up demo directory: [dim]{DEMO_TEMP_DIR}[/dim]")
    except Exception as e:
        logger.error(f"Error during cleanup of {DEMO_TEMP_DIR}: {e}", emoji_key="error")
        console.print(f"[bold red]Error cleaning up demo directory {DEMO_TEMP_DIR}: {e}[/bold red]")
    DEMO_TEMP_DIR = None
386 |
async def demonstrate_file_reading(symlink_path):
    """Demonstrate file reading operations."""
    console.print(Rule("[bold cyan]1. File Reading Operations[/bold cyan]", style="cyan"))
    logger.info("Demonstrating file reading operations...", emoji_key="file")

    # Single-file reads: text, JSON, binary, then two expected failures
    # (missing file, directory given where a file is required).
    single_reads = [
        (str(DEMO_ROOT / "README.md"), "Reading a text file (README.md)"),
        (str(DEMO_ROOT / "config" / "settings.json"), "Reading a JSON file (settings.json)"),
        (str(DEMO_ROOT / "data" / "binary_data.bin"), "Reading a binary file (expecting hex preview)"),
        (str(DEMO_ROOT / "non_existent.txt"), "Attempting to read a non-existent file (should fail)"),
        (str(DEMO_ROOT / "src"), "Attempting to read a directory as a file (should fail)"),
    ]
    for target, description in single_reads:
        await safe_tool_call(read_file, {"path": target}, description=description)

    # Batch read mixing successes and failures in a single call.
    batch_paths = [
        str(DEMO_ROOT / "README.md"),
        str(DEMO_ROOT / "src" / "main.py"),
        str(DEMO_ROOT / "non_existent.txt"),  # will fail
        str(DEMO_ROOT / "config" / "settings.json"),
        str(DEMO_ROOT / "src"),  # a directory - will also fail here
    ]
    await safe_tool_call(read_multiple_files, {"paths": batch_paths}, description="Reading multiple files (including some that will fail)")

    # Read a file through the symlinked directory, when setup created one.
    if symlink_path:
        linked_file = str(symlink_path / "main.py")
        await safe_tool_call(read_file, {"path": linked_file}, description=f"Reading a file via symlink ({os.path.basename(symlink_path)}/main.py)")
427 |
async def demonstrate_file_writing_editing():
    """Demonstrate file writing and editing operations.

    Writes a new markdown report, overwrites it and verifies the result,
    attempts an invalid write over a directory, then exercises edit_file:
    a dry-run diff, applying the edits for real, verification, and an
    expected failure when 'oldText' does not exist in the target file.
    """
    console.print(Rule("[bold cyan]2. File Writing & Editing Operations[/bold cyan]", style="cyan"))
    logger.info("Demonstrating file writing and editing operations...", emoji_key="file")

    # --- Write New File ---
    new_file_path = str(DEMO_ROOT / "data" / "report.md")
    file_content = """# Analysis Report

## Summary
This report contains the analysis of project performance metrics.

## Key Findings
1. Response time improved by 15%
2. Error rate decreased to 0.5%
3. User satisfaction score: 4.8/5.0

## Recommendations
- Continue monitoring performance
- Implement suggested optimizations
- Schedule follow-up review next quarter
"""
    await safe_tool_call(write_file, {"path": new_file_path, "content": file_content}, description="Writing a new file (report.md)")

    # --- Overwrite Existing File ---
    overwrite_content = "# Analysis Report (V2)\n\nReport updated."
    await safe_tool_call(write_file, {"path": new_file_path, "content": overwrite_content}, description="Overwriting the existing file (report.md)")
    # Verify overwrite by reading the file back
    await safe_tool_call(read_file, {"path": new_file_path}, description="Reading the overwritten file to verify")

    # --- Attempt to Write to a Directory (should fail) ---
    await safe_tool_call(write_file, {"path": str(DEMO_ROOT / "src"), "content": "test"}, description="Attempting to write over a directory (should fail)")

    # --- Edit File (main.py) ---
    target_edit_file = str(DEMO_ROOT / "src" / "main.py")

    # Edits including one requiring whitespace-insensitive fallback
    edits = [
        {
            "oldText": 'print("Hello from the demo application!")',  # Exact match
            "newText": 'print("Hello from the UPDATED demo application!")\n logger.info("App started")'
        },
        {
            # This uses different leading whitespace than the original file
            "oldText": "def main():\n '''Main function to run the application.'''",
            # Expected fallback behavior: find based on stripped lines, replace using original indentation
            "newText": "def main():\n '''The primary execution function.''' # Docstring updated"
        },
        {
            "oldText": ' return {\n "debug": True,\n "log_level": "INFO",\n "max_connections": 10\n }',
            "newText": ' return {\n "debug": False, # Changed to False\n "log_level": "WARNING",\n "max_connections": 25 # Increased limit\n }'
        }
    ]

    # Snapshot the file contents before any edits are applied
    await safe_tool_call(read_file, {"path": target_edit_file}, description="Reading main.py before editing")

    # Edit with Dry Run: shows the diff without touching the file
    await safe_tool_call(edit_file, {"path": target_edit_file, "edits": edits, "dry_run": True}, description="Editing main.py (Dry Run - showing diff)")

    # Apply Edits for Real
    await safe_tool_call(edit_file, {"path": target_edit_file, "edits": edits, "dry_run": False}, description="Applying edits to main.py")

    # Verify Edits by reading the modified file
    await safe_tool_call(read_file, {"path": target_edit_file}, description="Reading main.py after editing")

    # --- Edit with Non-Existent Old Text (should fail) ---
    failed_edit = [{"oldText": "This text does not exist in the file", "newText": "Replacement"}]
    await safe_tool_call(edit_file, {"path": target_edit_file, "edits": failed_edit}, description="Attempting edit with non-existent 'oldText' (should fail)")
496 |
497 |
async def demonstrate_directory_operations(symlink_path, use_rich_tree=False):
    """Demonstrate directory creation, listing, and tree view."""
    console.print(Rule("[bold cyan]3. Directory Operations[/bold cyan]", style="cyan"))
    logger.info("Demonstrating directory operations...", emoji_key="directory")

    # Creation: parent first, then a nested child, then the same child again
    # (idempotency), and finally an expected failure over an existing file.
    logs_dir = str(DEMO_ROOT / "logs")
    await safe_tool_call(create_directory, {"path": logs_dir}, description="Creating parent directory (logs)")

    debug_dir = str(DEMO_ROOT / "logs" / "debug")
    await safe_tool_call(create_directory, {"path": debug_dir}, description="Creating a new nested directory (logs/debug)")
    await safe_tool_call(create_directory, {"path": debug_dir}, description="Attempting to create the same directory again (idempotent)")

    readme_as_dir = str(DEMO_ROOT / "README.md")
    await safe_tool_call(create_directory, {"path": readme_as_dir}, description="Attempting to create directory over an existing file (README.md - should fail)")

    # Listings: demo root, a subdirectory, optionally via the symlink, and a
    # non-existent path (expected failure).
    await safe_tool_call(list_directory, {"path": str(DEMO_ROOT)}, description=f"Listing contents of demo root ({DEMO_ROOT.name})")
    await safe_tool_call(list_directory, {"path": str(DEMO_ROOT / "src")}, description="Listing contents of subdirectory (src)")
    if symlink_path:
        await safe_tool_call(list_directory, {"path": str(symlink_path)}, description=f"Listing contents via symlink ({os.path.basename(symlink_path)})")
    await safe_tool_call(list_directory, {"path": str(DEMO_ROOT / "no_such_dir")}, description="Attempting to list non-existent directory (should fail)")

    # Optional enhanced visualization via the async tree generator utility.
    if use_rich_tree:
        console.print("\n[bold cyan]Enhanced Directory Tree Visualization (Async Tool Based)[/bold cyan]")
        try:
            tree_render = await generate_rich_directory_tree(str(DEMO_ROOT), max_depth=3)
            console.print(tree_render)
        except Exception as e:
            logger.error(f"Error generating async directory tree: {e}", exc_info=True)
            console.print(f"[bold red]Error generating directory tree: {escape(str(e))}[/bold red]")
        console.print()  # trailing newline

    # The directory_tree TOOL (may differ slightly in detail from the utility
    # above): default depth, shallow depth, with sizes, and via the symlink.
    await safe_tool_call(directory_tree, {"path": str(DEMO_ROOT)}, description="Generating directory tree for demo root (default depth - using tool)")
    await safe_tool_call(directory_tree, {"path": str(DEMO_ROOT), "max_depth": 1}, description="Generating directory tree (max_depth=1)")
    await safe_tool_call(directory_tree, {"path": str(DEMO_ROOT), "max_depth": 2, "include_size": True}, description="Generating directory tree (max_depth=2, include_size=True)")
    if symlink_path:
        await safe_tool_call(directory_tree, {"path": str(symlink_path), "max_depth": 1}, description=f"Generating directory tree via symlink ({os.path.basename(symlink_path)}, max_depth=1)")
562 |
async def demonstrate_move_delete_search(symlink_path):
    """Demonstrate file/directory moving, deletion, searching, and info retrieval.

    Covers get_file_info on files/directories/symlinks, name- and content-based
    search (case sensitivity and exclusion patterns), move/rename with and
    without overwrite, and deletion of files, symlinks, and directories,
    including the bulk-deletion protection heuristics.

    Args:
        symlink_path: Path of the demo symlink created during setup, or None
            when symlinks are unsupported (symlink steps are then skipped).
    """
    console.print(Rule("[bold cyan]4. Move, Delete, Search & Info Operations[/bold cyan]", style="cyan"))
    logger.info("Demonstrating move, delete, search, info operations...", emoji_key="file")

    # --- Get File Info (File) ---
    settings_json_path = str(DEMO_ROOT / "config" / "settings.json")
    await safe_tool_call(get_file_info, {"path": settings_json_path}, description="Getting file info for settings.json")

    # --- Get File Info (Directory) ---
    src_dir_path = str(DEMO_ROOT / "src")
    await safe_tool_call(get_file_info, {"path": src_dir_path}, description="Getting file info for src directory")

    # --- Get File Info (Symlink, if created) ---
    if symlink_path:
        await safe_tool_call(get_file_info, {"path": str(symlink_path)}, description=f"Getting file info for symlink ({os.path.basename(symlink_path)}) - uses lstat")

    # --- Search Files (Name Match, Case Insensitive) ---
    await safe_tool_call(search_files, {"path": str(DEMO_ROOT), "pattern": "readme"}, description="Searching for 'readme' (case insensitive)")

    # --- Search Files (Name Match, Case Sensitive) ---
    await safe_tool_call(search_files, {"path": str(DEMO_ROOT), "pattern": "README", "case_sensitive": True}, description="Searching for 'README' (case sensitive)")

    # --- Search Files (With Exclusions) ---
    await safe_tool_call(search_files,
                         {"path": str(DEMO_ROOT), "pattern": ".py", "exclude_patterns": ["*/test*", ".hidden_dir/*"]},
                         description="Searching for '*.py', excluding tests and hidden dir")

    # --- Search Files (Content Search) ---
    await safe_tool_call(search_files,
                         {"path": str(DEMO_ROOT), "pattern": "localhost", "search_content": True},
                         description="Searching for content 'localhost' inside files")

    # --- Search Files (Content Search, Case Sensitive) ---
    await safe_tool_call(search_files,
                         {"path": str(DEMO_ROOT), "pattern": "DataProcessor", "search_content": True, "case_sensitive": True},
                         description="Searching for content 'DataProcessor' (case sensitive)")

    # --- Search Files (No Matches) ---
    await safe_tool_call(search_files, {"path": str(DEMO_ROOT), "pattern": "xyz_no_match_xyz"}, description="Searching for pattern guaranteed not to match")

    # --- Move File ---
    source_move_path = str(DEMO_ROOT / "data" / "sample.csv")
    dest_move_path = str(DEMO_ROOT / "data" / "renamed_sample.csv")
    await safe_tool_call(move_file, {"source": source_move_path, "destination": dest_move_path}, description="Moving (renaming) sample.csv")
    # Verify move by trying to get info on new path
    await safe_tool_call(get_file_info, {"path": dest_move_path}, description="Verifying move by getting info on new path")

    # --- Move File (Overwrite) ---
    # First create a file to be overwritten
    overwrite_target_path = str(DEMO_ROOT / "data" / "overwrite_me.txt")
    await safe_tool_call(write_file, {"path": overwrite_target_path, "content": "Original content"}, description="Creating file to be overwritten")
    # Now move onto it with overwrite=True
    await safe_tool_call(move_file,
                         {"source": dest_move_path, "destination": overwrite_target_path, "overwrite": True},
                         description="Moving renamed_sample.csv onto overwrite_me.txt (overwrite=True)")
    # Verify overwrite
    await safe_tool_call(get_file_info, {"path": overwrite_target_path}, description="Verifying overwrite by getting info")

    # --- Move Directory ---
    source_dir_move = str(DEMO_ROOT / "tests")
    dest_dir_move = str(DEMO_ROOT / "tests_moved")
    await safe_tool_call(move_file, {"source": source_dir_move, "destination": dest_dir_move}, description="Moving the 'tests' directory")
    # Verify move by listing the directory at its new location
    await safe_tool_call(list_directory, {"path": dest_dir_move}, description="Verifying directory move by listing new path")

    # --- Attempt Move (Destination Exists, No Overwrite - should fail) ---
    await safe_tool_call(move_file,
                         {"source": str(DEMO_ROOT / "README.md"), "destination": str(DEMO_ROOT / "config" / "settings.json")},
                         description="Attempting to move README.md onto settings.json (no overwrite - should fail)")

    # --- Delete File ---
    file_to_delete = str(DEMO_ROOT / "temp.log")
    await safe_tool_call(get_file_info, {"path": file_to_delete}, description="Checking temp.log exists before deleting")
    await safe_tool_call(delete_path, {"path": file_to_delete}, description="Deleting single file (temp.log)")
    await safe_tool_call(get_file_info, {"path": file_to_delete}, description="Verifying temp.log deletion (should fail)")

    # --- Delete Symlink (if created) ---
    if symlink_path:
        # Get the exact path string to the symlink without resolving it
        symlink_str = str(symlink_path)
        await safe_tool_call(get_file_info, {"path": symlink_str}, description=f"Checking symlink {os.path.basename(symlink_path)} exists before deleting")

        # Explicitly tell the user what we're doing
        console.print(f"[cyan]Note:[/cyan] Deleting the symlink itself (not its target) at path: {symlink_str}")

        await safe_tool_call(delete_path, {"path": symlink_str}, description=f"Deleting symlink ({os.path.basename(symlink_path)})")
        await safe_tool_call(get_file_info, {"path": symlink_str}, description="Verifying symlink deletion (should fail)")

    # --- Delete Empty Directory ---
    empty_dir_to_delete = str(DEMO_ROOT / "logs" / "debug")  # Created earlier, should be empty
    await safe_tool_call(get_file_info, {"path": empty_dir_to_delete}, description="Checking logs/debug exists before deleting")
    await safe_tool_call(delete_path, {"path": empty_dir_to_delete}, description="Deleting empty directory (logs/debug)")
    await safe_tool_call(get_file_info, {"path": empty_dir_to_delete}, description="Verifying empty directory deletion (should fail)")

    # --- Delete Directory with Content (Testing Deletion Protection) ---
    bulk_dir_path = str(DEMO_ROOT / "bulk_files")
    console.print(Panel(
        f"Attempting to delete directory '{os.path.basename(bulk_dir_path)}' which contains {BULK_FILES_COUNT} files.\n"
        "This will trigger the deletion protection check (heuristics based on file count, timestamps, types).\n"
        "Whether it blocks depends on the config thresholds and calculated variances.",
        title="🛡️ Testing Deletion Protection 🛡️", border_style="yellow"
    ))
    # This call might raise ProtectionTriggeredError, which safe_tool_call will catch and display
    await safe_tool_call(delete_path, {"path": bulk_dir_path}, description=f"Deleting directory with {BULK_FILES_COUNT} files (bulk_files)")
    # Check if it was actually deleted or blocked by protection
    await safe_tool_call(get_file_info, {"path": bulk_dir_path}, description="Checking if bulk_files directory still exists after delete attempt")
670 |
671 |
async def demonstrate_security_features():
    """Demonstrate security features like allowed directories."""
    console.print(Rule("[bold cyan]5. Security Features[/bold cyan]", style="cyan"))
    logger.info("Demonstrating security features...", emoji_key="security")

    # Show which directories the tools are permitted to touch (from config,
    # which this demo sets via environment variables).
    await safe_tool_call(list_allowed_directories, {}, description="Listing configured allowed directories")
    console.print(f"[dim]Note: For this demo, only the temporary directory [cyan]{DEMO_TEMP_DIR}[/cyan] was allowed via environment variable.[/dim]")

    # The filesystem root is guaranteed to sit outside the allowed sandbox.
    root_target = "/" if platform.system() != "Windows" else "C:\\"
    console.print(f"\nAttempting operation outside allowed directory: [red]Listing '{root_target}'[/red]")
    await safe_tool_call(list_directory, {"path": root_target}, description=f"Attempting to list root directory '{root_target}' (should fail)")

    # A well-known sensitive file, also outside the sandbox.
    sensitive_target = "/etc/passwd" if platform.system() != "Windows" else "C:\\Windows\\System32\\drivers\\etc\\hosts"
    console.print(f"\nAttempting operation outside allowed directory: [red]Reading '{sensitive_target}'[/red]")
    await safe_tool_call(read_file, {"path": sensitive_target}, description=f"Attempting to read sensitive file '{sensitive_target}' (should fail)")

    # Directory-traversal attempt with '..'; path normalization should either
    # resolve it harmlessly or the allowed-directory check should reject it.
    traversal_target = str(DEMO_ROOT / ".." / "..")
    console.print(f"\nAttempting operation using '..' to potentially escape: [red]Listing '{traversal_target}'[/red]")
    await safe_tool_call(list_directory, {"path": traversal_target}, description=f"Attempting to list path using '..' ('{traversal_target}')")

    console.print(Panel(
        "Security checks demonstrated:\n"
        "1. Operations are confined to the `allowed_directories`.\n"
        "2. Accessing paths outside these directories is denied.\n"
        "3. Path normalization prevents trivial directory traversal escapes (`..`).\n"
        "4. Symlink targets are also validated against `allowed_directories` (implicitly tested via symlink operations).\n"
        "5. Deletion protection provides a safety net against accidental bulk deletions (demonstrated earlier).",
        title="Security Summary", border_style="green", expand=False
    ))
709 |
710 |
async def main():
    """Run the filesystem operations demonstration.

    Parses CLI arguments, verifies the filesystem configuration (the demo's
    temporary directory must appear in the loaded allowed directories), sets
    up the demo environment, runs the selected demo section(s), and always
    cleans up afterwards.

    Returns:
        int: Process exit code — 0 on success, 1 on config failure or crash.
    """
    global DEMO_TEMP_DIR  # Needed to validate the loaded allowed-directory config
    symlink_path = None
    exit_code = 0

    # Parse command line arguments (demo section choice, --rich-tree flag)
    args = parse_arguments()

    try:
        console.print(Rule("[bold blue]Secure Filesystem Operations Demo[/bold blue]", style="white"))
        logger.info("Starting filesystem operations demonstration", emoji_key="start")

        # --- Verify Configuration (basic sanity check) ---
        # FIX: was `print(Rule(...))` — the builtin print emits the Rule
        # object's repr instead of rendering it; route through the Rich console.
        console.print(Rule("Verifying Configuration", style="dim"))
        config_valid = verify_config()
        if not config_valid:
            # Abort with a clear message if config verification fails
            console.print("[bold red]Error:[/bold red] Configuration verification failed. Aborting demonstration.", style="red")
            return 1  # Exit early if config is wrong

        # --- Verify the demo temp dir is present in the loaded allowed dirs ---
        try:
            current_config = get_config()
            fs_config = current_config.filesystem
            loaded_allowed_dirs = fs_config.allowed_directories
            console.print(f"[dim]Config Check: Loaded allowed dirs: {loaded_allowed_dirs}[/dim]")
            if not loaded_allowed_dirs or DEMO_TEMP_DIR not in loaded_allowed_dirs:
                console.print("[bold red]Error:[/bold red] Demo temporary directory not found in loaded allowed directories. Configuration failed.", style="red")
                console.print(f"[dim]Expected: {DEMO_TEMP_DIR}[/dim]")
                # FIX: closing [/dim] tag was missing, leaving the markup unbalanced
                console.print(f"[dim]Loaded Config: {current_config.model_dump()}[/dim]")  # Dump entire config
                return 1  # Exit early if config is wrong
        except Exception as config_err:
            console.print(f"[bold red]Error checking loaded configuration:[/bold red] {config_err}", style="red")
            console.print_exception(show_locals=False)
            return 1
        # --- End Verify Config Loading ---

        # Display available options if running all demos
        if args.demo == 'all':
            console.print(Panel(
                "This demo includes multiple sections showcasing different filesystem operations.\n"
                "You can run individual sections using the following commands:\n\n"
                "[yellow]python examples/filesystem_operations_demo.py read[/yellow] - File reading operations\n"
                "[yellow]python examples/filesystem_operations_demo.py write[/yellow] - File writing and editing operations\n"
                "[yellow]python examples/filesystem_operations_demo.py directory[/yellow] - Directory operations\n"
                "[yellow]python examples/filesystem_operations_demo.py move_delete[/yellow] - Move, delete, search & info operations\n"
                "[yellow]python examples/filesystem_operations_demo.py security[/yellow] - Security features demo\n\n"
                "Add [yellow]--rich-tree[/yellow] for enhanced directory visualization!",
                title="Demo Options",
                border_style="cyan",
                expand=False
            ))

        # Display info message
        console.print(Panel(
            "This demo showcases the secure asynchronous filesystem tools.\n"
            f"A temporary directory ([cyan]{DEMO_TEMP_DIR}[/cyan]) has been created and configured as the ONLY allowed directory for this demo's operations via environment variables.",
            title="About This Demo",
            border_style="cyan"
        ))

        # Set up the demo environment *inside* the allowed temp dir
        symlink_path = await setup_demo_environment()

        # Run the selected demonstration(s)
        if args.demo == 'all' or args.demo == 'read':
            await demonstrate_file_reading(symlink_path)
            console.print()  # Add newline

        if args.demo == 'all' or args.demo == 'write':
            await demonstrate_file_writing_editing()
            console.print()  # Add newline

        if args.demo == 'all' or args.demo == 'directory':
            await demonstrate_directory_operations(symlink_path, use_rich_tree=args.rich_tree)
            console.print()  # Add newline

        if args.demo == 'all' or args.demo == 'move_delete':
            await demonstrate_move_delete_search(symlink_path)
            console.print()  # Add newline

        if args.demo == 'all' or args.demo == 'security':
            await demonstrate_security_features()

        logger.success(f"Filesystem Operations Demo(s) completed: {args.demo}", emoji_key="complete")
        console.print(Rule("[bold green]Demo Complete[/bold green]", style="green"))

    except Exception as e:
        logger.critical(f"Demo crashed unexpectedly: {str(e)}", emoji_key="critical", exc_info=True)
        console.print(f"\n[bold red]CRITICAL ERROR:[/bold red] {escape(str(e))}")
        console.print_exception(show_locals=False)
        exit_code = 1

    finally:
        # Clean up the demo environment regardless of success or failure
        console.print(Rule("Cleanup", style="dim"))
        await cleanup_demo_environment()

    return exit_code
812 |
def get_config_local():  # Renamed to avoid clashing with the imported get_config
    """Return the demo application's local configuration as a fresh dict."""
    config = dict(
        debug=True,
        log_level="INFO",
        max_connections=10,
    )
    return config
820 |
if __name__ == "__main__":
    # On Windows with Python 3.8+, a Proactor loop policy may be required:
    # asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())

    # Run the demo and propagate its exit code to the shell.
    sys.exit(asyncio.run(main()))
```