# Directory Structure ``` ├── .gitignore ├── CHANGELOG.md ├── demo.gif ├── Dockerfile ├── DOCUMENTATION.md ├── LICENSE ├── odoo_config.json.example ├── pyproject.toml ├── README.md ├── run_server.py ├── src │ └── odoo_mcp │ ├── __init__.py │ ├── __main__.py │ ├── extensions.py │ ├── models.py │ ├── odoo_client.py │ ├── prompts.py │ ├── resources.py │ ├── server.py │ ├── tools_accounting.py │ ├── tools_inventory.py │ ├── tools_purchase.py │ └── tools_sales.py └── validation.py ``` # Files -------------------------------------------------------------------------------- /.gitignore: -------------------------------------------------------------------------------- ``` # Python bytecode __pycache__/ *.py[cod] *$py.class # Distribution / packaging dist/ build/ *.egg-info/ *.egg # Virtual environments venv/ env/ ENV/ .venv-* # Logs logs/ *.log # IDE specific files .idea/ .vscode/ *.swp *.swo # Environment configuration .env .env.local .env.development.local .env.test.local .env.production.local # Odoo specific odoo_config.json # System files .DS_Store Thumbs.db # Unit test / coverage reports htmlcov/ .tox/ .coverage .coverage.* .cache nosetests.xml coverage.xml *.cover .hypothesis/ .github/ ``` -------------------------------------------------------------------------------- /README.md: -------------------------------------------------------------------------------- ```markdown # Odoo MCP Improved  <div align="center">  [](https://pypi.org/project/odoo-mcp-improved/) [](https://pypi.org/project/odoo-mcp-improved/) [](https://opensource.org/licenses/MIT) **Enhanced Model Context Protocol (MCP) server for Odoo ERP with advanced tools for sales, purchases, inventory and accounting** </div> --- ## 📋 Table of Contents - [Overview](#-overview) - [Features](#-features) - [Installation](#-installation) - [Configuration](#-configuration) - [Usage](#-usage) - [Tools Reference](#-tools-reference) - [Resources Reference](#-resources-reference) - [Prompts](#-prompts) - [Claude Desktop Integration](#-claude-desktop-integration) - [License](#-license) --- ## 🔍 Overview Odoo MCP Improved is a comprehensive implementation of the [Model Context Protocol (MCP)](https://modelcontextprotocol.io/) for Odoo ERP systems. It provides a bridge between large language models like Claude and your Odoo instance, enabling AI assistants to interact directly with your business data and processes. This extended version enhances the original MCP-Odoo implementation with advanced tools and resources for sales, purchases, inventory management, and accounting, making it a powerful solution for AI-assisted business operations. --- ## ✨ Features ### Core Capabilities - **Seamless Odoo Integration**: Connect directly to your Odoo instance via XML-RPC - **Comprehensive Data Access**: Query and manipulate data across all Odoo modules - **Modular Architecture**: Easily extensible with new tools and resources - **Robust Error Handling**: Clear error messages and validation for reliable operation ### Business Domain Support - **Sales Management**: Order tracking, customer insights, and performance analysis - **Purchase Management**: Supplier management, order processing, and performance metrics - **Inventory Management**: Stock monitoring, inventory adjustments, and turnover analysis - **Accounting**: Financial reporting, journal entries, and ratio analysis ### Advanced Functionality - **Analytical Tools**: Business intelligence capabilities across all domains - **Specialized Prompts**: Pre-configured prompts for common business scenarios - **Resource URIs**: Standardized access to Odoo data through URI patterns - **Performance Optimization**: Caching and efficient data retrieval --- ## 📦 Installation ### Using pip ```bash pip install odoo-mcp-improved ``` ### From Source ```bash git clone https://github.com/hachecito/odoo-mcp-improved.git cd odoo-mcp-improved pip install -e . ``` --- ## ⚙️ Configuration ### Environment Variables ```bash export ODOO_URL=https://your-odoo-instance.com export ODOO_DB=your_database export ODOO_USERNAME=your_username export ODOO_PASSWORD=your_password ``` ### Configuration File Create an `odoo_config.json` file in your working directory: ```json { "url": "https://your-odoo-instance.com", "db": "your_database", "username": "your_username", "password": "your_password" } ``` --- ## 🚀 Usage ### Running the Server ```bash # Using the module python -m odoo_mcp ``` ### Example Interactions ``` # Sales Analysis Using the Odoo MCP, analyze our sales performance for the last quarter and identify our top-selling products. # Inventory Check Check the current stock levels for product XYZ across all warehouses. # Financial Analysis Calculate our current liquidity and profitability ratios based on the latest financial data. # Customer Insights Provide insights on customer ABC's purchase history and payment patterns. ``` --- ## 🤖 Claude Desktop Integration Add the following to your `claude_desktop_config.json`: ```json { "mcpServers": { "odoo": { "command": "python", "args": ["-m", "odoo_mcp"], "env": { "ODOO_URL": "https://your-odoo-instance.com", "ODOO_DB": "your_database", "ODOO_USERNAME": "your_username", "ODOO_PASSWORD": "your_password" } } } } ``` --- ## 🛠️ Tools Reference ### Sales Tools | Tool | Description | |------|-------------| | `search_sales_orders` | Search for sales orders with advanced filtering | | `create_sales_order` | Create a new sales order | | `analyze_sales_performance` | Analyze sales performance by period, product, or customer | | `get_customer_insights` | Get detailed insights about a specific customer | ### Purchase Tools | Tool | Description | |------|-------------| | `search_purchase_orders` | Search for purchase orders with advanced filtering | | `create_purchase_order` | Create a new purchase order | | `analyze_supplier_performance` | Analyze supplier performance metrics | ### Inventory Tools | Tool | Description | |------|-------------| | `check_product_availability` | Check stock availability for products | | `create_inventory_adjustment` | Create inventory adjustment entries | | `analyze_inventory_turnover` | Calculate and analyze inventory turnover metrics | ### Accounting Tools | Tool | Description | |------|-------------| | `search_journal_entries` | Search for accounting journal entries | | `create_journal_entry` | Create a new journal entry | | `analyze_financial_ratios` | Calculate key financial ratios | --- ## 🔗 Resources Reference ### Sales Resources | URI | Description | |-----|-------------| | `odoo://sales/orders` | List sales orders | | `odoo://sales/order/{order_id}` | Get details of a specific sales order | | `odoo://sales/products` | List sellable products | | `odoo://sales/customers` | List customers | ### Purchase Resources | URI | Description | |-----|-------------| | `odoo://purchase/orders` | List purchase orders | | `odoo://purchase/order/{order_id}` | Get details of a specific purchase order | | `odoo://purchase/suppliers` | List suppliers | ### Inventory Resources | URI | Description | |-----|-------------| | `odoo://inventory/products` | List products in inventory | | `odoo://inventory/stock/{location_id}` | Get stock levels at a specific location | | `odoo://inventory/movements` | List inventory movements | ### Accounting Resources | URI | Description | |-----|-------------| | `odoo://accounting/accounts` | List accounting accounts | | `odoo://accounting/journal_entries` | List journal entries | | `odoo://accounting/reports/{report_type}` | Get financial reports | --- ## 💬 Prompts Odoo MCP Improved includes specialized prompts for different business scenarios: ### Sales Analysis Prompts - Sales trend analysis - Customer segmentation - Product performance evaluation - Sales team performance ### Inventory Management Prompts - Stock optimization - Reordering suggestions - Warehouse efficiency analysis - Product movement patterns ### Human Resources Prompts - Staff planning - Scheduling optimization - Performance evaluation - Resource allocation ### Financial Analysis Prompts - Ratio interpretation - Cash flow analysis - Budget variance analysis - Financial health assessment --- ## 📄 License This project is licensed under the MIT License - see the LICENSE file for details. This repo is extended from [mcp-odoo](https://github.com/tuanle96/mcp-odoo) - [Lê Anh Tuấn](https://github.com/tuanle96) --- <div align="center"> **Odoo MCP Improved** - Empowering AI assistants with comprehensive Odoo ERP capabilities </div> ``` -------------------------------------------------------------------------------- /src/odoo_mcp/__init__.py: -------------------------------------------------------------------------------- ```python """ Actualización del archivo __init__.py para incluir todos los módulos """ from . import odoo_client from . import server from . import models from . import prompts from . import tools_sales from . import tools_purchase from . import tools_inventory from . import tools_accounting from . import resources from . import extensions __all__ = [ 'odoo_client', 'server', 'models', 'prompts', 'tools_sales', 'tools_purchase', 'tools_inventory', 'tools_accounting', 'resources', 'extensions' ] ``` -------------------------------------------------------------------------------- /src/odoo_mcp/resources.py: -------------------------------------------------------------------------------- ```python """ Integración de recursos para MCP-Odoo """ from mcp.server.fastmcp import FastMCP def register_sales_resources(mcp: FastMCP) -> None: """Registra recursos relacionados con ventas""" pass def register_purchase_resources(mcp: FastMCP) -> None: """Registra recursos relacionados con compras""" pass def register_inventory_resources(mcp: FastMCP) -> None: """Registra recursos relacionados con inventario""" pass def register_accounting_resources(mcp: FastMCP) -> None: """Registra recursos relacionados con contabilidad""" pass def register_all_resources(mcp: FastMCP) -> None: """Registra todos los recursos disponibles""" register_sales_resources(mcp) register_purchase_resources(mcp) register_inventory_resources(mcp) register_accounting_resources(mcp) ``` -------------------------------------------------------------------------------- /Dockerfile: -------------------------------------------------------------------------------- ```dockerfile FROM python:3.10-slim WORKDIR /app # Install system dependencies RUN apt-get update && apt-get install -y --no-install-recommends \ gcc \ procps \ && rm -rf /var/lib/apt/lists/* # Copy source code COPY . /app/ # Create logs directory RUN mkdir -p /app/logs && chmod 777 /app/logs # Install Python dependencies and the package RUN pip install --no-cache-dir "mcp[cli]" && \ pip install --no-cache-dir -e . # Set environment variables (can be overridden at runtime) ENV ODOO_URL="" ENV ODOO_DB="" ENV ODOO_USERNAME="" ENV ODOO_PASSWORD="" ENV ODOO_TIMEOUT="30" ENV ODOO_VERIFY_SSL="1" ENV DEBUG="0" # Make run_server.py executable RUN chmod +x run_server.py # Set stdout/stderr to unbuffered mode ENV PYTHONUNBUFFERED=1 # Run the custom MCP server script instead of the module ENTRYPOINT ["python", "run_server.py"] ``` -------------------------------------------------------------------------------- /src/odoo_mcp/extensions.py: -------------------------------------------------------------------------------- ```python """ Integración de todos los módulos en el servidor MCP principal """ from mcp.server.fastmcp import FastMCP from .prompts import register_all_prompts from .resources import register_all_resources from .tools_sales import register_sales_tools from .tools_purchase import register_purchase_tools from .tools_inventory import register_inventory_tools from .tools_accounting import register_accounting_tools def register_all_extensions(mcp: FastMCP) -> None: """ Registra todas las extensiones (prompts, recursos y herramientas) en el servidor MCP """ # Registrar prompts register_all_prompts(mcp) # Registrar recursos register_all_resources(mcp) # Registrar herramientas register_sales_tools(mcp) register_purchase_tools(mcp) register_inventory_tools(mcp) register_accounting_tools(mcp) ``` -------------------------------------------------------------------------------- /CHANGELOG.md: -------------------------------------------------------------------------------- ```markdown # Changelog All notable changes to this project will be documented in this file. ## [0.0.3] - 2025-03-18 ### Fixed - Fixed `OdooClient` class by adding missing methods: `get_models()`, `get_model_info()`, `get_model_fields()`, `search_read()`, and `read_records()` - Ensured compatibility with different Odoo versions by using only basic fields when retrieving model information ### Added - Support for retrieving all models from an Odoo instance - Support for retrieving detailed information about specific models - Support for searching and reading records with various filtering options ## [0.0.2] - 2025-03-18 ### Fixed - Added missing dependencies in pyproject.toml: `mcp>=0.1.1`, `requests>=2.31.0`, `xmlrpc>=0.4.1` ## [0.0.1] - 2025-03-18 ### Added - Initial release with basic Odoo XML-RPC client support - MCP Server integration for Odoo - Command-line interface for quick setup and testing ``` -------------------------------------------------------------------------------- /pyproject.toml: -------------------------------------------------------------------------------- ```toml [build-system] requires = ["setuptools>=61.0"] build-backend = "setuptools.build_meta" [project] name = "odoo-mcp-improved" version = "1.0.2" description = "MCP Server for Odoo Integration" readme = "README.md" requires-python = ">=3.10" license = {text = "MIT"} classifiers = [ "License :: OSI Approved :: MIT License", "Programming Language :: Python :: 3", "Programming Language :: Python :: 3.10", "Operating System :: OS Independent", ] keywords = ["odoo", "mcp", "server", "ai"] authors = [ {name = "Yhasmani Valdes", email = "[email protected]"} ] dependencies = [ "mcp>=0.1.1", "requests>=2.31.0", "pypi-xmlrpc==2020.12.3", ] [project.urls] Homepage = "https://github.com/hachecito/odoo-mcp-improved" Issues = "https://github.com/hachecito/odoo-mcp-improved/issues" [project.optional-dependencies] dev = [ "black", "isort", "mypy", "ruff", "build", "twine", ] [project.scripts] odoo-mcp-improved = "odoo_mcp.__main__:main" [tool.setuptools] package-dir = {"" = "src"} packages = ["odoo_mcp"] [tool.black] line-length = 88 target-version = ["py310"] [tool.isort] profile = "black" line_length = 88 [tool.mypy] python_version = "3.10" warn_return_any = true warn_unused_configs = true disallow_untyped_defs = true disallow_incomplete_defs = true ``` -------------------------------------------------------------------------------- /src/odoo_mcp/__main__.py: -------------------------------------------------------------------------------- ```python """ Command line entry point for the Odoo MCP Server """ import sys import asyncio import traceback import os from .server import mcp def main() -> int: """ Run the MCP server """ try: print("=== ODOO MCP SERVER STARTING ===", file=sys.stderr) print(f"Python version: {sys.version}", file=sys.stderr) print("Environment variables:", file=sys.stderr) for key, value in os.environ.items(): if key.startswith("ODOO_"): if key == "ODOO_PASSWORD": print(f" {key}: ***hidden***", file=sys.stderr) else: print(f" {key}: {value}", file=sys.stderr) # Check if server instance has the run_stdio method methods = [method for method in dir(mcp) if not method.startswith('_')] print(f"Available methods on mcp object: {methods}", file=sys.stderr) print("Starting MCP server with run() method...", file=sys.stderr) sys.stderr.flush() # Ensure log information is written immediately # Use the run() method directly mcp.run() # If execution reaches here, the server exited normally print("MCP server stopped normally", file=sys.stderr) return 0 except KeyboardInterrupt: print("MCP server stopped by user", file=sys.stderr) return 0 except Exception as e: print(f"Error starting server: {e}", file=sys.stderr) print("Exception details:", file=sys.stderr) traceback.print_exc(file=sys.stderr) print("\nServer object information:", file=sys.stderr) print(f"MCP object type: {type(mcp)}", file=sys.stderr) print(f"MCP object dir: {dir(mcp)}", file=sys.stderr) return 1 if __name__ == "__main__": sys.exit(main()) ``` -------------------------------------------------------------------------------- /run_server.py: -------------------------------------------------------------------------------- ```python #!/usr/bin/env python """ Standalone script to run the Odoo MCP server Uses the same approach as in the official MCP SDK examples """ import sys import os import asyncio import anyio import logging import datetime from mcp.server.stdio import stdio_server from mcp.server.lowlevel import Server import mcp.types as types from odoo_mcp.server import mcp # FastMCP instance from our code def setup_logging(): """Set up logging to both console and file""" log_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "logs") os.makedirs(log_dir, exist_ok=True) timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") log_file = os.path.join(log_dir, f"mcp_server_{timestamp}.log") # Configure logging logger = logging.getLogger() logger.setLevel(logging.DEBUG) # Console handler console_handler = logging.StreamHandler() console_handler.setLevel(logging.INFO) # File handler file_handler = logging.FileHandler(log_file) file_handler.setLevel(logging.DEBUG) # Format for both handlers formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') console_handler.setFormatter(formatter) file_handler.setFormatter(formatter) # Add handlers to logger logger.addHandler(console_handler) logger.addHandler(file_handler) return logger def main() -> int: """ Run the MCP server based on the official examples """ logger = setup_logging() try: logger.info("=== ODOO MCP SERVER STARTING ===") logger.info(f"Python version: {sys.version}") logger.info("Environment variables:") for key, value in os.environ.items(): if key.startswith("ODOO_"): if key == "ODOO_PASSWORD": logger.info(f" {key}: ***hidden***") else: logger.info(f" {key}: {value}") logger.info(f"MCP object type: {type(mcp)}") # Run server in stdio mode like the official examples async def arun(): logger.info("Starting Odoo MCP server with stdio transport...") async with stdio_server() as streams: logger.info("Stdio server initialized, running MCP server...") await mcp._mcp_server.run( streams[0], streams[1], mcp._mcp_server.create_initialization_options() ) # Run server anyio.run(arun) logger.info("MCP server stopped normally") return 0 except Exception as e: logger.error(f"Fatal error: {e}", exc_info=True) return 1 if __name__ == "__main__": sys.exit(main()) ``` -------------------------------------------------------------------------------- /src/odoo_mcp/prompts.py: -------------------------------------------------------------------------------- ```python """ Implementación de prompts para MCP-Odoo """ from mcp.server.fastmcp import FastMCP def register_sales_prompts(mcp: FastMCP) -> None: """Registra prompts relacionados con ventas""" @mcp.prompt( name="sales_analysis", description="Analiza las ventas de un período específico y proporciona insights clave" ) def sales_analysis_prompt() -> str: return """ Analiza las ventas del último {period} (ej. 'month', 'quarter', 'year') y proporciona insights sobre: - Productos más vendidos (top 5) - Clientes principales (top 5) - Tendencias de ventas (comparación con período anterior si es posible) - Rendimiento por vendedor (si aplica) - Recomendaciones accionables para mejorar las ventas. Utiliza las herramientas disponibles como 'search_sales_orders' y 'execute_method' para obtener los datos necesarios de Odoo. """ def register_purchase_prompts(mcp: FastMCP) -> None: """Registra prompts relacionados con compras""" @mcp.prompt( name="purchase_analysis", description="Analiza las órdenes de compra y el rendimiento de proveedores" ) def purchase_analysis_prompt() -> str: return """ Analiza las compras realizadas en el último {period} (ej. 'month', 'quarter', 'year') y proporciona insights sobre: - Productos más comprados (top 5) - Proveedores principales (top 5 por volumen/valor) - Tendencias de compras - Plazos de entrega promedio por proveedor - Recomendaciones para optimizar compras o negociar con proveedores. Utiliza las herramientas disponibles como 'search_purchase_orders' para obtener los datos necesarios de Odoo. """ def register_inventory_prompts(mcp: FastMCP) -> None: """Registra prompts relacionados con inventario""" @mcp.prompt( name="inventory_management", description="Analiza el estado del inventario y proporciona recomendaciones" ) def inventory_management_prompt() -> str: return """ Analiza el estado actual del inventario y proporciona información sobre: - Productos con bajo stock (por debajo del mínimo si está configurado) - Productos con exceso de stock (por encima del máximo o sin movimiento) - Valoración actual del inventario - Rotación de inventario para productos clave - Recomendaciones para ajustes, reabastecimiento o liquidación de stock. Utiliza las herramientas disponibles como 'check_product_availability' y 'analyze_inventory_turnover' para obtener los datos necesarios de Odoo. """ def register_accounting_prompts(mcp: FastMCP) -> None: """Registra prompts relacionados con contabilidad""" @mcp.prompt( name="financial_analysis", description="Realiza un análisis financiero básico" ) def financial_analysis_prompt() -> str: return """ Realiza un análisis financiero para el período {period} (ej. 'last_month', 'last_quarter', 'year_to_date') y proporciona: - Resumen del estado de resultados (ingresos, gastos, beneficio) - Resumen del balance general (activos, pasivos, patrimonio) - Ratios financieros clave (ej. liquidez, rentabilidad) - Comparación con el período anterior si es posible - Observaciones o alertas importantes. Utiliza las herramientas disponibles como 'search_journal_entries' y 'analyze_financial_ratios' para obtener los datos necesarios de Odoo. """ def register_all_prompts(mcp: FastMCP) -> None: """Registra todos los prompts disponibles""" register_sales_prompts(mcp) register_purchase_prompts(mcp) register_inventory_prompts(mcp) register_accounting_prompts(mcp) ``` -------------------------------------------------------------------------------- /src/odoo_mcp/models.py: -------------------------------------------------------------------------------- ```python """ Implementación de modelos Pydantic para MCP-Odoo """ from typing import Any, Dict, List, Optional, Union from pydantic import BaseModel, Field # Modelos para Ventas class SalesOrderLineCreate(BaseModel): """Línea de pedido de venta para creación""" product_id: int = Field(description="ID del producto") product_uom_qty: float = Field(description="Cantidad") price_unit: Optional[float] = Field(None, description="Precio unitario (opcional, Odoo puede calcularlo)") class SalesOrderCreate(BaseModel): """Datos para crear un pedido de venta""" partner_id: int = Field(description="ID del cliente") order_lines: List[SalesOrderLineCreate] = Field(description="Líneas del pedido") date_order: Optional[str] = Field(None, description="Fecha del pedido (YYYY-MM-DD)") class SalesOrderFilter(BaseModel): """Filtros para búsqueda de pedidos de venta""" partner_id: Optional[int] = Field(None, description="Filtrar por cliente ID") date_from: Optional[str] = Field(None, description="Fecha inicial (YYYY-MM-DD)") date_to: Optional[str] = Field(None, description="Fecha final (YYYY-MM-DD)") state: Optional[str] = Field(None, description="Estado del pedido (e.g., 'sale', 'draft', 'done')") limit: Optional[int] = Field(20, description="Límite de resultados") offset: Optional[int] = Field(0, description="Offset para paginación") order: Optional[str] = Field(None, description="Criterio de ordenación (e.g., 'date_order DESC')") class SalesPerformanceInput(BaseModel): """Parámetros para análisis de rendimiento de ventas""" date_from: str = Field(description="Fecha inicial (YYYY-MM-DD)") date_to: str = Field(description="Fecha final (YYYY-MM-DD)") group_by: Optional[str] = Field(None, description="Agrupar por ('product', 'customer', 'salesperson')") # Modelos para Compras class PurchaseOrderLineCreate(BaseModel): """Línea de orden de compra para creación""" product_id: int = Field(description="ID del producto") product_qty: float = Field(description="Cantidad") price_unit: Optional[float] = Field(None, description="Precio unitario (opcional)") class PurchaseOrderCreate(BaseModel): """Datos para crear una orden de compra""" partner_id: int = Field(description="ID del proveedor") order_lines: List[PurchaseOrderLineCreate] = Field(description="Líneas de la orden") date_order: Optional[str] = Field(None, description="Fecha de la orden (YYYY-MM-DD)") class PurchaseOrderFilter(BaseModel): """Filtros para búsqueda de órdenes de compra""" partner_id: Optional[int] = Field(None, description="Filtrar por proveedor ID") date_from: Optional[str] = Field(None, description="Fecha inicial (YYYY-MM-DD)") date_to: Optional[str] = Field(None, description="Fecha final (YYYY-MM-DD)") state: Optional[str] = Field(None, description="Estado de la orden (e.g., 'purchase', 'draft', 'done')") limit: Optional[int] = Field(20, description="Límite de resultados") offset: Optional[int] = Field(0, description="Offset para paginación") order: Optional[str] = Field(None, description="Criterio de ordenación (e.g., 'date_order DESC')") class SupplierPerformanceInput(BaseModel): """Parámetros para análisis de rendimiento de proveedores""" date_from: str = Field(description="Fecha inicial (YYYY-MM-DD)") date_to: str = Field(description="Fecha final (YYYY-MM-DD)") supplier_ids: Optional[List[int]] = Field(None, description="Lista de IDs de proveedores (opcional)") # Modelos para Inventario class ProductAvailabilityInput(BaseModel): """Parámetros para verificar disponibilidad de productos""" product_ids: List[int] = Field(description="Lista de IDs de productos") location_id: Optional[int] = Field(None, description="ID de la ubicación específica (opcional)") class InventoryLineAdjustment(BaseModel): """Línea de ajuste de inventario""" product_id: int = Field(description="ID del producto") location_id: int = Field(description="ID de la ubicación") product_qty: float = Field(description="Cantidad contada real") class InventoryAdjustmentCreate(BaseModel): """Datos para crear un ajuste de inventario""" name: str = Field(description="Nombre o descripción del ajuste") adjustment_lines: List[InventoryLineAdjustment] = Field(description="Líneas de ajuste") date: Optional[str] = Field(None, description="Fecha del ajuste (YYYY-MM-DD)") class InventoryTurnoverInput(BaseModel): """Parámetros para análisis de rotación de inventario""" date_from: str = Field(description="Fecha inicial (YYYY-MM-DD)") date_to: str = Field(description="Fecha final (YYYY-MM-DD)") product_ids: Optional[List[int]] = Field(None, description="Lista de IDs de productos (opcional)") category_id: Optional[int] = Field(None, description="ID de categoría de producto (opcional)") # Modelos para Contabilidad class JournalEntryLineCreate(BaseModel): """Línea de asiento contable para creación""" account_id: int = Field(description="ID de la cuenta contable") partner_id: Optional[int] = Field(None, description="ID del partner (opcional)") name: Optional[str] = Field(None, description="Descripción de la línea") debit: float = Field(0.0, description="Importe al debe") credit: float = Field(0.0, description="Importe al haber") class JournalEntryCreate(BaseModel): """Datos para crear un asiento contable""" ref: Optional[str] = Field(None, description="Referencia del asiento") journal_id: int = Field(description="ID del diario contable") date: Optional[str] = Field(None, description="Fecha del asiento (YYYY-MM-DD)") lines: List[JournalEntryLineCreate] = Field(description="Líneas del asiento (debe y haber deben cuadrar)") class JournalEntryFilter(BaseModel): """Filtros para búsqueda de asientos contables""" date_from: Optional[str] = Field(None, description="Fecha inicial (YYYY-MM-DD)") date_to: Optional[str] = Field(None, description="Fecha final (YYYY-MM-DD)") journal_id: Optional[int] = Field(None, description="Filtrar por diario contable ID") state: Optional[str] = Field(None, description="Estado del asiento (e.g., 'posted', 'draft')") limit: Optional[int] = Field(20, description="Límite de resultados") offset: Optional[int] = Field(0, description="Offset para paginación") class FinancialRatioInput(BaseModel): """Parámetros para cálculo de ratios financieros""" date_from: str = Field(description="Fecha inicial (YYYY-MM-DD)") date_to: str = Field(description="Fecha final (YYYY-MM-DD)") ratios: List[str] = Field(description="Lista de ratios a calcular (e.g., ['liquidity', 'profitability', 'debt'])") ``` -------------------------------------------------------------------------------- /validation.py: -------------------------------------------------------------------------------- ```python """ Script de validación para probar las nuevas funcionalidades del MCP-Odoo """ import os import sys import json import time from datetime import datetime, timedelta # Añadir el directorio src al path para poder importar odoo_mcp sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) from src.odoo_mcp.odoo_client import get_odoo_client, OdooClient from src.odoo_mcp.models import ( SalesOrderFilter, PurchaseOrderFilter, ProductAvailabilityInput, JournalEntryFilter, FinancialRatioInput ) class ValidationContext: """Contexto simulado para pruebas""" class RequestContext: def __init__(self, odoo_client): self.lifespan_context = type('LifespanContext', (), {'odoo': odoo_client}) def __init__(self, odoo_client): self.request_context = self.RequestContext(odoo_client) def run_validation(): """Ejecuta pruebas de validación para todas las nuevas funcionalidades""" print("Iniciando validación de MCP-Odoo mejorado...") # Obtener cliente Odoo try: odoo_client = get_odoo_client() print("✅ Conexión con Odoo establecida correctamente") except Exception as e: print(f"❌ Error al conectar con Odoo: {str(e)}") return False # Crear contexto simulado ctx = ValidationContext(odoo_client) # Validar herramientas de ventas print("\n=== Validando herramientas de ventas ===") try: from src.odoo_mcp.tools_sales import search_sales_orders # Crear filtro de prueba filter_params = SalesOrderFilter( date_from=(datetime.now() - timedelta(days=365)).strftime("%Y-%m-%d"), date_to=datetime.now().strftime("%Y-%m-%d"), limit=5 ) # Ejecutar búsqueda result = search_sales_orders(ctx, filter_params) if result.get("success"): print(f"✅ search_sales_orders: Encontradas {result['result']['count']} órdenes de venta") else: print(f"❌ search_sales_orders: {result.get('error', 'Error desconocido')}") except Exception as e: print(f"❌ Error al validar search_sales_orders: {str(e)}") # Validar herramientas de compras print("\n=== Validando herramientas de compras ===") try: from src.odoo_mcp.tools_purchase import search_purchase_orders # Crear filtro de prueba filter_params = PurchaseOrderFilter( date_from=(datetime.now() - timedelta(days=365)).strftime("%Y-%m-%d"), date_to=datetime.now().strftime("%Y-%m-%d"), limit=5 ) # Ejecutar búsqueda result = search_purchase_orders(ctx, filter_params) if result.get("success"): print(f"✅ search_purchase_orders: Encontradas {result['result']['count']} órdenes de compra") else: print(f"❌ search_purchase_orders: {result.get('error', 'Error desconocido')}") except Exception as e: print(f"❌ Error al validar search_purchase_orders: {str(e)}") # Validar herramientas de inventario print("\n=== Validando herramientas de inventario ===") try: from src.odoo_mcp.tools_inventory import check_product_availability # Obtener algunos IDs de productos products = odoo_client.search_read( "product.product", [("type", "=", "product")], fields=["id"], limit=3 ) if products: product_ids = [p["id"] for p in products] # Crear parámetros de prueba params = ProductAvailabilityInput( product_ids=product_ids ) # Ejecutar verificación result = check_product_availability(ctx, params) if result.get("success"): print(f"✅ check_product_availability: Verificados {len(result['result']['products'])} productos") else: print(f"❌ check_product_availability: {result.get('error', 'Error desconocido')}") else: print("⚠️ No se encontraron productos para validar check_product_availability") except Exception as e: print(f"❌ Error al validar check_product_availability: {str(e)}") # Validar herramientas de contabilidad print("\n=== Validando herramientas de contabilidad ===") try: from src.odoo_mcp.tools_accounting import search_journal_entries # Crear filtro de prueba filter_params = JournalEntryFilter( date_from=(datetime.now() - timedelta(days=365)).strftime("%Y-%m-%d"), date_to=datetime.now().strftime("%Y-%m-%d"), limit=5 ) # Ejecutar búsqueda result = search_journal_entries(ctx, filter_params) if result.get("success"): print(f"✅ search_journal_entries: Encontrados {result['result']['count']} asientos contables") else: print(f"❌ search_journal_entries: {result.get('error', 'Error desconocido')}") except Exception as e: print(f"❌ Error al validar search_journal_entries: {str(e)}") try: from src.odoo_mcp.tools_accounting import analyze_financial_ratios # Crear parámetros de prueba params = FinancialRatioInput( date_from=(datetime.now() - timedelta(days=365)).strftime("%Y-%m-%d"), date_to=datetime.now().strftime("%Y-%m-%d"), ratios=["liquidity", "profitability", "debt"] ) # Ejecutar análisis result = analyze_financial_ratios(ctx, params) if result.get("success"): print(f"✅ analyze_financial_ratios: Análisis completado con {len(result['result']['ratios'])} ratios") else: print(f"❌ analyze_financial_ratios: {result.get('error', 'Error desconocido')}") except Exception as e: print(f"❌ Error al validar analyze_financial_ratios: {str(e)}") # Validar recursos print("\n=== Validando recursos ===") try: from src.odoo_mcp.resources import register_sales_resources print("✅ Recursos de ventas importados correctamente") except Exception as e: print(f"❌ Error al importar recursos de ventas: {str(e)}") try: from src.odoo_mcp.resources import register_purchase_resources print("✅ Recursos de compras importados correctamente") except Exception as e: print(f"❌ Error al importar recursos de compras: {str(e)}") try: from src.odoo_mcp.resources import register_inventory_resources print("✅ Recursos de inventario importados correctamente") except Exception as e: print(f"❌ Error al importar recursos de inventario: {str(e)}") try: from src.odoo_mcp.resources import register_accounting_resources print("✅ Recursos de contabilidad importados correctamente") except Exception as e: print(f"❌ Error al importar recursos de contabilidad: {str(e)}") # Validar prompts print("\n=== Validando prompts ===") try: from src.odoo_mcp.prompts import register_all_prompts print("✅ Prompts importados correctamente") except Exception as e: print(f"❌ Error al importar prompts: {str(e)}") # Validar integración completa print("\n=== Validando integración completa ===") try: from src.odoo_mcp.extensions import register_all_extensions print("✅ Módulo de extensiones importado correctamente") except Exception as e: print(f"❌ Error al importar módulo de extensiones: {str(e)}") try: from src.odoo_mcp.server import mcp print("✅ Servidor MCP importado correctamente") except Exception as e: print(f"❌ Error al importar servidor MCP: {str(e)}") print("\nValidación completada.") return True if __name__ == "__main__": run_validation() ``` -------------------------------------------------------------------------------- /DOCUMENTATION.md: -------------------------------------------------------------------------------- ```markdown # Documentación MCP-Odoo Mejorado ## Introducción Este documento describe las mejoras implementadas en el MCP (Model Context Protocol) para Odoo, que amplía significativamente las capacidades del repositorio original añadiendo nuevas herramientas, recursos y prompts para las áreas de ventas, compras, inventario y contabilidad. El objetivo de estas mejoras es proporcionar una integración más completa y funcional entre Odoo ERP y los modelos de lenguaje como Claude, permitiendo interacciones más ricas y útiles en contextos empresariales. ## Arquitectura La arquitectura del MCP-Odoo mejorado sigue un diseño modular que facilita la extensión y el mantenimiento: ``` mcp-odoo/ ├── src/ │ └── odoo_mcp/ │ ├── __init__.py # Inicialización del paquete │ ├── server.py # Servidor MCP principal │ ├── odoo_client.py # Cliente para conexión con Odoo │ ├── models.py # Modelos Pydantic para validación │ ├── extensions.py # Registro centralizado de extensiones │ ├── prompts.py # Prompts para análisis y asistencia │ ├── resources.py # Recursos MCP (URIs) │ ├── tools_sales.py # Herramientas para ventas │ ├── tools_purchase.py # Herramientas para compras │ ├── tools_inventory.py # Herramientas para inventario │ └── tools_accounting.py # Herramientas para contabilidad ├── pyproject.toml # Configuración del paquete ├── Dockerfile # Configuración para Docker └── validation.py # Script de validación ``` ## Nuevas Funcionalidades ### 1. Herramientas (Tools) Las herramientas permiten a los modelos de lenguaje realizar acciones específicas en Odoo: #### Ventas - `search_sales_orders`: Busca órdenes de venta con filtros avanzados - `create_sales_order`: Crea una nueva orden de venta - `analyze_sales_performance`: Analiza el rendimiento de ventas por período, producto o cliente - `get_customer_insights`: Obtiene información detallada sobre un cliente específico #### Compras - `search_purchase_orders`: Busca órdenes de compra con filtros avanzados - `create_purchase_order`: Crea una nueva orden de compra - `analyze_supplier_performance`: Analiza el rendimiento de proveedores #### Inventario - `check_product_availability`: Verifica la disponibilidad de stock para productos - `create_inventory_adjustment`: Crea un ajuste de inventario - `analyze_inventory_turnover`: Calcula y analiza la rotación de inventario #### Contabilidad - `search_journal_entries`: Busca asientos contables con filtros - `create_journal_entry`: Crea un nuevo asiento contable - `analyze_financial_ratios`: Calcula ratios financieros clave ### 2. Recursos (Resources) Los recursos proporcionan acceso a datos de Odoo mediante URIs: #### Ventas - `odoo://sales/orders`: Lista órdenes de venta - `odoo://sales/order/{order_id}`: Obtiene detalles de una orden específica - `odoo://sales/products`: Lista productos vendibles - `odoo://sales/customers`: Lista clientes #### Compras - `odoo://purchase/orders`: Lista órdenes de compra - `odoo://purchase/order/{order_id}`: Obtiene detalles de una orden específica - `odoo://purchase/suppliers`: Lista proveedores #### Inventario - `odoo://inventory/products`: Lista productos en inventario - `odoo://inventory/stock/{location_id}`: Obtiene niveles de stock en una ubicación - `odoo://inventory/movements`: Lista movimientos de inventario #### Contabilidad - `odoo://accounting/accounts`: Lista cuentas contables - `odoo://accounting/journal_entries`: Lista asientos contables - `odoo://accounting/reports/{report_type}`: Obtiene informes financieros ### 3. Prompts Se han añadido prompts especializados para diferentes áreas: - **Análisis de ventas**: Prompts para analizar tendencias, rendimiento y oportunidades - **Gestión de inventario**: Prompts para optimización de stock y planificación - **Planificación de recursos humanos**: Prompts para gestión de personal y horarios - **Análisis financiero**: Prompts para interpretación de datos contables y financieros ## Guía de Uso ### Instalación #### Opción 1: Usando el paquete de Python ```bash # Clonar el repositorio git clone https://github.com/tuanle96/mcp-odoo.git cd mcp-odoo # Instalar el paquete pip install -e . # Ejecutar como módulo python -m src.odoo_mcp ``` #### Opción 2: Usando Docker ```bash # Construir la imagen docker build -t mcp/odoo:latest -f Dockerfile . # Ejecutar el contenedor docker run -i --rm \ -e ODOO_URL=https://tu-instancia-odoo.com \ -e ODOO_DB=nombre-de-tu-base-de-datos \ -e ODOO_USERNAME=tu-usuario \ -e ODOO_PASSWORD=tu-contraseña \ mcp/odoo ``` ### Configuración El MCP-Odoo puede configurarse mediante variables de entorno o un archivo de configuración: #### Variables de entorno ```bash export ODOO_URL=https://tu-instancia-odoo.com export ODOO_DB=nombre-de-tu-base-de-datos export ODOO_USERNAME=tu-usuario export ODOO_PASSWORD=tu-contraseña ``` #### Archivo de configuración Crear un archivo `odoo_config.json` en el directorio de trabajo: ```json { "url": "https://tu-instancia-odoo.com", "db": "nombre-de-tu-base-de-datos", "username": "tu-usuario", "password": "tu-contraseña" } ``` ### Integración con Claude Desktop Para usar el MCP-Odoo con Claude Desktop, añade la siguiente configuración a tu `claude_desktop_config.json`: ```json { "mcpServers": { "odoo": { "command": "python", "args": ["-m", "src.odoo_mcp"], "env": { "ODOO_URL": "https://tu-instancia-odoo.com", "ODOO_DB": "nombre-de-tu-base-de-datos", "ODOO_USERNAME": "tu-usuario", "ODOO_PASSWORD": "tu-contraseña" } } } } ``` Para usar la versión Docker: ```json { "mcpServers": { "odoo": { "command": "docker", "args": [ "run", "-i", "--rm", "-e", "ODOO_URL", "-e", "ODOO_DB", "-e", "ODOO_USERNAME", "-e", "ODOO_PASSWORD", "mcp/odoo" ], "env": { "ODOO_URL": "https://tu-instancia-odoo.com", "ODOO_DB": "nombre-de-tu-base-de-datos", "ODOO_USERNAME": "tu-usuario", "ODOO_PASSWORD": "tu-contraseña" } } } } ``` ## Ejemplos de Uso ### Ejemplo 1: Análisis de ventas ``` Usando el MCP de Odoo, analiza las ventas del último trimestre y muestra los productos más vendidos. ``` ### Ejemplo 2: Verificación de inventario ``` Verifica la disponibilidad de stock para los productos X, Y y Z en el almacén principal. ``` ### Ejemplo 3: Análisis financiero ``` Calcula los ratios de liquidez y rentabilidad para el año fiscal actual y compáralos con el año anterior. ``` ### Ejemplo 4: Creación de órdenes de compra ``` Crea una orden de compra para el proveedor ABC con los siguientes productos: 10 unidades del producto X y 5 unidades del producto Y. ``` ## Extensión del Sistema El sistema está diseñado para ser fácilmente extensible. Para añadir nuevas funcionalidades: 1. **Nuevas herramientas**: Crea un nuevo archivo `tools_*.py` siguiendo el patrón existente 2. **Nuevos recursos**: Añade nuevos recursos en `resources.py` 3. **Nuevos prompts**: Añade nuevos prompts en `prompts.py` 4. **Registro de extensiones**: Actualiza `extensions.py` para registrar las nuevas funcionalidades ## Solución de Problemas ### Problemas de conexión Si experimentas problemas de conexión con Odoo: 1. Verifica las credenciales en las variables de entorno o archivo de configuración 2. Asegúrate de que la URL de Odoo es accesible desde donde ejecutas el MCP 3. Verifica que el usuario tiene permisos suficientes en Odoo ### Errores en las herramientas Si una herramienta devuelve un error: 1. Revisa los parámetros proporcionados 2. Verifica que los IDs de registros existen en Odoo 3. Comprueba los permisos del usuario para la operación específica ## Validación Se incluye un script de validación (`validation.py`) que puede ejecutarse para verificar que todas las funcionalidades están correctamente implementadas y son compatibles con tu instancia de Odoo: ```bash python validation.py ``` ## Contribución Las contribuciones son bienvenidas. Para contribuir: 1. Haz un fork del repositorio 2. Crea una rama para tu funcionalidad (`git checkout -b feature/nueva-funcionalidad`) 3. Realiza tus cambios y añade pruebas 4. Envía un pull request ## Licencia Este proyecto se distribuye bajo la misma licencia que el repositorio original. ``` -------------------------------------------------------------------------------- /src/odoo_mcp/tools_purchase.py: -------------------------------------------------------------------------------- ```python """ Implementación de herramientas (tools) para compras en MCP-Odoo """ from typing import Dict, List, Any, Optional from datetime import datetime, timedelta from mcp.server.fastmcp import FastMCP, Context from .models import ( PurchaseOrderFilter, PurchaseOrderCreate, SupplierPerformanceInput ) def register_purchase_tools(mcp: FastMCP) -> None: """Registra herramientas relacionadas con compras""" @mcp.tool(description="Busca órdenes de compra con filtros avanzados") def search_purchase_orders( ctx: Context, filters: PurchaseOrderFilter ) -> Dict[str, Any]: """ Busca órdenes de compra según los filtros especificados Args: filters: Filtros para la búsqueda de órdenes Returns: Diccionario con resultados de la búsqueda """ odoo = ctx.request_context.lifespan_context.odoo try: # Construir dominio de búsqueda domain = [] if filters.partner_id: domain.append(("partner_id", "=", filters.partner_id)) if filters.date_from: try: datetime.strptime(filters.date_from, "%Y-%m-%d") domain.append(("date_order", ">=", filters.date_from)) except ValueError: return {"success": False, "error": f"Formato de fecha inválido: {filters.date_from}. Use YYYY-MM-DD."} if filters.date_to: try: datetime.strptime(filters.date_to, "%Y-%m-%d") domain.append(("date_order", "<=", filters.date_to)) except ValueError: return {"success": False, "error": f"Formato de fecha inválido: {filters.date_to}. Use YYYY-MM-DD."} if filters.state: domain.append(("state", "=", filters.state)) # Campos a recuperar fields = [ "name", "partner_id", "date_order", "amount_total", "state", "invoice_status", "user_id", "order_line", "date_planned", "date_approve" ] # Ejecutar búsqueda orders = odoo.search_read( "purchase.order", domain, fields=fields, limit=filters.limit, offset=filters.offset, order=filters.order ) # Obtener el conteo total sin límite para paginación total_count = odoo.execute_method("purchase.order", "search_count", domain) return { "success": True, "result": { "count": len(orders), "total_count": total_count, "orders": orders } } except Exception as e: return {"success": False, "error": str(e)} @mcp.tool(description="Crear una nueva orden de compra") def create_purchase_order( ctx: Context, order: PurchaseOrderCreate ) -> Dict[str, Any]: """ Crea una nueva orden de compra Args: order: Datos de la orden a crear Returns: Respuesta con el resultado de la operación """ odoo = ctx.request_context.lifespan_context.odoo try: # Preparar valores para la orden order_vals = { "partner_id": order.partner_id, "order_line": [] } if order.date_order: try: datetime.strptime(order.date_order, "%Y-%m-%d") order_vals["date_order"] = order.date_order except ValueError: return {"success": False, "error": f"Formato de fecha inválido: {order.date_order}. Use YYYY-MM-DD."} # Preparar líneas de orden for line in order.order_lines: line_vals = [ 0, 0, { "product_id": line.product_id, "product_qty": line.product_qty } ] if line.price_unit is not None: line_vals[2]["price_unit"] = line.price_unit order_vals["order_line"].append(line_vals) # Crear orden order_id = odoo.execute_method("purchase.order", "create", order_vals) # Obtener información de la orden creada order_info = odoo.execute_method("purchase.order", "read", [order_id], ["name"])[0] return { "success": True, "result": { "order_id": order_id, "order_name": order_info["name"] } } except Exception as e: return {"success": False, "error": str(e)} @mcp.tool(description="Analiza el rendimiento de los proveedores") def analyze_supplier_performance( ctx: Context, params: SupplierPerformanceInput ) -> Dict[str, Any]: """ Analiza el rendimiento de los proveedores en un período específico Args: params: Parámetros para el análisis Returns: Diccionario con resultados del análisis """ odoo = ctx.request_context.lifespan_context.odoo try: # Validar fechas try: datetime.strptime(params.date_from, "%Y-%m-%d") datetime.strptime(params.date_to, "%Y-%m-%d") except ValueError: return {"success": False, "error": "Formato de fecha inválido. Use YYYY-MM-DD."} # Construir dominio para órdenes confirmadas domain = [ ("date_order", ">=", params.date_from), ("date_order", "<=", params.date_to), ("state", "in", ["purchase", "done"]) ] # Filtrar por proveedores específicos si se proporcionan if params.supplier_ids: domain.append(("partner_id", "in", params.supplier_ids)) # Obtener datos de compras purchase_data = odoo.search_read( "purchase.order", domain, fields=[ "name", "partner_id", "date_order", "amount_total", "date_approve", "date_planned", "effective_date" ] ) # Agrupar por proveedor supplier_data = {} for order in purchase_data: supplier_id = order["partner_id"][0] if order["partner_id"] else 0 supplier_name = order["partner_id"][1] if order["partner_id"] else "Desconocido" if supplier_id not in supplier_data: supplier_data[supplier_id] = { "name": supplier_name, "order_count": 0, "total_amount": 0, "orders": [], "on_time_delivery_count": 0, "late_delivery_count": 0, "avg_delay_days": 0 } supplier_data[supplier_id]["order_count"] += 1 supplier_data[supplier_id]["total_amount"] += order["amount_total"] # Calcular métricas de entrega a tiempo if order.get("effective_date") and order.get("date_planned"): effective_date = datetime.strptime(order["effective_date"].split(" ")[0], "%Y-%m-%d") planned_date = datetime.strptime(order["date_planned"].split(" ")[0], "%Y-%m-%d") delay_days = (effective_date - planned_date).days order_info = { "id": order["id"], "name": order["name"], "amount": order["amount_total"], "date_order": order["date_order"], "planned_date": order["date_planned"], "effective_date": order["effective_date"], "delay_days": delay_days } supplier_data[supplier_id]["orders"].append(order_info) if delay_days <= 0: supplier_data[supplier_id]["on_time_delivery_count"] += 1 else: supplier_data[supplier_id]["late_delivery_count"] += 1 # Calcular métricas adicionales for supplier_id, data in supplier_data.items(): # Calcular promedio de días de retraso delay_days = [order["delay_days"] for order in data["orders"] if "delay_days" in order] if delay_days: data["avg_delay_days"] = sum(delay_days) / len(delay_days) # Calcular porcentaje de entregas a tiempo total_deliveries = data["on_time_delivery_count"] + data["late_delivery_count"] if total_deliveries > 0: data["on_time_delivery_rate"] = (data["on_time_delivery_count"] / total_deliveries) * 100 else: data["on_time_delivery_rate"] = 0 # Eliminar lista detallada de órdenes para reducir tamaño de respuesta data.pop("orders", None) # Ordenar proveedores por monto total top_suppliers = sorted( supplier_data.items(), key=lambda x: x[1]["total_amount"], reverse=True ) # Preparar resultado result = { "period": { "from": params.date_from, "to": params.date_to }, "summary": { "supplier_count": len(supplier_data), "order_count": len(purchase_data), "total_amount": sum(order["amount_total"] for order in purchase_data) }, "suppliers": [ {"id": k, **v} for k, v in top_suppliers ] } return {"success": True, "result": result} except Exception as e: return {"success": False, "error": str(e)} ``` -------------------------------------------------------------------------------- /src/odoo_mcp/tools_sales.py: -------------------------------------------------------------------------------- ```python """ Implementación de herramientas (tools) para ventas en MCP-Odoo """ from typing import Dict, List, Any, Optional from datetime import datetime, timedelta from mcp.server.fastmcp import FastMCP, Context from .models import ( SalesOrderFilter, SalesOrderCreate, SalesPerformanceInput ) def register_sales_tools(mcp: FastMCP) -> None: """Registra herramientas relacionadas con ventas""" @mcp.tool(description="Busca pedidos de venta con filtros avanzados") def search_sales_orders( ctx: Context, filters: SalesOrderFilter ) -> Dict[str, Any]: """ Busca pedidos de venta según los filtros especificados Args: filters: Filtros para la búsqueda de pedidos Returns: Diccionario con resultados de la búsqueda """ odoo = ctx.request_context.lifespan_context.odoo try: # Construir dominio de búsqueda domain = [] if filters.partner_id: domain.append(("partner_id", "=", filters.partner_id)) if filters.date_from: try: datetime.strptime(filters.date_from, "%Y-%m-%d") domain.append(("date_order", ">=", filters.date_from)) except ValueError: return {"success": False, "error": f"Formato de fecha inválido: {filters.date_from}. Use YYYY-MM-DD."} if filters.date_to: try: datetime.strptime(filters.date_to, "%Y-%m-%d") domain.append(("date_order", "<=", filters.date_to)) except ValueError: return {"success": False, "error": f"Formato de fecha inválido: {filters.date_to}. Use YYYY-MM-DD."} if filters.state: domain.append(("state", "=", filters.state)) # Campos a recuperar fields = [ "name", "partner_id", "date_order", "amount_total", "state", "invoice_status", "user_id", "order_line" ] # Ejecutar búsqueda orders = odoo.search_read( "sale.order", domain, fields=fields, limit=filters.limit, offset=filters.offset, order=filters.order ) # Obtener el conteo total sin límite para paginación total_count = odoo.execute_method("sale.order", "search_count", domain) return { "success": True, "result": { "count": len(orders), "total_count": total_count, "orders": orders } } except Exception as e: return {"success": False, "error": str(e)} @mcp.tool(description="Crear un nuevo pedido de venta") def create_sales_order( ctx: Context, order: SalesOrderCreate ) -> Dict[str, Any]: """ Crea un nuevo pedido de venta Args: order: Datos del pedido a crear Returns: Respuesta con el resultado de la operación """ odoo = ctx.request_context.lifespan_context.odoo try: # Preparar valores para el pedido order_vals = { "partner_id": order.partner_id, "order_line": [] } if order.date_order: try: datetime.strptime(order.date_order, "%Y-%m-%d") order_vals["date_order"] = order.date_order except ValueError: return {"success": False, "error": f"Formato de fecha inválido: {order.date_order}. Use YYYY-MM-DD."} # Preparar líneas de pedido for line in order.order_lines: line_vals = [ 0, 0, { "product_id": line.product_id, "product_uom_qty": line.product_uom_qty } ] if line.price_unit is not None: line_vals[2]["price_unit"] = line.price_unit order_vals["order_line"].append(line_vals) # Crear pedido order_id = odoo.execute_method("sale.order", "create", order_vals) # Obtener información del pedido creado order_info = odoo.execute_method("sale.order", "read", [order_id], ["name"])[0] return { "success": True, "result": { "order_id": order_id, "order_name": order_info["name"] } } except Exception as e: return {"success": False, "error": str(e)} @mcp.tool(description="Analiza el rendimiento de ventas en un período") def analyze_sales_performance( ctx: Context, params: SalesPerformanceInput ) -> Dict[str, Any]: """ Analiza el rendimiento de ventas en un período específico Args: params: Parámetros para el análisis Returns: Diccionario con resultados del análisis """ odoo = ctx.request_context.lifespan_context.odoo try: # Validar fechas try: datetime.strptime(params.date_from, "%Y-%m-%d") datetime.strptime(params.date_to, "%Y-%m-%d") except ValueError: return {"success": False, "error": "Formato de fecha inválido. Use YYYY-MM-DD."} # Construir dominio para pedidos confirmados domain = [ ("date_order", ">=", params.date_from), ("date_order", "<=", params.date_to), ("state", "in", ["sale", "done"]) ] # Obtener datos de ventas sales_data = odoo.search_read( "sale.order", domain, fields=["name", "partner_id", "date_order", "amount_total", "user_id"] ) # Calcular período anterior para comparación date_from = datetime.strptime(params.date_from, "%Y-%m-%d") date_to = datetime.strptime(params.date_to, "%Y-%m-%d") delta = date_to - date_from prev_date_to = date_from - timedelta(days=1) prev_date_from = prev_date_to - delta prev_domain = [ ("date_order", ">=", prev_date_from.strftime("%Y-%m-%d")), ("date_order", "<=", prev_date_to.strftime("%Y-%m-%d")), ("state", "in", ["sale", "done"]) ] prev_sales_data = odoo.search_read( "sale.order", prev_domain, fields=["amount_total"] ) # Calcular totales current_total = sum(order["amount_total"] for order in sales_data) previous_total = sum(order["amount_total"] for order in prev_sales_data) # Calcular cambio porcentual percent_change = 0 if previous_total > 0: percent_change = ((current_total - previous_total) / previous_total) * 100 # Agrupar según el parámetro group_by grouped_data = {} if params.group_by: if params.group_by == "product": # Obtener líneas de pedido para analizar productos order_ids = [order["id"] for order in sales_data] if order_ids: order_lines = odoo.search_read( "sale.order.line", [("order_id", "in", order_ids)], fields=["product_id", "product_uom_qty", "price_subtotal"] ) # Agrupar por producto product_data = {} for line in order_lines: product_id = line["product_id"][0] if line["product_id"] else 0 product_name = line["product_id"][1] if line["product_id"] else "Desconocido" if product_id not in product_data: product_data[product_id] = { "name": product_name, "quantity": 0, "amount": 0 } product_data[product_id]["quantity"] += line["product_uom_qty"] product_data[product_id]["amount"] += line["price_subtotal"] # Ordenar por monto top_products = sorted( product_data.items(), key=lambda x: x[1]["amount"], reverse=True ) grouped_data["products"] = [ {"id": k, **v} for k, v in top_products[:10] ] elif params.group_by == "customer": # Agrupar por cliente customer_data = {} for order in sales_data: customer_id = order["partner_id"][0] if order["partner_id"] else 0 customer_name = order["partner_id"][1] if order["partner_id"] else "Desconocido" if customer_id not in customer_data: customer_data[customer_id] = { "name": customer_name, "order_count": 0, "amount": 0 } customer_data[customer_id]["order_count"] += 1 customer_data[customer_id]["amount"] += order["amount_total"] # Ordenar por monto top_customers = sorted( customer_data.items(), key=lambda x: x[1]["amount"], reverse=True ) grouped_data["customers"] = [ {"id": k, **v} for k, v in top_customers[:10] ] elif params.group_by == "salesperson": # Agrupar por vendedor salesperson_data = {} for order in sales_data: salesperson_id = order["user_id"][0] if order["user_id"] else 0 salesperson_name = order["user_id"][1] if order["user_id"] else "Desconocido" if salesperson_id not in salesperson_data: salesperson_data[salesperson_id] = { "name": salesperson_name, "order_count": 0, "amount": 0 } salesperson_data[salesperson_id]["order_count"] += 1 salesperson_data[salesperson_id]["amount"] += order["amount_total"] # Ordenar por monto top_salespersons = sorted( salesperson_data.items(), key=lambda x: x[1]["amount"], reverse=True ) grouped_data["salespersons"] = [ {"id": k, **v} for k, v in top_salespersons ] # Preparar resultado result = { "period": { "from": params.date_from, "to": params.date_to }, "summary": { "order_count": len(sales_data), "total_amount": current_total, "previous_period": { "from": prev_date_from.strftime("%Y-%m-%d"), "to": prev_date_to.strftime("%Y-%m-%d"), "order_count": len(prev_sales_data), "total_amount": previous_total }, "percent_change": round(percent_change, 2) } } # Añadir datos agrupados si existen if grouped_data: result["grouped_data"] = grouped_data return {"success": True, "result": result} except Exception as e: return {"success": False, "error": str(e)} ``` -------------------------------------------------------------------------------- /src/odoo_mcp/odoo_client.py: -------------------------------------------------------------------------------- ```python """ Odoo XML-RPC client for MCP server integration """ import json import os import re import socket import urllib.parse import http.client import xmlrpc.client class OdooClient: """Client for interacting with Odoo via XML-RPC""" def __init__( self, url, db, username, password, timeout=10, verify_ssl=True, ): """ Initialize the Odoo client with connection parameters Args: url: Odoo server URL (with or without protocol) db: Database name username: Login username password: Login password timeout: Connection timeout in seconds verify_ssl: Whether to verify SSL certificates """ # Ensure URL has a protocol if not re.match(r"^https?://", url): url = f"http://{url}" # Remove trailing slash from URL if present url = url.rstrip("/") self.url = url self.db = db self.username = username self.password = password self.uid = None # Set timeout and SSL verification self.timeout = timeout self.verify_ssl = verify_ssl # Setup connections self._common = None self._models = None # Parse hostname for logging parsed_url = urllib.parse.urlparse(self.url) self.hostname = parsed_url.netloc # Connect self._connect() def _connect(self): """Initialize the XML-RPC connection and authenticate""" # Tạo transport với timeout phù hợp is_https = self.url.startswith("https://") transport = RedirectTransport( timeout=self.timeout, use_https=is_https, verify_ssl=self.verify_ssl ) print(f"Connecting to Odoo at: {self.url}", file=os.sys.stderr) print(f" Hostname: {self.hostname}", file=os.sys.stderr) print( f" Timeout: {self.timeout}s, Verify SSL: {self.verify_ssl}", file=os.sys.stderr, ) # Thiết lập endpoints self._common = xmlrpc.client.ServerProxy( f"{self.url}/xmlrpc/2/common", transport=transport ) self._models = xmlrpc.client.ServerProxy( f"{self.url}/xmlrpc/2/object", transport=transport ) # Xác thực và lấy user ID print( f"Authenticating with database: {self.db}, username: {self.username}", file=os.sys.stderr, ) try: print( f"Making request to {self.hostname}/xmlrpc/2/common (attempt 1)", file=os.sys.stderr, ) self.uid = self._common.authenticate( self.db, self.username, self.password, {} ) if not self.uid: raise ValueError("Authentication failed: Invalid username or password") except (socket.error, socket.timeout, ConnectionError, TimeoutError) as e: print(f"Connection error: {str(e)}", file=os.sys.stderr) raise ConnectionError(f"Failed to connect to Odoo server: {str(e)}") except Exception as e: print(f"Authentication error: {str(e)}", file=os.sys.stderr) raise ValueError(f"Failed to authenticate with Odoo: {str(e)}") def _execute(self, model, method, *args, **kwargs): """Execute a method on an Odoo model""" return self._models.execute_kw( self.db, self.uid, self.password, model, method, args, kwargs ) def execute_method(self, model, method, *args, **kwargs): """ Execute an arbitrary method on a model Args: model: The model name (e.g., 'res.partner') method: Method name to execute *args: Positional arguments to pass to the method **kwargs: Keyword arguments to pass to the method Returns: Result of the method execution """ return self._execute(model, method, *args, **kwargs) def get_models(self): """ Get a list of all available models in the system Returns: List of model names Examples: >>> client = OdooClient(url, db, username, password) >>> models = client.get_models() >>> print(len(models)) 125 >>> print(models[:5]) ['res.partner', 'res.users', 'res.company', 'res.groups', 'ir.model'] """ try: # First search for model IDs model_ids = self._execute("ir.model", "search", []) if not model_ids: return { "model_names": [], "models_details": {}, "error": "No models found", } # Then read the model data with only the most basic fields # that are guaranteed to exist in all Odoo versions result = self._execute("ir.model", "read", model_ids, ["model", "name"]) # Extract and sort model names alphabetically models = sorted([rec["model"] for rec in result]) # For more detailed information, include the full records models_info = { "model_names": models, "models_details": { rec["model"]: {"name": rec.get("name", "")} for rec in result }, } return models_info except Exception as e: print(f"Error retrieving models: {str(e)}", file=os.sys.stderr) return {"model_names": [], "models_details": {}, "error": str(e)} def get_model_info(self, model_name): """ Get information about a specific model Args: model_name: Name of the model (e.g., 'res.partner') Returns: Dictionary with model information Examples: >>> client = OdooClient(url, db, username, password) >>> info = client.get_model_info('res.partner') >>> print(info['name']) 'Contact' """ try: result = self._execute( "ir.model", "search_read", [("model", "=", model_name)], {"fields": ["name", "model"]}, ) if not result: return {"error": f"Model {model_name} not found"} return result[0] except Exception as e: print(f"Error retrieving model info: {str(e)}", file=os.sys.stderr) return {"error": str(e)} def get_model_fields(self, model_name): """ Get field definitions for a specific model Args: model_name: Name of the model (e.g., 'res.partner') Returns: Dictionary mapping field names to their definitions Examples: >>> client = OdooClient(url, db, username, password) >>> fields = client.get_model_fields('res.partner') >>> print(fields['name']['type']) 'char' """ try: fields = self._execute(model_name, "fields_get") return fields except Exception as e: print(f"Error retrieving fields: {str(e)}", file=os.sys.stderr) return {"error": str(e)} def search_read( self, model_name, domain, fields=None, offset=None, limit=None, order=None ): """ Search for records and read their data in a single call Args: model_name: Name of the model (e.g., 'res.partner') domain: Search domain (e.g., [('is_company', '=', True)]) fields: List of field names to return (None for all) offset: Number of records to skip limit: Maximum number of records to return order: Sorting criteria (e.g., 'name ASC, id DESC') Returns: List of dictionaries with the matching records Examples: >>> client = OdooClient(url, db, username, password) >>> records = client.search_read('res.partner', [('is_company', '=', True)], limit=5) >>> print(len(records)) 5 """ try: kwargs = {} if offset: kwargs["offset"] = offset if fields is not None: kwargs["fields"] = fields if limit is not None: kwargs["limit"] = limit if order is not None: kwargs["order"] = order result = self._execute(model_name, "search_read", domain, kwargs) return result except Exception as e: print(f"Error in search_read: {str(e)}", file=os.sys.stderr) return [] def read_records(self, model_name, ids, fields=None): """ Read data of records by IDs Args: model_name: Name of the model (e.g., 'res.partner') ids: List of record IDs to read fields: List of field names to return (None for all) Returns: List of dictionaries with the requested records Examples: >>> client = OdooClient(url, db, username, password) >>> records = client.read_records('res.partner', [1]) >>> print(records[0]['name']) 'YourCompany' """ try: kwargs = {} if fields is not None: kwargs["fields"] = fields result = self._execute(model_name, "read", ids, kwargs) return result except Exception as e: print(f"Error reading records: {str(e)}", file=os.sys.stderr) return [] class RedirectTransport(xmlrpc.client.Transport): """Transport that adds timeout, SSL verification, and redirect handling""" def __init__( self, timeout=10, use_https=True, verify_ssl=True, max_redirects=5, proxy=None ): super().__init__() self.timeout = timeout self.use_https = use_https self.verify_ssl = verify_ssl self.max_redirects = max_redirects self.proxy = proxy or os.environ.get("HTTP_PROXY") if use_https and not verify_ssl: import ssl self.context = ssl._create_unverified_context() def make_connection(self, host): if self.proxy: proxy_url = urllib.parse.urlparse(self.proxy) connection = http.client.HTTPConnection( proxy_url.hostname, proxy_url.port, timeout=self.timeout ) connection.set_tunnel(host) else: if self.use_https and not self.verify_ssl: connection = http.client.HTTPSConnection( host, timeout=self.timeout, context=self.context ) else: if self.use_https: connection = http.client.HTTPSConnection(host, timeout=self.timeout) else: connection = http.client.HTTPConnection(host, timeout=self.timeout) return connection def request(self, host, handler, request_body, verbose): """Send HTTP request with retry for redirects""" redirects = 0 while redirects < self.max_redirects: try: print(f"Making request to {host}{handler}", file=os.sys.stderr) return super().request(host, handler, request_body, verbose) except xmlrpc.client.ProtocolError as err: if err.errcode in (301, 302, 303, 307, 308) and err.headers.get( "location" ): redirects += 1 location = err.headers.get("location") parsed = urllib.parse.urlparse(location) if parsed.netloc: host = parsed.netloc handler = parsed.path if parsed.query: handler += "?" + parsed.query else: raise except Exception as e: print(f"Error during request: {str(e)}", file=os.sys.stderr) raise raise xmlrpc.client.ProtocolError(host + handler, 310, "Too many redirects", {}) def load_config(): """ Load Odoo configuration from environment variables or config file Returns: dict: Configuration dictionary with url, db, username, password """ # Define config file paths to check config_paths = [ "./odoo_config.json", os.path.expanduser("~/.config/odoo/config.json"), os.path.expanduser("~/.odoo_config.json"), ] # Try environment variables first if all( var in os.environ for var in ["ODOO_URL", "ODOO_DB", "ODOO_USERNAME", "ODOO_PASSWORD"] ): return { "url": os.environ["ODOO_URL"], "db": os.environ["ODOO_DB"], "username": os.environ["ODOO_USERNAME"], "password": os.environ["ODOO_PASSWORD"], } # Try to load from file for path in config_paths: expanded_path = os.path.expanduser(path) if os.path.exists(expanded_path): with open(expanded_path, "r") as f: return json.load(f) raise FileNotFoundError( "No Odoo configuration found. Please create an odoo_config.json file or set environment variables." ) def get_odoo_client(): """ Get a configured Odoo client instance Returns: OdooClient: A configured Odoo client instance """ config = load_config() # Get additional options from environment variables timeout = int( os.environ.get("ODOO_TIMEOUT", "30") ) # Increase default timeout to 30 seconds verify_ssl = os.environ.get("ODOO_VERIFY_SSL", "1").lower() in ["1", "true", "yes"] # Print detailed configuration print("Odoo client configuration:", file=os.sys.stderr) print(f" URL: {config['url']}", file=os.sys.stderr) print(f" Database: {config['db']}", file=os.sys.stderr) print(f" Username: {config['username']}", file=os.sys.stderr) print(f" Timeout: {timeout}s", file=os.sys.stderr) print(f" Verify SSL: {verify_ssl}", file=os.sys.stderr) return OdooClient( url=config["url"], db=config["db"], username=config["username"], password=config["password"], timeout=timeout, verify_ssl=verify_ssl, ) ``` -------------------------------------------------------------------------------- /src/odoo_mcp/server.py: -------------------------------------------------------------------------------- ```python """ Actualización del servidor MCP principal para integrar todas las extensiones """ import json from contextlib import asynccontextmanager from dataclasses import dataclass from datetime import datetime, timedelta from typing import Any, AsyncIterator, Dict, List, Optional, Union, cast from mcp.server.fastmcp import Context, FastMCP from pydantic import BaseModel, Field from .odoo_client import OdooClient, get_odoo_client from .extensions import register_all_extensions @dataclass class AppContext: """Application context for the MCP server""" odoo: OdooClient @asynccontextmanager async def app_lifespan(server: FastMCP) -> AsyncIterator[AppContext]: """ Application lifespan for initialization and cleanup """ # Initialize Odoo client on startup odoo_client = get_odoo_client() try: yield AppContext(odoo=odoo_client) finally: # No cleanup needed for Odoo client pass # Create MCP server mcp = FastMCP( "Odoo MCP Server", description="MCP Server for interacting with Odoo ERP systems", dependencies=["requests"], lifespan=app_lifespan, ) # ----- MCP Resources ----- @mcp.resource( "odoo://models", description="List all available models in the Odoo system" ) def get_models() -> str: """Lists all available models in the Odoo system""" odoo_client = get_odoo_client() models = odoo_client.get_models() return json.dumps(models, indent=2) @mcp.resource( "odoo://model/{model_name}", description="Get detailed information about a specific model including fields", ) def get_model_info(model_name: str) -> str: """ Get information about a specific model Parameters: model_name: Name of the Odoo model (e.g., 'res.partner') """ odoo_client = get_odoo_client() try: # Get model info model_info = odoo_client.get_model_info(model_name) # Get field definitions fields = odoo_client.get_model_fields(model_name) model_info["fields"] = fields return json.dumps(model_info, indent=2) except Exception as e: return json.dumps({"error": str(e)}, indent=2) @mcp.resource( "odoo://record/{model_name}/{record_id}", description="Get detailed information of a specific record by ID", ) def get_record(model_name: str, record_id: str) -> str: """ Get a specific record by ID Parameters: model_name: Name of the Odoo model (e.g., 'res.partner') record_id: ID of the record """ odoo_client = get_odoo_client() try: record_id_int = int(record_id) record = odoo_client.read_records(model_name, [record_id_int]) if not record: return json.dumps( {"error": f"Record not found: {model_name} ID {record_id}"}, indent=2 ) return json.dumps(record[0], indent=2) except Exception as e: return json.dumps({"error": str(e)}, indent=2) @mcp.resource( "odoo://search/{model_name}/{domain}", description="Search for records matching the domain", ) def search_records_resource(model_name: str, domain: str) -> str: """ Search for records that match a domain Parameters: model_name: Name of the Odoo model (e.g., 'res.partner') domain: Search domain in JSON format (e.g., '[["name", "ilike", "test"]]') """ odoo_client = get_odoo_client() try: # Parse domain from JSON string domain_list = json.loads(domain) # Set a reasonable default limit limit = 10 # Perform search_read for efficiency results = odoo_client.search_read(model_name, domain_list, limit=limit) return json.dumps(results, indent=2) except Exception as e: return json.dumps({"error": str(e)}, indent=2) # ----- Pydantic models for type safety ----- class DomainCondition(BaseModel): """A single condition in a search domain""" field: str = Field(description="Field name to search") operator: str = Field( description="Operator (e.g., '=', '!=', '>', '<', 'in', 'not in', 'like', 'ilike')" ) value: Any = Field(description="Value to compare against") def to_tuple(self) -> List: """Convert to Odoo domain condition tuple""" return [self.field, self.operator, self.value] class SearchDomain(BaseModel): """Search domain for Odoo models""" conditions: List[DomainCondition] = Field( default_factory=list, description="List of conditions for searching. All conditions are combined with AND operator.", ) def to_domain_list(self) -> List[List]: """Convert to Odoo domain list format""" return [condition.to_tuple() for condition in self.conditions] class EmployeeSearchResult(BaseModel): """Represents a single employee search result.""" id: int = Field(description="Employee ID") name: str = Field(description="Employee name") class SearchEmployeeResponse(BaseModel): """Response model for the search_employee tool.""" success: bool = Field(description="Indicates if the search was successful") result: Optional[List[EmployeeSearchResult]] = Field( default=None, description="List of employee search results" ) error: Optional[str] = Field(default=None, description="Error message, if any") class Holiday(BaseModel): """Represents a single holiday.""" display_name: str = Field(description="Display name of the holiday") start_datetime: str = Field(description="Start date and time of the holiday") stop_datetime: str = Field(description="End date and time of the holiday") employee_id: List[Union[int, str]] = Field( description="Employee ID associated with the holiday" ) name: str = Field(description="Name of the holiday") state: str = Field(description="State of the holiday") class SearchHolidaysResponse(BaseModel): """Response model for the search_holidays tool.""" success: bool = Field(description="Indicates if the search was successful") result: Optional[List[Holiday]] = Field( default=None, description="List of holidays found" ) error: Optional[str] = Field(default=None, description="Error message, if any") # ----- MCP Tools ----- @mcp.tool(description="Execute a custom method on an Odoo model") def execute_method( ctx: Context, model: str, method: str, args: List = None, kwargs: Optional[Dict[str, Any]] = None, ) -> Dict[str, Any]: """ Execute a custom method on an Odoo model Parameters: model: The model name (e.g., 'res.partner') method: Method name to execute args: Positional arguments kwargs: Keyword arguments Returns: Dictionary containing: - success: Boolean indicating success - result: Result of the method (if success) - error: Error message (if failure) """ odoo = ctx.request_context.lifespan_context.odoo try: args = args or [] kwargs = kwargs or {} # Special handling for search methods like search, search_count, search_read search_methods = ["search", "search_count", "search_read"] if method in search_methods and args: # Search methods usually have domain as the first parameter # args: [[domain], limit, offset, ...] or [domain, limit, offset, ...] normalized_args = list( args ) # Create a copy to avoid affecting the original args if len(normalized_args) > 0: # Process domain in args[0] domain = normalized_args[0] domain_list = [] # Check if domain is wrapped unnecessarily ([domain] instead of domain) if ( isinstance(domain, list) and len(domain) == 1 and isinstance(domain[0], list) ): # Case [[domain]] - unwrap to [domain] domain = domain[0] # Normalize domain similar to search_records function if domain is None: domain_list = [] elif isinstance(domain, dict): if "conditions" in domain: # Object format conditions = domain.get("conditions", []) domain_list = [] for cond in conditions: if isinstance(cond, dict) and all( k in cond for k in ["field", "operator", "value"] ): domain_list.append( [cond["field"], cond["operator"], cond["value"]] ) elif isinstance(domain, list): # List format if not domain: domain_list = [] elif all(isinstance(item, list) for item in domain) or any( item in ["&", "|", "!"] for item in domain ): domain_list = domain elif len(domain) >= 3 and isinstance(domain[0], str): # Case [field, operator, value] (not [[field, operator, value]]) domain_list = [domain] elif isinstance(domain, str): # String format (JSON) try: parsed_domain = json.loads(domain) if ( isinstance(parsed_domain, dict) and "conditions" in parsed_domain ): conditions = parsed_domain.get("conditions", []) domain_list = [] for cond in conditions: if isinstance(cond, dict) and all( k in cond for k in ["field", "operator", "value"] ): domain_list.append( [cond["field"], cond["operator"], cond["value"]] ) elif isinstance(parsed_domain, list): domain_list = parsed_domain except json.JSONDecodeError: try: import ast parsed_domain = ast.literal_eval(domain) if isinstance(parsed_domain, list): domain_list = parsed_domain except: domain_list = [] # Xác thực domain_list if domain_list: valid_conditions = [] for cond in domain_list: if isinstance(cond, str) and cond in ["&", "|", "!"]: valid_conditions.append(cond) continue if ( isinstance(cond, list) and len(cond) == 3 and isinstance(cond[0], str) and isinstance(cond[1], str) ): valid_conditions.append(cond) domain_list = valid_conditions # Cập nhật args với domain đã chuẩn hóa normalized_args[0] = domain_list args = normalized_args # Log for debugging print(f"Executing {method} with normalized domain: {domain_list}") result = odoo.execute_method(model, method, *args, **kwargs) return {"success": True, "result": result} except Exception as e: return {"success": False, "error": str(e)} @mcp.tool(description="Search for employees by name") def search_employee( ctx: Context, name: str, limit: int = 20, ) -> SearchEmployeeResponse: """ Search for employees by name using Odoo's name_search method. Parameters: name: The name (or part of the name) to search for. limit: The maximum number of results to return (default 20). Returns: SearchEmployeeResponse containing results or error information. """ odoo = ctx.request_context.lifespan_context.odoo model = "hr.employee" method = "name_search" args = [] kwargs = {"name": name, "limit": limit} try: result = odoo.execute_method(model, method, *args, **kwargs) parsed_result = [ EmployeeSearchResult(id=item[0], name=item[1]) for item in result ] return SearchEmployeeResponse(success=True, result=parsed_result) except Exception as e: return SearchEmployeeResponse(success=False, error=str(e)) @mcp.tool(description="Search for holidays within a date range") def search_holidays( ctx: Context, start_date: str, end_date: str, employee_id: Optional[int] = None, ) -> SearchHolidaysResponse: """ Searches for holidays within a specified date range. Parameters: start_date: Start date in YYYY-MM-DD format. end_date: End date in YYYY-MM-DD format. employee_id: Optional employee ID to filter holidays. Returns: SearchHolidaysResponse: Object containing the search results. """ odoo = ctx.request_context.lifespan_context.odoo # Validate date format using datetime try: datetime.strptime(start_date, "%Y-%m-%d") except ValueError: return SearchHolidaysResponse( success=False, error="Invalid start_date format. Use YYYY-MM-DD." ) try: datetime.strptime(end_date, "%Y-%m-%d") except ValueError: return SearchHolidaysResponse( success=False, error="Invalid end_date format. Use YYYY-MM-DD." ) # Calculate adjusted start_date (subtract one day) start_date_dt = datetime.strptime(start_date, "%Y-%m-%d") adjusted_start_date_dt = start_date_dt - timedelta(days=1) adjusted_start_date = adjusted_start_date_dt.strftime("%Y-%m-%d") # Build the domain domain = [ "&", ["start_datetime", "<=", f"{end_date} 22:59:59"], # Use adjusted date ["stop_datetime", ">=", f"{adjusted_start_date} 23:00:00"], ] if employee_id: domain.append( ["employee_id", "=", employee_id], ) try: holidays = odoo.search_read( model_name="hr.leave.report.calendar", domain=domain, ) parsed_holidays = [Holiday(**holiday) for holiday in holidays] return SearchHolidaysResponse(success=True, result=parsed_holidays) except Exception as e: return SearchHolidaysResponse(success=False, error=str(e)) # Registrar todas las extensiones register_all_extensions(mcp) ``` -------------------------------------------------------------------------------- /src/odoo_mcp/tools_accounting.py: -------------------------------------------------------------------------------- ```python """ Implementación de herramientas (tools) para contabilidad en MCP-Odoo """ from typing import Dict, List, Any, Optional from datetime import datetime, timedelta from mcp.server.fastmcp import FastMCP, Context from .models import ( JournalEntryFilter, JournalEntryCreate, FinancialRatioInput ) def register_accounting_tools(mcp: FastMCP) -> None: """Registra herramientas relacionadas con contabilidad""" @mcp.tool(description="Busca asientos contables con filtros") def search_journal_entries( ctx: Context, filters: JournalEntryFilter ) -> Dict[str, Any]: """ Busca asientos contables según los filtros especificados Args: filters: Filtros para la búsqueda de asientos Returns: Diccionario con resultados de la búsqueda """ odoo = ctx.request_context.lifespan_context.odoo try: # Construir dominio de búsqueda domain = [] if filters.date_from: try: datetime.strptime(filters.date_from, "%Y-%m-%d") domain.append(("date", ">=", filters.date_from)) except ValueError: return {"success": False, "error": f"Formato de fecha inválido: {filters.date_from}. Use YYYY-MM-DD."} if filters.date_to: try: datetime.strptime(filters.date_to, "%Y-%m-%d") domain.append(("date", "<=", filters.date_to)) except ValueError: return {"success": False, "error": f"Formato de fecha inválido: {filters.date_to}. Use YYYY-MM-DD."} if filters.journal_id: domain.append(("journal_id", "=", filters.journal_id)) if filters.state: domain.append(("state", "=", filters.state)) # Campos a recuperar fields = [ "name", "ref", "date", "journal_id", "state", "amount_total", "amount_total_signed", "line_ids" ] # Ejecutar búsqueda entries = odoo.search_read( "account.move", domain, fields=fields, limit=filters.limit, offset=filters.offset ) # Obtener el conteo total sin límite para paginación total_count = odoo.execute_method("account.move", "search_count", domain) # Para cada asiento, obtener información resumida de las líneas for entry in entries: if entry.get("line_ids"): line_ids = entry["line_ids"] lines = odoo.search_read( "account.move.line", [("id", "in", line_ids)], fields=["name", "account_id", "partner_id", "debit", "credit", "balance"] ) entry["lines"] = lines # Eliminar la lista de IDs para reducir tamaño entry.pop("line_ids", None) return { "success": True, "result": { "count": len(entries), "total_count": total_count, "entries": entries } } except Exception as e: return {"success": False, "error": str(e)} @mcp.tool(description="Crea un nuevo asiento contable") def create_journal_entry( ctx: Context, entry: JournalEntryCreate ) -> Dict[str, Any]: """ Crea un nuevo asiento contable Args: entry: Datos del asiento a crear Returns: Respuesta con el resultado de la operación """ odoo = ctx.request_context.lifespan_context.odoo try: # Verificar que el debe y el haber cuadran total_debit = sum(line.debit for line in entry.lines) total_credit = sum(line.credit for line in entry.lines) if round(total_debit, 2) != round(total_credit, 2): return { "success": False, "error": f"El asiento no está cuadrado. Debe: {total_debit}, Haber: {total_credit}" } # Preparar valores para el asiento move_vals = { "journal_id": entry.journal_id, "line_ids": [] } if entry.ref: move_vals["ref"] = entry.ref if entry.date: try: datetime.strptime(entry.date, "%Y-%m-%d") move_vals["date"] = entry.date except ValueError: return {"success": False, "error": f"Formato de fecha inválido: {entry.date}. Use YYYY-MM-DD."} # Preparar líneas del asiento for line in entry.lines: line_vals = [ 0, 0, { "account_id": line.account_id, "name": line.name or "/", "debit": line.debit, "credit": line.credit } ] if line.partner_id: line_vals[2]["partner_id"] = line.partner_id move_vals["line_ids"].append(line_vals) # Crear asiento move_id = odoo.execute_method("account.move", "create", move_vals) # Obtener información del asiento creado move_info = odoo.execute_method("account.move", "read", [move_id], ["name", "state"])[0] return { "success": True, "result": { "move_id": move_id, "name": move_info["name"], "state": move_info["state"] } } except Exception as e: return {"success": False, "error": str(e)} @mcp.tool(description="Calcula ratios financieros clave") def analyze_financial_ratios( ctx: Context, params: FinancialRatioInput ) -> Dict[str, Any]: """ Calcula ratios financieros clave para un período específico Args: params: Parámetros para el análisis Returns: Diccionario con los ratios calculados """ odoo = ctx.request_context.lifespan_context.odoo try: # Validar fechas try: datetime.strptime(params.date_from, "%Y-%m-%d") datetime.strptime(params.date_to, "%Y-%m-%d") except ValueError: return {"success": False, "error": "Formato de fecha inválido. Use YYYY-MM-DD."} # Verificar qué ratios se solicitan requested_ratios = params.ratios # Inicializar resultado ratios = {} # Obtener datos del balance general # Activos assets_domain = [ ("account_id.user_type_id.internal_group", "=", "asset"), ("date", ">=", params.date_from), ("date", "<=", params.date_to), ("parent_state", "=", "posted") ] assets_data = odoo.search_read( "account.move.line", assets_domain, fields=["account_id", "balance"] ) total_assets = sum(line["balance"] for line in assets_data) # Activos corrientes current_assets_domain = [ ("account_id.user_type_id.internal_group", "=", "asset"), ("account_id.user_type_id.type", "=", "liquidity"), ("date", ">=", params.date_from), ("date", "<=", params.date_to), ("parent_state", "=", "posted") ] current_assets_data = odoo.search_read( "account.move.line", current_assets_domain, fields=["account_id", "balance"] ) current_assets = sum(line["balance"] for line in current_assets_data) # Pasivos liabilities_domain = [ ("account_id.user_type_id.internal_group", "=", "liability"), ("date", ">=", params.date_from), ("date", "<=", params.date_to), ("parent_state", "=", "posted") ] liabilities_data = odoo.search_read( "account.move.line", liabilities_domain, fields=["account_id", "balance"] ) total_liabilities = sum(line["balance"] for line in liabilities_data) # Pasivos corrientes current_liabilities_domain = [ ("account_id.user_type_id.internal_group", "=", "liability"), ("account_id.user_type_id.type", "=", "payable"), ("date", ">=", params.date_from), ("date", "<=", params.date_to), ("parent_state", "=", "posted") ] current_liabilities_data = odoo.search_read( "account.move.line", current_liabilities_domain, fields=["account_id", "balance"] ) current_liabilities = sum(line["balance"] for line in current_liabilities_data) # Patrimonio equity_domain = [ ("account_id.user_type_id.internal_group", "=", "equity"), ("date", ">=", params.date_from), ("date", "<=", params.date_to), ("parent_state", "=", "posted") ] equity_data = odoo.search_read( "account.move.line", equity_domain, fields=["account_id", "balance"] ) total_equity = sum(line["balance"] for line in equity_data) # Ingresos income_domain = [ ("account_id.user_type_id.internal_group", "=", "income"), ("date", ">=", params.date_from), ("date", "<=", params.date_to), ("parent_state", "=", "posted") ] income_data = odoo.search_read( "account.move.line", income_domain, fields=["account_id", "balance"] ) total_income = sum(line["balance"] for line in income_data) # Gastos expense_domain = [ ("account_id.user_type_id.internal_group", "=", "expense"), ("date", ">=", params.date_from), ("date", "<=", params.date_to), ("parent_state", "=", "posted") ] expense_data = odoo.search_read( "account.move.line", expense_domain, fields=["account_id", "balance"] ) total_expenses = sum(line["balance"] for line in expense_data) # Calcular beneficio neto net_income = total_income - total_expenses # Calcular ratios solicitados if "liquidity" in requested_ratios: # Ratio de liquidez corriente current_ratio = 0 if current_liabilities != 0: current_ratio = current_assets / abs(current_liabilities) ratios["liquidity"] = { "current_ratio": current_ratio, "current_assets": current_assets, "current_liabilities": abs(current_liabilities) } if "profitability" in requested_ratios: # Rentabilidad sobre activos (ROA) roa = 0 if total_assets != 0: roa = (net_income / total_assets) * 100 # Rentabilidad sobre patrimonio (ROE) roe = 0 if total_equity != 0: roe = (net_income / total_equity) * 100 # Margen de beneficio neto profit_margin = 0 if total_income != 0: profit_margin = (net_income / total_income) * 100 ratios["profitability"] = { "return_on_assets": roa, "return_on_equity": roe, "net_profit_margin": profit_margin, "net_income": net_income, "total_income": total_income } if "debt" in requested_ratios: # Ratio de endeudamiento debt_ratio = 0 if total_assets != 0: debt_ratio = (abs(total_liabilities) / total_assets) * 100 # Ratio de apalancamiento leverage_ratio = 0 if total_equity != 0: leverage_ratio = (abs(total_liabilities) / total_equity) ratios["debt"] = { "debt_ratio": debt_ratio, "leverage_ratio": leverage_ratio, "total_liabilities": abs(total_liabilities), "total_equity": total_equity } if "efficiency" in requested_ratios: # Rotación de activos asset_turnover = 0 if total_assets != 0: asset_turnover = total_income / total_assets ratios["efficiency"] = { "asset_turnover": asset_turnover } # Preparar resultado result = { "period": { "from": params.date_from, "to": params.date_to }, "summary": { "total_assets": total_assets, "total_liabilities": abs(total_liabilities), "total_equity": total_equity, "total_income": total_income, "total_expenses": abs(total_expenses), "net_income": net_income }, "ratios": ratios } return {"success": True, "result": result} except Exception as e: return {"success": False, "error": str(e)} ``` -------------------------------------------------------------------------------- /src/odoo_mcp/tools_inventory.py: -------------------------------------------------------------------------------- ```python """ Implementación de herramientas (tools) para inventario en MCP-Odoo """ from typing import Dict, List, Any, Optional from datetime import datetime, timedelta from mcp.server.fastmcp import FastMCP, Context from .models import ( ProductAvailabilityInput, InventoryAdjustmentCreate, InventoryTurnoverInput ) def register_inventory_tools(mcp: FastMCP) -> None: """Registra herramientas relacionadas con inventario""" @mcp.tool(description="Verifica la disponibilidad de stock para uno o más productos") def check_product_availability( ctx: Context, params: ProductAvailabilityInput ) -> Dict[str, Any]: """ Verifica la disponibilidad de stock para uno o más productos Args: params: Parámetros con IDs de productos y ubicación opcional Returns: Diccionario con información de disponibilidad """ odoo = ctx.request_context.lifespan_context.odoo try: # Verificar que los productos existen products = odoo.search_read( "product.product", [("id", "in", params.product_ids)], fields=["name", "default_code", "type", "uom_id"] ) if not products: return {"success": False, "error": "No se encontraron productos con los IDs proporcionados"} # Mapear IDs a nombres para referencia product_names = {p["id"]: p["name"] for p in products} # Obtener disponibilidad availability = {} for product_id in params.product_ids: # Construir contexto para la consulta context = {} if params.location_id: context["location"] = params.location_id # Obtener cantidad disponible usando el método qty_available try: product_data = odoo.execute_method( "product.product", "read", [product_id], ["qty_available", "virtual_available", "incoming_qty", "outgoing_qty"], context ) if product_data: product_info = product_data[0] availability[product_id] = { "name": product_names.get(product_id, f"Producto {product_id}"), "qty_available": product_info["qty_available"], "virtual_available": product_info["virtual_available"], "incoming_qty": product_info["incoming_qty"], "outgoing_qty": product_info["outgoing_qty"] } else: availability[product_id] = { "name": product_names.get(product_id, f"Producto {product_id}"), "error": "Producto no encontrado" } except Exception as e: availability[product_id] = { "name": product_names.get(product_id, f"Producto {product_id}"), "error": str(e) } # Obtener información de la ubicación si se especificó location_info = None if params.location_id: try: location_data = odoo.search_read( "stock.location", [("id", "=", params.location_id)], fields=["name", "complete_name"] ) if location_data: location_info = location_data[0] except Exception: location_info = {"id": params.location_id, "name": "Ubicación desconocida"} return { "success": True, "result": { "products": availability, "location": location_info } } except Exception as e: return {"success": False, "error": str(e)} @mcp.tool(description="Crea un ajuste de inventario para corregir el stock") def create_inventory_adjustment( ctx: Context, adjustment: InventoryAdjustmentCreate ) -> Dict[str, Any]: """ Crea un ajuste de inventario para corregir el stock Args: adjustment: Datos del ajuste a crear Returns: Respuesta con el resultado de la operación """ odoo = ctx.request_context.lifespan_context.odoo try: # Verificar la versión de Odoo para determinar el modelo correcto # En Odoo 13.0+, se usa 'stock.inventory' # En Odoo 15.0+, se usa 'stock.quant' directamente # Intentar obtener el modelo stock.inventory inventory_model_exists = odoo.execute_method( "ir.model", "search_count", [("model", "=", "stock.inventory")] ) > 0 if inventory_model_exists: # Usar el flujo de stock.inventory (Odoo 13.0, 14.0) # Crear el inventario inventory_vals = { "name": adjustment.name, "line_ids": [] } if adjustment.date: try: datetime.strptime(adjustment.date, "%Y-%m-%d") inventory_vals["date"] = adjustment.date except ValueError: return {"success": False, "error": f"Formato de fecha inválido: {adjustment.date}. Use YYYY-MM-DD."} # Crear el inventario inventory_id = odoo.execute_method("stock.inventory", "create", inventory_vals) # Añadir líneas al inventario for line in adjustment.adjustment_lines: line_vals = { "inventory_id": inventory_id, "product_id": line.product_id, "location_id": line.location_id, "product_qty": line.product_qty } odoo.execute_method("stock.inventory.line", "create", line_vals) # Confirmar el inventario odoo.execute_method("stock.inventory", "action_validate", [inventory_id]) return { "success": True, "result": { "inventory_id": inventory_id, "name": adjustment.name } } else: # Usar el flujo de stock.quant (Odoo 15.0+) result_ids = [] for line in adjustment.adjustment_lines: # Buscar el quant existente quant_domain = [ ("product_id", "=", line.product_id), ("location_id", "=", line.location_id) ] quants = odoo.search_read( "stock.quant", quant_domain, fields=["id", "quantity"] ) if quants: # Actualizar quant existente quant_id = quants[0]["id"] odoo.execute_method( "stock.quant", "write", [quant_id], {"inventory_quantity": line.product_qty} ) result_ids.append(quant_id) else: # Crear nuevo quant quant_vals = { "product_id": line.product_id, "location_id": line.location_id, "inventory_quantity": line.product_qty } quant_id = odoo.execute_method("stock.quant", "create", quant_vals) result_ids.append(quant_id) # Aplicar el inventario odoo.execute_method("stock.quant", "action_apply_inventory", result_ids) return { "success": True, "result": { "quant_ids": result_ids, "name": adjustment.name } } except Exception as e: return {"success": False, "error": str(e)} @mcp.tool(description="Calcula y analiza la rotación de inventario") def analyze_inventory_turnover( ctx: Context, params: InventoryTurnoverInput ) -> Dict[str, Any]: """ Calcula y analiza la rotación de inventario Args: params: Parámetros para el análisis Returns: Diccionario con resultados del análisis """ odoo = ctx.request_context.lifespan_context.odoo try: # Validar fechas try: date_from = datetime.strptime(params.date_from, "%Y-%m-%d") date_to = datetime.strptime(params.date_to, "%Y-%m-%d") except ValueError: return {"success": False, "error": "Formato de fecha inválido. Use YYYY-MM-DD."} # Construir dominio para productos product_domain = [("type", "=", "product")] # Solo productos almacenables if params.product_ids: product_domain.append(("id", "in", params.product_ids)) if params.category_id: product_domain.append(("categ_id", "=", params.category_id)) # Obtener productos products = odoo.search_read( "product.product", product_domain, fields=["name", "default_code", "categ_id", "standard_price"] ) if not products: return {"success": False, "error": "No se encontraron productos con los criterios especificados"} # Calcular rotación para cada producto product_turnover = {} for product in products: product_id = product["id"] # 1. Obtener movimientos de salida (ventas) en el período outgoing_domain = [ ("product_id", "=", product_id), ("date", ">=", params.date_from), ("date", "<=", params.date_to), ("location_dest_id.usage", "=", "customer") # Destino: cliente ] outgoing_moves = odoo.search_read( "stock.move", outgoing_domain, fields=["product_uom_qty", "price_unit"] ) # Calcular costo de ventas cogs = sum(move["product_uom_qty"] * (move.get("price_unit") or product["standard_price"]) for move in outgoing_moves) # 2. Obtener valor de inventario promedio # Intentar obtener valoración de inventario al inicio y fin del período # Método 1: Usar informes de valoración si están disponibles try: # Valoración al inicio del período context_start = { "to_date": params.date_from } valuation_start = odoo.execute_method( "product.product", "read", [product_id], ["stock_value"], context_start ) # Valoración al final del período context_end = { "to_date": params.date_to } valuation_end = odoo.execute_method( "product.product", "read", [product_id], ["stock_value"], context_end ) start_value = valuation_start[0]["stock_value"] if valuation_start else 0 end_value = valuation_end[0]["stock_value"] if valuation_end else 0 avg_inventory_value = (start_value + end_value) / 2 except Exception: # Método 2: Estimación basada en precio estándar y cantidad # Obtener cantidad al inicio context_start = { "to_date": params.date_from } qty_start = odoo.execute_method( "product.product", "read", [product_id], ["qty_available"], context_start ) # Obtener cantidad al final context_end = { "to_date": params.date_to } qty_end = odoo.execute_method( "product.product", "read", [product_id], ["qty_available"], context_end ) start_qty = qty_start[0]["qty_available"] if qty_start else 0 end_qty = qty_end[0]["qty_available"] if qty_end else 0 avg_qty = (start_qty + end_qty) / 2 avg_inventory_value = avg_qty * product["standard_price"] # 3. Calcular métricas de rotación turnover_ratio = 0 days_inventory = 0 if avg_inventory_value > 0: turnover_ratio = cogs / avg_inventory_value # Días de inventario (basado en el período analizado) days_in_period = (date_to - date_from).days + 1 if turnover_ratio > 0: days_inventory = days_in_period / turnover_ratio # Guardar resultados product_turnover[product_id] = { "name": product["name"], "default_code": product["default_code"], "category": product["categ_id"][1] if product["categ_id"] else "Sin categoría", "cogs": cogs, "avg_inventory_value": avg_inventory_value, "turnover_ratio": turnover_ratio, "days_inventory": days_inventory } # Ordenar productos por rotación (de mayor a menor) sorted_products = sorted( product_turnover.items(), key=lambda x: x[1]["turnover_ratio"], reverse=True ) # Calcular promedios generales total_cogs = sum(data["cogs"] for _, data in product_turnover.items()) total_avg_value = sum(data["avg_inventory_value"] for _, data in product_turnover.items()) overall_turnover = 0 overall_days = 0 if total_avg_value > 0: overall_turnover = total_cogs / total_avg_value days_in_period = (date_to - date_from).days + 1 if overall_turnover > 0: overall_days = days_in_period / overall_turnover # Preparar resultado result = { "period": { "from": params.date_from, "to": params.date_to, "days": (date_to - date_from).days + 1 }, "summary": { "product_count": len(products), "total_cogs": total_cogs, "total_avg_inventory_value": total_avg_value, "overall_turnover_ratio": overall_turnover, "overall_days_inventory": overall_days }, "products": [ {"id": k, **v} for k, v in sorted_products ] } return {"success": True, "result": result} except Exception as e: return {"success": False, "error": str(e)} ```