# Directory Structure
```
├── .gitignore
├── .python-version
├── .vscode
│   └── settings.json
├── ~
│   └── .zshrc
├── app.log
├── pyproject.toml
├── README.md
├── src
│   ├── .DS_Store
│   └── scaflog_zoho_mcp_server
│       ├── __init__.py
│       ├── .DS_Store
│       ├── auth.py
│       ├── config.py
│       ├── models.py
│       ├── resource_config.py
│       ├── server.py
│       └── service.py
├── tests
│   ├── __init__.py
│   ├── conftest.py
│   ├── test_server.py
│   └── test_service.py
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/.python-version:
--------------------------------------------------------------------------------
```
1 | 3.12
2 |
```
--------------------------------------------------------------------------------
/~/.zshrc:
--------------------------------------------------------------------------------
```
1 | # Add these lines to the end of your .zshrc
2 | alias python="python3"
3 | alias pip="pip3"
```
--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------
```
1 | # Python-generated files
2 | __pycache__/
3 | *.py[oc]
4 | build/
5 | dist/
6 | wheels/
7 | *.egg-info
8 |
9 | # Virtual environments
10 | .venv
11 |
12 | # Environment variables
13 | .env
14 |
15 | # Flatten repo
16 | flatten/
17 |
18 | # other
19 | app.log
20 | .DS_Store
21 |
```
--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------
```markdown
1 | # scaflog-zoho-mcp-server MCP server
2 |
3 | Zoho Creator Scaflog App MCP Server
4 |
5 | ## Components
6 |
7 | ### Resources
8 |
9 | The server implements a simple note storage system with:
10 | - Custom note:// URI scheme for accessing individual notes
11 | - Each note resource has a name, description and text/plain mimetype
12 |
13 | ### Prompts
14 |
15 | The server provides a single prompt:
16 | - summarize-notes: Creates summaries of all stored notes
17 | - Optional "style" argument to control detail level (brief/detailed)
18 | - Generates prompt combining all current notes with style preference
19 |
20 | ### Tools
21 |
22 | The server implements one tool:
23 | - add-note: Adds a new note to the server
24 | - Takes "name" and "content" as required string arguments
25 | - Updates server state and notifies clients of resource changes
26 |
27 | ## Configuration
28 |
29 | [TODO: Add configuration details specific to your implementation]
30 |
31 | ## Quickstart
32 |
33 | ### Install
34 |
35 | #### Claude Desktop
36 |
37 | On macOS: `~/Library/Application\ Support/Claude/claude_desktop_config.json`
38 | On Windows: `%APPDATA%/Claude/claude_desktop_config.json`
39 |
40 | <details>
41 | <summary>Development/Unpublished Servers Configuration</summary>
42 | ```
43 | "mcpServers": {
44 | "scaflog-zoho-mcp-server": {
45 | "command": "uv",
46 | "args": [
47 | "--directory",
48 | "/Users/alexsherin/Documents/Projects/MCP Servers",
49 | "run",
50 | "scaflog-zoho-mcp-server"
51 | ]
52 | }
53 | }
54 | ```
55 | </details>
56 |
57 | <details>
58 | <summary>Published Servers Configuration</summary>
59 | ```
60 | "mcpServers": {
61 | "scaflog-zoho-mcp-server": {
62 | "command": "uvx",
63 | "args": [
64 | "scaflog-zoho-mcp-server"
65 | ]
66 | }
67 | }
68 | ```
69 | </details>
70 |
71 | ## Development
72 |
73 | ### Building and Publishing
74 |
75 | To prepare the package for distribution:
76 |
77 | 1. Sync dependencies and update lockfile:
78 | ```bash
79 | uv sync
80 | ```
81 |
82 | 2. Build package distributions:
83 | ```bash
84 | uv build
85 | ```
86 |
87 | This will create source and wheel distributions in the `dist/` directory.
88 |
89 | 3. Publish to PyPI:
90 | ```bash
91 | uv publish
92 | ```
93 |
94 | Note: You'll need to set PyPI credentials via environment variables or command flags:
95 | - Token: `--token` or `UV_PUBLISH_TOKEN`
96 | - Or username/password: `--username`/`UV_PUBLISH_USERNAME` and `--password`/`UV_PUBLISH_PASSWORD`
97 |
98 | ### Debugging
99 |
100 | Since MCP servers run over stdio, debugging can be challenging. For the best debugging
101 | experience, we strongly recommend using the [MCP Inspector](https://github.com/modelcontextprotocol/inspector).
102 |
103 |
104 | You can launch the MCP Inspector via [`npm`](https://docs.npmjs.com/downloading-and-installing-node-js-and-npm) with this command:
105 |
106 | ```bash
107 | npx @modelcontextprotocol/inspector uv --directory "/Users/alexsherin/Documents/Projects/MCP Servers" run scaflog-zoho-mcp-server
108 | ```
109 |
110 |
111 | Upon launching, the Inspector will display a URL that you can access in your browser to begin debugging.
```
--------------------------------------------------------------------------------
/tests/__init__.py:
--------------------------------------------------------------------------------
```python
1 | # tests/__init__.py
2 |
3 | # import pytest
4 |
5 | # @pytest.fixture(scope="module")
6 | # def mock_server():
7 | # return ZohoCreatorServer()
8 |
```
--------------------------------------------------------------------------------
/src/scaflog_zoho_mcp_server/__init__.py:
--------------------------------------------------------------------------------
```python
1 | from . import server
2 | import asyncio
3 |
4 | def main():
5 |     """Main entry point for the package."""
6 |     asyncio.run(server.main())
7 |
8 | # Optionally expose other important items at package level
9 | __all__ = ['main', 'server']
```
--------------------------------------------------------------------------------
/.vscode/settings.json:
--------------------------------------------------------------------------------
```json
1 | {
2 | "window.commandCenter": 1,
3 | "workbench.colorTheme": "Material Theme",
4 | "workbench.iconTheme": "material-icon-theme",
5 | "editor.fontSize": 14,
6 | "files.autoSave": "afterDelay",
7 | "files.autoSaveWorkspaceFilesOnly": true,
8 | "git.postCommitCommand": "push",
9 | "git.enableSmartCommit": false,
10 | "git.defaultCommitMessageTemplate": "feat: ",
11 | "git.followTagsWhenSync": true,
12 | "git.inputValidationSubjectLength": 72,
13 | "git.inputValidationLength": 500,
14 | "scm.defaultViewMode": "tree",
15 | "git.useCommitInputAsStashMessage": true
16 | }
```
--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------
```toml
1 | [project]
2 | name = "scaflog-zoho-mcp-server"
3 | version = "0.1.0"
4 | description = "MCP Server for Zoho Creator Integration"
5 | readme = "README.md"
6 | authors = [
7 | { name = "alexsherin", email = "[email protected]" }
8 | ]
9 | requires-python = ">=3.12"
10 | dependencies = [
11 | "mcp>=1.1.0",
12 | "zcrmsdk==3.1.0",
13 | "pydantic>=2.0.0",
14 | "python-dotenv>=1.0.0",
15 | "typing-extensions>=4.7.0",
16 | "httpx>=0.24.0",
17 | ]
18 |
19 | [project.scripts]
20 | scaflog-zoho-mcp-server = "scaflog_zoho_mcp_server:main"
21 |
22 | [build-system]
23 | requires = ["hatchling"]
24 | build-backend = "hatchling.build"
25 |
26 | [tool.hatch.build.targets.wheel]
27 | packages = ["src/scaflog_zoho_mcp_server"]
28 |
29 | [tool.pytest.ini_options]
30 | asyncio_mode = "auto"
31 | asyncio_default_fixture_loop_scope = "function"
32 | testpaths = ["tests"]
33 |
34 | [project.optional-dependencies]
35 | test = [
36 | "pytest>=7.0.0",
37 | "pytest-asyncio>=0.24.0",
38 | "pytest-cov>=4.1.0",
39 | ]
```
--------------------------------------------------------------------------------
/src/scaflog_zoho_mcp_server/config.py:
--------------------------------------------------------------------------------
```python
1 | from pathlib import Path
2 | import os
3 | from typing import Optional
4 | from pydantic import BaseModel, Field
5 | from dotenv import load_dotenv
6 |
7 | class ZohoCreatorConfig(BaseModel):
8 |     """Configuration for Zoho Creator API access."""
9 |     client_id: str = Field(..., description="OAuth client ID")
10 |     client_secret: str = Field(..., description="OAuth client secret")
11 |     refresh_token: str = Field(..., description="OAuth refresh token")
12 |     organization_id: str = Field(..., description="Zoho organization ID")
13 |     environment: str = Field(default="production", description="Zoho environment (production/sandbox)")
14 |     access_token: Optional[str] = Field(default=None, description="Current access token")
15 |
16 | def load_config() -> ZohoCreatorConfig:
17 |     """Load configuration from environment variables or .env file."""
18 |     # Try to load from .env file if it exists
19 |     env_path = Path(".env")
20 |     if env_path.exists():
21 |         load_dotenv(env_path)
22 |
23 |     return ZohoCreatorConfig(
24 |         client_id=os.getenv("ZOHO_CLIENT_ID", ""),
25 |         client_secret=os.getenv("ZOHO_CLIENT_SECRET", ""),
26 |         refresh_token=os.getenv("ZOHO_REFRESH_TOKEN", ""),
27 |         organization_id=os.getenv("ZOHO_ORGANIZATION_ID", ""),
28 |         environment=os.getenv("ZOHO_ENVIRONMENT", "production"),
29 |     )
30 |
31 | # API endpoints for different environments
32 | API_BASE_URL = {
33 |     "production": "https://creator.zoho.com/api/v2/100rails/goscaffold",
34 |     "sandbox": "https://creator.zoho.com/api/v2/sandbox"
35 | }
```
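Note that `load_config` falls back to empty strings, so a missing credential still passes model validation and only surfaces later as a failed token request. A minimal, stdlib-only sketch of failing fast instead is shown below. `REQUIRED_VARS` and `missing_zoho_vars` are hypothetical names used for illustration and are not part of the package.

```python
import os
from typing import List, Mapping, Optional, Sequence

# Hypothetical list: the variables load_config() reads without a usable default.
REQUIRED_VARS = (
    "ZOHO_CLIENT_ID",
    "ZOHO_CLIENT_SECRET",
    "ZOHO_REFRESH_TOKEN",
    "ZOHO_ORGANIZATION_ID",
)


def missing_zoho_vars(
    env: Optional[Mapping[str, str]] = None,
    required: Sequence[str] = REQUIRED_VARS,
) -> List[str]:
    """Return the required variable names that are unset or blank."""
    env = os.environ if env is None else env
    return [name for name in required if not env.get(name, "").strip()]


if __name__ == "__main__":
    missing = missing_zoho_vars()
    if missing:
        raise SystemExit(f"Missing Zoho settings: {', '.join(missing)}")
```

Calling a check like this before building `ZohoCreatorConfig` would report every missing setting at once instead of failing on the first HTTP call.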
--------------------------------------------------------------------------------
/src/scaflog_zoho_mcp_server/auth.py:
--------------------------------------------------------------------------------
```python
1 | # src/scaflog_zoho_mcp_server/auth.py
2 |
3 | import time
4 | from typing import Optional
5 | import httpx
6 | from pydantic import BaseModel, Field
7 |
8 | from .config import ZohoCreatorConfig, API_BASE_URL
9 |
10 | class TokenInfo(BaseModel):
11 |     """Model for storing token information."""
12 |     access_token: str
13 |     expires_in: int
14 |     created_at: float = Field(default_factory=time.time)
15 |
16 |     @property
17 |     def is_expired(self) -> bool:
18 |         """Check if the token is expired with a 5-minute buffer."""
19 |         return time.time() > (self.created_at + self.expires_in - 300)
20 |
21 | class ZohoAuth:
22 |     """Handles authentication with Zoho Creator API."""
23 |     def __init__(self, config: ZohoCreatorConfig):
24 |         self.config = config
25 |         self._token_info: Optional[TokenInfo] = None
26 |         self._client = httpx.AsyncClient(timeout=30.0)
27 |
28 |     async def get_access_token(self) -> str:
29 |         """Get a valid access token, refreshing if necessary."""
30 |         if not self._token_info or self._token_info.is_expired:
31 |             await self._refresh_token()
32 |         return self._token_info.access_token
33 |
34 |     async def _refresh_token(self) -> None:
35 |         """Refresh the access token using the refresh token."""
36 |         token_url = "https://accounts.zoho.com/oauth/v2/token"
37 |         params = {
38 |             "client_id": self.config.client_id,
39 |             "client_secret": self.config.client_secret,
40 |             "refresh_token": self.config.refresh_token,
41 |             "grant_type": "refresh_token",
42 |             "redirect_uri": "https://www.zohoapis.com"
43 |         }
44 |
45 |         # Use the shared client directly; `async with self._client` would close it after the first refresh
46 |         response = await self._client.post(token_url, params=params)
47 |         response.raise_for_status()
48 |         data = response.json()
49 |
50 |         self._token_info = TokenInfo(
51 |             access_token=data["access_token"],
52 |             expires_in=data["expires_in"]
53 |         )
54 |
55 |     async def get_authorized_headers(self) -> dict:
56 |         """Get headers with authorization token."""
57 |         token = await self.get_access_token()
58 |         return {
59 |             "Authorization": f"Bearer {token}",
60 |             "Content-Type": "application/json",
61 |         }
62 |
63 |     async def close(self):
64 |         """Close the HTTP client."""
65 |         await self._client.aclose()
```
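The expiry rule in `TokenInfo.is_expired` treats a token as stale 300 seconds before its real expiry, so a request never starts with a token that is about to lapse. The sketch below isolates that arithmetic with an injectable clock so it can be checked without waiting. `TokenWindow` and `EXPIRY_BUFFER_SECONDS` are illustrative names, not part of the package.

```python
import time
from dataclasses import dataclass, field
from typing import Optional

EXPIRY_BUFFER_SECONDS = 300  # same 5-minute buffer used by TokenInfo.is_expired


@dataclass
class TokenWindow:
    """Illustrative stand-in for TokenInfo that only tracks the validity window."""
    expires_in: int
    created_at: float = field(default_factory=time.time)

    def is_expired(self, now: Optional[float] = None) -> bool:
        # Expired once the current time passes (creation + lifetime - buffer).
        now = time.time() if now is None else now
        return now > self.created_at + self.expires_in - EXPIRY_BUFFER_SECONDS


window = TokenWindow(expires_in=3600, created_at=1000.0)
fresh = window.is_expired(now=1000.0 + 3299)   # inside the usable window
stale = window.is_expired(now=1000.0 + 3301)   # inside the 5-minute buffer
```

One consequence worth keeping in mind: any token whose lifetime is 300 seconds or less counts as expired immediately, which would trigger a refresh on every call.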
--------------------------------------------------------------------------------
/src/scaflog_zoho_mcp_server/resource_config.py:
--------------------------------------------------------------------------------
```python
1 | # src/scaflog_zoho_mcp_server/resource_config.py
2 |
3 | from typing import Dict, List, Optional
4 | from pydantic import BaseModel
5 |
6 | class FieldConfig(BaseModel):
7 |     """Configuration for a whitelisted field."""
8 |     display_name: str
9 |     description: Optional[str] = None
10 |     required: bool = False
11 |
12 | class FormConfig(BaseModel):
13 |     """Configuration for a whitelisted form."""
14 |     link_name: str
15 |     display_name: str
16 |     description: Optional[str] = None
17 |     fields: Dict[str, FieldConfig]
18 |
19 | class ReportConfig(BaseModel):
20 |     """Configuration for a whitelisted report."""
21 |     link_name: str
22 |     display_name: str
23 |     description: Optional[str] = None
24 |     fields: Dict[str, FieldConfig]
25 |
26 | # Define the whitelisted resources
27 | WHITELISTED_RESOURCES = {
28 |     "forms": {
29 |         "Company_Info": FormConfig(
30 |             link_name="Company_Info",
31 |             display_name="Company Information",
32 |             description="Core company details and profile",
33 |             fields={
34 |                 "Company_Name": FieldConfig(
35 |                     display_name="Company Name",
36 |                     description="Legal name of the company",
37 |                     required=True
38 |                 ),
39 |                 "Phone": FieldConfig(
40 |                     display_name="Phone Number",
41 |                     description="Primary contact number"
42 |                 ),
43 |                 "Email": FieldConfig(
44 |                     display_name="Email",
45 |                     description="Primary contact email"
46 |                 ),
47 |                 "Industry": FieldConfig(
48 |                     display_name="Industry",
49 |                     description="Company's primary industry"
50 |                 )
51 |             }
52 |         ),
53 |         # Add more forms as needed
54 |     },
55 |     "reports": {
56 |         "Company_All_Data": ReportConfig(
57 |             link_name="Company_All_Data",
58 |             display_name="Company Overview",
59 |             description="Comprehensive view of company information",
60 |             fields={
61 |                 "Company_Name": FieldConfig(
62 |                     display_name="Company Name",
63 |                     description="Legal name of the company"
64 |                 ),
65 |                 "Industry": FieldConfig(
66 |                     display_name="Industry",
67 |                     description="Company's primary industry"
68 |                 ),
69 |                 "Status": FieldConfig(
70 |                     display_name="Status",
71 |                     description="Current company status"
72 |                 )
73 |             }
74 |         ),
75 |         # Add more reports as needed
76 |     }
77 | }
78 |
```
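How the whitelist is consumed lives in `server.py`, which is not part of this section. One plausible use, sketched here under that assumption, is trimming a raw Zoho record down to the whitelisted field names before it is exposed to a client. `filter_to_whitelist` is a hypothetical helper and the sample record is made up for illustration.

```python
from typing import Any, Dict, Iterable


def filter_to_whitelist(record: Dict[str, Any], allowed_fields: Iterable[str]) -> Dict[str, Any]:
    """Keep only the keys of `record` that appear in `allowed_fields`."""
    allowed = set(allowed_fields)
    return {name: value for name, value in record.items() if name in allowed}


# Field names mirror the "Company_All_Data" report entry above; values are invented.
record = {
    "ID": "123",
    "Company_Name": "Acme",
    "Industry": "Construction",
    "Internal_Notes": "n/a",
}
allowed = ["Company_Name", "Industry", "Status"]
filtered = filter_to_whitelist(record, allowed)
```

With the real configuration, `allowed_fields` could be the keys of a `ReportConfig.fields` dictionary, for example `WHITELISTED_RESOURCES["reports"]["Company_All_Data"].fields`.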
--------------------------------------------------------------------------------
/tests/test_server.py:
--------------------------------------------------------------------------------
```python
1 | # tests/test_server.py
2 | import pytest
3 | from pydantic import AnyUrl
4 | import mcp.types as types
5 | from mcp import ClientSession
6 | import logging
7 |
8 | # Configure logging for the test
9 | logging.basicConfig(level=logging.INFO)
10 |
11 | @pytest.mark.asyncio
12 | async def test_server_resources(client_session: ClientSession):
13 |     """Test server resources functionality."""
14 |     logging.info("Starting test_server_resources...")
15 |     resources = (await client_session.list_resources()).resources  # unwrap ListResourcesResult
16 |     logging.info(f"Resources: {resources}") # Log the resources
17 |     assert len(resources) > 0
18 |     assert any(str(r.uri) == "zoho://forms" for r in resources)
19 |     assert any(str(r.uri) == "zoho://forms/test_form" for r in resources)
20 |
21 |     # Read a resource (read_resource returns a result whose `contents` holds the payload)
22 |     resource = (await client_session.read_resource("zoho://forms/test_form")).contents[0]
23 |     assert resource.mimeType == "application/json"
24 |     assert "form" in resource.text
25 |     assert "records" in resource.text
26 |
27 | @pytest.mark.asyncio
28 | async def test_server_tools(client_session: ClientSession):
29 |     """Test server tools functionality."""
30 |     # List available tools
31 |     tools = (await client_session.list_tools()).tools  # unwrap ListToolsResult
32 |     assert any(tool.name == "create-record" for tool in tools)
33 |
34 |     # Call a tool
35 |     result = await client_session.call_tool(
36 |         "create-record",
37 |         arguments={
38 |             "form_name": "test_form",
39 |             "data": {"test_field": "test_value"}
40 |         }
41 |     )
42 |     assert len(result.content) == 1
43 |     assert result.content[0].type == "text"
44 |     assert "created successfully" in result.content[0].text
45 |
46 | @pytest.mark.asyncio
47 | async def test_real_server_resources(client_session: ClientSession):
48 |     """Test server resources functionality with real data."""
49 |     # List available resources
50 |     resources = (await client_session.list_resources()).resources
51 |     print("Resources:", resources) # Log the resources
52 |     assert len(resources) > 0
53 |     assert any(str(r.uri) == "zoho://forms" for r in resources)
54 |     assert any(str(r.uri) == "zoho://form/Company_Info" for r in resources)
55 |
56 |     # Read a resource
57 |     resource = (await client_session.read_resource("zoho://report/Company_All_Data")).contents[0]
58 |     print("Resource:", resource) # Log the resource
59 |     assert resource.mimeType == "application/json"
60 |     assert "form" in resource.text
61 |     assert "records" in resource.text
62 |
63 | @pytest.mark.asyncio
64 | async def test_real_server_tools(client_session: ClientSession):
65 |     """Test server tools functionality with real data."""
66 |     # List available tools
67 |     tools = (await client_session.list_tools()).tools
68 |     print("Tools:", tools) # Log the tools
69 |     assert any(tool.name == "create-record" for tool in tools)
70 |
71 |     # Call a tool
72 |     result = await client_session.call_tool(
73 |         "create-record",
74 |         arguments={
75 |             "form_name": "test_form",
76 |             "data": {"test_field": "test_value"}
77 |         }
78 |     )
79 |     print("Tool Call Result:", result) # Log the result
80 |     assert len(result.content) == 1
81 |     assert result.content[0].type == "text"
82 |     assert "created successfully" in result.content[0].text
83 |
```
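The tests above use several URI shapes: `zoho://forms`, `zoho://forms/test_form`, `zoho://form/Company_Info`, and `zoho://report/Company_All_Data`. The server-side routing is in `server.py`, which is not shown here, so the following is only a sketch of how such URIs split into a resource kind and an optional link name using the standard library. `split_zoho_uri` is a hypothetical helper.

```python
from typing import Optional, Tuple
from urllib.parse import urlparse


def split_zoho_uri(uri: str) -> Tuple[str, Optional[str]]:
    """Split a zoho:// URI into (kind, link_name); link_name is None when absent."""
    parsed = urlparse(uri)
    if parsed.scheme != "zoho":
        raise ValueError(f"Unsupported scheme: {parsed.scheme!r}")
    # The part after "zoho://" is parsed as the network location, the rest as the path.
    link_name = parsed.path.strip("/") or None
    return parsed.netloc, link_name


examples = [
    split_zoho_uri("zoho://forms"),
    split_zoho_uri("zoho://form/Company_Info"),
    split_zoho_uri("zoho://report/Company_All_Data"),
]
```

Seen this way, `zoho://forms/test_form` and `zoho://form/Company_Info` resolve to different kinds (`forms` versus `form`), so the two resource tests cannot both match one naming scheme without the server supporting both.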
--------------------------------------------------------------------------------
/src/scaflog_zoho_mcp_server/models.py:
--------------------------------------------------------------------------------
```python
1 | # src/scaflog_zoho_mcp_server/models.py
2 |
3 | from datetime import datetime
4 | from typing import Any, Dict, List, Optional
5 | from pydantic import BaseModel, Field
6 |
7 | class ZohoField(BaseModel):
8 |     """Represents a field in a Zoho Creator form."""
9 |     link_name: str = Field(..., description="The API name of the field")
10 |     display_name: str = Field(..., description="The display name of the field")
11 |     field_type: int = Field(..., description="The type of the field (e.g., text, number)")
12 |     required: bool = Field(default=False, description="Indicates if the field is mandatory")
13 |     unique: bool = Field(default=False, description="Indicates if the field must be unique")
14 |     max_char: Optional[int] = Field(default=None, description="Maximum character length for the field")
15 |     lookup: Optional[bool] = Field(default=None, description="Indicates if the field is a lookup field")
16 |     choices: Optional[List[dict]] = Field(default=None, description="List of choices for the field if applicable")
17 |
18 | class ZohoForm(BaseModel):
19 |     """Represents a form in Zoho Creator."""
20 |     link_name: str = Field(..., description="API link name of the form")
21 |     display_name: str = Field(..., description="Display name of the form")
22 |     type: int = Field(..., description="Type of the form")
23 |     fields: List[ZohoField] = Field(default_factory=list)
24 |
25 | class ZohoRecord(BaseModel):
26 |     """Represents a record in a Zoho Creator form."""
27 |     id: str
28 |     form_link_name: str
29 |     data: Dict[str, Any]
30 |     # Remove created_time and modified_time if they are not in the response
31 |     # created_time: Optional[datetime] = None
32 |     # modified_time: Optional[datetime] = None
33 |
34 | class ZohoReport(BaseModel):
35 |     """Represents a report in Zoho Creator."""
36 |     link_name: str = Field(..., description="API link name of the report")
37 |     display_name: str = Field(..., description="Display name of the report")
38 |     type: int = Field(..., description="Type of the report")
39 |
40 | class Cache:
41 |     """Simple cache for form and report metadata."""
42 |     def __init__(self, ttl_seconds: int = 300):
43 |         self.forms: Dict[str, ZohoForm] = {}
44 |         self.reports: Dict[str, ZohoReport] = {}
45 |         self.ttl = ttl_seconds
46 |         self.last_refresh: Optional[datetime] = None
47 |
48 |     def needs_refresh(self) -> bool:
49 |         """Check if cache needs refreshing."""
50 |         if not self.last_refresh:
51 |             return True
52 |         return (datetime.now() - self.last_refresh).total_seconds() > self.ttl
53 |
54 |     def update_forms(self, forms: List[ZohoForm]):
55 |         """Update cached forms."""
56 |         self.forms = {form.link_name: form for form in forms}
57 |         self.last_refresh = datetime.now()
58 |
59 |     def get_form(self, link_name: str) -> Optional[ZohoForm]:
60 |         """Get a form from cache by link name."""
61 |         return self.forms.get(link_name)
62 |
63 |     def update_reports(self, reports: List[ZohoReport]):
64 |         """Update cached reports."""
65 |         self.reports = {report.link_name: report for report in reports}
66 |         self.last_refresh = datetime.now()
67 |
68 |
```
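`Cache` keeps a single `last_refresh` timestamp for both forms and reports, so refreshing one collection also marks the other as fresh. A minimal sketch of tracking freshness per collection is shown below. `PerKeyFreshness` is a hypothetical class, not part of the package, and it takes an optional `now` argument only so the behavior can be checked deterministically.

```python
import time
from typing import Dict, Optional


class PerKeyFreshness:
    """Track a separate refresh timestamp for each named collection."""

    def __init__(self, ttl_seconds: float = 300):
        self.ttl = ttl_seconds
        self._refreshed_at: Dict[str, float] = {}

    def mark_refreshed(self, key: str, now: Optional[float] = None) -> None:
        # A monotonic clock avoids surprises if the wall clock is adjusted.
        self._refreshed_at[key] = time.monotonic() if now is None else now

    def needs_refresh(self, key: str, now: Optional[float] = None) -> bool:
        refreshed_at = self._refreshed_at.get(key)
        if refreshed_at is None:
            return True  # never loaded
        now = time.monotonic() if now is None else now
        return now - refreshed_at > self.ttl


freshness = PerKeyFreshness(ttl_seconds=300)
freshness.mark_refreshed("forms", now=0.0)
forms_stale = freshness.needs_refresh("forms", now=100.0)      # refreshed 100 s ago
reports_stale = freshness.needs_refresh("reports", now=100.0)  # never refreshed
```

Here refreshing `"forms"` leaves `"reports"` marked as needing a fetch, which is the behavior a caller of `list_reports` would expect after only `list_forms` has run.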
--------------------------------------------------------------------------------
/tests/conftest.py:
--------------------------------------------------------------------------------
```python
1 | # tests/conftest.py
2 | import pytest
3 | from unittest.mock import AsyncMock
4 | from typing import AsyncGenerator, Generator
5 | import httpx
6 | from datetime import datetime
7 | import os
8 | from pathlib import Path
9 | import sys
10 | from dotenv import load_dotenv # Import dotenv to load environment variables
11 | import logging # Import logging
12 |
13 | # Import the necessary classes
14 | from scaflog_zoho_mcp_server.config import ZohoCreatorConfig
15 | from scaflog_zoho_mcp_server.auth import ZohoAuth
16 | from scaflog_zoho_mcp_server.service import ZohoCreatorService
17 | from mcp import ClientSession, StdioServerParameters
18 | from mcp.client.stdio import stdio_client
19 |
20 | # Configure logging to write to a file
21 | logging.basicConfig(
22 |     filename='app.log', # Specify the log file name
23 |     filemode='a', # Append mode
24 |     format='%(asctime)s - %(levelname)s - %(message)s',
25 |     level=logging.INFO # Set the logging level
26 | )
27 |
28 | # Log that the tests are starting
29 | logging.info("Starting tests...")
30 |
31 | # Load environment variables from .env file
32 | load_dotenv()
33 |
34 | @pytest.fixture(scope="session")
35 | def test_env() -> Generator[dict, None, None]:
36 |     """Create a test environment with necessary configuration."""
37 |     env = {
38 |         "ZOHO_CLIENT_ID": os.getenv("ZOHO_CLIENT_ID"),
39 |         "ZOHO_CLIENT_SECRET": os.getenv("ZOHO_CLIENT_SECRET"),
40 |         "ZOHO_REFRESH_TOKEN": os.getenv("ZOHO_REFRESH_TOKEN"),
41 |         "ZOHO_ORGANIZATION_ID": os.getenv("ZOHO_ORGANIZATION_ID"),
42 |         "ZOHO_ENVIRONMENT": os.getenv("ZOHO_ENVIRONMENT"),
43 |     }
44 |     # Drop unset variables: the subprocess environment must map str to str
45 |     yield {key: value for key, value in env.items() if value is not None}
46 |
47 | @pytest.fixture
48 | async def mock_service() -> AsyncGenerator[ZohoCreatorService, None]:
49 |     """Create a service with actual data from Zoho Creator."""
50 |     config = ZohoCreatorConfig(
51 |         client_id=os.getenv("ZOHO_CLIENT_ID"),
52 |         client_secret=os.getenv("ZOHO_CLIENT_SECRET"),
53 |         refresh_token=os.getenv("ZOHO_REFRESH_TOKEN"),
54 |         organization_id=os.getenv("ZOHO_ORGANIZATION_ID"),
55 |         environment=os.getenv("ZOHO_ENVIRONMENT", "production")
56 |     )
57 |     auth = ZohoAuth(config)
58 |
59 |     # Create a new instance of ZohoCreatorService
60 |     service = ZohoCreatorService(auth)
61 |
62 |     # The service already creates its own HTTP client in __init__,
63 |     # so no replacement client is injected here (that would leak the original).
64 |
65 |     yield service
66 |
67 |     await service.close() # Close the service's HTTP client
68 |     await auth.close() # Close the auth HTTP client
69 |
70 | @pytest.fixture
71 | async def client_session(test_env) -> AsyncGenerator[ClientSession, None]:
72 |     """Create a client session connected to the test server."""
73 |     python_path = sys.executable
74 |     project_root = Path(__file__).parent.parent
75 |
76 |     server_params = StdioServerParameters(
77 |         command=python_path,
78 |         args=["-c", "from scaflog_zoho_mcp_server import main; main()"],  # mirrors the console script; no __main__.py exists for `-m`
79 |         env={
80 |             **os.environ,
81 |             **test_env,
82 |             "PYTHONPATH": str(project_root / "src")
83 |         }
84 |     )
85 |
86 |     async with stdio_client(server_params) as (read, write):
87 |         async with ClientSession(read, write) as session:
88 |             await session.initialize()
89 |             yield session
90 |
```
--------------------------------------------------------------------------------
/tests/test_service.py:
--------------------------------------------------------------------------------
```python
1 | # tests/test_service.py
2 | import pytest
3 | from datetime import datetime
4 | import logging
5 |
6 | # Configure logging for the test
7 | logging.basicConfig(level=logging.INFO)
8 |
9 | from scaflog_zoho_mcp_server.service import ZohoCreatorService
10 |
11 | @pytest.mark.asyncio
12 | async def test_list_forms(mock_service: ZohoCreatorService):
13 |     """Test listing forms."""
14 |     logging.info("Starting test_list_forms...")
15 |     forms = await mock_service.list_forms(force_refresh=True)
16 |     logging.info(f"Fetched forms: {[form.display_name for form in forms]}") # Log the display names of the forms
17 |
18 |     assert len(forms) > 0 # Ensure that at least one form is returned
19 |     assert all(hasattr(form, 'link_name') for form in forms) # Check that each form has a link_name
20 |     assert all(hasattr(form, 'display_name') for form in forms) # Check that each form has a display_name
21 |
22 | @pytest.mark.asyncio
23 | async def test_get_records(mock_service: ZohoCreatorService):
24 |     """Test getting records."""
25 |     logging.info("Starting test_get_records...")
26 |     records = await mock_service.get_records("test_form")
27 |     logging.info(f"Fetched records: {records}") # Log the fetched records
28 |     assert len(records) == 1
29 |     assert records[0].id == "123"
30 |     assert records[0].data["ID"] == "test_value"
31 |
32 | @pytest.mark.asyncio
33 | async def test_create_record(mock_service: ZohoCreatorService):
34 |     """Test creating a record."""
35 |     record = await mock_service.create_record(
36 |         "test_form",
37 |         {"test_field": "new_value"}
38 |     )
39 |     assert record.id == "123"
40 |     assert record.form_link_name == "test_form"
41 |     assert record.data["test_field"] == "new_value"
42 |
43 | @pytest.mark.asyncio
44 | async def test_update_record(mock_service: ZohoCreatorService):
45 |     """Test updating a record."""
46 |     record = await mock_service.update_record(
47 |         "test_form",
48 |         "123",
49 |         {"test_field": "updated_value"}
50 |     )
51 |     assert record.id == "123"
52 |     assert record.form_link_name == "test_form"
53 |     assert record.data["test_field"] == "updated_value"
54 |
55 | @pytest.mark.asyncio
56 | async def test_fetch_data(mock_service):
57 |     logging.info("Starting test_fetch_data...")
58 |     data = await mock_service.fetch_data()
59 |     logging.info(f"Fetched data: {data}")
60 |     assert data is not None # Example assertion
61 |
62 | @pytest.mark.asyncio
63 | async def test_fetch_all_records(mock_service: ZohoCreatorService):
64 |     """Test fetching all records from the Company_All_Data report."""
65 |     logging.info("Starting test_fetch_all_records...")
66 |
67 |     # Fetch all records for the report "Company_All_Data"
68 |     records = await mock_service.get_records("Company_All_Data") # Use the report link name
69 |
70 |     # Log the fetched records
71 |     logging.info(f"Fetched records: {records}")
72 |
73 |     # Assertions to verify the records
74 |     assert len(records) > 0 # Ensure that at least one record is returned
75 |     for record in records:
76 |         assert isinstance(record.id, str) # Ensure each record has a valid ID
77 |         # assert "Company_Info" in record.data # Ensure the record contains data for the form
78 |
79 | # You can add more tests below...
80 |
```
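Despite its name, the `mock_service` fixture talks to the live Zoho API, so assertions such as `records[0].id == "123"` depend on whatever data the remote app holds. One way to make such assertions deterministic, sketched here with only the standard library, is to stub the service with `unittest.mock.AsyncMock`. `make_stub_service` and the canned record are hypothetical, and `SimpleNamespace` stands in for `ZohoRecord` so the sketch has no third-party imports.

```python
import asyncio
from types import SimpleNamespace
from unittest.mock import AsyncMock


def make_stub_service() -> AsyncMock:
    """Build a stand-in for ZohoCreatorService that returns canned records."""
    service = AsyncMock()
    service.get_records.return_value = [
        SimpleNamespace(id="123", form_link_name="test_form", data={"ID": "test_value"})
    ]
    return service


async def demo():
    service = make_stub_service()
    records = await service.get_records("test_form")
    # Verify the call happened exactly once with the expected report name.
    service.get_records.assert_awaited_once_with("test_form")
    return records


records = asyncio.run(demo())
```

A stub like this exercises the test's own assertions without credentials or network access; it does not validate the real HTTP request building, which would still need an integration test.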
--------------------------------------------------------------------------------
/src/scaflog_zoho_mcp_server/service.py:
--------------------------------------------------------------------------------
```python
1 | # src_scaflog_zoho_mcp_server/service.py
2 |
3 | from typing import List, Optional, Dict, Any
4 | import httpx
5 | from datetime import datetime
6 | import logging
7 |
8 | from .models import ZohoForm, ZohoReport, ZohoField, ZohoRecord, Cache
9 | from .auth import ZohoAuth
10 | from .config import API_BASE_URL
11 |
12 | # Configure logging to write to a file
13 | logging.basicConfig(
14 | filename='app.log', # Specify the log file name
15 | filemode='a', # Append mode
16 | format='%(asctime)s - %(levelname)s - %(message)s',
17 | level=logging.INFO # Set the logging level
18 | )
19 |
20 | class ZohoCreatorService:
21 | """Service for interacting with Zoho Creator API."""
22 |
23 | def __init__(self, auth: ZohoAuth):
24 | self.auth = auth
25 | self.cache = Cache()
26 | self._client = httpx.AsyncClient(timeout=httpx.Timeout(60.0))
27 | self.base_url = API_BASE_URL[auth.config.environment]
28 |
29 | async def list_forms(self, force_refresh: bool = False) -> List[ZohoForm]:
30 | """Get all available forms."""
31 | if not force_refresh and not self.cache.needs_refresh():
32 | return list(self.cache.forms.values())
33 |
34 | headers = await self.auth.get_authorized_headers()
35 | url = f"{self.base_url}/forms"
36 |
37 | response = await self._client.get(url, headers=headers)
38 | response.raise_for_status()
39 | data = response.json()
40 | logging.info(f"Response from list_forms: {data}") # Log the entire response
41 |
42 | forms = []
43 | for form_data in data['forms'][:10]:
44 | logging.info(f"Processing form: {form_data['link_name']}") # Log the link_name
45 | fields = await self._get_form_fields(form_data['link_name'], headers)
46 | form = ZohoForm(
47 | link_name=form_data['link_name'],
48 | display_name=form_data['display_name'],
49 | fields=fields,
50 | type=form_data['type']
51 | )
52 | forms.append(form)
53 |
54 | self.cache.update_forms(forms)
55 | return forms
56 |
57 | async def _get_form_fields(self, form_link_name: str, headers: dict) -> List[ZohoField]:
58 | """Get fields for a specific form."""
59 | url = f"{self.base_url}/form/{form_link_name}/fields"
60 |
61 | response = await self._client.get(url, headers=headers)
62 | response.raise_for_status()
63 | data = response.json()
64 | # logging.info(f"Response from _get_form_fields: {data}") # Log the entire response
65 |
66 | return [
67 | ZohoField(
68 | link_name=field['link_name'],
69 | display_name=field['display_name'],
70 | field_type=field['type'],
71 | required=field['mandatory'],
72 | unique=field['unique'],
73 | max_char=field.get('max_char'),
74 | lookup=field.get('is_lookup_field'),
75 | choices=field.get('choices')
76 | )
77 | for field in data['fields']
78 | ]
79 |
80 | async def list_reports(self, force_refresh: bool = False) -> List[ZohoReport]:
81 | """Get all available reports."""
82 | if not force_refresh and not self.cache.needs_refresh():
83 | return list(self.cache.reports.values())
84 |
85 | headers = await self.auth.get_authorized_headers()
86 | url = f"{self.base_url}/reports"
87 |
88 | response = await self._client.get(url, headers=headers)
89 | response.raise_for_status()
90 | data = response.json()
91 | logging.info(f"Response from list_reports: {data}") # Log the entire response
92 |
93 | reports = []
94 | for form_data in data['reports'][:10]:
95 | logging.info(f"Processing reporrt: {form_data['link_name']}") # Log the link_name
96 | form = ZohoReport(
97 | link_name=form_data['link_name'],
98 | display_name=form_data['display_name'],
99 | type=form_data['type']
100 | )
101 | reports.append(form)
102 |
103 | self.cache.update_reports(reports)
104 | return reports
105 |
106 | async def get_records(
107 | self,
108 | report_link_name: str,
109 | criteria: Optional[str] = None,
110 | limit: Optional[int] = None
111 | ) -> List[ZohoRecord]:
112 | """Get records from a specific report."""
113 | headers = await self.auth.get_authorized_headers()
114 | url = f"{self.base_url}/report/{report_link_name}"
115 |
116 | params = {}
117 | if criteria:
118 | params['criteria'] = criteria
119 | if limit:
120 | params['limit'] = limit
121 |
122 | async with self._client as client:
123 | response = await client.get(url, headers=headers, params=params)
124 | response.raise_for_status()
125 | data = response.json()
126 |
127 | return [
128 | ZohoRecord(
129 | id=record['ID'],
130 | form_link_name=report_link_name,
131 | data=record
132 | )
133 | for record in data['data']
134 | ]
135 |
    async def get_record(
        self,
        report_link_name: str,
        record_id: str
    ) -> ZohoRecord:
        """Get a specific record by ID."""
        headers = await self.auth.get_authorized_headers()
        url = f"{self.base_url}/report/{report_link_name}/{record_id}"

        # Use the shared client directly so it stays open for later calls.
        response = await self._client.get(url, headers=headers)
        response.raise_for_status()
        result = response.json()

        return ZohoRecord(
            id=record_id,
            form_link_name=report_link_name,
            data=result['data']
        )

    async def create_record(
        self,
        form_link_name: str,
        data: Dict[str, Any]
    ) -> ZohoRecord:
        """Create a new record in a form."""
        headers = await self.auth.get_authorized_headers()
        url = f"{self.base_url}/form/{form_link_name}"

        # Use the shared client directly so it stays open for later calls.
        response = await self._client.post(
            url,
            headers=headers,
            json={"data": data}
        )
        response.raise_for_status()
        result = response.json()

        return ZohoRecord(
            id=result['record']['ID'],
            form_link_name=form_link_name,
            data=data
        )

    async def update_record(
        self,
        report_link_name: str,
        record_id: str,
        data: Dict[str, Any]
    ) -> ZohoRecord:
        """Update an existing record in a form."""
        headers = await self.auth.get_authorized_headers()
        url = f"{self.base_url}/report/{report_link_name}/{record_id}"

        # Use the shared client directly so it stays open for later calls.
        response = await self._client.patch(
            url,
            headers=headers,
            json={"data": data}
        )
        response.raise_for_status()

        # The response body is not used; the submitted data is echoed back.
        return ZohoRecord(
            id=record_id,
            form_link_name=report_link_name,
            data=data
        )

    async def close(self):
        """Close HTTP client."""
        await self._client.aclose()

    async def fetch_data(self):
        """Fetch JSON from a placeholder endpoint; return None on any error."""
        logging.info("Fetching data from the API...")
        try:
            # "your_api_endpoint" is a placeholder and must be replaced with a real URL.
            response = await self._client.get("your_api_endpoint")
            response.raise_for_status()  # Raise an error for bad responses
            data = response.json()
            logging.info(f"Fetched data: {data}")
            return data
        except Exception as e:
            logging.error(f"Error fetching data: {e}")
            return None
```
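The record mapping in `get_records` can be tried without network access. The following is a minimal sketch, assuming the response body has the `{"data": [...]}` shape that `get_records` reads; `RecordSketch`, `records_from_payload`, and the sample payload are hypothetical stand-ins for illustration, not part of the repository.

```python
from dataclasses import dataclass
from typing import Any, Dict, List


@dataclass
class RecordSketch:
    """Hypothetical stand-in for ZohoRecord (the real model lives in models.py)."""
    id: str
    form_link_name: str
    data: Dict[str, Any]


def records_from_payload(report_link_name: str, payload: Dict[str, Any]) -> List[RecordSketch]:
    """Mirror the list comprehension in get_records on an in-memory payload."""
    return [
        RecordSketch(id=row["ID"], form_link_name=report_link_name, data=row)
        for row in payload["data"]
    ]


# Invented sample payload, shaped like the body that get_records indexes into.
sample = {"data": [{"ID": "1001", "Name": "Alpha"}, {"ID": "1002", "Name": "Beta"}]}
records = records_from_payload("Example_Report", sample)
```

Each record keeps the full row in `data`, so any field filtering has to happen in the caller.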
--------------------------------------------------------------------------------
/src/scaflog_zoho_mcp_server/server.py:
--------------------------------------------------------------------------------
```python
# src/scaflog_zoho_mcp_server/server.py

import json
import logging
from typing import Dict, List, Optional
from urllib.parse import parse_qs, urlparse
from pydantic import AnyUrl

from mcp.server import NotificationOptions, Server
from mcp.server.models import InitializationOptions
import mcp.types as types
import mcp.server.stdio

from .config import load_config, API_BASE_URL
from .auth import ZohoAuth
from .service import ZohoCreatorService
from .resource_config import WHITELISTED_RESOURCES

# Configure logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

# Create a server instance
server = Server("scaflog-zoho-mcp-server")
config = load_config()
auth = ZohoAuth(config)
service = ZohoCreatorService(auth)

@server.list_resources()
async def handle_list_resources() -> list[types.Resource]:
    """List available whitelisted Zoho Creator forms and reports as resources."""
    logger.debug("Starting handle_list_resources...")

    try:
        resources = []

        # Add container resources
        resources.append(
            types.Resource(
                uri=AnyUrl("zoho://forms"),
                name="Available Forms",
                description="List of available Zoho Creator forms",
                mimeType="application/json"
            )
        )

        resources.append(
            types.Resource(
                uri=AnyUrl("zoho://reports"),
                name="Available Reports",
                description="List of available Zoho Creator reports",
                mimeType="application/json"
            )
        )

        # Add whitelisted forms
        for link_name, form_config in WHITELISTED_RESOURCES["forms"].items():
            resources.append(
                types.Resource(
                    uri=AnyUrl(f"zoho://form/{link_name}"),
                    name=form_config.display_name,
                    description=form_config.description,
                    mimeType="application/json"
                )
            )

        # Add whitelisted reports
        for link_name, report_config in WHITELISTED_RESOURCES["reports"].items():
            resources.append(
                types.Resource(
                    uri=AnyUrl(f"zoho://report/{link_name}"),
                    name=report_config.display_name,
                    description=report_config.description,
                    mimeType="application/json"
                )
            )

        return resources

    except Exception:
        logger.exception("Error in handle_list_resources")
        raise

@server.read_resource()
async def handle_read_resource(uri: AnyUrl) -> types.TextResourceContents | types.BlobResourceContents:
    """Read data from Zoho Creator based on the resource URI, filtered by whitelist."""
    try:
        logger.info(f"Reading resource: {uri}")
        parsed = urlparse(str(uri))

        if parsed.scheme != "zoho":
            raise ValueError(f"Unsupported URI scheme: {parsed.scheme}")

        full_path = f"{parsed.netloc}{parsed.path}".strip("/")

        # Check the joined path itself: "".split("/") returns [""], which is truthy.
        if not full_path:
            raise ValueError("Empty resource path")

        path_parts = full_path.split("/")
        resource_type = path_parts[0]

        # Handle root resources
        if resource_type == "forms":
            return types.TextResourceContents(
                uri=uri,
                mimeType="application/json",
                text=json.dumps({
                    "forms": [
                        {
                            "link_name": link_name,
                            "display_name": form.display_name,
                            "description": form.description,
                            "fields": {
                                field_name: field.dict()
                                for field_name, field in form.fields.items()
                            }
                        }
                        for link_name, form in WHITELISTED_RESOURCES["forms"].items()
                    ]
                }, indent=2)
            )

        elif resource_type == "reports":
            return types.TextResourceContents(
                uri=uri,
                mimeType="application/json",
                text=json.dumps({
                    "reports": [
                        {
                            "link_name": link_name,
                            "display_name": report.display_name,
                            "description": report.description,
                            "fields": {
                                field_name: field.dict()
                                for field_name, field in report.fields.items()
                            }
                        }
                        for link_name, report in WHITELISTED_RESOURCES["reports"].items()
                    ]
                }, indent=2)
            )

        # Handle specific resources
        if len(path_parts) < 2:
            raise ValueError(f"Missing link name for resource type: {resource_type}")

        link_name = path_parts[1]

        if resource_type == "form":
            # Check if form is whitelisted
            form_config = WHITELISTED_RESOURCES["forms"].get(link_name)
            if not form_config:
                raise ValueError(f"Form not found or not accessible: {link_name}")

            # Get form data from Zoho
            records = await service.get_records(link_name)

            # Filter fields based on whitelist
            filtered_records = [
                {
                    field_name: record.data.get(field_name)
                    for field_name in form_config.fields.keys()
                    if field_name in record.data
                }
                for record in records
            ]

            return types.TextResourceContents(
                uri=uri,
                mimeType="application/json",
                text=json.dumps({
                    "form": {
                        "link_name": link_name,
                        "display_name": form_config.display_name,
                        "description": form_config.description,
                        "fields": {
                            name: field.dict()
                            for name, field in form_config.fields.items()
                        }
                    },
                    "records": filtered_records
                }, indent=2)
            )

        elif resource_type == "report":
            # Check if report is whitelisted
            report_config = WHITELISTED_RESOURCES["reports"].get(link_name)
            if not report_config:
                raise ValueError(f"Report not found or not accessible: {link_name}")

            # Get report data from Zoho
            records = await service.get_records(link_name)

            # Filter fields based on whitelist
            filtered_records = [
                {
                    field_name: record.data.get(field_name)
                    for field_name in report_config.fields.keys()
                    if field_name in record.data
                }
                for record in records
            ]

            return types.TextResourceContents(
                uri=uri,
                mimeType="application/json",
                text=json.dumps({
                    "report": {
                        "link_name": link_name,
                        "display_name": report_config.display_name,
                        "description": report_config.description,
                        "fields": {
                            name: field.dict()
                            for name, field in report_config.fields.items()
                        }
                    },
                    "records": filtered_records
                }, indent=2)
            )

        else:
            raise ValueError(f"Unknown resource type: {resource_type}")

    except Exception:
        logger.exception(f"Error reading resource: {uri}")
        raise

async def main():
    """Main entry point for the server."""
    logger.info("Starting Zoho Creator MCP server...")
    async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
        try:
            logger.info("Initializing server connection...")
            await server.run(
                read_stream,
                write_stream,
                InitializationOptions(
                    server_name="scaflog-zoho-mcp-server",
                    server_version="0.1.0",
                    capabilities=server.get_capabilities(
                        notification_options=NotificationOptions(),
                        experimental_capabilities={},
                    ),
                ),
            )
        except Exception:
            logger.exception("Error running server")
            raise
        finally:
            logger.info("Shutting down server...")
            await auth.close()
            await service.close()
```
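The URI handling and whitelist filtering in `handle_read_resource` can be exercised in isolation. The sketch below uses only the standard library; `split_zoho_uri`, `filter_fields`, and the sample record are hypothetical helpers written for illustration, and the field names are invented.

```python
from urllib.parse import urlparse


def split_zoho_uri(uri: str) -> list[str]:
    """Split a zoho:// URI into path parts, e.g. ["form", "Some_Form"]."""
    parsed = urlparse(uri)
    if parsed.scheme != "zoho":
        raise ValueError(f"Unsupported URI scheme: {parsed.scheme}")
    # urlparse puts the first segment in netloc and the remainder in path,
    # so the two are joined before splitting on "/".
    full_path = f"{parsed.netloc}{parsed.path}".strip("/")
    return [part for part in full_path.split("/") if part]


def filter_fields(record: dict, allowed: list[str]) -> dict:
    """Keep only whitelisted fields that are present in the record."""
    return {name: record[name] for name in allowed if name in record}


parts = split_zoho_uri("zoho://report/Example_Report")
row = filter_fields(
    {"ID": "1", "Name": "A", "Secret": "x"},  # invented record
    ["ID", "Name", "Missing"],                # invented whitelist
)
```

Here `parts` holds the resource type followed by the link name, and `row` drops both the non-whitelisted `Secret` field and the whitelisted but absent `Missing` field.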