# Directory Structure

```
├── .buildkite
│   ├── diff
│   ├── docker.yml
│   ├── pipeline.yml
│   └── pull-requests.json
├── .dockerignore
├── .env-example
├── .github
│   └── workflows
│       ├── build.yml
│       └── stale.yml
├── .gitignore
├── Cargo.lock
├── Cargo.toml
├── catalog-info.yaml
├── Dockerfile
├── Dockerfile-8000
├── docs
│   └── CONTRIBUTING.md
├── elastic-mcp.json5
├── LICENSE
├── Makefile
├── NOTICE.txt
├── README.md
├── renovate.json
├── rustfmt.toml
├── scripts
│   └── cargo-run.sh
├── src
│   ├── bin
│   │   ├── elasticsearch-core-mcp-server.rs
│   │   └── start_http.rs
│   ├── cli.rs
│   ├── lib.rs
│   ├── protocol
│   │   ├── http.rs
│   │   ├── mod.rs
│   │   └── stdio.rs
│   ├── servers
│   │   ├── elasticsearch
│   │   │   ├── base_tools.rs
│   │   │   └── mod.rs
│   │   └── mod.rs
│   └── utils
│       ├── interpolator.rs
│       ├── mod.rs
│       └── rmcp_ext.rs
└── tests
    └── http_tests.rs
```

# Files

--------------------------------------------------------------------------------
/.dockerignore:
--------------------------------------------------------------------------------

```
docs
target
.idea
.vscode

```

--------------------------------------------------------------------------------
/.env-example:
--------------------------------------------------------------------------------

```
# The MCP server looks for `.env` files to populate environment variables that aren't already set 
# Copy and edit this file (it's listed in .gitignore)

ES_URL="http://localhost:9200"
ES_API_KEY="<my-api-key>"

```

--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------

```
# Generated by Cargo
# will have compiled files and executables
debug/
target/

# These are backup files generated by rustfmt
**/*.rs.bk

# MSVC Windows builds of rustc generate these, which store debugging information
*.pdb

# Generated by cargo mutants
# Contains mutation testing data
**/mutants.out*/

.idea/
.vscode/

.env

```

--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------

```markdown
# Elasticsearch MCP Server

> [!CAUTION]
>
> **WARNING: this MCP server is EXPERIMENTAL.**

Connect to your Elasticsearch data directly from any MCP Client using the Model Context Protocol (MCP).

This server connects agents to your Elasticsearch data using the Model Context Protocol. It allows you to interact with your Elasticsearch indices through natural language conversations.

## Available Tools

* `list_indices`: List all available Elasticsearch indices
* `get_mappings`: Get field mappings for a specific Elasticsearch index
* `search`: Perform an Elasticsearch search with the provided query DSL
* `esql`: Perform an ES|QL query
* `get_shards`: Get shard information for all or specific indices

## Prerequisites

* An Elasticsearch instance
* Elasticsearch authentication credentials (API key or username/password)
* An MCP Client (e.g. [Claude Desktop](https://claude.ai/download), [Goose](https://block.github.io/goose/))

**Supported Elasticsearch versions**

This works with Elasticsearch versions `8.x` and `9.x`.

## Installation & Setup

> [!NOTE]
>
> Versions 0.3.1 and earlier were installed via `npm`. These versions are deprecated and no longer supported. The following instructions only apply to 0.4.0 and later.
>
> To view instructions for versions 0.3.1 and earlier, see the [README for v0.3.1](https://github.com/elastic/mcp-server-elasticsearch/tree/v0.3.1).

This MCP server is provided as a Docker image at `docker.elastic.co/mcp/elasticsearch`
that supports MCP's stdio, SSE and streamable-HTTP protocols.

Running this container without any argument will output a usage message:

```
docker run docker.elastic.co/mcp/elasticsearch
```

```
Usage: elasticsearch-mcp-server <COMMAND>

Commands:
  stdio  Start a stdio server
  http   Start a streamable-HTTP server with optional SSE support
  help   Print this message or the help of the given subcommand(s)

Options:
  -h, --help     Print help
  -V, --version  Print version
```

### Using the stdio protocol

The MCP server needs environment variables to be set:

* `ES_URL`: the URL of your Elasticsearch cluster
* For authentication use either an API key or basic authentication:
  * API key: `ES_API_KEY`
  * Basic auth: `ES_USERNAME` and `ES_PASSWORD`
* Optionally, `ES_SSL_SKIP_VERIFY` set to `true` skips SSL/TLS certificate verification when connecting
  to Elasticsearch. The ability to provide a custom certificate will be added in a later version.

The MCP server is started in stdio mode with this command:

```bash
docker run -i --rm -e ES_URL -e ES_API_KEY docker.elastic.co/mcp/elasticsearch stdio
```

The configuration for Claude Desktop is as follows:

```json
{
 "mcpServers": {
   "elasticsearch-mcp-server": {
    "command": "docker",
    "args": [
     "run", "-i", "--rm",
     "-e", "ES_URL", "-e", "ES_API_KEY",
     "docker.elastic.co/mcp/elasticsearch",
     "stdio"
    ],
    "env": {
      "ES_URL": "<elasticsearch-cluster-url>",
      "ES_API_KEY": "<elasticsearch-API-key>"
    }
   }
 }
}
```

### Using the streamable-HTTP and SSE protocols

Note: streamable-HTTP is recommended, as [SSE is deprecated](https://modelcontextprotocol.io/docs/concepts/transports#server-sent-events-sse-deprecated).

The MCP server needs environment variables to be set:

* `ES_URL`, the URL of your Elasticsearch cluster
* For authentication use either an API key or basic authentication:
  * API key: `ES_API_KEY`
  * Basic auth: `ES_USERNAME` and `ES_PASSWORD`
* Optionally, `ES_SSL_SKIP_VERIFY` set to `true` skips SSL/TLS certificate verification when connecting
  to Elasticsearch. The ability to provide a custom certificate will be added in a later version.

The MCP server is started in http mode with this command:

```bash
docker run --rm -e ES_URL -e ES_API_KEY -p 8080:8080 docker.elastic.co/mcp/elasticsearch http
```

If your execution environment doesn't allow passing arguments to the container, they can be passed
using the `CLI_ARGS` environment variable: `docker run --rm -e ES_URL -e ES_API_KEY -e CLI_ARGS=http -p 8080:8080...`

The streamable-HTTP endpoint is at `http://<host>:8080/mcp`. There's also a health check at `http://<host>:8080/ping`.

Configuration for Claude Desktop (the free edition only supports the stdio protocol):

1. Install `mcp-proxy` (or an equivalent) to bridge stdio to streamable-HTTP. The executable
   will be installed in `~/.local/bin`:

    ```bash
    uv tool install mcp-proxy
    ```

2. Add this configuration to Claude Desktop:

    ```json
    {
      "mcpServers": {
        "elasticsearch-mcp-server": {
          "command": "/<home-directory>/.local/bin/mcp-proxy",
          "args": [
            "--transport=streamablehttp",
            "--header", "Authorization", "ApiKey <elasticsearch-API-key>",
            "http://<mcp-server-host>:<mcp-server-port>/mcp"
          ]
        }
      }
    }
    ```

```

--------------------------------------------------------------------------------
/docs/CONTRIBUTING.md:
--------------------------------------------------------------------------------

```markdown
# Contributing

[fork]: https://github.com/elastic/mcp-server-elasticsearch/fork
[pr]: https://github.com/elastic/mcp-server-elasticsearch/compare
[code-of-conduct]: https://www.elastic.co/community/codeofconduct

Elasticsearch MCP Server is open source, and we love to receive contributions from our community — you!

There are many ways to contribute, from writing tutorials or blog posts, improving the documentation, submitting bug reports and feature requests or writing code.

Contributions are [released](https://help.github.com/articles/github-terms-of-service/#6-contributions-under-repository-license) under the [project's license](../LICENSE).

Please note that this project follows [Elastic's Open Source Community Code of Conduct][code-of-conduct].

## Setup

1. Install Rust (using [rustup](https://www.rust-lang.org/tools/install) is recommended)

2. Build the project:
   ```sh
   cargo build
   ```

   or to build the Docker image, run:

   ```sh
   docker build -t mcp/elasticsearch .
   ```

## Start Elasticsearch

You can use either:

1. **Elastic Cloud** - Use an existing Elasticsearch deployment and your API key
2. **Local Elasticsearch** - Run Elasticsearch locally using the [start-local](https://www.elastic.co/guide/en/elasticsearch/reference/current/run-elasticsearch-locally.html) script:

   ```bash
   curl -fsSL https://elastic.co/start-local | sh
   ```

   This starts Elasticsearch and Kibana with Docker:
   - Elasticsearch: <http://localhost:9200>
   - Kibana: <http://localhost:5601>

> [!NOTE]
> The `start-local` setup is for development only. It uses basic authentication and disables HTTPS.

## Development Workflow

1. [Fork][fork] and clone the repository
2. Create a new branch: `git checkout -b my-branch-name`
3. Make your changes and add tests
4. Fix `cargo clippy` warnings, run `cargo fmt` and `cargo test`
5. Test locally with the MCP Inspector:
   ```bash
   npx @modelcontextprotocol/inspector
   ```
6. [Test with MCP Client](../README.md#installation--setup)
7. Push to your fork and [submit a pull request][pr]

## Best Practices

- Follow existing code style and patterns
- Write [conventional commits](https://www.conventionalcommits.org/)
- Include tests for your changes
- Keep PRs focused on a single concern
- Update documentation as needed

## Getting Help

- Open an issue in the repository
- Ask questions on [discuss.elastic.co](https://discuss.elastic.co/)

## Resources

- [How to Contribute to Open Source](https://opensource.guide/how-to-contribute/)
- [Using Pull Requests](https://help.github.com/articles/about-pull-requests/)
- [Elastic Code of Conduct][code-of-conduct]

```

--------------------------------------------------------------------------------
/rustfmt.toml:
--------------------------------------------------------------------------------

```toml
max_width = 120

```

--------------------------------------------------------------------------------
/NOTICE.txt:
--------------------------------------------------------------------------------

```
Elasticsearch MCP Server
Copyright 2025 Elasticsearch B.V.

```

--------------------------------------------------------------------------------
/scripts/cargo-run.sh:
--------------------------------------------------------------------------------

```bash
#!/usr/bin/env bash

cd "$(dirname "$0")/.."
exec cargo run "$@"

```

--------------------------------------------------------------------------------
/renovate.json:
--------------------------------------------------------------------------------

```json
{
  "$schema": "https://docs.renovatebot.com/renovate-schema.json",
  "extends": [
    "local>elastic/renovate-config"
  ],
  "schedule": [
    "after 1am on monday"
  ]
}

```

--------------------------------------------------------------------------------
/.buildkite/docker.yml:
--------------------------------------------------------------------------------

```yaml
---
# $yaml-language-server: $schema=https://raw.githubusercontent.com/buildkite/pipeline-schema/main/schema.json
steps:
  - label: "Build and publish Docker image"
    command: "make docker-push-elastic"
    agents:
      provider: "gcp"

```

--------------------------------------------------------------------------------
/.buildkite/pull-requests.json:
--------------------------------------------------------------------------------

```json
{
  "jobs": [
    {
      "enabled": true,
      "pipelineSlug": "mcp-server-elasticsearch",
      "allow_org_users": true,
      "allowed_repo_permissions": [
        "admin",
        "write"
      ],
      "allowed_list": [],
      "set_commit_status": true,
      "commit_status_context": "buildkite/mcp-server-elasticsearch",
      "build_on_commit": false,
      "build_on_comment": true,
      "trigger_comment_regex": "^(?:(?:buildkite\\W+)?(?:build|test)\\W+(?:this|it))",
      "always_trigger_comment_regex": "^(?:(?:buildkite\\W+)?(?:build|test)\\W+(?:this|it))",
      "skip_ci_labels": [
        "skip-ci"
      ],
      "skip_target_branches": [],
      "always_require_ci_on_changed": []
    }
  ]
}

```

--------------------------------------------------------------------------------
/src/protocol/mod.rs:
--------------------------------------------------------------------------------

```rust
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//    http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

pub mod http;
pub mod stdio;

```

--------------------------------------------------------------------------------
/src/protocol/stdio.rs:
--------------------------------------------------------------------------------

```rust
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//    http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Empty for now, stdio is handled in `lib.rs`.

```

--------------------------------------------------------------------------------
/.github/workflows/stale.yml:
--------------------------------------------------------------------------------

```yaml
---
name: "Close stale issues and PRs"
on:
  schedule:
    - cron: "30 1 * * *"

jobs:
  stale:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/stale@5bef64f19d7facfb25b37b414482c7164d639639 # v9
        with:
          stale-issue-label: stale
          stale-pr-label: stale
          days-before-stale: 60
          days-before-close: 14
          exempt-issue-labels: "good first issue"
          close-issue-label: closed-stale
          close-pr-label: closed-stale
          stale-issue-message: "This issue is stale because it has been open 60 days with no activity. Remove the `stale` label, or leave a comment, or this will be closed in 14 days."
          stale-pr-message: "This pull request is stale because it has been open 60 days with no activity. Remove the `stale` label, or leave a comment, or this will be closed in 14 days."

```

--------------------------------------------------------------------------------
/Dockerfile:
--------------------------------------------------------------------------------

```dockerfile
# Copyright Elasticsearch B.V. and contributors
# SPDX-License-Identifier: Apache-2.0

# To create a multi-arch image, run:
# docker buildx build --platform linux/amd64,linux/arm64 --tag elasticsearch-core-mcp-server .

FROM rust:1.89@sha256:c50cd6e20c46b0b36730b5eb27289744e4bb8f32abc90d8c64ca09decf4f55ba AS builder

WORKDIR /app

COPY Cargo.toml Cargo.lock ./

# Cache dependencies
RUN mkdir -p ./src/bin && \
    echo "pub fn main() {}" > ./src/bin/elasticsearch-core-mcp-server.rs && \
    cargo build --release

COPY src ./src/

RUN cargo build --release

#--------------------------------------------------------------------------------------------------

FROM cgr.dev/chainguard/wolfi-base:latest

COPY --from=builder /app/target/release/elasticsearch-core-mcp-server /usr/local/bin/elasticsearch-core-mcp-server

ENV CONTAINER_MODE=true

EXPOSE 8080/tcp
ENTRYPOINT ["/usr/local/bin/elasticsearch-core-mcp-server"]

```

--------------------------------------------------------------------------------
/.buildkite/pipeline.yml:
--------------------------------------------------------------------------------

```yaml
---
# $yaml-language-server: $schema=https://raw.githubusercontent.com/buildkite/pipeline-schema/main/schema.json
steps:
  - label: "Triggering pipelines"
    plugins:
      monorepo-diff#v1.4.0:
        diff: ".buildkite/diff ${BUILDKITE_COMMIT}"
        wait: true
        watch:
          # if our Renovate configuration is amended, then make sure we have well-formed config
          # for more info, see https://docs.elastic.dev/plat-prod-team/service-catalogue/renovate/testing-renovate-changes
          - path: "renovate.json"
            config:
              label: "Verify Renovate configuration"
              command: "renovate-config-validator"
              agents:
                image: "docker.elastic.co/ci-agent-images/pipelib:0.15.0@sha256:753c420cf254a7ed0be658ab153965e0708fe0636dfe2fe57e6e4ae0972bb681"
          # if our catalog-info.yaml is changed, make sure it's well-formed according to our internal standards as well as Backstage's validation
          - path: "catalog-info.yaml"
            config:
              command: "/agent/check-catalog-info.sh"
              agents:
                image: "docker.elastic.co/ci-agent-images/pipelib:0.15.0@sha256:753c420cf254a7ed0be658ab153965e0708fe0636dfe2fe57e6e4ae0972bb681"

```

--------------------------------------------------------------------------------
/src/bin/start_http.rs:
--------------------------------------------------------------------------------

```rust
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//    http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

use elasticsearch_core_mcp_server::cli::HttpCommand;
use elasticsearch_core_mcp_server::run_http;

/// Start the MCP http server with the local configuration.
/// Useful for debugging from the IDE.
#[tokio::main]
pub async fn main() -> anyhow::Result<()> {
    println!("Current directory: {:?}", std::env::current_dir()?);

    run_http(HttpCommand {
        config: Some("elastic-mcp.json5".parse()?),
        address: None,
        sse: true,
    },
    false)
    .await?;

    Ok(())
}

```

--------------------------------------------------------------------------------
/src/utils/mod.rs:
--------------------------------------------------------------------------------

```rust
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//    http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

use serde::{Deserialize, Deserializer};

pub mod interpolator;
pub mod rmcp_ext;

/// Deserialize a string, and return `None` if it's empty. Useful for configuration fields like
/// `"foo": "${SOME_ENV_VAR:}"` that uses an env var if present without failing if missing.
pub fn none_if_empty_string<'de, D: Deserializer<'de>>(deserializer: D) -> Result<Option<String>, D::Error> {
    let s: Option<String> = Deserialize::deserialize(deserializer)?;
    match s {
        Some(s) if s.is_empty() => Ok(None),
        _ => Ok(s),
    }
}

```
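The `none_if_empty_string` helper above exists because config values like `"${ES_API_KEY:}"` interpolate to an empty string when the variable is unset, and an empty credential should be treated as absent. The same rule can be shown without serde; `none_if_empty` below is a hypothetical, dependency-free equivalent, not part of this crate:

```rust
/// Treat an empty string as "not provided" — the same rule that
/// `none_if_empty_string` applies during deserialization.
fn none_if_empty(s: Option<String>) -> Option<String> {
    s.filter(|s| !s.is_empty())
}

fn main() {
    // "${ES_API_KEY:}" with an unset env var interpolates to "" → treated as None.
    assert_eq!(none_if_empty(Some(String::new())), None);
    // A real value passes through unchanged.
    assert_eq!(none_if_empty(Some("key".into())), Some("key".to_string()));
    // An absent field stays absent.
    assert_eq!(none_if_empty(None), None);
}
```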

--------------------------------------------------------------------------------
/src/utils/rmcp_ext.rs:
--------------------------------------------------------------------------------

```rust
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//    http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Various extensions and utilities for the Rust MCP sdk.

use rmcp::{RoleServer, Service};
use std::sync::Arc;

/// A factory to create server (`Service<RoleServer>`) instances.
pub struct ServerProvider<S: Service<RoleServer>>(pub Arc<dyn Fn() -> S + Send + Sync>);

impl<S: Service<RoleServer>, F: Fn() -> S + Send + Sync + 'static> From<F> for ServerProvider<S> {
    fn from(value: F) -> Self {
        ServerProvider(Arc::new(value))
    }
}

impl<S: Service<RoleServer>> From<Arc<dyn Fn() -> S + Send + Sync>> for ServerProvider<S> {
    fn from(value: Arc<dyn Fn() -> S + Send + Sync>) -> Self {
        ServerProvider(value)
    }
}

```

--------------------------------------------------------------------------------
/src/servers/mod.rs:
--------------------------------------------------------------------------------

```rust
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//    http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

use serde::{Deserialize, Serialize};

pub mod elasticsearch;

/// Inclusion or exclusion list.
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum IncludeExclude {
    Include(Vec<String>),
    Exclude(Vec<String>),
}

impl IncludeExclude {
    pub fn is_included(&self, name: &str) -> bool {
        use IncludeExclude::*;
        match self {
            Include(includes) => includes.iter().map(|s| s.as_str()).any(|s| s == name),
            Exclude(excludes) => excludes.iter().map(|s| s.as_str()).all(|s| s != name),
        }
    }

    pub fn filter(&self, tools: &mut Vec<rmcp::model::Tool>) {
        tools.retain(|t| self.is_included(&t.name))
    }
}

```
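To illustrate the semantics of `IncludeExclude::is_included` above: an `Include` list admits only the listed names, while an `Exclude` list admits everything except them. This simplified, dependency-free copy omits the serde derives and the rmcp-based `filter` method; the tool names are hypothetical examples:

```rust
// Simplified copy of `IncludeExclude` from src/servers/mod.rs.
enum IncludeExclude {
    Include(Vec<String>),
    Exclude(Vec<String>),
}

impl IncludeExclude {
    fn is_included(&self, name: &str) -> bool {
        match self {
            // Include list: only listed names pass.
            IncludeExclude::Include(v) => v.iter().any(|s| s == name),
            // Exclude list: everything except listed names passes.
            IncludeExclude::Exclude(v) => v.iter().all(|s| s != name),
        }
    }
}

fn main() {
    let only_search = IncludeExclude::Include(vec!["search".into()]);
    assert!(only_search.is_included("search"));
    assert!(!only_search.is_included("esql"));

    let no_search = IncludeExclude::Exclude(vec!["search".into()]);
    assert!(!no_search.is_included("search"));
    assert!(no_search.is_included("esql"));
}
```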

--------------------------------------------------------------------------------
/Cargo.toml:
--------------------------------------------------------------------------------

```toml
[package]
name = "elasticsearch-core-mcp-server"
version = "0.4.5"
edition = "2024"
authors = ["Elastic.co"]
license-file = "LICENSE"
description = "MCP server for core Elasticsearch features"
homepage = "https://github.com/elastic/mcp-server-elasticsearch"
repository = "https://github.com/elastic/mcp-server-elasticsearch"

default-run = "elasticsearch-core-mcp-server"

[dependencies]
# Base stuff
anyhow = "1.0"
futures = "0.3"
indexmap = { version = "2", features = ["serde"] }
itertools = "0.12"
thiserror = "2"

serde = { version = "1.0", features = ["derive"] }
serde_json = "1"

# CLI, config
clap = { version = "4", features = ["derive", "env"] }
dotenvy = "0.15"
serde-aux = "4"
serde_json5 = "0.2"

# Logging
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = [
    "env-filter",
    "std",
    "fmt",
]}

elasticsearch = { version = "9.0.0-alpha.1", git = "https://github.com/elastic/elasticsearch-rs", branch = "new-with-creds" }

# Async and http
tokio = { version = "1", features = ["macros", "rt", "rt-multi-thread", "io-std", "signal", "process"] }
tokio-util = "0.7"
axum = "0.8"
http = "1.3.1"

# Schemars: keep in sync with rmcp
schemars = { version = "0.8", features = ["chrono"] }

reqwest = "0.12"
futures-util = "0.3"

# MCP rust sdk: main branch, 2025-06-26
[dependencies.rmcp]
features = [
    "server", "auth",
    "transport-sse-server", "transport-streamable-http-server",
    "transport-io", # stdio
    "client", "transport-sse-client", "transport-streamable-http-client", "transport-child-process",
]
# Keep rev in sync with rmcp-macros below
version = "0.2.1"

[dependencies.rmcp-macros]
version = "0.2.1"

[dev-dependencies]
sse-stream = "0.2"

[profile.release]
codegen-units = 1
strip = true
lto = true
opt-level = "z"
# Note: do not add `panic = "abort"` since tower-http has a panic-handling middleware

```

--------------------------------------------------------------------------------
/elastic-mcp.json5:
--------------------------------------------------------------------------------

```

{
    // Configure the target Elasticsearch server
    "elasticsearch": {
      "url": "${ES_URL}",
      "api_key": "${ES_API_KEY:}",
      "username": "${ES_USERNAME:}",
      "password": "${ES_PASSWORD:}",
      "ssl_skip_verify": "${ES_SSL_SKIP_VERIFY:false}",

      /* WIP
      "tools": {
        // Exclude the "search" builtin tool as it's too broad
        "exclude": ["search"],

        // Custom tools
        "custom": {
          // An ES|QL query
          "add-42": {
            "type": "esql",
            "description": "Adds 42 to the input value",
            "query": "row value = ?value | eval result = value + 42 | keep result",
            "parameters": {
              "value": {
                "title": "The value",
                "type": "number"
              }
            }
          },
          // A stored search template
          "a-stored-template": {
            "type": "search_template",
            "description": "This is the description for this stored template",
            "template_id": "my-template",
            "parameters": {
              "param_1": {
                "title": "The first parameter",
                "description": "Use this parameter to blah blah and blah",
                "type": "string"
              }
            }
          },
          // An inline search template
          "an-inline-template": {
            "type": "search_template",
            "description": "This is the description for this inline template",
            "template": {
              "query": {
                "term": {
                  "some-field": "{{param_1}}"
                }
              }
            },
            "parameters": {
              "param_1": {
                "title": "The first parameter",
                "description": "Use this parameter to blah blah and blah",
                "type": "string"
              }
            }
          }
        }
      }
      */
    }
}

```
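The config above relies on `${VAR}` / `${VAR:default}` substitution. The real logic lives in `src/utils/interpolator.rs` (not included in this dump); the sketch below is a minimal stand-in that resolves names from an explicit map instead of the process environment, and assumes an unset variable without a default yields an empty string:

```rust
use std::collections::HashMap;

/// Minimal `${NAME}` / `${NAME:default}` substitution sketch.
fn interpolate(input: &str, vars: &HashMap<String, String>) -> String {
    let mut out = String::new();
    let mut rest = input;
    while let Some(start) = rest.find("${") {
        out.push_str(&rest[..start]);
        let after = &rest[start + 2..];
        match after.find('}') {
            Some(end) => {
                let expr = &after[..end];
                // `NAME:default` → default used when NAME is unset; `NAME:` → empty string.
                let (name, default) = match expr.split_once(':') {
                    Some((n, d)) => (n, Some(d)),
                    None => (expr, None),
                };
                match vars.get(name) {
                    Some(v) => out.push_str(v),
                    None => out.push_str(default.unwrap_or("")),
                }
                rest = &after[end + 1..];
            }
            None => {
                // Unterminated `${` — keep it verbatim.
                out.push_str(&rest[start..]);
                rest = "";
            }
        }
    }
    out.push_str(rest);
    out
}

fn main() {
    let mut vars = HashMap::new();
    vars.insert("ES_URL".to_string(), "http://localhost:9200".to_string());
    assert_eq!(interpolate("${ES_URL}", &vars), "http://localhost:9200");
    assert_eq!(interpolate("${ES_API_KEY:}", &vars), "");
    assert_eq!(interpolate("${ES_SSL_SKIP_VERIFY:false}", &vars), "false");
}
```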

--------------------------------------------------------------------------------
/src/bin/elasticsearch-core-mcp-server.rs:
--------------------------------------------------------------------------------

```rust
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//    http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

use std::io::ErrorKind;
use clap::Parser;
use elasticsearch_core_mcp_server::cli::Cli;
use tracing_subscriber::EnvFilter;
// To test with stdio, use npx @modelcontextprotocol/inspector cargo run -p elastic-mcp

#[tokio::main]
async fn main() -> anyhow::Result<()> {

    // Also accept .env files
    match dotenvy::dotenv() {
        Err(dotenvy::Error::Io(io_err)) if io_err.kind() == ErrorKind::NotFound => {}
        Err(err) => return Err(err)?,
        Ok(_) => {}
    }

    let env_args = std::env::vars().find(|(k, _v)| k == "CLI_ARGS").map(|(_k, v)| v);

    let cli = if let Some(env_args) = env_args {
        // Concatenate arg[0] with the ARGS value split on whitespaces
        // Note: we don't handle shell-style string quoting and character escaping
        let arg0 = std::env::args().next().unwrap();
        let mut args = vec![arg0.as_str()];
        args.extend(env_args.split_whitespace());

        Cli::parse_from(args)
    } else {
        Cli::parse()
    };

    // Initialize the tracing subscriber with file and stdout logging
    tracing_subscriber::fmt()
        .with_env_filter(EnvFilter::from_default_env().add_directive(tracing::Level::INFO.into()))
        .with_writer(std::io::stderr)
        .with_ansi(false)
        .init();

    tracing::info!("Elasticsearch MCP server, version {}", env!("CARGO_PKG_VERSION"));

    cli.run().await
}

```
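The `CLI_ARGS` handling in `main` above — keep `arg[0]`, then append the variable's value split on whitespace, without shell-style quoting or escaping — can be sketched as a standalone function (`args_from_env` is a hypothetical helper, not part of the crate):

```rust
/// Build an argv from an optional CLI_ARGS-style string.
/// Note: like the original, this does NOT handle shell quoting or escaping.
fn args_from_env(arg0: &str, cli_args: Option<&str>) -> Vec<String> {
    match cli_args {
        Some(v) => std::iter::once(arg0)
            .chain(v.split_whitespace())
            .map(String::from)
            .collect(),
        None => vec![arg0.to_string()],
    }
}

fn main() {
    // `CLI_ARGS=http` becomes argv ["<binary>", "http"].
    assert_eq!(args_from_env("elasticsearch-core-mcp-server", Some("http")),
               ["elasticsearch-core-mcp-server", "http"]);
    // Without CLI_ARGS, only argv[0] remains.
    assert_eq!(args_from_env("elasticsearch-core-mcp-server", None),
               ["elasticsearch-core-mcp-server"]);
}
```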

--------------------------------------------------------------------------------
/catalog-info.yaml:
--------------------------------------------------------------------------------

```yaml
---
# yaml-language-server: $schema=https://json.schemastore.org/catalog-info.json
apiVersion: backstage.io/v1alpha1
kind: Component
metadata:
  name: mcp-server-elasticsearch
spec:
  type: library
  owner: group:devtools-team
  lifecycle: beta

---
# yaml-language-server: $schema=https://gist.githubusercontent.com/elasticmachine/988b80dae436cafea07d9a4a460a011d/raw/rre.schema.json
apiVersion: backstage.io/v1alpha1
kind: Resource
metadata:
  name: buildkite-pipeline-mcp-server-elasticsearch
  description: Buildkite Pipeline for mcp-server-elasticsearch
  links:
    - title: Pipeline
      url: https://buildkite.com/elastic/mcp-server-elasticsearch

spec:
  type: buildkite-pipeline
  owner: group:devtools-team
  system: buildkite
  implementation:
    apiVersion: buildkite.elastic.dev/v1
    kind: Pipeline
    metadata:
      name: mcp-server-elasticsearch
      description: Run checks for the mcp-server-elasticsearch package
    spec:
      repository: elastic/mcp-server-elasticsearch
      pipeline_file: ".buildkite/pipeline.yml"
      teams:
        devtools-team:
          access_level: MANAGE_BUILD_AND_READ
        everyone:
          access_level: READ_ONLY

---
# yaml-language-server: $schema=https://gist.githubusercontent.com/elasticmachine/988b80dae436cafea07d9a4a460a011d/raw/rre.schema.json
apiVersion: backstage.io/v1alpha1
kind: Resource
metadata:
  name: mcp-server-elasticsearch-docker
  description: Build and publish Docker images for mcp-server-elasticsearch
spec:
  type: buildkite-pipeline
  owner: group:devtools-team
  system: buildkite
  implementation:
    apiVersion: buildkite.elastic.dev/v1
    kind: Pipeline
    metadata:
      name: mcp-server-elasticsearch-docker
      description: Build and publish Docker images for mcp-server-elasticsearch
    spec:
      repository: elastic/mcp-server-elasticsearch
      pipeline_file: ".buildkite/docker.yml"
      teams:
        devtools-team:
          access_level: MANAGE_BUILD_AND_READ
        everyone:
          access_level: READ_ONLY
      provider_settings:
        build_pull_requests: false
        build_branches: false
        build_tags: true
      cancel_intermediate_builds: true

```

--------------------------------------------------------------------------------
/.github/workflows/build.yml:
--------------------------------------------------------------------------------

```yaml
name: Build Rust binaries
on:
  release:
    types: [published]
  workflow_dispatch:
    inputs:
      version:
        description: Version to build (e.g. v0.4.1)
        required: true
        type: string

jobs:
  build-binary:
    runs-on: ${{ matrix.target.runner }}
    permissions:
      contents: write
      id-token: write
    strategy:
      fail-fast: false
      matrix:
        target:
          - name: linux-x86_64
            runner: ubuntu-latest
            ext: ""
            target: x86_64-unknown-linux-gnu
          - name: windows-x86_64
            runner: windows-latest
            ext: ".exe"
            target: x86_64-pc-windows-msvc
          - name: macos-x86_64
            runner: macos-latest
            ext: ""
            target: x86_64-apple-darwin
          - name: linux-arm64
            runner: ubuntu-latest
            ext: ""
            target: aarch64-unknown-linux-gnu
          - name: windows-arm64
            runner: windows-latest
            ext: ".exe"
            target: aarch64-pc-windows-msvc
          - name: macos-arm64
            runner: macos-latest
            ext: ""
            target: aarch64-apple-darwin
    steps:
      - name: Get release tag value
        id: version-tag
        shell: bash
        run: |
          if [ -n "${{ inputs.version }}" ]; then
            echo "ref=${{ inputs.version }}" >> $GITHUB_OUTPUT
          else
            ref=$(echo "$GITHUB_REF" | cut -d '/' -f3)
            echo "ref=$ref" >> $GITHUB_OUTPUT
          fi
      - name: Checkout
        uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4
        with:
          path: checkout-main
      - name: Checkout
        uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4
        with:
          ref: ${{ steps.version-tag.outputs.ref }}
          path: checkout-tag
      - name: Add target config for arm64 on Linux
        if: ${{ matrix.target.name == 'linux-arm64' }}
        run: |
          sudo apt update && sudo apt install -y gcc-aarch64-linux-gnu
          mkdir -p checkout-tag/.cargo
          echo '[target.aarch64-unknown-linux-gnu]' >> checkout-tag/.cargo/config
          echo 'linker = "aarch64-linux-gnu-gcc"' >> checkout-tag/.cargo/config
      - name: Build binary
        uses: houseabsolute/actions-rust-cross@9a1618ffb70e8374ab5f48fcccea3ebeacf57971 # v1.0.5
        with:
          command: build
          target: ${{ matrix.target.target }}
          args: "--locked --release"
          working-directory: ${{ github.workspace }}/checkout-tag
      - name: Upload binaries to release
        uses: svenstaro/upload-release-action@v2
        with:
          tag: ${{ steps.version-tag.outputs.ref }}
          file: checkout-tag/target/${{ matrix.target.target }}/release/elasticsearch-core-mcp-server${{ matrix.target.ext }}
          asset_name: elasticsearch-core-mcp-server-${{ steps.version-tag.outputs.ref }}-${{ matrix.target.name }}${{ matrix.target.ext }}
          overwrite: true

```

--------------------------------------------------------------------------------
/src/cli.rs:
--------------------------------------------------------------------------------

```rust
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//    http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::servers::elasticsearch;
use clap::Parser;
use clap::{Args, Subcommand};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::path::PathBuf;

/// Elastic MCP server
#[derive(Debug, Parser)]
#[command(version)]
pub struct Cli {
    /// Container mode: changes the default HTTP address and rewrites localhost to the host's address
    #[clap(global=true, long, env = "CONTAINER_MODE")]
    pub container_mode: bool,

    #[clap(subcommand)]
    pub command: Command,
}

#[derive(Debug, Subcommand)]
pub enum Command {
    Stdio(StdioCommand),
    Http(HttpCommand),
}

/// Start a streamable-HTTP server with optional SSE support
#[derive(Debug, Args)]
pub struct HttpCommand {
    /// Config file
    #[clap(short, long)]
    pub config: Option<PathBuf>,

    /// Address to listen on [default: 127.0.0.1:8080]
    #[clap(long, value_name = "IP_ADDRESS:PORT", env = "HTTP_ADDRESS")]
    pub address: Option<std::net::SocketAddr>,

    /// Also start an SSE server on '/sse'
    #[clap(long)]
    pub sse: bool,
}

/// Start a stdio server
#[derive(Debug, Args)]
pub struct StdioCommand {
    /// Config file
    #[clap(short, long)]
    pub config: Option<PathBuf>,
}

//---------------------------------------------------------------

// Reference material:
// https://modelcontextprotocol.io/quickstart/user
// https://code.visualstudio.com/docs/copilot/chat/mcp-servers
// https://docs.aws.amazon.com/amazonq/latest/qdeveloper-ug/command-line-mcp-configuration.html
// https://github.com/landicefu/mcp-client-configuration-server

#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Stdio {
    /// Command to run (e.g. "npx", "docker")
    pub command: String,

    /// Command arguments
    pub args: Vec<String>,

    /// Environment variables
    #[serde(default)]
    pub env: HashMap<String, String>,
}

#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Http {
    /// URL of the server
    pub url: String,

    /// HTTP headers to send with the request
    #[serde(default)]
    pub headers: HashMap<String, String>,
}

#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
#[serde(tag = "type")]
pub enum McpServer {
    //Builtin(BuiltinConfig),
    Sse(Http),
    StreamableHttp(Http),
    Stdio(Stdio),
}

#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Configuration {
    pub elasticsearch: elasticsearch::ElasticsearchMcpConfig,
    #[serde(default)]
    pub mcp_servers: HashMap<String, McpServer>,
}

```
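
Given the serde attributes above (`tag = "type"` with kebab-case variant names on `McpServer`, camelCase field names), a configuration file would look roughly like this sketch (server names, commands and URLs are illustrative; the required `elasticsearch` section is abridged):

```json5
{
  "elasticsearch": { /* see ElasticsearchMcpConfig */ },
  "mcpServers": {
    "local-tools": {
      "type": "stdio",
      "command": "npx",                     // e.g. "npx", "docker"
      "args": ["-y", "some-mcp-server"],
      "env": { "LOG_LEVEL": "info" }        // optional, defaults to {}
    },
    "remote": {
      "type": "streamable-http",
      "url": "http://localhost:9000/mcp",
      "headers": { "Authorization": "ApiKey <key>" }  // optional
    }
  }
}
```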

--------------------------------------------------------------------------------
/src/utils/interpolator.rs:
--------------------------------------------------------------------------------

```rust
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//    http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Simple string interpolator to inject environment variables in the configuration file.
use thiserror::Error;

#[derive(Error, Debug)]
#[error("Invalid configuration template: {reason} at {line}:{char}")]
pub struct InterpolationError {
    pub reason: String,
    pub line: usize,
    pub char: usize,
}

pub fn interpolate_from_env(s: String) -> Result<String, InterpolationError> {
    interpolate(s, |name| std::env::var(name).ok())
}

const OPEN: &str = "${";
const OPEN_LEN: usize = OPEN.len();
const CLOSE: &str = "}";
const CLOSE_LEN: usize = CLOSE.len();

/// Simple string interpolation using the `${name}` and `${name:default_value}` syntax.
pub fn interpolate(s: String, lookup: impl Fn(&str) -> Option<String>) -> Result<String, InterpolationError> {
    if !s.contains(OPEN) {
        return Ok(s);
    }

    let mut result: String = String::new();

    for (line_no, mut line) in s.lines().enumerate() {
        if line_no > 0 {
            result.push('\n');
        }
        let mut char_no = 0;

        let err = |char_no: usize, msg: String| InterpolationError {
            reason: msg,
            line: line_no + 1, // editors (and humans) are 1-based
            char: char_no,
        };

        while let Some(pos) = line.find(OPEN) {
            // Push text before the opening brace
            result.push_str(&line[..pos]);

            char_no += pos + OPEN_LEN;
            line = &line[pos + OPEN_LEN..];

            if let Some(pos) = line.find(CLOSE) {
                let expr = &line[..pos];
                let value = if let Some((name, default)) = expr.split_once(':') {
                    lookup(name).unwrap_or(default.to_string())
                } else {
                    lookup(expr).ok_or_else(|| err(char_no, format!("env variable '{expr}' not defined")))?
                };
                result.push_str(&value);

                char_no += expr.len() + CLOSE_LEN;
                line = &line[expr.len() + CLOSE_LEN..];
            } else {
                return Err(err(char_no, "missing closing braces".to_string()));
            }
        }
        result.push_str(line);
    }

    Ok(result)
}

#[cfg(test)]
mod tests {
    use super::*;

    fn expand(name: &str) -> Result<String, InterpolationError> {
        let lookup = |s: &str| match s {
            "foo" => Some("foo_value".to_string()),
            "bar" => Some("bar_value".to_string()),
            _ => None,
        };

        interpolate(name.to_string(), lookup)
    }

    #[test]
    fn good_interpolation() -> anyhow::Result<()> {
        assert_eq!("012345678", expand("012345678")?);
        assert_eq!("foo_value01234", expand("${foo}01234")?);
        assert_eq!("foo_value01234\n1234bar_value", expand("${foo}01234\n1234${bar}")?);
        assert_eq!("foo_value01234bar_value", expand("${foo}01234${bar}")?);
        assert_eq!("_01_foo_value01234bar_value567", expand("_01_${foo}01234${bar}567")?);
        Ok(())
    }

    #[test]
    fn failed_interpolation() {
        assert!(expand("${foo01234").is_err());
        assert!(expand("${foo}01234${bar").is_err());
        assert!(expand("${baz}01234").is_err());
    }
}

```
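
The `${name:default}` fallback path above isn't exercised by the unit tests. A standalone sketch of the contract (an illustrative re-implementation, not the crate's `interpolate`):

```rust
// Illustrative re-implementation of the `${name}` / `${name:default}` contract;
// `None` stands in for the InterpolationError cases (undefined variable, missing `}`).
fn interpolate(s: &str, lookup: impl Fn(&str) -> Option<String>) -> Option<String> {
    let mut out = String::new();
    let mut rest = s;
    while let Some(start) = rest.find("${") {
        out.push_str(&rest[..start]);
        rest = &rest[start + 2..];
        let end = rest.find('}')?; // missing closing brace -> error
        let expr = &rest[..end];
        let value = match expr.split_once(':') {
            // `${name:default}`: fall back to the default when `name` is undefined
            Some((name, default)) => lookup(name).unwrap_or_else(|| default.to_string()),
            // `${name}`: an undefined variable is an error
            None => lookup(expr)?,
        };
        out.push_str(&value);
        rest = &rest[end + 1..];
    }
    out.push_str(rest);
    Some(out)
}
```

This fallback is what the built-in default configuration in `src/lib.rs` relies on: `${ES_API_KEY:}` resolves to an empty string when `ES_API_KEY` is unset.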

--------------------------------------------------------------------------------
/src/lib.rs:
--------------------------------------------------------------------------------

```rust
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//    http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

pub mod cli;
mod protocol;
mod servers;
mod utils;

use crate::cli::{Cli, Command, Configuration, HttpCommand, StdioCommand};
use crate::protocol::http::{HttpProtocol, HttpServerConfig};
use crate::servers::elasticsearch;
use crate::utils::interpolator;
use rmcp::transport::stdio;
use rmcp::transport::streamable_http_server::session::never::NeverSessionManager;
use rmcp::{RoleServer, Service, ServiceExt};
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::path::PathBuf;
use std::sync::Arc;
use tokio::select;
use tokio_util::sync::CancellationToken;

impl Cli {
    pub async fn run(self) -> anyhow::Result<()> {
        match self.command {
            Command::Stdio(cmd) => run_stdio(cmd, self.container_mode).await,
            Command::Http(cmd) => run_http(cmd, self.container_mode).await,
        }
    }
}

pub async fn run_stdio(cmd: StdioCommand, container_mode: bool) -> anyhow::Result<()> {
    tracing::info!("Starting stdio server");
    let handler = setup_services(&cmd.config, container_mode).await?;
    let service = handler.serve(stdio()).await.inspect_err(|e| {
        tracing::error!("serving error: {:?}", e);
    })?;

    select! {
        _ = service.waiting() => {},
        _ = tokio::signal::ctrl_c() => {},
    }

    Ok(())
}

pub async fn run_http(cmd: HttpCommand, container_mode: bool) -> anyhow::Result<()> {
    let handler = setup_services(&cmd.config, container_mode).await?;
    let server_provider = move || handler.clone();
    let address: SocketAddr = if let Some(addr) = cmd.address {
        addr
    } else if container_mode {
        SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 8080)
    } else {
        SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 8080)
    };

    let ct = HttpProtocol::serve_with_config(
        server_provider,
        HttpServerConfig {
            bind: address,
            ct: CancellationToken::new(),
            // streaming http:
            keep_alive: None,
            stateful_mode: false,
            session_manager: Arc::new(NeverSessionManager::default()),
        },
    )
    .await?;

    tracing::info!("Starting http server at address {}", address);

    tokio::signal::ctrl_c().await?;
    ct.cancel();
    Ok(())
}

pub async fn setup_services(config: &Option<PathBuf>, container_mode: bool) -> anyhow::Result<impl Service<RoleServer> + Clone> {
    // Read config file and expand variables

    let config = if let Some(path) = config {
        std::fs::read_to_string(path)?
    } else {
        // Built-in default configuration, based on env variables.
        r#"{
            "elasticsearch": {
                "url": "${ES_URL}",
                "api_key": "${ES_API_KEY:}",
                "username": "${ES_USERNAME:}",
                "password": "${ES_PASSWORD:}",
                "ssl_skip_verify": "${ES_SSL_SKIP_VERIFY:false}"
            }
        }"#
        .to_string()
    };

    // Expand environment variables in the config file
    let config = interpolator::interpolate_from_env(config)?;

    // JSON5 adds comments and multiline strings (useful for ES|QL) to JSON
    let config: Configuration = match serde_json5::from_str(&config) {
        Ok(c) => c,
        Err(serde_json5::Error::Message { msg, location }) if location.is_some() => {
            let location = location.unwrap();
            let line = location.line;
            let column = location.column;
            anyhow::bail!("Failed to parse config: {msg}, at line {line} column {column}");
        }
        Err(err) => return Err(err)?,
    };

    let handler = elasticsearch::ElasticsearchMcp::new_with_config(config.elasticsearch, container_mode)?;
    Ok(handler)
}

```
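
With only `ES_URL` set, the built-in default configuration above interpolates to the following (the values shown are what the `${VAR:default}` fallbacks produce):

```json5
{
    "elasticsearch": {
        "url": "http://localhost:9200",  // from ES_URL (no default: required)
        "api_key": "",                   // ${ES_API_KEY:} -> empty string
        "username": "",
        "password": "",
        "ssl_skip_verify": "false"       // parsed into a bool during deserialization
    }
}
```

The empty strings are then mapped to `None` by the `none_if_empty_string` deserializers on `ElasticsearchMcpConfig`.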

--------------------------------------------------------------------------------
/src/protocol/http.rs:
--------------------------------------------------------------------------------

```rust
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//    http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Implementation of HTTP protocols

use crate::utils::rmcp_ext::ServerProvider;
use axum::Router;
use axum::http::StatusCode;
use axum::routing::get;
use rmcp::transport::sse_server::SseServerConfig;
use rmcp::transport::streamable_http_server::session::local::LocalSessionManager;
use rmcp::transport::streamable_http_server::{SessionManager, StreamableHttpServerConfig};
use rmcp::transport::{SseServer, StreamableHttpService};
use rmcp::{RoleServer, Service};
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use tokio_util::sync::CancellationToken;
use tracing::Instrument;

/// Configuration for an HTTP MCP server
pub struct HttpServerConfig<M: SessionManager = LocalSessionManager> {
    /// TCP address to bind to
    pub bind: SocketAddr,

    /// Parent cancellation token; `serve_with_config` returns a child of it
    pub ct: CancellationToken,

    /// Streamable-HTTP option: keep-alive interval for SSE streams
    pub keep_alive: Option<Duration>,

    /// Streamable-HTTP option: whether sessions are stateful
    pub stateful_mode: bool,

    /// Streamable-HTTP option: session manager implementation
    pub session_manager: Arc<M>,
}

/// An HTTP MCP server that supports both SSE and streamable HTTP.
pub struct HttpProtocol {}

impl HttpProtocol {
    pub async fn serve_with_config<S: Service<RoleServer>, M: SessionManager>(
        server_provider: impl Into<ServerProvider<S>>,
        config: HttpServerConfig<M>,
    ) -> std::io::Result<CancellationToken> {
        let server_provider = server_provider.into().0;

        let ct = config.ct.child_token();

        // Create a streamable http router
        let sh_router = {
            let sh_config = StreamableHttpServerConfig {
                sse_keep_alive: config.keep_alive,
                stateful_mode: config.stateful_mode,
            };

            let server_provider = server_provider.clone();
            // TODO: internally, new() wraps the server provider closure with an Arc. We could
            // avoid the "double-Arc" by passing the Arc directly instead of a fresh closure.
            let sh_service =
                StreamableHttpService::new(move || Ok(server_provider()), config.session_manager, sh_config);
            Router::new().route_service("/", sh_service)
        };

        // Create an SSE router
        let sse_router = {
            let sse_config = SseServerConfig {
                bind: config.bind,
                // SSE server will create a child cancellation token for every transport that is created
                // (see with_service() below)
                ct: ct.clone(),
                sse_keep_alive: config.keep_alive,
                sse_path: "/".to_string(),
                post_path: "/message".to_string(),
            };
            let (sse_server, sse_router) = SseServer::new(sse_config);
            let _sse_ct = sse_server.with_service(move || server_provider());

            sse_router
        };

        // Health and readiness
        // See https://kubernetes.io/docs/concepts/configuration/liveness-readiness-startup-probes/
        let health_router = {
            Router::new()
                // We may introduce a startup probe if we need to fetch/cache remote resources
                // during initialization
                // Ready: once we have the tool list we can process incoming requests
                .route("/ready", get(async || (StatusCode::OK, "Ready\n")))
                // Live: are we alive?
                .route("/live", get(async || "Alive\n"))
        };

        // Put all things together
        let main_router = Router::new()
            .route("/", get(hello))
            .route("/ping", get(async || (StatusCode::OK, "Ready\n")))
            .nest("/mcp/sse", sse_router)
            .nest("/mcp", sh_router)
            .nest("/_health", health_router)
            .with_state(());

        // Start the http server
        let listener = tokio::net::TcpListener::bind(config.bind).await?;
        let server = axum::serve(listener, main_router).with_graceful_shutdown({
            let ct = ct.clone();
            async move {
                ct.cancelled().await;
                tracing::info!("http server cancelled");
            }
        });

        // Await the server, or it will do nothing :-)
        tokio::spawn(
            async {
                let _ = server.await;
            }
            .instrument(tracing::info_span!("http-server", bind_address = %config.bind)),
        );

        Ok(ct)
    }
}

async fn hello() -> String {
    let version = env!("CARGO_PKG_VERSION");
    format!(
        r#"Elasticsearch MCP server. Version {version}

Endpoints:
- streamable-http: /mcp
- sse: /mcp/sse
"#
    )
}

#[cfg(test)]
mod tests {
    #[test]
    pub fn test_parts_in_extensions() {}
}

```
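
Putting the nested routers together, `serve_with_config` above exposes the following route table:

```text
/                    GET   greeting with version and endpoint list
/ping                GET   liveness shortcut (200 "Ready")
/mcp                 streamable-HTTP MCP endpoint
/mcp/sse             GET   SSE stream (sse_path "/" nested under /mcp/sse)
/mcp/sse/message     POST  SSE message channel (post_path "/message")
/_health/ready       GET   readiness probe
/_health/live        GET   liveness probe
```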

--------------------------------------------------------------------------------
/tests/http_tests.rs:
--------------------------------------------------------------------------------

```rust
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//    http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

use anyhow::bail;
use axum::Router;
use axum::extract::Path;
use elasticsearch_core_mcp_server::cli;
use futures_util::StreamExt;
use http::HeaderMap;
use http::header::{ACCEPT, CONTENT_TYPE};
use reqwest::Client;
use rmcp::model::ToolAnnotations;
use serde::Deserialize;
use serde::de::DeserializeOwned;
use serde_json::json;
use sse_stream::SseStream;
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4, TcpListener};

/// Simple smoke test
#[tokio::test]
async fn http_tool_list() -> anyhow::Result<()> {
    let addr = find_address()?;

    let cli = cli::Cli {
        container_mode: false,
        command: cli::Command::Http(cli::HttpCommand {
            config: None,
            address: Some(addr),
            sse: false,
        }),
    };

    tokio::spawn(async move { cli.run().await });

    let url = format!("http://127.0.0.1:{}/mcp", addr.port());

    let body = json!({
        "jsonrpc": "2.0",
        "id": 1,
        "method": "tools/list"
    });

    let client = Client::builder().build()?;
    tokio::time::sleep(std::time::Duration::from_secs(1)).await;

    let response = client
        .post(url)
        .header(CONTENT_TYPE, "application/json")
        .header(ACCEPT, "application/json, text/event-stream")
        .json(&body)
        .send()
        .await?
        .error_for_status()?;

    let response_body: ListToolsResponse = parse_response(response).await?;

    let names = response_body
        .result
        .tools
        .iter()
        .map(|t| t.name.as_str())
        .collect::<Vec<_>>();
    assert!(names.contains(&"search"));
    assert!(names.contains(&"list_indices"));
    assert!(names.contains(&"get_mappings"));
    Ok(())
}

// End-to-end test that spawns a mock ES server and calls the `list_indices` tool via http
#[tokio::test]
async fn end_to_end() -> anyhow::Result<()> {
    // Start an ES mock that will reply to list_indices
    let router = Router::new().route(
        "/_cat/indices/{index}",
        axum::routing::get(async move |headers: HeaderMap, Path(index): Path<String>| {
            // Check parameter forwarding
            assert_eq!(index, "test-index");
            // Check API key
            assert_eq!(
                headers.get("Authorization").unwrap().to_str().unwrap(),
                "ApiKey value-from-the-test"
            );
            axum::Json(json!([
              {
                "index": "test-index",
                "status": "open",
                "docs.count": "100"
              }
            ]))
        }),
    );

    let listener = tokio::net::TcpListener::bind(LOCALHOST_0).await?;

    // SAFETY: works since this is the only test in this module that sets env vars
    // TODO: refactor the CLI to accept an alternate source of key/values
    unsafe {
        std::env::set_var("ES_URL", format!("http://127.0.0.1:{}/", listener.local_addr()?.port()));
    }
    let server = axum::serve(listener, router);
    tokio::spawn(async { server.await });

    // Start an http MCP server
    let addr = find_address()?;
    let cli = cli::Cli {
        container_mode: false,
        command: cli::Command::Http(cli::HttpCommand {
            config: None,
            address: Some(addr),
            sse: false,
        }),
    };

    tokio::spawn(async move { cli.run().await });
    let url = format!("http://127.0.0.1:{}/mcp", addr.port());
    let body = json!({
        "jsonrpc": "2.0",
        "id": 1,
        "method": "tools/call",
        "params": {
            "name": "list_indices",
            "arguments": {
                "index_pattern": "test-index"
            }
        }
    });

    let client = Client::builder().build()?;
    tokio::time::sleep(std::time::Duration::from_secs(1)).await;

    let response = client
        .post(url)
        .header(CONTENT_TYPE, "application/json")
        .header(ACCEPT, "application/json, text/event-stream")
        .header("Authorization", "ApiKey value-from-the-test")
        .json(&body)
        .send()
        .await?
        .error_for_status()?;

    let response_body: serde_json::Value = parse_response(response).await?;

    assert_eq!(response_body["result"]["content"][0]["text"], "Found 1 indices:");
    assert_eq!(
        response_body["result"]["content"][1]["text"],
        "[{\"index\":\"test-index\",\"status\":\"open\",\"docs.count\":100}]"
    );

    Ok(())
}

const LOCALHOST_0: SocketAddrV4 = SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0);

fn find_address() -> anyhow::Result<SocketAddr> {
    // Find a free port
    Ok(TcpListener::bind(LOCALHOST_0)?.local_addr()?)
}

async fn parse_response<T: DeserializeOwned>(response: reqwest::Response) -> anyhow::Result<T> {
    let result = match response.headers().get(CONTENT_TYPE) {
        Some(v) if v == "application/json" => response.json().await?,
        Some(v) if v == "text/event-stream" => {
            let mut stream = SseStream::from_byte_stream(response.bytes_stream());
            match stream.next().await {
                None => bail!("No data"),
                Some(Err(e)) => bail!("Bad SSE stream: {}", e),
                Some(Ok(sse)) => {
                    let data = sse.data.unwrap();
                    serde_json::from_str(&data)?
                }
            }
        }
        _ => {
            panic!("Unexpected content type");
        }
    };

    Ok(result)
}

#[derive(Debug, Deserialize)]
#[allow(dead_code)]
struct ListToolsResponse {
    jsonrpc: String,
    id: i64,
    result: ToolResult,
}

#[derive(Debug, Deserialize)]
#[allow(dead_code)]
struct ToolResult {
    tools: Vec<Tool>,
}

#[derive(Debug, Deserialize)]
#[allow(dead_code)]
struct Tool {
    name: String,
    description: String,
    input_schema: Option<serde_json::Value>,
    annotations: Option<ToolAnnotations>,
}

```

--------------------------------------------------------------------------------
/src/servers/elasticsearch/mod.rs:
--------------------------------------------------------------------------------

```rust
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//    http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

mod base_tools;

use crate::servers::IncludeExclude;
use crate::utils::none_if_empty_string;
use elasticsearch::Elasticsearch;
use elasticsearch::auth::Credentials;
use elasticsearch::cert::CertificateValidation;
use elasticsearch::http::Url;
use elasticsearch::http::response::Response;
use http::header::USER_AGENT;
use http::request::Parts;
use http::{HeaderValue, header};
use indexmap::IndexMap;
use rmcp::RoleServer;
use rmcp::model::ToolAnnotations;
use rmcp::service::RequestContext;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use serde_aux::field_attributes::deserialize_bool_from_anything;
use std::borrow::Cow;
use std::collections::HashMap;

#[derive(Debug, Serialize, Deserialize)]
pub struct ElasticsearchMcpConfig {
    /// Cluster URL
    pub url: String,

    /// API key
    #[serde(default, deserialize_with = "none_if_empty_string")]
    pub api_key: Option<String>,

    /// Username
    #[serde(default, deserialize_with = "none_if_empty_string")]
    pub username: Option<String>,

    /// Password
    #[serde(default, deserialize_with = "none_if_empty_string")]
    pub password: Option<String>,

    /// Should we skip SSL certificate verification?
    #[serde(default, deserialize_with = "deserialize_bool_from_anything")]
    pub ssl_skip_verify: bool,

    /// Search templates to expose as tools or resources
    #[serde(default)]
    pub tools: Tools,

    /// Prompts
    #[serde(default)]
    pub prompts: Vec<String>,
    // TODO: search as resources?
}

/// A wrapper around an ES client that provides a client instance configured
/// for a given request context (i.e. auth credentials)
#[derive(Clone)]
pub struct EsClientProvider(Elasticsearch);

impl EsClientProvider {
    pub fn new(client: Elasticsearch) -> Self {
        EsClientProvider(client)
    }

    /// If the incoming request is an HTTP request with an `Authorization` header, use it
    /// to authenticate to the remote ES instance.
    pub fn get(&self, context: RequestContext<RoleServer>) -> Cow<'_, Elasticsearch> {
        let client = &self.0;

        let Some(mut auth) = context
            .extensions
            .get::<Parts>()
            .and_then(|p| p.headers.get(header::AUTHORIZATION))
            .and_then(|h| h.to_str().ok())
        else {
            // No auth
            return Cow::Borrowed(client);
        };

        // MCP inspector insists on sending a bearer token and prepends "Bearer" to the value provided
        if auth.starts_with("Bearer ApiKey ") || auth.starts_with("Bearer Basic ") {
            auth = auth.trim_start_matches("Bearer ");
        }

        let transport = client
            .transport()
            .clone_with_auth(Some(Credentials::AuthorizationHeader(auth.to_string())));

        Cow::Owned(Elasticsearch::new(transport))
    }
}

#[derive(Debug, Serialize, Deserialize, Default)]
pub struct Tools {
    #[serde(flatten)]
    pub incl_excl: Option<IncludeExclude>,
    pub custom: HashMap<String, CustomTool>,
}

#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum CustomTool {
    Esql(EsqlTool),
    SearchTemplate(SearchTemplateTool),
}

impl CustomTool {
    pub fn base(&self) -> &ToolBase {
        match self {
            CustomTool::Esql(esql) => &esql.base,
            CustomTool::SearchTemplate(search_template) => &search_template.base,
        }
    }
}

#[derive(Debug, Serialize, Deserialize)]
pub struct ToolBase {
    pub description: String,
    pub parameters: IndexMap<String, schemars::schema::SchemaObject>,
    pub annotations: Option<ToolAnnotations>,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct EsqlTool {
    #[serde(flatten)]
    base: ToolBase,
    query: String,
    #[serde(default)]
    format: EsqlResultFormat,
}

#[derive(Debug, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum EsqlResultFormat {
    /// Output as JSON, either as an array of objects or as a single object.
    #[default]
    Json,
    /// If the result is a single object with a single property, output only its value.
    Value,
    //Csv,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct SearchTemplateTool {
    #[serde(flatten)]
    base: ToolBase,
    #[serde(flatten)]
    template: SearchTemplate,
}

#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum SearchTemplate {
    TemplateId(String),
    Template(serde_json::Value), // or constrain to an object?
}

#[derive(Clone)]
pub struct ElasticsearchMcp {}

impl ElasticsearchMcp {
    pub fn new_with_config(config: ElasticsearchMcpConfig, container_mode: bool) -> anyhow::Result<base_tools::EsBaseTools> {
        let creds = if let Some(api_key) = config.api_key.clone() {
            Some(Credentials::EncodedApiKey(api_key))
        } else if let Some(username) = config.username.clone() {
            let pwd = config.password.clone().ok_or(anyhow::Error::msg("missing password"))?;
            Some(Credentials::Basic(username, pwd))
        } else {
            None
        };

        let url = config.url.as_str();
        if url.is_empty() {
            return Err(anyhow::Error::msg("Elasticsearch URL is empty"));
        }

        let mut url = Url::parse(url)?;
        if container_mode {
            rewrite_localhost(&mut url)?;
        }

        let pool = elasticsearch::http::transport::SingleNodeConnectionPool::new(url.clone());
        let mut transport = elasticsearch::http::transport::TransportBuilder::new(pool);
        if let Some(creds) = creds {
            transport = transport.auth(creds);
        }
        if config.ssl_skip_verify {
            transport = transport.cert_validation(CertificateValidation::None)
        }
        transport = transport.header(
            USER_AGENT,
            HeaderValue::from_str(&format!("elastic-mcp/{}", env!("CARGO_PKG_VERSION")))?,
        );
        let transport = transport.build()?;
        let es_client = Elasticsearch::new(transport);

        Ok(base_tools::EsBaseTools::new(es_client))
    }
}

//------------------------------------------------------------------------------------------------
// Utilities

/// Rewrite URLs targeting `localhost` to a hostname that maps to the container host, if possible.
///
/// The host name for the container host depends on the OCI runtime used. This is useful to accept
/// Elasticsearch URLs like `http://localhost:9200`.
fn rewrite_localhost(url: &mut Url) -> anyhow::Result<()> {
    use std::net::ToSocketAddrs;
    let aliases = &[
        "host.docker.internal", // Docker
        "host.containers.internal", // Podman, maybe others
    ];

    if let Some(host) = url.host_str() && host == "localhost" {
        for alias in aliases {
            if let Ok(mut alias_addrs) = (*alias, 80).to_socket_addrs() && alias_addrs.next().is_some() {
                url.set_host(Some(alias))?;
                tracing::info!("Container mode: using '{alias}' instead of 'localhost'");
                return Ok(());
            }
        }
        tracing::warn!("Container mode: could not find a replacement for 'localhost'");
    }
    Ok(())
}

/// Map any error to an internal error of the MCP server
pub fn internal_error(e: impl std::error::Error) -> rmcp::Error {
    rmcp::Error::internal_error(e.to_string(), None)
}

/// Return an error as an error response to the client, which may be able to take
/// action to correct it. This should be refined to handle common error types such
/// as index not found, which could be caused by the client hallucinating an index name.
///
/// TODO (in rmcp): if rmcp::Error had a variant that accepts a CallToolResult, this would
/// allow to use the '?' operator while sending a result to the client.
pub fn handle_error(result: Result<Response, elasticsearch::Error>) -> Result<Response, rmcp::Error> {
    match result {
        Ok(resp) => resp.error_for_status_code(),
        Err(e) => {
            tracing::error!("Error: {:?}", &e);
            Err(e)
        }
    }
    .map_err(internal_error)
}

pub async fn read_json<T: DeserializeOwned>(
    response: Result<Response, elasticsearch::Error>,
) -> Result<T, rmcp::Error> {
    // let text = read_text(response).await?;
    // tracing::debug!("Received json {text}");
    // serde_json::from_str(&text).map_err(internal_error)

    let response = handle_error(response)?;
    response.json().await.map_err(internal_error)
}

#[allow(dead_code)]
pub async fn read_text(result: Result<Response, elasticsearch::Error>) -> Result<String, rmcp::Error> {
    let response = handle_error(result)?;
    response.text().await.map_err(internal_error)
}

```
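
The `Authorization` handling in `EsClientProvider::get` above strips the extra `Bearer ` prefix that MCP Inspector prepends before the credential is forwarded to Elasticsearch. A minimal standalone sketch of that normalization (the `normalize_auth` helper name is illustrative, not part of the crate):

```rust
/// Strip the "Bearer " prefix that some MCP clients (e.g. MCP Inspector)
/// prepend to an Elasticsearch `ApiKey` or `Basic` credential.
/// Hypothetical helper mirroring the logic in `EsClientProvider::get`.
fn normalize_auth(auth: &str) -> &str {
    if auth.starts_with("Bearer ApiKey ") || auth.starts_with("Bearer Basic ") {
        auth.trim_start_matches("Bearer ")
    } else {
        auth
    }
}

fn main() {
    // A wrapped API key loses the extra "Bearer " prefix...
    assert_eq!(normalize_auth("Bearer ApiKey abc123"), "ApiKey abc123");
    // ...while a plain bearer token is passed through untouched.
    assert_eq!(normalize_auth("Bearer abc123"), "Bearer abc123");
    println!("ok");
}
```

A plain `Bearer <token>` is deliberately left intact so genuine OAuth-style bearer tokens still reach the cluster unchanged.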

--------------------------------------------------------------------------------
/src/servers/elasticsearch/base_tools.rs:
--------------------------------------------------------------------------------

```rust
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//    http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::servers::elasticsearch::{EsClientProvider, read_json};
use elasticsearch::cat::{CatIndicesParts, CatShardsParts};
use elasticsearch::indices::IndicesGetMappingParts;
use elasticsearch::{Elasticsearch, SearchParts};
use indexmap::IndexMap;
use rmcp::handler::server::tool::{Parameters, ToolRouter};
use rmcp::model::{
    CallToolResult, Content, Implementation, JsonObject, ProtocolVersion, ServerCapabilities, ServerInfo,
};
use rmcp::service::RequestContext;
use rmcp::{RoleServer, ServerHandler};
use rmcp_macros::{tool, tool_handler, tool_router};
use serde::{Deserialize, Serialize};
use serde_aux::prelude::*;
use serde_json::{Map, Value, json};
use std::collections::HashMap;

#[derive(Clone)]
pub struct EsBaseTools {
    es_client: EsClientProvider,
    tool_router: ToolRouter<EsBaseTools>,
}

impl EsBaseTools {
    pub fn new(es_client: Elasticsearch) -> Self {
        Self {
            es_client: EsClientProvider::new(es_client),
            tool_router: Self::tool_router(),
        }
    }
}

#[derive(Debug, serde::Deserialize, schemars::JsonSchema)]
struct ListIndicesParams {
    /// Index pattern of Elasticsearch indices to list
    pub index_pattern: String,
}

#[derive(Debug, serde::Deserialize, schemars::JsonSchema)]
struct GetMappingsParams {
    /// Name of the Elasticsearch index to get mappings for
    index: String,
}

#[derive(Debug, serde::Deserialize, schemars::JsonSchema)]
struct SearchParams {
    /// Name of the Elasticsearch index to search
    index: String,

    /// Name of the fields that need to be returned (optional)
    fields: Option<Vec<String>>,

    /// Complete Elasticsearch query DSL object that can include query, size, from, sort, etc.
    query_body: Map<String, Value>, // note: just Value doesn't work, as Claude would send a string
}

#[derive(Debug, serde::Deserialize, schemars::JsonSchema)]
struct EsqlQueryParams {
    /// Complete Elasticsearch ES|QL query
    query: String,
}

#[derive(Debug, serde::Deserialize, schemars::JsonSchema)]
struct GetShardsParams {
    /// Optional index name to get shard information for
    index: Option<String>,
}

#[tool_router]
impl EsBaseTools {
    //---------------------------------------------------------------------------------------------
    /// Tool: list indices
    #[tool(
        description = "List all available Elasticsearch indices",
        annotations(title = "List ES indices", read_only_hint = true)
    )]
    async fn list_indices(
        &self,
        req_ctx: RequestContext<RoleServer>,
        Parameters(ListIndicesParams { index_pattern }): Parameters<ListIndicesParams>,
    ) -> Result<CallToolResult, rmcp::Error> {
        let es_client = self.es_client.get(req_ctx);
        let response = es_client
            .cat()
            .indices(CatIndicesParts::Index(&[&index_pattern]))
            .h(&["index", "status", "docs.count"])
            .format("json")
            .send()
            .await;

        let response: Vec<CatIndexResponse> = read_json(response).await?;

        Ok(CallToolResult::success(vec![
            Content::text(format!("Found {} indices:", response.len())),
            Content::json(response)?,
        ]))
    }

    //---------------------------------------------------------------------------------------------
    /// Tool: get mappings for an index
    #[tool(
        description = "Get field mappings for a specific Elasticsearch index",
        annotations(title = "Get ES index mappings", read_only_hint = true)
    )]
    async fn get_mappings(
        &self,
        req_ctx: RequestContext<RoleServer>,
        Parameters(GetMappingsParams { index }): Parameters<GetMappingsParams>,
    ) -> Result<CallToolResult, rmcp::Error> {
        let es_client = self.es_client.get(req_ctx);
        let response = es_client
            .indices()
            .get_mapping(IndicesGetMappingParts::Index(&[&index]))
            .send()
            .await;

        let response: MappingResponse = read_json(response).await?;

        // use the first mapping (there can be several if the index name is a wildcard)
        let mapping = response.values().next().ok_or_else(|| {
            rmcp::Error::internal_error(format!("No mappings found for index {index}"), None)
        })?;

        Ok(CallToolResult::success(vec![
            Content::text(format!("Mappings for index {index}:")),
            Content::json(mapping)?,
        ]))
    }

    //---------------------------------------------------------------------------------------------
    /// Tool: search an index with the Query DSL
    ///
    /// The additional 'fields' parameter helps some LLMs that don't know about the `_source`
    /// request property to narrow down the data returned and reduce their context size
    #[tool(
        description = "Perform an Elasticsearch search with the provided query DSL.",
        annotations(title = "Elasticsearch search DSL query", read_only_hint = true)
    )]
    async fn search(
        &self,
        req_ctx: RequestContext<RoleServer>,
        Parameters(SearchParams {
            index,
            fields,
            query_body,
        }): Parameters<SearchParams>,
    ) -> Result<CallToolResult, rmcp::Error> {
        let es_client = self.es_client.get(req_ctx);

        let mut query_body = query_body;

        if let Some(fields) = fields {
            // Augment _source if it exists
            if let Some(Value::Array(values)) = query_body.get_mut("_source") {
                for field in fields.into_iter() {
                    values.push(Value::String(field))
                }
            } else {
                query_body.insert("_source".to_string(), json!(fields));
            }
        }

        let response = es_client
            .search(SearchParts::Index(&[&index]))
            .body(query_body)
            .send()
            .await;

        let response: SearchResult = read_json(response).await?;

        let mut results: Vec<Content> = Vec::new();

        // Send result stats only if it's not pure aggregation results
        if response.aggregations.is_empty() || !response.hits.hits.is_empty() {
            let total = response
                .hits
                .total
                .map(|t| t.value.to_string())
                .unwrap_or("unknown".to_string());

            results.push(Content::text(format!(
                "Total results: {}, showing {}.",
                total,
                response.hits.hits.len()
            )));
        }

        // Original prototype sent a separate content for each document, it seems to confuse some LLMs
        // for hit in &response.hits.hits {
        //     results.push(Content::json(&hit.source)?);
        // }
        if !response.hits.hits.is_empty() {
            let sources = response.hits.hits.iter().map(|hit| &hit.source).collect::<Vec<_>>();
            results.push(Content::json(&sources)?);
        }

        if !response.aggregations.is_empty() {
            results.push(Content::text("Aggregations results:"));
            results.push(Content::json(&response.aggregations)?);
        }

        Ok(CallToolResult::success(results))
    }

    //---------------------------------------------------------------------------------------------
    /// Tool: ES|QL
    #[tool(
        description = "Perform an Elasticsearch ES|QL query.",
        annotations(title = "Elasticsearch ES|QL query", read_only_hint = true)
    )]
    async fn esql(
        &self,
        req_ctx: RequestContext<RoleServer>,
        Parameters(EsqlQueryParams { query }): Parameters<EsqlQueryParams>,
    ) -> Result<CallToolResult, rmcp::Error> {
        let es_client = self.es_client.get(req_ctx);

        let request = EsqlQueryRequest { query };

        let response = es_client.esql().query().body(request).send().await;
        let response: EsqlQueryResponse = read_json(response).await?;

        // Transform response into an array of objects
        let mut objects: Vec<Value> = Vec::new();
        for row in response.values.into_iter() {
            let mut obj = Map::new();
            for (i, value) in row.into_iter().enumerate() {
                obj.insert(response.columns[i].name.clone(), value);
            }
            objects.push(Value::Object(obj));
        }

        Ok(CallToolResult::success(vec![
            Content::text("Results"),
            Content::json(objects)?,
        ]))
    }

    //---------------------------------------------------------------------------------------------
    // Tool: get shard information
    #[tool(
        description = "Get shard information for all or specific indices.",
        annotations(title = "Get ES shard information", read_only_hint = true)
    )]
    async fn get_shards(
        &self,
        req_ctx: RequestContext<RoleServer>,
        Parameters(GetShardsParams { index }): Parameters<GetShardsParams>,
    ) -> Result<CallToolResult, rmcp::Error> {
        let es_client = self.es_client.get(req_ctx);

        let indices: [&str; 1];
        let parts = match &index {
            Some(index) => {
                indices = [index];
                CatShardsParts::Index(&indices)
            }
            None => CatShardsParts::None,
        };
        let response = es_client
            .cat()
            .shards(parts)
            .format("json")
            .h(&["index", "shard", "prirep", "state", "docs", "store", "node"])
            .send()
            .await;

        let response: Vec<CatShardsResponse> = read_json(response).await?;

        Ok(CallToolResult::success(vec![
            Content::text(format!("Found {} shards:", response.len())),
            Content::json(response)?,
        ]))
    }
}

#[tool_handler]
impl ServerHandler for EsBaseTools {
    fn get_info(&self) -> ServerInfo {
        ServerInfo {
            protocol_version: ProtocolVersion::V_2025_03_26,
            capabilities: ServerCapabilities::builder().enable_tools().build(),
            server_info: Implementation::from_build_env(),
            instructions: Some("Provides access to Elasticsearch".to_string()),
        }
    }
}

//-------------------------------------------------------------------------------------------------
// Type definitions for ES request/responses (the Rust client doesn't have them yet) and tool responses.

//----- Search request

#[derive(Serialize, Deserialize)]
pub struct SearchResult {
    pub hits: Hits,
    #[serde(default)]
    pub aggregations: IndexMap<String, Value>,
}

#[derive(Serialize, Deserialize)]
pub struct Hits {
    pub total: Option<TotalHits>,
    pub hits: Vec<Hit>,
}

#[derive(Serialize, Deserialize)]
pub struct TotalHits {
    pub value: u64,
}

#[derive(Serialize, Deserialize)]
pub struct Hit {
    #[serde(rename = "_source")]
    pub source: Value,
}

//----- Cat responses

#[derive(Serialize, Deserialize)]
pub struct CatIndexResponse {
    pub index: String,
    pub status: String,
    #[serde(rename = "docs.count", deserialize_with = "deserialize_number_from_string")]
    pub doc_count: u64,
}

#[derive(Serialize, Deserialize)]
pub struct CatShardsResponse {
    pub index: String,
    #[serde(deserialize_with = "deserialize_number_from_string")]
    pub shard: usize,
    pub prirep: String,
    pub state: String,
    #[serde(deserialize_with = "deserialize_option_number_from_string")]
    pub docs: Option<u64>,
    pub store: Option<String>,
    pub node: Option<String>,
}

//----- Index mappings

pub type MappingResponse = HashMap<String, Mappings>;

#[derive(Serialize, Deserialize)]
pub struct Mappings {
    pub mappings: Mapping,
}

#[derive(Serialize, Deserialize)]
pub struct Mapping {
    #[serde(rename = "_meta", skip_serializing_if = "Option::is_none")]
    pub meta: Option<JsonObject>,
    properties: HashMap<String, MappingProperty>,
}

#[derive(Serialize, Deserialize)]
pub struct MappingProperty {
    #[serde(rename = "type")]
    pub type_: String,
    #[serde(flatten)]
    pub settings: HashMap<String, serde_json::Value>,
}

//----- ES|QL

#[derive(Serialize, Deserialize)]
pub struct EsqlQueryRequest {
    pub query: String,
}

#[derive(Serialize, Deserialize)]
pub struct Column {
    pub name: String,
    #[serde(rename = "type")]
    pub type_: String,
}

#[derive(Serialize, Deserialize)]
pub struct EsqlQueryResponse {
    pub is_partial: Option<bool>,
    pub columns: Vec<Column>,
    pub values: Vec<Vec<Value>>,
}

```
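
The `esql` tool above pivots the columnar ES|QL response (`columns` plus row-oriented `values`) into an array of objects keyed by column name before returning it to the client. A stdlib-only sketch of the same pivot, using `String` cells in place of `serde_json::Value` (function and variable names here are illustrative):

```rust
use std::collections::BTreeMap;

/// Pivot a columnar result (column names + rows of cells) into one map
/// per row, keyed by column name — the same shape the `esql` tool emits
/// as JSON. BTreeMap keeps key order deterministic for this sketch.
fn rows_to_objects(columns: &[&str], values: Vec<Vec<String>>) -> Vec<BTreeMap<String, String>> {
    values
        .into_iter()
        .map(|row| {
            columns
                .iter()
                .map(|c| c.to_string())
                .zip(row)
                .collect::<BTreeMap<_, _>>()
        })
        .collect()
}

fn main() {
    let columns = ["index", "doc_count"];
    let values = vec![
        vec!["logs".to_string(), "42".to_string()],
        vec!["metrics".to_string(), "7".to_string()],
    ];
    let objects = rows_to_objects(&columns, values);
    assert_eq!(objects[0]["index"], "logs");
    assert_eq!(objects[1]["doc_count"], "7");
    println!("{objects:?}");
}
```

Row-major output keyed by field name tends to be easier for LLM clients to consume than the raw columnar layout Elasticsearch returns.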