#
tokens: 40284/50000 126/126 files
lines: off (toggle) GitHub
raw markdown copy
# Directory Structure

```
├── .gitignore
├── .gitlab-ci.yml
├── .pre-commit-config.yaml
├── alembic
│   ├── env.py
│   ├── README
│   ├── script.py.mako
│   └── versions
│       ├── 2025_03_24_0945-ffe7a88d5b15_initial_migration.py
│       ├── 2025_04_09_1117-e25328dbfe80_set_logo_to_none.py
│       ├── 2025_04_22_0854-1c6edbdb4642_add_status_to_servers.py
│       ├── 2025_04_25_1158-49e1ab420276_host_type.py
│       ├── 2025_04_30_1049-387bed3ce6ae_server_title.py
│       ├── 2025_05_12_1404-735603f5e4d5_creator_id.py
│       ├── 2025_05_13_0634-3b8b8a5b99ad_deployment_fields.py
│       ├── 2025_05_13_1326-4b751cb703e2_timestamps.py
│       ├── 2025_05_15_1414-7a3913a3bb4c_logs.py
│       ├── 2025_05_27_0757-7e8c31ddb2db_json_logs.py
│       ├── 2025_05_28_0632-f2298e6acd39_base_image.py
│       ├── 2025_05_30_1231-4b5c0d56e1ca_build_instructions.py
│       ├── 2025_05_31_1156-c98f44cea12f_add_transport_protocol_column_to_servers.py
│       ├── 2025_06_07_1322-aa9a14a698c5_jsonb_logs.py
│       └── 2025_06_17_1353-667874dd7dee_added_alias.py
├── alembic.ini
├── common
│   ├── __init__.py
│   ├── llm
│   │   ├── __init__.py
│   │   └── client.py
│   └── notifications
│       ├── __init__.py
│       ├── client.py
│       ├── enums.py
│       └── exceptions.py
├── deploy
│   ├── docker-version
│   ├── push-and-run
│   ├── ssh-agent
│   └── ssh-passphrase
├── docker-compose-debug.yaml
├── docker-compose.yaml
├── healthchecker
│   ├── __init__.py
│   ├── Dockerfile
│   ├── scripts
│   │   ├── worker-start-debug.sh
│   │   └── worker-start.sh
│   └── src
│       ├── __init__.py
│       ├── __main__.py
│       └── checker.py
├── LICENSE
├── mcp_server
│   ├── __init__.py
│   ├── Dockerfile
│   ├── requirements.txt
│   ├── scripts
│   │   ├── __init__.py
│   │   ├── mcp_health_check.py
│   │   ├── start-debug.sh
│   │   └── start.sh
│   ├── server.json
│   └── src
│       ├── __init__.py
│       ├── decorators.py
│       ├── llm
│       │   ├── __init__.py
│       │   ├── prompts.py
│       │   └── tool_manager.py
│       ├── logger.py
│       ├── main.py
│       ├── registry_client.py
│       ├── schemas.py
│       ├── settings.py
│       └── tools.py
├── pyproject.toml
├── README.md
├── registry
│   ├── __init__.py
│   ├── Dockerfile
│   ├── requirements.txt
│   ├── scripts
│   │   ├── requirements
│   │   ├── start-debug.sh
│   │   └── start.sh
│   └── src
│       ├── __init__.py
│       ├── base_schema.py
│       ├── database
│       │   ├── __init__.py
│       │   ├── models
│       │   │   ├── __init__.py
│       │   │   ├── base.py
│       │   │   ├── log.py
│       │   │   ├── mixins
│       │   │   │   ├── __init__.py
│       │   │   │   ├── has_created_at.py
│       │   │   │   ├── has_server_id.py
│       │   │   │   └── has_updated_at.py
│       │   │   ├── server.py
│       │   │   └── tool.py
│       │   └── session.py
│       ├── deployments
│       │   ├── __init__.py
│       │   ├── logs_repository.py
│       │   ├── router.py
│       │   ├── schemas.py
│       │   └── service.py
│       ├── errors.py
│       ├── logger.py
│       ├── main.py
│       ├── producer.py
│       ├── search
│       │   ├── __init__.py
│       │   ├── prompts.py
│       │   ├── schemas.py
│       │   └── service.py
│       ├── servers
│       │   ├── __init__.py
│       │   ├── repository.py
│       │   ├── router.py
│       │   ├── schemas.py
│       │   └── service.py
│       ├── settings.py
│       ├── types.py
│       └── validator
│           ├── __init__.py
│           ├── constants.py
│           ├── prompts.py
│           ├── schemas.py
│           ├── service.py
│           └── ssl.py
├── scripts
│   ├── darp-add.py
│   ├── darp-router.py
│   └── darp-search.py
└── worker
    ├── __init__.py
    ├── Dockerfile
    ├── requirements.txt
    ├── scripts
    │   ├── worker-start-debug.sh
    │   └── worker-start.sh
    └── src
        ├── __init__.py
        ├── constants.py
        ├── consumer.py
        ├── deployment_service.py
        ├── docker_service.py
        ├── dockerfile_generators
        │   ├── __init__.py
        │   ├── base.py
        │   ├── factory.py
        │   ├── python
        │   │   ├── __init__.py
        │   │   └── generic.py
        │   ├── typescript
        │   │   ├── __init__.py
        │   │   └── generic.py
        │   └── user_provided.py
        ├── errors.py
        ├── main.py
        ├── schemas.py
        └── user_logger.py
```

# Files

--------------------------------------------------------------------------------
/.pre-commit-config.yaml:
--------------------------------------------------------------------------------

```yaml
exclude: 'alembic/versions'
repos:
-   repo: https://github.com/psf/black
    rev: 24.4.2
    hooks:
    -   id: black

-   repo: https://github.com/asottile/reorder-python-imports
    rev: v3.13.0
    hooks:
    -   id: reorder-python-imports
        args: [--py312-plus]

-   repo: https://github.com/pre-commit/mirrors-mypy
    rev: v1.10.1
    hooks:
    -   id: mypy
        exclude: alembic
        additional_dependencies: [types-requests]

-   repo: https://github.com/PyCQA/flake8
    rev: 7.1.0
    hooks:
    -   id: flake8
        additional_dependencies: [flake8-pyproject]

```

--------------------------------------------------------------------------------
/.gitlab-ci.yml:
--------------------------------------------------------------------------------

```yaml
stages:
  - test
  - build
  - deploy

test-pre-commit:
  stage: test
  tags:
    - darp-group-shell-runner
  script:
    - source ~/miniconda3/bin/activate && pre-commit run --all-files

test-migrations:
  stage: test
  tags:
    - darp-group-shell-runner
  script:
    - duplicates=`grep -h down_revision alembic/versions/*.py | sort | uniq -d`
    - echo "$duplicates"
    - test -z "$duplicates"

clean-pre-commit:
  stage: test
  tags:
    - darp-group-shell-runner
  script:
    - source ~/miniconda3/bin/activate && pre-commit clean
  when: manual

build:
  stage: build
  when: manual
  tags:
    - darp-group-shell-runner
  script:
    - source deploy/docker-version
    - docker compose --profile main build

deploy-test:
  stage: deploy
  needs: ["build"]
  when: manual
  tags:
    - darp-group-shell-runner
  script:
    - DOCKER_NETWORK=highkey_network deploy/push-and-run "$TEST_DOCKER_HOST"

deploy-memelabs:
  stage: deploy
  needs: ["build"]
  when: manual
  tags:
    - darp-group-shell-runner
  script:
    - DOCKER_NETWORK=highkey_network deploy/push-and-run "$MEMELABS_DOCKER_HOST"
  rules:
    - if: $CI_COMMIT_BRANCH == "main"


deploy-prod:
  stage: deploy
  when: manual
  needs: ["build"]
  tags:
    - darp-group-shell-runner
  script:
    - DOCKER_NETWORK=highkey_network deploy/push-and-run "$PRODUCTION_DOCKER_HOST"
  rules:
    - if: $CI_COMMIT_BRANCH == "main"

deploy-toci-test:
  stage: deploy
  when: manual
  needs: ["build"]
  tags:
    - darp-group-shell-runner
  script:
    - deploy/push-and-run "$TOCI_TEST_DOCKER_HOST"

deploy-toci-prod:
  stage: deploy
  when: manual
  needs: ["build"]
  tags:
    - darp-group-shell-runner
  script:
    - deploy/push-and-run "$TOCI_PRODUCTION_DOCKER_HOST"
  rules:
    - if: $CI_COMMIT_BRANCH == "main"

```

--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------

```
# VSCode
.vscode/

# JetBrains
.idea

# WandB
wandb/

# Old
old/
temp/
profiler/

# Logs
logs/

# eval
eval_results/
evalplus_codegen/

# All datasets
dataset/
dataset_processed/
dataset_processed_*/
PDF/
*.xlsx
*.csv

# All evaluation results
eval_baselines/
eval_results/
eval_results_temp/

# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class

# C extensions
*.so

# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
*.ipynb
*.pid

# PyInstaller
#  Usually these files are written by a python script from a template
#  before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec

# Installer logs
pip-log.txt
pip-delete-this-directory.txt

# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/

# Translations
*.mo
*.pot

# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal

# Flask stuff:
instance/
.webassets-cache

# Scrapy stuff:
.scrapy

# Sphinx documentation
docs/_build/

# PyBuilder
.pybuilder/
target/

# Jupyter Notebook
.ipynb_checkpoints

# IPython
profile_default/
ipython_config.py

# pdm
.pdm.toml

# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/

# Celery stuff
celerybeat-schedule
celerybeat.pid

# SageMath parsed files
*.sage.py

# Environments
.env
.env.*
.venv
env/
venv/
ENV/
env.bak/
venv.bak/

# Spyder project settings
.spyderproject
.spyproject

# Rope project settings
.ropeproject

# mkdocs documentation
/site

# mypy
.mypy_cache/
.dmypy.json
dmypy.json

# Pyre type checker
.pyre/

# pytype static type analyzer
.pytype/

# Cython debug symbols
cython_debug/

# MacOs
.DS_Store

# Local history
.history

# PyCharm
.idea/
```

--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------

```markdown
# DARPEngine
The MCP search engine for DARP.

[![X][x-image]][x-url]
[![Code style: black][black-image]][black-url]
[![Imports: reorder-python-imports][imports-image]][imports-url]
[![Pydantic v2][pydantic-image]][pydantic-url]
[![pre-commit][pre-commit-image]][pre-commit-url]
[![License MIT][license-image]][license-url]

DARPEngine stores metadata for MCP servers hosted online and provides smart search capabilities.

## Features

* Simple CLI
* API access to search
* MCP tool to retrieve search results for connecting manually
* Routing MCP tool based on the server: answer any question using the tools found for the user's request

### Coming soon

* Support for `.well-known/mcp.json`
* Crawler
* Nice frontend
* Hosted version
* Validate different levels of SSL certificates and integrate this info smartly to make sensitive MCP servers difficult to spoof

## Installation

```
export OPENAI_API_KEY=sk-...
docker network create highkey_network
docker compose build
docker compose -f docker-compose.yaml -f docker-compose-debug.yaml up --build --wait
```

## Getting started

You can connect the DARPEngine to an MCP client (e.g. Claude Desktop or Cursor) using the MCP tools provided. Just select SSE mode and specify `http://localhost:4689/sse` as the endpoint.


### Direct CLI use

Another way is to use the CLI. Most of the scripts work with just the standard Python library, but the routing tool requires the `mcp` package. You can install the script requirements like this:

```
conda create -n darp 'python>=3.10'
conda activate darp
pip install -r mcp_server/requirements.txt
```

Once installation is complete, we can use the scripts.

To begin, we need to add some MCP servers to the engine, e.g.:

```
$ python scripts/darp-add.py --url http://memelabs.ai:3006/sse --name code_analysis --description "Analyze gitlab repo for quality, topics, packages use"
```

Then we can make the requests:

```
$ python scripts/darp-search.py "Analyze https://github.com/BenderV/autochat"
Found 1 servers:
code_analysis
```

You can get more useful results with the routing tool:

```
$ python scripts/darp-router.py "Analyze https://github.com/BenderV/autochat"
assistant: [tool_calls]
[tool] ...
[tool] ...
[tool] ...
assistant:
  ### Code Quality
  The code in the AutoChat repository demonstrates a good level of quality concerning **readability**, **maintainability**, and adherence to best practices:

  - **Readability**: Consistent naming conventions are evident, with adherence to Python's PEP 8 styling guidelines. Descriptive function and variable names enhance the understanding of the code's purpose.

  - **Maintainability**: The code is structured to allow easy updates and modifications, with a clear separation of concerns observed through the use of classes and methods handling distinct functionalities.

  - **Best Practices**: Extensive use of exception handling, type annotations, and docstrings reflect best practices in Python development. Some TODO comments suggest areas for improvement, indicating that further attention is needed.

  ### Code Structure
  The code is organized into multiple files and modules, each serving a distinct purpose:

  - **Modular Design**: Various classes (e.g., `Autochat`, `Image`, `Message`) indicate a well-structured object-oriented design that promotes separation of concerns, making the code easier to navigate.

  - **Logical Organization**: Files are logically separated based on functionality. For example, `chat.py` focuses on chat-related logic, while `model.py` handles message and image processing. The utility functions in `utils.py` enhance reusability.

  - **Testing**: The presence of a test file (`tests/test_utils.py`) shows commitment to testing, crucial for code reliability. The use of `unittest` indicates a structured approach to testing individual components.

  ### Main Functionality
  The code appears to be part of an **AutoChat package**, providing a framework for building conversational agents. Key functionalities include:

  - **Chat Management**: The `Autochat` class acts as the main interface for managing conversations, handling message history, context, and interaction limits.

  - **Message Handling**: Classes like `Message` and `MessagePart` enable structured message creation and processing, accommodating different message types, including text and images.

  - **Functionality Extensions**: Methods like `add_tool` and `add_function` allow dynamic addition of tools and functions, facilitating customization of the chat experience.

  - **Provider Integration**: Different API provider integrations (e.g., OpenAI, Anthropic) are encapsulated within respective classes, allowing flexibility in backend communication.

  - **Utilities**: Utility functions offer additional capabilities such as CSV formatting and function parsing that support main chat operations.

  Overall, the codebase is well-organized and showcases a thoughtful approach to developing a conversational AI framework. There is room for further refinement and enhancement, particularly in documentation and clarity of variable names.

  ### Library Usage
  The project makes use of **AI libraries**, indicated by its functionality related to conversational agents and integration with AI service providers. This supports its ability to manage interactions with AI models efficiently.

  ### Summary
  The AutoChat project is a chat system designed for communication with various AI models, primarily through the `Autochat` class, which manages conversations and supports complex message types, including text and images. The code is moderately complex due to its integration with external APIs and its ability to handle diverse interactions through extensible methods like `add_tool` and `add_function`. The quality of code is commendable, featuring a well-structured modular design that promotes readability and maintainability, although some areas require further documentation and refinement, such as clarifying variable names and enhancing comments. The organization into separate files for models, utilities, and tests aids development, but the utility functions could benefit from better categorization for improved clarity.
```

Of course, the usefulness of the result depends on the MCP servers you connect to the engine.


## Get help and support

Please feel free to connect with us using the [discussion section](https://github.com/hipasus/darp_engine/discussions).

## Contributing

Follow us on X: https://x.com/DARP_AI

## License

The DARPEngine codebase is under MIT license.

<br>

[x-image]: https://img.shields.io/twitter/follow/DARP_AI?style=social
[x-url]: https://x.com/DARP_AI
[black-image]: https://img.shields.io/badge/code%20style-black-000000.svg
[black-url]: https://github.com/psf/black
[imports-image]: https://img.shields.io/badge/%20imports-reorder_python_imports-%231674b1?style=flat&labelColor=ef8336
[imports-url]: https://github.com/asottile/reorder-python-imports/
[pydantic-image]: https://img.shields.io/endpoint?url=https://raw.githubusercontent.com/pydantic/pydantic/main/docs/badge/v2.json
[pydantic-url]: https://pydantic.dev
[pre-commit-image]: https://img.shields.io/badge/pre--commit-enabled-brightgreen?logo=pre-commit&logoColor=white
[pre-commit-url]: https://github.com/pre-commit/pre-commit
[license-image]: https://img.shields.io/github/license/DARPAI/darp_engine
[license-url]: https://opensource.org/licenses/MIT

```

--------------------------------------------------------------------------------
/common/__init__.py:
--------------------------------------------------------------------------------

```python

```

--------------------------------------------------------------------------------
/common/llm/__init__.py:
--------------------------------------------------------------------------------

```python

```

--------------------------------------------------------------------------------
/common/notifications/__init__.py:
--------------------------------------------------------------------------------

```python

```

--------------------------------------------------------------------------------
/healthchecker/__init__.py:
--------------------------------------------------------------------------------

```python

```

--------------------------------------------------------------------------------
/healthchecker/src/__init__.py:
--------------------------------------------------------------------------------

```python

```

--------------------------------------------------------------------------------
/mcp_server/__init__.py:
--------------------------------------------------------------------------------

```python

```

--------------------------------------------------------------------------------
/mcp_server/scripts/__init__.py:
--------------------------------------------------------------------------------

```python

```

--------------------------------------------------------------------------------
/mcp_server/src/__init__.py:
--------------------------------------------------------------------------------

```python

```

--------------------------------------------------------------------------------
/mcp_server/src/llm/__init__.py:
--------------------------------------------------------------------------------

```python

```

--------------------------------------------------------------------------------
/registry/__init__.py:
--------------------------------------------------------------------------------

```python

```

--------------------------------------------------------------------------------
/registry/src/__init__.py:
--------------------------------------------------------------------------------

```python

```

--------------------------------------------------------------------------------
/registry/src/database/models/__init__.py:
--------------------------------------------------------------------------------

```python

```

--------------------------------------------------------------------------------
/registry/src/deployments/__init__.py:
--------------------------------------------------------------------------------

```python

```

--------------------------------------------------------------------------------
/registry/src/search/__init__.py:
--------------------------------------------------------------------------------

```python

```

--------------------------------------------------------------------------------
/registry/src/servers/__init__.py:
--------------------------------------------------------------------------------

```python

```

--------------------------------------------------------------------------------
/registry/src/validator/__init__.py:
--------------------------------------------------------------------------------

```python

```

--------------------------------------------------------------------------------
/worker/__init__.py:
--------------------------------------------------------------------------------

```python

```

--------------------------------------------------------------------------------
/worker/src/__init__.py:
--------------------------------------------------------------------------------

```python

```

--------------------------------------------------------------------------------
/worker/scripts/worker-start.sh:
--------------------------------------------------------------------------------

```bash
#!/usr/bin/env sh
# Start script for the worker: runs the worker.src.main module.

# Exit immediately if any command fails.
set -e

python -m worker.src.main

```

--------------------------------------------------------------------------------
/healthchecker/scripts/worker-start.sh:
--------------------------------------------------------------------------------

```bash
#!/usr/bin/env sh
# Start script for the healthchecker: runs the healthchecker.src package
# (i.e. its __main__ module).

# Exit immediately if any command fails.
set -e

python -m healthchecker.src

```

--------------------------------------------------------------------------------
/mcp_server/scripts/start.sh:
--------------------------------------------------------------------------------

```bash
#!/usr/bin/env bash
# Start script for the MCP server: runs the mcp_server.src.main module.

# Exit immediately if any command fails.
set -e

python3 -m mcp_server.src.main

```

--------------------------------------------------------------------------------
/worker/src/dockerfile_generators/python/__init__.py:
--------------------------------------------------------------------------------

```python
from .generic import PythonGenerator


__all__ = ["PythonGenerator"]

```

--------------------------------------------------------------------------------
/worker/src/dockerfile_generators/typescript/__init__.py:
--------------------------------------------------------------------------------

```python
from .generic import TypeScriptGenerator


__all__ = ["TypeScriptGenerator"]

```

--------------------------------------------------------------------------------
/mcp_server/requirements.txt:
--------------------------------------------------------------------------------

```
mcp==1.6.0
httpx==0.28.1
pydantic==2.11.1
pydantic-settings==2.8.1
openai==1.65.4
```

--------------------------------------------------------------------------------
/registry/src/database/models/base.py:
--------------------------------------------------------------------------------

```python
from sqlalchemy.orm import DeclarativeBase


class Base(DeclarativeBase):
    """Declarative base class; adds nothing on top of ``DeclarativeBase``."""

```

--------------------------------------------------------------------------------
/common/notifications/exceptions.py:
--------------------------------------------------------------------------------

```python
class NotificationsServiceError(Exception):
    """Root exception type for notifications-service errors."""

```

--------------------------------------------------------------------------------
/worker/scripts/worker-start-debug.sh:
--------------------------------------------------------------------------------

```bash
#!/usr/bin/env sh
# Debug start script for the worker: refreshes dependencies, then runs
# the worker.src.main module.

# Exit immediately if any command fails.
set -e

# Re-install/upgrade from the requirements.txt in the current directory
# before starting.
pip install --upgrade -r requirements.txt
python -m worker.src.main
```

--------------------------------------------------------------------------------
/healthchecker/scripts/worker-start-debug.sh:
--------------------------------------------------------------------------------

```bash
#!/usr/bin/env sh
# Debug start script for the healthchecker: refreshes dependencies, then
# runs the healthchecker.src package.

# Exit immediately if any command fails.
set -e

# Re-install/upgrade from the requirements.txt in the current directory
# before starting.
pip install --upgrade -r requirements.txt
python -m healthchecker.src
```

--------------------------------------------------------------------------------
/mcp_server/scripts/start-debug.sh:
--------------------------------------------------------------------------------

```bash
#!/usr/bin/env bash
# Debug start script for the MCP server: refreshes dependencies, then
# runs the mcp_server.src.main module.

# Exit immediately if any command fails.
set -e

# Re-install/upgrade from the requirements.txt in the current directory
# before starting.
pip install --upgrade -r ./requirements.txt
python3 -m mcp_server.src.main

```

--------------------------------------------------------------------------------
/registry/scripts/start.sh:
--------------------------------------------------------------------------------

```bash
#!/usr/bin/env bash
# Start script for the registry: applies database migrations, then serves
# the FastAPI app with uvicorn on the port given by UVICORN_PORT.

# Exit immediately if any command fails.
set -e

# Bring the database schema up to the latest Alembic revision first.
alembic upgrade head
# The port is quoted to avoid word splitting, and the :? expansion aborts
# with a clear message when UVICORN_PORT is unset or empty.
uvicorn --proxy-headers --host 0.0.0.0 --port "${UVICORN_PORT:?UVICORN_PORT must be set}" registry.src.main:app
```

--------------------------------------------------------------------------------
/registry/src/base_schema.py:
--------------------------------------------------------------------------------

```python
from pydantic import BaseModel
from pydantic import ConfigDict


class BaseSchema(BaseModel):
    """Common pydantic base model with ``from_attributes`` switched on."""

    # from_attributes=True allows building the model from an object's
    # attributes rather than only from mappings.
    model_config = ConfigDict(from_attributes=True)

```

--------------------------------------------------------------------------------
/mcp_server/server.json:
--------------------------------------------------------------------------------

```json
{
  "name": "darp_search_engine",
  "description": "Search Engine for MCP servers",
  "endpoint": "http://registry_mcp_server/sse",
  "logo": null
}

```

--------------------------------------------------------------------------------
/registry/src/database/models/mixins/__init__.py:
--------------------------------------------------------------------------------

```python
from .has_created_at import HasCreatedAt
from .has_server_id import HasServerId
from .has_updated_at import HasUpdatedAt


__all__ = ["HasCreatedAt", "HasUpdatedAt", "HasServerId"]

```

--------------------------------------------------------------------------------
/registry/scripts/start-debug.sh:
--------------------------------------------------------------------------------

```bash
#!/usr/bin/env bash
# Debug start script for the registry: refreshes dependencies, applies
# database migrations, then serves the FastAPI app with auto-reload.

# Exit immediately if any command fails.
set -e

# Re-install/upgrade from the requirements.txt in the current directory.
pip install --upgrade -r requirements.txt
# Bring the database schema up to the latest Alembic revision first.
alembic upgrade head
# The port is quoted to avoid word splitting, and the :? expansion aborts
# with a clear message when UVICORN_PORT is unset or empty.
uvicorn --reload --proxy-headers --host 0.0.0.0 --port "${UVICORN_PORT:?UVICORN_PORT must be set}" registry.src.main:app

```

--------------------------------------------------------------------------------
/mcp_server/Dockerfile:
--------------------------------------------------------------------------------

```dockerfile
# Image for the MCP server. COPY paths are relative to the repository
# root, so the build context must be the repository root.
FROM python:3.11
WORKDIR /workspace
# Install dependencies before copying sources so this layer stays cached
# until requirements.txt changes.
COPY mcp_server/requirements.txt .
RUN pip install -r requirements.txt

COPY common ./common
COPY mcp_server ./mcp_server
# Put the start scripts on PATH. Uses the key=value form; the
# space-separated "ENV key value" form is legacy syntax.
ENV PATH="$PATH:/workspace/mcp_server/scripts"
```

--------------------------------------------------------------------------------
/worker/src/constants.py:
--------------------------------------------------------------------------------

```python
from enum import StrEnum


class BaseImage(StrEnum):
    python_3_10 = "python:3.10.17-slim"
    python_3_11 = "python:3.11.12-slim"
    python_3_12 = "python:3.12.10-slim"

    node_lts = "node:lts-alpine"

```

--------------------------------------------------------------------------------
/worker/src/errors.py:
--------------------------------------------------------------------------------

```python
class ProcessingError(Exception):
    def __init__(self, message: str | None = None):
        self.message = message


class InvalidData(ProcessingError):
    """``ProcessingError`` subclass that adds no behavior of its own."""


class DependenciesError(ProcessingError):
    """``ProcessingError`` subclass that adds no behavior of its own."""

```

--------------------------------------------------------------------------------
/common/notifications/enums.py:
--------------------------------------------------------------------------------

```python
from enum import StrEnum


class NotificationEvent(StrEnum):
    email_confirmation = "email-confirmation"
    deploy_failed = "server-deploy-failed"
    deploy_successful = "server-success-deploy"
    server_healthcheck_failed = "server-healthcheck-failed"

```

--------------------------------------------------------------------------------
/registry/Dockerfile:
--------------------------------------------------------------------------------

```dockerfile
# Image for the registry service. COPY paths are relative to the
# repository root, so the build context must be the repository root.
FROM python:3.11
WORKDIR /workspace
# Install dependencies before copying sources so this layer stays cached
# until requirements.txt changes.
COPY registry/requirements.txt .
RUN pip install -r requirements.txt

# Alembic migrations and config are shipped with the image; the start
# scripts run "alembic upgrade head" before launching the app.
COPY alembic ./alembic
COPY alembic.ini ./alembic.ini
COPY common ./common
COPY registry ./registry
COPY worker ./worker
# Put the start scripts on PATH. Uses the key=value form; the
# space-separated "ENV key value" form is legacy syntax.
ENV PATH="$PATH:/workspace/registry/scripts"

```

--------------------------------------------------------------------------------
/registry/src/producer.py:
--------------------------------------------------------------------------------

```python
from aiokafka import AIOKafkaProducer

from registry.src.settings import settings


async def get_producer(url=settings.broker_url) -> AIOKafkaProducer:
    """Construct an ``AIOKafkaProducer`` pointed at *url*.

    The producer is only created here; this function does not start it.
    The default for *url* is read from ``settings.broker_url`` once, when
    the function is defined.

    Args:
        url: Value passed as ``bootstrap_servers``.

    Returns:
        A producer whose values are serialized by calling
        ``model_dump_json()`` on them and encoding the result as UTF-8.
    """

    def _to_bytes(payload):
        # Messages are expected to expose model_dump_json().
        return payload.model_dump_json().encode("utf-8")

    return AIOKafkaProducer(bootstrap_servers=url, value_serializer=_to_bytes)

```

--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------

```toml
# pyproject.toml
[tool.pytest.ini_options]
minversion = "6.0"
addopts = "--doctest-modules"

[tool.flake8]
ignore = ['E203', 'D203', 'E501', 'W503']
exclude = [
        '.git',
        '__pycache__',
        'docs/source/conf.py',
        'old',
        'build',
        'dist',
        '.venv',
]
max-complexity = 25
```

--------------------------------------------------------------------------------
/registry/src/main.py:
--------------------------------------------------------------------------------

```python
from fastapi import FastAPI

from registry.src.deployments.router import router as deploy_router
from registry.src.servers.router import router as servers_router

# Application object; the registry start scripts point uvicorn at
# ``registry.src.main:app``.
app = FastAPI()

# Register the route groups from the servers and deployments packages.
app.include_router(servers_router)
app.include_router(deploy_router)


@app.get("/healthcheck")
async def healthcheck():
    """Liveness endpoint: always responds with the string ``"Alive"``."""
    status = "Alive"
    return status

```

--------------------------------------------------------------------------------
/worker/Dockerfile:
--------------------------------------------------------------------------------

```dockerfile
# Image for the worker, built on the docker image with Python added on
# top. COPY paths are relative to the repository root, so the build
# context must be the repository root.
FROM docker:latest

# Python runtime plus a compiler and headers for packages that build
# native extensions.
RUN apk add --no-cache python3 py3-pip gcc python3-dev libc-dev zlib-dev
WORKDIR /workspace
COPY worker/requirements.txt .
# Allow pip to install into the system Python environment.
RUN pip config set global.break-system-packages true
RUN pip install -r requirements.txt

COPY registry ./registry
COPY worker ./worker

# Put the start scripts on PATH. Uses the key=value form; the
# space-separated "ENV key value" form is legacy syntax.
ENV PATH="$PATH:/workspace/worker/scripts"

```

--------------------------------------------------------------------------------
/registry/src/database/__init__.py:
--------------------------------------------------------------------------------

```python
from .models.base import Base
from .models.log import ServerLogs
from .models.server import Server
from .models.tool import Tool
from .session import get_session
from .session import get_unmanaged_session

__all__ = [
    "Base",
    "get_session",
    "get_unmanaged_session",
    "Server",
    "Tool",
    "ServerLogs",
]

```

--------------------------------------------------------------------------------
/mcp_server/src/llm/prompts.py:
--------------------------------------------------------------------------------

```python
# Default prompt text: three paragraphs separated by blank lines, with no
# leading or trailing whitespace.
default_prompt: str = "\n\n".join(
    (
        "You are an expert in the topic of user's request.",
        "Your assistant has prepared for you the tools that might be helpful to answer the request.",
        "Your goal is to provide detailed, accurate information and analysis in response to user queries and if necessary to perform some actions using the tools.",
    )
)

```

--------------------------------------------------------------------------------
/registry/src/database/models/mixins/has_created_at.py:
--------------------------------------------------------------------------------

```python
from datetime import datetime

from sqlalchemy import Column
from sqlalchemy import DateTime
from sqlalchemy.orm import declarative_mixin
from sqlalchemy.orm import declared_attr


@declarative_mixin
class HasCreatedAt:
    """Mixin contributing a non-nullable ``created_at`` ``DateTime`` column."""

    @declared_attr
    @classmethod
    def created_at(cls):
        """Return the column definition for ``created_at``.

        The column default is the ``datetime.now`` callable (no timezone
        argument), not a value computed at import time.
        """
        column = Column(DateTime, nullable=False, default=datetime.now)
        return column

```

--------------------------------------------------------------------------------
/worker/src/dockerfile_generators/__init__.py:
--------------------------------------------------------------------------------

```python
from .base import DockerfileGenerator
from .factory import get_dockerfile_generator
from .python import PythonGenerator
from .typescript import TypeScriptGenerator
from .user_provided import UserDockerfileGenerator


__all__ = [
    "DockerfileGenerator",
    "PythonGenerator",
    "TypeScriptGenerator",
    "UserDockerfileGenerator",
    "get_dockerfile_generator",
]

```

--------------------------------------------------------------------------------
/healthchecker/Dockerfile:
--------------------------------------------------------------------------------

```dockerfile
# Healthchecker image: starts from the Docker image and adds a Python toolchain,
# the registry's Python requirements and the project packages the checker imports.
FROM docker:latest

# Python plus the compiler/headers needed to build wheels on Alpine.
RUN apk add --no-cache python3 py3-pip gcc python3-dev libc-dev zlib-dev
WORKDIR /workspace

# Copy the requirements first so the dependency layer is cached independently
# of source-code changes.
COPY registry/requirements.txt .

# Allow pip to install into the system interpreter (no virtualenv in this image).
RUN pip config set global.break-system-packages true
# --no-cache-dir keeps pip's download cache out of the image layer.
RUN pip install --no-cache-dir -r requirements.txt

COPY common ./common
COPY registry ./registry
COPY worker ./worker
COPY healthchecker ./healthchecker

# key=value form: the space-separated "ENV key value" syntax is legacy.
ENV PATH="$PATH:/workspace/healthchecker/scripts"

```

--------------------------------------------------------------------------------
/registry/src/database/models/mixins/has_updated_at.py:
--------------------------------------------------------------------------------

```python
from datetime import datetime

from sqlalchemy import Column
from sqlalchemy import DateTime
from sqlalchemy.orm import declarative_mixin
from sqlalchemy.orm import declared_attr


@declarative_mixin
class HasUpdatedAt:
    """Mixin contributing a non-nullable ``updated_at`` timestamp column."""

    @declared_attr
    @classmethod
    def updated_at(cls):
        """Return a ``DateTime`` column using ``datetime.now`` for insert and update."""
        # The same callable is registered both as the insert default and as
        # the on-update hook.
        column_options = dict(
            default=datetime.now,
            onupdate=datetime.now,
            nullable=False,
        )
        return Column(DateTime, **column_options)

```

--------------------------------------------------------------------------------
/mcp_server/scripts/mcp_health_check.py:
--------------------------------------------------------------------------------

```python
import asyncio

from mcp import ClientSession
from mcp.client.sse import sse_client

from mcp_server.src.settings import settings


async def healthcheck():
    """Connect to the local MCP server over SSE and initialize a client session.

    Returns nothing; any exception raised while connecting or initializing
    propagates to the caller.
    """
    endpoint = f"http://localhost:{settings.mcp_port}/sse"
    async with sse_client(endpoint) as streams:
        read, write = streams
        async with ClientSession(read, write) as session:
            await session.initialize()


if __name__ == "__main__":
    # Run the check once; an exception raised by it propagates out of
    # asyncio.run and terminates the script with an error.
    asyncio.run(healthcheck())

```

--------------------------------------------------------------------------------
/registry/src/database/models/mixins/has_server_id.py:
--------------------------------------------------------------------------------

```python
from sqlalchemy import Column
from sqlalchemy import ForeignKey
from sqlalchemy import Integer
from sqlalchemy.orm import declarative_mixin
from sqlalchemy.orm import declared_attr


@declarative_mixin
class HasServerId:
    """Mixin contributing a ``server_id`` column that is both PK and FK to ``server.id``."""

    @declared_attr
    def server_id(cls):
        """Return the ``server_id`` column referencing ``server.id`` with cascade delete."""
        server_reference = ForeignKey("server.id", ondelete="CASCADE")
        return Column(
            Integer,
            server_reference,
            primary_key=True,
            nullable=False,
        )

```

--------------------------------------------------------------------------------
/registry/src/database/models/log.py:
--------------------------------------------------------------------------------

```python
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import Mapped
from sqlalchemy.orm import mapped_column

from .base import Base
from .mixins import HasCreatedAt
from .mixins import HasServerId
from .mixins import HasUpdatedAt


class ServerLogs(HasCreatedAt, HasUpdatedAt, HasServerId, Base):
    """Per-server log record stored in ``server_logs``.

    The ``created_at``, ``updated_at`` and ``server_id`` columns are provided
    by the mixins this class inherits from.
    """

    __tablename__ = "server_logs"

    # JSONB array of log entries. A plain-string ``server_default`` is emitted
    # by SQLAlchemy as a quoted SQL string literal, so the value must be the
    # bare JSON text: "[]" renders as DEFAULT '[]'. The previous value
    # "'[]'::jsonb" would have been quoted a second time
    # (DEFAULT '''[]''::jsonb'), which is not valid JSON.
    deployment_logs: Mapped[list[dict]] = mapped_column(
        JSONB(), nullable=False, server_default="[]"
    )

```

--------------------------------------------------------------------------------
/registry/src/logger.py:
--------------------------------------------------------------------------------

```python
import logging

from .settings import settings

# Logger for the registry package; emits INFO and above to both the console
# and a log file under settings.log_dir.
logger = logging.getLogger("registry")
logger.setLevel(logging.INFO)

formatter = logging.Formatter("[%(asctime)s][%(name)s][%(levelname)s]: %(message)s")


def _attach(handler: logging.Handler) -> logging.Handler:
    """Apply the shared formatter to *handler*, register it on the logger, return it."""
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    return handler


stream_handler = _attach(logging.StreamHandler())

# The log directory must exist before the file handler opens its file.
settings.log_dir.mkdir(parents=True, exist_ok=True)
file_handler = _attach(logging.FileHandler(settings.log_dir / "registry.log"))

```

--------------------------------------------------------------------------------
/registry/src/types.py:
--------------------------------------------------------------------------------

```python
from enum import auto
from enum import StrEnum


class HostType(StrEnum):
    internal = auto()
    external = auto()


class ServerStatus(StrEnum):
    active = auto()
    inactive = auto()
    in_processing = auto()
    stopped = auto()
    processing_failed = auto()


class TaskType(StrEnum):
    deploy = auto()
    start = auto()
    stop = auto()


class ServerTransportProtocol(StrEnum):
    """Transport protocol identifiers for a server.

    Values are spelled out in upper case; ``auto()`` on a ``StrEnum`` would
    produce the lower-cased member name instead.
    """

    SSE = "SSE"
    STREAMABLE_HTTP = "STREAMABLE_HTTP"


class RoutingMode(StrEnum):
    """Routing mode names; each value equals its member name."""

    # Values are explicit strings: the member called ``auto`` shares its name
    # with the ``enum.auto`` helper imported by this module.
    auto = "auto"
    deepresearch = "deepresearch"

```

--------------------------------------------------------------------------------
/mcp_server/src/logger.py:
--------------------------------------------------------------------------------

```python
import logging

from mcp_server.src.settings import settings

# Logger for the MCP server; the level comes from settings.log_level and
# records go to both the console and a log file under settings.log_dir.
logger = logging.getLogger("registry_mcp")
logger.setLevel(settings.log_level)

formatter = logging.Formatter("[%(asctime)s][%(name)s][%(levelname)s]: %(message)s")


def _attach(handler: logging.Handler) -> logging.Handler:
    """Apply the shared formatter to *handler*, register it on the logger, return it."""
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    return handler


stream_handler = _attach(logging.StreamHandler())

# The log directory must exist before the file handler opens its file.
settings.log_dir.mkdir(parents=True, exist_ok=True)
file_handler = _attach(logging.FileHandler(settings.log_dir / "registry_mcp.log"))

```

--------------------------------------------------------------------------------
/alembic/versions/2025_05_12_1404-735603f5e4d5_creator_id.py:
--------------------------------------------------------------------------------

```python
"""creator_id

Revision ID: 735603f5e4d5
Revises: 1c6edbdb4642
Create Date: 2025-05-12 14:04:17.438611

"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = '735603f5e4d5'
down_revision = '1c6edbdb4642'
branch_labels = None
depends_on = None


def upgrade():
    """Add the nullable string column ``creator_id`` to the ``server`` table."""
    creator_id = sa.Column('creator_id', sa.String(), nullable=True)
    op.add_column('server', creator_id)


def downgrade():
    """Remove the ``creator_id`` column from the ``server`` table."""
    table, column = 'server', 'creator_id'
    op.drop_column(table, column)

```

--------------------------------------------------------------------------------
/alembic/versions/2025_05_28_0632-f2298e6acd39_base_image.py:
--------------------------------------------------------------------------------

```python
"""base_image

Revision ID: f2298e6acd39
Revises: aa9a14a698c5
Create Date: 2025-05-28 06:32:44.781544

"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = 'f2298e6acd39'
down_revision = 'aa9a14a698c5'
branch_labels = None
depends_on = None


def upgrade():
    """Add the nullable string column ``base_image`` to the ``server`` table."""
    base_image = sa.Column('base_image', sa.String(), nullable=True)
    op.add_column('server', base_image)


def downgrade():
    """Remove the ``base_image`` column from the ``server`` table."""
    table, column = 'server', 'base_image'
    op.drop_column(table, column)

```

--------------------------------------------------------------------------------
/alembic/versions/2025_05_30_1231-4b5c0d56e1ca_build_instructions.py:
--------------------------------------------------------------------------------

```python
"""build_instructions

Revision ID: 4b5c0d56e1ca
Revises: f2298e6acd39
Create Date: 2025-05-30 12:31:08.515098

"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = '4b5c0d56e1ca'
down_revision = 'f2298e6acd39'
branch_labels = None
depends_on = None


def upgrade():
    """Add the nullable string column ``build_instructions`` to the ``server`` table."""
    build_instructions = sa.Column('build_instructions', sa.String(), nullable=True)
    op.add_column('server', build_instructions)


def downgrade():
    """Remove the ``build_instructions`` column from the ``server`` table."""
    table, column = 'server', 'build_instructions'
    op.drop_column(table, column)

```

--------------------------------------------------------------------------------
/worker/src/main.py:
--------------------------------------------------------------------------------

```python
import asyncio

from kafka import KafkaAdminClient
from kafka.admin import NewTopic
from kafka.errors import TopicAlreadyExistsError

from registry.src.logger import logger
from registry.src.settings import settings
from worker.src.consumer import Consumer


if __name__ == "__main__":
    # Make sure the worker topic exists before the consumer starts.
    admin = KafkaAdminClient(bootstrap_servers=settings.broker_url)
    try:
        worker_topic = NewTopic(
            name=settings.worker_topic, num_partitions=1, replication_factor=1
        )
        admin.create_topics([worker_topic])
    except TopicAlreadyExistsError:
        # The topic is already there (e.g. from an earlier run); nothing to do.
        pass
    finally:
        # The admin client is only needed for topic creation; close it so its
        # broker connections are not kept open for the consumer's lifetime.
        admin.close()

    consumer = Consumer()
    logger.info("Starting consumer")
    asyncio.run(consumer.start())

```

--------------------------------------------------------------------------------
/mcp_server/src/registry_client.py:
--------------------------------------------------------------------------------

```python
from httpx import AsyncClient

from mcp_server.src.schemas import RegistryServer
from mcp_server.src.settings import settings


class RegistryClient:
    """Async HTTP client for the registry's ``/servers/search`` endpoint."""

    def __init__(self):
        """Create the underlying httpx client rooted at ``settings.registry_url``."""
        self.client = AsyncClient(base_url=settings.registry_url)
        # Timeout passed to every request issued by this client.
        self.timeout = 90

    async def search(self, request: str) -> list[RegistryServer]:
        """Query the registry for servers matching *request*.

        Args:
            request: Free-text query, sent as the ``query`` URL parameter.

        Returns:
            The response items validated into ``RegistryServer`` models.

        Raises:
            AssertionError: If the registry answers with a non-200 status.
        """
        response = await self.client.get(
            "/servers/search", params=dict(query=request), timeout=self.timeout
        )
        if response.status_code != 200:
            # Raised explicitly instead of via ``assert`` so the check is not
            # stripped under ``python -O``; the exception type is unchanged
            # for existing callers.
            raise AssertionError(f"{response.status_code=} {response.content=}")
        data = response.json()
        return [RegistryServer.model_validate(server) for server in data]

```

--------------------------------------------------------------------------------
/mcp_server/src/schemas.py:
--------------------------------------------------------------------------------

```python
from typing import Any

from openai.types.chat import ChatCompletionMessage
from openai.types.chat import ChatCompletionToolMessageParam
from pydantic import BaseModel
from pydantic import ConfigDict


class BaseSchema(BaseModel):
    # Common base for the schemas in this module; from_attributes=True lets
    # models be validated from arbitrary objects' attributes, not only dicts.
    model_config = ConfigDict(from_attributes=True)


class Tool(BaseSchema):
    # Description of a single tool: its name, free-text description and an
    # input schema given as a string-keyed mapping.
    name: str
    description: str
    input_schema: dict[str, Any]


class RegistryServer(BaseSchema):
    # A server entry together with the tools it offers.
    name: str
    description: str
    url: str
    # Required field (no default) that may explicitly be None.
    logo: str | None
    id: int
    tools: list[Tool]


class ToolInfo(BaseSchema):
    # Pairs a tool name with the server entry it is associated with.
    tool_name: str
    server: RegistryServer


class RoutingResponse(BaseSchema):
    # Ordered conversation made of OpenAI chat-completion messages and
    # tool-message params.
    conversation: list[ChatCompletionMessage | ChatCompletionToolMessageParam]

```

--------------------------------------------------------------------------------
/alembic/versions/2025_04_22_0854-1c6edbdb4642_add_status_to_servers.py:
--------------------------------------------------------------------------------

```python
"""Add status to servers.

Revision ID: 1c6edbdb4642
Revises: 49e1ab420276
Create Date: 2025-04-22 08:54:20.892397

"""

from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = "1c6edbdb4642"
down_revision = "49e1ab420276"
branch_labels = None
depends_on = None


def upgrade():
    """Add ``server.status``, backfill existing rows with 'active', then forbid NULLs."""
    # The column starts out nullable so existing rows can be backfilled first.
    status_column = sa.Column("status", sa.String(), nullable=True)
    op.add_column("server", status_column)
    op.execute("""UPDATE server SET status = 'active'""")
    op.alter_column("server", "status", nullable=False)


def downgrade():
    """Remove the ``status`` column from the ``server`` table."""
    table, column = "server", "status"
    op.drop_column(table, column)

```

--------------------------------------------------------------------------------
/alembic/versions/2025_04_30_1049-387bed3ce6ae_server_title.py:
--------------------------------------------------------------------------------

```python
"""server title

Revision ID: 387bed3ce6ae
Revises: 7a3913a3bb4c
Create Date: 2025-04-30 10:49:34.277571

"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = '387bed3ce6ae'
down_revision = '7a3913a3bb4c'
branch_labels = None
depends_on = None


def upgrade():
    """Add ``server.title``, copy each row's ``name`` into it, then forbid NULLs."""
    # The column starts out nullable so existing rows can be backfilled first.
    title_column = sa.Column('title', sa.String(), nullable=True)
    op.add_column('server', title_column)
    op.execute("""UPDATE server SET title = name""")
    op.alter_column("server", "title", nullable=False)


def downgrade():
    """Remove the ``title`` column from the ``server`` table."""
    table, column = 'server', 'title'
    op.drop_column(table, column)

```

--------------------------------------------------------------------------------
/registry/src/search/prompts.py:
--------------------------------------------------------------------------------

```python
# Prompt template for selecting MCP servers for a user query. It contains the
# placeholders {servers_list!r} and {request!r} (presumably filled with
# str.format by the caller - confirm at the call site). The literal is
# stripped of its leading/trailing whitespace at import time.
get_top_servers: str = (
    """
You are part of MCP search engine. You are given a list of MCP server descriptions and a user's query. Your task is to select a minimal list of MCP servers that have enough tools to satisfy the query. Results of this request will be returned to the user's system which will make a separate request to LLM with the selected tools.

Consider if it is possible to satisfy the request in one step or in several steps. Consider if some tools don't have enough data if they are enough to completely perform one step or if you need to select several alternatives for the step. It is important that you follow the output format precisely.

Input data for you:

```
mcp_servers = {servers_list!r}
user_request = {request!r}
```
""".strip()
)

```

--------------------------------------------------------------------------------
/alembic/versions/2025_06_17_1353-667874dd7dee_added_alias.py:
--------------------------------------------------------------------------------

```python
"""added llm_tool_name

Revision ID: 667874dd7dee
Revises: c98f44cea12f
Create Date: 2025-06-17 13:53:18.588902

"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = '667874dd7dee'
down_revision = 'c98f44cea12f'
branch_labels = None
depends_on = None


def upgrade():
    """Add ``tool.alias``, backfill it as ``<tool name>__<server name>``, then forbid NULLs."""
    # The column starts out nullable so existing rows can be backfilled first.
    alias_column = sa.Column('alias', sa.String(), nullable=True)
    op.add_column('tool', alias_column)
    # Each tool is matched to its server through the server URL.
    backfill_alias_sql = """
        UPDATE tool
        SET alias = tool.name || '__' || server.name
        FROM server
        WHERE server.url = tool.server_url;
    """
    op.execute(backfill_alias_sql)
    op.alter_column("tool", "alias", nullable=False)

def downgrade():
    """Remove the ``alias`` column from the ``tool`` table."""
    table, column = 'tool', 'alias'
    op.drop_column(table, column)

```

--------------------------------------------------------------------------------
/alembic/versions/2025_04_25_1158-49e1ab420276_host_type.py:
--------------------------------------------------------------------------------

```python
"""host_type

Revision ID: 49e1ab420276
Revises: e25328dbfe80
Create Date: 2025-04-25 11:58:17.768233

"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = '49e1ab420276'
down_revision = 'e25328dbfe80'
branch_labels = None
depends_on = None


def upgrade():
    """Add ``server.host_type``, backfill existing rows with 'internal', then forbid NULLs."""
    # The column starts out nullable so existing rows can be backfilled first.
    host_type_column = sa.Column('host_type', sa.String(), nullable=True)
    op.add_column('server', host_type_column)
    op.execute("UPDATE server SET host_type = 'internal'")
    op.alter_column("server", "host_type", nullable=False)


def downgrade():
    """Remove the ``host_type`` column from the ``server`` table."""
    table, column = 'server', 'host_type'
    op.drop_column(table, column)

```

--------------------------------------------------------------------------------
/mcp_server/src/main.py:
--------------------------------------------------------------------------------

```python
from mcp.server.fastmcp import FastMCP

from .schemas import RoutingResponse
from mcp_server.src.settings import settings
from mcp_server.src.tools import Tools


# Module-level singletons: the FastMCP application (named after
# settings.server_json.name, using port settings.mcp_port) and the Tools
# object that the tool handlers in this module delegate to.
mcp: FastMCP = FastMCP(settings.server_json.name, port=settings.mcp_port)
tools: Tools = Tools()


@mcp.tool()
async def search_urls(request: str) -> list[str]:
    """Return URLs of the best MCP servers for processing the given request."""
    # Pure delegation: the lookup is performed by the shared Tools instance.
    urls = await tools.search(request=request)
    return urls


@mcp.tool()
async def routing(request: str) -> str:
    """Respond to any user request using MCP tools selected specifically for it."""
    # Run the request through the shared Tools instance, then wrap the
    # resulting conversation in RoutingResponse and return it as JSON text.
    conversation = await tools.routing(request=request)
    routing_response = RoutingResponse(conversation=conversation)
    return routing_response.model_dump_json()


if __name__ == "__main__":
    # Serve the registered tools over the SSE transport.
    mcp.run(transport="sse")

```

--------------------------------------------------------------------------------
/alembic/versions/2025_05_31_1156-c98f44cea12f_add_transport_protocol_column_to_servers.py:
--------------------------------------------------------------------------------

```python
"""Add transport-protocol column to servers

Revision ID: c98f44cea12f
Revises: 4b5c0d56e1ca
Create Date: 2025-05-31 11:56:05.469429

"""

from alembic import op
import sqlalchemy as sa

# revision identifiers, used by Alembic.
revision = "c98f44cea12f"
down_revision = "4b5c0d56e1ca"
branch_labels = None
depends_on = None


def upgrade():
    """Add nullable ``server.transport_protocol`` and set it to 'SSE' for active servers."""
    protocol_column = sa.Column("transport_protocol", sa.String(), nullable=True)
    op.add_column("server", protocol_column)
    # Rows whose status is not 'active' keep a NULL transport protocol.
    op.execute("""UPDATE server SET transport_protocol = 'SSE' WHERE status = 'active'""")


def downgrade():
    """Remove the ``transport_protocol`` column from the ``server`` table."""
    table, column = "server", "transport_protocol"
    op.drop_column(table, column)

```

--------------------------------------------------------------------------------
/registry/requirements.txt:
--------------------------------------------------------------------------------

```
alembic==1.15.1
annotated-types==0.7.0
anyio==4.9.0
asyncpg==0.30.0
certifi==2025.1.31
cfgv==3.4.0
chardet==5.2.0
charset-normalizer==3.4.1
click==8.1.8
distlib==0.3.9
fastapi==0.115.11
fastapi-pagination==0.12.34
filelock==3.18.0
greenlet==3.1.1
h11==0.14.0
httpcore==1.0.7
httpx==0.28.1
httpx-sse==0.4.0
identify==2.6.9
idna==3.10
instructor==1.7.3
Mako==1.3.9
MarkupSafe==3.0.2
mcp==1.9.2
nodeenv==1.9.1
openai==1.65.4
platformdirs==4.3.7
pre_commit==4.2.0
psycopg2-binary==2.9.10
pydantic==2.10.6
pydantic-settings==2.8.1
pydantic_core==2.27.2
cryptography==44.0.2
python-dotenv==1.0.1
PyYAML==6.0.2
requests==2.32.3
setuptools==75.8.0
sniffio==1.3.1
SQLAlchemy==2.0.39
sse-starlette==2.2.1
starlette==0.46.1
typing_extensions==4.12.2
urllib3==2.3.0
uvicorn==0.34.0
virtualenv==20.29.3
wheel==0.45.1
jinja2==3.1.6
aiokafka==0.12.0

```

--------------------------------------------------------------------------------
/alembic/versions/2025_06_07_1322-aa9a14a698c5_jsonb_logs.py:
--------------------------------------------------------------------------------

```python
"""jsonb_logs

Revision ID: aa9a14a698c5
Revises: 7e8c31ddb2db
Create Date: 2025-06-07 13:22:23.229230

"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql

# revision identifiers, used by Alembic.
revision = 'aa9a14a698c5'
down_revision = '7e8c31ddb2db'
branch_labels = None
depends_on = None


def upgrade():
    """Recreate ``server_logs.deployment_logs`` as a JSONB column defaulting to ``[]``.

    The old column is dropped first, so its stored values are discarded.
    """
    jsonb_logs = sa.Column(
        'deployment_logs',
        postgresql.JSONB(),
        nullable=False,
        server_default=sa.text("'[]'::jsonb"),
    )
    op.drop_column('server_logs', 'deployment_logs')
    op.add_column('server_logs', jsonb_logs)


def downgrade():
    """Recreate ``server_logs.deployment_logs`` as a VARCHAR column defaulting to ``'[]'``.

    The JSONB column is dropped first, so its stored values are discarded.
    """
    varchar_logs = sa.Column(
        'deployment_logs',
        sa.VARCHAR(),
        nullable=False,
        server_default=sa.text("'[]'::character varying"),
    )
    op.drop_column('server_logs', 'deployment_logs')
    op.add_column('server_logs', varchar_logs)

```

--------------------------------------------------------------------------------
/worker/requirements.txt:
--------------------------------------------------------------------------------

```
alembic==1.15.1
annotated-types==0.7.0
anyio==4.9.0
asyncpg==0.30.0
certifi==2025.1.31
cfgv==3.4.0
chardet==5.2.0
charset-normalizer==3.4.1
click==8.1.8
distlib==0.3.9
fastapi==0.115.11
fastapi-pagination==0.12.34
filelock==3.18.0
greenlet==3.1.1
h11==0.14.0
httpcore==1.0.7
httpx==0.28.1
httpx-sse==0.4.0
identify==2.6.9
idna==3.10
instructor==1.7.3
Mako==1.3.9
MarkupSafe==3.0.2
mcp==1.9.2
nodeenv==1.9.1
openai==1.65.4
platformdirs==4.3.7
pre_commit==4.2.0
psycopg2-binary==2.9.10
pydantic==2.10.6
pydantic-settings==2.8.1
pydantic_core==2.27.2
cryptography==44.0.2
python-dotenv==1.0.1
PyYAML==6.0.2
requests==2.32.3
setuptools==75.8.0
sniffio==1.3.1
SQLAlchemy==2.0.39
sse-starlette==2.2.1
starlette==0.46.1
typing_extensions==4.12.2
urllib3==2.3.0
uvicorn==0.34.0
virtualenv==20.29.3
wheel==0.45.1
jinja2==3.1.6
aiokafka==0.12.0
kafka-python==2.0.3

```

--------------------------------------------------------------------------------
/alembic/versions/2025_04_09_1117-e25328dbfe80_set_logo_to_none.py:
--------------------------------------------------------------------------------

```python
"""set logo to none

Revision ID: e25328dbfe80
Revises: ffe7a88d5b15
Create Date: 2025-04-09 11:17:59.990186

"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = 'e25328dbfe80'
down_revision = 'ffe7a88d5b15'
branch_labels = None
depends_on = None


def upgrade():
    """Turn empty-string logos into NULL and make ``server.logo`` nullable."""
    op.execute("UPDATE server SET logo = NULL WHERE logo = ''")
    logo_change = dict(existing_type=sa.VARCHAR(), nullable=True)
    op.alter_column('server', 'logo', **logo_change)


def downgrade():
    """Turn NULL logos back into empty strings and make ``server.logo`` NOT NULL."""
    # NULLs must be replaced before the NOT NULL constraint is restored.
    op.execute("UPDATE server SET logo = '' WHERE logo IS NULL")
    logo_change = dict(existing_type=sa.VARCHAR(), nullable=False)
    op.alter_column('server', 'logo', **logo_change)

```

--------------------------------------------------------------------------------
/registry/src/validator/prompts.py:
--------------------------------------------------------------------------------

```python
from jinja2 import Template

# Jinja2 template holding the instruction text for the sensitive-data check.
# It contains no template variables, so rendering returns the text as written.
SENS_TOPIC_CHECK_SYSTEM_PROMPT: Template = Template(
    """
Analyze the description of an MCP server and its tools.

Your task is to determine whether the server likely works with sensitive data.

Consider the following rules:
1.	Sensitive data indicators:
•	Mentions of emails, phone numbers, SSNs, credit card numbers, access tokens, passwords, or other personal identifiable information (PII).
•	Tools or functionality related to authentication, authorization, payments, identity verification, document uploads, etc.
2.	Sensitive domains:
•	Banking
•	Finance
•	Government services
•	Or any tool that may require sending sensitive user data
"""
)

# Jinja2 template for the message carrying the data to analyze. Rendering
# needs `server_data` and `tools_data`, both of which must provide
# model_dump_json() (it is called inside the template).
SENS_TOPIC_CHECK_USER_PROMPT: Template = Template(
    """
    '''
    Server's data: {{ server_data.model_dump_json() }}
    --------------
    Server's tools: {{ tools_data.model_dump_json() }}
    '''
    """
)

```

--------------------------------------------------------------------------------
/registry/src/validator/schemas.py:
--------------------------------------------------------------------------------

```python
from enum import Enum

from pydantic import BaseModel
from pydantic import Field
from pydantic import RootModel

from registry.src.servers.schemas import ServerCreate
from registry.src.servers.schemas import Tool


class SSLAuthorityLevel(Enum):
    """Classification of a server's SSL certificate, numbered 0 through 4."""

    # NOTE(review): the numeric order looks like increasing trust - confirm
    # before relying on comparisons of the values.
    NO_CERTIFICATE = 0
    INVALID_CERTIFICATE = 1
    SELF_SIGNED_CERTIFICATE = 2
    CERTIFICATE_OK = 3
    EXTENDED_CERTIFICATE = 4


class ServerNeedsSensitiveDataResponse(BaseModel):
    # Structured answer to the sensitive-data question: a list of reasoning
    # strings followed by the boolean verdict. `reasoning` is required (`...`)
    # and its description text is part of the generated schema.
    reasoning: list[str] = Field(
        ...,
        description="For each argument of each tool describe if you can send sensitive data to this argument, and what kind of it",
    )
    server_needs_sensitive_data: bool


class ServerWithTools(ServerCreate):
    # ServerCreate extended with the list of the server's tools.
    tools: list[Tool]


class ValidationResult(BaseModel):
    # Outcome of validating a server. The defaults describe a server that
    # does not require sensitive data and has no certificate.
    server_requires_sensitive_data: bool = False
    authority_level: SSLAuthorityLevel = SSLAuthorityLevel.NO_CERTIFICATE


# Root model wrapping a bare list of Tool objects, so the list itself can be
# validated and serialized as a pydantic model.
Tools = RootModel[list[Tool]]

```

--------------------------------------------------------------------------------
/registry/src/validator/constants.py:
--------------------------------------------------------------------------------

```python
# Object identifiers in dotted-decimal form.
# NOTE(review): presumably certificate-policy OIDs that mark Extended
# Validation (EV) certificates - confirm against the validator that reads them.
EV_OIDS: list[str] = [
    "2.16.756.5.14.7.4.8",
    "2.16.156.112554.3",
    "2.16.840.1.114028.10.1.2",
    "1.3.6.1.4.1.4788.2.202.1",
    "2.16.840.1.114404.1.1.2.4.1",
    "2.16.840.1.114413.1.7.23.3",
    "2.16.840.1.114412.2.1",
    "1.3.6.1.4.1.14777.6.1.2",
    "2.16.578.1.26.1.3.3",
    "1.3.6.1.4.1.14777.6.1.1",
    "2.23.140.1.1",
    "1.3.6.1.4.1.7879.13.24.1",
    "1.3.6.1.4.1.40869.1.1.22.3",
    "1.3.159.1.17.1",
    "1.2.616.1.113527.2.5.1.1",
    "2.16.840.1.114414.1.7.23.3",
    "1.3.6.1.4.1.34697.2.1",
    "2.16.756.1.89.1.2.1.1",
    "1.3.6.1.4.1.6449.1.2.1.5.1",
    "1.3.6.1.4.1.34697.2.3",
    "1.3.6.1.4.1.13769.666.666.666.1.500.9.1",
    "1.3.6.1.4.1.34697.2.2",
    "1.2.156.112559.1.1.6.1",
    "1.3.6.1.4.1.8024.0.2.100.1.2",
    "1.3.6.1.4.1.34697.2.4",
    "1.2.392.200091.100.721.1",
]
# NOTE(review): presumably the certificate-verification error code reported
# for a self-signed certificate - confirm against the code that compares it.
SELF_SIGNED_CERTIFICATE_ERR_CODE: int = 18
# Port numbers for HTTPS and plain HTTP.
HTTPS_PORT: int = 443
HTTP_PORT: int = 80

```

--------------------------------------------------------------------------------
/alembic/versions/2025_05_27_0757-7e8c31ddb2db_json_logs.py:
--------------------------------------------------------------------------------

```python
"""json_logs

Revision ID: 7e8c31ddb2db
Revises: 387bed3ce6ae
Create Date: 2025-05-27 07:57:51.044585

"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql

# revision identifiers, used by Alembic.
revision = '7e8c31ddb2db'
down_revision = '387bed3ce6ae'
branch_labels = None
depends_on = None


def upgrade():
    """Reset every ``deployment_logs`` value to '[]' and change the column default to '[]'."""
    op.execute("UPDATE server_logs SET deployment_logs = '[]'")
    default_change = dict(server_default="[]", existing_server_default="")
    op.alter_column("server_logs", "deployment_logs", **default_change)


def downgrade():
    """Change the ``deployment_logs`` default back to '' and blank every stored value."""
    default_change = dict(server_default="", existing_server_default="[]")
    op.alter_column("server_logs", "deployment_logs", **default_change)
    op.execute("UPDATE server_logs SET deployment_logs = ''")

```

--------------------------------------------------------------------------------
/alembic/versions/2025_05_13_1326-4b751cb703e2_timestamps.py:
--------------------------------------------------------------------------------

```python
"""timestamps

Revision ID: 4b751cb703e2
Revises: 3b8b8a5b99ad
Create Date: 2025-05-13 13:26:42.259154

"""
from datetime import datetime

from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = '4b751cb703e2'
down_revision = '3b8b8a5b99ad'
branch_labels = None
depends_on = None


def upgrade():
    """Add ``created_at``/``updated_at`` to ``server``, backfill them, then forbid NULLs.

    Existing rows all receive the same timestamp: the migration host's
    ``datetime.now()`` at the time the migration runs.
    """
    timestamp_columns = ('created_at', 'updated_at')

    # The columns start out nullable so existing rows can be backfilled first.
    for column_name in timestamp_columns:
        op.add_column('server', sa.Column(column_name, sa.DateTime(), nullable=True))

    current_time = f"timestamp '{datetime.now().isoformat()}'"
    op.execute(f"""UPDATE server SET created_at = {current_time}, updated_at = {current_time}""")

    for column_name in timestamp_columns:
        op.alter_column("server", column_name, nullable=False)


def downgrade():
    """Drop ``updated_at`` and then ``created_at`` from the ``server`` table."""
    for column_name in ('updated_at', 'created_at'):
        op.drop_column('server', column_name)
    # ### end Alembic commands ###

```

--------------------------------------------------------------------------------
/registry/src/database/models/tool.py:
--------------------------------------------------------------------------------

```python
from sqlalchemy import ForeignKey
from sqlalchemy import String
from sqlalchemy import UniqueConstraint
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import Mapped
from sqlalchemy.orm import mapped_column
from sqlalchemy.orm import relationship

from ...database import models
from .base import Base


class Tool(Base):
    """ORM model for the ``tool`` table: one tool belonging to a server."""

    __tablename__ = "tool"
    # A tool name may appear only once per server URL.
    __table_args__ = (
        UniqueConstraint("name", "server_url", name="uq_tool_name_server_url"),
    )

    id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
    name: Mapped[str] = mapped_column(String(), nullable=False)
    description: Mapped[str] = mapped_column(String(), nullable=False)
    input_schema: Mapped[dict] = mapped_column(JSONB, nullable=False)
    alias: Mapped[str] = mapped_column(String(), nullable=False)
    # Tools reference their server by URL (not by id); the database removes
    # them when the referenced server row is deleted.
    server_url: Mapped[str] = mapped_column(
        ForeignKey("server.url", ondelete="CASCADE")
    )
    # Counterpart of the ``tools`` relationship on the Server model.
    server: Mapped["models.server.Server"] = relationship(back_populates="tools")

```

--------------------------------------------------------------------------------
/alembic/versions/2025_05_15_1414-7a3913a3bb4c_logs.py:
--------------------------------------------------------------------------------

```python
"""logs

Revision ID: 7a3913a3bb4c
Revises: 4b751cb703e2
Create Date: 2025-05-15 14:14:22.223452

"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = '7a3913a3bb4c'
down_revision = '4b751cb703e2'
branch_labels = None
depends_on = None


def upgrade():
    """Create ``server_logs``: one row per server, keyed by ``server_id``."""
    columns = [
        sa.Column('deployment_logs', sa.String(), server_default='', nullable=False),
        sa.Column('created_at', sa.DateTime(), nullable=False),
        sa.Column('updated_at', sa.DateTime(), nullable=False),
        sa.Column('server_id', sa.Integer(), nullable=False),
    ]
    # server_id is the primary key and also a cascading FK to server.id.
    constraints = [
        sa.ForeignKeyConstraint(['server_id'], ['server.id'], ondelete='CASCADE'),
        sa.PrimaryKeyConstraint('server_id'),
    ]
    op.create_table('server_logs', *columns, *constraints)
    # ### end Alembic commands ###


def downgrade():
    """Drop the ``server_logs`` table."""
    table = 'server_logs'
    op.drop_table(table)

```

--------------------------------------------------------------------------------
/worker/src/schemas.py:
--------------------------------------------------------------------------------

```python
from datetime import datetime
from enum import auto
from enum import StrEnum

from pydantic import Field
from pydantic import field_serializer

from registry.src.base_schema import BaseSchema
from registry.src.types import TaskType


class Task(BaseSchema):
    # A unit of work: which TaskType to perform and the id of the target server.
    task_type: TaskType
    server_id: int


class LogLevel(StrEnum):
    info = auto()
    warning = auto()
    error = auto()


class LoggingEvent(StrEnum):
    command_start = auto()
    command_result = auto()
    general_message = auto()


class CommandStart(BaseSchema):
    # Payload for a command-start event: the command text.
    command: str


class CommandResult(BaseSchema):
    # Payload for a command-result event: the command's output and whether it
    # succeeded.
    output: str
    success: bool


class GeneralLogMessage(BaseSchema):
    # Payload for a free-form log line with an explicit severity.
    log_level: LogLevel
    message: str


class LogMessage(BaseSchema):
    # One structured log entry: an event type, its payload and a timestamp.
    event_type: LoggingEvent
    # NOTE(review): the union carries no discriminator; presumably consumers
    # use `event_type` to know which payload shape to expect — confirm.
    data: CommandStart | CommandResult | GeneralLogMessage
    # Defaults to `datetime.now()` (local, timezone-naive) at creation time.
    timestamp: datetime = Field(default_factory=datetime.now)

    @field_serializer("timestamp")
    def serialize_datetime(self, value: datetime, _info) -> str:
        """Serialize the timestamp as an ISO 8601 string."""
        return value.isoformat()

```

--------------------------------------------------------------------------------
/worker/src/dockerfile_generators/user_provided.py:
--------------------------------------------------------------------------------

```python
from typing import override
from uuid import uuid4

from ..user_logger import UserLogger
from .base import DockerfileGenerator
from worker.src.constants import BaseImage


class UserDockerfileGenerator(DockerfileGenerator):
    """Dockerfile generator driven by user-supplied build instructions.

    The generated file contains the base image setup followed by a single
    ``RUN`` heredoc holding the instructions verbatim; no dependency
    installation step is emitted.
    """

    def __init__(
        self,
        base_image: BaseImage,
        repo_folder: str,
        user_logger: UserLogger,
        build_instructions: str,
    ) -> None:
        super().__init__(
            base_image=base_image, repo_folder=repo_folder, user_logger=user_logger
        )
        self.build_instructions = build_instructions

    @override
    async def generate_dockerfile(self) -> str:
        """Emit image setup and code setup, then write and return the Dockerfile."""
        for build_stage in (self._image_setup, self._code_setup):
            await build_stage()
        return self._save_dockerfile()

    @override
    async def _code_setup(self) -> None:
        """Copy the repository and run the instructions inside a heredoc."""
        # A random 32-hex-digit delimiter keeps the heredoc terminator from
        # colliding with anything inside the user's instructions.
        delimiter = uuid4().hex
        self._add_to_file(
            [
                "COPY ./ ./",
                f"RUN <<{delimiter}",
                self.build_instructions,
                delimiter,
            ]
        )

```

--------------------------------------------------------------------------------
/mcp_server/src/settings.py:
--------------------------------------------------------------------------------

```python
from pathlib import Path

from pydantic import BaseModel
from pydantic import Field
from pydantic import field_validator
from pydantic_settings import BaseSettings
from pydantic_settings import SettingsConfigDict


parent_folder = Path(__file__).parent.parent


class ServerJson(BaseModel):
    # Description of this server as parsed from JSON; the JSON key
    # "endpoint" populates the `url` field.
    name: str
    description: str
    url: str = Field(alias="endpoint")
    logo: str | None = None

    @field_validator("url")  # noqa
    @classmethod
    def url_ignore_anchor(cls, value: str) -> str:
        """Return *value* truncated at the first ``#`` (fragment removed)."""
        before_anchor, _separator, _fragment = value.partition("#")
        return before_anchor


class Settings(BaseSettings):
    # Configuration of the MCP server, resolved by pydantic-settings.
    # Unknown settings are ignored and empty environment values are skipped
    # (`extra="ignore"`, `env_ignore_empty=True`).
    model_config = SettingsConfigDict(extra="ignore", env_ignore_empty=True)

    registry_url: str = "http://registry:80"
    log_dir: Path = Path("logs")
    mcp_port: int = 80
    llm_proxy: str | None = None
    openai_base_url: str = "https://openrouter.ai/api/v1"
    # No default: a value must be supplied or instantiation fails validation.
    openai_api_key: str
    llm_model: str = "openai/gpt-4o-mini"
    # The default is computed once, while this class body executes: it reads
    # and validates `server.json` located in `parent_folder`.
    server_json: ServerJson = ServerJson.model_validate_json(
        Path(parent_folder / "server.json").read_text()
    )
    log_level: str = "INFO"


settings = Settings()

```

--------------------------------------------------------------------------------
/registry/src/database/session.py:
--------------------------------------------------------------------------------

```python
from collections.abc import AsyncGenerator
from typing import Any

from sqlalchemy.ext.asyncio import async_sessionmaker
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.ext.asyncio import create_async_engine

from registry.src.errors import FastApiError
from registry.src.logger import logger
from registry.src.settings import settings

# Module-wide async engine; pool sizing comes from settings, and
# `pool_pre_ping=True` makes the pool test a connection before handing it out.
async_engine = create_async_engine(
    settings.database_url_async,
    pool_size=settings.db_pool_size,
    max_overflow=settings.db_max_overflow,
    pool_pre_ping=True,
)

# Session factory bound to the engine; `expire_on_commit=False` keeps loaded
# attributes readable after a commit instead of expiring them.
session_maker = async_sessionmaker(bind=async_engine, expire_on_commit=False)


async def get_unmanaged_session() -> AsyncSession:
    """Return a new session; commit, rollback and close are up to the caller."""
    unmanaged_session = session_maker()
    return unmanaged_session


async def get_session() -> AsyncGenerator[AsyncSession, Any]:
    """Yield a session that is committed on success and rolled back on error.

    Any exception raised while the session is in use (or during the commit)
    triggers a rollback and is re-raised. Exceptions other than
    ``FastApiError`` are logged first. The session is always closed.
    """
    db_session = session_maker()

    try:
        yield db_session
        await db_session.commit()
    except Exception as exc:
        # FastApiError is the application's own error type and is not logged
        # here; everything else is.
        if not isinstance(exc, FastApiError):
            logger.error(f"Encountered an exception while processing request:\n{exc}")
        await db_session.rollback()
        raise
    finally:
        await db_session.close()

```

--------------------------------------------------------------------------------
/worker/src/dockerfile_generators/factory.py:
--------------------------------------------------------------------------------

```python
from ..constants import BaseImage
from ..user_logger import UserLogger
from .base import DockerfileGenerator
from .python import PythonGenerator
from .typescript import TypeScriptGenerator
from .user_provided import UserDockerfileGenerator


async def get_dockerfile_generator(
    base_image: BaseImage,
    repo_folder: str,
    build_instructions: str | None,
    user_logger: UserLogger,
) -> DockerfileGenerator:
    """Pick the Dockerfile generator for a repository.

    Non-empty ``build_instructions`` select ``UserDockerfileGenerator``.
    Otherwise the choice depends on the base image: ``BaseImage.node_lts``
    selects ``TypeScriptGenerator`` and any other image selects
    ``PythonGenerator``. The decision is reported through ``user_logger``.
    """
    if not build_instructions:
        await user_logger.warning(
            message="No build instructions found. Attempting to generate from scratch."
        )
        generator_cls = (
            TypeScriptGenerator if base_image == BaseImage.node_lts else PythonGenerator
        )
        return generator_cls(
            base_image=base_image, repo_folder=repo_folder, user_logger=user_logger
        )

    await user_logger.info(message="Build instructions found.")
    return UserDockerfileGenerator(
        base_image=base_image,
        repo_folder=repo_folder,
        build_instructions=build_instructions,
        user_logger=user_logger,
    )

```

--------------------------------------------------------------------------------
/alembic/versions/2025_05_13_0634-3b8b8a5b99ad_deployment_fields.py:
--------------------------------------------------------------------------------

```python
"""deployment fields

Revision ID: 3b8b8a5b99ad
Revises: 735603f5e4d5
Create Date: 2025-05-13 06:34:52.115171

"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = '3b8b8a5b99ad'
down_revision = '735603f5e4d5'
branch_labels = None
depends_on = None


def upgrade():
    """Add deployment-related columns to ``server``.

    Adds nullable ``repo_url`` and ``command`` columns, makes ``url``
    nullable, and creates the ``server_repo_url_unique`` unique constraint
    on ``repo_url``.
    """
    # ### commands auto generated by Alembic - please adjust! ###
    op.add_column('server', sa.Column('repo_url', sa.String(), nullable=True))
    op.add_column('server', sa.Column('command', sa.String(), nullable=True))
    op.alter_column('server', 'url',
               existing_type=sa.VARCHAR(),
               nullable=True)
    op.create_unique_constraint("server_repo_url_unique", 'server', ['repo_url'])
    # ### end Alembic commands ###


def downgrade():
    """Undo the upgrade in reverse order.

    Drops the unique constraint, makes ``url`` non-nullable again, then drops
    the ``command`` and ``repo_url`` columns.
    """
    # ### commands auto generated by Alembic - please adjust! ###
    op.drop_constraint("server_repo_url_unique", 'server', type_='unique')
    # NOTE(review): this will presumably fail if rows with a NULL `url` exist — confirm.
    op.alter_column('server', 'url',
               existing_type=sa.VARCHAR(),
               nullable=False)
    op.drop_column('server', 'command')
    op.drop_column('server', 'repo_url')
    # ### end Alembic commands ###

```

--------------------------------------------------------------------------------
/registry/src/search/schemas.py:
--------------------------------------------------------------------------------

```python
from pydantic import Field

from registry.src.base_schema import BaseSchema


class SearchServerURL(BaseSchema):
    # Minimal server reference: just its URL. SolutionStep uses it for the
    # servers it names.
    url: str


class SearchServer(SearchServerURL):
    # Server entry offered to the search prompt: the inherited URL plus id,
    # name and description.
    id: int
    name: str
    description: str


class SolutionStep(BaseSchema):
    # One step of a proposed solution together with the servers chosen for it.
    # All fields are required (`...`). The `description` strings are addressed
    # to a model ("copy this from `mcp_servers` entry"), so treat them as
    # prompt text rather than developer documentation.
    step_description: str
    best_server_description: str = Field(
        ...,
        description="Description of the best server for this step, copy this from `mcp_servers` entry",
    )
    best_server_name: str = Field(
        ...,
        description="Name of the best server for this step, copy this from `mcp_servers` entry",
    )
    best_server: SearchServerURL = Field(
        ..., description="The best server for this step"
    )
    # Free-form string; no scale or set of allowed values is enforced here.
    confidence: str = Field(
        ...,
        description="How confident you are that this server is enough for this step?",
    )
    additional_servers: list[SearchServerURL] = Field(
        ...,
        description="Alternative servers if you think the `best_server` may not be enough",
    )


class SearchResponse(BaseSchema):
    # Structured answer requested from the LLM: SearchService passes this
    # class as `response_model` when creating the completion.
    solution_steps: list[SolutionStep] = Field(
        ..., description="List of solution steps and servers for each step"
    )

```

--------------------------------------------------------------------------------
/scripts/darp-add.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
"""
Add new MCP servers to the registry
"""
import argparse
import json

import requests

default_registry_url: str = "http://localhost:80"


def main() -> None:
    """Register one MCP server with the registry from command-line arguments.

    Posts ``name``, ``description``, ``url`` and ``logo`` as JSON to
    ``<registry-url>/servers/``. With ``--verbose`` the JSON response is
    pretty-printed.

    Raises:
        requests.HTTPError: if the registry answers with an error status.
        requests.Timeout: if the registry does not answer within ``--timeout``.
    """
    parser = argparse.ArgumentParser(description="Add a new MCP server to the registry")
    parser.add_argument("--url", required=True, help="Endpoint URL for the server")
    parser.add_argument("--name", required=True, help="Unique server name")
    parser.add_argument("--description", required=True, help="Server description")
    parser.add_argument("--logo", default=None, help="Logo URL")
    parser.add_argument("--verbose", action="store_true", help="Verbose output")
    parser.add_argument(
        "--registry-url", default=default_registry_url, help="Registry API endpoint URL"
    )
    parser.add_argument(
        "--timeout",
        type=float,
        default=30.0,
        help="Seconds to wait for the registry before giving up",
    )

    args = parser.parse_args()

    server_data = {
        "name": args.name,
        "description": args.description,
        "url": args.url,
        "logo": args.logo,
    }

    # Tolerate a trailing slash on the base URL so the path never becomes
    # "//servers/".
    endpoint = f"{args.registry_url.rstrip('/')}/servers/"
    # requests waits indefinitely unless a timeout is given.
    response = requests.post(endpoint, json=server_data, timeout=args.timeout)
    response.raise_for_status()
    if args.verbose:
        print(json.dumps(response.json(), indent=2))


if __name__ == "__main__":
    main()

```

--------------------------------------------------------------------------------
/registry/src/deployments/schemas.py:
--------------------------------------------------------------------------------

```python
from datetime import datetime
from typing import Annotated

from pydantic import Field
from pydantic import field_serializer
from pydantic import HttpUrl

from registry.src.base_schema import BaseSchema
from registry.src.servers.schemas import server_name_regex_pattern
from registry.src.types import HostType
from registry.src.types import ServerStatus
from worker.src.constants import BaseImage
from worker.src.schemas import LogMessage


class ServerLogsSchema(BaseSchema):
    # Response shape of the `GET /deployments/{server_id}/logs` route:
    # a server's deployment log entries plus the record's timestamps.
    server_id: int
    deployment_logs: list[LogMessage]
    created_at: datetime
    updated_at: datetime


class DeploymentCreateData(BaseSchema):
    # Fields describing a server to deploy from a repository.
    # `name` is limited to 30 characters and must match
    # `server_name_regex_pattern`; `title` is limited to 30 characters.
    name: Annotated[str, Field(max_length=30, pattern=server_name_regex_pattern)]
    title: str = Field(max_length=30)
    description: str
    repo_url: HttpUrl
    command: str | None = None
    logo: str | None = None
    base_image: BaseImage | None = None
    build_instructions: str | None = None
    # Defaults: internally hosted and still being processed.
    host_type: HostType = HostType.internal
    status: ServerStatus = ServerStatus.in_processing

    @field_serializer("repo_url")
    def serialize_url(self, repo_url: HttpUrl) -> str:
        """Serialize ``repo_url`` as a plain string."""
        return str(repo_url)


class DeploymentCreate(BaseSchema):
    # Request body of `POST /deployments/`: the acting user's id plus the
    # deployment description.
    current_user_id: str
    data: DeploymentCreateData


class UserId(BaseSchema):
    # Request body carrying only the acting user's id; the deployment
    # start/stop routes take it.
    current_user_id: str

```

--------------------------------------------------------------------------------
/registry/src/errors.py:
--------------------------------------------------------------------------------

```python
from fastapi import HTTPException
from fastapi import status


class FastApiError(HTTPException):
    """Base class for API errors whose ``detail`` is a dict with a ``message``.

    Subclasses set ``status_code`` as a class attribute. Extra keyword
    arguments passed to the constructor are merged into ``detail`` next to
    ``message``.
    """

    # Fallback so the base class itself is raisable; subclasses override it.
    status_code: int = status.HTTP_500_INTERNAL_SERVER_ERROR

    def __init__(self, message: str, **kwargs) -> None:
        # Initialise through HTTPException so that `headers` and the other
        # attributes exception handlers read are always present.
        super().__init__(
            status_code=self.status_code,
            detail={"message": message, **kwargs},
        )


class InvalidServerNameError(FastApiError):
    """400 error for a server name that breaks the naming rules.

    The rejected name is included in the error detail under ``name``.
    """

    status_code: int = status.HTTP_400_BAD_REQUEST

    def __init__(self, name: str) -> None:
        super().__init__(
            message=(
                "Name must have only letters, digits, underscore. "
                "Name must not start with digits."
            ),
            name=name,
        )


class ServerAlreadyExistsError(FastApiError):
    """400 error listing the already-registered servers that conflict.

    The message names every conflicting server; the full dicts are included
    in the error detail under ``servers``.
    """

    status_code: int = status.HTTP_400_BAD_REQUEST

    def __init__(self, dict_servers: list[dict]) -> None:
        conflicting_names = [entry["name"] for entry in dict_servers]
        super().__init__(
            message=f"Server already exists: {', '.join(conflicting_names)}",
            servers=dict_servers,
        )


class ServersNotFoundError(FastApiError):
    """404 error for server ids that could not be found.

    The ids appear in the message and in the error detail under ``ids``.
    """

    status_code = status.HTTP_404_NOT_FOUND

    def __init__(self, ids: list[int]) -> None:
        not_found_message = f"Server(s) not found: {ids}"
        super().__init__(message=not_found_message, ids=ids)


class NotAllowedError(FastApiError):
    """Error reported with HTTP 403 Forbidden."""

    status_code = status.HTTP_403_FORBIDDEN


class InvalidData(FastApiError):
    """Error reported with HTTP 400 Bad Request."""

    status_code = status.HTTP_400_BAD_REQUEST


class RemoteServerError(FastApiError):
    """Error reported with HTTP 503 Service Unavailable."""

    status_code = status.HTTP_503_SERVICE_UNAVAILABLE

```

--------------------------------------------------------------------------------
/worker/src/dockerfile_generators/typescript/generic.py:
--------------------------------------------------------------------------------

```python
from pathlib import Path
from typing import override

from ...errors import DependenciesError
from ..base import DockerfileGenerator


class TypeScriptGenerator(DockerfileGenerator):
    """Dockerfile generator for repositories that contain a ``package.json``."""

    @override
    async def _dependencies_installation(self) -> None:
        """Copy the package manifests (and tsconfig, if any) and run npm install.

        Raises:
            DependenciesError: if no ``package.json`` exists under the repo.
        """
        repo_root = Path(self.repo_folder)
        manifest = self._find_file(repo_root, "package.json")
        if manifest is None:
            await self.user_logger.error(message="Error: no package.json found")
            raise DependenciesError("No package.json found")

        await self.user_logger.info(message="package.json found. Adding to Dockerfile")
        manifest_dir = self._relative_path(manifest.parent.absolute())
        dockerfile_lines = [f"COPY {manifest_dir}/package*.json ./"]

        tsconfig = self._find_file(repo_root, "tsconfig.json")
        if tsconfig is not None:
            await self.user_logger.info(
                message="tsconfig.json found. Adding to Dockerfile"
            )
            tsconfig_source = self._relative_path(tsconfig.absolute())
            dockerfile_lines.append(f"COPY {tsconfig_source} ./")

        dockerfile_lines.append("RUN npm install --ignore-scripts")
        self._add_to_file(dockerfile_lines)

    @override
    async def _code_setup(self) -> None:
        """Copy the whole repository and run the npm build script."""
        self._add_to_file(["COPY ./ ./", "RUN npm run build"])

```

--------------------------------------------------------------------------------
/docker-compose-debug.yaml:
--------------------------------------------------------------------------------

```yaml
# Debug settings for the stack: local source trees are bind-mounted into
# /workspace and services are started through their *-debug scripts.
# NOTE(review): several services define no image/build here, so this file is
# presumably layered on top of docker-compose.yaml — confirm.
services:

  registry:
    image: registry
    pull_policy: never
    restart: unless-stopped
    ports:
      - "${REGISTRY_PORT:-80}:80"
    profiles:
      - ''
    # Local sources mounted over the image contents.
    volumes:
      - ./registry:/workspace/registry
      - ./alembic:/workspace/alembic
      - ./common:/workspace/common
      - ./worker:/workspace/worker
    command: start-debug.sh

  registry_mcp_server:
    image: registry_mcp_server
    pull_policy: never
    restart: unless-stopped
    ports:
      - "${REGISTRY_MCP_PORT:-4689}:80"
    profiles:
      - ''
    volumes:
      - ./mcp_server:/workspace/mcp_server
      - ./common:/workspace/common
    command: start-debug.sh

  registry_deploy_worker:
    pull_policy: never
    profiles:
      - ''
    volumes:
      - ./worker:/workspace/worker
      - ./registry:/workspace/registry
    restart: unless-stopped
    command: worker-start-debug.sh

  portal_zookeeper:
    profiles:
      - ''
    restart: unless-stopped

  portal_kafka:
    profiles:
      - ''
    restart: unless-stopped

  registry_postgres:
    # Database port published on the host (default 5432).
    ports:
      - "${REGISTRY_POSTGRES_PORT:-5432}:5432"
    profiles:
      - ''
    restart: unless-stopped

  registry_healthchecker:
    pull_policy: never
    profiles:
      - ''
    volumes:
      - ./registry:/workspace/registry
      - ./healthchecker:/workspace/healthchecker
    restart: unless-stopped
    command: worker-start-debug.sh
```

--------------------------------------------------------------------------------
/registry/src/settings.py:
--------------------------------------------------------------------------------

```python
from pathlib import Path

from pydantic_settings import BaseSettings
from pydantic_settings import SettingsConfigDict


class Settings(BaseSettings):
    # Registry configuration resolved by pydantic-settings. Unknown settings
    # are ignored and empty environment values are skipped.
    model_config = SettingsConfigDict(extra="ignore", env_ignore_empty=True)

    # PostgreSQL connection parameters; only the port has a default.
    postgres_user: str
    postgres_db: str
    postgres_password: str
    postgres_host: str
    postgres_port: int = 5432

    # Connection pool sizing handed to the SQLAlchemy engine.
    db_pool_size: int = 50
    db_max_overflow: int = 25
    log_dir: Path = Path("logs")
    llm_proxy: str | None = None
    openai_api_key: str
    llm_model: str = "openai/gpt-4o-mini"
    openai_base_url: str = "https://openrouter.ai/api/v1"

    worker_topic: str = "deployment"
    deployment_server: str = ""
    deployment_network: str = ""
    broker_url: str = "portal_kafka:19092"
    dockerhub_login: str = ""
    dockerhub_password: str = ""
    registry_url: str = "http://registry/"
    # Seconds between health-check runs.
    healthcheck_running_interval: int = 3600
    tool_retry_count: int = 60
    tool_wait_interval: int = 5

    deep_research_server_name: str = "deepresearch_darpserver"

    @property
    def database_url(self) -> str:
        """Return ``user:password@host:port/db`` without a driver scheme."""
        return (
            f"{self.postgres_user}:{self.postgres_password}"
            f"@{self.postgres_host}:{self.postgres_port}"
            f"/{self.postgres_db}"
        )

    @property
    def database_url_sync(self) -> str:
        """Return the connection URL for the ``postgresql+psycopg2`` driver."""
        driver = "postgresql+psycopg2"
        return f"{driver}://{self.database_url}"

    @property
    def database_url_async(self) -> str:
        """Return the connection URL for the ``postgresql+asyncpg`` driver."""
        driver = "postgresql+asyncpg"
        return f"{driver}://{self.database_url}"


settings = Settings()

```

--------------------------------------------------------------------------------
/mcp_server/src/decorators.py:
--------------------------------------------------------------------------------

```python
import functools
import inspect
import sys
import traceback
from collections.abc import Awaitable
from collections.abc import Callable
from collections.abc import Coroutine
from typing import Any
from typing import overload
from typing import ParamSpec
from typing import TypeVar

from mcp_server.src.logger import logger

X = TypeVar("X")
P = ParamSpec("P")


@overload
def log_errors(  # noqa
    func: Callable[P, Coroutine[Any, Any, X]]
) -> Callable[P, Coroutine[Any, Any, X]]: ...


@overload
def log_errors(func: Callable[P, X]) -> Callable[P, X]: ...  # noqa


def log_errors(func: Callable[P, Any]) -> Callable[P, Any]:

    def log_error(args: tuple, kwargs: dict) -> None:
        info = sys.exc_info()[2]
        assert (
            info is not None and info.tb_next is not None
        ), f"No traceback available {sys.exc_info()[2]=}"
        locals_vars = info.tb_frame.f_locals
        logger.error(f"{traceback.format_exc()} \n{locals_vars=} \n{args=} \n{kwargs=}")

    @functools.wraps(func)
    def sync_wrapped(*args: P.args, **kwargs: P.kwargs) -> Any:
        try:
            return func(*args, **kwargs)
        except Exception:
            log_error(args, kwargs)
            raise

    @functools.wraps(func)
    async def async_wrapped(*args: P.args, **kwargs: P.kwargs) -> Awaitable[Any]:
        try:
            return await func(*args, **kwargs)
        except Exception:
            log_error(args, kwargs)
            raise

    if inspect.iscoroutinefunction(func):
        return async_wrapped

    return sync_wrapped

```

--------------------------------------------------------------------------------
/alembic/versions/2025_03_24_0945-ffe7a88d5b15_initial_migration.py:
--------------------------------------------------------------------------------

```python
"""Initial migration

Revision ID: ffe7a88d5b15
Revises: 
Create Date: 2025-03-24 09:45:42.840081

"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql

# revision identifiers, used by Alembic.
revision = 'ffe7a88d5b15'
down_revision = None
branch_labels = None
depends_on = None


def upgrade():
    """Create the initial ``server`` and ``tool`` tables.

    ``server`` has unique ``name`` and ``url`` columns. ``tool`` references
    ``server.url`` through ``server_url`` with ``ON DELETE CASCADE`` and is
    unique per ``(name, server_url)``.
    """
    # ### commands auto generated by Alembic - please adjust! ###
    op.create_table('server',
    sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
    sa.Column('url', sa.String(), nullable=False),
    sa.Column('name', sa.String(), nullable=False),
    sa.Column('description', sa.String(), nullable=False),
    sa.Column('logo', sa.String(), nullable=False),
    sa.PrimaryKeyConstraint('id'),
    sa.UniqueConstraint('name'),
    sa.UniqueConstraint('url')
    )
    op.create_table('tool',
    sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
    sa.Column('name', sa.String(), nullable=False),
    sa.Column('description', sa.String(), nullable=False),
    sa.Column('input_schema', postgresql.JSONB(astext_type=sa.Text()), nullable=False),
    sa.Column('server_url', sa.String(), nullable=False),
    sa.ForeignKeyConstraint(['server_url'], ['server.url'], ondelete='CASCADE'),
    sa.PrimaryKeyConstraint('id'),
    sa.UniqueConstraint('name', 'server_url', name='uq_tool_name_server_url')
    )
    # ### end Alembic commands ###


def downgrade():
    """Drop ``tool`` first (it references ``server``), then ``server``."""
    # ### commands auto generated by Alembic - please adjust! ###
    op.drop_table('tool')
    op.drop_table('server')
    # ### end Alembic commands ###

```

--------------------------------------------------------------------------------
/healthchecker/src/__main__.py:
--------------------------------------------------------------------------------

```python
import asyncio
import logging

from sqlalchemy.ext.asyncio import AsyncSession

from common.notifications.client import NotificationsClient
from healthchecker.src.checker import HealthChecker
from registry.src.database.session import get_unmanaged_session
from registry.src.servers.repository import ServerRepository
from registry.src.settings import settings

logger: logging.Logger = logging.getLogger("Healthchecker")


def setup_logging() -> None:
    """Configure root logging at INFO and announce the check interval."""
    logging.basicConfig(level=logging.INFO)
    interval_seconds = settings.healthcheck_running_interval
    logger.info("Starting healthchecker")
    logger.info("Healthchecker will run every %d seconds", interval_seconds)


async def perform_healthcheck_step() -> None:
    """Run one ``HealthChecker.run_once`` pass inside its own session.

    An exception raised by the health-check run is logged and swallowed so
    the caller's loop keeps going. Whatever the run changed is then
    committed. The session is closed in every case, including when the
    commit itself fails (that failure still propagates).
    """
    logger.info("Running health-checks")
    session: AsyncSession = await get_unmanaged_session()

    try:
        repo: ServerRepository = ServerRepository(session)
        notifications_client: NotificationsClient = NotificationsClient()
        healthchecker: HealthChecker = HealthChecker(
            session=session,
            servers_repo=repo,
            notifications_client=notifications_client,
        )

        try:
            await healthchecker.run_once()
        except Exception as error:
            # Best effort: a failed run must not stop the periodic loop.
            logger.error("Error in healthchecker", exc_info=error)

        logger.info("Health-checks finished")
        logger.info("Sleeping for %d seconds", settings.healthcheck_running_interval)

        await session.commit()
    finally:
        # Return the connection even if setup or the commit raised.
        await session.close()


async def main() -> None:
    """Configure logging, then run health-check steps forever.

    After each step the loop sleeps for
    ``settings.healthcheck_running_interval`` seconds.
    """
    setup_logging()

    while True:
        await perform_healthcheck_step()
        pause_seconds = settings.healthcheck_running_interval
        await asyncio.sleep(pause_seconds)


if __name__ == "__main__":
    asyncio.run(main())

```

--------------------------------------------------------------------------------
/worker/src/dockerfile_generators/base.py:
--------------------------------------------------------------------------------

```python
from pathlib import Path

from ..constants import BaseImage
from ..user_logger import UserLogger


class DockerfileGenerator:
    """Base class that assembles a Dockerfile line by line for a repository.

    Subclasses implement ``_dependencies_installation`` and ``_code_setup``.
    ``generate_dockerfile`` runs the stages in order and writes the result to
    ``<repo_folder>/Dockerfile``.
    """

    def __init__(
        self, base_image: BaseImage, repo_folder: str, user_logger: UserLogger
    ):
        self.base_image = base_image
        # Dockerfile lines accumulated by the individual stages.
        self.lines: list[str] = []
        self.repo_folder = repo_folder
        self.user_logger = user_logger

    async def generate_dockerfile(self) -> str:
        """Run all stages, write the Dockerfile and return its text."""
        await self._image_setup()
        await self._dependencies_installation()
        await self._code_setup()
        return self._save_dockerfile()

    async def _image_setup(self) -> None:
        """Emit the ``FROM`` line for the base image and set the workdir."""
        lines = [
            f"FROM {self.base_image.value}",
            "WORKDIR /workspace",
        ]
        self._add_to_file(lines)

    async def _dependencies_installation(self) -> None:
        """Emit dependency installation steps; provided by subclasses."""
        raise NotImplementedError

    async def _code_setup(self) -> None:
        """Emit steps that copy/build the code; provided by subclasses."""
        raise NotImplementedError

    def _add_to_file(self, lines: list[str]) -> None:
        """Append *lines* to the pending Dockerfile content."""
        self.lines.extend(lines)

    @staticmethod
    def _find_file(repo_folder: Path, file: str) -> Path | None:
        """Return the first match for *file* found recursively, or ``None``."""
        return next(repo_folder.rglob(file), None)

    def _save_dockerfile(self) -> str:
        """Write the accumulated lines to ``<repo_folder>/Dockerfile``.

        Returns:
            The text that was written.
        """
        dockerfile_text = "\n".join(self.lines)
        with open(
            f"{self.repo_folder}/Dockerfile", "w", encoding="utf-8"
        ) as dockerfile:
            dockerfile.write(dockerfile_text)
        return dockerfile_text

    def _relative_path(self, path: Path) -> str:
        """Express *path* relative to the repository folder.

        Returns ``"."`` for the repository folder itself and ``"./sub/dir"``
        for anything below it. Paths outside the repository are returned
        unchanged as a string.
        """
        # Compare whole path components, not string prefixes: "/tmp/repo2"
        # is not inside "/tmp/repo", and a trailing slash on repo_folder must
        # not eat the separator.
        try:
            relative = path.relative_to(self.repo_folder)
        except ValueError:
            return str(path)
        if not relative.parts:
            return "."
        return f"./{relative.as_posix()}"

```

--------------------------------------------------------------------------------
/registry/src/deployments/router.py:
--------------------------------------------------------------------------------

```python
from fastapi import APIRouter
from fastapi import Depends
from fastapi import status

from .schemas import DeploymentCreate
from .schemas import ServerLogsSchema
from .schemas import UserId
from .service import DeploymentService
from registry.src.database import Server
from registry.src.database import ServerLogs
from registry.src.servers.schemas import ServerRead

router = APIRouter(prefix="/deployments")


@router.post("/", status_code=status.HTTP_201_CREATED, response_model=ServerRead)
async def create_server(
    data: DeploymentCreate,
    service: DeploymentService = Depends(DeploymentService.get_new_instance),
) -> Server:
    # Delegates to the deployment service; the returned server is rendered
    # through the ServerRead response model with status 201.
    return await service.create_server(data)


@router.get("/{server_id}/logs", response_model=ServerLogsSchema)
async def get_logs(
    server_id: int,
    current_user_id: str,
    service: DeploymentService = Depends(DeploymentService.get_new_instance),
) -> ServerLogs:
    # `current_user_id` is a query parameter here; the service receives it
    # together with the server id and returns the logs record.
    return await service.get_logs(
        server_id=server_id, current_user_id=current_user_id
    )


@router.post("/{server_id}/stop", status_code=status.HTTP_202_ACCEPTED)
async def stop_server(
    server_id: int,
    user_id: UserId,
    service: DeploymentService = Depends(DeploymentService.get_new_instance),
) -> None:
    # The acting user's id arrives in the request body (UserId schema).
    requester_id = user_id.current_user_id
    await service.stop_server(server_id=server_id, current_user_id=requester_id)


@router.post("/{server_id}/start", status_code=status.HTTP_202_ACCEPTED)
async def start_server(
    server_id: int,
    user_id: UserId,
    service: DeploymentService = Depends(DeploymentService.get_new_instance),
) -> None:
    # The acting user's id arrives in the request body (UserId schema).
    requester_id = user_id.current_user_id
    await service.start_server(server_id=server_id, current_user_id=requester_id)

```

--------------------------------------------------------------------------------
/registry/src/database/models/server.py:
--------------------------------------------------------------------------------

```python
from sqlalchemy import String
from sqlalchemy.orm import Mapped
from sqlalchemy.orm import mapped_column
from sqlalchemy.orm import relationship

from ...database import models
from ...types import HostType
from ...types import ServerTransportProtocol
from .base import Base
from .mixins import HasCreatedAt
from .mixins import HasUpdatedAt
from registry.src.types import ServerStatus


class Server(HasCreatedAt, HasUpdatedAt, Base):
    """ORM model for the ``server`` table.

    ``name`` is required and unique; ``url`` and ``repo_url`` are optional
    but unique when present. Deleting a server also deletes its tools
    (``cascade="all, delete-orphan"`` on the relationship).
    """

    __tablename__ = "server"

    id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
    title: Mapped[str] = mapped_column(String(), nullable=False)
    url: Mapped[str | None] = mapped_column(String(), nullable=True, unique=True)
    name: Mapped[str] = mapped_column(String(), nullable=False, unique=True)
    description: Mapped[str] = mapped_column(String(), nullable=False)
    logo: Mapped[str | None] = mapped_column(String(), nullable=True)
    # Stored as a plain string column although typed as HostType.
    host_type: Mapped[HostType] = mapped_column(String(), nullable=False)
    creator_id: Mapped[str | None] = mapped_column(String(), nullable=True)
    repo_url: Mapped[str | None] = mapped_column(String(), nullable=True, unique=True)
    command: Mapped[str | None] = mapped_column(String(), nullable=True)
    base_image: Mapped[str | None] = mapped_column(String(), nullable=True)
    transport_protocol: Mapped[ServerTransportProtocol | None] = mapped_column(
        String(),
        nullable=True,
    )
    build_instructions: Mapped[str | None] = mapped_column(String(), nullable=True)
    tools: Mapped[list["models.tool.Tool"]] = relationship(
        back_populates="server", cascade="all, delete-orphan"
    )
    # Python-side default: new rows are `active` unless a status is given.
    status: Mapped[ServerStatus] = mapped_column(
        String(), nullable=False, default=ServerStatus.active
    )

```

--------------------------------------------------------------------------------
/common/notifications/client.py:
--------------------------------------------------------------------------------

```python
from httpx import AsyncClient

from common.notifications.enums import NotificationEvent
from common.notifications.exceptions import NotificationsServiceError

DEFAULT_NOTIFICATIONS_CLIENT_URI: str = "http://mailing_api:8000"


class NotificationsClient:
    """Async HTTP client for the notifications (e-mail) service.

    Every public method posts one templated e-mail request for a user and
    raises ``NotificationsServiceError`` unless the service answers 200.
    """

    def __init__(self, base_url: str | None = None) -> None:
        service_url = (
            DEFAULT_NOTIFICATIONS_CLIENT_URI if base_url is None else base_url
        )
        self._client: AsyncClient = AsyncClient(base_url=service_url)

    async def _notify(
        self,
        user_id: str,
        template: NotificationEvent,
        data: dict,
    ) -> None:
        """POST ``data`` plus ``user_id`` to the endpoint of *template*.

        Raises:
            NotificationsServiceError: if the response status is not 200.
        """
        payload = dict(user_id=user_id, **data)
        endpoint = f"/v1/emails/{template.value}"
        response = await self._client.post(endpoint, json=payload)

        if response.status_code == 200:
            return
        raise NotificationsServiceError(f"Failed to send email: {response.text}")

    async def send_server_down_alert(
        self,
        user_id: str,
        server_name: str,
        server_logs: str | None = None,
    ) -> None:
        """Tell *user_id* that a server failed its health-check."""
        alert_data = {"server_name": server_name, "server_logs": server_logs}
        await self._notify(
            user_id=user_id,
            template=NotificationEvent.server_healthcheck_failed,
            data=alert_data,
        )

    async def notify_deploy_failed(self, user_id: str, server_name: str) -> None:
        """Tell *user_id* that deploying *server_name* failed."""
        await self._notify(
            user_id=user_id,
            template=NotificationEvent.deploy_failed,
            data={"server_name": server_name},
        )

    async def notify_deploy_successful(self, user_id: str, server_name: str) -> None:
        """Tell *user_id* that deploying *server_name* succeeded."""
        await self._notify(
            user_id=user_id,
            template=NotificationEvent.deploy_successful,
            data={"server_name": server_name},
        )

```

--------------------------------------------------------------------------------
/registry/src/search/service.py:
--------------------------------------------------------------------------------

```python
from typing import Self

import instructor
from httpx import AsyncClient
from openai import AsyncOpenAI

from .prompts import get_top_servers
from .schemas import SearchResponse
from .schemas import SearchServer
from registry.src.logger import logger
from registry.src.settings import settings


class SearchService:
    """Asks an LLM which registry servers best fit a free-text query."""

    def __init__(self) -> None:
        """Build the instructor-wrapped OpenAI client, optionally via a proxy."""
        http_client: AsyncClient = (
            AsyncClient()
            if settings.llm_proxy is None
            else AsyncClient(proxy=settings.llm_proxy)
        )
        self.llm = instructor.from_openai(
            AsyncOpenAI(http_client=http_client, base_url=settings.openai_base_url)
        )

    async def _get_search_response(
        self, servers: list[SearchServer], query: str
    ) -> SearchResponse:
        """Send the formatted prompt to the LLM and return the parsed response.

        Args:
            servers: Candidate servers inserted into the prompt.
            query: The user's request inserted into the prompt.
        """
        prompt = get_top_servers.format(servers_list=servers, request=query)
        completion = await self.llm.chat.completions.create(
            model=settings.llm_model,
            messages=[
                {
                    "role": "user",
                    "content": prompt,
                },
            ],
            response_model=SearchResponse,
        )
        logger.debug(f"{completion=}")
        return completion

    @staticmethod
    def _collect_urls(search_response: SearchResponse) -> list[str]:
        """Return the unique server URLs named in ``search_response``.

        URLs are returned in first-seen order (each step's best server before
        its additional servers). A plain ``set`` would make the order vary
        between runs and lose that ranking.
        """
        # dict keys are unique and keep insertion order.
        ordered_urls: dict[str, None] = {}
        for solution_step in search_response.solution_steps:
            ordered_urls[solution_step.best_server.url] = None
            for server in solution_step.additional_servers:
                ordered_urls[server.url] = None
        return list(ordered_urls)

    async def get_fitting_servers(
        self, servers: list[SearchServer], query: str
    ) -> list[str]:
        """Return the URLs of the servers the LLM selected for ``query``."""
        search_response = await self._get_search_response(servers=servers, query=query)
        return self._collect_urls(search_response=search_response)

    @classmethod
    async def get_new_instance(cls) -> Self:
        """Async factory returning a fresh service instance."""
        return cls()

```

--------------------------------------------------------------------------------
/worker/src/user_logger.py:
--------------------------------------------------------------------------------

```python
from sqlalchemy.ext.asyncio import AsyncSession

from .schemas import CommandResult
from .schemas import CommandStart
from .schemas import GeneralLogMessage
from .schemas import LoggingEvent
from .schemas import LogLevel
from .schemas import LogMessage
from registry.src.deployments.logs_repository import LogsRepository


class UserLogger:
    """Appends deployment log entries for one server through ``LogsRepository``."""

    def __init__(self, session: AsyncSession, server_id: int):
        """Bind the logger to ``server_id`` using a repository on ``session``."""
        self.logs_repo = LogsRepository(session=session)
        self.server_id = server_id

    async def clear_logs(self) -> None:
        """Reset the server's deployment logs."""
        await self.logs_repo.clear_logs(server_id=self.server_id)

    async def command_start(self, command: str) -> None:
        """Record that ``command`` is about to run."""
        entry = LogMessage(
            event_type=LoggingEvent.command_start,
            data=CommandStart(command=command),
        )
        await self._store(entry)

    async def command_result(self, output: str, success: bool) -> None:
        """Record the output and success flag of a finished command."""
        outcome = CommandResult(output=output, success=success)
        entry = LogMessage(event_type=LoggingEvent.command_result, data=outcome)
        await self._store(entry)

    async def info(self, message: str) -> None:
        """Record ``message`` at info level."""
        await self._add_message(log_level=LogLevel.info, message=message)

    async def warning(self, message: str) -> None:
        """Record ``message`` at warning level."""
        await self._add_message(log_level=LogLevel.warning, message=message)

    async def error(self, message: str) -> None:
        """Record ``message`` at error level."""
        await self._add_message(log_level=LogLevel.error, message=message)

    async def _add_message(self, log_level: LogLevel, message: str):
        """Record a general message with the given level."""
        body = GeneralLogMessage(log_level=log_level, message=message)
        entry = LogMessage(event_type=LoggingEvent.general_message, data=body)
        await self._store(entry)

    async def _store(self, entry: LogMessage) -> None:
        """Append ``entry`` to this server's deployment logs."""
        await self.logs_repo.append_to_deployment_logs(
            server_id=self.server_id, log=entry
        )

```

--------------------------------------------------------------------------------
/registry/src/deployments/logs_repository.py:
--------------------------------------------------------------------------------

```python
from typing import Self

from fastapi import Depends
from sqlalchemy import select
from sqlalchemy import update
from sqlalchemy.ext.asyncio import AsyncSession

from registry.src.database import get_session
from registry.src.database import ServerLogs
from worker.src.schemas import LogMessage


class LogsRepository:
    """Database access for the ``ServerLogs`` rows keyed by ``server_id``."""

    def __init__(self, session: AsyncSession) -> None:
        self.session = session

    async def create_logs(self, server_id: int) -> ServerLogs:
        """Insert a new ``ServerLogs`` row for ``server_id`` and return it.

        The row is flushed and refreshed but not committed here.
        """
        server_logs = ServerLogs(server_id=server_id)
        self.session.add(server_logs)
        await self.session.flush()
        await self.session.refresh(server_logs)
        return server_logs

    async def get_server_logs(self, server_id: int) -> ServerLogs | None:
        """Return the logs row for ``server_id``, or ``None`` if there is none."""
        query = select(ServerLogs).where(ServerLogs.server_id == server_id)
        logs = (await self.session.execute(query)).scalar_one_or_none()
        return logs

    async def update_server_logs(
        self, server_id: int, deployment_logs: list[dict]
    ) -> None:
        """Overwrite ``deployment_logs`` for ``server_id`` and commit."""
        query = (
            update(ServerLogs)
            .where(ServerLogs.server_id == server_id)
            .values(deployment_logs=deployment_logs)
        )
        await self.session.execute(query)
        await self.session.flush()
        await self.session.commit()

    async def append_to_deployment_logs(self, server_id: int, log: LogMessage) -> None:
        """Append one dumped ``log`` entry to the server's deployment logs.

        Raises:
            ValueError: If no logs row exists for ``server_id``.
        """
        logs = await self.get_server_logs(server_id=server_id)
        if logs is None:
            raise ValueError(f"No logs record found for server_id={server_id}")
        # Tolerate a row whose deployment_logs is still NULL/None.
        existing_logs = logs.deployment_logs or []
        # NOTE(review): this is a read-modify-write; concurrent appends for the
        # same server could overwrite each other. Confirm that callers are
        # serialized per server.
        deployment_logs = [*existing_logs, log.model_dump()]
        await self.update_server_logs(
            server_id=server_id, deployment_logs=deployment_logs
        )

    async def clear_logs(self, server_id: int) -> None:
        """Replace the server's deployment logs with an empty list."""
        await self.update_server_logs(server_id=server_id, deployment_logs=[])

    @classmethod
    async def get_new_instance(
        cls, session: AsyncSession = Depends(get_session)
    ) -> Self:
        """FastAPI dependency factory building a repository on ``session``."""
        return cls(session=session)

```

--------------------------------------------------------------------------------
/alembic/env.py:
--------------------------------------------------------------------------------

```python
from logging.config import fileConfig

from sqlalchemy import engine_from_config
from sqlalchemy import pool

from alembic import context
from registry.src.database import Base
from registry.src.settings import settings

# The Alembic Config object, which provides access to the values
# within the .ini file in use.
config = context.config


# Configure Python logging from the .ini file, when one is in use.
if config.config_file_name is not None:
    fileConfig(config.config_file_name)

# Metadata of the project's declarative Base; handed to Alembic as
# target_metadata (used for 'autogenerate' support).
target_metadata = Base.metadata

# Other values from the config, defined by the needs of env.py,
# can be acquired, e.g.:
# my_important_option = config.get_main_option("my_important_option")


# Override the .ini "sqlalchemy.url" with the URL from the application settings.
config.set_main_option("sqlalchemy.url", settings.database_url_sync)


def run_migrations_offline() -> None:
    """Run migrations in 'offline' mode.

    The context is configured with only a URL, so no Engine is created.
    Calls to context.execute() emit the given string to the script output.
    """
    database_url = config.get_main_option("sqlalchemy.url")
    offline_options = {
        "url": database_url,
        "target_metadata": target_metadata,
        "literal_binds": True,
        "dialect_opts": {"paramstyle": "named"},
    }
    context.configure(**offline_options)

    with context.begin_transaction():
        context.run_migrations()


def run_migrations_online() -> None:
    """Run migrations in 'online' mode.

    An Engine is built from the ``sqlalchemy.``-prefixed config values and a
    live connection is associated with the migration context.
    """
    engine_settings = config.get_section(config.config_ini_section, {})
    engine = engine_from_config(
        engine_settings,
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )

    with engine.connect() as connection:
        context.configure(connection=connection, target_metadata=target_metadata)
        with context.begin_transaction():
            context.run_migrations()


# Entry point executed when Alembic loads this module: pick the runner that
# matches the mode Alembic was invoked in.
if context.is_offline_mode():
    run_migrations_offline()
else:
    run_migrations_online()

```

--------------------------------------------------------------------------------
/mcp_server/src/llm/tool_manager.py:
--------------------------------------------------------------------------------

```python
import json

from mcp import ClientSession
from mcp.client.sse import sse_client
from mcp.types import CallToolResult
from openai.types.chat import ChatCompletionMessageToolCall
from openai.types.chat import ChatCompletionToolMessageParam
from openai.types.chat import ChatCompletionToolParam

from mcp_server.src.schemas import RegistryServer
from mcp_server.src.schemas import ToolInfo


class ToolManager:
    """Exposes the tools of several registry servers to an LLM and routes calls."""

    def __init__(self, servers: list[RegistryServer]) -> None:
        # Must exist before set_tools() runs, because rename_and_save() fills it.
        self.renamed_tools: dict[str, ToolInfo] = {}
        self.tools: list[ChatCompletionToolParam] = self.set_tools(servers)

    def rename_and_save(self, tool_name: str, server: RegistryServer) -> str:
        """Register ``tool_name`` under a server-qualified name and return it.

        The qualified name is ``<tool_name>_mcp_<server.name>``; it maps back to
        the original tool name and server in ``renamed_tools``.
        """
        renamed_tool = f"{tool_name}_mcp_{server.name}"
        self.renamed_tools[renamed_tool] = ToolInfo(tool_name=tool_name, server=server)
        return renamed_tool

    def set_tools(self, servers: list[RegistryServer]) -> list[ChatCompletionToolParam]:
        """Build one function-tool definition per tool of every server."""
        return [
            ChatCompletionToolParam(
                type="function",
                function={
                    "name": self.rename_and_save(tool.name, server=server),
                    "description": tool.description,
                    "parameters": tool.input_schema,
                },
            )
            for server in servers
            for tool in server.tools
        ]

    @staticmethod
    async def _request_mcp(
        arguments: str,
        server_url: str,
        tool_name: str,
    ) -> CallToolResult:
        """Call ``tool_name`` on the MCP server at ``server_url`` over SSE.

        Args:
            arguments: JSON-encoded tool arguments; an empty string means no
                arguments are sent.
            server_url: SSE endpoint of the MCP server.
            tool_name: Original (un-renamed) tool name.
        """
        async with sse_client(server_url) as (read, write):
            async with ClientSession(read, write) as session:
                await session.initialize()

                return await session.call_tool(
                    tool_name,
                    arguments=(json.loads(arguments) if arguments else None),
                )

    @staticmethod
    def _result_text(result: CallToolResult) -> str:
        """Join the text parts of ``result.content`` with newlines.

        Items without a ``text`` attribute are skipped, and an empty content
        list yields ``""``.
        """
        text_parts = [
            item.text
            for item in result.content
            if getattr(item, "text", None) is not None
        ]
        return "\n".join(text_parts)

    async def handle_tool_call(
        self, tool_call: ChatCompletionMessageToolCall
    ) -> ChatCompletionToolMessageParam:
        """Execute ``tool_call`` on its server and wrap the result as a tool message.

        An unknown tool name produces an error tool message instead of raising.
        """
        tool_info = self.renamed_tools.get(tool_call.function.name)
        if not tool_info:
            return ChatCompletionToolMessageParam(
                role="tool",
                content="Error: Incorrect tool name",
                tool_call_id=tool_call.id,
            )
        tool_call_result = await self._request_mcp(
            arguments=tool_call.function.arguments,
            server_url=tool_info.server.url,
            tool_name=tool_info.tool_name,
        )
        return ChatCompletionToolMessageParam(
            role="tool",
            # Do not index content[0].text directly: content may be empty or
            # start with a non-text item.
            content=self._result_text(tool_call_result),
            tool_call_id=tool_call.id,
        )

```

--------------------------------------------------------------------------------
/registry/src/servers/schemas.py:
--------------------------------------------------------------------------------

```python
from datetime import datetime
from typing import Annotated
from typing import Any

from pydantic import ConfigDict
from pydantic import Field
from pydantic import field_serializer

from registry.src.base_schema import BaseSchema
from registry.src.types import HostType
from registry.src.types import ServerStatus
from registry.src.types import ServerTransportProtocol
from worker.src.constants import BaseImage


# 1-64 characters drawn from ASCII letters, digits, "_" and "-".
tool_name_regex = "^[a-zA-Z0-9_-]{1,64}$"
# Lowercase alphanumeric groups separated by single underscores
# (no leading, trailing or doubled underscores).
server_name_regex_pattern = "^[a-z0-9]+(_[a-z0-9]+)*$"


class Tool(BaseSchema):
    # A single tool exposed by a server.
    # populate_by_name=True is layered on top of BaseSchema's config,
    # presumably so `input_schema` is accepted under its field name as well as
    # under the `inputSchema` validation alias.
    model_config = ConfigDict(populate_by_name=True, **BaseSchema.model_config)
    # Both name and alias must match tool_name_regex.
    name: str = Field(pattern=tool_name_regex)
    alias: str = Field(pattern=tool_name_regex)
    description: str
    # Read from the "inputSchema" key on validation.
    input_schema: dict[str, Any] = Field(validation_alias="inputSchema")


class ServerCreateData(BaseSchema):
    # Fields supplied when creating a server.
    # name: at most 30 characters and must match server_name_regex_pattern.
    name: Annotated[str, Field(pattern=server_name_regex_pattern, max_length=30)]
    # title: at most 30 characters.
    title: str = Field(max_length=30)
    description: str
    url: str
    logo: str | None = None
    # Defaults to HostType.external when not given.
    host_type: HostType = HostType.external


class ServerCreate(BaseSchema):
    # Creation payload: the server fields plus an optional id of the acting user.
    data: ServerCreateData
    current_user_id: str | None = None


class ServerRead(BaseSchema):
    # Read-side representation of a server record.
    title: str
    id: int
    status: ServerStatus
    name: str
    creator_id: str | None
    description: str
    repo_url: str | None
    url: str | None
    command: str | None
    logo: str | None
    host_type: HostType
    base_image: BaseImage | None = None
    build_instructions: str | None = None
    transport_protocol: ServerTransportProtocol | None
    created_at: datetime
    updated_at: datetime

    @field_serializer("created_at", "updated_at")
    def serialize_datetime(self, value: datetime, _info) -> str:
        """Serialize both timestamps with ``datetime.isoformat()``."""
        return value.isoformat()


class ServerWithTools(ServerRead):
    # ServerRead extended with the list of the server's tools.
    tools: list[Tool]


class ServerUpdateData(BaseSchema):
    # Server fields accepted on update; every field is optional and defaults
    # to None.
    title: str | None = Field(max_length=30, default=None)
    # Same pattern and length limit as ServerCreateData.name.
    name: Annotated[
        str | None, Field(pattern=server_name_regex_pattern, max_length=30)
    ] = None
    description: str | None = None
    url: str | None = None
    logo: str | None = None


class DeploymentUpdateData(BaseSchema):
    # Deployment-related fields accepted on update; all optional, default None.
    command: str | None = None
    repo_url: str | None = None
    base_image: BaseImage | None = None
    build_instructions: str | None = None


class ServerUpdate(BaseSchema):
    # Update payload: server fields, deployment fields and an optional id of
    # the acting user.
    data: ServerUpdateData
    deployment_data: DeploymentUpdateData
    current_user_id: str | None = None


class MCP(BaseSchema):
    # One server entry inside MCPJson.
    name: str
    description: str
    # Read from the "url" key on validation.
    endpoint: str = Field(validation_alias="url")
    logo: str | None = None


class MCPJson(BaseSchema):
    # Versioned list of MCP server entries; version defaults to "1.0".
    version: str = "1.0"
    servers: list[MCP]

```

--------------------------------------------------------------------------------
/scripts/darp-search.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
"""
List matching MCP servers from the registry.
"""
import argparse
import json
import sys
from typing import Any
from typing import Dict
from typing import List

import requests

# Registry base URL used when --registry-url is not given.
default_registry_url: str = "http://localhost:80"


def main() -> None:
    """Parse the command line, query the registry and print the results."""
    cli = argparse.ArgumentParser(description="Search for servers in the registry")
    cli.add_argument("query", help="Search query string")
    cli.add_argument(
        "--registry-url",
        default=default_registry_url,
        help=f"Registry URL (default: {default_registry_url})",
    )
    cli.add_argument(
        "--format",
        choices=["names", "fulltext", "json"],
        default="names",
        help="Output format (default: names)",
    )
    options = cli.parse_args()

    found = search_servers(options.query, options.registry_url)
    display_servers(found, options.format)


def search_servers(
    query: str, base_url: str = default_registry_url, timeout: float = 30.0
) -> List[Dict[str, Any]]:
    """Query ``<base_url>/servers/search`` and return the decoded JSON body.

    Args:
        query: Search string sent as the ``query`` parameter.
        base_url: Registry base URL; a trailing slash is tolerated.
        timeout: Seconds to wait for the registry before giving up, so an
            unresponsive registry cannot hang the script indefinitely.

    On any request failure, or a body that is not valid JSON, an error is
    printed to stderr and the process exits with status 1.
    """
    url = f"{base_url.rstrip('/')}/servers/search"
    params = {"query": query}

    try:
        response = requests.get(url, params=params, timeout=timeout)
        response.raise_for_status()
        return response.json()
    # ValueError covers JSON decoding errors on requests versions where they
    # are not RequestException subclasses.
    except (requests.RequestException, ValueError) as error:
        print(f"Error: Failed to search servers - {error}", file=sys.stderr)
        sys.exit(1)


def display_servers(
    servers: List[Dict[str, Any]], format_output: str = "names"
) -> None:
    """Print ``servers`` using the renderer selected by ``format_output``.

    Recognised values are ``"json"``, ``"names"`` and ``"fulltext"``; any other
    value prints nothing.
    """
    renderers = {
        "json": display_servers_json,
        "names": display_servers_names,
        "fulltext": display_servers_fulltext,
    }
    render = renderers.get(format_output)
    if render is not None:
        render(servers)


def display_servers_json(servers: List[Dict[str, Any]]) -> None:
    """Print ``servers`` as JSON indented by two spaces."""
    rendered = json.dumps(servers, indent=2)
    print(rendered)


def display_servers_names(servers: List[Dict[str, Any]]) -> None:
    """Print a count header followed by one server name per line."""
    total = len(servers)
    print(f"Found {total} servers:")
    for entry in servers:
        name = entry["name"]
        print(f"{name}")


def display_servers_fulltext(servers: List[Dict[str, Any]]) -> None:
    """Print every server with its URL, description and tools.

    ``url`` and ``description`` are skipped when missing or ``None`` (the
    registry's read schema allows a null ``url``), and a missing or null
    ``tools`` entry is treated as "no tools" instead of raising.
    """
    print(f"Found {len(servers)} servers:")
    for server in servers:
        print(f"{server['id']}: {server['name']}")
        for field in ("url", "description"):
            value = server.get(field)
            if value is not None:
                print(indent(str(value), 1))
        tools = server.get("tools") or []
        if tools:
            print(indent("Tools:", 1))
        for tool in tools:
            print(indent(tool["name"], 2))
            print(indent(tool["description"], 2))
            print(indent(json.dumps(tool["input_schema"], indent=2), 2))


def indent(text: str, level: int = 1, prefix: str = "  ") -> str:
    """Return ``text`` with every line prefixed by ``prefix`` repeated ``level`` times."""
    padding = prefix * level
    padded_lines = [padding + line for line in text.split("\n")]
    return "\n".join(padded_lines)


# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()

```

--------------------------------------------------------------------------------
/mcp_server/src/tools.py:
--------------------------------------------------------------------------------

```python
import logging

from openai.types.chat import ChatCompletionMessage
from openai.types.chat import ChatCompletionMessageParam
from openai.types.chat import ChatCompletionToolMessageParam
from openai.types.chat import ChatCompletionUserMessageParam

from common.llm.client import LLMClient
from mcp_server.src.decorators import log_errors
from mcp_server.src.llm.prompts import default_prompt
from mcp_server.src.llm.tool_manager import ToolManager
from mcp_server.src.registry_client import RegistryClient
from mcp_server.src.settings import settings


class Tools:
    """Search and LLM-routing operations built on the registry and an LLM client."""

    def __init__(self):
        """Create the registry client and the LLM client from settings."""
        llm_logger = logging.getLogger("LLMClient")
        self.registry_client = RegistryClient()
        self.llm_client = LLMClient(
            llm_logger,
            proxy=settings.llm_proxy,
            base_url=settings.openai_base_url,
        )

    @log_errors
    async def search(self, request: str) -> list[str]:
        """Return the URLs of the servers the registry finds for ``request``."""
        found = await self.registry_client.search(request=request)
        urls = []
        for server in found:
            urls.append(server.url)
        return urls

    @log_errors
    async def routing(
        self, request: str
    ) -> list[ChatCompletionMessage | ChatCompletionToolMessageParam]:
        """Answer ``request`` with the LLM, using the tools of matching servers.

        Returns the assistant messages and tool results produced along the way.
        """
        found = await self.registry_client.search(request=request)
        tool_manager = ToolManager(found)
        opening_message = ChatCompletionUserMessageParam(content=request, role="user")
        return await self._llm_request(
            messages=[opening_message], manager=tool_manager
        )

    async def _llm_request(
        self,
        messages: list[ChatCompletionMessageParam],
        manager: ToolManager,
        response_accumulator: (
            list[ChatCompletionMessage | ChatCompletionToolMessageParam] | None
        ) = None,
    ) -> list[ChatCompletionMessage | ChatCompletionToolMessageParam]:
        """Query the LLM repeatedly until it stops requesting tool calls.

        Args:
            messages: Conversation sent with the first request.
            manager: Supplies the tool definitions and executes tool calls.
            response_accumulator: Optional list extended in place with every
                assistant message and tool result; a new list is used when
                ``None``.

        Returns:
            The accumulator with all assistant messages and tool results.
        """
        collected = [] if response_accumulator is None else response_accumulator
        conversation = messages
        while True:
            completion = await self.llm_client.request(
                system_prompt=default_prompt,
                model=settings.llm_model,
                messages=conversation,
                tools=manager.tools,
            )
            reply = completion.choices[0].message
            collected.append(reply)
            if not reply.tool_calls:
                return collected

            # Tool calls are executed one after another, in the order given.
            tool_messages = []
            for call in reply.tool_calls:
                tool_messages.append(await manager.handle_tool_call(call))
            collected.extend(tool_messages)
            # Build a new list so the caller's ``messages`` is never mutated.
            conversation = conversation + [reply] + tool_messages

```

--------------------------------------------------------------------------------
/registry/src/validator/service.py:
--------------------------------------------------------------------------------

```python
import logging
from typing import Self

from fastapi import Depends
from openai.types.chat import ChatCompletionSystemMessageParam
from openai.types.chat import ChatCompletionUserMessageParam

from .prompts import SENS_TOPIC_CHECK_SYSTEM_PROMPT
from .prompts import SENS_TOPIC_CHECK_USER_PROMPT
from common.llm.client import LLMClient
from registry.src.servers.repository import ServerRepository
from registry.src.servers.schemas import ServerCreate
from registry.src.servers.schemas import Tool
from registry.src.settings import settings
from registry.src.validator.schemas import ServerNeedsSensitiveDataResponse
from registry.src.validator.schemas import Tools
from registry.src.validator.schemas import ValidationResult
from registry.src.validator.ssl import SSLHelper


class ValidationService:
    """Checks a server for sensitive-data needs and for its SSL authority level."""

    def __init__(
        self,
        servers_repo: ServerRepository,
        ssl_helper: SSLHelper,
    ) -> None:
        """Store the collaborators and create the LLM client from settings."""
        self._servers_repo = servers_repo
        self._ssl_helper = ssl_helper
        llm_logger = logging.getLogger("LLMClient")
        self._llm_client = LLMClient(
            llm_logger,
            proxy=settings.llm_proxy,
            base_url=settings.openai_base_url,
        )

    async def _mcp_needs_sensitive_data(
        self,
        data: ServerCreate,
        tools: list[Tool],
    ) -> bool:
        """Ask the LLM whether the server needs sensitive data.

        Args:
            data: Server creation payload rendered into the user prompt.
            tools: The server's tools, validated into ``Tools`` for the prompt.

        Returns:
            The ``server_needs_sensitive_data`` flag parsed from the LLM reply.
        """
        system_text = SENS_TOPIC_CHECK_SYSTEM_PROMPT.render()
        system_message = ChatCompletionSystemMessageParam(
            content=system_text, role="system"
        )

        user_text = SENS_TOPIC_CHECK_USER_PROMPT.render(
            server_data=data,
            tools_data=Tools.model_validate(tools),
        )
        user_message = ChatCompletionUserMessageParam(content=user_text, role="user")

        response = await self._llm_client.request_to_beta(
            model=settings.llm_model,
            messages=[system_message, user_message],
            response_format=ServerNeedsSensitiveDataResponse,
        )

        raw_answer = response.choices[0].message.content
        parsed = ServerNeedsSensitiveDataResponse.model_validate_json(raw_answer)
        return parsed.server_needs_sensitive_data

    async def validate_server(
        self, data: ServerCreate, tools: list[Tool]
    ) -> ValidationResult:
        """Run the sensitive-data check, then the SSL check, and return both."""
        result = ValidationResult()

        needs_sensitive_data = await self._mcp_needs_sensitive_data(data, tools)
        result.server_requires_sensitive_data = needs_sensitive_data

        authority_level = await self._ssl_helper.get_ssl_authority_level(data.url)
        result.authority_level = authority_level

        return result

    @classmethod
    def get_new_instance(
        cls,
        servers_repo: ServerRepository = Depends(ServerRepository.get_new_instance),
        ssl_helper: SSLHelper = Depends(SSLHelper.get_new_instance),
    ) -> Self:
        """FastAPI dependency factory wiring the repository and SSL helper."""
        return cls(servers_repo=servers_repo, ssl_helper=ssl_helper)

```

--------------------------------------------------------------------------------
/worker/src/consumer.py:
--------------------------------------------------------------------------------

```python
from datetime import timedelta
from typing import Any

from aiokafka import AIOKafkaConsumer

from .deployment_service import WorkerDeploymentService
from registry.src.database import get_unmanaged_session
from registry.src.logger import logger
from registry.src.servers.repository import ServerRepository
from registry.src.settings import settings
from worker.src.schemas import Task
from worker.src.schemas import TaskType


class Consumer:
    """Kafka consumer that dispatches worker tasks to their handlers."""

    def __init__(self) -> None:
        # Both values are in whole seconds; they are converted to milliseconds
        # when the Kafka consumer is created in start().
        self.max_poll_interval: int = int(timedelta(minutes=30).total_seconds())
        self.session_timeout: int = int(timedelta(minutes=10).total_seconds())
        self.task_handlers: dict[TaskType, Any] = {
            TaskType.deploy: self.deploy_server,
            TaskType.start: self.start_server,
            TaskType.stop: self.stop_server,
        }

    async def start(self) -> None:
        """Start consuming the worker topic; always stop the consumer on exit."""
        consumer = AIOKafkaConsumer(
            settings.worker_topic,
            bootstrap_servers=settings.broker_url,
            auto_offset_reset="earliest",
            group_id="deploy_workers",
            group_instance_id="1",
            max_poll_interval_ms=self.max_poll_interval * 1000,
            session_timeout_ms=self.session_timeout * 1000,
        )
        await consumer.start()
        try:
            await self.process_messages(consumer)
        finally:
            await consumer.stop()

    async def process_messages(self, consumer: AIOKafkaConsumer) -> None:
        """Parse and process every incoming message, skipping unparsable ones."""
        async for message in consumer:
            incoming_message = self.parse_message(message.value)
            if not incoming_message:
                continue
            await self.process_message(incoming_message)

    async def process_message(self, task: Task) -> None:
        """Run the handler for ``task`` in its own database session.

        Handler errors (including an unknown task type) are logged, not raised.
        The session is committed afterwards in either case and is always
        closed, so the session is not leaked for every task.
        """
        session = await get_unmanaged_session()
        try:
            try:
                repo = ServerRepository(session)
                await self.task_handlers[task.task_type](task.server_id, repo)
            except Exception as error:
                logger.error("Error while processing a task", exc_info=error)
            # NOTE(review): the commit also runs after a failed handler, as it
            # did before; confirm that persisting partial work is intended.
            await session.commit()
        finally:
            # Assumes get_unmanaged_session() returns an AsyncSession that the
            # caller owns and must close - TODO confirm.
            await session.close()

    async def deploy_server(self, server_id: int, repo: ServerRepository) -> None:
        """Deploy the server through ``WorkerDeploymentService``."""
        service = WorkerDeploymentService(repo=repo, logger=logger)
        await service.deploy_server(server_id=server_id)

    async def stop_server(self, server_id: int, repo: ServerRepository) -> None:
        """Not implemented yet."""
        raise NotImplementedError()

    async def start_server(self, server_id: int, repo: ServerRepository) -> None:
        """Not implemented yet."""
        raise NotImplementedError()

    @staticmethod
    def parse_message(message: bytes) -> Task | None:
        """Decode and validate a raw message; return ``None`` if it is invalid.

        Decoding happens inside the ``try`` so that undecodable bytes (or a
        message without a value) are logged and skipped instead of stopping the
        consume loop.
        """
        try:
            raw = message.decode()
            logger.debug(f"Incoming message - {raw}")
            return Task.model_validate_json(raw)
        except Exception as error:
            logger.error("Incorrect message received", exc_info=error)
            return None

```

--------------------------------------------------------------------------------
/registry/src/validator/ssl.py:
--------------------------------------------------------------------------------

```python
import asyncio
import logging
import ssl
from typing import Self
from urllib.parse import urlparse

from cryptography import x509
from cryptography.hazmat._oid import ExtensionOID
from cryptography.hazmat.backends import default_backend

from .constants import EV_OIDS
from .constants import HTTP_PORT
from .constants import HTTPS_PORT
from .constants import SELF_SIGNED_CERTIFICATE_ERR_CODE
from .schemas import SSLAuthorityLevel


class SSLHelper:
    """Classifies the TLS certificate presented by a server URL."""

    def __init__(self) -> None:
        self._logger = logging.getLogger(f"{__name__}.{self.__class__.__name__}")

    async def get_ssl_authority_level(self, url: str) -> SSLAuthorityLevel:
        """Return the authority level of the certificate served at ``url``.

        Verification failures map to ``SELF_SIGNED_CERTIFICATE`` or
        ``INVALID_CERTIFICATE``. Any other failure (bad URL, connection error,
        timeout) is logged and reported as ``NO_CERTIFICATE``.
        """
        try:
            cert = await self._load_certificate(url)
        except ssl.SSLCertVerificationError as exc:
            if exc.verify_code == SELF_SIGNED_CERTIFICATE_ERR_CODE:
                return SSLAuthorityLevel.SELF_SIGNED_CERTIFICATE
            return SSLAuthorityLevel.INVALID_CERTIFICATE
        except Exception as exc:
            self._logger.error("Failed to fetch ssl cert", exc_info=exc)
            return SSLAuthorityLevel.NO_CERTIFICATE

        if self._is_cert_respects_ev(cert):
            return SSLAuthorityLevel.EXTENDED_CERTIFICATE

        return SSLAuthorityLevel.CERTIFICATE_OK

    @classmethod
    def get_new_instance(cls) -> Self:
        """Factory returning a fresh helper."""
        return cls()

    @staticmethod
    def _extract_port(uri: str) -> int:
        """Return the explicit port of ``uri`` or the default for its scheme.

        Raises:
            ValueError: If no port is given and the scheme is not http/https.
        """
        parsed = urlparse(uri)

        if parsed.port:
            return parsed.port

        if parsed.scheme == "https":
            return HTTPS_PORT
        elif parsed.scheme == "http":
            return HTTP_PORT

        raise ValueError("Invalid URI", uri)

    @staticmethod
    def _extract_host(uri: str) -> str:
        """Return the hostname of ``uri``.

        Raises:
            ValueError: If ``uri`` has no hostname.
        """
        parsed = urlparse(uri)

        if parsed.hostname is None:
            raise ValueError("Invalid URL: No hostname found")

        return parsed.hostname

    @classmethod
    async def _load_certificate(
        cls, url: str, timeout: float = 10.0
    ) -> x509.Certificate:
        """Open a verified TLS connection to ``url`` and return the peer cert.

        Args:
            url: URL whose host and port are connected to.
            timeout: Seconds allowed for connecting and completing the TLS
                handshake, so an unresponsive host cannot block the caller
                indefinitely.

        Raises:
            ssl.SSLCertVerificationError: If certificate verification fails.
            TimeoutError: If the connection is not established in time.
        """
        hostname = cls._extract_host(url)
        port = cls._extract_port(url)

        ssl_context = ssl.create_default_context()
        ssl_context.check_hostname = True
        ssl_context.verify_mode = ssl.CERT_REQUIRED

        _reader, writer = await asyncio.wait_for(
            asyncio.open_connection(
                hostname, port, ssl=ssl_context, server_hostname=hostname
            ),
            timeout=timeout,
        )
        try:
            ssl_object = writer.get_extra_info("ssl_object")
            der_cert = ssl_object.getpeercert(binary_form=True)
        finally:
            # Close the connection even if reading the certificate fails.
            writer.close()
            await writer.wait_closed()

        return x509.load_der_x509_certificate(der_cert, default_backend())

    @staticmethod
    def _is_cert_respects_ev(cert: x509.Certificate) -> bool:
        """Return True if any certificate policy OID of ``cert`` is in ``EV_OIDS``."""
        try:
            policies = cert.extensions.get_extension_for_oid(
                ExtensionOID.CERTIFICATE_POLICIES
            ).value
        except x509.ExtensionNotFound:
            return False

        return any(
            policy.policy_identifier.dotted_string in EV_OIDS for policy in policies
        )

```

--------------------------------------------------------------------------------
/worker/src/dockerfile_generators/python/generic.py:
--------------------------------------------------------------------------------

```python
from pathlib import Path
from typing import override

from ...errors import DependenciesError
from ..base import DockerfileGenerator


class PythonGenerator(DockerfileGenerator):
    """Dockerfile generator for Python repositories."""

    @override
    async def _dependencies_installation(self) -> None:
        """Add dependency-installation steps for the first supported layout.

        Checked in order, most specific first:
          1. ``uv.lock`` + ``pyproject.toml`` + ``README.md`` -> uv
          2. ``pyproject.toml`` + ``README.md`` -> pip
          3. ``requirements.txt`` -> legacy pip

        The uv case must come before the pip case: it needs a superset of the
        pip case's files, so checking pip first would make it unreachable.

        Raises:
            DependenciesError: If none of the layouts is found.
        """
        repo_path = Path(self.repo_folder)
        await self._install_node()
        uv_file = self._find_file(repo_path, "uv.lock")
        pyproject_file = self._find_file(repo_path, "pyproject.toml")
        readme_file = self._find_file(repo_path, "README.md")
        if uv_file and pyproject_file and readme_file:
            await self.user_logger.info(
                message="uv.lock found. Installing dependencies with uv"
            )
            self._uv_dependencies(
                uv_file=uv_file, pyproject=pyproject_file, readme_file=readme_file
            )
            return
        if pyproject_file and readme_file:
            await self.user_logger.info(
                message="pyproject.toml found. Installing dependencies with pip"
            )
            self._pip_dependencies(pyproject=pyproject_file, readme_file=readme_file)
            return
        requirements_file = self._find_file(repo_path, "requirements.txt")
        if requirements_file:
            await self.user_logger.info(
                message="requirements.txt found. Installing dependencies with legacy pip method"
            )
            self._legacy_pip_dependencies(requirements_file)
            return
        raise DependenciesError("No supported dependencies installation method found")

    @override
    async def _code_setup(self) -> None:
        """Copy the whole build context into the image."""
        lines = [
            "COPY ./ ./",
        ]
        self._add_to_file(lines)

    async def _install_node(self) -> None:
        """Add the apt step installing nodejs and npm (logged as being for supergateway)."""
        await self.user_logger.info(message="Adding nodejs for supergateway")
        lines = [
            "RUN apt update && apt install -y nodejs npm",
        ]
        self._add_to_file(lines)

    def _legacy_pip_dependencies(self, requirements: Path) -> None:
        """Copy ``requirements`` into the image and install it with pip."""
        lines = [
            f"COPY {self._relative_path(requirements.absolute())} ./requirements.txt",
            "RUN pip install -r requirements.txt",
        ]
        self._add_to_file(lines)

    def _pip_dependencies(self, pyproject: Path, readme_file: Path) -> None:
        """Copy the project metadata files and run ``pip install .``."""
        lines = [
            f"COPY {self._relative_path(pyproject.absolute())} ./pyproject.toml",
            f"COPY {self._relative_path(readme_file.absolute())} ./README.md",
            "RUN pip install .",
        ]
        self._add_to_file(lines)

    def _uv_dependencies(
        self, uv_file: Path, pyproject: Path, readme_file: Path
    ) -> None:
        """Copy the uv binaries and project files, then install with uv."""
        lines = [
            "COPY --from=ghcr.io/astral-sh/uv:latest /uv /uvx /usr/local/bin/",
            f"COPY {self._relative_path(uv_file.absolute())} ./uv.lock",
            f"COPY {self._relative_path(pyproject.absolute())} ./pyproject.toml",
            f"COPY {self._relative_path(readme_file.absolute())} ./README.md",
            "RUN uv pip install . --system",
        ]
        self._add_to_file(lines)

```

--------------------------------------------------------------------------------
/healthchecker/src/checker.py:
--------------------------------------------------------------------------------

```python
import logging
from collections.abc import Sequence

from sqlalchemy import Select
from sqlalchemy.ext.asyncio import AsyncSession

from common.notifications.client import NotificationsClient
from registry.src.database.models.server import Server
from registry.src.servers.repository import ServerRepository
from registry.src.servers.schemas import Tool
from registry.src.servers.service import ServerService
from registry.src.types import ServerStatus


class HealthChecker:
    """Checks every server returned by the repository and records its status.

    A server whose tool list can be fetched and is non-empty is stored as
    ``ServerStatus.active`` together with those tools. A server whose fetch
    raises, or whose tool list is empty, triggers a server-down alert and is
    stored as ``ServerStatus.inactive`` with an empty tool list.
    """

    def __init__(
        self,
        session: AsyncSession,
        servers_repo: ServerRepository,
        notifications_client: NotificationsClient,
    ) -> None:
        """Store the collaborators and create the ``Healthchecker`` logger."""
        self.logger = logging.getLogger("Healthchecker")
        self.session = session
        self.repo = servers_repo
        self.notifications_client = notifications_client

    async def run_once(self) -> None:
        """Run one pass: fetch the servers and check them one after another."""
        for server in await self._fetch_servers():
            await self._check_server(server)

    async def _fetch_servers(self) -> Sequence[Server]:
        """Execute the repository's "all servers" query on the session."""
        query: Select = await self.repo.get_all_servers()
        result = await self.session.execute(query)
        servers: Sequence[Server] = result.scalars().fetchall()
        return servers

    async def _get_tools(self, server: Server) -> list[Tool]:
        """Ask ``ServerService`` for the tools exposed at the server's URL."""
        return await ServerService.get_tools(server.url, server.name)

    async def _check_server(self, server: Server) -> None:
        """Fetch the server's tools and dispatch to the matching outcome handler."""
        self.logger.info("Checking server %s", server.name)

        try:
            tools = await self._get_tools(server)
        except Exception as error:
            # Any failure while fetching tools counts as "server is down".
            await self._handle_failed_server(server, error)
        else:
            if tools:
                await self._mark_server_healthy(server, tools)
            else:
                await self._handle_empty_tools(server)

    async def _handle_failed_server(self, server: Server, error: Exception) -> None:
        """Log the failure, send a server-down alert, then store ``inactive``."""
        self.logger.error("Failed to fetch server's tools", exc_info=error)
        self.logger.warning("Server %s will be marked as unhealthy", server.name)
        recipient = server.creator_id
        await self.notifications_client.send_server_down_alert(
            recipient,
            server_name=server.name,
        )
        await self._update_server_status(server, [], ServerStatus.inactive)

    async def _handle_empty_tools(self, server: Server) -> None:
        """Treat a reachable server without tools as down: alert and store ``inactive``."""
        self.logger.warning("Server %s is alive, but has no tools", server.name)
        self.logger.warning("Server will be marked as unhealthy.")
        explanation = (
            "Server has no tools. It will be marked as unhealthy. "
            "Please, check it out."
        )
        await self.notifications_client.send_server_down_alert(
            server.creator_id,
            server_name=server.name,
            server_logs=explanation,
        )
        await self._update_server_status(server, [], ServerStatus.inactive)

    async def _mark_server_healthy(self, server: Server, tools: list[Tool]) -> None:
        """Store the fetched tools and the ``active`` status for the server."""
        await self._update_server_status(server, tools, ServerStatus.active)

    async def _update_server_status(
        self, server: Server, tools: list[Tool], status: ServerStatus
    ) -> None:
        """Write the given tools and status through the repository."""
        await self.repo.update_server(id=server.id, tools=tools, status=status)

```

--------------------------------------------------------------------------------
/alembic.ini:
--------------------------------------------------------------------------------

```
# A generic, single database configuration.

[alembic]
# path to migration scripts
# Use forward slashes (/) also on windows to provide an os agnostic path
script_location = alembic

# template used to generate migration file names; The default value is %%(rev)s_%%(slug)s
# Uncomment the line below if you want the files to be prepended with date and time
# see https://alembic.sqlalchemy.org/en/latest/tutorial.html#editing-the-ini-file
# for all available tokens
file_template = %%(year)d_%%(month).2d_%%(day).2d_%%(hour).2d%%(minute).2d-%%(rev)s_%%(slug)s

# sys.path path, will be prepended to sys.path if present.
# defaults to the current working directory.
prepend_sys_path = .

# timezone to use when rendering the date within the migration file
# as well as the filename.
# If specified, requires the python>=3.9 or backports.zoneinfo library and tzdata library.
# Any required deps can be installed by adding `alembic[tz]` to the pip requirements
# string value is passed to ZoneInfo()
# leave blank for localtime
# timezone =

# max length of characters to apply to the "slug" field
# truncate_slug_length = 40

# set to 'true' to run the environment during
# the 'revision' command, regardless of autogenerate
# revision_environment = false

# set to 'true' to allow .pyc and .pyo files without
# a source .py file to be detected as revisions in the
# versions/ directory
# sourceless = false

# version location specification; This defaults
# to alembic/versions.  When using multiple version
# directories, initial revisions must be specified with --version-path.
# The path separator used here should be the separator specified by "version_path_separator" below.
# version_locations = %(here)s/bar:%(here)s/bat:alembic/versions

# version path separator; As mentioned above, this is the character used to split
# version_locations. The default within new alembic.ini files is "os", which uses os.pathsep.
# If this key is omitted entirely, it falls back to the legacy behavior of splitting on spaces and/or commas.
# Valid values for version_path_separator are:
#
# version_path_separator = :
# version_path_separator = ;
# version_path_separator = space
# version_path_separator = newline
#
# Use os.pathsep. Default configuration used for new projects.
version_path_separator = os

# set to 'true' to search source files recursively
# in each "version_locations" directory
# new in Alembic version 1.10
# recursive_version_locations = false

# the output encoding used when revision files
# are written from script.py.mako
# output_encoding = utf-8

sqlalchemy.url = driver://user:pass@localhost/dbname


[post_write_hooks]
# post_write_hooks defines scripts or Python functions that are run
# on newly generated revision scripts.  See the documentation for further
# detail and examples

# format using "black" - use the console_scripts runner, against the "black" entrypoint
# hooks = black
# black.type = console_scripts
# black.entrypoint = black
# black.options = -l 79 REVISION_SCRIPT_FILENAME

# lint with attempts to fix using "ruff" - use the exec runner, execute a binary
# hooks = ruff
# ruff.type = exec
# ruff.executable = %(here)s/.venv/bin/ruff
# ruff.options = --fix REVISION_SCRIPT_FILENAME

# Logging configuration
[loggers]
keys = root,sqlalchemy,alembic

[handlers]
keys = console

[formatters]
keys = generic

[logger_root]
level = WARNING
handlers = console
qualname =

[logger_sqlalchemy]
level = WARNING
handlers =
qualname = sqlalchemy.engine

[logger_alembic]
level = INFO
handlers =
qualname = alembic

[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic

[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S

```

--------------------------------------------------------------------------------
/common/llm/client.py:
--------------------------------------------------------------------------------

```python
from logging import Logger
from typing import Type

from httpx import AsyncClient
from openai import APIError
from openai import AsyncOpenAI
from openai import InternalServerError
from openai import NOT_GIVEN
from openai import OpenAIError
from openai.types.chat import ChatCompletion
from openai.types.chat import ChatCompletionMessageParam
from openai.types.chat import ChatCompletionToolParam
from pydantic import BaseModel


class LLMClient:
    """Async client for an OpenAI-compatible chat completions API.

    Wraps ``AsyncOpenAI`` with an optional HTTP proxy and custom base URL, adds
    the system prompt to the message list, and logs provider errors before
    re-raising them unchanged.
    """

    def __init__(
        self, logger: Logger, proxy: str | None = None, base_url: str | None = None
    ) -> None:
        """Build the underlying ``AsyncOpenAI`` client.

        Args:
            logger: Logger used for proxy and error messages.
            proxy: Optional proxy URL passed to the ``httpx`` client.
            base_url: Optional API base URL passed to ``AsyncOpenAI``.
        """
        self._logger = logger

        if proxy is not None:
            self._logger.info("Enabled proxy %s", proxy)
            http_client = AsyncClient(proxy=proxy)
        else:
            http_client = AsyncClient()

        self.openai_client = AsyncOpenAI(http_client=http_client, base_url=base_url)

    @staticmethod
    def _get_full_messages(
        system_prompt: str | None, messages: list[ChatCompletionMessageParam]
    ) -> list[ChatCompletionMessageParam]:
        """Return ``messages`` with the system prompt, if any, placed first."""
        system_message: list[dict] = []
        if system_prompt:
            system_message = [
                {
                    "role": "system",
                    "content": system_prompt,
                    # Anthropic prompt caching
                    "cache_control": {"type": "ephemeral"},
                }
            ]
        full_messages = system_message + messages
        return full_messages

    def _log_request_error(self, error: OpenAIError) -> None:
        """Log a failed provider request using the detail its type offers.

        ``InternalServerError`` derives from ``APIError``, so it has to be
        checked first; otherwise its branch can never be reached.
        """
        if isinstance(error, InternalServerError):
            try:
                details = error.response.json()
            except ValueError:
                # The body is not JSON; fall back to the raw text so the
                # original error is still the one that propagates.
                details = error.response.text
            self._logger.error(details)
        elif isinstance(error, APIError):
            self._logger.error(f"{error.code=} {error.body=}")
        else:
            self._logger.error(
                "Request to Provider failed with the following exception",
                exc_info=error,
            )

    async def request(
        self,
        model: str,
        messages: list[ChatCompletionMessageParam],
        system_prompt: str | None = None,
        max_tokens: int | None = None,
        tools: list[ChatCompletionToolParam] | None = None,
    ) -> ChatCompletion:
        """Send a chat completion request and return the provider's response.

        Args:
            model: Model identifier.
            messages: Conversation messages, without the system prompt.
            system_prompt: Optional system prompt placed before ``messages``.
            max_tokens: Optional completion token limit.
            tools: Optional tool definitions; omitted from the call when empty.

        Raises:
            OpenAIError: Any provider error, re-raised after being logged.
        """
        full_messages = self._get_full_messages(
            system_prompt=system_prompt, messages=messages
        )
        try:
            response = await self.openai_client.chat.completions.create(
                model=model,
                messages=full_messages,
                max_tokens=max_tokens,
                tools=tools or NOT_GIVEN,
                timeout=30,
            )
        except OpenAIError as error:
            self._log_request_error(error)
            raise
        return response

    async def request_to_beta(
        self,
        model: str,
        messages: list[ChatCompletionMessageParam],
        system_prompt: str | None = None,
        max_tokens: int | None = None,
        tools: list[ChatCompletionToolParam] | None = None,
        response_format: Type[BaseModel] = NOT_GIVEN,
    ) -> ChatCompletion:
        """Send a request through the beta ``parse`` endpoint.

        Takes the same arguments as ``request`` plus ``response_format``, a
        pydantic model class forwarded to the endpoint.

        Raises:
            OpenAIError: Any provider error, re-raised after being logged.
        """
        full_messages = self._get_full_messages(
            system_prompt=system_prompt, messages=messages
        )
        try:
            response = await self.openai_client.beta.chat.completions.parse(
                model=model,
                messages=full_messages,
                max_tokens=max_tokens,
                tools=tools or NOT_GIVEN,
                timeout=30,
                response_format=response_format,
            )
        except OpenAIError as error:
            self._log_request_error(error)
            raise
        return response

```

--------------------------------------------------------------------------------
/worker/src/docker_service.py:
--------------------------------------------------------------------------------

```python
import subprocess
from pathlib import Path

from .constants import BaseImage
from .dockerfile_generators import DockerfileGenerator
from .dockerfile_generators import get_dockerfile_generator
from .errors import InvalidData
from .errors import ProcessingError
from .user_logger import UserLogger
from registry.src.database import Server
from registry.src.logger import logger
from registry.src.settings import settings


class DockerService:
    """Runs the git and docker commands needed to build and start one server.

    Command output is reported through ``user_logger``. All subprocess calls
    are synchronous ``subprocess.run`` calls.
    """

    def __init__(
        self,
        docker_env: dict,
        repo_folder: str,
        server: Server,
        user_logger: UserLogger,
    ) -> None:
        """Store the inputs and derive the image and container names.

        Args:
            docker_env: Environment passed to the commands that start the container.
            repo_folder: Directory the repository is cloned into.
            server: Server record being deployed.
            user_logger: Logger whose messages are shown to the server's owner.
        """
        self.docker_env = docker_env
        self.repo_folder = repo_folder
        self.server = server
        self.image_name = f"hipasus/{self.server.name}"
        self.container_name = f"darp_server_{server.id}"
        self.user_logger = user_logger

    async def clone_repo(self) -> None:
        """Clone the server's repository into ``repo_folder``."""
        await self._execute(["git", "clone", self.server.repo_url, self.repo_folder])

    def dockerfile_exists(self) -> bool:
        """Return whether the cloned repository has a top-level ``Dockerfile``."""
        path = Path(f"{self.repo_folder}/Dockerfile")
        return path.exists()

    async def generate_dockerfile(self) -> None:
        """Generate a Dockerfile for the repository from the server's settings.

        Raises:
            InvalidData: If the server has no base image configured.
            ProcessingError: Re-raised from the generator after being reported
                to the user.
        """
        if not self.server.base_image:
            await self.user_logger.error(message="No base image provided.")
            raise InvalidData
        generator: DockerfileGenerator = await get_dockerfile_generator(
            repo_folder=self.repo_folder,
            base_image=BaseImage(self.server.base_image),
            build_instructions=self.server.build_instructions,
            user_logger=self.user_logger,
        )
        try:
            logs = await generator.generate_dockerfile()
            await self.user_logger.info(message=f"Generated Dockerfile:\n{logs}")
            logger.info(logs)
        except ProcessingError as error:
            await self.user_logger.error(
                message=f"Error generating Dockerfile: {error.message or ''}"
            )
            raise

    async def build_image(self) -> None:
        """Build the image from the repository's Dockerfile and tag it."""
        command = [
            "docker",
            "build",
            "-t",
            self.image_name,
            "-f",
            f"{self.repo_folder}/Dockerfile",
            self.repo_folder,
        ]
        await self._execute(command)

    async def push_image(self) -> None:
        """Log in to the registry and push the built image.

        The commands are run as argument lists without a shell, and the
        password is supplied on stdin. This keeps the secret out of the process
        list and prevents shell metacharacters in the credentials or image name
        from being interpreted.

        Raises:
            ProcessingError: If the login or the push exits with a non-zero code.
        """
        await self.user_logger.info("Pushing image...")
        login_result = subprocess.run(
            [
                "docker",
                "login",
                "-u",
                str(settings.dockerhub_login),
                "--password-stdin",
            ],
            input=str(settings.dockerhub_password).encode(),
        )
        pushed = False
        if login_result.returncode == 0:
            # Only push after a successful login, like the former "&&" chain.
            push_result = subprocess.run(["docker", "image", "push", self.image_name])
            pushed = push_result.returncode == 0
        if not pushed:
            await self.user_logger.error(message="Docker push failed")
            logger.error("docker push failed")
            raise ProcessingError

    async def run_container(self) -> None:
        """Replace any existing container for this server with a fresh one.

        The server's ``command``, when set, is run through ``sh -c`` inside
        the container.
        """
        # The removal result is deliberately not checked: the container may
        # simply not exist yet.
        subprocess.run(
            ["docker", "rm", self.container_name, "--force"], env=self.docker_env
        )
        run_command = [
            "docker",
            "run",
            "--network",
            settings.deployment_network,
            "-d",
            "--restart",
            "always",
            "--pull",
            "always",
            "--name",
            self.container_name,
            self.image_name,
        ]
        if self.server.command:
            run_command.extend(["sh", "-c", self.server.command])
        await self._execute(command=run_command, env=self.docker_env)

    async def _execute(self, command: list[str], env: dict | None = None) -> None:
        """Run a command, report its combined output, and fail on non-zero exit.

        Args:
            command: Argument list handed to ``subprocess.run``.
            env: Optional environment; when falsy the current one is inherited.

        Raises:
            ProcessingError: If the command exits with a non-zero code.
        """
        command_str = " ".join(command)
        logger.info(f"{command_str=}")
        await self.user_logger.command_start(command=command_str)
        arguments: dict = dict(
            args=command,
            stdout=subprocess.PIPE,
            # stderr is merged into stdout so the user sees a single stream.
            stderr=subprocess.STDOUT,
        )
        if env:
            arguments["env"] = env
        result = subprocess.run(**arguments)
        output = result.stdout.decode() if result.stdout else ""  # noqa
        if result.returncode != 0:
            await self.user_logger.command_result(output=output, success=False)
            logger.error(f"{' '.join(command[:2])} failed")
            raise ProcessingError
        await self.user_logger.command_result(output=output, success=True)

```

--------------------------------------------------------------------------------
/registry/src/servers/router.py:
--------------------------------------------------------------------------------

```python
from typing import Literal

from fastapi import APIRouter
from fastapi import Depends
from fastapi import Query
from fastapi import status as status_codes
from fastapi_pagination import add_pagination
from fastapi_pagination import Page
from fastapi_pagination import Params
from fastapi_pagination.ext.sqlalchemy import paginate
from sqlalchemy import Select
from sqlalchemy.ext.asyncio import AsyncSession

from ..deployments.service import DeploymentService
from ..errors import InvalidData
from .schemas import MCPJson
from .schemas import ServerCreate
from .schemas import ServerRead
from .schemas import ServerUpdate
from .schemas import ServerWithTools
from .service import ServerService
from registry.src.database import get_session
from registry.src.database import Server
from registry.src.search.schemas import SearchServer
from registry.src.search.service import SearchService
from registry.src.types import RoutingMode
from registry.src.types import ServerStatus

# Router for the server endpoints; its routes are registered under the "/servers" prefix.
router = APIRouter(prefix="/servers")
# Hook fastapi_pagination into this router (presumably needed by the paginated
# listing endpoint below; confirm against the fastapi_pagination docs).
add_pagination(router)


@router.post(
    "/",
    response_model=ServerWithTools,
    status_code=status_codes.HTTP_201_CREATED,
)
async def create(
    data: ServerCreate,
    service: ServerService = Depends(ServerService.get_new_instance),
) -> Server:
    # Creation is delegated to the service; the response is serialized as
    # ServerWithTools with status 201.
    created_server = await service.create(data)
    return created_server


@router.get("/search", response_model=list[ServerWithTools])
async def search(
    query: str,
    routing_mode: RoutingMode = RoutingMode.auto,
    service: ServerService = Depends(ServerService.get_new_instance),
    search_service: SearchService = Depends(SearchService.get_new_instance),
) -> list[Server]:
    if routing_mode == RoutingMode.auto:
        servers = await service.get_search_servers()
        formatted_servers = [SearchServer.model_validate(server) for server in servers]
        server_urls = await search_service.get_fitting_servers(
            servers=formatted_servers, query=query
        )
        return await service.get_servers_by_urls(server_urls=server_urls)
    elif routing_mode == RoutingMode.deepresearch:
        return await service.get_deep_research()
    raise InvalidData(f"Unsupported {routing_mode=}", routing_mode=routing_mode)


@router.get("/batch", response_model=list[ServerWithTools])
async def get_servers_by_ids(
    ids: list[int] = Query(...),
    service: ServerService = Depends(ServerService.get_new_instance),
) -> list[Server]:
    # `ids` is a required, repeatable query parameter.
    servers = await service.get_servers_by_ids(ids=ids)
    return servers


@router.get("/.well-known/mcp.json", response_model=MCPJson)
async def get_mcp_json(
    service: ServerService = Depends(ServerService.get_new_instance),
) -> MCPJson:
    # The document is built entirely by the service.
    mcp_json = await service.get_mcp_json()
    return mcp_json


@router.delete("/{id}")
async def delete_server(
    id: int,
    current_user_id: str,
    service: ServerService = Depends(ServerService.get_new_instance),
) -> None:
    # `current_user_id` arrives as a query parameter and is forwarded to the
    # service together with the server id.
    outcome = await service.delete_server(id=id, current_user_id=current_user_id)
    return outcome


@router.get("/", response_model=Page[ServerWithTools])
async def get_all_servers(
    query: str | None = None,
    status: Literal["any"] | ServerStatus = Query(
        default=ServerStatus.active,
        title="Server status",
        description="Filter servers by status, by default servers with status `active` will be returned",
    ),
    creator_id: str | None = None,
    params: Params = Depends(),
    service: ServerService = Depends(ServerService.get_new_instance),
    session: AsyncSession = Depends(get_session),
) -> Page[Server]:
    # The literal "any" means "no status filter"; only a real ServerStatus
    # value is passed on to the service.
    status_filter: ServerStatus | None = (
        status if isinstance(status, ServerStatus) else None
    )

    # The service returns a Select statement; pagination executes it.
    statement: Select = await service.get_all_servers(
        search_query=query,
        status=status_filter,
        creator_id=creator_id,
    )
    return await paginate(session, statement, params)


@router.get("/{id}", response_model=ServerWithTools)
async def get_server_by_id(
    id: int,
    service: ServerService = Depends(ServerService.get_new_instance),
) -> Server:
    # Lookup is delegated to the service.
    server = await service.get_server_by_id(id=id)
    return server


@router.put("/{id}", response_model=ServerRead)
async def update_server(
    id: int,
    data: ServerUpdate,
    service: ServerService = Depends(ServerService.get_new_instance),
    deployment_service: DeploymentService = Depends(DeploymentService.get_new_instance),
) -> Server:
    # The deployment update runs first; only after it returns is the server
    # record itself updated.
    await deployment_service.update_deployment(server_id=id, data=data)
    return await service.update_server(id=id, data=data)

```

--------------------------------------------------------------------------------
/scripts/darp-router.py:
--------------------------------------------------------------------------------

```python
import argparse
import asyncio
from typing import Any
from typing import List
from typing import Literal
from typing import Union

from mcp import ClientSession
from mcp.client.sse import sse_client
from mcp.types import CallToolResult
from openai.types.chat import ChatCompletionMessage
from openai.types.chat.chat_completion_content_part_text_param import (
    ChatCompletionContentPartTextParam,
)
from pydantic import BaseModel

# Default SSE endpoint of the MCP server this script connects to.
default_mcp_url: str = "http://localhost:4689/sse"
# Name of the MCP tool called with the routing request.
tool_name: str = "routing"


async def main() -> None:
    """Parse the command line, call the routing tool, and print its answer.

    Raises:
        SystemExit: If the tool returns no content, returns content without
            text, or reports an error. Without these checks an empty result
            raised ``IndexError`` and an error result was fed to the JSON
            parser.
    """
    parser = argparse.ArgumentParser(description="DARP Router")
    parser.add_argument("request", type=str, help="Routing request")
    parser.add_argument(
        "--format", choices=["text", "json"], default="text", help="Output format"
    )
    parser.add_argument("--verbose", action="store_true", help="Verbose output")
    args = parser.parse_args()

    response = await request_mcp(
        server_url=default_mcp_url,
        tool_name=tool_name,
        arguments={"request": args.request},
    )
    if not response.content:
        raise SystemExit("Routing tool returned no content")
    # Only text content carries a `text` attribute.
    text = getattr(response.content[0], "text", None)
    if text is None:
        raise SystemExit("Routing tool returned non-text content")
    if response.isError:
        raise SystemExit(f"Routing tool failed: {text}")
    display_response(text, format=args.format, verbose=args.verbose)


async def request_mcp(
    server_url: str = default_mcp_url,
    tool_name: str = tool_name,
    arguments: dict[str, Any] | None = None,
) -> CallToolResult:
    """Open an SSE session to the MCP server and call one tool.

    Args:
        server_url: SSE endpoint of the MCP server.
        tool_name: Name of the tool to call.
        arguments: Tool arguments; an empty dict is sent when omitted.

    Returns:
        The result of the tool call.
    """
    tool_arguments = arguments or {}
    async with (
        sse_client(server_url) as (read, write),
        ClientSession(read, write) as session,
    ):
        await session.initialize()
        return await session.call_tool(tool_name, arguments=tool_arguments)


def display_response(response: str, format: str = "text", verbose: bool = False):
    """Print the routing response, either raw or as a rendered conversation.

    Args:
        response: JSON text returned by the routing tool.
        format: ``"json"`` prints ``response`` unchanged; any other value
            parses it and prints one rendered message per line.
        verbose: Forwarded to each message's ``__str__``.
    """
    if format == "json":
        print(response)
        return
    parsed = RoutingResponse.model_validate_json(response)
    for message in parsed.conversation:
        print(message.__str__(verbose))


class PrintableChatCompletionMessage(ChatCompletionMessage):
    # Chat completion message that can render itself as "<role>: <parts>".
    # Each part comes from a `_<key>_content` method found by name.

    def __str__(self, verbose: bool = False) -> str:
        """Render the message; non-verbose mode shortens all parts except content."""
        sections = (
            self._format_content("refusal", verbose),
            self._format_content("function_call", verbose),
            self._format_content("tool_calls", verbose),
            # The textual content is always shown in full.
            self._format_content("content", verbose=True),
        )
        contents = "\n".join(section for section in sections if section)
        return f"{self.role}: {contents}"

    def _format_content(self, key: str, verbose: bool = False) -> str | None:
        """Return the rendered part for ``key``, a ``[key]`` marker, or ``None``."""
        renderer = getattr(self, f"_{key}_content")
        value = renderer()
        if value is None:
            return None
        return f"{value}" if verbose else f"[{key}]"

    def _refusal_content(self) -> str | None:
        """Return the refusal text indented on a new line, if there is one."""
        if self.refusal is None:
            return None
        return "\n" + indent(self.refusal)

    def _function_call_content(self) -> str | None:
        """Return the function call as ``name(arguments)``, if there is one."""
        call = self.function_call
        if call is None:
            return None
        return f"{call.name}({call.arguments})"

    def _tool_calls_content(self) -> str | None:
        """Return one indented ``name(arguments)`` line per tool call, if any."""
        if self.tool_calls is None:
            return None
        rendered_calls = [
            f"{call.function.name}({call.function.arguments})"
            for call in self.tool_calls
        ]
        return "\n" + indent("\n".join(rendered_calls))

    def _content_content(self) -> str | None:
        """Return the message content indented on a new line, if there is one."""
        if self.content is None:
            return None
        return "\n" + indent(self.content)


class PrintableChatCompletionToolMessageParam(BaseModel):
    # Tool-result message of the conversation, printable like the other messages.
    role: Literal["tool"]
    content: Union[str, List[ChatCompletionContentPartTextParam]]
    tool_call_id: str

    def __str__(self, verbose: bool = False) -> str:
        """Render the tool message; non-verbose mode prints only a placeholder.

        Raises:
            ValueError: If ``content`` is neither a string nor a list.
        """
        if not verbose:
            return f"[{self.role}] ..."
        if isinstance(self.content, str):
            return f"{self.role}: {self.content}"
        if not isinstance(self.content, list):
            raise ValueError(f"Invalid content type: {type(self.content)}")
        # Each list item is expanded as keyword arguments for the template.
        rendered_parts = ["{type}: {text}".format(**item) for item in self.content]
        contents = "\n".join(rendered_parts)
        return indent(f"{self.role}:\n{indent(contents)}")


class RoutingResponse(BaseModel):
    # Parsed form of the routing tool's JSON answer: the ordered messages of
    # the conversation, each either a chat completion message or a tool message.
    conversation: list[
        PrintableChatCompletionMessage | PrintableChatCompletionToolMessageParam
    ]


def indent(text: str, indent: int = 1, prefix: str = "  "):
    """Prefix every line of ``text`` with ``prefix`` repeated ``indent`` times.

    Lines are split on ``"\\n"`` only, so an empty string still yields one
    (prefixed) line and a trailing newline yields a trailing prefixed line.
    """
    padding = prefix * indent
    return "\n".join([padding + line for line in text.split("\n")])


# Script entry point: run the async CLI to completion.
if __name__ == "__main__":
    asyncio.run(main())

```

--------------------------------------------------------------------------------
/worker/src/deployment_service.py:
--------------------------------------------------------------------------------

```python
import asyncio
import os
import tempfile
from logging import Logger

from .docker_service import DockerService
from .user_logger import UserLogger
from common.notifications.client import NotificationsClient
from registry.src.database import Server
from registry.src.errors import InvalidData
from registry.src.servers.repository import ServerRepository
from registry.src.servers.schemas import ServerUpdateData
from registry.src.servers.schemas import Tool
from registry.src.servers.service import ServerService
from registry.src.settings import settings
from registry.src.types import ServerStatus
from registry.src.types import ServerTransportProtocol
from worker.src.errors import ProcessingError


class WorkerDeploymentService:
    """Deploys a server from its repository and records the outcome.

    A deployment clones the repository, builds, pushes and starts the image via
    ``DockerService``, then polls the started server for its tools. The server
    record is updated and the creator notified on both success and failure.
    """

    def __init__(self, repo: ServerRepository, logger: Logger):
        """Store collaborators and prepare the environment for docker commands."""
        self.repo = repo
        self.logger = logger
        # Copy of the current environment with DOCKER_HOST set to the
        # configured deployment server.
        self.docker_env = {**os.environ, "DOCKER_HOST": settings.deployment_server}
        self.notifications_client = NotificationsClient()

    async def deploy_server(self, server_id: int) -> None:
        """Deploy the server with the given id and persist the result.

        Raises:
            ProcessingError: If the server does not exist. Any exception raised
                during deployment is re-raised after the failure is recorded.
        """
        server = await self.repo.get_server(id=server_id)
        if not server:
            raise ProcessingError
        with tempfile.TemporaryDirectory(
            prefix=f"server_{server.id}", dir="/tmp"
        ) as repo_folder:
            user_logger = UserLogger(session=self.repo.session, server_id=server_id)
            await user_logger.clear_logs()
            docker_service = DockerService(
                docker_env=self.docker_env,
                repo_folder=repo_folder,
                server=server,
                user_logger=user_logger,
            )
            # The started container is addressed by its container name.
            server_url = f"http://{docker_service.container_name}/sse"
            try:
                await self._deploy_repo(service=docker_service)
                tools = await self._get_tools(
                    server=server, service=docker_service, server_url=server_url
                )
            except Exception:
                await self._handle_deploy_failure(server)
                raise
        # Success is reported only after the temporary clone has been removed.
        await user_logger.info("Deployment complete.")
        await self._handle_deploy_success(
            tools=tools,
            server=server,
            url=server_url,
            transport_protocol=ServerTransportProtocol.SSE,
        )

    async def _deploy_repo(self, service: DockerService) -> None:
        """Clone, build, push and run; generate a Dockerfile when none exists."""
        await service.clone_repo()
        needs_dockerfile = not service.dockerfile_exists()
        if needs_dockerfile:
            self.logger.info(f"No Dockerfile in repo: {service.server.repo_url}")
            await self._ensure_command(service)
            await service.user_logger.info(
                message="No Dockerfile found in the project. Attempting to generate..."
            )
            await service.generate_dockerfile()
        await service.build_image()
        await service.push_image()
        await service.run_container()

    async def _handle_deploy_failure(self, server: Server) -> None:
        """Store ``processing_failed`` and notify the creator about the failure."""
        await self.repo.update_server(
            id=server.id,
            data=ServerUpdateData(),
            status=ServerStatus.processing_failed,
        )
        await self.notifications_client.notify_deploy_failed(
            user_id=server.creator_id,
            server_name=server.name,
        )

    async def _handle_deploy_success(
        self,
        tools: list[Tool],
        server: Server,
        url: str,
        transport_protocol: ServerTransportProtocol,
    ) -> None:
        """Store tools, URL, ``active`` status and protocol, then notify the creator."""
        await self.repo.update_server(
            id=server.id,
            tools=tools,
            data=ServerUpdateData(url=url),
            status=ServerStatus.active,
            transport_protocol=transport_protocol,
        )
        await self.notifications_client.notify_deploy_successful(
            user_id=server.creator_id,
            server_name=server.name,
        )

    async def _get_tools(
        self, server: Server, service: DockerService, server_url: str
    ) -> list[Tool]:
        """Poll the server for tools up to ``settings.tool_retry_count`` times.

        Raises:
            ProcessingError: If no attempt returned a non-empty tool list.
        """
        for _ in range(settings.tool_retry_count):
            if tools := await self._fetch_tools(
                server=server, server_url=server_url, service=service
            ):
                return tools
        await service.user_logger.error("Error getting tools from server")
        raise ProcessingError

    async def _fetch_tools(
        self, server: Server, server_url: str, service: DockerService
    ) -> list[Tool] | None:
        """Wait for the configured interval, then try once to list the tools.

        Returns:
            The tool list, or ``None`` when ``InvalidData`` was raised.
        """
        try:
            await service.user_logger.info(
                f"Waiting {settings.tool_wait_interval} seconds for server to start up"
            )
            await asyncio.sleep(settings.tool_wait_interval)
            fetched = await ServerService.get_tools(
                server_url=server_url, server_name=server.name
            )
            await service.user_logger.info(
                "Successfully got a list of tools from server"
            )
        except InvalidData:
            await service.user_logger.warning("Failed to connect to server")
            return None
        return fetched

    @staticmethod
    async def _ensure_command(service: DockerService):
        """Fail when the server has no command configured.

        Raises:
            ProcessingError: If ``service.server.command`` is empty.
        """
        if service.server.command:
            return
        await service.user_logger.error(
            "Command must not be empty if project does not have a Dockerfile"
        )
        raise ProcessingError

```

--------------------------------------------------------------------------------
/registry/src/deployments/service.py:
--------------------------------------------------------------------------------

```python
import re
from typing import Self

from fastapi import Depends

from ..settings import settings
from ..types import HostType
from ..types import TaskType
from .logs_repository import LogsRepository
from .schemas import DeploymentCreate
from registry.src.database import Server
from registry.src.database import ServerLogs
from registry.src.errors import InvalidData
from registry.src.errors import InvalidServerNameError
from registry.src.errors import NotAllowedError
from registry.src.errors import ServerAlreadyExistsError
from registry.src.errors import ServersNotFoundError
from registry.src.producer import get_producer
from registry.src.servers.repository import ServerRepository
from registry.src.servers.schemas import ServerRead
from registry.src.servers.schemas import ServerUpdate
from registry.src.servers.service import ID_REGEX_PATTERN
from registry.src.types import ServerStatus
from worker.src.schemas import Task


class DeploymentService:
    """Operations on servers whose deployment is managed through the registry.

    State is persisted through ``ServerRepository`` and ``LogsRepository``;
    the actual deploy/start/stop work is requested by publishing ``Task``
    messages to ``settings.worker_topic``.
    """

    def __init__(self, repo: ServerRepository, logs_repo: LogsRepository) -> None:
        self.repo = repo
        self.logs_repo = logs_repo

    async def get_server(self, server_id: int, creator_id: str | None = None) -> Server:
        """Return the server with ``server_id``.

        When ``creator_id`` is truthy, the server's creator must match it.

        Raises:
            ServersNotFoundError: no server has this id.
            NotAllowedError: ``creator_id`` is given and is not the creator.
        """
        await self._assure_server_found(server_id)
        server = await self.repo.get_server(server_id)
        if creator_id:
            await self._assure_server_creator(server=server, user_id=creator_id)
        return server

    async def get_logs(self, server_id: int, current_user_id: str) -> ServerLogs:
        """Return the stored logs of an internally hosted server.

        Raises:
            ServersNotFoundError: no server has this id.
            NotAllowedError: ``current_user_id`` is not the server creator.
            InvalidData: the server's host type is not ``HostType.internal``.
        """
        server = await self.get_server(server_id=server_id, creator_id=current_user_id)
        if server.host_type != HostType.internal:
            raise InvalidData("Logs are only available for internal servers")
        logs = await self.logs_repo.get_server_logs(server_id=server_id)
        # Invariant: ``create_server`` creates a logs row together with the server.
        assert logs
        return logs

    async def create_server(self, data: DeploymentCreate) -> Server:
        """Create a server record plus its logs row and request a deploy.

        The session is committed before the deploy task is published.

        Raises:
            InvalidServerNameError: the name is not a valid identifier.
            ServerAlreadyExistsError: a server with the same name or
                repository URL already exists.
        """
        self._assure_server_name_is_valid_id(data.data.name)
        await self._assure_server_not_exists(
            name=data.data.name, repo_url=str(data.data.repo_url)
        )
        server = await self.repo.create_server(
            data=data.data, creator_id=data.current_user_id, tools=[]
        )
        await self.logs_repo.create_logs(server_id=server.id)
        await self.repo.session.commit()
        await self._send_task(server_id=server.id, task_type=TaskType.deploy)
        return server

    async def update_deployment(self, server_id: int, data: ServerUpdate) -> Server:
        """Apply deployment settings and request a redeploy.

        If ``data.deployment_data`` carries no non-``None`` field the server
        is returned unchanged and no task is sent. Otherwise the server is
        updated, marked ``in_processing``, committed, and a deploy task is
        published.

        Raises:
            ServersNotFoundError: no server has this id.
            NotAllowedError: ``data.current_user_id`` is not the creator.
        """
        server = await self.get_server(
            server_id=server_id, creator_id=data.current_user_id
        )
        # Dump once; ``update_server`` ignores ``None`` values anyway, so
        # passing only the fields that are set is equivalent.
        changes = data.deployment_data.model_dump(exclude_none=True)
        if not changes:
            return server
        server = await self.repo.update_server(
            id=server_id,
            **changes,
            status=ServerStatus.in_processing,
        )
        await self.repo.session.commit()
        await self._send_task(server_id=server.id, task_type=TaskType.deploy)
        return server

    async def stop_server(self, server_id: int, current_user_id: str) -> None:
        """Request a stop of an active server; no-op if it is already stopped.

        Raises:
            ServersNotFoundError: no server has this id.
            NotAllowedError: ``current_user_id`` is not the server creator.
            InvalidData: the server is neither active nor stopped.
        """
        server = await self.get_server(server_id=server_id, creator_id=current_user_id)
        if server.status == ServerStatus.stopped:
            return
        if server.status != ServerStatus.active:
            raise InvalidData("Server is not active", server_status=server.status)
        await self._send_task(server_id=server_id, task_type=TaskType.stop)

    async def start_server(self, server_id: int, current_user_id: str) -> None:
        """Request a start of a stopped server; no-op if it is already active.

        Raises:
            ServersNotFoundError: no server has this id.
            NotAllowedError: ``current_user_id`` is not the server creator.
            InvalidData: the server is neither stopped nor active.
        """
        server = await self.get_server(server_id=server_id, creator_id=current_user_id)
        if server.status == ServerStatus.active:
            return
        if server.status != ServerStatus.stopped:
            raise InvalidData("Invalid server state", server_status=server.status)
        await self._send_task(server_id=server_id, task_type=TaskType.start)

    @staticmethod
    async def _send_task(server_id: int, task_type: TaskType) -> None:
        """Publish a ``Task`` for ``server_id`` to ``settings.worker_topic``."""
        producer = await get_producer()
        await producer.start()
        try:
            await producer.send_and_wait(
                topic=settings.worker_topic,
                value=Task(task_type=task_type, server_id=server_id),
            )
        finally:
            # Always release the started producer, even when sending fails.
            await producer.stop()

    async def _assure_server_found(self, server_id: int) -> None:
        """Raise ``ServersNotFoundError`` unless a server with this id exists."""
        if not await self.repo.find_servers(id=server_id):
            raise ServersNotFoundError([server_id])

    @staticmethod
    async def _assure_server_creator(server: Server, user_id: str) -> None:
        """Raise ``NotAllowedError`` unless ``user_id`` created ``server``."""
        if server.creator_id != user_id:
            raise NotAllowedError(
                "Must be server creator to use this function",
                creator_id=server.creator_id,
                current_user_id=user_id,
            )

    async def _assure_server_not_exists(
        self,
        name: str | None = None,
        repo_url: str | None = None,
        ignore_id: int | None = None,
    ) -> None:
        """Raise ``ServerAlreadyExistsError`` if a matching server exists.

        The error carries the conflicting servers serialized via
        ``ServerRead``.
        """
        if servers := await self.repo.find_servers(
            name=name, repo_url=repo_url, ignore_id=ignore_id
        ):
            dict_servers = [
                ServerRead.model_validate(server).model_dump() for server in servers
            ]
            raise ServerAlreadyExistsError(dict_servers)

    @staticmethod
    def _assure_server_name_is_valid_id(name: str) -> None:
        """Raise ``InvalidServerNameError`` unless ``name`` fully matches
        ``ID_REGEX_PATTERN``.
        """
        # ``fullmatch`` rather than ``match``: the whole name must be valid,
        # not merely a prefix of it (e.g. "abc-def" must be rejected).
        if not re.fullmatch(ID_REGEX_PATTERN, name):
            raise InvalidServerNameError(name=name)

    @classmethod
    def get_new_instance(
        cls,
        repo: ServerRepository = Depends(ServerRepository.get_new_instance),
        logs_repo: LogsRepository = Depends(LogsRepository.get_new_instance),
    ) -> Self:
        """FastAPI dependency factory that wires in both repositories."""
        return cls(repo=repo, logs_repo=logs_repo)

```

--------------------------------------------------------------------------------
/registry/src/servers/repository.py:
--------------------------------------------------------------------------------

```python
from fastapi import Depends
from sqlalchemy import delete
from sqlalchemy import func
from sqlalchemy import or_
from sqlalchemy import Select
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload

from .schemas import ServerCreateData
from .schemas import ServerUpdateData
from .schemas import Tool
from registry.src.database import get_session
from registry.src.database import Server
from registry.src.database import Tool as DBTool
from registry.src.deployments.schemas import DeploymentCreateData
from registry.src.errors import ServersNotFoundError
from registry.src.types import ServerStatus
from registry.src.types import ServerTransportProtocol
from worker.src.constants import BaseImage


class ServerRepository:
    """Database access for ``Server`` rows and their tools.

    Methods flush but never commit; committing the session is left to the
    caller.
    """

    def __init__(self, session: AsyncSession) -> None:
        self.session = session

    async def find_servers(
        self,
        id: int | None = None,
        name: str | None = None,
        url: str | None = None,
        ignore_id: int | None = None,
        repo_url: str | None = None,
    ) -> list[Server]:
        """Return servers matching ANY of the supplied filters.

        ``name`` is compared case-insensitively. ``ignore_id``, when given,
        excludes the server with that id from the result.

        Raises:
            ValueError: none of ``id``, ``name``, ``url`` is provided, or
                ``id`` equals ``ignore_id``.
        """
        if id is None and name is None and url is None:
            raise ValueError("At least one of 'id', 'name', or 'url' must be provided.")
        if id == ignore_id and id is not None:
            raise ValueError("id and ignore_id cannot be the same.")

        # Only add predicates for filters that were actually supplied, rather
        # than emitting always-false comparisons against NULL.
        conditions = []
        if id is not None:
            conditions.append(Server.id == id)
        if name is not None:
            conditions.append(func.lower(Server.name) == func.lower(name))
        if url:
            conditions.append(Server.url == url)
        if repo_url:
            conditions.append(Server.repo_url == repo_url)
        if not conditions:
            # Reachable only with an empty ``url`` and no other usable filter:
            # nothing can match.
            return []

        query = (
            select(Server)
            .options(selectinload(Server.tools))
            .filter(or_(*conditions))
        )
        if ignore_id is not None:
            query = query.filter(Server.id != ignore_id)

        result = await self.session.execute(query)
        return list(result.scalars().all())

    async def get_server(self, id: int) -> Server:
        """Return the server with this id, tools eagerly loaded.

        Raises:
            ServersNotFoundError: no such server exists.
        """
        query = select(Server).filter(Server.id == id)
        query = query.options(selectinload(Server.tools))
        result = await self.session.execute(query)
        server = result.scalars().first()
        if server is None:
            raise ServersNotFoundError([id])
        return server

    async def create_server(
        self,
        data: ServerCreateData | DeploymentCreateData,
        tools: list[Tool],
        creator_id: str | None,
        transport_protocol: ServerTransportProtocol | None = None,
    ) -> Server:
        """Insert a new server and return it re-read with its tools loaded.

        ``tools`` are only attached when ``data`` is a ``ServerCreateData``
        (the only variant from which a URL is read here); for deployment
        data the server starts without tools.
        """
        db_tools = []
        if isinstance(data, ServerCreateData):
            db_tools = self._convert_tools(tools, data.url)
        server = Server(
            **data.model_dump(exclude_none=True),
            tools=db_tools,
            creator_id=creator_id,
            transport_protocol=transport_protocol,
        )
        self.session.add(server)
        await self.session.flush()
        await self.session.refresh(server)
        return await self.get_server(server.id)

    async def get_all_servers(
        self,
        search_query: str | None = None,
        status: ServerStatus | None = None,
        creator_id: str | None = None,
    ) -> Select:
        """Build (but do not execute) a query for servers.

        Optional filters: exact ``status``, exact ``creator_id``, and a
        case-insensitive substring ``search_query`` matched against name,
        description or title.
        """
        query = select(Server).options(selectinload(Server.tools))

        if status is not None:
            query = query.where(Server.status == status)

        if creator_id:
            query = query.where(Server.creator_id == creator_id)

        if search_query:
            query_string = f"%{search_query}%"
            query = query.where(
                or_(
                    Server.name.ilike(query_string),
                    Server.description.ilike(query_string),
                    Server.title.ilike(query_string),
                )
            )
        return query

    async def get_servers_by_urls(self, urls: list[str]) -> list[Server]:
        """Return the servers whose URL is in ``urls`` (tools not eager-loaded)."""
        query = select(Server).where(Server.url.in_(urls))
        result = await self.session.execute(query)
        return list(result.scalars().all())

    async def get_servers_by_ids(self, ids: list[int]) -> list[Server]:
        """Return the servers whose id is in ``ids``, tools eagerly loaded."""
        query = (
            select(Server).where(Server.id.in_(ids)).options(selectinload(Server.tools))
        )
        result = await self.session.execute(query)
        return list(result.scalars().all())

    async def delete_server(self, id: int) -> None:
        """Delete the server with this id; does nothing if it does not exist."""
        query = delete(Server).where(Server.id == id)
        await self.session.execute(query)

    async def update_server(
        self,
        id: int,
        data: ServerUpdateData | None = None,
        tools: list[Tool] | None = None,
        status: ServerStatus | None = None,
        repo_url: str | None = None,
        command: str | None = None,
        base_image: BaseImage | None = None,
        build_instructions: str | None = None,
        transport_protocol: ServerTransportProtocol | None = None,
    ) -> Server:
        """Update a server in place and return the refreshed row.

        ``None`` arguments are left untouched. Non-``None`` fields of
        ``data`` take precedence over the individual keyword arguments.
        When ``tools`` is not ``None`` (an empty list counts), the server's
        existing tools are deleted and replaced by ``tools``.

        Raises:
            ServersNotFoundError: no server has this id.
        """
        server = await self.get_server(id)
        update_values = dict(
            status=status,
            repo_url=repo_url,
            command=command,
            transport_protocol=transport_protocol,
            base_image=base_image,
            build_instructions=build_instructions,
        )
        update_values = {
            key: value for key, value in update_values.items() if value is not None
        }
        if data:
            update_values.update(data.model_dump(exclude_none=True))
        for key, value in update_values.items():
            setattr(server, key, value)

        if tools is not None:
            query = delete(DBTool).where(DBTool.server_url == server.url)
            await self.session.execute(query)
            await self.session.flush()
            # Reload so ``server.tools`` reflects the bulk delete before the
            # replacement tools are appended.
            await self.session.refresh(server)
            server.tools.extend(self._convert_tools(tools, server.url))

        await self.session.flush()
        await self.session.refresh(server)
        return server

    def _convert_tools(self, tools: list[Tool], url: str) -> list[DBTool]:
        """Convert tool schemas into ``DBTool`` rows bound to ``url``."""
        return [
            DBTool(**tool.model_dump(exclude_none=True), server_url=url)
            for tool in tools
        ]

    @classmethod
    def get_new_instance(
        cls, session: AsyncSession = Depends(get_session)
    ) -> "ServerRepository":
        """FastAPI dependency factory bound to the request's session."""
        return cls(session)

```

--------------------------------------------------------------------------------
/docker-compose.yaml:
--------------------------------------------------------------------------------

```yaml
services:

  # Registry API service, built from registry/Dockerfile.
  # Only started when the "main" profile is enabled; waits for
  # registry_postgres to report healthy.
  registry:
    build:
      context: .
      dockerfile: registry/Dockerfile
    environment:
      UVICORN_PORT: ${UVICORN_PORT:-80}
      POSTGRES_HOST: ${POSTGRES_HOST:-registry_postgres}
      POSTGRES_USER: ${POSTGRES_USER:-change_me}
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD:-change_me}
      POSTGRES_DB: ${POSTGRES_DB:-registry}
      # NOTE(review): defaults to empty here, whereas registry_deploy_worker and
      # registry_healthchecker default to 5432 - confirm the app handles an empty value.
      POSTGRES_PORT: ${POSTGRES_PORT:-}
      OPENAI_API_KEY: ${OPENAI_API_KEY}
      OPENAI_BASE_URL: ${OPENAI_BASE_URL:-}
      LLM_PROXY: ${LLM_PROXY:-}
      LLM_MODEL: ${LLM_MODEL:-}
      HEALTHCHECK_RUNNING_INTERVAL: ${HEALTHCHECK_RUNNING_INTERVAL:-}
      DEEP_RESEARCH_SERVER_NAME: ${DEEP_RESEARCH_SERVER_NAME:-}
      TOOL_RETRY_COUNT: ${TOOL_RETRY_COUNT:-}
      TOOL_WAIT_INTERVAL: ${TOOL_WAIT_INTERVAL:-}
    image: ${REGISTRY_IMAGE:-registry}:${CI_COMMIT_BRANCH:-local}
    container_name: registry
    pull_policy: always
    restart: always
    depends_on:
      registry_postgres:
        condition: service_healthy
        restart: true
    profiles:
      - main
    volumes:
      - /var/lib/hipasus/registry/api/logs:/workspace/logs
    # Probes the API's /healthcheck endpoint on the same port uvicorn is given.
    healthcheck:
      test: curl -f http://localhost:${UVICORN_PORT:-80}/healthcheck
      interval: 15s
      timeout: 5s
      retries: 3
      start_period: 15s
    command: start.sh

  # MCP server in front of the registry, built from mcp_server/Dockerfile.
  # Starts only after the registry service is healthy.
  registry_mcp_server:
    build:
      context: .
      dockerfile: mcp_server/Dockerfile
    # List form without values: each variable is passed through from the
    # environment Compose itself runs with (no defaults are applied here).
    environment:
      - MCP_PORT
      - LOG_LEVEL
      - REGISTRY_URL
      - OPENAI_API_KEY
      - OPENAI_BASE_URL
      - LLM_PROXY
      - LLM_MODEL
    image: ${MCP_REGISTRY_IMAGE:-registry_mcp}:${CI_COMMIT_BRANCH:-local}
    container_name: registry_mcp_server
    pull_policy: always
    restart: always
    depends_on:
      registry:
        condition: service_healthy
        restart: true
    # Health is determined by the project's own mcp_health_check script.
    healthcheck:
      test: python3 -m mcp_server.scripts.mcp_health_check
      interval: 15s
      timeout: 5s
      retries: 3
      start_period: 15s
    profiles:
      - main
    volumes:
      - /var/lib/hipasus/registry/mcp_server/logs:/workspace/logs
    command: start.sh

  # Deployment worker, built from worker/Dockerfile.
  # Requires both registry_postgres and portal_kafka to be healthy.
  registry_deploy_worker:
    container_name: registry_deploy_worker
    image: ${WORKER_IMAGE_NAME:-registry_worker}:${CI_COMMIT_BRANCH:-local}
    build:
      context: .
      dockerfile: worker/Dockerfile
    pull_policy: always
    environment:
      LOG_LEVEL: ${LOG_LEVEL}
      POSTGRES_HOST: ${POSTGRES_HOST:-registry_postgres}
      POSTGRES_USER: ${POSTGRES_USER:-change_me}
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD:-change_me}
      POSTGRES_DB: ${POSTGRES_DB:-registry}
      POSTGRES_PORT: ${POSTGRES_PORT:-5432}
      OPENAI_API_KEY: ${OPENAI_API_KEY}
      OPENAI_BASE_URL: ${OPENAI_BASE_URL:-}
      LLM_PROXY: ${LLM_PROXY:-}
      LLM_MODEL: ${LLM_MODEL:-}
      DOCKERHUB_LOGIN: ${DOCKERHUB_LOGIN}
      DOCKERHUB_PASSWORD: ${DOCKERHUB_PASSWORD}
      # Same variable (and default) that names the default network at the
      # bottom of this file.
      DEPLOYMENT_NETWORK: ${DOCKER_NETWORK:-portal_network}
      HEALTHCHECK_RUNNING_INTERVAL: ${HEALTHCHECK_RUNNING_INTERVAL:-}
      DEEP_RESEARCH_SERVER_NAME: ${DEEP_RESEARCH_SERVER_NAME:-}
      TOOL_RETRY_COUNT: ${TOOL_RETRY_COUNT:-}
      TOOL_WAIT_INTERVAL: ${TOOL_WAIT_INTERVAL:-}
    profiles:
      - main
    depends_on:
      registry_postgres:
        condition: service_healthy
        restart: true
      portal_kafka:
        condition: service_healthy
        restart: true
    volumes:
      - /var/lib/hipasus/registry/worker/logs:/workspace/logs
      # Host Docker socket: gives this container control of the host's Docker daemon.
      - /var/run/docker.sock:/var/run/docker.sock
    restart: always
    command: worker-start.sh

  # Single-node ZooKeeper used by portal_kafka.
  portal_zookeeper:
    image: confluentinc/cp-zookeeper:7.3.2
    container_name: portal_zookeeper
    # Client port is published on the host.
    ports:
      - "2181:2181"
    profiles:
      - main
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_SERVERS: portal_zookeeper:2888:3888
    # Healthy once the client port accepts TCP connections.
    healthcheck:
      test: nc -z localhost 2181 || exit -1
      interval: 30s
      timeout: 20s
      retries: 5
    restart: always


  # Single-broker Kafka; starts after portal_zookeeper is healthy.
  # No ports are published to the host by this service.
  portal_kafka:
    image: confluentinc/cp-kafka:7.3.2
    container_name: portal_kafka
    profiles:
      - main
    environment:
      # Two plaintext listeners are advertised: INTERNAL under the container
      # name (also used for inter-broker traffic) and DOCKER on loopback.
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://portal_kafka:19092,DOCKER://127.0.0.1:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "portal_zookeeper:2181"
      KAFKA_BROKER_ID: 1
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
      # Replication settings of 1 match the single-broker setup.
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_JMX_PORT: 9999
      KAFKA_JMX_HOSTNAME: ${DOCKER_HOST_IP:-127.0.0.1}
      KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
      KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
    # Healthy once the broker answers a cluster-id request on the loopback listener.
    healthcheck:
      test: kafka-cluster cluster-id --bootstrap-server 127.0.0.1:29092 || exit 1
      interval: 30s
      timeout: 20s
      retries: 5
    depends_on:
      portal_zookeeper:
        condition: service_healthy
        restart: true
    restart: always

  # PostgreSQL database for the registry; data lives in a host bind mount.
  registry_postgres:
    container_name: registry_postgres
    image: postgres:17
    environment:
      # Placeholder defaults ("change_me") - override via the environment.
      POSTGRES_USER: ${POSTGRES_USER:-change_me}
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD:-change_me}
      POSTGRES_DB: ${POSTGRES_DB:-registry}
    volumes:
      - /var/lib/hipasus/registry/pg_data:/var/lib/postgresql/data
    # Starts the server with max_connections set to 500.
    command: postgres -c 'max_connections=500'
    profiles:
      - main
    # Uses the same user/database defaults as the environment block above.
    healthcheck:
      test: pg_isready -U ${POSTGRES_USER:-change_me} -d ${POSTGRES_DB:-registry}
      interval: 7s
      timeout: 5s
      retries: 5
      start_period: 5s
    restart: always

  # Healthchecker service, built from healthchecker/Dockerfile.
  # Starts only after the registry service is healthy.
  registry_healthchecker:
    container_name: registry_healthchecker
    image: ${HEALTHCHECKER_IMAGE_NAME:-registry_healthchecker}:${CI_COMMIT_BRANCH:-local}
    build:
      context: .
      dockerfile: healthchecker/Dockerfile
    pull_policy: always
    # Same environment as registry_deploy_worker.
    environment:
      LOG_LEVEL: ${LOG_LEVEL}
      POSTGRES_HOST: ${POSTGRES_HOST:-registry_postgres}
      POSTGRES_USER: ${POSTGRES_USER:-change_me}
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD:-change_me}
      POSTGRES_DB: ${POSTGRES_DB:-registry}
      POSTGRES_PORT: ${POSTGRES_PORT:-5432}
      OPENAI_API_KEY: ${OPENAI_API_KEY}
      OPENAI_BASE_URL: ${OPENAI_BASE_URL:-}
      LLM_PROXY: ${LLM_PROXY:-}
      LLM_MODEL: ${LLM_MODEL:-}
      DOCKERHUB_LOGIN: ${DOCKERHUB_LOGIN}
      DOCKERHUB_PASSWORD: ${DOCKERHUB_PASSWORD}
      DEPLOYMENT_NETWORK: ${DOCKER_NETWORK:-portal_network}
      HEALTHCHECK_RUNNING_INTERVAL: ${HEALTHCHECK_RUNNING_INTERVAL:-}
      DEEP_RESEARCH_SERVER_NAME: ${DEEP_RESEARCH_SERVER_NAME:-}
      TOOL_RETRY_COUNT: ${TOOL_RETRY_COUNT:-}
      TOOL_WAIT_INTERVAL: ${TOOL_WAIT_INTERVAL:-}
    profiles:
      - main
    depends_on:
      registry:
        condition: service_healthy
        restart: true
    restart: always
    command: worker-start.sh

# The default network is declared external: Compose will not create it, so a
# network with this name must already exist before the stack is started.
networks:
  default:
    name: ${DOCKER_NETWORK:-portal_network}
    external: true

```

--------------------------------------------------------------------------------
/registry/src/servers/service.py:
--------------------------------------------------------------------------------

```python
import re
from contextlib import _AsyncGeneratorContextManager  # noqa
from typing import Self

from fastapi import Depends
from mcp import ClientSession
from mcp.client.sse import sse_client
from mcp.client.streamable_http import streamablehttp_client
from pydantic import ValidationError
from sqlalchemy import Select

from ..settings import settings
from .repository import ServerRepository
from .schemas import MCP
from .schemas import MCPJson
from .schemas import ServerCreate
from .schemas import ServerRead
from .schemas import ServerUpdate
from .schemas import Tool
from .schemas import tool_name_regex
from registry.src.database import Server
from registry.src.errors import InvalidData
from registry.src.errors import InvalidServerNameError
from registry.src.errors import NotAllowedError
from registry.src.errors import RemoteServerError
from registry.src.errors import ServerAlreadyExistsError
from registry.src.errors import ServersNotFoundError
from registry.src.logger import logger
from registry.src.types import ServerStatus
from registry.src.types import ServerTransportProtocol

# Reserved server name that is accepted even though it is not a plain identifier.
BASE_SERVER_NAME = "base_darp_server#REPLACEME"
# A valid server name is either an identifier (letter/underscore followed by
# letters, digits, underscores) or exactly BASE_SERVER_NAME.
# The alternation is grouped so that BOTH anchors apply to BOTH alternatives;
# ungrouped, "^A|B$" parses as "(^A)|(B$)" and re.match() would accept any
# string that merely starts with an identifier (e.g. "abc-def").
# "\Z" is used instead of "$" so a trailing newline is not tolerated.
ID_REGEX_PATTERN = re.compile(
    rf"^(?:[a-zA-Z_][a-zA-Z0-9_]*|{re.escape(BASE_SERVER_NAME)})\Z"
)


class ServerService:
    """Business logic for registering and querying MCP servers.

    Besides CRUD on top of ``ServerRepository`` it connects to the remote
    MCP server to detect its transport protocol and to list its tools.
    """

    def __init__(self, repo: ServerRepository) -> None:
        self.repo = repo

    async def create(self, data: ServerCreate) -> Server:
        """Register a server after probing it for its transport and tools.

        Raises:
            InvalidServerNameError: the name is not a valid identifier.
            ServerAlreadyExistsError: name or URL is already registered.
            InvalidData: the transport cannot be detected or the tools
                cannot be fetched/validated.
        """
        self._assure_server_name_is_valid_id(data.data.name)
        await self._assure_server_not_exists(name=data.data.name, url=data.data.url)

        server_transport_protocol = await self.detect_transport_protocol(
            server_url=data.data.url
        )

        logger.info(
            "Detected server's transport protocol %s", server_transport_protocol
        )

        tools = await self.get_tools(
            server_url=data.data.url,
            server_name=data.data.name,
            transport=server_transport_protocol,
        )
        return await self.repo.create_server(
            data.data,
            tools,
            creator_id=data.current_user_id,
            transport_protocol=server_transport_protocol,
        )

    @classmethod
    async def get_tools(
        cls,
        server_url: str,
        server_name: str,
        transport: ServerTransportProtocol | None = None,
    ) -> list[Tool]:
        """Fetch the tool list from the MCP server at ``server_url``.

        The transport is auto-detected when not supplied.

        Raises:
            InvalidData: detection failed, the tool names are invalid, or
                any other error occurred while talking to the server (the
                original error is logged and chained as the cause).
        """
        if transport is None:
            transport = await cls.detect_transport_protocol(server_url)

        try:
            return await cls._fetch_tools(
                server_url=server_url, server_name=server_name, transport=transport
            )
        except InvalidData:
            # Already a domain error with a precise message; pass it through.
            raise
        except Exception as e:
            logger.warning(
                f"Error while getting tools from server {server_url}",
                exc_info=e,
            )
            raise InvalidData(
                "Server is unhealthy or URL does not lead to a valid MCP server",
                url=server_url,
            ) from e

    async def delete_server(self, id: int, current_user_id: str) -> None:
        """Delete a server on behalf of its creator.

        Raises:
            ServersNotFoundError: no server has this id.
            NotAllowedError: ``current_user_id`` is not the creator.
        """
        server = await self.get_server_by_id(id=id)
        if server.creator_id != current_user_id:
            raise NotAllowedError("Only server creator can delete it")
        await self.repo.delete_server(id=id)

    async def get_all_servers(
        self,
        status: ServerStatus | None = None,
        search_query: str | None = None,
        creator_id: str | None = None,
    ) -> Select:
        """Return the (unexecuted) filtered servers query from the repository."""
        return await self.repo.get_all_servers(
            search_query=search_query, status=status, creator_id=creator_id
        )

    async def get_server_by_id(self, id: int) -> Server:
        """Return the server with this id.

        Raises:
            ServersNotFoundError: no server has this id.
        """
        await self._assure_server_found(id)
        return await self.repo.get_server(id=id)

    async def get_servers_by_ids(self, ids: list[int]) -> list[Server]:
        """Return the servers for ``ids``; every requested id must exist.

        Duplicate ids in the request are tolerated.

        Raises:
            ServersNotFoundError: lists the ids that were not found.
        """
        servers = await self.repo.get_servers_by_ids(ids=ids)
        # Compare sets rather than lengths: a length mismatch caused only by
        # duplicate ids in the request is not an error.
        missing_server_ids = set(ids) - {server.id for server in servers}
        if missing_server_ids:
            raise ServersNotFoundError(ids=sorted(missing_server_ids))
        return servers

    async def update_server(self, id: int, data: ServerUpdate) -> Server:
        """Update a server and refresh its transport protocol and tools.

        The tools are re-fetched from the new URL if one is supplied,
        otherwise from the server's current URL. If neither exists, the
        tool list is replaced by an empty one.

        Raises:
            ServersNotFoundError: no server has this id.
            NotAllowedError: ``data.current_user_id`` is not the creator.
            ServerAlreadyExistsError: the new name/URL clashes with another
                server.
            InvalidData: the server at the resulting URL cannot be probed.
        """
        server = await self.get_server_by_id(id=id)
        transport_protocol = None
        if server.creator_id != data.current_user_id:
            raise NotAllowedError("Only server creator can update it")
        if data.data.name or data.data.url:
            await self._assure_server_not_exists(
                name=data.data.name, url=data.data.url, ignore_id=id
            )
        updated_server_url = data.data.url or server.url
        if updated_server_url is not None:
            transport_protocol = await self.detect_transport_protocol(
                server_url=updated_server_url
            )
            tools = await self.get_tools(
                server_url=updated_server_url,
                server_name=data.data.name or server.name,
                transport=transport_protocol,
            )
        else:
            tools = []

        return await self.repo.update_server(
            id,
            data.data,
            tools,
            transport_protocol=transport_protocol,
        )

    async def _assure_server_found(self, id: int) -> None:
        """Raise ``ServersNotFoundError`` unless a server with this id exists."""
        if not await self.repo.find_servers(id=id):
            raise ServersNotFoundError([id])

    def _assure_server_name_is_valid_id(self, name: str) -> None:
        """Raise ``InvalidServerNameError`` unless ``name`` fully matches
        ``ID_REGEX_PATTERN``.
        """
        # ``fullmatch`` rather than ``match``: the whole name must be valid,
        # not merely a prefix of it (e.g. "abc-def" must be rejected).
        if not re.fullmatch(ID_REGEX_PATTERN, name):
            raise InvalidServerNameError(name=name)

    async def _assure_server_not_exists(
        self,
        id: int | None = None,
        name: str | None = None,
        url: str | None = None,
        ignore_id: int | None = None,
    ) -> None:
        """Raise ``ServerAlreadyExistsError`` if any matching server exists.

        The error carries the conflicting servers serialized via
        ``ServerRead``.
        """
        if servers := await self.repo.find_servers(
            id=id, name=name, url=url, ignore_id=ignore_id
        ):
            dict_servers = [
                ServerRead.model_validate(server).model_dump() for server in servers
            ]
            raise ServerAlreadyExistsError(dict_servers)

    async def get_search_servers(self) -> list[Server]:
        """Return all servers that have a URL set."""
        servers_query = await self.repo.get_all_servers()
        # ``!= None`` is intentional: SQLAlchemy renders it as ``IS NOT NULL``.
        servers_query = servers_query.where(Server.url != None)  # noqa
        servers = (await self.repo.session.execute(servers_query)).scalars().all()
        return list(servers)

    async def get_servers_by_urls(self, server_urls: list[str]) -> list[Server]:
        """Return the servers registered under ``server_urls``.

        Unknown URLs do not raise; they are only reported in a warning log.
        """
        servers = await self.repo.get_servers_by_urls(urls=server_urls)
        # Compare sets rather than lengths so duplicate URLs in the request
        # do not trigger a spurious warning with an empty set.
        missing_server_urls = set(server_urls) - {server.url for server in servers}
        if missing_server_urls:
            logger.warning(
                f"One or more server urls are incorrect {missing_server_urls=}"
            )
        return servers

    async def get_mcp_json(self) -> MCPJson:
        """Return every registered server wrapped in an ``MCPJson`` document."""
        servers = await self.repo.get_all_servers()
        servers = (await self.repo.session.execute(servers)).scalars().all()
        return MCPJson(servers=[MCP.model_validate(server) for server in servers])

    async def get_deep_research(self) -> list[Server]:
        """Return the deep-research server (as a one-element list).

        The server is looked up by ``settings.deep_research_server_name``.

        Raises:
            RemoteServerError: the server is not registered or not active.
        """
        servers = await self.repo.find_servers(name=settings.deep_research_server_name)
        if not servers:
            raise RemoteServerError(
                f"{settings.deep_research_server_name} MCP server does not exist in registry"
            )
        assert len(servers) == 1, "Invalid state. Multiple deepresearch servers found"
        if servers[0].status != ServerStatus.active:
            raise RemoteServerError(
                f"{settings.deep_research_server_name} MCP server is down."
            )
        return servers

    @classmethod
    def get_new_instance(
        cls, repo: ServerRepository = Depends(ServerRepository.get_new_instance)
    ) -> Self:
        """FastAPI dependency factory that wires in the repository."""
        return cls(repo=repo)

    @classmethod
    async def _fetch_tools(
        cls, server_url: str, server_name: str, transport: ServerTransportProtocol
    ) -> list[Tool]:
        """Open an MCP session over ``transport`` and list the server's tools.

        Each tool gets the alias ``{tool.name}__{server_name}``.

        Raises:
            InvalidData: a tool fails ``Tool`` validation.
        """
        client_ctx = cls._get_client_context(server_url, transport)

        async with client_ctx as (read, write, *_):
            async with ClientSession(read, write) as session:
                await session.initialize()
                tools_response = await session.list_tools()
        # Validation happens after the connection is closed; only the
        # response object is needed from here on.
        try:
            tools = [
                Tool(
                    **tool.model_dump(exclude_none=True),
                    alias=f"{tool.name}__{server_name}",
                )
                for tool in tools_response.tools
            ]
        except ValidationError as e:
            raise InvalidData(
                message=f"Invalid tool names. {{tool_name}}__{{server_name}} must fit {tool_name_regex}"
            ) from e
        return tools

    @classmethod
    def _get_client_context(
        cls,
        server_url: str,
        transport_protocol: ServerTransportProtocol,
    ) -> _AsyncGeneratorContextManager:
        """Return the (not yet entered) MCP client context for the transport.

        Raises:
            RuntimeError: the transport is neither STREAMABLE_HTTP nor SSE.
        """
        if transport_protocol == ServerTransportProtocol.STREAMABLE_HTTP:
            client_ctx = streamablehttp_client(server_url)
        elif transport_protocol == ServerTransportProtocol.SSE:
            client_ctx = sse_client(server_url)
        else:
            # Exceptions do not interpolate logging-style "%s" arguments, so
            # the message is formatted explicitly. ``getattr`` keeps this
            # branch from failing on values that have no ``name``.
            protocol_name = getattr(transport_protocol, "name", transport_protocol)
            raise RuntimeError(f"Unsupported transport protocol: {protocol_name}")

        return client_ctx

    @classmethod
    async def detect_transport_protocol(
        cls,
        server_url: str,
    ) -> ServerTransportProtocol:
        """Return the first transport the server accepts.

        STREAMABLE_HTTP is tried before SSE.

        Raises:
            InvalidData: neither transport could initialize a session.
        """
        for protocol in (
            ServerTransportProtocol.STREAMABLE_HTTP,
            ServerTransportProtocol.SSE,
        ):
            if await cls._is_transport_supported(server_url, protocol):
                return protocol

        raise InvalidData(
            "Can't detect server's transport protocol, maybe server is unhealthy?",
            url=server_url,
        )

    @classmethod
    async def _is_transport_supported(
        cls,
        server_url: str,
        protocol: ServerTransportProtocol,
    ) -> bool:
        """Return whether an MCP session can be initialized over ``protocol``.

        Any exception raised while opening the client is logged and treated
        as "not supported".
        """
        try:
            client_ctx = cls._get_client_context(server_url, protocol)
            async with client_ctx as (read, write, *_):
                return await cls._can_initialize_session(read, write)

        except Exception as e:
            logger.error(
                "Failed to create %s client",
                protocol.name,
                exc_info=e,
            )
            return False

    @staticmethod
    async def _can_initialize_session(read, write) -> bool:
        """Return ``True`` if an MCP ``ClientSession`` initializes on the streams."""
        try:
            async with ClientSession(read, write) as session:
                await session.initialize()
            return True
        except Exception:
            # Best-effort probe: any failure simply means "cannot initialize".
            return False

```