# Directory Structure ``` ├── .gitignore ├── .python-version ├── .vscode │ └── settings.json ├── ~ │ └── .zshrc ├── app.log ├── pyproject.toml ├── README.md ├── src │ ├── .DS_Store │ └── scaflog_zoho_mcp_server │ ├── __init__.py │ ├── .DS_Store │ ├── auth.py │ ├── config.py │ ├── models.py │ ├── resource_config.py │ ├── server.py │ └── service.py ├── tests │ ├── __init__.py │ ├── conftest.py │ ├── test_server.py │ └── test_service.py └── uv.lock ``` # Files -------------------------------------------------------------------------------- /.python-version: -------------------------------------------------------------------------------- ``` 3.12 ``` -------------------------------------------------------------------------------- /~/.zshrc: -------------------------------------------------------------------------------- ``` # Add these lines to the end of your .zshrc alias python="python3" alias pip="pip3" ``` -------------------------------------------------------------------------------- /.gitignore: -------------------------------------------------------------------------------- ``` # Python-generated files __pycache__/ *.py[oc] build/ dist/ wheels/ *.egg-info # Virtual environments .venv # Environment variables .env # Flatten repo flatten/ # other app.log .DS_Store ``` -------------------------------------------------------------------------------- /README.md: -------------------------------------------------------------------------------- ```markdown # scaflog-zoho-mcp-server MCP server Zoho Creator Scaflog App MCP Server ## Components ### Resources The server implements a simple note storage system with: - Custom note:// URI scheme for accessing individual notes - Each note resource has a name, description and text/plain mimetype ### Prompts The server provides a single prompt: - summarize-notes: Creates summaries of all stored notes - Optional "style" argument to control detail level (brief/detailed) - Generates prompt combining all current notes with style preference ### Tools The server implements one tool: - add-note: Adds a new note to the server - Takes "name" and "content" as required string arguments - Updates server state and notifies clients of resource changes ## Configuration [TODO: Add configuration details specific to your implementation] ## Quickstart ### Install #### Claude Desktop On MacOS: `~/Library/Application\ Support/Claude/claude_desktop_config.json` On Windows: `%APPDATA%/Claude/claude_desktop_config.json` <details> <summary>Development/Unpublished Servers Configuration</summary> ``` "mcpServers": { "scaflog-zoho-mcp-server": { "command": "uv", "args": [ "--directory", "/Users/alexsherin/Documents/Projects/MCP Servers", "run", "scaflog-zoho-mcp-server" ] } } ``` </details> <details> <summary>Published Servers Configuration</summary> ``` "mcpServers": { "scaflog-zoho-mcp-server": { "command": "uvx", "args": [ "scaflog-zoho-mcp-server" ] } } ``` </details> ## Development ### Building and Publishing To prepare the package for distribution: 1. Sync dependencies and update lockfile: ```bash uv sync ``` 2. Build package distributions: ```bash uv build ``` This will create source and wheel distributions in the `dist/` directory. 3. Publish to PyPI: ```bash uv publish ``` Note: You'll need to set PyPI credentials via environment variables or command flags: - Token: `--token` or `UV_PUBLISH_TOKEN` - Or username/password: `--username`/`UV_PUBLISH_USERNAME` and `--password`/`UV_PUBLISH_PASSWORD` ### Debugging Since MCP servers run over stdio, debugging can be challenging. For the best debugging experience, we strongly recommend using the [MCP Inspector](https://github.com/modelcontextprotocol/inspector). You can launch the MCP Inspector via [`npm`](https://docs.npmjs.com/downloading-and-installing-node-js-and-npm) with this command: ```bash npx @modelcontextprotocol/inspector uv --directory /Users/alexsherin/Documents/Projects/MCP Servers run scaflog-zoho-mcp-server ``` Upon launching, the Inspector will display a URL that you can access in your browser to begin debugging. ``` -------------------------------------------------------------------------------- /tests/__init__.py: -------------------------------------------------------------------------------- ```python # tests/__init__.py # import pytest # @pytest.fixture(scope="module") # def mock_server(): # return ZohoCreatorServer() ``` -------------------------------------------------------------------------------- /src/scaflog_zoho_mcp_server/__init__.py: -------------------------------------------------------------------------------- ```python from . import server import asyncio def main(): """Main entry point for the package.""" asyncio.run(server.main()) # Optionally expose other important items at package level __all__ = ['main', 'server'] ``` -------------------------------------------------------------------------------- /.vscode/settings.json: -------------------------------------------------------------------------------- ```json { "window.commandCenter": 1, "workbench.colorTheme": "Material Theme", "workbench.iconTheme": "material-icon-theme", "editor.fontSize": 14, "files.autoSave": "afterDelay", "files.autoSaveWorkspaceFilesOnly": true, "git.postCommitCommand": "push", "git.enableSmartCommit": false, "git.defaultCommitMessageTemplate": "feat: ", "git.followTagsWhenSync": true, "git.inputValidationSubjectLength": 72, "git.inputValidationLength": 500, "scm.defaultViewMode": "tree", "git.useCommitInputAsStashMessage": true } ``` -------------------------------------------------------------------------------- /pyproject.toml: -------------------------------------------------------------------------------- ```toml [project] name = "scaflog-zoho-mcp-server" version = "0.1.0" description = "MCP Server for Zoho Creator Integration" readme = "README.md" authors = [ { name = "alexsherin", email = "[email protected]" } ] requires-python = ">=3.12" dependencies = [ "mcp>=1.1.0", "zcrmsdk==3.1.0", "pydantic>=2.0.0", "python-dotenv>=1.0.0", "typing-extensions>=4.7.0", "httpx>=0.24.0", ] [project.scripts] scaflog-zoho-mcp-server = "scaflog_zoho_mcp_server:main" [build-system] requires = ["hatchling"] build-backend = "hatchling.build" [tool.hatch.build.targets.wheel] packages = ["src/scaflog_zoho_mcp_server"] [tool.pytest.ini_options] asyncio_mode = "auto" asyncio_default_fixture_loop_scope = "function" testpaths = ["tests"] [project.optional-dependencies] test = [ "pytest>=7.0.0", "pytest-asyncio>=0.21.0", "pytest-cov>=4.1.0", ] ``` -------------------------------------------------------------------------------- /src/scaflog_zoho_mcp_server/config.py: -------------------------------------------------------------------------------- ```python from pathlib import Path import os from typing import Optional from pydantic import BaseModel, Field from dotenv import load_dotenv class ZohoCreatorConfig(BaseModel): """Configuration for Zoho Creator API access.""" client_id: str = Field(..., description="OAuth client ID") client_secret: str = Field(..., description="OAuth client secret") refresh_token: str = Field(..., description="OAuth refresh token") organization_id: str = Field(..., description="Zoho organization ID") environment: str = Field(default="production", description="Zoho environment (production/sandbox)") access_token: Optional[str] = Field(default=None, description="Current access token") def load_config() -> ZohoCreatorConfig: """Load configuration from environment variables or .env file.""" # Try to load from .env file if it exists env_path = Path(".env") if env_path.exists(): load_dotenv(env_path) return ZohoCreatorConfig( client_id=os.getenv("ZOHO_CLIENT_ID", ""), client_secret=os.getenv("ZOHO_CLIENT_SECRET", ""), refresh_token=os.getenv("ZOHO_REFRESH_TOKEN", ""), organization_id=os.getenv("ZOHO_ORGANIZATION_ID", ""), environment=os.getenv("ZOHO_ENVIRONMENT", "production"), ) # API endpoints for different environments API_BASE_URL = { "production": "https://creator.zoho.com/api/v2/100rails/goscaffold", "sandbox": "https://creator.zoho.com/api/v2/sandbox" } ``` -------------------------------------------------------------------------------- /src/scaflog_zoho_mcp_server/auth.py: -------------------------------------------------------------------------------- ```python # src_scaflog_zoho_mcp_server/auth.py import time from typing import Optional import httpx from pydantic import BaseModel, Field from .config import ZohoCreatorConfig, API_BASE_URL class TokenInfo(BaseModel): """Model for storing token information.""" access_token: str expires_in: int created_at: float = Field(default_factory=time.time) @property def is_expired(self) -> bool: """Check if the token is expired with a 5-minute buffer.""" return time.time() > (self.created_at + self.expires_in - 300) class ZohoAuth: """Handles authentication with Zoho Creator API.""" def __init__(self, config: ZohoCreatorConfig): self.config = config self._token_info: Optional[TokenInfo] = None self._client = httpx.AsyncClient(timeout=30.0) async def get_access_token(self) -> str: """Get a valid access token, refreshing if necessary.""" if not self._token_info or self._token_info.is_expired: await self._refresh_token() return self._token_info.access_token async def _refresh_token(self) -> None: """Refresh the access token using the refresh token.""" token_url = "https://accounts.zoho.com/oauth/v2/token" params = { "client_id": self.config.client_id, "client_secret": self.config.client_secret, "refresh_token": self.config.refresh_token, "grant_type": "refresh_token", "redirect_uri": "https://www.zohoapis.com" } async with self._client as client: response = await client.post(token_url, params=params) response.raise_for_status() data = response.json() self._token_info = TokenInfo( access_token=data["access_token"], expires_in=data["expires_in"] ) async def get_authorized_headers(self) -> dict: """Get headers with authorization token.""" token = await self.get_access_token() return { "Authorization": f"Bearer {token}", "Content-Type": "application/json", } async def close(self): """Close the HTTP client.""" await self._client.aclose() ``` -------------------------------------------------------------------------------- /src/scaflog_zoho_mcp_server/resource_config.py: -------------------------------------------------------------------------------- ```python # src_scaflog_zoho_mcp_server/resource_config.py from typing import Dict, List, Optional from pydantic import BaseModel class FieldConfig(BaseModel): """Configuration for a whitelisted field.""" display_name: str description: Optional[str] = None required: bool = False class FormConfig(BaseModel): """Configuration for a whitelisted form.""" link_name: str display_name: str description: Optional[str] = None fields: Dict[str, FieldConfig] class ReportConfig(BaseModel): """Configuration for a whitelisted report.""" link_name: str display_name: str description: Optional[str] = None fields: Dict[str, FieldConfig] # Define the whitelisted resources WHITELISTED_RESOURCES = { "forms": { "Company_Info": FormConfig( link_name="Company_Info", display_name="Company Information", description="Core company details and profile", fields={ "Company_Name": FieldConfig( display_name="Company Name", description="Legal name of the company", required=True ), "Phone": FieldConfig( display_name="Phone Number", description="Primary contact number" ), "Email": FieldConfig( display_name="Email", description="Primary contact email" ), "Industry": FieldConfig( display_name="Industry", description="Company's primary industry" ) } ), # Add more forms as needed }, "reports": { "Company_All_Data": ReportConfig( link_name="Company_All_Data", display_name="Company Overview", description="Comprehensive view of company information", fields={ "Company_Name": FieldConfig( display_name="Company Name", description="Legal name of the company" ), "Industry": FieldConfig( display_name="Industry", description="Company's primary industry" ), "Status": FieldConfig( display_name="Status", description="Current company status" ) } ), # Add more reports as needed } } ``` -------------------------------------------------------------------------------- /tests/test_server.py: -------------------------------------------------------------------------------- ```python # tests/test_server.py import pytest from pydantic import AnyUrl import mcp.types as types from mcp import ClientSession import logging # Configure logging for the test logging.basicConfig(level=logging.INFO) @pytest.mark.asyncio async def test_server_resources(client_session: ClientSession): """Test server resources functionality.""" logging.info("Starting test_server_resources...") resources = await client_session.list_resources() logging.info(f"Resources: {resources}") # Log the resources assert len(resources) > 0 assert any(str(r.uri) == "zoho://forms" for r in resources) assert any(str(r.uri) == "zoho://forms/test_form" for r in resources) # Read a resource resource = await client_session.read_resource("zoho://forms/test_form") assert resource.mimeType == "application/json" assert "form" in resource.text assert "records" in resource.text @pytest.mark.asyncio async def test_server_tools(client_session: ClientSession): """Test server tools functionality.""" # List available tools tools = await client_session.list_tools() assert any(tool.name == "create-record" for tool in tools) # Call a tool result = await client_session.call_tool( "create-record", arguments={ "form_name": "test_form", "data": {"test_field": "test_value"} } ) assert len(result) == 1 assert result[0].type == "text" assert "created successfully" in result[0].text @pytest.mark.asyncio async def test_real_server_resources(client_session: ClientSession): """Test server resources functionality with real data.""" # List available resources resources = await client_session.list_resources() print("Resources:", resources) # Log the resources assert len(resources) > 0 assert any(str(r.uri) == "zoho://forms" for r in resources) assert any(str(r.uri) == "zoho://form/Company_Info" for r in resources) # Read a resource resource = await client_session.read_resource("zoho://report/Company_All_Data") print("Resource:", resource) # Log the resource assert resource.mimeType == "application/json" assert "form" in resource.text assert "records" in resource.text @pytest.mark.asyncio async def test_real_server_tools(client_session: ClientSession): """Test server tools functionality with real data.""" # List available tools tools = await client_session.list_tools() print("Tools:", tools) # Log the tools assert any(tool.name == "create-record" for tool in tools) # Call a tool result = await client_session.call_tool( "create-record", arguments={ "form_name": "test_form", "data": {"test_field": "test_value"} } ) print("Tool Call Result:", result) # Log the result assert len(result) == 1 assert result[0].type == "text" assert "created successfully" in result[0].text ``` -------------------------------------------------------------------------------- /src/scaflog_zoho_mcp_server/models.py: -------------------------------------------------------------------------------- ```python # src_scaflog_zoho_mcp_server/models.py from datetime import datetime from typing import Any, Dict, List, Optional from pydantic import BaseModel, Field class ZohoField(BaseModel): """Represents a field in a Zoho Creator form.""" link_name: str = Field(..., description="The API name of the field") display_name: str = Field(..., description="The display name of the field") field_type: int = Field(..., description="The type of the field (e.g., text, number)") required: bool = Field(default=False, description="Indicates if the field is mandatory") unique: bool = Field(default=False, description="Indicates if the field must be unique") max_char: Optional[int] = Field(default=None, description="Maximum character length for the field") lookup: Optional[bool] = Field(default=None, description="Indicates if the field is a lookup field") choices: Optional[List[dict]] = Field(default=None, description="List of choices for the field if applicable") class ZohoForm(BaseModel): """Represents a form in Zoho Creator.""" link_name: str = Field(..., description="API link name of the form") display_name: str = Field(..., description="Display name of the form") type: int = Field(..., description="Type of the form") fields: List[ZohoField] = Field(default_factory=list) class ZohoRecord(BaseModel): """Represents a record in a Zoho Creator form.""" id: str form_link_name: str data: Dict[str, Any] # Remove created_time and modified_time if they are not in the response # created_time: Optional[datetime] = None # modified_time: Optional[datetime] = None class ZohoReport(BaseModel): """Represents a report in Zoho Creator.""" link_name: str = Field(..., description="API link name of the report") display_name: str = Field(..., description="Display name of the report") type: int = Field(..., description="Type of the reprt") class Cache: """Simple cache for form metadata.""" def __init__(self, ttl_seconds: int = 300): self.forms: Dict[str, ZohoForm] = {} self.reports: Dict[str, ZohoReport] = {} self.ttl = ttl_seconds self.last_refresh: Optional[datetime] = None def needs_refresh(self) -> bool: """Check if cache needs refreshing.""" if not self.last_refresh: return True return (datetime.now() - self.last_refresh).total_seconds() > self.ttl def update_forms(self, forms: List[ZohoForm]): """Update cached forms.""" self.forms = {form.link_name: form for form in forms} self.last_refresh = datetime.now() def get_form(self, link_name: str) -> Optional[ZohoForm]: """Get a form from cache by link name.""" return self.forms.get(link_name) def update_reports(self, reports: List[ZohoReport]): """Update cached reports.""" self.reports = {report.link_name: report for report in reports} self.last_refresh = datetime.now() ``` -------------------------------------------------------------------------------- /tests/conftest.py: -------------------------------------------------------------------------------- ```python # tests/conftest.py import pytest from unittest.mock import AsyncMock from typing import AsyncGenerator, Generator import httpx from datetime import datetime import os from pathlib import Path import sys from dotenv import load_dotenv # Import dotenv to load environment variables import logging # Import logging # Import the necessary classes from scaflog_zoho_mcp_server.config import ZohoCreatorConfig from scaflog_zoho_mcp_server.auth import ZohoAuth from scaflog_zoho_mcp_server.service import ZohoCreatorService from mcp import ClientSession, StdioServerParameters # Add this import from mcp.client.stdio import stdio_client # Add this import # Configure logging to write to a file logging.basicConfig( filename='app.log', # Specify the log file name filemode='a', # Append mode format='%(asctime)s - %(levelname)s - %(message)s', level=logging.INFO # Set the logging level ) # Log that the tests are starting logging.info("Starting tests...") # Load environment variables from .env file load_dotenv() @pytest.fixture(scope="session") def test_env() -> Generator[dict, None, None]: """Create a test environment with necessary configuration.""" env = { "ZOHO_CLIENT_ID": os.getenv("ZOHO_CLIENT_ID"), "ZOHO_CLIENT_SECRET": os.getenv("ZOHO_CLIENT_SECRET"), "ZOHO_REFRESH_TOKEN": os.getenv("ZOHO_REFRESH_TOKEN"), "ZOHO_ORGANIZATION_ID": os.getenv("ZOHO_ORGANIZATION_ID"), "ZOHO_ENVIRONMENT": os.getenv("ZOHO_ENVIRONMENT"), } yield env @pytest.fixture async def mock_service() -> AsyncGenerator[ZohoCreatorService, None]: """Create a service with actual data from Zoho Creator.""" config = ZohoCreatorConfig( client_id=os.getenv("ZOHO_CLIENT_ID"), client_secret=os.getenv("ZOHO_CLIENT_SECRET"), refresh_token=os.getenv("ZOHO_REFRESH_TOKEN"), organization_id=os.getenv("ZOHO_ORGANIZATION_ID"), environment=os.getenv("ZOHO_ENVIRONMENT") ) auth = ZohoAuth(config) # Create a new instance of ZohoCreatorService service = ZohoCreatorService(auth) # Create a new HTTP client for each test service._client = httpx.AsyncClient() # Create a new client instance yield service await service._client.aclose() # Close the client after the test await service.close() # Close the service @pytest.fixture async def client_session(test_env) -> AsyncGenerator[ClientSession, None]: """Create a client session connected to the test server.""" python_path = sys.executable project_root = Path(__file__).parent.parent server_params = StdioServerParameters( command=python_path, args=["-m", "src"], env={ **os.environ, **test_env, "PYTHONPATH": str(project_root) } ) async with stdio_client(server_params) as (read, write): async with ClientSession(read, write) as session: await session.initialize() yield session ``` -------------------------------------------------------------------------------- /tests/test_service.py: -------------------------------------------------------------------------------- ```python # tests/test_service.py import pytest from datetime import datetime import logging # Configure logging for the test logging.basicConfig(level=logging.INFO) from scaflog_zoho_mcp_server.service import ZohoCreatorService @pytest.mark.asyncio async def test_list_forms(mock_service: ZohoCreatorService): """Test listing forms.""" logging.info("Starting test_list_forms...") forms = await mock_service.list_forms(force_refresh=True) logging.info(f"Fetched forms: {[form.display_name for form in forms]}") # Log the display names of the forms assert len(forms) > 0 # Ensure that at least one form is returned assert all(hasattr(form, 'link_name') for form in forms) # Check that each form has a link_name assert all(hasattr(form, 'display_name') for form in forms) # Check that each form has a display_name @pytest.mark.asyncio async def test_get_records(mock_service: ZohoCreatorService): """Test getting records.""" logging.info("Starting test_get_records...") records = await mock_service.get_records("test_form") logging.info(f"Fetched records: {records}") # Log the fetched records assert len(records) == 1 assert records[0].id == "123" assert records[0].data["ID"] == "test_value" @pytest.mark.asyncio async def test_create_record(mock_service: ZohoCreatorService): """Test creating a record.""" record = await mock_service.create_record( "test_form", {"test_field": "new_value"} ) assert record.id == "123" assert record.form_link_name == "test_form" assert record.data["test_field"] == "new_value" @pytest.mark.asyncio async def test_update_record(mock_service: ZohoCreatorService): """Test updating a record.""" record = await mock_service.update_record( "test_form", "123", {"test_field": "updated_value"} ) assert record.id == "123" assert record.form_link_name == "test_form" assert record.data["test_field"] == "updated_value" @pytest.mark.asyncio async def test_fetch_data(mock_service): logging.info("Starting test_fetch_data...") data = await mock_service.fetch_data() logging.info(f"Fetched data: {data}") assert data is not None # Example assertion @pytest.mark.asyncio async def test_fetch_all_records(mock_service: ZohoCreatorService): """Test fetching all records from the Company_Info report.""" logging.info("Starting test_fetch_all_records...") # Fetch all records for the report "Company_All_Data_Report" records = await mock_service.get_records("Company_All_Data") # Use the report link name # Log the fetched records logging.info(f"Fetched records: {records}") # Assertions to verify the records assert len(records) > 0 # Ensure that at least one record is returned for record in records: assert isinstance(record.id, str) # Ensure each record has a valid ID # assert "Company_Info" in record.data # Ensure the record contains data for the form # You can add more tests below... ``` -------------------------------------------------------------------------------- /src/scaflog_zoho_mcp_server/service.py: -------------------------------------------------------------------------------- ```python # src_scaflog_zoho_mcp_server/service.py from typing import List, Optional, Dict, Any import httpx from datetime import datetime import logging from .models import ZohoForm, ZohoReport, ZohoField, ZohoRecord, Cache from .auth import ZohoAuth from .config import API_BASE_URL # Configure logging to write to a file logging.basicConfig( filename='app.log', # Specify the log file name filemode='a', # Append mode format='%(asctime)s - %(levelname)s - %(message)s', level=logging.INFO # Set the logging level ) class ZohoCreatorService: """Service for interacting with Zoho Creator API.""" def __init__(self, auth: ZohoAuth): self.auth = auth self.cache = Cache() self._client = httpx.AsyncClient(timeout=httpx.Timeout(60.0)) self.base_url = API_BASE_URL[auth.config.environment] async def list_forms(self, force_refresh: bool = False) -> List[ZohoForm]: """Get all available forms.""" if not force_refresh and not self.cache.needs_refresh(): return list(self.cache.forms.values()) headers = await self.auth.get_authorized_headers() url = f"{self.base_url}/forms" response = await self._client.get(url, headers=headers) response.raise_for_status() data = response.json() logging.info(f"Response from list_forms: {data}") # Log the entire response forms = [] for form_data in data['forms'][:10]: logging.info(f"Processing form: {form_data['link_name']}") # Log the link_name fields = await self._get_form_fields(form_data['link_name'], headers) form = ZohoForm( link_name=form_data['link_name'], display_name=form_data['display_name'], fields=fields, type=form_data['type'] ) forms.append(form) self.cache.update_forms(forms) return forms async def _get_form_fields(self, form_link_name: str, headers: dict) -> List[ZohoField]: """Get fields for a specific form.""" url = f"{self.base_url}/form/{form_link_name}/fields" response = await self._client.get(url, headers=headers) response.raise_for_status() data = response.json() # logging.info(f"Response from _get_form_fields: {data}") # Log the entire response return [ ZohoField( link_name=field['link_name'], display_name=field['display_name'], field_type=field['type'], required=field['mandatory'], unique=field['unique'], max_char=field.get('max_char'), lookup=field.get('is_lookup_field'), choices=field.get('choices') ) for field in data['fields'] ] async def list_reports(self, force_refresh: bool = False) -> List[ZohoReport]: """Get all available reports.""" if not force_refresh and not self.cache.needs_refresh(): return list(self.cache.reports.values()) headers = await self.auth.get_authorized_headers() url = f"{self.base_url}/reports" response = await self._client.get(url, headers=headers) response.raise_for_status() data = response.json() logging.info(f"Response from list_reports: {data}") # Log the entire response reports = [] for form_data in data['reports'][:10]: logging.info(f"Processing reporrt: {form_data['link_name']}") # Log the link_name form = ZohoReport( link_name=form_data['link_name'], display_name=form_data['display_name'], type=form_data['type'] ) reports.append(form) self.cache.update_reports(reports) return reports async def get_records( self, report_link_name: str, criteria: Optional[str] = None, limit: Optional[int] = None ) -> List[ZohoRecord]: """Get records from a specific report.""" headers = await self.auth.get_authorized_headers() url = f"{self.base_url}/report/{report_link_name}" params = {} if criteria: params['criteria'] = criteria if limit: params['limit'] = limit async with self._client as client: response = await client.get(url, headers=headers, params=params) response.raise_for_status() data = response.json() return [ ZohoRecord( id=record['ID'], form_link_name=report_link_name, data=record ) for record in data['data'] ] async def get_record( self, report_link_name: str, record_id: str ) -> ZohoRecord: """Get a specific record by ID.""" headers = await self.auth.get_authorized_headers() url = f"{self.base_url}/report/{report_link_name}/{record_id}" async with self._client as client: response = await client.get(url, headers=headers) response.raise_for_status() result = response.json() return ZohoRecord( id=record_id, form_link_name=report_link_name, data=result['data'] ) async def create_record( self, form_link_name: str, data: Dict[str, Any] ) -> ZohoRecord: """Create a new record in a form.""" headers = await self.auth.get_authorized_headers() url = f"{self.base_url}/form/{form_link_name}" async with self._client as client: response = await client.post( url, headers=headers, json={"data": data} ) response.raise_for_status() result = response.json() return ZohoRecord( id=result['record']['ID'], form_link_name=form_link_name, data=data ) async def update_record( self, report_link_name: str, record_id: str, data: Dict[str, Any] ) -> ZohoRecord: """Update an existing record in a form.""" headers = await self.auth.get_authorized_headers() url = f"{self.base_url}/report/{report_link_name}/{record_id}" async with self._client as client: response = await client.patch( url, headers=headers, json={"data": data} ) response.raise_for_status() result = response.json() return ZohoRecord( id=record_id, form_link_name=report_link_name, data=data ) async def close(self): """Close HTTP client.""" await self._client.aclose() async def fetch_data(self): logging.info("Fetching data from the API...") try: response = await self._client.get("your_api_endpoint") response.raise_for_status() # Raise an error for bad responses logging.info(f"Fetched data: {response.json()}") return response.json() except Exception as e: logging.error(f"Error fetching data: {e}") return None ``` -------------------------------------------------------------------------------- /src/scaflog_zoho_mcp_server/server.py: -------------------------------------------------------------------------------- ```python # src_scaflog_zoho_mcp_server/server.py import json import logging from typing import Dict, List, Optional from urllib.parse import parse_qs, urlparse from pydantic import AnyUrl from mcp.server import NotificationOptions, Server from mcp.server.models import InitializationOptions import mcp.types as types import mcp.server.stdio from .config import load_config, API_BASE_URL from .auth import ZohoAuth from .service import ZohoCreatorService from .resource_config import WHITELISTED_RESOURCES # Configure logging logging.basicConfig(level=logging.DEBUG) logger = logging.getLogger(__name__) # Create a server instance server = Server("scaflog-zoho-mcp-server") config = load_config() auth = ZohoAuth(config) service = ZohoCreatorService(auth) @server.list_resources() async def handle_list_resources() -> list[types.Resource]: """List available whitelisted Zoho Creator forms and reports as resources.""" logger.debug("Starting handle_list_resources...") try: resources = [] # Add container resources resources.append( types.Resource( uri=AnyUrl("zoho://forms"), name="Available Forms", description="List of available Zoho Creator forms", mimeType="application/json" ) ) resources.append( types.Resource( uri=AnyUrl("zoho://reports"), name="Available Reports", description="List of available Zoho Creator reports", mimeType="application/json" ) ) # Add whitelisted forms for link_name, form_config in WHITELISTED_RESOURCES["forms"].items(): resources.append( types.Resource( uri=AnyUrl(f"zoho://form/{link_name}"), name=form_config.display_name, description=form_config.description, mimeType="application/json" ) ) # Add whitelisted reports for link_name, report_config in WHITELISTED_RESOURCES["reports"].items(): resources.append( types.Resource( uri=AnyUrl(f"zoho://report/{link_name}"), name=report_config.display_name, description=report_config.description, mimeType="application/json" ) ) return resources except Exception as e: logger.exception("Error in handle_list_resources") raise @server.read_resource() async def handle_read_resource(uri: AnyUrl) -> types.TextResourceContents | types.BlobResourceContents: """Read data from Zoho Creator based on the resource URI, filtered by whitelist.""" try: logger.info(f"Reading resource: {uri}") parsed = urlparse(str(uri)) if parsed.scheme != "zoho": raise ValueError(f"Unsupported URI scheme: {parsed.scheme}") full_path = f"{parsed.netloc}{parsed.path}".strip("/") path_parts = full_path.split("/") if not path_parts: raise ValueError("Empty resource path") resource_type = path_parts[0] # Handle root resources if resource_type == "forms": return types.TextResourceContents( uri=uri, mimeType="application/json", text=json.dumps({ "forms": [ { "link_name": link_name, "display_name": form.display_name, "description": form.description, "fields": { field_name: field.dict() for field_name, field in form.fields.items() } } for link_name, form in WHITELISTED_RESOURCES["forms"].items() ] }, indent=2) ) elif resource_type == "reports": return types.TextResourceContents( uri=uri, mimeType="application/json", text=json.dumps({ "reports": [ { "link_name": link_name, "display_name": report.display_name, "description": report.description, "fields": { field_name: field.dict() for field_name, field in report.fields.items() } } for link_name, report in WHITELISTED_RESOURCES["reports"].items() ] }, indent=2) ) # Handle specific resources if len(path_parts) < 2: raise ValueError(f"Missing link name for resource type: {resource_type}") link_name = path_parts[1] if resource_type == "form": # Check if form is whitelisted form_config = WHITELISTED_RESOURCES["forms"].get(link_name) if not form_config: raise ValueError(f"Form not found or not accessible: {link_name}") # Get form data from Zoho records = await service.get_records(link_name) # Filter fields based on whitelist filtered_records = [ { field_name: record.data.get(field_name) for field_name in form_config.fields.keys() if field_name in record.data } for record in records ] return types.TextResourceContents( uri=uri, mimeType="application/json", text=json.dumps({ "form": { "link_name": link_name, "display_name": form_config.display_name, "description": form_config.description, "fields": { name: field.dict() for name, field in form_config.fields.items() } }, "records": filtered_records }, indent=2) ) elif resource_type == "report": # Check if report is whitelisted report_config = WHITELISTED_RESOURCES["reports"].get(link_name) if not report_config: raise ValueError(f"Report not found or not accessible: {link_name}") # Get report data from Zoho records = await service.get_records(link_name) # Filter fields based on whitelist filtered_records = [ { field_name: record.data.get(field_name) for field_name in report_config.fields.keys() if field_name in record.data } for record in records ] return types.TextResourceContents( uri=uri, mimeType="application/json", text=json.dumps({ "report": { "link_name": link_name, "display_name": report_config.display_name, "description": report_config.description, "fields": { name: field.dict() for name, field in report_config.fields.items() } }, "records": filtered_records }, indent=2) ) else: raise ValueError(f"Unknown resource type: {resource_type}") except Exception as e: logger.exception(f"Error reading resource: {uri}") raise async def main(): """Main entry point for the server.""" logger.info("Starting Zoho Creator MCP server...") async with mcp.server.stdio.stdio_server() as (read_stream, write_stream): try: logger.info("Initializing server connection...") await server.run( read_stream, write_stream, InitializationOptions( server_name="scaflog-zoho-mcp-server", server_version="0.1.0", capabilities=server.get_capabilities( notification_options=NotificationOptions(), experimental_capabilities={}, ), ), ) except Exception as e: logger.exception("Error running server") raise finally: logger.info("Shutting down server...") await auth.close() await service.close() ```