# Directory Structure

```
├── .env-template
├── .github
│   └── workflows
│       ├── pypi-publish.yaml
│       └── release.yml
├── .gitignore
├── .python-version
├── cliff.toml
├── Dockerfile
├── LICENSE
├── Makefile
├── pyproject.toml
├── README.md
├── src
│   └── opensearch_mcp_server
│       ├── __init__.py
│       ├── es_client.py
│       ├── server.py
│       └── tools
│           ├── cluster.py
│           ├── dashboards.py
│           ├── document.py
│           ├── es_admin
│           │   ├── admin_cluster.py
│           │   └── admin_index.py
│           └── index.py
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/.python-version:
--------------------------------------------------------------------------------

```
3.10

```

--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------

```
.idea
.vscode
.venv
dist
__pycache__
*.egg-info
.env
.DS_Store
**/.DS_Store

```

--------------------------------------------------------------------------------
/.env-template:
--------------------------------------------------------------------------------

```
# OpenSearch connection settings
OPENSEARCH_HOST=https://localhost:9200
OPENSEARCH_USERNAME=admin
OPENSEARCH_PASSWORD=admin
```

--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------

```markdown
# OpenSearch MCP Server

## Overview

This repository is a fork of [elasticsearch-mcp-server](https://github.com/cr7258/elasticsearch-mcp-server), converted into [opensearch-mcp-server](https://github.com/seohyunjun/opensearch-mcp-server).

A Model Context Protocol (MCP) server implementation that provides OpenSearch interaction. This server enables searching documents, analyzing indices, and managing the cluster through a set of tools.

## Features

### Index Operations

- `list_indices`: List all indices in the OpenSearch cluster.
- `get_mapping`: Retrieve the mapping configuration for a specific index.
- `get_settings`: Get the settings configuration for a specific index.

### Document Operations

- `search_documents`: Search documents in an index using OpenSearch Query DSL.
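
For example, `search_documents` accepts standard OpenSearch Query DSL in its `body` argument; a minimal sketch (index and field names are hypothetical):

```python
# Hypothetical body for search_documents; any valid Query DSL works.
body = {
    "query": {"match": {"name": "Bob"}},
    "size": 10,  # optional; the tool defaults to 20 when omitted
}
# Invoked through MCP as: search_documents(index="students", body=body)
```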

### Cluster Operations

- `get_cluster_health`: Get health status of the cluster.
- `get_cluster_stats`: Get statistical information about the cluster.


## Start OpenSearch Cluster

Start the OpenSearch cluster using Docker Compose:

```bash
docker-compose up -d
```

This starts a 3-node OpenSearch cluster and OpenSearch Dashboards. The default OpenSearch username is `opensearch` and the password is `test123`.

You can access OpenSearch Dashboards at http://localhost:5601.

## Usage with Claude Desktop

### Using uv with local development

Using `uv` requires cloning the repository locally and specifying the path to the source code. Add the following configuration to Claude Desktop's config file `claude_desktop_config.json`.

You need to change `path/to/src/opensearch_mcp_server` to the path where you cloned the repository.

```json
{
  "mcpServers": {
    "opensearch": {
      "command": "uv",
      "args": [
        "--directory",
        "path/to/src/opensearch_mcp_server",
        "run",
        "opensearch-mcp-server"
      ],
      "env": {
        "OPENSEARCH_HOST": "https://localhost:9200",
        "OPENSEARCH_USERNAME": "opensearch",
        "OPENSEARCH_PASSWORD": "test123",
        "DASHBOARDS_HOST": "https://localhost:5601"
      }
    }
  }
}
```

- On macOS: `~/Library/Application Support/Claude/claude_desktop_config.json`
- On Windows: `%APPDATA%/Claude/claude_desktop_config.json`

Restart Claude Desktop to load the new MCP server.

Now you can interact with your OpenSearch cluster through Claude using natural language commands like:
- "List all indices in the cluster"
- "How old is the student Bob?"
- "Show me the cluster health status"

## License

This project is licensed under the Apache License Version 2.0 - see the [LICENSE](LICENSE) file for details.

```

--------------------------------------------------------------------------------
/src/opensearch_mcp_server/__init__.py:
--------------------------------------------------------------------------------

```python
from . import server


def main():
    """Main entry point for the package."""
    server.main()


# Optionally expose other important items at package level
__all__ = ["main", "server"]
```

--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------

```toml
[project]
name = "opensearch-mcp-server"
version = "1.0.0"
description = "MCP Server for interacting with OpenSearch"
readme = "README.md"
requires-python = ">=3.10"
dependencies = [
    "opensearch-py<=2.8.0",
    "mcp>=1.0.0",
    "python-dotenv>=1.0.0",
    "fastmcp>=0.4.0",
]

[project.license]
file = "LICENSE"

[project.scripts]
opensearch-mcp-server = "opensearch_mcp_server:main"

[build-system]
requires = [
    "hatchling",
]
build-backend = "hatchling.build"

```

--------------------------------------------------------------------------------
/Dockerfile:
--------------------------------------------------------------------------------

```dockerfile
# Generated by https://smithery.ai. See: https://smithery.ai/docs/config#dockerfile
# Start with a Python base image
FROM python:3.10-slim

# Set working directory
WORKDIR /app

# Copy necessary files
COPY . .

# Install hatch to handle the build
RUN pip install hatch

# Clean dist directory before build
RUN rm -rf dist/*

# Use hatch to build the package and install it
RUN hatch build && pip install dist/*.whl

# Set environment variables required for the MCP server
# These can be overridden at runtime with docker run --env
ENV OPENSEARCH_HOST="https://localhost:9200"
ENV OPENSEARCH_USERNAME="opensearch"
ENV OPENSEARCH_PASSWORD="test123"

# Expose the port the server is running on (if applicable)
EXPOSE 8000

# Command to run the server
ENTRYPOINT ["opensearch-mcp-server"]
```

--------------------------------------------------------------------------------
/.github/workflows/release.yml:
--------------------------------------------------------------------------------

```yaml
name: Release

on:
  push:
    tags:
      - 'v*'

jobs:
  release:
    runs-on: ubuntu-latest
    permissions:
      contents: write
    steps:
      - uses: actions/checkout@v4
        with:
          fetch-depth: 0

      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.x'

      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install git-cliff

      - name: Get version from tag
        id: get_version
        run: echo "VERSION=${GITHUB_REF#refs/tags/v}" >> $GITHUB_ENV

      - name: Generate changelog
        run: |
          git-cliff --output CHANGELOG.md --latest

      - name: Create Release
        uses: softprops/action-gh-release@v1
        with:
          name: v${{ env.VERSION }}
          body_path: CHANGELOG.md
          draft: false
          prerelease: false

```

--------------------------------------------------------------------------------
/.github/workflows/pypi-publish.yaml:
--------------------------------------------------------------------------------

```yaml
# This workflow will upload a Python Package using Twine when a release is created
# For more information see: https://help.github.com/en/actions/language-and-framework-guides/using-python-with-github-actions#publishing-to-package-registries

# This workflow uses actions that are not certified by GitHub.
# They are provided by a third-party and are governed by
# separate terms of service, privacy policy, and support
# documentation.

name: PyPI Publish

on:
  workflow_run:
    workflows: ["Release"]
    types:
      - completed

env:
  UV_PUBLISH_TOKEN: '${{ secrets.PYPI_API_TOKEN }}'

jobs:
  deploy:
    runs-on: ubuntu-latest
    if: ${{ github.event.workflow_run.conclusion == 'success' }}
    steps:
    - uses: actions/checkout@v2

    - name: Set up Python
      uses: actions/setup-python@v2
      with:
        python-version: '3.10.x'

    - name: Install dependencies
      run: |
        python -m pip install uv
        uv sync

    - name: Build package
      run: uv build

    - name: Publish package
      run: uv publish

```

--------------------------------------------------------------------------------
/src/opensearch_mcp_server/es_client.py:
--------------------------------------------------------------------------------

```python
import logging
import os
from dotenv import load_dotenv
from opensearchpy import OpenSearch
import warnings


class OpensearchClient:
    def __init__(self, logger: logging.Logger):
        self.logger = logger
        self.es_client = self._create_opensearch_client()

    def _get_es_config(self):
        """Get OpenSearch configuration from environment variables."""
        # Load environment variables from .env file
        load_dotenv()
        config = {
            "host": os.getenv("OPENSEARCH_HOST"),
            "username": os.getenv("OPENSEARCH_USERNAME"),
            "password": os.getenv("OPENSEARCH_PASSWORD"),
            "dashboards_host": os.getenv("DASHBOARDS_HOST"),
        }

        if not all([config["username"], config["password"]]):
            self.logger.error(
                "Missing required OpenSearch configuration. Please check environment variables:"
            )
            self.logger.error(
                "OPENSEARCH_USERNAME and OPENSEARCH_PASSWORD are required"
            )
            raise ValueError("Missing required OpenSearch configuration")

        return config

    def _create_opensearch_client(self) -> OpenSearch:
        """Create and return an OpenSearch client using configuration from environment."""
        config = self._get_es_config()

        # Disable SSL warnings
        warnings.filterwarnings(
            "ignore",
            message=".*TLS with verify_certs=False is insecure.*",
        )

        return OpenSearch(
            config["host"],
            http_auth=(config["username"], config["password"]),
            verify_certs=False,
        )

```
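
For a quick connectivity check outside the MCP server, the client above can be constructed directly; a minimal sketch, assuming the `OPENSEARCH_*` variables are set in the environment or a `.env` file:

```python
# Minimal sketch: standalone smoke test of OpensearchClient (assumes env vars are set).
import logging

from opensearch_mcp_server.es_client import OpensearchClient

logging.basicConfig(level=logging.INFO)
client = OpensearchClient(logging.getLogger("smoke-test"))
print(client.es_client.cluster.health())  # raw health dict from the cluster
```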

--------------------------------------------------------------------------------
/src/opensearch_mcp_server/server.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
import logging
from fastmcp import FastMCP
from .tools.index import IndexTools
from .tools.document import DocumentTools
from .tools.cluster import ClusterTools
from .tools.dashboards import DashboardTools
from .tools.es_admin.admin_index import AdminIndexTools
from .tools.es_admin.admin_cluster import AdminClusterTools


class OpensearchMCPServer:
    def __init__(self):
        self.name = "opensearch_mcp_server"
        self.mcp = FastMCP(self.name)
        
        # Configure logging
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        self.logger = logging.getLogger(self.name)
        
        # Initialize tools
        self._register_tools()

    def _register_tools(self):
        """Register all MCP tools."""
        # Initialize tool classes
        index_tools = IndexTools(self.logger)
        document_tools = DocumentTools(self.logger)
        cluster_tools = ClusterTools(self.logger)
        dashboard_tools = DashboardTools(self.logger)
        admin_index_tools = AdminIndexTools(self.logger)
        admin_cluster_tools = AdminClusterTools(self.logger)

        # Register tools from each module
        index_tools.register_tools(self.mcp)
        document_tools.register_tools(self.mcp)
        cluster_tools.register_tools(self.mcp)
        dashboard_tools.register_tools(self.mcp)
        admin_index_tools.register_tools(self.mcp)
        admin_cluster_tools.register_tools(self.mcp)

    def run(self):
        """Run the MCP server."""
        self.mcp.run()

def main():
    server = OpensearchMCPServer()
    server.run()


if __name__ == "__main__":
    main()

```
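
Every tool module below follows the same pattern: subclass `OpensearchClient` and register async closures on the shared `FastMCP` instance. A hypothetical minimal tool class showing the shape (not part of the repository):

```python
# Hypothetical example of the tool-class pattern (illustration only).
from typing import Any

from mcp.types import TextContent

from .es_client import OpensearchClient


class PingTools(OpensearchClient):
    def register_tools(self, mcp: Any):
        @mcp.tool(description="Ping the cluster")
        async def ping() -> list[TextContent]:
            """Report whether the cluster responds to a ping."""
            return [TextContent(type="text", text=str(self.es_client.ping()))]
```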

--------------------------------------------------------------------------------
/src/opensearch_mcp_server/tools/cluster.py:
--------------------------------------------------------------------------------

```python
import logging
from typing import Dict, Any
from ..es_client import OpensearchClient
from mcp.types import TextContent

class ClusterTools(OpensearchClient):
    def register_tools(self, mcp: Any):
        """Register cluster-related tools."""
        
        @mcp.tool(description="Get cluster health status")
        async def get_cluster_health() -> list[TextContent]:
            """
            Get health status of the Opensearch cluster.
            Returns information about the number of nodes, shards, etc.
            """
            self.logger.info("Getting cluster health")
            try:
                response = self.es_client.cluster.health()
                return [TextContent(type="text", text=str(response))]
            except Exception as e:
                self.logger.error(f"Error getting cluster health: {e}")
                return [TextContent(type="text", text=f"Error: {str(e)}")]

        @mcp.tool(description="Get cluster statistics")
        async def get_cluster_stats() -> list[TextContent]:
            """
            Get statistics from a cluster-wide perspective.
            The API returns basic index metrics (shard numbers, store size, memory usage) and information
            about the current nodes that form the cluster (number, roles, OS, JVM versions, memory usage, CPU, and installed plugins).
            https://opensearch.org/docs/latest/tuning-your-cluster/
            """
            self.logger.info("Getting cluster stats")
            try:
                response = self.es_client.cluster.stats()
                return [TextContent(type="text", text=str(response))]
            except Exception as e:
                self.logger.error(f"Error getting cluster stats: {e}")
                return [TextContent(type="text", text=f"Error: {str(e)}")]

```

--------------------------------------------------------------------------------
/src/opensearch_mcp_server/tools/document.py:
--------------------------------------------------------------------------------

```python
import logging
import json
from typing import Dict, Any
from ..es_client import OpensearchClient
from mcp.types import TextContent

class DocumentTools(OpensearchClient):
    def register_tools(self, mcp: Any):
        """Register document-related tools."""

        @mcp.tool(description="Search documents in an opensearch index with a custom query")
        async def search_documents(index: str, body: dict) -> list[TextContent]:
            """
            Search documents in a specified opensearch index using a custom query.
            
            Args:
                index: Name of the index to search
                body: Opensearch query DSL. If size is not specified, defaults to 20 results.
            """
            # Ensure reasonable default size limit is set
            if 'size' not in body:
                body['size'] = 20
            self.logger.info(f"Searching in index: {index} with query: {body}")
            try:
                response = self.es_client.search(index=index, body=body)
                # Extract and format relevant information
                formatted_response = {
                    'total_hits': response['hits']['total']['value'],
                    'max_score': response['hits']['max_score'],
                    'hits': []
                }

                # Process each hit
                for hit in response['hits']['hits']:
                    hit_data = {
                        '_id': hit['_id'],
                        '_score': hit['_score'],
                        'source': hit['_source']
                    }
                    formatted_response['hits'].append(hit_data)

                # Include aggregations if present
                if 'aggregations' in response:
                    formatted_response['aggregations'] = response['aggregations']

                return [TextContent(type="text", text=json.dumps(formatted_response, indent=2))]
            except Exception as e:
                self.logger.error(f"Error searching documents: {e}")
                return [TextContent(type="text", text=f"Error: {str(e)}")]

```
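
A hedged example of a `body` that exercises both the hit formatting and the aggregations branch above (index and field names are hypothetical):

```python
# Hypothetical search body: top hits plus a terms aggregation.
body = {
    "query": {"match": {"message": "error"}},
    "aggs": {"by_level": {"terms": {"field": "level.keyword"}}},
    # 'size' is omitted on purpose: the tool defaults it to 20.
}
# search_documents(index="app-logs", body=body) returns total_hits, hits,
# and an 'aggregations' key because the request defines aggs.
```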

--------------------------------------------------------------------------------
/src/opensearch_mcp_server/tools/index.py:
--------------------------------------------------------------------------------

```python
import logging
from typing import Dict, Any
from ..es_client import OpensearchClient
from mcp.types import TextContent

class IndexTools(OpensearchClient):
    def register_tools(self, mcp: Any):
        """Register index-related tools."""
        
        @mcp.tool(description="List all indices in the Opensearch cluster")
        async def list_indices() -> list[TextContent]:
            """
            List all indices in the Opensearch cluster.
            It is important to check the indices before searching documents
            to understand what indices are available.
            """
            self.logger.info("Listing indices...")
            try:
                indices = self.es_client.cat.indices(format="json")
                return [TextContent(type="text", text=str(indices))]
            except Exception as e:
                self.logger.error(f"Error listing indices: {e}")
                return [TextContent(type="text", text=f"Error: {str(e)}")]

        @mcp.tool(description="Get index mapping")
        async def get_mapping(index: str) -> list[TextContent]:
            """
            Get the mapping for an index.
            It is important to always check the mappings to understand 
            the exact field names and types before constructing queries or URLs.
            
            Args:
                index: Name of the index
            """
            self.logger.info(f"Getting mapping for index: {index}")
            try:
                response = self.es_client.indices.get_mapping(index=index)
                return [TextContent(type="text", text=str(response))]
            except Exception as e:
                self.logger.error(f"Error getting mapping: {e}")
                return [TextContent(type="text", text=f"Error: {str(e)}")]

        @mcp.tool(description="Get index settings")
        async def get_settings(index: str) -> list[TextContent]:
            """
            Get the settings for an index.
            
            Args:
                index: Name of the index
            """
            self.logger.info(f"Getting settings for index: {index}")
            try:
                response = self.es_client.indices.get_settings(index=index)
                return [TextContent(type="text", text=str(response))]
            except Exception as e:
                self.logger.error(f"Error getting settings: {e}")
                return [TextContent(type="text", text=f"Error: {str(e)}")]

```

--------------------------------------------------------------------------------
/cliff.toml:
--------------------------------------------------------------------------------

```toml
# git-cliff ~ configuration file
# https://git-cliff.org/docs/configuration

[changelog]
# template for the changelog header
header = """
# Changelog\n
"""
# template for the changelog body
# https://keats.github.io/tera/docs/#introduction
body = """
{% if version %}\
    {% if previous.version %}\
        ## [{{ version | trim_start_matches(pat="v") }}]($REPO/compare/{{ previous.version }}..{{ version }}) - {{ timestamp | date(format="%Y-%m-%d") }}
    {% else %}\
        ## [{{ version | trim_start_matches(pat="v") }}] - {{ timestamp | date(format="%Y-%m-%d") }}
    {% endif %}\
{% else %}\
    ## [unreleased]
{% endif %}\
{% for group, commits in commits | group_by(attribute="group") %}
    ### {{ group | striptags | trim | upper_first }}
    {% for commit in commits
    | filter(attribute="scope")
    | sort(attribute="scope") %}
        - **({{commit.scope}})**{% if commit.breaking %} [**breaking**]{% endif %} \
            {{ commit.message }} - ([{{ commit.id | truncate(length=7, end="") }}]($REPO/commit/{{ commit.id }})) - @{{ commit.author.name }}
    {%- endfor -%}
    {% raw %}\n{% endraw %}\
    {%- for commit in commits %}
        {%- if commit.scope -%}
        {% else -%}
            - {% if commit.breaking %} [**breaking**]{% endif %}\
                {{ commit.message }} - ([{{ commit.id | truncate(length=7, end="") }}]($REPO/commit/{{ commit.id }})) - @{{ commit.author.name }}
        {% endif -%}
    {% endfor -%}
{% endfor %}\n
"""
# template for the changelog footer
footer = """
<!-- generated by git-cliff -->
"""
# remove the leading and trailing whitespace from the templates
trim = true
# postprocessors
postprocessors = [
    { pattern = '\$REPO', replace = "https://github.com/hyunjunseo/opensearch-mcp-server" }, # replace repository URL
]

[git]
# parse the commits based on https://www.conventionalcommits.org
conventional_commits = true
# filter out the commits that are not conventional
filter_unconventional = true
# process each line of a commit as an individual commit
split_commits = false
# regex for preprocessing the commit messages
commit_preprocessors = [
    # { pattern = '\((\w+\s)?#([0-9]+)\)', replace = "([#${2}](https://github.com/cr7258/elasticsearch-mcp-server/issues/${2}))"}, # replace issue numbers
]
# regex for parsing and grouping commits
commit_parsers = [
  { message = "^feat", group = "<!-- 0 -->⛰️  Features" },
  { message = "^fix", group = "<!-- 1 -->🐛 Bug Fixes" },
  { message = "^doc", group = "<!-- 3 -->📚 Documentation" },
  { message = "^perf", group = "<!-- 4 -->⚡ Performance" },
  { message = "^refactor\\(clippy\\)", skip = true },
  { message = "^refactor", group = "<!-- 2 -->🚜 Refactor" },
  { message = "^style", group = "<!-- 5 -->🎨 Styling" },
  { message = "^test", group = "<!-- 6 -->🧪 Testing" },
  { message = "^chore\\(release\\): prepare for", skip = true },
  { message = "^chore\\(deps.*\\)", skip = true },
  { message = "^chore\\(pr\\)", skip = true },
  { message = "^chore\\(pull\\)", skip = true },
  { message = "^chore\\(npm\\).*yarn\\.lock", skip = true },
  { message = "^chore|^ci", group = "<!-- 7 -->⚙️ Miscellaneous Tasks" },
  { body = ".*security", group = "<!-- 8 -->🛡️ Security" },
  { message = "^revert", group = "<!-- 9 -->◀️ Revert" },
]

# filter out the commits that are not matched by commit parsers
filter_commits = false
# sort the tags topologically
topo_order = false
# sort the commits inside sections by oldest/newest order
sort_commits = "oldest"
# regex for matching git tags
tag_pattern = "^v[0-9]"
# regex for skipping tags
skip_tags = ""
# regex for ignoring tags
ignore_tags = ""
# use tag date instead of commit date
date_order = true
# path to git binary
git_path = "git"
# whether to use relaxed or strict semver parsing
relaxed_semver = true
# only show the changes for the current version
tag_range = true
```

--------------------------------------------------------------------------------
/src/opensearch_mcp_server/tools/es_admin/admin_index.py:
--------------------------------------------------------------------------------

```python
import logging
from typing import Dict, Any
from ...es_client import OpensearchClient
from mcp.types import TextContent

class AdminIndexTools(OpensearchClient):
    def register_tools(self, mcp: Any):
        """Register administrative index-related tools."""
        
        @mcp.tool(description="Get ISM policies and their configurations")
        async def get_ism_policies() -> list[TextContent]:
            """
            Get Index State Management policies and their configurations.
            Returns policy IDs, descriptions, states, and index patterns.
            This result is useful for determining index lifecycle management configurations
            such as index size limits, rollover policies, and retention policies.
            """
            self.logger.info("Fetching ISM policies...")
            try:
                response = self.es_client.transport.perform_request(
                    'GET',
                    '/_plugins/_ism/policies',
                    params={'filter_path': 'policies.policy.policy_id,policies.policy.description,policies.policy.states,policies.policy.ism_template.index_patterns'}
                )
                return [TextContent(type="text", text=str(response))]
            except Exception as e:
                self.logger.error(f"Error fetching ISM policies: {e}")
                return [TextContent(type="text", text=f"Error: {str(e)}")]

        @mcp.tool(description="Get index template configurations")
        async def get_index_templates() -> list[TextContent]:
            """
            Get index templates and their configurations.
            Returns template names and their configured number of shards.
            This helps understand how new indices will be created.
            """
            self.logger.info("Fetching index templates...")
            try:
                response = self.es_client.transport.perform_request(
                    'GET',
                    '/_index_template',
                    params={'filter_path': 'index_templates.name,index_templates.index_template.index_patterns,index_templates.index_template.template.settings.index.number_of_shards'}
                )
                return [TextContent(type="text", text=str(response))]
            except Exception as e:
                self.logger.error(f"Error fetching index templates: {e}")
                return [TextContent(type="text", text=f"Error: {str(e)}")]

        @mcp.tool(description="Get index shard allocation distribution")
        async def get_shard_allocation(latest_index: str) -> list[TextContent]:
            """
            Get the current index shard allocation distribution across nodes.
            Returns index name, shard number, primary/replica status, and node assignment.
            This helps understand how shards are distributed across the cluster.

            Args:
            latest_index: The most recent index of interest.
            """
            self.logger.info("Fetching shard allocation...")
            try:
                response = self.es_client.transport.perform_request(
                    'GET',
                    '/_cat/shards',
                    params={'h': 'index,shard,prirep,node', 'format': 'json'}
                )
                # Count shards per node
                shard_counts = {}
                for shard in response:
                    if shard['node'] not in shard_counts:
                        shard_counts[shard['node']] = 0
                    shard_counts[shard['node']] += 1
                
                # Format the response with both raw data and counts
                formatted_response = {
                    'shard_distribution': response,
                    'shards_per_node': shard_counts
                }
                return [TextContent(type="text", text=str(formatted_response))]
            except Exception as e:
                self.logger.error(f"Error fetching shard allocation: {e}")
                return [TextContent(type="text", text=f"Error: {str(e)}")]

```

--------------------------------------------------------------------------------
/src/opensearch_mcp_server/tools/dashboards.py:
--------------------------------------------------------------------------------

```python
import logging
import json
from typing import Dict, Any
from ..es_client import OpensearchClient
from mcp.types import TextContent
from urllib.parse import urlencode

class DashboardTools(OpensearchClient):
    def register_tools(self, mcp: Any):
        """Register dashboards-related tools."""


        @mcp.tool(description="List OpenSearch Dashboards index patterns")
        async def list_index_patterns() -> list[TextContent]:
            """
            Find all index pattern IDs stored in the .kibana index. Especially useful for
            identifying the correct index pattern ID to use in the Discover view URL.
            This function queries the .kibana index for saved objects of type 'index-pattern'
            and returns a list of their titles and IDs.

            Returns:
            list[TextContent]: A list containing the found index patterns or an error message.
            """
            self.logger.info("Searching for index patterns")
            try:
                response = self.es_client.search(
                    index=".kibana",
                    body={
                        '_source': ['index-pattern.title', '_id'],
                        'query': {
                            'term': {
                                'type': 'index-pattern'
                            }
                        }
                    }
                )
                patterns = json.dumps([{hit["_source"]["index-pattern"]["title"]: hit["_id"].replace('index-pattern:', '')} 
                            for hit in response["hits"]["hits"]], indent=4)
                return [TextContent(type="text", text=patterns)]
            except Exception as e:
                self.logger.error(f"Error finding index patterns: {e}")
                return [TextContent(type="text", text=f"Error: {str(e)}")]

        @mcp.tool(description="Generate OpenSearch Dashboards Discover view URL")
        async def generate_discover_url(query: str, index_pattern_id: str, from_time: str, to_time: str) -> list[TextContent]:
            """
            Generate a URL for the OpenSearch Dashboards Discover view that will display the results of a query.
            The argument values must be compatible with the rison data format used by OpenSearch Dashboards.
            Use the list_index_patterns tool to determine the available index pattern IDs.
            The index_pattern_id argument must be the ID of the index pattern to be used.
            The query argument must be in valid OpenSearch Lucene format.
            Refrain from querying the timestamp or @timestamp fields; use the from_time and to_time parameters instead.
            The function constructs a URL that includes the query and index pattern as parameters.

            Args:
            query str: The query to apply in the Discover view in lucene format.
            index_pattern_id str: The index pattern ID to use in the Discover view URL.
            from_time str: The starting time for the query in the format like `now-15m`.
            to_time str: The ending time for the query in the format like `now`.

            Returns:
            list[TextContent]: A list containing the generated URL or an error message.
            """
            self.logger.info("Generating Discover view URL")
            config = self._get_es_config()
            try:
                # Final URL shape: http[s]://host[:port]/app/data-explorer/discover#?<query_params>
                base_url = config["dashboards_host"] + "/app/data-explorer/discover#?"
                query_params = {
                    "_g": f"(filters:!(),refreshInterval:(pause:!t,value:0),time:(from:'{from_time}',to:'{to_time}'))",
                    "_q": f"(filters:!(),query:(language:lucene,query:'{query}'))",
                    "_a": f"(discover:(columns:!(_source),isDirty:!f,sort:!()),metadata:(indexPattern:'{index_pattern_id}',view:discover))",
                }
                url = base_url + urlencode(query_params, safe="(),:")
                return [TextContent(type="text", text=url)]
                
            except Exception as e:
                self.logger.error(f"Error generating Discover view URL: {e}")
                return [TextContent(type="text", text=f"Error: {str(e)}")]

```
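
As a standalone illustration of the URL shape produced by `generate_discover_url` (host, pattern ID, and query are made-up values):

```python
# Standalone sketch mirroring the URL construction above (values hypothetical).
from urllib.parse import urlencode

base_url = "https://localhost:5601/app/data-explorer/discover#?"
query_params = {
    "_g": "(filters:!(),refreshInterval:(pause:!t,value:0),time:(from:'now-15m',to:'now'))",
    "_q": "(filters:!(),query:(language:lucene,query:'level:ERROR'))",
    "_a": "(discover:(columns:!(_source),isDirty:!f,sort:!()),metadata:(indexPattern:'my-pattern-id',view:discover))",
}
print(base_url + urlencode(query_params, safe="(),:"))
```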

--------------------------------------------------------------------------------
/src/opensearch_mcp_server/tools/es_admin/admin_cluster.py:
--------------------------------------------------------------------------------

```python
import logging
from typing import Dict, Any
from ...es_client import OpensearchClient
from mcp.types import TextContent

class AdminClusterTools(OpensearchClient):
    def register_tools(self, mcp: Any):
        """Register administrative cluster related tools."""
        
        @mcp.tool(description="Check hot threads on nodes")
        async def get_hot_threads() -> list[TextContent]:
            """
            Get hot threads information from all nodes, filtering for CPU percentage data.
            Returns only thread information containing percentage signs, indicating CPU usage.
            If no threads show percentage usage, indicates no hot threads were found.
            """
            self.logger.info("Fetching hot threads information...")
            try:
                response = self.es_client.transport.perform_request(
                    'GET',
                    '/_nodes/hot_threads'
                )
                # Filter lines containing '%'
                hot_lines = [line for line in str(response).split('\n') if '%' in line]
                
                if hot_lines:
                    return [TextContent(type="text", text='\n'.join(hot_lines))]
                else:
                    return [TextContent(type="text", text="No hot threads detected in the cluster.")]
            except Exception as e:
                self.logger.error(f"Error fetching hot threads: {e}")
                return [TextContent(type="text", text=f"Error: {str(e)}")]

        @mcp.tool(description="Get current tasks in cluster")
        async def get_tasks() -> list[TextContent]:
            """
            Get current tasks running in the cluster.
            Filters duplicate task types to show only unique operations.
            """
            self.logger.info("Fetching cluster tasks...")
            try:
                response = self.es_client.cat.tasks(v=True)
                lines = response.split('\n')
                seen_tasks = set()
                filtered_lines = []
                
                for line in lines:
                    if not line.strip():
                        continue
                    task_type = line.split()[0]
                    if task_type not in seen_tasks:
                        seen_tasks.add(task_type)
                        filtered_lines.append(line)
                        
                if filtered_lines:
                    return [TextContent(type="text", text='\n'.join(filtered_lines))]
                else:
                    return [TextContent(type="text", text="No tasks currently running in the cluster.")]
            except Exception as e:
                self.logger.error(f"Error fetching tasks: {e}")
                return [TextContent(type="text", text=f"Error: {str(e)}")]

        @mcp.tool(description="Get recovery status and estimated completion time")
        async def get_recovery_status() -> list[TextContent]:
            """
            Get recovery status for shards that are currently being recovered.
            Includes progress percentage and estimated time remaining based on current recovery rate.
            """
            self.logger.info("Fetching recovery status...")
            try:
                # Get active recoveries with detailed stats
                response = self.es_client.cat.recovery(format='json', active_only=True, v=True)
                
                if not response:
                    # Get cluster health to show overall shard status if no active recoveries
                    health = self.es_client.cluster.health()
                    total_shards = health['active_shards'] + health['unassigned_shards'] + health['initializing_shards']
                    active_pct = (health['active_shards'] / total_shards) * 100 if total_shards > 0 else 100
                    
                    status_msg = (
                        f"No active recoveries. Cluster status: {health['status']}\n"
                        f"Active shards: {health['active_shards']}/{total_shards} ({active_pct:.1f}%)\n"
                        f"Initializing: {health['initializing_shards']}\n"
                        f"Unassigned: {health['unassigned_shards']}"
                    )
                    return [TextContent(type="text", text=status_msg)]

                # Process active recoveries
                summary = []
                for recovery in response:
                    index = recovery['index']
                    shard = recovery['shard']
                    stage = recovery.get('stage', 'unknown')
                    
                    # Calculate progress and time remaining
                    files_pct = float(recovery.get('files_percent', '0').rstrip('%'))
                    bytes_pct = float(recovery.get('bytes_percent', '0').rstrip('%'))
                    total_bytes = int(recovery.get('total_bytes', 0))
                    bytes_recovered = int(recovery.get('recovered_in_bytes', 0))
                    
                    # Parse time value which can be in format like "1.2s" or "3m" or "2.5h"
                    time_str = recovery.get('time', '0s')
                    try:
                        # Convert time string to milliseconds
                        if time_str.endswith('ms'):
                            time_spent_ms = float(time_str[:-2])
                        elif time_str.endswith('s'):
                            time_spent_ms = float(time_str[:-1]) * 1000
                        elif time_str.endswith('m'):
                            time_spent_ms = float(time_str[:-1]) * 60 * 1000
                        elif time_str.endswith('h'):
                            time_spent_ms = float(time_str[:-1]) * 60 * 60 * 1000
                        else:
                            time_spent_ms = 0
                    except ValueError:
                        time_spent_ms = 0
                    
                    # Calculate recovery rate and estimated time remaining
                    if bytes_recovered > 0 and time_spent_ms > 0:
                        rate_mb_sec = (bytes_recovered / 1024 / 1024) / (time_spent_ms / 1000)
                        remaining_bytes = total_bytes - bytes_recovered
                        est_seconds_remaining = (remaining_bytes / 1024 / 1024) / rate_mb_sec if rate_mb_sec > 0 else 0
                        
                        # Format time remaining in a human-readable way
                        if est_seconds_remaining < 60:
                            time_remaining = f"{est_seconds_remaining:.0f} seconds"
                        elif est_seconds_remaining < 3600:
                            time_remaining = f"{est_seconds_remaining/60:.1f} minutes"
                        else:
                            time_remaining = f"{est_seconds_remaining/3600:.1f} hours"
                        
                        recovery_info = (
                            f"Index: {index}, Shard: {shard}\n"
                            f"Stage: {stage}\n"
                            f"Progress: files={files_pct:.1f}%, bytes={bytes_pct:.1f}%\n"
                            f"Rate: {rate_mb_sec:.1f} MB/sec\n"
                            f"Est. time remaining: {time_remaining}\n"
                        )
                    else:
                        recovery_info = (
                            f"Index: {index}, Shard: {shard}\n"
                            f"Stage: {stage}\n"
                            f"Progress: files={files_pct:.1f}%, bytes={bytes_pct:.1f}%\n"
                            "Rate: calculating...\n"
                        )
                    
                    summary.append(recovery_info)
                
                return [TextContent(type="text", text="\n".join(summary))]
                
            except Exception as e:
                self.logger.error(f"Error fetching recovery status: {e}")
                return [TextContent(type="text", text=f"Error: {str(e)}")]


```
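
To make the rate estimate in `get_recovery_status` concrete, a worked example with made-up numbers:

```python
# Worked example of the recovery-rate arithmetic above (numbers are made up).
bytes_recovered = 512 * 1024 * 1024        # 512 MB copied so far
total_bytes = 2 * 1024 * 1024 * 1024       # 2 GB to recover in total
time_spent_ms = 60_000                     # 1 minute elapsed

rate_mb_sec = (bytes_recovered / 1024 / 1024) / (time_spent_ms / 1000)  # ~8.5 MB/s
remaining_mb = (total_bytes - bytes_recovered) / 1024 / 1024            # 1536 MB left
est_seconds_remaining = remaining_mb / rate_mb_sec                      # ~180 s
print(f"{rate_mb_sec:.1f} MB/s, ~{est_seconds_remaining / 60:.0f} minutes remaining")
```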