# Directory Structure

```
├── .gitignore
├── .python-version
├── .vscode
│   └── settings.json
├── ~
│   └── .zshrc
├── app.log
├── pyproject.toml
├── README.md
├── src
│   ├── .DS_Store
│   └── scaflog_zoho_mcp_server
│       ├── __init__.py
│       ├── .DS_Store
│       ├── auth.py
│       ├── config.py
│       ├── models.py
│       ├── resource_config.py
│       ├── server.py
│       └── service.py
├── tests
│   ├── __init__.py
│   ├── conftest.py
│   ├── test_server.py
│   └── test_service.py
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/.python-version:
--------------------------------------------------------------------------------

```
1 | 3.12
2 | 
```

--------------------------------------------------------------------------------
/~/.zshrc:
--------------------------------------------------------------------------------

```
1 | # Add these lines to the end of your .zshrc
2 | alias python="python3"
3 | alias pip="pip3" 
```

--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------

```
 1 | # Python-generated files
 2 | __pycache__/
 3 | *.py[oc]
 4 | build/
 5 | dist/
 6 | wheels/
 7 | *.egg-info
 8 | 
 9 | # Virtual environments
10 | .venv
11 | 
12 | # Environment variables
13 | .env
14 | 
15 | # Flatten repo
16 | flatten/
17 | 
18 | # other
19 | app.log
20 | .DS_Store
21 | 
```

--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------

```markdown
  1 | # scaflog-zoho-mcp-server MCP server
  2 | 
  3 | Zoho Creator Scaflog App MCP Server
  4 | 
  5 | ## Components
  6 | 
  7 | ### Resources
  8 | 
  9 | The server implements a simple note storage system with:
 10 | - Custom note:// URI scheme for accessing individual notes
 11 | - Each note resource has a name, description and text/plain mimetype
 12 | 
 13 | ### Prompts
 14 | 
 15 | The server provides a single prompt:
 16 | - summarize-notes: Creates summaries of all stored notes
 17 |   - Optional "style" argument to control detail level (brief/detailed)
 18 |   - Generates prompt combining all current notes with style preference
 19 | 
 20 | ### Tools
 21 | 
 22 | The server implements one tool:
 23 | - add-note: Adds a new note to the server
 24 |   - Takes "name" and "content" as required string arguments
 25 |   - Updates server state and notifies clients of resource changes
 26 | 
 27 | ## Configuration
 28 | 
 29 | [TODO: Add configuration details specific to your implementation]
 30 | 
 31 | ## Quickstart
 32 | 
 33 | ### Install
 34 | 
 35 | #### Claude Desktop
 36 | 
 37 | On macOS: `~/Library/Application\ Support/Claude/claude_desktop_config.json`
 38 | On Windows: `%APPDATA%/Claude/claude_desktop_config.json`
 39 | 
 40 | <details>
 41 |   <summary>Development/Unpublished Servers Configuration</summary>
 42 |   ```
 43 |   "mcpServers": {
 44 |     "scaflog-zoho-mcp-server": {
 45 |       "command": "uv",
 46 |       "args": [
 47 |         "--directory",
 48 |         "/Users/alexsherin/Documents/Projects/MCP Servers",
 49 |         "run",
 50 |         "scaflog-zoho-mcp-server"
 51 |       ]
 52 |     }
 53 |   }
 54 |   ```
 55 | </details>
 56 | 
 57 | <details>
 58 |   <summary>Published Servers Configuration</summary>
 59 |   ```
 60 |   "mcpServers": {
 61 |     "scaflog-zoho-mcp-server": {
 62 |       "command": "uvx",
 63 |       "args": [
 64 |         "scaflog-zoho-mcp-server"
 65 |       ]
 66 |     }
 67 |   }
 68 |   ```
 69 | </details>
 70 | 
 71 | ## Development
 72 | 
 73 | ### Building and Publishing
 74 | 
 75 | To prepare the package for distribution:
 76 | 
 77 | 1. Sync dependencies and update lockfile:
 78 | ```bash
 79 | uv sync
 80 | ```
 81 | 
 82 | 2. Build package distributions:
 83 | ```bash
 84 | uv build
 85 | ```
 86 | 
 87 | This will create source and wheel distributions in the `dist/` directory.
 88 | 
 89 | 3. Publish to PyPI:
 90 | ```bash
 91 | uv publish
 92 | ```
 93 | 
 94 | Note: You'll need to set PyPI credentials via environment variables or command flags:
 95 | - Token: `--token` or `UV_PUBLISH_TOKEN`
 96 | - Or username/password: `--username`/`UV_PUBLISH_USERNAME` and `--password`/`UV_PUBLISH_PASSWORD`
 97 | 
 98 | ### Debugging
 99 | 
100 | Since MCP servers run over stdio, debugging can be challenging. For the best debugging
101 | experience, we strongly recommend using the [MCP Inspector](https://github.com/modelcontextprotocol/inspector).
102 | 
103 | 
104 | You can launch the MCP Inspector via [`npm`](https://docs.npmjs.com/downloading-and-installing-node-js-and-npm) with this command:
105 | 
106 | ```bash
107 | npx @modelcontextprotocol/inspector uv --directory "/Users/alexsherin/Documents/Projects/MCP Servers" run scaflog-zoho-mcp-server
108 | ```
109 | 
110 | 
111 | Upon launching, the Inspector will display a URL that you can access in your browser to begin debugging.
```
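The `mcpServers` snippets in the README above are fragments: in a complete `claude_desktop_config.json` the key sits inside a top-level JSON object. A minimal sketch that generates the development entry follows. The `build_config` helper and the `/path/to/project` placeholder are hypothetical and not part of this repository.

```python
import json


def build_config(project_dir: str) -> dict:
    """Return a complete config document containing the development server entry."""
    return {
        "mcpServers": {
            "scaflog-zoho-mcp-server": {
                "command": "uv",
                # Each argument is its own list element, so a directory
                # containing spaces needs no extra quoting here.
                "args": ["--directory", project_dir, "run", "scaflog-zoho-mcp-server"],
            }
        }
    }


# Placeholder path for illustration only.
print(json.dumps(build_config("/path/to/project"), indent=2))
```

Because the arguments are passed as a list, a project directory such as `MCP Servers` works without shell escaping, unlike a command typed into a terminal.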

--------------------------------------------------------------------------------
/tests/__init__.py:
--------------------------------------------------------------------------------

```python
1 | # tests/__init__.py
2 | 
3 | # import pytest
4 | 
5 | # @pytest.fixture(scope="module")
6 | # def mock_server():
7 | #     return ZohoCreatorServer()
8 | 
```

--------------------------------------------------------------------------------
/src/scaflog_zoho_mcp_server/__init__.py:
--------------------------------------------------------------------------------

```python
1 | from . import server
2 | import asyncio
3 | 
4 | def main():
5 |     """Main entry point for the package."""
6 |     asyncio.run(server.main())
7 | 
8 | # Optionally expose other important items at package level
9 | __all__ = ['main', 'server']
```

--------------------------------------------------------------------------------
/.vscode/settings.json:
--------------------------------------------------------------------------------

```json
 1 | {
 2 |     "window.commandCenter": 1,
 3 |     "workbench.colorTheme": "Material Theme",
 4 |     "workbench.iconTheme": "material-icon-theme",
 5 |     "editor.fontSize": 14,
 6 |     "files.autoSave": "afterDelay",
 7 |     "files.autoSaveWorkspaceFilesOnly": true,
 8 |     "git.postCommitCommand": "push",
 9 |     "git.enableSmartCommit": false,
10 |     "git.defaultCommitMessageTemplate": "feat: ",
11 |     "git.followTagsWhenSync": true,
12 |     "git.inputValidationSubjectLength": 72,
13 |     "git.inputValidationLength": 500,
14 |     "scm.defaultViewMode": "tree",
15 |     "git.useCommitInputAsStashMessage": true
16 | }
```

--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------

```toml
 1 | [project]
 2 | name = "scaflog-zoho-mcp-server"
 3 | version = "0.1.0"
 4 | description = "MCP Server for Zoho Creator Integration"
 5 | readme = "README.md"
 6 | authors = [
 7 |     { name = "alexsherin", email = "[email protected]" }
 8 | ]
 9 | requires-python = ">=3.12"
10 | dependencies = [
11 |     "mcp>=1.1.0",
12 |     "zcrmsdk==3.1.0",
13 |     "pydantic>=2.0.0",
14 |     "python-dotenv>=1.0.0",
15 |     "typing-extensions>=4.7.0",
16 |     "httpx>=0.24.0",
17 | ]
18 | 
19 | [project.scripts]
20 | scaflog-zoho-mcp-server = "scaflog_zoho_mcp_server:main"
21 | 
22 | [build-system]
23 | requires = ["hatchling"]
24 | build-backend = "hatchling.build"
25 | 
26 | [tool.hatch.build.targets.wheel]
27 | packages = ["src/scaflog_zoho_mcp_server"]
28 | 
29 | [tool.pytest.ini_options]
30 | asyncio_mode = "auto"
31 | asyncio_default_fixture_loop_scope = "function"
32 | testpaths = ["tests"]
33 | 
34 | [project.optional-dependencies]
35 | test = [
36 |     "pytest>=7.0.0",
37 |     "pytest-asyncio>=0.21.0",
38 |     "pytest-cov>=4.1.0",
39 | ]
```

--------------------------------------------------------------------------------
/src/scaflog_zoho_mcp_server/config.py:
--------------------------------------------------------------------------------

```python
 1 | from pathlib import Path
 2 | import os
 3 | from typing import Optional
 4 | from pydantic import BaseModel, Field
 5 | from dotenv import load_dotenv
 6 | 
 7 | class ZohoCreatorConfig(BaseModel):
 8 |     """Configuration for Zoho Creator API access."""
 9 |     client_id: str = Field(..., description="OAuth client ID")
10 |     client_secret: str = Field(..., description="OAuth client secret")
11 |     refresh_token: str = Field(..., description="OAuth refresh token")
12 |     organization_id: str = Field(..., description="Zoho organization ID")
13 |     environment: str = Field(default="production", description="Zoho environment (production/sandbox)")
14 |     access_token: Optional[str] = Field(default=None, description="Current access token")
15 | 
16 | def load_config() -> ZohoCreatorConfig:
17 |     """Load configuration from environment variables or .env file."""
18 |     # Try to load from .env file if it exists
19 |     env_path = Path(".env")
20 |     if env_path.exists():
21 |         load_dotenv(env_path)
22 | 
23 |     return ZohoCreatorConfig(
24 |         client_id=os.getenv("ZOHO_CLIENT_ID", ""),
25 |         client_secret=os.getenv("ZOHO_CLIENT_SECRET", ""),
26 |         refresh_token=os.getenv("ZOHO_REFRESH_TOKEN", ""),
27 |         organization_id=os.getenv("ZOHO_ORGANIZATION_ID", ""),
28 |         environment=os.getenv("ZOHO_ENVIRONMENT", "production"),
29 |     )
30 | 
31 | # API endpoints for different environments
32 | API_BASE_URL = {
33 |     "production": "https://creator.zoho.com/api/v2/100rails/goscaffold",
34 |     "sandbox": "https://creator.zoho.com/api/v2/sandbox"
35 | }
```
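`load_config()` above falls back to empty strings, so the required `Field(...)` markers do not reject missing credentials: an empty string is still a valid `str`. One possible way to fail fast is sketched below with the standard library only. `REQUIRED_VARS`, `missing_env_vars` and `require_env` are hypothetical names, and treating all four variables as mandatory is an assumption.

```python
import os
from typing import Mapping

# Variable names taken from load_config(); treating them all as required is an assumption.
REQUIRED_VARS = (
    "ZOHO_CLIENT_ID",
    "ZOHO_CLIENT_SECRET",
    "ZOHO_REFRESH_TOKEN",
    "ZOHO_ORGANIZATION_ID",
)


def missing_env_vars(env: Mapping[str, str] = os.environ) -> list[str]:
    """Return the required variable names that are unset or blank."""
    return [name for name in REQUIRED_VARS if not env.get(name, "").strip()]


def require_env(env: Mapping[str, str] = os.environ) -> None:
    """Raise one error that names every missing variable."""
    missing = missing_env_vars(env)
    if missing:
        raise RuntimeError("Missing Zoho settings: " + ", ".join(missing))
```

Calling `require_env()` before constructing `ZohoCreatorConfig` would surface a misconfigured `.env` file at startup rather than at the first token refresh.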

--------------------------------------------------------------------------------
/src/scaflog_zoho_mcp_server/auth.py:
--------------------------------------------------------------------------------

```python
 1 | # src/scaflog_zoho_mcp_server/auth.py
 2 | 
 3 | import time
 4 | from typing import Optional
 5 | import httpx
 6 | from pydantic import BaseModel, Field
 7 | 
 8 | from .config import ZohoCreatorConfig, API_BASE_URL
 9 | 
10 | class TokenInfo(BaseModel):
11 |     """Model for storing token information."""
12 |     access_token: str
13 |     expires_in: int
14 |     created_at: float = Field(default_factory=time.time)
15 | 
16 |     @property
17 |     def is_expired(self) -> bool:
18 |         """Check if the token is expired with a 5-minute buffer."""
19 |         return time.time() > (self.created_at + self.expires_in - 300)
20 | 
21 | class ZohoAuth:
22 |     """Handles authentication with Zoho Creator API."""
23 |     def __init__(self, config: ZohoCreatorConfig):
24 |         self.config = config
25 |         self._token_info: Optional[TokenInfo] = None
26 |         self._client = httpx.AsyncClient(timeout=30.0)
27 | 
28 |     async def get_access_token(self) -> str:
29 |         """Get a valid access token, refreshing if necessary."""
30 |         if not self._token_info or self._token_info.is_expired:
31 |             await self._refresh_token()
32 |         return self._token_info.access_token
33 | 
34 |     async def _refresh_token(self) -> None:
35 |         """Refresh the access token using the refresh token."""
36 |         token_url = "https://accounts.zoho.com/oauth/v2/token"
37 |         params = {
38 |             "client_id": self.config.client_id,
39 |             "client_secret": self.config.client_secret,
40 |             "refresh_token": self.config.refresh_token,
41 |             "grant_type": "refresh_token",
42 |             "redirect_uri": "https://www.zohoapis.com"
43 |         }
44 | 
45 |         # Reuse the shared client: `async with` would close it after the first refresh.
46 |         response = await self._client.post(token_url, params=params)
47 |         response.raise_for_status()
48 |         data = response.json()
49 | 
50 |         self._token_info = TokenInfo(
51 |             access_token=data["access_token"],
52 |             expires_in=data["expires_in"]
53 |         )
54 | 
55 |     async def get_authorized_headers(self) -> dict:
56 |         """Get headers with authorization token."""
57 |         token = await self.get_access_token()
58 |         return {
59 |             "Authorization": f"Bearer {token}",
60 |             "Content-Type": "application/json",
61 |         }
62 | 
63 |     async def close(self):
64 |         """Close the HTTP client."""
65 |         await self._client.aclose()
```

--------------------------------------------------------------------------------
/src/scaflog_zoho_mcp_server/resource_config.py:
--------------------------------------------------------------------------------

```python
 1 | # src/scaflog_zoho_mcp_server/resource_config.py
 2 | 
 3 | from typing import Dict, List, Optional
 4 | from pydantic import BaseModel
 5 | 
 6 | class FieldConfig(BaseModel):
 7 |     """Configuration for a whitelisted field."""
 8 |     display_name: str
 9 |     description: Optional[str] = None
10 |     required: bool = False
11 | 
12 | class FormConfig(BaseModel):
13 |     """Configuration for a whitelisted form."""
14 |     link_name: str
15 |     display_name: str
16 |     description: Optional[str] = None
17 |     fields: Dict[str, FieldConfig]
18 | 
19 | class ReportConfig(BaseModel):
20 |     """Configuration for a whitelisted report."""
21 |     link_name: str
22 |     display_name: str
23 |     description: Optional[str] = None
24 |     fields: Dict[str, FieldConfig]
25 | 
26 | # Define the whitelisted resources
27 | WHITELISTED_RESOURCES = {
28 |     "forms": {
29 |         "Company_Info": FormConfig(
30 |             link_name="Company_Info",
31 |             display_name="Company Information",
32 |             description="Core company details and profile",
33 |             fields={
34 |                 "Company_Name": FieldConfig(
35 |                     display_name="Company Name",
36 |                     description="Legal name of the company",
37 |                     required=True
38 |                 ),
39 |                 "Phone": FieldConfig(
40 |                     display_name="Phone Number",
41 |                     description="Primary contact number"
42 |                 ),
43 |                 "Email": FieldConfig(
44 |                     display_name="Email",
45 |                     description="Primary contact email"
46 |                 ),
47 |                 "Industry": FieldConfig(
48 |                     display_name="Industry",
49 |                     description="Company's primary industry"
50 |                 )
51 |             }
52 |         ),
53 |         # Add more forms as needed
54 |     },
55 |     "reports": {
56 |         "Company_All_Data": ReportConfig(
57 |             link_name="Company_All_Data",
58 |             display_name="Company Overview",
59 |             description="Comprehensive view of company information",
60 |             fields={
61 |                 "Company_Name": FieldConfig(
62 |                     display_name="Company Name",
63 |                     description="Legal name of the company"
64 |                 ),
65 |                 "Industry": FieldConfig(
66 |                     display_name="Industry",
67 |                     description="Company's primary industry"
68 |                 ),
69 |                 "Status": FieldConfig(
70 |                     display_name="Status",
71 |                     description="Current company status"
72 |                 )
73 |             }
74 |         ),
75 |         # Add more reports as needed
76 |     }
77 | }
78 | 
```
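How the server consumes `WHITELISTED_RESOURCES` is not shown in this section. One plausible use, sketched here as an assumption, is to drop every field that is not whitelisted before a record leaves the server and to report required fields that are absent. `filter_to_whitelist`, `missing_required` and `COMPANY_INFO_FIELDS` are hypothetical; only the field names come from the `Company_Info` form above.

```python
from typing import Any, Dict, Iterable, List

# Field names copied from the Company_Info form configuration above.
COMPANY_INFO_FIELDS = ("Company_Name", "Phone", "Email", "Industry")


def filter_to_whitelist(record: Dict[str, Any], allowed_fields: Iterable[str]) -> Dict[str, Any]:
    """Keep only the keys that appear in the whitelist."""
    allowed = set(allowed_fields)
    return {key: value for key, value in record.items() if key in allowed}


def missing_required(record: Dict[str, Any], required_fields: Iterable[str]) -> List[str]:
    """Return required field names that are absent, None or empty."""
    return [name for name in required_fields if record.get(name) in (None, "")]


# Made-up record for illustration; "Internal_Notes" is not whitelisted.
raw = {"Company_Name": "Acme", "Phone": "555-0100", "Internal_Notes": "secret"}
print(filter_to_whitelist(raw, COMPANY_INFO_FIELDS))
print(missing_required(raw, ["Company_Name"]))
```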

--------------------------------------------------------------------------------
/tests/test_server.py:
--------------------------------------------------------------------------------

```python
 1 | # tests/test_server.py
 2 | import pytest
 3 | from pydantic import AnyUrl
 4 | import mcp.types as types
 5 | from mcp import ClientSession
 6 | import logging
 7 | 
 8 | # Configure logging for the test
 9 | logging.basicConfig(level=logging.INFO)
10 | 
11 | @pytest.mark.asyncio
12 | async def test_server_resources(client_session: ClientSession):
13 |     """Test server resources functionality."""
14 |     logging.info("Starting test_server_resources...")
15 |     resources = (await client_session.list_resources()).resources  # unwrap ListResourcesResult
16 |     logging.info(f"Resources: {resources}")  # Log the resources
17 |     assert len(resources) > 0
18 |     assert any(str(r.uri) == "zoho://forms" for r in resources)
19 |     assert any(str(r.uri) == "zoho://forms/test_form" for r in resources)
20 | 
21 |     # Read a resource
22 |     resource = (await client_session.read_resource("zoho://forms/test_form")).contents[0]
23 |     assert resource.mimeType == "application/json"
24 |     assert "form" in resource.text
25 |     assert "records" in resource.text
26 | 
27 | @pytest.mark.asyncio
28 | async def test_server_tools(client_session: ClientSession):
29 |     """Test server tools functionality."""
30 |     # List available tools
31 |     tools = (await client_session.list_tools()).tools  # unwrap ListToolsResult
32 |     assert any(tool.name == "create-record" for tool in tools)
33 | 
34 |     # Call a tool
35 |     result = await client_session.call_tool(
36 |         "create-record",
37 |         arguments={
38 |             "form_name": "test_form",
39 |             "data": {"test_field": "test_value"}
40 |         }
41 |     )
42 |     assert len(result.content) == 1  # CallToolResult wraps the content list
43 |     assert result.content[0].type == "text"
44 |     assert "created successfully" in result.content[0].text
45 | 
46 | @pytest.mark.asyncio
47 | async def test_real_server_resources(client_session: ClientSession):
48 |     """Test server resources functionality with real data."""
49 |     # List available resources
50 |     resources = (await client_session.list_resources()).resources  # unwrap ListResourcesResult
51 |     print("Resources:", resources)  # Log the resources
52 |     assert len(resources) > 0
53 |     assert any(str(r.uri) == "zoho://forms" for r in resources)
54 |     assert any(str(r.uri) == "zoho://form/Company_Info" for r in resources)
55 | 
56 |     # Read a resource
57 |     resource = (await client_session.read_resource("zoho://report/Company_All_Data")).contents[0]
58 |     print("Resource:", resource)  # Log the resource
59 |     assert resource.mimeType == "application/json"
60 |     assert "form" in resource.text
61 |     assert "records" in resource.text
62 | 
63 | @pytest.mark.asyncio
64 | async def test_real_server_tools(client_session: ClientSession):
65 |     """Test server tools functionality with real data."""
66 |     # List available tools
67 |     tools = (await client_session.list_tools()).tools  # unwrap ListToolsResult
68 |     print("Tools:", tools)  # Log the tools
69 |     assert any(tool.name == "create-record" for tool in tools)
70 | 
71 |     # Call a tool
72 |     result = await client_session.call_tool(
73 |         "create-record",
74 |         arguments={
75 |             "form_name": "test_form",
76 |             "data": {"test_field": "test_value"}
77 |         }
78 |     )
79 |     print("Tool Call Result:", result)  # Log the result
80 |     assert len(result.content) == 1  # CallToolResult wraps the content list
81 |     assert result.content[0].type == "text"
82 |     assert "created successfully" in result.content[0].text
83 | 
```

--------------------------------------------------------------------------------
/src/scaflog_zoho_mcp_server/models.py:
--------------------------------------------------------------------------------

```python
 1 | # src/scaflog_zoho_mcp_server/models.py
 2 | 
 3 | from datetime import datetime
 4 | from typing import Any, Dict, List, Optional
 5 | from pydantic import BaseModel, Field
 6 | 
 7 | class ZohoField(BaseModel):
 8 |     """Represents a field in a Zoho Creator form."""
 9 |     link_name: str = Field(..., description="The API name of the field")
10 |     display_name: str = Field(..., description="The display name of the field")
11 |     field_type: int = Field(..., description="The type of the field (e.g., text, number)")
12 |     required: bool = Field(default=False, description="Indicates if the field is mandatory")
13 |     unique: bool = Field(default=False, description="Indicates if the field must be unique")
14 |     max_char: Optional[int] = Field(default=None, description="Maximum character length for the field")
15 |     lookup: Optional[bool] = Field(default=None, description="Indicates if the field is a lookup field")
16 |     choices: Optional[List[dict]] = Field(default=None, description="List of choices for the field if applicable")
17 | 
18 | class ZohoForm(BaseModel):
19 |     """Represents a form in Zoho Creator."""
20 |     link_name: str = Field(..., description="API link name of the form")
21 |     display_name: str = Field(..., description="Display name of the form")
22 |     type: int = Field(..., description="Type of the form")
23 |     fields: List[ZohoField] = Field(default_factory=list)
24 | 
25 | class ZohoRecord(BaseModel):
26 |     """Represents a record in a Zoho Creator form."""
27 |     id: str
28 |     form_link_name: str
29 |     data: Dict[str, Any]
30 |     # Remove created_time and modified_time if they are not in the response
31 |     # created_time: Optional[datetime] = None
32 |     # modified_time: Optional[datetime] = None
33 | 
34 | class ZohoReport(BaseModel):
35 |     """Represents a report in Zoho Creator."""
36 |     link_name: str = Field(..., description="API link name of the report")
37 |     display_name: str = Field(..., description="Display name of the report")
38 |     type: int = Field(..., description="Type of the report")
39 | 
40 | class Cache:
41 |     """Simple cache for form metadata."""
42 |     def __init__(self, ttl_seconds: int = 300):
43 |         self.forms: Dict[str, ZohoForm] = {}
44 |         self.reports: Dict[str, ZohoReport] = {}
45 |         self.ttl = ttl_seconds
46 |         self.last_refresh: Optional[datetime] = None
47 | 
48 |     def needs_refresh(self) -> bool:
49 |         """Check if cache needs refreshing."""
50 |         if not self.last_refresh:
51 |             return True
52 |         return (datetime.now() - self.last_refresh).total_seconds() > self.ttl
53 | 
54 |     def update_forms(self, forms: List[ZohoForm]):
55 |         """Update cached forms."""
56 |         self.forms = {form.link_name: form for form in forms}
57 |         self.last_refresh = datetime.now()
58 | 
59 |     def get_form(self, link_name: str) -> Optional[ZohoForm]:
60 |         """Get a form from cache by link name."""
61 |         return self.forms.get(link_name)
62 |     
63 |     def update_reports(self, reports: List[ZohoReport]):
64 |         """Update cached reports."""
65 |         self.reports = {report.link_name: report for report in reports}
66 |         self.last_refresh = datetime.now()
67 | 
68 | 
```
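The `Cache` class above keeps a single `last_refresh` timestamp that both `update_forms` and `update_reports` overwrite, so refreshing forms also makes the report cache look fresh. A minimal sketch of per-key TTL tracking follows, with an injectable clock so the expiry logic can be tested deterministically. `TtlTracker` is a hypothetical class, not part of this repository.

```python
from datetime import datetime, timedelta
from typing import Callable, Dict


class TtlTracker:
    """Track refresh times per key so each cached collection expires independently."""

    def __init__(self, ttl_seconds: int = 300, clock: Callable[[], datetime] = datetime.now):
        self.ttl = ttl_seconds
        self._clock = clock  # injectable so tests can control time
        self._last_refresh: Dict[str, datetime] = {}

    def mark_refreshed(self, key: str) -> None:
        """Record that `key` was refreshed at the current clock time."""
        self._last_refresh[key] = self._clock()

    def needs_refresh(self, key: str) -> bool:
        """Return True if `key` was never refreshed or its TTL has elapsed."""
        last = self._last_refresh.get(key)
        if last is None:
            return True
        return (self._clock() - last).total_seconds() > self.ttl


# Usage with a controllable clock: a one-element list holds the fake "now".
fake_now = [datetime(2024, 1, 1, 12, 0, 0)]
tracker = TtlTracker(ttl_seconds=300, clock=lambda: fake_now[0])
tracker.mark_refreshed("forms")
fake_now[0] += timedelta(seconds=301)
print(tracker.needs_refresh("forms"))
```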

--------------------------------------------------------------------------------
/tests/conftest.py:
--------------------------------------------------------------------------------

```python
 1 | # tests/conftest.py
 2 | import pytest
 3 | from unittest.mock import AsyncMock
 4 | from typing import AsyncGenerator, Generator
 5 | import httpx
 6 | from datetime import datetime
 7 | import os
 8 | from pathlib import Path
 9 | import sys
10 | from dotenv import load_dotenv  # Import dotenv to load environment variables
11 | import logging  # Import logging
12 | 
13 | # Import the necessary classes
14 | from scaflog_zoho_mcp_server.config import ZohoCreatorConfig
15 | from scaflog_zoho_mcp_server.auth import ZohoAuth
16 | from scaflog_zoho_mcp_server.service import ZohoCreatorService
17 | from mcp import ClientSession, StdioServerParameters  # Add this import
18 | from mcp.client.stdio import stdio_client  # Add this import
19 | 
20 | # Configure logging to write to a file
21 | logging.basicConfig(
22 |     filename='app.log',  # Specify the log file name
23 |     filemode='a',        # Append mode
24 |     format='%(asctime)s - %(levelname)s - %(message)s',
25 |     level=logging.INFO   # Set the logging level
26 | )
27 | 
28 | # Log that the tests are starting
29 | logging.info("Starting tests...")
30 | 
31 | # Load environment variables from .env file
32 | load_dotenv()
33 | 
34 | @pytest.fixture(scope="session")
35 | def test_env() -> Generator[dict, None, None]:
36 |     """Create a test environment with necessary configuration."""
37 |     env = {
38 |         "ZOHO_CLIENT_ID": os.getenv("ZOHO_CLIENT_ID"),
39 |         "ZOHO_CLIENT_SECRET": os.getenv("ZOHO_CLIENT_SECRET"),
40 |         "ZOHO_REFRESH_TOKEN": os.getenv("ZOHO_REFRESH_TOKEN"),
41 |         "ZOHO_ORGANIZATION_ID": os.getenv("ZOHO_ORGANIZATION_ID"),
42 |         "ZOHO_ENVIRONMENT": os.getenv("ZOHO_ENVIRONMENT"),
43 |     }
44 |     
45 |     yield env
46 | 
47 | @pytest.fixture
48 | async def mock_service() -> AsyncGenerator[ZohoCreatorService, None]:
49 |     """Create a service with actual data from Zoho Creator."""
50 |     config = ZohoCreatorConfig(
51 |         client_id=os.getenv("ZOHO_CLIENT_ID"),
52 |         client_secret=os.getenv("ZOHO_CLIENT_SECRET"),
53 |         refresh_token=os.getenv("ZOHO_REFRESH_TOKEN"),
54 |         organization_id=os.getenv("ZOHO_ORGANIZATION_ID"),
55 |         environment=os.getenv("ZOHO_ENVIRONMENT")
56 |     )
57 |     auth = ZohoAuth(config)
58 |     
59 |     # Create a new instance of ZohoCreatorService
60 |     service = ZohoCreatorService(auth)
61 | 
62 |     # Create a new HTTP client for each test
63 |     service._client = httpx.AsyncClient()  # Create a new client instance
64 | 
65 |     yield service
66 | 
67 |     await service._client.aclose()  # Close the client after the test
68 |     await service.close()  # Close the service
69 | 
70 | @pytest.fixture
71 | async def client_session(test_env) -> AsyncGenerator[ClientSession, None]:
72 |     """Create a client session connected to the test server."""
73 |     python_path = sys.executable
74 |     project_root = Path(__file__).parent.parent
75 |     
76 |     server_params = StdioServerParameters(
77 |         command=python_path,
78 |         args=["-c", "from scaflog_zoho_mcp_server import main; main()"],  # no __main__ module exists, so call the entry point
79 |         env={
80 |             **os.environ,
81 |             **test_env,
82 |             "PYTHONPATH": str(project_root)
83 |         }
84 |     )
85 | 
86 |     async with stdio_client(server_params) as (read, write):
87 |         async with ClientSession(read, write) as session:
88 |             await session.initialize()
89 |             yield session
90 | 
```
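The `test_env` fixture above reads each setting with `os.getenv`, which returns `None` for unset variables, while a child process environment can only hold string values. A small sketch of merging overrides while skipping unset entries is shown below. `build_child_env` is a hypothetical helper, and whether the fixtures should skip or fail on missing values is a design choice left open here.

```python
import os
from typing import Dict, Mapping, Optional


def build_child_env(
    base: Mapping[str, str],
    overrides: Mapping[str, Optional[str]],
) -> Dict[str, str]:
    """Merge overrides into base, dropping any override whose value is None."""
    env = dict(base)
    env.update({key: value for key, value in overrides.items() if value is not None})
    return env


# Example: an unset ZOHO_ENVIRONMENT is omitted instead of being passed as None.
overrides = {"ZOHO_CLIENT_ID": "abc", "ZOHO_ENVIRONMENT": None}
child_env = build_child_env({"PATH": os.environ.get("PATH", "")}, overrides)
print(sorted(child_env))
```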

--------------------------------------------------------------------------------
/tests/test_service.py:
--------------------------------------------------------------------------------

```python
 1 | # tests/test_service.py
 2 | import pytest
 3 | from datetime import datetime
 4 | import logging
 5 | 
 6 | # Configure logging for the test
 7 | logging.basicConfig(level=logging.INFO)
 8 | 
 9 | from scaflog_zoho_mcp_server.service import ZohoCreatorService
10 | 
11 | @pytest.mark.asyncio
12 | async def test_list_forms(mock_service: ZohoCreatorService):
13 |     """Test listing forms."""
14 |     logging.info("Starting test_list_forms...")
15 |     forms = await mock_service.list_forms(force_refresh=True)
16 |     logging.info(f"Fetched forms: {[form.display_name for form in forms]}")  # Log the display names of the forms
17 |     
18 |     assert len(forms) > 0  # Ensure that at least one form is returned
19 |     assert all(hasattr(form, 'link_name') for form in forms)  # Check that each form has a link_name
20 |     assert all(hasattr(form, 'display_name') for form in forms)  # Check that each form has a display_name
21 | 
22 | @pytest.mark.asyncio
23 | async def test_get_records(mock_service: ZohoCreatorService):
24 |     """Test getting records."""
25 |     logging.info("Starting test_get_records...")
26 |     records = await mock_service.get_records("test_form")
27 |     logging.info(f"Fetched records: {records}")  # Log the fetched records
28 |     assert len(records) == 1
29 |     assert records[0].id == "123"
30 |     assert records[0].data["ID"] == "test_value"
31 | 
32 | @pytest.mark.asyncio
33 | async def test_create_record(mock_service: ZohoCreatorService):
34 |     """Test creating a record."""
35 |     record = await mock_service.create_record(
36 |         "test_form",
37 |         {"test_field": "new_value"}
38 |     )
39 |     assert record.id == "123"
40 |     assert record.form_link_name == "test_form"
41 |     assert record.data["test_field"] == "new_value"
42 | 
43 | @pytest.mark.asyncio
44 | async def test_update_record(mock_service: ZohoCreatorService):
45 |     """Test updating a record."""
46 |     record = await mock_service.update_record(
47 |         "test_form",
48 |         "123",
49 |         {"test_field": "updated_value"}
50 |     )
51 |     assert record.id == "123"
52 |     assert record.form_link_name == "test_form"
53 |     assert record.data["test_field"] == "updated_value"
54 | 
55 | @pytest.mark.asyncio
56 | async def test_fetch_data(mock_service):
57 |     logging.info("Starting test_fetch_data...")
58 |     data = await mock_service.fetch_data()
59 |     logging.info(f"Fetched data: {data}")
60 |     assert data is not None  # Example assertion
61 | 
62 | @pytest.mark.asyncio
63 | async def test_fetch_all_records(mock_service: ZohoCreatorService):
64 |     """Test fetching all records from the Company_All_Data report."""
65 |     logging.info("Starting test_fetch_all_records...")
66 |     
67 |     # Fetch all records for the report "Company_All_Data"
68 |     records = await mock_service.get_records("Company_All_Data")  # Use the report link name
69 |     
70 |     # Log the fetched records
71 |     logging.info(f"Fetched records: {records}")
72 |     
73 |     # Assertions to verify the records
74 |     assert len(records) > 0  # Ensure that at least one record is returned
75 |     for record in records:
76 |         assert isinstance(record.id, str)  # Ensure each record has a valid ID
77 |         # assert "Company_Info" in record.data  # Ensure the record contains data for the form
78 | 
79 | # You can add more tests below...
80 | 
```

--------------------------------------------------------------------------------
/src/scaflog_zoho_mcp_server/service.py:
--------------------------------------------------------------------------------

```python
  1 | # src/scaflog_zoho_mcp_server/service.py
  2 | 
  3 | from typing import List, Optional, Dict, Any
  4 | import httpx
  5 | from datetime import datetime
  6 | import logging
  7 | 
  8 | from .models import ZohoForm, ZohoReport, ZohoField, ZohoRecord, Cache
  9 | from .auth import ZohoAuth
 10 | from .config import API_BASE_URL
 11 | 
 12 | # Configure logging to write to a file
 13 | logging.basicConfig(
 14 |     filename='app.log',  # Specify the log file name
 15 |     filemode='a',        # Append mode
 16 |     format='%(asctime)s - %(levelname)s - %(message)s',
 17 |     level=logging.INFO   # Set the logging level
 18 | )
 19 | 
 20 | class ZohoCreatorService:
 21 |     """Service for interacting with Zoho Creator API."""
 22 |     
 23 |     def __init__(self, auth: ZohoAuth):
 24 |         self.auth = auth
 25 |         self.cache = Cache()
 26 |         self._client = httpx.AsyncClient(timeout=httpx.Timeout(60.0))
 27 |         self.base_url = API_BASE_URL[auth.config.environment]
 28 | 
 29 |     async def list_forms(self, force_refresh: bool = False) -> List[ZohoForm]:
 30 |         """Get all available forms."""
 31 |         if not force_refresh and not self.cache.needs_refresh():
 32 |             return list(self.cache.forms.values())
 33 | 
 34 |         headers = await self.auth.get_authorized_headers()
 35 |         url = f"{self.base_url}/forms"
 36 |         
 37 |         response = await self._client.get(url, headers=headers)
 38 |         response.raise_for_status()
 39 |         data = response.json()
 40 |         logging.info(f"Response from list_forms: {data}")  # Log the entire response
 41 | 
 42 |         forms = []
 43 |         for form_data in data['forms'][:10]:
 44 |             logging.info(f"Processing form: {form_data['link_name']}")  # Log the link_name
 45 |             fields = await self._get_form_fields(form_data['link_name'], headers)
 46 |             form = ZohoForm(
 47 |                 link_name=form_data['link_name'],
 48 |                 display_name=form_data['display_name'],
 49 |                 fields=fields,
 50 |                 type=form_data['type']
 51 |             )
 52 |             forms.append(form)
 53 | 
 54 |         self.cache.update_forms(forms)
 55 |         return forms
 56 | 
 57 |     async def _get_form_fields(self, form_link_name: str, headers: dict) -> List[ZohoField]:
 58 |         """Get fields for a specific form."""
 59 |         url = f"{self.base_url}/form/{form_link_name}/fields"
 60 |         
 61 |         response = await self._client.get(url, headers=headers)
 62 |         response.raise_for_status()
 63 |         data = response.json()
 64 |         # logging.info(f"Response from _get_form_fields: {data}")  # Log the entire response
 65 | 
 66 |         return [
 67 |             ZohoField(
 68 |                 link_name=field['link_name'],
 69 |                 display_name=field['display_name'],
 70 |                 field_type=field['type'],
 71 |                 required=field['mandatory'],
 72 |                 unique=field['unique'],
 73 |                 max_char=field.get('max_char'),
 74 |                 lookup=field.get('is_lookup_field'),
 75 |                 choices=field.get('choices')
 76 |             )
 77 |             for field in data['fields']
 78 |         ]
 79 | 
 80 |     async def list_reports(self, force_refresh: bool = False) -> List[ZohoReport]:
 81 |         """Get all available reports."""
 82 |         if not force_refresh and not self.cache.needs_refresh():
 83 |             return list(self.cache.reports.values())
 84 | 
 85 |         headers = await self.auth.get_authorized_headers()
 86 |         url = f"{self.base_url}/reports"
 87 |         
 88 |         response = await self._client.get(url, headers=headers)
 89 |         response.raise_for_status()
 90 |         data = response.json()
 91 |         logging.info(f"Response from list_reports: {data}")  # Log the entire response
 92 | 
 93 |         reports = []
 94 |         for form_data in data['reports'][:10]:
 95 |             logging.info(f"Processing report: {form_data['link_name']}")  # Log the link_name
 96 |             form = ZohoReport(
 97 |                 link_name=form_data['link_name'],
 98 |                 display_name=form_data['display_name'],
 99 |                 type=form_data['type']
100 |             )
101 |             reports.append(form)
102 | 
103 |         self.cache.update_reports(reports)
104 |         return reports
105 | 
106 |     async def get_records(
107 |         self,
108 |         report_link_name: str,
109 |         criteria: Optional[str] = None,
110 |         limit: Optional[int] = None
111 |     ) -> List[ZohoRecord]:
112 |         """Get records from a specific report."""
113 |         headers = await self.auth.get_authorized_headers()
114 |         url = f"{self.base_url}/report/{report_link_name}"
115 |         
116 |         params = {}
117 |         if criteria:
118 |             params['criteria'] = criteria
119 |         if limit:
120 |             params['limit'] = limit
121 | 
122 |         # Use the shared client directly; "async with" would close it after this call.
123 |         response = await self._client.get(url, headers=headers, params=params)
124 |         response.raise_for_status()
125 |         data = response.json()
126 | 
127 |         return [
128 |             ZohoRecord(
129 |                 id=record['ID'],
130 |                 form_link_name=report_link_name,
131 |                 data=record
132 |             )
133 |             for record in data['data']
134 |         ]
135 | 
136 |     async def get_record(
137 |         self,
138 |         report_link_name: str,
139 |         record_id: str
140 |     ) -> ZohoRecord:
141 |         """Get a specific record by ID."""
142 |         headers = await self.auth.get_authorized_headers()
143 |         url = f"{self.base_url}/report/{report_link_name}/{record_id}"
144 |         
145 |         # Use the shared client directly; "async with" would close it after this call.
146 |         response = await self._client.get(url, headers=headers)
147 |         response.raise_for_status()
148 |         result = response.json()
149 | 
150 |         return ZohoRecord(
151 |             id=record_id,
152 |             form_link_name=report_link_name,
153 |             data=result['data']
154 |         )
155 | 
156 |     async def create_record(
157 |         self,
158 |         form_link_name: str,
159 |         data: Dict[str, Any]
160 |     ) -> ZohoRecord:
161 |         """Create a new record in a form."""
162 |         headers = await self.auth.get_authorized_headers()
163 |         url = f"{self.base_url}/form/{form_link_name}"
164 |         
165 |         # Use the shared client directly; "async with" would close it after this call.
166 |         response = await self._client.post(
167 |             url,
168 |             headers=headers,
169 |             json={"data": data}
170 |         )
171 |         response.raise_for_status()
172 |         result = response.json()
173 | 
174 |         return ZohoRecord(
175 |             id=result['record']['ID'],
176 |             form_link_name=form_link_name,
177 |             data=data
178 |         )
179 | 
180 |     async def update_record(
181 |         self,
182 |         report_link_name: str,
183 |         record_id: str,
184 |         data: Dict[str, Any]
185 |     ) -> ZohoRecord:
186 |         """Update an existing record in a form."""
187 |         headers = await self.auth.get_authorized_headers()
188 |         url = f"{self.base_url}/report/{report_link_name}/{record_id}"
189 |         
190 |         # Use the shared client directly; "async with" would close it after this call.
191 |         response = await self._client.patch(
192 |             url,
193 |             headers=headers,
194 |             json={"data": data}
195 |         )
196 |         response.raise_for_status()
197 |         result = response.json()
198 | 
199 |         return ZohoRecord(
200 |             id=record_id,
201 |             form_link_name=report_link_name,
202 |             data=data
203 |         )
204 | 
205 |     async def close(self):
206 |         """Close HTTP client."""
207 |         await self._client.aclose()
208 | 
209 |     async def fetch_data(self):
210 |         logging.info("Fetching data from the API...")
211 |         try:
212 |             response = await self._client.get("your_api_endpoint")
213 |             response.raise_for_status()  # Raise an error for bad responses
214 |             logging.info(f"Fetched data: {response.json()}")
215 |             return response.json()
216 |         except Exception as e:
217 |             logging.error(f"Error fetching data: {e}")
218 |             return None
219 |         
```
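
A note on client lifetime in `service.py`: `ZohoCreatorService` creates one long-lived `self._client` in `__init__` and closes it once in `close()`. The sketch below is only an illustration, using a hypothetical stand-in class `FakeAsyncClient` (not the httpx API), of why entering such a shared client with `async with` on every call would make the second call fail: leaving the block closes the client.

```python
import asyncio


class FakeAsyncClient:
    """Hypothetical stand-in for a pooled async HTTP client (not httpx itself)."""

    def __init__(self) -> None:
        self.closed = False

    async def __aenter__(self) -> "FakeAsyncClient":
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        # Leaving the "async with" block closes the client for good.
        self.closed = True

    async def get(self, url: str) -> str:
        if self.closed:
            raise RuntimeError("client has been closed")
        return f"GET {url}"


async def demo() -> tuple[str, str]:
    client = FakeAsyncClient()

    # First call: works, but the context manager closes the shared client.
    async with client as c:
        first = await c.get("/report/a")

    # Second call on the same instance: fails because it is already closed.
    try:
        second = await client.get("/report/b")
    except RuntimeError as exc:
        second = f"failed: {exc}"

    return first, second


result = asyncio.run(demo())
print(result)
```

Calling methods on the shared instance directly and closing it once at shutdown avoids this.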

--------------------------------------------------------------------------------
/src/scaflog_zoho_mcp_server/server.py:
--------------------------------------------------------------------------------

```python
  1 | # src/scaflog_zoho_mcp_server/server.py
  2 | 
  3 | import json
  4 | import logging
  5 | from typing import Dict, List, Optional
  6 | from urllib.parse import parse_qs, urlparse
  7 | from pydantic import AnyUrl
  8 | 
  9 | from mcp.server import NotificationOptions, Server
 10 | from mcp.server.models import InitializationOptions
 11 | import mcp.types as types
 12 | import mcp.server.stdio
 13 | 
 14 | from .config import load_config, API_BASE_URL
 15 | from .auth import ZohoAuth
 16 | from .service import ZohoCreatorService
 17 | from .resource_config import WHITELISTED_RESOURCES
 18 | 
 19 | # Configure logging
 20 | logging.basicConfig(level=logging.DEBUG)
 21 | logger = logging.getLogger(__name__)
 22 | 
 23 | # Create a server instance
 24 | server = Server("scaflog-zoho-mcp-server")
 25 | config = load_config()
 26 | auth = ZohoAuth(config)
 27 | service = ZohoCreatorService(auth)
 28 | 
 29 | @server.list_resources()
 30 | async def handle_list_resources() -> list[types.Resource]:
 31 |     """List available whitelisted Zoho Creator forms and reports as resources."""
 32 |     logger.debug("Starting handle_list_resources...")
 33 |     
 34 |     try:
 35 |         resources = []
 36 |         
 37 |         # Add container resources
 38 |         resources.append(
 39 |             types.Resource(
 40 |                 uri=AnyUrl("zoho://forms"),
 41 |                 name="Available Forms",
 42 |                 description="List of available Zoho Creator forms",
 43 |                 mimeType="application/json"
 44 |             )
 45 |         )
 46 |         
 47 |         resources.append(
 48 |             types.Resource(
 49 |                 uri=AnyUrl("zoho://reports"),
 50 |                 name="Available Reports",
 51 |                 description="List of available Zoho Creator reports",
 52 |                 mimeType="application/json"
 53 |             )
 54 |         )
 55 |         
 56 |         # Add whitelisted forms
 57 |         for link_name, form_config in WHITELISTED_RESOURCES["forms"].items():
 58 |             resources.append(
 59 |                 types.Resource(
 60 |                     uri=AnyUrl(f"zoho://form/{link_name}"),
 61 |                     name=form_config.display_name,
 62 |                     description=form_config.description,
 63 |                     mimeType="application/json"
 64 |                 )
 65 |             )
 66 |         
 67 |         # Add whitelisted reports
 68 |         for link_name, report_config in WHITELISTED_RESOURCES["reports"].items():
 69 |             resources.append(
 70 |                 types.Resource(
 71 |                     uri=AnyUrl(f"zoho://report/{link_name}"),
 72 |                     name=report_config.display_name,
 73 |                     description=report_config.description,
 74 |                     mimeType="application/json"
 75 |                 )
 76 |             )
 77 |         
 78 |         return resources
 79 |     
 80 |     except Exception as e:
 81 |         logger.exception("Error in handle_list_resources")
 82 |         raise
 83 | 
 84 | @server.read_resource()
 85 | async def handle_read_resource(uri: AnyUrl) -> types.TextResourceContents | types.BlobResourceContents:
 86 |     """Read data from Zoho Creator based on the resource URI, filtered by whitelist."""
 87 |     try:
 88 |         logger.info(f"Reading resource: {uri}")
 89 |         parsed = urlparse(str(uri))
 90 |         
 91 |         if parsed.scheme != "zoho":
 92 |             raise ValueError(f"Unsupported URI scheme: {parsed.scheme}")
 93 |         
 94 |         full_path = f"{parsed.netloc}{parsed.path}".strip("/")
 95 |         path_parts = full_path.split("/")
 96 |         
 97 |         if not full_path:  # "".split("/") yields [""], so test the joined path instead
 98 |             raise ValueError("Empty resource path")
 99 |             
100 |         resource_type = path_parts[0]
101 |         
102 |         # Handle root resources
103 |         if resource_type == "forms":
104 |             return types.TextResourceContents(
105 |                 uri=uri,
106 |                 mimeType="application/json",
107 |                 text=json.dumps({
108 |                     "forms": [
109 |                         {
110 |                             "link_name": link_name,
111 |                             "display_name": form.display_name,
112 |                             "description": form.description,
113 |                             "fields": {
114 |                                 field_name: field.dict() 
115 |                                 for field_name, field in form.fields.items()
116 |                             }
117 |                         }
118 |                         for link_name, form in WHITELISTED_RESOURCES["forms"].items()
119 |                     ]
120 |                 }, indent=2)
121 |             )
122 |             
123 |         elif resource_type == "reports":
124 |             return types.TextResourceContents(
125 |                 uri=uri,
126 |                 mimeType="application/json",
127 |                 text=json.dumps({
128 |                     "reports": [
129 |                         {
130 |                             "link_name": link_name,
131 |                             "display_name": report.display_name,
132 |                             "description": report.description,
133 |                             "fields": {
134 |                                 field_name: field.dict() 
135 |                                 for field_name, field in report.fields.items()
136 |                             }
137 |                         }
138 |                         for link_name, report in WHITELISTED_RESOURCES["reports"].items()
139 |                     ]
140 |                 }, indent=2)
141 |             )
142 |         
143 |         # Handle specific resources
144 |         if len(path_parts) < 2:
145 |             raise ValueError(f"Missing link name for resource type: {resource_type}")
146 |             
147 |         link_name = path_parts[1]
148 |         
149 |         if resource_type == "form":
150 |             # Check if form is whitelisted
151 |             form_config = WHITELISTED_RESOURCES["forms"].get(link_name)
152 |             if not form_config:
153 |                 raise ValueError(f"Form not found or not accessible: {link_name}")
154 |             
155 |             # Get form data from Zoho
156 |             records = await service.get_records(link_name)
157 |             
158 |             # Filter fields based on whitelist
159 |             filtered_records = [
160 |                 {
161 |                     field_name: record.data.get(field_name)
162 |                     for field_name in form_config.fields.keys()
163 |                     if field_name in record.data
164 |                 }
165 |                 for record in records
166 |             ]
167 |             
168 |             return types.TextResourceContents(
169 |                 uri=uri,
170 |                 mimeType="application/json",
171 |                 text=json.dumps({
172 |                     "form": {
173 |                         "link_name": link_name,
174 |                         "display_name": form_config.display_name,
175 |                         "description": form_config.description,
176 |                         "fields": {
177 |                             name: field.dict() 
178 |                             for name, field in form_config.fields.items()
179 |                         }
180 |                     },
181 |                     "records": filtered_records
182 |                 }, indent=2)
183 |             )
184 |             
185 |         elif resource_type == "report":
186 |             # Check if report is whitelisted
187 |             report_config = WHITELISTED_RESOURCES["reports"].get(link_name)
188 |             if not report_config:
189 |                 raise ValueError(f"Report not found or not accessible: {link_name}")
190 |             
191 |             # Get report data from Zoho
192 |             records = await service.get_records(link_name)
193 |             
194 |             # Filter fields based on whitelist
195 |             filtered_records = [
196 |                 {
197 |                     field_name: record.data.get(field_name)
198 |                     for field_name in report_config.fields.keys()
199 |                     if field_name in record.data
200 |                 }
201 |                 for record in records
202 |             ]
203 |             
204 |             return types.TextResourceContents(
205 |                 uri=uri,
206 |                 mimeType="application/json",
207 |                 text=json.dumps({
208 |                     "report": {
209 |                         "link_name": link_name,
210 |                         "display_name": report_config.display_name,
211 |                         "description": report_config.description,
212 |                         "fields": {
213 |                             name: field.dict() 
214 |                             for name, field in report_config.fields.items()
215 |                         }
216 |                     },
217 |                     "records": filtered_records
218 |                 }, indent=2)
219 |             )
220 |         
221 |         else:
222 |             raise ValueError(f"Unknown resource type: {resource_type}")
223 |             
224 |     except Exception as e:
225 |         logger.exception(f"Error reading resource: {uri}")
226 |         raise
227 | 
228 | async def main():
229 |     """Main entry point for the server."""
230 |     logger.info("Starting Zoho Creator MCP server...")
231 |     async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
232 |         try:
233 |             logger.info("Initializing server connection...")
234 |             await server.run(
235 |                 read_stream,
236 |                 write_stream,
237 |                 InitializationOptions(
238 |                     server_name="scaflog-zoho-mcp-server",
239 |                     server_version="0.1.0",
240 |                     capabilities=server.get_capabilities(
241 |                         notification_options=NotificationOptions(),
242 |                         experimental_capabilities={},
243 |                     ),
244 |                 ),
245 |             )
246 |         except Exception as e:
247 |             logger.exception("Error running server")
248 |             raise
249 |         finally:
250 |             logger.info("Shutting down server...")
251 |             await auth.close()
252 |             await service.close()
253 | 
```