# Directory Structure ``` ├── .buildkite │ ├── diff │ ├── docker.yml │ ├── pipeline.yml │ └── pull-requests.json ├── .dockerignore ├── .env-example ├── .github │ └── workflows │ ├── build.yml │ └── stale.yml ├── .gitignore ├── Cargo.lock ├── Cargo.toml ├── catalog-info.yaml ├── Dockerfile ├── Dockerfile-8000 ├── docs │ └── CONTRIBUTING.md ├── elastic-mcp.json5 ├── LICENSE ├── Makefile ├── NOTICE.txt ├── README.md ├── renovate.json ├── rustfmt.toml ├── scripts │ └── cargo-run.sh ├── src │ ├── bin │ │ ├── elasticsearch-core-mcp-server.rs │ │ └── start_http.rs │ ├── cli.rs │ ├── lib.rs │ ├── protocol │ │ ├── http.rs │ │ ├── mod.rs │ │ └── stdio.rs │ ├── servers │ │ ├── elasticsearch │ │ │ ├── base_tools.rs │ │ │ └── mod.rs │ │ └── mod.rs │ └── utils │ ├── interpolator.rs │ ├── mod.rs │ └── rmcp_ext.rs └── tests └── http_tests.rs ``` # Files -------------------------------------------------------------------------------- /.dockerignore: -------------------------------------------------------------------------------- ``` 1 | docs 2 | target 3 | .idea 4 | .vscode 5 | ``` -------------------------------------------------------------------------------- /.env-example: -------------------------------------------------------------------------------- ``` 1 | # The MCP server looks for `.env` files to populate environment variables that aren't already set 2 | # Copy and edit this file (it's listed in .gitignore) 3 | 4 | ES_URL="http://localhost:9200" 5 | ES_API_KEY="<my-api-key>" 6 | ``` -------------------------------------------------------------------------------- /.gitignore: -------------------------------------------------------------------------------- ``` 1 | # Generated by Cargo 2 | # will have compiled files and executables 3 | debug/ 4 | target/ 5 | 6 | # These are backup files generated by rustfmt 7 | **/*.rs.bk 8 | 9 | # MSVC Windows builds of rustc generate these, which store debugging information 10 | *.pdb 11 | 12 | # Generated by cargo mutants 13 | # Contains mutation testing data 14 | **/mutants.out*/ 15 | 16 | .idea/ 17 | .vscode/ 18 | 19 | .env 20 | ``` -------------------------------------------------------------------------------- /README.md: -------------------------------------------------------------------------------- ```markdown 1 | # Elasticsearch MCP Server 2 | 3 | > [!CAUTION] 4 | > 5 | > **WARNING: this MCP server is EXPERIMENTAL.** 6 | 7 | Connect to your Elasticsearch data directly from any MCP Client using the Model Context Protocol (MCP). 8 | 9 | This server connects agents to your Elasticsearch data using the Model Context Protocol. It allows you to interact with your Elasticsearch indices through natural language conversations. 10 | 11 | ## Available Tools 12 | 13 | * `list_indices`: List all available Elasticsearch indices 14 | * `get_mappings`: Get field mappings for a specific Elasticsearch index 15 | * `search`: Perform an Elasticsearch search with the provided query DSL 16 | * `esql`: Perform an ES|QL query 17 | * `get_shards`: Get shard information for all or specific indices 18 | 19 | ## Prerequisites 20 | 21 | * An Elasticsearch instance 22 | * Elasticsearch authentication credentials (API key or username/password) 23 | * An MCP Client (e.g. [Claude Desktop](https://claude.ai/download), [Goose](https://block.github.io/goose/)) 24 | 25 | **Supported Elasticsearch versions** 26 | 27 | This works with Elasticsearch versions `8.x` and `9.x`. 28 | 29 | ## Installation & Setup 30 | 31 | > [!NOTE] 32 | > 33 | > Versions 0.3.1 and earlier were installed via `npm`. These versions are deprecated and no longer supported. The following instructions only apply to 0.4.0 and later. 34 | > 35 | > To view instructions for versions 0.3.1 and earlier, see the [README for v0.3.1](https://github.com/elastic/mcp-server-elasticsearch/tree/v0.3.1). 36 | 37 | This MCP server is provided as a Docker image at `docker.elastic.co/mcp/elasticsearch` 38 | that supports MCP's stdio, SSE and streamable-HTTP protocols. 39 | 40 | Running this container without any argument will output a usage message: 41 | 42 | ``` 43 | docker run docker.elastic.co/mcp/elasticsearch 44 | ``` 45 | 46 | ``` 47 | Usage: elasticsearch-mcp-server <COMMAND> 48 | 49 | Commands: 50 | stdio Start a stdio server 51 | http Start a streamable-HTTP server with optional SSE support 52 | help Print this message or the help of the given subcommand(s) 53 | 54 | Options: 55 | -h, --help Print help 56 | -V, --version Print version 57 | ``` 58 | 59 | ### Using the stdio protocol 60 | 61 | The MCP server needs environment variables to be set: 62 | 63 | * `ES_URL`: the URL of your Elasticsearch cluster 64 | * For authentication use either an API key or basic authentication: 65 | * API key: `ES_API_KEY` 66 | * Basic auth: `ES_USERNAME` and `ES_PASSWORD` 67 | * Optionally, `ES_SSL_SKIP_VERIFY` set to `true` skips SSL/TLS certificate verification when connecting 68 | to Elasticsearch. The ability to provide a custom certificate will be added in a later version. 69 | 70 | The MCP server is started in stdio mode with this command: 71 | 72 | ```bash 73 | docker run -i --rm -e ES_URL -e ES_API_KEY docker.elastic.co/mcp/elasticsearch stdio 74 | ``` 75 | 76 | The configuration for Claude Desktop is as follows: 77 | 78 | ```json 79 | { 80 | "mcpServers": { 81 | "elasticsearch-mcp-server": { 82 | "command": "docker", 83 | "args": [ 84 | "run", "-i", "--rm", 85 | "-e", "ES_URL", "-e", "ES_API_KEY", 86 | "docker.elastic.co/mcp/elasticsearch", 87 | "stdio" 88 | ], 89 | "env": { 90 | "ES_URL": "<elasticsearch-cluster-url>", 91 | "ES_API_KEY": "<elasticsearch-API-key>" 92 | } 93 | } 94 | } 95 | } 96 | ``` 97 | 98 | ### Using the streamable-HTTP and SSE protocols 99 | 100 | Note: streamable-HTTP is recommended, as [SSE is deprecated](https://modelcontextprotocol.io/docs/concepts/transports#server-sent-events-sse-deprecated). 101 | 102 | The MCP server needs environment variables to be set: 103 | 104 | * `ES_URL`, the URL of your Elasticsearch cluster 105 | * For authentication use either an API key or basic authentication: 106 | * API key: `ES_API_KEY` 107 | * Basic auth: `ES_USERNAME` and `ES_PASSWORD` 108 | * Optionally, `ES_SSL_SKIP_VERIFY` set to `true` skips SSL/TLS certificate verification when connecting 109 | to Elasticsearch. The ability to provide a custom certificate will be added in a later version. 110 | 111 | The MCP server is started in http mode with this command: 112 | 113 | ```bash 114 | docker run --rm -e ES_URL -e ES_API_KEY -p 8080:8080 docker.elastic.co/mcp/elasticsearch http 115 | ``` 116 | 117 | If for some reason your execution environment doesn't allow passing parameters to the container, they can be passed 118 | using the `CLI_ARGS` environment variable: `docker run --rm -e ES_URL -e ES_API_KEY -e CLI_ARGS=http -p 8080:8080...` 119 | 120 | The streamable-HTTP endpoint is at `http:<host>:8080/mcp`. There's also a health check at `http:<host>:8080/ping` 121 | 122 | Configuration for Claude Desktop (free edition that only supports the stdio protocol). 123 | 124 | 1. Install `mcp-proxy` (or an equivalent), that will bridge stdio to streamable-http. The executable 125 | will be installed in `~/.local/bin`: 126 | 127 | ```bash 128 | uv tool install mcp-proxy 129 | ``` 130 | 131 | 2. Add this configuration to Claude Desktop: 132 | 133 | ```json 134 | { 135 | "mcpServers": { 136 | "elasticsearch-mcp-server": { 137 | "command": "/<home-directory>/.local/bin/mcp-proxy", 138 | "args": [ 139 | "--transport=streamablehttp", 140 | "--header", "Authorization", "ApiKey <elasticsearch-API-key>", 141 | "http://<mcp-server-host>:<mcp-server-port>/mcp" 142 | ] 143 | } 144 | } 145 | } 146 | ``` 147 | ``` -------------------------------------------------------------------------------- /docs/CONTRIBUTING.md: -------------------------------------------------------------------------------- ```markdown 1 | # Contributing 2 | 3 | [fork]: https://github.com/elastic/mcp-server-elasticsearch/fork 4 | [pr]: https://github.com/elastic/mcp-server-elasticsearch/compare 5 | [code-of-conduct]: https://www.elastic.co/community/codeofconduct 6 | 7 | Elasticsearch MCP Server is open source, and we love to receive contributions from our community — you! 8 | 9 | There are many ways to contribute, from writing tutorials or blog posts, improving the documentation, submitting bug reports and feature requests or writing code. 10 | 11 | Contributions are [released](https://help.github.com/articles/github-terms-of-service/#6-contributions-under-repository-license) under the [project's license](../LICENSE). 12 | 13 | Please note that this project follows the [Elastic's Open Source Community Code of Conduct][code-of-conduct]. 14 | 15 | ## Setup 16 | 17 | 1. Install Rust (using [rustup](https://www.rust-lang.org/tools/install) is recommended) 18 | 19 | 2. Build the project: 20 | ```sh 21 | cargo build 22 | ``` 23 | 24 | or to build the Docker image, run: 25 | 26 | ```sh 27 | docker build -t mcp/elasticsearch 28 | ``` 29 | 30 | ## Start Elasticsearch 31 | 32 | You can use either: 33 | 34 | 1. **Elastic Cloud** - Use an existing Elasticsearch deployment and your API key 35 | 2. **Local Elasticsearch** - Run Elasticsearch locally using the [start-local](https://www.elastic.co/guide/en/elasticsearch/reference/current/run-elasticsearch-locally.html) script: 36 | 37 | ```bash 38 | curl -fsSL https://elastic.co/start-local | sh 39 | ``` 40 | 41 | This starts Elasticsearch and Kibana with Docker: 42 | - Elasticsearch: <http://localhost:9200> 43 | - Kibana: <http://localhost:5601> 44 | 45 | > [!NOTE] 46 | > The `start-local` setup is for development only. It uses basic authentication and disables HTTPS. 47 | 48 | ## Development Workflow 49 | 50 | 1. [Fork][fork] and clone the repository 51 | 2. Create a new branch: `git checkout -b my-branch-name` 52 | 3. Make your changes and add tests 53 | 4. Fix `cargo clippy` warnings, run `cargo fmt` and `cargo test` 54 | 5. Test locally with the MCP Inspector: 55 | ```bash 56 | npx @modelcontextprotocol/inspector 57 | ``` 58 | 7. [Test with MCP Client](../README.md#installation--setup) 59 | 8. Push to your fork and [submit a pull request][pr] 60 | 61 | ## Best Practices 62 | 63 | - Follow existing code style and patterns 64 | - Write [conventional commits](https://www.conventionalcommits.org/) 65 | - Include tests for your changes 66 | - Keep PRs focused on a single concern 67 | - Update documentation as needed 68 | 69 | ## Getting Help 70 | 71 | - Open an issue in the repository 72 | - Ask questions on [discuss.elastic.co](https://discuss.elastic.co/) 73 | 74 | ## Resources 75 | 76 | - [How to Contribute to Open Source](https://opensource.guide/how-to-contribute/) 77 | - [Using Pull Requests](https://help.github.com/articles/about-pull-requests/) 78 | - [Elastic Code of Conduct][code-of-conduct] 79 | ``` -------------------------------------------------------------------------------- /rustfmt.toml: -------------------------------------------------------------------------------- ```toml 1 | max_width = 120 2 | ``` -------------------------------------------------------------------------------- /NOTICE.txt: -------------------------------------------------------------------------------- ``` 1 | Elasticsearch MCP Server 2 | Copyright 2025 Elasticsearch B.V. 3 | ``` -------------------------------------------------------------------------------- /scripts/cargo-run.sh: -------------------------------------------------------------------------------- ```bash 1 | #!/usr/bin/env bash 2 | 3 | cd "$(dirname $0)"/.. 4 | exec cargo run "$@" 5 | ``` -------------------------------------------------------------------------------- /renovate.json: -------------------------------------------------------------------------------- ```json 1 | { 2 | "$schema": "https://docs.renovatebot.com/renovate-schema.json", 3 | "extends": [ 4 | "local>elastic/renovate-config" 5 | ], 6 | "schedule": [ 7 | "after 1am on monday" 8 | ] 9 | } 10 | ``` -------------------------------------------------------------------------------- /.buildkite/docker.yml: -------------------------------------------------------------------------------- ```yaml 1 | --- 2 | # $yaml-language-server: $schema=https://raw.githubusercontent.com/buildkite/pipeline-schema/main/schema.json 3 | steps: 4 | - label: "Build and publish Docker image" 5 | command: "make docker-push-elastic" 6 | agents: 7 | provider: "gcp" 8 | ``` -------------------------------------------------------------------------------- /.buildkite/pull-requests.json: -------------------------------------------------------------------------------- ```json 1 | { 2 | "jobs": [ 3 | { 4 | "enabled": true, 5 | "pipelineSlug": "mcp-server-elasticsearch", 6 | "allow_org_users": true, 7 | "allowed_repo_permissions": [ 8 | "admin", 9 | "write" 10 | ], 11 | "allowed_list": [], 12 | "set_commit_status": true, 13 | "commit_status_context": "buildkite/mcp-server-elasticsearch", 14 | "build_on_commit": false, 15 | "build_on_comment": true, 16 | "trigger_comment_regex": "^(?:(?:buildkite\\W+)?(?:build|test)\\W+(?:this|it))", 17 | "always_trigger_comment_regex": "^(?:(?:buildkite\\W+)?(?:build|test)\\W+(?:this|it))", 18 | "skip_ci_labels": [ 19 | "skip-ci" 20 | ], 21 | "skip_target_branches": [], 22 | "always_require_ci_on_changed": [] 23 | } 24 | ] 25 | } 26 | ``` -------------------------------------------------------------------------------- /src/protocol/mod.rs: -------------------------------------------------------------------------------- ```rust 1 | // Licensed to Elasticsearch B.V. under one or more contributor 2 | // license agreements. See the NOTICE file distributed with 3 | // this work for additional information regarding copyright 4 | // ownership. Elasticsearch B.V. licenses this file to you under 5 | // the Apache License, Version 2.0 (the "License"); you may 6 | // not use this file except in compliance with the License. 7 | // You may obtain a copy of the License at 8 | // 9 | // http://www.apache.org/licenses/LICENSE-2.0 10 | // 11 | // Unless required by applicable law or agreed to in writing, 12 | // software distributed under the License is distributed on an 13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 14 | // KIND, either express or implied. See the License for the 15 | // specific language governing permissions and limitations 16 | // under the License. 17 | 18 | pub mod http; 19 | pub mod stdio; 20 | ``` -------------------------------------------------------------------------------- /src/protocol/stdio.rs: -------------------------------------------------------------------------------- ```rust 1 | // Licensed to Elasticsearch B.V. under one or more contributor 2 | // license agreements. See the NOTICE file distributed with 3 | // this work for additional information regarding copyright 4 | // ownership. Elasticsearch B.V. licenses this file to you under 5 | // the Apache License, Version 2.0 (the "License"); you may 6 | // not use this file except in compliance with the License. 7 | // You may obtain a copy of the License at 8 | // 9 | // http://www.apache.org/licenses/LICENSE-2.0 10 | // 11 | // Unless required by applicable law or agreed to in writing, 12 | // software distributed under the License is distributed on an 13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 14 | // KIND, either express or implied. See the License for the 15 | // specific language governing permissions and limitations 16 | // under the License. 17 | 18 | //! Empty for now, stdio is handled in `lib.rs`. 19 | ``` -------------------------------------------------------------------------------- /.github/workflows/stale.yml: -------------------------------------------------------------------------------- ```yaml 1 | --- 2 | name: "Close stale issues and PRs" 3 | on: 4 | schedule: 5 | - cron: "30 1 * * *" 6 | 7 | jobs: 8 | stale: 9 | runs-on: ubuntu-latest 10 | steps: 11 | - uses: actions/stale@5bef64f19d7facfb25b37b414482c7164d639639 # v9 12 | with: 13 | stale-issue-label: stale 14 | stale-pr-label: stale 15 | days-before-stale: 60 16 | days-before-close: 14 17 | exempt-issue-labels: "good first issue" 18 | close-issue-label: closed-stale 19 | close-pr-label: closed-stale 20 | stale-issue-message: "This issue is stale because it has been open 60 days with no activity. Remove the `stale` label, or leave a comment, or this will be closed in 14 days." 21 | stale-pr-message: "This pull request is stale because it has been open 60 days with no activity. Remove the `stale` label, or leave a comment, or this will be closed in 14 days." 22 | ``` -------------------------------------------------------------------------------- /Dockerfile: -------------------------------------------------------------------------------- ```dockerfile 1 | # Copyright Elasticsearch B.V. and contributors 2 | # SPDX-License-Identifier: Apache-2.0 3 | 4 | # To create a multi-arch image, run: 5 | # docker buildx build --platform linux/amd64,linux/arm64 --tag elasticsearch-core-mcp-server . 6 | 7 | FROM rust:1.89@sha256:c50cd6e20c46b0b36730b5eb27289744e4bb8f32abc90d8c64ca09decf4f55ba AS builder 8 | 9 | WORKDIR /app 10 | 11 | COPY Cargo.toml Cargo.lock ./ 12 | 13 | # Cache dependencies 14 | RUN mkdir -p ./src/bin && \ 15 | echo "pub fn main() {}" > ./src/bin/elasticsearch-core-mcp-server.rs && \ 16 | cargo build --release 17 | 18 | COPY src ./src/ 19 | 20 | RUN cargo build --release 21 | 22 | #-------------------------------------------------------------------------------------------------- 23 | 24 | FROM cgr.dev/chainguard/wolfi-base:latest 25 | 26 | COPY --from=builder /app/target/release/elasticsearch-core-mcp-server /usr/local/bin/elasticsearch-core-mcp-server 27 | 28 | ENV CONTAINER_MODE=true 29 | 30 | EXPOSE 8080/tcp 31 | ENTRYPOINT ["/usr/local/bin/elasticsearch-core-mcp-server"] 32 | ``` -------------------------------------------------------------------------------- /.buildkite/pipeline.yml: -------------------------------------------------------------------------------- ```yaml 1 | --- 2 | # $yaml-language-server: $schema=https://raw.githubusercontent.com/buildkite/pipeline-schema/main/schema.json 3 | steps: 4 | - label: "Triggering pipelines" 5 | plugins: 6 | monorepo-diff#v1.4.0: 7 | diff: ".buildkite/diff ${BUILDKITE_COMMIT}" 8 | wait: true 9 | watch: 10 | # if our Renovate configuration is amended, then make sure we have well-formed config 11 | # for more info, see https://docs.elastic.dev/plat-prod-team/service-catalogue/renovate/testing-renovate-changes 12 | - path: "renovate.json" 13 | config: 14 | label: "Verify Renovate configuration" 15 | command: "renovate-config-validator" 16 | agents: 17 | image: "docker.elastic.co/ci-agent-images/pipelib:0.15.0@sha256:753c420cf254a7ed0be658ab153965e0708fe0636dfe2fe57e6e4ae0972bb681" 18 | # if our catalog-info.yaml is changed, make sure it's well-formed according to our internal standards as well as Backstage's validation 19 | - path: "catalog-info.yaml" 20 | config: 21 | command: "/agent/check-catalog-info.sh" 22 | agents: 23 | image: "docker.elastic.co/ci-agent-images/pipelib:0.15.0@sha256:753c420cf254a7ed0be658ab153965e0708fe0636dfe2fe57e6e4ae0972bb681" 24 | ``` -------------------------------------------------------------------------------- /src/bin/start_http.rs: -------------------------------------------------------------------------------- ```rust 1 | // Licensed to Elasticsearch B.V. under one or more contributor 2 | // license agreements. See the NOTICE file distributed with 3 | // this work for additional information regarding copyright 4 | // ownership. Elasticsearch B.V. licenses this file to you under 5 | // the Apache License, Version 2.0 (the "License"); you may 6 | // not use this file except in compliance with the License. 7 | // You may obtain a copy of the License at 8 | // 9 | // http://www.apache.org/licenses/LICENSE-2.0 10 | // 11 | // Unless required by applicable law or agreed to in writing, 12 | // software distributed under the License is distributed on an 13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 14 | // KIND, either express or implied. See the License for the 15 | // specific language governing permissions and limitations 16 | // under the License. 17 | 18 | use elasticsearch_core_mcp_server::cli::HttpCommand; 19 | use elasticsearch_core_mcp_server::run_http; 20 | 21 | /// Start the MCP http server with the local configuration. 22 | /// Useful for debugging from the IDE. 23 | #[tokio::main] 24 | pub async fn main() -> anyhow::Result<()> { 25 | println!("Current directory: {:?}", std::env::current_dir()?); 26 | 27 | run_http(HttpCommand { 28 | config: Some("elastic-mcp.json5".parse()?), 29 | address: None, 30 | sse: true, 31 | }, 32 | false) 33 | .await?; 34 | 35 | Ok(()) 36 | } 37 | ``` -------------------------------------------------------------------------------- /src/utils/mod.rs: -------------------------------------------------------------------------------- ```rust 1 | // Licensed to Elasticsearch B.V. under one or more contributor 2 | // license agreements. See the NOTICE file distributed with 3 | // this work for additional information regarding copyright 4 | // ownership. Elasticsearch B.V. licenses this file to you under 5 | // the Apache License, Version 2.0 (the "License"); you may 6 | // not use this file except in compliance with the License. 7 | // You may obtain a copy of the License at 8 | // 9 | // http://www.apache.org/licenses/LICENSE-2.0 10 | // 11 | // Unless required by applicable law or agreed to in writing, 12 | // software distributed under the License is distributed on an 13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 14 | // KIND, either express or implied. See the License for the 15 | // specific language governing permissions and limitations 16 | // under the License. 17 | 18 | use serde::{Deserialize, Deserializer}; 19 | 20 | pub mod interpolator; 21 | pub mod rmcp_ext; 22 | 23 | /// Deserialize a string, and return `None` if it's empty. Useful for configuration fields like 24 | /// `"foo": "${SOME_ENV_VAR:}"` that uses an env var if present without failing if missing. 25 | pub fn none_if_empty_string<'de, D: Deserializer<'de>>(deserializer: D) -> Result<Option<String>, D::Error> { 26 | let s: Option<String> = Deserialize::deserialize(deserializer)?; 27 | match s { 28 | Some(s) if s.is_empty() => Ok(None), 29 | _ => Ok(s), 30 | } 31 | } 32 | ``` -------------------------------------------------------------------------------- /src/utils/rmcp_ext.rs: -------------------------------------------------------------------------------- ```rust 1 | // Licensed to Elasticsearch B.V. under one or more contributor 2 | // license agreements. See the NOTICE file distributed with 3 | // this work for additional information regarding copyright 4 | // ownership. Elasticsearch B.V. licenses this file to you under 5 | // the Apache License, Version 2.0 (the "License"); you may 6 | // not use this file except in compliance with the License. 7 | // You may obtain a copy of the License at 8 | // 9 | // http://www.apache.org/licenses/LICENSE-2.0 10 | // 11 | // Unless required by applicable law or agreed to in writing, 12 | // software distributed under the License is distributed on an 13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 14 | // KIND, either express or implied. See the License for the 15 | // specific language governing permissions and limitations 16 | // under the License. 17 | 18 | //! Various extensions and utilities for the Rust MCP sdk. 19 | 20 | use rmcp::{RoleServer, Service}; 21 | use std::sync::Arc; 22 | 23 | /// A factory to create server (`Service<RoleServer>`) instances. 24 | pub struct ServerProvider<S: Service<RoleServer>>(pub Arc<dyn Fn() -> S + Send + Sync>); 25 | 26 | impl<S: Service<RoleServer>, F: Fn() -> S + Send + Sync + 'static> From<F> for ServerProvider<S> { 27 | fn from(value: F) -> Self { 28 | ServerProvider(Arc::new(value)) 29 | } 30 | } 31 | 32 | impl<S: Service<RoleServer>> From<Arc<dyn Fn() -> S + Send + Sync>> for ServerProvider<S> { 33 | fn from(value: Arc<dyn Fn() -> S + Send + Sync>) -> Self { 34 | ServerProvider(value) 35 | } 36 | } 37 | ``` -------------------------------------------------------------------------------- /src/servers/mod.rs: -------------------------------------------------------------------------------- ```rust 1 | // Licensed to Elasticsearch B.V. under one or more contributor 2 | // license agreements. See the NOTICE file distributed with 3 | // this work for additional information regarding copyright 4 | // ownership. Elasticsearch B.V. licenses this file to you under 5 | // the Apache License, Version 2.0 (the "License"); you may 6 | // not use this file except in compliance with the License. 7 | // You may obtain a copy of the License at 8 | // 9 | // http://www.apache.org/licenses/LICENSE-2.0 10 | // 11 | // Unless required by applicable law or agreed to in writing, 12 | // software distributed under the License is distributed on an 13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 14 | // KIND, either express or implied. See the License for the 15 | // specific language governing permissions and limitations 16 | // under the License. 17 | 18 | use serde::{Deserialize, Serialize}; 19 | 20 | pub mod elasticsearch; 21 | 22 | /// Inclusion or exclusion list. 23 | #[derive(Debug, Serialize, Deserialize)] 24 | #[serde(rename_all = "snake_case")] 25 | pub enum IncludeExclude { 26 | Include(Vec<String>), 27 | Exclude(Vec<String>), 28 | } 29 | 30 | impl IncludeExclude { 31 | pub fn is_included(&self, name: &str) -> bool { 32 | use IncludeExclude::*; 33 | match self { 34 | Include(includes) => includes.iter().map(|s| s.as_str()).any(|s| s == name), 35 | Exclude(excludes) => excludes.iter().map(|s| s.as_str()).all(|s| s != name), 36 | } 37 | } 38 | 39 | pub fn filter(&self, tools: &mut Vec<rmcp::model::Tool>) { 40 | tools.retain(|t| self.is_included(&t.name)) 41 | } 42 | } 43 | ``` -------------------------------------------------------------------------------- /Cargo.toml: -------------------------------------------------------------------------------- ```toml 1 | [package] 2 | name = "elasticsearch-core-mcp-server" 3 | version = "0.4.5" 4 | edition = "2024" 5 | authors = ["Elastic.co"] 6 | license-file = "LICENSE" 7 | description = "MCP server for core Elastisearch features" 8 | homepage = "https://github.com/elastic/mcp-server-elasticsearch" 9 | repository = "https://github.com/elastic/mcp-server-elasticsearch" 10 | 11 | default-run = "elasticsearch-core-mcp-server" 12 | 13 | [dependencies] 14 | # Base stuff 15 | anyhow = "1.0" 16 | futures = "0.3" 17 | indexmap = { version = "2", features = ["serde"] } 18 | itertools = "0.12" 19 | thiserror = "2" 20 | 21 | serde = { version = "1.0", features = ["derive"] } 22 | serde_json = "1" 23 | 24 | # CLI, config 25 | clap = { version = "4", features = ["derive", "env"] } 26 | dotenvy = "0.15" 27 | serde-aux = "4" 28 | serde_json5 = "0.2" 29 | 30 | # Logging 31 | tracing = "0.1" 32 | tracing-subscriber = { version = "0.3", features = [ 33 | "env-filter", 34 | "std", 35 | "fmt", 36 | ]} 37 | 38 | elasticsearch = { version = "9.0.0-alpha.1", git = "https://github.com/elastic/elasticsearch-rs", branch = "new-with-creds" } 39 | 40 | # Async and http 41 | tokio = { version = "1", features = ["macros", "rt", "rt-multi-thread", "io-std", "signal", "process"] } 42 | tokio-util = "0.7" 43 | axum = "0.8" 44 | http = "1.3.1" 45 | 46 | # Schemars: keep in sync with rmcp 47 | schemars = { version = "0.8", features = ["chrono"] } 48 | 49 | reqwest = "0.12" 50 | futures-util = "0.3" 51 | 52 | # MCP rust sdk: main branch, 2025-06-26 53 | [dependencies.rmcp] 54 | features = [ 55 | "server", "auth", 56 | "transport-sse-server", "transport-streamable-http-server", 57 | "transport-io", # stdio 58 | "client", "transport-sse-client", "transport-streamable-http-client", "transport-child-process", 59 | ] 60 | # Keep rev in sync with rmcp-macros below 61 | version = "0.2.1" 62 | 63 | [dependencies.rmcp-macros] 64 | version = "0.2.1" 65 | 66 | [dev-dependencies] 67 | sse-stream = "0.2" 68 | 69 | [profile.release] 70 | codegen-units = 1 71 | strip = true 72 | lto = true 73 | opt-level = "z" 74 | # Note: do not add `panic = "abort"` since tower-http has a panic-handling middleware 75 | ``` -------------------------------------------------------------------------------- /elastic-mcp.json5: -------------------------------------------------------------------------------- ``` 1 | 2 | { 3 | // Configure the target Elasticsearch server 4 | "elasticsearch": { 5 | "url": "${ES_URL}", 6 | "api_key": "${ES_API_KEY:}", 7 | "username": "${ES_USERNAME:}", 8 | "password": "${ES_PASSWORD:}", 9 | "ssl_skip_verify": "${ES_SSL_SKIP_VERIFY:false}", 10 | 11 | /* WIP 12 | "tools": { 13 | // Exclude the "search" builtin tool as it's too broad 14 | "exclude": ["search"], 15 | 16 | // Custom tools 17 | "custom": { 18 | // An ES|QL query 19 | "add-42": { 20 | "type": "esql", 21 | "description": "Adds 42 to the input value", 22 | "query": "row value = ?value | eval result = value + 42 | keep result", 23 | "parameters": { 24 | "value": { 25 | "title": "The value", 26 | "type": "number" 27 | } 28 | } 29 | }, 30 | // A stored search template 31 | "a-stored-template": { 32 | "type": "search_template", 33 | "description": "This is the description for this stored template", 34 | "template_id": "my-template", 35 | "parameters": { 36 | "param_1": { 37 | "title": "The first parameter", 38 | "description": "Use this parameter to blah blah and blah", 39 | "type": "string" 40 | } 41 | } 42 | }, 43 | // An inline search template 44 | "an-inline-template": { 45 | "type": "search_template", 46 | "description": "This is the description for this inline template", 47 | "template": { 48 | "query": { 49 | "term": { 50 | "some-field": "{{param_1}}" 51 | } 52 | } 53 | }, 54 | "parameters": { 55 | "param_1": { 56 | "title": "The first parameter", 57 | "description": "Use this parameter to blah blah and blah", 58 | "type": "string" 59 | } 60 | } 61 | } 62 | } 63 | } 64 | */ 65 | } 66 | } 67 | ``` -------------------------------------------------------------------------------- /src/bin/elasticsearch-core-mcp-server.rs: -------------------------------------------------------------------------------- ```rust 1 | // Licensed to Elasticsearch B.V. under one or more contributor 2 | // license agreements. See the NOTICE file distributed with 3 | // this work for additional information regarding copyright 4 | // ownership. Elasticsearch B.V. licenses this file to you under 5 | // the Apache License, Version 2.0 (the "License"); you may 6 | // not use this file except in compliance with the License. 7 | // You may obtain a copy of the License at 8 | // 9 | // http://www.apache.org/licenses/LICENSE-2.0 10 | // 11 | // Unless required by applicable law or agreed to in writing, 12 | // software distributed under the License is distributed on an 13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 14 | // KIND, either express or implied. See the License for the 15 | // specific language governing permissions and limitations 16 | // under the License. 17 | 18 | use std::io::ErrorKind; 19 | use clap::Parser; 20 | use elasticsearch_core_mcp_server::cli::Cli; 21 | use tracing_subscriber::EnvFilter; 22 | // To test with stdio, use npx @modelcontextprotocol/inspector cargo run -p elastic-mcp 23 | 24 | #[tokio::main] 25 | async fn main() -> anyhow::Result<()> { 26 | 27 | // Also accept .env files 28 | match dotenvy::dotenv() { 29 | Err(dotenvy::Error::Io(io_err)) if io_err.kind() == ErrorKind::NotFound => {} 30 | Err(err) => return Err(err)?, 31 | Ok(_) => {} 32 | } 33 | 34 | let env_args = std::env::vars().find(|(k, _v)| k == "CLI_ARGS").map(|(_k, v)| v); 35 | 36 | let cli = if let Some(env_args) = env_args { 37 | // Concatenate arg[0] with the ARGS value split on whitespaces 38 | // Note: we don't handle shell-style string quoting and character escaping 39 | let arg0 = std::env::args().next().unwrap(); 40 | let mut args = vec![arg0.as_str()]; 41 | args.extend(env_args.split_whitespace()); 42 | 43 | Cli::parse_from(args) 44 | } else { 45 | Cli::parse() 46 | }; 47 | 48 | // Initialize the tracing subscriber with file and stdout logging 49 | tracing_subscriber::fmt() 50 | .with_env_filter(EnvFilter::from_default_env().add_directive(tracing::Level::INFO.into())) 51 | .with_writer(std::io::stderr) 52 | .with_ansi(false) 53 | .init(); 54 | 55 | tracing::info!("Elasticsearch MCP server, version {}", env!("CARGO_PKG_VERSION")); 56 | 57 | cli.run().await 58 | } 59 | ``` -------------------------------------------------------------------------------- /catalog-info.yaml: -------------------------------------------------------------------------------- ```yaml 1 | --- 2 | # yaml-language-server: $schema=https://json.schemastore.org/catalog-info.json 3 | apiVersion: backstage.io/v1alpha1 4 | kind: Component 5 | metadata: 6 | name: mcp-server-elasticsearch 7 | spec: 8 | type: library 9 | owner: group:devtools-team 10 | lifecycle: beta 11 | 12 | --- 13 | # yaml-language-server: $schema=https://gist.githubusercontent.com/elasticmachine/988b80dae436cafea07d9a4a460a011d/raw/rre.schema.json 14 | apiVersion: backstage.io/v1alpha1 15 | kind: Resource 16 | metadata: 17 | name: buildkite-pipeline-mcp-server-elasticsearch 18 | description: Buildkite Pipeline for mcp-server-elasticsearch 19 | links: 20 | - title: Pipeline 21 | url: https://buildkite.com/elastic/mcp-server-elasticsearch 22 | 23 | spec: 24 | type: buildkite-pipeline 25 | owner: group:devtools-team 26 | system: buildkite 27 | implementation: 28 | apiVersion: buildkite.elastic.dev/v1 29 | kind: Pipeline 30 | metadata: 31 | name: mcp-server-elasticsearch 32 | description: Run checks for the mcp-server-elasticsearch package 33 | spec: 34 | repository: elastic/mcp-server-elasticsearch 35 | pipeline_file: ".buildkite/pipeline.yml" 36 | teams: 37 | devtools-team: 38 | access_level: MANAGE_BUILD_AND_READ 39 | everyone: 40 | access_level: READ_ONLY 41 | 42 | --- 43 | # yaml-language-server: $schema=https://gist.githubusercontent.com/elasticmachine/988b80dae436cafea07d9a4a460a011d/raw/rre.schema.json 44 | apiVersion: backstage.io/v1alpha1 45 | kind: Resource 46 | metadata: 47 | name: mcp-server-elasticsearch-docker 48 | description: Build and publish Docker images for mcp-server-elasticsearch 49 | spec: 50 | type: buildkite-pipeline 51 | owner: group:devtools-team 52 | system: buildkite 53 | implementation: 54 | apiVersion: buildkite.elastic.dev/v1 55 | kind: Pipeline 56 | metadata: 57 | name: mcp-server-elasticsearch-docker 58 | description: Build and publish Docker images for mcp-server-elasticsearch 59 | spec: 60 | repository: elastic/mcp-server-elasticsearch 61 | pipeline_file: ".buildkite/docker.yml" 62 | teams: 63 | devtools-team: 64 | access_level: MANAGE_BUILD_AND_READ 65 | everyone: 66 | access_level: READ_ONLY 67 | provider_settings: 68 | build_pull_requests: false 69 | build_branches: false 70 | build_tags: true 71 | cancel_intermediate_builds: true 72 | ``` -------------------------------------------------------------------------------- /.github/workflows/build.yml: -------------------------------------------------------------------------------- ```yaml 1 | name: Build Rust binaries 2 | on: 3 | release: 4 | types: [published] 5 | workflow_dispatch: 6 | inputs: 7 | version: 8 | description: Version to build (e.g. v0.4.1) 9 | required: true 10 | type: string 11 | 12 | jobs: 13 | build-binary: 14 | runs-on: ${{ matrix.target.runner }} 15 | permissions: 16 | contents: write 17 | id-token: write 18 | strategy: 19 | fail-fast: false 20 | matrix: 21 | target: 22 | - name: linux-x86_64 23 | runner: ubuntu-latest 24 | ext: "" 25 | target: x86_64-unknown-linux-gnu 26 | - name: windows-x86_64 27 | runner: windows-latest 28 | ext: ".exe" 29 | target: x86_64-pc-windows-msvc 30 | - name: macos-x86_64 31 | runner: macos-latest 32 | ext: "" 33 | target: x86_64-apple-darwin 34 | - name: linux-arm64 35 | runner: ubuntu-latest 36 | ext: "" 37 | target: aarch64-unknown-linux-gnu 38 | - name: windows-arm64 39 | runner: windows-latest 40 | ext: ".exe" 41 | target: aarch64-pc-windows-msvc 42 | - name: macos-arm64 43 | runner: macos-latest 44 | ext: "" 45 | target: aarch64-apple-darwin 46 | steps: 47 | - name: Get release tag value 48 | id: version-tag 49 | shell: bash 50 | run: | 51 | if [ -n "${{ inputs.version }}" ]; then 52 | echo "ref=${{ inputs.version }}" >> $GITHUB_OUTPUT 53 | else 54 | ref=$(echo "$GITHUB_REF" | cut -d '/' -f3) 55 | echo "ref=$ref" >> $GITHUB_OUTPUT 56 | fi 57 | - name: Checkout 58 | uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4 59 | with: 60 | path: checkout-main 61 | - name: Checkout 62 | uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4 63 | with: 64 | ref: ${{ steps.version-tag.outputs.ref }} 65 | path: checkout-tag 66 | - name: Add target config for arm64 on Linux 67 | if: ${{ matrix.target.name == 'linux-arm64' }} 68 | run: | 69 | sudo apt update && sudo apt install -y gcc-aarch64-linux-gnu 70 | mkdir -p checkout-tag/.cargo 71 | echo '[target.aarch64-unknown-linux-gnu]' >> checkout-tag/.cargo/config 72 | echo 'linker = "aarch64-linux-gnu-gcc"' >> checkout-tag/.cargo/config 73 | - name: Build binary 74 | uses: houseabsolute/actions-rust-cross@9a1618ffb70e8374ab5f48fcccea3ebeacf57971 # v1.0.5 75 | with: 76 | command: build 77 | target: ${{ matrix.target.target }} 78 | args: "--locked --release" 79 | working-directory: ${{ github.workspace }}/checkout-tag 80 | - name: Upload binaries to release 81 | uses: svenstaro/upload-release-action@v2 82 | with: 83 | tag: ${{ steps.version-tag.outputs.ref }} 84 | file: checkout-tag/target/${{ matrix.target.target }}/release/elasticsearch-core-mcp-server${{ matrix.target.ext }} 85 | asset_name: elasticsearch-core-mcp-server-${{ steps.version-tag.outputs.ref }}-${{ matrix.target.name }}${{ matrix.target.ext }} 86 | overwrite: true 87 | ``` -------------------------------------------------------------------------------- /src/cli.rs: -------------------------------------------------------------------------------- ```rust 1 | // Licensed to Elasticsearch B.V. under one or more contributor 2 | // license agreements. See the NOTICE file distributed with 3 | // this work for additional information regarding copyright 4 | // ownership. Elasticsearch B.V. licenses this file to you under 5 | // the Apache License, Version 2.0 (the "License"); you may 6 | // not use this file except in compliance with the License. 7 | // You may obtain a copy of the License at 8 | // 9 | // http://www.apache.org/licenses/LICENSE-2.0 10 | // 11 | // Unless required by applicable law or agreed to in writing, 12 | // software distributed under the License is distributed on an 13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 14 | // KIND, either express or implied. See the License for the 15 | // specific language governing permissions and limitations 16 | // under the License. 17 | 18 | use crate::servers::elasticsearch; 19 | use clap::Parser; 20 | use clap::{Args, Subcommand}; 21 | use serde::{Deserialize, Serialize}; 22 | use std::collections::HashMap; 23 | use std::path::PathBuf; 24 | 25 | /// Elastic MCP server 26 | #[derive(Debug, Parser)] 27 | #[command(version)] 28 | pub struct Cli { 29 | /// Container mode: change default http address, rewrite localhost to the host's address 30 | #[clap(global=true, long, env = "CONTAINER_MODE")] 31 | pub container_mode: bool, 32 | 33 | #[clap(subcommand)] 34 | pub command: Command, 35 | } 36 | 37 | #[derive(Debug, Subcommand)] 38 | pub enum Command { 39 | Stdio(StdioCommand), 40 | Http(HttpCommand), 41 | } 42 | 43 | /// Start a streamable-HTTP server with optional SSE support 44 | #[derive(Debug, Args)] 45 | pub struct HttpCommand { 46 | /// Config file 47 | #[clap(short, long)] 48 | pub config: Option<PathBuf>, 49 | 50 | /// Address to listen to [default: 127.0.0.1:8080] 51 | #[clap(long, value_name = "IP_ADDRESS:PORT", env = "HTTP_ADDRESS")] 52 | pub address: Option<std::net::SocketAddr>, 53 | 54 | /// Also start an SSE server on '/sse' 55 | #[clap(long)] 56 | pub sse: bool, 57 | } 58 | 59 | /// Start an stdio server 60 | #[derive(Debug, Args)] 61 | pub struct StdioCommand { 62 | /// Config file 63 | #[clap(short, long)] 64 | pub config: Option<PathBuf>, 65 | } 66 | 67 | //--------------------------------------------------------------- 68 | 69 | // Reference material: 70 | // https://modelcontextprotocol.io/quickstart/user 71 | // https://code.visualstudio.com/docs/copilot/chat/mcp-servers 72 | // https://docs.aws.amazon.com/amazonq/latest/qdeveloper-ug/command-line-mcp-configuration.html 73 | // https://github.com/landicefu/mcp-client-configuration-server 74 | 75 | #[derive(Debug, Serialize, Deserialize)] 76 | #[serde(rename_all = "camelCase")] 77 | pub struct Stdio { 78 | /// Command to run (e.g. "npx", "docker") 79 | pub command: String, 80 | 81 | /// Command arguments 82 | pub args: Vec<String>, 83 | 84 | /// Environment variables 85 | #[serde(default)] 86 | pub env: HashMap<String, String>, 87 | } 88 | 89 | #[derive(Debug, Serialize, Deserialize)] 90 | #[serde(rename_all = "camelCase")] 91 | pub struct Http { 92 | /// URL of the server 93 | pub url: String, 94 | 95 | /// HTTP headers to send with the request 96 | #[serde(default)] 97 | pub headers: HashMap<String, String>, 98 | } 99 | 100 | #[derive(Debug, Serialize, Deserialize)] 101 | #[serde(rename_all = "kebab-case")] 102 | #[serde(tag = "type")] 103 | pub enum McpServer { 104 | //Builtin(BuiltinConfig), 105 | Sse(Http), 106 | StreamableHttp(Http), 107 | Stdio(Stdio), 108 | } 109 | 110 | #[derive(Debug, Serialize, Deserialize)] 111 | #[serde(rename_all = "camelCase")] 112 | pub struct Configuration { 113 | pub elasticsearch: elasticsearch::ElasticsearchMcpConfig, 114 | #[serde(default)] 115 | pub mcp_servers: HashMap<String, McpServer>, 116 | } 117 | ``` -------------------------------------------------------------------------------- /src/utils/interpolator.rs: -------------------------------------------------------------------------------- ```rust 1 | // Licensed to Elasticsearch B.V. under one or more contributor 2 | // license agreements. See the NOTICE file distributed with 3 | // this work for additional information regarding copyright 4 | // ownership. Elasticsearch B.V. licenses this file to you under 5 | // the Apache License, Version 2.0 (the "License"); you may 6 | // not use this file except in compliance with the License. 7 | // You may obtain a copy of the License at 8 | // 9 | // http://www.apache.org/licenses/LICENSE-2.0 10 | // 11 | // Unless required by applicable law or agreed to in writing, 12 | // software distributed under the License is distributed on an 13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 14 | // KIND, either express or implied. See the License for the 15 | // specific language governing permissions and limitations 16 | // under the License. 17 | 18 | //! Simple string interpolator to inject environment variables in the configuration file. 19 | use thiserror::Error; 20 | 21 | #[derive(Error, Debug)] 22 | #[error("Invalid configuration template: {reason} at {line}:{char}")] 23 | pub struct InterpolationError { 24 | pub reason: String, 25 | pub line: usize, 26 | pub char: usize, 27 | } 28 | 29 | pub fn interpolate_from_env(s: String) -> Result<String, InterpolationError> { 30 | interpolate(s, |name| std::env::var(name).ok()) 31 | } 32 | 33 | const OPEN: &str = "${"; 34 | const OPEN_LEN: usize = OPEN.len(); 35 | const CLOSE: &str = "}"; 36 | const CLOSE_LEN: usize = CLOSE.len(); 37 | 38 | /// Simple string interpolation using the `${name}` and `${name:default_value}` syntax. 39 | pub fn interpolate(s: String, lookup: impl Fn(&str) -> Option<String>) -> Result<String, InterpolationError> { 40 | if !s.contains(OPEN) { 41 | return Ok(s); 42 | } 43 | 44 | let mut result: String = String::new(); 45 | 46 | for (line_no, mut line) in s.lines().enumerate() { 47 | if line_no > 0 { 48 | result.push('\n'); 49 | } 50 | let mut char_no = 0; 51 | 52 | let err = |char_no: usize, msg: String| InterpolationError { 53 | reason: msg, 54 | line: line_no + 1, // editors (and humans) are 1-based 55 | char: char_no, 56 | }; 57 | 58 | while let Some(pos) = line.find(OPEN) { 59 | // Push text before the opening brace 60 | result.push_str(&line[..pos]); 61 | 62 | char_no += pos + OPEN_LEN; 63 | line = &line[pos + OPEN_LEN..]; 64 | 65 | if let Some(pos) = line.find(CLOSE) { 66 | let expr = &line[..pos]; 67 | let value = if let Some((name, default)) = expr.split_once(':') { 68 | lookup(name).unwrap_or(default.to_string()) 69 | } else { 70 | lookup(expr).ok_or_else(|| err(char_no, format!("env variable '{expr}' not defined")))? 71 | }; 72 | result.push_str(&value); 73 | 74 | char_no += expr.len() + CLOSE_LEN; 75 | line = &line[expr.len() + CLOSE_LEN..]; 76 | } else { 77 | return Err(err(char_no, "missing closing braces".to_string())); 78 | } 79 | } 80 | result.push_str(line); 81 | } 82 | 83 | Ok(result) 84 | } 85 | 86 | #[cfg(test)] 87 | mod tests { 88 | use super::*; 89 | 90 | fn expand(name: &str) -> Result<String, InterpolationError> { 91 | let lookup = |s: &str| match s { 92 | "foo" => Some("foo_value".to_string()), 93 | "bar" => Some("bar_value".to_string()), 94 | _ => None, 95 | }; 96 | 97 | interpolate(name.to_string(), lookup) 98 | } 99 | 100 | #[test] 101 | fn good_extrapolation() -> anyhow::Result<()> { 102 | assert_eq!("012345678", expand("012345678")?); 103 | assert_eq!("foo_value01234", expand("${foo}01234")?); 104 | assert_eq!("foo_value01234\n1234bar_value", expand("${foo}01234\n1234${bar}")?); 105 | assert_eq!("foo_value01234bar_value", expand("${foo}01234${bar}")?); 106 | assert_eq!("_01_foo_value01234bar_value567", expand("_01_${foo}01234${bar}567")?); 107 | Ok(()) 108 | } 109 | 110 | #[test] 111 | fn failed_extrapolation() { 112 | assert!(expand("${foo01234").is_err()); 113 | assert!(expand("${foo}01234${bar").is_err()); 114 | assert!(expand("${baz}01234").is_err()); 115 | } 116 | } 117 | ``` -------------------------------------------------------------------------------- /src/lib.rs: -------------------------------------------------------------------------------- ```rust 1 | // Licensed to Elasticsearch B.V. under one or more contributor 2 | // license agreements. See the NOTICE file distributed with 3 | // this work for additional information regarding copyright 4 | // ownership. Elasticsearch B.V. licenses this file to you under 5 | // the Apache License, Version 2.0 (the "License"); you may 6 | // not use this file except in compliance with the License. 7 | // You may obtain a copy of the License at 8 | // 9 | // http://www.apache.org/licenses/LICENSE-2.0 10 | // 11 | // Unless required by applicable law or agreed to in writing, 12 | // software distributed under the License is distributed on an 13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 14 | // KIND, either express or implied. See the License for the 15 | // specific language governing permissions and limitations 16 | // under the License. 17 | 18 | pub mod cli; 19 | mod protocol; 20 | mod servers; 21 | mod utils; 22 | 23 | use crate::cli::{Cli, Command, Configuration, HttpCommand, StdioCommand}; 24 | use crate::protocol::http::{HttpProtocol, HttpServerConfig}; 25 | use crate::servers::elasticsearch; 26 | use crate::utils::interpolator; 27 | use rmcp::transport::stdio; 28 | use rmcp::transport::streamable_http_server::session::never::NeverSessionManager; 29 | use rmcp::{RoleServer, Service, ServiceExt}; 30 | use std::net::{IpAddr, Ipv4Addr, SocketAddr}; 31 | use std::path::PathBuf; 32 | use std::sync::Arc; 33 | use tokio::select; 34 | use tokio_util::sync::CancellationToken; 35 | 36 | impl Cli { 37 | pub async fn run(self) -> anyhow::Result<()> { 38 | match self.command { 39 | Command::Stdio(cmd) => run_stdio(cmd, self.container_mode).await, 40 | Command::Http(cmd) => run_http(cmd, self.container_mode).await, 41 | } 42 | } 43 | } 44 | 45 | pub async fn run_stdio(cmd: StdioCommand, container_mode: bool) -> anyhow::Result<()> { 46 | tracing::info!("Starting stdio server"); 47 | let handler = setup_services(&cmd.config, container_mode).await?; 48 | let service = handler.serve(stdio()).await.inspect_err(|e| { 49 | tracing::error!("serving error: {:?}", e); 50 | })?; 51 | 52 | select! { 53 | _ = service.waiting() => {}, 54 | _ = tokio::signal::ctrl_c() => {}, 55 | } 56 | 57 | Ok(()) 58 | } 59 | 60 | pub async fn run_http(cmd: HttpCommand, container_mode: bool) -> anyhow::Result<()> { 61 | let handler = setup_services(&cmd.config, container_mode).await?; 62 | let server_provider = move || handler.clone(); 63 | let address: SocketAddr = if let Some(addr) = cmd.address { 64 | addr 65 | } else if container_mode { 66 | SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 8080) 67 | } else { 68 | SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 8080) 69 | }; 70 | 71 | let ct = HttpProtocol::serve_with_config( 72 | server_provider, 73 | HttpServerConfig { 74 | bind: address, 75 | ct: CancellationToken::new(), 76 | // streaming http: 77 | keep_alive: None, 78 | stateful_mode: false, 79 | session_manager: Arc::new(NeverSessionManager::default()), 80 | }, 81 | ) 82 | .await?; 83 | 84 | tracing::info!("Starting http server at address {}", address); 85 | 86 | tokio::signal::ctrl_c().await?; 87 | ct.cancel(); 88 | Ok(()) 89 | } 90 | 91 | pub async fn setup_services(config: &Option<PathBuf>, container_mode: bool) -> anyhow::Result<impl Service<RoleServer> + Clone> { 92 | // Read config file and expand variables 93 | 94 | let config = if let Some(path) = config { 95 | std::fs::read_to_string(path)? 96 | } else { 97 | // Built-in default configuration, based on env variables. 98 | r#"{ 99 | "elasticsearch": { 100 | "url": "${ES_URL}", 101 | "api_key": "${ES_API_KEY:}", 102 | "username": "${ES_USERNAME:}", 103 | "password": "${ES_PASSWORD:}", 104 | "ssl_skip_verify": "${ES_SSL_SKIP_VERIFY:false}" 105 | } 106 | }"# 107 | .to_string() 108 | }; 109 | 110 | // Expand environment variables in the config file 111 | let config = interpolator::interpolate_from_env(config)?; 112 | 113 | // JSON5 adds comments and multiline strings (useful for ES|QL) to JSON 114 | let config: Configuration = match serde_json5::from_str(&config) { 115 | Ok(c) => c, 116 | Err(serde_json5::Error::Message { msg, location }) if location.is_some() => { 117 | let location = location.unwrap(); 118 | let line = location.line; 119 | let column = location.column; 120 | anyhow::bail!("Failed to parse config: {msg}, at line {line} column {column}"); 121 | } 122 | Err(err) => return Err(err)?, 123 | }; 124 | 125 | let handler = elasticsearch::ElasticsearchMcp::new_with_config(config.elasticsearch, container_mode)?; 126 | Ok(handler) 127 | } 128 | ``` -------------------------------------------------------------------------------- /src/protocol/http.rs: -------------------------------------------------------------------------------- ```rust 1 | // Licensed to Elasticsearch B.V. under one or more contributor 2 | // license agreements. See the NOTICE file distributed with 3 | // this work for additional information regarding copyright 4 | // ownership. Elasticsearch B.V. licenses this file to you under 5 | // the Apache License, Version 2.0 (the "License"); you may 6 | // not use this file except in compliance with the License. 7 | // You may obtain a copy of the License at 8 | // 9 | // http://www.apache.org/licenses/LICENSE-2.0 10 | // 11 | // Unless required by applicable law or agreed to in writing, 12 | // software distributed under the License is distributed on an 13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 14 | // KIND, either express or implied. See the License for the 15 | // specific language governing permissions and limitations 16 | // under the License. 17 | 18 | //! Implementation of HTTP protocols 19 | 20 | use crate::utils::rmcp_ext::ServerProvider; 21 | use axum::Router; 22 | use axum::http::StatusCode; 23 | use axum::routing::get; 24 | use rmcp::transport::sse_server::SseServerConfig; 25 | use rmcp::transport::streamable_http_server::session::local::LocalSessionManager; 26 | use rmcp::transport::streamable_http_server::{SessionManager, StreamableHttpServerConfig}; 27 | use rmcp::transport::{SseServer, StreamableHttpService}; 28 | use rmcp::{RoleServer, Service}; 29 | use std::net::SocketAddr; 30 | use std::sync::Arc; 31 | use std::time::Duration; 32 | use tokio_util::sync::CancellationToken; 33 | use tracing::Instrument; 34 | 35 | /// Configuration for an HTTP MCP server 36 | pub struct HttpServerConfig<M: SessionManager = LocalSessionManager> { 37 | /// TCP address to bind to 38 | pub bind: SocketAddr, 39 | 40 | /// Parent cancellation token. `serve_with_config` will return a child token 41 | pub ct: CancellationToken, 42 | 43 | /// Streamable http server option 44 | pub keep_alive: Option<Duration>, 45 | 46 | /// Streamable http server option 47 | pub stateful_mode: bool, 48 | 49 | /// Streamable http server option 50 | pub session_manager: Arc<M>, 51 | } 52 | 53 | /// An HTTP MCP server that supports both SSE and streamable HTTP. 54 | pub struct HttpProtocol {} 55 | 56 | impl HttpProtocol { 57 | pub async fn serve_with_config<S: Service<RoleServer>, M: SessionManager>( 58 | server_provider: impl Into<ServerProvider<S>>, 59 | config: HttpServerConfig<M>, 60 | ) -> std::io::Result<CancellationToken> { 61 | let server_provider = server_provider.into().0; 62 | 63 | let ct = config.ct.child_token(); 64 | 65 | // Create a streamable http router 66 | let sh_router = { 67 | let sh_config = StreamableHttpServerConfig { 68 | sse_keep_alive: config.keep_alive, 69 | stateful_mode: config.stateful_mode, 70 | }; 71 | 72 | let server_provider = server_provider.clone(); 73 | // TODO: internally, new() wraps the server provider closure with an Arc. We can avoid 74 | // "double-Arc" by having 75 | let sh_service = 76 | StreamableHttpService::new(move || Ok(server_provider()), config.session_manager, sh_config); 77 | Router::new().route_service("/", sh_service) 78 | }; 79 | 80 | // Create an SSE router 81 | let sse_router = { 82 | let sse_config = SseServerConfig { 83 | bind: config.bind, 84 | // SSE server will create a child cancellation token for every transport that is created 85 | // (see with_service() below) 86 | ct: ct.clone(), 87 | sse_keep_alive: config.keep_alive, 88 | sse_path: "/".to_string(), 89 | post_path: "/message".to_string(), 90 | }; 91 | let (sse_server, sse_router) = SseServer::new(sse_config); 92 | let _sse_ct = sse_server.with_service(move || server_provider()); 93 | 94 | sse_router 95 | }; 96 | 97 | // Health and readiness 98 | // See https://kubernetes.io/docs/concepts/configuration/liveness-readiness-startup-probes/ 99 | let health_router = { 100 | Router::new() 101 | // We may introduce a startup probe if we need to fetch/cache remote resources 102 | // during initialization 103 | // Ready: once we have the tool list we can process incoming requests 104 | .route("/ready", get(async || (StatusCode::OK, "Ready\n"))) 105 | // Live: are we alive? 106 | .route("/live", get(async || "Alive\n")) 107 | }; 108 | 109 | // Put all things together 110 | let main_router = Router::new() 111 | .route("/", get(hello)) 112 | .route("/ping", get(async || (StatusCode::OK, "Ready\n"))) 113 | .nest("/mcp/sse", sse_router) 114 | .nest("/mcp", sh_router) 115 | .nest("/_health", health_router) 116 | .with_state(()); 117 | 118 | // Start the http server 119 | let listener = tokio::net::TcpListener::bind(config.bind).await?; 120 | let server = axum::serve(listener, main_router).with_graceful_shutdown({ 121 | let ct = ct.clone(); 122 | async move { 123 | ct.cancelled().await; 124 | tracing::info!("http server cancelled"); 125 | } 126 | }); 127 | 128 | // Await the server, or it will do nothing :-) 129 | tokio::spawn( 130 | async { 131 | let _ = server.await; 132 | } 133 | .instrument(tracing::info_span!("http-server", bind_address = %config.bind)), 134 | ); 135 | 136 | Ok(ct) 137 | } 138 | } 139 | 140 | async fn hello() -> String { 141 | let version = env!("CARGO_PKG_VERSION"); 142 | format!( 143 | r#"Elasticsearch MCP server. Version {version} 144 | 145 | Endpoints: 146 | - streamable-http: /mcp 147 | - sse: /mcp/sse 148 | "# 149 | ) 150 | } 151 | 152 | #[cfg(test)] 153 | mod tests { 154 | #[test] 155 | pub fn test_parts_in_extensions() {} 156 | } 157 | ``` -------------------------------------------------------------------------------- /tests/http_tests.rs: -------------------------------------------------------------------------------- ```rust 1 | // Licensed to Elasticsearch B.V. under one or more contributor 2 | // license agreements. See the NOTICE file distributed with 3 | // this work for additional information regarding copyright 4 | // ownership. Elasticsearch B.V. licenses this file to you under 5 | // the Apache License, Version 2.0 (the "License"); you may 6 | // not use this file except in compliance with the License. 7 | // You may obtain a copy of the License at 8 | // 9 | // http://www.apache.org/licenses/LICENSE-2.0 10 | // 11 | // Unless required by applicable law or agreed to in writing, 12 | // software distributed under the License is distributed on an 13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 14 | // KIND, either express or implied. See the License for the 15 | // specific language governing permissions and limitations 16 | // under the License. 17 | 18 | use anyhow::bail; 19 | use axum::Router; 20 | use axum::extract::Path; 21 | use elasticsearch_core_mcp_server::cli; 22 | use futures_util::StreamExt; 23 | use http::HeaderMap; 24 | use http::header::{ACCEPT, CONTENT_TYPE}; 25 | use reqwest::Client; 26 | use rmcp::model::ToolAnnotations; 27 | use serde::Deserialize; 28 | use serde::de::DeserializeOwned; 29 | use serde_json::json; 30 | use sse_stream::SseStream; 31 | use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4, TcpListener}; 32 | 33 | /// Simple smoke test 34 | #[tokio::test] 35 | async fn http_tool_list() -> anyhow::Result<()> { 36 | let addr = find_address()?; 37 | 38 | let cli = cli::Cli { 39 | container_mode: false, 40 | command: cli::Command::Http(cli::HttpCommand { 41 | config: None, 42 | address: Some(addr), 43 | sse: false, 44 | }), 45 | }; 46 | 47 | tokio::spawn(async move { cli.run().await }); 48 | 49 | let url = format!("http://127.0.0.1:{}/mcp", addr.port()); 50 | 51 | let body = json!({ 52 | "jsonrpc": "2.0", 53 | "id": 1, 54 | "method": "tools/list" 55 | }); 56 | 57 | let client = Client::builder().build()?; 58 | tokio::time::sleep(std::time::Duration::from_secs(1)).await; 59 | 60 | let response = client 61 | .post(url) 62 | .header(CONTENT_TYPE, "application/json") 63 | .header(ACCEPT, "application/json, text/event-stream") 64 | .json(&body) 65 | .send() 66 | .await? 67 | .error_for_status()?; 68 | 69 | let response_body: ListToolsResponse = parse_response(response).await?; 70 | 71 | let names = response_body 72 | .result 73 | .tools 74 | .iter() 75 | .map(|t| t.name.as_str()) 76 | .collect::<Vec<_>>(); 77 | assert!(names.contains(&"search")); 78 | assert!(names.contains(&"list_indices")); 79 | assert!(names.contains(&"get_mappings")); 80 | Ok(()) 81 | } 82 | 83 | // End-to-end test that spawns a mock ES server and calls the `list_indices` tool via http 84 | #[tokio::test] 85 | async fn end_to_end() -> anyhow::Result<()> { 86 | // Start an ES mock that will reply to list_indices 87 | let router = Router::new().route( 88 | "/_cat/indices/{index}", 89 | axum::routing::get(async move |headers: HeaderMap, Path(index): Path<String>| { 90 | // Check parameter forwarding 91 | assert_eq!(index, "test-index"); 92 | // Check API key 93 | assert_eq!( 94 | headers.get("Authorization").unwrap().to_str().unwrap(), 95 | "ApiKey value-from-the-test" 96 | ); 97 | axum::Json(json!([ 98 | { 99 | "index": "test-index", 100 | "status": "open", 101 | "docs.count": "100" 102 | } 103 | ])) 104 | }), 105 | ); 106 | 107 | let listener = tokio::net::TcpListener::bind(LOCALHOST_0).await?; 108 | 109 | // SAFETY: works since this is the only test in this module that sets env vars 110 | // TODO: refactor the CLI to accept an alternate source of key/values 111 | unsafe { 112 | std::env::set_var("ES_URL", format!("http://127.0.0.1:{}/", listener.local_addr()?.port())); 113 | } 114 | let server = axum::serve(listener, router); 115 | tokio::spawn(async { server.await }); 116 | 117 | // Start an http MCP server 118 | let addr = find_address()?; 119 | let cli = cli::Cli { 120 | container_mode: false, 121 | command: cli::Command::Http(cli::HttpCommand { 122 | config: None, 123 | address: Some(addr), 124 | sse: false, 125 | }), 126 | }; 127 | 128 | tokio::spawn(async move { cli.run().await }); 129 | let url = format!("http://127.0.0.1:{}/mcp", addr.port()); 130 | let body = json!({ 131 | "jsonrpc": "2.0", 132 | "id": 1, 133 | "method": "tools/call", 134 | "params": { 135 | "name": "list_indices", 136 | "arguments": { 137 | "index_pattern": "test-index" 138 | } 139 | } 140 | }); 141 | 142 | let client = Client::builder().build()?; 143 | tokio::time::sleep(std::time::Duration::from_secs(1)).await; 144 | 145 | let response = client 146 | .post(url) 147 | .header(CONTENT_TYPE, "application/json") 148 | .header(ACCEPT, "application/json, text/event-stream") 149 | .header("Authorization", "ApiKey value-from-the-test") 150 | .json(&body) 151 | .send() 152 | .await? 153 | .error_for_status()?; 154 | 155 | let response_body: serde_json::Value = parse_response(response).await?; 156 | 157 | assert_eq!(response_body["result"]["content"][0]["text"], "Found 1 indices:"); 158 | assert_eq!( 159 | response_body["result"]["content"][1]["text"], 160 | "[{\"index\":\"test-index\",\"status\":\"open\",\"docs.count\":100}]" 161 | ); 162 | 163 | Ok(()) 164 | } 165 | 166 | const LOCALHOST_0: SocketAddrV4 = SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0); 167 | 168 | fn find_address() -> anyhow::Result<SocketAddr> { 169 | // Find a free port 170 | Ok(TcpListener::bind(LOCALHOST_0)?.local_addr()?) 171 | } 172 | 173 | async fn parse_response<T: DeserializeOwned>(response: reqwest::Response) -> anyhow::Result<T> { 174 | let result = match response.headers().get(CONTENT_TYPE) { 175 | Some(v) if v == "application/json" => response.json().await?, 176 | Some(v) if v == "text/event-stream" => { 177 | let mut stream = SseStream::from_byte_stream(response.bytes_stream()); 178 | match stream.next().await { 179 | None => bail!("No data"), 180 | Some(Err(e)) => bail!("Bad SSE stream: {}", e), 181 | Some(Ok(sse)) => { 182 | let data = sse.data.unwrap(); 183 | serde_json::from_str(&data)? 184 | } 185 | } 186 | } 187 | _ => { 188 | panic!("Unexpected content type"); 189 | } 190 | }; 191 | 192 | Ok(result) 193 | } 194 | 195 | #[derive(Debug, Deserialize)] 196 | #[allow(dead_code)] 197 | struct ListToolsResponse { 198 | jsonrpc: String, 199 | id: i64, 200 | result: ToolResult, 201 | } 202 | 203 | #[derive(Debug, Deserialize)] 204 | #[allow(dead_code)] 205 | struct ToolResult { 206 | tools: Vec<Tool>, 207 | } 208 | 209 | #[derive(Debug, Deserialize)] 210 | #[allow(dead_code)] 211 | struct Tool { 212 | name: String, 213 | description: String, 214 | input_schema: Option<serde_json::Value>, 215 | annotations: Option<ToolAnnotations>, 216 | } 217 | ``` -------------------------------------------------------------------------------- /src/servers/elasticsearch/mod.rs: -------------------------------------------------------------------------------- ```rust 1 | // Licensed to Elasticsearch B.V. under one or more contributor 2 | // license agreements. See the NOTICE file distributed with 3 | // this work for additional information regarding copyright 4 | // ownership. Elasticsearch B.V. licenses this file to you under 5 | // the Apache License, Version 2.0 (the "License"); you may 6 | // not use this file except in compliance with the License. 7 | // You may obtain a copy of the License at 8 | // 9 | // http://www.apache.org/licenses/LICENSE-2.0 10 | // 11 | // Unless required by applicable law or agreed to in writing, 12 | // software distributed under the License is distributed on an 13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 14 | // KIND, either express or implied. See the License for the 15 | // specific language governing permissions and limitations 16 | // under the License. 17 | 18 | mod base_tools; 19 | 20 | use crate::servers::IncludeExclude; 21 | use crate::utils::none_if_empty_string; 22 | use elasticsearch::Elasticsearch; 23 | use elasticsearch::auth::Credentials; 24 | use elasticsearch::cert::CertificateValidation; 25 | use elasticsearch::http::Url; 26 | use elasticsearch::http::response::Response; 27 | use http::header::USER_AGENT; 28 | use http::request::Parts; 29 | use http::{HeaderValue, header}; 30 | use indexmap::IndexMap; 31 | use rmcp::RoleServer; 32 | use rmcp::model::ToolAnnotations; 33 | use rmcp::service::RequestContext; 34 | use serde::de::DeserializeOwned; 35 | use serde::{Deserialize, Serialize}; 36 | use serde_aux::field_attributes::deserialize_bool_from_anything; 37 | use std::borrow::Cow; 38 | use std::collections::HashMap; 39 | 40 | #[derive(Debug, Serialize, Deserialize)] 41 | pub struct ElasticsearchMcpConfig { 42 | /// Cluster URL 43 | pub url: String, 44 | 45 | /// API key 46 | #[serde(default, deserialize_with = "none_if_empty_string")] 47 | pub api_key: Option<String>, 48 | 49 | /// Username 50 | #[serde(default, deserialize_with = "none_if_empty_string")] 51 | pub username: Option<String>, 52 | 53 | /// Password 54 | #[serde(default, deserialize_with = "none_if_empty_string")] 55 | pub password: Option<String>, 56 | 57 | /// Should we skip SSL certificate verification? 58 | #[serde(default, deserialize_with = "deserialize_bool_from_anything")] 59 | pub ssl_skip_verify: bool, 60 | 61 | /// Search templates to expose as tools or resources 62 | #[serde(default)] 63 | pub tools: Tools, 64 | 65 | /// Prompts 66 | #[serde(default)] 67 | pub prompts: Vec<String>, 68 | // TODO: search as resources? 69 | } 70 | 71 | // A wrapper around an ES client that provides a client instance configured 72 | /// for a given request context (i.e. auth credentials) 73 | #[derive(Clone)] 74 | pub struct EsClientProvider(Elasticsearch); 75 | 76 | impl EsClientProvider { 77 | pub fn new(client: Elasticsearch) -> Self { 78 | EsClientProvider(client) 79 | } 80 | 81 | /// If the incoming request is a http request and has an `Authorization` header, use it 82 | /// to authenticate to the remote ES instance. 83 | pub fn get(&self, context: RequestContext<RoleServer>) -> Cow<'_, Elasticsearch> { 84 | let client = &self.0; 85 | 86 | let Some(mut auth) = context 87 | .extensions 88 | .get::<Parts>() 89 | .and_then(|p| p.headers.get(header::AUTHORIZATION)) 90 | .and_then(|h| h.to_str().ok()) 91 | else { 92 | // No auth 93 | return Cow::Borrowed(client); 94 | }; 95 | 96 | // MCP inspector insists on sending a bearer token and prepends "Bearer" to the value provided 97 | if auth.starts_with("Bearer ApiKey ") || auth.starts_with("Bearer Basic ") { 98 | auth = auth.trim_start_matches("Bearer "); 99 | } 100 | 101 | let transport = client 102 | .transport() 103 | .clone_with_auth(Some(Credentials::AuthorizationHeader(auth.to_string()))); 104 | 105 | Cow::Owned(Elasticsearch::new(transport)) 106 | } 107 | } 108 | 109 | #[derive(Debug, Serialize, Deserialize, Default)] 110 | pub struct Tools { 111 | #[serde(flatten)] 112 | pub incl_excl: Option<IncludeExclude>, 113 | pub custom: HashMap<String, CustomTool>, 114 | } 115 | 116 | #[derive(Debug, Serialize, Deserialize)] 117 | #[serde(tag = "type", rename_all = "snake_case")] 118 | pub enum CustomTool { 119 | Esql(EsqlTool), 120 | SearchTemplate(SearchTemplateTool), 121 | } 122 | 123 | impl CustomTool { 124 | pub fn base(&self) -> &ToolBase { 125 | match self { 126 | CustomTool::Esql(esql) => &esql.base, 127 | CustomTool::SearchTemplate(search_template) => &search_template.base, 128 | } 129 | } 130 | } 131 | 132 | #[derive(Debug, Serialize, Deserialize)] 133 | pub struct ToolBase { 134 | pub description: String, 135 | pub parameters: IndexMap<String, schemars::schema::SchemaObject>, 136 | pub annotations: Option<ToolAnnotations>, 137 | } 138 | 139 | #[derive(Debug, Serialize, Deserialize)] 140 | pub struct EsqlTool { 141 | #[serde(flatten)] 142 | base: ToolBase, 143 | query: String, 144 | #[serde(default)] 145 | format: EsqlResultFormat, 146 | } 147 | 148 | #[derive(Debug, Serialize, Deserialize, Default)] 149 | #[serde(rename_all = "snake_case")] 150 | pub enum EsqlResultFormat { 151 | #[default] 152 | // Output as JSON, either as an array of objects or as a single object. 153 | Json, 154 | // If a single object with a single property, output only its value 155 | Value, 156 | //Csv, 157 | } 158 | 159 | #[derive(Debug, Serialize, Deserialize)] 160 | pub struct SearchTemplateTool { 161 | #[serde(flatten)] 162 | base: ToolBase, 163 | #[serde(flatten)] 164 | template: SearchTemplate, 165 | } 166 | 167 | #[derive(Debug, Serialize, Deserialize)] 168 | #[serde(rename_all = "snake_case")] 169 | pub enum SearchTemplate { 170 | TemplateId(String), 171 | Template(serde_json::Value), // or constrain to an object? 172 | } 173 | 174 | #[derive(Clone)] 175 | pub struct ElasticsearchMcp {} 176 | 177 | impl ElasticsearchMcp { 178 | pub fn new_with_config(config: ElasticsearchMcpConfig, container_mode: bool) -> anyhow::Result<base_tools::EsBaseTools> { 179 | let creds = if let Some(api_key) = config.api_key.clone() { 180 | Some(Credentials::EncodedApiKey(api_key)) 181 | } else if let Some(username) = config.username.clone() { 182 | let pwd = config.password.clone().ok_or(anyhow::Error::msg("missing password"))?; 183 | Some(Credentials::Basic(username, pwd)) 184 | } else { 185 | None 186 | }; 187 | 188 | let url = config.url.as_str(); 189 | if url.is_empty() { 190 | return Err(anyhow::Error::msg("Elasticsearch URL is empty")); 191 | } 192 | 193 | let mut url = Url::parse(url)?; 194 | if container_mode { 195 | rewrite_localhost(&mut url)?; 196 | } 197 | 198 | let pool = elasticsearch::http::transport::SingleNodeConnectionPool::new(url.clone()); 199 | let mut transport = elasticsearch::http::transport::TransportBuilder::new(pool); 200 | if let Some(creds) = creds { 201 | transport = transport.auth(creds); 202 | } 203 | if config.ssl_skip_verify { 204 | transport = transport.cert_validation(CertificateValidation::None) 205 | } 206 | transport = transport.header( 207 | USER_AGENT, 208 | HeaderValue::from_str(&format!("elastic-mcp/{}", env!("CARGO_PKG_VERSION")))?, 209 | ); 210 | let transport = transport.build()?; 211 | let es_client = Elasticsearch::new(transport); 212 | 213 | Ok(base_tools::EsBaseTools::new(es_client)) 214 | } 215 | } 216 | 217 | //------------------------------------------------------------------------------------------------ 218 | // Utilities 219 | 220 | /// Rewrite urls targeting `localhost` to a hostname that maps to the container host, if possible. 221 | /// 222 | /// The host name for the container host depends on the OCI runtime used. This is useful to accept 223 | /// Elasticsearch URLs like `http://localhost:9200`. 224 | fn rewrite_localhost(url: &mut Url) -> anyhow::Result<()> { 225 | use std::net::ToSocketAddrs; 226 | let aliases = &[ 227 | "host.docker.internal", // Docker 228 | "host.containers.internal", // Podman, maybe others 229 | ]; 230 | 231 | if let Some(host) = url.host_str() && host == "localhost" { 232 | for alias in aliases { 233 | if let Ok(mut alias_add) = (*alias, 80).to_socket_addrs() && alias_add.next().is_some() { 234 | url.set_host(Some(alias))?; 235 | tracing::info!("Container mode: using '{alias}' instead of 'localhost'"); 236 | return Ok(()); 237 | } 238 | } 239 | } 240 | tracing::warn!("Container mode: cound not find a replacement for 'localhost'"); 241 | Ok(()) 242 | } 243 | 244 | /// Map any error to an internal error of the MCP server 245 | pub fn internal_error(e: impl std::error::Error) -> rmcp::Error { 246 | rmcp::Error::internal_error(e.to_string(), None) 247 | } 248 | 249 | /// Return an error as an error response to the client, which may be able to take 250 | /// action to correct it. This should be refined to handle common error types such 251 | /// as index not found, which could be caused by the client hallucinating an index name. 252 | /// 253 | /// TODO (in rmcp): if rmcp::Error had a variant that accepts a CallToolResult, this would 254 | /// allow to use the '?' operator while sending a result to the client. 255 | pub fn handle_error(result: Result<Response, elasticsearch::Error>) -> Result<Response, rmcp::Error> { 256 | match result { 257 | Ok(resp) => resp.error_for_status_code(), 258 | Err(e) => { 259 | tracing::error!("Error: {:?}", &e); 260 | Err(e) 261 | } 262 | } 263 | .map_err(internal_error) 264 | } 265 | 266 | pub async fn read_json<T: DeserializeOwned>( 267 | response: Result<Response, elasticsearch::Error>, 268 | ) -> Result<T, rmcp::Error> { 269 | // let text = read_text(response).await?; 270 | // tracing::debug!("Received json {text}"); 271 | // serde_json::from_str(&text).map_err(internal_error) 272 | 273 | let response = handle_error(response)?; 274 | response.json().await.map_err(internal_error) 275 | } 276 | 277 | #[allow(dead_code)] 278 | pub async fn read_text(result: Result<Response, elasticsearch::Error>) -> Result<String, rmcp::Error> { 279 | let response = handle_error(result)?; 280 | response.text().await.map_err(internal_error) 281 | } 282 | ``` -------------------------------------------------------------------------------- /src/servers/elasticsearch/base_tools.rs: -------------------------------------------------------------------------------- ```rust 1 | // Licensed to Elasticsearch B.V. under one or more contributor 2 | // license agreements. See the NOTICE file distributed with 3 | // this work for additional information regarding copyright 4 | // ownership. Elasticsearch B.V. licenses this file to you under 5 | // the Apache License, Version 2.0 (the "License"); you may 6 | // not use this file except in compliance with the License. 7 | // You may obtain a copy of the License at 8 | // 9 | // http://www.apache.org/licenses/LICENSE-2.0 10 | // 11 | // Unless required by applicable law or agreed to in writing, 12 | // software distributed under the License is distributed on an 13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 14 | // KIND, either express or implied. See the License for the 15 | // specific language governing permissions and limitations 16 | // under the License. 17 | 18 | use crate::servers::elasticsearch::{EsClientProvider, read_json}; 19 | use elasticsearch::cat::{CatIndicesParts, CatShardsParts}; 20 | use elasticsearch::indices::IndicesGetMappingParts; 21 | use elasticsearch::{Elasticsearch, SearchParts}; 22 | use indexmap::IndexMap; 23 | use rmcp::handler::server::tool::{Parameters, ToolRouter}; 24 | use rmcp::model::{ 25 | CallToolResult, Content, Implementation, JsonObject, ProtocolVersion, ServerCapabilities, ServerInfo, 26 | }; 27 | use rmcp::service::RequestContext; 28 | use rmcp::{RoleServer, ServerHandler}; 29 | use rmcp_macros::{tool, tool_handler, tool_router}; 30 | use serde::{Deserialize, Serialize}; 31 | use serde_aux::prelude::*; 32 | use serde_json::{Map, Value, json}; 33 | use std::collections::HashMap; 34 | 35 | #[derive(Clone)] 36 | pub struct EsBaseTools { 37 | es_client: EsClientProvider, 38 | tool_router: ToolRouter<EsBaseTools>, 39 | } 40 | 41 | impl EsBaseTools { 42 | pub fn new(es_client: Elasticsearch) -> Self { 43 | Self { 44 | es_client: EsClientProvider::new(es_client), 45 | tool_router: Self::tool_router(), 46 | } 47 | } 48 | } 49 | 50 | #[derive(Debug, serde::Deserialize, schemars::JsonSchema)] 51 | struct ListIndicesParams { 52 | /// Index pattern of Elasticsearch indices to list 53 | pub index_pattern: String, 54 | } 55 | 56 | #[derive(Debug, serde::Deserialize, schemars::JsonSchema)] 57 | struct GetMappingsParams { 58 | /// Name of the Elasticsearch index to get mappings for 59 | index: String, 60 | } 61 | 62 | #[derive(Debug, serde::Deserialize, schemars::JsonSchema)] 63 | struct SearchParams { 64 | /// Name of the Elasticsearch index to search 65 | index: String, 66 | 67 | /// Name of the fields that need to be returned (optional) 68 | fields: Option<Vec<String>>, 69 | 70 | /// Complete Elasticsearch query DSL object that can include query, size, from, sort, etc. 71 | query_body: Map<String, Value>, // note: just Value doesn't work, as Claude would send a string 72 | } 73 | 74 | #[derive(Debug, serde::Deserialize, schemars::JsonSchema)] 75 | struct EsqlQueryParams { 76 | /// Complete Elasticsearch ES|QL query 77 | query: String, 78 | } 79 | 80 | #[derive(Debug, serde::Deserialize, schemars::JsonSchema)] 81 | struct GetShardsParams { 82 | /// Optional index name to get shard information for 83 | index: Option<String>, 84 | } 85 | 86 | #[tool_router] 87 | impl EsBaseTools { 88 | //--------------------------------------------------------------------------------------------- 89 | /// Tool: list indices 90 | #[tool( 91 | description = "List all available Elasticsearch indices", 92 | annotations(title = "List ES indices", read_only_hint = true) 93 | )] 94 | async fn list_indices( 95 | &self, 96 | req_ctx: RequestContext<RoleServer>, 97 | Parameters(ListIndicesParams { index_pattern }): Parameters<ListIndicesParams>, 98 | ) -> Result<CallToolResult, rmcp::Error> { 99 | let es_client = self.es_client.get(req_ctx); 100 | let response = es_client 101 | .cat() 102 | .indices(CatIndicesParts::Index(&[&index_pattern])) 103 | .h(&["index", "status", "docs.count"]) 104 | .format("json") 105 | .send() 106 | .await; 107 | 108 | let response: Vec<CatIndexResponse> = read_json(response).await?; 109 | 110 | Ok(CallToolResult::success(vec![ 111 | Content::text(format!("Found {} indices:", response.len())), 112 | Content::json(response)?, 113 | ])) 114 | } 115 | 116 | //--------------------------------------------------------------------------------------------- 117 | /// Tool: get mappings for an index 118 | #[tool( 119 | description = "Get field mappings for a specific Elasticsearch index", 120 | annotations(title = "Get ES index mappings", read_only_hint = true) 121 | )] 122 | async fn get_mappings( 123 | &self, 124 | req_ctx: RequestContext<RoleServer>, 125 | Parameters(GetMappingsParams { index }): Parameters<GetMappingsParams>, 126 | ) -> Result<CallToolResult, rmcp::Error> { 127 | let es_client = self.es_client.get(req_ctx); 128 | let response = es_client 129 | .indices() 130 | .get_mapping(IndicesGetMappingParts::Index(&[&index])) 131 | .send() 132 | .await; 133 | 134 | let response: MappingResponse = read_json(response).await?; 135 | 136 | // use the first mapping (we can have many if the name is a wildcard) 137 | let mapping = response.values().next().unwrap(); 138 | 139 | Ok(CallToolResult::success(vec![ 140 | Content::text(format!("Mappings for index {index}:")), 141 | Content::json(mapping)?, 142 | ])) 143 | } 144 | 145 | //--------------------------------------------------------------------------------------------- 146 | /// Tool: search an index with the Query DSL 147 | /// 148 | /// The additional 'fields' parameter helps some LLMs that don't know about the `_source` 149 | /// request property to narrow down the data returned and reduce their context size 150 | #[tool( 151 | description = "Perform an Elasticsearch search with the provided query DSL.", 152 | annotations(title = "Elasticsearch search DSL query", read_only_hint = true) 153 | )] 154 | async fn search( 155 | &self, 156 | req_ctx: RequestContext<RoleServer>, 157 | Parameters(SearchParams { 158 | index, 159 | fields, 160 | query_body, 161 | }): Parameters<SearchParams>, 162 | ) -> Result<CallToolResult, rmcp::Error> { 163 | let es_client = self.es_client.get(req_ctx); 164 | 165 | let mut query_body = query_body; 166 | 167 | if let Some(fields) = fields { 168 | // Augment _source if it exists 169 | if let Some(Value::Array(values)) = query_body.get_mut("_source") { 170 | for field in fields.into_iter() { 171 | values.push(Value::String(field)) 172 | } 173 | } else { 174 | query_body.insert("_source".to_string(), json!(fields)); 175 | } 176 | } 177 | 178 | let response = es_client 179 | .search(SearchParts::Index(&[&index])) 180 | .body(query_body) 181 | .send() 182 | .await; 183 | 184 | let response: SearchResult = read_json(response).await?; 185 | 186 | let mut results: Vec<Content> = Vec::new(); 187 | 188 | // Send result stats only if it's not pure aggregation results 189 | if response.aggregations.is_empty() || !response.hits.hits.is_empty() { 190 | let total = response 191 | .hits 192 | .total 193 | .map(|t| t.value.to_string()) 194 | .unwrap_or("unknown".to_string()); 195 | 196 | results.push(Content::text(format!( 197 | "Total results: {}, showing {}.", 198 | total, 199 | response.hits.hits.len() 200 | ))); 201 | } 202 | 203 | // Original prototype sent a separate content for each document, it seems to confuse some LLMs 204 | // for hit in &response.hits.hits { 205 | // results.push(Content::json(&hit.source)?); 206 | // } 207 | if !response.hits.hits.is_empty() { 208 | let sources = response.hits.hits.iter().map(|hit| &hit.source).collect::<Vec<_>>(); 209 | results.push(Content::json(&sources)?); 210 | } 211 | 212 | if !response.aggregations.is_empty() { 213 | results.push(Content::text("Aggregations results:")); 214 | results.push(Content::json(&response.aggregations)?); 215 | } 216 | 217 | Ok(CallToolResult::success(results)) 218 | } 219 | 220 | //--------------------------------------------------------------------------------------------- 221 | /// Tool: ES|QL 222 | #[tool( 223 | description = "Perform an Elasticsearch ES|QL query.", 224 | annotations(title = "Elasticsearch ES|QL query", read_only_hint = true) 225 | )] 226 | async fn esql( 227 | &self, 228 | req_ctx: RequestContext<RoleServer>, 229 | Parameters(EsqlQueryParams { query }): Parameters<EsqlQueryParams>, 230 | ) -> Result<CallToolResult, rmcp::Error> { 231 | let es_client = self.es_client.get(req_ctx); 232 | 233 | let request = EsqlQueryRequest { query }; 234 | 235 | let response = es_client.esql().query().body(request).send().await; 236 | let response: EsqlQueryResponse = read_json(response).await?; 237 | 238 | // Transform response into an array of objects 239 | let mut objects: Vec<Value> = Vec::new(); 240 | for row in response.values.into_iter() { 241 | let mut obj = Map::new(); 242 | for (i, value) in row.into_iter().enumerate() { 243 | obj.insert(response.columns[i].name.clone(), value); 244 | } 245 | objects.push(Value::Object(obj)); 246 | } 247 | 248 | Ok(CallToolResult::success(vec![ 249 | Content::text("Results"), 250 | Content::json(objects)?, 251 | ])) 252 | } 253 | 254 | //--------------------------------------------------------------------------------------------- 255 | // Tool: get shard information 256 | #[tool( 257 | description = "Get shard information for all or specific indices.", 258 | annotations(title = "Get ES shard information", read_only_hint = true) 259 | )] 260 | async fn get_shards( 261 | &self, 262 | req_ctx: RequestContext<RoleServer>, 263 | Parameters(GetShardsParams { index }): Parameters<GetShardsParams>, 264 | ) -> Result<CallToolResult, rmcp::Error> { 265 | let es_client = self.es_client.get(req_ctx); 266 | 267 | let indices: [&str; 1]; 268 | let parts = match &index { 269 | Some(index) => { 270 | indices = [index]; 271 | CatShardsParts::Index(&indices) 272 | } 273 | None => CatShardsParts::None, 274 | }; 275 | let response = es_client 276 | .cat() 277 | .shards(parts) 278 | .format("json") 279 | .h(&["index", "shard", "prirep", "state", "docs", "store", "node"]) 280 | .send() 281 | .await; 282 | 283 | let response: Vec<CatShardsResponse> = read_json(response).await?; 284 | 285 | Ok(CallToolResult::success(vec![ 286 | Content::text(format!("Found {} shards:", response.len())), 287 | Content::json(response)?, 288 | ])) 289 | } 290 | } 291 | 292 | #[tool_handler] 293 | impl ServerHandler for EsBaseTools { 294 | fn get_info(&self) -> ServerInfo { 295 | ServerInfo { 296 | protocol_version: ProtocolVersion::V_2025_03_26, 297 | capabilities: ServerCapabilities::builder().enable_tools().build(), 298 | server_info: Implementation::from_build_env(), 299 | instructions: Some("Provides access to Elasticsearch".to_string()), 300 | } 301 | } 302 | } 303 | 304 | //------------------------------------------------------------------------------------------------- 305 | // Type definitions for ES request/responses (the Rust client doesn't have them yet) and tool responses. 306 | 307 | //----- Search request 308 | 309 | #[derive(Serialize, Deserialize)] 310 | pub struct SearchResult { 311 | pub hits: Hits, 312 | #[serde(default)] 313 | pub aggregations: IndexMap<String, Value>, 314 | } 315 | 316 | #[derive(Serialize, Deserialize)] 317 | pub struct Hits { 318 | pub total: Option<TotalHits>, 319 | pub hits: Vec<Hit>, 320 | } 321 | 322 | #[derive(Serialize, Deserialize)] 323 | pub struct TotalHits { 324 | pub value: u64, 325 | } 326 | 327 | #[derive(Serialize, Deserialize)] 328 | pub struct Hit { 329 | #[serde(rename = "_source")] 330 | pub source: Value, 331 | } 332 | 333 | //----- Cat responses 334 | 335 | #[derive(Serialize, Deserialize)] 336 | pub struct CatIndexResponse { 337 | pub index: String, 338 | pub status: String, 339 | #[serde(rename = "docs.count", deserialize_with = "deserialize_number_from_string")] 340 | pub doc_count: u64, 341 | } 342 | 343 | #[derive(Serialize, Deserialize)] 344 | pub struct CatShardsResponse { 345 | pub index: String, 346 | #[serde(deserialize_with = "deserialize_number_from_string")] 347 | pub shard: usize, 348 | pub prirep: String, 349 | pub state: String, 350 | #[serde(deserialize_with = "deserialize_option_number_from_string")] 351 | pub docs: Option<u64>, 352 | pub store: Option<String>, 353 | pub node: Option<String>, 354 | } 355 | 356 | //----- Index mappings 357 | 358 | pub type MappingResponse = HashMap<String, Mappings>; 359 | 360 | #[derive(Serialize, Deserialize)] 361 | pub struct Mappings { 362 | pub mappings: Mapping, 363 | } 364 | 365 | #[derive(Serialize, Deserialize)] 366 | pub struct Mapping { 367 | #[serde(rename = "_meta", skip_serializing_if = "Option::is_none")] 368 | pub meta: Option<JsonObject>, 369 | properties: HashMap<String, MappingProperty>, 370 | } 371 | 372 | #[derive(Serialize, Deserialize)] 373 | pub struct MappingProperty { 374 | #[serde(rename = "type")] 375 | pub type_: String, 376 | #[serde(flatten)] 377 | pub settings: HashMap<String, serde_json::Value>, 378 | } 379 | 380 | //----- ES|QL 381 | 382 | #[derive(Serialize, Deserialize)] 383 | pub struct EsqlQueryRequest { 384 | pub query: String, 385 | } 386 | 387 | #[derive(Serialize, Deserialize)] 388 | pub struct Column { 389 | pub name: String, 390 | #[serde(rename = "type")] 391 | pub type_: String, 392 | } 393 | 394 | #[derive(Serialize, Deserialize)] 395 | pub struct EsqlQueryResponse { 396 | pub is_partial: Option<bool>, 397 | pub columns: Vec<Column>, 398 | pub values: Vec<Vec<Value>>, 399 | } 400 | ```