# Directory Structure
```
├── .env-template
├── .github
│   └── workflows
│       ├── pypi-publish.yaml
│       └── release.yml
├── .gitignore
├── .python-version
├── cliff.toml
├── Dockerfile
├── LICENSE
├── Makefile
├── pyproject.toml
├── README.md
├── src
│   └── opensearch_mcp_server
│       ├── __init__.py
│       ├── es_client.py
│       ├── server.py
│       └── tools
│           ├── cluster.py
│           ├── dashboards.py
│           ├── document.py
│           ├── es_admin
│           │   ├── admin_cluster.py
│           │   └── admin_index.py
│           └── index.py
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/.python-version:
--------------------------------------------------------------------------------
```
1 | 3.10
2 |
```
--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------
```
1 | .idea
2 | .vscode
3 | .venv
4 | dist
5 | __pycache__
6 | *.egg-info
7 | .env
8 | .DS_Store
9 | **/.DS_Store
10 |
```
--------------------------------------------------------------------------------
/.env-template:
--------------------------------------------------------------------------------
```
1 | # OpenSearch connection settings
2 | OPENSEARCH_HOST=https://localhost:9200
3 | OPENSEARCH_USERNAME=admin
4 | OPENSEARCH_PASSWORD=admin
```
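`es_client.py` below loads these variables via `python-dotenv`, which reads a file named `.env` from the working directory, so the template is meant to be copied and edited:

```bash
cp .env-template .env   # then edit the values for your cluster
```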
--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------
```markdown
1 | # OpenSearch MCP Server
2 |
3 | ## Overview
4 |
5 | This repository is a fork of [elasticsearch-mcp-server](https://github.com/cr7258/elasticsearch-mcp-server), converted into [opensearch-mcp-server](https://github.com/seohyunjun/opensearch-mcp-server).
6 | 
7 | It is a Model Context Protocol (MCP) server implementation that provides OpenSearch interaction, enabling document search, index analysis, and cluster management through a set of tools.
8 |
9 | ## Features
10 |
11 | ### Index Operations
12 |
13 | - `list_indices`: List all indices in the OpenSearch cluster.
14 | - `get_mapping`: Retrieve the mapping configuration for a specific index.
15 | - `get_settings`: Get the settings configuration for a specific index.
16 |
17 | ### Document Operations
18 |
19 | - `search_documents`: Search documents in an index using OpenSearch Query DSL.
20 |
21 | ### Cluster Operations
22 |
23 | - `get_cluster_health`: Get health status of the cluster.
24 | - `get_cluster_stats`: Get statistical information about the cluster.
25 |
26 |
27 | ## Start OpenSearch Cluster
28 |
29 | Start the OpenSearch cluster using Docker Compose:
30 |
31 | ```bash
32 | docker-compose up -d
33 | ```
34 |
35 | This will start a 3-node OpenSearch cluster and OpenSearch Dashboards. The default OpenSearch username is `opensearch` and the password is `test123`.
36 | 
37 | You can access OpenSearch Dashboards at http://localhost:5601.
38 |
39 | ## Usage with Claude Desktop
40 |
41 | ### Using uv with local development
42 |
43 | Using `uv` requires cloning the repository locally and specifying the path to the source code. Add the following configuration to Claude Desktop's config file `claude_desktop_config.json`.
44 |
45 | Replace `path/to/src/opensearch_mcp_server` with the path where you cloned the repository.
46 |
47 | ```json
48 | {
49 |   "mcpServers": {
50 |     "opensearch": {
51 |       "command": "uv",
52 |       "args": [
53 |         "--directory",
54 |         "path/to/src/opensearch_mcp_server",
55 |         "run",
56 |         "opensearch-mcp-server"
57 |       ],
58 |       "env": {
59 |         "OPENSEARCH_HOST": "https://localhost:9200",
60 |         "OPENSEARCH_USERNAME": "opensearch",
61 |         "OPENSEARCH_PASSWORD": "test123",
62 |         "DASHBOARDS_HOST": "https://localhost:5601"
63 |       }
64 |     }
65 |   }
66 | }
67 | ```
68 |
69 | - On macOS: `~/Library/Application Support/Claude/claude_desktop_config.json`
70 | - On Windows: `%APPDATA%/Claude/claude_desktop_config.json`
71 |
72 | Restart Claude Desktop to load the new MCP server.
73 |
74 | Now you can interact with your OpenSearch cluster through Claude using natural language commands like:
75 | - "List all indices in the cluster"
76 | - "How old is the student Bob?"
77 | - "Show me the cluster health status"
78 |
79 | ## License
80 |
81 | This project is licensed under the Apache License Version 2.0 - see the [LICENSE](LICENSE) file for details.
82 |
```
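The quick-start above runs `docker-compose up -d`, but no compose file appears in the directory tree, so you will need to supply one. A minimal single-node sketch (assumptions: the official `opensearchproject` images; the 3-node layout and the `opensearch`/`test123` credentials from the README would require additional security configuration):

```yaml
# Hypothetical docker-compose.yml -- not shipped with this repository.
services:
  opensearch:
    image: opensearchproject/opensearch:2.11.1
    environment:
      - discovery.type=single-node
    ports:
      - "9200:9200"
  dashboards:
    image: opensearchproject/opensearch-dashboards:2.11.1
    environment:
      - 'OPENSEARCH_HOSTS=["https://opensearch:9200"]'
    ports:
      - "5601:5601"
    depends_on:
      - opensearch
```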
--------------------------------------------------------------------------------
/src/opensearch_mcp_server/__init__.py:
--------------------------------------------------------------------------------
```python
1 | from . import server
2 |
3 |
4 | def main():
5 |     """Main entry point for the package."""
6 |     server.main()
7 |
8 |
9 | # Optionally expose other important items at package level
10 | __all__ = ["main", "server"]
```
--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------
```toml
1 | [project]
2 | name = "opensearch-mcp-server"
3 | version = "1.0.0"
4 | description = "MCP Server for interacting with OpenSearch"
5 | readme = "README.md"
6 | requires-python = ">=3.10"
7 | dependencies = [
8 |     "opensearch-py<=2.8.0",
9 |     "mcp>=1.0.0",
10 |     "python-dotenv>=1.0.0",
11 |     "fastmcp>=0.4.0",
12 | ]
13 |
14 | [project.license]
15 | file = "LICENSE"
16 |
17 | [project.scripts]
18 | opensearch-mcp-server = "opensearch_mcp_server:main"
19 |
20 | [build-system]
21 | requires = [
22 |     "hatchling",
23 | ]
24 | build-backend = "hatchling.build"
25 |
```
--------------------------------------------------------------------------------
/Dockerfile:
--------------------------------------------------------------------------------
```dockerfile
1 | # Generated by https://smithery.ai. See: https://smithery.ai/docs/config#dockerfile
2 | # Start with a Python base image
3 | FROM python:3.10-slim
4 |
5 | # Set working directory
6 | WORKDIR /app
7 |
8 | # Copy necessary files
9 | COPY . .
10 |
11 | # Install hatch to handle the build
12 | RUN pip install hatch
13 |
14 | # Clean dist directory before build
15 | RUN rm -rf dist/*
16 |
17 | # Use hatch to build the package and install it
18 | RUN hatch build && pip install dist/*.whl
19 |
20 | # Set environment variables required for the MCP server
21 | # These can be overridden at runtime with docker run --env
22 | ENV OPENSEARCH_HOST="https://localhost:9200"
23 | ENV OPENSEARCH_USERNAME="opensearch"
24 | ENV OPENSEARCH_PASSWORD="test123"
25 |
26 | # Expose the port the server is running on (if applicable)
27 | EXPOSE 8000
28 |
29 | # Command to run the server
30 | ENTRYPOINT ["opensearch-mcp-server"]
```
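Because the entrypoint is the MCP console script and MCP's stdio transport needs an open stdin, the image is typically run interactively. A usage sketch (host and credentials are placeholders):

```bash
docker build -t opensearch-mcp-server .
docker run -i --rm \
  -e OPENSEARCH_HOST=https://host.docker.internal:9200 \
  -e OPENSEARCH_USERNAME=admin \
  -e OPENSEARCH_PASSWORD=admin \
  opensearch-mcp-server
```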
--------------------------------------------------------------------------------
/.github/workflows/release.yml:
--------------------------------------------------------------------------------
```yaml
1 | name: Release
2 | 
3 | on:
4 |   push:
5 |     tags:
6 |       - 'v*'
7 | 
8 | jobs:
9 |   release:
10 |     runs-on: ubuntu-latest
11 |     permissions:
12 |       contents: write
13 |     steps:
14 |       - uses: actions/checkout@v4
15 |         with:
16 |           fetch-depth: 0
17 | 
18 |       - name: Set up Python
19 |         uses: actions/setup-python@v4
20 |         with:
21 |           python-version: '3.x'
22 | 
23 |       - name: Install dependencies
24 |         run: |
25 |           python -m pip install --upgrade pip
26 |           pip install git-cliff
27 | 
28 |       - name: Get version from tag
29 |         id: get_version
30 |         run: echo "VERSION=${GITHUB_REF#refs/tags/v}" >> $GITHUB_ENV
31 | 
32 |       - name: Generate changelog
33 |         run: |
34 |           git-cliff --output CHANGELOG.md --latest
35 | 
36 |       - name: Create Release
37 |         uses: softprops/action-gh-release@v1
38 |         with:
39 |           name: v${{ env.VERSION }}
40 |           body_path: CHANGELOG.md
41 |           draft: false
42 |           prerelease: false
43 |
```
--------------------------------------------------------------------------------
/.github/workflows/pypi-publish.yaml:
--------------------------------------------------------------------------------
```yaml
1 | # This workflow will upload a Python Package using Twine when a release is created
2 | # For more information see: https://help.github.com/en/actions/language-and-framework-guides/using-python-with-github-actions#publishing-to-package-registries
3 |
4 | # This workflow uses actions that are not certified by GitHub.
5 | # They are provided by a third-party and are governed by
6 | # separate terms of service, privacy policy, and support
7 | # documentation.
8 |
9 | name: PyPI Publish
10 | 
11 | on:
12 |   workflow_run:
13 |     workflows: ["Release"]
14 |     types:
15 |       - completed
16 | 
17 | env:
18 |   UV_PUBLISH_TOKEN: '${{ secrets.PYPI_API_TOKEN }}'
19 | 
20 | jobs:
21 |   deploy:
22 |     runs-on: ubuntu-latest
23 |     if: ${{ github.event.workflow_run.conclusion == 'success' }}
24 |     steps:
25 |       - uses: actions/checkout@v2
26 | 
27 |       - name: Set up Python
28 |         uses: actions/setup-python@v2
29 |         with:
30 |           python-version: '3.10.x'
31 | 
32 |       - name: Install dependencies
33 |         run: |
34 |           python -m pip install uv
35 |           uv sync
36 | 
37 |       - name: Build package
38 |         run: uv build
39 | 
40 |       - name: Publish package
41 |         run: uv publish
42 |
```
--------------------------------------------------------------------------------
/src/opensearch_mcp_server/es_client.py:
--------------------------------------------------------------------------------
```python
1 | import logging
2 | import os
3 | from dotenv import load_dotenv
4 | from opensearchpy import OpenSearch
5 | import warnings
6 | 
7 | 
8 | class OpensearchClient:
9 |     def __init__(self, logger: logging.Logger):
10 |         self.logger = logger
11 |         self.es_client = self._create_opensearch_client()
12 | 
13 |     def _get_es_config(self):
14 |         """Get OpenSearch configuration from environment variables."""
15 |         # Load environment variables from .env file
16 |         load_dotenv()
17 |         config = {
18 |             "host": os.getenv("OPENSEARCH_HOST"),
19 |             "username": os.getenv("OPENSEARCH_USERNAME"),
20 |             "password": os.getenv("OPENSEARCH_PASSWORD"),
21 |             "dashboards_host": os.getenv("DASHBOARDS_HOST"),
22 |         }
23 | 
24 |         if not all([config["username"], config["password"]]):
25 |             self.logger.error(
26 |                 "Missing required OpenSearch configuration. Please check environment variables:"
27 |             )
28 |             self.logger.error(
29 |                 "OPENSEARCH_USERNAME and OPENSEARCH_PASSWORD are required"
30 |             )
31 |             raise ValueError("Missing required OpenSearch configuration")
32 | 
33 |         return config
34 | 
35 |     def _create_opensearch_client(self) -> OpenSearch:
36 |         """Create and return an OpenSearch client using configuration from environment."""
37 |         config = self._get_es_config()
38 | 
39 |         # Disable SSL warnings
40 |         warnings.filterwarnings(
41 |             "ignore",
42 |             message=".*TLS with verify_certs=False is insecure.*",
43 |         )
44 | 
45 |         return OpenSearch(
46 |             config["host"],
47 |             http_auth=(config["username"], config["password"]),
48 |             verify_certs=False,
49 |         )
50 |
```
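Every tool class in `tools/` inherits from `OpensearchClient`, so instantiating one both configures logging and opens a connection. A minimal connectivity check, assuming the `OPENSEARCH_*` variables are set in the environment or a `.env` file:

```python
import logging

from opensearch_mcp_server.es_client import OpensearchClient

logger = logging.getLogger("smoke-test")
client = OpensearchClient(logger)

# The underlying opensearch-py client is exposed as `es_client`.
print(client.es_client.info())  # cluster name, version, etc.
```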
--------------------------------------------------------------------------------
/src/opensearch_mcp_server/server.py:
--------------------------------------------------------------------------------
```python
1 | #!/usr/bin/env python3
2 | import logging
3 | from fastmcp import FastMCP
4 | from .tools.index import IndexTools
5 | from .tools.document import DocumentTools
6 | from .tools.cluster import ClusterTools
7 | from .tools.dashboards import DashboardTools
8 | from .tools.es_admin.admin_index import AdminIndexTools
9 | from .tools.es_admin.admin_cluster import AdminClusterTools
10 | class OpensearchMCPServer:
11 |     def __init__(self):
12 |         self.name = "opensearch_mcp_server"
13 |         self.mcp = FastMCP(self.name)
14 | 
15 |         # Configure logging
16 |         logging.basicConfig(
17 |             level=logging.INFO,
18 |             format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
19 |         )
20 |         self.logger = logging.getLogger(self.name)
21 | 
22 |         # Initialize tools
23 |         self._register_tools()
24 | 
25 |     def _register_tools(self):
26 |         """Register all MCP tools."""
27 |         # Initialize tool classes
28 |         index_tools = IndexTools(self.logger)
29 |         document_tools = DocumentTools(self.logger)
30 |         cluster_tools = ClusterTools(self.logger)
31 |         dashboard_tools = DashboardTools(self.logger)
32 |         admin_index_tools = AdminIndexTools(self.logger)
33 |         admin_cluster_tools = AdminClusterTools(self.logger)
34 | 
35 |         # Register tools from each module
36 |         index_tools.register_tools(self.mcp)
37 |         document_tools.register_tools(self.mcp)
38 |         cluster_tools.register_tools(self.mcp)
39 |         dashboard_tools.register_tools(self.mcp)
40 |         admin_index_tools.register_tools(self.mcp)
41 |         admin_cluster_tools.register_tools(self.mcp)
42 | 
43 |     def run(self):
44 |         """Run the MCP server."""
45 |         self.mcp.run()
46 | 
47 | def main():
48 |     server = OpensearchMCPServer()
49 |     server.run()
50 |
```
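Given the `[project.scripts]` entry in `pyproject.toml` maps `opensearch-mcp-server` to this module's `main()`, a local run from a checkout is simply:

```bash
# Install dependencies and start the server over stdio (the same command Claude Desktop runs).
uv sync
uv run opensearch-mcp-server
```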
--------------------------------------------------------------------------------
/src/opensearch_mcp_server/tools/cluster.py:
--------------------------------------------------------------------------------
```python
1 | import logging
2 | from typing import Dict, Any
3 | from ..es_client import OpensearchClient
4 | from mcp.types import TextContent
5 | 
6 | class ClusterTools(OpensearchClient):
7 |     def register_tools(self, mcp: Any):
8 |         """Register cluster-related tools."""
9 | 
10 |         @mcp.tool(description="Get cluster health status")
11 |         async def get_cluster_health() -> list[TextContent]:
12 |             """
13 |             Get health status of the OpenSearch cluster.
14 |             Returns information about the number of nodes, shards, etc.
15 |             """
16 |             self.logger.info("Getting cluster health")
17 |             try:
18 |                 response = self.es_client.cluster.health()
19 |                 return [TextContent(type="text", text=str(response))]
20 |             except Exception as e:
21 |                 self.logger.error(f"Error getting cluster health: {e}")
22 |                 return [TextContent(type="text", text=f"Error: {str(e)}")]
23 | 
24 |         @mcp.tool(description="Get cluster statistics")
25 |         async def get_cluster_stats() -> list[TextContent]:
26 |             """
27 |             Get statistics from a cluster-wide perspective.
28 |             The API returns basic index metrics (shard numbers, store size, memory usage) and information
29 |             about the current nodes that form the cluster (number, roles, os, jvm versions, memory usage, cpu and installed plugins).
30 |             https://opensearch.org/docs/latest/tuning-your-cluster/
31 |             """
32 |             self.logger.info("Getting cluster stats")
33 |             try:
34 |                 response = self.es_client.cluster.stats()
35 |                 return [TextContent(type="text", text=str(response))]
36 |             except Exception as e:
37 |                 self.logger.error(f"Error getting cluster stats: {e}")
38 |                 return [TextContent(type="text", text=f"Error: {str(e)}")]
39 |
```
--------------------------------------------------------------------------------
/src/opensearch_mcp_server/tools/document.py:
--------------------------------------------------------------------------------
```python
1 | import logging
2 | import json
3 | from typing import Dict, Any
4 | from ..es_client import OpensearchClient
5 | from mcp.types import TextContent
6 | 
7 | class DocumentTools(OpensearchClient):
8 |     def register_tools(self, mcp: Any):
9 |         """Register document-related tools."""
10 | 
11 |         @mcp.tool(description="Search documents in an OpenSearch index with a custom query")
12 |         async def search_documents(index: str, body: dict) -> list[TextContent]:
13 |             """
14 |             Search documents in a specified OpenSearch index using a custom query.
15 | 
16 |             Args:
17 |                 index: Name of the index to search
18 |                 body: OpenSearch Query DSL. If size is not specified, defaults to 20 results.
19 |             """
20 |             # Ensure a reasonable default size limit is set
21 |             if 'size' not in body:
22 |                 body['size'] = 20
23 |             self.logger.info(f"Searching in index: {index} with query: {body}")
24 |             try:
25 |                 response = self.es_client.search(index=index, body=body)
26 |                 # Extract and format relevant information
27 |                 formatted_response = {
28 |                     'total_hits': response['hits']['total']['value'],
29 |                     'max_score': response['hits']['max_score'],
30 |                     'hits': []
31 |                 }
32 | 
33 |                 # Process each hit
34 |                 for hit in response['hits']['hits']:
35 |                     hit_data = {
36 |                         '_id': hit['_id'],
37 |                         '_score': hit['_score'],
38 |                         'source': hit['_source']
39 |                     }
40 |                     formatted_response['hits'].append(hit_data)
41 | 
42 |                 # Include aggregations if present
43 |                 if 'aggregations' in response:
44 |                     formatted_response['aggregations'] = response['aggregations']
45 | 
46 |                 return [TextContent(type="text", text=json.dumps(formatted_response, indent=2))]
47 |             except Exception as e:
48 |                 self.logger.error(f"Error searching documents: {e}")
49 |                 return [TextContent(type="text", text=f"Error: {str(e)}")]
50 |
```
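For reference, a sketch of the Query DSL `body` this tool expects; the index fields and values are illustrative only:

```python
# Example `body` argument for search_documents: a standard OpenSearch
# match query. `size` is optional; the tool defaults it to 20 when omitted.
query_body = {
    "query": {
        "match": {"name": "Bob"}
    },
    "size": 10,
}
```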
--------------------------------------------------------------------------------
/src/opensearch_mcp_server/tools/index.py:
--------------------------------------------------------------------------------
```python
1 | import logging
2 | from typing import Dict, Any
3 | from ..es_client import OpensearchClient
4 | from mcp.types import TextContent
5 | 
6 | class IndexTools(OpensearchClient):
7 |     def register_tools(self, mcp: Any):
8 |         """Register index-related tools."""
9 | 
10 |         @mcp.tool(description="List all indices in the OpenSearch cluster")
11 |         async def list_indices() -> list[TextContent]:
12 |             """
13 |             List all indices in the OpenSearch cluster.
14 |             It is important to check the indices before searching documents
15 |             to understand what indices are available.
16 |             """
17 |             self.logger.info("Listing indices...")
18 |             try:
19 |                 indices = self.es_client.cat.indices(format="json")
20 |                 return [TextContent(type="text", text=str(indices))]
21 |             except Exception as e:
22 |                 self.logger.error(f"Error listing indices: {e}")
23 |                 return [TextContent(type="text", text=f"Error: {str(e)}")]
24 | 
25 |         @mcp.tool(description="Get index mapping")
26 |         async def get_mapping(index: str) -> list[TextContent]:
27 |             """
28 |             Get the mapping for an index.
29 |             It is important to always check the mappings to understand
30 |             the exact field names and types before constructing queries or URLs.
31 | 
32 |             Args:
33 |                 index: Name of the index
34 |             """
35 |             self.logger.info(f"Getting mapping for index: {index}")
36 |             try:
37 |                 response = self.es_client.indices.get_mapping(index=index)
38 |                 return [TextContent(type="text", text=str(response))]
39 |             except Exception as e:
40 |                 self.logger.error(f"Error getting mapping: {e}")
41 |                 return [TextContent(type="text", text=f"Error: {str(e)}")]
42 | 
43 |         @mcp.tool(description="Get index settings")
44 |         async def get_settings(index: str) -> list[TextContent]:
45 |             """
46 |             Get the settings for an index.
47 | 
48 |             Args:
49 |                 index: Name of the index
50 |             """
51 |             self.logger.info(f"Getting settings for index: {index}")
52 |             try:
53 |                 response = self.es_client.indices.get_settings(index=index)
54 |                 return [TextContent(type="text", text=str(response))]
55 |             except Exception as e:
56 |                 self.logger.error(f"Error getting settings: {e}")
57 |                 return [TextContent(type="text", text=f"Error: {str(e)}")]
58 |
```
--------------------------------------------------------------------------------
/cliff.toml:
--------------------------------------------------------------------------------
```toml
1 | # git-cliff ~ configuration file
2 | # https://git-cliff.org/docs/configuration
3 |
4 | [changelog]
5 | # template for the changelog header
6 | header = """
7 | # Changelog\n
8 | """
9 | # template for the changelog body
10 | # https://keats.github.io/tera/docs/#introduction
11 | body = """
12 | {% if version %}\
13 | {% if previous.version %}\
14 | ## [{{ version | trim_start_matches(pat="v") }}]($REPO/compare/{{ previous.version }}..{{ version }}) - {{ timestamp | date(format="%Y-%m-%d") }}
15 | {% else %}\
16 | ## [{{ version | trim_start_matches(pat="v") }}] - {{ timestamp | date(format="%Y-%m-%d") }}
17 | {% endif %}\
18 | {% else %}\
19 | ## [unreleased]
20 | {% endif %}\
21 | {% for group, commits in commits | group_by(attribute="group") %}
22 | ### {{ group | striptags | trim | upper_first }}
23 | {% for commit in commits
24 | | filter(attribute="scope")
25 | | sort(attribute="scope") %}
26 | - **({{commit.scope}})**{% if commit.breaking %} [**breaking**]{% endif %} \
27 | {{ commit.message }} - ([{{ commit.id | truncate(length=7, end="") }}]($REPO/commit/{{ commit.id }})) - @{{ commit.author.name }}
28 | {%- endfor -%}
29 | {% raw %}\n{% endraw %}\
30 | {%- for commit in commits %}
31 | {%- if commit.scope -%}
32 | {% else -%}
33 | - {% if commit.breaking %} [**breaking**]{% endif %}\
34 | {{ commit.message }} - ([{{ commit.id | truncate(length=7, end="") }}]($REPO/commit/{{ commit.id }})) - @{{ commit.author.name }}
35 | {% endif -%}
36 | {% endfor -%}
37 | {% endfor %}\n
38 | """
39 | # template for the changelog footer
40 | footer = """
41 | <!-- generated by git-cliff -->
42 | """
43 | # remove the leading and trailing whitespace from the templates
44 | trim = true
45 | # postprocessors
46 | postprocessors = [
47 |     { pattern = '\$REPO', replace = "https://github.com/hyunjunseo/opensearch-mcp-server" }, # replace repository URL
48 | ]
49 |
50 | [git]
51 | # parse the commits based on https://www.conventionalcommits.org
52 | conventional_commits = true
53 | # filter out the commits that are not conventional
54 | filter_unconventional = true
55 | # process each line of a commit as an individual commit
56 | split_commits = false
57 | # regex for preprocessing the commit messages
58 | commit_preprocessors = [
59 |     # { pattern = '\((\w+\s)?#([0-9]+)\)', replace = "([#${2}](https://github.com/cr7258/elasticsearch-mcp-server/issues/${2}))"}, # replace issue numbers
60 | ]
61 | # regex for parsing and grouping commits
62 | commit_parsers = [
63 |     { message = "^feat", group = "<!-- 0 -->⛰️ Features" },
64 |     { message = "^fix", group = "<!-- 1 -->🐛 Bug Fixes" },
65 |     { message = "^doc", group = "<!-- 3 -->📚 Documentation" },
66 |     { message = "^perf", group = "<!-- 4 -->⚡ Performance" },
67 |     { message = "^refactor\\(clippy\\)", skip = true },
68 |     { message = "^refactor", group = "<!-- 2 -->🚜 Refactor" },
69 |     { message = "^style", group = "<!-- 5 -->🎨 Styling" },
70 |     { message = "^test", group = "<!-- 6 -->🧪 Testing" },
71 |     { message = "^chore\\(release\\): prepare for", skip = true },
72 |     { message = "^chore\\(deps.*\\)", skip = true },
73 |     { message = "^chore\\(pr\\)", skip = true },
74 |     { message = "^chore\\(pull\\)", skip = true },
75 |     { message = "^chore\\(npm\\).*yarn\\.lock", skip = true },
76 |     { message = "^chore|^ci", group = "<!-- 7 -->⚙️ Miscellaneous Tasks" },
77 |     { body = ".*security", group = "<!-- 8 -->🛡️ Security" },
78 |     { message = "^revert", group = "<!-- 9 -->◀️ Revert" },
80 |
81 | # filter out the commits that are not matched by commit parsers
82 | filter_commits = false
83 | # sort the tags topologically
84 | topo_order = false
85 | # sort the commits inside sections by oldest/newest order
86 | sort_commits = "oldest"
87 | # regex for matching git tags
88 | tag_pattern = "^v[0-9]"
89 | # regex for skipping tags
90 | skip_tags = ""
91 | # regex for ignoring tags
92 | ignore_tags = ""
93 | # use tag date instead of commit date
94 | date_order = true
95 | # path to git binary
96 | git_path = "git"
97 | # whether to use relaxed or strict semver parsing
98 | relaxed_semver = true
99 | # only show the changes for the current version
100 | tag_range = true
```
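As a worked example of the `commit_parsers` rules above, commit messages are matched top-down against the regexes, so:

```
feat(index): add get_settings tool    -> "⛰️ Features"
fix: handle missing credentials       -> "🐛 Bug Fixes"
chore(deps): bump opensearch-py       -> skipped
ci: add release workflow              -> "⚙️ Miscellaneous Tasks"
```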
--------------------------------------------------------------------------------
/src/opensearch_mcp_server/tools/es_admin/admin_index.py:
--------------------------------------------------------------------------------
```python
1 | import logging
2 | from typing import Dict, Any
3 | from ...es_client import OpensearchClient
4 | from mcp.types import TextContent
5 | 
6 | class AdminIndexTools(OpensearchClient):
7 |     def register_tools(self, mcp: Any):
8 |         """Register administrative index-related tools."""
9 | 
10 |         @mcp.tool(description="Get ISM policies and their configurations")
11 |         async def get_ism_policies() -> list[TextContent]:
12 |             """
13 |             Get Index State Management policies and their configurations.
14 |             Returns policy IDs, descriptions, states, and index patterns.
15 |             This result is useful for determining index lifecycle management configurations such as index size limits, index rollover policy,
16 |             and retention policy.
17 |             """
18 |             self.logger.info("Fetching ISM policies...")
19 |             try:
20 |                 response = self.es_client.transport.perform_request(
21 |                     'GET',
22 |                     '/_plugins/_ism/policies',
23 |                     params={'filter_path': 'policies.policy.policy_id,policies.policy.description,policies.policy.states,policies.policy.ism_template.index_patterns'}
24 |                 )
25 |                 return [TextContent(type="text", text=str(response))]
26 |             except Exception as e:
27 |                 self.logger.error(f"Error fetching ISM policies: {e}")
28 |                 return [TextContent(type="text", text=f"Error: {str(e)}")]
29 | 
30 |         @mcp.tool(description="Get index template configurations")
31 |         async def get_index_templates() -> list[TextContent]:
32 |             """
33 |             Get index templates and their configurations.
34 |             Returns template names and their configured number of shards.
35 |             This helps understand how new indices will be created.
36 |             """
37 |             self.logger.info("Fetching index templates...")
38 |             try:
39 |                 response = self.es_client.transport.perform_request(
40 |                     'GET',
41 |                     '/_index_template',
42 |                     params={'filter_path': 'index_templates.name,index_templates.index_template.index_patterns,index_templates.index_template.template.settings.index.number_of_shards'}
43 |                 )
44 |                 return [TextContent(type="text", text=str(response))]
45 |             except Exception as e:
46 |                 self.logger.error(f"Error fetching index templates: {e}")
47 |                 return [TextContent(type="text", text=f"Error: {str(e)}")]
48 | 
49 |         @mcp.tool(description="Get index shard allocation distribution")
50 |         async def get_shard_allocation(latest_index: str) -> list[TextContent]:
51 |             """
52 |             Get the current index shard allocation distribution across nodes.
53 |             Returns index name, shard number, primary/replica status, and node assignment.
54 |             This helps understand how shards are distributed across the cluster.
55 | 
56 |             Args:
57 |                 latest_index: The most recent index of interest.
58 |             """
59 |             self.logger.info("Fetching shard allocation...")
60 |             try:
61 |                 response = self.es_client.transport.perform_request(
62 |                     'GET',
63 |                     f'/_cat/shards/{latest_index}',
64 |                     params={'h': 'index,shard,prirep,node', 'format': 'json'}
65 |                 )
66 |                 # Count shards per node
67 |                 shard_counts = {}
68 |                 for shard in response:
69 |                     if shard['node'] not in shard_counts:
70 |                         shard_counts[shard['node']] = 0
71 |                     shard_counts[shard['node']] += 1
72 | 
73 |                 # Format the response with both raw data and counts
74 |                 formatted_response = {
75 |                     'shard_distribution': response,
76 |                     'shards_per_node': shard_counts
77 |                 }
78 |                 return [TextContent(type="text", text=str(formatted_response))]
79 |             except Exception as e:
80 |                 self.logger.error(f"Error fetching shard allocation: {e}")
81 |                 return [TextContent(type="text", text=f"Error: {str(e)}")]
82 |
```
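The raw endpoints these tools wrap can also be exercised directly, which helps when debugging `filter_path` expressions; host and credentials are placeholders:

```bash
curl -ku admin:admin "https://localhost:9200/_plugins/_ism/policies"
curl -ku admin:admin "https://localhost:9200/_cat/shards/my-index?h=index,shard,prirep,node&format=json"
```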
--------------------------------------------------------------------------------
/src/opensearch_mcp_server/tools/dashboards.py:
--------------------------------------------------------------------------------
```python
1 | import logging
2 | import json
3 | from typing import Dict, Any
4 | from ..es_client import OpensearchClient
5 | from mcp.types import TextContent
6 | from urllib.parse import urlencode
7 | 
8 | class DashboardTools(OpensearchClient):
9 |     def register_tools(self, mcp: Any):
10 |         """Register dashboards-related tools."""
11 | 
12 | 
13 |         @mcp.tool(description="List OpenSearch Dashboards index patterns")
14 |         async def list_index_patterns() -> list[TextContent]:
15 |             """
16 |             Find all index pattern IDs stored in the .kibana index. Especially useful for
17 |             identifying the correct index pattern ID to use in the Discover view URL.
18 |             This function queries the .kibana index for saved objects of type 'index-pattern'
19 |             and returns a list of their titles and IDs.
20 | 
21 |             Returns:
22 |                 list[TextContent]: A list containing the found index patterns or an error message.
23 |             """
24 |             self.logger.info("Searching for index patterns")
25 |             try:
26 |                 response = self.es_client.search(
27 |                     index=".kibana",
28 |                     body={
29 |                         '_source': ['index-pattern.title', '_id'],
30 |                         'query': {
31 |                             'term': {
32 |                                 'type': 'index-pattern'
33 |                             }
34 |                         }
35 |                     }
36 |                 )
37 |                 patterns = json.dumps([{hit["_source"]["index-pattern"]["title"]: hit["_id"].replace('index-pattern:', '')}
38 |                                        for hit in response["hits"]["hits"]], indent=4)
39 |                 return [TextContent(type="text", text=patterns)]
40 |             except Exception as e:
41 |                 self.logger.error(f"Error finding index patterns: {e}")
42 |                 return [TextContent(type="text", text=f"Error: {str(e)}")]
43 | 
44 |         @mcp.tool(description="Generate OpenSearch Dashboards Discover view URL")
45 |         async def generate_discover_url(query: str, index_pattern_id: str, from_time: str, to_time: str) -> list[TextContent]:
46 |             """
47 |             Generate a URL for the OpenSearch Dashboards Discover view that will display the results of a query.
48 |             The argument values must be compatible with the rison data format used by OpenSearch Dashboards.
49 |             Use the list index patterns tool to determine the available index pattern IDs.
50 |             The index_pattern_id argument must be the ID of the index pattern to be used.
51 |             The query argument must be in valid OpenSearch Lucene format.
52 |             Refrain from querying the timestamp or @timestamp fields in the query; use the from_time and to_time parameters instead.
53 |             The function constructs a URL that includes the query and index pattern as parameters.
54 | 
55 |             Args:
56 |                 query str: The query to apply in the Discover view, in Lucene format.
57 |                 index_pattern_id str: The index pattern ID to use in the Discover view URL.
58 |                 from_time str: The starting time for the query, in a format like `now-15m`.
59 |                 to_time str: The ending time for the query, in a format like `now`.
60 | 
61 |             Returns:
62 |                 list[TextContent]: A list containing the generated URL or an error message.
63 |             """
64 |             self.logger.info("Generating Discover view URL")
65 |             config = self._get_es_config()
66 |             try:
67 |                 base_url = config["dashboards_host"] + "/app/data-explorer/discover#?"  # "http[s]://host[:port]/app/data-explorer/discover#?" + query_params
68 |                 query_params = {
69 |                     "_g": "(filters:!(),refreshInterval:(pause:!t,value:0),time:(from:'" + from_time + "',to:'" + to_time + "'))",
70 |                     "_q": "(filters:!(),query:(language:lucene,query:'" + query + "'))",
71 |                     "_a": "(discover:(columns:!(_source),isDirty:!f,sort:!()),metadata:(indexPattern:'" + index_pattern_id + "',view:discover))"
72 |                 }
73 |                 url = base_url + urlencode(query_params, safe="(),:")
74 |                 return [TextContent(type="text", text=url)]
75 | 
76 |             except Exception as e:
77 |                 self.logger.error(f"Error generating Discover view URL: {e}")
78 |                 return [TextContent(type="text", text=f"Error: {str(e)}")]
79 |
```
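For reference, a call with query `level:ERROR`, index pattern ID `abc123`, and a `now-15m` to `now` window (all illustrative values) produces a URL of this shape, shown here before `urlencode` percent-escapes characters such as `!` and `'`:

```
https://localhost:5601/app/data-explorer/discover#?_g=(filters:!(),refreshInterval:(pause:!t,value:0),time:(from:'now-15m',to:'now'))&_q=(filters:!(),query:(language:lucene,query:'level:ERROR'))&_a=(discover:(columns:!(_source),isDirty:!f,sort:!()),metadata:(indexPattern:'abc123',view:discover))
```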
--------------------------------------------------------------------------------
/src/opensearch_mcp_server/tools/es_admin/admin_cluster.py:
--------------------------------------------------------------------------------
```python
1 | import logging
2 | from typing import Dict, Any
3 | from ...es_client import OpensearchClient
4 | from mcp.types import TextContent
5 | 
6 | class AdminClusterTools(OpensearchClient):
7 |     def register_tools(self, mcp: Any):
8 |         """Register administrative cluster-related tools."""
9 | 
10 |         @mcp.tool(description="Check hot threads on nodes")
11 |         async def get_hot_threads() -> list[TextContent]:
12 |             """
13 |             Get hot threads information from all nodes, filtering for CPU percentage data.
14 |             Returns only thread information containing percentage signs, indicating CPU usage.
15 |             If no threads show percentage usage, indicates no hot threads were found.
16 |             """
17 |             self.logger.info("Fetching hot threads information...")
18 |             try:
19 |                 response = self.es_client.transport.perform_request(
20 |                     'GET',
21 |                     '/_nodes/hot_threads'
22 |                 )
23 |                 # Filter lines containing '%'
24 |                 hot_lines = [line for line in str(response).split('\n') if '%' in line]
25 | 
26 |                 if hot_lines:
27 |                     return [TextContent(type="text", text='\n'.join(hot_lines))]
28 |                 else:
29 |                     return [TextContent(type="text", text="No hot threads detected in the cluster.")]
30 |             except Exception as e:
31 |                 self.logger.error(f"Error fetching hot threads: {e}")
32 |                 return [TextContent(type="text", text=f"Error: {str(e)}")]
33 | 
34 |         @mcp.tool(description="Get current tasks in cluster")
35 |         async def get_tasks() -> list[TextContent]:
36 |             """
37 |             Get current tasks running in the cluster.
38 |             Filters duplicate task types to show only unique operations.
39 |             """
40 |             self.logger.info("Fetching cluster tasks...")
41 |             try:
42 |                 response = self.es_client.cat.tasks(v=True)
43 |                 lines = response.split('\n')
44 |                 seen_tasks = set()
45 |                 filtered_lines = []
46 | 
47 |                 for line in lines:
48 |                     if not line.strip():
49 |                         continue
50 |                     task_type = line.split()[0]
51 |                     if task_type not in seen_tasks:
52 |                         seen_tasks.add(task_type)
53 |                         filtered_lines.append(line)
54 | 
55 |                 if filtered_lines:
56 |                     return [TextContent(type="text", text='\n'.join(filtered_lines))]
57 |                 else:
58 |                     return [TextContent(type="text", text="No tasks currently running in the cluster.")]
59 |             except Exception as e:
60 |                 self.logger.error(f"Error fetching tasks: {e}")
61 |                 return [TextContent(type="text", text=f"Error: {str(e)}")]
62 | 
63 |         @mcp.tool(description="Get recovery status and estimated completion time")
64 |         async def get_recovery_status() -> list[TextContent]:
65 |             """
66 |             Get recovery status for shards that are currently being recovered.
67 |             Includes progress percentage and estimated time remaining based on the current recovery rate.
68 |             """
69 |             self.logger.info("Fetching recovery status...")
70 |             try:
71 |                 # Get active recoveries with detailed stats
72 |                 response = self.es_client.cat.recovery(format='json', active_only=True, v=True)
73 | 
74 |                 if not response:
75 |                     # Get cluster health to show overall shard status if no active recoveries
76 |                     health = self.es_client.cluster.health()
77 |                     total_shards = health['active_shards'] + health['unassigned_shards'] + health['initializing_shards']
78 |                     active_pct = (health['active_shards'] / total_shards) * 100 if total_shards > 0 else 100
79 | 
80 |                     status_msg = (
81 |                         f"No active recoveries. Cluster status: {health['status']}\n"
82 |                         f"Active shards: {health['active_shards']}/{total_shards} ({active_pct:.1f}%)\n"
83 |                         f"Initializing: {health['initializing_shards']}\n"
84 |                         f"Unassigned: {health['unassigned_shards']}"
85 |                     )
86 |                     return [TextContent(type="text", text=status_msg)]
87 | 
88 |                 # Process active recoveries
89 |                 summary = []
90 |                 for recovery in response:
91 |                     index = recovery['index']
92 |                     shard = recovery['shard']
93 |                     stage = recovery.get('stage', 'unknown')
94 | 
95 |                     # Calculate progress and time remaining
96 |                     files_pct = float(recovery.get('files_percent', '0').rstrip('%'))
97 |                     bytes_pct = float(recovery.get('bytes_percent', '0').rstrip('%'))
98 |                     total_bytes = int(recovery.get('bytes_total', 0))
99 |                     bytes_recovered = int(recovery.get('bytes_recovered', 0))
100 | 
101 |                     # Parse time value which can be in a format like "1.2s", "3m", or "2.5h"
102 |                     time_str = recovery.get('time', '0s')
103 |                     try:
104 |                         # Convert time string to milliseconds
105 |                         if time_str.endswith('ms'):
106 |                             time_spent_ms = float(time_str[:-2])
107 |                         elif time_str.endswith('s'):
108 |                             time_spent_ms = float(time_str[:-1]) * 1000
109 |                         elif time_str.endswith('m'):
110 |                             time_spent_ms = float(time_str[:-1]) * 60 * 1000
111 |                         elif time_str.endswith('h'):
112 |                             time_spent_ms = float(time_str[:-1]) * 60 * 60 * 1000
113 |                         else:
114 |                             time_spent_ms = 0
115 |                     except ValueError:
116 |                         time_spent_ms = 0
117 | 
118 |                     # Calculate recovery rate and estimated time remaining
119 |                     if bytes_recovered > 0 and time_spent_ms > 0:
120 |                         rate_mb_sec = (bytes_recovered / 1024 / 1024) / (time_spent_ms / 1000)
121 |                         remaining_bytes = total_bytes - bytes_recovered
122 |                         est_seconds_remaining = (remaining_bytes / 1024 / 1024) / rate_mb_sec if rate_mb_sec > 0 else 0
123 | 
124 |                         # Format time remaining in a human-readable way
125 |                         if est_seconds_remaining < 60:
126 |                             time_remaining = f"{est_seconds_remaining:.0f} seconds"
127 |                         elif est_seconds_remaining < 3600:
128 |                             time_remaining = f"{est_seconds_remaining/60:.1f} minutes"
129 |                         else:
130 |                             time_remaining = f"{est_seconds_remaining/3600:.1f} hours"
131 | 
132 |                         recovery_info = (
133 |                             f"Index: {index}, Shard: {shard}\n"
134 |                             f"Stage: {stage}\n"
135 |                             f"Progress: files={files_pct:.1f}%, bytes={bytes_pct:.1f}%\n"
136 |                             f"Rate: {rate_mb_sec:.1f} MB/sec\n"
137 |                             f"Est. time remaining: {time_remaining}\n"
138 |                         )
139 |                     else:
140 |                         recovery_info = (
141 |                             f"Index: {index}, Shard: {shard}\n"
142 |                             f"Stage: {stage}\n"
143 |                             f"Progress: files={files_pct:.1f}%, bytes={bytes_pct:.1f}%\n"
144 |                             "Rate: calculating...\n"
145 |                         )
146 | 
147 |                     summary.append(recovery_info)
148 | 
149 |                 return [TextContent(type="text", text="\n".join(summary))]
150 | 
151 |             except Exception as e:
152 |                 self.logger.error(f"Error fetching recovery status: {e}")
153 |                 return [TextContent(type="text", text=f"Error: {str(e)}")]
154 |
155 |
```
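A quick worked example of the rate and time-remaining arithmetic in `get_recovery_status`, with made-up numbers:

```python
# 512 MiB recovered in one minute...
bytes_recovered = 512 * 1024 * 1024
time_spent_ms = 60_000
rate_mb_sec = (bytes_recovered / 1024 / 1024) / (time_spent_ms / 1000)  # ~8.5 MB/sec

# ...leaves 1.5 GiB of a 2 GiB shard, i.e. roughly three more minutes.
total_bytes = 2048 * 1024 * 1024
remaining_bytes = total_bytes - bytes_recovered
est_seconds_remaining = (remaining_bytes / 1024 / 1024) / rate_mb_sec  # 180.0
```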