#
tokens: 47377/50000 18/134 files (page 2/5)
lines: off (toggle) GitHub
raw markdown copy
This is page 2 of 5. Use http://codebase.md/datalayer/jupyter-mcp-server?page={x} to view the full context.

# Directory Structure

```
├── .dockerignore
├── .github
│   ├── copilot-instructions.md
│   ├── dependabot.yml
│   └── workflows
│       ├── build.yml
│       ├── fix-license-header.yml
│       ├── lint.sh
│       ├── prep-release.yml
│       ├── publish-release.yml
│       └── test.yml
├── .gitignore
├── .licenserc.yaml
├── .pre-commit-config.yaml
├── .vscode
│   ├── mcp.json
│   └── settings.json
├── ARCHITECTURE.md
├── CHANGELOG.md
├── CODE_OF_CONDUCT.md
├── CONTRIBUTING.md
├── dev
│   ├── content
│   │   ├── new.ipynb
│   │   ├── notebook.ipynb
│   │   └── README.md
│   └── README.md
├── Dockerfile
├── docs
│   ├── .gitignore
│   ├── .yarnrc.yml
│   ├── babel.config.js
│   ├── docs
│   │   ├── _category_.yaml
│   │   ├── clients
│   │   │   ├── _category_.yaml
│   │   │   ├── claude_desktop
│   │   │   │   ├── _category_.yaml
│   │   │   │   └── index.mdx
│   │   │   ├── cline
│   │   │   │   ├── _category_.yaml
│   │   │   │   └── index.mdx
│   │   │   ├── cursor
│   │   │   │   ├── _category_.yaml
│   │   │   │   └── index.mdx
│   │   │   ├── index.mdx
│   │   │   ├── vscode
│   │   │   │   ├── _category_.yaml
│   │   │   │   └── index.mdx
│   │   │   └── windsurf
│   │   │       ├── _category_.yaml
│   │   │       └── index.mdx
│   │   ├── configure
│   │   │   ├── _category_.yaml
│   │   │   └── index.mdx
│   │   ├── contribute
│   │   │   ├── _category_.yaml
│   │   │   └── index.mdx
│   │   ├── deployment
│   │   │   ├── _category_.yaml
│   │   │   ├── datalayer
│   │   │   │   ├── _category_.yaml
│   │   │   │   └── streamable-http
│   │   │   │       └── index.mdx
│   │   │   ├── index.mdx
│   │   │   └── jupyter
│   │   │       ├── _category_.yaml
│   │   │       ├── index.mdx
│   │   │       ├── stdio
│   │   │       │   ├── _category_.yaml
│   │   │       │   └── index.mdx
│   │   │       └── streamable-http
│   │   │           ├── _category_.yaml
│   │   │           ├── jupyter-extension
│   │   │           │   └── index.mdx
│   │   │           └── standalone
│   │   │               └── index.mdx
│   │   ├── index.mdx
│   │   ├── releases
│   │   │   ├── _category_.yaml
│   │   │   └── index.mdx
│   │   ├── resources
│   │   │   ├── _category_.yaml
│   │   │   └── index.mdx
│   │   └── tools
│   │       ├── _category_.yaml
│   │       └── index.mdx
│   ├── docusaurus.config.js
│   ├── LICENSE
│   ├── Makefile
│   ├── package.json
│   ├── README.md
│   ├── sidebars.js
│   ├── src
│   │   ├── components
│   │   │   ├── HomepageFeatures.js
│   │   │   ├── HomepageFeatures.module.css
│   │   │   ├── HomepageProducts.js
│   │   │   └── HomepageProducts.module.css
│   │   ├── css
│   │   │   └── custom.css
│   │   ├── pages
│   │   │   ├── index.module.css
│   │   │   ├── markdown-page.md
│   │   │   └── testimonials.tsx
│   │   └── theme
│   │       └── CustomDocItem.tsx
│   └── static
│       └── img
│           ├── datalayer
│           │   ├── logo.png
│           │   └── logo.svg
│           ├── favicon.ico
│           ├── feature_1.svg
│           ├── feature_2.svg
│           ├── feature_3.svg
│           ├── product_1.svg
│           ├── product_2.svg
│           └── product_3.svg
├── examples
│   └── integration_example.py
├── jupyter_mcp_server
│   ├── __init__.py
│   ├── __main__.py
│   ├── __version__.py
│   ├── config.py
│   ├── enroll.py
│   ├── env.py
│   ├── jupyter_extension
│   │   ├── __init__.py
│   │   ├── backends
│   │   │   ├── __init__.py
│   │   │   ├── base.py
│   │   │   ├── local_backend.py
│   │   │   └── remote_backend.py
│   │   ├── context.py
│   │   ├── extension.py
│   │   ├── handlers.py
│   │   └── protocol
│   │       ├── __init__.py
│   │       └── messages.py
│   ├── models.py
│   ├── notebook_manager.py
│   ├── server_modes.py
│   ├── server.py
│   ├── tools
│   │   ├── __init__.py
│   │   ├── _base.py
│   │   ├── _registry.py
│   │   ├── assign_kernel_to_notebook_tool.py
│   │   ├── delete_cell_tool.py
│   │   ├── execute_cell_tool.py
│   │   ├── execute_ipython_tool.py
│   │   ├── insert_cell_tool.py
│   │   ├── insert_execute_code_cell_tool.py
│   │   ├── list_cells_tool.py
│   │   ├── list_files_tool.py
│   │   ├── list_kernels_tool.py
│   │   ├── list_notebooks_tool.py
│   │   ├── overwrite_cell_source_tool.py
│   │   ├── read_cell_tool.py
│   │   ├── read_cells_tool.py
│   │   ├── restart_notebook_tool.py
│   │   ├── unuse_notebook_tool.py
│   │   └── use_notebook_tool.py
│   └── utils.py
├── jupyter-config
│   ├── jupyter_notebook_config
│   │   └── jupyter_mcp_server.json
│   └── jupyter_server_config.d
│       └── jupyter_mcp_server.json
├── LICENSE
├── Makefile
├── pyproject.toml
├── pytest.ini
├── README.md
├── RELEASE.md
├── smithery.yaml
└── tests
    ├── __init__.py
    ├── conftest.py
    ├── test_common.py
    ├── test_config.py
    ├── test_jupyter_extension.py
    ├── test_list_kernels.py
    ├── test_tools.py
    └── test_use_notebook.py
```

# Files

--------------------------------------------------------------------------------
/jupyter_mcp_server/tools/list_cells_tool.py:
--------------------------------------------------------------------------------

```python
# Copyright (c) 2023-2024 Datalayer, Inc.
#
# BSD 3-Clause License

"""List cells tool implementation."""

from typing import Any, Optional
from jupyter_server_api import JupyterServerClient
from jupyter_mcp_server.tools._base import BaseTool, ServerMode
from jupyter_mcp_server.notebook_manager import NotebookManager
from jupyter_mcp_server.config import get_config
from jupyter_nbmodel_client import NbModelClient
from jupyter_mcp_server.utils import normalize_cell_source, format_TSV


class ListCellsTool(BaseTool):
    """Tool to list basic information of all cells."""
    
    @property
    def name(self) -> str:
        return "list_cells"
    
    @property
    def description(self) -> str:
        return """List the basic information of all cells in the notebook.
    
Returns a formatted table showing the index, type, execution count (for code cells),
and first line of each cell. This provides a quick overview of the notebook structure
and is useful for locating specific cells for operations like delete or insert.

Returns:
    str: Formatted table with cell information (Index, Type, Count, First Line)"""
    
    async def _list_cells_local(self, contents_manager: Any, path: str) -> str:
        """List cells using local contents_manager (JUPYTER_SERVER mode).
        
        Args:
            contents_manager: Jupyter contents manager with direct file access
            path: Notebook path relative to the server root directory
            
        Returns:
            TSV-formatted table (Index, Type, Count, First Line)
            
        Raises:
            ValueError: If the notebook content cannot be read
        """
        # Read the notebook file directly
        model = await contents_manager.get(path, content=True, type='notebook')
        
        if 'content' not in model:
            raise ValueError(f"Could not read notebook content from {path}")
        
        notebook_content = model['content']
        cells = notebook_content.get('cells', [])
        
        # Keep the empty-notebook message consistent with the websocket path.
        if not cells:
            return "Notebook is empty, no cells found."
        
        # Format the cells into a table
        headers = ["Index", "Type", "Count", "First Line"]
        rows = []
        
        for idx, cell in enumerate(cells):
            cell_type = cell.get('cell_type', 'unknown')
            # nbformat stores execution_count as None (key present) for
            # unexecuted code cells, so a plain .get(..., '-') default would
            # render "None"; coalesce explicitly instead.
            if cell_type == 'code':
                execution_count = cell.get('execution_count') or '-'
            else:
                execution_count = '-'
            
            # Source may be a list of lines (nbformat on-disk style, each line
            # keeping its trailing newline) or a single string.
            source = cell.get('source', '')
            if isinstance(source, list):
                first_line = source[0] if source else ''
                lines = len(source)
            else:
                first_line = source.split('\n')[0] if source else ''
                lines = len(source.split('\n'))
            
            # Strip any trailing newline so the TSV row is not broken in two.
            first_line = first_line.rstrip('\n')
            
            if lines > 1:
                first_line += f"...({lines - 1} lines hidden)"
            
            rows.append([idx, cell_type, execution_count, first_line])
        
        return format_TSV(headers, rows)
    
    def _list_cells_websocket(self, notebook: NbModelClient) -> str:
        """List cells using WebSocket connection (MCP_SERVER mode).
        
        Args:
            notebook: Live Y.js notebook model client
            
        Returns:
            TSV-formatted table (Index, Type, Count, First Line)
        """
        total_cells = len(notebook)
        
        if total_cells == 0:
            return "Notebook is empty, no cells found."
        
        # Create header
        headers = ["Index", "Type", "Count", "First Line"]
        rows = []
        
        # Process each cell
        for i in range(total_cells):
            cell_data = notebook[i]
            cell_type = cell_data.get("cell_type", "unknown")
            
            # Get execution count for code cells
            execution_count = (cell_data.get("execution_count") or "None") if cell_type == "code" else "N/A"
            # Get first line of source
            source_lines = normalize_cell_source(cell_data.get("source", ""))
            first_line = source_lines[0] if source_lines else ""
            if len(source_lines) > 1:
                first_line += f"...({len(source_lines) - 1} lines hidden)"
            
            # Add to table
            rows.append([i, cell_type, execution_count, first_line])
        
        return format_TSV(headers, rows)
    
    async def execute(
        self,
        mode: ServerMode,
        server_client: Optional[JupyterServerClient] = None,
        kernel_client: Optional[Any] = None,
        contents_manager: Optional[Any] = None,
        kernel_manager: Optional[Any] = None,
        kernel_spec_manager: Optional[Any] = None,
        notebook_manager: Optional[NotebookManager] = None,
        **kwargs
    ) -> str:
        """Execute the list_cells tool.
        
        Args:
            mode: Server mode (MCP_SERVER or JUPYTER_SERVER)
            contents_manager: Direct API access for JUPYTER_SERVER mode
            notebook_manager: Notebook manager instance
            **kwargs: Additional parameters
            
        Returns:
            Formatted table with cell information
            
        Raises:
            ValueError: If the mode/client combination is invalid
        """
        if mode == ServerMode.JUPYTER_SERVER and contents_manager is not None:
            # Local mode: read notebook directly from file system
            from jupyter_mcp_server.jupyter_extension.context import get_server_context
            from pathlib import Path
            
            context = get_server_context()
            serverapp = context.serverapp
            
            # Get current notebook path from notebook_manager if available, else use config
            notebook_path = None
            if notebook_manager:
                notebook_path = notebook_manager.get_current_notebook_path()
            if not notebook_path:
                config = get_config()
                notebook_path = config.document_id
            
            # contents_manager expects path relative to serverapp.root_dir
            # If we have an absolute path, convert it to relative
            if serverapp and Path(notebook_path).is_absolute():
                root_dir = Path(serverapp.root_dir)
                abs_path = Path(notebook_path)
                try:
                    notebook_path = str(abs_path.relative_to(root_dir))
                except ValueError:
                    # Path is not under root_dir, use as-is
                    pass
            
            return await self._list_cells_local(contents_manager, notebook_path)
        elif mode == ServerMode.MCP_SERVER and notebook_manager is not None:
            # Remote mode: use WebSocket connection to Y.js document
            async with notebook_manager.get_current_connection() as notebook:
                return self._list_cells_websocket(notebook)
        else:
            raise ValueError(f"Invalid mode or missing required clients: mode={mode}")

```

--------------------------------------------------------------------------------
/jupyter_mcp_server/tools/assign_kernel_to_notebook_tool.py:
--------------------------------------------------------------------------------

```python
# Copyright (c) 2023-2024 Datalayer, Inc.
#
# BSD 3-Clause License

"""Assign kernel to notebook tool implementation."""

from typing import Any, Optional
from jupyter_server_api import JupyterServerClient, NotFoundError
from jupyter_mcp_server.tools._base import BaseTool, ServerMode


class AssignKernelToNotebookTool(BaseTool):
    """Tool to assign a kernel to a notebook by creating a Jupyter session."""
    
    @property
    def name(self) -> str:
        return "assign_kernel_to_notebook"
    
    @property
    def description(self) -> str:
        return """Assign a kernel to a notebook by creating a Jupyter session.
    
This creates a Jupyter server session that connects a notebook file to a kernel,
enabling code execution in the notebook. Sessions are the mechanism Jupyter uses
to maintain the relationship between notebooks and their kernels.

Args:
    notebook_path: Path to the notebook file, relative to the Jupyter server root (e.g. "notebook.ipynb")
    kernel_id: ID of the kernel to assign to the notebook
    session_name: Optional name for the session (defaults to notebook path)
    
Returns:
    str: Success message with session information including session ID"""
    
    async def execute(
        self,
        mode: ServerMode,
        server_client: Optional[JupyterServerClient] = None,
        contents_manager: Optional[Any] = None,
        session_manager: Optional[Any] = None,
        kernel_manager: Optional[Any] = None,
        # Tool-specific parameters
        notebook_path: str = None,
        kernel_id: str = None,
        session_name: Optional[str] = None,
        **kwargs
    ) -> str:
        """Execute the assign_kernel_to_notebook tool.
        
        Validates that both the notebook and the kernel exist before creating
        the session, so failures are reported with a specific cause.
        
        Args:
            mode: Server mode (MCP_SERVER or JUPYTER_SERVER)
            server_client: HTTP client for MCP_SERVER mode
            contents_manager: Direct API access for JUPYTER_SERVER mode
            session_manager: Session manager for JUPYTER_SERVER mode
            kernel_manager: Kernel manager for validation
            notebook_path: Path to the notebook file
            kernel_id: ID of the kernel to assign
            session_name: Optional session name
            **kwargs: Additional parameters
            
        Returns:
            Success message with session information, or an "Error: ..." string
        """
        if not notebook_path:
            return "Error: notebook_path is required"
        
        if not kernel_id:
            return "Error: kernel_id is required"
        
        # Use notebook_path as session name if not provided
        if not session_name:
            session_name = notebook_path
        
        # Verify notebook exists
        try:
            if mode == ServerMode.MCP_SERVER and server_client is not None:
                # Check notebook exists using HTTP API
                try:
                    # FIXED: contents.get_file -> contents.get
                    server_client.contents.get(notebook_path)
                except NotFoundError:
                    return f"Error: Notebook '{notebook_path}' not found on Jupyter server"
            elif mode == ServerMode.JUPYTER_SERVER and contents_manager is not None:
                # Check notebook exists using local API
                try:
                    await contents_manager.get(notebook_path, content=False)
                except Exception as e:
                    return f"Error: Notebook '{notebook_path}' not found: {e}"
            else:
                return f"Error: Invalid mode or missing required clients: mode={mode}"
        except Exception as e:
            return f"Error checking notebook: {e}"
        
        # Verify kernel exists
        try:
            if mode == ServerMode.MCP_SERVER and server_client is not None:
                # Check kernel exists using HTTP API
                kernels = server_client.kernels.list_kernels()
                kernel_exists = any(kernel.id == kernel_id for kernel in kernels)
                if not kernel_exists:
                    return f"Error: Kernel '{kernel_id}' not found on Jupyter server"
            elif mode == ServerMode.JUPYTER_SERVER and kernel_manager is not None:
                # Check kernel exists using local API
                if kernel_id not in kernel_manager:
                    return f"Error: Kernel '{kernel_id}' not found in local kernel manager"
            else:
                return f"Error: Invalid mode or missing kernel manager: mode={mode}"
        except Exception as e:
            return f"Error checking kernel: {e}"
        
        # Create the session
        try:
            if mode == ServerMode.MCP_SERVER and server_client is not None:
                # Create session using HTTP API
                session = server_client.sessions.create_session(
                    path=notebook_path,
                    kernel={"id": kernel_id},
                    session_type="notebook",
                    name=session_name
                )
                return (
                    f"Successfully created session '{session.id}' for notebook '{notebook_path}' "
                    f"with kernel '{kernel_id}'. The notebook is now connected to the kernel."
                )
            elif mode == ServerMode.JUPYTER_SERVER and session_manager is not None:
                import inspect
                
                # jupyter_server's SessionManager.create_session is a coroutine
                # function, while alternative managers may be synchronous. The
                # previous asyncio.to_thread() wrapping returned the un-awaited
                # coroutine object instead of the session dict, so call the
                # method directly and await the result only if it is awaitable.
                result = session_manager.create_session(
                    path=notebook_path,
                    kernel_id=kernel_id,
                    type="notebook",
                    name=session_name
                )
                session_dict = await result if inspect.isawaitable(result) else result
                
                session_id = session_dict.get("id", "unknown")
                return (
                    f"Successfully created session '{session_id}' for notebook '{notebook_path}' "
                    f"with kernel '{kernel_id}'. The notebook is now connected to the kernel."
                )
            else:
                return f"Error: Invalid mode or missing session manager: mode={mode}"
        except Exception as e:
            return f"Error creating session: {e}"

```

--------------------------------------------------------------------------------
/jupyter_mcp_server/jupyter_extension/protocol/messages.py:
--------------------------------------------------------------------------------

```python
# Copyright (c) 2023-2024 Datalayer, Inc.
#
# BSD 3-Clause License

"""
MCP Protocol Messages

Pydantic models for MCP protocol requests and responses to ensure consistent
API across both MCP_SERVER and JUPYTER_SERVER modes.
"""

from typing import Any, Optional, Union, Literal
from pydantic import BaseModel, Field
from mcp.types import ImageContent


# Tool execution models
class ToolRequest(BaseModel):
    """Request envelope asking the server to execute a named tool."""
    tool_name: str = Field(..., description="Name of the tool to execute")
    arguments: dict[str, Any] = Field(default_factory=dict, description="Tool arguments")
    context: Optional[dict[str, Any]] = Field(None, description="Execution context")


class ToolResponse(BaseModel):
    """Result envelope returned after a tool execution."""
    success: bool = Field(..., description="Whether execution was successful")
    result: Any = Field(None, description="Tool execution result")
    error: Optional[str] = Field(None, description="Error message if execution failed")


# Notebook operation models
class NotebookContentRequest(BaseModel):
    """Request to retrieve the content of a notebook file."""
    path: str = Field(..., description="Path to the notebook file")
    include_outputs: bool = Field(True, description="Include cell outputs")


class NotebookContentResponse(BaseModel):
    """Notebook content: the cell list plus notebook-level metadata."""
    path: str = Field(..., description="Notebook path")
    cells: list[dict[str, Any]] = Field(..., description="List of cells")
    metadata: dict[str, Any] = Field(default_factory=dict, description="Notebook metadata")


class NotebookListRequest(BaseModel):
    """Request to list notebooks under a directory path."""
    path: Optional[str] = Field("", description="Directory path to search")
    recursive: bool = Field(True, description="Search recursively")


class NotebookListResponse(BaseModel):
    """Notebook paths found by a NotebookListRequest."""
    notebooks: list[str] = Field(..., description="List of notebook paths")


# Cell operation models
class ReadCellsRequest(BaseModel):
    """Request to read a (possibly bounded) range of cells from a notebook."""
    path: Optional[str] = Field(None, description="Notebook path (uses current if not specified)")
    start_index: Optional[int] = Field(None, description="Start cell index")
    end_index: Optional[int] = Field(None, description="End cell index")


class ReadCellsResponse(BaseModel):
    """Cell data returned by a ReadCellsRequest."""
    cells: list[dict[str, Any]] = Field(..., description="List of cell information")


class AppendCellRequest(BaseModel):
    """Request to append a cell at the end of a notebook."""
    path: Optional[str] = Field(None, description="Notebook path")
    cell_type: Literal["code", "markdown"] = Field(..., description="Cell type")
    source: Union[str, list[str]] = Field(..., description="Cell source")


class AppendCellResponse(BaseModel):
    """Result of appending a cell, including the new cell's index."""
    cell_index: int = Field(..., description="Index of the appended cell")
    message: str = Field(..., description="Success message")


class InsertCellRequest(BaseModel):
    """Request to insert a cell at a specific index in a notebook."""
    path: Optional[str] = Field(None, description="Notebook path")
    cell_index: int = Field(..., description="Index where to insert")
    cell_type: Literal["code", "markdown"] = Field(..., description="Cell type")
    source: Union[str, list[str]] = Field(..., description="Cell source")


class InsertCellResponse(BaseModel):
    """Result of inserting a cell, including where it was placed."""
    cell_index: int = Field(..., description="Index of the inserted cell")
    message: str = Field(..., description="Success message")


class DeleteCellRequest(BaseModel):
    """Request to delete the cell at a given index."""
    path: Optional[str] = Field(None, description="Notebook path")
    cell_index: int = Field(..., description="Index of cell to delete")


class DeleteCellResponse(BaseModel):
    """Result message after deleting a cell."""
    message: str = Field(..., description="Success message")


class OverwriteCellRequest(BaseModel):
    """Request to replace the source of an existing cell."""
    path: Optional[str] = Field(None, description="Notebook path")
    cell_index: int = Field(..., description="Index of cell to overwrite")
    new_source: Union[str, list[str]] = Field(..., description="New cell source")


class OverwriteCellResponse(BaseModel):
    """Result message (including a source diff) after overwriting a cell."""
    message: str = Field(..., description="Success message with diff")


# Cell execution models
class ExecuteCellRequest(BaseModel):
    """Request to execute the cell at a given index."""
    path: Optional[str] = Field(None, description="Notebook path")
    cell_index: int = Field(..., description="Index of cell to execute")
    timeout_seconds: int = Field(300, description="Execution timeout in seconds")


class ExecuteCellResponse(BaseModel):
    """Execution outcome for a single cell: outputs, count, and status."""
    cell_index: int = Field(..., description="Executed cell index")
    outputs: list[Union[str, ImageContent]] = Field(..., description="Cell outputs")
    execution_count: Optional[int] = Field(None, description="Execution count")
    status: Literal["success", "error", "timeout"] = Field(..., description="Execution status")


# Kernel operation models
class ConnectNotebookRequest(BaseModel):
    """Request to connect to (or create) a notebook document."""
    notebook_name: str = Field(..., description="Unique notebook identifier")
    notebook_path: str = Field(..., description="Path to notebook file")
    mode: Literal["connect", "create"] = Field("connect", description="Connection mode")
    kernel_id: Optional[str] = Field(None, description="Specific kernel ID")


class ConnectNotebookResponse(BaseModel):
    """Result of connecting to a notebook."""
    message: str = Field(..., description="Success message")
    notebook_name: str = Field(..., description="Notebook identifier")
    notebook_path: str = Field(..., description="Notebook path")


class UnuseNotebookRequest(BaseModel):
    """Request to disconnect ("unuse") a previously used notebook."""
    notebook_name: str = Field(..., description="Notebook identifier to disconnect")


class UnuseNotebookResponse(BaseModel):
    """Result message after disconnecting a notebook."""
    message: str = Field(..., description="Success message")


class RestartNotebookRequest(BaseModel):
    """Request to restart the kernel of a used notebook."""
    notebook_name: str = Field(..., description="Notebook identifier to restart")


class RestartNotebookResponse(BaseModel):
    """Result of restarting a notebook's kernel."""
    message: str = Field(..., description="Success message")
    notebook_name: str = Field(..., description="Notebook identifier")

```

--------------------------------------------------------------------------------
/docs/static/img/feature_1.svg:
--------------------------------------------------------------------------------

```
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<!--
  ~ Copyright (c) 2023-2024 Datalayer, Inc.
  ~
  ~ BSD 3-Clause License
-->

<svg
   xmlns:dc="http://purl.org/dc/elements/1.1/"
   xmlns:cc="http://creativecommons.org/ns#"
   xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#"
   xmlns:svg="http://www.w3.org/2000/svg"
   xmlns="http://www.w3.org/2000/svg"
   xmlns:sodipodi="http://sodipodi.sourceforge.net/DTD/sodipodi-0.dtd"
   xmlns:inkscape="http://www.inkscape.org/namespaces/inkscape"
   viewBox="0 0 143.86 320.16998"
   version="1.1"
   id="svg1038"
   sodipodi:docname="feature_1.svg"
   inkscape:version="1.0.1 (c497b03c, 2020-09-10)"
   width="143.86"
   height="320.16998">
  <metadata
     id="metadata1042">
    <rdf:RDF>
      <cc:Work
         rdf:about="">
        <dc:format>image/svg+xml</dc:format>
        <dc:type
           rdf:resource="http://purl.org/dc/dcmitype/StillImage" />
        <dc:title>Startup_SVG</dc:title>
      </cc:Work>
    </rdf:RDF>
  </metadata>
  <sodipodi:namedview
     pagecolor="#ffffff"
     bordercolor="#666666"
     borderopacity="1"
     objecttolerance="10"
     gridtolerance="10"
     guidetolerance="10"
     inkscape:pageopacity="0"
     inkscape:pageshadow="2"
     inkscape:window-width="1440"
     inkscape:window-height="717"
     id="namedview1040"
     showgrid="false"
     inkscape:zoom="1.0226025"
     inkscape:cx="117.68707"
     inkscape:cy="153.04271"
     inkscape:window-x="0"
     inkscape:window-y="25"
     inkscape:window-maximized="0"
     inkscape:current-layer="svg1038"
     inkscape:document-rotation="0"
     fit-margin-top="0"
     fit-margin-left="0"
     fit-margin-right="0"
     fit-margin-bottom="0" />
  <defs
     id="defs835">
    <style
       id="style833">.cls-1,.cls-11,.cls-9{fill:#d6d8e5;}.cls-1{opacity:0.15;}.cls-2,.cls-3{fill:#edeff9;}.cls-2{opacity:0.5;}.cls-4{fill:#ffbc00;}.cls-5{fill:#8c50ff;}.cls-6{fill:#424956;}.cls-7{fill:#494949;}.cls-8{fill:#2b303f;}.cls-9{opacity:0.4;}.cls-10{fill:#b1b4c4;}.cls-12{fill:#9ea1af;}.cls-13{fill:#c4c7d6;}.cls-14{fill:#e4e7f2;}.cls-15{fill:#fff;}.cls-16{fill:#e9eaf2;}.cls-17{fill:#f5f6ff;}.cls-18,.cls-19{fill:none;stroke:#e9eaf2;stroke-miterlimit:10;}.cls-18{stroke-width:0.31px;}.cls-19{stroke-width:1.53px;}.cls-20{fill:#edf0f9;}.cls-21{fill:#e2e5f2;}.cls-22{fill:#6e48e5;}.cls-23{fill:#5e42d3;}.cls-24{fill:#ffcea9;}.cls-25{fill:#ededed;}.cls-26{fill:#38226d;}.cls-27{fill:#9c73ff;}.cls-28{fill:#f4f4f4;}.cls-29{fill:#3a2c6d;}.cls-30{isolation:isolate;}</style>
  </defs>
  <title
     id="title837">Startup_SVG</title>
  <ellipse
     class="cls-1"
     cx="71.93"
     cy="278.63998"
     rx="71.93"
     ry="41.529999"
     id="ellipse839" />
  <ellipse
     class="cls-2"
     cx="71.099998"
     cy="274.19998"
     rx="40.119999"
     ry="23.16"
     id="ellipse841" />
  <ellipse
     class="cls-3"
     cx="70.719994"
     cy="265.97998"
     rx="11.44"
     ry="6.6100001"
     id="ellipse843" />
  <rect
     class="cls-3"
     x="60.099998"
     y="178.34999"
     width="22"
     height="86.459999"
     id="rect845" />
  <path
     class="cls-4"
     d="m 59.05,177.37 c 0,0 0.6,17.78 13.25,49.67 0,0 11.78,-25.68 13,-50.21 1.22,-24.53 -26.25,0.54 -26.25,0.54 z"
     id="path847" />
  <polygon
     class="cls-5"
     points="10.85,219.48 46.25,201.73 52.69,156.37 43.6,153.81 14.33,182.29 "
     id="polygon849"
     transform="translate(-4.1748046e-7,-43.38)" />
  <polygon
     class="cls-5"
     points="133.75,219.48 98.22,200.5 91.91,156.37 101,153.81 130.26,182.29 "
     id="polygon851"
     transform="translate(-4.1748046e-7,-43.38)" />
  <ellipse
     class="cls-6"
     cx="71.93"
     cy="173.46001"
     rx="14.55"
     ry="8.3999996"
     id="ellipse853" />
  <polygon
     class="cls-7"
     points="93.94,208.72 49.87,208.44 49.87,200.88 93.94,200.88 "
     id="polygon855"
     transform="translate(-4.1748046e-7,-43.38)" />
  <ellipse
     class="cls-8"
     cx="71.93"
     cy="165.05"
     rx="22.059999"
     ry="12.74"
     id="ellipse857" />
  <ellipse
     class="cls-3"
     cx="71.859993"
     cy="153.58002"
     rx="26.92"
     ry="15.54"
     id="ellipse859" />
  <path
     class="cls-3"
     d="m 100.93,125.9 v 0 c 0,1.25 0,2.48 0,3.67 0,1 0,2 0,3 0,1.35 -0.07,2.64 -0.12,3.9 0,1.57 -0.11,3.06 -0.19,4.49 -0.08,1.43 -0.15,2.83 -0.25,4.12 -0.41,5.65 -0.4,9.81 -1.1,11.44 -3.81,8.91 -16.7,4 -23.06,0.8 -2.35,-1.18 -3.9,-2.15 -3.9,-2.15 h -0.58 c 0,0 -1.84,1 -4.56,2.08 -6.67,2.7 -18.56,7.49 -21.86,-0.76 -0.59,-1.48 -1.18,-6.37 -1.54,-11.49 -0.09,-1.28 -0.17,-2.65 -0.25,-4.11 -0.08,-1.46 -0.12,-2.91 -0.18,-4.48 0,-1.25 -0.07,-2.54 -0.1,-3.88 0,-1 0,-2 0,-3 v 0 -3.67 0 C 43.06,90.969997 46.76,35.179997 58.98,11.369997 c 3.53,-6.89 7.77,-11.1 12.84,-11.349997 v 0 h 0.58 v 0 c 5,0.259997 9.29,4.449997 12.83,11.309997 12.05,23.72 15.92,79.29 15.7,114.570003 z"
     id="path861" />
  <circle
     class="cls-8"
     cx="71.999992"
     cy="82.829994"
     r="10.17"
     id="circle863" />
  <path
     class="cls-6"
     d="m 72,94.489997 a 11.66,11.66 0 1 1 11.66,-11.66 11.67,11.67 0 0 1 -11.66,11.66 z m 0,-20.34 a 8.68,8.68 0 1 0 8.67,8.68 8.69,8.69 0 0 0 -8.67,-8.68 z"
     id="path865" />
  <circle
     class="cls-8"
     cx="71.999992"
     cy="52.930004"
     r="10.17"
     id="circle867" />
  <path
     class="cls-6"
     d="m 72,64.619997 a 11.67,11.67 0 1 1 11.66,-11.69 11.68,11.68 0 0 1 -11.66,11.69 z m 0,-20.34 a 8.68,8.68 0 1 0 8.67,8.67 8.68,8.68 0 0 0 -8.67,-8.69 z"
     id="path869" />
  <path
     class="cls-5"
     d="m 100.93,146.01 c 0,1.25 0,2.48 0,3.68 -1,3.3 -3.72,6.46 -8.14,9 -11.46,6.61 -30,6.61 -41.51,0 -4.42,-2.56 -7.13,-5.73 -8.14,-9 v 0 -3.67 c 1,3.36 3.7,6.56 8.18,9.15 11.47,6.61 30.05,6.61 41.51,0 4.39,-2.62 7.17,-5.81 8.1,-9.16 z"
     id="path871" />
  <path
     class="cls-5"
     d="m 100.84,151.83 c 0,1.34 -0.07,2.64 -0.12,3.89 -1.12,3.12 -3.78,6.09 -8,8.51 -11.46,6.62 -30,6.62 -41.51,0 -4.21,-2.44 -6.87,-5.43 -8,-8.57 0,-1.25 -0.07,-2.54 -0.1,-3.87 1,3.26 3.73,6.36 8.09,8.88 11.47,6.62 30.05,6.62 41.51,0 4.38,-2.51 7.08,-5.6 8.13,-8.84 z"
     id="path873" />
  <path
     class="cls-5"
     d="m 100.53,141.5 c -1.21,2.93 -3.81,5.72 -7.78,8 -11.46,6.61 -30,6.61 -41.51,0 -4,-2.32 -6.6,-5.12 -7.81,-8.08 0.08,1.46 0.16,2.83 0.25,4.11 1.29,2.76 3.81,5.36 7.56,7.53 a 39.77,39.77 0 0 0 15.84,4.72 50.54,50.54 0 0 0 9,0.07 40.14,40.14 0 0 0 16.63,-4.79 c 3.72,-2.15 6.23,-4.72 7.53,-7.45 z"
     id="path875" />
  <path
     class="cls-5"
     d="m 85.05,11.349997 a 44.73,44.73 0 0 1 -26.25,0 C 62.33,4.459997 66.57,0.249997 71.64,0 v 0 h 0.58 v 0 c 5.05,0.299997 9.29,4.489997 12.83,11.349997 z"
     id="path877" />
  <rect
     class="cls-5"
     x="69.799995"
     y="110.51"
     width="4.9899998"
     height="65.510002"
     id="rect879" />
</svg>

```

--------------------------------------------------------------------------------
/docs/docs/tools/index.mdx:
--------------------------------------------------------------------------------

```markdown
# Tools

The server currently offers 16 tools organized into 3 categories:

## Server Management Tools (3 tools)

#### 1. `list_files`

- List all files and directories in the Jupyter server's file system.
- This tool recursively lists files and directories from the Jupyter server's content API, showing the complete file structure including notebooks, data files, scripts, and directories.
- Input:
  - `path`(string, optional): The starting path to list from (empty string means root directory)
  - `max_depth`(int, optional): Maximum depth to recurse into subdirectories (default: 3)
- Returns: Tab-separated table with columns: Path, Type, Size, Last_Modified
  - **Path**: Full path to the file or directory
  - **Type**: File type ("file", "directory", "notebook", or "error" if inaccessible)
  - **Size**: File size formatted as B, KB, or MB (empty for directories)
  - **Last_Modified**: Last modification timestamp in YYYY-MM-DD HH:MM:SS format

#### 2. `list_kernels`

- List all available kernels in the Jupyter server.
- This tool shows all running and available kernel sessions on the Jupyter server, including their IDs, names, states, connection information, and kernel specifications. Useful for monitoring kernel resources and identifying specific kernels for connection.
- Input: None
- Returns: Tab-separated table with columns: ID, Name, Display_Name, Language, State, Connections, Last_Activity, Environment
  - **ID**: Unique kernel identifier
  - **Name**: Kernel name/type (e.g., "python3", "ir", etc.)
  - **Display_Name**: Human-readable kernel name from kernel spec
  - **Language**: Programming language supported by the kernel
  - **State**: Current execution state ("idle", "busy", "unknown")
  - **Connections**: Number of active connections to this kernel
  - **Last_Activity**: Timestamp of last kernel activity in YYYY-MM-DD HH:MM:SS format
  - **Environment**: Environment variables defined in the kernel spec (truncated if long)

#### 3. `assign_kernel_to_notebook`

- Assign a kernel to a notebook by creating a Jupyter session.
- This creates a Jupyter server session that connects a notebook file to a kernel, enabling code execution in the notebook. Sessions are the mechanism Jupyter uses to maintain the relationship between notebooks and their kernels.
- Input:
  - `notebook_path`(string): Path to the notebook file, relative to the Jupyter server root (e.g. "notebook.ipynb")
  - `kernel_id`(string): ID of the kernel to assign to the notebook
  - `session_name`(string, optional): Optional name for the session (defaults to notebook path)
- Returns: Success message with session information including session ID

## Multi-Notebook Management Tools (4 tools)

#### 4. `use_notebook`

- Connect to a notebook file or create a new one.
- Input:
  - `notebook_name`(string): Unique identifier for the notebook
  - `notebook_path`(string, optional): Path to the notebook file, relative to the Jupyter server root (e.g. "notebook.ipynb"). If not provided, switches to an already-connected notebook with the given name.
  - `mode`(string): "connect" to connect to existing, "create" to create new (default: "connect")
  - `kernel_id`(string, optional): Specific kernel ID to use (optional, will create new if not provided)
- Returns: Success message with notebook information

#### 5. `list_notebooks`

- List all notebooks in the Jupyter server (including subdirectories) and show which ones are managed by the notebook manager. To interact with a notebook, it has to be "managed". If a notebook is not managed, you can connect to it using the `use_notebook` tool.
- Input: None
- Returns: TSV formatted table with notebook information (Path, Managed, Name, Status, Current)
  - **Path**: Relative path to the notebook file in the Jupyter server
  - **Managed**: "Yes" if the notebook is currently managed by the MCP server, "No" otherwise
  - **Name**: Unique identifier for managed notebooks, "-" for unmanaged notebooks
  - **Status**: Kernel status for managed notebooks ("alive", "dead", etc.), "-" for unmanaged notebooks
  - **Current**: "✓" if this is the currently active managed notebook, empty otherwise

#### 6. `restart_notebook`

- Restart the kernel for a specific notebook.
- Input:
  - `notebook_name`(string): Notebook identifier to restart
- Returns: Success message

#### 7. `unuse_notebook`

- Disconnect from a specific notebook and release its resources.
- Input:
  - `notebook_name`(string): Notebook identifier to disconnect
- Returns: Success message

## Cell Tools (9 tools)

#### 8. `insert_cell`

- Insert a cell to specified position with unified API.
- Input:
  - `cell_index`(int): Target index for insertion (0-based). Use -1 to append at end.
  - `cell_type`(string): Type of cell to insert ("code" or "markdown").
  - `cell_source`(string): Source content for the cell.
- Returns: Success message and the structure of its surrounding cells (up to 5 cells above and 5 cells below).

#### 9. `insert_execute_code_cell`

- Insert and execute a code cell in a Jupyter notebook.
- Input:
  - `cell_index`(int): Index of the cell to insert (0-based). Use -1 to append at end and execute.
  - `cell_source`(string): Code source.
- Returns: List of outputs from the executed cell (supports multimodal output including images).

#### 10. `delete_cell`

- Delete a specific cell from the notebook.
- Input:
  - `cell_index`(int): Index of the cell to delete (0-based).
- Returns: Success message.

#### 11. `read_cell`

- Read a specific cell from the notebook.
- Input:
  - `cell_index`(int): Index of the cell to read (0-based).
- Returns: Dictionary with cell index, type, source, and outputs (for code cells).

#### 12. `read_cells`

- Read all cells from the notebook.
- Returns: List of cell information including index, type, source, and outputs (for code cells).

#### 13. `list_cells`

- List the basic information of all cells in the notebook.
- Returns a formatted table showing the index, type, execution count (for code cells), and first line of each cell.
- Provides a quick overview of the notebook structure and is useful for locating specific cells for operations.
- Input: None
- Returns: Formatted table string with cell information (Index, Type, Count, First Line).

#### 14. `overwrite_cell_source`

- Overwrite the source of an existing cell.
- Input:
  - `cell_index`(int): Index of the cell to overwrite (0-based).
  - `cell_source`(string): New cell source - must match existing cell type.
- Returns: Success message and a diff-style summary of the changes.

#### 15. `execute_cell`

- Execute a cell with configurable timeout and optional streaming progress updates.
- Input:
  - `cell_index`: Index of the cell to execute (0-based)
  - `timeout_seconds`: Maximum time to wait for execution (default: 300s)
  - `stream`: Enable streaming progress updates for long-running cells (default: False)
  - `progress_interval`: Seconds between progress updates when stream=True (default: 5s)
- Returns:
  - `list[Union[str, ImageContent]]`: List of outputs from the executed cell (supports multimodal output including images)
- Use `stream=False` for short-running cells (more reliable)
- Use `stream=True` for long-running cells (provides real-time feedback)

#### 16. `execute_ipython`

- Execute IPython code directly in the kernel on the current active notebook.
- This powerful tool supports:
  1. Magic commands (e.g., %timeit, %who, %load, %run, %matplotlib)
  2. Shell commands (e.g., !pip install, !ls, !cat)
  3. Python code (e.g., print(df.head()), df.info())
- Use cases:
  - Performance profiling and debugging
  - Environment exploration and package management
  - Variable inspection and data analysis
  - File system operations on Jupyter server
  - Temporary calculations and quick tests
- Input:
  - `code`(string): IPython code to execute (supports magic commands, shell commands with !, and Python code)
  - `timeout`(int): Execution timeout in seconds (default: 60s)
- Returns:
  - `list[Union[str, ImageContent]]`: List of outputs from the executed code (supports multimodal output including images)

```

--------------------------------------------------------------------------------
/jupyter_mcp_server/tools/execute_ipython_tool.py:
--------------------------------------------------------------------------------

```python
# Copyright (c) 2023-2024 Datalayer, Inc.
#
# BSD 3-Clause License

"""Execute IPython code directly in kernel tool."""

import asyncio
import logging
from typing import Union

from mcp.types import ImageContent

from jupyter_mcp_server.tools._base import BaseTool, ServerMode
from jupyter_mcp_server.notebook_manager import NotebookManager

logger = logging.getLogger(__name__)


class ExecuteIpythonTool(BaseTool):
    """Execute IPython code directly in the kernel on the current active notebook.
    
    This powerful tool supports:
    1. Magic commands (e.g., %timeit, %who, %load, %run, %matplotlib)
    2. Shell commands (e.g., !pip install, !ls, !cat)
    3. Python code (e.g., print(df.head()), df.info())
    
    Use cases:
    - Performance profiling and debugging
    - Environment exploration and package management
    - Variable inspection and data analysis
    - File system operations on Jupyter server
    - Temporary calculations and quick tests
    """
    
    @property
    def name(self) -> str:
        """Tool identifier exposed to MCP clients."""
        return "execute_ipython"
    
    @property
    def description(self) -> str:
        """Short human-readable description of the tool."""
        return "Execute IPython code directly in the kernel (supports magic commands, shell commands, and Python code)"
    
    async def _execute_via_kernel_manager(
        self,
        kernel_manager,
        kernel_id: str,
        code: str,
        timeout: int,
        safe_extract_outputs_fn
    ) -> list[Union[str, ImageContent]]:
        """Execute code using kernel_manager (JUPYTER_SERVER mode).
        
        Uses execute_code_local which handles ZMQ message collection properly.
        
        Args:
            kernel_manager: Jupyter kernel manager whose parent is the serverapp
            kernel_id: ID of the kernel to execute against
            code: IPython code to execute
            timeout: Execution timeout in seconds
            safe_extract_outputs_fn: Output-extraction callback (unused here;
                execute_code_local performs its own extraction)
            
        Returns:
            List of outputs from the executed code
        """
        from jupyter_mcp_server.utils import execute_code_local
        
        # Get serverapp from kernel_manager (the manager's parent is the ServerApp)
        serverapp = kernel_manager.parent
        
        # Use centralized execute_code_local function
        return await execute_code_local(
            serverapp=serverapp,
            notebook_path="",  # Not needed for execute_ipython
            code=code,
            kernel_id=kernel_id,
            timeout=timeout,
            logger=logger
        )
    
    async def _execute_via_notebook_manager(
        self,
        notebook_manager: NotebookManager,
        code: str,
        timeout: int,
        ensure_kernel_alive_fn,
        wait_for_kernel_idle_fn,
        safe_extract_outputs_fn
    ) -> list[Union[str, ImageContent]]:
        """Execute code using notebook_manager (MCP_SERVER mode - original logic).
        
        Runs ``kernel.execute`` in a worker thread so the event loop stays
        responsive, enforcing ``timeout`` and interrupting the kernel on expiry.
        """
        # Get current notebook name and kernel; fall back to "default" when
        # no notebook is currently selected
        current_notebook = notebook_manager.get_current_notebook() or "default"
        kernel = notebook_manager.get_kernel(current_notebook)
        
        if not kernel:
            # Ensure kernel is alive
            kernel = ensure_kernel_alive_fn()
        
        # Wait for kernel to be idle before executing
        await wait_for_kernel_idle_fn(kernel, max_wait_seconds=30)
        
        logger.info(f"Executing IPython code (MCP_SERVER) with timeout {timeout}s: {code[:100]}...")
        
        try:
            # Execute code directly with kernel; kernel.execute is blocking,
            # so run it in a thread to keep the event loop free
            execution_task = asyncio.create_task(
                asyncio.to_thread(kernel.execute, code)
            )
            
            # Wait for execution with timeout
            try:
                outputs = await asyncio.wait_for(execution_task, timeout=timeout)
            except asyncio.TimeoutError:
                execution_task.cancel()
                # Best-effort interrupt so the kernel does not keep running
                # the timed-out code
                try:
                    if kernel and hasattr(kernel, 'interrupt'):
                        kernel.interrupt()
                        logger.info("Sent interrupt signal to kernel due to timeout")
                except Exception as interrupt_err:
                    logger.error(f"Failed to interrupt kernel: {interrupt_err}")
                
                return [f"[TIMEOUT ERROR: IPython execution exceeded {timeout} seconds and was interrupted]"]
            
            # Process and extract outputs
            if outputs:
                result = safe_extract_outputs_fn(outputs['outputs'])
                logger.info(f"IPython execution completed successfully with {len(result)} outputs")
                return result
            else:
                return ["[No output generated]"]
                
        except Exception as e:
            logger.error(f"Error executing IPython code: {e}")
            return [f"[ERROR: {str(e)}]"]
    
    async def execute(
        self,
        mode: ServerMode,
        server_client=None,
        contents_manager=None,
        kernel_manager=None,
        kernel_spec_manager=None,
        notebook_manager=None,
        # Tool-specific parameters
        code: str = None,
        timeout: int = 60,
        kernel_id: str = None,
        ensure_kernel_alive_fn=None,
        wait_for_kernel_idle_fn=None,
        safe_extract_outputs_fn=None,
        **kwargs
    ) -> list[Union[str, ImageContent]]:
        """Execute IPython code directly in the kernel.
        
        Args:
            mode: Server mode (MCP_SERVER or JUPYTER_SERVER)
            server_client: JupyterServerClient (not used)
            contents_manager: Contents manager (not used)
            kernel_manager: Kernel manager (for JUPYTER_SERVER mode)
            kernel_spec_manager: Kernel spec manager (not used)
            notebook_manager: Notebook manager (for MCP_SERVER mode)
            code: IPython code to execute (supports magic commands, shell commands with !, and Python code)
            timeout: Execution timeout in seconds (default: 60s)
            kernel_id: Kernel ID (for JUPYTER_SERVER mode)
            ensure_kernel_alive_fn: Function to ensure kernel is alive (for MCP_SERVER mode)
            wait_for_kernel_idle_fn: Function to wait for kernel idle state (for MCP_SERVER mode)
            safe_extract_outputs_fn: Function to safely extract outputs
            
        Returns:
            List of outputs from the executed code
            
        Raises:
            ValueError: If a required callback for the selected mode is missing
        """
        if safe_extract_outputs_fn is None:
            raise ValueError("safe_extract_outputs_fn is required")
        
        # JUPYTER_SERVER mode: Use kernel_manager directly
        if mode == ServerMode.JUPYTER_SERVER and kernel_manager is not None:
            if kernel_id is None:
                # Try to get kernel_id from context
                from jupyter_mcp_server.utils import get_current_notebook_context
                _, kernel_id = get_current_notebook_context(notebook_manager)
            
            if kernel_id is None:
                # No kernel available - start a new one on demand
                logger.info("No kernel_id available, starting new kernel for execute_ipython")
                kernel_id = await kernel_manager.start_kernel()
                
                # Store the kernel in notebook_manager if available so later
                # calls reuse the same kernel instead of starting another one
                if notebook_manager is not None:
                    default_notebook = "default"
                    kernel_info = {"id": kernel_id}
                    notebook_manager.add_notebook(
                        default_notebook,
                        kernel_info,
                        server_url="local",
                        token=None,
                        path="notebook.ipynb"  # Placeholder path
                    )
                    notebook_manager.set_current_notebook(default_notebook)
            
            logger.info(f"Executing IPython in JUPYTER_SERVER mode with kernel_id={kernel_id}")
            return await self._execute_via_kernel_manager(
                kernel_manager=kernel_manager,
                kernel_id=kernel_id,
                code=code,
                timeout=timeout,
                safe_extract_outputs_fn=safe_extract_outputs_fn
            )
        
        # MCP_SERVER mode: Use notebook_manager (original behavior)
        elif mode == ServerMode.MCP_SERVER and notebook_manager is not None:
            if ensure_kernel_alive_fn is None:
                raise ValueError("ensure_kernel_alive_fn is required for MCP_SERVER mode")
            if wait_for_kernel_idle_fn is None:
                raise ValueError("wait_for_kernel_idle_fn is required for MCP_SERVER mode")
            
            logger.info("Executing IPython in MCP_SERVER mode")
            return await self._execute_via_notebook_manager(
                notebook_manager=notebook_manager,
                code=code,
                timeout=timeout,
                ensure_kernel_alive_fn=ensure_kernel_alive_fn,
                wait_for_kernel_idle_fn=wait_for_kernel_idle_fn,
                safe_extract_outputs_fn=safe_extract_outputs_fn
            )
        
        else:
            # Fix: was an f-string with no placeholders (lint F541); the
            # resulting string is byte-identical to before
            return ["[ERROR: Invalid mode or missing required managers]"]


```

--------------------------------------------------------------------------------
/jupyter_mcp_server/tools/delete_cell_tool.py:
--------------------------------------------------------------------------------

```python
# Copyright (c) 2023-2024 Datalayer, Inc.
#
# BSD 3-Clause License

"""Delete cell tool implementation."""

from typing import Any, Optional
from pathlib import Path
import nbformat
from jupyter_server_api import JupyterServerClient
from jupyter_mcp_server.tools._base import BaseTool, ServerMode
from jupyter_mcp_server.notebook_manager import NotebookManager
from jupyter_mcp_server.utils import get_current_notebook_context


class DeleteCellTool(BaseTool):
    """Tool to delete a specific cell from a notebook.

    Supports three paths: collaborative YDoc editing (when the notebook is
    open in a shared session), direct nbformat file edits, and a WebSocket
    connection to a remote Jupyter server. See ``execute`` for mode selection.
    """
    
    @property
    def name(self) -> str:
        """Tool identifier exposed to MCP clients."""
        return "delete_cell"
    
    @property
    def description(self) -> str:
        """Usage description shown to MCP clients."""
        return """Delete a specific cell from the Jupyter notebook.
    
Args:
    cell_index: Index of the cell to delete (0-based)
    
Returns:
    str: Success message"""
    
    async def _get_jupyter_ydoc(self, serverapp: Any, file_id: str):
        """Get the YNotebook document if it's currently open in a collaborative session.
        
        This follows the jupyter_ai_tools pattern of accessing YDoc through the
        yroom_manager when the notebook is actively being edited.
        
        Args:
            serverapp: The Jupyter ServerApp instance
            file_id: The file ID for the document
            
        Returns:
            YNotebook instance or None if not in a collaborative session
        """
        try:
            yroom_manager = serverapp.web_app.settings.get("yroom_manager")
            if yroom_manager is None:
                return None
                
            # Room IDs follow the "<format>:<type>:<file_id>" convention
            room_id = f"json:notebook:{file_id}"
            
            if yroom_manager.has_room(room_id):
                yroom = yroom_manager.get_room(room_id)
                notebook = await yroom.get_jupyter_ydoc()
                return notebook
        except Exception:
            # YDoc not available, will fall back to file operations
            pass
        
        return None
    
    def _get_cell_index_from_id(self, ydoc, cell_id: str) -> Optional[int]:
        """Find cell index by cell ID in YDoc.

        Returns the 0-based index of the first matching cell, or None.

        NOTE(review): not called anywhere within this tool — presumably kept
        for parity with sibling cell tools; confirm before removing.
        """
        for i, ycell in enumerate(ydoc.ycells):
            if ycell.get("id") == cell_id:
                return i
        return None
    
    async def _delete_cell_ydoc(
        self,
        serverapp: Any,
        notebook_path: str,
        cell_index: int
    ) -> str:
        """Delete cell using YDoc (collaborative editing mode).
        
        Falls back to direct file operations when the notebook has no
        active collaborative session.
        
        Args:
            serverapp: Jupyter ServerApp instance
            notebook_path: Path to the notebook
            cell_index: Index of cell to delete
            
        Returns:
            Success message
            
        Raises:
            RuntimeError: If the file_id_manager is unavailable
            ValueError: If cell_index is out of range
        """
        # Get file_id from file_id_manager
        file_id_manager = serverapp.web_app.settings.get("file_id_manager")
        if file_id_manager is None:
            raise RuntimeError("file_id_manager not available in serverapp")
        
        file_id = file_id_manager.get_id(notebook_path)
        
        # Try to get YDoc
        ydoc = await self._get_jupyter_ydoc(serverapp, file_id)
        
        if ydoc:
            # Notebook is open in collaborative mode, use YDoc so the change
            # is immediately visible to all connected users
            if cell_index < 0 or cell_index >= len(ydoc.ycells):
                raise ValueError(
                    f"Cell index {cell_index} is out of range. Notebook has {len(ydoc.ycells)} cells."
                )
            
            # Capture the cell type before deletion for the result message
            cell_type = ydoc.ycells[cell_index].get("cell_type", "unknown")
            
            # Delete the cell from YDoc
            del ydoc.ycells[cell_index]
            
            return f"Cell {cell_index} ({cell_type}) deleted successfully."
        else:
            # YDoc not available, use file operations
            return await self._delete_cell_file(notebook_path, cell_index)
    
    async def _delete_cell_file(
        self,
        notebook_path: str,
        cell_index: int
    ) -> str:
        """Delete cell using file operations (non-collaborative mode).
        
        Args:
            notebook_path: Absolute path to the notebook
            cell_index: Index of cell to delete
            
        Returns:
            Success message
            
        Raises:
            ValueError: If cell_index is out of range
        """
        # Read notebook file as version 4 for consistency
        with open(notebook_path, "r", encoding="utf-8") as f:
            notebook = nbformat.read(f, as_version=4)
        
        # Clean transient fields from outputs
        from jupyter_mcp_server.utils import _clean_notebook_outputs
        _clean_notebook_outputs(notebook)
        
        # Validate index
        if cell_index < 0 or cell_index >= len(notebook.cells):
            raise ValueError(
                f"Cell index {cell_index} is out of range. Notebook has {len(notebook.cells)} cells."
            )
        
        cell_type = notebook.cells[cell_index].cell_type
        
        # Delete the cell
        notebook.cells.pop(cell_index)
        
        # Write back to file
        with open(notebook_path, "w", encoding="utf-8") as f:
            nbformat.write(notebook, f)
        
        return f"Cell {cell_index} ({cell_type}) deleted successfully."
    
    async def _delete_cell_websocket(
        self,
        notebook_manager: NotebookManager,
        cell_index: int
    ) -> str:
        """Delete cell using WebSocket connection (MCP_SERVER mode).
        
        Args:
            notebook_manager: Notebook manager instance
            cell_index: Index of cell to delete
            
        Returns:
            Success message
            
        Raises:
            ValueError: If cell_index is out of range
        """
        async with notebook_manager.get_current_connection() as notebook:
            if cell_index < 0 or cell_index >= len(notebook):
                raise ValueError(
                    f"Cell index {cell_index} is out of range. Notebook has {len(notebook)} cells."
                )

            deleted_content = notebook.delete_cell(cell_index)
            return f"Cell {cell_index} ({deleted_content['cell_type']}) deleted successfully."
    
    async def execute(
        self,
        mode: ServerMode,
        server_client: Optional[JupyterServerClient] = None,
        kernel_client: Optional[Any] = None,
        contents_manager: Optional[Any] = None,
        kernel_manager: Optional[Any] = None,
        kernel_spec_manager: Optional[Any] = None,
        notebook_manager: Optional[NotebookManager] = None,
        # Tool-specific parameters
        cell_index: Optional[int] = None,
        **kwargs
    ) -> str:
        """Execute the delete_cell tool.
        
        This tool supports three modes of operation:
        
        1. JUPYTER_SERVER mode with YDoc (collaborative):
           - Checks if notebook is open in a collaborative session
           - Uses YDoc for real-time collaborative editing
           - Changes are immediately visible to all connected users
           
        2. JUPYTER_SERVER mode without YDoc (file-based):
           - Falls back to direct file operations using nbformat
           - Suitable when notebook is not actively being edited
           
        3. MCP_SERVER mode (WebSocket):
           - Uses WebSocket connection to remote Jupyter server
           - Accesses YDoc through NbModelClient
        
        Args:
            mode: Server mode (MCP_SERVER or JUPYTER_SERVER)
            server_client: HTTP client for MCP_SERVER mode
            contents_manager: Direct API access for JUPYTER_SERVER mode
            notebook_manager: Notebook manager instance
            cell_index: Index of the cell to delete (0-based)
            **kwargs: Additional parameters
            
        Returns:
            Success message
            
        Raises:
            ValueError: If the mode/manager combination is invalid
        """
        if mode == ServerMode.JUPYTER_SERVER and contents_manager is not None:
            # JUPYTER_SERVER mode: Try YDoc first, fall back to file operations
            from jupyter_mcp_server.jupyter_extension.context import get_server_context
            
            context = get_server_context()
            serverapp = context.serverapp
            notebook_path, _ = get_current_notebook_context(notebook_manager)
            
            # Resolve to absolute path (context paths are relative to root_dir)
            if serverapp and not Path(notebook_path).is_absolute():
                root_dir = serverapp.root_dir
                notebook_path = str(Path(root_dir) / notebook_path)
            
            if serverapp:
                # Try YDoc approach first (it falls back to file ops internally)
                return await self._delete_cell_ydoc(serverapp, notebook_path, cell_index)
            else:
                # Fall back to file operations
                return await self._delete_cell_file(notebook_path, cell_index)
                
        elif mode == ServerMode.MCP_SERVER and notebook_manager is not None:
            # MCP_SERVER mode: Use WebSocket connection
            return await self._delete_cell_websocket(notebook_manager, cell_index)
        else:
            raise ValueError(f"Invalid mode or missing required clients: mode={mode}")

```

--------------------------------------------------------------------------------
/jupyter_mcp_server/tools/list_kernels_tool.py:
--------------------------------------------------------------------------------

```python
# Copyright (c) 2023-2024 Datalayer, Inc.
#
# BSD 3-Clause License

"""List all available kernels tool."""

from typing import Any, Optional, List, Dict
from jupyter_server_api import JupyterServerClient

from jupyter_mcp_server.tools._base import BaseTool, ServerMode
from jupyter_mcp_server.utils import format_TSV


class ListKernelsTool(BaseTool):
    """List all available kernels in the Jupyter server.
    
    This tool shows all running and available kernel sessions on the Jupyter server,
    including their IDs, names, states, connection information, and kernel specifications.
    Useful for monitoring kernel resources and identifying specific kernels for connection.
    """
    
    # Environment strings longer than this are truncated with an ellipsis.
    _ENV_MAX_LEN = 100
    
    @property
    def name(self) -> str:
        return "list_kernels"
    
    @property
    def description(self) -> str:
        return "List all available kernels in the Jupyter server"
    
    @staticmethod
    def _format_env(env_dict: Optional[Dict[str, Any]]) -> Optional[str]:
        """Render a kernel-spec env mapping as 'k=v; ...', truncated for display.
        
        Returns None for an empty or missing mapping so callers can keep
        their "unknown" placeholder.
        """
        if not env_dict:
            return None
        env_str = "; ".join(f"{k}={v}" for k, v in env_dict.items())
        if len(env_str) > ListKernelsTool._ENV_MAX_LEN:
            return env_str[:ListKernelsTool._ENV_MAX_LEN] + "..."
        return env_str
    
    @staticmethod
    def _format_last_activity(last_activity: Any) -> Optional[str]:
        """Format a last-activity value; accepts datetime-like or plain values.
        
        Returns None for falsy values so callers can keep their "unknown"
        placeholder.
        """
        if not last_activity:
            return None
        if hasattr(last_activity, 'strftime'):
            return last_activity.strftime("%Y-%m-%d %H:%M:%S")
        return str(last_activity)
    
    def _list_kernels_http(self, server_client: JupyterServerClient) -> List[Dict[str, str]]:
        """List kernels using HTTP API (MCP_SERVER mode).
        
        Args:
            server_client: Connected Jupyter server HTTP client.
        
        Returns:
            List of dicts with keys: id, name, state, connections,
            last_activity, display_name, language, env. Fields that cannot
            be determined are reported as "unknown".
        
        Raises:
            RuntimeError: If the Jupyter server cannot be queried.
        """
        try:
            # Get all kernels from the Jupyter server
            kernels = server_client.kernels.list_kernels()
            
            if not kernels:
                return []
            
            # Kernel specs provide display name, language and env details.
            kernels_specs = server_client.kernelspecs.list_kernelspecs()
            
            # Create enhanced kernel information list
            output = []
            for kernel in kernels:
                kernel_info = {
                    "id": kernel.id or "unknown",
                    "name": kernel.name or "unknown",
                    "state": "unknown",
                    "connections": "unknown",
                    "last_activity": "unknown",
                    "display_name": "unknown",
                    "language": "unknown",
                    "env": "unknown"
                }
                
                # The attribute carrying execution state varies across API versions.
                if hasattr(kernel, 'execution_state'):
                    kernel_info["state"] = kernel.execution_state
                elif hasattr(kernel, 'state'):
                    kernel_info["state"] = kernel.state
                
                # Get connection count
                if hasattr(kernel, 'connections'):
                    kernel_info["connections"] = str(kernel.connections)
                
                # Get last activity
                if hasattr(kernel, 'last_activity'):
                    formatted = self._format_last_activity(kernel.last_activity)
                    if formatted is not None:
                        kernel_info["last_activity"] = formatted
                
                output.append(kernel_info)
            
            # Enhance kernel info with details from the matching kernel spec.
            for kernel in output:
                kernel_name = kernel["name"]
                if hasattr(kernels_specs, 'kernelspecs') and kernel_name in kernels_specs.kernelspecs:
                    kernel_spec = kernels_specs.kernelspecs[kernel_name]
                    spec = getattr(kernel_spec, 'spec', None)
                    if spec is not None:
                        if hasattr(spec, 'display_name'):
                            kernel["display_name"] = spec.display_name
                        if hasattr(spec, 'language'):
                            kernel["language"] = spec.language
                        env_str = self._format_env(getattr(spec, 'env', None))
                        if env_str is not None:
                            kernel["env"] = env_str
            
            return output
            
        except Exception as e:
            # Chain the original exception so the root cause stays visible.
            raise RuntimeError(f"Error listing kernels via HTTP: {str(e)}") from e
    
    async def _list_kernels_local(
        self, 
        kernel_manager: Any, 
        kernel_spec_manager: Any
    ) -> List[Dict[str, str]]:
        """List kernels using local kernel_manager API (JUPYTER_SERVER mode).
        
        Args:
            kernel_manager: Jupyter server multi-kernel manager.
            kernel_spec_manager: Kernel spec manager; may be None.
        
        Returns:
            Same row shape as ``_list_kernels_http``.
        
        Raises:
            RuntimeError: If the local managers cannot be queried.
        """
        try:
            # Get all running kernels - list_kernels() returns dicts with kernel info
            kernel_infos = list(kernel_manager.list_kernels())
            
            if not kernel_infos:
                return []
            
            # Get kernel specifications
            kernel_specs = kernel_spec_manager.get_all_specs() if kernel_spec_manager else {}
            
            # Create enhanced kernel information list
            output = []
            for kernel_info_dict in kernel_infos:
                # kernel_info_dict is already a dict with kernel information
                kernel_info = {
                    "id": kernel_info_dict.get('id', 'unknown'),
                    "name": kernel_info_dict.get('name', 'unknown'),
                    "state": kernel_info_dict.get('execution_state', 'unknown'),
                    "connections": str(kernel_info_dict.get('connections', 'unknown')),
                    "last_activity": "unknown",
                    "display_name": "unknown",
                    "language": "unknown",
                    "env": "unknown"
                }
                
                # Format last activity if present
                formatted = self._format_last_activity(kernel_info_dict.get('last_activity'))
                if formatted is not None:
                    kernel_info["last_activity"] = formatted
                
                output.append(kernel_info)
            
            # Enhance kernel info with details from the matching kernel spec.
            for kernel in output:
                spec = kernel_specs.get(kernel["name"], {}).get('spec', {})
                if 'display_name' in spec:
                    kernel["display_name"] = spec['display_name']
                if 'language' in spec:
                    kernel["language"] = spec['language']
                env_str = self._format_env(spec.get('env'))
                if env_str is not None:
                    kernel["env"] = env_str
            
            return output
            
        except Exception as e:
            # Chain the original exception so the root cause stays visible.
            raise RuntimeError(f"Error listing kernels locally: {str(e)}") from e
    
    async def execute(
        self,
        mode: ServerMode,
        server_client: Optional[JupyterServerClient] = None,
        kernel_client: Optional[Any] = None,
        contents_manager: Optional[Any] = None,
        kernel_manager: Optional[Any] = None,
        kernel_spec_manager: Optional[Any] = None,
        **kwargs
    ) -> str:
        """List all available kernels.
        
        Args:
            mode: Server mode (MCP_SERVER or JUPYTER_SERVER)
            server_client: HTTP client for MCP_SERVER mode
            kernel_client: Unused; accepted for interface compatibility
            contents_manager: Unused; accepted for interface compatibility
            kernel_manager: Direct kernel manager access for JUPYTER_SERVER mode
            kernel_spec_manager: Kernel spec manager for JUPYTER_SERVER mode
            **kwargs: Additional parameters (unused)
            
        Returns:
            Tab-separated table with columns: ID, Name, Display_Name, Language, State, Connections, Last_Activity, Environment
        
        Raises:
            ValueError: If the mode/client combination is invalid.
        """
        # Get kernel info based on mode
        if mode == ServerMode.JUPYTER_SERVER and kernel_manager is not None:
            kernel_list = await self._list_kernels_local(kernel_manager, kernel_spec_manager)
        elif mode == ServerMode.MCP_SERVER and server_client is not None:
            kernel_list = self._list_kernels_http(server_client)
        else:
            raise ValueError(f"Invalid mode or missing required managers/clients: mode={mode}")
        
        if not kernel_list:
            return "No kernels found on the Jupyter server."
        
        try:
            # Create TSV formatted output
            headers = ["ID", "Name", "Display_Name", "Language", "State", "Connections", "Last_Activity", "Environment"]
            rows = [
                [kernel['id'], kernel['name'], kernel['display_name'], kernel['language'],
                 kernel['state'], kernel['connections'], kernel['last_activity'], kernel['env']]
                for kernel in kernel_list
            ]
            return format_TSV(headers, rows)
            
        except Exception as e:
            # Formatting failures are reported as a message rather than raised
            # so the tool call still returns usable output to the client.
            return f"Error formatting kernel list: {str(e)}"


```

--------------------------------------------------------------------------------
/.github/copilot-instructions.md:
--------------------------------------------------------------------------------

```markdown
# Jupyter MCP Server

**Always reference these instructions first and fallback to search or bash commands only when you encounter unexpected information that does not match the info here.**

Jupyter MCP Server is a Python-based Model Context Protocol (MCP) server implementation that enables real-time interaction with Jupyter Notebooks. The project uses a modern Python build system with hatch, and includes comprehensive testing, linting, and documentation.

## Working Effectively

### Environment Setup
- **Python Requirements**: Python 3.10 or higher is required at runtime; the CI test matrix additionally exercises Python 3.9–3.13
- **Network Considerations**: PyPI installs may fail due to SSL certificate issues or timeout limitations. This is a known environment constraint.

### Build and Install (CRITICAL: Network Limitations)
```bash
# Standard installation (may fail with network issues)
pip install ".[test,lint,typing]"

# Alternative if pip install fails:
# 1. Install dependencies individually with longer timeouts
pip install --timeout=300 pytest
pip install --timeout=300 ruff  
pip install --timeout=300 mypy

# 2. Or use Docker approach (preferred for consistency)
docker build -t jupyter-mcp-server .
```

**NETWORK TIMEOUT WARNING**: pip install commands may fail with SSL certificate errors or read timeouts when connecting to PyPI. If installs fail:
- Try increasing timeout: `pip install --timeout=300`
- Use Docker build which handles dependencies internally
- Document the network limitation in any testing notes

### Core Development Commands
```bash
# Development installation (when network allows)
make dev
# Equivalent to: pip install ".[test,lint,typing]"

# Basic installation  
make install
# Equivalent to: pip install .

# Build the package
make build
# Equivalent to: pip install build && python -m build .
```

### Testing (CRITICAL: Use Long Timeouts)
```bash
# Run tests using hatch (when available)
make test
# Equivalent to: hatch test

# Run tests directly with pytest (when network allows install)
pytest .

# NEVER CANCEL: Test suite timing expectations
# - Full test suite: Allow 15-20 minutes minimum
# - Network-dependent tests may take longer
# - Set timeout to 30+ minutes for safety
```

**VALIDATION REQUIREMENT**: When testing is not possible due to network issues, verify at minimum:
```bash
# Syntax validation (always works)
python -m py_compile jupyter_mcp_server/server.py
find . -name "*.py" -exec python -m py_compile {} \;

# Import validation
PYTHONPATH=. python -c "import jupyter_mcp_server; print('Import successful')"
```

### Linting and Code Quality (CRITICAL: Use Long Timeouts)
```bash
# Full linting pipeline (when network allows)
bash ./.github/workflows/lint.sh

# Individual linting commands:
pip install -e ".[lint,typing]"
mypy --install-types --non-interactive .  # May take 10+ minutes, NEVER CANCEL
ruff check .                              # Quick, usually <1 minute  
mdformat --check *.md                     # Quick, usually <1 minute
pipx run 'validate-pyproject[all]' pyproject.toml  # 2-3 minutes

# TIMING WARNING: mypy type checking can take 10+ minutes on first run
# Set timeout to 20+ minutes for mypy operations
```

### Running the Application

#### Local Development Mode
```bash
# Start with streamable HTTP transport
make start
# Equivalent to:
jupyter-mcp-server start \
  --transport streamable-http \
  --document-url http://localhost:8888 \
  --document-id notebook.ipynb \
  --document-token MY_TOKEN \
  --runtime-url http://localhost:8888 \
  --start-new-runtime true \
  --runtime-token MY_TOKEN \
  --port 4040
```

#### JupyterLab Setup (Required for Testing)
```bash
# Start JupyterLab server for MCP integration
make jupyterlab
# Equivalent to:
pip uninstall -y pycrdt datalayer_pycrdt
pip install datalayer_pycrdt
jupyter lab \
  --port 8888 \
  --ip 0.0.0.0 \
  --ServerApp.root_dir ./dev/content \
  --IdentityProvider.token MY_TOKEN
```

#### Docker Deployment
```bash
# Build Docker image (NEVER CANCEL: Build takes 10-15 minutes)
make build-docker  # Takes 10-15 minutes, set timeout to 20+ minutes

# Run with Docker  
make start-docker
# Or manually:
docker run -i --rm \
  -e DOCUMENT_URL=http://localhost:8888 \
  -e DOCUMENT_ID=notebook.ipynb \
  -e DOCUMENT_TOKEN=MY_TOKEN \
  -e RUNTIME_URL=http://localhost:8888 \
  -e START_NEW_RUNTIME=true \
  -e RUNTIME_TOKEN=MY_TOKEN \
  --network=host \
  datalayer/jupyter-mcp-server:latest
```

### Manual Validation Scenarios

**When full testing is not possible due to network constraints, always verify:**

1. **Syntax and Import Validation**:
   ```bash
   # Validate all Python files compile
   find . -name "*.py" -exec python -m py_compile {} \;
   
   # Test local imports work
   PYTHONPATH=. python -c "import jupyter_mcp_server; print('SUCCESS')"
   ```

2. **Configuration Validation**:
   ```bash
   # Verify pyproject.toml is valid
   python -c "import tomllib; tomllib.load(open('pyproject.toml', 'rb'))"
   
   # Test module structure
   python -c "import jupyter_mcp_server.server, jupyter_mcp_server.models"
   ```

3. **Documentation Build** (when Node.js available):
   ```bash
   cd docs/
   npm install  # May have network issues
   npm run build  # 3-5 minutes, set timeout to 10+ minutes
   ```

## Project Structure and Navigation

### Key Directories
- **`jupyter_mcp_server/`**: Main Python package
  - `server.py`: Core MCP server implementation with FastMCP integration
  - `models.py`: Pydantic data models for document and runtime handling
  - `utils.py`: Utility functions for output extraction and processing
  - `tests/`: Unit tests (internal package tests)
- **`tests/`**: Integration tests using pytest-asyncio
- **`docs/`**: Docusaurus-based documentation site (Node.js/React)
- **`dev/content/`**: Development Jupyter notebook files for testing
- **`.github/workflows/`**: CI/CD pipeline definitions

### Important Files
- **`pyproject.toml`**: Build configuration, dependencies, and tool settings
- **`Makefile`**: Development workflow automation
- **`Dockerfile`**: Container build definition
- **`.github/workflows/lint.sh`**: Linting pipeline script
- **`pytest.ini`**: Test configuration

### Frequently Modified Areas
- **Server Logic**: `jupyter_mcp_server/server.py` - Main MCP server implementation
- **Data Models**: `jupyter_mcp_server/models.py` - When adding new MCP tools or changing data structures
- **Tests**: `tests/test_mcp.py` - Integration tests for MCP functionality
- **Documentation**: `docs/src/` - When updating API documentation or user guides

## Common Tasks and Gotchas

### Adding New MCP Tools
1. Add tool definition in `jupyter_mcp_server/server.py`
2. Update models in `jupyter_mcp_server/models.py` if needed
3. Add tests in `tests/test_mcp.py`
4. Update documentation in `docs/`

### Dependency Management
- **Core deps**: Defined in `pyproject.toml` dependencies section
- **Dev deps**: Use `[test,lint,typing]` optional dependencies
- **Special handling**: `datalayer_pycrdt` has specific version requirements (0.12.17)

### CI/CD Pipeline Expectations
- **Build Matrix**: Tests run on Ubuntu, macOS, Windows with Python 3.9, 3.13
- **Critical Timing**: Full CI pipeline takes 20-30 minutes
- **Required Checks**: pytest, ruff, mypy, mdformat, pyproject validation

### Environment Variables for Testing
```bash
# Required for MCP server operation
export DOCUMENT_URL="http://localhost:8888"
export DOCUMENT_TOKEN="MY_TOKEN"
export DOCUMENT_ID="notebook.ipynb"
export RUNTIME_URL="http://localhost:8888"
export RUNTIME_TOKEN="MY_TOKEN"
```

## Network Limitations and Workarounds

**CRITICAL CONSTRAINT**: This development environment has limited PyPI connectivity with SSL certificate issues and timeout problems.

### Known Working Commands
```bash
# These always work (no network required):
python -m py_compile <file>          # Syntax validation
PYTHONPATH=. python -c "import ..."  # Import testing  
python -c "import tomllib; ..."      # Config validation
git operations                       # Version control
docker build (when base images cached)
```

### Commands That May Fail
```bash
pip install <anything>               # Network timeouts/SSL issues
npm install                          # Network limitations  
mypy --install-types                 # Downloads type stubs
hatch test                           # May need PyPI for dependencies
```

### Required Workarounds
1. **Document network failures** when they occur: "pip install fails due to network limitations"
2. **Use syntax validation** instead of full testing when pip installs fail
3. **Prefer Docker approach** for consistent builds when possible
4. **Set generous timeouts** (60+ minutes) for any network operations
5. **Never cancel long-running commands** - document expected timing instead

## Timing Expectations

**NEVER CANCEL these operations - they are expected to take significant time:**

- **pip install ".[test,lint,typing]"**: 5-10 minutes (when network works)
- **mypy --install-types --non-interactive**: 10-15 minutes first run
- **Docker build**: 10-15 minutes
- **Full test suite**: 15-20 minutes  
- **Documentation build**: 3-5 minutes
- **CI pipeline**: 20-30 minutes total

Always set timeouts to at least double these estimates to account for network variability.
```

--------------------------------------------------------------------------------
/jupyter_mcp_server/tools/overwrite_cell_source_tool.py:
--------------------------------------------------------------------------------

```python
# Copyright (c) 2023-2024 Datalayer, Inc.
#
# BSD 3-Clause License

"""Overwrite cell source tool implementation."""

import difflib
import nbformat
from pathlib import Path
from typing import Any, Optional
from jupyter_server_api import JupyterServerClient
from jupyter_mcp_server.tools._base import BaseTool, ServerMode
from jupyter_mcp_server.notebook_manager import NotebookManager
from jupyter_mcp_server.utils import get_current_notebook_context


class OverwriteCellSourceTool(BaseTool):
    """Tool to overwrite the source of an existing cell."""
    
    @property
    def name(self) -> str:
        return "overwrite_cell_source"
    
    @property
    def description(self) -> str:
        return """Overwrite the source of an existing cell.
Note this does not execute the modified cell by itself.

Args:
    cell_index: Index of the cell to overwrite (0-based)
    cell_source: New cell source - must match existing cell type

Returns:
    str: Success message with diff showing changes made"""
    
    @staticmethod
    def _normalize_source(source: Any) -> str:
        """Normalize a cell source that may be stored as a list of lines."""
        if isinstance(source, list):
            return "".join(source)
        return str(source)
    
    @staticmethod
    def _format_result(cell_index: int, diff_content: str) -> str:
        """Build the user-facing success message for an overwrite."""
        if not diff_content.strip() or diff_content == "no changes detected":
            return f"Cell {cell_index} overwritten successfully - no changes detected"
        return f"Cell {cell_index} overwritten successfully!\n\n```diff\n{diff_content}\n```"
    
    async def _get_jupyter_ydoc(self, serverapp: Any, file_id: str):
        """Get the YNotebook document if it's currently open in a collaborative session.
        
        Returns None when no collaborative room exists for the file or when
        the collaboration machinery is unavailable.
        """
        try:
            yroom_manager = serverapp.web_app.settings.get("yroom_manager")
            if yroom_manager is None:
                return None
                
            room_id = f"json:notebook:{file_id}"
            
            if yroom_manager.has_room(room_id):
                yroom = yroom_manager.get_room(room_id)
                notebook = await yroom.get_jupyter_ydoc()
                return notebook
        except Exception:
            # Deliberate best-effort: any failure here means "not available",
            # and the caller falls back to direct file operations.
            pass
        
        return None
    
    def _generate_diff(self, old_source: str, new_source: str) -> str:
        """Generate a unified diff between old and new cell source.
        
        Returns the diff body without the '---'/'+++' file-header lines
        (both sides are the same cell, so they carry no information), or
        "no changes detected" when the sources are identical.
        """
        old_lines = old_source.splitlines(keepends=False)
        new_lines = new_source.splitlines(keepends=False)
        
        diff_lines = list(difflib.unified_diff(
            old_lines, 
            new_lines, 
            lineterm='',
            n=3  # Number of context lines around each change
        ))
        
        # unified_diff emits exactly two file-header lines ('---' and '+++')
        # before the first '@@' hunk header; strip only those two so every
        # hunk keeps its '@@' marker and multi-hunk diffs stay consistent.
        if len(diff_lines) > 2:
            return '\n'.join(diff_lines[2:])
        return "no changes detected"
    
    async def _overwrite_cell_ydoc(
        self,
        serverapp: Any,
        notebook_path: str,
        cell_index: int,
        cell_source: str
    ) -> str:
        """Overwrite cell using YDoc (collaborative editing mode).
        
        Falls back to file operations when the notebook is not open in a
        collaborative session.
        
        Raises:
            RuntimeError: If the file_id_manager is unavailable.
            ValueError: If cell_index is out of range.
        """
        # Get file_id from file_id_manager
        file_id_manager = serverapp.web_app.settings.get("file_id_manager")
        if file_id_manager is None:
            raise RuntimeError("file_id_manager not available in serverapp")
        
        file_id = file_id_manager.get_id(notebook_path)
        
        # Try to get YDoc
        ydoc = await self._get_jupyter_ydoc(serverapp, file_id)
        
        if not ydoc:
            # YDoc not available, use file operations
            return await self._overwrite_cell_file(notebook_path, cell_index, cell_source)
        
        # Notebook is open in collaborative mode, use YDoc
        if cell_index < 0 or cell_index >= len(ydoc.ycells):
            raise ValueError(
                f"Cell index {cell_index} is out of range. Notebook has {len(ydoc.ycells)} cells."
            )
        
        # Capture original content before mutating, for the diff.
        old_source = self._normalize_source(ydoc.ycells[cell_index].get("source", ""))
        
        # Set new cell source
        ydoc.ycells[cell_index]["source"] = cell_source
        
        return self._format_result(cell_index, self._generate_diff(old_source, cell_source))
    
    async def _overwrite_cell_file(
        self,
        notebook_path: str,
        cell_index: int,
        cell_source: str
    ) -> str:
        """Overwrite cell using file operations (non-collaborative mode).
        
        Raises:
            ValueError: If cell_index is out of range.
        """
        # Read notebook file as version 4 for consistency
        with open(notebook_path, "r", encoding="utf-8") as f:
            notebook = nbformat.read(f, as_version=4)
        
        # Clean transient fields from outputs
        from jupyter_mcp_server.utils import _clean_notebook_outputs
        _clean_notebook_outputs(notebook)
        
        if cell_index < 0 or cell_index >= len(notebook.cells):
            raise ValueError(
                f"Cell index {cell_index} is out of range. Notebook has {len(notebook.cells)} cells."
            )
        
        # Capture original content before mutating, for the diff.
        old_source = notebook.cells[cell_index].source
        
        # Set new cell source
        notebook.cells[cell_index].source = cell_source
        
        # Write back to file
        with open(notebook_path, "w", encoding="utf-8") as f:
            nbformat.write(notebook, f)
        
        return self._format_result(cell_index, self._generate_diff(old_source, cell_source))
    
    async def _overwrite_cell_websocket(
        self,
        notebook_manager: NotebookManager,
        cell_index: int,
        cell_source: str
    ) -> str:
        """Overwrite cell using WebSocket connection (MCP_SERVER mode).
        
        Raises:
            ValueError: If cell_index is out of range.
        """
        async with notebook_manager.get_current_connection() as notebook:
            if cell_index < 0 or cell_index >= len(notebook):
                raise ValueError(f"Cell index {cell_index} out of range")
            
            # Capture original content before mutating, for the diff.
            old_source = self._normalize_source(notebook[cell_index].get("source", ""))
            
            # Set new cell content
            notebook.set_cell_source(cell_index, cell_source)
            
            return self._format_result(cell_index, self._generate_diff(old_source, cell_source))
    
    async def execute(
        self,
        mode: ServerMode,
        server_client: Optional[JupyterServerClient] = None,
        kernel_client: Optional[Any] = None,
        contents_manager: Optional[Any] = None,
        kernel_manager: Optional[Any] = None,
        kernel_spec_manager: Optional[Any] = None,
        notebook_manager: Optional[NotebookManager] = None,
        # Tool-specific parameters
        cell_index: Optional[int] = None,
        cell_source: Optional[str] = None,
        **kwargs
    ) -> str:
        """Execute the overwrite_cell_source tool.
        
        Args:
            mode: Server mode (MCP_SERVER or JUPYTER_SERVER)
            contents_manager: Direct API access for JUPYTER_SERVER mode
            notebook_manager: Notebook manager instance
            cell_index: Index of the cell to overwrite (0-based); required
            cell_source: New cell source; required
            **kwargs: Additional parameters
            
        Returns:
            Success message with diff
        
        Raises:
            ValueError: If required parameters are missing, the mode/client
                combination is invalid, or cell_index is out of range.
        """
        # Fail fast with a clear message instead of an obscure comparison
        # error deeper in the call chain.
        if cell_index is None or cell_source is None:
            raise ValueError("cell_index and cell_source are required")
        
        if mode == ServerMode.JUPYTER_SERVER and contents_manager is not None:
            # JUPYTER_SERVER mode: Try YDoc first, fall back to file operations
            from jupyter_mcp_server.jupyter_extension.context import get_server_context
            
            context = get_server_context()
            serverapp = context.serverapp
            notebook_path, _ = get_current_notebook_context(notebook_manager)
            
            # Resolve to absolute path relative to the server's root directory
            if serverapp and not Path(notebook_path).is_absolute():
                root_dir = serverapp.root_dir
                notebook_path = str(Path(root_dir) / notebook_path)
            
            if serverapp:
                return await self._overwrite_cell_ydoc(serverapp, notebook_path, cell_index, cell_source)
            else:
                return await self._overwrite_cell_file(notebook_path, cell_index, cell_source)
                
        elif mode == ServerMode.MCP_SERVER and notebook_manager is not None:
            # MCP_SERVER mode: Use WebSocket connection
            return await self._overwrite_cell_websocket(notebook_manager, cell_index, cell_source)
        else:
            raise ValueError(f"Invalid mode or missing required clients: mode={mode}")

```

--------------------------------------------------------------------------------
/jupyter_mcp_server/jupyter_extension/extension.py:
--------------------------------------------------------------------------------

```python
# Copyright (c) 2023-2024 Datalayer, Inc.
#
# BSD 3-Clause License

"""
Jupyter Server Extension for MCP Protocol

This extension exposes MCP tools directly from a running Jupyter Server,
allowing MCP clients to connect to the Jupyter Server's MCP endpoints.
"""

import logging
from traitlets import Unicode, Bool
from jupyter_server.extension.application import ExtensionApp, ExtensionAppJinjaMixin
from jupyter_server.utils import url_path_join

from jupyter_mcp_server.jupyter_extension.context import get_server_context
from jupyter_mcp_server.jupyter_extension.handlers import (
    MCPHealthHandler,
    MCPToolsListHandler,
    MCPToolsCallHandler,
)


logger = logging.getLogger(__name__)


class JupyterMCPServerExtensionApp(ExtensionAppJinjaMixin, ExtensionApp):
    """
    Jupyter Server Extension for MCP Server.
    
    This extension allows MCP clients to connect to Jupyter Server and use
    MCP tools to interact with notebooks and kernels.
    
    Configuration:
        c.JupyterMCPServerExtensionApp.document_url = "local"  # or http://...
        c.JupyterMCPServerExtensionApp.runtime_url = "local"   # or http://...
        c.JupyterMCPServerExtensionApp.document_id = "notebook.ipynb"
        c.JupyterMCPServerExtensionApp.start_new_runtime = True  # Start new kernel
        c.JupyterMCPServerExtensionApp.runtime_id = "kernel-id"  # Or connect to existing
    """
    
    # Extension metadata
    name = "jupyter_mcp_server"
    default_url = "/mcp"
    load_other_extensions = True
    
    # Configuration traits
    document_url = Unicode(
        "local",
        config=True,
        help='Document URL - use "local" for local serverapp access or http://... for remote'
    )
    
    runtime_url = Unicode(
        "local",
        config=True,
        help='Runtime URL - use "local" for local serverapp access or http://... for remote'
    )
    
    document_id = Unicode(
        "notebook.ipynb",
        config=True,
        help='Default document ID (notebook path)'
    )
    
    start_new_runtime = Bool(
        False,
        config=True,
        help='Whether to start a new kernel runtime on initialization'
    )
    
    runtime_id = Unicode(
        "",
        config=True,
        help='Existing kernel ID to connect to (if not starting new runtime)'
    )
    
    document_token = Unicode(
        "",
        config=True,
        help='Authentication token for document server (if remote)'
    )
    
    runtime_token = Unicode(
        "",
        config=True,
        help='Authentication token for runtime server (if remote)'
    )
    
    provider = Unicode(
        "jupyter",
        config=True,
        help='Provider type for document/runtime'
    )
    
    def initialize_settings(self):
        """
        Initialize extension settings.
        
        This is called during extension loading to set up configuration
        and update the server context.
        
        Side effects: mutates the global server context, the global MCP
        config singleton, and ``self.settings``; may schedule an async
        auto-enrollment callback on the Tornado IOLoop.
        """
        # Reduce noise from httpx logging (used by JupyterLab for PyPI extension discovery)
        logging.getLogger("httpx").setLevel(logging.WARNING)
        
        # Use lazy %-style args so formatting only happens if the record is emitted.
        logger.info("Initializing Jupyter MCP Server Extension")
        logger.info("  Document URL: %s", self.document_url)
        logger.info("  Runtime URL: %s", self.runtime_url)
        logger.info("  Document ID: %s", self.document_id)
        logger.info("  Start New Runtime: %s", self.start_new_runtime)
        if self.runtime_id:
            logger.info("  Runtime ID: %s", self.runtime_id)
        
        # Update the global server context
        context = get_server_context()
        context.update(
            context_type="JUPYTER_SERVER",
            serverapp=self.serverapp,
            document_url=self.document_url,
            runtime_url=self.runtime_url
        )
        
        # Update global MCP configuration (empty-string traits become None)
        from jupyter_mcp_server.config import get_config
        config = get_config()
        config.document_url = self.document_url
        config.runtime_url = self.runtime_url
        config.document_id = self.document_id
        config.document_token = self.document_token if self.document_token else None
        config.runtime_token = self.runtime_token if self.runtime_token else None
        config.start_new_runtime = self.start_new_runtime
        config.runtime_id = self.runtime_id if self.runtime_id else None
        config.provider = self.provider
        
        # Store configuration in settings for handlers
        self.settings.update({
            "mcp_document_url": self.document_url,
            "mcp_runtime_url": self.runtime_url,
            "mcp_document_id": self.document_id,
            "mcp_document_token": self.document_token,
            "mcp_runtime_token": self.runtime_token,
            "mcp_start_new_runtime": self.start_new_runtime,
            "mcp_runtime_id": self.runtime_id,
            "mcp_provider": self.provider,
            "mcp_serverapp": self.serverapp,
        })
        
        # Trigger auto-enrollment if document_id is configured
        # Note: Auto-enrollment supports 3 modes:
        # 1. With existing kernel (runtime_id set)
        # 2. With new kernel (start_new_runtime=True)
        # 3. Without kernel - notebook-only mode (both False/None)
        if self.document_id:
            from tornado.ioloop import IOLoop
            from jupyter_mcp_server.enroll import auto_enroll_document
            from jupyter_mcp_server.server import notebook_manager, use_notebook_tool, server_context
            
            # Schedule auto-enrollment to run after Jupyter Server is fully started
            async def _run_auto_enrollment():
                try:
                    logger.info("Running auto-enrollment for document '%s'", self.document_id)
                    await auto_enroll_document(
                        config=config,
                        notebook_manager=notebook_manager,
                        use_notebook_tool=use_notebook_tool,
                        server_context=server_context,
                    )
                    logger.info("Auto-enrollment completed for document '%s'", self.document_id)
                except Exception as e:
                    logger.error("Failed to auto-enroll document: %s", e, exc_info=True)
            
            # Schedule the enrollment to run on the IOLoop after server starts
            # Use callback with delay to ensure server is fully initialized
            IOLoop.current().call_later(1.0, lambda: IOLoop.current().add_callback(_run_auto_enrollment))
        
        logger.info("Jupyter MCP Server Extension settings initialized")
    
    def initialize_handlers(self):
        """
        Register MCP protocol handlers.
        
        Strategy: Implement MCP protocol directly in Tornado handlers that
        call the MCP tools from server.py. This avoids the complexity of
        wrapping the Starlette ASGI app.
        
        Endpoints:
        - GET/POST /mcp - MCP protocol endpoint (SSE-based)
        - GET /mcp/healthz - Health check (Tornado handler)
        - GET /mcp/tools/list - List available tools (Tornado handler)
        - POST /mcp/tools/call - Execute a tool (Tornado handler)
        """
        base_url = self.serverapp.base_url
        
        # Import here to avoid circular imports
        from jupyter_mcp_server.jupyter_extension.handlers import MCPSSEHandler
        
        # Define handlers
        handlers = [
            # MCP protocol endpoint - SSE-based handler
            # Match /mcp with or without trailing slash
            (url_path_join(base_url, "mcp/?"), MCPSSEHandler),
            # Utility endpoints (optional, for debugging)
            (url_path_join(base_url, "mcp/healthz"), MCPHealthHandler),
            (url_path_join(base_url, "mcp/tools/list"), MCPToolsListHandler),
            (url_path_join(base_url, "mcp/tools/call"), MCPToolsCallHandler),
        ]
        
        # Register handlers
        self.handlers.extend(handlers)
        
        # Log registered endpoints using url_path_join for consistent formatting
        logger.info("Registered MCP handlers at %s", url_path_join(base_url, 'mcp/'))
        logger.info("  - MCP protocol: %s (SSE-based)", url_path_join(base_url, 'mcp'))
        logger.info("  - Health check: %s", url_path_join(base_url, 'mcp/healthz'))
        logger.info("  - List tools: %s", url_path_join(base_url, 'mcp/tools/list'))
        logger.info("  - Call tool: %s", url_path_join(base_url, 'mcp/tools/call'))
    
    def initialize_templates(self):
        """
        Initialize Jinja templates.
        
        Not needed for API-only extension, but included for completeness.
        """
        pass
    
    async def stop_extension(self):
        """
        Clean up when extension stops.
        
        Shutdown any managed kernels and cleanup resources.
        """
        logger.info("Stopping Jupyter MCP Server Extension")
        
        # Reset server context
        context = get_server_context()
        context.reset()
        
        logger.info("Jupyter MCP Server Extension stopped")


# Extension loading functions

def _jupyter_server_extension_points():
    """
    Declare the Jupyter Server extension.
    
    Returns:
        List of extension metadata dictionaries
    """
    extension_point = {
        "module": "jupyter_mcp_server.jupyter_extension.extension",
        "app": JupyterMCPServerExtensionApp,
    }
    return [extension_point]


def _load_jupyter_server_extension(serverapp):
    """
    Load the extension (for backward compatibility).
    
    Args:
        serverapp: Jupyter ServerApp instance
    """
    app = JupyterMCPServerExtensionApp()
    app.serverapp = serverapp
    # Run the standard ExtensionApp initialization phases in order.
    app.initialize_settings()
    app.initialize_handlers()
    app.initialize_templates()


# For classic Notebook server compatibility
load_jupyter_server_extension = _load_jupyter_server_extension

```

--------------------------------------------------------------------------------
/tests/conftest.py:
--------------------------------------------------------------------------------

```python
# Copyright (c) 2023-2024 Datalayer, Inc.
#
# BSD 3-Clause License

"""
Pytest configuration and shared fixtures for Jupyter MCP Server tests.

This module provides:
- jupyter_server fixture: Session-scoped Jupyter Lab server
- jupyter_server_with_extension fixture: Jupyter Lab with MCP extension
- jupyter_mcp_server fixture: Standalone MCP server instance
- mcp_client fixture: MCP protocol client for testing
- _start_server helper: Generic server startup with health checks
- JUPYTER_TOKEN: Authentication token for Jupyter API
"""

import logging
import os
import socket
import subprocess
import time
from http import HTTPStatus

import pytest
import pytest_asyncio
import requests
from requests.exceptions import ConnectionError


JUPYTER_TOKEN = "MY_TOKEN"

# Test mode configuration - set to False to skip testing specific modes
TEST_MCP_SERVER = os.environ.get("TEST_MCP_SERVER", "true").lower() == "true"
TEST_JUPYTER_SERVER = os.environ.get("TEST_JUPYTER_SERVER", "true").lower() == "true"


def _start_server(
    name: str, host: str, port: int, command: list, readiness_endpoint: str, max_retries: int = 5
):
    """A Helper that starts a web server as a python subprocess and wait until it's ready to accept connections

    This method can be used to start both Jupyter and Jupyter MCP servers
    
    Uses subprocess.DEVNULL to prevent pipe blocking issues with verbose output.

    Yields:
        str: Base URL of the started server (e.g. "http://localhost:8888").
            The generator yields once the readiness endpoint returns HTTP 200;
            on fixture teardown it resumes and shuts the subprocess down.
    """
    _log_prefix = name
    url = f"http://{host}:{port}"
    url_readiness = f"{url}{readiness_endpoint}"
    logging.info(f"{_log_prefix}: starting ...")
    logging.debug(f"{_log_prefix}: command: {' '.join(command)}")
    
    # Use DEVNULL to prevent any pipe blocking issues
    p_serv = subprocess.Popen(
        command, 
        stdout=subprocess.DEVNULL, 
        stderr=subprocess.DEVNULL
    )
    _log_prefix = f"{_log_prefix} [{p_serv.pid}]"
    
    ready = False
    while max_retries > 0:
        # Check if process died
        poll_result = p_serv.poll()
        if poll_result is not None:
            logging.error(f"{_log_prefix}: process died with exit code {poll_result}")
            pytest.fail(f"{name} failed to start (exit code {poll_result}). Check if port {port} is available.")
        
        response = None
        try:
            response = requests.get(url_readiness, timeout=10)
        except (ConnectionError, requests.exceptions.Timeout):
            pass
        
        if response is not None and response.status_code == HTTPStatus.OK:
            logging.info(f"{_log_prefix}: started ({url})!")
            ready = True
            yield url
            break
        
        # Bug fix: previously a reachable-but-not-ready server (non-200 status,
        # e.g. 404/503) neither slept nor decremented the counter, spinning in
        # a tight infinite loop. Now every unready poll backs off and counts.
        logging.debug(
            f"{_log_prefix}: waiting to accept connections [{max_retries}]"
        )
        time.sleep(2)
        max_retries -= 1
            
    if not ready:
        logging.error(f"{_log_prefix}: fail to start after retries. Check if port {port} is available.")
        # Bug fix: kill the still-running (but never-ready) subprocess before
        # failing so it does not leak past the test session.
        try:
            p_serv.kill()
            p_serv.wait(timeout=5)
        except Exception:
            pass
        pytest.fail(f"{name} failed to start after max retries. Port {port} may be in use or server crashed.")
    logging.debug(f"{_log_prefix}: stopping ...")
    try:
        p_serv.terminate()
        p_serv.wait(timeout=5)  # Reduced timeout for faster cleanup
        logging.info(f"{_log_prefix}: stopped")
    except subprocess.TimeoutExpired:
        logging.warning(f"{_log_prefix}: terminate timeout, forcing kill")
        p_serv.kill()
        try:
            p_serv.wait(timeout=2)
        except subprocess.TimeoutExpired:
            logging.error(f"{_log_prefix}: kill timeout, process may be stuck")
    except Exception as e:
        logging.error(f"{_log_prefix}: error during shutdown: {e}")


@pytest.fixture(scope="session")
def jupyter_server():
    """Start the Jupyter server and returns its URL
    
    This is a session-scoped fixture that starts a single Jupyter Lab instance
    for all tests. Both MCP_SERVER and JUPYTER_SERVER mode tests can share this.
    
    Only starts if at least one test mode is enabled.
    """
    if not (TEST_MCP_SERVER or TEST_JUPYTER_SERVER):
        pytest.skip("Both TEST_MCP_SERVER and TEST_JUPYTER_SERVER are disabled")
    
    host = "localhost"
    port = 8888
    # Build the launch command up front so the delegation below stays readable.
    launch_cmd = [
        "jupyter",
        "lab",
        "--port",
        str(port),
        "--IdentityProvider.token",
        JUPYTER_TOKEN,
        "--ip",
        host,
        "--ServerApp.root_dir",
        "./dev/content",
        "--no-browser",
    ]
    yield from _start_server(
        name="JupyterLab",
        host=host,
        port=port,
        command=launch_cmd,
        readiness_endpoint="/api",
        max_retries=10,
    )


@pytest.fixture(scope="session")
def jupyter_server_with_extension():
    """Start Jupyter server with MCP extension loaded (JUPYTER_SERVER mode)
    
    This fixture starts Jupyter Lab with the jupyter_mcp_server extension enabled,
    allowing tests to verify JUPYTER_SERVER mode functionality (YDoc, direct kernel access, etc).
    
    Only starts if TEST_JUPYTER_SERVER=True, otherwise skips.
    """
    if not TEST_JUPYTER_SERVER:
        pytest.skip("TEST_JUPYTER_SERVER is disabled")
    
    host = "localhost"
    port = 8889  # Different port to avoid conflicts
    launch_cmd = [
        "jupyter",
        "lab",
        "--port",
        str(port),
        "--IdentityProvider.token",
        JUPYTER_TOKEN,
        "--ip",
        host,
        "--ServerApp.root_dir",
        "./dev/content",
        "--no-browser",
        # Load the MCP extension
        "--ServerApp.jpserver_extensions",
        '{"jupyter_mcp_server": True}',
    ]
    yield from _start_server(
        name="JupyterLab+MCP",
        host=host,
        port=port,
        command=launch_cmd,
        readiness_endpoint="/api",
        max_retries=10,
    )


###############################################################################
# MCP Server Fixtures
###############################################################################

@pytest.fixture(scope="function")
def jupyter_mcp_server(request, jupyter_server):
    """Start the Jupyter MCP server and returns its URL
    
    This fixture starts a standalone MCP server that communicates with Jupyter
    via HTTP (MCP_SERVER mode). It can be parametrized to control runtime startup.
    
    Parameters:
        request.param (bool): Whether to start a new kernel runtime (default: True)
    """
    def _free_port() -> int:
        # Bind to port 0 so the OS picks an unused port for us.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.bind(('', 0))
            sock.listen(1)
            return sock.getsockname()[1]
    
    host = "localhost"
    port = _free_port()
    # Default to starting a fresh kernel unless the fixture was parametrized.
    start_new_runtime = getattr(request, "param", True)
    
    yield from _start_server(
        name="Jupyter MCP",
        host=host,
        port=port,
        command=[
            "python",
            "-m",
            "jupyter_mcp_server",
            "--transport",
            "streamable-http",
            "--document-url",
            jupyter_server,
            "--document-id",
            "notebook.ipynb",
            "--document-token",
            JUPYTER_TOKEN,
            "--runtime-url",
            jupyter_server,
            "--start-new-runtime",
            str(start_new_runtime),
            "--runtime-token",
            JUPYTER_TOKEN,
            "--port",
            str(port),
        ],
        readiness_endpoint="/api/healthz",
    )


def _get_test_params():
    """Generate test parameters based on TEST_MCP_SERVER and TEST_JUPYTER_SERVER flags
    
    Returns:
        list[str]: Enabled mode names ("mcp_server" and/or "jupyter_extension").
    """
    params = []
    if TEST_MCP_SERVER:
        params.append("mcp_server")
    if TEST_JUPYTER_SERVER:
        params.append("jupyter_extension")
    
    if not params:
        # Bug fix: this function runs at collection time (it is called in a
        # fixture's `params=` argument), where a plain pytest.skip() raises a
        # usage error. allow_module_level=True makes the skip legal here and
        # skips the whole module as intended.
        pytest.skip(
            "Both TEST_MCP_SERVER and TEST_JUPYTER_SERVER are disabled",
            allow_module_level=True,
        )
    
    return params


@pytest.fixture(scope="function", params=_get_test_params())
def mcp_server_url(request):
    """Parametrized fixture that provides both MCP_SERVER and JUPYTER_SERVER mode URLs
    
    This fixture enables testing the same functionality against both deployment modes:
    - mcp_server: Standalone MCP server (HTTP transport) - when TEST_MCP_SERVER=True
    - jupyter_extension: Jupyter extension mode (direct API access) - when TEST_JUPYTER_SERVER=True
    
    Both expose MCP protocol endpoints that can be tested with MCPClient.
    
    You can control which modes to test via environment variables:
        TEST_MCP_SERVER=true/false (default: true)
        TEST_JUPYTER_SERVER=true/false (default: true)
    
    Parameters:
        request.param (str): Either "mcp_server" or "jupyter_extension"
    
    Returns:
        str: URL of the MCP endpoint for the selected mode
    """
    if request.param != "mcp_server":  # jupyter_extension
        # Get jupyter_server_with_extension fixture dynamically.
        # Use the extension's MCP endpoints (note: no /mcp suffix, the extension handles routing)
        yield request.getfixturevalue("jupyter_server_with_extension")
        return
    
    # Get jupyter_server fixture dynamically
    jupyter_url = request.getfixturevalue("jupyter_server")
    
    def _free_port() -> int:
        # Bind to port 0 so the OS assigns an unused port.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.bind(('', 0))
            sock.listen(1)
            return sock.getsockname()[1]
    
    host = "localhost"
    port = _free_port()
    
    # Start standalone MCP server
    yield from _start_server(
        name="Jupyter MCP",
        host=host,
        port=port,
        command=[
            "python",
            "-m",
            "jupyter_mcp_server",
            "--transport",
            "streamable-http",
            "--document-url",
            jupyter_url,
            "--document-id",
            "notebook.ipynb",
            "--document-token",
            JUPYTER_TOKEN,
            "--runtime-url",
            jupyter_url,
            "--start-new-runtime",
            "True",
            "--runtime-token",
            JUPYTER_TOKEN,
            "--port",
            str(port),
        ],
        readiness_endpoint="/api/healthz",
    )


###############################################################################


@pytest_asyncio.fixture(scope="function")
async def mcp_client(jupyter_mcp_server):
    """An MCP client that can connect to the Jupyter MCP server
    
    This fixture provides an MCPClient instance configured to connect to
    the standalone MCP server. It requires the test_common module.
    
    Returns:
        MCPClient: Configured client for MCP protocol communication
    """
    from .test_common import MCPClient
    
    client = MCPClient(jupyter_mcp_server)
    return client


@pytest.fixture(scope="function")
def mcp_client_parametrized(mcp_server_url):
    """MCP client that works with both server modes via parametrization
    
    This fixture creates an MCPClient that can connect to either:
    - Standalone MCP server (MCP_SERVER mode)
    - Jupyter extension MCP endpoints (JUPYTER_SERVER mode)
    
    Returns:
        MCPClient: Configured client for the parametrized server mode
    """
    from .test_common import MCPClient
    
    client = MCPClient(mcp_server_url)
    return client

```

--------------------------------------------------------------------------------
/jupyter_mcp_server/notebook_manager.py:
--------------------------------------------------------------------------------

```python
# Copyright (c) 2023-2024 Datalayer, Inc.
#
# BSD 3-Clause License

"""
Unified Notebook and Kernel Management Module

This module provides centralized management for Jupyter notebooks and kernels,
replacing the scattered global variable approach with a unified architecture.
"""

from typing import Dict, Any, Optional, Callable, Union
from types import TracebackType

from jupyter_nbmodel_client import NbModelClient, get_notebook_websocket_url
from jupyter_kernel_client import KernelClient

from .config import get_config


class NotebookConnection:
    """
    Async context manager owning the lifecycle of an NbModelClient connection.
    
    Note: This is only used in MCP_SERVER mode with remote Jupyter servers that have RTC enabled.
    In JUPYTER_SERVER mode (local), notebook content is accessed directly via contents_manager.
    """
    
    def __init__(self, notebook_info: Dict[str, str], is_local: bool = False):
        self.notebook_info = notebook_info
        self.is_local = is_local
        self._notebook: Optional[NbModelClient] = None
    
    async def __aenter__(self) -> NbModelClient:
        """Open the websocket-backed notebook client and return it."""
        # Local mode never goes through a websocket client - fail loudly.
        if self.is_local:
            raise ValueError(
                "NotebookConnection cannot be used in local/JUPYTER_SERVER mode. "
                "Cell operations in local mode should use contents_manager directly to read notebook JSON files."
            )
        
        cfg = get_config()
        info = self.notebook_info
        # Missing per-notebook settings fall back to the global configuration.
        websocket_url = get_notebook_websocket_url(
            server_url=info.get("server_url", cfg.document_url),
            token=info.get("token", cfg.document_token),
            path=info.get("path", cfg.document_id),
            provider=cfg.provider
        )
        client = NbModelClient(websocket_url)
        self._notebook = client
        await client.__aenter__()
        return client
    
    async def __aexit__(
        self, 
        exc_type: Optional[type], 
        exc_val: Optional[BaseException], 
        exc_tb: Optional[TracebackType]
    ) -> None:
        """Close the underlying client, if one was ever opened."""
        client = self._notebook
        if client:
            await client.__aexit__(exc_type, exc_val, exc_tb)


class NotebookManager:
    """
    Centralized manager for multiple notebooks and their corresponding kernels.
    
    This class replaces the global kernel variable approach with a unified
    management system that supports both single and multiple notebook scenarios.
    """
    
    def __init__(self):
        # name -> {"kernel": ..., "is_local": bool, "notebook_info": {...}}
        self._notebooks: Dict[str, Dict[str, Any]] = {}
        self._default_notebook_name = "default"
        self._current_notebook: Optional[str] = None  # Currently active notebook
    
    def __contains__(self, name: str) -> bool:
        """Check if a notebook is managed by this instance."""
        return name in self._notebooks
    
    def __iter__(self):
        """Iterate over notebook name, info pairs."""
        return iter(self._notebooks.items())
    
    def add_notebook(
        self, 
        name: str, 
        kernel: Union[KernelClient, Dict[str, Any]],  # Can be KernelClient or dict with kernel metadata
        server_url: Optional[str] = None,
        token: Optional[str] = None,
        path: Optional[str] = None
    ) -> None:
        """
        Add a notebook to the manager.
        
        Args:
            name: Unique identifier for the notebook
            kernel: Kernel client instance (MCP_SERVER mode) or kernel metadata dict (JUPYTER_SERVER mode)
            server_url: Jupyter server URL (optional, uses config default). Use "local" for JUPYTER_SERVER mode.
            token: Authentication token (optional, uses config default)
            path: Notebook file path (optional, uses config default)
        """
        config = get_config()
        
        # Determine if this is local (JUPYTER_SERVER) mode or HTTP (MCP_SERVER) mode
        is_local_mode = server_url == "local"
        
        # NOTE(review): re-adding an existing name silently overwrites its entry,
        # and omitted args fall back to global config defaults — confirm callers
        # rely on both behaviors.
        self._notebooks[name] = {
            "kernel": kernel,
            "is_local": is_local_mode,
            "notebook_info": {
                "server_url": server_url or config.document_url,
                "token": token or config.document_token,
                "path": path or config.document_id
            }
        }
        
        # For backward compatibility: if this is the first notebook or it's "default",
        # set it as the current notebook
        if self._current_notebook is None or name == self._default_notebook_name:
            self._current_notebook = name
    
    def remove_notebook(self, name: str) -> bool:
        """
        Remove a notebook from the manager.
        
        Args:
            name: Notebook identifier
            
        Returns:
            True if removed successfully, False if not found
        """
        if name in self._notebooks:
            try:
                notebook_data = self._notebooks[name]
                is_local = notebook_data.get("is_local", False)
                kernel = notebook_data["kernel"]
                
                # Only stop kernel if it's an HTTP KernelClient (MCP_SERVER mode)
                # In JUPYTER_SERVER mode, kernel is just metadata, actual kernel managed elsewhere
                if not is_local and kernel and hasattr(kernel, 'stop'):
                    kernel.stop()
            except Exception:
                # Ignore errors during kernel cleanup
                pass
            finally:
                # The entry is deleted even if kernel.stop() raised above.
                del self._notebooks[name]
                
                # If we removed the current notebook, update the current pointer
                if self._current_notebook == name:
                    # Set to another notebook if available, prefer "default" for compatibility
                    if self._default_notebook_name in self._notebooks:
                        self._current_notebook = self._default_notebook_name
                    elif self._notebooks:
                        # Set to the first available notebook
                        self._current_notebook = next(iter(self._notebooks.keys()))
                    else:
                        # No notebooks left
                        self._current_notebook = None
            return True
        return False
    
    def get_kernel(self, name: str) -> Optional[Union[KernelClient, Dict[str, Any]]]:
        """
        Get the kernel for a specific notebook.
        
        Args:
            name: Notebook identifier
            
        Returns:
            Kernel client (MCP_SERVER mode) or kernel metadata dict (JUPYTER_SERVER mode), or None if not found
        """
        if name in self._notebooks:
            return self._notebooks[name]["kernel"]
        return None
    
    def get_kernel_id(self, name: str) -> Optional[str]:
        """
        Get the kernel ID for a specific notebook.
        
        Args:
            name: Notebook identifier
            
        Returns:
            Kernel ID string or None if not found
        """
        if name in self._notebooks:
            kernel = self._notebooks[name]["kernel"]
            # Handle both KernelClient objects and kernel metadata dicts
            if isinstance(kernel, dict):
                return kernel.get("id")
            elif hasattr(kernel, 'kernel_id'):
                return kernel.kernel_id
        return None
    
    def is_local_notebook(self, name: str) -> bool:
        """
        Check if a notebook is using local (JUPYTER_SERVER) mode.
        
        Args:
            name: Notebook identifier
            
        Returns:
            True if local mode, False otherwise
        """
        if name in self._notebooks:
            return self._notebooks[name].get("is_local", False)
        return False
    
    def get_notebook_connection(self, name: str) -> NotebookConnection:
        """
        Get a context manager for notebook connection.
        
        Args:
            name: Notebook identifier
            
        Returns:
            NotebookConnection context manager
            
        Raises:
            ValueError: If notebook doesn't exist
        """
        if name not in self._notebooks:
            raise ValueError(f"Notebook '{name}' does not exist in manager")
        
        return NotebookConnection(self._notebooks[name]["notebook_info"])
    
    def restart_notebook(self, name: str) -> bool:
        """
        Restart the kernel for a specific notebook.
        
        Args:
            name: Notebook identifier
            
        Returns:
            True if restarted successfully, False otherwise
        """
        if name in self._notebooks:
            try:
                kernel = self._notebooks[name]["kernel"]
                # NOTE(review): returns True even when the kernel has no
                # 'restart' attribute (e.g. metadata dict) — confirm intended.
                if kernel and hasattr(kernel, 'restart'):
                    kernel.restart()
                return True
            except Exception:
                return False
        return False
    
    def is_empty(self) -> bool:
        """Check if the manager is empty (no notebooks)."""
        return len(self._notebooks) == 0
    
    def ensure_kernel_alive(self, name: str, kernel_factory: Callable[[], KernelClient]) -> KernelClient:
        """
        Ensure a kernel is alive, create if necessary.
        
        Args:
            name: Notebook identifier
            kernel_factory: Function to create a new kernel
            
        Returns:
            The alive kernel instance
        """
        kernel = self.get_kernel(name)
        # NOTE(review): a metadata dict kernel (JUPYTER_SERVER mode) has no
        # 'is_alive' attribute and is therefore replaced by a fresh KernelClient;
        # add_notebook also resets notebook_info to config defaults here —
        # verify both behaviors are intended for local-mode notebooks.
        if kernel is None or not hasattr(kernel, 'is_alive') or not kernel.is_alive():
            # Create new kernel
            new_kernel = kernel_factory()
            self.add_notebook(name, new_kernel)
            return new_kernel
        return kernel
    
    def set_current_notebook(self, name: str) -> bool:
        """
        Set the currently active notebook.
        
        Args:
            name: Notebook identifier
            
        Returns:
            True if set successfully, False if notebook doesn't exist
        """
        if name in self._notebooks:
            self._current_notebook = name
            return True
        return False
    
    def get_current_notebook(self) -> Optional[str]:
        """
        Get the name of the currently active notebook.
        
        Returns:
            Current notebook name or None if no active notebook
        """
        return self._current_notebook
    
    def get_current_connection(self) -> NotebookConnection:
        """
        Get the connection for the currently active notebook.
        For backward compatibility, defaults to "default" if no current notebook is set.
        
        Returns:
            NotebookConnection context manager for the current notebook
            
        Raises:
            ValueError: If no notebooks exist and no default config is available
        """
        current = self._current_notebook or self._default_notebook_name
        
        # For backward compatibility: if the requested notebook doesn't exist but we're 
        # asking for default, create a connection using the default config
        if current not in self._notebooks and current == self._default_notebook_name:
            # Return a connection using default configuration
            config = get_config()
            return NotebookConnection({
                "server_url": config.document_url,
                "token": config.document_token,
                "path": config.document_id
            })
        
        return self.get_notebook_connection(current)
    
    def get_current_notebook_path(self) -> Optional[str]:
        """
        Get the file path of the currently active notebook.
        
        Returns:
            Notebook file path or None if no active notebook
        """
        current = self._current_notebook or self._default_notebook_name
        if current in self._notebooks:
            return self._notebooks[current]["notebook_info"].get("path")
        return None
    
    def list_all_notebooks(self) -> Dict[str, Dict[str, Any]]:
        """
        Get information about all managed notebooks.
        
        Returns:
            Dictionary with notebook names as keys and their info as values
        """
        result = {}
        for name, notebook_data in self._notebooks.items():
            kernel = notebook_data["kernel"]
            notebook_info = notebook_data["notebook_info"]
            
            # Check kernel status
            # NOTE(review): metadata-dict kernels lack 'is_alive' and thus report
            # "dead" here even when the underlying server kernel is running.
            kernel_status = "unknown"
            if kernel:
                try:
                    kernel_status = "alive" if hasattr(kernel, 'is_alive') and kernel.is_alive() else "dead"
                except Exception:
                    kernel_status = "error"
            else:
                kernel_status = "not_initialized"
            
            result[name] = {
                "path": notebook_info.get("path", ""),
                "kernel_status": kernel_status,
                "is_current": name == self._current_notebook
            }
        
        return result

```

--------------------------------------------------------------------------------
/jupyter_mcp_server/tools/use_notebook_tool.py:
--------------------------------------------------------------------------------

```python
# Copyright (c) 2023-2024 Datalayer, Inc.
#
# BSD 3-Clause License

"""Use notebook tool implementation."""

import logging
from typing import Any, Optional, Literal
from pathlib import Path
from jupyter_server_api import JupyterServerClient, NotFoundError
from jupyter_kernel_client import KernelClient
from jupyter_mcp_server.tools._base import BaseTool, ServerMode
from jupyter_mcp_server.notebook_manager import NotebookManager

logger = logging.getLogger(__name__)


class UseNotebookTool(BaseTool):
    """Tool to use (connect to or create) a notebook file.

    Works in two server modes:
    - MCP_SERVER: talks to a remote Jupyter server over HTTP and manages
      kernels through a WebSocket-based ``KernelClient``.
    - JUPYTER_SERVER: uses the hosting server's local managers
      (contents/kernel/session) directly.
    """
    
    @property
    def name(self) -> str:
        return "use_notebook"
    
    @property
    def description(self) -> str:
        return """Use a notebook file (connect to existing, create new, or switch to already-connected notebook).
    
Args:
    notebook_name: Unique identifier for the notebook
    notebook_path: Path to the notebook file, relative to the Jupyter server root (e.g. "notebook.ipynb").
                  Optional - if not provided, switches to an already-connected notebook with the given name.
    mode: "connect" to connect to existing, "create" to create new
    kernel_id: Specific kernel ID to use (optional, will create new if not provided)
    
Returns:
    str: Success message with notebook information"""
    
    async def _check_path_http(
        self, 
        server_client: JupyterServerClient, 
        notebook_path: str, 
        mode: str
    ) -> tuple[bool, Optional[str]]:
        """Check if path exists using HTTP API.

        Args:
            server_client: HTTP client for the remote Jupyter server.
            notebook_path: Notebook path relative to the server root.
            mode: "connect" (target file must exist) or "create" (only the
                parent directory must exist).

        Returns:
            (ok, error_message): ``ok`` is False when validation fails and
            ``error_message`` explains why.
        """
        path = Path(notebook_path)
        try:
            # Listing the parent directory validates the directory itself and,
            # in "connect" mode, lets us verify the target file is present.
            parent_path = path.parent.as_posix() if path.parent.as_posix() != "." else ""
            
            if parent_path:
                dir_contents = server_client.contents.list_directory(parent_path)
            else:
                dir_contents = server_client.contents.list_directory("")
                
            if mode == "connect":
                file_exists = any(file.name == path.name for file in dir_contents)
                if not file_exists:
                    return False, f"'{notebook_path}' not found in jupyter server, please check the notebook already exists."
            
            return True, None
        except NotFoundError:
            parent_dir = path.parent.as_posix() if path.parent.as_posix() != "." else "root directory"
            return False, f"'{parent_dir}' not found in jupyter server, please check the directory path already exists."
        except Exception as e:
            return False, f"Failed to check the path '{notebook_path}': {e}"
    
    async def _check_path_local(
        self,
        contents_manager: Any,
        notebook_path: str,
        mode: str
    ) -> tuple[bool, Optional[str]]:
        """Check if path exists using local contents_manager API.

        Mirrors ``_check_path_http`` but goes through the in-process
        contents manager instead of HTTP.
        """
        path = Path(notebook_path)
        try:
            parent_path = str(path.parent) if str(path.parent) != "." else ""
            
            # Get directory contents using local API
            model = await contents_manager.get(parent_path, content=True, type='directory')
            
            if mode == "connect":
                file_exists = any(item['name'] == path.name for item in model.get('content', []))
                if not file_exists:
                    return False, f"'{notebook_path}' not found in jupyter server, please check the notebook already exists."
            
            return True, None
        except Exception as e:
            parent_dir = str(path.parent) if str(path.parent) != "." else "root directory"
            return False, f"'{parent_dir}' not found in jupyter server: {e}"
    
    async def execute(
        self,
        mode: ServerMode,
        server_client: Optional[JupyterServerClient] = None,
        kernel_client: Optional[Any] = None,
        contents_manager: Optional[Any] = None,
        kernel_manager: Optional[Any] = None,
        kernel_spec_manager: Optional[Any] = None,
        session_manager: Optional[Any] = None,
        notebook_manager: Optional[NotebookManager] = None,
        # Tool-specific parameters
        notebook_name: Optional[str] = None,
        notebook_path: Optional[str] = None,
        use_mode: Literal["connect", "create"] = "connect",
        kernel_id: Optional[str] = None,
        runtime_url: Optional[str] = None,
        runtime_token: Optional[str] = None,
        **kwargs
    ) -> str:
        """Execute the use_notebook tool.
        
        Args:
            mode: Server mode (MCP_SERVER or JUPYTER_SERVER)
            server_client: HTTP client for MCP_SERVER mode
            contents_manager: Direct API access for JUPYTER_SERVER mode
            kernel_manager: Direct kernel manager for JUPYTER_SERVER mode
            session_manager: Session manager for creating kernel-notebook associations
            notebook_manager: Notebook manager instance
            notebook_name: Unique identifier for the notebook
            notebook_path: Path to the notebook file (optional, if not provided switches to existing notebook)
            use_mode: "connect" or "create"
            kernel_id: Optional specific kernel ID
            runtime_url: Runtime URL for HTTP mode
            runtime_token: Runtime token for HTTP mode
            **kwargs: Additional parameters
            
        Returns:
            Success message with notebook information
        """
        # If no notebook_path provided, switch to already-connected notebook
        if notebook_path is None:
            if notebook_name not in notebook_manager:
                return f"Notebook '{notebook_name}' is not connected. Please provide a notebook_path to connect to it first."
            
            # Switch to the existing notebook
            notebook_manager.set_current_notebook(notebook_name)
            return f"Successfully switched to notebook '{notebook_name}'."
        
        # Rest of the logic for connecting/creating new notebooks
        if notebook_name in notebook_manager:
            return f"Notebook '{notebook_name}' is already using. Use unuse_notebook first if you want to reconnect."
        
        # Check server connectivity (HTTP mode only)
        if mode == ServerMode.MCP_SERVER and server_client is not None:
            try:
                server_client.get_status()
            except Exception as e:
                return f"Failed to connect the Jupyter server: {e}"
        
        # Check the path exists
        if mode == ServerMode.JUPYTER_SERVER and contents_manager is not None:
            path_ok, error_msg = await self._check_path_local(contents_manager, notebook_path, use_mode)
        elif mode == ServerMode.MCP_SERVER and server_client is not None:
            path_ok, error_msg = await self._check_path_http(server_client, notebook_path, use_mode)
        else:
            return f"Invalid mode or missing required clients: mode={mode}"
        
        if not path_ok:
            return error_msg
        
        # Check kernel if kernel_id provided (HTTP mode only for now)
        if kernel_id and mode == ServerMode.MCP_SERVER and server_client is not None:
            kernels = server_client.kernels.list_kernels()
            kernel_exists = any(kernel.id == kernel_id for kernel in kernels)
            if not kernel_exists:
                return f"Kernel '{kernel_id}' not found in jupyter server, please check the kernel already exists."
        
        # Create notebook if needed
        if use_mode == "create":
            content = {
                "cells": [{
                    "cell_type": "markdown",
                    "metadata": {},
                    "source": [
                        "New Notebook Created by Jupyter MCP Server",
                    ]
                }],
                "metadata": {},
                "nbformat": 4,
                "nbformat_minor": 4
            }
            if mode == ServerMode.JUPYTER_SERVER and contents_manager is not None:
                # Use local API to create notebook.
                # NOTE(review): `content` above is NOT passed here, so a
                # locally created notebook starts empty instead of containing
                # the banner cell used in MCP_SERVER mode — confirm whether
                # this asymmetry is intentional.
                await contents_manager.new(model={'type': 'notebook'}, path=notebook_path)
            elif mode == ServerMode.MCP_SERVER and server_client is not None:
                server_client.contents.create_notebook(notebook_path, content=content)
        
        # Create/connect to kernel based on mode
        if mode == ServerMode.JUPYTER_SERVER and kernel_manager is not None:
            # JUPYTER_SERVER mode: Use local kernel manager API directly
            if kernel_id:
                # Connect to existing kernel - verify it exists
                if kernel_id not in kernel_manager:
                    return f"Kernel '{kernel_id}' not found in local kernel manager."
                kernel_info = {"id": kernel_id}
            else:
                # Start a new kernel using local API
                kernel_id = await kernel_manager.start_kernel()
                logger.info(f"Started kernel '{kernel_id}', waiting for it to be ready...")
                
                # CRITICAL: Wait for the kernel to actually start and be ready
                # The start_kernel() call returns immediately, but kernel takes time to start
                import asyncio
                max_wait_time = 30  # seconds
                wait_interval = 0.5  # seconds
                elapsed = 0
                kernel_ready = False
                
                while elapsed < max_wait_time:
                    try:
                        # Get kernel model to check its state
                        kernel_model = kernel_manager.get_kernel(kernel_id)
                        if kernel_model is not None:
                            # Kernel exists, check if it's ready
                            # In Jupyter, we can try to get connection info which indicates readiness
                            try:
                                kernel_manager.get_connection_info(kernel_id)
                                kernel_ready = True
                                logger.info(f"Kernel '{kernel_id}' is ready (took {elapsed:.1f}s)")
                                break
                            except Exception:
                                # Connection info not available yet, kernel still starting.
                                # Deliberately narrow: a bare `except:` here would also
                                # swallow KeyboardInterrupt, SystemExit, and asyncio
                                # task cancellation.
                                pass
                    except Exception as e:
                        logger.debug(f"Waiting for kernel to start: {e}")
                    
                    await asyncio.sleep(wait_interval)
                    elapsed += wait_interval
                
                if not kernel_ready:
                    logger.warning(f"Kernel '{kernel_id}' may not be fully ready after {max_wait_time}s wait")
                
                kernel_info = {"id": kernel_id}
            
            # Create a Jupyter session to associate the kernel with the notebook
            # This is CRITICAL for JupyterLab to recognize the kernel-notebook connection
            if session_manager is not None:
                try:
                    # create_session is an async method, so we await it directly
                    session_dict = await session_manager.create_session(
                        path=notebook_path,
                        kernel_id=kernel_id,
                        type="notebook",
                        name=notebook_path
                    )
                    logger.info(f"Created Jupyter session '{session_dict.get('id')}' for notebook '{notebook_path}' with kernel '{kernel_id}'")
                except Exception as e:
                    logger.warning(f"Failed to create Jupyter session: {e}. Notebook may not be properly connected in JupyterLab UI.")
            else:
                logger.warning("No session_manager available. Notebook may not be properly connected in JupyterLab UI.")
            
            # For JUPYTER_SERVER mode, store kernel info (not KernelClient object)
            # The actual kernel is managed by kernel_manager
            notebook_manager.add_notebook(
                notebook_name,
                kernel_info,  # Store kernel metadata, not client object
                server_url="local",  # Indicate local mode
                token=None,
                path=notebook_path
            )
        elif mode == ServerMode.MCP_SERVER and runtime_url:
            # MCP_SERVER mode: Use HTTP-based kernel client
            kernel = KernelClient(
                server_url=runtime_url,
                token=runtime_token,
                kernel_id=kernel_id
            )
            kernel.start()
            
            # Add notebook to manager with HTTP client
            notebook_manager.add_notebook(
                notebook_name,
                kernel,
                server_url=runtime_url,
                token=runtime_token,
                path=notebook_path
            )
        else:
            return f"Invalid configuration: mode={mode}, runtime_url={runtime_url}, kernel_manager={kernel_manager is not None}"
        
        notebook_manager.set_current_notebook(notebook_name)
        
        # Return message based on mode
        if use_mode == "create":
            return f"Successfully created and using notebook '{notebook_name}' at path '{notebook_path}' in {mode.value} mode."
        else:
            return f"Successfully using notebook '{notebook_name}' at path '{notebook_path}' in {mode.value} mode."

```

--------------------------------------------------------------------------------
/jupyter_mcp_server/tools/insert_execute_code_cell_tool.py:
--------------------------------------------------------------------------------

```python
# Copyright (c) 2023-2024 Datalayer, Inc.
#
# BSD 3-Clause License

"""Insert and execute code cell tool implementation."""

import asyncio
import logging
from pathlib import Path
from typing import Any, Optional, List, Union
from jupyter_server_api import JupyterServerClient
from jupyter_mcp_server.tools._base import BaseTool, ServerMode
from jupyter_mcp_server.notebook_manager import NotebookManager
from jupyter_mcp_server.utils import get_current_notebook_context, safe_extract_outputs, execute_via_execution_stack
from mcp.types import ImageContent

logger = logging.getLogger(__name__)


class InsertExecuteCodeCellTool(BaseTool):
    """Tool to insert and execute a code cell.

    In JUPYTER_SERVER mode it prefers the collaborative YDoc document (so
    edits appear live in JupyterLab) and falls back to direct file edits;
    in MCP_SERVER mode it operates over a WebSocket notebook connection.
    """
    
    @property
    def name(self) -> str:
        return "insert_execute_code_cell"
    
    @property
    def description(self) -> str:
        return """Insert and execute a code cell in a Jupyter notebook.

Args:
    cell_index: Index of the cell to insert (0-based). Use -1 to append at end and execute.
    cell_source: Code source

Returns:
    list[Union[str, ImageContent]]: List of outputs from the executed cell"""
    
    async def _get_jupyter_ydoc(self, serverapp: Any, file_id: str):
        """Get the YNotebook document if it's currently open in a collaborative session.

        Returns None (best effort) when the collaboration room manager is
        missing or the notebook has no open room.
        """
        try:
            yroom_manager = serverapp.web_app.settings.get("yroom_manager")
            if yroom_manager is None:
                return None
                
            room_id = f"json:notebook:{file_id}"
            
            if yroom_manager.has_room(room_id):
                yroom = yroom_manager.get_room(room_id)
                notebook = await yroom.get_jupyter_ydoc()
                return notebook
        except Exception:
            # Best effort: any failure means "no collaborative doc available".
            pass
        
        return None
    
    async def _insert_execute_ydoc(
        self,
        serverapp: Any,
        notebook_path: str,
        cell_index: int,
        cell_source: str,
        kernel_id: str,
    ) -> List[Union[str, ImageContent]]:
        """Insert and execute cell using YDoc (collaborative editing mode).

        Args:
            serverapp: Jupyter ServerApp instance.
            notebook_path: Absolute path to the notebook file.
            cell_index: 0-based insertion index, or -1 to append.
            cell_source: Code source for the new cell.
            kernel_id: ID of the kernel to execute against.

        Returns:
            Extracted outputs of the executed cell.

        Raises:
            RuntimeError: If the file_id_manager is unavailable.
            ValueError: If cell_index is out of range.
        """
        # Get file_id from file_id_manager
        file_id_manager = serverapp.web_app.settings.get("file_id_manager")
        if file_id_manager is None:
            raise RuntimeError("file_id_manager not available in serverapp")
        
        file_id = file_id_manager.get_id(notebook_path)
        
        # Try to get YDoc
        ydoc = await self._get_jupyter_ydoc(serverapp, file_id)
        
        if ydoc:
            # Notebook is open in collaborative mode, use YDoc
            total_cells = len(ydoc.ycells)
            actual_index = cell_index if cell_index != -1 else total_cells
            
            if actual_index < 0 or actual_index > total_cells:
                raise ValueError(
                    f"Cell index {cell_index} is out of range. Notebook has {total_cells} cells. Use -1 to append at end."
                )
            
            # Create and insert the cell
            cell = {
                "cell_type": "code",
                "source": cell_source,
            }
            ycell = ydoc.create_ycell(cell)
            
            if actual_index >= total_cells:
                ydoc.ycells.append(ycell)
            else:
                ydoc.ycells.insert(actual_index, ycell)
            
            # Get the inserted cell's ID for RTC metadata
            inserted_cell_id = ycell.get("id")
            
            # Build document_id for RTC (format: json:notebook:<file_id>)
            document_id = f"json:notebook:{file_id}"
            
            # Execute the cell using ExecutionStack with RTC metadata
            # This will automatically update the cell outputs in the YDoc
            return await execute_via_execution_stack(
                serverapp, kernel_id, cell_source, 
                document_id=document_id, 
                cell_id=inserted_cell_id,
                timeout=300,
                logger=logger
            )
        else:
            # YDoc not available - use file operations + direct kernel execution
            # This path is used when notebook is not open in JupyterLab but we still have kernel access
            logger.info("YDoc not available, using file operations + ExecutionStack execution fallback")
            
            # Insert cell using file operations
            from jupyter_mcp_server.tools.insert_cell_tool import InsertCellTool
            insert_tool = InsertCellTool()
            
            # Call the file-based insertion method directly
            await insert_tool._insert_cell_file(notebook_path, cell_index, "code", cell_source)
            
            # Calculate actual index where cell was inserted
            import nbformat
            with open(notebook_path, 'r', encoding='utf-8') as f:
                notebook = nbformat.read(f, as_version=4)
            total_cells = len(notebook.cells)
            actual_index = cell_index if cell_index != -1 else total_cells - 1
            
            # Then execute directly via ExecutionStack (without RTC metadata since notebook not open)
            outputs = await execute_via_execution_stack(
                serverapp, kernel_id, cell_source, timeout=300, logger=logger
            )
            
            # CRITICAL: Write outputs back to the notebook file so they're visible in UI
            logger.info(f"Writing {len(outputs)} outputs back to notebook cell {actual_index}")
            await self._write_outputs_to_cell(notebook_path, actual_index, outputs)
            
            return outputs
    
    async def _insert_execute_websocket(
        self,
        notebook_manager: NotebookManager,
        cell_index: int,
        cell_source: str,
        ensure_kernel_alive
    ) -> List[Union[str, ImageContent]]:
        """Insert and execute cell using WebSocket connection (MCP_SERVER mode).

        Args:
            notebook_manager: Manager holding the active notebook connection.
            cell_index: 0-based insertion index, or -1 to append.
            cell_source: Code source for the new cell.
            ensure_kernel_alive: Callable returning a live kernel (may be None).

        Raises:
            RuntimeError: If no kernel is available.
            ValueError: If cell_index is out of range.
        """
        # Ensure kernel is alive
        if ensure_kernel_alive:
            kernel = ensure_kernel_alive()
        else:
            # Fallback: get kernel from notebook_manager
            current_notebook = notebook_manager.get_current_notebook() or "default"
            kernel = notebook_manager.get_kernel(current_notebook)
            if not kernel:
                raise RuntimeError("No kernel available for execution")
        
        async with notebook_manager.get_current_connection() as notebook:
            actual_index = cell_index if cell_index != -1 else len(notebook)
            
            if actual_index < 0 or actual_index > len(notebook):
                raise ValueError(f"Cell index {cell_index} out of range")
            
            notebook.insert_cell(actual_index, cell_source, "code")
            notebook.execute_cell(actual_index, kernel)

            outputs = notebook[actual_index].get("outputs", [])
            return safe_extract_outputs(outputs)
    
    async def _write_outputs_to_cell(
        self,
        notebook_path: str,
        cell_index: int,
        outputs: List[Union[str, ImageContent]]
    ):
        """Write execution outputs back to a notebook cell.
        
        This is critical for making outputs visible in JupyterLab when using
        file-based execution (when YDoc/RTC is not available).
        
        Args:
            notebook_path: Path to the notebook file
            cell_index: Index of the cell to update
            outputs: List of output strings or ImageContent objects
        """
        import nbformat
        from jupyter_mcp_server.utils import _clean_notebook_outputs
        
        # Read the notebook
        with open(notebook_path, 'r', encoding='utf-8') as f:
            notebook = nbformat.read(f, as_version=4)
        
        # Clean any transient fields
        _clean_notebook_outputs(notebook)
        
        if cell_index < 0 or cell_index >= len(notebook.cells):
            logger.warning(f"Cell index {cell_index} out of range, cannot write outputs")
            return
        
        cell = notebook.cells[cell_index]
        if cell.cell_type != 'code':
            logger.warning(f"Cell {cell_index} is not a code cell, cannot write outputs")
            return
        
        # Convert formatted outputs to nbformat structure
        cell.outputs = []
        for output in outputs:
            if isinstance(output, ImageContent):
                # Image output
                cell.outputs.append(nbformat.v4.new_output(
                    output_type='display_data',
                    data={output.mimeType: output.data},
                    metadata={}
                ))
            elif isinstance(output, str):
                # Text output - determine if it's an error or regular output
                if output.startswith('[ERROR:') or output.startswith('[TIMEOUT ERROR:'):
                    # Error output
                    cell.outputs.append(nbformat.v4.new_output(
                        output_type='stream',
                        name='stderr',
                        text=output
                    ))
                else:
                    # Regular output (assume execute_result for simplicity)
                    cell.outputs.append(nbformat.v4.new_output(
                        output_type='execute_result',
                        data={'text/plain': output},
                        metadata={},
                        execution_count=None
                    ))
        
        # Update execution count
        max_count = 0
        for c in notebook.cells:
            if c.cell_type == 'code' and c.execution_count:
                max_count = max(max_count, c.execution_count)
        cell.execution_count = max_count + 1
        
        # Write back to file
        with open(notebook_path, 'w', encoding='utf-8') as f:
            nbformat.write(notebook, f)
        
        logger.info(f"Wrote {len(outputs)} outputs to cell {cell_index} in {notebook_path}")
    
    async def execute(
        self,
        mode: ServerMode,
        server_client: Optional[JupyterServerClient] = None,
        kernel_client: Optional[Any] = None,
        contents_manager: Optional[Any] = None,
        kernel_manager: Optional[Any] = None,
        kernel_spec_manager: Optional[Any] = None,
        notebook_manager: Optional[NotebookManager] = None,
        # Tool-specific parameters
        cell_index: int = None,
        cell_source: str = None,
        # Helper function passed from server.py
        ensure_kernel_alive = None,
        **kwargs
    ) -> List[Union[str, ImageContent]]:
        """Execute the insert_execute_code_cell tool.
        
        Args:
            mode: Server mode (MCP_SERVER or JUPYTER_SERVER)
            kernel_manager: Kernel manager for JUPYTER_SERVER mode
            notebook_manager: Notebook manager instance
            cell_index: Index to insert cell (0-based, -1 to append)
            cell_source: Code source
            ensure_kernel_alive: Function to ensure kernel is alive
            **kwargs: Additional parameters
            
        Returns:
            List of outputs from the executed cell
        """
        if mode == ServerMode.JUPYTER_SERVER and kernel_manager is not None:
            # JUPYTER_SERVER mode: Use YDoc and kernel_manager
            from jupyter_mcp_server.jupyter_extension.context import get_server_context
            from jupyter_mcp_server.config import get_config
            
            context = get_server_context()
            serverapp = context.serverapp
            
            notebook_path, kernel_id = get_current_notebook_context(notebook_manager)
            
            # Resolve to absolute path FIRST
            if serverapp and not Path(notebook_path).is_absolute():
                root_dir = serverapp.root_dir
                notebook_path = str(Path(root_dir) / notebook_path)
            
            if kernel_id is None:
                # No kernel available - start a new one on demand
                logger.info("No kernel_id available, starting new kernel for insert_execute_code_cell")
                kernel_id = await kernel_manager.start_kernel()
                
                # Wait a bit for kernel to initialize
                await asyncio.sleep(1.0)
                logger.info(f"Kernel {kernel_id} started and initialized")
                
                # Store the kernel with ABSOLUTE path in notebook_manager
                if notebook_manager is not None:
                    kernel_info = {"id": kernel_id}
                    notebook_manager.add_notebook(
                        name=notebook_path,
                        kernel=kernel_info,
                        server_url="local",
                        path=notebook_path
                    )
            
            if serverapp:
                return await self._insert_execute_ydoc(
                    serverapp, notebook_path, cell_index, cell_source, kernel_id
                )
            else:
                raise RuntimeError("serverapp not available in JUPYTER_SERVER mode")
                
        elif mode == ServerMode.MCP_SERVER and notebook_manager is not None:
            # MCP_SERVER mode: Use WebSocket connection
            return await self._insert_execute_websocket(
                notebook_manager, cell_index, cell_source, ensure_kernel_alive
            )
        else:
            raise ValueError(f"Invalid mode or missing required clients: mode={mode}")

```

--------------------------------------------------------------------------------
/jupyter_mcp_server/jupyter_extension/backends/local_backend.py:
--------------------------------------------------------------------------------

```python
# Copyright (c) 2023-2024 Datalayer, Inc.
#
# BSD 3-Clause License

"""
Local Backend Implementation

This backend uses the Jupyter Server's local API directly when running as an extension.
It provides efficient local access to contents_manager and kernel_manager.
"""

from typing import Optional, Any, Union, Literal, TYPE_CHECKING
import asyncio
from mcp.types import ImageContent
from jupyter_mcp_server.jupyter_extension.backends.base import Backend
from jupyter_mcp_server.utils import safe_extract_outputs

if TYPE_CHECKING:
    from jupyter_server.serverapp import ServerApp


class LocalBackend(Backend):
    """
    Backend that uses local Jupyter Server API directly.
    
    Uses:
    - serverapp.contents_manager for notebook file operations
    - serverapp.kernel_manager for kernel management
    - serverapp.kernel_spec_manager for kernel specs
    
    This backend is only available when running as a Jupyter Server extension
    with document_url="local" or runtime_url="local".
    """
    
    def __init__(self, serverapp: 'ServerApp'):
        """
        Initialize local backend with direct serverapp access.
        
        Args:
            serverapp: Jupyter ServerApp instance
        """
        self.serverapp = serverapp
        self.contents_manager = serverapp.contents_manager
        self.kernel_manager = serverapp.kernel_manager
        self.kernel_spec_manager = serverapp.kernel_spec_manager
    
    # Notebook operations
    
    async def get_notebook_content(self, path: str) -> dict[str, Any]:
        """
        Get notebook content using local contents_manager.
        
        Args:
            path: Path to notebook file
            
        Returns:
            Notebook content dictionary
        """
        model = await asyncio.to_thread(
            self.contents_manager.get,
            path,
            type='notebook',
            content=True
        )
        return model['content']
    
    async def _save_notebook(self, path: str, content: dict[str, Any]) -> None:
        """Persist notebook content via contents_manager, off the event loop."""
        await asyncio.to_thread(
            self.contents_manager.save,
            {
                'type': 'notebook',
                'content': content
            },
            path
        )
    
    @staticmethod
    def _new_cell(
        cell_type: Literal["code", "markdown"],
        source: Union[str, list[str]]
    ) -> dict[str, Any]:
        """Build a minimal nbformat-style cell dict; code cells get output fields."""
        # Normalize source to a list of lines (nbformat's canonical form)
        if isinstance(source, str):
            source = source.splitlines(keepends=True)
        
        cell: dict[str, Any] = {
            'cell_type': cell_type,
            'metadata': {},
            'source': source
        }
        if cell_type == 'code':
            cell['outputs'] = []
            cell['execution_count'] = None
        return cell
    
    async def list_notebooks(self, path: str = "") -> list[str]:
        """
        List all notebooks recursively using local contents_manager.
        
        Args:
            path: Directory path to search
            
        Returns:
            List of notebook paths
        """
        notebooks: list[str] = []
        await self._list_notebooks_recursive(path, notebooks)
        return notebooks
    
    async def _list_notebooks_recursive(self, path: str, notebooks: list[str]) -> None:
        """Helper to recursively list notebooks, appending paths into *notebooks*."""
        try:
            model = await asyncio.to_thread(
                self.contents_manager.get,
                path,
                content=True
            )
            
            if model['type'] == 'directory':
                for item in model['content']:
                    item_path = f"{path}/{item['name']}" if path else item['name']
                    
                    if item['type'] == 'directory':
                        await self._list_notebooks_recursive(item_path, notebooks)
                    elif item['type'] == 'notebook' or item['name'].endswith('.ipynb'):
                        notebooks.append(item_path)
        except Exception:
            # Skip directories we can't access (permissions, broken symlinks, ...)
            pass
    
    async def notebook_exists(self, path: str) -> bool:
        """
        Check if notebook exists using local contents_manager.
        
        Args:
            path: Path to notebook
            
        Returns:
            True if exists
        """
        try:
            await asyncio.to_thread(
                self.contents_manager.get,
                path,
                content=False
            )
            return True
        except Exception:
            return False
    
    async def create_notebook(self, path: str) -> dict[str, Any]:
        """
        Create a new notebook using local contents_manager.
        
        Args:
            path: Path for new notebook
            
        Returns:
            Created notebook content
        """
        model = await asyncio.to_thread(
            self.contents_manager.new,
            path=path
        )
        return model['content']
    
    # Cell operations
    
    async def read_cells(
        self, 
        path: str, 
        start_index: Optional[int] = None,
        end_index: Optional[int] = None
    ) -> list[dict[str, Any]]:
        """
        Read cells from notebook.
        
        Args:
            path: Notebook path
            start_index: Start index (defaults to 0)
            end_index: End index, exclusive (defaults to number of cells)
            
        Returns:
            List of cells
        """
        content = await self.get_notebook_content(path)
        cells = content.get('cells', [])
        
        if start_index is not None or end_index is not None:
            # Explicit None checks so start_index=0 is honored
            start = start_index if start_index is not None else 0
            end = end_index if end_index is not None else len(cells)
            cells = cells[start:end]
        
        return cells
    
    async def append_cell(
        self, 
        path: str, 
        cell_type: Literal["code", "markdown"],
        source: Union[str, list[str]]
    ) -> int:
        """
        Append a cell to notebook.
        
        Args:
            path: Notebook path
            cell_type: Cell type
            source: Cell source
            
        Returns:
            Index of appended cell
        """
        content = await self.get_notebook_content(path)
        cells = content.get('cells', [])
        
        cells.append(self._new_cell(cell_type, source))
        content['cells'] = cells
        
        await self._save_notebook(path, content)
        return len(cells) - 1
    
    async def insert_cell(
        self,
        path: str,
        cell_index: int,
        cell_type: Literal["code", "markdown"],
        source: Union[str, list[str]]
    ) -> int:
        """
        Insert a cell at specific index.
        
        Note: like list.insert, out-of-range indices are clamped rather
        than raising.
        
        Args:
            path: Notebook path
            cell_index: Insert position
            cell_type: Cell type
            source: Cell source
            
        Returns:
            Index of inserted cell
        """
        content = await self.get_notebook_content(path)
        cells = content.get('cells', [])
        
        cells.insert(cell_index, self._new_cell(cell_type, source))
        content['cells'] = cells
        
        await self._save_notebook(path, content)
        return cell_index
    
    async def delete_cell(self, path: str, cell_index: int) -> None:
        """
        Delete a cell from notebook.
        
        Out-of-range indices are silently ignored.
        
        Args:
            path: Notebook path
            cell_index: Index to delete
        """
        content = await self.get_notebook_content(path)
        cells = content.get('cells', [])
        
        if 0 <= cell_index < len(cells):
            cells.pop(cell_index)
            content['cells'] = cells
            await self._save_notebook(path, content)
    
    async def overwrite_cell(
        self,
        path: str,
        cell_index: int,
        new_source: Union[str, list[str]]
    ) -> tuple[str, str]:
        """
        Overwrite cell content.
        
        Args:
            path: Notebook path
            cell_index: Cell index
            new_source: New source
            
        Returns:
            Tuple of (old_source, new_source)
        
        Raises:
            ValueError: If cell_index is out of range
        """
        content = await self.get_notebook_content(path)
        cells = content.get('cells', [])
        
        if cell_index < 0 or cell_index >= len(cells):
            raise ValueError(f"Cell index {cell_index} out of range")
        
        cell = cells[cell_index]
        old_source = ''.join(cell['source']) if isinstance(cell['source'], list) else cell['source']
        
        # Normalize new source; keep a plain-string copy for the return value
        if isinstance(new_source, str):
            new_source_str = new_source
            new_source = new_source.splitlines(keepends=True)
        else:
            new_source_str = ''.join(new_source)
        
        cell['source'] = new_source
        content['cells'] = cells
        
        await self._save_notebook(path, content)
        return (old_source, new_source_str)
    
    # Kernel operations
    
    async def get_or_create_kernel(self, path: str, kernel_id: Optional[str] = None) -> str:
        """
        Get existing kernel or create new one.
        
        Args:
            path: Notebook path (for context)
            kernel_id: Specific kernel ID
            
        Returns:
            Kernel ID
        """
        if kernel_id and kernel_id in self.kernel_manager:
            return kernel_id
        
        # Start new kernel
        kernel_id = await self.kernel_manager.start_kernel()
        return kernel_id
    
    async def execute_cell(
        self,
        path: str,
        cell_index: int,
        kernel_id: str,
        timeout_seconds: int = 300
    ) -> list[Union[str, ImageContent]]:
        """
        Execute a cell using local kernel manager.
        
        Outputs are converted to nbformat-style dicts (adding the required
        'output_type' field and dropping the protocol-only 'transient' field)
        before being written back to the notebook file.
        
        Args:
            path: Notebook path
            cell_index: Cell index
            kernel_id: Kernel ID
            timeout_seconds: Timeout
            
        Returns:
            List of outputs
        
        Raises:
            ValueError: If cell_index is out of range
            TimeoutError: If execution exceeds timeout_seconds
        """
        # Get cell source
        cells = await self.read_cells(path)
        if cell_index < 0 or cell_index >= len(cells):
            raise ValueError(f"Cell index {cell_index} out of range")
        
        cell = cells[cell_index]
        source = ''.join(cell['source']) if isinstance(cell['source'], list) else cell['source']
        
        # Get kernel client. Channels must be started before any message can
        # be sent or received, and stopped afterwards to avoid leaking sockets.
        kernel = self.kernel_manager.get_kernel(kernel_id)
        client = kernel.client()
        client.start_channels()
        
        outputs: list[dict[str, Any]] = []
        try:
            msg_id = client.execute(source)
            loop = asyncio.get_running_loop()
            start_time = loop.time()
            
            while True:
                if loop.time() - start_time > timeout_seconds:
                    raise TimeoutError(f"Cell execution exceeded {timeout_seconds} seconds")
                
                try:
                    msg = await asyncio.wait_for(
                        asyncio.to_thread(client.get_iopub_msg, timeout=1),
                        timeout=2
                    )
                except asyncio.TimeoutError:
                    continue
                except Exception:
                    break
                
                # Ignore IOPub traffic that belongs to other executions
                if msg.get('parent_header', {}).get('msg_id') != msg_id:
                    continue
                
                msg_type = msg['header']['msg_type']
                msg_content = msg['content']
                
                if msg_type == 'status':
                    if msg_content['execution_state'] == 'idle':
                        break
                elif msg_type in ('execute_result', 'display_data', 'stream', 'error'):
                    # Kernel message content lacks the 'output_type' field that
                    # the nbformat schema requires; add it here. 'transient' is
                    # part of the kernel protocol but not valid in nbformat.
                    output = dict(msg_content)
                    output['output_type'] = msg_type
                    output.pop('transient', None)
                    outputs.append(output)
        finally:
            client.stop_channels()
        
        # Update cell with outputs
        content = await self.get_notebook_content(path)
        if cell_index < len(content['cells']):
            content['cells'][cell_index]['outputs'] = outputs
            await self._save_notebook(path, content)
        
        return safe_extract_outputs(outputs)
    
    async def interrupt_kernel(self, kernel_id: str) -> None:
        """Interrupt a kernel (no-op if the kernel is unknown)."""
        if kernel_id in self.kernel_manager:
            # Use the manager-level API (consistent with restart/shutdown below);
            # per-kernel managers expose interrupt_kernel(), not interrupt().
            await self.kernel_manager.interrupt_kernel(kernel_id)
    
    async def restart_kernel(self, kernel_id: str) -> None:
        """Restart a kernel (no-op if the kernel is unknown)."""
        if kernel_id in self.kernel_manager:
            await self.kernel_manager.restart_kernel(kernel_id)
    
    async def shutdown_kernel(self, kernel_id: str) -> None:
        """Shutdown a kernel (no-op if the kernel is unknown)."""
        if kernel_id in self.kernel_manager:
            await self.kernel_manager.shutdown_kernel(kernel_id)
    
    async def list_kernels(self) -> list[dict[str, Any]]:
        """List all running kernels as {'id', 'name'} dicts."""
        return [
            {
                'id': kid,
                'name': self.kernel_manager.get_kernel(kid).kernel_name
            }
            for kid in self.kernel_manager.list_kernel_ids()
        ]
    
    async def kernel_exists(self, kernel_id: str) -> bool:
        """Check if kernel exists."""
        return kernel_id in self.kernel_manager

```

--------------------------------------------------------------------------------
/jupyter_mcp_server/tools/insert_cell_tool.py:
--------------------------------------------------------------------------------

```python
# Copyright (c) 2023-2024 Datalayer, Inc.
#
# BSD 3-Clause License

"""Insert cell tool implementation."""

from typing import Any, Optional, Literal
from pathlib import Path
import nbformat
from jupyter_server_api import JupyterServerClient
from jupyter_mcp_server.tools._base import BaseTool, ServerMode
from jupyter_mcp_server.notebook_manager import NotebookManager
from jupyter_mcp_server.utils import get_current_notebook_context
from jupyter_mcp_server.utils import get_surrounding_cells_info


class InsertCellTool(BaseTool):
    """Tool to insert a cell at a specified position."""
    
    @property
    def name(self) -> str:
        return "insert_cell"
    
    @property
    def description(self) -> str:
        return """Insert a cell to specified position.

Args:
    cell_index: target index for insertion (0-based). Use -1 to append at end.
    cell_type: Type of cell to insert ("code" or "markdown")
    cell_source: Source content for the cell

Returns:
    str: Success message and the structure of its surrounding cells (up to 5 cells above and 5 cells below)"""
    
    async def _get_jupyter_ydoc(self, serverapp: Any, file_id: str):
        """Get the YNotebook document if it's currently open in a collaborative session.
        
        This follows the jupyter_ai_tools pattern of accessing YDoc through the
        yroom_manager when the notebook is actively being edited.
        
        Args:
            serverapp: The Jupyter ServerApp instance
            file_id: The file ID for the document
            
        Returns:
            YNotebook instance or None if not in a collaborative session
        """
        try:
            yroom_manager = serverapp.web_app.settings.get("yroom_manager")
            if yroom_manager is None:
                return None
                
            room_id = f"json:notebook:{file_id}"
            
            if yroom_manager.has_room(room_id):
                yroom = yroom_manager.get_room(room_id)
                notebook = await yroom.get_jupyter_ydoc()
                return notebook
        except Exception:
            # YDoc not available, will fall back to file operations
            pass
        
        return None
    
    async def _insert_cell_ydoc(
        self,
        serverapp: Any,
        notebook_path: str,
        cell_index: int,
        cell_type: Literal["code", "markdown"],
        cell_source: str
    ) -> str:
        """Insert cell using YDoc (collaborative editing mode).
        
        Args:
            serverapp: Jupyter ServerApp instance
            notebook_path: Path to the notebook
            cell_index: Index to insert at (-1 for append)
            cell_type: Type of cell to insert
            cell_source: Source content for the cell
            
        Returns:
            Success message with surrounding cells info
        """
        # Get file_id from file_id_manager
        file_id_manager = serverapp.web_app.settings.get("file_id_manager")
        if file_id_manager is None:
            raise RuntimeError("file_id_manager not available in serverapp")
        
        file_id = file_id_manager.get_id(notebook_path)
        
        # Try to get YDoc
        ydoc = await self._get_jupyter_ydoc(serverapp, file_id)
        
        if ydoc:
            # Notebook is open in collaborative mode, use YDoc
            total_cells = len(ydoc.ycells)
            actual_index = cell_index if cell_index != -1 else total_cells
            
            if actual_index < 0 or actual_index > total_cells:
                raise ValueError(
                    f"Cell index {cell_index} is out of range. Notebook has {total_cells} cells. Use -1 to append at end."
                )
            
            # Create the cell
            cell = {
                "cell_type": cell_type,
                "source": "",
            }
            ycell = ydoc.create_ycell(cell)
            
            # Insert at the specified position
            if actual_index >= total_cells:
                ydoc.ycells.append(ycell)
            else:
                ydoc.ycells.insert(actual_index, ycell)
            
            # Write content to the cell collaboratively
            if cell_source:
                # Set the source directly on the ycell
                ycell["source"] = cell_source
            
            # Get surrounding cells info (simplified version for YDoc)
            new_total_cells = len(ydoc.ycells)
            surrounding_info = self._get_surrounding_cells_info_ydoc(ydoc, actual_index, new_total_cells)
            
            return f"Cell inserted successfully at index {actual_index} ({cell_type})!\n\nCurrent Surrounding Cells:\n{surrounding_info}"
        else:
            # YDoc not available, use file operations
            return await self._insert_cell_file(notebook_path, cell_index, cell_type, cell_source)
    
    def _get_surrounding_cells_info_ydoc(self, ydoc, center_index: int, total_cells: int) -> str:
        """Get info about surrounding cells from YDoc (up to 5 above/below)."""
        lines = []
        start_index = max(0, center_index - 5)
        end_index = min(total_cells, center_index + 6)
        
        for i in range(start_index, end_index):
            cell = ydoc.ycells[i]
            cell_type = cell.get("cell_type", "unknown")
            source = cell.get("source", "")
            if isinstance(source, list):
                source = "".join(source)
            first_line = source.split('\n')[0][:50] if source else "(empty)"
            marker = " <-- NEW" if i == center_index else ""
            lines.append(f"  [{i}] {cell_type}: {first_line}{marker}")
        
        return "\n".join(lines)
    
    async def _insert_cell_file(
        self,
        notebook_path: str,
        cell_index: int,
        cell_type: Literal["code", "markdown"],
        cell_source: str
    ) -> str:
        """Insert cell using file operations (non-collaborative mode).
        
        Args:
            notebook_path: Absolute path to the notebook
            cell_index: Index to insert at (-1 for append)
            cell_type: Type of cell to insert
            cell_source: Source content for the cell
            
        Returns:
            Success message with surrounding cells info
        """
        # Read notebook file
        with open(notebook_path, "r", encoding="utf-8") as f:
            # Read as version 4 (latest) to ensure consistency and support for cell IDs
            notebook = nbformat.read(f, as_version=4)
        
        # Clean any transient fields from existing outputs (kernel protocol field not in nbformat schema)
        self._clean_notebook_outputs(notebook)
        
        total_cells = len(notebook.cells)
        actual_index = cell_index if cell_index != -1 else total_cells
        
        if actual_index < 0 or actual_index > total_cells:
            raise ValueError(
                f"Cell index {cell_index} is out of range. Notebook has {total_cells} cells. Use -1 to append at end."
            )
        
        # Create and insert the cell
        if cell_type == "code":
            new_cell = nbformat.v4.new_code_cell(source=cell_source or "")
        elif cell_type == "markdown":
            new_cell = nbformat.v4.new_markdown_cell(source=cell_source or "")
        else:
            raise ValueError(f"Invalid cell_type: {cell_type}. Must be 'code' or 'markdown'.")
        
        notebook.cells.insert(actual_index, new_cell)
        
        # Write back to file
        with open(notebook_path, "w", encoding="utf-8") as f:
            nbformat.write(notebook, f)
        
        # Get surrounding cells info
        new_total_cells = len(notebook.cells)
        surrounding_info = self._get_surrounding_cells_info_file(notebook, actual_index, new_total_cells)
        
        return f"Cell inserted successfully at index {actual_index} ({cell_type})!\n\nCurrent Surrounding Cells:\n{surrounding_info}"
    
    def _clean_notebook_outputs(self, notebook):
        """Remove transient fields from all cell outputs.
        
        The 'transient' field is part of the Jupyter kernel messaging protocol
        but is NOT part of the nbformat schema. This causes validation errors.
        
        Args:
            notebook: nbformat notebook object to clean (modified in place)
        """
        # Clean transient fields from outputs
        for cell in notebook.cells:
            if cell.cell_type == 'code' and hasattr(cell, 'outputs'):
                for output in cell.outputs:
                    if isinstance(output, dict) and 'transient' in output:
                        del output['transient']
    
    def _get_surrounding_cells_info_file(self, notebook, center_index: int, total_cells: int) -> str:
        """Get info about surrounding cells from nbformat notebook (up to 5 above/below)."""
        lines = []
        start_index = max(0, center_index - 5)
        end_index = min(total_cells, center_index + 6)
        
        for i in range(start_index, end_index):
            cell = notebook.cells[i]
            cell_type = cell.cell_type
            source = cell.source
            first_line = source.split('\n')[0][:50] if source else "(empty)"
            marker = " <-- NEW" if i == center_index else ""
            lines.append(f"  [{i}] {cell_type}: {first_line}{marker}")
        
        return "\n".join(lines)
    
    async def _insert_cell_websocket(
        self,
        notebook_manager: NotebookManager,
        cell_index: int,
        cell_type: Literal["code", "markdown"],
        cell_source: str
    ) -> str:
        """Insert cell using WebSocket connection (MCP_SERVER mode).
        
        Args:
            notebook_manager: Notebook manager instance
            cell_index: Index to insert at (-1 for append)
            cell_type: Type of cell to insert
            cell_source: Source content for the cell
            
        Returns:
            Success message with surrounding cells info
        """
        async with notebook_manager.get_current_connection() as notebook:
            actual_index = cell_index if cell_index != -1 else len(notebook)
            if actual_index < 0 or actual_index > len(notebook):
                raise ValueError(f"Cell index {cell_index} out of range")
            
            notebook.insert_cell(actual_index, cell_source, cell_type)
            
            # Get surrounding cells info
            new_total_cells = len(notebook)
            surrounding_info = get_surrounding_cells_info(notebook, actual_index, new_total_cells)
            
            return f"Cell inserted successfully at index {actual_index} ({cell_type})!\n\nCurrent Surrounding Cells:\n{surrounding_info}"
    
    async def execute(
        self,
        mode: ServerMode,
        server_client: Optional[JupyterServerClient] = None,
        kernel_client: Optional[Any] = None,
        contents_manager: Optional[Any] = None,
        kernel_manager: Optional[Any] = None,
        kernel_spec_manager: Optional[Any] = None,
        notebook_manager: Optional[NotebookManager] = None,
        # Tool-specific parameters
        cell_index: int = None,
        cell_type: Literal["code", "markdown"] = None,
        cell_source: str = None,
        **kwargs
    ) -> str:
        """Execute the insert_cell tool.
        
        This tool supports three modes of operation:
        
        1. JUPYTER_SERVER mode with YDoc (collaborative):
           - Checks if notebook is open in a collaborative session
           - Uses YDoc for real-time collaborative editing
           - Changes are immediately visible to all connected users
           
        2. JUPYTER_SERVER mode without YDoc (file-based):
           - Falls back to direct file operations using nbformat
           - Suitable when notebook is not actively being edited
           
        3. MCP_SERVER mode (WebSocket):
           - Uses WebSocket connection to remote Jupyter server
           - Accesses YDoc through NbModelClient
        
        Args:
            mode: Server mode (MCP_SERVER or JUPYTER_SERVER)
            server_client: HTTP client for MCP_SERVER mode
            contents_manager: Direct API access for JUPYTER_SERVER mode
            notebook_manager: Notebook manager instance
            cell_index: Target index for insertion (0-based, -1 to append)
            cell_type: Type of cell ("code" or "markdown")
            cell_source: Source content for the cell
            **kwargs: Additional parameters
            
        Returns:
            Success message with surrounding cells info
        
        Raises:
            ValueError: If cell_index is missing, cell_type is invalid, or the
                mode/client combination is unsupported
        """
        # Validate tool parameters up front so all three code paths agree.
        # Previously only the file-based path rejected an invalid cell_type;
        # the YDoc path would silently create a malformed cell.
        if cell_index is None:
            raise ValueError("cell_index is required")
        if cell_type not in ("code", "markdown"):
            raise ValueError(f"Invalid cell_type: {cell_type}. Must be 'code' or 'markdown'.")
        
        if mode == ServerMode.JUPYTER_SERVER and contents_manager is not None:
            # JUPYTER_SERVER mode: Try YDoc first, fall back to file operations
            from jupyter_mcp_server.jupyter_extension.context import get_server_context
            
            context = get_server_context()
            serverapp = context.serverapp
            notebook_path, _ = get_current_notebook_context(notebook_manager)
            
            # Resolve to absolute path
            if serverapp and not Path(notebook_path).is_absolute():
                root_dir = serverapp.root_dir
                notebook_path = str(Path(root_dir) / notebook_path)
            
            if serverapp:
                # Try YDoc approach first
                return await self._insert_cell_ydoc(serverapp, notebook_path, cell_index, cell_type, cell_source)
            else:
                # Fall back to file operations
                return await self._insert_cell_file(notebook_path, cell_index, cell_type, cell_source)
                
        elif mode == ServerMode.MCP_SERVER and notebook_manager is not None:
            # MCP_SERVER mode: Use WebSocket connection
            return await self._insert_cell_websocket(notebook_manager, cell_index, cell_type, cell_source)
        else:
            raise ValueError(f"Invalid mode or missing required clients: mode={mode}")

```

--------------------------------------------------------------------------------
/docs/src/css/custom.css:
--------------------------------------------------------------------------------

```css
/*
 * Copyright (c) 2023-2024 Datalayer, Inc.
 *
 * BSD 3-Clause License
 */

/* stylelint-disable docusaurus/copyright-header */
/**
 * Any CSS included here will be global. The classic template
 * bundles Infima by default. Infima is a CSS framework designed to
 * work well for content-centric websites.
 */

/* You can override the default Infima variables here. */
:root {
  /* Primary brand green plus the Infima-derived light/dark shades */
  --ifm-color-primary: #25c2a0;
  --ifm-color-primary-dark: rgb(33, 175, 144);
  --ifm-color-primary-darker: rgb(31, 165, 136);
  --ifm-color-primary-darkest: rgb(26, 136, 112);
  --ifm-color-primary-light: rgb(70, 203, 174);
  --ifm-color-primary-lighter: rgb(102, 212, 189);
  --ifm-color-primary-lightest: rgb(146, 224, 208);
  /* Slightly smaller inline code relative to surrounding text */
  --ifm-code-font-size: 95%;
}

/* Highlighted line inside fenced code blocks (via magic comments) */
.docusaurus-highlight-code-line {
  background-color: rgb(72, 77, 91);
  display: block;
  /* Stretch the highlight across the <pre> horizontal padding */
  margin: 0 calc(-1 * var(--ifm-pre-padding));
  padding: 0 var(--ifm-pre-padding);
}

/* Navbar icon link to datalayer.io; icon is an inline SVG data URI */
.header-datalayer-io-link::before {
  content: '';
  width: 24px;
  height: 24px;
  display: flex;
  background: url("data:image/svg+xml,%3Csvg xmlns='http://www.w3.org/2000/svg' fill='none' aria-hidden='true' viewBox='0 0 20 20'%3E%3Cpath fill='%232ECC71' d='M0 0h20v4H0zm0 0'/%3E%3Cpath fill='%231ABC9C' d='M0 8h20v4H0zm0 0'/%3E%3Cpath fill='%2316A085' d='M0 16h20v4H0zm0 0'/%3E%3C/svg%3E%0A")
    no-repeat;
}

.header-datalayer-io-link:hover {
  opacity: 0.6;
}

/* Dark theme reuses the same (colored) logo */
[data-theme='dark'] .header-datalayer-io-link::before {
  background: url("data:image/svg+xml,%3Csvg xmlns='http://www.w3.org/2000/svg' fill='none' aria-hidden='true' viewBox='0 0 20 20'%3E%3Cpath fill='%232ECC71' d='M0 0h20v4H0zm0 0'/%3E%3Cpath fill='%231ABC9C' d='M0 8h20v4H0zm0 0'/%3E%3Cpath fill='%2316A085' d='M0 16h20v4H0zm0 0'/%3E%3C/svg%3E%0A")
    no-repeat;
}

/* Navbar GitHub icon link; grey (#959da5) octocat as an inline SVG data URI */
.header-github-link::before {
  content: '';
  width: 24px;
  height: 24px;
  display: flex;
  background: url("data:image/svg+xml,%3C%3Fxml version='1.0' encoding='UTF-8' standalone='no'%3F%3E%3Csvg viewBox='0 0 80 80' version='1.1' id='svg4' xmlns='http://www.w3.org/2000/svg' xmlns:svg='http://www.w3.org/2000/svg'%3E%3Cdefs id='defs8' /%3E%3Cpath fill='%23959da5' d='M 40,0 C 17.9,0 0,17.900001 0,40 c 0,17.7 11.45,32.65 27.35,37.950001 2,0.35 2.75,-0.85 2.75,-1.9 0,-0.95 -0.05,-4.1 -0.05,-7.45 C 20,70.45 17.4,66.15 16.6,63.9 16.15,62.75 14.2,59.2 12.5,58.25 11.1,57.5 9.1,55.65 12.45,55.600001 c 3.15,-0.05 5.4,2.899999 6.15,4.1 3.6,6.05 9.35,4.35 11.65,3.3 0.35,-2.6 1.4,-4.35 2.55,-5.35 -8.9,-1 -18.2,-4.45 -18.2,-19.75 0,-4.35 1.55,-7.95 4.1,-10.75 -0.4,-1 -1.8,-5.1 0.4,-10.6 0,0 3.35,-1.05 11,4.1 3.2,-0.9 6.6,-1.35 10,-1.35 3.4,0 6.8,0.45 10,1.35 7.65,-5.2 11,-4.1 11,-4.1 2.2,5.5 0.8,9.6 0.4,10.6 2.55,2.8 4.1,6.35 4.1,10.75 0,15.35 -9.35,18.75 -18.25,19.75 1.45,1.25 2.7,3.65 2.7,7.4 0,5.349999 -0.05,9.65 -0.05,11 0,1.05 0.75,2.3 2.75,1.9 A 40.065,40.065 0 0 0 80,40 C 80,17.900001 62.1,0 40,0 Z' id='path2' style='stroke-width:5' /%3E%3C/svg%3E%0A")
    no-repeat;
}

.header-github-link:hover {
  opacity: 0.6;
}

/* Dark theme uses the same grey icon, which is readable on both backgrounds */
[data-theme='dark'] .header-github-link::before {
  background: url("data:image/svg+xml,%3C%3Fxml version='1.0' encoding='UTF-8' standalone='no'%3F%3E%3Csvg viewBox='0 0 80 80' version='1.1' id='svg4' xmlns='http://www.w3.org/2000/svg' xmlns:svg='http://www.w3.org/2000/svg'%3E%3Cdefs id='defs8' /%3E%3Cpath fill='%23959da5' d='M 40,0 C 17.9,0 0,17.900001 0,40 c 0,17.7 11.45,32.65 27.35,37.950001 2,0.35 2.75,-0.85 2.75,-1.9 0,-0.95 -0.05,-4.1 -0.05,-7.45 C 20,70.45 17.4,66.15 16.6,63.9 16.15,62.75 14.2,59.2 12.5,58.25 11.1,57.5 9.1,55.65 12.45,55.600001 c 3.15,-0.05 5.4,2.899999 6.15,4.1 3.6,6.05 9.35,4.35 11.65,3.3 0.35,-2.6 1.4,-4.35 2.55,-5.35 -8.9,-1 -18.2,-4.45 -18.2,-19.75 0,-4.35 1.55,-7.95 4.1,-10.75 -0.4,-1 -1.8,-5.1 0.4,-10.6 0,0 3.35,-1.05 11,4.1 3.2,-0.9 6.6,-1.35 10,-1.35 3.4,0 6.8,0.45 10,1.35 7.65,-5.2 11,-4.1 11,-4.1 2.2,5.5 0.8,9.6 0.4,10.6 2.55,2.8 4.1,6.35 4.1,10.75 0,15.35 -9.35,18.75 -18.25,19.75 1.45,1.25 2.7,3.65 2.7,7.4 0,5.349999 -0.05,9.65 -0.05,11 0,1.05 0.75,2.3 2.75,1.9 A 40.065,40.065 0 0 0 80,40 C 80,17.900001 62.1,0 40,0 Z' id='path2' style='stroke-width:5' /%3E%3C/svg%3E%0A")
    no-repeat;
}

/* Bluesky icon in the site header: a 24×24 gray (#959da5) SVG embedded
   as a URL-encoded data: URI, so no extra network request is made.
   The icon is drawn by the ::before pseudo-element of the nav link. */
.header-bluesky-link::before {
  content: '';
  width: 24px;
  height: 24px;
  display: flex;
  background: url("data:image/svg+xml,%3C%3Fxml%20version%3D%221.0%22%20encoding%3D%22UTF-8%22%20standalone%3D%22no%22%3F%3E%0A%3Csvg%0A%20%20%20width%3D%2224%22%0A%20%20%20height%3D%2224%22%0A%20%20%20viewBox%3D%220%200%202.88%202.88%22%0A%20%20%20version%3D%221.1%22%0A%20%20%20id%3D%22svg4%22%0A%20%20%20xmlns%3D%22http%3A%2F%2Fwww.w3.org%2F2000%2Fsvg%22%0A%20%20%20xmlns%3Asvg%3D%22http%3A%2F%2Fwww.w3.org%2F2000%2Fsvg%22%3E%0A%20%20%3Cdefs%0A%20%20%20%20%20id%3D%22defs8%22%20%2F%3E%0A%20%20%3Cpath%0A%20%20%20%20%20fill%3D%22%23959da5%22%0A%20%20%20%20%20d%3D%22M%201.44%2C1.306859%20C%201.30956%2C1.053179%200.95447995%2C0.58049901%200.62423999%2C0.34745901%200.30791999%2C0.12413901%200.18732%2C0.16277901%200.10824%2C0.19865901%200.01668%2C0.23981901%200%2C0.38045901%200%2C0.46301901%20c%200%2C0.0828%200.04536%2C0.67799999%200.07488%2C0.77747999%200.0978%2C0.32832%200.44556%2C0.4392%200.76595999%2C0.4036799%200.01632%2C-0.0024%200.033%2C-0.00468%200.0498%2C-0.00672%20-0.01656%2C0.00264%20-0.03312%2C0.0048%20-0.0498%2C0.00672%20C%200.3714%2C1.7137789%20-0.0456%2C1.884779%200.50124%2C2.493539%201.1027999%2C3.1163391%201.3256399%2C2.359979%201.44%2C1.976579%201.55436%2C2.359979%201.686%2C3.0890991%202.36796%2C2.493539%202.88%2C1.976579%202.5086%2C1.713779%202.0391599%2C1.6441789%20a%201.04892%2C1.04892%200%200%201%20-0.0498%2C-0.00672%20c%200.0168%2C0.00204%200.03348%2C0.00432%200.0498%2C0.00672%20C%202.35956%2C1.6798189%202.7073199%2C1.5688189%202.80512%2C1.240499%202.8346401%2C1.141139%202.88%2C0.54569891%202.88%2C0.46313901%20c%200%2C-0.0828%20-0.01668%2C-0.22332%20-0.10824%2C-0.26472%20-0.07908%2C-0.03576%20-0.19968%2C-0.0744%20-0.516%2C0.1488%20C%201.92552%2C0.58061901%201.5704399%2C1.053299%201.44%2C1.306859%22%0A%20%20%20%20%20style%3D%22stroke-width%3A0.12%22%0A%20%20%20%20%20id%3D%22path2%22%20%2F%3E%0A%3C%2Fsvg%3E%0A")
    no-repeat;
}

/* Dim the Bluesky icon on hover for visual feedback. */
.header-bluesky-link:hover {
  opacity: 0.6;
}

/* Dark-theme override for the Bluesky icon.
   NOTE(review): this data URI is byte-identical to the light-theme one
   (same neutral #959da5 fill, which reads on both themes), so the rule
   is currently redundant — presumably kept as a hook for a future
   dark-specific asset; confirm before removing. */
[data-theme='dark'] .header-bluesky-link::before {
  background: url("data:image/svg+xml,%3C%3Fxml%20version%3D%221.0%22%20encoding%3D%22UTF-8%22%20standalone%3D%22no%22%3F%3E%0A%3Csvg%0A%20%20%20width%3D%2224%22%0A%20%20%20height%3D%2224%22%0A%20%20%20viewBox%3D%220%200%202.88%202.88%22%0A%20%20%20version%3D%221.1%22%0A%20%20%20id%3D%22svg4%22%0A%20%20%20xmlns%3D%22http%3A%2F%2Fwww.w3.org%2F2000%2Fsvg%22%0A%20%20%20xmlns%3Asvg%3D%22http%3A%2F%2Fwww.w3.org%2F2000%2Fsvg%22%3E%0A%20%20%3Cdefs%0A%20%20%20%20%20id%3D%22defs8%22%20%2F%3E%0A%20%20%3Cpath%0A%20%20%20%20%20fill%3D%22%23959da5%22%0A%20%20%20%20%20d%3D%22M%201.44%2C1.306859%20C%201.30956%2C1.053179%200.95447995%2C0.58049901%200.62423999%2C0.34745901%200.30791999%2C0.12413901%200.18732%2C0.16277901%200.10824%2C0.19865901%200.01668%2C0.23981901%200%2C0.38045901%200%2C0.46301901%20c%200%2C0.0828%200.04536%2C0.67799999%200.07488%2C0.77747999%200.0978%2C0.32832%200.44556%2C0.4392%200.76595999%2C0.4036799%200.01632%2C-0.0024%200.033%2C-0.00468%200.0498%2C-0.00672%20-0.01656%2C0.00264%20-0.03312%2C0.0048%20-0.0498%2C0.00672%20C%200.3714%2C1.7137789%20-0.0456%2C1.884779%200.50124%2C2.493539%201.1027999%2C3.1163391%201.3256399%2C2.359979%201.44%2C1.976579%201.55436%2C2.359979%201.686%2C3.0890991%202.36796%2C2.493539%202.88%2C1.976579%202.5086%2C1.713779%202.0391599%2C1.6441789%20a%201.04892%2C1.04892%200%200%201%20-0.0498%2C-0.00672%20c%200.0168%2C0.00204%200.03348%2C0.00432%200.0498%2C0.00672%20C%202.35956%2C1.6798189%202.7073199%2C1.5688189%202.80512%2C1.240499%202.8346401%2C1.141139%202.88%2C0.54569891%202.88%2C0.46313901%20c%200%2C-0.0828%20-0.01668%2C-0.22332%20-0.10824%2C-0.26472%20-0.07908%2C-0.03576%20-0.19968%2C-0.0744%20-0.516%2C0.1488%20C%201.92552%2C0.58061901%201.5704399%2C1.053299%201.44%2C1.306859%22%0A%20%20%20%20%20style%3D%22stroke-width%3A0.12%22%0A%20%20%20%20%20id%3D%22path2%22%20%2F%3E%0A%3C%2Fsvg%3E%0A")
    no-repeat;
}

/* LinkedIn icon in the site header: gray (rgb(149, 157, 165)) SVG
   embedded as a data: URI, drawn by the link's ::before pseudo-element. */
.header-linkedin-link::before {
  content: '';
  width: 24px;
  height: 24px;
  display: flex;
  background: url("data:image/svg+xml,%0A%3Csvg xmlns='http://www.w3.org/2000/svg' viewBox='0 0 19 18'%3E%3Cpath d='M3.94 2A2 2 0 1 1 2 0a2 2 0 0 1 1.94 2zM4 5.48H0V18h4zm6.32 0H6.34V18h3.94v-6.57c0-3.66 4.77-4 4.77 0V18H19v-7.93c0-6.17-7.06-5.94-8.72-2.91z' fill='rgb(149, 157, 165)'/%3E%3C/svg%3E")
    no-repeat;
}

/* Dim the LinkedIn icon on hover for visual feedback. */
.header-linkedin-link:hover {
  opacity: 0.6;
}

/* Dark-theme override for the LinkedIn icon.
   NOTE(review): the data URI is identical to the light-theme one (same
   neutral gray fill), so this rule is currently redundant — presumably a
   placeholder for a dark-specific asset; confirm before removing.
   Also note it uses `background-image` (which preserves the shorthand's
   other sub-properties from the base rule) rather than the `background`
   shorthand used by the other dark overrides in this file. */
[data-theme='dark'] .header-linkedin-link::before {
  background-image: url("data:image/svg+xml,%0A%3Csvg xmlns='http://www.w3.org/2000/svg' viewBox='0 0 19 18'%3E%3Cpath d='M3.94 2A2 2 0 1 1 2 0a2 2 0 0 1 1.94 2zM4 5.48H0V18h4zm6.32 0H6.34V18h3.94v-6.57c0-3.66 4.77-4 4.77 0V18H19v-7.93c0-6.17-7.06-5.94-8.72-2.91z' fill='rgb(149, 157, 165)'/%3E%3C/svg%3E")
    no-repeat;
}

/* X (formerly Twitter) icon in the site header: gray (rgb(149, 157, 165))
   X-logo SVG embedded as a data: URI, drawn by the link's ::before
   pseudo-element. */
.header-x-link::before {
  content: '';
  width: 24px;
  height: 24px;
  display: flex;
  background: url("data:image/svg+xml,%3Csvg%20xmlns%3D%22http%3A%2F%2Fwww.w3.org%2F2000%2Fsvg%22%20viewBox%3D%220%200%201200%201227%22%20fill%3D%22rgb(149%2C%20157%2C%20165)%22%3E%3Cpath%20d%3D%22M714.163%20519.284%201160.89%200h-105.86L667.137%20450.887%20357.328%200H0l468.492%20681.821L0%201226.37h105.866l409.625-476.152%20327.181%20476.152H1200L714.137%20519.284h.026ZM569.165%20687.828l-47.468-67.894-377.686-540.24h162.604l304.797%20435.991%2047.468%2067.894%20396.2%20566.721H892.476L569.165%20687.854v-.026Z%22%20%2F%3E%3C%2Fsvg%3E")
    no-repeat;
}

/* Dim the X icon on hover for visual feedback. */
.header-x-link:hover {
  opacity: 0.6;
}

/* Dark-theme override for the X icon.
   Fix: this rule previously substituted the legacy blue Twitter-bird SVG
   (fill #1DA1F2), which clashed with the light theme's gray X logo and
   with every other dark-theme override in this file (all of which reuse
   the same neutral #959da5 icon). Use the same gray X logo here so the
   icon is consistent across themes. */
[data-theme='dark'] .header-x-link::before {
  background: url("data:image/svg+xml,%3Csvg%20xmlns%3D%22http%3A%2F%2Fwww.w3.org%2F2000%2Fsvg%22%20viewBox%3D%220%200%201200%201227%22%20fill%3D%22rgb(149%2C%20157%2C%20165)%22%3E%3Cpath%20d%3D%22M714.163%20519.284%201160.89%200h-105.86L667.137%20450.887%20357.328%200H0l468.492%20681.821L0%201226.37h105.866l409.625-476.152%20327.181%20476.152H1200L714.137%20519.284h.026ZM569.165%20687.828l-47.468-67.894-377.686-540.24h162.604l304.797%20435.991%2047.468%2067.894%20396.2%20566.721H892.476L569.165%20687.854v-.026Z%22%20%2F%3E%3C%2Fsvg%3E")
    no-repeat;
}

/* Discord icon in the site header: gray (rgb(149, 157, 165)) SVG embedded
   as a data: URI. Note the 24×18 box — the Discord mark is wider than it
   is tall, unlike the other 24×24 icons in this file. */
.header-discord-link::before {
  content: '';
  width: 24px;
  height: 18px;
  display: flex;
  background: url("data:image/svg+xml,%3Csvg xmlns='http://www.w3.org/2000/svg' viewBox='0 0 127.14 96.36'%3E%3Cpath fill='rgb(149, 157, 165)' d='M107.7,8.07A105.15,105.15,0,0,0,81.47,0a72.06,72.06,0,0,0-3.36,6.83A97.68,97.68,0,0,0,49,6.83,72.37,72.37,0,0,0,45.64,0,105.89,105.89,0,0,0,19.39,8.09C2.79,32.65-1.71,56.6.54,80.21h0A105.73,105.73,0,0,0,32.71,96.36,77.7,77.7,0,0,0,39.6,85.25a68.42,68.42,0,0,1-10.85-5.18c.91-.66,1.8-1.34,2.66-2a75.57,75.57,0,0,0,64.32,0c.87.71,1.76,1.39,2.66,2a68.68,68.68,0,0,1-10.87,5.19,77,77,0,0,0,6.89,11.1A105.25,105.25,0,0,0,126.6,80.22h0C129.24,52.84,122.09,29.11,107.7,8.07ZM42.45,65.69C36.18,65.69,31,60,31,53s5-12.74,11.43-12.74S54,46,53.89,53,48.84,65.69,42.45,65.69Zm42.24,0C78.41,65.69,73.25,60,73.25,53s5-12.74,11.44-12.74S96.23,46,96.12,53,91.08,65.69,84.69,65.69Z'/%3E%3C/svg%3E%0A")
    no-repeat;
}

/* Dim the Discord icon on hover for visual feedback. */
.header-discord-link:hover {
  opacity: 0.6;
}

/* Dark-theme override for the Discord icon.
   NOTE(review): the data URI is byte-identical to the light-theme one
   (same neutral gray fill), so this rule is currently redundant —
   presumably a placeholder for a dark-specific asset; confirm before
   removing. */
[data-theme='dark'] .header-discord-link::before {
  background: url("data:image/svg+xml,%3Csvg xmlns='http://www.w3.org/2000/svg' viewBox='0 0 127.14 96.36'%3E%3Cpath fill='rgb(149, 157, 165)' d='M107.7,8.07A105.15,105.15,0,0,0,81.47,0a72.06,72.06,0,0,0-3.36,6.83A97.68,97.68,0,0,0,49,6.83,72.37,72.37,0,0,0,45.64,0,105.89,105.89,0,0,0,19.39,8.09C2.79,32.65-1.71,56.6.54,80.21h0A105.73,105.73,0,0,0,32.71,96.36,77.7,77.7,0,0,0,39.6,85.25a68.42,68.42,0,0,1-10.85-5.18c.91-.66,1.8-1.34,2.66-2a75.57,75.57,0,0,0,64.32,0c.87.71,1.76,1.39,2.66,2a68.68,68.68,0,0,1-10.87,5.19,77,77,0,0,0,6.89,11.1A105.25,105.25,0,0,0,126.6,80.22h0C129.24,52.84,122.09,29.11,107.7,8.07ZM42.45,65.69C36.18,65.69,31,60,31,53s5-12.74,11.43-12.74S54,46,53.89,53,48.84,65.69,42.45,65.69Zm42.24,0C78.41,65.69,73.25,60,73.25,53s5-12.74,11.44-12.74S96.23,46,96.12,53,91.08,65.69,84.69,65.69Z'/%3E%3C/svg%3E%0A")
    no-repeat;
}

/* TikTok icon in the site header: gray (#959da5) SVG embedded as a
   data: URI. No [data-theme='dark'] override exists for this icon —
   the neutral gray is used on both themes (consistent with the other
   icons, whose dark overrides are identical to their light versions). */
.header-tiktok-link::before {
  content: '';
  width: 24px;
  height: 24px;
  display: flex;
  background: url("data:image/svg+xml,%3C%3Fxml%20version%3D%221.0%22%20encoding%3D%22UTF-8%22%20standalone%3D%22no%22%3F%3E%0A%3Csvg%0A%20%20%20fill%3D%22%23959da5%22%0A%20%20%20width%3D%2224%22%0A%20%20%20height%3D%2224%22%0A%20%20%20viewBox%3D%220%200%200.72%200.72%22%0A%20%20%20xml%3Aspace%3D%22preserve%22%0A%20%20%20version%3D%221.1%22%0A%20%20%20id%3D%22svg4%22%0A%20%20%20xmlns%3D%22http%3A%2F%2Fwww.w3.org%2F2000%2Fsvg%22%0A%20%20%20xmlns%3Asvg%3D%22http%3A%2F%2Fwww.w3.org%2F2000%2Fsvg%22%3E%3Cdefs%0A%20%20%20%20%20id%3D%22defs8%22%20%2F%3E%3Cpath%0A%20%20%20%20%20d%3D%22M%200.63539429%2C0.16867393%20A%200.1725255%2C0.17252543%200%200%201%200.49969199%2C0.01587392%20V%200%20h%20-0.1240039%20v%200.49212763%20a%200.10424241%2C0.10424236%200%200%201%20-0.1872116%2C0.0627398%20l%20-7.2e-5%2C-3.6e-5%207.2e-5%2C3.6e-5%20A%200.10420641%2C0.10420637%200%200%201%200.30304959%2C0.39252865%20V%200.26654513%20A%200.22781429%2C0.2278142%200%200%200%200.10889089%2C0.65140678%200.22785029%2C0.2278502%200%200%200%200.49969199%2C0.4921636%20V%200.24070051%20a%200.2945136%2C0.29451347%200%200%200%200.1718056%2C0.0549288%20V%200.17241745%20a%200.17389332%2C0.17389325%200%200%201%20-0.0361033%2C-0.003744%20z%22%0A%20%20%20%20%20id%3D%22path2%22%0A%20%20%20%20%20style%3D%22stroke-width%3A0.0359952%22%20%2F%3E%3C%2Fsvg%3E%0A")
    no-repeat;
}

/* Dim the TikTok icon on hover for visual feedback. */
.header-tiktok-link:hover {
  opacity: 0.6;
}

/* YouTube icon in the site header: gray (#959da5) play-button SVG
   embedded as a data: URI. The box is 24×20 (the YouTube mark is wider
   than it is tall). No dark-theme override exists — the neutral gray is
   used on both themes. */
.header-youtube-link::before {
  content: '';
  width: 24px;
  height: 20px;
  display: flex;
  background: url("data:image/svg+xml,%3C%3Fxml%20version%3D%221.0%22%20encoding%3D%22UTF-8%22%20standalone%3D%22no%22%3F%3E%0A%3Csvg%0A%20%20%20viewBox%3D%220%200%2024%2024%22%0A%20%20%20version%3D%221.1%22%0A%20%20%20id%3D%22svg4%22%0A%20%20%20width%3D%2224%22%0A%20%20%20height%3D%2224%22%0A%20%20%20xmlns%3D%22http%3A%2F%2Fwww.w3.org%2F2000%2Fsvg%22%0A%20%20%20xmlns%3Asvg%3D%22http%3A%2F%2Fwww.w3.org%2F2000%2Fsvg%22%3E%0A%20%20%3Cdefs%0A%20%20%20%20%20id%3D%22defs8%22%20%2F%3E%0A%20%20%3Cpath%0A%20%20%20%20%20d%3D%22M%2023.496693%2C5.8315054%20A%203.0042862%2C3.0042862%200%200%200%2021.393692%2C3.6909515%20C%2019.516013%2C3.1652014%2011.992781%2C3.1652014%2011.992781%2C3.1652014%20A%2072.040279%2C72.040279%200%200%200%202.6043863%2C3.6659158%203.1169469%2C3.1169469%200%200%200%200.488868%2C5.8315054%2032.884416%2C32.884416%200%200%200%206.7149698e-4%2C11.677346%2032.734202%2C32.734202%200%200%200%200.488868%2C17.523186%203.0418398%2C3.0418398%200%200%200%202.6043863%2C19.663739%20c%201.9027146%2C0.525751%209.3883947%2C0.525751%209.3883947%2C0.525751%20a%2072.215529%2C72.215529%200%200%200%209.400911%2C-0.500715%203.0042862%2C3.0042862%200%200%200%202.103001%2C-2.140554%2032.083273%2C32.083273%200%200%200%200.500714%2C-5.845839%2030.042862%2C30.042862%200%200%200%20-0.500714%2C-5.8708766%20z%20M%209.6018695%2C15.320042%20V%208.0346486%20l%206.2589285%2C3.6426974%20z%22%0A%20%20%20%20%20fill%3D%22%23959da5%22%0A%20%20%20%20%20id%3D%22path2%22%0A%20%20%20%20%20style%3D%22stroke-width%3A1.25179%22%20%2F%3E%0A%3C%2Fsvg%3E%0A")
    no-repeat;
}

/* Dim the YouTube icon on hover for visual feedback. */
.header-youtube-link:hover {
  opacity: 0.6;
}

/* Let the header's inner container span the full viewport width instead
   of being capped by the theme's default container max-width.
   `max-width: none` replaces the previous magic number (9000px), which
   was an ad-hoc way of saying "no cap"; `none` states that intent
   directly and behaves identically at any realistic viewport width. */
header .container {
  max-width: none;
}

```
Page 2/5FirstPrevNextLast