# Directory Structure
```
├── .gitignore
├── claude_code
│ ├── __init__.py
│ ├── __pycache__
│ │ ├── __init__.cpython-312.pyc
│ │ └── mcp_server.cpython-312.pyc
│ ├── claude.py
│ ├── commands
│ │ ├── __init__.py
│ │ ├── __pycache__
│ │ │ ├── __init__.cpython-312.pyc
│ │ │ └── serve.cpython-312.pyc
│ │ ├── client.py
│ │ ├── multi_agent_client.py
│ │ └── serve.py
│ ├── config
│ │ └── __init__.py
│ ├── examples
│ │ ├── agents_config.json
│ │ ├── claude_mcp_config.html
│ │ ├── claude_mcp_config.json
│ │ ├── echo_server.py
│ │ └── README.md
│ ├── lib
│ │ ├── __init__.py
│ │ ├── __pycache__
│ │ │ └── __init__.cpython-312.pyc
│ │ ├── context
│ │ │ └── __init__.py
│ │ ├── monitoring
│ │ │ ├── __init__.py
│ │ │ ├── __pycache__
│ │ │ │ ├── __init__.cpython-312.pyc
│ │ │ │ └── server_metrics.cpython-312.pyc
│ │ │ ├── cost_tracker.py
│ │ │ └── server_metrics.py
│ │ ├── providers
│ │ │ ├── __init__.py
│ │ │ ├── base.py
│ │ │ └── openai.py
│ │ ├── rl
│ │ │ ├── __init__.py
│ │ │ ├── grpo.py
│ │ │ ├── mcts.py
│ │ │ └── tool_optimizer.py
│ │ ├── tools
│ │ │ ├── __init__.py
│ │ │ ├── __pycache__
│ │ │ │ ├── __init__.cpython-312.pyc
│ │ │ │ ├── base.cpython-312.pyc
│ │ │ │ ├── file_tools.cpython-312.pyc
│ │ │ │ └── manager.cpython-312.pyc
│ │ │ ├── ai_tools.py
│ │ │ ├── base.py
│ │ │ ├── code_tools.py
│ │ │ ├── file_tools.py
│ │ │ ├── manager.py
│ │ │ └── search_tools.py
│ │ └── ui
│ │ ├── __init__.py
│ │ └── tool_visualizer.py
│ ├── mcp_server.py
│ ├── README_MCP_CLIENT.md
│ ├── README_MULTI_AGENT.md
│ └── util
│ └── __init__.py
├── claude.py
├── cli.py
├── data
│ └── prompt_templates.json
├── deploy_modal_mcp.py
├── deploy.sh
├── examples
│ ├── agents_config.json
│ └── echo_server.py
├── install.sh
├── mcp_modal_adapter.py
├── mcp_server.py
├── modal_mcp_server.py
├── README_modal_mcp.md
├── README.md
├── requirements.txt
├── setup.py
├── static
│ └── style.css
├── templates
│ └── index.html
└── web-client.html
```
# Files
--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------
```
venv
.aider*
```
--------------------------------------------------------------------------------
/claude_code/examples/README.md:
--------------------------------------------------------------------------------
```markdown
# Claude Code MCP Examples
This directory contains examples for using the Claude Code MCP client with different MCP servers.
## Echo Server
A simple server that provides two tools:
- `echo`: Echoes back any message sent to it
- `reverse`: Reverses any message sent to it
To run the echo server example:
1. Start the server:
```bash
python examples/echo_server.py
```
2. In a separate terminal, connect to it with the MCP client:
```bash
claude mcp-client examples/echo_server.py
```
3. Try these example queries:
- "Echo the phrase 'hello world'"
- "Can you reverse the text 'Claude is awesome'?"
## Multi-Agent Example
The `agents_config.json` file contains a configuration for a multi-agent setup with three specialized roles:
- **Researcher**: Focuses on finding and analyzing information
- **Coder**: Specializes in writing and debugging code
- **Critic**: Evaluates solutions and suggests improvements
To run the multi-agent example:
1. Start the echo server:
```bash
python examples/echo_server.py
```
2. In a separate terminal, launch the multi-agent client:
```bash
claude mcp-multi-agent examples/echo_server.py --config examples/agents_config.json
```
3. Try these example interactions:
- "I need to write a function that calculates the Fibonacci sequence"
- "/talk Researcher What are the applications of Fibonacci sequences?"
- "/talk Critic What are the efficiency concerns with recursive Fibonacci implementations?"
- "/agents" (to see all available agents)
- "/history" (to view the conversation history)
## Adding Your Own Examples
Feel free to create your own MCP servers by following these steps:
1. Create a new Python file in this directory
2. Import FastMCP: `from fastmcp import FastMCP`
3. Create a server instance: `my_server = FastMCP("Server Name", description="...")`
4. Define tools using the `@my_server.tool` decorator
5. Define resources using the `@my_server.resource` decorator
6. Run your server with `my_server.run()`
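Putting these steps together, a minimal server might look like the sketch below (modeled on `echo_server.py` in this directory; the "Shout Server" name and its tool are made up for illustration):
```python
from fastmcp import FastMCP

# Hypothetical example server following steps 1-6 above
my_server = FastMCP("Shout Server", description="Uppercases whatever it receives")

@my_server.tool(name="shout", description="Returns the message in uppercase")
async def shout(message: str) -> str:
    """Uppercase the input message."""
    return message.upper()

@my_server.resource("shout://{message}")
def shout_resource(message: str) -> str:
    """Resource variant of the shout tool."""
    return message.upper()

if __name__ == "__main__":
    my_server.run()
```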
### Creating Custom Agent Configurations
To create your own agent configurations:
1. Create a JSON file with an array of agent definitions:
```json
[
{
"name": "AgentName",
"role": "agent specialization",
"model": "claude model to use",
"system_prompt": "Detailed instructions for the agent's behavior and role"
},
...
]
```
2. Launch the multi-agent client with your configuration:
```bash
claude mcp-multi-agent path/to/server.py --config path/to/your_config.json
```
```
--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------
```markdown
[MseeP.ai](https://mseep.ai/app/arthurcolle-openai-mcp)
# MCP Coding Assistant with support for OpenAI + other LLM Providers
A powerful Python recreation of Claude Code with enhanced real-time visualization, cost management, and Model Context Protocol (MCP) server capabilities. This tool provides a natural language interface for software development tasks with support for multiple LLM providers.


## Key Features
- **Multi-Provider Support:** Works with OpenAI, Anthropic, and other LLM providers
- **Model Context Protocol Integration:**
- Run as an MCP server for use with Claude Desktop and other clients
- Connect to any MCP server with the built-in MCP client
- Multi-agent synchronization for complex problem solving
- **Real-Time Tool Visualization:** See tool execution progress and results in real-time
- **Cost Management:** Track token usage and expenses with budget controls
- **Comprehensive Tool Suite:** File operations, search, command execution, and more
- **Enhanced UI:** Rich terminal interface with progress indicators and syntax highlighting
- **Context Optimization:** Smart conversation compaction and memory management
- **Agent Coordination:** Specialized agents with different roles can collaborate on tasks
## Installation
1. Clone this repository
2. Install dependencies:
```bash
pip install -r requirements.txt
```
3. Create a `.env` file with your API keys:
```
# Choose one or more providers
OPENAI_API_KEY=your_openai_api_key_here
ANTHROPIC_API_KEY=your_anthropic_api_key_here
# Optional model selection
OPENAI_MODEL=gpt-4o
ANTHROPIC_MODEL=claude-3-opus-20240229
```
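The `serve` command can load these with `--env-file`; to load them programmatically in your own scripts, a minimal sketch using the `python-dotenv` package from `requirements.txt`:
```python
import os

from dotenv import load_dotenv

load_dotenv()  # reads .env from the current working directory
print("OpenAI key set:", bool(os.environ.get("OPENAI_API_KEY")))
print("Anthropic key set:", bool(os.environ.get("ANTHROPIC_API_KEY")))
```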
## Usage
### CLI Mode
Run the CLI with the default provider (determined from available API keys):
```bash
python claude.py chat
```
Specify a provider and model:
```bash
python claude.py chat --provider openai --model gpt-4o
```
Set a budget limit to manage costs:
```bash
python claude.py chat --budget 5.00
```
### MCP Server Mode
Run as a Model Context Protocol server:
```bash
python claude.py serve
```
Start in development mode with the MCP Inspector:
```bash
python claude.py serve --dev
```
Configure host and port:
```bash
python claude.py serve --host 0.0.0.0 --port 8000
```
Specify additional dependencies:
```bash
python claude.py serve --dependencies pandas numpy
```
Load environment variables from file:
```bash
python claude.py serve --env-file .env
```
### MCP Client Mode
Connect to an MCP server using Claude as the reasoning engine:
```bash
python claude.py mcp-client path/to/server.py
```
Specify a Claude model:
```bash
python claude.py mcp-client path/to/server.py --model claude-3-5-sonnet-20241022
```
Try the included example server:
```bash
# In terminal 1 - start the server
python examples/echo_server.py
# In terminal 2 - connect with the client
python claude.py mcp-client examples/echo_server.py
```
### Multi-Agent MCP Mode
Launch a multi-agent client with synchronized agents:
```bash
python claude.py mcp-multi-agent path/to/server.py
```
Use a custom agent configuration file:
```bash
python claude.py mcp-multi-agent path/to/server.py --config examples/agents_config.json
```
Example with the echo server:
```bash
# In terminal 1 - start the server
python examples/echo_server.py
# In terminal 2 - launch the multi-agent client
python claude.py mcp-multi-agent examples/echo_server.py --config examples/agents_config.json
```
## Available Tools
- **View:** Read files with optional line limits
- **Edit:** Modify files with precise text replacement
- **Replace:** Create or overwrite files
- **GlobTool:** Find files by pattern matching
- **GrepTool:** Search file contents using regex
- **LS:** List directory contents
- **Bash:** Execute shell commands
## Chat Commands
- **/help:** Show available commands
- **/compact:** Compress conversation history to save tokens
- **/version:** Show version information
- **/providers:** List available LLM providers
- **/cost:** Show cost and usage information
- **/budget [amount]:** Set a budget limit
- **/quit, /exit:** Exit the application
## Architecture
Claude Code Python Edition is built with a modular architecture:
```
/claude_code/
/lib/
/providers/ # LLM provider implementations
/tools/ # Tool implementations
/context/ # Context management
/ui/ # UI components
/monitoring/ # Cost tracking & metrics
/commands/ # CLI commands
/config/ # Configuration management
/util/ # Utility functions
claude.py # Main CLI entry point
mcp_server.py # Model Context Protocol server
```
## Using with Model Context Protocol
### Using Claude Code as an MCP Server
Once the MCP server is running, you can connect to it from Claude Desktop or other MCP-compatible clients:
1. Install and run the MCP server:
```bash
python claude.py serve
```
2. Open the configuration page in your browser:
```
http://localhost:8000
```
3. Follow the instructions to configure Claude Desktop, including:
- Copy the JSON configuration
- Download the auto-configured JSON file
- Step-by-step setup instructions
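An example Claude Desktop server entry is also included in this repository at `claude_code/examples/claude_mcp_config.json`.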
### Using Claude Code as an MCP Client
To connect to any MCP server using Claude Code:
1. Ensure you have your Anthropic API key in the environment or .env file
2. Start the MCP server you want to connect to
3. Connect using the MCP client:
```bash
python claude.py mcp-client path/to/server.py
```
4. Type queries in the interactive chat interface
### Using Multi-Agent Mode
For complex tasks, the multi-agent mode allows multiple specialized agents to collaborate:
1. Create an agent configuration file or use the provided example
2. Start your MCP server
3. Launch the multi-agent client:
```bash
python claude.py mcp-multi-agent path/to/server.py --config examples/agents_config.json
```
4. Use the command interface to interact with multiple agents:
- Type a message to broadcast to all agents
- Use `/talk Agent_Name message` for direct communication
- Use `/agents` to see all available agents
- Use `/history` to view the conversation history
## Contributing
1. Fork the repository
2. Create a feature branch
3. Implement your changes with tests
4. Submit a pull request
## License
MIT
## Acknowledgments
This project is inspired by Anthropic's Claude Code CLI tool, reimplemented in Python with additional features for enhanced visibility, cost management, and MCP server capabilities.

# OpenAI Code Assistant
A powerful command-line and API-based coding assistant that uses OpenAI APIs with function calling and streaming.
## Features
- Interactive CLI for coding assistance
- Web API for integration with other applications
- Model Context Protocol (MCP) server implementation
- Replication support for high availability
- Tool-based architecture for extensibility
- Reinforcement learning for tool optimization
- Web client for browser-based interaction
## Installation
1. Clone the repository
2. Install dependencies:
```bash
pip install -r requirements.txt
```
3. Set your OpenAI API key:
```bash
export OPENAI_API_KEY=your_api_key
```
## Usage
### CLI Mode
Run the assistant in interactive CLI mode:
```bash
python cli.py
```
Options:
- `--model`, `-m`: Specify the model to use (default: gpt-4o)
- `--temperature`, `-t`: Set temperature for response generation (default: 0)
- `--verbose`, `-v`: Enable verbose output with additional information
- `--enable-rl/--disable-rl`: Enable/disable reinforcement learning for tool optimization
- `--rl-update`: Manually trigger an update of the RL model
### API Server Mode
Run the assistant as an API server:
```bash
python cli.py serve
```
Options:
- `--host`: Host address to bind to (default: 127.0.0.1)
- `--port`, `-p`: Port to listen on (default: 8000)
- `--workers`, `-w`: Number of worker processes (default: 1)
- `--enable-replication`: Enable replication across instances
- `--primary/--secondary`: Whether this is a primary or secondary instance
- `--peer`: Peer instances to replicate with (host:port), can be specified multiple times
### MCP Server Mode
Run the assistant as a Model Context Protocol (MCP) server:
```bash
python cli.py mcp-serve
```
Options:
- `--host`: Host address to bind to (default: 127.0.0.1)
- `--port`, `-p`: Port to listen on (default: 8000)
- `--dev`: Enable development mode with additional logging
- `--dependencies`: Additional Python dependencies to install
- `--env-file`: Path to .env file with environment variables
### MCP Client Mode
Connect to an MCP server using the assistant as the reasoning engine:
```bash
python cli.py mcp-client path/to/server.py
```
Options:
- `--model`, `-m`: Model to use for reasoning (default: gpt-4o)
- `--host`: Host address for the MCP server (default: 127.0.0.1)
- `--port`, `-p`: Port for the MCP server (default: 8000)
### Deployment Script
For easier deployment, use the provided script:
```bash
./deploy.sh --host 0.0.0.0 --port 8000 --workers 4
```
To enable replication:
```bash
# Primary instance
./deploy.sh --enable-replication --port 8000
# Secondary instance
./deploy.sh --enable-replication --secondary --port 8001 --peer 127.0.0.1:8000
```
### Web Client
To use the web client, open `web-client.html` in your browser. Make sure the API server is running.
## API Endpoints
### Standard API Endpoints
- `POST /conversation`: Create a new conversation
- `POST /conversation/{conversation_id}/message`: Send a message to a conversation
- `POST /conversation/{conversation_id}/message/stream`: Stream a message response
- `GET /conversation/{conversation_id}`: Get conversation details
- `DELETE /conversation/{conversation_id}`: Delete a conversation
- `GET /health`: Health check endpoint
### MCP Protocol Endpoints
- `GET /`: Health check (MCP protocol)
- `POST /context`: Get context for a prompt template
- `GET /prompts`: List available prompt templates
- `GET /prompts/{prompt_id}`: Get a specific prompt template
- `POST /prompts`: Create a new prompt template
- `PUT /prompts/{prompt_id}`: Update an existing prompt template
- `DELETE /prompts/{prompt_id}`: Delete a prompt template
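For a quick smoke test of these endpoints from Python, here is a minimal sketch using the `requests` package from `requirements.txt`; it assumes the server is running on the default `127.0.0.1:8000` and that these endpoints return JSON:
```python
import requests

BASE = "http://127.0.0.1:8000"

# Standard health check
print(requests.get(f"{BASE}/health").status_code)

# List available prompt templates
print(requests.get(f"{BASE}/prompts").json())

# Fetch one template by id ("code_review" ships in data/prompt_templates.json)
print(requests.get(f"{BASE}/prompts/code_review").json())
```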
## Replication
The replication system allows running multiple instances of the assistant with synchronized state. This provides:
- High availability
- Load balancing
- Fault tolerance
To set up replication:
1. Start a primary instance with `--enable-replication`
2. Start secondary instances with `--enable-replication --secondary --peer [primary-host:port]`
## Tools
The assistant includes various tools:
- Weather: Get current weather for a location
- View: Read files from the filesystem
- Edit: Edit files
- Replace: Write files
- Bash: Execute bash commands
- GlobTool: File pattern matching
- GrepTool: Content search
- LS: List directory contents
- JinaSearch: Web search using Jina.ai
- JinaFactCheck: Fact checking using Jina.ai
- JinaReadURL: Read and summarize webpages
## CLI Commands
- `/help`: Show help message
- `/compact`: Compact the conversation to reduce token usage
- `/status`: Show token usage and session information
- `/config`: Show current configuration settings
- `/rl-status`: Show RL tool optimizer status (if enabled)
- `/rl-update`: Update the RL model manually (if enabled)
- `/rl-stats`: Show tool usage statistics (if enabled)
```
--------------------------------------------------------------------------------
/claude_code/config/__init__.py:
--------------------------------------------------------------------------------
```python
```
--------------------------------------------------------------------------------
/claude_code/lib/__init__.py:
--------------------------------------------------------------------------------
```python
```
--------------------------------------------------------------------------------
/claude_code/lib/context/__init__.py:
--------------------------------------------------------------------------------
```python
```
--------------------------------------------------------------------------------
/claude_code/lib/monitoring/__init__.py:
--------------------------------------------------------------------------------
```python
```
--------------------------------------------------------------------------------
/claude_code/lib/ui/__init__.py:
--------------------------------------------------------------------------------
```python
```
--------------------------------------------------------------------------------
/claude_code/util/__init__.py:
--------------------------------------------------------------------------------
```python
```
--------------------------------------------------------------------------------
/claude_code/commands/__init__.py:
--------------------------------------------------------------------------------
```python
"""Commands package for Claude Code."""
from claude_code.commands import serve
from claude_code.commands import client
```
--------------------------------------------------------------------------------
/claude_code/examples/claude_mcp_config.json:
--------------------------------------------------------------------------------
```json
{
"name": "Claude Code Tools",
"type": "local_process",
"command": "python",
"args": ["claude.py", "serve"],
"workingDirectory": "/path/to/claude-code-directory",
"environment": {},
"description": "A Model Context Protocol server for Claude Code tools"
}
```
--------------------------------------------------------------------------------
/claude_code/__init__.py:
--------------------------------------------------------------------------------
```python
"""
Claude Code Python Edition - A powerful LLM-powered CLI for software development.
This package provides a Python reimplementation of Claude Code with enhanced
real-time tool visualization and cost management features.
"""
__version__ = "0.1.0"
__author__ = "Claude Code Team"
```
--------------------------------------------------------------------------------
/claude_code/lib/rl/__init__.py:
--------------------------------------------------------------------------------
```python
"""
Reinforcement Learning module for Claude Code.
This package contains implementations of MCTS and GRPO for decision making.
"""
from .mcts import AdvancedMCTS, MCTSToolSelector
from .grpo import GRPO, MultiAgentGroupRL, ToolSelectionGRPO
__all__ = [
"AdvancedMCTS",
"MCTSToolSelector",
"GRPO",
"MultiAgentGroupRL",
"ToolSelectionGRPO",
]
```
--------------------------------------------------------------------------------
/requirements.txt:
--------------------------------------------------------------------------------
```
# Core dependencies
openai>=1.0.0
anthropic>=0.8.0
python-dotenv>=1.0.0
pydantic>=2.0.0
requests>=2.0.0
fastmcp>=0.4.1
# CLI and UI
typer>=0.9.0
rich>=10.0.0
prompt_toolkit>=3.0.0
# Tools and utilities
tiktoken>=0.3.0
tokenizers>=0.13.0
regex>=2022.0.0
GitPython>=3.1.0
pygments>=2.15.0
# Performance
tqdm>=4.65.0
concurrent-log-handler>=0.9.0
# Machine Learning and Optimization
torch>=2.0.0
numpy>=1.20.0
sentence-transformers>=2.2.0
# Testing
pytest>=7.0.0
pytest-cov>=4.0.0
# Web API
fastapi>=0.100.0
uvicorn>=0.23.0
```
--------------------------------------------------------------------------------
/claude_code/lib/tools/__init__.py:
--------------------------------------------------------------------------------
```python
"""Tools module for Claude Code Python Edition."""
from .base import Tool, ToolParameter, ToolResult, ToolRegistry, tool
from .manager import ToolExecutionManager
from .file_tools import register_file_tools
from .search_tools import register_search_tools
from .code_tools import register_code_tools
from .ai_tools import register_ai_tools
__all__ = [
"Tool",
"ToolParameter",
"ToolResult",
"ToolRegistry",
"ToolExecutionManager",
"tool",
"register_file_tools",
"register_search_tools",
"register_code_tools",
"register_ai_tools"
]
def register_all_tools(registry: Optional[ToolRegistry] = None) -> ToolRegistry:
"""Register all available tools with the registry.
Args:
registry: Existing registry or None to create a new one
Returns:
Tool registry with all tools registered
"""
if registry is None:
registry = ToolRegistry()
# Register tool categories
register_file_tools(registry)
register_search_tools(registry)
register_code_tools(registry)
register_ai_tools(registry)
# Load saved routines
registry.load_routines()
return registry
```
--------------------------------------------------------------------------------
/setup.py:
--------------------------------------------------------------------------------
```python
from setuptools import setup, find_packages
with open("README.md", "r", encoding="utf-8") as fh:
long_description = fh.read()
with open("requirements.txt", "r", encoding="utf-8") as f:
requirements = [line.strip() for line in f.readlines() if line.strip()]
setup(
name="claude_code",
version="0.1.0",
author="Claude Code Team",
author_email="[email protected]",
description="Python recreation of Claude Code with enhanced features",
long_description=long_description,
long_description_content_type="text/markdown",
url="https://github.com/yourusername/claude-code-python",
packages=find_packages(),
install_requires=requirements,
classifiers=[
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"License :: OSI Approved :: MIT License",
"Operating System :: OS Independent",
"Development Status :: 3 - Alpha",
"Intended Audience :: Developers",
"Topic :: Software Development :: User Interfaces",
],
python_requires=">=3.10",
entry_points={
"console_scripts": [
"claude-code=claude_code.claude:app",
],
},
)
```
--------------------------------------------------------------------------------
/claude_code/examples/agents_config.json:
--------------------------------------------------------------------------------
```json
[
{
"name": "Researcher",
"role": "research specialist",
"model": "claude-3-5-sonnet-20241022",
"system_prompt": "You are a research specialist participating in a multi-agent conversation. Your primary role is to find information, analyze data, and provide well-researched answers. You should use tools to gather information and verify facts. Always cite your sources when possible."
},
{
"name": "Coder",
"role": "programming expert",
"model": "claude-3-5-sonnet-20241022",
"system_prompt": "You are a coding expert participating in a multi-agent conversation. Your primary role is to write, debug, and explain code. You should use tools to test your code and provide working solutions. Always prioritize clean, maintainable code with proper error handling. You can collaborate with other agents to solve complex problems."
},
{
"name": "Critic",
"role": "critical thinker",
"model": "claude-3-5-sonnet-20241022",
"system_prompt": "You are a critical thinker participating in a multi-agent conversation. Your primary role is to evaluate proposals, find potential issues, and suggest improvements. You should question assumptions, point out flaws, and help refine ideas. Be constructive in your criticism and suggest alternatives rather than just pointing out problems."
}
]
```
--------------------------------------------------------------------------------
/claude_code/examples/echo_server.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
Example Echo MCP Server for testing the Claude Code MCP client.
This server provides a simple 'echo' tool that returns whatever is sent to it.
"""
from fastmcp import FastMCP
import logging
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Create the MCP server
echo_server = FastMCP(
"Echo Server",
description="A simple echo server for testing MCP clients",
dependencies=[]
)
@echo_server.tool(name="echo", description="Echoes back the input message")
async def echo(message: str) -> str:
"""Echo back the input message.
Args:
message: The message to echo back
Returns:
The same message
"""
logger.info(f"Received message: {message}")
return f"Echo: {message}"
@echo_server.tool(name="reverse", description="Reverses the input message")
async def reverse(message: str) -> str:
"""Reverse the input message.
Args:
message: The message to reverse
Returns:
The reversed message
"""
logger.info(f"Reversing message: {message}")
return f"Reversed: {message[::-1]}"
@echo_server.resource("echo://{message}")
def echo_resource(message: str) -> str:
"""Echo resource.
Args:
message: The message to echo
Returns:
The echoed message
"""
return f"Resource Echo: {message}"
if __name__ == "__main__":
# Run the server
echo_server.run()
```
--------------------------------------------------------------------------------
/examples/agents_config.json:
--------------------------------------------------------------------------------
```json
{
"agents": [
{
"name": "CodeExpert",
"role": "primary",
"system_prompt": "You are a code expert specializing in software development. Focus on providing high-quality code solutions, explaining code concepts, and helping with debugging issues. You should prioritize code quality, readability, and best practices.",
"model": "gpt-4o",
"temperature": 0.0
},
{
"name": "Architect",
"role": "specialist",
"system_prompt": "You are a software architect specializing in system design. Focus on providing high-level architectural guidance, design patterns, and system organization advice. You should think about scalability, maintainability, and overall system structure.",
"model": "gpt-4o",
"temperature": 0.1
},
{
"name": "SecurityExpert",
"role": "specialist",
"system_prompt": "You are a security expert specializing in identifying and fixing security vulnerabilities in code. Focus on security best practices, potential vulnerabilities, and secure coding patterns. Always prioritize security considerations in your advice.",
"model": "gpt-4o",
"temperature": 0.0
}
],
"coordination": {
"strategy": "round_robin",
"primary_agent": "CodeExpert",
"auto_delegation": true,
"voting_threshold": 0.6
},
"settings": {
"max_turns_per_agent": 3,
"enable_agent_reflection": true,
"enable_cross_agent_communication": true,
"enable_user_selection": true
}
}
```
--------------------------------------------------------------------------------
/deploy.sh:
--------------------------------------------------------------------------------
```bash
#!/bin/bash
# OpenAI Code Assistant Deployment Script
# Default values
HOST="127.0.0.1"
PORT=8000
WORKERS=1
ENABLE_REPLICATION=false
PRIMARY=true
PEERS=""
# Parse command line arguments
while [[ $# -gt 0 ]]; do
case $1 in
--host)
HOST="$2"
shift 2
;;
--port)
PORT="$2"
shift 2
;;
--workers)
WORKERS="$2"
shift 2
;;
--enable-replication)
ENABLE_REPLICATION=true
shift
;;
--secondary)
PRIMARY=false
shift
;;
--peer)
if [ -z "$PEERS" ]; then
PEERS="--peer $2"
else
PEERS="$PEERS --peer $2"
fi
shift 2
;;
*)
echo "Unknown option: $1"
exit 1
;;
esac
done
# Check if OpenAI API key is set
if [ -z "$OPENAI_API_KEY" ]; then
echo "Error: OPENAI_API_KEY environment variable is not set"
echo "Please set it with: export OPENAI_API_KEY=your_api_key"
exit 1
fi
# Create log directory if it doesn't exist
mkdir -p logs
# Start the server
echo "Starting OpenAI Code Assistant API Server..."
echo "Host: $HOST"
echo "Port: $PORT"
echo "Workers: $WORKERS"
echo "Replication: $ENABLE_REPLICATION"
echo "Role: $([ "$PRIMARY" = true ] && echo "Primary" || echo "Secondary")"
echo "Peers: $PEERS"
# Build the command
CMD="python cli.py serve --host $HOST --port $PORT --workers $WORKERS"
if [ "$ENABLE_REPLICATION" = true ]; then
CMD="$CMD --enable-replication"
fi
if [ "$PRIMARY" = false ]; then
CMD="$CMD --secondary"
fi
if [ -n "$PEERS" ]; then
CMD="$CMD $PEERS"
fi
# Run the command
echo "Running: $CMD"
$CMD > logs/server_$(date +%Y%m%d_%H%M%S).log 2>&1 &
# Save the PID
echo $! > server.pid
echo "Server started with PID $(cat server.pid)"
echo "Logs are being written to logs/server_*.log"
```
--------------------------------------------------------------------------------
/claude.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""Main entry point for Claude Code."""
import os
import sys
import argparse
import logging
from typing import Optional, List, Dict, Any
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
def main() -> int:
"""Main entry point for Claude Code.
Returns:
Exit code
"""
# Create the main parser
parser = argparse.ArgumentParser(
description="Claude Code - A powerful LLM-powered CLI for software development"
)
# Add version information
from claude_code import __version__
parser.add_argument(
"--version",
action="version",
version=f"Claude Code v{__version__}"
)
# Create subparsers for commands
subparsers = parser.add_subparsers(
title="commands",
dest="command",
help="Command to execute"
)
# Add the chat command (default)
chat_parser = subparsers.add_parser(
"chat",
help="Start an interactive chat session with Claude Code"
)
# Add chat-specific arguments here
# Add the serve command for MCP server
serve_parser = subparsers.add_parser(
"serve",
help="Start the Claude Code MCP server"
)
# Add serve-specific arguments
from claude_code.commands.serve import add_arguments
add_arguments(serve_parser)
# Parse arguments
args = parser.parse_args()
# If no command specified, default to chat
if not args.command:
args.command = "chat"
# Execute the appropriate command
if args.command == "chat":
# Import and run the chat command
from claude_code.claude import main as chat_main
return chat_main()
elif args.command == "serve":
# Import and run the serve command
from claude_code.commands.serve import execute
return execute(args)
else:
parser.print_help()
return 1
if __name__ == "__main__":
sys.exit(main())
```
--------------------------------------------------------------------------------
/claude_code/README_MCP_CLIENT.md:
--------------------------------------------------------------------------------
```markdown
# Claude Code MCP Client
This is an implementation of a Model Context Protocol (MCP) client for Claude Code. It allows you to connect to any MCP-compatible server and interact with it using Claude as the reasoning engine.
## Prerequisites
- Python 3.10 or later
- Anthropic API key (set in your environment or `.env` file)
- Required packages: `mcp`, `anthropic`, `python-dotenv`
## Installation
The MCP client is included as part of Claude Code. If you have Claude Code installed, you already have access to the MCP client.
If you need to install the dependencies separately:
```bash
pip install mcp anthropic python-dotenv
```
## Usage
### Command Line Interface
The MCP client can be run directly from the command line:
```bash
# Using the claude command (recommended)
claude mcp-client path/to/server.py [--model MODEL]
# Or by running the client module directly
python -m claude_code.commands.client path/to/server.py [--model MODEL]
```
### Arguments
- `server_script`: Path to the MCP server script (required, must be a `.py` or `.js` file)
- `--model`: Claude model to use (optional, defaults to `claude-3-5-sonnet-20241022`)
### Environment Variables
Create a `.env` file in your project directory with your Anthropic API key:
```
ANTHROPIC_API_KEY=your_api_key_here
```
## Features
- Connect to any MCP-compatible server (Python or JavaScript)
- Interactive chat interface
- Automatically handles tool calls between Claude and the MCP server
- Maintains conversation context
- Clean resource management with proper error handling
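Internally, connecting to a server follows the standard `mcp` package flow. Below is a minimal sketch of that flow, illustrating the SDK's stdio client API rather than the exact Claude Code implementation:
```python
import asyncio

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

async def main() -> None:
    # Launch the server script as a subprocess speaking MCP over stdio
    params = StdioServerParameters(command="python", args=["path/to/server.py"])
    async with stdio_client(params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            tools = await session.list_tools()
            print("Server tools:", [tool.name for tool in tools.tools])

asyncio.run(main())
```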
## Example
1. Start your MCP server (e.g., a weather server)
2. Run the MCP client targeting that server:
```bash
claude mcp-client path/to/weather_server.py
```
3. Interact with the server through the client:
```
Query: What's the weather in San Francisco?
[Claude will use the tools provided by the server to answer your query]
```
## Troubleshooting
- If the client can't find the server, double-check the path to your server script
- Ensure your environment variables are correctly set (ANTHROPIC_API_KEY)
- For Node.js servers, make sure Node.js is installed on your system
- The first response might take up to 30 seconds while the server initializes
## Extending the Client
The MCP client is designed to be modular. You can extend its functionality by:
1. Adding custom response processing
2. Implementing specific tool handling
3. Enhancing the user interface
4. Adding support for additional authentication methods
## License
Same as Claude Code
```
--------------------------------------------------------------------------------
/static/style.css:
--------------------------------------------------------------------------------
```css
/* OpenAI Code Assistant MCP Server Dashboard Styles */
body {
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, Oxygen, Ubuntu, Cantarell, 'Open Sans', 'Helvetica Neue', sans-serif;
line-height: 1.6;
color: #333;
background-color: #f8f9fa;
margin: 0;
padding: 20px;
}
.container {
max-width: 1200px;
margin: 0 auto;
}
h1 {
color: #2c3e50;
border-bottom: 2px solid #eee;
padding-bottom: 10px;
margin-bottom: 20px;
}
h2 {
color: #3498db;
margin-top: 30px;
margin-bottom: 15px;
}
.card {
background-color: #fff;
border-radius: 8px;
box-shadow: 0 2px 4px rgba(0, 0, 0, 0.1);
margin-bottom: 20px;
overflow: hidden;
}
.card-header {
background-color: #f1f1f1;
padding: 12px 15px;
font-weight: bold;
border-bottom: 1px solid #ddd;
}
.card-body {
padding: 15px;
}
.stats-grid {
display: grid;
grid-template-columns: repeat(auto-fill, minmax(250px, 1fr));
gap: 20px;
margin-bottom: 30px;
}
.stat-card {
background-color: #fff;
border-radius: 8px;
box-shadow: 0 2px 4px rgba(0, 0, 0, 0.1);
padding: 15px;
text-align: center;
}
.stat-value {
font-size: 24px;
font-weight: bold;
color: #2980b9;
margin: 10px 0;
}
.stat-label {
color: #7f8c8d;
font-size: 14px;
}
.btn {
display: inline-block;
padding: 8px 16px;
margin-right: 10px;
border-radius: 4px;
text-decoration: none;
font-weight: 500;
cursor: pointer;
border: none;
}
.btn-primary {
background-color: #3498db;
color: white;
}
.btn-secondary {
background-color: #95a5a6;
color: white;
}
.btn-info {
background-color: #2ecc71;
color: white;
}
.template-grid {
display: grid;
grid-template-columns: repeat(auto-fill, minmax(300px, 1fr));
gap: 20px;
}
.parameter-list {
list-style-type: none;
padding-left: 0;
}
.parameter-list li {
padding: 5px 0;
border-bottom: 1px solid #eee;
}
.parameter-list li:last-child {
border-bottom: none;
}
.tag {
display: inline-block;
background-color: #e0f7fa;
color: #0097a7;
padding: 3px 8px;
border-radius: 4px;
font-size: 12px;
margin-right: 5px;
}
.footer {
margin-top: 40px;
padding-top: 20px;
border-top: 1px solid #eee;
text-align: center;
color: #7f8c8d;
font-size: 14px;
}
/* Responsive adjustments */
@media (max-width: 768px) {
.stats-grid, .template-grid {
grid-template-columns: 1fr;
}
}
```
--------------------------------------------------------------------------------
/claude_code/lib/providers/__init__.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
# claude_code/lib/providers/__init__.py
"""LLM provider module."""
import logging
import os
from typing import Dict, Type, Optional
from .base import BaseProvider
from .openai import OpenAIProvider
logger = logging.getLogger(__name__)
# Registry of provider classes
PROVIDER_REGISTRY: Dict[str, Type[BaseProvider]] = {
"openai": OpenAIProvider,
}
# Try to import other providers if available
try:
from .anthropic import AnthropicProvider
PROVIDER_REGISTRY["anthropic"] = AnthropicProvider
except ImportError:
logger.debug("Anthropic provider not available")
try:
from .local import LocalProvider
PROVIDER_REGISTRY["local"] = LocalProvider
except ImportError:
logger.debug("Local provider not available")
def get_provider(name: Optional[str] = None, **kwargs) -> BaseProvider:
"""Get a provider instance by name.
Args:
name: Provider name, or None to use default provider
**kwargs: Additional arguments to pass to the provider constructor
Returns:
Provider instance
Raises:
ValueError: If provider is not found
"""
# If name is not specified, try to infer from environment
if name is None:
if os.environ.get("OPENAI_API_KEY"):
name = "openai"
elif os.environ.get("ANTHROPIC_API_KEY"):
name = "anthropic"
else:
# Default to OpenAI if nothing else is available
name = "openai"
if name.lower() not in PROVIDER_REGISTRY:
raise ValueError(f"Provider {name} not found. Available providers: {', '.join(PROVIDER_REGISTRY.keys())}")
provider_class = PROVIDER_REGISTRY[name.lower()]
return provider_class(**kwargs)
def list_available_providers() -> Dict[str, Dict]:
"""List all available providers and their models.
Returns:
Dictionary mapping provider names to information about them
"""
result = {}
for name, provider_class in PROVIDER_REGISTRY.items():
try:
# Create a temporary instance to get model information
# This might fail if API keys are not available
instance = provider_class()
result[name] = {
"name": instance.name,
"available": True,
"models": instance.available_models,
"current_model": instance.current_model
}
except Exception as e:
# Provider is available but not configured correctly
result[name] = {
"name": name.capitalize(),
"available": False,
"error": str(e),
"models": [],
"current_model": None
}
return result
```
--------------------------------------------------------------------------------
/data/prompt_templates.json:
--------------------------------------------------------------------------------
```json
{
"greeting": {
"template": "Hello! The current time is {time}. How can I help you today?",
"description": "A simple greeting template",
"parameters": {
"time": {
"type": "string",
"description": "The current time"
}
},
"default_model": "gpt-4o",
"metadata": {
"category": "general"
}
},
"code_review": {
"template": "Please review the following code:\n\n```{language}\n{code}\n```\n\nFocus on: {focus_areas}",
"description": "Template for code review requests",
"parameters": {
"language": {
"type": "string",
"description": "Programming language of the code"
},
"code": {
"type": "string",
"description": "The code to review"
},
"focus_areas": {
"type": "string",
"description": "Areas to focus on during review (e.g., 'performance, security')"
}
},
"default_model": "gpt-4o",
"metadata": {
"category": "development"
}
},
"system_prompt": {
"template": "You are OpenAI Code Assistant, a CLI tool that helps users with software engineering tasks and general information.\nUse the available tools to assist the user with their requests.\n\n# Tone and style\nYou should be concise, direct, and to the point. When you run a non-trivial bash command, \nyou should explain what the command does and why you are running it.\nOutput text to communicate with the user; all text you output outside of tool use is displayed to the user.\nRemember that your output will be displayed on a command line interface.\n\n# Tool usage policy\n- When doing file search, remember to search effectively with the available tools.\n- Always use the appropriate tool for the task.\n- Use parallel tool calls when appropriate to improve performance.\n- NEVER commit changes unless the user explicitly asks you to.\n- For weather queries, use the Weather tool to provide real-time information.\n\n# Tasks\nThe user will primarily request you perform software engineering tasks:\n1. Solving bugs\n2. Adding new functionality \n3. Refactoring code\n4. Explaining code\n5. Writing tests\n\nFor these tasks:\n1. Use search tools to understand the codebase\n2. Implement solutions using the available tools\n3. Verify solutions with tests if possible\n4. Run lint and typecheck commands when appropriate\n\nThe user may also ask for general information:\n1. Weather conditions\n2. Simple calculations\n3. General knowledge questions\n\n# Code style\n- Follow the existing code style of the project\n- Maintain consistent naming conventions\n- Use appropriate libraries that are already in the project\n- Add comments when code is complex or non-obvious\n\nIMPORTANT: You should minimize output tokens as much as possible while maintaining helpfulness, \nquality, and accuracy. Answer concisely with short lines of text unless the user asks for detail.",
"description": "System prompt for the assistant",
"parameters": {},
"default_model": "gpt-4o",
"metadata": {
"category": "system"
}
}
}
```
--------------------------------------------------------------------------------
/install.sh:
--------------------------------------------------------------------------------
```bash
#!/bin/bash
# Installation script for Claude Code Python Edition
# Set up colors
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
RED='\033[0;31m'
NC='\033[0m' # No Color
echo -e "${GREEN}Installing Claude Code Python Edition...${NC}"
# Check Python version
python_version=$(python3 --version 2>&1 | awk '{print $2}')
echo -e "${YELLOW}Detected Python version: ${python_version}${NC}"
# Check if Python version is at least 3.10
if [[ $(echo "${python_version}" | cut -d. -f1,2 | sed 's/\.//') -lt 310 ]]; then
echo -e "${RED}Error: Python 3.10 or higher is required.${NC}"
exit 1
fi
# Create virtual environment if it doesn't exist
if [ ! -d "venv" ]; then
echo -e "${YELLOW}Creating virtual environment...${NC}"
python3 -m venv venv
if [ $? -ne 0 ]; then
echo -e "${RED}Error creating virtual environment.${NC}"
exit 1
fi
else
echo -e "${YELLOW}Using existing virtual environment.${NC}"
fi
# Activate virtual environment
echo -e "${YELLOW}Activating virtual environment...${NC}"
source venv/bin/activate
if [ $? -ne 0 ]; then
echo -e "${RED}Error activating virtual environment.${NC}"
exit 1
fi
# Install dependencies
echo -e "${YELLOW}Installing dependencies...${NC}"
pip install -r requirements.txt
if [ $? -ne 0 ]; then
echo -e "${RED}Error installing dependencies.${NC}"
exit 1
fi
# Install in development mode
echo -e "${YELLOW}Installing Claude Code in development mode...${NC}"
pip install -e .
if [ $? -ne 0 ]; then
echo -e "${RED}Error installing package.${NC}"
exit 1
fi
# Create .env file if it doesn't exist
if [ ! -f ".env" ]; then
echo -e "${YELLOW}Creating .env file...${NC}"
cat > .env << EOF
# API Keys (uncomment and add your keys)
# OPENAI_API_KEY=your_openai_api_key
# ANTHROPIC_API_KEY=your_anthropic_api_key
# Models (optional)
# OPENAI_MODEL=gpt-4o
# ANTHROPIC_MODEL=claude-3-opus-20240229
# Budget limit in dollars (optional)
# BUDGET_LIMIT=5.0
EOF
echo -e "${YELLOW}Created .env file. Please edit it to add your API keys.${NC}"
else
echo -e "${YELLOW}.env file already exists. Skipping creation.${NC}"
fi
# Create setup.py if it doesn't exist
if [ ! -f "setup.py" ]; then
echo -e "${YELLOW}Creating setup.py...${NC}"
cat > setup.py << EOF
from setuptools import setup, find_packages
setup(
name="claude_code",
version="0.1.0",
packages=find_packages(),
install_requires=[
    line.strip() for line in open("requirements.txt", "r")
    if line.strip() and not line.startswith("#")
],
entry_points={
"console_scripts": [
"claude-code=claude_code.claude:app",
],
},
)
EOF
echo -e "${YELLOW}Created setup.py file.${NC}"
else
echo -e "${YELLOW}setup.py file already exists. Skipping creation.${NC}"
fi
echo -e "${GREEN}Installation complete!${NC}"
echo -e "${YELLOW}To activate the virtual environment, run:${NC}"
echo -e " source venv/bin/activate"
echo -e "${YELLOW}To run Claude Code, use:${NC}"
echo -e " claude-code"
echo -e "${YELLOW}Or:${NC}"
echo -e " python -m claude_code.claude"
echo -e "${GREEN}Enjoy using Claude Code Python Edition!${NC}"
```
--------------------------------------------------------------------------------
/claude_code/commands/serve.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
# claude_code/commands/serve.py
"""Command to start the MCP server."""
import os
import sys
import logging
import argparse
from typing import Dict, Any, Optional, List
from claude_code.mcp_server import initialize_server
# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def add_arguments(parser: argparse.ArgumentParser) -> None:
"""Add command-specific arguments to the parser.
Args:
parser: Argument parser
"""
parser.add_argument(
"--dev",
action="store_true",
help="Run in development mode with the MCP Inspector"
)
parser.add_argument(
"--host",
type=str,
default="localhost",
help="Host to bind the server to"
)
parser.add_argument(
"--port",
type=int,
default=8000,
help="Port to bind the server to"
)
parser.add_argument(
"--dependencies",
type=str,
nargs="*",
help="Additional dependencies to install"
)
parser.add_argument(
"--env-file",
type=str,
help="Path to environment file (.env)"
)
def execute(args: argparse.Namespace) -> int:
"""Execute the serve command.
Args:
args: Command arguments
Returns:
Exit code
"""
try:
# Initialize the MCP server
mcp_server = initialize_server()
# Add any additional dependencies
if args.dependencies:
for dep in args.dependencies:
mcp_server.dependencies.append(dep)
# Load environment variables from file
if args.env_file:
if not os.path.exists(args.env_file):
logger.error(f"Environment file not found: {args.env_file}")
return 1
import dotenv
dotenv.load_dotenv(args.env_file)
# Run the server
if args.dev:
logger.info(f"Starting MCP server in development mode on {args.host}:{args.port}")
# Use the fastmcp dev mode
import subprocess
cmd = [
"fastmcp", "dev",
"--module", "claude_code.mcp_server:mcp",
"--host", args.host,
"--port", str(args.port)
]
return subprocess.call(cmd)
else:
# Run directly
logger.info(f"Starting MCP server on {args.host}:{args.port}")
logger.info(f"Visit http://{args.host}:{args.port} for Claude Desktop configuration instructions")
# FastMCP.run() method signature changed to accept host/port
try:
mcp_server.run(host=args.host, port=args.port)
except TypeError:
# Fallback for older versions of FastMCP
logger.info("Using older FastMCP version without host/port parameters")
mcp_server.run()
return 0
except Exception as e:
logger.exception(f"Error running MCP server: {e}")
return 1
def main() -> int:
"""Run the serve command as a standalone script."""
parser = argparse.ArgumentParser(description="Run the Claude Code MCP server")
add_arguments(parser)
args = parser.parse_args()
return execute(args)
if __name__ == "__main__":
sys.exit(main())
```
--------------------------------------------------------------------------------
/claude_code/lib/providers/base.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
# claude_code/lib/providers/base.py
"""Base provider interface for LLM integration."""
import abc
from typing import Dict, List, Generator, Optional, Any, Union
class BaseProvider(abc.ABC):
"""Abstract base class for LLM providers.
This class defines the interface that all LLM providers must implement.
Providers are responsible for:
- Generating completions from LLMs
- Counting tokens
- Managing rate limits
- Tracking costs
"""
@property
@abc.abstractmethod
def name(self) -> str:
"""Get the name of the provider."""
pass
@property
@abc.abstractmethod
def available_models(self) -> List[str]:
"""Get a list of available models from this provider."""
pass
@property
@abc.abstractmethod
def current_model(self) -> str:
"""Get the currently selected model."""
pass
@abc.abstractmethod
def set_model(self, model_name: str) -> None:
"""Set the current model.
Args:
model_name: The name of the model to use
Raises:
ValueError: If the model is not available
"""
pass
@abc.abstractmethod
def generate_completion(self,
messages: List[Dict[str, Any]],
tools: Optional[List[Dict[str, Any]]] = None,
temperature: float = 0.0,
stream: bool = True) -> Union[Dict[str, Any], Generator[Dict[str, Any], None, None]]:
"""Generate a completion from the provider.
Args:
messages: List of message dictionaries
tools: Optional list of tool dictionaries
temperature: Model temperature (0-1)
stream: Whether to stream the response
Returns:
If stream=True, returns a generator of response chunks
If stream=False, returns the complete response
"""
pass
@abc.abstractmethod
def count_tokens(self, text: str) -> int:
"""Count tokens in text.
Args:
text: The text to count tokens for
Returns:
The number of tokens in the text
"""
pass
@abc.abstractmethod
def count_message_tokens(self, messages: List[Dict[str, Any]]) -> Dict[str, int]:
"""Count tokens in a message list.
Args:
messages: List of message dictionaries
Returns:
Dictionary with 'input' and 'output' token counts
"""
pass
@abc.abstractmethod
def get_model_info(self) -> Dict[str, Any]:
"""Get information about the current model.
Returns:
Dictionary with model information including:
- context_window: Maximum context window size
- input_cost_per_1k: Cost per 1K input tokens
- output_cost_per_1k: Cost per 1K output tokens
- capabilities: List of model capabilities
"""
pass
@property
@abc.abstractmethod
def cost_per_1k_tokens(self) -> Dict[str, float]:
"""Get cost per 1K tokens for input and output.
Returns:
Dictionary with 'input' and 'output' costs
"""
pass
@abc.abstractmethod
def validate_api_key(self) -> bool:
"""Validate the API key.
Returns:
True if the API key is valid, False otherwise
"""
pass
@abc.abstractmethod
def get_rate_limit_info(self) -> Dict[str, Any]:
"""Get rate limit information.
Returns:
Dictionary with rate limit information
"""
pass
```
--------------------------------------------------------------------------------
/claude_code/README_MULTI_AGENT.md:
--------------------------------------------------------------------------------
```markdown
# Claude Code Multi-Agent MCP Client
This is an implementation of a multi-agent Model Context Protocol (MCP) client for Claude Code. It allows you to run multiple Claude-powered agents that can communicate with each other while connected to the same MCP server.
## Key Features
- **Multiple Specialized Agents**: Run agents with different roles and prompts simultaneously
- **Agent Synchronization**: Agents automatically share messages and respond to each other
- **Direct & Broadcast Messaging**: Send messages to specific agents or broadcast to all
- **Rich Interface**: Colorful terminal interface with command-based controls
- **Message History**: Track all conversations between agents
- **Customizable Roles**: Define agent specializations through configuration files
## Prerequisites
- Python 3.10 or later
- Anthropic API key (set in your environment or `.env` file)
- Required packages: `mcp`, `anthropic`, `python-dotenv`, `rich`
## Usage
### Command Line Interface
The multi-agent client can be run directly from the command line:
```bash
# Using the claude command (recommended)
claude mcp-multi-agent path/to/server.py [--config CONFIG_FILE]
# Or by running the client module directly
python -m claude_code.commands.multi_agent_client path/to/server.py [--config CONFIG_FILE]
```
### Arguments
- `server_script`: Path to the MCP server script (required, must be a `.py` or `.js` file)
- `--config`: Path to agent configuration JSON file (optional, default uses a single assistant agent)
### Environment Variables
Create a `.env` file in your project directory with your Anthropic API key:
```
ANTHROPIC_API_KEY=your_api_key_here
```
## Agent Configuration
Create a JSON file to define your agents:
```json
[
{
"name": "Researcher",
"role": "research specialist",
"model": "claude-3-5-sonnet-20241022",
"system_prompt": "You are a research specialist participating in a multi-agent conversation. Your primary role is to find information, analyze data, and provide well-researched answers."
},
{
"name": "Coder",
"role": "programming expert",
"model": "claude-3-5-sonnet-20241022",
"system_prompt": "You are a coding expert participating in a multi-agent conversation. Your primary role is to write, debug, and explain code."
}
]
```
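Before launching, you can sanity-check a configuration file with a short script; this is a sketch assuming the array-of-agents format and field names shown above:
```python
import json
import sys

with open(sys.argv[1]) as f:
    agents = json.load(f)

for agent in agents:
    missing = {"name", "role", "model", "system_prompt"} - agent.keys()
    if missing:
        raise ValueError(f"Agent {agent.get('name', '?')} is missing fields: {missing}")

print(f"Loaded {len(agents)} agents: {[a['name'] for a in agents]}")
```
Saving the sketch as e.g. `check_config.py`, run it with `python check_config.py path/to/your_config.json`.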
## Interactive Commands
When running the multi-agent client, you can use these commands:
- `/help`: Show available commands
- `/agents`: List all active agents
- `/talk <agent> <message>`: Send a direct message to a specific agent
- `/history`: Show message history
- `/quit`, `/exit`: Exit the application
To broadcast a message to all agents, simply type your message without any command.
## Example Session
This is a sample session with the multi-agent client:
1. Start a server:
```bash
python examples/echo_server.py
```
2. Start the multi-agent client:
```bash
claude mcp-multi-agent examples/echo_server.py --config examples/agents_config.json
```
3. Broadcast a message to all agents:
```
> I need to analyze some data and then create a visualization
```
4. Send a direct message to the researcher agent:
```
> /talk Researcher What statistical methods would be best for this analysis?
```
5. View the message history:
```
> /history
```
## Use Cases
The multi-agent client is particularly useful for:
1. **Complex Problem Solving**: Break down problems into parts handled by specialized agents
2. **Collaborative Development**: Use a researcher, coder, and critic to develop better solutions
3. **Debate and Refinement**: Have agents with different perspectives refine ideas
4. **Automated Workflows**: Set up agents that collaborate on tasks without human intervention
5. **Education**: Create teaching scenarios where agents play different roles
## Troubleshooting
- If agents aren't responding to each other, check for errors in your configuration file
- For better performance, use smaller models for simple agents
- Make sure your Anthropic API key has sufficient quota for multiple simultaneous requests
- Use the `/history` command to debug message flow between agents
## License
Same as Claude Code
```
--------------------------------------------------------------------------------
/README_modal_mcp.md:
--------------------------------------------------------------------------------
```markdown
# Modal MCP Server
This project provides an OpenAI-compatible API server running on Modal.com with a Model Context Protocol (MCP) adapter.
## Components
1. **Modal OpenAI-compatible Server** (`modal_mcp_server.py`): A full-featured OpenAI-compatible API server that runs on Modal.com's infrastructure.
2. **MCP Adapter** (`mcp_modal_adapter.py`): A FastAPI server that adapts the OpenAI API to the Model Context Protocol (MCP).
3. **Deployment Script** (`deploy_modal_mcp.py`): A helper script to deploy both components.
## Features
- **OpenAI-compatible API**: Full compatibility with OpenAI's chat completions API
- **Multiple Models**: Support for various models including Llama 3, Phi-4, DeepSeek-R1, and more
- **Streaming Support**: Real-time streaming of model outputs
- **Advanced Caching**: Efficient caching of responses for improved performance
- **Rate Limiting**: Token bucket algorithm for fair API usage
- **MCP Compatibility**: Adapter for Model Context Protocol support
## Prerequisites
- Python 3.10+
- Modal.com account and CLI set up (`pip install modal`)
- FastAPI and Uvicorn (`pip install fastapi uvicorn`)
- HTTPX for async HTTP requests (`pip install httpx`)
## Installation
1. Install dependencies:
```bash
pip install modal fastapi uvicorn httpx
```
2. Set up Modal CLI:
```bash
modal token new
```
## Deployment
### Option 1: Using the deployment script
The easiest way to deploy is using the provided script:
```bash
python deploy_modal_mcp.py
```
This will:
1. Deploy the OpenAI-compatible server to Modal
2. Start the MCP adapter locally
3. Open a browser to verify the deployment
### Option 2: Manual deployment
1. Deploy the Modal server:
```bash
modal deploy modal_mcp_server.py
```
2. Note the URL of your deployed Modal app.
3. Set environment variables for the MCP adapter:
```bash
export MODAL_API_URL="https://your-modal-app-url.modal.run"
export MODAL_API_KEY="sk-modal-llm-api-key" # Default key
export DEFAULT_MODEL="phi-4" # Or any other supported model
```
4. Start the MCP adapter:
```bash
uvicorn mcp_modal_adapter:app --host 0.0.0.0 --port 8000
```
## Usage
### MCP API Endpoints
- `GET /health`: Health check endpoint
- `GET /prompts`: List available prompt templates
- `GET /prompts/{prompt_id}`: Get a specific prompt template
- `POST /context/{prompt_id}`: Generate context from a prompt template
- `POST /prompts`: Add a new prompt template
- `DELETE /prompts/{prompt_id}`: Delete a prompt template
### Example: Generate context
```bash
curl -X POST "http://localhost:8000/context/default" \
-H "Content-Type: application/json" \
-d '{
"parameters": {
"prompt": "Explain quantum computing in simple terms"
},
"model": "phi-4",
"stream": false
}'
```
### Example: Streaming response
```bash
curl -X POST "http://localhost:8000/context/default" \
-H "Content-Type: application/json" \
-d '{
"parameters": {
"prompt": "Write a short story about AI"
},
"model": "phi-4",
"stream": true
}'
```
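The stream arrives as server-sent events: each `data:` line carries a JSON chunk whose `content` field holds a partial piece of the output (flagged with `is_partial`), followed by a final full response and `data: [DONE]`. A minimal Python consumer (a sketch, assuming the adapter defaults above):
```python
# Consume the SSE stream from the adapter and print partial content as it arrives.
import json
import httpx

payload = {
    "parameters": {"prompt": "Write a short story about AI"},
    "model": "phi-4",
    "stream": True,
}

with httpx.stream("POST", "http://localhost:8000/context/default",
                  json=payload, timeout=300.0) as response:
    for line in response.iter_lines():
        if not line.startswith("data: "):
            continue
        data = line[len("data: "):]
        if data == "[DONE]":
            break
        chunk = json.loads(data)
        if chunk.get("is_partial"):
            print(chunk["content"], end="", flush=True)
```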
## Advanced Configuration
### Adding Custom Prompt Templates
```bash
curl -X POST "http://localhost:8000/prompts" \
-H "Content-Type: application/json" \
-d '{
"id": "code-generator",
"name": "Code Generator",
"description": "Generates code based on a description",
"template": "Write code in {language} that accomplishes the following: {task}",
"parameters": {
"language": {
"type": "string",
"description": "Programming language"
},
"task": {
"type": "string",
"description": "Task description"
}
}
}'
```
### Using Custom Prompt Templates
```bash
curl -X POST "http://localhost:8000/context/code-generator" \
-H "Content-Type: application/json" \
-d '{
"parameters": {
"language": "Python",
"task": "Create a function that calculates the Fibonacci sequence"
},
"model": "phi-4"
}'
```
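The adapter fills the template with Python's `str.format`, so the request above resolves to the following prompt text before it is sent to the model (illustrative):
```python
# How the adapter renders a template: plain str.format over the parameters.
template = "Write code in {language} that accomplishes the following: {task}"
prompt_text = template.format(
    language="Python",
    task="Create a function that calculates the Fibonacci sequence",
)
print(prompt_text)
# -> "Write code in Python that accomplishes the following: Create a function that calculates the Fibonacci sequence"
```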
## Supported Models
- **vLLM Models**:
- `llama3-8b`: Meta Llama 3.1 8B Instruct (quantized)
- `mistral-7b`: Mistral 7B Instruct v0.2
- `tiny-llama-1.1b`: TinyLlama 1.1B Chat
- **Llama.cpp Models**:
- `deepseek-r1`: DeepSeek R1 (quantized)
- `phi-4`: Microsoft Phi-4 (quantized)
- `phi-2`: Microsoft Phi-2 (quantized)
## License
MIT
```
--------------------------------------------------------------------------------
/deploy_modal_mcp.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
Deployment script for Modal MCP Server
"""
import os
import sys
import argparse
import subprocess
import webbrowser
import time
def check_dependencies():
"""Check if required dependencies are installed"""
try:
import modal
import httpx
import fastapi
import uvicorn
print("✅ All required dependencies are installed")
return True
except ImportError as e:
print(f"❌ Missing dependency: {e}")
print("Please install required dependencies:")
print("pip install modal httpx fastapi uvicorn")
return False
def deploy_modal_server(args):
"""Deploy the Modal OpenAI-compatible server"""
print("Deploying Modal OpenAI-compatible server...")
# Run the Modal deployment command
cmd = ["modal", "deploy", "modal_mcp_server.py"]
try:
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
print(f"❌ Error deploying Modal server: {result.stderr}")
return None
# Extract the deployment URL from the output
for line in result.stdout.splitlines():
if "https://" in line and "modal.run" in line:
# Extract just the URL token in case the line has surrounding text
url = next(tok for tok in line.split() if tok.startswith("https://"))
print(f"✅ Modal server deployed at: {url}")
return url
print("❌ Could not find deployment URL in output")
print(result.stdout)
return None
except Exception as e:
print(f"❌ Error deploying Modal server: {e}")
return None
def deploy_mcp_adapter(modal_url, args):
"""Deploy the MCP adapter server"""
print("Deploying MCP adapter server...")
# Set environment variables for the adapter
os.environ["MODAL_API_URL"] = modal_url
os.environ["MODAL_API_KEY"] = args.api_key
os.environ["DEFAULT_MODEL"] = args.model
# Start the adapter server
try:
import uvicorn
from mcp_modal_adapter import app
# Start in a separate process if not in foreground mode
if not args.foreground:
print(f"Starting MCP adapter server on port {args.port}...")
cmd = [
sys.executable, "-m", "uvicorn", "mcp_modal_adapter:app",
"--host", "0.0.0.0", "--port", str(args.port)
]
# Use subprocess.Popen to run in background
process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE if not args.verbose else None,
stderr=subprocess.PIPE if not args.verbose else None
)
# Wait a bit to make sure it starts
time.sleep(2)
# Check if process is still running
if process.poll() is None:
print(f"✅ MCP adapter server running on http://localhost:{args.port}")
return f"http://localhost:{args.port}"
else:
stdout, stderr = process.communicate()
print(f"❌ Error starting MCP adapter server: {stderr.decode() if stderr else 'Unknown error'}")
return None
else:
# Run in foreground
print(f"Starting MCP adapter server on port {args.port} in foreground mode...")
uvicorn.run(app, host="0.0.0.0", port=args.port)
return None # Will never reach here in foreground mode
except Exception as e:
print(f"❌ Error starting MCP adapter server: {e}")
return None
def main():
"""Main entry point"""
parser = argparse.ArgumentParser(description="Deploy Modal MCP Server")
parser.add_argument("--port", type=int, default=8000, help="Port for MCP adapter server")
parser.add_argument("--api-key", type=str, default="sk-modal-llm-api-key", help="API key for Modal server")
parser.add_argument("--model", type=str, default="phi-4", help="Default model to use")
parser.add_argument("--foreground", action="store_true", help="Run MCP adapter in foreground")
parser.add_argument("--verbose", action="store_true", help="Show verbose output")
parser.add_argument("--skip-modal-deploy", action="store_true", help="Skip Modal server deployment")
parser.add_argument("--modal-url", type=str, help="Use existing Modal server URL")
args = parser.parse_args()
# Check dependencies
if not check_dependencies():
return 1
# Deploy Modal server if not skipped
modal_url = args.modal_url
if not args.skip_modal_deploy and not modal_url:
modal_url = deploy_modal_server(args)
if not modal_url:
return 1
# Deploy MCP adapter
mcp_url = deploy_mcp_adapter(modal_url, args)
if not mcp_url and not args.foreground:
return 1
# Open browser if not in foreground mode
if mcp_url and not args.foreground:
print(f"Opening browser to MCP server health check...")
webbrowser.open(f"{mcp_url}/health")
print("\nMCP Server is now running!")
print(f"- Health check: {mcp_url}/health")
print(f"- List prompts: {mcp_url}/prompts")
print(f"- Modal API: {modal_url}")
print("\nPress Ctrl+C to stop the server")
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
print("\nStopping server...")
return 0
if __name__ == "__main__":
sys.exit(main())
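# Example invocations (a sketch; all flags are defined in main() above):
#   python deploy_modal_mcp.py                        # deploy Modal app + start adapter
#   python deploy_modal_mcp.py --foreground           # run the adapter in the foreground
#   python deploy_modal_mcp.py --skip-modal-deploy \
#       --modal-url https://your-modal-app-url.modal.run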
```
--------------------------------------------------------------------------------
/claude_code/commands/client.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
# claude_code/commands/client.py
"""MCP client implementation for testing MCP servers."""
import asyncio
import sys
import os
import logging
import argparse
from typing import Optional
from contextlib import AsyncExitStack
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from anthropic import Anthropic
from dotenv import load_dotenv
# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Load environment variables
load_dotenv()
class MCPClient:
"""Model Context Protocol client for testing MCP servers."""
def __init__(self, model: str = "claude-3-5-sonnet-20241022"):
"""Initialize the MCP client.
Args:
model: The Claude model to use
"""
# Initialize session and client objects
self.session: Optional[ClientSession] = None
self.exit_stack = AsyncExitStack()
self.anthropic = Anthropic()
self.model = model
async def connect_to_server(self, server_script_path: str):
"""Connect to an MCP server.
Args:
server_script_path: Path to the server script (.py or .js)
"""
is_python = server_script_path.endswith('.py')
is_js = server_script_path.endswith('.js')
if not (is_python or is_js):
raise ValueError("Server script must be a .py or .js file")
command = "python" if is_python else "node"
server_params = StdioServerParameters(
command=command,
args=[server_script_path],
env=None
)
stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params))
self.stdio, self.write = stdio_transport
self.session = await self.exit_stack.enter_async_context(ClientSession(self.stdio, self.write))
await self.session.initialize()
# List available tools
response = await self.session.list_tools()
tools = response.tools
logger.info(f"Connected to server with tools: {[tool.name for tool in tools]}")
print("\nConnected to server with tools:", [tool.name for tool in tools])
async def process_query(self, query: str) -> str:
"""Process a query using Claude and available tools.
Args:
query: The user query
Returns:
The response text
"""
messages = [
{
"role": "user",
"content": query
}
]
response = await self.session.list_tools()
available_tools = [{
"name": tool.name,
"description": tool.description,
"input_schema": tool.inputSchema
} for tool in response.tools]
# Initial Claude API call
response = self.anthropic.messages.create(
model=self.model,
max_tokens=1000,
messages=messages,
tools=available_tools
)
# Process response and handle tool calls
tool_results = []
final_text = []
assistant_message_content = []
for content in response.content:
if content.type == 'text':
final_text.append(content.text)
assistant_message_content.append(content)
elif content.type == 'tool_use':
tool_name = content.name
tool_args = content.input
# Execute tool call
result = await self.session.call_tool(tool_name, tool_args)
tool_results.append({"call": tool_name, "result": result})
final_text.append(f"[Calling tool {tool_name} with args {tool_args}]")
assistant_message_content.append(content)
messages.append({
"role": "assistant",
"content": assistant_message_content
})
messages.append({
"role": "user",
"content": [
{
"type": "tool_result",
"tool_use_id": content.id,
"content": result.content
}
]
})
# Get next response from Claude
response = self.anthropic.messages.create(
model=self.model,
max_tokens=1000,
messages=messages,
tools=available_tools
)
# Guard against a follow-up response with no text content
if response.content and response.content[0].type == 'text':
final_text.append(response.content[0].text)
return "\n".join(final_text)
async def chat_loop(self):
"""Run an interactive chat loop."""
print("\nMCP Client Started!")
print("Type your queries or 'quit' to exit.")
while True:
try:
query = input("\nQuery: ").strip()
if query.lower() == 'quit':
break
response = await self.process_query(query)
print("\n" + response)
except Exception as e:
print(f"\nError: {str(e)}")
logger.exception("Error processing query")
async def cleanup(self):
"""Clean up resources."""
await self.exit_stack.aclose()
def add_arguments(parser: argparse.ArgumentParser) -> None:
"""Add command-specific arguments to the parser.
Args:
parser: Argument parser
"""
parser.add_argument(
"server_script",
type=str,
help="Path to the server script (.py or .js)"
)
parser.add_argument(
"--model",
type=str,
default="claude-3-5-sonnet-20241022",
help="Claude model to use"
)
def execute(args: argparse.Namespace) -> int:
"""Execute the client command.
Args:
args: Command arguments
Returns:
Exit code
"""
try:
client = MCPClient(model=args.model)
async def run_client():
try:
await client.connect_to_server(args.server_script)
await client.chat_loop()
finally:
await client.cleanup()
asyncio.run(run_client())
return 0
except Exception as e:
logger.exception(f"Error running MCP client: {e}")
print(f"\nError: {str(e)}")
return 1
def main() -> int:
"""Run the client command as a standalone script."""
parser = argparse.ArgumentParser(description="Run the Claude Code MCP client")
add_arguments(parser)
args = parser.parse_args()
return execute(args)
if __name__ == "__main__":
sys.exit(main())
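# Standalone usage (a sketch; requires ANTHROPIC_API_KEY in the environment):
#   python -m claude_code.commands.client path/to/server.py --model claude-3-5-sonnet-20241022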
```
--------------------------------------------------------------------------------
/examples/echo_server.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
Simple Echo MCP Server Example
This is a basic implementation of a Model Context Protocol (MCP) server
that simply echoes back the parameters it receives.
"""
import time
import uuid
from typing import Dict, List, Any, Optional
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field
import uvicorn
# MCP Protocol Models
class MCPHealthResponse(BaseModel):
status: str = "healthy"
version: str = "1.0.0"
protocol_version: str = "0.1.0"
provider: str = "Echo MCP Server"
models: List[str] = ["echo-model"]
class MCPContextRequest(BaseModel):
prompt_id: str
parameters: Dict[str, Any] = Field(default_factory=dict)
model: Optional[str] = None
stream: bool = False
user: Optional[str] = None
conversation_id: Optional[str] = None
message_id: Optional[str] = None
class MCPContextResponse(BaseModel):
context: str
context_id: str
model: str
usage: Dict[str, int] = Field(default_factory=dict)
metadata: Dict[str, Any] = Field(default_factory=dict)
class MCPPromptTemplate(BaseModel):
id: str
template: str
description: Optional[str] = None
parameters: Dict[str, Dict[str, Any]] = Field(default_factory=dict)
default_model: Optional[str] = None
metadata: Dict[str, Any] = Field(default_factory=dict)
class MCPPromptLibraryResponse(BaseModel):
prompts: List[MCPPromptTemplate]
count: int
# Create FastAPI app
app = FastAPI(
title="Echo MCP Server",
description="A simple MCP server that echoes back parameters",
version="1.0.0",
)
# Add CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Define prompt templates
prompt_templates = {
"echo": {
"template": "You said: {message}",
"description": "Echoes back the message",
"parameters": {
"message": {
"type": "string",
"description": "The message to echo"
}
},
"default_model": "echo-model",
"metadata": {
"category": "utility"
}
},
"reverse": {
"template": "Reversed: {message}",
"description": "Reverses the message",
"parameters": {
"message": {
"type": "string",
"description": "The message to reverse"
}
},
"default_model": "echo-model",
"metadata": {
"category": "utility"
}
}
}
# MCP Protocol Routes
@app.get("/", response_model=MCPHealthResponse)
async def health_check():
"""Health check endpoint required by MCP protocol"""
return MCPHealthResponse()
@app.post("/context", response_model=MCPContextResponse)
async def get_context(request: MCPContextRequest):
"""Get context for a prompt template with parameters"""
try:
# Check if prompt template exists
if request.prompt_id not in prompt_templates:
raise HTTPException(
status_code=404,
detail=f"Prompt template '{request.prompt_id}' not found"
)
# Get prompt template
template = prompt_templates[request.prompt_id]
# Use default model if not specified
model = request.model or template.get("default_model", "echo-model")
# Generate context ID
context_id = str(uuid.uuid4())
# Process template with parameters
try:
if request.prompt_id == "echo":
context = f"Echo: {request.parameters.get('message', '')}"
elif request.prompt_id == "reverse":
message = request.parameters.get('message', '')
context = f"Reversed: {message[::-1]}"
else:
context = template["template"].format(**request.parameters)
except KeyError as e:
raise HTTPException(
status_code=400,
detail=f"Missing required parameter: {e}"
)
# Calculate token usage (simplified)
token_estimate = len(context.split())
usage = {
"prompt_tokens": token_estimate,
"completion_tokens": 0,
"total_tokens": token_estimate
}
return MCPContextResponse(
context=context,
context_id=context_id,
model=model,
usage=usage,
metadata={
"prompt_id": request.prompt_id,
"timestamp": time.time()
}
)
except HTTPException:
raise
except Exception as e:
raise HTTPException(
status_code=500,
detail=f"Error processing context: {str(e)}"
)
@app.get("/prompts", response_model=MCPPromptLibraryResponse)
async def get_prompts():
"""Get available prompt templates"""
prompts = [
MCPPromptTemplate(
id=prompt_id,
template=template["template"],
description=template.get("description", ""),
parameters=template.get("parameters", {}),
default_model=template.get("default_model", "echo-model"),
metadata=template.get("metadata", {})
)
for prompt_id, template in prompt_templates.items()
]
return MCPPromptLibraryResponse(
prompts=prompts,
count=len(prompts)
)
@app.get("/prompts/{prompt_id}", response_model=MCPPromptTemplate)
async def get_prompt(prompt_id: str):
"""Get a specific prompt template"""
if prompt_id not in prompt_templates:
raise HTTPException(
status_code=404,
detail=f"Prompt template '{prompt_id}' not found"
)
template = prompt_templates[prompt_id]
return MCPPromptTemplate(
id=prompt_id,
template=template["template"],
description=template.get("description", ""),
parameters=template.get("parameters", {}),
default_model=template.get("default_model", "echo-model"),
metadata=template.get("metadata", {})
)
# Error handlers
@app.exception_handler(HTTPException)
async def http_exception_handler(request: Request, exc: HTTPException):
"""Handle HTTP exceptions in MCP format"""
return JSONResponse(
status_code=exc.status_code,
content={
"error": exc.detail,
"error_type": "http_error",
"status_code": exc.status_code,
"details": exc.detail if isinstance(exc.detail, dict) else None
}
)
@app.exception_handler(Exception)
async def general_exception_handler(request: Request, exc: Exception):
"""Handle general exceptions in MCP format"""
return JSONResponse(
status_code=500,
content={
"error": str(exc),
"error_type": "server_error",
"status_code": 500,
"details": None
}
)
if __name__ == "__main__":
uvicorn.run(app, host="127.0.0.1", port=8000)
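# Example request once the server is running (illustrative):
#   curl -X POST http://127.0.0.1:8000/context \
#        -H "Content-Type: application/json" \
#        -d '{"prompt_id": "echo", "parameters": {"message": "hello"}}'
# -> {"context": "Echo: hello", "context_id": "...", "model": "echo-model", ...}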
```
--------------------------------------------------------------------------------
/claude_code/lib/tools/ai_tools.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
# claude_code/lib/tools/ai_tools.py
"""AI-powered tools for generation and analysis."""
import os
import logging
import requests
import tempfile
from typing import Optional
from .base import tool, ToolRegistry
logger = logging.getLogger(__name__)
@tool(
name="GenerateImage",
description="Generate an image using AI based on a text prompt",
parameters={
"type": "object",
"properties": {
"prompt": {
"type": "string",
"description": "Text description of the image to generate"
},
"style": {
"type": "string",
"description": "Style of the image (realistic, cartoon, sketch, etc.)",
"enum": ["realistic", "cartoon", "sketch", "painting", "3d", "pixel-art", "abstract"],
"default": "realistic"
},
"size": {
"type": "string",
"description": "Size of the image",
"enum": ["small", "medium", "large"],
"default": "medium"
},
"save_path": {
"type": "string",
"description": "Absolute path where the image should be saved (optional)"
}
},
"required": ["prompt"]
},
needs_permission=True,
category="ai"
)
def generate_image(prompt: str, style: str = "realistic", size: str = "medium", save_path: Optional[str] = None) -> str:
"""Generate an image using AI based on a text prompt.
Args:
prompt: Text description of the image to generate
style: Style of the image
size: Size of the image
save_path: Path where to save the image
Returns:
Path to the generated image or error message
"""
logger.info(f"Generating image with prompt: {prompt} (style: {style}, size: {size})")
# Map size to actual dimensions
size_map = {
"small": "512x512",
"medium": "1024x1024",
"large": "1792x1024"
}
# Get API key
api_key = os.getenv("OPENAI_API_KEY")
if not api_key:
return "Error: OpenAI API key not found. Please set the OPENAI_API_KEY environment variable."
# Prepare the prompt based on style
full_prompt = prompt
if style != "realistic":
style_prompts = {
"cartoon": f"A cartoon-style image of {prompt}",
"sketch": f"A pencil sketch of {prompt}",
"painting": f"An oil painting of {prompt}",
"3d": f"A 3D rendered image of {prompt}",
"pixel-art": f"A pixel art image of {prompt}",
"abstract": f"An abstract representation of {prompt}"
}
full_prompt = style_prompts.get(style, prompt)
try:
# Call OpenAI API to generate image
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {api_key}"
}
payload = {
"model": "dall-e-3",
"prompt": full_prompt,
"size": size_map.get(size, "1024x1024"),
"quality": "standard",
"n": 1
}
response = requests.post(
"https://api.openai.com/v1/images/generations",
headers=headers,
json=payload
)
if response.status_code != 200:
return f"Error: API request failed with status code {response.status_code}: {response.text}"
data = response.json()
if "data" not in data or not data["data"]:
return "Error: No image data in response"
image_url = data["data"][0]["url"]
# Download the image
image_response = requests.get(image_url)
if image_response.status_code != 200:
return f"Error: Failed to download image: {image_response.status_code}"
# Save the image
if save_path:
# Ensure the path is absolute
if not os.path.isabs(save_path):
return f"Error: Save path must be absolute: {save_path}"
# Create directory if it doesn't exist
os.makedirs(os.path.dirname(save_path), exist_ok=True)
# Save the image
with open(save_path, "wb") as f:
f.write(image_response.content)
return f"Image generated and saved to: {save_path}"
else:
# Save to a temporary file
with tempfile.NamedTemporaryFile(delete=False, suffix=".png") as tmp:
tmp.write(image_response.content)
return f"Image generated and saved to temporary file: {tmp.name}"
except Exception as e:
logger.exception(f"Error generating image: {str(e)}")
return f"Error generating image: {str(e)}"
@tool(
name="TextToSpeech",
description="Convert text to speech using AI",
parameters={
"type": "object",
"properties": {
"text": {
"type": "string",
"description": "Text to convert to speech"
},
"voice": {
"type": "string",
"description": "Voice to use",
"enum": ["alloy", "echo", "fable", "onyx", "nova", "shimmer"],
"default": "nova"
},
"save_path": {
"type": "string",
"description": "Absolute path where the audio file should be saved (optional)"
}
},
"required": ["text"]
},
needs_permission=True,
category="ai"
)
def text_to_speech(text: str, voice: str = "nova", save_path: Optional[str] = None) -> str:
"""Convert text to speech using AI.
Args:
text: Text to convert to speech
voice: Voice to use
save_path: Path where to save the audio file
Returns:
Path to the generated audio file or error message
"""
logger.info(f"Converting text to speech: {text[:50]}... (voice: {voice})")
# Get API key
api_key = os.getenv("OPENAI_API_KEY")
if not api_key:
return "Error: OpenAI API key not found. Please set the OPENAI_API_KEY environment variable."
try:
# Call OpenAI API to generate speech
headers = {
"Authorization": f"Bearer {api_key}"
}
payload = {
"model": "tts-1",
"input": text,
"voice": voice
}
response = requests.post(
"https://api.openai.com/v1/audio/speech",
headers=headers,
json=payload
)
if response.status_code != 200:
return f"Error: API request failed with status code {response.status_code}: {response.text}"
# Save the audio
if save_path:
# Ensure the path is absolute
if not os.path.isabs(save_path):
return f"Error: Save path must be absolute: {save_path}"
# Create directory if it doesn't exist
os.makedirs(os.path.dirname(save_path), exist_ok=True)
# Save the audio
with open(save_path, "wb") as f:
f.write(response.content)
return f"Speech generated and saved to: {save_path}"
else:
# Save to a temporary file
with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as tmp:
tmp.write(response.content)
return f"Speech generated and saved to temporary file: {tmp.name}"
except Exception as e:
logger.exception(f"Error generating speech: {str(e)}")
return f"Error generating speech: {str(e)}"
def register_ai_tools(registry: ToolRegistry) -> None:
"""Register all AI tools with the registry.
Args:
registry: Tool registry to register with
"""
from .base import create_tools_from_functions
ai_tools = [
generate_image,
text_to_speech
]
create_tools_from_functions(registry, ai_tools)
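# Usage sketch (illustrative; requires OPENAI_API_KEY in the environment):
#   registry = ToolRegistry()
#   register_ai_tools(registry)
#   print(generate_image("a lighthouse at dusk", style="painting"))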
```
--------------------------------------------------------------------------------
/claude_code/lib/tools/search_tools.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
# claude_code/lib/tools/search_tools.py
"""Web search and information retrieval tools."""
import os
import logging
import urllib.parse
import requests
from .base import tool, ToolRegistry
logger = logging.getLogger(__name__)
@tool(
name="WebSearch",
description="Search the web for information using various search engines",
parameters={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "The search query"
},
"engine": {
"type": "string",
"description": "Search engine to use (google, bing, duckduckgo)",
"enum": ["google", "bing", "duckduckgo"]
},
"num_results": {
"type": "integer",
"description": "Number of results to return (max 10)"
}
},
"required": ["query"]
},
category="search"
)
def web_search(query: str, engine: str = "google", num_results: int = 5) -> str:
"""Search the web for information.
Args:
query: Search query
engine: Search engine to use
num_results: Number of results to return
Returns:
Search results as formatted text
"""
logger.info(f"Searching web for: {query} using {engine}")
# Validate inputs
if num_results > 10:
num_results = 10 # Cap at 10 results
# Get API key based on engine
api_key = None
if engine == "google":
api_key = os.getenv("GOOGLE_SEARCH_API_KEY")
cx = os.getenv("GOOGLE_SEARCH_CX")
if not api_key or not cx:
return "Error: Google Search API key or CX not configured. Please set GOOGLE_SEARCH_API_KEY and GOOGLE_SEARCH_CX environment variables."
elif engine == "bing":
api_key = os.getenv("BING_SEARCH_API_KEY")
if not api_key:
return "Error: Bing Search API key not configured. Please set BING_SEARCH_API_KEY environment variable."
# Perform search based on engine
try:
if engine == "google":
return _google_search(query, api_key, cx, num_results)
elif engine == "bing":
return _bing_search(query, api_key, num_results)
elif engine == "duckduckgo":
return _duckduckgo_search(query, num_results)
else:
return f"Error: Unsupported search engine: {engine}"
except Exception as e:
logger.exception(f"Error during web search: {str(e)}")
return f"Error performing search: {str(e)}"
def _google_search(query: str, api_key: str, cx: str, num_results: int) -> str:
"""Perform Google search using Custom Search API."""
url = "https://www.googleapis.com/customsearch/v1"
params = {
"key": api_key,
"cx": cx,
"q": query,
"num": min(num_results, 10)
}
response = requests.get(url, params=params)
if response.status_code != 200:
return f"Error: Google search failed with status code {response.status_code}: {response.text}"
data = response.json()
if "items" not in data:
return f"No results found for '{query}'"
results = []
for i, item in enumerate(data["items"], 1):
title = item.get("title", "No title")
link = item.get("link", "No link")
snippet = item.get("snippet", "No description").replace("\n", " ")
results.append(f"{i}. {title}\n URL: {link}\n {snippet}\n")
return f"Google Search Results for '{query}':\n\n" + "\n".join(results)
def _bing_search(query: str, api_key: str, num_results: int) -> str:
"""Perform Bing search using Bing Web Search API."""
url = "https://api.bing.microsoft.com/v7.0/search"
headers = {"Ocp-Apim-Subscription-Key": api_key}
params = {
"q": query,
"count": min(num_results, 10),
"responseFilter": "Webpages"
}
response = requests.get(url, headers=headers, params=params)
if response.status_code != 200:
return f"Error: Bing search failed with status code {response.status_code}: {response.text}"
data = response.json()
if "webPages" not in data or "value" not in data["webPages"]:
return f"No results found for '{query}'"
results = []
for i, item in enumerate(data["webPages"]["value"], 1):
title = item.get("name", "No title")
link = item.get("url", "No link")
snippet = item.get("snippet", "No description").replace("\n", " ")
results.append(f"{i}. {title}\n URL: {link}\n {snippet}\n")
return f"Bing Search Results for '{query}':\n\n" + "\n".join(results)
def _duckduckgo_search(query: str, num_results: int) -> str:
"""Perform DuckDuckGo search using their API."""
# DuckDuckGo has no official web-search API, so use its Instant Answer API instead
url = "https://api.duckduckgo.com/"
params = {
"q": query,
"format": "json",
"no_html": 1,
"skip_disambig": 1
}
response = requests.get(url, params=params)
if response.status_code != 200:
return f"Error: DuckDuckGo search failed with status code {response.status_code}: {response.text}"
data = response.json()
results = []
# Add the abstract if available
if data.get("Abstract"):
results.append(f"Summary: {data['Abstract']}\n")
# Add related topics
if data.get("RelatedTopics"):
topics = data["RelatedTopics"][:num_results]
for i, topic in enumerate(topics, 1):
if "Text" in topic:
text = topic.get("Text", "No description")
url = topic.get("FirstURL", "No URL")
results.append(f"{i}. {text}\n URL: {url}\n")
if not results:
return f"No results found for '{query}'"
return f"DuckDuckGo Search Results for '{query}':\n\n" + "\n".join(results)
@tool(
name="WikipediaSearch",
description="Search Wikipedia for information on a topic",
parameters={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "The topic to search for"
},
"language": {
"type": "string",
"description": "Language code (e.g., 'en', 'es', 'fr')",
"default": "en"
}
},
"required": ["query"]
},
category="search"
)
def wikipedia_search(query: str, language: str = "en") -> str:
"""Search Wikipedia for information on a topic.
Args:
query: Topic to search for
language: Language code
Returns:
Wikipedia article summary
"""
logger.info(f"Searching Wikipedia for: {query} in {language}")
try:
# Wikipedia API endpoint
url = f"https://{language}.wikipedia.org/api/rest_v1/page/summary/{urllib.parse.quote(query)}"
response = requests.get(url)
if response.status_code != 200:
# Try search API if direct lookup fails
search_url = f"https://{language}.wikipedia.org/w/api.php"
search_params = {
"action": "query",
"list": "search",
"srsearch": query,
"format": "json"
}
search_response = requests.get(search_url, params=search_params)
if search_response.status_code != 200:
return f"Error: Wikipedia search failed with status code {search_response.status_code}"
search_data = search_response.json()
if "query" not in search_data or "search" not in search_data["query"] or not search_data["query"]["search"]:
return f"No Wikipedia articles found for '{query}'"
# Get the first search result
first_result = search_data["query"]["search"][0]
title = first_result["title"]
# Get the summary for the first result
url = f"https://{language}.wikipedia.org/api/rest_v1/page/summary/{urllib.parse.quote(title)}"
response = requests.get(url)
if response.status_code != 200:
return f"Error: Wikipedia article lookup failed with status code {response.status_code}"
data = response.json()
# Format the response
title = data.get("title", "Unknown")
extract = data.get("extract", "No information available")
url = data.get("content_urls", {}).get("desktop", {}).get("page", "")
result = f"Wikipedia: {title}\n\n{extract}\n"
if url:
result += f"\nSource: {url}"
return result
except Exception as e:
logger.exception(f"Error during Wikipedia search: {str(e)}")
return f"Error searching Wikipedia: {str(e)}"
def register_search_tools(registry: ToolRegistry) -> None:
"""Register all search tools with the registry.
Args:
registry: Tool registry to register with
"""
from .base import create_tools_from_functions
search_tools = [
web_search,
wikipedia_search
]
create_tools_from_functions(registry, search_tools)
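# Usage sketch (illustrative): the decorated functions stay directly callable.
#   registry = ToolRegistry()
#   register_search_tools(registry)
#   print(wikipedia_search("Model Context Protocol"))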
```
--------------------------------------------------------------------------------
/templates/index.html:
--------------------------------------------------------------------------------
```html
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>OpenAI Code Assistant MCP Server</title>
<link rel="stylesheet" href="/static/style.css">
<script src="https://cdn.jsdelivr.net/npm/chart.js"></script>
</head>
<body>
<div class="container">
<h1>OpenAI Code Assistant MCP Server</h1>
<div class="stats-grid">
<div class="stat-card">
<div class="stat-label">Status</div>
<div class="stat-value" style="color: #27ae60;">{{ status }}</div>
</div>
<div class="stat-card">
<div class="stat-label">Uptime</div>
<div class="stat-value">{{ uptime }}</div>
</div>
<div class="stat-card">
<div class="stat-label">Requests Served</div>
<div class="stat-value">{{ request_count }}</div>
</div>
<div class="stat-card">
<div class="stat-label">Cache Hit Ratio</div>
<div class="stat-value">{{ cache_hit_ratio }}%</div>
</div>
</div>
<div class="card">
<div class="card-header">System Status</div>
<div class="card-body">
<canvas id="requestsChart" height="100"></canvas>
</div>
</div>
<h2>Available Models</h2>
<div class="card">
<div class="card-body">
<div class="template-grid">
{% for model in models %}
<div class="stat-card">
<div class="stat-label">Model</div>
<div class="stat-value" style="font-size: 20px;">{{ model }}</div>
</div>
{% endfor %}
</div>
</div>
</div>
<h2>Available Prompt Templates</h2>
<div class="template-grid">
{% for template in templates %}
<div class="card">
<div class="card-header">{{ template.id }}</div>
<div class="card-body">
<p><strong>Description:</strong> {{ template.description }}</p>
{% if template.parameters %}
<p><strong>Parameters:</strong></p>
<ul class="parameter-list">
{% for param in template.parameters %}
<li>{{ param }}</li>
{% endfor %}
</ul>
{% else %}
<p><em>No parameters required</em></p>
{% endif %}
<p><strong>Default Model:</strong> <span class="tag">{{ template.default_model }}</span></p>
<div style="margin-top: 15px;">
<button class="btn btn-primary" onclick="testTemplate('{{ template.id }}')">Test Template</button>
</div>
</div>
</div>
{% endfor %}
</div>
<h2>API Documentation</h2>
<div class="card">
<div class="card-body">
<p>Explore the API using the interactive documentation:</p>
<a href="/docs" class="btn btn-primary">Swagger UI</a>
<a href="/redoc" class="btn btn-secondary">ReDoc</a>
<a href="/metrics" class="btn btn-info">Prometheus Metrics</a>
</div>
</div>
<div class="footer">
<p>OpenAI Code Assistant MCP Server © 2025</p>
</div>
</div>
<!-- Template Test Modal -->
<div id="templateModal" style="display: none; position: fixed; top: 0; left: 0; width: 100%; height: 100%; background-color: rgba(0,0,0,0.5); z-index: 1000;">
<div style="background-color: white; margin: 10% auto; padding: 20px; width: 80%; max-width: 600px; border-radius: 8px;">
<h3 id="modalTitle">Test Template</h3>
<div id="modalContent">
<div id="parameterInputs"></div>
<div style="margin-top: 20px;">
<button class="btn btn-primary" onclick="submitTemplateTest()">Generate Context</button>
<button class="btn btn-secondary" onclick="closeModal()">Cancel</button>
</div>
</div>
<div id="resultContent" style="display: none; margin-top: 20px;">
<h4>Generated Context:</h4>
<pre id="contextResult" style="background-color: #f5f5f5; padding: 10px; border-radius: 4px; overflow-x: auto;"></pre>
<button class="btn btn-secondary" onclick="closeResults()">Close</button>
</div>
</div>
</div>
<script>
// Sample data for the chart - in a real implementation, this would come from the server
const ctx = document.getElementById('requestsChart').getContext('2d');
const requestsChart = new Chart(ctx, {
type: 'line',
data: {
labels: Array.from({length: 12}, (_, i) => `${i*5} min ago`).reverse(),
datasets: [{
label: 'Requests',
data: [12, 19, 3, 5, 2, 3, 20, 33, 23, 12, 5, 3],
borderColor: '#3498db',
tension: 0.1,
fill: false
}]
},
options: {
responsive: true,
scales: {
y: {
beginAtZero: true
}
}
}
});
// Template testing functionality
let currentTemplate = '';
function testTemplate(templateId) {
currentTemplate = templateId;
document.getElementById('modalTitle').textContent = `Test Template: ${templateId}`;
document.getElementById('parameterInputs').innerHTML = '';
document.getElementById('resultContent').style.display = 'none';
// Fetch template details
fetch(`/prompts/${templateId}`)
.then(response => response.json())
.then(template => {
const parametersDiv = document.getElementById('parameterInputs');
// Create input fields for each parameter
for (const [paramName, paramInfo] of Object.entries(template.parameters)) {
const paramDiv = document.createElement('div');
paramDiv.style.marginBottom = '15px';
const label = document.createElement('label');
label.textContent = `${paramName}: ${paramInfo.description || ''}`;
label.style.display = 'block';
label.style.marginBottom = '5px';
const input = document.createElement('input');
input.type = 'text';
input.id = `param-${paramName}`;
input.style.width = '100%';
input.style.padding = '8px';
input.style.borderRadius = '4px';
input.style.border = '1px solid #ddd';
paramDiv.appendChild(label);
paramDiv.appendChild(input);
parametersDiv.appendChild(paramDiv);
}
// Show the modal
document.getElementById('templateModal').style.display = 'block';
})
.catch(error => {
console.error('Error fetching template:', error);
alert('Error fetching template details');
});
}
function submitTemplateTest() {
// Collect parameter values
const parameters = {};
const inputs = document.querySelectorAll('[id^="param-"]');
inputs.forEach(input => {
const paramName = input.id.replace('param-', '');
parameters[paramName] = input.value;
});
// Call the context API
fetch('/context', {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify({
prompt_id: currentTemplate,
parameters: parameters
})
})
.then(response => response.json())
.then(data => {
// Display the result
document.getElementById('contextResult').textContent = data.context;
document.getElementById('modalContent').style.display = 'none';
document.getElementById('resultContent').style.display = 'block';
})
.catch(error => {
console.error('Error generating context:', error);
alert('Error generating context');
});
}
function closeModal() {
document.getElementById('templateModal').style.display = 'none';
}
function closeResults() {
document.getElementById('resultContent').style.display = 'none';
document.getElementById('modalContent').style.display = 'block';
closeModal();
}
// Close modal when clicking outside
window.onclick = function(event) {
const modal = document.getElementById('templateModal');
if (event.target === modal) {
closeModal();
}
}
</script>
</body>
</html>
```
--------------------------------------------------------------------------------
/claude_code/lib/tools/base.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
# claude_code/lib/tools/base.py
"""Base classes for tools."""
import time
import logging
import os
import json
from typing import Dict, List, Any, Callable, Optional
from dataclasses import dataclass
from pydantic import BaseModel, Field
logger = logging.getLogger(__name__)
class ToolParameter(BaseModel):
"""Definition of a tool parameter."""
name: str
description: str
type: str
required: bool = False
class Config:
"""Pydantic config."""
extra = "forbid"
class ToolResult(BaseModel):
"""Result of a tool execution."""
tool_call_id: str
name: str
result: str
execution_time: float
token_usage: int = 0
status: str = "success"
error: Optional[str] = None
class Config:
"""Pydantic config."""
extra = "forbid"
class Routine(BaseModel):
"""Definition of a tool routine."""
name: str
description: str
steps: List[Dict[str, Any]]
usage_count: int = 0
created_at: float = Field(default_factory=time.time)
last_used_at: Optional[float] = None
class Config:
"""Pydantic config."""
extra = "allow"
class Tool(BaseModel):
"""Base class for all tools."""
name: str
description: str
parameters: Dict[str, Any]
function: Callable
needs_permission: bool = False
category: str = "general"
class Config:
"""Pydantic config."""
arbitrary_types_allowed = True
extra = "forbid"
def execute(self, tool_call: Dict[str, Any]) -> ToolResult:
"""Execute the tool with the given parameters.
Args:
tool_call: Dictionary containing tool call information
Returns:
ToolResult with execution result
"""
# Extract parameters
function_name = tool_call.get("function", {}).get("name", "")
arguments_str = tool_call.get("function", {}).get("arguments", "{}")
tool_call_id = tool_call.get("id", "unknown")
# Parse arguments
try:
arguments = json.loads(arguments_str)
except json.JSONDecodeError as e:
logger.error(f"Failed to parse arguments: {e}")
return ToolResult(
tool_call_id=tool_call_id,
name=self.name,
result=f"Error: Failed to parse arguments: {e}",
execution_time=0,
status="error",
error=str(e)
)
# Execute function
start_time = time.time()
try:
result = self.function(**arguments)
execution_time = time.time() - start_time
# Convert result to string if it's not already
if not isinstance(result, str):
result = str(result)
return ToolResult(
tool_call_id=tool_call_id,
name=self.name,
result=result,
execution_time=execution_time,
status="success"
)
except Exception as e:
execution_time = time.time() - start_time
logger.exception(f"Error executing tool {self.name}: {e}")
return ToolResult(
tool_call_id=tool_call_id,
name=self.name,
result=f"Error: {str(e)}",
execution_time=execution_time,
status="error",
error=str(e)
)
class ToolRegistry:
"""Registry for tools."""
def __init__(self):
"""Initialize the tool registry."""
self.tools: Dict[str, Tool] = {}
self.routines: Dict[str, Routine] = {}
self._routine_file = os.path.join(os.path.expanduser("~"), ".claude_code", "routines.json")
def register_tool(self, tool: Tool) -> None:
"""Register a tool.
Args:
tool: Tool instance to register
Raises:
ValueError: If a tool with the same name is already registered
"""
if tool.name in self.tools:
raise ValueError(f"Tool {tool.name} is already registered")
self.tools[tool.name] = tool
logger.debug(f"Registered tool: {tool.name}")
def register_routine(self, routine: Routine) -> None:
"""Register a routine.
Args:
routine: Routine to register
Raises:
ValueError: If a routine with the same name is already registered
"""
if routine.name in self.routines:
raise ValueError(f"Routine {routine.name} is already registered")
self.routines[routine.name] = routine
logger.debug(f"Registered routine: {routine.name}")
self._save_routines()
def register_routine_from_dict(self, routine_dict: Dict[str, Any]) -> None:
"""Register a routine from a dictionary.
Args:
routine_dict: Dictionary with routine data
Raises:
ValueError: If a routine with the same name is already registered
"""
routine = Routine(**routine_dict)
self.register_routine(routine)
def get_tool(self, name: str) -> Optional[Tool]:
"""Get a tool by name.
Args:
name: Name of the tool
Returns:
Tool instance or None if not found
"""
return self.tools.get(name)
def get_routine(self, name: str) -> Optional[Routine]:
"""Get a routine by name.
Args:
name: Name of the routine
Returns:
Routine or None if not found
"""
return self.routines.get(name)
def get_all_tools(self) -> List[Tool]:
"""Get all registered tools.
Returns:
List of all registered tools
"""
return list(self.tools.values())
def get_all_routines(self) -> List[Routine]:
"""Get all registered routines.
Returns:
List of all registered routines
"""
return list(self.routines.values())
def get_tool_schemas(self) -> List[Dict[str, Any]]:
"""Get OpenAI-compatible schemas for all tools.
Returns:
List of tool schemas for OpenAI function calling
"""
schemas = []
for tool in self.tools.values():
schemas.append({
"type": "function",
"function": {
"name": tool.name,
"description": tool.description,
"parameters": tool.parameters
}
})
return schemas
def record_routine_usage(self, name: str) -> None:
"""Record usage of a routine.
Args:
name: Name of the routine
"""
if name in self.routines:
routine = self.routines[name]
routine.usage_count += 1
routine.last_used_at = time.time()
self._save_routines()
def _save_routines(self) -> None:
"""Save routines to file."""
try:
# Create directory if it doesn't exist
os.makedirs(os.path.dirname(self._routine_file), exist_ok=True)
# Convert routines to dict for serialization
routines_dict = {name: routine.dict() for name, routine in self.routines.items()}
# Save to file
with open(self._routine_file, 'w') as f:
json.dump(routines_dict, f, indent=2)
logger.debug(f"Saved {len(self.routines)} routines to {self._routine_file}")
except Exception as e:
logger.error(f"Error saving routines: {e}")
def load_routines(self) -> None:
"""Load routines from file."""
if not os.path.exists(self._routine_file):
logger.debug(f"Routines file not found: {self._routine_file}")
return
try:
with open(self._routine_file, 'r') as f:
routines_dict = json.load(f)
# Clear existing routines
self.routines.clear()
# Register each routine
for name, routine_data in routines_dict.items():
self.routines[name] = Routine(**routine_data)
logger.debug(f"Loaded {len(self.routines)} routines from {self._routine_file}")
except Exception as e:
logger.error(f"Error loading routines: {e}")
@dataclass
class RoutineStep:
"""A step in a routine."""
tool_name: str
args: Dict[str, Any]
condition: Optional[Dict[str, Any]] = None
store_result: bool = False
result_var: Optional[str] = None
@dataclass
class RoutineDefinition:
"""Definition of a routine."""
name: str
description: str
steps: List[RoutineStep]
def tool(name: str, description: str, parameters: Dict[str, Any],
needs_permission: bool = False, category: str = "general"):
"""Decorator to register a function as a tool.
Args:
name: Name of the tool
description: Description of the tool
parameters: Parameter schema for the tool
needs_permission: Whether the tool needs user permission
category: Category of the tool
Returns:
Decorator function
"""
def decorator(func: Callable) -> Callable:
# Set tool metadata on the function
func._tool_info = {
"name": name,
"description": description,
"parameters": parameters,
"needs_permission": needs_permission,
"category": category
}
return func
return decorator
def create_tools_from_functions(registry: ToolRegistry, functions: List[Callable]) -> None:
"""Create and register tools from functions with _tool_info.
Args:
registry: Tool registry to register tools with
functions: List of functions to create tools from
"""
for func in functions:
if hasattr(func, "_tool_info"):
info = func._tool_info
tool = Tool(
name=info["name"],
description=info["description"],
parameters=info["parameters"],
function=func,
needs_permission=info["needs_permission"],
category=info["category"]
)
registry.register_tool(tool)
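# Minimal usage sketch (illustrative):
#
#   registry = ToolRegistry()
#
#   @tool(
#       name="Add",
#       description="Add two integers",
#       parameters={
#           "type": "object",
#           "properties": {"a": {"type": "integer"}, "b": {"type": "integer"}},
#           "required": ["a", "b"],
#       },
#   )
#   def add(a: int, b: int) -> str:
#       return str(a + b)
#
#   create_tools_from_functions(registry, [add])
#   result = registry.get_tool("Add").execute(
#       {"id": "call-1", "function": {"name": "Add", "arguments": '{"a": 1, "b": 2}'}}
#   )
#   assert result.result == "3"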
```
--------------------------------------------------------------------------------
/mcp_modal_adapter.py:
--------------------------------------------------------------------------------
```python
import os
import json
import logging
import httpx
from typing import Dict, List, Optional, Any, AsyncIterator
from fastapi import FastAPI, Request, HTTPException, status
from fastapi.responses import JSONResponse, StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Create FastAPI app
app = FastAPI(
title="MCP Server Modal Adapter",
description="Model Context Protocol server adapter for Modal OpenAI API",
version="1.0.0"
)
# Add CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Configuration
MODAL_API_URL = os.environ.get("MODAL_API_URL", "https://your-modal-app-url.modal.run")
MODAL_API_KEY = os.environ.get("MODAL_API_KEY", "sk-modal-llm-api-key") # Default key from modal_mcp_server.py
DEFAULT_MODEL = os.environ.get("DEFAULT_MODEL", "phi-4")
# MCP Protocol Models
class MCPHealthResponse(BaseModel):
status: str = "healthy"
version: str = "1.0.0"
class MCPPromptTemplate(BaseModel):
id: str
name: str
description: str
template: str
parameters: Dict[str, Any] = Field(default_factory=dict)
class MCPPromptLibraryResponse(BaseModel):
prompts: List[MCPPromptTemplate]
class MCPContextResponse(BaseModel):
context_id: str
content: str
model: str
prompt_id: Optional[str] = None
parameters: Optional[Dict[str, Any]] = None
# Default prompt template
DEFAULT_TEMPLATE = MCPPromptTemplate(
id="default",
name="Default Template",
description="Default prompt template for general use",
template="{prompt}",
parameters={"prompt": {"type": "string", "description": "The prompt to send to the model"}}
)
# In-memory prompt library
prompt_library = {
"default": DEFAULT_TEMPLATE.dict()
}
# Health check endpoint
@app.get("/health", response_model=MCPHealthResponse)
async def health_check():
"""Health check endpoint"""
return MCPHealthResponse()
# List prompts endpoint
@app.get("/prompts", response_model=MCPPromptLibraryResponse)
async def list_prompts():
"""List available prompt templates"""
return MCPPromptLibraryResponse(prompts=[MCPPromptTemplate(**prompt) for prompt in prompt_library.values()])
# Get prompt endpoint
@app.get("/prompts/{prompt_id}", response_model=MCPPromptTemplate)
async def get_prompt(prompt_id: str):
"""Get a specific prompt template"""
if prompt_id not in prompt_library:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Prompt template with ID {prompt_id} not found"
)
return MCPPromptTemplate(**prompt_library[prompt_id])
# Get context endpoint
@app.post("/context/{prompt_id}")
async def get_context(prompt_id: str, request: Request):
"""Get context from a prompt template"""
try:
# Get request data
data = await request.json()
parameters = data.get("parameters", {})
model = data.get("model", DEFAULT_MODEL)
stream = data.get("stream", False)
# Get prompt template
if prompt_id not in prompt_library:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Prompt template with ID {prompt_id} not found"
)
prompt_template = prompt_library[prompt_id]
# Process template
template = prompt_template["template"]
prompt_text = template.format(**parameters)
# Create OpenAI-compatible request
openai_request = {
"model": model,
"messages": [{"role": "user", "content": prompt_text}],
"temperature": parameters.get("temperature", 0.7),
"max_tokens": parameters.get("max_tokens", 1024),
"stream": stream
}
# If streaming is requested, return a streaming response
if stream:
return StreamingResponse(
stream_from_modal(openai_request),
media_type="text/event-stream"
)
# Otherwise, make a regular request to Modal API
async with httpx.AsyncClient(timeout=60.0) as client:
headers = {
"Authorization": f"Bearer {MODAL_API_KEY}",
"Content-Type": "application/json"
}
response = await client.post(
f"{MODAL_API_URL}/v1/chat/completions",
json=openai_request,
headers=headers
)
if response.status_code != 200:
raise HTTPException(
status_code=response.status_code,
detail=f"Error from Modal API: {response.text}"
)
result = response.json()
# Extract content from OpenAI response
content = ""
if "choices" in result and len(result["choices"]) > 0:
if "message" in result["choices"][0] and "content" in result["choices"][0]["message"]:
content = result["choices"][0]["message"]["content"]
# Create MCP response
mcp_response = MCPContextResponse(
context_id=result.get("id", ""),
content=content,
model=model,
prompt_id=prompt_id,
parameters=parameters
)
return mcp_response.dict()
except Exception as e:
logging.error(f"Error in get_context: {str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Error generating context: {str(e)}"
)
async def stream_from_modal(openai_request: Dict[str, Any]) -> AsyncIterator[str]:
"""Stream response from Modal API"""
try:
async with httpx.AsyncClient(timeout=300.0) as client:
headers = {
"Authorization": f"Bearer {MODAL_API_KEY}",
"Content-Type": "application/json",
"Accept": "text/event-stream"
}
async with client.stream(
"POST",
f"{MODAL_API_URL}/v1/chat/completions",
json=openai_request,
headers=headers
) as response:
if response.status_code != 200:
error_detail = await response.aread()
yield f"data: {json.dumps({'error': f'Error from Modal API: {error_detail.decode()}'})}\n\n"
yield "data: [DONE]\n\n"
return
# Process streaming response
buffer = ""
content_buffer = ""
async for chunk in response.aiter_text():
buffer += chunk
# Process complete SSE messages
while "\n\n" in buffer:
message, buffer = buffer.split("\n\n", 1)
if message.startswith("data: "):
data = message[6:] # Remove "data: " prefix
if data == "[DONE]":
# End of stream, send final MCP response
final_response = MCPContextResponse(
context_id="stream-" + str(hash(content_buffer))[:8],
content=content_buffer,
model=openai_request.get("model", DEFAULT_MODEL),
prompt_id="default",
parameters={}
)
yield f"data: {json.dumps(final_response.dict())}\n\n"
yield "data: [DONE]\n\n"
return
try:
# Parse JSON data
chunk_data = json.loads(data)
# Extract content from chunk
if 'choices' in chunk_data and len(chunk_data['choices']) > 0:
if 'delta' in chunk_data['choices'][0] and 'content' in chunk_data['choices'][0]['delta']:
content = chunk_data['choices'][0]['delta']['content']
content_buffer += content
# Create partial MCP response
partial_response = {
"context_id": "stream-" + str(hash(content_buffer))[:8],
"content": content,
"model": openai_request.get("model", DEFAULT_MODEL),
"is_partial": True
}
yield f"data: {json.dumps(partial_response)}\n\n"
except json.JSONDecodeError:
logging.error(f"Invalid JSON in stream: {data}")
except Exception as e:
logging.error(f"Error in stream_from_modal: {str(e)}")
yield f"data: {json.dumps({'error': str(e)})}\n\n"
yield "data: [DONE]\n\n"
# Add a custom prompt template
@app.post("/prompts")
async def add_prompt(prompt: MCPPromptTemplate):
"""Add a new prompt template"""
prompt_library[prompt.id] = prompt.dict()
return {"status": "success", "message": f"Added prompt template with ID {prompt.id}"}
# Delete a prompt template
@app.delete("/prompts/{prompt_id}")
async def delete_prompt(prompt_id: str):
"""Delete a prompt template"""
if prompt_id == "default":
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Cannot delete the default prompt template"
)
if prompt_id not in prompt_library:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Prompt template with ID {prompt_id} not found"
)
del prompt_library[prompt_id]
return {"status": "success", "message": f"Deleted prompt template with ID {prompt_id}"}
# Main entry point
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
```
--------------------------------------------------------------------------------
/claude_code/lib/providers/openai.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
# claude_code/lib/providers/openai.py
"""OpenAI provider implementation."""
import os
from typing import Dict, List, Generator, Optional, Any, Union
import time
import logging
import json
import tiktoken
from openai import OpenAI, RateLimitError, APIError
from .base import BaseProvider
logger = logging.getLogger(__name__)
# Model information including context window and pricing
MODEL_INFO = {
"gpt-3.5-turbo": {
"context_window": 16385,
"input_cost_per_1k": 0.0015,
"output_cost_per_1k": 0.002,
"capabilities": ["function_calling", "json_mode"],
},
"gpt-4o": {
"context_window": 128000,
"input_cost_per_1k": 0.005,
"output_cost_per_1k": 0.015,
"capabilities": ["function_calling", "json_mode", "vision"],
},
"gpt-4-turbo": {
"context_window": 128000,
"input_cost_per_1k": 0.01,
"output_cost_per_1k": 0.03,
"capabilities": ["function_calling", "json_mode", "vision"],
},
"gpt-4": {
"context_window": 8192,
"input_cost_per_1k": 0.03,
"output_cost_per_1k": 0.06,
"capabilities": ["function_calling", "json_mode"],
},
}
DEFAULT_MODEL = "gpt-4o"
class OpenAIProvider(BaseProvider):
"""OpenAI API provider implementation."""
def __init__(self, api_key: Optional[str] = None, model: Optional[str] = None):
"""Initialize the OpenAI provider.
Args:
api_key: OpenAI API key. If None, will use OPENAI_API_KEY environment variable
model: Model to use. If None, will use DEFAULT_MODEL
"""
self._api_key = api_key or os.environ.get("OPENAI_API_KEY")
if not self._api_key:
raise ValueError("OpenAI API key is required. Set OPENAI_API_KEY environment variable or pass api_key.")
self._client = OpenAI(api_key=self._api_key)
self._model = model or os.environ.get("OPENAI_MODEL", DEFAULT_MODEL)
if self._model not in MODEL_INFO:
logger.warning(f"Unknown model: {self._model}. Using {DEFAULT_MODEL} instead.")
self._model = DEFAULT_MODEL
# Cache for tokenizers
self._tokenizers = {}
@property
def name(self) -> str:
return "OpenAI"
@property
def available_models(self) -> List[str]:
return list(MODEL_INFO.keys())
@property
def current_model(self) -> str:
return self._model
def set_model(self, model_name: str) -> None:
if model_name not in MODEL_INFO:
raise ValueError(f"Unknown model: {model_name}. Available models: {', '.join(self.available_models)}")
self._model = model_name
def generate_completion(self,
messages: List[Dict[str, Any]],
tools: Optional[List[Dict[str, Any]]] = None,
temperature: float = 0.0,
stream: bool = True) -> Union[Dict[str, Any], Generator[Dict[str, Any], None, None]]:
"""Generate a completion from OpenAI.
Args:
messages: List of message dictionaries with 'role' and 'content' keys
tools: Optional list of tool dictionaries
            temperature: Model temperature (0-2)
stream: Whether to stream the response
Returns:
If stream=True, returns a generator of response chunks
If stream=False, returns the complete response
"""
try:
# Convert tools to OpenAI format if provided
api_tools = None
if tools:
api_tools = []
for tool in tools:
api_tools.append({
"type": "function",
"function": {
"name": tool["name"],
"description": tool["description"],
"parameters": tool["parameters"]
}
})
# Make the API call
response = self._client.chat.completions.create(
model=self._model,
messages=messages,
tools=api_tools,
temperature=temperature,
stream=stream
)
# Handle streaming and non-streaming responses
if stream:
return self._process_streaming_response(response)
else:
return {
"content": response.choices[0].message.content,
"tool_calls": response.choices[0].message.tool_calls,
"finish_reason": response.choices[0].finish_reason,
"usage": {
"prompt_tokens": response.usage.prompt_tokens,
"completion_tokens": response.usage.completion_tokens,
"total_tokens": response.usage.total_tokens
}
}
except RateLimitError as e:
logger.error(f"Rate limit exceeded: {str(e)}")
raise
except APIError as e:
logger.error(f"API error: {str(e)}")
raise
except Exception as e:
logger.error(f"Error generating completion: {str(e)}")
raise
def _process_streaming_response(self, response):
"""Process a streaming response from OpenAI."""
current_tool_calls = []
tool_call_chunks = {}
for chunk in response:
# Create a result chunk to yield
result_chunk = {
"content": None,
"tool_calls": None,
"delta": True
}
# Process content
delta = chunk.choices[0].delta
if delta.content:
result_chunk["content"] = delta.content
# Process tool calls
if delta.tool_calls:
result_chunk["tool_calls"] = []
for tool_call_delta in delta.tool_calls:
# Initialize tool call in chunks dictionary if new
idx = tool_call_delta.index
if idx not in tool_call_chunks:
tool_call_chunks[idx] = {
"id": "",
"function": {"name": "", "arguments": ""}
}
# Update tool call data
if tool_call_delta.id:
tool_call_chunks[idx]["id"] = tool_call_delta.id
if tool_call_delta.function:
if tool_call_delta.function.name:
tool_call_chunks[idx]["function"]["name"] = tool_call_delta.function.name
if tool_call_delta.function.arguments:
tool_call_chunks[idx]["function"]["arguments"] += tool_call_delta.function.arguments
# Add current state to result
result_chunk["tool_calls"].append(tool_call_chunks[idx])
# Yield the chunk
yield result_chunk
# Final yield with complete tool calls
if tool_call_chunks:
complete_calls = list(tool_call_chunks.values())
yield {
"content": None,
"tool_calls": complete_calls,
"delta": False,
"finish_reason": "tool_calls"
}
def _get_tokenizer(self, model: str = None) -> Any:
"""Get a tokenizer for the specified model."""
model = model or self._model
if model not in self._tokenizers:
try:
                # Prefer the model-specific encoding (o200k_base for gpt-4o,
                # cl100k_base for gpt-4/gpt-3.5); fall back if tiktoken does
                # not recognize the model name.
                try:
                    self._tokenizers[model] = tiktoken.encoding_for_model(model)
                except KeyError:
                    self._tokenizers[model] = tiktoken.get_encoding("cl100k_base")
except Exception as e:
logger.error(f"Error loading tokenizer for {model}: {str(e)}")
raise
return self._tokenizers[model]
def count_tokens(self, text: str) -> int:
"""Count tokens in text."""
tokenizer = self._get_tokenizer()
return len(tokenizer.encode(text))
def count_message_tokens(self, messages: List[Dict[str, Any]]) -> Dict[str, int]:
"""Count tokens in a message list."""
# Simple approximation - in production, would need to match OpenAI's tokenization exactly
prompt_tokens = 0
for message in messages:
# Add tokens for message role
prompt_tokens += 4 # ~4 tokens for role
# Count content tokens
if "content" in message and message["content"]:
prompt_tokens += self.count_tokens(message["content"])
# Count tokens from any tool calls or tool results
if "tool_calls" in message and message["tool_calls"]:
for tool_call in message["tool_calls"]:
prompt_tokens += 4 # ~4 tokens for tool call overhead
prompt_tokens += self.count_tokens(tool_call.get("function", {}).get("name", ""))
prompt_tokens += self.count_tokens(tool_call.get("function", {}).get("arguments", ""))
if "name" in message and message["name"]:
prompt_tokens += self.count_tokens(message["name"])
if "tool_call_id" in message and message["tool_call_id"]:
prompt_tokens += 10 # ~10 tokens for tool_call_id and overhead
# Add ~3 tokens for message formatting
prompt_tokens += 3
return {
"input": prompt_tokens,
"output": 0 # We don't know output tokens yet
}
def get_model_info(self) -> Dict[str, Any]:
"""Get information about the current model."""
return MODEL_INFO[self._model]
@property
def cost_per_1k_tokens(self) -> Dict[str, float]:
"""Get cost per 1K tokens for input and output."""
info = self.get_model_info()
return {
"input": info["input_cost_per_1k"],
"output": info["output_cost_per_1k"]
}
def validate_api_key(self) -> bool:
"""Validate the API key."""
try:
# Make a minimal API call to test the key
            self._client.models.list()
return True
except Exception as e:
logger.error(f"API key validation failed: {str(e)}")
return False
def get_rate_limit_info(self) -> Dict[str, Any]:
"""Get rate limit information."""
# OpenAI doesn't provide direct rate limit info via API
# This is a placeholder implementation
return {
"requests_per_minute": 3500,
"tokens_per_minute": 90000,
"reset_time": None
}
```
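A minimal sketch of using `OpenAIProvider` on its own, assuming `OPENAI_API_KEY` is set in the environment; every call below is a method defined in the class above:

```python
from claude_code.lib.providers.openai import OpenAIProvider

provider = OpenAIProvider(model="gpt-4o")
assert provider.validate_api_key()

messages = [
    {"role": "system", "content": "You are a terse assistant."},
    {"role": "user", "content": "Say hello."},
]

# Pre-flight token and cost estimate
counts = provider.count_message_tokens(messages)
rates = provider.cost_per_1k_tokens
print(f"~{counts['input']} input tokens at ${rates['input']}/1k")

# Streaming: generate_completion yields delta chunks
for chunk in provider.generate_completion(messages, stream=True):
    if chunk["content"]:
        print(chunk["content"], end="", flush=True)

# Non-streaming: one dict with content, tool_calls, finish_reason and usage
result = provider.generate_completion(messages, stream=False)
print(result["usage"])
```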
--------------------------------------------------------------------------------
/claude_code/lib/monitoring/server_metrics.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""Module for tracking MCP server metrics."""
import os
import time
import json
import logging
import threading
from typing import Dict, List, Any, Optional, Callable
from datetime import datetime, timedelta
from collections import deque, Counter
# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class ServerMetrics:
"""Tracks MCP server metrics for visualization."""
def __init__(self, history_size: int = 100, save_interval: int = 60):
"""Initialize the server metrics tracker.
Args:
history_size: Number of data points to keep in history
save_interval: How often to save metrics to disk (in seconds)
"""
self._start_time = time.time()
self._lock = threading.RLock()
self._history_size = history_size
self._save_interval = save_interval
self._save_path = os.path.expanduser("~/.config/claude_code/metrics.json")
# Ensure directory exists
os.makedirs(os.path.dirname(self._save_path), exist_ok=True)
# Metrics
self._request_history = deque(maxlen=history_size)
self._tool_calls = Counter()
self._resource_calls = Counter()
self._connections = 0
self._active_connections = set()
self._errors = Counter()
# Time series data for charts
self._time_series = {
"tool_calls": deque([(time.time(), 0)] * 10, maxlen=10),
"resource_calls": deque([(time.time(), 0)] * 10, maxlen=10)
}
# Start auto-save thread
self._running = True
self._save_thread = threading.Thread(target=self._auto_save, daemon=True)
self._save_thread.start()
# Load previous metrics if available
self._load_metrics()
def _auto_save(self):
"""Periodically save metrics to disk."""
while self._running:
time.sleep(self._save_interval)
try:
self.save_metrics()
except Exception as e:
logger.error(f"Error saving metrics: {e}")
def _load_metrics(self):
"""Load metrics from disk if available."""
try:
if os.path.exists(self._save_path):
with open(self._save_path, 'r', encoding='utf-8') as f:
data = json.load(f)
with self._lock:
# Load previous tool and resource calls
self._tool_calls = Counter(data.get("tool_calls", {}))
self._resource_calls = Counter(data.get("resource_calls", {}))
# Don't load time-sensitive data like connections and history
logger.info(f"Loaded metrics from {self._save_path}")
except Exception as e:
logger.error(f"Error loading metrics: {e}")
def save_metrics(self):
"""Save metrics to disk."""
try:
with self._lock:
data = {
"tool_calls": dict(self._tool_calls),
"resource_calls": dict(self._resource_calls),
"total_connections": self._connections,
"last_saved": time.time()
}
with open(self._save_path, 'w', encoding='utf-8') as f:
json.dump(data, f, indent=2)
logger.debug(f"Metrics saved to {self._save_path}")
except Exception as e:
logger.error(f"Error saving metrics: {e}")
def log_tool_call(self, tool_name: str, success: bool = True):
"""Log a tool call.
Args:
tool_name: The name of the tool that was called
success: Whether the call was successful
"""
with self._lock:
self._tool_calls[tool_name] += 1
# Add to request history
timestamp = time.time()
self._request_history.append({
"type": "tool",
"name": tool_name,
"success": success,
"timestamp": timestamp
})
# Update time series
current_time = time.time()
last_time, count = self._time_series["tool_calls"][-1]
if current_time - last_time < 60: # Less than a minute
self._time_series["tool_calls"][-1] = (last_time, count + 1)
else:
self._time_series["tool_calls"].append((current_time, 1))
def log_resource_request(self, resource_uri: str, success: bool = True):
"""Log a resource request.
Args:
resource_uri: The URI of the requested resource
success: Whether the request was successful
"""
with self._lock:
self._resource_calls[resource_uri] += 1
# Add to request history
timestamp = time.time()
self._request_history.append({
"type": "resource",
"uri": resource_uri,
"success": success,
"timestamp": timestamp
})
# Update time series
current_time = time.time()
last_time, count = self._time_series["resource_calls"][-1]
if current_time - last_time < 60: # Less than a minute
self._time_series["resource_calls"][-1] = (last_time, count + 1)
else:
self._time_series["resource_calls"].append((current_time, 1))
def log_connection(self, client_id: str, connected: bool = True):
"""Log a client connection or disconnection.
Args:
client_id: Client identifier
connected: True for connection, False for disconnection
"""
with self._lock:
if connected:
self._connections += 1
self._active_connections.add(client_id)
else:
self._active_connections.discard(client_id)
# Add to request history
timestamp = time.time()
self._request_history.append({
"type": "connection",
"client_id": client_id,
"action": "connect" if connected else "disconnect",
"timestamp": timestamp
})
def log_error(self, error_type: str, message: str):
"""Log an error.
Args:
error_type: Type of error
message: Error message
"""
with self._lock:
self._errors[error_type] += 1
# Add to request history
timestamp = time.time()
self._request_history.append({
"type": "error",
"error_type": error_type,
"message": message,
"timestamp": timestamp
})
def get_uptime(self) -> str:
"""Get the server uptime as a human-readable string.
Returns:
Uptime string (e.g., "2 hours 15 minutes")
"""
uptime_seconds = time.time() - self._start_time
uptime = timedelta(seconds=int(uptime_seconds))
days = uptime.days
hours, remainder = divmod(uptime.seconds, 3600)
minutes, seconds = divmod(remainder, 60)
parts = []
if days > 0:
parts.append(f"{days} {'day' if days == 1 else 'days'}")
if hours > 0 or days > 0:
parts.append(f"{hours} {'hour' if hours == 1 else 'hours'}")
if minutes > 0 or hours > 0 or days > 0:
parts.append(f"{minutes} {'minute' if minutes == 1 else 'minutes'}")
if not parts:
return f"{seconds} seconds"
return " ".join(parts)
def get_active_connections_count(self) -> int:
"""Get the number of active connections.
Returns:
Number of active connections
"""
with self._lock:
return len(self._active_connections)
def get_total_connections(self) -> int:
"""Get the total number of connections since startup.
Returns:
Total connection count
"""
with self._lock:
return self._connections
def get_recent_activity(self, count: int = 10) -> List[Dict[str, Any]]:
"""Get recent activity.
Args:
count: Number of recent events to return
Returns:
List of recent activity events
"""
        with self._lock:
            # Copy each event so adding formatted_time does not mutate
            # the stored history entries
            recent = [dict(event) for event in list(self._request_history)[-count:]]
        # Format timestamps
        for event in recent:
            ts = event["timestamp"]
            event["formatted_time"] = datetime.fromtimestamp(ts).strftime("%Y-%m-%d %H:%M:%S")
        return recent
def get_tool_usage_stats(self) -> Dict[str, int]:
"""Get statistics on tool usage.
Returns:
Dictionary mapping tool names to call counts
"""
with self._lock:
return dict(self._tool_calls)
def get_resource_usage_stats(self) -> Dict[str, int]:
"""Get statistics on resource usage.
Returns:
Dictionary mapping resource URIs to request counts
"""
with self._lock:
return dict(self._resource_calls)
def get_error_stats(self) -> Dict[str, int]:
"""Get statistics on errors.
Returns:
Dictionary mapping error types to counts
"""
with self._lock:
return dict(self._errors)
def get_time_series_data(self) -> Dict[str, List[Dict[str, Any]]]:
"""Get time series data for charts.
Returns:
Dictionary with time series data
"""
with self._lock:
result = {}
# Convert deques to lists of dictionaries
for series_name, series_data in self._time_series.items():
result[series_name] = [
{"timestamp": ts, "value": val, "formatted_time": datetime.fromtimestamp(ts).strftime("%H:%M:%S")}
for ts, val in series_data
]
return result
def get_all_metrics(self) -> Dict[str, Any]:
"""Get all metrics data.
Returns:
Dictionary with all metrics
"""
return {
"uptime": self.get_uptime(),
"active_connections": self.get_active_connections_count(),
"total_connections": self.get_total_connections(),
"recent_activity": self.get_recent_activity(20),
"tool_usage": self.get_tool_usage_stats(),
"resource_usage": self.get_resource_usage_stats(),
"errors": self.get_error_stats(),
"time_series": self.get_time_series_data()
}
def reset_stats(self):
"""Reset all statistics but keep the start time."""
with self._lock:
self._request_history.clear()
self._tool_calls.clear()
self._resource_calls.clear()
self._connections = 0
self._active_connections.clear()
self._errors.clear()
# Reset time series
current_time = time.time()
self._time_series = {
"tool_calls": deque([(current_time - (600 - i * 60), 0) for i in range(10)], maxlen=10),
"resource_calls": deque([(current_time - (600 - i * 60), 0) for i in range(10)], maxlen=10)
}
def shutdown(self):
"""Shutdown the metrics tracker and save data."""
self._running = False
self.save_metrics()
# Singleton instance
_metrics_instance = None
def get_metrics() -> ServerMetrics:
"""Get or create the singleton metrics instance.
Returns:
ServerMetrics instance
"""
global _metrics_instance
if _metrics_instance is None:
_metrics_instance = ServerMetrics()
return _metrics_instance
```
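A short sketch of the metrics tracker in use. `get_metrics()` returns the process-wide singleton and starts the auto-save thread; counters persist to `~/.config/claude_code/metrics.json`:

```python
from claude_code.lib.monitoring.server_metrics import get_metrics

metrics = get_metrics()
metrics.log_connection("client-1", connected=True)
metrics.log_tool_call("View")
metrics.log_resource_request("file:///tmp/example.txt")
metrics.log_error("tool_error", "example failure")

print(metrics.get_uptime())                    # e.g. "5 minutes"
print(metrics.get_tool_usage_stats())          # {"View": 1}
print(metrics.get_active_connections_count())  # 1

metrics.shutdown()  # stop the auto-save thread and flush to disk
```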
--------------------------------------------------------------------------------
/claude_code/lib/monitoring/cost_tracker.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
# claude_code/lib/monitoring/cost_tracker.py
"""Cost tracking and management."""
import logging
import json
import os
import time
from datetime import datetime
from typing import Dict, List, Optional, Any, Tuple
from rich.panel import Panel
from rich.table import Table
from rich.text import Text
from rich.box import ROUNDED
logger = logging.getLogger(__name__)
class CostTracker:
"""Tracks token usage and calculates costs for LLM interactions."""
def __init__(self, budget_limit: Optional[float] = None, history_file: Optional[str] = None):
"""Initialize the cost tracker.
Args:
budget_limit: Optional budget limit in dollars
history_file: Optional path to a file to store history
"""
self.budget_limit = budget_limit
self.history_file = history_file
# Initialize session counters
self.session_start = datetime.now()
self.session_tokens_input = 0
self.session_tokens_output = 0
self.session_cost = 0.0
# Request history
self.requests: List[Dict[str, Any]] = []
# Load history from file if provided
self._load_history()
def add_request(self,
provider: str,
model: str,
tokens_input: int,
tokens_output: int,
input_cost_per_1k: float,
output_cost_per_1k: float,
request_id: Optional[str] = None) -> Dict[str, Any]:
"""Add a request to the tracker.
Args:
provider: Provider name (e.g., "openai", "anthropic")
model: Model name (e.g., "gpt-4o", "claude-3-opus")
tokens_input: Number of input tokens
tokens_output: Number of output tokens
input_cost_per_1k: Cost per 1,000 input tokens
output_cost_per_1k: Cost per 1,000 output tokens
request_id: Optional request ID
Returns:
Dictionary with request information including costs
"""
# Calculate costs
input_cost = (tokens_input / 1000) * input_cost_per_1k
output_cost = (tokens_output / 1000) * output_cost_per_1k
total_cost = input_cost + output_cost
# Update session counters
self.session_tokens_input += tokens_input
self.session_tokens_output += tokens_output
self.session_cost += total_cost
# Create request record
request = {
"id": request_id or f"{int(time.time())}-{len(self.requests)}",
"timestamp": datetime.now().isoformat(),
"provider": provider,
"model": model,
"tokens_input": tokens_input,
"tokens_output": tokens_output,
"input_cost": input_cost,
"output_cost": output_cost,
"total_cost": total_cost
}
# Add to history
self.requests.append(request)
# Save history
self._save_history()
# Log the request
logger.info(
f"Request: {provider}/{model}, " +
f"Tokens: {tokens_input} in / {tokens_output} out, " +
f"Cost: ${total_cost:.4f}"
)
return request
def get_session_stats(self) -> Dict[str, Any]:
"""Get statistics for the current session.
Returns:
Dictionary with session statistics
"""
return {
"start_time": self.session_start.isoformat(),
"duration_seconds": (datetime.now() - self.session_start).total_seconds(),
"tokens_input": self.session_tokens_input,
"tokens_output": self.session_tokens_output,
"total_tokens": self.session_tokens_input + self.session_tokens_output,
"total_cost": self.session_cost,
"request_count": len(self.requests),
"budget_limit": self.budget_limit,
"budget_remaining": None if self.budget_limit is None else self.budget_limit - self.session_cost
}
def check_budget(self) -> Dict[str, Any]:
"""Check if budget limit is approached or exceeded.
Returns:
Dictionary with budget status information
"""
if self.budget_limit is None:
return {
"has_budget": False,
"status": "no_limit",
"message": "No budget limit set"
}
remaining = self.budget_limit - self.session_cost
percentage_used = (self.session_cost / self.budget_limit) * 100
if remaining <= 0:
status = "exceeded"
message = f"Budget exceeded by ${abs(remaining):.2f}"
elif percentage_used > 90:
status = "critical"
message = f"Budget critical: ${remaining:.2f} remaining ({percentage_used:.1f}% used)"
elif percentage_used > 75:
status = "warning"
message = f"Budget warning: ${remaining:.2f} remaining ({percentage_used:.1f}% used)"
else:
status = "ok"
message = f"Budget OK: ${remaining:.2f} remaining ({percentage_used:.1f}% used)"
return {
"has_budget": True,
"status": status,
"message": message,
"limit": self.budget_limit,
"used": self.session_cost,
"remaining": remaining,
"percentage_used": percentage_used
}
def get_usage_by_model(self) -> Dict[str, Dict[str, Any]]:
"""Get usage statistics grouped by model.
Returns:
Dictionary mapping "provider/model" to usage statistics
"""
usage: Dict[str, Dict[str, Any]] = {}
for request in self.requests:
key = f"{request['provider']}/{request['model']}"
if key not in usage:
usage[key] = {
"provider": request["provider"],
"model": request["model"],
"request_count": 0,
"tokens_input": 0,
"tokens_output": 0,
"total_cost": 0.0
}
usage[key]["request_count"] += 1
usage[key]["tokens_input"] += request["tokens_input"]
usage[key]["tokens_output"] += request["tokens_output"]
usage[key]["total_cost"] += request["total_cost"]
return usage
def get_cost_summary_panel(self) -> Panel:
"""Create a Rich panel with cost summary information.
Returns:
Rich Panel object
"""
# Get stats and budget info
stats = self.get_session_stats()
budget = self.check_budget()
# Create a table for the summary
table = Table(show_header=False, box=ROUNDED, expand=True)
table.add_column("Item", style="bold")
table.add_column("Value")
# Add rows with token usage
table.add_row(
"Tokens (Input)",
f"{stats['tokens_input']:,}"
)
table.add_row(
"Tokens (Output)",
f"{stats['tokens_output']:,}"
)
table.add_row(
"Total Cost",
f"${stats['total_cost']:.4f}"
)
# Add budget information if available
if budget["has_budget"]:
# Create styled text for budget status
status_text = Text(budget["message"])
if budget["status"] == "exceeded":
status_text.stylize("bold red")
elif budget["status"] == "critical":
status_text.stylize("bold yellow")
elif budget["status"] == "warning":
status_text.stylize("yellow")
else:
status_text.stylize("green")
table.add_row("Budget", status_text)
# Create the panel
title = "[bold]Cost & Usage Summary[/bold]"
return Panel(table, title=title, border_style="yellow")
def reset_session(self) -> None:
"""Reset the session counters but keep request history."""
self.session_start = datetime.now()
self.session_tokens_input = 0
self.session_tokens_output = 0
self.session_cost = 0.0
logger.info("Cost tracking session reset")
def _save_history(self) -> None:
"""Save request history to file if configured."""
if not self.history_file:
return
try:
# Ensure directory exists
directory = os.path.dirname(self.history_file)
if directory and not os.path.exists(directory):
os.makedirs(directory, exist_ok=True)
# Save history
with open(self.history_file, 'w', encoding='utf-8') as f:
json.dump({
"session_start": self.session_start.isoformat(),
"budget_limit": self.budget_limit,
"requests": self.requests,
"updated_at": datetime.now().isoformat()
}, f, indent=2)
except Exception as e:
logger.error(f"Failed to save cost history: {e}")
def _load_history(self) -> None:
"""Load request history from file if available."""
if not self.history_file or not os.path.exists(self.history_file):
return
try:
with open(self.history_file, 'r', encoding='utf-8') as f:
data = json.load(f)
# Load session data
self.session_start = datetime.fromisoformat(data.get('session_start', self.session_start.isoformat()))
self.budget_limit = data.get('budget_limit', self.budget_limit)
# Load requests
self.requests = data.get('requests', [])
# Recalculate session totals
self.session_tokens_input = sum(r.get('tokens_input', 0) for r in self.requests)
self.session_tokens_output = sum(r.get('tokens_output', 0) for r in self.requests)
self.session_cost = sum(r.get('total_cost', 0) for r in self.requests)
logger.info(f"Loaded cost history with {len(self.requests)} requests")
except Exception as e:
logger.error(f"Failed to load cost history: {e}")
def generate_usage_report(self, format: str = "text") -> str:
"""Generate a usage report.
Args:
format: Output format ("text", "json", "markdown")
Returns:
Formatted usage report
"""
stats = self.get_session_stats()
model_usage = self.get_usage_by_model()
if format == "json":
return json.dumps({
"session": stats,
"models": model_usage
}, indent=2)
# Text or markdown format
lines = []
lines.append("# Usage Report" if format == "markdown" else "USAGE REPORT")
lines.append("")
# Session summary
lines.append("## Session Summary" if format == "markdown" else "SESSION SUMMARY")
lines.append(f"- Start time: {stats['start_time']}")
lines.append(f"- Duration: {stats['duration_seconds'] / 60:.1f} minutes")
lines.append(f"- Requests: {stats['request_count']}")
lines.append(f"- Total tokens: {stats['total_tokens']:,} ({stats['tokens_input']:,} in / {stats['tokens_output']:,} out)")
lines.append(f"- Total cost: ${stats['total_cost']:.4f}")
if stats['budget_limit'] is not None:
lines.append(f"- Budget: ${stats['budget_limit']:.2f} (${stats['budget_remaining']:.2f} remaining)")
lines.append("")
# Usage by model
lines.append("## Usage by Model" if format == "markdown" else "USAGE BY MODEL")
for key, usage in sorted(model_usage.items(), key=lambda x: x[1]['total_cost'], reverse=True):
lines.append(f"### {key}" if format == "markdown" else key.upper())
lines.append(f"- Requests: {usage['request_count']}")
lines.append(f"- Tokens: {usage['tokens_input'] + usage['tokens_output']:,} ({usage['tokens_input']:,} in / {usage['tokens_output']:,} out)")
lines.append(f"- Cost: ${usage['total_cost']:.4f}")
lines.append("")
return "\n".join(lines)
```
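A sketch of one tracking session; the per-1k rates are the `gpt-4o` figures from the `MODEL_INFO` table in the OpenAI provider above:

```python
from claude_code.lib.monitoring.cost_tracker import CostTracker

tracker = CostTracker(budget_limit=5.00)

# Record one gpt-4o request: (1200/1000)*0.005 + (400/1000)*0.015 = $0.012
tracker.add_request(
    provider="openai",
    model="gpt-4o",
    tokens_input=1200,
    tokens_output=400,
    input_cost_per_1k=0.005,
    output_cost_per_1k=0.015,
)

print(tracker.check_budget()["message"])  # "Budget OK: $4.99 remaining (0.2% used)"
print(tracker.generate_usage_report(format="markdown"))
```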
--------------------------------------------------------------------------------
/claude_code/mcp_server.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
# claude_code/mcp_server.py
"""Model Context Protocol server implementation using FastMCP."""
import os
import logging
import platform
import sys
import uuid
import time
from typing import Dict, List, Any, Optional, Callable, Union
import pathlib
import json
from fastmcp import FastMCP, Context, Image
from claude_code.lib.tools.base import Tool, ToolRegistry
from claude_code.lib.tools.manager import ToolExecutionManager
from claude_code.lib.tools.file_tools import register_file_tools
from claude_code.lib.monitoring.server_metrics import get_metrics
# Initialize logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Get server metrics
metrics = get_metrics()
# Create the FastMCP server
mcp = FastMCP(
"Claude Code MCP Server",
description="A Model Context Protocol server for Claude Code tools",
dependencies=["fastmcp>=0.4.1", "openai", "pydantic"],
homepage_html_file=str(pathlib.Path(__file__).parent / "examples" / "claude_mcp_config.html")
)
# Initialize tool registry and manager
tool_registry = ToolRegistry()
tool_manager = ToolExecutionManager(tool_registry)
# Register file tools
register_file_tools(tool_registry)
def setup_tools():
"""Register all tools from the tool registry with FastMCP."""
# Get all registered tools
registered_tools = tool_registry.get_all_tools()
    for tool_obj in registered_tools:
        # Convert the tool execution function to an MCP tool. tool_obj is bound
        # as a default argument: a bare closure would capture the loop variable
        # late, so every executor would run the last registered tool.
        @mcp.tool(name=tool_obj.name, description=tool_obj.description)
        async def tool_executor(params: Dict[str, Any], ctx: Context, tool_obj=tool_obj) -> str:
# Create a tool call in the format expected by ToolExecutionManager
tool_call = {
"id": ctx.request_id,
"function": {
"name": tool_obj.name,
"arguments": str(params)
}
}
try:
# Log the tool call in metrics
metrics.log_tool_call(tool_obj.name)
# Execute the tool and get the result
result = tool_obj.execute(tool_call)
# Report progress when complete
await ctx.report_progress(1, 1)
return result.result
except Exception as e:
# Log error in metrics
metrics.log_error(f"tool_{tool_obj.name}", str(e))
raise
# Function to register file viewing and related resources
def register_view_resources():
    """Register file viewing and related resources."""
@mcp.resource("file://{file_path}")
def get_file_content(file_path: str) -> str:
"""Get the content of a file"""
try:
# Log resource request
metrics.log_resource_request(f"file://{file_path}")
# Get the View tool
view_tool = tool_registry.get_tool("View")
if not view_tool:
metrics.log_error("resource_error", "View tool not found")
return "Error: View tool not found"
# Execute the tool to get file content
tool_call = {
"id": "resource_call",
"function": {
"name": "View",
"arguments": json.dumps({"file_path": file_path})
}
}
result = view_tool.execute(tool_call)
return result.result
except Exception as e:
metrics.log_error("resource_error", f"Error viewing file: {str(e)}")
return f"Error: {str(e)}"
# Register file system resources
@mcp.resource("filesystem://{path}")
def list_directory(path: str) -> str:
"""List files and directories at the given path."""
try:
# Log resource request
metrics.log_resource_request(f"filesystem://{path}")
if not os.path.isabs(path):
metrics.log_error("resource_error", f"Path must be absolute: {path}")
return f"Error: Path must be absolute: {path}"
if not os.path.exists(path):
metrics.log_error("resource_error", f"Path does not exist: {path}")
return f"Error: Path does not exist: {path}"
if not os.path.isdir(path):
metrics.log_error("resource_error", f"Path is not a directory: {path}")
return f"Error: Path is not a directory: {path}"
items = os.listdir(path)
result = []
for item in items:
item_path = os.path.join(path, item)
if os.path.isdir(item_path):
result.append(f"{item}/")
else:
result.append(item)
return "\n".join(result)
except Exception as e:
metrics.log_error("resource_error", f"Error listing directory: {str(e)}")
return f"Error: {str(e)}"
# Add system information resource
@mcp.resource("system://info")
def get_system_info() -> str:
"""Get system information"""
try:
# Log resource request
metrics.log_resource_request("system://info")
info = {
"os": platform.system(),
"os_version": platform.version(),
"python_version": sys.version,
"hostname": platform.node(),
"platform": platform.platform(),
"architecture": platform.architecture(),
"processor": platform.processor(),
"uptime": metrics.get_uptime()
}
return "\n".join([f"{k}: {v}" for k, v in info.items()])
except Exception as e:
metrics.log_error("resource_error", f"Error getting system info: {str(e)}")
return f"Error: {str(e)}"
# Add configuration resource
@mcp.resource("config://json")
def get_config_json() -> str:
"""Get Claude Desktop MCP configuration in JSON format"""
try:
# Log resource request
metrics.log_resource_request("config://json")
config_path = pathlib.Path(__file__).parent / "examples" / "claude_mcp_config.json"
try:
with open(config_path, 'r', encoding='utf-8') as f:
config = json.load(f)
# Update working directory to actual path
current_dir = str(pathlib.Path(__file__).parent.parent.absolute())
config["workingDirectory"] = current_dir
return json.dumps(config, indent=2)
except Exception as e:
logger.error(f"Error reading config file: {e}")
metrics.log_error("resource_error", f"Error reading config file: {str(e)}")
return json.dumps({
"name": "Claude Code Tools",
"type": "local_process",
"command": "python",
"args": ["claude.py", "serve"],
"workingDirectory": str(pathlib.Path(__file__).parent.parent.absolute()),
"environment": {},
"description": "A Model Context Protocol server for Claude Code tools"
}, indent=2)
except Exception as e:
metrics.log_error("resource_error", f"Error in config resource: {str(e)}")
return f"Error: {str(e)}"
# Add metrics resource
@mcp.resource("metrics://json")
def get_metrics_json() -> str:
"""Get server metrics in JSON format"""
try:
# Log resource request
metrics.log_resource_request("metrics://json")
# Get all metrics
all_metrics = metrics.get_all_metrics()
return json.dumps(all_metrics, indent=2)
except Exception as e:
metrics.log_error("resource_error", f"Error getting metrics: {str(e)}")
return f"Error: {str(e)}"
# Add metrics tool
@mcp.tool(name="GetServerMetrics", description="Get server metrics and statistics")
async def get_server_metrics(metric_type: str = "all") -> str:
"""Get server metrics and statistics.
Args:
metric_type: Type of metrics to return (all, uptime, tools, resources, errors)
Returns:
The requested metrics information
"""
try:
# Log tool call
metrics.log_tool_call("GetServerMetrics")
if metric_type.lower() == "all":
all_metrics = metrics.get_all_metrics()
return json.dumps(all_metrics, indent=2)
elif metric_type.lower() == "uptime":
return f"Server uptime: {metrics.get_uptime()}"
elif metric_type.lower() == "tools":
tool_stats = metrics.get_tool_usage_stats()
result = "Tool Usage Statistics:\n\n"
for tool, count in sorted(tool_stats.items(), key=lambda x: x[1], reverse=True):
result += f"- {tool}: {count} calls\n"
return result
elif metric_type.lower() == "resources":
resource_stats = metrics.get_resource_usage_stats()
result = "Resource Usage Statistics:\n\n"
for resource, count in sorted(resource_stats.items(), key=lambda x: x[1], reverse=True):
result += f"- {resource}: {count} requests\n"
return result
elif metric_type.lower() == "errors":
error_stats = metrics.get_error_stats()
if not error_stats:
return "No errors recorded."
result = "Error Statistics:\n\n"
for error_type, count in sorted(error_stats.items(), key=lambda x: x[1], reverse=True):
result += f"- {error_type}: {count} occurrences\n"
return result
elif metric_type.lower() == "activity":
recent = metrics.get_recent_activity(15)
result = "Recent Activity:\n\n"
for event in recent:
time_str = event.get("formatted_time", "unknown")
if event["type"] == "tool":
result += f"[{time_str}] Tool call: {event['name']}\n"
elif event["type"] == "resource":
result += f"[{time_str}] Resource request: {event['uri']}\n"
elif event["type"] == "connection":
action = "connected" if event["action"] == "connect" else "disconnected"
result += f"[{time_str}] Client {event['client_id']} {action}\n"
elif event["type"] == "error":
result += f"[{time_str}] Error ({event['error_type']}): {event['message']}\n"
return result
else:
return f"Unknown metric type: {metric_type}. Available types: all, uptime, tools, resources, errors, activity"
except Exception as e:
metrics.log_error("tool_error", f"Error in GetServerMetrics: {str(e)}")
return f"Error retrieving metrics: {str(e)}"
# Add connection tracking
@mcp.on_connect
async def handle_connect(ctx: Context):
"""Track client connections."""
client_id = str(uuid.uuid4())
ctx.client_data["id"] = client_id
metrics.log_connection(client_id, connected=True)
logger.info(f"Client connected: {client_id}")
@mcp.on_disconnect
async def handle_disconnect(ctx: Context):
"""Track client disconnections."""
client_id = ctx.client_data.get("id", "unknown")
metrics.log_connection(client_id, connected=False)
logger.info(f"Client disconnected: {client_id}")
@mcp.tool(name="GetConfiguration", description="Get Claude Desktop configuration for this MCP server")
async def get_configuration(format: str = "json") -> str:
"""Get configuration for connecting Claude Desktop to this MCP server.
Args:
format: The format to return (json or text)
Returns:
The configuration in the requested format
"""
if format.lower() == "json":
return get_config_json()
else:
# Return text instructions
config = json.loads(get_config_json())
return f"""
To connect Claude Desktop to this MCP server:
1. Open Claude Desktop and go to Settings
2. Navigate to "Model Context Protocol" section
3. Click "Add New Server"
4. Use the following settings:
- Name: {config['name']}
- Type: Local Process
- Command: {config['command']}
- Arguments: {" ".join(config['args'])}
- Working Directory: {config['workingDirectory']}
5. Click Save and connect to the server
You can also visit http://localhost:8000 for more detailed instructions and to download the configuration file.
"""
# Initialize MCP server
def initialize_server():
"""Initialize the MCP server with all tools and resources."""
# Register all tools
setup_tools()
# Register resources
register_view_resources()
# Add metrics tool for server monitoring
@mcp.tool(name="ResetServerMetrics", description="Reset server metrics tracking")
async def reset_metrics(confirm: bool = False) -> str:
"""Reset server metrics tracking.
Args:
confirm: Confirmation flag to prevent accidental resets
Returns:
Confirmation message
"""
if not confirm:
return "Please set confirm=true to reset server metrics."
# Log the call
metrics.log_tool_call("ResetServerMetrics")
# Reset metrics
metrics.reset_stats()
return "Server metrics have been reset successfully."
logger.info("MCP server initialized with all tools and resources")
return mcp
# Main function to run the server
def main():
"""Run the MCP server"""
# Initialize the server
server = initialize_server()
# Run the server
server.run()
if __name__ == "__main__":
main()
```
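The server can be launched directly (`python -m claude_code.mcp_server`) or embedded; a sketch of programmatic startup (FastMCP's `run()` typically serves MCP over stdio unless configured otherwise):

```python
from claude_code.mcp_server import initialize_server

# Registers all file tools and resources, plus the metrics/reset tools,
# then returns the configured FastMCP instance.
server = initialize_server()
server.run()  # blocks and serves MCP requests
```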
--------------------------------------------------------------------------------
/claude_code/lib/tools/file_tools.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
# claude_code/lib/tools/file_tools.py
"""File operation tools."""
import os
import logging
from typing import Dict, List, Optional, Any
from .base import tool, ToolRegistry
logger = logging.getLogger(__name__)
@tool(
name="View",
description="Reads a file from the local filesystem. The file_path parameter must be an absolute path, not a relative path.",
parameters={
"type": "object",
"properties": {
"file_path": {
"type": "string",
"description": "The absolute path to the file to read"
},
"limit": {
"type": "number",
"description": "The number of lines to read. Only provide if the file is too large to read at once."
},
"offset": {
"type": "number",
"description": "The line number to start reading from. Only provide if the file is too large to read at once"
}
},
"required": ["file_path"]
},
category="file"
)
def view_file(file_path: str, limit: Optional[int] = None, offset: Optional[int] = 0) -> str:
"""Read contents of a file.
Args:
file_path: Absolute path to the file
limit: Maximum number of lines to read
offset: Line number to start reading from
Returns:
File contents as a string
Raises:
FileNotFoundError: If the file doesn't exist
PermissionError: If the file can't be read
"""
logger.info(f"Reading file: {file_path} (offset={offset}, limit={limit})")
if not os.path.isabs(file_path):
return f"Error: File path must be absolute: {file_path}"
if not os.path.exists(file_path):
return f"Error: File not found: {file_path}"
try:
with open(file_path, 'r', encoding='utf-8', errors='replace') as f:
            # Skip to offset; applies whether or not a limit is given
            for _ in range(offset or 0):
                next(f, None)
            if limit is not None:
                # Read at most `limit` lines
                lines = []
                for _ in range(limit):
                    line = next(f, None)
                    if line is None:
                        break
                    lines.append(line)
                content = ''.join(lines)
            else:
                content = f.read()
return content
except Exception as e:
logger.exception(f"Error reading file: {file_path}")
return f"Error reading file: {str(e)}"
@tool(
name="Edit",
description="This is a tool for editing files. For moving or renaming files, you should generally use the Bash tool with the 'mv' command instead.",
parameters={
"type": "object",
"properties": {
"file_path": {
"type": "string",
"description": "The absolute path to the file to modify"
},
"old_string": {
"type": "string",
"description": "The text to replace"
},
"new_string": {
"type": "string",
"description": "The text to replace it with"
}
},
"required": ["file_path", "old_string", "new_string"]
},
needs_permission=True,
category="file"
)
def edit_file(file_path: str, old_string: str, new_string: str) -> str:
"""Edit a file by replacing text.
Args:
file_path: Absolute path to the file
old_string: Text to replace
new_string: Replacement text
Returns:
Success or error message
Raises:
FileNotFoundError: If the file doesn't exist
PermissionError: If the file can't be modified
"""
logger.info(f"Editing file: {file_path}")
if not os.path.isabs(file_path):
return f"Error: File path must be absolute: {file_path}"
try:
# Create directory if creating new file
if not os.path.exists(os.path.dirname(file_path)) and old_string == "":
os.makedirs(os.path.dirname(file_path), exist_ok=True)
if old_string == "" and not os.path.exists(file_path):
# Creating new file
with open(file_path, 'w', encoding='utf-8') as f:
f.write(new_string)
return f"Created new file: {file_path}"
# Reading existing file
if not os.path.exists(file_path):
return f"Error: File not found: {file_path}"
with open(file_path, 'r', encoding='utf-8', errors='replace') as f:
content = f.read()
        # An empty old_string means "create a new file", but the file exists here
        if old_string == "":
            return f"Error: Cannot create new file: {file_path} already exists"
        # Replace string
        if old_string not in content:
            return f"Error: Could not find the specified text in {file_path}"
# Count occurrences to ensure uniqueness
occurrences = content.count(old_string)
if occurrences > 1:
return f"Error: Found {occurrences} occurrences of the specified text in {file_path}. Please provide more context to uniquely identify the text to replace."
new_content = content.replace(old_string, new_string)
# Write back to file
with open(file_path, 'w', encoding='utf-8') as f:
f.write(new_content)
return f"Successfully edited {file_path}"
except Exception as e:
logger.exception(f"Error editing file: {file_path}")
return f"Error editing file: {str(e)}"
@tool(
name="Replace",
description="Write a file to the local filesystem. Overwrites the existing file if there is one.",
parameters={
"type": "object",
"properties": {
"file_path": {
"type": "string",
"description": "The absolute path to the file to write"
},
"content": {
"type": "string",
"description": "The content to write to the file"
}
},
"required": ["file_path", "content"]
},
needs_permission=True,
category="file"
)
def replace_file(file_path: str, content: str) -> str:
"""Replace file contents or create a new file.
Args:
file_path: Absolute path to the file
content: New content for the file
Returns:
Success or error message
Raises:
PermissionError: If the file can't be written
"""
logger.info(f"Replacing file: {file_path}")
if not os.path.isabs(file_path):
return f"Error: File path must be absolute: {file_path}"
try:
# Create directory if it doesn't exist
directory = os.path.dirname(file_path)
if directory and not os.path.exists(directory):
os.makedirs(directory, exist_ok=True)
# Write content to file
with open(file_path, 'w', encoding='utf-8') as f:
f.write(content)
return f"Successfully wrote to {file_path}"
except Exception as e:
logger.exception(f"Error writing file: {file_path}")
return f"Error writing file: {str(e)}"
@tool(
name="MakeDirectory",
description="Create a new directory on the local filesystem.",
parameters={
"type": "object",
"properties": {
"directory_path": {
"type": "string",
"description": "The absolute path to the directory to create"
},
"parents": {
"type": "boolean",
"description": "Whether to create parent directories if they don't exist",
"default": True
},
"mode": {
"type": "integer",
"description": "The file mode (permissions) to set for the directory (octal)",
"default": 0o755
}
},
"required": ["directory_path"]
},
needs_permission=True,
category="file"
)
def make_directory(directory_path: str, parents: bool = True, mode: int = 0o755) -> str:
"""Create a new directory.
Args:
directory_path: Absolute path to the directory to create
parents: Whether to create parent directories
mode: File mode (permissions) to set
Returns:
Success or error message
Raises:
PermissionError: If the directory can't be created
"""
logger.info(f"Creating directory: {directory_path}")
if not os.path.isabs(directory_path):
return f"Error: Directory path must be absolute: {directory_path}"
try:
if os.path.exists(directory_path):
if os.path.isdir(directory_path):
return f"Directory already exists: {directory_path}"
else:
return f"Error: Path exists but is not a directory: {directory_path}"
        # Create the directory; os.makedirs creates missing parents,
        # os.mkdir requires the parent to already exist
        if parents:
            os.makedirs(directory_path, mode=mode, exist_ok=True)
        else:
            os.mkdir(directory_path, mode=mode)
return f"Successfully created directory: {directory_path}"
except Exception as e:
logger.exception(f"Error creating directory: {directory_path}")
return f"Error creating directory: {str(e)}"
@tool(
name="ListDirectory",
description="List files and directories in a given path with detailed information.",
parameters={
"type": "object",
"properties": {
"directory_path": {
"type": "string",
"description": "The absolute path to the directory to list"
},
"pattern": {
"type": "string",
"description": "Optional glob pattern to filter files (e.g., '*.py')"
},
"recursive": {
"type": "boolean",
"description": "Whether to list files recursively",
"default": False
},
"show_hidden": {
"type": "boolean",
"description": "Whether to show hidden files (starting with .)",
"default": False
},
"details": {
"type": "boolean",
"description": "Whether to show detailed information (size, permissions, etc.)",
"default": False
}
},
"required": ["directory_path"]
},
category="file"
)
def list_directory(
directory_path: str,
pattern: Optional[str] = None,
recursive: bool = False,
show_hidden: bool = False,
details: bool = False
) -> str:
"""List files and directories with detailed information.
Args:
directory_path: Absolute path to the directory
pattern: Glob pattern to filter files
recursive: Whether to list files recursively
show_hidden: Whether to show hidden files
details: Whether to show detailed information
Returns:
Directory listing as formatted text
"""
logger.info(f"Listing directory: {directory_path}")
if not os.path.isabs(directory_path):
return f"Error: Directory path must be absolute: {directory_path}"
if not os.path.exists(directory_path):
return f"Error: Directory not found: {directory_path}"
if not os.path.isdir(directory_path):
return f"Error: Path is not a directory: {directory_path}"
try:
import glob
import stat
from datetime import datetime
# Build the pattern
if pattern:
if recursive:
search_pattern = os.path.join(directory_path, "**", pattern)
else:
search_pattern = os.path.join(directory_path, pattern)
else:
if recursive:
search_pattern = os.path.join(directory_path, "**")
else:
search_pattern = os.path.join(directory_path, "*")
# Get all matching files
if recursive:
matches = glob.glob(search_pattern, recursive=True)
else:
matches = glob.glob(search_pattern)
# Filter hidden files if needed
if not show_hidden:
matches = [m for m in matches if not os.path.basename(m).startswith('.')]
# Sort by name
matches.sort()
# Format the output
result = []
if details:
# Header
result.append(f"{'Type':<6} {'Permissions':<11} {'Size':<10} {'Modified':<20} {'Name'}")
result.append("-" * 80)
for item_path in matches:
try:
# Get file stats
item_stat = os.stat(item_path)
# Determine type
if os.path.isdir(item_path):
item_type = "dir"
elif os.path.islink(item_path):
item_type = "link"
else:
item_type = "file"
# Format permissions
mode = item_stat.st_mode
perms = ""
for who in "USR", "GRP", "OTH":
for what in "R", "W", "X":
perm = getattr(stat, f"S_I{what}{who}")
perms += what.lower() if mode & perm else "-"
# Format size
size = item_stat.st_size
if size < 1024:
size_str = f"{size}B"
elif size < 1024 * 1024:
size_str = f"{size/1024:.1f}KB"
elif size < 1024 * 1024 * 1024:
size_str = f"{size/(1024*1024):.1f}MB"
else:
size_str = f"{size/(1024*1024*1024):.1f}GB"
# Format modification time
mtime = datetime.fromtimestamp(item_stat.st_mtime).strftime("%Y-%m-%d %H:%M:%S")
# Format name (relative to the directory)
name = os.path.relpath(item_path, directory_path)
# Add to result
result.append(f"{item_type:<6} {perms:<11} {size_str:<10} {mtime:<20} {name}")
except Exception as e:
result.append(f"Error getting info for {item_path}: {str(e)}")
else:
# Simple listing
dirs = []
files = []
for item_path in matches:
name = os.path.relpath(item_path, directory_path)
if os.path.isdir(item_path):
dirs.append(f"{name}/")
else:
files.append(name)
if dirs:
result.append("Directories:")
for d in dirs:
result.append(f" {d}")
if files:
if dirs:
result.append("")
result.append("Files:")
for f in files:
result.append(f" {f}")
if not result:
return f"No matching items found in {directory_path}"
return "\n".join(result)
except Exception as e:
logger.exception(f"Error listing directory: {directory_path}")
return f"Error listing directory: {str(e)}"
def register_file_tools(registry: ToolRegistry) -> None:
"""Register all file tools with the registry.
Args:
registry: Tool registry to register with
"""
from .base import create_tools_from_functions
file_tools = [
view_file,
edit_file,
replace_file,
make_directory,
list_directory
]
create_tools_from_functions(registry, file_tools)
```
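A quick sketch of exercising these tools outside the server, assuming the `@tool` decorator in `base.py` (not shown here) leaves the functions directly callable:

```python
from claude_code.lib.tools.base import ToolRegistry
from claude_code.lib.tools.file_tools import (
    register_file_tools,
    replace_file,
    view_file,
)

# Direct calls (assumes @tool returns the function or a callable wrapper)
print(replace_file("/tmp/demo.txt", "hello\nworld\n"))
print(view_file("/tmp/demo.txt", limit=1))  # -> "hello\n"

# Registry route, as mcp_server.py uses it
registry = ToolRegistry()
register_file_tools(registry)
print([t.name for t in registry.get_all_tools()])
# -> ['View', 'Edit', 'Replace', 'MakeDirectory', 'ListDirectory']
```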
--------------------------------------------------------------------------------
/web-client.html:
--------------------------------------------------------------------------------
```html
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>OpenAI Code Assistant Web Client</title>
<style>
body {
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, Oxygen, Ubuntu, Cantarell, 'Open Sans', 'Helvetica Neue', sans-serif;
line-height: 1.6;
color: #333;
max-width: 1200px;
margin: 0 auto;
padding: 20px;
}
h1 {
color: #2c3e50;
border-bottom: 2px solid #eee;
padding-bottom: 10px;
}
.chat-container {
display: flex;
height: 70vh;
}
.sidebar {
width: 250px;
background-color: #f8f9fa;
padding: 15px;
border-radius: 5px;
margin-right: 20px;
}
.main-chat {
flex: 1;
display: flex;
flex-direction: column;
border: 1px solid #ddd;
border-radius: 5px;
}
.chat-messages {
flex: 1;
overflow-y: auto;
padding: 15px;
background-color: #fff;
}
.chat-input {
display: flex;
padding: 10px;
background-color: #f8f9fa;
border-top: 1px solid #ddd;
}
.chat-input textarea {
flex: 1;
padding: 10px;
border: 1px solid #ddd;
border-radius: 4px;
resize: none;
font-family: inherit;
}
.chat-input button {
margin-left: 10px;
padding: 10px 15px;
background-color: #4CAF50;
color: white;
border: none;
border-radius: 4px;
cursor: pointer;
}
.chat-input button:hover {
background-color: #45a049;
}
.message {
margin-bottom: 15px;
padding: 10px;
border-radius: 5px;
}
.user-message {
background-color: #e3f2fd;
align-self: flex-end;
margin-left: 20%;
}
.assistant-message {
background-color: #f1f1f1;
align-self: flex-start;
margin-right: 20%;
}
.tool-message {
background-color: #fff8e1;
border-left: 3px solid #ffc107;
padding-left: 10px;
font-family: monospace;
white-space: pre-wrap;
}
.status-message {
color: #666;
font-style: italic;
text-align: center;
margin: 10px 0;
}
.warning-message {
color: #ff9800;
border-left: 3px solid #ff9800;
padding-left: 10px;
}
.error-message {
color: #f44336;
border-left: 3px solid #f44336;
padding-left: 10px;
}
.conversation-list {
list-style: none;
padding: 0;
}
.conversation-list li {
padding: 8px 10px;
margin-bottom: 5px;
background-color: #e9ecef;
border-radius: 4px;
cursor: pointer;
}
.conversation-list li:hover {
background-color: #dee2e6;
}
.conversation-list li.active {
background-color: #4CAF50;
color: white;
}
.settings-panel {
margin-top: 20px;
}
.settings-panel h3 {
margin-bottom: 10px;
}
.settings-panel label {
display: block;
margin-bottom: 5px;
}
.settings-panel select, .settings-panel input {
width: 100%;
padding: 8px;
margin-bottom: 10px;
border: 1px solid #ddd;
border-radius: 4px;
}
.new-conversation-btn {
width: 100%;
padding: 10px;
background-color: #4CAF50;
color: white;
border: none;
border-radius: 4px;
cursor: pointer;
margin-bottom: 15px;
}
.new-conversation-btn:hover {
background-color: #45a049;
}
pre {
background-color: #f5f5f5;
padding: 10px;
border-radius: 4px;
overflow-x: auto;
}
code {
font-family: 'Courier New', Courier, monospace;
}
</style>
</head>
<body>
<h1>OpenAI Code Assistant</h1>
<div class="chat-container">
<div class="sidebar">
<button id="newConversationBtn" class="new-conversation-btn">New Conversation</button>
<h3>Conversations</h3>
<ul id="conversationList" class="conversation-list">
<!-- Conversations will be added here -->
</ul>
<div class="settings-panel">
<h3>Settings</h3>
<label for="modelSelect">Model:</label>
<select id="modelSelect">
<option value="gpt-4o">GPT-4o</option>
<option value="gpt-4-turbo">GPT-4 Turbo</option>
<option value="gpt-3.5-turbo">GPT-3.5 Turbo</option>
</select>
<label for="temperatureInput">Temperature:</label>
<input type="number" id="temperatureInput" min="0" max="2" step="0.1" value="0">
</div>
</div>
<div class="main-chat">
<div id="chatMessages" class="chat-messages">
<div class="status-message">Start a new conversation or select an existing one.</div>
</div>
<div class="chat-input">
<textarea id="userInput" placeholder="Type your message here..." rows="3"></textarea>
<button id="sendButton">Send</button>
</div>
</div>
</div>
<script>
// API endpoint (change this to match your server)
const API_BASE_URL = 'http://localhost:8000';
// State
let currentConversationId = null;
let conversations = [];
// DOM Elements
const chatMessages = document.getElementById('chatMessages');
const userInput = document.getElementById('userInput');
const sendButton = document.getElementById('sendButton');
const newConversationBtn = document.getElementById('newConversationBtn');
const conversationList = document.getElementById('conversationList');
const modelSelect = document.getElementById('modelSelect');
const temperatureInput = document.getElementById('temperatureInput');
// Event Listeners
sendButton.addEventListener('click', sendMessage);
newConversationBtn.addEventListener('click', createNewConversation);
userInput.addEventListener('keydown', (e) => {
if (e.key === 'Enter' && !e.shiftKey) {
e.preventDefault();
sendMessage();
}
});
// Initialize
function init() {
// Load conversations from local storage
const savedConversations = localStorage.getItem('conversations');
if (savedConversations) {
conversations = JSON.parse(savedConversations);
updateConversationList();
}
}
// Create a new conversation
async function createNewConversation() {
try {
const model = modelSelect.value;
const temperature = parseFloat(temperatureInput.value);
const response = await fetch(`${API_BASE_URL}/conversation`, {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify({ model, temperature })
});
const data = await response.json();
if (data.conversation_id) {
const newConversation = {
id: data.conversation_id,
model: data.model,
created: new Date().toISOString(),
messages: []
};
conversations.push(newConversation);
saveConversations();
updateConversationList();
// Switch to the new conversation
switchConversation(data.conversation_id);
}
} catch (error) {
console.error('Error creating conversation:', error);
addErrorMessage('Failed to create a new conversation. Please try again.');
}
}
// Switch to a different conversation
function switchConversation(conversationId) {
currentConversationId = conversationId;
// Update UI
const conversationItems = conversationList.querySelectorAll('li');
conversationItems.forEach(item => {
if (item.dataset.id === conversationId) {
item.classList.add('active');
} else {
item.classList.remove('active');
}
});
// Clear and load messages
chatMessages.innerHTML = '';
const conversation = conversations.find(c => c.id === conversationId);
if (conversation && conversation.messages) {
conversation.messages.forEach(msg => {
if (msg.role === 'user') {
addUserMessage(msg.content);
} else if (msg.role === 'assistant') {
addAssistantMessage(msg.content);
} else if (msg.role === 'tool') {
addToolMessage(msg.name, msg.content);
}
});
}
// Focus input
userInput.focus();
}
// Send a message
async function sendMessage() {
const message = userInput.value.trim();
if (!message) return;
if (!currentConversationId) {
await createNewConversation();
}
// Add user message to UI
addUserMessage(message);
// Save message to conversation
const conversation = conversations.find(c => c.id === currentConversationId);
if (conversation) {
conversation.messages.push({
role: 'user',
content: message
});
saveConversations();
}
// Clear input
userInput.value = '';
// Send to API and stream response
try {
const response = await fetch(`${API_BASE_URL}/conversation/${currentConversationId}/message/stream`, {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify({ message })
});
const reader = response.body.getReader();
const decoder = new TextDecoder();
let assistantResponse = '';
let responseElement = null;
while (true) {
const { done, value } = await reader.read();
if (done) break;
                    // stream:true keeps multi-byte characters intact across chunks
                    const text = decoder.decode(value, { stream: true });
const lines = text.split('\n').filter(line => line.trim());
for (const line of lines) {
try {
const data = JSON.parse(line);
if (data.type === 'content') {
if (!responseElement) {
responseElement = addAssistantMessage('');
}
assistantResponse += data.content;
responseElement.textContent = assistantResponse;
}
else if (data.type === 'status') {
if (data.status === 'running_tools') {
addStatusMessage('Running tools...');
} else if (data.status.startsWith('running_tools_iteration_')) {
const iteration = data.status.split('_').pop();
addStatusMessage(`Running tools (iteration ${iteration})...`);
}
}
else if (data.type === 'tool_result') {
addToolMessage(data.tool, data.result);
}
else if (data.type === 'warning') {
addWarningMessage(data.warning);
}
else if (data.type === 'error') {
addErrorMessage(data.error);
}
} catch (e) {
console.error('Error parsing stream data:', e, line);
}
}
}
// Save assistant response to conversation
if (conversation && assistantResponse) {
conversation.messages.push({
role: 'assistant',
content: assistantResponse
});
saveConversations();
}
} catch (error) {
console.error('Error sending message:', error);
addErrorMessage('Failed to send message. Please try again.');
}
}
// Add a user message to the chat
function addUserMessage(message) {
const messageElement = document.createElement('div');
messageElement.className = 'message user-message';
messageElement.textContent = message;
chatMessages.appendChild(messageElement);
scrollToBottom();
return messageElement;
}
// Add an assistant message to the chat
function addAssistantMessage(message) {
const messageElement = document.createElement('div');
messageElement.className = 'message assistant-message';
messageElement.textContent = message;
chatMessages.appendChild(messageElement);
scrollToBottom();
return messageElement;
}
// Add a tool message to the chat
function addToolMessage(toolName, result) {
const messageElement = document.createElement('div');
messageElement.className = 'message tool-message';
messageElement.innerHTML = `<strong>${toolName}:</strong>\n${result}`;
chatMessages.appendChild(messageElement);
scrollToBottom();
return messageElement;
}
// Add a status message to the chat
function addStatusMessage(message) {
const messageElement = document.createElement('div');
messageElement.className = 'status-message';
messageElement.textContent = message;
chatMessages.appendChild(messageElement);
scrollToBottom();
return messageElement;
}
// Add a warning message to the chat
function addWarningMessage(message) {
const messageElement = document.createElement('div');
messageElement.className = 'warning-message';
messageElement.textContent = message;
chatMessages.appendChild(messageElement);
scrollToBottom();
return messageElement;
}
// Add an error message to the chat
function addErrorMessage(message) {
const messageElement = document.createElement('div');
messageElement.className = 'error-message';
messageElement.textContent = message;
chatMessages.appendChild(messageElement);
scrollToBottom();
return messageElement;
}
// Update the conversation list in the sidebar
function updateConversationList() {
conversationList.innerHTML = '';
conversations.forEach(conversation => {
const listItem = document.createElement('li');
listItem.textContent = new Date(conversation.created).toLocaleString();
listItem.dataset.id = conversation.id;
if (conversation.id === currentConversationId) {
listItem.classList.add('active');
}
listItem.addEventListener('click', () => {
switchConversation(conversation.id);
});
conversationList.appendChild(listItem);
});
}
// Save conversations to local storage
function saveConversations() {
localStorage.setItem('conversations', JSON.stringify(conversations));
}
// Scroll chat to bottom
function scrollToBottom() {
chatMessages.scrollTop = chatMessages.scrollHeight;
}
// Initialize the app
init();
</script>
</body>
</html>
```
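The client is a single static page; it only needs an HTTP server for the file itself, plus the API reachable at `API_BASE_URL` (the API must allow cross-origin requests when served from a different origin). A minimal way to serve it locally, using only the Python standard library:

```python
# Run from the repository root, then open http://localhost:3000/web-client.html
import http.server
import socketserver

PORT = 3000  # any free port

with socketserver.TCPServer(("", PORT), http.server.SimpleHTTPRequestHandler) as httpd:
    print(f"Serving on http://localhost:{PORT}/web-client.html")
    httpd.serve_forever()
```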