# Directory Structure
```
├── .dockerignore
├── .env.example
├── .github
│   ├── dependabot.yml
│   └── workflows
│       ├── ci.yml
│       ├── release.yml
│       └── stale-issues.yml
├── .gitignore
├── Dockerfile
├── examples
│   └── redis_assistant.py
├── fly.toml
├── gemini-extension.json
├── GEMINI.md
├── LICENSE
├── pyproject.toml
├── README.md
├── src
│   ├── __init__.py
│   ├── common
│   │   ├── __init__.py
│   │   ├── config.py
│   │   ├── connection.py
│   │   ├── entraid_auth.py
│   │   ├── logging_utils.py
│   │   └── server.py
│   ├── main.py
│   ├── tools
│   │   ├── __init__.py
│   │   ├── hash.py
│   │   ├── json.py
│   │   ├── list.py
│   │   ├── misc.py
│   │   ├── pub_sub.py
│   │   ├── redis_query_engine.py
│   │   ├── server_management.py
│   │   ├── set.py
│   │   ├── sorted_set.py
│   │   ├── stream.py
│   │   └── string.py
│   └── version.py
├── tests
│   ├── __init__.py
│   ├── conftest.py
│   ├── test_config.py
│   ├── test_connection.py
│   ├── test_entraid_auth.py
│   ├── test_integration.py
│   ├── test_logging_utils.py
│   ├── test_main.py
│   ├── test_server.py
│   └── tools
│       ├── __init__.py
│       ├── test_hash.py
│       ├── test_json.py
│       ├── test_list.py
│       ├── test_pub_sub.py
│       ├── test_redis_query_engine.py
│       ├── test_server_management.py
│       ├── test_set.py
│       ├── test_sorted_set.py
│       ├── test_stream.py
│       └── test_string.py
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/.dockerignore:
--------------------------------------------------------------------------------
```
*
!src
!uv.lock
!pyproject.toml
!README.md
```
--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------
```
# Python-generated files
__pycache__/
*.py[oc]
build/
dist/
wheels/
*.egg-info
# Virtual environments
.venv
.env
env/
ENV/
env.bak/
venv.bak/
# UV lock file
uv.lock
# IDE files
.idea/
.vscode/
*.swp
*.swo
*~
/bandit-report.json
/safety-report.json
/coverage.xml
/.coverage
/htmlcov/
```
--------------------------------------------------------------------------------
/.env.example:
--------------------------------------------------------------------------------
```
REDIS_HOST=your_redis_host
REDIS_PORT=6379
REDIS_DB=0
REDIS_USERNAME=default
REDIS_PWD=your_password
REDIS_SSL=False
REDIS_SSL_CA_PATH=/path/to/ca.pem
REDIS_SSL_KEYFILE=/path/to/key.pem
REDIS_SSL_CERTFILE=/path/to/cert.pem
REDIS_SSL_CERT_REQS=required
REDIS_SSL_CA_CERTS=/path/to/ca_certs.pem
REDIS_CLUSTER_MODE=False
```
--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------
```markdown
# Redis MCP Server
[](https://github.com/redis/mcp-redis/actions/workflows/ci.yml)
[](https://pypi.org/project/redis-mcp-server/)
[](https://www.python.org/downloads/)
[](./LICENSE.txt)
[](https://mseep.ai/app/70102150-efe0-4705-9f7d-87980109a279)
[](https://hub.docker.com/r/mcp/redis)
[](https://codecov.io/gh/redis/mcp-redis)
[](https://discord.gg/redis)
[](https://www.twitch.tv/redisinc)
[](https://www.youtube.com/redisinc)
[](https://twitter.com/redisinc)
[](https://stackoverflow.com/questions/tagged/mcp-redis)
## Overview
The Redis MCP Server is a **natural language interface** designed for agentic applications to efficiently manage and search data in Redis. It integrates seamlessly with **MCP (Model Context Protocol) clients**, enabling AI-driven workflows to interact with structured and unstructured data in Redis. Using this MCP Server, you can make requests like:
- "Store the entire conversation in a stream"
- "Cache this item"
- "Store the session with an expiration time"
- "Index and search this vector"
## Table of Contents
- [Overview](#overview)
- [Features](#features)
- [Tools](#tools)
- [Installation](#installation)
  - [From PyPI (recommended)](#from-pypi-recommended)
  - [Testing the PyPI package](#testing-the-pypi-package)
  - [From GitHub](#from-github)
  - [Development Installation](#development-installation)
  - [With Docker](#with-docker)
- [Configuration](#configuration)
  - [Redis ACL](#redis-acl)
  - [Configuration via command line arguments](#configuration-via-command-line-arguments)
  - [Configuration via Environment Variables](#configuration-via-environment-variables)
  - [EntraID Authentication for Azure Managed Redis](#entraid-authentication-for-azure-managed-redis)
  - [Logging](#logging)
- [Integrations](#integrations)
  - [OpenAI Agents SDK](#openai-agents-sdk)
  - [Augment](#augment)
  - [Claude Desktop](#claude-desktop)
  - [VS Code with GitHub Copilot](#vs-code-with-github-copilot)
- [Testing](#testing)
- [Example Use Cases](#example-use-cases)
- [Contributing](#contributing)
- [License](#license)
- [Badges](#badges)
- [Contact](#contact)
## Features
- **Natural Language Queries**: Enables AI agents to query and update Redis using natural language.
- **Seamless MCP Integration**: Works with any **MCP client** for smooth communication.
- **Full Redis Support**: Handles **hashes, lists, sets, sorted sets, streams**, and more.
- **Search & Filtering**: Supports efficient data retrieval and searching in Redis.
- **Scalable & Lightweight**: Designed for **high-performance** data operations.
- **EntraID Authentication**: Native support for Azure Active Directory authentication with Azure Managed Redis.
- The Redis MCP Server supports the `stdio` [transport](https://modelcontextprotocol.io/docs/concepts/transports#standard-input%2Foutput-stdio). Support for the `streamable-http` transport will be added in the future.
## Tools
This MCP Server provides tools to manage the data stored in Redis.
- `string` tools to set, get strings with expiration. Useful for storing simple configuration values, session data, or caching responses.
- `hash` tools to store field-value pairs within a single key. The hash can store vector embeddings. Useful for representing objects with multiple attributes, user profiles, or product information where fields can be accessed individually.
- `list` tools with common operations to append and pop items. Useful for queues, message brokers, or maintaining a list of most recent actions.
- `set` tools to add, remove and list set members. Useful for tracking unique values like user IDs or tags, and for performing set operations like intersection.
- `sorted set` tools to manage data for e.g. leaderboards, priority queues, or time-based analytics with score-based ordering.
- `pub/sub` functionality to publish messages to channels and subscribe to receive them. Useful for real-time notifications, chat applications, or distributing updates to multiple clients.
- `streams` tools to add, read, and delete from data streams. Useful for event sourcing, activity feeds, or sensor data logging with consumer groups support.
- `JSON` tools to store, retrieve, and manipulate JSON documents in Redis. Useful for complex nested data structures, document databases, or configuration management with path-based access.
Additional tools.
- `query engine` tools to manage vector indexes and perform vector search
- `server management` tool to retrieve information about the database
## Installation
The Redis MCP Server is available as a PyPI package and can also be installed directly from the GitHub repository.
### From PyPI (recommended)
To use the latest Redis MCP Server version from PyPI, import the following JSON configuration into your framework or tool of choice.
The `uvx` command will download the server on the fly (if not cached already), create a temporary environment, and then run it.
```json
{
  "mcpServers": {
    "RedisMCPServer": {
      "command": "uvx",
      "args": [
        "--from",
        "redis-mcp-server@latest",
        "redis-mcp-server",
        "--url",
        "redis://localhost:6379/0"
      ]
    }
  }
}
```
#### URL specification
The format to specify the `--url` argument follows the [redis](https://www.iana.org/assignments/uri-schemes/prov/redis) and [rediss](https://www.iana.org/assignments/uri-schemes/prov/rediss) schemes:
```commandline
redis://user:secret@localhost:6379/0?foo=bar&qux=baz
```
As an example, you can easily connect to a localhost server with:
```commandline
redis://localhost:6379/0
```
Where `0` is the [logical database](https://redis.io/docs/latest/commands/select/) you'd like to connect to.
For an encrypted connection to the database (e.g., connecting to a [Redis Cloud](https://redis.io/cloud/) database), you'd use the `rediss` scheme.
```commandline
rediss://user:secret@localhost:6379/0?foo=bar&qux=baz
```
To verify the server's identity, specify `ssl_ca_certs`.
```commandline
rediss://user:secret@hostname:port?ssl_cert_reqs=required&ssl_ca_certs=path_to_the_certificate
```
For an unverified connection, set `ssl_cert_reqs` to `none`
```commandline
rediss://user:secret@hostname:port?ssl_cert_reqs=none
```
Configure your connection using the available options in the section "Available CLI Options".
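To make the URL format above concrete, here is a minimal sketch that splits a `redis://` or `rediss://` URL into its parts using only the Python standard library. The helper name `describe_redis_url` is hypothetical and is not part of this server; the server's own URL handling may differ.

```python
from urllib.parse import parse_qs, urlparse


def describe_redis_url(url: str) -> dict:
    """Split a redis:// or rediss:// URL into the parts discussed above.

    Hypothetical helper for illustration only.
    """
    parsed = urlparse(url)
    if parsed.scheme not in ("redis", "rediss"):
        raise ValueError(f"unsupported scheme: {parsed.scheme!r}")
    # The path carries the logical database number, e.g. "/0".
    db = int(parsed.path.lstrip("/") or 0)
    # Query parameters such as ssl_cert_reqs arrive as single-item lists.
    options = {key: values[0] for key, values in parse_qs(parsed.query).items()}
    return {
        "tls": parsed.scheme == "rediss",
        "host": parsed.hostname,
        "port": parsed.port or 6379,
        "db": db,
        "username": parsed.username,
        "password": parsed.password,
        "options": options,
    }


print(describe_redis_url("rediss://user:secret@localhost:6379/0?ssl_cert_reqs=none"))
```

The `rediss` scheme is what marks the connection as encrypted; everything after `?` is passed along as connection options.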
### Testing the PyPI package
You can install the package as follows:
```sh
pip install redis-mcp-server
```
Then start the package in your environment using `uv`.
```sh
uv python install 3.14
uv sync
uv run redis-mcp-server --url redis://localhost:6379/0
```
However, it is usually most useful to delegate starting the MCP Server to the framework or tool where it is configured.
### From GitHub
You can configure the desired Redis MCP Server version with `uvx`, which allows you to run it directly from GitHub (from a branch, or use a tagged release).
> It is recommended to use a tagged release; the `main` branch is under active development and may contain breaking changes.
As an example, you can execute the following command to run the `0.2.0` release:
```commandline
uvx --from git+https://github.com/redis/mcp-redis.git@0.2.0 redis-mcp-server --url redis://localhost:6379/0
```
Check the release notes for the latest version in the [Releases](https://github.com/redis/mcp-redis/releases) section.
Additional examples are provided below.
```sh
# Run with Redis URI
uvx --from git+https://github.com/redis/mcp-redis.git redis-mcp-server --url redis://localhost:6379/0
# Run with Redis URI and SSL
uvx --from git+https://github.com/redis/mcp-redis.git redis-mcp-server --url "rediss://<USERNAME>:<PASSWORD>@<HOST>:<PORT>?ssl_cert_reqs=required&ssl_ca_certs=<PATH_TO_CERT>"
# Run with individual parameters
uvx --from git+https://github.com/redis/mcp-redis.git redis-mcp-server --host localhost --port 6379 --password mypassword
# See all options
uvx --from git+https://github.com/redis/mcp-redis.git redis-mcp-server --help
```
### Development Installation
For development or if you prefer to clone the repository:
```sh
# Clone the repository
git clone https://github.com/redis/mcp-redis.git
cd mcp-redis
# Install dependencies using uv
uv venv
source .venv/bin/activate
uv sync
# Run with CLI interface
uv run redis-mcp-server --help
# Or run the main file directly (uses environment variables)
uv run src/main.py
```
Once you have cloned the repository, installed the dependencies, and verified that the server runs, you can configure Claude Desktop or any other MCP client to start this MCP Server by running the main file directly (it uses environment variables). This is usually preferred for development.
The following example is for Claude Desktop, but the same applies to any other MCP Client.
1. Specify your Redis credentials and TLS configuration
2. Retrieve your `uv` command full path (e.g. `which uv`)
3. Edit the `claude_desktop_config.json` configuration file
   - on macOS, at `~/Library/Application\ Support/Claude/`
```json
{
  "mcpServers": {
    "redis": {
      "command": "<full_path_uv_command>",
      "args": [
        "--directory",
        "<your_mcp_server_directory>",
        "run",
        "src/main.py"
      ],
      "env": {
        "REDIS_HOST": "<your_redis_database_hostname>",
        "REDIS_PORT": "<your_redis_database_port>",
        "REDIS_PWD": "<your_redis_database_password>",
        "REDIS_SSL": "<True|False>",
        "REDIS_SSL_CA_PATH": "<your_redis_ca_path>",
        "REDIS_CLUSTER_MODE": "<True|False>"
      }
    }
  }
}
```
You can troubleshoot problems by tailing the log file.
```commandline
tail -f ~/Library/Logs/Claude/mcp-server-redis.log
```
### With Docker
You can use a dockerized deployment of this server. You can either build your own image or use the official [Redis MCP Docker](https://hub.docker.com/r/mcp/redis) image.
If you'd like to build your own image, the Redis MCP Server provides a Dockerfile. Build this server's image with:
```commandline
docker build -t mcp-redis .
```
Finally, configure the client to create the container at start-up. An example for Claude Desktop is provided below. Edit the `claude_desktop_config.json` and add:
```json
{
  "mcpServers": {
    "redis": {
      "command": "docker",
      "args": [
        "run",
        "--rm",
        "--name",
        "redis-mcp-server",
        "-i",
        "-e", "REDIS_HOST=<redis_hostname>",
        "-e", "REDIS_PORT=<redis_port>",
        "-e", "REDIS_USERNAME=<redis_username>",
        "-e", "REDIS_PWD=<redis_password>",
        "mcp-redis"
      ]
    }
  }
}
```
To use the official [Redis MCP Docker](https://hub.docker.com/r/mcp/redis) image, just replace your image name (`mcp-redis` in the example above) with `mcp/redis`.
## Configuration
The Redis MCP Server can be configured in two ways: via command line arguments or via environment variables.
The precedence is: command line arguments > environment variables > default values.
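The precedence rule can be sketched in a few lines of Python. This is an illustration only, not the server's actual configuration code: `resolve_settings`, `DEFAULTS`, and `ENV_NAMES` are hypothetical names, only four settings are covered, and the set of strings treated as true for boolean variables is an assumption.

```python
import os

# Defaults and variable names as documented in this README (subset only).
DEFAULTS = {"host": "127.0.0.1", "port": 6379, "db": 0, "ssl": False}
ENV_NAMES = {"host": "REDIS_HOST", "port": "REDIS_PORT", "db": "REDIS_DB", "ssl": "REDIS_SSL"}


def _coerce(key, raw):
    """Convert an environment string to the type of the matching default."""
    # bool must be checked before int because bool is a subclass of int.
    if isinstance(DEFAULTS[key], bool):
        return raw.strip().lower() in ("true", "1", "t")  # assumed truthy spellings
    if isinstance(DEFAULTS[key], int):
        return int(raw)
    return raw


def resolve_settings(cli_args, environ=None):
    """Apply: command line arguments > environment variables > default values."""
    environ = os.environ if environ is None else environ
    settings = {}
    for key, default in DEFAULTS.items():
        if cli_args.get(key) is not None:
            settings[key] = cli_args[key]
        elif ENV_NAMES[key] in environ:
            settings[key] = _coerce(key, environ[ENV_NAMES[key]])
        else:
            settings[key] = default
    return settings


env = {"REDIS_HOST": "redis.internal", "REDIS_PORT": "6380"}
print(resolve_settings({"port": 7000}, environ=env))
```

In the usage line, the port comes from the command line, the host from the environment, and the remaining values from the defaults.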
### Redis ACL
You can configure Redis ACL to restrict the access to the Redis database. For example, to create a read-only user:
```
127.0.0.1:6379> ACL SETUSER readonlyuser on >mypassword ~* +@read -@write
```
Configure the user via command line arguments or environment variables.
### Configuration via command line arguments
When using the CLI interface, you can configure the server with command line arguments:
```sh
# Basic Redis connection
uvx --from redis-mcp-server@latest redis-mcp-server \
--host localhost \
--port 6379 \
--password mypassword
# Using Redis URI (simpler)
uvx --from redis-mcp-server@latest redis-mcp-server \
--url redis://user:pass@localhost:6379/0
# SSL connection
uvx --from redis-mcp-server@latest redis-mcp-server \
--url "rediss://user:pass@<HOST>:6379/0"
# See all available options
uvx --from redis-mcp-server@latest redis-mcp-server --help
```
**Available CLI Options:**
- `--url` - Redis connection URI (redis://user:pass@host:port/db)
- `--host` - Redis hostname (default: 127.0.0.1)
- `--port` - Redis port (default: 6379)
- `--db` - Redis database number (default: 0)
- `--username` - Redis username
- `--password` - Redis password
- `--ssl` - Enable SSL connection
- `--ssl-ca-path` - Path to CA certificate file
- `--ssl-keyfile` - Path to SSL key file
- `--ssl-certfile` - Path to SSL certificate file
- `--ssl-cert-reqs` - SSL certificate requirements (default: required)
- `--ssl-ca-certs` - Path to CA certificates file
- `--cluster-mode` - Enable Redis cluster mode
### Configuration via Environment Variables
If desired, you can use environment variables. Defaults are provided for all variables.
| Name | Description | Default Value |
|----------------------|-----------------------------------------------------------|---------------|
| `REDIS_HOST` | Redis IP or hostname | `"127.0.0.1"` |
| `REDIS_PORT` | Redis port | `6379` |
| `REDIS_DB` | Database | 0 |
| `REDIS_USERNAME` | Default database username | `"default"` |
| `REDIS_PWD` | Default database password | "" |
| `REDIS_SSL` | Enables or disables SSL/TLS | `False` |
| `REDIS_SSL_CA_PATH` | CA certificate for verifying server | None |
| `REDIS_SSL_KEYFILE` | Client's private key file for client authentication | None |
| `REDIS_SSL_CERTFILE` | Client's certificate file for client authentication | None |
| `REDIS_SSL_CERT_REQS`| Whether the client should verify the server's certificate | `"required"` |
| `REDIS_SSL_CA_CERTS` | Path to the trusted CA certificates file | None |
| `REDIS_CLUSTER_MODE` | Enable Redis Cluster mode | `False` |
### EntraID Authentication for Azure Managed Redis
The Redis MCP Server supports **EntraID (Azure Active Directory) authentication** for Azure Managed Redis, enabling OAuth-based authentication with automatic token management.
#### Authentication Providers
**Service Principal Authentication** - Application-based authentication using client credentials:
```bash
export REDIS_ENTRAID_AUTH_FLOW=service_principal
export REDIS_ENTRAID_CLIENT_ID=your-client-id
export REDIS_ENTRAID_CLIENT_SECRET=your-client-secret
export REDIS_ENTRAID_TENANT_ID=your-tenant-id
```
**Managed Identity Authentication** - For Azure-hosted applications:
```bash
# System-assigned managed identity
export REDIS_ENTRAID_AUTH_FLOW=managed_identity
export REDIS_ENTRAID_IDENTITY_TYPE=system_assigned
# User-assigned managed identity
export REDIS_ENTRAID_AUTH_FLOW=managed_identity
export REDIS_ENTRAID_IDENTITY_TYPE=user_assigned
export REDIS_ENTRAID_USER_ASSIGNED_CLIENT_ID=your-identity-client-id
```
**Default Azure Credential** - Automatic credential discovery (recommended for development):
```bash
export REDIS_ENTRAID_AUTH_FLOW=default_credential
export REDIS_ENTRAID_SCOPES=https://redis.azure.com/.default
```
#### EntraID Configuration Variables
| Name | Description | Default Value |
|-----------------------------------------|-----------------------------------------------------------|--------------------------------------|
| `REDIS_ENTRAID_AUTH_FLOW` | Authentication flow type | None (EntraID disabled) |
| `REDIS_ENTRAID_CLIENT_ID` | Service Principal client ID | None |
| `REDIS_ENTRAID_CLIENT_SECRET` | Service Principal client secret | None |
| `REDIS_ENTRAID_TENANT_ID` | Azure tenant ID | None |
| `REDIS_ENTRAID_IDENTITY_TYPE` | Managed identity type | `"system_assigned"` |
| `REDIS_ENTRAID_USER_ASSIGNED_CLIENT_ID` | User-assigned managed identity client ID | None |
| `REDIS_ENTRAID_SCOPES` | OAuth scopes for Default Azure Credential | `"https://redis.azure.com/.default"` |
| `REDIS_ENTRAID_RESOURCE` | Azure Redis resource identifier | `"https://redis.azure.com/"` |
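
As a rough guide to which variables each authentication flow needs, the sketch below reports missing settings for a given environment. The required sets are inferred from the `export` examples in this section, and `missing_entraid_settings` is a hypothetical helper; the server's real validation may differ.

```python
# Variables each flow appears to need, inferred from the examples above.
REQUIRED_BY_FLOW = {
    "service_principal": [
        "REDIS_ENTRAID_CLIENT_ID",
        "REDIS_ENTRAID_CLIENT_SECRET",
        "REDIS_ENTRAID_TENANT_ID",
    ],
    "managed_identity": [],
    "default_credential": [],
}


def missing_entraid_settings(environ):
    """Return the names of variables that are required but unset or empty."""
    flow = environ.get("REDIS_ENTRAID_AUTH_FLOW")
    if not flow:
        return []  # EntraID is disabled when no flow is configured
    if flow not in REQUIRED_BY_FLOW:
        raise ValueError(f"unknown auth flow: {flow!r}")
    required = list(REQUIRED_BY_FLOW[flow])
    # A user-assigned managed identity additionally needs its client ID.
    identity_type = environ.get("REDIS_ENTRAID_IDENTITY_TYPE", "system_assigned")
    if flow == "managed_identity" and identity_type == "user_assigned":
        required.append("REDIS_ENTRAID_USER_ASSIGNED_CLIENT_ID")
    return [name for name in required if not environ.get(name)]


print(missing_entraid_settings({"REDIS_ENTRAID_AUTH_FLOW": "service_principal"}))
```

Passing `os.environ` instead of a literal dictionary checks the current shell environment.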
#### Key Features
- **Automatic token renewal** - Background token refresh with no manual intervention
- **Graceful fallback** - Falls back to standard Redis authentication when EntraID not configured
- **Multiple auth flows** - Supports Service Principal, Managed Identity, and Default Azure Credential
- **Enterprise ready** - Designed for Azure Managed Redis with centralized identity management
#### Example Configuration
For **local development** with Azure CLI:
```bash
# Login with Azure CLI
az login
# Configure MCP server
export REDIS_ENTRAID_AUTH_FLOW=default_credential
export REDIS_URL=redis://your-azure-redis.redis.cache.windows.net:6379
```
For **production** with Service Principal:
```bash
export REDIS_ENTRAID_AUTH_FLOW=service_principal
export REDIS_ENTRAID_CLIENT_ID=your-app-client-id
export REDIS_ENTRAID_CLIENT_SECRET=your-app-secret
export REDIS_ENTRAID_TENANT_ID=your-tenant-id
export REDIS_URL=redis://your-azure-redis.redis.cache.windows.net:6379
```
For **Azure-hosted applications** with Managed Identity:
```bash
export REDIS_ENTRAID_AUTH_FLOW=managed_identity
export REDIS_ENTRAID_IDENTITY_TYPE=system_assigned
export REDIS_URL=redis://your-azure-redis.redis.cache.windows.net:6379
```
There are several ways to set environment variables:
1. **Using a `.env` File**:
Place a `.env` file in your project directory with key-value pairs for each environment variable. Tools like `python-dotenv`, `pipenv`, and `uv` can automatically load these variables when running your application. This is a convenient and secure way to manage configuration, as it keeps sensitive data out of your shell history and version control (if `.env` is in `.gitignore`).
For example, create a `.env` file with the following content from the `.env.example` file provided in the repository:
```bash
cp .env.example .env
```
Then edit the `.env` file to set your Redis configuration.
OR,
2. **Setting Variables in the Shell**:
You can export environment variables directly in your shell before running your application. For example:
```sh
export REDIS_HOST=your_redis_host
export REDIS_PORT=6379
# Other variables will be set similarly...
```
This method is useful for temporary overrides or quick testing.
### Logging
The server uses Python's standard logging and is configured at startup. By default it logs at WARNING and above. You can change verbosity with the `MCP_REDIS_LOG_LEVEL` environment variable.
- Accepted values (case-insensitive): `DEBUG`, `INFO`, `WARNING`, `ERROR`, `CRITICAL`, `NOTSET`
- Aliases supported: `WARN` → `WARNING`, `FATAL` → `CRITICAL`
- Numeric values are also accepted, including signed (e.g., `"10"`, `"+20"`)
- Default when unset or unrecognized: `WARNING`
Handler behavior
- If the host (e.g., `uv`, VS Code, pytest) already installed console handlers, the server will NOT add its own; it only lowers overly-restrictive handler thresholds so your chosen level is not filtered out. It will never raise a handler's threshold.
- If no handlers are present, the server adds a single stderr StreamHandler with a simple format.
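
The level rules and handler behavior described above can be approximated with the standard `logging` module. This is a sketch under the documented rules, not the code in `src/common/logging_utils.py`; `resolve_log_level` and `lower_restrictive_handlers` are hypothetical names.

```python
import logging

ALIASES = {"WARN": "WARNING", "FATAL": "CRITICAL"}
NAMES = {"DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL", "NOTSET"}


def resolve_log_level(raw):
    """Map an MCP_REDIS_LOG_LEVEL value to a numeric logging level."""
    if raw is None:
        return logging.WARNING
    text = raw.strip()
    # Numeric values, optionally signed, e.g. "10" or "+20".
    try:
        return int(text)
    except ValueError:
        pass
    name = ALIASES.get(text.upper(), text.upper())
    if name in NAMES:
        return getattr(logging, name)
    return logging.WARNING  # unrecognized values fall back to the default


def lower_restrictive_handlers(logger, level):
    """Lower handler thresholds that would filter out `level`; never raise one."""
    for handler in logger.handlers:
        if handler.level > level:
            handler.setLevel(level)


print(resolve_log_level("warn"), resolve_log_level("+20"))
```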
Examples
```bash
# See normal lifecycle messages
MCP_REDIS_LOG_LEVEL=INFO uv run src/main.py
# Very verbose for debugging
MCP_REDIS_LOG_LEVEL=DEBUG uvx --from redis-mcp-server@latest redis-mcp-server --url redis://localhost:6379/0
```
In MCP client configs that support env, add it alongside your Redis settings. For example:
```json
{
  "mcpServers": {
    "redis": {
      "command": "uvx",
      "args": ["--from", "redis-mcp-server@latest", "redis-mcp-server", "--url", "redis://localhost:6379/0"],
      "env": {
        "REDIS_HOST": "localhost",
        "REDIS_PORT": "6379",
        "MCP_REDIS_LOG_LEVEL": "INFO"
      }
    }
  }
}
```
## Integrations
The following sections describe how to integrate this MCP Server with development frameworks such as the OpenAI Agents SDK and with tools such as Claude Desktop, VS Code, and Augment.
### OpenAI Agents SDK
Integrate this MCP Server with the OpenAI Agents SDK. Read the [documentation](https://openai.github.io/openai-agents-python/mcp/) to learn more about how the SDK integrates with MCP.
Install the Python SDK.
```commandline
pip install openai-agents
```
Configure the OpenAI token:
```commandline
export OPENAI_API_KEY="<openai_token>"
```
And run the [application](./examples/redis_assistant.py).
```commandline
python3.14 redis_assistant.py
```
You can troubleshoot your agent workflows using the [OpenAI dashboard](https://platform.openai.com/traces/).
### Augment
The preferred way of configuring the Redis MCP Server in Augment is to use the [Easy MCP](https://docs.augmentcode.com/setup-augment/mcp#redis) feature.
You can also configure the Redis MCP Server in Augment manually by importing the server via JSON:
```json
{
  "mcpServers": {
    "Redis MCP Server": {
      "command": "uvx",
      "args": [
        "--from",
        "redis-mcp-server@latest",
        "redis-mcp-server",
        "--url",
        "redis://localhost:6379/0"
      ]
    }
  }
}
```
### Claude Desktop
The simplest way to configure MCP clients is using `uvx`. Add the following JSON to your `claude_desktop_config.json`, and remember to provide the full path to `uvx`.
**Basic Redis connection:**
```json
{
  "mcpServers": {
    "redis-mcp-server": {
      "type": "stdio",
      "command": "/Users/mortensi/.local/bin/uvx",
      "args": [
        "--from", "redis-mcp-server@latest",
        "redis-mcp-server",
        "--url", "redis://localhost:6379/0"
      ]
    }
  }
}
```
**Azure Managed Redis with EntraID authentication:**
```json
{
  "mcpServers": {
    "redis-mcp-server": {
      "type": "stdio",
      "command": "/Users/mortensi/.local/bin/uvx",
      "args": [
        "--from", "redis-mcp-server@latest",
        "redis-mcp-server",
        "--url", "redis://your-azure-redis.redis.cache.windows.net:6379"
      ],
      "env": {
        "REDIS_ENTRAID_AUTH_FLOW": "default_credential",
        "REDIS_ENTRAID_SCOPES": "https://redis.azure.com/.default"
      }
    }
  }
}
```
### VS Code with GitHub Copilot
To use the Redis MCP Server with VS Code, you must enable the [agent mode](https://code.visualstudio.com/docs/copilot/chat/chat-agent-mode) tools. Add the following to your `settings.json`:
```json
{
  "chat.agent.enabled": true
}
```
You can start the desired version of the Redis MCP Server using `uvx` by adding the following JSON to your `mcp.json` file:
```json
"servers": {
  "redis": {
    "type": "stdio",
    "command": "uvx",
    "args": [
      "--from", "redis-mcp-server@latest",
      "redis-mcp-server",
      "--url", "redis://localhost:6379/0"
    ]
  },
}
```
#### Suppressing uvx Installation Messages
If you want to suppress uvx installation messages that may appear as warnings in MCP client logs, use the `-qq` flag:
```json
"servers": {
  "redis": {
    "type": "stdio",
    "command": "uvx",
    "args": [
      "-qq",
      "--from", "redis-mcp-server@latest",
      "redis-mcp-server",
      "--url", "redis://localhost:6379/0"
    ]
  },
}
```
The `-qq` flag enables silent mode, which suppresses "Installed X packages" messages that uvx writes to stderr during package installation.
Alternatively, you can start the server using `uv` and configure your `mcp.json`. This is usually desired for development.
```json
// mcp.json
{
  "servers": {
    "redis": {
      "type": "stdio",
      "command": "<full_path_uv_command>",
      "args": [
        "--directory",
        "<your_mcp_server_directory>",
        "run",
        "src/main.py"
      ],
      "env": {
        "REDIS_HOST": "<your_redis_database_hostname>",
        "REDIS_PORT": "<your_redis_database_port>",
        "REDIS_USERNAME": "<your_redis_database_username>",
        "REDIS_PWD": "<your_redis_database_password>",
      }
    }
  }
}
```
For more information, see the [VS Code documentation](https://code.visualstudio.com/docs/copilot/chat/mcp-servers).
> **Tip:** You can prompt Copilot chat to use the Redis MCP tools by including `#redis` in your message.
> **Note:** Starting with [VS Code v1.102](https://code.visualstudio.com/updates/v1_102),
> MCP servers are now stored in a dedicated `mcp.json` file instead of `settings.json`.
## Testing
You can use the [MCP Inspector](https://modelcontextprotocol.io/docs/tools/inspector) for visual debugging of this MCP Server.
```sh
npx @modelcontextprotocol/inspector uv run src/main.py
```
## Example Use Cases
- **AI Assistants**: Enable LLMs to fetch, store, and process data in Redis.
- **Chatbots & Virtual Agents**: Retrieve session data, manage queues, and personalize responses.
- **Data Search & Analytics**: Query Redis for **real-time insights and fast lookups**.
- **Event Processing**: Manage event streams with **Redis Streams**.
## Contributing
1. Fork the repo
2. Create a new branch (`feature-branch`)
3. Commit your changes
4. Push to your branch and submit a PR!
## License
This project is licensed under the **MIT License**.
## Badges
<a href="https://glama.ai/mcp/servers/@redis/mcp-redis">
<img width="380" height="200" src="https://glama.ai/mcp/servers/@redis/mcp-redis/badge" alt="Redis Server MCP server" />
</a>
## Contact
For questions or support, reach out via [GitHub Issues](https://github.com/redis/mcp-redis/issues).
Alternatively, you can join the [Redis Discord server](https://discord.gg/redis) and ask in the `#redis-mcp-server` channel.
```
--------------------------------------------------------------------------------
/src/__init__.py:
--------------------------------------------------------------------------------
```python
```
--------------------------------------------------------------------------------
/src/common/__init__.py:
--------------------------------------------------------------------------------
```python
```
--------------------------------------------------------------------------------
/src/tools/__init__.py:
--------------------------------------------------------------------------------
```python
```
--------------------------------------------------------------------------------
/src/version.py:
--------------------------------------------------------------------------------
```python
__version__ = "0.3.5"
```
--------------------------------------------------------------------------------
/tests/tools/__init__.py:
--------------------------------------------------------------------------------
```python
# Tests for tools package
```
--------------------------------------------------------------------------------
/tests/__init__.py:
--------------------------------------------------------------------------------
```python
# Tests package for Redis MCP Server
```
--------------------------------------------------------------------------------
/Dockerfile:
--------------------------------------------------------------------------------
```dockerfile
FROM python:3.14-slim
RUN pip install --upgrade uv
WORKDIR /app
COPY . /app
RUN --mount=type=cache,target=/root/.cache/uv \
uv sync --locked
CMD ["uv", "run", "python", "src/main.py"]
```
--------------------------------------------------------------------------------
/gemini-extension.json:
--------------------------------------------------------------------------------
```json
{
  "name": "redis",
  "version": "0.1.0",
  "description": "Manage and search data in Redis efficiently within Gemini CLI.",
  "mcpServers": {
    "redis": {
      "command": "uvx",
      "args": [
        "--from",
        "redis-mcp-server@latest",
        "redis-mcp-server",
        "--url", "${REDIS_URL}"
      ]
    }
  }
}
```
--------------------------------------------------------------------------------
/src/common/server.py:
--------------------------------------------------------------------------------
```python
import importlib
import pkgutil

from mcp.server.fastmcp import FastMCP


def load_tools():
    import src.tools as tools_pkg

    for _, module_name, _ in pkgutil.iter_modules(tools_pkg.__path__):
        importlib.import_module(f"src.tools.{module_name}")


# Initialize FastMCP server
mcp = FastMCP("Redis MCP Server", dependencies=["redis", "dotenv", "numpy"])

# Load tools
load_tools()
```
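The discovery pattern in `load_tools` (import every module in a package so that import-time decorators register their functions) can be tried in isolation. The sketch below is a self-contained illustration, not part of the repository: `REGISTRY`, `tool`, `load_plugins`, and the temporary `demo_tools` package are all hypothetical stand-ins, and `tool` only mimics the role of a registration decorator such as `@mcp.tool()`.

```python
import importlib
import pkgutil
import sys
import tempfile
import types
from pathlib import Path

REGISTRY = {}


def tool(func):
    """Stand-in registration decorator: record the function by name."""
    REGISTRY[func.__name__] = func
    return func


# Expose the decorator under an importable name so plugin modules can use it.
registry_module = types.ModuleType("demo_registry")
registry_module.tool = tool
sys.modules["demo_registry"] = registry_module


def load_plugins(package_name):
    """Import every module in a package so import-time decorators run."""
    package = importlib.import_module(package_name)
    for _, module_name, _ in pkgutil.iter_modules(package.__path__):
        importlib.import_module(f"{package_name}.{module_name}")


with tempfile.TemporaryDirectory() as tmp:
    # Build a throwaway package containing one decorated function.
    pkg = Path(tmp) / "demo_tools"
    pkg.mkdir()
    (pkg / "__init__.py").write_text("")
    (pkg / "greet.py").write_text(
        "from demo_registry import tool\n\n"
        "@tool\n"
        "def hello():\n"
        "    return 'hello'\n"
    )
    sys.path.insert(0, tmp)
    importlib.invalidate_caches()
    try:
        load_plugins("demo_tools")
    finally:
        sys.path.remove(tmp)

print(sorted(REGISTRY))
```

Adding a new module to the package is enough to register its functions; no central list of tools has to be maintained.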
--------------------------------------------------------------------------------
/fly.toml:
--------------------------------------------------------------------------------
```toml
# fly.toml app configuration file generated for mcp-redis-7s on 2025-07-24T07:40:43Z
#
# See https://fly.io/docs/reference/configuration/ for information about how to use this file.
#
app = 'mcp-redis-7s'
primary_region = 'sin'
[build]
[http_service]
internal_port = 8080
force_https = true
auto_stop_machines = 'stop'
auto_start_machines = true
min_machines_running = 0
processes = ['app']
[[vm]]
memory = '1gb'
cpu_kind = 'shared'
cpus = 1
memory_mb = 1024
```
--------------------------------------------------------------------------------
/.github/dependabot.yml:
--------------------------------------------------------------------------------
```yaml
version: 2
updates:
  # Enable version updates for Python dependencies
  - package-ecosystem: "pip"
    directory: "/"
    schedule:
      interval: "weekly"
      day: "monday"
      time: "09:00"
    open-pull-requests-limit: 10
    commit-message:
      prefix: "deps"
      include: "scope"
    labels:
      - "dependencies"
  # Enable version updates for GitHub Actions
  - package-ecosystem: "github-actions"
    directory: "/"
    schedule:
      interval: "weekly"
      day: "monday"
      time: "09:00"
    open-pull-requests-limit: 5
    commit-message:
      prefix: "ci"
      include: "scope"
    labels:
      - "dependencies"
  # Enable version updates for Docker
  - package-ecosystem: "docker"
    directory: "/"
    schedule:
      interval: "weekly"
      day: "monday"
      time: "09:00"
    open-pull-requests-limit: 5
    commit-message:
      prefix: "docker"
      include: "scope"
    labels:
      - "dependencies"
```
--------------------------------------------------------------------------------
/src/tools/server_management.py:
--------------------------------------------------------------------------------
```python
from redis.exceptions import RedisError

from src.common.connection import RedisConnectionManager
from src.common.server import mcp


@mcp.tool()
async def dbsize() -> int:
    """Get the number of keys stored in the Redis database"""
    try:
        r = RedisConnectionManager.get_connection()
        return r.dbsize()
    except RedisError as e:
        return f"Error getting database size: {str(e)}"


@mcp.tool()
async def info(section: str = "default") -> dict:
    """Get Redis server information and statistics.

    Args:
        section: The section of the info command (default, memory, cpu, etc.).

    Returns:
        A dictionary of server information or an error message.
    """
    try:
        r = RedisConnectionManager.get_connection()
        info = r.info(section)
        return info
    except RedisError as e:
        return f"Error retrieving Redis info: {str(e)}"


@mcp.tool()
async def client_list() -> list:
    """Get a list of connected clients to the Redis server."""
    try:
        r = RedisConnectionManager.get_connection()
        clients = r.client_list()
        return clients
    except RedisError as e:
        return f"Error retrieving client list: {str(e)}"
```
--------------------------------------------------------------------------------
/src/tools/pub_sub.py:
--------------------------------------------------------------------------------
```python
from redis.exceptions import RedisError

from src.common.connection import RedisConnectionManager
from src.common.server import mcp


@mcp.tool()
async def publish(channel: str, message: str) -> str:
    """Publish a message to a Redis channel.

    Args:
        channel: The Redis channel to publish to.
        message: The message to send.

    Returns:
        A success message or an error message.
    """
    try:
        r = RedisConnectionManager.get_connection()
        r.publish(channel, message)
        return f"Message published to channel '{channel}'."
    except RedisError as e:
        return f"Error publishing message to channel '{channel}': {str(e)}"


@mcp.tool()
async def subscribe(channel: str) -> str:
    """Subscribe to a Redis channel.

    Args:
        channel: The Redis channel to subscribe to.

    Returns:
        A success message or an error message.
    """
    try:
        r = RedisConnectionManager.get_connection()
        # Note: this PubSub object is local to the call; no messages are read here.
        pubsub = r.pubsub()
        pubsub.subscribe(channel)
        return f"Subscribed to channel '{channel}'."
    except RedisError as e:
        return f"Error subscribing to channel '{channel}': {str(e)}"


@mcp.tool()
async def unsubscribe(channel: str) -> str:
    """Unsubscribe from a Redis channel.

    Args:
        channel: The Redis channel to unsubscribe from.

    Returns:
        A success message or an error message.
    """
    try:
        r = RedisConnectionManager.get_connection()
        # Note: a fresh PubSub object is created here, not the one used by subscribe().
        pubsub = r.pubsub()
        pubsub.unsubscribe(channel)
        return f"Unsubscribed from channel '{channel}'."
    except RedisError as e:
        return f"Error unsubscribing from channel '{channel}': {str(e)}"
```
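The `subscribe` tool above creates a `PubSub` object, subscribes, and returns without reading from it. As a rough sketch of what reading could look like, the hypothetical helper below polls a PubSub-like object through `get_message(ignore_subscribe_messages=..., timeout=...)`, which is the polling call redis-py exposes on `PubSub`. The names `drain_messages` and `FakePubSub` are assumptions made for illustration and do not exist in the repository; the fake lets the sketch run without a Redis server.

```python
def drain_messages(pubsub, limit=10, timeout=0.1):
    """Collect up to `limit` payloads from a PubSub-like object (hypothetical helper)."""
    payloads = []
    while len(payloads) < limit:
        msg = pubsub.get_message(ignore_subscribe_messages=True, timeout=timeout)
        if msg is None:
            # Nothing was delivered on this poll. A long-lived consumer would
            # keep polling instead of stopping here.
            break
        payloads.append(msg["data"])
    return payloads


class FakePubSub:
    """Test double that replays queued messages (not part of redis-py)."""

    def __init__(self, messages):
        self._messages = list(messages)

    def get_message(self, ignore_subscribe_messages=False, timeout=0.0):
        return self._messages.pop(0) if self._messages else None


fake = FakePubSub(
    [
        {"type": "message", "channel": b"news", "data": b"a"},
        {"type": "message", "channel": b"news", "data": b"b"},
    ]
)
print(drain_messages(fake))
```

With a real connection, the same `PubSub` object would have to stay alive between the subscribe call and the polling loop.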
--------------------------------------------------------------------------------
/src/tools/string.py:
--------------------------------------------------------------------------------
```python
import json
from typing import Union, Optional

from redis.exceptions import RedisError
from redis import Redis

from src.common.connection import RedisConnectionManager
from src.common.server import mcp


@mcp.tool()
async def set(
    key: str,
    value: Union[str, bytes, int, float, dict],
    expiration: Optional[int] = None,
) -> str:
    """Set a Redis string value with an optional expiration time.

    Args:
        key (str): The key to set.
        value (str, bytes, int, float, dict): The value to store.
        expiration (int, optional): Expiration time in seconds.

    Returns:
        str: Confirmation message or an error message.
    """
    if isinstance(value, bytes):
        encoded_value = value
    elif isinstance(value, dict):
        encoded_value = json.dumps(value)
    else:
        encoded_value = str(value)

    if isinstance(encoded_value, str):
        encoded_value = encoded_value.encode("utf-8")

    try:
        r: Redis = RedisConnectionManager.get_connection()
        if expiration:
            r.setex(key, expiration, encoded_value)
        else:
            r.set(key, encoded_value)

        return f"Successfully set {key}" + (
            f" with expiration {expiration} seconds" if expiration else ""
        )
    except RedisError as e:
        return f"Error setting key {key}: {str(e)}"


@mcp.tool()
async def get(key: str) -> Union[str, bytes]:
    """Get a Redis string value.

    Args:
        key (str): The key to retrieve.

    Returns:
        str, bytes: The stored value or an error message.
    """
    try:
        r: Redis = RedisConnectionManager.get_connection()
        value = r.get(key)

        if value is None:
            return f"Key {key} does not exist"

        if isinstance(value, bytes):
            try:
                text = value.decode("utf-8")
                return text
            except UnicodeDecodeError:
                return value

        return value
    except RedisError as e:
        return f"Error retrieving key {key}: {str(e)}"
```
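The `set` tool above normalizes its input to bytes before writing. A minimal standalone sketch of that normalization follows, pulled out as a hypothetical helper named `encode_value` (an assumption for illustration, not a function in the repository) so the branches can be exercised without a Redis connection.

```python
import json
from typing import Union


def encode_value(value: Union[str, bytes, int, float, dict]) -> bytes:
    """Mirror the encoding branches of the `set` tool (hypothetical helper)."""
    if isinstance(value, bytes):
        return value  # bytes pass through untouched
    if isinstance(value, dict):
        return json.dumps(value).encode("utf-8")  # dicts become JSON text
    return str(value).encode("utf-8")  # str, int and float are stringified


print(encode_value({"a": 1}))
print(encode_value(3.5))
```

Because the tool checks `if expiration:`, an `expiration` of `0` takes the same path as `None` and the key is written without a TTL.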
--------------------------------------------------------------------------------
/src/common/logging_utils.py:
--------------------------------------------------------------------------------
```python
import logging
import os
import sys


def resolve_log_level() -> int:
    """Resolve desired log level from MCP_REDIS_LOG_LEVEL.

    Accepts numeric strings or standard level names (DEBUG, INFO, WARNING,
    ERROR, CRITICAL, NOTSET) including aliases WARN and FATAL. Defaults to WARNING.
    """
    name = os.getenv("MCP_REDIS_LOG_LEVEL")
    if name:
        s = name.strip()
        try:
            return int(s)
        except ValueError:
            pass
        level = getattr(logging, s.upper(), None)
        if isinstance(level, int):
            return level
    return logging.WARNING


def configure_logging() -> int:
    """Configure logging based on environment.

    - Default level WARNING
    - MCP_REDIS_LOG_LEVEL to override

    Returns the resolved log level. Idempotent.
    """
    level = resolve_log_level()
    root = logging.getLogger()

    # Always set the root logger level
    root.setLevel(level)

    # Only lower overly-restrictive handler thresholds to avoid host filtering.
    # - Leave NOTSET (0) alone so it defers to logger/root levels
    # - Do not raise handler thresholds (respect host-configured verbosity)
    for h in root.handlers:
        try:
            cur = getattr(h, "level", None)
            if isinstance(cur, int) and cur != logging.NOTSET and cur > level:
                h.setLevel(level)
        except Exception:
            # Log at DEBUG to avoid noisy stderr while still providing diagnostics.
            logging.getLogger(__name__).debug(
                "Failed to adjust handler level for handler %r", h, exc_info=True
            )

    # Only add our own stderr handler if there are NO handlers at all.
    # Many hosts (pytest, uv, VS Code) install a console handler already.
    if not root.handlers:
        sh = logging.StreamHandler(sys.stderr)
        sh.setLevel(level)
        sh.setFormatter(logging.Formatter("[%(levelname)s] %(message)s"))
        root.addHandler(sh)

    # Route warnings.warn(...) through logging
    logging.captureWarnings(True)

    return level
```
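The resolution order described in the `resolve_log_level` docstring (numeric string first, then level name, then the WARNING default) can be tried out without touching the environment. The sketch below uses a hypothetical function `parse_level` that takes the raw string as an argument; the name and signature are assumptions made for illustration.

```python
import logging


def parse_level(raw, default=logging.WARNING):
    """Resolve a level from a numeric string or a standard level name."""
    if not raw:
        return default
    s = raw.strip()
    try:
        return int(s)  # numeric strings such as "10" or "+20"
    except ValueError:
        pass
    level = getattr(logging, s.upper(), None)  # e.g. "info" -> logging.INFO
    return level if isinstance(level, int) else default


for raw in ("info", "WARN", "fatal", "+20", "bogus", None):
    print(repr(raw), "->", parse_level(raw))
```

The aliases work because the `logging` module itself defines `WARN` and `FATAL` as integer constants, so the `getattr` lookup covers them without a separate table.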
--------------------------------------------------------------------------------
/src/tools/set.py:
--------------------------------------------------------------------------------
```python
from typing import Union, List, Optional

from redis.exceptions import RedisError

from src.common.connection import RedisConnectionManager
from src.common.server import mcp


@mcp.tool()
async def sadd(name: str, value: str, expire_seconds: Optional[int] = None) -> str:
    """Add a value to a Redis set with an optional expiration time.

    Args:
        name: The Redis set key.
        value: The value to add to the set.
        expire_seconds: Optional; time in seconds after which the set should expire.

    Returns:
        A success message or an error message.
    """
    try:
        r = RedisConnectionManager.get_connection()
        r.sadd(name, value)

        if expire_seconds is not None:
            r.expire(name, expire_seconds)

        return f"Value '{value}' added successfully to set '{name}'." + (
            f" Expires in {expire_seconds} seconds." if expire_seconds else ""
        )
    except RedisError as e:
        return f"Error adding value '{value}' to set '{name}': {str(e)}"


@mcp.tool()
async def srem(name: str, value: str) -> str:
    """Remove a value from a Redis set.

    Args:
        name: The Redis set key.
        value: The value to remove from the set.

    Returns:
        A success message or an error message.
    """
    try:
        r = RedisConnectionManager.get_connection()
        removed = r.srem(name, value)
        return (
            f"Value '{value}' removed from set '{name}'."
            if removed
            else f"Value '{value}' not found in set '{name}'."
        )
    except RedisError as e:
        return f"Error removing value '{value}' from set '{name}': {str(e)}"


@mcp.tool()
async def smembers(name: str) -> Union[str, List[str]]:
    """Get all members of a Redis set.

    Args:
        name: The Redis set key.

    Returns:
        A list of values in the set or an error message.
    """
    try:
        r = RedisConnectionManager.get_connection()
        members = r.smembers(name)
        return list(members) if members else f"Set '{name}' is empty or does not exist."
    except RedisError as e:
        return f"Error retrieving members of set '{name}': {str(e)}"
```
--------------------------------------------------------------------------------
/src/tools/stream.py:
--------------------------------------------------------------------------------
```python
from typing import Dict, Any, Optional

from redis.exceptions import RedisError

from src.common.connection import RedisConnectionManager
from src.common.server import mcp


@mcp.tool()
async def xadd(
    key: str, fields: Dict[str, Any], expiration: Optional[int] = None
) -> str:
    """Add an entry to a Redis stream with an optional expiration time.

    Args:
        key (str): The stream key.
        fields (dict): The fields and values for the stream entry.
        expiration (int, optional): Expiration time in seconds.

    Returns:
        str: The ID of the added entry or an error message.
    """
    try:
        r = RedisConnectionManager.get_connection()
        entry_id = r.xadd(key, fields)
        if expiration:
            r.expire(key, expiration)
        return f"Successfully added entry {entry_id} to {key}" + (
            f" with expiration {expiration} seconds" if expiration else ""
        )
    except RedisError as e:
        return f"Error adding to stream {key}: {str(e)}"


@mcp.tool()
async def xrange(key: str, count: int = 1) -> str:
    """Read entries from a Redis stream.

    Args:
        key (str): The stream key.
        count (int, optional): Number of entries to retrieve.

    Returns:
        str: The retrieved stream entries or an error message.
    """
    try:
        r = RedisConnectionManager.get_connection()
        entries = r.xrange(key, count=count)
        return str(entries) if entries else f"Stream {key} is empty or does not exist"
    except RedisError as e:
        return f"Error reading from stream {key}: {str(e)}"


@mcp.tool()
async def xdel(key: str, entry_id: str) -> str:
    """Delete an entry from a Redis stream.

    Args:
        key (str): The stream key.
        entry_id (str): The ID of the entry to delete.

    Returns:
        str: Confirmation message or an error message.
    """
    try:
        r = RedisConnectionManager.get_connection()
        result = r.xdel(key, entry_id)
        return (
            f"Successfully deleted entry {entry_id} from {key}"
            if result
            else f"Entry {entry_id} not found in {key}"
        )
    except RedisError as e:
        return f"Error deleting from stream {key}: {str(e)}"
```
--------------------------------------------------------------------------------
/src/tools/sorted_set.py:
--------------------------------------------------------------------------------
```python
from typing import Optional

from redis.exceptions import RedisError

from src.common.connection import RedisConnectionManager
from src.common.server import mcp


@mcp.tool()
async def zadd(
    key: str, score: float, member: str, expiration: Optional[int] = None
) -> str:
    """Add a member to a Redis sorted set with an optional expiration time.

    Args:
        key (str): The sorted set key.
        score (float): The score of the member.
        member (str): The member to add.
        expiration (int, optional): Expiration time in seconds.

    Returns:
        str: Confirmation message or an error message.
    """
    try:
        r = RedisConnectionManager.get_connection()
        r.zadd(key, {member: score})
        if expiration:
            r.expire(key, expiration)
        return f"Successfully added {member} to {key} with score {score}" + (
            f" and expiration {expiration} seconds" if expiration else ""
        )
    except RedisError as e:
        return f"Error adding to sorted set {key}: {str(e)}"


@mcp.tool()
async def zrange(key: str, start: int, end: int, with_scores: bool = False) -> str:
    """Retrieve a range of members from a Redis sorted set.

    Args:
        key (str): The sorted set key.
        start (int): The starting index.
        end (int): The ending index.
        with_scores (bool, optional): Whether to include scores in the result.

    Returns:
        str: The sorted set members in the given range or an error message.
    """
    try:
        r = RedisConnectionManager.get_connection()
        members = r.zrange(key, start, end, withscores=with_scores)
        return (
            str(members) if members else f"Sorted set {key} is empty or does not exist"
        )
    except RedisError as e:
        return f"Error retrieving sorted set {key}: {str(e)}"


@mcp.tool()
async def zrem(key: str, member: str) -> str:
    """Remove a member from a Redis sorted set.

    Args:
        key (str): The sorted set key.
        member (str): The member to remove.

    Returns:
        str: Confirmation message or an error message.
    """
    try:
        r = RedisConnectionManager.get_connection()
        result = r.zrem(key, member)
        return (
            f"Successfully removed {member} from {key}"
            if result
            else f"Member {member} not found in {key}"
        )
    except RedisError as e:
        return f"Error removing from sorted set {key}: {str(e)}"
```
--------------------------------------------------------------------------------
/examples/redis_assistant.py:
--------------------------------------------------------------------------------
```python
import asyncio
from agents import Agent, Runner
from openai.types.responses import ResponseTextDeltaEvent
from agents.mcp import MCPServerStdio
from collections import deque


# Set up and create the agent
async def build_agent():
    # Redis MCP Server. Pass the environment configuration for the MCP Server in the JSON
    server = MCPServerStdio(
        params={
            "command": "uv",
            "args": [
                "--directory",
                "../src/",  # change with the path to the MCP server
                "run",
                "main.py",
            ],
            "env": {
                "REDIS_HOST": "127.0.0.1",
                "REDIS_PORT": "6379",
                "REDIS_USERNAME": "default",
                "REDIS_PWD": "",
            },
        }
    )

    await server.connect()

    # Create and return the agent
    agent = Agent(
        name="Redis Assistant",
        instructions="You are a helpful assistant capable of reading and writing to Redis. Store every question and answer in the Redis Stream app:logger",
        mcp_servers=[server],
    )

    return agent


# CLI interaction
async def cli(agent, max_history=30):
    print("🔧 Redis Assistant CLI — Ask me something (type 'exit' to quit):\n")
    conversation_history = deque(maxlen=max_history)

    while True:
        q = input("❓> ")
        if q.strip().lower() in {"exit", "quit"}:
            break

        if len(q.strip()) > 0:
            # Format the context into a single string
            history = ""
            for turn in conversation_history:
                prefix = "User" if turn["role"] == "user" else "Assistant"
                history += f"{prefix}: {turn['content']}\n"

            # Separate the sections with real newlines ("\n", not "/n")
            context = f"Conversation history:\n{history.strip()}\nNew question:\n{q.strip()}"
            result = Runner.run_streamed(agent, context)

            response_text = ""
            async for event in result.stream_events():
                if event.type == "raw_response_event" and isinstance(
                    event.data, ResponseTextDeltaEvent
                ):
                    print(event.data.delta, end="", flush=True)
                    response_text += event.data.delta
            print("\n")

            # Add the user's message and the assistant's reply in history
            conversation_history.append({"role": "user", "content": q})
            conversation_history.append({"role": "assistant", "content": response_text})


# Main entry point
async def main():
    agent = await build_agent()
    await cli(agent)


if __name__ == "__main__":
    asyncio.run(main())
```
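The CLI above keeps its history in a `deque(maxlen=max_history)` and flattens it into one prompt string per question. The sketch below isolates that step in a hypothetical function `build_context` (the name is an assumption for illustration) and uses a tiny `maxlen` so the eviction of old messages is visible.

```python
from collections import deque


def build_context(history, question):
    """Join prior messages and the new question into one prompt string."""
    lines = []
    for turn in history:
        prefix = "User" if turn["role"] == "user" else "Assistant"
        lines.append(f"{prefix}: {turn['content']}")
    joined = "\n".join(lines)
    return f"Conversation history:\n{joined}\nNew question:\n{question.strip()}"


history = deque(maxlen=2)  # small cap so eviction is easy to see
history.append({"role": "user", "content": "first"})
history.append({"role": "assistant", "content": "second"})
history.append({"role": "user", "content": "third"})  # evicts "first"

print(build_context(history, "  what next? "))
```

Since the CLI appends two entries per exchange (question and reply), `max_history=30` bounds individual messages, which works out to 15 question/answer pairs.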
--------------------------------------------------------------------------------
/src/tools/list.py:
--------------------------------------------------------------------------------
```python
import json
from typing import Union, List, Optional

from redis.exceptions import RedisError
from redis.typing import FieldT

from src.common.connection import RedisConnectionManager
from src.common.server import mcp


@mcp.tool()
async def lpush(name: str, value: FieldT, expire: Optional[int] = None) -> str:
    """Push a value onto the left of a Redis list and optionally set an expiration time."""
    try:
        r = RedisConnectionManager.get_connection()
        r.lpush(name, value)
        if expire:
            r.expire(name, expire)
        return f"Value '{value}' pushed to the left of list '{name}'."
    except RedisError as e:
        return f"Error pushing value to list '{name}': {str(e)}"


@mcp.tool()
async def rpush(name: str, value: FieldT, expire: Optional[int] = None) -> str:
    """Push a value onto the right of a Redis list and optionally set an expiration time."""
    try:
        r = RedisConnectionManager.get_connection()
        r.rpush(name, value)
        if expire:
            r.expire(name, expire)
        return f"Value '{value}' pushed to the right of list '{name}'."
    except RedisError as e:
        return f"Error pushing value to list '{name}': {str(e)}"


@mcp.tool()
async def lpop(name: str) -> str:
    """Remove and return the first element from a Redis list."""
    try:
        r = RedisConnectionManager.get_connection()
        value = r.lpop(name)
        return value if value else f"List '{name}' is empty or does not exist."
    except RedisError as e:
        return f"Error popping value from list '{name}': {str(e)}"


@mcp.tool()
async def rpop(name: str) -> str:
    """Remove and return the last element from a Redis list."""
    try:
        r = RedisConnectionManager.get_connection()
        value = r.rpop(name)
        return value if value else f"List '{name}' is empty or does not exist."
    except RedisError as e:
        return f"Error popping value from list '{name}': {str(e)}"


@mcp.tool()
async def lrange(name: str, start: int, stop: int) -> Union[str, List[str]]:
    """Get elements from a Redis list within a specific range.

    Returns:
        str: A JSON string containing the list of elements or an error message.
    """
    try:
        r = RedisConnectionManager.get_connection()
        values = r.lrange(name, start, stop)
        if not values:
            return f"List '{name}' is empty or does not exist."
        else:
            return json.dumps(values)
    except RedisError as e:
        return f"Error retrieving values from list '{name}': {str(e)}"


@mcp.tool()
async def llen(name: str) -> Union[int, str]:
    """Get the length of a Redis list."""
    try:
        r = RedisConnectionManager.get_connection()
        return r.llen(name)
    except RedisError as e:
        # Errors are reported as a string, hence the Union return type.
        return f"Error retrieving length of list '{name}': {str(e)}"
```
--------------------------------------------------------------------------------
/src/tools/json.py:
--------------------------------------------------------------------------------
```python
import json
from typing import Optional

from redis.exceptions import RedisError

from src.common.connection import RedisConnectionManager
from src.common.server import mcp


@mcp.tool()
async def json_set(
    name: str,
    path: str,
    value: str,
    expire_seconds: Optional[int] = None,
) -> str:
    """Set a JSON value in Redis at a given path with an optional expiration time.

    Args:
        name: The Redis key where the JSON document is stored.
        path: The JSON path where the value should be set.
        value: The JSON value to store (as JSON string, or will be auto-converted).
        expire_seconds: Optional; time in seconds after which the key should expire.

    Returns:
        A success message or an error message.
    """
    # Try to parse the value as JSON, if it fails, treat it as a plain string
    try:
        parsed_value = json.loads(value)
    except (json.JSONDecodeError, TypeError):
        parsed_value = value

    try:
        r = RedisConnectionManager.get_connection()
        r.json().set(name, path, parsed_value)

        if expire_seconds is not None:
            r.expire(name, expire_seconds)

        return f"JSON value set at path '{path}' in '{name}'." + (
            f" Expires in {expire_seconds} seconds." if expire_seconds else ""
        )
    except RedisError as e:
        return f"Error setting JSON value at path '{path}' in '{name}': {str(e)}"


@mcp.tool()
async def json_get(name: str, path: str = "$") -> str:
    """Retrieve a JSON value from Redis at a given path.

    Args:
        name: The Redis key where the JSON document is stored.
        path: The JSON path to retrieve (default: root '$').

    Returns:
        The retrieved JSON value or an error message.
    """
    try:
        r = RedisConnectionManager.get_connection()
        value = r.json().get(name, path)
        if value is not None:
            # Convert the value to JSON string for consistent return type
            return json.dumps(value, ensure_ascii=False, indent=2)
        else:
            return f"No data found at path '{path}' in '{name}'."
    except RedisError as e:
        return f"Error retrieving JSON value at path '{path}' in '{name}': {str(e)}"


@mcp.tool()
async def json_del(name: str, path: str = "$") -> str:
    """Delete a JSON value from Redis at a given path.

    Args:
        name: The Redis key where the JSON document is stored.
        path: The JSON path to delete (default: root '$').

    Returns:
        A success message or an error message.
    """
    try:
        r = RedisConnectionManager.get_connection()
        deleted = r.json().delete(name, path)
        return (
            f"Deleted JSON value at path '{path}' in '{name}'."
            if deleted
            else f"No JSON value found at path '{path}' in '{name}'."
        )
    except RedisError as e:
        return f"Error deleting JSON value at path '{path}' in '{name}': {str(e)}"
```
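`json_set` above first tries to parse its `value` argument as JSON and falls back to the raw string if parsing fails. A small sketch of that fallback, using a hypothetical helper `parse_json_or_text` (an assumed name, not part of the repository), shows which inputs end up as structured values and which stay plain text.

```python
import json


def parse_json_or_text(value):
    """Return the decoded JSON value, or the input unchanged if it is not JSON."""
    try:
        return json.loads(value)
    except (json.JSONDecodeError, TypeError):
        return value


for raw in ('{"a": [1, 2]}', "hello", "42", '"42"', "true"):
    print(repr(raw), "->", repr(parse_json_or_text(raw)))
```

One consequence worth keeping in mind: a bare `42` or `true` is stored as a number or boolean. To store the text `42`, the caller has to pass it with JSON quotes (`"42"`).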
--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------
```toml
[project]
name = "redis-mcp-server"
version = "0.3.5"
description = "Redis MCP Server - Model Context Protocol server for Redis"
readme = "README.md"
requires-python = ">=3.10"
license = "MIT"
authors = [
{name = "Redis", email = "[email protected]"}
]
keywords = ["redis", "mcp", "model-context-protocol", "ai", "llm"]
classifiers = [
"Development Status :: 4 - Beta",
"Intended Audience :: Developers",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Programming Language :: Python :: 3.13",
"Programming Language :: Python :: 3.14",
"Topic :: Database",
"Topic :: Software Development :: Libraries :: Python Modules",
]
dependencies = [
"mcp[cli]>=1.9.4",
"redis>=6.0.0",
"dotenv>=0.9.9",
"numpy>=2.2.4",
"click>=8.0.0",
"redis-entraid>=1.0.0",
]
[project.scripts]
redis-mcp-server = "src.main:cli"
[project.urls]
Homepage = "https://github.com/redis/mcp-redis"
Repository = "https://github.com/redis/mcp-redis"
Issues = "https://github.com/redis/mcp-redis/issues"
[build-system]
requires = ["uv_build>=0.8.3,<0.10.0"]
build-backend = "uv_build"
[tool.uv.build-backend]
module-name = "src"
module-root = "."
# Security configuration for bandit
[tool.bandit]
exclude_dirs = ["tests", "build", "dist"]
skips = ["B101", "B601"] # Skip assert_used (B101) and paramiko_calls (B601)
[tool.bandit.assert_used]
skips = ["*_test.py", "*/test_*.py"]
[dependency-groups]
dev = [
"bandit[toml]>=1.8.6",
"black>=25.1.0",
"coverage>=7.10.1",
"mypy>=1.17.0",
"pytest>=8.4.1",
"pytest-asyncio>=1.1.0",
"pytest-cov>=6.2.1",
"pytest-mock>=3.12.0",
"ruff>=0.12.5",
"safety>=3.6.0",
"twine>=4.0",
]
test = [
"pytest>=8.4.1",
"pytest-asyncio>=1.1.0",
"pytest-cov>=6.2.1",
"pytest-mock>=3.12.0",
"coverage>=7.10.1",
]
# Testing configuration
[tool.pytest.ini_options]
testpaths = ["tests"]
python_files = ["test_*.py"]
python_classes = ["Test*"]
python_functions = ["test_*"]
addopts = [
"--strict-markers",
"--strict-config",
"--verbose",
"--cov=src",
"--cov-report=html",
"--cov-report=term",
"--cov-report=xml",
"--cov-fail-under=80",
]
markers = [
"unit: marks tests as unit tests",
"integration: marks tests as integration tests",
"slow: marks tests as slow running",
]
asyncio_mode = "auto"
filterwarnings = [
"ignore::DeprecationWarning",
"ignore::PendingDeprecationWarning",
]
[tool.coverage.run]
source = ["src"]
omit = [
"*/tests/*",
"*/test_*.py",
"*/__pycache__/*",
"*/venv/*",
"*/.venv/*",
]
[tool.coverage.report]
exclude_lines = [
"pragma: no cover",
"def __repr__",
"if self.debug:",
"if settings.DEBUG",
"raise AssertionError",
"raise NotImplementedError",
"if 0:",
"if __name__ == .__main__.:",
"class .*\\bProtocol\\):",
"@(abc\\.)?abstractmethod",
]
```
--------------------------------------------------------------------------------
/.github/workflows/stale-issues.yml:
--------------------------------------------------------------------------------
```yaml
name: "Stale Issue Management"

on:
  schedule:
    # Run daily at midnight UTC
    - cron: "0 0 * * *"
  workflow_dispatch: # Allow manual triggering

env:
  # Default stale policy timeframes
  DAYS_BEFORE_STALE: 365
  DAYS_BEFORE_CLOSE: 30

  # Accelerated timeline for needs-information issues
  NEEDS_INFO_DAYS_BEFORE_STALE: 30
  NEEDS_INFO_DAYS_BEFORE_CLOSE: 7

jobs:
  stale:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/stale@v10
        with:
          repo-token: ${{ secrets.GITHUB_TOKEN }}
          dry-run: true

          # Default stale policy
          days-before-stale: ${{ env.DAYS_BEFORE_STALE }}
          days-before-close: ${{ env.DAYS_BEFORE_CLOSE }}

          # Explicit stale label configuration
          stale-issue-label: "stale"
          stale-pr-label: "stale"

          stale-issue-message: |
            This issue has been automatically marked as stale due to inactivity.
            It will be closed in 30 days if no further activity occurs.
            If you believe this issue is still relevant, please add a comment to keep it open.

          close-issue-message: |
            This issue has been automatically closed due to inactivity.
            If you believe this issue is still relevant, please reopen it or create a new issue with updated information.

          # Exclude needs-information issues from this job
          exempt-issue-labels: 'no-stale,needs-information'

          # Remove stale label when issue/PR becomes active again
          remove-stale-when-updated: true

          # Apply to pull requests with same timeline
          days-before-pr-stale: ${{ env.DAYS_BEFORE_STALE }}
          days-before-pr-close: ${{ env.DAYS_BEFORE_CLOSE }}

          stale-pr-message: |
            This pull request has been automatically marked as stale due to inactivity.
            It will be closed in 30 days if no further activity occurs.

          close-pr-message: |
            This pull request has been automatically closed due to inactivity.
            If you would like to continue this work, please reopen the PR or create a new one.

          # Only exclude no-stale PRs (needs-information PRs follow standard timeline)
          exempt-pr-labels: 'no-stale'

  # Separate job for needs-information issues ONLY with accelerated timeline
  stale-needs-info:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/stale@v10
        with:
          repo-token: ${{ secrets.GITHUB_TOKEN }}
          dry-run: true

          # Accelerated timeline for needs-information
          days-before-stale: ${{ env.NEEDS_INFO_DAYS_BEFORE_STALE }}
          days-before-close: ${{ env.NEEDS_INFO_DAYS_BEFORE_CLOSE }}

          # Explicit stale label configuration
          stale-issue-label: "stale"

          # Only target ISSUES with needs-information label (not PRs)
          only-issue-labels: 'needs-information'

          stale-issue-message: |
            This issue has been marked as stale because it requires additional information
            that has not been provided for 30 days. It will be closed in 7 days if the
            requested information is not provided.

          close-issue-message: |
            This issue has been closed because the requested information was not provided within the specified timeframe.
            If you can provide the missing information, please reopen this issue or create a new one.

          # Disable PR processing for this job
          days-before-pr-stale: -1
          days-before-pr-close: -1

          # Remove stale label when issue becomes active again
          remove-stale-when-updated: true
```
--------------------------------------------------------------------------------
/tests/conftest.py:
--------------------------------------------------------------------------------
```python
"""
Pytest configuration and fixtures for Redis MCP Server tests.
"""

from unittest.mock import Mock, patch

import pytest
import redis
from redis.exceptions import ConnectionError, RedisError, TimeoutError


@pytest.fixture
def mock_redis():
    """Create a mock Redis connection."""
    mock = Mock(spec=redis.Redis)
    return mock


@pytest.fixture
def mock_redis_cluster():
    """Create a mock Redis Cluster connection."""
    mock = Mock(spec=redis.cluster.RedisCluster)
    return mock


@pytest.fixture
def mock_redis_connection_manager():
    """Mock the RedisConnectionManager to return a mock Redis connection."""
    with patch(
        "src.common.connection.RedisConnectionManager.get_connection"
    ) as mock_get_conn:
        mock_redis = Mock(spec=redis.Redis)
        mock_get_conn.return_value = mock_redis
        yield mock_redis


@pytest.fixture
def redis_config():
    """Sample Redis configuration for testing."""
    return {
        "host": "localhost",
        "port": 6379,
        "db": 0,
        "username": None,
        "password": "",
        "ssl": False,
        "ssl_ca_path": None,
        "ssl_keyfile": None,
        "ssl_certfile": None,
        "ssl_cert_reqs": "required",
        "ssl_ca_certs": None,
        "cluster_mode": False,
    }


@pytest.fixture
def redis_uri_samples():
    """Sample Redis URIs for testing."""
    return {
        "basic": "redis://localhost:6379/0",
        "with_auth": "redis://user:pass@localhost:6379/0",
        "ssl": "rediss://user:pass@localhost:6379/0",
        "with_query": "redis://localhost:6379/0?ssl_cert_reqs=required",
        "cluster": "redis://localhost:6379/0?cluster_mode=true",
    }


@pytest.fixture
def sample_vector():
    """Sample vector for testing vector operations."""
    return [0.1, 0.2, 0.3, 0.4, 0.5]


@pytest.fixture
def sample_json_data():
    """Sample JSON data for testing."""
    return {
        "name": "John Doe",
        "age": 30,
        "city": "New York",
        "hobbies": ["reading", "swimming"],
    }


@pytest.fixture
def redis_error_scenarios():
    """Common Redis error scenarios for testing."""
    return {
        "connection_error": ConnectionError("Connection refused"),
        "timeout_error": TimeoutError("Operation timed out"),
        "generic_error": RedisError("Generic Redis error"),
        "auth_error": RedisError("NOAUTH Authentication required"),
        "wrong_type": RedisError(
            "WRONGTYPE Operation against a key holding the wrong kind of value"
        ),
    }


@pytest.fixture(autouse=True)
def reset_connection_manager():
    """Reset the RedisConnectionManager singleton before each test."""
    from src.common.connection import RedisConnectionManager

    RedisConnectionManager._instance = None
    yield
    RedisConnectionManager._instance = None


@pytest.fixture
def mock_numpy_array():
    """Mock numpy array for vector testing."""
    with patch("numpy.array") as mock_array:
        mock_array.return_value.tobytes.return_value = b"mock_binary_data"
        yield mock_array


@pytest.fixture
def mock_numpy_frombuffer():
    """Mock numpy frombuffer for vector testing."""
    with patch("numpy.frombuffer") as mock_frombuffer:
        mock_frombuffer.return_value.tolist.return_value = [0.1, 0.2, 0.3]
        yield mock_frombuffer


# Async test helpers
@pytest.fixture
def event_loop():
    """Create an event loop for async tests."""
    import asyncio

    loop = asyncio.new_event_loop()
    yield loop
    loop.close()


# Mark configurations
def pytest_configure(config):
    """Configure pytest markers."""
    config.addinivalue_line("markers", "unit: mark test as a unit test")
    config.addinivalue_line("markers", "integration: mark test as an integration test")
    config.addinivalue_line("markers", "slow: mark test as slow running")
```
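The fixtures above replace the Redis connection with a `Mock` so tool functions can be tested for both their success and error paths. The sketch below shows that pattern in isolation. To stay runnable without the `redis` package it uses a stand-in exception `FakeRedisError` and a hypothetical tool `llen_tool` that receives its connection factory as an argument; both names are assumptions for illustration and differ from the real tools, which call `RedisConnectionManager.get_connection()` directly.

```python
import asyncio
from unittest.mock import Mock


class FakeRedisError(Exception):
    """Stand-in for redis.exceptions.RedisError (assumption for this sketch)."""


async def llen_tool(get_connection, name):
    """Hypothetical tool shaped like the ones in src/tools."""
    try:
        return get_connection().llen(name)
    except FakeRedisError as e:
        return f"Error retrieving length of list '{name}': {e}"


conn = Mock()

# Success path: the mocked connection returns a canned value.
conn.llen.return_value = 3
ok = asyncio.run(llen_tool(lambda: conn, "jobs"))

# Error path: the mocked call raises, and the tool turns it into a message.
conn.llen.side_effect = FakeRedisError("Connection refused")
err = asyncio.run(llen_tool(lambda: conn, "jobs"))

print(ok)
print(err)
```

In the real test suite the same effect comes from the `mock_redis_connection_manager` fixture, which patches `get_connection` and yields the mock for configuration.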
--------------------------------------------------------------------------------
/tests/test_logging_utils.py:
--------------------------------------------------------------------------------
```python
import logging
import sys

import pytest

from src.common.logging_utils import resolve_log_level, configure_logging


@pytest.fixture()
def preserve_logging():
    """Snapshot and restore the root logger state to avoid cross-test interference."""
    root = logging.getLogger()
    saved_level = root.level
    saved_handlers = list(root.handlers)
    saved_handler_levels = [h.level for h in saved_handlers]
    try:
        yield
    finally:
        # Remove any handlers added during the test
        for h in list(root.handlers):
            try:
                root.removeHandler(h)
            except Exception:
                pass
        # Restore original handlers and their levels
        for h, lvl in zip(saved_handlers, saved_handler_levels):
            try:
                root.addHandler(h)
                h.setLevel(lvl)
            except Exception:
                pass
        # Restore original root level
        root.setLevel(saved_level)
        # Best-effort: disable warnings capture enabled by configure_logging
        try:
            logging.captureWarnings(False)
        except Exception:
            pass


def test_resolve_log_level_default_warning(monkeypatch):
    monkeypatch.delenv("MCP_REDIS_LOG_LEVEL", raising=False)
    assert resolve_log_level() == logging.WARNING


def test_resolve_log_level_parses_name_and_alias(monkeypatch):
    monkeypatch.setenv("MCP_REDIS_LOG_LEVEL", "info")
    assert resolve_log_level() == logging.INFO

    monkeypatch.setenv("MCP_REDIS_LOG_LEVEL", "WARN")
    assert resolve_log_level() == logging.WARNING

    monkeypatch.setenv("MCP_REDIS_LOG_LEVEL", "fatal")
    assert resolve_log_level() == logging.CRITICAL


def test_resolve_log_level_parses_numeric(monkeypatch):
    monkeypatch.setenv("MCP_REDIS_LOG_LEVEL", "10")
    assert resolve_log_level() == 10

    monkeypatch.setenv("MCP_REDIS_LOG_LEVEL", "+20")
    assert resolve_log_level() == 20


def test_configure_logging_adds_stderr_handler_when_none(monkeypatch, preserve_logging):
    # Ensure no handlers exist before configuring
    root = logging.getLogger()
    for h in list(root.handlers):
        root.removeHandler(h)

    monkeypatch.setenv("MCP_REDIS_LOG_LEVEL", "INFO")
    level = configure_logging()

    assert level == logging.INFO
    assert len(root.handlers) == 1, (
        "Should add exactly one stderr handler when none exist"
    )
    handler = root.handlers[0]
    assert isinstance(handler, logging.StreamHandler)
    # StreamHandler exposes the underlying stream attribute
    assert getattr(handler, "stream", None) is sys.stderr
    assert handler.level == logging.INFO
    assert root.level == logging.INFO


def test_configure_logging_only_lowers_restrictive_handlers(
    monkeypatch, preserve_logging
):
    root = logging.getLogger()
    # Start from a clean handler set
    for h in list(root.handlers):
        root.removeHandler(h)

    # Add two handlers: one restrictive WARNING, one permissive NOTSET
    h_warning = logging.StreamHandler(sys.stderr)
    h_warning.setLevel(logging.WARNING)
    root.addHandler(h_warning)

    h_notset = logging.StreamHandler(sys.stderr)
    h_notset.setLevel(logging.NOTSET)
    root.addHandler(h_notset)

    # Request DEBUG
    monkeypatch.setenv("MCP_REDIS_LOG_LEVEL", "DEBUG")
    configure_logging()

    # The WARNING handler should be lowered to DEBUG; NOTSET should remain NOTSET
    assert h_warning.level == logging.DEBUG
    assert h_notset.level == logging.NOTSET


def test_configure_logging_does_not_raise_handler_threshold(
    monkeypatch, preserve_logging
):
    root = logging.getLogger()
    # Clean handlers
    for h in list(root.handlers):
        root.removeHandler(h)

    # Add a handler at WARNING and then set env to ERROR
    h_warning = logging.StreamHandler(sys.stderr)
    h_warning.setLevel(logging.WARNING)
    root.addHandler(h_warning)

    monkeypatch.setenv("MCP_REDIS_LOG_LEVEL", "ERROR")
    configure_logging()

    # Handler should remain at WARNING (30), not be raised to ERROR (40)
    assert h_warning.level == logging.WARNING
    # Root level should reflect ERROR
    assert root.level == logging.ERROR


def test_configure_logging_does_not_add_handler_if_exists(
    monkeypatch, preserve_logging
):
    root = logging.getLogger()
    # Start with one existing handler
    for h in list(root.handlers):
        root.removeHandler(h)

    existing = logging.StreamHandler(sys.stderr)
    root.addHandler(existing)

    monkeypatch.setenv("MCP_REDIS_LOG_LEVEL", "INFO")
    configure_logging()

    # Should not add another handler
    assert len(root.handlers) == 1
    assert root.handlers[0] is existing
```
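The tests above pin down the expected behaviour of `resolve_log_level`: a default of `WARNING`, case-insensitive level names, the `WARN` and `FATAL` aliases, and signed numeric strings. The real implementation lives in `src/common/logging_utils.py` and is not shown in this section, so the following is only a minimal sketch of logic that would satisfy those tests. The function name `resolve_log_level_sketch` and the `_ALIASES` table are hypothetical.

```python
import logging
import os

# Hypothetical alias table covering the spellings exercised by the tests.
_ALIASES = {"WARN": "WARNING", "FATAL": "CRITICAL"}


def resolve_log_level_sketch(env_var: str = "MCP_REDIS_LOG_LEVEL") -> int:
    """Return a numeric logging level read from the environment.

    Falls back to logging.WARNING when the variable is unset, empty,
    or not a recognisable level.
    """
    raw = os.environ.get(env_var, "").strip()
    if not raw:
        return logging.WARNING
    # Numeric strings such as "10" or "+20" are accepted directly.
    try:
        return int(raw)
    except ValueError:
        pass
    name = _ALIASES.get(raw.upper(), raw.upper())
    level = logging.getLevelName(name)
    # getLevelName returns an int for known names and a string otherwise.
    return level if isinstance(level, int) else logging.WARNING
```

Falling back to `WARNING` on unrecognised input is an assumption of this sketch; the tests shown here do not cover that case.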
--------------------------------------------------------------------------------
/tests/test_server.py:
--------------------------------------------------------------------------------
```python
"""
Unit tests for src/common/server.py
"""
from unittest.mock import patch
from src.common.server import mcp
class TestMCPServer:
"""Test cases for MCP server initialization."""
def test_mcp_server_instance_exists(self):
"""Test that mcp server instance is created."""
assert mcp is not None
assert hasattr(mcp, "run")
assert hasattr(mcp, "tool")
def test_mcp_server_name(self):
"""Test that mcp server has correct name."""
# The FastMCP server should have the correct name
assert hasattr(mcp, "name") or hasattr(mcp, "_name")
# We can't directly access the name in FastMCP, but we can verify it's a FastMCP instance
assert str(type(mcp)) == "<class 'mcp.server.fastmcp.server.FastMCP'>"
def test_mcp_server_dependencies(self):
"""Test that mcp server has correct dependencies."""
# FastMCP should have dependencies configured
# We can't directly test this without accessing private attributes
# but we can verify the server was initialized properly
assert mcp is not None
@patch("mcp.server.fastmcp.FastMCP")
def test_mcp_server_initialization(self, mock_fastmcp):
"""Test MCP server initialization with correct parameters."""
# Re-import to trigger initialization
import importlib
import src.common.server
importlib.reload(src.common.server)
# Verify FastMCP was called with correct parameters
mock_fastmcp.assert_called_once_with(
"Redis MCP Server", dependencies=["redis", "dotenv", "numpy"]
)
def test_mcp_server_tool_decorator(self):
"""Test that mcp server provides tool decorator."""
assert hasattr(mcp, "tool")
assert callable(mcp.tool)
def test_mcp_server_run_method(self):
"""Test that mcp server provides run method."""
assert hasattr(mcp, "run")
assert callable(mcp.run)
@patch.object(mcp, "run")
def test_mcp_server_run_can_be_called(self, mock_run):
"""Test that mcp server run method can be called."""
mcp.run()
mock_run.assert_called_once()
def test_mcp_tool_decorator_functionality(self):
"""Test that the tool decorator can be used."""
# Test that we can use the decorator (this tests the decorator exists and is callable)
@mcp.tool()
async def test_tool():
"""Test tool for decorator functionality."""
return "test"
# Verify the decorator worked
assert callable(test_tool)
assert hasattr(test_tool, "__name__")
assert test_tool.__name__ == "test_tool"
def test_mcp_tool_decorator_with_parameters(self):
"""Test that the tool decorator works with parameters."""
@mcp.tool()
async def test_tool_with_params(param1: str, param2: int = 10):
"""Test tool with parameters."""
return f"{param1}:{param2}"
# Verify the decorator worked
assert callable(test_tool_with_params)
assert hasattr(test_tool_with_params, "__name__")
def test_mcp_server_is_singleton(self):
"""Test that importing server multiple times returns same instance."""
from src.common.server import mcp as mcp1
from src.common.server import mcp as mcp2
assert mcp1 is mcp2
assert id(mcp1) == id(mcp2)
@patch("mcp.server.fastmcp.FastMCP")
def test_mcp_server_dependencies_list(self, mock_fastmcp):
"""Test that MCP server is initialized with correct dependencies list."""
# Re-import to trigger initialization
import importlib
import src.common.server
importlib.reload(src.common.server)
# Get the call arguments
call_args = mock_fastmcp.call_args
assert call_args[0][0] == "Redis MCP Server" # First positional argument
assert call_args[1]["dependencies"] == [
"redis",
"dotenv",
"numpy",
] # Keyword argument
def test_mcp_server_type(self):
"""Test that mcp server is of correct type."""
from mcp.server.fastmcp import FastMCP
assert isinstance(mcp, FastMCP)
def test_mcp_server_attributes(self):
"""Test that mcp server has expected attributes."""
# Test for common FastMCP attributes
expected_attributes = ["run", "tool"]
for attr in expected_attributes:
assert hasattr(mcp, attr), f"MCP server missing attribute: {attr}"
assert callable(getattr(mcp, attr)), (
f"MCP server attribute {attr} is not callable"
)
```
--------------------------------------------------------------------------------
/src/common/connection.py:
--------------------------------------------------------------------------------
```python
import logging
from typing import Optional, Type, Union
import redis
from redis import Redis
from redis.cluster import RedisCluster
from src.common.config import REDIS_CFG, is_entraid_auth_enabled
from src.common.entraid_auth import (
create_credential_provider,
EntraIDAuthenticationError,
)
from src.version import __version__
_logger = logging.getLogger(__name__)
class RedisConnectionManager:
_instance: Optional[Redis] = None
@classmethod
def get_connection(cls, decode_responses=True) -> Redis:
if cls._instance is None:
try:
# Create Entra ID credential provider if configured
credential_provider = None
if is_entraid_auth_enabled():
try:
credential_provider = create_credential_provider()
except EntraIDAuthenticationError as e:
_logger.error(
"Failed to create Entra ID credential provider: %s", e
)
raise
if REDIS_CFG["cluster_mode"]:
redis_class: Type[Union[Redis, RedisCluster]] = (
redis.cluster.RedisCluster
)
connection_params = {
"host": REDIS_CFG["host"],
"port": REDIS_CFG["port"],
"username": REDIS_CFG["username"],
"password": REDIS_CFG["password"],
"ssl": REDIS_CFG["ssl"],
"ssl_ca_path": REDIS_CFG["ssl_ca_path"],
"ssl_keyfile": REDIS_CFG["ssl_keyfile"],
"ssl_certfile": REDIS_CFG["ssl_certfile"],
"ssl_cert_reqs": REDIS_CFG["ssl_cert_reqs"],
"ssl_ca_certs": REDIS_CFG["ssl_ca_certs"],
"decode_responses": decode_responses,
"lib_name": f"redis-py(mcp-server_v{__version__})",
"max_connections_per_node": 10,
}
# Add credential provider if available
if credential_provider:
connection_params["credential_provider"] = credential_provider
# Note: Azure Redis Enterprise with EntraID uses plain text connections
# SSL setting is controlled by REDIS_SSL environment variable
else:
redis_class: Type[Union[Redis, RedisCluster]] = redis.Redis
connection_params = {
"host": REDIS_CFG["host"],
"port": REDIS_CFG["port"],
"db": REDIS_CFG["db"],
"username": REDIS_CFG["username"],
"password": REDIS_CFG["password"],
"ssl": REDIS_CFG["ssl"],
"ssl_ca_path": REDIS_CFG["ssl_ca_path"],
"ssl_keyfile": REDIS_CFG["ssl_keyfile"],
"ssl_certfile": REDIS_CFG["ssl_certfile"],
"ssl_cert_reqs": REDIS_CFG["ssl_cert_reqs"],
"ssl_ca_certs": REDIS_CFG["ssl_ca_certs"],
"decode_responses": decode_responses,
"lib_name": f"redis-py(mcp-server_v{__version__})",
"max_connections": 10,
}
# Add credential provider if available
if credential_provider:
connection_params["credential_provider"] = credential_provider
# Note: Azure Redis Enterprise with EntraID uses plain text connections
# SSL setting is controlled by REDIS_SSL environment variable
cls._instance = redis_class(**connection_params)
# AuthenticationError subclasses ConnectionError, so it must be caught first
except redis.exceptions.AuthenticationError:
_logger.error("Authentication failed")
raise
except redis.exceptions.ConnectionError:
_logger.error("Failed to connect to Redis server")
raise
except redis.exceptions.TimeoutError:
_logger.error("Connection timed out")
raise
except redis.exceptions.ResponseError as e:
_logger.error("Response error: %s", e)
raise
# ClusterError subclasses RedisError, so it must precede the generic handler
except redis.exceptions.ClusterError as e:
_logger.error("Redis Cluster error: %s", e)
raise
except redis.exceptions.RedisError as e:
_logger.error("Redis error: %s", e)
raise
except Exception as e:
_logger.error("Unexpected error: %s", e)
raise
return cls._instance
```
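One property of `RedisConnectionManager` is worth spelling out: `_instance` is created once, so the `decode_responses` argument only takes effect on the first call to `get_connection`. Later calls return the cached client whatever value they pass. The sketch below shows one possible variation that keeps a client per setting. `FakeClient` and `KeyedConnectionManager` are hypothetical stand-ins used for illustration; they are not part of this repository or of redis-py.

```python
from typing import Dict


class FakeClient:
    """Hypothetical stand-in for a Redis client; it only records its setting."""

    def __init__(self, decode_responses: bool) -> None:
        self.decode_responses = decode_responses


class KeyedConnectionManager:
    """Caches one client per decode_responses value instead of a single instance."""

    _instances: Dict[bool, FakeClient] = {}

    @classmethod
    def get_connection(cls, decode_responses: bool = True) -> FakeClient:
        # Create the client lazily, once for each distinct setting.
        if decode_responses not in cls._instances:
            cls._instances[decode_responses] = FakeClient(decode_responses)
        return cls._instances[decode_responses]


text_client = KeyedConnectionManager.get_connection()
raw_client = KeyedConnectionManager.get_connection(decode_responses=False)
```

The trade-off is a second connection pool for callers that need raw bytes, in exchange for each caller getting the decoding behaviour it asked for.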
--------------------------------------------------------------------------------
/src/tools/redis_query_engine.py:
--------------------------------------------------------------------------------
```python
import json
from typing import List, Optional, Union, Dict, Any
import numpy as np
from redis.commands.search.field import VectorField
from redis.commands.search.index_definition import IndexDefinition
from redis.commands.search.query import Query
from redis.exceptions import RedisError
from src.common.connection import RedisConnectionManager
from src.common.server import mcp
@mcp.tool()
async def get_indexes() -> str:
"""List of indexes in the Redis database
Returns:
str: A JSON string containing the list of indexes or an error message.
"""
try:
r = RedisConnectionManager.get_connection()
return json.dumps(r.execute_command("FT._LIST"))
except RedisError as e:
return f"Error retrieving indexes: {str(e)}"
@mcp.tool()
async def get_index_info(index_name: str) -> str:
"""Retrieve schema and information about a specific Redis index using FT.INFO.
Args:
index_name (str): The name of the index to retrieve information about.
Returns:
str: Information about the specified index or an error message.
"""
try:
r = RedisConnectionManager.get_connection()
info = r.ft(index_name).info()
return json.dumps(info, ensure_ascii=False, indent=2)
except RedisError as e:
return f"Error retrieving index info: {str(e)}"
@mcp.tool()
async def get_indexed_keys_number(index_name: str) -> str:
"""Retrieve the number of keys indexed by the given index.
Args:
index_name (str): The name of the index to retrieve information about.
Returns:
str: Number of indexed keys as a string
"""
try:
r = RedisConnectionManager.get_connection()
total = r.ft(index_name).search(Query("*")).total
return str(total)
except RedisError as e:
return f"Error retrieving number of keys: {str(e)}"
@mcp.tool()
async def create_vector_index_hash(
index_name: str = "vector_index",
prefix: str = "doc:",
vector_field: str = "vector",
dim: int = 1536,
distance_metric: str = "COSINE",
) -> str:
"""
Create a Redis 8 vector similarity index using HNSW on a Redis hash.
This function sets up a Redis index for approximate nearest neighbor (ANN)
search using the HNSW algorithm and float32 vector embeddings.
Args:
index_name: The name of the Redis index to create. Unless specifically required, use the default name for the index.
prefix: The key prefix used to identify documents to index (e.g., 'doc:'). Unless specifically required, use the default prefix.
vector_field: The name of the vector field to be indexed for similarity search. Unless specifically required, use the default field name
dim: The dimensionality of the vectors stored under the vector_field.
distance_metric: The distance function to use (e.g., 'COSINE', 'L2', 'IP').
Returns:
A string indicating whether the index was created successfully or an error message.
"""
try:
r = RedisConnectionManager.get_connection()
index_def = IndexDefinition(prefix=[prefix])
schema = VectorField(
vector_field,
"HNSW",
{"TYPE": "FLOAT32", "DIM": dim, "DISTANCE_METRIC": distance_metric},
)
r.ft(index_name).create_index([schema], definition=index_def)
return f"Index '{index_name}' created successfully."
except RedisError as e:
return f"Error creating index '{index_name}': {str(e)}"
@mcp.tool()
async def vector_search_hash(
query_vector: List[float],
index_name: str = "vector_index",
vector_field: str = "vector",
k: int = 5,
return_fields: Optional[List[str]] = None,
) -> Union[List[Dict[str, Any]], str]:
"""
Perform a KNN vector similarity search using Redis 8 or later version on vectors stored in hash data structures.
Args:
query_vector: List of floats to use as the query vector.
index_name: Name of the Redis index. Unless specifically required, use the default index name.
vector_field: Name of the indexed vector field. Unless specifically required, use the default field name
k: Number of nearest neighbors to return.
return_fields: List of fields to return (optional).
Returns:
A list of matched documents or an error message.
"""
try:
r = RedisConnectionManager.get_connection()
# Convert query vector to float32 binary blob
vector_blob = np.array(query_vector, dtype=np.float32).tobytes()
# Build the KNN query
base_query = f"*=>[KNN {k} @{vector_field} $vec_param AS score]"
query = (
Query(base_query)
.sort_by("score")
.paging(0, k)
.return_fields("id", "score", *(return_fields or []))
.dialect(2)
)
# Perform the search with vector parameter
results = r.ft(index_name).search(
query, query_params={"vec_param": vector_blob}
)
# Format and return the results
return [doc.__dict__ for doc in results.docs]
except RedisError as e:
return f"Error performing vector search on index '{index_name}': {str(e)}"
```
--------------------------------------------------------------------------------
/src/common/entraid_auth.py:
--------------------------------------------------------------------------------
```python
"""
Entra ID authentication provider factory for Redis MCP Server.
This module provides factory methods to create credential providers for different
Azure authentication flows based on configuration.
"""
import logging
from src.common.config import (
ENTRAID_CFG,
is_entraid_auth_enabled,
validate_entraid_config,
)
_logger = logging.getLogger(__name__)
# Reduce Azure SDK logging verbosity
logging.getLogger("azure.core.pipeline.policies.http_logging_policy").setLevel(
logging.WARNING
)
logging.getLogger("azure.identity").setLevel(logging.WARNING)
logging.getLogger("redis.auth.token_manager").setLevel(logging.WARNING)
# Import redis-entraid components only when needed
try:
from redis_entraid.cred_provider import (
create_from_default_azure_credential,
create_from_managed_identity,
create_from_service_principal,
ManagedIdentityType,
TokenManagerConfig,
RetryPolicy,
)
ENTRAID_AVAILABLE = True
except ImportError:
_logger.warning(
"redis-entraid package not available. Entra ID authentication will be disabled."
)
ENTRAID_AVAILABLE = False
class EntraIDAuthenticationError(Exception):
"""Exception raised for Entra ID authentication configuration errors."""
pass
def create_credential_provider():
"""
Create an Entra ID credential provider based on the current configuration.
Returns:
Credential provider instance or None if Entra ID auth is not configured.
Raises:
EntraIDAuthenticationError: If configuration is invalid or required packages are missing.
"""
if not is_entraid_auth_enabled():
return None
if not ENTRAID_AVAILABLE:
raise EntraIDAuthenticationError(
"redis-entraid package is required for Entra ID authentication. "
"Install it with: pip install redis-entraid"
)
# Validate configuration
is_valid, error_message = validate_entraid_config()
if not is_valid:
raise EntraIDAuthenticationError(
f"Invalid Entra ID configuration: {error_message}"
)
auth_flow = ENTRAID_CFG["auth_flow"]
try:
# Create token manager configuration
token_manager_config = _create_token_manager_config()
if auth_flow == "service_principal":
return _create_service_principal_provider(token_manager_config)
elif auth_flow == "managed_identity":
return _create_managed_identity_provider(token_manager_config)
elif auth_flow == "default_credential":
return _create_default_credential_provider(token_manager_config)
else:
raise EntraIDAuthenticationError(
f"Unsupported authentication flow: {auth_flow}"
)
except Exception as e:
_logger.error("Failed to create Entra ID credential provider: %s", e)
raise EntraIDAuthenticationError(f"Failed to create credential provider: {e}") from e
def _create_token_manager_config():
"""Create TokenManagerConfig from current configuration."""
retry_policy = RetryPolicy(
max_attempts=ENTRAID_CFG["retry_max_attempts"],
delay_in_ms=ENTRAID_CFG["retry_delay_ms"],
)
return TokenManagerConfig(
expiration_refresh_ratio=ENTRAID_CFG["token_expiration_refresh_ratio"],
lower_refresh_bound_millis=ENTRAID_CFG["lower_refresh_bound_millis"],
token_request_execution_timeout_in_ms=ENTRAID_CFG[
"token_request_execution_timeout_ms"
],
retry_policy=retry_policy,
)
def _create_service_principal_provider(token_manager_config):
"""Create service principal credential provider."""
return create_from_service_principal(
client_id=ENTRAID_CFG["client_id"],
client_credential=ENTRAID_CFG["client_secret"],
tenant_id=ENTRAID_CFG["tenant_id"],
token_manager_config=token_manager_config,
)
def _create_managed_identity_provider(token_manager_config):
"""Create managed identity credential provider."""
identity_type_str = ENTRAID_CFG["identity_type"]
if identity_type_str == "system_assigned":
identity_type = ManagedIdentityType.SYSTEM_ASSIGNED
return create_from_managed_identity(
identity_type=identity_type,
resource=ENTRAID_CFG["resource"],
token_manager_config=token_manager_config,
)
elif identity_type_str == "user_assigned":
identity_type = ManagedIdentityType.USER_ASSIGNED
return create_from_managed_identity(
identity_type=identity_type,
resource=ENTRAID_CFG["resource"],
client_id=ENTRAID_CFG["user_assigned_identity_client_id"],
token_manager_config=token_manager_config,
)
else:
raise EntraIDAuthenticationError(f"Invalid identity type: {identity_type_str}")
def _create_default_credential_provider(token_manager_config):
"""Create default Azure credential provider."""
# Parse scopes from configuration
scopes_str = ENTRAID_CFG["scopes"]
scopes = tuple(scope.strip() for scope in scopes_str.split(","))
return create_from_default_azure_credential(
scopes=scopes, token_manager_config=token_manager_config
)
```
--------------------------------------------------------------------------------
/src/tools/hash.py:
--------------------------------------------------------------------------------
```python
from typing import List, Union, Optional
import numpy as np
from redis.exceptions import RedisError
from src.common.connection import RedisConnectionManager
from src.common.server import mcp
@mcp.tool()
async def hset(
name: str, key: str, value: str | int | float, expire_seconds: Optional[int] = None
) -> str:
"""Set a field in a hash stored at key with an optional expiration time.
Args:
name: The Redis hash key.
key: The field name inside the hash.
value: The value to set.
expire_seconds: Optional; time in seconds after which the key should expire.
Returns:
A success message or an error message.
"""
try:
r = RedisConnectionManager.get_connection()
r.hset(name, key, str(value))
if expire_seconds is not None:
r.expire(name, expire_seconds)
return f"Field '{key}' set successfully in hash '{name}'." + (
f" Expires in {expire_seconds} seconds." if expire_seconds is not None else ""
)
except RedisError as e:
return f"Error setting field '{key}' in hash '{name}': {str(e)}"
@mcp.tool()
async def hget(name: str, key: str) -> str:
"""Get the value of a field in a Redis hash.
Args:
name: The Redis hash key.
key: The field name inside the hash.
Returns:
The field value or an error message.
"""
try:
r = RedisConnectionManager.get_connection()
value = r.hget(name, key)
# Compare against None so that an empty-string value is still returned
return value if value is not None else f"Field '{key}' not found in hash '{name}'."
except RedisError as e:
return f"Error getting field '{key}' from hash '{name}': {str(e)}"
@mcp.tool()
async def hdel(name: str, key: str) -> str:
"""Delete a field from a Redis hash.
Args:
name: The Redis hash key.
key: The field name inside the hash.
Returns:
A success message or an error message.
"""
try:
r = RedisConnectionManager.get_connection()
deleted = r.hdel(name, key)
return (
f"Field '{key}' deleted from hash '{name}'."
if deleted
else f"Field '{key}' not found in hash '{name}'."
)
except RedisError as e:
return f"Error deleting field '{key}' from hash '{name}': {str(e)}"
@mcp.tool()
async def hgetall(name: str) -> Union[dict, str]:
"""Get all fields and values from a Redis hash.
Args:
name: The Redis hash key.
Returns:
A dictionary of field-value pairs or an error message.
"""
try:
r = RedisConnectionManager.get_connection()
hash_data = r.hgetall(name)
return (
{k: v for k, v in hash_data.items()}
if hash_data
else f"Hash '{name}' is empty or does not exist."
)
except RedisError as e:
return f"Error getting all fields from hash '{name}': {str(e)}"
@mcp.tool()
async def hexists(name: str, key: str) -> Union[bool, str]:
"""Check if a field exists in a Redis hash.
Args:
name: The Redis hash key.
key: The field name inside the hash.
Returns:
True if the field exists, False otherwise.
"""
try:
r = RedisConnectionManager.get_connection()
return r.hexists(name, key)
except RedisError as e:
return f"Error checking existence of field '{key}' in hash '{name}': {str(e)}"
@mcp.tool()
async def set_vector_in_hash(
name: str, vector: List[float], vector_field: str = "vector"
) -> Union[bool, str]:
"""Store a vector as a field in a Redis hash.
Args:
name: The Redis hash key.
vector_field: The field name inside the hash. Unless specifically required, use the default field name
vector: The vector (list of numbers) to store in the hash.
Returns:
True if the vector was successfully stored, False otherwise.
"""
try:
r = RedisConnectionManager.get_connection()
# Convert the vector to a NumPy array, then to a binary blob using np.float32
vector_array = np.array(vector, dtype=np.float32)
binary_blob = vector_array.tobytes()
r.hset(name, vector_field, binary_blob)
return True
except RedisError as e:
return f"Error storing vector in hash '{name}' with field '{vector_field}': {str(e)}"
@mcp.tool()
async def get_vector_from_hash(name: str, vector_field: str = "vector"):
"""Retrieve a vector from a Redis hash and convert it back from binary blob.
Args:
name: The Redis hash key.
vector_field: The field name inside the hash. Unless specifically required, use the default field name
Returns:
The vector as a list of floats, or an error message if retrieval fails.
"""
try:
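# Note: get_connection() caches a single client, so decode_responses=False
# only takes effect if this is the first call that creates the connection
r = RedisConnectionManager.get_connection(decode_responses=False)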
# Retrieve the binary blob stored in the hash
binary_blob = r.hget(name, vector_field)
if binary_blob:
# Convert the binary blob back to a NumPy array (assuming it's stored as float32)
vector_array = np.frombuffer(binary_blob, dtype=np.float32)
return vector_array.tolist()
else:
return f"Field '{vector_field}' not found in hash '{name}'."
except RedisError as e:
return f"Error retrieving vector field '{vector_field}' from hash '{name}': {str(e)}"
```
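`set_vector_in_hash` and `get_vector_from_hash` both depend on the same byte layout: each element is a 4-byte float32 in native byte order, which is what `np.array(..., dtype=np.float32).tobytes()` produces. As a rough illustration of that layout, the sketch below reproduces the round trip with the standard library's `struct` module instead of NumPy. The helper names `pack_float32` and `unpack_float32` are hypothetical and do not exist in this repository.

```python
import struct
from typing import List


def pack_float32(vector: List[float]) -> bytes:
    """Pack numbers as consecutive 4-byte floats in native byte order."""
    return struct.pack(f"={len(vector)}f", *vector)


def unpack_float32(blob: bytes) -> List[float]:
    """Inverse of pack_float32; rejects blobs that are not a multiple of 4 bytes."""
    if len(blob) % 4 != 0:
        raise ValueError("blob length must be a multiple of 4 bytes")
    count = len(blob) // 4
    return list(struct.unpack(f"={count}f", blob))


blob = pack_float32([0.5, -1.25, 3.0])
restored = unpack_float32(blob)
```

Because the blob is arbitrary binary data, reading it back needs a client that returns raw bytes; a client created with `decode_responses=True` would try to decode it as text.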
--------------------------------------------------------------------------------
/src/main.py:
--------------------------------------------------------------------------------
```python
import sys
import logging
import click
from src.common.config import (
parse_redis_uri,
set_redis_config_from_cli,
set_entraid_config_from_cli,
)
from src.common.server import mcp
from src.common.logging_utils import configure_logging
class RedisMCPServer:
def __init__(self):
# Configure logging on server initialization (idempotent)
configure_logging()
self._logger = logging.getLogger(__name__)
self._logger.info("Starting the Redis MCP Server")
def run(self):
mcp.run()
@click.command()
@click.option(
"--url",
help="Redis connection URI (redis://user:pass@host:port/db or rediss:// for SSL)",
)
@click.option("--host", default="127.0.0.1", help="Redis host")
@click.option("--port", default=6379, type=int, help="Redis port")
@click.option("--db", default=0, type=int, help="Redis database number")
@click.option("--username", help="Redis username")
@click.option("--password", help="Redis password")
@click.option("--ssl", is_flag=True, help="Use SSL connection")
@click.option("--ssl-ca-path", help="Path to CA certificate file")
@click.option("--ssl-keyfile", help="Path to SSL key file")
@click.option("--ssl-certfile", help="Path to SSL certificate file")
@click.option(
"--ssl-cert-reqs", default="required", help="SSL certificate requirements"
)
@click.option("--ssl-ca-certs", help="Path to CA certificates file")
@click.option("--cluster-mode", is_flag=True, help="Enable Redis cluster mode")
# Entra ID Authentication Options
@click.option(
"--entraid-auth-flow",
type=click.Choice(["service_principal", "managed_identity", "default_credential"]),
help="Entra ID authentication flow",
)
@click.option(
"--entraid-client-id",
help="Entra ID client ID (for service principal or user-assigned managed identity)",
)
@click.option(
"--entraid-client-secret", help="Entra ID client secret (for service principal)"
)
@click.option("--entraid-tenant-id", help="Entra ID tenant ID (for service principal)")
@click.option(
"--entraid-identity-type",
type=click.Choice(["system_assigned", "user_assigned"]),
default="system_assigned",
help="Managed identity type",
)
@click.option(
"--entraid-scopes",
default="https://redis.azure.com/.default",
help="Entra ID scopes (comma-separated)",
)
@click.option(
"--entraid-resource", default="https://redis.azure.com/", help="Entra ID resource"
)
@click.option(
"--entraid-token-refresh-ratio",
type=float,
default=0.9,
help="Token expiration refresh ratio",
)
@click.option(
"--entraid-retry-max-attempts",
type=int,
default=3,
help="Maximum retry attempts for token requests",
)
@click.option(
"--entraid-retry-delay-ms",
type=int,
default=100,
help="Retry delay in milliseconds",
)
def cli(
url,
host,
port,
db,
username,
password,
ssl,
ssl_ca_path,
ssl_keyfile,
ssl_certfile,
ssl_cert_reqs,
ssl_ca_certs,
cluster_mode,
entraid_auth_flow,
entraid_client_id,
entraid_client_secret,
entraid_tenant_id,
entraid_identity_type,
entraid_scopes,
entraid_resource,
entraid_token_refresh_ratio,
entraid_retry_max_attempts,
entraid_retry_delay_ms,
):
"""Redis MCP Server - Model Context Protocol server for Redis."""
# Handle Redis URI if provided (and not empty)
# Note: gemini-cli passes the raw "${REDIS_URL}" string when the env var is not set
if url and url.strip() and url.strip() != "${REDIS_URL}":
try:
uri_config = parse_redis_uri(url)
set_redis_config_from_cli(uri_config)
except ValueError as e:
click.echo(f"Error parsing Redis URI: {e}", err=True)
sys.exit(1)
else:
# Set individual Redis parameters
config = {
"host": host,
"port": port,
"db": db,
"ssl": ssl,
"cluster_mode": cluster_mode,
}
if username:
config["username"] = username
if password:
config["password"] = password
if ssl_ca_path:
config["ssl_ca_path"] = ssl_ca_path
if ssl_keyfile:
config["ssl_keyfile"] = ssl_keyfile
if ssl_certfile:
config["ssl_certfile"] = ssl_certfile
if ssl_cert_reqs:
config["ssl_cert_reqs"] = ssl_cert_reqs
if ssl_ca_certs:
config["ssl_ca_certs"] = ssl_ca_certs
set_redis_config_from_cli(config)
# Handle Entra ID authentication configuration
entraid_config = {}
if entraid_auth_flow:
entraid_config["auth_flow"] = entraid_auth_flow
if entraid_client_id:
entraid_config["client_id"] = entraid_client_id
if entraid_client_secret:
entraid_config["client_secret"] = entraid_client_secret
if entraid_tenant_id:
entraid_config["tenant_id"] = entraid_tenant_id
if entraid_identity_type:
entraid_config["identity_type"] = entraid_identity_type
if entraid_scopes:
entraid_config["scopes"] = entraid_scopes
if entraid_resource:
entraid_config["resource"] = entraid_resource
if entraid_token_refresh_ratio is not None:
entraid_config["token_expiration_refresh_ratio"] = entraid_token_refresh_ratio
if entraid_retry_max_attempts is not None:
entraid_config["retry_max_attempts"] = entraid_retry_max_attempts
if entraid_retry_delay_ms is not None:
entraid_config["retry_delay_ms"] = entraid_retry_delay_ms
# For user-assigned managed identity, use client_id as user_assigned_identity_client_id
if (
entraid_auth_flow == "managed_identity"
and entraid_identity_type == "user_assigned"
and entraid_client_id
):
entraid_config["user_assigned_identity_client_id"] = entraid_client_id
if entraid_config:
set_entraid_config_from_cli(entraid_config)
# Start the server
server = RedisMCPServer()
server.run()
def main():
"""Legacy main function for backward compatibility."""
server = RedisMCPServer()
server.run()
if __name__ == "__main__":
main()
```
--------------------------------------------------------------------------------
/GEMINI.md:
--------------------------------------------------------------------------------
```markdown
# Redis MCP Server Extension
This extension provides a natural language interface for managing and searching data in Redis through the Model Context Protocol (MCP).
## What this extension provides
The Redis MCP Server enables AI agents to efficiently interact with Redis databases using natural language commands. You can:
- **Store and retrieve data**: Cache items, store session data, manage configuration values
- **Work with data structures**: Manage hashes, lists, sets, sorted sets, and streams
- **Search and filter**: Perform efficient data retrieval and searching operations
- **Pub/Sub messaging**: Publish and subscribe to real-time message channels
- **JSON operations**: Store, retrieve, and manipulate JSON documents
- **Vector search**: Manage vector indexes and perform similarity searches
## Available Tools
### String Operations
- Set, get, and manage string values with optional expiration
- Useful for caching, session data, and simple configuration
### Hash Operations
- Store field-value pairs within a single key
- Support for vector embeddings storage
- Ideal for user profiles, product information, and structured objects
### List Operations
- Append, pop, and manage list items
- Perfect for queues, message brokers, and activity logs
### Set Operations
- Add, remove, and list unique set members
- Perform set operations like intersection and union
- Great for tracking unique values and tags
### Sorted Set Operations
- Manage score-based ordered data
- Ideal for leaderboards, priority queues, and time-based analytics
### Pub/Sub Operations
- Publish messages to channels and subscribe to receive them
- Real-time notifications and chat applications
### Stream Operations
- Add, read, and delete from data streams
- Event sourcing, activity feeds, and sensor data logging
### JSON Operations
- Store, retrieve, and manipulate JSON documents
- Complex nested data structures with path-based access
### Vector Search
- Manage vector indexes and perform similarity searches
- AI/ML applications and semantic search
### Server Management
- Retrieve database information and statistics
- Monitor Redis server status and performance
## Usage Examples
You can interact with Redis using natural language:
- "Store this user session data with a 1-hour expiration"
- "Add this item to the shopping cart list"
- "Search for similar vectors in the product embeddings"
- "Publish a notification to the alerts channel"
- "Get the top 10 scores from the leaderboard"
- "Cache this API response for 5 minutes"
## Configuration
The extension connects to Redis using a Redis URL. The default configuration connects to `redis://127.0.0.1:6379/0`.
### Primary Configuration: Redis URL
Set the `REDIS_URL` environment variable to configure your Redis connection:
```bash
export REDIS_URL=redis://[username:password@]host:port/database
```
### Configuration Examples
**Local Redis (no authentication):**
```bash
export REDIS_URL=redis://127.0.0.1:6379/0
# or
export REDIS_URL=redis://localhost:6379/0
```
**Redis with password:**
```bash
export REDIS_URL=redis://:mypassword@localhost:6379/0
```
**Redis with username and password:**
```bash
export REDIS_URL=redis://myuser:mypassword@localhost:6379/0
```
**Redis Cloud:**
```bash
export REDIS_URL="redis://default:<password>@<host>:12345/0"
```
**Redis with SSL:**
```bash
export REDIS_URL="rediss://user:<password>@<host>:6380/0"
```
**Redis with SSL and certificates:**
```bash
export REDIS_URL="rediss://user:pass@host:6380/0?ssl_cert_reqs=required&ssl_ca_certs=/path/to/ca.pem"
```
**AWS ElastiCache:**
```bash
export REDIS_URL=redis://my-cluster.abc123.cache.amazonaws.com:6379/0
```
**Azure Cache for Redis:**
```bash
export REDIS_URL="rediss://mycache.redis.cache.windows.net:6380/0?ssl_cert_reqs=required"
```
### Backward Compatibility: Individual Environment Variables
If `REDIS_URL` is not set, the extension will fall back to individual environment variables:
- `REDIS_HOST` - Redis hostname (default: 127.0.0.1)
- `REDIS_PORT` - Redis port (default: 6379)
- `REDIS_DB` - Database number (default: 0)
- `REDIS_USERNAME` - Redis username (optional)
- `REDIS_PWD` - Redis password (optional)
- `REDIS_SSL` - Enable SSL: "true" or "false" (default: false)
- `REDIS_SSL_CA_PATH` - Path to CA certificate file
- `REDIS_SSL_KEYFILE` - Path to SSL key file
- `REDIS_SSL_CERTFILE` - Path to SSL certificate file
- `REDIS_SSL_CERT_REQS` - SSL certificate requirements (default: "required")
- `REDIS_SSL_CA_CERTS` - Path to CA certificates file
- `REDIS_CLUSTER_MODE` - Enable cluster mode: "true" or "false" (default: false)
**Example using individual variables:**
```bash
export REDIS_HOST=my-redis-server.com
export REDIS_PORT=6379
export REDIS_PWD=mypassword
export REDIS_SSL=true
```
### Configuration Priority
1. **`REDIS_URL`** (highest priority) - If set, this will be used exclusively
2. **Individual environment variables** - Used as fallback when `REDIS_URL` is not set
3. **Built-in defaults** - Used when no configuration is provided
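The priority order above can be sketched in a few lines of Python. This is an illustration only, not the extension's actual code: `resolve_redis_url` and `DEFAULT_URL` are hypothetical names, only a subset of the individual variables is handled, and `REDIS_SSL` is assumed to be enabled by the literal value "true".

```python
import os
from typing import Mapping, Optional
from urllib.parse import quote

# Built-in default, as documented above.
DEFAULT_URL = "redis://127.0.0.1:6379/0"

# Subset of the individual variables, enough to illustrate the fallback.
INDIVIDUAL_VARS = (
    "REDIS_HOST", "REDIS_PORT", "REDIS_DB",
    "REDIS_USERNAME", "REDIS_PWD", "REDIS_SSL",
)


def resolve_redis_url(env: Optional[Mapping[str, str]] = None) -> str:
    """Hypothetical helper: apply the documented priority order."""
    env = os.environ if env is None else env

    # 1. REDIS_URL wins outright when it is set.
    url = env.get("REDIS_URL")
    if url:
        return url

    # 2. Otherwise fall back to the individual variables, if any are set.
    if any(name in env for name in INDIVIDUAL_VARS):
        scheme = "rediss" if env.get("REDIS_SSL", "false").lower() == "true" else "redis"
        host = env.get("REDIS_HOST", "127.0.0.1")
        port = env.get("REDIS_PORT", "6379")
        db = env.get("REDIS_DB", "0")
        user = env.get("REDIS_USERNAME", "")
        pwd = env.get("REDIS_PWD", "")
        # Percent-encode credentials so characters like "@" or "/" stay valid.
        auth = f"{quote(user, safe='')}:{quote(pwd, safe='')}@" if (user or pwd) else ""
        return f"{scheme}://{auth}{host}:{port}/{db}"

    # 3. Nothing configured: use the built-in default.
    return DEFAULT_URL
```

Passing a plain dictionary instead of `os.environ` makes the three cases easy to try out without touching the real environment.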
### Configuration Methods
1. **Environment Variables**: Set variables in your shell or system
2. **`.env` File**: Create a `.env` file in your project directory
3. **System Environment**: Set variables at the system level
4. **Shell Profile**: Add exports to your `.bashrc`, `.zshrc`, etc.
### No Configuration Required
If you don't set any configuration, the extension will automatically connect to a local Redis instance at `redis://127.0.0.1:6379/0`.
### Advanced SSL Configuration
For production environments with custom SSL certificates, you can use query parameters in the Redis URL:
```bash
export REDIS_URL="rediss://user:pass@host:6380/0?ssl_cert_reqs=required&ssl_ca_path=/path/to/ca.pem&ssl_keyfile=/path/to/key.pem&ssl_certfile=/path/to/cert.pem"
```
Supported SSL query parameters:
- `ssl_cert_reqs` - Certificate requirements: "required", "optional", "none"
- `ssl_ca_certs` - Path to CA certificates file
- `ssl_ca_path` - Path to CA certificate file
- `ssl_keyfile` - Path to SSL private key file
- `ssl_certfile` - Path to SSL certificate file
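To see what these query parameters look like once parsed, here is a minimal sketch using only the Python standard library. `extract_ssl_options` and `SSL_PARAMS` are hypothetical names for illustration; the sketch assumes the `rediss` scheme implies SSL and that only the first value of a repeated parameter matters.

```python
from urllib.parse import parse_qs, urlparse

# The SSL query parameters listed above.
SSL_PARAMS = (
    "ssl_cert_reqs",
    "ssl_ca_certs",
    "ssl_ca_path",
    "ssl_keyfile",
    "ssl_certfile",
)


def extract_ssl_options(url: str) -> dict:
    """Hypothetical helper: pull SSL settings out of a Redis URL."""
    parsed = urlparse(url)
    query = parse_qs(parsed.query)

    # The scheme decides whether SSL is on at all.
    options = {"ssl": parsed.scheme == "rediss"}

    # Copy over only the recognised SSL parameters (first value wins).
    for name in SSL_PARAMS:
        if name in query:
            options[name] = query[name][0]
    return options
```

When exporting such a URL from a shell, keep it quoted: an unquoted `&` would otherwise be interpreted by the shell rather than passed through as part of the URL.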
For detailed configuration options and Redis URL format, see the main Redis MCP Server documentation.
```
--------------------------------------------------------------------------------
/.github/workflows/ci.yml:
--------------------------------------------------------------------------------
```yaml
name: CI

on:
  push:
    branches:
      - 'main'
      - '[0-9].[0-9]'
  pull_request:
    branches:
      - 'main'
      - '[0-9].[0-9]'
  schedule:
    - cron: '0 1 * * *' # nightly build

permissions:
  contents: read

jobs:
  lint-and-format:
    runs-on: ubuntu-latest
    steps:
      - name: ⚙️ Harden Runner
        uses: step-security/harden-runner@v2
        with:
          egress-policy: audit

      - name: ⚙️ Checkout the project
        uses: actions/checkout@v5

      - name: ⚙️ Install uv
        uses: astral-sh/setup-uv@v7
        with:
          version: "latest"
          enable-cache: false

      - name: ⚙️ Set Python up and add dependencies
        run: |
          uv python install 3.12
          uv sync --all-extras --dev
          uv add --dev ruff mypy

      - name: ⚙️ Run linters and formatters
        run: |
          uv run ruff check src/ tests/
          uv run ruff format --check src/ tests/
          # uv run mypy src/ --ignore-missing-imports

  security-scan:
    runs-on: ubuntu-latest
    steps:
      - name: ⚙️ Harden Runner
        uses: step-security/harden-runner@v2
        with:
          egress-policy: audit

      - name: ⚙️ Checkout the project
        uses: actions/checkout@v5

      - name: ⚙️ Install uv
        uses: astral-sh/setup-uv@v7
        with:
          version: "latest"
          enable-cache: false

      - name: ⚙️ Set Python up and add dependencies
        run: |
          uv python install 3.12
          uv sync --all-extras --dev
          uv add --dev bandit

      - name: ⚙️ Run security scan with bandit
        run: |
          uv run bandit -r src/ -f json -o bandit-report.json || true
          uv run bandit -r src/

      - name: ⚙️ Upload security reports
        uses: actions/upload-artifact@v5
        if: always()
        with:
          name: security-reports
          path: |
            bandit-report.json
          retention-days: 30

  test-ubuntu:
    runs-on: ubuntu-latest
    strategy:
      fail-fast: false
      matrix:
        python-version: ["3.10", "3.11", "3.12", "3.13", "3.14"]

    services:
      redis:
        image: redis:latest
        ports:
          - 6379:6379
        options: >-
          --health-cmd "redis-cli ping"
          --health-interval 10s
          --health-timeout 5s
          --health-retries 5

    steps:
      - name: ⚙️ Harden Runner
        uses: step-security/harden-runner@v2
        with:
          egress-policy: audit

      - name: ⚙️ Checkout the project
        uses: actions/checkout@v5

      - name: ⚙️ Install uv
        uses: astral-sh/setup-uv@v7
        with:
          version: "latest"
          enable-cache: false

      - name: ⚙️ Set Python ${{ matrix.python-version }} up and add dependencies
        run: |
          uv python install ${{ matrix.python-version }}
          uv sync --all-extras --dev
          uv add --dev pytest pytest-cov pytest-asyncio coverage

      - name: ⚙️ Run tests with coverage
        run: |
          uv run pytest tests/ -v --cov=src --cov-report=xml --cov-report=html --cov-report=term
        env:
          REDIS_HOST: localhost
          REDIS_PORT: 6379

      - name: ⚙️ Test MCP server startup
        run: |
          timeout 10s uv run python src/main.py || test $? = 124
        env:
          REDIS_HOST: localhost
          REDIS_PORT: 6379

      - name: ⚙️ Upload coverage reports
        uses: codecov/codecov-action@v5
        if: matrix.python-version == '3.12'
        with:
          files: ./coverage.xml
          flags: unittests
          name: codecov-umbrella

  test-other-os:
    runs-on: ${{ matrix.os }}
    strategy:
      fail-fast: false
      matrix:
        os: [windows-latest, macos-latest]
        python-version: ["3.10", "3.11", "3.12", "3.13", "3.14"]

    steps:
      - name: ⚙️ Checkout the project
        uses: actions/checkout@v5

      - name: ⚙️ Install uv
        uses: astral-sh/setup-uv@v7
        with:
          version: "latest"
          enable-cache: false

      - name: ⚙️ Set Python ${{ matrix.python-version }} up and add dependencies
        run: |
          uv python install ${{ matrix.python-version }}
          uv sync --all-extras --dev
          uv add --dev pytest pytest-cov pytest-asyncio coverage

      - name: ⚙️ Run tests (without Redis services)
        run: |
          uv run pytest tests/ -v
        env:
          REDIS_HOST: localhost
          REDIS_PORT: 6379

      - name: ⚙️ Test MCP server startup (macOS)
        run: |
          brew install coreutils
          gtimeout 10s uv run python src/main.py || test $? = 124
        env:
          REDIS_HOST: localhost
          REDIS_PORT: 6379
        if: matrix.os == 'macos-latest'

      - name: ⚙️ Test MCP server startup (Windows)
        run: |
          Start-Process -FilePath "uv" -ArgumentList "run", "python", "src/main.py" -PassThru | Wait-Process -Timeout 10 -ErrorAction SilentlyContinue
        env:
          REDIS_HOST: localhost
          REDIS_PORT: 6379
        if: matrix.os == 'windows-latest'

  build-test:
    runs-on: ubuntu-latest
    needs: [lint-and-format, security-scan, test-ubuntu, test-other-os]
    steps:
      - name: ⚙️ Harden Runner
        uses: step-security/harden-runner@v2
        with:
          egress-policy: audit

      - name: ⚙️ Checkout the project
        uses: actions/checkout@v5
        with:
          fetch-depth: 0 # Full history for UV build

      - name: ⚙️ Install uv
        uses: astral-sh/setup-uv@v7
        with:
          version: "latest"
          enable-cache: false

      - name: ⚙️ Set up Python
        run: uv python install 3.12

      - name: ⚙️ Build package
        run: |
          uv build --sdist --wheel

      - name: ⚙️ Check package
        run: |
          uv add --dev twine
          uv run twine check dist/*

      - name: ⚙️ Test package installation
        run: |
          uv venv test-env
          source test-env/bin/activate
          pip install dist/*.whl
          redis-mcp-server --help

      - name: ⚙️ Upload build artifacts
        uses: actions/upload-artifact@v5
        with:
          name: dist-files
          path: dist/
          retention-days: 7

  docker-test:
    runs-on: ubuntu-latest
    needs: [lint-and-format, security-scan]
    steps:
      - name: ⚙️ Harden Runner
        uses: step-security/harden-runner@v2
        with:
          egress-policy: audit

      - name: ⚙️ Checkout the project
        uses: actions/checkout@v5

      - name: ⚙️ Build Docker image
        run: docker build -t redis-mcp-server:test .

      - name: ⚙️ Test Docker image
        run: |
          docker run --rm redis-mcp-server:test uv run python -c "import src.main; print('Docker build successful')"
```
--------------------------------------------------------------------------------
/src/tools/misc.py:
--------------------------------------------------------------------------------
```python
from typing import Any, Dict, Union, List

from redis.exceptions import RedisError

from src.common.connection import RedisConnectionManager
from src.common.server import mcp


@mcp.tool()
async def delete(key: str) -> str:
    """Delete a Redis key.

    Args:
        key (str): The key to delete.

    Returns:
        str: Confirmation message or an error message.
    """
    try:
        r = RedisConnectionManager.get_connection()
        result = r.delete(key)
        return f"Successfully deleted {key}" if result else f"Key {key} not found"
    except RedisError as e:
        return f"Error deleting key {key}: {str(e)}"


@mcp.tool()
async def type(key: str) -> Dict[str, Any]:
    """Return the type and TTL of the value stored at key.

    Args:
        key (str): The key to check.

    Returns:
        Dict[str, Any]: The key name, the type of the stored value (none when
        the key doesn't exist), and the key's TTL; or {"error": "..."} on failure.
    """
    try:
        r = RedisConnectionManager.get_connection()
        key_type = r.type(key)
        info = {"key": key, "type": key_type, "ttl": r.ttl(key)}
        return info
    except RedisError as e:
        return {"error": str(e)}


@mcp.tool()
async def expire(name: str, expire_seconds: int) -> str:
    """Set an expiration time for a Redis key.

    Args:
        name: The Redis key.
        expire_seconds: Time in seconds after which the key should expire.

    Returns:
        A success message or an error message.
    """
    try:
        r = RedisConnectionManager.get_connection()
        success = r.expire(name, expire_seconds)
        return (
            f"Expiration set to {expire_seconds} seconds for '{name}'."
            if success
            else f"Key '{name}' does not exist."
        )
    except RedisError as e:
        return f"Error setting expiration for key '{name}': {str(e)}"


@mcp.tool()
async def rename(old_key: str, new_key: str) -> Dict[str, Any]:
    """
    Renames a Redis key from old_key to new_key.

    Args:
        old_key (str): The current name of the Redis key to rename.
        new_key (str): The new name to assign to the key.

    Returns:
        Dict[str, Any]: A dictionary containing the result of the operation.
            On success: {"status": "success", "message": "..."}
            On error: {"error": "..."}
    """
    try:
        r = RedisConnectionManager.get_connection()

        # Check if the old key exists
        if not r.exists(old_key):
            return {"error": f"Key '{old_key}' does not exist."}

        # Rename the key
        r.rename(old_key, new_key)
        return {
            "status": "success",
            "message": f"Renamed key '{old_key}' to '{new_key}'",
        }
    except RedisError as e:
        return {"error": str(e)}


@mcp.tool()
async def scan_keys(
    pattern: str = "*", count: int = 100, cursor: int = 0
) -> Union[str, Dict[str, Any]]:
    """
    Scan keys in the Redis database using the SCAN command (non-blocking, production-safe).

    ⚠️ IMPORTANT: This returns PARTIAL results from one iteration. Use scan_all_keys()
    to get ALL matching keys, or call this function multiple times with the returned cursor
    until cursor becomes 0.

    The SCAN command iterates through the keyspace in small chunks, making it safe to use
    on large databases without blocking other operations.

    Args:
        pattern: Pattern to match keys against (default is "*" for all keys).
            Common patterns: "user:*", "cache:*", "*:123", etc.
        count: Hint for the number of keys to return per iteration (default 100).
            Redis may return more or fewer keys than this hint.
        cursor: The cursor position to start scanning from (0 to start from beginning).
            To continue scanning, use the cursor value returned from previous call.

    Returns:
        A dictionary containing:
        - 'cursor': Next cursor position (0 means scan is complete)
        - 'keys': List of keys found in this iteration (PARTIAL RESULTS)
        - 'total_scanned': Number of keys returned in this batch
        - 'scan_complete': Boolean indicating if scan is finished
        Or an error message if something goes wrong.

    Example usage:
        First call: scan_keys("user:*") -> returns cursor=1234, keys=[...], scan_complete=False
        Next call: scan_keys("user:*", cursor=1234) -> continues from where it left off
        Final call: returns cursor=0, scan_complete=True when done
    """
    try:
        r = RedisConnectionManager.get_connection()
        cursor, keys = r.scan(cursor=cursor, match=pattern, count=count)

        # Convert bytes to strings if needed
        decoded_keys = [
            key.decode("utf-8") if isinstance(key, bytes) else key for key in keys
        ]

        return {
            "cursor": cursor,
            "keys": decoded_keys,
            "total_scanned": len(decoded_keys),
            "scan_complete": cursor == 0,
        }
    except RedisError as e:
        return f"Error scanning keys with pattern '{pattern}': {str(e)}"


@mcp.tool()
async def scan_all_keys(
    pattern: str = "*", batch_size: int = 100
) -> Union[str, List[str]]:
    """
    Scan and return ALL keys matching a pattern using multiple SCAN iterations.

    This function automatically handles the SCAN cursor iteration to collect all matching keys.
    It's safer than KEYS * for large databases but will still collect all results in memory.

    ⚠️ WARNING: With very large datasets (millions of keys), this may consume significant memory.
    For large-scale operations, consider using scan_keys() with manual iteration instead.

    Args:
        pattern: Pattern to match keys against (default is "*" for all keys).
        batch_size: Number of keys to scan per iteration (default 100).

    Returns:
        A list of all keys matching the pattern or an error message.
    """
    try:
        r = RedisConnectionManager.get_connection()
        all_keys = []
        cursor = 0

        while True:
            cursor, keys = r.scan(cursor=cursor, match=pattern, count=batch_size)

            # Convert bytes to strings if needed and add to results
            decoded_keys = [
                key.decode("utf-8") if isinstance(key, bytes) else key for key in keys
            ]
            all_keys.extend(decoded_keys)

            # Break when scan is complete (cursor returns to 0)
            if cursor == 0:
                break

        return all_keys
    except RedisError as e:
        return f"Error scanning all keys with pattern '{pattern}': {str(e)}"
```
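The cursor protocol described in the `scan_keys` docstring can be sketched as a small loop. The example below is a sketch under stated assumptions: `fake_scan_keys`, `KEYS`, and `iter_keys` are hypothetical stand-ins so it runs without a Redis server, and the fake cursor is simply a list offset (real SCAN cursors are opaque values).

```python
import fnmatch
from typing import Any, Callable, Dict, Iterator

# Hypothetical in-memory keyspace standing in for a Redis database.
KEYS = [f"user:{i}" for i in range(7)] + ["cache:1", "cache:2"]


def fake_scan_keys(
    pattern: str = "*", count: int = 100, cursor: int = 0
) -> Dict[str, Any]:
    """Stand-in for scan_keys: returns one page in the same result shape."""
    window = KEYS[cursor : cursor + count]
    next_cursor = cursor + count
    if next_cursor >= len(KEYS):
        next_cursor = 0  # cursor 0 signals that the scan is complete
    keys = [k for k in window if fnmatch.fnmatch(k, pattern)]
    return {
        "cursor": next_cursor,
        "keys": keys,
        "total_scanned": len(keys),
        "scan_complete": next_cursor == 0,
    }


def iter_keys(
    scan_page: Callable[..., Any], pattern: str = "*", count: int = 100
) -> Iterator[str]:
    """Yield keys page by page until the returned cursor is 0."""
    cursor = 0
    while True:
        page = scan_page(pattern=pattern, count=count, cursor=cursor)
        if isinstance(page, str):
            # The tool reports failures as a plain error string.
            raise RuntimeError(page)
        yield from page["keys"]
        cursor = page["cursor"]
        if cursor == 0:
            break


user_keys = list(iter_keys(fake_scan_keys, "user:*", count=3))
cache_keys = list(iter_keys(fake_scan_keys, "cache:*", count=4))
```

A page may legitimately contain no keys while the cursor is still non-zero, so the loop stops only on the cursor and never on an empty page. Yielding keys as they arrive also avoids holding the whole result in memory, which is the trade-off the `scan_all_keys` warning points at.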
--------------------------------------------------------------------------------
/tests/tools/test_string.py:
--------------------------------------------------------------------------------
```python
"""
Unit tests for src/tools/string.py
"""

from unittest.mock import Mock, patch

import pytest
from redis.exceptions import ConnectionError, RedisError, TimeoutError

from src.tools.string import get, set


class TestStringOperations:
    """Test cases for Redis string operations."""

    @pytest.mark.asyncio
    async def test_set_success(self, mock_redis_connection_manager):
        """Test successful string set operation."""
        mock_redis = mock_redis_connection_manager
        mock_redis.set.return_value = True

        result = await set("test_key", "test_value")

        mock_redis.set.assert_called_once_with("test_key", b"test_value")
        assert "Successfully set test_key" in result

    @pytest.mark.asyncio
    async def test_set_with_expiration(self, mock_redis_connection_manager):
        """Test string set operation with expiration."""
        mock_redis = mock_redis_connection_manager
        mock_redis.setex.return_value = True

        result = await set("test_key", "test_value", 60)

        mock_redis.setex.assert_called_once_with("test_key", 60, b"test_value")
        assert "Successfully set test_key" in result
        assert "with expiration 60 seconds" in result

    @pytest.mark.asyncio
    async def test_set_redis_error(self, mock_redis_connection_manager):
        """Test string set operation with Redis error."""
        mock_redis = mock_redis_connection_manager
        mock_redis.set.side_effect = RedisError("Connection failed")

        result = await set("test_key", "test_value")

        assert "Error setting key test_key: Connection failed" in result

    @pytest.mark.asyncio
    async def test_set_connection_error(self, mock_redis_connection_manager):
        """Test string set operation with connection error."""
        mock_redis = mock_redis_connection_manager
        mock_redis.set.side_effect = ConnectionError("Redis server unavailable")

        result = await set("test_key", "test_value")

        assert "Error setting key test_key: Redis server unavailable" in result

    @pytest.mark.asyncio
    async def test_set_timeout_error(self, mock_redis_connection_manager):
        """Test string set operation with timeout error."""
        mock_redis = mock_redis_connection_manager
        mock_redis.setex.side_effect = TimeoutError("Operation timed out")

        result = await set("test_key", "test_value", 30)

        assert "Error setting key test_key: Operation timed out" in result

    @pytest.mark.asyncio
    async def test_get_success(self, mock_redis_connection_manager):
        """Test successful string get operation."""
        mock_redis = mock_redis_connection_manager
        mock_redis.get.return_value = "test_value"

        result = await get("test_key")

        mock_redis.get.assert_called_once_with("test_key")
        assert result == "test_value"

    @pytest.mark.asyncio
    async def test_get_key_not_found(self, mock_redis_connection_manager):
        """Test string get operation when key doesn't exist."""
        mock_redis = mock_redis_connection_manager
        mock_redis.get.return_value = None

        result = await get("nonexistent_key")

        mock_redis.get.assert_called_once_with("nonexistent_key")
        assert "Key nonexistent_key does not exist" in result

    @pytest.mark.asyncio
    async def test_get_redis_error(self, mock_redis_connection_manager):
        """Test string get operation with Redis error."""
        mock_redis = mock_redis_connection_manager
        mock_redis.get.side_effect = RedisError("Connection failed")

        result = await get("test_key")

        assert "Error retrieving key test_key: Connection failed" in result

    @pytest.mark.asyncio
    async def test_get_empty_string_value(self, mock_redis_connection_manager):
        """Test string get operation returning empty string."""
        mock_redis = mock_redis_connection_manager
        mock_redis.get.return_value = b""  # Redis returns bytes

        result = await get("test_key")

        # The implementation correctly handles empty bytes and returns empty string
        assert result == ""

    @pytest.mark.asyncio
    async def test_set_with_zero_expiration(self, mock_redis_connection_manager):
        """Test string set operation with zero expiration."""
        mock_redis = mock_redis_connection_manager
        mock_redis.set.return_value = True

        result = await set("test_key", "test_value", 0)

        # Should use regular set, not setex for zero expiration
        mock_redis.set.assert_called_once_with("test_key", b"test_value")
        assert "Successfully set test_key" in result

    @pytest.mark.asyncio
    async def test_set_with_negative_expiration(self, mock_redis_connection_manager):
        """Test string set operation with negative expiration."""
        mock_redis = mock_redis_connection_manager
        mock_redis.setex.return_value = True

        result = await set("test_key", "test_value", -1)

        # Negative expiration is truthy in Python, so setex is called
        mock_redis.setex.assert_called_once_with("test_key", -1, b"test_value")
        assert "Successfully set test_key" in result
        assert "with expiration -1 seconds" in result

    @pytest.mark.asyncio
    async def test_set_with_large_expiration(self, mock_redis_connection_manager):
        """Test string set operation with large expiration value."""
        mock_redis = mock_redis_connection_manager
        mock_redis.setex.return_value = True

        result = await set("test_key", "test_value", 86400)  # 24 hours

        mock_redis.setex.assert_called_once_with("test_key", 86400, b"test_value")
        assert "with expiration 86400 seconds" in result

    @pytest.mark.asyncio
    async def test_get_with_special_characters(self, mock_redis_connection_manager):
        """Test string get operation with special characters in key."""
        mock_redis = mock_redis_connection_manager
        mock_redis.get.return_value = "special_value"
        special_key = "test:key:with:colons"

        result = await get(special_key)

        mock_redis.get.assert_called_once_with(special_key)
        assert result == "special_value"

    @pytest.mark.asyncio
    async def test_set_with_unicode_value(self, mock_redis_connection_manager):
        """Test string set operation with unicode value."""
        mock_redis = mock_redis_connection_manager
        mock_redis.set.return_value = True
        unicode_value = "测试值 🚀"

        result = await set("test_key", unicode_value)

        mock_redis.set.assert_called_once_with(
            "test_key", unicode_value.encode("utf-8")
        )
        assert "Successfully set test_key" in result

    @pytest.mark.asyncio
    async def test_connection_manager_called_correctly(self):
        """Test that RedisConnectionManager.get_connection is called correctly."""
        with patch(
            "src.tools.string.RedisConnectionManager.get_connection"
        ) as mock_get_conn:
            mock_redis = Mock()
            mock_redis.set.return_value = True
            mock_get_conn.return_value = mock_redis

            await set("test_key", "test_value")

            mock_get_conn.assert_called_once()
            # Verify the actual call was made with bytes
            mock_redis.set.assert_called_once_with("test_key", b"test_value")

    @pytest.mark.asyncio
    async def test_function_signatures(self):
        """Test that functions have correct signatures."""
        import inspect

        # Test set function signature
        set_sig = inspect.signature(set)
        set_params = list(set_sig.parameters.keys())
        assert set_params == ["key", "value", "expiration"]
        assert set_sig.parameters["expiration"].default is None

        # Test get function signature
        get_sig = inspect.signature(get)
        get_params = list(get_sig.parameters.keys())
        assert get_params == ["key"]
```
--------------------------------------------------------------------------------
/src/common/config.py:
--------------------------------------------------------------------------------
```python
import os
import urllib.parse

from dotenv import load_dotenv

load_dotenv()

# Default values for Entra ID authentication
DEFAULT_TOKEN_EXPIRATION_REFRESH_RATIO = 0.9
DEFAULT_LOWER_REFRESH_BOUND_MILLIS = 30000  # 30 seconds
DEFAULT_TOKEN_REQUEST_EXECUTION_TIMEOUT_MS = 10000  # 10 seconds
DEFAULT_RETRY_MAX_ATTEMPTS = 3
DEFAULT_RETRY_DELAY_MS = 100

REDIS_CFG = {
    "host": os.getenv("REDIS_HOST", "127.0.0.1"),
    "port": int(os.getenv("REDIS_PORT", 6379)),
    "username": os.getenv("REDIS_USERNAME", None),
    "password": os.getenv("REDIS_PWD", ""),
    "ssl": os.getenv("REDIS_SSL", False) in ("true", "1", "t"),
    "ssl_ca_path": os.getenv("REDIS_SSL_CA_PATH", None),
    "ssl_keyfile": os.getenv("REDIS_SSL_KEYFILE", None),
    "ssl_certfile": os.getenv("REDIS_SSL_CERTFILE", None),
    "ssl_cert_reqs": os.getenv("REDIS_SSL_CERT_REQS", "required"),
    "ssl_ca_certs": os.getenv("REDIS_SSL_CA_CERTS", None),
    "cluster_mode": os.getenv("REDIS_CLUSTER_MODE", False) in ("true", "1", "t"),
    "db": int(os.getenv("REDIS_DB", 0)),
}

# Entra ID Authentication Configuration
ENTRAID_CFG = {
    # Authentication flow selection
    "auth_flow": os.getenv(
        "REDIS_ENTRAID_AUTH_FLOW", None
    ),  # service_principal, managed_identity, default_credential
    # Service Principal Authentication
    "client_id": os.getenv("REDIS_ENTRAID_CLIENT_ID", None),
    "client_secret": os.getenv("REDIS_ENTRAID_CLIENT_SECRET", None),
    "tenant_id": os.getenv("REDIS_ENTRAID_TENANT_ID", None),
    # Managed Identity Authentication
    "identity_type": os.getenv(
        "REDIS_ENTRAID_IDENTITY_TYPE", "system_assigned"
    ),  # system_assigned, user_assigned
    "user_assigned_identity_client_id": os.getenv(
        "REDIS_ENTRAID_USER_ASSIGNED_CLIENT_ID", None
    ),
    # Default Azure Credential Authentication
    "scopes": os.getenv("REDIS_ENTRAID_SCOPES", "https://redis.azure.com/.default"),
    # Token lifecycle configuration
    "token_expiration_refresh_ratio": float(
        os.getenv(
            "REDIS_ENTRAID_TOKEN_EXPIRATION_REFRESH_RATIO",
            DEFAULT_TOKEN_EXPIRATION_REFRESH_RATIO,
        )
    ),
    "lower_refresh_bound_millis": int(
        os.getenv(
            "REDIS_ENTRAID_LOWER_REFRESH_BOUND_MILLIS",
            DEFAULT_LOWER_REFRESH_BOUND_MILLIS,
        )
    ),
    "token_request_execution_timeout_ms": int(
        os.getenv(
            "REDIS_ENTRAID_TOKEN_REQUEST_EXECUTION_TIMEOUT_MS",
            DEFAULT_TOKEN_REQUEST_EXECUTION_TIMEOUT_MS,
        )
    ),
    # Retry configuration
    "retry_max_attempts": int(
        os.getenv("REDIS_ENTRAID_RETRY_MAX_ATTEMPTS", DEFAULT_RETRY_MAX_ATTEMPTS)
    ),
    "retry_delay_ms": int(
        os.getenv("REDIS_ENTRAID_RETRY_DELAY_MS", DEFAULT_RETRY_DELAY_MS)
    ),
    # Resource configuration
    "resource": os.getenv("REDIS_ENTRAID_RESOURCE", "https://redis.azure.com/"),
}


def parse_redis_uri(uri: str) -> dict:
    """Parse a Redis URI and return connection parameters."""
    parsed = urllib.parse.urlparse(uri)

    config = {}

    # Scheme determines SSL
    if parsed.scheme == "rediss":
        config["ssl"] = True
    elif parsed.scheme == "redis":
        config["ssl"] = False
    else:
        raise ValueError(f"Unsupported scheme: {parsed.scheme}")

    # Host and port
    config["host"] = parsed.hostname or "127.0.0.1"
    config["port"] = parsed.port or 6379

    # Database
    if parsed.path and parsed.path != "/":
        try:
            config["db"] = int(parsed.path.lstrip("/"))
        except ValueError:
            config["db"] = 0
    else:
        config["db"] = 0

    # Authentication
    if parsed.username:
        config["username"] = parsed.username
    if parsed.password:
        config["password"] = parsed.password

    # Parse query parameters for SSL and other options
    if parsed.query:
        query_params = urllib.parse.parse_qs(parsed.query)

        # Handle SSL parameters
        if "ssl_cert_reqs" in query_params:
            config["ssl_cert_reqs"] = query_params["ssl_cert_reqs"][0]
        if "ssl_ca_certs" in query_params:
            config["ssl_ca_certs"] = query_params["ssl_ca_certs"][0]
        if "ssl_ca_path" in query_params:
            config["ssl_ca_path"] = query_params["ssl_ca_path"][0]
        if "ssl_keyfile" in query_params:
            config["ssl_keyfile"] = query_params["ssl_keyfile"][0]
        if "ssl_certfile" in query_params:
            config["ssl_certfile"] = query_params["ssl_certfile"][0]

        # Handle other parameters. According to https://www.iana.org/assignments/uri-schemes/prov/redis,
        # The database number to use for the Redis SELECT command comes from
        # either the "db-number" portion of the URI (described in the previous
        # section) or the value from the key-value pair from the "query" URI
        # field with the key "db". If neither of these are present, the
        # default database number is 0.
        if "db" in query_params:
            try:
                config["db"] = int(query_params["db"][0])
            except ValueError:
                pass

    return config


def set_redis_config_from_cli(config: dict):
    for key, value in config.items():
        if key in ["port", "db"]:
            # Keep port and db as integers
            REDIS_CFG[key] = int(value)
        elif key == "ssl" or key == "cluster_mode":
            # Keep ssl and cluster_mode as booleans
            REDIS_CFG[key] = bool(value)
        elif isinstance(value, bool):
            # Convert other booleans to strings for environment compatibility
            REDIS_CFG[key] = "true" if value else "false"
        else:
            # Convert other values to strings
            REDIS_CFG[key] = str(value) if value is not None else None


def set_entraid_config_from_cli(config: dict):
    """Update Entra ID configuration from CLI parameters."""
    for key, value in config.items():
        if value is not None:
            if key in ["token_expiration_refresh_ratio"]:
                # Keep float values as floats
                ENTRAID_CFG[key] = float(value)
            elif key in [
                "lower_refresh_bound_millis",
                "token_request_execution_timeout_ms",
                "retry_max_attempts",
                "retry_delay_ms",
            ]:
                # Keep integer values as integers
                ENTRAID_CFG[key] = int(value)
            else:
                # Convert other values to strings
                ENTRAID_CFG[key] = str(value)


def is_entraid_auth_enabled() -> bool:
    """Check if Entra ID authentication is enabled."""
    return ENTRAID_CFG["auth_flow"] is not None


def get_entraid_auth_flow() -> str:
    """Get the configured Entra ID authentication flow."""
    return ENTRAID_CFG["auth_flow"]


def validate_entraid_config() -> tuple[bool, str]:
    """Validate Entra ID configuration based on the selected auth flow.

    Returns:
        tuple: (is_valid, error_message)
    """
    auth_flow = ENTRAID_CFG["auth_flow"]

    if not auth_flow:
        return True, ""  # No Entra ID auth configured, which is valid

    if auth_flow == "service_principal":
        required_fields = ["client_id", "client_secret", "tenant_id"]
        missing_fields = [field for field in required_fields if not ENTRAID_CFG[field]]
        if missing_fields:
            return (
                False,
                f"Service principal authentication requires: {', '.join(missing_fields)}",
            )

    elif auth_flow == "managed_identity":
        identity_type = ENTRAID_CFG["identity_type"]
        if (
            identity_type == "user_assigned"
            and not ENTRAID_CFG["user_assigned_identity_client_id"]
        ):
            return (
                False,
                "User-assigned managed identity requires user_assigned_identity_client_id",
            )

    elif auth_flow == "default_credential":
        # Default credential doesn't require specific configuration
        pass

    else:
        return (
            False,
            f"Invalid auth_flow: {auth_flow}. Must be one of: service_principal, managed_identity, default_credential",
        )

    return True, ""
```
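The database-number rule quoted in the `parse_redis_uri` comment (path segment first, then a `db` query parameter, else 0) can be isolated into a short sketch. `resolve_db` is a hypothetical helper written for illustration; it mirrors the order of checks in the file above, where a valid `db` query value is applied after the path and therefore overrides it.

```python
from urllib.parse import parse_qs, urlparse


def resolve_db(uri: str) -> int:
    """Hypothetical helper: pick the database number from a Redis URI."""
    parsed = urlparse(uri)
    db = 0  # default when neither source provides a usable number

    # 1. The "db-number" portion of the path, e.g. redis://host:6379/2
    path = parsed.path.lstrip("/")
    if path:
        try:
            db = int(path)
        except ValueError:
            db = 0

    # 2. A "db" query parameter, applied last so it overrides the path.
    query = parse_qs(parsed.query)
    if "db" in query:
        try:
            db = int(query["db"][0])
        except ValueError:
            pass  # keep whatever the path provided

    return db
```

For instance, `resolve_db("redis://localhost:6379/2?db=5")` yields 5, while a non-numeric `db` value leaves the path's number in place.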
--------------------------------------------------------------------------------
/tests/test_main.py:
--------------------------------------------------------------------------------
```python
"""
Unit tests for src/main.py
"""

import logging
from unittest.mock import Mock, patch

import pytest
from click.testing import CliRunner

from src.main import RedisMCPServer, cli


class TestRedisMCPServer:
    """Test cases for RedisMCPServer class."""

    def test_init_logs_startup_message(self, capsys, caplog, monkeypatch):
        """Startup should emit an INFO log; client may route it via handlers.
        Accept either stderr output or log record text.
        """
        monkeypatch.setenv("MCP_REDIS_LOG_LEVEL", "INFO")
        with caplog.at_level(logging.INFO):
            server = RedisMCPServer()
            assert server is not None

        captured = capsys.readouterr()
        stderr_text = captured.err or ""
        log_text = caplog.text or ""  # collected by pytest logging handler
        combined = stderr_text + "\n" + log_text
        assert "Starting the Redis MCP Server" in combined

    @patch("src.main.mcp.run")
    def test_run_calls_mcp_run(self, mock_mcp_run):
        """Test that RedisMCPServer.run() calls mcp.run()."""
        server = RedisMCPServer()
        server.run()

        mock_mcp_run.assert_called_once()

    @patch("src.main.mcp.run")
    def test_run_propagates_exceptions(self, mock_mcp_run):
        """Test that exceptions from mcp.run() are propagated."""
        mock_mcp_run.side_effect = Exception("MCP run failed")
        server = RedisMCPServer()

        with pytest.raises(Exception, match="MCP run failed"):
            server.run()


class TestCLI:
    """Test cases for CLI interface."""

    def setup_method(self):
        """Set up test fixtures."""
        self.runner = CliRunner()

    @patch("src.main.parse_redis_uri")
    @patch("src.main.set_redis_config_from_cli")
    @patch("src.main.RedisMCPServer")
    def test_cli_with_url_parameter(
        self, mock_server_class, mock_set_config, mock_parse_uri
    ):
        """Test CLI with --url parameter."""
        mock_parse_uri.return_value = {"host": "localhost", "port": 6379}
        mock_server = Mock()
        mock_server_class.return_value = mock_server

        result = self.runner.invoke(cli, ["--url", "redis://localhost:6379/0"])

        assert result.exit_code == 0
        mock_parse_uri.assert_called_once_with("redis://localhost:6379/0")
        mock_set_config.assert_called_once_with({"host": "localhost", "port": 6379})
        mock_server_class.assert_called_once()
        mock_server.run.assert_called_once()

    @patch("src.main.set_redis_config_from_cli")
    @patch("src.main.RedisMCPServer")
    def test_cli_with_individual_parameters(self, mock_server_class, mock_set_config):
        """Test CLI with individual connection parameters."""
        mock_server = Mock()
        mock_server_class.return_value = mock_server

        result = self.runner.invoke(
            cli,
            [
                "--host",
                "redis.example.com",
                "--port",
                "6380",
                "--db",
                "1",
                "--username",
                "testuser",
                "--password",
                "testpass",
                "--ssl",
            ],
        )

        assert result.exit_code == 0
        mock_set_config.assert_called_once()

        # Verify the config passed to set_redis_config_from_cli
        call_args = mock_set_config.call_args[0][0]
        assert call_args["host"] == "redis.example.com"
        assert call_args["port"] == 6380
        assert call_args["db"] == 1
        assert call_args["username"] == "testuser"
        assert call_args["password"] == "testpass"
        assert call_args["ssl"] is True

    @patch("src.main.set_redis_config_from_cli")
    @patch("src.main.RedisMCPServer")
    def test_cli_with_ssl_parameters(self, mock_server_class, mock_set_config):
        """Test CLI with SSL-specific parameters."""
        mock_server = Mock()
        mock_server_class.return_value = mock_server

        result = self.runner.invoke(
            cli,
            [
                "--ssl",
                "--ssl-ca-path",
                "/path/to/ca.pem",
                "--ssl-keyfile",
                "/path/to/key.pem",
                "--ssl-certfile",
                "/path/to/cert.pem",
                "--ssl-cert-reqs",
                "optional",
                "--ssl-ca-certs",
                "/path/to/ca-bundle.pem",
            ],
        )

        assert result.exit_code == 0
        call_args = mock_set_config.call_args[0][0]
        assert call_args["ssl"] is True
        assert call_args["ssl_ca_path"] == "/path/to/ca.pem"
        assert call_args["ssl_keyfile"] == "/path/to/key.pem"
        assert call_args["ssl_certfile"] == "/path/to/cert.pem"
        assert call_args["ssl_cert_reqs"] == "optional"
        assert call_args["ssl_ca_certs"] == "/path/to/ca-bundle.pem"

    @patch("src.main.set_redis_config_from_cli")
    @patch("src.main.RedisMCPServer")
    def test_cli_with_cluster_mode(self, mock_server_class, mock_set_config):
        """Test CLI with cluster mode enabled."""
        mock_server = Mock()
        mock_server_class.return_value = mock_server

        result = self.runner.invoke(cli, ["--cluster-mode"])

        assert result.exit_code == 0
        call_args = mock_set_config.call_args[0][0]
        assert call_args["cluster_mode"] is True

    @patch("src.main.parse_redis_uri")
    def test_cli_with_invalid_url(self, mock_parse_uri):
        """Test CLI with invalid Redis URL."""
        mock_parse_uri.side_effect = ValueError("Invalid Redis URI")

        result = self.runner.invoke(cli, ["--url", "invalid://url"])

        assert result.exit_code != 0
        assert "Invalid Redis URI" in result.output

    @patch("src.main.RedisMCPServer")
    def test_cli_server_initialization_failure(self, mock_server_class):
        """Test CLI when server initialization fails."""
        mock_server_class.side_effect = Exception("Server init failed")

        result = self.runner.invoke(cli, [])

        assert result.exit_code != 0

    @patch("src.main.RedisMCPServer")
    def test_cli_server_run_failure(self, mock_server_class):
        """Test CLI when server run fails."""
        mock_server = Mock()
        mock_server.run.side_effect = Exception("Server run failed")
        mock_server_class.return_value = mock_server

        result = self.runner.invoke(cli, [])

        assert result.exit_code != 0

    def test_cli_help(self):
        """Test CLI help output."""
        result = self.runner.invoke(cli, ["--help"])

        assert result.exit_code == 0
        assert "Redis connection URI" in result.output
        assert "--host" in result.output
        assert "--port" in result.output
assert "--ssl" in result.output
@patch("src.main.set_redis_config_from_cli")
@patch("src.main.RedisMCPServer")
def test_cli_default_values(self, mock_server_class, mock_set_config):
"""Test CLI with default values."""
mock_server = Mock()
mock_server_class.return_value = mock_server
result = self.runner.invoke(cli, [])
assert result.exit_code == 0
# Should be called with empty config when no parameters provided
mock_set_config.assert_called_once()
call_args = mock_set_config.call_args[0][0]
# Check that only non-None values are in the config
for key, value in call_args.items():
if value is not None:
# These should be the default values or explicitly set values
assert isinstance(value, (str, int, bool))
@patch("src.main.parse_redis_uri")
@patch("src.main.set_redis_config_from_cli")
@patch("src.main.RedisMCPServer")
def test_cli_url_overrides_individual_params(
self, mock_server_class, mock_set_config, mock_parse_uri
):
"""Test that --url parameter takes precedence over individual parameters."""
mock_parse_uri.return_value = {"host": "uri-host", "port": 9999}
mock_server = Mock()
mock_server_class.return_value = mock_server
result = self.runner.invoke(
cli,
[
"--url",
"redis://uri-host:9999/0",
"--host",
"individual-host",
"--port",
"6379",
],
)
assert result.exit_code == 0
mock_parse_uri.assert_called_once_with("redis://uri-host:9999/0")
# Should use URI config, not individual parameters
call_args = mock_set_config.call_args[0][0]
assert call_args["host"] == "uri-host"
assert call_args["port"] == 9999
```
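The last test above pins down a precedence rule: when `--url` is given, the configuration comes from the parsed URI and the individual `--host`/`--port` flags are ignored. The sketch below illustrates that rule only. `resolve_cli_config` and `fake_parse` are hypothetical names, not functions from `src/main.py`, and dropping unset (`None`) flags is an assumption rather than something the tests confirm.

```python
from typing import Any, Callable, Dict, Optional


def resolve_cli_config(
    url: Optional[str],
    parse_uri: Callable[[str], Dict[str, Any]],
    **params: Any,
) -> Dict[str, Any]:
    """Hypothetical helper: choose the config source the way the tests expect."""
    if url:
        # A URL wins outright; individual flags are not merged in.
        return parse_uri(url)
    # Assumption: flags left unset (None) are omitted from the config.
    return {key: value for key, value in params.items() if value is not None}


def fake_parse(uri: str) -> Dict[str, Any]:
    """Stand-in parser so the example runs without the real module."""
    return {"host": "uri-host", "port": 9999}


with_url = resolve_cli_config(
    "redis://uri-host:9999/0", fake_parse, host="individual-host", port=6379
)
without_url = resolve_cli_config(None, fake_parse, host="individual-host", port=None)
```

Passing the parser in as an argument mirrors how the tests patch `parse_redis_uri`, and keeps the sketch independent of the real parsing code.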
--------------------------------------------------------------------------------
/.github/workflows/release.yml:
--------------------------------------------------------------------------------
```yaml
name: Release to PyPI

on:
  release:
    types: [published]
  workflow_dispatch:
    inputs:
      version:
        description: 'Version to release (e.g., 0.3.0)'
        required: true
        type: string
      environment:
        description: 'Target environment'
        required: true
        default: 'pypi'
        type: choice
        options:
          - pypi
          - testpypi
      dry_run:
        description: 'Dry run (build only, do not publish)'
        required: false
        default: false
        type: boolean

permissions:
  contents: read

jobs:
  validate-release:
    runs-on: ubuntu-latest
    outputs:
      version: ${{ steps.get-version.outputs.version }}
      tag-version: ${{ steps.get-version.outputs.tag-version }}
    steps:
      - name: ⚙️ Harden Runner
        uses: step-security/harden-runner@v2
        with:
          egress-policy: audit
      - name: ⚙️ Checkout the project
        uses: actions/checkout@v5
        with:
          fetch-depth: 0 # Full history for UV build
      - name: ⚙️ Install uv
        uses: astral-sh/setup-uv@v7
        with:
          version: "latest"
          enable-cache: false
      - name: ⚙️ Set up Python
        run: uv python install 3.12

  security-scan:
    runs-on: ubuntu-latest
    needs: validate-release
    steps:
      - name: ⚙️ Harden Runner
        uses: step-security/harden-runner@v2
        with:
          egress-policy: audit
      - name: ⚙️ Checkout the project
        uses: actions/checkout@v5
        with:
          fetch-depth: 0
      - name: ⚙️ Install uv
        uses: astral-sh/setup-uv@v7
        with:
          version: "latest"
          enable-cache: false
      - name: ⚙️ Set Python up and add dependencies
        run: |
          uv python install 3.12
          uv sync --all-extras --dev
          uv add --dev bandit
      - name: ⚙️ Run security scan with bandit
        run: |
          uv run bandit -r src/

  test:
    runs-on: ubuntu-latest
    needs: validate-release
    strategy:
      matrix:
        python-version: ["3.10", "3.11", "3.12", "3.13", "3.14"]
    services:
      redis:
        image: redis:latest
        ports:
          - 6379:6379
        options: >-
          --health-cmd "redis-cli ping"
          --health-interval 10s
          --health-timeout 5s
          --health-retries 5
    steps:
      - name: ⚙️ Harden Runner
        uses: step-security/harden-runner@v2
        with:
          egress-policy: audit
      - name: ⚙️ Checkout the project
        uses: actions/checkout@v5
        with:
          fetch-depth: 0
      - name: ⚙️ Install uv
        uses: astral-sh/setup-uv@v7
        with:
          version: "latest"
          enable-cache: false
      - name: ⚙️ Set up Python ${{ matrix.python-version }}
        run: |
          uv python install ${{ matrix.python-version }}
          uv sync --all-extras --dev
      - name: ⚙️ Run tests
        run: uv run pytest tests/ -v --tb=short
        env:
          REDIS_HOST: localhost
          REDIS_PORT: 6379
      - name: ⚙️ Test MCP server startup
        run: |
          timeout 10s uv run python src/main.py || test $? = 124
        env:
          REDIS_HOST: localhost
          REDIS_PORT: 6379

  build-and-publish:
    runs-on: ubuntu-latest
    needs: [validate-release, security-scan, test]
    environment:
      name: ${{ github.event.inputs.environment || 'pypi' }}
      url: ${{ github.event.inputs.environment == 'testpypi' && 'https://test.pypi.org/p/redis-mcp-server' || 'https://pypi.org/p/redis-mcp-server' }}
    permissions:
      id-token: write # IMPORTANT: mandatory for trusted publishing
      contents: read
      attestations: write # For build attestations
    steps:
      - name: ⚙️ Harden Runner
        uses: step-security/harden-runner@v2
        with:
          egress-policy: audit
      - name: ⚙️ Checkout the project
        uses: actions/checkout@v5
        with:
          fetch-depth: 0 # Full history for UV build
      - name: ⚙️ Install uv
        uses: astral-sh/setup-uv@v7
        with:
          version: "latest"
          enable-cache: false
      - name: ⚙️ Set up Python
        run: uv python install 3.12
      - name: ⚙️ Override version in pyproject.toml
        if: (github.event_name == 'workflow_dispatch' && github.event.inputs.version) || github.event_name == 'release'
        run: |
          # Determine the target version
          if [[ "${{ github.event_name }}" == "workflow_dispatch" ]]; then
            TARGET_VERSION="${{ github.event.inputs.version }}"
            echo "Overriding version from manual trigger: $TARGET_VERSION"
          elif [[ "${{ github.event_name }}" == "release" ]]; then
            RELEASE_TAG="${{ github.event.release.tag_name }}"
            TARGET_VERSION=$(echo "$RELEASE_TAG" | sed 's/^v//')
            echo "Overriding version from release tag: $TARGET_VERSION (tag: $RELEASE_TAG)"
          fi
          # Get current version for comparison
          CURRENT_VERSION=$(grep '^version = ' pyproject.toml | sed 's/version = "\(.*\)"/\1/')
          echo "Current version in pyproject.toml: $CURRENT_VERSION"
          # Check if override is needed
          if [[ "$CURRENT_VERSION" == "$TARGET_VERSION" ]]; then
            echo "Version already matches target: $TARGET_VERSION"
          else
            echo "Version override needed: $CURRENT_VERSION → $TARGET_VERSION"
            # Update version in pyproject.toml
            sed -i "s/^version = \".*\"/version = \"$TARGET_VERSION\"/" pyproject.toml
            # Verify the change
            NEW_VERSION=$(grep '^version = ' pyproject.toml | sed 's/version = "\(.*\)"/\1/')
            echo "Updated version in pyproject.toml: $NEW_VERSION"
            # Validate the change was successful
            if [[ "$NEW_VERSION" != "$TARGET_VERSION" ]]; then
              echo "Version override failed! Expected: $TARGET_VERSION, Got: $NEW_VERSION"
              exit 1
            fi
            echo "Version successfully changed: $CURRENT_VERSION → $NEW_VERSION"
          fi
      - name: ⚙️ Build package
        run: |
          uv build --sdist --wheel
      - name: ⚙️ Check package
        run: |
          uv add --dev twine
          uv run twine check dist/*
      - name: ⚙️ Generate build attestation
        uses: actions/attest-build-provenance@v3
        with:
          subject-path: 'dist/*'
      - name: ⚙️ Publish to PyPI
        if: ${{ !inputs.dry_run }}
        uses: pypa/gh-action-pypi-publish@release/v1
        with:
          repository-url: ${{ github.event.inputs.environment == 'testpypi' && 'https://test.pypi.org/legacy/' || '' }}
          print-hash: true
          attestations: true
      - name: ⚙️ Dry run - Package ready for publishing
        if: ${{ inputs.dry_run }}
        run: |
          echo "🔍 DRY RUN MODE - Package built successfully but not published"
          echo "📦 Built packages:"
          ls -la dist/
          echo ""
          echo "✅ Package is ready for publishing to ${{ github.event.inputs.environment || 'pypi' }}"
      - name: ⚙️ Upload build artifacts
        uses: actions/upload-artifact@v5
        with:
          name: dist-${{ needs.validate-release.outputs.version }}
          path: dist/
          retention-days: 90

  notify-success:
    runs-on: ubuntu-latest
    needs: [validate-release, build-and-publish]
    if: success()
    steps:
      - name: ⚙️ Success notification
        run: |
          if [[ "${{ inputs.dry_run }}" == "true" ]]; then
            echo "🔍 DRY RUN COMPLETED - Redis MCP Server v${{ github.event.inputs.version || needs.validate-release.outputs.version }} ready for release!"
            echo "📦 Package built successfully but not published"
            echo "🎯 Target environment: ${{ github.event.inputs.environment || 'pypi' }}"
          else
            echo "🎉 Successfully released Redis MCP Server v${{ github.event.inputs.version || needs.validate-release.outputs.version }} to ${{ github.event.inputs.environment || 'PyPI' }}!"
            if [[ "${{ github.event.inputs.environment }}" == "testpypi" ]]; then
              echo "📦 Package: https://test.pypi.org/project/redis-mcp-server/${{ github.event.inputs.version || needs.validate-release.outputs.version }}/"
            else
              echo "📦 Package: https://pypi.org/project/redis-mcp-server/${{ github.event.inputs.version || needs.validate-release.outputs.version }}/"
            fi
            if [[ "${{ github.event_name }}" == "release" ]]; then
              echo "🏷️ Release: https://github.com/${{ github.repository }}/releases/tag/${{ github.ref_name }}"
            else
              echo "🚀 Manual release triggered by: ${{ github.actor }}"
            fi
          fi
```
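The "Override version in pyproject.toml" step above strips a leading `v` from the release tag, rewrites the `version = "..."` line, and fails if the rewritten value does not match the target. The same logic can be sketched in Python for local experimentation. `normalize_tag` and `override_version` are hypothetical names used only for illustration; the workflow itself uses `sed` and `grep`.

```python
import re


def normalize_tag(tag: str) -> str:
    """Strip one leading 'v', like `sed 's/^v//'` in the workflow."""
    return tag[1:] if tag.startswith("v") else tag


def override_version(pyproject_text: str, target: str) -> str:
    """Rewrite first-column `version = "..."` lines and verify the result."""
    pattern = re.compile(r'^version = ".*"', flags=re.MULTILINE)
    # A callable replacement avoids backslash interpretation in `target`.
    updated = pattern.sub(lambda _match: f'version = "{target}"', pyproject_text)
    match = re.search(r'^version = "(.*)"', updated, flags=re.MULTILINE)
    if match is None or match.group(1) != target:
        raise ValueError(f"Version override failed, expected {target}")
    return updated


sample = '[project]\nname = "redis-mcp-server"\nversion = "0.1.0"\n'
result = override_version(sample, normalize_tag("v0.3.0"))
```

As in the shell version, the verification pass re-reads the rewritten text instead of trusting the substitution, so a file without a matching `version` line is reported as a failure.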
--------------------------------------------------------------------------------
/tests/test_config.py:
--------------------------------------------------------------------------------
```python
"""
Unit tests for src/common/config.py
"""
import os
from unittest.mock import patch

import pytest

from src.common.config import REDIS_CFG, parse_redis_uri, set_redis_config_from_cli


class TestParseRedisURI:
    """Test cases for parse_redis_uri function."""

    def test_parse_basic_redis_uri(self):
        """Test parsing basic Redis URI."""
        uri = "redis://localhost:6379/0"
        result = parse_redis_uri(uri)
        expected = {"ssl": False, "host": "localhost", "port": 6379, "db": 0}
        assert result == expected

    def test_parse_redis_uri_with_auth(self):
        """Test parsing Redis URI with authentication."""
        uri = "redis://user:pass@localhost:6379/1"
        result = parse_redis_uri(uri)
        expected = {
            "ssl": False,
            "host": "localhost",
            "port": 6379,
            "db": 1,
            "username": "user",
            "password": "pass",
        }
        assert result == expected

    def test_parse_rediss_uri(self):
        """Test parsing Redis SSL URI."""
        uri = "rediss://user:pass@redis.example.com:6380/2"
        result = parse_redis_uri(uri)
        expected = {
            "ssl": True,
            "host": "redis.example.com",
            "port": 6380,
            "db": 2,
            "username": "user",
            "password": "pass",
        }
        assert result == expected

    def test_parse_uri_with_query_parameters(self):
        """Test parsing URI with query parameters."""
        uri = "redis://localhost:6379/0?ssl_cert_reqs=optional&ssl_ca_certs=/path/to/ca.pem"
        result = parse_redis_uri(uri)
        assert result["ssl"] is False
        assert result["host"] == "localhost"
        assert result["port"] == 6379
        assert result["db"] == 0
        assert result["ssl_cert_reqs"] == "optional"
        assert result["ssl_ca_certs"] == "/path/to/ca.pem"

    def test_parse_uri_with_db_in_query(self):
        """Test parsing URI with database number in query parameters."""
        uri = "redis://localhost:6379?db=5"
        result = parse_redis_uri(uri)
        assert result["db"] == 5

    def test_parse_uri_with_ssl_parameters(self):
        """Test parsing URI with SSL-related query parameters."""
        uri = "rediss://localhost:6379/0?ssl_keyfile=/key.pem&ssl_certfile=/cert.pem&ssl_ca_path=/ca.pem"
        result = parse_redis_uri(uri)
        assert result["ssl"] is True
        assert result["ssl_keyfile"] == "/key.pem"
        assert result["ssl_certfile"] == "/cert.pem"
        assert result["ssl_ca_path"] == "/ca.pem"

    def test_parse_uri_defaults(self):
        """Test parsing URI with default values."""
        uri = "redis://example.com"
        result = parse_redis_uri(uri)
        assert result["host"] == "example.com"
        assert result["port"] == 6379  # Default port
        assert result["db"] == 0  # Default database

    def test_parse_uri_no_path(self):
        """Test parsing URI without path."""
        uri = "redis://localhost:6379"
        result = parse_redis_uri(uri)
        assert result["db"] == 0

    def test_parse_uri_root_path(self):
        """Test parsing URI with root path."""
        uri = "redis://localhost:6379/"
        result = parse_redis_uri(uri)
        assert result["db"] == 0

    def test_parse_uri_invalid_db_in_path(self):
        """Test parsing URI with invalid database number in path."""
        uri = "redis://localhost:6379/invalid"
        result = parse_redis_uri(uri)
        assert result["db"] == 0  # Should default to 0

    def test_parse_uri_invalid_db_in_query(self):
        """Test parsing URI with invalid database number in query."""
        uri = "redis://localhost:6379?db=invalid"
        result = parse_redis_uri(uri)
        # Should not have db key or should be handled gracefully
        assert "db" not in result or result["db"] == 0

    def test_parse_uri_unsupported_scheme(self):
        """Test parsing URI with unsupported scheme."""
        uri = "http://localhost:6379/0"
        with pytest.raises(ValueError, match="Unsupported scheme: http"):
            parse_redis_uri(uri)


class TestSetRedisConfigFromCLI:
    """Test cases for set_redis_config_from_cli function."""

    def setup_method(self):
        """Set up test fixtures."""
        # Store original config
        self.original_config = REDIS_CFG.copy()

    def teardown_method(self):
        """Restore original config."""
        REDIS_CFG.clear()
        REDIS_CFG.update(self.original_config)

    def test_set_string_values(self):
        """Test setting string configuration values."""
        config = {
            "host": "redis.example.com",
            "username": "testuser",
            "password": "testpass",
        }
        set_redis_config_from_cli(config)
        assert REDIS_CFG["host"] == "redis.example.com"
        assert REDIS_CFG["username"] == "testuser"
        assert REDIS_CFG["password"] == "testpass"

    def test_set_integer_values(self):
        """Test setting integer configuration values."""
        config = {"port": 6380, "db": 2}
        set_redis_config_from_cli(config)
        assert REDIS_CFG["port"] == 6380
        assert isinstance(REDIS_CFG["port"], int)
        assert REDIS_CFG["db"] == 2
        assert isinstance(REDIS_CFG["db"], int)

    def test_set_boolean_values(self):
        """Test setting boolean configuration values."""
        config = {"ssl": True, "cluster_mode": False}
        set_redis_config_from_cli(config)
        assert REDIS_CFG["ssl"] is True
        assert isinstance(REDIS_CFG["ssl"], bool)
        assert REDIS_CFG["cluster_mode"] is False
        assert isinstance(REDIS_CFG["cluster_mode"], bool)

    def test_set_none_values(self):
        """Test setting None configuration values."""
        config = {"ssl_ca_path": None, "ssl_keyfile": None}
        set_redis_config_from_cli(config)
        assert REDIS_CFG["ssl_ca_path"] is None
        assert REDIS_CFG["ssl_keyfile"] is None

    def test_set_mixed_values(self):
        """Test setting mixed configuration values."""
        config = {
            "host": "localhost",
            "port": 6379,
            "ssl": True,
            "ssl_ca_path": "/path/to/ca.pem",
            "cluster_mode": False,
            "username": None,
        }
        set_redis_config_from_cli(config)
        assert REDIS_CFG["host"] == "localhost"
        assert REDIS_CFG["port"] == 6379
        assert REDIS_CFG["ssl"] is True
        assert REDIS_CFG["ssl_ca_path"] == "/path/to/ca.pem"
        assert REDIS_CFG["cluster_mode"] is False
        assert REDIS_CFG["username"] is None

    def test_convert_string_integers(self):
        """Test converting string integers to integers."""
        config = {"port": "6380", "db": "1"}
        set_redis_config_from_cli(config)
        assert REDIS_CFG["port"] == 6380
        assert isinstance(REDIS_CFG["port"], int)
        assert REDIS_CFG["db"] == 1
        assert isinstance(REDIS_CFG["db"], int)

    def test_convert_other_booleans_to_strings(self):
        """Test converting non-ssl/cluster_mode booleans to strings."""
        # This tests the behavior where other boolean values are converted to strings
        # for environment compatibility
        config = {"some_other_bool": True}
        set_redis_config_from_cli(config)
        # This would be converted to string for environment compatibility
        assert REDIS_CFG["some_other_bool"] == "true"

    def test_empty_config(self):
        """Test setting empty configuration."""
        original_config = REDIS_CFG.copy()
        config = {}
        set_redis_config_from_cli(config)
        # Config should remain unchanged
        assert REDIS_CFG == original_config


@patch.dict(os.environ, {}, clear=True)
class TestRedisConfigDefaults:
    """Test cases for REDIS_CFG default values."""

    @patch("src.common.config.load_dotenv")
    def test_default_config_values(self, mock_load_dotenv):
        """Test default configuration values when no environment variables are set."""
        # Re-import to get fresh config
        import importlib

        import src.common.config

        importlib.reload(src.common.config)
        config = src.common.config.REDIS_CFG
        assert config["host"] == "127.0.0.1"
        assert config["port"] == 6379
        assert config["username"] is None
        assert config["password"] == ""
        assert config["ssl"] is False
        assert config["cluster_mode"] is False
        assert config["db"] == 0

    @patch.dict(
        os.environ,
        {
            "REDIS_HOST": "redis.example.com",
            "REDIS_PORT": "6380",
            "REDIS_SSL": "true",
            "REDIS_CLUSTER_MODE": "1",
        },
    )
    @patch("src.common.config.load_dotenv")
    def test_config_from_environment(self, mock_load_dotenv):
        """Test configuration loading from environment variables."""
        # Re-import to get fresh config
        import importlib

        import src.common.config

        importlib.reload(src.common.config)
        config = src.common.config.REDIS_CFG
        assert config["host"] == "redis.example.com"
        assert config["port"] == 6380
        assert config["ssl"] is True
        assert config["cluster_mode"] is True
```
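The `TestParseRedisURI` cases above describe a parser contract: `redis`/`rediss` schemes only, a database index taken from the path or a `db` query parameter, a fallback to database 0 on bad input, and SSL options passed through from the query string. The sketch below is one way to satisfy those assertions with `urllib.parse`. It is not the code in `src/common/config.py`; the name `parse_redis_uri_sketch`, the `PASSTHROUGH_KEYS` list, and the `127.0.0.1` host fallback are assumptions made for illustration.

```python
from typing import Any, Dict
from urllib.parse import parse_qs, urlparse

# Assumption: query keys copied through verbatim, taken from the tests above.
PASSTHROUGH_KEYS = (
    "ssl_cert_reqs",
    "ssl_ca_certs",
    "ssl_ca_path",
    "ssl_keyfile",
    "ssl_certfile",
)


def _to_int(text: str, default: int) -> int:
    """Convert text to int, falling back to `default` on bad input."""
    try:
        return int(text)
    except ValueError:
        return default


def parse_redis_uri_sketch(uri: str) -> Dict[str, Any]:
    """Hypothetical parser consistent with the assertions in the tests."""
    parsed = urlparse(uri)
    if parsed.scheme not in ("redis", "rediss"):
        raise ValueError(f"Unsupported scheme: {parsed.scheme}")

    config: Dict[str, Any] = {
        "ssl": parsed.scheme == "rediss",
        "host": parsed.hostname or "127.0.0.1",  # assumed fallback
        "port": parsed.port or 6379,
        # Database index from the path; empty or non-numeric means 0.
        "db": _to_int(parsed.path.lstrip("/"), 0),
    }
    if parsed.username:
        config["username"] = parsed.username
    if parsed.password:
        config["password"] = parsed.password

    query = parse_qs(parsed.query)
    if "db" in query:
        # A valid `db` query value overrides the path; a bad one is ignored.
        config["db"] = _to_int(query["db"][0], config["db"])
    for key in PASSTHROUGH_KEYS:
        if key in query:
            config[key] = query[key][0]
    return config


basic = parse_redis_uri_sketch("redis://localhost:6379/0")
secure = parse_redis_uri_sketch("rediss://user:pass@redis.example.com:6380/2")
```

Letting the `db` query parameter override the path is a design choice of this sketch; the tests only require that `?db=5` is honored when no path is given.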
--------------------------------------------------------------------------------
/tests/tools/test_set.py:
--------------------------------------------------------------------------------
```python
"""
Unit tests for src/tools/set.py
"""
from unittest.mock import Mock, patch

import pytest
from redis.exceptions import RedisError

from src.tools.set import sadd, smembers, srem


class TestSetOperations:
    """Test cases for Redis set operations."""

    @pytest.mark.asyncio
    async def test_sadd_success(self, mock_redis_connection_manager):
        """Test successful set add operation."""
        mock_redis = mock_redis_connection_manager
        mock_redis.sadd.return_value = 1  # Number of elements added
        result = await sadd("test_set", "member1")
        mock_redis.sadd.assert_called_once_with("test_set", "member1")
        assert "Value 'member1' added successfully to set 'test_set'" in result

    @pytest.mark.asyncio
    async def test_sadd_with_expiration(self, mock_redis_connection_manager):
        """Test set add operation with expiration."""
        mock_redis = mock_redis_connection_manager
        mock_redis.sadd.return_value = 1
        mock_redis.expire.return_value = True
        result = await sadd("test_set", "member1", 60)
        mock_redis.sadd.assert_called_once_with("test_set", "member1")
        mock_redis.expire.assert_called_once_with("test_set", 60)
        assert "Expires in 60 seconds" in result

    @pytest.mark.asyncio
    async def test_sadd_member_already_exists(self, mock_redis_connection_manager):
        """Test set add operation when member already exists."""
        mock_redis = mock_redis_connection_manager
        mock_redis.sadd.return_value = 0  # Member already exists
        result = await sadd("test_set", "existing_member")
        assert "Value 'existing_member' added successfully to set 'test_set'" in result

    @pytest.mark.asyncio
    async def test_sadd_redis_error(self, mock_redis_connection_manager):
        """Test set add operation with Redis error."""
        mock_redis = mock_redis_connection_manager
        mock_redis.sadd.side_effect = RedisError("Connection failed")
        result = await sadd("test_set", "member1")
        assert (
            "Error adding value 'member1' to set 'test_set': Connection failed"
            in result
        )

    @pytest.mark.asyncio
    async def test_sadd_numeric_member(self, mock_redis_connection_manager):
        """Test set add operation with numeric member."""
        mock_redis = mock_redis_connection_manager
        mock_redis.sadd.return_value = 1
        result = await sadd("test_set", 42)
        mock_redis.sadd.assert_called_once_with("test_set", 42)
        assert "Value '42' added successfully to set 'test_set'" in result

    @pytest.mark.asyncio
    async def test_srem_success(self, mock_redis_connection_manager):
        """Test successful set remove operation."""
        mock_redis = mock_redis_connection_manager
        mock_redis.srem.return_value = 1  # Number of elements removed
        result = await srem("test_set", "member1")
        mock_redis.srem.assert_called_once_with("test_set", "member1")
        assert "Value 'member1' removed from set 'test_set'" in result

    @pytest.mark.asyncio
    async def test_srem_member_not_exists(self, mock_redis_connection_manager):
        """Test set remove operation when member doesn't exist."""
        mock_redis = mock_redis_connection_manager
        mock_redis.srem.return_value = 0  # Member doesn't exist
        result = await srem("test_set", "nonexistent_member")
        assert "Value 'nonexistent_member' not found in set 'test_set'" in result

    @pytest.mark.asyncio
    async def test_srem_redis_error(self, mock_redis_connection_manager):
        """Test set remove operation with Redis error."""
        mock_redis = mock_redis_connection_manager
        mock_redis.srem.side_effect = RedisError("Connection failed")
        result = await srem("test_set", "member1")
        assert (
            "Error removing value 'member1' from set 'test_set': Connection failed"
            in result
        )

    @pytest.mark.asyncio
    async def test_srem_numeric_member(self, mock_redis_connection_manager):
        """Test set remove operation with numeric member."""
        mock_redis = mock_redis_connection_manager
        mock_redis.srem.return_value = 1
        result = await srem("test_set", 42)
        mock_redis.srem.assert_called_once_with("test_set", 42)
        assert "Value '42' removed from set 'test_set'" in result

    @pytest.mark.asyncio
    async def test_smembers_success(self, mock_redis_connection_manager):
        """Test successful set members operation."""
        mock_redis = mock_redis_connection_manager
        mock_redis.smembers.return_value = {"member1", "member2", "member3"}
        result = await smembers("test_set")
        mock_redis.smembers.assert_called_once_with("test_set")
        assert set(result) == {"member1", "member2", "member3"}

    @pytest.mark.asyncio
    async def test_smembers_empty_set(self, mock_redis_connection_manager):
        """Test set members operation on empty set."""
        mock_redis = mock_redis_connection_manager
        mock_redis.smembers.return_value = set()
        result = await smembers("empty_set")
        assert "Set 'empty_set' is empty or does not exist" in result

    @pytest.mark.asyncio
    async def test_smembers_redis_error(self, mock_redis_connection_manager):
        """Test set members operation with Redis error."""
        mock_redis = mock_redis_connection_manager
        mock_redis.smembers.side_effect = RedisError("Connection failed")
        result = await smembers("test_set")
        assert "Error retrieving members of set 'test_set': Connection failed" in result

    @pytest.mark.asyncio
    async def test_smembers_single_member(self, mock_redis_connection_manager):
        """Test set members operation with single member."""
        mock_redis = mock_redis_connection_manager
        mock_redis.smembers.return_value = {"single_member"}
        result = await smembers("test_set")
        assert result == ["single_member"]

    @pytest.mark.asyncio
    async def test_smembers_numeric_members(self, mock_redis_connection_manager):
        """Test set members operation with numeric members."""
        mock_redis = mock_redis_connection_manager
        mock_redis.smembers.return_value = {"1", "2", "3", "42"}
        result = await smembers("numeric_set")
        assert set(result) == {"1", "2", "3", "42"}

    @pytest.mark.asyncio
    async def test_sadd_expiration_error(self, mock_redis_connection_manager):
        """Test set add operation when expiration fails."""
        mock_redis = mock_redis_connection_manager
        mock_redis.sadd.return_value = 1
        mock_redis.expire.side_effect = RedisError("Expire failed")
        result = await sadd("test_set", "member1", 60)
        assert "Error adding value 'member1' to set 'test_set': Expire failed" in result

    @pytest.mark.asyncio
    async def test_sadd_with_special_characters(self, mock_redis_connection_manager):
        """Test set add operation with special characters in member."""
        mock_redis = mock_redis_connection_manager
        mock_redis.sadd.return_value = 1
        special_member = "member:with:colons"
        result = await sadd("test_set", special_member)
        mock_redis.sadd.assert_called_once_with("test_set", special_member)
        assert (
            f"Value '{special_member}' added successfully to set 'test_set'" in result
        )

    @pytest.mark.asyncio
    async def test_sadd_with_unicode_member(self, mock_redis_connection_manager):
        """Test set add operation with unicode member."""
        mock_redis = mock_redis_connection_manager
        mock_redis.sadd.return_value = 1
        unicode_member = "测试成员 🚀"
        result = await sadd("test_set", unicode_member)
        mock_redis.sadd.assert_called_once_with("test_set", unicode_member)
        assert (
            f"Value '{unicode_member}' added successfully to set 'test_set'" in result
        )

    @pytest.mark.asyncio
    async def test_smembers_large_set(self, mock_redis_connection_manager):
        """Test set members operation with large set."""
        mock_redis = mock_redis_connection_manager
        large_set = {f"member_{i}" for i in range(1000)}
        mock_redis.smembers.return_value = large_set
        result = await smembers("large_set")
        # smembers returns a list, not a set
        assert isinstance(result, list)
        assert len(result) == 1000

    @pytest.mark.asyncio
    async def test_srem_multiple_members_behavior(self, mock_redis_connection_manager):
        """Test that srem function handles single member correctly."""
        mock_redis = mock_redis_connection_manager
        mock_redis.srem.return_value = 1
        result = await srem("test_set", "single_member")
        # Should call srem with single member, not multiple members
        mock_redis.srem.assert_called_once_with("test_set", "single_member")
        assert "Value 'single_member' removed from set 'test_set'" in result

    @pytest.mark.asyncio
    async def test_connection_manager_called_correctly(self):
        """Test that RedisConnectionManager.get_connection is called correctly."""
        with patch(
            "src.tools.set.RedisConnectionManager.get_connection"
        ) as mock_get_conn:
            mock_redis = Mock()
            mock_redis.sadd.return_value = 1
            mock_get_conn.return_value = mock_redis
            await sadd("test_set", "member1")
            mock_get_conn.assert_called_once()

    @pytest.mark.asyncio
    async def test_function_signatures(self):
        """Test that functions have correct signatures."""
        import inspect

        # Test sadd function signature
        sadd_sig = inspect.signature(sadd)
        sadd_params = list(sadd_sig.parameters.keys())
        assert sadd_params == ["name", "value", "expire_seconds"]
        assert sadd_sig.parameters["expire_seconds"].default is None
        # Test srem function signature
        srem_sig = inspect.signature(srem)
        srem_params = list(srem_sig.parameters.keys())
        assert srem_params == ["name", "value"]
        # Test smembers function signature
        smembers_sig = inspect.signature(smembers)
        smembers_params = list(smembers_sig.parameters.keys())
        assert smembers_params == ["name"]
```
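The `sadd` tests above imply a tool that never raises to the caller: it adds the member, optionally sets a TTL on the whole key, and turns any Redis failure (including a failed `expire`) into an error string. A minimal sketch of that contract follows. `sadd_sketch` and `StandInRedisError` are hypothetical; the real tool in `src/tools/set.py` obtains its client from `RedisConnectionManager.get_connection()` and works with `redis.exceptions.RedisError`, and its exact message punctuation is not shown in this section.

```python
import asyncio
from typing import Any, Optional
from unittest.mock import Mock


class StandInRedisError(Exception):
    """Stand-in for redis.exceptions.RedisError so the sketch has no dependencies."""


async def sadd_sketch(
    client: Any, name: str, value: Any, expire_seconds: Optional[int] = None
) -> str:
    """Hypothetical version of the tool with the client passed in explicitly."""
    try:
        client.sadd(name, value)
        message = f"Value '{value}' added successfully to set '{name}'."
        if expire_seconds is not None:
            # The TTL applies to the whole set key, not to the single member.
            client.expire(name, expire_seconds)
            message += f" Expires in {expire_seconds} seconds."
        return message
    except StandInRedisError as exc:
        # Failures from either call are reported as text, not raised.
        return f"Error adding value '{value}' to set '{name}': {exc}"


client = Mock()
client.sadd.return_value = 1
ok = asyncio.run(sadd_sketch(client, "test_set", "member1", 60))

client.expire.side_effect = StandInRedisError("Expire failed")
failed = asyncio.run(sadd_sketch(client, "test_set", "member1", 60))
```

Returning strings instead of raising matches what the tests assert on, and it explains why `test_sadd_member_already_exists` still sees a success message when `sadd` returns 0.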
--------------------------------------------------------------------------------
/tests/tools/test_server_management.py:
--------------------------------------------------------------------------------
```python
"""
Unit tests for src/tools/server_management.py
"""
import pytest
from redis.exceptions import ConnectionError, RedisError

from src.tools.server_management import client_list, dbsize, info


class TestServerManagementOperations:
    """Test cases for Redis server management operations."""

    @pytest.mark.asyncio
    async def test_dbsize_success(self, mock_redis_connection_manager):
        """Test successful database size operation."""
        mock_redis = mock_redis_connection_manager
        mock_redis.dbsize.return_value = 1000
        result = await dbsize()
        mock_redis.dbsize.assert_called_once()
        assert result == 1000

    @pytest.mark.asyncio
    async def test_dbsize_zero_keys(self, mock_redis_connection_manager):
        """Test database size operation with empty database."""
        mock_redis = mock_redis_connection_manager
        mock_redis.dbsize.return_value = 0
        result = await dbsize()
        assert result == 0

    @pytest.mark.asyncio
    async def test_dbsize_redis_error(self, mock_redis_connection_manager):
        """Test database size operation with Redis error."""
        mock_redis = mock_redis_connection_manager
        mock_redis.dbsize.side_effect = RedisError("Connection failed")
        result = await dbsize()
        assert "Error getting database size: Connection failed" in result

    @pytest.mark.asyncio
    async def test_info_success_default_section(self, mock_redis_connection_manager):
        """Test successful info operation with default section."""
        mock_redis = mock_redis_connection_manager
        mock_info = {
            "redis_version": "7.0.0",
            "used_memory": "1024000",
            "connected_clients": "5",
            "total_commands_processed": "1000",
        }
        mock_redis.info.return_value = mock_info
        result = await info()
        mock_redis.info.assert_called_once_with("default")
        assert result == mock_info

    @pytest.mark.asyncio
    async def test_info_success_specific_section(self, mock_redis_connection_manager):
        """Test successful info operation with specific section."""
        mock_redis = mock_redis_connection_manager
        mock_memory_info = {
            "used_memory": "2048000",
            "used_memory_human": "2.00M",
            "used_memory_peak": "3072000",
            "used_memory_peak_human": "3.00M",
        }
        mock_redis.info.return_value = mock_memory_info
        result = await info("memory")
        mock_redis.info.assert_called_once_with("memory")
        assert result == mock_memory_info

    @pytest.mark.asyncio
    async def test_info_all_sections(self, mock_redis_connection_manager):
        """Test info operation with 'all' section."""
        mock_redis = mock_redis_connection_manager
        mock_all_info = {
            "redis_version": "7.0.0",
            "used_memory": "1024000",
            "connected_clients": "5",
            "keyspace_hits": "500",
            "keyspace_misses": "100",
        }
        mock_redis.info.return_value = mock_all_info
        result = await info("all")
        mock_redis.info.assert_called_once_with("all")
        assert result == mock_all_info

    @pytest.mark.asyncio
    async def test_info_redis_error(self, mock_redis_connection_manager):
        """Test info operation with Redis error."""
        mock_redis = mock_redis_connection_manager
        mock_redis.info.side_effect = RedisError("Connection failed")
        result = await info("server")
        assert "Error retrieving Redis info: Connection failed" in result

    @pytest.mark.asyncio
    async def test_info_invalid_section(self, mock_redis_connection_manager):
        """Test info operation with invalid section."""
        mock_redis = mock_redis_connection_manager
        mock_redis.info.side_effect = RedisError("Unknown section")
        result = await info("invalid_section")
        assert "Error retrieving Redis info: Unknown section" in result

    @pytest.mark.asyncio
    async def test_client_list_success(self, mock_redis_connection_manager):
        """Test successful client list operation."""
        mock_redis = mock_redis_connection_manager
        mock_clients = [
            {
                "id": "1",
                "addr": "127.0.0.1:12345",
                "name": "client1",
                "age": "100",
                "idle": "0",
                "flags": "N",
                "db": "0",
                "sub": "0",
                "psub": "0",
                "multi": "-1",
                "qbuf": "0",
                "qbuf-free": "32768",
                "obl": "0",
                "oll": "0",
                "omem": "0",
                "events": "r",
                "cmd": "client",
            },
            {
                "id": "2",
                "addr": "127.0.0.1:12346",
                "name": "client2",
                "age": "200",
                "idle": "5",
                "flags": "N",
                "db": "1",
                "sub": "0",
                "psub": "0",
                "multi": "-1",
                "qbuf": "0",
                "qbuf-free": "32768",
                "obl": "0",
                "oll": "0",
                "omem": "0",
                "events": "r",
                "cmd": "get",
            },
        ]
        mock_redis.client_list.return_value = mock_clients
        result = await client_list()
        mock_redis.client_list.assert_called_once()
        assert result == mock_clients
        assert len(result) == 2
        assert result[0]["id"] == "1"
        assert result[1]["id"] == "2"

    @pytest.mark.asyncio
    async def test_client_list_empty(self, mock_redis_connection_manager):
        """Test client list operation with no clients."""
        mock_redis = mock_redis_connection_manager
        mock_redis.client_list.return_value = []
        result = await client_list()
        assert result == []

    @pytest.mark.asyncio
    async def test_client_list_redis_error(self, mock_redis_connection_manager):
        """Test client list operation with Redis error."""
        mock_redis = mock_redis_connection_manager
        mock_redis.client_list.side_effect = RedisError("Connection failed")
        result = await client_list()
        assert "Error retrieving client list: Connection failed" in result

    @pytest.mark.asyncio
    async def test_client_list_connection_error(self, mock_redis_connection_manager):
        """Test client list operation with connection error."""
        mock_redis = mock_redis_connection_manager
        mock_redis.client_list.side_effect = ConnectionError("Redis server unavailable")
        result = await client_list()
        assert "Error retrieving client list: Redis server unavailable" in result

    @pytest.mark.asyncio
    async def test_info_stats_section(self, mock_redis_connection_manager):
        """Test info operation with stats section."""
        mock_redis = mock_redis_connection_manager
        mock_stats_info = {
            "total_connections_received": "1000",
            "total_commands_processed": "5000",
            "instantaneous_ops_per_sec": "10",
            "total_net_input_bytes": "1024000",
            "total_net_output_bytes": "2048000",
            "instantaneous_input_kbps": "1.5",
            "instantaneous_output_kbps": "3.0",
            "rejected_connections": "0",
            "sync_full": "0",
            "sync_partial_ok": "0",
            "sync_partial_err": "0",
            "expired_keys": "100",
            "evicted_keys": "0",
            "keyspace_hits": "4000",
            "keyspace_misses": "1000",
            "pubsub_channels": "0",
            "pubsub_patterns": "0",
            "latest_fork_usec": "0",
        }
        mock_redis.info.return_value = mock_stats_info
        result = await info("stats")
        mock_redis.info.assert_called_once_with("stats")
        assert result == mock_stats_info
        assert "keyspace_hits" in result
        assert "keyspace_misses" in result

    @pytest.mark.asyncio
async def test_info_replication_section(self, mock_redis_connection_manager):
"""Test info operation with replication section."""
mock_redis = mock_redis_connection_manager
mock_replication_info = {
"role": "master",
"connected_slaves": "2",
"master_replid": "abc123def456",
"master_replid2": "0000000000000000000000000000000000000000",
"master_repl_offset": "1000",
"second_repl_offset": "-1",
"repl_backlog_active": "1",
"repl_backlog_size": "1048576",
"repl_backlog_first_byte_offset": "1",
"repl_backlog_histlen": "1000",
}
mock_redis.info.return_value = mock_replication_info
result = await info("replication")
mock_redis.info.assert_called_once_with("replication")
assert result == mock_replication_info
assert result["role"] == "master"
assert result["connected_slaves"] == "2"
@pytest.mark.asyncio
async def test_dbsize_large_number(self, mock_redis_connection_manager):
"""Test database size operation with large number of keys."""
mock_redis = mock_redis_connection_manager
mock_redis.dbsize.return_value = 1000000 # 1 million keys
result = await dbsize()
assert result == 1000000
@pytest.mark.asyncio
async def test_client_list_single_client(self, mock_redis_connection_manager):
"""Test client list operation with single client."""
mock_redis = mock_redis_connection_manager
mock_clients = [
{
"id": "1",
"addr": "127.0.0.1:12345",
"name": "",
"age": "50",
"idle": "0",
"flags": "N",
"db": "0",
"sub": "0",
"psub": "0",
"multi": "-1",
"qbuf": "0",
"qbuf-free": "32768",
"obl": "0",
"oll": "0",
"omem": "0",
"events": "r",
"cmd": "ping",
}
]
mock_redis.client_list.return_value = mock_clients
result = await client_list()
assert len(result) == 1
assert result[0]["id"] == "1"
assert result[0]["cmd"] == "ping"
```
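The error-path tests above share one pattern: set `side_effect` on the mocked call, then assert that the tool returns an error string instead of raising. Below is a minimal, self-contained sketch of that pattern. `FakeRedisError` and `info_sketch` are hypothetical stand-ins introduced for illustration. The real `info` in `src/tools/server_management.py` is not shown in this section and obtains its connection through the connection manager rather than taking it as an argument.

```python
import asyncio
from unittest.mock import Mock


class FakeRedisError(Exception):
    """Hypothetical stand-in for redis.exceptions.RedisError."""


async def info_sketch(conn, section="default"):
    """Hypothetical tool: return server info, or an error string on failure."""
    try:
        return conn.info(section)
    except FakeRedisError as exc:
        return f"Error retrieving Redis info: {exc}"


def run_demo():
    # Happy path: the mock returns a canned dict.
    ok_conn = Mock()
    ok_conn.info.return_value = {"redis_version": "7.0.0"}
    ok_result = asyncio.run(info_sketch(ok_conn, "server"))
    ok_conn.info.assert_called_once_with("server")

    # Error path: side_effect makes the mocked call raise.
    bad_conn = Mock()
    bad_conn.info.side_effect = FakeRedisError("Connection failed")
    bad_result = asyncio.run(info_sketch(bad_conn, "server"))
    return ok_result, bad_result
```

Returning the error as a string keeps the caller's control flow simple, at the cost that callers must inspect the result to tell success from failure.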
--------------------------------------------------------------------------------
/tests/tools/test_list.py:
--------------------------------------------------------------------------------
```python
"""
Unit tests for src/tools/list.py
"""
import pytest
from redis.exceptions import RedisError
from src.tools.list import llen, lpop, lpush, lrange, rpop, rpush
class TestListOperations:
"""Test cases for Redis list operations."""
@pytest.mark.asyncio
async def test_lpush_success(self, mock_redis_connection_manager):
"""Test successful left push operation."""
mock_redis = mock_redis_connection_manager
mock_redis.lpush.return_value = 2 # New length of list
result = await lpush("test_list", "value1")
mock_redis.lpush.assert_called_once_with("test_list", "value1")
assert "Value 'value1' pushed to the left of list 'test_list'" in result
@pytest.mark.asyncio
async def test_lpush_with_expiration(self, mock_redis_connection_manager):
"""Test left push operation with expiration."""
mock_redis = mock_redis_connection_manager
mock_redis.lpush.return_value = 1
mock_redis.expire.return_value = True
result = await lpush("test_list", "value1", 60)
mock_redis.lpush.assert_called_once_with("test_list", "value1")
mock_redis.expire.assert_called_once_with("test_list", 60)
# The implementation doesn't include expiration info in the message
assert "Value 'value1' pushed to the left of list 'test_list'" in result
@pytest.mark.asyncio
async def test_lpush_redis_error(self, mock_redis_connection_manager):
"""Test left push operation with Redis error."""
mock_redis = mock_redis_connection_manager
mock_redis.lpush.side_effect = RedisError("Connection failed")
result = await lpush("test_list", "value1")
assert "Error pushing value to list 'test_list': Connection failed" in result
@pytest.mark.asyncio
async def test_rpush_success(self, mock_redis_connection_manager):
"""Test successful right push operation."""
mock_redis = mock_redis_connection_manager
mock_redis.rpush.return_value = 3
result = await rpush("test_list", "value2")
mock_redis.rpush.assert_called_once_with("test_list", "value2")
assert "Value 'value2' pushed to the right of list 'test_list'" in result
@pytest.mark.asyncio
async def test_rpush_with_expiration(self, mock_redis_connection_manager):
"""Test right push operation with expiration."""
mock_redis = mock_redis_connection_manager
mock_redis.rpush.return_value = 1
mock_redis.expire.return_value = True
result = await rpush("test_list", "value2", 120)
mock_redis.rpush.assert_called_once_with("test_list", "value2")
mock_redis.expire.assert_called_once_with("test_list", 120)
# The implementation doesn't include expiration info in the message
assert "Value 'value2' pushed to the right of list 'test_list'" in result
@pytest.mark.asyncio
async def test_rpush_redis_error(self, mock_redis_connection_manager):
"""Test right push operation with Redis error."""
mock_redis = mock_redis_connection_manager
mock_redis.rpush.side_effect = RedisError("Connection failed")
result = await rpush("test_list", "value2")
assert "Error pushing value to list 'test_list': Connection failed" in result
@pytest.mark.asyncio
async def test_lpop_success(self, mock_redis_connection_manager):
"""Test successful left pop operation."""
mock_redis = mock_redis_connection_manager
mock_redis.lpop.return_value = "popped_value"
result = await lpop("test_list")
mock_redis.lpop.assert_called_once_with("test_list")
assert result == "popped_value"
@pytest.mark.asyncio
async def test_lpop_empty_list(self, mock_redis_connection_manager):
"""Test left pop operation on empty list."""
mock_redis = mock_redis_connection_manager
mock_redis.lpop.return_value = None
result = await lpop("empty_list")
assert "List 'empty_list' is empty" in result
@pytest.mark.asyncio
async def test_lpop_redis_error(self, mock_redis_connection_manager):
"""Test left pop operation with Redis error."""
mock_redis = mock_redis_connection_manager
mock_redis.lpop.side_effect = RedisError("Connection failed")
result = await lpop("test_list")
assert "Error popping value from list 'test_list': Connection failed" in result
@pytest.mark.asyncio
async def test_rpop_success(self, mock_redis_connection_manager):
"""Test successful right pop operation."""
mock_redis = mock_redis_connection_manager
mock_redis.rpop.return_value = "right_popped_value"
result = await rpop("test_list")
mock_redis.rpop.assert_called_once_with("test_list")
assert result == "right_popped_value"
@pytest.mark.asyncio
async def test_rpop_empty_list(self, mock_redis_connection_manager):
"""Test right pop operation on empty list."""
mock_redis = mock_redis_connection_manager
mock_redis.rpop.return_value = None
result = await rpop("empty_list")
assert "List 'empty_list' is empty" in result
@pytest.mark.asyncio
async def test_rpop_redis_error(self, mock_redis_connection_manager):
"""Test right pop operation with Redis error."""
mock_redis = mock_redis_connection_manager
mock_redis.rpop.side_effect = RedisError("Connection failed")
result = await rpop("test_list")
assert "Error popping value from list 'test_list': Connection failed" in result
@pytest.mark.asyncio
async def test_lrange_success(self, mock_redis_connection_manager):
"""Test successful list range operation."""
mock_redis = mock_redis_connection_manager
mock_redis.lrange.return_value = ["item1", "item2", "item3"]
result = await lrange("test_list", 0, 2)
mock_redis.lrange.assert_called_once_with("test_list", 0, 2)
assert result == '["item1", "item2", "item3"]'
@pytest.mark.asyncio
async def test_lrange_default_parameters(self, mock_redis_connection_manager):
"""Test list range operation with explicit full-range indices (0, -1)."""
mock_redis = mock_redis_connection_manager
mock_redis.lrange.return_value = ["item1", "item2"]
result = await lrange("test_list", 0, -1)
mock_redis.lrange.assert_called_once_with("test_list", 0, -1)
assert result == '["item1", "item2"]'
@pytest.mark.asyncio
async def test_lrange_empty_list(self, mock_redis_connection_manager):
"""Test list range operation on empty list."""
mock_redis = mock_redis_connection_manager
mock_redis.lrange.return_value = []
result = await lrange("empty_list", 0, -1)
assert "List 'empty_list' is empty or does not exist" in result
@pytest.mark.asyncio
async def test_lrange_redis_error(self, mock_redis_connection_manager):
"""Test list range operation with Redis error."""
mock_redis = mock_redis_connection_manager
mock_redis.lrange.side_effect = RedisError("Connection failed")
result = await lrange("test_list", 0, -1)
assert (
"Error retrieving values from list 'test_list': Connection failed" in result
)
@pytest.mark.asyncio
async def test_llen_success(self, mock_redis_connection_manager):
"""Test successful list length operation."""
mock_redis = mock_redis_connection_manager
mock_redis.llen.return_value = 5
result = await llen("test_list")
mock_redis.llen.assert_called_once_with("test_list")
assert result == 5
@pytest.mark.asyncio
async def test_llen_empty_list(self, mock_redis_connection_manager):
"""Test list length operation on empty list."""
mock_redis = mock_redis_connection_manager
mock_redis.llen.return_value = 0
result = await llen("empty_list")
assert result == 0
@pytest.mark.asyncio
async def test_llen_redis_error(self, mock_redis_connection_manager):
"""Test list length operation with Redis error."""
mock_redis = mock_redis_connection_manager
mock_redis.llen.side_effect = RedisError("Connection failed")
result = await llen("test_list")
assert (
"Error retrieving length of list 'test_list': Connection failed" in result
)
@pytest.mark.asyncio
async def test_push_operations_with_numeric_values(
self, mock_redis_connection_manager
):
"""Test push operations with numeric values."""
mock_redis = mock_redis_connection_manager
mock_redis.lpush.return_value = 1
mock_redis.rpush.return_value = 2
# Test with integer
result1 = await lpush("test_list", 42)
mock_redis.lpush.assert_called_with("test_list", 42)
# Test with float
result2 = await rpush("test_list", 3.14)
mock_redis.rpush.assert_called_with("test_list", 3.14)
assert "pushed to the left of list" in result1
assert "pushed to the right of list" in result2
@pytest.mark.asyncio
async def test_lrange_with_negative_indices(self, mock_redis_connection_manager):
"""Test list range operation with negative indices."""
mock_redis = mock_redis_connection_manager
mock_redis.lrange.return_value = ["last_item"]
result = await lrange("test_list", -1, -1)
mock_redis.lrange.assert_called_once_with("test_list", -1, -1)
assert result == '["last_item"]'
@pytest.mark.asyncio
async def test_expiration_error_handling(self, mock_redis_connection_manager):
"""Test that an expire failure after a successful lpush is reported as a push error."""
mock_redis = mock_redis_connection_manager
mock_redis.lpush.return_value = 1
mock_redis.expire.side_effect = RedisError("Expire failed")
result = await lpush("test_list", "value", 60)
# Should report the expire error
assert "Error pushing value to list 'test_list': Expire failed" in result
@pytest.mark.asyncio
async def test_push_operations_return_new_length(
self, mock_redis_connection_manager
):
"""Test that push messages do not depend on the list length returned by Redis."""
mock_redis = mock_redis_connection_manager
mock_redis.lpush.return_value = 3
mock_redis.rpush.return_value = 4
result1 = await lpush("test_list", "value1")
result2 = await rpush("test_list", "value2")
# Results should indicate successful push regardless of return value
assert "pushed to the left of list" in result1
assert "pushed to the right of list" in result2
```
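The expiration tests above imply a push followed by a separate `expire` call, with any failure in either step reported through the same "Error pushing value" message. Here is a minimal sketch of a function that would satisfy those assertions. `lpush_sketch` and `FakeRedisError` are hypothetical names, and the actual `src/tools/list.py` is not shown in this section, so treat this as an assumption about its shape rather than its implementation.

```python
import asyncio
from unittest.mock import Mock


class FakeRedisError(Exception):
    """Hypothetical stand-in for redis.exceptions.RedisError."""


async def lpush_sketch(conn, name, value, expire=None):
    """Hypothetical tool: push a value, then optionally set a TTL on the list."""
    try:
        conn.lpush(name, value)
        if expire:
            conn.expire(name, expire)
        return f"Value '{value}' pushed to the left of list '{name}'."
    except FakeRedisError as exc:
        # Push and expire failures share one message, as the tests expect.
        return f"Error pushing value to list '{name}': {exc}"


def run_demo():
    conn = Mock()
    conn.lpush.return_value = 1
    pushed = asyncio.run(lpush_sketch(conn, "test_list", "value1", 60))
    conn.expire.assert_called_once_with("test_list", 60)

    # Now make only the expire step fail.
    conn.expire.side_effect = FakeRedisError("Expire failed")
    failed = asyncio.run(lpush_sketch(conn, "test_list", "value1", 60))
    return pushed, failed
```

In this sketch the two commands are not atomic: when `expire` fails, the value has already been pushed, even though the returned message reads as a push error.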
--------------------------------------------------------------------------------
/tests/tools/test_sorted_set.py:
--------------------------------------------------------------------------------
```python
"""
Unit tests for src/tools/sorted_set.py
"""
from unittest.mock import Mock, patch
import pytest
from redis.exceptions import RedisError
from src.tools.sorted_set import zadd, zrange, zrem
class TestSortedSetOperations:
"""Test cases for Redis sorted set operations."""
@pytest.mark.asyncio
async def test_zadd_success(self, mock_redis_connection_manager):
"""Test successful sorted set add operation."""
mock_redis = mock_redis_connection_manager
mock_redis.zadd.return_value = 1 # Number of elements added
result = await zadd("test_zset", 1.5, "member1")
mock_redis.zadd.assert_called_once_with("test_zset", {"member1": 1.5})
assert "Successfully added member1 to test_zset with score 1.5" in result
@pytest.mark.asyncio
async def test_zadd_with_expiration(self, mock_redis_connection_manager):
"""Test sorted set add operation with expiration."""
mock_redis = mock_redis_connection_manager
mock_redis.zadd.return_value = 1
mock_redis.expire.return_value = True
result = await zadd("test_zset", 2.0, "member1", 60)
mock_redis.zadd.assert_called_once_with("test_zset", {"member1": 2.0})
mock_redis.expire.assert_called_once_with("test_zset", 60)
assert "and expiration 60 seconds" in result
@pytest.mark.asyncio
async def test_zadd_member_updated(self, mock_redis_connection_manager):
"""Test sorted set add operation when member score is updated."""
mock_redis = mock_redis_connection_manager
mock_redis.zadd.return_value = 0 # Member already exists, score updated
result = await zadd("test_zset", 3.0, "existing_member")
assert (
"Successfully added existing_member to test_zset with score 3.0" in result
)
@pytest.mark.asyncio
async def test_zadd_redis_error(self, mock_redis_connection_manager):
"""Test sorted set add operation with Redis error."""
mock_redis = mock_redis_connection_manager
mock_redis.zadd.side_effect = RedisError("Connection failed")
result = await zadd("test_zset", 1.0, "member1")
assert "Error adding to sorted set test_zset: Connection failed" in result
@pytest.mark.asyncio
async def test_zadd_integer_score(self, mock_redis_connection_manager):
"""Test sorted set add operation with integer score."""
mock_redis = mock_redis_connection_manager
mock_redis.zadd.return_value = 1
result = await zadd("test_zset", 5, "member1")
mock_redis.zadd.assert_called_once_with("test_zset", {"member1": 5})
assert "Successfully added member1 to test_zset with score 5" in result
@pytest.mark.asyncio
async def test_zrange_success_without_scores(self, mock_redis_connection_manager):
"""Test successful sorted set range operation without scores."""
mock_redis = mock_redis_connection_manager
mock_redis.zrange.return_value = ["member1", "member2", "member3"]
result = await zrange("test_zset", 0, 2)
mock_redis.zrange.assert_called_once_with("test_zset", 0, 2, withscores=False)
assert result == "['member1', 'member2', 'member3']"
@pytest.mark.asyncio
async def test_zrange_success_with_scores(self, mock_redis_connection_manager):
"""Test successful sorted set range operation with scores."""
mock_redis = mock_redis_connection_manager
mock_redis.zrange.return_value = [
("member1", 1.0),
("member2", 2.0),
("member3", 3.0),
]
result = await zrange("test_zset", 0, 2, True)
mock_redis.zrange.assert_called_once_with("test_zset", 0, 2, withscores=True)
assert result == "[('member1', 1.0), ('member2', 2.0), ('member3', 3.0)]"
@pytest.mark.asyncio
async def test_zrange_default_parameters(self, mock_redis_connection_manager):
"""Test sorted set range operation with with_scores left at its default (False)."""
mock_redis = mock_redis_connection_manager
mock_redis.zrange.return_value = ["member1", "member2"]
result = await zrange("test_zset", 0, -1)
mock_redis.zrange.assert_called_once_with("test_zset", 0, -1, withscores=False)
assert result == "['member1', 'member2']"
@pytest.mark.asyncio
async def test_zrange_empty_set(self, mock_redis_connection_manager):
"""Test sorted set range operation on empty set."""
mock_redis = mock_redis_connection_manager
mock_redis.zrange.return_value = []
result = await zrange("empty_zset", 0, -1)
mock_redis.zrange.assert_called_once_with("empty_zset", 0, -1, withscores=False)
assert "Sorted set empty_zset is empty or does not exist" in result
@pytest.mark.asyncio
async def test_zrange_redis_error(self, mock_redis_connection_manager):
"""Test sorted set range operation with Redis error."""
mock_redis = mock_redis_connection_manager
mock_redis.zrange.side_effect = RedisError("Connection failed")
result = await zrange("test_zset", 0, -1)
assert "Error retrieving sorted set test_zset: Connection failed" in result
@pytest.mark.asyncio
async def test_zrem_success(self, mock_redis_connection_manager):
"""Test successful sorted set remove operation."""
mock_redis = mock_redis_connection_manager
mock_redis.zrem.return_value = 1 # Number of elements removed
result = await zrem("test_zset", "member1")
mock_redis.zrem.assert_called_once_with("test_zset", "member1")
assert "Successfully removed member1 from test_zset" in result
@pytest.mark.asyncio
async def test_zrem_member_not_exists(self, mock_redis_connection_manager):
"""Test sorted set remove operation when member doesn't exist."""
mock_redis = mock_redis_connection_manager
mock_redis.zrem.return_value = 0 # Member doesn't exist
result = await zrem("test_zset", "nonexistent_member")
assert "Member nonexistent_member not found in test_zset" in result
@pytest.mark.asyncio
async def test_zrem_redis_error(self, mock_redis_connection_manager):
"""Test sorted set remove operation with Redis error."""
mock_redis = mock_redis_connection_manager
mock_redis.zrem.side_effect = RedisError("Connection failed")
result = await zrem("test_zset", "member1")
assert "Error removing from sorted set test_zset: Connection failed" in result
@pytest.mark.asyncio
async def test_zadd_negative_score(self, mock_redis_connection_manager):
"""Test sorted set add operation with negative score."""
mock_redis = mock_redis_connection_manager
mock_redis.zadd.return_value = 1
result = await zadd("test_zset", -1.5, "negative_member")
mock_redis.zadd.assert_called_once_with("test_zset", {"negative_member": -1.5})
assert (
"Successfully added negative_member to test_zset with score -1.5" in result
)
@pytest.mark.asyncio
async def test_zadd_zero_score(self, mock_redis_connection_manager):
"""Test sorted set add operation with zero score."""
mock_redis = mock_redis_connection_manager
mock_redis.zadd.return_value = 1
result = await zadd("test_zset", 0, "zero_member")
mock_redis.zadd.assert_called_once_with("test_zset", {"zero_member": 0})
assert "Successfully added zero_member to test_zset with score 0" in result
@pytest.mark.asyncio
async def test_zrange_negative_indices(self, mock_redis_connection_manager):
"""Test sorted set range operation with negative indices."""
mock_redis = mock_redis_connection_manager
mock_redis.zrange.return_value = ["last_member"]
result = await zrange("test_zset", -1, -1)
mock_redis.zrange.assert_called_once_with("test_zset", -1, -1, withscores=False)
assert result == "['last_member']"
@pytest.mark.asyncio
async def test_zadd_expiration_error(self, mock_redis_connection_manager):
"""Test sorted set add operation when expiration fails."""
mock_redis = mock_redis_connection_manager
mock_redis.zadd.return_value = 1
mock_redis.expire.side_effect = RedisError("Expire failed")
result = await zadd("test_zset", 1.0, "member1", 60)
assert "Error adding to sorted set test_zset: Expire failed" in result
@pytest.mark.asyncio
async def test_zadd_with_unicode_member(self, mock_redis_connection_manager):
"""Test sorted set add operation with unicode member."""
mock_redis = mock_redis_connection_manager
mock_redis.zadd.return_value = 1
unicode_member = "测试成员 🚀"
result = await zadd("test_zset", 1.0, unicode_member)
mock_redis.zadd.assert_called_once_with("test_zset", {unicode_member: 1.0})
assert (
f"Successfully added {unicode_member} to test_zset with score 1.0" in result
)
@pytest.mark.asyncio
async def test_zrange_large_range(self, mock_redis_connection_manager):
"""Test sorted set range operation with large range."""
mock_redis = mock_redis_connection_manager
large_result = [f"member_{i}" for i in range(1000)]
mock_redis.zrange.return_value = large_result
result = await zrange("large_zset", 0, 999)
# The function returns a string representation
assert result == str(large_result)
# Sanity check on the test data itself: the mocked list has 1000 members
assert len(large_result) == 1000
@pytest.mark.asyncio
async def test_connection_manager_called_correctly(self):
"""Test that RedisConnectionManager.get_connection is called correctly."""
with patch(
"src.tools.sorted_set.RedisConnectionManager.get_connection"
) as mock_get_conn:
mock_redis = Mock()
mock_redis.zadd.return_value = 1
mock_get_conn.return_value = mock_redis
await zadd("test_zset", 1.0, "member1")
mock_get_conn.assert_called_once()
@pytest.mark.asyncio
async def test_function_signatures(self):
"""Test that functions have correct signatures."""
import inspect
# Test zadd function signature
zadd_sig = inspect.signature(zadd)
zadd_params = list(zadd_sig.parameters.keys())
assert zadd_params == ["key", "score", "member", "expiration"]
assert zadd_sig.parameters["expiration"].default is None
# Test zrange function signature
zrange_sig = inspect.signature(zrange)
zrange_params = list(zrange_sig.parameters.keys())
assert zrange_params == ["key", "start", "end", "with_scores"]
# start and end are required parameters (no defaults)
assert zrange_sig.parameters["start"].default == inspect.Parameter.empty
assert zrange_sig.parameters["end"].default == inspect.Parameter.empty
assert zrange_sig.parameters["with_scores"].default is False
# Test zrem function signature
zrem_sig = inspect.signature(zrem)
zrem_params = list(zrem_sig.parameters.keys())
assert zrem_params == ["key", "member"]
```
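The expected strings in the list and sorted set tests differ in quoting: the `lrange` assertions use double quotes (`'["item1", "item2"]'`), while the `zrange` assertions use single quotes (`"['member1', 'member2']"`). That is consistent with one tool serializing with `json.dumps` and the other with `str()`, although the tool sources are not shown in this section, so this is an inference from the assertions. The sketch below shows the two formats side by side using only the standard library.

```python
import json


def serialize_demo():
    list_items = ["item1", "item2", "item3"]
    zset_members = ["member1", "member2", "member3"]
    zset_with_scores = [("member1", 1.0), ("member2", 2.0)]

    as_json = json.dumps(list_items)        # JSON text, double-quoted strings
    as_repr = str(zset_members)             # Python repr, single-quoted strings
    as_repr_scored = str(zset_with_scores)  # tuples survive only in the repr form
    return as_json, as_repr, as_repr_scored
```

Only the JSON form round-trips through `json.loads`. The `str()` form is Python-specific, which matters if a client tries to parse the result.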