#
tokens: 30625/50000 14/14 files
lines: off (toggle) GitHub
raw markdown copy
# Directory Structure

```
├── filesystem_server
│   ├── example_filesystem_server.py
│   ├── filesystem_server_mcp.py
│   └── test_filesystem_server.py
├── google_forms_mcp
│   ├── __init__.py
│   ├── google_forms_example_run.py
│   ├── google_forms_server_mcp.py
│   └── test_google_forms_server_mcp.py
├── markitdown_camel
│   ├── camel_markitdown_client.py
│   ├── how_to_run.md
│   └── streamlit_markitdown_app.py
├── README.md
├── sql_server
│   ├── sql_example_run.py
│   ├── sql_mcp_test.py
│   └── sql_server_mcp.py
└── ssss.png
```

# Files

--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------

```markdown
# MCP-Servers

This repo contains MCP servers for various use cases, each integrated with CAMEL-AI.

1. [Filesystem Server](https://github.com/parthshr370/MCP-Servers/tree/main/filesystem_server)
2. [SQL Server](https://github.com/parthshr370/MCP-Servers/tree/main/sql_server)
3. [Markdown Server with Note taking usecase](https://github.com/parthshr370/Md-Notes-Buddy) and [Normal Doc to Md](https://github.com/parthshr370/MCP-Servers/tree/main/markitdown_camel)

```

--------------------------------------------------------------------------------
/google_forms_mcp/__init__.py:
--------------------------------------------------------------------------------

```python

```

--------------------------------------------------------------------------------
/filesystem_server/test_filesystem_server.py:
--------------------------------------------------------------------------------

```python
import asyncio  # noqa: F401
import os
import tempfile

import pytest

# Import the async tools you want to test.
# Adjust the import path if necessary.
from filesystem_server_mcp import (
    list_directory,
    read_file,
)


@pytest.mark.asyncio
async def test_read_file_success():
    """
    Verify that read_file returns the file's content, with trailing
    whitespace stripped, for an existing file.
    """
    # Write known content to a temporary file that survives closing.
    with tempfile.NamedTemporaryFile(mode="w+", delete=False) as handle:
        handle.write("Hello, Camel AI!\n")
        temp_path = handle.name

    try:
        content = await read_file(file_path=temp_path)
        # read_file strips trailing whitespace, so the newline is gone.
        assert content == "Hello, Camel AI!"
    finally:
        # Always remove the temporary file, even on assertion failure.
        os.remove(temp_path)

@pytest.mark.asyncio
async def test_read_file_error():
    """
    Verify that read_file reports an error for a missing file instead of raising.
    """
    missing = "this_file_does_not_exist.txt"
    output = await read_file(file_path=missing)
    # The tool returns an error string rather than raising an exception.
    assert "Error reading file" in output

@pytest.mark.asyncio
async def test_list_directory_success():
    """
    Verify that list_directory reports every file created in a directory.
    """
    with tempfile.TemporaryDirectory() as tmp_dir:
        expected = ["file1.txt", "file2.txt", "file3.txt"]
        # Populate the directory with known files.
        for file_name in expected:
            with open(os.path.join(tmp_dir, file_name), "w") as handle:
                handle.write("Test content")

        # The tool returns a newline-separated listing; split it back apart.
        listing = await list_directory(directory_path=tmp_dir)
        entries = listing.split("\n")

        # Every file we created must appear in the listing.
        for file_name in expected:
            assert file_name in entries

@pytest.mark.asyncio
async def test_list_directory_error():
    """
    Verify that list_directory reports an error for a missing directory.
    """
    missing_dir = "this_directory_does_not_exist"
    output = await list_directory(directory_path=missing_dir)
    # The tool returns an error string rather than raising an exception.
    assert "Error listing directory" in output
```

--------------------------------------------------------------------------------
/filesystem_server/filesystem_server_mcp.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python
# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. =========
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. =========

import os
import asyncio  # noqa: F401
from mcp.server.fastmcp import FastMCP
from camel.logger import get_logger

logger = get_logger(__name__)
mcp = FastMCP("filesystem")

@mcp.tool()
async def read_file(file_path: str) -> str:
    r"""Reads the content of the file at the given file path.

    Args:
        file_path (str): The path to the file to read.

    Returns:
        str: The content of the file with trailing whitespace removed, or an
            error message if reading fails.
    """
    logger.info(f"read_file triggered with file_path: {file_path}")
    try:
        with open(file_path, "r", encoding="utf-8") as f:
            content = f.read()
        return content.rstrip()
    except Exception as e:
        # Return the failure as a string so the MCP client sees a readable
        # message instead of a protocol-level exception.
        return f"Error reading file '{file_path}': {e}"

# Explicit input schema exposed to MCP clients. `file_path` is required; the
# previous description advertised a README.md default that the tool never
# actually applied, which could mislead the calling agent.
read_file.inputSchema = {
    "type": "object",
    "properties": {
         "file_path": {
             "type": "string",
             "title": "File Path",
             "description": "The path to the file to read."
         }
    },
    "required": ["file_path"]
}

@mcp.tool()
async def list_directory(directory_path: str) -> str:
    r"""Lists the contents of the specified directory.

    Args:
        directory_path (str): The path of the directory to list.

    Returns:
        str: A newline-separated string of the directory entries, or an error
            message if listing fails.
    """
    logger.info(f"list_directory triggered with directory_path: {directory_path}")
    try:
        entries = os.listdir(directory_path)
        return "\n".join(entry.rstrip() for entry in entries)
    except Exception as e:
        # Return the failure as a string so the MCP client sees a readable
        # message instead of a protocol-level exception.
        return f"Error listing directory '{directory_path}': {e}"

# Explicit input schema exposed to MCP clients. `directory_path` is required;
# the previous description advertised a '.' default that the tool never
# actually applied, which could mislead the calling agent.
list_directory.inputSchema = {
    "type": "object",
    "properties": {
         "directory_path": {
             "type": "string",
             "title": "Directory Path",
             "description": "The directory path whose contents should be listed."
         }
    },
    "required": ["directory_path"]
}

def main(transport: str = "stdio"):
    r"""Runs the Filesystem MCP Server.

    This server exposes the filesystem tools above over MCP.

    Args:
        transport (str): The transport mode ('stdio' or 'sse'). Any other
            value is reported and the server is not started.
    """
    if transport in ("stdio", "sse"):
        mcp.run(transport=transport)
    else:
        print(f"Unknown transport mode: {transport}")

if __name__ == "__main__":
    import sys
    # The transport mode may be supplied as the first CLI argument.
    transport_mode = sys.argv[1] if len(sys.argv) > 1 else "stdio"
    main(transport_mode)

```

--------------------------------------------------------------------------------
/filesystem_server/example_filesystem_server.py:
--------------------------------------------------------------------------------

```python
import asyncio
import os
import sys
from pathlib import Path
from dotenv import load_dotenv

from camel.agents import ChatAgent
from camel.models import ModelFactory
from camel.toolkits import MCPToolkit
from camel.types import ModelPlatformType
from camel.toolkits.mcp_toolkit import _MCPServer

load_dotenv()

# The Anthropic API key must come from the environment (a .env file is
# honoured via load_dotenv above). The previous code assigned a hard-coded
# key that was stripped out, leaving a syntax error; never commit secrets.
if not os.getenv("ANTHROPIC_API_KEY"):
    print("Warning: ANTHROPIC_API_KEY is not set; Anthropic model calls will fail.")


async def interactive_input_loop(agent: ChatAgent):
    """Interactively prompt the user and forward filesystem queries to the agent.

    Args:
        agent (ChatAgent): CAMEL agent wired up with the MCP filesystem tools.
    """
    # get_running_loop() is the supported call inside a coroutine;
    # asyncio.get_event_loop() is deprecated in this context.
    loop = asyncio.get_running_loop()
    print("\nEntering interactive mode. Type 'exit' at any prompt to quit.")

    while True:
        # input() blocks, so run it in the default executor to keep the
        # event loop responsive.
        choice = await loop.run_in_executor(
            None,
            input,
            "\nChoose an action (Type 'exit' to end loop or press Enter to use current directory):\n"
            "1. Read a file\n"
            "2. List a directory\nYour choice (1/2): "
        )
        choice = choice.strip().lower()
        if choice == "exit":
            print("Exiting interactive mode.")
            break

        if choice == "1":
            file_path = await loop.run_in_executor(
                None,
                input,
                "Enter the file path to read (default: README.md): "
            )
            file_path = file_path.strip() or "README.md"
            query = f"Use the read_file tool to display the content of {file_path}. Do not generate an answer from your internal knowledge."
        elif choice == "2":
            dir_path = await loop.run_in_executor(
                None,
                input,
                "Enter the directory path to list (default: .): "
            )
            dir_path = dir_path.strip() or "."
            query = f"Call the list_directory tool to show me all files in {dir_path}. Do not answer directly."
        else:
            print("Invalid choice. Please enter 1 or 2.")
            continue

        # The query explicitly instructs the agent to use the MCP tools.
        response = await agent.astep(query)
        print(f"\nYour Query: {query}")
        print("Full Agent Response:")
        print(response.info)
        if response.msgs and response.msgs[0].content:
            print("Agent Output:")
            print(response.msgs[0].content.rstrip())
        else:
            print("No output received.")

async def main(server_transport: str = 'stdio'):
    """Connect to the filesystem MCP server and run the interactive loop.

    Args:
        server_transport (str): 'stdio' launches the server script as a child
            process of this interpreter; anything else attempts a TCP
            connection. NOTE(review): the tcp:// branch passes a URL
            positionally where the stdio branch passes `servers=` — confirm
            MCPToolkit accepts both forms.
    """
    if server_transport == 'stdio':
        # Assuming both files are in the same folder structure, determine the path to the server file.
        server_script_path = Path(__file__).resolve().parent / "filesystem_server_mcp.py"
        if not server_script_path.is_file():
            print(f"Error: Server script not found at {server_script_path}")
            return
        # Create an _MCPServer instance for our filesystem server.
        # sys.executable ensures the server runs under the same interpreter.
        server = _MCPServer(
            command_or_url=sys.executable,
            args=[str(server_script_path)]
        )
        mcp_toolkit = MCPToolkit(servers=[server])
    else:
        mcp_toolkit = MCPToolkit("tcp://localhost:5000")

    # The connection context manager starts/stops the server session.
    async with mcp_toolkit.connection() as toolkit:
        tools = toolkit.get_tools()
        sys_msg = (
            "You are a helpful assistant. Always use the provided external tools for filesystem operations "
            "and answer filesystem-related questions using these tools."
        )
        model = ModelFactory.create(
            model_platform=ModelPlatformType.ANTHROPIC,
            model_type="claude-3-7-sonnet-20250219",
            api_key=os.getenv("ANTHROPIC_API_KEY"),
            model_config_dict={"temperature": 0.8, "max_tokens": 4096},
        )
        camel_agent = ChatAgent(
            system_message=sys_msg,
            model=model,
            tools=tools,
        )
        # Start from a clean conversation state before entering the loop.
        camel_agent.reset()
        camel_agent.memory.clear()
        await interactive_input_loop(camel_agent)

if __name__ == "__main__":
    asyncio.run(main())

```

--------------------------------------------------------------------------------
/markitdown_camel/how_to_run.md:
--------------------------------------------------------------------------------

```markdown
# How to Run MarkItDown Integrations

This guide explains how to set up and run the MarkItDown integrations created in this project: the command-line interface (CLI) powered by Camel AI, and the Streamlit web frontend.

**Project Structure Overview:**

```
markitdown/
├── packages/
│   ├── markitdown/          # Core library source
│   │   └── src/
│   ├── markitdown-mcp/       # MCP Server source
│   │   └── src/
│   └── ...
├── camel_markitdown_client.py # Camel AI client script (CLI mode)
├── streamlit_markitdown_app.py # Streamlit frontend script
├── how_to_run.md              # This file
└── ... (other project files like README.md, .git, etc.)
```

## I. Prerequisites

Before running either integration, ensure you have the following set up:

1.  **Conda Environment:** It's highly recommended to use a dedicated Conda environment (like `kratos` used during development). Activate it:
    ```bash
    conda activate kratos
    ```

2.  **Install Base Dependencies:** Install the core `markitdown` library and its dependencies in editable mode. This allows using the local source code.
    ```bash
    # Navigate to the project root directory (e.g., ~/Downloads/markitdown/)
    cd /path/to/your/markitdown

    # Install the core library and all its optional features
    pip install -e './packages/markitdown[all]'

    # Install the MCP server package (needed by markitdown library)
    pip install -e './packages/markitdown-mcp'
    ```

3.  **API Keys:**
    *   **Google Gemini:** The Camel AI client is configured to use Google Gemini. Set your API key as an environment variable:
        ```bash
        export GOOGLE_API_KEY="YOUR_GOOGLE_API_KEY_HERE"
        ```
    *   **MarkItDown Dependencies:** The underlying `markitdown` library might require other keys for specific conversions (e.g., Azure Document Intelligence, transcription services). Set these as environment variables if you plan to use those features.

## II. Running the Camel AI Client (CLI Mode)

This mode uses a Large Language Model (LLM) agent (Gemini) to interact with the `markitdown` tool via the `markitdown-mcp` server, which runs in the background.

1.  **Install Camel AI Dependencies:**
    ```bash
    # Install camel-ai with google support
    pip install "camel-ai[google]"
    ```

2.  **Ensure Prerequisites:** Make sure your Conda environment is active and your `GOOGLE_API_KEY` is exported (as described in Prerequisites).

3.  **Run the Client Script:** Execute the client script from the project root directory:
    ```bash
    python camel_markitdown_client.py
    ```

4.  **Interaction:**
    *   The script will start the background `markitdown-mcp` server automatically.
    *   It will connect to the server and initialize the Gemini agent.
    *   You will be prompted in the terminal to enter a URI (`http://`, `https://`, `file://`, `data:`).
    *   Provide a URI (e.g., `https://www.google.com` or `/absolute/path/to/your/local/file.pdf`). Local paths starting with `/` will automatically be converted to `file://` URIs.
    *   The agent will call the `convert_to_markdown` tool on the background server.
    *   The resulting Markdown (or any error messages) will be printed to your terminal.
    *   Type `exit` at the prompt to quit.

## III. Running the Streamlit Frontend (Web UI Mode)

This mode provides a web interface to directly use the `markitdown` library for conversions without involving an LLM agent or the MCP server.

1.  **Install Streamlit Dependency:**
    ```bash
    pip install streamlit
    ```

2.  **Ensure Prerequisites:** Make sure your Conda environment is active. While the Streamlit app doesn't directly use the Google API key, ensure any keys needed by `markitdown` itself for specific conversions are set.

3.  **Run the Streamlit App:** Execute the following command from the project root directory:
    ```bash
    streamlit run streamlit_markitdown_app.py
    ```
    *Note: This script is configured to automatically add the local `markitdown` source path to `sys.path`, so you should **not** need to set the `PYTHONPATH` environment variable manually for this script.* 

4.  **Interaction:**
    *   Streamlit will provide a URL (usually `http://localhost:8501`). Open this in your web browser.
    *   Select the input method: "Enter URL" or "Upload File".
    *   Provide the URL or upload your desired file.
    *   Click the "Convert to Markdown" button.
    *   A progress bar will indicate activity.
    *   The resulting Markdown will be displayed in a text area.
    *   A "Download Markdown (.md)" button will appear, allowing you to save the output.
    *   Check the terminal where you ran `streamlit run` for log messages. 
```

--------------------------------------------------------------------------------
/sql_server/sql_mcp_test.py:
--------------------------------------------------------------------------------

```python
import asyncio  # noqa: F401
import os
import tempfile
import json
import sqlite3
import pytest

# Import the async tools to test
from sql_server_mcp import (
    execute_query,
    list_tables,
    create_database,
    describe_table,
)

@pytest.mark.asyncio
async def test_create_database():
    """
    Verify that create_database produces a valid SQLite database file.
    """
    # Reserve a unique path, then delete the file so only the name remains.
    with tempfile.NamedTemporaryFile(suffix='.db', delete=False) as tmp:
        tmp_path = tmp.name

    try:
        os.unlink(tmp_path)

        # The tool returns a JSON status payload.
        payload = json.loads(await create_database(db_path=tmp_path))
        assert payload["status"] == "success"

        # The file must exist and be openable as a SQLite database.
        assert os.path.exists(tmp_path)
        sqlite3.connect(tmp_path).close()
    finally:
        # Clean up the database file regardless of outcome.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)

@pytest.mark.asyncio
async def test_execute_query():
    """
    Verify that execute_query supports both SELECT and INSERT statements.
    """
    with tempfile.NamedTemporaryFile(suffix='.db', delete=False) as tmp:
        tmp_path = tmp.name

    try:
        # Seed the database with a single known row.
        conn = sqlite3.connect(tmp_path)
        conn.execute("CREATE TABLE test_table (id INTEGER PRIMARY KEY, name TEXT)")
        conn.execute("INSERT INTO test_table (id, name) VALUES (1, 'Test Name')")
        conn.commit()
        conn.close()

        # SELECT must return the seeded row as JSON.
        rows = json.loads(
            await execute_query(connection_string=tmp_path, query="SELECT * FROM test_table")
        )
        assert len(rows) == 1
        assert rows[0]["id"] == 1
        assert rows[0]["name"] == "Test Name"

        # INSERT must report exactly one affected row.
        inserted = json.loads(
            await execute_query(
                connection_string=tmp_path,
                query="INSERT INTO test_table (id, name) VALUES (2, 'Second Test')",
            )
        )
        assert inserted["affected_rows"] == 1

        # The inserted row must be visible to a follow-up SELECT.
        rows = json.loads(
            await execute_query(
                connection_string=tmp_path,
                query="SELECT * FROM test_table WHERE id = 2",
            )
        )
        assert len(rows) == 1
        assert rows[0]["id"] == 2
        assert rows[0]["name"] == "Second Test"
    finally:
        # Clean up the database file regardless of outcome.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)

@pytest.mark.asyncio
async def test_list_tables():
    """
    Verify that list_tables reports every table present in a database.
    """
    with tempfile.NamedTemporaryFile(suffix='.db', delete=False) as tmp:
        tmp_path = tmp.name

    try:
        # Create two tables so the listing has multiple entries.
        conn = sqlite3.connect(tmp_path)
        for ddl in (
            "CREATE TABLE table1 (id INTEGER PRIMARY KEY, value TEXT)",
            "CREATE TABLE table2 (id INTEGER PRIMARY KEY, value TEXT)",
        ):
            conn.execute(ddl)
        conn.commit()
        conn.close()

        payload = json.loads(await list_tables(connection_string=tmp_path))

        # Both tables must appear under the "tables" key.
        assert "tables" in payload
        assert "table1" in payload["tables"]
        assert "table2" in payload["tables"]
    finally:
        # Clean up the database file regardless of outcome.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)

@pytest.mark.asyncio
async def test_describe_table():
    """
    Verify that describe_table reports the correct schema for a table.
    """
    with tempfile.NamedTemporaryFile(suffix='.db', delete=False) as tmp:
        tmp_path = tmp.name

    try:
        # Create a table covering several column types and constraints.
        conn = sqlite3.connect(tmp_path)
        conn.execute("""
            CREATE TABLE test_table (
                id INTEGER PRIMARY KEY,
                name TEXT NOT NULL,
                age INTEGER,
                salary REAL,
                hire_date TEXT
            )
        """)
        conn.commit()
        conn.close()

        payload = json.loads(
            await describe_table(connection_string=tmp_path, table_name="test_table")
        )

        assert payload["table"] == "test_table"

        # Index the reported columns by name for easy lookups.
        columns = {col["name"]: col for col in payload["columns"]}

        # Every column must be present with its declared type.
        expected_types = {
            "id": "INTEGER",
            "name": "TEXT",
            "age": "INTEGER",
            "salary": "REAL",
            "hire_date": "TEXT",
        }
        for col_name, col_type in expected_types.items():
            assert col_name in columns
            assert columns[col_name]["type"] == col_type

        # Constraint metadata: primary key and NOT NULL flags.
        assert columns["id"]["pk"] == 1
        assert columns["name"]["notnull"] == 1
    finally:
        # Clean up the database file regardless of outcome.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
```

--------------------------------------------------------------------------------
/sql_server/sql_example_run.py:
--------------------------------------------------------------------------------

```python
import asyncio
import os
import sys
from pathlib import Path
from dotenv import load_dotenv

from camel.agents import ChatAgent
from camel.models import ModelFactory
from camel.toolkits import MCPToolkit
from camel.types import ModelPlatformType
from camel.toolkits.mcp_toolkit import _MCPServer

# Load environment variables from .env file
load_dotenv()

# Only propagate the Anthropic API key when it is actually set. The previous
# code assigned os.getenv(...) unconditionally, which raises TypeError
# (environ values must be str, not None) when the variable is missing.
_anthropic_key = os.getenv("ANTHROPIC_API_KEY")
if _anthropic_key:
    os.environ["ANTHROPIC_API_KEY"] = _anthropic_key
else:
    print("Warning: ANTHROPIC_API_KEY is not set; Anthropic model calls will fail.")

# Create a sample database for demonstration
async def create_sample_database():
    """Create a sample SQLite database with some data for demonstration.

    Returns:
        str: Path of the freshly created database file ("sample.db").
    """
    import sqlite3

    db_path = "sample.db"

    # Start from a clean slate if a previous run left a database behind.
    if os.path.exists(db_path):
        os.remove(db_path)

    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()

    # Employees table plus five sample rows.
    cursor.execute("""
    CREATE TABLE employees (
        id INTEGER PRIMARY KEY,
        name TEXT NOT NULL,
        department TEXT,
        salary REAL,
        hire_date TEXT
    )
    """)
    cursor.executemany(
        "INSERT INTO employees VALUES (?, ?, ?, ?, ?)",
        [
            (1, 'John Doe', 'Engineering', 85000.00, '2020-01-15'),
            (2, 'Jane Smith', 'Marketing', 75000.00, '2019-05-20'),
            (3, 'Bob Johnson', 'Engineering', 95000.00, '2018-11-10'),
            (4, 'Alice Brown', 'HR', 65000.00, '2021-03-05'),
            (5, 'Charlie Davis', 'Engineering', 90000.00, '2020-08-12'),
        ],
    )

    # Departments table plus four sample rows.
    cursor.execute("""
    CREATE TABLE departments (
        id INTEGER PRIMARY KEY,
        name TEXT NOT NULL,
        budget REAL,
        location TEXT
    )
    """)
    cursor.executemany(
        "INSERT INTO departments VALUES (?, ?, ?, ?)",
        [
            (1, 'Engineering', 1000000.00, 'Building A'),
            (2, 'Marketing', 500000.00, 'Building B'),
            (3, 'HR', 300000.00, 'Building A'),
            (4, 'Finance', 600000.00, 'Building C'),
        ],
    )

    # Persist the data and release the connection.
    conn.commit()
    conn.close()

    print(f"Sample database created at: {db_path}")
    return db_path

# Interactive mode function to chat with the agent
async def interactive_input_loop(agent: ChatAgent, db_path: str):
    """Read user queries from stdin and forward them to the SQL agent.

    Args:
        agent (ChatAgent): Agent equipped with the SQL MCP tools.
        db_path (str): Path of the sample database, shown in the banner.
    """
    # get_running_loop() is the supported call inside a coroutine;
    # asyncio.get_event_loop() is deprecated in this context.
    loop = asyncio.get_running_loop()
    print("\n==== SQL Assistant Interactive Mode ====")
    print("Type 'exit' at any prompt to quit.")
    print(f"\nUsing sample database at: {db_path}")
    print("\nSample queries you can try:")
    print("- Show me all tables in sample.db")
    print("- What columns are in the employees table in sample.db?")
    print("- List all employees in the Engineering department")
    print("- What is the average salary by department?")
    print("- How many employees are in each department?")
    print("- Find the employee with the highest salary")
    print("- Add a new employee named Michael Wilson to Finance with salary 82000")
    print("======================================")

    while True:
        # input() blocks, so run it in the default executor to keep the
        # event loop responsive.
        query = await loop.run_in_executor(
            None,
            input,
            "\nEnter your query (or type 'exit' to quit): "
        )

        if query.lower() == 'exit':
            print("Exiting interactive mode.")
            break

        print("\nProcessing query...")
        response = await agent.astep(query)

        print("\nAgent Response:")
        if response.msgs and response.msgs[0].content:
            print(response.msgs[0].content.rstrip())
        else:
            print("No output received.")

# Main function to run the entire example
async def main(server_transport: str = 'stdio'):
    """Build the sample DB, connect to the SQL MCP server, and chat interactively.

    Args:
        server_transport (str): 'stdio' launches the server script as a child
            process of this interpreter; anything else attempts a TCP
            connection. NOTE(review): the tcp:// branch passes a URL
            positionally where the stdio branch passes `servers=` — confirm
            MCPToolkit accepts both forms.
    """
    # First create a sample database
    db_path = await create_sample_database()
    
    if server_transport == 'stdio':
        # Determine the path to the server file
        server_script_path = Path(__file__).resolve().parent / "sql_server_mcp.py"
        if not server_script_path.is_file():
            print(f"Error: Server script not found at {server_script_path}")
            return
            
        # Create an _MCPServer instance for our SQL server
        # (sys.executable ensures it runs under the same interpreter)
        server = _MCPServer(
            command_or_url=sys.executable,
            args=[str(server_script_path)]
        )
        mcp_toolkit = MCPToolkit(servers=[server])
    else:
        mcp_toolkit = MCPToolkit("tcp://localhost:5000")

    # The connection context manager starts/stops the server session.
    async with mcp_toolkit.connection() as toolkit:
        tools = toolkit.get_tools()
        # The system prompt pins the DB path so the agent always passes it
        # explicitly to the SQL tools.
        sys_msg = (
            "You are a helpful SQL assistant. Use the provided external tools for database operations. "
            "Always use the tools to query the database rather than answering from your general knowledge. "
            f"The sample database is at '{db_path}'. It contains tables for employees and departments. "
            "When a user asks a question about the database, ALWAYS explicitly include the database path "
            f"'{db_path}' in your tool calls. First list the tables to understand the schema, "
            "then use describe_table to see column details before querying."
        )
        model = ModelFactory.create(
            model_platform=ModelPlatformType.ANTHROPIC,
            model_type="claude-3-7-sonnet-20250219",
            api_key=os.getenv("ANTHROPIC_API_KEY"),
            model_config_dict={"temperature": 0.5, "max_tokens": 4096},
        )
        camel_agent = ChatAgent(
            system_message=sys_msg,
            model=model,
            tools=tools,
        )
        # Start from a clean conversation state before entering the loop.
        camel_agent.reset()
        camel_agent.memory.clear()
        await interactive_input_loop(camel_agent, db_path)
        
        # Clean up the sample database after we're done
        if os.path.exists(db_path):
            os.remove(db_path)
            print(f"\nRemoved sample database: {db_path}")

# Entry point 
if __name__ == "__main__":
    asyncio.run(main())

```

--------------------------------------------------------------------------------
/markitdown_camel/camel_markitdown_client.py:
--------------------------------------------------------------------------------

```python
import asyncio
import os
import sys
from pathlib import Path
from dotenv import load_dotenv

from camel.agents import ChatAgent
from camel.models import ModelFactory
from camel.toolkits import MCPToolkit # camels implementation of the mcp protocol
from camel.types import ModelPlatformType
from camel.toolkits.mcp_toolkit import MCPClient


# Ensure your Anthropic API key is set in your environment variables
# os.environ["ANTHROPIC_API_KEY"] = "YOUR_ANTHROPIC_API_KEY"
# you can also use your gemini api key here


# Interactive input loop for the CAMEL AI MarkItDown client.
async def interactive_input_loop(agent: ChatAgent):
    """Prompt for URIs and ask the agent to convert each one to Markdown.

    Args:
        agent (ChatAgent): Agent connected to the MarkItDown MCP server.
    """
    # get_running_loop() is the supported call inside a coroutine;
    # asyncio.get_event_loop() is deprecated in this context.
    loop = asyncio.get_running_loop()
    print("\nEntering interactive mode. Type 'exit' at any prompt to quit.")

    while True:
        # input() blocks, so run it in the default executor.
        uri = await loop.run_in_executor(
            None,
            input,
            "\nEnter the URI (http:, https:, file:, data:) to convert to Markdown (or type 'exit'): "
        )
        uri = uri.strip()
        if uri.lower() == "exit":
            print("Exiting interactive mode.")
            break

        if not uri:
            print("URI cannot be empty.")
            continue

        # Prepend file:// scheme if it looks like a local absolute path.
        if uri.startswith('/') and not uri.startswith('file://'):
            print(f"Detected local path, prepending 'file://' to URI: {uri}")
            formatted_uri = f"file://{uri}"
        else:
            formatted_uri = uri

        # The prompt clearly tells the agent which tool to use and what the parameter is.
        query = f"Use the convert_to_markdown tool to convert the content at the URI '{formatted_uri}' to Markdown. Do not generate an answer from your internal knowledge, just show the Markdown output from the tool."

        print(f"\nSending query to agent: {query}")
        response = await agent.astep(query)

        print("\nFull Agent Response Info:")
        print(response.info)  # Shows tool calls and parameters

        # Check for direct message output first.
        if response.msgs and response.msgs[0].content:
            print("\nAgent Output (Markdown):")
            print("-" * 20)
            print(response.msgs[0].content.rstrip())
            print("-" * 20)
        # If no direct message, check if the tool call info is available in response.info.
        elif 'tool_calls' in response.info and response.info['tool_calls']:
            print("\nTool Call Response (Raw from info):")
            print("-" * 20)
            found_output = False
            for tool_call in response.info['tool_calls']:
                # NOTE(review): assumes the tool-call record exposes `.result`
                # — confirm against Camel AI's ToolCallingRecord structure.
                if hasattr(tool_call, 'result') and tool_call.result:
                    print(str(tool_call.result).rstrip())
                    found_output = True

            if not found_output:
                print("(No tool result found in tool call info)")
            print("-" * 20)
        else:
            print("No output message or tool output received.")


# main funct
async def main(server_transport: str = 'stdio') -> None:
    """Start the MarkItDown MCP server over stdio and run the interactive loop.

    Spawns the server as a subprocess of the current interpreter, connects a
    CAMEL MCPToolkit to it, builds a Gemini-backed ChatAgent restricted to the
    'convert_to_markdown' tool, and hands control to the interactive loop.

    Args:
        server_transport (str): Transport for the MCP server; only 'stdio' is
            supported by this client.
    """
    if server_transport != 'stdio':
        print("Error: This client currently only supports 'stdio' transport.")
        return

    print("Starting MarkItDown MCP server in stdio mode...")
    # Launch the server with the same interpreter that runs this client.
    server_command = sys.executable
    server_args = ["-m", "markitdown_mcp"]

    # Get the root directory of the script (assuming it's in the project root)
    project_root = Path(__file__).resolve().parent

    # Create an MCPClient instance, adding the cwd
    server = MCPClient(
        command_or_url=server_command,
        args=server_args,
        # PYTHONPATH points the subprocess at the vendored markitdown packages
        # so `-m markitdown_mcp` resolves without a pip install.
        env={"PYTHONPATH": str(project_root / "packages" / "markitdown-mcp" / "src") + os.pathsep + str(project_root / "packages" / "markitdown" / "src") + os.pathsep + os.environ.get("PYTHONPATH", ""), "CWD": str(project_root)},
        # Optional: timeout=None
    )

    # Pass the MCPClient object in a list
    mcp_toolkit = MCPToolkit(servers=[server])

    print("Connecting to MCP server...")
    async with mcp_toolkit.connection() as toolkit:
        print("Connection successful. Retrieving tools...")
        tools = toolkit.get_tools()
        if not tools:
            print("Error: No tools retrieved from the server. Make sure the server started correctly and defined tools.")
            return
        print(f"Tools retrieved: {[tool.func.__name__ for tool in tools]}")

        # Check if the required tool is available using func.__name__
        if not any(tool.func.__name__ == "convert_to_markdown" for tool in tools):
             print("Error: 'convert_to_markdown' tool not found on the server.")
             return

        sys_msg = (
            "You are a helpful assistant. You have access to an external tool called 'convert_to_markdown' which takes a single argument, 'uri'. "
            "When asked to convert a URI to Markdown, you MUST use this tool by providing the URI to the 'uri' parameter. "
            "Provide ONLY the Markdown output received from the tool, without any additional explanation or introductory text."
        )

        # Ensure GOOGLE_API_KEY is set in environment variables
        # print(f"DEBUG: Value of GOOGLE_API_KEY from os.getenv: {os.getenv('GOOGLE_API_KEY')}")
        api_key = os.getenv("GOOGLE_API_KEY") # Check for GOOGLE_API_KEY
        if not api_key:
            print("Error: GOOGLE_API_KEY environment variable not set.") # Update error message
            print("Please set it before running the client.")
            return

        # Configure the model for Google Gemini
        # You might need to install the camel-google extra: pip install camel-ai[google]
        try:
            model = ModelFactory.create(
                model_platform=ModelPlatformType.GEMINI, # Change platform
                # NOTE(review): the original inline comment claimed 1.5 Pro was
                # being used, contradicting the id below — verify this model id
                # is supported by the installed CAMEL version.
                model_type="gemini-2.5-pro-preview-03-25",
                api_key=api_key,
                model_config_dict={"temperature": 0.0, "max_tokens": 8192}, # Adjust config if needed
            )
        except Exception as e:
             print(f"Error creating model: {e}")
             print("Ensure you have the necessary dependencies installed (e.g., `pip install camel-ai[google]`)")
             return

        camel_agent = ChatAgent(
            system_message=sys_msg,
            model=model,
            tools=tools,
        )
        camel_agent.reset()
        # NOTE(review): reset() may already clear agent memory — confirm
        # whether this explicit clear is redundant.
        camel_agent.memory.clear()

        await interactive_input_loop(camel_agent)

if __name__ == "__main__":
    # This client only supports stdio for now; main() rejects other transports.
    asyncio.run(main(server_transport='stdio'))
```

--------------------------------------------------------------------------------
/markitdown_camel/streamlit_markitdown_app.py:
--------------------------------------------------------------------------------

```python
import streamlit as st
import os
import tempfile
from pathlib import Path
import re
import sys
import logging # Import logging

# --- Basic Logging Configuration --- #
logging.basicConfig(
    level=logging.INFO, # Set the logging level (INFO, DEBUG, WARNING, ERROR, CRITICAL)
    format='%(asctime)s - %(levelname)s - %(message)s',
    stream=sys.stderr, # Ensure logs go to stderr (terminal)
)

# --- Dynamically add local package path --- #
# Calculate the path to the 'src' directory of the local 'markitdown' package
_PROJECT_ROOT = Path(__file__).resolve().parent
_MARKITDOWN_SRC_PATH = _PROJECT_ROOT / "packages" / "markitdown" / "src"

# Add the path to sys.path if it's not already there
# (lets the app import the vendored package without a pip install)
if str(_MARKITDOWN_SRC_PATH) not in sys.path:
    sys.path.insert(0, str(_MARKITDOWN_SRC_PATH))
    print(f"DEBUG: Added '{_MARKITDOWN_SRC_PATH}' to sys.path")

# Attempt to import the core MarkItDown class; surface a readable Streamlit
# error (instead of a bare traceback) and halt the app if it fails.
try:
    from markitdown import MarkItDown
    from markitdown._exceptions import MarkItDownException
except ImportError as e:
    logging.error(f"Failed to import markitdown: {e}", exc_info=True)
    st.error(
        f"Failed to import the `markitdown` library or its exceptions. "
        f"Error: {e}\n"
        "Please ensure it is installed correctly (e.g., `pip install -e ./packages/markitdown`) "
        f"and that the path '{_MARKITDOWN_SRC_PATH}' exists and is accessible."
    )
    st.stop()

# --- Page Configuration ---
st.set_page_config(
    page_title="MarkItDown Converter",
    page_icon=":memo:",
    layout="wide",
)

# --- Session State Initialization ---
# Persist conversion results across Streamlit reruns so output survives
# widget interactions.
if 'markdown_output' not in st.session_state:
    st.session_state.markdown_output = None
if 'error_message' not in st.session_state:
    st.session_state.error_message = None
if 'input_uri' not in st.session_state:
    st.session_state.input_uri = ""

# --- Helper Functions ---
def is_valid_uri_scheme(uri):
    """Return True when *uri* uses one of the schemes MarkItDown accepts."""
    supported_prefixes = ("http://", "https://", "file://", "data:")
    return uri.startswith(supported_prefixes)

def clean_filename(filename):
    """Derive a filesystem-safe download filename from a URI or file name."""
    # Strip any leading URL scheme (http:, https:, file:, data:).
    without_scheme = re.sub(r'^(http|https|file|data):[\/]*', '', filename)
    # Collapse each run of characters unsafe in filenames into one underscore.
    safe = re.sub(r'[\/:*?\"<>|%#&.]+', '_', without_scheme)
    # Cap the length; fall back to a default when nothing usable remains.
    truncated = safe[:100]
    return truncated if truncated else "converted"

# --- UI Layout ---
st.title("📝 MarkItDown Content Converter")
st.markdown(
    "Convert content from various sources (URLs or uploaded files) into Markdown."
)
st.divider()

# --- Input Method Selection ---
input_method = st.radio(
    "Select Input Method:",
    ("Enter URL", "Upload File"),
    horizontal=True,
    key="input_method_radio",
    help="Choose whether to provide a web URL or upload a local file."
)

# Wide column for the input widget, narrow column for the action button.
col1, col2 = st.columns([3, 1])

with col1:
    if input_method == "Enter URL":
        st.session_state.input_uri = st.text_input(
            "Enter URL (http://, https://, data:)",
            value=st.session_state.input_uri,
            placeholder="e.g., https://www.example.com or data:text/plain;base64,...",
            key="url_input",
        )
        uploaded_file = None
    else: # Upload File
        uploaded_file = st.file_uploader(
            "Upload a file (will be converted to a `file://` URI)",
            type=None, # Allow any file type MarkItDown might support
            key="file_uploader",
        )
        st.session_state.input_uri = "" # Clear URI input if file is chosen

with col2:
    st.markdown("&nbsp;", unsafe_allow_html=True) # Vertical alignment hack
    st.markdown("&nbsp;", unsafe_allow_html=True)
    convert_button = st.button("Convert to Markdown", type="primary", use_container_width=True)

# --- Conversion Logic ---
# Runs only on the rerun triggered by the button click; the result (or error)
# is stashed in session_state for the output section below.
if convert_button:
    logging.info("'Convert to Markdown' button clicked.")
    st.session_state.markdown_output = None # Clear previous output
    st.session_state.error_message = None # Clear previous error
    final_uri = None
    tmp_file_path = None # Ensure tmp_file_path is defined

    if uploaded_file is not None:
        try:
            # Save uploaded file temporarily to get a path
            # Use a consistent way to create temp files
            with tempfile.NamedTemporaryFile(delete=False, suffix=f"_{uploaded_file.name}") as tmp_file:
                tmp_file.write(uploaded_file.getvalue())
                tmp_file_path = tmp_file.name
            logging.info(f"Uploaded file '{uploaded_file.name}' saved to temporary path: {tmp_file_path}")
            final_uri = Path(tmp_file_path).as_uri()
            logging.info(f"Processing uploaded file as URI: {final_uri}")
            st.info(f"Processing uploaded file as: {final_uri}")
        except Exception as e:
             logging.error(f"Error saving uploaded file: {e}", exc_info=True)
             st.error(f"Error handling uploaded file: {e}")
             final_uri = None # Prevent further processing

    elif st.session_state.input_uri:
        final_uri = st.session_state.input_uri.strip()
        if not is_valid_uri_scheme(final_uri):
             # Attempt to fix common local path issue
            if final_uri.startswith('/'):
                st.warning(f"Assuming '{final_uri}' is a local path. Prepending 'file://'.")
                final_uri = f"file://{final_uri}"
            else:
                st.error(f"Invalid or unsupported URI scheme in '{final_uri}'. Must start with http://, https://, file://, or data:")
                final_uri = None
        logging.info(f"Processing input URI: {final_uri}") # Log the final URI used

    else:
        logging.warning("Conversion attempt with no input URI or file.")
        st.warning("Please enter a URL or upload a file.")

    if final_uri:
        progress_bar = st.progress(0, text="Starting conversion...")
        try:
            logging.info(f"Initializing MarkItDown converter for URI: {final_uri}")
            md_converter = MarkItDown()
            progress_bar.progress(25, text=f"Converting URI: {final_uri[:100]}...")

            # Perform the conversion
            logging.info(f"Calling convert_uri for: {final_uri}")
            result = md_converter.convert_uri(final_uri)
            logging.info(f"convert_uri successful for: {final_uri}")

            progress_bar.progress(100, text="Conversion successful!")
            st.session_state.markdown_output = result.markdown
            st.success("Content successfully converted to Markdown!")

        except MarkItDownException as e:
            logging.error(f"MarkItDown Conversion Error for URI {final_uri}: {e}", exc_info=True)
            st.session_state.error_message = f"MarkItDown Conversion Error: {e}"
            st.error(st.session_state.error_message)
            progress_bar.progress(100, text="Conversion failed.")
        except Exception as e:
            logging.error(f"Unexpected Error during conversion for URI {final_uri}: {e}", exc_info=True)
            st.session_state.error_message = f"An unexpected error occurred: {e}"
            st.error(st.session_state.error_message)
            progress_bar.progress(100, text="Conversion failed.")
        finally:
            # Clean up temporary file if created
            if tmp_file_path and os.path.exists(tmp_file_path):
                try:
                    os.remove(tmp_file_path)
                    logging.info(f"Cleaned up temporary file: {tmp_file_path}")
                except Exception as e:
                     logging.error(f"Error removing temporary file {tmp_file_path}: {e}", exc_info=True)
            # Remove progress bar after completion/error
            progress_bar.empty()

st.divider()

# --- Output Display and Download ---
st.subheader("Output")

if st.session_state.error_message and not st.session_state.markdown_output:
    # Show error prominently if conversion failed and there's no output
    st.error(st.session_state.error_message)
elif st.session_state.markdown_output:
    st.text_area(
        "Generated Markdown",
        value=st.session_state.markdown_output,
        height=400,
        key="markdown_display",
        help="The Markdown generated from the source."
    )

    # Determine a sensible filename (prefer the URI, else the uploaded name)
    download_filename_base = "converted_markdown"
    if st.session_state.input_uri:
        download_filename_base = clean_filename(st.session_state.input_uri)
    elif uploaded_file:
        download_filename_base = clean_filename(uploaded_file.name)

    st.download_button(
        label="Download Markdown (.md)",
        data=st.session_state.markdown_output,
        file_name=f"{download_filename_base}.md",
        mime="text/markdown",
        key="download_button",
        use_container_width=True,
    )
else:
    st.info("Enter a URL or upload a file and click 'Convert to Markdown' to see the output here.")
```

--------------------------------------------------------------------------------
/sql_server/sql_server_mcp.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python
# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. =========
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. =========

import os
import asyncio  # noqa: F401
import sqlite3
import json
import re # Added for table name validation
from mcp.server.fastmcp import FastMCP
from camel.logger import get_logger
from pathlib import Path

logger = get_logger(__name__)
mcp = FastMCP("sqldb")

@mcp.tool()
async def execute_query(connection_string: str, query: str) -> str:
    r"""Executes the SQL query on the given database.

    Args:
        connection_string (str): The connection string or path to the SQLite database.
        query (str): The SQL query to execute.

    Returns:
        str: The result of the query as a JSON string, or an error message if
            execution fails.
    """
    logger.info(f"execute_query triggered with connection_string: {connection_string}")
    # For security reasons, don't log the full query in production
    logger.info(f"Query starts with: {query[:20]}...")

    conn = None
    try:
        conn = sqlite3.connect(connection_string)
        cursor = conn.cursor()
        cursor.execute(query)

        # cursor.description is non-None exactly when the statement produced a
        # result set.  This correctly classifies row-returning statements that
        # do not start with SELECT (e.g. "WITH ... SELECT", "PRAGMA ...",
        # "EXPLAIN ..."), which the previous startswith("SELECT") check
        # misrouted to the write branch.
        if cursor.description is not None:
            columns = [desc[0] for desc in cursor.description]
            results = [dict(zip(columns, row)) for row in cursor.fetchall()]
            return json.dumps({"status": "success", "data": results}, indent=2)

        # Write statements (INSERT, UPDATE, DELETE, DDL): persist and report.
        conn.commit()
        affected_rows = cursor.rowcount
        return json.dumps({"status": "success", "affected_rows": affected_rows}, indent=2)

    except Exception as e:
        return json.dumps({"status": "error", "message": f"Error executing SQL query: {str(e)}"}, indent=2)
    finally:
        if conn:
            conn.close()

# JSON schema advertised to MCP clients for this tool's parameters.
execute_query.inputSchema = {
    "type": "object",
    "properties": {
        "connection_string": {
            "type": "string",
            "title": "Connection String",
            "description": "The connection string or path to the SQLite database.",
        },
        "query": {
            "type": "string",
            "title": "SQL Query",
            "description": "The SQL query to execute.",
        },
    },
    "required": ["connection_string", "query"],
}

@mcp.tool()
async def list_tables(connection_string: str) -> str:
    r"""Lists every table present in the specified database.

    Args:
        connection_string (str): The connection string or path to the SQLite database.

    Returns:
        str: A JSON string containing the list of tables, or an error message if listing fails.
    """
    logger.info(f"list_tables triggered with connection_string: {connection_string}")

    conn = None
    try:
        conn = sqlite3.connect(connection_string)
        # sqlite_master holds one row per schema object; filter to tables.
        rows = conn.execute("SELECT name FROM sqlite_master WHERE type='table';").fetchall()
        table_names = [name for (name,) in rows]
        return json.dumps({"status": "success", "tables": table_names}, indent=2)
    except Exception as e:
        return json.dumps({"status": "error", "message": f"Error listing tables: {str(e)}"}, indent=2)
    finally:
        if conn:
            conn.close()

# JSON schema advertised to MCP clients for this tool's parameters.
list_tables.inputSchema = {
    "type": "object",
    "properties": {
        "connection_string": {
            "type": "string",
            "title": "Connection String",
            "description": "The connection string or path to the SQLite database.",
        },
    },
    "required": ["connection_string"],
}

@mcp.tool()
async def create_database(db_path: str) -> str:
    r"""Creates a new, empty SQLite database file at the specified path.

    Args:
        db_path (str): The path where the new database should be created.

    Returns:
        str: A success message or an error message if creation fails.
    """
    logger.info(f"create_database triggered with db_path: {db_path}")

    try:
        # Refuse to clobber an existing database file.
        if os.path.exists(db_path):
            return json.dumps({"status": "exists", "message": f"Database already exists at {db_path}"}, indent=2)

        # Connecting to a non-existent path makes sqlite create the file.
        sqlite3.connect(db_path).close()
        return json.dumps({"status": "success", "message": f"Database created at {db_path}"}, indent=2)
    except Exception as e:
        return json.dumps({"status": "error", "message": f"Error creating database: {str(e)}"}, indent=2)

# JSON schema advertised to MCP clients for this tool's parameters.
create_database.inputSchema = {
    "type": "object",
    "properties": {
        "db_path": {
            "type": "string",
            "title": "Database Path",
            "description": "The path where the new SQLite database should be created.",
        },
    },
    "required": ["db_path"],
}

@mcp.tool()
async def describe_table(connection_string: str, table_name: str) -> str:
    r"""Describes the schema of a specified table.

    Args:
        connection_string (str): The connection string or path to the SQLite database.
        table_name (str): The name of the table to describe.

    Returns:
        str: A JSON string containing the table schema, or an error message if the operation fails.
    """
    logger.info(f"describe_table triggered with connection_string: {connection_string}, table_name: {table_name}")

    # Only plain identifiers are accepted: PRAGMA takes no bound parameters,
    # so the name is interpolated and must be constrained.
    if not re.match(r"^[a-zA-Z_][a-zA-Z0-9_]*$", table_name):
        return json.dumps({"status": "error", "message": f"Invalid table name: '{table_name}'. Must be a valid SQL identifier."}, indent=2)

    conn = None
    try:
        conn = sqlite3.connect(connection_string)
        rows = conn.execute(f"PRAGMA table_info({table_name});").fetchall()

        # PRAGMA table_info yields one row per column in this fixed order.
        field_names = ("cid", "name", "type", "notnull", "default_value", "pk")
        columns = [dict(zip(field_names, row)) for row in rows]

        if not columns:
             return json.dumps({"status": "not_found", "message": f"Table '{table_name}' not found or has no columns."}, indent=2)
        return json.dumps({"status": "success", "table": table_name, "columns": columns}, indent=2)
    except Exception as e:
        return json.dumps({"status": "error", "message": f"Error describing table '{table_name}': {str(e)}"}, indent=2)
    finally:
        if conn:
            conn.close()

# JSON schema advertised to MCP clients for this tool's parameters.
describe_table.inputSchema = {
    "type": "object",
    "properties": {
        "connection_string": {
            "type": "string",
            "title": "Connection String",
            "description": "The connection string or path to the SQLite database.",
        },
        "table_name": {
            "type": "string",
            "title": "Table Name",
            "description": "The name of the table to describe.",
        },
    },
    "required": ["connection_string", "table_name"],
}

@mcp.tool()
async def delete_database(db_path: str) -> str:
    r"""Deletes an existing SQLite database file at the specified path.

    Args:
        db_path (str): The path to the SQLite database file to be deleted.

    Returns:
        str: A JSON string indicating success or an error message if deletion fails.
    """
    logger.info(f"delete_database triggered with db_path: {db_path}")

    # Report a missing file distinctly rather than treating it as an error.
    if not os.path.exists(db_path):
        return json.dumps({"status": "not_found", "message": f"Database file not found at {db_path}"}, indent=2)

    try:
        os.remove(db_path)
    except Exception as e:
        return json.dumps({"status": "error", "message": f"Error deleting database: {str(e)}"}, indent=2)
    return json.dumps({"status": "success", "message": f"Database deleted from {db_path}"}, indent=2)

# JSON schema advertised to MCP clients for this tool's parameters.
delete_database.inputSchema = {
    "type": "object",
    "properties": {
        "db_path": {
            "type": "string",
            "title": "Database Path",
            "description": "The path to the SQLite database file to be deleted.",
        },
    },
    "required": ["db_path"],
}

@mcp.tool()
async def delete_table(connection_string: str, table_name: str) -> str:
    r"""Deletes a specified table from the SQLite database.

    Args:
        connection_string (str): The connection string or path to the SQLite database.
        table_name (str): The name of the table to delete.

    Returns:
        str: A JSON string indicating success or failure.
    """
    logger.info(f"delete_table triggered for table: {table_name} in db: {connection_string}")

    # Restrict to plain identifiers so the interpolated DROP stays safe.
    if not re.match(r"^[a-zA-Z_][a-zA-Z0-9_]*$", table_name):
        return json.dumps({"status": "error", "message": f"Invalid table name: '{table_name}'. Must be a valid SQL identifier."}, indent=2)

    conn = None
    try:
        conn = sqlite3.connect(connection_string)
        # Look the table up first so a missing table gets a specific message.
        existing = conn.execute(
            "SELECT name FROM sqlite_master WHERE type='table' AND name=?;",
            (table_name,),
        ).fetchone()
        if existing is None:
            return json.dumps({"status": "not_found", "message": f"Table '{table_name}' not found. No action taken."}, indent=2)

        conn.execute(f"DROP TABLE {table_name};")
        conn.commit()
        return json.dumps({"status": "success", "message": f"Table '{table_name}' deleted successfully."}, indent=2)
    except Exception as e:
        return json.dumps({"status": "error", "message": f"Error deleting table '{table_name}': {str(e)}"}, indent=2)
    finally:
        if conn:
            conn.close()

# JSON schema advertised to MCP clients for this tool's parameters.
delete_table.inputSchema = {
    "type": "object",
    "properties": {
        "connection_string": {
            "type": "string",
            "title": "Connection String",
            "description": "The connection string or path to the SQLite database.",
        },
        "table_name": {
            "type": "string",
            "title": "Table Name",
            "description": "The name of the table to delete.",
        },
    },
    "required": ["connection_string", "table_name"],
}

@mcp.tool()
async def get_table_row_count(connection_string: str, table_name: str) -> str:
    r"""Gets the row count for a specified table.

    Args:
        connection_string (str): The connection string or path to the SQLite database.
        table_name (str): The name of the table.

    Returns:
        str: A JSON string containing the table name and its row count, or an error message.
    """
    logger.info(f"get_table_row_count triggered for table: {table_name} in db: {connection_string}")

    # Restrict to plain identifiers so the interpolated COUNT query stays safe.
    if not re.match(r"^[a-zA-Z_][a-zA-Z0-9_]*$", table_name):
        return json.dumps({"status": "error", "message": f"Invalid table name: '{table_name}'. Must be a valid SQL identifier."}, indent=2)

    conn = None
    try:
        conn = sqlite3.connect(connection_string)
        (count,) = conn.execute(f"SELECT COUNT(*) FROM {table_name};").fetchone()
        return json.dumps({"status": "success", "table": table_name, "row_count": count}, indent=2)
    except sqlite3.OperationalError as e: # e.g. "no such table"
        return json.dumps({"status": "error", "message": f"Error getting row count for table '{table_name}': {str(e)}. Ensure table exists."}, indent=2)
    except Exception as e:
        return json.dumps({"status": "error", "message": f"An unexpected error occurred while getting row count for table '{table_name}': {str(e)}"}, indent=2)
    finally:
        if conn:
            conn.close()

# JSON schema advertised to MCP clients for this tool's parameters.
get_table_row_count.inputSchema = {
    "type": "object",
    "properties": {
        "connection_string": {
            "type": "string",
            "title": "Connection String",
            "description": "The connection string or path to the SQLite database.",
        },
        "table_name": {
            "type": "string",
            "title": "Table Name",
            "description": "The name of the table to get row count for.",
        },
    },
    "required": ["connection_string", "table_name"],
}

async def init_db():
    """Create the demo database in the working directory with a sample table."""
    database_file = Path.cwd() / "your_sqlite_database.db"
    connection = sqlite3.connect(str(database_file))
    try:
        connection.execute('''
        CREATE TABLE IF NOT EXISTS sample_table (
            id INTEGER PRIMARY KEY,
            name TEXT NOT NULL
        )
    ''')
        connection.commit()
    finally:
        connection.close()

# NOTE(review): this coroutine is dead code — it is immediately shadowed by
# the synchronous `def main(transport)` defined below, so init_db() is never
# invoked on startup. Consider removing it or renaming one of the two.
async def main():
    await init_db()

    # Keep the server running
    while True:
        await asyncio.sleep(1)

def main(transport: str = "stdio") -> None:
    r"""Runs the SQL MCP Server.

    This server provides SQL database functionalities via MCP.

    NOTE(review): this definition shadows the ``async def main`` declared
    directly above it — confirm which entry point is intended.

    Args:
        transport (str): The transport mode ('stdio' or 'sse').
    """
    if transport == 'stdio':
        mcp.run(transport='stdio')
    elif transport == 'sse':
        mcp.run(transport='sse')
    else:
        # Unknown mode: report and fall through without starting a server.
        print(f"Unknown transport mode: {transport}")

if __name__ == "__main__":
    import sys

    # Pick the transport mode from the first CLI argument (default: stdio).
    transport_mode = sys.argv[1] if len(sys.argv) > 1 else "stdio"
    # `main` resolves to the synchronous runner defined above, so it must be
    # called directly: the previous `asyncio.run(main(transport_mode))` passed
    # its None return value to asyncio.run, which raises
    # "ValueError: a coroutine was expected" and crashed the script at startup.
    main(transport_mode)
```

--------------------------------------------------------------------------------
/google_forms_mcp/google_forms_example_run.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python
# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. =========
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. =========

"""
Example run script for the Google Forms MCP Server

This script demonstrates creating and managing Google Forms through the MCP Server.
It performs a series of operations that test all the functionality of the server.
"""

import asyncio
import os
import sys
import json
from pathlib import Path
from dotenv import load_dotenv
import argparse
from typing import Optional

from camel.agents import ChatAgent
from camel.models import ModelFactory
from camel.toolkits import MCPToolkit
from camel.types import ModelPlatformType

# Load environment variables from .env file
load_dotenv()

# Propagate the Anthropic API key only when it is actually present:
# assigning None into os.environ raises TypeError, crashing the script
# with a confusing message when the key is missing from the .env file.
_anthropic_api_key = os.getenv("ANTHROPIC_API_KEY")
if _anthropic_api_key:
    os.environ["ANTHROPIC_API_KEY"] = _anthropic_api_key
else:
    print("Warning: ANTHROPIC_API_KEY is not set; model calls will fail.")

# Check if the necessary credentials file exists
def check_credentials():
    """Return True when the Google OAuth client file is present.

    When 'credentials.json' is missing, print step-by-step setup
    instructions and return False.
    """
    if os.path.exists('credentials.json'):
        return True

    setup_instructions = (
        "Error: credentials.json not found!",
        "Please follow these steps to set up Google Forms API access:",
        "1. Go to https://console.developers.google.com/",
        "2. Create a new project or select an existing one",
        "3. Enable the Google Forms API, Google Drive API, and Google Sheets API",
        "4. Create credentials (OAuth client ID) for a desktop application",
        "5. Download the credentials JSON file and save it as 'credentials.json' in this directory",
    )
    for line in setup_instructions:
        print(line)
    return False

# Function to print nicely formatted JSON responses
def print_response(title, response):
    """Print *title* followed by *response*, pretty-printing JSON when possible.

    Args:
        title: Heading printed above the response.
        response: Server response; JSON strings are re-indented, anything
            else is printed verbatim.
    """
    print(f"\n{title}:")
    try:
        parsed = json.loads(response)
    # Narrowed from a bare `except:`, which also swallowed KeyboardInterrupt
    # and SystemExit. TypeError covers non-string inputs.
    except (json.JSONDecodeError, TypeError):
        print(response)
    else:
        print(json.dumps(parsed, indent=2))

# Automated test run that demonstrates all features
async def run_automated_test(tools):
    """Exercise every Google Forms MCP Server tool end-to-end.

    Creates a survey form, updates its settings, adds two sections and one
    question of each supported type, lists all forms, sets up response
    export, and fetches the (likely empty) responses, printing each tool's
    JSON reply along the way.

    Args:
        tools: Mapping of tool name -> awaitable tool callable.
            NOTE(review): this function indexes by string, e.g.
            ``tools["create_form"]`` — confirm the caller passes a mapping,
            since some toolkit ``get_tools()`` implementations return a list.

    Raises:
        Exception: any tool failure is printed and then re-raised so the
            caller sees the test as failed.
    """
    print("\n==== Google Forms MCP Server Automated Test ====")
    print("This test will demonstrate all functionalities of the Google Forms MCP Server")

    try:
        # Step 1: Create a new form
        print("\n--- Step 1: Creating a new form ---")
        create_form_response = await tools["create_form"](
            title="Customer Satisfaction Survey",
            description="Help us improve our services by providing your feedback"
        )
        print_response("Form created", create_form_response)

        # Extract form ID for further operations.
        # NOTE(review): assumes the tool returns a JSON string with a
        # "form_id" key — raises KeyError/JSONDecodeError otherwise.
        form_data = json.loads(create_form_response)
        form_id = form_data["form_id"]

        # Step 2: Modify form settings
        print("\n--- Step 2: Modifying form settings ---")
        settings_response = await tools["modify_form_settings"](
            form_id=form_id,
            collect_email=True,
            limit_responses=True
        )
        print_response("Form settings updated", settings_response)

        # Step 3: Add a section
        print("\n--- Step 3: Adding a section ---")
        section_response = await tools["add_section"](
            form_id=form_id,
            title="About Your Experience",
            description="Please tell us about your recent experience with our product/service"
        )
        print_response("Section added", section_response)

        # Step 4: Add multiple choice question
        print("\n--- Step 4: Adding a multiple choice question ---")
        mc_response = await tools["add_multiple_choice"](
            form_id=form_id,
            question_text="How would you rate our service?",
            choices=["Excellent", "Good", "Average", "Poor", "Very Poor"],
            required=True,
            help_text="Please select one option"
        )
        print_response("Multiple choice question added", mc_response)

        # Step 5: Add a checkbox question
        print("\n--- Step 5: Adding a checkbox question ---")
        checkbox_response = await tools["add_checkboxes"](
            form_id=form_id,
            question_text="Which aspects of our service did you appreciate?",
            choices=["Responsiveness", "Quality", "Value for money", "Customer support", "Other"],
            required=False,
            help_text="Select all that apply"
        )
        print_response("Checkbox question added", checkbox_response)

        # Step 6: Add a dropdown question
        print("\n--- Step 6: Adding a dropdown question ---")
        dropdown_response = await tools["add_dropdown"](
            form_id=form_id,
            question_text="How often do you use our service?",
            choices=["Daily", "Weekly", "Monthly", "Quarterly", "Yearly", "First time"],
            required=True
        )
        print_response("Dropdown question added", dropdown_response)

        # Step 7: Add a short answer question
        print("\n--- Step 7: Adding a short answer question ---")
        short_answer_response = await tools["add_short_answer"](
            form_id=form_id,
            question_text="What is your customer ID?",
            required=False,
            help_text="Please enter your customer ID if you have one"
        )
        print_response("Short answer question added", short_answer_response)

        # Step 8: Add a paragraph question
        print("\n--- Step 8: Adding a paragraph question ---")
        paragraph_response = await tools["add_paragraph"](
            form_id=form_id,
            question_text="Do you have any suggestions for improvement?",
            required=False,
            help_text="Please share any ideas on how we can serve you better"
        )
        print_response("Paragraph question added", paragraph_response)

        # Step 9: Add a file upload question
        print("\n--- Step 9: Adding a file upload question ---")
        file_upload_response = await tools["add_file_upload"](
            form_id=form_id,
            question_text="Would you like to upload any relevant documents?",
            required=False,
            help_text="You can upload screenshots or other documents"
        )
        print_response("File upload question added", file_upload_response)

        # Step 10: Add another section
        print("\n--- Step 10: Adding another section ---")
        section2_response = await tools["add_section"](
            form_id=form_id,
            title="Additional Information",
            description="Help us personalize our services"
        )
        print_response("Another section added", section2_response)

        # Step 11: List all forms
        print("\n--- Step 11: Listing all forms ---")
        list_forms_response = await tools["list_forms"]()
        print_response("Forms list", list_forms_response)

        # Step 12: Export responses (might not have any responses yet)
        print("\n--- Step 12: Setting up response export ---")
        export_response = await tools["export_responses"](
            form_id=form_id,
            format="sheets"
        )
        print_response("Export setup", export_response)

        # Step 13: Try to get responses (likely empty, but tests the functionality)
        print("\n--- Step 13: Getting responses ---")
        responses = await tools["get_responses"](
            form_id=form_id
        )
        print_response("Form responses", responses)

        print("\n=== Test completed successfully! ===")
        print(f"Created form can be viewed at: {form_data['view_url']}")
        print(f"Created form can be edited at: {form_data['edit_url']}")

    except Exception as e:
        print(f"\nError during test: {str(e)}")
        raise

# Interactive mode function to chat with the agent
async def interactive_input_loop(agent: ChatAgent):
    """Read user queries from stdin and relay them to the agent until 'exit'.

    Blocking ``input()`` calls are dispatched to the default executor so the
    event loop stays responsive while waiting for the user.

    Args:
        agent: The chat agent used to answer each query.
    """
    # get_running_loop() is the correct call inside a coroutine;
    # get_event_loop() is deprecated for this use since Python 3.10.
    loop = asyncio.get_running_loop()
    print("\n==== Google Forms Assistant Interactive Mode ====")
    print("Type 'exit' at any prompt to quit.")
    print("\nSample queries you can try:")
    print("- Create a new feedback form")
    print("- Add a customer satisfaction survey with multiple choice questions")
    print("- List all my forms")
    print("- Create a job application form with sections for personal info, education, and experience")
    print("- Export responses from a form to CSV")
    print("======================================")

    while True:
        query = await loop.run_in_executor(
            None,
            input,
            "\nEnter your query (or type 'exit' to quit): "
        )

        if query.lower() == 'exit':
            print("Exiting interactive mode.")
            break

        print("\nProcessing query...")
        response = await agent.astep(query)

        print("\nAgent Response:")
        if response.msgs and response.msgs[0].content:
            print(response.msgs[0].content.rstrip())
        else:
            print("No output received.")

# Main function to run the entire example
async def main(server_transport: str = 'stdio', mode: str = 'automated', 
             server_url: Optional[str] = None):
    """Set up the MCP connection, build the agent, and run the chosen mode.

    Args:
        server_transport: 'stdio' spawns the server script as a subprocess;
            'sse' connects to an already-running HTTP/SSE server.
        mode: 'automated' runs the scripted end-to-end test;
            'interactive' starts the stdin chat loop.
        server_url: Base URL of the SSE server; required when
            server_transport == 'sse'.
    """
    # First check if credentials exist
    if not check_credentials():
        return
        
    mcp_toolkit = None
    server_process = None
    
    try: # Wrap setup and execution in try
        # Configure based on transport type
        if server_transport == 'stdio':
            # Original stdio logic (may still deadlock, but kept for reference)
            print("Using stdio transport (Note: May cause deadlock during auth)")
            current_dir = Path(__file__).resolve().parent
            server_script_path = current_dir / "google_forms_server_mcp.py"
            
            print(f"Looking for server script at: {server_script_path}")
            print(f"Directory contents: {[f.name for f in current_dir.iterdir() if f.is_file()]}")
            
            if not server_script_path.is_file():
                print(f"Error: Server script not found at {server_script_path}")
                return
                
            # Use the _MCPServer helper for stdio 
            # NOTE(review): _MCPServer is a private camel API — confirm it
            # still exists in the installed camel version.
            from camel.toolkits.mcp_toolkit import _MCPServer # Import locally
            server_process = _MCPServer([sys.executable, str(server_script_path), "stdio"])
            await server_process.start()
            mcp_toolkit = MCPToolkit(mcp_server_process=server_process)
            print("MCP Server started via stdio.")
            
        elif server_transport == 'sse':
            # SSE logic: Connect to an existing HTTP/SSE server
            if not server_url:
                print("Error: --server-url is required for SSE transport.")
                print("Example: python google_forms_example_run.py --transport sse --server-url http://127.0.0.1:8000")
                return
                
            print(f"Connecting to MCP Server via SSE at: {server_url}")
            mcp_toolkit = MCPToolkit(servers=[server_url])
            # Move the connection test inside the 'async with' block
            
        else:
            print(f"Error: Unsupported server transport: {server_transport}")
            return

        # Initialize the LLM model
        # Reverting to model_config_dict based on user example, and adding api_key explicitly
        anthropic_api_key = os.getenv("ANTHROPIC_API_KEY")
        if not anthropic_api_key:
             print("Error: ANTHROPIC_API_KEY not found in environment variables or .env file.")
             # Decide how to handle missing key - raise error or return
             return # Or raise ValueError("Missing Anthropic API Key")

        model = ModelFactory.create(
            model_platform=ModelPlatformType.ANTHROPIC,
            model_type="claude-3-haiku-20240307", # Use a suitable Anthropic model
            # temperature=0.0 # Replaced by model_config_dict
            api_key=anthropic_api_key, # Explicitly pass API key
            model_config_dict={"temperature": 0.0} # Use the dict from user example
        )

        # Main execution block within the try
        async with mcp_toolkit.connection() as toolkit:
            # Test connection *after* establishing context
            try:
                await toolkit.list_tools() # Use toolkit here
                print("Successfully connected to WebSocket MCP server and listed tools.")
            except Exception as e:
                # NOTE(review): this error path mentions server_url even when
                # the stdio transport was selected — message may be misleading.
                print(f"Error testing connection/listing tools with server at {server_url}: {e}")
                print("Please ensure the server is running correctly.")
                print(f"Run: python google_forms_server_mcp.py --transport sse")
                return # Exit if connection test fails

            print("\nInitializing ChatAgent...")
            # Initialize ChatAgent with the MCP toolkit
            camel_agent = ChatAgent(
                model=model,
                tools=toolkit.get_tools(),
                verbose=True
            )
            print("ChatAgent initialized.")
            
            # Choose mode: automated test or interactive loop
            if mode == 'automated':
                await run_automated_test(toolkit.get_tools())
            elif mode == 'interactive':
                await interactive_input_loop(camel_agent)

    finally: # Finally block associated with the outer try
        print("\nCleaning up...")
        # Clean up resources only if server was started by this script (stdio mode)
        if server_process:
            print("Stopping stdio MCP server process...")
            try:
                await server_process.stop()
                print("Server process stopped.")
            except Exception as e:
                 print(f"Error stopping server process: {e}")
        # For WebSocket, we assume the server runs independently
        print("Cleanup complete.")

# Command-line entry point (kept at module bottom so the file stays importable).
if __name__ == "__main__":
    arg_parser = argparse.ArgumentParser(description="Google Forms MCP Example Runner")
    arg_parser.add_argument(
        "--mode",
        default="automated",
        choices=["automated", "interactive"],
        help="Run mode (default: automated)",
    )
    arg_parser.add_argument(
        "--transport",
        default="sse",  # Default to SSE for client too
        choices=["stdio", "sse"],  # Only allow stdio or sse
        help="Server transport method (default: sse)",
    )
    arg_parser.add_argument(
        "--server-url",
        default="http://127.0.0.1:8000",  # Default SSE URL (HTTP)
        help="URL of the running MCP SSE server (required for sse transport)",
    )
    cli_args = arg_parser.parse_args()

    try:
        asyncio.run(main(server_transport=cli_args.transport, mode=cli_args.mode, server_url=cli_args.server_url))
    except KeyboardInterrupt:
        print("\nExecution interrupted by user.")
```

--------------------------------------------------------------------------------
/google_forms_mcp/test_google_forms_server_mcp.py:
--------------------------------------------------------------------------------

```python
import asyncio  # noqa: F401
import os
import json
import pytest
from unittest.mock import Mock, patch, MagicMock
import sys
sys.path.insert(0, os.path.abspath(os.path.dirname(__file__)))

# Import the async tools to test
from google_forms_server_mcp import (
    create_form,
    add_section,
    add_short_answer,
    add_paragraph,
    add_multiple_choice,
    add_checkboxes,
    add_dropdown,
    add_file_upload,
    modify_form_settings,
    get_responses,
    export_responses,
    list_forms,
)

# Fixture: a fake MCP request context wired with mocked Google API services.
@pytest.fixture
def mock_google_services():
    """Return a mock context whose lifespan exposes forms/drive/sheets services."""
    lifespan = MagicMock()
    lifespan.form_service = MagicMock()
    lifespan.drive_service = MagicMock()
    lifespan.sheets_service = MagicMock()

    ctx = MagicMock()
    ctx.lifespan_context = lifespan
    return ctx

@pytest.mark.asyncio
async def test_create_form(mock_google_services):
    """create_form should call forms().create and summarize the new form."""
    fid = "abc123formid"
    forms_api = mock_google_services.lifespan_context.form_service.forms.return_value
    forms_api.create.return_value.execute.return_value = {"formId": fid}

    title, description = "Test Form", "A test form"
    payload = json.loads(
        await create_form(title=title, description=description, ctx=mock_google_services)
    )

    # Returned summary
    assert payload["form_id"] == fid
    assert payload["title"] == title
    assert payload["description"] == description
    assert "edit_url" in payload
    assert "view_url" in payload

    # Request body sent to the Forms API
    forms_api.create.assert_called_once()
    body = forms_api.create.call_args[1]["body"]
    assert body["info"]["title"] == title
    assert body["info"]["description"] == description

@pytest.mark.asyncio
async def test_add_section(mock_google_services):
    """add_section should fetch the form, then append a pageBreakItem."""
    fid = "abc123formid"
    forms_api = mock_google_services.lifespan_context.form_service.forms.return_value
    forms_api.get.return_value.execute.return_value = {
        "formId": fid,
        "items": [{"item1": "data"}],
    }
    forms_api.batchUpdate.return_value.execute.return_value = {"success": True}

    title, description = "Test Section", "A test section"
    payload = json.loads(await add_section(
        form_id=fid,
        title=title,
        description=description,
        ctx=mock_google_services,
    ))

    # Returned summary
    assert payload["form_id"] == fid
    assert payload["section_added"] == title
    assert payload["status"] == "success"

    # API interaction: one get, one batchUpdate creating a page break item
    forms_api.get.assert_called_once_with(formId=fid)
    forms_api.batchUpdate.assert_called_once()
    item = forms_api.batchUpdate.call_args[1]["body"]["requests"][0]["createItem"]["item"]
    assert item["title"] == title
    assert item["description"] == description
    assert "pageBreakItem" in item

@pytest.mark.asyncio
async def test_modify_form_settings(mock_google_services):
    """modify_form_settings should patch collectEmail via batchUpdate."""
    fid = "abc123formid"
    forms_api = mock_google_services.lifespan_context.form_service.forms.return_value
    forms_api.get.return_value.execute.return_value = {
        "formId": fid,
        "settings": {"collectEmail": False},
    }
    forms_api.batchUpdate.return_value.execute.return_value = {"success": True}

    collect_email = True
    payload = json.loads(await modify_form_settings(
        form_id=fid,
        collect_email=collect_email,
        ctx=mock_google_services,
    ))

    # Returned summary
    assert payload["form_id"] == fid
    assert payload["settings_updated"] == True
    assert payload["collect_email"] == collect_email

    # API interaction: settings update with a targeted updateMask
    forms_api.get.assert_called_once_with(formId=fid)
    forms_api.batchUpdate.assert_called_once()
    settings_req = forms_api.batchUpdate.call_args[1]["body"]["requests"][0]["updateSettings"]
    assert settings_req["settings"]["collectEmail"] == collect_email
    assert settings_req["updateMask"] == "collectEmail"

@pytest.mark.asyncio
async def test_add_short_answer(mock_google_services):
    """add_short_answer should create a non-paragraph textQuestion."""
    fid = "abc123formid"
    forms_api = mock_google_services.lifespan_context.form_service.forms.return_value
    forms_api.batchUpdate.return_value.execute.return_value = {"success": True}

    question_text = "What is your name?"
    required = True
    help_text = "Please enter your full name"
    payload = json.loads(await add_short_answer(
        form_id=fid,
        question_text=question_text,
        required=required,
        help_text=help_text,
        ctx=mock_google_services,
    ))

    # Returned summary
    assert payload["form_id"] == fid
    assert payload["question_text"] == question_text
    assert payload["type"] == "short_answer"
    assert payload["required"] == required
    assert payload["status"] == "success"

    # Request sent to the Forms API
    forms_api.batchUpdate.assert_called_once()
    item = forms_api.batchUpdate.call_args[1]["body"]["requests"][0]["createItem"]["item"]
    assert item["title"] == question_text
    assert item["required"] == required
    assert item["description"] == help_text
    assert item["textQuestion"]["paragraph"] == False

@pytest.mark.asyncio
async def test_add_paragraph(mock_google_services):
    """add_paragraph should create a paragraph-style textQuestion."""
    fid = "abc123formid"
    forms_api = mock_google_services.lifespan_context.form_service.forms.return_value
    forms_api.batchUpdate.return_value.execute.return_value = {"success": True}

    question_text = "Tell us about yourself"
    required = False
    help_text = "Write a brief description"
    payload = json.loads(await add_paragraph(
        form_id=fid,
        question_text=question_text,
        required=required,
        help_text=help_text,
        ctx=mock_google_services,
    ))

    # Returned summary
    assert payload["form_id"] == fid
    assert payload["question_text"] == question_text
    assert payload["type"] == "paragraph"
    assert payload["required"] == required
    assert payload["status"] == "success"

    # Request sent to the Forms API
    forms_api.batchUpdate.assert_called_once()
    item = forms_api.batchUpdate.call_args[1]["body"]["requests"][0]["createItem"]["item"]
    assert item["title"] == question_text
    assert item["required"] == required
    assert item["description"] == help_text
    assert item["textQuestion"]["paragraph"] == True

@pytest.mark.asyncio
async def test_add_multiple_choice(mock_google_services):
    """add_multiple_choice should create a RADIO choiceQuestion."""
    fid = "abc123formid"
    forms_api = mock_google_services.lifespan_context.form_service.forms.return_value
    forms_api.batchUpdate.return_value.execute.return_value = {"success": True}

    question_text = "What is your favorite color?"
    choices = ["Red", "Blue", "Green", "Yellow"]
    required = True
    help_text = "Select one option"
    payload = json.loads(await add_multiple_choice(
        form_id=fid,
        question_text=question_text,
        choices=choices,
        required=required,
        help_text=help_text,
        ctx=mock_google_services,
    ))

    # Returned summary
    assert payload["form_id"] == fid
    assert payload["question_text"] == question_text
    assert payload["type"] == "multiple_choice"
    assert payload["choices"] == choices
    assert payload["required"] == required
    assert payload["status"] == "success"

    # Request sent to the Forms API
    forms_api.batchUpdate.assert_called_once()
    item = forms_api.batchUpdate.call_args[1]["body"]["requests"][0]["createItem"]["item"]
    assert item["title"] == question_text
    assert item["required"] == required
    assert item["description"] == help_text

    # Choice configuration: radio buttons, unshuffled, in caller order
    choice_q = item["questionItem"]["question"]["choiceQuestion"]
    assert choice_q["type"] == "RADIO"
    assert [opt["value"] for opt in choice_q["options"]] == choices
    assert choice_q["shuffle"] == False

@pytest.mark.asyncio
async def test_add_checkboxes(mock_google_services):
    """add_checkboxes should create a CHECKBOX choiceQuestion."""
    fid = "abc123formid"
    forms_api = mock_google_services.lifespan_context.form_service.forms.return_value
    forms_api.batchUpdate.return_value.execute.return_value = {"success": True}

    question_text = "Which fruits do you like?"
    choices = ["Apple", "Banana", "Orange", "Strawberry"]
    required = True
    help_text = "Select all that apply"
    payload = json.loads(await add_checkboxes(
        form_id=fid,
        question_text=question_text,
        choices=choices,
        required=required,
        help_text=help_text,
        ctx=mock_google_services,
    ))

    # Returned summary
    assert payload["form_id"] == fid
    assert payload["question_text"] == question_text
    assert payload["type"] == "checkboxes"
    assert payload["choices"] == choices
    assert payload["required"] == required
    assert payload["status"] == "success"

    # Request sent to the Forms API
    forms_api.batchUpdate.assert_called_once()
    item = forms_api.batchUpdate.call_args[1]["body"]["requests"][0]["createItem"]["item"]
    assert item["title"] == question_text
    assert item["required"] == required
    assert item["description"] == help_text

    # Choice configuration: checkboxes, unshuffled, in caller order
    choice_q = item["questionItem"]["question"]["choiceQuestion"]
    assert choice_q["type"] == "CHECKBOX"
    assert [opt["value"] for opt in choice_q["options"]] == choices
    assert choice_q["shuffle"] == False

@pytest.mark.asyncio
async def test_add_dropdown(mock_google_services):
    """add_dropdown should create a DROP_DOWN choiceQuestion."""
    fid = "abc123formid"
    forms_api = mock_google_services.lifespan_context.form_service.forms.return_value
    forms_api.batchUpdate.return_value.execute.return_value = {"success": True}

    question_text = "Select your country"
    choices = ["USA", "Canada", "UK", "Australia"]
    required = True
    help_text = "Select one"
    payload = json.loads(await add_dropdown(
        form_id=fid,
        question_text=question_text,
        choices=choices,
        required=required,
        help_text=help_text,
        ctx=mock_google_services,
    ))

    # Returned summary
    assert payload["form_id"] == fid
    assert payload["question_text"] == question_text
    assert payload["type"] == "dropdown"
    assert payload["choices"] == choices
    assert payload["required"] == required
    assert payload["status"] == "success"

    # Request sent to the Forms API
    forms_api.batchUpdate.assert_called_once()
    item = forms_api.batchUpdate.call_args[1]["body"]["requests"][0]["createItem"]["item"]
    assert item["title"] == question_text
    assert item["required"] == required
    assert item["description"] == help_text

    # Choice configuration: dropdown, unshuffled, in caller order
    choice_q = item["questionItem"]["question"]["choiceQuestion"]
    assert choice_q["type"] == "DROP_DOWN"
    assert [opt["value"] for opt in choice_q["options"]] == choices
    assert choice_q["shuffle"] == False

@pytest.mark.asyncio
async def test_add_file_upload(mock_google_services):
    """add_file_upload should create a fileUploadQuestion item."""
    fid = "abc123formid"
    forms_api = mock_google_services.lifespan_context.form_service.forms.return_value
    forms_api.batchUpdate.return_value.execute.return_value = {"success": True}

    question_text = "Upload your resume"
    required = True
    help_text = "PDF files only please"
    payload = json.loads(await add_file_upload(
        form_id=fid,
        question_text=question_text,
        required=required,
        help_text=help_text,
        ctx=mock_google_services,
    ))

    # Returned summary
    assert payload["form_id"] == fid
    assert payload["question_text"] == question_text
    assert payload["type"] == "file_upload"
    assert payload["required"] == required
    assert payload["status"] == "success"

    # Request sent to the Forms API
    forms_api.batchUpdate.assert_called_once()
    item = forms_api.batchUpdate.call_args[1]["body"]["requests"][0]["createItem"]["item"]
    assert item["title"] == question_text
    assert item["required"] == required
    assert item["description"] == help_text

    # The question must specifically be a file upload question
    assert "fileUploadQuestion" in item["questionItem"]["question"]

@pytest.mark.asyncio
async def test_get_responses(mock_google_services):
    """get_responses should join recorded answers with their question titles."""
    fid = "abc123formid"
    form_title = "Test Form"
    forms_api = mock_google_services.lifespan_context.form_service.forms.return_value

    # Form structure: two questions with known question ids.
    forms_api.get.return_value.execute.return_value = {
        "formId": fid,
        "info": {"title": form_title},
        "items": [
            {
                "title": "What is your name?",
                "questionItem": {"question": {"questionId": "q1"}},
            },
            {
                "title": "What is your age?",
                "questionItem": {"question": {"questionId": "q2"}},
            },
        ],
    }

    # One recorded response answering both questions.
    responses_api = forms_api.responses.return_value
    responses_api.list.return_value.execute.return_value = {
        "responses": [
            {
                "responseId": "resp1",
                "createTime": "2023-04-01T12:00:00Z",
                "answers": {
                    "q1": {"textAnswers": {"answers": [{"value": "John Doe"}]}},
                    "q2": {"textAnswers": {"answers": [{"value": "30"}]}},
                },
            }
        ]
    }

    payload = json.loads(await get_responses(form_id=fid, ctx=mock_google_services))

    # Summary fields
    assert payload["form_id"] == fid
    assert payload["title"] == form_title
    assert payload["response_count"] == 1
    assert len(payload["responses"]) == 1

    # Answers must be keyed by question title, not question id
    first = payload["responses"][0]
    assert first["response_id"] == "resp1"
    assert first["timestamp"] == "2023-04-01T12:00:00Z"
    assert first["answers"].get("What is your name?") == "John Doe"
    assert first["answers"].get("What is your age?") == "30"

    # API interaction
    forms_api.get.assert_called_once_with(formId=fid)
    forms_api.responses.assert_called_once()
    responses_api.list.assert_called_once_with(formId=fid)

@pytest.mark.asyncio
async def test_export_responses(mock_google_services):
    """
    Test that export_responses correctly sets up response export
    """
    form_id = "abc123formid"
    form_title = "Test Form"
    sheet_id = "xyz789sheetid"

    forms_api = mock_google_services.lifespan_context.form_service.forms.return_value

    # The form already has a responder spreadsheet linked via responderUri.
    forms_api.get.return_value.execute.return_value = {
        "formId": form_id,
        "info": {"title": form_title},
        "responderUri": f"https://docs.google.com/spreadsheets/d/{sheet_id}/edit"
    }

    # Exercise both export formats against the same linked form.
    csv_payload = json.loads(
        await export_responses(form_id=form_id, format="csv", ctx=mock_google_services)
    )
    sheets_payload = json.loads(
        await export_responses(form_id=form_id, format="sheets", ctx=mock_google_services)
    )

    # CSV branch: a download link pointing at the linked sheet.
    assert csv_payload["form_id"] == form_id
    assert csv_payload["export_format"] == "csv"
    assert sheet_id in csv_payload["download_link"]

    # Sheets branch: spreadsheet id and link exposed directly.
    assert sheets_payload["form_id"] == form_id
    assert sheets_payload["export_format"] == "sheets"
    assert sheets_payload["spreadsheet_id"] == sheet_id
    assert sheet_id in sheets_payload["spreadsheet_link"]

    # The form metadata must have been fetched for each export.
    forms_api.get.assert_called_with(formId=form_id)

@pytest.mark.asyncio
async def test_export_responses_create_new(mock_google_services):
    """
    Test that export_responses creates a new spreadsheet when none exists
    """
    form_id = "abc123formid"
    form_title = "Test Form"
    new_sheet_id = "new789sheetid"

    ctx = mock_google_services.lifespan_context
    forms_api = ctx.form_service.forms.return_value
    sheets_api = ctx.sheets_service.spreadsheets.return_value

    # Form metadata carries no responderUri, so the tool must create
    # a spreadsheet and link it to the form.
    forms_api.get.return_value.execute.return_value = {
        "formId": form_id,
        "info": {"title": form_title}
    }
    sheets_api.create.return_value.execute.return_value = {"spreadsheetId": new_sheet_id}
    forms_api.batchUpdate.return_value.execute.return_value = {"success": True}

    payload = json.loads(
        await export_responses(form_id=form_id, format="csv", ctx=mock_google_services)
    )

    # Export info should reference the freshly created spreadsheet.
    assert payload["form_id"] == form_id
    assert payload["export_format"] == "csv"
    assert new_sheet_id in payload["download_link"]

    forms_api.get.assert_called_with(formId=form_id)
    sheets_api.create.assert_called_once()

    # The form must have been re-configured to send responses to the new sheet.
    forms_api.batchUpdate.assert_called_once()
    settings = (
        forms_api.batchUpdate.call_args[1]["body"]["requests"][0]
        ["updateSettings"]["settings"]
    )
    assert settings["responseDestination"] == "SPREADSHEET"
    assert settings["spreadsheetId"] == new_sheet_id

@pytest.mark.asyncio
async def test_list_forms(mock_google_services):
    """
    Test that list_forms returns all forms correctly
    """
    # Drive API returns two form files with full metadata.
    drive_files = [
        {
            "id": "form1",
            "name": "Customer Feedback",
            "webViewLink": "https://docs.google.com/forms/d/form1/viewform",
            "createdTime": "2023-01-15T12:00:00Z"
        },
        {
            "id": "form2",
            "name": "Job Application",
            "webViewLink": "https://docs.google.com/forms/d/form2/viewform",
            "createdTime": "2023-02-20T10:30:00Z"
        }
    ]
    drive_api = mock_google_services.lifespan_context.drive_service.files.return_value
    drive_api.list.return_value.execute.return_value = {"files": drive_files}

    payload = json.loads(await list_forms(ctx=mock_google_services))

    # Every Drive entry should be mapped into the tool's output shape.
    assert len(payload["forms"]) == 2
    for listed, source in zip(payload["forms"], drive_files):
        assert listed["id"] == source["id"]
        assert listed["name"] == source["name"]
        assert listed["url"] == source["webViewLink"]
        assert listed["created"] == source["createdTime"]

    # Only Google Forms files should be queried, requesting the fields we render.
    drive_api.list.assert_called_once()
    kwargs = drive_api.list.call_args[1]
    assert kwargs["q"] == "mimeType='application/vnd.google-apps.form'"
    for field in ("id", "name", "webViewLink", "createdTime"):
        assert field in kwargs["fields"]

@pytest.mark.asyncio
async def test_list_forms_empty(mock_google_services):
    """
    Test that list_forms handles the case of no forms
    """
    # Drive API reports no form files at all.
    drive_api = mock_google_services.lifespan_context.drive_service.files.return_value
    drive_api.list.return_value.execute.return_value = {"files": []}

    payload = json.loads(await list_forms(ctx=mock_google_services))

    # The tool should return an empty list plus a human-readable message.
    assert payload["forms"] == []
    assert "message" in payload
    assert payload["message"] == "No forms found"

    drive_api.list.assert_called_once()
```

--------------------------------------------------------------------------------
/google_forms_mcp/google_forms_server_mcp.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python
# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. =========
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. =========

"""
Google Forms MCP Server using FastMCP

This server provides MCP tools for interacting with Google Forms.
"""

import os
import json
import pickle
from typing import List, Optional, Dict, Any
from contextlib import asynccontextmanager
from collections.abc import AsyncIterator
from dataclasses import dataclass

import asyncio  # noqa: F401
from mcp.server.fastmcp import FastMCP, Context
from camel.logger import get_logger

from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request

# Add necessary imports for WebSocket server
import argparse

logger = get_logger(__name__)

# Define the scopes needed for Google Forms API
SCOPES = [
    'https://www.googleapis.com/auth/forms',
    'https://www.googleapis.com/auth/drive',
    'https://www.googleapis.com/auth/spreadsheets'
]

@dataclass
class FormServices:
    """Container for the authenticated Google API clients shared with all
    tools through the MCP lifespan context."""
    form_service: Any    # googleapiclient Forms v1 service (built in app_lifespan)
    drive_service: Any   # googleapiclient Drive v3 service
    sheets_service: Any  # googleapiclient Sheets v4 service


# Lifespan handler: authenticates with Google (cached token or interactive
# OAuth flow) and provides the API clients to all tools for the lifetime of
# the server.

@asynccontextmanager
async def app_lifespan(server: FastMCP) -> AsyncIterator[FormServices]:
    """Manage application lifecycle with Google API services.

    Loads cached OAuth credentials from ``token.pickle`` (refreshing them or
    running the installed-app authorization flow as needed), builds the
    Forms, Drive and Sheets clients, and yields them as a
    :class:`FormServices` for the server's lifetime.
    """
    logger.info("Initializing Google API services...")
    creds = None

    # Token file stores the user's access and refresh tokens from a prior run.
    # NOTE(review): pickle.load is unsafe on untrusted data — this file must
    # only ever be written by this process (it is, below).
    if os.path.exists('token.pickle'):
        with open('token.pickle', 'rb') as token:
            creds = pickle.load(token)

    # If there are no valid credentials, refresh or let the user log in.
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            # Blocking network call; tolerable because this runs once at
            # startup, before the server begins handling requests.
            creds.refresh(Request())
        else:
            flow = InstalledAppFlow.from_client_secrets_file(
                'credentials.json', SCOPES)

            # Generate the authorization URL for the manual copy/paste flow.
            auth_url, _ = flow.authorization_url(prompt='consent')
            print("\n\n================================================")
            print("Go to this URL in your browser to authenticate:")
            print(f"{auth_url}")
            print("================================================\n")

            # Read the code in a worker thread so the event loop stays
            # responsive. get_running_loop() is the supported API inside a
            # coroutine; get_event_loop() is deprecated in this context.
            loop = asyncio.get_running_loop()
            auth_code = await loop.run_in_executor(
                None,
                lambda: input("Enter the authorization code: ")
            )

            # Exchange the authorization code for credentials.
            flow.fetch_token(code=auth_code)
            creds = flow.credentials

        # Save the credentials for the next run.
        with open('token.pickle', 'wb') as token:
            pickle.dump(creds, token)

    # Build the three Google API clients the tools rely on.
    form_service = build('forms', 'v1', credentials=creds)
    drive_service = build('drive', 'v3', credentials=creds)
    sheets_service = build('sheets', 'v4', credentials=creds)

    services = FormServices(
        form_service=form_service,
        drive_service=drive_service,
        sheets_service=sheets_service
    )

    logger.info("Google API services initialized successfully")

    try:
        yield services
    finally:
        # googleapiclient service objects need no explicit teardown.
        logger.info("Shutting down Google API services")

# Create the MCP server with lifespan support; app_lifespan yields the
# authenticated Google API clients that every tool reads via ctx.lifespan_context.
mcp = FastMCP(
    "GoogleForms", 
    lifespan=app_lifespan,
    # Optionally add description, version etc.
    # description="MCP Server for Google Forms interaction.", 
    # version="0.1.0"
)

# Form Structure Tools

@mcp.tool()
async def create_form(title: str, description: str = "", ctx: Context = None) -> str:
    """
    Create a new Google Form with title and description

    Args:
        title (str): The title of the form
        description (str): The description of the form

    Returns:
        str: JSON string containing form details including ID; on failure a
        plain error string is returned instead.
    """
    logger.info(f"create_form triggered with title: {title}")

    try:
        form_service = ctx.lifespan_context.form_service

        # Build the form info payload; the document title mirrors the form title.
        info = {"title": title, "documentTitle": title}
        if description:
            info["description"] = description

        created = form_service.forms().create(body={"info": info}).execute()
        new_id = created["formId"]

        details = {
            "form_id": new_id,
            "title": title,
            "description": description,
            "edit_url": f"https://docs.google.com/forms/d/{new_id}/edit",
            "view_url": f"https://docs.google.com/forms/d/{new_id}/viewform"
        }
        return json.dumps(details, indent=2)
    except Exception as e:
        return f"Error creating form: {e}"

# JSON schema advertised to MCP clients for this tool's arguments.
create_form.inputSchema = {
    "type": "object",
    "properties": {
        "title": {
            "type": "string",
            "title": "Form Title",
            "description": "The title of the Google Form"
        },
        "description": {
            "type": "string",
            "title": "Form Description",
            "description": "The description of the Google Form"
        }
    },
    "required": ["title"]
}

@mcp.tool()
async def add_section(form_id: str, title: str, description: str = "", ctx: Context = None) -> str:
    """
    Add a section to a Google Form

    Args:
        form_id (str): The ID of the form
        title (str): The title of the section
        description (str): The description of the section

    Returns:
        str: JSON string containing the updated form details; on failure a
        plain error string is returned instead.
    """
    logger.info(f"add_section triggered with form_id: {form_id}, title: {title}")

    try:
        form_service = ctx.lifespan_context.form_service

        # Fetch the current form so the new section is appended at the end.
        current = form_service.forms().get(formId=form_id).execute()
        insert_at = len(current.get("items", []))

        # A section is a pageBreakItem in the Forms API.
        body = {
            "requests": [
                {
                    "createItem": {
                        "item": {
                            "title": title,
                            "description": description,
                            "pageBreakItem": {}
                        },
                        "location": {"index": insert_at}
                    }
                }
            ]
        }

        form_service.forms().batchUpdate(formId=form_id, body=body).execute()

        return json.dumps({
            "form_id": form_id,
            "section_added": title,
            "status": "success"
        }, indent=2)
    except Exception as e:
        return f"Error adding section: {e}"

# JSON schema advertised to MCP clients for this tool's arguments.
add_section.inputSchema = {
    "type": "object",
    "properties": {
        "form_id": {
            "type": "string",
            "title": "Form ID",
            "description": "The ID of the Google Form"
        },
        "title": {
            "type": "string",
            "title": "Section Title",
            "description": "The title of the section"
        },
        "description": {
            "type": "string",
            "title": "Section Description",
            "description": "The description of the section"
        }
    },
    "required": ["form_id", "title"]
}

@mcp.tool()
async def modify_form_settings(
    form_id: str, 
    collect_email: Optional[bool] = None,
    limit_responses: Optional[bool] = None,
    response_limit: Optional[int] = None,
    ctx: Context = None
) -> str:
    """
    Modify Google Form settings
    
    Args:
        form_id (str): The ID of the form
        collect_email (bool, optional): Whether to collect email addresses
        limit_responses (bool, optional): Whether to limit responses
        response_limit (int, optional): Maximum number of responses allowed
    
    Returns:
        str: JSON string containing the updated form settings; on failure a
        plain error string is returned instead.
    """
    logger.info(f"modify_form_settings triggered with form_id: {form_id}")
    
    try:
        services = ctx.lifespan_context
        form_service = services.form_service
        
        # Get the current form (result is unused beyond confirming the form
        # exists — a bad form_id raises here before any update is attempted)
        form = form_service.forms().get(formId=form_id).execute()
        
        updates = []
        
        # Update collect email setting
        if collect_email is not None:
            updates.append({
                "updateSettings": {
                    "settings": {
                        "collectEmail": collect_email
                    },
                    "updateMask": "collectEmail"
                }
            })
        
        # Update response limit settings
        # NOTE(review): limit_responses=False toggles "isQuiz", which is an
        # unrelated quiz setting, and response_limit is never sent to the
        # API (only limitOneResponsePerUser is set). Confirm this mapping
        # against the Forms API settings schema — it looks unintentional.
        if limit_responses is not None and not limit_responses:
            updates.append({
                "updateSettings": {
                    "settings": {
                        "isQuiz": False
                    },
                    "updateMask": "isQuiz"
                }
            })
        elif limit_responses and response_limit:
            updates.append({
                "updateSettings": {
                    "settings": {
                        "limitOneResponsePerUser": True
                    },
                    "updateMask": "limitOneResponsePerUser"
                }
            })
        
        # Execute the updates if any
        if updates:
            update_request = {"requests": updates}
            updated_form = form_service.forms().batchUpdate(
                formId=form_id, body=update_request).execute()
        
        # Echo back the requested settings (not the server-side state).
        return json.dumps({
            "form_id": form_id,
            "settings_updated": True,
            "collect_email": collect_email,
            "limit_responses": limit_responses,
            "response_limit": response_limit
        }, indent=2)
    except Exception as e:
        return f"Error modifying form settings: {e}"

# JSON schema advertised to MCP clients for this tool's arguments.
modify_form_settings.inputSchema = {
    "type": "object",
    "properties": {
        "form_id": {
            "type": "string",
            "title": "Form ID",
            "description": "The ID of the Google Form"
        },
        "collect_email": {
            "type": "boolean",
            "title": "Collect Email",
            "description": "Whether to collect email addresses"
        },
        "limit_responses": {
            "type": "boolean",
            "title": "Limit Responses",
            "description": "Whether to limit responses"
        },
        "response_limit": {
            "type": "integer",
            "title": "Response Limit",
            "description": "Maximum number of responses allowed"
        }
    },
    "required": ["form_id"]
}

# Question Type Tools

@mcp.tool()
async def add_short_answer(
    form_id: str, 
    question_text: str, 
    required: bool = False,
    help_text: str = "",
    ctx: Context = None
) -> str:
    """
    Add a short answer question to a Google Form
    
    Args:
        form_id (str): The ID of the form
        question_text (str): The text of the question
        required (bool, optional): Whether the question is required
        help_text (str, optional): Help text for the question
    
    Returns:
        str: JSON string containing the question details; on failure a
        plain error string is returned instead.
    """
    logger.info(f"add_short_answer triggered with form_id: {form_id}, question: {question_text}")
    
    try:
        services = ctx.lifespan_context
        form_service = services.form_service
        
        # The Forms API expects the question nested under
        # item.questionItem.question, with "required" on the Question —
        # the previous payload put textQuestion/required directly on the
        # Item, which the API does not accept.
        question_item = {
            "title": question_text,
            "questionItem": {
                "question": {
                    "required": required,
                    "textQuestion": {
                        "paragraph": False  # False => single-line short answer
                    }
                }
            }
        }
        
        if help_text:
            question_item["description"] = help_text
        
        update_request = {
            "requests": [
                {
                    "createItem": {
                        "item": question_item,
                        "location": {
                            "index": 0  # insert at the top of the form
                        }
                    }
                }
            ]
        }
        
        form_service.forms().batchUpdate(
            formId=form_id, body=update_request).execute()
        
        return json.dumps({
            "form_id": form_id,
            "question_text": question_text,
            "type": "short_answer",
            "required": required,
            "status": "success"
        }, indent=2)
    except Exception as e:
        return f"Error adding short answer question: {e}"

# JSON schema advertised to MCP clients for this tool's arguments.
add_short_answer.inputSchema = {
    "type": "object",
    "properties": {
        "form_id": {
            "type": "string",
            "title": "Form ID",
            "description": "The ID of the Google Form"
        },
        "question_text": {
            "type": "string",
            "title": "Question Text",
            "description": "The text of the question"
        },
        "required": {
            "type": "boolean",
            "title": "Required",
            "description": "Whether the question is required"
        },
        "help_text": {
            "type": "string",
            "title": "Help Text",
            "description": "Help text for the question"
        }
    },
    "required": ["form_id", "question_text"]
}

@mcp.tool()
async def add_paragraph(
    form_id: str, 
    question_text: str, 
    required: bool = False,
    help_text: str = "",
    ctx: Context = None
) -> str:
    """
    Add a paragraph question to a Google Form
    
    Args:
        form_id (str): The ID of the form
        question_text (str): The text of the question
        required (bool, optional): Whether the question is required
        help_text (str, optional): Help text for the question
    
    Returns:
        str: JSON string containing the question details; on failure a
        plain error string is returned instead.
    """
    logger.info(f"add_paragraph triggered with form_id: {form_id}, question: {question_text}")
    
    try:
        services = ctx.lifespan_context
        form_service = services.form_service
        
        # The Forms API expects the question nested under
        # item.questionItem.question, with "required" on the Question —
        # the previous payload put textQuestion/required directly on the
        # Item, which the API does not accept.
        question_item = {
            "title": question_text,
            "questionItem": {
                "question": {
                    "required": required,
                    "textQuestion": {
                        "paragraph": True  # True => multi-line paragraph answer
                    }
                }
            }
        }
        
        if help_text:
            question_item["description"] = help_text
        
        update_request = {
            "requests": [
                {
                    "createItem": {
                        "item": question_item,
                        "location": {
                            "index": 0  # insert at the top of the form
                        }
                    }
                }
            ]
        }
        
        form_service.forms().batchUpdate(
            formId=form_id, body=update_request).execute()
        
        return json.dumps({
            "form_id": form_id,
            "question_text": question_text,
            "type": "paragraph",
            "required": required,
            "status": "success"
        }, indent=2)
    except Exception as e:
        return f"Error adding paragraph question: {e}"

# JSON schema advertised to MCP clients for this tool's arguments.
add_paragraph.inputSchema = {
    "type": "object",
    "properties": {
        "form_id": {
            "type": "string",
            "title": "Form ID",
            "description": "The ID of the Google Form"
        },
        "question_text": {
            "type": "string",
            "title": "Question Text",
            "description": "The text of the question"
        },
        "required": {
            "type": "boolean",
            "title": "Required",
            "description": "Whether the question is required"
        },
        "help_text": {
            "type": "string",
            "title": "Help Text",
            "description": "Help text for the question"
        }
    },
    "required": ["form_id", "question_text"]
}

@mcp.tool()
async def add_multiple_choice(
    form_id: str, 
    question_text: str, 
    choices: List[str],
    required: bool = False,
    help_text: str = "",
    ctx: Context = None
) -> str:
    """
    Add a multiple choice question to a Google Form
    
    Args:
        form_id (str): The ID of the form
        question_text (str): The text of the question
        choices (List[str]): List of choices for the multiple choice question
        required (bool, optional): Whether the question is required
        help_text (str, optional): Help text for the question
    
    Returns:
        str: JSON string containing the question details; on failure a
        plain error string is returned instead.
    """
    logger.info(f"add_multiple_choice triggered with form_id: {form_id}, question: {question_text}")
    
    try:
        services = ctx.lifespan_context
        form_service = services.form_service
        
        # Create choices objects
        choice_items = [{"value": choice} for choice in choices]
        
        # "required" belongs on the Question object, not the Item —
        # the Forms API rejects unknown Item fields.
        question_item = {
            "title": question_text,
            "questionItem": {
                "question": {
                    "required": required,
                    "choiceQuestion": {
                        "type": "RADIO",  # RADIO => single-select multiple choice
                        "options": choice_items,
                        "shuffle": False
                    }
                }
            }
        }
        
        if help_text:
            question_item["description"] = help_text
        
        update_request = {
            "requests": [
                {
                    "createItem": {
                        "item": question_item,
                        "location": {
                            "index": 0  # insert at the top of the form
                        }
                    }
                }
            ]
        }
        
        form_service.forms().batchUpdate(
            formId=form_id, body=update_request).execute()
        
        return json.dumps({
            "form_id": form_id,
            "question_text": question_text,
            "type": "multiple_choice",
            "choices": choices,
            "required": required,
            "status": "success"
        }, indent=2)
    except Exception as e:
        return f"Error adding multiple choice question: {e}"

# JSON schema advertised to MCP clients for this tool's arguments.
add_multiple_choice.inputSchema = {
    "type": "object",
    "properties": {
        "form_id": {
            "type": "string",
            "title": "Form ID",
            "description": "The ID of the Google Form"
        },
        "question_text": {
            "type": "string",
            "title": "Question Text",
            "description": "The text of the question"
        },
        "choices": {
            "type": "array",
            "items": {
                "type": "string"
            },
            "title": "Choices",
            "description": "List of choices for the multiple choice question"
        },
        "required": {
            "type": "boolean",
            "title": "Required",
            "description": "Whether the question is required"
        },
        "help_text": {
            "type": "string",
            "title": "Help Text",
            "description": "Help text for the question"
        }
    },
    "required": ["form_id", "question_text", "choices"]
}

@mcp.tool()
async def add_checkboxes(
    form_id: str, 
    question_text: str, 
    choices: List[str],
    required: bool = False,
    help_text: str = "",
    ctx: Context = None
) -> str:
    """
    Add a checkboxes question to a Google Form
    
    Args:
        form_id (str): The ID of the form
        question_text (str): The text of the question
        choices (List[str]): List of choices for the checkboxes
        required (bool, optional): Whether the question is required
        help_text (str, optional): Help text for the question
    
    Returns:
        str: JSON string containing the question details; on failure a
        plain error string is returned instead.
    """
    logger.info(f"add_checkboxes triggered with form_id: {form_id}, question: {question_text}")
    
    try:
        services = ctx.lifespan_context
        form_service = services.form_service
        
        # Create choices objects
        choice_items = [{"value": choice} for choice in choices]
        
        # "required" belongs on the Question object, not the Item —
        # the Forms API rejects unknown Item fields.
        question_item = {
            "title": question_text,
            "questionItem": {
                "question": {
                    "required": required,
                    "choiceQuestion": {
                        "type": "CHECKBOX",  # CHECKBOX => multi-select
                        "options": choice_items,
                        "shuffle": False
                    }
                }
            }
        }
        
        if help_text:
            question_item["description"] = help_text
        
        update_request = {
            "requests": [
                {
                    "createItem": {
                        "item": question_item,
                        "location": {
                            "index": 0  # insert at the top of the form
                        }
                    }
                }
            ]
        }
        
        form_service.forms().batchUpdate(
            formId=form_id, body=update_request).execute()
        
        return json.dumps({
            "form_id": form_id,
            "question_text": question_text,
            "type": "checkboxes",
            "choices": choices,
            "required": required,
            "status": "success"
        }, indent=2)
    except Exception as e:
        return f"Error adding checkboxes question: {e}"

# JSON schema advertised to MCP clients for this tool's arguments.
add_checkboxes.inputSchema = {
    "type": "object",
    "properties": {
        "form_id": {
            "type": "string",
            "title": "Form ID",
            "description": "The ID of the Google Form"
        },
        "question_text": {
            "type": "string",
            "title": "Question Text",
            "description": "The text of the question"
        },
        "choices": {
            "type": "array",
            "items": {
                "type": "string"
            },
            "title": "Choices",
            "description": "List of choices for the checkboxes"
        },
        "required": {
            "type": "boolean",
            "title": "Required",
            "description": "Whether the question is required"
        },
        "help_text": {
            "type": "string",
            "title": "Help Text",
            "description": "Help text for the question"
        }
    },
    "required": ["form_id", "question_text", "choices"]
}

@mcp.tool()
async def add_dropdown(
    form_id: str, 
    question_text: str, 
    choices: List[str],
    required: bool = False,
    help_text: str = "",
    ctx: Context = None
) -> str:
    """
    Add a dropdown question to a Google Form
    
    Args:
        form_id (str): The ID of the form
        question_text (str): The text of the question
        choices (List[str]): List of choices for the dropdown
        required (bool, optional): Whether the question is required
        help_text (str, optional): Help text for the question
    
    Returns:
        str: JSON string containing the question details; on failure a
        plain error string is returned instead.
    """
    logger.info(f"add_dropdown triggered with form_id: {form_id}, question: {question_text}")
    
    try:
        services = ctx.lifespan_context
        form_service = services.form_service
        
        # Create choices objects
        choice_items = [{"value": choice} for choice in choices]
        
        # "required" belongs on the Question object, not the Item —
        # the Forms API rejects unknown Item fields.
        question_item = {
            "title": question_text,
            "questionItem": {
                "question": {
                    "required": required,
                    "choiceQuestion": {
                        "type": "DROP_DOWN",
                        "options": choice_items,
                        "shuffle": False
                    }
                }
            }
        }
        
        if help_text:
            question_item["description"] = help_text
        
        update_request = {
            "requests": [
                {
                    "createItem": {
                        "item": question_item,
                        "location": {
                            "index": 0  # insert at the top of the form
                        }
                    }
                }
            ]
        }
        
        form_service.forms().batchUpdate(
            formId=form_id, body=update_request).execute()
        
        return json.dumps({
            "form_id": form_id,
            "question_text": question_text,
            "type": "dropdown",
            "choices": choices,
            "required": required,
            "status": "success"
        }, indent=2)
    except Exception as e:
        return f"Error adding dropdown question: {e}"

# JSON schema advertised to MCP clients for this tool's arguments.
add_dropdown.inputSchema = {
    "type": "object",
    "properties": {
        "form_id": {
            "type": "string",
            "title": "Form ID",
            "description": "The ID of the Google Form"
        },
        "question_text": {
            "type": "string",
            "title": "Question Text",
            "description": "The text of the question"
        },
        "choices": {
            "type": "array",
            "items": {
                "type": "string"
            },
            "title": "Choices",
            "description": "List of choices for the dropdown"
        },
        "required": {
            "type": "boolean",
            "title": "Required",
            "description": "Whether the question is required"
        },
        "help_text": {
            "type": "string",
            "title": "Help Text",
            "description": "Help text for the question"
        }
    },
    "required": ["form_id", "question_text", "choices"]
}

@mcp.tool()
async def add_file_upload(
    form_id: str, 
    question_text: str, 
    required: bool = False,
    help_text: str = "",
    ctx: Context = None
) -> str:
    """
    Add a file upload question to a Google Form.
    
    Args:
        form_id (str): The ID of the form
        question_text (str): The text of the question
        required (bool, optional): Whether the question is required
        help_text (str, optional): Help text for the question
    
    Returns:
        str: JSON string containing the question details, or an error
            message string if the API call fails.
    """
    logger.info(f"add_file_upload triggered with form_id: {form_id}, question: {question_text}")
    
    try:
        services = ctx.lifespan_context
        form_service = services.form_service
        
        # In the Forms API, "required" is a field of the Question object,
        # not of the enclosing Item, so it must be nested inside
        # questionItem.question (the original placed it on the Item, where
        # the API ignores or rejects it).
        # NOTE(review): the Forms API may not support creating
        # fileUploadQuestion items via batchUpdate, and folderId=None may be
        # rejected — confirm against the current Forms API reference.
        question_item = {
            "title": question_text,
            "questionItem": {
                "question": {
                    "required": required,
                    "fileUploadQuestion": {
                        "folderId": None  # This will use the default folder
                    }
                }
            }
        }
        
        if help_text:
            question_item["description"] = help_text
        
        update_request = {
            "requests": [
                {
                    "createItem": {
                        "item": question_item,
                        "location": {
                            "index": 0  # insert at the top of the form
                        }
                    }
                }
            ]
        }
        
        form_service.forms().batchUpdate(
            formId=form_id, body=update_request).execute()
        
        return json.dumps({
            "form_id": form_id,
            "question_text": question_text,
            "type": "file_upload",
            "required": required,
            "status": "success"
        }, indent=2)
    except Exception as e:
        return f"Error adding file upload question: {e}"

# JSON Schema describing the arguments accepted by the add_file_upload tool.
add_file_upload.inputSchema = dict(
    type="object",
    properties=dict(
        form_id=dict(
            type="string",
            title="Form ID",
            description="The ID of the Google Form",
        ),
        question_text=dict(
            type="string",
            title="Question Text",
            description="The text of the question",
        ),
        required=dict(
            type="boolean",
            title="Required",
            description="Whether the question is required",
        ),
        help_text=dict(
            type="string",
            title="Help Text",
            description="Help text for the question",
        ),
    ),
    required=["form_id", "question_text"],
)

# Response Management Tools

@mcp.tool()
async def get_responses(form_id: str, ctx: Context = None) -> str:
    """
    Get responses from a Google Form.
    
    Args:
        form_id (str): The ID of the form
    
    Returns:
        str: JSON string containing the form responses, or an error
            message string if the API call fails.
    """
    logger.info(f"get_responses triggered with form_id: {form_id}")
    
    try:
        services = ctx.lifespan_context
        form_service = services.form_service
        
        # Fetch the form so answers can be labelled with question text.
        form = form_service.forms().get(formId=form_id).execute()
        
        # Build a questionId -> title lookup once, instead of rescanning
        # every form item for every answer (was O(questions * answers)).
        question_titles = {}
        for item in form.get("items", []):
            qid = item.get("questionItem", {}).get("question", {}).get("questionId")
            if qid:
                question_titles[qid] = item.get("title", "Unknown Question")
        
        # Get responses
        responses = form_service.forms().responses().list(formId=form_id).execute()
        
        # Simplify the response data for readability
        simplified_responses = []
        for response in responses.get("responses", []):
            answer_data = {}
            for question_id, value in response.get("answers", {}).items():
                question_text = question_titles.get(question_id, "Unknown Question")
                
                # Extract the answer value based on type; guard against empty
                # "answers" lists so a partial response cannot raise IndexError
                # (which the broad except would turn into a misleading error).
                answer_value = None
                if "textAnswers" in value:
                    text_answers = value["textAnswers"].get("answers", [])
                    if text_answers:
                        answer_value = text_answers[0].get("value")
                elif "fileUploadAnswers" in value:
                    answer_value = [file.get("fileId") for file in value["fileUploadAnswers"].get("answers", [])]
                elif "choiceAnswers" in value:
                    answer_value = [choice.get("value") for choice in value["choiceAnswers"].get("answers", [])]
                
                answer_data[question_text] = answer_value
            
            simplified_responses.append({
                "response_id": response.get("responseId"),
                "timestamp": response.get("createTime"),
                "answers": answer_data
            })
        
        return json.dumps({
            "form_id": form_id,
            "title": form.get("info", {}).get("title", ""),
            "response_count": len(responses.get("responses", [])),
            "responses": simplified_responses
        }, indent=2)
    except Exception as e:
        return f"Error getting responses: {e}"

# JSON Schema describing the arguments accepted by the get_responses tool.
get_responses.inputSchema = dict(
    type="object",
    properties=dict(
        form_id=dict(
            type="string",
            title="Form ID",
            description="The ID of the Google Form",
        ),
    ),
    required=["form_id"],
)

@mcp.tool()
async def export_responses(form_id: str, format: str = "csv", ctx: Context = None) -> str:
    """
    Export responses from a Google Form to a spreadsheet or CSV.
    
    Args:
        form_id (str): The ID of the form
        format (str, optional): The format to export (csv or sheets)
    
    Returns:
        str: JSON string containing the export details, or an error
            message string if the API call fails.
    """
    logger.info(f"export_responses triggered with form_id: {form_id}, format: {format}")
    
    try:
        services = ctx.lifespan_context
        form_service = services.form_service
        drive_service = services.drive_service
        sheets_service = services.sheets_service
        
        # Get the form once (the original fetched it twice).
        form = form_service.forms().get(formId=form_id).execute()
        
        # Use the Form resource's linkedSheetId to detect an existing
        # response spreadsheet. The original parsed responderUri, but that
        # URI is the form's public fill-out URL — split("/")[5] yielded the
        # literal path segment "e", never a spreadsheet ID.
        response_sheet_id = form.get("linkedSheetId")
        
        if not response_sheet_id:
            # Create a new spreadsheet for responses
            spreadsheet_body = {
                'properties': {
                    'title': f"Responses for {form.get('info', {}).get('title', 'Form')}"
                }
            }
            sheet = sheets_service.spreadsheets().create(body=spreadsheet_body).execute()
            response_sheet_id = sheet.get('spreadsheetId')
            
            # Attempt to link the form to the spreadsheet.
            # NOTE(review): the Forms API batchUpdate may not accept an
            # "updateSettings" request with responseDestination — if it
            # rejects it, the outer except returns the error string, same
            # best-effort behavior as before. Confirm against the Forms API.
            update_request = {
                "requests": [
                    {
                        "updateSettings": {
                            "settings": {
                                "responseDestination": "SPREADSHEET",
                                "spreadsheetId": response_sheet_id
                            },
                            "updateMask": "responseDestination,spreadsheetId"
                        }
                    }
                ]
            }
            form_service.forms().batchUpdate(formId=form_id, body=update_request).execute()
        
        # For CSV format, create a link to download as CSV
        if format.lower() == "csv":
            csv_export_link = f"https://docs.google.com/spreadsheets/d/{response_sheet_id}/export?format=csv"
            return json.dumps({
                "form_id": form_id,
                "export_format": "csv",
                "download_link": csv_export_link
            }, indent=2)
        else:
            # For sheets format, return the spreadsheet link
            sheets_link = f"https://docs.google.com/spreadsheets/d/{response_sheet_id}/edit"
            return json.dumps({
                "form_id": form_id,
                "export_format": "sheets",
                "spreadsheet_id": response_sheet_id,
                "spreadsheet_link": sheets_link
            }, indent=2)
    except Exception as e:
        return f"Error exporting responses: {e}"

# JSON Schema describing the arguments accepted by the export_responses tool.
export_responses.inputSchema = dict(
    type="object",
    properties=dict(
        form_id=dict(
            type="string",
            title="Form ID",
            description="The ID of the Google Form",
        ),
        format=dict(
            type="string",
            title="Export Format",
            description="The format to export (csv or sheets)",
            enum=["csv", "sheets"],
        ),
    ),
    required=["form_id"],
)

@mcp.tool()
async def list_forms(ctx: Context = None) -> str:
    """
    List all Google Forms created by the user.
    
    Returns:
        str: JSON string containing the list of forms, or an error
            message string if the API call fails.
    """
    logger.info("list_forms triggered")
    
    try:
        services = ctx.lifespan_context
        drive_service = services.drive_service
        
        # Page through the full result set. The original read only the first
        # page of files().list, silently truncating for users with many forms.
        forms = []
        page_token = None
        while True:
            results = drive_service.files().list(
                q="mimeType='application/vnd.google-apps.form'",
                spaces='drive',
                fields='nextPageToken, files(id, name, webViewLink, createdTime)',
                pageToken=page_token
            ).execute()
            forms.extend(results.get('files', []))
            page_token = results.get('nextPageToken')
            if not page_token:
                break
        
        if not forms:
            return json.dumps({"forms": [], "message": "No forms found"}, indent=2)
        
        # Format the forms list
        forms_list = [
            {
                "id": form.get('id'),
                "name": form.get('name'),
                "url": form.get('webViewLink'),
                "created": form.get('createdTime')
            }
            for form in forms
        ]
        
        return json.dumps({"forms": forms_list}, indent=2)
    except Exception as e:
        return f"Error listing forms: {e}"

# JSON Schema for the list_forms tool: it takes no arguments.
list_forms.inputSchema = dict(
    type="object",
    properties={},
    required=[],
)

# Update the main function to handle different transports using mcp.run
def main(transport: str = "stdio", host: str = "127.0.0.1", port: int = 8000):
    """Start the MCP server using the specified transport via mcp.run().
    
    Args:
        transport (str): Transport method ("stdio" or "sse"); validated by
            mcp.run itself.
        host (str): Host address. Currently unused here — kept for CLI
            compatibility; FastMCP reads network settings from its own
            configuration.
        port (int): Port. Currently unused here, see ``host``.
    """
    logger.info(f"Starting MCP server with {transport} transport...")
    try:
        # Every branch of the original if/elif/else — and the AttributeError
        # retry — executed the identical call, so they collapse to one line.
        # mcp.run validates the transport value itself.
        mcp.run(transport=transport)
    except Exception as e:
        logger.error(f"Failed to start server with mcp.run: {e}")
        
# Add argument parsing for command-line execution
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Google Forms MCP Server")
    parser.add_argument(
        "--transport", 
        default="sse",  # Default to Server-Sent Events
        choices=["stdio", "sse"], # Only allow stdio or sse
        help="Server transport method (default: sse)"
    )
    # Help text fixed: it said "WebSocket server", but only stdio/sse
    # transports are allowed — the stale wording misled users.
    parser.add_argument(
        "--host", 
        default="127.0.0.1", 
        help="Host address for the SSE server (default: 127.0.0.1)"
    )
    parser.add_argument(
        "--port", 
        type=int, 
        default=8000, 
        help="Port for the SSE server (default: 8000)"
    )
    args = parser.parse_args()
    
    main(transport=args.transport, host=args.host, port=args.port)
```