# Directory Structure ``` ├── .DS_Store ├── docker-compose.yml ├── Dockerfile ├── images │ ├── .DS_Store │ ├── claude1.png │ ├── mcpDev0.png │ ├── mcpDev1.png │ └── osclientTest0.png ├── LICENSE ├── pyproject.toml ├── README.md ├── smithery.yaml ├── src │ ├── .DS_Store │ └── mcp-server-opensearch │ ├── __init__.py │ ├── __pycache__ │ │ ├── AsyncOpenSearchClient.cpython-310.pyc │ │ ├── demo.cpython-310.pyc │ │ ├── demo.cpython-312.pyc │ │ ├── opensearch.cpython-310.pyc │ │ ├── OpenSearchClient.cpython-310.pyc │ │ ├── OpenSearchClient.cpython-312.pyc │ │ ├── OpenSearchClient.cpython-313.pyc │ │ ├── server.cpython-310.pyc │ │ ├── server.cpython-313.pyc │ │ └── test_opensearch.cpython-310.pyc │ ├── AsyncOpenSearchClient.py │ ├── demo.py │ ├── OpenSearchClient.py │ ├── server.py │ ├── serverTest.py │ ├── test_AsyncClient.py │ └── test_opensearch.py ├── test_OpenSearchClient.py └── uv.lock ``` # Files -------------------------------------------------------------------------------- /README.md: -------------------------------------------------------------------------------- ```markdown # mcp-server-opensearch: An OpenSearch MCP Server [](https://smithery.ai/server/@ibrooksSDX/mcp-server-opensearch) > The [Model Context Protocol (MCP)](https://modelcontextprotocol.io/introduction) is an open protocol that enables seamless integration between LLM applications and external data sources and tools. Whether you’re building an AI-powered IDE, enhancing a chat interface, or creating custom AI workflows, MCP provides a standardized way to connect LLMs with the context they need. This repository is an example of how to create a MCP server for [OpenSearch](https://opensearch.org/), a distributed search and analytics engine. # Under Contruction   ## Current Blocker - Async Client from OpenSearch isn't installing [Open Search Async Client Docs](https://github.com/opensearch-project/opensearch-py/blob/main/guides/async.m) ```shell pip install opensearch-py[async] zsh: no matches found: opensearch-py[async] ``` ## Overview A basic Model Context Protocol server for keeping and retrieving memories in the OpenSearch engine. It acts as a semantic memory layer on top of the OpenSearch database. ## Components ### Tools 1. `search-openSearch` - Store a memory in the OpenSearch database - Input: - `query` (json): prepared json query message - Returns: Confirmation message ## Installation ### Installing via Smithery To install mcp-server-opensearch for Claude Desktop automatically via [Smithery](https://smithery.ai/server/@ibrooksSDX/mcp-server-opensearch): ```bash npx -y @smithery/cli install @ibrooksSDX/mcp-server-opensearch --client claude ``` ### Using uv (recommended) When using [`uv`](https://docs.astral.sh/uv/) no specific installation is needed to directly run *mcp-server-opensearch*. ```shell uv run mcp-server-opensearch \ --opensearch-url "http://localhost:9200" \ --index-name "my_index" \ ``` or ```shell uv run fastmcp run demo.py:main ``` ## Testing - Local Open Search Client  ```shell uv run python src/mcp-server-opensearch/test_opensearch.py ``` ## Testing - MCP Server Connection to Open Search Client   ```shell cd src/mcp-server-opensearch uv run fastmcp dev demo.py ``` ## Usage with Claude Desktop To use this server with the Claude Desktop app, add the following configuration to the "mcpServers" section of your `claude_desktop_config.json`: ```json { "opensearch": { "command": "uvx", "args": [ "mcp-server-opensearch", "--opensearch-url", "http://localhost:9200", "--opensearch-api-key", "your_api_key", "--index-name", "your_index_name" ] }, "Demo": { "command": "uv", "args": [ "run", "--with", "fastmcp", "--with", "opensearch-py", "fastmcp", "run", "/Users/ibrooks/Documents/GitHub/mcp-server-opensearch/src/mcp-server-opensearch/demo.py" ] } } ``` Or use the FastMCP UI to install the server to Claude ```shell uv run fastmcp install demo.py ``` ## Environment Variables The configuration of the server can be also done using environment variables: - `OPENSEARCH_HOST`: URL of the OpenSearch server, e.g. `http://localhost` - `OPENSEARCH_HOSTPORT`: Port of the host of the OpenSearch server `9200` - `INDEX_NAME`: Name of the index to use ``` -------------------------------------------------------------------------------- /src/mcp-server-opensearch/__init__.py: -------------------------------------------------------------------------------- ```python from . import server import asyncio def main(): """Main entry point for the package.""" asyncio.run(server.main()) # Optionally expose other important items at package level __all__ = ["server", "demo"] ``` -------------------------------------------------------------------------------- /pyproject.toml: -------------------------------------------------------------------------------- ```toml [project] name = "mcp-server-opensearch" version = "0.1.0" description = "MCP server for OpenSearch" readme = "README.md" requires-python = ">=3.10" dependencies = [ "anthropic>=0.44.0", "fastmcp>=0.4.1", "httpx>=0.28.1", "mcp[cli]>=1.2.0", "opensearch-py>=2.8.0", "python-dotenv>=1.0.1", ] ``` -------------------------------------------------------------------------------- /test_OpenSearchClient.py: -------------------------------------------------------------------------------- ```python from opensearchpy import OpenSearch host = 'localhost' port = 9200 auth = ('admin', 'pizzaParty123') # For testing only. Don't store credentials in code. # Create the client with SSL/TLS and hostname verification disabled. client = OpenSearch( hosts = [{'host': host, 'port': port}], http_compress = True, # enables gzip compression for request bodies http_auth = auth, use_ssl = True, verify_certs = False, ssl_assert_hostname = False, ssl_show_warn = False ) q = "Women's Clothing" query = { 'size': 5, 'query': { 'multi_match': { 'query': q, 'fields': ['category'] } } } response = client.search( body = query, index = 'opensearch_dashboards_sample_data_ecommerce' ) print('\nSearch results:') print(response) ``` -------------------------------------------------------------------------------- /smithery.yaml: -------------------------------------------------------------------------------- ```yaml # Smithery configuration file: https://smithery.ai/docs/config#smitheryyaml startCommand: type: stdio configSchema: # JSON Schema defining the configuration options for the MCP. type: object required: - opensearchUrl - opensearchHostPort - indexName properties: opensearchUrl: type: string description: The URL of the OpenSearch server. opensearchHostPort: type: number description: The port of the host of the OpenSearch server. indexName: type: string description: The name of the index to use. commandFunction: # A function that produces the CLI command to start the MCP on stdio. |- config => ({command: 'uv', args: ['run', 'mcp-server-opensearch', '--opensearch-url', `${config.opensearchUrl}:${config.opensearchHostPort}`, '--index-name', config.indexName]}) ``` -------------------------------------------------------------------------------- /src/mcp-server-opensearch/test_opensearch.py: -------------------------------------------------------------------------------- ```python import asyncio from OpenSearchClient import OpenSearchClient import json q = "Women's Clothing" query = { 'size': 5, 'query': { 'multi_match': { 'query': q, 'fields': ['category'] } } } async def test_opensearch(): # Initialize connector connector = OpenSearchClient( opensearch_host="localhost", opensearch_hostPort=9200, index_name="opensearch_dashboards_sample_data_ecommerce", bhttp_compress=True, buse_ssl=True, bverify_certs=False, bssl_assert_hostname=False, bssl_show_warn=False ) # Test search try: search_results = await connector.search_documents(query) print("Search results:", search_results) except Exception as e: print(f"Error during search: {e}") if __name__ == "__main__": asyncio.run(test_opensearch()) ``` -------------------------------------------------------------------------------- /src/mcp-server-opensearch/test_AsyncClient.py: -------------------------------------------------------------------------------- ```python import asyncio from AsyncOpenSearchClient import OpenSearchClient import json q = "Women's Clothing" query = { 'size': 5, 'query': { 'multi_match': { 'query': q, 'fields': ['category'] } } } async def test_opensearch(): # Initialize connector connector = await OpenSearchClient( opensearch_host="localhost", opensearch_hostPort=9200, index_name="opensearch_dashboards_sample_data_ecommerce", bhttp_compress=True, buse_ssl=True, bverify_certs=False, bssl_assert_hostname=False, bssl_show_warn=False ) # Test search try: search_results = await connector.search_documents(query) print("Search results:", search_results) except Exception as e: print(f"Error during search: {e}") if __name__ == "__main__": asyncio.run(test_opensearch()) ``` -------------------------------------------------------------------------------- /Dockerfile: -------------------------------------------------------------------------------- ```dockerfile # Generated by https://smithery.ai. See: https://smithery.ai/docs/config#dockerfile # Start with a Python image that includes the uv tool pre-installed FROM ghcr.io/astral-sh/uv:python3.12-bookworm-slim AS uv # Set the working directory in the container WORKDIR /app # Enable bytecode compilation ENV UV_COMPILE_BYTECODE=1 # Copy the project configuration and lockfile COPY pyproject.toml uv.lock ./ # Install the project's dependencies without installing the project itself RUN --mount=type=cache,target=/root/.cache/uv uv sync --frozen --no-install-project --no-dev --no-editable # Add the rest of the project source code ADD src /app/src # Install the project RUN --mount=type=cache,target=/root/.cache/uv uv sync --frozen --no-dev --no-editable # Define the entry point command to run the server ENTRYPOINT ["uv", "run", "mcp-server-opensearch", "--opensearch-url", "http://localhost:9200", "--index-name", "my_index"] ``` -------------------------------------------------------------------------------- /src/mcp-server-opensearch/serverTest.py: -------------------------------------------------------------------------------- ```python import logging from OpenSearchClient import OpenSearchClient from fastmcp import FastMCP #from .tools.index import IndexTools #from .tools.document import DocumentTools #rom .tools.cluster import ClusterTools # TESTING VALUES opensearch_host="localhost" opensearch_hostPort=9200 index_name="test_index" http_compress=True use_ssl=False verify_certs=False ssl_assert_hostname=False ssl_show_warn=False q = "Women's Clothing" query = { 'size': 5, 'query': { 'multi_match': { 'query': q, 'fields': ['category'] } } } ### END OF TESTING VALUES class OpenSearchMCPServer: def __init__(self): self._name = "opensearch_mcp_server" self.mcp = FastMCP(self._name) #Configure and Establish Client to Open Search try: self._client = OpenSearchClient(opensearch_host, opensearch_hostPort, index_name, http_compress, use_ssl, verify_certs, ssl_assert_hostname, ssl_show_warn) except Exception as e: print(f"Error during search: {e}") # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) self.logger = logging.getLogger(self._name) # Initialize tools self._register_tools() def _testClient(self): self._client.search_documents(query) def _register_tools(self): """Register all MCP tools.""" # Initialize tool classes #index_tools = IndexTools(self.logger) #document_tools = DocumentTools(self.logger) #cluster_tools = ClusterTools(self.logger) # Register tools from each module #index_tools.register_tools(self.mcp) #document_tools.register_tools(self.mcp) #cluster_tools.register_tools(self.mcp) def run(self): """Run the MCP server.""" self.mcp.run() def main(): server = OpenSearchMCPServer() server.run() ``` -------------------------------------------------------------------------------- /src/mcp-server-opensearch/OpenSearchClient.py: -------------------------------------------------------------------------------- ```python import os import asyncio import json from fastmcp import FastMCP from opensearchpy import OpenSearch host = 'localhost' port = 9200 def get_string_list_from_json(json_data): """Extracts a list of strings from a JSON object.""" if isinstance(json_data, list): return [item for item in json_data if isinstance(item, str)] elif isinstance(json_data, dict): return [value for value in json_data.values() if isinstance(value, str)] else: return [] class OpenSearchClient: def __init__( self, opensearch_host: str, opensearch_hostPort: str, index_name: str, bhttp_compress: bool, buse_ssl: bool, bverify_certs: bool, bssl_assert_hostname: bool, bssl_show_warn: bool ): self._index_name = index_name, self._host = [{'host': opensearch_host, 'port': opensearch_hostPort}], self._auth =('admin', 'pizzaParty123'), self._client = OpenSearch(hosts = [{'host': host, 'port': port}], http_compress = bhttp_compress, http_auth = ('admin', 'pizzaParty123') , use_ssl = buse_ssl, verify_certs = bverify_certs, ssl_assert_hostname = bssl_assert_hostname, ssl_show_warn = bssl_show_warn) async def search_documents(self, query: str) -> list[str]: """ Find documents in the OpenSearch index. If there are no documents found, an empty list is returned. :param query: The query to use for the search. :return: A list of documents found. """ index_exists = self._client.indices.exists(index=self._index_name) if not index_exists: return [] search_results = self._client.search( body=query, index = self._index_name ) #search_results_json = json.loads(search_results, indent=4) #print(search_results_json) #return search_results return search_results def _return_client(self) -> OpenSearch: """Create and return an OpenSearch client using configuration from environment.""" return self._client ``` -------------------------------------------------------------------------------- /src/mcp-server-opensearch/AsyncOpenSearchClient.py: -------------------------------------------------------------------------------- ```python import os import asyncio from fastmcp import FastMCP from opensearchpy_async import AsyncOpenSearch, AsyncHttpConnection, helpers host = 'localhost' port = 9200 def get_string_list_from_json(json_data): """Extracts a list of strings from a JSON object.""" if isinstance(json_data, list): return [item for item in json_data if isinstance(item, str)] elif isinstance(json_data, dict): return [value for value in json_data.values() if isinstance(value, str)] else: return [] class OpenSearchClient: def __init__( self, opensearch_host: str, opensearch_hostPort: str, index_name: str, bhttp_compress: bool, buse_ssl: bool, bverify_certs: bool, bssl_assert_hostname: bool, bssl_show_warn: bool ): self._index_name = index_name, self._host = [{'host': opensearch_host, 'port': opensearch_hostPort}], self._auth =('admin', 'pizzaParty123'), self._client = AsyncOpenSearch(hosts = [{'host': host, 'port': port}], http_compress = bhttp_compress, http_auth = ('admin', 'pizzaParty123') , use_ssl = buse_ssl, verify_certs = bverify_certs, ssl_assert_hostname = bssl_assert_hostname, ssl_show_warn = bssl_show_warn) async def search_documents(self, query: str) -> list[str]: """ Find documents in the OpenSearch index. If there are no documents found, an empty list is returned. :param query: The query to use for the search. :return: A list of documents found. """ index_exists = self._client.indices.exists(index=self._index_name) if not index_exists: return [] search_results = self._client.search( body=query, index = self._index_name ) #search_results_json = json.loads(search_results, indent=4) #print(search_results_json) #return search_results return search_results def _return_client(self) -> AsyncOpenSearch: """Create and return an OpenSearch client using configuration from environment.""" return self._client ``` -------------------------------------------------------------------------------- /src/mcp-server-opensearch/demo.py: -------------------------------------------------------------------------------- ```python # demo.py from fastmcp import FastMCP from OpenSearchClient import OpenSearchClient import mcp.types as types import asyncio import json mcp = FastMCP("Demo") SLEEP_DELAY = 2 # TESTING VALUES T_opensearch_host="localhost" T_opensearch_hostPort=9200 T_index_name="opensearch_dashboards_sample_data_ecommerce" T_http_compress=True T_use_ssl=False T_verify_certs=False T_ssl_assert_hostname=False T_ssl_show_warn=False q = "Women's Clothing" query = { 'size': 5, 'query': { 'multi_match': { 'query': "Women's Clothing", 'fields': ['category'] } } } ### END OF TESTING VALUES @mcp.tool() async def search_openSearch(query) -> list[types.TextContent] : client = OpenSearchClient(opensearch_host= T_opensearch_host, opensearch_hostPort=T_opensearch_hostPort, index_name=T_index_name, bhttp_compress = T_http_compress, buse_ssl = T_use_ssl, bverify_certs = T_verify_certs, bssl_assert_hostname = T_ssl_assert_hostname, bssl_show_warn = T_ssl_show_warn) queryresult = await client.search_documents(query) content = [ types.TextContent( type="text", text=f"Documents for the query '{query}'" ), ] #for doc in queryresult: ## content.append( # types.TextContent(type="text", text=f"<document>{doc}</document>") # ) content.append(types.TextContent(type="text", text=f"<document>{str(queryresult)}</document>")) return content # Add a dynamic greeting resource @mcp.resource("greeting://{name}") def get_greeting(name: str) -> str: """Get a personalized greeting""" return f"Hello, {name}!" def main(): async def _run(): async with mcp.server.stdio.stdio_server() as (read_stream, write_stream): server = serve(search_openSearch) await server.run( read_stream, write_stream, InitializationOptions( server_name="demo", server_version="0.0.1", capabilities=server.get_capabilities( notification_options=NotificationOptions(), experimental_capabilities={}, ), ), ) await server.serve_forever() #asyncio.run(_run()) if __name__ == "__main__": asyncio.run(main()._run()) ``` -------------------------------------------------------------------------------- /docker-compose.yml: -------------------------------------------------------------------------------- ```yaml version: '3' services: opensearch-node1: # This is also the hostname of the container within the Docker network (i.e. https://opensearch-node1/) image: opensearchproject/opensearch:latest # Specifying the latest available image - modify if you want a specific version container_name: opensearch-node1 environment: - cluster.name=opensearch-cluster # Name the cluster - node.name=opensearch-node1 # Name the node that will run in this container - discovery.seed_hosts=opensearch-node1,opensearch-node2 # Nodes to look for when discovering the cluster - cluster.initial_cluster_manager_nodes=opensearch-node1,opensearch-node2 # Nodes eligible to serve as cluster manager - bootstrap.memory_lock=true # Disable JVM heap memory swapping - "OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m" # Set min and max JVM heap sizes to at least 50% of system RAM - OPENSEARCH_INITIAL_ADMIN_PASSWORD=pizzaParty123 # Sets the demo admin user password when using demo configuration, required for OpenSearch 2.12 and later ulimits: memlock: soft: -1 # Set memlock to unlimited (no soft or hard limit) hard: -1 nofile: soft: 65536 # Maximum number of open files for the opensearch user - set to at least 65536 hard: 65536 volumes: - opensearch-data1:/usr/share/opensearch/data # Creates volume called opensearch-data1 and mounts it to the container ports: - 9200:9200 # REST API - 9600:9600 # Performance Analyzer networks: - opensearch-net # All of the containers will join the same Docker bridge network opensearch-node2: image: opensearchproject/opensearch:latest # This should be the same image used for opensearch-node1 to avoid issues container_name: opensearch-node2 environment: - cluster.name=opensearch-cluster - node.name=opensearch-node2 - discovery.seed_hosts=opensearch-node1,opensearch-node2 - cluster.initial_cluster_manager_nodes=opensearch-node1,opensearch-node2 - bootstrap.memory_lock=true - "OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m" - OPENSEARCH_INITIAL_ADMIN_PASSWORD=pizzaParty123 ulimits: memlock: soft: -1 hard: -1 nofile: soft: 65536 hard: 65536 volumes: - opensearch-data2:/usr/share/opensearch/data networks: - opensearch-net opensearch-dashboards: image: opensearchproject/opensearch-dashboards:latest # Make sure the version of opensearch-dashboards matches the version of opensearch installed on other nodes container_name: opensearch-dashboards ports: - 5601:5601 # Map host port 5601 to container port 5601 expose: - "5601" # Expose port 5601 for web access to OpenSearch Dashboards environment: OPENSEARCH_HOSTS: '["https://opensearch-node1:9200","https://opensearch-node2:9200"]' # Define the OpenSearch nodes that OpenSearch Dashboards will query networks: - opensearch-net volumes: opensearch-data1: opensearch-data2: networks: opensearch-net: ``` -------------------------------------------------------------------------------- /src/mcp-server-opensearch/server.py: -------------------------------------------------------------------------------- ```python import httpx import click import asyncio #from mcp.server.lowlevel import Server, NotificationOptions from mcp.server.models import InitializationOptions import mcp.server.stdio import mcp.types as types from mcp.server.fastmcp import FastMCP from mcp.server import Server from OpenSearchClient import OpenSearchClient # TESTING VALUES T_opensearch_host="localhost" T_opensearch_hostPort=9200 T_index_name="opensearch_dashboards_sample_data_ecommerce" T_http_compress=True T_use_ssl=False T_verify_certs=False T_ssl_assert_hostname=False T_ssl_show_warn=False q = "Women's Clothing" query = { 'size': 5, 'query': { 'multi_match': { 'query': q, 'fields': ['category'] } } } ### END OF TESTING VALUES def serve( opensearch_host: str, opensearch_hostPort: str, index_name: str, http_compress: bool, use_ssl: bool, verify_certs: bool, ssl_assert_hostname: bool, ssl_show_warn: bool ) -> Server: """ Instantiate the server and configure tools to store and find documents in OpenSearch. :param opensearch_host: The URL of the OpenSearch server. :param opensearch_hostPort: The port number of the OpenSearch server. :param index_name: The name of the index to use. """ serverAPP = FastMCP("opensearch") opensearch = OpenSearchClient( opensearch_host = T_opensearch_host, opensearch_hostPort = T_opensearch_hostPort, index_name = T_index_name, bhttp_compress=T_http_compress, buse_ssl=T_use_ssl, bverify_certs=T_verify_certs, bssl_assert_hostname=T_ssl_assert_hostname, bssl_show_warn=T_ssl_show_warn ) @serverAPP.list_tools() async def handle_list_tools() -> list[types.Tool]: """ Return the list of tools that the server provides. By default, there are two tools: one to store documents and another to find them. """ return [ types.Tool( name="opensearch-find-documents", description=( "Look up documents in OpenSearch. Use this tool when you need to: \n" " - Find documents by their index or content" ), inputSchema={ "type": "object", "properties": { "query": { "type": "string", "description": "The query to search for in the documents", }, }, "required": ["query"], }, ), ] @serverAPP.call_tool() async def handle_tool_call( name: str, arguments: dict | None ) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]: if name not in ["opensearch-find-documents"]: raise ValueError(f"Unknown tool: {name}") if name == "opensearch-find-documents": if not arguments or "query" not in arguments: raise ValueError("Missing required argument 'query'") query = arguments["query"] documents = await opensearch.search_documents(query) content = [ types.TextContent( type="text", text=f"Documents for the query '{query}'" ), ] for doc in documents: content.append( types.TextContent(type="text", text=f"<document>{doc}</document>") ) return content return serverAPP @click.command() @click.option( "--opensearch_host", envvar="OPENSEARCH_HOST", required=True, help="Open Search Host URL", ) @click.option( "--opensearch_hostPort", envvar="OPENSEARCH_HOST_PORT", required=True, help="Open Search Port Number", ) @click.option( "--index-name", envvar="INDEX_NAME", required=True, help="Index name", ) def main( opensearch_host: str, opensearch_hostPort: str, index_name: str, http_compress: bool, use_ssl: bool, verify_certs: bool, ssl_assert_hostname: bool, ssl_show_warn: bool ): async def _run(): async with mcp.server.stdio.stdio_server() as (read_stream, write_stream): server = serve( opensearch_host, opensearch_hostPort, index_name, http_compress, use_ssl, verify_certs, ssl_assert_hostname, ssl_show_warn ) await server.run( read_stream, write_stream, InitializationOptions( server_name="openSearch", server_version="0.1.0", capabilities=server.get_capabilities( notification_options=NotificationOptions(), experimental_capabilities={}, ), ), ) asyncio.run(_run()) ```