# Directory Structure

```
├── .gitignore
├── package.json
├── README.md
├── src
│   └── index.ts
└── tsconfig.json
```

# Files

--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------

```
# Dependencies
node_modules/
package-lock.json

# Build output
build/
dist/
*.js
*.js.map
*.d.ts
*.d.ts.map

# IDEs and editors
.idea/
.vscode/
*.swp
*.swo
.DS_Store
docs/
memory-bank/

# Logs
logs/
*.log
npm-debug.log*
yarn-debug.log*
yarn-error.log*

# Environment variables
.env
.env.*

```

--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------

```markdown

<div align="center">

# MCP BatchIt

**Batch multiple MCP tool calls into a single "batch_execute" request—reducing overhead and token usage for AI agents.**

[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)

</div>

---

## Table of Contents

1. [Introduction](#introduction)
2. [Why Use BatchIt](#why-use-batchit)
3. [Key Features & Limitations](#key-features--limitations)
4. [Installation & Startup](#installation--startup)
5. [Multi-Phase Usage](#multi-phase-usage)
   - [Implementation Phases](#implementation-phases)
     - [Information Gathering](#information-gathering)
     - [LLM‐Only Step (List Code Definitions)](#llm-only-step-list-code-definitions)
     - [Document Creation](#document-creation)
6. [FAQ](#faq)
7. [License](#license)

---

## Introduction
> ⚠️ **NOTICE: Work in Progress**
>
> This project is actively being developed to address several complex challenges:
> - Maintaining backwards compatibility with existing MCP servers
> - Resolving transport complexities with multi-connection clients (Cline, Roo, Claude Desktop)
> - Creating a beginner-friendly implementation
>
> While functional, expect ongoing improvements and changes as we refine the solution.

**MCP BatchIt** is a simple aggregator server in the [Model Context Protocol (MCP)](https://modelcontextprotocol.io/) ecosystem. It exposes just **one** tool: **`batch_execute`**. Rather than calling multiple MCP tools (like `fetch`, `read_file`, `create_directory`, `write_file`, etc.) in **separate** messages, you can **batch** them together in one aggregator request.

This dramatically reduces token usage, network overhead, and repeated context in your AI agent or LLM conversation.

---

## Why Use BatchIt

- **The One-Action-per-Message Problem**:
  Normally, an LLM or AI agent can call only a single MCP tool at a time, forcing multiple round trips for multi-step tasks.

- **Excessive Round Trips**:
  10 separate file operations might require 10 messages → 10 responses.

- **BatchIt’s Approach**:
  1. Takes a single `batch_execute` request.
  2. Spawns (or connects to) the actual target MCP server (like a filesystem server) behind the scenes.
  3. Runs each sub-operation (tool call) in parallel up to `maxConcurrent`.
  4. If one sub-op fails and `stopOnError` is true, it halts new sub-ops.
  5. Returns one consolidated JSON result (the overall request shape is sketched just below).
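
At the wire level, a `batch_execute` call is a single JSON argument object with three top-level fields: `targetServer`, `operations`, and `options`. A minimal sketch of that shape (the filesystem server and paths here are placeholders; complete, working examples appear under [Multi-Phase Usage](#multi-phase-usage)):

```jsonc
{
  "targetServer": {
    "name": "filesystem",
    "serverType": { "type": "filesystem", "config": {} },
    "transport": {
      "type": "stdio",
      "command": "npx",
      "args": ["-y", "@modelcontextprotocol/server-filesystem", "/path/to/project"]
    }
  },
  "operations": [
    { "tool": "read_file", "arguments": { "path": "/path/to/project/package.json" } }
  ],
  "options": { "maxConcurrent": 5, "timeoutMs": 30000, "stopOnError": true }
}
```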

---

## Key Features & Limitations

### Features

1. **Single “Batch Execute” Tool**
   - You simply specify a list of sub‐ops referencing your existing MCP server’s tools.

2. **Parallel Execution**
   - Run multiple sub-ops at once, controlled by `maxConcurrent`.

3. **Timeout & Stop on Error**
   - Each sub-op races a `timeoutMs`, and you can skip remaining ops if one fails.

4. **Connection Caching**
   - Reuses the same connection to the downstream MCP server for repeated calls, closing after an idle timeout.

### Limitations

1. **No Data Passing Mid-Batch**
   - If sub-op #2 depends on #1’s output, do multiple aggregator calls.
2. **No Partial Progress**
   - You get all sub-ops’ results together at the end of each “batch_execute.”
3. **Must Use a Real MCP Server**
   - If you spawn or connect to the aggregator itself, you’ll see “tool not found.” The aggregator only has “batch_execute.”
4. **One Target Server per Call**
   - Each aggregator call references a single target MCP server. If you want multiple servers, you’d do more advanced logic or separate calls.

---

## Installation & Startup

```bash
git clone https://github.com/ryanjoachim/mcp-batchit.git
cd mcp-batchit
npm install
npm run build
npm start
```

BatchIt starts on **STDIO** by default so your AI agent (or any MCP client) can spawn it. For example:

```
mcp-batchit is running on stdio. Ready to batch-execute!
```

You can now send JSON-RPC requests (`tools/call` method with `name: "batch_execute"`) to it.
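
For instance, a minimal `tools/call` envelope might look roughly like this; the `arguments` object uses the same three fields sketched in [Why Use BatchIt](#why-use-batchit) and shown in full under [Multi-Phase Usage](#multi-phase-usage):

```jsonc
{
  "jsonrpc": "2.0",
  "id": 1,
  "method": "tools/call",
  "params": {
    "name": "batch_execute",
    "arguments": {
      // targetServer, operations, options (see the examples below)
    }
  }
}
```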

---

## Memory Bank

Using Cline/Roo Code, you can build a framework of contextual project documentation by leveraging the powerful "Memory Bank" custom instructions developed by Nick Baumann.

[View Memory Bank Documentation](https://github.com/nickbaumann98/cline_docs/blob/main/prompting/custom%20instructions%20library/cline-memory-bank.md)

#### Traditional Approach (19+ calls)

1. Read package.json
2. Wait for response
3. Read README.md
4. Wait for response
5. List code definitions
6. Wait for response
7. Create memory-bank directory
8. Wait for response
9. Write productContext.md
10. Write systemPatterns.md
11. Write techContext.md
12. Write progress.md
13. Write activeContext.md
14. Wait for responses (5 more calls)

Total: ~19 separate API calls (each of the 9 tool operations, plus the round trips needed to process their responses)

#### BatchIt Approach (1-3 calls)

### Multi-Phase Usage

When a multi-step task depends on intermediate output (such as reading files and then generating documentation from their contents), you'll need to handle the process in distinct phases. This is necessary because **BatchIt** doesn't pass data between sub-operations within the same request.

### Implementation Phases

#### Information Gathering

In this initial phase, we gather information from the filesystem by reading necessary files (e.g., `package.json`, `README.md`). This is accomplished through a **batch_execute** call to the filesystem MCP server:

```jsonc
{
  "targetServer": {
    "name": "filesystem",
    "serverType": {
      "type": "filesystem",
      "config": {
        "rootDirectory": "C:/Users/Chewy/Documents/GitHub/ryanjoachim/mcp-batchit"
      }
    },
    "transport": {
      "type": "stdio",
      "command": "cmd.exe",
      "args": [
        "/c",
        "npx",
        "-y",
        "@modelcontextprotocol/server-filesystem",
        "C:/Users/Chewy/Documents/GitHub/ryanjoachim/mcp-batchit"
      ]
    }
  },
  "operations": [
    {
      "tool": "read_file",
      "arguments": {
        "path": "C:/Users/Chewy/Documents/GitHub/ryanjoachim/mcp-batchit/package.json"
      }
    },
    {
      "tool": "read_file",
      "arguments": {
        "path": "C:/Users/Chewy/Documents/GitHub/ryanjoachim/mcp-batchit/README.md"
      }
    }
  ],
  "options": {
    "maxConcurrent": 2,
    "stopOnError": true,
    "timeoutMs": 30000
  }
}
```

**Note**: The aggregator spawns `@modelcontextprotocol/server-filesystem` (via `npx`) to execute parallel `read_file` operations.

#### LLM‐Only Step (List Code Definitions)

This phase involves processing outside the aggregator, typically using LLM or AI agent capabilities:

```xml
<list_code_definition_names>
<path>src</path>
</list_code_definition_names>
```

This step uses Roo Code's `list_code_definition_names` tool, which is built into the agent rather than exposed through MCP. Many MCP servers offer similar code-analysis functionality, however, so this step can often be folded into a batch instead of costing a separate LLM turn.

#### Document Creation

The final phase combines data from previous steps (file contents and code definitions) to generate documentation in the `memory-bank` directory:

```jsonc
{
  "targetServer": {
    "name": "filesystem",
    "serverType": {
      "type": "filesystem",
      "config": {
        "rootDirectory": "C:/Users/Chewy/Documents/GitHub/ryanjoachim/mcp-batchit"
      }
    },
    "transport": {
      "type": "stdio",
      "command": "cmd.exe",
      "args": [
        "/c",
        "npx",
        "-y",
        "@modelcontextprotocol/server-filesystem",
        "C:/Users/Chewy/Documents/GitHub/ryanjoachim/mcp-batchit"
      ]
    }
  },
  "operations": [
    {
      "tool": "create_directory",
      "arguments": {
        "path": "C:/Users/Chewy/Documents/GitHub/ryanjoachim/mcp-batchit/memory-bank"
      }
    },
    {
      "tool": "write_file",
      "arguments": {
        "path": "C:/Users/Chewy/Documents/GitHub/ryanjoachim/mcp-batchit/memory-bank/productContext.md",
        "content": "# MCP BatchIt Product Context\\n\\n## Purpose\\n..."
      }
    },
    {
      "tool": "write_file",
      "arguments": {
        "path": "C:/Users/Chewy/Documents/GitHub/ryanjoachim/mcp-batchit/memory-bank/systemPatterns.md",
        "content": "# MCP BatchIt System Patterns\\n\\n## Architecture Overview\\n..."
      }
    },
    {
      "tool": "write_file",
      "arguments": {
        "path": "C:/Users/Chewy/Documents/GitHub/ryanjoachim/mcp-batchit/memory-bank/techContext.md",
        "content": "# MCP BatchIt Technical Context\\n\\n## Technology Stack\\n..."
      }
    },
    {
      "tool": "write_file",
      "arguments": {
        "path": "C:/Users/Chewy/Documents/GitHub/ryanjoachim/mcp-batchit/memory-bank/progress.md",
        "content": "# MCP BatchIt Progress Status\\n\\n## Completed Features\\n..."
      }
    },
    {
      "tool": "write_file",
      "arguments": {
        "path": "C:/Users/Chewy/Documents/GitHub/ryanjoachim/mcp-batchit/memory-bank/activeContext.md",
        "content": "# MCP BatchIt Active Context\\n\\n## Current Status\\n..."
      }
    }
  ],
  "options": {
    "maxConcurrent": 1,
    "stopOnError": true,
    "timeoutMs": 30000
  }
}
```

The aggregator processes these operations sequentially (`maxConcurrent=1`), creating the directory and writing multiple documentation files. The result array indicates the success/failure status of each operation.
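
The numbers below are illustrative, but the response shape matches what the aggregator assembles in `src/index.ts`: a `summary` block plus one entry per sub-operation, in completion order:

```jsonc
{
  "targetServer": "filesystem",
  "summary": {
    "successCount": 6,
    "failCount": 0,
    "totalDurationMs": 420
  },
  "operations": [
    { "tool": "create_directory", "success": true, "result": { /* tool output */ }, "durationMs": 35 },
    { "tool": "write_file", "success": true, "result": { /* tool output */ }, "durationMs": 80 }
    // ...one entry per sub-operation
  ]
}
```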

---

## FAQ

**Q1: Do I need multiple aggregator calls if sub-op #2 depends on sub-op #1’s results?**
**Yes.** BatchIt doesn’t pass data between sub-ops in the same request. You do multi-phase calls (like the example above).

**Q2: Why do I get “Tool create_directory not found” sometimes?**
Because your `transport` might be pointing to the aggregator script itself instead of the real MCP server. Make sure you reference something like `@modelcontextprotocol/server-filesystem`.

**Q3: Can I do concurrency plus stopOnError?**
Absolutely. If a sub-op fails, we skip launching new sub-ops. Already-running ones finish in parallel.

**Q4: Does BatchIt re-spawn the target server each time?**
By default, yes: with `keepAlive: false` (the default), the connection is closed as soon as the batch finishes. If you set `keepAlive: true` and reuse the exact same `targetServer` identity (name, serverType, transport), the aggregator caches the connection until the idle timeout (`maxIdleTimeMs`, default 5 minutes) expires.
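
A rough sketch of the relevant fields (the 10-minute idle window is an arbitrary illustration; the field names follow `BatchArgsSchema` in `src/index.ts`):

```jsonc
{
  "targetServer": {
    "name": "filesystem",
    // serverType and transport exactly as in the earlier examples
    "maxIdleTimeMs": 600000   // close the cached connection after 10 idle minutes
  },
  "operations": [ /* ... */ ],
  "options": {
    "keepAlive": true         // keep the downstream connection open after this batch
  }
}
```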

**Q5: Are partial results returned if an error occurs in the middle?**
Yes. Each sub-op that finished prior to the error is included in the final aggregator response, along with the failing sub-op. Remaining sub-ops are skipped if `stopOnError` is true.

---

## License

**MIT**

```

--------------------------------------------------------------------------------
/tsconfig.json:
--------------------------------------------------------------------------------

```json
{
  "compilerOptions": {
    "target": "ES2022",
    "module": "Node16",
    "moduleResolution": "node16",
    "declaration": true,
    "declarationMap": true,
    "sourceMap": true,
    "outDir": "./build",
    "rootDir": "./src",
    "strict": true,
    "noImplicitAny": true,
    "strictNullChecks": true,
    "strictFunctionTypes": true,
    "strictBindCallApply": true,
    "strictPropertyInitialization": true,
    "noImplicitThis": true,
    "alwaysStrict": true,
    "noUnusedLocals": true,
    "noUnusedParameters": true,
    "noImplicitReturns": true,
    "noFallthroughCasesInSwitch": true,
    "allowJs": true,
    "resolveJsonModule": true,
    "allowSyntheticDefaultImports": true,
    "esModuleInterop": true,
    "experimentalDecorators": true,
    "emitDecoratorMetadata": true,
    "skipLibCheck": true,
    "forceConsistentCasingInFileNames": true
  },
  "include": ["src/**/*"],
  "exclude": ["node_modules", "build", "**/*.test.ts"]
}

```

--------------------------------------------------------------------------------
/package.json:
--------------------------------------------------------------------------------

```json
{
  "name": "mcp-batchit",
  "version": "1.0.0",
  "description": "Batch multiple MCP tool calls into a single request—reducing overhead and token usage for AI agents",
  "main": "build/index.js",
  "type": "module",
  "types": "build/index.d.ts",
  "bin": {
    "mcp-batchit": "./build/index.js"
  },
  "files": [
    "build"
  ],
  "scripts": {
    "build": "tsc",
    "prepare": "npm run build",
    "watch": "tsc --watch",
    "test": "jest",
    "lint": "eslint src --ext .ts",
    "clean": "rimraf build"
  },
  "config": {
    "run-script": {
      "build": "npm run build"
    }
  },
  "keywords": [
    "mcp",
    "modelcontextprotocol",
    "batch",
    "operations",
    "ai",
    "llm",
    "agent",
    "filesystem",
    "aggregator"
  ],
  "author": "Ryan Joachim",
  "repository": {
    "type": "git",
    "url": "https://github.com/ryanjoachim/mcp-batchit.git"
  },
  "license": "MIT",
  "dependencies": {
    "@modelcontextprotocol/sdk": "^1.4.0"
  },
  "devDependencies": {
    "@types/jest": "^29.5.11",
    "@types/node": "^22.8.0",
    "jest": "^29.7.0",
    "rimraf": "^5.0.5",
    "ts-jest": "^29.1.1",
    "ts-node": "^10.9.2",
    "typescript": "^5.7.2"
  }
}

```

--------------------------------------------------------------------------------
/src/index.ts:
--------------------------------------------------------------------------------

```typescript
#!/usr/bin/env node

import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js"
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js"
import { StdioClientTransport } from "@modelcontextprotocol/sdk/client/stdio.js"
import { Client } from "@modelcontextprotocol/sdk/client/index.js"
import { WebSocketClientTransport } from "@modelcontextprotocol/sdk/client/websocket.js"
import { z } from "zod"
import { McpError, ErrorCode } from "@modelcontextprotocol/sdk/types.js"
import { ChildProcess } from "child_process"
import { existsSync } from "fs"
import { isAbsolute } from "path"

// Array of patterns that indicate self-referential usage
const SELF_REFERENCE_PATTERNS = [
  // Direct file path references
  "mcp-batchit/build/index.js",
  "mcp-batchit/dist/index.js",
  "mcp-batchit/lib/index.js",

  // NPM package references
  "@modelcontextprotocol/batchit",
  "@modelcontextprotocol/server-batchit",

  // Common variations
  "mcp-batchit",
  "batchit",
  "server-batchit",
]

// Transport error handling
enum TransportErrorType {
  CommandNotFound = "CommandNotFound",
  ConnectionFailed = "ConnectionFailed",
  ValidationFailed = "ValidationFailed",
  ConfigurationInvalid = "ConfigurationInvalid",
}

class TransportError extends Error {
  constructor(
    public type: TransportErrorType,
    message: string,
    public cause?: Error
  ) {
    super(message)
    this.name = "TransportError"
    Error.captureStackTrace(this, TransportError)
  }
}

// Server Type Definitions
interface FilesystemServerConfig {
  rootDirectory?: string
  permissions?: string
  watchMode?: boolean
}

interface DatabaseServerConfig {
  database: string
  readOnly?: boolean
  poolSize?: number
}

interface GenericServerConfig {
  [key: string]: unknown
}

type ServerType =
  | { type: "filesystem"; config: FilesystemServerConfig }
  | { type: "database"; config: DatabaseServerConfig }
  | { type: "generic"; config: GenericServerConfig }

// Transport Configuration
type TransportConfig =
  | {
      type: "stdio"
      command: string
      args?: string[]
      env?: Record<string, string>
    }
  | {
      type: "websocket"
      url: string
      options?: Record<string, unknown>
    }

interface ServerIdentity {
  name: string
  serverType: ServerType
  transport: TransportConfig
  maxIdleTimeMs?: number
}

interface ServerConnection {
  client: Client
  transport: WebSocketClientTransport | StdioClientTransport
  childProcess?: ChildProcess
  lastUsed: number
  identity: ServerIdentity
}

interface HPCContentItem {
  type: string
  text?: string
}

interface HPCErrorResponse {
  isError: true
  error?: string
  message?: string
  content?: HPCContentItem[]
}

function isHPCErrorResponse(value: unknown): value is HPCErrorResponse {
  return (
    value !== null &&
    typeof value === "object" &&
    "isError" in value &&
    value.isError === true
  )
}

// Type guard for StdioClientTransport (WebSocket transports have no stderr stream)
function isStdioTransport(transport: any): transport is StdioClientTransport {
  return transport instanceof StdioClientTransport
}

// Schema Definitions
const ServerTypeSchema = z.discriminatedUnion("type", [
  z.object({
    type: z.literal("filesystem"),
    config: z.object({
      rootDirectory: z.string().optional(),
      permissions: z.string().optional(),
      watchMode: z.boolean().optional(),
    }),
  }),
  z.object({
    type: z.literal("database"),
    config: z.object({
      database: z.string(),
      readOnly: z.boolean().optional(),
      poolSize: z.number().optional(),
    }),
  }),
  z.object({
    type: z.literal("generic"),
    config: z.record(z.unknown()),
  }),
])

const TransportConfigSchema = z.discriminatedUnion("type", [
  z.object({
    type: z.literal("stdio"),
    command: z.string(),
    args: z.array(z.string()).optional(),
    env: z.record(z.string()).optional(),
  }),
  z.object({
    type: z.literal("websocket"),
    url: z.string(),
    options: z.record(z.unknown()).optional(),
  }),
])

const BatchArgsSchema = z.object({
  targetServer: z.object({
    name: z.string(),
    serverType: ServerTypeSchema,
    transport: TransportConfigSchema,
    maxIdleTimeMs: z.number().optional(),
  }),
  operations: z.array(
    z.object({
      tool: z.string(),
      arguments: z.record(z.unknown()).default({}),
    })
  ),
  options: z
    .object({
      maxConcurrent: z.number().default(10),
      timeoutMs: z.number().default(30000),
      stopOnError: z.boolean().default(false),
      keepAlive: z.boolean().default(false),
    })
    .default({
      maxConcurrent: 10,
      timeoutMs: 30000,
      stopOnError: false,
      keepAlive: false,
    }),
})

// Connection Management
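// Caches one live connection per unique (name, serverType, transport) identity
// and closes it after maxIdleTimeMs (default: 5 minutes) of inactivity.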
class ConnectionManager {
  private connections = new Map<string, ServerConnection>()
  private cleanupIntervals = new Map<string, NodeJS.Timeout>()

  createKeyForIdentity(identity: ServerIdentity): string {
    return JSON.stringify({
      name: identity.name,
      serverType: identity.serverType,
      transport: identity.transport,
    })
  }

  private validateStdioConfig(
    config: Extract<TransportConfig, { type: "stdio" }>
  ) {
    if (!config.command) {
      throw new TransportError(
        TransportErrorType.ConfigurationInvalid,
        "Command is required for stdio transport"
      )
    }

    if (!config.args?.length) {
      throw new TransportError(
        TransportErrorType.ConfigurationInvalid,
        "At least one argument (server file path) is required"
      )
    }

    // For node commands, validate the file exists
    if (config.command === "node") {
      const serverFile = config.args[0]
      if (!isAbsolute(serverFile)) {
        throw new TransportError(
          TransportErrorType.ConfigurationInvalid,
          "Server file path must be absolute when using node command"
        )
      }
      if (!existsSync(serverFile)) {
        throw new TransportError(
          TransportErrorType.ValidationFailed,
          `Server file not found: ${serverFile}`
        )
      }

      // Prevent the BatchIt aggregator from spawning itself
      const fullCommand = [config.command, ...(config.args || [])].join(" ")
      if (
        SELF_REFERENCE_PATTERNS.some((pattern) =>
          fullCommand.toLowerCase().includes(pattern.toLowerCase())
        )
      ) {
        throw new TransportError(
          TransportErrorType.ConfigurationInvalid,
          "Cannot spawn the BatchIt aggregator itself. Provide a valid MCP server file instead."
        )
      }
    }
  }

  private validateWebSocketConfig(
    config: Extract<TransportConfig, { type: "websocket" }>
  ) {
    try {
      const url = new URL(config.url)
      if (url.protocol !== "ws:" && url.protocol !== "wss:") {
        throw new TransportError(
          TransportErrorType.ConfigurationInvalid,
          "WebSocket URL must use ws:// or wss:// protocol"
        )
      }
    } catch (error) {
      throw new TransportError(
        TransportErrorType.ConfigurationInvalid,
        "Invalid WebSocket URL",
        error instanceof Error ? error : undefined
      )
    }
  }

  async getOrCreateConnection(
    identity: ServerIdentity
  ): Promise<ServerConnection> {
    const serverKey = this.createKeyForIdentity(identity)

    if (this.connections.has(serverKey)) {
      const conn = this.connections.get(serverKey)!
      conn.lastUsed = Date.now()
      return conn
    }

    const transport = await this.createTransport(identity.transport)
    const client = new Client(
      { name: "mcp-batchit", version: "1.0.0" },
      { capabilities: {} }
    )

    await client.connect(transport)

    const connection: ServerConnection = {
      client,
      transport,
      lastUsed: Date.now(),
      identity,
    }

    this.connections.set(serverKey, connection)
    this.setupMonitoring(serverKey, connection)
    this.setupCleanupInterval(serverKey)

    return connection
  }

  private async createTransport(
    config: TransportConfig
  ): Promise<WebSocketClientTransport | StdioClientTransport> {
    switch (config.type) {
      case "stdio": {
        try {
          this.validateStdioConfig(config)

          try {
            const transport = new StdioClientTransport({
              command: config.command,
              args: config.args,
              env: config.env,
              stderr: "pipe",
            })

            // The child process is not spawned here; it starts when the client connects
            return transport
          } catch (error) {
            if (
              error &&
              typeof error === "object" &&
              "code" in error &&
              error.code === "ENOENT"
            ) {
              throw new TransportError(
                TransportErrorType.CommandNotFound,
                `Command '${config.command}' not found in PATH. If using 'npx', ensure it's installed globally. Consider using 'node' with direct path to server JS file instead.`
              )
            }
            throw error
          }
        } catch (error) {
          if (error instanceof TransportError) {
            throw new McpError(ErrorCode.InvalidParams, error.message)
          } else {
            throw new McpError(
              ErrorCode.InternalError,
              `Failed to create stdio transport: ${
                error instanceof Error ? error.message : String(error)
              }`
            )
          }
        }
      }

      case "websocket": {
        try {
          this.validateWebSocketConfig(config)

          const wsUrl =
            config.url.startsWith("ws://") || config.url.startsWith("wss://")
              ? config.url
              : `ws://${config.url}`

          const transport = new WebSocketClientTransport(new URL(wsUrl))
          return transport
        } catch (error) {
          if (error instanceof TransportError) {
            throw new McpError(ErrorCode.InvalidParams, error.message)
          } else {
            throw new McpError(
              ErrorCode.InternalError,
              `Failed to create WebSocket transport: ${
                error instanceof Error ? error.message : String(error)
              }`
            )
          }
        }
      }
    }
  }

  private setupMonitoring(
    serverKey: string,
    connection: ServerConnection
  ): void {
    if (isStdioTransport(connection.transport)) {
      // For stdio transports, we can monitor stderr
      const stderr = connection.transport.stderr
      if (stderr) {
        stderr.on("data", (data: Buffer) => {
          console.error(`[${connection.identity.name}] ${data.toString()}`)
        })
      }
    }

    // Monitor transport errors
    connection.transport.onerror = (error: Error) => {
      console.error(`Transport error:`, error)
      this.closeConnection(serverKey)
    }
  }

  private setupCleanupInterval(serverKey: string): void {
    const interval = setInterval(() => {
      const conn = this.connections.get(serverKey)
      if (!conn) return

      const idleTime = Date.now() - conn.lastUsed
      if (idleTime > (conn.identity.maxIdleTimeMs ?? 300000)) {
        // 5min default
        this.closeConnection(serverKey)
      }
    }, 60000) // Check every minute

    this.cleanupIntervals.set(serverKey, interval)
  }

  async closeConnection(serverKey: string): Promise<void> {
    const conn = this.connections.get(serverKey)
    if (!conn) return

    try {
      await conn.client.close()
      await conn.transport.close()
    } catch (error) {
      console.error(`Error closing connection for ${serverKey}:`, error)
    }

    this.connections.delete(serverKey)

    const interval = this.cleanupIntervals.get(serverKey)
    if (interval) {
      clearInterval(interval)
      this.cleanupIntervals.delete(serverKey)
    }
  }

  async closeAll(): Promise<void> {
    for (const serverKey of this.connections.keys()) {
      await this.closeConnection(serverKey)
    }
  }
}

// Batch Execution
interface Operation {
  tool: string
  arguments: Record<string, unknown>
}

interface OperationResult {
  tool: string
  success: boolean
  result?: unknown
  error?: string
  durationMs: number
}

class BatchExecutor {
  constructor(private connectionManager: ConnectionManager) {}

  async executeBatch(
    identity: ServerIdentity,
    operations: Operation[],
    options: {
      maxConcurrent: number
      timeoutMs: number
      stopOnError: boolean
      keepAlive?: boolean
    }
  ): Promise<OperationResult[]> {
    const connection = await this.connectionManager.getOrCreateConnection(
      identity
    )

    const results: OperationResult[] = []
    const pending = [...operations]
    const running = new Set<Promise<OperationResult>>()

    try {
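      // Work-pool loop: keep up to `maxConcurrent` operations in flight at once.
      // Results are collected in completion order; if an operation fails and
      // `stopOnError` is set, the remaining queue is cleared (in-flight ops still finish).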
      while (pending.length > 0 || running.size > 0) {
        while (pending.length > 0 && running.size < options.maxConcurrent) {
          const op = pending.shift()!
          const promise = this.executeOperation(
            connection,
            op,
            options.timeoutMs
          )
          running.add(promise)

          promise.then((res) => {
            running.delete(promise)
            results.push(res)
            if (!res.success && options.stopOnError) {
              pending.length = 0
            }
          })
        }

        if (running.size > 0) {
          await Promise.race(running)
        }
      }
    } finally {
      if (!options.keepAlive) {
        await this.connectionManager.closeConnection(
          this.connectionManager.createKeyForIdentity(identity)
        )
      }
    }

    return results
  }

  private getErrorMessage(result: HPCErrorResponse): string {
    // Direct error/message properties
    if (result.error || result.message) {
      return result.error ?? result.message ?? "Unknown HPC error"
    }

    // Look for error in content array
    if (result.content?.length) {
      const textContent = result.content
        .filter((item) => item.type === "text" && item.text)
        .map((item) => item.text)
        .filter((text): text is string => text !== undefined)
        .join(" ")

      if (textContent) {
        return textContent
      }
    }

    return "Unknown HPC error"
  }

  private async executeOperation(
    connection: ServerConnection,
    operation: Operation,
    timeoutMs: number
  ): Promise<OperationResult> {
    const start = Date.now()
    try {
      const result = await Promise.race([
        connection.client.callTool({
          name: operation.tool,
          arguments: operation.arguments,
        }),
        new Promise<never>((_, reject) =>
          setTimeout(
            () =>
              reject(
                new McpError(ErrorCode.RequestTimeout, "Operation timed out")
              ),
            timeoutMs
          )
        ),
      ])

      if (isHPCErrorResponse(result)) {
        return {
          tool: operation.tool,
          success: false,
          error: this.getErrorMessage(result),
          durationMs: Date.now() - start,
        }
      }

      return {
        tool: operation.tool,
        success: true,
        result,
        durationMs: Date.now() - start,
      }
    } catch (error) {
      return {
        tool: operation.tool,
        success: false,
        error: error instanceof Error ? error.message : String(error),
        durationMs: Date.now() - start,
      }
    }
  }
}

// Server Setup
const connectionManager = new ConnectionManager()
const batchExecutor = new BatchExecutor(connectionManager)
const server = new McpServer({
  name: "mcp-batchit",
  version: "1.0.0",
})

// Define the tool's schema shape (required properties for tool registration)
const toolSchema = {
  targetServer: BatchArgsSchema.shape.targetServer,
  operations: BatchArgsSchema.shape.operations,
  options: BatchArgsSchema.shape.options,
}

server.tool(
  "batch_execute",
  `
Execute multiple operations in batch on a specified MCP server. You must provide a real MCP server (like @modelcontextprotocol/server-filesystem). The aggregator will reject any attempt to spawn itself.

Transport Configuration:

1. For stdio transport (recommended for local servers):
   Using node with direct file path (preferred):
   {
     "transport": {
       "type": "stdio",
       "command": "node",
       "args": ["C:/path/to/server.js"]
     }
   }

   Using npx (requires global npx installation):
   {
     "transport": {
       "type": "stdio",
       "command": "npx",
       "args": ["@modelcontextprotocol/server-filesystem"]
     }
   }

2. For WebSocket transport (for connecting to running servers):
   {
     "transport": {
       "type": "websocket",
       "url": "ws://localhost:3000"
     }
   }

Usage:
  - Provide "targetServer" configuration with:
    - name: Unique identifier for the server
    - serverType: Type and configuration of the server (filesystem, database, or generic)
    - transport: Connection method (stdio or websocket) and its configuration
  - Provide "operations" as an array of objects with:
    - tool: The tool name on the target server
    - arguments: The JSON arguments to pass
  - Options:
    - maxConcurrent: Maximum concurrent operations (default: 10)
    - timeoutMs: Timeout per operation in milliseconds (default: 30000)
    - stopOnError: Whether to stop on first error (default: false)
    - keepAlive: Keep connection after batch completion (default: false)

Complete Example:
  {
    "targetServer": {
      "name": "local-fs",
      "serverType": {
        "type": "filesystem",
        "config": {
          "rootDirectory": "C:/data",
          "watchMode": true
        }
      },
       "transport": {
        "type": "stdio",
        "command": "node",
         "args": ["C:/path/to/filesystem-server.js"]
      }
    },
     "operations": [
      { "tool": "createFile", "arguments": { "path": "test1.txt", "content": "Hello" } },
      { "tool": "createFile", "arguments": { "path": "test2.txt", "content": "World" } }
    ],
    "options": {
      "maxConcurrent": 3,
      "stopOnError": true
    }
  }`,
  toolSchema,
  async (args) => {
    const parsed = BatchArgsSchema.safeParse(args)
    if (!parsed.success) {
      throw new McpError(ErrorCode.InvalidParams, parsed.error.message)
    }

    const { targetServer, operations, options } = parsed.data

    const results = await batchExecutor.executeBatch(
      targetServer,
      operations,
      options
    )

    return {
      content: [
        {
          type: "text",
          text: JSON.stringify(
            {
              targetServer: targetServer.name,
              summary: {
                successCount: results.filter((r) => r.success).length,
                failCount: results.filter((r) => !r.success).length,
                totalDurationMs: results.reduce(
                  (sum, r) => sum + r.durationMs,
                  0
                ),
              },
              operations: results,
            },
            null,
            2
          ),
        },
      ],
    }
  }
)

// Startup
;(async function main() {
  const transport = new StdioServerTransport()
  await server.connect(transport)

  console.error("mcp-batchit is running on stdio. Ready to batch-execute!")

  process.on("SIGINT", cleanup)
  process.on("SIGTERM", cleanup)
})().catch((err) => {
  console.error("Fatal error in aggregator server:", err)
  process.exit(1)
})

async function cleanup() {
  console.error("Shutting down, closing all connections...")
  await connectionManager.closeAll()
  await server.close()
  process.exit(0)
}

```