# Directory Structure
```
├── filesystem_server
│   ├── example_filesystem_server.py
│   ├── filesystem_server_mcp.py
│   └── test_filesystem_server.py
├── google_forms_mcp
│   ├── __init__.py
│   ├── google_forms_example_run.py
│   ├── google_forms_server_mcp.py
│   └── test_google_forms_server_mcp.py
├── markitdown_camel
│   ├── camel_markitdown_client.py
│   ├── how_to_run.md
│   └── streamlit_markitdown_app.py
├── README.md
├── sql_server
│   ├── sql_example_run.py
│   ├── sql_mcp_test.py
│   └── sql_server_mcp.py
└── ssss.png
```

# Files

--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------

```markdown
1 | # MCP-Servers
2 | 
3 | This repo contains MCP servers for various use cases, each paired with a CAMEL-AI usage example.
4 | 
5 | 1. [Filesystem Server](https://github.com/parthshr370/MCP-Servers/tree/main/filesystem_server)
6 | 2. [SQL Server](https://github.com/parthshr370/MCP-Servers/tree/main/sql_server)
7 | 3. [Markdown Server with note-taking use case](https://github.com/parthshr370/Md-Notes-Buddy) and [Normal Doc to Md](https://github.com/parthshr370/MCP-Servers/tree/main/markitdown_camel)
8 | 
```

--------------------------------------------------------------------------------
/google_forms_mcp/__init__.py:
--------------------------------------------------------------------------------

```python
1 | 
```

--------------------------------------------------------------------------------
/filesystem_server/test_filesystem_server.py:
--------------------------------------------------------------------------------

```python
1 | import asyncio  # noqa: F401
2 | import os
3 | import tempfile
4 | 
5 | import pytest
6 | 
7 | # Import the async tools you want to test.
8 | # Adjust the import path if necessary.
9 | from filesystem_server_mcp import (
10 |     list_directory,
11 |     read_file,
12 | )
13 | 
14 | 
15 | @pytest.mark.asyncio
16 | async def test_read_file_success():
17 |     """
18 |     Test that read_file returns the correct file contents when given a valid file.
19 |     """
20 |     # Create a temporary file with known content.
21 |     with tempfile.NamedTemporaryFile(mode="w+", delete=False) as tmp:
22 |         tmp.write("Hello, Camel AI!\n")
23 |         tmp_path = tmp.name
24 | 
25 |     try:
26 |         # Call the read_file tool and remove trailing whitespace.
27 |         result = await read_file(file_path=tmp_path)
28 |         # Check that the result matches the content (newline removed by rstrip).
29 |         assert result == "Hello, Camel AI!"
30 |     finally:
31 |         # Clean up the temporary file.
32 |         os.remove(tmp_path)
33 | 
34 | @pytest.mark.asyncio
35 | async def test_read_file_error():
36 |     """
37 |     Test that read_file returns an error message when the file does not exist.
38 |     """
39 |     non_existent_file = "this_file_does_not_exist.txt"
40 |     result = await read_file(file_path=non_existent_file)
41 |     # Check that the error message is returned.
42 |     assert "Error reading file" in result
43 | 
44 | @pytest.mark.asyncio
45 | async def test_list_directory_success():
46 |     """
47 |     Test that list_directory returns the correct list of entries for a valid directory.
48 |     """
49 |     # Create a temporary directory.
50 |     with tempfile.TemporaryDirectory() as tmp_dir:
51 |         # Create a few temporary files in the directory.
52 |         file_names = ["file1.txt", "file2.txt", "file3.txt"]
53 |         for name in file_names:
54 |             file_path = os.path.join(tmp_dir, name)
55 |             with open(file_path, "w") as f:
56 |                 f.write("Test content")
57 | 
58 |         # Call the list_directory tool.
59 | result = await list_directory(directory_path=tmp_dir) 60 | # Convert the newline-separated result into a list. 61 | entries = result.split("\n") 62 | 63 | # Check that every file we created is in the directory listing. 64 | for name in file_names: 65 | assert name in entries 66 | 67 | @pytest.mark.asyncio 68 | async def test_list_directory_error(): 69 | """ 70 | Test that list_directory returns an error message when the directory does not exist. 71 | """ 72 | non_existent_directory = "this_directory_does_not_exist" 73 | result = await list_directory(directory_path=non_existent_directory) 74 | assert "Error listing directory" in result ``` -------------------------------------------------------------------------------- /filesystem_server/filesystem_server_mcp.py: -------------------------------------------------------------------------------- ```python 1 | #!/usr/bin/env python 2 | # ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. ========= 3 | # Licensed under the Apache License, Version 2.0 (the "License"); 4 | # you may not use this file except in compliance with the License. 5 | # You may obtain a copy of the License at 6 | # 7 | # http://www.apache.org/licenses/LICENSE-2.0 8 | # 9 | # Unless required by applicable law or agreed to in writing, software 10 | # distributed under the License is distributed on an "AS IS" BASIS, 11 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 | # See the License for the specific language governing permissions and 13 | # limitations under the License. 14 | # ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. ========= 15 | 16 | import os 17 | import asyncio # noqa: F401 18 | from mcp.server.fastmcp import FastMCP 19 | from camel.logger import get_logger 20 | 21 | logger = get_logger(__name__) 22 | mcp = FastMCP("filesystem") 23 | 24 | @mcp.tool() 25 | async def read_file(file_path: str) -> str: 26 | r"""Reads the content of the file at the given file path. 27 | Args: 28 | file_path (str): The path to the file. 29 | Returns: 30 | str: The content of the file with trailing whitespace removed, or an error message if reading fails. 31 | """ 32 | logger.info(f"read_file triggered with file_path: {file_path}") 33 | try: 34 | with open(file_path, "r", encoding="utf-8") as f: 35 | content = f.read() 36 | return content.rstrip() 37 | except Exception as e: 38 | return f"Error reading file '{file_path}': {e}" 39 | 40 | read_file.inputSchema = { 41 | "type": "object", 42 | "properties": { 43 | "file_path": { 44 | "type": "string", 45 | "title": "File Path", 46 | "description": "The path to the file to read. Default is 'README.md'." 47 | } 48 | }, 49 | "required": ["file_path"] 50 | } 51 | 52 | @mcp.tool() 53 | async def list_directory(directory_path: str) -> str: 54 | r"""Lists the contents of the specified directory. 55 | Args: 56 | directory_path (str): The path of the directory to list. 57 | Returns: 58 | str: A newline-separated string of the directory entries, or an error message if listing fails. 
59 |     """
60 |     logger.info(f"list_directory triggered with directory_path: {directory_path}")
61 |     try:
62 |         entries = os.listdir(directory_path)
63 |         return "\n".join(entry.rstrip() for entry in entries)
64 |     except Exception as e:
65 |         return f"Error listing directory '{directory_path}': {e}"
66 | 
67 | list_directory.inputSchema = {
68 |     "type": "object",
69 |     "properties": {
70 |         "directory_path": {
71 |             "type": "string",
72 |             "title": "Directory Path",
73 |             "description": "The directory path whose contents should be listed. Default is '.'."
74 |         }
75 |     },
76 |     "required": ["directory_path"]
77 | }
78 | 
79 | def main(transport: str = "stdio"):
80 |     r"""Runs the Filesystem MCP Server.
81 |     This server provides filesystem-related functionalities via MCP.
82 |     Args:
83 |         transport (str): The transport mode ('stdio' or 'sse').
84 |     """
85 |     if transport == 'stdio':
86 |         mcp.run(transport='stdio')
87 |     elif transport == 'sse':
88 |         mcp.run(transport='sse')
89 |     else:
90 |         print(f"Unknown transport mode: {transport}")
91 | 
92 | if __name__ == "__main__":
93 |     import sys
94 |     transport_mode = sys.argv[1] if len(sys.argv) > 1 else "stdio"
95 |     main(transport_mode)
96 | 
```

--------------------------------------------------------------------------------
/filesystem_server/example_filesystem_server.py:
--------------------------------------------------------------------------------

```python
1 | import asyncio
2 | import os
3 | import sys
4 | from pathlib import Path
5 | from dotenv import load_dotenv
6 | 
7 | from camel.agents import ChatAgent
8 | from camel.models import ModelFactory
9 | from camel.toolkits import MCPToolkit
10 | from camel.types import ModelPlatformType
11 | from camel.toolkits.mcp_toolkit import _MCPServer
12 | 
13 | load_dotenv()
14 | 
15 | # Set your Anthropic API key (loaded from your environment or .env file; ensure it is valid).
16 | os.environ["ANTHROPIC_API_KEY"] = os.getenv("ANTHROPIC_API_KEY", "")  # do not hardcode the key here
17 | 
18 | 
19 | async def interactive_input_loop(agent: ChatAgent):
20 |     loop = asyncio.get_event_loop()
21 |     print("\nEntering interactive mode. Type 'exit' at any prompt to quit.")
22 | 
23 |     while True:
24 |         choice = await loop.run_in_executor(
25 |             None,
26 |             input,
27 |             "\nChoose an action (Type 'exit' to end loop or press Enter to use current directory):\n"
28 |             "1. Read a file\n"
29 |             "2. List a directory\nYour choice (1/2): "
30 |         )
31 |         choice = choice.strip().lower()
32 |         if choice == "exit":
33 |             print("Exiting interactive mode.")
34 |             break
35 | 
36 |         if choice == "1":
37 |             file_path = await loop.run_in_executor(
38 |                 None,
39 |                 input,
40 |                 "Enter the file path to read (default: README.md): "
41 |             )
42 |             file_path = file_path.strip() or "README.md"
43 |             query = f"Use the read_file tool to display the content of {file_path}. Do not generate an answer from your internal knowledge."
44 |         elif choice == "2":
45 |             dir_path = await loop.run_in_executor(
46 |                 None,
47 |                 input,
48 |                 "Enter the directory path to list (default: .): "
49 |             )
50 |             dir_path = dir_path.strip() or "."
51 |             query = f"Call the list_directory tool to show me all files in {dir_path}. Do not answer directly."
52 |         else:
53 |             print("Invalid choice. 
Please enter 1 or 2.") 54 | continue 55 | 56 | response = await agent.astep(query) 57 | print(f"\nYour Query: {query}") 58 | print("Full Agent Response:") 59 | print(response.info) 60 | if response.msgs and response.msgs[0].content: 61 | print("Agent Output:") 62 | print(response.msgs[0].content.rstrip()) 63 | else: 64 | print("No output received.") 65 | 66 | async def main(server_transport: str = 'stdio'): 67 | if server_transport == 'stdio': 68 | # Assuming both files are in the same folder structure, determine the path to the server file. 69 | server_script_path = Path(__file__).resolve().parent / "filesystem_server_mcp.py" 70 | if not server_script_path.is_file(): 71 | print(f"Error: Server script not found at {server_script_path}") 72 | return 73 | # Create an _MCPServer instance for our filesystem server. 74 | server = _MCPServer( 75 | command_or_url=sys.executable, 76 | args=[str(server_script_path)] 77 | ) 78 | mcp_toolkit = MCPToolkit(servers=[server]) 79 | else: 80 | mcp_toolkit = MCPToolkit("tcp://localhost:5000") 81 | 82 | async with mcp_toolkit.connection() as toolkit: 83 | tools = toolkit.get_tools() 84 | sys_msg = ( 85 | "You are a helpful assistant. Always use the provided external tools for filesystem operations " 86 | "and answer filesystem-related questions using these tools." 87 | ) 88 | model = ModelFactory.create( 89 | model_platform=ModelPlatformType.ANTHROPIC, 90 | model_type="claude-3-7-sonnet-20250219", 91 | api_key=os.getenv("ANTHROPIC_API_KEY"), 92 | model_config_dict={"temperature": 0.8, "max_tokens": 4096}, 93 | ) 94 | camel_agent = ChatAgent( 95 | system_message=sys_msg, 96 | model=model, 97 | tools=tools, 98 | ) 99 | camel_agent.reset() 100 | camel_agent.memory.clear() 101 | await interactive_input_loop(camel_agent) 102 | 103 | if __name__ == "__main__": 104 | asyncio.run(main()) 105 | ``` -------------------------------------------------------------------------------- /markitdown_camel/how_to_run.md: -------------------------------------------------------------------------------- ```markdown 1 | # How to Run MarkItDown Integrations 2 | 3 | This guide explains how to set up and run the MarkItDown integrations created in this project: the command-line interface (CLI) powered by Camel AI, and the Streamlit web frontend. 4 | 5 | **Project Structure Overview:** 6 | 7 | ``` 8 | markitdown/ 9 | ├── packages/ 10 | │ ├── markitdown/ # Core library source 11 | │ │ └── src/ 12 | │ ├── markitdown-mcp/ # MCP Server source 13 | │ │ └── src/ 14 | │ └── ... 15 | ├── camel_markitdown_client.py # Camel AI client script (CLI mode) 16 | ├── streamlit_markitdown_app.py # Streamlit frontend script 17 | ├── how_to_run.md # This file 18 | └── ... (other project files like README.md, .git, etc.) 19 | ``` 20 | 21 | ## I. Prerequisites 22 | 23 | Before running either integration, ensure you have the following set up: 24 | 25 | 1. **Conda Environment:** It's highly recommended to use a dedicated Conda environment (like `kratos` used during development). Activate it: 26 | ```bash 27 | conda activate kratos 28 | ``` 29 | 30 | 2. **Install Base Dependencies:** Install the core `markitdown` library and its dependencies in editable mode. This allows using the local source code. 
31 | ```bash 32 | # Navigate to the project root directory (e.g., ~/Downloads/markitdown/) 33 | cd /path/to/your/markitdown 34 | 35 | # Install the core library and all its optional features 36 | pip install -e './packages/markitdown[all]' 37 | 38 | # Install the MCP server package (needed by markitdown library) 39 | pip install -e './packages/markitdown-mcp' 40 | ``` 41 | 42 | 3. **API Keys:** 43 | * **Google Gemini:** The Camel AI client is configured to use Google Gemini. Set your API key as an environment variable: 44 | ```bash 45 | export GOOGLE_API_KEY="YOUR_GOOGLE_API_KEY_HERE" 46 | ``` 47 | * **MarkItDown Dependencies:** The underlying `markitdown` library might require other keys for specific conversions (e.g., Azure Document Intelligence, transcription services). Set these as environment variables if you plan to use those features. 48 | 49 | ## II. Running the Camel AI Client (CLI Mode) 50 | 51 | This mode uses a Large Language Model (LLM) agent (Gemini) to interact with the `markitdown` tool via the `markitdown-mcp` server, which runs in the background. 52 | 53 | 1. **Install Camel AI Dependencies:** 54 | ```bash 55 | # Install camel-ai with google support 56 | pip install "camel-ai[google]" 57 | ``` 58 | 59 | 2. **Ensure Prerequisites:** Make sure your Conda environment is active and your `GOOGLE_API_KEY` is exported (as described in Prerequisites). 60 | 61 | 3. **Run the Client Script:** Execute the client script from the project root directory: 62 | ```bash 63 | python camel_markitdown_client.py 64 | ``` 65 | 66 | 4. **Interaction:** 67 | * The script will start the background `markitdown-mcp` server automatically. 68 | * It will connect to the server and initialize the Gemini agent. 69 | * You will be prompted in the terminal to enter a URI (`http://`, `https://`, `file://`, `data:`). 70 | * Provide a URI (e.g., `https://www.google.com` or `/absolute/path/to/your/local/file.pdf`). Local paths starting with `/` will automatically be converted to `file://` URIs. 71 | * The agent will call the `convert_to_markdown` tool on the background server. 72 | * The resulting Markdown (or any error messages) will be printed to your terminal. 73 | * Type `exit` at the prompt to quit. 74 | 75 | ## III. Running the Streamlit Frontend (Web UI Mode) 76 | 77 | This mode provides a web interface to directly use the `markitdown` library for conversions without involving an LLM agent or the MCP server. 78 | 79 | 1. **Install Streamlit Dependency:** 80 | ```bash 81 | pip install streamlit 82 | ``` 83 | 84 | 2. **Ensure Prerequisites:** Make sure your Conda environment is active. While the Streamlit app doesn't directly use the Google API key, ensure any keys needed by `markitdown` itself for specific conversions are set. 85 | 86 | 3. **Run the Streamlit App:** Execute the following command from the project root directory: 87 | ```bash 88 | streamlit run streamlit_markitdown_app.py 89 | ``` 90 | *Note: This script is configured to automatically add the local `markitdown` source path to `sys.path`, so you should **not** need to set the `PYTHONPATH` environment variable manually for this script.* 91 | 92 | 4. **Interaction:** 93 | * Streamlit will provide a URL (usually `http://localhost:8501`). Open this in your web browser. 94 | * Select the input method: "Enter URL" or "Upload File". 95 | * Provide the URL or upload your desired file. 96 | * Click the "Convert to Markdown" button. 97 | * A progress bar will indicate activity. 98 | * The resulting Markdown will be displayed in a text area. 
99 | * A "Download Markdown (.md)" button will appear, allowing you to save the output. 100 | * Check the terminal where you ran `streamlit run` for log messages. ``` -------------------------------------------------------------------------------- /sql_server/sql_mcp_test.py: -------------------------------------------------------------------------------- ```python 1 | import asyncio # noqa: F401 2 | import os 3 | import tempfile 4 | import json 5 | import sqlite3 6 | import pytest 7 | 8 | # Import the async tools to test 9 | from sql_server_mcp import ( 10 | execute_query, 11 | list_tables, 12 | create_database, 13 | describe_table, 14 | ) 15 | 16 | @pytest.mark.asyncio 17 | async def test_create_database(): 18 | """ 19 | Test that create_database creates a valid SQLite database. 20 | """ 21 | # Create a temporary file name 22 | with tempfile.NamedTemporaryFile(suffix='.db', delete=False) as tmp: 23 | tmp_path = tmp.name 24 | 25 | try: 26 | # Delete the file as we just want the name 27 | os.unlink(tmp_path) 28 | 29 | # Call the create_database tool 30 | result = await create_database(db_path=tmp_path) 31 | result_json = json.loads(result) 32 | 33 | # Check that creation was successful 34 | assert result_json["status"] == "success" 35 | 36 | # Verify the file exists 37 | assert os.path.exists(tmp_path) 38 | 39 | # Verify it's a valid SQLite database 40 | conn = sqlite3.connect(tmp_path) 41 | conn.close() 42 | finally: 43 | # Clean up the temporary file 44 | if os.path.exists(tmp_path): 45 | os.remove(tmp_path) 46 | 47 | @pytest.mark.asyncio 48 | async def test_execute_query(): 49 | """ 50 | Test that execute_query can create a table, insert data, and query it. 51 | """ 52 | # Create a temporary database 53 | with tempfile.NamedTemporaryFile(suffix='.db', delete=False) as tmp: 54 | tmp_path = tmp.name 55 | 56 | try: 57 | # Set up a test database 58 | conn = sqlite3.connect(tmp_path) 59 | cursor = conn.cursor() 60 | cursor.execute("CREATE TABLE test_table (id INTEGER PRIMARY KEY, name TEXT)") 61 | cursor.execute("INSERT INTO test_table (id, name) VALUES (1, 'Test Name')") 62 | conn.commit() 63 | conn.close() 64 | 65 | # Test SELECT query 66 | query = "SELECT * FROM test_table" 67 | result = await execute_query(connection_string=tmp_path, query=query) 68 | result_json = json.loads(result) 69 | 70 | # Check if the result contains our test data 71 | assert len(result_json) == 1 72 | assert result_json[0]["id"] == 1 73 | assert result_json[0]["name"] == "Test Name" 74 | 75 | # Test INSERT query 76 | insert_query = "INSERT INTO test_table (id, name) VALUES (2, 'Second Test')" 77 | insert_result = await execute_query(connection_string=tmp_path, query=insert_query) 78 | insert_result_json = json.loads(insert_result) 79 | 80 | # Check if the row was inserted 81 | assert insert_result_json["affected_rows"] == 1 82 | 83 | # Verify the insert by querying again 84 | query = "SELECT * FROM test_table WHERE id = 2" 85 | result = await execute_query(connection_string=tmp_path, query=query) 86 | result_json = json.loads(result) 87 | 88 | assert len(result_json) == 1 89 | assert result_json[0]["id"] == 2 90 | assert result_json[0]["name"] == "Second Test" 91 | finally: 92 | # Clean up the temporary file 93 | if os.path.exists(tmp_path): 94 | os.remove(tmp_path) 95 | 96 | @pytest.mark.asyncio 97 | async def test_list_tables(): 98 | """ 99 | Test that list_tables returns the correct list of tables in a database. 
100 | """ 101 | # Create a temporary database 102 | with tempfile.NamedTemporaryFile(suffix='.db', delete=False) as tmp: 103 | tmp_path = tmp.name 104 | 105 | try: 106 | # Set up a test database with multiple tables 107 | conn = sqlite3.connect(tmp_path) 108 | cursor = conn.cursor() 109 | cursor.execute("CREATE TABLE table1 (id INTEGER PRIMARY KEY, value TEXT)") 110 | cursor.execute("CREATE TABLE table2 (id INTEGER PRIMARY KEY, value TEXT)") 111 | conn.commit() 112 | conn.close() 113 | 114 | # Test list_tables 115 | result = await list_tables(connection_string=tmp_path) 116 | result_json = json.loads(result) 117 | 118 | # Check if both tables are listed 119 | assert "tables" in result_json 120 | assert "table1" in result_json["tables"] 121 | assert "table2" in result_json["tables"] 122 | finally: 123 | # Clean up the temporary file 124 | if os.path.exists(tmp_path): 125 | os.remove(tmp_path) 126 | 127 | @pytest.mark.asyncio 128 | async def test_describe_table(): 129 | """ 130 | Test that describe_table returns the correct schema for a table. 131 | """ 132 | # Create a temporary database 133 | with tempfile.NamedTemporaryFile(suffix='.db', delete=False) as tmp: 134 | tmp_path = tmp.name 135 | 136 | try: 137 | # Set up a test database with a table that has various column types 138 | conn = sqlite3.connect(tmp_path) 139 | cursor = conn.cursor() 140 | cursor.execute(""" 141 | CREATE TABLE test_table ( 142 | id INTEGER PRIMARY KEY, 143 | name TEXT NOT NULL, 144 | age INTEGER, 145 | salary REAL, 146 | hire_date TEXT 147 | ) 148 | """) 149 | conn.commit() 150 | conn.close() 151 | 152 | # Test describe_table 153 | result = await describe_table(connection_string=tmp_path, table_name="test_table") 154 | result_json = json.loads(result) 155 | 156 | # Check if the schema is correct 157 | assert result_json["table"] == "test_table" 158 | 159 | # Check column details 160 | columns = {col["name"]: col for col in result_json["columns"]} 161 | 162 | assert "id" in columns 163 | assert columns["id"]["type"] == "INTEGER" 164 | assert columns["id"]["pk"] == 1 165 | 166 | assert "name" in columns 167 | assert columns["name"]["type"] == "TEXT" 168 | assert columns["name"]["notnull"] == 1 169 | 170 | assert "age" in columns 171 | assert columns["age"]["type"] == "INTEGER" 172 | 173 | assert "salary" in columns 174 | assert columns["salary"]["type"] == "REAL" 175 | 176 | assert "hire_date" in columns 177 | assert columns["hire_date"]["type"] == "TEXT" 178 | finally: 179 | # Clean up the temporary file 180 | if os.path.exists(tmp_path): 181 | os.remove(tmp_path) ``` -------------------------------------------------------------------------------- /sql_server/sql_example_run.py: -------------------------------------------------------------------------------- ```python 1 | import asyncio 2 | import os 3 | import sys 4 | from pathlib import Path 5 | from dotenv import load_dotenv 6 | 7 | from camel.agents import ChatAgent 8 | from camel.models import ModelFactory 9 | from camel.toolkits import MCPToolkit 10 | from camel.types import ModelPlatformType 11 | from camel.toolkits.mcp_toolkit import _MCPServer 12 | 13 | # Load environment variables from .env file 14 | load_dotenv() 15 | 16 | # Set your Anthropic API key (ensure this is valid in your .env file) 17 | os.environ["ANTHROPIC_API_KEY"] = os.getenv("ANTHROPIC_API_KEY") 18 | 19 | # Create a sample database for demonstration 20 | async def create_sample_database(): 21 | """Create a sample SQLite database with some data for demonstration.""" 22 | import sqlite3 23 | 
24 | # Create a temporary database in the current directory 25 | db_path = "sample.db" 26 | 27 | # If database already exists, remove it to start fresh 28 | if os.path.exists(db_path): 29 | os.remove(db_path) 30 | 31 | # Create the database and add sample tables and data 32 | conn = sqlite3.connect(db_path) 33 | cursor = conn.cursor() 34 | 35 | # Create employees table 36 | cursor.execute(""" 37 | CREATE TABLE employees ( 38 | id INTEGER PRIMARY KEY, 39 | name TEXT NOT NULL, 40 | department TEXT, 41 | salary REAL, 42 | hire_date TEXT 43 | ) 44 | """) 45 | 46 | # Insert sample employee data 47 | employees = [ 48 | (1, 'John Doe', 'Engineering', 85000.00, '2020-01-15'), 49 | (2, 'Jane Smith', 'Marketing', 75000.00, '2019-05-20'), 50 | (3, 'Bob Johnson', 'Engineering', 95000.00, '2018-11-10'), 51 | (4, 'Alice Brown', 'HR', 65000.00, '2021-03-05'), 52 | (5, 'Charlie Davis', 'Engineering', 90000.00, '2020-08-12') 53 | ] 54 | cursor.executemany("INSERT INTO employees VALUES (?, ?, ?, ?, ?)", employees) 55 | 56 | # Create departments table 57 | cursor.execute(""" 58 | CREATE TABLE departments ( 59 | id INTEGER PRIMARY KEY, 60 | name TEXT NOT NULL, 61 | budget REAL, 62 | location TEXT 63 | ) 64 | """) 65 | 66 | # Insert sample department data 67 | departments = [ 68 | (1, 'Engineering', 1000000.00, 'Building A'), 69 | (2, 'Marketing', 500000.00, 'Building B'), 70 | (3, 'HR', 300000.00, 'Building A'), 71 | (4, 'Finance', 600000.00, 'Building C') 72 | ] 73 | cursor.executemany("INSERT INTO departments VALUES (?, ?, ?, ?)", departments) 74 | 75 | # Commit changes and close connection 76 | conn.commit() 77 | conn.close() 78 | 79 | print(f"Sample database created at: {db_path}") 80 | return db_path 81 | 82 | # Interactive mode function to chat with the agent 83 | async def interactive_input_loop(agent: ChatAgent, db_path: str): 84 | loop = asyncio.get_event_loop() 85 | print("\n==== SQL Assistant Interactive Mode ====") 86 | print("Type 'exit' at any prompt to quit.") 87 | print(f"\nUsing sample database at: {db_path}") 88 | print("\nSample queries you can try:") 89 | print("- Show me all tables in sample.db") 90 | print("- What columns are in the employees table in sample.db?") 91 | print("- List all employees in the Engineering department") 92 | print("- What is the average salary by department?") 93 | print("- How many employees are in each department?") 94 | print("- Find the employee with the highest salary") 95 | print("- Add a new employee named Michael Wilson to Finance with salary 82000") 96 | print("======================================") 97 | 98 | while True: 99 | query = await loop.run_in_executor( 100 | None, 101 | input, 102 | "\nEnter your query (or type 'exit' to quit): " 103 | ) 104 | 105 | if query.lower() == 'exit': 106 | print("Exiting interactive mode.") 107 | break 108 | 109 | print("\nProcessing query...") 110 | response = await agent.astep(query) 111 | 112 | print("\nAgent Response:") 113 | if response.msgs and response.msgs[0].content: 114 | print(response.msgs[0].content.rstrip()) 115 | else: 116 | print("No output received.") 117 | 118 | # Main function to run the entire example 119 | async def main(server_transport: str = 'stdio'): 120 | # First create a sample database 121 | db_path = await create_sample_database() 122 | 123 | if server_transport == 'stdio': 124 | # Determine the path to the server file 125 | server_script_path = Path(__file__).resolve().parent / "sql_server_mcp.py" 126 | if not server_script_path.is_file(): 127 | print(f"Error: Server script not found at 
{server_script_path}") 128 | return 129 | 130 | # Create an _MCPServer instance for our SQL server 131 | server = _MCPServer( 132 | command_or_url=sys.executable, 133 | args=[str(server_script_path)] 134 | ) 135 | mcp_toolkit = MCPToolkit(servers=[server]) 136 | else: 137 | mcp_toolkit = MCPToolkit("tcp://localhost:5000") 138 | 139 | async with mcp_toolkit.connection() as toolkit: 140 | tools = toolkit.get_tools() 141 | sys_msg = ( 142 | "You are a helpful SQL assistant. Use the provided external tools for database operations. " 143 | "Always use the tools to query the database rather than answering from your general knowledge. " 144 | f"The sample database is at '{db_path}'. It contains tables for employees and departments. " 145 | "When a user asks a question about the database, ALWAYS explicitly include the database path " 146 | f"'{db_path}' in your tool calls. First list the tables to understand the schema, " 147 | "then use describe_table to see column details before querying." 148 | ) 149 | model = ModelFactory.create( 150 | model_platform=ModelPlatformType.ANTHROPIC, 151 | model_type="claude-3-7-sonnet-20250219", 152 | api_key=os.getenv("ANTHROPIC_API_KEY"), 153 | model_config_dict={"temperature": 0.5, "max_tokens": 4096}, 154 | ) 155 | camel_agent = ChatAgent( 156 | system_message=sys_msg, 157 | model=model, 158 | tools=tools, 159 | ) 160 | camel_agent.reset() 161 | camel_agent.memory.clear() 162 | await interactive_input_loop(camel_agent, db_path) 163 | 164 | # Clean up the sample database after we're done 165 | if os.path.exists(db_path): 166 | os.remove(db_path) 167 | print(f"\nRemoved sample database: {db_path}") 168 | 169 | # Entry point 170 | if __name__ == "__main__": 171 | asyncio.run(main()) 172 | ``` -------------------------------------------------------------------------------- /markitdown_camel/camel_markitdown_client.py: -------------------------------------------------------------------------------- ```python 1 | import asyncio 2 | import os 3 | import sys 4 | from pathlib import Path 5 | from dotenv import load_dotenv 6 | 7 | from camel.agents import ChatAgent 8 | from camel.models import ModelFactory 9 | from camel.toolkits import MCPToolkit # camels implementation of the mcp protocol 10 | from camel.types import ModelPlatformType 11 | from camel.toolkits.mcp_toolkit import MCPClient 12 | 13 | 14 | # Ensure your Anthropic API key is set in your environment variables 15 | # os.environ["ANTHROPIC_API_KEY"] = "YOUR_ANTHROPIC_API_KEY" 16 | # you can also use your gemini api key here 17 | 18 | 19 | # Starting the interactive input loop for the camel ai client 20 | async def interactive_input_loop(agent: ChatAgent): 21 | loop = asyncio.get_event_loop() 22 | print("\nEntering interactive mode. Type 'exit' at any prompt to quit.") 23 | # exit conditions 24 | while True: 25 | uri = await loop.run_in_executor( 26 | None, 27 | input, 28 | "\nEnter the URI (http:, https:, file:, data:) to convert to Markdown (or type 'exit'): " 29 | ) 30 | uri = uri.strip() 31 | if uri.lower() == "exit": 32 | print("Exiting interactive mode.") 33 | break 34 | 35 | if not uri: 36 | print("URI cannot be empty.") 37 | continue 38 | 39 | # Prepend file:// scheme if it looks like a local absolute path 40 | if uri.startswith('/') and not uri.startswith('file://'): 41 | print(f"Detected local path, prepending 'file://' to URI: {uri}") 42 | formatted_uri = f"file://{uri}" 43 | else: 44 | formatted_uri = uri 45 | 46 | # The prompt clearly tells the agent which tool to use and what the parameter is. 
47 | query = f"Use the convert_to_markdown tool to convert the content at the URI '{formatted_uri}' to Markdown. Do not generate an answer from your internal knowledge, just show the Markdown output from the tool." 48 | 49 | print(f"\nSending query to agent: {query}") 50 | response = await agent.astep(query) 51 | 52 | print("\nFull Agent Response Info:") 53 | print(response.info) # Shows tool calls and parameters 54 | 55 | # Check for direct message output first 56 | if response.msgs and response.msgs[0].content: 57 | print("\nAgent Output (Markdown):") 58 | print("-" * 20) 59 | print(response.msgs[0].content.rstrip()) 60 | print("-" * 20) 61 | # If no direct message, check if the tool call info is available in response.info 62 | elif 'tool_calls' in response.info and response.info['tool_calls']: 63 | print("\nTool Call Response (Raw from info):") 64 | print("-" * 20) 65 | found_output = False 66 | # Iterate through the tool calls list in response.info 67 | for tool_call in response.info['tool_calls']: 68 | # Camel AI structure might place output here (adjust key if needed based on ToolCallingRecord structure) 69 | if hasattr(tool_call, 'result') and tool_call.result: 70 | print(str(tool_call.result).rstrip()) 71 | found_output = True 72 | # Add other potential output locations if needed 73 | 74 | if not found_output: 75 | print("(No tool result found in tool call info)") 76 | print("-" * 20) 77 | else: 78 | print("No output message or tool output received.") 79 | 80 | 81 | # main funct 82 | async def main(server_transport: str = 'stdio'): 83 | if server_transport != 'stdio': 84 | print("Error: This client currently only supports 'stdio' transport.") 85 | return 86 | 87 | print("Starting MarkItDown MCP server in stdio mode...") 88 | server_command = sys.executable 89 | server_args = ["-m", "markitdown_mcp"] 90 | 91 | # Get the root directory of the script (assuming it's in the project root) 92 | project_root = Path(__file__).resolve().parent 93 | 94 | # Create an MCPClient instance, adding the cwd 95 | server = MCPClient( 96 | command_or_url=server_command, 97 | args=server_args, 98 | # Set the working directory for the server process 99 | env={"PYTHONPATH": str(project_root / "packages" / "markitdown-mcp" / "src") + os.pathsep + str(project_root / "packages" / "markitdown" / "src") + os.pathsep + os.environ.get("PYTHONPATH", ""), "CWD": str(project_root)}, 100 | # Optional: timeout=None 101 | ) 102 | 103 | # Pass the MCPClient object in a list 104 | mcp_toolkit = MCPToolkit(servers=[server]) 105 | 106 | print("Connecting to MCP server...") 107 | async with mcp_toolkit.connection() as toolkit: 108 | print("Connection successful. Retrieving tools...") 109 | tools = toolkit.get_tools() 110 | if not tools: 111 | print("Error: No tools retrieved from the server. Make sure the server started correctly and defined tools.") 112 | return 113 | print(f"Tools retrieved: {[tool.func.__name__ for tool in tools]}") 114 | 115 | # Check if the required tool is available using func.__name__ 116 | if not any(tool.func.__name__ == "convert_to_markdown" for tool in tools): 117 | print("Error: 'convert_to_markdown' tool not found on the server.") 118 | return 119 | 120 | sys_msg = ( 121 | "You are a helpful assistant. You have access to an external tool called 'convert_to_markdown' which takes a single argument, 'uri'. " 122 | "When asked to convert a URI to Markdown, you MUST use this tool by providing the URI to the 'uri' parameter. 
" 123 | "Provide ONLY the Markdown output received from the tool, without any additional explanation or introductory text." 124 | ) 125 | 126 | # Ensure GOOGLE_API_KEY is set in environment variables 127 | # print(f"DEBUG: Value of GOOGLE_API_KEY from os.getenv: {os.getenv('GOOGLE_API_KEY')}") 128 | api_key = os.getenv("GOOGLE_API_KEY") # Check for GOOGLE_API_KEY 129 | if not api_key: 130 | print("Error: GOOGLE_API_KEY environment variable not set.") # Update error message 131 | print("Please set it before running the client.") 132 | return 133 | 134 | # Configure the model for Google Gemini 135 | # You might need to install the camel-google extra: pip install camel-ai[google] 136 | try: 137 | model = ModelFactory.create( 138 | model_platform=ModelPlatformType.GEMINI, # Change platform 139 | # Set the desired Gemini model 140 | model_type="gemini-2.5-pro-preview-03-25", # Using 1.5 Pro as 2.5 is not yet a valid identifier in CAMEL AI 141 | api_key=api_key, 142 | model_config_dict={"temperature": 0.0, "max_tokens": 8192}, # Adjust config if needed 143 | ) 144 | except Exception as e: 145 | print(f"Error creating model: {e}") 146 | print("Ensure you have the necessary dependencies installed (e.g., `pip install camel-ai[google]`)") 147 | return 148 | 149 | camel_agent = ChatAgent( 150 | system_message=sys_msg, 151 | model=model, 152 | tools=tools, 153 | ) 154 | camel_agent.reset() 155 | camel_agent.memory.clear() 156 | 157 | await interactive_input_loop(camel_agent) 158 | 159 | if __name__ == "__main__": 160 | # This client only supports stdio for now 161 | asyncio.run(main(server_transport='stdio')) ``` -------------------------------------------------------------------------------- /markitdown_camel/streamlit_markitdown_app.py: -------------------------------------------------------------------------------- ```python 1 | import streamlit as st 2 | import os 3 | import tempfile 4 | from pathlib import Path 5 | import re 6 | import sys 7 | import logging # Import logging 8 | 9 | # --- Basic Logging Configuration --- # 10 | logging.basicConfig( 11 | level=logging.INFO, # Set the logging level (INFO, DEBUG, WARNING, ERROR, CRITICAL) 12 | format='%(asctime)s - %(levelname)s - %(message)s', 13 | stream=sys.stderr, # Ensure logs go to stderr (terminal) 14 | ) 15 | 16 | # --- Dynamically add local package path --- # 17 | # Calculate the path to the 'src' directory of the local 'markitdown' package 18 | _PROJECT_ROOT = Path(__file__).resolve().parent 19 | _MARKITDOWN_SRC_PATH = _PROJECT_ROOT / "packages" / "markitdown" / "src" 20 | 21 | # Add the path to sys.path if it's not already there 22 | if str(_MARKITDOWN_SRC_PATH) not in sys.path: 23 | sys.path.insert(0, str(_MARKITDOWN_SRC_PATH)) 24 | print(f"DEBUG: Added '{_MARKITDOWN_SRC_PATH}' to sys.path") 25 | 26 | # Attempt to import the core MarkItDown class 27 | try: 28 | from markitdown import MarkItDown 29 | from markitdown._exceptions import MarkItDownException 30 | except ImportError as e: 31 | logging.error(f"Failed to import markitdown: {e}", exc_info=True) 32 | st.error( 33 | f"Failed to import the `markitdown` library or its exceptions. " 34 | f"Error: {e}\n" 35 | "Please ensure it is installed correctly (e.g., `pip install -e ./packages/markitdown`) " 36 | f"and that the path '{_MARKITDOWN_SRC_PATH}' exists and is accessible." 
37 | ) 38 | st.stop() 39 | 40 | # --- Page Configuration --- 41 | st.set_page_config( 42 | page_title="MarkItDown Converter", 43 | page_icon=":memo:", 44 | layout="wide", 45 | ) 46 | 47 | # --- Session State Initialization --- 48 | if 'markdown_output' not in st.session_state: 49 | st.session_state.markdown_output = None 50 | if 'error_message' not in st.session_state: 51 | st.session_state.error_message = None 52 | if 'input_uri' not in st.session_state: 53 | st.session_state.input_uri = "" 54 | 55 | # --- Helper Functions --- 56 | def is_valid_uri_scheme(uri): 57 | """Basic check for supported URI schemes.""" 58 | return uri.startswith(("http://", "https://", "file://", "data:")) 59 | 60 | def clean_filename(filename): 61 | """Remove invalid characters for filenames.""" 62 | # Remove URL scheme if present 63 | name = re.sub(r'^(http|https|file|data):[\/]*', '', filename) 64 | # Replace problematic characters 65 | name = re.sub(r'[\/:*?\"<>|%#&.]+', '_', name) 66 | # Limit length 67 | return name[:100] or "converted" 68 | 69 | # --- UI Layout --- 70 | st.title("📝 MarkItDown Content Converter") 71 | st.markdown( 72 | "Convert content from various sources (URLs or uploaded files) into Markdown." 73 | ) 74 | st.divider() 75 | 76 | # --- Input Method Selection --- 77 | input_method = st.radio( 78 | "Select Input Method:", 79 | ("Enter URL", "Upload File"), 80 | horizontal=True, 81 | key="input_method_radio", 82 | help="Choose whether to provide a web URL or upload a local file." 83 | ) 84 | 85 | col1, col2 = st.columns([3, 1]) 86 | 87 | with col1: 88 | if input_method == "Enter URL": 89 | st.session_state.input_uri = st.text_input( 90 | "Enter URL (http://, https://, data:)", 91 | value=st.session_state.input_uri, 92 | placeholder="e.g., https://www.example.com or data:text/plain;base64,...", 93 | key="url_input", 94 | ) 95 | uploaded_file = None 96 | else: # Upload File 97 | uploaded_file = st.file_uploader( 98 | "Upload a file (will be converted to a `file://` URI)", 99 | type=None, # Allow any file type MarkItDown might support 100 | key="file_uploader", 101 | ) 102 | st.session_state.input_uri = "" # Clear URI input if file is chosen 103 | 104 | with col2: 105 | st.markdown(" ", unsafe_allow_html=True) # Vertical alignment hack 106 | st.markdown(" ", unsafe_allow_html=True) 107 | convert_button = st.button("Convert to Markdown", type="primary", use_container_width=True) 108 | 109 | # --- Conversion Logic --- 110 | if convert_button: 111 | logging.info("'Convert to Markdown' button clicked.") 112 | st.session_state.markdown_output = None # Clear previous output 113 | st.session_state.error_message = None # Clear previous error 114 | final_uri = None 115 | tmp_file_path = None # Ensure tmp_file_path is defined 116 | 117 | if uploaded_file is not None: 118 | try: 119 | # Save uploaded file temporarily to get a path 120 | # Use a consistent way to create temp files 121 | with tempfile.NamedTemporaryFile(delete=False, suffix=f"_{uploaded_file.name}") as tmp_file: 122 | tmp_file.write(uploaded_file.getvalue()) 123 | tmp_file_path = tmp_file.name 124 | logging.info(f"Uploaded file '{uploaded_file.name}' saved to temporary path: {tmp_file_path}") 125 | final_uri = Path(tmp_file_path).as_uri() 126 | logging.info(f"Processing uploaded file as URI: {final_uri}") 127 | st.info(f"Processing uploaded file as: {final_uri}") 128 | except Exception as e: 129 | logging.error(f"Error saving uploaded file: {e}", exc_info=True) 130 | st.error(f"Error handling uploaded file: {e}") 131 | final_uri = None # 
Prevent further processing 132 | 133 | elif st.session_state.input_uri: 134 | final_uri = st.session_state.input_uri.strip() 135 | if not is_valid_uri_scheme(final_uri): 136 | # Attempt to fix common local path issue 137 | if final_uri.startswith('/'): 138 | st.warning(f"Assuming '{final_uri}' is a local path. Prepending 'file://'.") 139 | final_uri = f"file://{final_uri}" 140 | else: 141 | st.error(f"Invalid or unsupported URI scheme in '{final_uri}'. Must start with http://, https://, file://, or data:") 142 | final_uri = None 143 | logging.info(f"Processing input URI: {final_uri}") # Log the final URI used 144 | 145 | else: 146 | logging.warning("Conversion attempt with no input URI or file.") 147 | st.warning("Please enter a URL or upload a file.") 148 | 149 | if final_uri: 150 | progress_bar = st.progress(0, text="Starting conversion...") 151 | try: 152 | logging.info(f"Initializing MarkItDown converter for URI: {final_uri}") 153 | md_converter = MarkItDown() 154 | progress_bar.progress(25, text=f"Converting URI: {final_uri[:100]}...") 155 | 156 | # Perform the conversion 157 | logging.info(f"Calling convert_uri for: {final_uri}") 158 | result = md_converter.convert_uri(final_uri) 159 | logging.info(f"convert_uri successful for: {final_uri}") 160 | 161 | progress_bar.progress(100, text="Conversion successful!") 162 | st.session_state.markdown_output = result.markdown 163 | st.success("Content successfully converted to Markdown!") 164 | 165 | except MarkItDownException as e: 166 | logging.error(f"MarkItDown Conversion Error for URI {final_uri}: {e}", exc_info=True) 167 | st.session_state.error_message = f"MarkItDown Conversion Error: {e}" 168 | st.error(st.session_state.error_message) 169 | progress_bar.progress(100, text="Conversion failed.") 170 | except Exception as e: 171 | logging.error(f"Unexpected Error during conversion for URI {final_uri}: {e}", exc_info=True) 172 | st.session_state.error_message = f"An unexpected error occurred: {e}" 173 | st.error(st.session_state.error_message) 174 | progress_bar.progress(100, text="Conversion failed.") 175 | finally: 176 | # Clean up temporary file if created 177 | if tmp_file_path and os.path.exists(tmp_file_path): 178 | try: 179 | os.remove(tmp_file_path) 180 | logging.info(f"Cleaned up temporary file: {tmp_file_path}") 181 | except Exception as e: 182 | logging.error(f"Error removing temporary file {tmp_file_path}: {e}", exc_info=True) 183 | # Remove progress bar after completion/error 184 | progress_bar.empty() 185 | 186 | st.divider() 187 | 188 | # --- Output Display and Download --- 189 | st.subheader("Output") 190 | 191 | if st.session_state.error_message and not st.session_state.markdown_output: 192 | # Show error prominently if conversion failed and there's no output 193 | st.error(st.session_state.error_message) 194 | elif st.session_state.markdown_output: 195 | st.text_area( 196 | "Generated Markdown", 197 | value=st.session_state.markdown_output, 198 | height=400, 199 | key="markdown_display", 200 | help="The Markdown generated from the source." 
201 | ) 202 | 203 | # Determine a sensible filename 204 | download_filename_base = "converted_markdown" 205 | if st.session_state.input_uri: 206 | download_filename_base = clean_filename(st.session_state.input_uri) 207 | elif uploaded_file: 208 | download_filename_base = clean_filename(uploaded_file.name) 209 | 210 | st.download_button( 211 | label="Download Markdown (.md)", 212 | data=st.session_state.markdown_output, 213 | file_name=f"{download_filename_base}.md", 214 | mime="text/markdown", 215 | key="download_button", 216 | use_container_width=True, 217 | ) 218 | else: 219 | st.info("Enter a URL or upload a file and click 'Convert to Markdown' to see the output here.") ``` -------------------------------------------------------------------------------- /sql_server/sql_server_mcp.py: -------------------------------------------------------------------------------- ```python 1 | #!/usr/bin/env python 2 | # ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. ========= 3 | # Licensed under the Apache License, Version 2.0 (the "License"); 4 | # you may not use this file except in compliance with the License. 5 | # You may obtain a copy of the License at 6 | # 7 | # http://www.apache.org/licenses/LICENSE-2.0 8 | # 9 | # Unless required by applicable law or agreed to in writing, software 10 | # distributed under the License is distributed on an "AS IS" BASIS, 11 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 | # See the License for the specific language governing permissions and 13 | # limitations under the License. 14 | # ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. ========= 15 | 16 | import os 17 | import asyncio # noqa: F401 18 | import sqlite3 19 | import json 20 | import re # Added for table name validation 21 | from mcp.server.fastmcp import FastMCP 22 | from camel.logger import get_logger 23 | from pathlib import Path 24 | 25 | logger = get_logger(__name__) 26 | mcp = FastMCP("sqldb") 27 | 28 | @mcp.tool() 29 | async def execute_query(connection_string: str, query: str) -> str: 30 | r"""Executes the SQL query on the given database. 31 | Args: 32 | connection_string (str): The connection string or path to the SQLite database. 33 | query (str): The SQL query to execute. 34 | Returns: 35 | str: The result of the query as a JSON string, or an error message if execution fails. 36 | """ 37 | logger.info(f"execute_query triggered with connection_string: {connection_string}") 38 | # For security reasons, don't log the full query in production 39 | logger.info(f"Query starts with: {query[:20]}...") 40 | 41 | conn = None 42 | try: 43 | conn = sqlite3.connect(connection_string) 44 | cursor = conn.cursor() 45 | 46 | # Execute the query 47 | cursor.execute(query) 48 | 49 | # Check if this is a SELECT query (has results to fetch) 50 | if query.strip().upper().startswith("SELECT"): 51 | # Get column names from cursor description 52 | columns = [desc[0] for desc in cursor.description] 53 | 54 | # Fetch results and format as a list of dictionaries 55 | results = [] 56 | for row in cursor.fetchall(): 57 | results.append(dict(zip(columns, row))) 58 | 59 | return json.dumps({"status": "success", "data": results}, indent=2) 60 | else: 61 | # For INSERT, UPDATE, DELETE, etc. 
62 | conn.commit() 63 | affected_rows = cursor.rowcount 64 | return json.dumps({"status": "success", "affected_rows": affected_rows}, indent=2) 65 | 66 | except Exception as e: 67 | return json.dumps({"status": "error", "message": f"Error executing SQL query: {str(e)}"}, indent=2) 68 | finally: 69 | if conn: 70 | conn.close() 71 | 72 | execute_query.inputSchema = { 73 | "type": "object", 74 | "properties": { 75 | "connection_string": { 76 | "type": "string", 77 | "title": "Connection String", 78 | "description": "The connection string or path to the SQLite database." 79 | }, 80 | "query": { 81 | "type": "string", 82 | "title": "SQL Query", 83 | "description": "The SQL query to execute." 84 | } 85 | }, 86 | "required": ["connection_string", "query"] 87 | } 88 | 89 | @mcp.tool() 90 | async def list_tables(connection_string: str) -> str: 91 | r"""Lists all tables in the specified database. 92 | Args: 93 | connection_string (str): The connection string or path to the SQLite database. 94 | Returns: 95 | str: A JSON string containing the list of tables, or an error message if listing fails. 96 | """ 97 | logger.info(f"list_tables triggered with connection_string: {connection_string}") 98 | 99 | conn = None 100 | try: 101 | conn = sqlite3.connect(connection_string) 102 | cursor = conn.cursor() 103 | 104 | # Query to get all table names in SQLite 105 | cursor.execute("SELECT name FROM sqlite_master WHERE type='table';") 106 | 107 | # Fetch and format results 108 | tables = [row[0] for row in cursor.fetchall()] 109 | return json.dumps({"status": "success", "tables": tables}, indent=2) 110 | 111 | except Exception as e: 112 | return json.dumps({"status": "error", "message": f"Error listing tables: {str(e)}"}, indent=2) 113 | finally: 114 | if conn: 115 | conn.close() 116 | 117 | list_tables.inputSchema = { 118 | "type": "object", 119 | "properties": { 120 | "connection_string": { 121 | "type": "string", 122 | "title": "Connection String", 123 | "description": "The connection string or path to the SQLite database." 124 | } 125 | }, 126 | "required": ["connection_string"] 127 | } 128 | 129 | @mcp.tool() 130 | async def create_database(db_path: str) -> str: 131 | r"""Creates a new SQLite database at the specified path. 132 | Args: 133 | db_path (str): The path where the new database should be created. 134 | Returns: 135 | str: A success message or an error message if creation fails. 136 | """ 137 | logger.info(f"create_database triggered with db_path: {db_path}") 138 | 139 | conn = None 140 | try: 141 | # Check if file already exists 142 | if os.path.exists(db_path): 143 | return json.dumps({"status": "exists", "message": f"Database already exists at {db_path}"}, indent=2) 144 | 145 | conn = sqlite3.connect(db_path) 146 | conn.close() 147 | 148 | return json.dumps({"status": "success", "message": f"Database created at {db_path}"}, indent=2) 149 | 150 | except Exception as e: 151 | return json.dumps({"status": "error", "message": f"Error creating database: {str(e)}"}, indent=2) 152 | finally: 153 | if conn: 154 | conn.close() 155 | 156 | create_database.inputSchema = { 157 | "type": "object", 158 | "properties": { 159 | "db_path": { 160 | "type": "string", 161 | "title": "Database Path", 162 | "description": "The path where the new SQLite database should be created." 163 | } 164 | }, 165 | "required": ["db_path"] 166 | } 167 | 168 | @mcp.tool() 169 | async def describe_table(connection_string: str, table_name: str) -> str: 170 | r"""Describes the schema of a specified table. 
171 | Args: 172 | connection_string (str): The connection string or path to the SQLite database. 173 | table_name (str): The name of the table to describe. 174 | Returns: 175 | str: A JSON string containing the table schema, or an error message if the operation fails. 176 | """ 177 | logger.info(f"describe_table triggered with connection_string: {connection_string}, table_name: {table_name}") 178 | 179 | # Validate table_name to be a simple identifier to reduce SQL injection risk with PRAGMA 180 | if not re.match(r"^[a-zA-Z_][a-zA-Z0-9_]*$", table_name): 181 | return json.dumps({"status": "error", "message": f"Invalid table name: '{table_name}'. Must be a valid SQL identifier."}, indent=2) 182 | 183 | conn = None 184 | try: 185 | conn = sqlite3.connect(connection_string) 186 | cursor = conn.cursor() 187 | 188 | # PRAGMA statements do not support placeholders for table names. 189 | cursor.execute(f"PRAGMA table_info({table_name});") 190 | 191 | columns = [] 192 | for row in cursor.fetchall(): 193 | columns.append({ 194 | "cid": row[0], 195 | "name": row[1], 196 | "type": row[2], 197 | "notnull": row[3], 198 | "default_value": row[4], 199 | "pk": row[5] 200 | }) 201 | 202 | if not columns: 203 | return json.dumps({"status": "not_found", "message": f"Table '{table_name}' not found or has no columns."}, indent=2) 204 | return json.dumps({"status": "success", "table": table_name, "columns": columns}, indent=2) 205 | 206 | except Exception as e: 207 | return json.dumps({"status": "error", "message": f"Error describing table '{table_name}': {str(e)}"}, indent=2) 208 | finally: 209 | if conn: 210 | conn.close() 211 | 212 | describe_table.inputSchema = { 213 | "type": "object", 214 | "properties": { 215 | "connection_string": { 216 | "type": "string", 217 | "title": "Connection String", 218 | "description": "The connection string or path to the SQLite database." 219 | }, 220 | "table_name": { 221 | "type": "string", 222 | "title": "Table Name", 223 | "description": "The name of the table to describe." 224 | } 225 | }, 226 | "required": ["connection_string", "table_name"] 227 | } 228 | 229 | @mcp.tool() 230 | async def delete_database(db_path: str) -> str: 231 | r"""Deletes an existing SQLite database file at the specified path. 232 | Args: 233 | db_path (str): The path to the SQLite database file to be deleted. 234 | Returns: 235 | str: A JSON string indicating success or an error message if deletion fails. 236 | """ 237 | logger.info(f"delete_database triggered with db_path: {db_path}") 238 | 239 | try: 240 | if not os.path.exists(db_path): 241 | return json.dumps({"status": "not_found", "message": f"Database file not found at {db_path}"}, indent=2) 242 | 243 | os.remove(db_path) 244 | return json.dumps({"status": "success", "message": f"Database deleted from {db_path}"}, indent=2) 245 | 246 | except Exception as e: 247 | return json.dumps({"status": "error", "message": f"Error deleting database: {str(e)}"}, indent=2) 248 | 249 | delete_database.inputSchema = { 250 | "type": "object", 251 | "properties": { 252 | "db_path": { 253 | "type": "string", 254 | "title": "Database Path", 255 | "description": "The path to the SQLite database file to be deleted." 256 | } 257 | }, 258 | "required": ["db_path"] 259 | } 260 | 261 | @mcp.tool() 262 | async def delete_table(connection_string: str, table_name: str) -> str: 263 | r"""Deletes a specified table from the SQLite database. 264 | Args: 265 | connection_string (str): The connection string or path to the SQLite database. 
266 | table_name (str): The name of the table to delete. 267 | Returns: 268 | str: A JSON string indicating success or failure. 269 | """ 270 | logger.info(f"delete_table triggered for table: {table_name} in db: {connection_string}") 271 | 272 | if not re.match(r"^[a-zA-Z_][a-zA-Z0-9_]*$", table_name): 273 | return json.dumps({"status": "error", "message": f"Invalid table name: '{table_name}'. Must be a valid SQL identifier."}, indent=2) 274 | 275 | conn = None 276 | try: 277 | conn = sqlite3.connect(connection_string) 278 | cursor = conn.cursor() 279 | 280 | # Check if table exists first for a more specific message 281 | cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name=?;", (table_name,)) 282 | if cursor.fetchone() is None: 283 | return json.dumps({"status": "not_found", "message": f"Table '{table_name}' not found. No action taken."}, indent=2) 284 | 285 | # Table name is validated, safe to use in f-string for DROP TABLE 286 | cursor.execute(f"DROP TABLE {table_name};") 287 | conn.commit() 288 | 289 | return json.dumps({"status": "success", "message": f"Table '{table_name}' deleted successfully."}, indent=2) 290 | 291 | except Exception as e: 292 | return json.dumps({"status": "error", "message": f"Error deleting table '{table_name}': {str(e)}"}, indent=2) 293 | finally: 294 | if conn: 295 | conn.close() 296 | 297 | delete_table.inputSchema = { 298 | "type": "object", 299 | "properties": { 300 | "connection_string": { 301 | "type": "string", 302 | "title": "Connection String", 303 | "description": "The connection string or path to the SQLite database." 304 | }, 305 | "table_name": { 306 | "type": "string", 307 | "title": "Table Name", 308 | "description": "The name of the table to delete." 309 | } 310 | }, 311 | "required": ["connection_string", "table_name"] 312 | } 313 | 314 | @mcp.tool() 315 | async def get_table_row_count(connection_string: str, table_name: str) -> str: 316 | r"""Gets the row count for a specified table. 317 | Args: 318 | connection_string (str): The connection string or path to the SQLite database. 319 | table_name (str): The name of the table. 320 | Returns: 321 | str: A JSON string containing the table name and its row count, or an error message. 322 | """ 323 | logger.info(f"get_table_row_count triggered for table: {table_name} in db: {connection_string}") 324 | 325 | if not re.match(r"^[a-zA-Z_][a-zA-Z0-9_]*$", table_name): 326 | return json.dumps({"status": "error", "message": f"Invalid table name: '{table_name}'. Must be a valid SQL identifier."}, indent=2) 327 | 328 | conn = None 329 | try: 330 | conn = sqlite3.connect(connection_string) 331 | cursor = conn.cursor() 332 | 333 | # Table name is validated, safe to use in f-string for SELECT COUNT(*) 334 | cursor.execute(f"SELECT COUNT(*) FROM {table_name};") 335 | count = cursor.fetchone()[0] 336 | 337 | return json.dumps({"status": "success", "table": table_name, "row_count": count}, indent=2) 338 | 339 | except sqlite3.OperationalError as e: # Catches errors like "no such table" 340 | return json.dumps({"status": "error", "message": f"Error getting row count for table '{table_name}': {str(e)}. 
Ensure table exists."}, indent=2)
341 |     except Exception as e:
342 |         return json.dumps({"status": "error", "message": f"An unexpected error occurred while getting row count for table '{table_name}': {str(e)}"}, indent=2)
343 |     finally:
344 |         if conn:
345 |             conn.close()
346 | 
347 | get_table_row_count.inputSchema = {
348 |     "type": "object",
349 |     "properties": {
350 |         "connection_string": {
351 |             "type": "string",
352 |             "title": "Connection String",
353 |             "description": "The connection string or path to the SQLite database."
354 |         },
355 |         "table_name": {
356 |             "type": "string",
357 |             "title": "Table Name",
358 |             "description": "The name of the table to get row count for."
359 |         }
360 |     },
361 |     "required": ["connection_string", "table_name"]
362 | }
363 | 
364 | def init_db():
365 |     r"""Creates the demo SQLite database with a sample table if it does not exist."""
366 |     db_path = Path.cwd() / "your_sqlite_database.db"
367 |     conn = sqlite3.connect(str(db_path))
368 |     cursor = conn.cursor()
369 | 
370 |     # Create a sample table
371 |     cursor.execute('''
372 |         CREATE TABLE IF NOT EXISTS sample_table (
373 |             id INTEGER PRIMARY KEY,
374 |             name TEXT NOT NULL
375 |         )
376 |     ''')
377 | 
378 |     conn.commit()
379 |     conn.close()
380 | 
381 | def main(transport: str = "stdio"):
382 |     r"""Runs the SQL MCP Server.
383 |     This server provides SQL database functionalities via MCP.
384 |     Args:
385 |         transport (str): The transport mode ('stdio' or 'sse').
386 |     """
387 |     if transport == 'stdio':
388 |         mcp.run(transport='stdio')
389 |     elif transport == 'sse':
390 |         mcp.run(transport='sse')
391 |     else:
392 |         print(f"Unknown transport mode: {transport}")
393 | 
394 | if __name__ == "__main__":
395 |     import sys
396 |     transport_mode = sys.argv[1] if len(sys.argv) > 1 else "stdio"
397 |     # Bootstrap the sample database, then start the (blocking) MCP server.
398 |     init_db()
399 |     main(transport_mode)
```

--------------------------------------------------------------------------------
/google_forms_mcp/google_forms_example_run.py:
--------------------------------------------------------------------------------

```python
1 | #!/usr/bin/env python
2 | # ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. =========
3 | # Licensed under the Apache License, Version 2.0 (the "License");
4 | # you may not use this file except in compliance with the License.
5 | # You may obtain a copy of the License at
6 | #
7 | # http://www.apache.org/licenses/LICENSE-2.0
8 | #
9 | # Unless required by applicable law or agreed to in writing, software
10 | # distributed under the License is distributed on an "AS IS" BASIS,
11 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 | # See the License for the specific language governing permissions and
13 | # limitations under the License.
14 | # ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. =========
15 | 
16 | """
17 | Example run script for the Google Forms MCP Server
18 | 
19 | This script demonstrates creating and managing Google Forms through the MCP Server.
20 | It performs a series of operations that test all the functionality of the server.
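
Typical invocation, assuming the MCP server is already running over SSE:
    python google_forms_example_run.py --transport sse --server-url http://127.0.0.1:8000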
21 | """ 22 | 23 | import asyncio 24 | import os 25 | import sys 26 | import json 27 | from pathlib import Path 28 | from dotenv import load_dotenv 29 | import argparse 30 | from typing import Optional 31 | 32 | from camel.agents import ChatAgent 33 | from camel.models import ModelFactory 34 | from camel.toolkits import MCPToolkit 35 | from camel.types import ModelPlatformType 36 | 37 | # Load environment variables from .env file 38 | load_dotenv() 39 | 40 | # Set your Anthropic API key (ensure this is valid in your .env file) 41 | os.environ["ANTHROPIC_API_KEY"] = os.getenv("ANTHROPIC_API_KEY") 42 | 43 | # Check if the necessary credentials file exists 44 | def check_credentials(): 45 | if not os.path.exists('credentials.json'): 46 | print("Error: credentials.json not found!") 47 | print("Please follow these steps to set up Google Forms API access:") 48 | print("1. Go to https://console.developers.google.com/") 49 | print("2. Create a new project or select an existing one") 50 | print("3. Enable the Google Forms API, Google Drive API, and Google Sheets API") 51 | print("4. Create credentials (OAuth client ID) for a desktop application") 52 | print("5. Download the credentials JSON file and save it as 'credentials.json' in this directory") 53 | return False 54 | return True 55 | 56 | # Function to print nicely formatted JSON responses 57 | def print_response(title, response): 58 | try: 59 | json_data = json.loads(response) 60 | print(f"\n{title}:") 61 | print(json.dumps(json_data, indent=2)) 62 | except: 63 | print(f"\n{title}:") 64 | print(response) 65 | 66 | # Automated test run that demonstrates all features 67 | async def run_automated_test(tools): 68 | print("\n==== Google Forms MCP Server Automated Test ====") 69 | print("This test will demonstrate all functionalities of the Google Forms MCP Server") 70 | 71 | try: 72 | # Step 1: Create a new form 73 | print("\n--- Step 1: Creating a new form ---") 74 | create_form_response = await tools["create_form"]( 75 | title="Customer Satisfaction Survey", 76 | description="Help us improve our services by providing your feedback" 77 | ) 78 | print_response("Form created", create_form_response) 79 | 80 | # Extract form ID for further operations 81 | form_data = json.loads(create_form_response) 82 | form_id = form_data["form_id"] 83 | 84 | # Step 2: Modify form settings 85 | print("\n--- Step 2: Modifying form settings ---") 86 | settings_response = await tools["modify_form_settings"]( 87 | form_id=form_id, 88 | collect_email=True, 89 | limit_responses=True 90 | ) 91 | print_response("Form settings updated", settings_response) 92 | 93 | # Step 3: Add a section 94 | print("\n--- Step 3: Adding a section ---") 95 | section_response = await tools["add_section"]( 96 | form_id=form_id, 97 | title="About Your Experience", 98 | description="Please tell us about your recent experience with our product/service" 99 | ) 100 | print_response("Section added", section_response) 101 | 102 | # Step 4: Add multiple choice question 103 | print("\n--- Step 4: Adding a multiple choice question ---") 104 | mc_response = await tools["add_multiple_choice"]( 105 | form_id=form_id, 106 | question_text="How would you rate our service?", 107 | choices=["Excellent", "Good", "Average", "Poor", "Very Poor"], 108 | required=True, 109 | help_text="Please select one option" 110 | ) 111 | print_response("Multiple choice question added", mc_response) 112 | 113 | # Step 5: Add a checkbox question 114 | print("\n--- Step 5: Adding a checkbox question ---") 115 | checkbox_response = 
await tools["add_checkboxes"]( 116 | form_id=form_id, 117 | question_text="Which aspects of our service did you appreciate?", 118 | choices=["Responsiveness", "Quality", "Value for money", "Customer support", "Other"], 119 | required=False, 120 | help_text="Select all that apply" 121 | ) 122 | print_response("Checkbox question added", checkbox_response) 123 | 124 | # Step 6: Add a dropdown question 125 | print("\n--- Step 6: Adding a dropdown question ---") 126 | dropdown_response = await tools["add_dropdown"]( 127 | form_id=form_id, 128 | question_text="How often do you use our service?", 129 | choices=["Daily", "Weekly", "Monthly", "Quarterly", "Yearly", "First time"], 130 | required=True 131 | ) 132 | print_response("Dropdown question added", dropdown_response) 133 | 134 | # Step 7: Add a short answer question 135 | print("\n--- Step 7: Adding a short answer question ---") 136 | short_answer_response = await tools["add_short_answer"]( 137 | form_id=form_id, 138 | question_text="What is your customer ID?", 139 | required=False, 140 | help_text="Please enter your customer ID if you have one" 141 | ) 142 | print_response("Short answer question added", short_answer_response) 143 | 144 | # Step 8: Add a paragraph question 145 | print("\n--- Step 8: Adding a paragraph question ---") 146 | paragraph_response = await tools["add_paragraph"]( 147 | form_id=form_id, 148 | question_text="Do you have any suggestions for improvement?", 149 | required=False, 150 | help_text="Please share any ideas on how we can serve you better" 151 | ) 152 | print_response("Paragraph question added", paragraph_response) 153 | 154 | # Step 9: Add a file upload question 155 | print("\n--- Step 9: Adding a file upload question ---") 156 | file_upload_response = await tools["add_file_upload"]( 157 | form_id=form_id, 158 | question_text="Would you like to upload any relevant documents?", 159 | required=False, 160 | help_text="You can upload screenshots or other documents" 161 | ) 162 | print_response("File upload question added", file_upload_response) 163 | 164 | # Step 10: Add another section 165 | print("\n--- Step 10: Adding another section ---") 166 | section2_response = await tools["add_section"]( 167 | form_id=form_id, 168 | title="Additional Information", 169 | description="Help us personalize our services" 170 | ) 171 | print_response("Another section added", section2_response) 172 | 173 | # Step 11: List all forms 174 | print("\n--- Step 11: Listing all forms ---") 175 | list_forms_response = await tools["list_forms"]() 176 | print_response("Forms list", list_forms_response) 177 | 178 | # Step 12: Export responses (might not have any responses yet) 179 | print("\n--- Step 12: Setting up response export ---") 180 | export_response = await tools["export_responses"]( 181 | form_id=form_id, 182 | format="sheets" 183 | ) 184 | print_response("Export setup", export_response) 185 | 186 | # Step 13: Try to get responses (likely empty, but tests the functionality) 187 | print("\n--- Step 13: Getting responses ---") 188 | responses = await tools["get_responses"]( 189 | form_id=form_id 190 | ) 191 | print_response("Form responses", responses) 192 | 193 | print("\n=== Test completed successfully! 
===") 194 | print(f"Created form can be viewed at: {form_data['view_url']}") 195 | print(f"Created form can be edited at: {form_data['edit_url']}") 196 | 197 | except Exception as e: 198 | print(f"\nError during test: {str(e)}") 199 | raise 200 | 201 | # Interactive mode function to chat with the agent 202 | async def interactive_input_loop(agent: ChatAgent): 203 | loop = asyncio.get_event_loop() 204 | print("\n==== Google Forms Assistant Interactive Mode ====") 205 | print("Type 'exit' at any prompt to quit.") 206 | print("\nSample queries you can try:") 207 | print("- Create a new feedback form") 208 | print("- Add a customer satisfaction survey with multiple choice questions") 209 | print("- List all my forms") 210 | print("- Create a job application form with sections for personal info, education, and experience") 211 | print("- Export responses from a form to CSV") 212 | print("======================================") 213 | 214 | while True: 215 | query = await loop.run_in_executor( 216 | None, 217 | input, 218 | "\nEnter your query (or type 'exit' to quit): " 219 | ) 220 | 221 | if query.lower() == 'exit': 222 | print("Exiting interactive mode.") 223 | break 224 | 225 | print("\nProcessing query...") 226 | response = await agent.astep(query) 227 | 228 | print("\nAgent Response:") 229 | if response.msgs and response.msgs[0].content: 230 | print(response.msgs[0].content.rstrip()) 231 | else: 232 | print("No output received.") 233 | 234 | # Main function to run the entire example 235 | async def main(server_transport: str = 'stdio', mode: str = 'automated', 236 | server_url: Optional[str] = None): 237 | # First check if credentials exist 238 | if not check_credentials(): 239 | return 240 | 241 | mcp_toolkit = None 242 | server_process = None 243 | 244 | try: # Wrap setup and execution in try 245 | # Configure based on transport type 246 | if server_transport == 'stdio': 247 | # Original stdio logic (may still deadlock, but kept for reference) 248 | print("Using stdio transport (Note: May cause deadlock during auth)") 249 | current_dir = Path(__file__).resolve().parent 250 | server_script_path = current_dir / "google_forms_server_mcp.py" 251 | 252 | print(f"Looking for server script at: {server_script_path}") 253 | print(f"Directory contents: {[f.name for f in current_dir.iterdir() if f.is_file()]}") 254 | 255 | if not server_script_path.is_file(): 256 | print(f"Error: Server script not found at {server_script_path}") 257 | return 258 | 259 | # Use the _MCPServer helper for stdio 260 | from camel.toolkits.mcp_toolkit import _MCPServer # Import locally 261 | server_process = _MCPServer([sys.executable, str(server_script_path), "stdio"]) 262 | await server_process.start() 263 | mcp_toolkit = MCPToolkit(mcp_server_process=server_process) 264 | print("MCP Server started via stdio.") 265 | 266 | elif server_transport == 'sse': 267 | # SSE logic: Connect to an existing HTTP/SSE server 268 | if not server_url: 269 | print("Error: --server-url is required for SSE transport.") 270 | print("Example: python google_forms_example_run.py --transport sse --server-url http://127.0.0.1:8000") 271 | return 272 | 273 | print(f"Connecting to MCP Server via SSE at: {server_url}") 274 | mcp_toolkit = MCPToolkit(servers=[server_url]) 275 | # Move the connection test inside the 'async with' block 276 | 277 | else: 278 | print(f"Error: Unsupported server transport: {server_transport}") 279 | return 280 | 281 | # Initialize the LLM model 282 | # Reverting to model_config_dict based on user example, and adding 
api_key explicitly 283 | anthropic_api_key = os.getenv("ANTHROPIC_API_KEY") 284 | if not anthropic_api_key: 285 | print("Error: ANTHROPIC_API_KEY not found in environment variables or .env file.") 286 | # Decide how to handle missing key - raise error or return 287 | return # Or raise ValueError("Missing Anthropic API Key") 288 | 289 | model = ModelFactory.create( 290 | model_platform=ModelPlatformType.ANTHROPIC, 291 | model_type="claude-3-haiku-20240307", # Use a suitable Anthropic model 292 | # temperature=0.0 # Replaced by model_config_dict 293 | api_key=anthropic_api_key, # Explicitly pass API key 294 | model_config_dict={"temperature": 0.0} # Use the dict from user example 295 | ) 296 | 297 | # Main execution block within the try 298 | async with mcp_toolkit.connection() as toolkit: 299 | # Test connection *after* establishing context 300 | try: 301 | await toolkit.list_tools() # Use toolkit here 302 | print("Successfully connected to WebSocket MCP server and listed tools.") 303 | except Exception as e: 304 | print(f"Error testing connection/listing tools with server at {server_url}: {e}") 305 | print("Please ensure the server is running correctly.") 306 | print(f"Run: python google_forms_server_mcp.py --transport sse") 307 | return # Exit if connection test fails 308 | 309 | print("\nInitializing ChatAgent...") 310 | # Initialize ChatAgent with the MCP toolkit 311 | camel_agent = ChatAgent( 312 | model=model, 313 | tools=toolkit.get_tools(), 314 | verbose=True 315 | ) 316 | print("ChatAgent initialized.") 317 | 318 | # Choose mode: automated test or interactive loop 319 | if mode == 'automated': 320 | await run_automated_test(toolkit.get_tools()) 321 | elif mode == 'interactive': 322 | await interactive_input_loop(camel_agent) 323 | 324 | finally: # Finally block associated with the outer try 325 | print("\nCleaning up...") 326 | # Clean up resources only if server was started by this script (stdio mode) 327 | if server_process: 328 | print("Stopping stdio MCP server process...") 329 | try: 330 | await server_process.stop() 331 | print("Server process stopped.") 332 | except Exception as e: 333 | print(f"Error stopping server process: {e}") 334 | # For WebSocket, we assume the server runs independently 335 | print("Cleanup complete.") 336 | 337 | # Add argument parsing for command-line execution (this remains outside the main function) 338 | if __name__ == "__main__": 339 | parser = argparse.ArgumentParser(description="Google Forms MCP Example Runner") 340 | parser.add_argument( 341 | "--mode", 342 | default="automated", 343 | choices=["automated", "interactive"], 344 | help="Run mode (default: automated)" 345 | ) 346 | parser.add_argument( 347 | "--transport", 348 | default="sse", # Default to SSE for client too 349 | choices=["stdio", "sse"], # Only allow stdio or sse 350 | help="Server transport method (default: sse)" 351 | ) 352 | parser.add_argument( 353 | "--server-url", 354 | default="http://127.0.0.1:8000", # Default SSE URL (HTTP) 355 | help="URL of the running MCP SSE server (required for sse transport)" 356 | ) 357 | args = parser.parse_args() 358 | 359 | try: 360 | asyncio.run(main(server_transport=args.transport, mode=args.mode, server_url=args.server_url)) 361 | except KeyboardInterrupt: 362 | print("\nExecution interrupted by user.") ``` -------------------------------------------------------------------------------- /google_forms_mcp/test_google_forms_server_mcp.py: -------------------------------------------------------------------------------- ```python 1 | import 
asyncio # noqa: F401 2 | import os 3 | import json 4 | import pytest 5 | from unittest.mock import Mock, patch, MagicMock 6 | import sys 7 | sys.path.insert(0, os.path.abspath(os.path.dirname(__file__))) 8 | 9 | # Import the async tools to test 10 | from google_forms_server_mcp import ( 11 | create_form, 12 | add_section, 13 | add_short_answer, 14 | add_paragraph, 15 | add_multiple_choice, 16 | add_checkboxes, 17 | add_dropdown, 18 | add_file_upload, 19 | modify_form_settings, 20 | get_responses, 21 | export_responses, 22 | list_forms, 23 | ) 24 | 25 | # Mock Google API services 26 | @pytest.fixture 27 | def mock_google_services(): 28 | # Create mock services 29 | mock_form_service = MagicMock() 30 | mock_drive_service = MagicMock() 31 | mock_sheets_service = MagicMock() 32 | 33 | # Create mock context with mock services 34 | mock_context = MagicMock() 35 | mock_context.lifespan_context = MagicMock() 36 | mock_context.lifespan_context.form_service = mock_form_service 37 | mock_context.lifespan_context.drive_service = mock_drive_service 38 | mock_context.lifespan_context.sheets_service = mock_sheets_service 39 | 40 | return mock_context 41 | 42 | @pytest.mark.asyncio 43 | async def test_create_form(mock_google_services): 44 | """ 45 | Test that create_form creates a form correctly 46 | """ 47 | # Set up the mock response 48 | form_id = "abc123formid" 49 | mock_forms_obj = mock_google_services.lifespan_context.form_service.forms.return_value 50 | mock_create = mock_forms_obj.create.return_value 51 | mock_create.execute.return_value = {"formId": form_id} 52 | 53 | # Call the create_form function 54 | form_title = "Test Form" 55 | form_description = "A test form" 56 | result = await create_form(title=form_title, description=form_description, ctx=mock_google_services) 57 | result_json = json.loads(result) 58 | 59 | # Check that the form creation was successful 60 | assert result_json["form_id"] == form_id 61 | assert result_json["title"] == form_title 62 | assert result_json["description"] == form_description 63 | assert "edit_url" in result_json 64 | assert "view_url" in result_json 65 | 66 | # Check that the correct API calls were made 67 | mock_forms_obj.create.assert_called_once() 68 | form_body = mock_forms_obj.create.call_args[1]["body"] 69 | assert form_body["info"]["title"] == form_title 70 | assert form_body["info"]["description"] == form_description 71 | 72 | @pytest.mark.asyncio 73 | async def test_add_section(mock_google_services): 74 | """ 75 | Test that add_section adds a section to a form 76 | """ 77 | # Set up the mock response 78 | form_id = "abc123formid" 79 | mock_forms_obj = mock_google_services.lifespan_context.form_service.forms.return_value 80 | 81 | # Mock the get method 82 | mock_get = mock_forms_obj.get.return_value 83 | mock_get.execute.return_value = { 84 | "formId": form_id, 85 | "items": [{"item1": "data"}] 86 | } 87 | 88 | # Mock the batchUpdate method 89 | mock_batch_update = mock_forms_obj.batchUpdate.return_value 90 | mock_batch_update.execute.return_value = {"success": True} 91 | 92 | # Call the add_section function 93 | section_title = "Test Section" 94 | section_description = "A test section" 95 | result = await add_section( 96 | form_id=form_id, 97 | title=section_title, 98 | description=section_description, 99 | ctx=mock_google_services 100 | ) 101 | result_json = json.loads(result) 102 | 103 | # Check that the section addition was successful 104 | assert result_json["form_id"] == form_id 105 | assert result_json["section_added"] == section_title 106 | 
assert result_json["status"] == "success" 107 | 108 | # Check that the correct API calls were made 109 | mock_forms_obj.get.assert_called_once_with(formId=form_id) 110 | mock_forms_obj.batchUpdate.assert_called_once() 111 | update_request = mock_forms_obj.batchUpdate.call_args[1]["body"] 112 | assert update_request["requests"][0]["createItem"]["item"]["title"] == section_title 113 | assert update_request["requests"][0]["createItem"]["item"]["description"] == section_description 114 | assert "pageBreakItem" in update_request["requests"][0]["createItem"]["item"] 115 | 116 | @pytest.mark.asyncio 117 | async def test_modify_form_settings(mock_google_services): 118 | """ 119 | Test that modify_form_settings updates form settings correctly 120 | """ 121 | # Set up the mock response 122 | form_id = "abc123formid" 123 | mock_forms_obj = mock_google_services.lifespan_context.form_service.forms.return_value 124 | 125 | # Mock the get method 126 | mock_get = mock_forms_obj.get.return_value 127 | mock_get.execute.return_value = { 128 | "formId": form_id, 129 | "settings": {"collectEmail": False} 130 | } 131 | 132 | # Mock the batchUpdate method 133 | mock_batch_update = mock_forms_obj.batchUpdate.return_value 134 | mock_batch_update.execute.return_value = {"success": True} 135 | 136 | # Call the modify_form_settings function 137 | collect_email = True 138 | result = await modify_form_settings( 139 | form_id=form_id, 140 | collect_email=collect_email, 141 | ctx=mock_google_services 142 | ) 143 | result_json = json.loads(result) 144 | 145 | # Check that the settings update was successful 146 | assert result_json["form_id"] == form_id 147 | assert result_json["settings_updated"] == True 148 | assert result_json["collect_email"] == collect_email 149 | 150 | # Check that the correct API calls were made 151 | mock_forms_obj.get.assert_called_once_with(formId=form_id) 152 | mock_forms_obj.batchUpdate.assert_called_once() 153 | update_request = mock_forms_obj.batchUpdate.call_args[1]["body"] 154 | assert update_request["requests"][0]["updateSettings"]["settings"]["collectEmail"] == collect_email 155 | assert update_request["requests"][0]["updateSettings"]["updateMask"] == "collectEmail" 156 | 157 | @pytest.mark.asyncio 158 | async def test_add_short_answer(mock_google_services): 159 | """ 160 | Test that add_short_answer adds a question to a form 161 | """ 162 | # Set up the mock response 163 | form_id = "abc123formid" 164 | mock_forms_obj = mock_google_services.lifespan_context.form_service.forms.return_value 165 | 166 | # Mock the batchUpdate method 167 | mock_batch_update = mock_forms_obj.batchUpdate.return_value 168 | mock_batch_update.execute.return_value = {"success": True} 169 | 170 | # Call the add_short_answer function 171 | question_text = "What is your name?" 
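    # Note: form_service.forms() is a MagicMock, so the chained call
    # forms().batchUpdate(...).execute() is stubbed through the return_value
    # chain set up above; the request body handed to batchUpdate is later
    # recovered from call_args for the assertions below.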
172 | required = True 173 | help_text = "Please enter your full name" 174 | result = await add_short_answer( 175 | form_id=form_id, 176 | question_text=question_text, 177 | required=required, 178 | help_text=help_text, 179 | ctx=mock_google_services 180 | ) 181 | result_json = json.loads(result) 182 | 183 | # Check that the question addition was successful 184 | assert result_json["form_id"] == form_id 185 | assert result_json["question_text"] == question_text 186 | assert result_json["type"] == "short_answer" 187 | assert result_json["required"] == required 188 | assert result_json["status"] == "success" 189 | 190 | # Check that the correct API calls were made 191 | mock_forms_obj.batchUpdate.assert_called_once() 192 | update_request = mock_forms_obj.batchUpdate.call_args[1]["body"] 193 | assert update_request["requests"][0]["createItem"]["item"]["title"] == question_text 194 | assert update_request["requests"][0]["createItem"]["item"]["required"] == required 195 | assert update_request["requests"][0]["createItem"]["item"]["description"] == help_text 196 | assert update_request["requests"][0]["createItem"]["item"]["textQuestion"]["paragraph"] == False 197 | 198 | @pytest.mark.asyncio 199 | async def test_add_paragraph(mock_google_services): 200 | """ 201 | Test that add_paragraph adds a paragraph question to a form 202 | """ 203 | # Set up the mock response 204 | form_id = "abc123formid" 205 | mock_forms_obj = mock_google_services.lifespan_context.form_service.forms.return_value 206 | 207 | # Mock the batchUpdate method 208 | mock_batch_update = mock_forms_obj.batchUpdate.return_value 209 | mock_batch_update.execute.return_value = {"success": True} 210 | 211 | # Call the add_paragraph function 212 | question_text = "Tell us about yourself" 213 | required = False 214 | help_text = "Write a brief description" 215 | result = await add_paragraph( 216 | form_id=form_id, 217 | question_text=question_text, 218 | required=required, 219 | help_text=help_text, 220 | ctx=mock_google_services 221 | ) 222 | result_json = json.loads(result) 223 | 224 | # Check that the question addition was successful 225 | assert result_json["form_id"] == form_id 226 | assert result_json["question_text"] == question_text 227 | assert result_json["type"] == "paragraph" 228 | assert result_json["required"] == required 229 | assert result_json["status"] == "success" 230 | 231 | # Check that the correct API calls were made 232 | mock_forms_obj.batchUpdate.assert_called_once() 233 | update_request = mock_forms_obj.batchUpdate.call_args[1]["body"] 234 | assert update_request["requests"][0]["createItem"]["item"]["title"] == question_text 235 | assert update_request["requests"][0]["createItem"]["item"]["required"] == required 236 | assert update_request["requests"][0]["createItem"]["item"]["description"] == help_text 237 | assert update_request["requests"][0]["createItem"]["item"]["textQuestion"]["paragraph"] == True 238 | 239 | @pytest.mark.asyncio 240 | async def test_add_multiple_choice(mock_google_services): 241 | """ 242 | Test that add_multiple_choice adds a multiple choice question to a form 243 | """ 244 | # Set up the mock response 245 | form_id = "abc123formid" 246 | mock_forms_obj = mock_google_services.lifespan_context.form_service.forms.return_value 247 | 248 | # Mock the batchUpdate method 249 | mock_batch_update = mock_forms_obj.batchUpdate.return_value 250 | mock_batch_update.execute.return_value = {"success": True} 251 | 252 | # Call the add_multiple_choice function 253 | question_text = "What is your 
favorite color?" 254 | choices = ["Red", "Blue", "Green", "Yellow"] 255 | required = True 256 | help_text = "Select one option" 257 | result = await add_multiple_choice( 258 | form_id=form_id, 259 | question_text=question_text, 260 | choices=choices, 261 | required=required, 262 | help_text=help_text, 263 | ctx=mock_google_services 264 | ) 265 | result_json = json.loads(result) 266 | 267 | # Check that the question addition was successful 268 | assert result_json["form_id"] == form_id 269 | assert result_json["question_text"] == question_text 270 | assert result_json["type"] == "multiple_choice" 271 | assert result_json["choices"] == choices 272 | assert result_json["required"] == required 273 | assert result_json["status"] == "success" 274 | 275 | # Check that the correct API calls were made 276 | mock_forms_obj.batchUpdate.assert_called_once() 277 | update_request = mock_forms_obj.batchUpdate.call_args[1]["body"] 278 | assert update_request["requests"][0]["createItem"]["item"]["title"] == question_text 279 | assert update_request["requests"][0]["createItem"]["item"]["required"] == required 280 | assert update_request["requests"][0]["createItem"]["item"]["description"] == help_text 281 | 282 | # Check that the choices were set correctly 283 | choice_question = update_request["requests"][0]["createItem"]["item"]["questionItem"]["question"]["choiceQuestion"] 284 | assert choice_question["type"] == "RADIO" 285 | assert len(choice_question["options"]) == len(choices) 286 | for i, choice in enumerate(choices): 287 | assert choice_question["options"][i]["value"] == choice 288 | assert choice_question["shuffle"] == False 289 | 290 | @pytest.mark.asyncio 291 | async def test_add_checkboxes(mock_google_services): 292 | """ 293 | Test that add_checkboxes adds a checkboxes question to a form 294 | """ 295 | # Set up the mock response 296 | form_id = "abc123formid" 297 | mock_forms_obj = mock_google_services.lifespan_context.form_service.forms.return_value 298 | 299 | # Mock the batchUpdate method 300 | mock_batch_update = mock_forms_obj.batchUpdate.return_value 301 | mock_batch_update.execute.return_value = {"success": True} 302 | 303 | # Call the add_checkboxes function 304 | question_text = "Which fruits do you like?" 
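    # Checkboxes reuse the same request structure as multiple choice; the
    # assertions below only differ in the expected choiceQuestion type
    # ("CHECKBOX" instead of "RADIO").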
305 | choices = ["Apple", "Banana", "Orange", "Strawberry"] 306 | required = True 307 | help_text = "Select all that apply" 308 | result = await add_checkboxes( 309 | form_id=form_id, 310 | question_text=question_text, 311 | choices=choices, 312 | required=required, 313 | help_text=help_text, 314 | ctx=mock_google_services 315 | ) 316 | result_json = json.loads(result) 317 | 318 | # Check that the question addition was successful 319 | assert result_json["form_id"] == form_id 320 | assert result_json["question_text"] == question_text 321 | assert result_json["type"] == "checkboxes" 322 | assert result_json["choices"] == choices 323 | assert result_json["required"] == required 324 | assert result_json["status"] == "success" 325 | 326 | # Check that the correct API calls were made 327 | mock_forms_obj.batchUpdate.assert_called_once() 328 | update_request = mock_forms_obj.batchUpdate.call_args[1]["body"] 329 | assert update_request["requests"][0]["createItem"]["item"]["title"] == question_text 330 | assert update_request["requests"][0]["createItem"]["item"]["required"] == required 331 | assert update_request["requests"][0]["createItem"]["item"]["description"] == help_text 332 | 333 | # Check that the choices were set correctly 334 | choice_question = update_request["requests"][0]["createItem"]["item"]["questionItem"]["question"]["choiceQuestion"] 335 | assert choice_question["type"] == "CHECKBOX" 336 | assert len(choice_question["options"]) == len(choices) 337 | for i, choice in enumerate(choices): 338 | assert choice_question["options"][i]["value"] == choice 339 | assert choice_question["shuffle"] == False 340 | 341 | @pytest.mark.asyncio 342 | async def test_add_dropdown(mock_google_services): 343 | """ 344 | Test that add_dropdown adds a dropdown question to a form 345 | """ 346 | # Set up the mock response 347 | form_id = "abc123formid" 348 | mock_forms_obj = mock_google_services.lifespan_context.form_service.forms.return_value 349 | 350 | # Mock the batchUpdate method 351 | mock_batch_update = mock_forms_obj.batchUpdate.return_value 352 | mock_batch_update.execute.return_value = {"success": True} 353 | 354 | # Call the add_dropdown function 355 | question_text = "Select your country" 356 | choices = ["USA", "Canada", "UK", "Australia"] 357 | required = True 358 | help_text = "Select one" 359 | result = await add_dropdown( 360 | form_id=form_id, 361 | question_text=question_text, 362 | choices=choices, 363 | required=required, 364 | help_text=help_text, 365 | ctx=mock_google_services 366 | ) 367 | result_json = json.loads(result) 368 | 369 | # Check that the question addition was successful 370 | assert result_json["form_id"] == form_id 371 | assert result_json["question_text"] == question_text 372 | assert result_json["type"] == "dropdown" 373 | assert result_json["choices"] == choices 374 | assert result_json["required"] == required 375 | assert result_json["status"] == "success" 376 | 377 | # Check that the correct API calls were made 378 | mock_forms_obj.batchUpdate.assert_called_once() 379 | update_request = mock_forms_obj.batchUpdate.call_args[1]["body"] 380 | assert update_request["requests"][0]["createItem"]["item"]["title"] == question_text 381 | assert update_request["requests"][0]["createItem"]["item"]["required"] == required 382 | assert update_request["requests"][0]["createItem"]["item"]["description"] == help_text 383 | 384 | # Check that the choices were set correctly 385 | choice_question = 
update_request["requests"][0]["createItem"]["item"]["questionItem"]["question"]["choiceQuestion"] 386 | assert choice_question["type"] == "DROP_DOWN" 387 | assert len(choice_question["options"]) == len(choices) 388 | for i, choice in enumerate(choices): 389 | assert choice_question["options"][i]["value"] == choice 390 | assert choice_question["shuffle"] == False 391 | 392 | @pytest.mark.asyncio 393 | async def test_add_file_upload(mock_google_services): 394 | """ 395 | Test that add_file_upload adds a file upload question to a form 396 | """ 397 | # Set up the mock response 398 | form_id = "abc123formid" 399 | mock_forms_obj = mock_google_services.lifespan_context.form_service.forms.return_value 400 | 401 | # Mock the batchUpdate method 402 | mock_batch_update = mock_forms_obj.batchUpdate.return_value 403 | mock_batch_update.execute.return_value = {"success": True} 404 | 405 | # Call the add_file_upload function 406 | question_text = "Upload your resume" 407 | required = True 408 | help_text = "PDF files only please" 409 | result = await add_file_upload( 410 | form_id=form_id, 411 | question_text=question_text, 412 | required=required, 413 | help_text=help_text, 414 | ctx=mock_google_services 415 | ) 416 | result_json = json.loads(result) 417 | 418 | # Check that the question addition was successful 419 | assert result_json["form_id"] == form_id 420 | assert result_json["question_text"] == question_text 421 | assert result_json["type"] == "file_upload" 422 | assert result_json["required"] == required 423 | assert result_json["status"] == "success" 424 | 425 | # Check that the correct API calls were made 426 | mock_forms_obj.batchUpdate.assert_called_once() 427 | update_request = mock_forms_obj.batchUpdate.call_args[1]["body"] 428 | assert update_request["requests"][0]["createItem"]["item"]["title"] == question_text 429 | assert update_request["requests"][0]["createItem"]["item"]["required"] == required 430 | assert update_request["requests"][0]["createItem"]["item"]["description"] == help_text 431 | 432 | # Check that it's a file upload question 433 | assert "fileUploadQuestion" in update_request["requests"][0]["createItem"]["item"]["questionItem"]["question"] 434 | 435 | @pytest.mark.asyncio 436 | async def test_get_responses(mock_google_services): 437 | """ 438 | Test that get_responses retrieves form responses correctly 439 | """ 440 | # Set up the mock response 441 | form_id = "abc123formid" 442 | form_title = "Test Form" 443 | mock_forms_obj = mock_google_services.lifespan_context.form_service.forms.return_value 444 | 445 | # Mock the get method 446 | mock_get = mock_forms_obj.get.return_value 447 | mock_get.execute.return_value = { 448 | "formId": form_id, 449 | "info": {"title": form_title}, 450 | "items": [ 451 | { 452 | "title": "What is your name?", 453 | "questionItem": { 454 | "question": { 455 | "questionId": "q1" 456 | } 457 | } 458 | }, 459 | { 460 | "title": "What is your age?", 461 | "questionItem": { 462 | "question": { 463 | "questionId": "q2" 464 | } 465 | } 466 | } 467 | ] 468 | } 469 | 470 | # Mock the responses.list method 471 | mock_responses = mock_forms_obj.responses.return_value 472 | mock_responses_list = mock_responses.list.return_value 473 | mock_responses_list.execute.return_value = { 474 | "responses": [ 475 | { 476 | "responseId": "resp1", 477 | "createTime": "2023-04-01T12:00:00Z", 478 | "answers": { 479 | "q1": { 480 | "textAnswers": { 481 | "answers": [ 482 | {"value": "John Doe"} 483 | ] 484 | } 485 | }, 486 | "q2": { 487 | "textAnswers": { 488 | 
"answers": [ 489 | {"value": "30"} 490 | ] 491 | } 492 | } 493 | } 494 | } 495 | ] 496 | } 497 | 498 | # Call the get_responses function 499 | result = await get_responses(form_id=form_id, ctx=mock_google_services) 500 | result_json = json.loads(result) 501 | 502 | # Check that the responses were retrieved successfully 503 | assert result_json["form_id"] == form_id 504 | assert result_json["title"] == form_title 505 | assert result_json["response_count"] == 1 506 | assert len(result_json["responses"]) == 1 507 | 508 | # Check the response details 509 | response = result_json["responses"][0] 510 | assert response["response_id"] == "resp1" 511 | assert response["timestamp"] == "2023-04-01T12:00:00Z" 512 | assert "What is your name?" in response["answers"] 513 | assert response["answers"]["What is your name?"] == "John Doe" 514 | assert "What is your age?" in response["answers"] 515 | assert response["answers"]["What is your age?"] == "30" 516 | 517 | # Check that the correct API calls were made 518 | mock_forms_obj.get.assert_called_once_with(formId=form_id) 519 | mock_forms_obj.responses.assert_called_once() 520 | mock_responses.list.assert_called_once_with(formId=form_id) 521 | 522 | @pytest.mark.asyncio 523 | async def test_export_responses(mock_google_services): 524 | """ 525 | Test that export_responses correctly sets up response export 526 | """ 527 | # Set up the mock response 528 | form_id = "abc123formid" 529 | form_title = "Test Form" 530 | sheet_id = "xyz789sheetid" 531 | 532 | mock_forms_obj = mock_google_services.lifespan_context.form_service.forms.return_value 533 | mock_sheets_obj = mock_google_services.lifespan_context.sheets_service.spreadsheets.return_value 534 | 535 | # Mock form get method 536 | mock_get = mock_forms_obj.get.return_value 537 | mock_get.execute.return_value = { 538 | "formId": form_id, 539 | "info": {"title": form_title}, 540 | "responderUri": f"https://docs.google.com/spreadsheets/d/{sheet_id}/edit" 541 | } 542 | 543 | # Call the export_responses function for CSV 544 | result_csv = await export_responses(form_id=form_id, format="csv", ctx=mock_google_services) 545 | result_csv_json = json.loads(result_csv) 546 | 547 | # Call the export_responses function for sheets 548 | result_sheets = await export_responses(form_id=form_id, format="sheets", ctx=mock_google_services) 549 | result_sheets_json = json.loads(result_sheets) 550 | 551 | # Check the CSV export result 552 | assert result_csv_json["form_id"] == form_id 553 | assert result_csv_json["export_format"] == "csv" 554 | assert sheet_id in result_csv_json["download_link"] 555 | 556 | # Check the Sheets export result 557 | assert result_sheets_json["form_id"] == form_id 558 | assert result_sheets_json["export_format"] == "sheets" 559 | assert result_sheets_json["spreadsheet_id"] == sheet_id 560 | assert sheet_id in result_sheets_json["spreadsheet_link"] 561 | 562 | # Check that the correct API calls were made 563 | mock_forms_obj.get.assert_called_with(formId=form_id) 564 | 565 | @pytest.mark.asyncio 566 | async def test_export_responses_create_new(mock_google_services): 567 | """ 568 | Test that export_responses creates a new spreadsheet when none exists 569 | """ 570 | # Set up the mock response 571 | form_id = "abc123formid" 572 | form_title = "Test Form" 573 | new_sheet_id = "new789sheetid" 574 | 575 | mock_forms_obj = mock_google_services.lifespan_context.form_service.forms.return_value 576 | mock_sheets_obj = mock_google_services.lifespan_context.sheets_service.spreadsheets.return_value 577 | 578 | # 
Mock form get method with no responderUri 579 | mock_get = mock_forms_obj.get.return_value 580 | mock_get.execute.return_value = { 581 | "formId": form_id, 582 | "info": {"title": form_title} 583 | } 584 | 585 | # Mock spreadsheet create method 586 | mock_create = mock_sheets_obj.create.return_value 587 | mock_create.execute.return_value = {"spreadsheetId": new_sheet_id} 588 | 589 | # Mock form batchUpdate method 590 | mock_batch_update = mock_forms_obj.batchUpdate.return_value 591 | mock_batch_update.execute.return_value = {"success": True} 592 | 593 | # Call the export_responses function 594 | result = await export_responses(form_id=form_id, format="csv", ctx=mock_google_services) 595 | result_json = json.loads(result) 596 | 597 | # Check the export result 598 | assert result_json["form_id"] == form_id 599 | assert result_json["export_format"] == "csv" 600 | assert new_sheet_id in result_json["download_link"] 601 | 602 | # Check that the correct API calls were made 603 | mock_forms_obj.get.assert_called_with(formId=form_id) 604 | mock_sheets_obj.create.assert_called_once() 605 | 606 | # Check that we tried to link the form to the spreadsheet 607 | mock_forms_obj.batchUpdate.assert_called_once() 608 | update_request = mock_forms_obj.batchUpdate.call_args[1]["body"] 609 | assert update_request["requests"][0]["updateSettings"]["settings"]["responseDestination"] == "SPREADSHEET" 610 | assert update_request["requests"][0]["updateSettings"]["settings"]["spreadsheetId"] == new_sheet_id 611 | 612 | @pytest.mark.asyncio 613 | async def test_list_forms(mock_google_services): 614 | """ 615 | Test that list_forms returns all forms correctly 616 | """ 617 | # Set up the mock response 618 | mock_drive_obj = mock_google_services.lifespan_context.drive_service.files.return_value 619 | mock_list = mock_drive_obj.list.return_value 620 | mock_list.execute.return_value = { 621 | "files": [ 622 | { 623 | "id": "form1", 624 | "name": "Customer Feedback", 625 | "webViewLink": "https://docs.google.com/forms/d/form1/viewform", 626 | "createdTime": "2023-01-15T12:00:00Z" 627 | }, 628 | { 629 | "id": "form2", 630 | "name": "Job Application", 631 | "webViewLink": "https://docs.google.com/forms/d/form2/viewform", 632 | "createdTime": "2023-02-20T10:30:00Z" 633 | } 634 | ] 635 | } 636 | 637 | # Call the list_forms function 638 | result = await list_forms(ctx=mock_google_services) 639 | result_json = json.loads(result) 640 | 641 | # Check that the forms were listed correctly 642 | assert len(result_json["forms"]) == 2 643 | 644 | # Check the first form details 645 | form1 = result_json["forms"][0] 646 | assert form1["id"] == "form1" 647 | assert form1["name"] == "Customer Feedback" 648 | assert form1["url"] == "https://docs.google.com/forms/d/form1/viewform" 649 | assert form1["created"] == "2023-01-15T12:00:00Z" 650 | 651 | # Check the second form details 652 | form2 = result_json["forms"][1] 653 | assert form2["id"] == "form2" 654 | assert form2["name"] == "Job Application" 655 | assert form2["url"] == "https://docs.google.com/forms/d/form2/viewform" 656 | assert form2["created"] == "2023-02-20T10:30:00Z" 657 | 658 | # Check that the correct API calls were made 659 | mock_drive_obj.list.assert_called_once() 660 | list_args = mock_drive_obj.list.call_args[1] 661 | assert list_args["q"] == "mimeType='application/vnd.google-apps.form'" 662 | assert "id" in list_args["fields"] 663 | assert "name" in list_args["fields"] 664 | assert "webViewLink" in list_args["fields"] 665 | assert "createdTime" in list_args["fields"] 
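    # list_forms discovers forms through a Drive files().list query scoped to
    # the Google Forms MIME type; the assertions above pin both the query and
    # the metadata fields requested from the API.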
666 | 667 | @pytest.mark.asyncio 668 | async def test_list_forms_empty(mock_google_services): 669 | """ 670 | Test that list_forms handles the case of no forms 671 | """ 672 | # Set up the mock response with no forms 673 | mock_drive_obj = mock_google_services.lifespan_context.drive_service.files.return_value 674 | mock_list = mock_drive_obj.list.return_value 675 | mock_list.execute.return_value = {"files": []} 676 | 677 | # Call the list_forms function 678 | result = await list_forms(ctx=mock_google_services) 679 | result_json = json.loads(result) 680 | 681 | # Check that the response is formatted correctly 682 | assert result_json["forms"] == [] 683 | assert "message" in result_json 684 | assert result_json["message"] == "No forms found" 685 | 686 | # Check that the correct API call was made 687 | mock_drive_obj.list.assert_called_once() ``` -------------------------------------------------------------------------------- /google_forms_mcp/google_forms_server_mcp.py: -------------------------------------------------------------------------------- ```python 1 | #!/usr/bin/env python 2 | # ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. ========= 3 | # Licensed under the Apache License, Version 2.0 (the "License"); 4 | # you may not use this file except in compliance with the License. 5 | # You may obtain a copy of the License at 6 | # 7 | # http://www.apache.org/licenses/LICENSE-2.0 8 | # 9 | # Unless required by applicable law or agreed to in writing, software 10 | # distributed under the License is distributed on an "AS IS" BASIS, 11 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 | # See the License for the specific language governing permissions and 13 | # limitations under the License. 14 | # ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. ========= 15 | 16 | """ 17 | Google Forms MCP Server using FastMCP 18 | 19 | This server provides MCP tools for interacting with Google Forms. 
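
Run it as a standalone server, for example:
    python google_forms_server_mcp.py --transport sse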
20 | """ 21 | 22 | import os 23 | import json 24 | import pickle 25 | from typing import List, Optional, Dict, Any 26 | from contextlib import asynccontextmanager 27 | from collections.abc import AsyncIterator 28 | from dataclasses import dataclass 29 | 30 | import asyncio # noqa: F401 31 | from mcp.server.fastmcp import FastMCP, Context 32 | from camel.logger import get_logger 33 | 34 | from googleapiclient.discovery import build 35 | from google_auth_oauthlib.flow import InstalledAppFlow 36 | from google.auth.transport.requests import Request 37 | 38 | # Add necessary imports for WebSocket server 39 | import argparse 40 | 41 | logger = get_logger(__name__) 42 | 43 | # Define the scopes needed for Google Forms API 44 | SCOPES = [ 45 | 'https://www.googleapis.com/auth/forms', 46 | 'https://www.googleapis.com/auth/drive', 47 | 'https://www.googleapis.com/auth/spreadsheets' 48 | ] 49 | 50 | @dataclass 51 | class FormServices: 52 | """Class to hold the Google API services""" 53 | form_service: Any 54 | drive_service: Any 55 | sheets_service: Any 56 | 57 | 58 | # Find the app_lifespan function in google_forms_server_mcp.py 59 | # Look for this section (around line 60-80): 60 | 61 | # Update the app_lifespan function in google_forms_server_mcp.py with this code 62 | 63 | @asynccontextmanager 64 | async def app_lifespan(server: FastMCP) -> AsyncIterator[FormServices]: 65 | """Manage application lifecycle with Google API services""" 66 | logger.info("Initializing Google API services...") 67 | # Get credentials and initialize services 68 | creds = None 69 | 70 | # Token file stores the user's access and refresh tokens 71 | if os.path.exists('token.pickle'): 72 | with open('token.pickle', 'rb') as token: 73 | creds = pickle.load(token) 74 | 75 | # If there are no valid credentials, let the user log in 76 | if not creds or not creds.valid: 77 | if creds and creds.expired and creds.refresh_token: 78 | creds.refresh(Request()) 79 | else: 80 | # We need to make this work with async 81 | flow = InstalledAppFlow.from_client_secrets_file( 82 | 'credentials.json', SCOPES) 83 | 84 | # Generate the authorization URL 85 | auth_url, _ = flow.authorization_url(prompt='consent') 86 | print("\n\n================================================") 87 | print("Go to this URL in your browser to authenticate:") 88 | print(f"{auth_url}") 89 | print("================================================\n") 90 | 91 | # Get authorization code from user input (using asyncio event loop) 92 | loop = asyncio.get_event_loop() 93 | auth_code = await loop.run_in_executor( 94 | None, 95 | lambda: input("Enter the authorization code: ") 96 | ) 97 | 98 | # Exchange authorization code for credentials 99 | flow.fetch_token(code=auth_code) 100 | creds = flow.credentials 101 | 102 | # Save the credentials for the next run 103 | with open('token.pickle', 'wb') as token: 104 | pickle.dump(creds, token) 105 | 106 | # Build API services 107 | form_service = build('forms', 'v1', credentials=creds) 108 | drive_service = build('drive', 'v3', credentials=creds) 109 | sheets_service = build('sheets', 'v4', credentials=creds) 110 | 111 | services = FormServices( 112 | form_service=form_service, 113 | drive_service=drive_service, 114 | sheets_service=sheets_service 115 | ) 116 | 117 | logger.info("Google API services initialized successfully") 118 | 119 | try: 120 | yield services 121 | finally: 122 | # No specific cleanup needed for Google API services 123 | logger.info("Shutting down Google API services") 124 | 125 | # Create the MCP server with 
lifespan support 126 | mcp = FastMCP( 127 | "GoogleForms", 128 | lifespan=app_lifespan, 129 | # Optionally add description, version etc. 130 | # description="MCP Server for Google Forms interaction.", 131 | # version="0.1.0" 132 | ) 133 | 134 | # Form Structure Tools 135 | 136 | @mcp.tool() 137 | async def create_form(title: str, description: str = "", ctx: Context = None) -> str: 138 | """ 139 | Create a new Google Form with title and description 140 | 141 | Args: 142 | title (str): The title of the form 143 | description (str): The description of the form 144 | 145 | Returns: 146 | str: JSON string containing form details including ID 147 | """ 148 | logger.info(f"create_form triggered with title: {title}") 149 | 150 | try: 151 | services = ctx.lifespan_context 152 | form_service = services.form_service 153 | 154 | form_body = { 155 | "info": { 156 | "title": title, 157 | "documentTitle": title 158 | } 159 | } 160 | 161 | if description: 162 | form_body["info"]["description"] = description 163 | 164 | result = form_service.forms().create(body=form_body).execute() 165 | 166 | return json.dumps({ 167 | "form_id": result["formId"], 168 | "title": title, 169 | "description": description, 170 | "edit_url": f"https://docs.google.com/forms/d/{result['formId']}/edit", 171 | "view_url": f"https://docs.google.com/forms/d/{result['formId']}/viewform" 172 | }, indent=2) 173 | except Exception as e: 174 | return f"Error creating form: {e}" 175 | 176 | create_form.inputSchema = { 177 | "type": "object", 178 | "properties": { 179 | "title": { 180 | "type": "string", 181 | "title": "Form Title", 182 | "description": "The title of the Google Form" 183 | }, 184 | "description": { 185 | "type": "string", 186 | "title": "Form Description", 187 | "description": "The description of the Google Form" 188 | } 189 | }, 190 | "required": ["title"] 191 | } 192 | 193 | @mcp.tool() 194 | async def add_section(form_id: str, title: str, description: str = "", ctx: Context = None) -> str: 195 | """ 196 | Add a section to a Google Form 197 | 198 | Args: 199 | form_id (str): The ID of the form 200 | title (str): The title of the section 201 | description (str): The description of the section 202 | 203 | Returns: 204 | str: JSON string containing the updated form details 205 | """ 206 | logger.info(f"add_section triggered with form_id: {form_id}, title: {title}") 207 | 208 | try: 209 | services = ctx.lifespan_context 210 | form_service = services.form_service 211 | 212 | # First, get the current form 213 | form = form_service.forms().get(formId=form_id).execute() 214 | 215 | # Create the update request 216 | update_request = { 217 | "requests": [ 218 | { 219 | "createItem": { 220 | "item": { 221 | "title": title, 222 | "description": description, 223 | "pageBreakItem": {} 224 | }, 225 | "location": { 226 | "index": len(form.get("items", [])) 227 | } 228 | } 229 | } 230 | ] 231 | } 232 | 233 | # Execute the update 234 | updated_form = form_service.forms().batchUpdate( 235 | formId=form_id, body=update_request).execute() 236 | 237 | return json.dumps({ 238 | "form_id": form_id, 239 | "section_added": title, 240 | "status": "success" 241 | }, indent=2) 242 | except Exception as e: 243 | return f"Error adding section: {e}" 244 | 245 | add_section.inputSchema = { 246 | "type": "object", 247 | "properties": { 248 | "form_id": { 249 | "type": "string", 250 | "title": "Form ID", 251 | "description": "The ID of the Google Form" 252 | }, 253 | "title": { 254 | "type": "string", 255 | "title": "Section Title", 256 | "description": 
"The title of the section" 257 | }, 258 | "description": { 259 | "type": "string", 260 | "title": "Section Description", 261 | "description": "The description of the section" 262 | } 263 | }, 264 | "required": ["form_id", "title"] 265 | } 266 | 267 | @mcp.tool() 268 | async def modify_form_settings( 269 | form_id: str, 270 | collect_email: Optional[bool] = None, 271 | limit_responses: Optional[bool] = None, 272 | response_limit: Optional[int] = None, 273 | ctx: Context = None 274 | ) -> str: 275 | """ 276 | Modify Google Form settings 277 | 278 | Args: 279 | form_id (str): The ID of the form 280 | collect_email (bool, optional): Whether to collect email addresses 281 | limit_responses (bool, optional): Whether to limit responses 282 | response_limit (int, optional): Maximum number of responses allowed 283 | 284 | Returns: 285 | str: JSON string containing the updated form settings 286 | """ 287 | logger.info(f"modify_form_settings triggered with form_id: {form_id}") 288 | 289 | try: 290 | services = ctx.lifespan_context 291 | form_service = services.form_service 292 | 293 | # Get the current form 294 | form = form_service.forms().get(formId=form_id).execute() 295 | 296 | updates = [] 297 | 298 | # Update collect email setting 299 | if collect_email is not None: 300 | updates.append({ 301 | "updateSettings": { 302 | "settings": { 303 | "collectEmail": collect_email 304 | }, 305 | "updateMask": "collectEmail" 306 | } 307 | }) 308 | 309 | # Update response limit settings 310 | if limit_responses is not None and not limit_responses: 311 | updates.append({ 312 | "updateSettings": { 313 | "settings": { 314 | "isQuiz": False 315 | }, 316 | "updateMask": "isQuiz" 317 | } 318 | }) 319 | elif limit_responses and response_limit: 320 | updates.append({ 321 | "updateSettings": { 322 | "settings": { 323 | "limitOneResponsePerUser": True 324 | }, 325 | "updateMask": "limitOneResponsePerUser" 326 | } 327 | }) 328 | 329 | # Execute the updates if any 330 | if updates: 331 | update_request = {"requests": updates} 332 | updated_form = form_service.forms().batchUpdate( 333 | formId=form_id, body=update_request).execute() 334 | 335 | return json.dumps({ 336 | "form_id": form_id, 337 | "settings_updated": True, 338 | "collect_email": collect_email, 339 | "limit_responses": limit_responses, 340 | "response_limit": response_limit 341 | }, indent=2) 342 | except Exception as e: 343 | return f"Error modifying form settings: {e}" 344 | 345 | modify_form_settings.inputSchema = { 346 | "type": "object", 347 | "properties": { 348 | "form_id": { 349 | "type": "string", 350 | "title": "Form ID", 351 | "description": "The ID of the Google Form" 352 | }, 353 | "collect_email": { 354 | "type": "boolean", 355 | "title": "Collect Email", 356 | "description": "Whether to collect email addresses" 357 | }, 358 | "limit_responses": { 359 | "type": "boolean", 360 | "title": "Limit Responses", 361 | "description": "Whether to limit responses" 362 | }, 363 | "response_limit": { 364 | "type": "integer", 365 | "title": "Response Limit", 366 | "description": "Maximum number of responses allowed" 367 | } 368 | }, 369 | "required": ["form_id"] 370 | } 371 | 372 | # Question Type Tools 373 | 374 | @mcp.tool() 375 | async def add_short_answer( 376 | form_id: str, 377 | question_text: str, 378 | required: bool = False, 379 | help_text: str = "", 380 | ctx: Context = None 381 | ) -> str: 382 | """ 383 | Add a short answer question to a Google Form 384 | 385 | Args: 386 | form_id (str): The ID of the form 387 | question_text (str): The text 
of the question 388 | required (bool, optional): Whether the question is required 389 | help_text (str, optional): Help text for the question 390 | 391 | Returns: 392 | str: JSON string containing the question details 393 | """ 394 | logger.info(f"add_short_answer triggered with form_id: {form_id}, question: {question_text}") 395 | 396 | try: 397 | services = ctx.lifespan_context 398 | form_service = services.form_service 399 | 400 | question_item = { 401 | "title": question_text, 402 | "required": required, 403 | "textQuestion": { 404 | "paragraph": False 405 | } 406 | } 407 | 408 | if help_text: 409 | question_item["description"] = help_text 410 | 411 | update_request = { 412 | "requests": [ 413 | { 414 | "createItem": { 415 | "item": question_item, 416 | "location": { 417 | "index": 0 418 | } 419 | } 420 | } 421 | ] 422 | } 423 | 424 | result = form_service.forms().batchUpdate( 425 | formId=form_id, body=update_request).execute() 426 | 427 | return json.dumps({ 428 | "form_id": form_id, 429 | "question_text": question_text, 430 | "type": "short_answer", 431 | "required": required, 432 | "status": "success" 433 | }, indent=2) 434 | except Exception as e: 435 | return f"Error adding short answer question: {e}" 436 | 437 | add_short_answer.inputSchema = { 438 | "type": "object", 439 | "properties": { 440 | "form_id": { 441 | "type": "string", 442 | "title": "Form ID", 443 | "description": "The ID of the Google Form" 444 | }, 445 | "question_text": { 446 | "type": "string", 447 | "title": "Question Text", 448 | "description": "The text of the question" 449 | }, 450 | "required": { 451 | "type": "boolean", 452 | "title": "Required", 453 | "description": "Whether the question is required" 454 | }, 455 | "help_text": { 456 | "type": "string", 457 | "title": "Help Text", 458 | "description": "Help text for the question" 459 | } 460 | }, 461 | "required": ["form_id", "question_text"] 462 | } 463 | 464 | @mcp.tool() 465 | async def add_paragraph( 466 | form_id: str, 467 | question_text: str, 468 | required: bool = False, 469 | help_text: str = "", 470 | ctx: Context = None 471 | ) -> str: 472 | """ 473 | Add a paragraph question to a Google Form 474 | 475 | Args: 476 | form_id (str): The ID of the form 477 | question_text (str): The text of the question 478 | required (bool, optional): Whether the question is required 479 | help_text (str, optional): Help text for the question 480 | 481 | Returns: 482 | str: JSON string containing the question details 483 | """ 484 | logger.info(f"add_paragraph triggered with form_id: {form_id}, question: {question_text}") 485 | 486 | try: 487 | services = ctx.lifespan_context 488 | form_service = services.form_service 489 | 490 | question_item = { 491 | "title": question_text, 492 | "required": required, 493 | "textQuestion": { 494 | "paragraph": True 495 | } 496 | } 497 | 498 | if help_text: 499 | question_item["description"] = help_text 500 | 501 | update_request = { 502 | "requests": [ 503 | { 504 | "createItem": { 505 | "item": question_item, 506 | "location": { 507 | "index": 0 508 | } 509 | } 510 | } 511 | ] 512 | } 513 | 514 | result = form_service.forms().batchUpdate( 515 | formId=form_id, body=update_request).execute() 516 | 517 | return json.dumps({ 518 | "form_id": form_id, 519 | "question_text": question_text, 520 | "type": "paragraph", 521 | "required": required, 522 | "status": "success" 523 | }, indent=2) 524 | except Exception as e: 525 | return f"Error adding paragraph question: {e}" 526 | 527 | add_paragraph.inputSchema = { 528 | "type": 
"object", 529 | "properties": { 530 | "form_id": { 531 | "type": "string", 532 | "title": "Form ID", 533 | "description": "The ID of the Google Form" 534 | }, 535 | "question_text": { 536 | "type": "string", 537 | "title": "Question Text", 538 | "description": "The text of the question" 539 | }, 540 | "required": { 541 | "type": "boolean", 542 | "title": "Required", 543 | "description": "Whether the question is required" 544 | }, 545 | "help_text": { 546 | "type": "string", 547 | "title": "Help Text", 548 | "description": "Help text for the question" 549 | } 550 | }, 551 | "required": ["form_id", "question_text"] 552 | } 553 | 554 | @mcp.tool() 555 | async def add_multiple_choice( 556 | form_id: str, 557 | question_text: str, 558 | choices: List[str], 559 | required: bool = False, 560 | help_text: str = "", 561 | ctx: Context = None 562 | ) -> str: 563 | """ 564 | Add a multiple choice question to a Google Form 565 | 566 | Args: 567 | form_id (str): The ID of the form 568 | question_text (str): The text of the question 569 | choices (List[str]): List of choices for the multiple choice question 570 | required (bool, optional): Whether the question is required 571 | help_text (str, optional): Help text for the question 572 | 573 | Returns: 574 | str: JSON string containing the question details 575 | """ 576 | logger.info(f"add_multiple_choice triggered with form_id: {form_id}, question: {question_text}") 577 | 578 | try: 579 | services = ctx.lifespan_context 580 | form_service = services.form_service 581 | 582 | # Create choices objects 583 | choice_items = [{"value": choice} for choice in choices] 584 | 585 | question_item = { 586 | "title": question_text, 587 | "required": required, 588 | "questionItem": { 589 | "question": { 590 | "choiceQuestion": { 591 | "type": "RADIO", 592 | "options": choice_items, 593 | "shuffle": False 594 | } 595 | } 596 | } 597 | } 598 | 599 | if help_text: 600 | question_item["description"] = help_text 601 | 602 | update_request = { 603 | "requests": [ 604 | { 605 | "createItem": { 606 | "item": question_item, 607 | "location": { 608 | "index": 0 609 | } 610 | } 611 | } 612 | ] 613 | } 614 | 615 | result = form_service.forms().batchUpdate( 616 | formId=form_id, body=update_request).execute() 617 | 618 | return json.dumps({ 619 | "form_id": form_id, 620 | "question_text": question_text, 621 | "type": "multiple_choice", 622 | "choices": choices, 623 | "required": required, 624 | "status": "success" 625 | }, indent=2) 626 | except Exception as e: 627 | return f"Error adding multiple choice question: {e}" 628 | 629 | add_multiple_choice.inputSchema = { 630 | "type": "object", 631 | "properties": { 632 | "form_id": { 633 | "type": "string", 634 | "title": "Form ID", 635 | "description": "The ID of the Google Form" 636 | }, 637 | "question_text": { 638 | "type": "string", 639 | "title": "Question Text", 640 | "description": "The text of the question" 641 | }, 642 | "choices": { 643 | "type": "array", 644 | "items": { 645 | "type": "string" 646 | }, 647 | "title": "Choices", 648 | "description": "List of choices for the multiple choice question" 649 | }, 650 | "required": { 651 | "type": "boolean", 652 | "title": "Required", 653 | "description": "Whether the question is required" 654 | }, 655 | "help_text": { 656 | "type": "string", 657 | "title": "Help Text", 658 | "description": "Help text for the question" 659 | } 660 | }, 661 | "required": ["form_id", "question_text", "choices"] 662 | } 663 | 664 | @mcp.tool() 665 | async def add_checkboxes( 666 | form_id: str, 667 
| question_text: str, 668 | choices: List[str], 669 | required: bool = False, 670 | help_text: str = "", 671 | ctx: Context = None 672 | ) -> str: 673 | """ 674 | Add a checkboxes question to a Google Form 675 | 676 | Args: 677 | form_id (str): The ID of the form 678 | question_text (str): The text of the question 679 | choices (List[str]): List of choices for the checkboxes 680 | required (bool, optional): Whether the question is required 681 | help_text (str, optional): Help text for the question 682 | 683 | Returns: 684 | str: JSON string containing the question details 685 | """ 686 | logger.info(f"add_checkboxes triggered with form_id: {form_id}, question: {question_text}") 687 | 688 | try: 689 | services = ctx.lifespan_context 690 | form_service = services.form_service 691 | 692 | # Create choices objects 693 | choice_items = [{"value": choice} for choice in choices] 694 | 695 | question_item = { 696 | "title": question_text, 697 | "required": required, 698 | "questionItem": { 699 | "question": { 700 | "choiceQuestion": { 701 | "type": "CHECKBOX", 702 | "options": choice_items, 703 | "shuffle": False 704 | } 705 | } 706 | } 707 | } 708 | 709 | if help_text: 710 | question_item["description"] = help_text 711 | 712 | update_request = { 713 | "requests": [ 714 | { 715 | "createItem": { 716 | "item": question_item, 717 | "location": { 718 | "index": 0 719 | } 720 | } 721 | } 722 | ] 723 | } 724 | 725 | result = form_service.forms().batchUpdate( 726 | formId=form_id, body=update_request).execute() 727 | 728 | return json.dumps({ 729 | "form_id": form_id, 730 | "question_text": question_text, 731 | "type": "checkboxes", 732 | "choices": choices, 733 | "required": required, 734 | "status": "success" 735 | }, indent=2) 736 | except Exception as e: 737 | return f"Error adding checkboxes question: {e}" 738 | 739 | add_checkboxes.inputSchema = { 740 | "type": "object", 741 | "properties": { 742 | "form_id": { 743 | "type": "string", 744 | "title": "Form ID", 745 | "description": "The ID of the Google Form" 746 | }, 747 | "question_text": { 748 | "type": "string", 749 | "title": "Question Text", 750 | "description": "The text of the question" 751 | }, 752 | "choices": { 753 | "type": "array", 754 | "items": { 755 | "type": "string" 756 | }, 757 | "title": "Choices", 758 | "description": "List of choices for the checkboxes" 759 | }, 760 | "required": { 761 | "type": "boolean", 762 | "title": "Required", 763 | "description": "Whether the question is required" 764 | }, 765 | "help_text": { 766 | "type": "string", 767 | "title": "Help Text", 768 | "description": "Help text for the question" 769 | } 770 | }, 771 | "required": ["form_id", "question_text", "choices"] 772 | } 773 | 774 | @mcp.tool() 775 | async def add_dropdown( 776 | form_id: str, 777 | question_text: str, 778 | choices: List[str], 779 | required: bool = False, 780 | help_text: str = "", 781 | ctx: Context = None 782 | ) -> str: 783 | """ 784 | Add a dropdown question to a Google Form 785 | 786 | Args: 787 | form_id (str): The ID of the form 788 | question_text (str): The text of the question 789 | choices (List[str]): List of choices for the dropdown 790 | required (bool, optional): Whether the question is required 791 | help_text (str, optional): Help text for the question 792 | 793 | Returns: 794 | str: JSON string containing the question details 795 | """ 796 | logger.info(f"add_dropdown triggered with form_id: {form_id}, question: {question_text}") 797 | 798 | try: 799 | services = ctx.lifespan_context 800 | form_service = 
services.form_service 801 | 802 | # Create choices objects 803 | choice_items = [{"value": choice} for choice in choices] 804 | 805 | question_item = { 806 | "title": question_text, 807 | "required": required, 808 | "questionItem": { 809 | "question": { 810 | "choiceQuestion": { 811 | "type": "DROP_DOWN", 812 | "options": choice_items, 813 | "shuffle": False 814 | } 815 | } 816 | } 817 | } 818 | 819 | if help_text: 820 | question_item["description"] = help_text 821 | 822 | update_request = { 823 | "requests": [ 824 | { 825 | "createItem": { 826 | "item": question_item, 827 | "location": { 828 | "index": 0 829 | } 830 | } 831 | } 832 | ] 833 | } 834 | 835 | result = form_service.forms().batchUpdate( 836 | formId=form_id, body=update_request).execute() 837 | 838 | return json.dumps({ 839 | "form_id": form_id, 840 | "question_text": question_text, 841 | "type": "dropdown", 842 | "choices": choices, 843 | "required": required, 844 | "status": "success" 845 | }, indent=2) 846 | except Exception as e: 847 | return f"Error adding dropdown question: {e}" 848 | 849 | add_dropdown.inputSchema = { 850 | "type": "object", 851 | "properties": { 852 | "form_id": { 853 | "type": "string", 854 | "title": "Form ID", 855 | "description": "The ID of the Google Form" 856 | }, 857 | "question_text": { 858 | "type": "string", 859 | "title": "Question Text", 860 | "description": "The text of the question" 861 | }, 862 | "choices": { 863 | "type": "array", 864 | "items": { 865 | "type": "string" 866 | }, 867 | "title": "Choices", 868 | "description": "List of choices for the dropdown" 869 | }, 870 | "required": { 871 | "type": "boolean", 872 | "title": "Required", 873 | "description": "Whether the question is required" 874 | }, 875 | "help_text": { 876 | "type": "string", 877 | "title": "Help Text", 878 | "description": "Help text for the question" 879 | } 880 | }, 881 | "required": ["form_id", "question_text", "choices"] 882 | } 883 | 884 | @mcp.tool() 885 | async def add_file_upload( 886 | form_id: str, 887 | question_text: str, 888 | required: bool = False, 889 | help_text: str = "", 890 | ctx: Context = None 891 | ) -> str: 892 | """ 893 | Add a file upload question to a Google Form 894 | 895 | Args: 896 | form_id (str): The ID of the form 897 | question_text (str): The text of the question 898 | required (bool, optional): Whether the question is required 899 | help_text (str, optional): Help text for the question 900 | 901 | Returns: 902 | str: JSON string containing the question details 903 | """ 904 | logger.info(f"add_file_upload triggered with form_id: {form_id}, question: {question_text}") 905 | 906 | try: 907 | services = ctx.lifespan_context 908 | form_service = services.form_service 909 | 910 | question_item = { 911 | "title": question_text, 912 | "required": required, 913 | "questionItem": { 914 | "question": { 915 | "fileUploadQuestion": { 916 | "folderId": None # This will use the default folder 917 | } 918 | } 919 | } 920 | } 921 | 922 | if help_text: 923 | question_item["description"] = help_text 924 | 925 | update_request = { 926 | "requests": [ 927 | { 928 | "createItem": { 929 | "item": question_item, 930 | "location": { 931 | "index": 0 932 | } 933 | } 934 | } 935 | ] 936 | } 937 | 938 | result = form_service.forms().batchUpdate( 939 | formId=form_id, body=update_request).execute() 940 | 941 | return json.dumps({ 942 | "form_id": form_id, 943 | "question_text": question_text, 944 | "type": "file_upload", 945 | "required": required, 946 | "status": "success" 947 | }, indent=2) 948 | except 
Exception as e: 949 | return f"Error adding file upload question: {e}" 950 | 951 | add_file_upload.inputSchema = { 952 | "type": "object", 953 | "properties": { 954 | "form_id": { 955 | "type": "string", 956 | "title": "Form ID", 957 | "description": "The ID of the Google Form" 958 | }, 959 | "question_text": { 960 | "type": "string", 961 | "title": "Question Text", 962 | "description": "The text of the question" 963 | }, 964 | "required": { 965 | "type": "boolean", 966 | "title": "Required", 967 | "description": "Whether the question is required" 968 | }, 969 | "help_text": { 970 | "type": "string", 971 | "title": "Help Text", 972 | "description": "Help text for the question" 973 | } 974 | }, 975 | "required": ["form_id", "question_text"] 976 | } 977 | 978 | # Response Management Tools 979 | 980 | @mcp.tool() 981 | async def get_responses(form_id: str, ctx: Context = None) -> str: 982 | """ 983 | Get responses from a Google Form 984 | 985 | Args: 986 | form_id (str): The ID of the form 987 | 988 | Returns: 989 | str: JSON string containing the form responses 990 | """ 991 | logger.info(f"get_responses triggered with form_id: {form_id}") 992 | 993 | try: 994 | services = ctx.lifespan_context 995 | form_service = services.form_service 996 | 997 | # Get the form first to check if it has a linked response sheet 998 | form = form_service.forms().get(formId=form_id).execute() 999 | 1000 | # Get responses 1001 | responses = form_service.forms().responses().list(formId=form_id).execute() 1002 | 1003 | # Simplify the response data for readability 1004 | simplified_responses = [] 1005 | for response in responses.get("responses", []): 1006 | answer_data = {} 1007 | for key, value in response.get("answers", {}).items(): 1008 | question_id = key 1009 | # Try to get the question text from the form 1010 | question_text = "Unknown Question" 1011 | for item in form.get("items", []): 1012 | if item.get("questionItem", {}).get("question", {}).get("questionId") == question_id: 1013 | question_text = item.get("title", "Unknown Question") 1014 | break 1015 | 1016 | # Extract the answer value based on type 1017 | answer_value = None 1018 | if "textAnswers" in value: 1019 | answer_value = value["textAnswers"]["answers"][0]["value"] 1020 | elif "fileUploadAnswers" in value: 1021 | answer_value = [file.get("fileId") for file in value["fileUploadAnswers"]["answers"]] 1022 | elif "choiceAnswers" in value: 1023 | answer_value = [choice.get("value") for choice in value["choiceAnswers"]["answers"]] 1024 | 1025 | answer_data[question_text] = answer_value 1026 | 1027 | simplified_responses.append({ 1028 | "response_id": response.get("responseId"), 1029 | "timestamp": response.get("createTime"), 1030 | "answers": answer_data 1031 | }) 1032 | 1033 | return json.dumps({ 1034 | "form_id": form_id, 1035 | "title": form.get("info", {}).get("title", ""), 1036 | "response_count": len(responses.get("responses", [])), 1037 | "responses": simplified_responses 1038 | }, indent=2) 1039 | except Exception as e: 1040 | return f"Error getting responses: {e}" 1041 | 1042 | get_responses.inputSchema = { 1043 | "type": "object", 1044 | "properties": { 1045 | "form_id": { 1046 | "type": "string", 1047 | "title": "Form ID", 1048 | "description": "The ID of the Google Form" 1049 | } 1050 | }, 1051 | "required": ["form_id"] 1052 | } 1053 | 1054 | @mcp.tool() 1055 | async def export_responses(form_id: str, format: str = "csv", ctx: Context = None) -> str: 1056 | """ 1057 | Export responses from a Google Form to a spreadsheet or CSV 1058 | 1059 
| Args: 1060 | form_id (str): The ID of the form 1061 | format (str, optional): The format to export (csv or sheets) 1062 | 1063 | Returns: 1064 | str: JSON string containing the export details 1065 | """ 1066 | logger.info(f"export_responses triggered with form_id: {form_id}, format: {format}") 1067 | 1068 | try: 1069 | services = ctx.lifespan_context 1070 | form_service = services.form_service 1071 | drive_service = services.drive_service 1072 | sheets_service = services.sheets_service 1073 | 1074 | # Get the form 1075 | form = form_service.forms().get(formId=form_id).execute() 1076 | 1077 | # Check if there's already a response spreadsheet 1078 | response_sheet_id = None 1079 | try: 1080 | form_info = form_service.forms().get(formId=form_id).execute() 1081 | if "responderUri" in form_info: 1082 | # Extract the spreadsheet ID from the responder URI 1083 | uri_parts = form_info["responderUri"].split("/") 1084 | if len(uri_parts) > 5: 1085 | response_sheet_id = uri_parts[5] 1086 | except Exception as e: 1087 | logger.error(f"Error getting form: {e}") 1088 | 1089 | if not response_sheet_id: 1090 | # Create a new spreadsheet for responses 1091 | spreadsheet_body = { 1092 | 'properties': { 1093 | 'title': f"Responses for {form.get('info', {}).get('title', 'Form')}" 1094 | } 1095 | } 1096 | sheet = sheets_service.spreadsheets().create(body=spreadsheet_body).execute() 1097 | response_sheet_id = sheet.get('spreadsheetId') 1098 | 1099 | # Link the form to the spreadsheet 1100 | update_request = { 1101 | "requests": [ 1102 | { 1103 | "updateSettings": { 1104 | "settings": { 1105 | "responseDestination": "SPREADSHEET", 1106 | "spreadsheetId": response_sheet_id 1107 | }, 1108 | "updateMask": "responseDestination,spreadsheetId" 1109 | } 1110 | } 1111 | ] 1112 | } 1113 | form_service.forms().batchUpdate(formId=form_id, body=update_request).execute() 1114 | 1115 | # For CSV format, create a link to download as CSV 1116 | if format.lower() == "csv": 1117 | csv_export_link = f"https://docs.google.com/spreadsheets/d/{response_sheet_id}/export?format=csv" 1118 | return json.dumps({ 1119 | "form_id": form_id, 1120 | "export_format": "csv", 1121 | "download_link": csv_export_link 1122 | }, indent=2) 1123 | else: 1124 | # For sheets format, return the spreadsheet link 1125 | sheets_link = f"https://docs.google.com/spreadsheets/d/{response_sheet_id}/edit" 1126 | return json.dumps({ 1127 | "form_id": form_id, 1128 | "export_format": "sheets", 1129 | "spreadsheet_id": response_sheet_id, 1130 | "spreadsheet_link": sheets_link 1131 | }, indent=2) 1132 | except Exception as e: 1133 | return f"Error exporting responses: {e}" 1134 | 1135 | export_responses.inputSchema = { 1136 | "type": "object", 1137 | "properties": { 1138 | "form_id": { 1139 | "type": "string", 1140 | "title": "Form ID", 1141 | "description": "The ID of the Google Form" 1142 | }, 1143 | "format": { 1144 | "type": "string", 1145 | "title": "Export Format", 1146 | "description": "The format to export (csv or sheets)", 1147 | "enum": ["csv", "sheets"] 1148 | } 1149 | }, 1150 | "required": ["form_id"] 1151 | } 1152 | 1153 | @mcp.tool() 1154 | async def list_forms(ctx: Context = None) -> str: 1155 | """ 1156 | List all Google Forms created by the user 1157 | 1158 | Returns: 1159 | str: JSON string containing the list of forms 1160 | """ 1161 | logger.info("list_forms triggered") 1162 | 1163 | try: 1164 | services = ctx.lifespan_context 1165 | drive_service = services.drive_service 1166 | 1167 | # Search for Google Forms files 1168 | results = 
drive_service.files().list(
            q="mimeType='application/vnd.google-apps.form'",
            spaces='drive',
            fields='files(id, name, webViewLink, createdTime)'
        ).execute()

        forms = results.get('files', [])

        if not forms:
            return json.dumps({"forms": [], "message": "No forms found"}, indent=2)

        # Format the forms list
        forms_list = []
        for form in forms:
            forms_list.append({
                "id": form.get('id'),
                "name": form.get('name'),
                "url": form.get('webViewLink'),
                "created": form.get('createdTime')
            })

        return json.dumps({"forms": forms_list}, indent=2)
    except Exception as e:
        return f"Error listing forms: {e}"

list_forms.inputSchema = {
    "type": "object",
    "properties": {},
    "required": []
}

def main(transport: str = "stdio", host: str = "127.0.0.1", port: int = 8000):
    """Start the MCP server using the specified transport via mcp.run().

    Only the "stdio" and "sse" transports are supported. The host and port
    arguments are accepted for CLI compatibility but are not forwarded to
    mcp.run(); the SSE server uses the FastMCP instance's own settings.
    """
    logger.info(f"Starting MCP server with {transport} transport...")
    try:
        # mcp.run() validates the transport and manages the server lifecycle.
        mcp.run(transport=transport)
    except Exception as e:
        logger.error(f"Failed to start server with mcp.run(transport='{transport}'): {e}")

# Argument parsing for command-line execution
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Google Forms MCP Server")
    parser.add_argument(
        "--transport",
        default="sse",  # Default to Server-Sent Events
        choices=["stdio", "sse"],  # Only stdio and sse are supported
        help="Server transport method (default: sse)"
    )
    parser.add_argument(
        "--host",
        default="127.0.0.1",
        help="Host address for the SSE server (default: 127.0.0.1)"
    )
    parser.add_argument(
        "--port",
        type=int,
        default=8000,
        help="Port for the SSE server (default: 8000)"
    )
    args = parser.parse_args()

    main(transport=args.transport, host=args.host, port=args.port)
```
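For reference, here is a minimal client-side sketch (not part of the server) showing how these tools can be called over stdio when the server is started with `--transport stdio`. It assumes the standard `mcp` Python client API; the launch command, file path, and placeholder `FORM_ID` are illustrative only and would need adjusting, and the Google credentials must already be configured as the server expects.

```python
# Hypothetical client sketch: calls the Google Forms MCP tools over stdio.
# Assumes the standard `mcp` Python client API and that the server file is
# in the current directory; FORM_ID is a placeholder, not a real form.
import asyncio

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client


async def demo() -> None:
    # Launch the server as a subprocess using the stdio transport.
    server = StdioServerParameters(
        command="python",
        args=["google_forms_server_mcp.py", "--transport", "stdio"],
    )
    async with stdio_client(server) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()

            # List the forms owned by the authenticated account.
            forms = await session.call_tool("list_forms", arguments={})
            print(forms.content[0].text)

            # Add a short-answer question to an existing form
            # (replace FORM_ID with an ID taken from the listing above).
            added = await session.call_tool(
                "add_short_answer",
                arguments={"form_id": "FORM_ID", "question_text": "Your name?"},
            )
            print(added.content[0].text)


if __name__ == "__main__":
    asyncio.run(demo())
```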