#
tokens: 33289/50000 40/40 files
lines: off (toggle) GitHub
raw markdown copy
# Directory Structure

```
├── .gitignore
├── .python-version
├── docker-compose.yaml
├── Dockerfile
├── Dockerfile.test
├── pyproject.toml
├── README.md
├── src
│   └── mcp_prefect
│       ├── __init__.py
│       ├── artifacts.py
│       ├── block.py
│       ├── concurrency_limits.py
│       ├── deployment.py
│       ├── enums.py
│       ├── envs.py
│       ├── flow_run.py
│       ├── flow.py
│       ├── health_check.py
│       ├── main.py
│       ├── server.py
│       ├── task_run.py
│       ├── variable.py
│       ├── work_pools.py
│       ├── work_queue.py
│       └── workspace.py
├── test_scripts
│   ├── README.md
│   ├── test_mcp_prefect.sh
│   ├── test_more_prefect.py
│   └── test_prefect.py
├── tests
│   ├── __init__.py
│   ├── conftest.py
│   ├── README.md
│   ├── test_blocks.py
│   ├── test_deployments.py
│   ├── test_flow_runs.py
│   ├── test_flows.py
│   ├── test_health.py
│   ├── test_task_runs.py
│   ├── test_variables.py
│   ├── test_workqueues.py
│   ├── test_workspaces.py
│   └── utils.py
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/.python-version:
--------------------------------------------------------------------------------

```
3.12.9

```

--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------

```
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class

# Virtual environment
venv/
.env/
.venv/
env/

# Distribution / packaging
build/
dist/
*.egg-info/
*.egg

# Install logs
pip-log.txt
pip-delete-this-directory.txt

# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
*.cover
*.py,cover
.hypothesis/
pytest_cache/
nosetests.xml
coverage.xml

# Jupyter Notebook
.ipynb_checkpoints

# mypy
.mypy_cache/
.dmypy.json
dmypy.json

# PyCharm
.idea/

# VSCode
.vscode/
*.code-workspace

# macOS
.DS_Store

# Windows
Thumbs.db
ehthumbs.db
Desktop.ini
$RECYCLE.BIN/

# Logs and databases
*.log
*.sqlite3
*.db

# Docker
docker-compose.override.yml

# Ignore env files with secrets
.envrc
venv*/
.aider*

```

--------------------------------------------------------------------------------
/tests/README.md:
--------------------------------------------------------------------------------

```markdown

Run tests

```
uv pip install -e ".[dev]"
docker compose up
pytest -svvl tests/
...
================================================================================== short test summary info ===================================================================================
FAILED tests/test_flow_runs.py::test_get_flow_run_by_id[asyncio] - BaseExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
FAILED tests/test_flow_runs.py::test_get_flow_run_by_id[trio] - BaseExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
FAILED tests/test_flows.py::test_get_flow_run_by_id[asyncio] - BaseExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
FAILED tests/test_flows.py::test_get_flow_run_by_id[trio] - BaseExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
FAILED tests/test_workqueues.py::test_create_and_delete_work_queue[asyncio] - ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
FAILED tests/test_workqueues.py::test_create_and_delete_work_queue[trio] - ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
================================================================================ 6 failed, 28 passed in 3.33s ================================================================================
```
```

--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------

```markdown
# Prefect MCP Server

A Model Context Protocol (MCP) server implementation for [Prefect](https://www.prefect.io/), allowing AI assistants to interact with Prefect through natural language.

## Features

This MCP server provides access to the following Prefect APIs:

- **Flow Management**: List, get, and delete flows
- **Flow Run Management**: Create, monitor, and control flow runs
- **Deployment Management**: Manage deployments and their schedules
- **Task Run Management**: Monitor and control task runs
- **Work Queue Management**: Create and manage work queues
- **Block Management**: Access block types and documents
- **Variable Management**: Create and manage variables
- **Workspace Management**: Get information about workspaces


## Configuration

Set the following environment variables:

```bash
export PREFECT_API_URL="http://localhost:4200/api"  # URL of your Prefect API
export PREFECT_API_KEY="your_api_key"               # Your Prefect API key (if using Prefect Cloud)
```

## Usage

Run the MCP server, and prefect:

```
docker compose up
```

## Example Input

Once connected, an AI assistant can help users interact with Prefect using natural language. Examples:

- "Show me all my flows"
- "List all failed flow runs from yesterday"
- "Trigger the 'data-processing' deployment"
- "Pause the schedule for the 'daily-reporting' deployment"
- "What's the status of my last ETL flow run?"

## Development

Several of the endpoints have yet to be implemented

### Adding New Functions

To add a new function to an existing API:

1. Add the function to the appropriate module in `src/mcp_prefect`
2. Add the function to the `get_all_functions()` list in the module

To add a new API type:

1. Add the new type to `APIType` in `enums.py`
2. Create a new module in `src/prefect/`
3. Update `main.py` to include the new API type


Example usage:

```
{
  "mcpServers": {
    "mcp-prefect": {
      "command": "mcp-prefect",
      "args": [
        "--transport", "sse"
      ],
      "env": {
        "PYTHONPATH": "/path/to/your/project/directory"
      },
      "cwd": "/path/to/your/project/directory"
    }
  }
}
```
```

--------------------------------------------------------------------------------
/test_scripts/README.md:
--------------------------------------------------------------------------------

```markdown
```
git clone [email protected]:allen-munsch/mcp-prefect.git
cd mcp-prefect
pyenv install 3.12.9
pip install -e .
docker compose up -d
python ./test_scripts/test_prefect.py 
```

```
INFO:prefect-mcp-test:Connecting to Prefect MCP server at http://localhost:8000/sse
INFO:mcp.client.sse:Connecting to SSE endpoint: http://localhost:8000/sse
INFO:httpx:HTTP Request: GET http://localhost:8000/sse "HTTP/1.1 200 OK"
INFO:mcp.client.sse:Received endpoint URL: http://localhost:8000/messages/?session_id=bfce341df9544e3a91daef4183f24710
INFO:mcp.client.sse:Starting post writer with endpoint URL: http://localhost:8000/messages/?session_id=bfce341df9544e3a91daef4183f24710
INFO:prefect-mcp-test:Initializing MCP session...
INFO:httpx:HTTP Request: POST http://localhost:8000/messages/?session_id=bfce341df9544e3a91daef4183f24710 "HTTP/1.1 202 Accepted"
INFO:prefect-mcp-test:Connected to Prefect MCP v1.6.0
INFO:prefect-mcp-test:Protocol version: 2024-11-05
INFO:prefect-mcp-test:Server capabilities: experimental={} logging=None prompts=PromptsCapability(listChanged=False) resources=ResourcesCapability(subscribe=False, listChanged=False) tools=ToolsCapability(listChanged=False)
INFO:prefect-mcp-test:Listing available tools...
INFO:httpx:HTTP Request: POST http://localhost:8000/messages/?session_id=bfce341df9544e3a91daef4183f24710 "HTTP/1.1 202 Accepted"
INFO:httpx:HTTP Request: POST http://localhost:8000/messages/?session_id=bfce341df9544e3a91daef4183f24710 "HTTP/1.1 202 Accepted"
INFO:prefect-mcp-test:Tool: get_flows - Get all flows
INFO:prefect-mcp-test:Tool: get_flow - Get a flow by ID
INFO:prefect-mcp-test:Tool: delete_flow - Delete a flow by ID
INFO:prefect-mcp-test:Tool: get_flow_runs - Get all flow runs
INFO:prefect-mcp-test:Tool: get_flow_run - Get a flow run by ID
INFO:prefect-mcp-test:Tool: get_flow_runs_by_flow - Get flow runs for a specific flow
INFO:prefect-mcp-test:Tool: restart_flow_run - Restart a flow run
INFO:prefect-mcp-test:Tool: cancel_flow_run - Cancel a flow run
INFO:prefect-mcp-test:Tool: delete_flow_run - Delete a flow run
INFO:prefect-mcp-test:Tool: set_flow_run_state - Set a flow run's state
INFO:prefect-mcp-test:Tool: get_deployments - Get all deployments
INFO:prefect-mcp-test:Tool: get_deployment - Get a deployment by ID
INFO:prefect-mcp-test:Tool: create_flow_run_from_deployment - Create a flow run from a deployment
INFO:prefect-mcp-test:Tool: delete_deployment - Delete a deployment
INFO:prefect-mcp-test:Tool: update_deployment - Update a deployment
INFO:prefect-mcp-test:Tool: get_deployment_schedule - Get a deployment's schedule
INFO:prefect-mcp-test:Tool: set_deployment_schedule - Set a deployment's schedule
INFO:prefect-mcp-test:Tool: pause_deployment_schedule - Pause a deployment's schedule
INFO:prefect-mcp-test:Tool: resume_deployment_schedule - Resume a deployment's schedule
INFO:prefect-mcp-test:Tool: get_task_runs - Get all task runs
INFO:prefect-mcp-test:Tool: get_task_run - Get a task run by ID
INFO:prefect-mcp-test:Tool: get_task_runs_by_flow_run - Get task runs for a specific flow run
INFO:prefect-mcp-test:Tool: set_task_run_state - Set a task run's state
INFO:prefect-mcp-test:Tool: get_workspaces - Get all workspaces
INFO:prefect-mcp-test:Tool: get_current_workspace - Get current workspace
INFO:prefect-mcp-test:Tool: get_workspace - Get a workspace by ID
INFO:prefect-mcp-test:Tool: get_workspace_by_handle - Get a workspace by handle
INFO:prefect-mcp-test:Tool: get_block_types - Get all block types
INFO:prefect-mcp-test:Tool: get_block_type - Get a block type by slug
INFO:prefect-mcp-test:Tool: get_block_documents - Get block documents by block type
INFO:prefect-mcp-test:Tool: get_block_document - Get a block document by ID
INFO:prefect-mcp-test:Tool: delete_block_document - Delete a block document
INFO:prefect-mcp-test:Tool: get_variables - Get all variables
INFO:prefect-mcp-test:Tool: get_variable - Get a variable by name
INFO:prefect-mcp-test:Tool: create_variable - Create a variable
INFO:prefect-mcp-test:Tool: update_variable - Update a variable
INFO:prefect-mcp-test:Tool: delete_variable - Delete a variable
INFO:prefect-mcp-test:Tool: get_work_queues - Get all work queues
INFO:prefect-mcp-test:Tool: get_work_queue - Get a work queue by ID
INFO:prefect-mcp-test:Tool: get_work_queue_by_name - Get a work queue by name
INFO:prefect-mcp-test:Tool: create_work_queue - Create a work queue
INFO:prefect-mcp-test:Tool: update_work_queue - Update a work queue
INFO:prefect-mcp-test:Tool: delete_work_queue - Delete a work queue
INFO:prefect-mcp-test:Tool: pause_work_queue - Pause a work queue
INFO:prefect-mcp-test:Tool: resume_work_queue - Resume a work queue
INFO:prefect-mcp-test:Tool: get_health - Get health status
INFO:prefect-mcp-test:Testing get_health tool...
INFO:httpx:HTTP Request: POST http://localhost:8000/messages/?session_id=bfce341df9544e3a91daef4183f24710 "HTTP/1.1 202 Accepted"
INFO:prefect-mcp-test:Health check result: <Response [200 OK]>
INFO:prefect-mcp-test:Testing get_flows tool...
INFO:httpx:HTTP Request: POST http://localhost:8000/messages/?session_id=bfce341df9544e3a91daef4183f24710 "HTTP/1.1 202 Accepted"
INFO:prefect-mcp-test:Flows result: {'flows': []}...
INFO:prefect-mcp-test:Testing get_flows with filter...
INFO:httpx:HTTP Request: POST http://localhost:8000/messages/?session_id=bfce341df9544e3a91daef4183f24710 "HTTP/1.1 202 Accepted"
INFO:prefect-mcp-test:Filtered flows result: Error fetching flows: FlowAsyncClient.read_flows() got an unexpected keyword argument 'name'...
INFO:httpx:HTTP Request: POST http://localhost:8000/messages/?session_id=bfce341df9544e3a91daef4183f24710 "HTTP/1.1 202 Accepted"
INFO:prefect-mcp-test:Skipping get_flow test - no flows available
INFO:prefect-mcp-test:Testing get_deployments tool...
INFO:httpx:HTTP Request: POST http://localhost:8000/messages/?session_id=bfce341df9544e3a91daef4183f24710 "HTTP/1.1 202 Accepted"
INFO:prefect-mcp-test:Deployments result: {'deployments': []}...
INFO:prefect-mcp-test:Testing get_deployments with filter...
INFO:httpx:HTTP Request: POST http://localhost:8000/messages/?session_id=bfce341df9544e3a91daef4183f24710 "HTTP/1.1 202 Accepted"
...
```
```

--------------------------------------------------------------------------------
/src/mcp_prefect/__init__.py:
--------------------------------------------------------------------------------

```python

```

--------------------------------------------------------------------------------
/src/mcp_prefect/artifacts.py:
--------------------------------------------------------------------------------

```python

```

--------------------------------------------------------------------------------
/tests/__init__.py:
--------------------------------------------------------------------------------

```python

```

--------------------------------------------------------------------------------
/src/mcp_prefect/envs.py:
--------------------------------------------------------------------------------

```python
import os

PREFECT_API_URL = os.getenv("PREFECT_API_URL", "http://localhost:4200/api")
PREFECT_API_KEY = os.getenv("PREFECT_API_KEY")
```

--------------------------------------------------------------------------------
/src/mcp_prefect/enums.py:
--------------------------------------------------------------------------------

```python
from enum import Enum


class APIType(str, Enum):
    FLOW = "flow"
    FLOW_RUN = "flow_run"
    DEPLOYMENT = "deployment"
    TASK_RUN = "task_run"
    WORKSPACE = "workspace"
    BLOCK = "block"
    VARIABLE = "variable"
    WORK_QUEUE = "work_queue"

    _MCP_INTERNAL = "_mcp_internal"
```

--------------------------------------------------------------------------------
/Dockerfile:
--------------------------------------------------------------------------------

```dockerfile
FROM python:3.12.9-slim

WORKDIR /app

# Install hatch
RUN pip install --no-cache-dir hatch

# Copy project files
COPY pyproject.toml .
COPY README.md .
COPY src/ ./src/

# Build and install the package
RUN pip install .

EXPOSE 8000

HEALTHCHECK --interval=10s --timeout=5s --start-period=5s --retries=3 \
  CMD curl -f http://localhost:8000/health || exit 1

CMD ["mcp-prefect"]
```

--------------------------------------------------------------------------------
/test_scripts/test_mcp_prefect.sh:
--------------------------------------------------------------------------------

```bash
#!/bin/bash

SERVER="http://127.0.0.1:8000"

# Create initialize request
INIT_REQUEST=$(cat <<EOF
{
  "jsonrpc": "2.0",
  "id": "$(date +%s)",
  "method": "initialize",
  "params": {
    "clientInfo": {"name": "test-client", "version": "1.0.0"},
    "protocolVersion": "1.0.0",
    "capabilities": {"sampling": {}, "tools": {}}
  }
}
EOF
)

# URL encode the request
ENCODED_REQUEST=$(echo "$INIT_REQUEST" | jq -r @uri)

echo "Testing SSE connection with initialize..."
curl -NL -H "Accept: text/event-stream" "${SERVER}/sse?request=${ENCODED_REQUEST}"

```

--------------------------------------------------------------------------------
/src/mcp_prefect/server.py:
--------------------------------------------------------------------------------

```python
from fastapi import Request, Response
from fastapi.responses import JSONResponse
from fastmcp import FastMCP

# Create the FastMCP server
mcp = FastMCP("Prefect MCP", 
              host='0.0.0.0',
              dependencies=[
                  "prefect>=3.2.15",
                  "uvicorn>=0.34.0"
              ])

@mcp.custom_route("/health", methods=["GET"])
async def health_check(request: Request) -> Response:
    return JSONResponse({"status": "ok"})

# Run the server when executed directly
if __name__ == "__main__":
    from .main import main
    main()

```

--------------------------------------------------------------------------------
/tests/test_blocks.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
import logging
import pytest

from .conftest import prefect_client

pytestmark = pytest.mark.anyio

logger = logging.getLogger("prefect-mcp-test")

async def test_get_block_types():
    """Test getting block types."""

    async with prefect_client("get_block_types") as (session, tools):
        logger.info("Testing get_block_types tool...")
        block_types_result = await session.call_tool("get_block_types", {"limit": 5})
        
        # Verify response contains text content
        assert block_types_result.content is not None
        
        for content in block_types_result.content:
            if content.type == "text":
                logger.info(f"Block types result: {content.text[:200]}...")
                assert "block_types" in content.text or "blocks" in content.text
```

--------------------------------------------------------------------------------
/src/mcp_prefect/health_check.py:
--------------------------------------------------------------------------------

```python
from typing import List, Union

import mcp.types as types
from prefect import get_client

from .server import mcp


@mcp.tool
async def get_health() -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
    """
    Get health status of the Prefect server.
    
    Returns:
        Health status information
    """
    try:
        # Test connection to Prefect by calling the health endpoint
        async with get_client() as client:
            health_status = await client.hello()
            
        return [types.TextContent(type="text", text=str(health_status))]
    
    except Exception as e:
        error_status = {
            "status": "unhealthy",
            "message": f"Error connecting to Prefect server: {str(e)}"
        }
        
        return [types.TextContent(type="text", text=str(error_status))]

```

--------------------------------------------------------------------------------
/docker-compose.yaml:
--------------------------------------------------------------------------------

```yaml
version: '3.8'

services:
  # Prefect API server
  prefect-server:
    image: prefecthq/prefect:3.4.19-python3.12
    ports:
      - "4200:4200"
    environment:
      - PREFECT_UI_API_URL=http://localhost:4200/api
      - PREFECT_API_URL=http://localhost:4200/api
      - PREFECT_SERVER_API_HOST=0.0.0.0
    volumes:
      - prefect-data:/root/.prefect
    command: prefect server start

  # Prefect agent for running flows
  prefect-agent:
    image: prefecthq/prefect:3.4.19-python3.12
    depends_on:
    - prefect-server
    environment:
      - PREFECT_API_URL=http://prefect-server:4200/api
    command: prefect agent start -q default
    volumes:
      - ./flows:/opt/prefect/flows
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:4200/api/health"]
      interval: 2s
      timeout: 2s
      retries: 3

  # MCP Prefect server using FastMCP
  mcp-prefect:
    build:
      context: .
      dockerfile: Dockerfile
    depends_on:
    - prefect-agent
    environment:
      - PREFECT_API_URL=http://prefect-server:4200/api
      - PREFECT_API_KEY=
      - MCP_TRANSPORT=sse
      - MCP_PORT=8000
      - PYTHONUNBUFFERED=1
    ports:
      - "8000:8000"
    command: >
      bash -c "python -m pip install -e . && 
              echo 'Starting MCP server...' &&
              python -m mcp_prefect.main --transport sse"

volumes:
  prefect-data:
```

--------------------------------------------------------------------------------
/src/mcp_prefect/main.py:
--------------------------------------------------------------------------------

```python
import click
import logging

from .enums import APIType
from .server import mcp

log = logging.getLogger(__name__)
info = log.info


@click.command()
@click.option(
    "--transport",
    type=click.Choice(["stdio", "sse"]),
    default="stdio",
    help="Transport type",
)
@click.option(
    "--apis",
    type=click.Choice([api.value for api in APIType]),
    default=[api.value for api in APIType],
    multiple=True,
    help="APIs to run, default is all",
)
def main(transport: str, apis: list[str]) -> None:
    # Import modules to register their decorated tools
    if APIType.FLOW.value in apis:
        from . import flow
    if APIType.FLOW_RUN.value in apis:
        from . import flow_run
    if APIType.DEPLOYMENT.value in apis:
        from . import deployment
    if APIType.TASK_RUN.value in apis:
        from . import task_run
    if APIType.WORKSPACE.value in apis:
        from . import workspace
    if APIType.BLOCK.value in apis:
        from . import block
    if APIType.VARIABLE.value in apis:
        from . import variable
    if APIType.WORK_QUEUE.value in apis:
        from . import work_queue
    if APIType._MCP_INTERNAL.value in apis:
        from . import health_check

    # Configure transport and run
    info(f'Starting with transport: {transport}')
    if transport == "sse":
        mcp.run(transport="sse")
    else:
        mcp.run(transport="stdio")


if __name__ == "__main__":
    main()

```

--------------------------------------------------------------------------------
/tests/test_health.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
import json
import logging
import re
from typing import Any, Dict, Optional

import pytest

logger = logging.getLogger("prefect-mcp-test")


def extract_id_from_response(response_text: str, key: str = "id"):
    """
    Extract an ID from a response text using regex pattern.
    
    Args:
        response_text: The text containing the ID
        key: The key associated with the ID (default: "id")
        
    Returns:
        The extracted ID or None if not found
    """
    try:
        # Extract the UUID using regex pattern
        uuid_match = re.search(rf"'{key}': UUID\('([0-9a-f-]+)'\)", response_text)
        if uuid_match:
            return uuid_match.group(1)
        
        # Try another pattern (regular string ID)
        id_match = re.search(rf"'{key}': '([^']+)'", response_text)
        if id_match:
            return id_match.group(1)
            
        return None
    except Exception as e:
        logger.error(f"Error extracting ID: {e}")
        return None


def skip_if_tool_missing(tool_name: str):
    """Decorator to skip a test if a required tool is missing."""
    def decorator(func):
        @pytest.mark.asyncio
        async def wrapper(tool_map, session, *args, **kwargs):
            if tool_name not in tool_map:
                pytest.skip(f"Tool '{tool_name}' not available")
            return await func(tool_map, session, *args, **kwargs)
        return wrapper
    return decorator
```

--------------------------------------------------------------------------------
/tests/utils.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
import json
import logging
import re
from typing import Any, Dict, Optional

import pytest

logger = logging.getLogger("prefect-mcp-test")


def extract_id_from_response(response_text: str, key: str = "id"):
    """
    Extract an ID from a response text using regex pattern.
    
    Args:
        response_text: The text containing the ID
        key: The key associated with the ID (default: "id")
        
    Returns:
        The extracted ID or None if not found
    """
    try:
        # Extract the UUID using regex pattern - handle UUID('...') format
        uuid_match = re.search(rf"'{key}': UUID\('([0-9a-f-]+)'\)", response_text)
        if uuid_match:
            return uuid_match.group(1)
        
        # Try another pattern (regular string ID)
        id_match = re.search(rf"'{key}': '([^']+)'", response_text)
        if id_match:
            return id_match.group(1)
        
        # Try JSON parsing as fallback
        try:
            # Replace single quotes with double quotes for JSON parsing
            json_text = response_text.replace("'", '"')
            # Handle UUID objects in the text
            json_text = re.sub(r'UUID\("([^"]+)"\)', r'"\1"', json_text)
            parsed = json.loads(json_text)
            if isinstance(parsed, dict) and key in parsed:
                return str(parsed[key])
        except (json.JSONDecodeError, KeyError):
            pass
            
        return None
    except Exception as e:
        logger.error(f"Error extracting ID: {e}")
        return None

```

--------------------------------------------------------------------------------
/tests/test_deployments.py:
--------------------------------------------------------------------------------

```python
# test_deployments.py
import asyncio
import logging
import pytest

from .conftest import prefect_client

pytestmark = pytest.mark.anyio

logger = logging.getLogger("prefect-mcp-test")


async def test_get_deployments():
    """Test getting a list of deployments."""
    async with prefect_client("get_deployments") as (session, tools):
        logger.info("Testing get_deployments tool...")
        async with asyncio.timeout(10):
            deployments_result = await session.call_tool("get_deployments", {"limit": 5})
        
        # Verify response contains text content
        assert deployments_result.content is not None
        
        for content in deployments_result.content:
            if content.type == "text":
                logger.info(f"Deployments result: {content.text[:200]}...")
                assert "deployments" in content.text

async def test_get_deployments_with_filter():
    """Test getting deployments with filtering."""
    async with prefect_client("get_deployments") as (session, tools):
        logger.info("Testing get_deployments with filter...")
        async with asyncio.timeout(10):
            filtered_result = await session.call_tool(
                "get_deployments", 
                {"limit": 3, "flow_name": "test"}
            )
        
        # Verify response contains text content
        assert filtered_result.content is not None
        
        for content in filtered_result.content:
            if content.type == "text":
                logger.info(f"Filtered deployments result: {content.text[:200]}...")
                assert "deployments" in content.text
```

--------------------------------------------------------------------------------
/tests/test_task_runs.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
import asyncio
import logging
import pytest
from .conftest import prefect_client

pytestmark = pytest.mark.anyio
logger = logging.getLogger("prefect-mcp-test")

async def test_get_task_runs():
    """Test getting a list of task runs."""
    async with prefect_client("get_task_runs") as (session, tools):
        logger.info("Testing get_task_runs tool...")
        async with asyncio.timeout(10):
            task_runs_result = await session.call_tool("get_task_runs", {"limit": 5})
            
            # Verify response contains text content
            assert task_runs_result.content is not None
            for content in task_runs_result.content:
                if content.type == "text":
                    logger.info(f"Task runs result: {content.text[:200]}...")
                    assert "task_runs" in content.text

async def test_get_task_runs_with_filter():
    """Test getting task runs with filtering."""
    async with prefect_client("get_task_runs") as (session, tools):
        logger.info("Testing get_task_runs with filter...")
        async with asyncio.timeout(10):
            filtered_result = await session.call_tool(
                "get_task_runs", 
                {"limit": 3, "task_name": "test"}
            )
            
            # Verify response contains text content
            assert filtered_result.content is not None
            for content in filtered_result.content:
                if content.type == "text":
                    logger.info(f"Filtered task runs result: {content.text[:200]}...")
                    assert "task_runs" in content.text
```

--------------------------------------------------------------------------------
/tests/test_workspaces.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
import asyncio
import logging
import pytest
from .conftest import prefect_client

pytestmark = pytest.mark.anyio
logger = logging.getLogger("prefect-mcp-test")

async def test_get_workspaces():
    """Test getting workspaces (Cloud-only feature)."""
    async with prefect_client("get_workspaces") as (session, tools):
        logger.info("Testing get_workspaces tool (expect message about Cloud-only)...")
        async with asyncio.timeout(10):
            workspaces_result = await session.call_tool("get_workspaces")
            
            # Verify response contains text content
            assert workspaces_result.content is not None
            for content in workspaces_result.content:
                if content.type == "text":
                    logger.info(f"Workspaces response: {content.text}")
                    # Cloud-only feature - response might indicate it's not available
                    assert content.text

async def test_get_workspaces_with_filter():
    """Test getting workspaces with filtering (Cloud-only feature)."""
    async with prefect_client("get_workspaces") as (session, tools):
        logger.info("Testing get_workspaces with filter (expect message about Cloud-only)...")
        async with asyncio.timeout(10):
            filtered_result = await session.call_tool(
                "get_workspaces", 
                {"name": "test"}
            )
            
            # Verify response contains text content
            assert filtered_result.content is not None
            for content in filtered_result.content:
                if content.type == "text":
                    logger.info(f"Filtered workspaces response: {content.text}")
                    # Cloud-only feature - response might indicate it's not available
                    assert content.text
```

--------------------------------------------------------------------------------
/tests/test_variables.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
import asyncio
import json
import logging
import uuid
import pytest
from .conftest import prefect_client

pytestmark = pytest.mark.anyio
logger = logging.getLogger("prefect-mcp-test")

async def test_get_variables():
    """Test getting variables."""
    async with prefect_client("get_variables") as (session, tools):
        logger.info("Testing get_variables tool...")
        async with asyncio.timeout(10):
            variables_result = await session.call_tool("get_variables", {"limit": 5})
            
            # Verify response contains text content
            assert variables_result.content is not None
            for content in variables_result.content:
                if content.type == "text":
                    logger.info(f"Variables result: {content.text[:200]}...")
                    assert "variables" in content.text

async def test_get_variables_with_filter():
    """Test getting variables with filtering."""
    async with prefect_client("get_variables") as (session, tools):
        logger.info("Testing get_variables with filter...")
        async with asyncio.timeout(10):
            filtered_result = await session.call_tool(
                "get_variables", 
                {"limit": 3, "tag": "test"}
            )
            
            # Verify response contains text content
            assert filtered_result.content is not None
            for content in filtered_result.content:
                if content.type == "text":
                    logger.info(f"Filtered variables result: {content.text[:200]}...")
                    assert "variables" in content.text

async def test_create_and_delete_variable():
    """Test creating and deleting a variable."""
    async with prefect_client("create_variable") as (session, tools):
        # Create a test variable with a unique name
        test_var_name = f"test_var_{uuid.uuid4().hex[:8]}"
        logger.info(f"Testing create_variable with name: {test_var_name}...")
        
        async with asyncio.timeout(10):
            create_result = await session.call_tool("create_variable", {
                "name": test_var_name,
                "value": json.dumps({"test": True, "created_by": "mcp_test"}),
                "tags": ["test", "mcp_test"]
            })
            
            # Verify response contains text content
            assert create_result.content is not None
            variable_created = False
            for content in create_result.content:
                if content.type == "text":
                    logger.info(f"Create variable result: {content.text[:200]}...")
                    variable_created = test_var_name in content.text
                    assert variable_created, "Variable was not created successfully"
            
            # Now try to delete it
            logger.info(f"Testing delete_variable for name: {test_var_name}...")
            delete_result = await session.call_tool("delete_variable", {"name": test_var_name})
            
            # Verify response contains text content
            assert delete_result.content is not None
            variable_deleted = False
            for content in delete_result.content:
                if content.type == "text":
                    logger.info(f"Delete variable result: {content.text}")
                    variable_deleted = "deleted" in content.text.lower() or "success" in content.text.lower()
                    assert variable_deleted, "Variable was not deleted successfully"
```

--------------------------------------------------------------------------------
/tests/test_workqueues.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
import asyncio
import logging
import uuid
import json
import pytest
from .conftest import prefect_client

pytestmark = pytest.mark.anyio
logger = logging.getLogger("prefect-mcp-test")

async def test_get_work_queues():
    """Test getting work queues."""
    async with prefect_client("get_work_queues") as (session, tools):
        logger.info("Testing get_work_queues tool...")
        async with asyncio.timeout(10):
            work_queues_result = await session.call_tool("get_work_queues", {"limit": 5})
            
            # Verify response contains text content
            assert work_queues_result.content is not None
            for content in work_queues_result.content:
                if content.type == "text":
                    logger.info(f"Work queues result: {content.text[:200]}...")
                    assert "work_queues" in content.text

async def test_get_work_queues_with_filter():
    """Test getting work queues with filtering."""
    async with prefect_client("get_work_queues") as (session, tools):
        logger.info("Testing get_work_queues with filter...")
        async with asyncio.timeout(10):
            filtered_result = await session.call_tool(
                "get_work_queues", 
                {"limit": 3, "queue_name": "test"}
            )
            
            # Verify response contains text content
            assert filtered_result.content is not None
            for content in filtered_result.content:
                if content.type == "text":
                    logger.info(f"Filtered work queues result: {content.text[:200]}...")
                    assert "work_queues" in content.text

async def test_create_and_delete_work_queue():
    """Test creating a work queue and verifying it exists."""
    async with prefect_client("create_work_queue") as (session, tools):
        # Create a test work queue with a unique name
        test_queue_name = f"test_queue_{uuid.uuid4().hex[:8]}"
        logger.info(f"Testing create_work_queue with name: {test_queue_name}...")
        
        async with asyncio.timeout(10):
            create_result = await session.call_tool("create_work_queue", {
                "name": test_queue_name,
                "description": "Test work queue created by MCP test"
            })
            
            # Verify response contains text content
            assert create_result.content is not None
            work_queue_id = None
            for content in create_result.content:
                if content.type == "text":
                    logger.info(f"Create work queue result: {content.text[:200]}...")
                    # Use the utility function to extract ID
                    from .utils import extract_id_from_response
                    work_queue_id = extract_id_from_response(content.text, "id")
                    
                    assert work_queue_id, "Work queue ID not found in response"
            
            # Verify the work queue was created by trying to get it
            logger.info(f"Testing get_work_queue for ID: {work_queue_id}...")
            get_result = await session.call_tool("get_work_queue", {"work_queue_id": work_queue_id})
            
            # Verify response contains text content
            assert get_result.content is not None
            queue_found = False
            for content in get_result.content:
                if content.type == "text":
                    logger.info(f"Get work queue result: {content.text[:200]}...")
                    queue_found = work_queue_id in content.text or test_queue_name in content.text
                    assert queue_found, "Work queue was not found after creation"

```

--------------------------------------------------------------------------------
/tests/test_flow_runs.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
import asyncio
import logging
import json
import pytest
from .conftest import prefect_client

pytestmark = pytest.mark.anyio
logger = logging.getLogger("prefect-mcp-test")

async def test_get_flow_runs():
    """Test getting a list of flow runs."""
    async with prefect_client("get_flow_runs") as (session, tools):
        logger.info("Testing get_flow_runs tool...")
        async with asyncio.timeout(10):
            flow_runs_result = await session.call_tool("get_flow_runs", {"limit": 5})
            
            # Verify response contains text content
            assert flow_runs_result.content is not None
            for content in flow_runs_result.content:
                if content.type == "text":
                    logger.info(f"Flow runs result: {content.text[:200]}...")
                    assert "flow_runs" in content.text

async def test_get_flow_runs_with_filter():
    """Test getting flow runs with filtering."""
    async with prefect_client("get_flow_runs") as (session, tools):
        logger.info("Testing get_flow_runs with filter...")
        async with asyncio.timeout(10):
            # Remove flow_name parameter since it's not supported
            filtered_result = await session.call_tool(
                "get_flow_runs", 
                {"limit": 3}
            )
            
            # Verify response contains text content
            assert filtered_result.content is not None
            for content in filtered_result.content:
                if content.type == "text":
                    logger.info(f"Filtered flow runs result: {content.text[:200]}...")
                    assert "flow_runs" in content.text

async def test_get_flow_run_by_id():
    """Test getting a specific flow run by ID."""
    async with prefect_client(["get_flow_runs", "get_flow_run"]) as (session, tools):
        logger.info("Testing get_flow_run tool...")
        async with asyncio.timeout(10):
            # Get a list of flow runs first to get a valid ID
            flow_runs_result = await session.call_tool("get_flow_runs", {"limit": 1})
            
            flow_run_id = None
            # Extract a flow run ID if possible
            for content in flow_runs_result.content:
                if content.type == "text":
                    try:
                        # Use the utility function to extract ID
                        from .utils import extract_id_from_response
                        flow_run_id = extract_id_from_response(content.text, "id")
                        if flow_run_id:
                            break
                        
                        # Fallback to manual parsing
                        parsed = json.loads(content.text.replace("'", '"'))
                        if parsed.get("flow_runs") and len(parsed["flow_runs"]) > 0:
                            flow_run_id = parsed["flow_runs"][0].get("id")
                    except (json.JSONDecodeError, KeyError):
                        pass
            
            # If no flow run ID is found, just log and return (don't skip inside async context)
            if not flow_run_id:
                logger.info("No flow runs available to test get_flow_run - test will pass without validation")
                return
            
            logger.info(f"Testing get_flow_run with ID: {flow_run_id}...")
            flow_run_result = await session.call_tool("get_flow_run", {"flow_run_id": flow_run_id})
            
            # Verify response contains text content
            assert flow_run_result.content is not None
            for content in flow_run_result.content:
                if content.type == "text":
                    logger.info(f"Flow run details result: {content.text[:200]}...")
                    # Verify that the response contains the ID we requested
                    assert flow_run_id in content.text

```

--------------------------------------------------------------------------------
/src/mcp_prefect/workspace.py:
--------------------------------------------------------------------------------

```python
from typing import Any, Dict, List, Optional, Union
from uuid import UUID

import mcp.types as types
from prefect import get_client

from .server import mcp


@mcp.tool
async def get_workspaces(
    limit: Optional[int] = None,
    offset: Optional[int] = None,
    name: Optional[str] = None,
) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
    """
    Get a list of accessible workspaces.
    
    Args:
        limit: Maximum number of workspaces to return
        offset: Number of workspaces to skip
        name: Filter workspaces by name (note: filtering may not be supported by all Prefect versions)
        
    Returns:
        A list of workspaces with their details
    """
    try:
        async with get_client() as client:
            workspaces = await client.read_workspaces(
                limit=limit,
                offset=offset,
            )
            
            workspaces_result = {
                "workspaces": [workspace.dict() for workspace in workspaces]
            }
            
            return [types.TextContent(type="text", text=str(workspaces_result))]
    except Exception as e:
        # For local Prefect instances, workspace APIs may not be available
        return [types.TextContent(
            type="text",
            text="Workspaces are only available in Prefect Cloud. This appears to be a local Prefect instance."
        )]


@mcp.tool
async def get_current_workspace() -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
    """
    Get the current workspace.
    
    Returns:
        Details of the current workspace
    """
    try:
        async with get_client() as client:
            workspace = await client.read_workspace()
            
            return [types.TextContent(type="text", text=str(workspace.dict()))]
    except Exception as e:
        # For local Prefect instances, workspace APIs may not be available
        return [types.TextContent(
            type="text",
            text="Workspaces are only available in Prefect Cloud. This appears to be a local Prefect instance."
        )]


@mcp.tool
async def get_workspace(
    workspace_id: str,
) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
    """
    Get a workspace by ID.
    
    Args:
        workspace_id: The workspace UUID
        
    Returns:
        Workspace details
    """
    try:
        async with get_client() as client:
            workspace = await client.read_workspace_by_id(UUID(workspace_id))
            
            return [types.TextContent(type="text", text=str(workspace.dict()))]
    except Exception as e:
        # For local Prefect instances, workspace APIs may not be available
        return [types.TextContent(
            type="text",
            text="Workspaces are only available in Prefect Cloud. This appears to be a local Prefect instance."
        )]


@mcp.tool
async def get_workspace_by_handle(
    account_handle: str,
    workspace_handle: str,
) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
    """
    Get a workspace by its handle.
    
    Args:
        account_handle: The account handle
        workspace_handle: The workspace handle
        
    Returns:
        Workspace details
    """
    try:
        async with get_client() as client:
            workspace = await client.read_workspace_by_handle(
                account_handle=account_handle,
                workspace_handle=workspace_handle
            )
            
            return [types.TextContent(type="text", text=str(workspace.dict()))]
    except Exception as e:
        # For local Prefect instances, workspace APIs may not be available
        return [types.TextContent(
            type="text",
            text="Workspaces are only available in Prefect Cloud. This appears to be a local Prefect instance."
        )]

```

--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------

```toml
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[project]
name = "mcp-prefect"
version = "0.2.3419"
description = "MCP Server for Prefect"
readme = "README.md"
requires-python = ">=3.12"
license = { text = "MIT" }
authors = [
    { name = "James Munsch", email = "[email protected]" },
]
dependencies = [
    "aiosqlite>=0.21.0",
    "alembic>=1.16.5",
    "annotated-types>=0.7.0",
    "anyio>=4.10.0",
    "apprise>=1.9.4",
    "asgi-lifespan>=2.1.0",
    "asyncpg>=0.30.0",
    "attrs>=25.3.0",
    "authlib>=1.6.4",
    "cachetools>=6.2.0",
    "certifi>=2025.8.3",
    "cffi>=2.0.0",
    "charset-normalizer>=3.4.3",
    "click>=8.3.0",
    "cloudpickle>=3.1.1",
    "colorama>=0.4.6",
    "coolname>=2.2.0",
    "cryptography>=46.0.1",
    "dateparser>=1.2.2",
    "deprecated>=1.2.18",
    "docker>=7.1.0",
    "exceptiongroup>=1.3.0",
    "fastapi>=0.117.1",
    "fastmcp>=2.12.3",
    "fsspec>=2025.9.0",
    "graphviz>=0.21",
    "greenlet>=3.2.4",
    "griffe>=1.14.0",
    "h11>=0.16.0",
    "h2>=4.3.0",
    "hpack>=4.1.0",
    "httpcore>=1.0.9",
    "httpx>=0.28.1",
    "httpx-sse>=0.4.1",
    "humanize>=4.13.0",
    "hyperframe>=6.1.0",
    "idna>=3.10",
    "importlib-metadata>=8.7.0",
    "jinja2>=3.1.6",
    "jinja2-humanize-extension>=0.4.0",
    "jsonpatch>=1.33",
    "jsonpointer>=3.0.0",
    "jsonschema>=4.25.1",
    "jsonschema-specifications>=2025.9.1",
    "mako>=1.3.10",
    "markdown>=3.9",
    "markdown-it-py>=4.0.0",
    "markupsafe>=3.0.2",
    "mcp>=1.14.1",
    "mdurl>=0.1.2",
    "oauthlib>=3.3.1",
    "openapi-pydantic>=0.5.1",
    "opentelemetry-api>=1.37.0",
    "orjson>=3.11.3",
    "packaging>=25.0",
    "pathspec>=0.12.1",
    "pendulum>=3.1.0",
    "pip>=24.2",
    "prefect>=3.4.19",
    "prometheus-client>=0.23.1",
    "pycparser>=2.23",
    "pydantic>=2.11.9",
    "pydantic-core>=2.33.2",
    "pydantic-extra-types>=2.10.5",
    "pydantic-settings>=2.10.1",
    "pygments>=2.19.2",
    "pyproject-freeze>=0.1.1",
    "python-dateutil>=2.9.0.post0",
    "python-dotenv>=1.1.1",
    "python-multipart>=0.0.20",
    "python-slugify>=8.0.4",
    "python-socks>=2.7.2",
    "pytz>=2025.2",
    "pyyaml>=6.0.2",
    "readchar>=4.2.1",
    "referencing>=0.36.2",
    "regex>=2025.9.18",
    "requests>=2.32.5",
    "requests-oauthlib>=2.0.0",
    "rfc3339-validator>=0.1.4",
    "rich>=14.1.0",
    "rpds-py>=0.27.1",
    "ruamel-yaml>=0.18.15",
    "ruamel-yaml-clib>=0.2.12",
    "shellingham>=1.5.4",
    "six>=1.17.0",
    "sniffio>=1.3.1",
    "sqlalchemy>=2.0.43",
    "sse-starlette>=3.0.2",
    "starlette>=0.48.0",
    "text-unidecode>=1.3",
    "time-machine>=2.19.0",
    "toml>=0.10.2",
    "tomlkit>=0.13.2",
    "typer>=0.17.4",
    "typing-extensions>=4.15.0",
    "typing-inspection>=0.4.1",
    "tzdata>=2025.2",
    "tzlocal>=5.3.1",
    "ujson>=5.11.0",
    "urllib3>=2.5.0",
    "uv>=0.8.19",
    "uvicorn>=0.36.0",
    "websockets>=15.0.1",
    "wrapt>=1.17.3",
    "zipp>=3.23.0",
]

[project.scripts]
mcp-prefect = "mcp_prefect.main:main"

[project.optional-dependencies]
dev = [
    "pytest>=7.0.0",
    "pytest-asyncio>=0.21.1",
    "black>=23.0.0",
    "isort>=5.0.0",
    "mypy>=1.0.0",
]

[tool.hatch.build.targets.wheel]
packages = ["src/mcp_prefect"]

[tool.hatch.metadata]
allow-direct-references = true

[tool.hatch.envs.default]
dependencies = [
    "pytest>=7.0.0",
    "black>=23.0.0",
    "isort>=5.0.0",
    "mypy>=1.0.0",
]

[tool.hatch.envs.default.scripts]
test = "pytest {args:tests}"
lint = "black src tests && isort src tests"
typecheck = "mypy src"

[tool.pytest.ini_options]
asyncio_mode = "auto"
testpaths = ["tests"]
python_files = "test_*.py"
python_classes = "Test*"
python_functions = "test_*"
log_cli = true
log_cli_level = "INFO"

[tool.black]
line-length = 100

[tool.isort]
profile = "black"
line_length = 100

[tool.mypy]
python_version = "3.9"
warn_return_any = true
warn_unused_configs = true
disallow_untyped_defs = true
disallow_incomplete_defs = true

```

--------------------------------------------------------------------------------
/src/mcp_prefect/block.py:
--------------------------------------------------------------------------------

```python
from typing import Any, Dict, List, Optional, Union
from uuid import UUID

import mcp.types as types
from prefect import get_client

from .server import mcp


@mcp.tool
async def get_block_types(
    limit: Optional[int] = None,
    offset: Optional[int] = None,
    slug: Optional[str] = None,
) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
    """
    Get a list of block types with optional filtering.
    
    Args:
        limit: Maximum number of block types to return
        offset: Number of block types to skip
        slug: Filter by slug pattern
        
    Returns:
        A list of block types with their details
    """
    async with get_client() as client:
        # Build filter parameters
        filters = {}
        if slug:
            filters["slug"] = {"like_": f"%{slug}%"}
        
        block_types = await client.read_block_types(
            # 
            # limit=limit,
            # offset=offset,
            # **filters
        )
        
        block_types_result = {
            "block_types": [block_type.model_dump() for block_type in block_types]
        }
        
        return [types.TextContent(type="text", text=str(block_types_result))]


@mcp.tool
async def get_block_type(
    slug: str,
) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
    """
    Get a block type by slug.
    
    Args:
        slug: The block type slug
        
    Returns:
        Block type details
    """
    async with get_client() as client:
        block_type = await client.read_block_type_by_slug(slug)
        
        return [types.TextContent(type="text", text=str(block_type.model_dump()))]


@mcp.tool
async def get_block_documents(
    block_type_slug: str,
    limit: Optional[int] = None,
    offset: Optional[int] = None,
    name: Optional[str] = None,
) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
    """
    Get block documents by block type.
    
    Args:
        block_type_slug: The block type slug
        limit: Maximum number of block documents to return
        offset: Number of block documents to skip
        name: Filter by name pattern
        
    Returns:
        A list of block documents with their details
    """
    async with get_client() as client:
        # Build filter parameters
        filters = {}
        if block_type_slug:
            filters["block_type_slug"] = {"eq_": block_type_slug}
        if name:
            filters["name"] = {"like_": f"%{name}%"}
        
        block_documents = await client.read_block_documents(
            limit=limit,
            offset=offset,
            **filters
        )
        
        block_documents_result = {
            "block_documents": [block_doc.model_dump() for block_doc in block_documents]
        }
        
        return [types.TextContent(type="text", text=str(block_documents_result))]


@mcp.tool
async def get_block_document(
    block_document_id: str,
) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
    """
    Get a block document by ID.
    
    Args:
        block_document_id: The block document UUID
        
    Returns:
        Block document details
    """
    async with get_client() as client:
        block_document = await client.read_block_document(UUID(block_document_id))
        
        return [types.TextContent(type="text", text=str(block_document.model_dump()))]


@mcp.tool
async def delete_block_document(
    block_document_id: str,
) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
    """
    Delete a block document by ID.
    
    Args:
        block_document_id: The block document UUID
        
    Returns:
        Confirmation message
    """
    async with get_client() as client:
        await client.delete_block_document(UUID(block_document_id))
        
        return [types.TextContent(type="text", text=f"Block document '{block_document_id}' deleted successfully.")]

```

--------------------------------------------------------------------------------
/tests/test_flows.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
import asyncio
import json
import logging
import uuid
import pytest
from .conftest import prefect_client

pytestmark = pytest.mark.anyio
logger = logging.getLogger("prefect-mcp-test")


async def wait_for_flow_run(session, flow_run_id, max_attempts=10, delay=1):
    """
    Poll for flow run details, waiting for it to be in a retrievable state.
    
    :param session: The MCP client session
    :param flow_run_id: ID of the flow run to retrieve
    :param max_attempts: Maximum number of polling attempts
    :param delay: Delay between attempts in seconds
    :return: Flow run details or None if not found
    """
    for attempt in range(max_attempts):
        try:
            flow_run_result = await session.call_tool("get_flow_run", {"flow_run_id": flow_run_id})
            
            # Check the response content
            for content in flow_run_result.content:
                if content.type == "text":
                    # Try to parse the response
                    try:
                        parsed = json.loads(content.text.replace("'", '"'))
                        # Add more sophisticated state checking if needed
                        if parsed and parsed.get('id') == flow_run_id:
                            return parsed
                    except (json.JSONDecodeError, KeyError):
                        pass
            
            # If we didn't return, wait and continue
            await asyncio.sleep(delay)
        
        except Exception as e:
            logger.warning(f"Attempt {attempt + 1} failed: {e}")
            await asyncio.sleep(delay)
    
    return None

async def test_get_flow_run_by_id():
    """Test getting flow runs from existing flows and deployments."""
    async with prefect_client(["get_flow_runs", "get_flow_run"]) as (session, tools):
        logger.info("Testing get_flow_runs and get_flow_run...")
        
        async with asyncio.timeout(30):
            # First, get existing flow runs
            flow_runs_result = await session.call_tool("get_flow_runs", {"limit": 5})
            
            # Verify response contains text content
            assert flow_runs_result.content is not None
            flow_run_id = None
            
            for content in flow_runs_result.content:
                if content.type == "text":
                    logger.info(f"Flow runs result: {content.text[:200]}...")
                    assert "flow_runs" in content.text
                    
                    # Try to extract a flow run ID from the response
                    from .utils import extract_id_from_response
                    flow_run_id = extract_id_from_response(content.text, "id")
                    if flow_run_id:
                        break
            
            if flow_run_id:
                # Test getting a specific flow run
                logger.info(f"Testing get_flow_run for ID: {flow_run_id}...")
                flow_run_result = await session.call_tool("get_flow_run", {"flow_run_id": flow_run_id})
                
                # Verify response contains text content
                assert flow_run_result.content is not None
                for content in flow_run_result.content:
                    if content.type == "text":
                        logger.info(f"Flow run result: {content.text[:200]}...")
                        assert flow_run_id in content.text
            else:
                logger.info("No flow runs available to test get_flow_run with")

            # Test filtered flow runs (without flow_name since it's not supported)
            async with asyncio.timeout(10):
                filtered_result = await session.call_tool(
                    "get_flow_runs", 
                    {"limit": 3}
                )
                
                # Verify response contains text content
                assert filtered_result.content is not None
                for content in filtered_result.content:
                    if content.type == "text":
                        logger.info(f"Filtered flow runs result: {content.text[:200]}...")
                        assert "flow_runs" in content.text

```

--------------------------------------------------------------------------------
/src/mcp_prefect/variable.py:
--------------------------------------------------------------------------------

```python
from typing import Any, Dict, List, Optional, Union
import json

import mcp.types as types
from prefect import get_client

from .server import mcp


@mcp.tool
async def get_variables(
    limit: Optional[int] = None,
    offset: Optional[int] = None,
    name: Optional[str] = None,
) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
    """
    Get a list of variables with optional filtering.
    
    Args:
        limit: Maximum number of variables to return
        offset: Number of variables to skip
        name: Filter by name pattern
        
    Returns:
        A list of variables with their details
    """
    async with get_client() as client:
        # Build filter parameters
        filters = {}
        if name:
            filters["name"] = {"like_": f"%{name}%"}
        
        variables = await client.read_variables(
            limit=limit,
            # prefect 3.3.3 doesn't have these
            # offset=offset,
            # **filters
        )
        
        variables_result = {
            "variables": [variable.model_dump() for variable in variables]
        }
        
        return [types.TextContent(type="text", text=str(variables_result))]


@mcp.tool
async def get_variable(
    name: str,
) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
    """
    Get a variable by name.
    
    Args:
        name: The variable name
        
    Returns:
        Variable details
    """
    async with get_client() as client:
        variable = await client.read_variable(name)
        
        return [types.TextContent(type="text", text=str(variable.model_dump()))]


@mcp.tool
async def create_variable(
    name: str,
    value: Any,  # Change type to Any to support different value types
    tags: Optional[List[str]] = None,
) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
    """
    Create a variable.
    
    Args:
        name: The variable name
        value: The variable value (can be string, dict, list, etc.)
        tags: Optional tags
        
    Returns:
        Details of the created variable
    """
    try:
        async with get_client() as client:
            # Import the VariableCreate model
            from prefect.client.schemas.actions import VariableCreate
            
            # Create the proper variable object
            variable_create = VariableCreate(
                name=name,
                value=value,  # Pass value directly, no parsing needed
                tags=tags or []
            )
            
            # Use the variable object with the client
            variable = await client.create_variable(variable=variable_create)
            
            variable_result = {"variable": variable.model_dump()}
            return [types.TextContent(type="text", text=str(variable_result))]
    except Exception as e:
        return [types.TextContent(type="text", text=str({"error": str(e)}))]


@mcp.tool
async def update_variable(
    name: str,
    value: Optional[str] = None,
    tags: Optional[List[str]] = None,
) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
    """
    Update a variable.
    
    Args:
        name: The variable name
        value: New value
        tags: New tags
        
    Returns:
        Details of the updated variable
    """
    async with get_client() as client:
        # Prepare update data
        update_data = {}
        if value is not None:
            # Parse value if it's a valid JSON
            try:
                parsed_value = json.loads(value)
            except json.JSONDecodeError:
                # If it's not valid JSON, use the string as-is
                parsed_value = value
                
            update_data["value"] = parsed_value
        if tags is not None:
            update_data["tags"] = tags
        
        updated_variable = await client.update_variable(
            name=name,
            **update_data
        )
        
        return [types.TextContent(type="text", text=str(updated_variable.model_dump()))]


@mcp.tool
async def delete_variable(
    name: str,
) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
    """
    Delete a variable by name.
    
    Args:
        name: The variable name
        
    Returns:
        Confirmation message
    """
    async with get_client() as client:
        await client.delete_variable_by_name(name)
        
        return [types.TextContent(type="text", text=f"Variable '{name}' deleted successfully.")]

```

--------------------------------------------------------------------------------
/src/mcp_prefect/flow.py:
--------------------------------------------------------------------------------

```python
from typing import Any, Dict, List, Optional, Union
from uuid import UUID

import mcp.types as types
from prefect import get_client

from .envs import PREFECT_API_URL
from .server import mcp


def get_flow_url(flow_id: str) -> str:
    """Generate a UI URL for a flow."""
    base_url = PREFECT_API_URL.replace("/api", "")
    return f"{base_url}/flows/{flow_id}"


@mcp.tool
async def get_flows(
    limit: Optional[int] = None,
    offset: Optional[int] = None,
    flow_name: Optional[str] = None,
    tags: Optional[List[str]] = None,
    created_after: Optional[str] = None,
    created_before: Optional[str] = None,
) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
    """
    Get a list of flows with optional filtering.
    
    Args:
        limit: Maximum number of flows to return
        offset: Number of flows to skip
        flow_name: Filter flows by name
        tags: Filter flows by tags
        created_after: ISO formatted datetime string for filtering flows created after this time
        created_before: ISO formatted datetime string for filtering flows created before this time
        
    Returns:
        A list of flows with their details
    """
    try:
        async with get_client() as client:
            # Build flow filter
            flow_filter = None
            if any([flow_name, tags, created_after, created_before]):
                from prefect.client.schemas.filters import FlowFilter
                
                filter_dict = {}
                if flow_name:
                    filter_dict["name"] = {"like_": f"%{flow_name}%"}
                if tags:
                    filter_dict["tags"] = {"all_": tags}
                
                # Handle date filters
                if created_after or created_before:
                    created_filters = {}
                    if created_after:
                        created_filters["ge_"] = created_after
                    if created_before:
                        created_filters["le_"] = created_before
                    filter_dict["created"] = created_filters
                
                flow_filter = FlowFilter(**filter_dict)
            
            # Query using proper filter object
            flows = await client.read_flows(
                flow_filter=flow_filter,
                limit=limit,
                offset=offset,
            )
            
            # Handle empty results
            if not flows:
                return [types.TextContent(type="text", text=str({"flows": []}))]
            
            # Add UI links to each flow
            flows_with_links = []
            for flow in flows:
                flow_dict = flow.model_dump()
                flow_dict["ui_url"] = get_flow_url(str(flow.id))
                flows_with_links.append(flow_dict)
                
            flows_result = {"flows": flows_with_links}
            
            return [types.TextContent(type="text", text=str(flows_result))]        
    except Exception as e:
        error_message = f"Error fetching flows: {str(e)}"
        return [types.TextContent(type="text", text=error_message)]


@mcp.tool
async def get_flow(
    flow_id: str,
) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
    """
    Get details of a specific flow by ID.
    
    Args:
        flow_id: The flow UUID
        
    Returns:
        Flow details
    """
    try:
        async with get_client() as client:
            # Validate flow_id
            try:
                flow_uuid = UUID(flow_id)
            except ValueError:
                return [types.TextContent(
                    type="text", 
                    text=f"Invalid flow ID format: {flow_id}. Must be a valid UUID."
                )]
            
            flow = await client.read_flow(flow_uuid)
            
            # Add UI link
            flow_dict = flow.model_dump()
            flow_dict["ui_url"] = get_flow_url(flow_id)
            
            return [types.TextContent(type="text", text=str(flow_dict))]
    
    except Exception as e:
        error_message = f"Error fetching flow {flow_id}: {str(e)}"
        return [types.TextContent(type="text", text=error_message)]


@mcp.tool
async def delete_flow(
    flow_id: str,
) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
    """
    Delete a flow by ID.
    
    Args:
        flow_id: The flow UUID
        
    Returns:
        Confirmation message
    """
    try:
        async with get_client() as client:
            # Validate flow_id
            try:
                flow_uuid = UUID(flow_id)
            except ValueError:
                return [types.TextContent(
                    type="text", 
                    text=f"Invalid flow ID format: {flow_id}. Must be a valid UUID."
                )]
            
            await client.delete_flow(flow_uuid)
            
            return [types.TextContent(type="text", text=f"Flow '{flow_id}' deleted successfully.")]
    
    except Exception as e:
        error_message = f"Error deleting flow {flow_id}: {str(e)}"
        return [types.TextContent(type="text", text=error_message)]

```

--------------------------------------------------------------------------------
/src/mcp_prefect/work_queue.py:
--------------------------------------------------------------------------------

```python
from typing import Any, Dict, List, Optional, Union
from uuid import UUID

import mcp.types as types
from prefect import get_client

from .server import mcp


@mcp.tool
async def get_work_queues(
    limit: Optional[int] = None,
    offset: Optional[int] = None,
    name: Optional[str] = None,
    is_paused: Optional[bool] = None,
) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
    """
    Get a list of work queues with optional filtering.
    
    Args:
        limit: Maximum number of work queues to return
        offset: Number of work queues to skip
        name: Filter by name
        is_paused: Filter by paused status
        
    Returns:
        A list of work queues with their details
    """
    async with get_client() as client:
        # Build filter parameters
        filters = {}
        if name:
            filters["name"] = {"like_": f"%{name}%"}
        if is_paused is not None:
            filters["is_paused"] = {"eq_": is_paused}
        
        work_queues = await client.read_work_queues(
            limit=limit,
            offset=offset,
            **filters
        )
        
        work_queues_result = {
            "work_queues": [work_queue.dict() for work_queue in work_queues]
        }
        
        return [types.TextContent(type="text", text=str(work_queues_result))]


@mcp.tool
async def get_work_queue(
    work_queue_id: str,
) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
    """
    Get details of a specific work queue by ID.
    
    Args:
        work_queue_id: The work queue UUID
        
    Returns:
        Work queue details
    """
    async with get_client() as client:
        work_queue = await client.read_work_queue(UUID(work_queue_id))
        
        return [types.TextContent(type="text", text=str(work_queue.dict()))]


@mcp.tool
async def get_work_queue_by_name(
    name: str,
) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
    """
    Get a work queue by name.
    
    Args:
        name: The work queue name
        
    Returns:
        Work queue details
    """
    async with get_client() as client:
        work_queue = await client.read_work_queue_by_name(name)
        
        return [types.TextContent(type="text", text=str(work_queue.dict()))]


@mcp.tool
async def create_work_queue(
    name: str,
    description: Optional[str] = None,
    is_paused: Optional[bool] = None,
    concurrency_limit: Optional[int] = None,
) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
    """
    Create a work queue.
    
    Args:
        name: The name for the work queue
        description: Optional description
        is_paused: Whether the queue should be paused upon creation
        concurrency_limit: Optional concurrency limit
        
    Returns:
        Details of the created work queue
    """
    async with get_client() as client:
        work_queue = await client.create_work_queue(
            name=name,
            description=description,
            is_paused=is_paused,
            concurrency_limit=concurrency_limit,
        )
        
        return [types.TextContent(type="text", text=str(work_queue.dict()))]


@mcp.tool
async def update_work_queue(
    work_queue_id: str,
    name: Optional[str] = None,
    description: Optional[str] = None,
    is_paused: Optional[bool] = None,
    concurrency_limit: Optional[int] = None,
) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
    """
    Update a work queue.
    
    Args:
        work_queue_id: The work queue UUID
        name: New name
        description: New description
        is_paused: New paused status
        concurrency_limit: New concurrency limit
        
    Returns:
        Details of the updated work queue
    """
    async with get_client() as client:
        # Prepare update data
        update_data = {}
        if name is not None:
            update_data["name"] = name
        if description is not None:
            update_data["description"] = description
        if is_paused is not None:
            update_data["is_paused"] = is_paused
        if concurrency_limit is not None:
            update_data["concurrency_limit"] = concurrency_limit
        
        work_queue = await client.update_work_queue(
            id=UUID(work_queue_id),
            **update_data
        )
        
        return [types.TextContent(type="text", text=str(work_queue.dict()))]


@mcp.tool
async def delete_work_queue(
    work_queue_id: str,
) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
    """
    Delete a work queue by ID.
    
    Args:
        work_queue_id: The work queue UUID
        
    Returns:
        Confirmation message
    """
    async with get_client() as client:
        await client.delete_work_queue(UUID(work_queue_id))
        
        return [types.TextContent(type="text", text=f"Work queue '{work_queue_id}' deleted successfully.")]


@mcp.tool
async def pause_work_queue(
    work_queue_id: str,
) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
    """
    Pause a work queue.
    
    Args:
        work_queue_id: The work queue UUID
        
    Returns:
        Details of the updated work queue
    """
    async with get_client() as client:
        work_queue = await client.update_work_queue(
            id=UUID(work_queue_id),
            is_paused=True
        )
        
        return [types.TextContent(type="text", text=str(work_queue.dict()))]


@mcp.tool
async def resume_work_queue(
    work_queue_id: str,
) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
    """
    Resume a work queue.
    
    Args:
        work_queue_id: The work queue UUID
        
    Returns:
        Details of the updated work queue
    """
    async with get_client() as client:
        work_queue = await client.update_work_queue(
            id=UUID(work_queue_id),
            is_paused=False
        )
        
        return [types.TextContent(type="text", text=str(work_queue.dict()))]

```

--------------------------------------------------------------------------------
/src/mcp_prefect/task_run.py:
--------------------------------------------------------------------------------

```python
from typing import Any, Dict, List, Optional, Union
from uuid import UUID

import mcp.types as types
from prefect import get_client
from prefect.states import Cancelled, Completed, Failed, Pending, Running, Scheduled

from .envs import PREFECT_API_URL
from .server import mcp


def get_task_run_url(task_run_id: str) -> str:
    base_url = PREFECT_API_URL.replace("/api", "")
    return f"{base_url}/task-runs/{task_run_id}"


@mcp.tool
async def get_task_runs(
    limit: Optional[int] = None,
    offset: Optional[int] = None,
    task_name: Optional[str] = None,
    state_type: Optional[str] = None,
    state_name: Optional[str] = None,
    tags: Optional[List[str]] = None,
    start_time_before: Optional[str] = None,
    start_time_after: Optional[str] = None,
) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
    """
    Get a list of task runs with optional filtering.
    
    Args:
        limit: Maximum number of task runs to return
        offset: Number of task runs to skip
        task_name: Filter by task name
        state_type: Filter by state type (e.g., "RUNNING", "COMPLETED", "FAILED")
        state_name: Filter by state name
        tags: Filter by tags
        start_time_before: ISO formatted datetime string
        start_time_after: ISO formatted datetime string
        
    Returns:
        A list of task runs with their details
    """
    async with get_client() as client:
        # Build filter parameters
        filters = {}
        if task_name:
            filters["task_name"] = {"like_": f"%{task_name}%"}
        if state_type:
            filters["state"] = {"type": {"any_": [state_type.upper()]}}
        if state_name:
            filters["state"] = {"name": {"any_": [state_name]}}
        if tags:
            filters["tags"] = {"all_": tags}
        if start_time_after:
            filters["start_time"] = {"ge_": start_time_after}
        if start_time_before:
            if "start_time" in filters:
                filters["start_time"]["le_"] = start_time_before
            else:
                filters["start_time"] = {"le_": start_time_before}
        
        task_runs = await client.read_task_runs(
            limit=limit,
            offset=offset,
            **filters
        )
        
        # Add UI links to each task run
        task_runs_result = {
            "task_runs": [
                {
                    **task_run.dict(),
                    "ui_url": get_task_run_url(str(task_run.id))
                }
                for task_run in task_runs
            ]
        }
        
        return [types.TextContent(type="text", text=str(task_runs_result))]


@mcp.tool
async def get_task_run(
    task_run_id: str,
) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
    """
    Get details of a specific task run by ID.
    
    Args:
        task_run_id: The task run UUID
        
    Returns:
        Task run details
    """
    async with get_client() as client:
        task_run = await client.read_task_run(UUID(task_run_id))
        
        # Add UI link
        task_run_dict = task_run.dict()
        task_run_dict["ui_url"] = get_task_run_url(task_run_id)
        
        return [types.TextContent(type="text", text=str(task_run_dict))]


@mcp.tool
async def get_task_runs_by_flow_run(
    flow_run_id: str,
    limit: Optional[int] = None,
    offset: Optional[int] = None,
    state_type: Optional[str] = None,
) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
    """
    Get task runs for a specific flow run.
    
    Args:
        flow_run_id: The flow run UUID
        limit: Maximum number of task runs to return
        offset: Number of task runs to skip
        state_type: Filter by state type (e.g., "RUNNING", "COMPLETED", "FAILED")
        
    Returns:
        A list of task runs for the specified flow run
    """
    async with get_client() as client:
        # Build filter parameters
        filters = {"flow_run_id": {"eq_": UUID(flow_run_id)}}
        if state_type:
            filters["state"] = {"type": {"any_": [state_type.upper()]}}
        
        task_runs = await client.read_task_runs(
            limit=limit,
            offset=offset,
            **filters
        )
        
        # Add UI links to each task run
        task_runs_result = {
            "task_runs": [
                {
                    **task_run.dict(),
                    "ui_url": get_task_run_url(str(task_run.id))
                }
                for task_run in task_runs
            ]
        }
        
        return [types.TextContent(type="text", text=str(task_runs_result))]


@mcp.tool
async def set_task_run_state(
    task_run_id: str,
    state: str,
    message: Optional[str] = None,
) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
    """
    Set a task run's state.
    
    Args:
        task_run_id: The task run UUID
        state: The new state to set (e.g., "SCHEDULED", "RUNNING", "COMPLETED", "FAILED")
        message: An optional message explaining the state change
        
    Returns:
        Result of the state change operation
    """
    async with get_client() as client:
        state_obj = None
        if state.upper() == "SCHEDULED":
            state_obj = Scheduled(message=message)
        elif state.upper() == "RUNNING":
            state_obj = Running(message=message)
        elif state.upper() == "COMPLETED":
            state_obj = Completed(message=message)
        elif state.upper() == "FAILED":
            state_obj = Failed(message=message)
        elif state.upper() == "PENDING":
            state_obj = Pending(message=message)
        elif state.upper() == "CANCELLED":
            state_obj = Cancelled(message=message)
        else:
            return [types.TextContent(
                type="text", 
                text=f"Invalid state '{state}'. Must be one of: SCHEDULED, RUNNING, COMPLETED, FAILED, PENDING, CANCELLED"
            )]
        
        result = await client.set_task_run_state(
            task_run_id=UUID(task_run_id),
            state=state_obj
        )
        
        return [types.TextContent(type="text", text=str(result.dict()))]

```

--------------------------------------------------------------------------------
/test_scripts/test_more_prefect.py:
--------------------------------------------------------------------------------

```python
import asyncio
import json
import os
import time
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Union

import httpx
from mcp.client import Client as MCPClient
from mcp.types import TextContent


async def get_flow_run_status(client, flow_run_id: str):
    """Get the status of a flow run."""
    result = await client.call_tool("get_flow_run", arguments={"flow_run_id": flow_run_id})
    content = result.get("content", [])
    if content:
        text = content[0].get("text", "")
        data = eval(text)
        return data.get("state", {}).get("type", "UNKNOWN")
    return "UNKNOWN"


async def wait_for_flow_run_completion(client, flow_run_id: str, timeout_seconds: int = 300):
    """Wait for a flow run to complete, with timeout."""
    start_time = time.time()
    status = await get_flow_run_status(client, flow_run_id)
    
    print(f"Initial flow run status: {status}")
    
    while status not in ["COMPLETED", "FAILED", "CANCELLED"] and (time.time() - start_time) < timeout_seconds:
        print(f"Waiting for flow run to complete. Current status: {status}")
        await asyncio.sleep(5)
        status = await get_flow_run_status(client, flow_run_id)
    
    if (time.time() - start_time) >= timeout_seconds:
        print(f"Timeout waiting for flow run completion. Last status: {status}")
        return False
    
    print(f"Flow run completed with status: {status}")
    return status == "COMPLETED"


async def run_prefect_workflow():
    """Run a complete Prefect workflow via MCP."""
    print("Starting Prefect MCP workflow example...")
    
    # Configure MCP client to connect to the server
    mcp_url = os.environ.get("MCP_URL", "http://localhost:8000")
    client = MCPClient(url=mcp_url)
    
    try:
        # Initialize connection
        print("Connecting to MCP server...")
        await client.initialize()
        print("Connected successfully!")
        
        # 1. Find a deployment to work with
        print("\nLooking for available deployments...")
        deployments_result = await client.call_tool("get_deployments")
        deployments_content = deployments_result.get("content", [])
        
        if not deployments_content:
            print("No deployments found. Please create a deployment in Prefect first.")
            return
            
        deployments_text = deployments_content[0].get("text", "")
        deployments_data = eval(deployments_text)
        deployments = deployments_data.get('deployments', [])
        
        if not deployments:
            print("No deployments found. Please create a deployment in Prefect first.")
            return
        
        selected_deployment = deployments[0]
        deployment_id = selected_deployment.get('id')
        deployment_name = selected_deployment.get('name')
        print(f"Selected deployment: {deployment_name} (ID: {deployment_id})")
        
        # 2. Create a flow run from the deployment
        print(f"\nCreating a flow run from deployment {deployment_name}...")
        create_result = await client.call_tool(
            "create_flow_run_from_deployment", 
            arguments={
                "deployment_id": str(deployment_id),
                "name": f"MCP Test Flow Run {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
                "tags": ["mcp-test", "automated"]
            }
        )
        create_content = create_result.get("content", [])
        
        if not create_content:
            print("Failed to create flow run.")
            return
            
        create_text = create_content[0].get("text", "")
        flow_run_data = eval(create_text)
        flow_run_id = flow_run_data.get('id')
        flow_run_url = flow_run_data.get('ui_url')
        
        print(f"Created flow run with ID: {flow_run_id}")
        print(f"Flow run URL: {flow_run_url}")
        
        # 3. Wait for the flow run to complete
        print("\nWaiting for flow run to complete...")
        completed = await wait_for_flow_run_completion(client, str(flow_run_id))
        
        if not completed:
            print("Flow run did not complete successfully within the timeout period.")
            return
        
        # 4. Get task runs for the flow run
        print("\nRetrieving task runs for the completed flow run...")
        task_runs_result = await client.call_tool(
            "get_task_runs_by_flow_run", 
            arguments={"flow_run_id": str(flow_run_id)}
        )
        task_runs_content = task_runs_result.get("content", [])
        
        if not task_runs_content:
            print("No task runs found for this flow run.")
            return
            
        task_runs_text = task_runs_content[0].get("text", "")
        task_runs_data = eval(task_runs_text)
        task_runs = task_runs_data.get('task_runs', [])
        
        print(f"Found {len(task_runs)} task runs for flow run {flow_run_id}")
        
        # 5. Summarize task run statuses
        statuses = {}
        for task_run in task_runs:
            status = task_run.get('state', {}).get('type', 'UNKNOWN')
            if status in statuses:
                statuses[status] += 1
            else:
                statuses[status] = 1
                
        print("\nTask run status summary:")
        for status, count in statuses.items():
            print(f"- {status}: {count} task(s)")
        
        # 6. Get flow run details after completion
        print("\nGetting final flow run details...")
        flow_run_result = await client.call_tool("get_flow_run", arguments={"flow_run_id": str(flow_run_id)})
        flow_run_content = flow_run_result.get("content", [])
        
        if not flow_run_content:
            print("Failed to get flow run details.")
            return
            
        flow_run_text = flow_run_content[0].get("text", "")
        flow_run_data = eval(flow_run_text)
        
        start_time = flow_run_data.get('start_time')
        end_time = flow_run_data.get('end_time')
        
        if start_time and end_time:
            # Convert string times to datetime objects
            start_dt = datetime.fromisoformat(start_time.replace('Z', '+00:00'))
            end_dt = datetime.fromisoformat(end_time.replace('Z', '+00:00'))
            duration = (end_dt - start_dt).total_seconds()
            
            print(f"\nFlow run execution time: {duration:.2f} seconds")
        
        print("\nWorkflow completed successfully!")
        
    except Exception as e:
        print(f"Error during workflow: {e}")
    finally:
        # Cleanup
        await client.close()


if __name__ == "__main__":
    asyncio.run(run_prefect_workflow())
```

--------------------------------------------------------------------------------
/tests/conftest.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
from contextlib import asynccontextmanager
from datetime import datetime
import logging
import os
import time
from typing import Any, List
import uuid
import pytest

# Prefect imports
from prefect import flow, task
from prefect.client.orchestration import PrefectClient, get_client
from prefect.server.schemas.core import Flow
from prefect.server.schemas.actions import DeploymentCreate
from prefect.flows import FlowRun

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("prefect-mcp-test")

# Apply anyio marker to all test modules
pytest.anyio_backend = "asyncio"

def get_server_url():
    """Get the server URL from environment or use default."""
    url = os.environ.get("MCP_URL", "http://localhost:8000")
    if not url.endswith("/sse"):
        url = url.rstrip("/") + "/sse"
    return url

async def message_handler(
    message: Any,
) -> None:
    """Handle incoming messages from the server."""
    if isinstance(message, Exception):
        logger.error(f"Error: {message}")
        return
    logger.info(f"\nReceived message type: {type(message).__name__}\n")

async def create_and_run_flow(
    client: PrefectClient,
    flow: Flow,
    flow_name: str = None,
    flow_tags: List[str] = None,
    parameters: dict = None,
    max_wait_seconds: int = 60,
    poll_interval: float = 1.0
) -> FlowRun:
    """
    Create and run a Prefect flow using the Prefect client, with polling for completion.
    
    Args:
        client (PrefectClient): Prefect client instance
        flow_name (str, optional): Name of the flow.
        flow_tags (List[str], optional): Tags to associate with the flow
        parameters (dict, optional): Parameters to pass to the flow
        max_wait_seconds (int, optional): Maximum time to wait for flow run completion
        poll_interval (float, optional): Time between polling attempts
    
    Returns:
        FlowRun: The completed flow run
    """
    from prefect.client.schemas.actions import FlowCreate
    from prefect.client.schemas.objects import Flow as FlowModel
    from prefect.client.schemas.filters import FlowRunFilter
    from prefect.client.schemas.sorting import FlowRunSort
    import time
    import asyncio
    
    # Create flow via client
    flow_create_result = await client.create_flow(flow=flow)
    
    # Create a Flow object with a version
    # flow = FlowModel(
    #     id=flow_create_result, 
    #     name=flow_create.name, 
    #     version='0.1.0',
    #     tags=flow_tags or []
    # )
    
    # Create deployment
    deployment_create_result = await client.create_deployment(
        name=f"{flow.name}-deployment",
        flow_id=flow_create_result,
        infrastructure_document_id=None,
        tags=flow_tags or [],
        work_queue_name="default"
    )

    # Create a flow run
    flow_run = await client.create_flow_run(
        flow=flow,
        parameters=parameters or {},
        tags=flow_tags or []
    )
    
    # Poll for flow run completion
    start_time = time.time()
    while time.time() - start_time < max_wait_seconds:
        # Fetch the latest flow run state
        flow_run_filter = FlowRunFilter(id={'eq': flow_run.id})
        flow_runs = await client.read_flow_runs(
            flow_run_filter=flow_run_filter,
            sort=FlowRunSort.START_TIME_DESC,
            limit=1
        )
        
        if not flow_runs:
            raise ValueError(f"Flow run {flow_run.id} not found")
        
        current_flow_run = flow_runs[0]
        
        # Check if the flow run is in a terminal state
        if current_flow_run.state.is_final():
            return current_flow_run
        
        # Wait before next poll
        await asyncio.sleep(poll_interval)
    
    # Timeout occurred
    raise TimeoutError(f"Flow run {flow_run.id} did not complete within {max_wait_seconds} seconds")


# @pytest.fixture(scope="session", autouse=True)
# async def seed_test_data():
#     """
#     Seed test data once per test session.
#     This fixture runs automatically before any tests.
#     """
#     logger.info("Seeding test data...")
    
#     async with get_client() as client:
#         # Define test flows for seeding
#         @flow
#         def simple_test_flow(x: int = 1) -> int:
#             """A simple test flow for demonstration."""
#             return x * 2
        
#         @flow
#         def another_test_flow(message: str = "Hello") -> str:
#             """Another test flow for demonstration."""
#             return message.upper()
        
#         # Create and run test flows
#         try:
#             await create_and_run_flow(
#                 client,
#                 simple_test_flow, 
#                 flow_name="test-multiplication-flow", 
#                 flow_tags=["test", "example"],
#                 parameters={"x": 5}
#             )
            
#             await create_and_run_flow(
#                 client, 
#                 another_test_flow, 
#                 flow_name="test-message-flow", 
#                 flow_tags=["test", "example"],
#                 parameters={"message": "test message"}
#             )
            
#             logger.info("Test data seeding completed successfully")
#         except Exception as e:
#             logger.error(f"Error seeding test data: {e}")
#             raise

@asynccontextmanager
async def prefect_client(required_tools: List[str] | str):
    """Create a Prefect client session for testing."""
    from mcp.client.session import ClientSession
    from mcp.client.sse import sse_client
    
    server_url = get_server_url()
    if isinstance(required_tools, str):
        required_tools = [required_tools]
    
    async with sse_client(server_url) as (read_stream, write_stream):
        # Create a ClientSession with your message handler
        async with ClientSession(
            read_stream,
            write_stream,
            message_handler=message_handler
        ) as session:
            # Initialize the connection
            logger.info("Initializing MCP session...")
            init_result = await session.initialize()
            server_info = init_result.serverInfo
            logger.info(f"Connected to {server_info.name} v{server_info.version}")
            logger.info(f"Protocol version: {init_result.protocolVersion}")
            logger.info(f"Server capabilities: {init_result.capabilities}")
            
            # List available tools
            logger.info("Listing available tools...")
            tools_result = await session.list_tools()
            tools = [tool.name for tool in tools_result.tools]
            
            if not all(tool_name in tools for tool_name in required_tools):
                pytest.skip(f"One of the Tools: '{required_tools}' not available in {tools}")
            
            yield session, tools
```

--------------------------------------------------------------------------------
/src/mcp_prefect/flow_run.py:
--------------------------------------------------------------------------------

```python
from typing import Any, Dict, List, Optional, Union
from uuid import UUID

import mcp.types as types
from prefect import get_client
from prefect.states import Cancelled, Completed, Failed, Pending, Running, Scheduled

from .envs import PREFECT_API_URL
from .server import mcp


def get_flow_run_url(flow_run_id: str) -> str:
    base_url = PREFECT_API_URL.replace("/api", "")
    return f"{base_url}/flow-runs/{flow_run_id}"


@mcp.tool
async def get_flow_runs(
    limit: Optional[int] = None,
    offset: Optional[int] = None,
    flow_name: Optional[str] = None,
    state_type: Optional[str] = None,
    state_name: Optional[str] = None,
    deployment_id: Optional[str] = None,
    tags: Optional[List[str]] = None,
    start_time_before: Optional[str] = None,
    start_time_after: Optional[str] = None,
) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
    """
    Get a list of flow runs with optional filtering.
    
    Args:
        limit: Maximum number of flow runs to return
        offset: Number of flow runs to skip
        flow_name: Filter by flow name
        state_type: Filter by state type (e.g., "RUNNING", "COMPLETED", "FAILED")
        state_name: Filter by state name
        deployment_id: Filter by deployment ID
        tags: Filter by tags
        start_time_before: ISO formatted datetime string
        start_time_after: ISO formatted datetime string
        
    Returns:
        A list of flow runs with their details
    """
    async with get_client() as client:
        # Build filter parameters
        filters = {}
        if flow_name:
            filters["flow_name"] = {"like_": f"%{flow_name}%"}
        if state_type:
            filters["state"] = {"type": {"any_": [state_type.upper()]}}
        if state_name:
            filters["state"] = {"name": {"any_": [state_name]}}
        if deployment_id:
            filters["deployment_id"] = {"eq_": UUID(deployment_id)}
        if tags:
            filters["tags"] = {"all_": tags}
        if start_time_after:
            filters["start_time"] = {"ge_": start_time_after}
        if start_time_before:
            if "start_time" in filters:
                filters["start_time"]["le_"] = start_time_before
            else:
                filters["start_time"] = {"le_": start_time_before}
        
        flow_runs = await client.read_flow_runs(
            limit=limit,
            offset=offset,
            **filters
        )
        
        # Add UI links to each flow run
        flow_runs_result = {
            "flow_runs": [
                {
                    **flow_run.dict(),
                    "ui_url": get_flow_run_url(str(flow_run.id))
                }
                for flow_run in flow_runs
            ]
        }
        
        return [types.TextContent(type="text", text=str(flow_runs_result))]


@mcp.tool
async def get_flow_run(
    flow_run_id: str,
) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
    """
    Get details of a specific flow run by ID.
    
    Args:
        flow_run_id: The flow run UUID
        
    Returns:
        Flow run details
    """
    async with get_client() as client:
        flow_run = await client.read_flow_run(UUID(flow_run_id))
        
        # Add UI link
        flow_run_dict = flow_run.dict()
        flow_run_dict["ui_url"] = get_flow_run_url(flow_run_id)
        
        return [types.TextContent(type="text", text=str(flow_run_dict))]


@mcp.tool
async def get_flow_runs_by_flow(
    flow_id: str,
    limit: Optional[int] = None,
    offset: Optional[int] = None,
    state_type: Optional[str] = None,
) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
    """
    Get flow runs for a specific flow.
    
    Args:
        flow_id: The flow UUID
        limit: Maximum number of flow runs to return
        offset: Number of flow runs to skip
        state_type: Filter by state type (e.g., "RUNNING", "COMPLETED", "FAILED")
        
    Returns:
        A list of flow runs for the specified flow
    """
    async with get_client() as client:
        # Build filter parameters
        filters = {"flow_id": {"eq_": UUID(flow_id)}}
        if state_type:
            filters["state"] = {"type": {"any_": [state_type.upper()]}}
        
        flow_runs = await client.read_flow_runs(
            limit=limit,
            offset=offset,
            **filters
        )
        
        # Add UI links to each flow run
        flow_runs_result = {
            "flow_runs": [
                {
                    **flow_run.dict(),
                    "ui_url": get_flow_run_url(str(flow_run.id))
                }
                for flow_run in flow_runs
            ]
        }
        
        return [types.TextContent(type="text", text=str(flow_runs_result))]


@mcp.tool
async def restart_flow_run(
    flow_run_id: str,
) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
    """
    Restart a flow run.
    
    Args:
        flow_run_id: The flow run UUID
        
    Returns:
        Details of the new flow run
    """
    async with get_client() as client:
        flow_run_id_uuid = UUID(flow_run_id)
        new_flow_run = await client.create_flow_run_from_flow_run(flow_run_id_uuid)
        
        new_flow_run_dict = new_flow_run.dict()
        new_flow_run_dict["ui_url"] = get_flow_run_url(str(new_flow_run.id))
        
        return [types.TextContent(type="text", text=str(new_flow_run_dict))]


@mcp.tool
async def cancel_flow_run(
    flow_run_id: str,
) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
    """
    Cancel a flow run.
    
    Args:
        flow_run_id: The flow run UUID
        
    Returns:
        Confirmation message
    """
    async with get_client() as client:
        await client.set_flow_run_state(
            flow_run_id=UUID(flow_run_id),
            state=Cancelled(message="Cancelled via MCP")
        )
        
        return [types.TextContent(type="text", text=f"Flow run '{flow_run_id}' cancelled successfully.")]


@mcp.tool
async def delete_flow_run(
    flow_run_id: str,
) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
    """
    Delete a flow run.
    
    Args:
        flow_run_id: The flow run UUID
        
    Returns:
        Confirmation message
    """
    async with get_client() as client:
        await client.delete_flow_run(UUID(flow_run_id))
        
        return [types.TextContent(type="text", text=f"Flow run '{flow_run_id}' deleted successfully.")]


@mcp.tool
async def set_flow_run_state(
    flow_run_id: str,
    state: str,
    message: Optional[str] = None,
) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
    """
    Set a flow run's state.
    
    Args:
        flow_run_id: The flow run UUID
        state: The new state to set (e.g., "SCHEDULED", "RUNNING", "COMPLETED", "FAILED")
        message: An optional message explaining the state change
        
    Returns:
        Result of the state change operation
    """
    async with get_client() as client:
        state_obj = None
        if state.upper() == "SCHEDULED":
            state_obj = Scheduled(message=message)
        elif state.upper() == "RUNNING":
            state_obj = Running(message=message)
        elif state.upper() == "COMPLETED":
            state_obj = Completed(message=message)
        elif state.upper() == "FAILED":
            state_obj = Failed(message=message)
        elif state.upper() == "PENDING":
            state_obj = Pending(message=message)
        elif state.upper() == "CANCELLED":
            state_obj = Cancelled(message=message)
        else:
            return [types.TextContent(
                type="text", 
                text=f"Invalid state '{state}'. Must be one of: SCHEDULED, RUNNING, COMPLETED, FAILED, PENDING, CANCELLED"
            )]
        
        result = await client.set_flow_run_state(
            flow_run_id=UUID(flow_run_id),
            state=state_obj
        )
        
        return [types.TextContent(type="text", text=str(result.dict()))]

```

--------------------------------------------------------------------------------
/src/mcp_prefect/concurrency_limits.py:
--------------------------------------------------------------------------------

```python
from typing import Any, Callable, Dict, List, Optional, Union
from uuid import UUID

import mcp.types as types
from prefect import get_client


def get_all_functions() -> list[tuple[Callable, str, str]]:
    return [
        (get_concurrency_limits, "get_concurrency_limits", "Get all concurrency limits"),
        (get_concurrency_limit, "get_concurrency_limit", "Get a concurrency limit by ID"),
        (get_concurrency_limit_by_tag, "get_concurrency_limit_by_tag", "Get a concurrency limit by tag"),
        (create_concurrency_limit, "create_concurrency_limit", "Create a concurrency limit"),
        (update_concurrency_limit, "update_concurrency_limit", "Update a concurrency limit"),
        (delete_concurrency_limit, "delete_concurrency_limit", "Delete a concurrency limit"),
        (increment_concurrency_limit, "increment_concurrency_limit", "Increment a concurrency limit"),
        (decrement_concurrency_limit, "decrement_concurrency_limit", "Decrement a concurrency limit"),
        (reset_concurrency_limit, "reset_concurrency_limit", "Reset a concurrency limit by tag"),
    ]


async def get_concurrency_limits() -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
    """
    Get all concurrency limits.
    
    Returns:
        A list of concurrency limits
    """
    async with get_client() as client:
        try:
            # Prefect API doesn't support limit/offset on concurrency limits endpoint directly
            # Use the filter endpoint instead
            concurrency_limits = await client.read_concurrency_limits_filter()
            
            limits_result = {
                "concurrency_limits": [limit.dict() for limit in concurrency_limits]
            }
            
            return [types.TextContent(type="text", text=str(limits_result))]
        except Exception as e:
            error_message = f"Error fetching concurrency limits: {str(e)}"
            return [types.TextContent(type="text", text=error_message)]


async def get_concurrency_limit(
    limit_id: str,
) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
    """
    Get a concurrency limit by ID.
    
    Args:
        limit_id: The concurrency limit UUID
        
    Returns:
        Concurrency limit details
    """
    async with get_client() as client:
        try:
            concurrency_limit = await client.read_concurrency_limit(UUID(limit_id))
            
            return [types.TextContent(type="text", text=str(concurrency_limit.dict()))]
        except Exception as e:
            error_message = f"Error fetching concurrency limit {limit_id}: {str(e)}"
            return [types.TextContent(type="text", text=error_message)]


async def get_concurrency_limit_by_tag(
    tag: str,
) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
    """
    Get a concurrency limit by tag.
    
    Args:
        tag: The concurrency limit tag
        
    Returns:
        Concurrency limit details
    """
    async with get_client() as client:
        try:
            concurrency_limit = await client.read_concurrency_limit_by_tag(tag)
            
            return [types.TextContent(type="text", text=str(concurrency_limit.dict()))]
        except Exception as e:
            error_message = f"Error fetching concurrency limit for tag '{tag}': {str(e)}"
            return [types.TextContent(type="text", text=error_message)]


async def create_concurrency_limit(
    tag: str,
    concurrency_limit: int,
    active: Optional[bool] = True,
) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
    """
    Create a concurrency limit.
    
    Args:
        tag: The tag to limit
        concurrency_limit: The maximum allowed concurrency
        active: Whether the limit is active
        
    Returns:
        Details of the created concurrency limit
    """
    async with get_client() as client:
        try:
            limit = await client.create_concurrency_limit(
                tag=tag,
                concurrency_limit=concurrency_limit,
                active=active
            )
            
            return [types.TextContent(type="text", text=str(limit.dict()))]
        except Exception as e:
            error_message = f"Error creating concurrency limit: {str(e)}"
            return [types.TextContent(type="text", text=error_message)]


async def update_concurrency_limit(
    limit_id: str,
    concurrency_limit: Optional[int] = None,
    active: Optional[bool] = None,
) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
    """
    Update a concurrency limit.
    
    Args:
        limit_id: The concurrency limit UUID
        concurrency_limit: The new maximum allowed concurrency
        active: Whether the limit is active
        
    Returns:
        Details of the updated concurrency limit
    """
    async with get_client() as client:
        try:
            # Prepare update data
            update_data = {}
            if concurrency_limit is not None:
                update_data["concurrency_limit"] = concurrency_limit
            if active is not None:
                update_data["active"] = active
            
            updated_limit = await client.update_concurrency_limit(
                id=UUID(limit_id),
                **update_data
            )
            
            return [types.TextContent(type="text", text=str(updated_limit.dict()))]
        except Exception as e:
            error_message = f"Error updating concurrency limit {limit_id}: {str(e)}"
            return [types.TextContent(type="text", text=error_message)]


async def delete_concurrency_limit(
    limit_id: str,
) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
    """
    Delete a concurrency limit by ID.
    
    Args:
        limit_id: The concurrency limit UUID
        
    Returns:
        Confirmation message
    """
    async with get_client() as client:
        try:
            await client.delete_concurrency_limit(UUID(limit_id))
            
            return [types.TextContent(type="text", text=f"Concurrency limit '{limit_id}' deleted successfully.")]
        except Exception as e:
            error_message = f"Error deleting concurrency limit {limit_id}: {str(e)}"
            return [types.TextContent(type="text", text=error_message)]


async def increment_concurrency_limit(
    tag: str,
    delta: int = 1,
) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
    """
    Increment a concurrency limit by tag.
    
    Args:
        tag: The concurrency limit tag
        delta: Amount to increment by (default 1)
        
    Returns:
        Updated concurrency limit details
    """
    async with get_client() as client:
        try:
            updated_limit = await client.increment_concurrency_limit(
                tag=tag,
                delta=delta
            )
            
            return [types.TextContent(type="text", text=str(updated_limit.dict()))]
        except Exception as e:
            error_message = f"Error incrementing concurrency limit for tag '{tag}': {str(e)}"
            return [types.TextContent(type="text", text=error_message)]


async def decrement_concurrency_limit(
    tag: str,
    delta: int = 1,
) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
    """
    Decrement a concurrency limit by tag.
    
    Args:
        tag: The concurrency limit tag
        delta: Amount to decrement by (default 1)
        
    Returns:
        Updated concurrency limit details
    """
    async with get_client() as client:
        try:
            updated_limit = await client.decrement_concurrency_limit(
                tag=tag,
                delta=delta
            )
            
            return [types.TextContent(type="text", text=str(updated_limit.dict()))]
        except Exception as e:
            error_message = f"Error decrementing concurrency limit for tag '{tag}': {str(e)}"
            return [types.TextContent(type="text", text=error_message)]


async def reset_concurrency_limit(
    tag: str,
) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
    """
    Reset a concurrency limit by tag, setting its current count to 0.
    
    Args:
        tag: The concurrency limit tag
        
    Returns:
        Updated concurrency limit details
    """
    async with get_client() as client:
        try:
            updated_limit = await client.reset_concurrency_limit(tag=tag)
            
            return [types.TextContent(type="text", text=str(updated_limit.dict()))]
        except Exception as e:
            error_message = f"Error resetting concurrency limit for tag '{tag}': {str(e)}"
            return [types.TextContent(type="text", text=error_message)]
```

--------------------------------------------------------------------------------
/src/mcp_prefect/deployment.py:
--------------------------------------------------------------------------------

```python
from typing import Any, Dict, List, Optional, Union
from uuid import UUID

import mcp.types as types
from prefect import get_client

from .envs import PREFECT_API_URL
from .server import mcp


def get_deployment_url(deployment_id: str) -> str:
    base_url = PREFECT_API_URL.replace("/api", "")
    return f"{base_url}/deployments/{deployment_id}"


@mcp.tool
async def get_deployments(
    limit: Optional[int] = None,
    offset: Optional[int] = None,
    flow_name: Optional[str] = None,
    name: Optional[str] = None,
    tags: Optional[List[str]] = None,
    is_schedule_active: Optional[bool] = None,
    work_queue_name: Optional[str] = None,
) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
    """
    Get a list of deployments with optional filtering.
    
    Args:
        limit: Maximum number of deployments to return
        offset: Number of deployments to skip
        flow_name: Filter by flow name
        name: Filter by deployment name
        tags: Filter by tags
        is_schedule_active: Filter by schedule active status
        work_queue_name: Filter by work queue name
        
    Returns:
        A list of deployments with their details
    """
    try:
        async with get_client() as client:
            # Build deployment filter
            deployment_filter = None
            if any([name, tags, is_schedule_active, work_queue_name]):
                from prefect.client.schemas.filters import DeploymentFilter
                
                filter_dict = {}
                if name:
                    filter_dict["name"] = {"like_": f"%{name}%"}
                if tags:
                    filter_dict["tags"] = {"all_": tags}
                if is_schedule_active is not None:
                    filter_dict["is_schedule_active"] = {"eq_": is_schedule_active}
                if work_queue_name:
                    filter_dict["work_queue_name"] = {"eq_": work_queue_name}
                
                deployment_filter = DeploymentFilter(**filter_dict)
            
            # Build flow filter if flow_name is specified
            flow_filter = None
            if flow_name:
                from prefect.client.schemas.filters import FlowFilter
                
                flow_filter = FlowFilter(name={"like_": f"%{flow_name}%"})
            
            # Query using proper filter objects
            deployments = await client.read_deployments(
                deployment_filter=deployment_filter,
                flow_filter=flow_filter,
                limit=limit,
                offset=offset,
            )
            
            # Add UI links to each deployment
            deployments_result = {
                "deployments": [
                    {
                        **deployment.model_dump(),
                        "ui_url": get_deployment_url(str(deployment.id))
                    }
                    for deployment in deployments
                ]
            }
            
            return [types.TextContent(type="text", text=str(deployments_result))]
    except Exception as e:
        # Add proper error handling
        return [types.TextContent(type="text", text=str({"error": str(e)}))]


@mcp.tool
async def get_deployment(
    deployment_id: str,
) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
    """
    Get details of a specific deployment by ID.
    
    Args:
        deployment_id: The deployment UUID
        
    Returns:
        Deployment details
    """
    async with get_client() as client:
        deployment = await client.read_deployment(UUID(deployment_id))
        
        # Add UI link
        deployment_dict = deployment.model_dump()
        deployment_dict["ui_url"] = get_deployment_url(deployment_id)
        
        return [types.TextContent(type="text", text=str(deployment_dict))]


@mcp.tool
async def create_flow_run_from_deployment(
    deployment_id: str,
    parameters: Optional[Dict[str, Any]] = None,
    name: Optional[str] = None,
    tags: Optional[List[str]] = None,
    idempotency_key: Optional[str] = None,
) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
    """
    Create a flow run from a deployment.
    
    Args:
        deployment_id: The deployment UUID
        parameters: Optional parameters to pass to the flow run
        name: Optional name for the flow run
        tags: Optional tags for the flow run
        idempotency_key: Optional idempotency key
        
    Returns:
        Details of the created flow run
    """
    async with get_client() as client:
        parameters = parameters or {}
        
        flow_run = await client.create_flow_run_from_deployment(
            deployment_id=UUID(deployment_id),
            parameters=parameters,
            name=name,
            tags=tags,
            idempotency_key=idempotency_key,
        )
        
        # Add URL
        flow_run_dict = flow_run.model_dump()
        flow_run_dict["ui_url"] = PREFECT_API_URL.replace("/api", "") + f"/flow-runs/{flow_run.id}"
        
        return [types.TextContent(type="text", text=str(flow_run_dict))]


@mcp.tool
async def delete_deployment(
    deployment_id: str,
) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
    """
    Delete a deployment by ID.
    
    Args:
        deployment_id: The deployment UUID
        
    Returns:
        Confirmation message
    """
    async with get_client() as client:
        await client.delete_deployment(UUID(deployment_id))
        
        return [types.TextContent(type="text", text=f"Deployment '{deployment_id}' deleted successfully.")]


@mcp.tool
async def update_deployment(
    deployment_id: str,
    name: Optional[str] = None,
    description: Optional[str] = None,
    version: Optional[str] = None,
    tags: Optional[List[str]] = None,
    parameters: Optional[Dict[str, Any]] = None,
    work_queue_name: Optional[str] = None,
) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
    """
    Update a deployment.
    
    Args:
        deployment_id: The deployment UUID
        name: New name for the deployment
        description: New description
        version: New version
        tags: New tags
        parameters: New parameters
        work_queue_name: New work queue name
        
    Returns:
        Details of the updated deployment
    """
    async with get_client() as client:
        # Get current deployment
        deployment = await client.read_deployment(UUID(deployment_id))
        
        # Prepare update data
        update_data = {}
        if name is not None:
            update_data["name"] = name
        if description is not None:
            update_data["description"] = description
        if version is not None:
            update_data["version"] = version
        if tags is not None:
            update_data["tags"] = tags
        if parameters is not None:
            update_data["parameters"] = parameters
        if work_queue_name is not None:
            update_data["work_queue_name"] = work_queue_name
        
        # Update deployment
        updated_deployment = await client.update_deployment(
            deployment_id=UUID(deployment_id),
            **update_data
        )
        
        # Add UI link
        updated_deployment_dict = updated_deployment.model_dump()
        updated_deployment_dict["ui_url"] = get_deployment_url(deployment_id)
        
        return [types.TextContent(type="text", text=str(updated_deployment_dict))]


@mcp.tool
async def get_deployment_schedule(
    deployment_id: str,
) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
    """
    Get a deployment's schedule.
    
    Args:
        deployment_id: The deployment UUID
        
    Returns:
        Schedule details
    """
    async with get_client() as client:
        schedule = await client.read_deployment_schedule(UUID(deployment_id))
        
        return [types.TextContent(type="text", text=str(schedule.model_dump()))]


@mcp.tool
async def set_deployment_schedule(
    deployment_id: str,
    cron: Optional[str] = None,
    interval_seconds: Optional[int] = None,
    anchor_date: Optional[str] = None,
    timezone: Optional[str] = None,
) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
    """
    Set a deployment's schedule.
    
    Args:
        deployment_id: The deployment UUID
        cron: Cron expression for the schedule
        interval_seconds: Alternative to cron - interval in seconds
        anchor_date: Required for interval schedules - the anchor date
        timezone: Timezone for the schedule
        
    Returns:
        Updated schedule details
    """
    async with get_client() as client:
        # Check schedule type
        if cron is not None and interval_seconds is not None:
            return [types.TextContent(
                type="text",
                text="Cannot specify both cron and interval_seconds. Choose one schedule type."
            )]
        
        if cron is not None:
            # Set cron schedule
            schedule = await client.set_deployment_schedule(
                deployment_id=UUID(deployment_id),
                schedule={"cron": cron, "timezone": timezone}
            )
        elif interval_seconds is not None:
            # Set interval schedule
            if not anchor_date:
                return [types.TextContent(
                    type="text",
                    text="anchor_date is required for interval schedules"
                )]
            
            schedule = await client.set_deployment_schedule(
                deployment_id=UUID(deployment_id),
                schedule={
                    "interval": interval_seconds,
                    "anchor_date": anchor_date,
                    "timezone": timezone
                }
            )
        else:
            return [types.TextContent(
                type="text",
                text="Must specify either cron or interval_seconds to set a schedule"
            )]
        
        return [types.TextContent(type="text", text=str(schedule.model_dump()))]


@mcp.tool
async def pause_deployment_schedule(
    deployment_id: str,
) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
    """
    Pause a deployment's schedule.
    
    Args:
        deployment_id: The deployment UUID
        
    Returns:
        Confirmation message
    """
    async with get_client() as client:
        await client.pause_deployment_schedule(UUID(deployment_id))
        
        return [types.TextContent(type="text", text=f"Schedule for deployment '{deployment_id}' paused successfully.")]


@mcp.tool
async def resume_deployment_schedule(
    deployment_id: str,
) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
    """
    Resume a deployment's schedule.
    
    Args:
        deployment_id: The deployment UUID
        
    Returns:
        Confirmation message
    """
    async with get_client() as client:
        await client.resume_deployment_schedule(UUID(deployment_id))
        
        return [types.TextContent(type="text", text=f"Schedule for deployment '{deployment_id}' resumed successfully.")]

```

--------------------------------------------------------------------------------
/src/mcp_prefect/work_pools.py:
--------------------------------------------------------------------------------

```python
from typing import Any, Callable, Dict, List, Optional, Union
from uuid import UUID

import mcp.types as types
from prefect import get_client


def get_all_functions() -> list[tuple[Callable, str, str]]:
    return [
        (get_work_pools, "get_work_pools", "Get all work pools"),
        (get_work_pool, "get_work_pool", "Get a work pool by name"),
        (create_work_pool, "create_work_pool", "Create a work pool"),
        (update_work_pool, "update_work_pool", "Update a work pool"),
        (delete_work_pool, "delete_work_pool", "Delete a work pool"),
        (get_scheduled_flow_runs, "get_scheduled_flow_runs", "Get scheduled flow runs for a work pool"),
        (get_work_pool_queues, "get_work_pool_queues", "Get work queues for a work pool"),
        (get_work_pool_queue, "get_work_pool_queue", "Get a specific work queue in a work pool"),
        (create_work_pool_queue, "create_work_pool_queue", "Create a work queue in a work pool"),
        (update_work_pool_queue, "update_work_pool_queue", "Update a work queue in a work pool"),
        (delete_work_pool_queue, "delete_work_pool_queue", "Delete a work queue from a work pool"),
    ]


async def get_work_pools(
    limit: Optional[int] = None,
    offset: Optional[int] = None,
) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
    """
    Get all work pools with optional pagination.
    
    Args:
        limit: Maximum number of work pools to return
        offset: Number of work pools to skip
        
    Returns:
        A list of work pools
    """
    async with get_client() as client:
        try:
            # The method name might be different based on Prefect's API
            work_pools = await client.read_work_pools(
                limit=limit,
                offset=offset
            )
            
            work_pools_result = {
                "work_pools": [work_pool.dict() for work_pool in work_pools]
            }
            
            return [types.TextContent(type="text", text=str(work_pools_result))]
        except Exception as e:
            error_message = f"Error fetching work pools: {str(e)}"
            return [types.TextContent(type="text", text=error_message)]


async def get_work_pool(
    name: str,
) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
    """
    Get a work pool by name.
    
    Args:
        name: The work pool name
        
    Returns:
        Work pool details
    """
    async with get_client() as client:
        try:
            work_pool = await client.read_work_pool(name)
            
            return [types.TextContent(type="text", text=str(work_pool.dict()))]
        except Exception as e:
            error_message = f"Error fetching work pool '{name}': {str(e)}"
            return [types.TextContent(type="text", text=error_message)]


async def create_work_pool(
    name: str,
    type: str,
    description: Optional[str] = None,
    base_job_template: Optional[Dict] = None,
    is_paused: Optional[bool] = None,
) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
    """
    Create a work pool.
    
    Args:
        name: The name for the work pool
        type: The type of work pool (e.g., 'kubernetes', 'process', etc.)
        description: Optional description
        base_job_template: Optional base job template as JSON
        is_paused: Whether the work pool should be paused
        
    Returns:
        Details of the created work pool
    """
    async with get_client() as client:
        try:
            work_pool = await client.create_work_pool(
                name=name,
                work_pool_type=type,
                description=description,
                base_job_template=base_job_template or {},
                is_paused=is_paused
            )
            
            return [types.TextContent(type="text", text=str(work_pool.dict()))]
        except Exception as e:
            error_message = f"Error creating work pool: {str(e)}"
            return [types.TextContent(type="text", text=error_message)]


async def update_work_pool(
    name: str,
    description: Optional[str] = None,
    base_job_template: Optional[Dict] = None,
    is_paused: Optional[bool] = None,
) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
    """
    Update a work pool.
    
    Args:
        name: The work pool name
        description: New description
        base_job_template: New base job template as JSON
        is_paused: New paused status
        
    Returns:
        Details of the updated work pool
    """
    async with get_client() as client:
        try:
            # Prepare update data
            update_data = {}
            if description is not None:
                update_data["description"] = description
            if base_job_template is not None:
                update_data["base_job_template"] = base_job_template
            if is_paused is not None:
                update_data["is_paused"] = is_paused
            
            work_pool = await client.update_work_pool(
                name=name,
                **update_data
            )
            
            return [types.TextContent(type="text", text=str(work_pool.dict()))]
        except Exception as e:
            error_message = f"Error updating work pool '{name}': {str(e)}"
            return [types.TextContent(type="text", text=error_message)]


async def delete_work_pool(
    name: str,
) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
    """
    Delete a work pool by name.
    
    Args:
        name: The work pool name
        
    Returns:
        Confirmation message
    """
    async with get_client() as client:
        try:
            await client.delete_work_pool(name)
            
            return [types.TextContent(type="text", text=f"Work pool '{name}' deleted successfully.")]
        except Exception as e:
            error_message = f"Error deleting work pool '{name}': {str(e)}"
            return [types.TextContent(type="text", text=error_message)]


async def get_scheduled_flow_runs(
    work_pool_name: str,
    limit: Optional[int] = None,
    scheduled_before: Optional[str] = None,
) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
    """
    Get scheduled flow runs for a work pool.
    
    Args:
        work_pool_name: The work pool name
        limit: Maximum number of flow runs to return
        scheduled_before: ISO formatted timestamp
        
    Returns:
        A list of scheduled flow runs
    """
    async with get_client() as client:
        try:
            flow_runs = await client.get_scheduled_flow_runs(
                work_pool_name=work_pool_name,
                limit=limit,
                scheduled_before=scheduled_before
            )
            
            flow_runs_result = {
                "scheduled_flow_runs": [flow_run.dict() for flow_run in flow_runs]
            }
            
            return [types.TextContent(type="text", text=str(flow_runs_result))]
        except Exception as e:
            error_message = f"Error fetching scheduled flow runs for work pool '{work_pool_name}': {str(e)}"
            return [types.TextContent(type="text", text=error_message)]


async def get_work_pool_queues(
    work_pool_name: str,
    limit: Optional[int] = None,
    offset: Optional[int] = None,
) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
    """
    Get work queues for a work pool.
    
    Args:
        work_pool_name: The work pool name
        limit: Maximum number of queues to return
        offset: Number of queues to skip
        
    Returns:
        A list of work queues in the work pool
    """
    async with get_client() as client:
        try:
            queues = await client.read_work_pool_queues(
                work_pool_name=work_pool_name,
                limit=limit,
                offset=offset
            )
            
            queues_result = {
                "work_pool_queues": [queue.dict() for queue in queues]
            }
            
            return [types.TextContent(type="text", text=str(queues_result))]
        except Exception as e:
            error_message = f"Error fetching work queues for work pool '{work_pool_name}': {str(e)}"
            return [types.TextContent(type="text", text=error_message)]


async def get_work_pool_queue(
    work_pool_name: str,
    queue_name: str,
) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
    """
    Get a specific work queue in a work pool.
    
    Args:
        work_pool_name: The work pool name
        queue_name: The work queue name
        
    Returns:
        Work queue details
    """
    async with get_client() as client:
        try:
            queue = await client.read_work_pool_queue(
                work_pool_name=work_pool_name,
                queue_name=queue_name
            )
            
            return [types.TextContent(type="text", text=str(queue.dict()))]
        except Exception as e:
            error_message = f"Error fetching work queue '{queue_name}' in work pool '{work_pool_name}': {str(e)}"
            return [types.TextContent(type="text", text=error_message)]


async def create_work_pool_queue(
    work_pool_name: str,
    name: str,
    description: Optional[str] = None,
    is_paused: Optional[bool] = None,
    concurrency_limit: Optional[int] = None,
    priority: Optional[int] = None,
) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
    """
    Create a work queue in a work pool.
    
    Args:
        work_pool_name: The work pool name
        name: The name for the work queue
        description: Optional description
        is_paused: Whether the queue is paused
        concurrency_limit: Optional concurrency limit
        priority: Optional priority
        
    Returns:
        Details of the created work queue
    """
    async with get_client() as client:
        try:
            queue = await client.create_work_pool_queue(
                work_pool_name=work_pool_name,
                name=name,
                description=description,
                is_paused=is_paused,
                concurrency_limit=concurrency_limit,
                priority=priority
            )
            
            return [types.TextContent(type="text", text=str(queue.dict()))]
        except Exception as e:
            error_message = f"Error creating work queue in work pool '{work_pool_name}': {str(e)}"
            return [types.TextContent(type="text", text=error_message)]


async def update_work_pool_queue(
    work_pool_name: str,
    queue_name: str,
    description: Optional[str] = None,
    is_paused: Optional[bool] = None,
    concurrency_limit: Optional[int] = None,
    priority: Optional[int] = None,
) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
    """
    Update a work queue in a work pool.
    
    Args:
        work_pool_name: The work pool name
        queue_name: The work queue name
        description: New description
        is_paused: New paused status
        concurrency_limit: New concurrency limit
        priority: New priority
        
    Returns:
        Details of the updated work queue
    """
    async with get_client() as client:
        try:
            # Prepare update data
            update_data = {}
            if description is not None:
                update_data["description"] = description
            if is_paused is not None:
                update_data["is_paused"] = is_paused
            if concurrency_limit is not None:
                update_data["concurrency_limit"] = concurrency_limit
            if priority is not None:
                update_data["priority"] = priority
            
            queue = await client.update_work_pool_queue(
                work_pool_name=work_pool_name,
                queue_name=queue_name,
                **update_data
            )
            
            return [types.TextContent(type="text", text=str(queue.dict()))]
        except Exception as e:
            error_message = f"Error updating work queue '{queue_name}' in work pool '{work_pool_name}': {str(e)}"
            return [types.TextContent(type="text", text=error_message)]


async def delete_work_pool_queue(
    work_pool_name: str,
    queue_name: str,
) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
    """
    Delete a work queue from a work pool.
    
    Args:
        work_pool_name: The work pool name
        queue_name: The work queue name
        
    Returns:
        Confirmation message
    """
    async with get_client() as client:
        try:
            await client.delete_work_pool_queue(
                work_pool_name=work_pool_name,
                queue_name=queue_name
            )
            
            return [types.TextContent(type="text", text=f"Work queue '{queue_name}' deleted from work pool '{work_pool_name}' successfully.")]
        except Exception as e:
            error_message = f"Error deleting work queue '{queue_name}' from work pool '{work_pool_name}': {str(e)}"
            return [types.TextContent(type="text", text=error_message)]
```

--------------------------------------------------------------------------------
/test_scripts/test_prefect.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
import ast
import asyncio
import json
import logging
import os
import re
import sys
from functools import partial
from typing import Any, Dict, List, Optional, Union
from uuid import uuid4

import anyio
from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream

# This matches the imports in your code snippets
from mcp.client.session import ClientSession
from mcp.client.sse import sse_client
import mcp.types as types

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("prefect-mcp-test")


def write_error(msg):
    logger.error(f"\n\n#################\n{msg}")


async def message_handler(
    message: Any,
) -> None:
    """Handle incoming messages from the server."""
    if isinstance(message, Exception):
        write_error(f"Error: {message}")
        return
    
    logger.info(f"\nReceived message type: {type(message).__name__}\n")


async def test_prefect_mcp():
    """Test connecting to the Prefect MCP server and calling tools."""
    # Get the server URL from environment or use default
    server_url = os.environ.get("MCP_URL", "http://localhost:8000")
    
    # Ensure URL points to the SSE endpoint
    if not server_url.endswith("/sse"):
        server_url = server_url.rstrip("/") + "/sse"
    
    logger.info(f"Connecting to Prefect MCP server at {server_url}")
    
    try:
        # Connect to the server using sse_client from your code
        async with sse_client(server_url) as (read_stream, write_stream):
            # Create a ClientSession with your message handler
            async with ClientSession(
                read_stream, 
                write_stream, 
                message_handler=message_handler
            ) as session:
                # Initialize the connection
                logger.info("Initializing MCP session...")
                init_result = await session.initialize()
                
                server_info = init_result.serverInfo
                logger.info(f"Connected to {server_info.name} v{server_info.version}")
                logger.info(f"Protocol version: {init_result.protocolVersion}")
                logger.info(f"Server capabilities: {init_result.capabilities}")
                
                # List available tools
                logger.info("Listing available tools...")
                tools_result = await session.list_tools()
                
                # Map tools by name for easy lookup
                tool_map = {tool.name: tool for tool in tools_result.tools}
                
                # Print available tools
                for tool in tools_result.tools:
                    logger.info(f"Tool: {tool.name} - {tool.description}")

                # Test categories for better organization
                await test_health(session, tool_map)
                await test_flows(session, tool_map)
                await test_deployments(session, tool_map)
                await test_flow_runs(session, tool_map)
                await test_task_runs(session, tool_map)
                await test_workspaces(session, tool_map)
                await test_blocks(session, tool_map)
                await test_variables(session, tool_map)
                await test_work_queues(session, tool_map)
                
                logger.info("All tests completed successfully")
                
    except Exception as e:
        write_error(f"Test failed: {e}")
        raise


async def test_health(session: ClientSession, tool_map: Dict[str, Any]):
    """Test health check endpoint."""
    if "get_health" in tool_map:
        try:
            logger.info("Testing get_health tool...")
            health_result = await session.call_tool("get_health", {})
            for content in health_result.content:
                if content.type == "text":
                    logger.info(f"Health check result: {content.text}")
        except Exception as e:
            write_error(f"Error calling get_health: {e}")


async def test_flows(session: ClientSession, tool_map: Dict[str, Any]):
    """Test flow-related endpoints."""
    if "get_flows" in tool_map:
        try:
            logger.info("Testing get_flows tool...")
            flows_result = await session.call_tool("get_flows", {"limit": 5})
            for content in flows_result.content:
                if content.type == "text":
                    logger.info(f"Flows result: {content.text[:200]}...")
            
            # Try with filter
            logger.info("Testing get_flows with filter...")
            filtered_flows = await session.call_tool("get_flows", {"limit": 3, "flow_name": "test"})
            for content in filtered_flows.content:
                if content.type == "text":
                    logger.info(f"Filtered flows result: {content.text[:200]}...")
        except Exception as e:
            write_error(f"Error calling get_flows: {e}")

    # Test get_flow if available
    if "get_flow" in tool_map:
        try:
            # Get a list of flows first to get a valid ID
            flows_result = await session.call_tool("get_flows", {"limit": 1})
            flow_id = None
            
            # Extract a flow ID if possible
            for content in flows_result.content:
                if content.type == "text":
                    try:
                        flows_data = eval(content.text)
                        if flows_data["flows"]:
                            flow_id = flows_data["flows"][0]["id"]
                    except:
                        pass
            
            if flow_id:
                logger.info(f"Testing get_flow with ID: {flow_id}...")
                flow_result = await session.call_tool("get_flow", {"flow_id": flow_id})
                for content in flow_result.content:
                    if content.type == "text":
                        logger.info(f"Flow details result: {content.text[:200]}...")
            else:
                logger.info("Skipping get_flow test - no flows available")
        except Exception as e:
            write_error(f"Error calling get_flow: {e}")


async def test_deployments(session: ClientSession, tool_map: Dict[str, Any]):
    """Test deployment-related endpoints."""
    if "get_deployments" in tool_map:
        try:
            logger.info("Testing get_deployments tool...")
            deployments_result = await session.call_tool("get_deployments", {"limit": 5})
            for content in deployments_result.content:
                if content.type == "text":
                    logger.info(f"Deployments result: {content.text[:200]}...")
            
            # Try with filter
            logger.info("Testing get_deployments with filter...")
            filtered_deployments = await session.call_tool("get_deployments", {"limit": 3, "flow_name": "test"})
            for content in filtered_deployments.content:
                if content.type == "text":
                    logger.info(f"Filtered deployments result: {content.text[:200]}...")
        except Exception as e:
            write_error(f"Error calling get_deployments: {e}")

    # Test get_deployment if available
    if "get_deployment" in tool_map:
        try:
            # Get a list of deployments first to get a valid ID
            deployments_result = await session.call_tool("get_deployments", {"limit": 1})
            deployment_id = None
            
            # Extract a deployment ID if possible
            for content in deployments_result.content:
                if content.type == "text":
                    try:
                        deployments_data = eval(content.text)
                        if deployments_data["deployments"]:
                            deployment_id = deployments_data["deployments"][0]["id"]
                    except:
                        pass
            
            if deployment_id:
                logger.info(f"Testing get_deployment with ID: {deployment_id}...")
                deployment_result = await session.call_tool("get_deployment", {"deployment_id": deployment_id})
                for content in deployment_result.content:
                    if content.type == "text":
                        logger.info(f"Deployment details result: {content.text[:200]}...")
            else:
                logger.info("Skipping get_deployment test - no deployments available")
        except Exception as e:
            write_error(f"Error calling get_deployment: {e}")


async def test_flow_runs(session: ClientSession, tool_map: Dict[str, Any]):
    """Test flow run-related endpoints."""
    if "get_flow_runs" in tool_map:
        try:
            logger.info("Testing get_flow_runs tool...")
            flow_runs_result = await session.call_tool("get_flow_runs", {"limit": 5})
            for content in flow_runs_result.content:
                if content.type == "text":
                    logger.info(f"Flow runs result: {content.text[:200]}...")
        except Exception as e:
            write_error(f"Error calling get_flow_runs: {e}")

    # Test get_flow_run if available and we have flow runs
    if "get_flow_run" in tool_map:
        try:
            # Get a list of flow runs first to get a valid ID
            flow_runs_result = await session.call_tool("get_flow_runs", {"limit": 1})
            flow_run_id = None
            
            # Extract a flow run ID if possible
            for content in flow_runs_result.content:
                if content.type == "text":
                    try:
                        flow_runs_data = eval(content.text)
                        if flow_runs_data["flow_runs"]:
                            flow_run_id = flow_runs_data["flow_runs"][0]["id"]
                    except:
                        pass
            
            if flow_run_id:
                logger.info(f"Testing get_flow_run with ID: {flow_run_id}...")
                flow_run_result = await session.call_tool("get_flow_run", {"flow_run_id": flow_run_id})
                for content in flow_run_result.content:
                    if content.type == "text":
                        logger.info(f"Flow run details result: {content.text[:200]}...")
            else:
                logger.info("Skipping get_flow_run test - no flow runs available")
        except Exception as e:
            write_error(f"Error calling get_flow_run: {e}")


async def test_task_runs(session: ClientSession, tool_map: Dict[str, Any]):
    """Test task run-related endpoints."""
    if "get_task_runs" in tool_map:
        try:
            logger.info("Testing get_task_runs tool...")
            task_runs_result = await session.call_tool("get_task_runs", {"limit": 5})
            for content in task_runs_result.content:
                if content.type == "text":
                    logger.info(f"Task runs result: {content.text[:200]}...")
        except Exception as e:
            write_error(f"Error calling get_task_runs: {e}")


async def test_workspaces(session: ClientSession, tool_map: Dict[str, Any]):
    """Test workspace-related endpoints."""
    if "get_workspaces" in tool_map:
        try:
            logger.info("Testing get_workspaces tool (expect message about Cloud-only)...")
            workspaces_result = await session.call_tool("get_workspaces")
            for content in workspaces_result.content:
                if content.type == "text":
                    logger.info(f"Workspaces response: {content.text}")
        except Exception as e:
            write_error(f"Error calling get_workspaces: {e}")


async def test_blocks(session: ClientSession, tool_map: Dict[str, Any]):
    """Test block-related endpoints."""
    if "get_block_types" in tool_map:
        try:
            logger.info("Testing get_block_types tool...")
            block_types_result = await session.call_tool("get_block_types", {"limit": 5})
            for content in block_types_result.content:
                if content.type == "text":
                    logger.info(f"Block types result: {content.text[:200]}...")
        except Exception as e:
            write_error(f"Error calling get_block_types: {e}")


async def test_variables(session: ClientSession, tool_map: Dict[str, Any]):
    """Test variable-related endpoints."""
    if "get_variables" in tool_map:
        try:
            logger.info("Testing get_variables tool...")
            variables_result = await session.call_tool("get_variables", {"limit": 5})
            for content in variables_result.content:
                if content.type == "text":
                    logger.info(f"Variables result: {content.text[:200]}...")
        except Exception as e:
            write_error(f"Error calling get_variables: {e}")
            
    # Test create_variable and delete_variable if available
    if "create_variable" in tool_map and "delete_variable" in tool_map:
        try:
            # Create a test variable with a unique name
            test_var_name = f"test_var_{uuid4().hex[:8]}"
            logger.info(f"Testing create_variable with name: {test_var_name}...")
            
            create_result = await session.call_tool("create_variable", {
                "name": test_var_name,
                "value": json.dumps({"test": True, "created_by": "mcp_test"}),
                "tags": ["test", "mcp_test"]
            })
            
            for content in create_result.content:
                if content.type == "text":
                    logger.info(f"Create variable result: {content.text[:200]}...")
            
            # Now try to delete it
            logger.info(f"Testing delete_variable for name: {test_var_name}...")
            delete_result = await session.call_tool("delete_variable", {"name": test_var_name})
            
            for content in delete_result.content:
                if content.type == "text":
                    logger.info(f"Delete variable result: {content.text}")
        except Exception as e:
            write_error(f"Error testing variable creation/deletion: {e}")


async def test_work_queues(session: ClientSession, tool_map: Dict[str, Any]):
    """Test work queue-related endpoints."""
    if "get_work_queues" in tool_map:
        try:
            logger.info("Testing get_work_queues tool...")
            work_queues_result = await session.call_tool("get_work_queues", {"limit": 5})
            for content in work_queues_result.content:
                if content.type == "text":
                    logger.info(f"Work queues result: {content.text[:200]}...")
        except Exception as e:
            write_error(f"Error calling get_work_queues: {e}")
            
    # Test create_work_queue and delete_work_queue if available
    if "create_work_queue" in tool_map and "delete_work_queue" in tool_map:
        try:
            # Create a test work queue with a unique name
            test_queue_name = f"test_queue_{uuid4().hex[:8]}"
            logger.info(f"Testing create_work_queue with name: {test_queue_name}...")
            
            create_result = await session.call_tool("create_work_queue", {
                "name": test_queue_name,
                "description": "Test work queue created by MCP test"
            })
            
            work_queue_id = None
            for content in create_result.content:
                if content.type == "text":
                    logger.info(f"Create work queue result: {content.text[:200]}...")
                    try:
                        # Extract the UUID using regex pattern
                        uuid_match = re.search(r"'id': UUID\('([0-9a-f-]+)'\)", content.text)
                        if uuid_match:
                            work_queue_id = uuid_match.group(1)
                            logger.info(f"Extracted work queue ID: {work_queue_id}")
                        else:
                            logger.warning("Could not find work queue ID in response")
                    except Exception as e:
                        logger.error(f"Error extracting work queue ID: {e}")
            if work_queue_id:
                # Now try to delete it
                logger.info(f"Testing delete_work_queue for ID: {work_queue_id}...")
                delete_result = await session.call_tool("delete_work_queue", {"work_queue_id": work_queue_id})
                
                for content in delete_result.content:
                    if content.type == "text":
                        logger.info(f"Delete work queue result: {content.text}")
            else:
                logger.info("Skipping delete_work_queue test - couldn't get work queue ID")
        except Exception as e:
            write_error(f"Error testing work queue creation/deletion: {e}")


if __name__ == "__main__":
    # Run with asyncio backend
    anyio.run(test_prefect_mcp, backend="asyncio")
```