# Directory Structure
```
├── .gitignore
├── package.json
├── README.md
├── src
│ └── index.ts
└── tsconfig.json
```
# Files
--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------
```
# Dependencies
node_modules/
package-lock.json
# Build output
build/
dist/
*.js
*.js.map
*.d.ts
*.d.ts.map
# IDEs and editors
.idea/
.vscode/
*.swp
*.swo
.DS_Store
docs/
memory-bank/
# Logs
logs/
*.log
npm-debug.log*
yarn-debug.log*
yarn-error.log*
# Environment variables
.env
.env.*
```
--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------
```markdown
<div align="center">
# MCP BatchIt
**Batch multiple MCP tool calls into a single "batch_execute" request—reducing overhead and token usage for AI agents.**
[License: MIT](https://opensource.org/licenses/MIT)
</div>
---
## Table of Contents
1. [Introduction](#introduction)
2. [Why Use BatchIt](#why-use-batchit)
3. [Key Features & Limitations](#key-features--limitations)
4. [Installation & Startup](#installation--startup)
5. [Multi-Phase Usage](#multi-phase-usage)
- [Implementation Phases](#implementation-phases)
- [Information Gathering](#information-gathering)
- [LLM‐Only Step (List Code Definitions)](#llm-only-step-list-code-definitions)
- [Document Creation](#document-creation)
6. [FAQ](#faq)
7. [License](#license)
---
## Introduction
> ⚠️ **NOTICE: Work in Progress**
>
> This project is actively being developed to address several complex challenges:
> - Maintaining backwards compatibility with existing MCP servers
> - Resolving transport complexities with multi-connection clients (Cline, Roo, Claude Desktop)
> - Creating a beginner-friendly implementation
>
> While functional, expect ongoing improvements and changes as we refine the solution.
**MCP BatchIt** is a simple aggregator server in the [Model Context Protocol (MCP)](https://modelcontext.ai/) ecosystem. It exposes just **one** tool: **`batch_execute`**. Rather than calling multiple MCP tools (like `fetch`, `read_file`, `create_directory`, `write_file`, etc.) in **separate** messages, you can **batch** them together in one aggregator request.
This cuts the number of request/response round trips, and with them the token usage, network overhead, and repeated context in your AI agent or LLM conversation.
---
## Why Use BatchIt
- **One Action per Message** Problem:
Normally, an LLM or AI agent can only call a single MCP tool at a time, forcing multiple calls for multi-step tasks.
- **Excessive Round Trips**:
10 separate file operations might require 10 messages → 10 responses.
- **BatchIt’s Approach**:
1. Takes a single `batch_execute` request.
2. Spawns (or connects to) the actual target MCP server (like a filesystem server) behind the scenes.
3. Runs each sub-operation (tool call) in parallel up to `maxConcurrent`.
4. If one sub-op fails and `stopOnError` is true, it halts new sub-ops.
5. Returns one consolidated JSON result.
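
The steps above can be pictured as a small concurrency-limited loop. This is a minimal sketch, not BatchIt's actual source: `runBatch`, `Task`, and `TaskResult` are hypothetical names, and each task stands in for one tool call on the target server.

```typescript
// Hypothetical types for illustration; BatchIt's real types live in src/index.ts.
type Task = () => Promise<unknown>

interface TaskResult {
  index: number
  success: boolean
  value?: unknown
  error?: string
}

// Run tasks with at most `maxConcurrent` in flight. When `stopOnError` is set,
// a failure prevents queued tasks from starting; in-flight tasks still finish.
async function runBatch(
  tasks: Task[],
  maxConcurrent: number,
  stopOnError: boolean
): Promise<TaskResult[]> {
  const results: TaskResult[] = []
  let next = 0
  let halted = false

  // Each worker pulls the next queued task until the queue is empty or halted.
  async function worker(): Promise<void> {
    while (!halted && next < tasks.length) {
      const index = next++
      try {
        const value = await tasks[index]()
        results.push({ index, success: true, value })
      } catch (err) {
        results.push({
          index,
          success: false,
          error: err instanceof Error ? err.message : String(err),
        })
        if (stopOnError) halted = true
      }
    }
  }

  const workerCount = Math.max(1, Math.min(maxConcurrent, tasks.length))
  await Promise.all(Array.from({ length: workerCount }, () => worker()))
  return results
}
```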
---
## Key Features & Limitations
### Features
1. **Single “Batch Execute” Tool**
- You simply specify a list of sub‐ops referencing your existing MCP server’s tools.
2. **Parallel Execution**
- Run multiple sub-ops at once, controlled by `maxConcurrent`.
3. **Timeout & Stop on Error**
- Each sub-op is raced against a `timeoutMs` timer, and with `stopOnError` you can skip the remaining ops if one fails.
4. **Connection Caching**
- Reuses the same connection to the downstream MCP server for repeated calls when `keepAlive` is `true`, closing it after an idle timeout.
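
The per-operation timeout in feature 3 amounts to racing the tool call against a timer. Here is a minimal sketch of that idea, assuming a hypothetical helper named `withTimeout` (not part of BatchIt's API). It also clears the timer once the race settles, so a finished batch leaves no pending timers behind.

```typescript
// Hypothetical helper: reject if `work` has not settled within `timeoutMs`.
function withTimeout<T>(work: Promise<T>, timeoutMs: number): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined
  const timeout = new Promise<never>((_, reject) => {
    timer = setTimeout(() => reject(new Error("Operation timed out")), timeoutMs)
  })
  // Whichever promise settles first wins; the timer is cleared either way.
  return Promise.race([work, timeout]).finally(() => {
    if (timer !== undefined) clearTimeout(timer)
  })
}
```

Note that losing the race does not cancel the underlying work: a timed-out tool call may still complete on the target server, but its result is discarded.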
### Limitations
1. **No Data Passing Mid-Batch**
- If sub-op #2 depends on #1’s output, split the work across multiple aggregator calls.
2. **No Partial Progress**
- You get all sub-ops’ results together at the end of each “batch_execute.”
3. **Must Use a Real MCP Server**
- If you spawn or connect to the aggregator itself, you’ll see “tool not found.” The aggregator only has “batch_execute.”
4. **One Target Server per Call**
- Each aggregator call references a single target MCP server. To work with multiple servers, issue a separate `batch_execute` call for each one.
---
## Installation & Startup
```bash
git clone https://github.com/ryanjoachim/mcp-batchit.git
cd mcp-batchit
npm install
npm run build
node build/index.js
```
BatchIt listens on **STDIO**, so your AI agent (or any MCP client) can spawn it as a child process. On startup it logs the following line to stderr:
```
mcp-batchit is running on stdio. Ready to batch-execute!
```
You can now send it JSON-RPC requests using the `tools/call` method with `name: "batch_execute"`.
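
For reference, a `tools/call` request is an ordinary JSON-RPC 2.0 message. The sketch below builds one in TypeScript. `buildBatchRequest` is a hypothetical helper, the `id` is arbitrary, and the paths are placeholders. An MCP client normally completes the `initialize` handshake before sending tool calls, so most users would let a client SDK produce this message rather than write it by hand.

```typescript
// Hypothetical helper that wraps batch_execute arguments in a JSON-RPC 2.0 envelope.
interface BatchExecuteArgs {
  targetServer: Record<string, unknown>
  operations: { tool: string; arguments: Record<string, unknown> }[]
  options?: Record<string, unknown>
}

function buildBatchRequest(id: number, args: BatchExecuteArgs) {
  return {
    jsonrpc: "2.0" as const,
    id,
    method: "tools/call" as const,
    params: { name: "batch_execute" as const, arguments: args },
  }
}

// Example payload; the target server and file paths are placeholders.
const request = buildBatchRequest(1, {
  targetServer: {
    name: "filesystem",
    serverType: { type: "filesystem", config: {} },
    transport: {
      type: "stdio",
      command: "npx",
      args: ["-y", "@modelcontextprotocol/server-filesystem", "/path/to/project"],
    },
  },
  operations: [
    { tool: "read_file", arguments: { path: "/path/to/project/package.json" } },
  ],
})

// On the stdio transport, each message is sent as one line of JSON.
const wire = JSON.stringify(request) + "\n"
```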
---
## Memory Bank
Using Cline/Roo Code, you can build a framework of contextual project documentation by leveraging the powerful "Memory Bank" custom instructions developed by Nick Baumann.
[View Memory Bank Documentation](https://github.com/nickbaumann98/cline_docs/blob/main/prompting/custom%20instructions%20library/cline-memory-bank.md)
#### Traditional Approach (19+ calls):
1. Read package.json
2. Wait for response
3. Read README.md
4. Wait for response
5. List code definitions
6. Wait for response
7. Create memory-bank directory
8. Wait for response
9. Write productContext.md
10. Write systemPatterns.md
11. Write techContext.md
12. Write progress.md
13. Write activeContext.md
14. Wait for responses (5 more calls)
Total: ~19 separate API calls (13 operations + 6 response waits)
#### BatchIt Approach (1-3 calls)
### Multi-Phase Usage
When working with complex multi-step tasks that depend on real-time output (such as reading files and generating documentation), you'll need to handle the process in distinct phases. This is necessary because **BatchIt** doesn't support data passing between sub-operations within the same request.
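
One way to picture the hand-off between phases: the client keeps the output of the first batch, lets the LLM produce new content from it, and then builds the operations for the next batch itself. The sketch below assumes a hypothetical helper, `buildWriteOperations`, and a hypothetical `documents` map standing in for whatever the LLM generated. Only the `tool`/`arguments` shape comes from BatchIt's request format.

```typescript
interface Operation {
  tool: string
  arguments: Record<string, unknown>
}

// Turn generated documents into operations for a follow-up batch:
// one create_directory, then one write_file per document.
function buildWriteOperations(
  outputDir: string,
  documents: Record<string, string>
): Operation[] {
  const writes = Object.entries(documents).map(([fileName, content]) => ({
    tool: "write_file",
    arguments: { path: `${outputDir}/${fileName}`, content },
  }))
  return [
    { tool: "create_directory", arguments: { path: outputDir } },
    ...writes,
  ]
}
```

Sending these operations with `maxConcurrent: 1` keeps them in order, so the directory exists before the first write starts.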
### Implementation Phases
#### Information Gathering
In this initial phase, we gather information from the filesystem by reading necessary files (e.g., `package.json`, `README.md`). This is accomplished through a **batch_execute** call to the filesystem MCP server:
```jsonc
{
"targetServer": {
"name": "filesystem",
"serverType": {
"type": "filesystem",
"config": {
"rootDirectory": "C:/Users/Chewy/Documents/GitHub/ryanjoachim/mcp-batchit"
}
},
"transport": {
"type": "stdio",
"command": "cmd.exe",
"args": [
"/c",
"npx",
"-y",
"@modelcontextprotocol/server-filesystem",
"C:/Users/Chewy/Documents/GitHub/ryanjoachim/mcp-batchit"
]
}
},
"operations": [
{
"tool": "read_file",
"arguments": {
"path": "C:/Users/Chewy/Documents/GitHub/ryanjoachim/mcp-batchit/package.json"
}
},
{
"tool": "read_file",
"arguments": {
"path": "C:/Users/Chewy/Documents/GitHub/ryanjoachim/mcp-batchit/README.md"
}
}
],
"options": {
"maxConcurrent": 2,
"stopOnError": true,
"timeoutMs": 30000
}
}
```
**Note**: The aggregator spawns `@modelcontextprotocol/server-filesystem` (via `npx`) to execute parallel `read_file` operations.
#### LLM‐Only Step (List Code Definitions)
This phase involves processing outside the aggregator, typically using LLM or AI agent capabilities:
```xml
<list_code_definition_names>
<path>src</path>
</list_code_definition_names>
```
This step utilizes Roo Code's `list_code_definition_names` tool, which is exclusively available to LLMs. However, note that many MCP servers can provide similar functionality, making it possible to complete this process without LLM requests.
#### Document Creation
The final phase combines data from previous steps (file contents and code definitions) to generate documentation in the `memory-bank` directory:
```jsonc
{
"targetServer": {
"name": "filesystem",
"serverType": {
"type": "filesystem",
"config": {
"rootDirectory": "C:/Users/Chewy/Documents/GitHub/ryanjoachim/mcp-batchit"
}
},
"transport": {
"type": "stdio",
"command": "cmd.exe",
"args": [
"/c",
"npx",
"-y",
"@modelcontextprotocol/server-filesystem",
"C:/Users/Chewy/Documents/GitHub/ryanjoachim/mcp-batchit"
]
}
},
"operations": [
{
"tool": "create_directory",
"arguments": {
"path": "C:/Users/Chewy/Documents/GitHub/ryanjoachim/mcp-batchit/memory-bank"
}
},
{
"tool": "write_file",
"arguments": {
"path": "C:/Users/Chewy/Documents/GitHub/ryanjoachim/mcp-batchit/memory-bank/productContext.md",
"content": "# MCP BatchIt Product Context\n\n## Purpose\n..."
}
},
{
"tool": "write_file",
"arguments": {
"path": "C:/Users/Chewy/Documents/GitHub/ryanjoachim/mcp-batchit/memory-bank/systemPatterns.md",
"content": "# MCP BatchIt System Patterns\n\n## Architecture Overview\n..."
}
},
{
"tool": "write_file",
"arguments": {
"path": "C:/Users/Chewy/Documents/GitHub/ryanjoachim/mcp-batchit/memory-bank/techContext.md",
"content": "# MCP BatchIt Technical Context\n\n## Technology Stack\n..."
}
},
{
"tool": "write_file",
"arguments": {
"path": "C:/Users/Chewy/Documents/GitHub/ryanjoachim/mcp-batchit/memory-bank/progress.md",
"content": "# MCP BatchIt Progress Status\n\n## Completed Features\n..."
}
},
{
"tool": "write_file",
"arguments": {
"path": "C:/Users/Chewy/Documents/GitHub/ryanjoachim/mcp-batchit/memory-bank/activeContext.md",
"content": "# MCP BatchIt Active Context\n\n## Current Status\n..."
}
}
],
"options": {
"maxConcurrent": 1,
"stopOnError": true,
"timeoutMs": 30000
}
}
```
The aggregator processes these operations sequentially (`maxConcurrent=1`), so `create_directory` completes before any `write_file` runs. The result array reports the success or failure of each operation.
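
A client can unpack that result with a few lines of TypeScript. The field names below (`summary`, `successCount`, `failCount`, `operations`, `success`, `error`, `durationMs`) mirror what `src/index.ts` serializes. `parseBatchResponse` and `failedTools` are hypothetical helper names used only for this sketch.

```typescript
interface OperationResult {
  tool: string
  success: boolean
  result?: unknown
  error?: string
  durationMs: number
}

interface BatchResponse {
  targetServer: string
  summary: { successCount: number; failCount: number; totalDurationMs: number }
  operations: OperationResult[]
}

// The aggregator returns a single text content item whose text is JSON.
function parseBatchResponse(toolResult: {
  content: { type: string; text?: string }[]
}): BatchResponse {
  const item = toolResult.content.find(
    (c) => c.type === "text" && c.text !== undefined
  )
  if (!item || item.text === undefined) {
    throw new Error("No text content in batch_execute result")
  }
  return JSON.parse(item.text) as BatchResponse
}

// List each failed sub-operation as "tool: error message".
function failedTools(response: BatchResponse): string[] {
  return response.operations
    .filter((op) => !op.success)
    .map((op) => `${op.tool}: ${op.error ?? "unknown error"}`)
}
```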
---
## FAQ
**Q1: Do I need multiple aggregator calls if sub-op #2 depends on sub-op #1’s results?**
**Yes.** BatchIt doesn’t pass data between sub-ops in the same request, so split the work into multi-phase calls (like the example above).
**Q2: Why do I get “Tool create_directory not found” sometimes?**
Because your `transport` might be pointing to the aggregator script itself instead of the real MCP server. Make sure you reference something like `@modelcontextprotocol/server-filesystem`.
**Q3: Can I do concurrency plus stopOnError?**
Absolutely. If a sub-op fails, we skip launching new sub-ops. Already-running ones finish in parallel.
**Q4: Does BatchIt re-spawn the target server each time?**
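**By default, yes.** `keepAlive` defaults to `false`, so BatchIt closes the connection to the target server when the batch finishes and spawns it again on the next call. With `keepAlive: true`, a later call with exactly the same `targetServer.name`, `serverType`, and `transport` reuses the cached connection until it has been idle longer than `maxIdleTimeMs` (5 minutes by default).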
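
A rough sketch of that caching rule, using hypothetical names (`cacheKey`, `isIdle`) and simplified types. The 5-minute default matches the fallback used in `src/index.ts`, where the key is built from `name`, `serverType`, and `transport`.

```typescript
interface Identity {
  name: string
  serverType: unknown
  transport: unknown
  maxIdleTimeMs?: number
}

// Two requests share a connection only if these three fields serialize identically.
function cacheKey(identity: Identity): string {
  return JSON.stringify({
    name: identity.name,
    serverType: identity.serverType,
    transport: identity.transport,
  })
}

// A cached connection is evicted once it has been unused longer than the idle limit.
function isIdle(lastUsed: number, now: number, maxIdleTimeMs = 300000): boolean {
  return now - lastUsed > maxIdleTimeMs
}
```

Because this sketch keys on `JSON.stringify`, even a small difference in the transport configuration (an extra argument, a different path) produces a different key and therefore a separate connection.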
**Q5: Are partial results returned if an error occurs in the middle?**
Yes. Each sub-op that finished prior to the error is included in the final aggregator response, along with the failing sub-op. Remaining sub-ops are skipped if `stopOnError` is true.
---
## License
**MIT**
```
--------------------------------------------------------------------------------
/tsconfig.json:
--------------------------------------------------------------------------------
```json
{
"compilerOptions": {
"target": "ES2022",
"module": "Node16",
"moduleResolution": "node16",
"declaration": true,
"declarationMap": true,
"sourceMap": true,
"outDir": "./build",
"rootDir": "./src",
"strict": true,
"noImplicitAny": true,
"strictNullChecks": true,
"strictFunctionTypes": true,
"strictBindCallApply": true,
"strictPropertyInitialization": true,
"noImplicitThis": true,
"alwaysStrict": true,
"noUnusedLocals": true,
"noUnusedParameters": true,
"noImplicitReturns": true,
"noFallthroughCasesInSwitch": true,
"allowJs": true,
"resolveJsonModule": true,
"allowSyntheticDefaultImports": true,
"esModuleInterop": true,
"experimentalDecorators": true,
"emitDecoratorMetadata": true,
"skipLibCheck": true,
"forceConsistentCasingInFileNames": true
},
"include": ["src/**/*"],
"exclude": ["node_modules", "build", "**/*.test.ts"]
}
```
--------------------------------------------------------------------------------
/package.json:
--------------------------------------------------------------------------------
```json
{
"name": "mcp-batchit",
"version": "1.0.0",
"description": "Batch multiple MCP tool calls into a single request—reducing overhead and token usage for AI agents",
"main": "build/index.js",
"type": "module",
"types": "build/index.d.ts",
"bin": {
"mcp-batchit": "./build/index.js"
},
"files": [
"build"
],
"scripts": {
"build": "tsc",
"prepare": "npm run build",
"watch": "tsc --watch",
"test": "jest",
"lint": "eslint src --ext .ts",
"clean": "rimraf build"
},
"config": {
"run-script": {
"build": "npm run build"
}
},
"keywords": [
"mcp",
"modelcontextprotocol",
"batch",
"operations",
"ai",
"llm",
"agent",
"filesystem",
"aggregator"
],
"author": "Ryan Joachim",
"repository": {
"type": "git",
"url": "https://github.com/ryanjoachim/mcp-batchit.git"
},
"license": "MIT",
"dependencies": {
"@modelcontextprotocol/sdk": "^1.4.0"
},
"devDependencies": {
"@types/jest": "^29.5.11",
"@types/node": "^22.8.0",
"jest": "^29.7.0",
"rimraf": "^5.0.5",
"ts-jest": "^29.1.1",
"ts-node": "^10.9.2",
"typescript": "^5.7.2"
}
}
```
--------------------------------------------------------------------------------
/src/index.ts:
--------------------------------------------------------------------------------
```typescript
#!/usr/bin/env node
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js"
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js"
import { StdioClientTransport } from "@modelcontextprotocol/sdk/client/stdio.js"
import { Client } from "@modelcontextprotocol/sdk/client/index.js"
import { WebSocketClientTransport } from "@modelcontextprotocol/sdk/client/websocket.js"
import { z } from "zod"
import { McpError, ErrorCode } from "@modelcontextprotocol/sdk/types.js"
import { ChildProcess } from "child_process"
import { existsSync } from "fs"
import { isAbsolute } from "path"
// Array of patterns that indicate self-referential usage
const SELF_REFERENCE_PATTERNS = [
// Direct file path references
"mcp-batchit/build/index.js",
"mcp-batchit/dist/index.js",
"mcp-batchit/lib/index.js",
// NPM package references
"@modelcontextprotocol/batchit",
"@modelcontextprotocol/server-batchit",
// Common variations
"mcp-batchit",
"batchit",
"server-batchit",
]
// Transport error handling
enum TransportErrorType {
CommandNotFound = "CommandNotFound",
ConnectionFailed = "ConnectionFailed",
ValidationFailed = "ValidationFailed",
ConfigurationInvalid = "ConfigurationInvalid",
}
class TransportError extends Error {
constructor(
public type: TransportErrorType,
message: string,
public cause?: Error
) {
super(message)
this.name = "TransportError"
Error.captureStackTrace(this, TransportError)
}
}
// Server Type Definitions
interface FilesystemServerConfig {
rootDirectory?: string
permissions?: string
watchMode?: boolean
}
interface DatabaseServerConfig {
database: string
readOnly?: boolean
poolSize?: number
}
interface GenericServerConfig {
[key: string]: unknown
}
type ServerType =
| { type: "filesystem"; config: FilesystemServerConfig }
| { type: "database"; config: DatabaseServerConfig }
| { type: "generic"; config: GenericServerConfig }
// Transport Configuration
type TransportConfig =
| {
type: "stdio"
command: string
args?: string[]
env?: Record<string, string>
}
| {
type: "websocket"
url: string
options?: Record<string, unknown>
}
interface ServerIdentity {
name: string
serverType: ServerType
transport: TransportConfig
maxIdleTimeMs?: number
}
interface ServerConnection {
client: Client
transport: WebSocketClientTransport | StdioClientTransport
childProcess?: ChildProcess
lastUsed: number
identity: ServerIdentity
}
interface HPCContentItem {
type: string
text?: string
}
interface HPCErrorResponse {
isError: true
error?: string
message?: string
content?: HPCContentItem[]
}
function isHPCErrorResponse(value: unknown): value is HPCErrorResponse {
return (
value !== null &&
typeof value === "object" &&
"isError" in value &&
value.isError === true
)
}
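// Type guard for StdioClientTransport.
// Every transport implements start(), so checking for "start" cannot tell them apart;
// use instanceof instead.
function isStdioTransport(
transport: unknown
): transport is StdioClientTransport {
return transport instanceof StdioClientTransport
}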
// Schema Definitions
const ServerTypeSchema = z.discriminatedUnion("type", [
z.object({
type: z.literal("filesystem"),
config: z.object({
rootDirectory: z.string().optional(),
permissions: z.string().optional(),
watchMode: z.boolean().optional(),
}),
}),
z.object({
type: z.literal("database"),
config: z.object({
database: z.string(),
readOnly: z.boolean().optional(),
poolSize: z.number().optional(),
}),
}),
z.object({
type: z.literal("generic"),
config: z.record(z.unknown()),
}),
])
const TransportConfigSchema = z.discriminatedUnion("type", [
z.object({
type: z.literal("stdio"),
command: z.string(),
args: z.array(z.string()).optional(),
env: z.record(z.string()).optional(),
}),
z.object({
type: z.literal("websocket"),
url: z.string(),
options: z.record(z.unknown()).optional(),
}),
])
const BatchArgsSchema = z.object({
targetServer: z.object({
name: z.string(),
serverType: ServerTypeSchema,
transport: TransportConfigSchema,
maxIdleTimeMs: z.number().optional(),
}),
operations: z.array(
z.object({
tool: z.string(),
arguments: z.record(z.unknown()).default({}),
})
),
options: z
.object({
maxConcurrent: z.number().default(10),
timeoutMs: z.number().default(30000),
stopOnError: z.boolean().default(false),
keepAlive: z.boolean().default(false),
})
.default({
maxConcurrent: 10,
timeoutMs: 30000,
stopOnError: false,
keepAlive: false,
}),
})
// Connection Management
class ConnectionManager {
private connections = new Map<string, ServerConnection>()
private cleanupIntervals = new Map<string, NodeJS.Timeout>()
createKeyForIdentity(identity: ServerIdentity): string {
return JSON.stringify({
name: identity.name,
serverType: identity.serverType,
transport: identity.transport,
})
}
private validateStdioConfig(
config: Extract<TransportConfig, { type: "stdio" }>
) {
if (!config.command) {
throw new TransportError(
TransportErrorType.ConfigurationInvalid,
"Command is required for stdio transport"
)
}
if (!config.args?.length) {
throw new TransportError(
TransportErrorType.ConfigurationInvalid,
"At least one argument (server file path) is required"
)
}
// For node commands, validate the file exists
if (config.command === "node") {
const serverFile = config.args[0]
if (!isAbsolute(serverFile)) {
throw new TransportError(
TransportErrorType.ConfigurationInvalid,
"Server file path must be absolute when using node command"
)
}
if (!existsSync(serverFile)) {
throw new TransportError(
TransportErrorType.ValidationFailed,
`Server file not found: ${serverFile}`
)
}
// Prevent the BatchIt aggregator from spawning itself
const fullCommand = [config.command, ...(config.args || [])].join(" ")
if (
SELF_REFERENCE_PATTERNS.some((pattern) =>
fullCommand.toLowerCase().includes(pattern.toLowerCase())
)
) {
throw new TransportError(
TransportErrorType.ConfigurationInvalid,
"Cannot spawn the BatchIt aggregator itself. Provide a valid MCP server file instead."
)
}
}
}
private validateWebSocketConfig(
config: Extract<TransportConfig, { type: "websocket" }>
) {
try {
const url = new URL(config.url)
if (url.protocol !== "ws:" && url.protocol !== "wss:") {
throw new TransportError(
TransportErrorType.ConfigurationInvalid,
"WebSocket URL must use ws:// or wss:// protocol"
)
}
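} catch (error) {
// Rethrow our own protocol error instead of masking it as a generic URL error
if (error instanceof TransportError) {
throw error
}
throw new TransportError(
TransportErrorType.ConfigurationInvalid,
"Invalid WebSocket URL",
error instanceof Error ? error : undefined
)
}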
}
async getOrCreateConnection(
identity: ServerIdentity
): Promise<ServerConnection> {
const serverKey = this.createKeyForIdentity(identity)
if (this.connections.has(serverKey)) {
const conn = this.connections.get(serverKey)!
conn.lastUsed = Date.now()
return conn
}
const transport = await this.createTransport(identity.transport)
const client = new Client(
{ name: "mcp-batchit", version: "1.0.0" },
{ capabilities: {} }
)
await client.connect(transport)
const connection: ServerConnection = {
client,
transport,
lastUsed: Date.now(),
identity,
}
this.connections.set(serverKey, connection)
this.setupMonitoring(serverKey, connection)
this.setupCleanupInterval(serverKey)
return connection
}
private async createTransport(
config: TransportConfig
): Promise<WebSocketClientTransport | StdioClientTransport> {
switch (config.type) {
case "stdio": {
try {
this.validateStdioConfig(config)
try {
const transport = new StdioClientTransport({
command: config.command,
args: config.args,
env: config.env,
stderr: "pipe",
})
// The child process is not spawned here; that happens when the client connects
return transport
} catch (error) {
if (
error &&
typeof error === "object" &&
"code" in error &&
error.code === "ENOENT"
) {
throw new TransportError(
TransportErrorType.CommandNotFound,
`Command '${config.command}' not found in PATH. If using 'npx', ensure it's installed globally. Consider using 'node' with direct path to server JS file instead.`
)
}
throw error
}
} catch (error) {
if (error instanceof TransportError) {
throw new McpError(ErrorCode.InvalidParams, error.message)
} else {
throw new McpError(
ErrorCode.InternalError,
`Failed to create stdio transport: ${
error instanceof Error ? error.message : String(error)
}`
)
}
}
}
case "websocket": {
try {
this.validateWebSocketConfig(config)
const wsUrl =
config.url.startsWith("ws://") || config.url.startsWith("wss://")
? config.url
: `ws://${config.url}`
const transport = new WebSocketClientTransport(new URL(wsUrl))
return transport
} catch (error) {
if (error instanceof TransportError) {
throw new McpError(ErrorCode.InvalidParams, error.message)
} else {
throw new McpError(
ErrorCode.InternalError,
`Failed to create WebSocket transport: ${
error instanceof Error ? error.message : String(error)
}`
)
}
}
}
}
}
private setupMonitoring(
serverKey: string,
connection: ServerConnection
): void {
if (isStdioTransport(connection.transport)) {
// For stdio transports, we can monitor stderr
const stderr = connection.transport.stderr
if (stderr) {
stderr.on("data", (data: Buffer) => {
console.error(`[${connection.identity.name}] ${data.toString()}`)
})
}
}
// Monitor transport errors
connection.transport.onerror = (error: Error) => {
console.error(`Transport error:`, error)
this.closeConnection(serverKey)
}
}
private setupCleanupInterval(serverKey: string): void {
const interval = setInterval(() => {
const conn = this.connections.get(serverKey)
if (!conn) return
const idleTime = Date.now() - conn.lastUsed
if (idleTime > (conn.identity.maxIdleTimeMs ?? 300000)) {
// 5min default
this.closeConnection(serverKey)
}
}, 60000) // Check every minute
this.cleanupIntervals.set(serverKey, interval)
}
async closeConnection(serverKey: string): Promise<void> {
const conn = this.connections.get(serverKey)
if (!conn) return
try {
await conn.client.close()
await conn.transport.close()
} catch (error) {
console.error(`Error closing connection for ${serverKey}:`, error)
}
this.connections.delete(serverKey)
const interval = this.cleanupIntervals.get(serverKey)
if (interval) {
clearInterval(interval)
this.cleanupIntervals.delete(serverKey)
}
}
async closeAll(): Promise<void> {
for (const serverKey of this.connections.keys()) {
await this.closeConnection(serverKey)
}
}
}
// Batch Execution
interface Operation {
tool: string
arguments: Record<string, unknown>
}
interface OperationResult {
tool: string
success: boolean
result?: unknown
error?: string
durationMs: number
}
class BatchExecutor {
constructor(private connectionManager: ConnectionManager) {}
async executeBatch(
identity: ServerIdentity,
operations: Operation[],
options: {
maxConcurrent: number
timeoutMs: number
stopOnError: boolean
keepAlive?: boolean
}
): Promise<OperationResult[]> {
const connection = await this.connectionManager.getOrCreateConnection(
identity
)
const results: OperationResult[] = []
const pending = [...operations]
const running = new Set<Promise<OperationResult>>()
try {
while (pending.length > 0 || running.size > 0) {
while (pending.length > 0 && running.size < options.maxConcurrent) {
const op = pending.shift()!
const promise = this.executeOperation(
connection,
op,
options.timeoutMs
)
running.add(promise)
promise.then((res) => {
running.delete(promise)
results.push(res)
if (!res.success && options.stopOnError) {
pending.length = 0
}
})
}
if (running.size > 0) {
await Promise.race(running)
}
}
} finally {
if (!options.keepAlive) {
await this.connectionManager.closeConnection(
this.connectionManager.createKeyForIdentity(identity)
)
}
}
return results
}
private getErrorMessage(result: HPCErrorResponse): string {
// Direct error/message properties
if (result.error || result.message) {
return result.error ?? result.message ?? "Unknown HPC error"
}
// Look for error in content array
if (result.content?.length) {
const textContent = result.content
.filter((item) => item.type === "text" && item.text)
.map((item) => item.text)
.filter((text): text is string => text !== undefined)
.join(" ")
if (textContent) {
return textContent
}
}
return "Unknown HPC error"
}
private async executeOperation(
connection: ServerConnection,
operation: Operation,
timeoutMs: number
): Promise<OperationResult> {
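const start = Date.now()
// Keep a handle on the timeout so it can be cleared once the race settles
let timer: NodeJS.Timeout | undefined
try {
const result = await Promise.race([
connection.client.callTool({
name: operation.tool,
arguments: operation.arguments,
}),
new Promise<never>((_, reject) => {
timer = setTimeout(
() =>
reject(
new McpError(ErrorCode.RequestTimeout, "Operation timed out")
),
timeoutMs
)
}),
]).finally(() => clearTimeout(timer))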
if (isHPCErrorResponse(result)) {
return {
tool: operation.tool,
success: false,
error: this.getErrorMessage(result),
durationMs: Date.now() - start,
}
}
return {
tool: operation.tool,
success: true,
result,
durationMs: Date.now() - start,
}
} catch (error) {
return {
tool: operation.tool,
success: false,
error: error instanceof Error ? error.message : String(error),
durationMs: Date.now() - start,
}
}
}
}
// Server Setup
const connectionManager = new ConnectionManager()
const batchExecutor = new BatchExecutor(connectionManager)
const server = new McpServer({
name: "mcp-batchit",
version: "1.0.0",
})
// Define the tool's schema shape (required properties for tool registration)
const toolSchema = {
targetServer: BatchArgsSchema.shape.targetServer,
operations: BatchArgsSchema.shape.operations,
options: BatchArgsSchema.shape.options,
}
server.tool(
"batch_execute",
`
Execute multiple operations in batch on a specified MCP server. You must provide a real MCP server (like @modelcontextprotocol/server-filesystem). The aggregator will reject any attempt to spawn itself.
Transport Configuration:
1. For stdio transport (recommended for local servers):
Using node with direct file path (preferred):
{
"transport": {
"type": "stdio",
"command": "node",
"args": ["C:/path/to/server.js"]
}
}
Using npx (requires global npx installation):
{
"transport": {
"type": "stdio",
"command": "npx",
"args": ["@modelcontextprotocol/server-filesystem"]
}
}
2. For WebSocket transport (for connecting to running servers):
{
"transport": {
"type": "websocket",
"url": "ws://localhost:3000"
}
}
Usage:
- Provide "targetServer" configuration with:
- name: Unique identifier for the server
- serverType: Type and configuration of the server (filesystem, database, or generic)
- transport: Connection method (stdio or websocket) and its configuration
- Provide "operations" as an array of objects with:
- tool: The tool name on the target server
- arguments: The JSON arguments to pass
- Options:
- maxConcurrent: Maximum concurrent operations (default: 10)
- timeoutMs: Timeout per operation in milliseconds (default: 30000)
- stopOnError: Whether to stop on first error (default: false)
- keepAlive: Keep connection after batch completion (default: false)
Complete Example:
{
"targetServer": {
"name": "local-fs",
"serverType": {
"type": "filesystem",
"config": {
"rootDirectory": "C:/data",
"watchMode": true
}
},
"transport": {
"type": "stdio",
"command": "node",
"args": ["C:/path/to/filesystem-server.js"]
}
},
"operations": [
{ "tool": "createFile", "arguments": { "path": "test1.txt", "content": "Hello" } },
{ "tool": "createFile", "arguments": { "path": "test2.txt", "content": "World" } }
],
"options": {
"maxConcurrent": 3,
"stopOnError": true
}
}`,
toolSchema,
async (args) => {
const parsed = BatchArgsSchema.safeParse(args)
if (!parsed.success) {
throw new McpError(ErrorCode.InvalidParams, parsed.error.message)
}
const { targetServer, operations, options } = parsed.data
const results = await batchExecutor.executeBatch(
targetServer,
operations,
options
)
return {
content: [
{
type: "text",
text: JSON.stringify(
{
targetServer: targetServer.name,
summary: {
successCount: results.filter((r) => r.success).length,
failCount: results.filter((r) => !r.success).length,
totalDurationMs: results.reduce(
(sum, r) => sum + r.durationMs,
0
),
},
operations: results,
},
null,
2
),
},
],
}
}
)
// Startup
;(async function main() {
const transport = new StdioServerTransport()
await server.connect(transport)
console.error("mcp-batchit is running on stdio. Ready to batch-execute!")
process.on("SIGINT", cleanup)
process.on("SIGTERM", cleanup)
})().catch((err) => {
console.error("Fatal error in aggregator server:", err)
process.exit(1)
})
async function cleanup() {
console.error("Shutting down, closing all connections...")
await connectionManager.closeAll()
await server.close()
process.exit(0)
}
```