# Directory Structure ``` ├── .gitignore ├── .gitlab-ci.yml ├── .pre-commit-config.yaml ├── alembic │ ├── env.py │ ├── README │ ├── script.py.mako │ └── versions │ ├── 2025_03_24_0945-ffe7a88d5b15_initial_migration.py │ ├── 2025_04_09_1117-e25328dbfe80_set_logo_to_none.py │ ├── 2025_04_22_0854-1c6edbdb4642_add_status_to_servers.py │ ├── 2025_04_25_1158-49e1ab420276_host_type.py │ ├── 2025_04_30_1049-387bed3ce6ae_server_title.py │ ├── 2025_05_12_1404-735603f5e4d5_creator_id.py │ ├── 2025_05_13_0634-3b8b8a5b99ad_deployment_fields.py │ ├── 2025_05_13_1326-4b751cb703e2_timestamps.py │ ├── 2025_05_15_1414-7a3913a3bb4c_logs.py │ ├── 2025_05_27_0757-7e8c31ddb2db_json_logs.py │ ├── 2025_05_28_0632-f2298e6acd39_base_image.py │ ├── 2025_05_30_1231-4b5c0d56e1ca_build_instructions.py │ ├── 2025_05_31_1156-c98f44cea12f_add_transport_protocol_column_to_servers.py │ ├── 2025_06_07_1322-aa9a14a698c5_jsonb_logs.py │ └── 2025_06_17_1353-667874dd7dee_added_alias.py ├── alembic.ini ├── common │ ├── __init__.py │ ├── llm │ │ ├── __init__.py │ │ └── client.py │ └── notifications │ ├── __init__.py │ ├── client.py │ ├── enums.py │ └── exceptions.py ├── deploy │ ├── docker-version │ ├── push-and-run │ ├── ssh-agent │ └── ssh-passphrase ├── docker-compose-debug.yaml ├── docker-compose.yaml ├── healthchecker │ ├── __init__.py │ ├── Dockerfile │ ├── scripts │ │ ├── worker-start-debug.sh │ │ └── worker-start.sh │ └── src │ ├── __init__.py │ ├── __main__.py │ └── checker.py ├── LICENSE ├── mcp_server │ ├── __init__.py │ ├── Dockerfile │ ├── requirements.txt │ ├── scripts │ │ ├── __init__.py │ │ ├── mcp_health_check.py │ │ ├── start-debug.sh │ │ └── start.sh │ ├── server.json │ └── src │ ├── __init__.py │ ├── decorators.py │ ├── llm │ │ ├── __init__.py │ │ ├── prompts.py │ │ └── tool_manager.py │ ├── logger.py │ ├── main.py │ ├── registry_client.py │ ├── schemas.py │ ├── settings.py │ └── tools.py ├── pyproject.toml ├── README.md ├── registry │ ├── __init__.py │ ├── Dockerfile │ ├── requirements.txt │ ├── scripts │ │ ├── requirements │ │ ├── start-debug.sh │ │ └── start.sh │ └── src │ ├── __init__.py │ ├── base_schema.py │ ├── database │ │ ├── __init__.py │ │ ├── models │ │ │ ├── __init__.py │ │ │ ├── base.py │ │ │ ├── log.py │ │ │ ├── mixins │ │ │ │ ├── __init__.py │ │ │ │ ├── has_created_at.py │ │ │ │ ├── has_server_id.py │ │ │ │ └── has_updated_at.py │ │ │ ├── server.py │ │ │ └── tool.py │ │ └── session.py │ ├── deployments │ │ ├── __init__.py │ │ ├── logs_repository.py │ │ ├── router.py │ │ ├── schemas.py │ │ └── service.py │ ├── errors.py │ ├── logger.py │ ├── main.py │ ├── producer.py │ ├── search │ │ ├── __init__.py │ │ ├── prompts.py │ │ ├── schemas.py │ │ └── service.py │ ├── servers │ │ ├── __init__.py │ │ ├── repository.py │ │ ├── router.py │ │ ├── schemas.py │ │ └── service.py │ ├── settings.py │ ├── types.py │ └── validator │ ├── __init__.py │ ├── constants.py │ ├── prompts.py │ ├── schemas.py │ ├── service.py │ └── ssl.py ├── scripts │ ├── darp-add.py │ ├── darp-router.py │ └── darp-search.py └── worker ├── __init__.py ├── Dockerfile ├── requirements.txt ├── scripts │ ├── worker-start-debug.sh │ └── worker-start.sh └── src ├── __init__.py ├── constants.py ├── consumer.py ├── deployment_service.py ├── docker_service.py ├── dockerfile_generators │ ├── __init__.py │ ├── base.py │ ├── factory.py │ ├── python │ │ ├── __init__.py │ │ └── generic.py │ ├── typescript │ │ ├── __init__.py │ │ └── generic.py │ └── user_provided.py ├── errors.py ├── main.py ├── schemas.py └── user_logger.py ``` # Files 
-------------------------------------------------------------------------------- /.pre-commit-config.yaml: -------------------------------------------------------------------------------- ```yaml exclude: 'alembic/versions' repos: - repo: https://github.com/psf/black rev: 24.4.2 hooks: - id: black - repo: https://github.com/asottile/reorder-python-imports rev: v3.13.0 hooks: - id: reorder-python-imports args: [--py312-plus] - repo: https://github.com/pre-commit/mirrors-mypy rev: v1.10.1 hooks: - id: mypy exclude: alembic additional_dependencies: [types-requests] - repo: https://github.com/PyCQA/flake8 rev: 7.1.0 hooks: - id: flake8 additional_dependencies: [flake8-pyproject] ``` -------------------------------------------------------------------------------- /.gitlab-ci.yml: -------------------------------------------------------------------------------- ```yaml stages: - test - build - deploy test-pre-commit: stage: test tags: - darp-group-shell-runner script: - source ~/miniconda3/bin/activate && pre-commit run --all-files test-migrations: stage: test tags: - darp-group-shell-runner script: - duplicates=`grep -h down_revision alembic/versions/*.py | sort | uniq -d` - echo "$duplicates" - test -z "$duplicates" clean-pre-commit: stage: test tags: - darp-group-shell-runner script: - source ~/miniconda3/bin/activate && pre-commit clean when: manual build: stage: build when: manual tags: - darp-group-shell-runner script: - source deploy/docker-version - docker compose --profile main build deploy-test: stage: deploy needs: ["build"] when: manual tags: - darp-group-shell-runner script: - DOCKER_NETWORK=highkey_network deploy/push-and-run "$TEST_DOCKER_HOST" deploy-memelabs: stage: deploy needs: ["build"] when: manual tags: - darp-group-shell-runner script: - DOCKER_NETWORK=highkey_network deploy/push-and-run "$MEMELABS_DOCKER_HOST" rules: - if: $CI_COMMIT_BRANCH == "main" deploy-prod: stage: deploy when: manual needs: ["build"] tags: - darp-group-shell-runner script: - DOCKER_NETWORK=highkey_network deploy/push-and-run "$PRODUCTION_DOCKER_HOST" rules: - if: $CI_COMMIT_BRANCH == "main" deploy-toci-test: stage: deploy when: manual needs: ["build"] tags: - darp-group-shell-runner script: - deploy/push-and-run "$TOCI_TEST_DOCKER_HOST" deploy-toci-prod: stage: deploy when: manual needs: ["build"] tags: - darp-group-shell-runner script: - deploy/push-and-run "$TOCI_PRODUCTION_DOCKER_HOST" rules: - if: $CI_COMMIT_BRANCH == "main" ``` -------------------------------------------------------------------------------- /.gitignore: -------------------------------------------------------------------------------- ``` # VSCode .vscode/ # JetBrains .idea # WandB wandb/ # Old old/ temp/ profiler/ # Logs logs/ # eval eval_results/ evalplus_codegen/ # All datasets dataset/ dataset_processed/ dataset_processed_*/ PDF/ *.xlsx *.csv # All evaluation results eval_baselines/ eval_results/ eval_results_temp/ # Byte-compiled / optimized / DLL files __pycache__/ *.py[cod] *$py.class # C extensions *.so # Distribution / packaging .Python build/ develop-eggs/ dist/ downloads/ eggs/ .eggs/ lib/ lib64/ parts/ sdist/ var/ wheels/ share/python-wheels/ *.egg-info/ .installed.cfg *.egg MANIFEST *.ipynb *.pid # PyInstaller # Usually these files are written by a python script from a template # before PyInstaller builds the exe, so as to inject date/other infos into it. 
*.manifest *.spec # Installer logs pip-log.txt pip-delete-this-directory.txt # Unit test / coverage reports htmlcov/ .tox/ .nox/ .coverage .coverage.* .cache nosetests.xml coverage.xml *.cover *.py,cover .hypothesis/ .pytest_cache/ cover/ # Translations *.mo *.pot # Django stuff: *.log local_settings.py db.sqlite3 db.sqlite3-journal # Flask stuff: instance/ .webassets-cache # Scrapy stuff: .scrapy # Sphinx documentation docs/_build/ # PyBuilder .pybuilder/ target/ # Jupyter Notebook .ipynb_checkpoints # IPython profile_default/ ipython_config.py # pdm .pdm.toml # PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm __pypackages__/ # Celery stuff celerybeat-schedule celerybeat.pid # SageMath parsed files *.sage.py # Environments .env .env.* .venv env/ venv/ ENV/ env.bak/ venv.bak/ # Spyder project settings .spyderproject .spyproject # Rope project settings .ropeproject # mkdocs documentation /site # mypy .mypy_cache/ .dmypy.json dmypy.json # Pyre type checker .pyre/ # pytype static type analyzer .pytype/ # Cython debug symbols cython_debug/ # macOS .DS_Store # Local history .history # PyCharm .idea/ ``` -------------------------------------------------------------------------------- /README.md: -------------------------------------------------------------------------------- ```markdown # DARPEngine The MCP search engine for DARP. [![X][x-image]][x-url] [![Code style: black][black-image]][black-url] [![Imports: reorder-python-imports][imports-image]][imports-url] [![Pydantic v2][pydantic-image]][pydantic-url] [![pre-commit][pre-commit-image]][pre-commit-url] [![License MIT][license-image]][license-url] DARPEngine stores metadata for MCP servers hosted online and provides smart search capabilities. ## Features * Simple CLI * API access to search * MCP tool to retrieve search results for connecting manually * Routing MCP tool based on the server: answer any question using the tools found for the user's request ### Coming soon * Support for `.well-known/mcp.json` * Crawler * Nice frontend * Hosted version * Validate different levels of SSL certificates and integrate this info smartly to make sensitive MCP servers difficult to spoof ## Installation ``` export OPENAI_API_KEY=sk-... docker network create highkey_network docker compose build docker compose -f docker-compose.yaml -f docker-compose-debug.yaml up --build --wait ``` ## Getting started You can connect the DARPEngine to an MCP client (e.g. Claude Desktop or Cursor) using the MCP tools provided. Just select SSE mode & specify `http://localhost:4689/sse` as the endpoint (a minimal Python connection sketch appears further below). ### Direct CLI use Another way is to use the CLI. Most of the scripts work with just standard Python libraries, but the routing tool requires the `mcp` package; you can install the script requirements like this: ``` conda create -n darp 'python>=3.10' conda activate darp pip install -r mcp_server/requirements.txt ``` Once installation is complete, we can use the scripts. To begin, we need to add some MCP servers to the engine, e.g.: ``` $ python scripts/darp-add.py --url http://memelabs.ai:3006/sse --name code_analysis --description "Analyze gitlab repo for quality, topics, packages use" ``` Then we can make requests: ``` $ python scripts/darp-search.py "Analyze https://github.com/BenderV/autochat" Found 1 servers: code_analysis ``` You can get more useful results with the routing tool: ``` $ python scripts/darp-router.py "Analyze https://github.com/BenderV/autochat" assistant: [tool_calls] [tool] ... [tool] ... [tool] ...
assistant: ### Code Quality The code in the AutoChat repository demonstrates a good level of quality concerning **readability**, **maintainability**, and adherence to best practices: - **Readability**: Consistent naming conventions are evident, with adherence to Python's PEP 8 styling guidelines. Descriptive function and variable names enhance the understanding of the code's purpose. - **Maintainability**: The code is structured to allow easy updates and modifications, with a clear separation of concerns observed through the use of classes and methods handling distinct functionalities. - **Best Practices**: Extensive use of exception handling, type annotations, and docstrings reflect best practices in Python development. Some TODO comments suggest areas for improvement, indicating that further attention is needed. ### Code Structure The code is organized into multiple files and modules, each serving a distinct purpose: - **Modular Design**: Various classes (e.g., `Autochat`, `Image`, `Message`) indicate a well-structured object-oriented design that promotes separation of concerns, making the code easier to navigate. - **Logical Organization**: Files are logically separated based on functionality. For example, `chat.py` focuses on chat-related logic, while `model.py` handles message and image processing. The utility functions in `utils.py` enhance reusability. - **Testing**: The presence of a test file (`tests/test_utils.py`) shows commitment to testing, crucial for code reliability. The use of `unittest` indicates a structured approach to testing individual components. ### Main Functionality The code appears to be part of an **AutoChat package**, providing a framework for building conversational agents. Key functionalities include: - **Chat Management**: The `Autochat` class acts as the main interface for managing conversations, handling message history, context, and interaction limits. - **Message Handling**: Classes like `Message` and `MessagePart` enable structured message creation and processing, accommodating different message types, including text and images. - **Functionality Extensions**: Methods like `add_tool` and `add_function` allow dynamic addition of tools and functions, facilitating customization of the chat experience. - **Provider Integration**: Different API provider integrations (e.g., OpenAI, Anthropic) are encapsulated within respective classes, allowing flexibility in backend communication. - **Utilities**: Utility functions offer additional capabilities such as CSV formatting and function parsing that support main chat operations. Overall, the codebase is well-organized and showcases a thoughtful approach to developing a conversational AI framework. There is room for further refinement and enhancement, particularly in documentation and clarity of variable names. ### Library Usage The project makes use of **AI libraries**, indicated by its functionality related to conversational agents and integration with AI service providers. This supports its ability to manage interactions with AI models efficiently. ### Summary The AutoChat project is a chat system designed for communication with various AI models, primarily through the `Autochat` class, which manages conversations and supports complex message types, including text and images. The code is moderately complex due to its integration with external APIs and its ability to handle diverse interactions through extensible methods like `add_tool` and `add_function`. 
The quality of code is commendable, featuring a well-structured modular design that promotes readability and maintainability, although some areas require further documentation and refinement, such as clarifying variable names and enhancing comments. The organization into separate files for models, utilities, and tests aids development, but the utility functions could benefit from better categorization for improved clarity. ``` Of course, the usefulness of the result depends on the MCP servers you connect to the engine. ## Get help and support Please feel free to connect with us using the [discussion section](https://github.com/hipasus/darp_engine/discussions). ## Contributing Follow us on X: https://x.com/DARP_AI ## License The DARPEngine codebase is under MIT license. <br> [x-image]: https://img.shields.io/twitter/follow/DARP_AI?style=social [x-url]: https://x.com/DARP_AI [black-image]: https://img.shields.io/badge/code%20style-black-000000.svg [black-url]: https://github.com/psf/black [imports-image]: https://img.shields.io/badge/%20imports-reorder_python_imports-%231674b1?style=flat&labelColor=ef8336 [imports-url]: https://github.com/asottile/reorder-python-imports/ [pydantic-image]: https://img.shields.io/endpoint?url=https://raw.githubusercontent.com/pydantic/pydantic/main/docs/badge/v2.json [pydantic-url]: https://pydantic.dev [pre-commit-image]: https://img.shields.io/badge/pre--commit-enabled-brightgreen?logo=pre-commit&logoColor=white [pre-commit-url]: https://github.com/pre-commit/pre-commit [license-image]: https://img.shields.io/github/license/DARPAI/darp_engine [license-url]: https://opensource.org/licenses/MIT ``` -------------------------------------------------------------------------------- /common/__init__.py: -------------------------------------------------------------------------------- ```python ``` -------------------------------------------------------------------------------- /common/llm/__init__.py: -------------------------------------------------------------------------------- ```python ``` -------------------------------------------------------------------------------- /common/notifications/__init__.py: -------------------------------------------------------------------------------- ```python ``` -------------------------------------------------------------------------------- /healthchecker/__init__.py: -------------------------------------------------------------------------------- ```python ``` -------------------------------------------------------------------------------- /healthchecker/src/__init__.py: -------------------------------------------------------------------------------- ```python ``` -------------------------------------------------------------------------------- /mcp_server/__init__.py: -------------------------------------------------------------------------------- ```python ``` -------------------------------------------------------------------------------- /mcp_server/scripts/__init__.py: -------------------------------------------------------------------------------- ```python ``` -------------------------------------------------------------------------------- /mcp_server/src/__init__.py: -------------------------------------------------------------------------------- ```python ``` -------------------------------------------------------------------------------- /mcp_server/src/llm/__init__.py: -------------------------------------------------------------------------------- ```python ``` 
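For readers following the README's Getting started section above, here is a minimal sketch of connecting to the engine's SSE endpoint from Python and calling the `search_urls` tool that `mcp_server/src/main.py` exposes. It mirrors the pattern used by `mcp_server/scripts/mcp_health_check.py`; the endpoint URL comes from the README, and the example request string is purely illustrative.

```python
# Minimal sketch: connect to DARPEngine's MCP endpoint over SSE and call
# the `search_urls` tool defined in mcp_server/src/main.py. Assumes the
# docker compose stack from the Installation section is running locally.
import asyncio

from mcp import ClientSession
from mcp.client.sse import sse_client


async def main() -> None:
    async with sse_client("http://localhost:4689/sse") as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            # The request string is an illustrative example, not a fixed API.
            result = await session.call_tool(
                "search_urls",
                {"request": "Analyze https://github.com/BenderV/autochat"},
            )
            print(result.content)


if __name__ == "__main__":
    asyncio.run(main())
```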
-------------------------------------------------------------------------------- /registry/__init__.py: -------------------------------------------------------------------------------- ```python ``` -------------------------------------------------------------------------------- /registry/src/__init__.py: -------------------------------------------------------------------------------- ```python ``` -------------------------------------------------------------------------------- /registry/src/database/models/__init__.py: -------------------------------------------------------------------------------- ```python ``` -------------------------------------------------------------------------------- /registry/src/deployments/__init__.py: -------------------------------------------------------------------------------- ```python ``` -------------------------------------------------------------------------------- /registry/src/search/__init__.py: -------------------------------------------------------------------------------- ```python ``` -------------------------------------------------------------------------------- /registry/src/servers/__init__.py: -------------------------------------------------------------------------------- ```python ``` -------------------------------------------------------------------------------- /registry/src/validator/__init__.py: -------------------------------------------------------------------------------- ```python ``` -------------------------------------------------------------------------------- /worker/__init__.py: -------------------------------------------------------------------------------- ```python ``` -------------------------------------------------------------------------------- /worker/src/__init__.py: -------------------------------------------------------------------------------- ```python ``` -------------------------------------------------------------------------------- /worker/scripts/worker-start.sh: -------------------------------------------------------------------------------- ```bash #!/usr/bin/env sh set -e python -m worker.src.main ``` -------------------------------------------------------------------------------- /healthchecker/scripts/worker-start.sh: -------------------------------------------------------------------------------- ```bash #!/usr/bin/env sh set -e python -m healthchecker.src ``` -------------------------------------------------------------------------------- /mcp_server/scripts/start.sh: -------------------------------------------------------------------------------- ```bash #!/usr/bin/env bash set -e python3 -m mcp_server.src.main ``` -------------------------------------------------------------------------------- /worker/src/dockerfile_generators/python/__init__.py: -------------------------------------------------------------------------------- ```python from .generic import PythonGenerator __all__ = ["PythonGenerator"] ``` -------------------------------------------------------------------------------- /worker/src/dockerfile_generators/typescript/__init__.py: -------------------------------------------------------------------------------- ```python from .generic import TypeScriptGenerator __all__ = ["TypeScriptGenerator"] ``` -------------------------------------------------------------------------------- /mcp_server/requirements.txt: -------------------------------------------------------------------------------- ``` mcp==1.6.0 httpx==0.28.1 pydantic==2.11.1 pydantic-settings==2.8.1 openai==1.65.4 ``` 
-------------------------------------------------------------------------------- /registry/src/database/models/base.py: -------------------------------------------------------------------------------- ```python from sqlalchemy.orm import DeclarativeBase class Base(DeclarativeBase): pass ``` -------------------------------------------------------------------------------- /common/notifications/exceptions.py: -------------------------------------------------------------------------------- ```python class NotificationsServiceError(Exception): """Base notifications-service exception""" ``` -------------------------------------------------------------------------------- /worker/scripts/worker-start-debug.sh: -------------------------------------------------------------------------------- ```bash #!/usr/bin/env sh set -e pip install --upgrade -r requirements.txt python -m worker.src.main ``` -------------------------------------------------------------------------------- /healthchecker/scripts/worker-start-debug.sh: -------------------------------------------------------------------------------- ```bash #!/usr/bin/env sh set -e pip install --upgrade -r requirements.txt python -m healthchecker.src ``` -------------------------------------------------------------------------------- /mcp_server/scripts/start-debug.sh: -------------------------------------------------------------------------------- ```bash #!/usr/bin/env bash set -e pip install --upgrade -r ./requirements.txt python3 -m mcp_server.src.main ``` -------------------------------------------------------------------------------- /registry/scripts/start.sh: -------------------------------------------------------------------------------- ```bash #!/usr/bin/env bash set -e alembic upgrade head uvicorn --proxy-headers --host 0.0.0.0 --port $UVICORN_PORT registry.src.main:app ``` -------------------------------------------------------------------------------- /registry/src/base_schema.py: -------------------------------------------------------------------------------- ```python from pydantic import BaseModel from pydantic import ConfigDict class BaseSchema(BaseModel): model_config = ConfigDict(from_attributes=True) ``` -------------------------------------------------------------------------------- /mcp_server/server.json: -------------------------------------------------------------------------------- ```json { "name": "darp_search_engine", "description": "Search Engine for MCP servers", "endpoint": "http://registry_mcp_server/sse", "logo": null } ``` -------------------------------------------------------------------------------- /registry/src/database/models/mixins/__init__.py: -------------------------------------------------------------------------------- ```python from .has_created_at import HasCreatedAt from .has_server_id import HasServerId from .has_updated_at import HasUpdatedAt __all__ = ["HasCreatedAt", "HasUpdatedAt", "HasServerId"] ``` -------------------------------------------------------------------------------- /registry/scripts/start-debug.sh: -------------------------------------------------------------------------------- ```bash #!/usr/bin/env bash set -e pip install --upgrade -r requirements.txt alembic upgrade head uvicorn --reload --proxy-headers --host 0.0.0.0 --port $UVICORN_PORT registry.src.main:app ``` -------------------------------------------------------------------------------- /mcp_server/Dockerfile: -------------------------------------------------------------------------------- ```dockerfile FROM python:3.11 
WORKDIR /workspace COPY mcp_server/requirements.txt . RUN pip install -r requirements.txt COPY common ./common COPY mcp_server ./mcp_server ENV PATH "$PATH:/workspace/mcp_server/scripts" ``` -------------------------------------------------------------------------------- /worker/src/constants.py: -------------------------------------------------------------------------------- ```python from enum import StrEnum class BaseImage(StrEnum): python_3_10 = "python:3.10.17-slim" python_3_11 = "python:3.11.12-slim" python_3_12 = "python:3.12.10-slim" node_lts = "node:lts-alpine" ``` -------------------------------------------------------------------------------- /worker/src/errors.py: -------------------------------------------------------------------------------- ```python class ProcessingError(Exception): def __init__(self, message: str | None = None): self.message = message class InvalidData(ProcessingError): pass class DependenciesError(ProcessingError): pass ``` -------------------------------------------------------------------------------- /common/notifications/enums.py: -------------------------------------------------------------------------------- ```python from enum import StrEnum class NotificationEvent(StrEnum): email_confirmation = "email-confirmation" deploy_failed = "server-deploy-failed" deploy_successful = "server-success-deploy" server_healthcheck_failed = "server-healthcheck-failed" ``` -------------------------------------------------------------------------------- /registry/Dockerfile: -------------------------------------------------------------------------------- ```dockerfile FROM python:3.11 WORKDIR /workspace COPY registry/requirements.txt . RUN pip install -r requirements.txt COPY alembic ./alembic COPY alembic.ini ./alembic.ini COPY common ./common COPY registry ./registry COPY worker ./worker ENV PATH "$PATH:/workspace/registry/scripts" ``` -------------------------------------------------------------------------------- /registry/src/producer.py: -------------------------------------------------------------------------------- ```python from aiokafka import AIOKafkaProducer from registry.src.settings import settings async def get_producer(url=settings.broker_url) -> AIOKafkaProducer: return AIOKafkaProducer( bootstrap_servers=url, value_serializer=lambda m: m.model_dump_json().encode("utf-8"), ) ``` -------------------------------------------------------------------------------- /pyproject.toml: -------------------------------------------------------------------------------- ```toml # pyproject.toml [tool.pytest.ini_options] minversion = "6.0" addopts = "--doctest-modules" [tool.flake8] ignore = ['E203', 'D203', 'E501', 'W503'] exclude = [ '.git', '__pycache__', 'docs/source/conf.py', 'old', 'build', 'dist', '.venv', ] max-complexity = 25 ``` -------------------------------------------------------------------------------- /registry/src/main.py: -------------------------------------------------------------------------------- ```python from fastapi import FastAPI from registry.src.deployments.router import router as deploy_router from registry.src.servers.router import router as servers_router app = FastAPI() app.include_router(servers_router) app.include_router(deploy_router) @app.get("/healthcheck") async def healthcheck(): return "Alive" ``` -------------------------------------------------------------------------------- /worker/Dockerfile: -------------------------------------------------------------------------------- ```dockerfile FROM docker:latest RUN apk add 
--no-cache python3 py3-pip gcc python3-dev libc-dev zlib-dev WORKDIR /workspace COPY worker/requirements.txt . RUN pip config set global.break-system-packages true RUN pip install -r requirements.txt COPY registry ./registry COPY worker ./worker ENV PATH "$PATH:/workspace/worker/scripts" ``` -------------------------------------------------------------------------------- /registry/src/database/__init__.py: -------------------------------------------------------------------------------- ```python from .models.base import Base from .models.log import ServerLogs from .models.server import Server from .models.tool import Tool from .session import get_session from .session import get_unmanaged_session __all__ = [ "Base", "get_session", "get_unmanaged_session", "Server", "Tool", "ServerLogs", ] ``` -------------------------------------------------------------------------------- /mcp_server/src/llm/prompts.py: -------------------------------------------------------------------------------- ```python default_prompt: str = ( """ You are an expert in the topic of user's request. Your assistant has prepared for you the tools that might be helpful to answer the request. Your goal is to provide detailed, accurate information and analysis in response to user queries and if necessary to perform some actions using the tools. """.strip() ) ``` -------------------------------------------------------------------------------- /registry/src/database/models/mixins/has_created_at.py: -------------------------------------------------------------------------------- ```python from datetime import datetime from sqlalchemy import Column from sqlalchemy import DateTime from sqlalchemy.orm import declarative_mixin from sqlalchemy.orm import declared_attr @declarative_mixin class HasCreatedAt: @declared_attr @classmethod def created_at(cls): return Column(DateTime, default=datetime.now, nullable=False) ``` -------------------------------------------------------------------------------- /worker/src/dockerfile_generators/__init__.py: -------------------------------------------------------------------------------- ```python from .base import DockerfileGenerator from .factory import get_dockerfile_generator from .python import PythonGenerator from .typescript import TypeScriptGenerator from .user_provided import UserDockerfileGenerator __all__ = [ "DockerfileGenerator", "PythonGenerator", "TypeScriptGenerator", "UserDockerfileGenerator", "get_dockerfile_generator", ] ``` -------------------------------------------------------------------------------- /healthchecker/Dockerfile: -------------------------------------------------------------------------------- ```dockerfile FROM docker:latest RUN apk add --no-cache python3 py3-pip gcc python3-dev libc-dev zlib-dev WORKDIR /workspace COPY registry/requirements.txt . 
RUN pip config set global.break-system-packages true RUN pip install -r requirements.txt COPY common ./common COPY registry ./registry COPY worker ./worker COPY healthchecker ./healthchecker ENV PATH "$PATH:/workspace/healthchecker/scripts" ``` -------------------------------------------------------------------------------- /registry/src/database/models/mixins/has_updated_at.py: -------------------------------------------------------------------------------- ```python from datetime import datetime from sqlalchemy import Column from sqlalchemy import DateTime from sqlalchemy.orm import declarative_mixin from sqlalchemy.orm import declared_attr @declarative_mixin class HasUpdatedAt: @declared_attr @classmethod def updated_at(cls): return Column( DateTime, onupdate=datetime.now, default=datetime.now, nullable=False ) ``` -------------------------------------------------------------------------------- /mcp_server/scripts/mcp_health_check.py: -------------------------------------------------------------------------------- ```python import asyncio from mcp import ClientSession from mcp.client.sse import sse_client from mcp_server.src.settings import settings async def healthcheck(): async with sse_client(f"http://localhost:{settings.mcp_port}/sse") as ( read, write, ): async with ClientSession(read, write) as session: await session.initialize() if __name__ == "__main__": asyncio.run(healthcheck()) ``` -------------------------------------------------------------------------------- /registry/src/database/models/mixins/has_server_id.py: -------------------------------------------------------------------------------- ```python from sqlalchemy import Column from sqlalchemy import ForeignKey from sqlalchemy import Integer from sqlalchemy.orm import declarative_mixin from sqlalchemy.orm import declared_attr @declarative_mixin class HasServerId: @declared_attr def server_id(cls): return Column( Integer, ForeignKey("server.id", ondelete="CASCADE"), nullable=False, primary_key=True, ) ``` -------------------------------------------------------------------------------- /registry/src/database/models/log.py: -------------------------------------------------------------------------------- ```python from sqlalchemy.dialects.postgresql import JSONB from sqlalchemy.orm import Mapped from sqlalchemy.orm import mapped_column from .base import Base from .mixins import HasCreatedAt from .mixins import HasServerId from .mixins import HasUpdatedAt class ServerLogs(HasCreatedAt, HasUpdatedAt, HasServerId, Base): __tablename__ = "server_logs" deployment_logs: Mapped[list[dict]] = mapped_column( JSONB(), nullable=False, server_default="'[]'::jsonb" ) ``` -------------------------------------------------------------------------------- /registry/src/logger.py: -------------------------------------------------------------------------------- ```python import logging from .settings import settings logger = logging.getLogger("registry") logger.setLevel(logging.INFO) formatter = logging.Formatter("[%(asctime)s][%(name)s][%(levelname)s]: %(message)s") stream_handler = logging.StreamHandler() stream_handler.setFormatter(formatter) logger.addHandler(stream_handler) settings.log_dir.mkdir(parents=True, exist_ok=True) file_handler = logging.FileHandler(settings.log_dir / "registry.log") file_handler.setFormatter(formatter) logger.addHandler(file_handler) ``` -------------------------------------------------------------------------------- /registry/src/types.py: 
-------------------------------------------------------------------------------- ```python from enum import auto from enum import StrEnum class HostType(StrEnum): internal = auto() external = auto() class ServerStatus(StrEnum): active = auto() inactive = auto() in_processing = auto() stopped = auto() processing_failed = auto() class TaskType(StrEnum): deploy = auto() start = auto() stop = auto() class ServerTransportProtocol(StrEnum): SSE = "SSE" STREAMABLE_HTTP = "STREAMABLE_HTTP" class RoutingMode(StrEnum): auto = "auto" deepresearch = "deepresearch" ``` -------------------------------------------------------------------------------- /mcp_server/src/logger.py: -------------------------------------------------------------------------------- ```python import logging from mcp_server.src.settings import settings logger = logging.getLogger("registry_mcp") logger.setLevel(settings.log_level) formatter = logging.Formatter("[%(asctime)s][%(name)s][%(levelname)s]: %(message)s") stream_handler = logging.StreamHandler() stream_handler.setFormatter(formatter) logger.addHandler(stream_handler) settings.log_dir.mkdir(parents=True, exist_ok=True) file_handler = logging.FileHandler(settings.log_dir / "registry_mcp.log") file_handler.setFormatter(formatter) logger.addHandler(file_handler) ``` -------------------------------------------------------------------------------- /alembic/versions/2025_05_12_1404-735603f5e4d5_creator_id.py: -------------------------------------------------------------------------------- ```python """creator_id Revision ID: 735603f5e4d5 Revises: 1c6edbdb4642 Create Date: 2025-05-12 14:04:17.438611 """ from alembic import op import sqlalchemy as sa # revision identifiers, used by Alembic. revision = '735603f5e4d5' down_revision = '1c6edbdb4642' branch_labels = None depends_on = None def upgrade(): # ### commands auto generated by Alembic - please adjust! ### op.add_column('server', sa.Column('creator_id', sa.String(), nullable=True)) # ### end Alembic commands ### def downgrade(): # ### commands auto generated by Alembic - please adjust! ### op.drop_column('server', 'creator_id') # ### end Alembic commands ### ``` -------------------------------------------------------------------------------- /alembic/versions/2025_05_28_0632-f2298e6acd39_base_image.py: -------------------------------------------------------------------------------- ```python """base_image Revision ID: f2298e6acd39 Revises: aa9a14a698c5 Create Date: 2025-05-28 06:32:44.781544 """ from alembic import op import sqlalchemy as sa # revision identifiers, used by Alembic. revision = 'f2298e6acd39' down_revision = 'aa9a14a698c5' branch_labels = None depends_on = None def upgrade(): # ### commands auto generated by Alembic - please adjust! ### op.add_column('server', sa.Column('base_image', sa.String(), nullable=True)) # ### end Alembic commands ### def downgrade(): # ### commands auto generated by Alembic - please adjust! ### op.drop_column('server', 'base_image') # ### end Alembic commands ### ``` -------------------------------------------------------------------------------- /alembic/versions/2025_05_30_1231-4b5c0d56e1ca_build_instructions.py: -------------------------------------------------------------------------------- ```python """build_instructions Revision ID: 4b5c0d56e1ca Revises: f2298e6acd39 Create Date: 2025-05-30 12:31:08.515098 """ from alembic import op import sqlalchemy as sa # revision identifiers, used by Alembic.
revision = '4b5c0d56e1ca' down_revision = 'f2298e6acd39' branch_labels = None depends_on = None def upgrade(): # ### commands auto generated by Alembic - please adjust! ### op.add_column('server', sa.Column('build_instructions', sa.String(), nullable=True)) # ### end Alembic commands ### def downgrade(): # ### commands auto generated by Alembic - please adjust! ### op.drop_column('server', 'build_instructions') # ### end Alembic commands ### ``` -------------------------------------------------------------------------------- /worker/src/main.py: -------------------------------------------------------------------------------- ```python import asyncio from kafka import KafkaAdminClient from kafka.admin import NewTopic from kafka.errors import TopicAlreadyExistsError from registry.src.logger import logger from registry.src.settings import settings from worker.src.consumer import Consumer if __name__ == "__main__": try: admin = KafkaAdminClient(bootstrap_servers=settings.broker_url) worker_topic = NewTopic( name=settings.worker_topic, num_partitions=1, replication_factor=1 ) admin.create_topics([worker_topic]) except TopicAlreadyExistsError: pass consumer = Consumer() logger.info("Starting consumer") asyncio.run(consumer.start()) ``` -------------------------------------------------------------------------------- /mcp_server/src/registry_client.py: -------------------------------------------------------------------------------- ```python from httpx import AsyncClient from mcp_server.src.schemas import RegistryServer from mcp_server.src.settings import settings class RegistryClient: def __init__(self): self.client = AsyncClient(base_url=settings.registry_url) self.timeout = 90 async def search(self, request: str) -> list[RegistryServer]: response = await self.client.get( "/servers/search", params=dict(query=request), timeout=self.timeout ) assert ( response.status_code == 200 ), f"{response.status_code=} {response.content=}" data = response.json() return [RegistryServer.model_validate(server) for server in data] ``` -------------------------------------------------------------------------------- /mcp_server/src/schemas.py: -------------------------------------------------------------------------------- ```python from typing import Any from openai.types.chat import ChatCompletionMessage from openai.types.chat import ChatCompletionToolMessageParam from pydantic import BaseModel from pydantic import ConfigDict class BaseSchema(BaseModel): model_config = ConfigDict(from_attributes=True) class Tool(BaseSchema): name: str description: str input_schema: dict[str, Any] class RegistryServer(BaseSchema): name: str description: str url: str logo: str | None id: int tools: list[Tool] class ToolInfo(BaseSchema): tool_name: str server: RegistryServer class RoutingResponse(BaseSchema): conversation: list[ChatCompletionMessage | ChatCompletionToolMessageParam] ``` -------------------------------------------------------------------------------- /alembic/versions/2025_04_22_0854-1c6edbdb4642_add_status_to_servers.py: -------------------------------------------------------------------------------- ```python """Add status to servers. Revision ID: 1c6edbdb4642 Revises: 49e1ab420276 Create Date: 2025-04-22 08:54:20.892397 """ from alembic import op import sqlalchemy as sa # revision identifiers, used by Alembic. 
revision = "1c6edbdb4642" down_revision = "49e1ab420276" branch_labels = None depends_on = None def upgrade(): op.add_column( "server", sa.Column( "status", sa.String(), nullable=True, ), ) op.execute("""UPDATE server SET status = 'active'""") op.alter_column("server", "status", nullable=False) def downgrade(): # ### commands auto generated by Alembic - please adjust! ### op.drop_column("server", "status") # ### end Alembic commands ### ``` -------------------------------------------------------------------------------- /alembic/versions/2025_04_30_1049-387bed3ce6ae_server_title.py: -------------------------------------------------------------------------------- ```python """server title Revision ID: 387bed3ce6ae Revises: 49e1ab420276 Create Date: 2025-04-30 10:49:34.277571 """ from alembic import op import sqlalchemy as sa # revision identifiers, used by Alembic. revision = '387bed3ce6ae' down_revision = '7a3913a3bb4c' branch_labels = None depends_on = None def upgrade(): # ### commands auto generated by Alembic - please adjust! ### op.add_column('server', sa.Column('title', sa.String(), nullable=True)) op.execute("""UPDATE server SET title = name""") op.alter_column("server", "title", nullable=False) # ### end Alembic commands ### def downgrade(): # ### commands auto generated by Alembic - please adjust! ### op.drop_column('server', 'title') # ### end Alembic commands ### ``` -------------------------------------------------------------------------------- /registry/src/search/prompts.py: -------------------------------------------------------------------------------- ```python get_top_servers: str = ( """ You are part of MCP search engine. You are given a list of MCP server descriptions and a user's query. Your task is to select a minimal list of MCP servers that have enough tools to satisfy the query. Results of this request will be returned to the user's system which will make a separate request to LLM with the selected tools. Consider if it is possible to satisfy the request in one step or in several steps. Consider if some tools don't have enough data if they are enough to completely perform one step or if you need to select several alternatives for the step. It is important that you follow the output format precisely. Input data for you: ``` mcp_servers = {servers_list!r} user_request = {request!r} ``` """.strip() ) ``` -------------------------------------------------------------------------------- /alembic/versions/2025_06_17_1353-667874dd7dee_added_alias.py: -------------------------------------------------------------------------------- ```python """added llm_tool_name Revision ID: 667874dd7dee Revises: c98f44cea12f Create Date: 2025-06-17 13:53:18.588902 """ from alembic import op import sqlalchemy as sa # revision identifiers, used by Alembic. revision = '667874dd7dee' down_revision = 'c98f44cea12f' branch_labels = None depends_on = None def upgrade(): op.add_column('tool', sa.Column('alias', sa.String(), nullable=True)) op.execute(""" UPDATE tool SET alias = tool.name || '__' || server.name FROM server WHERE server.url = tool.server_url; """) op.alter_column("tool", "alias", nullable=False) def downgrade(): # ### commands auto generated by Alembic - please adjust! 
### op.drop_column('tool', 'alias') # ### end Alembic commands ### ``` -------------------------------------------------------------------------------- /alembic/versions/2025_04_25_1158-49e1ab420276_host_type.py: -------------------------------------------------------------------------------- ```python """host_type Revision ID: 49e1ab420276 Revises: e25328dbfe80 Create Date: 2025-04-25 11:58:17.768233 """ from alembic import op import sqlalchemy as sa # revision identifiers, used by Alembic. revision = '49e1ab420276' down_revision = 'e25328dbfe80' branch_labels = None depends_on = None def upgrade(): # ### commands auto generated by Alembic - please adjust! ### op.add_column('server', sa.Column('host_type', sa.String(), nullable=True)) op.execute("UPDATE server SET host_type = 'internal'") op.alter_column("server", "host_type", nullable=False) # ### end Alembic commands ### def downgrade(): # ### commands auto generated by Alembic - please adjust! ### op.drop_column('server', 'host_type') # ### end Alembic commands ### ``` -------------------------------------------------------------------------------- /mcp_server/src/main.py: -------------------------------------------------------------------------------- ```python from mcp.server.fastmcp import FastMCP from .schemas import RoutingResponse from mcp_server.src.settings import settings from mcp_server.src.tools import Tools mcp: FastMCP = FastMCP(settings.server_json.name, port=settings.mcp_port) tools: Tools = Tools() @mcp.tool() async def search_urls(request: str) -> list[str]: """Return URLs of the best MCP servers for processing the given request.""" return await tools.search(request=request) @mcp.tool() async def routing( request: str, ) -> str: """Respond to any user request using MCP tools selected specifically for it.""" response = await tools.routing(request=request) return RoutingResponse(conversation=response).model_dump_json() if __name__ == "__main__": mcp.run(transport="sse") ``` -------------------------------------------------------------------------------- /alembic/versions/2025_05_31_1156-c98f44cea12f_add_transport_protocol_column_to_servers.py: -------------------------------------------------------------------------------- ```python """Add transport-protocol column to servers Revision ID: c98f44cea12f Revises: 4b5c0d56e1ca Create Date: 2025-05-31 11:56:05.469429 """ from alembic import op import sqlalchemy as sa # revision identifiers, used by Alembic. revision = "c98f44cea12f" down_revision = "4b5c0d56e1ca" branch_labels = None depends_on = None def upgrade(): # ### commands auto generated by Alembic - please adjust! ### op.add_column("server", sa.Column("transport_protocol", sa.String(), nullable=True)) op.execute("""UPDATE server SET transport_protocol = 'SSE' WHERE status = 'active'""") # ### end Alembic commands ### def downgrade(): # ### commands auto generated by Alembic - please adjust!
### op.drop_column("server", "transport_protocol") # ### end Alembic commands ### ``` -------------------------------------------------------------------------------- /registry/requirements.txt: -------------------------------------------------------------------------------- ``` alembic==1.15.1 annotated-types==0.7.0 anyio==4.9.0 asyncpg==0.30.0 certifi==2025.1.31 cfgv==3.4.0 chardet==5.2.0 charset-normalizer==3.4.1 click==8.1.8 distlib==0.3.9 fastapi==0.115.11 fastapi-pagination==0.12.34 filelock==3.18.0 greenlet==3.1.1 h11==0.14.0 httpcore==1.0.7 httpx==0.28.1 httpx-sse==0.4.0 identify==2.6.9 idna==3.10 instructor==1.7.3 Mako==1.3.9 MarkupSafe==3.0.2 mcp==1.9.2 nodeenv==1.9.1 openai==1.65.4 platformdirs==4.3.7 pre_commit==4.2.0 psycopg2-binary==2.9.10 pydantic==2.10.6 pydantic-settings==2.8.1 pydantic_core==2.27.2 cryptography==44.0.2 python-dotenv==1.0.1 PyYAML==6.0.2 requests==2.32.3 setuptools==75.8.0 sniffio==1.3.1 SQLAlchemy==2.0.39 sse-starlette==2.2.1 starlette==0.46.1 typing_extensions==4.12.2 urllib3==2.3.0 uvicorn==0.34.0 virtualenv==20.29.3 wheel==0.45.1 jinja2==3.1.6 aiokafka==0.12.0 ``` -------------------------------------------------------------------------------- /alembic/versions/2025_06_07_1322-aa9a14a698c5_jsonb_logs.py: -------------------------------------------------------------------------------- ```python """jsonb_logs Revision ID: aa9a14a698c5 Revises: 7e8c31ddb2db Create Date: 2025-06-07 13:22:23.229230 """ from alembic import op import sqlalchemy as sa from sqlalchemy.dialects import postgresql # revision identifiers, used by Alembic. revision = 'aa9a14a698c5' down_revision = '7e8c31ddb2db' branch_labels = None depends_on = None def upgrade(): op.drop_column('server_logs', 'deployment_logs') op.add_column('server_logs', sa.Column( 'deployment_logs', postgresql.JSONB(), nullable=False, server_default=sa.text("'[]'::jsonb") )) def downgrade(): op.drop_column('server_logs', 'deployment_logs') op.add_column('server_logs', sa.Column( 'deployment_logs', sa.VARCHAR(), nullable=False, server_default=sa.text("'[]'::character varying") )) ``` -------------------------------------------------------------------------------- /worker/requirements.txt: -------------------------------------------------------------------------------- ``` alembic==1.15.1 annotated-types==0.7.0 anyio==4.9.0 asyncpg==0.30.0 certifi==2025.1.31 cfgv==3.4.0 chardet==5.2.0 charset-normalizer==3.4.1 click==8.1.8 distlib==0.3.9 fastapi==0.115.11 fastapi-pagination==0.12.34 filelock==3.18.0 greenlet==3.1.1 h11==0.14.0 httpcore==1.0.7 httpx==0.28.1 httpx-sse==0.4.0 identify==2.6.9 idna==3.10 instructor==1.7.3 Mako==1.3.9 MarkupSafe==3.0.2 mcp==1.9.2 nodeenv==1.9.1 openai==1.65.4 platformdirs==4.3.7 pre_commit==4.2.0 psycopg2-binary==2.9.10 pydantic==2.10.6 pydantic-settings==2.8.1 pydantic_core==2.27.2 cryptography==44.0.2 python-dotenv==1.0.1 PyYAML==6.0.2 requests==2.32.3 setuptools==75.8.0 sniffio==1.3.1 SQLAlchemy==2.0.39 sse-starlette==2.2.1 starlette==0.46.1 typing_extensions==4.12.2 urllib3==2.3.0 uvicorn==0.34.0 virtualenv==20.29.3 wheel==0.45.1 jinja2==3.1.6 aiokafka==0.12.0 kafka-python==2.0.3 ``` -------------------------------------------------------------------------------- /alembic/versions/2025_04_09_1117-e25328dbfe80_set_logo_to_none.py: -------------------------------------------------------------------------------- ```python """set logo to none Revision ID: e25328dbfe80 Revises: ffe7a88d5b15 Create Date: 2025-04-09 11:17:59.990186 """ from alembic import op import sqlalchemy as sa # 
revision identifiers, used by Alembic. revision = 'e25328dbfe80' down_revision = 'ffe7a88d5b15' branch_labels = None depends_on = None def upgrade(): op.execute("UPDATE server SET logo = NULL WHERE logo = ''") # ### commands auto generated by Alembic - please adjust! ### op.alter_column('server', 'logo', existing_type=sa.VARCHAR(), nullable=True) # ### end Alembic commands ### def downgrade(): op.execute("UPDATE server SET logo = '' WHERE logo IS NULL") # ### commands auto generated by Alembic - please adjust! ### op.alter_column('server', 'logo', existing_type=sa.VARCHAR(), nullable=False) # ### end Alembic commands ### ``` -------------------------------------------------------------------------------- /registry/src/validator/prompts.py: -------------------------------------------------------------------------------- ```python from jinja2 import Template SENS_TOPIC_CHECK_SYSTEM_PROMPT: Template = Template( """ Analyze the description of an MCP server and its tools. Your task is to determine whether the server likely works with sensitive data. Consider the following rules: 1. Sensitive data indicators: • Mentions of emails, phone numbers, SSNs, credit card numbers, access tokens, passwords, or other personal identifiable information (PII). • Tools or functionality related to authentication, authorization, payments, identity verification, document uploads, etc. 2. Sensitive domains: • Banking • Finance • Government services • Or any tool that may require sending sensitive user data """ ) SENS_TOPIC_CHECK_USER_PROMPT: Template = Template( """ ''' Server's data: {{ server_data.model_dump_json() }} -------------- Server's tools: {{ tools_data.model_dump_json() }} ''' """ ) ``` -------------------------------------------------------------------------------- /registry/src/validator/schemas.py: -------------------------------------------------------------------------------- ```python from enum import Enum from pydantic import BaseModel from pydantic import Field from pydantic import RootModel from registry.src.servers.schemas import ServerCreate from registry.src.servers.schemas import Tool class SSLAuthorityLevel(Enum): NO_CERTIFICATE = 0 INVALID_CERTIFICATE = 1 SELF_SIGNED_CERTIFICATE = 2 CERTIFICATE_OK = 3 EXTENDED_CERTIFICATE = 4 class ServerNeedsSensitiveDataResponse(BaseModel): reasoning: list[str] = Field( ..., description="For each argument of each tool describe if you can send sensitive data to this argument, and what kind of it", ) server_needs_sensitive_data: bool class ServerWithTools(ServerCreate): tools: list[Tool] class ValidationResult(BaseModel): server_requires_sensitive_data: bool = False authority_level: SSLAuthorityLevel = SSLAuthorityLevel.NO_CERTIFICATE Tools = RootModel[list[Tool]] ``` -------------------------------------------------------------------------------- /registry/src/validator/constants.py: -------------------------------------------------------------------------------- ```python EV_OIDS: list[str] = [ "2.16.756.5.14.7.4.8", "2.16.156.112554.3", "2.16.840.1.114028.10.1.2", "1.3.6.1.4.1.4788.2.202.1", "2.16.840.1.114404.1.1.2.4.1", "2.16.840.1.114413.1.7.23.3", "2.16.840.1.114412.2.1", "1.3.6.1.4.1.14777.6.1.2", "2.16.578.1.26.1.3.3", "1.3.6.1.4.1.14777.6.1.1", "2.23.140.1.1", "1.3.6.1.4.1.7879.13.24.1", "1.3.6.1.4.1.40869.1.1.22.3", "1.3.159.1.17.1", "1.2.616.1.113527.2.5.1.1", "2.16.840.1.114414.1.7.23.3", "1.3.6.1.4.1.34697.2.1", "2.16.756.1.89.1.2.1.1", "1.3.6.1.4.1.6449.1.2.1.5.1", "1.3.6.1.4.1.34697.2.3", "1.3.6.1.4.1.13769.666.666.666.1.500.9.1", 
"1.3.6.1.4.1.34697.2.2", "1.2.156.112559.1.1.6.1", "1.3.6.1.4.1.8024.0.2.100.1.2", "1.3.6.1.4.1.34697.2.4", "1.2.392.200091.100.721.1", ] SELF_SIGNED_CERTIFICATE_ERR_CODE: int = 18 HTTPS_PORT: int = 443 HTTP_PORT: int = 80 ``` -------------------------------------------------------------------------------- /alembic/versions/2025_05_27_0757-7e8c31ddb2db_json_logs.py: -------------------------------------------------------------------------------- ```python """json_logs Revision ID: 7e8c31ddb2db Revises: 7a3913a3bb4c Create Date: 2025-05-27 07:57:51.044585 """ from alembic import op import sqlalchemy as sa from sqlalchemy.dialects import postgresql # revision identifiers, used by Alembic. revision = '7e8c31ddb2db' down_revision = '387bed3ce6ae' branch_labels = None depends_on = None def upgrade(): # ### commands auto generated by Alembic - please adjust! ### op.execute("UPDATE server_logs SET deployment_logs = '[]'") op.alter_column( "server_logs", "deployment_logs", server_default="[]", existing_server_default="" ) # ### end Alembic commands ### def downgrade(): # ### commands auto generated by Alembic - please adjust! ### op.alter_column( "server_logs", "deployment_logs", server_default="", existing_server_default="[]" ) op.execute("UPDATE server_logs SET deployment_logs = ''") # ### end Alembic commands ### ``` -------------------------------------------------------------------------------- /alembic/versions/2025_05_13_1326-4b751cb703e2_timestamps.py: -------------------------------------------------------------------------------- ```python """timestamps Revision ID: 4b751cb703e2 Revises: 3b8b8a5b99ad Create Date: 2025-05-13 13:26:42.259154 """ from datetime import datetime from alembic import op import sqlalchemy as sa # revision identifiers, used by Alembic. revision = '4b751cb703e2' down_revision = '3b8b8a5b99ad' branch_labels = None depends_on = None def upgrade(): op.add_column('server', sa.Column('created_at', sa.DateTime(), nullable=True)) op.add_column('server', sa.Column('updated_at', sa.DateTime(), nullable=True)) current_time = f"timestamp '{datetime.now().isoformat()}'" op.execute(f"""UPDATE server SET created_at = {current_time}, updated_at = {current_time}""") op.alter_column("server", "created_at", nullable=False) op.alter_column("server", "updated_at", nullable=False) def downgrade(): # ### commands auto generated by Alembic - please adjust! 
### op.drop_column('server', 'updated_at') op.drop_column('server', 'created_at') # ### end Alembic commands ### ``` -------------------------------------------------------------------------------- /registry/src/database/models/tool.py: -------------------------------------------------------------------------------- ```python from sqlalchemy import ForeignKey from sqlalchemy import String from sqlalchemy import UniqueConstraint from sqlalchemy.dialects.postgresql import JSONB from sqlalchemy.orm import Mapped from sqlalchemy.orm import mapped_column from sqlalchemy.orm import relationship from ...database import models from .base import Base class Tool(Base): __tablename__ = "tool" __table_args__ = ( UniqueConstraint("name", "server_url", name="uq_tool_name_server_url"), ) id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True) name: Mapped[str] = mapped_column(String(), nullable=False) description: Mapped[str] = mapped_column(String(), nullable=False) input_schema: Mapped[dict] = mapped_column(JSONB, nullable=False) alias: Mapped[str] = mapped_column(String(), nullable=False) server_url: Mapped[str] = mapped_column( ForeignKey("server.url", ondelete="CASCADE") ) server: Mapped["models.server.Server"] = relationship(back_populates="tools") ``` -------------------------------------------------------------------------------- /alembic/versions/2025_05_15_1414-7a3913a3bb4c_logs.py: -------------------------------------------------------------------------------- ```python """logs Revision ID: 7a3913a3bb4c Revises: 4b751cb703e2 Create Date: 2025-05-15 14:14:22.223452 """ from alembic import op import sqlalchemy as sa # revision identifiers, used by Alembic. revision = '7a3913a3bb4c' down_revision = '4b751cb703e2' branch_labels = None depends_on = None def upgrade(): # ### commands auto generated by Alembic - please adjust! ### op.create_table( 'server_logs', sa.Column('deployment_logs', sa.String(), server_default='', nullable=False), sa.Column('created_at', sa.DateTime(), nullable=False), sa.Column('updated_at', sa.DateTime(), nullable=False), sa.Column('server_id', sa.Integer(), nullable=False), sa.ForeignKeyConstraint(['server_id'], ['server.id'], ondelete='CASCADE'), sa.PrimaryKeyConstraint('server_id') ) # ### end Alembic commands ### def downgrade(): # ### commands auto generated by Alembic - please adjust! 
### op.drop_table('server_logs') # ### end Alembic commands ### ``` -------------------------------------------------------------------------------- /worker/src/schemas.py: -------------------------------------------------------------------------------- ```python from datetime import datetime from enum import auto from enum import StrEnum from pydantic import Field from pydantic import field_serializer from registry.src.base_schema import BaseSchema from registry.src.types import TaskType class Task(BaseSchema): task_type: TaskType server_id: int class LogLevel(StrEnum): info = auto() warning = auto() error = auto() class LoggingEvent(StrEnum): command_start = auto() command_result = auto() general_message = auto() class CommandStart(BaseSchema): command: str class CommandResult(BaseSchema): output: str success: bool class GeneralLogMessage(BaseSchema): log_level: LogLevel message: str class LogMessage(BaseSchema): event_type: LoggingEvent data: CommandStart | CommandResult | GeneralLogMessage timestamp: datetime = Field(default_factory=datetime.now) @field_serializer("timestamp") def serialize_datetime(self, value: datetime, _info) -> str: return value.isoformat() ``` -------------------------------------------------------------------------------- /worker/src/dockerfile_generators/user_provided.py: -------------------------------------------------------------------------------- ```python from typing import override from uuid import uuid4 from ..user_logger import UserLogger from .base import DockerfileGenerator from worker.src.constants import BaseImage class UserDockerfileGenerator(DockerfileGenerator): def __init__( self, base_image: BaseImage, repo_folder: str, user_logger: UserLogger, build_instructions: str, ) -> None: self.build_instructions = build_instructions super().__init__( base_image=base_image, repo_folder=repo_folder, user_logger=user_logger ) @override async def generate_dockerfile(self) -> str: await self._image_setup() await self._code_setup() return self._save_dockerfile() @override async def _code_setup(self) -> None: heredoc_identifier = str(uuid4()).replace("-", "") lines = [ "COPY ./ ./", f"RUN <<{heredoc_identifier}", self.build_instructions, heredoc_identifier, ] self._add_to_file(lines) ``` -------------------------------------------------------------------------------- /mcp_server/src/settings.py: -------------------------------------------------------------------------------- ```python from pathlib import Path from pydantic import BaseModel from pydantic import Field from pydantic import field_validator from pydantic_settings import BaseSettings from pydantic_settings import SettingsConfigDict parent_folder = Path(__file__).parent.parent class ServerJson(BaseModel): name: str description: str url: str = Field(alias="endpoint") logo: str | None = None @field_validator("url") # noqa @classmethod def url_ignore_anchor(cls, value: str) -> str: return value.split("#")[0] class Settings(BaseSettings): model_config = SettingsConfigDict(extra="ignore", env_ignore_empty=True) registry_url: str = "http://registry:80" log_dir: Path = Path("logs") mcp_port: int = 80 llm_proxy: str | None = None openai_base_url: str = "https://openrouter.ai/api/v1" openai_api_key: str llm_model: str = "openai/gpt-4o-mini" server_json: ServerJson = ServerJson.model_validate_json( Path(parent_folder / "server.json").read_text() ) log_level: str = "INFO" settings = Settings() ``` -------------------------------------------------------------------------------- /registry/src/database/session.py: 
-------------------------------------------------------------------------------- ```python from collections.abc import AsyncGenerator from typing import Any from sqlalchemy.ext.asyncio import async_sessionmaker from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import create_async_engine from registry.src.errors import FastApiError from registry.src.logger import logger from registry.src.settings import settings async_engine = create_async_engine( settings.database_url_async, pool_size=settings.db_pool_size, max_overflow=settings.db_max_overflow, pool_pre_ping=True, ) session_maker = async_sessionmaker(bind=async_engine, expire_on_commit=False) async def get_unmanaged_session() -> AsyncSession: return session_maker() async def get_session() -> AsyncGenerator[AsyncSession, Any]: session = session_maker() try: yield session await session.commit() except FastApiError as error: await session.rollback() raise error except Exception as error: logger.error(f"Encountered an exception while processing request:\n{error}") await session.rollback() raise finally: await session.close() ``` -------------------------------------------------------------------------------- /worker/src/dockerfile_generators/factory.py: -------------------------------------------------------------------------------- ```python from ..constants import BaseImage from ..user_logger import UserLogger from .base import DockerfileGenerator from .python import PythonGenerator from .typescript import TypeScriptGenerator from .user_provided import UserDockerfileGenerator async def get_dockerfile_generator( base_image: BaseImage, repo_folder: str, build_instructions: str | None, user_logger: UserLogger, ) -> DockerfileGenerator: if build_instructions: await user_logger.info(message="Build instructions found.") return UserDockerfileGenerator( base_image=base_image, repo_folder=repo_folder, build_instructions=build_instructions, user_logger=user_logger, ) await user_logger.warning( message="No build instructions found. Attempting to generate from scratch." ) if base_image == BaseImage.node_lts: return TypeScriptGenerator( base_image=base_image, repo_folder=repo_folder, user_logger=user_logger ) else: return PythonGenerator( base_image=base_image, repo_folder=repo_folder, user_logger=user_logger ) ``` -------------------------------------------------------------------------------- /alembic/versions/2025_05_13_0634-3b8b8a5b99ad_deployment_fields.py: -------------------------------------------------------------------------------- ```python """deployment fields Revision ID: 3b8b8a5b99ad Revises: 735603f5e4d5 Create Date: 2025-05-13 06:34:52.115171 """ from alembic import op import sqlalchemy as sa # revision identifiers, used by Alembic. revision = '3b8b8a5b99ad' down_revision = '735603f5e4d5' branch_labels = None depends_on = None def upgrade(): # ### commands auto generated by Alembic - please adjust! ### op.add_column('server', sa.Column('repo_url', sa.String(), nullable=True)) op.add_column('server', sa.Column('command', sa.String(), nullable=True)) op.alter_column('server', 'url', existing_type=sa.VARCHAR(), nullable=True) op.create_unique_constraint("server_repo_url_unique", 'server', ['repo_url']) # ### end Alembic commands ### def downgrade(): # ### commands auto generated by Alembic - please adjust! 
### op.drop_constraint("server_repo_url_unique", 'server', type_='unique') op.alter_column('server', 'url', existing_type=sa.VARCHAR(), nullable=False) op.drop_column('server', 'command') op.drop_column('server', 'repo_url') # ### end Alembic commands ### ``` -------------------------------------------------------------------------------- /registry/src/search/schemas.py: -------------------------------------------------------------------------------- ```python from pydantic import Field from registry.src.base_schema import BaseSchema class SearchServerURL(BaseSchema): url: str class SearchServer(SearchServerURL): id: int name: str description: str class SolutionStep(BaseSchema): step_description: str best_server_description: str = Field( ..., description="Description of the best server for this step, copy this from `mcp_servers` entry", ) best_server_name: str = Field( ..., description="Name of the best server for this step, copy this from `mcp_servers` entry", ) best_server: SearchServerURL = Field( ..., description="The best server for this step" ) confidence: str = Field( ..., description="How confident you are that this server is enough for this step?", ) additional_servers: list[SearchServerURL] = Field( ..., description="Alternative servers if you think the `best_server` may not be enough", ) class SearchResponse(BaseSchema): solution_steps: list[SolutionStep] = Field( ..., description="List of solution steps and servers for each step" ) ``` -------------------------------------------------------------------------------- /scripts/darp-add.py: -------------------------------------------------------------------------------- ```python #!/usr/bin/env python3 """ Add new MCP servers to the registry """ import argparse import json import requests default_registry_url: str = "http://localhost:80" def main() -> None: parser = argparse.ArgumentParser(description="Add a new MCP server to the registry") parser.add_argument("--url", required=True, help="Endpoint URL for the server") parser.add_argument("--name", required=True, help="Unique server name") parser.add_argument("--description", required=True, help="Server description") parser.add_argument("--logo", default=None, help="Logo URL") parser.add_argument("--verbose", action="store_true", help="Verbose output") parser.add_argument( "--registry-url", default=default_registry_url, help="Registry API endpoint URL" ) args = parser.parse_args() server_data = { "name": args.name, "description": args.description, "url": args.url, "logo": args.logo, } response = requests.post(f"{args.registry_url}/servers/", json=server_data) response.raise_for_status() if args.verbose: print(json.dumps(response.json(), indent=2)) if __name__ == "__main__": main() ``` -------------------------------------------------------------------------------- /registry/src/deployments/schemas.py: -------------------------------------------------------------------------------- ```python from datetime import datetime from typing import Annotated from pydantic import Field from pydantic import field_serializer from pydantic import HttpUrl from registry.src.base_schema import BaseSchema from registry.src.servers.schemas import server_name_regex_pattern from registry.src.types import HostType from registry.src.types import ServerStatus from worker.src.constants import BaseImage from worker.src.schemas import LogMessage class ServerLogsSchema(BaseSchema): server_id: int deployment_logs: list[LogMessage] created_at: datetime updated_at: datetime class 
DeploymentCreateData(BaseSchema): name: Annotated[str, Field(max_length=30, pattern=server_name_regex_pattern)] title: str = Field(max_length=30) description: str repo_url: HttpUrl command: str | None = None logo: str | None = None base_image: BaseImage | None = None build_instructions: str | None = None host_type: HostType = HostType.internal status: ServerStatus = ServerStatus.in_processing @field_serializer("repo_url") def serialize_url(self, repo_url: HttpUrl) -> str: return str(repo_url) class DeploymentCreate(BaseSchema): current_user_id: str data: DeploymentCreateData class UserId(BaseSchema): current_user_id: str ``` -------------------------------------------------------------------------------- /registry/src/errors.py: -------------------------------------------------------------------------------- ```python from fastapi import HTTPException from fastapi import status class FastApiError(HTTPException): def __init__(self, message: str, **kwargs) -> None: self.detail = {"message": message, **kwargs} class InvalidServerNameError(FastApiError): status_code: int = status.HTTP_400_BAD_REQUEST def __init__(self, name: str) -> None: message = "Name must have only letters, digits, underscore. Name must not start with digits." super().__init__(message=message, name=name) class ServerAlreadyExistsError(FastApiError): status_code: int = status.HTTP_400_BAD_REQUEST def __init__(self, dict_servers: list[dict]) -> None: servers_str = ", ".join(server["name"] for server in dict_servers) message = f"Server already exists: {servers_str}" super().__init__(message=message, servers=dict_servers) class ServersNotFoundError(FastApiError): status_code = status.HTTP_404_NOT_FOUND def __init__(self, ids: list[int]) -> None: super().__init__(message=f"Server(s) not found: {ids}", ids=ids) class NotAllowedError(FastApiError): status_code = status.HTTP_403_FORBIDDEN class InvalidData(FastApiError): status_code = status.HTTP_400_BAD_REQUEST class RemoteServerError(FastApiError): status_code = status.HTTP_503_SERVICE_UNAVAILABLE ``` -------------------------------------------------------------------------------- /worker/src/dockerfile_generators/typescript/generic.py: -------------------------------------------------------------------------------- ```python from pathlib import Path from typing import override from ...errors import DependenciesError from ..base import DockerfileGenerator class TypeScriptGenerator(DockerfileGenerator): @override async def _dependencies_installation(self) -> None: repo_path = Path(self.repo_folder) package_json = self._find_file(repo_path, "package.json") if not package_json: await self.user_logger.error(message="Error: no package.json found") raise DependenciesError("No package.json found") await self.user_logger.info(message="package.json found. Adding to Dockerfile") package_json_folder = self._relative_path(package_json.parent.absolute()) lines = [ f"COPY {package_json_folder}/package*.json ./", ] ts_config = self._find_file(repo_path, "tsconfig.json") if ts_config: await self.user_logger.info( message="tsconfig.json found. 
Adding to Dockerfile" ) lines.append(f"COPY {self._relative_path(ts_config.absolute())} ./") lines.append("RUN npm install --ignore-scripts") self._add_to_file(lines) @override async def _code_setup(self) -> None: lines = [ "COPY ./ ./", "RUN npm run build", ] self._add_to_file(lines) ``` -------------------------------------------------------------------------------- /docker-compose-debug.yaml: -------------------------------------------------------------------------------- ```yaml services: registry: image: registry pull_policy: never restart: unless-stopped ports: - "${REGISTRY_PORT:-80}:80" profiles: - '' volumes: - ./registry:/workspace/registry - ./alembic:/workspace/alembic - ./common:/workspace/common - ./worker:/workspace/worker command: start-debug.sh registry_mcp_server: image: registry_mcp_server pull_policy: never restart: unless-stopped ports: - "${REGISTRY_MCP_PORT:-4689}:80" profiles: - '' volumes: - ./mcp_server:/workspace/mcp_server - ./common:/workspace/common command: start-debug.sh registry_deploy_worker: pull_policy: never profiles: - '' volumes: - ./worker:/workspace/worker - ./registry:/workspace/registry restart: unless-stopped command: worker-start-debug.sh portal_zookeeper: profiles: - '' restart: unless-stopped portal_kafka: profiles: - '' restart: unless-stopped registry_postgres: ports: - "${REGISTRY_POSTGRES_PORT:-5432}:5432" profiles: - '' restart: unless-stopped registry_healthchecker: pull_policy: never profiles: - '' volumes: - ./registry:/workspace/registry - ./healthchecker:/workspace/healthchecker restart: unless-stopped command: worker-start-debug.sh ``` -------------------------------------------------------------------------------- /registry/src/settings.py: -------------------------------------------------------------------------------- ```python from pathlib import Path from pydantic_settings import BaseSettings from pydantic_settings import SettingsConfigDict class Settings(BaseSettings): model_config = SettingsConfigDict(extra="ignore", env_ignore_empty=True) postgres_user: str postgres_db: str postgres_password: str postgres_host: str postgres_port: int = 5432 db_pool_size: int = 50 db_max_overflow: int = 25 log_dir: Path = Path("logs") llm_proxy: str | None = None openai_api_key: str llm_model: str = "openai/gpt-4o-mini" openai_base_url: str = "https://openrouter.ai/api/v1" worker_topic: str = "deployment" deployment_server: str = "" deployment_network: str = "" broker_url: str = "portal_kafka:19092" dockerhub_login: str = "" dockerhub_password: str = "" registry_url: str = "http://registry/" healthcheck_running_interval: int = 3600 tool_retry_count: int = 60 tool_wait_interval: int = 5 deep_research_server_name: str = "deepresearch_darpserver" @property def database_url(self) -> str: auth_data = f"{self.postgres_user}:{self.postgres_password}" host = f"{self.postgres_host}:{self.postgres_port}" return f"{auth_data}@{host}/{self.postgres_db}" @property def database_url_sync(self) -> str: return f"postgresql+psycopg2://{self.database_url}" @property def database_url_async(self) -> str: return f"postgresql+asyncpg://{self.database_url}" settings = Settings() ``` -------------------------------------------------------------------------------- /mcp_server/src/decorators.py: -------------------------------------------------------------------------------- ```python import functools import inspect import sys import traceback from collections.abc import Awaitable from collections.abc import Callable from collections.abc import Coroutine from typing 
import Any from typing import overload from typing import ParamSpec from typing import TypeVar from mcp_server.src.logger import logger X = TypeVar("X") P = ParamSpec("P") @overload def log_errors( # noqa func: Callable[P, Coroutine[Any, Any, X]] ) -> Callable[P, Coroutine[Any, Any, X]]: ... @overload def log_errors(func: Callable[P, X]) -> Callable[P, X]: ... # noqa def log_errors(func: Callable[P, Any]) -> Callable[P, Any]: def log_error(args: tuple, kwargs: dict) -> None: info = sys.exc_info()[2] assert ( info is not None and info.tb_next is not None ), f"No traceback available {sys.exc_info()[2]=}" locals_vars = info.tb_frame.f_locals logger.error(f"{traceback.format_exc()} \n{locals_vars=} \n{args=} \n{kwargs=}") @functools.wraps(func) def sync_wrapped(*args: P.args, **kwargs: P.kwargs) -> Any: try: return func(*args, **kwargs) except Exception: log_error(args, kwargs) raise @functools.wraps(func) async def async_wrapped(*args: P.args, **kwargs: P.kwargs) -> Awaitable[Any]: try: return await func(*args, **kwargs) except Exception: log_error(args, kwargs) raise if inspect.iscoroutinefunction(func): return async_wrapped return sync_wrapped ``` -------------------------------------------------------------------------------- /alembic/versions/2025_03_24_0945-ffe7a88d5b15_initial_migration.py: -------------------------------------------------------------------------------- ```python """Initial migration Revision ID: ffe7a88d5b15 Revises: Create Date: 2025-03-24 09:45:42.840081 """ from alembic import op import sqlalchemy as sa from sqlalchemy.dialects import postgresql # revision identifiers, used by Alembic. revision = 'ffe7a88d5b15' down_revision = None branch_labels = None depends_on = None def upgrade(): # ### commands auto generated by Alembic - please adjust! ### op.create_table('server', sa.Column('id', sa.Integer(), autoincrement=True, nullable=False), sa.Column('url', sa.String(), nullable=False), sa.Column('name', sa.String(), nullable=False), sa.Column('description', sa.String(), nullable=False), sa.Column('logo', sa.String(), nullable=False), sa.PrimaryKeyConstraint('id'), sa.UniqueConstraint('name'), sa.UniqueConstraint('url') ) op.create_table('tool', sa.Column('id', sa.Integer(), autoincrement=True, nullable=False), sa.Column('name', sa.String(), nullable=False), sa.Column('description', sa.String(), nullable=False), sa.Column('input_schema', postgresql.JSONB(astext_type=sa.Text()), nullable=False), sa.Column('server_url', sa.String(), nullable=False), sa.ForeignKeyConstraint(['server_url'], ['server.url'], ondelete='CASCADE'), sa.PrimaryKeyConstraint('id'), sa.UniqueConstraint('name', 'server_url', name='uq_tool_name_server_url') ) # ### end Alembic commands ### def downgrade(): # ### commands auto generated by Alembic - please adjust! 
### op.drop_table('tool') op.drop_table('server') # ### end Alembic commands ### ``` -------------------------------------------------------------------------------- /healthchecker/src/__main__.py: -------------------------------------------------------------------------------- ```python import asyncio import logging from sqlalchemy.ext.asyncio import AsyncSession from common.notifications.client import NotificationsClient from healthchecker.src.checker import HealthChecker from registry.src.database.session import get_unmanaged_session from registry.src.servers.repository import ServerRepository from registry.src.settings import settings logger: logging.Logger = logging.getLogger("Healthchecker") def setup_logging() -> None: logging.basicConfig(level=logging.INFO) logger.info("Starting healthchecker") logger.info( "Healthchecker will run every %d seconds", settings.healthcheck_running_interval ) async def perform_healthcheck_step() -> None: logger.info("Running health-checks") session: AsyncSession = await get_unmanaged_session() repo: ServerRepository = ServerRepository(session) notifications_client: NotificationsClient = NotificationsClient() healthchecker: HealthChecker = HealthChecker( session=session, servers_repo=repo, notifications_client=notifications_client, ) try: await healthchecker.run_once() except Exception as error: logger.error("Error in healthchecker", exc_info=error) logger.info("Health-checks finished") logger.info("Sleeping for %d seconds", settings.healthcheck_running_interval) await session.commit() await session.close() async def main() -> None: setup_logging() while True: await perform_healthcheck_step() await asyncio.sleep(settings.healthcheck_running_interval) if __name__ == "__main__": asyncio.run(main()) ``` -------------------------------------------------------------------------------- /worker/src/dockerfile_generators/base.py: -------------------------------------------------------------------------------- ```python from pathlib import Path from ..constants import BaseImage from ..user_logger import UserLogger class DockerfileGenerator: def __init__( self, base_image: BaseImage, repo_folder: str, user_logger: UserLogger ): self.base_image = base_image self.lines: list[str] = [] self.repo_folder = repo_folder self.user_logger = user_logger async def generate_dockerfile(self) -> str: await self._image_setup() await self._dependencies_installation() await self._code_setup() return self._save_dockerfile() async def _image_setup(self) -> None: lines = [ f"FROM {self.base_image.value}", "WORKDIR /workspace", ] self._add_to_file(lines) async def _dependencies_installation(self) -> None: raise NotImplementedError async def _code_setup(self) -> None: raise NotImplementedError def _add_to_file(self, lines: list[str]) -> None: self.lines.extend(lines) @staticmethod def _find_file(repo_folder: Path, file: str) -> Path | None: files = repo_folder.rglob(file) try: return next(files) except StopIteration: return None def _save_dockerfile(self) -> str: dockerfile_text = "\n".join(self.lines) with open(f"{self.repo_folder}/Dockerfile", "w") as dockerfile: dockerfile.write(dockerfile_text) return dockerfile_text def _relative_path(self, path: Path) -> str: str_path = str(path) if str_path.startswith(self.repo_folder): return "." 
+ str(str_path[len(self.repo_folder) :]) return str_path ``` -------------------------------------------------------------------------------- /registry/src/deployments/router.py: -------------------------------------------------------------------------------- ```python from fastapi import APIRouter from fastapi import Depends from fastapi import status from .schemas import DeploymentCreate from .schemas import ServerLogsSchema from .schemas import UserId from .service import DeploymentService from registry.src.database import Server from registry.src.database import ServerLogs from registry.src.servers.schemas import ServerRead router = APIRouter(prefix="/deployments") @router.post("/", status_code=status.HTTP_201_CREATED, response_model=ServerRead) async def create_server( data: DeploymentCreate, service: DeploymentService = Depends(DeploymentService.get_new_instance), ) -> Server: server = await service.create_server(data) return server @router.get("/{server_id}/logs", response_model=ServerLogsSchema) async def get_logs( server_id: int, current_user_id: str, service: DeploymentService = Depends(DeploymentService.get_new_instance), ) -> ServerLogs: logs = await service.get_logs(server_id=server_id, current_user_id=current_user_id) return logs @router.post("/{server_id}/stop", status_code=status.HTTP_202_ACCEPTED) async def stop_server( server_id: int, user_id: UserId, service: DeploymentService = Depends(DeploymentService.get_new_instance), ) -> None: await service.stop_server( server_id=server_id, current_user_id=user_id.current_user_id ) @router.post("/{server_id}/start", status_code=status.HTTP_202_ACCEPTED) async def start_server( server_id: int, user_id: UserId, service: DeploymentService = Depends(DeploymentService.get_new_instance), ) -> None: await service.start_server( server_id=server_id, current_user_id=user_id.current_user_id ) ``` -------------------------------------------------------------------------------- /registry/src/database/models/server.py: -------------------------------------------------------------------------------- ```python from sqlalchemy import String from sqlalchemy.orm import Mapped from sqlalchemy.orm import mapped_column from sqlalchemy.orm import relationship from ...database import models from ...types import HostType from ...types import ServerTransportProtocol from .base import Base from .mixins import HasCreatedAt from .mixins import HasUpdatedAt from registry.src.types import ServerStatus class Server(HasCreatedAt, HasUpdatedAt, Base): __tablename__ = "server" id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True) title: Mapped[str] = mapped_column(String(), nullable=False) url: Mapped[str | None] = mapped_column(String(), nullable=True, unique=True) name: Mapped[str] = mapped_column(String(), nullable=False, unique=True) description: Mapped[str] = mapped_column(String(), nullable=False) logo: Mapped[str | None] = mapped_column(String(), nullable=True) host_type: Mapped[HostType] = mapped_column(String(), nullable=False) creator_id: Mapped[str | None] = mapped_column(String(), nullable=True) repo_url: Mapped[str | None] = mapped_column(String(), nullable=True, unique=True) command: Mapped[str | None] = mapped_column(String(), nullable=True) base_image: Mapped[str | None] = mapped_column(String(), nullable=True) transport_protocol: Mapped[ServerTransportProtocol | None] = mapped_column( String(), nullable=True, ) build_instructions: Mapped[str | None] = mapped_column(String(), nullable=True) tools: Mapped[list["models.tool.Tool"]] = 
relationship( back_populates="server", cascade="all, delete-orphan" ) status: Mapped[ServerStatus] = mapped_column( String(), nullable=False, default=ServerStatus.active ) ``` -------------------------------------------------------------------------------- /common/notifications/client.py: -------------------------------------------------------------------------------- ```python from httpx import AsyncClient from common.notifications.enums import NotificationEvent from common.notifications.exceptions import NotificationsServiceError DEFAULT_NOTIFICATIONS_CLIENT_URI: str = "http://mailing_api:8000" class NotificationsClient: def __init__(self, base_url: str | None = None) -> None: if base_url is None: base_url = DEFAULT_NOTIFICATIONS_CLIENT_URI self._client: AsyncClient = AsyncClient(base_url=base_url) async def _notify( self, user_id: str, template: NotificationEvent, data: dict, ) -> None: response = await self._client.post( f"/v1/emails/{template.value}", json=dict(user_id=user_id, **data), ) if response.status_code != 200: raise NotificationsServiceError(f"Failed to send email: {response.text}") async def send_server_down_alert( self, user_id: str, server_name: str, server_logs: str | None = None, ): await self._notify( user_id, NotificationEvent.server_healthcheck_failed, data={ "server_name": server_name, "server_logs": server_logs, }, ) async def notify_deploy_failed(self, user_id: str, server_name: str) -> None: await self._notify( user_id, NotificationEvent.deploy_failed, data={ "server_name": server_name, }, ) async def notify_deploy_successful(self, user_id: str, server_name: str) -> None: await self._notify( user_id, NotificationEvent.deploy_successful, data={ "server_name": server_name, }, ) ``` -------------------------------------------------------------------------------- /registry/src/search/service.py: -------------------------------------------------------------------------------- ```python from typing import Self import instructor from httpx import AsyncClient from openai import AsyncOpenAI from .prompts import get_top_servers from .schemas import SearchResponse from .schemas import SearchServer from registry.src.logger import logger from registry.src.settings import settings class SearchService: def __init__(self) -> None: http_client: AsyncClient = ( AsyncClient() if settings.llm_proxy is None else AsyncClient(proxy=settings.llm_proxy) ) self.llm = instructor.from_openai( AsyncOpenAI(http_client=http_client, base_url=settings.openai_base_url) ) async def _get_search_response( self, servers: list[SearchServer], query: str ) -> SearchResponse: prompt = get_top_servers.format(servers_list=servers, request=query) completion = await self.llm.chat.completions.create( model=settings.llm_model, messages=[ { "role": "user", "content": prompt, }, ], response_model=SearchResponse, ) logger.debug(f"{completion=}") return completion @staticmethod def _collect_urls(search_response: SearchResponse) -> list[str]: server_urls = set() for solution_step in search_response.solution_steps: server_urls.add(solution_step.best_server.url) server_urls |= {server.url for server in solution_step.additional_servers} return list(server_urls) async def get_fitting_servers( self, servers: list[SearchServer], query: str ) -> list[str]: search_response = await self._get_search_response(servers=servers, query=query) return self._collect_urls(search_response=search_response) @classmethod async def get_new_instance(cls) -> Self: return cls() ``` 
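
For orientation between files: the search flow above takes a list of candidate `SearchServer` rows, asks the LLM to plan solution steps, and returns the de-duplicated server URLs chosen across all steps. Below is a minimal driver sketch, not part of the repository itself; it assumes a configured `OPENAI_API_KEY` in the environment, and the two candidate servers are purely illustrative.

```python
import asyncio

from registry.src.search.schemas import SearchServer
from registry.src.search.service import SearchService


async def main() -> None:
    # Hypothetical candidates; in the registry these rows come from the database.
    servers = [
        SearchServer(
            id=1,
            url="http://weather:80/sse",
            name="weather",
            description="Current weather and forecasts",
        ),
        SearchServer(
            id=2,
            url="http://geocoder:80/sse",
            name="geocoder",
            description="Resolves place names to coordinates",
        ),
    ]
    service = await SearchService.get_new_instance()
    # Returns the de-duplicated URLs collected across all solution steps.
    urls = await service.get_fitting_servers(
        servers=servers, query="What will the weather be in Paris tomorrow?"
    )
    print(urls)


if __name__ == "__main__":
    asyncio.run(main())
```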
--------------------------------------------------------------------------------
/worker/src/user_logger.py:
--------------------------------------------------------------------------------
```python
from sqlalchemy.ext.asyncio import AsyncSession

from .schemas import CommandResult
from .schemas import CommandStart
from .schemas import GeneralLogMessage
from .schemas import LoggingEvent
from .schemas import LogLevel
from .schemas import LogMessage
from registry.src.deployments.logs_repository import LogsRepository


class UserLogger:
    def __init__(self, session: AsyncSession, server_id: int):
        self.logs_repo = LogsRepository(session=session)
        self.server_id = server_id

    async def clear_logs(self) -> None:
        await self.logs_repo.clear_logs(server_id=self.server_id)

    async def command_start(self, command: str) -> None:
        log = LogMessage(
            event_type=LoggingEvent.command_start, data=CommandStart(command=command)
        )
        await self.logs_repo.append_to_deployment_logs(
            server_id=self.server_id, log=log
        )

    async def command_result(self, output: str, success: bool) -> None:
        data = CommandResult(output=output, success=success)
        log = LogMessage(event_type=LoggingEvent.command_result, data=data)
        await self.logs_repo.append_to_deployment_logs(
            server_id=self.server_id, log=log
        )

    async def info(self, message: str) -> None:
        await self._add_message(log_level=LogLevel.info, message=message)

    async def warning(self, message: str) -> None:
        await self._add_message(log_level=LogLevel.warning, message=message)

    async def error(self, message: str) -> None:
        await self._add_message(log_level=LogLevel.error, message=message)

    async def _add_message(self, log_level: LogLevel, message: str) -> None:
        data = GeneralLogMessage(log_level=log_level, message=message)
        log = LogMessage(event_type=LoggingEvent.general_message, data=data)
        await self.logs_repo.append_to_deployment_logs(
            server_id=self.server_id, log=log
        )
```
--------------------------------------------------------------------------------
/registry/src/deployments/logs_repository.py:
--------------------------------------------------------------------------------
```python
from typing import Self

from fastapi import Depends
from sqlalchemy import select
from sqlalchemy import update
from sqlalchemy.ext.asyncio import AsyncSession

from registry.src.database import get_session
from registry.src.database import ServerLogs
from worker.src.schemas import LogMessage


class LogsRepository:
    def __init__(self, session: AsyncSession) -> None:
        self.session = session

    async def create_logs(self, server_id: int) -> ServerLogs:
        server_logs = ServerLogs(server_id=server_id)
        self.session.add(server_logs)
        await self.session.flush()
        await self.session.refresh(server_logs)
        return server_logs

    async def get_server_logs(self, server_id: int) -> ServerLogs | None:
        query = select(ServerLogs).where(ServerLogs.server_id == server_id)
        logs = (await self.session.execute(query)).scalar_one_or_none()
        return logs

    async def update_server_logs(
        self, server_id: int, deployment_logs: list[dict]
    ) -> None:
        query = (
            update(ServerLogs)
            .where(ServerLogs.server_id == server_id)
            .values(deployment_logs=deployment_logs)
        )
        await self.session.execute(query)
        await self.session.flush()
        await self.session.commit()

    async def append_to_deployment_logs(self, server_id: int, log: LogMessage) -> None:
        query = select(ServerLogs).where(ServerLogs.server_id == server_id)
        # scalar_one_or_none() may return None, so the annotation must be optional.
        logs: ServerLogs | None = (await self.session.execute(query)).scalar_one_or_none()
        if not logs:
            raise ValueError(f"No ServerLogs row found for server {server_id}")
        deployment_logs = logs.deployment_logs + [log.model_dump()]
        await
self.update_server_logs( server_id=server_id, deployment_logs=deployment_logs ) async def clear_logs(self, server_id: int) -> None: await self.update_server_logs(server_id=server_id, deployment_logs=[]) @classmethod async def get_new_instance( cls, session: AsyncSession = Depends(get_session) ) -> Self: return cls(session=session) ``` -------------------------------------------------------------------------------- /alembic/env.py: -------------------------------------------------------------------------------- ```python from logging.config import fileConfig from sqlalchemy import engine_from_config from sqlalchemy import pool from alembic import context from registry.src.database import Base from registry.src.settings import settings # this is the Alembic Config object, which provides # access to the values within the .ini file in use. config = context.config # Interpret the config file for Python logging. # This line sets up loggers basically. if config.config_file_name is not None: fileConfig(config.config_file_name) # add your model's MetaData object here # for 'autogenerate' support # from myapp import mymodel # target_metadata = mymodel.Base.metadata target_metadata = Base.metadata # other values from the config, defined by the needs of env.py, # can be acquired: # my_important_option = config.get_main_option("my_important_option") # ... etc. config.set_main_option("sqlalchemy.url", settings.database_url_sync) def run_migrations_offline() -> None: """Run migrations in 'offline' mode. This configures the context with just a URL and not an Engine, though an Engine is acceptable here as well. By skipping the Engine creation we don't even need a DBAPI to be available. Calls to context.execute() here emit the given string to the script output. """ url = config.get_main_option("sqlalchemy.url") context.configure( url=url, target_metadata=target_metadata, literal_binds=True, dialect_opts={"paramstyle": "named"}, ) with context.begin_transaction(): context.run_migrations() def run_migrations_online() -> None: """Run migrations in 'online' mode. In this scenario we need to create an Engine and associate a connection with the context. 
""" connectable = engine_from_config( config.get_section(config.config_ini_section, {}), prefix="sqlalchemy.", poolclass=pool.NullPool, ) with connectable.connect() as connection: context.configure(connection=connection, target_metadata=target_metadata) with context.begin_transaction(): context.run_migrations() if context.is_offline_mode(): run_migrations_offline() else: run_migrations_online() ``` -------------------------------------------------------------------------------- /mcp_server/src/llm/tool_manager.py: -------------------------------------------------------------------------------- ```python import json from mcp import ClientSession from mcp.client.sse import sse_client from mcp.types import CallToolResult from openai.types.chat import ChatCompletionMessageToolCall from openai.types.chat import ChatCompletionToolMessageParam from openai.types.chat import ChatCompletionToolParam from mcp_server.src.schemas import RegistryServer from mcp_server.src.schemas import ToolInfo class ToolManager: def __init__(self, servers: list[RegistryServer]) -> None: self.renamed_tools: dict[str, ToolInfo] = {} self.tools: list[ChatCompletionToolParam] = self.set_tools(servers) def rename_and_save(self, tool_name: str, server: RegistryServer) -> str: renamed_tool = f"{tool_name}_mcp_{server.name}" self.renamed_tools[renamed_tool] = ToolInfo(tool_name=tool_name, server=server) return renamed_tool def set_tools(self, servers: list[RegistryServer]) -> list[ChatCompletionToolParam]: return [ ChatCompletionToolParam( type="function", function={ "name": self.rename_and_save(tool.name, server=server), "description": tool.description, "parameters": tool.input_schema, }, ) for server in servers for tool in server.tools ] @staticmethod async def _request_mcp( arguments: str, server_url: str, tool_name: str, ) -> CallToolResult: async with sse_client(server_url) as (read, write): async with ClientSession(read, write) as session: await session.initialize() return await session.call_tool( tool_name, arguments=(json.loads(arguments) if arguments else None), ) async def handle_tool_call( self, tool_call: ChatCompletionMessageToolCall ) -> ChatCompletionToolMessageParam: tool_info = self.renamed_tools.get(tool_call.function.name) if not tool_info: return ChatCompletionToolMessageParam( role="tool", content="Error: Incorrect tool name", tool_call_id=tool_call.id, ) tool_call_result = await self._request_mcp( arguments=tool_call.function.arguments, server_url=tool_info.server.url, tool_name=tool_info.tool_name, ) return ChatCompletionToolMessageParam( role="tool", content=tool_call_result.content[0].text, tool_call_id=tool_call.id, ) ``` -------------------------------------------------------------------------------- /registry/src/servers/schemas.py: -------------------------------------------------------------------------------- ```python from datetime import datetime from typing import Annotated from typing import Any from pydantic import ConfigDict from pydantic import Field from pydantic import field_serializer from registry.src.base_schema import BaseSchema from registry.src.types import HostType from registry.src.types import ServerStatus from registry.src.types import ServerTransportProtocol from worker.src.constants import BaseImage tool_name_regex = "^[a-zA-Z0-9_-]{1,64}$" server_name_regex_pattern = "^[a-z0-9]+(_[a-z0-9]+)*$" class Tool(BaseSchema): model_config = ConfigDict(populate_by_name=True, **BaseSchema.model_config) name: str = Field(pattern=tool_name_regex) alias: str = 
Field(pattern=tool_name_regex) description: str input_schema: dict[str, Any] = Field(validation_alias="inputSchema") class ServerCreateData(BaseSchema): name: Annotated[str, Field(pattern=server_name_regex_pattern, max_length=30)] title: str = Field(max_length=30) description: str url: str logo: str | None = None host_type: HostType = HostType.external class ServerCreate(BaseSchema): data: ServerCreateData current_user_id: str | None = None class ServerRead(BaseSchema): title: str id: int status: ServerStatus name: str creator_id: str | None description: str repo_url: str | None url: str | None command: str | None logo: str | None host_type: HostType base_image: BaseImage | None = None build_instructions: str | None = None transport_protocol: ServerTransportProtocol | None created_at: datetime updated_at: datetime @field_serializer("created_at", "updated_at") def serialize_datetime(self, value: datetime, _info) -> str: return value.isoformat() class ServerWithTools(ServerRead): tools: list[Tool] class ServerUpdateData(BaseSchema): title: str | None = Field(max_length=30, default=None) name: Annotated[ str | None, Field(pattern=server_name_regex_pattern, max_length=30) ] = None description: str | None = None url: str | None = None logo: str | None = None class DeploymentUpdateData(BaseSchema): command: str | None = None repo_url: str | None = None base_image: BaseImage | None = None build_instructions: str | None = None class ServerUpdate(BaseSchema): data: ServerUpdateData deployment_data: DeploymentUpdateData current_user_id: str | None = None class MCP(BaseSchema): name: str description: str endpoint: str = Field(validation_alias="url") logo: str | None = None class MCPJson(BaseSchema): version: str = "1.0" servers: list[MCP] ``` -------------------------------------------------------------------------------- /scripts/darp-search.py: -------------------------------------------------------------------------------- ```python #!/usr/bin/env python3 """ List matching MCP servers from the registry. 
""" import argparse import json import sys from typing import Any from typing import Dict from typing import List import requests default_registry_url: str = "http://localhost:80" def main() -> None: parser = argparse.ArgumentParser(description="Search for servers in the registry") parser.add_argument("query", help="Search query string") parser.add_argument( "--registry-url", default=default_registry_url, help=f"Registry URL (default: {default_registry_url})", ) parser.add_argument( "--format", choices=["names", "fulltext", "json"], default="names", help="Output format (default: names)", ) args = parser.parse_args() servers = search_servers(args.query, args.registry_url) display_servers(servers, args.format) def search_servers( query: str, base_url: str = default_registry_url ) -> List[Dict[str, Any]]: url = f"{base_url}/servers/search" params = {"query": query} try: response = requests.get(url, params=params) response.raise_for_status() return response.json() except requests.RequestException as error: print(f"Error: Failed to search servers - {error}", file=sys.stderr) sys.exit(1) def display_servers( servers: List[Dict[str, Any]], format_output: str = "names" ) -> None: if format_output == "json": display_servers_json(servers) elif format_output == "names": display_servers_names(servers) elif format_output == "fulltext": display_servers_fulltext(servers) def display_servers_json(servers: List[Dict[str, Any]]) -> None: print(json.dumps(servers, indent=2)) def display_servers_names(servers: List[Dict[str, Any]]) -> None: print(f"Found {len(servers)} servers:") for server in servers: print(f"{server['name']}") def display_servers_fulltext(servers: List[Dict[str, Any]]) -> None: print(f"Found {len(servers)} servers:") for server in servers: print(f"{server['id']}: {server['name']}") print(f"{indent(server['url'], 1)}") print(f"{indent(server['description'], 1)}") if server["tools"]: print(indent("Tools:", 1)) for tool in server["tools"]: print(f"{indent(tool['name'], 2)}") print(f"{indent(tool['description'], 2)}") print(f"{indent(json.dumps(tool['input_schema'], indent=2), 2)}") def indent(text: str, level: int = 1, prefix: str = " ") -> str: return "\n".join(prefix * level + line for line in text.split("\n")) if __name__ == "__main__": main() ``` -------------------------------------------------------------------------------- /mcp_server/src/tools.py: -------------------------------------------------------------------------------- ```python import logging from openai.types.chat import ChatCompletionMessage from openai.types.chat import ChatCompletionMessageParam from openai.types.chat import ChatCompletionToolMessageParam from openai.types.chat import ChatCompletionUserMessageParam from common.llm.client import LLMClient from mcp_server.src.decorators import log_errors from mcp_server.src.llm.prompts import default_prompt from mcp_server.src.llm.tool_manager import ToolManager from mcp_server.src.registry_client import RegistryClient from mcp_server.src.settings import settings class Tools: def __init__(self): self.registry_client = RegistryClient() logger = logging.getLogger("LLMClient") self.llm_client = LLMClient( logger, proxy=settings.llm_proxy, base_url=settings.openai_base_url ) @log_errors async def search(self, request: str) -> list[str]: servers = await self.registry_client.search(request=request) return [server.url for server in servers] @log_errors async def routing( self, request: str ) -> list[ChatCompletionMessage | ChatCompletionToolMessageParam]: servers = await 
self.registry_client.search(request=request) manager = ToolManager(servers) user_message = ChatCompletionUserMessageParam(content=request, role="user") resulting_conversation = await self._llm_request( messages=[user_message], manager=manager ) return resulting_conversation async def _llm_request( self, messages: list[ChatCompletionMessageParam], manager: ToolManager, response_accumulator: ( list[ChatCompletionMessage | ChatCompletionToolMessageParam] | None ) = None, ) -> list[ChatCompletionMessage | ChatCompletionToolMessageParam]: if response_accumulator is None: response_accumulator = [] completion = await self.llm_client.request( system_prompt=default_prompt, model=settings.llm_model, messages=messages, tools=manager.tools, ) choice = completion.choices[0] response_accumulator.append(choice.message) if choice.message.tool_calls: tool_calls = choice.message.tool_calls tool_result_messages = [ await manager.handle_tool_call(tool_call) for tool_call in tool_calls ] response_accumulator += tool_result_messages conversation = messages + [choice.message] + tool_result_messages response_accumulator = await self._llm_request( messages=conversation, manager=manager, response_accumulator=response_accumulator, ) return response_accumulator ``` -------------------------------------------------------------------------------- /registry/src/validator/service.py: -------------------------------------------------------------------------------- ```python import logging from typing import Self from fastapi import Depends from openai.types.chat import ChatCompletionSystemMessageParam from openai.types.chat import ChatCompletionUserMessageParam from .prompts import SENS_TOPIC_CHECK_SYSTEM_PROMPT from .prompts import SENS_TOPIC_CHECK_USER_PROMPT from common.llm.client import LLMClient from registry.src.servers.repository import ServerRepository from registry.src.servers.schemas import ServerCreate from registry.src.servers.schemas import Tool from registry.src.settings import settings from registry.src.validator.schemas import ServerNeedsSensitiveDataResponse from registry.src.validator.schemas import Tools from registry.src.validator.schemas import ValidationResult from registry.src.validator.ssl import SSLHelper class ValidationService: def __init__( self, servers_repo: ServerRepository, ssl_helper: SSLHelper, ) -> None: self._servers_repo = servers_repo self._ssl_helper = ssl_helper logger = logging.getLogger("LLMClient") self._llm_client = LLMClient( logger, proxy=settings.llm_proxy, base_url=settings.openai_base_url ) async def _mcp_needs_sensitive_data( self, data: ServerCreate, tools: list[Tool], ) -> bool: system_message = ChatCompletionSystemMessageParam( content=SENS_TOPIC_CHECK_SYSTEM_PROMPT.render(), role="system" ) message = ChatCompletionUserMessageParam( content=SENS_TOPIC_CHECK_USER_PROMPT.render( server_data=data, tools_data=Tools.model_validate(tools), ), role="user", ) response = await self._llm_client.request_to_beta( model=settings.llm_model, messages=[system_message, message], response_format=ServerNeedsSensitiveDataResponse, ) answer = ServerNeedsSensitiveDataResponse.model_validate_json( response.choices[0].message.content, ) return answer.server_needs_sensitive_data async def validate_server( self, data: ServerCreate, tools: list[Tool] ) -> ValidationResult: result = ValidationResult() result.server_requires_sensitive_data = await self._mcp_needs_sensitive_data( data, tools, ) result.authority_level = await self._ssl_helper.get_ssl_authority_level( data.url, ) return result 
@classmethod def get_new_instance( cls, servers_repo: ServerRepository = Depends(ServerRepository.get_new_instance), ssl_helper: SSLHelper = Depends(SSLHelper.get_new_instance), ) -> Self: return cls( servers_repo=servers_repo, ssl_helper=ssl_helper, ) ``` -------------------------------------------------------------------------------- /worker/src/consumer.py: -------------------------------------------------------------------------------- ```python from datetime import timedelta from typing import Any from aiokafka import AIOKafkaConsumer from .deployment_service import WorkerDeploymentService from registry.src.database import get_unmanaged_session from registry.src.logger import logger from registry.src.servers.repository import ServerRepository from registry.src.settings import settings from worker.src.schemas import Task from worker.src.schemas import TaskType class Consumer: def __init__(self) -> None: self.max_poll_interval: int = int(timedelta(minutes=30).total_seconds()) self.session_timeout: int = int(timedelta(minutes=10).total_seconds()) self.task_handlers: dict[TaskType, Any] = { TaskType.deploy: self.deploy_server, TaskType.start: self.start_server, TaskType.stop: self.stop_server, } async def start(self) -> None: consumer = AIOKafkaConsumer( settings.worker_topic, bootstrap_servers=settings.broker_url, auto_offset_reset="earliest", group_id="deploy_workers", group_instance_id="1", max_poll_interval_ms=self.max_poll_interval * 1000, session_timeout_ms=self.session_timeout * 1000, ) await consumer.start() try: await self.process_messages(consumer) finally: await consumer.stop() async def process_messages(self, consumer: AIOKafkaConsumer) -> None: async for message in consumer: incoming_message = self.parse_message(message.value) if not incoming_message: continue await self.process_message(incoming_message) async def process_message(self, task: Task) -> None: session = await get_unmanaged_session() try: repo = ServerRepository(session) await self.task_handlers[task.task_type](task.server_id, repo) except Exception as error: logger.error("Error while processing a task", exc_info=error) await session.commit() async def deploy_server(self, server_id: int, repo: ServerRepository) -> None: service = WorkerDeploymentService(repo=repo, logger=logger) await service.deploy_server(server_id=server_id) async def stop_server(self, server_id: int, repo: ServerRepository) -> None: raise NotImplementedError() async def start_server(self, server_id: int, repo: ServerRepository) -> None: raise NotImplementedError() @staticmethod def parse_message(message: bytes) -> Task | None: logger.debug(f"Incoming message - {message.decode()}") try: task = Task.model_validate_json(message.decode()) return task except Exception as error: logger.error("Incorrect message received", exc_info=error) return None ``` -------------------------------------------------------------------------------- /registry/src/validator/ssl.py: -------------------------------------------------------------------------------- ```python import asyncio import logging import ssl from typing import Self from urllib.parse import urlparse from cryptography import x509 from cryptography.hazmat._oid import ExtensionOID from cryptography.hazmat.backends import default_backend from .constants import EV_OIDS from .constants import HTTP_PORT from .constants import HTTPS_PORT from .constants import SELF_SIGNED_CERTIFICATE_ERR_CODE from .schemas import SSLAuthorityLevel class SSLHelper: def __init__(self) -> None: self._logger = 
logging.getLogger(f"{__name__}.{self.__class__.__name__}") async def get_ssl_authority_level(self, url: str) -> SSLAuthorityLevel: try: cert = await self._load_certificate(url) except ssl.SSLCertVerificationError as exc: if exc.verify_code == SELF_SIGNED_CERTIFICATE_ERR_CODE: return SSLAuthorityLevel.SELF_SIGNED_CERTIFICATE return SSLAuthorityLevel.INVALID_CERTIFICATE except Exception as exc: self._logger.error("Failed to fetch ssl cert", exc_info=exc) return SSLAuthorityLevel.NO_CERTIFICATE if self._is_cert_respects_ev(cert): return SSLAuthorityLevel.EXTENDED_CERTIFICATE return SSLAuthorityLevel.CERTIFICATE_OK @classmethod def get_new_instance(cls) -> Self: return cls() @staticmethod def _extract_port(uri: str) -> int: parsed = urlparse(uri) if parsed.port: return parsed.port if parsed.scheme == "https": return HTTPS_PORT elif parsed.scheme == "http": return HTTP_PORT raise ValueError("Invalid URI", uri) @staticmethod def _extract_host(uri: str) -> str: parsed = urlparse(uri) if parsed.hostname is None: raise ValueError("Invalid URL: No hostname found") return parsed.hostname @classmethod async def _load_certificate(cls, url: str) -> x509.Certificate: hostname = cls._extract_host(url) port = cls._extract_port(url) ssl_context = ssl.create_default_context() ssl_context.check_hostname = True ssl_context.verify_mode = ssl.CERT_REQUIRED reader, writer = await asyncio.open_connection( hostname, port, ssl=ssl_context, server_hostname=hostname ) ssl_object = writer.get_extra_info("ssl_object") der_cert = ssl_object.getpeercert(binary_form=True) writer.close() await writer.wait_closed() cert_obj = x509.load_der_x509_certificate(der_cert, default_backend()) return cert_obj @staticmethod def _is_cert_respects_ev(cert: x509.Certificate) -> bool: try: policies = cert.extensions.get_extension_for_oid( ExtensionOID.CERTIFICATE_POLICIES ).value except x509.ExtensionNotFound: return False for policy in policies: if policy.policy_identifier.dotted_string in EV_OIDS: return True return False ``` -------------------------------------------------------------------------------- /worker/src/dockerfile_generators/python/generic.py: -------------------------------------------------------------------------------- ```python from pathlib import Path from typing import override from ...errors import DependenciesError from ..base import DockerfileGenerator class PythonGenerator(DockerfileGenerator): @override async def _dependencies_installation(self) -> None: repo_path = Path(self.repo_folder) await self._install_node() uv_file = self._find_file(repo_path, "uv.lock") pyproject_file = self._find_file(repo_path, "pyproject.toml") readme_file = self._find_file(repo_path, "README.md") if pyproject_file and readme_file: await self.user_logger.info( message="pyproject.toml found. Installing dependencies with pip" ) self._pip_dependencies(pyproject=pyproject_file, readme_file=readme_file) return requirements_file = self._find_file(repo_path, "requirements.txt") if requirements_file: await self.user_logger.info( message="requirements.txt found. Installing dependencies with legacy pip method" ) self._legacy_pip_dependencies(requirements_file) return if uv_file and pyproject_file and readme_file: await self.user_logger.info( message="uv.lock found. 
Installing dependencies with uv" ) self._uv_dependencies( uv_file=uv_file, pyproject=pyproject_file, readme_file=readme_file ) return raise DependenciesError("No supported dependencies installation method found") @override async def _code_setup(self) -> None: lines = [ "COPY ./ ./", ] self._add_to_file(lines) async def _install_node(self) -> None: await self.user_logger.info(message="Adding nodejs for supergateway") lines = [ "RUN apt update && apt install -y nodejs npm", ] self._add_to_file(lines) def _legacy_pip_dependencies(self, requirements: Path) -> None: lines = [ f"COPY {self._relative_path(requirements.absolute())} ./requirements.txt", "RUN pip install -r requirements.txt", ] self._add_to_file(lines) def _pip_dependencies(self, pyproject: Path, readme_file: Path) -> None: lines = [ f"COPY {self._relative_path(pyproject.absolute())} ./pyproject.toml", f"COPY {self._relative_path(readme_file.absolute())} ./README.md", "RUN pip install .", ] self._add_to_file(lines) def _uv_dependencies( self, uv_file: Path, pyproject: Path, readme_file: Path ) -> None: lines = [ "COPY --from=ghcr.io/astral-sh/uv:latest /uv /uvx /usr/local/bin/", f"COPY {self._relative_path(uv_file.absolute())} ./uv.lock", f"COPY {self._relative_path(pyproject.absolute())} ./pyproject.toml", f"COPY {self._relative_path(readme_file.absolute())} ./README.md", "RUN uv pip install . --system", ] self._add_to_file(lines) ``` -------------------------------------------------------------------------------- /healthchecker/src/checker.py: -------------------------------------------------------------------------------- ```python import logging from collections.abc import Sequence from sqlalchemy import Select from sqlalchemy.ext.asyncio import AsyncSession from common.notifications.client import NotificationsClient from registry.src.database.models.server import Server from registry.src.servers.repository import ServerRepository from registry.src.servers.schemas import Tool from registry.src.servers.service import ServerService from registry.src.types import ServerStatus class HealthChecker: def __init__( self, session: AsyncSession, servers_repo: ServerRepository, notifications_client: NotificationsClient, ) -> None: self.session = session self.repo = servers_repo self.notifications_client = notifications_client self.logger = logging.getLogger("Healthchecker") async def run_once(self) -> None: servers = await self._fetch_servers() for server in servers: await self._check_server(server) async def _fetch_servers(self) -> Sequence[Server]: query: Select = await self.repo.get_all_servers() servers: Sequence[Server] = ( (await self.session.execute(query)).scalars().fetchall() ) return servers async def _get_tools(self, server: Server) -> list[Tool]: return await ServerService.get_tools(server.url, server.name) async def _check_server(self, server: Server) -> None: self.logger.info("Checking server %s", server.name) try: tools = await self._get_tools(server) except Exception as error: await self._handle_failed_server(server, error) return if not tools: await self._handle_empty_tools(server) return await self._mark_server_healthy(server, tools) async def _handle_failed_server(self, server: Server, error: Exception) -> None: self.logger.error("Failed to fetch server's tools", exc_info=error) self.logger.warning("Server %s will be marked as unhealthy", server.name) await self.notifications_client.send_server_down_alert( server.creator_id, server_name=server.name, ) await self._update_server_status(server, [], ServerStatus.inactive) 
async def _handle_empty_tools(self, server: Server) -> None: self.logger.warning("Server %s is alive, but has no tools", server.name) self.logger.warning("Server will be marked as unhealthy.") await self.notifications_client.send_server_down_alert( server.creator_id, server_name=server.name, server_logs=( "Server has no tools. It will be marked as unhealthy. " "Please, check it out." ), ) await self._update_server_status(server, [], ServerStatus.inactive) async def _mark_server_healthy(self, server: Server, tools: list[Tool]) -> None: await self._update_server_status(server, tools, ServerStatus.active) async def _update_server_status( self, server: Server, tools: list[Tool], status: ServerStatus ) -> None: await self.repo.update_server( id=server.id, tools=tools, status=status, ) ``` -------------------------------------------------------------------------------- /alembic.ini: -------------------------------------------------------------------------------- ``` # A generic, single database configuration. [alembic] # path to migration scripts # Use forward slashes (/) also on windows to provide an os agnostic path script_location = alembic # template used to generate migration file names; The default value is %%(rev)s_%%(slug)s # Uncomment the line below if you want the files to be prepended with date and time # see https://alembic.sqlalchemy.org/en/latest/tutorial.html#editing-the-ini-file # for all available tokens file_template = %%(year)d_%%(month).2d_%%(day).2d_%%(hour).2d%%(minute).2d-%%(rev)s_%%(slug)s # sys.path path, will be prepended to sys.path if present. # defaults to the current working directory. prepend_sys_path = . # timezone to use when rendering the date within the migration file # as well as the filename. # If specified, requires the python>=3.9 or backports.zoneinfo library and tzdata library. # Any required deps can installed by adding `alembic[tz]` to the pip requirements # string value is passed to ZoneInfo() # leave blank for localtime # timezone = # max length of characters to apply to the "slug" field # truncate_slug_length = 40 # set to 'true' to run the environment during # the 'revision' command, regardless of autogenerate # revision_environment = false # set to 'true' to allow .pyc and .pyo files without # a source .py file to be detected as revisions in the # versions/ directory # sourceless = false # version location specification; This defaults # to alembic/versions. When using multiple version # directories, initial revisions must be specified with --version-path. # The path separator used here should be the separator specified by "version_path_separator" below. # version_locations = %(here)s/bar:%(here)s/bat:alembic/versions # version path separator; As mentioned above, this is the character used to split # version_locations. The default within new alembic.ini files is "os", which uses os.pathsep. # If this key is omitted entirely, it falls back to the legacy behavior of splitting on spaces and/or commas. # Valid values for version_path_separator are: # # version_path_separator = : # version_path_separator = ; # version_path_separator = space # version_path_separator = newline # # Use os.pathsep. Default configuration used for new projects. 
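# (os.pathsep is ":" on POSIX and ";" on Windows)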
version_path_separator = os # set to 'true' to search source files recursively # in each "version_locations" directory # new in Alembic version 1.10 # recursive_version_locations = false # the output encoding used when revision files # are written from script.py.mako # output_encoding = utf-8 sqlalchemy.url = driver://user:pass@localhost/dbname [post_write_hooks] # post_write_hooks defines scripts or Python functions that are run # on newly generated revision scripts. See the documentation for further # detail and examples # format using "black" - use the console_scripts runner, against the "black" entrypoint # hooks = black # black.type = console_scripts # black.entrypoint = black # black.options = -l 79 REVISION_SCRIPT_FILENAME # lint with attempts to fix using "ruff" - use the exec runner, execute a binary # hooks = ruff # ruff.type = exec # ruff.executable = %(here)s/.venv/bin/ruff # ruff.options = --fix REVISION_SCRIPT_FILENAME # Logging configuration [loggers] keys = root,sqlalchemy,alembic [handlers] keys = console [formatters] keys = generic [logger_root] level = WARNING handlers = console qualname = [logger_sqlalchemy] level = WARNING handlers = qualname = sqlalchemy.engine [logger_alembic] level = INFO handlers = qualname = alembic [handler_console] class = StreamHandler args = (sys.stderr,) level = NOTSET formatter = generic [formatter_generic] format = %(levelname)-5.5s [%(name)s] %(message)s datefmt = %H:%M:%S ```
-------------------------------------------------------------------------------- /common/llm/client.py: --------------------------------------------------------------------------------
```python from logging import Logger from typing import Type from httpx import AsyncClient from openai import APIError from openai import AsyncOpenAI from openai import InternalServerError from openai import NOT_GIVEN from openai import OpenAIError from openai.types.chat import ChatCompletion from openai.types.chat import ChatCompletionMessageParam from openai.types.chat import ChatCompletionToolParam from pydantic import BaseModel class LLMClient: def __init__( self, logger: Logger, proxy: str | None = None, base_url: str | None = None ) -> None: self._logger = logger if proxy is not None: self._logger.info("Enabled proxy %s", proxy) http_client = AsyncClient(proxy=proxy) else: http_client = AsyncClient() self.openai_client = AsyncOpenAI(http_client=http_client, base_url=base_url) @staticmethod def _get_full_messages( system_prompt: str | None, messages: list[ChatCompletionMessageParam] ) -> list[ChatCompletionMessageParam]: system_message: list[dict] = [] if system_prompt: system_message = [ { "role": "system", "content": system_prompt, # Anthropic prompt caching "cache_control": {"type": "ephemeral"}, } ] full_messages = system_message + messages return full_messages async def request( self, model: str, messages: list[ChatCompletionMessageParam], system_prompt: str | None = None, max_tokens: int | None = None, tools: list[ChatCompletionToolParam] | None = None, ) -> ChatCompletion: full_messages = self._get_full_messages( system_prompt=system_prompt, messages=messages ) try: response = await self.openai_client.chat.completions.create( model=model, messages=full_messages, max_tokens=max_tokens, tools=tools or NOT_GIVEN, timeout=30, ) except InternalServerError as error: # must be caught before APIError: InternalServerError subclasses it, so the reverse order leaves this handler unreachable self._logger.error(error.response.json()) raise except APIError as error: self._logger.error(f"{error.code=} {error.body=}") raise except OpenAIError as error: self._logger.error( "Request to Provider failed with the
following exception", exc_info=error, ) raise return response async def request_to_beta( self, model: str, messages: list[ChatCompletionMessageParam], system_prompt: str | None = None, max_tokens: int | None = None, tools: list[ChatCompletionToolParam] | None = None, response_format: Type[BaseModel] = NOT_GIVEN, ) -> ChatCompletion: full_messages = self._get_full_messages( system_prompt=system_prompt, messages=messages ) try: response = await self.openai_client.beta.chat.completions.parse( model=model, messages=full_messages, max_tokens=max_tokens, tools=tools or NOT_GIVEN, timeout=30, response_format=response_format, ) except APIError as error: self._logger.error(f"{error.code=} {error.body=}") raise except InternalServerError as error: self._logger.error(error.response.json()) raise except OpenAIError as error: self._logger.error( "Request to Provider failed with the following exception", exc_info=error, ) raise return response ``` -------------------------------------------------------------------------------- /worker/src/docker_service.py: -------------------------------------------------------------------------------- ```python import subprocess from pathlib import Path from .constants import BaseImage from .dockerfile_generators import DockerfileGenerator from .dockerfile_generators import get_dockerfile_generator from .errors import InvalidData from .errors import ProcessingError from .user_logger import UserLogger from registry.src.database import Server from registry.src.logger import logger from registry.src.settings import settings class DockerService: def __init__( self, docker_env: dict, repo_folder: str, server: Server, user_logger: UserLogger, ) -> None: self.docker_env = docker_env self.repo_folder = repo_folder self.server = server self.image_name = f"hipasus/{self.server.name}" self.container_name = f"darp_server_{server.id}" self.user_logger = user_logger async def clone_repo(self) -> None: await self._execute(["git", "clone", self.server.repo_url, self.repo_folder]) def dockerfile_exists(self) -> bool: path = Path(f"{self.repo_folder}/Dockerfile") return path.exists() async def generate_dockerfile(self) -> None: if not self.server.base_image: await self.user_logger.error(message="No base image provided.") raise InvalidData generator: DockerfileGenerator = await get_dockerfile_generator( repo_folder=self.repo_folder, base_image=BaseImage(self.server.base_image), build_instructions=self.server.build_instructions, user_logger=self.user_logger, ) try: logs = await generator.generate_dockerfile() await self.user_logger.info(message=f"Generated Dockerfile:\n{logs}") logger.info(logs) except ProcessingError as error: await self.user_logger.error( message=f"Error generating Dockerfile: {error.message or ''}" ) raise async def build_image(self) -> None: command = [ "docker", "build", "-t", self.image_name, "-f", f"{self.repo_folder}/Dockerfile", self.repo_folder, ] await self._execute(command) async def push_image(self) -> None: docker_push_command = ( f"docker login -u {settings.dockerhub_login} -p {settings.dockerhub_password} && docker image push {self.image_name}", ) await self.user_logger.info("Pushing image...") result = subprocess.run(docker_push_command, shell=True) if result.returncode != 0: await self.user_logger.error(message="Docker push failed") logger.error("docker push failed") raise ProcessingError async def run_container(self) -> None: subprocess.run( ["docker", "rm", self.container_name, "--force"], env=self.docker_env ) run_command = [ "docker", "run", 
"--network", settings.deployment_network, "-d", "--restart", "always", "--pull", "always", "--name", self.container_name, self.image_name, ] if self.server.command: run_command.extend(["sh", "-c", self.server.command]) await self._execute(command=run_command, env=self.docker_env) async def _execute(self, command: list[str], env: dict | None = None) -> None: command_str = " ".join(command) logger.info(f"{command_str=}") await self.user_logger.command_start(command=command_str) arguments: dict = dict( args=command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, ) if env: arguments["env"] = env result = subprocess.run(**arguments) output = result.stdout.decode() if result.stdout else "" # noqa if result.returncode != 0: await self.user_logger.command_result(output=output, success=False) logger.error(f"{' '.join(command[:2])} failed") raise ProcessingError await self.user_logger.command_result(output=output, success=True) ``` -------------------------------------------------------------------------------- /registry/src/servers/router.py: -------------------------------------------------------------------------------- ```python from typing import Literal from fastapi import APIRouter from fastapi import Depends from fastapi import Query from fastapi import status as status_codes from fastapi_pagination import add_pagination from fastapi_pagination import Page from fastapi_pagination import Params from fastapi_pagination.ext.sqlalchemy import paginate from sqlalchemy import Select from sqlalchemy.ext.asyncio import AsyncSession from ..deployments.service import DeploymentService from ..errors import InvalidData from .schemas import MCPJson from .schemas import ServerCreate from .schemas import ServerRead from .schemas import ServerUpdate from .schemas import ServerWithTools from .service import ServerService from registry.src.database import get_session from registry.src.database import Server from registry.src.search.schemas import SearchServer from registry.src.search.service import SearchService from registry.src.types import RoutingMode from registry.src.types import ServerStatus router = APIRouter(prefix="/servers") add_pagination(router) @router.post( "/", response_model=ServerWithTools, status_code=status_codes.HTTP_201_CREATED ) async def create( data: ServerCreate, service: ServerService = Depends(ServerService.get_new_instance) ) -> Server: return await service.create(data) @router.get("/search", response_model=list[ServerWithTools]) async def search( query: str, routing_mode: RoutingMode = RoutingMode.auto, service: ServerService = Depends(ServerService.get_new_instance), search_service: SearchService = Depends(SearchService.get_new_instance), ) -> list[Server]: if routing_mode == RoutingMode.auto: servers = await service.get_search_servers() formatted_servers = [SearchServer.model_validate(server) for server in servers] server_urls = await search_service.get_fitting_servers( servers=formatted_servers, query=query ) return await service.get_servers_by_urls(server_urls=server_urls) elif routing_mode == RoutingMode.deepresearch: return await service.get_deep_research() raise InvalidData(f"Unsupported {routing_mode=}", routing_mode=routing_mode) @router.get("/batch", response_model=list[ServerWithTools]) async def get_servers_by_ids( ids: list[int] = Query(...), service: ServerService = Depends(ServerService.get_new_instance), ) -> list[Server]: return await service.get_servers_by_ids(ids=ids) @router.get("/.well-known/mcp.json", response_model=MCPJson) async def get_mcp_json( 
service: ServerService = Depends(ServerService.get_new_instance), ) -> MCPJson: return await service.get_mcp_json() @router.delete("/{id}") async def delete_server( id: int, current_user_id: str, service: ServerService = Depends(ServerService.get_new_instance), ) -> None: return await service.delete_server(id=id, current_user_id=current_user_id) @router.get("/", response_model=Page[ServerWithTools]) async def get_all_servers( query: str | None = None, status: Literal["any"] | ServerStatus = Query( title="Server status", default=ServerStatus.active, description="Filter servers by status, by default servers with status `active` will be returned", ), creator_id: str | None = None, params: Params = Depends(), service: ServerService = Depends(ServerService.get_new_instance), session: AsyncSession = Depends(get_session), ) -> Page[Server]: status_filter: ServerStatus | None = None if isinstance(status, ServerStatus): status_filter = status servers: Select = await service.get_all_servers( search_query=query, status=status_filter, creator_id=creator_id, ) return await paginate(session, servers, params) @router.get("/{id}", response_model=ServerWithTools) async def get_server_by_id( id: int, service: ServerService = Depends(ServerService.get_new_instance), ) -> Server: return await service.get_server_by_id(id=id) @router.put("/{id}", response_model=ServerRead) async def update_server( id: int, data: ServerUpdate, service: ServerService = Depends(ServerService.get_new_instance), deployment_service: DeploymentService = Depends(DeploymentService.get_new_instance), ) -> Server: await deployment_service.update_deployment(server_id=id, data=data) server = await service.update_server(id=id, data=data) return server ``` -------------------------------------------------------------------------------- /scripts/darp-router.py: -------------------------------------------------------------------------------- ```python import argparse import asyncio from typing import Any from typing import List from typing import Literal from typing import Union from mcp import ClientSession from mcp.client.sse import sse_client from mcp.types import CallToolResult from openai.types.chat import ChatCompletionMessage from openai.types.chat.chat_completion_content_part_text_param import ( ChatCompletionContentPartTextParam, ) from pydantic import BaseModel default_mcp_url: str = "http://localhost:4689/sse" tool_name: str = "routing" async def main() -> None: parser = argparse.ArgumentParser(description="DARP Router") parser.add_argument("request", type=str, help="Routing request") parser.add_argument( "--format", choices=["text", "json"], default="text", help="Output format" ) parser.add_argument("--verbose", action="store_true", help="Verbose output") args = parser.parse_args() response = await request_mcp( server_url=default_mcp_url, tool_name=tool_name, arguments={"request": args.request}, ) display_response(response.content[0].text, format=args.format, verbose=args.verbose) async def request_mcp( server_url: str = default_mcp_url, tool_name: str = tool_name, arguments: dict[str, Any] | None = None, ) -> CallToolResult: async with sse_client(server_url) as (read, write): async with ClientSession(read, write) as session: await session.initialize() return await session.call_tool( tool_name, arguments=arguments or {}, ) def display_response(response: str, format: str = "text", verbose: bool = False): if format == "json": print(response) else: routing_response = RoutingResponse.model_validate_json(response) for message in 
routing_response.conversation: print(message.__str__(verbose)) class PrintableChatCompletionMessage(ChatCompletionMessage): def __str__(self, verbose: bool = False) -> str: contents = "\n".join( filter( None, [ self._format_content("refusal", verbose), self._format_content("function_call", verbose), self._format_content("tool_calls", verbose), self._format_content("content", verbose=True), ], ) ) return f"{self.role}: {contents}" def _format_content(self, key: str, verbose: bool = False) -> str | None: value = getattr(self, f"_{key}_content")() if value is None: return None if not verbose: return f"[{key}]" else: return f"{value}" def _refusal_content(self) -> str | None: if self.refusal is not None: return "\n" + indent(self.refusal) return None def _function_call_content(self) -> str | None: if self.function_call is not None: return f"{self.function_call.name}({self.function_call.arguments})" return None def _tool_calls_content(self) -> str | None: if self.tool_calls is not None: return "\n" + indent( "\n".join( f"{call.function.name}({call.function.arguments})" for call in self.tool_calls ) ) return None def _content_content(self) -> str | None: if self.content is not None: return "\n" + indent(self.content) return None class PrintableChatCompletionToolMessageParam(BaseModel): role: Literal["tool"] content: Union[str, List[ChatCompletionContentPartTextParam]] tool_call_id: str def __str__(self, verbose: bool = False) -> str: if not verbose: return f"[{self.role}] ..." if isinstance(self.content, str): return f"{self.role}: {self.content}" elif isinstance(self.content, list): contents = "\n".join( "{type}: {text}".format(**item) for item in self.content ) return indent(f"{self.role}:\n{indent(contents)}") else: raise ValueError(f"Invalid content type: {type(self.content)}") class RoutingResponse(BaseModel): conversation: list[ PrintableChatCompletionMessage | PrintableChatCompletionToolMessageParam ] def indent(text: str, indent: int = 1, prefix: str = " "): return "\n".join(prefix * indent + line for line in text.split("\n")) if __name__ == "__main__": asyncio.run(main()) ``` -------------------------------------------------------------------------------- /worker/src/deployment_service.py: -------------------------------------------------------------------------------- ```python import asyncio import os import tempfile from logging import Logger from .docker_service import DockerService from .user_logger import UserLogger from common.notifications.client import NotificationsClient from registry.src.database import Server from registry.src.errors import InvalidData from registry.src.servers.repository import ServerRepository from registry.src.servers.schemas import ServerUpdateData from registry.src.servers.schemas import Tool from registry.src.servers.service import ServerService from registry.src.settings import settings from registry.src.types import ServerStatus from registry.src.types import ServerTransportProtocol from worker.src.errors import ProcessingError class WorkerDeploymentService: def __init__(self, repo: ServerRepository, logger: Logger): self.repo = repo self.logger = logger self.docker_env = dict(**os.environ) self.docker_env["DOCKER_HOST"] = settings.deployment_server self.notifications_client = NotificationsClient() async def deploy_server(self, server_id: int) -> None: server = await self.repo.get_server(id=server_id) if not server: raise ProcessingError with tempfile.TemporaryDirectory( prefix=f"server_{server.id}", dir="/tmp" ) as repo_folder: user_logger = 
UserLogger(session=self.repo.session, server_id=server_id) await user_logger.clear_logs() service = DockerService( docker_env=self.docker_env, repo_folder=repo_folder, server=server, user_logger=user_logger, ) server_url = f"http://{service.container_name}/sse" try: await self._deploy_repo(service=service) tools = await self._get_tools( server=server, service=service, server_url=server_url ) except Exception: await self._handle_deploy_failure(server) raise await user_logger.info("Deployment complete.") await self._handle_deploy_success( tools=tools, server=server, url=server_url, transport_protocol=ServerTransportProtocol.SSE, ) async def _deploy_repo(self, service: DockerService) -> None: await service.clone_repo() if not service.dockerfile_exists(): self.logger.info(f"No Dockerfile in repo: {service.server.repo_url}") await self._ensure_command(service) await service.user_logger.info( message="No Dockerfile found in the project. Attempting to generate..." ) await service.generate_dockerfile() await service.build_image() await service.push_image() await service.run_container() async def _handle_deploy_failure(self, server: Server) -> None: await self.repo.update_server( id=server.id, data=ServerUpdateData(), status=ServerStatus.processing_failed, ) await self.notifications_client.notify_deploy_failed( user_id=server.creator_id, server_name=server.name, ) async def _handle_deploy_success( self, tools: list[Tool], server: Server, url: str, transport_protocol: ServerTransportProtocol, ) -> None: await self.repo.update_server( id=server.id, tools=tools, data=ServerUpdateData(url=url), status=ServerStatus.active, transport_protocol=transport_protocol, ) await self.notifications_client.notify_deploy_successful( user_id=server.creator_id, server_name=server.name, ) async def _get_tools( self, server: Server, service: DockerService, server_url: str ) -> list[Tool]: for i in range(settings.tool_retry_count): tools = await self._fetch_tools( server=server, server_url=server_url, service=service ) if tools: return tools await service.user_logger.error("Error getting tools from server") raise ProcessingError async def _fetch_tools( self, server: Server, server_url: str, service: DockerService ) -> list[Tool] | None: try: await service.user_logger.info( f"Waiting {settings.tool_wait_interval} seconds for server to start up" ) await asyncio.sleep(settings.tool_wait_interval) tools = await ServerService.get_tools( server_url=server_url, server_name=server.name ) await service.user_logger.info( "Successfully got a list of tools from server" ) return tools except InvalidData: await service.user_logger.warning("Failed to connect to server") return None @staticmethod async def _ensure_command(service: DockerService): if not service.server.command: await service.user_logger.error( "Command must not be empty if project does not have a Dockerfile" ) raise ProcessingError ``` -------------------------------------------------------------------------------- /registry/src/deployments/service.py: -------------------------------------------------------------------------------- ```python import re from typing import Self from fastapi import Depends from ..settings import settings from ..types import HostType from ..types import TaskType from .logs_repository import LogsRepository from .schemas import DeploymentCreate from registry.src.database import Server from registry.src.database import ServerLogs from registry.src.errors import InvalidData from registry.src.errors import InvalidServerNameError from 
registry.src.errors import NotAllowedError from registry.src.errors import ServerAlreadyExistsError from registry.src.errors import ServersNotFoundError from registry.src.producer import get_producer from registry.src.servers.repository import ServerRepository from registry.src.servers.schemas import ServerRead from registry.src.servers.schemas import ServerUpdate from registry.src.servers.service import ID_REGEX_PATTERN from registry.src.types import ServerStatus from worker.src.schemas import Task class DeploymentService: def __init__(self, repo: ServerRepository, logs_repo: LogsRepository) -> None: self.repo = repo self.logs_repo = logs_repo async def get_server(self, server_id: int, creator_id: str | None = None) -> Server: await self._assure_server_found(server_id) server = await self.repo.get_server(server_id) if creator_id: await self._assure_server_creator(server=server, user_id=creator_id) return server async def get_logs(self, server_id: int, current_user_id: str) -> ServerLogs: server = await self.get_server(server_id=server_id, creator_id=current_user_id) if server.host_type != HostType.internal: raise InvalidData("Logs are only available for internal servers") logs = await self.logs_repo.get_server_logs(server_id=server_id) assert logs return logs async def create_server(self, data: DeploymentCreate) -> Server: self._assure_server_name_is_valid_id(data.data.name) await self._assure_server_not_exists( name=data.data.name, repo_url=str(data.data.repo_url) ) server = await self.repo.create_server( data=data.data, creator_id=data.current_user_id, tools=[] ) await self.logs_repo.create_logs(server_id=server.id) await self.repo.session.commit() await self._send_task(server_id=server.id, task_type=TaskType.deploy) return server async def update_deployment(self, server_id: int, data: ServerUpdate) -> Server: server = await self.get_server( server_id=server_id, creator_id=data.current_user_id ) if not data.deployment_data.model_dump(exclude_none=True): return server server = await self.repo.update_server( id=server_id, **data.deployment_data.model_dump(), status=ServerStatus.in_processing, ) await self.repo.session.commit() await self._send_task(server_id=server.id, task_type=TaskType.deploy) return server async def stop_server(self, server_id: int, current_user_id: str) -> None: server = await self.get_server(server_id=server_id, creator_id=current_user_id) if server.status == ServerStatus.stopped: return if server.status != ServerStatus.active: raise InvalidData("Server is not active", server_status=server.status) await self._send_task(server_id=server_id, task_type=TaskType.stop) async def start_server(self, server_id: int, current_user_id: str) -> None: server = await self.get_server(server_id=server_id, creator_id=current_user_id) if server.status == ServerStatus.active: return if server.status != ServerStatus.stopped: raise InvalidData("Invalid server state", server_status=server.status) await self._send_task(server_id=server_id, task_type=TaskType.start) @staticmethod async def _send_task(server_id: int, task_type: TaskType) -> None: producer = await get_producer() await producer.start() await producer.send_and_wait( topic=settings.worker_topic, value=Task(task_type=task_type, server_id=server_id), ) await producer.stop() async def _assure_server_found(self, server_id: int) -> None: if not await self.repo.find_servers(id=server_id): raise ServersNotFoundError([server_id]) @staticmethod async def _assure_server_creator(server: Server, user_id: str) -> None: if server.creator_id 
!= user_id: raise NotAllowedError( "Must be server creator to use this function", creator_id=server.creator_id, current_user_id=user_id, ) async def _assure_server_not_exists( self, name: str | None = None, repo_url: str | None = None, ignore_id: int | None = None, ) -> None: if servers := await self.repo.find_servers( name=name, repo_url=repo_url, ignore_id=ignore_id ): dict_servers = [ ServerRead.model_validate(server).model_dump() for server in servers ] raise ServerAlreadyExistsError(dict_servers) @staticmethod def _assure_server_name_is_valid_id(name: str) -> None: if not re.match(ID_REGEX_PATTERN, name): raise InvalidServerNameError(name=name) @classmethod def get_new_instance( cls, repo: ServerRepository = Depends(ServerRepository.get_new_instance), logs_repo: LogsRepository = Depends(LogsRepository.get_new_instance), ) -> Self: return cls(repo=repo, logs_repo=logs_repo) ``` -------------------------------------------------------------------------------- /registry/src/servers/repository.py: -------------------------------------------------------------------------------- ```python from fastapi import Depends from sqlalchemy import delete from sqlalchemy import func from sqlalchemy import or_ from sqlalchemy import Select from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import selectinload from .schemas import ServerCreateData from .schemas import ServerUpdateData from .schemas import Tool from registry.src.database import get_session from registry.src.database import Server from registry.src.database import Tool as DBTool from registry.src.deployments.schemas import DeploymentCreateData from registry.src.errors import ServersNotFoundError from registry.src.types import ServerStatus from registry.src.types import ServerTransportProtocol from worker.src.constants import BaseImage class ServerRepository: def __init__(self, session: AsyncSession) -> None: self.session = session async def find_servers( self, id: int | None = None, name: str | None = None, url: str | None = None, ignore_id: int | None = None, repo_url: str | None = None, ) -> list[Server]: if id is None and name is None and url is None: raise ValueError("At least one of 'id', 'name', or 'url' must be provided.") if id == ignore_id and id is not None: raise ValueError("id and ignore_id cannot be the same.") query = select(Server).options(selectinload(Server.tools)) conditions = [ (Server.id == id), (func.lower(Server.name) == func.lower(name)), ] if url: conditions.append(Server.url == url) if repo_url: conditions.append(Server.repo_url == repo_url) query = query.filter(or_(*conditions)) query = query.filter(Server.id != ignore_id) result = await self.session.execute(query) return result.scalars().all() async def get_server(self, id: int) -> Server: query = select(Server).filter(Server.id == id) query = query.options(selectinload(Server.tools)) result = await self.session.execute(query) server = result.scalars().first() if server is None: raise ServersNotFoundError([id]) return server async def create_server( self, data: ServerCreateData | DeploymentCreateData, tools: list[Tool], creator_id: str | None, transport_protocol: ServerTransportProtocol | None = None, ) -> Server: db_tools = [] if isinstance(data, ServerCreateData): db_tools = self._convert_tools(tools, data.url) server = Server( **data.model_dump(exclude_none=True), tools=db_tools, creator_id=creator_id, transport_protocol=transport_protocol, ) self.session.add(server) await self.session.flush() await 
self.session.refresh(server) return await self.get_server(server.id) async def get_all_servers( self, search_query: str | None = None, status: ServerStatus | None = None, creator_id: str | None = None, ) -> Select: query = select(Server).options(selectinload(Server.tools)) if status is not None: query = query.where(Server.status == status) if creator_id: query = query.where(Server.creator_id == creator_id) if search_query: query_string = f"%{search_query}%" query = query.where( or_( Server.name.ilike(query_string), Server.description.ilike(query_string), Server.title.ilike(query_string), ) ) return query async def get_servers_by_urls(self, urls: list[str]) -> list[Server]: query = select(Server).where(Server.url.in_(urls)) result = await self.session.execute(query) return list(result.scalars().all()) async def get_servers_by_ids(self, ids: list[int]) -> list[Server]: query = ( select(Server).where(Server.id.in_(ids)).options(selectinload(Server.tools)) ) result = await self.session.execute(query) return list(result.scalars().all()) async def delete_server(self, id: int) -> None: query = delete(Server).where(Server.id == id) await self.session.execute(query) async def update_server( self, id: int, data: ServerUpdateData | None = None, tools: list[Tool] | None = None, status: ServerStatus | None = None, repo_url: str | None = None, command: str | None = None, base_image: BaseImage | None = None, build_instructions: str | None = None, transport_protocol: ServerTransportProtocol | None = None, ) -> Server: server = await self.get_server(id) update_values = dict( status=status, repo_url=repo_url, command=command, transport_protocol=transport_protocol, base_image=base_image, build_instructions=build_instructions, ) update_values = { key: value for key, value in update_values.items() if value is not None } if data: update_values.update(data.model_dump(exclude_none=True)) for key, value in update_values.items(): setattr(server, key, value) if tools is not None: query = delete(DBTool).where(DBTool.server_url == server.url) await self.session.execute(query) await self.session.flush() await self.session.refresh(server) server.tools.extend(self._convert_tools(tools, server.url)) await self.session.flush() await self.session.refresh(server) return server def _convert_tools(self, tools: list[Tool], url: str) -> list[DBTool]: return [ DBTool(**tool.model_dump(exclude_none=True), server_url=url) for tool in tools ] @classmethod def get_new_instance( cls, session: AsyncSession = Depends(get_session) ) -> "ServerRepository": return cls(session) ``` -------------------------------------------------------------------------------- /docker-compose.yaml: -------------------------------------------------------------------------------- ```yaml services: registry: build: context: . 
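# build context is the repo root rather than registry/, so the Dockerfile can copy shared top-level packages (e.g. common/) into the image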
dockerfile: registry/Dockerfile environment: UVICORN_PORT: ${UVICORN_PORT:-80} POSTGRES_HOST: ${POSTGRES_HOST:-registry_postgres} POSTGRES_USER: ${POSTGRES_USER:-change_me} POSTGRES_PASSWORD: ${POSTGRES_PASSWORD:-change_me} POSTGRES_DB: ${POSTGRES_DB:-registry} POSTGRES_PORT: ${POSTGRES_PORT:-} OPENAI_API_KEY: ${OPENAI_API_KEY} OPENAI_BASE_URL: ${OPENAI_BASE_URL:-} LLM_PROXY: ${LLM_PROXY:-} LLM_MODEL: ${LLM_MODEL:-} HEALTHCHECK_RUNNING_INTERVAL: ${HEALTHCHECK_RUNNING_INTERVAL:-} DEEP_RESEARCH_SERVER_NAME: ${DEEP_RESEARCH_SERVER_NAME:-} TOOL_RETRY_COUNT: ${TOOL_RETRY_COUNT:-} TOOL_WAIT_INTERVAL: ${TOOL_WAIT_INTERVAL:-} image: ${REGISTRY_IMAGE:-registry}:${CI_COMMIT_BRANCH:-local} container_name: registry pull_policy: always restart: always depends_on: registry_postgres: condition: service_healthy restart: true profiles: - main volumes: - /var/lib/hipasus/registry/api/logs:/workspace/logs healthcheck: test: curl -f http://localhost:${UVICORN_PORT:-80}/healthcheck interval: 15s timeout: 5s retries: 3 start_period: 15s command: start.sh registry_mcp_server: build: context: . dockerfile: mcp_server/Dockerfile environment: - MCP_PORT - LOG_LEVEL - REGISTRY_URL - OPENAI_API_KEY - OPENAI_BASE_URL - LLM_PROXY - LLM_MODEL image: ${MCP_REGISTRY_IMAGE:-registry_mcp}:${CI_COMMIT_BRANCH:-local} container_name: registry_mcp_server pull_policy: always restart: always depends_on: registry: condition: service_healthy restart: true healthcheck: test: python3 -m mcp_server.scripts.mcp_health_check interval: 15s timeout: 5s retries: 3 start_period: 15s profiles: - main volumes: - /var/lib/hipasus/registry/mcp_server/logs:/workspace/logs command: start.sh registry_deploy_worker: container_name: registry_deploy_worker image: ${WORKER_IMAGE_NAME:-registry_worker}:${CI_COMMIT_BRANCH:-local} build: context: . 
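# the worker also builds from the repo root: worker/src imports registry.src and common at runtime, so those packages must be baked into its image as well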
dockerfile: worker/Dockerfile pull_policy: always environment: LOG_LEVEL: ${LOG_LEVEL} POSTGRES_HOST: ${POSTGRES_HOST:-registry_postgres} POSTGRES_USER: ${POSTGRES_USER:-change_me} POSTGRES_PASSWORD: ${POSTGRES_PASSWORD:-change_me} POSTGRES_DB: ${POSTGRES_DB:-registry} POSTGRES_PORT: ${POSTGRES_PORT:-5432} OPENAI_API_KEY: ${OPENAI_API_KEY} OPENAI_BASE_URL: ${OPENAI_BASE_URL:-} LLM_PROXY: ${LLM_PROXY:-} LLM_MODEL: ${LLM_MODEL:-} DOCKERHUB_LOGIN: ${DOCKERHUB_LOGIN} DOCKERHUB_PASSWORD: ${DOCKERHUB_PASSWORD} DEPLOYMENT_NETWORK: ${DOCKER_NETWORK:-portal_network} HEALTHCHECK_RUNNING_INTERVAL: ${HEALTHCHECK_RUNNING_INTERVAL:-} DEEP_RESEARCH_SERVER_NAME: ${DEEP_RESEARCH_SERVER_NAME:-} TOOL_RETRY_COUNT: ${TOOL_RETRY_COUNT:-} TOOL_WAIT_INTERVAL: ${TOOL_WAIT_INTERVAL:-} profiles: - main depends_on: registry_postgres: condition: service_healthy restart: true portal_kafka: condition: service_healthy restart: true volumes: - /var/lib/hipasus/registry/worker/logs:/workspace/logs - /var/run/docker.sock:/var/run/docker.sock restart: always command: worker-start.sh portal_zookeeper: image: confluentinc/cp-zookeeper:7.3.2 container_name: portal_zookeeper ports: - "2181:2181" profiles: - main environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_SERVER_ID: 1 ZOOKEEPER_SERVERS: portal_zookeeper:2888:3888 healthcheck: test: nc -z localhost 2181 || exit -1 interval: 30s timeout: 20s retries: 5 restart: always portal_kafka: image: confluentinc/cp-kafka:7.3.2 container_name: portal_kafka profiles: - main environment: KAFKA_ADVERTISED_LISTENERS: INTERNAL://portal_kafka:19092,DOCKER://127.0.0.1:29092 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,DOCKER:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL KAFKA_ZOOKEEPER_CONNECT: "portal_zookeeper:2181" KAFKA_BROKER_ID: 1 KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO" KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 KAFKA_JMX_PORT: 9999 KAFKA_JMX_HOSTNAME: ${DOCKER_HOST_IP:-127.0.0.1} KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true" healthcheck: test: kafka-cluster cluster-id --bootstrap-server 127.0.0.1:29092 || exit 1 interval: 30s timeout: 20s retries: 5 depends_on: portal_zookeeper: condition: service_healthy restart: true restart: always registry_postgres: container_name: registry_postgres image: postgres:17 environment: POSTGRES_USER: ${POSTGRES_USER:-change_me} POSTGRES_PASSWORD: ${POSTGRES_PASSWORD:-change_me} POSTGRES_DB: ${POSTGRES_DB:-registry} volumes: - /var/lib/hipasus/registry/pg_data:/var/lib/postgresql/data command: postgres -c 'max_connections=500' profiles: - main healthcheck: test: pg_isready -U ${POSTGRES_USER:-change_me} -d ${POSTGRES_DB:-registry} interval: 7s timeout: 5s retries: 5 start_period: 5s restart: always registry_healthchecker: container_name: registry_healthchecker image: ${HEALTHCHECKER_IMAGE_NAME:-registry_healthchecker}:${CI_COMMIT_BRANCH:-local} build: context: . 
dockerfile: healthchecker/Dockerfile pull_policy: always environment: LOG_LEVEL: ${LOG_LEVEL} POSTGRES_HOST: ${POSTGRES_HOST:-registry_postgres} POSTGRES_USER: ${POSTGRES_USER:-change_me} POSTGRES_PASSWORD: ${POSTGRES_PASSWORD:-change_me} POSTGRES_DB: ${POSTGRES_DB:-registry} POSTGRES_PORT: ${POSTGRES_PORT:-5432} OPENAI_API_KEY: ${OPENAI_API_KEY} OPENAI_BASE_URL: ${OPENAI_BASE_URL:-} LLM_PROXY: ${LLM_PROXY:-} LLM_MODEL: ${LLM_MODEL:-} DOCKERHUB_LOGIN: ${DOCKERHUB_LOGIN} DOCKERHUB_PASSWORD: ${DOCKERHUB_PASSWORD} DEPLOYMENT_NETWORK: ${DOCKER_NETWORK:-portal_network} HEALTHCHECK_RUNNING_INTERVAL: ${HEALTHCHECK_RUNNING_INTERVAL:-} DEEP_RESEARCH_SERVER_NAME: ${DEEP_RESEARCH_SERVER_NAME:-} TOOL_RETRY_COUNT: ${TOOL_RETRY_COUNT:-} TOOL_WAIT_INTERVAL: ${TOOL_WAIT_INTERVAL:-} profiles: - main depends_on: registry: condition: service_healthy restart: true restart: always command: worker-start.sh networks: default: name: ${DOCKER_NETWORK:-portal_network} external: true ```
-------------------------------------------------------------------------------- /registry/src/servers/service.py: --------------------------------------------------------------------------------
```python import re from contextlib import _AsyncGeneratorContextManager # noqa from typing import Self from fastapi import Depends from mcp import ClientSession from mcp.client.sse import sse_client from mcp.client.streamable_http import streamablehttp_client from pydantic import ValidationError from sqlalchemy import Select from ..settings import settings from .repository import ServerRepository from .schemas import MCP from .schemas import MCPJson from .schemas import ServerCreate from .schemas import ServerRead from .schemas import ServerUpdate from .schemas import Tool from .schemas import tool_name_regex from registry.src.database import Server from registry.src.errors import InvalidData from registry.src.errors import InvalidServerNameError from registry.src.errors import NotAllowedError from registry.src.errors import RemoteServerError from registry.src.errors import ServerAlreadyExistsError from registry.src.errors import ServersNotFoundError from registry.src.logger import logger from registry.src.types import ServerStatus from registry.src.types import ServerTransportProtocol BASE_SERVER_NAME = "base_darp_server#REPLACEME" # group the alternation and anchor both ends; an unanchored first branch would accept any name that merely starts with a valid identifier character ID_REGEX_PATTERN = re.compile(rf"^(?:[a-zA-Z_][a-zA-Z0-9_]*|{re.escape(BASE_SERVER_NAME)})$") class ServerService: def __init__(self, repo: ServerRepository) -> None: self.repo = repo async def create(self, data: ServerCreate) -> Server: self._assure_server_name_is_valid_id(data.data.name) await self._assure_server_not_exists(name=data.data.name, url=data.data.url) server_transport_protocol = await self.detect_transport_protocol( server_url=data.data.url ) logger.info( "Detected server's transport protocol %s", server_transport_protocol ) tools = await self.get_tools( server_url=data.data.url, server_name=data.data.name, transport=server_transport_protocol, ) return await self.repo.create_server( data.data, tools, creator_id=data.current_user_id, transport_protocol=server_transport_protocol, ) @classmethod async def get_tools( cls, server_url: str, server_name: str, transport: ServerTransportProtocol | None = None, ) -> list[Tool]: if transport is None: transport = await cls.detect_transport_protocol(server_url) try: return await cls._fetch_tools( server_url=server_url, server_name=server_name, transport=transport ) except Exception as e: if isinstance(e, InvalidData): raise logger.warning( f"Error while getting tools from
server {server_url}", exc_info=e, ) raise InvalidData( "Server is unhealthy or URL does not lead to a valid MCP server", url=server_url, ) async def delete_server(self, id: int, current_user_id: str) -> None: server = await self.get_server_by_id(id=id) if server.creator_id != current_user_id: raise NotAllowedError("Only server creator can delete it") await self.repo.delete_server(id=id) async def get_all_servers( self, status: ServerStatus | None = None, search_query: str | None = None, creator_id: str | None = None, ) -> Select: return await self.repo.get_all_servers( search_query=search_query, status=status, creator_id=creator_id ) async def get_server_by_id(self, id: int) -> Server: await self._assure_server_found(id) return await self.repo.get_server(id=id) async def get_servers_by_ids(self, ids: list[int]) -> list[Server]: servers = await self.repo.get_servers_by_ids(ids=ids) if len(ids) != len(servers): retrieved_server_ids = {server.id for server in servers} missing_server_ids = set(ids) - retrieved_server_ids raise ServersNotFoundError(ids=list(missing_server_ids)) return servers async def update_server(self, id: int, data: ServerUpdate) -> Server: server = await self.get_server_by_id(id=id) transport_protocol = None if server.creator_id != data.current_user_id: raise NotAllowedError("Only server creator can update it") if data.data.name or data.data.url: await self._assure_server_not_exists( name=data.data.name, url=data.data.url, ignore_id=id ) updated_server_url = data.data.url or server.url if updated_server_url is not None: transport_protocol = await self.detect_transport_protocol( server_url=updated_server_url ) tools = await self.get_tools( server_url=updated_server_url, server_name=data.data.name or server.name, transport=transport_protocol, ) else: tools = [] return await self.repo.update_server( id, data.data, tools, transport_protocol=transport_protocol, ) async def _assure_server_found(self, id: int) -> None: if not await self.repo.find_servers(id=id): raise ServersNotFoundError([id]) def _assure_server_name_is_valid_id(self, name: str) -> None: if not re.match(ID_REGEX_PATTERN, name): raise InvalidServerNameError(name=name) async def _assure_server_not_exists( self, id: int | None = None, name: str | None = None, url: str | None = None, ignore_id: int | None = None, ) -> None: if servers := await self.repo.find_servers(id, name, url, ignore_id): dict_servers = [ ServerRead.model_validate(server).model_dump() for server in servers ] raise ServerAlreadyExistsError(dict_servers) async def get_search_servers(self) -> list[Server]: servers_query = await self.repo.get_all_servers() servers_query = servers_query.where(Server.url != None) # noqa servers = (await self.repo.session.execute(servers_query)).scalars().all() return list(servers) async def get_servers_by_urls(self, server_urls: list[str]) -> list[Server]: servers = await self.repo.get_servers_by_urls(urls=server_urls) if len(server_urls) != len(servers): retrieved_server_urls = {server.url for server in servers} missing_server_urls = set(server_urls) - retrieved_server_urls logger.warning( f"One or more server urls are incorrect {missing_server_urls=}" ) return servers async def get_mcp_json(self) -> MCPJson: servers = await self.repo.get_all_servers() servers = (await self.repo.session.execute(servers)).scalars().all() return MCPJson(servers=[MCP.model_validate(server) for server in servers]) async def get_deep_research(self) -> list[Server]: servers = await 
self.repo.find_servers(name=settings.deep_research_server_name) if len(servers) == 0: raise RemoteServerError( f"{settings.deep_research_server_name} MCP server does not exist in registry" ) assert len(servers) == 1, "Invalid state. Multiple deepresearch servers found" if servers[0].status != ServerStatus.active: raise RemoteServerError( f"{settings.deep_research_server_name} MCP server is down." ) return servers @classmethod def get_new_instance( cls, repo: ServerRepository = Depends(ServerRepository.get_new_instance) ) -> Self: return cls(repo=repo) @classmethod async def _fetch_tools( cls, server_url: str, server_name: str, transport: ServerTransportProtocol ) -> list[Tool]: client_ctx = cls._get_client_context(server_url, transport) async with client_ctx as (read, write, *_): async with ClientSession(read, write) as session: await session.initialize() tools_response = await session.list_tools() try: tools = [ Tool( **tool.model_dump(exclude_none=True), alias=f"{tool.name}__{server_name}", ) for tool in tools_response.tools ] except ValidationError: raise InvalidData( message=f"Invalid tool names. {{tool_name}}__{{server_name}} must fit {tool_name_regex}" ) return tools @classmethod def _get_client_context( cls, server_url: str, transport_protocol: ServerTransportProtocol, ) -> _AsyncGeneratorContextManager: if transport_protocol == ServerTransportProtocol.STREAMABLE_HTTP: client_ctx = streamablehttp_client(server_url) elif transport_protocol == ServerTransportProtocol.SSE: client_ctx = sse_client(server_url) else: raise RuntimeError( f"Unsupported transport protocol: {transport_protocol.name}" ) return client_ctx @classmethod async def detect_transport_protocol( cls, server_url: str, ) -> ServerTransportProtocol: for protocol in ( ServerTransportProtocol.STREAMABLE_HTTP, ServerTransportProtocol.SSE, ): if await cls._is_transport_supported(server_url, protocol): return protocol raise InvalidData( "Can't detect server's transport protocol, maybe server is unhealthy?", url=server_url, ) @classmethod async def _is_transport_supported( cls, server_url: str, protocol: ServerTransportProtocol, ) -> bool: try: client_ctx = cls._get_client_context(server_url, protocol) async with client_ctx as (read, write, *_): return await cls._can_initialize_session(read, write) except Exception as e: logger.error( "Failed to create %s client", protocol.name, exc_info=e, ) return False @staticmethod async def _can_initialize_session(read, write) -> bool: try: async with ClientSession(read, write) as session: await session.initialize() return True except Exception: return False ```
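Taken together, `detect_transport_protocol` and `get_tools` double as a small standalone MCP probe: detection attempts a streamable-HTTP handshake first and falls back to SSE, and the detected protocol can then be passed to `get_tools` so the server is only probed once. Below is a minimal sketch of that flow, assuming the `registry` package is importable and an MCP server is reachable at the placeholder URL; the URL and server name are hypothetical.

```python
# probe_sketch.py -- illustrative only; not part of the repo.
import asyncio

from registry.src.servers.service import ServerService


async def main() -> None:
    url = "http://localhost:8000/sse"  # hypothetical MCP endpoint

    # Probes STREAMABLE_HTTP, then SSE; raises InvalidData if neither
    # transport can complete an initialize() handshake.
    protocol = await ServerService.detect_transport_protocol(server_url=url)

    # Passing the detected transport skips a second detection round-trip.
    tools = await ServerService.get_tools(
        server_url=url, server_name="demo_server", transport=protocol
    )
    for tool in tools:
        # Aliases are built as "{tool_name}__{server_name}" in _fetch_tools.
        print(tool.alias)


if __name__ == "__main__":
    asyncio.run(main())
```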