This is page 3 of 5. Use http://codebase.md/datalayer/jupyter-mcp-server?page={x} to view the full context.

# Directory Structure

```
├── .dockerignore
├── .github
│   ├── copilot-instructions.md
│   ├── dependabot.yml
│   └── workflows
│       ├── build.yml
│       ├── fix-license-header.yml
│       ├── lint.sh
│       ├── prep-release.yml
│       ├── publish-release.yml
│       └── test.yml
├── .gitignore
├── .licenserc.yaml
├── .pre-commit-config.yaml
├── .vscode
│   ├── mcp.json
│   └── settings.json
├── ARCHITECTURE.md
├── CHANGELOG.md
├── CODE_OF_CONDUCT.md
├── CONTRIBUTING.md
├── dev
│   ├── content
│   │   ├── new.ipynb
│   │   ├── notebook.ipynb
│   │   └── README.md
│   └── README.md
├── Dockerfile
├── docs
│   ├── .gitignore
│   ├── .yarnrc.yml
│   ├── babel.config.js
│   ├── docs
│   │   ├── _category_.yaml
│   │   ├── clients
│   │   │   ├── _category_.yaml
│   │   │   ├── claude_desktop
│   │   │   │   ├── _category_.yaml
│   │   │   │   └── index.mdx
│   │   │   ├── cline
│   │   │   │   ├── _category_.yaml
│   │   │   │   └── index.mdx
│   │   │   ├── cursor
│   │   │   │   ├── _category_.yaml
│   │   │   │   └── index.mdx
│   │   │   ├── index.mdx
│   │   │   ├── vscode
│   │   │   │   ├── _category_.yaml
│   │   │   │   └── index.mdx
│   │   │   └── windsurf
│   │   │       ├── _category_.yaml
│   │   │       └── index.mdx
│   │   ├── configure
│   │   │   ├── _category_.yaml
│   │   │   └── index.mdx
│   │   ├── contribute
│   │   │   ├── _category_.yaml
│   │   │   └── index.mdx
│   │   ├── deployment
│   │   │   ├── _category_.yaml
│   │   │   ├── datalayer
│   │   │   │   ├── _category_.yaml
│   │   │   │   └── streamable-http
│   │   │   │       └── index.mdx
│   │   │   ├── index.mdx
│   │   │   └── jupyter
│   │   │       ├── _category_.yaml
│   │   │       ├── index.mdx
│   │   │       ├── stdio
│   │   │       │   ├── _category_.yaml
│   │   │       │   └── index.mdx
│   │   │       └── streamable-http
│   │   │           ├── _category_.yaml
│   │   │           ├── jupyter-extension
│   │   │           │   └── index.mdx
│   │   │           └── standalone
│   │   │               └── index.mdx
│   │   ├── index.mdx
│   │   ├── releases
│   │   │   ├── _category_.yaml
│   │   │   └── index.mdx
│   │   ├── resources
│   │   │   ├── _category_.yaml
│   │   │   └── index.mdx
│   │   └── tools
│   │       ├── _category_.yaml
│   │       └── index.mdx
│   ├── docusaurus.config.js
│   ├── LICENSE
│   ├── Makefile
│   ├── package.json
│   ├── README.md
│   ├── sidebars.js
│   ├── src
│   │   ├── components
│   │   │   ├── HomepageFeatures.js
│   │   │   ├── HomepageFeatures.module.css
│   │   │   ├── HomepageProducts.js
│   │   │   └── HomepageProducts.module.css
│   │   ├── css
│   │   │   └── custom.css
│   │   ├── pages
│   │   │   ├── index.module.css
│   │   │   ├── markdown-page.md
│   │   │   └── testimonials.tsx
│   │   └── theme
│   │       └── CustomDocItem.tsx
│   └── static
│       └── img
│           ├── datalayer
│           │   ├── logo.png
│           │   └── logo.svg
│           ├── favicon.ico
│           ├── feature_1.svg
│           ├── feature_2.svg
│           ├── feature_3.svg
│           ├── product_1.svg
│           ├── product_2.svg
│           └── product_3.svg
├── examples
│   └── integration_example.py
├── jupyter_mcp_server
│   ├── __init__.py
│   ├── __main__.py
│   ├── __version__.py
│   ├── config.py
│   ├── enroll.py
│   ├── env.py
│   ├── jupyter_extension
│   │   ├── __init__.py
│   │   ├── backends
│   │   │   ├── __init__.py
│   │   │   ├── base.py
│   │   │   ├── local_backend.py
│   │   │   └── remote_backend.py
│   │   ├── context.py
│   │   ├── extension.py
│   │   ├── handlers.py
│   │   └── protocol
│   │       ├── __init__.py
│   │       └── messages.py
│   ├── models.py
│   ├── notebook_manager.py
│   ├── server_modes.py
│   ├── server.py
│   ├── tools
│   │   ├── __init__.py
│   │   ├── _base.py
│   │   ├── _registry.py
│   │   ├── assign_kernel_to_notebook_tool.py
│   │   ├── delete_cell_tool.py
│   │   ├── execute_cell_tool.py
│   │   ├── execute_ipython_tool.py
│   │   ├── insert_cell_tool.py
│   │   ├── insert_execute_code_cell_tool.py
│   │   ├── list_cells_tool.py
│   │   ├── list_files_tool.py
│   │   ├── list_kernels_tool.py
│   │   ├── list_notebooks_tool.py
│   │   ├── overwrite_cell_source_tool.py
│   │   ├── read_cell_tool.py
│   │   ├── read_cells_tool.py
│   │   ├── restart_notebook_tool.py
│   │   ├── unuse_notebook_tool.py
│   │   └── use_notebook_tool.py
│   └── utils.py
├── jupyter-config
│   ├── jupyter_notebook_config
│   │   └── jupyter_mcp_server.json
│   └── jupyter_server_config.d
│       └── jupyter_mcp_server.json
├── LICENSE
├── Makefile
├── pyproject.toml
├── pytest.ini
├── README.md
├── RELEASE.md
├── smithery.yaml
└── tests
    ├── __init__.py
    ├── conftest.py
    ├── test_common.py
    ├── test_config.py
    ├── test_jupyter_extension.py
    ├── test_list_kernels.py
    ├── test_tools.py
    └── test_use_notebook.py
```

# Files

--------------------------------------------------------------------------------
/docs/static/img/product_1.svg:
--------------------------------------------------------------------------------

```
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<!--
  ~ Copyright (c) 2023-2024 Datalayer, Inc.
  ~
  ~ BSD 3-Clause License
-->

<svg
   xmlns:dc="http://purl.org/dc/elements/1.1/"
   xmlns:cc="http://creativecommons.org/ns#"
   xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#"
   xmlns:svg="http://www.w3.org/2000/svg"
   xmlns="http://www.w3.org/2000/svg"
   xmlns:sodipodi="http://sodipodi.sourceforge.net/DTD/sodipodi-0.dtd"
   xmlns:inkscape="http://www.inkscape.org/namespaces/inkscape"
   viewBox="0 0 222.55631 184.29483"
   version="1.1"
   id="svg1377"
   sodipodi:docname="1.svg"
   inkscape:version="1.0.1 (c497b03c, 2020-09-10)"
   width="222.5563"
   height="184.29483">
  <metadata
     id="metadata1381">
    <rdf:RDF>
      <cc:Work
         rdf:about="">
        <dc:format>image/svg+xml</dc:format>
        <dc:type
           rdf:resource="http://purl.org/dc/dcmitype/StillImage" />
        <dc:title>Cloud_database_SVG</dc:title>
      </cc:Work>
    </rdf:RDF>
  </metadata>
  <sodipodi:namedview
     pagecolor="#ffffff"
     bordercolor="#666666"
     borderopacity="1"
     objecttolerance="10"
     gridtolerance="10"
     guidetolerance="10"
     inkscape:pageopacity="0"
     inkscape:pageshadow="2"
     inkscape:window-width="1349"
     inkscape:window-height="694"
     id="namedview1379"
     showgrid="false"
     inkscape:zoom="0.81391066"
     inkscape:cx="246.97899"
     inkscape:cy="87.469146"
     inkscape:window-x="173"
     inkscape:window-y="109"
     inkscape:window-maximized="0"
     inkscape:current-layer="svg1377"
     inkscape:document-rotation="0"
     fit-margin-top="0"
     fit-margin-left="0"
     fit-margin-right="0"
     fit-margin-bottom="0" />
  <defs
     id="defs847">
    <style
       id="style833">.cls-1{fill:none;}.cls-2{clip-path:url(#clip-path);}.cls-3{fill:#d3d3dd;}.cls-4{fill:#e0dee9;}.cls-5{fill:#eaeaf4;}.cls-6{fill:#2a313f;}.cls-7{fill:#9052fe;}.cls-8{opacity:0.4;isolation:isolate;}.cls-9{clip-path:url(#clip-path-3);}.cls-10{fill:#d6d8e5;}.cls-11{clip-path:url(#clip-path-4);}.cls-12{fill:#3a2c6d;}.cls-13{fill:#ffcea9;}.cls-14{fill:#f4f4f4;}.cls-15{fill:#38226d;}.cls-16{fill:#9c73ff;}.cls-17{fill:#ff7b7b;}.cls-18{fill:#ededed;}.cls-19{fill:#8c50ff;}.cls-20{fill:#bfbfbf;}.cls-21{fill:#e5e5e5;}.cls-22{clip-path:url(#clip-path-5);}</style>
    <clipPath
       id="clip-path"
       transform="translate(-28.33 0)">
      <rect
         class="cls-1"
         width="500"
         height="500"
         id="rect835"
         x="0"
         y="0" />
    </clipPath>
    <clipPath
       id="clip-path-3"
       transform="translate(-28.33 0)">
      <rect
         class="cls-1"
         x="375.23999"
         y="440.37"
         width="36.549999"
         height="22.549999"
         id="rect838" />
    </clipPath>
    <clipPath
       id="clip-path-4"
       transform="translate(-28.33 0)">
      <rect
         class="cls-1"
         x="273.67001"
         y="353.35001"
         width="39.59"
         height="26.34"
         id="rect841" />
    </clipPath>
    <clipPath
       id="clip-path-5"
       transform="translate(-28.33 0)">
      <rect
         class="cls-1"
         x="321.47"
         y="473.66"
         width="39.59"
         height="26.34"
         id="rect844" />
    </clipPath>
  </defs>
  <title
     id="title849">Cloud_database_SVG</title>
  <path
     class="cls-5"
     d="m 199.73301,51.578526 0.37501,-0.16811 c 0.49137,-0.2069 0.98275,-0.38793 1.47413,-0.55603 h 0.0905 c 0.45259,-0.15518 0.90517,-0.28449 1.29311,-0.4138 h 0.28448 a 19.396569,19.396569 0 0 1 2.14655,-0.46551 h 0.15518 c 0.55603,-0.0776 1.11207,-0.12931 1.65517,-0.15518 h 0.32328 c 0.51724,0 1.02155,0 1.52586,0 h 0.32328 c 0.38793,0 0.76293,0.0776 1.13793,0.12931 h 0.34914 c 0.43965,0.0776 0.87931,0.18104 1.2931,0.29742 h 0.15517 a 10.538802,10.538802 0 0 1 1.07328,0.375 l 0.34914,0.15517 0.94396,0.45259 0.32328,0.1681 v 0 l -44.96125,-26.13364 -0.14224,-0.0776 -0.18103,-0.0905 -0.94397,-0.45258 -0.18103,-0.0905 -0.15518,-0.0647 c -0.36207,-0.14224 -0.72413,-0.27155 -1.0862,-0.375 h -0.12931 v 0 c -0.42673,-0.11638 -0.85345,-0.21982 -1.29311,-0.29741 h -0.12931 -0.21983 c -0.375,0 -0.75,-0.10345 -1.13793,-0.12931 h -0.21983 -0.10345 c -0.50431,0 -1.00862,0 -1.52586,0 h -0.11638 -0.2069 c -0.5431,0 -1.0862,0.0776 -1.65517,0.15517 h -0.12931 a 19.396569,19.396569 0 0 0 -2.14655,0.46552 h -0.16811 -0.11638 c -0.45258,0.12931 -0.90517,0.25862 -1.2931,0.41379 h -0.0905 c -0.49138,0.16811 -0.98276,0.34914 -1.47414,0.55604 h -0.11638 l -0.25862,0.12931 c -0.45258,0.19396 -0.9181,0.40086 -1.38362,0.63362 -0.15517,0.0776 -0.32328,0.14224 -0.47845,0.23276 -0.5819,0.28448 -1.16379,0.59483 -1.74569,0.94396 l 45.03883,26.17244 c 0.5819,-0.34914 1.1638,-0.65948 1.74569,-0.94396 l 0.47845,-0.23276 c 0.46552,-0.23276 0.91811,-0.43966 1.38363,-0.63362"
     id="path1059"
     style="stroke-width:1.2931" />
  <path
     class="cls-5"
     d="m 173.41834,41.906096 c 0.75,-0.3362 1.5,-0.64655 2.22414,-0.90517 h 0.10344 a 18.439671,18.439671 0 0 1 1.9138,-0.53017 q 0.5431,-0.14224 1.08621,-0.23276 a 12.788804,12.788804 0 0 1 1.2931,-0.2069 l 0.77586,-0.0776 q 0.85345,0 1.66811,0 h 0.375 a 15.168117,15.168117 0 0 1 1.91379,0.24568 l 0.46552,0.10345 a 13.745702,13.745702 0 0 1 1.42242,0.4138 l 0.43965,0.15517 a 11.340527,11.340527 0 0 1 1.69397,0.81466 l -44.90952,-26.25003 -0.14224,-0.0776 a 13.47415,13.47415 0 0 0 -1.29311,-0.63363 l -0.25862,-0.0905 -0.43965,-0.15518 -0.67242,-0.24569 -0.76293,-0.15517 -0.46552,-0.10345 h -0.24569 a 15.20691,15.20691 0 0 0 -1.56466,-0.19396 h -0.47844 c -0.40087,0 -0.81466,0 -1.29311,0 -0.14224,0 -0.28448,0 -0.43965,0 l -0.77587,0.0905 c -0.23276,0 -0.46551,0 -0.7112,0 l -0.64656,0.14224 q -0.5431,0.0905 -1.0862,0.23276 l -0.56897,0.11638 c -0.43966,0.11637 -0.89224,0.25862 -1.2931,0.41379 h -0.10345 c -0.50431,0.18103 -1.02156,0.375 -1.55173,0.59483 l -0.67241,0.29741 -0.63362,0.29741 c -0.7888,0.36207 -1.57759,0.76294 -2.37932,1.22845 -0.80172,0.46552 -1.93965,1.18966 -2.89655,1.87501 l 45.03883,26.17243 c 0.94397,-0.67241 1.9138,-1.2931 2.89656,-1.875 0.98276,-0.5819 1.59051,-0.86638 2.37931,-1.22845 l 0.63362,-0.28448"
     id="path1061"
     style="stroke-width:1.2931" />
  <path
     class="cls-5"
     d="m 130.8752,29.143156 0.68534,-0.29741 c 0.67242,-0.27156 1.29311,-0.53018 2.00431,-0.76294 l 0.375,-0.14224 c 0.7888,-0.25862 1.56466,-0.50431 2.34052,-0.7112 0.43966,-0.12931 0.89224,-0.23276 1.29311,-0.33621 l 0.65948,-0.15517 c 0.63362,-0.14225 1.29311,-0.24569 1.875,-0.34914 h 0.10345 c 0.62069,-0.0905 1.29311,-0.15517 1.83621,-0.21983 h 0.5819 1.07327 a 23.379331,23.379331 0 0 1 2.52156,0 h 0.43965 c 0.65949,0 1.29311,0.12931 1.96552,0.23276 l 0.60776,0.10345 a 19.668121,19.668121 0 0 1 2.19828,0.50431 h 0.0905 a 19.875018,19.875018 0 0 1 1.97845,0.69827 l 0.55603,0.23276 c 0.55604,0.24569 1.08621,0.50431 1.61638,0.7888 l 0.51724,0.27155 v 0 l -45.0259,-26.2112301 -0.23276,-0.12931 -0.28448,-0.14224 c -0.51724,-0.28449 -1.06035,-0.54311 -1.61638,-0.7888 l -0.27155,-0.14224 -0.25862,-0.0905 a 19.875018,19.875018 0 0 0 -1.97845,-0.69826999 h -0.0647 v 0 a 19.668121,19.668121 0 0 0 -2.22414,-0.53017 h -0.25862 -0.34914 c -0.64655,-0.10345 -1.2931,-0.18104 -1.96552,-0.23276 a 2.7543128,2.7543128 0 0 1 -0.32327,0 h -0.10345 a 23.637952,23.637952 0 0 0 -2.58621,0 h -0.23276 -0.82759 -0.58189 c -0.59483,0.0647 -1.21552,0.12931 -1.82328,0.21982 v 0 h -0.0905 c -0.62069,0.0905 -1.2931,0.2069 -1.875,0.34914 -0.23276,0 -0.45259,0.10345 -0.67241,0.15517 -0.21983,0.0517 -0.80173,0.18104 -1.20259,0.29741999 h -0.12931 c -0.77587,0.2069 -1.55173,0.45259 -2.34052,0.71121 l -0.375,0.14224 c -0.65949,0.23276 -1.29311,0.49138 -2.00431,0.76293 l -0.21983,0.0776 c -0.15517,0 -0.31035,0.15517 -0.46552,0.20689 -0.71121,0.31035 -1.42241,0.63362 -2.14655,0.98276 l -0.7888,0.38793 c -0.93103,0.46552 -1.86207,0.9569 -2.7931,1.5 -1.29311,0.73707 -2.58621,1.5388 -3.80173,2.37932 -0.43966,0.29741 -0.85345,0.63362 -1.2931,0.94396 -0.80173,0.56897 -1.61639,1.1379401 -2.40518,1.7456901 -0.50431,0.38794 -0.98276,0.81466 -1.47414,1.29311 -0.71121,0.56897 -1.42241,1.13793 -2.12069,1.74569 -0.69828,0.60776 -1.00862,0.94397 -1.52586,1.42242 -0.51725,0.47844 -1.29311,1.18965 -1.95259,1.82327 l -0.0905,0.0776 q -1.44828,1.46121 -2.87069,2.98707 l -0.77586,0.86638 c -0.36207,0.40086 -0.72414,0.78879 -1.08621,1.20259 -0.36207,0.41379 -0.46552,0.56896 -0.69828,0.85345 -0.67241,0.80172 -1.2931,1.60345 -1.99138,2.4181 -0.25862,0.33621 -0.5431,0.64656 -0.81466,0.99569 -0.27155,0.34914 -0.375,0.51725 -0.56896,0.77587 -0.49138,0.63362 -0.9569,1.2931 -1.43535,1.93965 -0.29741,0.4138 -0.60776,0.80173 -0.89224,1.21552 -0.28448,0.4138 -0.38793,0.59483 -0.59483,0.87931 -0.38793,0.56897 -0.77586,1.15087 -1.15086,1.73276 -0.375,0.5819 -0.5431,0.77587 -0.80173,1.17673 l -0.69827,1.15086 -0.98276,1.60345 -0.56897,0.94397 c -0.32327,0.5431 -0.60776,1.0862 -0.9181,1.62931 l -0.81466,1.47414 -0.32327,0.5819 c -0.43966,0.82758 -0.85345,1.6681 -1.29311,2.50862 l -0.50431,1.00862 -0.10345,0.2069 c -0.60776,1.2931 -1.17672,2.50862 -1.73276,3.77586 l -0.0776,0.19397 c -0.15518,0.34914 -0.28449,0.71121 -0.43966,1.07327 -0.36207,0.87932 -0.73707,1.7457 -1.07328,2.58621 -0.0776,0.18104 -0.12931,0.34914 -0.19396,0.53018 -0.21983,0.56896 -0.4138,1.15086 -0.62069,1.71983 -0.2069,0.56896 -0.46552,1.2931 -0.67242,1.86207 -0.0776,0.25862 -0.14224,0.50431 -0.21982,0.75 -0.23276,0.7112 -0.43966,1.42241 -0.65949,2.13362 -0.21983,0.71121 -0.3362,1.04741 -0.47845,1.57759 -0.0647,0.24569 -0.11638,0.49138 -0.18103,0.75 -0.24569,0.93103 -0.46552,1.84914 -0.68535,2.78017 -0.11638,0.49138 -0.25862,0.98276 -0.36207,1.46121 -0.10344,0.47845 -0.1681,0.80173 -0.24569,1.20259 -0.45258,0.25862 -0.9181,0.49138 -1.38362,0.76293 -0.98276,0.56897 -1.93965,1.18966 
-2.89655,1.82328 l -0.49138,0.32327 c -0.91811,0.62069 -1.82328,1.29311 -2.71552,1.96552 l -0.51724,0.4138 c -0.90518,0.7112 -1.79742,1.43534 -2.67673,2.2112 l -0.23276,0.2069 c -0.85345,0.75 -1.68103,1.5388 -2.50862,2.34052 l -0.25862,0.23276 -0.15517,0.1681 c -0.63363,0.62069 -1.29311,1.29311 -1.87501,1.9138 l -0.0905,0.10345 c -0.5819,0.63362 -1.1638,1.2931 -1.73276,1.91379 l -0.15518,0.18104 -0.20689,0.24569 c -0.91811,1.07327 -1.82328,2.17241 -2.70259,3.29741 l -0.0776,0.0905 -0.19396,0.27155 c -0.69828,0.90517 -1.38363,1.83621 -2.04311,2.76725 l -0.2069,0.28448 a 0.53017288,0.53017288 0 0 0 -0.0905,0.14224 c -0.64656,0.90517 -1.29311,1.83621 -1.86207,2.76724 l -0.18104,0.27156 a 4.3577625,4.3577625 0 0 0 -0.24569,0.41379 q -0.73707,1.15086 -1.43534,2.32759 a 3.375003,3.375003 0 0 1 -0.21983,0.36207 l -0.0776,0.14224 c -0.53017,0.90517 -1.03448,1.82328 -1.52586,2.74138 l -0.11638,0.21983 -0.23276,0.45258 q -0.5819,1.099144 -1.125,2.211214 l -0.23276,0.46552 a 2.5862092,2.5862092 0 0 1 -0.12931,0.27155 c -0.43966,0.94397 -0.87931,1.875 -1.2931,2.8319 v 0.0776 l -0.11638,0.31034 c -0.375,0.85345 -0.72414,1.7069 -1.06035,2.58621 l -0.1681,0.42673 c 0,0.10345 -0.0776,0.23276 -0.12931,0.34914 -0.38794,1.00862 -0.75,2.03017 -1.09914,3.05172 v 0 l -0.0647,0.18104 c -0.34914,1.06034 -0.65948,2.12069 -0.96983,3.1681 a 3.6206929,3.6206929 0 0 1 -0.11638,0.40087 1.7586222,1.7586222 0 0 1 0,0.21982 c -0.3362,1.21552 -0.63362,2.44397 -0.90517,3.67242 l -0.0776,0.32328 c 0,0.1681 -0.0647,0.32327 -0.0905,0.47844 -0.10345,0.47845 -0.19397,0.96983 -0.27155,1.44828 -0.0776,0.47845 -0.12932,0.65949 -0.18104,0.99569 -0.0517,0.33621 -0.15517,0.98276 -0.23276,1.48707 0,0.23276 -0.0647,0.46552 -0.10345,0.69828 a 0.91810426,0.91810426 0 0 1 0,0.18103 c -0.12931,1.04742 -0.23276,2.10776 -0.31034,3.14225 v 0.67241 c 0,1.07328 -0.10345,2.14656 -0.10345,3.19397 0,7.97846 1.96552,14.3276 5.31466,18.76295 a 18.607775,18.607775 0 0 0 5.40518,4.83621 l 45.03883,26.15951 c -6.59483,-3.87932 -10.69398,-11.98708 -10.71984,-23.58623 0,-1.04742 0,-2.12069 0.10345,-3.19397 v -0.65948 c 0.0776,-1.04742 0.18104,-2.10776 0.31035,-3.15518 l 0.11638,-0.87931 c 0.0776,-0.50431 0.14224,-0.99569 0.23275,-1.48707 0.0905,-0.49138 0.11638,-0.65948 0.16811,-0.99569 0.0517,-0.33621 0.18103,-0.96983 0.28448,-1.44828 0,-0.27155 0.11638,-0.53017 0.1681,-0.80172 0.27156,-1.21552 0.56897,-2.44397 0.90518,-3.67242 0,-0.2069 0.11638,-0.41379 0.1681,-0.62069 0.31035,-1.06035 0.62069,-2.12069 0.96983,-3.16811 l 0.0776,-0.23276 c 0.3362,-1.03448 0.7112,-2.0431 1.09914,-3.06465 0.0905,-0.25862 0.19396,-0.51725 0.29741,-0.77587 0.33621,-0.85345 0.68534,-1.71983 1.04741,-2.58621 0.0647,-0.12931 0.11638,-0.25862 0.16811,-0.38793 0.41379,-0.95689 0.85345,-1.88793 1.2931,-2.8319 l 0.36207,-0.73707 q 0.54311,-1.09913 1.125,-2.2112 l 0.34914,-0.67242 c 0.49138,-0.9181 0.99569,-1.83621 1.52586,-2.72845 l 0.31035,-0.51724 c 0.45259,-0.78879 0.93103,-1.56466 1.42241,-2.34052 l 0.42673,-0.67241 c 0.59483,-0.93104 1.21552,-1.86208 1.86207,-2.78018 l 0.28448,-0.40086 c 0.67242,-0.94397 1.35776,-1.875 2.05604,-2.78018 l 0.27155,-0.36207 c 0.87931,-1.125 1.78448,-2.22414 2.70259,-3.29741 l 0.36207,-0.42673 c 0.56896,-0.64655 1.15086,-1.2931 1.73276,-1.90086 l 0.0905,-0.11638 c 0.62069,-0.64655 1.2931,-1.29311 1.875,-1.9138 l 0.41379,-0.40086 c 0.82759,-0.80172 1.66811,-1.59052 2.50862,-2.34052 l 0.23276,-0.20689 q 1.29311,-1.15087 2.67673,-2.211214 l 0.51724,-0.4138 c 0.89224,-0.68534 1.79742,-1.2931 2.71552,-1.96552 l 0.47845,-0.32327 c 0.9569,-0.63362 
1.92673,-1.29311 2.89655,-1.81035 l 1.39656,-0.77586 c 0.18103,-0.87931 0.40086,-1.77155 0.60776,-2.6638 0.20689,-0.89224 0.43965,-1.84914 0.68534,-2.78017 0.24569,-0.93104 0.42673,-1.55173 0.65948,-2.32759 0.23276,-0.77586 0.42673,-1.42241 0.65949,-2.13362 0.23276,-0.71121 0.5819,-1.73276 0.89224,-2.58621 0.2069,-0.5819 0.40086,-1.16379 0.62069,-1.73276 0.40086,-1.06035 0.82759,-2.10776 1.2931,-3.15518 0.15518,-0.36207 0.28449,-0.73707 0.43966,-1.0862 0.60776,-1.40949 1.29311,-2.79311 1.9138,-4.17673 l 0.51724,-1.00862 c 0.50431,-1.03449 1.03448,-2.06897 1.59052,-3.09052 0.27155,-0.49138 0.5431,-0.98276 0.82758,-1.47414 q 0.72414,-1.29311 1.47414,-2.58621 c 0.32328,-0.54311 0.64655,-1.08621 0.98276,-1.61638 0.49138,-0.7888 0.98276,-1.56466 1.5,-2.32759 0.51724,-0.76293 0.76293,-1.16379 1.15087,-1.73276 0.38793,-0.56897 0.85344,-1.3319 1.38362,-1.97845 0.53017,-0.64655 0.94396,-1.29311 1.43534,-1.95259 0.49138,-0.65948 0.91811,-1.17672 1.38363,-1.75862 0.65948,-0.81466 1.2931,-1.61638 1.99138,-2.41811 0.69827,-0.80172 1.17672,-1.38362 1.78448,-2.05603 0.25862,-0.29742 0.51724,-0.5819 0.78879,-0.86638 q 1.43535,-1.57759 2.94828,-3.06466 c 0.64656,-0.63362 1.29311,-1.29311 1.97845,-1.84914 0.68535,-0.55604 0.98276,-0.94397 1.5,-1.38362 0.51725,-0.43966 1.42242,-1.18966 2.13363,-1.78449 0.49138,-0.40086 0.96983,-0.81465 1.46121,-1.20258 0.78879,-0.60776 1.60345,-1.17673 2.4181,-1.75863 l 1.29311,-0.93103 c 1.2931,-0.84052 2.5862,-1.64224 3.80172,-2.37931 0.93104,-0.54311 1.86207,-1.03449 2.79311,-1.50001 l 0.78879,-0.38793 c 0.72414,-0.34913 1.43535,-0.67241 2.14656,-0.96983"
     id="path1063"
     style="stroke-width:1.2931" />
  <path
     class="cls-3"
     d="m 125.14674,32.000916 c 20.3664,-11.75432 37.50004,-5.59914 42.41383,13.29312 a 33.426754,33.426754 0 0 1 2.89656,-1.875 c 12.84053,-7.40949 23.50864,-2.84483 25.66812,10.00862 14.79312,-8.56035 26.43106,-1.84913 26.43106,14.61209 0,15.28449 -9.91811,33.620724 -22.81036,42.866414 -1.06035,0.75 -2.06897,1.42242 -3.10346,2.01725 a 30.750027,30.750027 0 0 1 -2.97414,1.51293 l -111.90527,64.8104 c -20.31467,11.72846 -36.82762,2.32759 -36.89227,-21.02588 -0.0647,-23.35347 16.33191,-51.72418 36.64658,-63.478504 0.47845,-0.28449 0.93104,-0.51725 1.39656,-0.77587 4.81035,-24.40088 21.85346,-50.22418 42.18107,-61.96557"
     id="path1065"
     style="stroke-width:1.2931" />
</svg>

```

--------------------------------------------------------------------------------
/jupyter_mcp_server/jupyter_extension/handlers.py:
--------------------------------------------------------------------------------

```python
# Copyright (c) 2023-2024 Datalayer, Inc.
#
# BSD 3-Clause License

"""
Tornado request handlers for the Jupyter MCP Server extension.

This module provides handlers that bridge between Tornado (Jupyter Server) and
FastMCP, managing the MCP protocol lifecycle and request proxying.
"""

import json
import logging
import tornado.web
from typing import Any
from tornado.web import RequestHandler
from jupyter_server.base.handlers import JupyterHandler
from jupyter_server.extension.handler import ExtensionHandlerMixin

from jupyter_mcp_server.jupyter_extension.context import get_server_context
from jupyter_mcp_server.jupyter_extension.backends.local_backend import LocalBackend
from jupyter_mcp_server.jupyter_extension.backends.remote_backend import RemoteBackend

logger = logging.getLogger(__name__)


class MCPSSEHandler(RequestHandler):
    """
    Server-Sent Events (SSE) handler for MCP protocol.
    
    This handler implements the MCP SSE transport by directly calling
    the registered MCP tools instead of trying to wrap the Starlette app.
    
    The MCP protocol uses SSE for streaming responses from the server to the client.
    """
    
    def check_xsrf_cookie(self):
        """Disable XSRF checking for MCP protocol requests."""
        pass
    
    def set_default_headers(self):
        """Set headers for SSE and CORS."""
        self.set_header("Content-Type", "text/event-stream")
        self.set_header("Cache-Control", "no-cache")
        self.set_header("Connection", "keep-alive")
        self.set_header("Access-Control-Allow-Origin", "*")
        self.set_header("Access-Control-Allow-Methods", "GET, POST, OPTIONS")
        self.set_header("Access-Control-Allow-Headers", "Content-Type, Authorization")
    
    async def options(self, *args, **kwargs):
        """Handle CORS preflight requests."""
        self.set_status(204)
        self.finish()
    
    async def get(self):
        """Handle SSE connection establishment."""
        # For now, just acknowledge the connection.
        # The actual MCP protocol is handled via POST.
        self.write("event: connected\ndata: {}\n\n")
        await self.flush()
    
    async def post(self):
        """Handle MCP protocol messages."""
        # Import here to avoid circular dependency
        from jupyter_mcp_server.server import mcp
        
        try:
            # Parse the JSON-RPC request
            body = json.loads(self.request.body.decode('utf-8'))
            method = body.get("method")
            params = body.get("params", {})
            request_id = body.get("id")
            
            logger.info(f"MCP request: method={method}, id={request_id}")
            
            # Handle notifications (id is None) - these don't require a response per JSON-RPC 2.0
            # But in HTTP transport, we need to acknowledge the request
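            # Example notification body (note the missing "id" field), e.g. the
            # standard MCP client notification:
            #   {"jsonrpc": "2.0", "method": "notifications/initialized"}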
            if request_id is None:
                logger.info(f"Received notification: {method} - acknowledging without result")
                # Return empty response - the client should handle notifications without expecting a result
                # Some clients may send this as POST and expect HTTP 200 with no JSON-RPC response
                self.set_status(200)
                self.finish()
                return
            
            # Handle different MCP methods
            if method == "initialize":
                # Return server capabilities
                response = {
                    "jsonrpc": "2.0",
                    "id": request_id,
                    "result": {
                        "protocolVersion": "2024-11-05",
                        "capabilities": {
                            "tools": {}
                        },
                        "serverInfo": {
                            "name": "Jupyter MCP Server",
                            "version": "0.14.0"
                        }
                    }
                }
                logger.info(f"Sending initialize response: {response}")
            elif method == "tools/list":
                # List available tools from FastMCP (mcp is imported at the top of post())
                logger.info("Calling mcp.list_tools()...")
                
                try:
                    # Use FastMCP's list_tools method - returns list of Tool objects
                    tools_list = await mcp.list_tools()
                    logger.info(f"Got {len(tools_list)} tools from FastMCP")
                    
                    # Convert to MCP protocol format
                    tools = []
                    for tool in tools_list:
                        tools.append({
                            "name": tool.name,
                            "description": tool.description,
                            "inputSchema": tool.inputSchema
                        })
                    
                    logger.info(f"Converted {len(tools)} tools to MCP format")
                    
                    response = {
                        "jsonrpc": "2.0",
                        "id": request_id,
                        "result": {
                            "tools": tools
                        }
                    }
                except Exception as e:
                    logger.error(f"Error listing tools: {e}", exc_info=True)
                    response = {
                        "jsonrpc": "2.0",
                        "id": request_id,
                        "error": {
                            "code": -32603,
                            "message": f"Internal error listing tools: {str(e)}"
                        }
                    }
            elif method == "tools/call":
                # Execute a tool (mcp is imported at the top of post())
                tool_name = params.get("name")
                tool_arguments = params.get("arguments", {})
                
                logger.info(f"Calling tool: {tool_name} with args: {tool_arguments}")
                
                try:
                    # Use FastMCP's call_tool method
                    result = await mcp.call_tool(tool_name, tool_arguments)
                    
                    # Handle tuple results from FastMCP
                    if isinstance(result, tuple) and len(result) >= 1:
                        # FastMCP returns (content_list, metadata_dict)
                        content_list = result[0]
                        if isinstance(content_list, list):
                            # Serialize TextContent objects to dicts
                            serialized_content = []
                            for item in content_list:
                                if hasattr(item, 'model_dump'):
                                    serialized_content.append(item.model_dump())
                                elif hasattr(item, 'dict'):
                                    serialized_content.append(item.dict())
                                elif isinstance(item, dict):
                                    serialized_content.append(item)
                                else:
                                    serialized_content.append({"type": "text", "text": str(item)})
                            result_dict = {"content": serialized_content}
                        else:
                            result_dict = {"content": [{"type": "text", "text": str(result)}]}
                    # Convert result to dict - it's a CallToolResult with content list
                    elif hasattr(result, 'model_dump'):
                        result_dict = result.model_dump()
                    elif hasattr(result, 'dict'):
                        result_dict = result.dict()
                    elif hasattr(result, 'content'):
                        # Extract content directly if it has a content attribute
                        result_dict = {"content": result.content}
                    else:
                        # Last resort: check if it's already a string
                        if isinstance(result, str):
                            result_dict = {"content": [{"type": "text", "text": result}]}
                        else:
                            # If it's some other type, try to serialize it
                            result_dict = {"content": [{"type": "text", "text": str(result)}]}
                            logger.warning(f"Used fallback str() conversion for type {type(result)}")
                    
                    logger.info(f"Converted result to dict")

                    response = {
                        "jsonrpc": "2.0",
                        "id": request_id,
                        "result": result_dict
                    }
                except Exception as e:
                    logger.error(f"Error calling tool: {e}", exc_info=True)
                    response = {
                        "jsonrpc": "2.0",
                        "id": request_id,
                        "error": {
                            "code": -32603,
                            "message": f"Internal error calling tool: {str(e)}"
                        }
                    }
            else:
                # Method not supported
                response = {
                    "jsonrpc": "2.0",
                    "id": request_id,
                    "error": {
                        "code": -32601,
                        "message": f"Method not found: {method}"
                    }
                }
            
            # Send response
            self.set_header("Content-Type", "application/json")
            logger.info(f"Sending response: {json.dumps(response)[:200]}...")
            self.write(json.dumps(response))
            self.finish()
            
        except Exception as e:
            logger.error(f"Error handling MCP request: {e}", exc_info=True)
            self.set_status(500)
            self.write(json.dumps({
                "jsonrpc": "2.0",
                "id": body.get("id") if 'body' in locals() else None,
                "error": {
                    "code": -32603,
                    "message": str(e)
                }
            }))
            self.finish()


class MCPHandler(ExtensionHandlerMixin, JupyterHandler):
    """Base handler for MCP endpoints with common functionality."""
    
    def get_backend(self):
        """
        Get the appropriate backend based on configuration.
        
        Returns:
            Backend instance (LocalBackend or RemoteBackend)
        """
        context = get_server_context()
        
        # Check if we should use local backend
        if context.is_local_document() or context.is_local_runtime():
            return LocalBackend(context.serverapp)
        else:
            # Use remote backend
            document_url = self.settings.get("mcp_document_url")
            document_token = self.settings.get("mcp_document_token", "")
            runtime_url = self.settings.get("mcp_runtime_url")
            runtime_token = self.settings.get("mcp_runtime_token", "")
            
            return RemoteBackend(
                document_url=document_url,
                document_token=document_token,
                runtime_url=runtime_url,
                runtime_token=runtime_token
            )
    
    def set_default_headers(self):
        """Set CORS headers for MCP clients."""
        self.set_header("Access-Control-Allow-Origin", "*")
        self.set_header("Access-Control-Allow-Methods", "GET, POST, OPTIONS")
        self.set_header("Access-Control-Allow-Headers", "Content-Type, Authorization")
    
    def options(self, *args, **kwargs):
        """Handle OPTIONS requests for CORS preflight."""
        self.set_status(204)
        self.finish()


class MCPHealthHandler(MCPHandler):
    """
    Health check endpoint.
    
    GET /mcp/healthz
    """
    
    @tornado.web.authenticated
    def get(self):
        """Handle health check request."""
        context = get_server_context()
        
        health_info = {
            "status": "healthy",
            "context_type": context.context_type,
            "document_url": context.document_url or self.settings.get("mcp_document_url"),
            "runtime_url": context.runtime_url or self.settings.get("mcp_runtime_url"),
            "extension": "jupyter_mcp_server",
            "version": "0.14.0"
        }
        
        self.set_header("Content-Type", "application/json")
        self.write(json.dumps(health_info))
        self.finish()


class MCPToolsListHandler(MCPHandler):
    """
    List available MCP tools.
    
    GET /mcp/tools/list
    """
    
    @tornado.web.authenticated
    async def get(self):
        """Return list of available tools dynamically from the tool registry."""
        # Import here to avoid circular dependency
        from jupyter_mcp_server.server import get_registered_tools
        
        # Get tools dynamically from the MCP server registry
        tools = await get_registered_tools()
        
        response = {
            "tools": tools,
            "count": len(tools)
        }
        
        self.set_header("Content-Type", "application/json")
        self.write(json.dumps(response))
        self.finish()


class MCPToolsCallHandler(MCPHandler):
    """
    Execute an MCP tool.
    
    POST /mcp/tools/call
    Body: {"tool_name": "...", "arguments": {...}}
    """
    
    @tornado.web.authenticated
    async def post(self):
        """Handle tool execution request."""
        try:
            # Parse request body
            body = json.loads(self.request.body.decode('utf-8'))
            tool_name = body.get("tool_name")
            arguments = body.get("arguments", {})
            
            if not tool_name:
                self.set_status(400)
                self.write(json.dumps({"error": "tool_name is required"}))
                self.finish()
                return
            
            logger.info(f"Executing tool: {tool_name} with args: {arguments}")
            
            # Get backend
            backend = self.get_backend()
            
            # Execute tool based on name
            # For now, return a placeholder response
            # TODO: Implement actual tool routing
            result = await self._execute_tool(tool_name, arguments, backend)
            
            response = {
                "success": True,
                "result": result
            }
            
            self.set_header("Content-Type", "application/json")
            self.write(json.dumps(response))
            self.finish()
            
        except Exception as e:
            logger.error(f"Error executing tool: {e}", exc_info=True)
            self.set_status(500)
            self.write(json.dumps({
                "success": False,
                "error": str(e)
            }))
            self.finish()
    
    async def _execute_tool(self, tool_name: str, arguments: dict[str, Any], backend):
        """
        Route tool execution to appropriate implementation.
        
        Args:
            tool_name: Name of tool to execute
            arguments: Tool arguments
            backend: Backend instance
            
        Returns:
            Tool execution result
        """
        # TODO: Implement actual tool routing
        # For now, return a simple response
        
        if tool_name == "list_notebooks":
            notebooks = await backend.list_notebooks()
            return {"notebooks": notebooks}
        
        # Placeholder for other tools
        return f"Tool {tool_name} executed with backend {type(backend).__name__}"

```
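
A quick way to sanity-check these handlers is to hit the documented routes directly over HTTP. Below is a minimal sketch, assuming a local Jupyter server on port 8888 with the routes mounted under `/mcp` (as the handler docstrings state), `requests` installed, and `YOUR_TOKEN` as a placeholder Jupyter auth token:

```python
# Minimal sketch: exercising the extension's HTTP endpoints with requests.
# Assumptions (not shown in this file): server on localhost:8888, routes
# mounted under /mcp per the handler docstrings, YOUR_TOKEN is a placeholder.
import requests

BASE = "http://localhost:8888/mcp"
HEADERS = {"Authorization": "token YOUR_TOKEN"}

# MCPHealthHandler: GET /mcp/healthz
print(requests.get(f"{BASE}/healthz", headers=HEADERS).json())

# MCPToolsListHandler: GET /mcp/tools/list
tools = requests.get(f"{BASE}/tools/list", headers=HEADERS).json()
print(tools["count"], "tools available")

# MCPToolsCallHandler: POST /mcp/tools/call
# The body format comes straight from the handler docstring.
result = requests.post(
    f"{BASE}/tools/call",
    headers=HEADERS,
    json={"tool_name": "list_notebooks", "arguments": {}},
).json()
print(result)  # {"success": True, "result": {"notebooks": [...]}}
```

The JSON-RPC handler (`MCPSSEHandler`) instead expects bodies like `{"jsonrpc": "2.0", "id": 1, "method": "tools/list"}`; its route is registered elsewhere in the extension and is not shown on this page.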

--------------------------------------------------------------------------------
/docs/static/img/feature_3.svg:
--------------------------------------------------------------------------------

```
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<!--
  ~ Copyright (c) 2023-2024 Datalayer, Inc.
  ~
  ~ BSD 3-Clause License
-->

<svg
   xmlns:dc="http://purl.org/dc/elements/1.1/"
   xmlns:cc="http://creativecommons.org/ns#"
   xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#"
   xmlns:svg="http://www.w3.org/2000/svg"
   xmlns="http://www.w3.org/2000/svg"
   xmlns:sodipodi="http://sodipodi.sourceforge.net/DTD/sodipodi-0.dtd"
   xmlns:inkscape="http://www.inkscape.org/namespaces/inkscape"
   viewBox="0 0 302.65826 398.12268"
   version="1.1"
   id="svg1054"
   sodipodi:docname="6.svg"
   inkscape:version="1.0.1 (c497b03c, 2020-09-10)"
   width="302.65826"
   height="398.12268">
  <metadata
     id="metadata1058">
    <rdf:RDF>
      <cc:Work
         rdf:about="">
        <dc:format>image/svg+xml</dc:format>
        <dc:type
           rdf:resource="http://purl.org/dc/dcmitype/StillImage" />
        <dc:title>Web_SVG</dc:title>
      </cc:Work>
    </rdf:RDF>
  </metadata>
  <sodipodi:namedview
     pagecolor="#ffffff"
     bordercolor="#666666"
     borderopacity="1"
     objecttolerance="10"
     gridtolerance="10"
     guidetolerance="10"
     inkscape:pageopacity="0"
     inkscape:pageshadow="2"
     inkscape:window-width="1256"
     inkscape:window-height="607"
     id="namedview1056"
     showgrid="false"
     inkscape:zoom="0.83484846"
     inkscape:cx="105.00162"
     inkscape:cy="149.95736"
     inkscape:window-x="0"
     inkscape:window-y="25"
     inkscape:window-maximized="0"
     inkscape:current-layer="svg1054"
     inkscape:document-rotation="0"
     fit-margin-top="0"
     fit-margin-left="0"
     fit-margin-right="0"
     fit-margin-bottom="0" />
  <defs
     id="defs843">
    <style
       id="style833">.cls-1{fill:none;}.cls-2,.cls-9{fill:#d6d8e5;}.cls-2{opacity:0.4;isolation:isolate;}.cls-3{fill:#d5d6e0;}.cls-4{fill:#e9eaf4;}.cls-5{fill:url(#Безымянный_градиент_15);}.cls-6{clip-path:url(#clip-path);}.cls-7{fill:#8c50ff;}.cls-8{opacity:0.05;}.cls-9{opacity:0.4;}.cls-10{fill:#c5c7d3;}.cls-11{fill:#38226d;}.cls-12{fill:#9c73ff;}.cls-13{fill:#ffcea9;}.cls-14{fill:#ededed;}.cls-15{fill:#e5e5e5;}.cls-16{fill:#f4f4f4;}.cls-17{fill:#bfbfbf;}.cls-18{fill:#3a2c6d;}.cls-19{fill:#dceeff;}.cls-20{fill:#dbdbdb;}.cls-21{fill:#1e3779;}.cls-22{fill:#031f60;}</style>
    <linearGradient
       id="Безымянный_градиент_15"
       x1="235.62"
       y1="356.16"
       x2="235.62"
       y2="256.92999"
       gradientUnits="userSpaceOnUse">
      <stop
         offset="0"
         stop-color="#e9eaf4"
         id="stop835" />
      <stop
         offset="0.63"
         stop-color="#e9eaf4"
         stop-opacity="0"
         id="stop837" />
    </linearGradient>
    <clipPath
       id="clip-path">
      <circle
         class="cls-1"
         cx="233.41"
         cy="151.32001"
         r="151.32001"
         id="circle840" />
    </clipPath>
  </defs>
  <title
     id="title845">Web_SVG</title>
  <path
     class="cls-2"
     d="m 146.09827,397.27001 -47.08,-27.18 c -2.19,-1.27 -1.92,-3.48 0.62,-4.94 l 45.84,-26.47 c 2.54,-1.46 6.37,-1.62 8.56,-0.36 l 47.07,27.18 c 2.2,1.27 1.92,3.48 -0.61,4.94 l -45.85,26.47 c -2.53,1.47 -6.36,1.62 -8.55,0.36 z"
     id="path847" />
  <path
     class="cls-3"
     d="m 205.92827,362.37001 -2.32,0.71 -46.23,-26.7 c -2.19,-1.26 -6,-1.1 -8.55,0.36 l -45.85,26.47 -0.11,0.07 -2,-0.61 v 3 c 0,0.11 0,0.22 0,0.34 v 0.24 0 a 2.68,2.68 0 0 0 1.44,1.86 l 47.08,27.18 c 2.19,1.26 6,1.11 8.56,-0.36 l 45.98,-26.43 a 3.62,3.62 0 0 0 2.08,-2.64 v 0 z"
     id="path851" />
  <path
     class="cls-4"
     d="m 149.47827,392.14001 -47.08,-27.14 c -2.19,-1.26 -1.91,-3.47 0.62,-4.93 l 45.85,-26.47 c 2.53,-1.47 6.36,-1.63 8.55,-0.36 l 47.08,27.18 c 2.19,1.27 1.92,3.48 -0.62,4.94 l -45.84,26.47 c -2.54,1.42 -6.37,1.58 -8.56,0.31 z"
     id="path853" />
  <path
     class="cls-3"
     d="m 173.30827,360.25001 v -4 l -5.7,1.5 -12.68,-7.35 a 3.58,3.58 0 0 0 -3.24,0.14 l -12.33,7.12 -5.83,-1.54 v 4.31 0 1 a 0.48,0.48 0 0 0 0,0.12 v 0.1 0 a 1,1 0 0 0 0.55,0.7 l 17.85,10.39 a 3.61,3.61 0 0 0 3.24,-0.13 l 17.38,-10 a 1.36,1.36 0 0 0 0.78,-1 v 0 -1.32 z"
     id="path855" />
  <path
     class="cls-4"
     d="m 151.92827,367.33001 -17.87,-10.33 c -0.83,-0.48 -0.73,-1.32 0.23,-1.87 l 17.38,-10 a 3.58,3.58 0 0 1 3.26,-0.13 l 17.84,10.3 c 0.83,0.48 0.73,1.32 -0.23,1.88 l -17.38,10 a 3.61,3.61 0 0 1 -3.23,0.15 z"
     id="path857" />
  <polygon
     class="cls-5"
     points="129.08,260.92 342.17,256.93 235.48,356.16 "
     id="polygon859"
     style="fill:url(#%D0%91%D0%B5%D0%B7%D1%8B%D0%BC%D1%8F%D0%BD%D0%BD%D1%8B%D0%B9_%D0%B3%D1%80%D0%B0%D0%B4%D0%B8%D0%B5%D0%BD%D1%82_15)"
     transform="translate(-82.071732)" />
  <circle
     class="cls-4"
     cx="151.33827"
     cy="151.32001"
     r="151.32001"
     id="circle861" />
  <g
     class="cls-6"
     clip-path="url(#clip-path)"
     id="g865"
     transform="translate(-82.071732)">
    <path
       class="cls-7"
       d="m 193.61,61.31 0.51,-1.79 v -3.88 a 9.1,9.1 0 0 1 -2.17,0 c -0.38,-0.17 -0.12,-1.23 -0.12,-1.23 l 1.78,-1.53 -3.57,0.76 -1.28,2 -4,2.47 1.79,1 1.15,0.76 1.72,0.26 V 59 c 0,0 2,-0.51 2.49,0.25 l -0.25,1.41 z m -44.22,55.25 a 4,4 0 0 0 -3.28,0 2.7,2.7 0 0 1 -2.56,2.73 l 2.92,1.63 2.92,-1.09 3.82,-1.27 3.27,2.36 c 2.46,-3 0,-4.73 0,-4.73 z m 62.1,55.63 -2.83,-0.6 -4.92,-3.13 h -4.32 l -3.73,-1.93 h -1.49 c 0,0 0.3,-2.09 -0.15,-2.24 -0.45,-0.15 -3.72,-2.09 -3.72,-2.09 l -2.54,0.9 v -1.79 L 186,160.56 h -1.49 l 0.15,-1.49 c 0,0 1.34,-2.65 0.6,-3.11 -0.74,-0.46 -3.43,-6.43 -3.43,-6.43 l -4,-2.24 h -4.32 l -3.73,-3.57 -3.72,-3 -2.09,-4.17 -2.24,-1.49 c 0,0 -2.53,1.64 -3.13,1.49 -0.6,-0.15 -4.32,-1.49 -4.32,-1.49 l -3.13,-0.89 -3.91,1.83 v 4 l -1.49,-1 v -1.5 l 1.34,-2.53 0.6,-1.79 -1.34,-0.59 -1.79,2.08 -4.47,1.33 -0.9,1.94 -2.09,1.94 -0.59,1.64 -3.28,-2.68 a 25.86,25.86 0 0 1 -3.43,1.49 22.67,22.67 0 0 1 -3.43,-2.24 l -1.64,-3.13 0.9,-4.17 0.74,-4.33 -2.83,-1.79 h -3.88 l -3,-0.89 1.4,-2.24 1.64,-3.28 1.49,-2.38 c 0,0 0.42,-1.09 0.9,-1.34 1.9,-1 0,-1.94 0,-1.94 0,0 -4,-0.3 -4.62,0 -0.62,0.3 -2.24,1.19 -2.24,1.19 v 3.43 l -3,1.34 -4.62,1 -3.13,-3.14 c -0.45,-0.44 -1.49,-5.36 -1.49,-5.36 l 1.93,-1.79 0.9,-1.49 0.15,-3.58 1.93,-1.64 0.75,-1.94 2.22,-1.62 2.13,-1.95 1.94,-0.59 2.68,0.59 c 0.15,-1 5.07,-0.59 5.07,-0.59 l 1.94,-0.1 -0.15,-1.09 c 0.9,-0.75 3.88,0 3.88,0 l 4,-0.34 c 0.88,0.19 1.62,2 1.62,2 l -1.79,1.64 1.2,3.57 1.19,1.64 -0.45,1.64 2.87,-0.42 0.14,-2.84 0.15,-3.43 -0.44,-3.76 2.68,-5.78 10.88,-7 -0.15,-2.54 0.74,-3.65 1.94,0.67 3.43,-4.07 4.47,-1.48 2.68,-1.82 1.34,-2.69 4.92,-1.49 4.92,-2.53 -2.26,2.41 -0.3,2.09 2.68,-0.6 2.39,-1.34 2.08,-0.59 1.94,-0.6 0.6,-1.34 h -1.34 l -2.09,-0.75 c -1.34,-1 -3,-2.23 -3,-2.23 a 1.66,1.66 0 0 1 0.44,-1.35 l 1.35,-1.21 c 0,0 0.59,-0.72 0.15,-0.72 -0.44,0 -3.73,-0.72 -3.73,-0.72 l -3.58,1.44 H 168 l 2.54,-2.21 h 2.83 l 3.13,-0.75 h 6.5 l 3.58,-0.75 2.55,-1.37 3.13,-0.32 0.9,-3.35 -2.09,-1.79 -3,-1.55 -0.6,-3.52 -1.64,-3.58 h -1.49 l -0.15,1.94 h -2.38 l -0.6,1.19 -0.89,0.6 -1.49,0.3 -1.35,-1.35 1.79,-1.93 -2.08,-1.94 -2.54,-1.34 H 172 l -1.94,0.59 -2.68,3.13 -2.09,1.79 0.6,1.64 -0.76,2.49 0.45,1 -2.54,0.9 -2.83,0.59 -1.19,-0.3 -0.9,1.2 1.64,3.35 -1.64,1.12 h -2.23 l -1.89,-2.43 -0.45,-1.38 1.34,-0.74 -1,-1.79 -3.13,-0.15 -4.17,-2.59 h -1.94 l -0.45,-1.59 -1.34,-0.59 1.94,-2.24 2.83,-1.64 4.92,-1.79 1.79,-0.59 3.87,-0.9 2.6,-1.44 2.38,-2.09 h 3.13 l 3.73,-0.94 h 2.39 l 1,-2.78 h -4.77 l -1.79,1.37 h -3.13 a 5.39,5.39 0 0 0 -2.39,-1.37 v -1.1 L 163,24.6 165.13,23 a 12.52,12.52 0 0 0 1.79,-3.13 l 4.32,1.49 3.27,-0.17 2.25,-0.13 2.83,0.15 1.64,0.6 C 180.78,23 179,22.11 179,22.11 l -5.37,0.59 -2.39,1.3 0.75,-1.64 -2.54,0.15 -3,1.93 2.09,1.05 h 4.92 l 2.68,1 2.24,-0.89 2.38,0.89 2.84,0.9 -3.43,1.07 -3,1.41 -3,0.65 -2.38,0.89 1,2.09 6.26,-0.15 3.95,2.43 h 3.28 l 0.15,-1.34 0.74,-1.34 1.5,-0.3 -1.2,-1.64 -0.44,-1.24 h 1.78 l 1.5,1.09 c 1.19,0.9 3.72,-1.09 3.72,-1.09 l 3.73,-1.41 -4.62,-1.37 -1.49,-1.49 c 0.44,-0.9 -4.92,-2.24 -4.92,-2.24 l -4.18,-2.53 -1,-1.05 H 178 l 2.83,-1.19 7,0.74 3.58,-0.74 8.2,-2.09 5.51,-1.34 4,-1.49 -4.63,-0.72 c 0,0 -3.28,0.9 -3.73,0.9 -0.45,0 -4.32,0.3 -4.77,0.3 h -10.44 l -3.27,0.74 3,0.75 -1.79,1 h -3.13 l -1.79,0.6 2.38,-1.79 -2.53,-0.6 -3.13,0.75 -3.88,2.23 1.79,0.6 2.54,-1 2.08,0.44 -2.23,0.6 -0.9,1.19 1.49,0.45 -4,0.3 -7,-1.19 -2.83,3.42 -5.67,4 1.49,1.19 -1,0.63 -1.34,0.71 -3.88,-0.71 -5.36,1.41 -9.84,2.43 -32.89,31.65 -13.12,17.1 -7.6,45.62 13.41,-4.47 2.09,2.23 3.58,0.6 2.39,1.49 2.53,-1 2.09,2.09 4.62,2.53 
4.47,0.6 1.34,4.47 c 0,0 3.13,2.83 3,3.43 -0.13,0.6 1,3.28 1.64,3.28 a 30.41,30.41 0 0 1 4.33,1.94 l 2.53,0.29 1,-2.23 h 2.24 v 2.23 l 0.9,1.2 0.29,3.42 0.9,2.54 -1.49,1.64 -2.39,3 -3,4.15 0.3,4 -0.75,2.68 2.24,0.75 -1.94,2.68 a 32.55,32.55 0 0 1 1,3.43 30.41,30.41 0 0 0 1.64,3.73 l 2.09,1.79 0.9,3 5.07,10.43 3.57,3 5.22,2.68 c 0,0 3.28,3.58 3.43,4 0.15,0.42 1.79,7 1.79,7 l -0.07,3.68 1,3.87 0.3,4.33 0.45,6 1.49,4.77 1.34,6.26 -1.64,2.09 1,3.09 1.2,4.51 2.08,2.24 -0.14,3.58 0.44,3.57 9.25,9.84 1.78,1.94 v -4.62 l 3,-1.73 V 260 l -3,-1.34 v -2.09 l 1.47,-1.34 1.48,-2.39 v -1.94 c 1.23,-0.44 0,-1.79 0,-1.79 h -2.05 l -0.15,-1.93 h 1.94 l 0.8,1.19 1.58,-1.19 -1.19,-2.39 c 0.3,-0.74 2.39,0 2.39,0 l 3.57,-2.09 1.49,-1.78 v -2.39 l -1.7,-1.53 -1.5,-2.39 h 1.79 c 0,0 2.69,1.35 3.58,0 0.89,-1.35 2.24,-3.13 2.24,-3.13 l 1.64,-3.87 c 0,0 3.13,-5.67 3.43,-6.12 0.3,-0.45 0,-6 0,-6 l 4.92,-4 h 3.28 l 2.68,-4.48 2.09,-2.94 v -5 l 0.74,-4.77 -1.17,-4.66 4.17,-5.52 3.13,-3 -1.19,-5.51 z m 33.22,-117.14 1,0.59 1.36,-1.18 H 249 l 1.78,-0.6 0.8,-1.1 -0.38,-1.52 0.76,-0.81 c 0.09,-1.23 -1.86,-2.12 -1.86,-2.12 H 249 l -1.36,1.57 h -1.44 l -0.51,1.78 0.21,1.34 -1.19,0.74 z m 9,-11.27 -1.52,0.68 v 2.29 l 0.17,0.59 a 6.36,6.36 0 0 1 0.17,1.44 l 1.1,0.76 1.1,0.68 0.51,1 c 0,0 -0.17,1 -0.76,1 l -1.36,0.43 0.51,0.93 c 0,0 -1,0.51 -1.1,0.93 l 0.93,0.68 -0.85,0.85 h -1.86 v 1 h 3.9 l 1.61,-1 2.37,0.42 c 0,0 1.95,-0.42 2.37,-0.42 l -0.34,-0.43 0.6,-0.76 0.93,-1.1 -1.19,-0.48 -1.35,-0.42 0.17,-1.44 -1.28,-1 -1.94,-1.06 -0.17,-1.95 0.17,-1.69 h -2 l -0.42,-0.85 1.18,-1.1 z m 11.1,7.29 v -0.64 h 1.36 l 0.76,-0.55 v -2.12 l 1,-0.51 -0.34,-1.69 L 266,45 263.63,45.76 V 47 l -1.44,0.17 -0.34,1.14 1,0.47 -1.61,0.93 -0.43,0.72 1.36,1.23 z m 85.4,51.37 5.6,3 2.89,-1.12 1.92,2.72 5.13,1.13 2.24,1.12 6.25,-16.5 L 372.45,84 344.45,38.18 336.09,32 H 326 l -4.49,0.64 -1.12,2.25 L 318,34 316.87,32 h -2.25 l 1,1 0.81,1.61 h -3.53 v 1.99 l -2.52,-0.37 -1.53,-0.23 -0.86,0.43 0.8,0.49 1.11,0.25 -1,0.37 -1.34,-0.54 -1.67,-0.56 -0.5,-1.11 -1.23,-1 -1.42,-0.19 -1,-1.17 2.18,0.19 1.92,0.86 1.6,0.19 c 0,0 1,-0.25 1.11,-0.07 a 12.12,12.12 0 0 0 2.35,0.5 l 1.3,-1 L 310,32 306.1,30.85 c 0,0 -6.55,-0.9 -6.86,-0.88 a 28.82,28.82 0 0 1 -2.78,-0.85 l -2.28,-0.56 h -5 l -7.6,1.41 -4.2,2.67 -5,4.51 c 0,0 -4.32,0.62 -4.57,0.68 a 20.44,20.44 0 0 0 -2,1.91 l 1,2.17 -0.25,1 0.68,0.55 0.5,1 0.68,1 2.53,-0.37 2.72,-1.54 2.1,1.11 1.61,2.78 0.3,1.67 1.92,0.37 1.3,-1.11 1.36,-0.31 0.74,-2.23 0.12,-1.73 1.86,-0.55 v -1.14 l -0.68,-0.49 -0.87,-0.56 -0.31,-2 2.83,-1.79 c 0,0 1.5,-1.36 1.5,-1.6 0,-0.24 0.74,-1.06 0.74,-1.06 0,0 1.18,0.07 1.55,0.13 0.37,0.06 2.34,0.74 2.34,0.74 l 0.75,0.56 -2.29,1.11 -1.35,1.36 -1,0.68 0.93,2.16 2.41,1.24 2.16,-0.44 3.43,-0.17 c 0,0 3.93,0.79 4.11,0.92 0.18,0.13 -1.13,0.55 -1.13,0.55 l -1.53,0.43 -0.86,0.56 -1.46,-0.74 a 3.78,3.78 0 0 0 -2,-0.68 4.09,4.09 0 0 0 -1.67,0.74 l 0.93,1.42 1.29,0.87 c 0,0 -0.55,1.6 -0.92,1.54 -0.37,-0.06 -1,-1.17 -1,-1.17 l -1.43,-0.56 -0.92,0.56 0.12,1.36 -0.4,1.92 -1.54,1 -1.12,0.87 -1.48,-0.87 -1.91,0.56 -3.34,-0.19 -0.74,0.68 -1.18,-0.49 c 0,0 -2.9,0.12 -3.09,0.18 a 10.62,10.62 0 0 1 -1.91,-0.86 l -0.74,-1.67 0.92,-1.11 -0.43,-0.5 -0.37,-0.92 0.62,-0.75 -0.93,-0.24 -1.54,1.54 -0.07,1.49 c 0,0 0.81,1.42 0.87,1.6 a 10.91,10.91 0 0 1 0.06,1.3 l -1.54,0.62 -1.67,0.49 0.31,0.62 1.73,0.31 -0.56,1 -1,0.06 -0.49,1.24 -0.31,0.68 -1.36,-0.37 0.06,-0.87 0.06,-1.11 -0.55,-0.56 -0.81,0.87 -0.86,1.11 -2,0.37 -0.93,0.74 -0.62,1.36 -1.48,0.68 -1.17,0.74 -1.8,-0.92 -0.24,0.43 0.37,0.92 -0.43,0.5 -2.35,-0.19 -1.3,-0.18 
-0.19,1.73 2.29,0.37 1.11,1 c 0,0 0.87,1.05 1.05,1.05 0.18,0 1.43,0.31 1.43,0.31 l -0.5,2.72 -1.11,2.53 -4.64,-0.06 -5.06,-0.12 -1.61,0.61 -0.5,1.42 0.44,1.49 -0.9,2.79 -0.37,1.49 A 7,7 0 0 0 245,79 c 0.13,0.18 0.87,1.48 0.87,1.48 l -0.56,2.1 1.67,0.5 2.78,0.18 1.48,0.81 1.12,0.18 1.85,-0.8 2.78,-0.37 2.6,-2.17 -0.43,-2.65 1.6,-2 3.4,-2.16 1.48,-0.5 -0.18,-2.22 2,-1.18 2,0.68 1.36,-0.74 4.18,-1.28 4.14,3.84 5.68,3.89 c 0,0 1.18,2.35 1.24,2.53 a 7.8,7.8 0 0 1 -0.87,1.61 H 282 L 280.8,81 c 0,0 0.37,0.92 0.61,0.92 0.24,0 2.79,1.18 2.79,1.18 l 0.8,0.62 0.43,-0.13 -0.06,-1.17 0.25,-1.11 0.86,0.12 a 7.77,7.77 0 0 0 0.93,-1.24 17.93,17.93 0 0 1 0.86,-1.6 l -0.68,-1.3 0.38,-0.87 1,0.13 0.12,1.29 h 0.37 l -0.12,-1.6 -1,-0.62 -1.34,-0.89 -0.93,-1.35 -1.61,-0.44 -1.85,-1.3 -0.43,-1.48 -1.73,-0.86 -0.45,-1.36 0.5,-1.42 1.11,-0.19 0.06,1.36 1.11,0.37 0.75,-0.43 1.11,2 2.28,1.29 1.8,1 1.11,0.13 1.85,2 0.44,2.59 3.46,3.59 0.49,2.53 2.47,1.73 0.68,-0.56 -0.37,-1.6 1.36,-0.74 -0.43,-1.12 -1.37,-0.47 -0.56,-1.11 0.25,-1.8 -0.8,-0.92 0.74,0.12 0.55,0.68 0.87,-1.17 1.48,0.06 1.49,0.62 1.11,1.52 1.11,1.73 c 0,0 -0.74,0.43 -0.8,0.62 a 5.76,5.76 0 0 0 0.62,1.36 l 1.42,1.29 1.11,1.56 2.22,-0.06 0.25,1 1.61,0.18 1.54,-0.37 0.44,-1 0.68,0.93 1.79,0.62 1.54,-0.12 1.3,-1.43 1.17,0.5 0.87,0.55 -0.31,1.24 0.77,2.54 -1.12,2.57 v 3.36 H 313 l -2.56,1 -4.15,-2.07 -2.72,0.32 -4.65,-1.68 h -3.68 l -1.12,1 0.64,1.76 -1.44,1.44 -3.69,-2.24 -2.88,0.32 -1.12,-1.92 -4.33,-1.28 -3,-1.45 -2.3,-1.13 2.72,-3.2 -0.8,-2.72 -1.92,-1.13 -11.38,1.29 -4.48,1.76 -2.89,0.16 -3,1.12 -3.68,-0.8 -2.41,2.88 -2.88,1.92 -2.6,5.45 0.68,2.4 -3.36,3.21 -3.36,1.28 -1.6,3 c 0,0 -3.53,7.85 -3.85,8.49 -0.32,0.64 0,4.65 0,4.65 l 1.44,4.64 -3.36,7.85 1.28,3.85 1.76,3.52 4.65,6.57 -0.14,3.3 9.58,7.29 6.42,-2.58 6.18,1.96 7.68,-4.16 c 0,0 2.57,0 3.37,1.44 0.8,1.44 1.28,2.72 4.16,2.72 2.88,0 4.33,3.21 4.33,3.21 l -1.28,3.68 0.64,1.92 -1.28,3 3,6.24 2.24,4.81 2.09,4.65 0.48,5.92 -0.48,3.37 -3.69,6.41 1.12,9.93 1.28,2.24 -0.48,1.6 2.25,2.41 1.44,5.6 -1.28,5.29 1.92,4 c 0,0 1.76,4.33 1.92,4.81 0.16,0.48 1.28,2.08 1.12,2.88 -0.16,0.8 -0.64,2.89 -0.64,2.89 l 0.64,3.2 h 8.17 l 3.85,-1.28 9.45,-9.61 a 33.38,33.38 0 0 0 2.4,-3.21 c 0.16,-0.48 1.44,-6.4 1.44,-6.4 l 6.73,-2.73 v -5.23 l -1.44,-4.17 3.52,-3.36 9.46,-5.45 0.8,-4.17 v -7.2 l -1.77,-5.45 0.48,-3.36 -1.64,-1.45 3.05,-4.48 1.12,-5.93 4.48,-2.72 1,-2.73 8.17,-8.47 2.57,-8.06 2.4,-4.45 v -4.81 l -2.24,-0.8 -2.57,2.08 -3.52,0.32 -3.68,1 -3.05,-1.93 0.32,-2.4 -3.84,-4.32 -3.85,-2.25 -1.76,-5.76 -3.76,-3.58 -0.32,-4.48 -2.73,-4 -3.36,-7.52 -3.53,-3.52 -0.32,-2.89 1.45,0.65 1.6,1.28 0.64,2.4 c 1.28,0 1.76,-2.4 1.76,-2.4 l 1.12,2.4 1.12,3 3.53,5.29 1.92,0.32 2.24,4.65 -0.16,2.08 6.41,6.57 1.12,6.08 1.12,2.73 2.09,2.08 9.61,-2.56 1.76,-1.61 4.16,-1.28 v -1.92 l 4.81,-3.2 2,-3.72 1.16,-1.89 1.61,-2.24 -2.65,-3.65 -3.7,-3.13 -1.18,-3 -1.36,0.32 -1,2.88 -3.68,1 -1.92,-1.28 -0.64,-2.72 -1.28,0.32 -1.93,-3.53 -1.76,-1 -2.24,-3.68 1.6,-1.77 z m -75,-23.58 0.34,-1.53 v -2.2 a 2.47,2.47 0 0 0 -1.23,-0.59 1.34,1.34 0 0 0 -0.89,0.34 v 2.45 a 4.55,4.55 0 0 1 0,1.53 z m 0,-6.61 a 1,1 0 0 0 -1.19,-1.1 l -0.59,1.69 0.59,1.27 h 1.19 z m -43.7,-43.74 h 6 l 3.27,-4.71 h 7.53 l 1.21,-3 4.63,-2.45 c 2.19,-1.91 0,-5.37 0,-5.37 l -9.44,-1.18 -9.38,1.2 -13.64,-2.54 -8.41,3.82 c 0,0 -6.05,3.27 -6.87,3 a 29.17,29.17 0 0 0 -4.91,0 c 0,0 3,2.72 4.36,3 a 9.87,9.87 0 0 0 3,0 c 0,0 0,2.66 0.55,3.27 1.91,2.19 -2,3.82 -2,3.82 v 2.58 c 0,0 0.43,4.87 1.24,4.65 0.81,-0.22 2.14,2.59 2.14,2.59 l 2.51,2.73 h 4.59 l 5.73,-6.82 5.64,-2.45 z m 
117.85,163.14 -0.74,-2 -0.85,0.84 -0.74,1.7 -1.69,1 -0.74,2.23 -1.06,2.64 -4.13,1.27 -1.8,3.07 -0.42,5.08 -1.48,2.52 -1.69,1.8 0.63,3 1.17,1.38 -1.17,1.58 1.17,1 h 3.91 l 2.22,-3.28 c 0,0 1.8,-3.81 1.8,-4.13 0,-0.32 0.85,-3.81 0.85,-3.81 l 3.06,-4 V 198 h 2.12 l -0.63,-3.92 z m -213.61,-81.15 -4.64,-1.58 c 0,0 -1.29,0.18 -2.8,0.54 a 16.83,16.83 0 0 0 -3.24,1 l 6.58,2.24 h 2.66 l 4.44,3.82 h 3 c 0,0 2.45,-1.09 1.63,-3 l -3.81,-1.37 z"
       id="path863" />
  </g>
  <path
     class="cls-8"
     d="M 166.04827,269.00001 A 151.41,151.41 0 0 1 21.048268,74.340006 151.34,151.34 0 1 0 296.34827,194.65001 a 151.23,151.23 0 0 1 -130.3,74.35 z"
     id="path867" />
  <ellipse
     class="cls-3"
     cx="153.54826"
     cy="356.16"
     rx="4.0900002"
     ry="2.3599999"
     id="ellipse1050" />
</svg>

```

--------------------------------------------------------------------------------
/jupyter_mcp_server/tools/execute_cell_tool.py:
--------------------------------------------------------------------------------

```python
# Copyright (c) 2023-2024 Datalayer, Inc.
#
# BSD 3-Clause License

"""Unified execute cell tool with configurable streaming."""

import asyncio
import logging
import time
from pathlib import Path
from typing import Union, List
from mcp.types import ImageContent

from jupyter_mcp_server.tools._base import BaseTool, ServerMode
from jupyter_mcp_server.config import get_config
from jupyter_mcp_server.utils import get_current_notebook_context, execute_via_execution_stack, safe_extract_outputs

logger = logging.getLogger(__name__)


class ExecuteCellTool(BaseTool):
    """Execute a cell with configurable timeout and optional streaming progress updates.

    Supports both MCP_SERVER (with WebSocket) and JUPYTER_SERVER modes.
    The stream parameter controls execution behavior:
    - stream=False: Use forced sync approach (more reliable for short-running cells)
    - stream=True: Use real-time monitoring approach (better for long-running cells)
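
    Illustrative call (parameter names taken from ``execute`` below; the
    ``*_fn`` helpers and managers are injected by the server depending on mode):

        outputs = await ExecuteCellTool().execute(
            mode=ServerMode.MCP_SERVER,
            cell_index=2,
            stream=True,
            progress_interval=10,
        )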
    """

    @property
    def name(self) -> str:
        return "execute_cell"

    @property
    def description(self) -> str:
        return "Execute a cell with configurable timeout and optional streaming progress updates"

    async def _get_jupyter_ydoc(self, serverapp, file_id: str):
        """Get the YNotebook document if it's currently open in a collaborative session."""
        try:
            yroom_manager = serverapp.web_app.settings.get("yroom_manager")
            if yroom_manager is None:
                return None

            room_id = f"json:notebook:{file_id}"

            if yroom_manager.has_room(room_id):
                yroom = yroom_manager.get_room(room_id)
                notebook = await yroom.get_jupyter_ydoc()
                return notebook
        except Exception:
            pass

        return None

    async def _write_outputs_to_cell(
        self,
        notebook_path: str,
        cell_index: int,
        outputs: List[Union[str, ImageContent]]
    ):
        """Write execution outputs back to a notebook cell."""
        import nbformat
        from jupyter_mcp_server.utils import _clean_notebook_outputs

        with open(notebook_path, 'r', encoding='utf-8') as f:
            notebook = nbformat.read(f, as_version=4)

        _clean_notebook_outputs(notebook)

        if cell_index < 0 or cell_index >= len(notebook.cells):
            logger.warning(f"Cell index {cell_index} out of range, cannot write outputs")
            return

        cell = notebook.cells[cell_index]
        if cell.cell_type != 'code':
            logger.warning(f"Cell {cell_index} is not a code cell, cannot write outputs")
            return

        # Convert formatted outputs to nbformat structure
        cell.outputs = []
        for output in outputs:
            if isinstance(output, ImageContent):
                cell.outputs.append(nbformat.v4.new_output(
                    output_type='display_data',
                    data={output.mimeType: output.data},
                    metadata={}
                ))
            elif isinstance(output, str):
                if output.startswith('[ERROR:') or output.startswith('[TIMEOUT ERROR:') or output.startswith('[PROGRESS:'):
                    cell.outputs.append(nbformat.v4.new_output(
                        output_type='stream',
                        name='stdout',
                        text=output
                    ))
                else:
                    cell.outputs.append(nbformat.v4.new_output(
                        output_type='execute_result',
                        data={'text/plain': output},
                        metadata={},
                        execution_count=None
                    ))

        # Update execution count
        max_count = 0
        for c in notebook.cells:
            if c.cell_type == 'code' and c.execution_count:
                max_count = max(max_count, c.execution_count)
        cell.execution_count = max_count + 1

        with open(notebook_path, 'w', encoding='utf-8') as f:
            nbformat.write(notebook, f)

        logger.info(f"Wrote {len(outputs)} outputs to cell {cell_index} in {notebook_path}")

    async def execute(
        self,
        mode: ServerMode,
        server_client=None,
        contents_manager=None,
        kernel_manager=None,
        kernel_spec_manager=None,
        notebook_manager=None,
        serverapp=None,
        # Tool-specific parameters
        cell_index: Optional[int] = None,
        timeout_seconds: int = 300,
        stream: bool = False,
        progress_interval: int = 5,
        ensure_kernel_alive_fn=None,
        wait_for_kernel_idle_fn=None,
        safe_extract_outputs_fn=None,
        execute_cell_with_forced_sync_fn=None,
        extract_output_fn=None,
        **kwargs
    ) -> List[Union[str, ImageContent]]:
        """Execute a cell with configurable timeout and optional streaming progress updates.

        Args:
            mode: Server mode (MCP_SERVER or JUPYTER_SERVER)
            serverapp: ServerApp instance for JUPYTER_SERVER mode
            kernel_manager: Kernel manager for JUPYTER_SERVER mode
            notebook_manager: Notebook manager for MCP_SERVER mode
            cell_index: Index of the cell to execute (0-based)
            timeout_seconds: Maximum time to wait for execution (default: 300s)
            stream: Enable streaming progress updates for long-running cells (default: False)
            progress_interval: Seconds between progress updates when stream=True (default: 5s)
            ensure_kernel_alive_fn: Function to ensure kernel is alive (MCP_SERVER)
            wait_for_kernel_idle_fn: Function to wait for kernel idle state (MCP_SERVER)
            safe_extract_outputs_fn: Function to safely extract outputs (MCP_SERVER)
            execute_cell_with_forced_sync_fn: Function to execute cell with forced sync (MCP_SERVER, stream=False)
            extract_output_fn: Function to extract single output (MCP_SERVER, stream=True)

        Returns:
            List of outputs from the executed cell
        """
        if mode == ServerMode.JUPYTER_SERVER:
            # JUPYTER_SERVER mode: Use ExecutionStack with YDoc awareness
            from jupyter_mcp_server.jupyter_extension.context import get_server_context

            context = get_server_context()
            serverapp = context.serverapp

            if serverapp is None:
                raise ValueError("serverapp is required for JUPYTER_SERVER mode")
            if kernel_manager is None:
                raise ValueError("kernel_manager is required for JUPYTER_SERVER mode")

            notebook_path, kernel_id = get_current_notebook_context(notebook_manager)

            # Check if kernel needs to be started
            if kernel_id is None:
                # No kernel available - start a new one on demand
                logger.info("No kernel_id available, starting new kernel for execute_cell")
                kernel_id = await kernel_manager.start_kernel()

                # Wait a bit for kernel to initialize
                await asyncio.sleep(1.0)
                logger.info(f"Kernel {kernel_id} started and initialized")

                # Store the kernel in notebook_manager if available
                if notebook_manager is not None:
                    kernel_info = {"id": kernel_id}
                    notebook_manager.add_notebook(
                        name=notebook_path,
                        kernel=kernel_info,
                        server_url="local",
                        path=notebook_path
                    )

            logger.info(f"Executing cell {cell_index} in JUPYTER_SERVER mode (timeout: {timeout_seconds}s)")

            # Resolve to absolute path
            if serverapp and not Path(notebook_path).is_absolute():
                root_dir = serverapp.root_dir
                notebook_path = str(Path(root_dir) / notebook_path)

            # Get file_id from file_id_manager
            file_id_manager = serverapp.web_app.settings.get("file_id_manager")
            if file_id_manager is None:
                raise RuntimeError("file_id_manager not available in serverapp")

            file_id = file_id_manager.get_id(notebook_path)
            if file_id is None:
                file_id = file_id_manager.index(notebook_path)

            # Try to get YDoc if notebook is open
            ydoc = await self._get_jupyter_ydoc(serverapp, file_id)

            if ydoc:
                # Notebook is open - use YDoc and RTC
                logger.info(f"Notebook {file_id} is open, using RTC mode")

                if cell_index < 0 or cell_index >= len(ydoc.ycells):
                    raise ValueError(f"Cell index {cell_index} out of range")

                cell_id = ydoc.ycells[cell_index].get("id")
                cell_source = ydoc.ycells[cell_index].get("source")

                if not cell_source or not cell_source.to_py().strip():
                    return []

                code_to_execute = cell_source.to_py()
                document_id = f"json:notebook:{file_id}"

                # Execute with RTC metadata - outputs will sync automatically
                outputs = await execute_via_execution_stack(
                    serverapp=serverapp,
                    kernel_id=kernel_id,
                    code=code_to_execute,
                    document_id=document_id,
                    cell_id=cell_id,
                    timeout=timeout_seconds
                )

                return safe_extract_outputs(outputs)
            else:
                # Notebook not open - use file-based approach
                logger.info(f"Notebook {file_id} not open, using file mode")

                import nbformat
                with open(notebook_path, 'r', encoding='utf-8') as f:
                    notebook = nbformat.read(f, as_version=4)

                if cell_index < 0 or cell_index >= len(notebook.cells):
                    raise ValueError(f"Cell index {cell_index} out of range")

                cell = notebook.cells[cell_index]
                if cell.cell_type != 'code':
                    raise ValueError(f"Cell {cell_index} is not a code cell")

                code_to_execute = cell.source
                if not code_to_execute.strip():
                    return []

                # Execute without RTC metadata
                outputs = await execute_via_execution_stack(
                    serverapp=serverapp,
                    kernel_id=kernel_id,
                    code=code_to_execute,
                    timeout=timeout_seconds
                )

                # Write outputs back to file
                await self._write_outputs_to_cell(notebook_path, cell_index, outputs)

                return safe_extract_outputs(outputs)

        elif mode == ServerMode.MCP_SERVER:
            # MCP_SERVER mode: Use WebSocket with configurable execution approach
            if ensure_kernel_alive_fn is None:
                raise ValueError("ensure_kernel_alive_fn is required for MCP_SERVER mode")
            if wait_for_kernel_idle_fn is None:
                raise ValueError("wait_for_kernel_idle_fn is required for MCP_SERVER mode")
            if notebook_manager is None:
                raise ValueError("notebook_manager is required for MCP_SERVER mode")

            # Validate function dependencies based on stream mode
            if not stream:
                if safe_extract_outputs_fn is None:
                    raise ValueError("safe_extract_outputs_fn is required for MCP_SERVER mode when stream=False")
                if execute_cell_with_forced_sync_fn is None:
                    raise ValueError("execute_cell_with_forced_sync_fn is required for MCP_SERVER mode when stream=False")
            else:
                if extract_output_fn is None:
                    raise ValueError("extract_output_fn is required for MCP_SERVER mode when stream=True")

            kernel = ensure_kernel_alive_fn()
            await wait_for_kernel_idle_fn(kernel, max_wait_seconds=30)

            async with notebook_manager.get_current_connection() as notebook:
                if cell_index < 0 or cell_index >= len(notebook):
                    raise ValueError(f"Cell index {cell_index} out of range")

                if stream:
                    # Streaming mode: Real-time monitoring with progress updates
                    logger.info(f"Executing cell {cell_index} in streaming mode (timeout: {timeout_seconds}s, interval: {progress_interval}s)")

                    outputs_log = []

                    # Start execution in background
                    execution_task = asyncio.create_task(
                        asyncio.to_thread(notebook.execute_cell, cell_index, kernel)
                    )

                    start_time = time.time()
                    last_output_count = 0

                    # Monitor progress
                    while not execution_task.done():
                        elapsed = time.time() - start_time

                        # Check timeout
                        if elapsed > timeout_seconds:
                            execution_task.cancel()
                            outputs_log.append(f"[TIMEOUT at {elapsed:.1f}s: Cancelling execution]")
                            try:
                                kernel.interrupt()
                                outputs_log.append("[Sent interrupt signal to kernel]")
                            except Exception:
                                pass
                            break

                        # Check for new outputs
                        try:
                            current_outputs = notebook[cell_index].get("outputs", [])
                            if len(current_outputs) > last_output_count:
                                new_outputs = current_outputs[last_output_count:]
                                for output in new_outputs:
                                    extracted = extract_output_fn(output)
                                    if extracted.strip():
                                        outputs_log.append(f"[{elapsed:.1f}s] {extracted}")
                                last_output_count = len(current_outputs)

                        except Exception as e:
                            outputs_log.append(f"[{elapsed:.1f}s] Error checking outputs: {e}")

                        # Progress update
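                        # The 1s poll below means int(elapsed) crosses each
                        # multiple of progress_interval roughly once.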
                        if int(elapsed) % progress_interval == 0 and elapsed > 0:
                            outputs_log.append(f"[PROGRESS: {elapsed:.1f}s elapsed, {last_output_count} outputs so far]")

                        await asyncio.sleep(1)

                    # Get final result
                    if not execution_task.cancelled():
                        try:
                            await execution_task
                            final_outputs = notebook[cell_index].get("outputs", [])
                            outputs_log.append(f"[COMPLETED in {time.time() - start_time:.1f}s]")

                            # Add any final outputs not captured during monitoring
                            if len(final_outputs) > last_output_count:
                                remaining = final_outputs[last_output_count:]
                                for output in remaining:
                                    extracted = extract_output_fn(output)
                                    if extracted.strip():
                                        outputs_log.append(extracted)

                        except Exception as e:
                            outputs_log.append(f"[ERROR: {e}]")

                    return outputs_log if outputs_log else ["[No output generated]"]

                else:
                    # Non-streaming mode: Use forced synchronization
                    logger.info(f"Starting execution of cell {cell_index} with {timeout_seconds}s timeout")

                    try:
                        # Use the forced sync function
                        await execute_cell_with_forced_sync_fn(notebook, cell_index, kernel, timeout_seconds)

                        # Get final outputs
                        outputs = notebook[cell_index].get("outputs", [])
                        result = safe_extract_outputs_fn(outputs)

                        logger.info(f"Cell {cell_index} completed successfully with {len(result)} outputs")
                        return result

                    except asyncio.TimeoutError as e:
                        logger.error(f"Cell {cell_index} execution timed out: {e}")
                        try:
                            if kernel and hasattr(kernel, 'interrupt'):
                                kernel.interrupt()
                                logger.info("Sent interrupt signal to kernel")
                        except Exception as interrupt_err:
                            logger.error(f"Failed to interrupt kernel: {interrupt_err}")

                        # Return partial outputs if available
                        try:
                            outputs = notebook[cell_index].get("outputs", [])
                            partial_outputs = safe_extract_outputs_fn(outputs)
                            partial_outputs.append(f"[TIMEOUT ERROR: Execution exceeded {timeout_seconds} seconds]")
                            return partial_outputs
                        except Exception:
                            pass

                        return [f"[TIMEOUT ERROR: Cell execution exceeded {timeout_seconds} seconds and was interrupted]"]

                    except Exception as e:
                        logger.error(f"Error executing cell {cell_index}: {e}")
                        raise
        else:
            raise ValueError(f"Invalid mode: {mode}")

```
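
For orientation, here is a minimal usage sketch of `ExecuteCellTool` in MCP_SERVER streaming mode. The `notebook_manager` and the helper callables are injected by the server's real wiring; the names used for them below are illustrative assumptions, not part of this file.

```python
# Hypothetical driver sketch (not the server's actual wiring): the helper
# callables are assumed to be provided by the surrounding server code.
from jupyter_mcp_server.tools._base import ServerMode
from jupyter_mcp_server.tools.execute_cell_tool import ExecuteCellTool


async def run_cell(notebook_manager, helpers, index: int):
    tool = ExecuteCellTool()
    outputs = await tool.execute(
        mode=ServerMode.MCP_SERVER,
        notebook_manager=notebook_manager,
        cell_index=index,
        timeout_seconds=120,
        stream=True,              # real-time monitoring for long-running cells
        progress_interval=10,     # progress line roughly every 10 seconds
        ensure_kernel_alive_fn=helpers["ensure_kernel_alive"],
        wait_for_kernel_idle_fn=helpers["wait_for_kernel_idle"],
        extract_output_fn=helpers["extract_output"],
    )
    for line in outputs:
        print(line)
```

With `stream=False`, the tool instead requires `safe_extract_outputs_fn` and `execute_cell_with_forced_sync_fn`, matching the validation at the top of the MCP_SERVER branch.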

--------------------------------------------------------------------------------
/tests/test_common.py:
--------------------------------------------------------------------------------

```python
# Copyright (c) 2023-2024 Datalayer, Inc.
#
# BSD 3-Clause License

"""
Common test infrastructure shared between MCP_SERVER and JUPYTER_SERVER mode tests.

This module provides:
- MCPClient: MCP protocol client for remote testing
- timeout_wrapper: Decorator for timeout handling
- requires_session: Decorator to check client session connection
- JUPYTER_TOOLS: List of expected tool names
- Helper functions for content extraction
"""

import asyncio
import functools
import json
import logging
from contextlib import AsyncExitStack

import pytest
from mcp import ClientSession, types
from mcp.client.streamable_http import streamablehttp_client


# TODO: could be retrieved from code (inspect)
JUPYTER_TOOLS = [
    # Multi-Notebook Management Tools
    "use_notebook",
    "list_notebooks", 
    "restart_notebook",
    "unuse_notebook",
    # Cell Tools
    "insert_cell",
    "insert_execute_code_cell",
    "overwrite_cell_source",
    "execute_cell",
    "read_cells",
    "list_cells",
    "read_cell",
    "delete_cell",
    "execute_ipython",
    "list_files",
    "list_kernels",
    "assign_kernel_to_notebook",
]


def timeout_wrapper(timeout_seconds=30):
    """Decorator to add timeout handling to async test functions
    
    Windows has known issues with asyncio and network timeouts that can cause 
    tests to hang indefinitely. This decorator adds a safety timeout specifically
    for Windows platforms while allowing other platforms to run normally.
    """
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            try:
                return await asyncio.wait_for(func(*args, **kwargs), timeout=timeout_seconds)
            except asyncio.TimeoutError:
                pytest.skip(f"Test {func.__name__} timed out ({timeout_seconds}s) - known platform limitation")
            except Exception as e:
                # Check if it's a network timeout (a known platform limitation, seen mainly on Windows)
                if "ReadTimeout" in str(e) or "TimeoutError" in str(e):
                    pytest.skip(f"Test {func.__name__} hit network timeout - known platform limitation: {e}")
                raise
        return wrapper
    return decorator


def requires_session(func):
    """
    A decorator that checks if the instance has a connected session.
    """
    @functools.wraps(func)
    async def wrapper(self, *args, **kwargs):
        if not self._session:
            raise RuntimeError("Client session is not connected")
        # If the session exists, call the original method
        return await func(self, *args, **kwargs)
    
    return wrapper


class MCPClient:
    """A standard MCP client used to interact with the Jupyter MCP server

    Basically it's a client wrapper for the Jupyter MCP server.
    It uses the `requires_session` decorator to check if the session is connected.
    """

    def __init__(self, url):
        self.url = f"{url}/mcp"
        self._session: ClientSession | None = None
        self._exit_stack = AsyncExitStack()

    async def __aenter__(self):
        """Initiate the session (enter session context)"""
        streams_context = streamablehttp_client(self.url)
        read_stream, write_stream, _ = await self._exit_stack.enter_async_context(
            streams_context
        )
        session_context = ClientSession(read_stream, write_stream)
        self._session = await self._exit_stack.enter_async_context(session_context)
        await self._session.initialize()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the session (exit session context)"""
        if self._exit_stack:
            await self._exit_stack.aclose()
        self._session = None

    @staticmethod
    def _extract_text_content(result):
        """Extract text content from a result"""
        try:
            logging.debug(f"_extract_text_content: result type={type(result)}, has content={hasattr(result, 'content')}, is tuple={isinstance(result, tuple)}, is list={isinstance(result, list)}")
            
            # Handle tuple results (content, metadata)
            if isinstance(result, tuple) and len(result) >= 2:
                logging.debug(f"_extract_text_content: handling tuple, first element type={type(result[0])}")
                result = result[0]  # Get the content list from the tuple
            
            if hasattr(result, 'content') and result.content and len(result.content) > 0:
                # Check if all items are TextContent
                if all(isinstance(item, types.TextContent) for item in result.content):
                    # If multiple TextContent items, return as JSON list
                    if len(result.content) > 1:
                        texts = [item.text for item in result.content]
                        text = json.dumps(texts)
                        logging.debug(f"_extract_text_content: extracted {len(texts)} TextContent items as JSON list")
                        return text
                    else:
                        text = result.content[0].text
                        logging.debug(f"_extract_text_content: extracted from result.content[0].text, length={len(text)}")
                        return text
            # Handle list results directly
            elif isinstance(result, list) and len(result) > 0:
                # Check if all items are TextContent
                if all(isinstance(item, types.TextContent) for item in result):
                    # If multiple TextContent items, return as JSON list
                    if len(result) > 1:
                        texts = [item.text for item in result]
                        text = json.dumps(texts)
                        logging.debug(f"_extract_text_content: extracted {len(texts)} TextContent items as JSON list")
                        return text
                    else:
                        text = result[0].text
                        logging.debug(f"_extract_text_content: extracted from list[0].text, length={len(text)}")
                        return text
        except (AttributeError, IndexError, TypeError) as e:
            logging.debug(f"_extract_text_content error: {e}, result type: {type(result)}")
        
        logging.debug(f"_extract_text_content: returning None, could not extract")
        return None

    def _get_structured_content_safe(self, result):
        """Safely get structured content with fallback to text content parsing"""
        content = getattr(result, 'structuredContent', None)
        if content is None:
            # Try to extract from text content as fallback
            text_content = self._extract_text_content(result)
            logging.debug(f"_get_structured_content_safe: text_content={repr(text_content[:200] if text_content else None)}")
            if text_content:
                # Try to parse as JSON
                try:
                    parsed = json.loads(text_content)
                    logging.debug(f"_get_structured_content_safe: JSON parsed successfully, type={type(parsed)}")
                    # Check if it's already a wrapped result or a direct response object
                    if isinstance(parsed, dict):
                        # If it has "result" key, it's already wrapped
                        if "result" in parsed:
                            return parsed
                        # If it has keys like "index", "type", "source" it's a direct object (like CellInfo)
                        elif any(key in parsed for key in ["index", "type", "source", "cells"]):
                            return parsed
                        # Otherwise wrap it
                        else:
                            return {"result": parsed}
                    else:
                        # Lists, strings, etc. - wrap them
                        return {"result": parsed}
                except json.JSONDecodeError:
                    # Not JSON - could be plain text or list representation
                    # Try to evaluate as Python literal (for lists, etc.)
                    try:
                        import ast
                        parsed = ast.literal_eval(text_content)
                        logging.debug(f"_get_structured_content_safe: ast.literal_eval succeeded, type={type(parsed)}, value={repr(parsed)}")
                        return {"result": parsed}
                    except (ValueError, SyntaxError):
                        # Plain text - return as-is
                        logging.debug(f"_get_structured_content_safe: Plain text, wrapping in result dict")
                        return {"result": text_content}
            else:
                # No text content - check if we have ImageContent or mixed content
                if hasattr(result, 'content') and result.content:
                    # Extract mixed content (ImageContent + TextContent)
                    content_list = []
                    for item in result.content:
                        if isinstance(item, types.ImageContent):
                            # Convert ImageContent to dict format
                            content_list.append({
                                'type': 'image',
                                'data': item.data,
                                'mimeType': item.mimeType,
                                'annotations': getattr(item, 'annotations', None),
                                'meta': getattr(item, 'meta', None)
                            })
                        elif isinstance(item, types.TextContent):
                            # Include text content if present
                            content_list.append(item.text)
                    
                    if content_list:
                        logging.debug(f"_get_structured_content_safe: extracted {len(content_list)} items from mixed content")
                        return {"result": content_list}
                
                logging.warning(f"No text content available in result: {type(result)}")
                return None
        return content
    
    async def _call_tool_safe(self, tool_name, arguments=None):
        """Safely call a tool, returning None on error (for test compatibility)"""
        try:
            result = await self._session.call_tool(tool_name, arguments=arguments or {})  # type: ignore
            
            # Log raw result for debugging
            logging.debug(f"_call_tool_safe({tool_name}): raw result type={type(result)}")
            logging.debug(f"_call_tool_safe({tool_name}): raw result={result}")
            
            # Check if result contains error text (for MCP_SERVER mode where errors are wrapped in results)
            text_content = self._extract_text_content(result)
            if text_content and ("Error executing tool" in text_content or "is out of range" in text_content or "not found" in text_content):
                logging.warning(f"Tool {tool_name} returned error in result: {text_content[:100]}")
                return None
            
            # Also check structured content for errors (for JUPYTER_SERVER mode)
            structured_content = self._get_structured_content_safe(result)
            if structured_content:
                # Check if result contains error messages
                result_value = structured_content.get("result")
                if result_value:
                    # Handle both string and list results
                    error_text = ""
                    if isinstance(result_value, str):
                        error_text = result_value
                    elif isinstance(result_value, list) and len(result_value) > 0:
                        error_text = str(result_value[0])
                    
                    if error_text and ("[ERROR:" in error_text or "is out of range" in error_text or "not found" in error_text):
                        logging.warning(f"Tool {tool_name} returned error in structured result: {error_text[:100]}")
                        return None
            
            return result
        except Exception as e:
            # Log the error but return None for test compatibility (JUPYTER_SERVER mode)
            logging.warning(f"Tool {tool_name} raised error: {e}")
            return None

    @requires_session
    async def list_tools(self):
        return await self._session.list_tools()  # type: ignore

    # Multi-Notebook Management Methods
    @requires_session
    async def use_notebook(self, notebook_name, notebook_path=None, mode="connect", kernel_id=None):
        arguments = {
            "notebook_name": notebook_name, 
            "mode": mode,
            "kernel_id": kernel_id
        }
        # Only add notebook_path if provided (for switching, it's optional)
        if notebook_path is not None:
            arguments["notebook_path"] = notebook_path
        
        result = await self._session.call_tool("use_notebook", arguments=arguments)  # type: ignore
        return self._extract_text_content(result)
    
    @requires_session
    async def list_notebooks(self):
        result = await self._session.call_tool("list_notebooks")  # type: ignore
        return self._extract_text_content(result)
    
    @requires_session
    async def restart_notebook(self, notebook_name):
        result = await self._session.call_tool("restart_notebook", arguments={"notebook_name": notebook_name})  # type: ignore
        return self._extract_text_content(result)
    
    @requires_session
    async def unuse_notebook(self, notebook_name):
        result = await self._session.call_tool("unuse_notebook", arguments={"notebook_name": notebook_name})  # type: ignore
        return self._extract_text_content(result)
    
    @requires_session
    async def insert_cell(self, cell_index, cell_type, cell_source):
        result = await self._call_tool_safe("insert_cell", {"cell_index": cell_index, "cell_type": cell_type, "cell_source": cell_source})
        return self._get_structured_content_safe(result) if result else None

    @requires_session
    async def insert_execute_code_cell(self, cell_index, cell_source):
        result = await self._call_tool_safe("insert_execute_code_cell", {"cell_index": cell_index, "cell_source": cell_source})
        structured = self._get_structured_content_safe(result) if result else None
        
        # Special handling for insert_execute_code_cell: tool returns list[str | ImageContent]
        # In JUPYTER_SERVER mode, the list gets flattened to a single string in TextContent
        # In MCP_SERVER mode, it's properly wrapped in structured content as {"result": [...]}
        if structured and "result" in structured:
            result_value = structured["result"]
            # If result is not already a list, wrap it in a list to match the tool's return type
            if not isinstance(result_value, list):
                # Wrap the single value in a list
                structured["result"] = [result_value]
        return structured

    @requires_session
    async def read_cell(self, cell_index):
        result = await self._call_tool_safe("read_cell", {"cell_index": cell_index})
        return self._get_structured_content_safe(result) if result else None

    @requires_session
    async def read_cells(self):
        result = await self._session.call_tool("read_cells")  # type: ignore
        structured = self._get_structured_content_safe(result)
        
        # read_cells returns a list of cell dicts directly
        # If wrapped in {"result": ...}, unwrap it
        if structured and "result" in structured:
            cells_list = structured["result"]
            # If the result is a list of JSON strings, parse each one
            if isinstance(cells_list, list) and len(cells_list) > 0 and isinstance(cells_list[0], str):
                try:
                    cells_list = [json.loads(cell_str) for cell_str in cells_list]
                except (json.JSONDecodeError, TypeError):
                    pass
            return cells_list
        return structured

    @requires_session
    async def list_cells(self, max_retries=3):
        """List cells with retry mechanism for Windows compatibility"""
        for attempt in range(max_retries):
            try:
                result = await self._session.call_tool("list_cells")  # type: ignore
                text_result = self._extract_text_content(result)
                logging.debug(f"list_cells attempt {attempt + 1}: text_result type={type(text_result)}, len={len(text_result) if text_result else 0}")
                logging.debug(f"list_cells attempt {attempt + 1}: text_result[:500]={repr(text_result[:500]) if text_result else 'None'}")
                has_index_type = ("Index\tType" in text_result) if text_result else False
                logging.debug(f"list_cells attempt {attempt + 1}: has_index_type={has_index_type}")
                if text_result is not None and not text_result.startswith("Error") and "Index\tType" in text_result:
                    return text_result
                else:
                    logging.warning(f"list_cells returned unexpected result on attempt {attempt + 1}/{max_retries}")
                    if attempt < max_retries - 1:
                        await asyncio.sleep(0.5)
            except Exception as e:
                logging.error(f"list_cells failed on attempt {attempt + 1}/{max_retries}: {e}")
                if attempt < max_retries - 1:
                    await asyncio.sleep(0.5)
                else:
                    logging.error("list_cells failed after all retries")
                    return "Error: Failed to retrieve cell list after all retries"
                    
        return "Error: Failed to retrieve cell list after all retries"

    @requires_session
    async def list_kernels(self):
        """List all available kernels"""
        result = await self._session.call_tool("list_kernels")  # type: ignore
        return self._extract_text_content(result)

    @requires_session
    async def delete_cell(self, cell_index):
        result = await self._call_tool_safe("delete_cell", {"cell_index": cell_index})
        return self._get_structured_content_safe(result) if result else None

    @requires_session
    async def execute_cell_streaming(self, cell_index):
        result = await self._call_tool_safe("execute_cell_streaming", {"cell_index": cell_index})
        return self._get_structured_content_safe(result) if result else None
    
    @requires_session
    async def execute_cell_with_progress(self, cell_index):
        result = await self._call_tool_safe("execute_cell_with_progress", {"cell_index": cell_index})
        structured = self._get_structured_content_safe(result) if result else None
        
        # Handle JUPYTER_SERVER mode flattening list responses to single string
        if structured and "result" in structured:
            result_value = structured["result"]
            if not isinstance(result_value, list):
                structured["result"] = [result_value]
        return structured

    @requires_session
    async def execute_cell(self, cell_index, timeout_seconds=300, stream=False, progress_interval=5):
        result = await self._call_tool_safe("execute_cell", {
            "cell_index": cell_index,
            "timeout_seconds": timeout_seconds,
            "stream": stream,
            "progress_interval": progress_interval
        })
        structured = self._get_structured_content_safe(result) if result else None

        # Handle JUPYTER_SERVER mode flattening list responses to single string
        if structured and "result" in structured:
            result_value = structured["result"]
            if not isinstance(result_value, list):
                structured["result"] = [result_value]
        return structured

    @requires_session
    async def overwrite_cell_source(self, cell_index, cell_source):
        result = await self._call_tool_safe("overwrite_cell_source", {"cell_index": cell_index, "cell_source": cell_source})
        return self._get_structured_content_safe(result) if result else None

    @requires_session
    async def execute_ipython(self, code, timeout=60):
        result = await self._session.call_tool("execute_ipython", arguments={"code": code, "timeout": timeout})  # type: ignore
        structured = self._get_structured_content_safe(result)
        
        # execute_ipython should always return a list of outputs
        # If we got a plain string, wrap it as a list
        if structured and "result" in structured:
            result_val = structured["result"]
            if isinstance(result_val, str):
                # Single output string, wrap as list
                structured["result"] = [result_val]
            elif not isinstance(result_val, list):
                # Some other type, wrap as list
                structured["result"] = [result_val]
        
        return structured

    @requires_session
    async def append_execute_code_cell(self, cell_source):
        """Append and execute a code cell at the end of the notebook."""
        return await self.insert_execute_code_cell(-1, cell_source)

    @requires_session
    async def append_markdown_cell(self, cell_source):
        """Append a markdown cell at the end of the notebook."""
        return await self.insert_cell(-1, "markdown", cell_source)
    
    # Helper method to get cell count from list_cells output
    @requires_session
    async def get_cell_count(self):
        """Get the number of cells by parsing list_cells output"""
        cell_list = await self.list_cells()
        if "Error" in cell_list or "Index\tType" not in cell_list:
            return 0
        lines = cell_list.split('\n')
        data_lines = [line for line in lines if '\t' in line and not line.startswith('Index') and not line.startswith('-')]
        return len(data_lines)

```
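
A quick standalone usage sketch for `MCPClient` outside pytest; the URL is a placeholder assumption (point it at wherever the MCP server listens), and it is meant to be run from the repository root so `tests.test_common` is importable.

```python
# Standalone usage sketch for MCPClient; the URL below is an assumption.
import asyncio

from tests.test_common import MCPClient


async def main():
    async with MCPClient("http://localhost:4040") as client:
        tools = await client.list_tools()
        print([tool.name for tool in tools.tools])

        # Append a markdown cell at the end, read it back, then clean up.
        index = await client.get_cell_count()
        await client.append_markdown_cell("# Added via MCPClient")
        print(await client.read_cell(index))
        await client.delete_cell(index)


asyncio.run(main())
```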

--------------------------------------------------------------------------------
/tests/test_tools.py:
--------------------------------------------------------------------------------

```python
# Copyright (c) 2023-2024 Datalayer, Inc.
#
# BSD 3-Clause License

"""
Integration tests for Jupyter MCP Server - Both MCP_SERVER and JUPYTER_SERVER modes.

This test suite validates the Jupyter MCP Server in both deployment modes:

1. **MCP_SERVER Mode**: Standalone server using HTTP/WebSocket to Jupyter
2. **JUPYTER_SERVER Mode**: Extension with direct serverapp API access

Tests are parametrized to run against both modes using the same MCPClient,
ensuring consistent behavior across both deployment patterns.

Launch the tests:
```
$ pytest tests/test_tools.py -v
```
"""

import logging
import platform
from http import HTTPStatus

import pytest
import requests

from .test_common import MCPClient, JUPYTER_TOOLS, timeout_wrapper
from .conftest import JUPYTER_TOKEN


###############################################################################
# Health Tests
###############################################################################

def test_jupyter_health(jupyter_server):
    """Test the Jupyter server health"""
    logging.info(f"Testing service health ({jupyter_server})")
    response = requests.get(
        f"{jupyter_server}/api/status",
        headers={
            "Authorization": f"token {JUPYTER_TOKEN}",
        },
    )
    assert response.status_code == HTTPStatus.OK


@pytest.mark.parametrize(
    "jupyter_mcp_server,kernel_expected_status",
    [(True, "alive"), (False, "not_initialized")],
    indirect=["jupyter_mcp_server"],
    ids=["start_runtime", "no_runtime"],
)
def test_mcp_health(jupyter_mcp_server, kernel_expected_status):
    """Test the MCP Jupyter server health"""
    logging.info(f"Testing MCP server health ({jupyter_mcp_server})")
    response = requests.get(f"{jupyter_mcp_server}/api/healthz")
    assert response.status_code == HTTPStatus.OK
    data = response.json()
    logging.debug(data)
    assert data.get("status") == "healthy"
    assert data.get("kernel_status") == kernel_expected_status


@pytest.mark.asyncio
async def test_mcp_tool_list(mcp_client_parametrized: MCPClient):
    """Check that the list of tools can be retrieved in both MCP_SERVER and JUPYTER_SERVER modes"""
    async with mcp_client_parametrized:
        tools = await mcp_client_parametrized.list_tools()
    tools_name = [tool.name for tool in tools.tools]
    logging.debug(f"tools_name: {tools_name}")
    assert len(tools_name) == len(JUPYTER_TOOLS) and sorted(tools_name) == sorted(
        JUPYTER_TOOLS
    )


@pytest.mark.asyncio
@timeout_wrapper(30)
async def test_markdown_cell(mcp_client_parametrized: MCPClient, content="Hello **World** !"):
    """Test markdown cell manipulation in both MCP_SERVER and JUPYTER_SERVER modes"""

    async def check_and_delete_markdown_cell(client: MCPClient, index, content):
        """Check and delete a markdown cell"""
        # reading and checking the content of the created cell
        cell_info = await client.read_cell(index)
        logging.debug(f"cell_info: {cell_info}")
        assert cell_info["index"] == index
        assert cell_info["type"] == "markdown"
        # TODO: don't know if it's normal to get a list of characters instead of a string
        assert "".join(cell_info["source"]) == content
        # reading all cells
        cells_info = await client.read_cells()
        assert cells_info is not None, "read_cells result should not be None"
        logging.debug(f"cells_info: {cells_info}")
        # Check that our cell is in the expected position with correct content
        assert "".join(cells_info[index]["source"]) == content
        # delete created cell
        result = await client.delete_cell(index)
        assert result is not None, "delete_cell result should not be None"
        assert result["result"] == f"Cell {index} (markdown) deleted successfully."

    async with mcp_client_parametrized:
        # Get initial cell count
        initial_count = await mcp_client_parametrized.get_cell_count()
        
        # append markdown cell using -1 index
        result = await mcp_client_parametrized.insert_cell(-1, "markdown", content)
        assert result is not None, "insert_cell result should not be None"
        assert "Cell inserted successfully" in result["result"]
        assert f"index {initial_count} (markdown)" in result["result"]
        await check_and_delete_markdown_cell(mcp_client_parametrized, initial_count, content)
        
        # insert markdown cell at the end (safer than index 0)
        result = await mcp_client_parametrized.insert_cell(initial_count, "markdown", content)
        assert result is not None, "insert_cell result should not be None"
        assert "Cell inserted successfully" in result["result"]
        assert f"index {initial_count} (markdown)" in result["result"]
        await check_and_delete_markdown_cell(mcp_client_parametrized, initial_count, content)


@pytest.mark.asyncio
@timeout_wrapper(30)
async def test_code_cell(mcp_client_parametrized: MCPClient, content="1 + 1"):
    """Test code cell manipulation in both MCP_SERVER and JUPYTER_SERVER modes"""
    async def check_and_delete_code_cell(client: MCPClient, index, content):
        """Check and delete a code cell"""
        # reading and checking the content of the created cell
        cell_info = await client.read_cell(index)
        logging.debug(f"cell_info: {cell_info}")
        assert cell_info["index"] == index
        assert cell_info["type"] == "code"
        assert "".join(cell_info["source"]) == content
        # reading all cells
        cells_info = await client.read_cells()
        logging.debug(f"cells_info: {cells_info}")
        # read_cells returns the list directly (unwrapped)
        assert "".join(cells_info[index]["source"]) == content
        # delete created cell
        result = await client.delete_cell(index)
        assert result["result"] == f"Cell {index} (code) deleted successfully."

    async with mcp_client_parametrized:
        # Get initial cell count
        initial_count = await mcp_client_parametrized.get_cell_count()
        
        # append and execute code cell using -1 index
        index = initial_count
        code_result = await mcp_client_parametrized.insert_execute_code_cell(-1, content)
        logging.debug(f"code_result: {code_result}")
        assert code_result is not None, "insert_execute_code_cell result should not be None"
        assert len(code_result["result"]) > 0, "insert_execute_code_cell should return non-empty result"
        # The first output should be the execution result, convert to int for comparison
        first_output = code_result["result"][0]
        first_output_value = int(first_output) if isinstance(first_output, str) else first_output
        assert first_output_value == eval(content), f"Expected {eval(content)}, got {first_output_value}"
        await check_and_delete_code_cell(mcp_client_parametrized, index, content)
        
        # insert and execute code cell at the end (safer than index 0)
        index = initial_count
        code_result = await mcp_client_parametrized.insert_execute_code_cell(index, content)
        logging.debug(f"code_result: {code_result}")
        expected_result = eval(content)
        assert int(code_result["result"][0]) == expected_result
        # overwrite content and test different cell execution modes
        content = f"({content}) * 2"
        expected_result = eval(content)
        result = await mcp_client_parametrized.overwrite_cell_source(index, content)
        logging.debug(f"result: {result}")
        # The server returns a message with diff content
        assert "Cell" in result["result"] and "overwritten successfully" in result["result"]
        assert "diff" in result["result"]  # Should contain diff output
        code_result = await mcp_client_parametrized.execute_cell(index)
        assert int(code_result["result"][0]) == expected_result
        await check_and_delete_code_cell(mcp_client_parametrized, index, content)


@pytest.mark.asyncio
@timeout_wrapper(30)
async def test_list_cells(mcp_client_parametrized: MCPClient):
    """Test list_cells functionality in both MCP_SERVER and JUPYTER_SERVER modes"""
    async with mcp_client_parametrized:
        # Test initial list_cells (notebook.ipynb has multiple cells)
        cell_list = await mcp_client_parametrized.list_cells()
        logging.debug(f"Initial cell list: {cell_list}")
        assert isinstance(cell_list, str)
        
        # Check for error conditions and skip if network issues occur
        if cell_list.startswith("Error executing tool list_cells") or cell_list.startswith("Error: Failed to retrieve"):
            pytest.skip(f"Network timeout occurred during list_cells operation: {cell_list}")
        
        assert "Index\tType\tCount\tFirst Line" in cell_list
        # The notebook has both markdown and code cells - just verify structure
        lines = cell_list.split('\n')
        data_lines = [line for line in lines if '\t' in line and not line.startswith('Index')]
        assert len(data_lines) >= 1  # Should have at least some cells
        
        # Add a markdown cell and test again
        markdown_content = "# Test Markdown Cell"
        await mcp_client_parametrized.insert_cell(-1, "markdown", markdown_content)
        
        # Check list_cells with added markdown cell
        cell_list = await mcp_client_parametrized.list_cells()
        logging.debug(f"Cell list after adding markdown: {cell_list}")
        lines = cell_list.split('\n')
        
        # Should have header, separator, and multiple data lines
        assert len(lines) >= 4  # header + separator + at least some cells
        assert "Index\tType\tCount\tFirst Line" in lines[0]
        
        # Check that the added cell is listed
        data_lines = [line for line in lines if '\t' in line and not line.startswith('Index')]
        assert len(data_lines) >= 10  # Should have at least the original 10 cells
        
        # Check that our added cell appears in the list
        assert any("# Test Markdown Cell" in line for line in data_lines)
        
        # Add a code cell with long content to test truncation
        long_code = "# This is a very long comment that should be truncated when displayed in the list because it exceeds the 50 character limit"
        await mcp_client_parametrized.insert_execute_code_cell(-1, "print('Hello World')")
        
        # Check list_cells with truncated content
        cell_list = await mcp_client_parametrized.list_cells()
        logging.debug(f"Cell list after adding long code: {cell_list}")
        
        # Clean up by deleting added cells (in reverse order)
        # Get current cell count to determine indices of added cells
        current_count = await mcp_client_parametrized.get_cell_count()
        # Delete the last two cells we added
        await mcp_client_parametrized.delete_cell(current_count - 1)  # Remove the code cell
        await mcp_client_parametrized.delete_cell(current_count - 2)  # Remove the markdown cell

@pytest.mark.asyncio
@timeout_wrapper(30)
async def test_overwrite_cell_diff(mcp_client_parametrized: MCPClient):
    """Test overwrite_cell_source diff functionality in both MCP_SERVER and JUPYTER_SERVER modes"""
    async with mcp_client_parametrized:
        # Get initial cell count
        initial_count = await mcp_client_parametrized.get_cell_count()
        
        # Add a code cell with initial content
        initial_content = "x = 10\nprint(x)"
        await mcp_client_parametrized.append_execute_code_cell(initial_content)
        cell_index = initial_count
        
        # Overwrite with modified content
        new_content = "x = 20\ny = 30\nprint(x + y)"
        result = await mcp_client_parametrized.overwrite_cell_source(cell_index, new_content)
        
        # Verify diff output format
        assert result is not None, "overwrite_cell_source should not return None for valid input"
        result_text = result.get("result", "") if isinstance(result, dict) else str(result)
        assert f"Cell {cell_index} overwritten successfully!" in result_text
        assert "```diff" in result_text
        assert "```" in result_text  # Should have closing diff block
        
        # Verify diff content shows changes
        assert "-" in result_text  # Should show deletions
        assert "+" in result_text  # Should show additions
        
        # Test overwriting with identical content (no changes)
        result_no_change = await mcp_client_parametrized.overwrite_cell_source(cell_index, new_content)
        assert result_no_change is not None, "overwrite_cell_source should not return None"
        no_change_text = result_no_change.get("result", "") if isinstance(result_no_change, dict) else str(result_no_change)
        assert "no changes detected" in no_change_text
        
        # Test overwriting markdown cell
        await mcp_client_parametrized.append_markdown_cell("# Original Title")
        markdown_index = initial_count + 1
        
        markdown_result = await mcp_client_parametrized.overwrite_cell_source(markdown_index, "# Updated Title\n\nSome content")
        assert markdown_result is not None, "overwrite_cell_source should not return None for markdown cell"
        markdown_text = markdown_result.get("result", "") if isinstance(markdown_result, dict) else str(markdown_result)
        assert f"Cell {markdown_index} overwritten successfully!" in markdown_text
        assert "```diff" in markdown_text
        assert "Updated Title" in markdown_text
        
        # Clean up: delete the test cells
        await mcp_client_parametrized.delete_cell(markdown_index)  # Delete markdown cell first (higher index)
        await mcp_client_parametrized.delete_cell(cell_index)      # Then delete code cell

@pytest.mark.asyncio
@timeout_wrapper(30)
async def test_bad_index(mcp_client_parametrized: MCPClient, index=99):
    """Test behavior of all index-based tools if the index does not exist in both modes"""
    async with mcp_client_parametrized:
        assert await mcp_client_parametrized.read_cell(index) is None
        assert await mcp_client_parametrized.insert_cell(index, "markdown", "test") is None
        assert await mcp_client_parametrized.insert_execute_code_cell(index, "1 + 1") is None
        assert await mcp_client_parametrized.overwrite_cell_source(index, "1 + 1") is None
        assert await mcp_client_parametrized.execute_cell(index) is None
        assert await mcp_client_parametrized.delete_cell(index) is None


@pytest.mark.asyncio
@timeout_wrapper(30)
async def test_multimodal_output(mcp_client_parametrized: MCPClient):
    """Test multimodal output functionality with image generation in both modes"""
    async with mcp_client_parametrized:
        # Get initial cell count
        initial_count = await mcp_client_parametrized.get_cell_count()
        
        # Test image generation code using PIL (lightweight)
        image_code = """
from PIL import Image, ImageDraw
import io
import base64

# Create a simple test image using PIL
width, height = 200, 100
image = Image.new('RGB', (width, height), color='white')
draw = ImageDraw.Draw(image)

# Draw a simple pattern
draw.rectangle([10, 10, 190, 90], outline='blue', width=2)
draw.ellipse([20, 20, 80, 80], fill='red')
draw.text((100, 40), "Test Image", fill='black')

# Convert to PNG and display
buffer = io.BytesIO()
image.save(buffer, format='PNG')
buffer.seek(0)

# Display the image (this should generate image/png output)
from IPython.display import Image as IPythonImage, display
display(IPythonImage(buffer.getvalue()))
"""
        
        # Execute the image generation code
        result = await mcp_client_parametrized.insert_execute_code_cell(-1, image_code)
        cell_index = initial_count
        
        # Check that result is not None and contains outputs
        assert result is not None, "Result should not be None"
        assert "result" in result, "Result should contain 'result' key"
        outputs = result["result"]
        assert isinstance(outputs, list), "Outputs should be a list"
        
        # Check for image output or placeholder
        has_image_output = False
        for output in outputs:
            if isinstance(output, str):
                # Check for image placeholder or actual image content
                if ("Image Output (PNG)" in output or 
                    "image display" in output.lower() or
                    output.strip() == ''):
                    has_image_output = True
                    break
            elif isinstance(output, dict):
                # Check for ImageContent dictionary format (from safe_extract_outputs)
                if (output.get('type') == 'image' and 
                    'data' in output and 
                    output.get('mimeType') == 'image/png'):
                    has_image_output = True
                    logging.info(f"Found ImageContent object with {len(output['data'])} bytes of PNG data")
                    break
                # Check for nbformat output structure (from ExecutionStack)
                elif (output.get('output_type') == 'display_data' and 
                      'data' in output and 
                      'image/png' in output['data']):
                    has_image_output = True
                    png_data = output['data']['image/png']
                    logging.info(f"Found nbformat display_data with {len(png_data)} bytes of PNG data")
                    break
            elif hasattr(output, 'data') and hasattr(output, 'mimeType'):
                # This would be an actual ImageContent object
                if output.mimeType == "image/png":
                    has_image_output = True
                    break
        
        # We should have some indication of image output
        assert has_image_output, f"Expected image output indication, got: {outputs}"
        
        # Test with ALLOW_IMG_OUTPUT environment variable control
        # Note: In actual deployment, this would be controlled via environment variables
        # For testing, we just verify the code structure is correct
        logging.info(f"Multimodal test completed with outputs: {outputs}")
        
        # Clean up: delete the test cell
        await mcp_client_parametrized.delete_cell(cell_index)


###############################################################################
# Multi-Notebook Management Tests
###############################################################################

@pytest.mark.asyncio
@timeout_wrapper(30)
async def test_multi_notebook_management(mcp_client_parametrized: MCPClient):
    """Test multi-notebook management functionality in both modes"""
    async with mcp_client_parametrized:
        # Test initial state - should show default notebook or no notebooks
        initial_list = await mcp_client_parametrized.list_notebooks()
        logging.debug(f"Initial notebook list: {initial_list}")
        
        # Connect to a new notebook
        connect_result = await mcp_client_parametrized.use_notebook("test_notebooks", "new.ipynb", "connect")
        logging.debug(f"Connect result: {connect_result}")
        assert "Successfully using notebook 'test_notebooks'" in connect_result
        assert "new.ipynb" in connect_result
        
        # List notebooks - should now show the connected notebook
        notebook_list = await mcp_client_parametrized.list_notebooks()
        logging.debug(f"Notebook list after connect: {notebook_list}")
        assert "test_notebooks" in notebook_list
        assert "new.ipynb" in notebook_list
        assert "✓" in notebook_list  # Should be marked as current
        
        # Try to connect to the same notebook again (should fail)
        duplicate_result = await mcp_client_parametrized.use_notebook("test_notebooks", "new.ipynb")
        assert "already using" in duplicate_result
        
        # Test switching between notebooks
        if "default" in notebook_list:
            use_result = await mcp_client_parametrized.use_notebook("default")
            logging.debug(f"Switch to default result: {use_result}")
            assert "Successfully switched to notebook 'default'" in use_result
            
            # Switch back to test notebook
            use_back_result = await mcp_client_parametrized.use_notebook("test_notebooks")
            assert "Successfully switched to notebook 'test_notebooks'" in use_back_result
        
        # Test cell operations on the new notebook
        # First get the cell count of new.ipynb (should have some cells)
        cell_count = await mcp_client_parametrized.get_cell_count()
        assert cell_count >= 2, f"new.ipynb should have at least 2 cells, got {cell_count}"
        
        # Add a test cell to the new notebook
        test_content = "# Multi-notebook test\nprint('Testing multi-notebook')"
        insert_result = await mcp_client_parametrized.insert_cell(-1, "code", test_content)
        assert "Cell inserted successfully" in insert_result["result"]
        
        # Execute the cell
        execute_result = await mcp_client_parametrized.insert_execute_code_cell(-1, "2 + 3")
        assert "5" in str(execute_result["result"])
        
        # Test restart notebook
        restart_result = await mcp_client_parametrized.restart_notebook("test_notebooks")
        logging.debug(f"Restart result: {restart_result}")
        assert "restarted successfully" in restart_result
        
        # Test unuse notebook
        disconnect_result = await mcp_client_parametrized.unuse_notebook("test_notebooks")
        logging.debug(f"Unuse result: {disconnect_result}")
        assert "unused successfully" in disconnect_result
        
        # Verify notebook is no longer in the list
        final_list = await mcp_client_parametrized.list_notebooks()
        logging.debug(f"Final notebook list: {final_list}")
        if "No notebooks are currently connected" not in final_list:
            assert "test_notebooks" not in final_list


@pytest.mark.asyncio
@timeout_wrapper(30)
async def test_multi_notebook_cell_operations(mcp_client_parametrized: MCPClient):
    """Test cell operations across multiple notebooks in both modes"""
    async with mcp_client_parametrized:
        # Connect to the new notebook
        await mcp_client_parametrized.use_notebook("notebook_a", "new.ipynb")
        
        # Get initial cell count for notebook A
        count_a = await mcp_client_parametrized.get_cell_count()
        
        # Add a cell to notebook A
        await mcp_client_parametrized.insert_cell(-1, "markdown", "# This is notebook A")
        
        # Connect to default notebook (if it exists)
        try:
            # Try to connect to notebook.ipynb as notebook_b
            await mcp_client_parametrized.use_notebook("notebook_b", "notebook.ipynb")
            
            # Switch to notebook B
            await mcp_client_parametrized.use_notebook("notebook_b")
            
            # Get cell count for notebook B
            count_b = await mcp_client_parametrized.get_cell_count()
            
            # Add a cell to notebook B
            await mcp_client_parametrized.insert_cell(-1, "markdown", "# This is notebook B")
            
            # Switch back to notebook A
            await mcp_client_parametrized.use_notebook("notebook_a")
            
            # Verify we're working with notebook A
            cell_list_a = await mcp_client_parametrized.list_cells()
            assert "This is notebook A" in cell_list_a
            
            # Switch to notebook B and verify
            await mcp_client_parametrized.use_notebook("notebook_b")
            cell_list_b = await mcp_client_parametrized.list_cells()
            assert "This is notebook B" in cell_list_b
            
            # Clean up - unuse both notebooks
            await mcp_client_parametrized.unuse_notebook("notebook_a")
            await mcp_client_parametrized.unuse_notebook("notebook_b")
            
        except Exception as e:
            logging.warning(f"Could not test with notebook.ipynb: {e}")
            # Clean up notebook A only
            await mcp_client_parametrized.unuse_notebook("notebook_a")


@pytest.mark.asyncio 
@timeout_wrapper(30)
async def test_notebooks_error_cases(mcp_client_parametrized: MCPClient):
    """Test error handling for notebook management in both modes"""
    async with mcp_client_parametrized:
        # Test connecting to non-existent notebook
        error_result = await mcp_client_parametrized.use_notebook("nonexistent", "nonexistent.ipynb")
        logging.debug(f"Nonexistent notebook result: {error_result}")
        assert "not found" in error_result.lower() or "not a valid file" in error_result.lower()
        
        # Test operations on non-used notebook
        restart_error = await mcp_client_parametrized.restart_notebook("nonexistent_notebook")
        assert "not connected" in restart_error
        
        disconnect_error = await mcp_client_parametrized.unuse_notebook("nonexistent_notebook") 
        assert "not connected" in disconnect_error
        
        use_error = await mcp_client_parametrized.use_notebook("nonexistent_notebook")
        assert "not connected" in use_error
        
        # Test invalid notebook paths
        invalid_path_result = await mcp_client_parametrized.use_notebook("test", "../invalid/path.ipynb")
        assert "not found" in invalid_path_result.lower() or "not a valid file" in invalid_path_result.lower()


###############################################################################
# execute_ipython Tests
###############################################################################

@pytest.mark.asyncio
@timeout_wrapper(30)
async def test_execute_ipython_python_code(mcp_client_parametrized: MCPClient):
    """Test execute_ipython with basic Python code in both modes"""
    async with mcp_client_parametrized:
        # Test simple Python code
        result = await mcp_client_parametrized.execute_ipython("print('Hello IPython World!')")
        
        # On Windows, if result is None it's likely due to timeout - skip the test
        if platform.system() == "Windows" and result is None:
            pytest.skip("execute_ipython timed out on Windows - known platform limitation")
        
        assert result is not None, "execute_ipython result should not be None"
        assert "result" in result, "Result should contain 'result' key"
        outputs = result["result"]
        assert isinstance(outputs, list), "Outputs should be a list"
        
        # Check for expected output
        output_text = "".join(str(output) for output in outputs)
        assert "Hello IPython World!" in output_text or "[No output generated]" in output_text
        
        # Test mathematical calculation
        calc_result = await mcp_client_parametrized.execute_ipython("result = 2 ** 10\nprint(f'2^10 = {result}')")
        
        if platform.system() == "Windows" and calc_result is None:
            pytest.skip("execute_ipython timed out on Windows - known platform limitation")
            
        assert calc_result is not None
        calc_outputs = calc_result["result"]
        calc_text = "".join(str(output) for output in calc_outputs)
        assert "2^10 = 1024" in calc_text or "[No output generated]" in calc_text


@pytest.mark.asyncio
@timeout_wrapper(30)
async def test_execute_ipython_magic_commands(mcp_client_parametrized: MCPClient):
    """Test execute_ipython with IPython magic commands in both modes"""
    async with mcp_client_parametrized:
        # Test %who magic command (list variables)
        result = await mcp_client_parametrized.execute_ipython("%who")
        
        # On Windows, if result is None it's likely due to timeout - skip the test
        if platform.system() == "Windows" and result is None:
            pytest.skip("execute_ipython timed out on Windows - known platform limitation")
        
        assert result is not None, "execute_ipython result should not be None"
        outputs = result["result"]
        assert isinstance(outputs, list), "Outputs should be a list"
        
        # Set a variable first, then use %who to see it
        var_result = await mcp_client_parametrized.execute_ipython("test_var = 42")
        if platform.system() == "Windows" and var_result is None:
            pytest.skip("execute_ipython timed out on Windows - known platform limitation")
            
        who_result = await mcp_client_parametrized.execute_ipython("%who")
        if platform.system() == "Windows" and who_result is None:
            pytest.skip("execute_ipython timed out on Windows - known platform limitation")
            
        who_outputs = who_result["result"]
        who_text = "".join(str(output) for output in who_outputs)
        # %who may or may not list test_var depending on kernel state; this
        # mainly ensures the magic executes without crashing
        assert isinstance(who_text, str)
        
        # Test %timeit magic command
        timeit_result = await mcp_client_parametrized.execute_ipython("%timeit sum(range(100))")
        if platform.system() == "Windows" and timeit_result is None:
            pytest.skip("execute_ipython timed out on Windows - known platform limitation")
            
        assert timeit_result is not None
        timeit_outputs = timeit_result["result"]
        timeit_text = "".join(str(output) for output in timeit_outputs)
        # timeit should produce some timing output or complete without error
        assert len(timeit_text) >= 0  # Just ensure no crash


@pytest.mark.asyncio 
@timeout_wrapper(30)
async def test_execute_ipython_shell_commands(mcp_client_parametrized: MCPClient):
    """Test execute_ipython with shell commands in both modes"""
    async with mcp_client_parametrized:
        # Test basic shell command - echo (works on most systems)
        result = await mcp_client_parametrized.execute_ipython("!echo 'Hello from shell'")
        
        # On Windows, if result is None it's likely due to timeout - skip the test
        if platform.system() == "Windows" and result is None:
            pytest.skip("execute_ipython timed out on Windows - known platform limitation")
        
        assert result is not None, "execute_ipython result should not be None"
        outputs = result["result"]
        assert isinstance(outputs, list), "Outputs should be a list"
        
        output_text = "".join(str(output) for output in outputs)
        # Shell command should either work or be handled gracefully
        assert len(output_text) >= 0  # Just ensure no crash
        
        # Test Python version check
        python_result = await mcp_client_parametrized.execute_ipython("!python --version")
        if platform.system() == "Windows" and python_result is None:
            pytest.skip("execute_ipython timed out on Windows - known platform limitation")
            
        assert python_result is not None
        python_outputs = python_result["result"]
        python_text = "".join(str(output) for output in python_outputs)
        # Should show Python version or complete without error
        assert len(python_text) >= 0


@pytest.mark.asyncio
@timeout_wrapper(30)
async def test_execute_ipython_timeout(mcp_client_parametrized: MCPClient):
    """Test execute_ipython timeout functionality in both modes"""
    async with mcp_client_parametrized:
        # Test with very short timeout on a potentially long-running command
        result = await mcp_client_parametrized.execute_ipython("import time; time.sleep(5)", timeout=2)
        
        # On Windows, if result is None it's likely due to timeout - skip the test
        if platform.system() == "Windows" and result is None:
            pytest.skip("execute_ipython timed out on Windows - known platform limitation")
        
        assert result is not None
        outputs = result["result"]
        output_text = "".join(str(output) for output in outputs)
        # Smoke check only: either the timeout marker appears or the call
        # completed; the assertion is deliberately permissive
        assert "TIMEOUT ERROR" in output_text or len(output_text) >= 0


@pytest.mark.asyncio
@timeout_wrapper(30)
async def test_execute_ipython_error_handling(mcp_client_parametrized: MCPClient):
    """Test execute_ipython error handling in both modes"""
    async with mcp_client_parametrized:
        # Test syntax error
        result = await mcp_client_parametrized.execute_ipython("invalid python syntax <<<")
        
        # On Windows, if result is None it's likely due to timeout - skip the test
        if platform.system() == "Windows" and result is None:
            pytest.skip("execute_ipython timed out on Windows - known platform limitation")
        
        assert result is not None
        outputs = result["result"]
        output_text = "".join(str(output) for output in outputs)
        # Should handle the error gracefully
        assert len(output_text) >= 0  # Ensure no crash
        
        # Test runtime error  
        runtime_result = await mcp_client_parametrized.execute_ipython("undefined_variable")
        if platform.system() == "Windows" and runtime_result is None:
            pytest.skip("execute_ipython timed out on Windows - known platform limitation")
            
        assert runtime_result is not None
        runtime_outputs = runtime_result["result"]
        runtime_text = "".join(str(output) for output in runtime_outputs)
        # Should handle the error gracefully
        assert len(runtime_text) >= 0
```

--------------------------------------------------------------------------------
/jupyter_mcp_server/utils.py:
--------------------------------------------------------------------------------

```python
# Copyright (c) 2023-2024 Datalayer, Inc.
#
# BSD 3-Clause License

import re
from typing import Any, Union
from mcp.types import ImageContent
from .env import ALLOW_IMG_OUTPUT


def get_current_notebook_context(notebook_manager=None):
    """
    Get the current notebook path and kernel ID for JUPYTER_SERVER mode.
    
    Args:
        notebook_manager: NotebookManager instance (optional)
        
    Returns:
        Tuple of (notebook_path, kernel_id)
        Falls back to config values if notebook_manager not provided
    """
    from .config import get_config
    
    notebook_path = None
    kernel_id = None
    
    if notebook_manager:
        # Try to get current notebook info from manager
        notebook_path = notebook_manager.get_current_notebook_path()
        current_notebook = notebook_manager.get_current_notebook() or "default"
        kernel_id = notebook_manager.get_kernel_id(current_notebook)
    
    # Fallback to config if not found in manager
    if not notebook_path or not kernel_id:
        config = get_config()
        if not notebook_path:
            notebook_path = config.document_id
        if not kernel_id:
            kernel_id = config.runtime_id
    
    return notebook_path, kernel_id
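
# Example (illustrative): resolve the active notebook/kernel pair; with no
# manager supplied this falls back to the configured document/runtime IDs.
# notebook_path, kernel_id = get_current_notebook_context()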


def extract_output(output: Any) -> Union[str, ImageContent]:
    """
    Extracts readable output from a Jupyter cell output dictionary.
    Handles both traditional and CRDT-based Jupyter formats.

    Args:
        output: The output from a Jupyter cell (dict or CRDT object).

    Returns:
        Union[str, ImageContent]: A string representation of the output, or
        an ImageContent object for PNG image data.
    """
    # Handle pycrdt._text.Text objects
    if hasattr(output, 'source'):
        return str(output.source)
    
    # Handle CRDT YText objects
    if hasattr(output, '__str__') and 'Text' in str(type(output)):
        text_content = str(output)
        return strip_ansi_codes(text_content)
    
    # Handle lists (common in error tracebacks); str() guards against the
    # rare case of an ImageContent item appearing inside a list
    if isinstance(output, list):
        return '\n'.join(str(extract_output(item)) for item in output)
    
    # Handle traditional dictionary format
    if not isinstance(output, dict):
        return strip_ansi_codes(str(output))
    
    output_type = output.get("output_type")
    
    if output_type == "stream":
        text = output.get("text", "")
        if isinstance(text, list):
            text = ''.join(text)
        elif hasattr(text, 'source'):
            text = str(text.source)
        return strip_ansi_codes(str(text))
    
    elif output_type in ["display_data", "execute_result"]:
        data = output.get("data", {})
        if "image/png" in data:
            if ALLOW_IMG_OUTPUT:
                try:
                    return ImageContent(type="image", data=data["image/png"], mimeType="image/png")
                except Exception:
                    # Fallback to text placeholder on error
                    return "[Image Output (PNG) - Error processing image]"
            else:
                return "[Image Output (PNG) - Image display disabled]"
        if "text/plain" in data:
            plain_text = data["text/plain"]
            if hasattr(plain_text, 'source'):
                plain_text = str(plain_text.source)
            return strip_ansi_codes(str(plain_text))
        elif "text/html" in data:
            return "[HTML Output]"
        else:
            return f"[{output_type} Data: keys={list(data.keys())}]"
    
    elif output_type == "error":
        traceback = output.get("traceback", [])
        if isinstance(traceback, list):
            clean_traceback = []
            for line in traceback:
                if hasattr(line, 'source'):
                    line = str(line.source)
                clean_traceback.append(strip_ansi_codes(str(line)))
            return '\n'.join(clean_traceback)
        else:
            if hasattr(traceback, 'source'):
                traceback = str(traceback.source)
            return strip_ansi_codes(str(traceback))
    
    else:
        return f"[Unknown output type: {output_type}]"
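
# Examples (illustrative) of the common shapes handled above:
# >>> extract_output({"output_type": "stream", "text": ["2 + 2 = ", "4\n"]})
# '2 + 2 = 4\n'
# >>> extract_output({"output_type": "error", "traceback": ["NameError: x"]})
# 'NameError: x'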


def strip_ansi_codes(text: str) -> str:
    """Remove ANSI SGR (color/style) escape sequences from text."""
    ansi_escape = re.compile(r'\x1b\[[0-9;]*m')
    return ansi_escape.sub('', text)
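
# Example (illustrative): SGR color codes are stripped, plain text is unchanged.
# >>> strip_ansi_codes('\x1b[31mTraceback\x1b[0m')
# 'Traceback'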


def _clean_notebook_outputs(notebook):
    """Remove transient fields from all cell outputs.
    
    The 'transient' field is part of the Jupyter kernel messaging protocol
    but is NOT part of the nbformat schema. This causes validation errors.
    
    Args:
        notebook: nbformat notebook object to clean (modified in place)
    """
    for cell in notebook.cells:
        if cell.cell_type == 'code' and hasattr(cell, 'outputs'):
            for output in cell.outputs:
                if isinstance(output, dict) and 'transient' in output:
                    del output['transient']


def safe_extract_outputs(outputs: Any) -> list[Union[str, ImageContent]]:
    """
    Safely extract all outputs from a cell, handling CRDT structures.
    
    Args:
        outputs: Cell outputs (could be CRDT YArray or traditional list)
        
    Returns:
        list[Union[str, ImageContent]]: List of outputs (strings or image content)
    """
    if not outputs:
        return []
    
    result = []
    
    # Handle CRDT YArray or list of outputs
    if hasattr(outputs, '__iter__') and not isinstance(outputs, (str, dict)):
        try:
            for output in outputs:
                extracted = extract_output(output)
                if extracted:
                    result.append(extracted)
        except Exception as e:
            result.append(f"[Error extracting output: {str(e)}]")
    else:
        # Handle single output
        extracted = extract_output(outputs)
        if extracted:
            result.append(extracted)
    
    return result
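
# Example (illustrative): a plain list of nbformat-style outputs flattens to
# strings and/or ImageContent items.
# >>> safe_extract_outputs([{"output_type": "stream", "text": "hi\n"}])
# ['hi\n']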

def normalize_cell_source(source: Any) -> list[str]:
    """
    Normalize cell source to a list of strings (lines).
    
    In Jupyter notebooks, source can be either:
    - A string (single or multi-line with \n)  
    - A list of strings (each element is a line)
    - CRDT text objects
    
    Args:
        source: The source from a Jupyter cell
        
    Returns:
        list[str]: List of source lines
    """
    if not source:
        return []
    
    # Handle CRDT text objects
    if hasattr(source, 'source'):
        source = str(source.source)
    elif hasattr(source, '__str__') and 'Text' in str(type(source)):
        source = str(source)
    
    # If it's already a list, return as is
    if isinstance(source, list):
        return [str(line) for line in source]
    
    # If it's a string, split by newlines
    if isinstance(source, str):
        # Split by newlines but preserve the newline characters except for the last line
        lines = source.splitlines(keepends=True)
        # Remove trailing newline from the last line if present
        if lines and lines[-1].endswith('\n'):
            lines[-1] = lines[-1][:-1]
        return lines
    
    # Fallback: convert to string and split
    return str(source).splitlines(keepends=True)
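
# Example (illustrative): embedded newlines are kept, the trailing newline on
# the last line is dropped.
# >>> normalize_cell_source("x = 1\nprint(x)\n")
# ['x = 1\n', 'print(x)']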

def format_TSV(headers: list[str], rows: list[list[str]]) -> str:
    """
    Format data as TSV (Tab-Separated Values)
    
    Args:
        headers: The list of headers
        rows: The list of data rows, each row is a list of strings
    
    Returns:
        The formatted TSV string
    """
    if not headers or not rows:
        return "No data to display"
    
    result = []
    
    header_row = "\t".join(headers)
    result.append(header_row)
    
    for row in rows:
        data_row = "\t".join(str(cell) for cell in row)
        result.append(data_row)
    
    return "\n".join(result)
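
# Example (illustrative):
# >>> format_TSV(["Index", "Type"], [["0", "code"], ["1", "markdown"]])
# 'Index\tType\n0\tcode\n1\tmarkdown'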

def get_surrounding_cells_info(notebook, cell_index: int, total_cells: int) -> str:
    """Get information about surrounding cells for context."""
    start_index = max(0, cell_index - 5)
    end_index = min(total_cells, cell_index + 6)
    
    if total_cells == 0:
        return "Notebook is now empty, no cells remaining"
    
    headers = ["Index", "Type", "Count", "First Line"]
    rows = []
    
    for i in range(start_index, end_index):
        if i >= total_cells:
            break
            
        cell_data = notebook[i]
        cell_type = cell_data.get("cell_type", "unknown")
        
        execution_count = (cell_data.get("execution_count") or "None") if cell_type == "code" else "N/A"
        # Get first line of source
        source_lines = normalize_cell_source(cell_data.get("source", ""))
        first_line = source_lines[0] if source_lines else ""
        # Mark the target cell
        marker = " <-- NEW" if i == cell_index else ""
        rows.append([i, cell_type, execution_count, first_line+marker])
    
    return format_TSV(headers, rows)


###############################################################################
# Kernel and notebook operation helpers
###############################################################################


def create_kernel(config, logger):
    """Create a new kernel instance using current configuration."""
    from jupyter_kernel_client import KernelClient
    kernel = None
    try:
        # Initialize the kernel client with the provided parameters.
        kernel = KernelClient(
            server_url=config.runtime_url, 
            token=config.runtime_token, 
            kernel_id=config.runtime_id
        )
        kernel.start()
        logger.info("Kernel created and started successfully")
        return kernel
    except Exception as e:
        logger.error(f"Failed to create kernel: {e}")
        # Clean up partially initialized kernel to prevent __del__ errors
        if kernel is not None:
            try:
                # Try to clean up the kernel object if it exists
                if hasattr(kernel, 'stop'):
                    kernel.stop()
            except Exception as cleanup_error:
                logger.debug(f"Error during kernel cleanup: {cleanup_error}")
        raise
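
# Example (hedged sketch): `config` is any object exposing runtime_url,
# runtime_token and runtime_id (e.g. the object returned by get_config()).
# kernel = create_kernel(get_config(), logging.getLogger(__name__))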


def start_kernel(notebook_manager, config, logger):
    """Start the Jupyter kernel with error handling (for backward compatibility)."""
    try:
        # Remove existing default notebook if any
        if "default" in notebook_manager:
            notebook_manager.remove_notebook("default")
        
        # Create and set up new kernel
        kernel = create_kernel(config, logger)
        notebook_manager.add_notebook("default", kernel)
        logger.info("Default notebook kernel started successfully")
    except Exception as e:
        logger.error(f"Failed to start kernel: {e}")
        raise


def ensure_kernel_alive(notebook_manager, current_notebook, create_kernel_fn):
    """Ensure kernel is running, restart if needed."""
    return notebook_manager.ensure_kernel_alive(current_notebook, create_kernel_fn)


async def execute_cell_with_timeout(notebook, cell_index, kernel, timeout_seconds, logger):
    """Execute a cell with timeout and real-time output sync."""
    import asyncio
    import time
    from concurrent.futures import ThreadPoolExecutor
    
    start_time = time.time()
    
    def _execute_sync():
        return notebook.execute_cell(cell_index, kernel)
    
    executor = ThreadPoolExecutor(max_workers=1)
    try:
        future = executor.submit(_execute_sync)
        
        while not future.done():
            elapsed = time.time() - start_time
            if elapsed > timeout_seconds:
                future.cancel()
                raise asyncio.TimeoutError(f"Cell execution timed out after {timeout_seconds} seconds")
            
            await asyncio.sleep(2)
            try:
                # Try to force document sync using the correct method
                ydoc = notebook._doc
                if hasattr(ydoc, 'flush') and callable(ydoc.flush):
                    ydoc.flush()  # Flush pending changes
                elif hasattr(notebook, '_websocket') and notebook._websocket:
                    # Force a small update to trigger sync
                    pass  # The websocket should auto-sync
                
                if cell_index < len(ydoc._ycells):
                    outputs = ydoc._ycells[cell_index].get("outputs", [])
                    if outputs:
                        logger.info(f"Cell {cell_index} executing... ({elapsed:.1f}s) - {len(outputs)} outputs so far")
            except Exception as e:
                logger.debug(f"Sync attempt failed: {e}")
                pass
        
        result = future.result()
        return result
        
    finally:
        executor.shutdown(wait=False)


async def execute_cell_with_forced_sync(notebook, cell_index, kernel, timeout_seconds, logger):
    """Execute cell with forced real-time synchronization."""
    import asyncio
    import time
    
    start_time = time.time()
    
    # Start execution
    execution_future = asyncio.create_task(
        asyncio.to_thread(notebook.execute_cell, cell_index, kernel)
    )
    
    last_output_count = 0
    
    while not execution_future.done():
        elapsed = time.time() - start_time
        
        if elapsed > timeout_seconds:
            execution_future.cancel()
            try:
                if hasattr(kernel, 'interrupt'):
                    kernel.interrupt()
            except Exception:
                pass
            raise asyncio.TimeoutError(f"Cell execution timed out after {timeout_seconds} seconds")
        
        # Check for new outputs and try to trigger sync
        try:
            ydoc = notebook._doc
            current_outputs = ydoc._ycells[cell_index].get("outputs", [])
            
            if len(current_outputs) > last_output_count:
                last_output_count = len(current_outputs)
                logger.info(f"Cell {cell_index} progress: {len(current_outputs)} outputs after {elapsed:.1f}s")
                
                # Try different sync methods
                try:
                    # Method 1: Force Y-doc update
                    if hasattr(ydoc, 'observe') and hasattr(ydoc, 'unobserve'):
                        # Trigger observers by making a tiny change
                        pass
                        
                    # Method 2: Force websocket message
                    if hasattr(notebook, '_websocket') and notebook._websocket:
                        # The websocket should automatically sync on changes
                        pass
                        
                except Exception as sync_error:
                    logger.debug(f"Sync method failed: {sync_error}")
                    
        except Exception as e:
            logger.debug(f"Output check failed: {e}")
        
        await asyncio.sleep(1)  # Check every second
    
    # Get final result
    try:
        await execution_future
    except asyncio.CancelledError:
        pass
    
    return None


def is_kernel_busy(kernel):
    """Check if kernel is currently executing something.

    Note: this uses ``is_alive()`` as a liveness proxy rather than a true
    busy check; adapt it to your kernel client for genuine idle detection.
    """
    try:
        if hasattr(kernel, '_client') and hasattr(kernel._client, 'is_alive'):
            return kernel._client.is_alive()
        return False
    except Exception:
        return False


async def wait_for_kernel_idle(kernel, logger, max_wait_seconds=60):
    """Wait for kernel to become idle before proceeding."""
    import asyncio
    import time
    
    start_time = time.time()
    while is_kernel_busy(kernel):
        elapsed = time.time() - start_time
        if elapsed > max_wait_seconds:
            logger.warning(f"Kernel still busy after {max_wait_seconds}s, proceeding anyway")
            break
        logger.info(f"Waiting for kernel to become idle... ({elapsed:.1f}s)")
        await asyncio.sleep(1)
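
# Example (hedged sketch): block (up to max_wait_seconds) until the kernel
# no longer reports busy before issuing the next request.
# await wait_for_kernel_idle(kernel, logger, max_wait_seconds=60)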


async def safe_notebook_operation(operation_func, logger, max_retries=3):
    """Safely execute notebook operations with connection recovery."""
    import asyncio
    
    for attempt in range(max_retries):
        try:
            return await operation_func()
        except Exception as e:
            error_msg = str(e).lower()
            if any(err in error_msg for err in ["websocketclosederror", "connection is already closed", "connection closed"]):
                if attempt < max_retries - 1:
                    logger.warning(f"Connection lost, retrying... (attempt {attempt + 1}/{max_retries})")
                    await asyncio.sleep(1 + attempt)  # Increasing delay
                    continue
                else:
                    logger.error(f"Failed after {max_retries} attempts: {e}")
                    raise Exception(f"Connection failed after {max_retries} retries: {e}")
            else:
                # Non-connection error: re-raise immediately, preserving the traceback
                raise
    
    raise Exception("Unexpected error in retry logic")
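
# Example (hedged sketch): retry a websocket-backed call; `notebook` is a
# hypothetical client whose async fetch may drop the connection.
# result = await safe_notebook_operation(lambda: notebook.fetch_cell(0), logger)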


def list_files_recursively(server_client, current_path="", current_depth=0, files=None, max_depth=3):
    """Recursively list all files and directories in the Jupyter server."""
    if files is None:
        files = []
    
    # Stop if we've reached max depth
    if current_depth > max_depth:
        return files
    
    try:
        contents = server_client.contents.list_directory(current_path)
        for item in contents:
            full_path = f"{current_path}/{item.name}" if current_path else item.name
            
            # Format size
            size_str = ""
            if hasattr(item, 'size') and item.size is not None:
                if item.size < 1024:
                    size_str = f"{item.size}B"
                elif item.size < 1024 * 1024:
                    size_str = f"{item.size // 1024}KB"
                else:
                    size_str = f"{item.size // (1024 * 1024)}MB"
            
            # Format last modified
            last_modified = ""
            if hasattr(item, 'last_modified') and item.last_modified:
                last_modified = item.last_modified.strftime("%Y-%m-%d %H:%M:%S")
            
            # Add file/directory to list
            files.append({
                'path': full_path,
                'type': item.type,
                'size': size_str,
                'last_modified': last_modified
            })
            
            # Recursively explore directories
            if item.type == "directory":
                list_files_recursively(server_client, full_path, current_depth + 1, files, max_depth)
                
    except Exception as e:
        # If we can't access a directory, add an error entry
        files.append({
            'path': current_path or "root",
            'type': "error",
            'size': "",
            'last_modified': f"Error: {str(e)}"
        })
    
    return files
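
# Example (hedged sketch): `server_client` is assumed to expose
# `.contents.list_directory(path)` as used above.
# files = list_files_recursively(server_client, max_depth=2)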


###############################################################################
# Local code execution helpers (JUPYTER_SERVER mode)
###############################################################################


async def execute_via_execution_stack(
    serverapp: Any,
    kernel_id: str,
    code: str,
    document_id: str = None,
    cell_id: str = None,
    timeout: int = 300,
    poll_interval: float = 0.1,
    logger = None
) -> list[Union[str, ImageContent]]:
    """Execute code using ExecutionStack (JUPYTER_SERVER mode with jupyter-server-nbmodel).
    
    This uses the ExecutionStack from jupyter-server-nbmodel extension directly,
    avoiding the reentrant HTTP call issue. This is the preferred method for code
    execution in JUPYTER_SERVER mode.
    
    Args:
        serverapp: Jupyter server application instance
        kernel_id: Kernel ID to execute in
        code: Code to execute
        document_id: Optional document ID for RTC integration (format: json:notebook:<file_id>)
        cell_id: Optional cell ID for RTC integration
        timeout: Maximum time to wait for execution (seconds)
        poll_interval: Time between polling for results (seconds)
        logger: Logger instance (optional)
        
    Returns:
        List of formatted outputs (strings or ImageContent)
        
    Raises:
        RuntimeError: If jupyter-server-nbmodel extension is not installed
        TimeoutError: If execution exceeds timeout
    """
    import asyncio
    import logging as default_logging
    
    if logger is None:
        logger = default_logging.getLogger(__name__)
    
    try:
        # Get the ExecutionStack from the jupyter_server_nbmodel extension
        nbmodel_extensions = serverapp.extension_manager.extension_apps.get("jupyter_server_nbmodel", set())
        if not nbmodel_extensions:
            raise RuntimeError("jupyter_server_nbmodel extension not found. Please install it.")
        
        nbmodel_ext = next(iter(nbmodel_extensions))
        execution_stack = nbmodel_ext._Extension__execution_stack
        
        # Build metadata for RTC integration if available
        metadata = {}
        if document_id and cell_id:
            metadata = {
                "document_id": document_id,
                "cell_id": cell_id
            }
        
        # Submit execution request
        logger.info(f"Submitting execution request to kernel {kernel_id}")
        request_id = execution_stack.put(kernel_id, code, metadata)
        logger.info(f"Execution request {request_id} submitted")
        
        # Poll for results
        start_time = asyncio.get_event_loop().time()
        while True:
            elapsed = asyncio.get_event_loop().time() - start_time
            if elapsed > timeout:
                raise TimeoutError(f"Execution timed out after {timeout} seconds")
            
            # Get result (returns None if pending, result dict if complete)
            result = execution_stack.get(kernel_id, request_id)
            
            if result is not None:
                # Execution complete
                logger.info(f"Execution request {request_id} completed")
                
                # Check for errors
                if "error" in result:
                    error_info = result["error"]
                    logger.error(f"Execution error: {error_info}")
                    return [f"[ERROR: {error_info.get('ename', 'Unknown')}: {error_info.get('evalue', '')}]"]
                
                # Check for pending input (shouldn't happen with allow_stdin=False)
                if "input_request" in result:
                    logger.warning("Unexpected input request during execution")
                    return ["[ERROR: Unexpected input request]"]
                
                # Extract outputs
                outputs = result.get("outputs", [])
                
                # Parse JSON string if needed (ExecutionStack returns JSON string)
                if isinstance(outputs, str):
                    import json
                    try:
                        outputs = json.loads(outputs)
                    except json.JSONDecodeError:
                        logger.error(f"Failed to parse outputs JSON: {outputs}")
                        return ["[ERROR: Invalid output format]"]
                
                if outputs:
                    formatted = safe_extract_outputs(outputs)
                    logger.info(f"Execution completed with {len(formatted)} formatted outputs: {formatted}")
                    return formatted
                else:
                    logger.info("Execution completed with no outputs")
                    return ["[No output generated]"]
            
            # Still pending, wait before next poll
            await asyncio.sleep(poll_interval)
            
    except Exception as e:
        logger.error(f"Error executing via ExecutionStack: {e}", exc_info=True)
        return [f"[ERROR: {str(e)}]"]
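
# Example (hedged sketch): inside a running Jupyter server with
# jupyter-server-nbmodel installed, `serverapp` is the ServerApp instance.
# outputs = await execute_via_execution_stack(
#     serverapp, kernel_id=kernel_id, code="print('hello')", timeout=60
# )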


async def execute_code_local(
    serverapp,
    notebook_path: str,
    code: str,
    kernel_id: str,
    timeout: int = 300,
    logger=None
) -> list[Union[str, ImageContent]]:
    """Execute code in a kernel and return outputs (JUPYTER_SERVER mode).
    
    This is a centralized code execution function for JUPYTER_SERVER mode that:
    1. Gets the kernel from kernel_manager
    2. Creates a client and sends execute_request
    3. Polls for response messages with timeout
    4. Collects and formats outputs
    5. Cleans up resources
    
    Args:
        serverapp: Jupyter ServerApp instance
        notebook_path: Path to the notebook (for context)
        code: Code to execute
        kernel_id: ID of the kernel to execute in
        timeout: Timeout in seconds (default: 300)
        logger: Logger instance (optional)
        
    Returns:
        List of formatted outputs (strings or ImageContent)
    """
    import asyncio
    import zmq.asyncio
    from inspect import isawaitable
    
    if logger is None:
        import logging
        logger = logging.getLogger(__name__)
    
    try:
        # Get kernel manager
        kernel_manager = serverapp.kernel_manager
        
        # Get the kernel using pinned_superclass pattern (like KernelUsageHandler)
        lkm = kernel_manager.pinned_superclass.get_kernel(kernel_manager, kernel_id)
        session = lkm.session
        client = lkm.client()
        
        # Ensure channels are started (critical for receiving IOPub messages!)
        if not client.channels_running:
            client.start_channels()
            # Wait for channels to be ready
            await asyncio.sleep(0.1)
        
        # Send execute request on the shell channel. session.msg() builds the
        # full message; its header carries the msg_id we match replies against.
        shell_channel = client.shell_channel
        request_msg = session.msg("execute_request", {
            "code": code,
            "silent": False,
            "store_history": True,
            "user_expressions": {},
            "allow_stdin": False,
            "stop_on_error": False
        })
        shell_channel.send(request_msg)
        
        # Give a moment for messages to start flowing
        await asyncio.sleep(0.01)
        
        # Prepare to collect outputs
        outputs = []
        execution_done = False
        grace_period_ms = 100  # Wait 100ms after shell reply for remaining IOPub messages
        execution_done_time = None
        
        # Poll for messages with timeout
        poller = zmq.asyncio.Poller()
        iopub_socket = client.iopub_channel.socket
        shell_socket = shell_channel.socket
        poller.register(iopub_socket, zmq.POLLIN)
        poller.register(shell_socket, zmq.POLLIN)
        
        timeout_ms = timeout * 1000
        start_time = asyncio.get_event_loop().time()
        
        while not execution_done or (execution_done_time and (asyncio.get_event_loop().time() - execution_done_time) * 1000 < grace_period_ms):
            elapsed_ms = (asyncio.get_event_loop().time() - start_time) * 1000
            remaining_ms = max(0, timeout_ms - elapsed_ms)
            
            # If execution is done and grace period expired, exit
            if execution_done and execution_done_time and (asyncio.get_event_loop().time() - execution_done_time) * 1000 >= grace_period_ms:
                break
            
            if remaining_ms <= 0:
                client.stop_channels()
                logger.warning(f"Code execution timeout after {timeout}s, collected {len(outputs)} outputs")
                return [f"[TIMEOUT ERROR: Code execution exceeded {timeout} seconds]"]
            
            # Use shorter poll timeout during grace period
            poll_timeout = min(remaining_ms, grace_period_ms / 2) if execution_done else remaining_ms
            events = dict(await poller.poll(poll_timeout))
            
            if not events:
                continue  # No messages, continue polling
            
            # IMPORTANT: Process IOPub messages BEFORE shell to collect outputs before marking done
            # Check for IOPub messages (outputs)
            if iopub_socket in events:
                msg = client.iopub_channel.get_msg(timeout=0)
                # Handle async get_msg (like KernelUsageHandler)
                if isawaitable(msg):
                    msg = await msg
                
                if msg and msg.get('parent_header', {}).get('msg_id') == request_msg['header']['msg_id']:
                    msg_type = msg.get('msg_type')
                    content = msg.get('content', {})
                    
                    logger.debug(f"IOPub message: {msg_type}")
                    
                    # Collect output messages
                    if msg_type == 'stream':
                        outputs.append({
                            'output_type': 'stream',
                            'name': content.get('name', 'stdout'),
                            'text': content.get('text', '')
                        })
                        logger.debug(f"Collected stream output: {len(content.get('text', ''))} chars")
                    elif msg_type == 'execute_result':
                        outputs.append({
                            'output_type': 'execute_result',
                            'data': content.get('data', {}),
                            'metadata': content.get('metadata', {}),
                            'execution_count': content.get('execution_count')
                        })
                        logger.debug(f"Collected execute_result, count: {content.get('execution_count')}")
                    elif msg_type == 'display_data':
                        # Note: 'transient' field from kernel messages is NOT part of nbformat schema
                        # Only include 'output_type', 'data', and 'metadata' fields
                        outputs.append({
                            'output_type': 'display_data',
                            'data': content.get('data', {}),
                            'metadata': content.get('metadata', {})
                        })
                        logger.debug("Collected display_data")
                    elif msg_type == 'error':
                        outputs.append({
                            'output_type': 'error',
                            'ename': content.get('ename', ''),
                            'evalue': content.get('evalue', ''),
                            'traceback': content.get('traceback', [])
                        })
                        logger.debug(f"Collected error: {content.get('ename')}")
            
            # Check for shell reply (execution complete) - AFTER processing IOPub
            if shell_socket in events:
                reply = client.shell_channel.get_msg(timeout=0)
                # Handle async get_msg (like KernelUsageHandler)
                if isawaitable(reply):
                    reply = await reply
                
                if reply and reply.get('parent_header', {}).get('msg_id') == request_msg['header']['msg_id']:
                    logger.debug(f"Execution complete, reply status: {reply.get('content', {}).get('status')}")
                    execution_done = True
                    execution_done_time = asyncio.get_event_loop().time()
        
        # Clean up
        client.stop_channels()
        
        # Extract and format outputs
        if outputs:
            result = safe_extract_outputs(outputs)
            logger.info(f"Code execution completed with {len(result)} outputs")
            return result
        else:
            return ["[No output generated]"]
            
    except Exception as e:
        logger.error(f"Error executing code locally: {e}")
        return [f"[ERROR: {str(e)}]"]
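
# Example (hedged sketch): run ad-hoc code against an existing kernel and
# collect formatted outputs.
# outputs = await execute_code_local(
#     serverapp, "notebook.ipynb", "1 + 1", kernel_id, timeout=30
# )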


async def execute_cell_local(
    serverapp,
    notebook_path: str,
    cell_index: int,
    kernel_id: str,
    timeout: int = 300,
    logger=None
) -> list[Union[str, ImageContent]]:
    """Execute a cell in a notebook and return outputs (JUPYTER_SERVER mode).
    
    This function:
    1. Reads the cell source from the notebook (YDoc or file)
    2. Executes the code using execute_code_local
    3. Writes the outputs back to the notebook (YDoc or file)
    4. Returns the formatted outputs
    
    Args:
        serverapp: Jupyter ServerApp instance
        notebook_path: Path to the notebook
        cell_index: Index of the cell to execute
        kernel_id: ID of the kernel to execute in
        timeout: Timeout in seconds (default: 300)
        logger: Logger instance (optional)
        
    Returns:
        List of formatted outputs (strings or ImageContent)
    """
    import nbformat
    
    if logger is None:
        import logging
        logger = logging.getLogger(__name__)
    
    try:
        # Try to get YDoc first (for collaborative editing)
        file_id_manager = serverapp.web_app.settings.get("file_id_manager")
        ydoc = None
        
        if file_id_manager:
            file_id = file_id_manager.get_id(notebook_path)
            yroom_manager = serverapp.web_app.settings.get("yroom_manager")
            
            if yroom_manager:
                room_id = f"json:notebook:{file_id}"
                if yroom_manager.has_room(room_id):
                    try:
                        yroom = yroom_manager.get_room(room_id)
                        ydoc = await yroom.get_jupyter_ydoc()
                        logger.info(f"Using YDoc for cell {cell_index} execution")
                    except Exception as e:
                        logger.debug(f"Could not get YDoc: {e}")
        
        # Execute using YDoc or file
        if ydoc:
            # YDoc path - read from collaborative document
            if cell_index < 0 or cell_index >= len(ydoc.ycells):
                raise ValueError(f"Cell index {cell_index} out of range. Notebook has {len(ydoc.ycells)} cells.")
            
            cell = ydoc.ycells[cell_index]
            
            # Only execute code cells
            cell_type = cell.get("cell_type", "")
            if cell_type != "code":
                return [f"[Cell {cell_index} is not a code cell (type: {cell_type})]"]
            
            source_raw = cell.get("source", "")
            if isinstance(source_raw, list):
                source = "".join(source_raw)
            else:
                source = str(source_raw)
            
            if not source:
                return ["[Cell is empty]"]
            
            logger.info(f"Cell {cell_index} source from YDoc: {source[:100]}...")
            
            # Execute the code
            outputs = await execute_code_local(
                serverapp=serverapp,
                notebook_path=notebook_path,
                code=source,
                kernel_id=kernel_id,
                timeout=timeout,
                logger=logger
            )
            
            logger.info(f"Execution completed with {len(outputs)} outputs: {outputs}")
            
            # Update execution count in YDoc
            max_count = 0
            for c in ydoc.ycells:
                if c.get("cell_type") == "code" and c.get("execution_count"):
                    max_count = max(max_count, c["execution_count"])
            
            cell["execution_count"] = max_count + 1
            
            # Update outputs in YDoc (simplified - just store formatted strings)
            # YDoc outputs should match nbformat structure
            cell["outputs"] = []
            for output in outputs:
                if isinstance(output, str):
                    cell["outputs"].append({
                        "output_type": "stream",
                        "name": "stdout",
                        "text": output
                    })
            
            return outputs
        else:
            # File path - original logic
            # Read notebook as version 4 (latest) for consistency
            with open(notebook_path, 'r', encoding='utf-8') as f:
                notebook = nbformat.read(f, as_version=4)
            
            # Clean transient fields from outputs
            _clean_notebook_outputs(notebook)
            
            # Validate cell index
            if cell_index < 0 or cell_index >= len(notebook.cells):
                raise ValueError(f"Cell index {cell_index} out of range. Notebook has {len(notebook.cells)} cells.")
        
        # File path continues here: the YDoc branch above returned already,
        # so `notebook` is guaranteed to be bound at this point
        cell = notebook.cells[cell_index]
        
        # Only execute code cells
        if cell.cell_type != 'code':
            return [f"[Cell {cell_index} is not a code cell (type: {cell.cell_type})]"]
        
        # Get cell source
        source = cell.source
        if not source:
            return ["[Cell is empty]"]
        
        # Execute the code
        logger.info(f"Executing cell {cell_index} from {notebook_path}")
        outputs = await execute_code_local(
            serverapp=serverapp,
            notebook_path=notebook_path,
            code=source,
            kernel_id=kernel_id,
            timeout=timeout,
            logger=logger
        )
        
        # Write outputs back to notebook (update execution_count and outputs)
        # Get the last execution count
        max_count = 0
        for c in notebook.cells:
            if c.cell_type == 'code' and c.execution_count:
                max_count = max(max_count, c.execution_count)
        
        cell.execution_count = max_count + 1
        
        # Convert formatted outputs back to nbformat structure
        # Note: outputs is already formatted, so we need to reconstruct
        # For simplicity, we'll store a simple representation
        cell.outputs = []
        for output in outputs:
            if isinstance(output, str):
                # Create a stream output
                cell.outputs.append(nbformat.v4.new_output(
                    output_type='stream',
                    name='stdout',
                    text=output
                ))
            elif isinstance(output, ImageContent):
                # Create a display_data output with image
                cell.outputs.append(nbformat.v4.new_output(
                    output_type='display_data',
                    data={'image/png': output.data}
                ))
        
        # Write notebook back
        with open(notebook_path, 'w', encoding='utf-8') as f:
            nbformat.write(notebook, f)
        
        logger.info(f"Cell {cell_index} executed and notebook updated")
        return outputs
        
    except Exception as e:
        logger.error(f"Error executing cell locally: {e}")
        return [f"[ERROR: {str(e)}]"]
```