# Directory Structure ``` ├── .buildkite │ ├── diff │ ├── docker.yml │ ├── pipeline.yml │ └── pull-requests.json ├── .dockerignore ├── .env-example ├── .github │ └── workflows │ ├── build.yml │ └── stale.yml ├── .gitignore ├── Cargo.lock ├── Cargo.toml ├── catalog-info.yaml ├── Dockerfile ├── Dockerfile-8000 ├── docs │ └── CONTRIBUTING.md ├── elastic-mcp.json5 ├── LICENSE ├── Makefile ├── NOTICE.txt ├── README.md ├── renovate.json ├── rustfmt.toml ├── scripts │ └── cargo-run.sh ├── src │ ├── bin │ │ ├── elasticsearch-core-mcp-server.rs │ │ └── start_http.rs │ ├── cli.rs │ ├── lib.rs │ ├── protocol │ │ ├── http.rs │ │ ├── mod.rs │ │ └── stdio.rs │ ├── servers │ │ ├── elasticsearch │ │ │ ├── base_tools.rs │ │ │ └── mod.rs │ │ └── mod.rs │ └── utils │ ├── interpolator.rs │ ├── mod.rs │ └── rmcp_ext.rs └── tests └── http_tests.rs ``` # Files -------------------------------------------------------------------------------- /.dockerignore: -------------------------------------------------------------------------------- ``` docs target .idea .vscode ``` -------------------------------------------------------------------------------- /.env-example: -------------------------------------------------------------------------------- ``` # The MCP server looks for `.env` files to populate environment variables that aren't already set # Copy and edit this file (it's listed in .gitignore) ES_URL="http://localhost:9200" ES_API_KEY="<my-api-key>" ``` -------------------------------------------------------------------------------- /.gitignore: -------------------------------------------------------------------------------- ``` # Generated by Cargo # will have compiled files and executables debug/ target/ # These are backup files generated by rustfmt **/*.rs.bk # MSVC Windows builds of rustc generate these, which store debugging information *.pdb # Generated by cargo mutants # Contains mutation testing data **/mutants.out*/ .idea/ .vscode/ .env ``` -------------------------------------------------------------------------------- /README.md: -------------------------------------------------------------------------------- ```markdown # Elasticsearch MCP Server > [!CAUTION] > > **WARNING: this MCP server is EXPERIMENTAL.** Connect to your Elasticsearch data directly from any MCP Client using the Model Context Protocol (MCP). This server connects agents to your Elasticsearch data using the Model Context Protocol. It allows you to interact with your Elasticsearch indices through natural language conversations. ## Available Tools * `list_indices`: List all available Elasticsearch indices * `get_mappings`: Get field mappings for a specific Elasticsearch index * `search`: Perform an Elasticsearch search with the provided query DSL * `esql`: Perform an ES|QL query * `get_shards`: Get shard information for all or specific indices ## Prerequisites * An Elasticsearch instance * Elasticsearch authentication credentials (API key or username/password) * An MCP Client (e.g. [Claude Desktop](https://claude.ai/download), [Goose](https://block.github.io/goose/)) **Supported Elasticsearch versions** This works with Elasticsearch versions `8.x` and `9.x`. ## Installation & Setup > [!NOTE] > > Versions 0.3.1 and earlier were installed via `npm`. These versions are deprecated and no longer supported. The following instructions only apply to 0.4.0 and later. > > To view instructions for versions 0.3.1 and earlier, see the [README for v0.3.1](https://github.com/elastic/mcp-server-elasticsearch/tree/v0.3.1). This MCP server is provided as a Docker image at `docker.elastic.co/mcp/elasticsearch` that supports MCP's stdio, SSE and streamable-HTTP protocols. Running this container without any argument will output a usage message: ``` docker run docker.elastic.co/mcp/elasticsearch ``` ``` Usage: elasticsearch-mcp-server <COMMAND> Commands: stdio Start a stdio server http Start a streamable-HTTP server with optional SSE support help Print this message or the help of the given subcommand(s) Options: -h, --help Print help -V, --version Print version ``` ### Using the stdio protocol The MCP server needs environment variables to be set: * `ES_URL`: the URL of your Elasticsearch cluster * For authentication use either an API key or basic authentication: * API key: `ES_API_KEY` * Basic auth: `ES_USERNAME` and `ES_PASSWORD` * Optionally, `ES_SSL_SKIP_VERIFY` set to `true` skips SSL/TLS certificate verification when connecting to Elasticsearch. The ability to provide a custom certificate will be added in a later version. The MCP server is started in stdio mode with this command: ```bash docker run -i --rm -e ES_URL -e ES_API_KEY docker.elastic.co/mcp/elasticsearch stdio ``` The configuration for Claude Desktop is as follows: ```json { "mcpServers": { "elasticsearch-mcp-server": { "command": "docker", "args": [ "run", "-i", "--rm", "-e", "ES_URL", "-e", "ES_API_KEY", "docker.elastic.co/mcp/elasticsearch", "stdio" ], "env": { "ES_URL": "<elasticsearch-cluster-url>", "ES_API_KEY": "<elasticsearch-API-key>" } } } } ``` ### Using the streamable-HTTP and SSE protocols Note: streamable-HTTP is recommended, as [SSE is deprecated](https://modelcontextprotocol.io/docs/concepts/transports#server-sent-events-sse-deprecated). The MCP server needs environment variables to be set: * `ES_URL`, the URL of your Elasticsearch cluster * For authentication use either an API key or basic authentication: * API key: `ES_API_KEY` * Basic auth: `ES_USERNAME` and `ES_PASSWORD` * Optionally, `ES_SSL_SKIP_VERIFY` set to `true` skips SSL/TLS certificate verification when connecting to Elasticsearch. The ability to provide a custom certificate will be added in a later version. The MCP server is started in http mode with this command: ```bash docker run --rm -e ES_URL -e ES_API_KEY -p 8080:8080 docker.elastic.co/mcp/elasticsearch http ``` If for some reason your execution environment doesn't allow passing parameters to the container, they can be passed using the `CLI_ARGS` environment variable: `docker run --rm -e ES_URL -e ES_API_KEY -e CLI_ARGS=http -p 8080:8080...` The streamable-HTTP endpoint is at `http:<host>:8080/mcp`. There's also a health check at `http:<host>:8080/ping` Configuration for Claude Desktop (free edition that only supports the stdio protocol). 1. Install `mcp-proxy` (or an equivalent), that will bridge stdio to streamable-http. The executable will be installed in `~/.local/bin`: ```bash uv tool install mcp-proxy ``` 2. Add this configuration to Claude Desktop: ```json { "mcpServers": { "elasticsearch-mcp-server": { "command": "/<home-directory>/.local/bin/mcp-proxy", "args": [ "--transport=streamablehttp", "--header", "Authorization", "ApiKey <elasticsearch-API-key>", "http://<mcp-server-host>:<mcp-server-port>/mcp" ] } } } ``` ``` -------------------------------------------------------------------------------- /docs/CONTRIBUTING.md: -------------------------------------------------------------------------------- ```markdown # Contributing [fork]: https://github.com/elastic/mcp-server-elasticsearch/fork [pr]: https://github.com/elastic/mcp-server-elasticsearch/compare [code-of-conduct]: https://www.elastic.co/community/codeofconduct Elasticsearch MCP Server is open source, and we love to receive contributions from our community — you! There are many ways to contribute, from writing tutorials or blog posts, improving the documentation, submitting bug reports and feature requests or writing code. Contributions are [released](https://help.github.com/articles/github-terms-of-service/#6-contributions-under-repository-license) under the [project's license](../LICENSE). Please note that this project follows the [Elastic's Open Source Community Code of Conduct][code-of-conduct]. ## Setup 1. Install Rust (using [rustup](https://www.rust-lang.org/tools/install) is recommended) 2. Build the project: ```sh cargo build ``` or to build the Docker image, run: ```sh docker build -t mcp/elasticsearch ``` ## Start Elasticsearch You can use either: 1. **Elastic Cloud** - Use an existing Elasticsearch deployment and your API key 2. **Local Elasticsearch** - Run Elasticsearch locally using the [start-local](https://www.elastic.co/guide/en/elasticsearch/reference/current/run-elasticsearch-locally.html) script: ```bash curl -fsSL https://elastic.co/start-local | sh ``` This starts Elasticsearch and Kibana with Docker: - Elasticsearch: <http://localhost:9200> - Kibana: <http://localhost:5601> > [!NOTE] > The `start-local` setup is for development only. It uses basic authentication and disables HTTPS. ## Development Workflow 1. [Fork][fork] and clone the repository 2. Create a new branch: `git checkout -b my-branch-name` 3. Make your changes and add tests 4. Fix `cargo clippy` warnings, run `cargo fmt` and `cargo test` 5. Test locally with the MCP Inspector: ```bash npx @modelcontextprotocol/inspector ``` 7. [Test with MCP Client](../README.md#installation--setup) 8. Push to your fork and [submit a pull request][pr] ## Best Practices - Follow existing code style and patterns - Write [conventional commits](https://www.conventionalcommits.org/) - Include tests for your changes - Keep PRs focused on a single concern - Update documentation as needed ## Getting Help - Open an issue in the repository - Ask questions on [discuss.elastic.co](https://discuss.elastic.co/) ## Resources - [How to Contribute to Open Source](https://opensource.guide/how-to-contribute/) - [Using Pull Requests](https://help.github.com/articles/about-pull-requests/) - [Elastic Code of Conduct][code-of-conduct] ``` -------------------------------------------------------------------------------- /rustfmt.toml: -------------------------------------------------------------------------------- ```toml max_width = 120 ``` -------------------------------------------------------------------------------- /NOTICE.txt: -------------------------------------------------------------------------------- ``` Elasticsearch MCP Server Copyright 2025 Elasticsearch B.V. ``` -------------------------------------------------------------------------------- /scripts/cargo-run.sh: -------------------------------------------------------------------------------- ```bash #!/usr/bin/env bash cd "$(dirname $0)"/.. exec cargo run "$@" ``` -------------------------------------------------------------------------------- /renovate.json: -------------------------------------------------------------------------------- ```json { "$schema": "https://docs.renovatebot.com/renovate-schema.json", "extends": [ "local>elastic/renovate-config" ], "schedule": [ "after 1am on monday" ] } ``` -------------------------------------------------------------------------------- /.buildkite/docker.yml: -------------------------------------------------------------------------------- ```yaml --- # $yaml-language-server: $schema=https://raw.githubusercontent.com/buildkite/pipeline-schema/main/schema.json steps: - label: "Build and publish Docker image" command: "make docker-push-elastic" agents: provider: "gcp" ``` -------------------------------------------------------------------------------- /.buildkite/pull-requests.json: -------------------------------------------------------------------------------- ```json { "jobs": [ { "enabled": true, "pipelineSlug": "mcp-server-elasticsearch", "allow_org_users": true, "allowed_repo_permissions": [ "admin", "write" ], "allowed_list": [], "set_commit_status": true, "commit_status_context": "buildkite/mcp-server-elasticsearch", "build_on_commit": false, "build_on_comment": true, "trigger_comment_regex": "^(?:(?:buildkite\\W+)?(?:build|test)\\W+(?:this|it))", "always_trigger_comment_regex": "^(?:(?:buildkite\\W+)?(?:build|test)\\W+(?:this|it))", "skip_ci_labels": [ "skip-ci" ], "skip_target_branches": [], "always_require_ci_on_changed": [] } ] } ``` -------------------------------------------------------------------------------- /src/protocol/mod.rs: -------------------------------------------------------------------------------- ```rust // Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. pub mod http; pub mod stdio; ``` -------------------------------------------------------------------------------- /src/protocol/stdio.rs: -------------------------------------------------------------------------------- ```rust // Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. //! Empty for now, stdio is handled in `lib.rs`. ``` -------------------------------------------------------------------------------- /.github/workflows/stale.yml: -------------------------------------------------------------------------------- ```yaml --- name: "Close stale issues and PRs" on: schedule: - cron: "30 1 * * *" jobs: stale: runs-on: ubuntu-latest steps: - uses: actions/stale@5bef64f19d7facfb25b37b414482c7164d639639 # v9 with: stale-issue-label: stale stale-pr-label: stale days-before-stale: 60 days-before-close: 14 exempt-issue-labels: "good first issue" close-issue-label: closed-stale close-pr-label: closed-stale stale-issue-message: "This issue is stale because it has been open 60 days with no activity. Remove the `stale` label, or leave a comment, or this will be closed in 14 days." stale-pr-message: "This pull request is stale because it has been open 60 days with no activity. Remove the `stale` label, or leave a comment, or this will be closed in 14 days." ``` -------------------------------------------------------------------------------- /Dockerfile: -------------------------------------------------------------------------------- ```dockerfile # Copyright Elasticsearch B.V. and contributors # SPDX-License-Identifier: Apache-2.0 # To create a multi-arch image, run: # docker buildx build --platform linux/amd64,linux/arm64 --tag elasticsearch-core-mcp-server . FROM rust:1.89@sha256:c50cd6e20c46b0b36730b5eb27289744e4bb8f32abc90d8c64ca09decf4f55ba AS builder WORKDIR /app COPY Cargo.toml Cargo.lock ./ # Cache dependencies RUN mkdir -p ./src/bin && \ echo "pub fn main() {}" > ./src/bin/elasticsearch-core-mcp-server.rs && \ cargo build --release COPY src ./src/ RUN cargo build --release #-------------------------------------------------------------------------------------------------- FROM cgr.dev/chainguard/wolfi-base:latest COPY --from=builder /app/target/release/elasticsearch-core-mcp-server /usr/local/bin/elasticsearch-core-mcp-server ENV CONTAINER_MODE=true EXPOSE 8080/tcp ENTRYPOINT ["/usr/local/bin/elasticsearch-core-mcp-server"] ``` -------------------------------------------------------------------------------- /.buildkite/pipeline.yml: -------------------------------------------------------------------------------- ```yaml --- # $yaml-language-server: $schema=https://raw.githubusercontent.com/buildkite/pipeline-schema/main/schema.json steps: - label: "Triggering pipelines" plugins: monorepo-diff#v1.4.0: diff: ".buildkite/diff ${BUILDKITE_COMMIT}" wait: true watch: # if our Renovate configuration is amended, then make sure we have well-formed config # for more info, see https://docs.elastic.dev/plat-prod-team/service-catalogue/renovate/testing-renovate-changes - path: "renovate.json" config: label: "Verify Renovate configuration" command: "renovate-config-validator" agents: image: "docker.elastic.co/ci-agent-images/pipelib:0.15.0@sha256:753c420cf254a7ed0be658ab153965e0708fe0636dfe2fe57e6e4ae0972bb681" # if our catalog-info.yaml is changed, make sure it's well-formed according to our internal standards as well as Backstage's validation - path: "catalog-info.yaml" config: command: "/agent/check-catalog-info.sh" agents: image: "docker.elastic.co/ci-agent-images/pipelib:0.15.0@sha256:753c420cf254a7ed0be658ab153965e0708fe0636dfe2fe57e6e4ae0972bb681" ``` -------------------------------------------------------------------------------- /src/bin/start_http.rs: -------------------------------------------------------------------------------- ```rust // Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. use elasticsearch_core_mcp_server::cli::HttpCommand; use elasticsearch_core_mcp_server::run_http; /// Start the MCP http server with the local configuration. /// Useful for debugging from the IDE. #[tokio::main] pub async fn main() -> anyhow::Result<()> { println!("Current directory: {:?}", std::env::current_dir()?); run_http(HttpCommand { config: Some("elastic-mcp.json5".parse()?), address: None, sse: true, }, false) .await?; Ok(()) } ``` -------------------------------------------------------------------------------- /src/utils/mod.rs: -------------------------------------------------------------------------------- ```rust // Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. use serde::{Deserialize, Deserializer}; pub mod interpolator; pub mod rmcp_ext; /// Deserialize a string, and return `None` if it's empty. Useful for configuration fields like /// `"foo": "${SOME_ENV_VAR:}"` that uses an env var if present without failing if missing. pub fn none_if_empty_string<'de, D: Deserializer<'de>>(deserializer: D) -> Result<Option<String>, D::Error> { let s: Option<String> = Deserialize::deserialize(deserializer)?; match s { Some(s) if s.is_empty() => Ok(None), _ => Ok(s), } } ``` -------------------------------------------------------------------------------- /src/utils/rmcp_ext.rs: -------------------------------------------------------------------------------- ```rust // Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. //! Various extensions and utilities for the Rust MCP sdk. use rmcp::{RoleServer, Service}; use std::sync::Arc; /// A factory to create server (`Service<RoleServer>`) instances. pub struct ServerProvider<S: Service<RoleServer>>(pub Arc<dyn Fn() -> S + Send + Sync>); impl<S: Service<RoleServer>, F: Fn() -> S + Send + Sync + 'static> From<F> for ServerProvider<S> { fn from(value: F) -> Self { ServerProvider(Arc::new(value)) } } impl<S: Service<RoleServer>> From<Arc<dyn Fn() -> S + Send + Sync>> for ServerProvider<S> { fn from(value: Arc<dyn Fn() -> S + Send + Sync>) -> Self { ServerProvider(value) } } ``` -------------------------------------------------------------------------------- /src/servers/mod.rs: -------------------------------------------------------------------------------- ```rust // Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. use serde::{Deserialize, Serialize}; pub mod elasticsearch; /// Inclusion or exclusion list. #[derive(Debug, Serialize, Deserialize)] #[serde(rename_all = "snake_case")] pub enum IncludeExclude { Include(Vec<String>), Exclude(Vec<String>), } impl IncludeExclude { pub fn is_included(&self, name: &str) -> bool { use IncludeExclude::*; match self { Include(includes) => includes.iter().map(|s| s.as_str()).any(|s| s == name), Exclude(excludes) => excludes.iter().map(|s| s.as_str()).all(|s| s != name), } } pub fn filter(&self, tools: &mut Vec<rmcp::model::Tool>) { tools.retain(|t| self.is_included(&t.name)) } } ``` -------------------------------------------------------------------------------- /Cargo.toml: -------------------------------------------------------------------------------- ```toml [package] name = "elasticsearch-core-mcp-server" version = "0.4.5" edition = "2024" authors = ["Elastic.co"] license-file = "LICENSE" description = "MCP server for core Elastisearch features" homepage = "https://github.com/elastic/mcp-server-elasticsearch" repository = "https://github.com/elastic/mcp-server-elasticsearch" default-run = "elasticsearch-core-mcp-server" [dependencies] # Base stuff anyhow = "1.0" futures = "0.3" indexmap = { version = "2", features = ["serde"] } itertools = "0.12" thiserror = "2" serde = { version = "1.0", features = ["derive"] } serde_json = "1" # CLI, config clap = { version = "4", features = ["derive", "env"] } dotenvy = "0.15" serde-aux = "4" serde_json5 = "0.2" # Logging tracing = "0.1" tracing-subscriber = { version = "0.3", features = [ "env-filter", "std", "fmt", ]} elasticsearch = { version = "9.0.0-alpha.1", git = "https://github.com/elastic/elasticsearch-rs", branch = "new-with-creds" } # Async and http tokio = { version = "1", features = ["macros", "rt", "rt-multi-thread", "io-std", "signal", "process"] } tokio-util = "0.7" axum = "0.8" http = "1.3.1" # Schemars: keep in sync with rmcp schemars = { version = "0.8", features = ["chrono"] } reqwest = "0.12" futures-util = "0.3" # MCP rust sdk: main branch, 2025-06-26 [dependencies.rmcp] features = [ "server", "auth", "transport-sse-server", "transport-streamable-http-server", "transport-io", # stdio "client", "transport-sse-client", "transport-streamable-http-client", "transport-child-process", ] # Keep rev in sync with rmcp-macros below version = "0.2.1" [dependencies.rmcp-macros] version = "0.2.1" [dev-dependencies] sse-stream = "0.2" [profile.release] codegen-units = 1 strip = true lto = true opt-level = "z" # Note: do not add `panic = "abort"` since tower-http has a panic-handling middleware ``` -------------------------------------------------------------------------------- /elastic-mcp.json5: -------------------------------------------------------------------------------- ``` { // Configure the target Elasticsearch server "elasticsearch": { "url": "${ES_URL}", "api_key": "${ES_API_KEY:}", "username": "${ES_USERNAME:}", "password": "${ES_PASSWORD:}", "ssl_skip_verify": "${ES_SSL_SKIP_VERIFY:false}", /* WIP "tools": { // Exclude the "search" builtin tool as it's too broad "exclude": ["search"], // Custom tools "custom": { // An ES|QL query "add-42": { "type": "esql", "description": "Adds 42 to the input value", "query": "row value = ?value | eval result = value + 42 | keep result", "parameters": { "value": { "title": "The value", "type": "number" } } }, // A stored search template "a-stored-template": { "type": "search_template", "description": "This is the description for this stored template", "template_id": "my-template", "parameters": { "param_1": { "title": "The first parameter", "description": "Use this parameter to blah blah and blah", "type": "string" } } }, // An inline search template "an-inline-template": { "type": "search_template", "description": "This is the description for this inline template", "template": { "query": { "term": { "some-field": "{{param_1}}" } } }, "parameters": { "param_1": { "title": "The first parameter", "description": "Use this parameter to blah blah and blah", "type": "string" } } } } } */ } } ``` -------------------------------------------------------------------------------- /src/bin/elasticsearch-core-mcp-server.rs: -------------------------------------------------------------------------------- ```rust // Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. use std::io::ErrorKind; use clap::Parser; use elasticsearch_core_mcp_server::cli::Cli; use tracing_subscriber::EnvFilter; // To test with stdio, use npx @modelcontextprotocol/inspector cargo run -p elastic-mcp #[tokio::main] async fn main() -> anyhow::Result<()> { // Also accept .env files match dotenvy::dotenv() { Err(dotenvy::Error::Io(io_err)) if io_err.kind() == ErrorKind::NotFound => {} Err(err) => return Err(err)?, Ok(_) => {} } let env_args = std::env::vars().find(|(k, _v)| k == "CLI_ARGS").map(|(_k, v)| v); let cli = if let Some(env_args) = env_args { // Concatenate arg[0] with the ARGS value split on whitespaces // Note: we don't handle shell-style string quoting and character escaping let arg0 = std::env::args().next().unwrap(); let mut args = vec![arg0.as_str()]; args.extend(env_args.split_whitespace()); Cli::parse_from(args) } else { Cli::parse() }; // Initialize the tracing subscriber with file and stdout logging tracing_subscriber::fmt() .with_env_filter(EnvFilter::from_default_env().add_directive(tracing::Level::INFO.into())) .with_writer(std::io::stderr) .with_ansi(false) .init(); tracing::info!("Elasticsearch MCP server, version {}", env!("CARGO_PKG_VERSION")); cli.run().await } ``` -------------------------------------------------------------------------------- /catalog-info.yaml: -------------------------------------------------------------------------------- ```yaml --- # yaml-language-server: $schema=https://json.schemastore.org/catalog-info.json apiVersion: backstage.io/v1alpha1 kind: Component metadata: name: mcp-server-elasticsearch spec: type: library owner: group:devtools-team lifecycle: beta --- # yaml-language-server: $schema=https://gist.githubusercontent.com/elasticmachine/988b80dae436cafea07d9a4a460a011d/raw/rre.schema.json apiVersion: backstage.io/v1alpha1 kind: Resource metadata: name: buildkite-pipeline-mcp-server-elasticsearch description: Buildkite Pipeline for mcp-server-elasticsearch links: - title: Pipeline url: https://buildkite.com/elastic/mcp-server-elasticsearch spec: type: buildkite-pipeline owner: group:devtools-team system: buildkite implementation: apiVersion: buildkite.elastic.dev/v1 kind: Pipeline metadata: name: mcp-server-elasticsearch description: Run checks for the mcp-server-elasticsearch package spec: repository: elastic/mcp-server-elasticsearch pipeline_file: ".buildkite/pipeline.yml" teams: devtools-team: access_level: MANAGE_BUILD_AND_READ everyone: access_level: READ_ONLY --- # yaml-language-server: $schema=https://gist.githubusercontent.com/elasticmachine/988b80dae436cafea07d9a4a460a011d/raw/rre.schema.json apiVersion: backstage.io/v1alpha1 kind: Resource metadata: name: mcp-server-elasticsearch-docker description: Build and publish Docker images for mcp-server-elasticsearch spec: type: buildkite-pipeline owner: group:devtools-team system: buildkite implementation: apiVersion: buildkite.elastic.dev/v1 kind: Pipeline metadata: name: mcp-server-elasticsearch-docker description: Build and publish Docker images for mcp-server-elasticsearch spec: repository: elastic/mcp-server-elasticsearch pipeline_file: ".buildkite/docker.yml" teams: devtools-team: access_level: MANAGE_BUILD_AND_READ everyone: access_level: READ_ONLY provider_settings: build_pull_requests: false build_branches: false build_tags: true cancel_intermediate_builds: true ``` -------------------------------------------------------------------------------- /.github/workflows/build.yml: -------------------------------------------------------------------------------- ```yaml name: Build Rust binaries on: release: types: [published] workflow_dispatch: inputs: version: description: Version to build (e.g. v0.4.1) required: true type: string jobs: build-binary: runs-on: ${{ matrix.target.runner }} permissions: contents: write id-token: write strategy: fail-fast: false matrix: target: - name: linux-x86_64 runner: ubuntu-latest ext: "" target: x86_64-unknown-linux-gnu - name: windows-x86_64 runner: windows-latest ext: ".exe" target: x86_64-pc-windows-msvc - name: macos-x86_64 runner: macos-latest ext: "" target: x86_64-apple-darwin - name: linux-arm64 runner: ubuntu-latest ext: "" target: aarch64-unknown-linux-gnu - name: windows-arm64 runner: windows-latest ext: ".exe" target: aarch64-pc-windows-msvc - name: macos-arm64 runner: macos-latest ext: "" target: aarch64-apple-darwin steps: - name: Get release tag value id: version-tag shell: bash run: | if [ -n "${{ inputs.version }}" ]; then echo "ref=${{ inputs.version }}" >> $GITHUB_OUTPUT else ref=$(echo "$GITHUB_REF" | cut -d '/' -f3) echo "ref=$ref" >> $GITHUB_OUTPUT fi - name: Checkout uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4 with: path: checkout-main - name: Checkout uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4 with: ref: ${{ steps.version-tag.outputs.ref }} path: checkout-tag - name: Add target config for arm64 on Linux if: ${{ matrix.target.name == 'linux-arm64' }} run: | sudo apt update && sudo apt install -y gcc-aarch64-linux-gnu mkdir -p checkout-tag/.cargo echo '[target.aarch64-unknown-linux-gnu]' >> checkout-tag/.cargo/config echo 'linker = "aarch64-linux-gnu-gcc"' >> checkout-tag/.cargo/config - name: Build binary uses: houseabsolute/actions-rust-cross@9a1618ffb70e8374ab5f48fcccea3ebeacf57971 # v1.0.5 with: command: build target: ${{ matrix.target.target }} args: "--locked --release" working-directory: ${{ github.workspace }}/checkout-tag - name: Upload binaries to release uses: svenstaro/upload-release-action@v2 with: tag: ${{ steps.version-tag.outputs.ref }} file: checkout-tag/target/${{ matrix.target.target }}/release/elasticsearch-core-mcp-server${{ matrix.target.ext }} asset_name: elasticsearch-core-mcp-server-${{ steps.version-tag.outputs.ref }}-${{ matrix.target.name }}${{ matrix.target.ext }} overwrite: true ``` -------------------------------------------------------------------------------- /src/cli.rs: -------------------------------------------------------------------------------- ```rust // Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. use crate::servers::elasticsearch; use clap::Parser; use clap::{Args, Subcommand}; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::path::PathBuf; /// Elastic MCP server #[derive(Debug, Parser)] #[command(version)] pub struct Cli { /// Container mode: change default http address, rewrite localhost to the host's address #[clap(global=true, long, env = "CONTAINER_MODE")] pub container_mode: bool, #[clap(subcommand)] pub command: Command, } #[derive(Debug, Subcommand)] pub enum Command { Stdio(StdioCommand), Http(HttpCommand), } /// Start a streamable-HTTP server with optional SSE support #[derive(Debug, Args)] pub struct HttpCommand { /// Config file #[clap(short, long)] pub config: Option<PathBuf>, /// Address to listen to [default: 127.0.0.1:8080] #[clap(long, value_name = "IP_ADDRESS:PORT", env = "HTTP_ADDRESS")] pub address: Option<std::net::SocketAddr>, /// Also start an SSE server on '/sse' #[clap(long)] pub sse: bool, } /// Start an stdio server #[derive(Debug, Args)] pub struct StdioCommand { /// Config file #[clap(short, long)] pub config: Option<PathBuf>, } //--------------------------------------------------------------- // Reference material: // https://modelcontextprotocol.io/quickstart/user // https://code.visualstudio.com/docs/copilot/chat/mcp-servers // https://docs.aws.amazon.com/amazonq/latest/qdeveloper-ug/command-line-mcp-configuration.html // https://github.com/landicefu/mcp-client-configuration-server #[derive(Debug, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct Stdio { /// Command to run (e.g. "npx", "docker") pub command: String, /// Command arguments pub args: Vec<String>, /// Environment variables #[serde(default)] pub env: HashMap<String, String>, } #[derive(Debug, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct Http { /// URL of the server pub url: String, /// HTTP headers to send with the request #[serde(default)] pub headers: HashMap<String, String>, } #[derive(Debug, Serialize, Deserialize)] #[serde(rename_all = "kebab-case")] #[serde(tag = "type")] pub enum McpServer { //Builtin(BuiltinConfig), Sse(Http), StreamableHttp(Http), Stdio(Stdio), } #[derive(Debug, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct Configuration { pub elasticsearch: elasticsearch::ElasticsearchMcpConfig, #[serde(default)] pub mcp_servers: HashMap<String, McpServer>, } ``` -------------------------------------------------------------------------------- /src/utils/interpolator.rs: -------------------------------------------------------------------------------- ```rust // Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. //! Simple string interpolator to inject environment variables in the configuration file. use thiserror::Error; #[derive(Error, Debug)] #[error("Invalid configuration template: {reason} at {line}:{char}")] pub struct InterpolationError { pub reason: String, pub line: usize, pub char: usize, } pub fn interpolate_from_env(s: String) -> Result<String, InterpolationError> { interpolate(s, |name| std::env::var(name).ok()) } const OPEN: &str = "${"; const OPEN_LEN: usize = OPEN.len(); const CLOSE: &str = "}"; const CLOSE_LEN: usize = CLOSE.len(); /// Simple string interpolation using the `${name}` and `${name:default_value}` syntax. pub fn interpolate(s: String, lookup: impl Fn(&str) -> Option<String>) -> Result<String, InterpolationError> { if !s.contains(OPEN) { return Ok(s); } let mut result: String = String::new(); for (line_no, mut line) in s.lines().enumerate() { if line_no > 0 { result.push('\n'); } let mut char_no = 0; let err = |char_no: usize, msg: String| InterpolationError { reason: msg, line: line_no + 1, // editors (and humans) are 1-based char: char_no, }; while let Some(pos) = line.find(OPEN) { // Push text before the opening brace result.push_str(&line[..pos]); char_no += pos + OPEN_LEN; line = &line[pos + OPEN_LEN..]; if let Some(pos) = line.find(CLOSE) { let expr = &line[..pos]; let value = if let Some((name, default)) = expr.split_once(':') { lookup(name).unwrap_or(default.to_string()) } else { lookup(expr).ok_or_else(|| err(char_no, format!("env variable '{expr}' not defined")))? }; result.push_str(&value); char_no += expr.len() + CLOSE_LEN; line = &line[expr.len() + CLOSE_LEN..]; } else { return Err(err(char_no, "missing closing braces".to_string())); } } result.push_str(line); } Ok(result) } #[cfg(test)] mod tests { use super::*; fn expand(name: &str) -> Result<String, InterpolationError> { let lookup = |s: &str| match s { "foo" => Some("foo_value".to_string()), "bar" => Some("bar_value".to_string()), _ => None, }; interpolate(name.to_string(), lookup) } #[test] fn good_extrapolation() -> anyhow::Result<()> { assert_eq!("012345678", expand("012345678")?); assert_eq!("foo_value01234", expand("${foo}01234")?); assert_eq!("foo_value01234\n1234bar_value", expand("${foo}01234\n1234${bar}")?); assert_eq!("foo_value01234bar_value", expand("${foo}01234${bar}")?); assert_eq!("_01_foo_value01234bar_value567", expand("_01_${foo}01234${bar}567")?); Ok(()) } #[test] fn failed_extrapolation() { assert!(expand("${foo01234").is_err()); assert!(expand("${foo}01234${bar").is_err()); assert!(expand("${baz}01234").is_err()); } } ``` -------------------------------------------------------------------------------- /src/lib.rs: -------------------------------------------------------------------------------- ```rust // Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. pub mod cli; mod protocol; mod servers; mod utils; use crate::cli::{Cli, Command, Configuration, HttpCommand, StdioCommand}; use crate::protocol::http::{HttpProtocol, HttpServerConfig}; use crate::servers::elasticsearch; use crate::utils::interpolator; use rmcp::transport::stdio; use rmcp::transport::streamable_http_server::session::never::NeverSessionManager; use rmcp::{RoleServer, Service, ServiceExt}; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::path::PathBuf; use std::sync::Arc; use tokio::select; use tokio_util::sync::CancellationToken; impl Cli { pub async fn run(self) -> anyhow::Result<()> { match self.command { Command::Stdio(cmd) => run_stdio(cmd, self.container_mode).await, Command::Http(cmd) => run_http(cmd, self.container_mode).await, } } } pub async fn run_stdio(cmd: StdioCommand, container_mode: bool) -> anyhow::Result<()> { tracing::info!("Starting stdio server"); let handler = setup_services(&cmd.config, container_mode).await?; let service = handler.serve(stdio()).await.inspect_err(|e| { tracing::error!("serving error: {:?}", e); })?; select! { _ = service.waiting() => {}, _ = tokio::signal::ctrl_c() => {}, } Ok(()) } pub async fn run_http(cmd: HttpCommand, container_mode: bool) -> anyhow::Result<()> { let handler = setup_services(&cmd.config, container_mode).await?; let server_provider = move || handler.clone(); let address: SocketAddr = if let Some(addr) = cmd.address { addr } else if container_mode { SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 8080) } else { SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 8080) }; let ct = HttpProtocol::serve_with_config( server_provider, HttpServerConfig { bind: address, ct: CancellationToken::new(), // streaming http: keep_alive: None, stateful_mode: false, session_manager: Arc::new(NeverSessionManager::default()), }, ) .await?; tracing::info!("Starting http server at address {}", address); tokio::signal::ctrl_c().await?; ct.cancel(); Ok(()) } pub async fn setup_services(config: &Option<PathBuf>, container_mode: bool) -> anyhow::Result<impl Service<RoleServer> + Clone> { // Read config file and expand variables let config = if let Some(path) = config { std::fs::read_to_string(path)? } else { // Built-in default configuration, based on env variables. r#"{ "elasticsearch": { "url": "${ES_URL}", "api_key": "${ES_API_KEY:}", "username": "${ES_USERNAME:}", "password": "${ES_PASSWORD:}", "ssl_skip_verify": "${ES_SSL_SKIP_VERIFY:false}" } }"# .to_string() }; // Expand environment variables in the config file let config = interpolator::interpolate_from_env(config)?; // JSON5 adds comments and multiline strings (useful for ES|QL) to JSON let config: Configuration = match serde_json5::from_str(&config) { Ok(c) => c, Err(serde_json5::Error::Message { msg, location }) if location.is_some() => { let location = location.unwrap(); let line = location.line; let column = location.column; anyhow::bail!("Failed to parse config: {msg}, at line {line} column {column}"); } Err(err) => return Err(err)?, }; let handler = elasticsearch::ElasticsearchMcp::new_with_config(config.elasticsearch, container_mode)?; Ok(handler) } ``` -------------------------------------------------------------------------------- /src/protocol/http.rs: -------------------------------------------------------------------------------- ```rust // Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. //! Implementation of HTTP protocols use crate::utils::rmcp_ext::ServerProvider; use axum::Router; use axum::http::StatusCode; use axum::routing::get; use rmcp::transport::sse_server::SseServerConfig; use rmcp::transport::streamable_http_server::session::local::LocalSessionManager; use rmcp::transport::streamable_http_server::{SessionManager, StreamableHttpServerConfig}; use rmcp::transport::{SseServer, StreamableHttpService}; use rmcp::{RoleServer, Service}; use std::net::SocketAddr; use std::sync::Arc; use std::time::Duration; use tokio_util::sync::CancellationToken; use tracing::Instrument; /// Configuration for an HTTP MCP server pub struct HttpServerConfig<M: SessionManager = LocalSessionManager> { /// TCP address to bind to pub bind: SocketAddr, /// Parent cancellation token. `serve_with_config` will return a child token pub ct: CancellationToken, /// Streamable http server option pub keep_alive: Option<Duration>, /// Streamable http server option pub stateful_mode: bool, /// Streamable http server option pub session_manager: Arc<M>, } /// An HTTP MCP server that supports both SSE and streamable HTTP. pub struct HttpProtocol {} impl HttpProtocol { pub async fn serve_with_config<S: Service<RoleServer>, M: SessionManager>( server_provider: impl Into<ServerProvider<S>>, config: HttpServerConfig<M>, ) -> std::io::Result<CancellationToken> { let server_provider = server_provider.into().0; let ct = config.ct.child_token(); // Create a streamable http router let sh_router = { let sh_config = StreamableHttpServerConfig { sse_keep_alive: config.keep_alive, stateful_mode: config.stateful_mode, }; let server_provider = server_provider.clone(); // TODO: internally, new() wraps the server provider closure with an Arc. We can avoid // "double-Arc" by having let sh_service = StreamableHttpService::new(move || Ok(server_provider()), config.session_manager, sh_config); Router::new().route_service("/", sh_service) }; // Create an SSE router let sse_router = { let sse_config = SseServerConfig { bind: config.bind, // SSE server will create a child cancellation token for every transport that is created // (see with_service() below) ct: ct.clone(), sse_keep_alive: config.keep_alive, sse_path: "/".to_string(), post_path: "/message".to_string(), }; let (sse_server, sse_router) = SseServer::new(sse_config); let _sse_ct = sse_server.with_service(move || server_provider()); sse_router }; // Health and readiness // See https://kubernetes.io/docs/concepts/configuration/liveness-readiness-startup-probes/ let health_router = { Router::new() // We may introduce a startup probe if we need to fetch/cache remote resources // during initialization // Ready: once we have the tool list we can process incoming requests .route("/ready", get(async || (StatusCode::OK, "Ready\n"))) // Live: are we alive? .route("/live", get(async || "Alive\n")) }; // Put all things together let main_router = Router::new() .route("/", get(hello)) .route("/ping", get(async || (StatusCode::OK, "Ready\n"))) .nest("/mcp/sse", sse_router) .nest("/mcp", sh_router) .nest("/_health", health_router) .with_state(()); // Start the http server let listener = tokio::net::TcpListener::bind(config.bind).await?; let server = axum::serve(listener, main_router).with_graceful_shutdown({ let ct = ct.clone(); async move { ct.cancelled().await; tracing::info!("http server cancelled"); } }); // Await the server, or it will do nothing :-) tokio::spawn( async { let _ = server.await; } .instrument(tracing::info_span!("http-server", bind_address = %config.bind)), ); Ok(ct) } } async fn hello() -> String { let version = env!("CARGO_PKG_VERSION"); format!( r#"Elasticsearch MCP server. Version {version} Endpoints: - streamable-http: /mcp - sse: /mcp/sse "# ) } #[cfg(test)] mod tests { #[test] pub fn test_parts_in_extensions() {} } ``` -------------------------------------------------------------------------------- /tests/http_tests.rs: -------------------------------------------------------------------------------- ```rust // Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. use anyhow::bail; use axum::Router; use axum::extract::Path; use elasticsearch_core_mcp_server::cli; use futures_util::StreamExt; use http::HeaderMap; use http::header::{ACCEPT, CONTENT_TYPE}; use reqwest::Client; use rmcp::model::ToolAnnotations; use serde::Deserialize; use serde::de::DeserializeOwned; use serde_json::json; use sse_stream::SseStream; use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4, TcpListener}; /// Simple smoke test #[tokio::test] async fn http_tool_list() -> anyhow::Result<()> { let addr = find_address()?; let cli = cli::Cli { container_mode: false, command: cli::Command::Http(cli::HttpCommand { config: None, address: Some(addr), sse: false, }), }; tokio::spawn(async move { cli.run().await }); let url = format!("http://127.0.0.1:{}/mcp", addr.port()); let body = json!({ "jsonrpc": "2.0", "id": 1, "method": "tools/list" }); let client = Client::builder().build()?; tokio::time::sleep(std::time::Duration::from_secs(1)).await; let response = client .post(url) .header(CONTENT_TYPE, "application/json") .header(ACCEPT, "application/json, text/event-stream") .json(&body) .send() .await? .error_for_status()?; let response_body: ListToolsResponse = parse_response(response).await?; let names = response_body .result .tools .iter() .map(|t| t.name.as_str()) .collect::<Vec<_>>(); assert!(names.contains(&"search")); assert!(names.contains(&"list_indices")); assert!(names.contains(&"get_mappings")); Ok(()) } // End-to-end test that spawns a mock ES server and calls the `list_indices` tool via http #[tokio::test] async fn end_to_end() -> anyhow::Result<()> { // Start an ES mock that will reply to list_indices let router = Router::new().route( "/_cat/indices/{index}", axum::routing::get(async move |headers: HeaderMap, Path(index): Path<String>| { // Check parameter forwarding assert_eq!(index, "test-index"); // Check API key assert_eq!( headers.get("Authorization").unwrap().to_str().unwrap(), "ApiKey value-from-the-test" ); axum::Json(json!([ { "index": "test-index", "status": "open", "docs.count": "100" } ])) }), ); let listener = tokio::net::TcpListener::bind(LOCALHOST_0).await?; // SAFETY: works since this is the only test in this module that sets env vars // TODO: refactor the CLI to accept an alternate source of key/values unsafe { std::env::set_var("ES_URL", format!("http://127.0.0.1:{}/", listener.local_addr()?.port())); } let server = axum::serve(listener, router); tokio::spawn(async { server.await }); // Start an http MCP server let addr = find_address()?; let cli = cli::Cli { container_mode: false, command: cli::Command::Http(cli::HttpCommand { config: None, address: Some(addr), sse: false, }), }; tokio::spawn(async move { cli.run().await }); let url = format!("http://127.0.0.1:{}/mcp", addr.port()); let body = json!({ "jsonrpc": "2.0", "id": 1, "method": "tools/call", "params": { "name": "list_indices", "arguments": { "index_pattern": "test-index" } } }); let client = Client::builder().build()?; tokio::time::sleep(std::time::Duration::from_secs(1)).await; let response = client .post(url) .header(CONTENT_TYPE, "application/json") .header(ACCEPT, "application/json, text/event-stream") .header("Authorization", "ApiKey value-from-the-test") .json(&body) .send() .await? .error_for_status()?; let response_body: serde_json::Value = parse_response(response).await?; assert_eq!(response_body["result"]["content"][0]["text"], "Found 1 indices:"); assert_eq!( response_body["result"]["content"][1]["text"], "[{\"index\":\"test-index\",\"status\":\"open\",\"docs.count\":100}]" ); Ok(()) } const LOCALHOST_0: SocketAddrV4 = SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0); fn find_address() -> anyhow::Result<SocketAddr> { // Find a free port Ok(TcpListener::bind(LOCALHOST_0)?.local_addr()?) } async fn parse_response<T: DeserializeOwned>(response: reqwest::Response) -> anyhow::Result<T> { let result = match response.headers().get(CONTENT_TYPE) { Some(v) if v == "application/json" => response.json().await?, Some(v) if v == "text/event-stream" => { let mut stream = SseStream::from_byte_stream(response.bytes_stream()); match stream.next().await { None => bail!("No data"), Some(Err(e)) => bail!("Bad SSE stream: {}", e), Some(Ok(sse)) => { let data = sse.data.unwrap(); serde_json::from_str(&data)? } } } _ => { panic!("Unexpected content type"); } }; Ok(result) } #[derive(Debug, Deserialize)] #[allow(dead_code)] struct ListToolsResponse { jsonrpc: String, id: i64, result: ToolResult, } #[derive(Debug, Deserialize)] #[allow(dead_code)] struct ToolResult { tools: Vec<Tool>, } #[derive(Debug, Deserialize)] #[allow(dead_code)] struct Tool { name: String, description: String, input_schema: Option<serde_json::Value>, annotations: Option<ToolAnnotations>, } ``` -------------------------------------------------------------------------------- /src/servers/elasticsearch/mod.rs: -------------------------------------------------------------------------------- ```rust // Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. mod base_tools; use crate::servers::IncludeExclude; use crate::utils::none_if_empty_string; use elasticsearch::Elasticsearch; use elasticsearch::auth::Credentials; use elasticsearch::cert::CertificateValidation; use elasticsearch::http::Url; use elasticsearch::http::response::Response; use http::header::USER_AGENT; use http::request::Parts; use http::{HeaderValue, header}; use indexmap::IndexMap; use rmcp::RoleServer; use rmcp::model::ToolAnnotations; use rmcp::service::RequestContext; use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; use serde_aux::field_attributes::deserialize_bool_from_anything; use std::borrow::Cow; use std::collections::HashMap; #[derive(Debug, Serialize, Deserialize)] pub struct ElasticsearchMcpConfig { /// Cluster URL pub url: String, /// API key #[serde(default, deserialize_with = "none_if_empty_string")] pub api_key: Option<String>, /// Username #[serde(default, deserialize_with = "none_if_empty_string")] pub username: Option<String>, /// Password #[serde(default, deserialize_with = "none_if_empty_string")] pub password: Option<String>, /// Should we skip SSL certificate verification? #[serde(default, deserialize_with = "deserialize_bool_from_anything")] pub ssl_skip_verify: bool, /// Search templates to expose as tools or resources #[serde(default)] pub tools: Tools, /// Prompts #[serde(default)] pub prompts: Vec<String>, // TODO: search as resources? } // A wrapper around an ES client that provides a client instance configured /// for a given request context (i.e. auth credentials) #[derive(Clone)] pub struct EsClientProvider(Elasticsearch); impl EsClientProvider { pub fn new(client: Elasticsearch) -> Self { EsClientProvider(client) } /// If the incoming request is a http request and has an `Authorization` header, use it /// to authenticate to the remote ES instance. pub fn get(&self, context: RequestContext<RoleServer>) -> Cow<'_, Elasticsearch> { let client = &self.0; let Some(mut auth) = context .extensions .get::<Parts>() .and_then(|p| p.headers.get(header::AUTHORIZATION)) .and_then(|h| h.to_str().ok()) else { // No auth return Cow::Borrowed(client); }; // MCP inspector insists on sending a bearer token and prepends "Bearer" to the value provided if auth.starts_with("Bearer ApiKey ") || auth.starts_with("Bearer Basic ") { auth = auth.trim_start_matches("Bearer "); } let transport = client .transport() .clone_with_auth(Some(Credentials::AuthorizationHeader(auth.to_string()))); Cow::Owned(Elasticsearch::new(transport)) } } #[derive(Debug, Serialize, Deserialize, Default)] pub struct Tools { #[serde(flatten)] pub incl_excl: Option<IncludeExclude>, pub custom: HashMap<String, CustomTool>, } #[derive(Debug, Serialize, Deserialize)] #[serde(tag = "type", rename_all = "snake_case")] pub enum CustomTool { Esql(EsqlTool), SearchTemplate(SearchTemplateTool), } impl CustomTool { pub fn base(&self) -> &ToolBase { match self { CustomTool::Esql(esql) => &esql.base, CustomTool::SearchTemplate(search_template) => &search_template.base, } } } #[derive(Debug, Serialize, Deserialize)] pub struct ToolBase { pub description: String, pub parameters: IndexMap<String, schemars::schema::SchemaObject>, pub annotations: Option<ToolAnnotations>, } #[derive(Debug, Serialize, Deserialize)] pub struct EsqlTool { #[serde(flatten)] base: ToolBase, query: String, #[serde(default)] format: EsqlResultFormat, } #[derive(Debug, Serialize, Deserialize, Default)] #[serde(rename_all = "snake_case")] pub enum EsqlResultFormat { #[default] // Output as JSON, either as an array of objects or as a single object. Json, // If a single object with a single property, output only its value Value, //Csv, } #[derive(Debug, Serialize, Deserialize)] pub struct SearchTemplateTool { #[serde(flatten)] base: ToolBase, #[serde(flatten)] template: SearchTemplate, } #[derive(Debug, Serialize, Deserialize)] #[serde(rename_all = "snake_case")] pub enum SearchTemplate { TemplateId(String), Template(serde_json::Value), // or constrain to an object? } #[derive(Clone)] pub struct ElasticsearchMcp {} impl ElasticsearchMcp { pub fn new_with_config(config: ElasticsearchMcpConfig, container_mode: bool) -> anyhow::Result<base_tools::EsBaseTools> { let creds = if let Some(api_key) = config.api_key.clone() { Some(Credentials::EncodedApiKey(api_key)) } else if let Some(username) = config.username.clone() { let pwd = config.password.clone().ok_or(anyhow::Error::msg("missing password"))?; Some(Credentials::Basic(username, pwd)) } else { None }; let url = config.url.as_str(); if url.is_empty() { return Err(anyhow::Error::msg("Elasticsearch URL is empty")); } let mut url = Url::parse(url)?; if container_mode { rewrite_localhost(&mut url)?; } let pool = elasticsearch::http::transport::SingleNodeConnectionPool::new(url.clone()); let mut transport = elasticsearch::http::transport::TransportBuilder::new(pool); if let Some(creds) = creds { transport = transport.auth(creds); } if config.ssl_skip_verify { transport = transport.cert_validation(CertificateValidation::None) } transport = transport.header( USER_AGENT, HeaderValue::from_str(&format!("elastic-mcp/{}", env!("CARGO_PKG_VERSION")))?, ); let transport = transport.build()?; let es_client = Elasticsearch::new(transport); Ok(base_tools::EsBaseTools::new(es_client)) } } //------------------------------------------------------------------------------------------------ // Utilities /// Rewrite urls targeting `localhost` to a hostname that maps to the container host, if possible. /// /// The host name for the container host depends on the OCI runtime used. This is useful to accept /// Elasticsearch URLs like `http://localhost:9200`. fn rewrite_localhost(url: &mut Url) -> anyhow::Result<()> { use std::net::ToSocketAddrs; let aliases = &[ "host.docker.internal", // Docker "host.containers.internal", // Podman, maybe others ]; if let Some(host) = url.host_str() && host == "localhost" { for alias in aliases { if let Ok(mut alias_add) = (*alias, 80).to_socket_addrs() && alias_add.next().is_some() { url.set_host(Some(alias))?; tracing::info!("Container mode: using '{alias}' instead of 'localhost'"); return Ok(()); } } } tracing::warn!("Container mode: cound not find a replacement for 'localhost'"); Ok(()) } /// Map any error to an internal error of the MCP server pub fn internal_error(e: impl std::error::Error) -> rmcp::Error { rmcp::Error::internal_error(e.to_string(), None) } /// Return an error as an error response to the client, which may be able to take /// action to correct it. This should be refined to handle common error types such /// as index not found, which could be caused by the client hallucinating an index name. /// /// TODO (in rmcp): if rmcp::Error had a variant that accepts a CallToolResult, this would /// allow to use the '?' operator while sending a result to the client. pub fn handle_error(result: Result<Response, elasticsearch::Error>) -> Result<Response, rmcp::Error> { match result { Ok(resp) => resp.error_for_status_code(), Err(e) => { tracing::error!("Error: {:?}", &e); Err(e) } } .map_err(internal_error) } pub async fn read_json<T: DeserializeOwned>( response: Result<Response, elasticsearch::Error>, ) -> Result<T, rmcp::Error> { // let text = read_text(response).await?; // tracing::debug!("Received json {text}"); // serde_json::from_str(&text).map_err(internal_error) let response = handle_error(response)?; response.json().await.map_err(internal_error) } #[allow(dead_code)] pub async fn read_text(result: Result<Response, elasticsearch::Error>) -> Result<String, rmcp::Error> { let response = handle_error(result)?; response.text().await.map_err(internal_error) } ``` -------------------------------------------------------------------------------- /src/servers/elasticsearch/base_tools.rs: -------------------------------------------------------------------------------- ```rust // Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. use crate::servers::elasticsearch::{EsClientProvider, read_json}; use elasticsearch::cat::{CatIndicesParts, CatShardsParts}; use elasticsearch::indices::IndicesGetMappingParts; use elasticsearch::{Elasticsearch, SearchParts}; use indexmap::IndexMap; use rmcp::handler::server::tool::{Parameters, ToolRouter}; use rmcp::model::{ CallToolResult, Content, Implementation, JsonObject, ProtocolVersion, ServerCapabilities, ServerInfo, }; use rmcp::service::RequestContext; use rmcp::{RoleServer, ServerHandler}; use rmcp_macros::{tool, tool_handler, tool_router}; use serde::{Deserialize, Serialize}; use serde_aux::prelude::*; use serde_json::{Map, Value, json}; use std::collections::HashMap; #[derive(Clone)] pub struct EsBaseTools { es_client: EsClientProvider, tool_router: ToolRouter<EsBaseTools>, } impl EsBaseTools { pub fn new(es_client: Elasticsearch) -> Self { Self { es_client: EsClientProvider::new(es_client), tool_router: Self::tool_router(), } } } #[derive(Debug, serde::Deserialize, schemars::JsonSchema)] struct ListIndicesParams { /// Index pattern of Elasticsearch indices to list pub index_pattern: String, } #[derive(Debug, serde::Deserialize, schemars::JsonSchema)] struct GetMappingsParams { /// Name of the Elasticsearch index to get mappings for index: String, } #[derive(Debug, serde::Deserialize, schemars::JsonSchema)] struct SearchParams { /// Name of the Elasticsearch index to search index: String, /// Name of the fields that need to be returned (optional) fields: Option<Vec<String>>, /// Complete Elasticsearch query DSL object that can include query, size, from, sort, etc. query_body: Map<String, Value>, // note: just Value doesn't work, as Claude would send a string } #[derive(Debug, serde::Deserialize, schemars::JsonSchema)] struct EsqlQueryParams { /// Complete Elasticsearch ES|QL query query: String, } #[derive(Debug, serde::Deserialize, schemars::JsonSchema)] struct GetShardsParams { /// Optional index name to get shard information for index: Option<String>, } #[tool_router] impl EsBaseTools { //--------------------------------------------------------------------------------------------- /// Tool: list indices #[tool( description = "List all available Elasticsearch indices", annotations(title = "List ES indices", read_only_hint = true) )] async fn list_indices( &self, req_ctx: RequestContext<RoleServer>, Parameters(ListIndicesParams { index_pattern }): Parameters<ListIndicesParams>, ) -> Result<CallToolResult, rmcp::Error> { let es_client = self.es_client.get(req_ctx); let response = es_client .cat() .indices(CatIndicesParts::Index(&[&index_pattern])) .h(&["index", "status", "docs.count"]) .format("json") .send() .await; let response: Vec<CatIndexResponse> = read_json(response).await?; Ok(CallToolResult::success(vec![ Content::text(format!("Found {} indices:", response.len())), Content::json(response)?, ])) } //--------------------------------------------------------------------------------------------- /// Tool: get mappings for an index #[tool( description = "Get field mappings for a specific Elasticsearch index", annotations(title = "Get ES index mappings", read_only_hint = true) )] async fn get_mappings( &self, req_ctx: RequestContext<RoleServer>, Parameters(GetMappingsParams { index }): Parameters<GetMappingsParams>, ) -> Result<CallToolResult, rmcp::Error> { let es_client = self.es_client.get(req_ctx); let response = es_client .indices() .get_mapping(IndicesGetMappingParts::Index(&[&index])) .send() .await; let response: MappingResponse = read_json(response).await?; // use the first mapping (we can have many if the name is a wildcard) let mapping = response.values().next().unwrap(); Ok(CallToolResult::success(vec![ Content::text(format!("Mappings for index {index}:")), Content::json(mapping)?, ])) } //--------------------------------------------------------------------------------------------- /// Tool: search an index with the Query DSL /// /// The additional 'fields' parameter helps some LLMs that don't know about the `_source` /// request property to narrow down the data returned and reduce their context size #[tool( description = "Perform an Elasticsearch search with the provided query DSL.", annotations(title = "Elasticsearch search DSL query", read_only_hint = true) )] async fn search( &self, req_ctx: RequestContext<RoleServer>, Parameters(SearchParams { index, fields, query_body, }): Parameters<SearchParams>, ) -> Result<CallToolResult, rmcp::Error> { let es_client = self.es_client.get(req_ctx); let mut query_body = query_body; if let Some(fields) = fields { // Augment _source if it exists if let Some(Value::Array(values)) = query_body.get_mut("_source") { for field in fields.into_iter() { values.push(Value::String(field)) } } else { query_body.insert("_source".to_string(), json!(fields)); } } let response = es_client .search(SearchParts::Index(&[&index])) .body(query_body) .send() .await; let response: SearchResult = read_json(response).await?; let mut results: Vec<Content> = Vec::new(); // Send result stats only if it's not pure aggregation results if response.aggregations.is_empty() || !response.hits.hits.is_empty() { let total = response .hits .total .map(|t| t.value.to_string()) .unwrap_or("unknown".to_string()); results.push(Content::text(format!( "Total results: {}, showing {}.", total, response.hits.hits.len() ))); } // Original prototype sent a separate content for each document, it seems to confuse some LLMs // for hit in &response.hits.hits { // results.push(Content::json(&hit.source)?); // } if !response.hits.hits.is_empty() { let sources = response.hits.hits.iter().map(|hit| &hit.source).collect::<Vec<_>>(); results.push(Content::json(&sources)?); } if !response.aggregations.is_empty() { results.push(Content::text("Aggregations results:")); results.push(Content::json(&response.aggregations)?); } Ok(CallToolResult::success(results)) } //--------------------------------------------------------------------------------------------- /// Tool: ES|QL #[tool( description = "Perform an Elasticsearch ES|QL query.", annotations(title = "Elasticsearch ES|QL query", read_only_hint = true) )] async fn esql( &self, req_ctx: RequestContext<RoleServer>, Parameters(EsqlQueryParams { query }): Parameters<EsqlQueryParams>, ) -> Result<CallToolResult, rmcp::Error> { let es_client = self.es_client.get(req_ctx); let request = EsqlQueryRequest { query }; let response = es_client.esql().query().body(request).send().await; let response: EsqlQueryResponse = read_json(response).await?; // Transform response into an array of objects let mut objects: Vec<Value> = Vec::new(); for row in response.values.into_iter() { let mut obj = Map::new(); for (i, value) in row.into_iter().enumerate() { obj.insert(response.columns[i].name.clone(), value); } objects.push(Value::Object(obj)); } Ok(CallToolResult::success(vec![ Content::text("Results"), Content::json(objects)?, ])) } //--------------------------------------------------------------------------------------------- // Tool: get shard information #[tool( description = "Get shard information for all or specific indices.", annotations(title = "Get ES shard information", read_only_hint = true) )] async fn get_shards( &self, req_ctx: RequestContext<RoleServer>, Parameters(GetShardsParams { index }): Parameters<GetShardsParams>, ) -> Result<CallToolResult, rmcp::Error> { let es_client = self.es_client.get(req_ctx); let indices: [&str; 1]; let parts = match &index { Some(index) => { indices = [index]; CatShardsParts::Index(&indices) } None => CatShardsParts::None, }; let response = es_client .cat() .shards(parts) .format("json") .h(&["index", "shard", "prirep", "state", "docs", "store", "node"]) .send() .await; let response: Vec<CatShardsResponse> = read_json(response).await?; Ok(CallToolResult::success(vec![ Content::text(format!("Found {} shards:", response.len())), Content::json(response)?, ])) } } #[tool_handler] impl ServerHandler for EsBaseTools { fn get_info(&self) -> ServerInfo { ServerInfo { protocol_version: ProtocolVersion::V_2025_03_26, capabilities: ServerCapabilities::builder().enable_tools().build(), server_info: Implementation::from_build_env(), instructions: Some("Provides access to Elasticsearch".to_string()), } } } //------------------------------------------------------------------------------------------------- // Type definitions for ES request/responses (the Rust client doesn't have them yet) and tool responses. //----- Search request #[derive(Serialize, Deserialize)] pub struct SearchResult { pub hits: Hits, #[serde(default)] pub aggregations: IndexMap<String, Value>, } #[derive(Serialize, Deserialize)] pub struct Hits { pub total: Option<TotalHits>, pub hits: Vec<Hit>, } #[derive(Serialize, Deserialize)] pub struct TotalHits { pub value: u64, } #[derive(Serialize, Deserialize)] pub struct Hit { #[serde(rename = "_source")] pub source: Value, } //----- Cat responses #[derive(Serialize, Deserialize)] pub struct CatIndexResponse { pub index: String, pub status: String, #[serde(rename = "docs.count", deserialize_with = "deserialize_number_from_string")] pub doc_count: u64, } #[derive(Serialize, Deserialize)] pub struct CatShardsResponse { pub index: String, #[serde(deserialize_with = "deserialize_number_from_string")] pub shard: usize, pub prirep: String, pub state: String, #[serde(deserialize_with = "deserialize_option_number_from_string")] pub docs: Option<u64>, pub store: Option<String>, pub node: Option<String>, } //----- Index mappings pub type MappingResponse = HashMap<String, Mappings>; #[derive(Serialize, Deserialize)] pub struct Mappings { pub mappings: Mapping, } #[derive(Serialize, Deserialize)] pub struct Mapping { #[serde(rename = "_meta", skip_serializing_if = "Option::is_none")] pub meta: Option<JsonObject>, properties: HashMap<String, MappingProperty>, } #[derive(Serialize, Deserialize)] pub struct MappingProperty { #[serde(rename = "type")] pub type_: String, #[serde(flatten)] pub settings: HashMap<String, serde_json::Value>, } //----- ES|QL #[derive(Serialize, Deserialize)] pub struct EsqlQueryRequest { pub query: String, } #[derive(Serialize, Deserialize)] pub struct Column { pub name: String, #[serde(rename = "type")] pub type_: String, } #[derive(Serialize, Deserialize)] pub struct EsqlQueryResponse { pub is_partial: Option<bool>, pub columns: Vec<Column>, pub values: Vec<Vec<Value>>, } ```