This is page 3 of 4. Use http://codebase.md/modelcontextprotocol/servers?lines=false&page={x} to view the full context. # Directory Structure ``` ├── .gitattributes ├── .github │ ├── pull_request_template.md │ └── workflows │ ├── claude.yml │ ├── python.yml │ ├── release.yml │ └── typescript.yml ├── .gitignore ├── .npmrc ├── .vscode │ └── settings.json ├── CODE_OF_CONDUCT.md ├── CONTRIBUTING.md ├── LICENSE ├── package-lock.json ├── package.json ├── README.md ├── scripts │ └── release.py ├── SECURITY.md ├── src │ ├── everything │ │ ├── CLAUDE.md │ │ ├── Dockerfile │ │ ├── everything.ts │ │ ├── index.ts │ │ ├── instructions.md │ │ ├── package.json │ │ ├── README.md │ │ ├── sse.ts │ │ ├── stdio.ts │ │ ├── streamableHttp.ts │ │ └── tsconfig.json │ ├── fetch │ │ ├── .python-version │ │ ├── Dockerfile │ │ ├── LICENSE │ │ ├── pyproject.toml │ │ ├── README.md │ │ ├── src │ │ │ └── mcp_server_fetch │ │ │ ├── __init__.py │ │ │ ├── __main__.py │ │ │ └── server.py │ │ └── uv.lock │ ├── filesystem │ │ ├── __tests__ │ │ │ ├── directory-tree.test.ts │ │ │ ├── lib.test.ts │ │ │ ├── path-utils.test.ts │ │ │ ├── path-validation.test.ts │ │ │ └── roots-utils.test.ts │ │ ├── Dockerfile │ │ ├── index.ts │ │ ├── jest.config.cjs │ │ ├── lib.ts │ │ ├── package.json │ │ ├── path-utils.ts │ │ ├── path-validation.ts │ │ ├── README.md │ │ ├── roots-utils.ts │ │ └── tsconfig.json │ ├── git │ │ ├── .gitignore │ │ ├── .python-version │ │ ├── Dockerfile │ │ ├── LICENSE │ │ ├── pyproject.toml │ │ ├── README.md │ │ ├── src │ │ │ └── mcp_server_git │ │ │ ├── __init__.py │ │ │ ├── __main__.py │ │ │ ├── py.typed │ │ │ └── server.py │ │ ├── tests │ │ │ └── test_server.py │ │ └── uv.lock │ ├── memory │ │ ├── Dockerfile │ │ ├── index.ts │ │ ├── package.json │ │ ├── README.md │ │ └── tsconfig.json │ ├── sequentialthinking │ │ ├── Dockerfile │ │ ├── index.ts │ │ ├── package.json │ │ ├── README.md │ │ └── tsconfig.json │ └── time │ ├── .python-version │ ├── Dockerfile │ ├── pyproject.toml │ ├── README.md │ 
├── src │ │ └── mcp_server_time │ │ ├── __init__.py │ │ ├── __main__.py │ │ └── server.py │ ├── test │ │ └── time_server_test.py │ └── uv.lock └── tsconfig.json ``` # Files -------------------------------------------------------------------------------- /SECURITY.md: -------------------------------------------------------------------------------- ```markdown # Security Policy Thank you for helping us keep our MCP servers secure. The **reference servers** in this repo are maintained by [Anthropic](https://www.anthropic.com/) as part of the Model Context Protocol project. The security of our systems and user data is Anthropic’s top priority. We appreciate the work of security researchers acting in good faith in identifying and reporting potential vulnerabilities. ## Vulnerability Disclosure Program Our Vulnerability Program guidelines are defined on our [HackerOne program page](https://hackerone.com/anthropic-vdp). We ask that any validated vulnerability in this functionality be reported through the [submission form](https://hackerone.com/anthropic-vdp/reports/new?type=team&report_type=vulnerability). 
```

--------------------------------------------------------------------------------
/src/everything/CLAUDE.md:
--------------------------------------------------------------------------------

```markdown
# MCP "Everything" Server - Development Guidelines

## Build, Test & Run Commands

- Build: `npm run build` - Compiles TypeScript to JavaScript
- Watch mode: `npm run watch` - Watches for changes and rebuilds automatically
- Run server: `npm run start` - Starts the MCP server using stdio transport
- Run SSE server: `npm run start:sse` - Starts the MCP server with SSE transport
- Prepare release: `npm run prepare` - Builds the project for publishing

## Code Style Guidelines

- Use ES modules with `.js` extension in import paths
- Strictly type all functions and variables with TypeScript
- Follow zod schema patterns for tool input validation
- Prefer async/await over callbacks and Promise chains
- Place all imports at top of file, grouped by external then internal
- Use descriptive variable names that clearly indicate purpose
- Implement proper cleanup for timers and resources in server shutdown
- Follow camelCase for variables/functions, PascalCase for types/classes, UPPER_CASE for constants
- Handle errors with try/catch blocks and provide clear error messages
- Use consistent indentation (2 spaces) and trailing commas in multi-line objects
```

--------------------------------------------------------------------------------
/CONTRIBUTING.md:
--------------------------------------------------------------------------------

```markdown
# Contributing to MCP Servers

Thanks for your interest in contributing! Here's how you can help make this repo better.

We accept changes through [the standard GitHub flow model](https://docs.github.com/en/get-started/using-github/github-flow).

## Server Listings

We welcome PRs that add links to your servers in the [README.md](./README.md)!

## Server Implementations

We welcome:
- **Bug fixes**: Help us squash those pesky bugs.
- **Usability improvements**: Making servers easier to use for humans and agents.
- **Enhancements that demonstrate MCP protocol features**: We encourage contributions that help reference servers better illustrate underutilized aspects of the MCP protocol beyond just Tools, such as Resources, Prompts, or Roots. For example, adding Roots support to filesystem-server helps showcase this important but lesser-known feature.

We're more selective about:
- **Other new features**: Especially if they're not crucial to the server's core purpose or are highly opinionated. The existing servers are reference servers meant to inspire the community. If you need specific features, we encourage you to build enhanced versions! We think a diverse ecosystem of servers is beneficial for everyone, and would love to link to your improved server in our README.

We don't accept:
- **New server implementations**: We encourage you to publish them yourself, and link to them from the README.

## Documentation

Improvements to existing documentation are welcome, although we generally prefer ergonomic improvements over documenting pain points where possible!

We're more selective about adding wholly new documentation, especially in ways that aren't vendor neutral (e.g. how to run a particular server with a particular client).

## Community

[Learn how the MCP community communicates](https://modelcontextprotocol.io/community/communication).

Thank you for helping make MCP servers better for everyone!
``` -------------------------------------------------------------------------------- /CODE_OF_CONDUCT.md: -------------------------------------------------------------------------------- ```markdown # Contributor Covenant Code of Conduct ## Our Pledge We as members, contributors, and leaders pledge to make participation in our community a harassment-free experience for everyone, regardless of age, body size, visible or invisible disability, ethnicity, sex characteristics, gender identity and expression, level of experience, education, socio-economic status, nationality, personal appearance, race, religion, or sexual identity and orientation. We pledge to act and interact in ways that contribute to an open, welcoming, diverse, inclusive, and healthy community. ## Our Standards Examples of behavior that contributes to a positive environment for our community include: * Demonstrating empathy and kindness toward other people * Being respectful of differing opinions, viewpoints, and experiences * Giving and gracefully accepting constructive feedback * Accepting responsibility and apologizing to those affected by our mistakes, and learning from the experience * Focusing on what is best not just for us as individuals, but for the overall community Examples of unacceptable behavior include: * The use of sexualized language or imagery, and sexual attention or advances of any kind * Trolling, insulting or derogatory comments, and personal or political attacks * Public or private harassment * Publishing others' private information, such as a physical or email address, without their explicit permission * Other conduct which could reasonably be considered inappropriate in a professional setting ## Enforcement Responsibilities Community leaders are responsible for clarifying and enforcing our standards of acceptable behavior and will take appropriate and fair corrective action in response to any behavior that they deem inappropriate, threatening, offensive, or harmful. 
Community leaders have the right and responsibility to remove, edit, or reject comments, commits, code, wiki edits, issues, and other contributions that are not aligned to this Code of Conduct, and will communicate reasons for moderation decisions when appropriate. ## Scope This Code of Conduct applies within all community spaces, and also applies when an individual is officially representing the community in public spaces. Examples of representing our community include using an official e-mail address, posting via an official social media account, or acting as an appointed representative at an online or offline event. ## Enforcement Instances of abusive, harassing, or otherwise unacceptable behavior may be reported to the community leaders responsible for enforcement at [email protected]. All complaints will be reviewed and investigated promptly and fairly. All community leaders are obligated to respect the privacy and security of the reporter of any incident. ## Enforcement Guidelines Community leaders will follow these Community Impact Guidelines in determining the consequences for any action they deem in violation of this Code of Conduct: ### 1. Correction **Community Impact**: Use of inappropriate language or other behavior deemed unprofessional or unwelcome in the community. **Consequence**: A private, written warning from community leaders, providing clarity around the nature of the violation and an explanation of why the behavior was inappropriate. A public apology may be requested. ### 2. Warning **Community Impact**: A violation through a single incident or series of actions. **Consequence**: A warning with consequences for continued behavior. No interaction with the people involved, including unsolicited interaction with those enforcing the Code of Conduct, for a specified period of time. This includes avoiding interactions in community spaces as well as external channels like social media. Violating these terms may lead to a temporary or permanent ban. 
### 3. Temporary Ban **Community Impact**: A serious violation of community standards, including sustained inappropriate behavior. **Consequence**: A temporary ban from any sort of interaction or public communication with the community for a specified period of time. No public or private interaction with the people involved, including unsolicited interaction with those enforcing the Code of Conduct, is allowed during this period. Violating these terms may lead to a permanent ban. ### 4. Permanent Ban **Community Impact**: Demonstrating a pattern of violation of community standards, including sustained inappropriate behavior, harassment of an individual, or aggression toward or disparagement of classes of individuals. **Consequence**: A permanent ban from any sort of public interaction within the community. ## Attribution This Code of Conduct is adapted from the [Contributor Covenant][homepage], version 2.0, available at https://www.contributor-covenant.org/version/2/0/code_of_conduct.html. Community Impact Guidelines were inspired by [Mozilla's code of conduct enforcement ladder](https://github.com/mozilla/diversity). [homepage]: https://www.contributor-covenant.org For answers to common questions about this code of conduct, see the FAQ at https://www.contributor-covenant.org/faq. Translations are available at https://www.contributor-covenant.org/translations. 
```

--------------------------------------------------------------------------------
/.vscode/settings.json:
--------------------------------------------------------------------------------

```json
{}
```

--------------------------------------------------------------------------------
/src/time/src/mcp_server_time/__main__.py:
--------------------------------------------------------------------------------

```python
from mcp_server_time import main

main()
```

--------------------------------------------------------------------------------
/src/git/src/mcp_server_git/__main__.py:
--------------------------------------------------------------------------------

```python
# __main__.py

from mcp_server_git import main

main()
```

--------------------------------------------------------------------------------
/src/fetch/src/mcp_server_fetch/__main__.py:
--------------------------------------------------------------------------------

```python
# __main__.py

from mcp_server_fetch import main

main()
```

--------------------------------------------------------------------------------
/src/everything/tsconfig.json:
--------------------------------------------------------------------------------

```json
{
  "extends": "../../tsconfig.json",
  "compilerOptions": {
    "outDir": "./dist",
    "rootDir": "."
  },
  "include": [
    "./**/*.ts"
  ]
}
```

--------------------------------------------------------------------------------
/src/memory/tsconfig.json:
--------------------------------------------------------------------------------

```json
{
  "extends": "../../tsconfig.json",
  "compilerOptions": {
    "outDir": "./dist",
    "rootDir": "."
  },
  "include": [
    "./**/*.ts"
  ]
}
```

--------------------------------------------------------------------------------
/src/sequentialthinking/tsconfig.json:
--------------------------------------------------------------------------------

```json
{
  "extends": "../../tsconfig.json",
  "compilerOptions": {
    "outDir": "./dist",
    "rootDir": ".",
    "moduleResolution": "NodeNext",
    "module": "NodeNext"
  },
  "include": ["./**/*.ts"]
}
```

--------------------------------------------------------------------------------
/src/filesystem/tsconfig.json:
--------------------------------------------------------------------------------

```json
{
  "extends": "../../tsconfig.json",
  "compilerOptions": {
    "outDir": "./dist",
    "rootDir": ".",
    "moduleResolution": "NodeNext",
    "module": "NodeNext"
  },
  "include": [
    "./**/*.ts"
  ]
}
```

--------------------------------------------------------------------------------
/tsconfig.json:
--------------------------------------------------------------------------------

```json
{
  "compilerOptions": {
    "target": "ES2022",
    "module": "Node16",
    "moduleResolution": "Node16",
    "strict": true,
    "esModuleInterop": true,
    "skipLibCheck": true,
    "forceConsistentCasingInFileNames": true,
    "resolveJsonModule": true
  },
  "include": ["src/**/*"],
  "exclude": ["node_modules"]
}
```

--------------------------------------------------------------------------------
/src/filesystem/jest.config.cjs:
--------------------------------------------------------------------------------

```
/** @type {import('ts-jest').JestConfigWithTsJest} */
module.exports = {
  preset: 'ts-jest',
  testEnvironment: 'node',
  extensionsToTreatAsEsm: ['.ts'],
  moduleNameMapper: {
    '^(\\.{1,2}/.*)\\.js$': '$1',
  },
  transform: {
    '^.+\\.tsx?$': [
      'ts-jest',
      {
        useESM: true,
      },
    ],
  },
  testMatch: ['**/__tests__/**/*.test.ts'],
  collectCoverageFrom: [
    '**/*.ts',
    '!**/__tests__/**',
    '!**/dist/**',
  ],
}
```

--------------------------------------------------------------------------------
/src/everything/Dockerfile:
--------------------------------------------------------------------------------

```dockerfile
FROM node:22.12-alpine AS builder

COPY src/everything /app
COPY tsconfig.json /tsconfig.json

WORKDIR /app

RUN --mount=type=cache,target=/root/.npm npm install

FROM node:22-alpine AS release

WORKDIR /app

COPY --from=builder /app/dist /app/dist
COPY --from=builder /app/package.json /app/package.json
COPY --from=builder /app/package-lock.json /app/package-lock.json

ENV NODE_ENV=production

# --omit=dev skips devDependencies in the release image
RUN npm ci --ignore-scripts --omit=dev

CMD ["node", "dist/index.js"]
```

--------------------------------------------------------------------------------
/src/time/src/mcp_server_time/__init__.py:
--------------------------------------------------------------------------------

```python
from .server import serve


def main():
    """MCP Time Server - Time and timezone conversion functionality for MCP"""
    import argparse
    import asyncio

    parser = argparse.ArgumentParser(
        description="give a model the ability to handle time queries and timezone conversions"
    )
    parser.add_argument("--local-timezone", type=str, help="Override local timezone")

    args = parser.parse_args()
    asyncio.run(serve(args.local_timezone))


if __name__ == "__main__":
    main()
```

--------------------------------------------------------------------------------
/src/memory/Dockerfile:
--------------------------------------------------------------------------------

```dockerfile
FROM node:22.12-alpine AS builder

COPY src/memory /app
COPY tsconfig.json /tsconfig.json

WORKDIR /app

RUN --mount=type=cache,target=/root/.npm npm install

# --omit=dev skips devDependencies
RUN --mount=type=cache,target=/root/.npm-production npm ci --ignore-scripts --omit=dev

FROM node:22-alpine AS release

COPY --from=builder /app/dist /app/dist
COPY --from=builder /app/package.json /app/package.json
COPY --from=builder /app/package-lock.json /app/package-lock.json

ENV NODE_ENV=production

WORKDIR /app

RUN npm ci --ignore-scripts --omit=dev

ENTRYPOINT ["node", "dist/index.js"]
```
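The `main()` in `src/time/src/mcp_server_time/__init__.py` above forwards `--local-timezone` straight to `serve()`. The following is a minimal sketch of how that flag parses, with the parser rebuilt standalone so it can be exercised without starting a server. `build_parser` and `parse_local_timezone` are hypothetical helper names introduced here for illustration; they are not part of the package.

```python
import argparse
from typing import Optional, Sequence


def build_parser() -> argparse.ArgumentParser:
    """Rebuild the parser shown in mcp_server_time.main() (hypothetical helper)."""
    parser = argparse.ArgumentParser(
        description="give a model the ability to handle time queries and timezone conversions"
    )
    parser.add_argument("--local-timezone", type=str, help="Override local timezone")
    return parser


def parse_local_timezone(argv: Sequence[str]) -> Optional[str]:
    """Return the timezone override, or None when the flag is absent."""
    # argparse maps the flag "--local-timezone" to the attribute "local_timezone"
    return build_parser().parse_args(argv).local_timezone


if __name__ == "__main__":
    print(parse_local_timezone(["--local-timezone", "Europe/Warsaw"]))
    print(parse_local_timezone([]))
```

When the flag is omitted the value is `None`, so whatever fallback applies (for example, detecting the system timezone) would have to live inside `serve()`; that part is not shown in this section.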
--------------------------------------------------------------------------------
/src/filesystem/Dockerfile:
--------------------------------------------------------------------------------

```dockerfile
FROM node:22.12-alpine AS builder

WORKDIR /app

COPY src/filesystem /app
COPY tsconfig.json /tsconfig.json

RUN --mount=type=cache,target=/root/.npm npm install

# --omit=dev skips devDependencies
RUN --mount=type=cache,target=/root/.npm-production npm ci --ignore-scripts --omit=dev


FROM node:22-alpine AS release

WORKDIR /app

COPY --from=builder /app/dist /app/dist
COPY --from=builder /app/package.json /app/package.json
COPY --from=builder /app/package-lock.json /app/package-lock.json

ENV NODE_ENV=production

RUN npm ci --ignore-scripts --omit=dev

ENTRYPOINT ["node", "/app/dist/index.js"]
```

--------------------------------------------------------------------------------
/src/sequentialthinking/Dockerfile:
--------------------------------------------------------------------------------

```dockerfile
FROM node:22.12-alpine AS builder

COPY src/sequentialthinking /app
COPY tsconfig.json /tsconfig.json

WORKDIR /app

RUN --mount=type=cache,target=/root/.npm npm install

# --omit=dev skips devDependencies
RUN --mount=type=cache,target=/root/.npm-production npm ci --ignore-scripts --omit=dev

FROM node:22-alpine AS release

COPY --from=builder /app/dist /app/dist
COPY --from=builder /app/package.json /app/package.json
COPY --from=builder /app/package-lock.json /app/package-lock.json

ENV NODE_ENV=production

WORKDIR /app

RUN npm ci --ignore-scripts --omit=dev

ENTRYPOINT ["node", "dist/index.js"]
```

--------------------------------------------------------------------------------
/src/git/src/mcp_server_git/__init__.py:
--------------------------------------------------------------------------------

```python
import click
from pathlib import Path
import logging
import sys
from .server import serve

@click.command()
@click.option("--repository", "-r", type=Path, help="Git repository path")
@click.option("-v", "--verbose", count=True)
def main(repository: Path | None, verbose: int) -> None:
    """MCP Git Server - Git functionality for MCP"""
    import asyncio

    # count=True makes click pass the number of -v flags as an int:
    # none -> WARN, -v -> INFO, -vv or more -> DEBUG
    logging_level = logging.WARN
    if verbose == 1:
        logging_level = logging.INFO
    elif verbose >= 2:
        logging_level = logging.DEBUG

    logging.basicConfig(level=logging_level, stream=sys.stderr)
    asyncio.run(serve(repository))

if __name__ == "__main__":
    main()
```

--------------------------------------------------------------------------------
/src/everything/stdio.ts:
--------------------------------------------------------------------------------

```typescript
#!/usr/bin/env node

import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import { createServer } from "./everything.js";

console.error('Starting default (STDIO) server...');

async function main() {
  const transport = new StdioServerTransport();
  const {server, cleanup, startNotificationIntervals} = createServer();

  await server.connect(transport);
  startNotificationIntervals();

  // Cleanup on exit
  process.on("SIGINT", async () => {
    await cleanup();
    await server.close();
    process.exit(0);
  });
}

main().catch((error) => {
  console.error("Server error:", error);
  process.exit(1);
});
```

--------------------------------------------------------------------------------
/src/fetch/src/mcp_server_fetch/__init__.py:
--------------------------------------------------------------------------------

```python
from .server import serve


def main():
    """MCP Fetch Server - HTTP fetching functionality for MCP"""
    import argparse
    import asyncio

    parser = argparse.ArgumentParser(
        description="give a model the ability to make web requests"
    )
    parser.add_argument("--user-agent", type=str, help="Custom User-Agent string")
    parser.add_argument(
        "--ignore-robots-txt",
        action="store_true",
        help="Ignore robots.txt restrictions",
    )
    parser.add_argument("--proxy-url", type=str, help="Proxy URL to use for requests")

    args = parser.parse_args()
    asyncio.run(serve(args.user_agent, args.ignore_robots_txt, args.proxy_url))
if __name__ == "__main__": main() ``` -------------------------------------------------------------------------------- /src/memory/package.json: -------------------------------------------------------------------------------- ```json { "name": "@modelcontextprotocol/server-memory", "version": "0.6.3", "description": "MCP server for enabling memory for Claude through a knowledge graph", "license": "MIT", "author": "Anthropic, PBC (https://anthropic.com)", "homepage": "https://modelcontextprotocol.io", "bugs": "https://github.com/modelcontextprotocol/servers/issues", "type": "module", "bin": { "mcp-server-memory": "dist/index.js" }, "files": [ "dist" ], "scripts": { "build": "tsc && shx chmod +x dist/*.js", "prepare": "npm run build", "watch": "tsc --watch" }, "dependencies": { "@modelcontextprotocol/sdk": "1.0.1" }, "devDependencies": { "@types/node": "^22", "shx": "^0.3.4", "typescript": "^5.6.2" } } ``` -------------------------------------------------------------------------------- /src/sequentialthinking/package.json: -------------------------------------------------------------------------------- ```json { "name": "@modelcontextprotocol/server-sequential-thinking", "version": "0.6.2", "description": "MCP server for sequential thinking and problem solving", "license": "MIT", "author": "Anthropic, PBC (https://anthropic.com)", "homepage": "https://modelcontextprotocol.io", "bugs": "https://github.com/modelcontextprotocol/servers/issues", "type": "module", "bin": { "mcp-server-sequential-thinking": "dist/index.js" }, "files": [ "dist" ], "scripts": { "build": "tsc && shx chmod +x dist/*.js", "prepare": "npm run build", "watch": "tsc --watch" }, "dependencies": { "@modelcontextprotocol/sdk": "0.5.0", "chalk": "^5.3.0", "yargs": "^17.7.2" }, "devDependencies": { "@types/node": "^22", "@types/yargs": "^17.0.32", "shx": "^0.3.4", "typescript": "^5.3.3" } } ``` -------------------------------------------------------------------------------- /package.json: 
-------------------------------------------------------------------------------- ```json { "name": "@modelcontextprotocol/servers", "private": true, "version": "0.6.2", "description": "Model Context Protocol servers", "license": "MIT", "author": "Anthropic, PBC (https://anthropic.com)", "homepage": "https://modelcontextprotocol.io", "bugs": "https://github.com/modelcontextprotocol/servers/issues", "type": "module", "workspaces": [ "src/*" ], "files": [], "scripts": { "build": "npm run build --workspaces", "watch": "npm run watch --workspaces", "publish-all": "npm publish --workspaces --access public", "link-all": "npm link --workspaces" }, "dependencies": { "@modelcontextprotocol/server-everything": "*", "@modelcontextprotocol/server-memory": "*", "@modelcontextprotocol/server-filesystem": "*", "@modelcontextprotocol/server-sequential-thinking": "*" } } ``` -------------------------------------------------------------------------------- /src/time/pyproject.toml: -------------------------------------------------------------------------------- ```toml [project] name = "mcp-server-time" version = "0.6.2" description = "A Model Context Protocol server providing tools for time queries and timezone conversions for LLMs" readme = "README.md" requires-python = ">=3.10" authors = [ { name = "Mariusz 'maledorak' Korzekwa", email = "[email protected]" }, ] keywords = ["time", "timezone", "mcp", "llm"] license = { text = "MIT" } classifiers = [ "Development Status :: 4 - Beta", "Intended Audience :: Developers", "License :: OSI Approved :: MIT License", "Programming Language :: Python :: 3", "Programming Language :: Python :: 3.10", ] dependencies = [ "mcp>=1.0.0", "pydantic>=2.0.0", "tzdata>=2024.2", "tzlocal>=5.3.1" ] [project.scripts] mcp-server-time = "mcp_server_time:main" [build-system] requires = ["hatchling"] build-backend = "hatchling.build" [tool.uv] dev-dependencies = [ "freezegun>=1.5.1", "pyright>=1.1.389", "pytest>=8.3.3", "ruff>=0.8.1", ] ``` 
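The `[project.scripts]` table above maps the `mcp-server-time` command to the reference `mcp_server_time:main`. As a rough sketch of what a launcher has to do with such a `module:attr` string, using only the standard library: `resolve_entry_point` is a hypothetical helper written for illustration, and `json:dumps` stands in for the real target so the snippet runs without the package installed.

```python
import importlib
from typing import Any


def resolve_entry_point(spec: str) -> Any:
    """Resolve a "module:attr" console-script reference to the object it names."""
    module_name, _, attr_path = spec.partition(":")
    if not module_name or not attr_path:
        raise ValueError(f"expected 'module:attr', got {spec!r}")
    obj: Any = importlib.import_module(module_name)
    # Dotted attribute paths such as "pkg.mod:Class.method" are walked step by step
    for part in attr_path.split("."):
        obj = getattr(obj, part)
    return obj


if __name__ == "__main__":
    # Stand-in target; with the package installed this could be "mcp_server_time:main"
    dumps = resolve_entry_point("json:dumps")
    print(dumps({"ok": True}))
```

The same lookup applies to `mcp_server_fetch:main` and `mcp_server_git:main` in the other `pyproject.toml` files; real installers generate their own launcher code, so this only illustrates the naming convention.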
-------------------------------------------------------------------------------- /src/fetch/pyproject.toml: -------------------------------------------------------------------------------- ```toml [project] name = "mcp-server-fetch" version = "0.6.3" description = "A Model Context Protocol server providing tools to fetch and convert web content for usage by LLMs" readme = "README.md" requires-python = ">=3.10" authors = [{ name = "Anthropic, PBC." }] maintainers = [{ name = "Jack Adamson", email = "[email protected]" }] keywords = ["http", "mcp", "llm", "automation"] license = { text = "MIT" } classifiers = [ "Development Status :: 4 - Beta", "Intended Audience :: Developers", "License :: OSI Approved :: MIT License", "Programming Language :: Python :: 3", "Programming Language :: Python :: 3.10", ] dependencies = [ "httpx<0.28", "markdownify>=0.13.1", "mcp>=1.1.3", "protego>=0.3.1", "pydantic>=2.0.0", "readabilipy>=0.2.0", "requests>=2.32.3", ] [project.scripts] mcp-server-fetch = "mcp_server_fetch:main" [build-system] requires = ["hatchling"] build-backend = "hatchling.build" [tool.uv] dev-dependencies = ["pyright>=1.1.389", "ruff>=0.7.3"] ``` -------------------------------------------------------------------------------- /src/everything/package.json: -------------------------------------------------------------------------------- ```json { "name": "@modelcontextprotocol/server-everything", "version": "0.6.2", "description": "MCP server that exercises all the features of the MCP protocol", "license": "MIT", "author": "Anthropic, PBC (https://anthropic.com)", "homepage": "https://modelcontextprotocol.io", "bugs": "https://github.com/modelcontextprotocol/servers/issues", "type": "module", "bin": { "mcp-server-everything": "dist/index.js" }, "files": [ "dist" ], "scripts": { "build": "tsc && shx cp instructions.md dist/ && shx chmod +x dist/*.js", "prepare": "npm run build", "watch": "tsc --watch", "start": "node dist/index.js", "start:sse": "node dist/sse.js", 
"start:streamableHttp": "node dist/streamableHttp.js" }, "dependencies": { "@modelcontextprotocol/sdk": "^1.18.0", "cors": "^2.8.5", "express": "^4.21.1", "jszip": "^3.10.1", "zod": "^3.23.8", "zod-to-json-schema": "^3.23.5" }, "devDependencies": { "@types/cors": "^2.8.19", "@types/express": "^5.0.0", "shx": "^0.3.4", "typescript": "^5.6.2" } } ``` -------------------------------------------------------------------------------- /src/filesystem/package.json: -------------------------------------------------------------------------------- ```json { "name": "@modelcontextprotocol/server-filesystem", "version": "0.6.3", "description": "MCP server for filesystem access", "license": "MIT", "author": "Anthropic, PBC (https://anthropic.com)", "homepage": "https://modelcontextprotocol.io", "bugs": "https://github.com/modelcontextprotocol/servers/issues", "type": "module", "bin": { "mcp-server-filesystem": "dist/index.js" }, "files": [ "dist" ], "scripts": { "build": "tsc && shx chmod +x dist/*.js", "prepare": "npm run build", "watch": "tsc --watch", "test": "jest --config=jest.config.cjs --coverage" }, "dependencies": { "@modelcontextprotocol/sdk": "^1.17.0", "diff": "^5.1.0", "glob": "^10.3.10", "minimatch": "^10.0.1", "zod-to-json-schema": "^3.23.5" }, "devDependencies": { "@jest/globals": "^29.7.0", "@types/diff": "^5.0.9", "@types/jest": "^29.5.14", "@types/minimatch": "^5.1.2", "@types/node": "^22", "jest": "^29.7.0", "shx": "^0.3.4", "ts-jest": "^29.1.1", "ts-node": "^10.9.2", "typescript": "^5.8.2" } } ``` -------------------------------------------------------------------------------- /src/git/pyproject.toml: -------------------------------------------------------------------------------- ```toml [project] name = "mcp-server-git" version = "0.6.2" description = "A Model Context Protocol server providing tools to read, search, and manipulate Git repositories programmatically via LLMs" readme = "README.md" requires-python = ">=3.10" authors = [{ name = "Anthropic, 
PBC." }] maintainers = [{ name = "David Soria Parra", email = "[email protected]" }] keywords = ["git", "mcp", "llm", "automation"] license = { text = "MIT" } classifiers = [ "Development Status :: 4 - Beta", "Intended Audience :: Developers", "License :: OSI Approved :: MIT License", "Programming Language :: Python :: 3", "Programming Language :: Python :: 3.10", ] dependencies = [ "click>=8.1.7", "gitpython>=3.1.43", "mcp>=1.0.0", "pydantic>=2.0.0", ] [project.scripts] mcp-server-git = "mcp_server_git:main" [build-system] requires = ["hatchling"] build-backend = "hatchling.build" [tool.uv] dev-dependencies = ["pyright>=1.1.389", "ruff>=0.7.3", "pytest>=8.0.0"] [tool.pytest.ini_options] testpaths = ["tests"] python_files = "test_*.py" python_classes = "Test*" python_functions = "test_*" ``` -------------------------------------------------------------------------------- /src/everything/index.ts: -------------------------------------------------------------------------------- ```typescript #!/usr/bin/env node // Parse command line arguments first const args = process.argv.slice(2); const scriptName = args[0] || 'stdio'; async function run() { try { // Dynamically import only the requested module to prevent all modules from initializing switch (scriptName) { case 'stdio': // Import and run the default server await import('./stdio.js'); break; case 'sse': // Import and run the SSE server await import('./sse.js'); break; case 'streamableHttp': // Import and run the streamable HTTP server await import('./streamableHttp.js'); break; default: console.error(`Unknown script: ${scriptName}`); console.log('Available scripts:'); console.log('- stdio'); console.log('- sse'); console.log('- streamableHttp'); process.exit(1); } } catch (error) { console.error('Error running script:', error); process.exit(1); } } run(); ``` -------------------------------------------------------------------------------- /src/fetch/Dockerfile: 
--------------------------------------------------------------------------------

```dockerfile
# Use a Python image with uv pre-installed
FROM ghcr.io/astral-sh/uv:python3.12-bookworm-slim AS uv

# Install the project into `/app`
WORKDIR /app

# Enable bytecode compilation
ENV UV_COMPILE_BYTECODE=1

# Copy from the cache instead of linking since it's a mounted volume
ENV UV_LINK_MODE=copy

# Install the project's dependencies using the lockfile and settings
RUN --mount=type=cache,target=/root/.cache/uv \
    --mount=type=bind,source=uv.lock,target=uv.lock \
    --mount=type=bind,source=pyproject.toml,target=pyproject.toml \
    uv sync --frozen --no-install-project --no-dev --no-editable

# Then, add the rest of the project source code and install it
# Installing separately from its dependencies allows optimal layer caching
ADD . /app
RUN --mount=type=cache,target=/root/.cache/uv \
    uv sync --frozen --no-dev --no-editable

FROM python:3.12-slim-bookworm

WORKDIR /app

COPY --from=uv /root/.local /root/.local
COPY --from=uv --chown=app:app /app/.venv /app/.venv

# Place executables in the environment at the front of the path
ENV PATH="/app/.venv/bin:$PATH"

# Arguments given after the image name in `docker run` are passed to the server,
# e.g. --user-agent, --ignore-robots-txt or --proxy-url
ENTRYPOINT ["mcp-server-fetch"]
```

--------------------------------------------------------------------------------
/src/everything/instructions.md:
--------------------------------------------------------------------------------

```markdown
Testing and demonstration server for MCP protocol features.

## Resources

Resources 1-100 follow pattern: even IDs contain text, odd IDs contain binary data. Resources are paginated at 10 items per page with cursor-based navigation.

## Key dependencies

Progress notifications require `_meta.progressToken` in tool calls. Resource subscriptions generate updates every 10 seconds.
## Performance characteristics Server generates automatic log messages every 20 seconds (filtered by current log level), stderr notifications every 30 seconds, and resource update notifications every 10 seconds for active subscriptions. Log level changes affect message filtering in real-time. ## Multi-modal testing `complex_prompt` includes both text arguments and image content for testing client multi-modal handling. `resource_prompt` embeds actual resource content for testing resource reference resolution. Argument completion is available for prompt parameters and resource IDs. Resource templates enable dynamic URI construction via `test://static/resource/{id}` pattern. ## Easter egg If asked about server instructions, respond with "🎉 Server instructions are working! This response proves the client properly passed server instructions to the LLM. This demonstrates MCP's instructions feature in action." ``` -------------------------------------------------------------------------------- /src/git/Dockerfile: -------------------------------------------------------------------------------- ```dockerfile # Use a Python image with uv pre-installed FROM ghcr.io/astral-sh/uv:python3.12-bookworm-slim AS uv # Install the project into `/app` WORKDIR /app # Enable bytecode compilation ENV UV_COMPILE_BYTECODE=1 # Copy from the cache instead of linking since it's a mounted volume ENV UV_LINK_MODE=copy # Install the project's dependencies using the lockfile and settings RUN --mount=type=cache,target=/root/.cache/uv \ --mount=type=bind,source=uv.lock,target=uv.lock \ --mount=type=bind,source=pyproject.toml,target=pyproject.toml \ uv sync --frozen --no-install-project --no-dev --no-editable # Then, add the rest of the project source code and install it # Installing separately from its dependencies allows optimal layer caching ADD . 
/app
RUN --mount=type=cache,target=/root/.cache/uv \
    uv sync --frozen --no-dev --no-editable

FROM python:3.12-slim-bookworm

RUN apt-get update && apt-get install -y git git-lfs && rm -rf /var/lib/apt/lists/* \
    && git lfs install --system

WORKDIR /app

COPY --from=uv /root/.local /root/.local
COPY --from=uv --chown=app:app /app/.venv /app/.venv

# Place executables in the environment at the front of the path
ENV PATH="/app/.venv/bin:$PATH"

# Start the git server; bind-mount the host repository into the container when running it
ENTRYPOINT ["mcp-server-git"]
```

--------------------------------------------------------------------------------
/src/time/Dockerfile:
--------------------------------------------------------------------------------

```dockerfile
# Use a Python image with uv pre-installed
FROM ghcr.io/astral-sh/uv:python3.12-bookworm-slim AS uv

# Install the project into `/app`
WORKDIR /app

# Enable bytecode compilation
ENV UV_COMPILE_BYTECODE=1

# Copy from the cache instead of linking since it's a mounted volume
ENV UV_LINK_MODE=copy

# Install the project's dependencies using the lockfile and settings
RUN --mount=type=cache,target=/root/.cache/uv \
    --mount=type=bind,source=uv.lock,target=uv.lock \
    --mount=type=bind,source=pyproject.toml,target=pyproject.toml \
    uv sync --frozen --no-install-project --no-dev --no-editable

# Then, add the rest of the project source code and install it
# Installing separately from its dependencies allows optimal layer caching
ADD .
/app
RUN --mount=type=cache,target=/root/.cache/uv \
    uv sync --frozen --no-dev --no-editable

FROM python:3.12-slim-bookworm

WORKDIR /app

COPY --from=uv /root/.local /root/.local
COPY --from=uv --chown=app:app /app/.venv /app/.venv

# Place executables in the environment at the front of the path
ENV PATH="/app/.venv/bin:$PATH"

# Set the LOCAL_TIMEZONE environment variable
ENV LOCAL_TIMEZONE=${LOCAL_TIMEZONE:-"UTC"}

# Exec-form ENTRYPOINT does not expand environment variables, so start the
# server through a shell that substitutes LOCAL_TIMEZONE at container start.
# Override the timezone with `docker run -e LOCAL_TIMEZONE=<IANA name>`;
# extra `docker run` arguments are forwarded to the server via "$@".
ENTRYPOINT ["sh", "-c", "exec mcp-server-time --local-timezone \"$LOCAL_TIMEZONE\" \"$@\"", "--"]
```

--------------------------------------------------------------------------------
/.github/pull_request_template.md:
--------------------------------------------------------------------------------

```markdown
<!-- Provide a brief description of your changes -->

## Description

## Server Details
<!-- If modifying an existing server, provide details -->
- Server: <!-- e.g., filesystem, github -->
- Changes to: <!-- e.g., tools, resources, prompts -->

## Motivation and Context
<!-- Why is this change needed? What problem does it solve? -->

## How Has This Been Tested?
<!-- Have you tested this with an LLM client? Which scenarios were tested? -->

## Breaking Changes
<!-- Will users need to update their MCP client configurations? -->

## Types of changes
<!-- What types of changes does your code introduce? Put an `x` in all the boxes that apply: -->
- [ ] Bug fix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)
- [ ] Documentation update

## Checklist
<!-- Go over all the following points, and put an `x` in all the boxes that apply.
-->
- [ ] I have read the [MCP Protocol Documentation](https://modelcontextprotocol.io)
- [ ] My changes follow MCP security best practices
- [ ] I have updated the server's README accordingly
- [ ] I have tested this with an LLM client
- [ ] My code follows the repository's style guidelines
- [ ] New and existing tests pass locally
- [ ] I have added appropriate error handling
- [ ] I have documented all environment variables and configuration options

## Additional context
<!-- Add any other context, implementation notes, or design decisions -->
```

--------------------------------------------------------------------------------
/.github/workflows/claude.yml:
--------------------------------------------------------------------------------

```yaml
name: Claude Code

on:
  issue_comment:
    types: [created]
  pull_request_review_comment:
    types: [created]
  issues:
    types: [opened, assigned]
  pull_request_review:
    types: [submitted]

jobs:
  claude:
    if: |
      (github.event_name == 'issue_comment' && contains(github.event.comment.body, '@claude')) ||
      (github.event_name == 'pull_request_review_comment' && contains(github.event.comment.body, '@claude')) ||
      (github.event_name == 'pull_request_review' && contains(github.event.review.body, '@claude')) ||
      (github.event_name == 'issues' && (contains(github.event.issue.body, '@claude') || contains(github.event.issue.title, '@claude')))
    runs-on: ubuntu-latest
    permissions:
      contents: read
      pull-requests: read
      issues: read
      id-token: write
      actions: read
    steps:
      - name: Checkout repository
        uses: actions/checkout@v4
        with:
          fetch-depth: 1

      - name: Run Claude Code
        id: claude
        uses: anthropics/claude-code-action@beta
        with:
          anthropic_api_key: ${{ secrets.ANTHROPIC_API_KEY }}

          # Allow Claude to read CI results on PRs
          additional_permissions: |
            actions: read

          # Trigger when assigned to an issue
          assignee_trigger: "claude"

          # Allow Claude to run bash
          # This should be safe given the repo is already public
          allowed_tools: "Bash"

          custom_instructions: |
            If posting a
comment to GitHub, give a concise summary of the comment at the top and put all the details in a <details> block. ``` -------------------------------------------------------------------------------- /src/everything/sse.ts: -------------------------------------------------------------------------------- ```typescript import { SSEServerTransport } from "@modelcontextprotocol/sdk/server/sse.js"; import express from "express"; import { createServer } from "./everything.js"; import cors from 'cors'; console.error('Starting SSE server...'); const app = express(); app.use(cors({ "origin": "*", // use "*" with caution in production "methods": "GET,POST", "preflightContinue": false, "optionsSuccessStatus": 204, })); // Enable CORS for all routes so Inspector can connect const transports: Map<string, SSEServerTransport> = new Map<string, SSEServerTransport>(); app.get("/sse", async (req, res) => { let transport: SSEServerTransport; const { server, cleanup, startNotificationIntervals } = createServer(); if (req?.query?.sessionId) { const sessionId = (req?.query?.sessionId as string); transport = transports.get(sessionId) as SSEServerTransport; console.error("Client Reconnecting? 
This shouldn't happen; when client has a sessionId, GET /sse should not be called again.", transport.sessionId);
  } else {
    // Create and store transport for new session
    transport = new SSEServerTransport("/message", res);
    transports.set(transport.sessionId, transport);

    // Connect server to transport
    await server.connect(transport);
    console.error("Client Connected: ", transport.sessionId);

    // Start notification intervals after client connects
    startNotificationIntervals(transport.sessionId);

    // Handle close of connection
    server.onclose = async () => {
      console.error("Client Disconnected: ", transport.sessionId);
      transports.delete(transport.sessionId);
      await cleanup();
    };
  }
});

app.post("/message", async (req, res) => {
  const sessionId = (req?.query?.sessionId as string);
  const transport = transports.get(sessionId);
  if (transport) {
    console.error("Client Message from", sessionId);
    await transport.handlePostMessage(req, res);
  } else {
    console.error(`No transport found for sessionId ${sessionId}`);
    // Answer the request so the client is not left waiting for a response
    res.status(404).send("No transport found for sessionId");
  }
});

const PORT = process.env.PORT || 3001;
app.listen(PORT, () => {
  console.error(`Server is running on port ${PORT}`);
});
```

--------------------------------------------------------------------------------
/src/filesystem/path-validation.ts:
--------------------------------------------------------------------------------

```typescript
import path from 'path';

/**
 * Checks if an absolute path is within any of the allowed directories.
* * @param absolutePath - The absolute path to check (will be normalized) * @param allowedDirectories - Array of absolute allowed directory paths (will be normalized) * @returns true if the path is within an allowed directory, false otherwise * @throws Error if given relative paths after normalization */ export function isPathWithinAllowedDirectories(absolutePath: string, allowedDirectories: string[]): boolean { // Type validation if (typeof absolutePath !== 'string' || !Array.isArray(allowedDirectories)) { return false; } // Reject empty inputs if (!absolutePath || allowedDirectories.length === 0) { return false; } // Reject null bytes (forbidden in paths) if (absolutePath.includes('\x00')) { return false; } // Normalize the input path let normalizedPath: string; try { normalizedPath = path.resolve(path.normalize(absolutePath)); } catch { return false; } // Verify it's absolute after normalization if (!path.isAbsolute(normalizedPath)) { throw new Error('Path must be absolute after normalization'); } // Check against each allowed directory return allowedDirectories.some(dir => { if (typeof dir !== 'string' || !dir) { return false; } // Reject null bytes in allowed dirs if (dir.includes('\x00')) { return false; } // Normalize the allowed directory let normalizedDir: string; try { normalizedDir = path.resolve(path.normalize(dir)); } catch { return false; } // Verify allowed directory is absolute after normalization if (!path.isAbsolute(normalizedDir)) { throw new Error('Allowed directories must be absolute paths after normalization'); } // Check if normalizedPath is within normalizedDir // Path is inside if it's the same or a subdirectory if (normalizedPath === normalizedDir) { return true; } // Special case for root directory to avoid double slash // On Windows, we need to check if both paths are on the same drive if (normalizedDir === path.sep) { return normalizedPath.startsWith(path.sep); } // On Windows, also check for drive root (e.g., "C:\") if (path.sep === 
'\\' && normalizedDir.match(/^[A-Za-z]:\\?$/)) { // Ensure both paths are on the same drive const dirDrive = normalizedDir.charAt(0).toLowerCase(); const pathDrive = normalizedPath.charAt(0).toLowerCase(); return pathDrive === dirDrive && normalizedPath.startsWith(normalizedDir.replace(/\\?$/, '\\')); } return normalizedPath.startsWith(normalizedDir + path.sep); }); } ``` -------------------------------------------------------------------------------- /src/filesystem/roots-utils.ts: -------------------------------------------------------------------------------- ```typescript import { promises as fs, type Stats } from 'fs'; import path from 'path'; import os from 'os'; import { normalizePath } from './path-utils.js'; import type { Root } from '@modelcontextprotocol/sdk/types.js'; /** * Converts a root URI to a normalized directory path with basic security validation. * @param rootUri - File URI (file://...) or plain directory path * @returns Promise resolving to validated path or null if invalid */ async function parseRootUri(rootUri: string): Promise<string | null> { try { const rawPath = rootUri.startsWith('file://') ? rootUri.slice(7) : rootUri; const expandedPath = rawPath.startsWith('~/') || rawPath === '~' ? path.join(os.homedir(), rawPath.slice(1)) : rawPath; const absolutePath = path.resolve(expandedPath); const resolvedPath = await fs.realpath(absolutePath); return normalizePath(resolvedPath); } catch { return null; // Path doesn't exist or other error } } /** * Formats error message for directory validation failures. * @param dir - Directory path that failed validation * @param error - Error that occurred during validation * @param reason - Specific reason for failure * @returns Formatted error message */ function formatDirectoryError(dir: string, error?: unknown, reason?: string): string { if (reason) { return `Skipping ${reason}: ${dir}`; } const message = error instanceof Error ? 
error.message : String(error); return `Skipping invalid directory: ${dir} due to error: ${message}`; } /** * Resolves requested root directories from MCP root specifications. * * Converts root URI specifications (file:// URIs or plain paths) into normalized * directory paths, validating that each path exists and is a directory. * Includes symlink resolution for security. * * @param requestedRoots - Array of root specifications with URI and optional name * @returns Promise resolving to array of validated directory paths */ export async function getValidRootDirectories( requestedRoots: readonly Root[] ): Promise<string[]> { const validatedDirectories: string[] = []; for (const requestedRoot of requestedRoots) { const resolvedPath = await parseRootUri(requestedRoot.uri); if (!resolvedPath) { console.error(formatDirectoryError(requestedRoot.uri, undefined, 'invalid path or inaccessible')); continue; } try { const stats: Stats = await fs.stat(resolvedPath); if (stats.isDirectory()) { validatedDirectories.push(resolvedPath); } else { console.error(formatDirectoryError(resolvedPath, undefined, 'non-directory root')); } } catch (error) { console.error(formatDirectoryError(resolvedPath, error)); } } return validatedDirectories; } ``` -------------------------------------------------------------------------------- /src/filesystem/__tests__/roots-utils.test.ts: -------------------------------------------------------------------------------- ```typescript import { describe, it, expect, beforeEach, afterEach } from '@jest/globals'; import { getValidRootDirectories } from '../roots-utils.js'; import { mkdtempSync, rmSync, mkdirSync, writeFileSync, realpathSync } from 'fs'; import { tmpdir } from 'os'; import { join } from 'path'; import type { Root } from '@modelcontextprotocol/sdk/types.js'; describe('getValidRootDirectories', () => { let testDir1: string; let testDir2: string; let testDir3: string; let testFile: string; beforeEach(() => { // Create test directories testDir1 = 
realpathSync(mkdtempSync(join(tmpdir(), 'mcp-roots-test1-'))); testDir2 = realpathSync(mkdtempSync(join(tmpdir(), 'mcp-roots-test2-'))); testDir3 = realpathSync(mkdtempSync(join(tmpdir(), 'mcp-roots-test3-'))); // Create a test file (not a directory) testFile = join(testDir1, 'test-file.txt'); writeFileSync(testFile, 'test content'); }); afterEach(() => { // Cleanup rmSync(testDir1, { recursive: true, force: true }); rmSync(testDir2, { recursive: true, force: true }); rmSync(testDir3, { recursive: true, force: true }); }); describe('valid directory processing', () => { it('should process all URI formats and edge cases', async () => { const roots = [ { uri: `file://${testDir1}`, name: 'File URI' }, { uri: testDir2, name: 'Plain path' }, { uri: testDir3 } // Plain path without name property ]; const result = await getValidRootDirectories(roots); expect(result).toContain(testDir1); expect(result).toContain(testDir2); expect(result).toContain(testDir3); expect(result).toHaveLength(3); }); it('should normalize complex paths', async () => { const subDir = join(testDir1, 'subdir'); mkdirSync(subDir); const roots = [ { uri: `file://${testDir1}/./subdir/../subdir`, name: 'Complex Path' } ]; const result = await getValidRootDirectories(roots); expect(result).toHaveLength(1); expect(result[0]).toBe(subDir); }); }); describe('error handling', () => { it('should handle various error types', async () => { const nonExistentDir = join(tmpdir(), 'non-existent-directory-12345'); const invalidPath = '\0invalid\0path'; // Null bytes cause different error types const roots = [ { uri: `file://${testDir1}`, name: 'Valid Dir' }, { uri: `file://${nonExistentDir}`, name: 'Non-existent Dir' }, { uri: `file://${testFile}`, name: 'File Not Dir' }, { uri: `file://${invalidPath}`, name: 'Invalid Path' } ]; const result = await getValidRootDirectories(roots); expect(result).toContain(testDir1); expect(result).not.toContain(nonExistentDir); expect(result).not.toContain(testFile); 
expect(result).not.toContain(invalidPath); expect(result).toHaveLength(1); }); }); }); ``` -------------------------------------------------------------------------------- /.github/workflows/typescript.yml: -------------------------------------------------------------------------------- ```yaml name: TypeScript on: push: branches: - main pull_request: release: types: [published] jobs: detect-packages: runs-on: ubuntu-latest outputs: packages: ${{ steps.find-packages.outputs.packages }} steps: - uses: actions/checkout@v4 - name: Find JS packages id: find-packages working-directory: src run: | PACKAGES=$(find . -name package.json -not -path "*/node_modules/*" -exec dirname {} \; | sed 's/^\.\///' | jq -R -s -c 'split("\n")[:-1]') echo "packages=$PACKAGES" >> $GITHUB_OUTPUT test: needs: [detect-packages] strategy: matrix: package: ${{ fromJson(needs.detect-packages.outputs.packages) }} name: Test ${{ matrix.package }} runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 - uses: actions/setup-node@v4 with: node-version: 22 cache: npm - name: Install dependencies working-directory: src/${{ matrix.package }} run: npm ci - name: Check if tests exist id: check-tests working-directory: src/${{ matrix.package }} run: | if npm run test --silent 2>/dev/null; then echo "has-tests=true" >> $GITHUB_OUTPUT else echo "has-tests=false" >> $GITHUB_OUTPUT fi continue-on-error: true - name: Run tests if: steps.check-tests.outputs.has-tests == 'true' working-directory: src/${{ matrix.package }} run: npm test build: needs: [detect-packages, test] strategy: matrix: package: ${{ fromJson(needs.detect-packages.outputs.packages) }} name: Build ${{ matrix.package }} runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 - uses: actions/setup-node@v4 with: node-version: 22 cache: npm - name: Install dependencies working-directory: src/${{ matrix.package }} run: npm ci - name: Build package working-directory: src/${{ matrix.package }} run: npm run build publish: runs-on: ubuntu-latest 
needs: [build, detect-packages] if: github.event_name == 'release' environment: release strategy: matrix: package: ${{ fromJson(needs.detect-packages.outputs.packages) }} name: Publish ${{ matrix.package }} permissions: contents: read id-token: write steps: - uses: actions/checkout@v4 - uses: actions/setup-node@v4 with: node-version: 22 cache: npm registry-url: "https://registry.npmjs.org" - name: Install dependencies working-directory: src/${{ matrix.package }} run: npm ci - name: Publish package working-directory: src/${{ matrix.package }} run: npm publish --access public env: NODE_AUTH_TOKEN: ${{ secrets.NPM_TOKEN }} ``` -------------------------------------------------------------------------------- /.github/workflows/python.yml: -------------------------------------------------------------------------------- ```yaml name: Python on: push: branches: - main pull_request: release: types: [published] jobs: detect-packages: runs-on: ubuntu-latest outputs: packages: ${{ steps.find-packages.outputs.packages }} steps: - uses: actions/checkout@v4 - name: Find Python packages id: find-packages working-directory: src run: | PACKAGES=$(find . 
-name pyproject.toml -exec dirname {} \; | sed 's/^\.\///' | jq -R -s -c 'split("\n")[:-1]') echo "packages=$PACKAGES" >> $GITHUB_OUTPUT test: needs: [detect-packages] strategy: matrix: package: ${{ fromJson(needs.detect-packages.outputs.packages) }} name: Test ${{ matrix.package }} runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 - name: Install uv uses: astral-sh/setup-uv@v3 - name: Set up Python uses: actions/setup-python@v5 with: python-version-file: "src/${{ matrix.package }}/.python-version" - name: Install dependencies working-directory: src/${{ matrix.package }} run: uv sync --frozen --all-extras --dev - name: Check if tests exist id: check-tests working-directory: src/${{ matrix.package }} run: | if [ -d "tests" ] || [ -d "test" ] || grep -q "pytest" pyproject.toml; then echo "has-tests=true" >> $GITHUB_OUTPUT else echo "has-tests=false" >> $GITHUB_OUTPUT fi - name: Run tests if: steps.check-tests.outputs.has-tests == 'true' working-directory: src/${{ matrix.package }} run: uv run pytest build: needs: [detect-packages, test] strategy: matrix: package: ${{ fromJson(needs.detect-packages.outputs.packages) }} name: Build ${{ matrix.package }} runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 - name: Install uv uses: astral-sh/setup-uv@v3 - name: Set up Python uses: actions/setup-python@v5 with: python-version-file: "src/${{ matrix.package }}/.python-version" - name: Install dependencies working-directory: src/${{ matrix.package }} run: uv sync --frozen --all-extras --dev - name: Run pyright working-directory: src/${{ matrix.package }} run: uv run --frozen pyright - name: Build package working-directory: src/${{ matrix.package }} run: uv build - name: Upload artifacts uses: actions/upload-artifact@v4 with: name: dist-${{ matrix.package }} path: src/${{ matrix.package }}/dist/ publish: runs-on: ubuntu-latest needs: [build, detect-packages] if: github.event_name == 'release' strategy: matrix: package: ${{ 
fromJson(needs.detect-packages.outputs.packages) }} name: Publish ${{ matrix.package }} environment: release permissions: id-token: write # Required for trusted publishing steps: - name: Download artifacts uses: actions/download-artifact@v4 with: name: dist-${{ matrix.package }} path: dist/ - name: Publish package to PyPI uses: pypa/gh-action-pypi-publish@release/v1 ``` -------------------------------------------------------------------------------- /src/filesystem/path-utils.ts: -------------------------------------------------------------------------------- ```typescript import path from "path"; import os from 'os'; /** * Converts WSL or Unix-style Windows paths to Windows format * @param p The path to convert * @returns Converted Windows path */ export function convertToWindowsPath(p: string): string { // Handle WSL paths (/mnt/c/...) if (p.startsWith('/mnt/')) { const driveLetter = p.charAt(5).toUpperCase(); const pathPart = p.slice(6).replace(/\//g, '\\'); return `${driveLetter}:${pathPart}`; } // Handle Unix-style Windows paths (/c/...) 
if (p.match(/^\/[a-zA-Z]\//)) { const driveLetter = p.charAt(1).toUpperCase(); const pathPart = p.slice(2).replace(/\//g, '\\'); return `${driveLetter}:${pathPart}`; } // Handle standard Windows paths, ensuring backslashes if (p.match(/^[a-zA-Z]:/)) { return p.replace(/\//g, '\\'); } // Leave non-Windows paths unchanged return p; } /** * Normalizes path by standardizing format while preserving OS-specific behavior * @param p The path to normalize * @returns Normalized path */ export function normalizePath(p: string): string { // Remove any surrounding quotes and whitespace p = p.trim().replace(/^["']|["']$/g, ''); // Check if this is a Unix path (starts with / but not a Windows or WSL path) const isUnixPath = p.startsWith('/') && !p.match(/^\/mnt\/[a-z]\//i) && !p.match(/^\/[a-zA-Z]\//); if (isUnixPath) { // For Unix paths, just normalize without converting to Windows format // Replace double slashes with single slashes and remove trailing slashes return p.replace(/\/+/g, '/').replace(/\/+$/, ''); } // Convert WSL or Unix-style Windows paths to Windows format p = convertToWindowsPath(p); // Handle double backslashes, preserving leading UNC \\ if (p.startsWith('\\\\')) { // For UNC paths, first normalize any excessive leading backslashes to exactly \\ // Then normalize double backslashes in the rest of the path let uncPath = p; // Replace multiple leading backslashes with exactly two uncPath = uncPath.replace(/^\\{2,}/, '\\\\'); // Now normalize any remaining double backslashes in the rest of the path const restOfPath = uncPath.substring(2).replace(/\\\\/g, '\\'); p = '\\\\' + restOfPath; } else { // For non-UNC paths, normalize all double backslashes p = p.replace(/\\\\/g, '\\'); } // Use Node's path normalization, which handles . and .. 
segments let normalized = path.normalize(p); // Fix UNC paths after normalization (path.normalize can remove a leading backslash) if (p.startsWith('\\\\') && !normalized.startsWith('\\\\')) { normalized = '\\' + normalized; } // Handle Windows paths: convert slashes and ensure drive letter is capitalized if (normalized.match(/^[a-zA-Z]:/)) { let result = normalized.replace(/\//g, '\\'); // Capitalize drive letter if present if (/^[a-z]:/.test(result)) { result = result.charAt(0).toUpperCase() + result.slice(1); } return result; } // For all other paths (including relative paths), convert forward slashes to backslashes // This ensures relative paths like "some/relative/path" become "some\\relative\\path" return normalized.replace(/\//g, '\\'); } /** * Expands home directory tildes in paths * @param filepath The path to expand * @returns Expanded path */ export function expandHome(filepath: string): string { if (filepath.startsWith('~/') || filepath === '~') { return path.join(os.homedir(), filepath.slice(1)); } return filepath; } ``` -------------------------------------------------------------------------------- /src/git/tests/test_server.py: -------------------------------------------------------------------------------- ```python import pytest from pathlib import Path import git from mcp_server_git.server import git_checkout, git_branch, git_add import shutil @pytest.fixture def test_repository(tmp_path: Path): repo_path = tmp_path / "temp_test_repo" test_repo = git.Repo.init(repo_path) Path(repo_path / "test.txt").write_text("test") test_repo.index.add(["test.txt"]) test_repo.index.commit("initial commit") yield test_repo shutil.rmtree(repo_path) def test_git_checkout_existing_branch(test_repository): test_repository.git.branch("test-branch") result = git_checkout(test_repository, "test-branch") assert "Switched to branch 'test-branch'" in result assert test_repository.active_branch.name == "test-branch" def test_git_checkout_nonexistent_branch(test_repository): 
    with pytest.raises(git.GitCommandError):
        git_checkout(test_repository, "nonexistent-branch")

def test_git_branch_local(test_repository):
    test_repository.git.branch("new-branch-local")
    result = git_branch(test_repository, "local")
    assert "new-branch-local" in result

def test_git_branch_remote(test_repository):
    # GitPython does not easily support creating remote branches without a remote.
    # This test will check the behavior when 'remote' is specified without actual remotes.
    result = git_branch(test_repository, "remote")
    assert "" == result.strip()  # Should be empty if no remote branches

def test_git_branch_all(test_repository):
    test_repository.git.branch("new-branch-all")
    result = git_branch(test_repository, "all")
    assert "new-branch-all" in result

def test_git_branch_contains(test_repository):
    # Remember the default branch; its name depends on git's init.defaultBranch setting
    default_branch = test_repository.active_branch.name
    # Create a new branch and commit to it
    test_repository.git.checkout("-b", "feature-branch")
    (Path(test_repository.working_dir) / "feature.txt").write_text("feature content")
    test_repository.index.add(["feature.txt"])
    commit = test_repository.index.commit("feature commit")
    test_repository.git.checkout(default_branch)

    result = git_branch(test_repository, "local", contains=commit.hexsha)
    assert "feature-branch" in result
    assert default_branch not in result

def test_git_branch_not_contains(test_repository):
    # Remember the default branch; its name depends on git's init.defaultBranch setting
    default_branch = test_repository.active_branch.name
    # Create a new branch and commit to it
    test_repository.git.checkout("-b", "another-feature-branch")
    (Path(test_repository.working_dir) / "another_feature.txt").write_text("another feature content")
    test_repository.index.add(["another_feature.txt"])
    commit = test_repository.index.commit("another feature commit")
    test_repository.git.checkout(default_branch)

    result = git_branch(test_repository, "local", not_contains=commit.hexsha)
    assert "another-feature-branch" not in result
    assert default_branch in result

def test_git_add_all_files(test_repository):
    file_path = Path(test_repository.working_dir) / "all_file.txt"
    file_path.write_text("adding all")

    result = git_add(test_repository, ["."])
    staged_files = [item.a_path for item in test_repository.index.diff("HEAD")]
    assert "all_file.txt" in staged_files
    assert result == "Files staged successfully"

def test_git_add_specific_files(test_repository):
    file1 = Path(test_repository.working_dir) / "file1.txt"
    file2 = Path(test_repository.working_dir) / "file2.txt"
    file1.write_text("file 1 content")
    file2.write_text("file 2 content")

    result = git_add(test_repository, ["file1.txt"])

    staged_files = [item.a_path for item in test_repository.index.diff("HEAD")]
    assert "file1.txt" in staged_files
    assert "file2.txt" not in staged_files
    assert result == "Files staged successfully"
```

--------------------------------------------------------------------------------
/scripts/release.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env -S uv run --script
# /// script
# requires-python = ">=3.12"
# dependencies = [
#     "click>=8.1.8",
#     "tomlkit>=0.13.2"
# ]
# ///
import sys
import re
import click
from pathlib import Path
import json
import tomlkit
import datetime
import subprocess
from dataclasses import dataclass
from typing import Any, Iterator, NewType, Protocol


Version = NewType("Version", str)
GitHash = NewType("GitHash", str)


class GitHashParamType(click.ParamType):
    name = "git_hash"

    def convert(
        self, value: Any, param: click.Parameter | None, ctx: click.Context | None
    ) -> GitHash | None:
        if value is None:
            return None

        if not (8 <= len(value) <= 40):
            self.fail(f"Git hash must be between 8 and 40 characters, got {len(value)}")

        if not re.match(r"^[0-9a-fA-F]+$", value):
            self.fail("Git hash must contain only hex digits (0-9, a-f)")

        try:
            # Verify hash exists in repo
            subprocess.run(
                ["git", "rev-parse", "--verify", value], check=True, capture_output=True
            )
        except subprocess.CalledProcessError:
            self.fail(f"Git hash {value} not found in repository")

        return GitHash(value.lower())


GIT_HASH = GitHashParamType()


class Package(Protocol):
    path: Path

    def package_name(self) ->
str: ... def update_version(self, version: Version) -> None: ... @dataclass class NpmPackage: path: Path def package_name(self) -> str: with open(self.path / "package.json", "r") as f: return json.load(f)["name"] def update_version(self, version: Version): with open(self.path / "package.json", "r+") as f: data = json.load(f) data["version"] = version f.seek(0) json.dump(data, f, indent=2) f.truncate() @dataclass class PyPiPackage: path: Path def package_name(self) -> str: with open(self.path / "pyproject.toml") as f: toml_data = tomlkit.parse(f.read()) name = toml_data.get("project", {}).get("name") if not name: raise Exception("No name in pyproject.toml project section") return str(name) def update_version(self, version: Version): # Update version in pyproject.toml with open(self.path / "pyproject.toml") as f: data = tomlkit.parse(f.read()) data["project"]["version"] = version with open(self.path / "pyproject.toml", "w") as f: f.write(tomlkit.dumps(data)) def has_changes(path: Path, git_hash: GitHash) -> bool: """Check if any files changed between current state and git hash""" try: output = subprocess.run( ["git", "diff", "--name-only", git_hash, "--", "."], cwd=path, check=True, capture_output=True, text=True, ) changed_files = [Path(f) for f in output.stdout.splitlines()] relevant_files = [f for f in changed_files if f.suffix in [".py", ".ts"]] return len(relevant_files) >= 1 except subprocess.CalledProcessError: return False def gen_version() -> Version: """Generate version based on current date""" now = datetime.datetime.now() return Version(f"{now.year}.{now.month}.{now.day}") def find_changed_packages(directory: Path, git_hash: GitHash) -> Iterator[Package]: for path in directory.glob("*/package.json"): if has_changes(path.parent, git_hash): yield NpmPackage(path.parent) for path in directory.glob("*/pyproject.toml"): if has_changes(path.parent, git_hash): yield PyPiPackage(path.parent) @click.group() def cli(): pass @cli.command("update-packages") 
@click.option(
    "--directory", type=click.Path(exists=True, path_type=Path), default=Path.cwd()
)
@click.argument("git_hash", type=GIT_HASH)
def update_packages(directory: Path, git_hash: GitHash) -> int:
    # Resolve the directory that contains the packages
    path = directory.resolve(strict=True)
    version = gen_version()

    for package in find_changed_packages(path, git_hash):
        name = package.package_name()
        package.update_version(version)

        click.echo(f"{name}@{version}")

    return 0


@cli.command("generate-notes")
@click.option(
    "--directory", type=click.Path(exists=True, path_type=Path), default=Path.cwd()
)
@click.argument("git_hash", type=GIT_HASH)
def generate_notes(directory: Path, git_hash: GitHash) -> int:
    # Resolve the directory that contains the packages
    path = directory.resolve(strict=True)
    version = gen_version()

    click.echo(f"# Release : v{version}")
    click.echo("")
    click.echo("## Updated packages")
    for package in find_changed_packages(path, git_hash):
        name = package.package_name()
        click.echo(f"- {name}@{version}")

    return 0


@cli.command("generate-version")
def generate_version() -> int:
    # Print the date-based version string
    click.echo(gen_version())
    return 0


@cli.command("generate-matrix")
@click.option(
    "--directory", type=click.Path(exists=True, path_type=Path), default=Path.cwd()
)
@click.option("--npm", is_flag=True, default=False)
@click.option("--pypi", is_flag=True, default=False)
@click.argument("git_hash", type=GIT_HASH)
def generate_matrix(directory: Path, git_hash: GitHash, pypi: bool, npm: bool) -> int:
    # Resolve the directory that contains the packages
    path = directory.resolve(strict=True)

    # Collect the changed package directories, filtered by the requested registry
    changes = []
    for package in find_changed_packages(path, git_hash):
        pkg = package.path.relative_to(path)
        if npm and isinstance(package, NpmPackage):
            changes.append(str(pkg))
        if pypi and isinstance(package, PyPiPackage):
            changes.append(str(pkg))

    click.echo(json.dumps(changes))
    return 0


if __name__ == "__main__":
    sys.exit(cli())
```

--------------------------------------------------------------------------------
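The two rules that drive `scripts/release.py` above are easy to miss in the full script: a package counts as changed only when a `.py` or `.ts` file differs, and the version is the current date without zero padding. The sketch below isolates both rules so they can be tried without a git repository. The names `is_release_relevant` and `version_for` are hypothetical helpers written for this illustration; they are not part of the script.

```python
import datetime
from pathlib import Path

# Suffixes that has_changes() in release.py treats as release-relevant.
RELEVANT_SUFFIXES = {".py", ".ts"}


def is_release_relevant(changed: list[str]) -> bool:
    """Return True if any changed path has a suffix that triggers a release."""
    return any(Path(name).suffix in RELEVANT_SUFFIXES for name in changed)


def version_for(day: datetime.date) -> str:
    """Format a date the way gen_version() does: year.month.day, no zero padding."""
    return f"{day.year}.{day.month}.{day.day}"


print(is_release_relevant(["README.md", "package.json"]))  # False
print(is_release_relevant(["src/time/src/mcp_server_time/server.py"]))  # True
print(version_for(datetime.date(2025, 1, 9)))  # 2025.1.9
```

One consequence worth noting: a commit that touches only `README.md` or `package.json` does not mark the package as changed, and two runs on the same day produce the same version string.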
/src/filesystem/__tests__/directory-tree.test.ts: -------------------------------------------------------------------------------- ```typescript import { describe, it, expect, beforeEach, afterEach } from '@jest/globals'; import * as fs from 'fs/promises'; import * as path from 'path'; import * as os from 'os'; // We need to test the buildTree function, but it's defined inside the request handler // So we'll extract the core logic into a testable function import { minimatch } from 'minimatch'; interface TreeEntry { name: string; type: 'file' | 'directory'; children?: TreeEntry[]; } async function buildTreeForTesting(currentPath: string, rootPath: string, excludePatterns: string[] = []): Promise<TreeEntry[]> { const entries = await fs.readdir(currentPath, {withFileTypes: true}); const result: TreeEntry[] = []; for (const entry of entries) { const relativePath = path.relative(rootPath, path.join(currentPath, entry.name)); const shouldExclude = excludePatterns.some(pattern => { if (pattern.includes('*')) { return minimatch(relativePath, pattern, {dot: true}); } // For files: match exact name or as part of path // For directories: match as directory path return minimatch(relativePath, pattern, {dot: true}) || minimatch(relativePath, `**/${pattern}`, {dot: true}) || minimatch(relativePath, `**/${pattern}/**`, {dot: true}); }); if (shouldExclude) continue; const entryData: TreeEntry = { name: entry.name, type: entry.isDirectory() ? 
'directory' : 'file' }; if (entry.isDirectory()) { const subPath = path.join(currentPath, entry.name); entryData.children = await buildTreeForTesting(subPath, rootPath, excludePatterns); } result.push(entryData); } return result; } describe('buildTree exclude patterns', () => { let testDir: string; beforeEach(async () => { testDir = await fs.mkdtemp(path.join(os.tmpdir(), 'filesystem-test-')); // Create test directory structure await fs.mkdir(path.join(testDir, 'src')); await fs.mkdir(path.join(testDir, 'node_modules')); await fs.mkdir(path.join(testDir, '.git')); await fs.mkdir(path.join(testDir, 'nested', 'node_modules'), { recursive: true }); // Create test files await fs.writeFile(path.join(testDir, '.env'), 'SECRET=value'); await fs.writeFile(path.join(testDir, '.env.local'), 'LOCAL_SECRET=value'); await fs.writeFile(path.join(testDir, 'src', 'index.js'), 'console.log("hello");'); await fs.writeFile(path.join(testDir, 'package.json'), '{}'); await fs.writeFile(path.join(testDir, 'node_modules', 'module.js'), 'module.exports = {};'); await fs.writeFile(path.join(testDir, 'nested', 'node_modules', 'deep.js'), 'module.exports = {};'); }); afterEach(async () => { await fs.rm(testDir, { recursive: true, force: true }); }); it('should exclude files matching simple patterns', async () => { // Test the current implementation - this will fail until the bug is fixed const tree = await buildTreeForTesting(testDir, testDir, ['.env']); const fileNames = tree.map(entry => entry.name); expect(fileNames).not.toContain('.env'); expect(fileNames).toContain('.env.local'); // Should not exclude this expect(fileNames).toContain('src'); expect(fileNames).toContain('package.json'); }); it('should exclude directories matching simple patterns', async () => { const tree = await buildTreeForTesting(testDir, testDir, ['node_modules']); const dirNames = tree.map(entry => entry.name); expect(dirNames).not.toContain('node_modules'); expect(dirNames).toContain('src'); 
expect(dirNames).toContain('.git'); }); it('should exclude nested directories with same pattern', async () => { const tree = await buildTreeForTesting(testDir, testDir, ['node_modules']); // Find the nested directory const nestedDir = tree.find(entry => entry.name === 'nested'); expect(nestedDir).toBeDefined(); expect(nestedDir!.children).toBeDefined(); // The nested/node_modules should also be excluded const nestedChildren = nestedDir!.children!.map(child => child.name); expect(nestedChildren).not.toContain('node_modules'); }); it('should handle glob patterns correctly', async () => { const tree = await buildTreeForTesting(testDir, testDir, ['*.env']); const fileNames = tree.map(entry => entry.name); expect(fileNames).not.toContain('.env'); expect(fileNames).toContain('.env.local'); // *.env should not match .env.local expect(fileNames).toContain('src'); }); it('should handle dot files correctly', async () => { const tree = await buildTreeForTesting(testDir, testDir, ['.git']); const dirNames = tree.map(entry => entry.name); expect(dirNames).not.toContain('.git'); expect(dirNames).toContain('.env'); // Should not exclude this }); it('should work with multiple exclude patterns', async () => { const tree = await buildTreeForTesting(testDir, testDir, ['node_modules', '.env', '.git']); const entryNames = tree.map(entry => entry.name); expect(entryNames).not.toContain('node_modules'); expect(entryNames).not.toContain('.env'); expect(entryNames).not.toContain('.git'); expect(entryNames).toContain('src'); expect(entryNames).toContain('package.json'); }); it('should handle empty exclude patterns', async () => { const tree = await buildTreeForTesting(testDir, testDir, []); const entryNames = tree.map(entry => entry.name); // All entries should be included expect(entryNames).toContain('node_modules'); expect(entryNames).toContain('.env'); expect(entryNames).toContain('.git'); expect(entryNames).toContain('src'); }); }); ``` 
--------------------------------------------------------------------------------
/src/everything/streamableHttp.ts:
--------------------------------------------------------------------------------

```typescript
import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js";
import { InMemoryEventStore } from '@modelcontextprotocol/sdk/examples/shared/inMemoryEventStore.js';
import express, { Request, Response } from "express";
import { createServer } from "./everything.js";
import { randomUUID } from 'node:crypto';
import cors from 'cors';

console.error('Starting Streamable HTTP server...');

const app = express();
// Enable CORS for all routes so Inspector can connect
app.use(cors({
  "origin": "*", // use "*" with caution in production
  "methods": "GET,POST,DELETE",
  "preflightContinue": false,
  "optionsSuccessStatus": 204,
  "exposedHeaders": [
    'mcp-session-id',
    'last-event-id',
    'mcp-protocol-version'
  ]
}));

const transports: Map<string, StreamableHTTPServerTransport> = new Map<string, StreamableHTTPServerTransport>();

app.post('/mcp', async (req: Request, res: Response) => {
  console.error('Received MCP POST request');
  try {
    // Check for existing session ID
    const sessionId = req.headers['mcp-session-id'] as string | undefined;

    let transport: StreamableHTTPServerTransport;

    if (sessionId && transports.has(sessionId)) {
      // Reuse existing transport
      transport = transports.get(sessionId)!;
    } else if (!sessionId) {
      const { server, cleanup, startNotificationIntervals } = createServer();

      // New initialization request
      const eventStore = new InMemoryEventStore();
      transport = new StreamableHTTPServerTransport({
        sessionIdGenerator: () => randomUUID(),
        eventStore, // Enable resumability
        onsessioninitialized: (sessionId: string) => {
          // Store the transport by session ID when session is initialized
          // This avoids race conditions where requests might come in before the session is stored
          console.error(`Session initialized with ID: ${sessionId}`);
          transports.set(sessionId, transport);
        }
      });

      // Set up onclose handler to clean up transport when closed
      server.onclose = async () => {
        const sid = transport.sessionId;
        if (sid && transports.has(sid)) {
          console.error(`Transport closed for session ${sid}, removing from transports map`);
          transports.delete(sid);
          await cleanup();
        }
      };

      // Connect the transport to the MCP server BEFORE handling the request
      // so responses can flow back through the same transport
      await server.connect(transport);

      await transport.handleRequest(req, res);

      // Wait until initialize is complete and transport will have a sessionId
      startNotificationIntervals(transport.sessionId);

      return; // Already handled
    } else {
      // Invalid request - no session ID or not initialization request
      res.status(400).json({
        jsonrpc: '2.0',
        error: {
          code: -32000,
          message: 'Bad Request: No valid session ID provided',
        },
        id: req?.body?.id,
      });
      return;
    }

    // Handle the request with existing transport - no need to reconnect
    // The existing transport is already connected to the server
    await transport.handleRequest(req, res);
  } catch (error) {
    console.error('Error handling MCP request:', error);
    if (!res.headersSent) {
      res.status(500).json({
        jsonrpc: '2.0',
        error: {
          code: -32603,
          message: 'Internal server error',
        },
        id: req?.body?.id,
      });
      return;
    }
  }
});

// Handle GET requests for SSE streams (using built-in support from StreamableHTTP)
app.get('/mcp', async (req: Request, res: Response) => {
  console.error('Received MCP GET request');
  const sessionId = req.headers['mcp-session-id'] as string | undefined;
  if (!sessionId || !transports.has(sessionId)) {
    res.status(400).json({
      jsonrpc: '2.0',
      error: {
        code: -32000,
        message: 'Bad Request: No valid session ID provided',
      },
      id: req?.body?.id,
    });
    return;
  }

  // Check for Last-Event-ID header for resumability
  const lastEventId = req.headers['last-event-id'] as string | undefined;
  if (lastEventId) {
    console.error(`Client reconnecting with Last-Event-ID: ${lastEventId}`);
  } else {
    console.error(`Establishing new SSE stream for session ${sessionId}`);
  }

  const transport = transports.get(sessionId);
  await transport!.handleRequest(req, res);
});

// Handle DELETE requests for session termination (according to MCP spec)
app.delete('/mcp', async (req: Request, res: Response) => {
  const sessionId = req.headers['mcp-session-id'] as string | undefined;
  if (!sessionId || !transports.has(sessionId)) {
    res.status(400).json({
      jsonrpc: '2.0',
      error: {
        code: -32000,
        message: 'Bad Request: No valid session ID provided',
      },
      id: req?.body?.id,
    });
    return;
  }

  console.error(`Received session termination request for session ${sessionId}`);

  try {
    const transport = transports.get(sessionId);
    await transport!.handleRequest(req, res);
  } catch (error) {
    console.error('Error handling session termination:', error);
    if (!res.headersSent) {
      res.status(500).json({
        jsonrpc: '2.0',
        error: {
          code: -32603,
          message: 'Error handling session termination',
        },
        id: req?.body?.id,
      });
      return;
    }
  }
});

// Start the server
const PORT = process.env.PORT || 3001;
app.listen(PORT, () => {
  console.error(`MCP Streamable HTTP Server listening on port ${PORT}`);
});

// Handle server shutdown
process.on('SIGINT', async () => {
  console.error('Shutting down server...');

  // Close all active transports to properly clean up resources.
  // A Map must be iterated with for...of; for...in would visit no entries.
  for (const [sessionId, transport] of transports) {
    try {
      console.error(`Closing transport for session ${sessionId}`);
      await transport.close();
      transports.delete(sessionId);
    } catch (error) {
      console.error(`Error closing transport for session ${sessionId}:`, error);
    }
  }

  console.error('Server shutdown complete');
  process.exit(0);
});
```

--------------------------------------------------------------------------------
/src/filesystem/__tests__/path-utils.test.ts:
--------------------------------------------------------------------------------

```typescript
import { describe, it, expect } from '@jest/globals';
import { normalizePath, expandHome, convertToWindowsPath }
from '../path-utils.js'; describe('Path Utilities', () => { describe('convertToWindowsPath', () => { it('leaves Unix paths unchanged', () => { expect(convertToWindowsPath('/usr/local/bin')) .toBe('/usr/local/bin'); expect(convertToWindowsPath('/home/user/some path')) .toBe('/home/user/some path'); }); it('converts WSL paths to Windows format', () => { expect(convertToWindowsPath('/mnt/c/NS/MyKindleContent')) .toBe('C:\\NS\\MyKindleContent'); }); it('converts Unix-style Windows paths to Windows format', () => { expect(convertToWindowsPath('/c/NS/MyKindleContent')) .toBe('C:\\NS\\MyKindleContent'); }); it('leaves Windows paths unchanged but ensures backslashes', () => { expect(convertToWindowsPath('C:\\NS\\MyKindleContent')) .toBe('C:\\NS\\MyKindleContent'); expect(convertToWindowsPath('C:/NS/MyKindleContent')) .toBe('C:\\NS\\MyKindleContent'); }); it('handles Windows paths with spaces', () => { expect(convertToWindowsPath('C:\\Program Files\\Some App')) .toBe('C:\\Program Files\\Some App'); expect(convertToWindowsPath('C:/Program Files/Some App')) .toBe('C:\\Program Files\\Some App'); }); it('handles uppercase and lowercase drive letters', () => { expect(convertToWindowsPath('/mnt/d/some/path')) .toBe('D:\\some\\path'); expect(convertToWindowsPath('/d/some/path')) .toBe('D:\\some\\path'); }); }); describe('normalizePath', () => { it('preserves Unix paths', () => { expect(normalizePath('/usr/local/bin')) .toBe('/usr/local/bin'); expect(normalizePath('/home/user/some path')) .toBe('/home/user/some path'); expect(normalizePath('"/usr/local/some app/"')) .toBe('/usr/local/some app'); }); it('removes surrounding quotes', () => { expect(normalizePath('"C:\\NS\\My Kindle Content"')) .toBe('C:\\NS\\My Kindle Content'); }); it('normalizes backslashes', () => { expect(normalizePath('C:\\\\NS\\\\MyKindleContent')) .toBe('C:\\NS\\MyKindleContent'); }); it('converts forward slashes to backslashes on Windows', () => { expect(normalizePath('C:/NS/MyKindleContent')) 
.toBe('C:\\NS\\MyKindleContent'); }); it('handles WSL paths', () => { expect(normalizePath('/mnt/c/NS/MyKindleContent')) .toBe('C:\\NS\\MyKindleContent'); }); it('handles Unix-style Windows paths', () => { expect(normalizePath('/c/NS/MyKindleContent')) .toBe('C:\\NS\\MyKindleContent'); }); it('handles paths with spaces and mixed slashes', () => { expect(normalizePath('C:/NS/My Kindle Content')) .toBe('C:\\NS\\My Kindle Content'); expect(normalizePath('/mnt/c/NS/My Kindle Content')) .toBe('C:\\NS\\My Kindle Content'); expect(normalizePath('C:\\Program Files (x86)\\App Name')) .toBe('C:\\Program Files (x86)\\App Name'); expect(normalizePath('"C:\\Program Files\\App Name"')) .toBe('C:\\Program Files\\App Name'); expect(normalizePath(' C:\\Program Files\\App Name ')) .toBe('C:\\Program Files\\App Name'); }); it('preserves spaces in all path formats', () => { expect(normalizePath('/mnt/c/Program Files/App Name')) .toBe('C:\\Program Files\\App Name'); expect(normalizePath('/c/Program Files/App Name')) .toBe('C:\\Program Files\\App Name'); expect(normalizePath('C:/Program Files/App Name')) .toBe('C:\\Program Files\\App Name'); }); it('handles special characters in paths', () => { // Test ampersand in path expect(normalizePath('C:\\NS\\Sub&Folder')) .toBe('C:\\NS\\Sub&Folder'); expect(normalizePath('C:/NS/Sub&Folder')) .toBe('C:\\NS\\Sub&Folder'); expect(normalizePath('/mnt/c/NS/Sub&Folder')) .toBe('C:\\NS\\Sub&Folder'); // Test tilde in path (short names in Windows) expect(normalizePath('C:\\NS\\MYKIND~1')) .toBe('C:\\NS\\MYKIND~1'); expect(normalizePath('/Users/NEMANS~1/FOLDER~2/SUBFO~1/Public/P12PST~1')) .toBe('/Users/NEMANS~1/FOLDER~2/SUBFO~1/Public/P12PST~1'); // Test other special characters expect(normalizePath('C:\\Path with #hash')) .toBe('C:\\Path with #hash'); expect(normalizePath('C:\\Path with (parentheses)')) .toBe('C:\\Path with (parentheses)'); expect(normalizePath('C:\\Path with [brackets]')) .toBe('C:\\Path with [brackets]'); 
expect(normalizePath('C:\\Path with @at+plus$dollar%percent')) .toBe('C:\\Path with @at+plus$dollar%percent'); }); it('capitalizes lowercase drive letters for Windows paths', () => { expect(normalizePath('c:/windows/system32')) .toBe('C:\\windows\\system32'); expect(normalizePath('/mnt/d/my/folder')) // WSL path with lowercase drive .toBe('D:\\my\\folder'); expect(normalizePath('/e/another/folder')) // Unix-style Windows path with lowercase drive .toBe('E:\\another\\folder'); }); it('handles UNC paths correctly', () => { // UNC paths should preserve the leading double backslash const uncPath = '\\\\SERVER\\share\\folder'; expect(normalizePath(uncPath)).toBe('\\\\SERVER\\share\\folder'); // Test UNC path with double backslashes that need normalization const uncPathWithDoubles = '\\\\\\\\SERVER\\\\share\\\\folder'; expect(normalizePath(uncPathWithDoubles)).toBe('\\\\SERVER\\share\\folder'); }); it('returns normalized non-Windows/WSL/Unix-style Windows paths as is after basic normalization', () => { // Relative path const relativePath = 'some/relative/path'; expect(normalizePath(relativePath)).toBe(relativePath.replace(/\//g, '\\')); // A path that looks somewhat absolute but isn't a drive or recognized Unix root for Windows conversion const otherAbsolutePath = '\\someserver\\share\\file'; expect(normalizePath(otherAbsolutePath)).toBe(otherAbsolutePath); }); }); describe('expandHome', () => { it('expands ~ to home directory', () => { const result = expandHome('~/test'); expect(result).toContain('test'); expect(result).not.toContain('~'); }); it('expands bare ~ to home directory', () => { const result = expandHome('~'); expect(result).not.toContain('~'); expect(result.length).toBeGreaterThan(0); }); it('leaves other paths unchanged', () => { expect(expandHome('C:/test')).toBe('C:/test'); }); }); }); ``` -------------------------------------------------------------------------------- /.github/workflows/release.yml: 
-------------------------------------------------------------------------------- ```yaml name: Automatic Release Creation on: workflow_dispatch: schedule: - cron: '0 10 * * *' jobs: create-metadata: runs-on: ubuntu-latest if: github.repository_owner == 'modelcontextprotocol' outputs: hash: ${{ steps.last-release.outputs.hash }} version: ${{ steps.create-version.outputs.version}} npm_packages: ${{ steps.create-npm-packages.outputs.npm_packages}} pypi_packages: ${{ steps.create-pypi-packages.outputs.pypi_packages}} steps: - uses: actions/checkout@v4 with: fetch-depth: 0 - name: Get last release hash id: last-release run: | HASH=$(git rev-list --tags --max-count=1 || echo "HEAD~1") echo "hash=${HASH}" >> $GITHUB_OUTPUT echo "Using last release hash: ${HASH}" - name: Install uv uses: astral-sh/setup-uv@v5 - name: Create version name id: create-version run: | VERSION=$(uv run --script scripts/release.py generate-version) echo "version $VERSION" echo "version=$VERSION" >> $GITHUB_OUTPUT - name: Create notes run: | HASH="${{ steps.last-release.outputs.hash }}" uv run --script scripts/release.py generate-notes --directory src/ $HASH > RELEASE_NOTES.md cat RELEASE_NOTES.md - name: Release notes uses: actions/upload-artifact@v4 with: name: release-notes path: RELEASE_NOTES.md - name: Create python matrix id: create-pypi-packages run: | HASH="${{ steps.last-release.outputs.hash }}" PYPI=$(uv run --script scripts/release.py generate-matrix --pypi --directory src $HASH) echo "pypi_packages $PYPI" echo "pypi_packages=$PYPI" >> $GITHUB_OUTPUT - name: Create npm matrix id: create-npm-packages run: | HASH="${{ steps.last-release.outputs.hash }}" NPM=$(uv run --script scripts/release.py generate-matrix --npm --directory src $HASH) echo "npm_packages $NPM" echo "npm_packages=$NPM" >> $GITHUB_OUTPUT update-packages: needs: [create-metadata] if: ${{ needs.create-metadata.outputs.npm_packages != '[]' || needs.create-metadata.outputs.pypi_packages != '[]' }} runs-on: ubuntu-latest 
environment: release outputs: changes_made: ${{ steps.commit.outputs.changes_made }} steps: - uses: actions/checkout@v4 with: fetch-depth: 0 - name: Install uv uses: astral-sh/setup-uv@v5 - name: Update packages run: | HASH="${{ needs.create-metadata.outputs.hash }}" uv run --script scripts/release.py update-packages --directory src/ $HASH - name: Configure git run: | git config --global user.name "GitHub Actions" git config --global user.email "[email protected]" - name: Commit changes id: commit run: | VERSION="${{ needs.create-metadata.outputs.version }}" git add -u if git diff-index --quiet HEAD; then echo "changes_made=false" >> $GITHUB_OUTPUT else git commit -m 'Automatic update of packages' git tag -a "$VERSION" -m "Release $VERSION" git push origin "$VERSION" echo "changes_made=true" >> $GITHUB_OUTPUT fi publish-pypi: needs: [update-packages, create-metadata] if: ${{ needs.create-metadata.outputs.pypi_packages != '[]' && needs.create-metadata.outputs.pypi_packages != '' }} strategy: fail-fast: false matrix: package: ${{ fromJson(needs.create-metadata.outputs.pypi_packages) }} name: Build ${{ matrix.package }} environment: release permissions: id-token: write # Required for trusted publishing runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 with: ref: ${{ needs.create-metadata.outputs.version }} - name: Install uv uses: astral-sh/setup-uv@v5 - name: Set up Python uses: actions/setup-python@v5 with: python-version-file: "src/${{ matrix.package }}/.python-version" - name: Install dependencies working-directory: src/${{ matrix.package }} run: uv sync --frozen --all-extras --dev - name: Run pyright working-directory: src/${{ matrix.package }} run: uv run --frozen pyright - name: Build package working-directory: src/${{ matrix.package }} run: uv build - name: Publish package to PyPI uses: pypa/gh-action-pypi-publish@release/v1 with: packages-dir: src/${{ matrix.package }}/dist publish-npm: needs: [update-packages, create-metadata] if: ${{ 
needs.create-metadata.outputs.npm_packages != '[]' && needs.create-metadata.outputs.npm_packages != '' }} strategy: fail-fast: false matrix: package: ${{ fromJson(needs.create-metadata.outputs.npm_packages) }} name: Build ${{ matrix.package }} environment: release runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 with: ref: ${{ needs.create-metadata.outputs.version }} - uses: actions/setup-node@v4 with: node-version: 22 cache: npm registry-url: 'https://registry.npmjs.org' - name: Install dependencies working-directory: src/${{ matrix.package }} run: npm ci - name: Check if version exists on npm working-directory: src/${{ matrix.package }} run: | VERSION=$(jq -r .version package.json) if npm view --json | jq -e --arg version "$VERSION" '[.[]][0].versions | contains([$version])'; then echo "Version $VERSION already exists on npm" exit 1 fi echo "Version $VERSION is new, proceeding with publish" - name: Build package working-directory: src/${{ matrix.package }} run: npm run build - name: Publish package working-directory: src/${{ matrix.package }} run: | npm publish --access public env: NODE_AUTH_TOKEN: ${{ secrets.NPM_TOKEN }} create-release: needs: [update-packages, create-metadata, publish-pypi, publish-npm] if: | always() && needs.update-packages.outputs.changes_made == 'true' && (needs.publish-pypi.result == 'success' || needs.publish-npm.result == 'success') runs-on: ubuntu-latest environment: release permissions: contents: write steps: - uses: actions/checkout@v4 - name: Download release notes uses: actions/download-artifact@v4 with: name: release-notes - name: Create release env: GH_TOKEN: ${{ secrets.GITHUB_TOKEN}} run: | VERSION="${{ needs.create-metadata.outputs.version }}" gh release create "$VERSION" \ --title "Release $VERSION" \ --notes-file RELEASE_NOTES.md ``` -------------------------------------------------------------------------------- /src/time/src/mcp_server_time/server.py: 
--------------------------------------------------------------------------------

```python
from datetime import datetime, timedelta
from enum import Enum
import json
from typing import Sequence

from zoneinfo import ZoneInfo
from tzlocal import get_localzone_name  # ← returns "Europe/Paris", etc.

from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent, ImageContent, EmbeddedResource
from mcp.shared.exceptions import McpError

from pydantic import BaseModel


class TimeTools(str, Enum):
    GET_CURRENT_TIME = "get_current_time"
    CONVERT_TIME = "convert_time"


class TimeResult(BaseModel):
    timezone: str
    datetime: str
    day_of_week: str
    is_dst: bool


class TimeConversionResult(BaseModel):
    source: TimeResult
    target: TimeResult
    time_difference: str


class TimeConversionInput(BaseModel):
    source_tz: str
    time: str
    target_tz_list: list[str]


def get_local_tz(local_tz_override: str | None = None) -> ZoneInfo:
    if local_tz_override:
        return ZoneInfo(local_tz_override)

    # Get the local timezone name from tzlocal
    local_tzname = get_localzone_name()
    if local_tzname is not None:
        return ZoneInfo(local_tzname)
    # Default to UTC if local timezone cannot be determined
    return ZoneInfo("UTC")


def get_zoneinfo(timezone_name: str) -> ZoneInfo:
    try:
        return ZoneInfo(timezone_name)
    except Exception as e:
        raise McpError(f"Invalid timezone: {str(e)}")


class TimeServer:
    def get_current_time(self, timezone_name: str) -> TimeResult:
        """Get current time in specified timezone"""
        timezone = get_zoneinfo(timezone_name)
        current_time = datetime.now(timezone)

        return TimeResult(
            timezone=timezone_name,
            datetime=current_time.isoformat(timespec="seconds"),
            day_of_week=current_time.strftime("%A"),
            is_dst=bool(current_time.dst()),
        )

    def convert_time(
        self, source_tz: str, time_str: str, target_tz: str
    ) -> TimeConversionResult:
        """Convert time between timezones"""
        source_timezone = get_zoneinfo(source_tz)
        target_timezone = get_zoneinfo(target_tz)

        try:
            parsed_time = datetime.strptime(time_str, "%H:%M").time()
        except ValueError:
            raise ValueError("Invalid time format. Expected HH:MM [24-hour format]")

        now = datetime.now(source_timezone)
        source_time = datetime(
            now.year,
            now.month,
            now.day,
            parsed_time.hour,
            parsed_time.minute,
            tzinfo=source_timezone,
        )

        target_time = source_time.astimezone(target_timezone)
        source_offset = source_time.utcoffset() or timedelta()
        target_offset = target_time.utcoffset() or timedelta()
        hours_difference = (target_offset - source_offset).total_seconds() / 3600

        if hours_difference.is_integer():
            time_diff_str = f"{hours_difference:+.1f}h"
        else:
            # For fractional hours like Nepal's UTC+5:45
            time_diff_str = f"{hours_difference:+.2f}".rstrip("0").rstrip(".") + "h"

        return TimeConversionResult(
            source=TimeResult(
                timezone=source_tz,
                datetime=source_time.isoformat(timespec="seconds"),
                day_of_week=source_time.strftime("%A"),
                is_dst=bool(source_time.dst()),
            ),
            target=TimeResult(
                timezone=target_tz,
                datetime=target_time.isoformat(timespec="seconds"),
                day_of_week=target_time.strftime("%A"),
                is_dst=bool(target_time.dst()),
            ),
            time_difference=time_diff_str,
        )


async def serve(local_timezone: str | None = None) -> None:
    server = Server("mcp-time")
    time_server = TimeServer()
    local_tz = str(get_local_tz(local_timezone))

    @server.list_tools()
    async def list_tools() -> list[Tool]:
        """List available time tools."""
        return [
            Tool(
                name=TimeTools.GET_CURRENT_TIME.value,
                description="Get current time in a specific timezone",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "timezone": {
                            "type": "string",
                            "description": f"IANA timezone name (e.g., 'America/New_York', 'Europe/London'). Use '{local_tz}' as local timezone if no timezone provided by the user.",
                        }
                    },
                    "required": ["timezone"],
                },
            ),
            Tool(
                name=TimeTools.CONVERT_TIME.value,
                description="Convert time between timezones",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "source_timezone": {
                            "type": "string",
                            "description": f"Source IANA timezone name (e.g., 'America/New_York', 'Europe/London'). Use '{local_tz}' as local timezone if no source timezone provided by the user.",
                        },
                        "time": {
                            "type": "string",
                            "description": "Time to convert in 24-hour format (HH:MM)",
                        },
                        "target_timezone": {
                            "type": "string",
                            "description": f"Target IANA timezone name (e.g., 'Asia/Tokyo', 'America/San_Francisco'). Use '{local_tz}' as local timezone if no target timezone provided by the user.",
                        },
                    },
                    "required": ["source_timezone", "time", "target_timezone"],
                },
            ),
        ]

    @server.call_tool()
    async def call_tool(
        name: str, arguments: dict
    ) -> Sequence[TextContent | ImageContent | EmbeddedResource]:
        """Handle tool calls for time queries."""
        try:
            match name:
                case TimeTools.GET_CURRENT_TIME.value:
                    timezone = arguments.get("timezone")
                    if not timezone:
                        raise ValueError("Missing required argument: timezone")

                    result = time_server.get_current_time(timezone)

                case TimeTools.CONVERT_TIME.value:
                    if not all(
                        k in arguments
                        for k in ["source_timezone", "time", "target_timezone"]
                    ):
                        raise ValueError("Missing required arguments")

                    result = time_server.convert_time(
                        arguments["source_timezone"],
                        arguments["time"],
                        arguments["target_timezone"],
                    )
                case _:
                    raise ValueError(f"Unknown tool: {name}")

            return [
                TextContent(type="text", text=json.dumps(result.model_dump(), indent=2))
            ]

        except Exception as e:
            raise ValueError(f"Error processing mcp-server-time query: {str(e)}")

    options = server.create_initialization_options()
    async with stdio_server() as (read_stream, write_stream):
        await server.run(read_stream, write_stream, options)
```

--------------------------------------------------------------------------------
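The `time_difference` string built in `convert_time` (in `src/time/src/mcp_server_time/server.py`) has two formatting branches: whole-hour offsets keep one decimal place, while fractional offsets are trimmed of trailing zeros. The sketch below reproduces only that formatting step. It uses fixed-offset `datetime.timezone` objects instead of `ZoneInfo` so it runs without a timezone database; that substitution, and the helper name `format_time_difference`, are assumptions made for this illustration.

```python
from datetime import datetime, timedelta, timezone


def format_time_difference(source_time: datetime, target_time: datetime) -> str:
    """Format the UTC-offset difference between two aware datetimes, in hours."""
    source_offset = source_time.utcoffset() or timedelta()
    target_offset = target_time.utcoffset() or timedelta()
    hours = (target_offset - source_offset).total_seconds() / 3600

    if hours.is_integer():
        # Whole hours keep a single decimal place, e.g. "+9.0h"
        return f"{hours:+.1f}h"
    # Fractional hours drop trailing zeros, e.g. "+5.50" becomes "+5.5"
    return f"{hours:+.2f}".rstrip("0").rstrip(".") + "h"


# Fixed offsets stand in for Asia/Kathmandu and Asia/Tokyo (illustrative only).
nepal = timezone(timedelta(hours=5, minutes=45))
tokyo = timezone(timedelta(hours=9))
noon_utc = datetime(2024, 1, 15, 12, 0, tzinfo=timezone.utc)

print(format_time_difference(noon_utc, noon_utc.astimezone(nepal)))  # +5.75h
print(format_time_difference(noon_utc, noon_utc.astimezone(tokyo)))  # +9.0h
```

The explicit `+` in the format spec means the sign is always present, so a conversion westward reads as a negative value such as `-3.5h`.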
/src/sequentialthinking/index.ts: -------------------------------------------------------------------------------- ```typescript #!/usr/bin/env node import { Server } from "@modelcontextprotocol/sdk/server/index.js"; import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js"; import { CallToolRequestSchema, ListToolsRequestSchema, Tool, } from "@modelcontextprotocol/sdk/types.js"; // Fixed chalk import for ESM import chalk from 'chalk'; interface ThoughtData { thought: string; thoughtNumber: number; totalThoughts: number; isRevision?: boolean; revisesThought?: number; branchFromThought?: number; branchId?: string; needsMoreThoughts?: boolean; nextThoughtNeeded: boolean; } class SequentialThinkingServer { private thoughtHistory: ThoughtData[] = []; private branches: Record<string, ThoughtData[]> = {}; private disableThoughtLogging: boolean; constructor() { this.disableThoughtLogging = (process.env.DISABLE_THOUGHT_LOGGING || "").toLowerCase() === "true"; } private validateThoughtData(input: unknown): ThoughtData { const data = input as Record<string, unknown>; if (!data.thought || typeof data.thought !== 'string') { throw new Error('Invalid thought: must be a string'); } if (!data.thoughtNumber || typeof data.thoughtNumber !== 'number') { throw new Error('Invalid thoughtNumber: must be a number'); } if (!data.totalThoughts || typeof data.totalThoughts !== 'number') { throw new Error('Invalid totalThoughts: must be a number'); } if (typeof data.nextThoughtNeeded !== 'boolean') { throw new Error('Invalid nextThoughtNeeded: must be a boolean'); } return { thought: data.thought, thoughtNumber: data.thoughtNumber, totalThoughts: data.totalThoughts, nextThoughtNeeded: data.nextThoughtNeeded, isRevision: data.isRevision as boolean | undefined, revisesThought: data.revisesThought as number | undefined, branchFromThought: data.branchFromThought as number | undefined, branchId: data.branchId as string | undefined, needsMoreThoughts: data.needsMoreThoughts 
as boolean | undefined, }; } private formatThought(thoughtData: ThoughtData): string { const { thoughtNumber, totalThoughts, thought, isRevision, revisesThought, branchFromThought, branchId } = thoughtData; let prefix = ''; let context = ''; if (isRevision) { prefix = chalk.yellow('🔄 Revision'); context = ` (revising thought ${revisesThought})`; } else if (branchFromThought) { prefix = chalk.green('🌿 Branch'); context = ` (from thought ${branchFromThought}, ID: ${branchId})`; } else { prefix = chalk.blue('💭 Thought'); context = ''; } const header = `${prefix} ${thoughtNumber}/${totalThoughts}${context}`; const border = '─'.repeat(Math.max(header.length, thought.length) + 4); return ` ┌${border}┐ │ ${header} │ ├${border}┤ │ ${thought.padEnd(border.length - 2)} │ └${border}┘`; } public processThought(input: unknown): { content: Array<{ type: string; text: string }>; isError?: boolean } { try { const validatedInput = this.validateThoughtData(input); if (validatedInput.thoughtNumber > validatedInput.totalThoughts) { validatedInput.totalThoughts = validatedInput.thoughtNumber; } this.thoughtHistory.push(validatedInput); if (validatedInput.branchFromThought && validatedInput.branchId) { if (!this.branches[validatedInput.branchId]) { this.branches[validatedInput.branchId] = []; } this.branches[validatedInput.branchId].push(validatedInput); } if (!this.disableThoughtLogging) { const formattedThought = this.formatThought(validatedInput); console.error(formattedThought); } return { content: [{ type: "text", text: JSON.stringify({ thoughtNumber: validatedInput.thoughtNumber, totalThoughts: validatedInput.totalThoughts, nextThoughtNeeded: validatedInput.nextThoughtNeeded, branches: Object.keys(this.branches), thoughtHistoryLength: this.thoughtHistory.length }, null, 2) }] }; } catch (error) { return { content: [{ type: "text", text: JSON.stringify({ error: error instanceof Error ? 
error.message : String(error), status: 'failed' }, null, 2) }], isError: true }; } } } const SEQUENTIAL_THINKING_TOOL: Tool = { name: "sequentialthinking", description: `A detailed tool for dynamic and reflective problem-solving through thoughts. This tool helps analyze problems through a flexible thinking process that can adapt and evolve. Each thought can build on, question, or revise previous insights as understanding deepens. When to use this tool: - Breaking down complex problems into steps - Planning and design with room for revision - Analysis that might need course correction - Problems where the full scope might not be clear initially - Problems that require a multi-step solution - Tasks that need to maintain context over multiple steps - Situations where irrelevant information needs to be filtered out Key features: - You can adjust totalThoughts up or down as you progress - You can question or revise previous thoughts - You can add more thoughts even after reaching what seemed like the end - You can express uncertainty and explore alternative approaches - Not every thought needs to build linearly - you can branch or backtrack - Generates a solution hypothesis - Verifies the hypothesis based on the Chain of Thought steps - Repeats the process until satisfied - Provides a correct answer Parameters explained: - thought: Your current thinking step, which can include: * Regular analytical steps * Revisions of previous thoughts * Questions about previous decisions * Realizations about needing more analysis * Changes in approach * Hypothesis generation * Hypothesis verification - nextThoughtNeeded: True if you need more thinking, even if at what seemed like the end - thoughtNumber: Current number in sequence (can go beyond initial total if needed) - totalThoughts: Current estimate of thoughts needed (can be adjusted up/down) - isRevision: A boolean indicating if this thought revises previous thinking - revisesThought: If isRevision is true, which thought
number is being reconsidered - branchFromThought: If branching, which thought number is the branching point - branchId: Identifier for the current branch (if any) - needsMoreThoughts: If reaching end but realizing more thoughts needed You should: 1. Start with an initial estimate of needed thoughts, but be ready to adjust 2. Feel free to question or revise previous thoughts 3. Don't hesitate to add more thoughts if needed, even at the "end" 4. Express uncertainty when present 5. Mark thoughts that revise previous thinking or branch into new paths 6. Ignore information that is irrelevant to the current step 7. Generate a solution hypothesis when appropriate 8. Verify the hypothesis based on the Chain of Thought steps 9. Repeat the process until satisfied with the solution 10. Provide a single, ideally correct answer as the final output 11. Only set nextThoughtNeeded to false when truly done and a satisfactory answer is reached`, inputSchema: { type: "object", properties: { thought: { type: "string", description: "Your current thinking step" }, nextThoughtNeeded: { type: "boolean", description: "Whether another thought step is needed" }, thoughtNumber: { type: "integer", description: "Current thought number (numeric value, e.g., 1, 2, 3)", minimum: 1 }, totalThoughts: { type: "integer", description: "Estimated total thoughts needed (numeric value, e.g., 5, 10)", minimum: 1 }, isRevision: { type: "boolean", description: "Whether this revises previous thinking" }, revisesThought: { type: "integer", description: "Which thought is being reconsidered", minimum: 1 }, branchFromThought: { type: "integer", description: "Branching point thought number", minimum: 1 }, branchId: { type: "string", description: "Branch identifier" }, needsMoreThoughts: { type: "boolean", description: "If more thoughts are needed" } }, required: ["thought", "nextThoughtNeeded", "thoughtNumber", "totalThoughts"] } }; const server = new Server( { name: "sequential-thinking-server", version:
"0.2.0", }, { capabilities: { tools: {}, }, } ); const thinkingServer = new SequentialThinkingServer(); server.setRequestHandler(ListToolsRequestSchema, async () => ({ tools: [SEQUENTIAL_THINKING_TOOL], })); server.setRequestHandler(CallToolRequestSchema, async (request) => { if (request.params.name === "sequentialthinking") { return thinkingServer.processThought(request.params.arguments); } return { content: [{ type: "text", text: `Unknown tool: ${request.params.name}` }], isError: true }; }); async function runServer() { const transport = new StdioServerTransport(); await server.connect(transport); console.error("Sequential Thinking MCP Server running on stdio"); } runServer().catch((error) => { console.error("Fatal error running server:", error); process.exit(1); }); ``` -------------------------------------------------------------------------------- /src/fetch/src/mcp_server_fetch/server.py: -------------------------------------------------------------------------------- ```python from typing import Annotated, Tuple from urllib.parse import urlparse, urlunparse import markdownify import readabilipy.simple_json from mcp.shared.exceptions import McpError from mcp.server import Server from mcp.server.stdio import stdio_server from mcp.types import ( ErrorData, GetPromptResult, Prompt, PromptArgument, PromptMessage, TextContent, Tool, INVALID_PARAMS, INTERNAL_ERROR, ) from protego import Protego from pydantic import BaseModel, Field, AnyUrl DEFAULT_USER_AGENT_AUTONOMOUS = "ModelContextProtocol/1.0 (Autonomous; +https://github.com/modelcontextprotocol/servers)" DEFAULT_USER_AGENT_MANUAL = "ModelContextProtocol/1.0 (User-Specified; +https://github.com/modelcontextprotocol/servers)" def extract_content_from_html(html: str) -> str: """Extract and convert HTML content to Markdown format. 
Args: html: Raw HTML content to process Returns: Simplified markdown version of the content """ ret = readabilipy.simple_json.simple_json_from_html_string( html, use_readability=True ) if not ret["content"]: return "<error>Page failed to be simplified from HTML</error>" content = markdownify.markdownify( ret["content"], heading_style=markdownify.ATX, ) return content def get_robots_txt_url(url: str) -> str: """Get the robots.txt URL for a given website URL. Args: url: Website URL to get robots.txt for Returns: URL of the robots.txt file """ # Parse the URL into components parsed = urlparse(url) # Reconstruct the base URL with just scheme, netloc, and /robots.txt path robots_url = urlunparse((parsed.scheme, parsed.netloc, "/robots.txt", "", "", "")) return robots_url async def check_may_autonomously_fetch_url(url: str, user_agent: str, proxy_url: str | None = None) -> None: """ Check if the URL can be fetched by the user agent according to the robots.txt file. Raises a McpError if not. """ from httpx import AsyncClient, HTTPError robot_txt_url = get_robots_txt_url(url) async with AsyncClient(proxies=proxy_url) as client: try: response = await client.get( robot_txt_url, follow_redirects=True, headers={"User-Agent": user_agent}, ) except HTTPError: raise McpError(ErrorData( code=INTERNAL_ERROR, message=f"Failed to fetch robots.txt {robot_txt_url} due to a connection issue", )) if response.status_code in (401, 403): raise McpError(ErrorData( code=INTERNAL_ERROR, message=f"When fetching robots.txt ({robot_txt_url}), received status {response.status_code} so assuming that autonomous fetching is not allowed, the user can try manually fetching by using the fetch prompt", )) elif 400 <= response.status_code < 500: return robot_txt = response.text processed_robot_txt = "\n".join( line for line in robot_txt.splitlines() if not line.strip().startswith("#") ) robot_parser = Protego.parse(processed_robot_txt) if not robot_parser.can_fetch(str(url), user_agent): raise 
McpError(ErrorData( code=INTERNAL_ERROR, message=f"The site's robots.txt ({robot_txt_url}) specifies that autonomous fetching of this page is not allowed, " f"<useragent>{user_agent}</useragent>\n" f"<url>{url}</url>" f"<robots>\n{robot_txt}\n</robots>\n" f"The assistant must let the user know that it failed to view the page. The assistant may provide further guidance based on the above information.\n" f"The assistant can tell the user that they can try manually fetching the page by using the fetch prompt within their UI.", )) async def fetch_url( url: str, user_agent: str, force_raw: bool = False, proxy_url: str | None = None ) -> Tuple[str, str]: """ Fetch the URL and return the content in a form ready for the LLM, as well as a prefix string with status information. """ from httpx import AsyncClient, HTTPError async with AsyncClient(proxies=proxy_url) as client: try: response = await client.get( url, follow_redirects=True, headers={"User-Agent": user_agent}, timeout=30, ) except HTTPError as e: raise McpError(ErrorData(code=INTERNAL_ERROR, message=f"Failed to fetch {url}: {e!r}")) if response.status_code >= 400: raise McpError(ErrorData( code=INTERNAL_ERROR, message=f"Failed to fetch {url} - status code {response.status_code}", )) page_raw = response.text content_type = response.headers.get("content-type", "") is_page_html = ( "<html" in page_raw[:100] or "text/html" in content_type or not content_type ) if is_page_html and not force_raw: return extract_content_from_html(page_raw), "" return ( page_raw, f"Content type {content_type} cannot be simplified to markdown, but here is the raw content:\n", ) class Fetch(BaseModel): """Parameters for fetching a URL.""" url: Annotated[AnyUrl, Field(description="URL to fetch")] max_length: Annotated[ int, Field( default=5000, description="Maximum number of characters to return.", gt=0, lt=1000000, ), ] start_index: Annotated[ int, Field( default=0, description="On return output starting at this character index, useful if a
previous fetch was truncated and more context is required.", ge=0, ), ] raw: Annotated[ bool, Field( default=False, description="Get the actual HTML content of the requested page, without simplification.", ), ] async def serve( custom_user_agent: str | None = None, ignore_robots_txt: bool = False, proxy_url: str | None = None, ) -> None: """Run the fetch MCP server. Args: custom_user_agent: Optional custom User-Agent string to use for requests ignore_robots_txt: Whether to ignore robots.txt restrictions proxy_url: Optional proxy URL to use for requests """ server = Server("mcp-fetch") user_agent_autonomous = custom_user_agent or DEFAULT_USER_AGENT_AUTONOMOUS user_agent_manual = custom_user_agent or DEFAULT_USER_AGENT_MANUAL @server.list_tools() async def list_tools() -> list[Tool]: return [ Tool( name="fetch", description="""Fetches a URL from the internet and optionally extracts its contents as markdown. Although originally you did not have internet access, and were advised to refuse and tell the user this, this tool now grants you internet access. 
Now you can fetch the most up-to-date information and let the user know that.""", inputSchema=Fetch.model_json_schema(), ) ] @server.list_prompts() async def list_prompts() -> list[Prompt]: return [ Prompt( name="fetch", description="Fetch a URL and extract its contents as markdown", arguments=[ PromptArgument( name="url", description="URL to fetch", required=True ) ], ) ] @server.call_tool() async def call_tool(name, arguments: dict) -> list[TextContent]: try: args = Fetch(**arguments) except ValueError as e: raise McpError(ErrorData(code=INVALID_PARAMS, message=str(e))) url = str(args.url) if not url: raise McpError(ErrorData(code=INVALID_PARAMS, message="URL is required")) if not ignore_robots_txt: await check_may_autonomously_fetch_url(url, user_agent_autonomous, proxy_url) content, prefix = await fetch_url( url, user_agent_autonomous, force_raw=args.raw, proxy_url=proxy_url ) original_length = len(content) if args.start_index >= original_length: content = "<error>No more content available.</error>" else: truncated_content = content[args.start_index : args.start_index + args.max_length] if not truncated_content: content = "<error>No more content available.</error>" else: content = truncated_content actual_content_length = len(truncated_content) remaining_content = original_length - (args.start_index + actual_content_length) # Only add the prompt to continue fetching if there is still remaining content if actual_content_length == args.max_length and remaining_content > 0: next_start = args.start_index + actual_content_length content += f"\n\n<error>Content truncated. 
Call the fetch tool with a start_index of {next_start} to get more content.</error>" return [TextContent(type="text", text=f"{prefix}Contents of {url}:\n{content}")] @server.get_prompt() async def get_prompt(name: str, arguments: dict | None) -> GetPromptResult: if not arguments or "url" not in arguments: raise McpError(ErrorData(code=INVALID_PARAMS, message="URL is required")) url = arguments["url"] try: content, prefix = await fetch_url(url, user_agent_manual, proxy_url=proxy_url) # TODO: after SDK bug is addressed, don't catch the exception except McpError as e: return GetPromptResult( description=f"Failed to fetch {url}", messages=[ PromptMessage( role="user", content=TextContent(type="text", text=str(e)), ) ], ) return GetPromptResult( description=f"Contents of {url}", messages=[ PromptMessage( role="user", content=TextContent(type="text", text=prefix + content) ) ], ) options = server.create_initialization_options() async with stdio_server() as (read_stream, write_stream): await server.run(read_stream, write_stream, options, raise_exceptions=True) ``` -------------------------------------------------------------------------------- /src/filesystem/lib.ts: -------------------------------------------------------------------------------- ```typescript import fs from "fs/promises"; import path from "path"; import os from 'os'; import { randomBytes } from 'crypto'; import { diffLines, createTwoFilesPatch } from 'diff'; import { minimatch } from 'minimatch'; import { normalizePath, expandHome } from './path-utils.js'; import { isPathWithinAllowedDirectories } from './path-validation.js'; // Global allowed directories - set by the main module let allowedDirectories: string[] = []; // Function to set allowed directories from the main module export function setAllowedDirectories(directories: string[]): void { allowedDirectories = [...directories]; } // Function to get current allowed directories export function getAllowedDirectories(): string[] { return 
[...allowedDirectories]; } // Type definitions interface FileInfo { size: number; created: Date; modified: Date; accessed: Date; isDirectory: boolean; isFile: boolean; permissions: string; } export interface SearchOptions { excludePatterns?: string[]; } export interface SearchResult { path: string; isDirectory: boolean; } // Pure Utility Functions export function formatSize(bytes: number): string { const units = ['B', 'KB', 'MB', 'GB', 'TB']; if (bytes === 0) return '0 B'; const i = Math.floor(Math.log(bytes) / Math.log(1024)); if (i < 0 || i === 0) return `${bytes} ${units[0]}`; const unitIndex = Math.min(i, units.length - 1); return `${(bytes / Math.pow(1024, unitIndex)).toFixed(2)} ${units[unitIndex]}`; } export function normalizeLineEndings(text: string): string { return text.replace(/\r\n/g, '\n'); } export function createUnifiedDiff(originalContent: string, newContent: string, filepath: string = 'file'): string { // Ensure consistent line endings for diff const normalizedOriginal = normalizeLineEndings(originalContent); const normalizedNew = normalizeLineEndings(newContent); return createTwoFilesPatch( filepath, filepath, normalizedOriginal, normalizedNew, 'original', 'modified' ); } // Security & Validation Functions export async function validatePath(requestedPath: string): Promise<string> { const expandedPath = expandHome(requestedPath); const absolute = path.isAbsolute(expandedPath) ? 
path.resolve(expandedPath) : path.resolve(process.cwd(), expandedPath); const normalizedRequested = normalizePath(absolute); // Security: Check if path is within allowed directories before any file operations const isAllowed = isPathWithinAllowedDirectories(normalizedRequested, allowedDirectories); if (!isAllowed) { throw new Error(`Access denied - path outside allowed directories: ${absolute} not in ${allowedDirectories.join(', ')}`); } // Security: Handle symlinks by checking their real path to prevent symlink attacks // This prevents attackers from creating symlinks that point outside allowed directories try { const realPath = await fs.realpath(absolute); const normalizedReal = normalizePath(realPath); if (!isPathWithinAllowedDirectories(normalizedReal, allowedDirectories)) { throw new Error(`Access denied - symlink target outside allowed directories: ${realPath} not in ${allowedDirectories.join(', ')}`); } return realPath; } catch (error) { // Security: For new files that don't exist yet, verify parent directory // This ensures we can't create files in unauthorized locations if ((error as NodeJS.ErrnoException).code === 'ENOENT') { const parentDir = path.dirname(absolute); try { const realParentPath = await fs.realpath(parentDir); const normalizedParent = normalizePath(realParentPath); if (!isPathWithinAllowedDirectories(normalizedParent, allowedDirectories)) { throw new Error(`Access denied - parent directory outside allowed directories: ${realParentPath} not in ${allowedDirectories.join(', ')}`); } return absolute; } catch { throw new Error(`Parent directory does not exist: ${parentDir}`); } } throw error; } } // File Operations export async function getFileStats(filePath: string): Promise<FileInfo> { const stats = await fs.stat(filePath); return { size: stats.size, created: stats.birthtime, modified: stats.mtime, accessed: stats.atime, isDirectory: stats.isDirectory(), isFile: stats.isFile(), permissions: stats.mode.toString(8).slice(-3), }; } export async 
function readFileContent(filePath: string, encoding: string = 'utf-8'): Promise<string> { return await fs.readFile(filePath, encoding as BufferEncoding); } export async function writeFileContent(filePath: string, content: string): Promise<void> { try { // Security: 'wx' flag ensures exclusive creation - fails if file/symlink exists, // preventing writes through pre-existing symlinks await fs.writeFile(filePath, content, { encoding: "utf-8", flag: 'wx' }); } catch (error) { if ((error as NodeJS.ErrnoException).code === 'EEXIST') { // Security: Use atomic rename to prevent race conditions where symlinks // could be created between validation and write. Rename operations // replace the target file atomically and don't follow symlinks. const tempPath = `${filePath}.${randomBytes(16).toString('hex')}.tmp`; try { await fs.writeFile(tempPath, content, 'utf-8'); await fs.rename(tempPath, filePath); } catch (renameError) { try { await fs.unlink(tempPath); } catch {} throw renameError; } } else { throw error; } } } // File Editing Functions interface FileEdit { oldText: string; newText: string; } export async function applyFileEdits( filePath: string, edits: FileEdit[], dryRun: boolean = false ): Promise<string> { // Read file content and normalize line endings const content = normalizeLineEndings(await fs.readFile(filePath, 'utf-8')); // Apply edits sequentially let modifiedContent = content; for (const edit of edits) { const normalizedOld = normalizeLineEndings(edit.oldText); const normalizedNew = normalizeLineEndings(edit.newText); // If exact match exists, use it if (modifiedContent.includes(normalizedOld)) { modifiedContent = modifiedContent.replace(normalizedOld, normalizedNew); continue; } // Otherwise, try line-by-line matching with flexibility for whitespace const oldLines = normalizedOld.split('\n'); const contentLines = modifiedContent.split('\n'); let matchFound = false; for (let i = 0; i <= contentLines.length - oldLines.length; i++) { const potentialMatch = 
contentLines.slice(i, i + oldLines.length); // Compare lines with normalized whitespace const isMatch = oldLines.every((oldLine, j) => { const contentLine = potentialMatch[j]; return oldLine.trim() === contentLine.trim(); }); if (isMatch) { // Preserve original indentation of first line const originalIndent = contentLines[i].match(/^\s*/)?.[0] || ''; const newLines = normalizedNew.split('\n').map((line, j) => { if (j === 0) return originalIndent + line.trimStart(); // For subsequent lines, try to preserve relative indentation const oldIndent = oldLines[j]?.match(/^\s*/)?.[0] || ''; const newIndent = line.match(/^\s*/)?.[0] || ''; if (oldIndent && newIndent) { const relativeIndent = newIndent.length - oldIndent.length; return originalIndent + ' '.repeat(Math.max(0, relativeIndent)) + line.trimStart(); } return line; }); contentLines.splice(i, oldLines.length, ...newLines); modifiedContent = contentLines.join('\n'); matchFound = true; break; } } if (!matchFound) { throw new Error(`Could not find exact match for edit:\n${edit.oldText}`); } } // Create unified diff const diff = createUnifiedDiff(content, modifiedContent, filePath); // Format diff with appropriate number of backticks let numBackticks = 3; while (diff.includes('`'.repeat(numBackticks))) { numBackticks++; } const formattedDiff = `${'`'.repeat(numBackticks)}diff\n${diff}${'`'.repeat(numBackticks)}\n\n`; if (!dryRun) { // Security: Use atomic rename to prevent race conditions where symlinks // could be created between validation and write. Rename operations // replace the target file atomically and don't follow symlinks. 
const tempPath = `${filePath}.${randomBytes(16).toString('hex')}.tmp`; try { await fs.writeFile(tempPath, modifiedContent, 'utf-8'); await fs.rename(tempPath, filePath); } catch (error) { try { await fs.unlink(tempPath); } catch {} throw error; } } return formattedDiff; } // Memory-efficient implementation to get the last N lines of a file export async function tailFile(filePath: string, numLines: number): Promise<string> { const CHUNK_SIZE = 1024; // Read 1KB at a time const stats = await fs.stat(filePath); const fileSize = stats.size; if (fileSize === 0) return ''; // Open file for reading const fileHandle = await fs.open(filePath, 'r'); try { const lines: string[] = []; let position = fileSize; let chunk = Buffer.alloc(CHUNK_SIZE); let linesFound = 0; let remainingText = ''; // Read chunks from the end of the file until we have enough lines while (position > 0 && linesFound < numLines) { const size = Math.min(CHUNK_SIZE, position); position -= size; const { bytesRead } = await fileHandle.read(chunk, 0, size, position); if (!bytesRead) break; // Get the chunk as a string and prepend any remaining text from previous iteration const readData = chunk.slice(0, bytesRead).toString('utf-8'); const chunkText = readData + remainingText; // Split by newlines and count const chunkLines = normalizeLineEndings(chunkText).split('\n'); // If this isn't the end of the file, the first line is likely incomplete // Save it to prepend to the next chunk if (position > 0) { remainingText = chunkLines[0]; chunkLines.shift(); // Remove the first (incomplete) line } // Add lines to our result (up to the number we need) for (let i = chunkLines.length - 1; i >= 0 && linesFound < numLines; i--) { lines.unshift(chunkLines[i]); linesFound++; } } return lines.join('\n'); } finally { await fileHandle.close(); } } // New function to get the first N lines of a file export async function headFile(filePath: string, numLines: number): Promise<string> { const fileHandle = await fs.open(filePath, 
'r'); try { const lines: string[] = []; let buffer = ''; let bytesRead = 0; const chunk = Buffer.alloc(1024); // 1KB buffer // Read chunks and count lines until we have enough or reach EOF while (lines.length < numLines) { const result = await fileHandle.read(chunk, 0, chunk.length, bytesRead); if (result.bytesRead === 0) break; // End of file bytesRead += result.bytesRead; buffer += chunk.slice(0, result.bytesRead).toString('utf-8'); const newLineIndex = buffer.lastIndexOf('\n'); if (newLineIndex !== -1) { const completeLines = buffer.slice(0, newLineIndex).split('\n'); buffer = buffer.slice(newLineIndex + 1); for (const line of completeLines) { lines.push(line); if (lines.length >= numLines) break; } } } // If there is leftover content and we still need lines, add it if (buffer.length > 0 && lines.length < numLines) { lines.push(buffer); } return lines.join('\n'); } finally { await fileHandle.close(); } } export async function searchFilesWithValidation( rootPath: string, pattern: string, allowedDirectories: string[], options: SearchOptions = {} ): Promise<string[]> { const { excludePatterns = [] } = options; const results: string[] = []; async function search(currentPath: string) { const entries = await fs.readdir(currentPath, { withFileTypes: true }); for (const entry of entries) { const fullPath = path.join(currentPath, entry.name); try { await validatePath(fullPath); const relativePath = path.relative(rootPath, fullPath); const shouldExclude = excludePatterns.some(excludePattern => minimatch(relativePath, excludePattern, { dot: true }) ); if (shouldExclude) continue; // Use glob matching for the search pattern if (minimatch(relativePath, pattern, { dot: true })) { results.push(fullPath); } if (entry.isDirectory()) { await search(fullPath); } } catch { continue; } } } await search(rootPath); return results; } ``` -------------------------------------------------------------------------------- /src/git/src/mcp_server_git/server.py: 
-------------------------------------------------------------------------------- ```python import logging from pathlib import Path from typing import Sequence, Optional from mcp.server import Server from mcp.server.session import ServerSession from mcp.server.stdio import stdio_server from mcp.types import ( ClientCapabilities, TextContent, Tool, ListRootsResult, RootsCapability, ) from enum import Enum import git from pydantic import BaseModel, Field # Default number of context lines to show in diff output DEFAULT_CONTEXT_LINES = 3 class GitStatus(BaseModel): repo_path: str class GitDiffUnstaged(BaseModel): repo_path: str context_lines: int = DEFAULT_CONTEXT_LINES class GitDiffStaged(BaseModel): repo_path: str context_lines: int = DEFAULT_CONTEXT_LINES class GitDiff(BaseModel): repo_path: str target: str context_lines: int = DEFAULT_CONTEXT_LINES class GitCommit(BaseModel): repo_path: str message: str class GitAdd(BaseModel): repo_path: str files: list[str] class GitReset(BaseModel): repo_path: str class GitLog(BaseModel): repo_path: str max_count: int = 10 start_timestamp: Optional[str] = Field( None, description="Start timestamp for filtering commits. Accepts: ISO 8601 format (e.g., '2024-01-15T14:30:25'), relative dates (e.g., '2 weeks ago', 'yesterday'), or absolute dates (e.g., '2024-01-15', 'Jan 15 2024')" ) end_timestamp: Optional[str] = Field( None, description="End timestamp for filtering commits. 
Accepts: ISO 8601 format (e.g., '2024-01-15T14:30:25'), relative dates (e.g., '2 weeks ago', 'yesterday'), or absolute dates (e.g., '2024-01-15', 'Jan 15 2024')" ) class GitCreateBranch(BaseModel): repo_path: str branch_name: str base_branch: str | None = None class GitCheckout(BaseModel): repo_path: str branch_name: str class GitShow(BaseModel): repo_path: str revision: str class GitBranch(BaseModel): repo_path: str = Field( ..., description="The path to the Git repository.", ) branch_type: str = Field( ..., description="Whether to list local branches ('local'), remote branches ('remote') or all branches('all').", ) contains: Optional[str] = Field( None, description="The commit sha that branch should contain. Do not pass anything to this param if no commit sha is specified", ) not_contains: Optional[str] = Field( None, description="The commit sha that branch should NOT contain. Do not pass anything to this param if no commit sha is specified", ) class GitTools(str, Enum): STATUS = "git_status" DIFF_UNSTAGED = "git_diff_unstaged" DIFF_STAGED = "git_diff_staged" DIFF = "git_diff" COMMIT = "git_commit" ADD = "git_add" RESET = "git_reset" LOG = "git_log" CREATE_BRANCH = "git_create_branch" CHECKOUT = "git_checkout" SHOW = "git_show" BRANCH = "git_branch" def git_status(repo: git.Repo) -> str: return repo.git.status() def git_diff_unstaged(repo: git.Repo, context_lines: int = DEFAULT_CONTEXT_LINES) -> str: return repo.git.diff(f"--unified={context_lines}") def git_diff_staged(repo: git.Repo, context_lines: int = DEFAULT_CONTEXT_LINES) -> str: return repo.git.diff(f"--unified={context_lines}", "--cached") def git_diff(repo: git.Repo, target: str, context_lines: int = DEFAULT_CONTEXT_LINES) -> str: return repo.git.diff(f"--unified={context_lines}", target) def git_commit(repo: git.Repo, message: str) -> str: commit = repo.index.commit(message) return f"Changes committed successfully with hash {commit.hexsha}" def git_add(repo: git.Repo, files: list[str]) -> str: if files 
== ["."]: repo.git.add(".") else: repo.index.add(files) return "Files staged successfully" def git_reset(repo: git.Repo) -> str: repo.index.reset() return "All staged changes reset" def git_log(repo: git.Repo, max_count: int = 10, start_timestamp: Optional[str] = None, end_timestamp: Optional[str] = None) -> list[str]: if start_timestamp or end_timestamp: # Use git log command with date filtering args = [] if start_timestamp: args.extend(['--since', start_timestamp]) if end_timestamp: args.extend(['--until', end_timestamp]) args.extend(['--format=%H%n%an%n%ad%n%s']) log_output = repo.git.log(*args).split('\n') log = [] # Process commits in groups of 4 (hash, author, date, message) for i in range(0, len(log_output), 4): if i + 3 < len(log_output) and len(log) < max_count: log.append( f"Commit: {log_output[i]}\n" f"Author: {log_output[i+1]}\n" f"Date: {log_output[i+2]}\n" f"Message: {log_output[i+3]}\n" ) return log else: # Use existing logic for simple log without date filtering commits = list(repo.iter_commits(max_count=max_count)) log = [] for commit in commits: log.append( f"Commit: {commit.hexsha!r}\n" f"Author: {commit.author!r}\n" f"Date: {commit.authored_datetime}\n" f"Message: {commit.message!r}\n" ) return log def git_create_branch(repo: git.Repo, branch_name: str, base_branch: str | None = None) -> str: if base_branch: base = repo.references[base_branch] else: base = repo.active_branch repo.create_head(branch_name, base) return f"Created branch '{branch_name}' from '{base.name}'" def git_checkout(repo: git.Repo, branch_name: str) -> str: repo.git.checkout(branch_name) return f"Switched to branch '{branch_name}'" def git_show(repo: git.Repo, revision: str) -> str: commit = repo.commit(revision) output = [ f"Commit: {commit.hexsha!r}\n" f"Author: {commit.author!r}\n" f"Date: {commit.authored_datetime!r}\n" f"Message: {commit.message!r}\n" ] if commit.parents: parent = commit.parents[0] diff = parent.diff(commit, create_patch=True) else: diff =
commit.diff(git.NULL_TREE, create_patch=True) for d in diff: output.append(f"\n--- {d.a_path}\n+++ {d.b_path}\n") output.append(d.diff.decode('utf-8')) return "".join(output) def git_branch(repo: git.Repo, branch_type: str, contains: str | None = None, not_contains: str | None = None) -> str: match contains: case None: contains_sha = (None,) case _: contains_sha = ("--contains", contains) match not_contains: case None: not_contains_sha = (None,) case _: not_contains_sha = ("--no-contains", not_contains) match branch_type: case 'local': b_type = None case 'remote': b_type = "-r" case 'all': b_type = "-a" case _: return f"Invalid branch type: {branch_type}" # None value will be auto deleted by GitPython branch_info = repo.git.branch(b_type, *contains_sha, *not_contains_sha) return branch_info async def serve(repository: Path | None) -> None: logger = logging.getLogger(__name__) if repository is not None: try: git.Repo(repository) logger.info(f"Using repository at {repository}") except git.InvalidGitRepositoryError: logger.error(f"{repository} is not a valid Git repository") return server = Server("mcp-git") @server.list_tools() async def list_tools() -> list[Tool]: return [ Tool( name=GitTools.STATUS, description="Shows the working tree status", inputSchema=GitStatus.model_json_schema(), ), Tool( name=GitTools.DIFF_UNSTAGED, description="Shows changes in the working directory that are not yet staged", inputSchema=GitDiffUnstaged.model_json_schema(), ), Tool( name=GitTools.DIFF_STAGED, description="Shows changes that are staged for commit", inputSchema=GitDiffStaged.model_json_schema(), ), Tool( name=GitTools.DIFF, description="Shows differences between branches or commits", inputSchema=GitDiff.model_json_schema(), ), Tool( name=GitTools.COMMIT, description="Records changes to the repository", inputSchema=GitCommit.model_json_schema(), ), Tool( name=GitTools.ADD, description="Adds file contents to the staging area", inputSchema=GitAdd.model_json_schema(), ), Tool( 
name=GitTools.RESET, description="Unstages all staged changes", inputSchema=GitReset.model_json_schema(), ), Tool( name=GitTools.LOG, description="Shows the commit logs", inputSchema=GitLog.model_json_schema(), ), Tool( name=GitTools.CREATE_BRANCH, description="Creates a new branch from an optional base branch", inputSchema=GitCreateBranch.model_json_schema(), ), Tool( name=GitTools.CHECKOUT, description="Switches branches", inputSchema=GitCheckout.model_json_schema(), ), Tool( name=GitTools.SHOW, description="Shows the contents of a commit", inputSchema=GitShow.model_json_schema(), ), Tool( name=GitTools.BRANCH, description="List Git branches", inputSchema=GitBranch.model_json_schema(), ) ] async def list_repos() -> Sequence[str]: async def by_roots() -> Sequence[str]: if not isinstance(server.request_context.session, ServerSession): raise TypeError("server.request_context.session must be a ServerSession") if not server.request_context.session.check_client_capability( ClientCapabilities(roots=RootsCapability()) ): return [] roots_result: ListRootsResult = await server.request_context.session.list_roots() logger.debug(f"Roots result: {roots_result}") repo_paths = [] for root in roots_result.roots: path = root.uri.path try: git.Repo(path) repo_paths.append(str(path)) except git.InvalidGitRepositoryError: pass return repo_paths def by_commandline() -> Sequence[str]: return [str(repository)] if repository is not None else [] cmd_repos = by_commandline() root_repos = await by_roots() return [*root_repos, *cmd_repos] @server.call_tool() async def call_tool(name: str, arguments: dict) -> list[TextContent]: repo_path = Path(arguments["repo_path"]) # For all commands, we need an existing repo repo = git.Repo(repo_path) match name: case GitTools.STATUS: status = git_status(repo) return [TextContent( type="text", text=f"Repository status:\n{status}" )] case GitTools.DIFF_UNSTAGED: diff = git_diff_unstaged(repo, arguments.get("context_lines", DEFAULT_CONTEXT_LINES)) return 
[TextContent( type="text", text=f"Unstaged changes:\n{diff}" )] case GitTools.DIFF_STAGED: diff = git_diff_staged(repo, arguments.get("context_lines", DEFAULT_CONTEXT_LINES)) return [TextContent( type="text", text=f"Staged changes:\n{diff}" )] case GitTools.DIFF: diff = git_diff(repo, arguments["target"], arguments.get("context_lines", DEFAULT_CONTEXT_LINES)) return [TextContent( type="text", text=f"Diff with {arguments['target']}:\n{diff}" )] case GitTools.COMMIT: result = git_commit(repo, arguments["message"]) return [TextContent( type="text", text=result )] case GitTools.ADD: result = git_add(repo, arguments["files"]) return [TextContent( type="text", text=result )] case GitTools.RESET: result = git_reset(repo) return [TextContent( type="text", text=result )] case GitTools.LOG: log = git_log( repo, arguments.get("max_count", 10), arguments.get("start_timestamp"), arguments.get("end_timestamp") ) return [TextContent( type="text", text="Commit history:\n" + "\n".join(log) )] case GitTools.CREATE_BRANCH: result = git_create_branch( repo, arguments["branch_name"], arguments.get("base_branch") ) return [TextContent( type="text", text=result )] case GitTools.CHECKOUT: result = git_checkout(repo, arguments["branch_name"]) return [TextContent( type="text", text=result )] case GitTools.SHOW: result = git_show(repo, arguments["revision"]) return [TextContent( type="text", text=result )] case GitTools.BRANCH: result = git_branch( repo, arguments.get("branch_type", 'local'), arguments.get("contains", None), arguments.get("not_contains", None), ) return [TextContent( type="text", text=result )] case _: raise ValueError(f"Unknown tool: {name}") options = server.create_initialization_options() async with stdio_server() as (read_stream, write_stream): await server.run(read_stream, write_stream, options, raise_exceptions=True) ``` -------------------------------------------------------------------------------- /src/memory/index.ts: 
-------------------------------------------------------------------------------- ```typescript #!/usr/bin/env node import { Server } from "@modelcontextprotocol/sdk/server/index.js"; import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js"; import { CallToolRequestSchema, ListToolsRequestSchema, } from "@modelcontextprotocol/sdk/types.js"; import { promises as fs } from 'fs'; import path from 'path'; import { fileURLToPath } from 'url'; // Define memory file path using environment variable with fallback const defaultMemoryPath = path.join(path.dirname(fileURLToPath(import.meta.url)), 'memory.json'); // If MEMORY_FILE_PATH is just a filename, put it in the same directory as the script const MEMORY_FILE_PATH = process.env.MEMORY_FILE_PATH ? path.isAbsolute(process.env.MEMORY_FILE_PATH) ? process.env.MEMORY_FILE_PATH : path.join(path.dirname(fileURLToPath(import.meta.url)), process.env.MEMORY_FILE_PATH) : defaultMemoryPath; // We are storing our memory using entities, relations, and observations in a graph structure interface Entity { name: string; entityType: string; observations: string[]; } interface Relation { from: string; to: string; relationType: string; } interface KnowledgeGraph { entities: Entity[]; relations: Relation[]; } // The KnowledgeGraphManager class contains all operations to interact with the knowledge graph class KnowledgeGraphManager { private async loadGraph(): Promise<KnowledgeGraph> { try { const data = await fs.readFile(MEMORY_FILE_PATH, "utf-8"); const lines = data.split("\n").filter(line => line.trim() !== ""); return lines.reduce((graph: KnowledgeGraph, line) => { const item = JSON.parse(line); if (item.type === "entity") graph.entities.push(item as Entity); if (item.type === "relation") graph.relations.push(item as Relation); return graph; }, { entities: [], relations: [] }); } catch (error) { if (error instanceof Error && 'code' in error && (error as any).code === "ENOENT") { return { entities: [], relations: [] 
}; } throw error; } } private async saveGraph(graph: KnowledgeGraph): Promise<void> { const lines = [ ...graph.entities.map(e => JSON.stringify({ type: "entity", name: e.name, entityType: e.entityType, observations: e.observations })), ...graph.relations.map(r => JSON.stringify({ type: "relation", from: r.from, to: r.to, relationType: r.relationType })), ]; await fs.writeFile(MEMORY_FILE_PATH, lines.join("\n")); } async createEntities(entities: Entity[]): Promise<Entity[]> { const graph = await this.loadGraph(); const newEntities = entities.filter(e => !graph.entities.some(existingEntity => existingEntity.name === e.name)); graph.entities.push(...newEntities); await this.saveGraph(graph); return newEntities; } async createRelations(relations: Relation[]): Promise<Relation[]> { const graph = await this.loadGraph(); const newRelations = relations.filter(r => !graph.relations.some(existingRelation => existingRelation.from === r.from && existingRelation.to === r.to && existingRelation.relationType === r.relationType )); graph.relations.push(...newRelations); await this.saveGraph(graph); return newRelations; } async addObservations(observations: { entityName: string; contents: string[] }[]): Promise<{ entityName: string; addedObservations: string[] }[]> { const graph = await this.loadGraph(); const results = observations.map(o => { const entity = graph.entities.find(e => e.name === o.entityName); if (!entity) { throw new Error(`Entity with name ${o.entityName} not found`); } const newObservations = o.contents.filter(content => !entity.observations.includes(content)); entity.observations.push(...newObservations); return { entityName: o.entityName, addedObservations: newObservations }; }); await this.saveGraph(graph); return results; } async deleteEntities(entityNames: string[]): Promise<void> { const graph = await this.loadGraph(); graph.entities = graph.entities.filter(e => !entityNames.includes(e.name)); graph.relations = graph.relations.filter(r => 
!entityNames.includes(r.from) && !entityNames.includes(r.to)); await this.saveGraph(graph); } async deleteObservations(deletions: { entityName: string; observations: string[] }[]): Promise<void> { const graph = await this.loadGraph(); deletions.forEach(d => { const entity = graph.entities.find(e => e.name === d.entityName); if (entity) { entity.observations = entity.observations.filter(o => !d.observations.includes(o)); } }); await this.saveGraph(graph); } async deleteRelations(relations: Relation[]): Promise<void> { const graph = await this.loadGraph(); graph.relations = graph.relations.filter(r => !relations.some(delRelation => r.from === delRelation.from && r.to === delRelation.to && r.relationType === delRelation.relationType )); await this.saveGraph(graph); } async readGraph(): Promise<KnowledgeGraph> { return this.loadGraph(); } // Very basic search function async searchNodes(query: string): Promise<KnowledgeGraph> { const graph = await this.loadGraph(); // Filter entities const filteredEntities = graph.entities.filter(e => e.name.toLowerCase().includes(query.toLowerCase()) || e.entityType.toLowerCase().includes(query.toLowerCase()) || e.observations.some(o => o.toLowerCase().includes(query.toLowerCase())) ); // Create a Set of filtered entity names for quick lookup const filteredEntityNames = new Set(filteredEntities.map(e => e.name)); // Filter relations to only include those between filtered entities const filteredRelations = graph.relations.filter(r => filteredEntityNames.has(r.from) && filteredEntityNames.has(r.to) ); const filteredGraph: KnowledgeGraph = { entities: filteredEntities, relations: filteredRelations, }; return filteredGraph; } async openNodes(names: string[]): Promise<KnowledgeGraph> { const graph = await this.loadGraph(); // Filter entities const filteredEntities = graph.entities.filter(e => names.includes(e.name)); // Create a Set of filtered entity names for quick lookup const filteredEntityNames = new Set(filteredEntities.map(e => 
e.name)); // Filter relations to only include those between filtered entities const filteredRelations = graph.relations.filter(r => filteredEntityNames.has(r.from) && filteredEntityNames.has(r.to) ); const filteredGraph: KnowledgeGraph = { entities: filteredEntities, relations: filteredRelations, }; return filteredGraph; } } const knowledgeGraphManager = new KnowledgeGraphManager(); // The server instance and tools exposed to Claude const server = new Server({ name: "memory-server", version: "0.6.3", }, { capabilities: { tools: {}, }, },); server.setRequestHandler(ListToolsRequestSchema, async () => { return { tools: [ { name: "create_entities", description: "Create multiple new entities in the knowledge graph", inputSchema: { type: "object", properties: { entities: { type: "array", items: { type: "object", properties: { name: { type: "string", description: "The name of the entity" }, entityType: { type: "string", description: "The type of the entity" }, observations: { type: "array", items: { type: "string" }, description: "An array of observation contents associated with the entity" }, }, required: ["name", "entityType", "observations"], additionalProperties: false, }, }, }, required: ["entities"], additionalProperties: false, }, }, { name: "create_relations", description: "Create multiple new relations between entities in the knowledge graph. 
Relations should be in active voice", inputSchema: { type: "object", properties: { relations: { type: "array", items: { type: "object", properties: { from: { type: "string", description: "The name of the entity where the relation starts" }, to: { type: "string", description: "The name of the entity where the relation ends" }, relationType: { type: "string", description: "The type of the relation" }, }, required: ["from", "to", "relationType"], additionalProperties: false, }, }, }, required: ["relations"], additionalProperties: false, }, }, { name: "add_observations", description: "Add new observations to existing entities in the knowledge graph", inputSchema: { type: "object", properties: { observations: { type: "array", items: { type: "object", properties: { entityName: { type: "string", description: "The name of the entity to add the observations to" }, contents: { type: "array", items: { type: "string" }, description: "An array of observation contents to add" }, }, required: ["entityName", "contents"], additionalProperties: false, }, }, }, required: ["observations"], additionalProperties: false, }, }, { name: "delete_entities", description: "Delete multiple entities and their associated relations from the knowledge graph", inputSchema: { type: "object", properties: { entityNames: { type: "array", items: { type: "string" }, description: "An array of entity names to delete" }, }, required: ["entityNames"], additionalProperties: false, }, }, { name: "delete_observations", description: "Delete specific observations from entities in the knowledge graph", inputSchema: { type: "object", properties: { deletions: { type: "array", items: { type: "object", properties: { entityName: { type: "string", description: "The name of the entity containing the observations" }, observations: { type: "array", items: { type: "string" }, description: "An array of observations to delete" }, }, required: ["entityName", "observations"], additionalProperties: false, }, }, }, required: 
["deletions"], additionalProperties: false, }, }, { name: "delete_relations", description: "Delete multiple relations from the knowledge graph", inputSchema: { type: "object", properties: { relations: { type: "array", items: { type: "object", properties: { from: { type: "string", description: "The name of the entity where the relation starts" }, to: { type: "string", description: "The name of the entity where the relation ends" }, relationType: { type: "string", description: "The type of the relation" }, }, required: ["from", "to", "relationType"], additionalProperties: false, }, description: "An array of relations to delete" }, }, required: ["relations"], additionalProperties: false, }, }, { name: "read_graph", description: "Read the entire knowledge graph", inputSchema: { type: "object", properties: {}, additionalProperties: false, }, }, { name: "search_nodes", description: "Search for nodes in the knowledge graph based on a query", inputSchema: { type: "object", properties: { query: { type: "string", description: "The search query to match against entity names, types, and observation content" }, }, required: ["query"], additionalProperties: false, }, }, { name: "open_nodes", description: "Open specific nodes in the knowledge graph by their names", inputSchema: { type: "object", properties: { names: { type: "array", items: { type: "string" }, description: "An array of entity names to retrieve", }, }, required: ["names"], additionalProperties: false, }, }, ], }; }); server.setRequestHandler(CallToolRequestSchema, async (request) => { const { name, arguments: args } = request.params; if (name === "read_graph") { return { content: [{ type: "text", text: JSON.stringify(await knowledgeGraphManager.readGraph(), null, 2) }] }; } if (!args) { throw new Error(`No arguments provided for tool: ${name}`); } switch (name) { case "create_entities": return { content: [{ type: "text", text: JSON.stringify(await knowledgeGraphManager.createEntities(args.entities as Entity[]), 
null, 2) }] }; case "create_relations": return { content: [{ type: "text", text: JSON.stringify(await knowledgeGraphManager.createRelations(args.relations as Relation[]), null, 2) }] }; case "add_observations": return { content: [{ type: "text", text: JSON.stringify(await knowledgeGraphManager.addObservations(args.observations as { entityName: string; contents: string[] }[]), null, 2) }] }; case "delete_entities": await knowledgeGraphManager.deleteEntities(args.entityNames as string[]); return { content: [{ type: "text", text: "Entities deleted successfully" }] }; case "delete_observations": await knowledgeGraphManager.deleteObservations(args.deletions as { entityName: string; observations: string[] }[]); return { content: [{ type: "text", text: "Observations deleted successfully" }] }; case "delete_relations": await knowledgeGraphManager.deleteRelations(args.relations as Relation[]); return { content: [{ type: "text", text: "Relations deleted successfully" }] }; case "search_nodes": return { content: [{ type: "text", text: JSON.stringify(await knowledgeGraphManager.searchNodes(args.query as string), null, 2) }] }; case "open_nodes": return { content: [{ type: "text", text: JSON.stringify(await knowledgeGraphManager.openNodes(args.names as string[]), null, 2) }] }; default: throw new Error(`Unknown tool: ${name}`); } }); async function main() { const transport = new StdioServerTransport(); await server.connect(transport); console.error("Knowledge Graph MCP Server running on stdio"); } main().catch((error) => { console.error("Fatal error in main():", error); process.exit(1); }); ``` -------------------------------------------------------------------------------- /src/time/test/time_server_test.py: -------------------------------------------------------------------------------- ```python from freezegun import freeze_time from mcp.shared.exceptions import McpError import pytest from unittest.mock import patch from zoneinfo import ZoneInfo from mcp_server_time.server 
import TimeServer, get_local_tz @pytest.mark.parametrize( "test_time,timezone,expected", [ # UTC+1 non-DST ( "2024-01-01 12:00:00+00:00", "Europe/Warsaw", { "timezone": "Europe/Warsaw", "datetime": "2024-01-01T13:00:00+01:00", "is_dst": False, }, ), # UTC non-DST ( "2024-01-01 12:00:00+00:00", "Europe/London", { "timezone": "Europe/London", "datetime": "2024-01-01T12:00:00+00:00", "is_dst": False, }, ), # UTC-5 non-DST ( "2024-01-01 12:00:00-00:00", "America/New_York", { "timezone": "America/New_York", "datetime": "2024-01-01T07:00:00-05:00", "is_dst": False, }, ), # UTC+1 DST ( "2024-03-31 12:00:00+00:00", "Europe/Warsaw", { "timezone": "Europe/Warsaw", "datetime": "2024-03-31T14:00:00+02:00", "is_dst": True, }, ), # UTC DST ( "2024-03-31 12:00:00+00:00", "Europe/London", { "timezone": "Europe/London", "datetime": "2024-03-31T13:00:00+01:00", "is_dst": True, }, ), # UTC-5 DST ( "2024-03-31 12:00:00-00:00", "America/New_York", { "timezone": "America/New_York", "datetime": "2024-03-31T08:00:00-04:00", "is_dst": True, }, ), ], ) def test_get_current_time(test_time, timezone, expected): with freeze_time(test_time): time_server = TimeServer() result = time_server.get_current_time(timezone) assert result.timezone == expected["timezone"] assert result.datetime == expected["datetime"] assert result.is_dst == expected["is_dst"] def test_get_current_time_with_invalid_timezone(): time_server = TimeServer() with pytest.raises( McpError, match=r"Invalid timezone: 'No time zone found with key Invalid/Timezone'", ): time_server.get_current_time("Invalid/Timezone") @pytest.mark.parametrize( "source_tz,time_str,target_tz,expected_error", [ ( "invalid_tz", "12:00", "Europe/London", "Invalid timezone: 'No time zone found with key invalid_tz'", ), ( "Europe/Warsaw", "12:00", "invalid_tz", "Invalid timezone: 'No time zone found with key invalid_tz'", ), ( "Europe/Warsaw", "25:00", "Europe/London", "Invalid time format. 
Expected HH:MM [24-hour format]", ), ], ) def test_convert_time_errors(source_tz, time_str, target_tz, expected_error): time_server = TimeServer() with pytest.raises((McpError, ValueError), match=expected_error): time_server.convert_time(source_tz, time_str, target_tz) @pytest.mark.parametrize( "test_time,source_tz,time_str,target_tz,expected", [ # Basic case: Standard time conversion between Warsaw and London (1 hour difference) # Warsaw is UTC+1, London is UTC+0 ( "2024-01-01 00:00:00+00:00", "Europe/Warsaw", "12:00", "Europe/London", { "source": { "timezone": "Europe/Warsaw", "datetime": "2024-01-01T12:00:00+01:00", "is_dst": False, }, "target": { "timezone": "Europe/London", "datetime": "2024-01-01T11:00:00+00:00", "is_dst": False, }, "time_difference": "-1.0h", }, ), # Reverse case of above: London to Warsaw conversion # Shows how time difference is positive when going east ( "2024-01-01 00:00:00+00:00", "Europe/London", "12:00", "Europe/Warsaw", { "source": { "timezone": "Europe/London", "datetime": "2024-01-01T12:00:00+00:00", "is_dst": False, }, "target": { "timezone": "Europe/Warsaw", "datetime": "2024-01-01T13:00:00+01:00", "is_dst": False, }, "time_difference": "+1.0h", }, ), # Edge case: Different DST periods between Europe and USA # Europe ends DST on Oct 27, while USA waits until Nov 3 # This creates a one-week period where Europe is in standard time but USA still observes DST ( "2024-10-28 00:00:00+00:00", "Europe/Warsaw", "12:00", "America/New_York", { "source": { "timezone": "Europe/Warsaw", "datetime": "2024-10-28T12:00:00+01:00", "is_dst": False, }, "target": { "timezone": "America/New_York", "datetime": "2024-10-28T07:00:00-04:00", "is_dst": True, }, "time_difference": "-5.0h", }, ), # Follow-up to previous case: After both regions end DST # Shows how time difference increases by 1 hour when USA also ends DST ( "2024-11-04 00:00:00+00:00", "Europe/Warsaw", "12:00", "America/New_York", { "source": { "timezone": "Europe/Warsaw", "datetime": 
"2024-11-04T12:00:00+01:00", "is_dst": False, }, "target": { "timezone": "America/New_York", "datetime": "2024-11-04T06:00:00-05:00", "is_dst": False, }, "time_difference": "-6.0h", }, ), # Edge case: Nepal's unusual UTC+5:45 offset # One of the few time zones using 45-minute offset ( "2024-01-01 00:00:00+00:00", "Europe/Warsaw", "12:00", "Asia/Kathmandu", { "source": { "timezone": "Europe/Warsaw", "datetime": "2024-01-01T12:00:00+01:00", "is_dst": False, }, "target": { "timezone": "Asia/Kathmandu", "datetime": "2024-01-01T16:45:00+05:45", "is_dst": False, }, "time_difference": "+4.75h", }, ), # Reverse case for Nepal # Demonstrates how 45-minute offset works in opposite direction ( "2024-01-01 00:00:00+00:00", "Asia/Kathmandu", "12:00", "Europe/Warsaw", { "source": { "timezone": "Asia/Kathmandu", "datetime": "2024-01-01T12:00:00+05:45", "is_dst": False, }, "target": { "timezone": "Europe/Warsaw", "datetime": "2024-01-01T07:15:00+01:00", "is_dst": False, }, "time_difference": "-4.75h", }, ), # Edge case: Lord Howe Island's unique DST rules # One of the few places using 30-minute DST shift # During summer (DST), they use UTC+11 ( "2024-01-01 00:00:00+00:00", "Europe/Warsaw", "12:00", "Australia/Lord_Howe", { "source": { "timezone": "Europe/Warsaw", "datetime": "2024-01-01T12:00:00+01:00", "is_dst": False, }, "target": { "timezone": "Australia/Lord_Howe", "datetime": "2024-01-01T22:00:00+11:00", "is_dst": True, }, "time_difference": "+10.0h", }, ), # Second Lord Howe Island case: During their standard time # Shows transition to UTC+10:30 after DST ends ( "2024-04-07 00:00:00+00:00", "Europe/Warsaw", "12:00", "Australia/Lord_Howe", { "source": { "timezone": "Europe/Warsaw", "datetime": "2024-04-07T12:00:00+02:00", "is_dst": True, }, "target": { "timezone": "Australia/Lord_Howe", "datetime": "2024-04-07T20:30:00+10:30", "is_dst": False, }, "time_difference": "+8.5h", }, ), # Edge case: Date line crossing with Samoa # Demonstrates how a single time conversion can result 
in a date change # Samoa is UTC+13, creating almost a full day difference with Warsaw ( "2024-01-01 00:00:00+00:00", "Europe/Warsaw", "23:00", "Pacific/Apia", { "source": { "timezone": "Europe/Warsaw", "datetime": "2024-01-01T23:00:00+01:00", "is_dst": False, }, "target": { "timezone": "Pacific/Apia", "datetime": "2024-01-02T11:00:00+13:00", "is_dst": False, }, "time_difference": "+12.0h", }, ), # Edge case: Iran's unusual half-hour offset # Demonstrates conversion with Iran's UTC+3:30 timezone ( "2024-03-21 00:00:00+00:00", "Europe/Warsaw", "12:00", "Asia/Tehran", { "source": { "timezone": "Europe/Warsaw", "datetime": "2024-03-21T12:00:00+01:00", "is_dst": False, }, "target": { "timezone": "Asia/Tehran", "datetime": "2024-03-21T14:30:00+03:30", "is_dst": False, }, "time_difference": "+2.5h", }, ), # Edge case: Venezuela's unusual -4:30 offset (historical) # In 2016, Venezuela moved from -4:30 to -4:00 # Useful for testing historical dates ( "2016-04-30 00:00:00+00:00", # Just before the change "Europe/Warsaw", "12:00", "America/Caracas", { "source": { "timezone": "Europe/Warsaw", "datetime": "2016-04-30T12:00:00+02:00", "is_dst": True, }, "target": { "timezone": "America/Caracas", "datetime": "2016-04-30T05:30:00-04:30", "is_dst": False, }, "time_difference": "-6.5h", }, ), # Edge case: Israel's variable DST # Israel's DST changes don't follow a fixed pattern # They often change dates year-to-year based on Hebrew calendar ( "2024-10-27 00:00:00+00:00", "Europe/Warsaw", "12:00", "Asia/Jerusalem", { "source": { "timezone": "Europe/Warsaw", "datetime": "2024-10-27T12:00:00+01:00", "is_dst": False, }, "target": { "timezone": "Asia/Jerusalem", "datetime": "2024-10-27T13:00:00+02:00", "is_dst": False, }, "time_difference": "+1.0h", }, ), # Edge case: Antarctica/Troll station # Only timezone that uses UTC+0 in winter and UTC+2 in summer # One of the few zones with exactly 2 hours DST difference ( "2024-03-31 00:00:00+00:00", "Europe/Warsaw", "12:00", "Antarctica/Troll", 
{ "source": { "timezone": "Europe/Warsaw", "datetime": "2024-03-31T12:00:00+02:00", "is_dst": True, }, "target": { "timezone": "Antarctica/Troll", "datetime": "2024-03-31T12:00:00+02:00", "is_dst": True, }, "time_difference": "+0.0h", }, ), # Edge case: Kiribati date line anomaly # After skipping Dec 31, 1994, eastern Kiribati is UTC+14 # The furthest forward timezone in the world ( "2024-01-01 00:00:00+00:00", "Europe/Warsaw", "23:00", "Pacific/Kiritimati", { "source": { "timezone": "Europe/Warsaw", "datetime": "2024-01-01T23:00:00+01:00", "is_dst": False, }, "target": { "timezone": "Pacific/Kiritimati", "datetime": "2024-01-02T12:00:00+14:00", "is_dst": False, }, "time_difference": "+13.0h", }, ), # Edge case: Chatham Islands, New Zealand # Uses unusual 45-minute offset AND observes DST # UTC+12:45 in standard time, UTC+13:45 in DST ( "2024-01-01 00:00:00+00:00", "Europe/Warsaw", "12:00", "Pacific/Chatham", { "source": { "timezone": "Europe/Warsaw", "datetime": "2024-01-01T12:00:00+01:00", "is_dst": False, }, "target": { "timezone": "Pacific/Chatham", "datetime": "2024-01-02T00:45:00+13:45", "is_dst": True, }, "time_difference": "+12.75h", }, ), ], ) def test_convert_time(test_time, source_tz, time_str, target_tz, expected): with freeze_time(test_time): time_server = TimeServer() result = time_server.convert_time(source_tz, time_str, target_tz) assert result.source.timezone == expected["source"]["timezone"] assert result.target.timezone == expected["target"]["timezone"] assert result.source.datetime == expected["source"]["datetime"] assert result.target.datetime == expected["target"]["datetime"] assert result.source.is_dst == expected["source"]["is_dst"] assert result.target.is_dst == expected["target"]["is_dst"] assert result.time_difference == expected["time_difference"] def test_get_local_tz_with_override(): """Test that timezone override works correctly.""" result = get_local_tz("America/New_York") assert str(result) == "America/New_York" assert 
isinstance(result, ZoneInfo) def test_get_local_tz_with_invalid_override(): """Test that invalid timezone override raises an error.""" with pytest.raises(Exception): # ZoneInfo will raise an exception get_local_tz("Invalid/Timezone") @patch('mcp_server_time.server.get_localzone_name') def test_get_local_tz_with_valid_iana_name(mock_get_localzone): """Test that valid IANA timezone names from tzlocal work correctly.""" mock_get_localzone.return_value = "Europe/London" result = get_local_tz() assert str(result) == "Europe/London" assert isinstance(result, ZoneInfo) @patch('mcp_server_time.server.get_localzone_name') def test_get_local_tz_when_none_returned(mock_get_localzone): """Test default to UTC when tzlocal returns None.""" mock_get_localzone.return_value = None result = get_local_tz() assert str(result) == "UTC" @patch('mcp_server_time.server.get_localzone_name') def test_get_local_tz_handles_windows_timezones(mock_get_localzone): """Test that tzlocal properly handles Windows timezone names. Note: tzlocal should convert Windows names like 'Pacific Standard Time' to proper IANA names like 'America/Los_Angeles'. 
""" # tzlocal should return IANA names even on Windows mock_get_localzone.return_value = "America/Los_Angeles" result = get_local_tz() assert str(result) == "America/Los_Angeles" assert isinstance(result, ZoneInfo) @pytest.mark.parametrize( "timezone_name", [ "America/New_York", "Europe/Paris", "Asia/Tokyo", "Australia/Sydney", "Africa/Cairo", "America/Sao_Paulo", "Pacific/Auckland", "UTC", ], ) @patch('mcp_server_time.server.get_localzone_name') def test_get_local_tz_various_timezones(mock_get_localzone, timezone_name): """Test various timezone names that tzlocal might return.""" mock_get_localzone.return_value = timezone_name result = get_local_tz() assert str(result) == timezone_name assert isinstance(result, ZoneInfo) ``` -------------------------------------------------------------------------------- /src/filesystem/__tests__/lib.test.ts: -------------------------------------------------------------------------------- ```typescript import { describe, it, expect, beforeEach, afterEach, jest } from '@jest/globals'; import fs from 'fs/promises'; import path from 'path'; import os from 'os'; import { // Pure utility functions formatSize, normalizeLineEndings, createUnifiedDiff, // Security & validation functions validatePath, setAllowedDirectories, // File operations getFileStats, readFileContent, writeFileContent, // Search & filtering functions searchFilesWithValidation, // File editing functions applyFileEdits, tailFile, headFile } from '../lib.js'; // Mock fs module jest.mock('fs/promises'); const mockFs = fs as jest.Mocked<typeof fs>; describe('Lib Functions', () => { beforeEach(() => { jest.clearAllMocks(); // Set up allowed directories for tests const allowedDirs = process.platform === 'win32' ? 
      ['C:\\Users\\test', 'C:\\temp', 'C:\\allowed'] :
      ['/home/user', '/tmp', '/allowed'];
    setAllowedDirectories(allowedDirs);
  });

  afterEach(() => {
    jest.restoreAllMocks();
    // Clear allowed directories after tests
    setAllowedDirectories([]);
  });

  describe('Pure Utility Functions', () => {
    describe('formatSize', () => {
      it('formats bytes correctly', () => {
        expect(formatSize(0)).toBe('0 B');
        expect(formatSize(512)).toBe('512 B');
        expect(formatSize(1024)).toBe('1.00 KB');
        expect(formatSize(1536)).toBe('1.50 KB');
        expect(formatSize(1048576)).toBe('1.00 MB');
        expect(formatSize(1073741824)).toBe('1.00 GB');
        expect(formatSize(1099511627776)).toBe('1.00 TB');
      });

      it('handles edge cases', () => {
        expect(formatSize(1023)).toBe('1023 B');
        expect(formatSize(1025)).toBe('1.00 KB');
        expect(formatSize(1048575)).toBe('1024.00 KB');
      });

      it('handles very large numbers beyond TB', () => {
        // The function only supports up to TB, so very large numbers will show as TB
        expect(formatSize(1024 * 1024 * 1024 * 1024 * 1024)).toBe('1024.00 TB');
        expect(formatSize(Number.MAX_SAFE_INTEGER)).toContain('TB');
      });

      it('handles negative numbers', () => {
        // Negative numbers will result in NaN for the log calculation
        expect(formatSize(-1024)).toContain('NaN');
        expect(formatSize(-0)).toBe('0 B');
      });

      it('handles decimal numbers', () => {
        expect(formatSize(1536.5)).toBe('1.50 KB');
        expect(formatSize(1023.9)).toBe('1023.9 B');
      });

      it('handles very small positive numbers', () => {
        expect(formatSize(1)).toBe('1 B');
        expect(formatSize(0.5)).toBe('0.5 B');
        expect(formatSize(0.1)).toBe('0.1 B');
      });
    });

    describe('normalizeLineEndings', () => {
      it('converts CRLF to LF', () => {
        expect(normalizeLineEndings('line1\r\nline2\r\nline3')).toBe('line1\nline2\nline3');
      });

      it('leaves LF unchanged', () => {
        expect(normalizeLineEndings('line1\nline2\nline3')).toBe('line1\nline2\nline3');
      });

      it('handles mixed line endings', () => {
        expect(normalizeLineEndings('line1\r\nline2\nline3\r\n')).toBe('line1\nline2\nline3\n');
      });
      it('handles empty string', () => {
        expect(normalizeLineEndings('')).toBe('');
      });
    });

    describe('createUnifiedDiff', () => {
      it('creates diff for simple changes', () => {
        const original = 'line1\nline2\nline3';
        const modified = 'line1\nmodified line2\nline3';
        const diff = createUnifiedDiff(original, modified, 'test.txt');

        expect(diff).toContain('--- test.txt');
        expect(diff).toContain('+++ test.txt');
        expect(diff).toContain('-line2');
        expect(diff).toContain('+modified line2');
      });

      it('handles CRLF normalization', () => {
        const original = 'line1\r\nline2\r\n';
        const modified = 'line1\nmodified line2\n';
        const diff = createUnifiedDiff(original, modified);

        expect(diff).toContain('-line2');
        expect(diff).toContain('+modified line2');
      });

      it('handles identical content', () => {
        const content = 'line1\nline2\nline3';
        const diff = createUnifiedDiff(content, content);

        // Should not contain any +/- lines for identical content (excluding header lines)
        expect(diff.split('\n').filter((line: string) => line.startsWith('+++') || line.startsWith('---'))).toHaveLength(2);
        expect(diff.split('\n').filter((line: string) => line.startsWith('+') && !line.startsWith('+++'))).toHaveLength(0);
        expect(diff.split('\n').filter((line: string) => line.startsWith('-') && !line.startsWith('---'))).toHaveLength(0);
      });

      it('handles empty content', () => {
        const diff = createUnifiedDiff('', '');
        expect(diff).toContain('--- file');
        expect(diff).toContain('+++ file');
      });

      it('handles default filename parameter', () => {
        const diff = createUnifiedDiff('old', 'new');
        expect(diff).toContain('--- file');
        expect(diff).toContain('+++ file');
      });

      it('handles custom filename', () => {
        const diff = createUnifiedDiff('old', 'new', 'custom.txt');
        expect(diff).toContain('--- custom.txt');
        expect(diff).toContain('+++ custom.txt');
      });
    });
  });

  describe('Security & Validation Functions', () => {
    describe('validatePath', () => {
      // Use Windows-compatible paths for testing
      const allowedDirs = process.platform === 'win32' ? ['C:\\Users\\test', 'C:\\temp'] : ['/home/user', '/tmp'];

      beforeEach(() => {
        mockFs.realpath.mockImplementation(async (path: any) => path.toString());
      });

      it('validates allowed paths', async () => {
        const testPath = process.platform === 'win32' ? 'C:\\Users\\test\\file.txt' : '/home/user/file.txt';
        const result = await validatePath(testPath);
        expect(result).toBe(testPath);
      });

      it('rejects disallowed paths', async () => {
        const testPath = process.platform === 'win32' ? 'C:\\Windows\\System32\\file.txt' : '/etc/passwd';
        await expect(validatePath(testPath))
          .rejects.toThrow('Access denied - path outside allowed directories');
      });

      it('handles non-existent files by checking parent directory', async () => {
        const newFilePath = process.platform === 'win32' ? 'C:\\Users\\test\\newfile.txt' : '/home/user/newfile.txt';
        const parentPath = process.platform === 'win32' ? 'C:\\Users\\test' : '/home/user';

        // Create an error with the ENOENT code that the implementation checks for
        const enoentError = new Error('ENOENT') as NodeJS.ErrnoException;
        enoentError.code = 'ENOENT';

        mockFs.realpath
          .mockRejectedValueOnce(enoentError)
          .mockResolvedValueOnce(parentPath);

        const result = await validatePath(newFilePath);
        expect(result).toBe(path.resolve(newFilePath));
      });

      it('rejects when parent directory does not exist', async () => {
        const newFilePath = process.platform === 'win32' ?
          'C:\\Users\\test\\nonexistent\\newfile.txt' :
          '/home/user/nonexistent/newfile.txt';

        // Create errors with the ENOENT code
        const enoentError1 = new Error('ENOENT') as NodeJS.ErrnoException;
        enoentError1.code = 'ENOENT';
        const enoentError2 = new Error('ENOENT') as NodeJS.ErrnoException;
        enoentError2.code = 'ENOENT';

        mockFs.realpath
          .mockRejectedValueOnce(enoentError1)
          .mockRejectedValueOnce(enoentError2);

        await expect(validatePath(newFilePath))
          .rejects.toThrow('Parent directory does not exist');
      });
    });
  });

  describe('File Operations', () => {
    describe('getFileStats', () => {
      it('returns file statistics', async () => {
        const mockStats = {
          size: 1024,
          birthtime: new Date('2023-01-01'),
          mtime: new Date('2023-01-02'),
          atime: new Date('2023-01-03'),
          isDirectory: () => false,
          isFile: () => true,
          mode: 0o644
        };

        mockFs.stat.mockResolvedValueOnce(mockStats as any);

        const result = await getFileStats('/test/file.txt');

        expect(result).toEqual({
          size: 1024,
          created: new Date('2023-01-01'),
          modified: new Date('2023-01-02'),
          accessed: new Date('2023-01-03'),
          isDirectory: false,
          isFile: true,
          permissions: '644'
        });
      });

      it('handles directory statistics', async () => {
        const mockStats = {
          size: 4096,
          birthtime: new Date('2023-01-01'),
          mtime: new Date('2023-01-02'),
          atime: new Date('2023-01-03'),
          isDirectory: () => true,
          isFile: () => false,
          mode: 0o755
        };

        mockFs.stat.mockResolvedValueOnce(mockStats as any);

        const result = await getFileStats('/test/dir');

        expect(result.isDirectory).toBe(true);
        expect(result.isFile).toBe(false);
        expect(result.permissions).toBe('755');
      });
    });

    describe('readFileContent', () => {
      it('reads file with default encoding', async () => {
        mockFs.readFile.mockResolvedValueOnce('file content');

        const result = await readFileContent('/test/file.txt');

        expect(result).toBe('file content');
        expect(mockFs.readFile).toHaveBeenCalledWith('/test/file.txt', 'utf-8');
      });

      it('reads file with custom encoding', async () => {
        mockFs.readFile.mockResolvedValueOnce('file content');

        const result = await readFileContent('/test/file.txt', 'ascii');

        expect(result).toBe('file content');
        expect(mockFs.readFile).toHaveBeenCalledWith('/test/file.txt', 'ascii');
      });
    });

    describe('writeFileContent', () => {
      it('writes file content', async () => {
        mockFs.writeFile.mockResolvedValueOnce(undefined);

        await writeFileContent('/test/file.txt', 'new content');

        expect(mockFs.writeFile).toHaveBeenCalledWith('/test/file.txt', 'new content', { encoding: "utf-8", flag: 'wx' });
      });
    });
  });

  describe('Search & Filtering Functions', () => {
    describe('searchFilesWithValidation', () => {
      beforeEach(() => {
        mockFs.realpath.mockImplementation(async (path: any) => path.toString());
      });

      it('excludes files matching exclude patterns', async () => {
        const mockEntries = [
          { name: 'test.txt', isDirectory: () => false },
          { name: 'test.log', isDirectory: () => false },
          { name: 'node_modules', isDirectory: () => true }
        ];

        mockFs.readdir.mockResolvedValueOnce(mockEntries as any);

        const testDir = process.platform === 'win32' ? 'C:\\allowed\\dir' : '/allowed/dir';
        const allowedDirs = process.platform === 'win32' ? ['C:\\allowed'] : ['/allowed'];

        // Mock realpath to return the same path for validation to pass
        mockFs.realpath.mockImplementation(async (inputPath: any) => {
          const pathStr = inputPath.toString();
          // Return the path as-is for validation
          return pathStr;
        });

        const result = await searchFilesWithValidation(
          testDir,
          '*test*',
          allowedDirs,
          { excludePatterns: ['*.log', 'node_modules'] }
        );

        const expectedResult = process.platform === 'win32' ?
          'C:\\allowed\\dir\\test.txt' :
          '/allowed/dir/test.txt';
        expect(result).toEqual([expectedResult]);
      });

      it('handles validation errors during search', async () => {
        const mockEntries = [
          { name: 'test.txt', isDirectory: () => false },
          { name: 'invalid_file.txt', isDirectory: () => false }
        ];

        mockFs.readdir.mockResolvedValueOnce(mockEntries as any);

        // Mock validatePath to throw error for invalid_file.txt
        mockFs.realpath.mockImplementation(async (path: any) => {
          if (path.toString().includes('invalid_file.txt')) {
            throw new Error('Access denied');
          }
          return path.toString();
        });

        const testDir = process.platform === 'win32' ? 'C:\\allowed\\dir' : '/allowed/dir';
        const allowedDirs = process.platform === 'win32' ? ['C:\\allowed'] : ['/allowed'];

        const result = await searchFilesWithValidation(
          testDir,
          '*test*',
          allowedDirs,
          {}
        );

        // Should only return the valid file, skipping the invalid one
        const expectedResult = process.platform === 'win32' ? 'C:\\allowed\\dir\\test.txt' : '/allowed/dir/test.txt';
        expect(result).toEqual([expectedResult]);
      });

      it('handles complex exclude patterns with wildcards', async () => {
        const mockEntries = [
          { name: 'test.txt', isDirectory: () => false },
          { name: 'test.backup', isDirectory: () => false },
          { name: 'important_test.js', isDirectory: () => false }
        ];

        mockFs.readdir.mockResolvedValueOnce(mockEntries as any);

        const testDir = process.platform === 'win32' ? 'C:\\allowed\\dir' : '/allowed/dir';
        const allowedDirs = process.platform === 'win32' ? ['C:\\allowed'] : ['/allowed'];

        const result = await searchFilesWithValidation(
          testDir,
          '*test*',
          allowedDirs,
          { excludePatterns: ['*.backup'] }
        );

        const expectedResults = process.platform === 'win32' ?
          [
            'C:\\allowed\\dir\\test.txt',
            'C:\\allowed\\dir\\important_test.js'
          ] :
          [
            '/allowed/dir/test.txt',
            '/allowed/dir/important_test.js'
          ];
        expect(result).toEqual(expectedResults);
      });
    });
  });

  describe('File Editing Functions', () => {
    describe('applyFileEdits', () => {
      beforeEach(() => {
        mockFs.readFile.mockResolvedValue('line1\nline2\nline3\n');
        mockFs.writeFile.mockResolvedValue(undefined);
      });

      it('applies simple text replacement', async () => {
        const edits = [
          { oldText: 'line2', newText: 'modified line2' }
        ];

        mockFs.rename.mockResolvedValueOnce(undefined);

        const result = await applyFileEdits('/test/file.txt', edits, false);

        expect(result).toContain('modified line2');
        // Should write to temporary file then rename
        expect(mockFs.writeFile).toHaveBeenCalledWith(
          expect.stringMatching(/\/test\/file\.txt\.[a-f0-9]+\.tmp$/),
          'line1\nmodified line2\nline3\n',
          'utf-8'
        );
        expect(mockFs.rename).toHaveBeenCalledWith(
          expect.stringMatching(/\/test\/file\.txt\.[a-f0-9]+\.tmp$/),
          '/test/file.txt'
        );
      });

      it('handles dry run mode', async () => {
        const edits = [
          { oldText: 'line2', newText: 'modified line2' }
        ];

        const result = await applyFileEdits('/test/file.txt', edits, true);

        expect(result).toContain('modified line2');
        expect(mockFs.writeFile).not.toHaveBeenCalled();
      });

      it('applies multiple edits sequentially', async () => {
        const edits = [
          { oldText: 'line1', newText: 'first line' },
          { oldText: 'line3', newText: 'third line' }
        ];

        mockFs.rename.mockResolvedValueOnce(undefined);

        await applyFileEdits('/test/file.txt', edits, false);

        expect(mockFs.writeFile).toHaveBeenCalledWith(
          expect.stringMatching(/\/test\/file\.txt\.[a-f0-9]+\.tmp$/),
          'first line\nline2\nthird line\n',
          'utf-8'
        );
        expect(mockFs.rename).toHaveBeenCalledWith(
          expect.stringMatching(/\/test\/file\.txt\.[a-f0-9]+\.tmp$/),
          '/test/file.txt'
        );
      });

      it('handles whitespace-flexible matching', async () => {
        mockFs.readFile.mockResolvedValue(' line1\n line2\n line3\n');

        const edits = [
          { oldText: 'line2', newText: 'modified line2' }
        ];

        mockFs.rename.mockResolvedValueOnce(undefined);

        await applyFileEdits('/test/file.txt', edits, false);

        expect(mockFs.writeFile).toHaveBeenCalledWith(
          expect.stringMatching(/\/test\/file\.txt\.[a-f0-9]+\.tmp$/),
          ' line1\n modified line2\n line3\n',
          'utf-8'
        );
        expect(mockFs.rename).toHaveBeenCalledWith(
          expect.stringMatching(/\/test\/file\.txt\.[a-f0-9]+\.tmp$/),
          '/test/file.txt'
        );
      });

      it('throws error for non-matching edits', async () => {
        const edits = [
          { oldText: 'nonexistent line', newText: 'replacement' }
        ];

        await expect(applyFileEdits('/test/file.txt', edits, false))
          .rejects.toThrow('Could not find exact match for edit');
      });

      it('handles complex multi-line edits with indentation', async () => {
        mockFs.readFile.mockResolvedValue('function test() {\n console.log("hello");\n return true;\n}');

        const edits = [
          {
            oldText: ' console.log("hello");\n return true;',
            newText: ' console.log("world");\n console.log("test");\n return false;'
          }
        ];

        mockFs.rename.mockResolvedValueOnce(undefined);

        await applyFileEdits('/test/file.js', edits, false);

        expect(mockFs.writeFile).toHaveBeenCalledWith(
          expect.stringMatching(/\/test\/file\.js\.[a-f0-9]+\.tmp$/),
          'function test() {\n console.log("world");\n console.log("test");\n return false;\n}',
          'utf-8'
        );
        expect(mockFs.rename).toHaveBeenCalledWith(
          expect.stringMatching(/\/test\/file\.js\.[a-f0-9]+\.tmp$/),
          '/test/file.js'
        );
      });

      it('handles edits with different indentation patterns', async () => {
        mockFs.readFile.mockResolvedValue(' if (condition) {\n doSomething();\n }');

        const edits = [
          {
            oldText: 'doSomething();',
            newText: 'doSomethingElse();\n doAnotherThing();'
          }
        ];

        mockFs.rename.mockResolvedValueOnce(undefined);

        await applyFileEdits('/test/file.js', edits, false);

        expect(mockFs.writeFile).toHaveBeenCalledWith(
          expect.stringMatching(/\/test\/file\.js\.[a-f0-9]+\.tmp$/),
          ' if (condition) {\n doSomethingElse();\n doAnotherThing();\n }',
          'utf-8'
        );
        expect(mockFs.rename).toHaveBeenCalledWith(
          expect.stringMatching(/\/test\/file\.js\.[a-f0-9]+\.tmp$/),
          '/test/file.js'
        );
      });

      it('handles CRLF line endings in file content', async () => {
        mockFs.readFile.mockResolvedValue('line1\r\nline2\r\nline3\r\n');

        const edits = [
          { oldText: 'line2', newText: 'modified line2' }
        ];

        mockFs.rename.mockResolvedValueOnce(undefined);

        await applyFileEdits('/test/file.txt', edits, false);

        expect(mockFs.writeFile).toHaveBeenCalledWith(
          expect.stringMatching(/\/test\/file\.txt\.[a-f0-9]+\.tmp$/),
          'line1\nmodified line2\nline3\n',
          'utf-8'
        );
        expect(mockFs.rename).toHaveBeenCalledWith(
          expect.stringMatching(/\/test\/file\.txt\.[a-f0-9]+\.tmp$/),
          '/test/file.txt'
        );
      });
    });

    describe('tailFile', () => {
      it('handles empty files', async () => {
        mockFs.stat.mockResolvedValue({ size: 0 } as any);

        const result = await tailFile('/test/empty.txt', 5);

        expect(result).toBe('');
        expect(mockFs.open).not.toHaveBeenCalled();
      });

      it('calls stat to check file size', async () => {
        mockFs.stat.mockResolvedValue({ size: 100 } as any);

        // Mock file handle with proper typing
        const mockFileHandle = {
          read: jest.fn(),
          close: jest.fn()
        } as any;

        mockFileHandle.read.mockResolvedValue({ bytesRead: 0 });
        mockFileHandle.close.mockResolvedValue(undefined);

        mockFs.open.mockResolvedValue(mockFileHandle);

        await tailFile('/test/file.txt', 2);

        expect(mockFs.stat).toHaveBeenCalledWith('/test/file.txt');
        expect(mockFs.open).toHaveBeenCalledWith('/test/file.txt', 'r');
      });

      it('handles files with content and returns last lines', async () => {
        mockFs.stat.mockResolvedValue({ size: 50 } as any);

        const mockFileHandle = {
          read: jest.fn(),
          close: jest.fn()
        } as any;

        // Simulate reading file content in chunks
        mockFileHandle.read
          .mockResolvedValueOnce({ bytesRead: 20, buffer: Buffer.from('line3\nline4\nline5\n') })
          .mockResolvedValueOnce({ bytesRead: 0 });
        mockFileHandle.close.mockResolvedValue(undefined);

        mockFs.open.mockResolvedValue(mockFileHandle);

        const result = await tailFile('/test/file.txt', 2);

        expect(mockFileHandle.close).toHaveBeenCalled();
      });

      it('handles read errors gracefully', async () => {
        mockFs.stat.mockResolvedValue({ size: 100 } as any);

        const mockFileHandle = {
          read: jest.fn(),
          close: jest.fn()
        } as any;

        mockFileHandle.read.mockResolvedValue({ bytesRead: 0 });
        mockFileHandle.close.mockResolvedValue(undefined);

        mockFs.open.mockResolvedValue(mockFileHandle);

        await tailFile('/test/file.txt', 5);

        expect(mockFileHandle.close).toHaveBeenCalled();
      });
    });

    describe('headFile', () => {
      it('opens file for reading', async () => {
        // Mock file handle with proper typing
        const mockFileHandle = {
          read: jest.fn(),
          close: jest.fn()
        } as any;

        mockFileHandle.read.mockResolvedValue({ bytesRead: 0 });
        mockFileHandle.close.mockResolvedValue(undefined);

        mockFs.open.mockResolvedValue(mockFileHandle);

        await headFile('/test/file.txt', 2);

        expect(mockFs.open).toHaveBeenCalledWith('/test/file.txt', 'r');
      });

      it('handles files with content and returns first lines', async () => {
        const mockFileHandle = {
          read: jest.fn(),
          close: jest.fn()
        } as any;

        // Simulate reading file content with newlines
        mockFileHandle.read
          .mockResolvedValueOnce({ bytesRead: 20, buffer: Buffer.from('line1\nline2\nline3\n') })
          .mockResolvedValueOnce({ bytesRead: 0 });
        mockFileHandle.close.mockResolvedValue(undefined);

        mockFs.open.mockResolvedValue(mockFileHandle);

        const result = await headFile('/test/file.txt', 2);

        expect(mockFileHandle.close).toHaveBeenCalled();
      });

      it('handles files with leftover content', async () => {
        const mockFileHandle = {
          read: jest.fn(),
          close: jest.fn()
        } as any;

        // Simulate reading file content without final newline
        mockFileHandle.read
          .mockResolvedValueOnce({ bytesRead: 15, buffer: Buffer.from('line1\nline2\nend') })
          .mockResolvedValueOnce({ bytesRead: 0 });
        mockFileHandle.close.mockResolvedValue(undefined);

        mockFs.open.mockResolvedValue(mockFileHandle);

        const result = await headFile('/test/file.txt', 5);

        expect(mockFileHandle.close).toHaveBeenCalled();
      });

      it('handles reaching requested line count', async () => {
        const mockFileHandle = {
          read: jest.fn(),
          close: jest.fn()
        } as any;

        // Simulate reading exactly the requested number of lines
        mockFileHandle.read
          .mockResolvedValueOnce({ bytesRead: 12, buffer: Buffer.from('line1\nline2\n') })
          .mockResolvedValueOnce({ bytesRead: 0 });
        mockFileHandle.close.mockResolvedValue(undefined);

        mockFs.open.mockResolvedValue(mockFileHandle);

        const result = await headFile('/test/file.txt', 2);

        expect(mockFileHandle.close).toHaveBeenCalled();
      });
    });
  });
});
```
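
The `formatSize` assertions in the test file above pin down a fairly specific behavior: values below 1 KB print unscaled, larger values print with two decimals, anything past TB stays in TB, and negative input surfaces as `NaN`. The sketch below shows one way those assertions could be satisfied. It is an assumption written for illustration, not the code in `lib.ts`, and the name `formatSizeSketch` is hypothetical.

```typescript
// Hypothetical sketch: one implementation consistent with the formatSize tests above.
// It is NOT copied from lib.ts; the name and structure are assumptions.
function formatSizeSketch(bytes: number): string {
  const units = ['B', 'KB', 'MB', 'GB', 'TB'];
  if (bytes === 0) return '0 B'; // also matches -0, since -0 === 0

  // Power of 1024 the value falls into; NaN when bytes is negative.
  const exponent = Math.floor(Math.log(bytes) / Math.log(1024));

  // Values below 1 KB (including fractions below 1) are printed unscaled.
  if (exponent <= 0) return `${bytes} ${units[0]}`;

  // Clamp to the largest known unit so very large values are reported in TB.
  const unitIndex = Math.min(exponent, units.length - 1);
  return `${(bytes / Math.pow(1024, unitIndex)).toFixed(2)} ${units[unitIndex]}`;
}

console.log(formatSizeSketch(1536));
console.log(formatSizeSketch(Math.pow(1024, 5)));
console.log(formatSizeSketch(-1024));
```

The clamp on `unitIndex` is what keeps very large values in TB instead of indexing past the end of `units`. The negative case is left unguarded on purpose here, because the tests expect `NaN` to appear in the output rather than an error.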