# Directory Structure ``` ├── .gitignore ├── .python-version ├── .vscode │ └── settings.json ├── ~ │ └── .zshrc ├── app.log ├── pyproject.toml ├── README.md ├── src │ ├── .DS_Store │ └── scaflog_zoho_mcp_server │ ├── __init__.py │ ├── .DS_Store │ ├── auth.py │ ├── config.py │ ├── models.py │ ├── resource_config.py │ ├── server.py │ └── service.py ├── tests │ ├── __init__.py │ ├── conftest.py │ ├── test_server.py │ └── test_service.py └── uv.lock ``` # Files -------------------------------------------------------------------------------- /.python-version: -------------------------------------------------------------------------------- ``` 1 | 3.12 2 | ``` -------------------------------------------------------------------------------- /~/.zshrc: -------------------------------------------------------------------------------- ``` 1 | # Add these lines to the end of your .zshrc 2 | alias python="python3" 3 | alias pip="pip3" ``` -------------------------------------------------------------------------------- /.gitignore: -------------------------------------------------------------------------------- ``` 1 | # Python-generated files 2 | __pycache__/ 3 | *.py[oc] 4 | build/ 5 | dist/ 6 | wheels/ 7 | *.egg-info 8 | 9 | # Virtual environments 10 | .venv 11 | 12 | # Environment variables 13 | .env 14 | 15 | # Flatten repo 16 | flatten/ 17 | 18 | # other 19 | app.log 20 | .DS_Store 21 | ``` -------------------------------------------------------------------------------- /README.md: -------------------------------------------------------------------------------- ```markdown 1 | # scaflog-zoho-mcp-server MCP server 2 | 3 | Zoho Creator Scaflog App MCP Server 4 | 5 | ## Components 6 | 7 | ### Resources 8 | 9 | The server implements a simple note storage system with: 10 | - Custom note:// URI scheme for accessing individual notes 11 | - Each note resource has a name, description and text/plain mimetype 12 | 13 | ### Prompts 14 | 15 | The server provides a single prompt: 16 | - summarize-notes: Creates summaries of all stored notes 17 | - Optional "style" argument to control detail level (brief/detailed) 18 | - Generates prompt combining all current notes with style preference 19 | 20 | ### Tools 21 | 22 | The server implements one tool: 23 | - add-note: Adds a new note to the server 24 | - Takes "name" and "content" as required string arguments 25 | - Updates server state and notifies clients of resource changes 26 | 27 | ## Configuration 28 | 29 | [TODO: Add configuration details specific to your implementation] 30 | 31 | ## Quickstart 32 | 33 | ### Install 34 | 35 | #### Claude Desktop 36 | 37 | On MacOS: `~/Library/Application\ Support/Claude/claude_desktop_config.json` 38 | On Windows: `%APPDATA%/Claude/claude_desktop_config.json` 39 | 40 | <details> 41 | <summary>Development/Unpublished Servers Configuration</summary> 42 | ``` 43 | "mcpServers": { 44 | "scaflog-zoho-mcp-server": { 45 | "command": "uv", 46 | "args": [ 47 | "--directory", 48 | "/Users/alexsherin/Documents/Projects/MCP Servers", 49 | "run", 50 | "scaflog-zoho-mcp-server" 51 | ] 52 | } 53 | } 54 | ``` 55 | </details> 56 | 57 | <details> 58 | <summary>Published Servers Configuration</summary> 59 | ``` 60 | "mcpServers": { 61 | "scaflog-zoho-mcp-server": { 62 | "command": "uvx", 63 | "args": [ 64 | "scaflog-zoho-mcp-server" 65 | ] 66 | } 67 | } 68 | ``` 69 | </details> 70 | 71 | ## Development 72 | 73 | ### Building and Publishing 74 | 75 | To prepare the package for distribution: 76 | 77 | 1. Sync dependencies and update lockfile: 78 | ```bash 79 | uv sync 80 | ``` 81 | 82 | 2. Build package distributions: 83 | ```bash 84 | uv build 85 | ``` 86 | 87 | This will create source and wheel distributions in the `dist/` directory. 88 | 89 | 3. Publish to PyPI: 90 | ```bash 91 | uv publish 92 | ``` 93 | 94 | Note: You'll need to set PyPI credentials via environment variables or command flags: 95 | - Token: `--token` or `UV_PUBLISH_TOKEN` 96 | - Or username/password: `--username`/`UV_PUBLISH_USERNAME` and `--password`/`UV_PUBLISH_PASSWORD` 97 | 98 | ### Debugging 99 | 100 | Since MCP servers run over stdio, debugging can be challenging. For the best debugging 101 | experience, we strongly recommend using the [MCP Inspector](https://github.com/modelcontextprotocol/inspector). 102 | 103 | 104 | You can launch the MCP Inspector via [`npm`](https://docs.npmjs.com/downloading-and-installing-node-js-and-npm) with this command: 105 | 106 | ```bash 107 | npx @modelcontextprotocol/inspector uv --directory /Users/alexsherin/Documents/Projects/MCP Servers run scaflog-zoho-mcp-server 108 | ``` 109 | 110 | 111 | Upon launching, the Inspector will display a URL that you can access in your browser to begin debugging. ``` -------------------------------------------------------------------------------- /tests/__init__.py: -------------------------------------------------------------------------------- ```python 1 | # tests/__init__.py 2 | 3 | # import pytest 4 | 5 | # @pytest.fixture(scope="module") 6 | # def mock_server(): 7 | # return ZohoCreatorServer() 8 | ``` -------------------------------------------------------------------------------- /src/scaflog_zoho_mcp_server/__init__.py: -------------------------------------------------------------------------------- ```python 1 | from . import server 2 | import asyncio 3 | 4 | def main(): 5 | """Main entry point for the package.""" 6 | asyncio.run(server.main()) 7 | 8 | # Optionally expose other important items at package level 9 | __all__ = ['main', 'server'] ``` -------------------------------------------------------------------------------- /.vscode/settings.json: -------------------------------------------------------------------------------- ```json 1 | { 2 | "window.commandCenter": 1, 3 | "workbench.colorTheme": "Material Theme", 4 | "workbench.iconTheme": "material-icon-theme", 5 | "editor.fontSize": 14, 6 | "files.autoSave": "afterDelay", 7 | "files.autoSaveWorkspaceFilesOnly": true, 8 | "git.postCommitCommand": "push", 9 | "git.enableSmartCommit": false, 10 | "git.defaultCommitMessageTemplate": "feat: ", 11 | "git.followTagsWhenSync": true, 12 | "git.inputValidationSubjectLength": 72, 13 | "git.inputValidationLength": 500, 14 | "scm.defaultViewMode": "tree", 15 | "git.useCommitInputAsStashMessage": true 16 | } ``` -------------------------------------------------------------------------------- /pyproject.toml: -------------------------------------------------------------------------------- ```toml 1 | [project] 2 | name = "scaflog-zoho-mcp-server" 3 | version = "0.1.0" 4 | description = "MCP Server for Zoho Creator Integration" 5 | readme = "README.md" 6 | authors = [ 7 | { name = "alexsherin", email = "[email protected]" } 8 | ] 9 | requires-python = ">=3.12" 10 | dependencies = [ 11 | "mcp>=1.1.0", 12 | "zcrmsdk==3.1.0", 13 | "pydantic>=2.0.0", 14 | "python-dotenv>=1.0.0", 15 | "typing-extensions>=4.7.0", 16 | "httpx>=0.24.0", 17 | ] 18 | 19 | [project.scripts] 20 | scaflog-zoho-mcp-server = "scaflog_zoho_mcp_server:main" 21 | 22 | [build-system] 23 | requires = ["hatchling"] 24 | build-backend = "hatchling.build" 25 | 26 | [tool.hatch.build.targets.wheel] 27 | packages = ["src/scaflog_zoho_mcp_server"] 28 | 29 | [tool.pytest.ini_options] 30 | asyncio_mode = "auto" 31 | asyncio_default_fixture_loop_scope = "function" 32 | testpaths = ["tests"] 33 | 34 | [project.optional-dependencies] 35 | test = [ 36 | "pytest>=7.0.0", 37 | "pytest-asyncio>=0.21.0", 38 | "pytest-cov>=4.1.0", 39 | ] ``` -------------------------------------------------------------------------------- /src/scaflog_zoho_mcp_server/config.py: -------------------------------------------------------------------------------- ```python 1 | from pathlib import Path 2 | import os 3 | from typing import Optional 4 | from pydantic import BaseModel, Field 5 | from dotenv import load_dotenv 6 | 7 | class ZohoCreatorConfig(BaseModel): 8 | """Configuration for Zoho Creator API access.""" 9 | client_id: str = Field(..., description="OAuth client ID") 10 | client_secret: str = Field(..., description="OAuth client secret") 11 | refresh_token: str = Field(..., description="OAuth refresh token") 12 | organization_id: str = Field(..., description="Zoho organization ID") 13 | environment: str = Field(default="production", description="Zoho environment (production/sandbox)") 14 | access_token: Optional[str] = Field(default=None, description="Current access token") 15 | 16 | def load_config() -> ZohoCreatorConfig: 17 | """Load configuration from environment variables or .env file.""" 18 | # Try to load from .env file if it exists 19 | env_path = Path(".env") 20 | if env_path.exists(): 21 | load_dotenv(env_path) 22 | 23 | return ZohoCreatorConfig( 24 | client_id=os.getenv("ZOHO_CLIENT_ID", ""), 25 | client_secret=os.getenv("ZOHO_CLIENT_SECRET", ""), 26 | refresh_token=os.getenv("ZOHO_REFRESH_TOKEN", ""), 27 | organization_id=os.getenv("ZOHO_ORGANIZATION_ID", ""), 28 | environment=os.getenv("ZOHO_ENVIRONMENT", "production"), 29 | ) 30 | 31 | # API endpoints for different environments 32 | API_BASE_URL = { 33 | "production": "https://creator.zoho.com/api/v2/100rails/goscaffold", 34 | "sandbox": "https://creator.zoho.com/api/v2/sandbox" 35 | } ``` -------------------------------------------------------------------------------- /src/scaflog_zoho_mcp_server/auth.py: -------------------------------------------------------------------------------- ```python 1 | # src_scaflog_zoho_mcp_server/auth.py 2 | 3 | import time 4 | from typing import Optional 5 | import httpx 6 | from pydantic import BaseModel, Field 7 | 8 | from .config import ZohoCreatorConfig, API_BASE_URL 9 | 10 | class TokenInfo(BaseModel): 11 | """Model for storing token information.""" 12 | access_token: str 13 | expires_in: int 14 | created_at: float = Field(default_factory=time.time) 15 | 16 | @property 17 | def is_expired(self) -> bool: 18 | """Check if the token is expired with a 5-minute buffer.""" 19 | return time.time() > (self.created_at + self.expires_in - 300) 20 | 21 | class ZohoAuth: 22 | """Handles authentication with Zoho Creator API.""" 23 | def __init__(self, config: ZohoCreatorConfig): 24 | self.config = config 25 | self._token_info: Optional[TokenInfo] = None 26 | self._client = httpx.AsyncClient(timeout=30.0) 27 | 28 | async def get_access_token(self) -> str: 29 | """Get a valid access token, refreshing if necessary.""" 30 | if not self._token_info or self._token_info.is_expired: 31 | await self._refresh_token() 32 | return self._token_info.access_token 33 | 34 | async def _refresh_token(self) -> None: 35 | """Refresh the access token using the refresh token.""" 36 | token_url = "https://accounts.zoho.com/oauth/v2/token" 37 | params = { 38 | "client_id": self.config.client_id, 39 | "client_secret": self.config.client_secret, 40 | "refresh_token": self.config.refresh_token, 41 | "grant_type": "refresh_token", 42 | "redirect_uri": "https://www.zohoapis.com" 43 | } 44 | 45 | async with self._client as client: 46 | response = await client.post(token_url, params=params) 47 | response.raise_for_status() 48 | data = response.json() 49 | 50 | self._token_info = TokenInfo( 51 | access_token=data["access_token"], 52 | expires_in=data["expires_in"] 53 | ) 54 | 55 | async def get_authorized_headers(self) -> dict: 56 | """Get headers with authorization token.""" 57 | token = await self.get_access_token() 58 | return { 59 | "Authorization": f"Bearer {token}", 60 | "Content-Type": "application/json", 61 | } 62 | 63 | async def close(self): 64 | """Close the HTTP client.""" 65 | await self._client.aclose() ``` -------------------------------------------------------------------------------- /src/scaflog_zoho_mcp_server/resource_config.py: -------------------------------------------------------------------------------- ```python 1 | # src_scaflog_zoho_mcp_server/resource_config.py 2 | 3 | from typing import Dict, List, Optional 4 | from pydantic import BaseModel 5 | 6 | class FieldConfig(BaseModel): 7 | """Configuration for a whitelisted field.""" 8 | display_name: str 9 | description: Optional[str] = None 10 | required: bool = False 11 | 12 | class FormConfig(BaseModel): 13 | """Configuration for a whitelisted form.""" 14 | link_name: str 15 | display_name: str 16 | description: Optional[str] = None 17 | fields: Dict[str, FieldConfig] 18 | 19 | class ReportConfig(BaseModel): 20 | """Configuration for a whitelisted report.""" 21 | link_name: str 22 | display_name: str 23 | description: Optional[str] = None 24 | fields: Dict[str, FieldConfig] 25 | 26 | # Define the whitelisted resources 27 | WHITELISTED_RESOURCES = { 28 | "forms": { 29 | "Company_Info": FormConfig( 30 | link_name="Company_Info", 31 | display_name="Company Information", 32 | description="Core company details and profile", 33 | fields={ 34 | "Company_Name": FieldConfig( 35 | display_name="Company Name", 36 | description="Legal name of the company", 37 | required=True 38 | ), 39 | "Phone": FieldConfig( 40 | display_name="Phone Number", 41 | description="Primary contact number" 42 | ), 43 | "Email": FieldConfig( 44 | display_name="Email", 45 | description="Primary contact email" 46 | ), 47 | "Industry": FieldConfig( 48 | display_name="Industry", 49 | description="Company's primary industry" 50 | ) 51 | } 52 | ), 53 | # Add more forms as needed 54 | }, 55 | "reports": { 56 | "Company_All_Data": ReportConfig( 57 | link_name="Company_All_Data", 58 | display_name="Company Overview", 59 | description="Comprehensive view of company information", 60 | fields={ 61 | "Company_Name": FieldConfig( 62 | display_name="Company Name", 63 | description="Legal name of the company" 64 | ), 65 | "Industry": FieldConfig( 66 | display_name="Industry", 67 | description="Company's primary industry" 68 | ), 69 | "Status": FieldConfig( 70 | display_name="Status", 71 | description="Current company status" 72 | ) 73 | } 74 | ), 75 | # Add more reports as needed 76 | } 77 | } 78 | ``` -------------------------------------------------------------------------------- /tests/test_server.py: -------------------------------------------------------------------------------- ```python 1 | # tests/test_server.py 2 | import pytest 3 | from pydantic import AnyUrl 4 | import mcp.types as types 5 | from mcp import ClientSession 6 | import logging 7 | 8 | # Configure logging for the test 9 | logging.basicConfig(level=logging.INFO) 10 | 11 | @pytest.mark.asyncio 12 | async def test_server_resources(client_session: ClientSession): 13 | """Test server resources functionality.""" 14 | logging.info("Starting test_server_resources...") 15 | resources = await client_session.list_resources() 16 | logging.info(f"Resources: {resources}") # Log the resources 17 | assert len(resources) > 0 18 | assert any(str(r.uri) == "zoho://forms" for r in resources) 19 | assert any(str(r.uri) == "zoho://forms/test_form" for r in resources) 20 | 21 | # Read a resource 22 | resource = await client_session.read_resource("zoho://forms/test_form") 23 | assert resource.mimeType == "application/json" 24 | assert "form" in resource.text 25 | assert "records" in resource.text 26 | 27 | @pytest.mark.asyncio 28 | async def test_server_tools(client_session: ClientSession): 29 | """Test server tools functionality.""" 30 | # List available tools 31 | tools = await client_session.list_tools() 32 | assert any(tool.name == "create-record" for tool in tools) 33 | 34 | # Call a tool 35 | result = await client_session.call_tool( 36 | "create-record", 37 | arguments={ 38 | "form_name": "test_form", 39 | "data": {"test_field": "test_value"} 40 | } 41 | ) 42 | assert len(result) == 1 43 | assert result[0].type == "text" 44 | assert "created successfully" in result[0].text 45 | 46 | @pytest.mark.asyncio 47 | async def test_real_server_resources(client_session: ClientSession): 48 | """Test server resources functionality with real data.""" 49 | # List available resources 50 | resources = await client_session.list_resources() 51 | print("Resources:", resources) # Log the resources 52 | assert len(resources) > 0 53 | assert any(str(r.uri) == "zoho://forms" for r in resources) 54 | assert any(str(r.uri) == "zoho://form/Company_Info" for r in resources) 55 | 56 | # Read a resource 57 | resource = await client_session.read_resource("zoho://report/Company_All_Data") 58 | print("Resource:", resource) # Log the resource 59 | assert resource.mimeType == "application/json" 60 | assert "form" in resource.text 61 | assert "records" in resource.text 62 | 63 | @pytest.mark.asyncio 64 | async def test_real_server_tools(client_session: ClientSession): 65 | """Test server tools functionality with real data.""" 66 | # List available tools 67 | tools = await client_session.list_tools() 68 | print("Tools:", tools) # Log the tools 69 | assert any(tool.name == "create-record" for tool in tools) 70 | 71 | # Call a tool 72 | result = await client_session.call_tool( 73 | "create-record", 74 | arguments={ 75 | "form_name": "test_form", 76 | "data": {"test_field": "test_value"} 77 | } 78 | ) 79 | print("Tool Call Result:", result) # Log the result 80 | assert len(result) == 1 81 | assert result[0].type == "text" 82 | assert "created successfully" in result[0].text 83 | ``` -------------------------------------------------------------------------------- /src/scaflog_zoho_mcp_server/models.py: -------------------------------------------------------------------------------- ```python 1 | # src_scaflog_zoho_mcp_server/models.py 2 | 3 | from datetime import datetime 4 | from typing import Any, Dict, List, Optional 5 | from pydantic import BaseModel, Field 6 | 7 | class ZohoField(BaseModel): 8 | """Represents a field in a Zoho Creator form.""" 9 | link_name: str = Field(..., description="The API name of the field") 10 | display_name: str = Field(..., description="The display name of the field") 11 | field_type: int = Field(..., description="The type of the field (e.g., text, number)") 12 | required: bool = Field(default=False, description="Indicates if the field is mandatory") 13 | unique: bool = Field(default=False, description="Indicates if the field must be unique") 14 | max_char: Optional[int] = Field(default=None, description="Maximum character length for the field") 15 | lookup: Optional[bool] = Field(default=None, description="Indicates if the field is a lookup field") 16 | choices: Optional[List[dict]] = Field(default=None, description="List of choices for the field if applicable") 17 | 18 | class ZohoForm(BaseModel): 19 | """Represents a form in Zoho Creator.""" 20 | link_name: str = Field(..., description="API link name of the form") 21 | display_name: str = Field(..., description="Display name of the form") 22 | type: int = Field(..., description="Type of the form") 23 | fields: List[ZohoField] = Field(default_factory=list) 24 | 25 | class ZohoRecord(BaseModel): 26 | """Represents a record in a Zoho Creator form.""" 27 | id: str 28 | form_link_name: str 29 | data: Dict[str, Any] 30 | # Remove created_time and modified_time if they are not in the response 31 | # created_time: Optional[datetime] = None 32 | # modified_time: Optional[datetime] = None 33 | 34 | class ZohoReport(BaseModel): 35 | """Represents a report in Zoho Creator.""" 36 | link_name: str = Field(..., description="API link name of the report") 37 | display_name: str = Field(..., description="Display name of the report") 38 | type: int = Field(..., description="Type of the reprt") 39 | 40 | class Cache: 41 | """Simple cache for form metadata.""" 42 | def __init__(self, ttl_seconds: int = 300): 43 | self.forms: Dict[str, ZohoForm] = {} 44 | self.reports: Dict[str, ZohoReport] = {} 45 | self.ttl = ttl_seconds 46 | self.last_refresh: Optional[datetime] = None 47 | 48 | def needs_refresh(self) -> bool: 49 | """Check if cache needs refreshing.""" 50 | if not self.last_refresh: 51 | return True 52 | return (datetime.now() - self.last_refresh).total_seconds() > self.ttl 53 | 54 | def update_forms(self, forms: List[ZohoForm]): 55 | """Update cached forms.""" 56 | self.forms = {form.link_name: form for form in forms} 57 | self.last_refresh = datetime.now() 58 | 59 | def get_form(self, link_name: str) -> Optional[ZohoForm]: 60 | """Get a form from cache by link name.""" 61 | return self.forms.get(link_name) 62 | 63 | def update_reports(self, reports: List[ZohoReport]): 64 | """Update cached reports.""" 65 | self.reports = {report.link_name: report for report in reports} 66 | self.last_refresh = datetime.now() 67 | 68 | ``` -------------------------------------------------------------------------------- /tests/conftest.py: -------------------------------------------------------------------------------- ```python 1 | # tests/conftest.py 2 | import pytest 3 | from unittest.mock import AsyncMock 4 | from typing import AsyncGenerator, Generator 5 | import httpx 6 | from datetime import datetime 7 | import os 8 | from pathlib import Path 9 | import sys 10 | from dotenv import load_dotenv # Import dotenv to load environment variables 11 | import logging # Import logging 12 | 13 | # Import the necessary classes 14 | from scaflog_zoho_mcp_server.config import ZohoCreatorConfig 15 | from scaflog_zoho_mcp_server.auth import ZohoAuth 16 | from scaflog_zoho_mcp_server.service import ZohoCreatorService 17 | from mcp import ClientSession, StdioServerParameters # Add this import 18 | from mcp.client.stdio import stdio_client # Add this import 19 | 20 | # Configure logging to write to a file 21 | logging.basicConfig( 22 | filename='app.log', # Specify the log file name 23 | filemode='a', # Append mode 24 | format='%(asctime)s - %(levelname)s - %(message)s', 25 | level=logging.INFO # Set the logging level 26 | ) 27 | 28 | # Log that the tests are starting 29 | logging.info("Starting tests...") 30 | 31 | # Load environment variables from .env file 32 | load_dotenv() 33 | 34 | @pytest.fixture(scope="session") 35 | def test_env() -> Generator[dict, None, None]: 36 | """Create a test environment with necessary configuration.""" 37 | env = { 38 | "ZOHO_CLIENT_ID": os.getenv("ZOHO_CLIENT_ID"), 39 | "ZOHO_CLIENT_SECRET": os.getenv("ZOHO_CLIENT_SECRET"), 40 | "ZOHO_REFRESH_TOKEN": os.getenv("ZOHO_REFRESH_TOKEN"), 41 | "ZOHO_ORGANIZATION_ID": os.getenv("ZOHO_ORGANIZATION_ID"), 42 | "ZOHO_ENVIRONMENT": os.getenv("ZOHO_ENVIRONMENT"), 43 | } 44 | 45 | yield env 46 | 47 | @pytest.fixture 48 | async def mock_service() -> AsyncGenerator[ZohoCreatorService, None]: 49 | """Create a service with actual data from Zoho Creator.""" 50 | config = ZohoCreatorConfig( 51 | client_id=os.getenv("ZOHO_CLIENT_ID"), 52 | client_secret=os.getenv("ZOHO_CLIENT_SECRET"), 53 | refresh_token=os.getenv("ZOHO_REFRESH_TOKEN"), 54 | organization_id=os.getenv("ZOHO_ORGANIZATION_ID"), 55 | environment=os.getenv("ZOHO_ENVIRONMENT") 56 | ) 57 | auth = ZohoAuth(config) 58 | 59 | # Create a new instance of ZohoCreatorService 60 | service = ZohoCreatorService(auth) 61 | 62 | # Create a new HTTP client for each test 63 | service._client = httpx.AsyncClient() # Create a new client instance 64 | 65 | yield service 66 | 67 | await service._client.aclose() # Close the client after the test 68 | await service.close() # Close the service 69 | 70 | @pytest.fixture 71 | async def client_session(test_env) -> AsyncGenerator[ClientSession, None]: 72 | """Create a client session connected to the test server.""" 73 | python_path = sys.executable 74 | project_root = Path(__file__).parent.parent 75 | 76 | server_params = StdioServerParameters( 77 | command=python_path, 78 | args=["-m", "src"], 79 | env={ 80 | **os.environ, 81 | **test_env, 82 | "PYTHONPATH": str(project_root) 83 | } 84 | ) 85 | 86 | async with stdio_client(server_params) as (read, write): 87 | async with ClientSession(read, write) as session: 88 | await session.initialize() 89 | yield session 90 | ``` -------------------------------------------------------------------------------- /tests/test_service.py: -------------------------------------------------------------------------------- ```python 1 | # tests/test_service.py 2 | import pytest 3 | from datetime import datetime 4 | import logging 5 | 6 | # Configure logging for the test 7 | logging.basicConfig(level=logging.INFO) 8 | 9 | from scaflog_zoho_mcp_server.service import ZohoCreatorService 10 | 11 | @pytest.mark.asyncio 12 | async def test_list_forms(mock_service: ZohoCreatorService): 13 | """Test listing forms.""" 14 | logging.info("Starting test_list_forms...") 15 | forms = await mock_service.list_forms(force_refresh=True) 16 | logging.info(f"Fetched forms: {[form.display_name for form in forms]}") # Log the display names of the forms 17 | 18 | assert len(forms) > 0 # Ensure that at least one form is returned 19 | assert all(hasattr(form, 'link_name') for form in forms) # Check that each form has a link_name 20 | assert all(hasattr(form, 'display_name') for form in forms) # Check that each form has a display_name 21 | 22 | @pytest.mark.asyncio 23 | async def test_get_records(mock_service: ZohoCreatorService): 24 | """Test getting records.""" 25 | logging.info("Starting test_get_records...") 26 | records = await mock_service.get_records("test_form") 27 | logging.info(f"Fetched records: {records}") # Log the fetched records 28 | assert len(records) == 1 29 | assert records[0].id == "123" 30 | assert records[0].data["ID"] == "test_value" 31 | 32 | @pytest.mark.asyncio 33 | async def test_create_record(mock_service: ZohoCreatorService): 34 | """Test creating a record.""" 35 | record = await mock_service.create_record( 36 | "test_form", 37 | {"test_field": "new_value"} 38 | ) 39 | assert record.id == "123" 40 | assert record.form_link_name == "test_form" 41 | assert record.data["test_field"] == "new_value" 42 | 43 | @pytest.mark.asyncio 44 | async def test_update_record(mock_service: ZohoCreatorService): 45 | """Test updating a record.""" 46 | record = await mock_service.update_record( 47 | "test_form", 48 | "123", 49 | {"test_field": "updated_value"} 50 | ) 51 | assert record.id == "123" 52 | assert record.form_link_name == "test_form" 53 | assert record.data["test_field"] == "updated_value" 54 | 55 | @pytest.mark.asyncio 56 | async def test_fetch_data(mock_service): 57 | logging.info("Starting test_fetch_data...") 58 | data = await mock_service.fetch_data() 59 | logging.info(f"Fetched data: {data}") 60 | assert data is not None # Example assertion 61 | 62 | @pytest.mark.asyncio 63 | async def test_fetch_all_records(mock_service: ZohoCreatorService): 64 | """Test fetching all records from the Company_Info report.""" 65 | logging.info("Starting test_fetch_all_records...") 66 | 67 | # Fetch all records for the report "Company_All_Data_Report" 68 | records = await mock_service.get_records("Company_All_Data") # Use the report link name 69 | 70 | # Log the fetched records 71 | logging.info(f"Fetched records: {records}") 72 | 73 | # Assertions to verify the records 74 | assert len(records) > 0 # Ensure that at least one record is returned 75 | for record in records: 76 | assert isinstance(record.id, str) # Ensure each record has a valid ID 77 | # assert "Company_Info" in record.data # Ensure the record contains data for the form 78 | 79 | # You can add more tests below... 80 | ``` -------------------------------------------------------------------------------- /src/scaflog_zoho_mcp_server/service.py: -------------------------------------------------------------------------------- ```python 1 | # src_scaflog_zoho_mcp_server/service.py 2 | 3 | from typing import List, Optional, Dict, Any 4 | import httpx 5 | from datetime import datetime 6 | import logging 7 | 8 | from .models import ZohoForm, ZohoReport, ZohoField, ZohoRecord, Cache 9 | from .auth import ZohoAuth 10 | from .config import API_BASE_URL 11 | 12 | # Configure logging to write to a file 13 | logging.basicConfig( 14 | filename='app.log', # Specify the log file name 15 | filemode='a', # Append mode 16 | format='%(asctime)s - %(levelname)s - %(message)s', 17 | level=logging.INFO # Set the logging level 18 | ) 19 | 20 | class ZohoCreatorService: 21 | """Service for interacting with Zoho Creator API.""" 22 | 23 | def __init__(self, auth: ZohoAuth): 24 | self.auth = auth 25 | self.cache = Cache() 26 | self._client = httpx.AsyncClient(timeout=httpx.Timeout(60.0)) 27 | self.base_url = API_BASE_URL[auth.config.environment] 28 | 29 | async def list_forms(self, force_refresh: bool = False) -> List[ZohoForm]: 30 | """Get all available forms.""" 31 | if not force_refresh and not self.cache.needs_refresh(): 32 | return list(self.cache.forms.values()) 33 | 34 | headers = await self.auth.get_authorized_headers() 35 | url = f"{self.base_url}/forms" 36 | 37 | response = await self._client.get(url, headers=headers) 38 | response.raise_for_status() 39 | data = response.json() 40 | logging.info(f"Response from list_forms: {data}") # Log the entire response 41 | 42 | forms = [] 43 | for form_data in data['forms'][:10]: 44 | logging.info(f"Processing form: {form_data['link_name']}") # Log the link_name 45 | fields = await self._get_form_fields(form_data['link_name'], headers) 46 | form = ZohoForm( 47 | link_name=form_data['link_name'], 48 | display_name=form_data['display_name'], 49 | fields=fields, 50 | type=form_data['type'] 51 | ) 52 | forms.append(form) 53 | 54 | self.cache.update_forms(forms) 55 | return forms 56 | 57 | async def _get_form_fields(self, form_link_name: str, headers: dict) -> List[ZohoField]: 58 | """Get fields for a specific form.""" 59 | url = f"{self.base_url}/form/{form_link_name}/fields" 60 | 61 | response = await self._client.get(url, headers=headers) 62 | response.raise_for_status() 63 | data = response.json() 64 | # logging.info(f"Response from _get_form_fields: {data}") # Log the entire response 65 | 66 | return [ 67 | ZohoField( 68 | link_name=field['link_name'], 69 | display_name=field['display_name'], 70 | field_type=field['type'], 71 | required=field['mandatory'], 72 | unique=field['unique'], 73 | max_char=field.get('max_char'), 74 | lookup=field.get('is_lookup_field'), 75 | choices=field.get('choices') 76 | ) 77 | for field in data['fields'] 78 | ] 79 | 80 | async def list_reports(self, force_refresh: bool = False) -> List[ZohoReport]: 81 | """Get all available reports.""" 82 | if not force_refresh and not self.cache.needs_refresh(): 83 | return list(self.cache.reports.values()) 84 | 85 | headers = await self.auth.get_authorized_headers() 86 | url = f"{self.base_url}/reports" 87 | 88 | response = await self._client.get(url, headers=headers) 89 | response.raise_for_status() 90 | data = response.json() 91 | logging.info(f"Response from list_reports: {data}") # Log the entire response 92 | 93 | reports = [] 94 | for form_data in data['reports'][:10]: 95 | logging.info(f"Processing reporrt: {form_data['link_name']}") # Log the link_name 96 | form = ZohoReport( 97 | link_name=form_data['link_name'], 98 | display_name=form_data['display_name'], 99 | type=form_data['type'] 100 | ) 101 | reports.append(form) 102 | 103 | self.cache.update_reports(reports) 104 | return reports 105 | 106 | async def get_records( 107 | self, 108 | report_link_name: str, 109 | criteria: Optional[str] = None, 110 | limit: Optional[int] = None 111 | ) -> List[ZohoRecord]: 112 | """Get records from a specific report.""" 113 | headers = await self.auth.get_authorized_headers() 114 | url = f"{self.base_url}/report/{report_link_name}" 115 | 116 | params = {} 117 | if criteria: 118 | params['criteria'] = criteria 119 | if limit: 120 | params['limit'] = limit 121 | 122 | async with self._client as client: 123 | response = await client.get(url, headers=headers, params=params) 124 | response.raise_for_status() 125 | data = response.json() 126 | 127 | return [ 128 | ZohoRecord( 129 | id=record['ID'], 130 | form_link_name=report_link_name, 131 | data=record 132 | ) 133 | for record in data['data'] 134 | ] 135 | 136 | async def get_record( 137 | self, 138 | report_link_name: str, 139 | record_id: str 140 | ) -> ZohoRecord: 141 | """Get a specific record by ID.""" 142 | headers = await self.auth.get_authorized_headers() 143 | url = f"{self.base_url}/report/{report_link_name}/{record_id}" 144 | 145 | async with self._client as client: 146 | response = await client.get(url, headers=headers) 147 | response.raise_for_status() 148 | result = response.json() 149 | 150 | return ZohoRecord( 151 | id=record_id, 152 | form_link_name=report_link_name, 153 | data=result['data'] 154 | ) 155 | 156 | async def create_record( 157 | self, 158 | form_link_name: str, 159 | data: Dict[str, Any] 160 | ) -> ZohoRecord: 161 | """Create a new record in a form.""" 162 | headers = await self.auth.get_authorized_headers() 163 | url = f"{self.base_url}/form/{form_link_name}" 164 | 165 | async with self._client as client: 166 | response = await client.post( 167 | url, 168 | headers=headers, 169 | json={"data": data} 170 | ) 171 | response.raise_for_status() 172 | result = response.json() 173 | 174 | return ZohoRecord( 175 | id=result['record']['ID'], 176 | form_link_name=form_link_name, 177 | data=data 178 | ) 179 | 180 | async def update_record( 181 | self, 182 | report_link_name: str, 183 | record_id: str, 184 | data: Dict[str, Any] 185 | ) -> ZohoRecord: 186 | """Update an existing record in a form.""" 187 | headers = await self.auth.get_authorized_headers() 188 | url = f"{self.base_url}/report/{report_link_name}/{record_id}" 189 | 190 | async with self._client as client: 191 | response = await client.patch( 192 | url, 193 | headers=headers, 194 | json={"data": data} 195 | ) 196 | response.raise_for_status() 197 | result = response.json() 198 | 199 | return ZohoRecord( 200 | id=record_id, 201 | form_link_name=report_link_name, 202 | data=data 203 | ) 204 | 205 | async def close(self): 206 | """Close HTTP client.""" 207 | await self._client.aclose() 208 | 209 | async def fetch_data(self): 210 | logging.info("Fetching data from the API...") 211 | try: 212 | response = await self._client.get("your_api_endpoint") 213 | response.raise_for_status() # Raise an error for bad responses 214 | logging.info(f"Fetched data: {response.json()}") 215 | return response.json() 216 | except Exception as e: 217 | logging.error(f"Error fetching data: {e}") 218 | return None 219 | ``` -------------------------------------------------------------------------------- /src/scaflog_zoho_mcp_server/server.py: -------------------------------------------------------------------------------- ```python 1 | # src_scaflog_zoho_mcp_server/server.py 2 | 3 | import json 4 | import logging 5 | from typing import Dict, List, Optional 6 | from urllib.parse import parse_qs, urlparse 7 | from pydantic import AnyUrl 8 | 9 | from mcp.server import NotificationOptions, Server 10 | from mcp.server.models import InitializationOptions 11 | import mcp.types as types 12 | import mcp.server.stdio 13 | 14 | from .config import load_config, API_BASE_URL 15 | from .auth import ZohoAuth 16 | from .service import ZohoCreatorService 17 | from .resource_config import WHITELISTED_RESOURCES 18 | 19 | # Configure logging 20 | logging.basicConfig(level=logging.DEBUG) 21 | logger = logging.getLogger(__name__) 22 | 23 | # Create a server instance 24 | server = Server("scaflog-zoho-mcp-server") 25 | config = load_config() 26 | auth = ZohoAuth(config) 27 | service = ZohoCreatorService(auth) 28 | 29 | @server.list_resources() 30 | async def handle_list_resources() -> list[types.Resource]: 31 | """List available whitelisted Zoho Creator forms and reports as resources.""" 32 | logger.debug("Starting handle_list_resources...") 33 | 34 | try: 35 | resources = [] 36 | 37 | # Add container resources 38 | resources.append( 39 | types.Resource( 40 | uri=AnyUrl("zoho://forms"), 41 | name="Available Forms", 42 | description="List of available Zoho Creator forms", 43 | mimeType="application/json" 44 | ) 45 | ) 46 | 47 | resources.append( 48 | types.Resource( 49 | uri=AnyUrl("zoho://reports"), 50 | name="Available Reports", 51 | description="List of available Zoho Creator reports", 52 | mimeType="application/json" 53 | ) 54 | ) 55 | 56 | # Add whitelisted forms 57 | for link_name, form_config in WHITELISTED_RESOURCES["forms"].items(): 58 | resources.append( 59 | types.Resource( 60 | uri=AnyUrl(f"zoho://form/{link_name}"), 61 | name=form_config.display_name, 62 | description=form_config.description, 63 | mimeType="application/json" 64 | ) 65 | ) 66 | 67 | # Add whitelisted reports 68 | for link_name, report_config in WHITELISTED_RESOURCES["reports"].items(): 69 | resources.append( 70 | types.Resource( 71 | uri=AnyUrl(f"zoho://report/{link_name}"), 72 | name=report_config.display_name, 73 | description=report_config.description, 74 | mimeType="application/json" 75 | ) 76 | ) 77 | 78 | return resources 79 | 80 | except Exception as e: 81 | logger.exception("Error in handle_list_resources") 82 | raise 83 | 84 | @server.read_resource() 85 | async def handle_read_resource(uri: AnyUrl) -> types.TextResourceContents | types.BlobResourceContents: 86 | """Read data from Zoho Creator based on the resource URI, filtered by whitelist.""" 87 | try: 88 | logger.info(f"Reading resource: {uri}") 89 | parsed = urlparse(str(uri)) 90 | 91 | if parsed.scheme != "zoho": 92 | raise ValueError(f"Unsupported URI scheme: {parsed.scheme}") 93 | 94 | full_path = f"{parsed.netloc}{parsed.path}".strip("/") 95 | path_parts = full_path.split("/") 96 | 97 | if not path_parts: 98 | raise ValueError("Empty resource path") 99 | 100 | resource_type = path_parts[0] 101 | 102 | # Handle root resources 103 | if resource_type == "forms": 104 | return types.TextResourceContents( 105 | uri=uri, 106 | mimeType="application/json", 107 | text=json.dumps({ 108 | "forms": [ 109 | { 110 | "link_name": link_name, 111 | "display_name": form.display_name, 112 | "description": form.description, 113 | "fields": { 114 | field_name: field.dict() 115 | for field_name, field in form.fields.items() 116 | } 117 | } 118 | for link_name, form in WHITELISTED_RESOURCES["forms"].items() 119 | ] 120 | }, indent=2) 121 | ) 122 | 123 | elif resource_type == "reports": 124 | return types.TextResourceContents( 125 | uri=uri, 126 | mimeType="application/json", 127 | text=json.dumps({ 128 | "reports": [ 129 | { 130 | "link_name": link_name, 131 | "display_name": report.display_name, 132 | "description": report.description, 133 | "fields": { 134 | field_name: field.dict() 135 | for field_name, field in report.fields.items() 136 | } 137 | } 138 | for link_name, report in WHITELISTED_RESOURCES["reports"].items() 139 | ] 140 | }, indent=2) 141 | ) 142 | 143 | # Handle specific resources 144 | if len(path_parts) < 2: 145 | raise ValueError(f"Missing link name for resource type: {resource_type}") 146 | 147 | link_name = path_parts[1] 148 | 149 | if resource_type == "form": 150 | # Check if form is whitelisted 151 | form_config = WHITELISTED_RESOURCES["forms"].get(link_name) 152 | if not form_config: 153 | raise ValueError(f"Form not found or not accessible: {link_name}") 154 | 155 | # Get form data from Zoho 156 | records = await service.get_records(link_name) 157 | 158 | # Filter fields based on whitelist 159 | filtered_records = [ 160 | { 161 | field_name: record.data.get(field_name) 162 | for field_name in form_config.fields.keys() 163 | if field_name in record.data 164 | } 165 | for record in records 166 | ] 167 | 168 | return types.TextResourceContents( 169 | uri=uri, 170 | mimeType="application/json", 171 | text=json.dumps({ 172 | "form": { 173 | "link_name": link_name, 174 | "display_name": form_config.display_name, 175 | "description": form_config.description, 176 | "fields": { 177 | name: field.dict() 178 | for name, field in form_config.fields.items() 179 | } 180 | }, 181 | "records": filtered_records 182 | }, indent=2) 183 | ) 184 | 185 | elif resource_type == "report": 186 | # Check if report is whitelisted 187 | report_config = WHITELISTED_RESOURCES["reports"].get(link_name) 188 | if not report_config: 189 | raise ValueError(f"Report not found or not accessible: {link_name}") 190 | 191 | # Get report data from Zoho 192 | records = await service.get_records(link_name) 193 | 194 | # Filter fields based on whitelist 195 | filtered_records = [ 196 | { 197 | field_name: record.data.get(field_name) 198 | for field_name in report_config.fields.keys() 199 | if field_name in record.data 200 | } 201 | for record in records 202 | ] 203 | 204 | return types.TextResourceContents( 205 | uri=uri, 206 | mimeType="application/json", 207 | text=json.dumps({ 208 | "report": { 209 | "link_name": link_name, 210 | "display_name": report_config.display_name, 211 | "description": report_config.description, 212 | "fields": { 213 | name: field.dict() 214 | for name, field in report_config.fields.items() 215 | } 216 | }, 217 | "records": filtered_records 218 | }, indent=2) 219 | ) 220 | 221 | else: 222 | raise ValueError(f"Unknown resource type: {resource_type}") 223 | 224 | except Exception as e: 225 | logger.exception(f"Error reading resource: {uri}") 226 | raise 227 | 228 | async def main(): 229 | """Main entry point for the server.""" 230 | logger.info("Starting Zoho Creator MCP server...") 231 | async with mcp.server.stdio.stdio_server() as (read_stream, write_stream): 232 | try: 233 | logger.info("Initializing server connection...") 234 | await server.run( 235 | read_stream, 236 | write_stream, 237 | InitializationOptions( 238 | server_name="scaflog-zoho-mcp-server", 239 | server_version="0.1.0", 240 | capabilities=server.get_capabilities( 241 | notification_options=NotificationOptions(), 242 | experimental_capabilities={}, 243 | ), 244 | ), 245 | ) 246 | except Exception as e: 247 | logger.exception("Error running server") 248 | raise 249 | finally: 250 | logger.info("Shutting down server...") 251 | await auth.close() 252 | await service.close() 253 | ```