# Directory Structure

```
├── .DS_Store
├── docker-compose.yml
├── Dockerfile
├── images
│   ├── .DS_Store
│   ├── claude1.png
│   ├── mcpDev0.png
│   ├── mcpDev1.png
│   └── osclientTest0.png
├── LICENSE
├── pyproject.toml
├── README.md
├── smithery.yaml
├── src
│   ├── .DS_Store
│   └── mcp-server-opensearch
│       ├── __init__.py
│       ├── __pycache__
│       │   ├── AsyncOpenSearchClient.cpython-310.pyc
│       │   ├── demo.cpython-310.pyc
│       │   ├── demo.cpython-312.pyc
│       │   ├── opensearch.cpython-310.pyc
│       │   ├── OpenSearchClient.cpython-310.pyc
│       │   ├── OpenSearchClient.cpython-312.pyc
│       │   ├── OpenSearchClient.cpython-313.pyc
│       │   ├── server.cpython-310.pyc
│       │   ├── server.cpython-313.pyc
│       │   └── test_opensearch.cpython-310.pyc
│       ├── AsyncOpenSearchClient.py
│       ├── demo.py
│       ├── OpenSearchClient.py
│       ├── server.py
│       ├── serverTest.py
│       ├── test_AsyncClient.py
│       └── test_opensearch.py
├── test_OpenSearchClient.py
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------

```markdown
# mcp-server-opensearch: An OpenSearch MCP Server
[![smithery badge](https://smithery.ai/badge/@ibrooksSDX/mcp-server-opensearch)](https://smithery.ai/server/@ibrooksSDX/mcp-server-opensearch)

> The [Model Context Protocol (MCP)](https://modelcontextprotocol.io/introduction) is an open protocol that enables seamless integration between LLM applications and external data sources and tools. Whether you’re building an AI-powered IDE, enhancing a chat interface, or creating custom AI workflows, MCP provides a standardized way to connect LLMs with the context they need.

This repository is an example of how to create an MCP server for [OpenSearch](https://opensearch.org/), a distributed search and analytics engine.

# Under Construction

![image1](./images/claude1.png)
![image2](./images/mcpDev1.png)


## Current Blocker - Async Client from OpenSearch isn't installing

[Open Search Async Client Docs](https://github.com/opensearch-project/opensearch-py/blob/main/guides/async.m) 

```shell
pip install opensearch-py[async]
zsh: no matches found: opensearch-py[async]
```
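
The `no matches found` message comes from zsh rather than from pip: zsh reads the square brackets in `opensearch-py[async]` as a glob pattern and aborts when no file matches, so the requirement has to be quoted (for example `pip install 'opensearch-py[async]'`). A minimal sketch using only the standard library shows the shell-safe form of the command; the helper name `pip_install_command` is made up for illustration:

```python
import shlex
import sys


def pip_install_command(requirement: str) -> list[str]:
    """Build a pip invocation as an argument list (hypothetical helper)."""
    return [sys.executable, "-m", "pip", "install", requirement]


command = pip_install_command("opensearch-py[async]")

# shlex.join quotes every argument that contains shell-special characters,
# so the extras specifier ends up wrapped in single quotes.
shell_line = shlex.join(command)
print(shell_line)
```

Passing the argument list straight to `subprocess.run` would avoid the shell, and therefore the globbing, altogether.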

## Overview 

A basic Model Context Protocol server for keeping and retrieving memories in the OpenSearch engine.
It acts as a semantic memory layer on top of the OpenSearch database.

## Components

### Tools

1. `search-openSearch`
   - Search for documents in the OpenSearch index
   - Input:
     - `query` (json): prepared JSON query message
   - Returns: The matching documents
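
As a rough illustration of what a prepared `query` message could look like, the sketch below builds the same `multi_match` query that the test scripts in this repository use and serializes it to JSON. The helper name `build_category_query` is an assumption made for this example:

```python
import json


def build_category_query(text: str, size: int = 5) -> str:
    """Return a JSON query that matches `text` against the `category` field."""
    query = {
        "size": size,
        "query": {
            "multi_match": {
                "query": text,
                "fields": ["category"],
            }
        },
    }
    return json.dumps(query)


message = build_category_query("Women's Clothing")
print(message)
```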

## Installation

### Installing via Smithery

To install mcp-server-opensearch for Claude Desktop automatically via [Smithery](https://smithery.ai/server/@ibrooksSDX/mcp-server-opensearch):

```bash
npx -y @smithery/cli install @ibrooksSDX/mcp-server-opensearch --client claude
```

### Using uv (recommended)

When using [`uv`](https://docs.astral.sh/uv/) no specific installation is needed to directly run *mcp-server-opensearch*.

```shell
uv run mcp-server-opensearch \
  --opensearch-url "http://localhost:9200" \
  --index-name "my_index"
```
or 

```shell
uv run fastmcp run demo.py:main
```

## Testing - Local OpenSearch Client

![image4](./images/osclientTest0.png)

```shell
uv run python src/mcp-server-opensearch/test_opensearch.py
```
## Testing - MCP Server Connection to OpenSearch Client

![image1](./images/mcpDev0.png)
![image2](./images/mcpDev1.png)

```shell
cd src/mcp-server-opensearch
uv run fastmcp dev demo.py
```

## Usage with Claude Desktop

To use this server with the Claude Desktop app, add the following configuration to the "mcpServers" section of your `claude_desktop_config.json`:

```json
{
  "opensearch": {
    "command": "uvx",
    "args": [
      "mcp-server-opensearch",
      "--opensearch-url",
      "http://localhost:9200",
      "--opensearch-api-key",
      "your_api_key",
      "--index-name",
      "your_index_name"
    ]
  },
  "Demo": {
    "command": "uv",
    "args": [
      "run",
      "--with",
      "fastmcp",
      "--with",
      "opensearch-py",
      "fastmcp",
      "run",
      "/Users/ibrooks/Documents/GitHub/mcp-server-opensearch/src/mcp-server-opensearch/demo.py"
    ]
  }
}
```

Or use the FastMCP CLI to install the server into Claude Desktop:

```shell
uv run fastmcp install demo.py
```

## Environment Variables

The server can also be configured using environment variables:

- `OPENSEARCH_HOST`: URL of the OpenSearch server, e.g. `http://localhost`
- `OPENSEARCH_HOST_PORT`: Port of the OpenSearch server, e.g. `9200`
- `INDEX_NAME`: Name of the index to use
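
A minimal sketch of reading these settings in Python, assuming the variable names follow the `envvar=` values declared in `server.py`. The `OpenSearchSettings` dataclass and the `load_settings` helper are hypothetical and not part of this repository:

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class OpenSearchSettings:
    host: str
    port: int
    index_name: str


REQUIRED = ("OPENSEARCH_HOST", "OPENSEARCH_HOST_PORT", "INDEX_NAME")


def load_settings(env: Mapping[str, str] = os.environ) -> OpenSearchSettings:
    """Read the three settings and fail early if any of them is missing."""
    missing = [name for name in REQUIRED if name not in env]
    if missing:
        raise KeyError(f"Missing environment variables: {', '.join(missing)}")
    return OpenSearchSettings(
        host=env["OPENSEARCH_HOST"],
        port=int(env["OPENSEARCH_HOST_PORT"]),  # environment values are always strings
        index_name=env["INDEX_NAME"],
    )


# Example with an explicit mapping instead of the real environment.
settings = load_settings(
    {"OPENSEARCH_HOST": "http://localhost", "OPENSEARCH_HOST_PORT": "9200", "INDEX_NAME": "my_index"}
)
print(settings)
```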

```

--------------------------------------------------------------------------------
/src/mcp-server-opensearch/__init__.py:
--------------------------------------------------------------------------------

```python
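from . import server

def main():
    """Main entry point for the package."""
    # server.main is a synchronous click command that calls asyncio.run itself,
    # so it must not be wrapped in another asyncio.run here.
    server.main()


# Optionally expose other important items at package level
__all__ = ["main", "server"]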

```

--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------

```toml
[project]
name = "mcp-server-opensearch"
version = "0.1.0"
description = "MCP server for OpenSearch"
readme = "README.md"
requires-python = ">=3.10"
dependencies = [
    "anthropic>=0.44.0",
    "fastmcp>=0.4.1",
    "httpx>=0.28.1",
    "mcp[cli]>=1.2.0",
    "opensearch-py>=2.8.0",
    "python-dotenv>=1.0.1",
]

```

--------------------------------------------------------------------------------
/test_OpenSearchClient.py:
--------------------------------------------------------------------------------

```python
from opensearchpy import OpenSearch

host = 'localhost'
port = 9200
auth = ('admin', 'pizzaParty123') # For testing only. Don't store credentials in code.

# Create the client with SSL/TLS and hostname verification disabled.
client = OpenSearch(
    hosts = [{'host': host, 'port': port}],
    http_compress = True, # enables gzip compression for request bodies
    http_auth = auth,
    use_ssl = True,
    verify_certs = False,
    ssl_assert_hostname = False,
    ssl_show_warn = False
)

q = "Women's Clothing"
query = {
  'size': 5,
  'query': {
    'multi_match': {
      'query': q,
      'fields': ['category']
    }
  }
}

response = client.search(
    body = query,
    index = 'opensearch_dashboards_sample_data_ecommerce'
)

print('\nSearch results:')
print(response)

```

--------------------------------------------------------------------------------
/smithery.yaml:
--------------------------------------------------------------------------------

```yaml
# Smithery configuration file: https://smithery.ai/docs/config#smitheryyaml

startCommand:
  type: stdio
  configSchema:
    # JSON Schema defining the configuration options for the MCP.
    type: object
    required:
      - opensearchUrl
      - opensearchHostPort
      - indexName
    properties:
      opensearchUrl:
        type: string
        description: The URL of the OpenSearch server.
      opensearchHostPort:
        type: number
        description: The port of the host of the OpenSearch server.
      indexName:
        type: string
        description: The name of the index to use.
  commandFunction:
    # A function that produces the CLI command to start the MCP on stdio.
    |-
    config => ({command: 'uv', args: ['run', 'mcp-server-opensearch', '--opensearch-url', `${config.opensearchUrl}:${config.opensearchHostPort}`, '--index-name', config.indexName]})

```
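
For readers more at home in Python than in the JavaScript arrow function above, the following sketch mirrors what `commandFunction` produces: it joins the URL and port and assembles the argument list. It is only an illustration of the mapping; Smithery itself evaluates the JavaScript version, and `build_start_command` is a made-up name:

```python
def build_start_command(config: dict) -> dict:
    """Mirror the smithery.yaml commandFunction for a given config object."""
    url = f"{config['opensearchUrl']}:{config['opensearchHostPort']}"
    return {
        "command": "uv",
        "args": [
            "run",
            "mcp-server-opensearch",
            "--opensearch-url",
            url,
            "--index-name",
            config["indexName"],
        ],
    }


start = build_start_command(
    {"opensearchUrl": "http://localhost", "opensearchHostPort": 9200, "indexName": "my_index"}
)
print(start)
```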

--------------------------------------------------------------------------------
/src/mcp-server-opensearch/test_opensearch.py:
--------------------------------------------------------------------------------

```python
import asyncio
from OpenSearchClient import OpenSearchClient
import json


q = "Women's Clothing"
query = {
    'size': 5,
    'query': {
        'multi_match': {
            'query': q,
            'fields': ['category']
        }
    }
}

async def test_opensearch():
    # Initialize connector
    connector = OpenSearchClient(
        opensearch_host="localhost",
        opensearch_hostPort=9200,
        index_name="opensearch_dashboards_sample_data_ecommerce",
        bhttp_compress=True,
        buse_ssl=True,
        bverify_certs=False,
        bssl_assert_hostname=False,
        bssl_show_warn=False
    )

    # Test search
    try:
        search_results = await connector.search_documents(query)
        print("Search results:", search_results)
    except Exception as e:
        print(f"Error during search: {e}")

if __name__ == "__main__":
    asyncio.run(test_opensearch()) 
```

--------------------------------------------------------------------------------
/src/mcp-server-opensearch/test_AsyncClient.py:
--------------------------------------------------------------------------------

```python
import asyncio
from AsyncOpenSearchClient import OpenSearchClient
import json


q = "Women's Clothing"
query = {
    'size': 5,
    'query': {
        'multi_match': {
            'query': q,
            'fields': ['category']
        }
    }
}

async def test_opensearch():
    # Initialize connector (the constructor is synchronous, so it is not awaited)
    connector = OpenSearchClient(
        opensearch_host="localhost",
        opensearch_hostPort=9200,
        index_name="opensearch_dashboards_sample_data_ecommerce",
        bhttp_compress=True,
        buse_ssl=True,
        bverify_certs=False,
        bssl_assert_hostname=False,
        bssl_show_warn=False
    )

    # Test search
    try:
        search_results = await connector.search_documents(query)
        print("Search results:", search_results)
    except Exception as e:
        print(f"Error during search: {e}")

if __name__ == "__main__":
    asyncio.run(test_opensearch()) 
```

--------------------------------------------------------------------------------
/Dockerfile:
--------------------------------------------------------------------------------

```dockerfile
# Generated by https://smithery.ai. See: https://smithery.ai/docs/config#dockerfile
# Start with a Python image that includes the uv tool pre-installed
FROM ghcr.io/astral-sh/uv:python3.12-bookworm-slim AS uv

# Set the working directory in the container
WORKDIR /app

# Enable bytecode compilation
ENV UV_COMPILE_BYTECODE=1

# Copy the project configuration and lockfile
COPY pyproject.toml uv.lock ./

# Install the project's dependencies without installing the project itself
RUN --mount=type=cache,target=/root/.cache/uv     uv sync --frozen --no-install-project --no-dev --no-editable

# Add the rest of the project source code
ADD src /app/src

# Install the project
RUN --mount=type=cache,target=/root/.cache/uv     uv sync --frozen --no-dev --no-editable

# Define the entry point command to run the server
ENTRYPOINT ["uv", "run", "mcp-server-opensearch", "--opensearch-url", "http://localhost:9200", "--index-name", "my_index"]

```

--------------------------------------------------------------------------------
/src/mcp-server-opensearch/serverTest.py:
--------------------------------------------------------------------------------

```python
import logging
from OpenSearchClient import OpenSearchClient
from fastmcp import FastMCP

#from .tools.index import IndexTools
#from .tools.document import DocumentTools
#from .tools.cluster import ClusterTools

# TESTING VALUES 
opensearch_host="localhost"
opensearch_hostPort=9200
index_name="test_index"
http_compress=True
use_ssl=False
verify_certs=False
ssl_assert_hostname=False
ssl_show_warn=False

q = "Women's Clothing"
query = {
    'size': 5,
    'query': {
        'multi_match': {
            'query': q,
            'fields': ['category']
        }
    }
}
### END OF TESTING VALUES


class OpenSearchMCPServer:
    def __init__(self):
        self._name = "opensearch_mcp_server"
        self.mcp = FastMCP(self._name)
        
        #Configure and Establish Client to Open Search
        try:
            self._client = OpenSearchClient(opensearch_host, opensearch_hostPort, index_name, http_compress, use_ssl, verify_certs, ssl_assert_hostname, ssl_show_warn)
        except Exception as e:
            print(f"Error creating OpenSearch client: {e}")
        
        # Configure logging
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        self.logger = logging.getLogger(self._name)
        
        # Initialize tools
        self._register_tools()

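    def _testClient(self):
        # search_documents is a coroutine function, so drive it with asyncio.run.
        import asyncio
        return asyncio.run(self._client.search_documents(query))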

    def _register_tools(self):
        """Register all MCP tools."""
        # Initialize tool classes
        #index_tools = IndexTools(self.logger)
        #document_tools = DocumentTools(self.logger)
        #cluster_tools = ClusterTools(self.logger)
        
        # Register tools from each module
        #index_tools.register_tools(self.mcp)
        #document_tools.register_tools(self.mcp)
        #cluster_tools.register_tools(self.mcp)

    def run(self):
        """Run the MCP server."""
        self.mcp.run()

def main():
    server = OpenSearchMCPServer()
    server.run()
```

--------------------------------------------------------------------------------
/src/mcp-server-opensearch/OpenSearchClient.py:
--------------------------------------------------------------------------------

```python
import os
import asyncio
import json
from fastmcp import FastMCP

from opensearchpy import OpenSearch
host = 'localhost'
port = 9200

def get_string_list_from_json(json_data):
    """Extracts a list of strings from a JSON object."""

    if isinstance(json_data, list):
        return [item for item in json_data if isinstance(item, str)]
    elif isinstance(json_data, dict):
        return [value for value in json_data.values() if isinstance(value, str)]
    else:
        return []


class OpenSearchClient:
    def __init__(
        self,
        opensearch_host: str,
        opensearch_hostPort: str,
        index_name: str,
        bhttp_compress: bool,
        buse_ssl: bool,
        bverify_certs: bool,
        bssl_assert_hostname: bool,
        bssl_show_warn: bool
    ):
        # No trailing commas here: they would turn each attribute into a one-element tuple.
        self._index_name = index_name
        self._host = [{'host': opensearch_host, 'port': opensearch_hostPort}]
        self._auth = ('admin', 'pizzaParty123')  # For testing only. Don't store credentials in code.
        # Connect to the host and port passed to the constructor, not the module-level defaults.
        self._client = OpenSearch(
            hosts=self._host,
            http_compress=bhttp_compress,
            http_auth=self._auth,
            use_ssl=buse_ssl,
            verify_certs=bverify_certs,
            ssl_assert_hostname=bssl_assert_hostname,
            ssl_show_warn=bssl_show_warn,
        )
    
    async def search_documents(self, query: dict | str) -> dict | list:
        """
        Find documents in the OpenSearch index. If the index does not exist, an empty list is returned.
        :param query: The query body to use for the search.
        :return: The raw search response, or an empty list if the index does not exist.
        """
        index_exists = self._client.indices.exists(index=self._index_name)
        if not index_exists:
            return []
        search_results = self._client.search(
            body=query,
            index=self._index_name
        )
        return search_results

   
    def _return_client(self) -> OpenSearch:
        """Return the underlying OpenSearch client created in the constructor."""
        return self._client

```
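
`search_documents` is declared `async` but calls the synchronous client directly, which blocks the event loop while the request is in flight. One possible way around this, sketched here with the standard library only, is `asyncio.to_thread`. The function `blocking_search` is a made-up stand-in for the real client call and is not part of opensearch-py:

```python
import asyncio
import time


def blocking_search(query: dict) -> dict:
    """Stand-in for a synchronous client call (hypothetical, no network access)."""
    time.sleep(0.05)  # simulate the time spent waiting for the server
    return {"hits": {"hits": []}, "echo": query}


async def search_documents(query: dict) -> dict:
    # Run the blocking call in a worker thread so other coroutines keep running.
    return await asyncio.to_thread(blocking_search, query)


result = asyncio.run(search_documents({"size": 5}))
print(result)
```

Whether this is preferable to switching to the async client depends on how many concurrent requests the server is expected to handle.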

--------------------------------------------------------------------------------
/src/mcp-server-opensearch/AsyncOpenSearchClient.py:
--------------------------------------------------------------------------------

```python
import os
import asyncio
from fastmcp import FastMCP

# The async client ships with opensearch-py itself and needs the "async" extra (aiohttp).
from opensearchpy import AsyncOpenSearch
host = 'localhost'
port = 9200

def get_string_list_from_json(json_data):
    """Extracts a list of strings from a JSON object."""

    if isinstance(json_data, list):
        return [item for item in json_data if isinstance(item, str)]
    elif isinstance(json_data, dict):
        return [value for value in json_data.values() if isinstance(value, str)]
    else:
        return []


class OpenSearchClient:
    def __init__(
        self,
        opensearch_host: str,
        opensearch_hostPort: str,
        index_name: str,
        bhttp_compress: bool,
        buse_ssl: bool,
        bverify_certs: bool,
        bssl_assert_hostname: bool,
        bssl_show_warn: bool
    ):
        # No trailing commas here: they would turn each attribute into a one-element tuple.
        self._index_name = index_name
        self._host = [{'host': opensearch_host, 'port': opensearch_hostPort}]
        self._auth = ('admin', 'pizzaParty123')  # For testing only. Don't store credentials in code.
        # Connect to the host and port passed to the constructor, not the module-level defaults.
        self._client = AsyncOpenSearch(
            hosts=self._host,
            http_compress=bhttp_compress,
            http_auth=self._auth,
            use_ssl=buse_ssl,
            verify_certs=bverify_certs,
            ssl_assert_hostname=bssl_assert_hostname,
            ssl_show_warn=bssl_show_warn,
        )
    
    async def search_documents(self, query: dict | str) -> dict | list:
        """
        Find documents in the OpenSearch index. If the index does not exist, an empty list is returned.
        :param query: The query body to use for the search.
        :return: The raw search response, or an empty list if the index does not exist.
        """
        # Every call on the async client returns a coroutine and must be awaited.
        index_exists = await self._client.indices.exists(index=self._index_name)
        if not index_exists:
            return []
        search_results = await self._client.search(
            body=query,
            index=self._index_name
        )
        return search_results

   
    def _return_client(self) -> AsyncOpenSearch:
        """Return the underlying AsyncOpenSearch client created in the constructor."""
        return self._client

```

--------------------------------------------------------------------------------
/src/mcp-server-opensearch/demo.py:
--------------------------------------------------------------------------------

```python
# demo.py

from fastmcp import FastMCP
from OpenSearchClient import OpenSearchClient
import mcp.types as types
import asyncio
import json

mcp = FastMCP("Demo")

SLEEP_DELAY = 2

# TESTING VALUES 
T_opensearch_host="localhost"
T_opensearch_hostPort=9200
T_index_name="opensearch_dashboards_sample_data_ecommerce"
T_http_compress=True
T_use_ssl=False
T_verify_certs=False
T_ssl_assert_hostname=False
T_ssl_show_warn=False

q = "Women's Clothing"
query = {
    'size': 5,
    'query': {
        'multi_match': {
            'query': "Women's Clothing",
            'fields': ['category']
        }
    }
}
### END OF TESTING VALUES

@mcp.tool()
async def search_openSearch(query) -> list[types.TextContent]:
    """Run a search query against the configured OpenSearch index."""
    client = OpenSearchClient(
        opensearch_host=T_opensearch_host,
        opensearch_hostPort=T_opensearch_hostPort,
        index_name=T_index_name,
        bhttp_compress=T_http_compress,
        buse_ssl=T_use_ssl,
        bverify_certs=T_verify_certs,
        bssl_assert_hostname=T_ssl_assert_hostname,
        bssl_show_warn=T_ssl_show_warn,
    )
    queryresult = await client.search_documents(query)

    content = [
        types.TextContent(
            type="text", text=f"Documents for the query '{query}'"
        ),
    ]
    # The whole search response is returned as a single document for now.
    content.append(types.TextContent(type="text", text=f"<document>{str(queryresult)}</document>"))
    return content


# Add a dynamic greeting resource
@mcp.resource("greeting://{name}")
def get_greeting(name: str) -> str:
    """Get a personalized greeting"""
    return f"Hello, {name}!"


def main():
    """Run the demo server over stdio, FastMCP's default transport."""
    # The previous body referenced serve, InitializationOptions and NotificationOptions,
    # none of which are defined in this module; FastMCP.run() handles the setup itself.
    mcp.run()


if __name__ == "__main__":
    main()
```

--------------------------------------------------------------------------------
/docker-compose.yml:
--------------------------------------------------------------------------------

```yaml
version: '3'
services:
  opensearch-node1: # This is also the hostname of the container within the Docker network (i.e. https://opensearch-node1/)
    image: opensearchproject/opensearch:latest # Specifying the latest available image - modify if you want a specific version
    container_name: opensearch-node1
    environment:
      - cluster.name=opensearch-cluster # Name the cluster
      - node.name=opensearch-node1 # Name the node that will run in this container
      - discovery.seed_hosts=opensearch-node1,opensearch-node2 # Nodes to look for when discovering the cluster
      - cluster.initial_cluster_manager_nodes=opensearch-node1,opensearch-node2 # Nodes eligible to serve as cluster manager
      - bootstrap.memory_lock=true # Disable JVM heap memory swapping
      - "OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m" # Set min and max JVM heap sizes to at least 50% of system RAM
      - OPENSEARCH_INITIAL_ADMIN_PASSWORD=pizzaParty123    # Sets the demo admin user password when using demo configuration, required for OpenSearch 2.12 and later
    ulimits:
      memlock:
        soft: -1 # Set memlock to unlimited (no soft or hard limit)
        hard: -1
      nofile:
        soft: 65536 # Maximum number of open files for the opensearch user - set to at least 65536
        hard: 65536
    volumes:
      - opensearch-data1:/usr/share/opensearch/data # Creates volume called opensearch-data1 and mounts it to the container
    ports:
      - 9200:9200 # REST API
      - 9600:9600 # Performance Analyzer
    networks:
      - opensearch-net # All of the containers will join the same Docker bridge network
  opensearch-node2:
    image: opensearchproject/opensearch:latest # This should be the same image used for opensearch-node1 to avoid issues
    container_name: opensearch-node2
    environment:
      - cluster.name=opensearch-cluster
      - node.name=opensearch-node2
      - discovery.seed_hosts=opensearch-node1,opensearch-node2
      - cluster.initial_cluster_manager_nodes=opensearch-node1,opensearch-node2
      - bootstrap.memory_lock=true
      - "OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m"
      - OPENSEARCH_INITIAL_ADMIN_PASSWORD=pizzaParty123
    ulimits:
      memlock:
        soft: -1
        hard: -1
      nofile:
        soft: 65536
        hard: 65536
    volumes:
      - opensearch-data2:/usr/share/opensearch/data
    networks:
      - opensearch-net
  opensearch-dashboards:
    image: opensearchproject/opensearch-dashboards:latest # Make sure the version of opensearch-dashboards matches the version of opensearch installed on other nodes
    container_name: opensearch-dashboards
    ports:
      - 5601:5601 # Map host port 5601 to container port 5601
    expose:
      - "5601" # Expose port 5601 for web access to OpenSearch Dashboards
    environment:
      OPENSEARCH_HOSTS: '["https://opensearch-node1:9200","https://opensearch-node2:9200"]' # Define the OpenSearch nodes that OpenSearch Dashboards will query
    networks:
      - opensearch-net

volumes:
  opensearch-data1:
  opensearch-data2:

networks:
  opensearch-net:
```

--------------------------------------------------------------------------------
/src/mcp-server-opensearch/server.py:
--------------------------------------------------------------------------------

```python
import httpx
import click
import asyncio

#from mcp.server.lowlevel import Server, NotificationOptions
from mcp.server.models import InitializationOptions
import mcp.server.stdio
import mcp.types as types
from mcp.server.fastmcp import FastMCP
from mcp.server import NotificationOptions, Server



from OpenSearchClient import OpenSearchClient

# TESTING VALUES 
T_opensearch_host="localhost"
T_opensearch_hostPort=9200
T_index_name="opensearch_dashboards_sample_data_ecommerce"
T_http_compress=True
T_use_ssl=False
T_verify_certs=False
T_ssl_assert_hostname=False
T_ssl_show_warn=False

q = "Women's Clothing"
query = {
    'size': 5,
    'query': {
        'multi_match': {
            'query': q,
            'fields': ['category']
        }
    }
}
### END OF TESTING VALUES

def serve(
    opensearch_host: str,
    opensearch_hostPort: str,
    index_name: str,
    http_compress: bool,
    use_ssl: bool,
    verify_certs: bool,
    ssl_assert_hostname: bool,
    ssl_show_warn: bool
) -> Server:
    """
    Instantiate the server and configure tools to store and find documents in OpenSearch.
    :param opensearch_host: The URL of the OpenSearch server.
    :param opensearch_hostPort: The port number of the OpenSearch server.
    :param index_name: The name of the index to use.
    """
    # list_tools() and call_tool() are decorators of the low-level Server, not of FastMCP.
    serverAPP = Server("opensearch")

    # Build the client from the arguments passed to serve() instead of the testing values.
    opensearch = OpenSearchClient(
        opensearch_host=opensearch_host,
        opensearch_hostPort=opensearch_hostPort,
        index_name=index_name,
        bhttp_compress=http_compress,
        buse_ssl=use_ssl,
        bverify_certs=verify_certs,
        bssl_assert_hostname=ssl_assert_hostname,
        bssl_show_warn=ssl_show_warn,
    )

    @serverAPP.list_tools()
    async def handle_list_tools() -> list[types.Tool]:
        """
        Return the list of tools that the server provides. Currently there is a
        single tool, which finds documents.
        """
        return [
            types.Tool(
                name="opensearch-find-documents",
                description=(
                    "Look up documents in OpenSearch. Use this tool when you need to: \n"
                    " - Find documents by their index or content"
                ),
                inputSchema={
                    "type": "object",
                    "properties": {
                        "query": {
                            "type": "string",
                            "description": "The query to search for in the documents",
                        },
                    },
                    "required": ["query"],
                },
            ),
        ]

    @serverAPP.call_tool()
    async def handle_tool_call(
        name: str, arguments: dict | None
    ) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
        if name not in ["opensearch-find-documents"]:
            raise ValueError(f"Unknown tool: {name}")

        if name == "opensearch-find-documents":
            if not arguments or "query" not in arguments:
                raise ValueError("Missing required argument 'query'")
            query = arguments["query"]
            documents = await opensearch.search_documents(query)
            content = [
                types.TextContent(
                    type="text", text=f"Documents for the query '{query}'"
                ),
            ]
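            # The search response is a dict; the matching documents live under hits.hits.
            hits = documents["hits"]["hits"] if documents else []
            for doc in hits:
                content.append(
                    types.TextContent(type="text", text=f"<document>{doc}</document>")
                )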
            return content

    return serverAPP


@click.command()
@click.option(
    "--opensearch_host",
    envvar="OPENSEARCH_HOST",
    required=True,
    help="Open Search Host URL",
)
@click.option(
    "--opensearch_hostPort",
    "opensearch_hostport",  # explicit parameter name, matching main() below
    envvar="OPENSEARCH_HOST_PORT",
    required=True,
    type=int,
    help="Open Search Port Number",
)
@click.option(
    "--index-name",
    envvar="INDEX_NAME",
    required=True,
    help="Index name",
)
def main(
    opensearch_host: str,
    opensearch_hostport: int,
    index_name: str,
):
    async def _run():
        async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
            # The connection flags have no CLI options yet, so the testing values are used.
            server = serve(
                opensearch_host,
                opensearch_hostport,
                index_name,
                T_http_compress,
                T_use_ssl,
                T_verify_certs,
                T_ssl_assert_hostname,
                T_ssl_show_warn,
            )
            await server.run(
                read_stream,
                write_stream,
                InitializationOptions(
                    server_name="openSearch",
                    server_version="0.1.0",
                    capabilities=server.get_capabilities(
                        notification_options=NotificationOptions(),
                        experimental_capabilities={},
                    ),
                ),
            )
    asyncio.run(_run())
```
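
The tool handler above wraps each document in `<document>` tags. As a sketch of that step in isolation, the helper below pulls the `_source` of every hit out of a search response and formats it. The name `format_hits` is hypothetical, and `sample_response` is a hand-written dict in the general shape of a search response, not real server output:

```python
import json


def format_hits(response: dict) -> list[str]:
    """Wrap the `_source` of every hit in <document> tags."""
    hits = response.get("hits", {}).get("hits", [])
    return [f"<document>{json.dumps(hit.get('_source', {}))}</document>" for hit in hits]


# Hand-written sample for illustration only.
sample_response = {
    "hits": {
        "total": {"value": 1, "relation": "eq"},
        "hits": [
            {"_index": "my_index", "_id": "1", "_source": {"category": ["Women's Clothing"]}},
        ],
    }
}

documents = format_hits(sample_response)
print(documents)
```

Using `.get` with defaults means an empty or unexpected response yields an empty list instead of raising.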