# Directory Structure

```
├── .gitignore
├── .python-version
├── pyproject.toml
├── README.md
├── src
│   └── brev_mcp
│       ├── __init__.py
│       ├── api.py
│       ├── cli.py
│       ├── instance_types.py
│       ├── models.py
│       ├── server.py
│       ├── tools.py
│       └── workspace.py
├── tests
│   └── test.py
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/.python-version:
--------------------------------------------------------------------------------

```
1 | 3.10
2 | 
```

--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------

```
 1 | # Python-generated files
 2 | __pycache__/
 3 | *.py[oc]
 4 | build/
 5 | dist/
 6 | wheels/
 7 | *.egg-info
 8 | 
 9 | # Virtual environments
10 | .venv
11 | 
```

--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------

```markdown
 1 | # Brev MCP server
 2 | 
 3 | This is an MCP server implementation for Brev.
 4 | 
 5 | ## Configuration
 6 | 
 7 | The MCP server uses the Brev CLI's API access token and currently set org. 
 8 | 
 9 | Follow the [Brev documentation](https://www.brev.dev/docs/reference/brev-cli) to download the CLI and log in if you haven't already.
10 | 
11 | If you want to switch your Brev org, run `brev set <org-name>`
12 | 
13 | The CLI access token expires every hour. If you get a 403 error, run `brev ls` to refresh the access token.
14 | 
15 | ## Quickstart
16 | 
17 | ### Setup repository locally
18 | 
19 | `git clone git@github.com:brevdev/brev-mcp.git`
20 | 
21 | ### Install uv
22 | 
23 | Follow the [uv installation guide](https://docs.astral.sh/uv/getting-started/installation/)
24 | 
25 | #### Claude Desktop
26 | 
27 | On macOS: `~/Library/Application\ Support/Claude/claude_desktop_config.json`
28 | On Windows: `%APPDATA%/Claude/claude_desktop_config.json`
29 | 
30 | Add the following to your `claude_desktop_config.json`:
31 | 
32 | <details>
33 |   <summary>Development/Unpublished Servers Configuration</summary>
34 | 
35 |   ```json
36 |   "mcpServers": {
37 |     "brev_mcp": {
38 |       "command": "uv",
39 |       "args": [
40 |         "--directory",
41 |         "<path-to-repo>",
42 |         "run",
43 |         "brev-mcp"
44 |       ]
45 |     }
46 |   }
47 |   ```
48 | </details>
49 | 
50 | ## Development
51 | 
52 | ### Building and Publishing
53 | 
54 | To prepare the package for distribution:
55 | 
56 | 1. Sync dependencies and update lockfile:
57 | ```bash
58 | uv sync
59 | ```
60 | 
61 | 2. Build package distributions:
62 | ```bash
63 | uv build
64 | ```
65 | 
66 | This will create source and wheel distributions in the `dist/` directory.
67 | 
68 | 3. Publish to PyPI:
69 | ```bash
70 | uv publish
71 | ```
72 | 
73 | Note: You'll need to set PyPI credentials via environment variables or command flags:
74 | - Token: `--token` or `UV_PUBLISH_TOKEN`
75 | - Or username/password: `--username`/`UV_PUBLISH_USERNAME` and `--password`/`UV_PUBLISH_PASSWORD`
76 | 
77 | ### Debugging
78 | 
79 | Since MCP servers run over stdio, debugging can be challenging. For the best debugging
80 | experience, we strongly recommend using the [MCP Inspector](https://github.com/modelcontextprotocol/inspector).
81 | 
82 | 
83 | You can launch the MCP Inspector via [`npm`](https://docs.npmjs.com/downloading-and-installing-node-js-and-npm) with this command:
84 | 
85 | ```bash
86 | npx @modelcontextprotocol/inspector uv --directory <path-to-repo> run brev-mcp
87 | ```
88 | 
89 | 
90 | Upon launching, the Inspector will display a URL that you can access in your browser to begin debugging.
```
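The Claude Desktop entry described in the README can also be produced programmatically. The following is a minimal sketch, not part of the repository: `build_brev_mcp_entry` and `merge_into_config` are hypothetical helper names, and the repo path is a placeholder you would replace with your own checkout location. It only builds the dictionary; writing it to `claude_desktop_config.json` is left out.

```python
import json


def build_brev_mcp_entry(repo_path: str) -> dict:
    """Return the server entry from the README for a given checkout path."""
    return {
        "command": "uv",
        "args": ["--directory", repo_path, "run", "brev-mcp"],
    }


def merge_into_config(config: dict, repo_path: str) -> dict:
    """Return a copy of `config` with the brev_mcp server added.

    Existing entries under "mcpServers" are preserved; the input is not mutated.
    """
    merged = dict(config)
    servers = dict(merged.get("mcpServers", {}))
    servers["brev_mcp"] = build_brev_mcp_entry(repo_path)
    merged["mcpServers"] = servers
    return merged


# Placeholder path and a made-up pre-existing server entry, for illustration only.
existing = {"mcpServers": {"other": {"command": "node", "args": ["server.js"]}}}
print(json.dumps(merge_into_config(existing, "/path/to/brev-mcp"), indent=2))
```

Returning a new dictionary rather than editing in place makes it easy to inspect the result before overwriting a real configuration file.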

--------------------------------------------------------------------------------
/src/brev_mcp/__init__.py:
--------------------------------------------------------------------------------

```python
1 | from . import server
2 | import asyncio
3 | 
4 | def main():
5 |     """Main entry point for the package."""
6 |     asyncio.run(server.main())
7 | 
8 | # Optionally expose other important items at package level
9 | __all__ = ['main', 'server']
```

--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------

```toml
 1 | [project]
 2 | name = "brev-mcp"
 3 | version = "0.1.0"
 4 | description = "An MCP server project"
 5 | readme = "README.md"
 6 | requires-python = ">=3.10"
 7 | dependencies = [
 8 |  "httpx>=0.28.1",
 9 |  "mcp>=1.1.0",
10 |  "protobuf>=5.29.1",
11 | ]
12 | [[project.authors]]
13 | name = "tmonty12"
14 | email = "[email protected]"
15 | 
16 | [build-system]
17 | requires = [ "hatchling",]
18 | build-backend = "hatchling.build"
19 | 
20 | [project.scripts]
21 | brev-mcp = "brev_mcp:main"
22 | 
```

--------------------------------------------------------------------------------
/src/brev_mcp/workspace.py:
--------------------------------------------------------------------------------

```python
 1 | import json
 2 | from .api import create_workspace
 3 | from .models import (
 4 |     CloudProvider,
 5 |     CreateWorkspaceRequest
 6 | )
 7 | 
 8 | async def create_provider_workspace(name: str, cloud_provider: CloudProvider, instance_type: str) -> str:
 9 |     req = CreateWorkspaceRequest(
10 |         name=name,
11 |         workspaceGroupId=cloud_provider.get_workspace_group_id(),
12 |         instanceType=instance_type,
13 |     )
14 |     workspace = await create_workspace(req)
15 |     return json.dumps(workspace.model_dump(), indent=2)
```

--------------------------------------------------------------------------------
/tests/test.py:
--------------------------------------------------------------------------------

```python
 1 | from brev_mcp import server
 2 | 
 3 | async def test_read_resource():
 4 |     instance_types = await server.read_resource("brev://instance-types/crusoe")
 5 |     print(instance_types)
 6 | 
 7 | async def test_get_instance_types_tool():
 8 |     tool_output = await server.call_tool("get_instance_types", {"cloud_provider": "crusoe"})
 9 |     print(tool_output[0].text)
10 | 
11 | async def test_create_workspace_tool():
12 |     tool_output = await server.call_tool("create_workspace", {
13 |         "name": "test-workspace-2",
14 |         "cloud_provider": "crusoe",
15 |         "instance_type": "l40s-48gb.1x"
16 |     })
17 |     print(tool_output[0].text)
18 | 
19 | 
20 | if __name__ == "__main__":
21 |     import asyncio
22 |     asyncio.run(test_get_instance_types_tool())
```

--------------------------------------------------------------------------------
/src/brev_mcp/cli.py:
--------------------------------------------------------------------------------

```python
 1 | import os
 2 | import json
 3 | from .models import CredentialsFile, ActiveOrgFile
 4 | 
 5 | CREDENTIALS_FILEPATH = "~/.brev/credentials.json"
 6 | 
 7 | ACTIVEORG_FILEPATH = "~/.brev/active_org.json"
 8 | 
 9 | def get_acess_token() -> str:
10 |     env_token = os.getenv("BREV_API_TOKEN")
11 |     if env_token:
12 |         return env_token
13 | 
14 |     credentials_path = os.path.expanduser(CREDENTIALS_FILEPATH)
15 | 
16 |     if not os.path.exists(credentials_path):
17 |         raise RuntimeError(f"brev credentials file {CREDENTIALS_FILEPATH} not found")
18 |     
19 |     with open(credentials_path, "r") as f:
20 |         credentials = json.load(f)
21 |         credential_file = CredentialsFile.model_validate(credentials)
22 |         return credential_file.access_token
23 |     
24 | def get_active_org_id() -> str:
25 |     env_org_id = os.getenv("BREV_ORG_ID")
26 |     if env_org_id:
27 |         return env_org_id
28 | 
29 |     activeorg_path = os.path.expanduser(ACTIVEORG_FILEPATH)
30 | 
31 |     if not os.path.exists(activeorg_path):
32 |         raise RuntimeError(f"brev active org file {ACTIVEORG_FILEPATH} not found")
33 |     
34 |     with open(activeorg_path, "r") as f:
35 |         active_org = json.load(f)
36 |         active_org_file = ActiveOrgFile.model_validate(active_org)
37 |         return active_org_file.id
```
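The lookup order in `cli.py` (environment variable first, then the credentials file) can be exercised without touching `~/.brev`. This is a simplified sketch under stated assumptions: `resolve_access_token` is a hypothetical stand-in for the real function, it reads plain JSON instead of validating through `CredentialsFile`, and the JSON key `access_token` is assumed from the attribute name used above, since the model definition is not shown.

```python
import json
from pathlib import Path
from typing import Mapping


def resolve_access_token(environ: Mapping[str, str], credentials_path: Path) -> str:
    """Return the token from BREV_API_TOKEN if set, else from the credentials file."""
    env_token = environ.get("BREV_API_TOKEN")
    if env_token:
        return env_token

    if not credentials_path.exists():
        raise RuntimeError(f"brev credentials file {credentials_path} not found")

    with credentials_path.open("r") as f:
        credentials = json.load(f)
    # Assumed key name; the real file layout is defined by CredentialsFile.
    return credentials["access_token"]
```

Passing the environment and path as arguments, instead of reading `os.environ` and a module constant directly, keeps the precedence rule testable with a temporary directory.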

--------------------------------------------------------------------------------
/src/brev_mcp/instance_types.py:
--------------------------------------------------------------------------------

```python
 1 | import json
 2 | import logging
 3 | from typing import List
 4 | 
 5 | from .models import (
 6 |     CloudProvider,
 7 |     AllInstanceTypeObj,
 8 |     InstanceType,
 9 | )
10 | from .api import get_instance_types 
11 | 
12 | logging.basicConfig(level=logging.INFO)
13 | logger = logging.getLogger("instance-types")
14 | 
15 | 
16 | async def get_provider_instance_types(provider: CloudProvider)-> str:
17 |     try:
18 |         all_instance_types_obj = await get_instance_types()
19 |         instance_types = filter_instance_types(all_instance_types_obj)
20 |         for cloud_provider, instance_type_list in instance_types.items():
21 |             logger.info(f"Number of instance types for {cloud_provider.value}: {len(instance_type_list)}")
22 |         if provider not in instance_types:
23 |             raise ValueError(f"Provider {provider.value} not found in instance types")
24 |         instance_type_dicts = [
25 |             instance_type.model_dump(exclude_none=True) 
26 |             for instance_type in instance_types[provider]
27 |         ] 
28 |         return json.dumps(instance_type_dicts, indent=2)
29 |     except Exception as e:
30 |         logger.error(f"Error getting instance types: {str(e)}")
31 |         raise RuntimeError(f"Failed to get instance types: {str(e)}")
32 |     
33 | def filter_instance_types(all_instance_types: AllInstanceTypeObj) -> dict[CloudProvider, List[InstanceType]]:
34 |     instance_types: dict[CloudProvider, List[InstanceType]] = {}
35 |     for it_wg in all_instance_types.all_instance_types or []:
36 |         if not it_wg.workspace_groups:
37 |             continue
38 |         cloud_provider = CloudProvider(it_wg.workspace_groups[0].platform_type)
39 |         instance_type_data = it_wg.model_dump(exclude={'workspace_groups'})
40 |         instance_type = InstanceType.model_validate(instance_type_data)
41 | 
42 |         if not instance_type:
43 |             logger.warning(f"Instance type {it_wg.type} has no attributes")
44 |             continue
45 |         if cloud_provider not in instance_types:
46 |             instance_types[cloud_provider] = [instance_type]
47 |         else:
48 |             instance_types[cloud_provider].append(instance_type)
49 |     return instance_types
```
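The grouping step in `filter_instance_types` can be illustrated on plain dictionaries, without Pydantic. The sketch below is an assumption-laden simplification: `group_by_provider` is a hypothetical name, entries are raw dicts using the API's camelCase keys (`workspaceGroups`, `platformType`) taken from the model aliases, and the sample data is invented apart from the `l40s-48gb.1x` type name that appears in the tests.

```python
from collections import defaultdict


def group_by_provider(entries: list[dict]) -> dict[str, list[dict]]:
    """Group instance-type entries by the platform type of their first workspace group."""
    grouped: dict[str, list[dict]] = defaultdict(list)
    for entry in entries:
        groups = entry.get("workspaceGroups") or []
        if not groups:
            # Entries with no workspace group cannot be attributed to a provider.
            continue
        provider = groups[0].get("platformType")
        if provider is None:
            continue
        # Drop the workspace group details, keeping only the instance type fields.
        instance_type = {k: v for k, v in entry.items() if k != "workspaceGroups"}
        grouped[provider].append(instance_type)
    return dict(grouped)


# Invented sample data for illustration.
sample = [
    {"type": "l40s-48gb.1x", "workspaceGroups": [{"platformType": "crusoe"}]},
    {"type": "no-groups", "workspaceGroups": []},
    {"type": "second-type", "workspaceGroups": [{"platformType": "crusoe"}]},
    {"type": "missing-key"},
]
print(group_by_provider(sample))
```

Using `defaultdict(list)` removes the explicit "key present or not" branch; treating a missing or `None` group list the same as an empty one avoids a `TypeError` on optional fields.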

--------------------------------------------------------------------------------
/src/brev_mcp/api.py:
--------------------------------------------------------------------------------

```python
 1 | import httpx
 2 | from pydantic import ValidationError
 3 | 
 4 | from .models import (
 5 |     AllInstanceTypeObj,
 6 |     CreateWorkspaceRequest,
 7 |     Workspace
 8 | )
 9 | from .cli import get_acess_token, get_active_org_id
10 | 
11 | BASE_API_URL = "https://brevapi.us-west-2-prod.control-plane.brev.dev/api"
12 | 
13 | async def get_instance_types() -> AllInstanceTypeObj:
14 |     access_token = get_acess_token() 
15 |     org_id = get_active_org_id()
16 |     try:
17 |         async with httpx.AsyncClient(timeout=httpx.Timeout(25.0)) as client:
18 |             response = await client.get(
19 |                 f"{BASE_API_URL}/instances/alltypesavailable/{org_id}",
20 |                 headers={
21 |                     "Authorization": f"Bearer {access_token}",
22 |                     "Content-Type": "application/json"
23 |                 },
24 |             )
25 |             response.raise_for_status()
26 |             data = response.json()
27 |             all_instance_types_obj = AllInstanceTypeObj.model_validate(data)
28 |             return all_instance_types_obj
29 |     except ValidationError as e:    
30 |         raise RuntimeError(f"Failed to validate instance types: {str(e)}")
31 |     except Exception as e:
32 |         raise RuntimeError(f"Failed to get instance types: {str(e)}")
33 | 
34 | async def create_workspace(request: CreateWorkspaceRequest) -> Workspace:
35 |     access_token = get_acess_token() 
36 |     org_id = get_active_org_id()
37 |     try:
38 |         async with httpx.AsyncClient(timeout=httpx.Timeout(25.0)) as client:
39 |             json = request.model_dump(by_alias=True)
40 |             response = await client.post(
41 |                 f"{BASE_API_URL}/organizations/{org_id}/workspaces",
42 |                 headers={
43 |                     "Authorization": f"Bearer {access_token}",
44 |                     "Content-Type": "application/json"
45 |                 },
46 |                 json=json            
47 |             )
48 |             response.raise_for_status()
49 |             data = response.json()
50 |             workspace = Workspace.model_validate(data)
51 |             return workspace
52 |     except ValidationError as e:    
53 |         raise RuntimeError(f"Failed to validate workspace: {str(e)}")
54 |     except Exception as e:
55 |         raise RuntimeError(f"Failed to create workspace: {str(e)}")
```

--------------------------------------------------------------------------------
/src/brev_mcp/tools.py:
--------------------------------------------------------------------------------

```python
 1 | from mcp.types import (
 2 |     TextContent,
 3 |     Tool
 4 | )
 5 | from .models import CloudProvider, ToolModel
 6 | from .instance_types import get_provider_instance_types
 7 | from .workspace import create_provider_workspace
 8 | 
 9 | 
10 | async def get_instance_types_tool(args: dict[str, str]) -> list[TextContent]:
11 |     if "cloud_provider" not in args:
12 |         raise ValueError("cloud_provider argument is required for get_instance_types tool")
13 | 
14 |     cloud_provider = CloudProvider(args["cloud_provider"])
15 |     instance_types = await get_provider_instance_types(cloud_provider)
16 |     return [
17 |         TextContent(
18 |             type="text", 
19 |             text=instance_types
20 |         )
21 |     ]
22 | 
23 | async def create_workspace_tool(args: dict[str, str]) -> list[TextContent]:
24 |     if "name" not in args or "cloud_provider" not in args or "instance_type" not in args:
25 |         raise ValueError("missing required arguments for create_workspace tool")
26 | 
27 |     cloud_provider = CloudProvider(args["cloud_provider"])
28 |     workspace = await create_provider_workspace(args["name"], cloud_provider, args["instance_type"])
29 |     return [
30 |         TextContent(
31 |             type="text", 
32 |             text=workspace
33 |         )
34 |     ]
35 | 
36 | 
37 | tool_models = {
38 |     "get_instance_types": ToolModel(
39 |         tool=Tool(
40 |             name="get_instance_types",
41 |             description="Get available instance types for a cloud provider",
42 |             inputSchema={
43 |                 "type": "object",
44 |                 "properties": {
45 |                     "cloud_provider": {
46 |                         "description": "The cloud provider to get instance types for",
47 |                         "enum": [provider.value for provider in CloudProvider]
48 |                     }
49 |                 },
50 |                 "required": ["cloud_provider"]
51 |             }
52 |         ),
53 |         call_tool=get_instance_types_tool
54 |     ),
55 |     "create_workspace": ToolModel(
56 |         tool=Tool(
57 |             name="create_workspace",
58 |             description="Create a workspace from an instance type and cloud provider",
59 |             inputSchema={
60 |                 "type": "object",
61 |                 "properties": {
62 |                     "name": {
63 |                         "description": "The name of the workspace",
64 |                         "type": "string",
65 |                     },
66 |                     "cloud_provider": {
67 |                         "description": "The cloud provider for the workspace",
68 |                         "enum": [provider.value for provider in CloudProvider]
69 |                     },
70 |                     "instance_type": {
71 |                         "description": "The instance type of the workspace",
72 |                         "type": "string",
73 |                     }
74 |                 },
75 |                 "required": ["name", "cloud_provider", "instance_type"]
76 |             }
77 |         ),
78 |         call_tool=create_workspace_tool
79 |     )
80 | }
```
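Each tool function above checks its required arguments by hand. One way to keep those checks in step with the declared `inputSchema` is to derive them from the schema itself. This is a rough sketch, not the repository's approach: `missing_required` and `check_arguments` are hypothetical helpers, and the schema shown assumes all three `create_workspace` arguments are required, as the function's own check implies.

```python
def missing_required(schema: dict, args: dict) -> list[str]:
    """Return the names listed under "required" in the schema that are absent from args."""
    return [name for name in schema.get("required", []) if name not in args]


def check_arguments(tool_name: str, schema: dict, args: dict) -> None:
    """Raise ValueError naming every missing required argument."""
    missing = missing_required(schema, args)
    if missing:
        raise ValueError(
            f"missing required arguments for {tool_name} tool: {', '.join(missing)}"
        )


# Trimmed-down schema for illustration; descriptions and enums omitted.
create_workspace_schema = {
    "type": "object",
    "properties": {
        "name": {"type": "string"},
        "cloud_provider": {},
        "instance_type": {"type": "string"},
    },
    "required": ["name", "cloud_provider", "instance_type"],
}

print(missing_required(create_workspace_schema, {"name": "test-workspace-2"}))
```

Naming the missing arguments in the error gives the calling model something concrete to correct on its next attempt.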

--------------------------------------------------------------------------------
/src/brev_mcp/server.py:
--------------------------------------------------------------------------------

```python
 1 | import logging
 2 | from typing import Any
 3 | from collections.abc import Sequence
 4 | from .instance_types import get_provider_instance_types
 5 | from .models import CloudProvider
 6 | from .tools import tool_models
 7 | 
 8 | from mcp.server.models import InitializationOptions
 9 | from mcp.types import (
10 |     Resource,
11 |     Tool,
12 |     TextContent,
13 |     ImageContent,
14 |     EmbeddedResource
15 | )
16 | from mcp.server import NotificationOptions, Server
17 | from pydantic import AnyUrl
18 | import mcp.server.stdio
19 | 
20 | logging.basicConfig(level=logging.INFO)
21 | logger = logging.getLogger("brev-server")
22 | 
23 | app = Server("brev_mcp")
24 | 
25 | @app.list_resources()
26 | async def list_resources() -> list[Resource]:
27 |     """List available Brev resources."""
28 |     return [
29 |         Resource(
30 |             uri=f"brev://instance-types/{provider.value}",
31 |             name=f"{provider.value} Instance Types",
32 |             mimeType="application/json",
33 |             description=f"Available virtual machine instance types for Brev provider {provider.value}",
34 |         )
35 |         for provider in CloudProvider
36 |     ]
37 | 
38 | @app.read_resource()
39 | async def read_resource(uri: AnyUrl) -> str:
40 |     """Read resource content."""
41 |     if str(uri).startswith("brev://instance-types/"):
42 |         provider = CloudProvider(str(uri).split("/")[-1])
43 |     else:
44 |         raise ValueError(f"Unknown resource: {uri}")
45 | 
46 | 
47 |     instance_types = await get_provider_instance_types(provider)
48 |     return instance_types
49 | 
50 | @app.list_tools()
51 | async def list_tools() -> list[Tool]:
52 |     """List available brev tools."""
53 |     return [tool_model.tool for _, tool_model in tool_models.items()]
54 | 
55 | @app.call_tool()
56 | async def call_tool(tool_name: str, arguments: Any) -> Sequence[TextContent | ImageContent | EmbeddedResource]:
57 |     """Handle brev tool calls"""
58 |     if tool_name not in tool_models:
59 |         raise ValueError(f"Unknown tool: {tool_name}")
60 | 
61 |     if not isinstance(arguments, dict):
62 |         raise ValueError(f"Invalid {tool_name} arguments")
63 |     
64 |     return await tool_models[tool_name].call_tool(arguments)
65 | 
66 | # TO DO: aws instance types response is too long: result exceeds maximum length of 1048576
67 | # TO DO: should have a tool call that can filter based on providers -> doing a query like "I'll help you find all single H100 instances across the cloud providers."
68 | # requires a bunch of api calls
69 | # TO DO: Error executing code: MCP error -2: Request timed out
70 | # TO DO: handle notifications
71 | async def main():
72 |     # Run the server using stdin/stdout streams
73 |     async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
74 |         await app.run(
75 |             read_stream,
76 |             write_stream,
77 |             InitializationOptions(
78 |                 server_name="brev_mcp",
79 |                 server_version="0.1.0",
80 |                 capabilities=app.get_capabilities(
81 |                     notification_options=NotificationOptions(),
82 |                     experimental_capabilities={},
83 |                 ),
84 |             ),
85 |         )
```
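The resource handler above maps a `brev://instance-types/<provider>` URI to a provider enum member. The sketch below isolates that parsing step so it can be checked on its own. It is illustrative only: `Provider` is a hypothetical two-member stand-in for `CloudProvider`, and `provider_from_uri` is not a function in the repository.

```python
from enum import Enum


class Provider(str, Enum):
    """Hypothetical subset of CloudProvider, for illustration."""
    AWS = "aws"
    CRUSOE = "crusoe"


RESOURCE_PREFIX = "brev://instance-types/"


def provider_from_uri(uri: str) -> Provider:
    """Extract the provider from an instance-types resource URI.

    Raises ValueError for URIs outside the instance-types namespace and for
    provider names that are not members of the enum.
    """
    if not uri.startswith(RESOURCE_PREFIX):
        raise ValueError(f"Unknown resource: {uri}")
    # Enum lookup by value raises ValueError for unknown providers.
    return Provider(uri[len(RESOURCE_PREFIX):])


print(provider_from_uri("brev://instance-types/crusoe"))
```

Slicing off the known prefix, rather than taking the last `/`-separated segment, means a URI with extra path segments is rejected by the enum lookup instead of being silently accepted.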

--------------------------------------------------------------------------------
/src/brev_mcp/models.py:
--------------------------------------------------------------------------------

```python
  1 | from typing import Dict, List, Optional, Literal, Callable, Awaitable
  2 | from pydantic import BaseModel, Field
  3 | from datetime import datetime
  4 | from enum import Enum
  5 | from mcp.types import Tool, TextContent
  6 | 
  7 | class Quota(BaseModel):
  8 |     id: Optional[str] = None
  9 |     name: Optional[str] = None
 10 |     maximum: Optional[int] = None
 11 |     current: Optional[int] = None
 12 |     unit: Optional[str] = None
 13 | 
 14 | class InstanceTypeQuota(BaseModel):
 15 |     on_demand: Optional[Quota] = Field(None, alias="onDemand")
 16 |     spot: Optional[Quota] = None
 17 |     reserved: Optional[Quota] = None
 18 | 
 19 |     class Config:
 20 |         populate_by_name = True
 21 | 
 22 | class InstanceTypePrice(BaseModel):
 23 |     instance_type_price_id: Optional[str] = Field(None, alias="instanceTypePriceId")
 24 |     operating_system: Optional[str] = Field(None, alias="operatingSystem")
 25 |     instance_type: Optional[str] = Field(None, alias="instanceType")
 26 |     term_type: Optional[str] = Field(None, alias="termType")
 27 |     term_attributes: Optional[Dict[str, str]] = Field(None, alias="termAttributes")
 28 |     unit: Optional[str] = None
 29 |     price_usd: Optional[str] = Field(None, alias="priceUsd")
 30 |     usage_type: Optional[str] = Field(None, alias="usageType")
 31 | 
 32 |     class Config:
 33 |         populate_by_name = True
 34 | 
 35 | class CurrencyAmount(BaseModel):
 36 |     currency: Optional[str] = None
 37 |     amount: Optional[str] = None
 38 | 
 39 | class Storage(BaseModel):
 40 |     count: Optional[int] = None
 41 |     size: Optional[str] = None
 42 |     type: Optional[str] = None
 43 |     min_size: Optional[str] = Field(None, alias="minSize")
 44 |     max_size: Optional[str] = Field(None, alias="maxSize")
 45 |     price_per_gb_hr: Optional[CurrencyAmount] = Field(None, alias="pricePerGbHr")
 46 | 
 47 |     class Config:
 48 |         populate_by_name = True
 49 | 
 50 | class Gpu(BaseModel):
 51 |     count: Optional[int] = None
 52 |     memory: Optional[str] = None
 53 |     manufacturer: Optional[str] = None
 54 |     name: Optional[str] = None
 55 |     network_details: Optional[str] = Field(None, alias="networkDetails")
 56 |     memory_details: Optional[str] = Field(None, alias="memoryDetails")
 57 | 
 58 |     class Config:
 59 |         populate_by_name = True
 60 | 
 61 | class WorkspaceGroupPlatform(str, Enum):
 62 |     NOOP = "noop"
 63 |     AWS = "aws"
 64 |     DEV_PLANE = "dev-plane"
 65 |     AWS_EC2_SPOT = "aws:ec2:spot"
 66 | 
 67 | class WorkspaceGroupStatus(str, Enum):
 68 |     DEPLOYING = "DEPLOYING"
 69 |     RUNNING = "RUNNING"
 70 |     DEPRECATED = "DEPRECATED"
 71 |     DELETING = "DELETING"
 72 |     FAILURE = "FAILURE"
 73 | 
 74 | class TenantType(str, Enum):
 75 |     SHARED = "shared"
 76 |     ISOLATED = "isolated"
 77 | 
 78 | class Metadata(BaseModel):
 79 |     created_at: Optional[datetime] = Field(None, alias="createdAt")
 80 |     updated_at: Optional[datetime] = Field(None, alias="updatedAt")
 81 |     deleted_at: Optional[datetime] = Field(None, alias="deletedAt")
 82 |     id: Optional[str] = None
 83 |     org_id: Optional[str] = Field(None, alias="orgId")
 84 | 
 85 |     class Config:
 86 |         populate_by_name = True
 87 | 
 88 | class Workspace(BaseModel):
 89 |     # Add workspace fields based on your needs
 90 |     pass
 91 | 
 92 | class WorkspaceGroup(BaseModel):
 93 |     metadata: Optional[Metadata] = None
 94 |     name: Optional[str] = None
 95 |     host: Optional[str] = None  # Assuming uri.Host is a string
 96 |     platform: Optional[WorkspaceGroupPlatform] = None
 97 |     platform_id: Optional[str] = Field(None, alias="platformId")
 98 |     platform_region: Optional[str] = Field(None, alias="platformRegion")
 99 |     platform_type: Optional[str] = Field(None, alias="platformType")
100 |     usable_regions: Optional[List[str]] = Field(None, alias="usableRegions")
101 |     status: Optional[WorkspaceGroupStatus] = None
102 |     workspaces: Optional[List[Workspace]] = None
103 |     tenant_type: Optional[TenantType] = Field(None, alias="tenantType")
104 |     version: Optional[str] = None
105 |     tags: Optional[Dict[str, List[str]]] = None
106 | 
107 |     class Config:
108 |         populate_by_name = True
109 | 
110 | class Location(BaseModel):
111 |     name: Optional[str] = None
112 |     description: Optional[str] = None
113 |     available: Optional[bool] = None
114 |     endpoint: Optional[str] = None
115 |     country: Optional[str] = None
116 | 
117 | class WorkspaceGroupWithLocations(WorkspaceGroup):
118 |     locations: Optional[List[Location]] = None
119 | 
120 | class InstanceType(BaseModel):
121 |     type: Optional[str] = None
122 |     supported_gpus: Optional[List[Gpu]] = Field(None, alias="supportedGpus")
123 |     supported_storage: Optional[List[Storage]] = Field(None, alias="supportedStorage")
124 |     memory: Optional[str] = None
125 |     maximum_network_interfaces: Optional[int] = Field(None, alias="maximumNetworkInterfaces")
126 |     network_performance: Optional[str] = Field(None, alias="networkPerformance")
127 |     supported_num_cores: Optional[List[int]] = Field(None, alias="supportedNumCores")
128 |     default_cores: Optional[int] = Field(None, alias="defaultCores")
129 |     vcpu: Optional[int] = None
130 |     supported_architectures: Optional[List[str]] = Field(None, alias="supportedArchitectures")
131 |     clock_speed_in_ghz: Optional[str] = Field(None, alias="clockSpeedInGhz")
132 |     sub_location: Optional[str] = Field(None, alias="subLocation")
133 |     prices: Optional[List[InstanceTypePrice]] = None
134 |     default_price: Optional[str] = Field(None, alias="defaultPrice")
135 |     elastic_root_volume: Optional[bool] = Field(None, alias="elasticRootVolume")
136 |     supported_usage_classes: Optional[List[str]] = Field(None, alias="supportedUsageClasses")
137 |     quota: Optional[InstanceTypeQuota] = None
138 |     location: Optional[str] = None
139 |     is_available: Optional[bool] = Field(None, alias="isAvailable")
140 |     variable_price: Optional[bool] = Field(None, alias="variablePrice")
141 |     rebootable: Optional[bool] = None
142 |     preemptible: Optional[bool] = None
143 |     base_price: Optional[CurrencyAmount] = Field(None, alias="basePrice")
144 |     sub_location_type_changeable: Optional[bool] = Field(None, alias="subLocationTypeChangeable")
145 |     estimated_deploy_time: Optional[str] = Field(None, alias="estimatedDeployTime")
146 |     user_privilege_escalation_disabled: Optional[bool] = Field(None, alias="userPrivilegeEscalationDisabled")
147 |     not_privileged: Optional[bool] = Field(None, alias="notPrivileged")
148 |     is_container: Optional[bool] = Field(None, alias="isContainer")
149 | 
150 |     class Config:
151 |         populate_by_name = True
152 | 
153 | class InstanceTypeWorkspaceGroup(InstanceType):
154 |     workspace_groups: Optional[List[WorkspaceGroup]] = Field(None, alias="workspaceGroups")
155 | 
156 |     class Config:
157 |         populate_by_name = True
158 | 
159 | 
160 | class WorkspaceGroupError(BaseModel):
161 |     workspace_group: Optional[WorkspaceGroup] = Field(None, alias="workspaceGroup")
162 |     error_message: Optional[str] = Field(None, alias="errorMessage")
163 | 
164 |     class Config:
165 |         populate_by_name = True
166 | 
167 | 
168 | class AllInstanceTypeObj(BaseModel):
169 |     all_instance_types: Optional[List[InstanceTypeWorkspaceGroup]] = Field(None, alias="allInstanceTypes")
170 |     workspace_group_errors: Optional[List[WorkspaceGroupError]] = Field(None, alias="workspaceGroupErrors")
171 | 
172 |     class Config:
173 |         populate_by_name = True
174 | 
175 | Workspace_Group_Ids: dict[str, str] = {
176 |     "aws": "devplane-brev-1",
177 |     "gcp": "GCP",
178 |     "azure": "azure-dgxc-wg",
179 |     "crusoe": "crusoe-brev-wg",
180 |     "lambda-labs": "lambda-labs-test",
181 |     "fluidstack": "FluidStack",
182 |     "launchpad": "launchpad-test-wg",
183 |     "akash": "akash-brev-wg",
184 |     "gcpalpha": "dgxc-gcp",
185 | }
186 | 
187 | class CloudProvider(str, Enum):
188 |     AWS = "aws"
189 |     GCP = "gcp"
190 |     AZURE = "azure"
191 |     CRUSOE = "crusoe"
192 |     LAMBDA_LABS = "lambda-labs"
193 |     FLUIDSTACK = "fluidstack"
194 |     LAUNCHPAD = "launchpad"
195 |     AKASH = "akash"
196 |     GCPALPHA = "gcpalpha"
197 | 
198 | 
199 |     def get_workspace_group_id(self):
200 |         return Workspace_Group_Ids[self.value]
201 | 
202 | DEFAULT_VERB_CONFIG = "build:\n  system_packages: []\n  python_version: '3.10'\n  cuda: 12.0.1\n  python_packages:\n    - jupyterlab\n  run:\n    - sh -c \"$(curl -fsSL https://raw.githubusercontent.com/ohmyzsh/ohmyzsh/master/tools/install.sh)\" \"\" --unattended\nuser:\n  shell: zsh\n  authorized_keys_path: /home/ubuntu/.ssh/authorized_keys\nports:\n  - '2222:22'\nservices:\n  - name: jupyter\n    entrypoint: jupyter-lab --ip=0.0.0.0 --no-browser --NotebookApp.token='' --NotebookApp.password=''\n    ports:\n      - '8888'"
203 | 
204 | class CreateWorkspaceRequest(BaseModel):
205 |     # version: Optional[str] = None
206 |     name: str
207 |     # description: Optional[str] = None
208 |     workspace_group_id: Optional[str] = Field(None, alias="workspaceGroupId")
209 |     # workspace_template_id: Optional[str] = Field(None, alias="workspaceTemplateId")
210 |     # workspace_class: Optional[WorkspaceClassID] = Field(None, alias="workspaceClassId")
211 |     # git_repo: Optional[str] = Field(None, alias="gitRepo")
212 |     # is_stoppable: Optional[bool] = Field(None, alias="isStoppable")
213 |     # tunnel: Optional[WorkspaceTunnel] = None
214 |     # primary_application_id: Optional[str] = Field(None, alias="primaryApplicationId")
215 |     # startup_script: Optional[str] = Field(None, alias="startupScript")
216 |     # startup_script_path: Optional[str] = Field(None, alias="startupScriptPath")
217 |     # ide_config: Optional[ClientConfig] = Field(None, alias="ideConfig")
218 |     # dont_check_ssh_keys: bool = Field(False, alias="dontCheckSSHKeys")
219 |     # repos: ReposV0
220 |     # execs: ExecsV0
221 |     # init_branch: Optional[str] = Field(None, alias="initBranch")
222 |     # dot_brev_path: Optional[str] = Field(None, alias="dotBrevPath")
223 |     # repos_v1: Optional[ReposV1] = Field(None, alias="reposV1")
224 |     # execs_v1: Optional[ExecsV1] = Field(None, alias="execsV1")
225 |     instance_type: Optional[str] = Field(None, alias="instanceType")
226 |     # disk_storage: Optional[str] = Field(None, alias="diskStorage")
227 |     # region: Optional[str] = None
228 |     # image: Optional[str] = None
229 |     # architecture: Optional[Architecture] = None
230 |     # spot: bool = False
231 |     # on_container: bool = Field(False, alias="onContainer")
232 |     # initial_container_image: Optional[str] = Field(None, alias="containerImage")
233 |     verb_yaml: Optional[str] = Field(DEFAULT_VERB_CONFIG, alias="verbYaml")
234 |     # base_image: Optional[str] = Field(None, alias="baseImage")
235 |     # custom_container: Optional[CustomContainer] = Field(None, alias="customContainer")
236 |     # port_mappings: Optional[Dict[str, str]] = Field(None, alias="portMappings")
237 |     workspace_version: Optional[Literal["v1", "v0"]] = Field("v1", alias="workspaceVersion")
238 |     # retry_for: Optional[str] = Field(None, alias="retryFor")
239 |     # vm_only_mode: bool = Field(False, alias="vmOnlyMode")
240 |     # files: Optional[List[FileRequest]] = None
241 |     # labels: Optional[Dict[str, str]] = None
242 |     # launch_jupyter_on_start: bool = Field(False, alias="launchJupyterOnStart")
243 | 
244 |     class Config:
245 |         populate_by_name = True
246 | 
247 | class WorkspaceStatus(str, Enum):
248 |     DEPLOYING = "DEPLOYING"
249 |     STARTING = "STARTING"
250 |     RUNNING = "RUNNING"
251 |     STOPPING = "STOPPING"
252 |     STOPPED = "STOPPED"
253 |     DELETING = "DELETING"
254 |     FAILURE = "FAILURE"
255 | 
256 | class HealthStatus(str, Enum):
257 |     UNSPECIFIED = ""
258 |     HEALTHY = "HEALTHY"
259 |     UNHEALTHY = "UNHEALTHY"
260 |     UNAVAILABLE = "UNAVAILABLE"
261 | 
262 | class ServiceType(str, Enum):
263 |     SSH = "SSH"
264 |     HTTP = "HTTP"
265 |     HTTPS = "HTTPS"
266 |     TCP = "TCP"
267 |     RDP = "RDP"
268 | 
269 | class WorkspaceCapability(str, Enum):
270 |     STOP_START_INSTANCE = "stop-start-instance"
271 |     AUTOSTOP = "autostop"
272 |     EXPOSE_PUBLIC_PORTS = "expose-public-ports"
273 |     CLONE = "clone"
274 |     RETIRE_VOLUME = "retire-volume"
275 |     MACHINE_IMAGE = "machine-image"
276 |     MODIFY_FIREWALL = "modify-firewall"
277 |     INSTANCE_USER_DATA = "instance-userdata"
278 |     VPC_SUBNETS = "vpc-subnets"
279 |     CONTAINER_CLOUD = "container-cloud"
280 | 
281 | 
282 | class VerbBuildStatus(str, Enum):
283 |     UNSPECIFIED = ""
284 |     CREATE_FAILED = "CREATE_FAILED"
285 |     PENDING = "PENDING"
286 |     BUILDING = "BUILDING"
287 |     COMPLETED = "COMPLETED"
288 | 
289 | class FileType(str, Enum):
290 |     COLAB = "colab"
291 |     NOTEBOOK = "notebook"
292 |     GITHUB = "github"
293 |     GITLAB = "gitlab"
294 | 
295 | class WorkspaceStartStatus(str, Enum):
296 |     UNSPECIFIED = ""
297 |     STARTING = "STARTING"
298 |     FAILURE = "FAILURE"
299 |     STARTED = "STARTED"
300 | 
301 | class WorkspaceVersion(str, Enum):
302 |     UNSPECIFIED = ""
303 |     V0 = "v0"
304 |     V1 = "v1"
305 | 
306 | class WorkspaceApplicationAPIKey(BaseModel):
307 |     enabled: bool
308 |     id: str
309 |     client_id: str = Field(alias="clientID")
310 |     client_secret: str = Field(alias="clientSecret")
311 | 
312 | class WorkspaceApplicationPolicy(BaseModel):
313 |     allowed_user_auth_ids: List[str] = Field(alias="allowedUserAuthIDs")
314 |     allow_everyone: bool = Field(alias="allowEveryone")
315 |     api_key: WorkspaceApplicationAPIKey = Field(alias="apiKey")
316 |     allowed_user_provider_ids: List[str] = Field(alias="allowedUserProviderIDs")
317 | 
318 | class WorkspaceApplication(BaseModel):
319 |     cloudflare_application_id: str = Field(alias="cloudflareApplicationID")
320 |     cloudflare_dns_record_id: str = Field(alias="cloudflareDnsRecordID")
321 |     hostname: str
322 |     name: str
323 |     service_type: ServiceType = Field(alias="serviceType")
324 |     port: int
325 |     application_setup_bash: str = Field(alias="userApplicationSetupBash")
326 |     policy: WorkspaceApplicationPolicy
327 |     health_check_id: str = Field(alias="healthCheckID")
328 | 
329 | class WorkspaceTunnel(BaseModel):
330 |     tunnel_id: str = Field(alias="tunnelID")
331 |     applications: List[WorkspaceApplication]
332 |     tunnel_setup_bash: str = Field(alias="tunnelSetupBash")
333 |     tunnel_status: HealthStatus = Field(alias="tunnelStatus")
334 | 
335 | class Thresholds(BaseModel):
336 |     failure_threshold: int = Field(alias="failureThreshold")
337 |     success_threshold: int = Field(alias="successThreshold")
338 | 
339 | class Timestamp(BaseModel):
340 |     seconds: int
341 |     nanos: int
342 | 
343 | class HealthCheck(BaseModel):
344 |     health_check_id: str = Field(alias="healthCheckId")
345 |     create_time: Optional[Timestamp] = Field(None, alias="createTime")
346 |     update_time: Optional[Timestamp] = Field(None, alias="updateTime")
347 |     labels: Dict[str, str]
348 |     status: str
349 |     thresholds: Optional[Thresholds] = None
350 | 
351 | class FileMetadata(BaseModel):
352 |     type: FileType
353 | 
354 | class FileObject(BaseModel):
355 |     url: str
356 |     path: str
357 |     metadata: FileMetadata
358 | 
359 | class ClientConfig(BaseModel):
360 |     # Add fields based on data.ClientConfig
361 |     pass
362 | 
363 | class ReposV0(BaseModel):
364 |     # Add fields based on data.ReposV0
365 |     pass
366 | 
367 | class ExecsV0(BaseModel):
368 |     # Add fields based on data.ExecsV0
369 |     pass
370 | 
371 | class ReposV1(BaseModel):
372 |     # Add fields based on data.ReposV1
373 |     pass
374 | 
375 | class ExecsV1(BaseModel):
376 |     # Add fields based on data.ExecsV1
377 |     pass
378 | 
379 | class CustomContainer(BaseModel):
380 |     # Add fields based on data.CustomContainer
381 |     pass
382 | 
383 | class WorkspaceTemplateJSON(BaseModel):
384 |     # Add fields based on WorkspaceTemplateJSON
385 |     pass
386 | 
387 | class Workspace(BaseModel):
388 |     id: str
389 |     workspace_group_id: str = Field(alias="workspaceGroupId")
390 |     organization_id: str = Field(alias="organizationId")
391 |     name: str
392 |     description: str
393 |     created_by_user_id: str = Field(alias="createdByUserId")
394 |     dns: Optional[str] = None
395 |     password: Optional[str] = None
396 |     workspace_class: str = Field(alias="workspaceClassId")
397 |     git_repo: Optional[str] = Field(None, alias="gitRepo")
398 |     workspace_template: WorkspaceTemplateJSON = Field(alias="workspaceTemplate")
399 |     status: WorkspaceStatus
400 |     status_message: str = Field(alias="statusMessage")
401 |     health_status: HealthStatus = Field(alias="healthStatus")
402 |     last_online_at: str = Field(alias="lastOnlineAt")
403 |     created_at: str = Field(alias="createdAt")
404 |     updated_at: str = Field(alias="updatedAt")
405 |     version: str
406 |     ssh_port: int = Field(alias="sshPort")
407 |     ssh_user: str = Field(alias="sshUser")
408 |     ssh_proxy_hostname: str = Field(alias="sshProxyHostname")
409 |     host_ssh_port: int = Field(alias="hostSshPort")
410 |     host_ssh_user: str = Field(alias="hostSshUser")
411 |     host_ssh_proxy_hostname: str = Field(alias="hostSshProxyHostname")
412 |     on_container: bool = Field(alias="onContainer")
413 |     is_stoppable: bool = Field(alias="isStoppable")
414 |     tunnel: Optional[WorkspaceTunnel] = None
415 |     primary_application_id: Optional[str] = Field(None, alias="primaryApplicationId")
416 |     startup_script: str = Field(alias="startupScript")
417 |     startup_script_path: str = Field(alias="startupScriptPath")
418 |     init_branch: str = Field(alias="initBranch")
419 |     dot_brev_path: str = Field(alias="dotBrevPath")
420 |     network_id: str = Field(alias="networkId")
421 |     ide_config: ClientConfig = Field(alias="ideConfig")
422 |     # repos: ReposV0
423 |     # execs: ExecsV0
424 |     repos_v1: Optional[ReposV1] = Field(None, alias="reposV1")
425 |     execs_v1: Optional[ExecsV1] = Field(None, alias="execsV1")
426 |     stop_timeout: Optional[int] = Field(None, alias="stopTimeout")
427 |     instance_type: str = Field(alias="instanceType")
428 |     disk_storage: str = Field(alias="diskStorage")
429 |     image: str
430 |     region: str
431 |     exposed_ports: List[int] = Field(alias="exposedPorts")
432 |     spot: bool
433 |     workspace_capabilities: List[WorkspaceCapability] = Field(alias="workspaceCapabilities")
434 |     workspace_image_uri: str = Field(alias="workspaceImageUri")
435 |     verb_yaml: Optional[str] = Field(None, alias="verbYaml")
436 |     verb_build_status: VerbBuildStatus = Field(alias="verbBuildStatus")
437 |     health_checks: Optional[List[HealthCheck]] = Field(None, alias="healthCheck")
438 |     file_objects: Optional[Dict[str, FileObject]] = Field(None, alias="fileObjects")
439 |     additional_users: Optional[List[str]] = Field(None, alias="additionalUsers")
440 |     base_image: Optional[str] = Field(None, alias="baseImage")
441 |     port_mappings: Optional[Dict[str, str]] = Field(None, alias="portMappings")
442 |     last_start_status: WorkspaceStartStatus = Field(alias="lastStartStatus")
443 |     last_start_status_message: str = Field(alias="lastStartStatusMessage")
444 |     workspace_version: WorkspaceVersion = Field(alias="workspaceVersion")
445 |     vm_only_mode: bool = Field(alias="vmOnlyMode")
446 |     custom_container: Optional[CustomContainer] = Field(None, alias="customContainer")
447 |     instance_type_info: Optional[InstanceType] = Field(None, alias="instanceTypeInfo")
448 | 
449 |     class Config:
450 |         populate_by_name = True
451 | 
452 | class CredentialsFile(BaseModel):
453 |     access_token: str
454 |     refresh_token: str
455 | 
456 | class ActiveOrgFile(BaseModel):
457 |     id: str
458 |     name: str
459 |     userNetworkId: str
460 | 
461 | class ToolModel(BaseModel):
462 |     tool: Tool
463 |     call_tool: Callable[..., Awaitable[TextContent]]
```
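
The enums in `models.py` all subclass both `str` and `Enum`, and `Timestamp` carries separate `seconds` and `nanos` fields. The following is a minimal, standard-library-only sketch of how such values might be handled. It repeats a subset of the enum members so it runs on its own. `parse_status` and `timestamp_to_datetime` are hypothetical helpers that are not part of the repository, and reading `seconds` as Unix epoch seconds is an assumption.

```python
from datetime import datetime, timezone
from enum import Enum


class WorkspaceStatus(str, Enum):
    # Subset of the members in models.py, repeated so the sketch is self-contained.
    DEPLOYING = "DEPLOYING"
    RUNNING = "RUNNING"
    STOPPED = "STOPPED"
    FAILURE = "FAILURE"


class HealthStatus(str, Enum):
    # The empty string is a valid value and maps to UNSPECIFIED.
    UNSPECIFIED = ""
    HEALTHY = "HEALTHY"


def parse_status(raw: str) -> WorkspaceStatus:
    """Hypothetical helper: look up an enum member by its raw API value.

    Raises ValueError if the value is not a known status.
    """
    return WorkspaceStatus(raw)


def timestamp_to_datetime(seconds: int, nanos: int) -> datetime:
    """Hypothetical helper: combine seconds/nanos into a UTC datetime.

    Assumes `seconds` counts from the Unix epoch and 0 <= nanos < 1e9.
    Sub-microsecond precision is truncated.
    """
    base = datetime.fromtimestamp(seconds, tz=timezone.utc)
    return base.replace(microsecond=nanos // 1000)


if __name__ == "__main__":
    status = parse_status("RUNNING")
    # Because the enum subclasses str, members compare equal to plain strings.
    print(status is WorkspaceStatus.RUNNING, status == "RUNNING")
    print(timestamp_to_datetime(0, 500_000_000).isoformat())
```

Because the members are also `str` instances, they can be compared against raw strings from an API response without calling `.value`, while a lookup by value still rejects unknown statuses with a `ValueError`.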