# Directory Structure
```
├── .gitignore
├── .gitlab-ci.yml
├── .pre-commit-config.yaml
├── alembic
│   ├── env.py
│   ├── README
│   ├── script.py.mako
│   └── versions
│       ├── 2025_03_24_0945-ffe7a88d5b15_initial_migration.py
│       ├── 2025_04_09_1117-e25328dbfe80_set_logo_to_none.py
│       ├── 2025_04_22_0854-1c6edbdb4642_add_status_to_servers.py
│       ├── 2025_04_25_1158-49e1ab420276_host_type.py
│       ├── 2025_04_30_1049-387bed3ce6ae_server_title.py
│       ├── 2025_05_12_1404-735603f5e4d5_creator_id.py
│       ├── 2025_05_13_0634-3b8b8a5b99ad_deployment_fields.py
│       ├── 2025_05_13_1326-4b751cb703e2_timestamps.py
│       ├── 2025_05_15_1414-7a3913a3bb4c_logs.py
│       ├── 2025_05_27_0757-7e8c31ddb2db_json_logs.py
│       ├── 2025_05_28_0632-f2298e6acd39_base_image.py
│       ├── 2025_05_30_1231-4b5c0d56e1ca_build_instructions.py
│       ├── 2025_05_31_1156-c98f44cea12f_add_transport_protocol_column_to_servers.py
│       ├── 2025_06_07_1322-aa9a14a698c5_jsonb_logs.py
│       └── 2025_06_17_1353-667874dd7dee_added_alias.py
├── alembic.ini
├── common
│   ├── __init__.py
│   ├── llm
│   │   ├── __init__.py
│   │   └── client.py
│   └── notifications
│       ├── __init__.py
│       ├── client.py
│       ├── enums.py
│       └── exceptions.py
├── deploy
│   ├── docker-version
│   ├── push-and-run
│   ├── ssh-agent
│   └── ssh-passphrase
├── docker-compose-debug.yaml
├── docker-compose.yaml
├── healthchecker
│   ├── __init__.py
│   ├── Dockerfile
│   ├── scripts
│   │   ├── worker-start-debug.sh
│   │   └── worker-start.sh
│   └── src
│       ├── __init__.py
│       ├── __main__.py
│       └── checker.py
├── LICENSE
├── mcp_server
│   ├── __init__.py
│   ├── Dockerfile
│   ├── requirements.txt
│   ├── scripts
│   │   ├── __init__.py
│   │   ├── mcp_health_check.py
│   │   ├── start-debug.sh
│   │   └── start.sh
│   ├── server.json
│   └── src
│       ├── __init__.py
│       ├── decorators.py
│       ├── llm
│       │   ├── __init__.py
│       │   ├── prompts.py
│       │   └── tool_manager.py
│       ├── logger.py
│       ├── main.py
│       ├── registry_client.py
│       ├── schemas.py
│       ├── settings.py
│       └── tools.py
├── pyproject.toml
├── README.md
├── registry
│   ├── __init__.py
│   ├── Dockerfile
│   ├── requirements.txt
│   ├── scripts
│   │   ├── requirements
│   │   ├── start-debug.sh
│   │   └── start.sh
│   └── src
│       ├── __init__.py
│       ├── base_schema.py
│       ├── database
│       │   ├── __init__.py
│       │   ├── models
│       │   │   ├── __init__.py
│       │   │   ├── base.py
│       │   │   ├── log.py
│       │   │   ├── mixins
│       │   │   │   ├── __init__.py
│       │   │   │   ├── has_created_at.py
│       │   │   │   ├── has_server_id.py
│       │   │   │   └── has_updated_at.py
│       │   │   ├── server.py
│       │   │   └── tool.py
│       │   └── session.py
│       ├── deployments
│       │   ├── __init__.py
│       │   ├── logs_repository.py
│       │   ├── router.py
│       │   ├── schemas.py
│       │   └── service.py
│       ├── errors.py
│       ├── logger.py
│       ├── main.py
│       ├── producer.py
│       ├── search
│       │   ├── __init__.py
│       │   ├── prompts.py
│       │   ├── schemas.py
│       │   └── service.py
│       ├── servers
│       │   ├── __init__.py
│       │   ├── repository.py
│       │   ├── router.py
│       │   ├── schemas.py
│       │   └── service.py
│       ├── settings.py
│       ├── types.py
│       └── validator
│           ├── __init__.py
│           ├── constants.py
│           ├── prompts.py
│           ├── schemas.py
│           ├── service.py
│           └── ssl.py
├── scripts
│   ├── darp-add.py
│   ├── darp-router.py
│   └── darp-search.py
└── worker
    ├── __init__.py
    ├── Dockerfile
    ├── requirements.txt
    ├── scripts
    │   ├── worker-start-debug.sh
    │   └── worker-start.sh
    └── src
        ├── __init__.py
        ├── constants.py
        ├── consumer.py
        ├── deployment_service.py
        ├── docker_service.py
        ├── dockerfile_generators
        │   ├── __init__.py
        │   ├── base.py
        │   ├── factory.py
        │   ├── python
        │   │   ├── __init__.py
        │   │   └── generic.py
        │   ├── typescript
        │   │   ├── __init__.py
        │   │   └── generic.py
        │   └── user_provided.py
        ├── errors.py
        ├── main.py
        ├── schemas.py
        └── user_logger.py
```
# Files
--------------------------------------------------------------------------------
/.pre-commit-config.yaml:
--------------------------------------------------------------------------------
```yaml
exclude: 'alembic/versions'
repos:
  - repo: https://github.com/psf/black
    rev: 24.4.2
    hooks:
      - id: black
  - repo: https://github.com/asottile/reorder-python-imports
    rev: v3.13.0
    hooks:
      - id: reorder-python-imports
        args: [--py312-plus]
  - repo: https://github.com/pre-commit/mirrors-mypy
    rev: v1.10.1
    hooks:
      - id: mypy
        exclude: alembic
        additional_dependencies: [types-requests]
  - repo: https://github.com/PyCQA/flake8
    rev: 7.1.0
    hooks:
      - id: flake8
        additional_dependencies: [flake8-pyproject]
```
--------------------------------------------------------------------------------
/.gitlab-ci.yml:
--------------------------------------------------------------------------------
```yaml
stages:
  - test
  - build
  - deploy

test-pre-commit:
  stage: test
  tags:
    - darp-group-shell-runner
  script:
    - source ~/miniconda3/bin/activate && pre-commit run --all-files

test-migrations:
  stage: test
  tags:
    - darp-group-shell-runner
  script:
    - duplicates=`grep -h down_revision alembic/versions/*.py | sort | uniq -d`
    - echo "$duplicates"
    - test -z "$duplicates"

clean-pre-commit:
  stage: test
  tags:
    - darp-group-shell-runner
  script:
    - source ~/miniconda3/bin/activate && pre-commit clean
  when: manual

build:
  stage: build
  when: manual
  tags:
    - darp-group-shell-runner
  script:
    - source deploy/docker-version
    - docker compose --profile main build

deploy-test:
  stage: deploy
  needs: ["build"]
  when: manual
  tags:
    - darp-group-shell-runner
  script:
    - DOCKER_NETWORK=highkey_network deploy/push-and-run "$TEST_DOCKER_HOST"

deploy-memelabs:
  stage: deploy
  needs: ["build"]
  when: manual
  tags:
    - darp-group-shell-runner
  script:
    - DOCKER_NETWORK=highkey_network deploy/push-and-run "$MEMELABS_DOCKER_HOST"
  rules:
    - if: $CI_COMMIT_BRANCH == "main"

deploy-prod:
  stage: deploy
  when: manual
  needs: ["build"]
  tags:
    - darp-group-shell-runner
  script:
    - DOCKER_NETWORK=highkey_network deploy/push-and-run "$PRODUCTION_DOCKER_HOST"
  rules:
    - if: $CI_COMMIT_BRANCH == "main"

deploy-toci-test:
  stage: deploy
  when: manual
  needs: ["build"]
  tags:
    - darp-group-shell-runner
  script:
    - deploy/push-and-run "$TOCI_TEST_DOCKER_HOST"

deploy-toci-prod:
  stage: deploy
  when: manual
  needs: ["build"]
  tags:
    - darp-group-shell-runner
  script:
    - deploy/push-and-run "$TOCI_PRODUCTION_DOCKER_HOST"
  rules:
    - if: $CI_COMMIT_BRANCH == "main"
```
--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------
```
# VSCode
.vscode/
# JetBrains
.idea
# WandB
wandb/
# Old
old/
temp/
profiler/
# Logs
logs/
# eval
eval_results/
evalplus_codegen/
# All datasets
dataset/
dataset_processed/
dataset_processed_*/
PDF/
*.xlsx
*.csv
# All evaluation results
eval_baselines/
eval_results/
eval_results_temp/
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
*.ipynb
*.pid
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pdm
.pdm.toml
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.env.*
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# MacOs
.DS_Store
# Local history
.history
# PyCharm
.idea/
```
--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------
```markdown
# DARPEngine
The MCP search engine for DARP.
[![X][x-image]][x-url]
[![Code style: black][black-image]][black-url]
[![Imports: reorder-python-imports][imports-image]][imports-url]
[![Pydantic v2][pydantic-image]][pydantic-url]
[![pre-commit][pre-commit-image]][pre-commit-url]
[![License MIT][license-image]][license-url]
DARPEngine stores metadata for MCP servers hosted online and provides smart search capabilities.
## Features
* Simple CLI
* API access to search
* MCP tool to retrieve search results for connecting manually
* Routing MCP tool based on the server: answer any question using the tools found for the user's request
### Coming soon
* Support for `.well-known/mcp.json`
* Crawler
* Nice frontend
* Hosted version
* Validate different levels of SSL certificates and integrate this info smartly to make sensitive MCP servers difficult to spoof
## Installation
```
export OPENAI_API_KEY=sk-...
docker network create highkey_network
docker compose build
docker compose -f docker-compose.yaml -f docker-compose-debug.yaml up --build --wait
```
## Getting started
You can connect the DARPEngine to an MCP client (e.g. Claude Desktop or Cursor) using the MCP tools it provides. Just select SSE mode and specify `http://localhost:4689/sse` as the endpoint.
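For a quick connectivity check from code, here is a minimal sketch using the `mcp` Python package (the same client API that `mcp_server/scripts/mcp_health_check.py` uses):
```
import asyncio

from mcp import ClientSession
from mcp.client.sse import sse_client


async def list_tools() -> None:
    # Connect to the engine's SSE endpoint and list the exposed MCP tools.
    async with sse_client("http://localhost:4689/sse") as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            tools = await session.list_tools()
            print([tool.name for tool in tools.tools])


asyncio.run(list_tools())
```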
### Direct CLI use
Another way is to use the CLI. Most of the scripts work with just the standard Python library, but the routing tool requires the `mcp` package. You can install the script requirements like this:
```
conda create -n darp 'python>=3.10'
conda activate darp
pip install -r mcp_server/requirements.txt
```
Once installation is complete, we can use the scripts.
To begin, we need to add some MCP servers to the engine, e.g.:
```
$ python scripts/darp-add.py --url http://memelabs.ai:3006/sse --name code_analysis --description "Analyze gitlab repo for quality, topics, packages use"
```
Then we can make the requests:
```
$ python scripts/darp-search.py "Analyze https://github.com/BenderV/autochat"
Found 1 servers:
code_analysis
```
You can get more useful results with the routing tool:
```
$ python scripts/darp-router.py "Analyze https://github.com/BenderV/autochat"
assistant: [tool_calls]
[tool] ...
[tool] ...
[tool] ...
assistant:
### Code Quality
The code in the AutoChat repository demonstrates a good level of quality concerning **readability**, **maintainability**, and adherence to best practices:
- **Readability**: Consistent naming conventions are evident, with adherence to Python's PEP 8 styling guidelines. Descriptive function and variable names enhance the understanding of the code's purpose.
- **Maintainability**: The code is structured to allow easy updates and modifications, with a clear separation of concerns observed through the use of classes and methods handling distinct functionalities.
- **Best Practices**: Extensive use of exception handling, type annotations, and docstrings reflect best practices in Python development. Some TODO comments suggest areas for improvement, indicating that further attention is needed.
### Code Structure
The code is organized into multiple files and modules, each serving a distinct purpose:
- **Modular Design**: Various classes (e.g., `Autochat`, `Image`, `Message`) indicate a well-structured object-oriented design that promotes separation of concerns, making the code easier to navigate.
- **Logical Organization**: Files are logically separated based on functionality. For example, `chat.py` focuses on chat-related logic, while `model.py` handles message and image processing. The utility functions in `utils.py` enhance reusability.
- **Testing**: The presence of a test file (`tests/test_utils.py`) shows commitment to testing, crucial for code reliability. The use of `unittest` indicates a structured approach to testing individual components.
### Main Functionality
The code appears to be part of an **AutoChat package**, providing a framework for building conversational agents. Key functionalities include:
- **Chat Management**: The `Autochat` class acts as the main interface for managing conversations, handling message history, context, and interaction limits.
- **Message Handling**: Classes like `Message` and `MessagePart` enable structured message creation and processing, accommodating different message types, including text and images.
- **Functionality Extensions**: Methods like `add_tool` and `add_function` allow dynamic addition of tools and functions, facilitating customization of the chat experience.
- **Provider Integration**: Different API provider integrations (e.g., OpenAI, Anthropic) are encapsulated within respective classes, allowing flexibility in backend communication.
- **Utilities**: Utility functions offer additional capabilities such as CSV formatting and function parsing that support main chat operations.
Overall, the codebase is well-organized and showcases a thoughtful approach to developing a conversational AI framework. There is room for further refinement and enhancement, particularly in documentation and clarity of variable names.
### Library Usage
The project makes use of **AI libraries**, indicated by its functionality related to conversational agents and integration with AI service providers. This supports its ability to manage interactions with AI models efficiently.
### Summary
The AutoChat project is a chat system designed for communication with various AI models, primarily through the `Autochat` class, which manages conversations and supports complex message types, including text and images. The code is moderately complex due to its integration with external APIs and its ability to handle diverse interactions through extensible methods like `add_tool` and `add_function`. The quality of code is commendable, featuring a well-structured modular design that promotes readability and maintainability, although some areas require further documentation and refinement, such as clarifying variable names and enhancing comments. The organization into separate files for models, utilities, and tests aids development, but the utility functions could benefit from better categorization for improved clarity.
```
Of course, the usefulness of the result depends on the MCP servers you connect to the engine.
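The search endpoint is also reachable over plain HTTP (the "API access to search" feature above). A minimal sketch with `httpx`, using the same `/servers/search` route the MCP server calls internally; the host and port here are assumptions, so adjust them to your docker-compose port mapping:
```
import asyncio

import httpx


async def search(query: str) -> list[dict]:
    # Host/port are assumptions -- check your docker-compose port mapping.
    async with httpx.AsyncClient(base_url="http://localhost:8000") as client:
        response = await client.get("/servers/search", params={"query": query}, timeout=90)
        response.raise_for_status()
        return response.json()


print(asyncio.run(search("Analyze https://github.com/BenderV/autochat")))
```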
## Get help and support
Please feel free to connect with us using the [discussion section](https://github.com/hipasus/darp_engine/discussions).
## Contributing
Follow us on X: https://x.com/DARP_AI
## License
The DARPEngine codebase is under MIT license.
<br>
[x-image]: https://img.shields.io/twitter/follow/DARP_AI?style=social
[x-url]: https://x.com/DARP_AI
[black-image]: https://img.shields.io/badge/code%20style-black-000000.svg
[black-url]: https://github.com/psf/black
[imports-image]: https://img.shields.io/badge/%20imports-reorder_python_imports-%231674b1?style=flat&labelColor=ef8336
[imports-url]: https://github.com/asottile/reorder-python-imports/
[pydantic-image]: https://img.shields.io/endpoint?url=https://raw.githubusercontent.com/pydantic/pydantic/main/docs/badge/v2.json
[pydantic-url]: https://pydantic.dev
[pre-commit-image]: https://img.shields.io/badge/pre--commit-enabled-brightgreen?logo=pre-commit&logoColor=white
[pre-commit-url]: https://github.com/pre-commit/pre-commit
[license-image]: https://img.shields.io/github/license/DARPAI/darp_engine
[license-url]: https://opensource.org/licenses/MIT
```
--------------------------------------------------------------------------------
/common/__init__.py:
--------------------------------------------------------------------------------
```python
```
--------------------------------------------------------------------------------
/common/llm/__init__.py:
--------------------------------------------------------------------------------
```python
```
--------------------------------------------------------------------------------
/common/notifications/__init__.py:
--------------------------------------------------------------------------------
```python
```
--------------------------------------------------------------------------------
/healthchecker/__init__.py:
--------------------------------------------------------------------------------
```python
```
--------------------------------------------------------------------------------
/healthchecker/src/__init__.py:
--------------------------------------------------------------------------------
```python
```
--------------------------------------------------------------------------------
/mcp_server/__init__.py:
--------------------------------------------------------------------------------
```python
```
--------------------------------------------------------------------------------
/mcp_server/scripts/__init__.py:
--------------------------------------------------------------------------------
```python
```
--------------------------------------------------------------------------------
/mcp_server/src/__init__.py:
--------------------------------------------------------------------------------
```python
```
--------------------------------------------------------------------------------
/mcp_server/src/llm/__init__.py:
--------------------------------------------------------------------------------
```python
```
--------------------------------------------------------------------------------
/registry/__init__.py:
--------------------------------------------------------------------------------
```python
```
--------------------------------------------------------------------------------
/registry/src/__init__.py:
--------------------------------------------------------------------------------
```python
```
--------------------------------------------------------------------------------
/registry/src/database/models/__init__.py:
--------------------------------------------------------------------------------
```python
```
--------------------------------------------------------------------------------
/registry/src/deployments/__init__.py:
--------------------------------------------------------------------------------
```python
```
--------------------------------------------------------------------------------
/registry/src/search/__init__.py:
--------------------------------------------------------------------------------
```python
```
--------------------------------------------------------------------------------
/registry/src/servers/__init__.py:
--------------------------------------------------------------------------------
```python
```
--------------------------------------------------------------------------------
/registry/src/validator/__init__.py:
--------------------------------------------------------------------------------
```python
```
--------------------------------------------------------------------------------
/worker/__init__.py:
--------------------------------------------------------------------------------
```python
```
--------------------------------------------------------------------------------
/worker/src/__init__.py:
--------------------------------------------------------------------------------
```python
```
--------------------------------------------------------------------------------
/worker/scripts/worker-start.sh:
--------------------------------------------------------------------------------
```bash
#!/usr/bin/env sh
set -e
python -m worker.src.main
```
--------------------------------------------------------------------------------
/healthchecker/scripts/worker-start.sh:
--------------------------------------------------------------------------------
```bash
#!/usr/bin/env sh
set -e
python -m healthchecker.src
```
--------------------------------------------------------------------------------
/mcp_server/scripts/start.sh:
--------------------------------------------------------------------------------
```bash
#!/usr/bin/env bash
set -e
python3 -m mcp_server.src.main
```
--------------------------------------------------------------------------------
/worker/src/dockerfile_generators/python/__init__.py:
--------------------------------------------------------------------------------
```python
from .generic import PythonGenerator
__all__ = ["PythonGenerator"]
```
--------------------------------------------------------------------------------
/worker/src/dockerfile_generators/typescript/__init__.py:
--------------------------------------------------------------------------------
```python
from .generic import TypeScriptGenerator
__all__ = ["TypeScriptGenerator"]
```
--------------------------------------------------------------------------------
/mcp_server/requirements.txt:
--------------------------------------------------------------------------------
```
mcp==1.6.0
httpx==0.28.1
pydantic==2.11.1
pydantic-settings==2.8.1
openai==1.65.4
```
--------------------------------------------------------------------------------
/registry/src/database/models/base.py:
--------------------------------------------------------------------------------
```python
from sqlalchemy.orm import DeclarativeBase


class Base(DeclarativeBase):
    pass
```
--------------------------------------------------------------------------------
/common/notifications/exceptions.py:
--------------------------------------------------------------------------------
```python
class NotificationsServiceError(Exception):
    """Base notifications-service exception"""
```
--------------------------------------------------------------------------------
/worker/scripts/worker-start-debug.sh:
--------------------------------------------------------------------------------
```bash
#!/usr/bin/env sh
set -e
pip install --upgrade -r requirements.txt
python -m worker.src.main
```
--------------------------------------------------------------------------------
/healthchecker/scripts/worker-start-debug.sh:
--------------------------------------------------------------------------------
```bash
#!/usr/bin/env sh
set -e
pip install --upgrade -r requirements.txt
python -m healthchecker.src
```
--------------------------------------------------------------------------------
/mcp_server/scripts/start-debug.sh:
--------------------------------------------------------------------------------
```bash
#!/usr/bin/env bash
set -e
pip install --upgrade -r ./requirements.txt
python3 -m mcp_server.src.main
```
--------------------------------------------------------------------------------
/registry/scripts/start.sh:
--------------------------------------------------------------------------------
```bash
#!/usr/bin/env bash
set -e
alembic upgrade head
uvicorn --proxy-headers --host 0.0.0.0 --port $UVICORN_PORT registry.src.main:app
```
--------------------------------------------------------------------------------
/registry/src/base_schema.py:
--------------------------------------------------------------------------------
```python
from pydantic import BaseModel
from pydantic import ConfigDict


class BaseSchema(BaseModel):
    model_config = ConfigDict(from_attributes=True)
```
--------------------------------------------------------------------------------
/mcp_server/server.json:
--------------------------------------------------------------------------------
```json
{
  "name": "darp_search_engine",
  "description": "Search Engine for MCP servers",
  "endpoint": "http://registry_mcp_server/sse",
  "logo": null
}
```
--------------------------------------------------------------------------------
/registry/src/database/models/mixins/__init__.py:
--------------------------------------------------------------------------------
```python
from .has_created_at import HasCreatedAt
from .has_server_id import HasServerId
from .has_updated_at import HasUpdatedAt
__all__ = ["HasCreatedAt", "HasUpdatedAt", "HasServerId"]
```
--------------------------------------------------------------------------------
/registry/scripts/start-debug.sh:
--------------------------------------------------------------------------------
```bash
#!/usr/bin/env bash
set -e
pip install --upgrade -r requirements.txt
alembic upgrade head
uvicorn --reload --proxy-headers --host 0.0.0.0 --port $UVICORN_PORT registry.src.main:app
```
--------------------------------------------------------------------------------
/mcp_server/Dockerfile:
--------------------------------------------------------------------------------
```dockerfile
FROM python:3.11
WORKDIR /workspace
COPY mcp_server/requirements.txt .
RUN pip install -r requirements.txt
COPY common ./common
COPY mcp_server ./mcp_server
ENV PATH "$PATH:/workspace/mcp_server/scripts"
```
--------------------------------------------------------------------------------
/worker/src/constants.py:
--------------------------------------------------------------------------------
```python
from enum import StrEnum


class BaseImage(StrEnum):
    python_3_10 = "python:3.10.17-slim"
    python_3_11 = "python:3.11.12-slim"
    python_3_12 = "python:3.12.10-slim"
    node_lts = "node:lts-alpine"
```
--------------------------------------------------------------------------------
/worker/src/errors.py:
--------------------------------------------------------------------------------
```python
class ProcessingError(Exception):
    def __init__(self, message: str | None = None):
        self.message = message


class InvalidData(ProcessingError):
    pass


class DependenciesError(ProcessingError):
    pass
```
--------------------------------------------------------------------------------
/common/notifications/enums.py:
--------------------------------------------------------------------------------
```python
from enum import StrEnum


class NotificationEvent(StrEnum):
    email_confirmation = "email-confirmation"
    deploy_failed = "server-deploy-failed"
    deploy_successful = "server-success-deploy"
    server_healthcheck_failed = "server-healthcheck-failed"
```
--------------------------------------------------------------------------------
/registry/Dockerfile:
--------------------------------------------------------------------------------
```dockerfile
FROM python:3.11
WORKDIR /workspace
COPY registry/requirements.txt .
RUN pip install -r requirements.txt
COPY alembic ./alembic
COPY alembic.ini ./alembic.ini
COPY common ./common
COPY registry ./registry
COPY worker ./worker
ENV PATH "$PATH:/workspace/registry/scripts"
```
--------------------------------------------------------------------------------
/registry/src/producer.py:
--------------------------------------------------------------------------------
```python
from aiokafka import AIOKafkaProducer

from registry.src.settings import settings


async def get_producer(url=settings.broker_url) -> AIOKafkaProducer:
    return AIOKafkaProducer(
        bootstrap_servers=url,
        value_serializer=lambda m: m.model_dump_json().encode("utf-8"),
    )
```
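A hedged usage sketch (not a file from this repo): since `value_serializer` calls `model_dump_json()`, values passed to the producer must be Pydantic models, e.g. `worker.src.schemas.Task`. The lifecycle calls (`start`, `send_and_wait`, `stop`) are the standard aiokafka API:
```python
import asyncio

from registry.src.producer import get_producer
from registry.src.settings import settings
from registry.src.types import TaskType
from worker.src.schemas import Task


async def enqueue_deploy(server_id: int) -> None:
    producer = await get_producer()
    await producer.start()
    try:
        # The value_serializer in get_producer() turns the model into JSON bytes.
        task = Task(task_type=TaskType.deploy, server_id=server_id)
        await producer.send_and_wait(settings.worker_topic, task)
    finally:
        await producer.stop()


asyncio.run(enqueue_deploy(1))
```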
--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------
```toml
# pyproject.toml
[tool.pytest.ini_options]
minversion = "6.0"
addopts = "--doctest-modules"

[tool.flake8]
ignore = ['E203', 'D203', 'E501', 'W503']
exclude = [
    '.git',
    '__pycache__',
    'docs/source/conf.py',
    'old',
    'build',
    'dist',
    '.venv',
]
max-complexity = 25
```
--------------------------------------------------------------------------------
/registry/src/main.py:
--------------------------------------------------------------------------------
```python
from fastapi import FastAPI

from registry.src.deployments.router import router as deploy_router
from registry.src.servers.router import router as servers_router

app = FastAPI()
app.include_router(servers_router)
app.include_router(deploy_router)


@app.get("/healthcheck")
async def healthcheck():
    return "Alive"
```
--------------------------------------------------------------------------------
/worker/Dockerfile:
--------------------------------------------------------------------------------
```dockerfile
FROM docker:latest
RUN apk add --no-cache python3 py3-pip gcc python3-dev libc-dev zlib-dev
WORKDIR /workspace
COPY worker/requirements.txt .
RUN pip config set global.break-system-packages true
RUN pip install -r requirements.txt
COPY registry ./registry
COPY worker ./worker
ENV PATH "$PATH:/workspace/worker/scripts"
```
--------------------------------------------------------------------------------
/registry/src/database/__init__.py:
--------------------------------------------------------------------------------
```python
from .models.base import Base
from .models.log import ServerLogs
from .models.server import Server
from .models.tool import Tool
from .session import get_session
from .session import get_unmanaged_session

__all__ = [
    "Base",
    "get_session",
    "get_unmanaged_session",
    "Server",
    "Tool",
    "ServerLogs",
]
```
--------------------------------------------------------------------------------
/mcp_server/src/llm/prompts.py:
--------------------------------------------------------------------------------
```python
default_prompt: str = (
    """
You are an expert in the topic of user's request.
Your assistant has prepared for you the tools that might be helpful to answer the request.
Your goal is to provide detailed, accurate information and analysis in response to user queries and if necessary to perform some actions using the tools.
""".strip()
)
```
--------------------------------------------------------------------------------
/registry/src/database/models/mixins/has_created_at.py:
--------------------------------------------------------------------------------
```python
from datetime import datetime

from sqlalchemy import Column
from sqlalchemy import DateTime
from sqlalchemy.orm import declarative_mixin
from sqlalchemy.orm import declared_attr


@declarative_mixin
class HasCreatedAt:
    @declared_attr
    @classmethod
    def created_at(cls):
        return Column(DateTime, default=datetime.now, nullable=False)
```
--------------------------------------------------------------------------------
/worker/src/dockerfile_generators/__init__.py:
--------------------------------------------------------------------------------
```python
from .base import DockerfileGenerator
from .factory import get_dockerfile_generator
from .python import PythonGenerator
from .typescript import TypeScriptGenerator
from .user_provided import UserDockerfileGenerator

__all__ = [
    "DockerfileGenerator",
    "PythonGenerator",
    "TypeScriptGenerator",
    "UserDockerfileGenerator",
    "get_dockerfile_generator",
]
```
--------------------------------------------------------------------------------
/healthchecker/Dockerfile:
--------------------------------------------------------------------------------
```dockerfile
FROM docker:latest
RUN apk add --no-cache python3 py3-pip gcc python3-dev libc-dev zlib-dev
WORKDIR /workspace
COPY registry/requirements.txt .
RUN pip config set global.break-system-packages true
RUN pip install -r requirements.txt
COPY common ./common
COPY registry ./registry
COPY worker ./worker
COPY healthchecker ./healthchecker
ENV PATH "$PATH:/workspace/healthchecker/scripts"
```
--------------------------------------------------------------------------------
/registry/src/database/models/mixins/has_updated_at.py:
--------------------------------------------------------------------------------
```python
from datetime import datetime

from sqlalchemy import Column
from sqlalchemy import DateTime
from sqlalchemy.orm import declarative_mixin
from sqlalchemy.orm import declared_attr


@declarative_mixin
class HasUpdatedAt:
    @declared_attr
    @classmethod
    def updated_at(cls):
        return Column(
            DateTime, onupdate=datetime.now, default=datetime.now, nullable=False
        )
```
--------------------------------------------------------------------------------
/mcp_server/scripts/mcp_health_check.py:
--------------------------------------------------------------------------------
```python
import asyncio

from mcp import ClientSession
from mcp.client.sse import sse_client

from mcp_server.src.settings import settings


async def healthcheck():
    async with sse_client(f"http://localhost:{settings.mcp_port}/sse") as (
        read,
        write,
    ):
        async with ClientSession(read, write) as session:
            await session.initialize()


if __name__ == "__main__":
    asyncio.run(healthcheck())
```
--------------------------------------------------------------------------------
/registry/src/database/models/mixins/has_server_id.py:
--------------------------------------------------------------------------------
```python
from sqlalchemy import Column
from sqlalchemy import ForeignKey
from sqlalchemy import Integer
from sqlalchemy.orm import declarative_mixin
from sqlalchemy.orm import declared_attr


@declarative_mixin
class HasServerId:
    @declared_attr
    def server_id(cls):
        return Column(
            Integer,
            ForeignKey("server.id", ondelete="CASCADE"),
            nullable=False,
            primary_key=True,
        )
```
--------------------------------------------------------------------------------
/registry/src/database/models/log.py:
--------------------------------------------------------------------------------
```python
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import Mapped
from sqlalchemy.orm import mapped_column

from .base import Base
from .mixins import HasCreatedAt
from .mixins import HasServerId
from .mixins import HasUpdatedAt


class ServerLogs(HasCreatedAt, HasUpdatedAt, HasServerId, Base):
    __tablename__ = "server_logs"

    deployment_logs: Mapped[list[dict]] = mapped_column(
        JSONB(), nullable=False, server_default="'[]'::jsonb"
    )
```
--------------------------------------------------------------------------------
/registry/src/logger.py:
--------------------------------------------------------------------------------
```python
import logging
from .settings import settings
logger = logging.getLogger("registry")
logger.setLevel(logging.INFO)
formatter = logging.Formatter("[%(asctime)s][%(name)s][%(levelname)s]: %(message)s")
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(formatter)
logger.addHandler(stream_handler)
settings.log_dir.mkdir(parents=True, exist_ok=True)
file_handler = logging.FileHandler(settings.log_dir / "registry.log")
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
```
--------------------------------------------------------------------------------
/registry/src/types.py:
--------------------------------------------------------------------------------
```python
from enum import auto
from enum import StrEnum


class HostType(StrEnum):
    internal = auto()
    external = auto()


class ServerStatus(StrEnum):
    active = auto()
    inactive = auto()
    in_processing = auto()
    stopped = auto()
    processing_failed = auto()


class TaskType(StrEnum):
    deploy = auto()
    start = auto()
    stop = auto()


class ServerTransportProtocol(StrEnum):
    SSE = "SSE"
    STREAMABLE_HTTP = "STREAMABLE_HTTP"


class RoutingMode(StrEnum):
    auto = "auto"
    deepresearch = "deepresearch"
```
--------------------------------------------------------------------------------
/mcp_server/src/logger.py:
--------------------------------------------------------------------------------
```python
import logging
from mcp_server.src.settings import settings
logger = logging.getLogger("registry_mcp")
logger.setLevel(settings.log_level)
formatter = logging.Formatter("[%(asctime)s][%(name)s][%(levelname)s]: %(message)s")
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(formatter)
logger.addHandler(stream_handler)
settings.log_dir.mkdir(parents=True, exist_ok=True)
file_handler = logging.FileHandler(settings.log_dir / "registry_mcp.log")
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
```
--------------------------------------------------------------------------------
/alembic/versions/2025_05_12_1404-735603f5e4d5_creator_id.py:
--------------------------------------------------------------------------------
```python
"""creator_id
Revision ID: 735603f5e4d5
Revises: 1c6edbdb4642
Create Date: 2025-05-12 14:04:17.438611
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '735603f5e4d5'
down_revision = '1c6edbdb4642'
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.add_column('server', sa.Column('creator_id', sa.String(), nullable=True))
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_column('server', 'creator_id')
# ### end Alembic commands ###
```
--------------------------------------------------------------------------------
/alembic/versions/2025_05_28_0632-f2298e6acd39_base_image.py:
--------------------------------------------------------------------------------
```python
"""base_image
Revision ID: f2298e6acd39
Revises: 7e8c31ddb2db
Create Date: 2025-05-28 06:32:44.781544
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'f2298e6acd39'
down_revision = 'aa9a14a698c5'
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.add_column('server', sa.Column('base_image', sa.String(), nullable=True))
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_column('server', 'base_image')
# ### end Alembic commands ###
```
--------------------------------------------------------------------------------
/alembic/versions/2025_05_30_1231-4b5c0d56e1ca_build_instructions.py:
--------------------------------------------------------------------------------
```python
"""build_instructions
Revision ID: 4b5c0d56e1ca
Revises: f2298e6acd39
Create Date: 2025-05-30 12:31:08.515098
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '4b5c0d56e1ca'
down_revision = 'f2298e6acd39'
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.add_column('server', sa.Column('build_instructions', sa.String(), nullable=True))
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_column('server', 'build_instructions')
# ### end Alembic commands ###
```
--------------------------------------------------------------------------------
/worker/src/main.py:
--------------------------------------------------------------------------------
```python
import asyncio

from kafka import KafkaAdminClient
from kafka.admin import NewTopic
from kafka.errors import TopicAlreadyExistsError

from registry.src.logger import logger
from registry.src.settings import settings
from worker.src.consumer import Consumer

if __name__ == "__main__":
    try:
        admin = KafkaAdminClient(bootstrap_servers=settings.broker_url)
        worker_topic = NewTopic(
            name=settings.worker_topic, num_partitions=1, replication_factor=1
        )
        admin.create_topics([worker_topic])
    except TopicAlreadyExistsError:
        pass
    consumer = Consumer()
    logger.info("Starting consumer")
    asyncio.run(consumer.start())
```
--------------------------------------------------------------------------------
/mcp_server/src/registry_client.py:
--------------------------------------------------------------------------------
```python
from httpx import AsyncClient

from mcp_server.src.schemas import RegistryServer
from mcp_server.src.settings import settings


class RegistryClient:
    def __init__(self):
        self.client = AsyncClient(base_url=settings.registry_url)
        self.timeout = 90

    async def search(self, request: str) -> list[RegistryServer]:
        response = await self.client.get(
            "/servers/search", params=dict(query=request), timeout=self.timeout
        )
        assert (
            response.status_code == 200
        ), f"{response.status_code=} {response.content=}"
        data = response.json()
        return [RegistryServer.model_validate(server) for server in data]
```
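A minimal usage sketch (assuming `settings.registry_url` points at a running registry instance):
```python
import asyncio

from mcp_server.src.registry_client import RegistryClient


async def main() -> None:
    client = RegistryClient()
    servers = await client.search("analyze a gitlab repo")
    for server in servers:
        # RegistryServer fields are defined in mcp_server/src/schemas.py
        print(server.id, server.name, server.url)


asyncio.run(main())
```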
--------------------------------------------------------------------------------
/mcp_server/src/schemas.py:
--------------------------------------------------------------------------------
```python
from typing import Any

from openai.types.chat import ChatCompletionMessage
from openai.types.chat import ChatCompletionToolMessageParam
from pydantic import BaseModel
from pydantic import ConfigDict


class BaseSchema(BaseModel):
    model_config = ConfigDict(from_attributes=True)


class Tool(BaseSchema):
    name: str
    description: str
    input_schema: dict[str, Any]


class RegistryServer(BaseSchema):
    name: str
    description: str
    url: str
    logo: str | None
    id: int
    tools: list[Tool]


class ToolInfo(BaseSchema):
    tool_name: str
    server: RegistryServer


class RoutingResponse(BaseSchema):
    conversation: list[ChatCompletionMessage | ChatCompletionToolMessageParam]
```
--------------------------------------------------------------------------------
/alembic/versions/2025_04_22_0854-1c6edbdb4642_add_status_to_servers.py:
--------------------------------------------------------------------------------
```python
"""Add status to servers.
Revision ID: 1c6edbdb4642
Revises: 49e1ab420276
Create Date: 2025-04-22 08:54:20.892397
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "1c6edbdb4642"
down_revision = "49e1ab420276"
branch_labels = None
depends_on = None
def upgrade():
op.add_column(
"server",
sa.Column(
"status",
sa.String(),
nullable=True,
),
)
op.execute("""UPDATE server SET status = 'active'""")
op.alter_column("server", "status", nullable=False)
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_column("server", "status")
# ### end Alembic commands ###
```
--------------------------------------------------------------------------------
/alembic/versions/2025_04_30_1049-387bed3ce6ae_server_title.py:
--------------------------------------------------------------------------------
```python
"""server title
Revision ID: 387bed3ce6ae
Revises: 49e1ab420276
Create Date: 2025-04-30 10:49:34.277571
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '387bed3ce6ae'
down_revision = '7a3913a3bb4c'
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.add_column('server', sa.Column('title', sa.String(), nullable=True))
op.execute("""UPDATE server SET title = name""")
op.alter_column("server", "title", nullable=False)
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_column('server', 'title')
# ### end Alembic commands ###
```
--------------------------------------------------------------------------------
/registry/src/search/prompts.py:
--------------------------------------------------------------------------------
```python
get_top_servers: str = (
    """
You are part of MCP search engine. You are given a list of MCP server descriptions and a user's query. Your task is to select a minimal list of MCP servers that have enough tools to satisfy the query. Results of this request will be returned to the user's system which will make a separate request to LLM with the selected tools.
Consider if it is possible to satisfy the request in one step or in several steps. Consider if some tools don't have enough data if they are enough to completely perform one step or if you need to select several alternatives for the step. It is important that you follow the output format precisely.
Input data for you:
```
mcp_servers = {servers_list!r}
user_request = {request!r}
```
""".strip()
)
```
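The `{servers_list!r}` and `{request!r}` placeholders indicate `str.format`-style substitution; a sketch of filling the template (the server entry shown is illustrative only):
```python
from registry.src.search.prompts import get_top_servers

prompt = get_top_servers.format(
    servers_list=[{"name": "code_analysis", "description": "Analyze gitlab repo for quality"}],
    request="Analyze https://github.com/BenderV/autochat",
)
print(prompt)
```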
--------------------------------------------------------------------------------
/alembic/versions/2025_06_17_1353-667874dd7dee_added_alias.py:
--------------------------------------------------------------------------------
```python
"""added llm_tool_name
Revision ID: 667874dd7dee
Revises: c98f44cea12f
Create Date: 2025-06-17 13:53:18.588902
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '667874dd7dee'
down_revision = 'c98f44cea12f'
branch_labels = None
depends_on = None
def upgrade():
op.add_column('tool', sa.Column('alias', sa.String(), nullable=True))
op.execute("""
UPDATE tool
SET alias = tool.name || '__' || server.name
FROM server
WHERE server.url = tool.server_url;
""")
op.alter_column("tool", "alias", nullable=False)
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_column('tool', 'alias')
# ### end Alembic commands ###
```
--------------------------------------------------------------------------------
/alembic/versions/2025_04_25_1158-49e1ab420276_host_type.py:
--------------------------------------------------------------------------------
```python
"""host_type
Revision ID: 49e1ab420276
Revises: e25328dbfe80
Create Date: 2025-04-25 11:58:17.768233
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '49e1ab420276'
down_revision = 'e25328dbfe80'
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.add_column('server', sa.Column('host_type', sa.String(), nullable=True))
op.execute("UPDATE server SET host_type = 'internal'")
op.alter_column("server", "host_type", nullable=False)
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_column('server', 'host_type')
# ### end Alembic commands ###
```
--------------------------------------------------------------------------------
/mcp_server/src/main.py:
--------------------------------------------------------------------------------
```python
from mcp.server.fastmcp import FastMCP

from .schemas import RoutingResponse
from mcp_server.src.settings import settings
from mcp_server.src.tools import Tools

mcp: FastMCP = FastMCP(settings.server_json.name, port=settings.mcp_port)
tools: Tools = Tools()


@mcp.tool()
async def search_urls(request: str) -> list[str]:
    """Return URLs of the best MCP servers for processing the given request."""
    return await tools.search(request=request)


@mcp.tool()
async def routing(
    request: str,
) -> str:
    """Respond to any user request using MCP tools selected specifically for it."""
    response = await tools.routing(request=request)
    return RoutingResponse(conversation=response).model_dump_json()


if __name__ == "__main__":
    mcp.run(transport="sse")
```
--------------------------------------------------------------------------------
/alembic/versions/2025_05_31_1156-c98f44cea12f_add_transport_protocol_column_to_servers.py:
--------------------------------------------------------------------------------
```python
"""Add transport-protocol column to servers
Revision ID: c98f44cea12f
Revises: f2298e6acd39
Create Date: 2025-05-31 11:56:05.469429
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "c98f44cea12f"
down_revision = "4b5c0d56e1ca"
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.add_column("server", sa.Column("transport_protocol", sa.String(), nullable=True))
op.execute("""UPDATE server SET transport_protocol = 'SSE' WHERE status = 'active'""")
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_column("server", "transport_protocol")
# ### end Alembic commands ###
```
--------------------------------------------------------------------------------
/registry/requirements.txt:
--------------------------------------------------------------------------------
```
alembic==1.15.1
annotated-types==0.7.0
anyio==4.9.0
asyncpg==0.30.0
certifi==2025.1.31
cfgv==3.4.0
chardet==5.2.0
charset-normalizer==3.4.1
click==8.1.8
distlib==0.3.9
fastapi==0.115.11
fastapi-pagination==0.12.34
filelock==3.18.0
greenlet==3.1.1
h11==0.14.0
httpcore==1.0.7
httpx==0.28.1
httpx-sse==0.4.0
identify==2.6.9
idna==3.10
instructor==1.7.3
Mako==1.3.9
MarkupSafe==3.0.2
mcp==1.9.2
nodeenv==1.9.1
openai==1.65.4
platformdirs==4.3.7
pre_commit==4.2.0
psycopg2-binary==2.9.10
pydantic==2.10.6
pydantic-settings==2.8.1
pydantic_core==2.27.2
cryptography==44.0.2
python-dotenv==1.0.1
PyYAML==6.0.2
requests==2.32.3
setuptools==75.8.0
sniffio==1.3.1
SQLAlchemy==2.0.39
sse-starlette==2.2.1
starlette==0.46.1
typing_extensions==4.12.2
urllib3==2.3.0
uvicorn==0.34.0
virtualenv==20.29.3
wheel==0.45.1
jinja2==3.1.6
aiokafka==0.12.0
```
--------------------------------------------------------------------------------
/alembic/versions/2025_06_07_1322-aa9a14a698c5_jsonb_logs.py:
--------------------------------------------------------------------------------
```python
"""jsonb_logs
Revision ID: aa9a14a698c5
Revises: 7e8c31ddb2db
Create Date: 2025-06-07 13:22:23.229230
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
# revision identifiers, used by Alembic.
revision = 'aa9a14a698c5'
down_revision = '7e8c31ddb2db'
branch_labels = None
depends_on = None
def upgrade():
op.drop_column('server_logs', 'deployment_logs')
op.add_column('server_logs', sa.Column(
'deployment_logs',
postgresql.JSONB(),
nullable=False,
server_default=sa.text("'[]'::jsonb")
))
def downgrade():
op.drop_column('server_logs', 'deployment_logs')
op.add_column('server_logs', sa.Column(
'deployment_logs',
sa.VARCHAR(),
nullable=False,
server_default=sa.text("'[]'::character varying")
))
```
--------------------------------------------------------------------------------
/worker/requirements.txt:
--------------------------------------------------------------------------------
```
alembic==1.15.1
annotated-types==0.7.0
anyio==4.9.0
asyncpg==0.30.0
certifi==2025.1.31
cfgv==3.4.0
chardet==5.2.0
charset-normalizer==3.4.1
click==8.1.8
distlib==0.3.9
fastapi==0.115.11
fastapi-pagination==0.12.34
filelock==3.18.0
greenlet==3.1.1
h11==0.14.0
httpcore==1.0.7
httpx==0.28.1
httpx-sse==0.4.0
identify==2.6.9
idna==3.10
instructor==1.7.3
Mako==1.3.9
MarkupSafe==3.0.2
mcp==1.9.2
nodeenv==1.9.1
openai==1.65.4
platformdirs==4.3.7
pre_commit==4.2.0
psycopg2-binary==2.9.10
pydantic==2.10.6
pydantic-settings==2.8.1
pydantic_core==2.27.2
cryptography==44.0.2
python-dotenv==1.0.1
PyYAML==6.0.2
requests==2.32.3
setuptools==75.8.0
sniffio==1.3.1
SQLAlchemy==2.0.39
sse-starlette==2.2.1
starlette==0.46.1
typing_extensions==4.12.2
urllib3==2.3.0
uvicorn==0.34.0
virtualenv==20.29.3
wheel==0.45.1
jinja2==3.1.6
aiokafka==0.12.0
kafka-python==2.0.3
```
--------------------------------------------------------------------------------
/alembic/versions/2025_04_09_1117-e25328dbfe80_set_logo_to_none.py:
--------------------------------------------------------------------------------
```python
"""set logo to none
Revision ID: e25328dbfe80
Revises: ffe7a88d5b15
Create Date: 2025-04-09 11:17:59.990186
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'e25328dbfe80'
down_revision = 'ffe7a88d5b15'
branch_labels = None
depends_on = None
def upgrade():
op.execute("UPDATE server SET logo = NULL WHERE logo = ''")
# ### commands auto generated by Alembic - please adjust! ###
op.alter_column('server', 'logo',
existing_type=sa.VARCHAR(),
nullable=True)
# ### end Alembic commands ###
def downgrade():
op.execute("UPDATE server SET logo = '' WHERE logo IS NULL")
# ### commands auto generated by Alembic - please adjust! ###
op.alter_column('server', 'logo',
existing_type=sa.VARCHAR(),
nullable=False)
# ### end Alembic commands ###
```
--------------------------------------------------------------------------------
/registry/src/validator/prompts.py:
--------------------------------------------------------------------------------
```python
from jinja2 import Template
SENS_TOPIC_CHECK_SYSTEM_PROMPT: Template = Template(
"""
Analyze the description of an MCP server and its tools.
Your task is to determine whether the server likely works with sensitive data.
Consider the following rules:
1. Sensitive data indicators:
• Mentions of emails, phone numbers, SSNs, credit card numbers, access tokens, passwords, or other personal identifiable information (PII).
• Tools or functionality related to authentication, authorization, payments, identity verification, document uploads, etc.
2. Sensitive domains:
• Banking
• Finance
• Government services
• Or any tool that may require sending sensitive user data
"""
)
SENS_TOPIC_CHECK_USER_PROMPT: Template = Template(
"""
'''
Server's data: {{ server_data.model_dump_json() }}
--------------
Server's tools: {{ tools_data.model_dump_json() }}
'''
"""
)
```
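These are Jinja2 `Template` objects, so they are presumably filled via `render()`; the templates call `model_dump_json()` on their arguments, which suggests Pydantic models such as those in `registry/src/validator/schemas.py`. A sketch under that assumption:
```python
from registry.src.validator.prompts import SENS_TOPIC_CHECK_USER_PROMPT
from registry.src.validator.schemas import ServerWithTools
from registry.src.validator.schemas import Tools


def build_user_prompt(server: ServerWithTools, tools: Tools) -> str:
    # render() substitutes the {{ ... }} expressions in the template.
    return SENS_TOPIC_CHECK_USER_PROMPT.render(server_data=server, tools_data=tools)
```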
--------------------------------------------------------------------------------
/registry/src/validator/schemas.py:
--------------------------------------------------------------------------------
```python
from enum import Enum

from pydantic import BaseModel
from pydantic import Field
from pydantic import RootModel

from registry.src.servers.schemas import ServerCreate
from registry.src.servers.schemas import Tool


class SSLAuthorityLevel(Enum):
    NO_CERTIFICATE = 0
    INVALID_CERTIFICATE = 1
    SELF_SIGNED_CERTIFICATE = 2
    CERTIFICATE_OK = 3
    EXTENDED_CERTIFICATE = 4


class ServerNeedsSensitiveDataResponse(BaseModel):
    reasoning: list[str] = Field(
        ...,
        description="For each argument of each tool describe if you can send sensitive data to this argument, and what kind of it",
    )
    server_needs_sensitive_data: bool


class ServerWithTools(ServerCreate):
    tools: list[Tool]


class ValidationResult(BaseModel):
    server_requires_sensitive_data: bool = False
    authority_level: SSLAuthorityLevel = SSLAuthorityLevel.NO_CERTIFICATE


Tools = RootModel[list[Tool]]
```
--------------------------------------------------------------------------------
/registry/src/validator/constants.py:
--------------------------------------------------------------------------------
```python
EV_OIDS: list[str] = [
    "2.16.756.5.14.7.4.8",
    "2.16.156.112554.3",
    "2.16.840.1.114028.10.1.2",
    "1.3.6.1.4.1.4788.2.202.1",
    "2.16.840.1.114404.1.1.2.4.1",
    "2.16.840.1.114413.1.7.23.3",
    "2.16.840.1.114412.2.1",
    "1.3.6.1.4.1.14777.6.1.2",
    "2.16.578.1.26.1.3.3",
    "1.3.6.1.4.1.14777.6.1.1",
    "2.23.140.1.1",
    "1.3.6.1.4.1.7879.13.24.1",
    "1.3.6.1.4.1.40869.1.1.22.3",
    "1.3.159.1.17.1",
    "1.2.616.1.113527.2.5.1.1",
    "2.16.840.1.114414.1.7.23.3",
    "1.3.6.1.4.1.34697.2.1",
    "2.16.756.1.89.1.2.1.1",
    "1.3.6.1.4.1.6449.1.2.1.5.1",
    "1.3.6.1.4.1.34697.2.3",
    "1.3.6.1.4.1.13769.666.666.666.1.500.9.1",
    "1.3.6.1.4.1.34697.2.2",
    "1.2.156.112559.1.1.6.1",
    "1.3.6.1.4.1.8024.0.2.100.1.2",
    "1.3.6.1.4.1.34697.2.4",
    "1.2.392.200091.100.721.1",
]

SELF_SIGNED_CERTIFICATE_ERR_CODE: int = 18
HTTPS_PORT: int = 443
HTTP_PORT: int = 80
```
--------------------------------------------------------------------------------
/alembic/versions/2025_05_27_0757-7e8c31ddb2db_json_logs.py:
--------------------------------------------------------------------------------
```python
"""json_logs
Revision ID: 7e8c31ddb2db
Revises: 7a3913a3bb4c
Create Date: 2025-05-27 07:57:51.044585
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
# revision identifiers, used by Alembic.
revision = '7e8c31ddb2db'
down_revision = '387bed3ce6ae'
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.execute("UPDATE server_logs SET deployment_logs = '[]'")
op.alter_column(
"server_logs",
"deployment_logs",
server_default="[]",
existing_server_default=""
)
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.alter_column(
"server_logs",
"deployment_logs",
server_default="",
existing_server_default="[]"
)
op.execute("UPDATE server_logs SET deployment_logs = ''")
# ### end Alembic commands ###
```
--------------------------------------------------------------------------------
/alembic/versions/2025_05_13_1326-4b751cb703e2_timestamps.py:
--------------------------------------------------------------------------------
```python
"""timestamps
Revision ID: 4b751cb703e2
Revises: 3b8b8a5b99ad
Create Date: 2025-05-13 13:26:42.259154
"""
from datetime import datetime
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '4b751cb703e2'
down_revision = '3b8b8a5b99ad'
branch_labels = None
depends_on = None
def upgrade():
op.add_column('server', sa.Column('created_at', sa.DateTime(), nullable=True))
op.add_column('server', sa.Column('updated_at', sa.DateTime(), nullable=True))
current_time = f"timestamp '{datetime.now().isoformat()}'"
op.execute(f"""UPDATE server SET created_at = {current_time}, updated_at = {current_time}""")
op.alter_column("server", "created_at", nullable=False)
op.alter_column("server", "updated_at", nullable=False)
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_column('server', 'updated_at')
op.drop_column('server', 'created_at')
# ### end Alembic commands ###
```
--------------------------------------------------------------------------------
/registry/src/database/models/tool.py:
--------------------------------------------------------------------------------
```python
from sqlalchemy import ForeignKey
from sqlalchemy import String
from sqlalchemy import UniqueConstraint
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import Mapped
from sqlalchemy.orm import mapped_column
from sqlalchemy.orm import relationship
from ...database import models
from .base import Base
class Tool(Base):
__tablename__ = "tool"
__table_args__ = (
UniqueConstraint("name", "server_url", name="uq_tool_name_server_url"),
)
id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
name: Mapped[str] = mapped_column(String(), nullable=False)
description: Mapped[str] = mapped_column(String(), nullable=False)
input_schema: Mapped[dict] = mapped_column(JSONB, nullable=False)
alias: Mapped[str] = mapped_column(String(), nullable=False)
server_url: Mapped[str] = mapped_column(
ForeignKey("server.url", ondelete="CASCADE")
)
server: Mapped["models.server.Server"] = relationship(back_populates="tools")
```
--------------------------------------------------------------------------------
/alembic/versions/2025_05_15_1414-7a3913a3bb4c_logs.py:
--------------------------------------------------------------------------------
```python
"""logs
Revision ID: 7a3913a3bb4c
Revises: 4b751cb703e2
Create Date: 2025-05-15 14:14:22.223452
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '7a3913a3bb4c'
down_revision = '4b751cb703e2'
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.create_table(
'server_logs',
sa.Column('deployment_logs', sa.String(), server_default='', nullable=False),
sa.Column('created_at', sa.DateTime(), nullable=False),
sa.Column('updated_at', sa.DateTime(), nullable=False),
sa.Column('server_id', sa.Integer(), nullable=False),
sa.ForeignKeyConstraint(['server_id'], ['server.id'], ondelete='CASCADE'),
sa.PrimaryKeyConstraint('server_id')
)
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_table('server_logs')
# ### end Alembic commands ###
```
--------------------------------------------------------------------------------
/worker/src/schemas.py:
--------------------------------------------------------------------------------
```python
from datetime import datetime
from enum import auto
from enum import StrEnum
from pydantic import Field
from pydantic import field_serializer
from registry.src.base_schema import BaseSchema
from registry.src.types import TaskType
class Task(BaseSchema):
task_type: TaskType
server_id: int
class LogLevel(StrEnum):
info = auto()
warning = auto()
error = auto()
class LoggingEvent(StrEnum):
command_start = auto()
command_result = auto()
general_message = auto()
class CommandStart(BaseSchema):
command: str
class CommandResult(BaseSchema):
output: str
success: bool
class GeneralLogMessage(BaseSchema):
log_level: LogLevel
message: str
class LogMessage(BaseSchema):
event_type: LoggingEvent
data: CommandStart | CommandResult | GeneralLogMessage
timestamp: datetime = Field(default_factory=datetime.now)
@field_serializer("timestamp")
def serialize_datetime(self, value: datetime, _info) -> str:
return value.isoformat()
```
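A minimal sketch of how these schemas compose at a call site, assuming `BaseSchema` behaves like a plain pydantic model for dumping purposes; the resulting dict is the shape stored in `deployment_logs`:
```python
from worker.src.schemas import CommandResult, LoggingEvent, LogMessage

log = LogMessage(
    event_type=LoggingEvent.command_result,
    data=CommandResult(output="npm install ok", success=True),
)
# timestamp is filled by the default_factory and rendered as ISO 8601
# by the field_serializer, so the JSON-mode dump is storage-ready, e.g.:
# {'event_type': 'command_result',
#  'data': {'output': 'npm install ok', 'success': True},
#  'timestamp': '2025-06-17T13:53:00.000000'}
print(log.model_dump(mode="json"))
```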
--------------------------------------------------------------------------------
/worker/src/dockerfile_generators/user_provided.py:
--------------------------------------------------------------------------------
```python
from typing import override
from uuid import uuid4
from ..user_logger import UserLogger
from .base import DockerfileGenerator
from worker.src.constants import BaseImage
class UserDockerfileGenerator(DockerfileGenerator):
def __init__(
self,
base_image: BaseImage,
repo_folder: str,
user_logger: UserLogger,
build_instructions: str,
) -> None:
self.build_instructions = build_instructions
super().__init__(
base_image=base_image, repo_folder=repo_folder, user_logger=user_logger
)
@override
async def generate_dockerfile(self) -> str:
await self._image_setup()
await self._code_setup()
return self._save_dockerfile()
@override
async def _code_setup(self) -> None:
heredoc_identifier = str(uuid4()).replace("-", "")
lines = [
"COPY ./ ./",
f"RUN <<{heredoc_identifier}",
self.build_instructions,
heredoc_identifier,
]
self._add_to_file(lines)
```
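For orientation, a hypothetical sketch of the fragment `_code_setup` appends: the UUID-derived delimiter (32 hex characters in practice, shortened here) wraps the user's multi-line build script in a Dockerfile heredoc so it executes as a single `RUN` layer.
```python
# Hypothetical values: build_instructions = "pip install ." and a
# shortened heredoc identifier "a1b2c3".
fragment = "\n".join(
    [
        "COPY ./ ./",
        "RUN <<a1b2c3",
        "pip install .",
        "a1b2c3",
    ]
)
print(fragment)
```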
--------------------------------------------------------------------------------
/mcp_server/src/settings.py:
--------------------------------------------------------------------------------
```python
from pathlib import Path
from pydantic import BaseModel
from pydantic import Field
from pydantic import field_validator
from pydantic_settings import BaseSettings
from pydantic_settings import SettingsConfigDict
parent_folder = Path(__file__).parent.parent
class ServerJson(BaseModel):
name: str
description: str
url: str = Field(alias="endpoint")
logo: str | None = None
@field_validator("url") # noqa
@classmethod
def url_ignore_anchor(cls, value: str) -> str:
return value.split("#")[0]
class Settings(BaseSettings):
model_config = SettingsConfigDict(extra="ignore", env_ignore_empty=True)
registry_url: str = "http://registry:80"
log_dir: Path = Path("logs")
mcp_port: int = 80
llm_proxy: str | None = None
openai_base_url: str = "https://openrouter.ai/api/v1"
openai_api_key: str
llm_model: str = "openai/gpt-4o-mini"
server_json: ServerJson = ServerJson.model_validate_json(
Path(parent_folder / "server.json").read_text()
)
log_level: str = "INFO"
settings = Settings()
```
--------------------------------------------------------------------------------
/registry/src/database/session.py:
--------------------------------------------------------------------------------
```python
from collections.abc import AsyncGenerator
from typing import Any
from sqlalchemy.ext.asyncio import async_sessionmaker
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.ext.asyncio import create_async_engine
from registry.src.errors import FastApiError
from registry.src.logger import logger
from registry.src.settings import settings
async_engine = create_async_engine(
settings.database_url_async,
pool_size=settings.db_pool_size,
max_overflow=settings.db_max_overflow,
pool_pre_ping=True,
)
session_maker = async_sessionmaker(bind=async_engine, expire_on_commit=False)
async def get_unmanaged_session() -> AsyncSession:
return session_maker()
async def get_session() -> AsyncGenerator[AsyncSession, Any]:
session = session_maker()
try:
yield session
await session.commit()
except FastApiError as error:
await session.rollback()
raise error
except Exception as error:
logger.error(f"Encountered an exception while processing request:\n{error}")
await session.rollback()
raise
finally:
await session.close()
```
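A usage sketch of the dependency wiring (the route below is hypothetical): FastAPI drives the generator, so each request gets a session, a commit on success, and a rollback on failure.
```python
from fastapi import Depends, FastAPI
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession

from registry.src.database import get_session

app = FastAPI()

@app.get("/health/db")  # hypothetical route
async def db_health(session: AsyncSession = Depends(get_session)) -> dict:
    await session.execute(text("SELECT 1"))
    return {"database": "ok"}
```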
--------------------------------------------------------------------------------
/worker/src/dockerfile_generators/factory.py:
--------------------------------------------------------------------------------
```python
from ..constants import BaseImage
from ..user_logger import UserLogger
from .base import DockerfileGenerator
from .python import PythonGenerator
from .typescript import TypeScriptGenerator
from .user_provided import UserDockerfileGenerator
async def get_dockerfile_generator(
base_image: BaseImage,
repo_folder: str,
build_instructions: str | None,
user_logger: UserLogger,
) -> DockerfileGenerator:
if build_instructions:
await user_logger.info(message="Build instructions found.")
return UserDockerfileGenerator(
base_image=base_image,
repo_folder=repo_folder,
build_instructions=build_instructions,
user_logger=user_logger,
)
await user_logger.warning(
message="No build instructions found. Attempting to generate from scratch."
)
if base_image == BaseImage.node_lts:
return TypeScriptGenerator(
base_image=base_image, repo_folder=repo_folder, user_logger=user_logger
)
else:
return PythonGenerator(
base_image=base_image, repo_folder=repo_folder, user_logger=user_logger
)
```
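A sketch of the selection logic under assumed conditions; the stub logger is illustrative, and `node_lts` is the only `BaseImage` member this file confirms.
```python
import asyncio

from worker.src.constants import BaseImage
from worker.src.dockerfile_generators.factory import get_dockerfile_generator

class _NullLogger:
    async def info(self, message: str) -> None: ...
    async def warning(self, message: str) -> None: ...

generator = asyncio.run(
    get_dockerfile_generator(
        base_image=BaseImage.node_lts,
        repo_folder="/tmp/repo",
        build_instructions=None,  # no instructions -> language-specific generator
        user_logger=_NullLogger(),  # stub; the real UserLogger needs a DB session
    )
)
print(type(generator).__name__)  # TypeScriptGenerator
```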
--------------------------------------------------------------------------------
/alembic/versions/2025_05_13_0634-3b8b8a5b99ad_deployment_fields.py:
--------------------------------------------------------------------------------
```python
"""deployment fields
Revision ID: 3b8b8a5b99ad
Revises: 735603f5e4d5
Create Date: 2025-05-13 06:34:52.115171
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '3b8b8a5b99ad'
down_revision = '735603f5e4d5'
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.add_column('server', sa.Column('repo_url', sa.String(), nullable=True))
op.add_column('server', sa.Column('command', sa.String(), nullable=True))
op.alter_column('server', 'url',
existing_type=sa.VARCHAR(),
nullable=True)
op.create_unique_constraint("server_repo_url_unique", 'server', ['repo_url'])
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_constraint("server_repo_url_unique", 'server', type_='unique')
op.alter_column('server', 'url',
existing_type=sa.VARCHAR(),
nullable=False)
op.drop_column('server', 'command')
op.drop_column('server', 'repo_url')
# ### end Alembic commands ###
```
--------------------------------------------------------------------------------
/registry/src/search/schemas.py:
--------------------------------------------------------------------------------
```python
from pydantic import Field
from registry.src.base_schema import BaseSchema
class SearchServerURL(BaseSchema):
url: str
class SearchServer(SearchServerURL):
id: int
name: str
description: str
class SolutionStep(BaseSchema):
step_description: str
best_server_description: str = Field(
...,
description="Description of the best server for this step, copy this from `mcp_servers` entry",
)
best_server_name: str = Field(
...,
description="Name of the best server for this step, copy this from `mcp_servers` entry",
)
best_server: SearchServerURL = Field(
..., description="The best server for this step"
)
    confidence: str = Field(
        ...,
        description="How confident are you that this server is enough for this step?",
    )
additional_servers: list[SearchServerURL] = Field(
...,
description="Alternative servers if you think the `best_server` may not be enough",
)
class SearchResponse(BaseSchema):
solution_steps: list[SolutionStep] = Field(
..., description="List of solution steps and servers for each step"
)
```
--------------------------------------------------------------------------------
/scripts/darp-add.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
Add new MCP servers to the registry
"""
import argparse
import json
import requests
default_registry_url: str = "http://localhost:80"
def main() -> None:
parser = argparse.ArgumentParser(description="Add a new MCP server to the registry")
parser.add_argument("--url", required=True, help="Endpoint URL for the server")
parser.add_argument("--name", required=True, help="Unique server name")
parser.add_argument("--description", required=True, help="Server description")
parser.add_argument("--logo", default=None, help="Logo URL")
parser.add_argument("--verbose", action="store_true", help="Verbose output")
parser.add_argument(
"--registry-url", default=default_registry_url, help="Registry API endpoint URL"
)
args = parser.parse_args()
server_data = {
"name": args.name,
"description": args.description,
"url": args.url,
"logo": args.logo,
}
response = requests.post(f"{args.registry_url}/servers/", json=server_data)
response.raise_for_status()
if args.verbose:
print(json.dumps(response.json(), indent=2))
if __name__ == "__main__":
main()
```
--------------------------------------------------------------------------------
/registry/src/deployments/schemas.py:
--------------------------------------------------------------------------------
```python
from datetime import datetime
from typing import Annotated
from pydantic import Field
from pydantic import field_serializer
from pydantic import HttpUrl
from registry.src.base_schema import BaseSchema
from registry.src.servers.schemas import server_name_regex_pattern
from registry.src.types import HostType
from registry.src.types import ServerStatus
from worker.src.constants import BaseImage
from worker.src.schemas import LogMessage
class ServerLogsSchema(BaseSchema):
server_id: int
deployment_logs: list[LogMessage]
created_at: datetime
updated_at: datetime
class DeploymentCreateData(BaseSchema):
name: Annotated[str, Field(max_length=30, pattern=server_name_regex_pattern)]
title: str = Field(max_length=30)
description: str
repo_url: HttpUrl
command: str | None = None
logo: str | None = None
base_image: BaseImage | None = None
build_instructions: str | None = None
host_type: HostType = HostType.internal
status: ServerStatus = ServerStatus.in_processing
@field_serializer("repo_url")
def serialize_url(self, repo_url: HttpUrl) -> str:
return str(repo_url)
class DeploymentCreate(BaseSchema):
current_user_id: str
data: DeploymentCreateData
class UserId(BaseSchema):
current_user_id: str
```
--------------------------------------------------------------------------------
/registry/src/errors.py:
--------------------------------------------------------------------------------
```python
from fastapi import HTTPException
from fastapi import status
class FastApiError(HTTPException):
    status_code: int = status.HTTP_500_INTERNAL_SERVER_ERROR
    def __init__(self, message: str, **kwargs) -> None:
        super().__init__(
            status_code=self.status_code, detail={"message": message, **kwargs}
        )
class InvalidServerNameError(FastApiError):
status_code: int = status.HTTP_400_BAD_REQUEST
def __init__(self, name: str) -> None:
message = "Name must have only letters, digits, underscore. Name must not start with digits."
super().__init__(message=message, name=name)
class ServerAlreadyExistsError(FastApiError):
status_code: int = status.HTTP_400_BAD_REQUEST
def __init__(self, dict_servers: list[dict]) -> None:
servers_str = ", ".join(server["name"] for server in dict_servers)
message = f"Server already exists: {servers_str}"
super().__init__(message=message, servers=dict_servers)
class ServersNotFoundError(FastApiError):
status_code = status.HTTP_404_NOT_FOUND
def __init__(self, ids: list[int]) -> None:
super().__init__(message=f"Server(s) not found: {ids}", ids=ids)
class NotAllowedError(FastApiError):
status_code = status.HTTP_403_FORBIDDEN
class InvalidData(FastApiError):
status_code = status.HTTP_400_BAD_REQUEST
class RemoteServerError(FastApiError):
status_code = status.HTTP_503_SERVICE_UNAVAILABLE
```
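A short usage sketch: subclasses pin `status_code` as a class attribute, and keyword arguments flow into the structured `detail` payload (the values below are hypothetical).
```python
from registry.src.errors import ServersNotFoundError

try:
    raise ServersNotFoundError(ids=[7, 12])
except ServersNotFoundError as error:
    print(error.status_code)  # 404
    print(error.detail)  # {'message': 'Server(s) not found: [7, 12]', 'ids': [7, 12]}
```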
--------------------------------------------------------------------------------
/worker/src/dockerfile_generators/typescript/generic.py:
--------------------------------------------------------------------------------
```python
from pathlib import Path
from typing import override
from ...errors import DependenciesError
from ..base import DockerfileGenerator
class TypeScriptGenerator(DockerfileGenerator):
@override
async def _dependencies_installation(self) -> None:
repo_path = Path(self.repo_folder)
package_json = self._find_file(repo_path, "package.json")
if not package_json:
await self.user_logger.error(message="Error: no package.json found")
raise DependenciesError("No package.json found")
await self.user_logger.info(message="package.json found. Adding to Dockerfile")
package_json_folder = self._relative_path(package_json.parent.absolute())
lines = [
f"COPY {package_json_folder}/package*.json ./",
]
ts_config = self._find_file(repo_path, "tsconfig.json")
if ts_config:
await self.user_logger.info(
message="tsconfig.json found. Adding to Dockerfile"
)
lines.append(f"COPY {self._relative_path(ts_config.absolute())} ./")
lines.append("RUN npm install --ignore-scripts")
self._add_to_file(lines)
@override
async def _code_setup(self) -> None:
lines = [
"COPY ./ ./",
"RUN npm run build",
]
self._add_to_file(lines)
```
--------------------------------------------------------------------------------
/docker-compose-debug.yaml:
--------------------------------------------------------------------------------
```yaml
services:
registry:
image: registry
pull_policy: never
restart: unless-stopped
ports:
- "${REGISTRY_PORT:-80}:80"
profiles:
- ''
volumes:
- ./registry:/workspace/registry
- ./alembic:/workspace/alembic
- ./common:/workspace/common
- ./worker:/workspace/worker
command: start-debug.sh
registry_mcp_server:
image: registry_mcp_server
pull_policy: never
restart: unless-stopped
ports:
- "${REGISTRY_MCP_PORT:-4689}:80"
profiles:
- ''
volumes:
- ./mcp_server:/workspace/mcp_server
- ./common:/workspace/common
command: start-debug.sh
registry_deploy_worker:
pull_policy: never
profiles:
- ''
volumes:
- ./worker:/workspace/worker
- ./registry:/workspace/registry
restart: unless-stopped
command: worker-start-debug.sh
portal_zookeeper:
profiles:
- ''
restart: unless-stopped
portal_kafka:
profiles:
- ''
restart: unless-stopped
registry_postgres:
ports:
- "${REGISTRY_POSTGRES_PORT:-5432}:5432"
profiles:
- ''
restart: unless-stopped
registry_healthchecker:
pull_policy: never
profiles:
- ''
volumes:
- ./registry:/workspace/registry
- ./healthchecker:/workspace/healthchecker
restart: unless-stopped
command: worker-start-debug.sh
```
--------------------------------------------------------------------------------
/registry/src/settings.py:
--------------------------------------------------------------------------------
```python
from pathlib import Path
from pydantic_settings import BaseSettings
from pydantic_settings import SettingsConfigDict
class Settings(BaseSettings):
model_config = SettingsConfigDict(extra="ignore", env_ignore_empty=True)
postgres_user: str
postgres_db: str
postgres_password: str
postgres_host: str
postgres_port: int = 5432
db_pool_size: int = 50
db_max_overflow: int = 25
log_dir: Path = Path("logs")
llm_proxy: str | None = None
openai_api_key: str
llm_model: str = "openai/gpt-4o-mini"
openai_base_url: str = "https://openrouter.ai/api/v1"
worker_topic: str = "deployment"
deployment_server: str = ""
deployment_network: str = ""
broker_url: str = "portal_kafka:19092"
dockerhub_login: str = ""
dockerhub_password: str = ""
registry_url: str = "http://registry/"
healthcheck_running_interval: int = 3600
tool_retry_count: int = 60
tool_wait_interval: int = 5
deep_research_server_name: str = "deepresearch_darpserver"
@property
def database_url(self) -> str:
auth_data = f"{self.postgres_user}:{self.postgres_password}"
host = f"{self.postgres_host}:{self.postgres_port}"
return f"{auth_data}@{host}/{self.postgres_db}"
@property
def database_url_sync(self) -> str:
return f"postgresql+psycopg2://{self.database_url}"
@property
def database_url_async(self) -> str:
return f"postgresql+asyncpg://{self.database_url}"
settings = Settings()
```
--------------------------------------------------------------------------------
/mcp_server/src/decorators.py:
--------------------------------------------------------------------------------
```python
import functools
import inspect
import sys
import traceback
from collections.abc import Callable
from collections.abc import Coroutine
from typing import Any
from typing import overload
from typing import ParamSpec
from typing import TypeVar
from mcp_server.src.logger import logger
X = TypeVar("X")
P = ParamSpec("P")
@overload
def log_errors( # noqa
func: Callable[P, Coroutine[Any, Any, X]]
) -> Callable[P, Coroutine[Any, Any, X]]: ...
@overload
def log_errors(func: Callable[P, X]) -> Callable[P, X]: ... # noqa
def log_errors(func: Callable[P, Any]) -> Callable[P, Any]:
def log_error(args: tuple, kwargs: dict) -> None:
info = sys.exc_info()[2]
assert (
info is not None and info.tb_next is not None
), f"No traceback available {sys.exc_info()[2]=}"
locals_vars = info.tb_frame.f_locals
logger.error(f"{traceback.format_exc()} \n{locals_vars=} \n{args=} \n{kwargs=}")
@functools.wraps(func)
def sync_wrapped(*args: P.args, **kwargs: P.kwargs) -> Any:
try:
return func(*args, **kwargs)
except Exception:
log_error(args, kwargs)
raise
@functools.wraps(func)
    async def async_wrapped(*args: P.args, **kwargs: P.kwargs) -> Any:
try:
return await func(*args, **kwargs)
except Exception:
log_error(args, kwargs)
raise
if inspect.iscoroutinefunction(func):
return async_wrapped
return sync_wrapped
```
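A minimal sketch of the decorator on both kinds of callables; the overloads above exist so type checkers keep the decorated signature. The function names are made up.
```python
import asyncio

from mcp_server.src.decorators import log_errors

@log_errors
def divide(a: int, b: int) -> float:
    return a / b

@log_errors
async def fail(url: str) -> str:
    raise RuntimeError(f"unreachable: {url}")

divide(6, 3)  # returns 2.0; nothing is logged
try:
    asyncio.run(fail("http://example.invalid"))
except RuntimeError:
    pass  # the traceback and local variables were logged before the re-raise
```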
--------------------------------------------------------------------------------
/alembic/versions/2025_03_24_0945-ffe7a88d5b15_initial_migration.py:
--------------------------------------------------------------------------------
```python
"""Initial migration
Revision ID: ffe7a88d5b15
Revises:
Create Date: 2025-03-24 09:45:42.840081
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
# revision identifiers, used by Alembic.
revision = 'ffe7a88d5b15'
down_revision = None
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('server',
sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
sa.Column('url', sa.String(), nullable=False),
sa.Column('name', sa.String(), nullable=False),
sa.Column('description', sa.String(), nullable=False),
sa.Column('logo', sa.String(), nullable=False),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('name'),
sa.UniqueConstraint('url')
)
op.create_table('tool',
sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
sa.Column('name', sa.String(), nullable=False),
sa.Column('description', sa.String(), nullable=False),
sa.Column('input_schema', postgresql.JSONB(astext_type=sa.Text()), nullable=False),
sa.Column('server_url', sa.String(), nullable=False),
sa.ForeignKeyConstraint(['server_url'], ['server.url'], ondelete='CASCADE'),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('name', 'server_url', name='uq_tool_name_server_url')
)
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_table('tool')
op.drop_table('server')
# ### end Alembic commands ###
```
--------------------------------------------------------------------------------
/healthchecker/src/__main__.py:
--------------------------------------------------------------------------------
```python
import asyncio
import logging
from sqlalchemy.ext.asyncio import AsyncSession
from common.notifications.client import NotificationsClient
from healthchecker.src.checker import HealthChecker
from registry.src.database.session import get_unmanaged_session
from registry.src.servers.repository import ServerRepository
from registry.src.settings import settings
logger: logging.Logger = logging.getLogger("Healthchecker")
def setup_logging() -> None:
logging.basicConfig(level=logging.INFO)
logger.info("Starting healthchecker")
logger.info(
"Healthchecker will run every %d seconds", settings.healthcheck_running_interval
)
async def perform_healthcheck_step() -> None:
logger.info("Running health-checks")
session: AsyncSession = await get_unmanaged_session()
repo: ServerRepository = ServerRepository(session)
notifications_client: NotificationsClient = NotificationsClient()
healthchecker: HealthChecker = HealthChecker(
session=session,
servers_repo=repo,
notifications_client=notifications_client,
)
try:
await healthchecker.run_once()
except Exception as error:
logger.error("Error in healthchecker", exc_info=error)
logger.info("Health-checks finished")
logger.info("Sleeping for %d seconds", settings.healthcheck_running_interval)
await session.commit()
await session.close()
async def main() -> None:
setup_logging()
while True:
await perform_healthcheck_step()
await asyncio.sleep(settings.healthcheck_running_interval)
if __name__ == "__main__":
asyncio.run(main())
```
--------------------------------------------------------------------------------
/worker/src/dockerfile_generators/base.py:
--------------------------------------------------------------------------------
```python
from pathlib import Path
from ..constants import BaseImage
from ..user_logger import UserLogger
class DockerfileGenerator:
def __init__(
self, base_image: BaseImage, repo_folder: str, user_logger: UserLogger
):
self.base_image = base_image
self.lines: list[str] = []
self.repo_folder = repo_folder
self.user_logger = user_logger
async def generate_dockerfile(self) -> str:
await self._image_setup()
await self._dependencies_installation()
await self._code_setup()
return self._save_dockerfile()
async def _image_setup(self) -> None:
lines = [
f"FROM {self.base_image.value}",
"WORKDIR /workspace",
]
self._add_to_file(lines)
async def _dependencies_installation(self) -> None:
raise NotImplementedError
async def _code_setup(self) -> None:
raise NotImplementedError
def _add_to_file(self, lines: list[str]) -> None:
self.lines.extend(lines)
@staticmethod
def _find_file(repo_folder: Path, file: str) -> Path | None:
files = repo_folder.rglob(file)
try:
return next(files)
except StopIteration:
return None
def _save_dockerfile(self) -> str:
dockerfile_text = "\n".join(self.lines)
with open(f"{self.repo_folder}/Dockerfile", "w") as dockerfile:
dockerfile.write(dockerfile_text)
return dockerfile_text
def _relative_path(self, path: Path) -> str:
str_path = str(path)
if str_path.startswith(self.repo_folder):
return "." + str(str_path[len(self.repo_folder) :])
return str_path
```
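The intended extension point, sketched with a hypothetical generator: subclasses override the two `NotImplementedError` hooks and append lines via `_add_to_file`.
```python
from worker.src.dockerfile_generators.base import DockerfileGenerator

class GoGenerator(DockerfileGenerator):  # hypothetical; not part of the repo
    async def _dependencies_installation(self) -> None:
        self._add_to_file(["COPY go.mod go.sum ./", "RUN go mod download"])

    async def _code_setup(self) -> None:
        self._add_to_file(["COPY ./ ./", "RUN go build -o /server ./..."])
```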
--------------------------------------------------------------------------------
/registry/src/deployments/router.py:
--------------------------------------------------------------------------------
```python
from fastapi import APIRouter
from fastapi import Depends
from fastapi import status
from .schemas import DeploymentCreate
from .schemas import ServerLogsSchema
from .schemas import UserId
from .service import DeploymentService
from registry.src.database import Server
from registry.src.database import ServerLogs
from registry.src.servers.schemas import ServerRead
router = APIRouter(prefix="/deployments")
@router.post("/", status_code=status.HTTP_201_CREATED, response_model=ServerRead)
async def create_server(
data: DeploymentCreate,
service: DeploymentService = Depends(DeploymentService.get_new_instance),
) -> Server:
server = await service.create_server(data)
return server
@router.get("/{server_id}/logs", response_model=ServerLogsSchema)
async def get_logs(
server_id: int,
current_user_id: str,
service: DeploymentService = Depends(DeploymentService.get_new_instance),
) -> ServerLogs:
logs = await service.get_logs(server_id=server_id, current_user_id=current_user_id)
return logs
@router.post("/{server_id}/stop", status_code=status.HTTP_202_ACCEPTED)
async def stop_server(
server_id: int,
user_id: UserId,
service: DeploymentService = Depends(DeploymentService.get_new_instance),
) -> None:
await service.stop_server(
server_id=server_id, current_user_id=user_id.current_user_id
)
@router.post("/{server_id}/start", status_code=status.HTTP_202_ACCEPTED)
async def start_server(
server_id: int,
user_id: UserId,
service: DeploymentService = Depends(DeploymentService.get_new_instance),
) -> None:
await service.start_server(
server_id=server_id, current_user_id=user_id.current_user_id
)
```
--------------------------------------------------------------------------------
/registry/src/database/models/server.py:
--------------------------------------------------------------------------------
```python
from sqlalchemy import String
from sqlalchemy.orm import Mapped
from sqlalchemy.orm import mapped_column
from sqlalchemy.orm import relationship
from ...database import models
from ...types import HostType
from ...types import ServerTransportProtocol
from .base import Base
from .mixins import HasCreatedAt
from .mixins import HasUpdatedAt
from registry.src.types import ServerStatus
class Server(HasCreatedAt, HasUpdatedAt, Base):
__tablename__ = "server"
id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
title: Mapped[str] = mapped_column(String(), nullable=False)
url: Mapped[str | None] = mapped_column(String(), nullable=True, unique=True)
name: Mapped[str] = mapped_column(String(), nullable=False, unique=True)
description: Mapped[str] = mapped_column(String(), nullable=False)
logo: Mapped[str | None] = mapped_column(String(), nullable=True)
host_type: Mapped[HostType] = mapped_column(String(), nullable=False)
creator_id: Mapped[str | None] = mapped_column(String(), nullable=True)
repo_url: Mapped[str | None] = mapped_column(String(), nullable=True, unique=True)
command: Mapped[str | None] = mapped_column(String(), nullable=True)
base_image: Mapped[str | None] = mapped_column(String(), nullable=True)
transport_protocol: Mapped[ServerTransportProtocol | None] = mapped_column(
String(),
nullable=True,
)
build_instructions: Mapped[str | None] = mapped_column(String(), nullable=True)
tools: Mapped[list["models.tool.Tool"]] = relationship(
back_populates="server", cascade="all, delete-orphan"
)
status: Mapped[ServerStatus] = mapped_column(
String(), nullable=False, default=ServerStatus.active
)
```
--------------------------------------------------------------------------------
/common/notifications/client.py:
--------------------------------------------------------------------------------
```python
from httpx import AsyncClient
from common.notifications.enums import NotificationEvent
from common.notifications.exceptions import NotificationsServiceError
DEFAULT_NOTIFICATIONS_CLIENT_URI: str = "http://mailing_api:8000"
class NotificationsClient:
def __init__(self, base_url: str | None = None) -> None:
if base_url is None:
base_url = DEFAULT_NOTIFICATIONS_CLIENT_URI
self._client: AsyncClient = AsyncClient(base_url=base_url)
async def _notify(
self,
user_id: str,
template: NotificationEvent,
data: dict,
) -> None:
response = await self._client.post(
f"/v1/emails/{template.value}",
json=dict(user_id=user_id, **data),
)
if response.status_code != 200:
raise NotificationsServiceError(f"Failed to send email: {response.text}")
async def send_server_down_alert(
self,
user_id: str,
server_name: str,
server_logs: str | None = None,
):
await self._notify(
user_id,
NotificationEvent.server_healthcheck_failed,
data={
"server_name": server_name,
"server_logs": server_logs,
},
)
async def notify_deploy_failed(self, user_id: str, server_name: str) -> None:
await self._notify(
user_id,
NotificationEvent.deploy_failed,
data={
"server_name": server_name,
},
)
async def notify_deploy_successful(self, user_id: str, server_name: str) -> None:
await self._notify(
user_id,
NotificationEvent.deploy_successful,
data={
"server_name": server_name,
},
)
```
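A hypothetical call site (the user id and server name are made up); every public method is a coroutine, so callers must await it:
```python
import asyncio

from common.notifications.client import NotificationsClient

async def main() -> None:
    client = NotificationsClient()  # defaults to http://mailing_api:8000
    await client.notify_deploy_successful(
        user_id="user-123", server_name="weather_server"
    )

asyncio.run(main())
```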
--------------------------------------------------------------------------------
/registry/src/search/service.py:
--------------------------------------------------------------------------------
```python
from typing import Self
import instructor
from httpx import AsyncClient
from openai import AsyncOpenAI
from .prompts import get_top_servers
from .schemas import SearchResponse
from .schemas import SearchServer
from registry.src.logger import logger
from registry.src.settings import settings
class SearchService:
def __init__(self) -> None:
http_client: AsyncClient = (
AsyncClient()
if settings.llm_proxy is None
else AsyncClient(proxy=settings.llm_proxy)
)
self.llm = instructor.from_openai(
AsyncOpenAI(http_client=http_client, base_url=settings.openai_base_url)
)
async def _get_search_response(
self, servers: list[SearchServer], query: str
) -> SearchResponse:
prompt = get_top_servers.format(servers_list=servers, request=query)
completion = await self.llm.chat.completions.create(
model=settings.llm_model,
messages=[
{
"role": "user",
"content": prompt,
},
],
response_model=SearchResponse,
)
logger.debug(f"{completion=}")
return completion
@staticmethod
def _collect_urls(search_response: SearchResponse) -> list[str]:
server_urls = set()
for solution_step in search_response.solution_steps:
server_urls.add(solution_step.best_server.url)
server_urls |= {server.url for server in solution_step.additional_servers}
return list(server_urls)
async def get_fitting_servers(
self, servers: list[SearchServer], query: str
) -> list[str]:
search_response = await self._get_search_response(servers=servers, query=query)
return self._collect_urls(search_response=search_response)
@classmethod
async def get_new_instance(cls) -> Self:
return cls()
```
--------------------------------------------------------------------------------
/worker/src/user_logger.py:
--------------------------------------------------------------------------------
```python
from sqlalchemy.ext.asyncio import AsyncSession
from .schemas import CommandResult
from .schemas import CommandStart
from .schemas import GeneralLogMessage
from .schemas import LoggingEvent
from .schemas import LogLevel
from .schemas import LogMessage
from registry.src.deployments.logs_repository import LogsRepository
class UserLogger:
def __init__(self, session: AsyncSession, server_id: int):
self.logs_repo = LogsRepository(session=session)
self.server_id = server_id
async def clear_logs(self) -> None:
await self.logs_repo.clear_logs(server_id=self.server_id)
async def command_start(self, command: str) -> None:
log = LogMessage(
event_type=LoggingEvent.command_start, data=CommandStart(command=command)
)
await self.logs_repo.append_to_deployment_logs(
server_id=self.server_id, log=log
)
async def command_result(self, output: str, success: bool) -> None:
data = CommandResult(output=output, success=success)
log = LogMessage(event_type=LoggingEvent.command_result, data=data)
await self.logs_repo.append_to_deployment_logs(
server_id=self.server_id, log=log
)
async def info(self, message: str) -> None:
await self._add_message(log_level=LogLevel.info, message=message)
async def warning(self, message: str) -> None:
await self._add_message(log_level=LogLevel.warning, message=message)
async def error(self, message: str) -> None:
await self._add_message(log_level=LogLevel.error, message=message)
async def _add_message(self, log_level: LogLevel, message: str):
data = GeneralLogMessage(log_level=log_level, message=message)
log = LogMessage(event_type=LoggingEvent.general_message, data=data)
await self.logs_repo.append_to_deployment_logs(
server_id=self.server_id, log=log
)
```
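A sketch of how a worker task might narrate a deployment, assuming the server already has a `ServerLogs` row; each call appends one `LogMessage` to `deployment_logs`:
```python
from registry.src.database import get_unmanaged_session
from worker.src.user_logger import UserLogger

async def narrate_deploy(server_id: int) -> None:  # hypothetical helper
    session = await get_unmanaged_session()
    user_logger = UserLogger(session=session, server_id=server_id)
    await user_logger.clear_logs()
    await user_logger.command_start(command="docker build .")
    await user_logger.command_result(output="Successfully built", success=True)
    await user_logger.info(message="Deployment finished")
```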
--------------------------------------------------------------------------------
/registry/src/deployments/logs_repository.py:
--------------------------------------------------------------------------------
```python
from typing import Self
from fastapi import Depends
from sqlalchemy import select
from sqlalchemy import update
from sqlalchemy.ext.asyncio import AsyncSession
from registry.src.database import get_session
from registry.src.database import ServerLogs
from worker.src.schemas import LogMessage
class LogsRepository:
def __init__(self, session: AsyncSession) -> None:
self.session = session
async def create_logs(self, server_id: int) -> ServerLogs:
server_logs = ServerLogs(server_id=server_id)
self.session.add(server_logs)
await self.session.flush()
await self.session.refresh(server_logs)
return server_logs
async def get_server_logs(self, server_id: int) -> ServerLogs | None:
query = select(ServerLogs).where(ServerLogs.server_id == server_id)
logs = (await self.session.execute(query)).scalar_one_or_none()
return logs
async def update_server_logs(
self, server_id: int, deployment_logs: list[dict]
) -> None:
query = (
update(ServerLogs)
.where(ServerLogs.server_id == server_id)
.values(deployment_logs=deployment_logs)
)
await self.session.execute(query)
await self.session.flush()
await self.session.commit()
async def append_to_deployment_logs(self, server_id: int, log: LogMessage) -> None:
query = select(ServerLogs).where(ServerLogs.server_id == server_id)
        logs: ServerLogs | None = (
            await self.session.execute(query)
        ).scalar_one_or_none()
        if not logs:
            raise ValueError(f"No logs found for server {server_id}")
deployment_logs = logs.deployment_logs + [log.model_dump()]
await self.update_server_logs(
server_id=server_id, deployment_logs=deployment_logs
)
async def clear_logs(self, server_id: int) -> None:
await self.update_server_logs(server_id=server_id, deployment_logs=[])
@classmethod
async def get_new_instance(
cls, session: AsyncSession = Depends(get_session)
) -> Self:
return cls(session=session)
```
--------------------------------------------------------------------------------
/alembic/env.py:
--------------------------------------------------------------------------------
```python
from logging.config import fileConfig
from sqlalchemy import engine_from_config
from sqlalchemy import pool
from alembic import context
from registry.src.database import Base
from registry.src.settings import settings
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
# Interpret the config file for Python logging.
# This line sets up loggers basically.
if config.config_file_name is not None:
fileConfig(config.config_file_name)
# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
target_metadata = Base.metadata
# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.
config.set_main_option("sqlalchemy.url", settings.database_url_sync)
def run_migrations_offline() -> None:
"""Run migrations in 'offline' mode.
This configures the context with just a URL
and not an Engine, though an Engine is acceptable
here as well. By skipping the Engine creation
we don't even need a DBAPI to be available.
Calls to context.execute() here emit the given string to the
script output.
"""
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
)
with context.begin_transaction():
context.run_migrations()
def run_migrations_online() -> None:
"""Run migrations in 'online' mode.
In this scenario we need to create an Engine
and associate a connection with the context.
"""
connectable = engine_from_config(
config.get_section(config.config_ini_section, {}),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
with connectable.connect() as connection:
context.configure(connection=connection, target_metadata=target_metadata)
with context.begin_transaction():
context.run_migrations()
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()
```
--------------------------------------------------------------------------------
/mcp_server/src/llm/tool_manager.py:
--------------------------------------------------------------------------------
```python
import json
from mcp import ClientSession
from mcp.client.sse import sse_client
from mcp.types import CallToolResult
from openai.types.chat import ChatCompletionMessageToolCall
from openai.types.chat import ChatCompletionToolMessageParam
from openai.types.chat import ChatCompletionToolParam
from mcp_server.src.schemas import RegistryServer
from mcp_server.src.schemas import ToolInfo
class ToolManager:
def __init__(self, servers: list[RegistryServer]) -> None:
self.renamed_tools: dict[str, ToolInfo] = {}
self.tools: list[ChatCompletionToolParam] = self.set_tools(servers)
def rename_and_save(self, tool_name: str, server: RegistryServer) -> str:
renamed_tool = f"{tool_name}_mcp_{server.name}"
self.renamed_tools[renamed_tool] = ToolInfo(tool_name=tool_name, server=server)
return renamed_tool
def set_tools(self, servers: list[RegistryServer]) -> list[ChatCompletionToolParam]:
return [
ChatCompletionToolParam(
type="function",
function={
"name": self.rename_and_save(tool.name, server=server),
"description": tool.description,
"parameters": tool.input_schema,
},
)
for server in servers
for tool in server.tools
]
@staticmethod
async def _request_mcp(
arguments: str,
server_url: str,
tool_name: str,
) -> CallToolResult:
async with sse_client(server_url) as (read, write):
async with ClientSession(read, write) as session:
await session.initialize()
return await session.call_tool(
tool_name,
arguments=(json.loads(arguments) if arguments else None),
)
async def handle_tool_call(
self, tool_call: ChatCompletionMessageToolCall
) -> ChatCompletionToolMessageParam:
tool_info = self.renamed_tools.get(tool_call.function.name)
if not tool_info:
return ChatCompletionToolMessageParam(
role="tool",
content="Error: Incorrect tool name",
tool_call_id=tool_call.id,
)
tool_call_result = await self._request_mcp(
arguments=tool_call.function.arguments,
server_url=tool_info.server.url,
tool_name=tool_info.tool_name,
)
return ChatCompletionToolMessageParam(
role="tool",
content=tool_call_result.content[0].text,
tool_call_id=tool_call.id,
)
```
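A duck-typed sketch of the renaming scheme (real callers pass `RegistryServer` models; `SimpleNamespace` stands in here), showing how a tool name is namespaced per server so calls can be routed back:
```python
from types import SimpleNamespace

from mcp_server.src.llm.tool_manager import ToolManager

tool = SimpleNamespace(
    name="search", description="Full-text search", input_schema={"type": "object"}
)
github = SimpleNamespace(name="github", url="http://github-mcp/sse", tools=[tool])
jira = SimpleNamespace(name="jira", url="http://jira-mcp/sse", tools=[tool])

manager = ToolManager(servers=[github, jira])  # type: ignore[arg-type]
print(sorted(manager.renamed_tools))  # ['search_mcp_github', 'search_mcp_jira']
```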
--------------------------------------------------------------------------------
/registry/src/servers/schemas.py:
--------------------------------------------------------------------------------
```python
from datetime import datetime
from typing import Annotated
from typing import Any
from pydantic import ConfigDict
from pydantic import Field
from pydantic import field_serializer
from registry.src.base_schema import BaseSchema
from registry.src.types import HostType
from registry.src.types import ServerStatus
from registry.src.types import ServerTransportProtocol
from worker.src.constants import BaseImage
tool_name_regex = "^[a-zA-Z0-9_-]{1,64}$"
server_name_regex_pattern = "^[a-z0-9]+(_[a-z0-9]+)*$"
class Tool(BaseSchema):
model_config = ConfigDict(populate_by_name=True, **BaseSchema.model_config)
name: str = Field(pattern=tool_name_regex)
alias: str = Field(pattern=tool_name_regex)
description: str
input_schema: dict[str, Any] = Field(validation_alias="inputSchema")
class ServerCreateData(BaseSchema):
name: Annotated[str, Field(pattern=server_name_regex_pattern, max_length=30)]
title: str = Field(max_length=30)
description: str
url: str
logo: str | None = None
host_type: HostType = HostType.external
class ServerCreate(BaseSchema):
data: ServerCreateData
current_user_id: str | None = None
class ServerRead(BaseSchema):
title: str
id: int
status: ServerStatus
name: str
creator_id: str | None
description: str
repo_url: str | None
url: str | None
command: str | None
logo: str | None
host_type: HostType
base_image: BaseImage | None = None
build_instructions: str | None = None
transport_protocol: ServerTransportProtocol | None
created_at: datetime
updated_at: datetime
@field_serializer("created_at", "updated_at")
def serialize_datetime(self, value: datetime, _info) -> str:
return value.isoformat()
class ServerWithTools(ServerRead):
tools: list[Tool]
class ServerUpdateData(BaseSchema):
title: str | None = Field(max_length=30, default=None)
name: Annotated[
str | None, Field(pattern=server_name_regex_pattern, max_length=30)
] = None
description: str | None = None
url: str | None = None
logo: str | None = None
class DeploymentUpdateData(BaseSchema):
command: str | None = None
repo_url: str | None = None
base_image: BaseImage | None = None
build_instructions: str | None = None
class ServerUpdate(BaseSchema):
data: ServerUpdateData
deployment_data: DeploymentUpdateData
current_user_id: str | None = None
class MCP(BaseSchema):
name: str
description: str
endpoint: str = Field(validation_alias="url")
logo: str | None = None
class MCPJson(BaseSchema):
version: str = "1.0"
servers: list[MCP]
```
--------------------------------------------------------------------------------
/scripts/darp-search.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
List matching MCP servers from the registry.
"""
import argparse
import json
import sys
from typing import Any
from typing import Dict
from typing import List
import requests
default_registry_url: str = "http://localhost:80"
def main() -> None:
parser = argparse.ArgumentParser(description="Search for servers in the registry")
parser.add_argument("query", help="Search query string")
parser.add_argument(
"--registry-url",
default=default_registry_url,
help=f"Registry URL (default: {default_registry_url})",
)
parser.add_argument(
"--format",
choices=["names", "fulltext", "json"],
default="names",
help="Output format (default: names)",
)
args = parser.parse_args()
servers = search_servers(args.query, args.registry_url)
display_servers(servers, args.format)
def search_servers(
query: str, base_url: str = default_registry_url
) -> List[Dict[str, Any]]:
url = f"{base_url}/servers/search"
params = {"query": query}
try:
response = requests.get(url, params=params)
response.raise_for_status()
return response.json()
except requests.RequestException as error:
print(f"Error: Failed to search servers - {error}", file=sys.stderr)
sys.exit(1)
def display_servers(
servers: List[Dict[str, Any]], format_output: str = "names"
) -> None:
if format_output == "json":
display_servers_json(servers)
elif format_output == "names":
display_servers_names(servers)
elif format_output == "fulltext":
display_servers_fulltext(servers)
def display_servers_json(servers: List[Dict[str, Any]]) -> None:
print(json.dumps(servers, indent=2))
def display_servers_names(servers: List[Dict[str, Any]]) -> None:
print(f"Found {len(servers)} servers:")
for server in servers:
print(f"{server['name']}")
def display_servers_fulltext(servers: List[Dict[str, Any]]) -> None:
print(f"Found {len(servers)} servers:")
for server in servers:
print(f"{server['id']}: {server['name']}")
print(f"{indent(server['url'], 1)}")
print(f"{indent(server['description'], 1)}")
if server["tools"]:
print(indent("Tools:", 1))
for tool in server["tools"]:
print(f"{indent(tool['name'], 2)}")
print(f"{indent(tool['description'], 2)}")
print(f"{indent(json.dumps(tool['input_schema'], indent=2), 2)}")
def indent(text: str, level: int = 1, prefix: str = " ") -> str:
return "\n".join(prefix * level + line for line in text.split("\n"))
if __name__ == "__main__":
main()
```
--------------------------------------------------------------------------------
/mcp_server/src/tools.py:
--------------------------------------------------------------------------------
```python
import logging
from openai.types.chat import ChatCompletionMessage
from openai.types.chat import ChatCompletionMessageParam
from openai.types.chat import ChatCompletionToolMessageParam
from openai.types.chat import ChatCompletionUserMessageParam
from common.llm.client import LLMClient
from mcp_server.src.decorators import log_errors
from mcp_server.src.llm.prompts import default_prompt
from mcp_server.src.llm.tool_manager import ToolManager
from mcp_server.src.registry_client import RegistryClient
from mcp_server.src.settings import settings
class Tools:
def __init__(self):
self.registry_client = RegistryClient()
logger = logging.getLogger("LLMClient")
self.llm_client = LLMClient(
logger, proxy=settings.llm_proxy, base_url=settings.openai_base_url
)
@log_errors
async def search(self, request: str) -> list[str]:
servers = await self.registry_client.search(request=request)
return [server.url for server in servers]
@log_errors
async def routing(
self, request: str
) -> list[ChatCompletionMessage | ChatCompletionToolMessageParam]:
servers = await self.registry_client.search(request=request)
manager = ToolManager(servers)
user_message = ChatCompletionUserMessageParam(content=request, role="user")
resulting_conversation = await self._llm_request(
messages=[user_message], manager=manager
)
return resulting_conversation
async def _llm_request(
self,
messages: list[ChatCompletionMessageParam],
manager: ToolManager,
response_accumulator: (
list[ChatCompletionMessage | ChatCompletionToolMessageParam] | None
) = None,
) -> list[ChatCompletionMessage | ChatCompletionToolMessageParam]:
if response_accumulator is None:
response_accumulator = []
completion = await self.llm_client.request(
system_prompt=default_prompt,
model=settings.llm_model,
messages=messages,
tools=manager.tools,
)
choice = completion.choices[0]
response_accumulator.append(choice.message)
if choice.message.tool_calls:
tool_calls = choice.message.tool_calls
tool_result_messages = [
await manager.handle_tool_call(tool_call) for tool_call in tool_calls
]
response_accumulator += tool_result_messages
conversation = messages + [choice.message] + tool_result_messages
response_accumulator = await self._llm_request(
messages=conversation,
manager=manager,
response_accumulator=response_accumulator,
)
return response_accumulator
```
--------------------------------------------------------------------------------
/registry/src/validator/service.py:
--------------------------------------------------------------------------------
```python
import logging
from typing import Self
from fastapi import Depends
from openai.types.chat import ChatCompletionSystemMessageParam
from openai.types.chat import ChatCompletionUserMessageParam
from .prompts import SENS_TOPIC_CHECK_SYSTEM_PROMPT
from .prompts import SENS_TOPIC_CHECK_USER_PROMPT
from common.llm.client import LLMClient
from registry.src.servers.repository import ServerRepository
from registry.src.servers.schemas import ServerCreate
from registry.src.servers.schemas import Tool
from registry.src.settings import settings
from registry.src.validator.schemas import ServerNeedsSensitiveDataResponse
from registry.src.validator.schemas import Tools
from registry.src.validator.schemas import ValidationResult
from registry.src.validator.ssl import SSLHelper
class ValidationService:
def __init__(
self,
servers_repo: ServerRepository,
ssl_helper: SSLHelper,
) -> None:
self._servers_repo = servers_repo
self._ssl_helper = ssl_helper
logger = logging.getLogger("LLMClient")
self._llm_client = LLMClient(
logger, proxy=settings.llm_proxy, base_url=settings.openai_base_url
)
async def _mcp_needs_sensitive_data(
self,
data: ServerCreate,
tools: list[Tool],
) -> bool:
system_message = ChatCompletionSystemMessageParam(
content=SENS_TOPIC_CHECK_SYSTEM_PROMPT.render(), role="system"
)
message = ChatCompletionUserMessageParam(
content=SENS_TOPIC_CHECK_USER_PROMPT.render(
server_data=data,
tools_data=Tools.model_validate(tools),
),
role="user",
)
response = await self._llm_client.request_to_beta(
model=settings.llm_model,
messages=[system_message, message],
response_format=ServerNeedsSensitiveDataResponse,
)
answer = ServerNeedsSensitiveDataResponse.model_validate_json(
response.choices[0].message.content,
)
return answer.server_needs_sensitive_data
async def validate_server(
self, data: ServerCreate, tools: list[Tool]
) -> ValidationResult:
result = ValidationResult()
result.server_requires_sensitive_data = await self._mcp_needs_sensitive_data(
data,
tools,
)
result.authority_level = await self._ssl_helper.get_ssl_authority_level(
data.url,
)
return result
@classmethod
def get_new_instance(
cls,
servers_repo: ServerRepository = Depends(ServerRepository.get_new_instance),
ssl_helper: SSLHelper = Depends(SSLHelper.get_new_instance),
) -> Self:
return cls(
servers_repo=servers_repo,
ssl_helper=ssl_helper,
)
```
--------------------------------------------------------------------------------
/worker/src/consumer.py:
--------------------------------------------------------------------------------
```python
from datetime import timedelta
from typing import Any
from aiokafka import AIOKafkaConsumer
from .deployment_service import WorkerDeploymentService
from registry.src.database import get_unmanaged_session
from registry.src.logger import logger
from registry.src.servers.repository import ServerRepository
from registry.src.settings import settings
from worker.src.schemas import Task
from worker.src.schemas import TaskType
class Consumer:
def __init__(self) -> None:
self.max_poll_interval: int = int(timedelta(minutes=30).total_seconds())
self.session_timeout: int = int(timedelta(minutes=10).total_seconds())
self.task_handlers: dict[TaskType, Any] = {
TaskType.deploy: self.deploy_server,
TaskType.start: self.start_server,
TaskType.stop: self.stop_server,
}
async def start(self) -> None:
consumer = AIOKafkaConsumer(
settings.worker_topic,
bootstrap_servers=settings.broker_url,
auto_offset_reset="earliest",
group_id="deploy_workers",
group_instance_id="1",
max_poll_interval_ms=self.max_poll_interval * 1000,
session_timeout_ms=self.session_timeout * 1000,
)
await consumer.start()
try:
await self.process_messages(consumer)
finally:
await consumer.stop()
async def process_messages(self, consumer: AIOKafkaConsumer) -> None:
async for message in consumer:
incoming_message = self.parse_message(message.value)
if not incoming_message:
continue
await self.process_message(incoming_message)
    async def process_message(self, task: Task) -> None:
        session = await get_unmanaged_session()
        try:
            repo = ServerRepository(session)
            await self.task_handlers[task.task_type](task.server_id, repo)
        except Exception as error:
            logger.error("Error while processing a task", exc_info=error)
        finally:
            # Commit regardless of outcome so user-facing logs persist,
            # then release the connection back to the pool.
            await session.commit()
            await session.close()
async def deploy_server(self, server_id: int, repo: ServerRepository) -> None:
service = WorkerDeploymentService(repo=repo, logger=logger)
await service.deploy_server(server_id=server_id)
async def stop_server(self, server_id: int, repo: ServerRepository) -> None:
raise NotImplementedError()
async def start_server(self, server_id: int, repo: ServerRepository) -> None:
raise NotImplementedError()
@staticmethod
def parse_message(message: bytes) -> Task | None:
logger.debug(f"Incoming message - {message.decode()}")
try:
task = Task.model_validate_json(message.decode())
return task
except Exception as error:
logger.error("Incorrect message received", exc_info=error)
return None
```
--------------------------------------------------------------------------------
/registry/src/validator/ssl.py:
--------------------------------------------------------------------------------
```python
import asyncio
import logging
import ssl
from typing import Self
from urllib.parse import urlparse
from cryptography import x509
from cryptography.hazmat._oid import ExtensionOID
from cryptography.hazmat.backends import default_backend
from .constants import EV_OIDS
from .constants import HTTP_PORT
from .constants import HTTPS_PORT
from .constants import SELF_SIGNED_CERTIFICATE_ERR_CODE
from .schemas import SSLAuthorityLevel
class SSLHelper:
def __init__(self) -> None:
self._logger = logging.getLogger(f"{__name__}.{self.__class__.__name__}")
async def get_ssl_authority_level(self, url: str) -> SSLAuthorityLevel:
try:
cert = await self._load_certificate(url)
except ssl.SSLCertVerificationError as exc:
if exc.verify_code == SELF_SIGNED_CERTIFICATE_ERR_CODE:
return SSLAuthorityLevel.SELF_SIGNED_CERTIFICATE
return SSLAuthorityLevel.INVALID_CERTIFICATE
except Exception as exc:
self._logger.error("Failed to fetch ssl cert", exc_info=exc)
return SSLAuthorityLevel.NO_CERTIFICATE
if self._is_cert_respects_ev(cert):
return SSLAuthorityLevel.EXTENDED_CERTIFICATE
return SSLAuthorityLevel.CERTIFICATE_OK
@classmethod
def get_new_instance(cls) -> Self:
return cls()
@staticmethod
def _extract_port(uri: str) -> int:
parsed = urlparse(uri)
if parsed.port:
return parsed.port
if parsed.scheme == "https":
return HTTPS_PORT
elif parsed.scheme == "http":
return HTTP_PORT
raise ValueError("Invalid URI", uri)
@staticmethod
def _extract_host(uri: str) -> str:
parsed = urlparse(uri)
if parsed.hostname is None:
raise ValueError("Invalid URL: No hostname found")
return parsed.hostname
@classmethod
async def _load_certificate(cls, url: str) -> x509.Certificate:
hostname = cls._extract_host(url)
port = cls._extract_port(url)
ssl_context = ssl.create_default_context()
ssl_context.check_hostname = True
ssl_context.verify_mode = ssl.CERT_REQUIRED
reader, writer = await asyncio.open_connection(
hostname, port, ssl=ssl_context, server_hostname=hostname
)
ssl_object = writer.get_extra_info("ssl_object")
der_cert = ssl_object.getpeercert(binary_form=True)
writer.close()
await writer.wait_closed()
cert_obj = x509.load_der_x509_certificate(der_cert, default_backend())
return cert_obj
@staticmethod
def _is_cert_respects_ev(cert: x509.Certificate) -> bool:
try:
policies = cert.extensions.get_extension_for_oid(
ExtensionOID.CERTIFICATE_POLICIES
).value
except x509.ExtensionNotFound:
return False
for policy in policies:
if policy.policy_identifier.dotted_string in EV_OIDS:
return True
return False
```
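A hypothetical smoke test of the probe (network access and a resolvable host are assumed):
```python
import asyncio

from registry.src.validator.ssl import SSLHelper

async def main() -> None:
    helper = SSLHelper.get_new_instance()
    level = await helper.get_ssl_authority_level("https://example.com")
    print(level)  # e.g. SSLAuthorityLevel.CERTIFICATE_OK

asyncio.run(main())
```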
--------------------------------------------------------------------------------
/worker/src/dockerfile_generators/python/generic.py:
--------------------------------------------------------------------------------
```python
from pathlib import Path
from typing import override
from ...errors import DependenciesError
from ..base import DockerfileGenerator
class PythonGenerator(DockerfileGenerator):
@override
async def _dependencies_installation(self) -> None:
repo_path = Path(self.repo_folder)
await self._install_node()
uv_file = self._find_file(repo_path, "uv.lock")
pyproject_file = self._find_file(repo_path, "pyproject.toml")
readme_file = self._find_file(repo_path, "README.md")
        # A uv.lock project also contains a pyproject.toml, so the most
        # specific marker has to be checked first or the uv branch is
        # unreachable dead code.
        if uv_file and pyproject_file and readme_file:
            await self.user_logger.info(
                message="uv.lock found. Installing dependencies with uv"
            )
            self._uv_dependencies(
                uv_file=uv_file, pyproject=pyproject_file, readme_file=readme_file
            )
            return
        if pyproject_file and readme_file:
            await self.user_logger.info(
                message="pyproject.toml found. Installing dependencies with pip"
            )
            self._pip_dependencies(pyproject=pyproject_file, readme_file=readme_file)
            return
        requirements_file = self._find_file(repo_path, "requirements.txt")
        if requirements_file:
            await self.user_logger.info(
                message="requirements.txt found. Installing dependencies with legacy pip method"
            )
            self._legacy_pip_dependencies(requirements_file)
            return
        raise DependenciesError("No supported dependencies installation method found")
@override
async def _code_setup(self) -> None:
lines = [
"COPY ./ ./",
]
self._add_to_file(lines)
async def _install_node(self) -> None:
await self.user_logger.info(message="Adding nodejs for supergateway")
lines = [
"RUN apt update && apt install -y nodejs npm",
]
self._add_to_file(lines)
def _legacy_pip_dependencies(self, requirements: Path) -> None:
lines = [
f"COPY {self._relative_path(requirements.absolute())} ./requirements.txt",
"RUN pip install -r requirements.txt",
]
self._add_to_file(lines)
def _pip_dependencies(self, pyproject: Path, readme_file: Path) -> None:
lines = [
f"COPY {self._relative_path(pyproject.absolute())} ./pyproject.toml",
f"COPY {self._relative_path(readme_file.absolute())} ./README.md",
"RUN pip install .",
]
self._add_to_file(lines)
def _uv_dependencies(
self, uv_file: Path, pyproject: Path, readme_file: Path
) -> None:
lines = [
"COPY --from=ghcr.io/astral-sh/uv:latest /uv /uvx /usr/local/bin/",
f"COPY {self._relative_path(uv_file.absolute())} ./uv.lock",
f"COPY {self._relative_path(pyproject.absolute())} ./pyproject.toml",
f"COPY {self._relative_path(readme_file.absolute())} ./README.md",
"RUN uv pip install . --system",
]
self._add_to_file(lines)
```
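The probe order above matters: a uv-managed repo also ships a `pyproject.toml`, so the most specific marker must win. Distilled as a standalone sketch (file discovery via `rglob` is an assumption here; the real `_find_file` lives in `base.py` and may differ):
```python
from pathlib import Path

def pick_install_method(repo: Path) -> str:
    def has(name: str) -> bool:
        # Hypothetical stand-in for DockerfileGenerator._find_file.
        return next(repo.rglob(name), None) is not None

    if has("uv.lock") and has("pyproject.toml") and has("README.md"):
        return "uv"
    if has("pyproject.toml") and has("README.md"):
        return "pip (pyproject)"
    if has("requirements.txt"):
        return "pip (requirements.txt)"
    raise ValueError("no supported dependency manifest found")

print(pick_install_method(Path(".")))
```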
--------------------------------------------------------------------------------
/healthchecker/src/checker.py:
--------------------------------------------------------------------------------
```python
import logging
from collections.abc import Sequence
from sqlalchemy import Select
from sqlalchemy.ext.asyncio import AsyncSession
from common.notifications.client import NotificationsClient
from registry.src.database.models.server import Server
from registry.src.servers.repository import ServerRepository
from registry.src.servers.schemas import Tool
from registry.src.servers.service import ServerService
from registry.src.types import ServerStatus
class HealthChecker:
def __init__(
self,
session: AsyncSession,
servers_repo: ServerRepository,
notifications_client: NotificationsClient,
) -> None:
self.session = session
self.repo = servers_repo
self.notifications_client = notifications_client
self.logger = logging.getLogger("Healthchecker")
async def run_once(self) -> None:
servers = await self._fetch_servers()
for server in servers:
await self._check_server(server)
async def _fetch_servers(self) -> Sequence[Server]:
query: Select = await self.repo.get_all_servers()
servers: Sequence[Server] = (
(await self.session.execute(query)).scalars().fetchall()
)
return servers
async def _get_tools(self, server: Server) -> list[Tool]:
return await ServerService.get_tools(server.url, server.name)
async def _check_server(self, server: Server) -> None:
self.logger.info("Checking server %s", server.name)
try:
tools = await self._get_tools(server)
except Exception as error:
await self._handle_failed_server(server, error)
return
if not tools:
await self._handle_empty_tools(server)
return
await self._mark_server_healthy(server, tools)
async def _handle_failed_server(self, server: Server, error: Exception) -> None:
self.logger.error("Failed to fetch server's tools", exc_info=error)
self.logger.warning("Server %s will be marked as unhealthy", server.name)
await self.notifications_client.send_server_down_alert(
server.creator_id,
server_name=server.name,
)
await self._update_server_status(server, [], ServerStatus.inactive)
async def _handle_empty_tools(self, server: Server) -> None:
self.logger.warning("Server %s is alive, but has no tools", server.name)
self.logger.warning("Server will be marked as unhealthy.")
await self.notifications_client.send_server_down_alert(
server.creator_id,
server_name=server.name,
server_logs=(
"Server has no tools. It will be marked as unhealthy. "
"Please, check it out."
),
)
await self._update_server_status(server, [], ServerStatus.inactive)
async def _mark_server_healthy(self, server: Server, tools: list[Tool]) -> None:
await self._update_server_status(server, tools, ServerStatus.active)
async def _update_server_status(
self, server: Server, tools: list[Tool], status: ServerStatus
) -> None:
await self.repo.update_server(
id=server.id,
tools=tools,
status=status,
)
```
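A sketch of how the checker could be wired up outside its worker entrypoint (the DSN is a placeholder; the real wiring lives in `healthchecker/src/__main__.py`):
```python
import asyncio

from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine

from common.notifications.client import NotificationsClient
from healthchecker.src.checker import HealthChecker
from registry.src.servers.repository import ServerRepository

async def main() -> None:
    engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/registry")
    session_factory = async_sessionmaker(engine, expire_on_commit=False)
    async with session_factory() as session:
        checker = HealthChecker(
            session=session,
            servers_repo=ServerRepository(session),
            notifications_client=NotificationsClient(),
        )
        await checker.run_once()
        await session.commit()  # update_server only flushes; persist here

asyncio.run(main())
```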
--------------------------------------------------------------------------------
/alembic.ini:
--------------------------------------------------------------------------------
```
# A generic, single database configuration.
[alembic]
# path to migration scripts
# Use forward slashes (/) also on windows to provide an os agnostic path
script_location = alembic
# template used to generate migration file names; The default value is %%(rev)s_%%(slug)s
# Uncomment the line below if you want the files to be prepended with date and time
# see https://alembic.sqlalchemy.org/en/latest/tutorial.html#editing-the-ini-file
# for all available tokens
file_template = %%(year)d_%%(month).2d_%%(day).2d_%%(hour).2d%%(minute).2d-%%(rev)s_%%(slug)s
# sys.path path, will be prepended to sys.path if present.
# defaults to the current working directory.
prepend_sys_path = .
# timezone to use when rendering the date within the migration file
# as well as the filename.
# If specified, requires the python>=3.9 or backports.zoneinfo library and tzdata library.
# Any required deps can be installed by adding `alembic[tz]` to the pip requirements
# string value is passed to ZoneInfo()
# leave blank for localtime
# timezone =
# max length of characters to apply to the "slug" field
# truncate_slug_length = 40
# set to 'true' to run the environment during
# the 'revision' command, regardless of autogenerate
# revision_environment = false
# set to 'true' to allow .pyc and .pyo files without
# a source .py file to be detected as revisions in the
# versions/ directory
# sourceless = false
# version location specification; This defaults
# to alembic/versions. When using multiple version
# directories, initial revisions must be specified with --version-path.
# The path separator used here should be the separator specified by "version_path_separator" below.
# version_locations = %(here)s/bar:%(here)s/bat:alembic/versions
# version path separator; As mentioned above, this is the character used to split
# version_locations. The default within new alembic.ini files is "os", which uses os.pathsep.
# If this key is omitted entirely, it falls back to the legacy behavior of splitting on spaces and/or commas.
# Valid values for version_path_separator are:
#
# version_path_separator = :
# version_path_separator = ;
# version_path_separator = space
# version_path_separator = newline
#
# Use os.pathsep. Default configuration used for new projects.
version_path_separator = os
# set to 'true' to search source files recursively
# in each "version_locations" directory
# new in Alembic version 1.10
# recursive_version_locations = false
# the output encoding used when revision files
# are written from script.py.mako
# output_encoding = utf-8
sqlalchemy.url = driver://user:pass@localhost/dbname
[post_write_hooks]
# post_write_hooks defines scripts or Python functions that are run
# on newly generated revision scripts. See the documentation for further
# detail and examples
# format using "black" - use the console_scripts runner, against the "black" entrypoint
# hooks = black
# black.type = console_scripts
# black.entrypoint = black
# black.options = -l 79 REVISION_SCRIPT_FILENAME
# lint with attempts to fix using "ruff" - use the exec runner, execute a binary
# hooks = ruff
# ruff.type = exec
# ruff.executable = %(here)s/.venv/bin/ruff
# ruff.options = --fix REVISION_SCRIPT_FILENAME
# Logging configuration
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARNING
handlers = console
qualname =
[logger_sqlalchemy]
level = WARNING
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S
```
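With the `file_template` above, new revisions get date-prefixed names like those under `alembic/versions`. A sketch of driving Alembic programmatically instead of via the CLI (it needs a real `sqlalchemy.url`, which is only a placeholder in this file):
```python
from alembic import command
from alembic.config import Config

cfg = Config("alembic.ini")  # picks up script_location and file_template
command.revision(cfg, message="add example column", autogenerate=True)
command.upgrade(cfg, "head")
```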
--------------------------------------------------------------------------------
/common/llm/client.py:
--------------------------------------------------------------------------------
```python
from logging import Logger
from typing import Type
from httpx import AsyncClient
from openai import APIError
from openai import AsyncOpenAI
from openai import InternalServerError
from openai import NOT_GIVEN
from openai import OpenAIError
from openai.types.chat import ChatCompletion
from openai.types.chat import ChatCompletionMessageParam
from openai.types.chat import ChatCompletionToolParam
from pydantic import BaseModel
class LLMClient:
def __init__(
self, logger: Logger, proxy: str | None = None, base_url: str | None = None
) -> None:
self._logger = logger
if proxy is not None:
self._logger.info("Enabled proxy %s", proxy)
http_client = AsyncClient(proxy=proxy)
else:
http_client = AsyncClient()
self.openai_client = AsyncOpenAI(http_client=http_client, base_url=base_url)
@staticmethod
def _get_full_messages(
system_prompt: str | None, messages: list[ChatCompletionMessageParam]
) -> list[ChatCompletionMessageParam]:
system_message: list[dict] = []
if system_prompt:
system_message = [
{
"role": "system",
"content": system_prompt,
# Anthropic prompt caching
"cache_control": {"type": "ephemeral"},
}
]
full_messages = system_message + messages
return full_messages
async def request(
self,
model: str,
messages: list[ChatCompletionMessageParam],
system_prompt: str | None = None,
max_tokens: int | None = None,
tools: list[ChatCompletionToolParam] | None = None,
) -> ChatCompletion:
full_messages = self._get_full_messages(
system_prompt=system_prompt, messages=messages
)
try:
response = await self.openai_client.chat.completions.create(
model=model,
messages=full_messages,
max_tokens=max_tokens,
tools=tools or NOT_GIVEN,
timeout=30,
)
        # InternalServerError subclasses APIError, so it must be caught
        # first; otherwise its handler is unreachable.
        except InternalServerError as error:
            self._logger.error(error.response.json())
            raise
        except APIError as error:
            self._logger.error(f"{error.code=} {error.body=}")
            raise
except OpenAIError as error:
self._logger.error(
"Request to Provider failed with the following exception",
exc_info=error,
)
raise
return response
async def request_to_beta(
self,
model: str,
messages: list[ChatCompletionMessageParam],
system_prompt: str | None = None,
max_tokens: int | None = None,
tools: list[ChatCompletionToolParam] | None = None,
response_format: Type[BaseModel] = NOT_GIVEN,
) -> ChatCompletion:
full_messages = self._get_full_messages(
system_prompt=system_prompt, messages=messages
)
try:
response = await self.openai_client.beta.chat.completions.parse(
model=model,
messages=full_messages,
max_tokens=max_tokens,
tools=tools or NOT_GIVEN,
timeout=30,
response_format=response_format,
)
        # As above: catch the more specific InternalServerError before APIError.
        except InternalServerError as error:
            self._logger.error(error.response.json())
            raise
        except APIError as error:
            self._logger.error(f"{error.code=} {error.body=}")
            raise
except OpenAIError as error:
self._logger.error(
"Request to Provider failed with the following exception",
exc_info=error,
)
raise
return response
```
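A minimal call through `LLMClient.request`, assuming `OPENAI_API_KEY` is set in the environment (the model name is a placeholder):
```python
import asyncio
import logging

from common.llm.client import LLMClient

async def main() -> None:
    client = LLMClient(logger=logging.getLogger("llm"))
    response = await client.request(
        model="gpt-4o-mini",  # placeholder; the services read LLM_MODEL from settings
        messages=[{"role": "user", "content": "Say hello"}],
        system_prompt="You are terse.",
        max_tokens=32,
    )
    print(response.choices[0].message.content)

asyncio.run(main())
```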
--------------------------------------------------------------------------------
/worker/src/docker_service.py:
--------------------------------------------------------------------------------
```python
import subprocess
from pathlib import Path
from .constants import BaseImage
from .dockerfile_generators import DockerfileGenerator
from .dockerfile_generators import get_dockerfile_generator
from .errors import InvalidData
from .errors import ProcessingError
from .user_logger import UserLogger
from registry.src.database import Server
from registry.src.logger import logger
from registry.src.settings import settings
class DockerService:
def __init__(
self,
docker_env: dict,
repo_folder: str,
server: Server,
user_logger: UserLogger,
) -> None:
self.docker_env = docker_env
self.repo_folder = repo_folder
self.server = server
self.image_name = f"hipasus/{self.server.name}"
self.container_name = f"darp_server_{server.id}"
self.user_logger = user_logger
async def clone_repo(self) -> None:
await self._execute(["git", "clone", self.server.repo_url, self.repo_folder])
def dockerfile_exists(self) -> bool:
path = Path(f"{self.repo_folder}/Dockerfile")
return path.exists()
async def generate_dockerfile(self) -> None:
if not self.server.base_image:
await self.user_logger.error(message="No base image provided.")
raise InvalidData
generator: DockerfileGenerator = await get_dockerfile_generator(
repo_folder=self.repo_folder,
base_image=BaseImage(self.server.base_image),
build_instructions=self.server.build_instructions,
user_logger=self.user_logger,
)
try:
logs = await generator.generate_dockerfile()
await self.user_logger.info(message=f"Generated Dockerfile:\n{logs}")
logger.info(logs)
except ProcessingError as error:
await self.user_logger.error(
message=f"Error generating Dockerfile: {error.message or ''}"
)
raise
async def build_image(self) -> None:
command = [
"docker",
"build",
"-t",
self.image_name,
"-f",
f"{self.repo_folder}/Dockerfile",
self.repo_folder,
]
await self._execute(command)
async def push_image(self) -> None:
        # Note: a trailing comma here would turn the command into a one-element
        # tuple; subprocess.run with shell=True expects a single string.
        docker_push_command = (
            f"docker login -u {settings.dockerhub_login} -p {settings.dockerhub_password} "
            f"&& docker image push {self.image_name}"
        )
        await self.user_logger.info(message="Pushing image...")
result = subprocess.run(docker_push_command, shell=True)
if result.returncode != 0:
await self.user_logger.error(message="Docker push failed")
logger.error("docker push failed")
raise ProcessingError
async def run_container(self) -> None:
subprocess.run(
["docker", "rm", self.container_name, "--force"], env=self.docker_env
)
run_command = [
"docker",
"run",
"--network",
settings.deployment_network,
"-d",
"--restart",
"always",
"--pull",
"always",
"--name",
self.container_name,
self.image_name,
]
if self.server.command:
run_command.extend(["sh", "-c", self.server.command])
await self._execute(command=run_command, env=self.docker_env)
async def _execute(self, command: list[str], env: dict | None = None) -> None:
command_str = " ".join(command)
logger.info(f"{command_str=}")
await self.user_logger.command_start(command=command_str)
arguments: dict = dict(
args=command,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
)
if env:
arguments["env"] = env
result = subprocess.run(**arguments)
output = result.stdout.decode() if result.stdout else "" # noqa
if result.returncode != 0:
await self.user_logger.command_result(output=output, success=False)
logger.error(f"{' '.join(command[:2])} failed")
raise ProcessingError
await self.user_logger.command_result(output=output, success=True)
```
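The `_execute` helper above merges stderr into stdout and fails loudly on a non-zero exit code; the same pattern in isolation, without the user-facing logging:
```python
import subprocess

def run_logged(command: list[str], env: dict | None = None) -> str:
    # Mirrors DockerService._execute: capture stdout and stderr together
    # and raise when the command fails.
    result = subprocess.run(
        command,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        env=env,
    )
    output = result.stdout.decode() if result.stdout else ""
    if result.returncode != 0:
        raise RuntimeError(f"{command[0]} failed:\n{output}")
    return output

print(run_logged(["echo", "hello"]))
```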
--------------------------------------------------------------------------------
/registry/src/servers/router.py:
--------------------------------------------------------------------------------
```python
from typing import Literal
from fastapi import APIRouter
from fastapi import Depends
from fastapi import Query
from fastapi import status as status_codes
from fastapi_pagination import add_pagination
from fastapi_pagination import Page
from fastapi_pagination import Params
from fastapi_pagination.ext.sqlalchemy import paginate
from sqlalchemy import Select
from sqlalchemy.ext.asyncio import AsyncSession
from ..deployments.service import DeploymentService
from ..errors import InvalidData
from .schemas import MCPJson
from .schemas import ServerCreate
from .schemas import ServerRead
from .schemas import ServerUpdate
from .schemas import ServerWithTools
from .service import ServerService
from registry.src.database import get_session
from registry.src.database import Server
from registry.src.search.schemas import SearchServer
from registry.src.search.service import SearchService
from registry.src.types import RoutingMode
from registry.src.types import ServerStatus
router = APIRouter(prefix="/servers")
add_pagination(router)
@router.post(
"/", response_model=ServerWithTools, status_code=status_codes.HTTP_201_CREATED
)
async def create(
data: ServerCreate, service: ServerService = Depends(ServerService.get_new_instance)
) -> Server:
return await service.create(data)
@router.get("/search", response_model=list[ServerWithTools])
async def search(
query: str,
routing_mode: RoutingMode = RoutingMode.auto,
service: ServerService = Depends(ServerService.get_new_instance),
search_service: SearchService = Depends(SearchService.get_new_instance),
) -> list[Server]:
if routing_mode == RoutingMode.auto:
servers = await service.get_search_servers()
formatted_servers = [SearchServer.model_validate(server) for server in servers]
server_urls = await search_service.get_fitting_servers(
servers=formatted_servers, query=query
)
return await service.get_servers_by_urls(server_urls=server_urls)
elif routing_mode == RoutingMode.deepresearch:
return await service.get_deep_research()
raise InvalidData(f"Unsupported {routing_mode=}", routing_mode=routing_mode)
@router.get("/batch", response_model=list[ServerWithTools])
async def get_servers_by_ids(
ids: list[int] = Query(...),
service: ServerService = Depends(ServerService.get_new_instance),
) -> list[Server]:
return await service.get_servers_by_ids(ids=ids)
@router.get("/.well-known/mcp.json", response_model=MCPJson)
async def get_mcp_json(
service: ServerService = Depends(ServerService.get_new_instance),
) -> MCPJson:
return await service.get_mcp_json()
@router.delete("/{id}")
async def delete_server(
id: int,
current_user_id: str,
service: ServerService = Depends(ServerService.get_new_instance),
) -> None:
return await service.delete_server(id=id, current_user_id=current_user_id)
@router.get("/", response_model=Page[ServerWithTools])
async def get_all_servers(
query: str | None = None,
status: Literal["any"] | ServerStatus = Query(
title="Server status",
default=ServerStatus.active,
description="Filter servers by status, by default servers with status `active` will be returned",
),
creator_id: str | None = None,
params: Params = Depends(),
service: ServerService = Depends(ServerService.get_new_instance),
session: AsyncSession = Depends(get_session),
) -> Page[Server]:
status_filter: ServerStatus | None = None
if isinstance(status, ServerStatus):
status_filter = status
servers: Select = await service.get_all_servers(
search_query=query,
status=status_filter,
creator_id=creator_id,
)
return await paginate(session, servers, params)
@router.get("/{id}", response_model=ServerWithTools)
async def get_server_by_id(
id: int,
service: ServerService = Depends(ServerService.get_new_instance),
) -> Server:
return await service.get_server_by_id(id=id)
@router.put("/{id}", response_model=ServerRead)
async def update_server(
id: int,
data: ServerUpdate,
service: ServerService = Depends(ServerService.get_new_instance),
deployment_service: DeploymentService = Depends(DeploymentService.get_new_instance),
) -> Server:
await deployment_service.update_deployment(server_id=id, data=data)
server = await service.update_server(id=id, data=data)
return server
```
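Calling the paginated listing endpoint from a client; `page` and `size` come from fastapi-pagination's default `Params`, and the base URL assumes the compose setup runs locally:
```python
import asyncio

import httpx

async def main() -> None:
    async with httpx.AsyncClient(base_url="http://localhost") as client:
        resp = await client.get(
            "/servers/", params={"status": "any", "page": 1, "size": 10}
        )
        resp.raise_for_status()
        page = resp.json()
        for server in page["items"]:
            print(server["id"], server["name"])

asyncio.run(main())
```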
--------------------------------------------------------------------------------
/scripts/darp-router.py:
--------------------------------------------------------------------------------
```python
import argparse
import asyncio
from typing import Any
from typing import List
from typing import Literal
from typing import Union
from mcp import ClientSession
from mcp.client.sse import sse_client
from mcp.types import CallToolResult
from openai.types.chat import ChatCompletionMessage
from openai.types.chat.chat_completion_content_part_text_param import (
ChatCompletionContentPartTextParam,
)
from pydantic import BaseModel
default_mcp_url: str = "http://localhost:4689/sse"
tool_name: str = "routing"
async def main() -> None:
parser = argparse.ArgumentParser(description="DARP Router")
parser.add_argument("request", type=str, help="Routing request")
parser.add_argument(
"--format", choices=["text", "json"], default="text", help="Output format"
)
parser.add_argument("--verbose", action="store_true", help="Verbose output")
args = parser.parse_args()
response = await request_mcp(
server_url=default_mcp_url,
tool_name=tool_name,
arguments={"request": args.request},
)
display_response(response.content[0].text, format=args.format, verbose=args.verbose)
async def request_mcp(
server_url: str = default_mcp_url,
tool_name: str = tool_name,
arguments: dict[str, Any] | None = None,
) -> CallToolResult:
async with sse_client(server_url) as (read, write):
async with ClientSession(read, write) as session:
await session.initialize()
return await session.call_tool(
tool_name,
arguments=arguments or {},
)
def display_response(response: str, format: str = "text", verbose: bool = False) -> None:
if format == "json":
print(response)
else:
routing_response = RoutingResponse.model_validate_json(response)
for message in routing_response.conversation:
print(message.__str__(verbose))
class PrintableChatCompletionMessage(ChatCompletionMessage):
def __str__(self, verbose: bool = False) -> str:
contents = "\n".join(
filter(
None,
[
self._format_content("refusal", verbose),
self._format_content("function_call", verbose),
self._format_content("tool_calls", verbose),
self._format_content("content", verbose=True),
],
)
)
return f"{self.role}: {contents}"
def _format_content(self, key: str, verbose: bool = False) -> str | None:
value = getattr(self, f"_{key}_content")()
if value is None:
return None
if not verbose:
return f"[{key}]"
else:
return f"{value}"
def _refusal_content(self) -> str | None:
if self.refusal is not None:
return "\n" + indent(self.refusal)
return None
def _function_call_content(self) -> str | None:
if self.function_call is not None:
return f"{self.function_call.name}({self.function_call.arguments})"
return None
def _tool_calls_content(self) -> str | None:
if self.tool_calls is not None:
return "\n" + indent(
"\n".join(
f"{call.function.name}({call.function.arguments})"
for call in self.tool_calls
)
)
return None
def _content_content(self) -> str | None:
if self.content is not None:
return "\n" + indent(self.content)
return None
class PrintableChatCompletionToolMessageParam(BaseModel):
role: Literal["tool"]
content: Union[str, List[ChatCompletionContentPartTextParam]]
tool_call_id: str
def __str__(self, verbose: bool = False) -> str:
if not verbose:
return f"[{self.role}] ..."
if isinstance(self.content, str):
return f"{self.role}: {self.content}"
elif isinstance(self.content, list):
contents = "\n".join(
"{type}: {text}".format(**item) for item in self.content
)
return indent(f"{self.role}:\n{indent(contents)}")
else:
raise ValueError(f"Invalid content type: {type(self.content)}")
class RoutingResponse(BaseModel):
conversation: list[
PrintableChatCompletionMessage | PrintableChatCompletionToolMessageParam
]
def indent(text: str, level: int = 1, prefix: str = " ") -> str:
    return "\n".join(prefix * level + line for line in text.split("\n"))
if __name__ == "__main__":
asyncio.run(main())
```
--------------------------------------------------------------------------------
/worker/src/deployment_service.py:
--------------------------------------------------------------------------------
```python
import asyncio
import os
import tempfile
from logging import Logger
from .docker_service import DockerService
from .user_logger import UserLogger
from common.notifications.client import NotificationsClient
from registry.src.database import Server
from registry.src.errors import InvalidData
from registry.src.servers.repository import ServerRepository
from registry.src.servers.schemas import ServerUpdateData
from registry.src.servers.schemas import Tool
from registry.src.servers.service import ServerService
from registry.src.settings import settings
from registry.src.types import ServerStatus
from registry.src.types import ServerTransportProtocol
from worker.src.errors import ProcessingError
class WorkerDeploymentService:
def __init__(self, repo: ServerRepository, logger: Logger):
self.repo = repo
self.logger = logger
self.docker_env = dict(**os.environ)
self.docker_env["DOCKER_HOST"] = settings.deployment_server
self.notifications_client = NotificationsClient()
async def deploy_server(self, server_id: int) -> None:
server = await self.repo.get_server(id=server_id)
if not server:
raise ProcessingError
with tempfile.TemporaryDirectory(
prefix=f"server_{server.id}", dir="/tmp"
) as repo_folder:
user_logger = UserLogger(session=self.repo.session, server_id=server_id)
await user_logger.clear_logs()
service = DockerService(
docker_env=self.docker_env,
repo_folder=repo_folder,
server=server,
user_logger=user_logger,
)
server_url = f"http://{service.container_name}/sse"
try:
await self._deploy_repo(service=service)
tools = await self._get_tools(
server=server, service=service, server_url=server_url
)
except Exception:
await self._handle_deploy_failure(server)
raise
await user_logger.info("Deployment complete.")
await self._handle_deploy_success(
tools=tools,
server=server,
url=server_url,
transport_protocol=ServerTransportProtocol.SSE,
)
async def _deploy_repo(self, service: DockerService) -> None:
await service.clone_repo()
if not service.dockerfile_exists():
self.logger.info(f"No Dockerfile in repo: {service.server.repo_url}")
await self._ensure_command(service)
await service.user_logger.info(
message="No Dockerfile found in the project. Attempting to generate..."
)
await service.generate_dockerfile()
await service.build_image()
await service.push_image()
await service.run_container()
async def _handle_deploy_failure(self, server: Server) -> None:
await self.repo.update_server(
id=server.id,
data=ServerUpdateData(),
status=ServerStatus.processing_failed,
)
await self.notifications_client.notify_deploy_failed(
user_id=server.creator_id,
server_name=server.name,
)
async def _handle_deploy_success(
self,
tools: list[Tool],
server: Server,
url: str,
transport_protocol: ServerTransportProtocol,
) -> None:
await self.repo.update_server(
id=server.id,
tools=tools,
data=ServerUpdateData(url=url),
status=ServerStatus.active,
transport_protocol=transport_protocol,
)
await self.notifications_client.notify_deploy_successful(
user_id=server.creator_id,
server_name=server.name,
)
async def _get_tools(
self, server: Server, service: DockerService, server_url: str
) -> list[Tool]:
        for _ in range(settings.tool_retry_count):
tools = await self._fetch_tools(
server=server, server_url=server_url, service=service
)
if tools:
return tools
await service.user_logger.error("Error getting tools from server")
raise ProcessingError
async def _fetch_tools(
self, server: Server, server_url: str, service: DockerService
) -> list[Tool] | None:
try:
await service.user_logger.info(
f"Waiting {settings.tool_wait_interval} seconds for server to start up"
)
await asyncio.sleep(settings.tool_wait_interval)
tools = await ServerService.get_tools(
server_url=server_url, server_name=server.name
)
await service.user_logger.info(
"Successfully got a list of tools from server"
)
return tools
except InvalidData:
await service.user_logger.warning("Failed to connect to server")
return None
@staticmethod
async def _ensure_command(service: DockerService):
if not service.server.command:
await service.user_logger.error(
"Command must not be empty if project does not have a Dockerfile"
)
raise ProcessingError
```
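`_get_tools` is a fixed-attempt polling loop: sleep, probe, and give up only after `tool_retry_count` tries. The pattern, distilled (names here are illustrative, not part of the worker):
```python
import asyncio
from collections.abc import Awaitable, Callable
from typing import TypeVar

T = TypeVar("T")

async def retry_for_result(
    fetch: Callable[[], Awaitable[T | None]],
    attempts: int,
    wait_seconds: float,
) -> T:
    # Same shape as _get_tools/_fetch_tools: wait first (the container
    # needs time to boot), then probe.
    for _ in range(attempts):
        await asyncio.sleep(wait_seconds)
        result = await fetch()
        if result is not None:
            return result
    raise RuntimeError("no result after retries")

async def demo() -> None:
    answers = iter([None, None, "tools!"])
    print(await retry_for_result(lambda: asyncio.sleep(0, next(answers)), 5, 0.01))

asyncio.run(demo())
```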
--------------------------------------------------------------------------------
/registry/src/deployments/service.py:
--------------------------------------------------------------------------------
```python
import re
from typing import Self
from fastapi import Depends
from ..settings import settings
from ..types import HostType
from ..types import TaskType
from .logs_repository import LogsRepository
from .schemas import DeploymentCreate
from registry.src.database import Server
from registry.src.database import ServerLogs
from registry.src.errors import InvalidData
from registry.src.errors import InvalidServerNameError
from registry.src.errors import NotAllowedError
from registry.src.errors import ServerAlreadyExistsError
from registry.src.errors import ServersNotFoundError
from registry.src.producer import get_producer
from registry.src.servers.repository import ServerRepository
from registry.src.servers.schemas import ServerRead
from registry.src.servers.schemas import ServerUpdate
from registry.src.servers.service import ID_REGEX_PATTERN
from registry.src.types import ServerStatus
from worker.src.schemas import Task
class DeploymentService:
def __init__(self, repo: ServerRepository, logs_repo: LogsRepository) -> None:
self.repo = repo
self.logs_repo = logs_repo
async def get_server(self, server_id: int, creator_id: str | None = None) -> Server:
await self._assure_server_found(server_id)
server = await self.repo.get_server(server_id)
if creator_id:
await self._assure_server_creator(server=server, user_id=creator_id)
return server
async def get_logs(self, server_id: int, current_user_id: str) -> ServerLogs:
server = await self.get_server(server_id=server_id, creator_id=current_user_id)
if server.host_type != HostType.internal:
raise InvalidData("Logs are only available for internal servers")
logs = await self.logs_repo.get_server_logs(server_id=server_id)
assert logs
return logs
async def create_server(self, data: DeploymentCreate) -> Server:
self._assure_server_name_is_valid_id(data.data.name)
await self._assure_server_not_exists(
name=data.data.name, repo_url=str(data.data.repo_url)
)
server = await self.repo.create_server(
data=data.data, creator_id=data.current_user_id, tools=[]
)
await self.logs_repo.create_logs(server_id=server.id)
await self.repo.session.commit()
await self._send_task(server_id=server.id, task_type=TaskType.deploy)
return server
async def update_deployment(self, server_id: int, data: ServerUpdate) -> Server:
server = await self.get_server(
server_id=server_id, creator_id=data.current_user_id
)
if not data.deployment_data.model_dump(exclude_none=True):
return server
server = await self.repo.update_server(
id=server_id,
**data.deployment_data.model_dump(),
status=ServerStatus.in_processing,
)
await self.repo.session.commit()
await self._send_task(server_id=server.id, task_type=TaskType.deploy)
return server
async def stop_server(self, server_id: int, current_user_id: str) -> None:
server = await self.get_server(server_id=server_id, creator_id=current_user_id)
if server.status == ServerStatus.stopped:
return
if server.status != ServerStatus.active:
raise InvalidData("Server is not active", server_status=server.status)
await self._send_task(server_id=server_id, task_type=TaskType.stop)
async def start_server(self, server_id: int, current_user_id: str) -> None:
server = await self.get_server(server_id=server_id, creator_id=current_user_id)
if server.status == ServerStatus.active:
return
if server.status != ServerStatus.stopped:
raise InvalidData("Invalid server state", server_status=server.status)
await self._send_task(server_id=server_id, task_type=TaskType.start)
@staticmethod
async def _send_task(server_id: int, task_type: TaskType) -> None:
producer = await get_producer()
await producer.start()
await producer.send_and_wait(
topic=settings.worker_topic,
value=Task(task_type=task_type, server_id=server_id),
)
await producer.stop()
async def _assure_server_found(self, server_id: int) -> None:
if not await self.repo.find_servers(id=server_id):
raise ServersNotFoundError([server_id])
@staticmethod
async def _assure_server_creator(server: Server, user_id: str) -> None:
if server.creator_id != user_id:
raise NotAllowedError(
"Must be server creator to use this function",
creator_id=server.creator_id,
current_user_id=user_id,
)
async def _assure_server_not_exists(
self,
name: str | None = None,
repo_url: str | None = None,
ignore_id: int | None = None,
) -> None:
if servers := await self.repo.find_servers(
name=name, repo_url=repo_url, ignore_id=ignore_id
):
dict_servers = [
ServerRead.model_validate(server).model_dump() for server in servers
]
raise ServerAlreadyExistsError(dict_servers)
@staticmethod
def _assure_server_name_is_valid_id(name: str) -> None:
if not re.match(ID_REGEX_PATTERN, name):
raise InvalidServerNameError(name=name)
@classmethod
def get_new_instance(
cls,
repo: ServerRepository = Depends(ServerRepository.get_new_instance),
logs_repo: LogsRepository = Depends(LogsRepository.get_new_instance),
) -> Self:
return cls(repo=repo, logs_repo=logs_repo)
```
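`_send_task` delegates to the producer from `registry/src/producer.py` (not shown here); judging by the `start`/`send_and_wait`/`stop` calls it wraps aiokafka. A sketch of what that amounts to against aiokafka directly. The bootstrap address matches the compose file; the topic name and JSON serialization are assumptions, since the real values come from `settings.worker_topic` and the producer's serializer:
```python
import asyncio

from aiokafka import AIOKafkaProducer

from registry.src.types import TaskType
from worker.src.schemas import Task

async def send_deploy_task(server_id: int) -> None:
    producer = AIOKafkaProducer(bootstrap_servers="portal_kafka:19092")
    await producer.start()
    try:
        task = Task(task_type=TaskType.deploy, server_id=server_id)
        await producer.send_and_wait(
            "worker_tasks",  # placeholder for settings.worker_topic
            value=task.model_dump_json().encode(),
        )
    finally:
        await producer.stop()

asyncio.run(send_deploy_task(1))
```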
--------------------------------------------------------------------------------
/registry/src/servers/repository.py:
--------------------------------------------------------------------------------
```python
from fastapi import Depends
from sqlalchemy import delete
from sqlalchemy import func
from sqlalchemy import or_
from sqlalchemy import Select
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from .schemas import ServerCreateData
from .schemas import ServerUpdateData
from .schemas import Tool
from registry.src.database import get_session
from registry.src.database import Server
from registry.src.database import Tool as DBTool
from registry.src.deployments.schemas import DeploymentCreateData
from registry.src.errors import ServersNotFoundError
from registry.src.types import ServerStatus
from registry.src.types import ServerTransportProtocol
from worker.src.constants import BaseImage
class ServerRepository:
def __init__(self, session: AsyncSession) -> None:
self.session = session
async def find_servers(
self,
id: int | None = None,
name: str | None = None,
url: str | None = None,
ignore_id: int | None = None,
repo_url: str | None = None,
) -> list[Server]:
if id is None and name is None and url is None:
raise ValueError("At least one of 'id', 'name', or 'url' must be provided.")
if id == ignore_id and id is not None:
raise ValueError("id and ignore_id cannot be the same.")
query = select(Server).options(selectinload(Server.tools))
conditions = [
(Server.id == id),
(func.lower(Server.name) == func.lower(name)),
]
if url:
conditions.append(Server.url == url)
if repo_url:
conditions.append(Server.repo_url == repo_url)
query = query.filter(or_(*conditions))
query = query.filter(Server.id != ignore_id)
result = await self.session.execute(query)
return result.scalars().all()
async def get_server(self, id: int) -> Server:
query = select(Server).filter(Server.id == id)
query = query.options(selectinload(Server.tools))
result = await self.session.execute(query)
server = result.scalars().first()
if server is None:
raise ServersNotFoundError([id])
return server
async def create_server(
self,
data: ServerCreateData | DeploymentCreateData,
tools: list[Tool],
creator_id: str | None,
transport_protocol: ServerTransportProtocol | None = None,
) -> Server:
db_tools = []
if isinstance(data, ServerCreateData):
db_tools = self._convert_tools(tools, data.url)
server = Server(
**data.model_dump(exclude_none=True),
tools=db_tools,
creator_id=creator_id,
transport_protocol=transport_protocol,
)
self.session.add(server)
await self.session.flush()
await self.session.refresh(server)
return await self.get_server(server.id)
async def get_all_servers(
self,
search_query: str | None = None,
status: ServerStatus | None = None,
creator_id: str | None = None,
) -> Select:
query = select(Server).options(selectinload(Server.tools))
if status is not None:
query = query.where(Server.status == status)
if creator_id:
query = query.where(Server.creator_id == creator_id)
if search_query:
query_string = f"%{search_query}%"
query = query.where(
or_(
Server.name.ilike(query_string),
Server.description.ilike(query_string),
Server.title.ilike(query_string),
)
)
return query
async def get_servers_by_urls(self, urls: list[str]) -> list[Server]:
query = select(Server).where(Server.url.in_(urls))
result = await self.session.execute(query)
return list(result.scalars().all())
async def get_servers_by_ids(self, ids: list[int]) -> list[Server]:
query = (
select(Server).where(Server.id.in_(ids)).options(selectinload(Server.tools))
)
result = await self.session.execute(query)
return list(result.scalars().all())
async def delete_server(self, id: int) -> None:
query = delete(Server).where(Server.id == id)
await self.session.execute(query)
async def update_server(
self,
id: int,
data: ServerUpdateData | None = None,
tools: list[Tool] | None = None,
status: ServerStatus | None = None,
repo_url: str | None = None,
command: str | None = None,
base_image: BaseImage | None = None,
build_instructions: str | None = None,
transport_protocol: ServerTransportProtocol | None = None,
) -> Server:
server = await self.get_server(id)
update_values = dict(
status=status,
repo_url=repo_url,
command=command,
transport_protocol=transport_protocol,
base_image=base_image,
build_instructions=build_instructions,
)
update_values = {
key: value for key, value in update_values.items() if value is not None
}
if data:
update_values.update(data.model_dump(exclude_none=True))
for key, value in update_values.items():
setattr(server, key, value)
if tools is not None:
query = delete(DBTool).where(DBTool.server_url == server.url)
await self.session.execute(query)
await self.session.flush()
await self.session.refresh(server)
server.tools.extend(self._convert_tools(tools, server.url))
await self.session.flush()
await self.session.refresh(server)
return server
def _convert_tools(self, tools: list[Tool], url: str) -> list[DBTool]:
return [
DBTool(**tool.model_dump(exclude_none=True), server_url=url)
for tool in tools
]
@classmethod
def get_new_instance(
cls, session: AsyncSession = Depends(get_session)
) -> "ServerRepository":
return cls(session)
```
--------------------------------------------------------------------------------
/docker-compose.yaml:
--------------------------------------------------------------------------------
```yaml
services:
registry:
build:
context: .
dockerfile: registry/Dockerfile
environment:
UVICORN_PORT: ${UVICORN_PORT:-80}
POSTGRES_HOST: ${POSTGRES_HOST:-registry_postgres}
POSTGRES_USER: ${POSTGRES_USER:-change_me}
POSTGRES_PASSWORD: ${POSTGRES_PASSWORD:-change_me}
POSTGRES_DB: ${POSTGRES_DB:-registry}
POSTGRES_PORT: ${POSTGRES_PORT:-}
OPENAI_API_KEY: ${OPENAI_API_KEY}
OPENAI_BASE_URL: ${OPENAI_BASE_URL:-}
LLM_PROXY: ${LLM_PROXY:-}
LLM_MODEL: ${LLM_MODEL:-}
HEALTHCHECK_RUNNING_INTERVAL: ${HEALTHCHECK_RUNNING_INTERVAL:-}
DEEP_RESEARCH_SERVER_NAME: ${DEEP_RESEARCH_SERVER_NAME:-}
TOOL_RETRY_COUNT: ${TOOL_RETRY_COUNT:-}
TOOL_WAIT_INTERVAL: ${TOOL_WAIT_INTERVAL:-}
image: ${REGISTRY_IMAGE:-registry}:${CI_COMMIT_BRANCH:-local}
container_name: registry
pull_policy: always
restart: always
depends_on:
registry_postgres:
condition: service_healthy
restart: true
profiles:
- main
volumes:
- /var/lib/hipasus/registry/api/logs:/workspace/logs
healthcheck:
test: curl -f http://localhost:${UVICORN_PORT:-80}/healthcheck
interval: 15s
timeout: 5s
retries: 3
start_period: 15s
command: start.sh
registry_mcp_server:
build:
context: .
dockerfile: mcp_server/Dockerfile
environment:
- MCP_PORT
- LOG_LEVEL
- REGISTRY_URL
- OPENAI_API_KEY
- OPENAI_BASE_URL
- LLM_PROXY
- LLM_MODEL
image: ${MCP_REGISTRY_IMAGE:-registry_mcp}:${CI_COMMIT_BRANCH:-local}
container_name: registry_mcp_server
pull_policy: always
restart: always
depends_on:
registry:
condition: service_healthy
restart: true
healthcheck:
test: python3 -m mcp_server.scripts.mcp_health_check
interval: 15s
timeout: 5s
retries: 3
start_period: 15s
profiles:
- main
volumes:
- /var/lib/hipasus/registry/mcp_server/logs:/workspace/logs
command: start.sh
registry_deploy_worker:
container_name: registry_deploy_worker
image: ${WORKER_IMAGE_NAME:-registry_worker}:${CI_COMMIT_BRANCH:-local}
build:
context: .
dockerfile: worker/Dockerfile
pull_policy: always
environment:
LOG_LEVEL: ${LOG_LEVEL}
POSTGRES_HOST: ${POSTGRES_HOST:-registry_postgres}
POSTGRES_USER: ${POSTGRES_USER:-change_me}
POSTGRES_PASSWORD: ${POSTGRES_PASSWORD:-change_me}
POSTGRES_DB: ${POSTGRES_DB:-registry}
POSTGRES_PORT: ${POSTGRES_PORT:-5432}
OPENAI_API_KEY: ${OPENAI_API_KEY}
OPENAI_BASE_URL: ${OPENAI_BASE_URL:-}
LLM_PROXY: ${LLM_PROXY:-}
LLM_MODEL: ${LLM_MODEL:-}
DOCKERHUB_LOGIN: ${DOCKERHUB_LOGIN}
DOCKERHUB_PASSWORD: ${DOCKERHUB_PASSWORD}
DEPLOYMENT_NETWORK: ${DOCKER_NETWORK:-portal_network}
HEALTHCHECK_RUNNING_INTERVAL: ${HEALTHCHECK_RUNNING_INTERVAL:-}
DEEP_RESEARCH_SERVER_NAME: ${DEEP_RESEARCH_SERVER_NAME:-}
TOOL_RETRY_COUNT: ${TOOL_RETRY_COUNT:-}
TOOL_WAIT_INTERVAL: ${TOOL_WAIT_INTERVAL:-}
profiles:
- main
depends_on:
registry_postgres:
condition: service_healthy
restart: true
portal_kafka:
condition: service_healthy
restart: true
volumes:
- /var/lib/hipasus/registry/worker/logs:/workspace/logs
- /var/run/docker.sock:/var/run/docker.sock
restart: always
command: worker-start.sh
portal_zookeeper:
image: confluentinc/cp-zookeeper:7.3.2
container_name: portal_zookeeper
ports:
- "2181:2181"
profiles:
- main
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_SERVER_ID: 1
ZOOKEEPER_SERVERS: portal_zookeeper:2888:3888
healthcheck:
test: nc -z localhost 2181 || exit -1
interval: 30s
timeout: 20s
retries: 5
restart: always
portal_kafka:
image: confluentinc/cp-kafka:7.3.2
container_name: portal_kafka
profiles:
- main
environment:
KAFKA_ADVERTISED_LISTENERS: INTERNAL://portal_kafka:19092,DOCKER://127.0.0.1:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_ZOOKEEPER_CONNECT: "portal_zookeeper:2181"
KAFKA_BROKER_ID: 1
KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_JMX_PORT: 9999
KAFKA_JMX_HOSTNAME: ${DOCKER_HOST_IP:-127.0.0.1}
KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
healthcheck:
test: kafka-cluster cluster-id --bootstrap-server 127.0.0.1:29092 || exit 1
interval: 30s
timeout: 20s
retries: 5
depends_on:
portal_zookeeper:
condition: service_healthy
restart: true
restart: always
registry_postgres:
container_name: registry_postgres
image: postgres:17
environment:
POSTGRES_USER: ${POSTGRES_USER:-change_me}
POSTGRES_PASSWORD: ${POSTGRES_PASSWORD:-change_me}
POSTGRES_DB: ${POSTGRES_DB:-registry}
volumes:
- /var/lib/hipasus/registry/pg_data:/var/lib/postgresql/data
command: postgres -c 'max_connections=500'
profiles:
- main
healthcheck:
test: pg_isready -U ${POSTGRES_USER:-change_me} -d ${POSTGRES_DB:-registry}
interval: 7s
timeout: 5s
retries: 5
start_period: 5s
restart: always
registry_healthchecker:
container_name: registry_healthchecker
image: ${HEALTHCHECKER_IMAGE_NAME:-registry_healthchecker}:${CI_COMMIT_BRANCH:-local}
build:
context: .
dockerfile: healthchecker/Dockerfile
pull_policy: always
environment:
LOG_LEVEL: ${LOG_LEVEL}
POSTGRES_HOST: ${POSTGRES_HOST:-registry_postgres}
POSTGRES_USER: ${POSTGRES_USER:-change_me}
POSTGRES_PASSWORD: ${POSTGRES_PASSWORD:-change_me}
POSTGRES_DB: ${POSTGRES_DB:-registry}
POSTGRES_PORT: ${POSTGRES_PORT:-5432}
OPENAI_API_KEY: ${OPENAI_API_KEY}
OPENAI_BASE_URL: ${OPENAI_BASE_URL:-}
LLM_PROXY: ${LLM_PROXY:-}
LLM_MODEL: ${LLM_MODEL:-}
DOCKERHUB_LOGIN: ${DOCKERHUB_LOGIN}
DOCKERHUB_PASSWORD: ${DOCKERHUB_PASSWORD}
DEPLOYMENT_NETWORK: ${DOCKER_NETWORK:-portal_network}
HEALTHCHECK_RUNNING_INTERVAL: ${HEALTHCHECK_RUNNING_INTERVAL:-}
DEEP_RESEARCH_SERVER_NAME: ${DEEP_RESEARCH_SERVER_NAME:-}
TOOL_RETRY_COUNT: ${TOOL_RETRY_COUNT:-}
TOOL_WAIT_INTERVAL: ${TOOL_WAIT_INTERVAL:-}
profiles:
- main
depends_on:
registry:
condition: service_healthy
restart: true
restart: always
command: worker-start.sh
networks:
default:
name: ${DOCKER_NETWORK:-portal_network}
external: true
```
--------------------------------------------------------------------------------
/registry/src/servers/service.py:
--------------------------------------------------------------------------------
```python
import re
from contextlib import _AsyncGeneratorContextManager # noqa
from typing import Self
from fastapi import Depends
from mcp import ClientSession
from mcp.client.sse import sse_client
from mcp.client.streamable_http import streamablehttp_client
from pydantic import ValidationError
from sqlalchemy import Select
from ..settings import settings
from .repository import ServerRepository
from .schemas import MCP
from .schemas import MCPJson
from .schemas import ServerCreate
from .schemas import ServerRead
from .schemas import ServerUpdate
from .schemas import Tool
from .schemas import tool_name_regex
from registry.src.database import Server
from registry.src.errors import InvalidData
from registry.src.errors import InvalidServerNameError
from registry.src.errors import NotAllowedError
from registry.src.errors import RemoteServerError
from registry.src.errors import ServerAlreadyExistsError
from registry.src.errors import ServersNotFoundError
from registry.src.logger import logger
from registry.src.types import ServerStatus
from registry.src.types import ServerTransportProtocol
BASE_SERVER_NAME = "base_darp_server#REPLACEME"
# Group the alternation so both branches are anchored by ^ and $; otherwise
# any string merely starting with a valid identifier would pass validation.
ID_REGEX_PATTERN = re.compile(rf"^(?:[a-zA-Z_][a-zA-Z0-9_]*|{BASE_SERVER_NAME})$")
class ServerService:
def __init__(self, repo: ServerRepository) -> None:
self.repo = repo
async def create(self, data: ServerCreate) -> Server:
self._assure_server_name_is_valid_id(data.data.name)
await self._assure_server_not_exists(name=data.data.name, url=data.data.url)
server_transport_protocol = await self.detect_transport_protocol(
server_url=data.data.url
)
logger.info(
"Detected server's transport protocol %s", server_transport_protocol
)
tools = await self.get_tools(
server_url=data.data.url,
server_name=data.data.name,
transport=server_transport_protocol,
)
return await self.repo.create_server(
data.data,
tools,
creator_id=data.current_user_id,
transport_protocol=server_transport_protocol,
)
@classmethod
async def get_tools(
cls,
server_url: str,
server_name: str,
transport: ServerTransportProtocol | None = None,
) -> list[Tool]:
if transport is None:
transport = await cls.detect_transport_protocol(server_url)
try:
return await cls._fetch_tools(
server_url=server_url, server_name=server_name, transport=transport
)
except Exception as e:
if isinstance(e, InvalidData):
raise
logger.warning(
f"Error while getting tools from server {server_url}",
exc_info=e,
)
raise InvalidData(
"Server is unhealthy or URL does not lead to a valid MCP server",
url=server_url,
)
async def delete_server(self, id: int, current_user_id: str) -> None:
server = await self.get_server_by_id(id=id)
if server.creator_id != current_user_id:
raise NotAllowedError("Only server creator can delete it")
await self.repo.delete_server(id=id)
async def get_all_servers(
self,
status: ServerStatus | None = None,
search_query: str | None = None,
creator_id: str | None = None,
) -> Select:
return await self.repo.get_all_servers(
search_query=search_query, status=status, creator_id=creator_id
)
async def get_server_by_id(self, id: int) -> Server:
await self._assure_server_found(id)
return await self.repo.get_server(id=id)
async def get_servers_by_ids(self, ids: list[int]) -> list[Server]:
servers = await self.repo.get_servers_by_ids(ids=ids)
if len(ids) != len(servers):
retrieved_server_ids = {server.id for server in servers}
missing_server_ids = set(ids) - retrieved_server_ids
raise ServersNotFoundError(ids=list(missing_server_ids))
return servers
async def update_server(self, id: int, data: ServerUpdate) -> Server:
server = await self.get_server_by_id(id=id)
transport_protocol = None
if server.creator_id != data.current_user_id:
raise NotAllowedError("Only server creator can update it")
if data.data.name or data.data.url:
await self._assure_server_not_exists(
name=data.data.name, url=data.data.url, ignore_id=id
)
updated_server_url = data.data.url or server.url
if updated_server_url is not None:
transport_protocol = await self.detect_transport_protocol(
server_url=updated_server_url
)
tools = await self.get_tools(
server_url=updated_server_url,
server_name=data.data.name or server.name,
transport=transport_protocol,
)
else:
tools = []
return await self.repo.update_server(
id,
data.data,
tools,
transport_protocol=transport_protocol,
)
async def _assure_server_found(self, id: int) -> None:
if not await self.repo.find_servers(id=id):
raise ServersNotFoundError([id])
def _assure_server_name_is_valid_id(self, name: str) -> None:
if not re.match(ID_REGEX_PATTERN, name):
raise InvalidServerNameError(name=name)
async def _assure_server_not_exists(
self,
id: int | None = None,
name: str | None = None,
url: str | None = None,
ignore_id: int | None = None,
) -> None:
if servers := await self.repo.find_servers(id, name, url, ignore_id):
dict_servers = [
ServerRead.model_validate(server).model_dump() for server in servers
]
raise ServerAlreadyExistsError(dict_servers)
async def get_search_servers(self) -> list[Server]:
servers_query = await self.repo.get_all_servers()
        servers_query = servers_query.where(Server.url.is_not(None))
servers = (await self.repo.session.execute(servers_query)).scalars().all()
return list(servers)
async def get_servers_by_urls(self, server_urls: list[str]) -> list[Server]:
servers = await self.repo.get_servers_by_urls(urls=server_urls)
if len(server_urls) != len(servers):
retrieved_server_urls = {server.url for server in servers}
missing_server_urls = set(server_urls) - retrieved_server_urls
logger.warning(
f"One or more server urls are incorrect {missing_server_urls=}"
)
return servers
async def get_mcp_json(self) -> MCPJson:
        servers_query = await self.repo.get_all_servers()
        servers = (await self.repo.session.execute(servers_query)).scalars().all()
return MCPJson(servers=[MCP.model_validate(server) for server in servers])
async def get_deep_research(self) -> list[Server]:
servers = await self.repo.find_servers(name=settings.deep_research_server_name)
if len(servers) == 0:
raise RemoteServerError(
f"{settings.deep_research_server_name} MCP server does not exist in registry"
)
assert len(servers) == 1, "Invalid state. Multiple deepresearch servers found"
if servers[0].status != ServerStatus.active:
raise RemoteServerError(
f"{settings.deep_research_server_name} MCP server is down."
)
return servers
@classmethod
def get_new_instance(
cls, repo: ServerRepository = Depends(ServerRepository.get_new_instance)
) -> Self:
return cls(repo=repo)
@classmethod
async def _fetch_tools(
cls, server_url: str, server_name: str, transport: ServerTransportProtocol
) -> list[Tool]:
client_ctx = cls._get_client_context(server_url, transport)
async with client_ctx as (read, write, *_):
async with ClientSession(read, write) as session:
await session.initialize()
tools_response = await session.list_tools()
try:
tools = [
Tool(
**tool.model_dump(exclude_none=True),
alias=f"{tool.name}__{server_name}",
)
for tool in tools_response.tools
]
except ValidationError:
raise InvalidData(
message=f"Invalid tool names. {{tool_name}}__{{server_name}} must fit {tool_name_regex}"
)
return tools
@classmethod
def _get_client_context(
cls,
server_url: str,
transport_protocol: ServerTransportProtocol,
) -> _AsyncGeneratorContextManager:
if transport_protocol == ServerTransportProtocol.STREAMABLE_HTTP:
client_ctx = streamablehttp_client(server_url)
elif transport_protocol == ServerTransportProtocol.SSE:
client_ctx = sse_client(server_url)
else:
            raise RuntimeError(
                f"Unsupported transport protocol: {transport_protocol.name}"
            )
return client_ctx
@classmethod
async def detect_transport_protocol(
cls,
server_url: str,
) -> ServerTransportProtocol:
for protocol in (
ServerTransportProtocol.STREAMABLE_HTTP,
ServerTransportProtocol.SSE,
):
if await cls._is_transport_supported(server_url, protocol):
return protocol
raise InvalidData(
"Can't detect server's transport protocol, maybe server is unhealthy?",
url=server_url,
)
@classmethod
async def _is_transport_supported(
cls,
server_url: str,
protocol: ServerTransportProtocol,
) -> bool:
try:
client_ctx = cls._get_client_context(server_url, protocol)
async with client_ctx as (read, write, *_):
return await cls._can_initialize_session(read, write)
except Exception as e:
logger.error(
"Failed to create %s client",
protocol.name,
exc_info=e,
)
return False
@staticmethod
async def _can_initialize_session(read, write) -> bool:
try:
async with ClientSession(read, write) as session:
await session.initialize()
return True
except Exception:
return False
```
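End-to-end, the service first sniffs the transport and then lists tools over it. A minimal sketch against a locally running MCP server (the URL is a placeholder):
```python
import asyncio

from registry.src.servers.service import ServerService

async def main() -> None:
    url = "http://localhost:4689/sse"  # placeholder MCP endpoint
    protocol = await ServerService.detect_transport_protocol(url)
    tools = await ServerService.get_tools(
        server_url=url, server_name="local", transport=protocol
    )
    print(protocol, [tool.alias for tool in tools])

asyncio.run(main())
```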