This is page 36 of 45. Use http://codebase.md/dicklesworthstone/llm_gateway_mcp_server?lines=true&page={x} to view the full context.
# Directory Structure
```
├── .cursorignore
├── .env.example
├── .envrc
├── .gitignore
├── additional_features.md
├── check_api_keys.py
├── completion_support.py
├── comprehensive_test.py
├── docker-compose.yml
├── Dockerfile
├── empirically_measured_model_speeds.json
├── error_handling.py
├── example_structured_tool.py
├── examples
│ ├── __init__.py
│ ├── advanced_agent_flows_using_unified_memory_system_demo.py
│ ├── advanced_extraction_demo.py
│ ├── advanced_unified_memory_system_demo.py
│ ├── advanced_vector_search_demo.py
│ ├── analytics_reporting_demo.py
│ ├── audio_transcription_demo.py
│ ├── basic_completion_demo.py
│ ├── cache_demo.py
│ ├── claude_integration_demo.py
│ ├── compare_synthesize_demo.py
│ ├── cost_optimization.py
│ ├── data
│ │ ├── sample_event.txt
│ │ ├── Steve_Jobs_Introducing_The_iPhone_compressed.md
│ │ └── Steve_Jobs_Introducing_The_iPhone_compressed.mp3
│ ├── docstring_refiner_demo.py
│ ├── document_conversion_and_processing_demo.py
│ ├── entity_relation_graph_demo.py
│ ├── filesystem_operations_demo.py
│ ├── grok_integration_demo.py
│ ├── local_text_tools_demo.py
│ ├── marqo_fused_search_demo.py
│ ├── measure_model_speeds.py
│ ├── meta_api_demo.py
│ ├── multi_provider_demo.py
│ ├── ollama_integration_demo.py
│ ├── prompt_templates_demo.py
│ ├── python_sandbox_demo.py
│ ├── rag_example.py
│ ├── research_workflow_demo.py
│ ├── sample
│ │ ├── article.txt
│ │ ├── backprop_paper.pdf
│ │ ├── buffett.pdf
│ │ ├── contract_link.txt
│ │ ├── legal_contract.txt
│ │ ├── medical_case.txt
│ │ ├── northwind.db
│ │ ├── research_paper.txt
│ │ ├── sample_data.json
│ │ └── text_classification_samples
│ │ ├── email_classification.txt
│ │ ├── news_samples.txt
│ │ ├── product_reviews.txt
│ │ └── support_tickets.txt
│ ├── sample_docs
│ │ └── downloaded
│ │ └── attention_is_all_you_need.pdf
│ ├── sentiment_analysis_demo.py
│ ├── simple_completion_demo.py
│ ├── single_shot_synthesis_demo.py
│ ├── smart_browser_demo.py
│ ├── sql_database_demo.py
│ ├── sse_client_demo.py
│ ├── test_code_extraction.py
│ ├── test_content_detection.py
│ ├── test_ollama.py
│ ├── text_classification_demo.py
│ ├── text_redline_demo.py
│ ├── tool_composition_examples.py
│ ├── tournament_code_demo.py
│ ├── tournament_text_demo.py
│ ├── unified_memory_system_demo.py
│ ├── vector_search_demo.py
│ ├── web_automation_instruction_packs.py
│ └── workflow_delegation_demo.py
├── LICENSE
├── list_models.py
├── marqo_index_config.json.example
├── mcp_protocol_schema_2025-03-25_version.json
├── mcp_python_lib_docs.md
├── mcp_tool_context_estimator.py
├── model_preferences.py
├── pyproject.toml
├── quick_test.py
├── README.md
├── resource_annotations.py
├── run_all_demo_scripts_and_check_for_errors.py
├── storage
│ └── smart_browser_internal
│ ├── locator_cache.db
│ ├── readability.js
│ └── storage_state.enc
├── test_client.py
├── test_connection.py
├── TEST_README.md
├── test_sse_client.py
├── test_stdio_client.py
├── tests
│ ├── __init__.py
│ ├── conftest.py
│ ├── integration
│ │ ├── __init__.py
│ │ └── test_server.py
│ ├── manual
│ │ ├── test_extraction_advanced.py
│ │ └── test_extraction.py
│ └── unit
│ ├── __init__.py
│ ├── test_cache.py
│ ├── test_providers.py
│ └── test_tools.py
├── TODO.md
├── tool_annotations.py
├── tools_list.json
├── ultimate_mcp_banner.webp
├── ultimate_mcp_logo.webp
├── ultimate_mcp_server
│ ├── __init__.py
│ ├── __main__.py
│ ├── cli
│ │ ├── __init__.py
│ │ ├── __main__.py
│ │ ├── commands.py
│ │ ├── helpers.py
│ │ └── typer_cli.py
│ ├── clients
│ │ ├── __init__.py
│ │ ├── completion_client.py
│ │ └── rag_client.py
│ ├── config
│ │ └── examples
│ │ └── filesystem_config.yaml
│ ├── config.py
│ ├── constants.py
│ ├── core
│ │ ├── __init__.py
│ │ ├── evaluation
│ │ │ ├── base.py
│ │ │ └── evaluators.py
│ │ ├── providers
│ │ │ ├── __init__.py
│ │ │ ├── anthropic.py
│ │ │ ├── base.py
│ │ │ ├── deepseek.py
│ │ │ ├── gemini.py
│ │ │ ├── grok.py
│ │ │ ├── ollama.py
│ │ │ ├── openai.py
│ │ │ └── openrouter.py
│ │ ├── server.py
│ │ ├── state_store.py
│ │ ├── tournaments
│ │ │ ├── manager.py
│ │ │ ├── tasks.py
│ │ │ └── utils.py
│ │ └── ums_api
│ │ ├── __init__.py
│ │ ├── ums_database.py
│ │ ├── ums_endpoints.py
│ │ ├── ums_models.py
│ │ └── ums_services.py
│ ├── exceptions.py
│ ├── graceful_shutdown.py
│ ├── services
│ │ ├── __init__.py
│ │ ├── analytics
│ │ │ ├── __init__.py
│ │ │ ├── metrics.py
│ │ │ └── reporting.py
│ │ ├── cache
│ │ │ ├── __init__.py
│ │ │ ├── cache_service.py
│ │ │ ├── persistence.py
│ │ │ ├── strategies.py
│ │ │ └── utils.py
│ │ ├── cache.py
│ │ ├── document.py
│ │ ├── knowledge_base
│ │ │ ├── __init__.py
│ │ │ ├── feedback.py
│ │ │ ├── manager.py
│ │ │ ├── rag_engine.py
│ │ │ ├── retriever.py
│ │ │ └── utils.py
│ │ ├── prompts
│ │ │ ├── __init__.py
│ │ │ ├── repository.py
│ │ │ └── templates.py
│ │ ├── prompts.py
│ │ └── vector
│ │ ├── __init__.py
│ │ ├── embeddings.py
│ │ └── vector_service.py
│ ├── tool_token_counter.py
│ ├── tools
│ │ ├── __init__.py
│ │ ├── audio_transcription.py
│ │ ├── base.py
│ │ ├── completion.py
│ │ ├── docstring_refiner.py
│ │ ├── document_conversion_and_processing.py
│ │ ├── enhanced-ums-lookbook.html
│ │ ├── entity_relation_graph.py
│ │ ├── excel_spreadsheet_automation.py
│ │ ├── extraction.py
│ │ ├── filesystem.py
│ │ ├── html_to_markdown.py
│ │ ├── local_text_tools.py
│ │ ├── marqo_fused_search.py
│ │ ├── meta_api_tool.py
│ │ ├── ocr_tools.py
│ │ ├── optimization.py
│ │ ├── provider.py
│ │ ├── pyodide_boot_template.html
│ │ ├── python_sandbox.py
│ │ ├── rag.py
│ │ ├── redline-compiled.css
│ │ ├── sentiment_analysis.py
│ │ ├── single_shot_synthesis.py
│ │ ├── smart_browser.py
│ │ ├── sql_databases.py
│ │ ├── text_classification.py
│ │ ├── text_redline_tools.py
│ │ ├── tournament.py
│ │ ├── ums_explorer.html
│ │ └── unified_memory_system.py
│ ├── utils
│ │ ├── __init__.py
│ │ ├── async_utils.py
│ │ ├── display.py
│ │ ├── logging
│ │ │ ├── __init__.py
│ │ │ ├── console.py
│ │ │ ├── emojis.py
│ │ │ ├── formatter.py
│ │ │ ├── logger.py
│ │ │ ├── panels.py
│ │ │ ├── progress.py
│ │ │ └── themes.py
│ │ ├── parse_yaml.py
│ │ ├── parsing.py
│ │ ├── security.py
│ │ └── text.py
│ └── working_memory_api.py
├── unified_memory_system_technical_analysis.md
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/ultimate_mcp_server/tools/excel_spreadsheet_automation.py:
--------------------------------------------------------------------------------
```python
1 | """Excel Spreadsheet Automation Tools for Ultimate MCP Server.
2 |
3 | This module provides powerful, flexible tools for AI agents to automate Excel workflows through
4 | the Model Context Protocol (MCP). These tools leverage the intelligence of the Large Language Model
5 | while providing deep integration with Microsoft Excel on Windows.
6 |
7 | The core philosophy is minimalist but powerful - a few highly flexible functions that can be composed
8 | to perform complex operations, with the LLM (Claude) providing the intelligence to drive these tools.
9 |
10 | Key capabilities:
11 | - Direct Excel manipulation (create, modify, analyze spreadsheets)
12 | - Learning from exemplar templates and applying patterns to new contexts
13 | - Formula debugging and optimization
14 | - Rich automated formatting and visualization
15 | - VBA generation and execution
16 |
17 | Windows-specific: Uses COM automation with win32com and requires Excel to be installed.
18 |
19 | Example usage:
20 | ```python
21 | # Execute Excel operations with natural language instructions
22 | result = await client.tools.excel_execute(
23 | instruction="Create a new workbook with two sheets: 'Revenue' and 'Expenses'. "
24 | "In the Revenue sheet, create a quarterly forecast for 2025 with "
25 | "monthly growth of 5%. Include columns for Product A and Product B "
26 | "with initial values of $10,000 and $5,000. Format as a professional "
27 | "financial table with totals and proper currency formatting.",
28 | file_path="financial_forecast.xlsx",
29 | operation_type="create"
30 | )
31 |
32 | # Learn from an exemplar template and adapt it to a new context
33 | result = await client.tools.excel_learn_and_apply(
34 | exemplar_path="templates/financial_model.xlsx",
35 | output_path="healthcare_startup.xlsx",
36 | adaptation_context="Create a 3-year financial model for a healthcare SaaS startup "
37 | "with subscription revenue model. Include revenue forecast, expense "
38 | "projections, cash flow, and key metrics for investors. Adapt all "
39 | "growth rates and assumptions for the healthcare tech market."
40 | )
41 |
42 | # Debug and optimize complex formulas
43 | result = await client.tools.excel_analyze_formulas(
44 | file_path="complex_model.xlsx",
45 | sheet_name="Valuation",
46 | cell_range="D15:G25",
47 | analysis_type="optimize",
48 | detail_level="detailed"
49 | )
50 | ```
51 | """
52 | import asyncio
53 | import os
54 | import re
55 | import time
56 | from contextlib import asynccontextmanager
57 | from typing import Any, Dict, List, Optional
58 |
59 | # Try to import Windows-specific libraries
60 | try:
61 | import pythoncom # type: ignore
62 | import win32com.client # type: ignore
63 | import win32com.client.gencache # type: ignore
64 | from win32com.client import constants as win32c # type: ignore
65 | WINDOWS_EXCEL_AVAILABLE = True
66 | except ImportError:
67 | WINDOWS_EXCEL_AVAILABLE = False
68 |
69 | from ultimate_mcp_server.exceptions import ToolError, ToolInputError
70 | from ultimate_mcp_server.tools.base import (
71 | BaseTool,
72 | with_error_handling,
73 | with_state_management,
74 | with_tool_metrics,
75 | )
76 | from ultimate_mcp_server.tools.filesystem import (
77 | create_directory,
78 | get_allowed_directories,
79 | read_file_content,
80 | validate_path,
81 | write_file_content,
82 | )
83 | from ultimate_mcp_server.utils import get_logger
84 |
85 | logger = get_logger("ultimate_mcp_server.tools.excel_spreadsheet_automation")
86 |
87 | class ExcelSession:
88 | """Manages a single Excel Application session with enhanced reliability and safety."""
89 |
90 | def __init__(self, visible=False):
91 | """Initialize a new Excel session.
92 |
93 | Args:
94 | visible: Whether Excel should be visible on screen
95 | """
96 | if not WINDOWS_EXCEL_AVAILABLE:
97 | raise ToolError("Excel automation requires Windows with Excel installed")
98 |
99 | # Initialize COM in this thread
100 | pythoncom.CoInitialize()
101 |
102 | self.app = None
103 | self.workbooks = {}
104 | self.visible = visible
105 | self.status = "initializing"
106 |
107 | try:
108 | self.app = win32com.client.Dispatch("Excel.Application")
109 | self.app.Visible = visible
110 | self.app.DisplayAlerts = False
111 | self.app.ScreenUpdating = False
112 | self.app_version = self.app.Version
113 | self.status = "ready"
114 | except Exception as e:
115 | self.status = "error"
116 | raise ToolError(f"Failed to create Excel instance: {str(e)}") from e
117 |
118 | def open_workbook(self, path, read_only=False):
119 | """Open an Excel workbook.
120 |
121 | Args:
122 | path: Path to the workbook file
123 | read_only: Whether to open in read-only mode
124 |
125 | Returns:
126 | Workbook COM object
127 | """
128 | try:
129 | # Use the path as is, validation should happen at the async layer
130 | # that calls this sync method. The path should already be validated.
131 | abs_path = os.path.abspath(path)
132 | wb = self.app.Workbooks.Open(abs_path, ReadOnly=read_only)
133 | self.workbooks[wb.Name] = wb
134 | return wb
135 | except Exception as e:
136 | raise ToolError(f"Failed to open workbook at {path}: {str(e)}") from e
137 |
138 | def create_workbook(self):
139 | """Create a new Excel workbook.
140 |
141 | Returns:
142 | Workbook COM object
143 | """
144 | try:
145 | wb = self.app.Workbooks.Add()
146 | self.workbooks[wb.Name] = wb
147 | return wb
148 | except Exception as e:
149 | raise ToolError(f"Failed to create new workbook: {str(e)}") from e
150 |
151 | def save_workbook(self, workbook, path):
152 | """Save a workbook to a specified path.
153 |
154 | Args:
155 | workbook: Workbook COM object
156 | path: Path to save the workbook
157 | """
158 | try:
159 | # Note: Directory creation should happen at the async layer before calling this sync method
160 | # Path validation should also happen at the async layer
161 | workbook.SaveAs(os.path.abspath(path))
162 | return True
163 | except Exception as e:
164 | raise ToolError(f"Failed to save workbook to {path}: {str(e)}") from e
165 |
166 | def close_workbook(self, workbook, save_changes=False):
167 | """Close a workbook.
168 |
169 | Args:
170 | workbook: Workbook COM object
171 | save_changes: Whether to save changes before closing
172 | """
173 | try:
174 | workbook.Close(SaveChanges=save_changes)
175 | if workbook.Name in self.workbooks:
176 | del self.workbooks[workbook.Name]
177 | except Exception as e:
178 | logger.warning(f"Error closing workbook: {str(e)}")
179 |
180 | def close(self):
181 | """Close the Excel application and release resources."""
182 | if not self.app:
183 | return
184 |
185 | try:
186 | # Close all workbooks
187 | for wb_name in list(self.workbooks.keys()):
188 | try:
189 | self.close_workbook(self.workbooks[wb_name], False)
190 | except Exception:
191 | pass
192 |
193 | # Quit Excel
194 | try:
195 | self.app.DisplayAlerts = False
196 | self.app.ScreenUpdating = True
197 | self.app.Quit()
198 | except Exception:
199 | pass
200 |
201 | # Release COM references
202 | del self.app
203 | self.app = None
204 |
205 | # Uninitialize COM
206 | pythoncom.CoUninitialize()
207 |
208 | self.status = "closed"
209 | except Exception as e:
210 | self.status = "error_closing"
211 | logger.error(f"Error closing Excel session: {e}")
212 |
213 | def __enter__(self):
214 | """Context manager entry."""
215 | return self
216 |
217 | def __exit__(self, exc_type, exc_val, exc_tb):
218 | """Context manager exit."""
219 | self.close()
220 |
221 | @asynccontextmanager
222 | async def get_excel_session(visible=False):
223 | """Async context manager for getting an Excel session.
224 |
225 | Args:
226 | visible: Whether Excel should be visible
227 |
228 | Yields:
229 | ExcelSession: An Excel session
230 | """
231 | session = None
232 | try:
233 | # Create the Excel session in a thread pool to avoid blocking
234 | session = await asyncio.to_thread(ExcelSession, visible=visible)
235 | yield session
236 | finally:
237 | # Cleanup in a thread pool as well
238 | if session:
239 | await asyncio.to_thread(session.close)
240 |
241 | class ExcelSpreadsheetTools(BaseTool):
242 | """Tool for automating Excel spreadsheet operations."""
243 |
244 | tool_name = "excel_spreadsheet_tools"
245 | description = "Tool for automating Excel spreadsheet operations."
246 |
247 | def __init__(self, mcp_server):
248 | """Initialize Excel Spreadsheet Tools.
249 |
250 | Args:
251 | mcp_server: MCP server instance
252 | """
253 | super().__init__(mcp_server)
254 |
255 | # Inform if Excel is not available
256 | if not WINDOWS_EXCEL_AVAILABLE:
257 | raise ToolError("Excel automation requires Windows with Excel installed")
258 |
259 | @with_tool_metrics
260 | @with_error_handling
261 | @with_state_management("excel_tools")
262 | async def excel_execute(
263 | self,
264 | instruction: str,
265 | file_path: Optional[str] = None,
266 | operation_type: str = "create",
267 | template_path: Optional[str] = None,
268 | parameters: Optional[Dict[str, Any]] = None,
269 | show_excel: bool = False,
270 | get_state=None,
271 | set_state=None,
272 | delete_state=None,
273 | ctx=None
274 | ) -> Dict[str, Any]:
275 | """Execute Excel operations based on natural language instructions.
276 |
277 | This is the primary function for manipulating Excel files. It can create new files,
278 | modify existing ones, and perform various operations based on natural language instructions.
279 | The intelligence for interpreting these instructions comes from the LLM (Claude),
280 | which generates the appropriate parameters and logic.
281 |
282 | Args:
283 | instruction: Natural language instruction describing what to do
284 | file_path: Path to save or modify an Excel file
285 | operation_type: Type of operation (create, modify, analyze, format, etc.)
286 | template_path: Optional path to a template file to use as a starting point
287 | parameters: Optional structured parameters to supplement the instruction
288 | show_excel: Whether to make Excel visible during execution
289 | get_state: Function to get state (injected by with_state_management)
290 | set_state: Function to set state (injected by with_state_management)
291 | delete_state: Function to delete state (injected by with_state_management)
292 | ctx: Context object (injected by with_state_management)
293 |
294 | Returns:
295 | Dictionary with operation results and metadata
296 | """
297 | start_time = time.time()
298 |
299 | # Basic validation
300 | if not instruction:
301 | raise ToolInputError("instruction cannot be empty")
302 |
303 | if operation_type == "create" and not file_path:
304 | raise ToolInputError("file_path is required for 'create' operations")
305 |
306 | if operation_type in ["modify", "analyze", "format"] and (not file_path or not os.path.exists(file_path)):
307 | raise ToolInputError(f"Valid existing file_path is required for '{operation_type}' operations")
308 |
309 | # Use parameters if provided, otherwise empty dict
310 | parameters = parameters or {}
311 |
312 | # Process template path if provided
313 | if template_path and not os.path.exists(template_path):
314 | raise ToolInputError(f"Template file not found at {template_path}")
315 |
316 | # Execute the requested operation
317 | try:
318 | # Create or retrieve the Excel session from state
319 | session = await self._get_or_create_excel_session(show_excel, get_state, set_state)
320 |
321 | result = await self._execute_excel_operation(
322 | session=session,
323 | instruction=instruction,
324 | operation_type=operation_type,
325 | file_path=file_path,
326 | template_path=template_path,
327 | parameters=parameters
328 | )
329 |
330 | processing_time = time.time() - start_time
331 | result["processing_time"] = processing_time
332 |
333 | logger.info(
334 | f"Excel operation '{operation_type}' completed in {processing_time:.2f}s",
335 | emoji_key="success"
336 | )
337 |
338 | return result
339 |
340 | except Exception as e:
341 | logger.error(
342 | f"Error executing Excel operation: {str(e)}",
343 | emoji_key="error",
344 | exc_info=True
345 | )
346 | # Try to clean up session on error
347 | await self._cleanup_excel_session(delete_state)
348 | raise ToolError(
349 | f"Failed to execute Excel operation: {str(e)}",
350 | details={"operation_type": operation_type, "file_path": file_path}
351 | ) from e
352 |
353 | @with_tool_metrics
354 | @with_error_handling
355 | @with_state_management("excel_tools")
356 | async def excel_learn_and_apply(
357 | self,
358 | exemplar_path: str,
359 | output_path: str,
360 | adaptation_context: str,
361 | parameters: Optional[Dict[str, Any]] = None,
362 | show_excel: bool = False,
363 | get_state=None,
364 | set_state=None,
365 | delete_state=None,
366 | ctx=None
367 | ) -> Dict[str, Any]:
368 | """Learn from an exemplar Excel template and apply it to a new context.
369 |
370 | This powerful function allows Claude to analyze an existing Excel model or template,
371 | understand its structure and formulas, and then create a new file adapted to a different
372 | context while preserving the intelligence embedded in the original.
373 |
374 | Args:
375 | exemplar_path: Path to the Excel file to learn from
376 | output_path: Path where the new adapted file should be saved
377 | adaptation_context: Natural language description of how to adapt the template
378 | parameters: Optional structured parameters with specific adaptation instructions
379 | show_excel: Whether to make Excel visible during processing
380 | get_state: Function to get state (injected by with_state_management)
381 | set_state: Function to set state (injected by with_state_management)
382 | delete_state: Function to delete state (injected by with_state_management)
383 | ctx: Context object (injected by with_state_management)
384 |
385 | Returns:
386 | Dictionary with operation results and adaptations made
387 | """
388 | start_time = time.time()
389 |
390 | # Validate paths
391 | try:
392 | validated_exemplar_path = await validate_path(exemplar_path, check_exists=True)
393 | validated_output_path = await validate_path(output_path, check_exists=False, check_parent_writable=True)
394 |
395 | # Ensure parent directory for output exists
396 | parent_dir = os.path.dirname(validated_output_path)
397 | if parent_dir:
398 | await create_directory(parent_dir)
399 | except ToolInputError:
400 | raise
401 | except Exception as e:
402 | raise ToolInputError(f"Path validation error: {str(e)}") from e
403 |
404 | if not adaptation_context:
405 | raise ToolInputError("adaptation_context cannot be empty")
406 |
407 | # Use parameters if provided, otherwise empty dict
408 | parameters = parameters or {}
409 |
410 | # Execute the template learning and application
411 | try:
412 | # Create or retrieve the Excel session from state
413 | session = await self._get_or_create_excel_session(show_excel, get_state, set_state)
414 |
415 | # First, learn the template structure
416 | template_analysis = await self._analyze_excel_template( # noqa: F841
417 | session=session,
418 | exemplar_path=validated_exemplar_path,
419 | parameters=parameters
420 | )
421 |
422 | # Apply the learned template to the new context
423 | result = await self._apply_excel_template(
424 | session=session,
425 | exemplar_path=validated_exemplar_path,
426 | output_path=validated_output_path,
427 | data={"mappings": [], "adaptation_context": adaptation_context},
428 | parameters=parameters
429 | )
430 |
431 | processing_time = time.time() - start_time
432 | result["processing_time"] = processing_time
433 |
434 | logger.info(
435 | f"Excel template learning and application completed in {processing_time:.2f}s",
436 | emoji_key="success"
437 | )
438 |
439 | return result
440 |
441 | except Exception as e:
442 | logger.error(
443 | f"Error in template learning and application: {str(e)}",
444 | emoji_key="error",
445 | exc_info=True
446 | )
447 | # Try to clean up session on error
448 | await self._cleanup_excel_session(delete_state)
449 | raise ToolError(
450 | f"Failed to learn and apply template: {str(e)}",
451 | details={"exemplar_path": exemplar_path, "output_path": output_path}
452 | ) from e
453 |
454 | @with_tool_metrics
455 | @with_error_handling
456 | @with_state_management("excel_tools")
457 | async def excel_analyze_formulas(
458 | self,
459 | file_path: str,
460 | sheet_name: Optional[str] = None,
461 | cell_range: Optional[str] = None,
462 | analysis_type: str = "analyze",
463 | detail_level: str = "standard",
464 | show_excel: bool = False,
465 | get_state=None,
466 | set_state=None,
467 | delete_state=None,
468 | ctx=None
469 | ) -> Dict[str, Any]:
470 | """Analyze, debug, and optimize Excel formulas.
471 |
472 | This function provides deep insights into Excel formulas, identifying errors,
473 | suggesting optimizations, and explaining complex calculations in natural language.
474 |
475 | Args:
476 | file_path: Path to the Excel file to analyze
477 | sheet_name: Name of the sheet to analyze (if None, active sheet is used)
478 | cell_range: Cell range to analyze (if None, all formulas are analyzed)
479 | analysis_type: Type of analysis (analyze, debug, optimize, explain)
480 | detail_level: Level of detail in the analysis (basic, standard, detailed)
481 | show_excel: Whether to make Excel visible during analysis
482 | get_state: Function to get state (injected by with_state_management)
483 | set_state: Function to set state (injected by with_state_management)
484 | delete_state: Function to delete state (injected by with_state_management)
485 | ctx: Context object (injected by with_state_management)
486 |
487 | Returns:
488 | Dictionary with analysis results, issues found, and suggestions
489 | """
490 | start_time = time.time()
491 |
492 | # Validate the file path
493 | try:
494 | validated_file_path = await validate_path(file_path, check_exists=True)
495 | except ToolInputError:
496 | raise
497 | except Exception as e:
498 | raise ToolInputError(f"Invalid file path: {str(e)}", param_name="file_path", provided_value=file_path) from e
499 |
500 | # Execute the formula analysis
501 | try:
502 | # Create or retrieve the Excel session from state
503 | session = await self._get_or_create_excel_session(show_excel, get_state, set_state)
504 |
505 | result = await self._analyze_excel_formulas(
506 | session=session,
507 | file_path=validated_file_path,
508 | sheet_name=sheet_name,
509 | cell_range=cell_range,
510 | analysis_type=analysis_type,
511 | detail_level=detail_level
512 | )
513 |
514 | processing_time = time.time() - start_time
515 | result["processing_time"] = processing_time
516 |
517 | logger.info(
518 | f"Excel formula analysis completed in {processing_time:.2f}s",
519 | emoji_key="success"
520 | )
521 |
522 | return result
523 |
524 | except Exception as e:
525 | logger.error(
526 | f"Error analyzing Excel formulas: {str(e)}",
527 | emoji_key="error",
528 | exc_info=True
529 | )
530 | # Try to clean up session on error
531 | await self._cleanup_excel_session(delete_state)
532 | raise ToolError(
533 | f"Failed to analyze Excel formulas: {str(e)}",
534 | details={"file_path": file_path, "sheet_name": sheet_name, "cell_range": cell_range}
535 | ) from e
536 |
537 | @with_tool_metrics
538 | @with_error_handling
539 | @with_state_management("excel_tools")
540 | async def excel_generate_macro(
541 | self,
542 | instruction: str,
543 | file_path: Optional[str] = None,
544 | template: Optional[str] = None,
545 | test_execution: bool = False,
546 | security_level: str = "standard",
547 | show_excel: bool = False,
548 | get_state=None,
549 | set_state=None,
550 | delete_state=None,
551 | ctx=None
552 | ) -> Dict[str, Any]:
553 | """Generate and optionally execute Excel VBA macros based on natural language instructions.
554 |
555 | This function leverages Claude's capability to generate Excel VBA code for automating
556 | complex tasks within Excel. It can create new macros or modify existing ones.
557 |
558 | Args:
559 | instruction: Natural language description of what the macro should do
560 | file_path: Path to the Excel file where the macro should be added
561 | template: Optional template or skeleton code to use as a starting point
562 | test_execution: Whether to test execute the generated macro
563 | security_level: Security restrictions for macro execution (standard, restricted, permissive)
564 | show_excel: Whether to make Excel visible during processing
565 | get_state: Function to get state (injected by with_state_management)
566 | set_state: Function to set state (injected by with_state_management)
567 | delete_state: Function to delete state (injected by with_state_management)
568 | ctx: Context object (injected by with_state_management)
569 |
570 | Returns:
571 | Dictionary with the generated macro code and execution results if applicable
572 | """
573 | start_time = time.time()
574 |
575 | # Basic validation
576 | if not instruction:
577 | raise ToolInputError("instruction cannot be empty")
578 |
579 | if file_path and file_path.endswith(".xlsx"):
580 | # Convert to .xlsm for macro support if needed
581 | file_path = file_path.replace(".xlsx", ".xlsm")
582 | logger.info(f"Changed file extension to .xlsm for macro support: {file_path}")
583 |
584 | # Execute the macro generation
585 | try:
586 | # Create or retrieve the Excel session from state
587 | session = await self._get_or_create_excel_session(show_excel, get_state, set_state)
588 |
589 | result = await self._generate_excel_macro(
590 | session=session,
591 | instruction=instruction,
592 | file_path=file_path,
593 | template=template,
594 | test_execution=test_execution,
595 | security_level=security_level
596 | )
597 |
598 | processing_time = time.time() - start_time
599 | result["processing_time"] = processing_time
600 |
601 | logger.info(
602 | f"Excel macro generation completed in {processing_time:.2f}s",
603 | emoji_key="success"
604 | )
605 |
606 | return result
607 |
608 | except Exception as e:
609 | logger.error(
610 | f"Error generating Excel macro: {str(e)}",
611 | emoji_key="error",
612 | exc_info=True
613 | )
614 | # Try to clean up session on error
615 | await self._cleanup_excel_session(delete_state)
616 | raise ToolError(
617 | f"Failed to generate Excel macro: {str(e)}",
618 | details={"file_path": file_path}
619 | ) from e
620 |
621 | @with_tool_metrics
622 | @with_error_handling
623 | @with_state_management("excel_tools")
624 | async def excel_export_sheet_to_csv(
625 | self,
626 | file_path: str,
627 | sheet_name: str,
628 | output_path: Optional[str] = None,
629 | delimiter: str = ",",
630 | show_excel: bool = False,
631 | get_state=None,
632 | set_state=None,
633 | delete_state=None,
634 | ctx=None
635 | ) -> Dict[str, Any]:
636 | """Export an Excel sheet to a CSV file.
637 |
638 | This function allows exporting data from an Excel sheet to a CSV file,
639 | which can be useful for data exchange or further processing.
640 |
641 | Args:
642 | file_path: Path to the Excel file
643 | sheet_name: Name of the sheet to export
644 | output_path: Path where to save the CSV file (default: same as Excel with .csv)
645 | delimiter: Character to use as delimiter (default: comma)
646 | show_excel: Whether to make Excel visible during processing
647 | get_state: Function to get state (injected by with_state_management)
648 | set_state: Function to set state (injected by with_state_management)
649 | delete_state: Function to delete state (injected by with_state_management)
650 | ctx: Context object (injected by with_state_management)
651 |
652 | Returns:
653 | Dictionary with export results
654 | """
655 | start_time = time.time()
656 |
657 | # Validate the file path
658 | try:
659 | # Use our custom validation with get_allowed_directories
660 | validated_file_path = await self._validate_excel_file_path(file_path, check_exists=True)
661 | except ToolInputError:
662 | raise
663 | except Exception as e:
664 | raise ToolInputError(f"Invalid file path: {str(e)}", param_name="file_path", provided_value=file_path) from e
665 |
666 | # Set default output path if not provided
667 | if not output_path:
668 | output_path = os.path.splitext(validated_file_path)[0] + '.csv'
669 | else:
670 | # Validate the output path
671 | try:
672 | temp_validated_path = await validate_path(output_path, check_exists=False, check_parent_writable=True)
673 | output_path = temp_validated_path
674 |
675 | # Ensure parent directory exists
676 | parent_dir = os.path.dirname(temp_validated_path)
677 | if parent_dir:
678 | await create_directory(parent_dir)
679 | except Exception as e:
680 | raise ToolInputError(f"Invalid output path: {str(e)}", param_name="output_path", provided_value=output_path) from e
681 |
682 | # Execute the export operation
683 | try:
684 | # Create or retrieve the Excel session from state
685 | session = await self._get_or_create_excel_session(show_excel, get_state, set_state)
686 |
687 | # Open the workbook
688 | workbook = session.open_workbook(validated_file_path, read_only=True)
689 |
690 | # Find the worksheet
691 | worksheet = None
692 | for sheet in workbook.Worksheets:
693 | if sheet.Name.lower() == sheet_name.lower():
694 | worksheet = sheet
695 | break
696 |
697 | if not worksheet:
698 | raise ToolInputError(f"Sheet '{sheet_name}' not found in workbook", param_name="sheet_name", provided_value=sheet_name)
699 |
700 | # Get data from the worksheet
701 | used_range = worksheet.UsedRange
702 | row_count = used_range.Rows.Count
703 | col_count = used_range.Columns.Count
704 |
705 | # Extract data
706 | csv_data = []
707 | for row in range(1, row_count + 1):
708 | row_data = []
709 | for col in range(1, col_count + 1):
710 | cell_value = used_range.Cells(row, col).Value
711 | row_data.append(str(cell_value) if cell_value is not None else "")
712 | csv_data.append(row_data)
713 |
714 | # Close the workbook
715 | workbook.Close(SaveChanges=False)
716 |
717 | # Convert data to CSV format
718 | csv_content = ""
719 | for row_data in csv_data:
720 | # Escape any delimiter characters in the data and wrap in quotes if needed
721 | escaped_row = []
722 | for cell in row_data:
723 | if delimiter in cell or '"' in cell or '\n' in cell:
724 | # Replace double quotes with escaped double quotes
725 | escaped_cell = cell.replace('"', '""')
726 | escaped_row.append(f'"{escaped_cell}"')
727 | else:
728 | escaped_row.append(cell)
729 |
730 | csv_content += delimiter.join(escaped_row) + "\n"
731 |
732 | # Write the CSV content to file
733 | await write_file_content(output_path, csv_content)
734 |
735 | processing_time = time.time() - start_time
736 | result = {
737 | "success": True,
738 | "file_path": validated_file_path,
739 | "sheet_name": sheet_name,
740 | "output_path": output_path,
741 | "row_count": row_count,
742 | "column_count": col_count,
743 | "processing_time": processing_time
744 | }
745 |
746 | logger.info(
747 | f"Excel sheet export completed in {processing_time:.2f}s",
748 | emoji_key="success"
749 | )
750 |
751 | return result
752 |
753 | except Exception as e:
754 | logger.error(
755 | f"Error exporting Excel sheet: {str(e)}",
756 | emoji_key="error",
757 | exc_info=True
758 | )
759 | # Try to clean up session on error
760 | await self._cleanup_excel_session(delete_state)
761 | raise ToolError(
762 | f"Failed to export Excel sheet: {str(e)}",
763 | details={"file_path": file_path, "sheet_name": sheet_name}
764 | ) from e
765 |
766 | @with_tool_metrics
767 | @with_error_handling
768 | @with_state_management("excel_tools")
769 | async def excel_import_csv_to_sheet(
770 | self,
771 | file_path: str,
772 | csv_path: str,
773 | sheet_name: Optional[str] = None,
774 | delimiter: str = ",",
775 | start_cell: str = "A1",
776 | create_sheet: bool = False,
777 | show_excel: bool = False,
778 | get_state=None,
779 | set_state=None,
780 | delete_state=None,
781 | ctx=None
782 | ) -> Dict[str, Any]:
783 | """Import CSV data into an Excel sheet.
784 |
785 | This function allows importing data from a CSV file into an Excel workbook,
786 | either into an existing sheet or by creating a new sheet.
787 |
788 | Args:
789 | file_path: Path to the Excel file
790 | csv_path: Path to the CSV file to import
791 | sheet_name: Name of the sheet to import into (if None, uses active sheet)
792 | delimiter: Character used as delimiter in the CSV (default: comma)
793 | start_cell: Cell where to start importing (default: A1)
794 | create_sheet: Whether to create a new sheet if sheet_name doesn't exist
795 | show_excel: Whether to make Excel visible during processing
796 | get_state: Function to get state (injected by with_state_management)
797 | set_state: Function to set state (injected by with_state_management)
798 | delete_state: Function to delete state (injected by with_state_management)
799 | ctx: Context object (injected by with_state_management)
800 |
801 | Returns:
802 | Dictionary with import results
803 | """
804 | start_time = time.time()
805 |
806 | # Validate the Excel file path
807 | try:
808 | validated_file_path = await self._validate_excel_file_path(file_path, check_exists=True)
809 | except ToolInputError:
810 | raise
811 | except Exception as e:
812 | raise ToolInputError(f"Invalid Excel file path: {str(e)}", param_name="file_path", provided_value=file_path) from e
813 |
814 | # Validate the CSV file path
815 | try:
816 | validated_csv_path = await validate_path(csv_path, check_exists=True)
817 | except ToolInputError:
818 | raise
819 | except Exception as e:
820 | raise ToolInputError(f"Invalid CSV file path: {str(e)}", param_name="csv_path", provided_value=csv_path) from e
821 |
822 | # Execute the import operation
823 | try:
824 | # Read the CSV content
825 | csv_content = await read_file_content(validated_csv_path)
826 |
827 | # Parse CSV data
828 | csv_data = []
829 | for line in csv_content.splitlines():
830 | if not line.strip():
831 | continue
832 |
833 | # Handle quoted fields with delimiters inside them
834 | row = []
835 | field = ""
836 | in_quotes = False
837 | i = 0
838 |
839 | while i < len(line):
840 | char = line[i]
841 |
842 | if char == '"' and (i == 0 or line[i-1] != '\\'):
843 | # Toggle quote mode
844 | in_quotes = not in_quotes
845 | # Handle escaped quotes (two double quotes in a row)
846 | if in_quotes is False and i + 1 < len(line) and line[i+1] == '"':
847 | field += '"'
848 | i += 1 # Skip the next quote
849 | elif char == delimiter and not in_quotes:
850 | # End of field
851 | row.append(field)
852 | field = ""
853 | else:
854 | field += char
855 |
856 | i += 1
857 |
858 | # Add the last field
859 | row.append(field)
860 | csv_data.append(row)
861 |
862 | # Create or retrieve the Excel session from state
863 | session = await self._get_or_create_excel_session(show_excel, get_state, set_state)
864 |
865 | # Open the workbook
866 | workbook = session.open_workbook(validated_file_path, read_only=False)
867 |
868 | # Find or create the worksheet
869 | worksheet = None
870 | if sheet_name:
871 | # Try to find the sheet
872 | for sheet in workbook.Worksheets:
873 | if sheet.Name.lower() == sheet_name.lower():
874 | worksheet = sheet
875 | break
876 |
877 | # Create if not found and create_sheet is True
878 | if not worksheet and create_sheet:
879 | worksheet = workbook.Worksheets.Add()
880 | worksheet.Name = sheet_name
881 |
882 | # If no sheet_name specified or sheet not found, use the active sheet
883 | if not worksheet:
884 | if not sheet_name and not create_sheet:
885 | worksheet = workbook.ActiveSheet
886 | elif create_sheet:
887 | worksheet = workbook.Worksheets.Add()
888 | if sheet_name:
889 | worksheet.Name = sheet_name
890 | else:
891 | worksheet.Name = f"CSV_Import_{time.strftime('%Y%m%d')}"
892 |
893 | # Parse start cell
894 | start_cell_obj = worksheet.Range(start_cell)
895 | start_row = start_cell_obj.Row
896 | start_col = start_cell_obj.Column
897 |
898 | # Import the data
899 | for row_idx, row_data in enumerate(csv_data):
900 | for col_idx, cell_value in enumerate(row_data):
901 | worksheet.Cells(start_row + row_idx, start_col + col_idx).Value = cell_value
902 |
903 | # Auto-fit columns for better readability
904 | if csv_data:
905 | start_range = worksheet.Cells(start_row, start_col)
906 | end_range = worksheet.Cells(start_row + len(csv_data) - 1, start_col + len(csv_data[0]) - 1)
907 | data_range = worksheet.Range(start_range, end_range)
908 | data_range.Columns.AutoFit()
909 |
910 | # Save the workbook
911 | session.save_workbook(workbook, validated_file_path)
912 |
913 | # Close the workbook
914 | workbook.Close(SaveChanges=False)
915 |
916 | processing_time = time.time() - start_time
917 | result = {
918 | "success": True,
919 | "file_path": validated_file_path,
920 | "csv_path": validated_csv_path,
921 | "sheet_name": worksheet.Name,
922 | "rows_imported": len(csv_data),
923 | "columns_imported": len(csv_data[0]) if csv_data else 0,
924 | "processing_time": processing_time
925 | }
926 |
927 | logger.info(
928 | f"CSV import completed in {processing_time:.2f}s",
929 | emoji_key="success"
930 | )
931 |
932 | return result
933 |
934 | except Exception as e:
935 | logger.error(
936 | f"Error importing CSV data: {str(e)}",
937 | emoji_key="error",
938 | exc_info=True
939 | )
940 | # Try to clean up session on error
941 | await self._cleanup_excel_session(delete_state)
942 | raise ToolError(
943 | f"Failed to import CSV data: {str(e)}",
944 | details={"file_path": file_path, "csv_path": csv_path}
945 | ) from e
946 |
947 | # --- Excel session management methods ---
948 |
949 | async def _get_or_create_excel_session(self, visible=False, get_state=None, set_state=None):
950 | """Get an existing Excel session from state or create a new one.
951 |
952 | Args:
953 | visible: Whether Excel should be visible
954 | get_state: Function to get state
955 | set_state: Function to set state
956 |
957 | Returns:
958 | ExcelSession: An Excel session
959 | """
960 | # Try to get session from state
961 | session_data = await get_state("excel_session")
962 |
963 | if session_data and getattr(session_data, "status", "") != "closed":
964 | logger.info("Using existing Excel session from state")
965 | return session_data
966 |
967 | # Create a new session if none exists in state
968 | logger.info("Creating new Excel session")
969 | session = await asyncio.to_thread(ExcelSession, visible=visible)
970 |
971 | # Store session in state
972 | await set_state("excel_session", session)
973 |
974 | return session
975 |
976 | async def _cleanup_excel_session(self, delete_state=None):
977 | """Clean up Excel session resources.
978 |
979 | Args:
980 | delete_state: Function to delete state
981 | """
982 | if delete_state:
983 | await delete_state("excel_session")
984 |
985 | async def _validate_excel_file_path(self, file_path: str, check_exists: bool = False) -> str:
986 | """Validate that an Excel file path is in an allowed directory.
987 |
988 | Args:
989 | file_path: Path to validate
990 | check_exists: Whether to check if the file exists
991 |
992 | Returns:
993 | Validated absolute path
994 | """
995 | if not file_path:
996 | raise ToolInputError("File path cannot be empty")
997 |
998 | # Check if file has an Excel extension
999 | if not file_path.lower().endswith(('.xlsx', '.xlsm', '.xls')):
1000 | raise ToolInputError(f"File must have an Excel extension (.xlsx, .xlsm, .xls): {file_path}")
1001 |
1002 | # Get allowed directories for file operations
1003 | allowed_dirs = await get_allowed_directories()
1004 |
1005 | # Check if path is in an allowed directory
1006 | abs_path = os.path.abspath(file_path)
1007 | if not any(abs_path.startswith(os.path.abspath(allowed_dir)) for allowed_dir in allowed_dirs):
1008 | raise ToolInputError(f"File path is outside allowed directories: {file_path}")
1009 |
1010 | # Check existence if required
1011 | if check_exists and not os.path.exists(abs_path):
1012 | raise ToolInputError(f"File does not exist: {file_path}")
1013 |
1014 | return abs_path
1015 |
1016 | # --- Internal implementation methods ---
1017 |
1018 | async def _execute_excel_operation(
1019 | self,
1020 | session: ExcelSession,
1021 | instruction: str,
1022 | operation_type: str,
1023 | file_path: Optional[str] = None,
1024 | template_path: Optional[str] = None,
1025 | parameters: Dict[str, Any] = None
1026 | ) -> Dict[str, Any]:
1027 | """Internal method to execute Excel operations.
1028 |
1029 | This method handles the core Excel manipulation based on operation_type.
1030 |
1031 | Args:
1032 | session: Excel session to use
1033 | instruction: Natural language instruction
1034 | operation_type: Type of operation
1035 | file_path: Path to the Excel file
1036 | template_path: Optional template file path
1037 | parameters: Optional structured parameters
1038 |
1039 | Returns:
1040 | Dictionary with operation results
1041 | """
1042 | # Initialize result structure
1043 | result = {
1044 | "success": True,
1045 | "operation_type": operation_type,
1046 | "file_path": file_path
1047 | }
1048 |
1049 | # Handle different operation types
1050 | if operation_type == "create":
1051 | # Validate file_path and ensure it doesn't exist
1052 | if file_path:
1053 | validated_file_path = await validate_path(file_path, check_exists=False, check_parent_writable=True)
1054 |
1055 | # Ensure parent directory exists
1056 | parent_dir = os.path.dirname(validated_file_path)
1057 | if parent_dir:
1058 | await create_directory(parent_dir)
1059 |
1060 | # Create a new workbook, either from scratch or from a template
1061 | if template_path:
1062 | # Validate template path and ensure it exists
1063 | validated_template_path = await validate_path(template_path, check_exists=True)
1064 |
1065 | # Open the template
1066 | wb = session.open_workbook(validated_template_path, read_only=True)
1067 | # Save as the new file
1068 | session.save_workbook(wb, validated_file_path)
1069 | # Close the template and reopen the new file
1070 | session.close_workbook(wb)
1071 | wb = session.open_workbook(validated_file_path)
1072 | else:
1073 | # Create a new workbook
1074 | wb = session.create_workbook()
1075 | # If file_path is provided, immediately save it
1076 | if file_path:
1077 | session.save_workbook(wb, validated_file_path)
1078 |
1079 | # Apply the instruction to the workbook
1080 | operations_performed = await self._apply_instruction_to_workbook(
1081 | session=session,
1082 | workbook=wb,
1083 | instruction=instruction,
1084 | parameters=parameters
1085 | )
1086 |
1087 | # Save the workbook
1088 | session.save_workbook(wb, validated_file_path)
1089 |
1090 | result["operations_performed"] = operations_performed
1091 | result["file_created"] = validated_file_path
1092 |
1093 | elif operation_type == "modify":
1094 | # Validate file_path and ensure it exists
1095 | validated_file_path = await validate_path(file_path, check_exists=True)
1096 |
1097 | # Open existing workbook for modification
1098 | wb = session.open_workbook(validated_file_path, read_only=False)
1099 |
1100 | # Apply the instruction to the workbook
1101 | operations_performed = await self._apply_instruction_to_workbook(
1102 | session=session,
1103 | workbook=wb,
1104 | instruction=instruction,
1105 | parameters=parameters
1106 | )
1107 |
1108 | # Save the workbook
1109 | session.save_workbook(wb, validated_file_path)
1110 |
1111 | result["operations_performed"] = operations_performed
1112 | result["file_modified"] = validated_file_path
1113 |
1114 | elif operation_type == "analyze":
1115 | # Validate file_path and ensure it exists
1116 | validated_file_path = await validate_path(file_path, check_exists=True)
1117 |
1118 | # Open existing workbook for analysis
1119 | wb = session.open_workbook(validated_file_path, read_only=True)
1120 |
1121 | # Analyze the workbook
1122 | analysis_results = await self._analyze_workbook(
1123 | session=session,
1124 | workbook=wb,
1125 | instruction=instruction,
1126 | parameters=parameters
1127 | )
1128 |
1129 | result["analysis_results"] = analysis_results
1130 |
1131 | elif operation_type == "format":
1132 | # Validate file_path and ensure it exists
1133 | validated_file_path = await validate_path(file_path, check_exists=True)
1134 |
1135 | # Open existing workbook for formatting
1136 | wb = session.open_workbook(validated_file_path, read_only=False)
1137 |
1138 | # Apply formatting to the workbook
1139 | formatting_applied = await self._apply_formatting_to_workbook(
1140 | session=session,
1141 | workbook=wb,
1142 | instruction=instruction,
1143 | parameters=parameters
1144 | )
1145 |
1146 | # Save the workbook
1147 | session.save_workbook(wb, validated_file_path)
1148 |
1149 | result["formatting_applied"] = formatting_applied
1150 |
1151 | else:
1152 | raise ToolInputError(f"Unknown operation_type: {operation_type}")
1153 |
1154 | return result
1155 |
1156 | async def _apply_instruction_to_workbook(
1157 | self,
1158 | session: ExcelSession,
1159 | workbook: Any,
1160 | instruction: str,
1161 | parameters: Dict[str, Any]
1162 | ) -> List[Dict[str, Any]]:
1163 | """Apply natural language instructions to a workbook.
1164 |
1165 | This method interprets the instructions and performs the requested operations.
1166 |
1167 | Args:
1168 | session: Excel session
1169 | workbook: Workbook COM object
1170 | instruction: Natural language instruction
1171 | parameters: Optional structured parameters
1172 |
1173 | Returns:
1174 | List of operations performed
1175 | """
1176 | operations_performed = []
1177 |
1178 | # Default to first worksheet if none exists
1179 | if workbook.Worksheets.Count == 0:
1180 | worksheet = workbook.Worksheets.Add()
1181 | operations_performed.append({
1182 | "operation": "create_worksheet",
1183 | "sheet_name": worksheet.Name
1184 | })
1185 |
1186 | # Process instruction to extract key operations
1187 | instruction_lower = instruction.lower()
1188 |
1189 | # Create sheets if mentioned
1190 | if "sheet" in instruction_lower or "sheets" in instruction_lower:
1191 | # Extract sheet names using regex to find patterns like:
1192 | # - 'sheets: X, Y, Z'
1193 | # - 'sheets named X and Y'
1194 | # - 'create sheets X, Y, Z'
1195 | sheet_patterns = [
1196 | r"sheet(?:s)?\s*(?:named|called|:)?\s*(?:'([^']*)'|\"([^\"]*)\"|([A-Za-z0-9_, ]+))",
1197 | r"create (?:a |)sheet(?:s)? (?:named|called)?\s*(?:'([^']*)'|\"([^\"]*)\"|([A-Za-z0-9_, ]+))"
1198 | ]
1199 |
1200 | sheet_names = []
1201 | for pattern in sheet_patterns:
1202 | matches = re.findall(pattern, instruction_lower)
1203 | if matches:
1204 | for match in matches:
1205 | # Each match is now a tuple with 3 capture groups: (single_quoted, double_quoted, unquoted)
1206 | sheet_name = match[0] or match[1] or match[2]
1207 | if sheet_name:
1208 | # Split by commas and/or 'and', then clean up
1209 | for name in re.split(r',|\s+and\s+', sheet_name):
1210 | clean_name = name.strip("' \"").strip()
1211 | if clean_name:
1212 | sheet_names.append(clean_name)
1213 |
1214 | # Also check explicit parameters if provided
1215 | if parameters and "sheet_names" in parameters:
1216 | sheet_names.extend(parameters["sheet_names"])
1217 |
1218 | # Make sheet names unique
1219 | sheet_names = list(set(sheet_names))
1220 |
1221 | # Create each sheet
1222 | current_sheets = [sheet.Name.lower() for sheet in workbook.Worksheets]
1223 |
1224 | for sheet_name in sheet_names:
1225 | if sheet_name.lower() not in current_sheets:
1226 | new_sheet = workbook.Worksheets.Add(After=workbook.Worksheets(workbook.Worksheets.Count))
1227 | new_sheet.Name = sheet_name
1228 | operations_performed.append({
1229 | "operation": "create_worksheet",
1230 | "sheet_name": sheet_name
1231 | })
1232 |
1233 | # Add headers if mentioned
1234 | if "header" in instruction_lower or "headers" in instruction_lower:
1235 | # Extract header information
1236 | header_data = None
1237 |
1238 | # Check parameters first
1239 | if parameters and "headers" in parameters:
1240 | header_data = parameters["headers"]
1241 | else:
1242 | # Try to extract from instruction
1243 | header_match = re.search(r"header(?:s)?\s*(?::|with|including)\s*([^.]+)", instruction_lower)
1244 | if header_match:
1245 | # Parse the header text
1246 | header_text = header_match.group(1).strip()
1247 | # Split by commas and/or 'and'
1248 | header_data = [h.strip("' \"").strip() for h in re.split(r',|\s+and\s+', header_text) if h.strip()]
1249 |
1250 | if header_data:
1251 | # Determine target sheet
1252 | target_sheet_name = None
1253 |
1254 | # Check parameters first
1255 | if parameters and "target_sheet" in parameters:
1256 | target_sheet_name = parameters["target_sheet"]
1257 | else:
1258 | # Try to extract from instruction
1259 | sheet_match = re.search(r"in (?:the |)(?:sheet|worksheet) (?:'([^']*)'|\"([^\"]*)\"|([A-Za-z0-9_]+))", instruction_lower)
1260 | if sheet_match:
1261 | target_sheet_name = sheet_match.group(1) or sheet_match.group(2) or sheet_match.group(3)
1262 |
1263 | # Default to first sheet if not specified
1264 | if not target_sheet_name:
1265 | target_sheet_name = workbook.Worksheets(1).Name
1266 |
1267 | # Find the worksheet
1268 | worksheet = None
1269 | for sheet in workbook.Worksheets:
1270 | if sheet.Name.lower() == target_sheet_name.lower():
1271 | worksheet = sheet
1272 | break
1273 |
1274 | if not worksheet:
1275 | worksheet = workbook.Worksheets(1)
1276 |
1277 | # Add headers to the worksheet
1278 | for col_idx, header in enumerate(header_data, 1):
1279 | worksheet.Cells(1, col_idx).Value = header
1280 |
1281 | # Apply simple header formatting
1282 | worksheet.Cells(1, col_idx).Font.Bold = True
1283 |
1284 | operations_performed.append({
1285 | "operation": "add_headers",
1286 | "sheet_name": worksheet.Name,
1287 | "headers": header_data
1288 | })
1289 |
1290 | # Add data if mentioned
1291 | if "data" in instruction_lower or "values" in instruction_lower:
1292 | # Check parameters first
1293 | data_rows = None
1294 |
1295 | if parameters and "data" in parameters:
1296 | data_rows = parameters["data"]
1297 |
1298 | if data_rows:
1299 | # Determine target sheet
1300 | target_sheet_name = None
1301 |
1302 | # Check parameters first
1303 | if parameters and "target_sheet" in parameters:
1304 | target_sheet_name = parameters["target_sheet"]
1305 | else:
1306 | # Try to extract from instruction
1307 | sheet_match = re.search(r"in (?:the |)(?:sheet|worksheet) (?:'([^']*)'|\"([^\"]*)\"|([A-Za-z0-9_]+))", instruction_lower)
1308 | if sheet_match:
1309 | target_sheet_name = sheet_match.group(1) or sheet_match.group(2) or sheet_match.group(3)
1310 |
1311 | # Default to first sheet if not specified
1312 | if not target_sheet_name:
1313 | target_sheet_name = workbook.Worksheets(1).Name
1314 |
1315 | # Find the worksheet
1316 | worksheet = None
1317 | for sheet in workbook.Worksheets:
1318 | if sheet.Name.lower() == target_sheet_name.lower():
1319 | worksheet = sheet
1320 | break
1321 |
1322 | if not worksheet:
1323 | worksheet = workbook.Worksheets(1)
1324 |
1325 | # Determine starting row (typically 2 if headers exist)
1326 | start_row = 2
1327 | if parameters and "start_row" in parameters:
1328 | start_row = parameters["start_row"]
1329 |
1330 | # Add data to the worksheet
1331 | for row_idx, row_data in enumerate(data_rows, start_row):
1332 | for col_idx, cell_value in enumerate(row_data, 1):
1333 | worksheet.Cells(row_idx, col_idx).Value = cell_value
1334 |
1335 | operations_performed.append({
1336 | "operation": "add_data",
1337 | "sheet_name": worksheet.Name,
1338 | "start_row": start_row,
1339 | "row_count": len(data_rows)
1340 | })
1341 |
1342 | # Add formulas if mentioned
1343 | if "formula" in instruction_lower or "formulas" in instruction_lower:
1344 | formula_data = None
1345 |
1346 | # Check parameters first
1347 | if parameters and "formulas" in parameters:
1348 | formula_data = parameters["formulas"]
1349 |
1350 | if formula_data:
1351 | # Determine target sheet
1352 | target_sheet_name = None
1353 |
1354 | # Check parameters first
1355 | if parameters and "target_sheet" in parameters:
1356 | target_sheet_name = parameters["target_sheet"]
1357 |
1358 | # Default to first sheet if not specified
1359 | if not target_sheet_name:
1360 | target_sheet_name = workbook.Worksheets(1).Name
1361 |
1362 | # Find the worksheet
1363 | worksheet = None
1364 | for sheet in workbook.Worksheets:
1365 | if sheet.Name.lower() == target_sheet_name.lower():
1366 | worksheet = sheet
1367 | break
1368 |
1369 | if not worksheet:
1370 | worksheet = workbook.Worksheets(1)
1371 |
1372 | # Add formulas to the worksheet
1373 | for formula_entry in formula_data:
1374 | cell_ref = formula_entry.get("cell")
1375 | formula = formula_entry.get("formula")
1376 |
1377 | if cell_ref and formula:
1378 | worksheet.Range(cell_ref).Formula = formula
1379 |
1380 | operations_performed.append({
1381 | "operation": "add_formulas",
1382 | "sheet_name": worksheet.Name,
1383 | "formula_count": len(formula_data)
1384 | })
1385 |
1386 | # Apply formatting if mentioned
1387 | if "format" in instruction_lower or "formatting" in instruction_lower:
1388 | formatting = None
1389 |
1390 | # Check parameters first
1391 | if parameters and "formatting" in parameters:
1392 | formatting = parameters["formatting"]
1393 |
1394 | if formatting:
1395 | await self._apply_formatting_to_workbook(
1396 | session=session,
1397 | workbook=workbook,
1398 | instruction=instruction,
1399 | parameters={"formatting": formatting}
1400 | )
1401 |
1402 | operations_performed.append({
1403 | "operation": "apply_formatting",
1404 | "details": "Applied formatting based on parameters"
1405 | })
1406 | else:
1407 | # Apply default formatting based on instruction
1408 | sheet = workbook.Worksheets(1)
1409 |
1410 | # Auto-fit columns
1411 | used_range = sheet.UsedRange
1412 | used_range.Columns.AutoFit()
1413 |
1414 | # Add borders to data range
1415 | if used_range.Rows.Count > 1:
1416 | data_range = sheet.Range(sheet.Cells(1, 1), sheet.Cells(used_range.Rows.Count, used_range.Columns.Count))
1417 | data_range.Borders.LineStyle = 1 # xlContinuous
1418 |
1419 | operations_performed.append({
1420 | "operation": "apply_formatting",
1421 | "details": "Applied default formatting (auto-fit columns, borders)"
1422 | })
1423 |
1424 | # Create a chart if mentioned
1425 | if "chart" in instruction_lower or "graph" in instruction_lower:
1426 | chart_type = None
1427 |
1428 | # Chart type mapping
1429 | CHART_TYPES = {
1430 | "column": win32c.xlColumnClustered,
1431 | "bar": win32c.xlBarClustered,
1432 | "line": win32c.xlLine,
1433 | "pie": win32c.xlPie,
1434 | "area": win32c.xlArea,
1435 | "scatter": win32c.xlXYScatter,
1436 | "radar": win32c.xlRadar,
1437 | "stock": win32c.xlStockHLC,
1438 | "surface": win32c.xlSurface,
1439 | "doughnut": win32c.xlDoughnut,
1440 | "bubble": win32c.xlBubble,
1441 | "combo": win32c.xl3DColumn
1442 | }
1443 |
1444 | # Check parameters first
1445 | if parameters and "chart" in parameters:
1446 | chart_info = parameters["chart"]
1447 | chart_type_str = chart_info.get("type", "column").lower()
1448 | data_range = chart_info.get("data_range")
1449 | chart_title = chart_info.get("title", "Chart")
1450 |
1451 | # Get chart type constant
1452 | chart_type = CHART_TYPES.get(chart_type_str, win32c.xlColumnClustered)
1453 |
1454 | if data_range:
1455 | # Determine target sheet
1456 | target_sheet_name = chart_info.get("sheet_name")
1457 |
1458 | # Default to first sheet if not specified
1459 | if not target_sheet_name:
1460 | target_sheet_name = workbook.Worksheets(1).Name
1461 |
1462 | # Find the worksheet
1463 | worksheet = None
1464 | for sheet in workbook.Worksheets:
1465 | if sheet.Name.lower() == target_sheet_name.lower():
1466 | worksheet = sheet
1467 | break
1468 |
1469 | if not worksheet:
1470 | worksheet = workbook.Worksheets(1)
1471 |
1472 | # Create the chart
1473 | chart = worksheet.Shapes.AddChart2(-1, chart_type).Chart
1474 | chart.SetSourceData(worksheet.Range(data_range))
1475 | chart.HasTitle = True
1476 | chart.ChartTitle.Text = chart_title
1477 |
1478 | operations_performed.append({
1479 | "operation": "create_chart",
1480 | "sheet_name": worksheet.Name,
1481 | "chart_type": chart_type_str,
1482 | "data_range": data_range
1483 | })
1484 |
1485 | return operations_performed
1486 |
1487 | async def _analyze_workbook(
1488 | self,
1489 | session: ExcelSession,
1490 | workbook: Any,
1491 | instruction: str,
1492 | parameters: Dict[str, Any]
1493 | ) -> Dict[str, Any]:
1494 | """Analyze a workbook based on instructions.
1495 |
1496 | This method examines the workbook structure, formulas, and data.
1497 |
1498 | Args:
1499 | session: Excel session
1500 | workbook: Workbook COM object
1501 | instruction: Analysis instruction
1502 | parameters: Optional structured parameters
1503 |
1504 | Returns:
1505 | Dictionary with analysis results
1506 | """
1507 | # Initialize result
1508 | analysis_results = {
1509 | "workbook_name": workbook.Name,
1510 | "sheet_count": workbook.Sheets.Count,
1511 | "sheets_info": [],
1512 | "has_formulas": False,
1513 | "has_links": workbook.HasLinks,
1514 | "calculation_mode": self._get_calculation_mode_name(workbook.CalculationMode),
1515 | }
1516 |
1517 | total_formulas = 0
1518 |
1519 | # Analyze each sheet
1520 | for sheet_idx in range(1, workbook.Sheets.Count + 1):
1521 | sheet = workbook.Sheets(sheet_idx)
1522 |
1523 | # Skip chart sheets
1524 | if sheet.Type != 1: # xlWorksheet
1525 | continue
1526 |
1527 | used_range = sheet.UsedRange
1528 |
1529 | # Get sheet details
1530 | sheet_info = {
1531 | "name": sheet.Name,
1532 | "row_count": used_range.Rows.Count if used_range else 0,
1533 | "column_count": used_range.Columns.Count if used_range else 0,
1534 | "visible": sheet.Visible == -1, # -1 is xlSheetVisible
1535 | "has_formulas": False,
1536 | "formula_count": 0,
1537 | "data_tables": False,
1538 | "has_charts": False,
1539 | "chart_count": 0,
1540 | "named_ranges": []
1541 | }
1542 |
1543 | # Check for charts
1544 | chart_objects = sheet.ChartObjects()
1545 | chart_count = chart_objects.Count
1546 | sheet_info["has_charts"] = chart_count > 0
1547 | sheet_info["chart_count"] = chart_count
1548 |
1549 | # Look for formulas
1550 | formula_cells = []
1551 | formula_count = 0
1552 |
1553 | if used_range:
1554 | # Sample used range cells to check for formulas (limit to reasonable number)
1555 | row_count = min(used_range.Rows.Count, 1000)
1556 | col_count = min(used_range.Columns.Count, 100)
1557 |
1558 | for row in range(1, row_count + 1):
1559 | for col in range(1, col_count + 1):
1560 | cell = used_range.Cells(row, col)
1561 | if cell.HasFormula:
1562 | formula_count += 1
1563 | if len(formula_cells) < 10: # Just store a few examples
1564 | cell_address = cell.Address(False, False) # A1 style without $
1565 | formula_cells.append({
1566 | "address": cell_address,
1567 | "formula": cell.Formula
1568 | })
1569 |
1570 | sheet_info["has_formulas"] = formula_count > 0
1571 | sheet_info["formula_count"] = formula_count
1572 | sheet_info["example_formulas"] = formula_cells
1573 |
1574 | total_formulas += formula_count
1575 |
1576 | # Get named ranges in this sheet
1577 | for name in workbook.Names:
1578 | try:
1579 | if name.RefersToRange.Parent.Name == sheet.Name:
1580 | sheet_info["named_ranges"].append({
1581 | "name": name.Name,
1582 | "refers_to": name.RefersTo
1583 | })
1584 | except Exception:
1585 | pass # Skip if there's an error (e.g., name refers to another workbook)
1586 |
1587 | analysis_results["sheets_info"].append(sheet_info)
1588 |
1589 | analysis_results["has_formulas"] = total_formulas > 0
1590 | analysis_results["total_formula_count"] = total_formulas
1591 |
1592 | # Check for external links
1593 | if workbook.HasLinks:
1594 | links = []
1595 | try:
1596 | for link in workbook.LinkSources():
1597 | links.append(link)
1598 | except Exception:
1599 | pass # Skip if there's an error
1600 |
1601 | analysis_results["external_links"] = links
1602 |
1603 | # Add sheet dependencies if requested
1604 | if "analyze_dependencies" in instruction.lower() or (parameters and parameters.get("analyze_dependencies")):
1605 | analysis_results["dependencies"] = await self._analyze_sheet_dependencies(session, workbook)
1606 |
1607 | # Add formula analysis if requested
1608 | if "analyze_formulas" in instruction.lower() or (parameters and parameters.get("analyze_formulas")):
1609 | analysis_results["formula_analysis"] = await self._analyze_formulas_in_workbook(session, workbook)
1610 |
1611 | return analysis_results
1612 |
1613 | async def _apply_formatting_to_workbook(
1614 | self,
1615 | session: ExcelSession,
1616 | workbook: Any,
1617 | instruction: str,
1618 | parameters: Dict[str, Any]
1619 | ) -> List[Dict[str, Any]]:
1620 | """Apply formatting to a workbook based on instructions.
1621 |
1622 | This method handles various formatting operations.
1623 |
1624 | Args:
1625 | session: Excel session
1626 | workbook: Workbook COM object
1627 | instruction: Formatting instruction
1628 | parameters: Optional structured parameters
1629 |
1630 | Returns:
1631 | List of formatting operations performed
1632 | """
1633 | formatting_applied = []
1634 |
1635 | # Check if specific formatting instructions are provided in parameters
1636 | if parameters and "formatting" in parameters:
1637 | formatting = parameters["formatting"]
1638 |
1639 | # Apply cell formatting
1640 | if "cells" in formatting:
1641 | for cell_format in formatting["cells"]:
1642 | cell_range = cell_format.get("range")
1643 | sheet_name = cell_format.get("sheet")
1644 |
1645 | if not cell_range:
1646 | continue
1647 |
1648 | # Find the worksheet
1649 | worksheet = None
1650 | if sheet_name:
1651 | for sheet in workbook.Worksheets:
1652 | if sheet.Name.lower() == sheet_name.lower():
1653 | worksheet = sheet
1654 | break
1655 |
1656 | if not worksheet:
1657 | worksheet = workbook.Worksheets(1)
1658 |
1659 | # Get the range
1660 | range_obj = worksheet.Range(cell_range)
1661 |
1662 | # Apply formatting attributes
1663 | if "bold" in cell_format:
1664 | range_obj.Font.Bold = cell_format["bold"]
1665 |
1666 | if "italic" in cell_format:
1667 | range_obj.Font.Italic = cell_format["italic"]
1668 |
1669 | if "color" in cell_format:
1670 | # Handle hex color codes (e.g., "#FF0000" for red)
1671 | color_code = cell_format["color"]
1672 | if color_code.startswith("#"):
1673 | # Convert hex color to RGB value
1674 | r = int(color_code[1:3], 16)
1675 | g = int(color_code[3:5], 16)
1676 | b = int(color_code[5:7], 16)
1677 | range_obj.Font.Color = b + (g << 8) + (r << 16)
1678 | else:
1679 | # Try to set color directly
1680 | range_obj.Font.Color = cell_format["color"]
1681 |
1682 | if "bg_color" in cell_format:
1683 | # Handle hex color codes
1684 | color_code = cell_format["bg_color"]
1685 | if color_code.startswith("#"):
1686 | # Convert hex color to RGB value
1687 | r = int(color_code[1:3], 16)
1688 | g = int(color_code[3:5], 16)
1689 | b = int(color_code[5:7], 16)
1690 | range_obj.Interior.Color = b + (g << 8) + (r << 16)
1691 | else:
1692 | # Try to set color directly
1693 | range_obj.Interior.Color = cell_format["bg_color"]
1694 |
1695 | if "number_format" in cell_format:
1696 | range_obj.NumberFormat = cell_format["number_format"]
1697 |
1698 | if "border" in cell_format:
1699 | border_style = cell_format["border"]
1700 | if border_style == "all":
1701 | for border_idx in range(7, 13): # xlEdgeLeft to xlInsideVertical
1702 | range_obj.Borders(border_idx).LineStyle = 1 # xlContinuous
1703 | range_obj.Borders(border_idx).Weight = 2 # xlThin
1704 | elif border_style == "outside":
1705 | for border_idx in range(7, 11): # xlEdgeLeft to xlEdgeRight
1706 | range_obj.Borders(border_idx).LineStyle = 1 # xlContinuous
1707 | range_obj.Borders(border_idx).Weight = 2 # xlThin
1708 |
1709 | formatting_applied.append({
1710 | "operation": "format_cells",
1711 | "sheet_name": worksheet.Name,
1712 | "range": cell_range
1713 | })
1714 |
1715 | # Apply table formatting
1716 | if "tables" in formatting:
1717 | for table_format in formatting["tables"]:
1718 | data_range = table_format.get("range")
1719 | sheet_name = table_format.get("sheet")
1720 | table_style = table_format.get("style", "TableStyleMedium2")
1721 | has_headers = table_format.get("has_headers", True)
1722 |
1723 | if not data_range:
1724 | continue
1725 |
1726 | # Find the worksheet
1727 | worksheet = None
1728 | if sheet_name:
1729 | for sheet in workbook.Worksheets:
1730 | if sheet.Name.lower() == sheet_name.lower():
1731 | worksheet = sheet
1732 | break
1733 |
1734 | if not worksheet:
1735 | worksheet = workbook.Worksheets(1)
1736 |
1737 | # Create a table
1738 | table_name = f"Table{len(worksheet.ListObjects) + 1}"
1739 | if "name" in table_format:
1740 | table_name = table_format["name"]
1741 |
1742 | try:
1743 | table = worksheet.ListObjects.Add(1, worksheet.Range(data_range), True)
1744 | table.Name = table_name
1745 | table.TableStyle = table_style
1746 |
1747 | formatting_applied.append({
1748 | "operation": "create_table",
1749 | "sheet_name": worksheet.Name,
1750 | "table_name": table_name,
1751 | "range": data_range
1752 | })
1753 | except Exception as e:
1754 | logger.warning(f"Failed to create table: {str(e)}")
1755 |
1756 | # Apply conditional formatting
1757 | if "conditional_formatting" in formatting:
1758 | for cf_format in formatting["conditional_formatting"]:
1759 | cell_range = cf_format.get("range")
1760 | sheet_name = cf_format.get("sheet")
1761 | cf_type = cf_format.get("type")
1762 |
1763 | if not cell_range or not cf_type:
1764 | continue
1765 |
1766 | # Find the worksheet
1767 | worksheet = None
1768 | if sheet_name:
1769 | for sheet in workbook.Worksheets:
1770 | if sheet.Name.lower() == sheet_name.lower():
1771 | worksheet = sheet
1772 | break
1773 |
1774 | if not worksheet:
1775 | worksheet = workbook.Worksheets(1)
1776 |
1777 | # Get the range
1778 | range_obj = worksheet.Range(cell_range)
1779 |
1780 | # Apply conditional formatting based on type
1781 | if cf_type == "data_bar":
1782 | color = cf_format.get("color", 43) # Default blue
1783 | if isinstance(color, str) and color.startswith("#"):
1784 | # Convert hex color to RGB value
1785 | r = int(color[1:3], 16)
1786 | g = int(color[3:5], 16)
1787 | b = int(color[5:7], 16)
1788 | color = b + (g << 8) + (r << 16)
1789 |
1790 | cf = range_obj.FormatConditions.AddDatabar()
1791 | cf.BarColor.Color = color
1792 |
1793 | elif cf_type == "color_scale":
1794 | cf = range_obj.FormatConditions.AddColorScale(3)
1795 | # Configure color scale (could be extended with more options)
1796 |
1797 | elif cf_type == "icon_set":
1798 | icon_style = cf_format.get("icon_style", "3Arrows")
1799 | cf = range_obj.FormatConditions.AddIconSetCondition()
1800 | cf.IconSet = workbook.Application.IconSets(icon_style)
1801 |
1802 | elif cf_type == "cell_value":
1803 | comparison_operator = cf_format.get("operator", "greaterThan")
1804 | comparison_value = cf_format.get("value", 0)
1805 |
1806 | # Map string operator to Excel constant
1807 | operator_map = {
1808 | "greaterThan": 3, # xlGreater
1809 | "lessThan": 5, # xlLess
1810 | "equalTo": 2, # xlEqual
1811 | "greaterOrEqual": 4, # xlGreaterEqual
1812 | "lessOrEqual": 6, # xlLessEqual
1813 | "notEqual": 7 # xlNotEqual
1814 | }
1815 |
1816 | operator_constant = operator_map.get(comparison_operator, 3)
1817 |
1818 | cf = range_obj.FormatConditions.Add(1, operator_constant, comparison_value) # 1 = xlCellValue
1819 |
1820 | # Apply formatting
1821 | if "bold" in cf_format:
1822 | cf.Font.Bold = cf_format["bold"]
1823 |
1824 | if "italic" in cf_format:
1825 | cf.Font.Italic = cf_format["italic"]
1826 |
1827 | if "color" in cf_format:
1828 | # Handle hex color codes
1829 | color_code = cf_format["color"]
1830 | if color_code.startswith("#"):
1831 | # Convert hex color to RGB value
1832 | r = int(color_code[1:3], 16)
1833 | g = int(color_code[3:5], 16)
1834 | b = int(color_code[5:7], 16)
1835 | cf.Font.Color = b + (g << 8) + (r << 16)
1836 | else:
1837 | cf.Font.Color = cf_format["color"]
1838 |
1839 | if "bg_color" in cf_format:
1840 | # Handle hex color codes
1841 | color_code = cf_format["bg_color"]
1842 | if color_code.startswith("#"):
1843 | # Convert hex color to RGB value
1844 | r = int(color_code[1:3], 16)
1845 | g = int(color_code[3:5], 16)
1846 | b = int(color_code[5:7], 16)
1847 | cf.Interior.Color = b + (g << 8) + (r << 16)
1848 | else:
1849 | cf.Interior.Color = cf_format["bg_color"]
1850 |
1851 | formatting_applied.append({
1852 | "operation": "add_conditional_formatting",
1853 | "sheet_name": worksheet.Name,
1854 | "range": cell_range,
1855 | "type": cf_type
1856 | })
1857 |
1858 | # Apply default formatting based on instruction if no specific formatting provided
1859 | elif not parameters or "formatting" not in parameters:
1860 | instruction_lower = instruction.lower()
1861 |
1862 | # Extract target sheet(s)
1863 | sheet_names = []
1864 | sheet_match = re.search(r"(?:in|to) (?:the |)(?:sheet|worksheet)(?:s|) (?:'([^']*)'|\"([^\"]*)\"|([A-Za-z0-9_, ]+))", instruction_lower)
1865 |
1866 | if sheet_match:
1867 | sheet_names_str = sheet_match.group(1) or sheet_match.group(2) or sheet_match.group(3)
1868 | # Split by commas and/or 'and'
1869 | for name in re.split(r',|\s+and\s+', sheet_names_str):
1870 | clean_name = name.strip("' \"").strip()
1871 | if clean_name:
1872 | sheet_names.append(clean_name)
1873 |
1874 | # If no sheets specified, use all worksheets
1875 | if not sheet_names:
1876 | sheet_names = [sheet.Name for sheet in workbook.Worksheets]
1877 |
1878 | for sheet_name in sheet_names:
1879 | # Find the worksheet
1880 | worksheet = None
1881 | for sheet in workbook.Worksheets:
1882 | if sheet.Name.lower() == sheet_name.lower():
1883 | worksheet = sheet
1884 | break
1885 |
1886 | if not worksheet:
1887 | continue
1888 |
1889 | # Apply standard formatting
1890 | used_range = worksheet.UsedRange
1891 |
1892 | # Auto-fit columns
1893 | if "auto-fit" in instruction_lower or "autofit" in instruction_lower:
1894 | used_range.Columns.AutoFit()
1895 |
1896 | formatting_applied.append({
1897 | "operation": "auto_fit_columns",
1898 | "sheet_name": worksheet.Name
1899 | })
1900 |
1901 | # Add borders to data
1902 | if "borders" in instruction_lower or "outline" in instruction_lower:
1903 | if used_range.Rows.Count > 0 and used_range.Columns.Count > 0:
1904 | # Apply borders
1905 | border_style = 1 # xlContinuous
1906 | border_weight = 2 # xlThin
1907 |
1908 | # Determine border type
1909 | if "outside" in instruction_lower:
1910 | # Outside borders only
1911 | for border_idx in range(7, 11): # xlEdgeLeft to xlEdgeRight
1912 | used_range.Borders(border_idx).LineStyle = border_style
1913 | used_range.Borders(border_idx).Weight = border_weight
1914 | else:
1915 | # All borders
1916 | used_range.Borders.LineStyle = border_style
1917 | used_range.Borders.Weight = border_weight
1918 |
1919 | formatting_applied.append({
1920 | "operation": "add_borders",
1921 | "sheet_name": worksheet.Name,
1922 | "border_type": "outside" if "outside" in instruction_lower else "all"
1923 | })
1924 |
1925 | # Format headers
1926 | if "header" in instruction_lower or "headers" in instruction_lower:
1927 | if used_range.Rows.Count > 0:
1928 | # Apply header formatting to first row
1929 | header_row = worksheet.Rows(1)
1930 | header_row.Font.Bold = True
1931 |
1932 | # Set background color if mentioned
1933 | if "blue" in instruction_lower:
1934 | header_row.Interior.Color = 15773696 # Light blue
1935 | elif "gray" in instruction_lower or "grey" in instruction_lower:
1936 | header_row.Interior.Color = 14540253 # Light gray
1937 | elif "green" in instruction_lower:
1938 | header_row.Interior.Color = 13561798 # Light green
1939 | else:
1940 | # Default light blue
1941 | header_row.Interior.Color = 15773696
1942 |
1943 | formatting_applied.append({
1944 | "operation": "format_headers",
1945 | "sheet_name": worksheet.Name
1946 | })
1947 |
1948 | # Apply number formatting
1949 | if "currency" in instruction_lower or "dollar" in instruction_lower:
1950 | # Look for ranges with currency values
1951 | # This is a simplistic approach - in a real tool, we might analyze the data
1952 | # to identify numeric columns that might be currency
1953 | if used_range.Rows.Count > 1: # Skip if only header row
1954 | for col in range(1, used_range.Columns.Count + 1):
1955 | # Check a sample of cells in this column
1956 | numeric_cell_count = 0
1957 | sample_size = min(10, used_range.Rows.Count - 1)
1958 |
1959 | for row in range(2, 2 + sample_size): # Skip header
1960 | cell_value = worksheet.Cells(row, col).Value
1961 | if isinstance(cell_value, (int, float)):
1962 | numeric_cell_count += 1
1963 |
1964 | # If most cells are numeric, apply currency format
1965 | if numeric_cell_count > sample_size / 2:
1966 | col_range = worksheet.Range(
1967 | worksheet.Cells(2, col),
1968 | worksheet.Cells(used_range.Rows.Count, col)
1969 | )
1970 |
1971 | # Determine currency symbol
1972 | currency_format = "$#,##0.00"
1973 | if "euro" in instruction_lower:
1974 | currency_format = "€#,##0.00"
1975 | elif "pound" in instruction_lower:
1976 | currency_format = "£#,##0.00"
1977 |
1978 | col_range.NumberFormat = currency_format
1979 |
1980 | formatting_applied.append({
1981 | "operation": "apply_currency_format",
1982 | "sheet_name": worksheet.Name,
1983 | "column": worksheet.Cells(1, col).Value or f"Column {col}",
1984 | "format": currency_format
1985 | })
1986 |
1987 | # Apply percentage formatting
1988 | if "percent" in instruction_lower or "percentage" in instruction_lower:
1989 | # Similar approach to currency formatting
1990 | if used_range.Rows.Count > 1: # Skip if only header row
1991 | for col in range(1, used_range.Columns.Count + 1):
1992 | col_header = worksheet.Cells(1, col).Value
1993 |
1994 | # Check if column header suggests percentage
1995 | is_percentage_column = False
1996 | if col_header and isinstance(col_header, str):
1997 | if any(term in col_header.lower() for term in ["percent", "rate", "growth", "change", "margin"]):
1998 | is_percentage_column = True
1999 |
2000 | if is_percentage_column:
2001 | col_range = worksheet.Range(
2002 | worksheet.Cells(2, col),
2003 | worksheet.Cells(used_range.Rows.Count, col)
2004 | )
2005 |
2006 | col_range.NumberFormat = "0.0%"
2007 |
2008 | formatting_applied.append({
2009 | "operation": "apply_percentage_format",
2010 | "sheet_name": worksheet.Name,
2011 | "column": col_header or f"Column {col}"
2012 | })
2013 |
2014 | # Create a table if requested
2015 | if "table" in instruction_lower and "style" in instruction_lower:
2016 | if used_range.Rows.Count > 0 and used_range.Columns.Count > 0:
2017 | # Create a table with the used range
2018 | try:
2019 | has_headers = True
2020 | if "no header" in instruction_lower:
2021 | has_headers = False
2022 |
2023 | table = worksheet.ListObjects.Add(1, used_range, has_headers)
2024 |
2025 | # Set table style
2026 | table_style = "TableStyleMedium2" # Default medium blue
2027 |
2028 | if "light" in instruction_lower:
2029 | if "blue" in instruction_lower:
2030 | table_style = "TableStyleLight1"
2031 | elif "green" in instruction_lower:
2032 | table_style = "TableStyleLight5"
2033 | elif "orange" in instruction_lower:
2034 | table_style = "TableStyleLight3"
2035 | elif "medium" in instruction_lower:
2036 | if "blue" in instruction_lower:
2037 | table_style = "TableStyleMedium2"
2038 | elif "green" in instruction_lower:
2039 | table_style = "TableStyleMedium5"
2040 | elif "orange" in instruction_lower:
2041 | table_style = "TableStyleMedium3"
2042 | elif "dark" in instruction_lower:
2043 | if "blue" in instruction_lower:
2044 | table_style = "TableStyleDark2"
2045 | elif "green" in instruction_lower:
2046 | table_style = "TableStyleDark5"
2047 | elif "orange" in instruction_lower:
2048 | table_style = "TableStyleDark3"
2049 |
2050 | table.TableStyle = table_style
2051 |
2052 | formatting_applied.append({
2053 | "operation": "create_table",
2054 | "sheet_name": worksheet.Name,
2055 | "style": table_style
2056 | })
2057 | except Exception as e:
2058 | logger.warning(f"Failed to create table: {str(e)}")
2059 |
2060 | return formatting_applied
2061 |
2062 | async def _analyze_excel_template(
2063 | self,
2064 | session: ExcelSession,
2065 | exemplar_path: str,
2066 | parameters: Dict[str, Any] = None
2067 | ) -> Dict[str, Any]:
2068 | """Analyze an Excel template to understand its structure and formulas.
2069 |
2070 | This method examines the provided Excel file to understand its structure,
2071 | formulas, data patterns, and features used. The analysis is used for
2072 | adapting the template to a new context.
2073 |
2074 | Args:
2075 | session: ExcelSession instance to use
2076 | exemplar_path: Path to the Excel file to analyze (already validated)
2077 | parameters: Optional parameters to guide the analysis
2078 |
2079 | Returns:
2080 | Dictionary containing the template analysis
2081 | """
2082 | parameters = parameters or {}
2083 |
2084 | try:
2085 | # Open the workbook - path is already validated by the caller
2086 | workbook = session.open_workbook(exemplar_path)
2087 |
2088 | analysis = {
2089 | "worksheets": [],
2090 | "formulas": {},
2091 | "data_tables": [],
2092 | "named_ranges": [],
2093 | "pivot_tables": [],
2094 | "charts": [],
2095 | "complex_features": []
2096 | }
2097 |
2098 | # Analyze worksheets
2099 | for sheet in workbook.Worksheets:
2100 | sheet_analysis = {
2101 | "name": sheet.Name,
2102 | "used_range": f"{sheet.UsedRange.Address}",
2103 | "columns": {},
2104 | "rows": {},
2105 | "formulas": [],
2106 | "data_patterns": []
2107 | }
2108 |
2109 | # Identify data patterns and column types
2110 | # This is a simplified analysis - in practice would be more complex
2111 | used_range = sheet.UsedRange
2112 | if used_range:
2113 | # Sample column headers
2114 | for col in range(1, min(used_range.Columns.Count + 1, 50)):
2115 | header = used_range.Cells(1, col).Value
2116 | if header:
2117 | sheet_analysis["columns"][col] = {
2118 | "header": str(header),
2119 | "type": "unknown" # Would determine type in real analysis
2120 | }
2121 |
2122 | # Sample formula patterns (simplified)
2123 | for row in range(2, min(used_range.Rows.Count + 1, 20)):
2124 | for col in range(1, min(used_range.Columns.Count + 1, 20)):
2125 | cell = used_range.Cells(row, col)
2126 | if cell.HasFormula:
2127 | sheet_analysis["formulas"].append({
2128 | "address": cell.Address,
2129 | "formula": cell.Formula,
2130 | "type": "calculation" # Would classify formula type
2131 | })
2132 |
2133 | analysis["worksheets"].append(sheet_analysis)
2134 |
2135 | # Look for named ranges
2136 | for name in workbook.Names:
2137 | try:
2138 | analysis["named_ranges"].append({
2139 | "name": name.Name,
2140 | "refers_to": name.RefersTo
2141 | })
2142 | except Exception:
2143 | # Some name objects might be invalid or hidden
2144 | pass
2145 |
2146 | # Identify charts (simplified approach)
2147 | for sheet in workbook.Worksheets:
2148 | if sheet.ChartObjects.Count > 0:
2149 | sheet_charts = []
2150 | for chart_idx in range(1, sheet.ChartObjects.Count + 1):
2151 | chart = sheet.ChartObjects(chart_idx)
2152 | sheet_charts.append({
2153 | "name": chart.Name,
2154 | "type": str(chart.Chart.ChartType)
2155 | })
2156 |
2157 | analysis["charts"].append({
2158 | "sheet": sheet.Name,
2159 | "charts": sheet_charts
2160 | })
2161 |
2162 | # Close without saving
2163 | workbook.Close(SaveChanges=False)
2164 |
2165 | return analysis
2166 |
2167 | except Exception as e:
2168 | logger.error(f"Error analyzing Excel template: {str(e)}", exc_info=True)
2169 | raise ToolError(f"Failed to analyze Excel template: {str(e)}") from e
2170 |
2171 | async def _apply_excel_template(
2172 | self,
2173 | session: ExcelSession,
2174 | exemplar_path: str,
2175 | output_path: str,
2176 | data: Dict[str, Any],
2177 | parameters: Dict[str, Any] = None
2178 | ) -> Dict[str, Any]:
2179 | """Apply an Excel template with new data.
2180 |
2181 | This method takes an exemplar Excel file, modifies it with new data
2182 | according to the provided parameters, and saves it to a new location.
2183 |
2184 | Args:
2185 | session: ExcelSession instance to use
2186 | exemplar_path: Path to the Excel template file (already validated)
2187 | output_path: Path where the modified file will be saved (already validated)
2188 | data: New data to apply to the template
2189 | parameters: Optional parameters to guide template application
2190 |
2191 | Returns:
2192 | Dictionary containing the results of the template application
2193 | """
2194 | parameters = parameters or {}
2195 |
2196 | try:
2197 | # Open the template workbook - path is already validated by the caller
2198 | template = session.open_workbook(exemplar_path)
2199 |
2200 | # Track modifications for reporting
2201 | modifications = {
2202 | "cells_modified": 0,
2203 | "sheets_modified": set(),
2204 | "data_mappings": []
2205 | }
2206 |
2207 | # Process each data mapping
2208 | for mapping in data.get("mappings", []):
2209 | sheet_name = mapping.get("sheet")
2210 | if not sheet_name:
2211 | continue
2212 |
2213 | # Find the target sheet
2214 | sheet = None
2215 | for s in template.Worksheets:
2216 | if s.Name == sheet_name:
2217 | sheet = s
2218 | break
2219 |
2220 | if not sheet:
2221 | logger.warning(f"Sheet '{sheet_name}' not found in template")
2222 | continue
2223 |
2224 | # Process this sheet's mappings
2225 | target_range = mapping.get("range")
2226 | values = mapping.get("values", [])
2227 |
2228 | if target_range and values:
2229 | try:
2230 | # Apply values to the range
2231 | range_obj = sheet.Range(target_range)
2232 |
2233 | # Handle different data structures based on the shape of values
2234 | if isinstance(values, list):
2235 | if len(values) > 0 and isinstance(values[0], list):
2236 | # 2D array of values
2237 | for row_idx, row_data in enumerate(values):
2238 | for col_idx, cell_value in enumerate(row_data):
2239 | if row_idx < range_obj.Rows.Count and col_idx < range_obj.Columns.Count:
2240 | cell = range_obj.Cells(row_idx + 1, col_idx + 1)
2241 | cell.Value = cell_value
2242 | modifications["cells_modified"] += 1
2243 | else:
2244 | # 1D array of values - apply to a single row or column
2245 | if range_obj.Rows.Count == 1:
2246 | # Apply horizontally
2247 | for col_idx, cell_value in enumerate(values):
2248 | if col_idx < range_obj.Columns.Count:
2249 | cell = range_obj.Cells(1, col_idx + 1)
2250 | cell.Value = cell_value
2251 | modifications["cells_modified"] += 1
2252 | else:
2253 | # Apply vertically
2254 | for row_idx, cell_value in enumerate(values):
2255 | if row_idx < range_obj.Rows.Count:
2256 | cell = range_obj.Cells(row_idx + 1, 1)
2257 | cell.Value = cell_value
2258 | modifications["cells_modified"] += 1
2259 | else:
2260 | # Single value - apply to entire range
2261 | range_obj.Value = values
2262 | modifications["cells_modified"] += 1
2263 |
2264 | modifications["sheets_modified"].add(sheet_name)
2265 | modifications["data_mappings"].append({
2266 | "sheet": sheet_name,
2267 | "range": target_range,
2268 | "values_applied": True
2269 | })
2270 |
2271 | except Exception as e:
2272 | logger.error(f"Error applying values to range {target_range} in sheet {sheet_name}: {str(e)}", exc_info=True)
2273 | modifications["data_mappings"].append({
2274 | "sheet": sheet_name,
2275 | "range": target_range,
2276 | "values_applied": False,
2277 | "error": str(e)
2278 | })
2279 |
2280 | # Recalculate formulas
2281 | template.Application.CalculateFull()
2282 |
2283 | # Save the workbook to the specified output path
2284 | template.SaveAs(output_path)
2285 | template.Close(SaveChanges=False)
2286 |
2287 | # Create result object
2288 | result = {
2289 | "success": True,
2290 | "exemplar_path": exemplar_path,
2291 | "output_path": output_path,
2292 | "cells_modified": modifications["cells_modified"],
2293 | "sheets_modified": list(modifications["sheets_modified"]),
2294 | "mappings_applied": modifications["data_mappings"]
2295 | }
2296 |
2297 | return result
2298 |
2299 | except Exception as e:
2300 | logger.error(f"Error applying Excel template: {str(e)}", exc_info=True)
2301 | raise ToolError(f"Failed to apply Excel template: {str(e)}") from e
2302 |
2303 | async def _analyze_formulas_in_workbook(self, session, workbook):
2304 | """Analyze formulas across a workbook.
2305 |
2306 | Args:
2307 | session: Excel session
2308 | workbook: Workbook COM object
2309 |
2310 | Returns:
2311 | Dictionary with formula analysis results
2312 | """
2313 | # Initialize results
2314 | analysis = {
2315 | "total_formulas": 0,
2316 | "sheets_with_formulas": 0,
2317 | "formula_categories": {},
2318 | "complexity": {
2319 | "simple": 0,
2320 | "moderate": 0,
2321 | "complex": 0,
2322 | "very_complex": 0
2323 | },
2324 | "samples": {}
2325 | }
2326 |
2327 | # Function categories to track
2328 | categories = {
2329 | "mathematical": ["SUM", "AVERAGE", "MIN", "MAX", "COUNT", "PRODUCT", "ROUND"],
2330 | "logical": ["IF", "AND", "OR", "NOT", "SWITCH", "IFS"],
2331 | "lookup": ["VLOOKUP", "HLOOKUP", "INDEX", "MATCH", "XLOOKUP"],
2332 | "text": ["CONCATENATE", "LEFT", "RIGHT", "MID", "FIND", "SEARCH", "REPLACE"],
2333 | "date": ["TODAY", "NOW", "DATE", "DAY", "MONTH", "YEAR"],
2334 | "financial": ["PMT", "RATE", "NPV", "IRR", "FV", "PV"],
2335 | "statistical": ["STDEV", "VAR", "AVERAGE", "MEDIAN", "PERCENTILE"],
2336 | "reference": ["INDIRECT", "OFFSET", "ADDRESS", "ROW", "COLUMN"],
2337 | "database": ["DSUM", "DAVERAGE", "DCOUNT", "DGET"]
2338 | }
2339 |
2340 | for category in categories:
2341 | analysis["formula_categories"][category] = 0
2342 | analysis["samples"][category] = []
2343 |
2344 | # Analyze each sheet
2345 | for sheet_idx in range(1, workbook.Sheets.Count + 1):
2346 | sheet = workbook.Sheets(sheet_idx)
2347 |
2348 | # Skip chart sheets
2349 | if sheet.Type != 1: # xlWorksheet
2350 | continue
2351 |
2352 | used_range = sheet.UsedRange
2353 |
2354 | if not used_range:
2355 | continue
2356 |
2357 | sheet_has_formulas = False
2358 | sheet_formula_count = 0
2359 |
2360 | # Check cells for formulas
2361 | for row in range(1, min(used_range.Rows.Count, 1000) + 1):
2362 | for col in range(1, min(used_range.Columns.Count, 100) + 1):
2363 | try:
2364 | cell = used_range.Cells(row, col)
2365 |
2366 | if cell.HasFormula:
2367 | formula = cell.Formula
2368 | sheet_has_formulas = True
2369 | sheet_formula_count += 1
2370 | analysis["total_formulas"] += 1
2371 |
2372 | # Categorize formula
2373 | formula_upper = formula.upper()
2374 | categorized = False
2375 |
2376 | for category, functions in categories.items():
2377 | for func in functions:
2378 | if func.upper() + "(" in formula_upper:
2379 | analysis["formula_categories"][category] += 1
2380 |
2381 | # Store a sample if needed
2382 | if len(analysis["samples"][category]) < 3:
2383 | analysis["samples"][category].append({
2384 | "sheet": sheet.Name,
2385 | "cell": cell.Address(False, False),
2386 | "formula": formula
2387 | })
2388 |
2389 | categorized = True
2390 | break
2391 |
2392 | if categorized:
2393 | break
2394 |
2395 | # Assess complexity
2396 | complexity = self._assess_formula_complexity(formula)
2397 | analysis["complexity"][complexity] += 1
2398 | except Exception:
2399 | pass # Skip cells with errors
2400 |
2401 | if sheet_has_formulas:
2402 | analysis["sheets_with_formulas"] += 1
2403 |
2404 | return analysis
2405 |
2406 | def _categorize_template(self, template_analysis):
2407 | """Categorize a template based on its structure and contents.
2408 |
2409 | Args:
2410 | template_analysis: Analysis of the template
2411 |
2412 | Returns:
2413 | String indicating the template category
2414 | """
2415 | # Extract relevant information from analysis
2416 | sheets = template_analysis.get("worksheets", [])
2417 | sheet_names = [s.get("name", "").lower() for s in sheets]
2418 |
2419 | # Look for common sheet patterns
2420 | has_financial_sheets = any(name in ["income", "balance", "cash flow", "forecast", "budget", "revenue"] for name in sheet_names)
2421 | has_project_sheets = any(name in ["tasks", "timeline", "gantt", "resources", "schedule"] for name in sheet_names)
2422 | has_dashboard_sheets = any(name in ["dashboard", "summary", "overview", "kpi", "metrics"] for name in sheet_names)
2423 | has_data_sheets = any(name in ["data", "raw data", "source", "input"] for name in sheet_names)
2424 |
2425 | # Check formula patterns
2426 | formula_patterns = []
2427 | for sheet in sheets:
2428 | formula_patterns.extend(sheet.get("formula_patterns", []))
2429 |
2430 | # Count pattern types
2431 | financial_formulas = sum(p.get("count", 0) for p in formula_patterns if p.get("pattern") in ["sum", "calculation"])
2432 | lookup_formulas = sum(p.get("count", 0) for p in formula_patterns if p.get("pattern") in ["lookup", "reference"])
2433 |
2434 | # Determine category based on collected information
2435 | if has_financial_sheets and financial_formulas > 0:
2436 | if "forecast" in " ".join(sheet_names) or "projection" in " ".join(sheet_names):
2437 | return "financial_forecast"
2438 | elif "budget" in " ".join(sheet_names):
2439 | return "budget"
2440 | else:
2441 | return "financial"
2442 |
2443 | elif has_project_sheets:
2444 | return "project_management"
2445 |
2446 | elif has_dashboard_sheets and has_data_sheets:
2447 | return "dashboard"
2448 |
2449 | elif lookup_formulas > financial_formulas:
2450 | return "data_analysis"
2451 |
2452 | else:
2453 | return "general"
2454 |
2455 | def _adapt_text_to_context(self, text, context):
2456 | """Adapt text based on context for template adaptation.
2457 |
2458 | Args:
2459 | text: Original text string
2460 | context: Context description
2461 |
2462 | Returns:
2463 | Adapted text
2464 | """
2465 | if not text or not isinstance(text, str):
2466 | return text
2467 |
2468 | # Check what industry or domain is mentioned in the context
2469 | context_lower = context.lower()
2470 | industry = None
2471 |
2472 | # Try to detect the target industry or domain
2473 | if "healthcare" in context_lower or "medical" in context_lower or "hospital" in context_lower:
2474 | industry = "healthcare"
2475 | elif "tech" in context_lower or "software" in context_lower or "saas" in context_lower:
2476 | industry = "technology"
2477 | elif "retail" in context_lower or "shop" in context_lower or "store" in context_lower:
2478 | industry = "retail"
2479 | elif "finance" in context_lower or "bank" in context_lower or "investment" in context_lower:
2480 | industry = "finance"
2481 | elif "education" in context_lower or "school" in context_lower or "university" in context_lower:
2482 | industry = "education"
2483 | elif "manufacturing" in context_lower or "factory" in context_lower:
2484 | industry = "manufacturing"
2485 | elif "real estate" in context_lower or "property" in context_lower:
2486 | industry = "real_estate"
2487 |
2488 | # If no industry detected, return original text
2489 | if not industry:
2490 | return text
2491 |
2492 | # Adapt common business terms based on industry
2493 | text_lower = text.lower()
2494 |
2495 | # Handle healthcare industry adaptations
2496 | if industry == "healthcare":
2497 | if "customer" in text_lower:
2498 | return text.replace("Customer", "Patient").replace("customer", "patient")
2499 | elif "sales" in text_lower:
2500 | return text.replace("Sales", "Services").replace("sales", "services")
2501 | elif "product" in text_lower:
2502 | return text.replace("Product", "Treatment").replace("product", "treatment")
2503 | elif "revenue" in text_lower and "healthcare revenue" not in text_lower:
2504 | return text.replace("Revenue", "Healthcare Revenue").replace("revenue", "healthcare revenue")
2505 |
2506 | # Handle technology industry adaptations
2507 | elif industry == "technology":
2508 | if "customer" in text_lower:
2509 | return text.replace("Customer", "User").replace("customer", "user")
2510 | elif "sales" in text_lower:
2511 | return text.replace("Sales", "Subscriptions").replace("sales", "subscriptions")
2512 | elif "product" in text_lower:
2513 | return text.replace("Product", "Solution").replace("product", "solution")
2514 |
2515 | # Handle retail industry adaptations
2516 | elif industry == "retail":
2517 | if "customer" in text_lower:
2518 | return text.replace("Customer", "Shopper").replace("customer", "shopper")
2519 | elif "sales" in text_lower:
2520 | return text.replace("Sales", "Retail Sales").replace("sales", "retail sales")
2521 |
2522 | # Handle finance industry adaptations
2523 | elif industry == "finance":
2524 | if "customer" in text_lower:
2525 | return text.replace("Customer", "Client").replace("customer", "client")
2526 | elif "product" in text_lower:
2527 | return text.replace("Product", "Financial Product").replace("product", "financial product")
2528 |
2529 | # Handle education industry adaptations
2530 | elif industry == "education":
2531 | if "customer" in text_lower:
2532 | return text.replace("Customer", "Student").replace("customer", "student")
2533 | elif "sales" in text_lower:
2534 | return text.replace("Sales", "Enrollments").replace("sales", "enrollments")
2535 | elif "product" in text_lower:
2536 | return text.replace("Product", "Course").replace("product", "course")
2537 |
2538 | # Default - return original text
2539 | return text
2540 |
2541 | def _explain_formula(self, formula):
2542 | """Generate a natural language explanation of an Excel formula.
2543 |
2544 | Args:
2545 | formula: Excel formula string
2546 |
2547 | Returns:
2548 | Human-readable explanation
2549 | """
2550 | formula_upper = formula.upper()
2551 |
2552 | # SUM function
2553 | if "SUM(" in formula_upper:
2554 | match = re.search(r"SUM\(([^)]+)\)", formula_upper)
2555 | if match:
2556 | range_str = match.group(1)
2557 | return f"This formula calculates the sum of values in the range {range_str}."
2558 |
2559 | # AVERAGE function
2560 | elif "AVERAGE(" in formula_upper:
2561 | match = re.search(r"AVERAGE\(([^)]+)\)", formula_upper)
2562 | if match:
2563 | range_str = match.group(1)
2564 | return f"This formula calculates the average (mean) of values in the range {range_str}."
2565 |
2566 | # VLOOKUP function
2567 | elif "VLOOKUP(" in formula_upper:
2568 | params = formula_upper.split("VLOOKUP(")[1].split(")", 1)[0].split(",")
2569 | lookup_value = params[0] if len(params) > 0 else "?"
2570 | table_array = params[1] if len(params) > 1 else "?"
2571 | col_index = params[2] if len(params) > 2 else "?"
2572 | exact_match = "FALSE" in params[3] if len(params) > 3 else False
2573 |
2574 | match_type = "exact match" if exact_match else "closest match (approximate match)"
2575 | return f"This formula looks up {lookup_value} in the first column of {table_array}, and returns the value from column {col_index}. It finds the {match_type}."
2576 |
2577 | # IF function
2578 | elif "IF(" in formula_upper:
2579 | try:
2580 | # This is a simplistic parsing - real parsing would be more complex
2581 | content = formula_upper.split("IF(")[1].split(")", 1)[0]
2582 | parts = []
2583 | depth = 0
2584 | current = ""
2585 |
2586 | for char in content:
2587 | if char == "," and depth == 0:
2588 | parts.append(current)
2589 | current = ""
2590 | else:
2591 | if char == "(":
2592 | depth += 1
2593 | elif char == ")":
2594 | depth -= 1
2595 | current += char
2596 |
2597 | if current:
2598 | parts.append(current)
2599 |
2600 | condition = parts[0] if len(parts) > 0 else "?"
2601 | true_value = parts[1] if len(parts) > 1 else "?"
2602 | false_value = parts[2] if len(parts) > 2 else "?"
2603 |
2604 | return f"This formula tests if {condition}. If true, it returns {true_value}, otherwise it returns {false_value}."
2605 | except Exception:
2606 | # Fallback if parsing fails
2607 | return "This formula uses an IF statement to return different values based on a condition."
2608 |
2609 | # INDEX/MATCH
2610 | elif "INDEX(" in formula_upper and "MATCH(" in formula_upper:
2611 | return "This formula uses the INDEX/MATCH combination to look up a value in a table. INDEX returns a value at a specific position, and MATCH finds the position of a lookup value."
2612 |
2613 | # Simple calculations
2614 | elif "+" in formula or "-" in formula or "*" in formula or "/" in formula:
2615 | # Check if it's a simple calculation without functions
2616 | if not any(func in formula_upper for func in ["SUM(", "AVERAGE(", "IF(", "VLOOKUP(", "INDEX("]):
2617 | operations = []
2618 | if "+" in formula:
2619 | operations.append("addition")
2620 | if "-" in formula:
2621 | operations.append("subtraction")
2622 | if "*" in formula:
2623 | operations.append("multiplication")
2624 | if "/" in formula:
2625 | operations.append("division")
2626 |
2627 | ops_text = " and ".join(operations)
2628 | return f"This formula performs {ops_text} on the specified values or cell references."
2629 |
2630 | # Fallback for unrecognized or complex formulas
2631 | return "This formula performs a calculation on the referenced cells. For complex formulas, consider breaking it down into its component parts to understand it better."
2632 |
2633 | def _categorize_formula(self, formula):
2634 | """Categorize a formula based on its functions and structure.
2635 |
2636 | Args:
2637 | formula: Excel formula string
2638 |
2639 | Returns:
2640 | Category string
2641 | """
2642 | formula_upper = formula.upper()
2643 |
2644 | # Mathematical
2645 | if any(func in formula_upper for func in ["SUM(", "AVERAGE(", "MIN(", "MAX(", "COUNT(", "PRODUCT(", "ROUND("]):
2646 | return "mathematical"
2647 |
2648 | # Logical
2649 | elif any(func in formula_upper for func in ["IF(", "AND(", "OR(", "NOT(", "SWITCH(", "IFS("]):
2650 | return "logical"
2651 |
2652 | # Lookup
2653 | elif any(func in formula_upper for func in ["VLOOKUP(", "HLOOKUP(", "INDEX(", "MATCH(", "XLOOKUP("]):
2654 | return "lookup"
2655 |
2656 | # Text
2657 | elif any(func in formula_upper for func in ["CONCATENATE(", "LEFT(", "RIGHT(", "MID(", "FIND(", "SEARCH(", "REPLACE("]):
2658 | return "text"
2659 |
2660 | # Date
2661 | elif any(func in formula_upper for func in ["TODAY(", "NOW(", "DATE(", "DAY(", "MONTH(", "YEAR("]):
2662 | return "date"
2663 |
2664 | # Financial
2665 | elif any(func in formula_upper for func in ["PMT(", "RATE(", "NPV(", "IRR(", "FV(", "PV("]):
2666 | return "financial"
2667 |
2668 | # Statistical
2669 | elif any(func in formula_upper for func in ["STDEV(", "VAR(", "MEDIAN(", "PERCENTILE("]):
2670 | return "statistical"
2671 |
2672 | # Reference
2673 | elif any(func in formula_upper for func in ["INDIRECT(", "OFFSET(", "ADDRESS(", "ROW(", "COLUMN("]):
2674 | return "reference"
2675 |
2676 | # Database
2677 | elif any(func in formula_upper for func in ["DSUM(", "DAVERAGE(", "DCOUNT(", "DGET("]):
2678 | return "database"
2679 |
2680 | # Simple calculation
2681 | elif any(op in formula for op in ["+", "-", "*", "/"]):
2682 | return "calculation"
2683 |
2684 | # Default/unknown
2685 | return "other"
2686 |
2687 | def _assess_formula_complexity(self, formula):
2688 | """Assess the complexity of a formula.
2689 |
2690 | Args:
2691 | formula: Excel formula string
2692 |
2693 | Returns:
2694 | Complexity level (simple, moderate, complex, very_complex)
2695 | """
2696 | # Count various aspects of the formula
2697 | formula_length = len(formula)
2698 | function_count = formula.upper().count("(")
2699 | nesting_level = 0
2700 | max_nesting = 0
2701 |
2702 | # Calculate nesting depth
2703 | for char in formula:
2704 | if char == "(":
2705 | nesting_level += 1
2706 | max_nesting = max(max_nesting, nesting_level)
2707 | elif char == ")":
2708 | nesting_level -= 1
2709 |
2710 | # Count references
2711 | reference_count = len(re.findall(r"[A-Z]+[0-9]+(?::[A-Z]+[0-9]+)?", formula))
2712 |
2713 | # Count operators
2714 | operator_count = sum(formula.count(op) for op in ["+", "-", "*", "/", "=", "<", ">", "&"])
2715 |
2716 | # Calculate a weighted complexity score
2717 | score = (
2718 | min(10, formula_length / 40) + # Length: max 10 points
2719 | function_count * 1.5 + # Functions: 1.5 points each
2720 | max_nesting * 2 + # Max nesting: 2 points per level
2721 | reference_count * 0.5 + # References: 0.5 points each
2722 | operator_count * 0.5 # Operators: 0.5 points each
2723 | )
2724 |
2725 | # Determine complexity level
2726 | if score < 5:
2727 | return "simple"
2728 | elif score < 10:
2729 | return "moderate"
2730 | elif score < 20:
2731 | return "complex"
2732 | else:
2733 | return "very_complex"
2734 |
2735 | def _get_formula_dependency_level(self, formula):
2736 | """Determine how many other cells a formula depends on.
2737 |
2738 | Args:
2739 | formula: Excel formula string
2740 |
2741 | Returns:
2742 | Dependency level (low, medium, high)
2743 | """
2744 | # Count cell references and ranges
2745 | references = re.findall(r"[A-Z]+[0-9]+(?::[A-Z]+[0-9]+)?", formula)
2746 |
2747 | # Count individual cells
2748 | cell_count = 0
2749 | for ref in references:
2750 | if ":" in ref:
2751 | # It's a range
2752 | try:
2753 | start, end = ref.split(":")
2754 | start_col = re.search(r"[A-Z]+", start).group(0)
2755 | start_row = int(re.search(r"[0-9]+", start).group(0))
2756 | end_col = re.search(r"[A-Z]+", end).group(0)
2757 | end_row = int(re.search(r"[0-9]+", end).group(0))
2758 |
2759 | # Convert column letters to numbers
2760 | start_col_num = 0
2761 | for char in start_col:
2762 | start_col_num = start_col_num * 26 + (ord(char) - ord('A') + 1)
2763 |
2764 | end_col_num = 0
2765 | for char in end_col:
2766 | end_col_num = end_col_num * 26 + (ord(char) - ord('A') + 1)
2767 |
2768 | # Calculate cells in range
2769 | cells_in_range = (end_row - start_row + 1) * (end_col_num - start_col_num + 1)
2770 | cell_count += cells_in_range
2771 | except Exception:
2772 | # Fallback if parsing fails
2773 | cell_count += 10 # Assume a moderate size range
2774 | else:
2775 | # Single cell
2776 | cell_count += 1
2777 |
2778 | # Determine dependency level
2779 | if cell_count <= 3:
2780 | return "low"
2781 | elif cell_count <= 10:
2782 | return "medium"
2783 | else:
2784 | return "high"
2785 |
2786 | def _check_formula_volatility(self, formula):
2787 | """Check if a formula contains volatile functions.
2788 |
2789 | Args:
2790 | formula: Excel formula string
2791 |
2792 | Returns:
2793 | Volatility level (none, low, high)
2794 | """
2795 | formula_upper = formula.upper()
2796 |
2797 | # Highly volatile functions
2798 | high_volatility = ["NOW(", "TODAY(", "RAND(", "RANDBETWEEN("]
2799 | if any(func in formula_upper for func in high_volatility):
2800 | return "high"
2801 |
2802 | # Low volatility functions
2803 | low_volatility = ["OFFSET(", "INDIRECT(", "CELL(", "INFO("]
2804 | if any(func in formula_upper for func in low_volatility):
2805 | return "low"
2806 |
2807 | # Non-volatile
2808 | return "none"
2809 |
2810 | def _get_calculation_mode_name(self, mode_value):
2811 | """Convert Excel calculation mode numeric value to name.
2812 |
2813 | Args:
2814 | mode_value: Numeric value of calculation mode
2815 |
2816 | Returns:
2817 | String name of calculation mode
2818 | """
2819 | modes = {
2820 | -4105: "Automatic",
2821 | -4135: "Manual",
2822 | -4133: "Semiautomatic"
2823 | }
2824 |
2825 | return modes.get(mode_value, f"Unknown ({mode_value})")
2826 |
2827 | async def _generate_excel_macro(
2828 | self,
2829 | session: ExcelSession,
2830 | instruction: str,
2831 | file_path: Optional[str] = None,
2832 | template: Optional[str] = None,
2833 | test_execution: bool = False,
2834 | security_level: str = "standard"
2835 | ) -> Dict[str, Any]:
2836 | """Generate Excel VBA macro based on instructions.
2837 |
2838 | Args:
2839 | session: Excel session
2840 | instruction: Natural language instruction
2841 | file_path: Path to Excel file
2842 | template: Optional template code or path to template file
2843 | test_execution: Whether to test execute the macro
2844 | security_level: Security level for macro execution
2845 |
2846 | Returns:
2847 | Dictionary with macro generation results
2848 | """
2849 | result = {
2850 | "success": True,
2851 | "macro_generated": True,
2852 | "macro_code": "",
2853 | "execution_result": None
2854 | }
2855 |
2856 | # Check if template is a file path
2857 | template_code = ""
2858 | if template and os.path.exists(template):
2859 | try:
2860 | # Use read_file_content to load the template
2861 | template_code = await read_file_content(template)
2862 | result["template_source"] = "file"
2863 | except Exception as e:
2864 | logger.warning(f"Failed to read template file: {str(e)}")
2865 | template_code = template or ""
2866 | result["template_source"] = "text"
2867 | else:
2868 | template_code = template or ""
2869 | result["template_source"] = "text"
2870 |
2871 | # Generate the macro code based on instruction
2872 | # This would typically be done by the LLM in a real implementation
2873 | macro_code = f"' Generated VBA Macro based on instruction:\n' {instruction}\n\n"
2874 |
2875 | if template_code:
2876 | macro_code += f"' Based on template:\n{template_code}\n\n"
2877 |
2878 | # Add a simple macro as an example
2879 | macro_code += """
2880 | Sub ExampleMacro()
2881 | ' This is a placeholder for actual generated code
2882 | MsgBox "Macro executed successfully!"
2883 | End Sub
2884 | """
2885 |
2886 | result["macro_code"] = macro_code
2887 |
2888 | # Save the macro code to a separate file for reference if file_path is provided
2889 | if file_path:
2890 | macro_file_path = os.path.splitext(file_path)[0] + "_macro.bas"
2891 | try:
2892 | await write_file_content(macro_file_path, macro_code)
2893 | result["macro_file"] = macro_file_path
2894 | except Exception as e:
2895 | logger.warning(f"Failed to save macro to file: {str(e)}")
2896 |
2897 | # If file_path provided, add the macro to the workbook
2898 | if file_path and os.path.exists(file_path):
2899 | # Open the workbook and add the macro
2900 | # Implementation would depend on Excel VBA model
2901 | pass
2902 |
2903 | # Test execution if requested
2904 | if test_execution and file_path and os.path.exists(file_path):
2905 | # Execute the macro
2906 | # Implementation would depend on Excel VBA model
2907 | result["execution_result"] = "Macro executed successfully"
2908 |
2909 | return result
2910 |
2911 |
2912 | def register_excel_spreadsheet_tools(mcp_server):
2913 | """Registers Excel Spreadsheet Tools with the MCP server.
2914 |
2915 | Args:
2916 | mcp_server: MCP server instance
2917 |
2918 | Returns:
2919 | ExcelSpreadsheetTools instance
2920 | """
2921 | # Initialize the tool
2922 | excel_tools = ExcelSpreadsheetTools(mcp_server)
2923 |
2924 | # Register tools with MCP server
2925 | # These functions are now using state management
2926 | mcp_server.tool(name="excel_execute")(excel_tools.excel_execute)
2927 | mcp_server.tool(name="excel_learn_and_apply")(excel_tools.excel_learn_and_apply)
2928 | mcp_server.tool(name="excel_analyze_formulas")(excel_tools.excel_analyze_formulas)
2929 | mcp_server.tool(name="excel_generate_macro")(excel_tools.excel_generate_macro)
2930 | mcp_server.tool(name="excel_export_sheet_to_csv")(excel_tools.excel_export_sheet_to_csv)
2931 | mcp_server.tool(name="excel_import_csv_to_sheet")(excel_tools.excel_import_csv_to_sheet)
2932 |
2933 | return excel_tools
```