# Directory Structure ``` ├── .env.example ├── .github │ └── workflows │ └── python-pytest.yml ├── .gitignore ├── Dockerfile ├── LICENSE ├── mcp_flowise │ ├── __init__.py │ ├── __main__.py │ ├── server_fastmcp.py │ ├── server_lowlevel.py │ └── utils.py ├── pyproject.toml ├── pytest.ini ├── README.md ├── smithery.yaml ├── test_mcp_call_tool_valid.py ├── test_mcp_handshake.py ├── tests │ ├── fixtures │ │ └── sample_chatflows.json │ ├── integration │ │ ├── test_flowise_integration.py │ │ ├── test_tool_prediction.py │ │ └── test_tool_registration_integration.py │ ├── README.md │ └── unit │ ├── test_chatflow_filters.py │ └── test_utils.py └── uv.lock ``` # Files -------------------------------------------------------------------------------- /.env.example: -------------------------------------------------------------------------------- ``` # Copy this file to .env and update the values as needed: # Flowise API key (Bearer token) FLOWISE_API_KEY=your_flowise_api_key_here # Flowise endpoint (default is http://localhost:3000) FLOWISE_API_ENDPOINT=http://localhost:3000 # FastMCP Mode: Optionally set ONE or BOTH of these to lock in specific Chatflow or Assistant: FLOWISE_CHATFLOW_ID= FLOWISE_ASSISTANT_ID= # LowLevel Mode: Dynamically expose tools for each chatflow/assistant # Comma-separated list of chatflow IDs and their descriptions, e.g.: # "chatflow_id:My \\"First\\" Chatflow,another_id:My Second Chatflow" FLOWISE_CHATFLOW_DESCRIPTIONS= # Optional filters for FastMCP Mode (ignored in LowLevel Mode): # Whitelist: Comma-separated list of chatflow IDs to allow FLOWISE_CHATFLOW_WHITELIST= # Blacklist: Comma-separated list of chatflow IDs to deny FLOWISE_CHATFLOW_BLACKLIST= # Notes: # - If neither FLOWISE_CHATFLOW_ID nor FLOWISE_ASSISTANT_ID is set: # - Exposes 'list_chatflows' and 'create_prediction(chatflow_id, question)'. # - If exactly one is set: # - Exposes 'create_prediction(question)'. # - If both are set: # - The server will refuse to start. 
# - FLOWISE_CHATFLOW_DESCRIPTIONS is required for LowLevel Mode to dynamically expose tools. ``` -------------------------------------------------------------------------------- /.gitignore: -------------------------------------------------------------------------------- ``` # Byte-compiled / optimized / DLL files __pycache__/ *.py[cod] *$py.class # C extensions *.so # Distribution / packaging .Python build/ develop-eggs/ dist/ downloads/ eggs/ .eggs/ lib/ lib64/ parts/ sdist/ var/ wheels/ share/python-wheels/ *.egg-info/ .installed.cfg *.egg MANIFEST # PyInstaller # Usually these files are written by a python script from a template # before PyInstaller builds the exe, so as to inject date/other infos into it. *.manifest *.spec # Installer logs pip-log.txt pip-delete-this-directory.txt # Unit test / coverage reports htmlcov/ .tox/ .nox/ .coverage .coverage.* .cache nosetests.xml coverage.xml *.cover *.py,cover .hypothesis/ .pytest_cache/ cover/ # Translations *.mo *.pot # Django stuff: *.log local_settings.py db.sqlite3 db.sqlite3-journal # Flask stuff: instance/ .webassets-cache # Scrapy stuff: .scrapy # Sphinx documentation docs/_build/ # PyBuilder .pybuilder/ target/ # Jupyter Notebook .ipynb_checkpoints # IPython profile_default/ ipython_config.py # pyenv # For a library or package, you might want to ignore these files since the code is # intended to run in multiple environments; otherwise, check them in: # .python-version # pipenv # According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. # However, in case of collaboration, if having platform-specific dependencies or dependencies # having no cross-platform support, pipenv may install dependencies that don't work, or not # install all needed dependencies. #Pipfile.lock # UV # Similar to Pipfile.lock, it is generally recommended to include uv.lock in version control. 
# This is especially recommended for binary packages to ensure reproducibility, and is more # commonly ignored for libraries. #uv.lock # poetry # Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control. # This is especially recommended for binary packages to ensure reproducibility, and is more # commonly ignored for libraries. # https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control #poetry.lock # pdm # Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control. #pdm.lock # pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it # in version control. # https://pdm.fming.dev/latest/usage/project/#working-with-version-control .pdm.toml .pdm-python .pdm-build/ # PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm __pypackages__/ # Celery stuff celerybeat-schedule celerybeat.pid # SageMath parsed files *.sage.py # Environments .env .venv env/ venv/ ENV/ env.bak/ venv.bak/ # Spyder project settings .spyderproject .spyproject # Rope project settings .ropeproject # mkdocs documentation /site # mypy .mypy_cache/ .dmypy.json dmypy.json # Pyre type checker .pyre/ # pytype static type analyzer .pytype/ # Cython debug symbols cython_debug/ # PyCharm # JetBrains specific template is maintained in a separate JetBrains.gitignore that can # be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore # and can be added to the global gitignore or merged into this file. For a more nuclear # option (not recommended) you can uncomment the following to ignore the entire idea folder. 
#.idea/ # PyPI configuration file .pypirc *.bak ``` -------------------------------------------------------------------------------- /tests/README.md: -------------------------------------------------------------------------------- ```markdown # Testing `mcp-flowise` ## Structure - `unit/`: Contains unit tests for individual modules. - `integration/`: Contains integration tests for end-to-end scenarios. - `fixtures/`: Contains static example data and environment files for mocking. ## Running Tests 1. Install test dependencies: ```bash pip install pytest ``` -------------------------------------------------------------------------------- /README.md: -------------------------------------------------------------------------------- ```markdown # mcp-flowise [](https://smithery.ai/server/@matthewhand/mcp-flowise) `mcp-flowise` is a Python package implementing a Model Context Protocol (MCP) server that integrates with the Flowise API. It provides a standardized and flexible way to list chatflows, create predictions, and dynamically register tools for Flowise chatflows or assistants. It supports two operation modes: - **LowLevel Mode (Default)**: Dynamically registers tools for all chatflows retrieved from the Flowise API. - **FastMCP Mode**: Provides static tools for listing chatflows and creating predictions, suitable for simpler configurations. <p align="center"> <img src="https://github.com/user-attachments/assets/d27afb05-c5d3-4cc9-9918-f7be8c715304" alt="Claude Desktop Screenshot"> </p> --- ## Features - **Dynamic Tool Exposure**: LowLevel mode dynamically creates tools for each chatflow or assistant. - **Simpler Configuration**: FastMCP mode exposes `list_chatflows` and `create_prediction` tools for minimal setup. - **Flexible Filtering**: Both modes support filtering chatflows via whitelists and blacklists by IDs or names (regex). - **MCP Integration**: Integrates seamlessly into MCP workflows. 
--- ## Installation ### Installing via Smithery To install mcp-flowise for Claude Desktop automatically via [Smithery](https://smithery.ai/server/@matthewhand/mcp-flowise): ```bash npx -y @smithery/cli install @matthewhand/mcp-flowise --client claude ``` ### Prerequisites - Python 3.12 or higher - `uvx` package manager ### Install and Run via `uvx` Confirm you can run the server directly from the GitHub repository using `uvx`: ```bash uvx --from git+https://github.com/matthewhand/mcp-flowise mcp-flowise ``` ### Adding to MCP Ecosystem (`mcpServers` Configuration) You can integrate `mcp-flowise` into your MCP ecosystem by adding it to the `mcpServers` configuration. Example: ```json { "mcpServers": { "mcp-flowise": { "command": "uvx", "args": [ "--from", "git+https://github.com/matthewhand/mcp-flowise", "mcp-flowise" ], "env": { "FLOWISE_API_KEY": "${FLOWISE_API_KEY}", "FLOWISE_API_ENDPOINT": "${FLOWISE_API_ENDPOINT}" } } } } ``` --- ## Modes of Operation ### 1. FastMCP Mode (Simple Mode) Enabled by setting `FLOWISE_SIMPLE_MODE=true`. This mode: - Exposes two tools: `list_chatflows` and `create_prediction`. - Allows static configuration using `FLOWISE_CHATFLOW_ID` or `FLOWISE_ASSISTANT_ID`. - Lists all available chatflows via `list_chatflows`. <p align="center"> <img src="https://github.com/user-attachments/assets/0901ef9c-5d56-4f1e-a799-1e5d8e8343bd" alt="FastMCP Mode"> </p> ### 2. LowLevel Mode (FLOWISE_SIMPLE_MODE=False) **Features**: - Dynamically registers all chatflows as separate tools. - Tools are named after chatflow names (normalized). - Uses descriptions from the `FLOWISE_CHATFLOW_DESCRIPTIONS` variable, falling back to chatflow names if no description is provided. **Example**: - `my_tool(question: str) -> str` dynamically created for a chatflow. 
--- ## Running on Windows with `uvx` If you're using `uvx` on Windows and encounter issues with `--from git+https`, the recommended solution is to clone the repository locally and configure the `mcpServers` with the full path to `uvx.exe` and the cloned repository. Additionally, include `APPDATA`, `LOGLEVEL`, and other environment variables as required. ### Example Configuration for MCP Ecosystem (`mcpServers` on Windows) ```json { "mcpServers": { "flowise": { "command": "C:\\Users\\matth\\.local\\bin\\uvx.exe", "args": [ "--from", "C:\\Users\\matth\\downloads\\mcp-flowise", "mcp-flowise" ], "env": { "LOGLEVEL": "ERROR", "APPDATA": "C:\\Users\\matth\\AppData\\Roaming", "FLOWISE_API_KEY": "your-api-key-goes-here", "FLOWISE_API_ENDPOINT": "http://localhost:3000/" } } } } ``` ### Notes - **Full Paths**: Use full paths for both `uvx.exe` and the cloned repository. - **Environment Variables**: Point `APPDATA` to your Windows user profile (e.g., `C:\\Users\\<username>\\AppData\\Roaming`) if needed. - **Log Level**: Adjust `LOGLEVEL` as needed (`ERROR`, `INFO`, `DEBUG`, etc.). ## Environment Variables ### General - `FLOWISE_API_KEY`: Your Flowise API Bearer token (**required**). - `FLOWISE_API_ENDPOINT`: Base URL for Flowise (default: `http://localhost:3000`). ### LowLevel Mode (`FLOWISE_SIMPLE_MODE=false`) - `FLOWISE_CHATFLOW_DESCRIPTIONS`: Comma-separated list of `chatflow_id:description` pairs. Example: ``` FLOWISE_CHATFLOW_DESCRIPTIONS="abc123:Chatflow One,xyz789:Chatflow Two" ``` ### FastMCP Mode (`FLOWISE_SIMPLE_MODE=true`) - `FLOWISE_CHATFLOW_ID`: Single Chatflow ID (optional). - `FLOWISE_ASSISTANT_ID`: Single Assistant ID (optional). - `FLOWISE_CHATFLOW_DESCRIPTION`: Optional description for the single tool exposed. 
--- ## Filtering Chatflows Filters can be applied in both modes using the following environment variables: - **Whitelist by ID**: `FLOWISE_WHITELIST_ID="id1,id2,id3"` - **Blacklist by ID**: `FLOWISE_BLACKLIST_ID="id4,id5"` - **Whitelist by Name (Regex)**: `FLOWISE_WHITELIST_NAME_REGEX=".*important.*"` - **Blacklist by Name (Regex)**: `FLOWISE_BLACKLIST_NAME_REGEX=".*deprecated.*"` > **Note**: Whitelists take precedence over blacklists. If both are set, the most restrictive rule is applied. --- ## Security - **Protect Your API Key**: Ensure the `FLOWISE_API_KEY` is kept secure and not exposed in logs or repositories. - **Environment Configuration**: Use `.env` files or environment variables for sensitive configurations. Add `.env` to your `.gitignore`: ```bash # .gitignore .env ``` --- ## Troubleshooting - **Missing API Key**: Ensure `FLOWISE_API_KEY` is set correctly. - **Invalid Configuration**: If both `FLOWISE_CHATFLOW_ID` and `FLOWISE_ASSISTANT_ID` are set, the server will refuse to start. - **Connection Errors**: Verify `FLOWISE_API_ENDPOINT` is reachable. --- ## License This project is licensed under the MIT License. See the [LICENSE](LICENSE) file for details. 
# Use a Python base image that satisfies the project requirements
FROM python:3.12-slim AS base

# Install the uv package manager. The PyPI distribution is named "uv";
# it provides both the `uv` and `uvx` executables.
RUN pip install uv

# Set the working directory in the container
WORKDIR /app

# Copy the current directory contents into the container at /app
COPY . .

# Install dependencies. `sync` is a subcommand of `uv`; `uvx` is the tool
# runner and would interpret "sync" as the name of a tool to execute.
RUN uv sync --frozen --no-dev --no-editable

# Expose the port the app runs on
EXPOSE 8000

# Set environment variables required for running the MCP server
# (placeholders; override them at `docker run` time)
ENV FLOWISE_API_KEY=your_api_key
ENV FLOWISE_API_ENDPOINT=http://localhost:3000

# Define the command to run the app
CMD ["uvx", "--from", "git+https://github.com/matthewhand/mcp-flowise", "mcp-flowise"]
""" # Check if FLOWISE_CHATFLOW_ID is set chatflow_id = os.getenv("FLOWISE_CHATFLOW_ID") if not chatflow_id: self.skipTest("FLOWISE_CHATFLOW_ID environment variable is not set.") question = "What is the weather like today?" print(f"Using chatflow_id: {chatflow_id}") # Make a prediction result = flowise_predict(chatflow_id, question) # Validate the response self.assertIsInstance(result, str) self.assertNotEqual(result.strip(), "", "Prediction result should not be empty.") print(f"Prediction result: {result}") if __name__ == "__main__": unittest.main() ``` -------------------------------------------------------------------------------- /smithery.yaml: -------------------------------------------------------------------------------- ```yaml # Smithery configuration file: https://smithery.ai/docs/deployments startCommand: type: stdio configSchema: # JSON Schema defining the configuration options for the MCP. type: object required: - flowiseApiKey properties: flowiseApiKey: type: string description: Your Flowise API Bearer token flowiseApiEndpoint: type: string default: http://localhost:3000 description: Base URL for Flowise flowiseSimpleMode: type: boolean default: false description: Enable FastMCP mode for simpler configuration flowiseChatflowId: type: string description: Single Chatflow ID for FastMCP mode (optional) flowiseAssistantId: type: string description: Single Assistant ID for FastMCP mode (optional) commandFunction: # A function that produces the CLI command to start the MCP on stdio. |- (config) => ({command: 'uvx', args: ['--from', 'git+https://github.com/matthewhand/mcp-flowise', 'mcp-flowise'], env: {FLOWISE_API_KEY: config.flowiseApiKey, FLOWISE_API_ENDPOINT: config.flowiseApiEndpoint || 'http://localhost:3000', FLOWISE_SIMPLE_MODE: config.flowiseSimpleMode ? 
"""Manual smoke script: run the MCP handshake over stdio and list the tools."""
import subprocess
import time
import json

# JSON-RPC messages, declared in the order they are sent
initialize_request = {
    "jsonrpc": "2.0",
    "id": 1,
    "method": "initialize",
    "params": {
        "protocolVersion": "1.0",
        "capabilities": {},
        "clientInfo": {"name": "manual-client", "version": "0.1"},
    },
}

initialized_notification = {
    "jsonrpc": "2.0",
    "method": "notifications/initialized",
}

list_tools_request = {
    "jsonrpc": "2.0",
    "id": 2,
    "method": "tools/list",
}

# Each entry: (message to send, seconds to wait before the next step)
handshake_steps = (
    (initialize_request, 2.0),
    (initialized_notification, 2.0),
    (list_tools_request, 4),
)

# Launch the MCP server from the current checkout, talking over pipes
process = subprocess.Popen(
    ["uvx", "--from", ".", "mcp-flowise"],
    stdin=subprocess.PIPE,
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
    text=True,
)

try:
    # One newline-terminated JSON document per message
    for message, pause in handshake_steps:
        process.stdin.write(json.dumps(message) + "\n")
        process.stdin.flush()
        time.sleep(pause)

    # Collect everything the server wrote
    stdout, stderr = process.communicate(timeout=5)

    # Show both streams
    print("STDOUT:")
    print(stdout)
    print("STDERR:")
    print(stderr)

except subprocess.TimeoutExpired:
    print("MCP server process timed out.")
    process.kill()
except Exception as e:
    print(f"An error occurred: {e}")
finally:
    process.terminate()
This script determines which server to run based on the presence of the FLOWISE_SIMPLE_MODE environment variable: - Low-Level Server: For dynamic tool creation based on chatflows. - FastMCP Server: For static tool configurations. """ import os import sys from dotenv import load_dotenv from mcp_flowise.utils import setup_logging from mcp_flowise.utils import fetch_chatflows # Load environment variables from .env if present load_dotenv() def main(): """ Main entry point for the mcp_flowise package. Depending on the FLOWISE_SIMPLE_MODE environment variable, this function launches either: - Low-Level Server (dynamic tools) - FastMCP Server (static tools) """ # Configure logging DEBUG = os.getenv("DEBUG", "").lower() in ("true", "1", "yes") logger = setup_logging(debug=DEBUG) logger.debug("Starting mcp_flowise package entry point.") chatflows = fetch_chatflows() logger.debug(f"Available chatflows: {chatflows}") # Default to Simple Mode unless explicitly disabled FLOWISE_SIMPLE_MODE = os.getenv("FLOWISE_SIMPLE_MODE", "true").lower() not in ("false", "0", "no") if FLOWISE_SIMPLE_MODE: logger.debug("FLOWISE_SIMPLE_MODE is enabled. Launching FastMCP Server.") from mcp_flowise.server_fastmcp import run_simple_server selected_server = run_simple_server else: logger.debug("FLOWISE_SIMPLE_MODE is disabled. 
"""Manual smoke script: start the MCP server over stdio and call one tool.

Requires the FLOWISE_CHATFLOW_ID environment variable; its value is used as
the name of the tool to call.
"""
import os
import subprocess
import sys
import time
import json

# Ensure the required environment variable is set
FLOWISE_CHATFLOW_ID = os.getenv("FLOWISE_CHATFLOW_ID")
if not FLOWISE_CHATFLOW_ID:
    print("Error: FLOWISE_CHATFLOW_ID environment variable is not set.")
    # sys.exit() instead of the site-injected exit(), which is intended for
    # interactive use and is absent when Python runs with -S.
    sys.exit(1)

# Define requests
initialize_request = {
    "jsonrpc": "2.0",
    "id": 1,
    "method": "initialize",
    "params": {
        "protocolVersion": "1.0",
        "capabilities": {},
        "clientInfo": {"name": "valid-client", "version": "0.1"}
    }
}

list_tools_request = {
    "jsonrpc": "2.0",
    "id": 2,
    "method": "tools/list"
}

call_tool_request = {
    "jsonrpc": "2.0",
    "id": 3,
    "method": "tools/call",
    "params": {
        "name": FLOWISE_CHATFLOW_ID,  # Use the valid chatflow ID
        "arguments": {"question": "What is AI?"}
    }
}

# Start MCP server
process = subprocess.Popen(
    ["uvx", "--from", ".", "mcp-flowise"],
    stdin=subprocess.PIPE,
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
    text=True
)

try:
    # Initialize the server
    print("Sending initialize request...")
    process.stdin.write(json.dumps(initialize_request) + "\n")
    process.stdin.flush()

    # Wait until the server sends a response to the initialize request
    time.sleep(0.5)
    stdout_line = process.stdout.readline()
    while "id" not in stdout_line:  # Look for a response containing "id"
        if not stdout_line:
            # readline() returns "" at EOF: the server closed stdout (most
            # likely it exited). Without this check the loop never ends.
            raise RuntimeError(
                "MCP server closed stdout before answering the initialize request."
            )
        print(f"Server Response: {stdout_line.strip()}")
        stdout_line = process.stdout.readline()

    print("Initialization complete.")

    # List tools
    print("Sending tools/list request...")
    process.stdin.write(json.dumps(list_tools_request) + "\n")
    process.stdin.flush()
    time.sleep(0.5)

    # Call the valid tool
    print(f"Sending tools/call request for chatflow '{FLOWISE_CHATFLOW_ID}'...")
    process.stdin.write(json.dumps(call_tool_request) + "\n")
    process.stdin.flush()
    time.sleep(1)

    # Capture output
    stdout, stderr = process.communicate(timeout=5)

    # Print responses
    print("STDOUT:")
    print(stdout)
    print("STDERR:")
    print(stderr)

except subprocess.TimeoutExpired:
    print("MCP server process timed out.")
    process.kill()
except Exception as e:
    print(f"An error occurred: {e}")
finally:
    process.terminate()
class TestToolRegistrationIntegration(unittest.TestCase):
    """
    Integration-style test for tool registration and listing.

    NOTE(review): the client side is answered by ``mock_client_request`` and
    the server task is only scheduled on ``cls.loop`` (the loop is never run
    while the test executes), so ``run_server`` is not exercised here.
    """

    # Environment variable this test case overrides for its duration.
    _ENV_KEY = "FLOWISE_CHATFLOW_DESCRIPTIONS"

    @classmethod
    def setUpClass(cls):
        """
        Set up the test environment and schedule the server task.
        """
        # Remember the caller's value so tearDownClass can restore it.
        cls._previous_descriptions = os.environ.get(cls._ENV_KEY)

        # Set the environment variable for chatflow descriptions
        os.environ[cls._ENV_KEY] = (
            "chatflow1:Test Chatflow 1,chatflow2:Test Chatflow 2"
        )

        # Schedule the server on a dedicated event loop
        cls.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(cls.loop)
        cls.server_task = cls.loop.create_task(cls.start_server())

    @classmethod
    async def start_server(cls):
        """
        Start the server as a coroutine.
        """
        await run_server()

    @classmethod
    def tearDownClass(cls):
        """
        Cancel the server task, close the event loop and restore the environment.
        """
        cls.server_task.cancel()
        try:
            # Drive the loop once so the cancellation is delivered and the
            # coroutine is finalized instead of being left un-awaited.
            cls.loop.run_until_complete(cls.server_task)
        except asyncio.CancelledError:
            pass
        finally:
            # The loop was created by this class, so it must be closed here.
            cls.loop.close()

        if cls._previous_descriptions is None:
            os.environ.pop(cls._ENV_KEY, None)
        else:
            os.environ[cls._ENV_KEY] = cls._previous_descriptions

    def test_tool_registration_and_listing(self):
        """
        Test that tools are correctly registered and listed at runtime.
        """
        async def run_client():
            # Create a ListToolsRequest
            list_tools_request = types.ListToolsRequest(method="tools/list")

            # Simulate the request and get the response
            response = await self.mock_client_request(list_tools_request)

            # Validate the response. unittest assertions are used because
            # bare `assert` statements are removed under `python -O`.
            tools = response.root.tools
            self.assertEqual(len(tools), 2, "Expected 2 tools to be registered")
            self.assertEqual(tools[0].name, "test_chatflow_1")
            self.assertEqual(tools[0].description, "Test Chatflow 1")
            self.assertEqual(tools[1].name, "test_chatflow_2")
            self.assertEqual(tools[1].description, "Test Chatflow 2")

        asyncio.run(run_client())

    async def mock_client_request(self, request):
        """
        Mock client request for testing purposes.
        Replace with actual client logic.
        """
        return types.ServerResult(
            root=types.ListToolsResult(
                tools=[
                    types.Tool(
                        name="test_chatflow_1",
                        description="Test Chatflow 1",
                        inputSchema={
                            "type": "object",
                            "properties": {
                                "question": {"type": "string"}
                            },
                            "required": ["question"]
                        }
                    ),
                    types.Tool(
                        name="test_chatflow_2",
                        description="Test Chatflow 2",
                        inputSchema={
                            "type": "object",
                            "properties": {
                                "question": {"type": "string"}
                            },
                            "required": ["question"]
                        }
                    ),
                ]
            )
        )
""" chatflows = [ {"id": "chatflow1", "name": "First Chatflow"}, {"id": "chatflow2", "name": "Second Chatflow"}, {"id": "chatflow3", "name": "Third Chatflow"}, ] filtered = filter_chatflows(chatflows) self.assertEqual(len(filtered), 2) self.assertTrue(all(cf["id"] in {"chatflow1", "chatflow3"} for cf in filtered)) @patch.dict(os.environ, {"FLOWISE_BLACKLIST_ID": "chatflow2"}) def test_blacklist_id_filter(self): """ Test that blacklisted chatflows by ID are excluded. """ chatflows = [ {"id": "chatflow1", "name": "First Chatflow"}, {"id": "chatflow2", "name": "Second Chatflow"}, ] filtered = filter_chatflows(chatflows) self.assertEqual(len(filtered), 1) self.assertEqual(filtered[0]["id"], "chatflow1") @patch.dict(os.environ, {"FLOWISE_WHITELIST_NAME_REGEX": ".*First.*"}) def test_whitelist_name_regex_filter(self): """ Test that only chatflows matching the whitelist name regex are returned. """ chatflows = [ {"id": "chatflow1", "name": "First Chatflow"}, {"id": "chatflow2", "name": "Second Chatflow"}, ] filtered = filter_chatflows(chatflows) self.assertEqual(len(filtered), 1) self.assertEqual(filtered[0]["name"], "First Chatflow") @patch.dict(os.environ, {"FLOWISE_BLACKLIST_NAME_REGEX": ".*Second.*"}) def test_blacklist_name_regex_filter(self): """ Test that chatflows matching the blacklist name regex are excluded. """ chatflows = [ {"id": "chatflow1", "name": "First Chatflow"}, {"id": "chatflow2", "name": "Second Chatflow"}, ] filtered = filter_chatflows(chatflows) self.assertEqual(len(filtered), 1) self.assertEqual(filtered[0]["name"], "First Chatflow") @patch.dict( os.environ, { "FLOWISE_WHITELIST_ID": "chatflow1", "FLOWISE_BLACKLIST_NAME_REGEX": ".*Second.*", }, ) def test_whitelist_and_blacklist_combined(self): """ Test that whitelist takes precedence over blacklist. 
@mcp.tool()
def list_chatflows() -> str:
    """
    List all available chatflows from the Flowise API.

    The list returned by ``fetch_chatflows()`` is narrowed further by the
    optional FLOWISE_CHATFLOW_WHITELIST / FLOWISE_CHATFLOW_BLACKLIST settings.
    Both are comma-separated lists of chatflow IDs. Whitespace around each ID
    is ignored and empty entries are dropped, so ``"a, b,"`` is treated as the
    IDs ``a`` and ``b``. A setting that contains no usable ID is treated as
    unset.

    Returns:
        str: A JSON-encoded string of filtered chatflows.
    """
    logger.debug("Handling list_chatflows tool.")
    chatflows = fetch_chatflows()

    def _parse_ids(raw):
        # Tolerate "a, b" style values and stray commas in the env var.
        return {item.strip() for item in (raw or "").split(",") if item.strip()}

    # Apply whitelisting: keep only the explicitly allowed IDs.
    whitelist = _parse_ids(FLOWISE_CHATFLOW_WHITELIST)
    if whitelist:
        chatflows = [cf for cf in chatflows if cf["id"] in whitelist]
        logger.debug(f"Applied whitelist filter: {whitelist}")

    # Apply blacklisting: drop the explicitly denied IDs.
    blacklist = _parse_ids(FLOWISE_CHATFLOW_BLACKLIST)
    if blacklist:
        chatflows = [cf for cf in chatflows if cf["id"] not in blacklist]
        logger.debug(f"Applied blacklist filter: {blacklist}")

    logger.debug(f"Filtered chatflows: {chatflows}")
    return json.dumps(chatflows)
class TestUtils(unittest.TestCase):
    """Unit tests for the Flowise helpers exposed by ``mcp_flowise.utils``."""

    @patch("requests.post")
    def test_flowise_predict_success(self, mock_post: Mock) -> None:
        """
        A successful POST is passed through as the raw response text.
        """
        expected_body = '{"text": "Mock Prediction"}'
        mock_post.return_value = Mock(status_code=200, text=expected_body)

        result = flowise_predict("valid_chatflow_id", "What's AI?")

        self.assertEqual(result, expected_body)
        mock_post.assert_called_once()

    @patch("requests.post", side_effect=requests.Timeout)
    def test_flowise_predict_timeout(self, mock_post: Mock) -> None:
        """
        A timeout while posting is reported through an error payload.
        """
        result = flowise_predict("valid_chatflow_id", "What's AI?")

        # Only the presence of the error key is checked; the exact timeout
        # wording is deliberately not asserted.
        self.assertIn("error", result)

    @patch("requests.post")
    def test_flowise_predict_http_error(self, mock_post: Mock) -> None:
        """
        An HTTP 500 reply surfaces the error text to the caller.
        """
        failing_reply = Mock(status_code=500, text='{"error": "500 Error"}')
        failing_reply.raise_for_status = Mock(side_effect=requests.HTTPError("500 Error"))
        mock_post.return_value = failing_reply

        result = flowise_predict("valid_chatflow_id", "What's AI?")

        for fragment in ("error", "500 Error"):
            self.assertIn(fragment, result)

    @patch("requests.get")
    def test_fetch_chatflows_success(self, mock_get: Mock) -> None:
        """
        A successful GET yields the id/name pairs of every chatflow.
        """
        payload = [{"id": "1", "name": "Chatflow 1"}, {"id": "2", "name": "Chatflow 2"}]
        reply = Mock(status_code=200)
        reply.json = Mock(return_value=payload)
        mock_get.return_value = reply

        fetched = fetch_chatflows()

        self.assertEqual(len(fetched), 2)
        first = fetched[0]
        self.assertEqual(first["id"], "1")
        self.assertEqual(first["name"], "Chatflow 1")
        mock_get.assert_called_once()

    @patch("requests.get", side_effect=requests.Timeout)
    def test_fetch_chatflows_timeout(self, mock_get: Mock) -> None:
        """
        A timeout while fetching results in an empty list.
        """
        self.assertEqual(fetch_chatflows(), [])

    @patch("requests.get")
    def test_fetch_chatflows_http_error(self, mock_get: Mock) -> None:
        """
        An HTTP error while fetching results in an empty list.
        """
        failing_reply = Mock(status_code=500)
        failing_reply.raise_for_status = Mock(side_effect=requests.HTTPError("500 Error"))
        mock_get.return_value = failing_reply

        self.assertEqual(fetch_chatflows(), [])

    def test_filter_chatflows(self) -> None:
        """
        Exercise ID-based whitelist and blacklist filtering.
        """
        chatflows = [{"id": str(n), "name": f"Chatflow {n}"} for n in (1, 2, 3)]
        # Name-regex filters stay disabled in both scenarios.
        regex_disabled = {
            "FLOWISE_WHITELIST_NAME_REGEX": "",
            "FLOWISE_BLACKLIST_NAME_REGEX": "",
        }

        # Scenario 1: whitelist and blacklist both configured.
        both_lists = {"FLOWISE_WHITELIST_ID": "1,2", "FLOWISE_BLACKLIST_ID": "3", **regex_disabled}
        with patch.dict("os.environ", both_lists):
            kept = filter_chatflows(chatflows)
        self.assertEqual(len(kept), 2)
        self.assertEqual(kept[0]["id"], "1")
        self.assertEqual(kept[1]["id"], "2")

        # Scenario 2: blacklist only.
        blacklist_only = {"FLOWISE_WHITELIST_ID": "", "FLOWISE_BLACKLIST_ID": "2", **regex_disabled}
        with patch.dict("os.environ", blacklist_only):
            kept = filter_chatflows(chatflows)
        self.assertEqual(len(kept), 2)
        self.assertEqual(kept[0]["id"], "1")
        self.assertEqual(kept[1]["id"], "3")

    def test_normalize_tool_name(self) -> None:
        """
        Check tool-name normalization, including the invalid-input fallback.
        """
        expectations = [
            ("Tool Name", "tool_name"),
            ("Tool-Name", "tool_name"),
            ("Tool_Name", "tool_name"),
            ("ToolName", "toolname"),
            ("", "unknown_tool"),
            (None, "unknown_tool"),
        ]
        for raw_name, expected in expectations:
            self.assertEqual(normalize_tool_name(raw_name), expected)
def get_chatflow_descriptions() -> Dict[str, str]:
    """
    Build the chatflow-ID -> description table from FLOWISE_CHATFLOW_DESCRIPTIONS.

    The variable is read as comma-separated ``id:description`` entries. Only
    the first colon of an entry separates the ID from the description. Entries
    without a colon are logged and skipped; entries whose ID or description is
    blank after trimming are silently ignored.

    Returns:
        dict: A dictionary mapping chatflow IDs to descriptions (empty when the
        variable is unset or empty).
    """
    raw_value = os.getenv("FLOWISE_CHATFLOW_DESCRIPTIONS", "")
    if not raw_value:
        logger.debug("No FLOWISE_CHATFLOW_DESCRIPTIONS provided.")
        return {}

    logger.debug("Retrieved FLOWISE_CHATFLOW_DESCRIPTIONS: %s", raw_value)

    parsed = {}
    for entry in raw_value.split(","):
        raw_id, separator, raw_text = entry.partition(":")
        if not separator:
            logger.warning("Invalid format in FLOWISE_CHATFLOW_DESCRIPTIONS: %s", entry)
            continue

        chatflow_id = raw_id.strip()
        description = raw_text.strip()
        if chatflow_id and description:
            parsed[chatflow_id] = description

    logger.debug("Parsed FLOWISE_CHATFLOW_DESCRIPTIONS: %s", parsed)
    return parsed
def register_tools(chatflows: List[Dict[str, Any]], chatflow_descriptions: Dict[str, str]) -> List[types.Tool]:
    """
    Register tools dynamically based on the provided chatflows.

    Every call starts from a clean slate: both the module-level ``tools`` list
    and ``NAME_TO_ID_MAPPING`` are reset, so re-registering the same chatflows
    does not report each of them as a name conflict. Chatflows whose normalized
    name collides with one registered earlier in the same call are skipped
    with a warning. A chatflow that cannot be registered (for example because
    a key is missing) is logged and skipped without aborting the others.

    Args:
        chatflows (List[Dict[str, Any]]): List of chatflows retrieved from the Flowise API.
        chatflow_descriptions (Dict[str, str]): Dictionary mapping chatflow IDs to descriptions.

    Returns:
        List[types.Tool]: List of registered tools.
    """
    global tools
    tools = []  # Clear existing tools before re-registration
    # Keep the dispatcher's lookup table in sync with the tool list.
    NAME_TO_ID_MAPPING.clear()

    for chatflow in chatflows:
        # Captured up front so the error log below never has to re-index the
        # (possibly malformed) chatflow entry.
        chatflow_id = None
        chatflow_name = None
        try:
            chatflow_name = chatflow["name"]
            chatflow_id = chatflow["id"]
            normalized_name = normalize_tool_name(chatflow_name)

            if normalized_name in NAME_TO_ID_MAPPING:
                logger.warning(
                    "Tool name conflict: '%s' already exists. Skipping chatflow '%s' (ID: '%s').",
                    normalized_name,
                    chatflow_name,
                    chatflow_id,
                )
                continue

            description = chatflow_descriptions.get(chatflow_id, chatflow_name)
            tool = types.Tool(
                name=normalized_name,
                description=description,
                inputSchema={
                    "type": "object",
                    "required": ["question"],
                    "properties": {"question": {"type": "string"}},
                },
            )

            # Publish the name only after the Tool was built successfully, so
            # the mapping never points at a tool that is not listed.
            NAME_TO_ID_MAPPING[normalized_name] = chatflow_id
            tools.append(tool)
            logger.debug("Registered tool: %s (ID: %s)", tool.name, chatflow_id)

        except Exception as e:
            logger.error("Error registering chatflow '%s' (ID: '%s'): %s", chatflow_name, chatflow_id, e)

    return tools
def setup_logging(debug: bool = False, log_dir: str = None, log_file: str = "debug-mcp-flowise.log") -> logging.Logger:
    """
    Sets up logging for the application, including outputting CRITICAL and ERROR logs to stdout.

    The function may be called repeatedly: handlers installed by an earlier
    call are removed and closed before new ones are attached, so log file
    descriptors do not accumulate.

    Args:
        debug (bool): If True, set log level to DEBUG; otherwise, INFO.
        log_dir (str): Directory where log files will be stored. Defaults to
            ``~/mcp_logs``. Ignored if `FLOWISE_LOGFILE_PATH` is set.
        log_file (str): Name of the log file. Ignored if `FLOWISE_LOGFILE_PATH` is set.

    Returns:
        logging.Logger: Configured logger instance.
    """
    log_path = os.getenv("FLOWISE_LOGFILE_PATH")
    if not log_path:
        if log_dir is None:
            log_dir = os.path.join(os.path.expanduser("~"), "mcp_logs")
        try:
            os.makedirs(log_dir, exist_ok=True)
            log_path = os.path.join(log_dir, log_file)
        except OSError as e:
            # Any filesystem failure (permissions, read-only file system, a
            # path component that is a regular file, ...) falls back to
            # stream-only logging instead of aborting start-up.
            log_path = None
            print(f"[ERROR] Failed to create log directory: {e}", file=sys.stderr)

    level = logging.DEBUG if debug else logging.INFO
    logger = logging.getLogger(__name__)
    logger.setLevel(level)
    logger.propagate = False  # Prevent log messages from propagating to the root logger

    # Remove all existing handlers to prevent accumulation, and close them so
    # previously opened log files are released.
    for handler in logger.handlers[:]:
        logger.removeHandler(handler)
        try:
            handler.close()
        except Exception as e:
            print(f"[ERROR] Failed to close log handler: {e}", file=sys.stderr)

    handlers = []

    if log_path:
        try:
            file_handler = logging.FileHandler(log_path, mode="a")
            file_handler.setLevel(level)
            file_handler.setFormatter(logging.Formatter("[%(levelname)s] %(asctime)s - %(message)s"))
            handlers.append(file_handler)
        except Exception as e:
            # Without a file handler nothing is written to log_path; forget it
            # so the final status message stays truthful.
            log_path = None
            print(f"[ERROR] Failed to create log file handler: {e}", file=sys.stderr)

    # Attempt to create StreamHandler for ERROR level logs
    # NOTE(review): the servers in this project appear to use the stdio
    # transport; confirm that ERROR records on stdout cannot interleave with
    # protocol messages (stderr may be the safer stream).
    try:
        stdout_handler = logging.StreamHandler(sys.stdout)
        stdout_handler.setLevel(logging.ERROR)
        stdout_handler.setFormatter(logging.Formatter("[%(levelname)s] %(message)s"))
        handlers.append(stdout_handler)
    except Exception as e:
        print(f"[ERROR] Failed to create stdout log handler: {e}", file=sys.stderr)

    # Add all handlers to the logger
    for handler in handlers:
        logger.addHandler(handler)

    if log_path:
        logger.debug(f"Logging initialized. Writing logs to {log_path}")
    else:
        logger.debug("Logging initialized. Logs will only appear in stdout.")

    return logger
def filter_chatflows(chatflows: list[dict]) -> list[dict]:
    """
    Filters chatflows based on whitelist and blacklist criteria.
    Whitelist takes precedence over blacklist.

    The criteria are read from the environment on every call:

    - FLOWISE_WHITELIST_ID / FLOWISE_BLACKLIST_ID: comma-separated chatflow IDs.
      Whitespace around each ID is ignored and empty entries are dropped, so
      ``"1, 2"`` matches the IDs ``1`` and ``2``.
    - FLOWISE_WHITELIST_NAME_REGEX / FLOWISE_BLACKLIST_NAME_REGEX: regular
      expressions searched (``re.search``) in the chatflow name.

    When any whitelist criterion is configured, only chatflows matching the
    whitelist (by ID or by name) are returned and the blacklist is not
    consulted. Otherwise every chatflow is returned except those matching the
    blacklist by ID or by name.

    Args:
        chatflows (list[dict]): A list of chatflow dictionaries.

    Returns:
        list[dict]: Filtered list of chatflows.
    """
    logger = logging.getLogger(__name__)

    def _id_set(var_name: str) -> set:
        # Tolerate "a, b" style values and stray commas in the env var.
        return {item.strip() for item in os.getenv(var_name, "").split(",") if item.strip()}

    # Dynamically fetch filtering criteria
    whitelist_ids = _id_set("FLOWISE_WHITELIST_ID")
    blacklist_ids = _id_set("FLOWISE_BLACKLIST_ID")
    whitelist_name_regex = os.getenv("FLOWISE_WHITELIST_NAME_REGEX", "")
    blacklist_name_regex = os.getenv("FLOWISE_BLACKLIST_NAME_REGEX", "")

    whitelist_active = bool(whitelist_ids or whitelist_name_regex)
    filtered_chatflows = []

    for chatflow in chatflows:
        chatflow_id = chatflow.get("id", "")
        chatflow_name = chatflow.get("name", "")

        if whitelist_active:
            # Whitelist mode: membership alone decides, the blacklist is ignored.
            is_whitelisted = chatflow_id in whitelist_ids or bool(
                whitelist_name_regex and re.search(whitelist_name_regex, chatflow_name)
            )
            if is_whitelisted:
                logger.debug("Including whitelisted chatflow '%s' (ID: '%s').", chatflow_name, chatflow_id)
                filtered_chatflows.append(chatflow)
            else:
                logger.debug("Excluding non-whitelisted chatflow '%s' (ID: '%s').", chatflow_name, chatflow_id)
            continue

        # No whitelist configured: apply the blacklist directly.
        if chatflow_id in blacklist_ids:
            logger.debug("Skipping chatflow '%s' (ID: '%s') - In blacklist.", chatflow_name, chatflow_id)
            continue
        if blacklist_name_regex and re.search(blacklist_name_regex, chatflow_name):
            logger.debug("Skipping chatflow '%s' (ID: '%s') - Name matches blacklist regex.", chatflow_name, chatflow_id)
            continue

        # Include the chatflow if it passes all filters
        logger.debug("Including chatflow '%s' (ID: '%s').", chatflow_name, chatflow_id)
        filtered_chatflows.append(chatflow)

    logger.debug("Filtered chatflows: %d out of %d", len(filtered_chatflows), len(chatflows))
    return filtered_chatflows
def fetch_chatflows() -> list[dict]:
    """
    Retrieve the chatflows known to the Flowise API, reduced to id and name.

    The request goes to ``<FLOWISE_API_ENDPOINT>/api/v1/chatflows`` with a
    bearer token when FLOWISE_API_KEY is set. The simplified entries are passed
    through ``filter_chatflows`` before being returned.

    Returns:
        list of dict: Each dict contains the 'id' and 'name' of a chatflow.
        Returns an empty list if there's an error.
    """
    logger = logging.getLogger(__name__)

    # Flowise endpoint that lists every chatflow.
    url = f"{FLOWISE_API_ENDPOINT.rstrip('/')}/api/v1/chatflows"

    # The Authorization header is only sent when an API key is configured.
    headers = {"Authorization": f"Bearer {FLOWISE_API_KEY}"} if FLOWISE_API_KEY else {}

    logger.debug(f"Fetching chatflows from {url}")

    try:
        response = requests.get(url, headers=headers, timeout=30)
        response.raise_for_status()

        # Keep only the two fields the rest of the package relies on.
        simplified_chatflows = []
        for entry in response.json():
            simplified_chatflows.append({"id": entry["id"], "name": entry["name"]})

        logger.debug(f"Fetched chatflows: {simplified_chatflows}")
        return filter_chatflows(simplified_chatflows)
    except Exception as e:
        # Any failure (network, HTTP status, malformed payload, filtering) is
        # logged and reported to the caller as "no chatflows".
        logger.error(f"Error fetching chatflows: {e}")
        return []