# Directory Structure
```
├── .env-template
├── .github
│ └── workflows
│ ├── pypi-publish.yaml
│ └── release.yml
├── .gitignore
├── .python-version
├── cliff.toml
├── Dockerfile
├── LICENSE
├── Makefile
├── pyproject.toml
├── README.md
├── src
│ └── opensearch_mcp_server
│ ├── __init__.py
│ ├── es_client.py
│ ├── server.py
│ └── tools
│ ├── cluster.py
│ ├── dashboards.py
│ ├── document.py
│ ├── es_admin
│ │ ├── admin_cluster.py
│ │ └── admin_index.py
│ └── index.py
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/.python-version:
--------------------------------------------------------------------------------
```
3.10
```
--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------
```
.idea
.vscode
.venv
dist
__pycache__
*.egg-info
.env
.DS_Store
**/.DS_Store
```
--------------------------------------------------------------------------------
/.env-template:
--------------------------------------------------------------------------------
```
# OpenSearch connection settings
OPENSEARCH_HOST=https://localhost:9200
OPENSEARCH_USERNAME=admin
OPENSEARCH_PASSWORD=admin
```
--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------
```markdown
# OpenSearch MCP Server
## Overview
This Repository Fork of [elastic-mcp-server](https://github.com/cr7258/elasticsearch-mcp-server) and Converted to [opensearch-mcp-server](https://github.com/seohyunjun/opensearch-mcp-server) MCP Server. It is a Model Context Protocol (MCP) server implementation that provides opensearch interaction. This server enables searching documents, analyzing indices, and managing cluster through a set of tools.
A Model Context Protocol (MCP) server implementation that provides opensearch interaction. This server enables searching documents, analyzing indices, and managing cluster through a set of tools.
## Features
### Index Operations
- `list_indices`: List all indices in the Opensearch cluster.
- `get_mapping`: Retrieve the mapping configuration for a specific index.
- `get_settings`: Get the settings configuration for a specific index.
### Document Operations
- `search_documents`: Search documents in an index using Opensearch Query DSL.
### Cluster Operations
- `get_cluster_health`: Get health status of the cluster.
- `get_cluster_stats`: Get statistical information about the cluster.
## Start Opensearch Cluster
Start the Opensearch cluster using Docker Compose:
```bash
docker-compose up -d
```
This will start a 3-node Opensearch cluster and Kibana. Default Opensearch username `opensearch`, password `test123`.
You can access Kibana from http://localhost:5601.
## Usage with Claude Desktop
### Using uv with local development
Using `uv` requires cloning the repository locally and specifying the path to the source code. Add the following configuration to Claude Desktop's config file `claude_desktop_config.json`.
you need to change `path/to/src/opensearch_mcp_server` to the path where you cloned the repository.
```json
{
"mcpServers": {
"opensearch": {
"command": "uv",
"args": [
"--directory",
"path/to/src/opensearch_mcp_server",
"run",
"opensearch-mcp-server"
],
"env": {
"OPENSEARCH_HOST": "https://localhost:9200",
"OPENSEARCH_USERNAME": "opensearch",
"OPENSEARCH_PASSWORD": "test123",
"DASHBOARDS_HOST": "https://localhost:5601"
}
}
}
}
```
- On macOS: `~/Library/Application Support/Claude/claude_desktop_config.json`
- On Windows: `%APPDATA%/Claude/claude_desktop_config.json`
Restart Claude Desktop to load the new MCP server.
Now you can interact with your Opensearch cluster through Claude using natural language commands like:
- "List all indices in the cluster"
- "How old is the student Bob?"
- "Show me the cluster health status"
## License
This project is licensed under the Apache License Version 2.0 - see the [LICENSE](LICENSE) file for details.
```
--------------------------------------------------------------------------------
/src/opensearch_mcp_server/__init__.py:
--------------------------------------------------------------------------------
```python
from . import server
def main():
"""Main entry point for the package."""
server.main()
# Optionally expose other important items at package level
__all__ = ["main", "server"]
```
--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------
```toml
[project]
name = "opensearch-mcp-server"
version = "1.0.0"
description = "MCP Server for interacting with OpenSearch"
readme = "README.md"
requires-python = ">=3.10"
dependencies = [
"opensearch-py<=2.8.0",
"mcp>=1.0.0",
"python-dotenv>=1.0.0",
"fastmcp>=0.4.0",
]
[project.license]
file = "LICENSE"
[project.scripts]
opensearch-mcp-server = "opensearch_mcp_server:main"
[build-system]
requires = [
"hatchling",
]
build-backend = "hatchling.build"
```
--------------------------------------------------------------------------------
/Dockerfile:
--------------------------------------------------------------------------------
```dockerfile
# Generated by https://smithery.ai. See: https://smithery.ai/docs/config#dockerfile
# Start with a Python base image
FROM python:3.10-slim
# Set working directory
WORKDIR /app
# Copy necessary files
COPY . .
# Install hatch to handle the build
RUN pip install hatch
# Clean dist directory before build
RUN rm -rf dist/*
# Use hatch to build the package and install it
RUN hatch build && pip install dist/*.whl
# Set environment variables required for the MCP server
# These can be overridden at runtime with docker run --env
ENV OPENSEARCH_HOST="https://localhost:9200"
ENV OPENSEARCH_USERNAME="opensearch"
ENV OPENSEARCH_PASSWORD="test123"
# Expose the port the server is running on (if applicable)
EXPOSE 8000
# Command to run the server
ENTRYPOINT ["opensearch-mcp-server"]
```
--------------------------------------------------------------------------------
/.github/workflows/release.yml:
--------------------------------------------------------------------------------
```yaml
name: Release
on:
push:
tags:
- 'v*'
jobs:
release:
runs-on: ubuntu-latest
permissions:
contents: write
steps:
- uses: actions/checkout@v4
with:
fetch-depth: 0
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '3.x'
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install git-cliff
- name: Get version from tag
id: get_version
run: echo "VERSION=${GITHUB_REF#refs/tags/v}" >> $GITHUB_ENV
- name: Generate changelog
run: |
git-cliff --output CHANGELOG.md --latest
- name: Create Release
uses: softprops/action-gh-release@v1
with:
name: v${{ env.VERSION }}
body_path: CHANGELOG.md
draft: false
prerelease: false
```
--------------------------------------------------------------------------------
/.github/workflows/pypi-publish.yaml:
--------------------------------------------------------------------------------
```yaml
# This workflow will upload a Python Package using Twine when a release is created
# For more information see: https://help.github.com/en/actions/language-and-framework-guides/using-python-with-github-actions#publishing-to-package-registries
# This workflow uses actions that are not certified by GitHub.
# They are provided by a third-party and are governed by
# separate terms of service, privacy policy, and support
# documentation.
name: PyPI Publish
on:
workflow_run:
workflows: ["Release"]
types:
- completed
env:
UV_PUBLISH_TOKEN: '${{ secrets.PYPI_API_TOKEN }}'
jobs:
deploy:
runs-on: ubuntu-latest
if: ${{ github.event.workflow_run.conclusion == 'success' }}
steps:
- uses: actions/checkout@v2
- name: Set up Python
uses: actions/setup-python@v2
with:
python-version: '3.10.x'
- name: Install dependencies
run: |
python -m pip install uv
uv sync
- name: Build package
run: uv build
- name: Publish package
run: uv publish
```
--------------------------------------------------------------------------------
/src/opensearch_mcp_server/es_client.py:
--------------------------------------------------------------------------------
```python
import logging
import os
from dotenv import load_dotenv
from opensearchpy import OpenSearch
import warnings
class OpensearchClient:
def __init__(self, logger: logging.Logger):
self.logger = logger
self.es_client = self._create_opensearch_client()
def _get_es_config(self):
"""Get OpenSearch configuration from environment variables."""
# Load environment variables from .env file
load_dotenv()
config = {
"host": os.getenv("OPENSEARCH_HOST"),
"username": os.getenv("OPENSEARCH_USERNAME"),
"password": os.getenv("OPENSEARCH_PASSWORD"),
"dashboards_host": os.getenv("DASHBOARDS_HOST"),
}
if not all([config["username"], config["password"]]):
self.logger.error(
"Missing required OpenSearch configuration. Please check environment variables:"
)
self.logger.error(
"OPENSEARCH_USERNAME and OPENSEARCH_PASSWORD are required"
)
raise ValueError("Missing required OpenSearch configuration")
return config
def _create_opensearch_client(self) -> OpenSearch:
"""Create and return an OpenSearch client using configuration from environment."""
config = self._get_es_config()
# Disable SSL warnings
warnings.filterwarnings(
"ignore",
message=".*TLS with verify_certs=False is insecure.*",
)
return OpenSearch(
config["host"],
http_auth=(config["username"], config["password"]),
verify_certs=False,
)
```
--------------------------------------------------------------------------------
/src/opensearch_mcp_server/server.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
import logging
from fastmcp import FastMCP
from .tools.index import IndexTools
from .tools.document import DocumentTools
from .tools.cluster import ClusterTools
from .tools.dashboards import DashboardTools
from .tools.es_admin.admin_index import AdminIndexTools
from .tools.es_admin.admin_cluster import AdminClusterTools
class OpensearchMCPServer:
def __init__(self):
self.name = "opensearch_mcp_server"
self.mcp = FastMCP(self.name)
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
self.logger = logging.getLogger(self.name)
# Initialize tools
self._register_tools()
def _register_tools(self):
"""Register all MCP tools."""
# Initialize tool classes
index_tools = IndexTools(self.logger)
document_tools = DocumentTools(self.logger)
cluster_tools = ClusterTools(self.logger)
dashboard_tools = DashboardTools(self.logger)
admin_index_tools = AdminIndexTools(self.logger)
admin_cluster_tools = AdminClusterTools(self.logger)
# Register tools from each module
index_tools.register_tools(self.mcp)
document_tools.register_tools(self.mcp)
cluster_tools.register_tools(self.mcp)
dashboard_tools.register_tools(self.mcp)
admin_index_tools.register_tools(self.mcp)
admin_cluster_tools.register_tools(self.mcp)
def run(self):
"""Run the MCP server."""
self.mcp.run()
def main():
server = OpensearchMCPServer()
server.run()
```
--------------------------------------------------------------------------------
/src/opensearch_mcp_server/tools/cluster.py:
--------------------------------------------------------------------------------
```python
import logging
from typing import Dict, Any
from ..es_client import OpensearchClient
from mcp.types import TextContent
class ClusterTools(OpensearchClient):
def register_tools(self, mcp: Any):
"""Register cluster-related tools."""
@mcp.tool(description="Get cluster health status")
async def get_cluster_health() -> list[TextContent]:
"""
Get health status of the Opensearch cluster.
Returns information about the number of nodes, shards, etc.
"""
self.logger.info("Getting cluster health")
try:
response = self.es_client.cluster.health()
return [TextContent(type="text", text=str(response))]
except Exception as e:
self.logger.error(f"Error getting cluster health: {e}")
return [TextContent(type="text", text=f"Error: {str(e)}")]
@mcp.tool(description="Get cluster statistics")
async def get_cluster_stats() -> list[TextContent]:
"""
Get statistics from a cluster wide perspective.
The API returns basic index metrics (shard numbers, store size, memory usage) and information
about the current nodes that form the cluster (number, roles, os, jvm versions, memory usage, cpu and installed plugins).
https://opensearch.org/docs/latest/tuning-your-cluster/
"""
self.logger.info("Getting cluster stats")
try:
response = self.es_client.cluster.stats()
return [TextContent(type="text", text=str(response))]
except Exception as e:
self.logger.error(f"Error getting cluster stats: {e}")
return [TextContent(type="text", text=f"Error: {str(e)}")]
```
--------------------------------------------------------------------------------
/src/opensearch_mcp_server/tools/document.py:
--------------------------------------------------------------------------------
```python
import logging
import json
from typing import Dict, Any
from ..es_client import OpensearchClient
from mcp.types import TextContent
class DocumentTools(OpensearchClient):
def register_tools(self, mcp: Any):
"""Register document-related tools."""
@mcp.tool(description="Search documents in an opensearch index with a custom query")
async def search_documents(index: str, body: dict) -> list[TextContent]:
"""
Search documents in a specified opensearch index using a custom query.
Args:
index: Name of the index to search
body: Opensearch query DSL. If size is not specified, defaults to 20 results.
"""
# Ensure reasonable default size limit is set
if 'size' not in body:
body['size'] = 20
self.logger.info(f"Searching in index: {index} with query: {body}")
try:
response = self.es_client.search(index=index, body=body)
# Extract and format relevant information
formatted_response = {
'total_hits': response['hits']['total']['value'],
'max_score': response['hits']['max_score'],
'hits': []
}
# Process each hit
for hit in response['hits']['hits']:
hit_data = {
'_id': hit['_id'],
'_score': hit['_score'],
'source': hit['_source']
}
formatted_response['hits'].append(hit_data)
# Include aggregations if present
if 'aggregations' in response:
formatted_response['aggregations'] = response['aggregations']
return [TextContent(type="text", text=json.dumps(formatted_response, indent=2))]
except Exception as e:
self.logger.error(f"Error searching documents: {e}")
return [TextContent(type="text", text=f"Error: {str(e)}")]
```
--------------------------------------------------------------------------------
/src/opensearch_mcp_server/tools/index.py:
--------------------------------------------------------------------------------
```python
import logging
from typing import Dict, Any
from ..es_client import OpensearchClient
from mcp.types import TextContent
class IndexTools(OpensearchClient):
def register_tools(self, mcp: Any):
"""Register index-related tools."""
@mcp.tool(description="List all indices in the Opensearch cluster")
async def list_indices() -> list[TextContent]:
"""
List all indices in the Opensearch cluster.
It is important to check the indices before searching documents
to understand what indices are avilable.
"""
self.logger.info("Listing indices...")
try:
indices = self.es_client.cat.indices(format="json")
return [TextContent(type="text", text=str(indices))]
except Exception as e:
self.logger.error(f"Error listing indices: {e}")
return [TextContent(type="text", text=f"Error: {str(e)}")]
@mcp.tool(description="Get index mapping")
async def get_mapping(index: str) -> list[TextContent]:
"""
Get the mapping for an index.
It is important to always check the mappings to understand
the exact field names and types before constructing queries or URLs.
Args:
index: Name of the index
"""
self.logger.info(f"Getting mapping for index: {index}")
try:
response = self.es_client.indices.get_mapping(index=index)
return [TextContent(type="text", text=str(response))]
except Exception as e:
self.logger.error(f"Error getting mapping: {e}")
return [TextContent(type="text", text=f"Error: {str(e)}")]
@mcp.tool(description="Get index settings")
async def get_settings(index: str) -> list[TextContent]:
"""
Get the settings for an index.
Args:
index: Name of the index
"""
self.logger.info(f"Getting settings for index: {index}")
try:
response = self.es_client.indices.get_settings(index=index, h=["index", "health"])
return [TextContent(type="text", text=str(response))]
except Exception as e:
self.logger.error(f"Error getting settings: {e}")
return [TextContent(type="text", text=f"Error: {str(e)}")]
```
--------------------------------------------------------------------------------
/cliff.toml:
--------------------------------------------------------------------------------
```toml
# git-cliff ~ configuration file
# https://git-cliff.org/docs/configuration
[changelog]
# template for the changelog header
header = """
# Changelog\n
"""
# template for the changelog body
# https://keats.github.io/tera/docs/#introduction
body = """
{% if version %}\
{% if previous.version %}\
## [{{ version | trim_start_matches(pat="v") }}]($REPO/compare/{{ previous.version }}..{{ version }}) - {{ timestamp | date(format="%Y-%m-%d") }}
{% else %}\
## [{{ version | trim_start_matches(pat="v") }}] - {{ timestamp | date(format="%Y-%m-%d") }}
{% endif %}\
{% else %}\
## [unreleased]
{% endif %}\
{% for group, commits in commits | group_by(attribute="group") %}
### {{ group | striptags | trim | upper_first }}
{% for commit in commits
| filter(attribute="scope")
| sort(attribute="scope") %}
- **({{commit.scope}})**{% if commit.breaking %} [**breaking**]{% endif %} \
{{ commit.message }} - ([{{ commit.id | truncate(length=7, end="") }}]($REPO/commit/{{ commit.id }})) - @{{ commit.author.name }}
{%- endfor -%}
{% raw %}\n{% endraw %}\
{%- for commit in commits %}
{%- if commit.scope -%}
{% else -%}
- {% if commit.breaking %} [**breaking**]{% endif %}\
{{ commit.message }} - ([{{ commit.id | truncate(length=7, end="") }}]($REPO/commit/{{ commit.id }})) - @{{ commit.author.name }}
{% endif -%}
{% endfor -%}
{% endfor %}\n
"""
# template for the changelog footer
footer = """
<!-- generated by git-cliff -->
"""
# remove the leading and trailing whitespace from the templates
trim = true
# postprocessors
postprocessors = [
{ pattern = '\$REPO', replace = "https://github.com/hyunjunseo/opensearch-mcp-server" }, # replace repository URL
]
[git]
# parse the commits based on https://www.conventionalcommits.org
conventional_commits = true
# filter out the commits that are not conventional
filter_unconventional = true
# process each line of a commit as an individual commit
split_commits = false
# regex for preprocessing the commit messages
commit_preprocessors = [
# { pattern = '\((\w+\s)?#([0-9]+)\)', replace = "([#${2}](https://github.com/cr7258/elasticsearch-mcp-server/issues/${2}))"}, # replace issue numbers
]
# regex for parsing and grouping commits
commit_parsers = [
{ message = "^feat", group = "<!-- 0 -->⛰️ Features" },
{ message = "^fix", group = "<!-- 1 -->🐛 Bug Fixes" },
{ message = "^doc", group = "<!-- 3 -->📚 Documentation" },
{ message = "^perf", group = "<!-- 4 -->⚡ Performance" },
{ message = "^refactor\\(clippy\\)", skip = true },
{ message = "^refactor", group = "<!-- 2 -->🚜 Refactor" },
{ message = "^style", group = "<!-- 5 -->🎨 Styling" },
{ message = "^test", group = "<!-- 6 -->🧪 Testing" },
{ message = "^chore\\(release\\): prepare for", skip = true },
{ message = "^chore\\(deps.*\\)", skip = true },
{ message = "^chore\\(pr\\)", skip = true },
{ message = "^chore\\(pull\\)", skip = true },
{ message = "^chore\\(npm\\).*yarn\\.lock", skip = true },
{ message = "^chore|^ci", group = "<!-- 7 -->⚙️ Miscellaneous Tasks" },
{ body = ".*security", group = "<!-- 8 -->🛡️ Security" },
{ message = "^revert", group = "<!-- 9 -->◀️ Revert" },
]
# filter out the commits that are not matched by commit parsers
filter_commits = false
# sort the tags topologically
topo_order = false
# sort the commits inside sections by oldest/newest order
sort_commits = "oldest"
# regex for matching git tags
tag_pattern = "^v[0-9]"
# regex for skipping tags
skip_tags = ""
# regex for ignoring tags
ignore_tags = ""
# use tag date instead of commit date
date_order = true
# path to git binary
git_path = "git"
# whether to use relaxed or strict semver parsing
relaxed_semver = true
# only show the changes for the current version
tag_range = true
```
--------------------------------------------------------------------------------
/src/opensearch_mcp_server/tools/es_admin/admin_index.py:
--------------------------------------------------------------------------------
```python
import logging
from typing import Dict, Any
from ...es_client import OpensearchClient
from mcp.types import TextContent
class AdminIndexTools(OpensearchClient):
def register_tools(self, mcp: Any):
"""Register administrative index-related tools."""
@mcp.tool(description="Get ISM policies and their configurations")
async def get_ism_policies() -> list[TextContent]:
"""
Get Index State Management policies and their configurations.
Returns policy IDs, descriptions, states, and index patterns.
This result should be useful in determining index lifecycle management configurations such as index size limits, index rollover policy
and retention policy.
"""
self.logger.info("Fetching ISM policies...")
try:
response = self.es_client.transport.perform_request(
'GET',
'/_plugins/_ism/policies',
params={'filter_path': 'policies.policy.policy_id,policies.policy.description,policies.policy.states,policies.policy.ism_template.index_patterns'}
)
return [TextContent(type="text", text=str(response))]
except Exception as e:
self.logger.error(f"Error fetching ISM policies: {e}")
return [TextContent(type="text", text=f"Error: {str(e)}")]
@mcp.tool(description="Get index template configurations")
async def get_index_templates() -> list[TextContent]:
"""
Get index templates and their configurations.
Returns template names and their configured number of shards.
This helps understand how new indices will be created.
"""
self.logger.info("Fetching index templates...")
try:
response = self.es_client.transport.perform_request(
'GET',
'/_index_template',
params={'filter_path': ' _index_template?filter_path=index_templates.name,index_templates.index_template.index_patterns,index_templates.index_template.template.settings.index.number_of_shards'}
)
return [TextContent(type="text", text=str(response))]
except Exception as e:
self.logger.error(f"Error fetching index templates: {e}")
return [TextContent(type="text", text=f"Error: {str(e)}")]
@mcp.tool(description="Get index shard allocation distribution")
async def get_shard_allocation(latest_index: str) -> list[TextContent]:
"""
Get the current index shard allocation distribution across nodes.
Returns index name, shard number, primary/replica status, and node assignment.
This helps understand how shards are distributed across the cluster.
Args:
latest_index: The most recent index of interest.
"""
self.logger.info("Fetching shard allocation...")
try:
response = self.es_client.transport.perform_request(
'GET',
'/_cat/shards',
params={'h': 'index,shard,prirep,node', 'format': 'json'}
)
# Count shards per node
shard_counts = {}
for shard in response:
if shard['node'] not in shard_counts:
shard_counts[shard['node']] = 0
shard_counts[shard['node']] += 1
# Format the response with both raw data and counts
formatted_response = {
'shard_distribution': response,
'shards_per_node': shard_counts
}
return [TextContent(type="text", text=str(formatted_response))]
except Exception as e:
self.logger.error(f"Error fetching shard allocation: {e}")
return [TextContent(type="text", text=f"Error: {str(e)}")]
```
--------------------------------------------------------------------------------
/src/opensearch_mcp_server/tools/dashboards.py:
--------------------------------------------------------------------------------
```python
import logging
import json
from typing import Dict, Any
from ..es_client import OpensearchClient
from mcp.types import TextContent
from urllib.parse import urlencode
class DashboardTools(OpensearchClient):
def register_tools(self, mcp: Any):
"""Register dashboards-related tools."""
@mcp.tool(description="List OpenSearch Dashboards index patterns2")
async def list_index_patterns() -> list[TextContent]:
"""
Find all index pattern IDs stored in the .kibana index. Especially useful for
identifying the correct index pattern ID to use in the Discover view URL.
This function queries the .kibana index for saved objects of type 'index-pattern'
and returns a list of their titles and IDs.
Returns:
list[TextContent]: A list containing the found index patterns or an error message.
"""
self.logger.info("Searching for index patterns")
try:
response = self.es_client.search(
index=".kibana",
body={
'_source': ['index-pattern.title', '_id'],
'query': {
'term': {
'type': 'index-pattern'
}
}
}
)
patterns = json.dumps([{hit["_source"]["index-pattern"]["title"]: hit["_id"].replace('index-pattern:', '')}
for hit in response["hits"]["hits"]], indent=4)
return [TextContent(type="text", text=(patterns))]
except Exception as e:
self.logger.error(f"Error finding index patterns: {e}")
return [TextContent(type="text", text=f"Error: {(e)}")]
@mcp.tool(description="Generate OpenSearch Dashboards Discover view URL")
async def generate_discover_url(query: str, index_pattern_id: str, from_time: str, to_time: str) -> list[TextContent]:
"""
Generate a URL for the OpenSearch Dashboards Discover view that will display the results of a query.
The argument values must be compatible with the rison data format used by OpenSearch Dashboards.
Use the list index patterns tool to determine the available index pattern IDs.
Index_pattern_id argument must be the ID of the index pattern to be used.
The query arguement must be a valid OpenSearch lucene format.
Refrain from using querying the timestamp or @timestamp fields in the query. Use from_time and to_time parameters instead
The function constructs a URL that includes the query and index pattern as parameters.
Args:
query str: The query to apply in the Discover view in lucene format.
index_pattern_id str: The index pattern ID to use in the Discover view URL.
from_time str: The starting time for the query in the format like `now-15m`.
to_time str: The ending time for the query in the format like `now`.
Returns:
list[TextContent]: A list containing the generated URL or an error message.
"""
self.logger.info("Generating Discover view URL")
config = self._get_es_config()
try:
base_url = config["dashboards_host"] + "/app/data-explorer/discover#?" #"http[s]://host[:port]/app/data-explorer/discover#? + query_params"
query_params = {
"_g": "(filters:!(),refreshInterval:(pause:!t,value:0),time:(from:'"+from_time+"',to:'"+to_time+"'))",
"_q": "(filters:!(),query:(language:lucene,query:\'"+query+"\'))",
"_a": "(discover:(columns:!(_source),isDirty:!f,sort:!()),metadata:(indexPattern:\'"+index_pattern_id+"\',view:discover))"
}
url = base_url + urlencode(query_params, safe="(),:")
return [TextContent(type="text", text=url)]
except Exception as e:
self.logger.error(f"Error generating Discover view URL: {e}")
return [TextContent(type="text", text=f"Error: {str(e)}")]
```
--------------------------------------------------------------------------------
/src/opensearch_mcp_server/tools/es_admin/admin_cluster.py:
--------------------------------------------------------------------------------
```python
import logging
from typing import Dict, Any
from ...es_client import OpensearchClient
from mcp.types import TextContent
class AdminClusterTools(OpensearchClient):
def register_tools(self, mcp: Any):
"""Register administrative cluster related tools."""
@mcp.tool(description="Check hot threads on nodes")
async def get_hot_threads() -> list[TextContent]:
"""
Get hot threads information from all nodes, filtering for CPU percentage data.
Returns only thread information containing percentage signs, indicating CPU usage.
If no threads show percentage usage, indicates no hot threads were found.
"""
self.logger.info("Fetching hot threads information...")
try:
response = self.es_client.transport.perform_request(
'GET',
'/_nodes/hot_threads'
)
# Filter lines containing '%'
hot_lines = [line for line in str(response).split('\n') if '%' in line]
if hot_lines:
return [TextContent(type="text", text='\n'.join(hot_lines))]
else:
return [TextContent(type="text", text="No hot threads detected in the cluster.")]
except Exception as e:
self.logger.error(f"Error fetching hot threads: {e}")
return [TextContent(type="text", text=f"Error: {str(e)}")]
@mcp.tool(description="Get current tasks in cluster")
async def get_tasks() -> list[TextContent]:
"""
Get current tasks running in the cluster.
Filters duplicate task types to show only unique operations.
"""
self.logger.info("Fetching cluster tasks...")
try:
response = self.es_client.cat.tasks(v=True)
lines = response.split('\n')
seen_tasks = set()
filtered_lines = []
for line in lines:
if not line.strip():
continue
task_type = line.split()[0]
if task_type not in seen_tasks:
seen_tasks.add(task_type)
filtered_lines.append(line)
if filtered_lines:
return [TextContent(type="text", text='\n'.join(filtered_lines))]
else:
return [TextContent(type="text", text="No tasks currently running in the cluster.")]
except Exception as e:
self.logger.error(f"Error fetching tasks: {e}")
return [TextContent(type="text", text=f"Error: {str(e)}")]
@mcp.tool(description="Get recovery status and estimated completion time")
async def get_recovery_status() -> list[TextContent]:
"""
Get recovery status for shards that are currently being recovered.
Includes progress percentage and estimated time remaining based on current recovery rate.
"""
self.logger.info("Fetching recovery status...")
try:
# Get active recoveries with detailed stats
response = self.es_client.cat.recovery(format='json', active_only=True, v=True)
if not response:
# Get cluster health to show overall shard status if no active recoveries
health = self.es_client.cluster.health()
total_shards = health['active_shards'] + health['unassigned_shards'] + health['initializing_shards']
active_pct = (health['active_shards'] / total_shards) * 100 if total_shards > 0 else 100
status_msg = (
f"No active recoveries. Cluster status: {health['status']}\n"
f"Active shards: {health['active_shards']}/{total_shards} ({active_pct:.1f}%)\n"
f"Initializing: {health['initializing_shards']}\n"
f"Unassigned: {health['unassigned_shards']}"
)
return [TextContent(type="text", text=status_msg)]
# Process active recoveries
summary = []
for recovery in response:
index = recovery['index']
shard = recovery['shard']
stage = recovery.get('stage', 'unknown')
# Calculate progress and time remaining
files_pct = float(recovery.get('files_percent', '0').rstrip('%'))
bytes_pct = float(recovery.get('bytes_percent', '0').rstrip('%'))
total_bytes = int(recovery.get('total_bytes', 0))
bytes_recovered = int(recovery.get('recovered_in_bytes', 0))
# Parse time value which can be in format like "1.2s" or "3m" or "2.5h"
time_str = recovery.get('time', '0s')
try:
# Convert time string to milliseconds
if time_str.endswith('ms'):
time_spent_ms = float(time_str[:-2])
elif time_str.endswith('s'):
time_spent_ms = float(time_str[:-1]) * 1000
elif time_str.endswith('m'):
time_spent_ms = float(time_str[:-1]) * 60 * 1000
elif time_str.endswith('h'):
time_spent_ms = float(time_str[:-1]) * 60 * 60 * 1000
else:
time_spent_ms = 0
except ValueError:
time_spent_ms = 0
# Calculate recovery rate and estimated time remaining
if bytes_recovered > 0 and time_spent_ms > 0:
rate_mb_sec = (bytes_recovered / 1024 / 1024) / (time_spent_ms / 1000)
remaining_bytes = total_bytes - bytes_recovered
est_seconds_remaining = (remaining_bytes / 1024 / 1024) / rate_mb_sec if rate_mb_sec > 0 else 0
# Format time remaining in a human-readable way
if est_seconds_remaining < 60:
time_remaining = f"{est_seconds_remaining:.0f} seconds"
elif est_seconds_remaining < 3600:
time_remaining = f"{est_seconds_remaining/60:.1f} minutes"
else:
time_remaining = f"{est_seconds_remaining/3600:.1f} hours"
recovery_info = (
f"Index: {index}, Shard: {shard}\n"
f"Stage: {stage}\n"
f"Progress: files={files_pct:.1f}%, bytes={bytes_pct:.1f}%\n"
f"Rate: {rate_mb_sec:.1f} MB/sec\n"
f"Est. time remaining: {time_remaining}\n"
)
else:
recovery_info = (
f"Index: {index}, Shard: {shard}\n"
f"Stage: {stage}\n"
f"Progress: files={files_pct:.1f}%, bytes={bytes_pct:.1f}%\n"
"Rate: calculating...\n"
)
summary.append(recovery_info)
return [TextContent(type="text", text="\n".join(summary))]
except Exception as e:
self.logger.error(f"Error fetching recovery status: {e}")
return [TextContent(type="text", text=f"Error: {str(e)}")]
```