#
tokens: 3679/50000 5/5 files
lines: off (toggle) GitHub
raw markdown copy
# Directory Structure

```
├── .gitignore
├── canon_camera.py
├── README.md
├── requirements.txt
└── server.py
```

# Files

--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------

```
.venv
.idea
```

--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------

```markdown
# Canon Camera MCP

A minimal server for controlling Canon cameras via the Canon Camera Control API (CCAPI), using FastMCP for streamable HTTP transport.

#### Demo 🎥
[![IMAGE ALT TEXT](http://img.youtube.com/vi/59icGndauho/0.jpg)](http://www.youtube.com/watch?v=59icGndauho "Canon MCP Server Demo")

[LinkedIn Post](https://www.linkedin.com/posts/ishanjoshi99_claude-ai-canon-camera-ive-been-activity-7333390072735535104-Sl0b?utm_source=share&utm_medium=member_desktop&rcm=ACoAACE9cFEBBFrka0tZ6SOykeuUIa1qgqTv7WE)

## Features

- Control Canon cameras remotely via CCAPI.
- Expose camera functions over HTTP using FastMCP.
- Image compression and streaming support.

## Requirements

- Python 3.10+
- Canon camera with CCAPI enabled ([CCAPI activation guide](https://www.canon.com.au/apps/eos-digital-software-development-kit))
- See `requirements.txt` for Python dependencies.

## Setup

1. **Install dependencies:**
   ```bash
   pip install -r requirements.txt
   ```

2. **Activate CCAPI on your Canon camera:**
   - Follow the official [Canon CCAPI activation instructions](https://www.canon.com.au/apps/eos-digital-software-development-kit).

3. **Configure camera IP:**
   - Set the `CANON_IP` environment variable to your camera’s IP address, or pass it as an argument.

## Usage

To run the server with Claude Desktop Client

```json
{
  "mcpServers": {
    "Canon Camera Controller": {
      "command": "uv",
      "args": [
        "--directory",
        "/path/to/dir",
        "run",
        "server.py"
      ],
      "env": {
        "CANON_IP": "192.168.0.111"
      }
    }
  }
}
```

Or with plain Python:

```bash
python server.py
```

## References

- Based on [laszewsk/canon-r7-ccapi](https://github.com/laszewsk/canon-r7-ccapi)

## Project Structure

- `canon_camera.py`: Canon camera CCAPI interface.
- `server.py`: FastMCP HTTP server exposing camera controls.
- `requirements.txt`: Python dependencies.


## Extending the project
The license terms of CCAPI access do not permit sharing the API reference. 

Once you have access, it's quite straightforward to get it working. 

You may also refer to the Canon CCAPI Feature [list](https://developercommunity.usa.canon.com/s/article/CCAPI-Function-List)


```

--------------------------------------------------------------------------------
/requirements.txt:
--------------------------------------------------------------------------------

```
annotated-types==0.7.0
anyio==4.9.0
certifi==2025.4.26
charset-normalizer==3.4.2
click==8.2.1
h11==0.16.0
httpcore==1.0.9
httpx==0.28.1
httpx-sse==0.4.0
idna==3.10
markdown-it-py==3.0.0
mcp==1.9.1
mdurl==0.1.2
pillow==11.2.1
pydantic==2.11.5
pydantic-settings==2.9.1
pydantic_core==2.33.2
Pygments==2.19.1
python-dotenv==1.1.0
python-multipart==0.0.20
requests==2.32.3
rich==14.0.0
shellingham==1.5.4
sniffio==1.3.1
sse-starlette==2.3.5
starlette==0.46.2
typer==0.16.0
typer-cli==0.16.0
typing-inspection==0.4.1
typing_extensions==4.13.2
urllib3==2.4.0
uvicorn==0.34.2

```

--------------------------------------------------------------------------------
/canon_camera.py:
--------------------------------------------------------------------------------

```python
import base64
import os

import requests
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("canon-camera")


class CanonCamera:
    """Canon Camera CCAPI interface"""

    def __init__(self, ip: str = None, port: int = 8080):
        self.ip = ip or os.environ.get("CANON_IP", None)
        self.port = port
        self.base_url = f"http://{self.ip}:{self.port}"

    def _get(self, path: str) -> requests.Response:
        """Execute GET request"""
        url = f"{self.base_url}{path}"
        logger.info(f"GET: {url}")
        response = requests.get(url, timeout=10)
        response.raise_for_status()
        return response

    def _put(self, path: str, data: dict) -> requests.Response:
        """Execute PUT request"""
        url = f"{self.base_url}{path}"
        logger.info(f"PUT: {url} <- {data}")
        response = requests.put(url, json=data, timeout=10)
        response.raise_for_status()
        return response

    def get_all_settings(self) -> dict:
        """Get all shooting settings"""
        response = self._get("/ccapi/ver100/shooting/settings")
        return response.json()

    def get_setting(self, setting_name: str) -> dict:
        """Get specific shooting setting"""
        response = self._get(f"/ccapi/ver100/shooting/settings/{setting_name}")
        return response.json()

    def set_setting(self, setting_name: str, value: str) -> dict:
        """Set specific shooting setting"""
        # First get current setting to validate
        current = self.get_setting(setting_name)

        if "ability" in current and value not in current["ability"]:
            raise ValueError(f"Invalid value '{value}' for {setting_name}. "
                             f"Available options: {current['ability']}")

        response = self._put(f"/ccapi/ver100/shooting/settings/{setting_name}",
                             {"value": value})
        response.raise_for_status()

        # Return updated setting info
        return {
            "setting": setting_name,
            "previous_value": current.get("value"),
            "new_value": value,
            "success": True
        }

    def init_live_view(self, liveviewsize="small", cameradisplay="keep"):
        try:
            url = f"{self.base_url}/ccapi/ver100/shooting/liveview/"
            body = {"liveviewsize": liveviewsize, "cameradisplay": cameradisplay}
            res = requests.post(url, json=body)
            return res.status_code
        except Exception as e:
            logger.error(f"Failed to init live view {e}")
            raise

    def get_liveview_image(self) -> str:
        """Get live view image as base64 string"""
        try:
            url = f"{self.base_url}/ccapi/ver100/shooting/liveview/flip"
            logger.info(f"Getting live view: {url}")
            response = requests.get(url, stream=True, timeout=15)
            response.raise_for_status()

            # Convert to base64
            image_data = f"{base64.b64encode(response.content).decode('utf-8')}\n"
            return image_data
        except Exception as e:
            logger.error(f"Failed to get live view: {e}")
            raise

```

--------------------------------------------------------------------------------
/server.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
"""
Canon Camera MCP Server with FastMCP Streamable HTTP Transport
A minimal MCP server for controlling Canon cameras via CCAPI using FastMCP
"""
import io
import json
import os
import base64
import typing

import requests
import logging
from typing import Literal

from mcp.server.fastmcp import FastMCP, Image

from PIL import Image as PILImage

from canon_camera import CanonCamera

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("canon-camera-mcp")





def compress_image_to_target_size(pil_img, target_size_mb=1, format="JPEG"):
    """
    Compress a PIL image to approximately the target size in MB.
    < Written by ChatGPT >

    Args:
        pil_img: PIL Image object
        target_size_mb: Target size in megabytes (default: 1)
        format: Image format ("JPEG" or "PNG")

    Returns:
        bytes: Compressed image data
    """
    target_size_bytes = target_size_mb * 1024 * 1024  # Convert MB to bytes

    # Convert to RGB if saving as JPEG (JPEG doesn't support transparency)
    if format.upper() == "JPEG" and pil_img.mode in ("RGBA", "P"):
        pil_img = pil_img.convert("RGB")

    # Start with high quality and reduce if needed
    quality = 95
    min_quality = 10

    while quality >= min_quality:
        img_byte_arr = io.BytesIO()

        if format.upper() == "JPEG":
            pil_img.save(img_byte_arr, format="JPEG", quality=quality, optimize=True)
        else:
            # For PNG, use optimization
            pil_img.save(img_byte_arr, format="PNG", optimize=True)

        img_size = img_byte_arr.tell()

        if img_size <= target_size_bytes:
            return img_byte_arr.getvalue()

        # Reduce quality for next iteration
        quality -= 5

    # If still too large, resize the image
    return resize_and_compress_image(pil_img, target_size_bytes, format)


def resize_and_compress_image(pil_img, target_size_bytes, format="JPEG"):
    """
    Resize and compress image if quality reduction alone isn't enough.
    """
    original_width, original_height = pil_img.size
    scale_factor = 0.9  # Start by reducing size by 10%
    img_byte_arr = None

    while scale_factor > 0.1:  # Don't go below 10% of original size
        new_width = int(original_width * scale_factor)
        new_height = int(original_height * scale_factor)

        resized_img = pil_img.resize((new_width, new_height), PILImage.Resampling.LANCZOS)

        # Try with medium quality after resizing
        img_byte_arr = io.BytesIO()

        if format.upper() == "JPEG":
            resized_img.save(img_byte_arr, format="JPEG", quality=75, optimize=True)
        else:
            resized_img.save(img_byte_arr, format="PNG", optimize=True)

        img_size = img_byte_arr.tell()

        if img_size <= target_size_bytes:
            return img_byte_arr.getvalue()

        scale_factor -= 0.1

    # If still too large, return the smallest version
    return img_byte_arr.getvalue()





# Initialize camera and FastMCP server
camera = CanonCamera()

mcp = FastMCP("Canon Camera Controller")


@mcp.tool()
def get_camera_settings(setting: Literal["all", "av", "tv", "iso"]) -> str:
    """
    Get all camera shooting settings or a specific setting

    Args:
        setting: Specific setting to get (av, tv, iso, shootingmodedial) or 'all' for all settings
    """
    try:
        if setting == "all":
            result = camera.get_all_settings()
            # Filter the result to only the keys
            keys_to_keep = ["av", "tv", "iso", "shootingmodedial"]
            result = {key: result[key] for key in keys_to_keep if key in result}

        else:
            valid_settings = ["av", "tv", "iso", "shootingmodedial"]
            if setting not in valid_settings:
                raise ValueError(f"Invalid setting '{setting}'. Valid options: {valid_settings}")

            result = camera.get_setting(setting)
            result["setting_name"] = setting

        res = json.dumps(result, indent=2)
        return res

    except requests.exceptions.RequestException as e:
        logger.error(f"Camera communication error: {e}")
        return json.dumps({
            "success": False,
            "error": "camera_communication_error",
            "message": f"Failed to communicate with camera: {str(e)}"
        }, indent=2)

    except ValueError as e:
        logger.error(f"Invalid parameter: {e}")
        return json.dumps({
            "success": False,
            "error": "invalid_parameter",
            "message": str(e)
        }, indent=2)

    except Exception as e:
        logger.error(f"Unexpected error: {e}")
        return json.dumps({
            "success": False,
            "error": "internal_error",
            "message": f"Unexpected error: {str(e)}"
        }, indent=2)


@mcp.tool()
def set_camera_setting(setting: str, value: str) -> str:
    """
    Set a camera shooting setting (aperture, shutter speed, or ISO)

    Args:
        setting: Setting to change (av, tv, iso)
        value: Value to set (must be from the setting's ability list)
    """
    try:
        valid_settings = ["av", "tv", "iso"]
        if setting not in valid_settings:
            raise ValueError(f"Invalid setting '{setting}'. Valid options: {valid_settings}")

        if not value:
            raise ValueError("Value is required")

        result = camera.set_setting(setting, value)
        return json.dumps(result, indent=2)

    except requests.exceptions.RequestException as e:
        logger.error(f"Camera communication error: {e}")
        return json.dumps({
            "success": False,
            "error": "camera_communication_error",
            "message": f"Failed to communicate with camera: {str(e)}"
        }, indent=2)

    except ValueError as e:
        logger.error(f"Invalid parameter: {e}")
        return json.dumps({
            "success": False,
            "error": "invalid_parameter",
            "message": str(e)
        }, indent=2)

    except Exception as e:
        logger.error(f"Unexpected error: {e}")
        return json.dumps({
            "success": False,
            "error": "internal_error",
            "message": f"Unexpected error: {str(e)}"
        }, indent=2)


@mcp.tool()
def get_liveview() -> typing.Union[Image, str]:
    """
    Get current live view image from camera. Use this to cross check if the new settings have actually worked.

    Returns an image.
    """
    try:
        image_data_b64 = camera.get_liveview_image()
        image_data_bytes = base64.b64decode(image_data_b64)
        pil_img = PILImage.open(io.BytesIO(image_data_bytes))

        # Compress the image to ~1MB
        compressed_img_bytes = compress_image_to_target_size(pil_img, target_size_mb=1, format="JPEG")
        img = Image(data=compressed_img_bytes, format="jpeg")
        img_content = img.to_image_content()
        logger.info(f"Image content: {img_content}")
        logger.info(f"Compressed image size: {len(compressed_img_bytes) / (1024 * 1024):.2f} MB")

        return img

    except requests.exceptions.RequestException as e:
        logger.error(f"Camera communication error: {e}")
        return json.dumps({
            "success": False,
            "error": "camera_communication_error",
            "message": f"Failed to communicate with camera: {str(e)}"
        }, indent=2)

    except Exception as e:
        logger.error(f"Unexpected error: {e}")
        return json.dumps({
            "success": False,
            "error": "internal_error",
            "message": f"Unexpected error: {str(e)}"
        }, indent=2)


def main():
    """Main entry point"""
    # Configuration
    host = os.environ.get("MCP_HOST", "localhost")
    port = int(os.environ.get("MCP_PORT", "3001"))

    logger.info(f"Starting Canon Camera MCP Server on {host}:{port}")
    logger.info(f"Canon Camera IP: {camera.ip}:{camera.port}")
    logger.info(f"MCP endpoint: http://{host}:{port}/")

    # Test camera connection on startup
    try:
        camera.get_all_settings()
        camera.init_live_view()
        logger.info("✓ Camera connection successful")
    except Exception as e:
        logger.warning(f"⚠ Camera connection failed: {e}")
        logger.info("Server will start anyway - camera connection will be retried on requests")

    # Run server with streamable HTTP transport
    # mcp.run(transport="streamable-http")
    mcp.run(transport="stdio")


if __name__ == "__main__":
    main()



```