# Directory Structure

```
├── .gitignore
├── .python-version
├── .vscode
│   └── settings.json
├── ~
│   └── .zshrc
├── app.log
├── pyproject.toml
├── README.md
├── src
│   ├── .DS_Store
│   └── scaflog_zoho_mcp_server
│       ├── __init__.py
│       ├── .DS_Store
│       ├── auth.py
│       ├── config.py
│       ├── models.py
│       ├── resource_config.py
│       ├── server.py
│       └── service.py
├── tests
│   ├── __init__.py
│   ├── conftest.py
│   ├── test_server.py
│   └── test_service.py
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/.python-version:
--------------------------------------------------------------------------------

```
3.12

```

--------------------------------------------------------------------------------
/~/.zshrc:
--------------------------------------------------------------------------------

```
# Add these lines to the end of your .zshrc
alias python="python3"
alias pip="pip3" 
```

--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------

```
# Python-generated files
__pycache__/
*.py[oc]
build/
dist/
wheels/
*.egg-info

# Virtual environments
.venv

# Environment variables
.env

# Flatten repo
flatten/

# other
app.log
.DS_Store

```

--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------

```markdown
# scaflog-zoho-mcp-server MCP server

Zoho Creator Scaflog App MCP Server

## Components

### Resources

The server implements a simple note storage system with:
- Custom note:// URI scheme for accessing individual notes
- Each note resource has a name, description and text/plain mimetype

### Prompts

The server provides a single prompt:
- summarize-notes: Creates summaries of all stored notes
  - Optional "style" argument to control detail level (brief/detailed)
  - Generates prompt combining all current notes with style preference

### Tools

The server implements one tool:
- add-note: Adds a new note to the server
  - Takes "name" and "content" as required string arguments
  - Updates server state and notifies clients of resource changes

## Configuration

[TODO: Add configuration details specific to your implementation]

## Quickstart

### Install

#### Claude Desktop

On macOS: `~/Library/Application\ Support/Claude/claude_desktop_config.json`
On Windows: `%APPDATA%/Claude/claude_desktop_config.json`

<details>
  <summary>Development/Unpublished Servers Configuration</summary>
  ```
  "mcpServers": {
    "scaflog-zoho-mcp-server": {
      "command": "uv",
      "args": [
        "--directory",
        "/Users/alexsherin/Documents/Projects/MCP Servers",
        "run",
        "scaflog-zoho-mcp-server"
      ]
    }
  }
  ```
</details>

<details>
  <summary>Published Servers Configuration</summary>
  ```
  "mcpServers": {
    "scaflog-zoho-mcp-server": {
      "command": "uvx",
      "args": [
        "scaflog-zoho-mcp-server"
      ]
    }
  }
  ```
</details>

## Development

### Building and Publishing

To prepare the package for distribution:

1. Sync dependencies and update lockfile:
```bash
uv sync
```

2. Build package distributions:
```bash
uv build
```

This will create source and wheel distributions in the `dist/` directory.

3. Publish to PyPI:
```bash
uv publish
```

Note: You'll need to set PyPI credentials via environment variables or command flags:
- Token: `--token` or `UV_PUBLISH_TOKEN`
- Or username/password: `--username`/`UV_PUBLISH_USERNAME` and `--password`/`UV_PUBLISH_PASSWORD`

### Debugging

Since MCP servers run over stdio, debugging can be challenging. For the best debugging
experience, we strongly recommend using the [MCP Inspector](https://github.com/modelcontextprotocol/inspector).


You can launch the MCP Inspector via [`npm`](https://docs.npmjs.com/downloading-and-installing-node-js-and-npm) with this command:

```bash
npx @modelcontextprotocol/inspector uv --directory "/Users/alexsherin/Documents/Projects/MCP Servers" run scaflog-zoho-mcp-server
```


Upon launching, the Inspector will display a URL that you can access in your browser to begin debugging.
```
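The project directory used in the README contains a space, so it has to reach the shell as a single argument. A minimal sketch of building the Inspector command with the standard library, assuming a hypothetical project path (`/path/to/MCP Servers` is a placeholder, not the real location):

```python
import shlex

# Hypothetical project directory containing a space (placeholder for illustration).
project_dir = "/path/to/MCP Servers"

argv = [
    "npx", "@modelcontextprotocol/inspector",
    "uv", "--directory", project_dir,
    "run", "scaflog-zoho-mcp-server",
]

# shlex.join quotes only the arguments that need quoting.
command = shlex.join(argv)
print(command)
```

Splitting the printed command with `shlex.split` gives back the original argument list, which is a quick way to check that the quoting is right before pasting it into a terminal.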

--------------------------------------------------------------------------------
/tests/__init__.py:
--------------------------------------------------------------------------------

```python
# tests/__init__.py

# import pytest

# @pytest.fixture(scope="module")
# def mock_server():
#     return ZohoCreatorServer()

```

--------------------------------------------------------------------------------
/src/scaflog_zoho_mcp_server/__init__.py:
--------------------------------------------------------------------------------

```python
from . import server
import asyncio

def main():
    """Main entry point for the package."""
    asyncio.run(server.main())

# Optionally expose other important items at package level
__all__ = ['main', 'server']
```

--------------------------------------------------------------------------------
/.vscode/settings.json:
--------------------------------------------------------------------------------

```json
{
    "window.commandCenter": 1,
    "workbench.colorTheme": "Material Theme",
    "workbench.iconTheme": "material-icon-theme",
    "editor.fontSize": 14,
    "files.autoSave": "afterDelay",
    "files.autoSaveWorkspaceFilesOnly": true,
    "git.postCommitCommand": "push",
    "git.enableSmartCommit": false,
    "git.defaultCommitMessageTemplate": "feat: ",
    "git.followTagsWhenSync": true,
    "git.inputValidationSubjectLength": 72,
    "git.inputValidationLength": 500,
    "scm.defaultViewMode": "tree",
    "git.useCommitInputAsStashMessage": true
}
```

--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------

```toml
[project]
name = "scaflog-zoho-mcp-server"
version = "0.1.0"
description = "MCP Server for Zoho Creator Integration"
readme = "README.md"
authors = [
    { name = "alexsherin", email = "[email protected]" }
]
requires-python = ">=3.12"
dependencies = [
    "mcp>=1.1.0",
    "zcrmsdk==3.1.0",
    "pydantic>=2.0.0",
    "python-dotenv>=1.0.0",
    "typing-extensions>=4.7.0",
    "httpx>=0.24.0",
]

[project.scripts]
scaflog-zoho-mcp-server = "scaflog_zoho_mcp_server:main"

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[tool.hatch.build.targets.wheel]
packages = ["src/scaflog_zoho_mcp_server"]

[tool.pytest.ini_options]
asyncio_mode = "auto"
asyncio_default_fixture_loop_scope = "function"
testpaths = ["tests"]

[project.optional-dependencies]
test = [
    "pytest>=7.0.0",
    "pytest-asyncio>=0.21.0",
    "pytest-cov>=4.1.0",
]
```

--------------------------------------------------------------------------------
/src/scaflog_zoho_mcp_server/config.py:
--------------------------------------------------------------------------------

```python
from pathlib import Path
import os
from typing import Optional
from pydantic import BaseModel, Field
from dotenv import load_dotenv

class ZohoCreatorConfig(BaseModel):
    """Configuration for Zoho Creator API access."""
    client_id: str = Field(..., description="OAuth client ID")
    client_secret: str = Field(..., description="OAuth client secret")
    refresh_token: str = Field(..., description="OAuth refresh token")
    organization_id: str = Field(..., description="Zoho organization ID")
    environment: str = Field(default="production", description="Zoho environment (production/sandbox)")
    access_token: Optional[str] = Field(default=None, description="Current access token")

def load_config() -> ZohoCreatorConfig:
    """Load configuration from environment variables or .env file."""
    # Try to load from .env file if it exists
    env_path = Path(".env")
    if env_path.exists():
        load_dotenv(env_path)

    return ZohoCreatorConfig(
        client_id=os.getenv("ZOHO_CLIENT_ID", ""),
        client_secret=os.getenv("ZOHO_CLIENT_SECRET", ""),
        refresh_token=os.getenv("ZOHO_REFRESH_TOKEN", ""),
        organization_id=os.getenv("ZOHO_ORGANIZATION_ID", ""),
        environment=os.getenv("ZOHO_ENVIRONMENT", "production"),
    )

# API endpoints for different environments
API_BASE_URL = {
    "production": "https://creator.zoho.com/api/v2/100rails/goscaffold",
    "sandbox": "https://creator.zoho.com/api/v2/sandbox"
}
```
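`load_config` falls back to an empty string for every credential, so a missing variable passes model validation and only surfaces later as a failed token request. One possible way to fail earlier is sketched below; the helper names `missing_zoho_vars` and `require_zoho_env` are hypothetical and not part of the package.

```python
import os
from typing import List, Mapping

# Variable names as read by load_config.
REQUIRED_VARS = (
    "ZOHO_CLIENT_ID",
    "ZOHO_CLIENT_SECRET",
    "ZOHO_REFRESH_TOKEN",
    "ZOHO_ORGANIZATION_ID",
)

def missing_zoho_vars(env: Mapping[str, str] = os.environ) -> List[str]:
    """Return the required variable names that are unset or empty."""
    return [name for name in REQUIRED_VARS if not env.get(name)]

def require_zoho_env(env: Mapping[str, str] = os.environ) -> None:
    """Raise one error that names every missing variable."""
    missing = missing_zoho_vars(env)
    if missing:
        raise RuntimeError("Missing Zoho settings: " + ", ".join(missing))
```

Calling `require_zoho_env()` before constructing the config would report all missing settings at once instead of one at a time.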

--------------------------------------------------------------------------------
/src/scaflog_zoho_mcp_server/auth.py:
--------------------------------------------------------------------------------

```python
# src/scaflog_zoho_mcp_server/auth.py

import time
from typing import Optional
import httpx
from pydantic import BaseModel, Field

from .config import ZohoCreatorConfig, API_BASE_URL

class TokenInfo(BaseModel):
    """Model for storing token information."""
    access_token: str
    expires_in: int
    created_at: float = Field(default_factory=time.time)

    @property
    def is_expired(self) -> bool:
        """Check if the token is expired with a 5-minute buffer."""
        return time.time() > (self.created_at + self.expires_in - 300)

class ZohoAuth:
    """Handles authentication with Zoho Creator API."""
    def __init__(self, config: ZohoCreatorConfig):
        self.config = config
        self._token_info: Optional[TokenInfo] = None
        self._client = httpx.AsyncClient(timeout=30.0)

    async def get_access_token(self) -> str:
        """Get a valid access token, refreshing if necessary."""
        if not self._token_info or self._token_info.is_expired:
            await self._refresh_token()
        return self._token_info.access_token

    async def _refresh_token(self) -> None:
        """Refresh the access token using the refresh token."""
        token_url = "https://accounts.zoho.com/oauth/v2/token"
        params = {
            "client_id": self.config.client_id,
            "client_secret": self.config.client_secret,
            "refresh_token": self.config.refresh_token,
            "grant_type": "refresh_token",
            "redirect_uri": "https://www.zohoapis.com"
        }

        # Reuse the long-lived client. Wrapping it in `async with` would close
        # it after the first refresh and break every later refresh.
        response = await self._client.post(token_url, params=params)
        response.raise_for_status()
        data = response.json()

        self._token_info = TokenInfo(
            access_token=data["access_token"],
            expires_in=data["expires_in"]
        )

    async def get_authorized_headers(self) -> dict:
        """Get headers with authorization token."""
        token = await self.get_access_token()
        return {
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        }

    async def close(self):
        """Close the HTTP client."""
        await self._client.aclose()
```
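The expiry check in `TokenInfo.is_expired` treats a token as expired 300 seconds before its real expiry. The arithmetic can be sketched with the standard library alone; `TokenWindow` is a hypothetical stand-in for `TokenInfo`, and the explicit `now` argument is an addition that makes the check easy to test.

```python
import time
from dataclasses import dataclass, field
from typing import Optional

REFRESH_BUFFER_SECONDS = 300  # same 5-minute buffer as TokenInfo.is_expired

@dataclass
class TokenWindow:
    """Hypothetical stand-in for TokenInfo without the pydantic dependency."""
    expires_in: int
    created_at: float = field(default_factory=time.time)

    def is_expired(self, now: Optional[float] = None) -> bool:
        """True once `now` passes the expiry time minus the buffer."""
        current = time.time() if now is None else now
        return current > self.created_at + self.expires_in - REFRESH_BUFFER_SECONDS

token = TokenWindow(expires_in=3600, created_at=1_000.0)
print(token.is_expired(now=1_000.0 + 3299))  # False: still inside the usable window
print(token.is_expired(now=1_000.0 + 3301))  # True: inside the 5-minute buffer
```

One consequence of this design is that a token whose `expires_in` is 300 seconds or less counts as expired as soon as any time has passed since it was created, so each call would trigger a refresh.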

--------------------------------------------------------------------------------
/src/scaflog_zoho_mcp_server/resource_config.py:
--------------------------------------------------------------------------------

```python
# src/scaflog_zoho_mcp_server/resource_config.py

from typing import Dict, List, Optional
from pydantic import BaseModel

class FieldConfig(BaseModel):
    """Configuration for a whitelisted field."""
    display_name: str
    description: Optional[str] = None
    required: bool = False

class FormConfig(BaseModel):
    """Configuration for a whitelisted form."""
    link_name: str
    display_name: str
    description: Optional[str] = None
    fields: Dict[str, FieldConfig]

class ReportConfig(BaseModel):
    """Configuration for a whitelisted report."""
    link_name: str
    display_name: str
    description: Optional[str] = None
    fields: Dict[str, FieldConfig]

# Define the whitelisted resources
WHITELISTED_RESOURCES = {
    "forms": {
        "Company_Info": FormConfig(
            link_name="Company_Info",
            display_name="Company Information",
            description="Core company details and profile",
            fields={
                "Company_Name": FieldConfig(
                    display_name="Company Name",
                    description="Legal name of the company",
                    required=True
                ),
                "Phone": FieldConfig(
                    display_name="Phone Number",
                    description="Primary contact number"
                ),
                "Email": FieldConfig(
                    display_name="Email",
                    description="Primary contact email"
                ),
                "Industry": FieldConfig(
                    display_name="Industry",
                    description="Company's primary industry"
                )
            }
        ),
        # Add more forms as needed
    },
    "reports": {
        "Company_All_Data": ReportConfig(
            link_name="Company_All_Data",
            display_name="Company Overview",
            description="Comprehensive view of company information",
            fields={
                "Company_Name": FieldConfig(
                    display_name="Company Name",
                    description="Legal name of the company"
                ),
                "Industry": FieldConfig(
                    display_name="Industry",
                    description="Company's primary industry"
                ),
                "Status": FieldConfig(
                    display_name="Status",
                    description="Current company status"
                )
            }
        ),
        # Add more reports as needed
    }
}

```
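A whitelist like `WHITELISTED_RESOURCES` is typically applied by dropping every key that is not listed before a record leaves the server. The sketch below shows that idea with plain dictionaries; `filter_record` is a hypothetical helper, and the sample values are made up.

```python
from typing import Any, Dict, Iterable

def filter_record(record: Dict[str, Any], allowed_fields: Iterable[str]) -> Dict[str, Any]:
    """Keep only whitelisted keys, preserving the record's own key order."""
    allowed = set(allowed_fields)
    return {key: value for key, value in record.items() if key in allowed}

# Field names from the Company_All_Data report config; the values are invented.
allowed = ["Company_Name", "Industry", "Status"]
raw = {
    "ID": "1",
    "Company_Name": "Acme",
    "Industry": "Construction",
    "Internal_Notes": "n/a",
}
print(filter_record(raw, allowed))
```

With the real config objects, the allowed names could be taken from the keys of a `ReportConfig.fields` dictionary.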

--------------------------------------------------------------------------------
/tests/test_server.py:
--------------------------------------------------------------------------------

```python
# tests/test_server.py
import pytest
from pydantic import AnyUrl
import mcp.types as types
from mcp import ClientSession
import logging

# Configure logging for the test
logging.basicConfig(level=logging.INFO)

@pytest.mark.asyncio
async def test_server_resources(client_session: ClientSession):
    """Test server resources functionality."""
    logging.info("Starting test_server_resources...")
    resources = await client_session.list_resources()
    logging.info(f"Resources: {resources}")  # Log the resources
    assert len(resources) > 0
    assert any(str(r.uri) == "zoho://forms" for r in resources)
    assert any(str(r.uri) == "zoho://forms/test_form" for r in resources)

    # Read a resource
    resource = await client_session.read_resource("zoho://forms/test_form")
    assert resource.mimeType == "application/json"
    assert "form" in resource.text
    assert "records" in resource.text

@pytest.mark.asyncio
async def test_server_tools(client_session: ClientSession):
    """Test server tools functionality."""
    # List available tools
    tools = await client_session.list_tools()
    assert any(tool.name == "create-record" for tool in tools)

    # Call a tool
    result = await client_session.call_tool(
        "create-record",
        arguments={
            "form_name": "test_form",
            "data": {"test_field": "test_value"}
        }
    )
    assert len(result) == 1
    assert result[0].type == "text"
    assert "created successfully" in result[0].text

@pytest.mark.asyncio
async def test_real_server_resources(client_session: ClientSession):
    """Test server resources functionality with real data."""
    # List available resources
    resources = await client_session.list_resources()
    print("Resources:", resources)  # Log the resources
    assert len(resources) > 0
    assert any(str(r.uri) == "zoho://forms" for r in resources)
    assert any(str(r.uri) == "zoho://form/Company_Info" for r in resources)

    # Read a resource
    resource = await client_session.read_resource("zoho://report/Company_All_Data")
    print("Resource:", resource)  # Log the resource
    assert resource.mimeType == "application/json"
    assert "form" in resource.text
    assert "records" in resource.text

@pytest.mark.asyncio
async def test_real_server_tools(client_session: ClientSession):
    """Test server tools functionality with real data."""
    # List available tools
    tools = await client_session.list_tools()
    print("Tools:", tools)  # Log the tools
    assert any(tool.name == "create-record" for tool in tools)

    # Call a tool
    result = await client_session.call_tool(
        "create-record",
        arguments={
            "form_name": "test_form",
            "data": {"test_field": "test_value"}
        }
    )
    print("Tool Call Result:", result)  # Log the result
    assert len(result) == 1
    assert result[0].type == "text"
    assert "created successfully" in result[0].text

```

--------------------------------------------------------------------------------
/src/scaflog_zoho_mcp_server/models.py:
--------------------------------------------------------------------------------

```python
# src/scaflog_zoho_mcp_server/models.py

from datetime import datetime
from typing import Any, Dict, List, Optional
from pydantic import BaseModel, Field

class ZohoField(BaseModel):
    """Represents a field in a Zoho Creator form."""
    link_name: str = Field(..., description="The API name of the field")
    display_name: str = Field(..., description="The display name of the field")
    field_type: int = Field(..., description="The type of the field (e.g., text, number)")
    required: bool = Field(default=False, description="Indicates if the field is mandatory")
    unique: bool = Field(default=False, description="Indicates if the field must be unique")
    max_char: Optional[int] = Field(default=None, description="Maximum character length for the field")
    lookup: Optional[bool] = Field(default=None, description="Indicates if the field is a lookup field")
    choices: Optional[List[dict]] = Field(default=None, description="List of choices for the field if applicable")

class ZohoForm(BaseModel):
    """Represents a form in Zoho Creator."""
    link_name: str = Field(..., description="API link name of the form")
    display_name: str = Field(..., description="Display name of the form")
    type: int = Field(..., description="Type of the form")
    fields: List[ZohoField] = Field(default_factory=list)

class ZohoRecord(BaseModel):
    """Represents a record in a Zoho Creator form."""
    id: str
    form_link_name: str
    data: Dict[str, Any]
    # Remove created_time and modified_time if they are not in the response
    # created_time: Optional[datetime] = None
    # modified_time: Optional[datetime] = None

class ZohoReport(BaseModel):
    """Represents a report in Zoho Creator."""
    link_name: str = Field(..., description="API link name of the report")
    display_name: str = Field(..., description="Display name of the report")
    type: int = Field(..., description="Type of the report")

class Cache:
    """Simple cache for form metadata."""
    def __init__(self, ttl_seconds: int = 300):
        self.forms: Dict[str, ZohoForm] = {}
        self.reports: Dict[str, ZohoReport] = {}
        self.ttl = ttl_seconds
        self.last_refresh: Optional[datetime] = None

    def needs_refresh(self) -> bool:
        """Check if cache needs refreshing."""
        if not self.last_refresh:
            return True
        return (datetime.now() - self.last_refresh).total_seconds() > self.ttl

    def update_forms(self, forms: List[ZohoForm]):
        """Update cached forms."""
        self.forms = {form.link_name: form for form in forms}
        self.last_refresh = datetime.now()

    def get_form(self, link_name: str) -> Optional[ZohoForm]:
        """Get a form from cache by link name."""
        return self.forms.get(link_name)
    
    def update_reports(self, reports: List[ZohoReport]):
        """Update cached reports."""
        self.reports = {report.link_name: report for report in reports}
        self.last_refresh = datetime.now()


```
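`Cache` keeps a single `last_refresh` timestamp for both forms and reports, so refreshing one collection also marks the other as fresh even if it was never loaded. A minimal sketch of tracking freshness per collection follows; `TimestampedCache` and its method names are hypothetical, and the `now` parameter exists only to make the behaviour deterministic.

```python
import time
from typing import Dict, Optional

class TimestampedCache:
    """Hypothetical variant of Cache that tracks freshness per collection."""

    def __init__(self, ttl_seconds: float = 300.0):
        self.ttl = ttl_seconds
        self._refreshed_at: Dict[str, float] = {}

    def mark_refreshed(self, kind: str, now: Optional[float] = None) -> None:
        """Record when the given collection (e.g. "forms") was last loaded."""
        self._refreshed_at[kind] = time.monotonic() if now is None else now

    def needs_refresh(self, kind: str, now: Optional[float] = None) -> bool:
        """True if the collection was never loaded or its TTL has elapsed."""
        refreshed = self._refreshed_at.get(kind)
        if refreshed is None:
            return True
        current = time.monotonic() if now is None else now
        return current - refreshed > self.ttl

cache = TimestampedCache(ttl_seconds=300)
cache.mark_refreshed("forms", now=0.0)
print(cache.needs_refresh("forms", now=100.0))    # False: forms are fresh
print(cache.needs_refresh("reports", now=100.0))  # True: reports never loaded
print(cache.needs_refresh("forms", now=301.0))    # True: TTL elapsed
```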

--------------------------------------------------------------------------------
/tests/conftest.py:
--------------------------------------------------------------------------------

```python
# tests/conftest.py
import pytest
from unittest.mock import AsyncMock
from typing import AsyncGenerator, Generator
import httpx
from datetime import datetime
import os
from pathlib import Path
import sys
from dotenv import load_dotenv  # Import dotenv to load environment variables
import logging  # Import logging

# Import the necessary classes
from scaflog_zoho_mcp_server.config import ZohoCreatorConfig
from scaflog_zoho_mcp_server.auth import ZohoAuth
from scaflog_zoho_mcp_server.service import ZohoCreatorService
from mcp import ClientSession, StdioServerParameters  # Add this import
from mcp.client.stdio import stdio_client  # Add this import

# Configure logging to write to a file
logging.basicConfig(
    filename='app.log',  # Specify the log file name
    filemode='a',        # Append mode
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO   # Set the logging level
)

# Log that the tests are starting
logging.info("Starting tests...")

# Load environment variables from .env file
load_dotenv()

@pytest.fixture(scope="session")
def test_env() -> Generator[dict, None, None]:
    """Create a test environment with necessary configuration."""
    env = {
        "ZOHO_CLIENT_ID": os.getenv("ZOHO_CLIENT_ID"),
        "ZOHO_CLIENT_SECRET": os.getenv("ZOHO_CLIENT_SECRET"),
        "ZOHO_REFRESH_TOKEN": os.getenv("ZOHO_REFRESH_TOKEN"),
        "ZOHO_ORGANIZATION_ID": os.getenv("ZOHO_ORGANIZATION_ID"),
        "ZOHO_ENVIRONMENT": os.getenv("ZOHO_ENVIRONMENT"),
    }
    
    yield env

@pytest.fixture
async def mock_service() -> AsyncGenerator[ZohoCreatorService, None]:
    """Create a service with actual data from Zoho Creator."""
    config = ZohoCreatorConfig(
        client_id=os.getenv("ZOHO_CLIENT_ID"),
        client_secret=os.getenv("ZOHO_CLIENT_SECRET"),
        refresh_token=os.getenv("ZOHO_REFRESH_TOKEN"),
        organization_id=os.getenv("ZOHO_ORGANIZATION_ID"),
        environment=os.getenv("ZOHO_ENVIRONMENT")
    )
    auth = ZohoAuth(config)
    
    # Create a new instance of ZohoCreatorService
    service = ZohoCreatorService(auth)

    # Create a new HTTP client for each test
    service._client = httpx.AsyncClient()  # Create a new client instance

    yield service

    await service._client.aclose()  # Close the client after the test
    await service.close()  # Close the service

@pytest.fixture
async def client_session(test_env) -> AsyncGenerator[ClientSession, None]:
    """Create a client session connected to the test server."""
    python_path = sys.executable
    project_root = Path(__file__).parent.parent
    
    server_params = StdioServerParameters(
        command=python_path,
        args=["-m", "src"],
        env={
            **os.environ,
            **test_env,
            "PYTHONPATH": str(project_root)
        }
    )

    async with stdio_client(server_params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            yield session

```
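The `test_env` fixture reads each value with `os.getenv`, which returns `None` for an unset variable, while a subprocess environment needs string values. A small sketch of merging overrides while skipping unset ones is shown below; `build_child_env` is a hypothetical helper, not something the test suite defines.

```python
import os
from typing import Dict, Mapping, Optional

def build_child_env(
    overrides: Mapping[str, Optional[str]],
    base: Optional[Mapping[str, str]] = None,
) -> Dict[str, str]:
    """Merge overrides into a copy of the base environment, skipping None values."""
    env = dict(os.environ if base is None else base)
    env.update({key: value for key, value in overrides.items() if value is not None})
    return env

# Unset overrides leave the inherited value untouched.
print(build_child_env({"ZOHO_ENVIRONMENT": None, "PYTHONPATH": "/tmp/project"},
                      base={"ZOHO_ENVIRONMENT": "production"}))
```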

--------------------------------------------------------------------------------
/tests/test_service.py:
--------------------------------------------------------------------------------

```python
# tests/test_service.py
import pytest
from datetime import datetime
import logging

# Configure logging for the test
logging.basicConfig(level=logging.INFO)

from scaflog_zoho_mcp_server.service import ZohoCreatorService

@pytest.mark.asyncio
async def test_list_forms(mock_service: ZohoCreatorService):
    """Test listing forms."""
    logging.info("Starting test_list_forms...")
    forms = await mock_service.list_forms(force_refresh=True)
    logging.info(f"Fetched forms: {[form.display_name for form in forms]}")  # Log the display names of the forms
    
    assert len(forms) > 0  # Ensure that at least one form is returned
    assert all(hasattr(form, 'link_name') for form in forms)  # Check that each form has a link_name
    assert all(hasattr(form, 'display_name') for form in forms)  # Check that each form has a display_name

@pytest.mark.asyncio
async def test_get_records(mock_service: ZohoCreatorService):
    """Test getting records."""
    logging.info("Starting test_get_records...")
    records = await mock_service.get_records("test_form")
    logging.info(f"Fetched records: {records}")  # Log the fetched records
    assert len(records) == 1
    assert records[0].id == "123"
    assert records[0].data["ID"] == "test_value"

@pytest.mark.asyncio
async def test_create_record(mock_service: ZohoCreatorService):
    """Test creating a record."""
    record = await mock_service.create_record(
        "test_form",
        {"test_field": "new_value"}
    )
    assert record.id == "123"
    assert record.form_link_name == "test_form"
    assert record.data["test_field"] == "new_value"

@pytest.mark.asyncio
async def test_update_record(mock_service: ZohoCreatorService):
    """Test updating a record."""
    record = await mock_service.update_record(
        "test_form",
        "123",
        {"test_field": "updated_value"}
    )
    assert record.id == "123"
    assert record.form_link_name == "test_form"
    assert record.data["test_field"] == "updated_value"

@pytest.mark.asyncio
async def test_fetch_data(mock_service):
    logging.info("Starting test_fetch_data...")
    data = await mock_service.fetch_data()
    logging.info(f"Fetched data: {data}")
    assert data is not None  # Example assertion

@pytest.mark.asyncio
async def test_fetch_all_records(mock_service: ZohoCreatorService):
    """Test fetching all records from the Company_All_Data report."""
    logging.info("Starting test_fetch_all_records...")
    
    # Fetch all records for the report "Company_All_Data"
    records = await mock_service.get_records("Company_All_Data")  # Use the report link name
    
    # Log the fetched records
    logging.info(f"Fetched records: {records}")
    
    # Assertions to verify the records
    assert len(records) > 0  # Ensure that at least one record is returned
    for record in records:
        assert isinstance(record.id, str)  # Ensure each record has a valid ID
        # assert "Company_Info" in record.data  # Ensure the record contains data for the form

# You can add more tests below...

```

--------------------------------------------------------------------------------
/src/scaflog_zoho_mcp_server/service.py:
--------------------------------------------------------------------------------

```python
# src/scaflog_zoho_mcp_server/service.py

from typing import List, Optional, Dict, Any
import httpx
from datetime import datetime
import logging

from .models import ZohoForm, ZohoReport, ZohoField, ZohoRecord, Cache
from .auth import ZohoAuth
from .config import API_BASE_URL

# Configure logging to write to a file
logging.basicConfig(
    filename='app.log',  # Specify the log file name
    filemode='a',        # Append mode
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO   # Set the logging level
)

class ZohoCreatorService:
    """Service for interacting with Zoho Creator API."""
    
    def __init__(self, auth: ZohoAuth):
        self.auth = auth
        self.cache = Cache()
        self._client = httpx.AsyncClient(timeout=httpx.Timeout(60.0))
        self.base_url = API_BASE_URL[auth.config.environment]

    async def list_forms(self, force_refresh: bool = False) -> List[ZohoForm]:
        """Get all available forms."""
        if not force_refresh and not self.cache.needs_refresh():
            return list(self.cache.forms.values())

        headers = await self.auth.get_authorized_headers()
        url = f"{self.base_url}/forms"
        
        response = await self._client.get(url, headers=headers)
        response.raise_for_status()
        data = response.json()
        logging.info(f"Response from list_forms: {data}")  # Log the entire response

        forms = []
        for form_data in data['forms'][:10]:
            logging.info(f"Processing form: {form_data['link_name']}")  # Log the link_name
            fields = await self._get_form_fields(form_data['link_name'], headers)
            form = ZohoForm(
                link_name=form_data['link_name'],
                display_name=form_data['display_name'],
                fields=fields,
                type=form_data['type']
            )
            forms.append(form)

        self.cache.update_forms(forms)
        return forms

    async def _get_form_fields(self, form_link_name: str, headers: dict) -> List[ZohoField]:
        """Get fields for a specific form."""
        url = f"{self.base_url}/form/{form_link_name}/fields"
        
        response = await self._client.get(url, headers=headers)
        response.raise_for_status()
        data = response.json()
        # logging.info(f"Response from _get_form_fields: {data}")  # Log the entire response

        return [
            ZohoField(
                link_name=field['link_name'],
                display_name=field['display_name'],
                field_type=field['type'],
                required=field['mandatory'],
                unique=field['unique'],
                max_char=field.get('max_char'),
                lookup=field.get('is_lookup_field'),
                choices=field.get('choices')
            )
            for field in data['fields']
        ]

    async def list_reports(self, force_refresh: bool = False) -> List[ZohoReport]:
        """Get all available reports."""
        if not force_refresh and not self.cache.needs_refresh():
            return list(self.cache.reports.values())

        headers = await self.auth.get_authorized_headers()
        url = f"{self.base_url}/reports"
        
        response = await self._client.get(url, headers=headers)
        response.raise_for_status()
        data = response.json()
        logging.info(f"Response from list_reports: {data}")  # Log the entire response

        reports = []
        for report_data in data['reports'][:10]:
            logging.info(f"Processing report: {report_data['link_name']}")  # Log the link_name
            report = ZohoReport(
                link_name=report_data['link_name'],
                display_name=report_data['display_name'],
                type=report_data['type']
            )
            reports.append(report)

        self.cache.update_reports(reports)
        return reports

    async def get_records(
        self,
        report_link_name: str,
        criteria: Optional[str] = None,
        limit: Optional[int] = None
    ) -> List[ZohoRecord]:
        """Get records from a specific report."""
        headers = await self.auth.get_authorized_headers()
        url = f"{self.base_url}/report/{report_link_name}"
        
        params = {}
        if criteria:
            params['criteria'] = criteria
        if limit:
            params['limit'] = limit

        # Use the shared client directly. `async with self._client` would close
        # it on exit and make every later request fail.
        response = await self._client.get(url, headers=headers, params=params)
        response.raise_for_status()
        data = response.json()

        return [
            ZohoRecord(
                id=record['ID'],
                form_link_name=report_link_name,
                data=record
            )
            for record in data['data']
        ]

    async def get_record(
        self,
        report_link_name: str,
        record_id: str
    ) -> ZohoRecord:
        """Get a specific record by ID."""
        headers = await self.auth.get_authorized_headers()
        url = f"{self.base_url}/report/{report_link_name}/{record_id}"
        
        # Reuse the shared client; `async with` would close it after this call.
        response = await self._client.get(url, headers=headers)
        response.raise_for_status()
        result = response.json()

        return ZohoRecord(
            id=record_id,
            form_link_name=report_link_name,
            data=result['data']
        )

    async def create_record(
        self,
        form_link_name: str,
        data: Dict[str, Any]
    ) -> ZohoRecord:
        """Create a new record in a form."""
        headers = await self.auth.get_authorized_headers()
        url = f"{self.base_url}/form/{form_link_name}"
        
        # Reuse the shared client; `async with` would close it after this call.
        response = await self._client.post(
            url,
            headers=headers,
            json={"data": data}
        )
        response.raise_for_status()
        result = response.json()

        return ZohoRecord(
            id=result['record']['ID'],
            form_link_name=form_link_name,
            data=data
        )

    async def update_record(
        self,
        report_link_name: str,
        record_id: str,
        data: Dict[str, Any]
    ) -> ZohoRecord:
        """Update an existing record through a report."""
        headers = await self.auth.get_authorized_headers()
        url = f"{self.base_url}/report/{report_link_name}/{record_id}"
        
        # Reuse the shared client; `async with` would close it after this call.
        response = await self._client.patch(
            url,
            headers=headers,
            json={"data": data}
        )
        response.raise_for_status()

        return ZohoRecord(
            id=record_id,
            form_link_name=report_link_name,
            data=data
        )

    async def close(self):
        """Close HTTP client."""
        await self._client.aclose()

    async def fetch_data(self):
        """Fetch JSON from a placeholder endpoint; returns None on any error."""
        logging.info("Fetching data from the API...")
        try:
            # "your_api_endpoint" is a placeholder and must be replaced with a real URL.
            response = await self._client.get("your_api_endpoint")
            response.raise_for_status()  # Raise an error for bad responses
            data = response.json()
            logging.info(f"Fetched data: {data}")
            return data
        except Exception as e:
            logging.error(f"Error fetching data: {e}")
            return None
```
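
The request and response shaping in `get_records` can be tried without any network access. The following is a minimal sketch, not the service's actual code: `RecordStub` is a hypothetical stand-in for `ZohoRecord` (defined in `models.py`), `build_params` and `to_records` are illustrative helper names, and the sample payload, criteria string, and `Job_Report` link name are made up, assuming only the `{"data": [{"ID": ...}]}` response shape that `get_records` reads.

```python
from dataclasses import dataclass
from typing import Any, Dict, List, Optional


@dataclass
class RecordStub:
    """Hypothetical stand-in for ZohoRecord (the real model lives in models.py)."""
    id: str
    form_link_name: str
    data: Dict[str, Any]


def build_params(criteria: Optional[str] = None,
                 limit: Optional[int] = None) -> Dict[str, Any]:
    """Collect only the query parameters that were actually supplied."""
    params: Dict[str, Any] = {}
    if criteria:
        params["criteria"] = criteria
    if limit:
        params["limit"] = limit
    return params


def to_records(report_link_name: str, payload: Dict[str, Any]) -> List[RecordStub]:
    """Wrap each item of payload['data'], keeping the full item as the record data."""
    return [
        RecordStub(id=item["ID"], form_link_name=report_link_name, data=item)
        for item in payload["data"]
    ]


# Made-up payload in the shape get_records expects.
sample_payload = {"data": [{"ID": "1001", "Name": "Alpha"}, {"ID": "1002", "Name": "Beta"}]}
records = to_records("Job_Report", sample_payload)
params = build_params(criteria='Status == "Open"', limit=5)
```

Keeping the parameter building and the payload mapping as pure functions like this makes them easy to unit test separately from the HTTP call.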

--------------------------------------------------------------------------------
/src/scaflog_zoho_mcp_server/server.py:
--------------------------------------------------------------------------------

```python
# src/scaflog_zoho_mcp_server/server.py

import json
import logging
from urllib.parse import urlparse
from pydantic import AnyUrl

from mcp.server import NotificationOptions, Server
from mcp.server.models import InitializationOptions
import mcp.types as types
import mcp.server.stdio

from .config import load_config, API_BASE_URL
from .auth import ZohoAuth
from .service import ZohoCreatorService
from .resource_config import WHITELISTED_RESOURCES

# Configure logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

# Create a server instance
server = Server("scaflog-zoho-mcp-server")
config = load_config()
auth = ZohoAuth(config)
service = ZohoCreatorService(auth)

@server.list_resources()
async def handle_list_resources() -> list[types.Resource]:
    """List available whitelisted Zoho Creator forms and reports as resources."""
    logger.debug("Starting handle_list_resources...")
    
    try:
        resources = []
        
        # Add container resources
        resources.append(
            types.Resource(
                uri=AnyUrl("zoho://forms"),
                name="Available Forms",
                description="List of available Zoho Creator forms",
                mimeType="application/json"
            )
        )
        
        resources.append(
            types.Resource(
                uri=AnyUrl("zoho://reports"),
                name="Available Reports",
                description="List of available Zoho Creator reports",
                mimeType="application/json"
            )
        )
        
        # Add whitelisted forms
        for link_name, form_config in WHITELISTED_RESOURCES["forms"].items():
            resources.append(
                types.Resource(
                    uri=AnyUrl(f"zoho://form/{link_name}"),
                    name=form_config.display_name,
                    description=form_config.description,
                    mimeType="application/json"
                )
            )
        
        # Add whitelisted reports
        for link_name, report_config in WHITELISTED_RESOURCES["reports"].items():
            resources.append(
                types.Resource(
                    uri=AnyUrl(f"zoho://report/{link_name}"),
                    name=report_config.display_name,
                    description=report_config.description,
                    mimeType="application/json"
                )
            )
        
        return resources
    
    except Exception:
        logger.exception("Error in handle_list_resources")
        raise

@server.read_resource()
async def handle_read_resource(uri: AnyUrl) -> types.TextResourceContents | types.BlobResourceContents:
    """Read data from Zoho Creator based on the resource URI, filtered by whitelist."""
    try:
        logger.info(f"Reading resource: {uri}")
        parsed = urlparse(str(uri))
        
        if parsed.scheme != "zoho":
            raise ValueError(f"Unsupported URI scheme: {parsed.scheme}")
        
        full_path = f"{parsed.netloc}{parsed.path}".strip("/")

        # "".split("/") returns [""], so test the string itself for emptiness
        if not full_path:
            raise ValueError("Empty resource path")

        path_parts = full_path.split("/")
            
        resource_type = path_parts[0]
        
        # Handle root resources
        if resource_type == "forms":
            return types.TextResourceContents(
                uri=uri,
                mimeType="application/json",
                text=json.dumps({
                    "forms": [
                        {
                            "link_name": link_name,
                            "display_name": form.display_name,
                            "description": form.description,
                            "fields": {
                                field_name: field.dict() 
                                for field_name, field in form.fields.items()
                            }
                        }
                        for link_name, form in WHITELISTED_RESOURCES["forms"].items()
                    ]
                }, indent=2)
            )
            
        elif resource_type == "reports":
            return types.TextResourceContents(
                uri=uri,
                mimeType="application/json",
                text=json.dumps({
                    "reports": [
                        {
                            "link_name": link_name,
                            "display_name": report.display_name,
                            "description": report.description,
                            "fields": {
                                field_name: field.dict() 
                                for field_name, field in report.fields.items()
                            }
                        }
                        for link_name, report in WHITELISTED_RESOURCES["reports"].items()
                    ]
                }, indent=2)
            )
        
        # Handle specific resources
        if len(path_parts) < 2:
            raise ValueError(f"Missing link name for resource type: {resource_type}")
            
        link_name = path_parts[1]
        
        if resource_type == "form":
            # Check if form is whitelisted
            form_config = WHITELISTED_RESOURCES["forms"].get(link_name)
            if not form_config:
                raise ValueError(f"Form not found or not accessible: {link_name}")
            
            # Get form data from Zoho
            records = await service.get_records(link_name)
            
            # Filter fields based on whitelist
            filtered_records = [
                {
                    field_name: record.data.get(field_name)
                    for field_name in form_config.fields.keys()
                    if field_name in record.data
                }
                for record in records
            ]
            
            return types.TextResourceContents(
                uri=uri,
                mimeType="application/json",
                text=json.dumps({
                    "form": {
                        "link_name": link_name,
                        "display_name": form_config.display_name,
                        "description": form_config.description,
                        "fields": {
                            name: field.dict() 
                            for name, field in form_config.fields.items()
                        }
                    },
                    "records": filtered_records
                }, indent=2)
            )
            
        elif resource_type == "report":
            # Check if report is whitelisted
            report_config = WHITELISTED_RESOURCES["reports"].get(link_name)
            if not report_config:
                raise ValueError(f"Report not found or not accessible: {link_name}")
            
            # Get report data from Zoho
            records = await service.get_records(link_name)
            
            # Filter fields based on whitelist
            filtered_records = [
                {
                    field_name: record.data.get(field_name)
                    for field_name in report_config.fields.keys()
                    if field_name in record.data
                }
                for record in records
            ]
            
            return types.TextResourceContents(
                uri=uri,
                mimeType="application/json",
                text=json.dumps({
                    "report": {
                        "link_name": link_name,
                        "display_name": report_config.display_name,
                        "description": report_config.description,
                        "fields": {
                            name: field.dict() 
                            for name, field in report_config.fields.items()
                        }
                    },
                    "records": filtered_records
                }, indent=2)
            )
        
        else:
            raise ValueError(f"Unknown resource type: {resource_type}")
            
    except Exception:
        logger.exception(f"Error reading resource: {uri}")
        raise

async def main():
    """Main entry point for the server."""
    logger.info("Starting Zoho Creator MCP server...")
    async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
        try:
            logger.info("Initializing server connection...")
            await server.run(
                read_stream,
                write_stream,
                InitializationOptions(
                    server_name="scaflog-zoho-mcp-server",
                    server_version="0.1.0",
                    capabilities=server.get_capabilities(
                        notification_options=NotificationOptions(),
                        experimental_capabilities={},
                    ),
                ),
            )
        except Exception:
            logger.exception("Error running server")
            raise
        finally:
            logger.info("Shutting down server...")
            await auth.close()
            await service.close()

```
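
The URI handling in `handle_read_resource` relies on `urlparse` placing the first path segment of a `zoho://` URI in `netloc`. The sketch below isolates that step so it can be checked on its own. `split_zoho_uri` is a hypothetical helper name that does not exist in the server, and `Job_Report` is a made-up link name; only the `zoho` scheme and the `forms`, `reports`, `form/<link_name>`, and `report/<link_name>` path shapes come from the code above.

```python
from typing import Optional, Tuple
from urllib.parse import urlparse


def split_zoho_uri(uri: str) -> Tuple[str, Optional[str]]:
    """Split a zoho:// URI into (resource_type, link_name or None)."""
    parsed = urlparse(uri)
    if parsed.scheme != "zoho":
        raise ValueError(f"Unsupported URI scheme: {parsed.scheme}")

    # For "zoho://report/Job_Report", netloc is "report" and path is "/Job_Report",
    # so the two are joined before splitting on "/".
    full_path = f"{parsed.netloc}{parsed.path}".strip("/")
    if not full_path:
        raise ValueError("Empty resource path")

    parts = full_path.split("/")
    link_name = parts[1] if len(parts) > 1 else None
    return parts[0], link_name


container = split_zoho_uri("zoho://forms")
specific = split_zoho_uri("zoho://report/Job_Report")
```

Container URIs such as `zoho://forms` yield no link name, while `zoho://report/<link_name>` yields both parts. This matches the order in which the handler first checks the root resources and then requires a second path segment.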