# Directory Structure ``` ├── filesystem_server │ ├── example_filesystem_server.py │ ├── filesystem_server_mcp.py │ └── test_filesystem_server.py ├── google_forms_mcp │ ├── __init__.py │ ├── google_forms_example_run.py │ ├── google_forms_server_mcp.py │ └── test_google_forms_server_mcp.py ├── markitdown_camel │ ├── camel_markitdown_client.py │ ├── how_to_run.md │ └── streamlit_markitdown_app.py ├── README.md ├── sql_server │ ├── sql_example_run.py │ ├── sql_mcp_test.py │ └── sql_server_mcp.py └── ssss.png ``` # Files -------------------------------------------------------------------------------- /README.md: -------------------------------------------------------------------------------- ```markdown # MCP-Servers This repo contains mcp servers for various use cases , binded with use cases with CAMEL-AI 1. [Filesystem Server](https://github.com/parthshr370/MCP-Servers/tree/main/filesystem_server) 2. [SQL Server](https://github.com/parthshr370/MCP-Servers/tree/main/sql_server) 3. [Markdown Server with Note taking usecase](https://github.com/parthshr370/Md-Notes-Buddy) and [Normal Doc to Md](https://github.com/parthshr370/MCP-Servers/tree/main/markitdown_camel) ``` -------------------------------------------------------------------------------- /google_forms_mcp/__init__.py: -------------------------------------------------------------------------------- ```python ``` -------------------------------------------------------------------------------- /filesystem_server/test_filesystem_server.py: -------------------------------------------------------------------------------- ```python import asyncio # noqa: F401 import os import tempfile import pytest # Import the async tools you want to test. # Adjust the import path if necessary. from filesystem_server_mcp import ( list_directory, read_file, ) @pytest.mark.asyncio async def test_read_file_success(): """ Test that read_file returns the correct file contents when given a valid file. """ # Create a temporary file with known content. with tempfile.NamedTemporaryFile(mode="w+", delete=False) as tmp: tmp.write("Hello, Camel AI!\n") tmp_path = tmp.name try: # Call the read_file tool and remove trailing whitespace. result = await read_file(file_path=tmp_path) # Check that the result matches the content (newline removed by rstrip). assert result == "Hello, Camel AI!" finally: # Clean up the temporary file. os.remove(tmp_path) @pytest.mark.asyncio async def test_read_file_error(): """ Test that read_file returns an error message when the file does not exist. """ non_existent_file = "this_file_does_not_exist.txt" result = await read_file(file_path=non_existent_file) # Check that the error message is returned. assert "Error reading file" in result @pytest.mark.asyncio async def test_list_directory_success(): """ Test that list_directory returns the correct list of entries for a valid directory. """ # Create a temporary directory. with tempfile.TemporaryDirectory() as tmp_dir: # Create a few temporary files in the directory. file_names = ["file1.txt", "file2.txt", "file3.txt"] for name in file_names: file_path = os.path.join(tmp_dir, name) with open(file_path, "w") as f: f.write("Test content") # Call the list_directory tool. result = await list_directory(directory_path=tmp_dir) # Convert the newline-separated result into a list. entries = result.split("\n") # Check that every file we created is in the directory listing. for name in file_names: assert name in entries @pytest.mark.asyncio async def test_list_directory_error(): """ Test that list_directory returns an error message when the directory does not exist. """ non_existent_directory = "this_directory_does_not_exist" result = await list_directory(directory_path=non_existent_directory) assert "Error listing directory" in result ``` -------------------------------------------------------------------------------- /filesystem_server/filesystem_server_mcp.py: -------------------------------------------------------------------------------- ```python #!/usr/bin/env python # ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. ========= # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. ========= import os import asyncio # noqa: F401 from mcp.server.fastmcp import FastMCP from camel.logger import get_logger logger = get_logger(__name__) mcp = FastMCP("filesystem") @mcp.tool() async def read_file(file_path: str) -> str: r"""Reads the content of the file at the given file path. Args: file_path (str): The path to the file. Returns: str: The content of the file with trailing whitespace removed, or an error message if reading fails. """ logger.info(f"read_file triggered with file_path: {file_path}") try: with open(file_path, "r", encoding="utf-8") as f: content = f.read() return content.rstrip() except Exception as e: return f"Error reading file '{file_path}': {e}" read_file.inputSchema = { "type": "object", "properties": { "file_path": { "type": "string", "title": "File Path", "description": "The path to the file to read. Default is 'README.md'." } }, "required": ["file_path"] } @mcp.tool() async def list_directory(directory_path: str) -> str: r"""Lists the contents of the specified directory. Args: directory_path (str): The path of the directory to list. Returns: str: A newline-separated string of the directory entries, or an error message if listing fails. """ logger.info(f"list_directory triggered with directory_path: {directory_path}") try: entries = os.listdir(directory_path) return "\n".join(entry.rstrip() for entry in entries) except Exception as e: return f"Error listing directory '{directory_path}': {e}" list_directory.inputSchema = { "type": "object", "properties": { "directory_path": { "type": "string", "title": "Directory Path", "description": "The directory path whose contents should be listed. Default is '.'." } }, "required": ["directory_path"] } def main(transport: str = "stdio"): r"""Runs the Filesystem MCP Server. This server provides filesystem-related functionalities via MCP. Args: transport (str): The transport mode ('stdio' or 'sse'). """ if transport == 'stdio': mcp.run(transport='stdio') elif transport == 'sse': mcp.run(transport='sse') else: print(f"Unknown transport mode: {transport}") if __name__ == "__main__": import sys transport_mode = sys.argv[1] if len(sys.argv) > 1 else "stdio" main(transport_mode) ``` -------------------------------------------------------------------------------- /filesystem_server/example_filesystem_server.py: -------------------------------------------------------------------------------- ```python import asyncio import os import sys from pathlib import Path from dotenv import load_dotenv from camel.agents import ChatAgent from camel.models import ModelFactory from camel.toolkits import MCPToolkit from camel.types import ModelPlatformType from camel.toolkits.mcp_toolkit import _MCPServer load_dotenv() # Set your Anthropic API key (ensure this is valid). os.environ["ANTHROPIC_API_KEY"] = # anthropic api key defined 0 async def interactive_input_loop(agent: ChatAgent): loop = asyncio.get_event_loop() print("\nEntering interactive mode. Type 'exit' at any prompt to quit.") while True: choice = await loop.run_in_executor( None, input, "\nChoose an action (Type 'exit' to end loop or press Enter to use current directory):\n" "1. Read a file\n" "2. List a directory\nYour choice (1/2): " ) choice = choice.strip().lower() if choice == "exit": print("Exiting interactive mode.") break if choice == "1": file_path = await loop.run_in_executor( None, input, "Enter the file path to read (default: README.md): " ) file_path = file_path.strip() or "README.md" query = f"Use the read_file tool to display the content of {file_path}. Do not generate an answer from your internal knowledge." elif choice == "2": dir_path = await loop.run_in_executor( None, input, "Enter the directory path to list (default: .): " ) dir_path = dir_path.strip() or "." query = f"Call the list_directory tool to show me all files in {dir_path}. Do not answer directly." else: print("Invalid choice. Please enter 1 or 2.") continue response = await agent.astep(query) print(f"\nYour Query: {query}") print("Full Agent Response:") print(response.info) if response.msgs and response.msgs[0].content: print("Agent Output:") print(response.msgs[0].content.rstrip()) else: print("No output received.") async def main(server_transport: str = 'stdio'): if server_transport == 'stdio': # Assuming both files are in the same folder structure, determine the path to the server file. server_script_path = Path(__file__).resolve().parent / "filesystem_server_mcp.py" if not server_script_path.is_file(): print(f"Error: Server script not found at {server_script_path}") return # Create an _MCPServer instance for our filesystem server. server = _MCPServer( command_or_url=sys.executable, args=[str(server_script_path)] ) mcp_toolkit = MCPToolkit(servers=[server]) else: mcp_toolkit = MCPToolkit("tcp://localhost:5000") async with mcp_toolkit.connection() as toolkit: tools = toolkit.get_tools() sys_msg = ( "You are a helpful assistant. Always use the provided external tools for filesystem operations " "and answer filesystem-related questions using these tools." ) model = ModelFactory.create( model_platform=ModelPlatformType.ANTHROPIC, model_type="claude-3-7-sonnet-20250219", api_key=os.getenv("ANTHROPIC_API_KEY"), model_config_dict={"temperature": 0.8, "max_tokens": 4096}, ) camel_agent = ChatAgent( system_message=sys_msg, model=model, tools=tools, ) camel_agent.reset() camel_agent.memory.clear() await interactive_input_loop(camel_agent) if __name__ == "__main__": asyncio.run(main()) ``` -------------------------------------------------------------------------------- /markitdown_camel/how_to_run.md: -------------------------------------------------------------------------------- ```markdown # How to Run MarkItDown Integrations This guide explains how to set up and run the MarkItDown integrations created in this project: the command-line interface (CLI) powered by Camel AI, and the Streamlit web frontend. **Project Structure Overview:** ``` markitdown/ ├── packages/ │ ├── markitdown/ # Core library source │ │ └── src/ │ ├── markitdown-mcp/ # MCP Server source │ │ └── src/ │ └── ... ├── camel_markitdown_client.py # Camel AI client script (CLI mode) ├── streamlit_markitdown_app.py # Streamlit frontend script ├── how_to_run.md # This file └── ... (other project files like README.md, .git, etc.) ``` ## I. Prerequisites Before running either integration, ensure you have the following set up: 1. **Conda Environment:** It's highly recommended to use a dedicated Conda environment (like `kratos` used during development). Activate it: ```bash conda activate kratos ``` 2. **Install Base Dependencies:** Install the core `markitdown` library and its dependencies in editable mode. This allows using the local source code. ```bash # Navigate to the project root directory (e.g., ~/Downloads/markitdown/) cd /path/to/your/markitdown # Install the core library and all its optional features pip install -e './packages/markitdown[all]' # Install the MCP server package (needed by markitdown library) pip install -e './packages/markitdown-mcp' ``` 3. **API Keys:** * **Google Gemini:** The Camel AI client is configured to use Google Gemini. Set your API key as an environment variable: ```bash export GOOGLE_API_KEY="YOUR_GOOGLE_API_KEY_HERE" ``` * **MarkItDown Dependencies:** The underlying `markitdown` library might require other keys for specific conversions (e.g., Azure Document Intelligence, transcription services). Set these as environment variables if you plan to use those features. ## II. Running the Camel AI Client (CLI Mode) This mode uses a Large Language Model (LLM) agent (Gemini) to interact with the `markitdown` tool via the `markitdown-mcp` server, which runs in the background. 1. **Install Camel AI Dependencies:** ```bash # Install camel-ai with google support pip install "camel-ai[google]" ``` 2. **Ensure Prerequisites:** Make sure your Conda environment is active and your `GOOGLE_API_KEY` is exported (as described in Prerequisites). 3. **Run the Client Script:** Execute the client script from the project root directory: ```bash python camel_markitdown_client.py ``` 4. **Interaction:** * The script will start the background `markitdown-mcp` server automatically. * It will connect to the server and initialize the Gemini agent. * You will be prompted in the terminal to enter a URI (`http://`, `https://`, `file://`, `data:`). * Provide a URI (e.g., `https://www.google.com` or `/absolute/path/to/your/local/file.pdf`). Local paths starting with `/` will automatically be converted to `file://` URIs. * The agent will call the `convert_to_markdown` tool on the background server. * The resulting Markdown (or any error messages) will be printed to your terminal. * Type `exit` at the prompt to quit. ## III. Running the Streamlit Frontend (Web UI Mode) This mode provides a web interface to directly use the `markitdown` library for conversions without involving an LLM agent or the MCP server. 1. **Install Streamlit Dependency:** ```bash pip install streamlit ``` 2. **Ensure Prerequisites:** Make sure your Conda environment is active. While the Streamlit app doesn't directly use the Google API key, ensure any keys needed by `markitdown` itself for specific conversions are set. 3. **Run the Streamlit App:** Execute the following command from the project root directory: ```bash streamlit run streamlit_markitdown_app.py ``` *Note: This script is configured to automatically add the local `markitdown` source path to `sys.path`, so you should **not** need to set the `PYTHONPATH` environment variable manually for this script.* 4. **Interaction:** * Streamlit will provide a URL (usually `http://localhost:8501`). Open this in your web browser. * Select the input method: "Enter URL" or "Upload File". * Provide the URL or upload your desired file. * Click the "Convert to Markdown" button. * A progress bar will indicate activity. * The resulting Markdown will be displayed in a text area. * A "Download Markdown (.md)" button will appear, allowing you to save the output. * Check the terminal where you ran `streamlit run` for log messages. ``` -------------------------------------------------------------------------------- /sql_server/sql_mcp_test.py: -------------------------------------------------------------------------------- ```python import asyncio # noqa: F401 import os import tempfile import json import sqlite3 import pytest # Import the async tools to test from sql_server_mcp import ( execute_query, list_tables, create_database, describe_table, ) @pytest.mark.asyncio async def test_create_database(): """ Test that create_database creates a valid SQLite database. """ # Create a temporary file name with tempfile.NamedTemporaryFile(suffix='.db', delete=False) as tmp: tmp_path = tmp.name try: # Delete the file as we just want the name os.unlink(tmp_path) # Call the create_database tool result = await create_database(db_path=tmp_path) result_json = json.loads(result) # Check that creation was successful assert result_json["status"] == "success" # Verify the file exists assert os.path.exists(tmp_path) # Verify it's a valid SQLite database conn = sqlite3.connect(tmp_path) conn.close() finally: # Clean up the temporary file if os.path.exists(tmp_path): os.remove(tmp_path) @pytest.mark.asyncio async def test_execute_query(): """ Test that execute_query can create a table, insert data, and query it. """ # Create a temporary database with tempfile.NamedTemporaryFile(suffix='.db', delete=False) as tmp: tmp_path = tmp.name try: # Set up a test database conn = sqlite3.connect(tmp_path) cursor = conn.cursor() cursor.execute("CREATE TABLE test_table (id INTEGER PRIMARY KEY, name TEXT)") cursor.execute("INSERT INTO test_table (id, name) VALUES (1, 'Test Name')") conn.commit() conn.close() # Test SELECT query query = "SELECT * FROM test_table" result = await execute_query(connection_string=tmp_path, query=query) result_json = json.loads(result) # Check if the result contains our test data assert len(result_json) == 1 assert result_json[0]["id"] == 1 assert result_json[0]["name"] == "Test Name" # Test INSERT query insert_query = "INSERT INTO test_table (id, name) VALUES (2, 'Second Test')" insert_result = await execute_query(connection_string=tmp_path, query=insert_query) insert_result_json = json.loads(insert_result) # Check if the row was inserted assert insert_result_json["affected_rows"] == 1 # Verify the insert by querying again query = "SELECT * FROM test_table WHERE id = 2" result = await execute_query(connection_string=tmp_path, query=query) result_json = json.loads(result) assert len(result_json) == 1 assert result_json[0]["id"] == 2 assert result_json[0]["name"] == "Second Test" finally: # Clean up the temporary file if os.path.exists(tmp_path): os.remove(tmp_path) @pytest.mark.asyncio async def test_list_tables(): """ Test that list_tables returns the correct list of tables in a database. """ # Create a temporary database with tempfile.NamedTemporaryFile(suffix='.db', delete=False) as tmp: tmp_path = tmp.name try: # Set up a test database with multiple tables conn = sqlite3.connect(tmp_path) cursor = conn.cursor() cursor.execute("CREATE TABLE table1 (id INTEGER PRIMARY KEY, value TEXT)") cursor.execute("CREATE TABLE table2 (id INTEGER PRIMARY KEY, value TEXT)") conn.commit() conn.close() # Test list_tables result = await list_tables(connection_string=tmp_path) result_json = json.loads(result) # Check if both tables are listed assert "tables" in result_json assert "table1" in result_json["tables"] assert "table2" in result_json["tables"] finally: # Clean up the temporary file if os.path.exists(tmp_path): os.remove(tmp_path) @pytest.mark.asyncio async def test_describe_table(): """ Test that describe_table returns the correct schema for a table. """ # Create a temporary database with tempfile.NamedTemporaryFile(suffix='.db', delete=False) as tmp: tmp_path = tmp.name try: # Set up a test database with a table that has various column types conn = sqlite3.connect(tmp_path) cursor = conn.cursor() cursor.execute(""" CREATE TABLE test_table ( id INTEGER PRIMARY KEY, name TEXT NOT NULL, age INTEGER, salary REAL, hire_date TEXT ) """) conn.commit() conn.close() # Test describe_table result = await describe_table(connection_string=tmp_path, table_name="test_table") result_json = json.loads(result) # Check if the schema is correct assert result_json["table"] == "test_table" # Check column details columns = {col["name"]: col for col in result_json["columns"]} assert "id" in columns assert columns["id"]["type"] == "INTEGER" assert columns["id"]["pk"] == 1 assert "name" in columns assert columns["name"]["type"] == "TEXT" assert columns["name"]["notnull"] == 1 assert "age" in columns assert columns["age"]["type"] == "INTEGER" assert "salary" in columns assert columns["salary"]["type"] == "REAL" assert "hire_date" in columns assert columns["hire_date"]["type"] == "TEXT" finally: # Clean up the temporary file if os.path.exists(tmp_path): os.remove(tmp_path) ``` -------------------------------------------------------------------------------- /sql_server/sql_example_run.py: -------------------------------------------------------------------------------- ```python import asyncio import os import sys from pathlib import Path from dotenv import load_dotenv from camel.agents import ChatAgent from camel.models import ModelFactory from camel.toolkits import MCPToolkit from camel.types import ModelPlatformType from camel.toolkits.mcp_toolkit import _MCPServer # Load environment variables from .env file load_dotenv() # Set your Anthropic API key (ensure this is valid in your .env file) os.environ["ANTHROPIC_API_KEY"] = os.getenv("ANTHROPIC_API_KEY") # Create a sample database for demonstration async def create_sample_database(): """Create a sample SQLite database with some data for demonstration.""" import sqlite3 # Create a temporary database in the current directory db_path = "sample.db" # If database already exists, remove it to start fresh if os.path.exists(db_path): os.remove(db_path) # Create the database and add sample tables and data conn = sqlite3.connect(db_path) cursor = conn.cursor() # Create employees table cursor.execute(""" CREATE TABLE employees ( id INTEGER PRIMARY KEY, name TEXT NOT NULL, department TEXT, salary REAL, hire_date TEXT ) """) # Insert sample employee data employees = [ (1, 'John Doe', 'Engineering', 85000.00, '2020-01-15'), (2, 'Jane Smith', 'Marketing', 75000.00, '2019-05-20'), (3, 'Bob Johnson', 'Engineering', 95000.00, '2018-11-10'), (4, 'Alice Brown', 'HR', 65000.00, '2021-03-05'), (5, 'Charlie Davis', 'Engineering', 90000.00, '2020-08-12') ] cursor.executemany("INSERT INTO employees VALUES (?, ?, ?, ?, ?)", employees) # Create departments table cursor.execute(""" CREATE TABLE departments ( id INTEGER PRIMARY KEY, name TEXT NOT NULL, budget REAL, location TEXT ) """) # Insert sample department data departments = [ (1, 'Engineering', 1000000.00, 'Building A'), (2, 'Marketing', 500000.00, 'Building B'), (3, 'HR', 300000.00, 'Building A'), (4, 'Finance', 600000.00, 'Building C') ] cursor.executemany("INSERT INTO departments VALUES (?, ?, ?, ?)", departments) # Commit changes and close connection conn.commit() conn.close() print(f"Sample database created at: {db_path}") return db_path # Interactive mode function to chat with the agent async def interactive_input_loop(agent: ChatAgent, db_path: str): loop = asyncio.get_event_loop() print("\n==== SQL Assistant Interactive Mode ====") print("Type 'exit' at any prompt to quit.") print(f"\nUsing sample database at: {db_path}") print("\nSample queries you can try:") print("- Show me all tables in sample.db") print("- What columns are in the employees table in sample.db?") print("- List all employees in the Engineering department") print("- What is the average salary by department?") print("- How many employees are in each department?") print("- Find the employee with the highest salary") print("- Add a new employee named Michael Wilson to Finance with salary 82000") print("======================================") while True: query = await loop.run_in_executor( None, input, "\nEnter your query (or type 'exit' to quit): " ) if query.lower() == 'exit': print("Exiting interactive mode.") break print("\nProcessing query...") response = await agent.astep(query) print("\nAgent Response:") if response.msgs and response.msgs[0].content: print(response.msgs[0].content.rstrip()) else: print("No output received.") # Main function to run the entire example async def main(server_transport: str = 'stdio'): # First create a sample database db_path = await create_sample_database() if server_transport == 'stdio': # Determine the path to the server file server_script_path = Path(__file__).resolve().parent / "sql_server_mcp.py" if not server_script_path.is_file(): print(f"Error: Server script not found at {server_script_path}") return # Create an _MCPServer instance for our SQL server server = _MCPServer( command_or_url=sys.executable, args=[str(server_script_path)] ) mcp_toolkit = MCPToolkit(servers=[server]) else: mcp_toolkit = MCPToolkit("tcp://localhost:5000") async with mcp_toolkit.connection() as toolkit: tools = toolkit.get_tools() sys_msg = ( "You are a helpful SQL assistant. Use the provided external tools for database operations. " "Always use the tools to query the database rather than answering from your general knowledge. " f"The sample database is at '{db_path}'. It contains tables for employees and departments. " "When a user asks a question about the database, ALWAYS explicitly include the database path " f"'{db_path}' in your tool calls. First list the tables to understand the schema, " "then use describe_table to see column details before querying." ) model = ModelFactory.create( model_platform=ModelPlatformType.ANTHROPIC, model_type="claude-3-7-sonnet-20250219", api_key=os.getenv("ANTHROPIC_API_KEY"), model_config_dict={"temperature": 0.5, "max_tokens": 4096}, ) camel_agent = ChatAgent( system_message=sys_msg, model=model, tools=tools, ) camel_agent.reset() camel_agent.memory.clear() await interactive_input_loop(camel_agent, db_path) # Clean up the sample database after we're done if os.path.exists(db_path): os.remove(db_path) print(f"\nRemoved sample database: {db_path}") # Entry point if __name__ == "__main__": asyncio.run(main()) ``` -------------------------------------------------------------------------------- /markitdown_camel/camel_markitdown_client.py: -------------------------------------------------------------------------------- ```python import asyncio import os import sys from pathlib import Path from dotenv import load_dotenv from camel.agents import ChatAgent from camel.models import ModelFactory from camel.toolkits import MCPToolkit # camels implementation of the mcp protocol from camel.types import ModelPlatformType from camel.toolkits.mcp_toolkit import MCPClient # Ensure your Anthropic API key is set in your environment variables # os.environ["ANTHROPIC_API_KEY"] = "YOUR_ANTHROPIC_API_KEY" # you can also use your gemini api key here # Starting the interactive input loop for the camel ai client async def interactive_input_loop(agent: ChatAgent): loop = asyncio.get_event_loop() print("\nEntering interactive mode. Type 'exit' at any prompt to quit.") # exit conditions while True: uri = await loop.run_in_executor( None, input, "\nEnter the URI (http:, https:, file:, data:) to convert to Markdown (or type 'exit'): " ) uri = uri.strip() if uri.lower() == "exit": print("Exiting interactive mode.") break if not uri: print("URI cannot be empty.") continue # Prepend file:// scheme if it looks like a local absolute path if uri.startswith('/') and not uri.startswith('file://'): print(f"Detected local path, prepending 'file://' to URI: {uri}") formatted_uri = f"file://{uri}" else: formatted_uri = uri # The prompt clearly tells the agent which tool to use and what the parameter is. query = f"Use the convert_to_markdown tool to convert the content at the URI '{formatted_uri}' to Markdown. Do not generate an answer from your internal knowledge, just show the Markdown output from the tool." print(f"\nSending query to agent: {query}") response = await agent.astep(query) print("\nFull Agent Response Info:") print(response.info) # Shows tool calls and parameters # Check for direct message output first if response.msgs and response.msgs[0].content: print("\nAgent Output (Markdown):") print("-" * 20) print(response.msgs[0].content.rstrip()) print("-" * 20) # If no direct message, check if the tool call info is available in response.info elif 'tool_calls' in response.info and response.info['tool_calls']: print("\nTool Call Response (Raw from info):") print("-" * 20) found_output = False # Iterate through the tool calls list in response.info for tool_call in response.info['tool_calls']: # Camel AI structure might place output here (adjust key if needed based on ToolCallingRecord structure) if hasattr(tool_call, 'result') and tool_call.result: print(str(tool_call.result).rstrip()) found_output = True # Add other potential output locations if needed if not found_output: print("(No tool result found in tool call info)") print("-" * 20) else: print("No output message or tool output received.") # main funct async def main(server_transport: str = 'stdio'): if server_transport != 'stdio': print("Error: This client currently only supports 'stdio' transport.") return print("Starting MarkItDown MCP server in stdio mode...") server_command = sys.executable server_args = ["-m", "markitdown_mcp"] # Get the root directory of the script (assuming it's in the project root) project_root = Path(__file__).resolve().parent # Create an MCPClient instance, adding the cwd server = MCPClient( command_or_url=server_command, args=server_args, # Set the working directory for the server process env={"PYTHONPATH": str(project_root / "packages" / "markitdown-mcp" / "src") + os.pathsep + str(project_root / "packages" / "markitdown" / "src") + os.pathsep + os.environ.get("PYTHONPATH", ""), "CWD": str(project_root)}, # Optional: timeout=None ) # Pass the MCPClient object in a list mcp_toolkit = MCPToolkit(servers=[server]) print("Connecting to MCP server...") async with mcp_toolkit.connection() as toolkit: print("Connection successful. Retrieving tools...") tools = toolkit.get_tools() if not tools: print("Error: No tools retrieved from the server. Make sure the server started correctly and defined tools.") return print(f"Tools retrieved: {[tool.func.__name__ for tool in tools]}") # Check if the required tool is available using func.__name__ if not any(tool.func.__name__ == "convert_to_markdown" for tool in tools): print("Error: 'convert_to_markdown' tool not found on the server.") return sys_msg = ( "You are a helpful assistant. You have access to an external tool called 'convert_to_markdown' which takes a single argument, 'uri'. " "When asked to convert a URI to Markdown, you MUST use this tool by providing the URI to the 'uri' parameter. " "Provide ONLY the Markdown output received from the tool, without any additional explanation or introductory text." ) # Ensure GOOGLE_API_KEY is set in environment variables # print(f"DEBUG: Value of GOOGLE_API_KEY from os.getenv: {os.getenv('GOOGLE_API_KEY')}") api_key = os.getenv("GOOGLE_API_KEY") # Check for GOOGLE_API_KEY if not api_key: print("Error: GOOGLE_API_KEY environment variable not set.") # Update error message print("Please set it before running the client.") return # Configure the model for Google Gemini # You might need to install the camel-google extra: pip install camel-ai[google] try: model = ModelFactory.create( model_platform=ModelPlatformType.GEMINI, # Change platform # Set the desired Gemini model model_type="gemini-2.5-pro-preview-03-25", # Using 1.5 Pro as 2.5 is not yet a valid identifier in CAMEL AI api_key=api_key, model_config_dict={"temperature": 0.0, "max_tokens": 8192}, # Adjust config if needed ) except Exception as e: print(f"Error creating model: {e}") print("Ensure you have the necessary dependencies installed (e.g., `pip install camel-ai[google]`)") return camel_agent = ChatAgent( system_message=sys_msg, model=model, tools=tools, ) camel_agent.reset() camel_agent.memory.clear() await interactive_input_loop(camel_agent) if __name__ == "__main__": # This client only supports stdio for now asyncio.run(main(server_transport='stdio')) ``` -------------------------------------------------------------------------------- /markitdown_camel/streamlit_markitdown_app.py: -------------------------------------------------------------------------------- ```python import streamlit as st import os import tempfile from pathlib import Path import re import sys import logging # Import logging # --- Basic Logging Configuration --- # logging.basicConfig( level=logging.INFO, # Set the logging level (INFO, DEBUG, WARNING, ERROR, CRITICAL) format='%(asctime)s - %(levelname)s - %(message)s', stream=sys.stderr, # Ensure logs go to stderr (terminal) ) # --- Dynamically add local package path --- # # Calculate the path to the 'src' directory of the local 'markitdown' package _PROJECT_ROOT = Path(__file__).resolve().parent _MARKITDOWN_SRC_PATH = _PROJECT_ROOT / "packages" / "markitdown" / "src" # Add the path to sys.path if it's not already there if str(_MARKITDOWN_SRC_PATH) not in sys.path: sys.path.insert(0, str(_MARKITDOWN_SRC_PATH)) print(f"DEBUG: Added '{_MARKITDOWN_SRC_PATH}' to sys.path") # Attempt to import the core MarkItDown class try: from markitdown import MarkItDown from markitdown._exceptions import MarkItDownException except ImportError as e: logging.error(f"Failed to import markitdown: {e}", exc_info=True) st.error( f"Failed to import the `markitdown` library or its exceptions. " f"Error: {e}\n" "Please ensure it is installed correctly (e.g., `pip install -e ./packages/markitdown`) " f"and that the path '{_MARKITDOWN_SRC_PATH}' exists and is accessible." ) st.stop() # --- Page Configuration --- st.set_page_config( page_title="MarkItDown Converter", page_icon=":memo:", layout="wide", ) # --- Session State Initialization --- if 'markdown_output' not in st.session_state: st.session_state.markdown_output = None if 'error_message' not in st.session_state: st.session_state.error_message = None if 'input_uri' not in st.session_state: st.session_state.input_uri = "" # --- Helper Functions --- def is_valid_uri_scheme(uri): """Basic check for supported URI schemes.""" return uri.startswith(("http://", "https://", "file://", "data:")) def clean_filename(filename): """Remove invalid characters for filenames.""" # Remove URL scheme if present name = re.sub(r'^(http|https|file|data):[\/]*', '', filename) # Replace problematic characters name = re.sub(r'[\/:*?\"<>|%#&.]+', '_', name) # Limit length return name[:100] or "converted" # --- UI Layout --- st.title("📝 MarkItDown Content Converter") st.markdown( "Convert content from various sources (URLs or uploaded files) into Markdown." ) st.divider() # --- Input Method Selection --- input_method = st.radio( "Select Input Method:", ("Enter URL", "Upload File"), horizontal=True, key="input_method_radio", help="Choose whether to provide a web URL or upload a local file." ) col1, col2 = st.columns([3, 1]) with col1: if input_method == "Enter URL": st.session_state.input_uri = st.text_input( "Enter URL (http://, https://, data:)", value=st.session_state.input_uri, placeholder="e.g., https://www.example.com or data:text/plain;base64,...", key="url_input", ) uploaded_file = None else: # Upload File uploaded_file = st.file_uploader( "Upload a file (will be converted to a `file://` URI)", type=None, # Allow any file type MarkItDown might support key="file_uploader", ) st.session_state.input_uri = "" # Clear URI input if file is chosen with col2: st.markdown(" ", unsafe_allow_html=True) # Vertical alignment hack st.markdown(" ", unsafe_allow_html=True) convert_button = st.button("Convert to Markdown", type="primary", use_container_width=True) # --- Conversion Logic --- if convert_button: logging.info("'Convert to Markdown' button clicked.") st.session_state.markdown_output = None # Clear previous output st.session_state.error_message = None # Clear previous error final_uri = None tmp_file_path = None # Ensure tmp_file_path is defined if uploaded_file is not None: try: # Save uploaded file temporarily to get a path # Use a consistent way to create temp files with tempfile.NamedTemporaryFile(delete=False, suffix=f"_{uploaded_file.name}") as tmp_file: tmp_file.write(uploaded_file.getvalue()) tmp_file_path = tmp_file.name logging.info(f"Uploaded file '{uploaded_file.name}' saved to temporary path: {tmp_file_path}") final_uri = Path(tmp_file_path).as_uri() logging.info(f"Processing uploaded file as URI: {final_uri}") st.info(f"Processing uploaded file as: {final_uri}") except Exception as e: logging.error(f"Error saving uploaded file: {e}", exc_info=True) st.error(f"Error handling uploaded file: {e}") final_uri = None # Prevent further processing elif st.session_state.input_uri: final_uri = st.session_state.input_uri.strip() if not is_valid_uri_scheme(final_uri): # Attempt to fix common local path issue if final_uri.startswith('/'): st.warning(f"Assuming '{final_uri}' is a local path. Prepending 'file://'.") final_uri = f"file://{final_uri}" else: st.error(f"Invalid or unsupported URI scheme in '{final_uri}'. Must start with http://, https://, file://, or data:") final_uri = None logging.info(f"Processing input URI: {final_uri}") # Log the final URI used else: logging.warning("Conversion attempt with no input URI or file.") st.warning("Please enter a URL or upload a file.") if final_uri: progress_bar = st.progress(0, text="Starting conversion...") try: logging.info(f"Initializing MarkItDown converter for URI: {final_uri}") md_converter = MarkItDown() progress_bar.progress(25, text=f"Converting URI: {final_uri[:100]}...") # Perform the conversion logging.info(f"Calling convert_uri for: {final_uri}") result = md_converter.convert_uri(final_uri) logging.info(f"convert_uri successful for: {final_uri}") progress_bar.progress(100, text="Conversion successful!") st.session_state.markdown_output = result.markdown st.success("Content successfully converted to Markdown!") except MarkItDownException as e: logging.error(f"MarkItDown Conversion Error for URI {final_uri}: {e}", exc_info=True) st.session_state.error_message = f"MarkItDown Conversion Error: {e}" st.error(st.session_state.error_message) progress_bar.progress(100, text="Conversion failed.") except Exception as e: logging.error(f"Unexpected Error during conversion for URI {final_uri}: {e}", exc_info=True) st.session_state.error_message = f"An unexpected error occurred: {e}" st.error(st.session_state.error_message) progress_bar.progress(100, text="Conversion failed.") finally: # Clean up temporary file if created if tmp_file_path and os.path.exists(tmp_file_path): try: os.remove(tmp_file_path) logging.info(f"Cleaned up temporary file: {tmp_file_path}") except Exception as e: logging.error(f"Error removing temporary file {tmp_file_path}: {e}", exc_info=True) # Remove progress bar after completion/error progress_bar.empty() st.divider() # --- Output Display and Download --- st.subheader("Output") if st.session_state.error_message and not st.session_state.markdown_output: # Show error prominently if conversion failed and there's no output st.error(st.session_state.error_message) elif st.session_state.markdown_output: st.text_area( "Generated Markdown", value=st.session_state.markdown_output, height=400, key="markdown_display", help="The Markdown generated from the source." ) # Determine a sensible filename download_filename_base = "converted_markdown" if st.session_state.input_uri: download_filename_base = clean_filename(st.session_state.input_uri) elif uploaded_file: download_filename_base = clean_filename(uploaded_file.name) st.download_button( label="Download Markdown (.md)", data=st.session_state.markdown_output, file_name=f"{download_filename_base}.md", mime="text/markdown", key="download_button", use_container_width=True, ) else: st.info("Enter a URL or upload a file and click 'Convert to Markdown' to see the output here.") ``` -------------------------------------------------------------------------------- /sql_server/sql_server_mcp.py: -------------------------------------------------------------------------------- ```python #!/usr/bin/env python # ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. ========= # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. ========= import os import asyncio # noqa: F401 import sqlite3 import json import re # Added for table name validation from mcp.server.fastmcp import FastMCP from camel.logger import get_logger from pathlib import Path logger = get_logger(__name__) mcp = FastMCP("sqldb") @mcp.tool() async def execute_query(connection_string: str, query: str) -> str: r"""Executes the SQL query on the given database. Args: connection_string (str): The connection string or path to the SQLite database. query (str): The SQL query to execute. Returns: str: The result of the query as a JSON string, or an error message if execution fails. """ logger.info(f"execute_query triggered with connection_string: {connection_string}") # For security reasons, don't log the full query in production logger.info(f"Query starts with: {query[:20]}...") conn = None try: conn = sqlite3.connect(connection_string) cursor = conn.cursor() # Execute the query cursor.execute(query) # Check if this is a SELECT query (has results to fetch) if query.strip().upper().startswith("SELECT"): # Get column names from cursor description columns = [desc[0] for desc in cursor.description] # Fetch results and format as a list of dictionaries results = [] for row in cursor.fetchall(): results.append(dict(zip(columns, row))) return json.dumps({"status": "success", "data": results}, indent=2) else: # For INSERT, UPDATE, DELETE, etc. conn.commit() affected_rows = cursor.rowcount return json.dumps({"status": "success", "affected_rows": affected_rows}, indent=2) except Exception as e: return json.dumps({"status": "error", "message": f"Error executing SQL query: {str(e)}"}, indent=2) finally: if conn: conn.close() execute_query.inputSchema = { "type": "object", "properties": { "connection_string": { "type": "string", "title": "Connection String", "description": "The connection string or path to the SQLite database." }, "query": { "type": "string", "title": "SQL Query", "description": "The SQL query to execute." } }, "required": ["connection_string", "query"] } @mcp.tool() async def list_tables(connection_string: str) -> str: r"""Lists all tables in the specified database. Args: connection_string (str): The connection string or path to the SQLite database. Returns: str: A JSON string containing the list of tables, or an error message if listing fails. """ logger.info(f"list_tables triggered with connection_string: {connection_string}") conn = None try: conn = sqlite3.connect(connection_string) cursor = conn.cursor() # Query to get all table names in SQLite cursor.execute("SELECT name FROM sqlite_master WHERE type='table';") # Fetch and format results tables = [row[0] for row in cursor.fetchall()] return json.dumps({"status": "success", "tables": tables}, indent=2) except Exception as e: return json.dumps({"status": "error", "message": f"Error listing tables: {str(e)}"}, indent=2) finally: if conn: conn.close() list_tables.inputSchema = { "type": "object", "properties": { "connection_string": { "type": "string", "title": "Connection String", "description": "The connection string or path to the SQLite database." } }, "required": ["connection_string"] } @mcp.tool() async def create_database(db_path: str) -> str: r"""Creates a new SQLite database at the specified path. Args: db_path (str): The path where the new database should be created. Returns: str: A success message or an error message if creation fails. """ logger.info(f"create_database triggered with db_path: {db_path}") conn = None try: # Check if file already exists if os.path.exists(db_path): return json.dumps({"status": "exists", "message": f"Database already exists at {db_path}"}, indent=2) conn = sqlite3.connect(db_path) conn.close() return json.dumps({"status": "success", "message": f"Database created at {db_path}"}, indent=2) except Exception as e: return json.dumps({"status": "error", "message": f"Error creating database: {str(e)}"}, indent=2) finally: if conn: conn.close() create_database.inputSchema = { "type": "object", "properties": { "db_path": { "type": "string", "title": "Database Path", "description": "The path where the new SQLite database should be created." } }, "required": ["db_path"] } @mcp.tool() async def describe_table(connection_string: str, table_name: str) -> str: r"""Describes the schema of a specified table. Args: connection_string (str): The connection string or path to the SQLite database. table_name (str): The name of the table to describe. Returns: str: A JSON string containing the table schema, or an error message if the operation fails. """ logger.info(f"describe_table triggered with connection_string: {connection_string}, table_name: {table_name}") # Validate table_name to be a simple identifier to reduce SQL injection risk with PRAGMA if not re.match(r"^[a-zA-Z_][a-zA-Z0-9_]*$", table_name): return json.dumps({"status": "error", "message": f"Invalid table name: '{table_name}'. Must be a valid SQL identifier."}, indent=2) conn = None try: conn = sqlite3.connect(connection_string) cursor = conn.cursor() # PRAGMA statements do not support placeholders for table names. cursor.execute(f"PRAGMA table_info({table_name});") columns = [] for row in cursor.fetchall(): columns.append({ "cid": row[0], "name": row[1], "type": row[2], "notnull": row[3], "default_value": row[4], "pk": row[5] }) if not columns: return json.dumps({"status": "not_found", "message": f"Table '{table_name}' not found or has no columns."}, indent=2) return json.dumps({"status": "success", "table": table_name, "columns": columns}, indent=2) except Exception as e: return json.dumps({"status": "error", "message": f"Error describing table '{table_name}': {str(e)}"}, indent=2) finally: if conn: conn.close() describe_table.inputSchema = { "type": "object", "properties": { "connection_string": { "type": "string", "title": "Connection String", "description": "The connection string or path to the SQLite database." }, "table_name": { "type": "string", "title": "Table Name", "description": "The name of the table to describe." } }, "required": ["connection_string", "table_name"] } @mcp.tool() async def delete_database(db_path: str) -> str: r"""Deletes an existing SQLite database file at the specified path. Args: db_path (str): The path to the SQLite database file to be deleted. Returns: str: A JSON string indicating success or an error message if deletion fails. """ logger.info(f"delete_database triggered with db_path: {db_path}") try: if not os.path.exists(db_path): return json.dumps({"status": "not_found", "message": f"Database file not found at {db_path}"}, indent=2) os.remove(db_path) return json.dumps({"status": "success", "message": f"Database deleted from {db_path}"}, indent=2) except Exception as e: return json.dumps({"status": "error", "message": f"Error deleting database: {str(e)}"}, indent=2) delete_database.inputSchema = { "type": "object", "properties": { "db_path": { "type": "string", "title": "Database Path", "description": "The path to the SQLite database file to be deleted." } }, "required": ["db_path"] } @mcp.tool() async def delete_table(connection_string: str, table_name: str) -> str: r"""Deletes a specified table from the SQLite database. Args: connection_string (str): The connection string or path to the SQLite database. table_name (str): The name of the table to delete. Returns: str: A JSON string indicating success or failure. """ logger.info(f"delete_table triggered for table: {table_name} in db: {connection_string}") if not re.match(r"^[a-zA-Z_][a-zA-Z0-9_]*$", table_name): return json.dumps({"status": "error", "message": f"Invalid table name: '{table_name}'. Must be a valid SQL identifier."}, indent=2) conn = None try: conn = sqlite3.connect(connection_string) cursor = conn.cursor() # Check if table exists first for a more specific message cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name=?;", (table_name,)) if cursor.fetchone() is None: return json.dumps({"status": "not_found", "message": f"Table '{table_name}' not found. No action taken."}, indent=2) # Table name is validated, safe to use in f-string for DROP TABLE cursor.execute(f"DROP TABLE {table_name};") conn.commit() return json.dumps({"status": "success", "message": f"Table '{table_name}' deleted successfully."}, indent=2) except Exception as e: return json.dumps({"status": "error", "message": f"Error deleting table '{table_name}': {str(e)}"}, indent=2) finally: if conn: conn.close() delete_table.inputSchema = { "type": "object", "properties": { "connection_string": { "type": "string", "title": "Connection String", "description": "The connection string or path to the SQLite database." }, "table_name": { "type": "string", "title": "Table Name", "description": "The name of the table to delete." } }, "required": ["connection_string", "table_name"] } @mcp.tool() async def get_table_row_count(connection_string: str, table_name: str) -> str: r"""Gets the row count for a specified table. Args: connection_string (str): The connection string or path to the SQLite database. table_name (str): The name of the table. Returns: str: A JSON string containing the table name and its row count, or an error message. """ logger.info(f"get_table_row_count triggered for table: {table_name} in db: {connection_string}") if not re.match(r"^[a-zA-Z_][a-zA-Z0-9_]*$", table_name): return json.dumps({"status": "error", "message": f"Invalid table name: '{table_name}'. Must be a valid SQL identifier."}, indent=2) conn = None try: conn = sqlite3.connect(connection_string) cursor = conn.cursor() # Table name is validated, safe to use in f-string for SELECT COUNT(*) cursor.execute(f"SELECT COUNT(*) FROM {table_name};") count = cursor.fetchone()[0] return json.dumps({"status": "success", "table": table_name, "row_count": count}, indent=2) except sqlite3.OperationalError as e: # Catches errors like "no such table" return json.dumps({"status": "error", "message": f"Error getting row count for table '{table_name}': {str(e)}. Ensure table exists."}, indent=2) except Exception as e: return json.dumps({"status": "error", "message": f"An unexpected error occurred while getting row count for table '{table_name}': {str(e)}"}, indent=2) finally: if conn: conn.close() get_table_row_count.inputSchema = { "type": "object", "properties": { "connection_string": { "type": "string", "title": "Connection String", "description": "The connection string or path to the SQLite database." }, "table_name": { "type": "string", "title": "Table Name", "description": "The name of the table to get row count for." } }, "required": ["connection_string", "table_name"] } async def init_db(): db_path = Path.cwd() / "your_sqlite_database.db" conn = sqlite3.connect(str(db_path)) cursor = conn.cursor() # Create a sample table cursor.execute(''' CREATE TABLE IF NOT EXISTS sample_table ( id INTEGER PRIMARY KEY, name TEXT NOT NULL ) ''') conn.commit() conn.close() async def main(): await init_db() # Keep the server running while True: await asyncio.sleep(1) def main(transport: str = "stdio"): r"""Runs the SQL MCP Server. This server provides SQL database functionalities via MCP. Args: transport (str): The transport mode ('stdio' or 'sse'). """ if transport == 'stdio': mcp.run(transport='stdio') elif transport == 'sse': mcp.run(transport='sse') else: print(f"Unknown transport mode: {transport}") if __name__ == "__main__": import sys transport_mode = sys.argv[1] if len(sys.argv) > 1 else "stdio" asyncio.run(main(transport_mode)) ``` -------------------------------------------------------------------------------- /google_forms_mcp/google_forms_example_run.py: -------------------------------------------------------------------------------- ```python #!/usr/bin/env python # ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. ========= # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. ========= """ Example run script for the Google Forms MCP Server This script demonstrates creating and managing Google Forms through the MCP Server. It performs a series of operations that test all the functionality of the server. """ import asyncio import os import sys import json from pathlib import Path from dotenv import load_dotenv import argparse from typing import Optional from camel.agents import ChatAgent from camel.models import ModelFactory from camel.toolkits import MCPToolkit from camel.types import ModelPlatformType # Load environment variables from .env file load_dotenv() # Set your Anthropic API key (ensure this is valid in your .env file) os.environ["ANTHROPIC_API_KEY"] = os.getenv("ANTHROPIC_API_KEY") # Check if the necessary credentials file exists def check_credentials(): if not os.path.exists('credentials.json'): print("Error: credentials.json not found!") print("Please follow these steps to set up Google Forms API access:") print("1. Go to https://console.developers.google.com/") print("2. Create a new project or select an existing one") print("3. Enable the Google Forms API, Google Drive API, and Google Sheets API") print("4. Create credentials (OAuth client ID) for a desktop application") print("5. Download the credentials JSON file and save it as 'credentials.json' in this directory") return False return True # Function to print nicely formatted JSON responses def print_response(title, response): try: json_data = json.loads(response) print(f"\n{title}:") print(json.dumps(json_data, indent=2)) except: print(f"\n{title}:") print(response) # Automated test run that demonstrates all features async def run_automated_test(tools): print("\n==== Google Forms MCP Server Automated Test ====") print("This test will demonstrate all functionalities of the Google Forms MCP Server") try: # Step 1: Create a new form print("\n--- Step 1: Creating a new form ---") create_form_response = await tools["create_form"]( title="Customer Satisfaction Survey", description="Help us improve our services by providing your feedback" ) print_response("Form created", create_form_response) # Extract form ID for further operations form_data = json.loads(create_form_response) form_id = form_data["form_id"] # Step 2: Modify form settings print("\n--- Step 2: Modifying form settings ---") settings_response = await tools["modify_form_settings"]( form_id=form_id, collect_email=True, limit_responses=True ) print_response("Form settings updated", settings_response) # Step 3: Add a section print("\n--- Step 3: Adding a section ---") section_response = await tools["add_section"]( form_id=form_id, title="About Your Experience", description="Please tell us about your recent experience with our product/service" ) print_response("Section added", section_response) # Step 4: Add multiple choice question print("\n--- Step 4: Adding a multiple choice question ---") mc_response = await tools["add_multiple_choice"]( form_id=form_id, question_text="How would you rate our service?", choices=["Excellent", "Good", "Average", "Poor", "Very Poor"], required=True, help_text="Please select one option" ) print_response("Multiple choice question added", mc_response) # Step 5: Add a checkbox question print("\n--- Step 5: Adding a checkbox question ---") checkbox_response = await tools["add_checkboxes"]( form_id=form_id, question_text="Which aspects of our service did you appreciate?", choices=["Responsiveness", "Quality", "Value for money", "Customer support", "Other"], required=False, help_text="Select all that apply" ) print_response("Checkbox question added", checkbox_response) # Step 6: Add a dropdown question print("\n--- Step 6: Adding a dropdown question ---") dropdown_response = await tools["add_dropdown"]( form_id=form_id, question_text="How often do you use our service?", choices=["Daily", "Weekly", "Monthly", "Quarterly", "Yearly", "First time"], required=True ) print_response("Dropdown question added", dropdown_response) # Step 7: Add a short answer question print("\n--- Step 7: Adding a short answer question ---") short_answer_response = await tools["add_short_answer"]( form_id=form_id, question_text="What is your customer ID?", required=False, help_text="Please enter your customer ID if you have one" ) print_response("Short answer question added", short_answer_response) # Step 8: Add a paragraph question print("\n--- Step 8: Adding a paragraph question ---") paragraph_response = await tools["add_paragraph"]( form_id=form_id, question_text="Do you have any suggestions for improvement?", required=False, help_text="Please share any ideas on how we can serve you better" ) print_response("Paragraph question added", paragraph_response) # Step 9: Add a file upload question print("\n--- Step 9: Adding a file upload question ---") file_upload_response = await tools["add_file_upload"]( form_id=form_id, question_text="Would you like to upload any relevant documents?", required=False, help_text="You can upload screenshots or other documents" ) print_response("File upload question added", file_upload_response) # Step 10: Add another section print("\n--- Step 10: Adding another section ---") section2_response = await tools["add_section"]( form_id=form_id, title="Additional Information", description="Help us personalize our services" ) print_response("Another section added", section2_response) # Step 11: List all forms print("\n--- Step 11: Listing all forms ---") list_forms_response = await tools["list_forms"]() print_response("Forms list", list_forms_response) # Step 12: Export responses (might not have any responses yet) print("\n--- Step 12: Setting up response export ---") export_response = await tools["export_responses"]( form_id=form_id, format="sheets" ) print_response("Export setup", export_response) # Step 13: Try to get responses (likely empty, but tests the functionality) print("\n--- Step 13: Getting responses ---") responses = await tools["get_responses"]( form_id=form_id ) print_response("Form responses", responses) print("\n=== Test completed successfully! ===") print(f"Created form can be viewed at: {form_data['view_url']}") print(f"Created form can be edited at: {form_data['edit_url']}") except Exception as e: print(f"\nError during test: {str(e)}") raise # Interactive mode function to chat with the agent async def interactive_input_loop(agent: ChatAgent): loop = asyncio.get_event_loop() print("\n==== Google Forms Assistant Interactive Mode ====") print("Type 'exit' at any prompt to quit.") print("\nSample queries you can try:") print("- Create a new feedback form") print("- Add a customer satisfaction survey with multiple choice questions") print("- List all my forms") print("- Create a job application form with sections for personal info, education, and experience") print("- Export responses from a form to CSV") print("======================================") while True: query = await loop.run_in_executor( None, input, "\nEnter your query (or type 'exit' to quit): " ) if query.lower() == 'exit': print("Exiting interactive mode.") break print("\nProcessing query...") response = await agent.astep(query) print("\nAgent Response:") if response.msgs and response.msgs[0].content: print(response.msgs[0].content.rstrip()) else: print("No output received.") # Main function to run the entire example async def main(server_transport: str = 'stdio', mode: str = 'automated', server_url: Optional[str] = None): # First check if credentials exist if not check_credentials(): return mcp_toolkit = None server_process = None try: # Wrap setup and execution in try # Configure based on transport type if server_transport == 'stdio': # Original stdio logic (may still deadlock, but kept for reference) print("Using stdio transport (Note: May cause deadlock during auth)") current_dir = Path(__file__).resolve().parent server_script_path = current_dir / "google_forms_server_mcp.py" print(f"Looking for server script at: {server_script_path}") print(f"Directory contents: {[f.name for f in current_dir.iterdir() if f.is_file()]}") if not server_script_path.is_file(): print(f"Error: Server script not found at {server_script_path}") return # Use the _MCPServer helper for stdio from camel.toolkits.mcp_toolkit import _MCPServer # Import locally server_process = _MCPServer([sys.executable, str(server_script_path), "stdio"]) await server_process.start() mcp_toolkit = MCPToolkit(mcp_server_process=server_process) print("MCP Server started via stdio.") elif server_transport == 'sse': # SSE logic: Connect to an existing HTTP/SSE server if not server_url: print("Error: --server-url is required for SSE transport.") print("Example: python google_forms_example_run.py --transport sse --server-url http://127.0.0.1:8000") return print(f"Connecting to MCP Server via SSE at: {server_url}") mcp_toolkit = MCPToolkit(servers=[server_url]) # Move the connection test inside the 'async with' block else: print(f"Error: Unsupported server transport: {server_transport}") return # Initialize the LLM model # Reverting to model_config_dict based on user example, and adding api_key explicitly anthropic_api_key = os.getenv("ANTHROPIC_API_KEY") if not anthropic_api_key: print("Error: ANTHROPIC_API_KEY not found in environment variables or .env file.") # Decide how to handle missing key - raise error or return return # Or raise ValueError("Missing Anthropic API Key") model = ModelFactory.create( model_platform=ModelPlatformType.ANTHROPIC, model_type="claude-3-haiku-20240307", # Use a suitable Anthropic model # temperature=0.0 # Replaced by model_config_dict api_key=anthropic_api_key, # Explicitly pass API key model_config_dict={"temperature": 0.0} # Use the dict from user example ) # Main execution block within the try async with mcp_toolkit.connection() as toolkit: # Test connection *after* establishing context try: await toolkit.list_tools() # Use toolkit here print("Successfully connected to WebSocket MCP server and listed tools.") except Exception as e: print(f"Error testing connection/listing tools with server at {server_url}: {e}") print("Please ensure the server is running correctly.") print(f"Run: python google_forms_server_mcp.py --transport sse") return # Exit if connection test fails print("\nInitializing ChatAgent...") # Initialize ChatAgent with the MCP toolkit camel_agent = ChatAgent( model=model, tools=toolkit.get_tools(), verbose=True ) print("ChatAgent initialized.") # Choose mode: automated test or interactive loop if mode == 'automated': await run_automated_test(toolkit.get_tools()) elif mode == 'interactive': await interactive_input_loop(camel_agent) finally: # Finally block associated with the outer try print("\nCleaning up...") # Clean up resources only if server was started by this script (stdio mode) if server_process: print("Stopping stdio MCP server process...") try: await server_process.stop() print("Server process stopped.") except Exception as e: print(f"Error stopping server process: {e}") # For WebSocket, we assume the server runs independently print("Cleanup complete.") # Add argument parsing for command-line execution (this remains outside the main function) if __name__ == "__main__": parser = argparse.ArgumentParser(description="Google Forms MCP Example Runner") parser.add_argument( "--mode", default="automated", choices=["automated", "interactive"], help="Run mode (default: automated)" ) parser.add_argument( "--transport", default="sse", # Default to SSE for client too choices=["stdio", "sse"], # Only allow stdio or sse help="Server transport method (default: sse)" ) parser.add_argument( "--server-url", default="http://127.0.0.1:8000", # Default SSE URL (HTTP) help="URL of the running MCP SSE server (required for sse transport)" ) args = parser.parse_args() try: asyncio.run(main(server_transport=args.transport, mode=args.mode, server_url=args.server_url)) except KeyboardInterrupt: print("\nExecution interrupted by user.") ``` -------------------------------------------------------------------------------- /google_forms_mcp/test_google_forms_server_mcp.py: -------------------------------------------------------------------------------- ```python import asyncio # noqa: F401 import os import json import pytest from unittest.mock import Mock, patch, MagicMock import sys sys.path.insert(0, os.path.abspath(os.path.dirname(__file__))) # Import the async tools to test from google_forms_server_mcp import ( create_form, add_section, add_short_answer, add_paragraph, add_multiple_choice, add_checkboxes, add_dropdown, add_file_upload, modify_form_settings, get_responses, export_responses, list_forms, ) # Mock Google API services @pytest.fixture def mock_google_services(): # Create mock services mock_form_service = MagicMock() mock_drive_service = MagicMock() mock_sheets_service = MagicMock() # Create mock context with mock services mock_context = MagicMock() mock_context.lifespan_context = MagicMock() mock_context.lifespan_context.form_service = mock_form_service mock_context.lifespan_context.drive_service = mock_drive_service mock_context.lifespan_context.sheets_service = mock_sheets_service return mock_context @pytest.mark.asyncio async def test_create_form(mock_google_services): """ Test that create_form creates a form correctly """ # Set up the mock response form_id = "abc123formid" mock_forms_obj = mock_google_services.lifespan_context.form_service.forms.return_value mock_create = mock_forms_obj.create.return_value mock_create.execute.return_value = {"formId": form_id} # Call the create_form function form_title = "Test Form" form_description = "A test form" result = await create_form(title=form_title, description=form_description, ctx=mock_google_services) result_json = json.loads(result) # Check that the form creation was successful assert result_json["form_id"] == form_id assert result_json["title"] == form_title assert result_json["description"] == form_description assert "edit_url" in result_json assert "view_url" in result_json # Check that the correct API calls were made mock_forms_obj.create.assert_called_once() form_body = mock_forms_obj.create.call_args[1]["body"] assert form_body["info"]["title"] == form_title assert form_body["info"]["description"] == form_description @pytest.mark.asyncio async def test_add_section(mock_google_services): """ Test that add_section adds a section to a form """ # Set up the mock response form_id = "abc123formid" mock_forms_obj = mock_google_services.lifespan_context.form_service.forms.return_value # Mock the get method mock_get = mock_forms_obj.get.return_value mock_get.execute.return_value = { "formId": form_id, "items": [{"item1": "data"}] } # Mock the batchUpdate method mock_batch_update = mock_forms_obj.batchUpdate.return_value mock_batch_update.execute.return_value = {"success": True} # Call the add_section function section_title = "Test Section" section_description = "A test section" result = await add_section( form_id=form_id, title=section_title, description=section_description, ctx=mock_google_services ) result_json = json.loads(result) # Check that the section addition was successful assert result_json["form_id"] == form_id assert result_json["section_added"] == section_title assert result_json["status"] == "success" # Check that the correct API calls were made mock_forms_obj.get.assert_called_once_with(formId=form_id) mock_forms_obj.batchUpdate.assert_called_once() update_request = mock_forms_obj.batchUpdate.call_args[1]["body"] assert update_request["requests"][0]["createItem"]["item"]["title"] == section_title assert update_request["requests"][0]["createItem"]["item"]["description"] == section_description assert "pageBreakItem" in update_request["requests"][0]["createItem"]["item"] @pytest.mark.asyncio async def test_modify_form_settings(mock_google_services): """ Test that modify_form_settings updates form settings correctly """ # Set up the mock response form_id = "abc123formid" mock_forms_obj = mock_google_services.lifespan_context.form_service.forms.return_value # Mock the get method mock_get = mock_forms_obj.get.return_value mock_get.execute.return_value = { "formId": form_id, "settings": {"collectEmail": False} } # Mock the batchUpdate method mock_batch_update = mock_forms_obj.batchUpdate.return_value mock_batch_update.execute.return_value = {"success": True} # Call the modify_form_settings function collect_email = True result = await modify_form_settings( form_id=form_id, collect_email=collect_email, ctx=mock_google_services ) result_json = json.loads(result) # Check that the settings update was successful assert result_json["form_id"] == form_id assert result_json["settings_updated"] == True assert result_json["collect_email"] == collect_email # Check that the correct API calls were made mock_forms_obj.get.assert_called_once_with(formId=form_id) mock_forms_obj.batchUpdate.assert_called_once() update_request = mock_forms_obj.batchUpdate.call_args[1]["body"] assert update_request["requests"][0]["updateSettings"]["settings"]["collectEmail"] == collect_email assert update_request["requests"][0]["updateSettings"]["updateMask"] == "collectEmail" @pytest.mark.asyncio async def test_add_short_answer(mock_google_services): """ Test that add_short_answer adds a question to a form """ # Set up the mock response form_id = "abc123formid" mock_forms_obj = mock_google_services.lifespan_context.form_service.forms.return_value # Mock the batchUpdate method mock_batch_update = mock_forms_obj.batchUpdate.return_value mock_batch_update.execute.return_value = {"success": True} # Call the add_short_answer function question_text = "What is your name?" required = True help_text = "Please enter your full name" result = await add_short_answer( form_id=form_id, question_text=question_text, required=required, help_text=help_text, ctx=mock_google_services ) result_json = json.loads(result) # Check that the question addition was successful assert result_json["form_id"] == form_id assert result_json["question_text"] == question_text assert result_json["type"] == "short_answer" assert result_json["required"] == required assert result_json["status"] == "success" # Check that the correct API calls were made mock_forms_obj.batchUpdate.assert_called_once() update_request = mock_forms_obj.batchUpdate.call_args[1]["body"] assert update_request["requests"][0]["createItem"]["item"]["title"] == question_text assert update_request["requests"][0]["createItem"]["item"]["required"] == required assert update_request["requests"][0]["createItem"]["item"]["description"] == help_text assert update_request["requests"][0]["createItem"]["item"]["textQuestion"]["paragraph"] == False @pytest.mark.asyncio async def test_add_paragraph(mock_google_services): """ Test that add_paragraph adds a paragraph question to a form """ # Set up the mock response form_id = "abc123formid" mock_forms_obj = mock_google_services.lifespan_context.form_service.forms.return_value # Mock the batchUpdate method mock_batch_update = mock_forms_obj.batchUpdate.return_value mock_batch_update.execute.return_value = {"success": True} # Call the add_paragraph function question_text = "Tell us about yourself" required = False help_text = "Write a brief description" result = await add_paragraph( form_id=form_id, question_text=question_text, required=required, help_text=help_text, ctx=mock_google_services ) result_json = json.loads(result) # Check that the question addition was successful assert result_json["form_id"] == form_id assert result_json["question_text"] == question_text assert result_json["type"] == "paragraph" assert result_json["required"] == required assert result_json["status"] == "success" # Check that the correct API calls were made mock_forms_obj.batchUpdate.assert_called_once() update_request = mock_forms_obj.batchUpdate.call_args[1]["body"] assert update_request["requests"][0]["createItem"]["item"]["title"] == question_text assert update_request["requests"][0]["createItem"]["item"]["required"] == required assert update_request["requests"][0]["createItem"]["item"]["description"] == help_text assert update_request["requests"][0]["createItem"]["item"]["textQuestion"]["paragraph"] == True @pytest.mark.asyncio async def test_add_multiple_choice(mock_google_services): """ Test that add_multiple_choice adds a multiple choice question to a form """ # Set up the mock response form_id = "abc123formid" mock_forms_obj = mock_google_services.lifespan_context.form_service.forms.return_value # Mock the batchUpdate method mock_batch_update = mock_forms_obj.batchUpdate.return_value mock_batch_update.execute.return_value = {"success": True} # Call the add_multiple_choice function question_text = "What is your favorite color?" choices = ["Red", "Blue", "Green", "Yellow"] required = True help_text = "Select one option" result = await add_multiple_choice( form_id=form_id, question_text=question_text, choices=choices, required=required, help_text=help_text, ctx=mock_google_services ) result_json = json.loads(result) # Check that the question addition was successful assert result_json["form_id"] == form_id assert result_json["question_text"] == question_text assert result_json["type"] == "multiple_choice" assert result_json["choices"] == choices assert result_json["required"] == required assert result_json["status"] == "success" # Check that the correct API calls were made mock_forms_obj.batchUpdate.assert_called_once() update_request = mock_forms_obj.batchUpdate.call_args[1]["body"] assert update_request["requests"][0]["createItem"]["item"]["title"] == question_text assert update_request["requests"][0]["createItem"]["item"]["required"] == required assert update_request["requests"][0]["createItem"]["item"]["description"] == help_text # Check that the choices were set correctly choice_question = update_request["requests"][0]["createItem"]["item"]["questionItem"]["question"]["choiceQuestion"] assert choice_question["type"] == "RADIO" assert len(choice_question["options"]) == len(choices) for i, choice in enumerate(choices): assert choice_question["options"][i]["value"] == choice assert choice_question["shuffle"] == False @pytest.mark.asyncio async def test_add_checkboxes(mock_google_services): """ Test that add_checkboxes adds a checkboxes question to a form """ # Set up the mock response form_id = "abc123formid" mock_forms_obj = mock_google_services.lifespan_context.form_service.forms.return_value # Mock the batchUpdate method mock_batch_update = mock_forms_obj.batchUpdate.return_value mock_batch_update.execute.return_value = {"success": True} # Call the add_checkboxes function question_text = "Which fruits do you like?" choices = ["Apple", "Banana", "Orange", "Strawberry"] required = True help_text = "Select all that apply" result = await add_checkboxes( form_id=form_id, question_text=question_text, choices=choices, required=required, help_text=help_text, ctx=mock_google_services ) result_json = json.loads(result) # Check that the question addition was successful assert result_json["form_id"] == form_id assert result_json["question_text"] == question_text assert result_json["type"] == "checkboxes" assert result_json["choices"] == choices assert result_json["required"] == required assert result_json["status"] == "success" # Check that the correct API calls were made mock_forms_obj.batchUpdate.assert_called_once() update_request = mock_forms_obj.batchUpdate.call_args[1]["body"] assert update_request["requests"][0]["createItem"]["item"]["title"] == question_text assert update_request["requests"][0]["createItem"]["item"]["required"] == required assert update_request["requests"][0]["createItem"]["item"]["description"] == help_text # Check that the choices were set correctly choice_question = update_request["requests"][0]["createItem"]["item"]["questionItem"]["question"]["choiceQuestion"] assert choice_question["type"] == "CHECKBOX" assert len(choice_question["options"]) == len(choices) for i, choice in enumerate(choices): assert choice_question["options"][i]["value"] == choice assert choice_question["shuffle"] == False @pytest.mark.asyncio async def test_add_dropdown(mock_google_services): """ Test that add_dropdown adds a dropdown question to a form """ # Set up the mock response form_id = "abc123formid" mock_forms_obj = mock_google_services.lifespan_context.form_service.forms.return_value # Mock the batchUpdate method mock_batch_update = mock_forms_obj.batchUpdate.return_value mock_batch_update.execute.return_value = {"success": True} # Call the add_dropdown function question_text = "Select your country" choices = ["USA", "Canada", "UK", "Australia"] required = True help_text = "Select one" result = await add_dropdown( form_id=form_id, question_text=question_text, choices=choices, required=required, help_text=help_text, ctx=mock_google_services ) result_json = json.loads(result) # Check that the question addition was successful assert result_json["form_id"] == form_id assert result_json["question_text"] == question_text assert result_json["type"] == "dropdown" assert result_json["choices"] == choices assert result_json["required"] == required assert result_json["status"] == "success" # Check that the correct API calls were made mock_forms_obj.batchUpdate.assert_called_once() update_request = mock_forms_obj.batchUpdate.call_args[1]["body"] assert update_request["requests"][0]["createItem"]["item"]["title"] == question_text assert update_request["requests"][0]["createItem"]["item"]["required"] == required assert update_request["requests"][0]["createItem"]["item"]["description"] == help_text # Check that the choices were set correctly choice_question = update_request["requests"][0]["createItem"]["item"]["questionItem"]["question"]["choiceQuestion"] assert choice_question["type"] == "DROP_DOWN" assert len(choice_question["options"]) == len(choices) for i, choice in enumerate(choices): assert choice_question["options"][i]["value"] == choice assert choice_question["shuffle"] == False @pytest.mark.asyncio async def test_add_file_upload(mock_google_services): """ Test that add_file_upload adds a file upload question to a form """ # Set up the mock response form_id = "abc123formid" mock_forms_obj = mock_google_services.lifespan_context.form_service.forms.return_value # Mock the batchUpdate method mock_batch_update = mock_forms_obj.batchUpdate.return_value mock_batch_update.execute.return_value = {"success": True} # Call the add_file_upload function question_text = "Upload your resume" required = True help_text = "PDF files only please" result = await add_file_upload( form_id=form_id, question_text=question_text, required=required, help_text=help_text, ctx=mock_google_services ) result_json = json.loads(result) # Check that the question addition was successful assert result_json["form_id"] == form_id assert result_json["question_text"] == question_text assert result_json["type"] == "file_upload" assert result_json["required"] == required assert result_json["status"] == "success" # Check that the correct API calls were made mock_forms_obj.batchUpdate.assert_called_once() update_request = mock_forms_obj.batchUpdate.call_args[1]["body"] assert update_request["requests"][0]["createItem"]["item"]["title"] == question_text assert update_request["requests"][0]["createItem"]["item"]["required"] == required assert update_request["requests"][0]["createItem"]["item"]["description"] == help_text # Check that it's a file upload question assert "fileUploadQuestion" in update_request["requests"][0]["createItem"]["item"]["questionItem"]["question"] @pytest.mark.asyncio async def test_get_responses(mock_google_services): """ Test that get_responses retrieves form responses correctly """ # Set up the mock response form_id = "abc123formid" form_title = "Test Form" mock_forms_obj = mock_google_services.lifespan_context.form_service.forms.return_value # Mock the get method mock_get = mock_forms_obj.get.return_value mock_get.execute.return_value = { "formId": form_id, "info": {"title": form_title}, "items": [ { "title": "What is your name?", "questionItem": { "question": { "questionId": "q1" } } }, { "title": "What is your age?", "questionItem": { "question": { "questionId": "q2" } } } ] } # Mock the responses.list method mock_responses = mock_forms_obj.responses.return_value mock_responses_list = mock_responses.list.return_value mock_responses_list.execute.return_value = { "responses": [ { "responseId": "resp1", "createTime": "2023-04-01T12:00:00Z", "answers": { "q1": { "textAnswers": { "answers": [ {"value": "John Doe"} ] } }, "q2": { "textAnswers": { "answers": [ {"value": "30"} ] } } } } ] } # Call the get_responses function result = await get_responses(form_id=form_id, ctx=mock_google_services) result_json = json.loads(result) # Check that the responses were retrieved successfully assert result_json["form_id"] == form_id assert result_json["title"] == form_title assert result_json["response_count"] == 1 assert len(result_json["responses"]) == 1 # Check the response details response = result_json["responses"][0] assert response["response_id"] == "resp1" assert response["timestamp"] == "2023-04-01T12:00:00Z" assert "What is your name?" in response["answers"] assert response["answers"]["What is your name?"] == "John Doe" assert "What is your age?" in response["answers"] assert response["answers"]["What is your age?"] == "30" # Check that the correct API calls were made mock_forms_obj.get.assert_called_once_with(formId=form_id) mock_forms_obj.responses.assert_called_once() mock_responses.list.assert_called_once_with(formId=form_id) @pytest.mark.asyncio async def test_export_responses(mock_google_services): """ Test that export_responses correctly sets up response export """ # Set up the mock response form_id = "abc123formid" form_title = "Test Form" sheet_id = "xyz789sheetid" mock_forms_obj = mock_google_services.lifespan_context.form_service.forms.return_value mock_sheets_obj = mock_google_services.lifespan_context.sheets_service.spreadsheets.return_value # Mock form get method mock_get = mock_forms_obj.get.return_value mock_get.execute.return_value = { "formId": form_id, "info": {"title": form_title}, "responderUri": f"https://docs.google.com/spreadsheets/d/{sheet_id}/edit" } # Call the export_responses function for CSV result_csv = await export_responses(form_id=form_id, format="csv", ctx=mock_google_services) result_csv_json = json.loads(result_csv) # Call the export_responses function for sheets result_sheets = await export_responses(form_id=form_id, format="sheets", ctx=mock_google_services) result_sheets_json = json.loads(result_sheets) # Check the CSV export result assert result_csv_json["form_id"] == form_id assert result_csv_json["export_format"] == "csv" assert sheet_id in result_csv_json["download_link"] # Check the Sheets export result assert result_sheets_json["form_id"] == form_id assert result_sheets_json["export_format"] == "sheets" assert result_sheets_json["spreadsheet_id"] == sheet_id assert sheet_id in result_sheets_json["spreadsheet_link"] # Check that the correct API calls were made mock_forms_obj.get.assert_called_with(formId=form_id) @pytest.mark.asyncio async def test_export_responses_create_new(mock_google_services): """ Test that export_responses creates a new spreadsheet when none exists """ # Set up the mock response form_id = "abc123formid" form_title = "Test Form" new_sheet_id = "new789sheetid" mock_forms_obj = mock_google_services.lifespan_context.form_service.forms.return_value mock_sheets_obj = mock_google_services.lifespan_context.sheets_service.spreadsheets.return_value # Mock form get method with no responderUri mock_get = mock_forms_obj.get.return_value mock_get.execute.return_value = { "formId": form_id, "info": {"title": form_title} } # Mock spreadsheet create method mock_create = mock_sheets_obj.create.return_value mock_create.execute.return_value = {"spreadsheetId": new_sheet_id} # Mock form batchUpdate method mock_batch_update = mock_forms_obj.batchUpdate.return_value mock_batch_update.execute.return_value = {"success": True} # Call the export_responses function result = await export_responses(form_id=form_id, format="csv", ctx=mock_google_services) result_json = json.loads(result) # Check the export result assert result_json["form_id"] == form_id assert result_json["export_format"] == "csv" assert new_sheet_id in result_json["download_link"] # Check that the correct API calls were made mock_forms_obj.get.assert_called_with(formId=form_id) mock_sheets_obj.create.assert_called_once() # Check that we tried to link the form to the spreadsheet mock_forms_obj.batchUpdate.assert_called_once() update_request = mock_forms_obj.batchUpdate.call_args[1]["body"] assert update_request["requests"][0]["updateSettings"]["settings"]["responseDestination"] == "SPREADSHEET" assert update_request["requests"][0]["updateSettings"]["settings"]["spreadsheetId"] == new_sheet_id @pytest.mark.asyncio async def test_list_forms(mock_google_services): """ Test that list_forms returns all forms correctly """ # Set up the mock response mock_drive_obj = mock_google_services.lifespan_context.drive_service.files.return_value mock_list = mock_drive_obj.list.return_value mock_list.execute.return_value = { "files": [ { "id": "form1", "name": "Customer Feedback", "webViewLink": "https://docs.google.com/forms/d/form1/viewform", "createdTime": "2023-01-15T12:00:00Z" }, { "id": "form2", "name": "Job Application", "webViewLink": "https://docs.google.com/forms/d/form2/viewform", "createdTime": "2023-02-20T10:30:00Z" } ] } # Call the list_forms function result = await list_forms(ctx=mock_google_services) result_json = json.loads(result) # Check that the forms were listed correctly assert len(result_json["forms"]) == 2 # Check the first form details form1 = result_json["forms"][0] assert form1["id"] == "form1" assert form1["name"] == "Customer Feedback" assert form1["url"] == "https://docs.google.com/forms/d/form1/viewform" assert form1["created"] == "2023-01-15T12:00:00Z" # Check the second form details form2 = result_json["forms"][1] assert form2["id"] == "form2" assert form2["name"] == "Job Application" assert form2["url"] == "https://docs.google.com/forms/d/form2/viewform" assert form2["created"] == "2023-02-20T10:30:00Z" # Check that the correct API calls were made mock_drive_obj.list.assert_called_once() list_args = mock_drive_obj.list.call_args[1] assert list_args["q"] == "mimeType='application/vnd.google-apps.form'" assert "id" in list_args["fields"] assert "name" in list_args["fields"] assert "webViewLink" in list_args["fields"] assert "createdTime" in list_args["fields"] @pytest.mark.asyncio async def test_list_forms_empty(mock_google_services): """ Test that list_forms handles the case of no forms """ # Set up the mock response with no forms mock_drive_obj = mock_google_services.lifespan_context.drive_service.files.return_value mock_list = mock_drive_obj.list.return_value mock_list.execute.return_value = {"files": []} # Call the list_forms function result = await list_forms(ctx=mock_google_services) result_json = json.loads(result) # Check that the response is formatted correctly assert result_json["forms"] == [] assert "message" in result_json assert result_json["message"] == "No forms found" # Check that the correct API call was made mock_drive_obj.list.assert_called_once() ``` -------------------------------------------------------------------------------- /google_forms_mcp/google_forms_server_mcp.py: -------------------------------------------------------------------------------- ```python #!/usr/bin/env python # ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. ========= # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. ========= """ Google Forms MCP Server using FastMCP This server provides MCP tools for interacting with Google Forms. """ import os import json import pickle from typing import List, Optional, Dict, Any from contextlib import asynccontextmanager from collections.abc import AsyncIterator from dataclasses import dataclass import asyncio # noqa: F401 from mcp.server.fastmcp import FastMCP, Context from camel.logger import get_logger from googleapiclient.discovery import build from google_auth_oauthlib.flow import InstalledAppFlow from google.auth.transport.requests import Request # Add necessary imports for WebSocket server import argparse logger = get_logger(__name__) # Define the scopes needed for Google Forms API SCOPES = [ 'https://www.googleapis.com/auth/forms', 'https://www.googleapis.com/auth/drive', 'https://www.googleapis.com/auth/spreadsheets' ] @dataclass class FormServices: """Class to hold the Google API services""" form_service: Any drive_service: Any sheets_service: Any # Find the app_lifespan function in google_forms_server_mcp.py # Look for this section (around line 60-80): # Update the app_lifespan function in google_forms_server_mcp.py with this code @asynccontextmanager async def app_lifespan(server: FastMCP) -> AsyncIterator[FormServices]: """Manage application lifecycle with Google API services""" logger.info("Initializing Google API services...") # Get credentials and initialize services creds = None # Token file stores the user's access and refresh tokens if os.path.exists('token.pickle'): with open('token.pickle', 'rb') as token: creds = pickle.load(token) # If there are no valid credentials, let the user log in if not creds or not creds.valid: if creds and creds.expired and creds.refresh_token: creds.refresh(Request()) else: # We need to make this work with async flow = InstalledAppFlow.from_client_secrets_file( 'credentials.json', SCOPES) # Generate the authorization URL auth_url, _ = flow.authorization_url(prompt='consent') print("\n\n================================================") print("Go to this URL in your browser to authenticate:") print(f"{auth_url}") print("================================================\n") # Get authorization code from user input (using asyncio event loop) loop = asyncio.get_event_loop() auth_code = await loop.run_in_executor( None, lambda: input("Enter the authorization code: ") ) # Exchange authorization code for credentials flow.fetch_token(code=auth_code) creds = flow.credentials # Save the credentials for the next run with open('token.pickle', 'wb') as token: pickle.dump(creds, token) # Build API services form_service = build('forms', 'v1', credentials=creds) drive_service = build('drive', 'v3', credentials=creds) sheets_service = build('sheets', 'v4', credentials=creds) services = FormServices( form_service=form_service, drive_service=drive_service, sheets_service=sheets_service ) logger.info("Google API services initialized successfully") try: yield services finally: # No specific cleanup needed for Google API services logger.info("Shutting down Google API services") # Create the MCP server with lifespan support mcp = FastMCP( "GoogleForms", lifespan=app_lifespan, # Optionally add description, version etc. # description="MCP Server for Google Forms interaction.", # version="0.1.0" ) # Form Structure Tools @mcp.tool() async def create_form(title: str, description: str = "", ctx: Context = None) -> str: """ Create a new Google Form with title and description Args: title (str): The title of the form description (str): The description of the form Returns: str: JSON string containing form details including ID """ logger.info(f"create_form triggered with title: {title}") try: services = ctx.lifespan_context form_service = services.form_service form_body = { "info": { "title": title, "documentTitle": title } } if description: form_body["info"]["description"] = description result = form_service.forms().create(body=form_body).execute() return json.dumps({ "form_id": result["formId"], "title": title, "description": description, "edit_url": f"https://docs.google.com/forms/d/{result['formId']}/edit", "view_url": f"https://docs.google.com/forms/d/{result['formId']}/viewform" }, indent=2) except Exception as e: return f"Error creating form: {e}" create_form.inputSchema = { "type": "object", "properties": { "title": { "type": "string", "title": "Form Title", "description": "The title of the Google Form" }, "description": { "type": "string", "title": "Form Description", "description": "The description of the Google Form" } }, "required": ["title"] } @mcp.tool() async def add_section(form_id: str, title: str, description: str = "", ctx: Context = None) -> str: """ Add a section to a Google Form Args: form_id (str): The ID of the form title (str): The title of the section description (str): The description of the section Returns: str: JSON string containing the updated form details """ logger.info(f"add_section triggered with form_id: {form_id}, title: {title}") try: services = ctx.lifespan_context form_service = services.form_service # First, get the current form form = form_service.forms().get(formId=form_id).execute() # Create the update request update_request = { "requests": [ { "createItem": { "item": { "title": title, "description": description, "pageBreakItem": {} }, "location": { "index": len(form.get("items", [])) } } } ] } # Execute the update updated_form = form_service.forms().batchUpdate( formId=form_id, body=update_request).execute() return json.dumps({ "form_id": form_id, "section_added": title, "status": "success" }, indent=2) except Exception as e: return f"Error adding section: {e}" add_section.inputSchema = { "type": "object", "properties": { "form_id": { "type": "string", "title": "Form ID", "description": "The ID of the Google Form" }, "title": { "type": "string", "title": "Section Title", "description": "The title of the section" }, "description": { "type": "string", "title": "Section Description", "description": "The description of the section" } }, "required": ["form_id", "title"] } @mcp.tool() async def modify_form_settings( form_id: str, collect_email: Optional[bool] = None, limit_responses: Optional[bool] = None, response_limit: Optional[int] = None, ctx: Context = None ) -> str: """ Modify Google Form settings Args: form_id (str): The ID of the form collect_email (bool, optional): Whether to collect email addresses limit_responses (bool, optional): Whether to limit responses response_limit (int, optional): Maximum number of responses allowed Returns: str: JSON string containing the updated form settings """ logger.info(f"modify_form_settings triggered with form_id: {form_id}") try: services = ctx.lifespan_context form_service = services.form_service # Get the current form form = form_service.forms().get(formId=form_id).execute() updates = [] # Update collect email setting if collect_email is not None: updates.append({ "updateSettings": { "settings": { "collectEmail": collect_email }, "updateMask": "collectEmail" } }) # Update response limit settings if limit_responses is not None and not limit_responses: updates.append({ "updateSettings": { "settings": { "isQuiz": False }, "updateMask": "isQuiz" } }) elif limit_responses and response_limit: updates.append({ "updateSettings": { "settings": { "limitOneResponsePerUser": True }, "updateMask": "limitOneResponsePerUser" } }) # Execute the updates if any if updates: update_request = {"requests": updates} updated_form = form_service.forms().batchUpdate( formId=form_id, body=update_request).execute() return json.dumps({ "form_id": form_id, "settings_updated": True, "collect_email": collect_email, "limit_responses": limit_responses, "response_limit": response_limit }, indent=2) except Exception as e: return f"Error modifying form settings: {e}" modify_form_settings.inputSchema = { "type": "object", "properties": { "form_id": { "type": "string", "title": "Form ID", "description": "The ID of the Google Form" }, "collect_email": { "type": "boolean", "title": "Collect Email", "description": "Whether to collect email addresses" }, "limit_responses": { "type": "boolean", "title": "Limit Responses", "description": "Whether to limit responses" }, "response_limit": { "type": "integer", "title": "Response Limit", "description": "Maximum number of responses allowed" } }, "required": ["form_id"] } # Question Type Tools @mcp.tool() async def add_short_answer( form_id: str, question_text: str, required: bool = False, help_text: str = "", ctx: Context = None ) -> str: """ Add a short answer question to a Google Form Args: form_id (str): The ID of the form question_text (str): The text of the question required (bool, optional): Whether the question is required help_text (str, optional): Help text for the question Returns: str: JSON string containing the question details """ logger.info(f"add_short_answer triggered with form_id: {form_id}, question: {question_text}") try: services = ctx.lifespan_context form_service = services.form_service question_item = { "title": question_text, "required": required, "textQuestion": { "paragraph": False } } if help_text: question_item["description"] = help_text update_request = { "requests": [ { "createItem": { "item": question_item, "location": { "index": 0 } } } ] } result = form_service.forms().batchUpdate( formId=form_id, body=update_request).execute() return json.dumps({ "form_id": form_id, "question_text": question_text, "type": "short_answer", "required": required, "status": "success" }, indent=2) except Exception as e: return f"Error adding short answer question: {e}" add_short_answer.inputSchema = { "type": "object", "properties": { "form_id": { "type": "string", "title": "Form ID", "description": "The ID of the Google Form" }, "question_text": { "type": "string", "title": "Question Text", "description": "The text of the question" }, "required": { "type": "boolean", "title": "Required", "description": "Whether the question is required" }, "help_text": { "type": "string", "title": "Help Text", "description": "Help text for the question" } }, "required": ["form_id", "question_text"] } @mcp.tool() async def add_paragraph( form_id: str, question_text: str, required: bool = False, help_text: str = "", ctx: Context = None ) -> str: """ Add a paragraph question to a Google Form Args: form_id (str): The ID of the form question_text (str): The text of the question required (bool, optional): Whether the question is required help_text (str, optional): Help text for the question Returns: str: JSON string containing the question details """ logger.info(f"add_paragraph triggered with form_id: {form_id}, question: {question_text}") try: services = ctx.lifespan_context form_service = services.form_service question_item = { "title": question_text, "required": required, "textQuestion": { "paragraph": True } } if help_text: question_item["description"] = help_text update_request = { "requests": [ { "createItem": { "item": question_item, "location": { "index": 0 } } } ] } result = form_service.forms().batchUpdate( formId=form_id, body=update_request).execute() return json.dumps({ "form_id": form_id, "question_text": question_text, "type": "paragraph", "required": required, "status": "success" }, indent=2) except Exception as e: return f"Error adding paragraph question: {e}" add_paragraph.inputSchema = { "type": "object", "properties": { "form_id": { "type": "string", "title": "Form ID", "description": "The ID of the Google Form" }, "question_text": { "type": "string", "title": "Question Text", "description": "The text of the question" }, "required": { "type": "boolean", "title": "Required", "description": "Whether the question is required" }, "help_text": { "type": "string", "title": "Help Text", "description": "Help text for the question" } }, "required": ["form_id", "question_text"] } @mcp.tool() async def add_multiple_choice( form_id: str, question_text: str, choices: List[str], required: bool = False, help_text: str = "", ctx: Context = None ) -> str: """ Add a multiple choice question to a Google Form Args: form_id (str): The ID of the form question_text (str): The text of the question choices (List[str]): List of choices for the multiple choice question required (bool, optional): Whether the question is required help_text (str, optional): Help text for the question Returns: str: JSON string containing the question details """ logger.info(f"add_multiple_choice triggered with form_id: {form_id}, question: {question_text}") try: services = ctx.lifespan_context form_service = services.form_service # Create choices objects choice_items = [{"value": choice} for choice in choices] question_item = { "title": question_text, "required": required, "questionItem": { "question": { "choiceQuestion": { "type": "RADIO", "options": choice_items, "shuffle": False } } } } if help_text: question_item["description"] = help_text update_request = { "requests": [ { "createItem": { "item": question_item, "location": { "index": 0 } } } ] } result = form_service.forms().batchUpdate( formId=form_id, body=update_request).execute() return json.dumps({ "form_id": form_id, "question_text": question_text, "type": "multiple_choice", "choices": choices, "required": required, "status": "success" }, indent=2) except Exception as e: return f"Error adding multiple choice question: {e}" add_multiple_choice.inputSchema = { "type": "object", "properties": { "form_id": { "type": "string", "title": "Form ID", "description": "The ID of the Google Form" }, "question_text": { "type": "string", "title": "Question Text", "description": "The text of the question" }, "choices": { "type": "array", "items": { "type": "string" }, "title": "Choices", "description": "List of choices for the multiple choice question" }, "required": { "type": "boolean", "title": "Required", "description": "Whether the question is required" }, "help_text": { "type": "string", "title": "Help Text", "description": "Help text for the question" } }, "required": ["form_id", "question_text", "choices"] } @mcp.tool() async def add_checkboxes( form_id: str, question_text: str, choices: List[str], required: bool = False, help_text: str = "", ctx: Context = None ) -> str: """ Add a checkboxes question to a Google Form Args: form_id (str): The ID of the form question_text (str): The text of the question choices (List[str]): List of choices for the checkboxes required (bool, optional): Whether the question is required help_text (str, optional): Help text for the question Returns: str: JSON string containing the question details """ logger.info(f"add_checkboxes triggered with form_id: {form_id}, question: {question_text}") try: services = ctx.lifespan_context form_service = services.form_service # Create choices objects choice_items = [{"value": choice} for choice in choices] question_item = { "title": question_text, "required": required, "questionItem": { "question": { "choiceQuestion": { "type": "CHECKBOX", "options": choice_items, "shuffle": False } } } } if help_text: question_item["description"] = help_text update_request = { "requests": [ { "createItem": { "item": question_item, "location": { "index": 0 } } } ] } result = form_service.forms().batchUpdate( formId=form_id, body=update_request).execute() return json.dumps({ "form_id": form_id, "question_text": question_text, "type": "checkboxes", "choices": choices, "required": required, "status": "success" }, indent=2) except Exception as e: return f"Error adding checkboxes question: {e}" add_checkboxes.inputSchema = { "type": "object", "properties": { "form_id": { "type": "string", "title": "Form ID", "description": "The ID of the Google Form" }, "question_text": { "type": "string", "title": "Question Text", "description": "The text of the question" }, "choices": { "type": "array", "items": { "type": "string" }, "title": "Choices", "description": "List of choices for the checkboxes" }, "required": { "type": "boolean", "title": "Required", "description": "Whether the question is required" }, "help_text": { "type": "string", "title": "Help Text", "description": "Help text for the question" } }, "required": ["form_id", "question_text", "choices"] } @mcp.tool() async def add_dropdown( form_id: str, question_text: str, choices: List[str], required: bool = False, help_text: str = "", ctx: Context = None ) -> str: """ Add a dropdown question to a Google Form Args: form_id (str): The ID of the form question_text (str): The text of the question choices (List[str]): List of choices for the dropdown required (bool, optional): Whether the question is required help_text (str, optional): Help text for the question Returns: str: JSON string containing the question details """ logger.info(f"add_dropdown triggered with form_id: {form_id}, question: {question_text}") try: services = ctx.lifespan_context form_service = services.form_service # Create choices objects choice_items = [{"value": choice} for choice in choices] question_item = { "title": question_text, "required": required, "questionItem": { "question": { "choiceQuestion": { "type": "DROP_DOWN", "options": choice_items, "shuffle": False } } } } if help_text: question_item["description"] = help_text update_request = { "requests": [ { "createItem": { "item": question_item, "location": { "index": 0 } } } ] } result = form_service.forms().batchUpdate( formId=form_id, body=update_request).execute() return json.dumps({ "form_id": form_id, "question_text": question_text, "type": "dropdown", "choices": choices, "required": required, "status": "success" }, indent=2) except Exception as e: return f"Error adding dropdown question: {e}" add_dropdown.inputSchema = { "type": "object", "properties": { "form_id": { "type": "string", "title": "Form ID", "description": "The ID of the Google Form" }, "question_text": { "type": "string", "title": "Question Text", "description": "The text of the question" }, "choices": { "type": "array", "items": { "type": "string" }, "title": "Choices", "description": "List of choices for the dropdown" }, "required": { "type": "boolean", "title": "Required", "description": "Whether the question is required" }, "help_text": { "type": "string", "title": "Help Text", "description": "Help text for the question" } }, "required": ["form_id", "question_text", "choices"] } @mcp.tool() async def add_file_upload( form_id: str, question_text: str, required: bool = False, help_text: str = "", ctx: Context = None ) -> str: """ Add a file upload question to a Google Form Args: form_id (str): The ID of the form question_text (str): The text of the question required (bool, optional): Whether the question is required help_text (str, optional): Help text for the question Returns: str: JSON string containing the question details """ logger.info(f"add_file_upload triggered with form_id: {form_id}, question: {question_text}") try: services = ctx.lifespan_context form_service = services.form_service question_item = { "title": question_text, "required": required, "questionItem": { "question": { "fileUploadQuestion": { "folderId": None # This will use the default folder } } } } if help_text: question_item["description"] = help_text update_request = { "requests": [ { "createItem": { "item": question_item, "location": { "index": 0 } } } ] } result = form_service.forms().batchUpdate( formId=form_id, body=update_request).execute() return json.dumps({ "form_id": form_id, "question_text": question_text, "type": "file_upload", "required": required, "status": "success" }, indent=2) except Exception as e: return f"Error adding file upload question: {e}" add_file_upload.inputSchema = { "type": "object", "properties": { "form_id": { "type": "string", "title": "Form ID", "description": "The ID of the Google Form" }, "question_text": { "type": "string", "title": "Question Text", "description": "The text of the question" }, "required": { "type": "boolean", "title": "Required", "description": "Whether the question is required" }, "help_text": { "type": "string", "title": "Help Text", "description": "Help text for the question" } }, "required": ["form_id", "question_text"] } # Response Management Tools @mcp.tool() async def get_responses(form_id: str, ctx: Context = None) -> str: """ Get responses from a Google Form Args: form_id (str): The ID of the form Returns: str: JSON string containing the form responses """ logger.info(f"get_responses triggered with form_id: {form_id}") try: services = ctx.lifespan_context form_service = services.form_service # Get the form first to check if it has a linked response sheet form = form_service.forms().get(formId=form_id).execute() # Get responses responses = form_service.forms().responses().list(formId=form_id).execute() # Simplify the response data for readability simplified_responses = [] for response in responses.get("responses", []): answer_data = {} for key, value in response.get("answers", {}).items(): question_id = key # Try to get the question text from the form question_text = "Unknown Question" for item in form.get("items", []): if item.get("questionItem", {}).get("question", {}).get("questionId") == question_id: question_text = item.get("title", "Unknown Question") break # Extract the answer value based on type answer_value = None if "textAnswers" in value: answer_value = value["textAnswers"]["answers"][0]["value"] elif "fileUploadAnswers" in value: answer_value = [file.get("fileId") for file in value["fileUploadAnswers"]["answers"]] elif "choiceAnswers" in value: answer_value = [choice.get("value") for choice in value["choiceAnswers"]["answers"]] answer_data[question_text] = answer_value simplified_responses.append({ "response_id": response.get("responseId"), "timestamp": response.get("createTime"), "answers": answer_data }) return json.dumps({ "form_id": form_id, "title": form.get("info", {}).get("title", ""), "response_count": len(responses.get("responses", [])), "responses": simplified_responses }, indent=2) except Exception as e: return f"Error getting responses: {e}" get_responses.inputSchema = { "type": "object", "properties": { "form_id": { "type": "string", "title": "Form ID", "description": "The ID of the Google Form" } }, "required": ["form_id"] } @mcp.tool() async def export_responses(form_id: str, format: str = "csv", ctx: Context = None) -> str: """ Export responses from a Google Form to a spreadsheet or CSV Args: form_id (str): The ID of the form format (str, optional): The format to export (csv or sheets) Returns: str: JSON string containing the export details """ logger.info(f"export_responses triggered with form_id: {form_id}, format: {format}") try: services = ctx.lifespan_context form_service = services.form_service drive_service = services.drive_service sheets_service = services.sheets_service # Get the form form = form_service.forms().get(formId=form_id).execute() # Check if there's already a response spreadsheet response_sheet_id = None try: form_info = form_service.forms().get(formId=form_id).execute() if "responderUri" in form_info: # Extract the spreadsheet ID from the responder URI uri_parts = form_info["responderUri"].split("/") if len(uri_parts) > 5: response_sheet_id = uri_parts[5] except Exception as e: logger.error(f"Error getting form: {e}") if not response_sheet_id: # Create a new spreadsheet for responses spreadsheet_body = { 'properties': { 'title': f"Responses for {form.get('info', {}).get('title', 'Form')}" } } sheet = sheets_service.spreadsheets().create(body=spreadsheet_body).execute() response_sheet_id = sheet.get('spreadsheetId') # Link the form to the spreadsheet update_request = { "requests": [ { "updateSettings": { "settings": { "responseDestination": "SPREADSHEET", "spreadsheetId": response_sheet_id }, "updateMask": "responseDestination,spreadsheetId" } } ] } form_service.forms().batchUpdate(formId=form_id, body=update_request).execute() # For CSV format, create a link to download as CSV if format.lower() == "csv": csv_export_link = f"https://docs.google.com/spreadsheets/d/{response_sheet_id}/export?format=csv" return json.dumps({ "form_id": form_id, "export_format": "csv", "download_link": csv_export_link }, indent=2) else: # For sheets format, return the spreadsheet link sheets_link = f"https://docs.google.com/spreadsheets/d/{response_sheet_id}/edit" return json.dumps({ "form_id": form_id, "export_format": "sheets", "spreadsheet_id": response_sheet_id, "spreadsheet_link": sheets_link }, indent=2) except Exception as e: return f"Error exporting responses: {e}" export_responses.inputSchema = { "type": "object", "properties": { "form_id": { "type": "string", "title": "Form ID", "description": "The ID of the Google Form" }, "format": { "type": "string", "title": "Export Format", "description": "The format to export (csv or sheets)", "enum": ["csv", "sheets"] } }, "required": ["form_id"] } @mcp.tool() async def list_forms(ctx: Context = None) -> str: """ List all Google Forms created by the user Returns: str: JSON string containing the list of forms """ logger.info("list_forms triggered") try: services = ctx.lifespan_context drive_service = services.drive_service # Search for Google Forms files results = drive_service.files().list( q="mimeType='application/vnd.google-apps.form'", spaces='drive', fields='files(id, name, webViewLink, createdTime)' ).execute() forms = results.get('files', []) if not forms: return json.dumps({"forms": [], "message": "No forms found"}, indent=2) # Format the forms list forms_list = [] for form in forms: forms_list.append({ "id": form.get('id'), "name": form.get('name'), "url": form.get('webViewLink'), "created": form.get('createdTime') }) return json.dumps({"forms": forms_list}, indent=2) except Exception as e: return f"Error listing forms: {e}" list_forms.inputSchema = { "type": "object", "properties": {}, "required": [] } # Update the main function to handle different transports using mcp.run def main(transport: str = "stdio", host: str = "127.0.0.1", port: int = 8000): """Starts the MCP server using the specified transport via mcp.run().""" # Pass arguments like host and port if mcp.run supports them # Assuming mcp.run handles stdio and ws transports appropriately logger.info(f"Starting MCP server with {transport} transport...") try: # Check if mcp.run accepts host/port for relevant transports (like ws) # This structure assumes mcp.run handles the server lifecycles if transport == "ws": # We might need to pass host/port here if mcp.run accepts them # Trying without them first, assuming defaults or internal handling # If it fails, we may need to inspect mcp.run or FastMCP source # mcp.run(transport=transport) # Run with only transport for ws - Changed to sse # Assuming mcp.run handles sse transport correctly mcp.run(transport=transport) elif transport == "stdio": mcp.run(transport=transport) else: # logger.error(f"Unsupported transport type for mcp.run: {transport}") # Since we only support stdio and sse now, this path shouldn't be hit with arg choices # If it somehow is, mcp.run will raise its own error. # We'll rely on mcp.run to validate the transport. mcp.run(transport=transport) except AttributeError: logger.error(f"mcp.run does not support the arguments provided for transport '{transport}'. Trying without host/port.") try: mcp.run(transport=transport) except Exception as e: logger.error(f"Failed to start server with mcp.run(transport='{transport}'): {e}") except Exception as e: logger.error(f"Failed to start server with mcp.run: {e}") # Add argument parsing for command-line execution if __name__ == "__main__": parser = argparse.ArgumentParser(description="Google Forms MCP Server") parser.add_argument( "--transport", default="sse", # Default to Server-Sent Events choices=["stdio", "sse"], # Only allow stdio or sse help="Server transport method (default: sse)" ) parser.add_argument( "--host", default="127.0.0.1", help="Host address for WebSocket server (default: 127.0.0.1)" ) parser.add_argument( "--port", type=int, default=8000, help="Port for WebSocket server (default: 8000)" ) args = parser.parse_args() main(transport=args.transport, host=args.host, port=args.port) ```