#
tokens: 14096/50000 23/23 files
lines: off (toggle) GitHub
raw markdown copy
# Directory Structure

```
├── .gitignore
├── client
│   ├── app.py
│   ├── requirements.txt
│   ├── servers_config_example.json
│   └── src
│       ├── __init__.py
│       ├── api_server.py
│       ├── api.py
│       ├── auth.py
│       ├── config.py
│       └── server.py
├── example_llm_mcp
│   ├── .env.example
│   ├── main.py
│   ├── requirements.txt
│   └── servers_config_example.json
├── files
│   ├── client.gif
│   └── llm_mcp_example.gif
├── LICENSE
├── README.md
└── server
    ├── package.json
    ├── src
    │   ├── auth.ts
    │   ├── client.ts
    │   ├── config.ts
    │   ├── index.ts
    │   ├── mcp-proxy.ts
    │   └── sse.ts
    └── tsconfig.json
```

# Files

--------------------------------------------------------------------------------
/example_llm_mcp/.env.example:
--------------------------------------------------------------------------------

```
OPENAI_API_KEY="<YOUR_OPENAI_API_KEY>"
```

--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------

```
.venv/
.env
servers_config.json
.idea/
node_modules/
package-lock.json
api_keys.json
build/
__pycache__/
```

--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------

```markdown
# Pocket MCP Manager

A flexible and user-friendly management system for Model Context Protocol (MCP) servers, consisting of a client-server architecture that simplifies handling multiple MCP servers through a central interface.

## Overview

The Pocket MCP Manager streamlines the process of working with multiple MCP servers by allowing you to:

- Add all your MCP servers to a central management UI
- Selectively launch specific servers through an intuitive interface
- Generate API keys linked to these running servers
- Connect to them through a single proxy MCP server in clients like Claude or Cursor

This approach means you only need to update a single API key in your AI tools when changing which MCP servers you want to use, rather than reconfiguring multiple connection settings.

## Server Component

The server component is an MCP proxy server that:

1. Accepts an API key from the client
2. Connects to the already running MCP servers authorized by that key
3. Exposes all connected servers' capabilities through a unified MCP interface
4. Routes requests to the appropriate backend servers

### Server Installation

```bash
# Clone the repository
git clone [email protected]:dailydaniel/pocket-mcp.git
cd pocket-mcp/server

# Install dependencies
npm install

# The build step runs automatically during installation
```

### Connecting to Claude Desktop / Cursor

Add the following configuration to your Claude Desktop settings:

```json
{
  "mcpServers": {
    "mcp-proxy": {
      "command": "node",
      "args": ["/full/path/to/pocket-mcp/server/build/index.js"],
      "env": {
        "MCP_API_KEY": "api_key_from_client",
        "CLIENT_API_URL": "http://localhost:<port>/api"
      }
    }
  }
}
```

Replace:
- `/full/path/to/pocket-mcp/server/build/index.js` with the absolute path to your server's build/index.js file
- `api_key_from_client` with the API key generated from the client UI
- `<port>` with the port shown in the API server logs (typically 8000)

## Client Component

The client provides a web-based UI built with Streamlit for:

- Viewing all configured MCP servers
- Launching selected servers as a group
- Generating API keys for launched servers
- Managing existing API keys

### Client Setup

```bash
# Navigate to the client directory
cd pocket-mcp/client

# Create and activate a virtual environment
python -m venv .venv --prompt "mcp-venv"
source .venv/bin/activate

# Install requirements
pip install -r requirements.txt

# Copy the example config
cp servers_config_example.json servers_config.json

# Edit the configuration with your MCP servers
vim servers_config.json

# Run the client
streamlit run app.py
```

### Server Configuration Example

Create a `servers_config.json` file in the client directory with your MCP servers:

```json
{
  "mcpServers": {
    "jetbrains": {
      "command": "npx",
      "args": ["-y", "@jetbrains/mcp-proxy"]
    },
    "logseq": {
      "command": "uvx",
      "args": ["mcp-server-logseq"],
      "env": {
        "LOGSEQ_API_TOKEN": "API_KEY",
        "LOGSEQ_API_URL": "http://127.0.0.1:<port>"
      }
    },
    "brave-search": {
      "command": "npx",
      "args": [
        "-y",
        "@modelcontextprotocol/server-brave-search"
      ],
      "env": {
        "BRAVE_API_KEY": "API_KEY"
      }
    }
  }
}
```

Replace `API_KEY` and `<port>` with your actual values.

### Client Demo
![Client Demo](files/client.gif)

## Example LLM MCP Client

The repository includes an example client for chatting with LLMs using OpenAI API and MCP servers.

### Setup and Usage

```bash
# Navigate to the example client directory
cd pocket-mcp/example_llm_mcp

# Copy the example environment file
cp .env.example .env

# Edit the .env file and add your OpenAI API key
vim .env

# Add server configurations to the servers_config.json file
cp servers_config_example.json servers_config.json

# Add API key from the client
vim servers_config.json

# If not already in a virtual environment
# May use the same virtual environment as for the client
source ../client/.venv/bin/activate

# Install requirements
pip install -r requirements.txt

# Run the client
python3 main.py
```

The example client will connect to your running MCP servers and allow you to chat with an LLM while utilizing MCP capabilities.

### Chat Demo
![Chat Demo](files/llm_mcp_example.gif)

## Acknowledgments

- Server component based on [mcp-proxy-server](https://github.com/adamwattis/mcp-proxy-server)
- SSE implementation with [mcp-proxy](https://github.com/sparfenyuk/mcp-proxy)

## TODO

- Add additional functionality to the client component
- Convert the server into a standalone npm module that can be installed with npx
- Delete cringe copyright notice from streamlit app :)
```

--------------------------------------------------------------------------------
/client/src/__init__.py:
--------------------------------------------------------------------------------

```python

```

--------------------------------------------------------------------------------
/example_llm_mcp/requirements.txt:
--------------------------------------------------------------------------------

```
python-dotenv
mcp
openai
```

--------------------------------------------------------------------------------
/client/requirements.txt:
--------------------------------------------------------------------------------

```
streamlit>=1.25.0
mcp>=0.6.0
python-dotenv>=1.0.0
pyperclip>=1.8.2
fastapi>=0.95.0
uvicorn>=0.21.0
mcp-proxy>=0.5.1
```

--------------------------------------------------------------------------------
/example_llm_mcp/servers_config_example.json:
--------------------------------------------------------------------------------

```json
{
  "mcpServers": {
    "mcp-proxy": {
      "command": "node",
      "args": ["/full/path/to/pocket-mcp/server/build/index.js"],
      "env": {
        "MCP_API_KEY": "API_KEY",
        "CLIENT_API_URL": "http://localhost:<PORT>/api"
      }
    }
  }
}
```

--------------------------------------------------------------------------------
/server/tsconfig.json:
--------------------------------------------------------------------------------

```json
 {
  "compilerOptions": {
    "target": "ES2022",
    "module": "Node16",
    "moduleResolution": "Node16",
    "outDir": "./build",
    "rootDir": "./src",
    "strict": true,
    "esModuleInterop": true,
    "skipLibCheck": true,
    "forceConsistentCasingInFileNames": true
  },
  "include": ["src/**/*"],
  "exclude": ["node_modules"]
}
```

--------------------------------------------------------------------------------
/client/servers_config_example.json:
--------------------------------------------------------------------------------

```json
{
  "mcpServers": {
    "jetbrains": {
      "command": "npx",
      "args": ["-y", "@jetbrains/mcp-proxy"]
    },
    "logseq": {
      "command": "uvx",
      "args": ["mcp-server-logseq"],
      "env": {
        "LOGSEQ_API_TOKEN": "API_KEY",
        "LOGSEQ_API_URL": "http://127.0.0.1:<port>"
      }
    },
    "brave-search": {
      "command": "npx",
      "args": [
        "-y",
        "@modelcontextprotocol/server-brave-search"
      ],
      "env": {
        "BRAVE_API_KEY": "API_KEY"
      }
    }
  }
}
```

--------------------------------------------------------------------------------
/server/src/index.ts:
--------------------------------------------------------------------------------

```typescript
#!/usr/bin/env node

import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import { createServer } from "./mcp-proxy.js";

async function main() {
  const apiKey = process.env.MCP_API_KEY;
  
  const transport = new StdioServerTransport();
  
  try {
    const { server, cleanup } = await createServer(apiKey);

    await server.connect(transport);

    process.on("SIGINT", async () => {
      await cleanup();
      await server.close();
      process.exit(0);
    });
  } catch (error) {
    console.error("Server error:", error);
    process.exit(1);
  }
}

main().catch((error) => {
  console.error("Error:", error);
  process.exit(1);
});

```

--------------------------------------------------------------------------------
/client/src/api.py:
--------------------------------------------------------------------------------

```python
import json
import os
from typing import Any, Dict, List, Optional


class ApiClient:
    """Client for generating server configurations for MCP API server."""

    @staticmethod
    def generate_server_config(
            api_key: str,
            server_configs: Dict[str, Dict[str, Any]]
    ) -> Dict[str, Any]:
        """Generate MCP Proxy Server configuration based on API key.

        Args:
            api_key: API key for authentication
            server_configs: Dictionary of server configurations

        Returns:
            Dictionary with instructions for proxy server
        """
        return {
            "api_key": api_key,
            "instructions": {
                "env_variable": "MCP_API_KEY",
                "command": "mcp-proxy-server",
                "sse_command": "node build/sse.js"
            }
        }
```

--------------------------------------------------------------------------------
/server/src/auth.ts:
--------------------------------------------------------------------------------

```typescript
// src/auth.ts
import axios from 'axios';
import { ServerConfig } from './config.js';

const CLIENT_API_URL = process.env.CLIENT_API_URL || 'http://localhost:8000/api';

export interface AuthResponse {
  success: boolean;
  servers: ServerConfig[];
  message?: string;
}

export async function authenticateAndGetServers(apiKey: string): Promise<ServerConfig[]> {
  try {
    const response = await axios.get<AuthResponse>(`${CLIENT_API_URL}/servers`, {
      headers: {
        Authorization: `Bearer ${apiKey}`
      }
    });

    if (!response.data.success) {
      throw new Error(response.data.message || 'Auth failed');
    }

    return response.data.servers;
  } catch (error) {
    if (axios.isAxiosError(error)) {
      if (error.response?.status === 401) {
        throw new Error('Invalid API key');
      } else if (error.response?.status === 403) {
        throw new Error('No permission');
      }
      throw new Error(`Auth error: ${error.message}`);
    }
    throw error;
  }
}

```

--------------------------------------------------------------------------------
/server/src/config.ts:
--------------------------------------------------------------------------------

```typescript
// src/config.ts
import { readFile } from 'fs/promises';
import { resolve } from 'path';
import { authenticateAndGetServers } from './auth.js';

export type TransportConfigStdio = {
  type?: 'stdio'
  command: string;
  args?: string[];
  env?: string[]
}

export type TransportConfigSSE = {
  type: 'sse'
  url: string
}

export type TransportConfig = TransportConfigSSE | TransportConfigStdio
export interface ServerConfig {
  name: string;
  transport: TransportConfig;
}

export interface Config {
  servers: ServerConfig[];
}

export const loadConfig = async (apiKey?: string): Promise<Config> => {
  if (apiKey) {
    try {
      const servers = await authenticateAndGetServers(apiKey);
      return { servers };
    } catch (error) {
      console.error('Auth error with API key:', error);
      throw error;
    }
  }

  try {
    const configPath = process.env.MCP_CONFIG_PATH || resolve(process.cwd(), 'config.json');
    console.log(`Load config from: ${configPath}`);
    const fileContents = await readFile(configPath, 'utf-8');
    return JSON.parse(fileContents);
  } catch (error) {
    console.error('Error in loading config.json:', error);
    return { servers: [] };
  }
};

```

--------------------------------------------------------------------------------
/server/package.json:
--------------------------------------------------------------------------------

```json
{
  "name": "mcp-proxy-server",
  "version": "0.0.1",
  "author": "Your Name",
  "license": "MIT",
  "description": "An MCP proxy server that aggregates and serves multiple MCP resource servers through a single interface with API key support",
  "private": false,
  "type": "module",
  "bin": {
    "mcp-proxy-server": "./build/index.js"
  },
  "files": [
    "build"
  ],
  "scripts": {
    "dev": "nodemon --watch 'src/**' --ext 'ts,json' --ignore 'src/**/*.spec.ts' --exec 'tsx src/index.ts'",
    "dev:sse": "nodemon --watch 'src/**' --ext 'ts,json' --ignore 'src/**/*.spec.ts' --exec 'tsx src/sse.ts'",
    "build": "tsc",
    "postbuild": "chmod +x build/index.js",
    "prepare": "npm run build",
    "watch": "tsc --watch"
  },
  "dependencies": {
    "@modelcontextprotocol/sdk": "0.6.0",
    "@types/cors": "^2.8.17",
    "axios": "^1.6.0",
    "cors": "^2.8.5",
    "eventsource": "^3.0.2",
    "express": "^4.21.1",
    "zod": "^3.23.8",
    "zod-to-json-schema": "^3.23.5"
  },
  "devDependencies": {
    "@types/express": "^5.0.0",
    "@types/node": "^20.11.24",
    "nodemon": "^3.1.9",
    "tsx": "^4.19.2",
    "typescript": "^5.3.3"
  },
  "repository": {
    "type": "git",
    "url": "https://github.com/dailydaniel/pocket-mcp",
    "directory": "server"
  }
}

```

--------------------------------------------------------------------------------
/server/src/sse.ts:
--------------------------------------------------------------------------------

```typescript
import { SSEServerTransport } from "@modelcontextprotocol/sdk/server/sse.js";
import express from "express";
import { createServer } from "./mcp-proxy.js";
import cors from "cors";

const app = express();
app.use(cors());

const apiKey = process.env.MCP_API_KEY;

let transport: SSEServerTransport;
let cleanupFunction: () => Promise<void>;

async function initServer() {
  try {
    const { server, cleanup } = await createServer(apiKey);
    cleanupFunction = cleanup;

    app.get("/sse", async (req, res) => {
      console.log("Got connection");
      transport = new SSEServerTransport("/message", res);
      await server.connect(transport);

      server.onerror = (err) => {
        console.error(`Server error: ${err.stack}`);
      };

      server.onclose = async () => {
        console.log('Server closed');
        if (process.env.KEEP_SERVER_OPEN !== "1") {
          await cleanup();
          await server.close();
          process.exit(0);
        }
      };
    });

    app.post("/message", async (req, res) => {
      console.log("Got message");
      await transport.handlePostMessage(req, res);
    });
  } catch (error) {
    console.error("Error in server initialization:", error);
    process.exit(1);
  }
}

initServer();

const PORT = process.env.PORT || 3006;
app.listen(PORT, () => {
  console.log(`Server listening on port ${PORT}`);
});

process.on("SIGINT", async () => {
  if (cleanupFunction) {
    await cleanupFunction();
  }
  process.exit(0);
});

```

--------------------------------------------------------------------------------
/client/src/config.py:
--------------------------------------------------------------------------------

```python
import json
import os
from typing import Any, Dict, Optional

from dotenv import load_dotenv


class Configuration:
    """Manages configuration and environment variables for the MCP client."""

    def __init__(self, config_path: str = "servers_config.json") -> None:
        """Initialize the configuration manager.

        Args:
            config_path: Path to the configuration file
        """
        self.config_path = config_path
        self.load_env()
        self.config = self.load_config()

    @staticmethod
    def load_env() -> None:
        """Load environment variables from .env file."""
        load_dotenv()

    def load_config(self, file_path: Optional[str] = None) -> Dict[str, Any]:
        """Load configuration from the JSON file.

        Args:
            file_path: Optional path to the configuration file (overrides config_path)

        Returns:
            Dictionary containing the configuration
        """
        try:
            path = file_path or self.config_path
            if os.path.exists(path):
                with open(path, "r") as f:
                    return json.load(f)
            return {"mcpServers": {}}
        except Exception as e:
            print(f"Error loading configuration: {e}")
            return {"mcpServers": {}}

    def save_config(self, config: Dict[str, Any]) -> None:
        """Save configuration to the JSON file.

        Args:
            config: Configuration dictionary to save
        """
        try:
            with open(self.config_path, "w") as f:
                json.dump(config, f, indent=2)
            self.config = config
        except Exception as e:
            print(f"Error saving configuration: {e}")

    def get_server_config(self, server_name: str) -> Optional[Dict[str, Any]]:
        """Get configuration for a specific server.

        Args:
            server_name: Name of the server

        Returns:
            Server configuration or None if not found
        """
        return self.config.get("mcpServers", {}).get(server_name)

    def get_servers(self) -> Dict[str, Dict[str, Any]]:
        """Get all server configurations.

        Returns:
            Dictionary of server configurations
        """
        return self.config.get("mcpServers", {})

```

--------------------------------------------------------------------------------
/client/src/api_server.py:
--------------------------------------------------------------------------------

```python
import uvicorn
from fastapi import FastAPI, HTTPException, Depends, Header
from fastapi.middleware.cors import CORSMiddleware
from typing import Dict, List, Optional
import socket

from .auth import AuthManager
from .server import ServerManager
from .config import Configuration

API_PORT = None

app = FastAPI()

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

auth_manager = AuthManager()
config = Configuration()
server_manager = ServerManager()


async def verify_api_key(authorization: str = Header(None)):
    if not authorization:
        raise HTTPException(status_code=401, detail="API key required")

    if authorization.startswith("Bearer "):
        api_key = authorization[7:]
    else:
        api_key = authorization

    is_valid, server_names = auth_manager.validate_key(api_key)
    if not is_valid:
        raise HTTPException(status_code=401, detail="Invalid API key")

    return server_names


@app.get("/api/servers")
async def get_servers(server_names: List[str] = Depends(verify_api_key)):
    servers_config = config.get_servers()

    authorized_servers = []
    for name in server_names:
        if name in servers_config:
            sse_port = 3000 + hash(name) % 1000

            authorized_servers.append({
                "name": name,
                "transport": {
                    "type": "sse",
                    "url": f"http://0.0.0.0:{sse_port}/sse"
                }
            })

    return {
        "success": True,
        "servers": authorized_servers
    }


@app.get("/api/health")
async def health_check():
    return {"status": "ok"}


def find_free_port(start_port=8000, max_attempts=100):
    for port in range(start_port, start_port + max_attempts):
        try:
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                s.bind(('127.0.0.1', port))
                return port
        except OSError:
            continue
    raise IOError("No free ports found")


def start_api_server(host="0.0.0.0", port=None):
    if port is None:
        port = find_free_port()
    print(f"Starting API server on port {port}")
    global API_PORT
    API_PORT = port
    uvicorn.run(app, host=host, port=port)


if __name__ == "__main__":
    start_api_server()

```

--------------------------------------------------------------------------------
/server/src/client.ts:
--------------------------------------------------------------------------------

```typescript
import { Client } from '@modelcontextprotocol/sdk/client/index.js';
import { SSEClientTransport } from '@modelcontextprotocol/sdk/client/sse.js';
import { StdioClientTransport } from '@modelcontextprotocol/sdk/client/stdio.js';
import { Transport } from '@modelcontextprotocol/sdk/shared/transport.js';
import { ServerConfig } from './config.js';

const sleep = (time: number) => new Promise<void>(resolve => setTimeout(() => resolve(), time))
export interface ConnectedClient {
  client: Client;
  cleanup: () => Promise<void>;
  name: string;
}

const createClient = (server: ServerConfig): { client: Client | undefined, transport: Transport | undefined } => {

  let transport: Transport | null = null
  try {
    if (server.transport.type === 'sse') {
      transport = new SSEClientTransport(new URL(server.transport.url));
    } else {
      transport = new StdioClientTransport({
        command: server.transport.command,
        args: server.transport.args,
        env: server.transport.env ? server.transport.env.reduce((o, v) => ({
          [v]: process.env[v] || ''
        }), {}) : undefined
      });
    }
  } catch (error) {
    console.error(`Failed to create transport ${server.transport.type || 'stdio'} to ${server.name}:`, error);
  }

  if (!transport) {
    console.warn(`Transport ${server.name} not available.`)
    return { transport: undefined, client: undefined }
  }

  const client = new Client({
    name: 'mcp-proxy-client',
    version: '1.0.0',
  }, {
    capabilities: {
      prompts: {},
      resources: { subscribe: true },
      tools: {}
    }
  });

  return { client, transport }
}

export const createClients = async (servers: ServerConfig[]): Promise<ConnectedClient[]> => {
  const clients: ConnectedClient[] = [];

  for (const server of servers) {
    console.log(`Connecting to server: ${server.name}`);

    const waitFor = 2500
    const retries = 3
    let count = 0
    let retry = true

    while (retry) {

      const { client, transport } = createClient(server)
      if (!client || !transport) {
        break
      }

      try {
        await client.connect(transport);
        console.log(`Connected to server: ${server.name}`);

        clients.push({
          client,
          name: server.name,
          cleanup: async () => {
            await transport.close();
          }
        });

        break

      } catch (error) {
        console.error(`Failed to connect to ${server.name}:`, error);
        count++
        retry = (count < retries)
        if (retry) {
          try {
            await client.close()
          } catch { }
          console.log(`Retry connection to ${server.name} in ${waitFor}ms (${count}/${retries})`);
          await sleep(waitFor)
        }
      }

    }

  }

  return clients;
};
```

--------------------------------------------------------------------------------
/client/src/auth.py:
--------------------------------------------------------------------------------

```python
import json
import os
import secrets
import time
from typing import Dict, List, Optional, Set, Tuple, Union


class AuthManager:
    """Manages API keys and server access."""

    def __init__(self, keys_file: str = "api_keys.json") -> None:
        """Initialize the authentication manager.

        Args:
            keys_file: Path to the API keys file
        """
        self.keys_file = keys_file
        self.keys = self.load_keys()

    def load_keys(self) -> Dict[str, Dict[str, Union[List[str], int]]]:
        """Load API keys from the JSON file.

        Returns:
            Dictionary of API keys and their associated servers
        """
        try:
            if os.path.exists(self.keys_file):
                with open(self.keys_file, "r") as f:
                    return json.load(f)
            return {}
        except Exception as e:
            print(f"Error loading API keys: {e}")
            return {}

    def save_keys(self) -> None:
        """Save API keys to the JSON file."""
        try:
            with open(self.keys_file, "w") as f:
                json.dump(self.keys, f, indent=2)
        except Exception as e:
            print(f"Error saving API keys: {e}")

    def generate_key(self, server_names: List[str]) -> str:
        """Generate a new API key for a group of servers.

        Args:
            server_names: List of server names to associate with the key

        Returns:
            Generated API key
        """
        # Generate a random token
        token = secrets.token_urlsafe(32)

        # Store the key with associated servers and creation time
        self.keys[token] = {
            "servers": server_names,
            "created": int(time.time())
        }

        # Save the updated keys
        self.save_keys()

        return token

    def validate_key(self, key: str) -> Tuple[bool, List[str]]:
        """Validate an API key and return associated servers.

        Args:
            key: API key to validate

        Returns:
            Tuple of (is_valid, server_names)
        """
        # print(f"current keys: {self.keys}")
        self.keys = self.load_keys()
        # print(f"current keys after reload: {self.keys}")
        # print(f"validating key: {key}")

        if key in self.keys:
            return True, self.keys[key]["servers"]
        return False, []

    def revoke_key(self, key: str) -> bool:
        """Revoke an API key.

        Args:
            key: API key to revoke

        Returns:
            True if the key was revoked, False otherwise
        """
        if key in self.keys:
            del self.keys[key]
            self.save_keys()
            return True
        return False

    def get_all_keys(self) -> Dict[str, Dict[str, Union[List[str], int]]]:
        """Get all API keys.

        Returns:
            Dictionary of API keys and their details
        """
        return self.keys

```

--------------------------------------------------------------------------------
/client/src/server.py:
--------------------------------------------------------------------------------

```python
import asyncio
import json
import os
import shutil
import subprocess
from contextlib import AsyncExitStack
from typing import Any, Dict, List, Optional, Set, Tuple

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client


class Server:
    """Manages an MCP server connection and execution."""

    def __init__(self, name: str, config: Dict[str, Any]) -> None:
        """Initialize the server.

        Args:
            name: Server name
            config: Server configuration
        """
        self.name: str = name
        self.config: Dict[str, Any] = config
        self.process: Optional[subprocess.Popen] = None
        self.session: Optional[ClientSession] = None
        self._cleanup_lock: asyncio.Lock = asyncio.Lock()
        self.exit_stack: AsyncExitStack = AsyncExitStack()

    async def start(self) -> bool:
        """Start the server process via mcp-proxy in SSE mode."""
        if self.process and self.is_running():
            print(f"Server {self.name} is already running")
            return True

        try:
            # Prepare command and environment for mcp-proxy
            proxy_command = shutil.which("mcp-proxy")
            if not proxy_command:
                print(f"Command 'mcp-proxy' not found. Please install it.")
                return False

            # Define the SSE port for this server
            sse_port = self.config.get("sse_port", 3000 + hash(self.name) % 1000)

            start_command = shutil.which(self.config["command"])
            print(f"start command: {start_command}")
            if not start_command:
                print(f"Command {self.config['command']} not found. Please install Node.js and npm.")

            args = [
                       "--allow-origin='*'",
                       f"--sse-port={str(sse_port)}",
                       "--sse-host=0.0.0.0",
                       "--pass-environment",
                       "--",
                       start_command
                   ] + self.config["args"]

            # Add server-specific environment variables
            env = os.environ.copy()
            if "env" in self.config:
                env.update(self.config["env"])

            # Start mcp-proxy process
            self.process = subprocess.Popen(
                [proxy_command] + args,
                env=env,
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=False
            )

            print(f"Started mcp-proxy for {self.name} on port {sse_port}")
            return True

        except Exception as e:
            print(f"Error starting mcp-proxy for {self.name}: {e}")
            return False

    async def stop(self) -> bool:
        """Stop the server.

        Returns:
            True if the server was stopped successfully, False otherwise
        """
        await self.cleanup()

        if not self.process:
            return True

        try:
            self.process.terminate()

            # Wait for the process to terminate
            try:
                self.process.wait(timeout=5)
            except subprocess.TimeoutExpired:
                self.process.kill()

            self.process = None
            print(f"Stopped server {self.name}")
            return True

        except Exception as e:
            print(f"Error stopping server {self.name}: {e}")
            return False

    async def cleanup(self) -> None:
        """Clean up resources."""
        async with self._cleanup_lock:
            try:
                await self.exit_stack.aclose()
                self.session = None
            except Exception as e:
                print(f"Error during cleanup of server {self.name}: {e}")

    def is_running(self) -> bool:
        """Check if the server is running.

        Returns:
            True if the server is running, False otherwise
        """
        if not self.process:
            return False

        # Check if the process is still alive
        if self.process.poll() is not None:
            # Process has terminated
            self.process = None
            return False

        return True


class ServerManager:
    """Manages multiple MCP servers."""

    def __init__(self) -> None:
        """Initialize the server manager."""
        self.servers: Dict[str, Server] = {}
        self.selected_servers: Set[str] = set()

    def select_servers(self, server_names: List[str]) -> None:
        """Select a group of servers.

        Args:
            server_names: List of server names to select
        """
        self.selected_servers = set(server_names)

    def get_selected_servers(self) -> Set[str]:
        """Get the currently selected servers.

        Returns:
            Set of selected server names
        """
        return self.selected_servers

    async def start_server(self, name: str, config: Dict[str, Any]) -> bool:
        """Start an MCP server.

        Args:
            name: Server name
            config: Server configuration

        Returns:
            True if the server was started successfully, False otherwise
        """
        if name not in self.servers:
            self.servers[name] = Server(name, config)

        return await self.servers[name].start()

    async def start_selected_servers(self, server_configs: Dict[str, Dict[str, Any]]) -> List[str]:
        """Start all selected servers.

        Args:
            server_configs: Dictionary of server configurations

        Returns:
            List of successfully started server names
        """
        started_servers = []

        for name in self.selected_servers:
            if name in server_configs:
                success = await self.start_server(name, server_configs[name])
                if success:
                    print(f"Success in starting server {name}")
                    started_servers.append(name)
                else:
                    print(f"Failed to start server {name}")
            else:
                print(f"Server {name} not found in configuration")

        return started_servers

    async def stop_server(self, name: str) -> bool:
        """Stop an MCP server.

        Args:
            name: Server name

        Returns:
            True if the server was stopped successfully, False otherwise
        """
        if name not in self.servers:
            print(f"Server {name} not found")
            return False

        return await self.servers[name].stop()

    async def stop_all_servers(self) -> None:
        """Stop all servers."""
        for name in list(self.servers.keys()):
            await self.stop_server(name)

    def get_running_servers(self) -> List[str]:
        """Get names of all running servers.

        Returns:
            List of running server names
        """
        return [name for name, server in self.servers.items() if server.is_running()]

    def is_server_running(self, name: str) -> bool:
        """Check if a server is running.

        Args:
            name: Server name

        Returns:
            True if the server is running, False otherwise
        """
        if name not in self.servers:
            return False

        return self.servers[name].is_running()

```

--------------------------------------------------------------------------------
/client/app.py:
--------------------------------------------------------------------------------

```python
import asyncio
import datetime
import json
import os
import pyperclip
from typing import Dict, List, Set

import threading
from src.api_server import start_api_server
import time

import streamlit as st

from src.auth import AuthManager
from src.config import Configuration
from src.server import ServerManager
from src.api import ApiClient

# Set page configuration
st.set_page_config(
    page_title="MCP Server Manager",
    page_icon="🔌",
    layout="wide",
    initial_sidebar_state="expanded"
)

if "api_server_started" not in st.session_state:
    api_thread = threading.Thread(target=start_api_server, daemon=True)
    api_thread.start()
    time.sleep(1)
    st.session_state.api_server_started = True

# Initialize session state for persistent objects
if "server_manager" not in st.session_state:
    st.session_state.server_manager = ServerManager()
if "config" not in st.session_state:
    st.session_state.config = Configuration()
if "auth_manager" not in st.session_state:
    st.session_state.auth_manager = AuthManager()
if "api_client" not in st.session_state:
    st.session_state.api_client = ApiClient()


# Function to copy text to clipboard
def copy_to_clipboard(text):
    pyperclip.copy(text)
    return True


# Page title and description
st.title("🔌 MCP Server Manager")
st.markdown("""
This application helps you manage your MCP servers, launch them in groups, and generate API keys for use with the MCP proxy server.
""")

# Sidebar navigation
st.sidebar.header("Navigation")
page = st.sidebar.radio(
    "Go to",
    ["Server Dashboard", "Launch Servers", "API Keys Management"]
)

# Server Dashboard page
if page == "Server Dashboard":
    st.header("📋 Server Dashboard")

    servers = st.session_state.config.get_servers()

    if not servers:
        st.warning("No servers configured. Please add servers to your servers_config.json file.")

        st.subheader("Sample Configuration")
        sample_config = {
            "mcpServers": {
                "logseq": {
                    "command": "uvx",
                    "args": ["mcp-server-logseq"],
                    "env": {
                        "LOGSEQ_API_TOKEN": "your_token",
                        "LOGSEQ_API_URL": "http://127.0.0.1:8000"
                    }
                }
            }
        }
        st.code(json.dumps(sample_config, indent=2), language="json")
    else:
        # Display running servers first
        running_servers = st.session_state.server_manager.get_running_servers()
        if running_servers:
            st.subheader("🟢 Running Servers")
            for name in running_servers:
                with st.expander(f"{name}", expanded=True):
                    st.json(st.session_state.config.get_server_config(name))
                    if st.button(f"Stop {name}", key=f"stop_{name}"):
                        with st.spinner(f"Stopping {name}..."):
                            asyncio.run(st.session_state.server_manager.stop_server(name))
                            st.success(f"Server {name} stopped.")
                            st.rerun()

        # Display all servers
        st.subheader("📃 All Configured Servers")
        for name, config in servers.items():
            status = "🟢 Running" if st.session_state.server_manager.is_server_running(name) else "🔴 Stopped"
            with st.expander(f"{name} - {status}"):
                st.json(config)

# Launch Servers page
elif page == "Launch Servers":
    st.header("🚀 Launch Server Group")

    servers = st.session_state.config.get_servers()

    if not servers:
        st.warning("No servers configured. Please add servers to your servers_config.json file.")
    else:
        col1, col2 = st.columns([3, 1])

        with col1:
            # Server selection
            server_names = list(servers.keys())
            selected_servers = st.multiselect(
                "Select servers to launch",
                options=server_names,
                default=list(st.session_state.server_manager.get_selected_servers()),
                help="Select the MCP servers you want to launch as a group"
            )

            # Update selected servers
            if selected_servers:
                st.session_state.server_manager.select_servers(selected_servers)

            # Launch button
            launch_button = st.button("Launch Selected Servers", type="primary", disabled=len(selected_servers) == 0)

            if launch_button:
                with st.spinner("Starting servers..."):
                    # Use asyncio to start servers
                    started_servers = asyncio.run(
                        st.session_state.server_manager.start_selected_servers(servers)
                    )

                    if started_servers:
                        st.success(f"Started servers: {', '.join(started_servers)}")

                        # Generate API key for the started servers
                        api_key = st.session_state.auth_manager.generate_key(started_servers)

                        # Display the API key
                        st.subheader("🔑 Generated API Key")
                        key_col1, key_col2 = st.columns([5, 1])
                        with key_col1:
                            st.code(api_key, language="bash")
                        with key_col2:
                            if st.button("Copy", key="copy_api_key"):
                                copy_to_clipboard(api_key)
                                st.success("Copied to clipboard!")

                        # Instructions for using the API key
                        st.subheader("How to Use the API Key")
                        st.markdown("""
                        To use this API key with the MCP proxy server:

                        1. **Environment Variable:**
                        ```bash
                        export MCP_API_KEY=your_api_key_here
                        ```

                        2. **Start the MCP proxy server:**
                        ```bash
                        mcp-proxy-server
                        ```

                        3. **Or for SSE mode:**
                        ```bash
                        node build/sse.js
                        ```
                        """)
                    else:
                        st.error("Failed to start servers. Check the logs for more information.")

        with col2:
            # Information box
            st.info(
                "💡 **Tip:**\n\nStarting servers as a group allows you to generate a single API key for all of them.")

    # Display running servers
    running_servers = st.session_state.server_manager.get_running_servers()
    if running_servers:
        st.subheader("🟢 Currently Running Servers")
        st.write(", ".join(running_servers))

        # Stop all servers button
        if st.button("Stop All Servers", type="secondary"):
            with st.spinner("Stopping all servers..."):
                asyncio.run(st.session_state.server_manager.stop_all_servers())
                st.success("All servers stopped.")
                st.rerun()

# API Keys Management page
elif page == "API Keys Management":
    st.header("🔑 API Keys Management")

    keys = st.session_state.auth_manager.get_all_keys()

    if not keys:
        st.info("No API keys have been generated yet. Launch a server group to generate an API key.")
    else:
        st.markdown("Below are the API keys you've generated for your MCP server groups.")

        # Display all keys
        for key, details in keys.items():
            created_time = datetime.datetime.fromtimestamp(details['created'])

            with st.expander(f"API Key: {key[:10]}..."):
                col1, col2 = st.columns([3, 1])

                with col1:
                    st.code(key, language="bash")
                    st.write(f"🖥️ **Servers:** {', '.join(details['servers'])}")
                    st.write(f"📅 **Created:** {created_time.strftime('%Y-%m-%d %H:%M:%S')}")

                with col2:
                    if st.button("Copy", key=f"copy_{key[:8]}"):
                        copy_to_clipboard(key)
                        st.success("Copied to clipboard!")

                    if st.button("Revoke", key=f"revoke_{key[:8]}"):
                        if st.session_state.auth_manager.revoke_key(key):
                            st.success("API key revoked.")
                            st.rerun()
                        else:
                            st.error("Failed to revoke API key.")

# Footer
st.markdown("---")
st.markdown("© MCP Server Manager - Manage your MCP servers with ease")

```

--------------------------------------------------------------------------------
/server/src/mcp-proxy.ts:
--------------------------------------------------------------------------------

```typescript
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import {
  CallToolRequestSchema,
  GetPromptRequestSchema,
  ListPromptsRequestSchema,
  ListResourcesRequestSchema,
  ListToolsRequestSchema,
  ReadResourceRequestSchema,
  Tool,
  ListToolsResultSchema,
  ListPromptsResultSchema,
  ListResourcesResultSchema,
  ReadResourceResultSchema,
  ListResourceTemplatesRequestSchema,
  ListResourceTemplatesResultSchema,
  ResourceTemplate,
  CompatibilityCallToolResultSchema,
  GetPromptResultSchema
} from "@modelcontextprotocol/sdk/types.js";
import { createClients, ConnectedClient } from './client.js';
import { Config, loadConfig } from './config.js';
import { z } from 'zod';
import * as eventsource from 'eventsource';

global.EventSource = eventsource.EventSource

export const createServer = async (apiKey?: string) => {
  const config = await loadConfig(apiKey);
  
  if (config.servers.length === 0) {
    console.warn('Warning: No servers found.');
  } else {
    console.log(`Found servers: ${config.servers.length}`);
  }
  
  const connectedClients = await createClients(config.servers);
  console.log(`Connected to ${connectedClients.length} server`);

  // Maps to track which client owns which resource
  const toolToClientMap = new Map<string, ConnectedClient>();
  const resourceToClientMap = new Map<string, ConnectedClient>();
  const promptToClientMap = new Map<string, ConnectedClient>();

  const server = new Server(
    {
      name: "mcp-proxy-server",
      version: "1.0.0",
    },
    {
      capabilities: {
        prompts: {},
        resources: { subscribe: true },
        tools: {},
      },
    },
  );

  // List Tools Handler
  server.setRequestHandler(ListToolsRequestSchema, async (request) => {
    const allTools: Tool[] = [];
    toolToClientMap.clear();

    for (const connectedClient of connectedClients) {
      try {
        const result = await connectedClient.client.request(
          {
            method: 'tools/list',
            params: {
              _meta: request.params?._meta
            }
          },
          ListToolsResultSchema
        );

        if (result.tools) {
          const toolsWithSource = result.tools.map(tool => {
            toolToClientMap.set(tool.name, connectedClient);
            return {
              ...tool,
              description: `[${connectedClient.name}] ${tool.description || ''}`
            };
          });
          allTools.push(...toolsWithSource);
        }
      } catch (error) {
        console.error(`Error fetching tools from ${connectedClient.name}:`, error);
      }
    }

    return { tools: allTools };
  });

  // Call Tool Handler
  server.setRequestHandler(CallToolRequestSchema, async (request) => {
    const { name, arguments: args } = request.params;
    const clientForTool = toolToClientMap.get(name);

    if (!clientForTool) {
      throw new Error(`Unknown tool: ${name}`);
    }

    try {
      console.log('Forwarding tool call:', name);

      // Use the correct schema for tool calls
      return await clientForTool.client.request(
        {
          method: 'tools/call',
          params: {
            name,
            arguments: args || {},
            _meta: {
              progressToken: request.params._meta?.progressToken
            }
          }
        },
        CompatibilityCallToolResultSchema
      );
    } catch (error) {
      console.error(`Error calling tool through ${clientForTool.name}:`, error);
      throw error;
    }
  });

  // Get Prompt Handler
  server.setRequestHandler(GetPromptRequestSchema, async (request) => {
    const { name } = request.params;
    const clientForPrompt = promptToClientMap.get(name);

    if (!clientForPrompt) {
      throw new Error(`Unknown prompt: ${name}`);
    }

    try {
      console.log('Forwarding prompt request:', name);

      // Match the exact structure from the example code
      const response = await clientForPrompt.client.request(
        {
          method: 'prompts/get' as const,
          params: {
            name,
            arguments: request.params.arguments || {},
            _meta: request.params._meta || {
              progressToken: undefined
            }
          }
        },
        GetPromptResultSchema
      );

      console.log('Prompt result:', response);
      return response;
    } catch (error) {
      console.error(`Error getting prompt from ${clientForPrompt.name}:`, error);
      throw error;
    }
  });

  // List Prompts Handler
  server.setRequestHandler(ListPromptsRequestSchema, async (request) => {
    const allPrompts: z.infer<typeof ListPromptsResultSchema>['prompts'] = [];
    promptToClientMap.clear();

    for (const connectedClient of connectedClients) {
      try {
        const result = await connectedClient.client.request(
          {
            method: 'prompts/list' as const,
            params: {
              cursor: request.params?.cursor,
              _meta: request.params?._meta || {
                progressToken: undefined
              }
            }
          },
          ListPromptsResultSchema
        );

        if (result.prompts) {
          const promptsWithSource = result.prompts.map(prompt => {
            promptToClientMap.set(prompt.name, connectedClient);
            return {
              ...prompt,
              description: `[${connectedClient.name}] ${prompt.description || ''}`
            };
          });
          allPrompts.push(...promptsWithSource);
        }
      } catch (error) {
        console.error(`Error fetching prompts from ${connectedClient.name}:`, error);
      }
    }

    return {
      prompts: allPrompts,
      nextCursor: request.params?.cursor
    };
  });

  // List Resources Handler
  server.setRequestHandler(ListResourcesRequestSchema, async (request) => {
    const allResources: z.infer<typeof ListResourcesResultSchema>['resources'] = [];
    resourceToClientMap.clear();

    for (const connectedClient of connectedClients) {
      try {
        const result = await connectedClient.client.request(
          {
            method: 'resources/list',
            params: {
              cursor: request.params?.cursor,
              _meta: request.params?._meta
            }
          },
          ListResourcesResultSchema
        );

        if (result.resources) {
          const resourcesWithSource = result.resources.map(resource => {
            resourceToClientMap.set(resource.uri, connectedClient);
            return {
              ...resource,
              name: `[${connectedClient.name}] ${resource.name || ''}`
            };
          });
          allResources.push(...resourcesWithSource);
        }
      } catch (error) {
        console.error(`Error fetching resources from ${connectedClient.name}:`, error);
      }
    }

    return {
      resources: allResources,
      nextCursor: undefined
    };
  });

  // Read Resource Handler
  server.setRequestHandler(ReadResourceRequestSchema, async (request) => {
    const { uri } = request.params;
    const clientForResource = resourceToClientMap.get(uri);

    if (!clientForResource) {
      throw new Error(`Unknown resource: ${uri}`);
    }

    try {
      return await clientForResource.client.request(
        {
          method: 'resources/read',
          params: {
            uri,
            _meta: request.params._meta
          }
        },
        ReadResourceResultSchema
      );
    } catch (error) {
      console.error(`Error reading resource from ${clientForResource.name}:`, error);
      throw error;
    }
  });

  // List Resource Templates Handler
  server.setRequestHandler(ListResourceTemplatesRequestSchema, async (request) => {
    const allTemplates: ResourceTemplate[] = [];

    for (const connectedClient of connectedClients) {
      try {
        const result = await connectedClient.client.request(
          {
            method: 'resources/templates/list' as const,
            params: {
              cursor: request.params?.cursor,
              _meta: request.params?._meta || {
                progressToken: undefined
              }
            }
          },
          ListResourceTemplatesResultSchema
        );

        if (result.resourceTemplates) {
          const templatesWithSource = result.resourceTemplates.map(template => ({
            ...template,
            name: `[${connectedClient.name}] ${template.name || ''}`,
            description: template.description ? `[${connectedClient.name}] ${template.description}` : undefined
          }));
          allTemplates.push(...templatesWithSource);
        }
      } catch (error) {
        console.error(`Error fetching resource templates from ${connectedClient.name}:`, error);
      }
    }

    return {
      resourceTemplates: allTemplates,
      nextCursor: request.params?.cursor
    };
  });

  const cleanup = async () => {
    await Promise.all(connectedClients.map(({ cleanup }) => cleanup()));
  };

  return { server, cleanup };
};

```

--------------------------------------------------------------------------------
/example_llm_mcp/main.py:
--------------------------------------------------------------------------------

```python
import asyncio
import json
import os
import shutil
from contextlib import AsyncExitStack
from typing import Any, Union

from dotenv import load_dotenv
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

from openai import OpenAI, ChatCompletion


class Configuration:
    """Manages configuration and environment variables for the MCP client."""

    def __init__(self) -> None:
        self.load_env()
        self.api_key = os.getenv("OPENAI_API_KEY")

    @staticmethod
    def load_env() -> None:
        load_dotenv()

    @staticmethod
    def load_config(file_path: str = "servers_config.json") -> dict[str, Any]:
        with open(file_path, "r") as f:
            return json.load(f)

    @property
    def llm_api_key(self) -> str:
        if not self.api_key:
            raise ValueError("LLM_API_KEY not found in environment variables")
        return self.api_key


class Server:
    """Manages MCP server connections and tool execution."""

    def __init__(self, name: str, config: dict[str, Any]) -> None:
        self.name: str = name
        self.config: dict[str, Any] = config
        self.stdio_context: Any | None = None
        self.session: ClientSession | None = None
        self._cleanup_lock: asyncio.Lock = asyncio.Lock()
        self.exit_stack: AsyncExitStack = AsyncExitStack()

    async def initialize(self) -> None:
        command = (
            shutil.which("npx")
            if self.config["command"] == "npx"
            else self.config["command"]
        )
        if command is None:
            raise ValueError("The command must be a valid string and cannot be None.")

        server_params = StdioServerParameters(
            command=command,
            args=self.config["args"],
            env={**os.environ, **self.config["env"]}
            if self.config.get("env")
            else None,
        )
        try:
            stdio_transport = await self.exit_stack.enter_async_context(
                stdio_client(server_params)
            )
            read, write = stdio_transport
            session = await self.exit_stack.enter_async_context(
                ClientSession(read, write)
            )
            await session.initialize()
            self.session = session
        except Exception as e:
            print(f"Error initializing server {self.name}: {e}")
            await self.cleanup()
            raise

    async def list_tools(self) -> list[Any]:
        if not self.session:
            raise RuntimeError(f"Server {self.name} not initialized")

        tools_response = await self.session.list_tools()
        tools = []

        for item in tools_response:
            if isinstance(item, tuple) and item[0] == "tools":
                for tool in item[1]:
                    tools.append(Tool(tool.name, tool.description, tool.inputSchema))

        return tools

    async def execute_tool(
            self,
            tool_name: str,
            arguments: dict[str, Any],
            retries: int = 2,
            delay: float = 1.0,
    ) -> Any:
        if not self.session:
            raise RuntimeError(f"Server {self.name} not initialized")

        attempt = 0
        while attempt < retries:
            try:
                print(f"Executing tool {tool_name}")
                result = await self.session.call_tool(tool_name, arguments)

                return result

            except Exception as e:
                attempt += 1
                print(f"Error executing tool: {e}. Attempt {attempt} of {retries}.")
                if attempt < retries:
                    print(f"Retrying in {delay} seconds...")
                    await asyncio.sleep(delay)
                else:
                    print("Max retries reached. Failing.")
                    raise

    async def cleanup(self) -> None:
        async with self._cleanup_lock:
            try:
                await self.exit_stack.aclose()
                self.session = None
                self.stdio_context = None
            except Exception as e:
                print(f"Error during cleanup of server {self.name}: {e}")


class Tool:
    """Represents a tool with its properties and formatting."""

    def __init__(
            self, name: str, description: str, input_schema: dict[str, Any]
    ) -> None:
        self.name: str = name
        self.description: str = description
        self.input_schema: dict[str, Any] = input_schema

    def format_for_llm(self, provider_with_func_call: bool = False) -> str | dict:
        if provider_with_func_call:
            return {
                "type": "function",
                "function": {
                    "name": self.name,
                    "description": self.description,
                    "parameters": {
                        "type": "object",
                        "properties": {
                            param_name: param_info
                            for param_name, param_info in self.input_schema["properties"].items()
                        } if "properties" in self.input_schema else {},
                        "required": self.input_schema.get("required", []),
                        "additionalProperties": self.input_schema.get("additionalProperties", False)
                    }
                }
            }
        else:
            args_desc = []

            if "properties" in self.input_schema:
                for param_name, param_info in self.input_schema["properties"].items():
                    arg_desc = (
                        f"- {param_name}: {param_info.get('description', 'No description')}"
                    )
                    if param_name in self.input_schema.get("required", []):
                        arg_desc += " (required)"
                    args_desc.append(arg_desc)

            return f"""
Tool: {self.name}
Description: {self.description}
Arguments:
{chr(10).join(args_desc)}
"""


class LLMClient:
    """Manages communication with the LLM provider."""

    def __init__(self, api_key: str = os.getenv("OPENAI_API_KEY")) -> None:
        self.api_key: str = api_key
        self.client = OpenAI(api_key=self.api_key)

    def get_response(
            self,
            messages: list[dict[str, str]],
            temperature: float = 0.3,
            model: str = "gpt-4o",
            max_tokens: int = 4096,
            tools: list[dict[str, Any]] | None = None,
    ) -> Union[str, ChatCompletion]:
        if tools:
            response = self.client.chat.completions.create(
                model=model,
                messages=messages,
                temperature=temperature,
                max_tokens=max_tokens,
                tools=tools,
            )

            if response.choices[0].finish_reason == "tool_calls":
                return response
            else:
                return response.choices[0].message.content
        else:
            response = self.client.chat.completions.create(
                model=model,
                messages=messages,
                temperature=temperature,
                max_tokens=max_tokens,
            )

            return response.choices[0].message.content


class ChatSession:
    """Orchestrates the interaction between user, LLM, and tools."""

    def __init__(self, servers: list[Server], llm_client: LLMClient) -> None:
        self.servers: list[Server] = servers
        self.llm_client: LLMClient = llm_client

    async def cleanup_servers(self) -> None:
        cleanup_tasks = []
        for server in self.servers:
            cleanup_tasks.append(asyncio.create_task(server.cleanup()))

        if cleanup_tasks:
            try:
                await asyncio.gather(*cleanup_tasks, return_exceptions=True)
            except Exception as e:
                print(f"Warning during final cleanup: {e}")

    async def process_llm_response(self, llm_response: Union[str, ChatCompletion]) -> str | list:
        try:
            if isinstance(llm_response, str):
                tool_call = json.loads(llm_response)
                if "tool" in tool_call and "arguments" in tool_call:
                    print(f"Executing tool: {tool_call['tool']}")
                    print(f"With arguments: {tool_call['arguments']}")

                    for server in self.servers:
                        tools = await server.list_tools()
                        if any(tool.name == tool_call["tool"] for tool in tools):
                            try:
                                result = await server.execute_tool(
                                    tool_call["tool"], tool_call["arguments"]
                                )

                                if isinstance(result, dict) and "progress" in result:
                                    progress = result["progress"]
                                    total = result["total"]
                                    percentage = (progress / total) * 100
                                    print(f"Progress: {progress}/{total} ({percentage:.1f}%)")

                                return f"Tool execution result: {result}"
                            except Exception as e:
                                error_msg = f"Error executing tool: {str(e)}"
                                print(error_msg)
                                return error_msg
                return llm_response
            else:
                if llm_response.choices[0].finish_reason == "tool_calls":
                    for tool_call in llm_response.choices[0].message.tool_calls:
                        function_call = json.loads(tool_call.function.to_json())
                        if "arguments" in function_call and "name" in function_call:
                            function_name = function_call["name"]
                            arguments = json.loads(function_call["arguments"])
                            print(f"Executing function: {function_name} with arguments: {arguments}")
                            results = []

                            for server in self.servers:
                                tools = await server.list_tools()
                                if any(tool.name == function_name for tool in tools):
                                    try:
                                        result = await server.execute_tool(
                                            function_name, arguments
                                        )

                                        if isinstance(result, dict) and "progress" in result:
                                            progress = result["progress"]
                                            total = result["total"]
                                            percentage = (progress / total) * 100
                                            print(f"Progress: {progress}/{total} ({percentage:.1f}%)")

                                        results.append(f"Tool execution result: {result}")
                                    except Exception as e:
                                        error_msg = f"Error executing tool: {str(e)}"
                                        print(error_msg)
                                        results.append(error_msg)

                            return results

            return llm_response
        except json.JSONDecodeError:
            return llm_response

    async def start(self) -> None:
        try:
            for server in self.servers:
                try:
                    await server.initialize()
                except Exception as e:
                    print(f"Failed to initialize server: {e}")
                    await self.cleanup_servers()
                    return

            all_tools = []
            for server in self.servers:
                tools = await server.list_tools()
                all_tools.extend(tools)

            tools_schema = [tool.format_for_llm(provider_with_func_call=True) for tool in all_tools]

            # tools_description = "\n".join([tool.format_for_llm() for tool in all_tools])
            #
            # system_message = (
            #     "You are a helpful assistant with access to these tools:\n\n"
            #     f"{tools_description}\n"
            #     "Choose the appropriate tool based on the user's question. "
            #     "If no tool is needed, reply directly.\n\n"
            #     "IMPORTANT: When you need to use a tool, you must ONLY respond with "
            #     "the exact JSON object format below, nothing else:\n"
            #     "{\n"
            #     '    "tool": "tool-name",\n'
            #     '    "arguments": {\n'
            #     '        "argument-name": "value"\n'
            #     "    }\n"
            #     "}\n\n"
            #     "After receiving a tool's response:\n"
            #     "1. Transform the raw data into a natural, conversational response\n"
            #     "2. Keep responses concise but informative\n"
            #     "3. Focus on the most relevant information\n"
            #     "4. Use appropriate context from the user's question\n"
            #     "5. Avoid simply repeating the raw data\n\n"
            #     "Please use only the tools that are explicitly defined above."
            # )

            system_message = "You are a helpful assistant with access to some tools. Use them only when necessary."

            messages = [{"role": "system", "content": system_message}]

            while True:
                try:
                    user_input = input("You: ").strip().lower()
                    if user_input in ["quit", "exit"]:
                        print("\nExiting...")
                        break

                    messages.append({"role": "user", "content": user_input})

                    # llm_response = self.llm_client.get_response(messages)
                    llm_response = self.llm_client.get_response(messages, tools=tools_schema)
                    print("\nAssistant: %s", llm_response)

                    result = await self.process_llm_response(llm_response)

                    if isinstance(result, list) or result != llm_response:
                        print(f"\nTool execution result: {result}")
                        tool_call_output = json.loads(llm_response.choices[0].message.to_json())
                        messages.append(tool_call_output)
                        messages[-1]['content'] = ""

                        if isinstance(result, str):
                            messages.append({                               # append result message
                                "type": "function_call_output",
                                "tool_call_id": tool_call_output['tool_calls'][0]['id'],
                                "content": result,
                                "role": "tool",
                            })
                        else:
                            for i, res in enumerate(result):
                                messages.append({                               # append result message
                                    "type": "function_call_output",
                                    "tool_call_id": tool_call_output['tool_calls'][i]['id'],
                                    "content": res,
                                    "role": "tool",
                                })
                        # messages.append({"role": "assistant", "content": llm_response})
                        # messages.append({"role": "system", "content": result})

                        final_response = self.llm_client.get_response(messages)
                        print("\nFinal response: %s", final_response)
                        messages.append(
                            {"role": "assistant", "content": final_response}
                        )
                    else:
                        messages.append({"role": "assistant", "content": llm_response})

                except KeyboardInterrupt:
                    print("\nExiting...")
                    break

        finally:
            await self.cleanup_servers()


async def main() -> None:
    """Initialize and run the chat session."""
    config = Configuration()
    server_config = config.load_config("servers_config.json")
    servers = [
        Server(name, srv_config)
        for name, srv_config in server_config["mcpServers"].items()
    ]
    llm_client = LLMClient(config.llm_api_key)
    chat_session = ChatSession(servers, llm_client)
    await chat_session.start()


if __name__ == "__main__":
    asyncio.run(main())
```