This is page 2 of 5. Use http://codebase.md/datalayer/jupyter-mcp-server?page={x} to view the full context. # Directory Structure ``` ├── .dockerignore ├── .github │ ├── copilot-instructions.md │ ├── dependabot.yml │ └── workflows │ ├── build.yml │ ├── fix-license-header.yml │ ├── lint.sh │ ├── prep-release.yml │ ├── publish-release.yml │ └── test.yml ├── .gitignore ├── .licenserc.yaml ├── .pre-commit-config.yaml ├── .vscode │ ├── mcp.json │ └── settings.json ├── ARCHITECTURE.md ├── CHANGELOG.md ├── CODE_OF_CONDUCT.md ├── CONTRIBUTING.md ├── dev │ ├── content │ │ ├── new.ipynb │ │ ├── notebook.ipynb │ │ └── README.md │ └── README.md ├── Dockerfile ├── docs │ ├── .gitignore │ ├── .yarnrc.yml │ ├── babel.config.js │ ├── docs │ │ ├── _category_.yaml │ │ ├── clients │ │ │ ├── _category_.yaml │ │ │ ├── claude_desktop │ │ │ │ ├── _category_.yaml │ │ │ │ └── index.mdx │ │ │ ├── cline │ │ │ │ ├── _category_.yaml │ │ │ │ └── index.mdx │ │ │ ├── cursor │ │ │ │ ├── _category_.yaml │ │ │ │ └── index.mdx │ │ │ ├── index.mdx │ │ │ ├── vscode │ │ │ │ ├── _category_.yaml │ │ │ │ └── index.mdx │ │ │ └── windsurf │ │ │ ├── _category_.yaml │ │ │ └── index.mdx │ │ ├── configure │ │ │ ├── _category_.yaml │ │ │ └── index.mdx │ │ ├── contribute │ │ │ ├── _category_.yaml │ │ │ └── index.mdx │ │ ├── deployment │ │ │ ├── _category_.yaml │ │ │ ├── datalayer │ │ │ │ ├── _category_.yaml │ │ │ │ └── streamable-http │ │ │ │ └── index.mdx │ │ │ ├── index.mdx │ │ │ └── jupyter │ │ │ ├── _category_.yaml │ │ │ ├── index.mdx │ │ │ ├── stdio │ │ │ │ ├── _category_.yaml │ │ │ │ └── index.mdx │ │ │ └── streamable-http │ │ │ ├── _category_.yaml │ │ │ ├── jupyter-extension │ │ │ │ └── index.mdx │ │ │ └── standalone │ │ │ └── index.mdx │ │ ├── index.mdx │ │ ├── releases │ │ │ ├── _category_.yaml │ │ │ └── index.mdx │ │ ├── resources │ │ │ ├── _category_.yaml │ │ │ └── index.mdx │ │ └── tools │ │ ├── _category_.yaml │ │ └── index.mdx │ ├── docusaurus.config.js │ ├── LICENSE │ ├── Makefile │ ├── package.json │ ├── README.md │ ├── sidebars.js │ ├── src │ │ ├── components │ │ │ ├── HomepageFeatures.js │ │ │ ├── HomepageFeatures.module.css │ │ │ ├── HomepageProducts.js │ │ │ └── HomepageProducts.module.css │ │ ├── css │ │ │ └── custom.css │ │ ├── pages │ │ │ ├── index.module.css │ │ │ ├── markdown-page.md │ │ │ └── testimonials.tsx │ │ └── theme │ │ └── CustomDocItem.tsx │ └── static │ └── img │ ├── datalayer │ │ ├── logo.png │ │ └── logo.svg │ ├── favicon.ico │ ├── feature_1.svg │ ├── feature_2.svg │ ├── feature_3.svg │ ├── product_1.svg │ ├── product_2.svg │ └── product_3.svg ├── examples │ └── integration_example.py ├── jupyter_mcp_server │ ├── __init__.py │ ├── __main__.py │ ├── __version__.py │ ├── config.py │ ├── enroll.py │ ├── env.py │ ├── jupyter_extension │ │ ├── __init__.py │ │ ├── backends │ │ │ ├── __init__.py │ │ │ ├── base.py │ │ │ ├── local_backend.py │ │ │ └── remote_backend.py │ │ ├── context.py │ │ ├── extension.py │ │ ├── handlers.py │ │ └── protocol │ │ ├── __init__.py │ │ └── messages.py │ ├── models.py │ ├── notebook_manager.py │ ├── server_modes.py │ ├── server.py │ ├── tools │ │ ├── __init__.py │ │ ├── _base.py │ │ ├── _registry.py │ │ ├── assign_kernel_to_notebook_tool.py │ │ ├── delete_cell_tool.py │ │ ├── execute_cell_tool.py │ │ ├── execute_ipython_tool.py │ │ ├── insert_cell_tool.py │ │ ├── insert_execute_code_cell_tool.py │ │ ├── list_cells_tool.py │ │ ├── list_files_tool.py │ │ ├── list_kernels_tool.py │ │ ├── list_notebooks_tool.py │ │ ├── overwrite_cell_source_tool.py │ │ ├── read_cell_tool.py │ │ ├── read_cells_tool.py │ │ ├── restart_notebook_tool.py │ │ ├── unuse_notebook_tool.py │ │ └── use_notebook_tool.py │ └── utils.py ├── jupyter-config │ ├── jupyter_notebook_config │ │ └── jupyter_mcp_server.json │ └── jupyter_server_config.d │ └── jupyter_mcp_server.json ├── LICENSE ├── Makefile ├── pyproject.toml ├── pytest.ini ├── README.md ├── RELEASE.md ├── smithery.yaml └── tests ├── __init__.py ├── conftest.py ├── test_common.py ├── test_config.py ├── test_jupyter_extension.py ├── test_list_kernels.py ├── test_tools.py └── test_use_notebook.py ``` # Files -------------------------------------------------------------------------------- /jupyter_mcp_server/tools/list_cells_tool.py: -------------------------------------------------------------------------------- ```python # Copyright (c) 2023-2024 Datalayer, Inc. # # BSD 3-Clause License """List cells tool implementation.""" from typing import Any, Optional from jupyter_server_api import JupyterServerClient from jupyter_mcp_server.tools._base import BaseTool, ServerMode from jupyter_mcp_server.notebook_manager import NotebookManager from jupyter_mcp_server.config import get_config from jupyter_nbmodel_client import NbModelClient from jupyter_mcp_server.utils import normalize_cell_source, format_TSV class ListCellsTool(BaseTool): """Tool to list basic information of all cells.""" @property def name(self) -> str: return "list_cells" @property def description(self) -> str: return """List the basic information of all cells in the notebook. Returns a formatted table showing the index, type, execution count (for code cells), and first line of each cell. This provides a quick overview of the notebook structure and is useful for locating specific cells for operations like delete or insert. Returns: str: Formatted table with cell information (Index, Type, Count, First Line)""" async def _list_cells_local(self, contents_manager: Any, path: str) -> str: """List cells using local contents_manager (JUPYTER_SERVER mode).""" # Read the notebook file directly model = await contents_manager.get(path, content=True, type='notebook') if 'content' not in model: raise ValueError(f"Could not read notebook content from {path}") notebook_content = model['content'] cells = notebook_content.get('cells', []) # Format the cells into a table headers = ["Index", "Type", "Count", "First Line"] rows = [] for idx, cell in enumerate(cells): cell_type = cell.get('cell_type', 'unknown') execution_count = cell.get('execution_count', '-') if cell_type == 'code' else '-' # Get the first line of source source = cell.get('source', '') if isinstance(source, list): first_line = source[0] if source else '' lines = len(source) else: first_line = source.split('\n')[0] if source else '' lines = len(source.split('\n')) if lines > 1: first_line += f"...({lines - 1} lines hidden)" rows.append([idx, cell_type, execution_count, first_line]) return format_TSV(headers, rows) def _list_cells_websocket(self, notebook: NbModelClient) -> str: """List cells using WebSocket connection (MCP_SERVER mode).""" total_cells = len(notebook) if total_cells == 0: return "Notebook is empty, no cells found." # Create header headers = ["Index", "Type", "Count", "First Line"] rows = [] # Process each cell for i in range(total_cells): cell_data = notebook[i] cell_type = cell_data.get("cell_type", "unknown") # Get execution count for code cells execution_count = (cell_data.get("execution_count") or "None") if cell_type == "code" else "N/A" # Get first line of source source_lines = normalize_cell_source(cell_data.get("source", "")) first_line = source_lines[0] if source_lines else "" if len(source_lines) > 1: first_line += f"...({len(source_lines) - 1} lines hidden)" # Add to table rows.append([i, cell_type, execution_count, first_line]) return format_TSV(headers, rows) async def execute( self, mode: ServerMode, server_client: Optional[JupyterServerClient] = None, kernel_client: Optional[Any] = None, contents_manager: Optional[Any] = None, kernel_manager: Optional[Any] = None, kernel_spec_manager: Optional[Any] = None, notebook_manager: Optional[NotebookManager] = None, **kwargs ) -> str: """Execute the list_cells tool. Args: mode: Server mode (MCP_SERVER or JUPYTER_SERVER) contents_manager: Direct API access for JUPYTER_SERVER mode notebook_manager: Notebook manager instance **kwargs: Additional parameters Returns: Formatted table with cell information """ if mode == ServerMode.JUPYTER_SERVER and contents_manager is not None: # Local mode: read notebook directly from file system from jupyter_mcp_server.jupyter_extension.context import get_server_context from pathlib import Path context = get_server_context() serverapp = context.serverapp # Get current notebook path from notebook_manager if available, else use config notebook_path = None if notebook_manager: notebook_path = notebook_manager.get_current_notebook_path() if not notebook_path: config = get_config() notebook_path = config.document_id # contents_manager expects path relative to serverapp.root_dir # If we have an absolute path, convert it to relative if serverapp and Path(notebook_path).is_absolute(): root_dir = Path(serverapp.root_dir) abs_path = Path(notebook_path) try: notebook_path = str(abs_path.relative_to(root_dir)) except ValueError: # Path is not under root_dir, use as-is pass return await self._list_cells_local(contents_manager, notebook_path) elif mode == ServerMode.MCP_SERVER and notebook_manager is not None: # Remote mode: use WebSocket connection to Y.js document async with notebook_manager.get_current_connection() as notebook: return self._list_cells_websocket(notebook) else: raise ValueError(f"Invalid mode or missing required clients: mode={mode}") ``` -------------------------------------------------------------------------------- /jupyter_mcp_server/tools/assign_kernel_to_notebook_tool.py: -------------------------------------------------------------------------------- ```python # Copyright (c) 2023-2024 Datalayer, Inc. # # BSD 3-Clause License """Assign kernel to notebook tool implementation.""" from typing import Any, Optional from jupyter_server_api import JupyterServerClient, NotFoundError from jupyter_mcp_server.tools._base import BaseTool, ServerMode class AssignKernelToNotebookTool(BaseTool): """Tool to assign a kernel to a notebook by creating a Jupyter session.""" @property def name(self) -> str: return "assign_kernel_to_notebook" @property def description(self) -> str: return """Assign a kernel to a notebook by creating a Jupyter session. This creates a Jupyter server session that connects a notebook file to a kernel, enabling code execution in the notebook. Sessions are the mechanism Jupyter uses to maintain the relationship between notebooks and their kernels. Args: notebook_path: Path to the notebook file, relative to the Jupyter server root (e.g. "notebook.ipynb") kernel_id: ID of the kernel to assign to the notebook session_name: Optional name for the session (defaults to notebook path) Returns: str: Success message with session information including session ID""" async def execute( self, mode: ServerMode, server_client: Optional[JupyterServerClient] = None, contents_manager: Optional[Any] = None, session_manager: Optional[Any] = None, kernel_manager: Optional[Any] = None, # Tool-specific parameters notebook_path: str = None, kernel_id: str = None, session_name: Optional[str] = None, **kwargs ) -> str: """Execute the assign_kernel_to_notebook tool. Args: mode: Server mode (MCP_SERVER or JUPYTER_SERVER) server_client: HTTP client for MCP_SERVER mode contents_manager: Direct API access for JUPYTER_SERVER mode session_manager: Session manager for JUPYTER_SERVER mode kernel_manager: Kernel manager for validation notebook_path: Path to the notebook file kernel_id: ID of the kernel to assign session_name: Optional session name **kwargs: Additional parameters Returns: Success message with session information """ if not notebook_path: return "Error: notebook_path is required" if not kernel_id: return "Error: kernel_id is required" # Use notebook_path as session name if not provided if not session_name: session_name = notebook_path # Verify notebook exists try: if mode == ServerMode.MCP_SERVER and server_client is not None: # Check notebook exists using HTTP API try: # FIXED: contents.get_file -> contents.get server_client.contents.get(notebook_path) except NotFoundError: return f"Error: Notebook '{notebook_path}' not found on Jupyter server" elif mode == ServerMode.JUPYTER_SERVER and contents_manager is not None: # Check notebook exists using local API try: await contents_manager.get(notebook_path, content=False) except Exception as e: return f"Error: Notebook '{notebook_path}' not found: {e}" else: return f"Error: Invalid mode or missing required clients: mode={mode}" except Exception as e: return f"Error checking notebook: {e}" # Verify kernel exists try: if mode == ServerMode.MCP_SERVER and server_client is not None: # Check kernel exists using HTTP API kernels = server_client.kernels.list_kernels() kernel_exists = any(kernel.id == kernel_id for kernel in kernels) if not kernel_exists: return f"Error: Kernel '{kernel_id}' not found on Jupyter server" elif mode == ServerMode.JUPYTER_SERVER and kernel_manager is not None: # Check kernel exists using local API if kernel_id not in kernel_manager: return f"Error: Kernel '{kernel_id}' not found in local kernel manager" else: return f"Error: Invalid mode or missing kernel manager: mode={mode}" except Exception as e: return f"Error checking kernel: {e}" # Create the session try: if mode == ServerMode.MCP_SERVER and server_client is not None: # Create session using HTTP API session = server_client.sessions.create_session( path=notebook_path, kernel={"id": kernel_id}, session_type="notebook", name=session_name ) return ( f"Successfully created session '{session.id}' for notebook '{notebook_path}' " f"with kernel '{kernel_id}'. The notebook is now connected to the kernel." ) elif mode == ServerMode.JUPYTER_SERVER and session_manager is not None: # Create session using local API # The session_manager API varies, but typically follows similar pattern import asyncio # Create session dict with required parameters session_dict = await asyncio.to_thread( session_manager.create_session, path=notebook_path, kernel_id=kernel_id, type="notebook", name=session_name ) session_id = session_dict.get("id", "unknown") return ( f"Successfully created session '{session_id}' for notebook '{notebook_path}' " f"with kernel '{kernel_id}'. The notebook is now connected to the kernel." ) else: return f"Error: Invalid mode or missing session manager: mode={mode}" except Exception as e: return f"Error creating session: {e}" ``` -------------------------------------------------------------------------------- /jupyter_mcp_server/jupyter_extension/protocol/messages.py: -------------------------------------------------------------------------------- ```python # Copyright (c) 2023-2024 Datalayer, Inc. # # BSD 3-Clause License """ MCP Protocol Messages Pydantic models for MCP protocol requests and responses to ensure consistent API across both MCP_SERVER and JUPYTER_SERVER modes. """ from typing import Any, Optional, Union, Literal from pydantic import BaseModel, Field from mcp.types import ImageContent # Tool execution models class ToolRequest(BaseModel): """Request to execute a tool""" tool_name: str = Field(..., description="Name of the tool to execute") arguments: dict[str, Any] = Field(default_factory=dict, description="Tool arguments") context: Optional[dict[str, Any]] = Field(None, description="Execution context") class ToolResponse(BaseModel): """Response from tool execution""" success: bool = Field(..., description="Whether execution was successful") result: Any = Field(None, description="Tool execution result") error: Optional[str] = Field(None, description="Error message if execution failed") # Notebook operation models class NotebookContentRequest(BaseModel): """Request to retrieve notebook content""" path: str = Field(..., description="Path to the notebook file") include_outputs: bool = Field(True, description="Include cell outputs") class NotebookContentResponse(BaseModel): """Response containing notebook content""" path: str = Field(..., description="Notebook path") cells: list[dict[str, Any]] = Field(..., description="List of cells") metadata: dict[str, Any] = Field(default_factory=dict, description="Notebook metadata") class NotebookListRequest(BaseModel): """Request to list notebooks""" path: Optional[str] = Field("", description="Directory path to search") recursive: bool = Field(True, description="Search recursively") class NotebookListResponse(BaseModel): """Response containing list of notebooks""" notebooks: list[str] = Field(..., description="List of notebook paths") # Cell operation models class ReadCellsRequest(BaseModel): """Request to read cells from a notebook""" path: Optional[str] = Field(None, description="Notebook path (uses current if not specified)") start_index: Optional[int] = Field(None, description="Start cell index") end_index: Optional[int] = Field(None, description="End cell index") class ReadCellsResponse(BaseModel): """Response containing cell information""" cells: list[dict[str, Any]] = Field(..., description="List of cell information") class AppendCellRequest(BaseModel): """Request to append a cell""" path: Optional[str] = Field(None, description="Notebook path") cell_type: Literal["code", "markdown"] = Field(..., description="Cell type") source: Union[str, list[str]] = Field(..., description="Cell source") class AppendCellResponse(BaseModel): """Response after appending a cell""" cell_index: int = Field(..., description="Index of the appended cell") message: str = Field(..., description="Success message") class InsertCellRequest(BaseModel): """Request to insert a cell""" path: Optional[str] = Field(None, description="Notebook path") cell_index: int = Field(..., description="Index where to insert") cell_type: Literal["code", "markdown"] = Field(..., description="Cell type") source: Union[str, list[str]] = Field(..., description="Cell source") class InsertCellResponse(BaseModel): """Response after inserting a cell""" cell_index: int = Field(..., description="Index of the inserted cell") message: str = Field(..., description="Success message") class DeleteCellRequest(BaseModel): """Request to delete a cell""" path: Optional[str] = Field(None, description="Notebook path") cell_index: int = Field(..., description="Index of cell to delete") class DeleteCellResponse(BaseModel): """Response after deleting a cell""" message: str = Field(..., description="Success message") class OverwriteCellRequest(BaseModel): """Request to overwrite a cell""" path: Optional[str] = Field(None, description="Notebook path") cell_index: int = Field(..., description="Index of cell to overwrite") new_source: Union[str, list[str]] = Field(..., description="New cell source") class OverwriteCellResponse(BaseModel): """Response after overwriting a cell""" message: str = Field(..., description="Success message with diff") # Cell execution models class ExecuteCellRequest(BaseModel): """Request to execute a cell""" path: Optional[str] = Field(None, description="Notebook path") cell_index: int = Field(..., description="Index of cell to execute") timeout_seconds: int = Field(300, description="Execution timeout in seconds") class ExecuteCellResponse(BaseModel): """Response after executing a cell""" cell_index: int = Field(..., description="Executed cell index") outputs: list[Union[str, ImageContent]] = Field(..., description="Cell outputs") execution_count: Optional[int] = Field(None, description="Execution count") status: Literal["success", "error", "timeout"] = Field(..., description="Execution status") # Kernel operation models class ConnectNotebookRequest(BaseModel): """Request to connect to a notebook""" notebook_name: str = Field(..., description="Unique notebook identifier") notebook_path: str = Field(..., description="Path to notebook file") mode: Literal["connect", "create"] = Field("connect", description="Connection mode") kernel_id: Optional[str] = Field(None, description="Specific kernel ID") class ConnectNotebookResponse(BaseModel): """Response after connecting to notebook""" message: str = Field(..., description="Success message") notebook_name: str = Field(..., description="Notebook identifier") notebook_path: str = Field(..., description="Notebook path") class UnuseNotebookRequest(BaseModel): """Request to unuse from a notebook""" notebook_name: str = Field(..., description="Notebook identifier to disconnect") class UnuseNotebookResponse(BaseModel): """Response after disconnecting""" message: str = Field(..., description="Success message") class RestartNotebookRequest(BaseModel): """Request to restart a notebook kernel""" notebook_name: str = Field(..., description="Notebook identifier to restart") class RestartNotebookResponse(BaseModel): """Response after restarting kernel""" message: str = Field(..., description="Success message") notebook_name: str = Field(..., description="Notebook identifier") ``` -------------------------------------------------------------------------------- /docs/static/img/feature_1.svg: -------------------------------------------------------------------------------- ``` <?xml version="1.0" encoding="UTF-8" standalone="no"?> <!-- ~ Copyright (c) 2023-2024 Datalayer, Inc. ~ ~ BSD 3-Clause License --> <svg xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:cc="http://creativecommons.org/ns#" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:svg="http://www.w3.org/2000/svg" xmlns="http://www.w3.org/2000/svg" xmlns:sodipodi="http://sodipodi.sourceforge.net/DTD/sodipodi-0.dtd" xmlns:inkscape="http://www.inkscape.org/namespaces/inkscape" viewBox="0 0 143.86 320.16998" version="1.1" id="svg1038" sodipodi:docname="feature_1.svg" inkscape:version="1.0.1 (c497b03c, 2020-09-10)" width="143.86" height="320.16998"> <metadata id="metadata1042"> <rdf:RDF> <cc:Work rdf:about=""> <dc:format>image/svg+xml</dc:format> <dc:type rdf:resource="http://purl.org/dc/dcmitype/StillImage" /> <dc:title>Startup_SVG</dc:title> </cc:Work> </rdf:RDF> </metadata> <sodipodi:namedview pagecolor="#ffffff" bordercolor="#666666" borderopacity="1" objecttolerance="10" gridtolerance="10" guidetolerance="10" inkscape:pageopacity="0" inkscape:pageshadow="2" inkscape:window-width="1440" inkscape:window-height="717" id="namedview1040" showgrid="false" inkscape:zoom="1.0226025" inkscape:cx="117.68707" inkscape:cy="153.04271" inkscape:window-x="0" inkscape:window-y="25" inkscape:window-maximized="0" inkscape:current-layer="svg1038" inkscape:document-rotation="0" fit-margin-top="0" fit-margin-left="0" fit-margin-right="0" fit-margin-bottom="0" /> <defs id="defs835"> <style id="style833">.cls-1,.cls-11,.cls-9{fill:#d6d8e5;}.cls-1{opacity:0.15;}.cls-2,.cls-3{fill:#edeff9;}.cls-2{opacity:0.5;}.cls-4{fill:#ffbc00;}.cls-5{fill:#8c50ff;}.cls-6{fill:#424956;}.cls-7{fill:#494949;}.cls-8{fill:#2b303f;}.cls-9{opacity:0.4;}.cls-10{fill:#b1b4c4;}.cls-12{fill:#9ea1af;}.cls-13{fill:#c4c7d6;}.cls-14{fill:#e4e7f2;}.cls-15{fill:#fff;}.cls-16{fill:#e9eaf2;}.cls-17{fill:#f5f6ff;}.cls-18,.cls-19{fill:none;stroke:#e9eaf2;stroke-miterlimit:10;}.cls-18{stroke-width:0.31px;}.cls-19{stroke-width:1.53px;}.cls-20{fill:#edf0f9;}.cls-21{fill:#e2e5f2;}.cls-22{fill:#6e48e5;}.cls-23{fill:#5e42d3;}.cls-24{fill:#ffcea9;}.cls-25{fill:#ededed;}.cls-26{fill:#38226d;}.cls-27{fill:#9c73ff;}.cls-28{fill:#f4f4f4;}.cls-29{fill:#3a2c6d;}.cls-30{isolation:isolate;}</style> </defs> <title id="title837">Startup_SVG</title> <ellipse class="cls-1" cx="71.93" cy="278.63998" rx="71.93" ry="41.529999" id="ellipse839" /> <ellipse class="cls-2" cx="71.099998" cy="274.19998" rx="40.119999" ry="23.16" id="ellipse841" /> <ellipse class="cls-3" cx="70.719994" cy="265.97998" rx="11.44" ry="6.6100001" id="ellipse843" /> <rect class="cls-3" x="60.099998" y="178.34999" width="22" height="86.459999" id="rect845" /> <path class="cls-4" d="m 59.05,177.37 c 0,0 0.6,17.78 13.25,49.67 0,0 11.78,-25.68 13,-50.21 1.22,-24.53 -26.25,0.54 -26.25,0.54 z" id="path847" /> <polygon class="cls-5" points="10.85,219.48 46.25,201.73 52.69,156.37 43.6,153.81 14.33,182.29 " id="polygon849" transform="translate(-4.1748046e-7,-43.38)" /> <polygon class="cls-5" points="133.75,219.48 98.22,200.5 91.91,156.37 101,153.81 130.26,182.29 " id="polygon851" transform="translate(-4.1748046e-7,-43.38)" /> <ellipse class="cls-6" cx="71.93" cy="173.46001" rx="14.55" ry="8.3999996" id="ellipse853" /> <polygon class="cls-7" points="93.94,208.72 49.87,208.44 49.87,200.88 93.94,200.88 " id="polygon855" transform="translate(-4.1748046e-7,-43.38)" /> <ellipse class="cls-8" cx="71.93" cy="165.05" rx="22.059999" ry="12.74" id="ellipse857" /> <ellipse class="cls-3" cx="71.859993" cy="153.58002" rx="26.92" ry="15.54" id="ellipse859" /> <path class="cls-3" d="m 100.93,125.9 v 0 c 0,1.25 0,2.48 0,3.67 0,1 0,2 0,3 0,1.35 -0.07,2.64 -0.12,3.9 0,1.57 -0.11,3.06 -0.19,4.49 -0.08,1.43 -0.15,2.83 -0.25,4.12 -0.41,5.65 -0.4,9.81 -1.1,11.44 -3.81,8.91 -16.7,4 -23.06,0.8 -2.35,-1.18 -3.9,-2.15 -3.9,-2.15 h -0.58 c 0,0 -1.84,1 -4.56,2.08 -6.67,2.7 -18.56,7.49 -21.86,-0.76 -0.59,-1.48 -1.18,-6.37 -1.54,-11.49 -0.09,-1.28 -0.17,-2.65 -0.25,-4.11 -0.08,-1.46 -0.12,-2.91 -0.18,-4.48 0,-1.25 -0.07,-2.54 -0.1,-3.88 0,-1 0,-2 0,-3 v 0 -3.67 0 C 43.06,90.969997 46.76,35.179997 58.98,11.369997 c 3.53,-6.89 7.77,-11.1 12.84,-11.349997 v 0 h 0.58 v 0 c 5,0.259997 9.29,4.449997 12.83,11.309997 12.05,23.72 15.92,79.29 15.7,114.570003 z" id="path861" /> <circle class="cls-8" cx="71.999992" cy="82.829994" r="10.17" id="circle863" /> <path class="cls-6" d="m 72,94.489997 a 11.66,11.66 0 1 1 11.66,-11.66 11.67,11.67 0 0 1 -11.66,11.66 z m 0,-20.34 a 8.68,8.68 0 1 0 8.67,8.68 8.69,8.69 0 0 0 -8.67,-8.68 z" id="path865" /> <circle class="cls-8" cx="71.999992" cy="52.930004" r="10.17" id="circle867" /> <path class="cls-6" d="m 72,64.619997 a 11.67,11.67 0 1 1 11.66,-11.69 11.68,11.68 0 0 1 -11.66,11.69 z m 0,-20.34 a 8.68,8.68 0 1 0 8.67,8.67 8.68,8.68 0 0 0 -8.67,-8.69 z" id="path869" /> <path class="cls-5" d="m 100.93,146.01 c 0,1.25 0,2.48 0,3.68 -1,3.3 -3.72,6.46 -8.14,9 -11.46,6.61 -30,6.61 -41.51,0 -4.42,-2.56 -7.13,-5.73 -8.14,-9 v 0 -3.67 c 1,3.36 3.7,6.56 8.18,9.15 11.47,6.61 30.05,6.61 41.51,0 4.39,-2.62 7.17,-5.81 8.1,-9.16 z" id="path871" /> <path class="cls-5" d="m 100.84,151.83 c 0,1.34 -0.07,2.64 -0.12,3.89 -1.12,3.12 -3.78,6.09 -8,8.51 -11.46,6.62 -30,6.62 -41.51,0 -4.21,-2.44 -6.87,-5.43 -8,-8.57 0,-1.25 -0.07,-2.54 -0.1,-3.87 1,3.26 3.73,6.36 8.09,8.88 11.47,6.62 30.05,6.62 41.51,0 4.38,-2.51 7.08,-5.6 8.13,-8.84 z" id="path873" /> <path class="cls-5" d="m 100.53,141.5 c -1.21,2.93 -3.81,5.72 -7.78,8 -11.46,6.61 -30,6.61 -41.51,0 -4,-2.32 -6.6,-5.12 -7.81,-8.08 0.08,1.46 0.16,2.83 0.25,4.11 1.29,2.76 3.81,5.36 7.56,7.53 a 39.77,39.77 0 0 0 15.84,4.72 50.54,50.54 0 0 0 9,0.07 40.14,40.14 0 0 0 16.63,-4.79 c 3.72,-2.15 6.23,-4.72 7.53,-7.45 z" id="path875" /> <path class="cls-5" d="m 85.05,11.349997 a 44.73,44.73 0 0 1 -26.25,0 C 62.33,4.459997 66.57,0.249997 71.64,0 v 0 h 0.58 v 0 c 5.05,0.299997 9.29,4.489997 12.83,11.349997 z" id="path877" /> <rect class="cls-5" x="69.799995" y="110.51" width="4.9899998" height="65.510002" id="rect879" /> </svg> ``` -------------------------------------------------------------------------------- /docs/docs/tools/index.mdx: -------------------------------------------------------------------------------- ```markdown # Tools The server currently offers 16 tools organized into 3 categories: ## Server Management Tools (3 tools) #### 1. `list_files` - List all files and directories in the Jupyter server's file system. - This tool recursively lists files and directories from the Jupyter server's content API, showing the complete file structure including notebooks, data files, scripts, and directories. - Input: - `path`(string, optional): The starting path to list from (empty string means root directory) - `max_depth`(int, optional): Maximum depth to recurse into subdirectories (default: 3) - Returns: Tab-separated table with columns: Path, Type, Size, Last_Modified - **Path**: Full path to the file or directory - **Type**: File type ("file", "directory", "notebook", or "error" if inaccessible) - **Size**: File size formatted as B, KB, or MB (empty for directories) - **Last_Modified**: Last modification timestamp in YYYY-MM-DD HH:MM:SS format #### 2. `list_kernels` - List all available kernels in the Jupyter server. - This tool shows all running and available kernel sessions on the Jupyter server, including their IDs, names, states, connection information, and kernel specifications. Useful for monitoring kernel resources and identifying specific kernels for connection. - Input: None - Returns: Tab-separated table with columns: ID, Name, Display_Name, Language, State, Connections, Last_Activity, Environment - **ID**: Unique kernel identifier - **Name**: Kernel name/type (e.g., "python3", "ir", etc.) - **Display_Name**: Human-readable kernel name from kernel spec - **Language**: Programming language supported by the kernel - **State**: Current execution state ("idle", "busy", "unknown") - **Connections**: Number of active connections to this kernel - **Last_Activity**: Timestamp of last kernel activity in YYYY-MM-DD HH:MM:SS format - **Environment**: Environment variables defined in the kernel spec (truncated if long) #### 3. `assign_kernel_to_notebook` - Assign a kernel to a notebook by creating a Jupyter session. - This creates a Jupyter server session that connects a notebook file to a kernel, enabling code execution in the notebook. Sessions are the mechanism Jupyter uses to maintain the relationship between notebooks and their kernels. - Input: - `notebook_path`(string): Path to the notebook file, relative to the Jupyter server root (e.g. "notebook.ipynb") - `kernel_id`(string): ID of the kernel to assign to the notebook - `session_name`(string, optional): Optional name for the session (defaults to notebook path) - Returns: Success message with session information including session ID ## Multi-Notebook Management Tools (4 tools) #### 4. `use_notebook` - Connect to a notebook file or create a new one. - Input: - `notebook_name`(string): Unique identifier for the notebook - `notebook_path`(string, optional): Path to the notebook file, relative to the Jupyter server root (e.g. "notebook.ipynb"). If not provided, switches to an already-connected notebook with the given name. - `mode`(string): "connect" to connect to existing, "create" to create new (default: "connect") - `kernel_id`(string, optional): Specific kernel ID to use (optional, will create new if not provided) - Returns: Success message with notebook information #### 5. `list_notebooks` - List all notebooks in the Jupyter server (including subdirectories) and show which ones are managed by the notebook manager. To interact with a notebook, it has to be "managed". If a notebook is not managed, you can connect to it using the `use_notebook` tool. - Input: None - Returns: TSV formatted table with notebook information (Path, Managed, Name, Status, Current) - **Path**: Relative path to the notebook file in the Jupyter server - **Managed**: "Yes" if the notebook is currently managed by the MCP server, "No" otherwise - **Name**: Unique identifier for managed notebooks, "-" for unmanaged notebooks - **Status**: Kernel status for managed notebooks ("alive", "dead", etc.), "-" for unmanaged notebooks - **Current**: "✓" if this is the currently active managed notebook, empty otherwise #### 6. `restart_notebook` - Restart the kernel for a specific notebook. - Input: - `notebook_name`(string): Notebook identifier to restart - Returns: Success message #### 7. `unuse_notebook` - Unuse from a specific notebook and release its resources. - Input: - `notebook_name`(string): Notebook identifier to disconnect - Returns: Success message ## Cell Tools (9 tools) #### 8. `insert_cell` - Insert a cell to specified position with unified API. - Input: - `cell_index`(int): Target index for insertion (0-based). Use -1 to append at end. - `cell_type`(string): Type of cell to insert ("code" or "markdown"). - `cell_source`(string): Source content for the cell. - Returns: Success message and the structure of its surrounding cells (up to 5 cells above and 5 cells below). #### 9. `insert_execute_code_cell` - Insert and execute a code cell in a Jupyter notebook. - Input: - `cell_index`(int): Index of the cell to insert (0-based). Use -1 to append at end and execute. - `cell_source`(string): Code source. - Returns: List of outputs from the executed cell (supports multimodal output including images). #### 10. `delete_cell` - Delete a specific cell from the notebook. - Input: - `cell_index`(int): Index of the cell to delete (0-based). - Returns: Success message. #### 11. `read_cell` - Read a specific cell from the notebook. - Input: - `cell_index`(int): Index of the cell to read (0-based). - Returns: Dictionary with cell index, type, source, and outputs (for code cells). #### 12. `read_cells` - Read all cells from the notebook. - Returns: List of cell information including index, type, source, and outputs (for code cells). #### 13. `list_cells` - List the basic information of all cells in the notebook. - Returns a formatted table showing the index, type, execution count (for code cells), and first line of each cell. - Provides a quick overview of the notebook structure and is useful for locating specific cells for operations. - Input: None - Returns: Formatted table string with cell information (Index, Type, Count, First Line). #### 14. `overwrite_cell_source` - Overwrite the source of an existing cell. - Input: - `cell_index`(int): Index of the cell to overwrite (0-based). - `cell_source`(string): New cell source - must match existing cell type. - Returns: Success message and diff style. #### 15. `execute_cell` - Execute a cell with configurable timeout and optional streaming progress updates. - Input: - `cell_index`: Index of the cell to execute (0-based) - `timeout_seconds`: Maximum time to wait for execution (default: 300s) - `stream`: Enable streaming progress updates for long-running cells (default: False) - `progress_interval`: Seconds between progress updates when stream=True (default: 5s) - Returns: - `list[Union[str, ImageContent]]`: List of outputs from the executed cell (supports multimodal output including images) - Use `stream=False` for short-running cells (more reliable) - Use `stream=True` for long-running cells (provides real-time feedback) #### 16. `execute_ipython` - Execute IPython code directly in the kernel on the current active notebook. - This powerful tool supports: 1. Magic commands (e.g., %timeit, %who, %load, %run, %matplotlib) 2. Shell commands (e.g., !pip install, !ls, !cat) 3. Python code (e.g., print(df.head()), df.info()) - Use cases: - Performance profiling and debugging - Environment exploration and package management - Variable inspection and data analysis - File system operations on Jupyter server - Temporary calculations and quick tests - Input: - `code`(string): IPython code to execute (supports magic commands, shell commands with !, and Python code) - `timeout`(int): Execution timeout in seconds (default: 60s) - Returns: - `list[Union[str, ImageContent]]`: List of outputs from the executed code (supports multimodal output including images) ``` -------------------------------------------------------------------------------- /jupyter_mcp_server/tools/execute_ipython_tool.py: -------------------------------------------------------------------------------- ```python # Copyright (c) 2023-2024 Datalayer, Inc. # # BSD 3-Clause License """Execute IPython code directly in kernel tool.""" import asyncio import logging from typing import Union from mcp.types import ImageContent from jupyter_mcp_server.tools._base import BaseTool, ServerMode from jupyter_mcp_server.notebook_manager import NotebookManager logger = logging.getLogger(__name__) class ExecuteIpythonTool(BaseTool): """Execute IPython code directly in the kernel on the current active notebook. This powerful tool supports: 1. Magic commands (e.g., %timeit, %who, %load, %run, %matplotlib) 2. Shell commands (e.g., !pip install, !ls, !cat) 3. Python code (e.g., print(df.head()), df.info()) Use cases: - Performance profiling and debugging - Environment exploration and package management - Variable inspection and data analysis - File system operations on Jupyter server - Temporary calculations and quick tests """ @property def name(self) -> str: return "execute_ipython" @property def description(self) -> str: return "Execute IPython code directly in the kernel (supports magic commands, shell commands, and Python code)" async def _execute_via_kernel_manager( self, kernel_manager, kernel_id: str, code: str, timeout: int, safe_extract_outputs_fn ) -> list[Union[str, ImageContent]]: """Execute code using kernel_manager (JUPYTER_SERVER mode). Uses execute_code_local which handles ZMQ message collection properly. """ from jupyter_mcp_server.utils import execute_code_local # Get serverapp from kernel_manager serverapp = kernel_manager.parent # Use centralized execute_code_local function return await execute_code_local( serverapp=serverapp, notebook_path="", # Not needed for execute_ipython code=code, kernel_id=kernel_id, timeout=timeout, logger=logger ) async def _execute_via_notebook_manager( self, notebook_manager: NotebookManager, code: str, timeout: int, ensure_kernel_alive_fn, wait_for_kernel_idle_fn, safe_extract_outputs_fn ) -> list[Union[str, ImageContent]]: """Execute code using notebook_manager (MCP_SERVER mode - original logic).""" # Get current notebook name and kernel current_notebook = notebook_manager.get_current_notebook() or "default" kernel = notebook_manager.get_kernel(current_notebook) if not kernel: # Ensure kernel is alive kernel = ensure_kernel_alive_fn() # Wait for kernel to be idle before executing await wait_for_kernel_idle_fn(kernel, max_wait_seconds=30) logger.info(f"Executing IPython code (MCP_SERVER) with timeout {timeout}s: {code[:100]}...") try: # Execute code directly with kernel execution_task = asyncio.create_task( asyncio.to_thread(kernel.execute, code) ) # Wait for execution with timeout try: outputs = await asyncio.wait_for(execution_task, timeout=timeout) except asyncio.TimeoutError: execution_task.cancel() try: if kernel and hasattr(kernel, 'interrupt'): kernel.interrupt() logger.info("Sent interrupt signal to kernel due to timeout") except Exception as interrupt_err: logger.error(f"Failed to interrupt kernel: {interrupt_err}") return [f"[TIMEOUT ERROR: IPython execution exceeded {timeout} seconds and was interrupted]"] # Process and extract outputs if outputs: result = safe_extract_outputs_fn(outputs['outputs']) logger.info(f"IPython execution completed successfully with {len(result)} outputs") return result else: return ["[No output generated]"] except Exception as e: logger.error(f"Error executing IPython code: {e}") return [f"[ERROR: {str(e)}]"] async def execute( self, mode: ServerMode, server_client=None, contents_manager=None, kernel_manager=None, kernel_spec_manager=None, notebook_manager=None, # Tool-specific parameters code: str = None, timeout: int = 60, kernel_id: str = None, ensure_kernel_alive_fn=None, wait_for_kernel_idle_fn=None, safe_extract_outputs_fn=None, **kwargs ) -> list[Union[str, ImageContent]]: """Execute IPython code directly in the kernel. Args: mode: Server mode (MCP_SERVER or JUPYTER_SERVER) server_client: JupyterServerClient (not used) contents_manager: Contents manager (not used) kernel_manager: Kernel manager (for JUPYTER_SERVER mode) kernel_spec_manager: Kernel spec manager (not used) notebook_manager: Notebook manager (for MCP_SERVER mode) code: IPython code to execute (supports magic commands, shell commands with !, and Python code) timeout: Execution timeout in seconds (default: 60s) kernel_id: Kernel ID (for JUPYTER_SERVER mode) ensure_kernel_alive_fn: Function to ensure kernel is alive (for MCP_SERVER mode) wait_for_kernel_idle_fn: Function to wait for kernel idle state (for MCP_SERVER mode) safe_extract_outputs_fn: Function to safely extract outputs Returns: List of outputs from the executed code """ if safe_extract_outputs_fn is None: raise ValueError("safe_extract_outputs_fn is required") # JUPYTER_SERVER mode: Use kernel_manager directly if mode == ServerMode.JUPYTER_SERVER and kernel_manager is not None: if kernel_id is None: # Try to get kernel_id from context from jupyter_mcp_server.utils import get_current_notebook_context _, kernel_id = get_current_notebook_context(notebook_manager) if kernel_id is None: # No kernel available - start a new one on demand logger.info("No kernel_id available, starting new kernel for execute_ipython") kernel_id = await kernel_manager.start_kernel() # Store the kernel in notebook_manager if available if notebook_manager is not None: default_notebook = "default" kernel_info = {"id": kernel_id} notebook_manager.add_notebook( default_notebook, kernel_info, server_url="local", token=None, path="notebook.ipynb" # Placeholder path ) notebook_manager.set_current_notebook(default_notebook) logger.info(f"Executing IPython in JUPYTER_SERVER mode with kernel_id={kernel_id}") return await self._execute_via_kernel_manager( kernel_manager=kernel_manager, kernel_id=kernel_id, code=code, timeout=timeout, safe_extract_outputs_fn=safe_extract_outputs_fn ) # MCP_SERVER mode: Use notebook_manager (original behavior) elif mode == ServerMode.MCP_SERVER and notebook_manager is not None: if ensure_kernel_alive_fn is None: raise ValueError("ensure_kernel_alive_fn is required for MCP_SERVER mode") if wait_for_kernel_idle_fn is None: raise ValueError("wait_for_kernel_idle_fn is required for MCP_SERVER mode") logger.info("Executing IPython in MCP_SERVER mode") return await self._execute_via_notebook_manager( notebook_manager=notebook_manager, code=code, timeout=timeout, ensure_kernel_alive_fn=ensure_kernel_alive_fn, wait_for_kernel_idle_fn=wait_for_kernel_idle_fn, safe_extract_outputs_fn=safe_extract_outputs_fn ) else: return [f"[ERROR: Invalid mode or missing required managers]"] ``` -------------------------------------------------------------------------------- /jupyter_mcp_server/tools/delete_cell_tool.py: -------------------------------------------------------------------------------- ```python # Copyright (c) 2023-2024 Datalayer, Inc. # # BSD 3-Clause License """Delete cell tool implementation.""" from typing import Any, Optional from pathlib import Path import nbformat from jupyter_server_api import JupyterServerClient from jupyter_mcp_server.tools._base import BaseTool, ServerMode from jupyter_mcp_server.notebook_manager import NotebookManager from jupyter_mcp_server.utils import get_current_notebook_context class DeleteCellTool(BaseTool): """Tool to delete a specific cell from a notebook.""" @property def name(self) -> str: return "delete_cell" @property def description(self) -> str: return """Delete a specific cell from the Jupyter notebook. Args: cell_index: Index of the cell to delete (0-based) Returns: str: Success message""" async def _get_jupyter_ydoc(self, serverapp: Any, file_id: str): """Get the YNotebook document if it's currently open in a collaborative session. This follows the jupyter_ai_tools pattern of accessing YDoc through the yroom_manager when the notebook is actively being edited. Args: serverapp: The Jupyter ServerApp instance file_id: The file ID for the document Returns: YNotebook instance or None if not in a collaborative session """ try: yroom_manager = serverapp.web_app.settings.get("yroom_manager") if yroom_manager is None: return None room_id = f"json:notebook:{file_id}" if yroom_manager.has_room(room_id): yroom = yroom_manager.get_room(room_id) notebook = await yroom.get_jupyter_ydoc() return notebook except Exception: # YDoc not available, will fall back to file operations pass return None def _get_cell_index_from_id(self, ydoc, cell_id: str) -> Optional[int]: """Find cell index by cell ID in YDoc.""" for i, ycell in enumerate(ydoc.ycells): if ycell.get("id") == cell_id: return i return None async def _delete_cell_ydoc( self, serverapp: Any, notebook_path: str, cell_index: int ) -> str: """Delete cell using YDoc (collaborative editing mode). Args: serverapp: Jupyter ServerApp instance notebook_path: Path to the notebook cell_index: Index of cell to delete Returns: Success message """ # Get file_id from file_id_manager file_id_manager = serverapp.web_app.settings.get("file_id_manager") if file_id_manager is None: raise RuntimeError("file_id_manager not available in serverapp") file_id = file_id_manager.get_id(notebook_path) # Try to get YDoc ydoc = await self._get_jupyter_ydoc(serverapp, file_id) if ydoc: # Notebook is open in collaborative mode, use YDoc if cell_index < 0 or cell_index >= len(ydoc.ycells): raise ValueError( f"Cell index {cell_index} is out of range. Notebook has {len(ydoc.ycells)} cells." ) cell_type = ydoc.ycells[cell_index].get("cell_type", "unknown") # Delete the cell from YDoc del ydoc.ycells[cell_index] return f"Cell {cell_index} ({cell_type}) deleted successfully." else: # YDoc not available, use file operations return await self._delete_cell_file(notebook_path, cell_index) async def _delete_cell_file( self, notebook_path: str, cell_index: int ) -> str: """Delete cell using file operations (non-collaborative mode). Args: notebook_path: Absolute path to the notebook cell_index: Index of cell to delete Returns: Success message """ # Read notebook file as version 4 for consistency with open(notebook_path, "r", encoding="utf-8") as f: notebook = nbformat.read(f, as_version=4) # Clean transient fields from outputs from jupyter_mcp_server.utils import _clean_notebook_outputs _clean_notebook_outputs(notebook) # Validate index if cell_index < 0 or cell_index >= len(notebook.cells): raise ValueError( f"Cell index {cell_index} is out of range. Notebook has {len(notebook.cells)} cells." ) cell_type = notebook.cells[cell_index].cell_type # Delete the cell notebook.cells.pop(cell_index) # Write back to file with open(notebook_path, "w", encoding="utf-8") as f: nbformat.write(notebook, f) return f"Cell {cell_index} ({cell_type}) deleted successfully." async def _delete_cell_websocket( self, notebook_manager: NotebookManager, cell_index: int ) -> str: """Delete cell using WebSocket connection (MCP_SERVER mode). Args: notebook_manager: Notebook manager instance cell_index: Index of cell to delete Returns: Success message """ async with notebook_manager.get_current_connection() as notebook: if cell_index < 0 or cell_index >= len(notebook): raise ValueError( f"Cell index {cell_index} is out of range. Notebook has {len(notebook)} cells." ) deleted_content = notebook.delete_cell(cell_index) return f"Cell {cell_index} ({deleted_content['cell_type']}) deleted successfully." async def execute( self, mode: ServerMode, server_client: Optional[JupyterServerClient] = None, kernel_client: Optional[Any] = None, contents_manager: Optional[Any] = None, kernel_manager: Optional[Any] = None, kernel_spec_manager: Optional[Any] = None, notebook_manager: Optional[NotebookManager] = None, # Tool-specific parameters cell_index: int = None, **kwargs ) -> str: """Execute the delete_cell tool. This tool supports three modes of operation: 1. JUPYTER_SERVER mode with YDoc (collaborative): - Checks if notebook is open in a collaborative session - Uses YDoc for real-time collaborative editing - Changes are immediately visible to all connected users 2. JUPYTER_SERVER mode without YDoc (file-based): - Falls back to direct file operations using nbformat - Suitable when notebook is not actively being edited 3. MCP_SERVER mode (WebSocket): - Uses WebSocket connection to remote Jupyter server - Accesses YDoc through NbModelClient Args: mode: Server mode (MCP_SERVER or JUPYTER_SERVER) server_client: HTTP client for MCP_SERVER mode contents_manager: Direct API access for JUPYTER_SERVER mode notebook_manager: Notebook manager instance cell_index: Index of the cell to delete (0-based) **kwargs: Additional parameters Returns: Success message """ if mode == ServerMode.JUPYTER_SERVER and contents_manager is not None: # JUPYTER_SERVER mode: Try YDoc first, fall back to file operations from jupyter_mcp_server.jupyter_extension.context import get_server_context context = get_server_context() serverapp = context.serverapp notebook_path, _ = get_current_notebook_context(notebook_manager) # Resolve to absolute path if serverapp and not Path(notebook_path).is_absolute(): root_dir = serverapp.root_dir notebook_path = str(Path(root_dir) / notebook_path) if serverapp: # Try YDoc approach first return await self._delete_cell_ydoc(serverapp, notebook_path, cell_index) else: # Fall back to file operations return await self._delete_cell_file(notebook_path, cell_index) elif mode == ServerMode.MCP_SERVER and notebook_manager is not None: # MCP_SERVER mode: Use WebSocket connection return await self._delete_cell_websocket(notebook_manager, cell_index) else: raise ValueError(f"Invalid mode or missing required clients: mode={mode}") ``` -------------------------------------------------------------------------------- /jupyter_mcp_server/tools/list_kernels_tool.py: -------------------------------------------------------------------------------- ```python # Copyright (c) 2023-2024 Datalayer, Inc. # # BSD 3-Clause License """List all available kernels tool.""" from typing import Any, Optional, List, Dict from jupyter_server_api import JupyterServerClient from jupyter_mcp_server.tools._base import BaseTool, ServerMode from jupyter_mcp_server.utils import format_TSV class ListKernelsTool(BaseTool): """List all available kernels in the Jupyter server. This tool shows all running and available kernel sessions on the Jupyter server, including their IDs, names, states, connection information, and kernel specifications. Useful for monitoring kernel resources and identifying specific kernels for connection. """ @property def name(self) -> str: return "list_kernels" @property def description(self) -> str: return "List all available kernels in the Jupyter server" def _list_kernels_http(self, server_client: JupyterServerClient) -> List[Dict[str, str]]: """List kernels using HTTP API (MCP_SERVER mode).""" try: # Get all kernels from the Jupyter server kernels = server_client.kernels.list_kernels() if not kernels: return [] # Get kernel specifications for additional details kernels_specs = server_client.kernelspecs.list_kernelspecs() # Create enhanced kernel information list output = [] for kernel in kernels: kernel_info = { "id": kernel.id or "unknown", "name": kernel.name or "unknown", "state": "unknown", "connections": "unknown", "last_activity": "unknown", "display_name": "unknown", "language": "unknown", "env": "unknown" } # Get kernel state - this might vary depending on the API version if hasattr(kernel, 'execution_state'): kernel_info["state"] = kernel.execution_state elif hasattr(kernel, 'state'): kernel_info["state"] = kernel.state # Get connection count if hasattr(kernel, 'connections'): kernel_info["connections"] = str(kernel.connections) # Get last activity if hasattr(kernel, 'last_activity') and kernel.last_activity: if hasattr(kernel.last_activity, 'strftime'): kernel_info["last_activity"] = kernel.last_activity.strftime("%Y-%m-%d %H:%M:%S") else: kernel_info["last_activity"] = str(kernel.last_activity) output.append(kernel_info) # Enhance kernel info with specifications for kernel in output: kernel_name = kernel["name"] if hasattr(kernels_specs, 'kernelspecs') and kernel_name in kernels_specs.kernelspecs: kernel_spec = kernels_specs.kernelspecs[kernel_name] if hasattr(kernel_spec, 'spec'): if hasattr(kernel_spec.spec, 'display_name'): kernel["display_name"] = kernel_spec.spec.display_name if hasattr(kernel_spec.spec, 'language'): kernel["language"] = kernel_spec.spec.language if hasattr(kernel_spec.spec, 'env'): # Convert env dict to a readable string format env_dict = kernel_spec.spec.env if env_dict: env_str = "; ".join([f"{k}={v}" for k, v in env_dict.items()]) kernel["env"] = env_str[:100] + "..." if len(env_str) > 100 else env_str return output except Exception as e: raise RuntimeError(f"Error listing kernels via HTTP: {str(e)}") async def _list_kernels_local( self, kernel_manager: Any, kernel_spec_manager: Any ) -> List[Dict[str, str]]: """List kernels using local kernel_manager API (JUPYTER_SERVER mode).""" try: # Get all running kernels - list_kernels() returns dicts with kernel info kernel_infos = list(kernel_manager.list_kernels()) if not kernel_infos: return [] # Get kernel specifications kernel_specs = kernel_spec_manager.get_all_specs() if kernel_spec_manager else {} # Create enhanced kernel information list output = [] for kernel_info_dict in kernel_infos: # kernel_info_dict is already a dict with kernel information kernel_id = kernel_info_dict.get('id', 'unknown') kernel_name = kernel_info_dict.get('name', 'unknown') kernel_info = { "id": kernel_id, "name": kernel_name, "state": kernel_info_dict.get('execution_state', 'unknown'), "connections": str(kernel_info_dict.get('connections', 'unknown')), "last_activity": "unknown", "display_name": "unknown", "language": "unknown", "env": "unknown" } # Format last activity if present last_activity = kernel_info_dict.get('last_activity') if last_activity: if hasattr(last_activity, 'strftime'): kernel_info["last_activity"] = last_activity.strftime("%Y-%m-%d %H:%M:%S") else: kernel_info["last_activity"] = str(last_activity) output.append(kernel_info) # Enhance kernel info with specifications for kernel in output: kernel_name = kernel["name"] if kernel_name in kernel_specs: spec = kernel_specs[kernel_name].get('spec', {}) if 'display_name' in spec: kernel["display_name"] = spec['display_name'] if 'language' in spec: kernel["language"] = spec['language'] if 'env' in spec and spec['env']: env_dict = spec['env'] env_str = "; ".join([f"{k}={v}" for k, v in env_dict.items()]) kernel["env"] = env_str[:100] + "..." if len(env_str) > 100 else env_str return output except Exception as e: raise RuntimeError(f"Error listing kernels locally: {str(e)}") async def execute( self, mode: ServerMode, server_client: Optional[JupyterServerClient] = None, kernel_client: Optional[Any] = None, contents_manager: Optional[Any] = None, kernel_manager: Optional[Any] = None, kernel_spec_manager: Optional[Any] = None, **kwargs ) -> str: """List all available kernels. Args: mode: Server mode (MCP_SERVER or JUPYTER_SERVER) server_client: HTTP client for MCP_SERVER mode kernel_manager: Direct kernel manager access for JUPYTER_SERVER mode kernel_spec_manager: Kernel spec manager for JUPYTER_SERVER mode **kwargs: Additional parameters (unused) Returns: Tab-separated table with columns: ID, Name, Display_Name, Language, State, Connections, Last_Activity, Environment """ # Get kernel info based on mode if mode == ServerMode.JUPYTER_SERVER and kernel_manager is not None: kernel_list = await self._list_kernels_local(kernel_manager, kernel_spec_manager) elif mode == ServerMode.MCP_SERVER and server_client is not None: kernel_list = self._list_kernels_http(server_client) else: raise ValueError(f"Invalid mode or missing required managers/clients: mode={mode}") if not kernel_list: return "No kernels found on the Jupyter server." try: # Create TSV formatted output headers = ["ID", "Name", "Display_Name", "Language", "State", "Connections", "Last_Activity", "Environment"] rows = [] for kernel in kernel_list: rows.append([kernel['id'], kernel['name'], kernel['display_name'], kernel['language'], kernel['state'], kernel['connections'], kernel['last_activity'], kernel['env']]) return format_TSV(headers, rows) except Exception as e: return f"Error formatting kernel list: {str(e)}" ``` -------------------------------------------------------------------------------- /.github/copilot-instructions.md: -------------------------------------------------------------------------------- ```markdown # Jupyter MCP Server **Always reference these instructions first and fallback to search or bash commands only when you encounter unexpected information that does not match the info here.** Jupyter MCP Server is a Python-based Model Context Protocol (MCP) server implementation that enables real-time interaction with Jupyter Notebooks. The project uses a modern Python build system with hatch, and includes comprehensive testing, linting, and documentation. ## Working Effectively ### Environment Setup - **Python Requirements**: Python 3.10 or higher (tested with 3.9-3.13) - **Network Considerations**: PyPI installs may fail due to SSL certificate issues or timeout limitations. This is a known environment constraint. ### Build and Install (CRITICAL: Network Limitations) ```bash # Standard installation (may fail with network issues) pip install ".[test,lint,typing]" # Alternative if pip install fails: # 1. Install dependencies individually with longer timeouts pip install --timeout=300 pytest pip install --timeout=300 ruff pip install --timeout=300 mypy # 2. Or use Docker approach (preferred for consistency) docker build -t jupyter-mcp-server . ``` **NETWORK TIMEOUT WARNING**: pip install commands may fail with SSL certificate errors or read timeouts when connecting to PyPI. If installs fail: - Try increasing timeout: `pip install --timeout=300` - Use Docker build which handles dependencies internally - Document the network limitation in any testing notes ### Core Development Commands ```bash # Development installation (when network allows) make dev # Equivalent to: pip install ".[test,lint,typing]" # Basic installation make install # Equivalent to: pip install . # Build the package make build # Equivalent to: pip install build && python -m build . ``` ### Testing (CRITICAL: Use Long Timeouts) ```bash # Run tests using hatch (when available) make test # Equivalent to: hatch test # Run tests directly with pytest (when network allows install) pytest . # NEVER CANCEL: Test suite timing expectations # - Full test suite: Allow 15-20 minutes minimum # - Network-dependent tests may take longer # - Set timeout to 30+ minutes for safety ``` **VALIDATION REQUIREMENT**: When testing is not possible due to network issues, verify at minimum: ```bash # Syntax validation (always works) python -m py_compile jupyter_mcp_server/server.py find . -name "*.py" -exec python -m py_compile {} \; # Import validation PYTHONPATH=. python -c "import jupyter_mcp_server; print('Import successful')" ``` ### Linting and Code Quality (CRITICAL: Use Long Timeouts) ```bash # Full linting pipeline (when network allows) bash ./.github/workflows/lint.sh # Individual linting commands: pip install -e ".[lint,typing]" mypy --install-types --non-interactive . # May take 10+ minutes, NEVER CANCEL ruff check . # Quick, usually <1 minute mdformat --check *.md # Quick, usually <1 minute pipx run 'validate-pyproject[all]' pyproject.toml # 2-3 minutes # TIMING WARNING: mypy type checking can take 10+ minutes on first run # Set timeout to 20+ minutes for mypy operations ``` ### Running the Application #### Local Development Mode ```bash # Start with streamable HTTP transport make start # Equivalent to: jupyter-mcp-server start \ --transport streamable-http \ --document-url http://localhost:8888 \ --document-id notebook.ipynb \ --document-token MY_TOKEN \ --runtime-url http://localhost:8888 \ --start-new-runtime true \ --runtime-token MY_TOKEN \ --port 4040 ``` #### JupyterLab Setup (Required for Testing) ```bash # Start JupyterLab server for MCP integration make jupyterlab # Equivalent to: pip uninstall -y pycrdt datalayer_pycrdt pip install datalayer_pycrdt jupyter lab \ --port 8888 \ --ip 0.0.0.0 \ --ServerApp.root_dir ./dev/content \ --IdentityProvider.token MY_TOKEN ``` #### Docker Deployment ```bash # Build Docker image (NEVER CANCEL: Build takes 10-15 minutes) make build-docker # Takes 10-15 minutes, set timeout to 20+ minutes # Run with Docker make start-docker # Or manually: docker run -i --rm \ -e DOCUMENT_URL=http://localhost:8888 \ -e DOCUMENT_ID=notebook.ipynb \ -e DOCUMENT_TOKEN=MY_TOKEN \ -e RUNTIME_URL=http://localhost:8888 \ -e START_NEW_RUNTIME=true \ -e RUNTIME_TOKEN=MY_TOKEN \ --network=host \ datalayer/jupyter-mcp-server:latest ``` ### Manual Validation Scenarios **When full testing is not possible due to network constraints, always verify:** 1. **Syntax and Import Validation**: ```bash # Validate all Python files compile find . -name "*.py" -exec python -m py_compile {} \; # Test local imports work PYTHONPATH=. python -c "import jupyter_mcp_server; print('SUCCESS')" ``` 2. **Configuration Validation**: ```bash # Verify pyproject.toml is valid python -c "import tomllib; tomllib.load(open('pyproject.toml', 'rb'))" # Test module structure python -c "import jupyter_mcp_server.server, jupyter_mcp_server.models" ``` 3. **Documentation Build** (when Node.js available): ```bash cd docs/ npm install # May have network issues npm run build # 3-5 minutes, set timeout to 10+ minutes ``` ## Project Structure and Navigation ### Key Directories - **`jupyter_mcp_server/`**: Main Python package - `server.py`: Core MCP server implementation with FastMCP integration - `models.py`: Pydantic data models for document and runtime handling - `utils.py`: Utility functions for output extraction and processing - `tests/`: Unit tests (internal package tests) - **`tests/`**: Integration tests using pytest-asyncio - **`docs/`**: Docusaurus-based documentation site (Node.js/React) - **`dev/content/`**: Development Jupyter notebook files for testing - **`.github/workflows/`**: CI/CD pipeline definitions ### Important Files - **`pyproject.toml`**: Build configuration, dependencies, and tool settings - **`Makefile`**: Development workflow automation - **`Dockerfile`**: Container build definition - **`.github/workflows/lint.sh`**: Linting pipeline script - **`pytest.ini`**: Test configuration ### Frequently Modified Areas - **Server Logic**: `jupyter_mcp_server/server.py` - Main MCP server implementation - **Data Models**: `jupyter_mcp_server/models.py` - When adding new MCP tools or changing data structures - **Tests**: `tests/test_mcp.py` - Integration tests for MCP functionality - **Documentation**: `docs/src/` - When updating API documentation or user guides ## Common Tasks and Gotchas ### Adding New MCP Tools 1. Add tool definition in `jupyter_mcp_server/server.py` 2. Update models in `jupyter_mcp_server/models.py` if needed 3. Add tests in `tests/test_mcp.py` 4. Update documentation in `docs/` ### Dependency Management - **Core deps**: Defined in `pyproject.toml` dependencies section - **Dev deps**: Use `[test,lint,typing]` optional dependencies - **Special handling**: `datalayer_pycrdt` has specific version requirements (0.12.17) ### CI/CD Pipeline Expectations - **Build Matrix**: Tests run on Ubuntu, macOS, Windows with Python 3.9, 3.13 - **Critical Timing**: Full CI pipeline takes 20-30 minutes - **Required Checks**: pytest, ruff, mypy, mdformat, pyproject validation ### Environment Variables for Testing ```bash # Required for MCP server operation export DOCUMENT_URL="http://localhost:8888" export DOCUMENT_TOKEN="MY_TOKEN" export DOCUMENT_ID="notebook.ipynb" export RUNTIME_URL="http://localhost:8888" export RUNTIME_TOKEN="MY_TOKEN" ``` ## Network Limitations and Workarounds **CRITICAL CONSTRAINT**: This development environment has limited PyPI connectivity with SSL certificate issues and timeout problems. ### Known Working Commands ```bash # These always work (no network required): python -m py_compile <file> # Syntax validation PYTHONPATH=. python -c "import ..." # Import testing python -c "import tomllib; ..." # Config validation git operations # Version control docker build (when base images cached) ``` ### Commands That May Fail ```bash pip install <anything> # Network timeouts/SSL issues npm install # Network limitations mypy --install-types # Downloads type stubs hatch test # May need PyPI for dependencies ``` ### Required Workarounds 1. **Document network failures** when they occur: "pip install fails due to network limitations" 2. **Use syntax validation** instead of full testing when pip installs fail 3. **Prefer Docker approach** for consistent builds when possible 4. **Set generous timeouts** (60+ minutes) for any network operations 5. **Never cancel long-running commands** - document expected timing instead ## Timing Expectations **NEVER CANCEL these operations - they are expected to take significant time:** - **pip install ".[test,lint,typing]"**: 5-10 minutes (when network works) - **mypy --install-types --non-interactive**: 10-15 minutes first run - **Docker build**: 10-15 minutes - **Full test suite**: 15-20 minutes - **Documentation build**: 3-5 minutes - **CI pipeline**: 20-30 minutes total Always set timeouts to at least double these estimates to account for network variability. ``` -------------------------------------------------------------------------------- /jupyter_mcp_server/tools/overwrite_cell_source_tool.py: -------------------------------------------------------------------------------- ```python # Copyright (c) 2023-2024 Datalayer, Inc. # # BSD 3-Clause License """Overwrite cell source tool implementation.""" import difflib import nbformat from pathlib import Path from typing import Any, Optional from jupyter_server_api import JupyterServerClient from jupyter_mcp_server.tools._base import BaseTool, ServerMode from jupyter_mcp_server.notebook_manager import NotebookManager from jupyter_mcp_server.utils import get_current_notebook_context class OverwriteCellSourceTool(BaseTool): """Tool to overwrite the source of an existing cell.""" @property def name(self) -> str: return "overwrite_cell_source" @property def description(self) -> str: return """Overwrite the source of an existing cell. Note this does not execute the modified cell by itself. Args: cell_index: Index of the cell to overwrite (0-based) cell_source: New cell source - must match existing cell type Returns: str: Success message with diff showing changes made""" async def _get_jupyter_ydoc(self, serverapp: Any, file_id: str): """Get the YNotebook document if it's currently open in a collaborative session.""" try: yroom_manager = serverapp.web_app.settings.get("yroom_manager") if yroom_manager is None: return None room_id = f"json:notebook:{file_id}" if yroom_manager.has_room(room_id): yroom = yroom_manager.get_room(room_id) notebook = await yroom.get_jupyter_ydoc() return notebook except Exception: pass return None def _generate_diff(self, old_source: str, new_source: str) -> str: """Generate unified diff between old and new source.""" old_lines = old_source.splitlines(keepends=False) new_lines = new_source.splitlines(keepends=False) diff_lines = list(difflib.unified_diff( old_lines, new_lines, lineterm='', n=3 # Number of context lines )) # Remove the first 3 lines (file headers) from unified_diff output if len(diff_lines) > 3: return '\n'.join(diff_lines[3:]) return "no changes detected" async def _overwrite_cell_ydoc( self, serverapp: Any, notebook_path: str, cell_index: int, cell_source: str ) -> str: """Overwrite cell using YDoc (collaborative editing mode).""" # Get file_id from file_id_manager file_id_manager = serverapp.web_app.settings.get("file_id_manager") if file_id_manager is None: raise RuntimeError("file_id_manager not available in serverapp") file_id = file_id_manager.get_id(notebook_path) # Try to get YDoc ydoc = await self._get_jupyter_ydoc(serverapp, file_id) if ydoc: # Notebook is open in collaborative mode, use YDoc if cell_index < 0 or cell_index >= len(ydoc.ycells): raise ValueError( f"Cell index {cell_index} is out of range. Notebook has {len(ydoc.ycells)} cells." ) # Get original cell content old_source_raw = ydoc.ycells[cell_index].get("source", "") if isinstance(old_source_raw, list): old_source = "".join(old_source_raw) else: old_source = str(old_source_raw) # Set new cell source ydoc.ycells[cell_index]["source"] = cell_source # Generate diff diff_content = self._generate_diff(old_source, cell_source) if not diff_content.strip() or diff_content == "no changes detected": return f"Cell {cell_index} overwritten successfully - no changes detected" return f"Cell {cell_index} overwritten successfully!\n\n```diff\n{diff_content}\n```" else: # YDoc not available, use file operations return await self._overwrite_cell_file(notebook_path, cell_index, cell_source) async def _overwrite_cell_file( self, notebook_path: str, cell_index: int, cell_source: str ) -> str: """Overwrite cell using file operations (non-collaborative mode).""" # Read notebook file as version 4 for consistency with open(notebook_path, "r", encoding="utf-8") as f: notebook = nbformat.read(f, as_version=4) # Clean transient fields from outputs from jupyter_mcp_server.utils import _clean_notebook_outputs _clean_notebook_outputs(notebook) if cell_index < 0 or cell_index >= len(notebook.cells): raise ValueError( f"Cell index {cell_index} is out of range. Notebook has {len(notebook.cells)} cells." ) # Get original cell content old_source = notebook.cells[cell_index].source # Set new cell source notebook.cells[cell_index].source = cell_source # Write back to file with open(notebook_path, "w", encoding="utf-8") as f: nbformat.write(notebook, f) # Generate diff diff_content = self._generate_diff(old_source, cell_source) if not diff_content.strip() or diff_content == "no changes detected": return f"Cell {cell_index} overwritten successfully - no changes detected" return f"Cell {cell_index} overwritten successfully!\n\n```diff\n{diff_content}\n```" async def _overwrite_cell_websocket( self, notebook_manager: NotebookManager, cell_index: int, cell_source: str ) -> str: """Overwrite cell using WebSocket connection (MCP_SERVER mode).""" async with notebook_manager.get_current_connection() as notebook: if cell_index < 0 or cell_index >= len(notebook): raise ValueError(f"Cell index {cell_index} out of range") # Get original cell content old_source_raw = notebook[cell_index].get("source", "") if isinstance(old_source_raw, list): old_source = "".join(old_source_raw) else: old_source = str(old_source_raw) # Set new cell content notebook.set_cell_source(cell_index, cell_source) # Generate diff diff_content = self._generate_diff(old_source, cell_source) if not diff_content.strip() or diff_content == "no changes detected": return f"Cell {cell_index} overwritten successfully - no changes detected" return f"Cell {cell_index} overwritten successfully!\n\n```diff\n{diff_content}\n```" async def execute( self, mode: ServerMode, server_client: Optional[JupyterServerClient] = None, kernel_client: Optional[Any] = None, contents_manager: Optional[Any] = None, kernel_manager: Optional[Any] = None, kernel_spec_manager: Optional[Any] = None, notebook_manager: Optional[NotebookManager] = None, # Tool-specific parameters cell_index: int = None, cell_source: str = None, **kwargs ) -> str: """Execute the overwrite_cell_source tool. Args: mode: Server mode (MCP_SERVER or JUPYTER_SERVER) contents_manager: Direct API access for JUPYTER_SERVER mode notebook_manager: Notebook manager instance cell_index: Index of the cell to overwrite (0-based) cell_source: New cell source **kwargs: Additional parameters Returns: Success message with diff """ if mode == ServerMode.JUPYTER_SERVER and contents_manager is not None: # JUPYTER_SERVER mode: Try YDoc first, fall back to file operations from jupyter_mcp_server.jupyter_extension.context import get_server_context context = get_server_context() serverapp = context.serverapp notebook_path, _ = get_current_notebook_context(notebook_manager) # Resolve to absolute path if serverapp and not Path(notebook_path).is_absolute(): root_dir = serverapp.root_dir notebook_path = str(Path(root_dir) / notebook_path) if serverapp: return await self._overwrite_cell_ydoc(serverapp, notebook_path, cell_index, cell_source) else: return await self._overwrite_cell_file(notebook_path, cell_index, cell_source) elif mode == ServerMode.MCP_SERVER and notebook_manager is not None: # MCP_SERVER mode: Use WebSocket connection return await self._overwrite_cell_websocket(notebook_manager, cell_index, cell_source) else: raise ValueError(f"Invalid mode or missing required clients: mode={mode}") ``` -------------------------------------------------------------------------------- /jupyter_mcp_server/jupyter_extension/extension.py: -------------------------------------------------------------------------------- ```python # Copyright (c) 2023-2024 Datalayer, Inc. # # BSD 3-Clause License # Copyright (c) 2023-2024 Datalayer, Inc. # # BSD 3-Clause License """ Jupyter Server Extension for MCP Protocol This extension exposes MCP tools directly from a running Jupyter Server, allowing MCP clients to connect to the Jupyter Server's MCP endpoints. """ import logging from traitlets import Unicode, Bool from jupyter_server.extension.application import ExtensionApp, ExtensionAppJinjaMixin from jupyter_server.utils import url_path_join from jupyter_mcp_server.jupyter_extension.context import get_server_context from jupyter_mcp_server.jupyter_extension.handlers import ( MCPHealthHandler, MCPToolsListHandler, MCPToolsCallHandler, ) logger = logging.getLogger(__name__) class JupyterMCPServerExtensionApp(ExtensionAppJinjaMixin, ExtensionApp): """ Jupyter Server Extension for MCP Server. This extension allows MCP clients to connect to Jupyter Server and use MCP tools to interact with notebooks and kernels. Configuration: c.JupyterMCPServerExtensionApp.document_url = "local" # or http://... c.JupyterMCPServerExtensionApp.runtime_url = "local" # or http://... c.JupyterMCPServerExtensionApp.document_id = "notebook.ipynb" c.JupyterMCPServerExtensionApp.start_new_runtime = True # Start new kernel c.JupyterMCPServerExtensionApp.runtime_id = "kernel-id" # Or connect to existing """ # Extension metadata name = "jupyter_mcp_server" default_url = "/mcp" load_other_extensions = True # Configuration traits document_url = Unicode( "local", config=True, help='Document URL - use "local" for local serverapp access or http://... for remote' ) runtime_url = Unicode( "local", config=True, help='Runtime URL - use "local" for local serverapp access or http://... for remote' ) document_id = Unicode( "notebook.ipynb", config=True, help='Default document ID (notebook path)' ) start_new_runtime = Bool( False, config=True, help='Whether to start a new kernel runtime on initialization' ) runtime_id = Unicode( "", config=True, help='Existing kernel ID to connect to (if not starting new runtime)' ) document_token = Unicode( "", config=True, help='Authentication token for document server (if remote)' ) runtime_token = Unicode( "", config=True, help='Authentication token for runtime server (if remote)' ) provider = Unicode( "jupyter", config=True, help='Provider type for document/runtime' ) def initialize_settings(self): """ Initialize extension settings. This is called during extension loading to set up configuration and update the server context. """ # Reduce noise from httpx logging (used by JupyterLab for PyPI extension discovery) logging.getLogger("httpx").setLevel(logging.WARNING) logger.info(f"Initializing Jupyter MCP Server Extension") logger.info(f" Document URL: {self.document_url}") logger.info(f" Runtime URL: {self.runtime_url}") logger.info(f" Document ID: {self.document_id}") logger.info(f" Start New Runtime: {self.start_new_runtime}") if self.runtime_id: logger.info(f" Runtime ID: {self.runtime_id}") # Update the global server context context = get_server_context() context.update( context_type="JUPYTER_SERVER", serverapp=self.serverapp, document_url=self.document_url, runtime_url=self.runtime_url ) # Update global MCP configuration from jupyter_mcp_server.config import get_config config = get_config() config.document_url = self.document_url config.runtime_url = self.runtime_url config.document_id = self.document_id config.document_token = self.document_token if self.document_token else None config.runtime_token = self.runtime_token if self.runtime_token else None config.start_new_runtime = self.start_new_runtime config.runtime_id = self.runtime_id if self.runtime_id else None config.provider = self.provider # Store configuration in settings for handlers self.settings.update({ "mcp_document_url": self.document_url, "mcp_runtime_url": self.runtime_url, "mcp_document_id": self.document_id, "mcp_document_token": self.document_token, "mcp_runtime_token": self.runtime_token, "mcp_start_new_runtime": self.start_new_runtime, "mcp_runtime_id": self.runtime_id, "mcp_provider": self.provider, "mcp_serverapp": self.serverapp, }) # Trigger auto-enrollment if document_id is configured # Note: Auto-enrollment supports 3 modes: # 1. With existing kernel (runtime_id set) # 2. With new kernel (start_new_runtime=True) # 3. Without kernel - notebook-only mode (both False/None) if self.document_id: from tornado.ioloop import IOLoop from jupyter_mcp_server.enroll import auto_enroll_document from jupyter_mcp_server.server import notebook_manager, use_notebook_tool, server_context # Schedule auto-enrollment to run after Jupyter Server is fully started async def _run_auto_enrollment(): try: logger.info(f"Running auto-enrollment for document '{self.document_id}'") await auto_enroll_document( config=config, notebook_manager=notebook_manager, use_notebook_tool=use_notebook_tool, server_context=server_context, ) logger.info(f"Auto-enrollment completed for document '{self.document_id}'") except Exception as e: logger.error(f"Failed to auto-enroll document: {e}", exc_info=True) # Schedule the enrollment to run on the IOLoop after server starts # Use callback with delay to ensure server is fully initialized IOLoop.current().call_later(1.0, lambda: IOLoop.current().add_callback(_run_auto_enrollment)) logger.info("Jupyter MCP Server Extension settings initialized") def initialize_handlers(self): """ Register MCP protocol handlers. Strategy: Implement MCP protocol directly in Tornado handlers that call the MCP tools from server.py. This avoids the complexity of wrapping the Starlette ASGI app. Endpoints: - GET/POST /mcp - MCP protocol endpoint (SSE-based) - GET /mcp/healthz - Health check (Tornado handler) - GET /mcp/tools/list - List available tools (Tornado handler) - POST /mcp/tools/call - Execute a tool (Tornado handler) """ base_url = self.serverapp.base_url # Import here to avoid circular imports from jupyter_mcp_server.jupyter_extension.handlers import MCPSSEHandler # Define handlers handlers = [ # MCP protocol endpoint - SSE-based handler # Match /mcp with or without trailing slash (url_path_join(base_url, "mcp/?"), MCPSSEHandler), # Utility endpoints (optional, for debugging) (url_path_join(base_url, "mcp/healthz"), MCPHealthHandler), (url_path_join(base_url, "mcp/tools/list"), MCPToolsListHandler), (url_path_join(base_url, "mcp/tools/call"), MCPToolsCallHandler), ] # Register handlers self.handlers.extend(handlers) # Log registered endpoints using url_path_join for consistent formatting logger.info(f"Registered MCP handlers at {url_path_join(base_url, 'mcp/')}") logger.info(f" - MCP protocol: {url_path_join(base_url, 'mcp')} (SSE-based)") logger.info(f" - Health check: {url_path_join(base_url, 'mcp/healthz')}") logger.info(f" - List tools: {url_path_join(base_url, 'mcp/tools/list')}") logger.info(f" - Call tool: {url_path_join(base_url, 'mcp/tools/call')}") def initialize_templates(self): """ Initialize Jinja templates. Not needed for API-only extension, but included for completeness. """ pass async def stop_extension(self): """ Clean up when extension stops. Shutdown any managed kernels and cleanup resources. """ logger.info("Stopping Jupyter MCP Server Extension") # Reset server context context = get_server_context() context.reset() logger.info("Jupyter MCP Server Extension stopped") # Extension loading functions def _jupyter_server_extension_points(): """ Declare the Jupyter Server extension. Returns: List of extension metadata dictionaries """ return [ { "module": "jupyter_mcp_server.jupyter_extension.extension", "app": JupyterMCPServerExtensionApp } ] def _load_jupyter_server_extension(serverapp): """ Load the extension (for backward compatibility). Args: serverapp: Jupyter ServerApp instance """ extension = JupyterMCPServerExtensionApp() extension.serverapp = serverapp extension.initialize_settings() extension.initialize_handlers() extension.initialize_templates() # For classic Notebook server compatibility load_jupyter_server_extension = _load_jupyter_server_extension ``` -------------------------------------------------------------------------------- /tests/conftest.py: -------------------------------------------------------------------------------- ```python # Copyright (c) 2023-2024 Datalayer, Inc. # # BSD 3-Clause License """ Pytest configuration and shared fixtures for Jupyter MCP Server tests. This module provides: - jupyter_server fixture: Session-scoped Jupyter Lab server - jupyter_server_with_extension fixture: Jupyter Lab with MCP extension - jupyter_mcp_server fixture: Standalone MCP server instance - mcp_client fixture: MCP protocol client for testing - _start_server helper: Generic server startup with health checks - JUPYTER_TOKEN: Authentication token for Jupyter API """ import logging import os import socket import subprocess import time from http import HTTPStatus import pytest import pytest_asyncio import requests from requests.exceptions import ConnectionError JUPYTER_TOKEN = "MY_TOKEN" # Test mode configuration - set to False to skip testing specific modes TEST_MCP_SERVER = os.environ.get("TEST_MCP_SERVER", "true").lower() == "true" TEST_JUPYTER_SERVER = os.environ.get("TEST_JUPYTER_SERVER", "true").lower() == "true" def _start_server( name: str, host: str, port: int, command: list, readiness_endpoint: str, max_retries: int = 5 ): """A Helper that starts a web server as a python subprocess and wait until it's ready to accept connections This method can be used to start both Jupyter and Jupyter MCP servers Uses subprocess.DEVNULL to prevent pipe blocking issues with verbose output. """ _log_prefix = name url = f"http://{host}:{port}" url_readiness = f"{url}{readiness_endpoint}" logging.info(f"{_log_prefix}: starting ...") logging.debug(f"{_log_prefix}: command: {' '.join(command)}") # Use DEVNULL to prevent any pipe blocking issues p_serv = subprocess.Popen( command, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL ) _log_prefix = f"{_log_prefix} [{p_serv.pid}]" while max_retries > 0: # Check if process died poll_result = p_serv.poll() if poll_result is not None: logging.error(f"{_log_prefix}: process died with exit code {poll_result}") pytest.fail(f"{name} failed to start (exit code {poll_result}). Check if port {port} is available.") try: response = requests.get(url_readiness, timeout=10) if response is not None and response.status_code == HTTPStatus.OK: logging.info(f"{_log_prefix}: started ({url})!") yield url break except (ConnectionError, requests.exceptions.Timeout): logging.debug( f"{_log_prefix}: waiting to accept connections [{max_retries}]" ) time.sleep(2) max_retries -= 1 if not max_retries: logging.error(f"{_log_prefix}: fail to start after retries. Check if port {port} is available.") pytest.fail(f"{name} failed to start after max retries. Port {port} may be in use or server crashed.") logging.debug(f"{_log_prefix}: stopping ...") try: p_serv.terminate() p_serv.wait(timeout=5) # Reduced timeout for faster cleanup logging.info(f"{_log_prefix}: stopped") except subprocess.TimeoutExpired: logging.warning(f"{_log_prefix}: terminate timeout, forcing kill") p_serv.kill() try: p_serv.wait(timeout=2) except subprocess.TimeoutExpired: logging.error(f"{_log_prefix}: kill timeout, process may be stuck") except Exception as e: logging.error(f"{_log_prefix}: error during shutdown: {e}") @pytest.fixture(scope="session") def jupyter_server(): """Start the Jupyter server and returns its URL This is a session-scoped fixture that starts a single Jupyter Lab instance for all tests. Both MCP_SERVER and JUPYTER_SERVER mode tests can share this. Only starts if at least one test mode is enabled. """ if not TEST_MCP_SERVER and not TEST_JUPYTER_SERVER: pytest.skip("Both TEST_MCP_SERVER and TEST_JUPYTER_SERVER are disabled") host = "localhost" port = 8888 yield from _start_server( name="JupyterLab", host=host, port=port, command=[ "jupyter", "lab", "--port", str(port), "--IdentityProvider.token", JUPYTER_TOKEN, "--ip", host, "--ServerApp.root_dir", "./dev/content", "--no-browser", ], readiness_endpoint="/api", max_retries=10, ) @pytest.fixture(scope="session") def jupyter_server_with_extension(): """Start Jupyter server with MCP extension loaded (JUPYTER_SERVER mode) This fixture starts Jupyter Lab with the jupyter_mcp_server extension enabled, allowing tests to verify JUPYTER_SERVER mode functionality (YDoc, direct kernel access, etc). Only starts if TEST_JUPYTER_SERVER=True, otherwise skips. """ if not TEST_JUPYTER_SERVER: pytest.skip("TEST_JUPYTER_SERVER is disabled") host = "localhost" port = 8889 # Different port to avoid conflicts yield from _start_server( name="JupyterLab+MCP", host=host, port=port, command=[ "jupyter", "lab", "--port", str(port), "--IdentityProvider.token", JUPYTER_TOKEN, "--ip", host, "--ServerApp.root_dir", "./dev/content", "--no-browser", # Load the MCP extension "--ServerApp.jpserver_extensions", '{"jupyter_mcp_server": True}', ], readiness_endpoint="/api", max_retries=10, ) ############################################################################### # MCP Server Fixtures ############################################################################### @pytest.fixture(scope="function") def jupyter_mcp_server(request, jupyter_server): """Start the Jupyter MCP server and returns its URL This fixture starts a standalone MCP server that communicates with Jupyter via HTTP (MCP_SERVER mode). It can be parametrized to control runtime startup. Parameters: request.param (bool): Whether to start a new kernel runtime (default: True) """ # Find an available port def find_free_port(): with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.bind(('', 0)) s.listen(1) port = s.getsockname()[1] return port host = "localhost" port = find_free_port() start_new_runtime = True try: start_new_runtime = request.param except AttributeError: # fixture not parametrized pass yield from _start_server( name="Jupyter MCP", host=host, port=port, command=[ "python", "-m", "jupyter_mcp_server", "--transport", "streamable-http", "--document-url", jupyter_server, "--document-id", "notebook.ipynb", "--document-token", JUPYTER_TOKEN, "--runtime-url", jupyter_server, "--start-new-runtime", str(start_new_runtime), "--runtime-token", JUPYTER_TOKEN, "--port", str(port), ], readiness_endpoint="/api/healthz", ) def _get_test_params(): """Generate test parameters based on TEST_MCP_SERVER and TEST_JUPYTER_SERVER flags""" params = [] if TEST_MCP_SERVER: params.append("mcp_server") if TEST_JUPYTER_SERVER: params.append("jupyter_extension") if not params: pytest.skip("Both TEST_MCP_SERVER and TEST_JUPYTER_SERVER are disabled") return params @pytest.fixture(scope="function", params=_get_test_params()) def mcp_server_url(request): """Parametrized fixture that provides both MCP_SERVER and JUPYTER_SERVER mode URLs This fixture enables testing the same functionality against both deployment modes: - mcp_server: Standalone MCP server (HTTP transport) - when TEST_MCP_SERVER=True - jupyter_extension: Jupyter extension mode (direct API access) - when TEST_JUPYTER_SERVER=True Both expose MCP protocol endpoints that can be tested with MCPClient. You can control which modes to test via environment variables: TEST_MCP_SERVER=true/false (default: true) TEST_JUPYTER_SERVER=true/false (default: true) Parameters: request.param (str): Either "mcp_server" or "jupyter_extension" Returns: str: URL of the MCP endpoint for the selected mode """ if request.param == "mcp_server": # Get jupyter_server fixture dynamically jupyter_server = request.getfixturevalue("jupyter_server") # Start standalone MCP server import socket def find_free_port(): with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.bind(('', 0)) s.listen(1) port = s.getsockname()[1] return port host = "localhost" port = find_free_port() yield from _start_server( name="Jupyter MCP", host=host, port=port, command=[ "python", "-m", "jupyter_mcp_server", "--transport", "streamable-http", "--document-url", jupyter_server, "--document-id", "notebook.ipynb", "--document-token", JUPYTER_TOKEN, "--runtime-url", jupyter_server, "--start-new-runtime", "True", "--runtime-token", JUPYTER_TOKEN, "--port", str(port), ], readiness_endpoint="/api/healthz", ) else: # jupyter_extension # Get jupyter_server_with_extension fixture dynamically jupyter_server_with_extension = request.getfixturevalue("jupyter_server_with_extension") # Use the extension's MCP endpoints (note: no /mcp suffix, the extension handles routing) yield jupyter_server_with_extension ############################################################################### @pytest_asyncio.fixture(scope="function") async def mcp_client(jupyter_mcp_server): """An MCP client that can connect to the Jupyter MCP server This fixture provides an MCPClient instance configured to connect to the standalone MCP server. It requires the test_common module. Returns: MCPClient: Configured client for MCP protocol communication """ from .test_common import MCPClient return MCPClient(jupyter_mcp_server) @pytest.fixture(scope="function") def mcp_client_parametrized(mcp_server_url): """MCP client that works with both server modes via parametrization This fixture creates an MCPClient that can connect to either: - Standalone MCP server (MCP_SERVER mode) - Jupyter extension MCP endpoints (JUPYTER_SERVER mode) Returns: MCPClient: Configured client for the parametrized server mode """ from .test_common import MCPClient return MCPClient(mcp_server_url) ``` -------------------------------------------------------------------------------- /jupyter_mcp_server/notebook_manager.py: -------------------------------------------------------------------------------- ```python # Copyright (c) 2023-2024 Datalayer, Inc. # # BSD 3-Clause License """ Unified Notebook and Kernel Management Module This module provides centralized management for Jupyter notebooks and kernels, replacing the scattered global variable approach with a unified architecture. """ from typing import Dict, Any, Optional, Callable, Union from types import TracebackType from jupyter_nbmodel_client import NbModelClient, get_notebook_websocket_url from jupyter_kernel_client import KernelClient from .config import get_config class NotebookConnection: """ Context manager for Notebook connections that handles the lifecycle of NbModelClient instances. Note: This is only used in MCP_SERVER mode with remote Jupyter servers that have RTC enabled. In JUPYTER_SERVER mode (local), notebook content is accessed directly via contents_manager. """ def __init__(self, notebook_info: Dict[str, str], is_local: bool = False): self.notebook_info = notebook_info self.is_local = is_local self._notebook: Optional[NbModelClient] = None async def __aenter__(self) -> NbModelClient: """Enter context, establish notebook connection.""" if self.is_local: raise ValueError( "NotebookConnection cannot be used in local/JUPYTER_SERVER mode. " "Cell operations in local mode should use contents_manager directly to read notebook JSON files." ) config = get_config() ws_url = get_notebook_websocket_url( server_url=self.notebook_info.get("server_url", config.document_url), token=self.notebook_info.get("token", config.document_token), path=self.notebook_info.get("path", config.document_id), provider=config.provider ) self._notebook = NbModelClient(ws_url) await self._notebook.__aenter__() return self._notebook async def __aexit__( self, exc_type: Optional[type], exc_val: Optional[BaseException], exc_tb: Optional[TracebackType] ) -> None: """Exit context, clean up connection.""" if self._notebook: await self._notebook.__aexit__(exc_type, exc_val, exc_tb) class NotebookManager: """ Centralized manager for multiple notebooks and their corresponding kernels. This class replaces the global kernel variable approach with a unified management system that supports both single and multiple notebook scenarios. """ def __init__(self): self._notebooks: Dict[str, Dict[str, Any]] = {} self._default_notebook_name = "default" self._current_notebook: Optional[str] = None # Currently active notebook def __contains__(self, name: str) -> bool: """Check if a notebook is managed by this instance.""" return name in self._notebooks def __iter__(self): """Iterate over notebook name, info pairs.""" return iter(self._notebooks.items()) def add_notebook( self, name: str, kernel: Union[KernelClient, Dict[str, Any]], # Can be KernelClient or dict with kernel metadata server_url: Optional[str] = None, token: Optional[str] = None, path: Optional[str] = None ) -> None: """ Add a notebook to the manager. Args: name: Unique identifier for the notebook kernel: Kernel client instance (MCP_SERVER mode) or kernel metadata dict (JUPYTER_SERVER mode) server_url: Jupyter server URL (optional, uses config default). Use "local" for JUPYTER_SERVER mode. token: Authentication token (optional, uses config default) path: Notebook file path (optional, uses config default) """ config = get_config() # Determine if this is local (JUPYTER_SERVER) mode or HTTP (MCP_SERVER) mode is_local_mode = server_url == "local" self._notebooks[name] = { "kernel": kernel, "is_local": is_local_mode, "notebook_info": { "server_url": server_url or config.document_url, "token": token or config.document_token, "path": path or config.document_id } } # For backward compatibility: if this is the first notebook or it's "default", # set it as the current notebook if self._current_notebook is None or name == self._default_notebook_name: self._current_notebook = name def remove_notebook(self, name: str) -> bool: """ Remove a notebook from the manager. Args: name: Notebook identifier Returns: True if removed successfully, False if not found """ if name in self._notebooks: try: notebook_data = self._notebooks[name] is_local = notebook_data.get("is_local", False) kernel = notebook_data["kernel"] # Only stop kernel if it's an HTTP KernelClient (MCP_SERVER mode) # In JUPYTER_SERVER mode, kernel is just metadata, actual kernel managed elsewhere if not is_local and kernel and hasattr(kernel, 'stop'): kernel.stop() except Exception: # Ignore errors during kernel cleanup pass finally: del self._notebooks[name] # If we removed the current notebook, update the current pointer if self._current_notebook == name: # Set to another notebook if available, prefer "default" for compatibility if self._default_notebook_name in self._notebooks: self._current_notebook = self._default_notebook_name elif self._notebooks: # Set to the first available notebook self._current_notebook = next(iter(self._notebooks.keys())) else: # No notebooks left self._current_notebook = None return True return False def get_kernel(self, name: str) -> Optional[Union[KernelClient, Dict[str, Any]]]: """ Get the kernel for a specific notebook. Args: name: Notebook identifier Returns: Kernel client (MCP_SERVER mode) or kernel metadata dict (JUPYTER_SERVER mode), or None if not found """ if name in self._notebooks: return self._notebooks[name]["kernel"] return None def get_kernel_id(self, name: str) -> Optional[str]: """ Get the kernel ID for a specific notebook. Args: name: Notebook identifier Returns: Kernel ID string or None if not found """ if name in self._notebooks: kernel = self._notebooks[name]["kernel"] # Handle both KernelClient objects and kernel metadata dicts if isinstance(kernel, dict): return kernel.get("id") elif hasattr(kernel, 'kernel_id'): return kernel.kernel_id return None def is_local_notebook(self, name: str) -> bool: """ Check if a notebook is using local (JUPYTER_SERVER) mode. Args: name: Notebook identifier Returns: True if local mode, False otherwise """ if name in self._notebooks: return self._notebooks[name].get("is_local", False) return False def get_notebook_connection(self, name: str) -> NotebookConnection: """ Get a context manager for notebook connection. Args: name: Notebook identifier Returns: NotebookConnection context manager Raises: ValueError: If notebook doesn't exist """ if name not in self._notebooks: raise ValueError(f"Notebook '{name}' does not exist in manager") return NotebookConnection(self._notebooks[name]["notebook_info"]) def restart_notebook(self, name: str) -> bool: """ Restart the kernel for a specific notebook. Args: name: Notebook identifier Returns: True if restarted successfully, False otherwise """ if name in self._notebooks: try: kernel = self._notebooks[name]["kernel"] if kernel and hasattr(kernel, 'restart'): kernel.restart() return True except Exception: return False return False def is_empty(self) -> bool: """Check if the manager is empty (no notebooks).""" return len(self._notebooks) == 0 def ensure_kernel_alive(self, name: str, kernel_factory: Callable[[], KernelClient]) -> KernelClient: """ Ensure a kernel is alive, create if necessary. Args: name: Notebook identifier kernel_factory: Function to create a new kernel Returns: The alive kernel instance """ kernel = self.get_kernel(name) if kernel is None or not hasattr(kernel, 'is_alive') or not kernel.is_alive(): # Create new kernel new_kernel = kernel_factory() self.add_notebook(name, new_kernel) return new_kernel return kernel def set_current_notebook(self, name: str) -> bool: """ Set the currently active notebook. Args: name: Notebook identifier Returns: True if set successfully, False if notebook doesn't exist """ if name in self._notebooks: self._current_notebook = name return True return False def get_current_notebook(self) -> Optional[str]: """ Get the name of the currently active notebook. Returns: Current notebook name or None if no active notebook """ return self._current_notebook def get_current_connection(self) -> NotebookConnection: """ Get the connection for the currently active notebook. For backward compatibility, defaults to "default" if no current notebook is set. Returns: NotebookConnection context manager for the current notebook Raises: ValueError: If no notebooks exist and no default config is available """ current = self._current_notebook or self._default_notebook_name # For backward compatibility: if the requested notebook doesn't exist but we're # asking for default, create a connection using the default config if current not in self._notebooks and current == self._default_notebook_name: # Return a connection using default configuration config = get_config() return NotebookConnection({ "server_url": config.document_url, "token": config.document_token, "path": config.document_id }) return self.get_notebook_connection(current) def get_current_notebook_path(self) -> Optional[str]: """ Get the file path of the currently active notebook. Returns: Notebook file path or None if no active notebook """ current = self._current_notebook or self._default_notebook_name if current in self._notebooks: return self._notebooks[current]["notebook_info"].get("path") return None def list_all_notebooks(self) -> Dict[str, Dict[str, Any]]: """ Get information about all managed notebooks. Returns: Dictionary with notebook names as keys and their info as values """ result = {} for name, notebook_data in self._notebooks.items(): kernel = notebook_data["kernel"] notebook_info = notebook_data["notebook_info"] # Check kernel status kernel_status = "unknown" if kernel: try: kernel_status = "alive" if hasattr(kernel, 'is_alive') and kernel.is_alive() else "dead" except Exception: kernel_status = "error" else: kernel_status = "not_initialized" result[name] = { "path": notebook_info.get("path", ""), "kernel_status": kernel_status, "is_current": name == self._current_notebook } return result ``` -------------------------------------------------------------------------------- /jupyter_mcp_server/tools/use_notebook_tool.py: -------------------------------------------------------------------------------- ```python # Copyright (c) 2023-2024 Datalayer, Inc. # # BSD 3-Clause License """Use notebook tool implementation.""" import logging from typing import Any, Optional, Literal from pathlib import Path from jupyter_server_api import JupyterServerClient, NotFoundError from jupyter_kernel_client import KernelClient from jupyter_mcp_server.tools._base import BaseTool, ServerMode from jupyter_mcp_server.notebook_manager import NotebookManager logger = logging.getLogger(__name__) class UseNotebookTool(BaseTool): """Tool to use (connect to or create) a notebook file.""" @property def name(self) -> str: return "use_notebook" @property def description(self) -> str: return """Use a notebook file (connect to existing, create new, or switch to already-connected notebook). Args: notebook_name: Unique identifier for the notebook notebook_path: Path to the notebook file, relative to the Jupyter server root (e.g. "notebook.ipynb"). Optional - if not provided, switches to an already-connected notebook with the given name. mode: "connect" to connect to existing, "create" to create new kernel_id: Specific kernel ID to use (optional, will create new if not provided) Returns: str: Success message with notebook information""" async def _check_path_http( self, server_client: JupyterServerClient, notebook_path: str, mode: str ) -> tuple[bool, Optional[str]]: """Check if path exists using HTTP API.""" path = Path(notebook_path) try: parent_path = path.parent.as_posix() if path.parent.as_posix() != "." else "" if parent_path: dir_contents = server_client.contents.list_directory(parent_path) else: dir_contents = server_client.contents.list_directory("") if mode == "connect": file_exists = any(file.name == path.name for file in dir_contents) if not file_exists: return False, f"'{notebook_path}' not found in jupyter server, please check the notebook already exists." return True, None except NotFoundError: parent_dir = path.parent.as_posix() if path.parent.as_posix() != "." else "root directory" return False, f"'{parent_dir}' not found in jupyter server, please check the directory path already exists." except Exception as e: return False, f"Failed to check the path '{notebook_path}': {e}" async def _check_path_local( self, contents_manager: Any, notebook_path: str, mode: str ) -> tuple[bool, Optional[str]]: """Check if path exists using local contents_manager API.""" path = Path(notebook_path) try: parent_path = str(path.parent) if str(path.parent) != "." else "" # Get directory contents using local API model = await contents_manager.get(parent_path, content=True, type='directory') if mode == "connect": file_exists = any(item['name'] == path.name for item in model.get('content', [])) if not file_exists: return False, f"'{notebook_path}' not found in jupyter server, please check the notebook already exists." return True, None except Exception as e: parent_dir = str(path.parent) if str(path.parent) != "." else "root directory" return False, f"'{parent_dir}' not found in jupyter server: {e}" async def execute( self, mode: ServerMode, server_client: Optional[JupyterServerClient] = None, kernel_client: Optional[Any] = None, contents_manager: Optional[Any] = None, kernel_manager: Optional[Any] = None, kernel_spec_manager: Optional[Any] = None, session_manager: Optional[Any] = None, notebook_manager: Optional[NotebookManager] = None, # Tool-specific parameters notebook_name: str = None, notebook_path: Optional[str] = None, use_mode: Literal["connect", "create"] = "connect", kernel_id: Optional[str] = None, runtime_url: Optional[str] = None, runtime_token: Optional[str] = None, **kwargs ) -> str: """Execute the use_notebook tool. Args: mode: Server mode (MCP_SERVER or JUPYTER_SERVER) server_client: HTTP client for MCP_SERVER mode contents_manager: Direct API access for JUPYTER_SERVER mode kernel_manager: Direct kernel manager for JUPYTER_SERVER mode session_manager: Session manager for creating kernel-notebook associations notebook_manager: Notebook manager instance notebook_name: Unique identifier for the notebook notebook_path: Path to the notebook file (optional, if not provided switches to existing notebook) use_mode: "connect" or "create" kernel_id: Optional specific kernel ID runtime_url: Runtime URL for HTTP mode runtime_token: Runtime token for HTTP mode **kwargs: Additional parameters Returns: Success message with notebook information """ # If no notebook_path provided, switch to already-connected notebook if notebook_path is None: if notebook_name not in notebook_manager: return f"Notebook '{notebook_name}' is not connected. Please provide a notebook_path to connect to it first." # Switch to the existing notebook notebook_manager.set_current_notebook(notebook_name) return f"Successfully switched to notebook '{notebook_name}'." # Rest of the logic for connecting/creating new notebooks if notebook_name in notebook_manager: return f"Notebook '{notebook_name}' is already using. Use unuse_notebook first if you want to reconnect." # Check server connectivity (HTTP mode only) if mode == ServerMode.MCP_SERVER and server_client is not None: try: server_client.get_status() except Exception as e: return f"Failed to connect the Jupyter server: {e}" # Check the path exists if mode == ServerMode.JUPYTER_SERVER and contents_manager is not None: path_ok, error_msg = await self._check_path_local(contents_manager, notebook_path, use_mode) elif mode == ServerMode.MCP_SERVER and server_client is not None: path_ok, error_msg = await self._check_path_http(server_client, notebook_path, use_mode) else: return f"Invalid mode or missing required clients: mode={mode}" if not path_ok: return error_msg # Check kernel if kernel_id provided (HTTP mode only for now) if kernel_id and mode == ServerMode.MCP_SERVER and server_client is not None: kernels = server_client.kernels.list_kernels() kernel_exists = any(kernel.id == kernel_id for kernel in kernels) if not kernel_exists: return f"Kernel '{kernel_id}' not found in jupyter server, please check the kernel already exists." # Create notebook if needed if use_mode == "create": content = { "cells": [{ "cell_type": "markdown", "metadata": {}, "source": [ "New Notebook Created by Jupyter MCP Server", ] }], "metadata": {}, "nbformat": 4, "nbformat_minor": 4 } if mode == ServerMode.JUPYTER_SERVER and contents_manager is not None: # Use local API to create notebook await contents_manager.new(model={'type': 'notebook'}, path=notebook_path) elif mode == ServerMode.MCP_SERVER and server_client is not None: server_client.contents.create_notebook(notebook_path, content=content) # Create/connect to kernel based on mode if mode == ServerMode.JUPYTER_SERVER and kernel_manager is not None: # JUPYTER_SERVER mode: Use local kernel manager API directly if kernel_id: # Connect to existing kernel - verify it exists if kernel_id not in kernel_manager: return f"Kernel '{kernel_id}' not found in local kernel manager." kernel_info = {"id": kernel_id} else: # Start a new kernel using local API kernel_id = await kernel_manager.start_kernel() logger.info(f"Started kernel '{kernel_id}', waiting for it to be ready...") # CRITICAL: Wait for the kernel to actually start and be ready # The start_kernel() call returns immediately, but kernel takes time to start import asyncio max_wait_time = 30 # seconds wait_interval = 0.5 # seconds elapsed = 0 kernel_ready = False while elapsed < max_wait_time: try: # Get kernel model to check its state kernel_model = kernel_manager.get_kernel(kernel_id) if kernel_model is not None: # Kernel exists, check if it's ready # In Jupyter, we can try to get connection info which indicates readiness try: kernel_manager.get_connection_info(kernel_id) kernel_ready = True logger.info(f"Kernel '{kernel_id}' is ready (took {elapsed:.1f}s)") break except: # Connection info not available yet, kernel still starting pass except Exception as e: logger.debug(f"Waiting for kernel to start: {e}") await asyncio.sleep(wait_interval) elapsed += wait_interval if not kernel_ready: logger.warning(f"Kernel '{kernel_id}' may not be fully ready after {max_wait_time}s wait") kernel_info = {"id": kernel_id} # Create a Jupyter session to associate the kernel with the notebook # This is CRITICAL for JupyterLab to recognize the kernel-notebook connection if session_manager is not None: try: # create_session is an async method, so we await it directly session_dict = await session_manager.create_session( path=notebook_path, kernel_id=kernel_id, type="notebook", name=notebook_path ) logger.info(f"Created Jupyter session '{session_dict.get('id')}' for notebook '{notebook_path}' with kernel '{kernel_id}'") except Exception as e: logger.warning(f"Failed to create Jupyter session: {e}. Notebook may not be properly connected in JupyterLab UI.") else: logger.warning("No session_manager available. Notebook may not be properly connected in JupyterLab UI.") # For JUPYTER_SERVER mode, store kernel info (not KernelClient object) # The actual kernel is managed by kernel_manager notebook_manager.add_notebook( notebook_name, kernel_info, # Store kernel metadata, not client object server_url="local", # Indicate local mode token=None, path=notebook_path ) elif mode == ServerMode.MCP_SERVER and runtime_url: # MCP_SERVER mode: Use HTTP-based kernel client kernel = KernelClient( server_url=runtime_url, token=runtime_token, kernel_id=kernel_id ) kernel.start() # Add notebook to manager with HTTP client notebook_manager.add_notebook( notebook_name, kernel, server_url=runtime_url, token=runtime_token, path=notebook_path ) else: return f"Invalid configuration: mode={mode}, runtime_url={runtime_url}, kernel_manager={kernel_manager is not None}" notebook_manager.set_current_notebook(notebook_name) # Return message based on mode if use_mode == "create": return f"Successfully created and using notebook '{notebook_name}' at path '{notebook_path}' in {mode.value} mode." else: return f"Successfully using notebook '{notebook_name}' at path '{notebook_path}' in {mode.value} mode." ``` -------------------------------------------------------------------------------- /jupyter_mcp_server/tools/insert_execute_code_cell_tool.py: -------------------------------------------------------------------------------- ```python # Copyright (c) 2023-2024 Datalayer, Inc. # # BSD 3-Clause License """Insert and execute code cell tool implementation.""" import asyncio import logging from pathlib import Path from typing import Any, Optional, List, Union from jupyter_server_api import JupyterServerClient from jupyter_mcp_server.tools._base import BaseTool, ServerMode from jupyter_mcp_server.notebook_manager import NotebookManager from jupyter_mcp_server.utils import get_current_notebook_context, safe_extract_outputs, execute_via_execution_stack from mcp.types import ImageContent logger = logging.getLogger(__name__) class InsertExecuteCodeCellTool(BaseTool): """Tool to insert and execute a code cell.""" @property def name(self) -> str: return "insert_execute_code_cell" @property def description(self) -> str: return """Insert and execute a code cell in a Jupyter notebook. Args: cell_index: Index of the cell to insert (0-based). Use -1 to append at end and execute. cell_source: Code source Returns: list[Union[str, ImageContent]]: List of outputs from the executed cell""" async def _get_jupyter_ydoc(self, serverapp: Any, file_id: str): """Get the YNotebook document if it's currently open in a collaborative session.""" try: yroom_manager = serverapp.web_app.settings.get("yroom_manager") if yroom_manager is None: return None room_id = f"json:notebook:{file_id}" if yroom_manager.has_room(room_id): yroom = yroom_manager.get_room(room_id) notebook = await yroom.get_jupyter_ydoc() return notebook except Exception: pass return None async def _insert_execute_ydoc( self, serverapp: Any, notebook_path: str, cell_index: int, cell_source: str, kernel_manager, kernel_id: str, safe_extract_outputs_fn ) -> List[Union[str, ImageContent]]: """Insert and execute cell using YDoc (collaborative editing mode).""" # Get file_id from file_id_manager file_id_manager = serverapp.web_app.settings.get("file_id_manager") if file_id_manager is None: raise RuntimeError("file_id_manager not available in serverapp") file_id = file_id_manager.get_id(notebook_path) # Try to get YDoc ydoc = await self._get_jupyter_ydoc(serverapp, file_id) if ydoc: # Notebook is open in collaborative mode, use YDoc total_cells = len(ydoc.ycells) actual_index = cell_index if cell_index != -1 else total_cells if actual_index < 0 or actual_index > total_cells: raise ValueError( f"Cell index {cell_index} is out of range. Notebook has {total_cells} cells. Use -1 to append at end." ) # Create and insert the cell cell = { "cell_type": "code", "source": cell_source, } ycell = ydoc.create_ycell(cell) if actual_index >= total_cells: ydoc.ycells.append(ycell) else: ydoc.ycells.insert(actual_index, ycell) # Get the inserted cell's ID for RTC metadata inserted_cell_id = ycell.get("id") # Build document_id for RTC (format: json:notebook:<file_id>) document_id = f"json:notebook:{file_id}" # Execute the cell using ExecutionStack with RTC metadata # This will automatically update the cell outputs in the YDoc return await execute_via_execution_stack( serverapp, kernel_id, cell_source, document_id=document_id, cell_id=inserted_cell_id, timeout=300, logger=logger ) else: # YDoc not available - use file operations + direct kernel execution # This path is used when notebook is not open in JupyterLab but we still have kernel access logger.info("YDoc not available, using file operations + ExecutionStack execution fallback") # Insert cell using file operations from jupyter_mcp_server.tools.insert_cell_tool import InsertCellTool insert_tool = InsertCellTool() # Call the file-based insertion method directly await insert_tool._insert_cell_file(notebook_path, cell_index, "code", cell_source) # Calculate actual index where cell was inserted import nbformat with open(notebook_path, 'r', encoding='utf-8') as f: notebook = nbformat.read(f, as_version=4) total_cells = len(notebook.cells) actual_index = cell_index if cell_index != -1 else total_cells - 1 # Then execute directly via ExecutionStack (without RTC metadata since notebook not open) outputs = await execute_via_execution_stack( serverapp, kernel_id, cell_source, timeout=300, logger=logger ) # CRITICAL: Write outputs back to the notebook file so they're visible in UI logger.info(f"Writing {len(outputs)} outputs back to notebook cell {actual_index}") await self._write_outputs_to_cell(notebook_path, actual_index, outputs) return outputs async def _insert_execute_websocket( self, notebook_manager: NotebookManager, cell_index: int, cell_source: str, ensure_kernel_alive ) -> List[Union[str, ImageContent]]: """Insert and execute cell using WebSocket connection (MCP_SERVER mode).""" # Ensure kernel is alive if ensure_kernel_alive: kernel = ensure_kernel_alive() else: # Fallback: get kernel from notebook_manager current_notebook = notebook_manager.get_current_notebook() or "default" kernel = notebook_manager.get_kernel(current_notebook) if not kernel: raise RuntimeError("No kernel available for execution") async with notebook_manager.get_current_connection() as notebook: actual_index = cell_index if cell_index != -1 else len(notebook) if actual_index < 0 or actual_index > len(notebook): raise ValueError(f"Cell index {cell_index} out of range") notebook.insert_cell(actual_index, cell_source, "code") notebook.execute_cell(actual_index, kernel) outputs = notebook[actual_index].get("outputs", []) return safe_extract_outputs(outputs) async def _write_outputs_to_cell( self, notebook_path: str, cell_index: int, outputs: List[Union[str, ImageContent]] ): """Write execution outputs back to a notebook cell. This is critical for making outputs visible in JupyterLab when using file-based execution (when YDoc/RTC is not available). Args: notebook_path: Path to the notebook file cell_index: Index of the cell to update outputs: List of output strings or ImageContent objects """ import nbformat from jupyter_mcp_server.utils import _clean_notebook_outputs # Read the notebook with open(notebook_path, 'r', encoding='utf-8') as f: notebook = nbformat.read(f, as_version=4) # Clean any transient fields _clean_notebook_outputs(notebook) if cell_index < 0 or cell_index >= len(notebook.cells): logger.warning(f"Cell index {cell_index} out of range, cannot write outputs") return cell = notebook.cells[cell_index] if cell.cell_type != 'code': logger.warning(f"Cell {cell_index} is not a code cell, cannot write outputs") return # Convert formatted outputs to nbformat structure cell.outputs = [] for output in outputs: if isinstance(output, ImageContent): # Image output cell.outputs.append(nbformat.v4.new_output( output_type='display_data', data={output.mimeType: output.data}, metadata={} )) elif isinstance(output, str): # Text output - determine if it's an error or regular output if output.startswith('[ERROR:') or output.startswith('[TIMEOUT ERROR:'): # Error output cell.outputs.append(nbformat.v4.new_output( output_type='stream', name='stderr', text=output )) else: # Regular output (assume execute_result for simplicity) cell.outputs.append(nbformat.v4.new_output( output_type='execute_result', data={'text/plain': output}, metadata={}, execution_count=None )) # Update execution count max_count = 0 for c in notebook.cells: if c.cell_type == 'code' and c.execution_count: max_count = max(max_count, c.execution_count) cell.execution_count = max_count + 1 # Write back to file with open(notebook_path, 'w', encoding='utf-8') as f: nbformat.write(notebook, f) logger.info(f"Wrote {len(outputs)} outputs to cell {cell_index} in {notebook_path}") async def execute( self, mode: ServerMode, server_client: Optional[JupyterServerClient] = None, kernel_client: Optional[Any] = None, contents_manager: Optional[Any] = None, kernel_manager: Optional[Any] = None, kernel_spec_manager: Optional[Any] = None, notebook_manager: Optional[NotebookManager] = None, # Tool-specific parameters cell_index: int = None, cell_source: str = None, # Helper function passed from server.py ensure_kernel_alive = None, **kwargs ) -> List[Union[str, ImageContent]]: """Execute the insert_execute_code_cell tool. Args: mode: Server mode (MCP_SERVER or JUPYTER_SERVER) kernel_manager: Kernel manager for JUPYTER_SERVER mode notebook_manager: Notebook manager instance cell_index: Index to insert cell (0-based, -1 to append) cell_source: Code source ensure_kernel_alive: Function to ensure kernel is alive **kwargs: Additional parameters Returns: List of outputs from the executed cell """ if mode == ServerMode.JUPYTER_SERVER and kernel_manager is not None: # JUPYTER_SERVER mode: Use YDoc and kernel_manager from jupyter_mcp_server.jupyter_extension.context import get_server_context from jupyter_mcp_server.config import get_config context = get_server_context() serverapp = context.serverapp notebook_path, kernel_id = get_current_notebook_context(notebook_manager) # Resolve to absolute path FIRST if serverapp and not Path(notebook_path).is_absolute(): root_dir = serverapp.root_dir notebook_path = str(Path(root_dir) / notebook_path) if kernel_id is None: # No kernel available - start a new one on demand logger.info("No kernel_id available, starting new kernel for insert_execute_code_cell") kernel_id = await kernel_manager.start_kernel() # Wait a bit for kernel to initialize await asyncio.sleep(1.0) logger.info(f"Kernel {kernel_id} started and initialized") # Store the kernel with ABSOLUTE path in notebook_manager if notebook_manager is not None: kernel_info = {"id": kernel_id} notebook_manager.add_notebook( name=notebook_path, kernel=kernel_info, server_url="local", path=notebook_path ) if serverapp: return await self._insert_execute_ydoc( serverapp, notebook_path, cell_index, cell_source, kernel_manager, kernel_id, safe_extract_outputs ) else: raise RuntimeError("serverapp not available in JUPYTER_SERVER mode") elif mode == ServerMode.MCP_SERVER and notebook_manager is not None: # MCP_SERVER mode: Use WebSocket connection return await self._insert_execute_websocket( notebook_manager, cell_index, cell_source, ensure_kernel_alive ) else: raise ValueError(f"Invalid mode or missing required clients: mode={mode}") ``` -------------------------------------------------------------------------------- /jupyter_mcp_server/jupyter_extension/backends/local_backend.py: -------------------------------------------------------------------------------- ```python # Copyright (c) 2023-2024 Datalayer, Inc. # # BSD 3-Clause License """ Local Backend Implementation This backend uses the Jupyter Server's local API directly when running as an extension. It provides efficient local access to contents_manager and kernel_manager. """ from typing import Optional, Any, Union, Literal, TYPE_CHECKING import asyncio from mcp.types import ImageContent from jupyter_mcp_server.jupyter_extension.backends.base import Backend from jupyter_mcp_server.utils import safe_extract_outputs if TYPE_CHECKING: from jupyter_server.serverapp import ServerApp class LocalBackend(Backend): """ Backend that uses local Jupyter Server API directly. Uses: - serverapp.contents_manager for notebook file operations - serverapp.kernel_manager for kernel management - serverapp.kernel_spec_manager for kernel specs This backend is only available when running as a Jupyter Server extension with document_url="local" or runtime_url="local". """ def __init__(self, serverapp: 'ServerApp'): """ Initialize local backend with direct serverapp access. Args: serverapp: Jupyter ServerApp instance """ self.serverapp = serverapp self.contents_manager = serverapp.contents_manager self.kernel_manager = serverapp.kernel_manager self.kernel_spec_manager = serverapp.kernel_spec_manager # Notebook operations async def get_notebook_content(self, path: str) -> dict[str, Any]: """ Get notebook content using local contents_manager. Args: path: Path to notebook file Returns: Notebook content dictionary """ model = await asyncio.to_thread( self.contents_manager.get, path, type='notebook', content=True ) return model['content'] async def list_notebooks(self, path: str = "") -> list[str]: """ List all notebooks recursively using local contents_manager. Args: path: Directory path to search Returns: List of notebook paths """ notebooks = [] await self._list_notebooks_recursive(path, notebooks) return notebooks async def _list_notebooks_recursive(self, path: str, notebooks: list[str]) -> None: """Helper to recursively list notebooks.""" try: model = await asyncio.to_thread( self.contents_manager.get, path, content=True ) if model['type'] == 'directory': for item in model['content']: item_path = f"{path}/{item['name']}" if path else item['name'] if item['type'] == 'directory': await self._list_notebooks_recursive(item_path, notebooks) elif item['type'] == 'notebook' or item['name'].endswith('.ipynb'): notebooks.append(item_path) except Exception: # Skip directories we can't access pass async def notebook_exists(self, path: str) -> bool: """ Check if notebook exists using local contents_manager. Args: path: Path to notebook Returns: True if exists """ try: await asyncio.to_thread( self.contents_manager.get, path, content=False ) return True except Exception: return False async def create_notebook(self, path: str) -> dict[str, Any]: """ Create a new notebook using local contents_manager. Args: path: Path for new notebook Returns: Created notebook content """ model = await asyncio.to_thread( self.contents_manager.new, path=path ) return model['content'] # Cell operations async def read_cells( self, path: str, start_index: Optional[int] = None, end_index: Optional[int] = None ) -> list[dict[str, Any]]: """ Read cells from notebook. Args: path: Notebook path start_index: Start index end_index: End index Returns: List of cells """ content = await self.get_notebook_content(path) cells = content.get('cells', []) if start_index is not None or end_index is not None: start = start_index or 0 end = end_index if end_index is not None else len(cells) cells = cells[start:end] return cells async def append_cell( self, path: str, cell_type: Literal["code", "markdown"], source: Union[str, list[str]] ) -> int: """ Append a cell to notebook. Args: path: Notebook path cell_type: Cell type source: Cell source Returns: Index of appended cell """ content = await self.get_notebook_content(path) cells = content.get('cells', []) # Normalize source to list of strings if isinstance(source, str): source = source.splitlines(keepends=True) new_cell = { 'cell_type': cell_type, 'metadata': {}, 'source': source } if cell_type == 'code': new_cell['outputs'] = [] new_cell['execution_count'] = None cells.append(new_cell) content['cells'] = cells # Save updated notebook await asyncio.to_thread( self.contents_manager.save, { 'type': 'notebook', 'content': content }, path ) return len(cells) - 1 async def insert_cell( self, path: str, cell_index: int, cell_type: Literal["code", "markdown"], source: Union[str, list[str]] ) -> int: """ Insert a cell at specific index. Args: path: Notebook path cell_index: Insert position cell_type: Cell type source: Cell source Returns: Index of inserted cell """ content = await self.get_notebook_content(path) cells = content.get('cells', []) # Normalize source if isinstance(source, str): source = source.splitlines(keepends=True) new_cell = { 'cell_type': cell_type, 'metadata': {}, 'source': source } if cell_type == 'code': new_cell['outputs'] = [] new_cell['execution_count'] = None cells.insert(cell_index, new_cell) content['cells'] = cells # Save updated notebook await asyncio.to_thread( self.contents_manager.save, { 'type': 'notebook', 'content': content }, path ) return cell_index async def delete_cell(self, path: str, cell_index: int) -> None: """ Delete a cell from notebook. Args: path: Notebook path cell_index: Index to delete """ content = await self.get_notebook_content(path) cells = content.get('cells', []) if 0 <= cell_index < len(cells): cells.pop(cell_index) content['cells'] = cells await asyncio.to_thread( self.contents_manager.save, { 'type': 'notebook', 'content': content }, path ) async def overwrite_cell( self, path: str, cell_index: int, new_source: Union[str, list[str]] ) -> tuple[str, str]: """ Overwrite cell content. Args: path: Notebook path cell_index: Cell index new_source: New source Returns: Tuple of (old_source, new_source) """ content = await self.get_notebook_content(path) cells = content.get('cells', []) if cell_index < 0 or cell_index >= len(cells): raise ValueError(f"Cell index {cell_index} out of range") cell = cells[cell_index] old_source = ''.join(cell['source']) if isinstance(cell['source'], list) else cell['source'] # Normalize new source if isinstance(new_source, str): new_source_str = new_source new_source = new_source.splitlines(keepends=True) else: new_source_str = ''.join(new_source) cell['source'] = new_source content['cells'] = cells await asyncio.to_thread( self.contents_manager.save, { 'type': 'notebook', 'content': content }, path ) return (old_source, new_source_str) # Kernel operations async def get_or_create_kernel(self, path: str, kernel_id: Optional[str] = None) -> str: """ Get existing kernel or create new one. Args: path: Notebook path (for context) kernel_id: Specific kernel ID Returns: Kernel ID """ if kernel_id and kernel_id in self.kernel_manager: return kernel_id # Start new kernel kernel_id = await self.kernel_manager.start_kernel() return kernel_id async def execute_cell( self, path: str, cell_index: int, kernel_id: str, timeout_seconds: int = 300 ) -> list[Union[str, ImageContent]]: """ Execute a cell using local kernel manager. Args: path: Notebook path cell_index: Cell index kernel_id: Kernel ID timeout_seconds: Timeout Returns: List of outputs """ # Get cell source cells = await self.read_cells(path) if cell_index < 0 or cell_index >= len(cells): raise ValueError(f"Cell index {cell_index} out of range") cell = cells[cell_index] source = ''.join(cell['source']) if isinstance(cell['source'], list) else cell['source'] # Get kernel client kernel = self.kernel_manager.get_kernel(kernel_id) client = kernel.client() # Execute code msg_id = client.execute(source) # Collect outputs outputs = [] start_time = asyncio.get_event_loop().time() while True: if asyncio.get_event_loop().time() - start_time > timeout_seconds: raise TimeoutError(f"Cell execution exceeded {timeout_seconds} seconds") try: msg = await asyncio.wait_for( asyncio.to_thread(client.get_iopub_msg, timeout=1), timeout=2 ) msg_type = msg['header']['msg_type'] if msg_type == 'status': if msg['content']['execution_state'] == 'idle': break elif msg_type in ['execute_result', 'display_data']: outputs.append(msg['content']) elif msg_type == 'stream': outputs.append(msg['content']) elif msg_type == 'error': outputs.append(msg['content']) except asyncio.TimeoutError: continue except Exception: break # Update cell with outputs content = await self.get_notebook_content(path) if cell_index < len(content['cells']): content['cells'][cell_index]['outputs'] = outputs await asyncio.to_thread( self.contents_manager.save, {'type': 'notebook', 'content': content}, path ) return safe_extract_outputs(outputs) async def interrupt_kernel(self, kernel_id: str) -> None: """Interrupt a kernel.""" if kernel_id in self.kernel_manager: kernel = self.kernel_manager.get_kernel(kernel_id) await kernel.interrupt() async def restart_kernel(self, kernel_id: str) -> None: """Restart a kernel.""" if kernel_id in self.kernel_manager: await self.kernel_manager.restart_kernel(kernel_id) async def shutdown_kernel(self, kernel_id: str) -> None: """Shutdown a kernel.""" if kernel_id in self.kernel_manager: await self.kernel_manager.shutdown_kernel(kernel_id) async def list_kernels(self) -> list[dict[str, Any]]: """List all running kernels.""" return [ { 'id': kid, 'name': self.kernel_manager.get_kernel(kid).kernel_name } for kid in self.kernel_manager.list_kernel_ids() ] async def kernel_exists(self, kernel_id: str) -> bool: """Check if kernel exists.""" return kernel_id in self.kernel_manager ``` -------------------------------------------------------------------------------- /jupyter_mcp_server/tools/insert_cell_tool.py: -------------------------------------------------------------------------------- ```python # Copyright (c) 2023-2024 Datalayer, Inc. # # BSD 3-Clause License """Insert cell tool implementation.""" from typing import Any, Optional, Literal from pathlib import Path import nbformat from jupyter_server_api import JupyterServerClient from jupyter_mcp_server.tools._base import BaseTool, ServerMode from jupyter_mcp_server.notebook_manager import NotebookManager from jupyter_mcp_server.utils import get_current_notebook_context from jupyter_mcp_server.utils import get_surrounding_cells_info class InsertCellTool(BaseTool): """Tool to insert a cell at a specified position.""" @property def name(self) -> str: return "insert_cell" @property def description(self) -> str: return """Insert a cell to specified position. Args: cell_index: target index for insertion (0-based). Use -1 to append at end. cell_type: Type of cell to insert ("code" or "markdown") cell_source: Source content for the cell Returns: str: Success message and the structure of its surrounding cells (up to 5 cells above and 5 cells below)""" async def _get_jupyter_ydoc(self, serverapp: Any, file_id: str): """Get the YNotebook document if it's currently open in a collaborative session. This follows the jupyter_ai_tools pattern of accessing YDoc through the yroom_manager when the notebook is actively being edited. Args: serverapp: The Jupyter ServerApp instance file_id: The file ID for the document Returns: YNotebook instance or None if not in a collaborative session """ try: yroom_manager = serverapp.web_app.settings.get("yroom_manager") if yroom_manager is None: return None room_id = f"json:notebook:{file_id}" if yroom_manager.has_room(room_id): yroom = yroom_manager.get_room(room_id) notebook = await yroom.get_jupyter_ydoc() return notebook except Exception: # YDoc not available, will fall back to file operations pass return None async def _insert_cell_ydoc( self, serverapp: Any, notebook_path: str, cell_index: int, cell_type: Literal["code", "markdown"], cell_source: str ) -> str: """Insert cell using YDoc (collaborative editing mode). Args: serverapp: Jupyter ServerApp instance notebook_path: Path to the notebook cell_index: Index to insert at (-1 for append) cell_type: Type of cell to insert cell_source: Source content for the cell Returns: Success message with surrounding cells info """ # Get file_id from file_id_manager file_id_manager = serverapp.web_app.settings.get("file_id_manager") if file_id_manager is None: raise RuntimeError("file_id_manager not available in serverapp") file_id = file_id_manager.get_id(notebook_path) # Try to get YDoc ydoc = await self._get_jupyter_ydoc(serverapp, file_id) if ydoc: # Notebook is open in collaborative mode, use YDoc total_cells = len(ydoc.ycells) actual_index = cell_index if cell_index != -1 else total_cells if actual_index < 0 or actual_index > total_cells: raise ValueError( f"Cell index {cell_index} is out of range. Notebook has {total_cells} cells. Use -1 to append at end." ) # Create the cell cell = { "cell_type": cell_type, "source": "", } ycell = ydoc.create_ycell(cell) # Insert at the specified position if actual_index >= total_cells: ydoc.ycells.append(ycell) else: ydoc.ycells.insert(actual_index, ycell) # Write content to the cell collaboratively if cell_source: # Set the source directly on the ycell ycell["source"] = cell_source # Get surrounding cells info (simplified version for YDoc) new_total_cells = len(ydoc.ycells) surrounding_info = self._get_surrounding_cells_info_ydoc(ydoc, actual_index, new_total_cells) return f"Cell inserted successfully at index {actual_index} ({cell_type})!\n\nCurrent Surrounding Cells:\n{surrounding_info}" else: # YDoc not available, use file operations return await self._insert_cell_file(notebook_path, cell_index, cell_type, cell_source) def _get_surrounding_cells_info_ydoc(self, ydoc, center_index: int, total_cells: int) -> str: """Get info about surrounding cells from YDoc.""" lines = [] start_index = max(0, center_index - 5) end_index = min(total_cells, center_index + 6) for i in range(start_index, end_index): cell = ydoc.ycells[i] cell_type = cell.get("cell_type", "unknown") source = cell.get("source", "") if isinstance(source, list): source = "".join(source) first_line = source.split('\n')[0][:50] if source else "(empty)" marker = " <-- NEW" if i == center_index else "" lines.append(f" [{i}] {cell_type}: {first_line}{marker}") return "\n".join(lines) async def _insert_cell_file( self, notebook_path: str, cell_index: int, cell_type: Literal["code", "markdown"], cell_source: str ) -> str: """Insert cell using file operations (non-collaborative mode). Args: notebook_path: Absolute path to the notebook cell_index: Index to insert at (-1 for append) cell_type: Type of cell to insert cell_source: Source content for the cell Returns: Success message with surrounding cells info """ # Read notebook file with open(notebook_path, "r", encoding="utf-8") as f: # Read as version 4 (latest) to ensure consistency and support for cell IDs notebook = nbformat.read(f, as_version=4) # Clean any transient fields from existing outputs (kernel protocol field not in nbformat schema) self._clean_notebook_outputs(notebook) total_cells = len(notebook.cells) actual_index = cell_index if cell_index != -1 else total_cells if actual_index < 0 or actual_index > total_cells: raise ValueError( f"Cell index {cell_index} is out of range. Notebook has {total_cells} cells. Use -1 to append at end." ) # Create and insert the cell if cell_type == "code": new_cell = nbformat.v4.new_code_cell(source=cell_source or "") elif cell_type == "markdown": new_cell = nbformat.v4.new_markdown_cell(source=cell_source or "") else: raise ValueError(f"Invalid cell_type: {cell_type}. Must be 'code' or 'markdown'.") notebook.cells.insert(actual_index, new_cell) # Write back to file with open(notebook_path, "w", encoding="utf-8") as f: nbformat.write(notebook, f) # Get surrounding cells info new_total_cells = len(notebook.cells) surrounding_info = self._get_surrounding_cells_info_file(notebook, actual_index, new_total_cells) return f"Cell inserted successfully at index {actual_index} ({cell_type})!\n\nCurrent Surrounding Cells:\n{surrounding_info}" def _clean_notebook_outputs(self, notebook): """Remove transient fields from all cell outputs. The 'transient' field is part of the Jupyter kernel messaging protocol but is NOT part of the nbformat schema. This causes validation errors. Args: notebook: nbformat notebook object to clean (modified in place) """ # Clean transient fields from outputs for cell in notebook.cells: if cell.cell_type == 'code' and hasattr(cell, 'outputs'): for output in cell.outputs: if isinstance(output, dict) and 'transient' in output: del output['transient'] def _get_surrounding_cells_info_file(self, notebook, center_index: int, total_cells: int) -> str: """Get info about surrounding cells from nbformat notebook.""" lines = [] start_index = max(0, center_index - 5) end_index = min(total_cells, center_index + 6) for i in range(start_index, end_index): cell = notebook.cells[i] cell_type = cell.cell_type source = cell.source first_line = source.split('\n')[0][:50] if source else "(empty)" marker = " <-- NEW" if i == center_index else "" lines.append(f" [{i}] {cell_type}: {first_line}{marker}") return "\n".join(lines) async def _insert_cell_websocket( self, notebook_manager: NotebookManager, cell_index: int, cell_type: Literal["code", "markdown"], cell_source: str ) -> str: """Insert cell using WebSocket connection (MCP_SERVER mode). Args: notebook_manager: Notebook manager instance cell_index: Index to insert at (-1 for append) cell_type: Type of cell to insert cell_source: Source content for the cell Returns: Success message with surrounding cells info """ async with notebook_manager.get_current_connection() as notebook: actual_index = cell_index if cell_index != -1 else len(notebook) if actual_index < 0 or actual_index > len(notebook): raise ValueError(f"Cell index {cell_index} out of range") notebook.insert_cell(actual_index, cell_source, cell_type) # Get surrounding cells info new_total_cells = len(notebook) surrounding_info = get_surrounding_cells_info(notebook, actual_index, new_total_cells) return f"Cell inserted successfully at index {actual_index} ({cell_type})!\n\nCurrent Surrounding Cells:\n{surrounding_info}" async def execute( self, mode: ServerMode, server_client: Optional[JupyterServerClient] = None, kernel_client: Optional[Any] = None, contents_manager: Optional[Any] = None, kernel_manager: Optional[Any] = None, kernel_spec_manager: Optional[Any] = None, notebook_manager: Optional[NotebookManager] = None, # Tool-specific parameters cell_index: int = None, cell_type: Literal["code", "markdown"] = None, cell_source: str = None, **kwargs ) -> str: """Execute the insert_cell tool. This tool supports three modes of operation: 1. JUPYTER_SERVER mode with YDoc (collaborative): - Checks if notebook is open in a collaborative session - Uses YDoc for real-time collaborative editing - Changes are immediately visible to all connected users 2. JUPYTER_SERVER mode without YDoc (file-based): - Falls back to direct file operations using nbformat - Suitable when notebook is not actively being edited 3. MCP_SERVER mode (WebSocket): - Uses WebSocket connection to remote Jupyter server - Accesses YDoc through NbModelClient Args: mode: Server mode (MCP_SERVER or JUPYTER_SERVER) server_client: HTTP client for MCP_SERVER mode contents_manager: Direct API access for JUPYTER_SERVER mode notebook_manager: Notebook manager instance cell_index: Target index for insertion (0-based, -1 to append) cell_type: Type of cell ("code" or "markdown") cell_source: Source content for the cell **kwargs: Additional parameters Returns: Success message with surrounding cells info """ if mode == ServerMode.JUPYTER_SERVER and contents_manager is not None: # JUPYTER_SERVER mode: Try YDoc first, fall back to file operations from jupyter_mcp_server.jupyter_extension.context import get_server_context context = get_server_context() serverapp = context.serverapp notebook_path, _ = get_current_notebook_context(notebook_manager) # Resolve to absolute path if serverapp and not Path(notebook_path).is_absolute(): root_dir = serverapp.root_dir notebook_path = str(Path(root_dir) / notebook_path) if serverapp: # Try YDoc approach first return await self._insert_cell_ydoc(serverapp, notebook_path, cell_index, cell_type, cell_source) else: # Fall back to file operations return await self._insert_cell_file(notebook_path, cell_index, cell_type, cell_source) elif mode == ServerMode.MCP_SERVER and notebook_manager is not None: # MCP_SERVER mode: Use WebSocket connection return await self._insert_cell_websocket(notebook_manager, cell_index, cell_type, cell_source) else: raise ValueError(f"Invalid mode or missing required clients: mode={mode}") ``` -------------------------------------------------------------------------------- /docs/src/css/custom.css: -------------------------------------------------------------------------------- ```css /* * Copyright (c) 2023-2024 Datalayer, Inc. * * BSD 3-Clause License */ /* stylelint-disable docusaurus/copyright-header */ /** * Any CSS included here will be global. The classic template * bundles Infima by default. Infima is a CSS framework designed to * work well for content-centric websites. */ /* You can override the default Infima variables here. */ :root { --ifm-color-primary: #25c2a0; --ifm-color-primary-dark: rgb(33, 175, 144); --ifm-color-primary-darker: rgb(31, 165, 136); --ifm-color-primary-darkest: rgb(26, 136, 112); --ifm-color-primary-light: rgb(70, 203, 174); --ifm-color-primary-lighter: rgb(102, 212, 189); --ifm-color-primary-lightest: rgb(146, 224, 208); --ifm-code-font-size: 95%; } .docusaurus-highlight-code-line { background-color: rgb(72, 77, 91); display: block; margin: 0 calc(-1 * var(--ifm-pre-padding)); padding: 0 var(--ifm-pre-padding); } .header-datalayer-io-link::before { content: ''; width: 24px; height: 24px; display: flex; background: url("data:image/svg+xml,%3Csvg xmlns='http://www.w3.org/2000/svg' fill='none' aria-hidden='true' viewBox='0 0 20 20'%3E%3Cpath fill='%232ECC71' d='M0 0h20v4H0zm0 0'/%3E%3Cpath fill='%231ABC9C' d='M0 8h20v4H0zm0 0'/%3E%3Cpath fill='%2316A085' d='M0 16h20v4H0zm0 0'/%3E%3C/svg%3E%0A") no-repeat; } .header-datalayer-io-link:hover { opacity: 0.6; } [data-theme='dark'] .header-datalayer-io-link::before { background: url("data:image/svg+xml,%3Csvg xmlns='http://www.w3.org/2000/svg' fill='none' aria-hidden='true' viewBox='0 0 20 20'%3E%3Cpath fill='%232ECC71' d='M0 0h20v4H0zm0 0'/%3E%3Cpath fill='%231ABC9C' d='M0 8h20v4H0zm0 0'/%3E%3Cpath fill='%2316A085' d='M0 16h20v4H0zm0 0'/%3E%3C/svg%3E%0A") no-repeat; } .header-github-link::before { content: ''; width: 24px; height: 24px; display: flex; background: url("data:image/svg+xml,%3C%3Fxml version='1.0' encoding='UTF-8' standalone='no'%3F%3E%3Csvg viewBox='0 0 80 80' version='1.1' id='svg4' xmlns='http://www.w3.org/2000/svg' xmlns:svg='http://www.w3.org/2000/svg'%3E%3Cdefs id='defs8' /%3E%3Cpath fill='%23959da5' d='M 40,0 C 17.9,0 0,17.900001 0,40 c 0,17.7 11.45,32.65 27.35,37.950001 2,0.35 2.75,-0.85 2.75,-1.9 0,-0.95 -0.05,-4.1 -0.05,-7.45 C 20,70.45 17.4,66.15 16.6,63.9 16.15,62.75 14.2,59.2 12.5,58.25 11.1,57.5 9.1,55.65 12.45,55.600001 c 3.15,-0.05 5.4,2.899999 6.15,4.1 3.6,6.05 9.35,4.35 11.65,3.3 0.35,-2.6 1.4,-4.35 2.55,-5.35 -8.9,-1 -18.2,-4.45 -18.2,-19.75 0,-4.35 1.55,-7.95 4.1,-10.75 -0.4,-1 -1.8,-5.1 0.4,-10.6 0,0 3.35,-1.05 11,4.1 3.2,-0.9 6.6,-1.35 10,-1.35 3.4,0 6.8,0.45 10,1.35 7.65,-5.2 11,-4.1 11,-4.1 2.2,5.5 0.8,9.6 0.4,10.6 2.55,2.8 4.1,6.35 4.1,10.75 0,15.35 -9.35,18.75 -18.25,19.75 1.45,1.25 2.7,3.65 2.7,7.4 0,5.349999 -0.05,9.65 -0.05,11 0,1.05 0.75,2.3 2.75,1.9 A 40.065,40.065 0 0 0 80,40 C 80,17.900001 62.1,0 40,0 Z' id='path2' style='stroke-width:5' /%3E%3C/svg%3E%0A") no-repeat; } .header-github-link:hover { opacity: 0.6; } [data-theme='dark'] .header-github-link::before { background: url("data:image/svg+xml,%3C%3Fxml version='1.0' encoding='UTF-8' standalone='no'%3F%3E%3Csvg viewBox='0 0 80 80' version='1.1' id='svg4' xmlns='http://www.w3.org/2000/svg' xmlns:svg='http://www.w3.org/2000/svg'%3E%3Cdefs id='defs8' /%3E%3Cpath fill='%23959da5' d='M 40,0 C 17.9,0 0,17.900001 0,40 c 0,17.7 11.45,32.65 27.35,37.950001 2,0.35 2.75,-0.85 2.75,-1.9 0,-0.95 -0.05,-4.1 -0.05,-7.45 C 20,70.45 17.4,66.15 16.6,63.9 16.15,62.75 14.2,59.2 12.5,58.25 11.1,57.5 9.1,55.65 12.45,55.600001 c 3.15,-0.05 5.4,2.899999 6.15,4.1 3.6,6.05 9.35,4.35 11.65,3.3 0.35,-2.6 1.4,-4.35 2.55,-5.35 -8.9,-1 -18.2,-4.45 -18.2,-19.75 0,-4.35 1.55,-7.95 4.1,-10.75 -0.4,-1 -1.8,-5.1 0.4,-10.6 0,0 3.35,-1.05 11,4.1 3.2,-0.9 6.6,-1.35 10,-1.35 3.4,0 6.8,0.45 10,1.35 7.65,-5.2 11,-4.1 11,-4.1 2.2,5.5 0.8,9.6 0.4,10.6 2.55,2.8 4.1,6.35 4.1,10.75 0,15.35 -9.35,18.75 -18.25,19.75 1.45,1.25 2.7,3.65 2.7,7.4 0,5.349999 -0.05,9.65 -0.05,11 0,1.05 0.75,2.3 2.75,1.9 A 40.065,40.065 0 0 0 80,40 C 80,17.900001 62.1,0 40,0 Z' id='path2' style='stroke-width:5' /%3E%3C/svg%3E%0A") no-repeat; } .header-bluesky-link::before { content: ''; width: 24px; height: 24px; display: flex; background: url("data:image/svg+xml,%3C%3Fxml%20version%3D%221.0%22%20encoding%3D%22UTF-8%22%20standalone%3D%22no%22%3F%3E%0A%3Csvg%0A%20%20%20width%3D%2224%22%0A%20%20%20height%3D%2224%22%0A%20%20%20viewBox%3D%220%200%202.88%202.88%22%0A%20%20%20version%3D%221.1%22%0A%20%20%20id%3D%22svg4%22%0A%20%20%20xmlns%3D%22http%3A%2F%2Fwww.w3.org%2F2000%2Fsvg%22%0A%20%20%20xmlns%3Asvg%3D%22http%3A%2F%2Fwww.w3.org%2F2000%2Fsvg%22%3E%0A%20%20%3Cdefs%0A%20%20%20%20%20id%3D%22defs8%22%20%2F%3E%0A%20%20%3Cpath%0A%20%20%20%20%20fill%3D%22%23959da5%22%0A%20%20%20%20%20d%3D%22M%201.44%2C1.306859%20C%201.30956%2C1.053179%200.95447995%2C0.58049901%200.62423999%2C0.34745901%200.30791999%2C0.12413901%200.18732%2C0.16277901%200.10824%2C0.19865901%200.01668%2C0.23981901%200%2C0.38045901%200%2C0.46301901%20c%200%2C0.0828%200.04536%2C0.67799999%200.07488%2C0.77747999%200.0978%2C0.32832%200.44556%2C0.4392%200.76595999%2C0.4036799%200.01632%2C-0.0024%200.033%2C-0.00468%200.0498%2C-0.00672%20-0.01656%2C0.00264%20-0.03312%2C0.0048%20-0.0498%2C0.00672%20C%200.3714%2C1.7137789%20-0.0456%2C1.884779%200.50124%2C2.493539%201.1027999%2C3.1163391%201.3256399%2C2.359979%201.44%2C1.976579%201.55436%2C2.359979%201.686%2C3.0890991%202.36796%2C2.493539%202.88%2C1.976579%202.5086%2C1.713779%202.0391599%2C1.6441789%20a%201.04892%2C1.04892%200%200%201%20-0.0498%2C-0.00672%20c%200.0168%2C0.00204%200.03348%2C0.00432%200.0498%2C0.00672%20C%202.35956%2C1.6798189%202.7073199%2C1.5688189%202.80512%2C1.240499%202.8346401%2C1.141139%202.88%2C0.54569891%202.88%2C0.46313901%20c%200%2C-0.0828%20-0.01668%2C-0.22332%20-0.10824%2C-0.26472%20-0.07908%2C-0.03576%20-0.19968%2C-0.0744%20-0.516%2C0.1488%20C%201.92552%2C0.58061901%201.5704399%2C1.053299%201.44%2C1.306859%22%0A%20%20%20%20%20style%3D%22stroke-width%3A0.12%22%0A%20%20%20%20%20id%3D%22path2%22%20%2F%3E%0A%3C%2Fsvg%3E%0A") no-repeat; } .header-bluesky-link:hover { opacity: 0.6; } [data-theme='dark'] .header-bluesky-link::before { background: url("data:image/svg+xml,%3C%3Fxml%20version%3D%221.0%22%20encoding%3D%22UTF-8%22%20standalone%3D%22no%22%3F%3E%0A%3Csvg%0A%20%20%20width%3D%2224%22%0A%20%20%20height%3D%2224%22%0A%20%20%20viewBox%3D%220%200%202.88%202.88%22%0A%20%20%20version%3D%221.1%22%0A%20%20%20id%3D%22svg4%22%0A%20%20%20xmlns%3D%22http%3A%2F%2Fwww.w3.org%2F2000%2Fsvg%22%0A%20%20%20xmlns%3Asvg%3D%22http%3A%2F%2Fwww.w3.org%2F2000%2Fsvg%22%3E%0A%20%20%3Cdefs%0A%20%20%20%20%20id%3D%22defs8%22%20%2F%3E%0A%20%20%3Cpath%0A%20%20%20%20%20fill%3D%22%23959da5%22%0A%20%20%20%20%20d%3D%22M%201.44%2C1.306859%20C%201.30956%2C1.053179%200.95447995%2C0.58049901%200.62423999%2C0.34745901%200.30791999%2C0.12413901%200.18732%2C0.16277901%200.10824%2C0.19865901%200.01668%2C0.23981901%200%2C0.38045901%200%2C0.46301901%20c%200%2C0.0828%200.04536%2C0.67799999%200.07488%2C0.77747999%200.0978%2C0.32832%200.44556%2C0.4392%200.76595999%2C0.4036799%200.01632%2C-0.0024%200.033%2C-0.00468%200.0498%2C-0.00672%20-0.01656%2C0.00264%20-0.03312%2C0.0048%20-0.0498%2C0.00672%20C%200.3714%2C1.7137789%20-0.0456%2C1.884779%200.50124%2C2.493539%201.1027999%2C3.1163391%201.3256399%2C2.359979%201.44%2C1.976579%201.55436%2C2.359979%201.686%2C3.0890991%202.36796%2C2.493539%202.88%2C1.976579%202.5086%2C1.713779%202.0391599%2C1.6441789%20a%201.04892%2C1.04892%200%200%201%20-0.0498%2C-0.00672%20c%200.0168%2C0.00204%200.03348%2C0.00432%200.0498%2C0.00672%20C%202.35956%2C1.6798189%202.7073199%2C1.5688189%202.80512%2C1.240499%202.8346401%2C1.141139%202.88%2C0.54569891%202.88%2C0.46313901%20c%200%2C-0.0828%20-0.01668%2C-0.22332%20-0.10824%2C-0.26472%20-0.07908%2C-0.03576%20-0.19968%2C-0.0744%20-0.516%2C0.1488%20C%201.92552%2C0.58061901%201.5704399%2C1.053299%201.44%2C1.306859%22%0A%20%20%20%20%20style%3D%22stroke-width%3A0.12%22%0A%20%20%20%20%20id%3D%22path2%22%20%2F%3E%0A%3C%2Fsvg%3E%0A") no-repeat; } .header-linkedin-link::before { content: ''; width: 24px; height: 24px; display: flex; background: url("data:image/svg+xml,%0A%3Csvg xmlns='http://www.w3.org/2000/svg' viewBox='0 0 19 18'%3E%3Cpath d='M3.94 2A2 2 0 1 1 2 0a2 2 0 0 1 1.94 2zM4 5.48H0V18h4zm6.32 0H6.34V18h3.94v-6.57c0-3.66 4.77-4 4.77 0V18H19v-7.93c0-6.17-7.06-5.94-8.72-2.91z' fill='rgb(149, 157, 165)'/%3E%3C/svg%3E") no-repeat; } .header-linkedin-link:hover { opacity: 0.6; } [data-theme='dark'] .header-linkedin-link::before { background-image: url("data:image/svg+xml,%0A%3Csvg xmlns='http://www.w3.org/2000/svg' viewBox='0 0 19 18'%3E%3Cpath d='M3.94 2A2 2 0 1 1 2 0a2 2 0 0 1 1.94 2zM4 5.48H0V18h4zm6.32 0H6.34V18h3.94v-6.57c0-3.66 4.77-4 4.77 0V18H19v-7.93c0-6.17-7.06-5.94-8.72-2.91z' fill='rgb(149, 157, 165)'/%3E%3C/svg%3E") no-repeat; } .header-x-link::before { content: ''; width: 24px; height: 24px; display: flex; background: url("data:image/svg+xml,%3Csvg%20xmlns%3D%22http%3A%2F%2Fwww.w3.org%2F2000%2Fsvg%22%20viewBox%3D%220%200%201200%201227%22%20fill%3D%22rgb(149%2C%20157%2C%20165)%22%3E%3Cpath%20d%3D%22M714.163%20519.284%201160.89%200h-105.86L667.137%20450.887%20357.328%200H0l468.492%20681.821L0%201226.37h105.866l409.625-476.152%20327.181%20476.152H1200L714.137%20519.284h.026ZM569.165%20687.828l-47.468-67.894-377.686-540.24h162.604l304.797%20435.991%2047.468%2067.894%20396.2%20566.721H892.476L569.165%20687.854v-.026Z%22%20%2F%3E%3C%2Fsvg%3E") no-repeat; } .header-x-link:hover { opacity: 0.6; } [data-theme='dark'] .header-x-link::before { background: url("data:image/svg+xml,%3Csvg xmlns='http://www.w3.org/2000/svg' fill='%231DA1F2' viewBox='0 0 20 20' aria-hidden='true'%3E%3Cpath d='M19.96 3.808a8.333 8.333 0 01-2.353.646 4.132 4.132 0 001.802-2.269 8.47 8.47 0 01-2.606.987 4.1 4.1 0 00-6.986 3.735c-3.409-.161-6.428-1.799-8.45-4.272a4.018 4.018 0 00-.555 2.063A4.1 4.1 0 002.635 8.11a4.087 4.087 0 01-1.857-.513v.05a4.102 4.102 0 003.289 4.022 4.162 4.162 0 01-1.844.07 4.114 4.114 0 003.837 2.848 8.223 8.223 0 01-5.085 1.755c-.325 0-.65-.02-.975-.056a11.662 11.662 0 006.298 1.84c7.544 0 11.665-6.246 11.665-11.654 0-.175 0-.35-.013-.525A8.278 8.278 0 0020 3.825l-.04-.017z'/%3E%3C/svg%3E%0A") no-repeat; } .header-discord-link::before { content: ''; width: 24px; height: 18px; display: flex; background: url("data:image/svg+xml,%3Csvg xmlns='http://www.w3.org/2000/svg' viewBox='0 0 127.14 96.36'%3E%3Cpath fill='rgb(149, 157, 165)' d='M107.7,8.07A105.15,105.15,0,0,0,81.47,0a72.06,72.06,0,0,0-3.36,6.83A97.68,97.68,0,0,0,49,6.83,72.37,72.37,0,0,0,45.64,0,105.89,105.89,0,0,0,19.39,8.09C2.79,32.65-1.71,56.6.54,80.21h0A105.73,105.73,0,0,0,32.71,96.36,77.7,77.7,0,0,0,39.6,85.25a68.42,68.42,0,0,1-10.85-5.18c.91-.66,1.8-1.34,2.66-2a75.57,75.57,0,0,0,64.32,0c.87.71,1.76,1.39,2.66,2a68.68,68.68,0,0,1-10.87,5.19,77,77,0,0,0,6.89,11.1A105.25,105.25,0,0,0,126.6,80.22h0C129.24,52.84,122.09,29.11,107.7,8.07ZM42.45,65.69C36.18,65.69,31,60,31,53s5-12.74,11.43-12.74S54,46,53.89,53,48.84,65.69,42.45,65.69Zm42.24,0C78.41,65.69,73.25,60,73.25,53s5-12.74,11.44-12.74S96.23,46,96.12,53,91.08,65.69,84.69,65.69Z'/%3E%3C/svg%3E%0A") no-repeat; } .header-discord-link:hover { opacity: 0.6; } [data-theme='dark'] .header-discord-link::before { background: url("data:image/svg+xml,%3Csvg xmlns='http://www.w3.org/2000/svg' viewBox='0 0 127.14 96.36'%3E%3Cpath fill='rgb(149, 157, 165)' d='M107.7,8.07A105.15,105.15,0,0,0,81.47,0a72.06,72.06,0,0,0-3.36,6.83A97.68,97.68,0,0,0,49,6.83,72.37,72.37,0,0,0,45.64,0,105.89,105.89,0,0,0,19.39,8.09C2.79,32.65-1.71,56.6.54,80.21h0A105.73,105.73,0,0,0,32.71,96.36,77.7,77.7,0,0,0,39.6,85.25a68.42,68.42,0,0,1-10.85-5.18c.91-.66,1.8-1.34,2.66-2a75.57,75.57,0,0,0,64.32,0c.87.71,1.76,1.39,2.66,2a68.68,68.68,0,0,1-10.87,5.19,77,77,0,0,0,6.89,11.1A105.25,105.25,0,0,0,126.6,80.22h0C129.24,52.84,122.09,29.11,107.7,8.07ZM42.45,65.69C36.18,65.69,31,60,31,53s5-12.74,11.43-12.74S54,46,53.89,53,48.84,65.69,42.45,65.69Zm42.24,0C78.41,65.69,73.25,60,73.25,53s5-12.74,11.44-12.74S96.23,46,96.12,53,91.08,65.69,84.69,65.69Z'/%3E%3C/svg%3E%0A") no-repeat; } .header-tiktok-link::before { content: ''; width: 24px; height: 24px; display: flex; background: url("data:image/svg+xml,%3C%3Fxml%20version%3D%221.0%22%20encoding%3D%22UTF-8%22%20standalone%3D%22no%22%3F%3E%0A%3Csvg%0A%20%20%20fill%3D%22%23959da5%22%0A%20%20%20width%3D%2224%22%0A%20%20%20height%3D%2224%22%0A%20%20%20viewBox%3D%220%200%200.72%200.72%22%0A%20%20%20xml%3Aspace%3D%22preserve%22%0A%20%20%20version%3D%221.1%22%0A%20%20%20id%3D%22svg4%22%0A%20%20%20xmlns%3D%22http%3A%2F%2Fwww.w3.org%2F2000%2Fsvg%22%0A%20%20%20xmlns%3Asvg%3D%22http%3A%2F%2Fwww.w3.org%2F2000%2Fsvg%22%3E%3Cdefs%0A%20%20%20%20%20id%3D%22defs8%22%20%2F%3E%3Cpath%0A%20%20%20%20%20d%3D%22M%200.63539429%2C0.16867393%20A%200.1725255%2C0.17252543%200%200%201%200.49969199%2C0.01587392%20V%200%20h%20-0.1240039%20v%200.49212763%20a%200.10424241%2C0.10424236%200%200%201%20-0.1872116%2C0.0627398%20l%20-7.2e-5%2C-3.6e-5%207.2e-5%2C3.6e-5%20A%200.10420641%2C0.10420637%200%200%201%200.30304959%2C0.39252865%20V%200.26654513%20A%200.22781429%2C0.2278142%200%200%200%200.10889089%2C0.65140678%200.22785029%2C0.2278502%200%200%200%200.49969199%2C0.4921636%20V%200.24070051%20a%200.2945136%2C0.29451347%200%200%200%200.1718056%2C0.0549288%20V%200.17241745%20a%200.17389332%2C0.17389325%200%200%201%20-0.0361033%2C-0.003744%20z%22%0A%20%20%20%20%20id%3D%22path2%22%0A%20%20%20%20%20style%3D%22stroke-width%3A0.0359952%22%20%2F%3E%3C%2Fsvg%3E%0A") no-repeat; } .header-tiktok-link:hover { opacity: 0.6; } .header-youtube-link::before { content: ''; width: 24px; height: 20px; display: flex; background: url("data:image/svg+xml,%3C%3Fxml%20version%3D%221.0%22%20encoding%3D%22UTF-8%22%20standalone%3D%22no%22%3F%3E%0A%3Csvg%0A%20%20%20viewBox%3D%220%200%2024%2024%22%0A%20%20%20version%3D%221.1%22%0A%20%20%20id%3D%22svg4%22%0A%20%20%20width%3D%2224%22%0A%20%20%20height%3D%2224%22%0A%20%20%20xmlns%3D%22http%3A%2F%2Fwww.w3.org%2F2000%2Fsvg%22%0A%20%20%20xmlns%3Asvg%3D%22http%3A%2F%2Fwww.w3.org%2F2000%2Fsvg%22%3E%0A%20%20%3Cdefs%0A%20%20%20%20%20id%3D%22defs8%22%20%2F%3E%0A%20%20%3Cpath%0A%20%20%20%20%20d%3D%22M%2023.496693%2C5.8315054%20A%203.0042862%2C3.0042862%200%200%200%2021.393692%2C3.6909515%20C%2019.516013%2C3.1652014%2011.992781%2C3.1652014%2011.992781%2C3.1652014%20A%2072.040279%2C72.040279%200%200%200%202.6043863%2C3.6659158%203.1169469%2C3.1169469%200%200%200%200.488868%2C5.8315054%2032.884416%2C32.884416%200%200%200%206.7149698e-4%2C11.677346%2032.734202%2C32.734202%200%200%200%200.488868%2C17.523186%203.0418398%2C3.0418398%200%200%200%202.6043863%2C19.663739%20c%201.9027146%2C0.525751%209.3883947%2C0.525751%209.3883947%2C0.525751%20a%2072.215529%2C72.215529%200%200%200%209.400911%2C-0.500715%203.0042862%2C3.0042862%200%200%200%202.103001%2C-2.140554%2032.083273%2C32.083273%200%200%200%200.500714%2C-5.845839%2030.042862%2C30.042862%200%200%200%20-0.500714%2C-5.8708766%20z%20M%209.6018695%2C15.320042%20V%208.0346486%20l%206.2589285%2C3.6426974%20z%22%0A%20%20%20%20%20fill%3D%22%23959da5%22%0A%20%20%20%20%20id%3D%22path2%22%0A%20%20%20%20%20style%3D%22stroke-width%3A1.25179%22%20%2F%3E%0A%3C%2Fsvg%3E%0A") no-repeat; } .header-youtube-link:hover { opacity: 0.6; } header .container { max-width: 9000px; } ```