# Directory Structure
```
├── .gitignore
├── client
│   ├── app.py
│   ├── requirements.txt
│   ├── servers_config_example.json
│   └── src
│       ├── __init__.py
│       ├── api_server.py
│       ├── api.py
│       ├── auth.py
│       ├── config.py
│       └── server.py
├── example_llm_mcp
│   ├── .env.example
│   ├── main.py
│   ├── requirements.txt
│   └── servers_config_example.json
├── files
│   ├── client.gif
│   └── llm_mcp_example.gif
├── LICENSE
├── README.md
└── server
    ├── package.json
    ├── src
    │   ├── auth.ts
    │   ├── client.ts
    │   ├── config.ts
    │   ├── index.ts
    │   ├── mcp-proxy.ts
    │   └── sse.ts
    └── tsconfig.json
```
# Files
--------------------------------------------------------------------------------
/example_llm_mcp/.env.example:
--------------------------------------------------------------------------------
```
OPENAI_API_KEY="<YOUR_OPENAI_API_KEY>"
```
--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------
```
.venv/
.env
servers_config.json
.idea/
node_modules/
package-lock.json
api_keys.json
build/
__pycache__/
```
--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------
```markdown
# Pocket MCP Manager
A flexible and user-friendly management system for Model Context Protocol (MCP) servers, consisting of a client-server architecture that simplifies handling multiple MCP servers through a central interface.
## Overview
The Pocket MCP Manager streamlines the process of working with multiple MCP servers by allowing you to:
- Add all your MCP servers to a central management UI
- Selectively launch specific servers through an intuitive interface
- Generate API keys linked to these running servers
- Connect to them through a single proxy MCP server in clients like Claude or Cursor
This approach means you only need to update a single API key in your AI tools when changing which MCP servers you want to use, rather than reconfiguring multiple connection settings.
## Server Component
The server component is an MCP proxy server that:
1. Accepts an API key from the client
2. Connects to the already running MCP servers authorized by that key
3. Exposes all connected servers' capabilities through a unified MCP interface
4. Routes requests to the appropriate backend servers
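The four steps above can be sketched in a few lines of Python. This is only an illustration of the idea, not the code in `server/src/mcp-proxy.ts`: the `Backend` and `ProxyRouter` names and the tool names are hypothetical, and real MCP traffic is replaced by plain function calls.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List


@dataclass
class Backend:
    """Hypothetical stand-in for one connected MCP server."""
    name: str
    tools: Dict[str, Callable[[dict], str]] = field(default_factory=dict)


class ProxyRouter:
    """Expose the tools of several backends through a single lookup table."""

    def __init__(self, backends: List[Backend]) -> None:
        # Map each tool name to the backend that owns it.
        self._owner: Dict[str, Backend] = {}
        for backend in backends:
            for tool_name in backend.tools:
                # In this sketch the first backend to register a name wins.
                self._owner.setdefault(tool_name, backend)

    def list_tools(self) -> List[str]:
        """Return the unified, sorted list of tool names."""
        return sorted(self._owner)

    def call_tool(self, tool_name: str, arguments: dict) -> str:
        """Route a call to the backend that registered the tool."""
        backend = self._owner.get(tool_name)
        if backend is None:
            raise KeyError(f"Unknown tool: {tool_name}")
        return backend.tools[tool_name](arguments)


search = Backend("brave-search", {"web_search": lambda a: f"results for {a['query']}"})
notes = Backend("logseq", {"get_page": lambda a: f"page {a['name']}"})
router = ProxyRouter([search, notes])
print(router.list_tools())
print(router.call_tool("web_search", {"query": "mcp"}))
```

A real proxy would also have to decide what happens when two servers expose a tool with the same name; the sketch simply keeps the first one.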
### Server Installation
```bash
# Clone the repository
git clone git@github.com:dailydaniel/pocket-mcp.git
cd pocket-mcp/server
# Install dependencies
npm install
# The build step runs automatically during installation
```
### Connecting to Claude Desktop / Cursor
Add the following configuration to the MCP settings of Claude Desktop or Cursor:
```json
{
"mcpServers": {
"mcp-proxy": {
"command": "node",
"args": ["/full/path/to/pocket-mcp/server/build/index.js"],
"env": {
"MCP_API_KEY": "api_key_from_client",
"CLIENT_API_URL": "http://localhost:<port>/api"
}
}
}
}
```
Replace:
- `/full/path/to/pocket-mcp/server/build/index.js` with the absolute path to your server's build/index.js file
- `api_key_from_client` with the API key generated from the client UI
- `<port>` with the port shown in the API server logs (typically 8000)
## Client Component
The client provides a web-based UI built with Streamlit for:
- Viewing all configured MCP servers
- Launching selected servers as a group
- Generating API keys for launched servers
- Managing existing API keys
### Client Setup
```bash
# Navigate to the client directory
cd pocket-mcp/client
# Create and activate a virtual environment
python -m venv .venv --prompt "mcp-venv"
source .venv/bin/activate
# Install requirements
pip install -r requirements.txt
# Copy the example config
cp servers_config_example.json servers_config.json
# Edit the configuration with your MCP servers
vim servers_config.json
# Run the client
streamlit run app.py
```
### Server Configuration Example
Create a `servers_config.json` file in the client directory with your MCP servers:
```json
{
"mcpServers": {
"jetbrains": {
"command": "npx",
"args": ["-y", "@jetbrains/mcp-proxy"]
},
"logseq": {
"command": "uvx",
"args": ["mcp-server-logseq"],
"env": {
"LOGSEQ_API_TOKEN": "API_KEY",
"LOGSEQ_API_URL": "http://127.0.0.1:<port>"
}
},
"brave-search": {
"command": "npx",
"args": [
"-y",
"@modelcontextprotocol/server-brave-search"
],
"env": {
"BRAVE_API_KEY": "API_KEY"
}
}
}
}
```
Replace `API_KEY` and `<port>` with your actual values.
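Before launching the client, it can help to check that the file has the expected shape and that no placeholder was left behind. The helper below is a minimal sketch under stated assumptions: `check_servers_config` is a hypothetical name, the client does not ship such a check, and the placeholder test (`API_KEY` or any value containing `<`) is only a heuristic based on the example above.

```python
import json
from typing import Any, Dict, List


def check_servers_config(config: Dict[str, Any]) -> List[str]:
    """Return a list of problems; an empty list means the shape looks fine."""
    problems: List[str] = []
    servers = config.get("mcpServers")
    if not isinstance(servers, dict):
        return ["top-level 'mcpServers' must be an object"]
    for name, entry in servers.items():
        if not isinstance(entry, dict):
            problems.append(f"{name}: entry must be an object")
            continue
        if not isinstance(entry.get("command"), str):
            problems.append(f"{name}: 'command' must be a string")
        if not isinstance(entry.get("args", []), list):
            problems.append(f"{name}: 'args' must be a list")
        env = entry.get("env", {})
        if not isinstance(env, dict):
            problems.append(f"{name}: 'env' must be an object")
        elif any("<" in str(v) or v == "API_KEY" for v in env.values()):
            # Heuristic: the example file uses API_KEY and <port> as placeholders.
            problems.append(f"{name}: 'env' still contains a placeholder value")
    return problems


example = json.loads("""
{
  "mcpServers": {
    "jetbrains": {"command": "npx", "args": ["-y", "@jetbrains/mcp-proxy"]},
    "brave-search": {
      "command": "npx",
      "args": ["-y", "@modelcontextprotocol/server-brave-search"],
      "env": {"BRAVE_API_KEY": "API_KEY"}
    }
  }
}
""")
problems = check_servers_config(example)
print(problems)
```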
### Client Demo

## Example LLM MCP Client
The repository includes an example client for chatting with LLMs using the OpenAI API and MCP servers.
### Setup and Usage
```bash
# Navigate to the example client directory
cd pocket-mcp/example_llm_mcp
# Copy the example environment file
cp .env.example .env
# Edit the .env file and add your OpenAI API key
vim .env
# Add server configurations to the servers_config.json file
cp servers_config_example.json servers_config.json
# Add API key from the client
vim servers_config.json
# Activate a virtual environment if one is not already active
# (the client's virtual environment can be reused)
source ../client/.venv/bin/activate
# Install requirements
pip install -r requirements.txt
# Run the client
python3 main.py
```
The example client will connect to your running MCP servers and allow you to chat with an LLM while utilizing MCP capabilities.
### Chat Demo

## Acknowledgments
- Server component based on [mcp-proxy-server](https://github.com/adamwattis/mcp-proxy-server)
- SSE implementation with [mcp-proxy](https://github.com/sparfenyuk/mcp-proxy)
## TODO
- Add additional functionality to the client component
- Convert the server into a standalone npm module that can be installed with npx
- Delete cringe copyright notice from streamlit app :)
```
--------------------------------------------------------------------------------
/client/src/__init__.py:
--------------------------------------------------------------------------------
```python
```
--------------------------------------------------------------------------------
/example_llm_mcp/requirements.txt:
--------------------------------------------------------------------------------
```
python-dotenv
mcp
openai
```
--------------------------------------------------------------------------------
/client/requirements.txt:
--------------------------------------------------------------------------------
```
streamlit>=1.25.0
mcp>=0.6.0
python-dotenv>=1.0.0
pyperclip>=1.8.2
fastapi>=0.95.0
uvicorn>=0.21.0
mcp-proxy>=0.5.1
```
--------------------------------------------------------------------------------
/example_llm_mcp/servers_config_example.json:
--------------------------------------------------------------------------------
```json
{
"mcpServers": {
"mcp-proxy": {
"command": "node",
"args": ["/full/path/to/pocket-mcp/server/build/index.js"],
"env": {
"MCP_API_KEY": "API_KEY",
"CLIENT_API_URL": "http://localhost:<PORT>/api"
}
}
}
}
```
--------------------------------------------------------------------------------
/server/tsconfig.json:
--------------------------------------------------------------------------------
```json
{
"compilerOptions": {
"target": "ES2022",
"module": "Node16",
"moduleResolution": "Node16",
"outDir": "./build",
"rootDir": "./src",
"strict": true,
"esModuleInterop": true,
"skipLibCheck": true,
"forceConsistentCasingInFileNames": true
},
"include": ["src/**/*"],
"exclude": ["node_modules"]
}
```
--------------------------------------------------------------------------------
/client/servers_config_example.json:
--------------------------------------------------------------------------------
```json
{
"mcpServers": {
"jetbrains": {
"command": "npx",
"args": ["-y", "@jetbrains/mcp-proxy"]
},
"logseq": {
"command": "uvx",
"args": ["mcp-server-logseq"],
"env": {
"LOGSEQ_API_TOKEN": "API_KEY",
"LOGSEQ_API_URL": "http://127.0.0.1:<port>"
}
},
"brave-search": {
"command": "npx",
"args": [
"-y",
"@modelcontextprotocol/server-brave-search"
],
"env": {
"BRAVE_API_KEY": "API_KEY"
}
}
}
}
```
--------------------------------------------------------------------------------
/server/src/index.ts:
--------------------------------------------------------------------------------
```typescript
#!/usr/bin/env node
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import { createServer } from "./mcp-proxy.js";
async function main() {
const apiKey = process.env.MCP_API_KEY;
const transport = new StdioServerTransport();
try {
const { server, cleanup } = await createServer(apiKey);
await server.connect(transport);
process.on("SIGINT", async () => {
await cleanup();
await server.close();
process.exit(0);
});
} catch (error) {
console.error("Server error:", error);
process.exit(1);
}
}
main().catch((error) => {
console.error("Error:", error);
process.exit(1);
});
```
--------------------------------------------------------------------------------
/client/src/api.py:
--------------------------------------------------------------------------------
```python
import json
import os
from typing import Any, Dict, List, Optional
class ApiClient:
"""Client for generating server configurations for MCP API server."""
@staticmethod
def generate_server_config(
api_key: str,
server_configs: Dict[str, Dict[str, Any]]
) -> Dict[str, Any]:
"""Generate MCP Proxy Server configuration based on API key.
Args:
api_key: API key for authentication
server_configs: Dictionary of server configurations
Returns:
Dictionary with instructions for proxy server
"""
return {
"api_key": api_key,
"instructions": {
"env_variable": "MCP_API_KEY",
"command": "mcp-proxy-server",
"sse_command": "node build/sse.js"
}
}
```
--------------------------------------------------------------------------------
/server/src/auth.ts:
--------------------------------------------------------------------------------
```typescript
// src/auth.ts
import axios from 'axios';
import { ServerConfig } from './config.js';
const CLIENT_API_URL = process.env.CLIENT_API_URL || 'http://localhost:8000/api';
export interface AuthResponse {
success: boolean;
servers: ServerConfig[];
message?: string;
}
export async function authenticateAndGetServers(apiKey: string): Promise<ServerConfig[]> {
try {
const response = await axios.get<AuthResponse>(`${CLIENT_API_URL}/servers`, {
headers: {
Authorization: `Bearer ${apiKey}`
}
});
if (!response.data.success) {
throw new Error(response.data.message || 'Auth failed');
}
return response.data.servers;
} catch (error) {
if (axios.isAxiosError(error)) {
if (error.response?.status === 401) {
throw new Error('Invalid API key');
} else if (error.response?.status === 403) {
throw new Error('No permission');
}
throw new Error(`Auth error: ${error.message}`);
}
throw error;
}
}
```
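For readers more at home in Python, the same exchange can be sketched with the standard library. This is an illustration only, not part of the repository: `build_servers_request` and `parse_servers_response` are hypothetical names, the request is built but never sent, and the response body is a made-up sample in the shape of `AuthResponse`.

```python
import json
import urllib.request
from typing import Any, Dict, List


def build_servers_request(base_url: str, api_key: str) -> urllib.request.Request:
    """Build (but do not send) the GET request for the authorized server list."""
    return urllib.request.Request(
        f"{base_url.rstrip('/')}/servers",
        headers={"Authorization": f"Bearer {api_key}"},
        method="GET",
    )


def parse_servers_response(body: str) -> List[Dict[str, Any]]:
    """Mirror the success/message handling of authenticateAndGetServers."""
    data = json.loads(body)
    if not data.get("success"):
        raise RuntimeError(data.get("message") or "Auth failed")
    return data["servers"]


request = build_servers_request("http://localhost:8000/api", "example-key")
servers = parse_servers_response(
    '{"success": true, "servers": [{"name": "logseq", '
    '"transport": {"type": "sse", "url": "http://127.0.0.1:3001/sse"}}]}'
)
print(request.full_url, [s["name"] for s in servers])
```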
--------------------------------------------------------------------------------
/server/src/config.ts:
--------------------------------------------------------------------------------
```typescript
// src/config.ts
import { readFile } from 'fs/promises';
import { resolve } from 'path';
import { authenticateAndGetServers } from './auth.js';
export type TransportConfigStdio = {
type?: 'stdio'
command: string;
args?: string[];
env?: string[]
}
export type TransportConfigSSE = {
type: 'sse'
url: string
}
export type TransportConfig = TransportConfigSSE | TransportConfigStdio
export interface ServerConfig {
name: string;
transport: TransportConfig;
}
export interface Config {
servers: ServerConfig[];
}
export const loadConfig = async (apiKey?: string): Promise<Config> => {
if (apiKey) {
try {
const servers = await authenticateAndGetServers(apiKey);
return { servers };
} catch (error) {
console.error('Auth error with API key:', error);
throw error;
}
}
try {
const configPath = process.env.MCP_CONFIG_PATH || resolve(process.cwd(), 'config.json');
console.log(`Load config from: ${configPath}`);
const fileContents = await readFile(configPath, 'utf-8');
return JSON.parse(fileContents);
} catch (error) {
console.error('Error in loading config.json:', error);
return { servers: [] };
}
};
```
--------------------------------------------------------------------------------
/server/package.json:
--------------------------------------------------------------------------------
```json
{
"name": "mcp-proxy-server",
"version": "0.0.1",
"author": "Your Name",
"license": "MIT",
"description": "An MCP proxy server that aggregates and serves multiple MCP resource servers through a single interface with API key support",
"private": false,
"type": "module",
"bin": {
"mcp-proxy-server": "./build/index.js"
},
"files": [
"build"
],
"scripts": {
"dev": "nodemon --watch 'src/**' --ext 'ts,json' --ignore 'src/**/*.spec.ts' --exec 'tsx src/index.ts'",
"dev:sse": "nodemon --watch 'src/**' --ext 'ts,json' --ignore 'src/**/*.spec.ts' --exec 'tsx src/sse.ts'",
"build": "tsc",
"postbuild": "chmod +x build/index.js",
"prepare": "npm run build",
"watch": "tsc --watch"
},
"dependencies": {
"@modelcontextprotocol/sdk": "0.6.0",
"@types/cors": "^2.8.17",
"axios": "^1.6.0",
"cors": "^2.8.5",
"eventsource": "^3.0.2",
"express": "^4.21.1",
"zod": "^3.23.8",
"zod-to-json-schema": "^3.23.5"
},
"devDependencies": {
"@types/express": "^5.0.0",
"@types/node": "^20.11.24",
"nodemon": "^3.1.9",
"tsx": "^4.19.2",
"typescript": "^5.3.3"
},
"repository": {
"type": "git",
"url": "https://github.com/dailydaniel/pocket-mcp",
"directory": "server"
}
}
```
--------------------------------------------------------------------------------
/server/src/sse.ts:
--------------------------------------------------------------------------------
```typescript
import { SSEServerTransport } from "@modelcontextprotocol/sdk/server/sse.js";
import express from "express";
import { createServer } from "./mcp-proxy.js";
import cors from "cors";
const app = express();
app.use(cors());
const apiKey = process.env.MCP_API_KEY;
let transport: SSEServerTransport;
let cleanupFunction: () => Promise<void>;
async function initServer() {
try {
const { server, cleanup } = await createServer(apiKey);
cleanupFunction = cleanup;
app.get("/sse", async (req, res) => {
console.log("Got connection");
transport = new SSEServerTransport("/message", res);
await server.connect(transport);
server.onerror = (err) => {
console.error(`Server error: ${err.stack}`);
};
server.onclose = async () => {
console.log('Server closed');
if (process.env.KEEP_SERVER_OPEN !== "1") {
await cleanup();
await server.close();
process.exit(0);
}
};
});
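    app.post("/message", async (req, res) => {
      console.log("Got message");
      // Reject messages that arrive before any client has opened /sse
      if (!transport) {
        res.status(400).send("No active SSE connection");
        return;
      }
      await transport.handlePostMessage(req, res);
    });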
} catch (error) {
console.error("Error in server initialization:", error);
process.exit(1);
}
}
initServer();
const PORT = process.env.PORT || 3006;
app.listen(PORT, () => {
console.log(`Server listening on port ${PORT}`);
});
process.on("SIGINT", async () => {
if (cleanupFunction) {
await cleanupFunction();
}
process.exit(0);
});
```
--------------------------------------------------------------------------------
/client/src/config.py:
--------------------------------------------------------------------------------
```python
import json
import os
from typing import Any, Dict, Optional
from dotenv import load_dotenv
class Configuration:
"""Manages configuration and environment variables for the MCP client."""
def __init__(self, config_path: str = "servers_config.json") -> None:
"""Initialize the configuration manager.
Args:
config_path: Path to the configuration file
"""
self.config_path = config_path
self.load_env()
self.config = self.load_config()
@staticmethod
def load_env() -> None:
"""Load environment variables from .env file."""
load_dotenv()
def load_config(self, file_path: Optional[str] = None) -> Dict[str, Any]:
"""Load configuration from the JSON file.
Args:
file_path: Optional path to the configuration file (overrides config_path)
Returns:
Dictionary containing the configuration
"""
try:
path = file_path or self.config_path
if os.path.exists(path):
with open(path, "r") as f:
return json.load(f)
return {"mcpServers": {}}
except Exception as e:
print(f"Error loading configuration: {e}")
return {"mcpServers": {}}
def save_config(self, config: Dict[str, Any]) -> None:
"""Save configuration to the JSON file.
Args:
config: Configuration dictionary to save
"""
try:
with open(self.config_path, "w") as f:
json.dump(config, f, indent=2)
self.config = config
except Exception as e:
print(f"Error saving configuration: {e}")
def get_server_config(self, server_name: str) -> Optional[Dict[str, Any]]:
"""Get configuration for a specific server.
Args:
server_name: Name of the server
Returns:
Server configuration or None if not found
"""
return self.config.get("mcpServers", {}).get(server_name)
def get_servers(self) -> Dict[str, Dict[str, Any]]:
"""Get all server configurations.
Returns:
Dictionary of server configurations
"""
return self.config.get("mcpServers", {})
```
--------------------------------------------------------------------------------
/client/src/api_server.py:
--------------------------------------------------------------------------------
```python
import uvicorn
from fastapi import FastAPI, HTTPException, Depends, Header
from fastapi.middleware.cors import CORSMiddleware
from typing import List, Optional
import socket
from .auth import AuthManager
from .server import ServerManager
from .config import Configuration

# Port the API server listens on; set by start_api_server()
API_PORT = None

app = FastAPI()
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

auth_manager = AuthManager()
config = Configuration()
server_manager = ServerManager()


async def verify_api_key(authorization: Optional[str] = Header(None)):
    """Validate the Authorization header and return the authorized server names."""
    if not authorization:
        raise HTTPException(status_code=401, detail="API key required")
    # Accept both "Bearer <key>" and a bare key
    if authorization.startswith("Bearer "):
        api_key = authorization[7:]
    else:
        api_key = authorization
    is_valid, server_names = auth_manager.validate_key(api_key)
    if not is_valid:
        raise HTTPException(status_code=401, detail="Invalid API key")
    return server_names


@app.get("/api/servers")
async def get_servers(server_names: List[str] = Depends(verify_api_key)):
    servers_config = config.get_servers()
    authorized_servers = []
    for name in server_names:
        if name in servers_config:
            # Honor an explicit "sse_port" override, as Server.start() does;
            # otherwise derive the port from the server name.
            sse_port = servers_config[name].get("sse_port", 3000 + hash(name) % 1000)
            authorized_servers.append({
                "name": name,
                "transport": {
                    "type": "sse",
                    # 0.0.0.0 is a bind address, not a portable destination,
                    # so the proxy is pointed at the loopback interface.
                    "url": f"http://127.0.0.1:{sse_port}/sse"
                }
            })
    return {
        "success": True,
        "servers": authorized_servers
    }


@app.get("/api/health")
async def health_check():
    return {"status": "ok"}


def find_free_port(start_port=8000, max_attempts=100):
    """Return the first port in the range that can be bound on localhost."""
    for port in range(start_port, start_port + max_attempts):
        try:
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                s.bind(('127.0.0.1', port))
                return port
        except OSError:
            continue
    raise IOError("No free ports found")


def start_api_server(host="0.0.0.0", port=None):
    global API_PORT
    if port is None:
        port = find_free_port()
    print(f"Starting API server on port {port}")
    API_PORT = port
    uvicorn.run(app, host=host, port=port)


if __name__ == "__main__":
    start_api_server()
```
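Both `get_servers` here and `Server.start` in `client/src/server.py` derive the default SSE port from `hash(name)`. Python salts `str` hashes per interpreter process unless `PYTHONHASHSEED` is fixed, so the two values are only guaranteed to agree when both run in the same process, as they do when `app.py` starts the API server in a thread. A process-independent alternative could look like the sketch below; `stable_sse_port` is a hypothetical helper that is not in the repository, and both call sites would have to switch to it together.

```python
import zlib


def stable_sse_port(name: str, base: int = 3000, span: int = 1000) -> int:
    """Map a server name to a port in [base, base + span) deterministically."""
    # crc32 depends only on the bytes of the name, not on per-process hash salting.
    return base + zlib.crc32(name.encode("utf-8")) % span


ports = {name: stable_sse_port(name) for name in ("jetbrains", "logseq", "brave-search")}
print(ports)
```

As with the `hash`-based version, two names can still map to the same port, so an explicit `sse_port` in the server configuration remains the safer choice when many servers run side by side.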
--------------------------------------------------------------------------------
/server/src/client.ts:
--------------------------------------------------------------------------------
```typescript
import { Client } from '@modelcontextprotocol/sdk/client/index.js';
import { SSEClientTransport } from '@modelcontextprotocol/sdk/client/sse.js';
import { StdioClientTransport } from '@modelcontextprotocol/sdk/client/stdio.js';
import { Transport } from '@modelcontextprotocol/sdk/shared/transport.js';
import { ServerConfig } from './config.js';
const sleep = (time: number) => new Promise<void>(resolve => setTimeout(() => resolve(), time))
export interface ConnectedClient {
client: Client;
cleanup: () => Promise<void>;
name: string;
}
const createClient = (server: ServerConfig): { client: Client | undefined, transport: Transport | undefined } => {
let transport: Transport | null = null
try {
if (server.transport.type === 'sse') {
transport = new SSEClientTransport(new URL(server.transport.url));
} else {
transport = new StdioClientTransport({
command: server.transport.command,
args: server.transport.args,
        env: server.transport.env ? server.transport.env.reduce<Record<string, string>>((o, v) => ({
          ...o, // keep the variables collected so far
          [v]: process.env[v] || ''
        }), {}) : undefined
});
}
} catch (error) {
console.error(`Failed to create transport ${server.transport.type || 'stdio'} to ${server.name}:`, error);
}
if (!transport) {
console.warn(`Transport ${server.name} not available.`)
return { transport: undefined, client: undefined }
}
const client = new Client({
name: 'mcp-proxy-client',
version: '1.0.0',
}, {
capabilities: {
prompts: {},
resources: { subscribe: true },
tools: {}
}
});
return { client, transport }
}
export const createClients = async (servers: ServerConfig[]): Promise<ConnectedClient[]> => {
const clients: ConnectedClient[] = [];
for (const server of servers) {
console.log(`Connecting to server: ${server.name}`);
const waitFor = 2500
const retries = 3
let count = 0
let retry = true
while (retry) {
const { client, transport } = createClient(server)
if (!client || !transport) {
break
}
try {
await client.connect(transport);
console.log(`Connected to server: ${server.name}`);
clients.push({
client,
name: server.name,
cleanup: async () => {
await transport.close();
}
});
break
} catch (error) {
console.error(`Failed to connect to ${server.name}:`, error);
count++
retry = (count < retries)
if (retry) {
try {
await client.close()
} catch { }
console.log(`Retry connection to ${server.name} in ${waitFor}ms (${count}/${retries})`);
await sleep(waitFor)
}
}
}
}
return clients;
};
```
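The retry loop in `createClients` makes up to three connection attempts per server with a fixed 2500 ms pause between them, then gives up on that server and moves on. The same pattern, reduced to a minimal Python sketch: `connect_with_retries` and `flaky_connect` are hypothetical names, and the `sleep` parameter exists only so the example runs without waiting.

```python
import time
from typing import Callable, List, Optional, TypeVar

T = TypeVar("T")


def connect_with_retries(
    connect: Callable[[], T],
    retries: int = 3,
    wait_seconds: float = 2.5,
    sleep: Callable[[float], None] = time.sleep,
) -> Optional[T]:
    """Call connect() up to `retries` times, pausing between failed attempts."""
    for attempt in range(1, retries + 1):
        try:
            return connect()
        except Exception as error:
            print(f"Attempt {attempt}/{retries} failed: {error}")
            if attempt < retries:
                sleep(wait_seconds)
    return None  # Give up; the caller skips this server.


attempts: List[int] = []


def flaky_connect() -> str:
    """Fail twice, then succeed, to exercise the retry path."""
    attempts.append(1)
    if len(attempts) < 3:
        raise ConnectionError("not ready")
    return "connected"


result = connect_with_retries(flaky_connect, sleep=lambda _: None)
print(result, len(attempts))
```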
--------------------------------------------------------------------------------
/client/src/auth.py:
--------------------------------------------------------------------------------
```python
import json
import os
import secrets
import time
from typing import Dict, List, Optional, Set, Tuple, Union
class AuthManager:
"""Manages API keys and server access."""
def __init__(self, keys_file: str = "api_keys.json") -> None:
"""Initialize the authentication manager.
Args:
keys_file: Path to the API keys file
"""
self.keys_file = keys_file
self.keys = self.load_keys()
def load_keys(self) -> Dict[str, Dict[str, Union[List[str], int]]]:
"""Load API keys from the JSON file.
Returns:
Dictionary of API keys and their associated servers
"""
try:
if os.path.exists(self.keys_file):
with open(self.keys_file, "r") as f:
return json.load(f)
return {}
except Exception as e:
print(f"Error loading API keys: {e}")
return {}
def save_keys(self) -> None:
"""Save API keys to the JSON file."""
try:
with open(self.keys_file, "w") as f:
json.dump(self.keys, f, indent=2)
except Exception as e:
print(f"Error saving API keys: {e}")
def generate_key(self, server_names: List[str]) -> str:
"""Generate a new API key for a group of servers.
Args:
server_names: List of server names to associate with the key
Returns:
Generated API key
"""
# Generate a random token
token = secrets.token_urlsafe(32)
# Store the key with associated servers and creation time
self.keys[token] = {
"servers": server_names,
"created": int(time.time())
}
# Save the updated keys
self.save_keys()
return token
def validate_key(self, key: str) -> Tuple[bool, List[str]]:
"""Validate an API key and return associated servers.
Args:
key: API key to validate
Returns:
Tuple of (is_valid, server_names)
"""
# print(f"current keys: {self.keys}")
self.keys = self.load_keys()
# print(f"current keys after reload: {self.keys}")
# print(f"validating key: {key}")
if key in self.keys:
return True, self.keys[key]["servers"]
return False, []
def revoke_key(self, key: str) -> bool:
"""Revoke an API key.
Args:
key: API key to revoke
Returns:
True if the key was revoked, False otherwise
"""
if key in self.keys:
del self.keys[key]
self.save_keys()
return True
return False
def get_all_keys(self) -> Dict[str, Dict[str, Union[List[str], int]]]:
"""Get all API keys.
Returns:
Dictionary of API keys and their details
"""
return self.keys
```
--------------------------------------------------------------------------------
/client/src/server.py:
--------------------------------------------------------------------------------
```python
import asyncio
import json
import os
import shutil
import subprocess
from contextlib import AsyncExitStack
from typing import Any, Dict, List, Optional, Set, Tuple
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
class Server:
"""Manages an MCP server connection and execution."""
def __init__(self, name: str, config: Dict[str, Any]) -> None:
"""Initialize the server.
Args:
name: Server name
config: Server configuration
"""
self.name: str = name
self.config: Dict[str, Any] = config
self.process: Optional[subprocess.Popen] = None
self.session: Optional[ClientSession] = None
self._cleanup_lock: asyncio.Lock = asyncio.Lock()
self.exit_stack: AsyncExitStack = AsyncExitStack()
    async def start(self) -> bool:
        """Start the server process via mcp-proxy in SSE mode."""
        if self.process and self.is_running():
            print(f"Server {self.name} is already running")
            return True
        try:
            # Prepare command and environment for mcp-proxy
            proxy_command = shutil.which("mcp-proxy")
            if not proxy_command:
                print("Command 'mcp-proxy' not found. Please install it.")
                return False
            # Define the SSE port for this server
            sse_port = self.config.get("sse_port", 3000 + hash(self.name) % 1000)
            start_command = shutil.which(self.config["command"])
            print(f"start command: {start_command}")
            if not start_command:
                # Stop here instead of passing None to Popen
                print(f"Command {self.config['command']} not found. Please install it.")
                return False
            args = [
                # Popen runs without a shell, so the value must not carry shell quotes
                "--allow-origin=*",
                f"--sse-port={sse_port}",
                "--sse-host=0.0.0.0",
                "--pass-environment",
                "--",
                start_command
            ] + self.config.get("args", [])
            # Add server-specific environment variables
            env = os.environ.copy()
            if "env" in self.config:
                env.update(self.config["env"])
            # Start mcp-proxy process
            # NOTE: stdout/stderr are piped but never read in this class
            self.process = subprocess.Popen(
                [proxy_command] + args,
                env=env,
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=False
            )
            print(f"Started mcp-proxy for {self.name} on port {sse_port}")
            return True
        except Exception as e:
            print(f"Error starting mcp-proxy for {self.name}: {e}")
            return False
async def stop(self) -> bool:
"""Stop the server.
Returns:
True if the server was stopped successfully, False otherwise
"""
await self.cleanup()
if not self.process:
return True
try:
self.process.terminate()
# Wait for the process to terminate
try:
self.process.wait(timeout=5)
except subprocess.TimeoutExpired:
self.process.kill()
self.process = None
print(f"Stopped server {self.name}")
return True
except Exception as e:
print(f"Error stopping server {self.name}: {e}")
return False
async def cleanup(self) -> None:
"""Clean up resources."""
async with self._cleanup_lock:
try:
await self.exit_stack.aclose()
self.session = None
except Exception as e:
print(f"Error during cleanup of server {self.name}: {e}")
def is_running(self) -> bool:
"""Check if the server is running.
Returns:
True if the server is running, False otherwise
"""
if not self.process:
return False
# Check if the process is still alive
if self.process.poll() is not None:
# Process has terminated
self.process = None
return False
return True
class ServerManager:
"""Manages multiple MCP servers."""
def __init__(self) -> None:
"""Initialize the server manager."""
self.servers: Dict[str, Server] = {}
self.selected_servers: Set[str] = set()
def select_servers(self, server_names: List[str]) -> None:
"""Select a group of servers.
Args:
server_names: List of server names to select
"""
self.selected_servers = set(server_names)
def get_selected_servers(self) -> Set[str]:
"""Get the currently selected servers.
Returns:
Set of selected server names
"""
return self.selected_servers
async def start_server(self, name: str, config: Dict[str, Any]) -> bool:
"""Start an MCP server.
Args:
name: Server name
config: Server configuration
Returns:
True if the server was started successfully, False otherwise
"""
if name not in self.servers:
self.servers[name] = Server(name, config)
return await self.servers[name].start()
async def start_selected_servers(self, server_configs: Dict[str, Dict[str, Any]]) -> List[str]:
"""Start all selected servers.
Args:
server_configs: Dictionary of server configurations
Returns:
List of successfully started server names
"""
started_servers = []
for name in self.selected_servers:
if name in server_configs:
success = await self.start_server(name, server_configs[name])
if success:
print(f"Success in starting server {name}")
started_servers.append(name)
else:
print(f"Failed to start server {name}")
else:
print(f"Server {name} not found in configuration")
return started_servers
async def stop_server(self, name: str) -> bool:
"""Stop an MCP server.
Args:
name: Server name
Returns:
True if the server was stopped successfully, False otherwise
"""
if name not in self.servers:
print(f"Server {name} not found")
return False
return await self.servers[name].stop()
async def stop_all_servers(self) -> None:
"""Stop all servers."""
for name in list(self.servers.keys()):
await self.stop_server(name)
def get_running_servers(self) -> List[str]:
"""Get names of all running servers.
Returns:
List of running server names
"""
return [name for name, server in self.servers.items() if server.is_running()]
def is_server_running(self, name: str) -> bool:
"""Check if a server is running.
Args:
name: Server name
Returns:
True if the server is running, False otherwise
"""
if name not in self.servers:
return False
return self.servers[name].is_running()
```
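`Server.start` launches each configured server behind `mcp-proxy` by passing an argument list to `subprocess.Popen`. Because no shell is involved, every list element reaches the child process verbatim, so shell-style quotes would become part of the value. The sketch below isolates the argument assembly as a pure function so it can be inspected without starting a process; `build_proxy_argv` is a hypothetical helper and the executable paths are made-up examples, while the flags are the ones used in `Server.start`.

```python
from typing import Any, Dict, List


def build_proxy_argv(
    proxy_path: str,
    command_path: str,
    config: Dict[str, Any],
    sse_port: int,
) -> List[str]:
    """Assemble the argument vector for launching a server behind mcp-proxy."""
    return [
        proxy_path,
        "--allow-origin=*",  # literal *, no quoting needed in list form
        f"--sse-port={sse_port}",
        "--sse-host=0.0.0.0",
        "--pass-environment",
        "--",  # everything after this belongs to the wrapped command
        command_path,
        *config.get("args", []),
    ]


argv = build_proxy_argv(
    "/usr/local/bin/mcp-proxy",  # example path, as shutil.which might return
    "/usr/bin/npx",              # example path for the configured command
    {"command": "npx", "args": ["-y", "@jetbrains/mcp-proxy"]},
    3042,
)
print(argv)
```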
--------------------------------------------------------------------------------
/client/app.py:
--------------------------------------------------------------------------------
```python
import asyncio
import datetime
import json
import os
import pyperclip
from typing import Dict, List, Set
import threading
from src.api_server import start_api_server
import time
import streamlit as st
from src.auth import AuthManager
from src.config import Configuration
from src.server import ServerManager
from src.api import ApiClient
# Set page configuration
st.set_page_config(
page_title="MCP Server Manager",
page_icon="🔌",
layout="wide",
initial_sidebar_state="expanded"
)
if "api_server_started" not in st.session_state:
api_thread = threading.Thread(target=start_api_server, daemon=True)
api_thread.start()
time.sleep(1)
st.session_state.api_server_started = True
# Initialize session state for persistent objects
if "server_manager" not in st.session_state:
st.session_state.server_manager = ServerManager()
if "config" not in st.session_state:
st.session_state.config = Configuration()
if "auth_manager" not in st.session_state:
st.session_state.auth_manager = AuthManager()
if "api_client" not in st.session_state:
st.session_state.api_client = ApiClient()
# Function to copy text to clipboard
def copy_to_clipboard(text):
pyperclip.copy(text)
return True
# Page title and description
st.title("🔌 MCP Server Manager")
st.markdown("""
This application helps you manage your MCP servers, launch them in groups, and generate API keys for use with the MCP proxy server.
""")
# Sidebar navigation
st.sidebar.header("Navigation")
page = st.sidebar.radio(
"Go to",
["Server Dashboard", "Launch Servers", "API Keys Management"]
)
# Server Dashboard page
if page == "Server Dashboard":
    st.header("📋 Server Dashboard")
    servers = st.session_state.config.get_servers()
    if not servers:
        st.warning("No servers configured. Please add servers to your servers_config.json file.")
        st.subheader("Sample Configuration")
        sample_config = {
            "mcpServers": {
                "logseq": {
                    "command": "uvx",
                    "args": ["mcp-server-logseq"],
                    "env": {
                        "LOGSEQ_API_TOKEN": "your_token",
                        "LOGSEQ_API_URL": "http://127.0.0.1:8000"
                    }
                }
            }
        }
        st.code(json.dumps(sample_config, indent=2), language="json")
    else:
        # Display running servers first
        running_servers = st.session_state.server_manager.get_running_servers()
        if running_servers:
            st.subheader("🟢 Running Servers")
            for name in running_servers:
                with st.expander(f"{name}", expanded=True):
                    st.json(st.session_state.config.get_server_config(name))
                    if st.button(f"Stop {name}", key=f"stop_{name}"):
                        with st.spinner(f"Stopping {name}..."):
                            asyncio.run(st.session_state.server_manager.stop_server(name))
                        st.success(f"Server {name} stopped.")
                        st.rerun()

        # Display all servers
        st.subheader("📃 All Configured Servers")
        for name, config in servers.items():
            status = "🟢 Running" if st.session_state.server_manager.is_server_running(name) else "🔴 Stopped"
            with st.expander(f"{name} - {status}"):
                st.json(config)

# Launch Servers page
elif page == "Launch Servers":
    st.header("🚀 Launch Server Group")
    servers = st.session_state.config.get_servers()
    if not servers:
        st.warning("No servers configured. Please add servers to your servers_config.json file.")
    else:
        col1, col2 = st.columns([3, 1])
        with col1:
            # Server selection
            server_names = list(servers.keys())
            selected_servers = st.multiselect(
                "Select servers to launch",
                options=server_names,
                default=list(st.session_state.server_manager.get_selected_servers()),
                help="Select the MCP servers you want to launch as a group"
            )
            # Update selected servers
            if selected_servers:
                st.session_state.server_manager.select_servers(selected_servers)
            # Launch button
            launch_button = st.button("Launch Selected Servers", type="primary", disabled=len(selected_servers) == 0)
            if launch_button:
                with st.spinner("Starting servers..."):
                    # Use asyncio to start servers
                    started_servers = asyncio.run(
                        st.session_state.server_manager.start_selected_servers(servers)
                    )
                if started_servers:
                    st.success(f"Started servers: {', '.join(started_servers)}")
                    # Generate API key for the started servers
                    api_key = st.session_state.auth_manager.generate_key(started_servers)
                    # Display the API key
                    st.subheader("🔑 Generated API Key")
                    key_col1, key_col2 = st.columns([5, 1])
                    with key_col1:
                        st.code(api_key, language="bash")
                    with key_col2:
                        if st.button("Copy", key="copy_api_key"):
                            copy_to_clipboard(api_key)
                            st.success("Copied to clipboard!")
                    # Instructions for using the API key
                    st.subheader("How to Use the API Key")
                    st.markdown("""
                    To use this API key with the MCP proxy server:

                    1. **Environment Variable:**
                       ```bash
                       export MCP_API_KEY=your_api_key_here
                       ```
                    2. **Start the MCP proxy server:**
                       ```bash
                       mcp-proxy-server
                       ```
                    3. **Or for SSE mode:**
                       ```bash
                       node build/sse.js
                       ```
                    """)
                else:
                    st.error("Failed to start servers. Check the logs for more information.")
        with col2:
            # Information box
            st.info(
                "💡 **Tip:**\n\nStarting servers as a group allows you to generate a single API key for all of them.")

        # Display running servers
        running_servers = st.session_state.server_manager.get_running_servers()
        if running_servers:
            st.subheader("🟢 Currently Running Servers")
            st.write(", ".join(running_servers))
            # Stop all servers button
            if st.button("Stop All Servers", type="secondary"):
                with st.spinner("Stopping all servers..."):
                    asyncio.run(st.session_state.server_manager.stop_all_servers())
                st.success("All servers stopped.")
                st.rerun()

# API Keys Management page
elif page == "API Keys Management":
    st.header("🔑 API Keys Management")
    keys = st.session_state.auth_manager.get_all_keys()
    if not keys:
        st.info("No API keys have been generated yet. Launch a server group to generate an API key.")
    else:
        st.markdown("Below are the API keys you've generated for your MCP server groups.")
        # Display all keys
        for key, details in keys.items():
            created_time = datetime.datetime.fromtimestamp(details['created'])
            with st.expander(f"API Key: {key[:10]}..."):
                col1, col2 = st.columns([3, 1])
                with col1:
                    st.code(key, language="bash")
                    st.write(f"🖥️ **Servers:** {', '.join(details['servers'])}")
                    st.write(f"📅 **Created:** {created_time.strftime('%Y-%m-%d %H:%M:%S')}")
                with col2:
                    if st.button("Copy", key=f"copy_{key[:8]}"):
                        copy_to_clipboard(key)
                        st.success("Copied to clipboard!")
                    if st.button("Revoke", key=f"revoke_{key[:8]}"):
                        if st.session_state.auth_manager.revoke_key(key):
                            st.success("API key revoked.")
                            st.rerun()
                        else:
                            st.error("Failed to revoke API key.")

# Footer
st.markdown("---")
st.markdown("© MCP Server Manager - Manage your MCP servers with ease")
```
--------------------------------------------------------------------------------
/server/src/mcp-proxy.ts:
--------------------------------------------------------------------------------
```typescript
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import {
  CallToolRequestSchema,
  GetPromptRequestSchema,
  ListPromptsRequestSchema,
  ListResourcesRequestSchema,
  ListToolsRequestSchema,
  ReadResourceRequestSchema,
  Tool,
  ListToolsResultSchema,
  ListPromptsResultSchema,
  ListResourcesResultSchema,
  ReadResourceResultSchema,
  ListResourceTemplatesRequestSchema,
  ListResourceTemplatesResultSchema,
  ResourceTemplate,
  CompatibilityCallToolResultSchema,
  GetPromptResultSchema
} from "@modelcontextprotocol/sdk/types.js";
import { createClients, ConnectedClient } from './client.js';
import { Config, loadConfig } from './config.js';
import { z } from 'zod';
import * as eventsource from 'eventsource';

global.EventSource = eventsource.EventSource;

export const createServer = async (apiKey?: string) => {
  const config = await loadConfig(apiKey);
  if (config.servers.length === 0) {
    console.warn('Warning: No servers found.');
  } else {
    console.log(`Found servers: ${config.servers.length}`);
  }

  const connectedClients = await createClients(config.servers);
  console.log(`Connected to ${connectedClients.length} server(s)`);

  // Maps to track which client owns which tool, resource, and prompt
  const toolToClientMap = new Map<string, ConnectedClient>();
  const resourceToClientMap = new Map<string, ConnectedClient>();
  const promptToClientMap = new Map<string, ConnectedClient>();

  const server = new Server(
    {
      name: "mcp-proxy-server",
      version: "1.0.0",
    },
    {
      capabilities: {
        prompts: {},
        resources: { subscribe: true },
        tools: {},
      },
    },
  );

  // List Tools Handler
  server.setRequestHandler(ListToolsRequestSchema, async (request) => {
    const allTools: Tool[] = [];
    toolToClientMap.clear();
    for (const connectedClient of connectedClients) {
      try {
        const result = await connectedClient.client.request(
          {
            method: 'tools/list',
            params: {
              _meta: request.params?._meta
            }
          },
          ListToolsResultSchema
        );
        if (result.tools) {
          const toolsWithSource = result.tools.map(tool => {
            toolToClientMap.set(tool.name, connectedClient);
            return {
              ...tool,
              description: `[${connectedClient.name}] ${tool.description || ''}`
            };
          });
          allTools.push(...toolsWithSource);
        }
      } catch (error) {
        console.error(`Error fetching tools from ${connectedClient.name}:`, error);
      }
    }
    return { tools: allTools };
  });

  // Call Tool Handler
  server.setRequestHandler(CallToolRequestSchema, async (request) => {
    const { name, arguments: args } = request.params;
    const clientForTool = toolToClientMap.get(name);
    if (!clientForTool) {
      throw new Error(`Unknown tool: ${name}`);
    }
    try {
      console.log('Forwarding tool call:', name);
      // Use the correct schema for tool calls
      return await clientForTool.client.request(
        {
          method: 'tools/call',
          params: {
            name,
            arguments: args || {},
            _meta: {
              progressToken: request.params._meta?.progressToken
            }
          }
        },
        CompatibilityCallToolResultSchema
      );
    } catch (error) {
      console.error(`Error calling tool through ${clientForTool.name}:`, error);
      throw error;
    }
  });

  // Get Prompt Handler
  server.setRequestHandler(GetPromptRequestSchema, async (request) => {
    const { name } = request.params;
    const clientForPrompt = promptToClientMap.get(name);
    if (!clientForPrompt) {
      throw new Error(`Unknown prompt: ${name}`);
    }
    try {
      console.log('Forwarding prompt request:', name);
      // Match the exact structure from the example code
      const response = await clientForPrompt.client.request(
        {
          method: 'prompts/get' as const,
          params: {
            name,
            arguments: request.params.arguments || {},
            _meta: request.params._meta || {
              progressToken: undefined
            }
          }
        },
        GetPromptResultSchema
      );
      console.log('Prompt result:', response);
      return response;
    } catch (error) {
      console.error(`Error getting prompt from ${clientForPrompt.name}:`, error);
      throw error;
    }
  });

  // List Prompts Handler
  server.setRequestHandler(ListPromptsRequestSchema, async (request) => {
    const allPrompts: z.infer<typeof ListPromptsResultSchema>['prompts'] = [];
    promptToClientMap.clear();
    for (const connectedClient of connectedClients) {
      try {
        const result = await connectedClient.client.request(
          {
            method: 'prompts/list' as const,
            params: {
              cursor: request.params?.cursor,
              _meta: request.params?._meta || {
                progressToken: undefined
              }
            }
          },
          ListPromptsResultSchema
        );
        if (result.prompts) {
          const promptsWithSource = result.prompts.map(prompt => {
            promptToClientMap.set(prompt.name, connectedClient);
            return {
              ...prompt,
              description: `[${connectedClient.name}] ${prompt.description || ''}`
            };
          });
          allPrompts.push(...promptsWithSource);
        }
      } catch (error) {
        console.error(`Error fetching prompts from ${connectedClient.name}:`, error);
      }
    }
    return {
      prompts: allPrompts,
      // The merged list is not paginated. Echoing the request cursor back as
      // nextCursor would make a paginating client request the same page forever.
      nextCursor: undefined
    };
  });

  // List Resources Handler
  server.setRequestHandler(ListResourcesRequestSchema, async (request) => {
    const allResources: z.infer<typeof ListResourcesResultSchema>['resources'] = [];
    resourceToClientMap.clear();
    for (const connectedClient of connectedClients) {
      try {
        const result = await connectedClient.client.request(
          {
            method: 'resources/list',
            params: {
              cursor: request.params?.cursor,
              _meta: request.params?._meta
            }
          },
          ListResourcesResultSchema
        );
        if (result.resources) {
          const resourcesWithSource = result.resources.map(resource => {
            resourceToClientMap.set(resource.uri, connectedClient);
            return {
              ...resource,
              name: `[${connectedClient.name}] ${resource.name || ''}`
            };
          });
          allResources.push(...resourcesWithSource);
        }
      } catch (error) {
        console.error(`Error fetching resources from ${connectedClient.name}:`, error);
      }
    }
    return {
      resources: allResources,
      nextCursor: undefined
    };
  });

  // Read Resource Handler
  server.setRequestHandler(ReadResourceRequestSchema, async (request) => {
    const { uri } = request.params;
    const clientForResource = resourceToClientMap.get(uri);
    if (!clientForResource) {
      throw new Error(`Unknown resource: ${uri}`);
    }
    try {
      return await clientForResource.client.request(
        {
          method: 'resources/read',
          params: {
            uri,
            _meta: request.params._meta
          }
        },
        ReadResourceResultSchema
      );
    } catch (error) {
      console.error(`Error reading resource from ${clientForResource.name}:`, error);
      throw error;
    }
  });

  // List Resource Templates Handler
  server.setRequestHandler(ListResourceTemplatesRequestSchema, async (request) => {
    const allTemplates: ResourceTemplate[] = [];
    for (const connectedClient of connectedClients) {
      try {
        const result = await connectedClient.client.request(
          {
            method: 'resources/templates/list' as const,
            params: {
              cursor: request.params?.cursor,
              _meta: request.params?._meta || {
                progressToken: undefined
              }
            }
          },
          ListResourceTemplatesResultSchema
        );
        if (result.resourceTemplates) {
          const templatesWithSource = result.resourceTemplates.map(template => ({
            ...template,
            name: `[${connectedClient.name}] ${template.name || ''}`,
            description: template.description ? `[${connectedClient.name}] ${template.description}` : undefined
          }));
          allTemplates.push(...templatesWithSource);
        }
      } catch (error) {
        console.error(`Error fetching resource templates from ${connectedClient.name}:`, error);
      }
    }
    return {
      resourceTemplates: allTemplates,
      // As with prompts, the merged list is not paginated, so no cursor is returned.
      nextCursor: undefined
    };
  });

  const cleanup = async () => {
    await Promise.all(connectedClients.map(({ cleanup }) => cleanup()));
  };

  return { server, cleanup };
};
```
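
The proxy in `mcp-proxy.ts` merges the tool lists of every connected server, records which server owns each tool name, and forwards calls to that owner. Below is a minimal Python sketch of the same routing idea. It uses hypothetical in-memory `Backend` objects in place of real MCP clients; `Backend` and `ToolRouter` are illustrative names and are not part of this repository.

```python
from dataclasses import dataclass, field
from typing import Callable


@dataclass
class Backend:
    """Hypothetical stand-in for one connected MCP server."""
    name: str
    tools: dict[str, Callable[[dict], str]] = field(default_factory=dict)


class ToolRouter:
    """Aggregates tools from several backends and routes calls by tool name."""

    def __init__(self, backends: list[Backend]) -> None:
        self.backends = backends
        self.owner: dict[str, Backend] = {}

    def list_tools(self) -> list[dict]:
        # Rebuild the routing table on every listing, as the proxy does.
        self.owner.clear()
        listed = []
        for backend in self.backends:
            for tool_name in backend.tools:
                # On a name clash, the backend listed last wins the route.
                self.owner[tool_name] = backend
                listed.append(
                    {"name": tool_name, "description": f"[{backend.name}] {tool_name}"}
                )
        return listed

    def call_tool(self, name: str, arguments: dict) -> str:
        backend = self.owner.get(name)
        if backend is None:
            raise KeyError(f"Unknown tool: {name}")
        return backend.tools[name](arguments)


notes = Backend("notes", {"search": lambda args: f"notes:{args['q']}"})
files = Backend("files", {"read": lambda args: f"files:{args['path']}"})
router = ToolRouter([notes, files])
listing = router.list_tools()
result = router.call_tool("read", {"path": "a.txt"})
```

As in the proxy, the routing table is filled only while listing, so a call made before the first listing fails with an unknown-tool error, and two backends exposing the same tool name silently shadow one another.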
--------------------------------------------------------------------------------
/example_llm_mcp/main.py:
--------------------------------------------------------------------------------
```python
import asyncio
import json
import os
import shutil
from contextlib import AsyncExitStack
from typing import Any, Union
from dotenv import load_dotenv
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from openai import OpenAI
from openai.types.chat import ChatCompletion


class Configuration:
    """Manages configuration and environment variables for the MCP client."""

    def __init__(self) -> None:
        self.load_env()
        self.api_key = os.getenv("OPENAI_API_KEY")

    @staticmethod
    def load_env() -> None:
        load_dotenv()

    @staticmethod
    def load_config(file_path: str = "servers_config.json") -> dict[str, Any]:
        with open(file_path, "r") as f:
            return json.load(f)

    @property
    def llm_api_key(self) -> str:
        if not self.api_key:
            raise ValueError("OPENAI_API_KEY not found in environment variables")
        return self.api_key


class Server:
    """Manages MCP server connections and tool execution."""

    def __init__(self, name: str, config: dict[str, Any]) -> None:
        self.name: str = name
        self.config: dict[str, Any] = config
        self.stdio_context: Any | None = None
        self.session: ClientSession | None = None
        self._cleanup_lock: asyncio.Lock = asyncio.Lock()
        self.exit_stack: AsyncExitStack = AsyncExitStack()

    async def initialize(self) -> None:
        command = (
            shutil.which("npx")
            if self.config["command"] == "npx"
            else self.config["command"]
        )
        if command is None:
            raise ValueError("The command must be a valid string and cannot be None.")
        server_params = StdioServerParameters(
            command=command,
            args=self.config["args"],
            env={**os.environ, **self.config["env"]}
            if self.config.get("env")
            else None,
        )
        try:
            stdio_transport = await self.exit_stack.enter_async_context(
                stdio_client(server_params)
            )
            read, write = stdio_transport
            session = await self.exit_stack.enter_async_context(
                ClientSession(read, write)
            )
            await session.initialize()
            self.session = session
        except Exception as e:
            print(f"Error initializing server {self.name}: {e}")
            await self.cleanup()
            raise

    async def list_tools(self) -> list[Any]:
        if not self.session:
            raise RuntimeError(f"Server {self.name} not initialized")
        tools_response = await self.session.list_tools()
        tools = []
        for item in tools_response:
            if isinstance(item, tuple) and item[0] == "tools":
                for tool in item[1]:
                    tools.append(Tool(tool.name, tool.description, tool.inputSchema))
        return tools

    async def execute_tool(
        self,
        tool_name: str,
        arguments: dict[str, Any],
        retries: int = 2,
        delay: float = 1.0,
    ) -> Any:
        if not self.session:
            raise RuntimeError(f"Server {self.name} not initialized")
        attempt = 0
        while attempt < retries:
            try:
                print(f"Executing tool {tool_name}")
                result = await self.session.call_tool(tool_name, arguments)
                return result
            except Exception as e:
                attempt += 1
                print(f"Error executing tool: {e}. Attempt {attempt} of {retries}.")
                if attempt < retries:
                    print(f"Retrying in {delay} seconds...")
                    await asyncio.sleep(delay)
                else:
                    print("Max retries reached. Failing.")
                    raise

    async def cleanup(self) -> None:
        async with self._cleanup_lock:
            try:
                await self.exit_stack.aclose()
                self.session = None
                self.stdio_context = None
            except Exception as e:
                print(f"Error during cleanup of server {self.name}: {e}")


class Tool:
    """Represents a tool with its properties and formatting."""

    def __init__(
        self, name: str, description: str, input_schema: dict[str, Any]
    ) -> None:
        self.name: str = name
        self.description: str = description
        self.input_schema: dict[str, Any] = input_schema

    def format_for_llm(self, provider_with_func_call: bool = False) -> str | dict:
        if provider_with_func_call:
            return {
                "type": "function",
                "function": {
                    "name": self.name,
                    "description": self.description,
                    "parameters": {
                        "type": "object",
                        "properties": {
                            param_name: param_info
                            for param_name, param_info in self.input_schema["properties"].items()
                        } if "properties" in self.input_schema else {},
                        "required": self.input_schema.get("required", []),
                        "additionalProperties": self.input_schema.get("additionalProperties", False)
                    }
                }
            }
        else:
            args_desc = []
            if "properties" in self.input_schema:
                for param_name, param_info in self.input_schema["properties"].items():
                    arg_desc = (
                        f"- {param_name}: {param_info.get('description', 'No description')}"
                    )
                    if param_name in self.input_schema.get("required", []):
                        arg_desc += " (required)"
                    args_desc.append(arg_desc)
            return f"""
Tool: {self.name}
Description: {self.description}
Arguments:
{chr(10).join(args_desc)}
"""


class LLMClient:
    """Manages communication with the LLM provider."""

    def __init__(self, api_key: str | None = None) -> None:
        # Resolve the key at call time. A default of os.getenv(...) in the
        # signature is evaluated once at import, before load_dotenv() has run.
        self.api_key: str | None = api_key or os.getenv("OPENAI_API_KEY")
        self.client = OpenAI(api_key=self.api_key)

    def get_response(
        self,
        messages: list[dict[str, str]],
        temperature: float = 0.3,
        model: str = "gpt-4o",
        max_tokens: int = 4096,
        tools: list[dict[str, Any]] | None = None,
    ) -> Union[str, ChatCompletion]:
        if tools:
            response = self.client.chat.completions.create(
                model=model,
                messages=messages,
                temperature=temperature,
                max_tokens=max_tokens,
                tools=tools,
            )
            # Return the full completion only when the model requested tool calls
            if response.choices[0].finish_reason == "tool_calls":
                return response
            else:
                return response.choices[0].message.content
        else:
            response = self.client.chat.completions.create(
                model=model,
                messages=messages,
                temperature=temperature,
                max_tokens=max_tokens,
            )
            return response.choices[0].message.content


class ChatSession:
    """Orchestrates the interaction between user, LLM, and tools."""

    def __init__(self, servers: list[Server], llm_client: LLMClient) -> None:
        self.servers: list[Server] = servers
        self.llm_client: LLMClient = llm_client

    async def cleanup_servers(self) -> None:
        cleanup_tasks = []
        for server in self.servers:
            cleanup_tasks.append(asyncio.create_task(server.cleanup()))
        if cleanup_tasks:
            try:
                await asyncio.gather(*cleanup_tasks, return_exceptions=True)
            except Exception as e:
                print(f"Warning during final cleanup: {e}")

    async def process_llm_response(self, llm_response: Union[str, ChatCompletion]) -> str | list:
        try:
            if isinstance(llm_response, str):
                tool_call = json.loads(llm_response)
                # Only a JSON object can describe a tool call
                if isinstance(tool_call, dict) and "tool" in tool_call and "arguments" in tool_call:
                    print(f"Executing tool: {tool_call['tool']}")
                    print(f"With arguments: {tool_call['arguments']}")
                    for server in self.servers:
                        tools = await server.list_tools()
                        if any(tool.name == tool_call["tool"] for tool in tools):
                            try:
                                result = await server.execute_tool(
                                    tool_call["tool"], tool_call["arguments"]
                                )
                                if isinstance(result, dict) and "progress" in result:
                                    progress = result["progress"]
                                    total = result["total"]
                                    percentage = (progress / total) * 100
                                    print(f"Progress: {progress}/{total} ({percentage:.1f}%)")
                                return f"Tool execution result: {result}"
                            except Exception as e:
                                error_msg = f"Error executing tool: {str(e)}"
                                print(error_msg)
                                return error_msg
                return llm_response
            else:
                if llm_response.choices[0].finish_reason == "tool_calls":
                    # Collect exactly one entry per tool call, in order, so the
                    # caller can pair results[i] with tool_calls[i].
                    results = []
                    for tool_call in llm_response.choices[0].message.tool_calls:
                        function_name = tool_call.function.name
                        arguments = json.loads(tool_call.function.arguments)
                        print(f"Executing function: {function_name} with arguments: {arguments}")
                        outcome = f"No server found with tool: {function_name}"
                        for server in self.servers:
                            tools = await server.list_tools()
                            if any(tool.name == function_name for tool in tools):
                                try:
                                    result = await server.execute_tool(
                                        function_name, arguments
                                    )
                                    if isinstance(result, dict) and "progress" in result:
                                        progress = result["progress"]
                                        total = result["total"]
                                        percentage = (progress / total) * 100
                                        print(f"Progress: {progress}/{total} ({percentage:.1f}%)")
                                    outcome = f"Tool execution result: {result}"
                                except Exception as e:
                                    outcome = f"Error executing tool: {str(e)}"
                                    print(outcome)
                                # The first server that offers the tool handles the call
                                break
                        results.append(outcome)
                    return results
                return llm_response
        except json.JSONDecodeError:
            return llm_response

    async def start(self) -> None:
        try:
            for server in self.servers:
                try:
                    await server.initialize()
                except Exception as e:
                    print(f"Failed to initialize server: {e}")
                    await self.cleanup_servers()
                    return

            all_tools = []
            for server in self.servers:
                tools = await server.list_tools()
                all_tools.extend(tools)

            tools_schema = [tool.format_for_llm(provider_with_func_call=True) for tool in all_tools]
            # tools_description = "\n".join([tool.format_for_llm() for tool in all_tools])
            #
            # system_message = (
            #     "You are a helpful assistant with access to these tools:\n\n"
            #     f"{tools_description}\n"
            #     "Choose the appropriate tool based on the user's question. "
            #     "If no tool is needed, reply directly.\n\n"
            #     "IMPORTANT: When you need to use a tool, you must ONLY respond with "
            #     "the exact JSON object format below, nothing else:\n"
            #     "{\n"
            #     '    "tool": "tool-name",\n'
            #     '    "arguments": {\n'
            #     '        "argument-name": "value"\n'
            #     "    }\n"
            #     "}\n\n"
            #     "After receiving a tool's response:\n"
            #     "1. Transform the raw data into a natural, conversational response\n"
            #     "2. Keep responses concise but informative\n"
            #     "3. Focus on the most relevant information\n"
            #     "4. Use appropriate context from the user's question\n"
            #     "5. Avoid simply repeating the raw data\n\n"
            #     "Please use only the tools that are explicitly defined above."
            # )
            system_message = "You are a helpful assistant with access to some tools. Use them only when necessary."
            messages = [{"role": "system", "content": system_message}]

            while True:
                try:
                    # Keep the user's original casing; lowercase only for the exit check
                    user_input = input("You: ").strip()
                    if user_input.lower() in ["quit", "exit"]:
                        print("\nExiting...")
                        break
                    messages.append({"role": "user", "content": user_input})
                    # llm_response = self.llm_client.get_response(messages)
                    llm_response = self.llm_client.get_response(messages, tools=tools_schema)
                    print(f"\nAssistant: {llm_response}")
                    result = await self.process_llm_response(llm_response)
                    if isinstance(result, list) or result != llm_response:
                        print(f"\nTool execution result: {result}")
                        tool_call_output = json.loads(llm_response.choices[0].message.to_json())
                        messages.append(tool_call_output)
                        messages[-1]['content'] = ""
                        if isinstance(result, str):
                            messages.append({  # append result message
                                "type": "function_call_output",
                                "tool_call_id": tool_call_output['tool_calls'][0]['id'],
                                "content": result,
                                "role": "tool",
                            })
                        else:
                            for i, res in enumerate(result):
                                messages.append({  # append result message
                                    "type": "function_call_output",
                                    "tool_call_id": tool_call_output['tool_calls'][i]['id'],
                                    "content": res,
                                    "role": "tool",
                                })
                        # messages.append({"role": "assistant", "content": llm_response})
                        # messages.append({"role": "system", "content": result})
                        final_response = self.llm_client.get_response(messages)
                        print(f"\nFinal response: {final_response}")
                        messages.append(
                            {"role": "assistant", "content": final_response}
                        )
                    else:
                        messages.append({"role": "assistant", "content": llm_response})
                except KeyboardInterrupt:
                    print("\nExiting...")
                    break
        finally:
            await self.cleanup_servers()


async def main() -> None:
    """Initialize and run the chat session."""
    config = Configuration()
    server_config = config.load_config("servers_config.json")
    servers = [
        Server(name, srv_config)
        for name, srv_config in server_config["mcpServers"].items()
    ]
    llm_client = LLMClient(config.llm_api_key)
    chat_session = ChatSession(servers, llm_client)
    await chat_session.start()


if __name__ == "__main__":
    asyncio.run(main())
```
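
`Tool.format_for_llm` and the chat loop in `main.py` rely on two message shapes from the Chat Completions API: the `tools` entry that wraps a tool's JSON Schema, and the `tool` role message that reports a result back to the model. The sketch below isolates both as plain functions so the shapes are easy to inspect. The helper names `to_openai_tool` and `tool_result_message`, and the sample tool `search_notes`, are hypothetical and not part of this repository.

```python
from typing import Any


def to_openai_tool(name: str, description: str, input_schema: dict[str, Any]) -> dict[str, Any]:
    """Wrap an MCP tool's JSON Schema in the shape of a Chat Completions `tools` entry."""
    return {
        "type": "function",
        "function": {
            "name": name,
            "description": description,
            "parameters": {
                "type": "object",
                # Tools without declared parameters get an empty schema.
                "properties": dict(input_schema.get("properties", {})),
                "required": list(input_schema.get("required", [])),
            },
        },
    }


def tool_result_message(tool_call_id: str, content: str) -> dict[str, str]:
    """Build the follow-up message that reports one tool call's output to the model."""
    return {"role": "tool", "tool_call_id": tool_call_id, "content": content}


tool = to_openai_tool(
    "search_notes",
    "Search notes by keyword.",
    {"type": "object", "properties": {"query": {"type": "string"}}, "required": ["query"]},
)
reply = tool_result_message("call_123", "2 notes found")
```

Each `tool` message must carry the `id` of the tool call it answers, which is why the chat loop pairs results with `tool_calls` by index.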