# Directory Structure

```
├── .env.example
├── .github
│   └── workflows
│       └── python-pytest.yml
├── .gitignore
├── Dockerfile
├── LICENSE
├── mcp_flowise
│   ├── __init__.py
│   ├── __main__.py
│   ├── server_fastmcp.py
│   ├── server_lowlevel.py
│   └── utils.py
├── pyproject.toml
├── pytest.ini
├── README.md
├── smithery.yaml
├── test_mcp_call_tool_valid.py
├── test_mcp_handshake.py
├── tests
│   ├── fixtures
│   │   └── sample_chatflows.json
│   ├── integration
│   │   ├── test_flowise_integration.py
│   │   ├── test_tool_prediction.py
│   │   └── test_tool_registration_integration.py
│   ├── README.md
│   └── unit
│       ├── test_chatflow_filters.py
│       └── test_utils.py
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/.env.example:
--------------------------------------------------------------------------------

```
# Copy this file to .env and update the values as needed:

# Flowise API key (Bearer token)
FLOWISE_API_KEY=your_flowise_api_key_here

# Flowise endpoint (default is http://localhost:3000)
FLOWISE_API_ENDPOINT=http://localhost:3000

# FastMCP Mode: Optionally set ONE of these to lock in a specific Chatflow or Assistant (do not set both):
FLOWISE_CHATFLOW_ID=
FLOWISE_ASSISTANT_ID=

# LowLevel Mode: Dynamically expose tools for each chatflow/assistant
# Comma-separated list of chatflow IDs and their descriptions, e.g.:
# "chatflow_id:My \\"First\\" Chatflow,another_id:My Second Chatflow"
FLOWISE_CHATFLOW_DESCRIPTIONS=

# Optional filters for FastMCP Mode (ignored in LowLevel Mode):
# Whitelist: Comma-separated list of chatflow IDs to allow
FLOWISE_CHATFLOW_WHITELIST=
# Blacklist: Comma-separated list of chatflow IDs to deny
FLOWISE_CHATFLOW_BLACKLIST=

# Notes:
# - If neither FLOWISE_CHATFLOW_ID nor FLOWISE_ASSISTANT_ID is set:
#     - Exposes 'list_chatflows' and 'create_prediction(chatflow_id, question)'.
# - If exactly one is set:
#     - Exposes 'create_prediction(question)'.
# - If both are set:
#     - The server will refuse to start.
# - FLOWISE_CHATFLOW_DESCRIPTIONS is required for LowLevel Mode to dynamically expose tools.

```

--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------

```
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class

# C extensions
*.so

# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST

# PyInstaller
#  Usually these files are written by a python script from a template
#  before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec

# Installer logs
pip-log.txt
pip-delete-this-directory.txt

# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/

# Translations
*.mo
*.pot

# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal

# Flask stuff:
instance/
.webassets-cache

# Scrapy stuff:
.scrapy

# Sphinx documentation
docs/_build/

# PyBuilder
.pybuilder/
target/

# Jupyter Notebook
.ipynb_checkpoints

# IPython
profile_default/
ipython_config.py

# pyenv
#   For a library or package, you might want to ignore these files since the code is
#   intended to run in multiple environments; otherwise, check them in:
# .python-version

# pipenv
#   According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
#   However, in case of collaboration, if having platform-specific dependencies or dependencies
#   having no cross-platform support, pipenv may install dependencies that don't work, or not
#   install all needed dependencies.
#Pipfile.lock

# UV
#   Similar to Pipfile.lock, it is generally recommended to include uv.lock in version control.
#   This is especially recommended for binary packages to ensure reproducibility, and is more
#   commonly ignored for libraries.
#uv.lock

# poetry
#   Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
#   This is especially recommended for binary packages to ensure reproducibility, and is more
#   commonly ignored for libraries.
#   https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock

# pdm
#   Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
#   pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
#   in version control.
#   https://pdm.fming.dev/latest/usage/project/#working-with-version-control
.pdm.toml
.pdm-python
.pdm-build/

# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/

# Celery stuff
celerybeat-schedule
celerybeat.pid

# SageMath parsed files
*.sage.py

# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/

# Spyder project settings
.spyderproject
.spyproject

# Rope project settings
.ropeproject

# mkdocs documentation
/site

# mypy
.mypy_cache/
.dmypy.json
dmypy.json

# Pyre type checker
.pyre/

# pytype static type analyzer
.pytype/

# Cython debug symbols
cython_debug/

# PyCharm
#  JetBrains specific template is maintained in a separate JetBrains.gitignore that can
#  be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
#  and can be added to the global gitignore or merged into this file.  For a more nuclear
#  option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/

# PyPI configuration file
.pypirc
*.bak
```

--------------------------------------------------------------------------------
/tests/README.md:
--------------------------------------------------------------------------------

```markdown
# Testing `mcp-flowise`

## Structure
- `unit/`: Contains unit tests for individual modules.
- `integration/`: Contains integration tests for end-to-end scenarios.
- `fixtures/`: Contains static example data and environment files for mocking.

## Running Tests
1. Install test dependencies:

       pip install pytest

2. Run the unit tests from the repository root:

       pytest tests/unit

   The integration tests additionally require `FLOWISE_API_KEY` and `FLOWISE_API_ENDPOINT` to be set.

```

--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------

```markdown
# mcp-flowise

[![smithery badge](https://smithery.ai/badge/@matthewhand/mcp-flowise)](https://smithery.ai/server/@matthewhand/mcp-flowise)

`mcp-flowise` is a Python package implementing a Model Context Protocol (MCP) server that integrates with the Flowise API. It provides a standardized and flexible way to list chatflows, create predictions, and dynamically register tools for Flowise chatflows or assistants.

It supports two operation modes:

- **LowLevel Mode (Default)**: Dynamically registers tools for all chatflows retrieved from the Flowise API.
- **FastMCP Mode**: Provides static tools for listing chatflows and creating predictions, suitable for simpler configurations.

<p align="center">
<img src="https://github.com/user-attachments/assets/d27afb05-c5d3-4cc9-9918-f7be8c715304" alt="Claude Desktop Screenshot">
</p>

---

## Features

- **Dynamic Tool Exposure**: LowLevel mode dynamically creates tools for each chatflow or assistant.
- **Simpler Configuration**: FastMCP mode exposes `list_chatflows` and `create_prediction` tools for minimal setup.
- **Flexible Filtering**: Both modes support filtering chatflows via whitelists and blacklists by IDs or names (regex).
- **MCP Integration**: Integrates seamlessly into MCP workflows.

---

## Installation

### Installing via Smithery

To install mcp-flowise for Claude Desktop automatically via [Smithery](https://smithery.ai/server/@matthewhand/mcp-flowise):

```bash
npx -y @smithery/cli install @matthewhand/mcp-flowise --client claude
```

### Prerequisites

- Python 3.12 or higher
- `uv` (provides the `uvx` command)

### Install and Run via `uvx`

Confirm you can run the server directly from the GitHub repository using `uvx`:

```bash
uvx --from git+https://github.com/matthewhand/mcp-flowise mcp-flowise
```

### Adding to MCP Ecosystem (`mcpServers` Configuration)

You can integrate `mcp-flowise` into your MCP ecosystem by adding it to the `mcpServers` configuration. Example:

```json
{
    "mcpServers": {
        "mcp-flowise": {
            "command": "uvx",
            "args": [
                "--from",
                "git+https://github.com/matthewhand/mcp-flowise",
                "mcp-flowise"
            ],
            "env": {
                "FLOWISE_API_KEY": "${FLOWISE_API_KEY}",
                "FLOWISE_API_ENDPOINT": "${FLOWISE_API_ENDPOINT}"
            }
        }
    }
}
```

---

## Modes of Operation

### 1. FastMCP Mode (Simple Mode)

Enabled by setting `FLOWISE_SIMPLE_MODE=true`. This mode:
- Exposes two tools: `list_chatflows` and `create_prediction`.
- Allows static configuration using `FLOWISE_CHATFLOW_ID` or `FLOWISE_ASSISTANT_ID`.
- Lists all available chatflows via `list_chatflows`.
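
For illustration, a minimal sketch of the JSON-RPC `tools/call` payloads a stdio client could send to these two tools, modelled on the manual scripts `test_mcp_handshake.py` and `test_mcp_call_tool_valid.py` in this repository (the `"abc123"` chatflow ID is purely illustrative):

```python
import json

# Illustrative payloads only; sent after the usual initialize/initialized handshake.
list_chatflows_call = {
    "jsonrpc": "2.0",
    "id": 10,
    "method": "tools/call",
    "params": {"name": "list_chatflows", "arguments": {}},
}

create_prediction_call = {
    "jsonrpc": "2.0",
    "id": 11,
    "method": "tools/call",
    "params": {
        "name": "create_prediction",
        # chatflow_id can be omitted when FLOWISE_CHATFLOW_ID is pre-configured.
        "arguments": {"chatflow_id": "abc123", "question": "What is AI?"},
    },
}

print(json.dumps(list_chatflows_call))
print(json.dumps(create_prediction_call))
```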

<p align="center">
<img src="https://github.com/user-attachments/assets/0901ef9c-5d56-4f1e-a799-1e5d8e8343bd" alt="FastMCP Mode">
</p>

### 2. LowLevel Mode (`FLOWISE_SIMPLE_MODE=false`)

**Features**:
- Dynamically registers all chatflows as separate tools.
- Tools are named after chatflow names (normalized).
- Uses descriptions from the `FLOWISE_CHATFLOW_DESCRIPTIONS` variable, falling back to chatflow names if no description is provided.

**Example**:
- `my_tool(question: str) -> str` dynamically created for a chatflow.
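
As a rough sketch of how a chatflow name becomes a tool name (mirroring `normalize_tool_name` in `mcp_flowise/utils.py`: non-alphanumeric characters are replaced with underscores and the result is lowercased):

```python
import re

def normalize_tool_name(name: str) -> str:
    # Mirrors mcp_flowise.utils.normalize_tool_name: anything that is not a
    # letter or digit becomes "_", and the whole name is lowercased.
    if not name or not isinstance(name, str):
        return "unknown_tool"
    return re.sub(r"[^a-zA-Z0-9]", "_", name).lower() or "unknown_tool"

print(normalize_tool_name("My 'First' Chatflow"))  # -> my__first__chatflow
```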

---
## Running on Windows with `uvx`

If `uvx` on Windows has trouble with `--from git+https`, clone the repository locally and point the `mcpServers` entry at the full paths to `uvx.exe` and the cloned repository. Include `APPDATA`, `LOGLEVEL`, and other environment variables as required.

### Example Configuration for MCP Ecosystem (`mcpServers` on Windows)

```json
{
  "mcpServers": {
    "flowise": {
      "command": "C:\\Users\\matth\\.local\\bin\\uvx.exe",
      "args": [
        "--from",
        "C:\\Users\\matth\\downloads\\mcp-flowise",
        "mcp-flowise"
      ],
      "env": {
        "LOGLEVEL": "ERROR",
        "APPDATA": "C:\\Users\\matth\\AppData\\Roaming",
        "FLOWISE_API_KEY": "your-api-key-goes-here",
        "FLOWISE_API_ENDPOINT": "http://localhost:3000/"
      }
    }
  }
}
```

### Notes

- **Full Paths**: Use full paths for both `uvx.exe` and the cloned repository.
- **Environment Variables**: Point `APPDATA` to your Windows user profile (e.g., `C:\\Users\\<username>\\AppData\\Roaming`) if needed.
- **Log Level**: Adjust `LOGLEVEL` as needed (`ERROR`, `INFO`, `DEBUG`, etc.).

## Environment Variables

### General

- `FLOWISE_API_KEY`: Your Flowise API Bearer token (**required**).
- `FLOWISE_API_ENDPOINT`: Base URL for Flowise (default: `http://localhost:3000`).

### LowLevel Mode (Default)

- `FLOWISE_CHATFLOW_DESCRIPTIONS`: Comma-separated list of `chatflow_id:description` pairs. Example:
  ```
  FLOWISE_CHATFLOW_DESCRIPTIONS="abc123:Chatflow One,xyz789:Chatflow Two"
  ```
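
The server splits this value on commas and then on the first `:` of each pair; a minimal sketch mirroring `get_chatflow_descriptions` in `mcp_flowise/server_lowlevel.py`:

```python
def parse_chatflow_descriptions(value: str) -> dict[str, str]:
    # Mirrors get_chatflow_descriptions(): comma-separated pairs, each split
    # on the first ":" only, with whitespace stripped from both sides.
    descriptions: dict[str, str] = {}
    for pair in value.split(","):
        if ":" not in pair:
            continue  # the real server logs a warning for malformed pairs
        chatflow_id, description = map(str.strip, pair.split(":", 1))
        if chatflow_id and description:
            descriptions[chatflow_id] = description
    return descriptions

print(parse_chatflow_descriptions("abc123:Chatflow One,xyz789:Chatflow Two"))
# {'abc123': 'Chatflow One', 'xyz789': 'Chatflow Two'}
```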

### FastMCP Mode (`FLOWISE_SIMPLE_MODE=true`)

- `FLOWISE_CHATFLOW_ID`: Single Chatflow ID (optional).
- `FLOWISE_ASSISTANT_ID`: Single Assistant ID (optional).
- `FLOWISE_CHATFLOW_DESCRIPTION`: Optional description for the single tool exposed.
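
The rules from `.env.example` for which tools FastMCP mode exposes, written out as a small illustrative sketch (not the server's actual code; `run_simple_server` in `mcp_flowise/server_fastmcp.py` exits if both IDs are set):

```python
def fastmcp_exposed_tools(chatflow_id: str | None, assistant_id: str | None) -> list[str]:
    # Summarises the behaviour documented in .env.example.
    if chatflow_id and assistant_id:
        raise SystemExit("Set only one of FLOWISE_CHATFLOW_ID / FLOWISE_ASSISTANT_ID")
    if chatflow_id or assistant_id:
        return ["create_prediction(question)"]
    return ["list_chatflows", "create_prediction(chatflow_id, question)"]

print(fastmcp_exposed_tools(None, None))      # both tools, chatflow_id passed per call
print(fastmcp_exposed_tools("abc123", None))  # single pre-configured prediction tool
```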

---

## Filtering Chatflows

Filters can be applied in both modes using the following environment variables:

- **Whitelist by ID**:  
  `FLOWISE_WHITELIST_ID="id1,id2,id3"`
- **Blacklist by ID**:  
  `FLOWISE_BLACKLIST_ID="id4,id5"`
- **Whitelist by Name (Regex)**:  
  `FLOWISE_WHITELIST_NAME_REGEX=".*important.*"`
- **Blacklist by Name (Regex)**:  
  `FLOWISE_BLACKLIST_NAME_REGEX=".*deprecated.*"`

> **Note**: Whitelists take precedence over blacklists: a chatflow that matches a whitelist rule is included even if it also matches a blacklist rule. When a whitelist is configured, chatflows that do not match it are excluded.
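
A condensed sketch of that precedence logic, based on `filter_chatflows` in `mcp_flowise/utils.py` and the unit tests in `tests/unit/test_chatflow_filters.py`:

```python
import re

def keep_chatflow(chatflow: dict, whitelist_ids: set, blacklist_ids: set,
                  whitelist_regex: str = "", blacklist_regex: str = "") -> bool:
    # A whitelist match wins outright; with a whitelist configured, anything
    # not whitelisted is dropped; otherwise a blacklist match excludes.
    cf_id, name = chatflow.get("id", ""), chatflow.get("name", "")
    whitelisted = cf_id in whitelist_ids or (
        whitelist_regex and re.search(whitelist_regex, name)
    )
    if whitelist_ids or whitelist_regex:
        return bool(whitelisted)
    if cf_id in blacklist_ids or (blacklist_regex and re.search(blacklist_regex, name)):
        return False
    return True

# Whitelisted by ID, blacklisted by name regex -> still kept.
print(keep_chatflow({"id": "chatflow1", "name": "Second Chatflow"},
                    whitelist_ids={"chatflow1"}, blacklist_ids=set(),
                    blacklist_regex=".*Second.*"))  # True
```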

---
## Security

- **Protect Your API Key**: Ensure the `FLOWISE_API_KEY` is kept secure and not exposed in logs or repositories.
- **Environment Configuration**: Use `.env` files or environment variables for sensitive configurations.

Add `.env` to your `.gitignore`:

```bash
# .gitignore
.env
```

---

## Troubleshooting

- **Missing API Key**: Ensure `FLOWISE_API_KEY` is set correctly.
- **Invalid Configuration**: If both `FLOWISE_CHATFLOW_ID` and `FLOWISE_ASSISTANT_ID` are set, the server will refuse to start.
- **Connection Errors**: Verify `FLOWISE_API_ENDPOINT` is reachable.
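
A quick connectivity check from the same environment (a hedged sketch: it assumes the standard Flowise REST route `/api/v1/chatflows`; the request the server actually issues lives in `fetch_chatflows` in `mcp_flowise/utils.py`):

```python
import os
import requests

# Verify that FLOWISE_API_ENDPOINT is reachable and the API key is accepted.
endpoint = os.getenv("FLOWISE_API_ENDPOINT", "http://localhost:3000").rstrip("/")
headers = {"Authorization": f"Bearer {os.getenv('FLOWISE_API_KEY', '')}"}

resp = requests.get(f"{endpoint}/api/v1/chatflows", headers=headers, timeout=10)
print(resp.status_code)  # expect 200 when the key and endpoint are valid
print(resp.text[:200])
```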

---

## License

This project is licensed under the MIT License. See the [LICENSE](LICENSE) file for details.

## TODO

- [x] FastMCP mode
- [x] LowLevel mode
- [x] Filtering
- [x] Claude Desktop integration
- [ ] Assistants

```

--------------------------------------------------------------------------------
/mcp_flowise/__init__.py:
--------------------------------------------------------------------------------

```python

```

--------------------------------------------------------------------------------
/pytest.ini:
--------------------------------------------------------------------------------

```
[pytest]
# asyncio_mode = strict
# asyncio_default_fixture_loop_scope = function


```

--------------------------------------------------------------------------------
/tests/fixtures/sample_chatflows.json:
--------------------------------------------------------------------------------

```json
[
    {"id": "mock-id", "name": "Mock Chatflow 1"},
    {"id": "mock-id-2", "name": "Mock Chatflow 2"}
]

```

--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------

```toml
[build-system]
requires = ["setuptools>=61.0", "wheel"]
build-backend = "setuptools.build_meta"

[project]
name = "mcp-flowise"
version = "0.1.0"
description = "MCP integration with the Flowise API for creating predictions and managing chatflows/assistants"
readme = "README.md"
authors = [
  { name = "Matthew Hand", email = "[email protected]" }
]
dependencies = [
  "mcp[cli]>=1.2.0",
  "python-dotenv>=1.0.1",
  "requests>=2.25.0",
]

[project.scripts]
mcp-flowise = "mcp_flowise.__main__:main"

[dependency-groups]
dev = [
    "pytest>=8.3.4",
]

[tool.setuptools.packages]
find = {include = ["mcp_flowise", "mcp_flowise.*"]}

```

--------------------------------------------------------------------------------
/Dockerfile:
--------------------------------------------------------------------------------

```dockerfile
# Use a Python base image that satisfies the project requirements
FROM python:3.12-slim AS base

# Install the uv package manager
RUN pip install uv

# Set the working directory in the container
WORKDIR /app

# Copy the current directory contents into the container at /app
COPY . .

# Install dependencies
RUN uv sync --frozen --no-dev --no-editable

# Expose the port the app runs on
EXPOSE 8000

# Set environment variables required for running the MCP server
ENV FLOWISE_API_KEY=your_api_key
ENV FLOWISE_API_ENDPOINT=http://localhost:3000

# Define the command to run the app
CMD ["uvx", "--from", "git+https://github.com/matthewhand/mcp-flowise", "mcp-flowise"]
```

--------------------------------------------------------------------------------
/.github/workflows/python-pytest.yml:
--------------------------------------------------------------------------------

```yaml
name: Python Tests

on:
  push:
    branches: [ main ]
  pull_request:
    branches: [ main ]

jobs:
  test:
    runs-on: ubuntu-latest

    steps:
      # Checkout the repository
      - uses: actions/checkout@v4

      # Set up Python environment
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.12'

      # Install uv
      - name: Install uv
        uses: astral-sh/setup-uv@v4

      # Set up Python environment with uv
      - name: Install Python with uv
        run: uv python install

      # Sync dependencies with uv
      - name: Install dependencies
        run: uv sync --all-extras --dev

      # Run tests
      - name: Run tests
        run: uv run pytest tests/unit
        env:
          PYTHONPATH: ${{ github.workspace }}

```

--------------------------------------------------------------------------------
/tests/integration/test_tool_prediction.py:
--------------------------------------------------------------------------------

```python
import unittest
import os
from mcp_flowise.utils import flowise_predict

class TestToolPrediction(unittest.TestCase):
    """
    Integration test for predicting results from a Flowise chatflow.
    """

    def test_tool_prediction(self):
        """
        Test the prediction function for a Flowise chatflow.
        """
        # Check if FLOWISE_CHATFLOW_ID is set
        chatflow_id = os.getenv("FLOWISE_CHATFLOW_ID")
        if not chatflow_id:
            self.skipTest("FLOWISE_CHATFLOW_ID environment variable is not set.")
        
        question = "What is the weather like today?"
        print(f"Using chatflow_id: {chatflow_id}")
        
        # Make a prediction
        result = flowise_predict(chatflow_id, question)
        
        # Validate the response
        self.assertIsInstance(result, str)
        self.assertNotEqual(result.strip(), "", "Prediction result should not be empty.")
        print(f"Prediction result: {result}")

if __name__ == "__main__":
    unittest.main()

```

--------------------------------------------------------------------------------
/smithery.yaml:
--------------------------------------------------------------------------------

```yaml
# Smithery configuration file: https://smithery.ai/docs/deployments

startCommand:
  type: stdio
  configSchema:
    # JSON Schema defining the configuration options for the MCP.
    type: object
    required:
      - flowiseApiKey
    properties:
      flowiseApiKey:
        type: string
        description: Your Flowise API Bearer token
      flowiseApiEndpoint:
        type: string
        default: http://localhost:3000
        description: Base URL for Flowise
      flowiseSimpleMode:
        type: boolean
        default: false
        description: Enable FastMCP mode for simpler configuration
      flowiseChatflowId:
        type: string
        description: Single Chatflow ID for FastMCP mode (optional)
      flowiseAssistantId:
        type: string
        description: Single Assistant ID for FastMCP mode (optional)
  commandFunction:
    # A function that produces the CLI command to start the MCP on stdio.
    |-
    (config) => ({command: 'uvx', args: ['--from', 'git+https://github.com/matthewhand/mcp-flowise', 'mcp-flowise'], env: {FLOWISE_API_KEY: config.flowiseApiKey, FLOWISE_API_ENDPOINT: config.flowiseApiEndpoint || 'http://localhost:3000', FLOWISE_SIMPLE_MODE: config.flowiseSimpleMode ? 'true' : 'false', FLOWISE_CHATFLOW_ID: config.flowiseChatflowId || '', FLOWISE_ASSISTANT_ID: config.flowiseAssistantId || ''}})
```

--------------------------------------------------------------------------------
/test_mcp_handshake.py:
--------------------------------------------------------------------------------

```python
import subprocess
import time
import json

# Define JSON-RPC requests
initialize_request = {
    "jsonrpc": "2.0",
    "id": 1,
    "method": "initialize",
    "params": {
        "protocolVersion": "1.0",
        "capabilities": {},
        "clientInfo": {"name": "manual-client", "version": "0.1"}
    }
}

initialized_notification = {
    "jsonrpc": "2.0",
    "method": "notifications/initialized"
}

list_tools_request = {
    "jsonrpc": "2.0",
    "id": 2,
    "method": "tools/list"
}

# Start MCP server
process = subprocess.Popen(
    ["uvx", "--from", ".", "mcp-flowise"],
    stdin=subprocess.PIPE,
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
    text=True
)

try:
    # Send "initialize" request
    process.stdin.write(json.dumps(initialize_request) + "\n")
    process.stdin.flush()
    time.sleep(2.0)

    # Send "initialized" notification
    process.stdin.write(json.dumps(initialized_notification) + "\n")
    process.stdin.flush()
    time.sleep(2.0)

    # Send "tools/list" request
    process.stdin.write(json.dumps(list_tools_request) + "\n")
    process.stdin.flush()
    time.sleep(4)

    # Capture output
    stdout, stderr = process.communicate(timeout=5)

    # Print server responses
    print("STDOUT:")
    print(stdout)
    print("STDERR:")
    print(stderr)

except subprocess.TimeoutExpired:
    print("MCP server process timed out.")
    process.kill()
except Exception as e:
    print(f"An error occurred: {e}")
finally:
    process.terminate()

```

--------------------------------------------------------------------------------
/mcp_flowise/__main__.py:
--------------------------------------------------------------------------------

```python
"""
Entry point for the mcp_flowise package.

This script determines which server to run based on the presence of 
the FLOWISE_SIMPLE_MODE environment variable:
- Low-Level Server: For dynamic tool creation based on chatflows.
- FastMCP Server: For static tool configurations.
"""

import os
import sys
from dotenv import load_dotenv
from mcp_flowise.utils import setup_logging
from mcp_flowise.utils import fetch_chatflows

# Load environment variables from .env if present
load_dotenv()

def main():
    """
    Main entry point for the mcp_flowise package.

    Depending on the FLOWISE_SIMPLE_MODE environment variable, this function
    launches either:
    - Low-Level Server (dynamic tools)
    - FastMCP Server (static tools)
    """
    # Configure logging
    DEBUG = os.getenv("DEBUG", "").lower() in ("true", "1", "yes")
    logger = setup_logging(debug=DEBUG)

    logger.debug("Starting mcp_flowise package entry point.")

    chatflows = fetch_chatflows()
    logger.debug(f"Available chatflows: {chatflows}")

    # Default to Simple Mode unless explicitly disabled
    FLOWISE_SIMPLE_MODE = os.getenv("FLOWISE_SIMPLE_MODE", "true").lower() not in ("false", "0", "no")
    if FLOWISE_SIMPLE_MODE:
        logger.debug("FLOWISE_SIMPLE_MODE is enabled. Launching FastMCP Server.")
        from mcp_flowise.server_fastmcp import run_simple_server
        selected_server = run_simple_server
    else:
        logger.debug("FLOWISE_SIMPLE_MODE is disabled. Launching Low-Level Server.")
        from mcp_flowise.server_lowlevel import run_server
        selected_server = run_server

    # Run the selected server
    try:
        selected_server()
    except Exception as e:
        logger.critical("Unhandled exception occurred while running the server.", exc_info=True)
        sys.exit(1)


if __name__ == "__main__":
    main()

```

--------------------------------------------------------------------------------
/test_mcp_call_tool_valid.py:
--------------------------------------------------------------------------------

```python
import os
import subprocess
import time
import json

# Ensure the required environment variable is set
FLOWISE_CHATFLOW_ID = os.getenv("FLOWISE_CHATFLOW_ID")
if not FLOWISE_CHATFLOW_ID:
    print("Error: FLOWISE_CHATFLOW_ID environment variable is not set.")
    exit(1)

# Define requests
initialize_request = {
    "jsonrpc": "2.0",
    "id": 1,
    "method": "initialize",
    "params": {
        "protocolVersion": "1.0",
        "capabilities": {},
        "clientInfo": {"name": "valid-client", "version": "0.1"}
    }
}

list_tools_request = {
    "jsonrpc": "2.0",
    "id": 2,
    "method": "tools/list"
}

call_tool_request = {
    "jsonrpc": "2.0",
    "id": 3,
    "method": "tools/call",
    "params": {
        "name": FLOWISE_CHATFLOW_ID,  # Use the valid chatflow ID
        "arguments": {"question": "What is AI?"}
    }
}

# Start MCP server
process = subprocess.Popen(
    ["uvx", "--from", ".", "mcp-flowise"],
    stdin=subprocess.PIPE,
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
    text=True
)

try:
    # Initialize the server
    print("Sending initialize request...")
    process.stdin.write(json.dumps(initialize_request) + "\n")
    process.stdin.flush()

    # Wait until the server sends a response to the initialize request
    time.sleep(0.5)
    stdout_line = process.stdout.readline()
    while "id" not in stdout_line:  # Look for a response containing "id"
        print(f"Server Response: {stdout_line.strip()}")
        stdout_line = process.stdout.readline()

    print("Initialization complete.")

    # List tools
    print("Sending tools/list request...")
    process.stdin.write(json.dumps(list_tools_request) + "\n")
    process.stdin.flush()
    time.sleep(0.5)

    # Call the valid tool
    print(f"Sending tools/call request for chatflow '{FLOWISE_CHATFLOW_ID}'...")
    process.stdin.write(json.dumps(call_tool_request) + "\n")
    process.stdin.flush()
    time.sleep(1)

    # Capture output
    stdout, stderr = process.communicate(timeout=5)

    # Print responses
    print("STDOUT:")
    print(stdout)

    print("STDERR:")
    print(stderr)

except subprocess.TimeoutExpired:
    print("MCP server process timed out.")
    process.kill()
except Exception as e:
    print(f"An error occurred: {e}")
finally:
    process.terminate()

```

--------------------------------------------------------------------------------
/tests/integration/test_flowise_integration.py:
--------------------------------------------------------------------------------

```python
"""
Integration tests for Flowise MCP.
These tests will run conditionally if the required environment variables are configured.
"""

import os
import unittest
from mcp_flowise.utils import fetch_chatflows, flowise_predict


class IntegrationTests(unittest.TestCase):
    """
    Integration tests for Flowise MCP.
    """

    @unittest.skipUnless(
        os.getenv("FLOWISE_API_KEY") and os.getenv("FLOWISE_API_ENDPOINT"),
        "FLOWISE_API_KEY and FLOWISE_API_ENDPOINT must be set for integration tests.",
    )
    def test_tool_discovery_in_lowlevel_mode(self):
        """
        Test tool discovery in low-level mode by fetching tools from the Flowise server.
        """
        chatflows = fetch_chatflows()
        self.assertGreater(len(chatflows), 0, "No chatflows discovered. Ensure the Flowise server is configured correctly.")
        print(f"Discovered chatflows: {[cf['name'] for cf in chatflows]}")

    @unittest.skipUnless(
        os.getenv("FLOWISE_API_KEY") and os.getenv("FLOWISE_API_ENDPOINT"),
        "FLOWISE_API_KEY and FLOWISE_API_ENDPOINT must be set for tool tests.",
    )
    def test_call_specific_tool(self):
        """
        Test calling a specific tool if available on the Flowise server.
        """
        chatflows = fetch_chatflows()
        if not chatflows:
            self.skipTest("No chatflows discovered on the server. Skipping tool test.")

        # Handle cases with and without the FLOWISE_CHATFLOW_ID environment variable
        specific_chatflow_id = os.getenv("FLOWISE_CHATFLOW_ID")
        if specific_chatflow_id:
            # Look for the specified chatflow ID
            chatflow = next((cf for cf in chatflows if cf["id"] == specific_chatflow_id), None)
            if not chatflow:
                self.skipTest(f"Specified chatflow ID {specific_chatflow_id} not found. Skipping tool test.")
        else:
            # Fallback to the first chatflow if no ID is specified
            chatflow = chatflows[0]

        tool_name = chatflow.get("name")
        print(f"Testing tool: {tool_name} with ID: {chatflow['id']}")

        # Simulate tool call
        result = self.simulate_tool_call(tool_name, chatflow["id"], "Tell me a fun fact.")
        self.assertTrue(
            result.strip(),
            f"Unexpected empty response from tool {tool_name}: {result}"
        )

    def simulate_tool_call(self, tool_name, chatflow_id, question):
        """
        Simulates a tool call by directly using the flowise_predict function.

        Args:
            tool_name (str): The name of the tool.
            chatflow_id (str): The ID of the chatflow/tool.
            question (str): The question to ask.

        Returns:
            str: The response from the tool.
        """
        return flowise_predict(chatflow_id, question)


if __name__ == "__main__":
    unittest.main()

```

--------------------------------------------------------------------------------
/tests/integration/test_tool_registration_integration.py:
--------------------------------------------------------------------------------

```python
import os
import unittest
import asyncio
from mcp_flowise.server_lowlevel import run_server
from mcp import types


class TestToolRegistrationIntegration(unittest.TestCase):
    """
    Integration-style test for tool registration and listing (the client request is currently mocked; see mock_client_request).
    """

    @classmethod
    def setUpClass(cls):
        """
        Set up the test environment and server.
        """
        # Set the environment variable for chatflow descriptions
        os.environ["FLOWISE_CHATFLOW_DESCRIPTIONS"] = (
            "chatflow1:Test Chatflow 1,chatflow2:Test Chatflow 2"
        )

        # Start the server using asyncio.create_task
        # cls.loop = asyncio.get_event_loop()
        cls.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(cls.loop)
        cls.server_task = cls.loop.create_task(cls.start_server())

    @classmethod
    async def start_server(cls):
        """
        Start the server as a coroutine.
        """
        await run_server()

    @classmethod
    def tearDownClass(cls):
        """
        Clean up the server task.
        """
        cls.server_task.cancel()

    def test_tool_registration_and_listing(self):
        """
        Test that tools are correctly registered and listed at runtime.
        """
        async def run_client():
            # Create a ListToolsRequest
            list_tools_request = types.ListToolsRequest(method="tools/list")

            # Simulate the request and get the response
            response = await self.mock_client_request(list_tools_request)

            # Validate the response
            tools = response.root.tools
            assert len(tools) == 2, "Expected 2 tools to be registered"
            assert tools[0].name == "test_chatflow_1"
            assert tools[0].description == "Test Chatflow 1"
            assert tools[1].name == "test_chatflow_2"
            assert tools[1].description == "Test Chatflow 2"

        asyncio.run(run_client())

    async def mock_client_request(self, request):
        """
        Mock client request for testing purposes. Replace with actual client logic.
        """
        return types.ServerResult(
            root=types.ListToolsResult(
                tools=[
                    types.Tool(
                        name="test_chatflow_1",
                        description="Test Chatflow 1",
                        inputSchema={
                            "type": "object",
                            "properties": {
                                "question": {"type": "string"}
                            },
                            "required": ["question"]
                        }
                    ),
                    types.Tool(
                        name="test_chatflow_2",
                        description="Test Chatflow 2",
                        inputSchema={
                            "type": "object",
                            "properties": {
                                "question": {"type": "string"}
                            },
                            "required": ["question"]
                        }
                    ),
                ]
            )
        )


if __name__ == "__main__":
    unittest.main()

```

--------------------------------------------------------------------------------
/tests/unit/test_chatflow_filters.py:
--------------------------------------------------------------------------------

```python
import os
import unittest
from unittest.mock import patch
from mcp_flowise.utils import filter_chatflows


class TestChatflowFilters(unittest.TestCase):
    """
    Unit tests for chatflow filtering logic in mcp_flowise.utils.
    """

    def setUp(self):
        """
        Reset the environment variables for filtering logic.
        """
        os.environ.pop("FLOWISE_WHITELIST_ID", None)
        os.environ.pop("FLOWISE_BLACKLIST_ID", None)
        os.environ.pop("FLOWISE_WHITELIST_NAME_REGEX", None)
        os.environ.pop("FLOWISE_BLACKLIST_NAME_REGEX", None)

    def test_no_filters(self):
        """
        Test that all chatflows are returned when no filters are set.
        """
        chatflows = [
            {"id": "chatflow1", "name": "First Chatflow"},
            {"id": "chatflow2", "name": "Second Chatflow"},
        ]
        filtered = filter_chatflows(chatflows)
        self.assertEqual(len(filtered), len(chatflows))
        self.assertListEqual(filtered, chatflows)

    @patch.dict(os.environ, {"FLOWISE_WHITELIST_ID": "chatflow1,chatflow3"})
    def test_whitelist_id_filter(self):
        """
        Test that only whitelisted chatflows by ID are returned.
        """
        chatflows = [
            {"id": "chatflow1", "name": "First Chatflow"},
            {"id": "chatflow2", "name": "Second Chatflow"},
            {"id": "chatflow3", "name": "Third Chatflow"},
        ]
        filtered = filter_chatflows(chatflows)
        self.assertEqual(len(filtered), 2)
        self.assertTrue(all(cf["id"] in {"chatflow1", "chatflow3"} for cf in filtered))

    @patch.dict(os.environ, {"FLOWISE_BLACKLIST_ID": "chatflow2"})
    def test_blacklist_id_filter(self):
        """
        Test that blacklisted chatflows by ID are excluded.
        """
        chatflows = [
            {"id": "chatflow1", "name": "First Chatflow"},
            {"id": "chatflow2", "name": "Second Chatflow"},
        ]
        filtered = filter_chatflows(chatflows)
        self.assertEqual(len(filtered), 1)
        self.assertEqual(filtered[0]["id"], "chatflow1")

    @patch.dict(os.environ, {"FLOWISE_WHITELIST_NAME_REGEX": ".*First.*"})
    def test_whitelist_name_regex_filter(self):
        """
        Test that only chatflows matching the whitelist name regex are returned.
        """
        chatflows = [
            {"id": "chatflow1", "name": "First Chatflow"},
            {"id": "chatflow2", "name": "Second Chatflow"},
        ]
        filtered = filter_chatflows(chatflows)
        self.assertEqual(len(filtered), 1)
        self.assertEqual(filtered[0]["name"], "First Chatflow")

    @patch.dict(os.environ, {"FLOWISE_BLACKLIST_NAME_REGEX": ".*Second.*"})
    def test_blacklist_name_regex_filter(self):
        """
        Test that chatflows matching the blacklist name regex are excluded.
        """
        chatflows = [
            {"id": "chatflow1", "name": "First Chatflow"},
            {"id": "chatflow2", "name": "Second Chatflow"},
        ]
        filtered = filter_chatflows(chatflows)
        self.assertEqual(len(filtered), 1)
        self.assertEqual(filtered[0]["name"], "First Chatflow")

    @patch.dict(
        os.environ,
        {
            "FLOWISE_WHITELIST_ID": "chatflow1",
            "FLOWISE_BLACKLIST_NAME_REGEX": ".*Second.*",
        },
    )
    def test_whitelist_and_blacklist_combined(self):
        """
        Test that whitelist takes precedence over blacklist.
        """
        chatflows = [
            {"id": "chatflow1", "name": "Second Chatflow"},
            {"id": "chatflow2", "name": "Another Chatflow"},
        ]
        filtered = filter_chatflows(chatflows)
        self.assertEqual(len(filtered), 1)
        self.assertEqual(filtered[0]["id"], "chatflow1")


if __name__ == "__main__":
    unittest.main()

```

--------------------------------------------------------------------------------
/mcp_flowise/server_fastmcp.py:
--------------------------------------------------------------------------------

```python
"""
Provides the FastMCP server logic for mcp_flowise.

This server exposes a limited set of tools (list_chatflows, create_prediction)
and uses environment variables to determine the chatflow or assistant configuration.
"""

import os
import sys
import json
from mcp.server.fastmcp import FastMCP
from mcp_flowise.utils import flowise_predict, fetch_chatflows, redact_api_key, setup_logging

# Environment variables
FLOWISE_API_KEY = os.getenv("FLOWISE_API_KEY", "")
FLOWISE_API_ENDPOINT = os.getenv("FLOWISE_API_ENDPOINT", "http://localhost:3000")
FLOWISE_CHATFLOW_ID = os.getenv("FLOWISE_CHATFLOW_ID")
FLOWISE_ASSISTANT_ID = os.getenv("FLOWISE_ASSISTANT_ID")
FLOWISE_CHATFLOW_DESCRIPTION = os.getenv("FLOWISE_CHATFLOW_DESCRIPTION")
FLOWISE_CHATFLOW_WHITELIST = os.getenv("FLOWISE_CHATFLOW_WHITELIST")
FLOWISE_CHATFLOW_BLACKLIST = os.getenv("FLOWISE_CHATFLOW_BLACKLIST")

DEBUG = os.getenv("DEBUG", "").lower() in ("true", "1", "yes")

# Configure logging
logger = setup_logging(debug=DEBUG)

# Log key environment variable values
logger.debug(f"Flowise API Key (redacted): {redact_api_key(FLOWISE_API_KEY)}")
logger.debug(f"Flowise API Endpoint: {FLOWISE_API_ENDPOINT}")
logger.debug(f"Flowise Chatflow ID: {FLOWISE_CHATFLOW_ID}")
logger.debug(f"Flowise Assistant ID: {FLOWISE_ASSISTANT_ID}")
logger.debug(f"Flowise Chatflow Description: {FLOWISE_CHATFLOW_DESCRIPTION}")

# Initialize MCP Server
mcp = FastMCP("FlowiseMCP-with-EnvAuth")


@mcp.tool()
def list_chatflows() -> str:
    """
    List all available chatflows from the Flowise API.

    This function respects optional whitelisting or blacklisting if configured
    via FLOWISE_CHATFLOW_WHITELIST or FLOWISE_CHATFLOW_BLACKLIST.

    Returns:
        str: A JSON-encoded string of filtered chatflows.
    """
    logger.debug("Handling list_chatflows tool.")
    chatflows = fetch_chatflows()

    # Apply whitelisting
    if FLOWISE_CHATFLOW_WHITELIST:
        whitelist = set(FLOWISE_CHATFLOW_WHITELIST.split(","))
        chatflows = [cf for cf in chatflows if cf["id"] in whitelist]
        logger.debug(f"Applied whitelist filter: {whitelist}")

    # Apply blacklisting
    if FLOWISE_CHATFLOW_BLACKLIST:
        blacklist = set(FLOWISE_CHATFLOW_BLACKLIST.split(","))
        chatflows = [cf for cf in chatflows if cf["id"] not in blacklist]
        logger.debug(f"Applied blacklist filter: {blacklist}")

    logger.debug(f"Filtered chatflows: {chatflows}")
    return json.dumps(chatflows)


@mcp.tool()
def create_prediction(*, chatflow_id: str | None = None, question: str) -> str:
    """
    Create a prediction by sending a question to a specific chatflow or assistant.

    Args:
        chatflow_id (str, optional): The ID of the chatflow to use. Defaults to FLOWISE_CHATFLOW_ID.
        question (str): The question or prompt to send to the chatflow.

    Returns:
        str: The raw JSON response from Flowise API or an error message if something goes wrong.
    """
    logger.debug(f"create_prediction called with chatflow_id={chatflow_id}, question={question}")
    chatflow_id = chatflow_id or FLOWISE_CHATFLOW_ID

    if not chatflow_id and not FLOWISE_ASSISTANT_ID:
        logger.error("No chatflow_id or assistant_id provided or pre-configured.")
        return json.dumps({"error": "chatflow_id or assistant_id is required"})

    try:
        # Determine which chatflow ID to use
        target_chatflow_id = chatflow_id or FLOWISE_ASSISTANT_ID

        # Call the prediction function and return the raw JSON result
        result = flowise_predict(target_chatflow_id, question)
        logger.debug(f"Prediction result: {result}")
        return result  # Returning raw JSON as a string
    except Exception as e:
        logger.error(f"Unhandled exception in create_prediction: {e}", exc_info=True)
        return json.dumps({"error": str(e)})

def run_simple_server():
    """
    Run the FastMCP version of the Flowise server.

    This function ensures proper configuration and handles server initialization.

    Raises:
        SystemExit: If both FLOWISE_CHATFLOW_ID and FLOWISE_ASSISTANT_ID are set simultaneously.
    """
    if FLOWISE_CHATFLOW_ID and FLOWISE_ASSISTANT_ID:
        logger.error("Both FLOWISE_CHATFLOW_ID and FLOWISE_ASSISTANT_ID are set. Set only one.")
        sys.exit(1)

    try:
        logger.debug("Starting MCP server (FastMCP version)...")
        mcp.run(transport="stdio")
    except Exception as e:
        logger.error("Unhandled exception in MCP server.", exc_info=True)
        sys.exit(1)

```

--------------------------------------------------------------------------------
/tests/unit/test_utils.py:
--------------------------------------------------------------------------------

```python
import unittest
from unittest.mock import patch, Mock
import requests
from mcp_flowise.utils import flowise_predict, fetch_chatflows, filter_chatflows, normalize_tool_name

class TestUtils(unittest.TestCase):

    @patch("requests.post")
    def test_flowise_predict_success(self, mock_post: Mock) -> None:
        """
        Test successful prediction response.
        """
        mock_post.return_value = Mock(
            status_code=200,
            text='{"text": "Mock Prediction"}',
        )
        response = flowise_predict("valid_chatflow_id", "What's AI?")
        self.assertEqual(response, '{"text": "Mock Prediction"}')  # Success case
        mock_post.assert_called_once()

    @patch("requests.post", side_effect=requests.Timeout)
    def test_flowise_predict_timeout(self, mock_post: Mock) -> None:
        """
        Test prediction handling of timeout.
        """
        response = flowise_predict("valid_chatflow_id", "What's AI?")
        self.assertIn("error", response)  # Assert the response contains the error key
        # self.assertIn("Timeout", response)  # Timeout-specific assertion

    @patch("requests.post")
    def test_flowise_predict_http_error(self, mock_post: Mock) -> None:
        """
        Test prediction handling of HTTP errors.
        """
        mock_post.return_value = Mock(
            status_code=500,
            raise_for_status=Mock(side_effect=requests.HTTPError("500 Error")),
            text='{"error": "500 Error"}',
        )
        response = flowise_predict("valid_chatflow_id", "What's AI?")
        self.assertIn("error", response)
        self.assertIn("500 Error", response)

    @patch("requests.get")
    def test_fetch_chatflows_success(self, mock_get: Mock) -> None:
        """
        Test successful fetching of chatflows.
        """
        mock_get.return_value = Mock(
            status_code=200,
            json=Mock(return_value=[{"id": "1", "name": "Chatflow 1"}, {"id": "2", "name": "Chatflow 2"}]),
        )
        chatflows = fetch_chatflows()
        self.assertEqual(len(chatflows), 2)
        self.assertEqual(chatflows[0]["id"], "1")
        self.assertEqual(chatflows[0]["name"], "Chatflow 1")
        mock_get.assert_called_once()

    @patch("requests.get", side_effect=requests.Timeout)
    def test_fetch_chatflows_timeout(self, mock_get: Mock) -> None:
        """
        Test handling of timeout when fetching chatflows.
        """
        chatflows = fetch_chatflows()
        self.assertEqual(chatflows, [])  # Should return an empty list on timeout

    @patch("requests.get")
    def test_fetch_chatflows_http_error(self, mock_get: Mock) -> None:
        """
        Test handling of HTTP errors when fetching chatflows.
        """
        mock_get.return_value = Mock(
            status_code=500,
            raise_for_status=Mock(side_effect=requests.HTTPError("500 Error")),
        )
        chatflows = fetch_chatflows()
        self.assertEqual(chatflows, [])  # Should return an empty list on HTTP error

    def test_filter_chatflows(self) -> None:
        """
        Test filtering of chatflows based on whitelist and blacklist criteria.
        """
        chatflows = [
            {"id": "1", "name": "Chatflow 1"},
            {"id": "2", "name": "Chatflow 2"},
            {"id": "3", "name": "Chatflow 3"},
        ]

        # Mock environment variables
        with patch.dict("os.environ", {
            "FLOWISE_WHITELIST_ID": "1,2",
            "FLOWISE_BLACKLIST_ID": "3",
            "FLOWISE_WHITELIST_NAME_REGEX": "",
            "FLOWISE_BLACKLIST_NAME_REGEX": "",
        }):
            filtered = filter_chatflows(chatflows)
            self.assertEqual(len(filtered), 2)
            self.assertEqual(filtered[0]["id"], "1")
            self.assertEqual(filtered[1]["id"], "2")

        # Mock environment variables for blacklist only
        with patch.dict("os.environ", {
            "FLOWISE_WHITELIST_ID": "",
            "FLOWISE_BLACKLIST_ID": "2",
            "FLOWISE_WHITELIST_NAME_REGEX": "",
            "FLOWISE_BLACKLIST_NAME_REGEX": "",
        }):
            filtered = filter_chatflows(chatflows)
            self.assertEqual(len(filtered), 2)
            self.assertEqual(filtered[0]["id"], "1")
            self.assertEqual(filtered[1]["id"], "3")

    def test_normalize_tool_name(self) -> None:
        """
        Test normalization of tool names.
        """
        self.assertEqual(normalize_tool_name("Tool Name"), "tool_name")
        self.assertEqual(normalize_tool_name("Tool-Name"), "tool_name")
        self.assertEqual(normalize_tool_name("Tool_Name"), "tool_name")
        self.assertEqual(normalize_tool_name("ToolName"), "toolname")
        self.assertEqual(normalize_tool_name(""), "unknown_tool")
        self.assertEqual(normalize_tool_name(None), "unknown_tool")

if __name__ == "__main__":
    unittest.main()
```

--------------------------------------------------------------------------------
/mcp_flowise/server_lowlevel.py:
--------------------------------------------------------------------------------

```python
'''
Low-Level Server for the Flowise MCP.

This server dynamically registers tools based on the provided chatflows
retrieved from the Flowise API. Tool names are normalized for safety 
and consistency, and potential conflicts are logged.

Descriptions for tools are prioritized from FLOWISE_CHATFLOW_DESCRIPTIONS,
falling back to the chatflow names when not provided.

Conflicts in tool names after normalization are handled gracefully by
skipping those chatflows.
'''

import os
import sys
import asyncio
import json
from typing import List, Dict, Any
from mcp import types
from mcp.server.lowlevel import Server
from mcp.server.models import InitializationOptions
from mcp.server.stdio import stdio_server
from mcp_flowise.utils import (
    flowise_predict,
    fetch_chatflows,
    normalize_tool_name,
    setup_logging,
)

# Configure logging
DEBUG = os.getenv("DEBUG", "").lower() in ("true", "1", "yes")
logger = setup_logging(debug=DEBUG)

# Global tool mapping: tool name to chatflow ID
NAME_TO_ID_MAPPING: Dict[str, str] = {}
tools: List[types.Tool] = []

# Initialize the Low-Level MCP Server
mcp = Server("FlowiseMCP-with-EnvAuth")


def get_chatflow_descriptions() -> Dict[str, str]:
    """
    Parse the FLOWISE_CHATFLOW_DESCRIPTIONS environment variable for descriptions.

    Returns:
        dict: A dictionary mapping chatflow IDs to descriptions.
    """
    descriptions_env = os.getenv("FLOWISE_CHATFLOW_DESCRIPTIONS", "")
    if not descriptions_env:
        logger.debug("No FLOWISE_CHATFLOW_DESCRIPTIONS provided.")
        return {}

    logger.debug("Retrieved FLOWISE_CHATFLOW_DESCRIPTIONS: %s", descriptions_env)
    descriptions = {}
    for pair in descriptions_env.split(","):
        if ":" not in pair:
            logger.warning("Invalid format in FLOWISE_CHATFLOW_DESCRIPTIONS: %s", pair)
            continue
        chatflow_id, description = map(str.strip, pair.split(":", 1))
        if chatflow_id and description:
            descriptions[chatflow_id] = description
    logger.debug("Parsed FLOWISE_CHATFLOW_DESCRIPTIONS: %s", descriptions)
    return descriptions


async def dispatcher_handler(request: types.CallToolRequest) -> types.ServerResult:
    """
    Dispatcher handler that routes CallToolRequest to the appropriate tool handler based on the tool name.
    """
    try:
        tool_name = request.params.name
        logger.debug("Dispatcher received CallToolRequest for tool: %s", tool_name)

        if tool_name not in NAME_TO_ID_MAPPING:
            logger.error("Unknown tool requested: %s", tool_name)
            return types.ServerResult(
                root=types.CallToolResult(
                    content=[types.TextContent(type="text", text="Unknown tool requested")]
                )
            )

        chatflow_id = NAME_TO_ID_MAPPING[tool_name]
        question = request.params.arguments.get("question", "")
        if not question:
            logger.error("Missing 'question' argument for tool: %s", tool_name)
            return types.ServerResult(
                root=types.CallToolResult(
                    content=[types.TextContent(type="text", text="Missing 'question' argument.")]
                )
            )

        logger.debug("Dispatching prediction for chatflow_id: %s with question: %s", chatflow_id, question)

        # Call the prediction function
        try:
            result = flowise_predict(chatflow_id, question)
            logger.debug("Prediction result: %s", result)
        except Exception as pred_err:
            logger.error("Error during prediction: %s", pred_err, exc_info=True)
            result = json.dumps({"error": "Error occurred during prediction."})

        # Pass the raw JSON response or error JSON back to the client
        return types.ServerResult(
            root=types.CallToolResult(
                content=[types.TextContent(type="text", text=result)]
            )
        )
    except Exception as e:
        logger.error("Unhandled exception in dispatcher_handler: %s", e, exc_info=True)
        return types.ServerResult(
            root=types.CallToolResult(
                content=[types.TextContent(type="text", text=json.dumps({"error": "Internal server error."}))]  # Ensure JSON is returned
            )
        )


async def list_tools(request: types.ListToolsRequest) -> types.ServerResult:
    """
    Handler for ListToolsRequest to list all registered tools.

    Args:
        request (types.ListToolsRequest): The request to list tools.

    Returns:
        types.ServerResult: The result containing the list of tools.
    """
    logger.debug("Handling list_tools request.")
    return types.ServerResult(root=types.ListToolsResult(tools=tools))


def register_tools(chatflows: List[Dict[str, Any]], chatflow_descriptions: Dict[str, str]) -> List[types.Tool]:
    """
    Register tools dynamically based on the provided chatflows.

    Args:
        chatflows (List[Dict[str, Any]]): List of chatflows retrieved from the Flowise API.
        chatflow_descriptions (Dict[str, str]): Dictionary mapping chatflow IDs to descriptions.

    Returns:
        List[types.Tool]: List of registered tools.
    """
    global tools
    tools = []  # Clear existing tools before re-registration
    for chatflow in chatflows:
        try:
            normalized_name = normalize_tool_name(chatflow["name"])

            if normalized_name in NAME_TO_ID_MAPPING:
                logger.warning(
                    "Tool name conflict: '%s' already exists. Skipping chatflow '%s' (ID: '%s').",
                    normalized_name,
                    chatflow["name"],
                    chatflow["id"],
                )
                continue

            NAME_TO_ID_MAPPING[normalized_name] = chatflow["id"]
            description = chatflow_descriptions.get(chatflow["id"], chatflow["name"])

            tool = types.Tool(
                name=normalized_name,
                description=description,
                inputSchema={
                    "type": "object",
                    "required": ["question"],
                    "properties": {"question": {"type": "string"}},
                },
            )
            tools.append(tool)
            logger.debug("Registered tool: %s (ID: %s)", tool.name, chatflow["id"])

        except Exception as e:
            logger.error("Error registering chatflow '%s' (ID: '%s'): %s", chatflow["name"], chatflow["id"], e)

    return tools


async def start_server():
    """
    Start the Low-Level MCP server.
    """
    logger.debug("Starting Low-Level MCP server...")
    try:
        async with stdio_server() as (read_stream, write_stream):
            await mcp.run(
                read_stream,
                write_stream,
                initialization_options=InitializationOptions(
                    server_name="FlowiseMCP-with-EnvAuth",
                    server_version="0.1.0",
                    capabilities=types.ServerCapabilities(),
                ),
            )
    except Exception as e:
        logger.critical("Unhandled exception in MCP server: %s", e)
        sys.exit(1)


def run_server():
    """
    Run the Low-Level Flowise server by registering tools dynamically.
    """
    try:
        chatflows = fetch_chatflows()
        if not chatflows:
            raise ValueError("No chatflows retrieved from the Flowise API.")
    except Exception as e:
        logger.critical("Failed to start server: %s", e)
        sys.exit(1)

    chatflow_descriptions = get_chatflow_descriptions()
    register_tools(chatflows, chatflow_descriptions)

    if not tools:
        logger.critical("No valid tools registered. Shutting down the server.")
        sys.exit(1)

    mcp.request_handlers[types.CallToolRequest] = dispatcher_handler
    logger.debug("Registered dispatcher_handler for CallToolRequest.")

    mcp.request_handlers[types.ListToolsRequest] = list_tools
    logger.debug("Registered list_tools handler.")

    try:
        asyncio.run(start_server())
    except KeyboardInterrupt:
        logger.debug("MCP server shutdown initiated by user.")
    except Exception as e:
        logger.critical("Failed to start MCP server: %s", e)
        sys.exit(1)


if __name__ == "__main__":
    run_server()

```

--------------------------------------------------------------------------------
/mcp_flowise/utils.py:
--------------------------------------------------------------------------------

```python
"""
Utility functions for mcp_flowise, including logging setup, chatflow filtering, and Flowise API interactions.

This module centralizes shared functionality such as:
1. Logging configuration for consistent log output across the application.
2. Safe redaction of sensitive data like API keys in logs.
3. Low-level interactions with the Flowise API for predictions and chatflow management.
4. Flexible filtering of chatflows based on whitelist/blacklist criteria.
"""

import os
import sys
import logging
import requests
import re
import json
from dotenv import load_dotenv

# Load environment variables from .env if present
load_dotenv()

# Flowise API configuration
FLOWISE_API_KEY = os.getenv("FLOWISE_API_KEY", "")
FLOWISE_API_ENDPOINT = os.getenv("FLOWISE_API_ENDPOINT", "http://localhost:3000")


def setup_logging(debug: bool = False, log_dir: str = None, log_file: str = "debug-mcp-flowise.log") -> logging.Logger:
    """
    Sets up logging for the application, including outputting CRITICAL and ERROR logs to stdout.

    Args:
        debug (bool): If True, set log level to DEBUG; otherwise, INFO.
        log_dir (str): Directory where log files will be stored. Ignored if `FLOWISE_LOGFILE_PATH` is set.
        log_file (str): Name of the log file. Ignored if `FLOWISE_LOGFILE_PATH` is set.

    Returns:
        logging.Logger: Configured logger instance.
    """
    log_path = os.getenv("FLOWISE_LOGFILE_PATH")
    if not log_path:
        if log_dir is None:
            log_dir = os.path.join(os.path.expanduser("~"), "mcp_logs")
        try:
            os.makedirs(log_dir, exist_ok=True)
            log_path = os.path.join(log_dir, log_file)
        except PermissionError as e:
            # Fallback to stdout logging if directory creation fails
            log_path = None
            print(f"[ERROR] Failed to create log directory: {e}", file=sys.stderr)

    logger = logging.getLogger(__name__)
    logger.setLevel(logging.DEBUG if debug else logging.INFO)
    logger.propagate = False  # Prevent log messages from propagating to the root logger

    # Remove all existing handlers to prevent accumulation
    for handler in logger.handlers[:]:
        logger.removeHandler(handler)

    handlers = []

    if log_path:
        try:
            file_handler = logging.FileHandler(log_path, mode="a")
            file_handler.setLevel(logging.DEBUG if debug else logging.INFO)
            formatter = logging.Formatter("[%(levelname)s] %(asctime)s - %(message)s")
            file_handler.setFormatter(formatter)
            handlers.append(file_handler)
        except Exception as e:
            print(f"[ERROR] Failed to create log file handler: {e}", file=sys.stderr)

    # Attempt to create a StreamHandler that echoes ERROR and CRITICAL logs to stdout
    try:
        stdout_handler = logging.StreamHandler(sys.stdout)
        stdout_handler.setLevel(logging.ERROR)
        formatter = logging.Formatter("[%(levelname)s] %(message)s")
        stdout_handler.setFormatter(formatter)
        handlers.append(stdout_handler)
    except Exception as e:
        print(f"[ERROR] Failed to create stdout log handler: {e}", file=sys.stderr)

    # Add all handlers to the logger
    for handler in handlers:
        logger.addHandler(handler)

    if log_path:
        logger.debug(f"Logging initialized. Writing logs to {log_path}")
    else:
        logger.debug("Logging initialized. Logs will only appear in stdout.")
    return logger
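
# Illustrative usage (shown as comments so nothing runs at import time; the
# directory below is hypothetical):
#     custom_logger = setup_logging(debug=True, log_dir="/tmp/mcp_flowise_logs")
#     custom_logger.info("Written to /tmp/mcp_flowise_logs/debug-mcp-flowise.log")
# If FLOWISE_LOGFILE_PATH is set, it overrides both log_dir and log_file.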


def redact_api_key(key: str) -> str:
    """
    Redacts the Flowise API key for safe logging output.

    Args:
        key (str): The API key to redact.

    Returns:
        str: The redacted API key, or '<not set>' if the key is missing or too short to redact safely.
    """
    if not key or len(key) <= 4:
        return "<not set>"
    return f"{key[:2]}{'*' * (len(key) - 4)}{key[-2:]}"


def normalize_tool_name(name: str) -> str:
    """
    Normalize tool names by converting to lowercase and replacing non-alphanumeric characters with underscores.

    Args:
        name (str): Original tool name.

    Returns:
        str: Normalized tool name. Returns 'unknown_tool' if the input is invalid.
    """
    logger = logging.getLogger(__name__)
    if not name or not isinstance(name, str):
        logger.warning("Invalid tool name input: %s. Using default 'unknown_tool'.", name)
        return "unknown_tool"
    normalized = re.sub(r"[^a-zA-Z0-9]", "_", name).lower()
    logger.debug("Normalized tool name from '%s' to '%s'", name, normalized)
    return normalized or "unknown_tool"
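
# Example of the normalization (illustrative names):
#     normalize_tool_name("My Chatflow #1")  ->  "my_chatflow__1"
#     normalize_tool_name(None)              ->  "unknown_tool"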


def filter_chatflows(chatflows: list[dict]) -> list[dict]:
    """
    Filters chatflows based on whitelist and blacklist criteria read from the environment.
    If any whitelist criterion (ID or name regex) is configured, only whitelisted chatflows
    are returned and the blacklist is ignored; otherwise, the blacklist is applied.

    Args:
        chatflows (list[dict]): A list of chatflow dictionaries.

    Returns:
        list[dict]: Filtered list of chatflows.
    """
    logger = logging.getLogger(__name__)

    # Dynamically fetch filtering criteria
    whitelist_ids = set(filter(bool, os.getenv("FLOWISE_WHITELIST_ID", "").split(",")))
    blacklist_ids = set(filter(bool, os.getenv("FLOWISE_BLACKLIST_ID", "").split(",")))
    whitelist_name_regex = os.getenv("FLOWISE_WHITELIST_NAME_REGEX", "")
    blacklist_name_regex = os.getenv("FLOWISE_BLACKLIST_NAME_REGEX", "")

    filtered_chatflows = []

    for chatflow in chatflows:
        chatflow_id = chatflow.get("id", "")
        chatflow_name = chatflow.get("name", "")

        # Flag tracking whether this chatflow matches any whitelist criterion
        is_whitelisted = False

        # Check Whitelist
        if whitelist_ids or whitelist_name_regex:
            if whitelist_ids and chatflow_id in whitelist_ids:
                is_whitelisted = True
            if whitelist_name_regex and re.search(whitelist_name_regex, chatflow_name):
                is_whitelisted = True

            if is_whitelisted:
                # If whitelisted, include regardless of blacklist
                logger.debug("Including whitelisted chatflow '%s' (ID: '%s').", chatflow_name, chatflow_id)
                filtered_chatflows.append(chatflow)
                continue  # Skip blacklist checks
            else:
                # If not whitelisted, exclude regardless of blacklist
                logger.debug("Excluding non-whitelisted chatflow '%s' (ID: '%s').", chatflow_name, chatflow_id)
                continue
        else:
            # If no whitelist, apply blacklist directly
            if blacklist_ids and chatflow_id in blacklist_ids:
                logger.debug("Skipping chatflow '%s' (ID: '%s') - In blacklist.", chatflow_name, chatflow_id)
                continue  # Exclude blacklisted by ID
            if blacklist_name_regex and re.search(blacklist_name_regex, chatflow_name):
                logger.debug("Skipping chatflow '%s' (ID: '%s') - Name matches blacklist regex.", chatflow_name, chatflow_id)
                continue  # Exclude blacklisted by name

            # Include the chatflow if it passes all filters
            logger.debug("Including chatflow '%s' (ID: '%s').", chatflow_name, chatflow_id)
            filtered_chatflows.append(chatflow)

    logger.debug("Filtered chatflows: %d out of %d", len(filtered_chatflows), len(chatflows))
    return filtered_chatflows
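
# Illustrative configuration (hypothetical values). With FLOWISE_WHITELIST_ID="abc123",
# only the chatflow with ID "abc123" is returned and any blacklist settings are ignored.
# With no whitelist configured and FLOWISE_BLACKLIST_NAME_REGEX="(?i)internal", every
# chatflow whose name matches "internal" (case-insensitively) is dropped.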

def flowise_predict(chatflow_id: str, question: str) -> str:
    """
    Sends a question to a specific chatflow ID via the Flowise API and returns the response JSON text.

    Args:
        chatflow_id (str): The ID of the Flowise chatflow to be used.
        question (str): The question or prompt to send to the chatflow.

    Returns:
        str: The raw JSON response text from the Flowise API, or an error message if something goes wrong.
    """
    logger = logging.getLogger(__name__)

    # Construct the Flowise API URL for predictions
    url = f"{FLOWISE_API_ENDPOINT.rstrip('/')}/api/v1/prediction/{chatflow_id}"
    headers = {
        "Content-Type": "application/json",
    }
    if FLOWISE_API_KEY:
        headers["Authorization"] = f"Bearer {FLOWISE_API_KEY}"

    payload = {"question": question}
    logger.debug(f"Sending prediction request to {url} with payload: {payload}")

    try:
        # Send POST request to the Flowise API
        response = requests.post(url, json=payload, headers=headers, timeout=30)
        logger.debug(f"Prediction response code: HTTP {response.status_code}")
        # raise_for_status() is skipped here so non-2xx response bodies are returned to the caller as-is

        # Log the raw response text for debugging
        logger.debug(f"Raw prediction response: {response.text}")

        # Return the raw JSON response text
        return response.text

    except Exception as e:  # Catches request failures (timeouts, connection errors) and anything unexpected
        # Log and return an error message
        logger.error(f"Error during prediction: {e}")
        return json.dumps({"error": str(e)})
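
# Illustrative call (the chatflow ID below is a placeholder):
#     raw = flowise_predict("00000000-0000-0000-0000-000000000000", "What is Flowise?")
#     data = json.loads(raw)  # may be {"error": "..."} if the request failed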


def fetch_chatflows() -> list[dict]:
    """
    Fetch the list of chatflows from the Flowise API and apply the configured whitelist/blacklist filters.

    Returns:
        list of dict: Each dict contains the 'id' and 'name' of a chatflow.
                      Returns an empty list if there's an error.
    """
    logger = logging.getLogger(__name__)

    # Construct the Flowise API URL for fetching chatflows
    url = f"{FLOWISE_API_ENDPOINT.rstrip('/')}/api/v1/chatflows"
    headers = {}
    if FLOWISE_API_KEY:
        headers["Authorization"] = f"Bearer {FLOWISE_API_KEY}"

    logger.debug(f"Fetching chatflows from {url}")

    try:
        # Send GET request to the Flowise API
        response = requests.get(url, headers=headers, timeout=30)
        response.raise_for_status()

        # Parse and simplify the response data
        chatflows_data = response.json()
        simplified_chatflows = [{"id": cf["id"], "name": cf["name"]} for cf in chatflows_data]

        logger.debug(f"Fetched chatflows: {simplified_chatflows}")
        return filter_chatflows(simplified_chatflows)
    except Exception as e:  # Catches request failures and JSON parsing errors alike
        # Log and return an empty list on error
        logger.error(f"Error fetching chatflows: {e}")
        return []
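
# Illustrative return shape (placeholder values):
#     fetch_chatflows()  ->  [{"id": "abc123", "name": "Support Bot"}, ...]
# An empty list is returned on any API or parsing error.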


# Configure logging at import time and obtain the module-level logger
DEBUG = os.getenv("DEBUG", "").lower() in ("true", "1", "yes")
logger = setup_logging(debug=DEBUG)

# Log key environment variable values
logger.debug(f"Flowise API Key (redacted): {redact_api_key(FLOWISE_API_KEY)}")
logger.debug(f"Flowise API Endpoint: {FLOWISE_API_ENDPOINT}")

```
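
As a rough sketch of how these utilities compose (not part of the repository; it assumes `FLOWISE_API_ENDPOINT`, and optionally `FLOWISE_API_KEY`, are configured in the environment or a local `.env` file), the snippet below lists the filtered chatflows and sends one prediction through the first of them:

```python
"""Minimal smoke test for mcp_flowise.utils (illustrative only)."""
import json

from mcp_flowise.utils import fetch_chatflows, flowise_predict

chatflows = fetch_chatflows()
print(f"Found {len(chatflows)} chatflow(s) after filtering")

if chatflows:
    first = chatflows[0]
    raw = flowise_predict(first["id"], "Hello from the smoke test")
    try:
        # Pretty-print when the response is valid JSON
        print(json.dumps(json.loads(raw), indent=2))
    except json.JSONDecodeError:
        # Fall back to the raw text if Flowise returned something else
        print(raw)
```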