This is page 5 of 12. Use http://codebase.md/oraios/serena?lines=false&page={x} to view the full context. # Directory Structure ``` ├── .devcontainer │ └── devcontainer.json ├── .dockerignore ├── .env.example ├── .github │ ├── FUNDING.yml │ ├── ISSUE_TEMPLATE │ │ ├── config.yml │ │ ├── feature_request.md │ │ └── issue--bug--performance-problem--question-.md │ └── workflows │ ├── codespell.yml │ ├── docker.yml │ ├── junie.yml │ ├── lint_and_docs.yaml │ ├── publish.yml │ └── pytest.yml ├── .gitignore ├── .serena │ ├── memories │ │ ├── adding_new_language_support_guide.md │ │ ├── serena_core_concepts_and_architecture.md │ │ ├── serena_repository_structure.md │ │ └── suggested_commands.md │ └── project.yml ├── .vscode │ └── settings.json ├── CHANGELOG.md ├── CLAUDE.md ├── compose.yaml ├── CONTRIBUTING.md ├── docker_build_and_run.sh ├── DOCKER.md ├── Dockerfile ├── docs │ ├── custom_agent.md │ └── serena_on_chatgpt.md ├── flake.lock ├── flake.nix ├── lessons_learned.md ├── LICENSE ├── llms-install.md ├── public │ └── .gitignore ├── pyproject.toml ├── README.md ├── resources │ ├── serena-icons.cdr │ ├── serena-logo-dark-mode.svg │ ├── serena-logo.cdr │ ├── serena-logo.svg │ └── vscode_sponsor_logo.png ├── roadmap.md ├── scripts │ ├── agno_agent.py │ ├── demo_run_tools.py │ ├── gen_prompt_factory.py │ ├── mcp_server.py │ ├── print_mode_context_options.py │ └── print_tool_overview.py ├── src │ ├── interprompt │ │ ├── __init__.py │ │ ├── .syncCommitId.remote │ │ ├── .syncCommitId.this │ │ ├── jinja_template.py │ │ ├── multilang_prompt.py │ │ ├── prompt_factory.py │ │ └── util │ │ ├── __init__.py │ │ └── class_decorators.py │ ├── README.md │ ├── serena │ │ ├── __init__.py │ │ ├── agent.py │ │ ├── agno.py │ │ ├── analytics.py │ │ ├── cli.py │ │ ├── code_editor.py │ │ ├── config │ │ │ ├── __init__.py │ │ │ ├── context_mode.py │ │ │ └── serena_config.py │ │ ├── constants.py │ │ ├── dashboard.py │ │ ├── generated │ │ │ └── generated_prompt_factory.py │ │ ├── gui_log_viewer.py │ 
│ ├── mcp.py │ │ ├── project.py │ │ ├── prompt_factory.py │ │ ├── resources │ │ │ ├── config │ │ │ │ ├── contexts │ │ │ │ │ ├── agent.yml │ │ │ │ │ ├── chatgpt.yml │ │ │ │ │ ├── codex.yml │ │ │ │ │ ├── context.template.yml │ │ │ │ │ ├── desktop-app.yml │ │ │ │ │ ├── ide-assistant.yml │ │ │ │ │ └── oaicompat-agent.yml │ │ │ │ ├── internal_modes │ │ │ │ │ └── jetbrains.yml │ │ │ │ ├── modes │ │ │ │ │ ├── editing.yml │ │ │ │ │ ├── interactive.yml │ │ │ │ │ ├── mode.template.yml │ │ │ │ │ ├── no-onboarding.yml │ │ │ │ │ ├── onboarding.yml │ │ │ │ │ ├── one-shot.yml │ │ │ │ │ └── planning.yml │ │ │ │ └── prompt_templates │ │ │ │ ├── simple_tool_outputs.yml │ │ │ │ └── system_prompt.yml │ │ │ ├── dashboard │ │ │ │ ├── dashboard.js │ │ │ │ ├── index.html │ │ │ │ ├── jquery.min.js │ │ │ │ ├── serena-icon-16.png │ │ │ │ ├── serena-icon-32.png │ │ │ │ ├── serena-icon-48.png │ │ │ │ ├── serena-logs-dark-mode.png │ │ │ │ └── serena-logs.png │ │ │ ├── project.template.yml │ │ │ └── serena_config.template.yml │ │ ├── symbol.py │ │ ├── text_utils.py │ │ ├── tools │ │ │ ├── __init__.py │ │ │ ├── cmd_tools.py │ │ │ ├── config_tools.py │ │ │ ├── file_tools.py │ │ │ ├── jetbrains_plugin_client.py │ │ │ ├── jetbrains_tools.py │ │ │ ├── memory_tools.py │ │ │ ├── symbol_tools.py │ │ │ ├── tools_base.py │ │ │ └── workflow_tools.py │ │ └── util │ │ ├── class_decorators.py │ │ ├── exception.py │ │ ├── file_system.py │ │ ├── general.py │ │ ├── git.py │ │ ├── inspection.py │ │ ├── logging.py │ │ ├── shell.py │ │ └── thread.py │ └── solidlsp │ ├── __init__.py │ ├── .gitignore │ ├── language_servers │ │ ├── al_language_server.py │ │ ├── bash_language_server.py │ │ ├── clangd_language_server.py │ │ ├── clojure_lsp.py │ │ ├── common.py │ │ ├── csharp_language_server.py │ │ ├── dart_language_server.py │ │ ├── eclipse_jdtls.py │ │ ├── elixir_tools │ │ │ ├── __init__.py │ │ │ ├── elixir_tools.py │ │ │ └── README.md │ │ ├── elm_language_server.py │ │ ├── erlang_language_server.py │ │ ├── gopls.py │ 
│ ├── intelephense.py │ │ ├── jedi_server.py │ │ ├── kotlin_language_server.py │ │ ├── lua_ls.py │ │ ├── marksman.py │ │ ├── nixd_ls.py │ │ ├── omnisharp │ │ │ ├── initialize_params.json │ │ │ ├── runtime_dependencies.json │ │ │ └── workspace_did_change_configuration.json │ │ ├── omnisharp.py │ │ ├── perl_language_server.py │ │ ├── pyright_server.py │ │ ├── r_language_server.py │ │ ├── regal_server.py │ │ ├── ruby_lsp.py │ │ ├── rust_analyzer.py │ │ ├── solargraph.py │ │ ├── sourcekit_lsp.py │ │ ├── terraform_ls.py │ │ ├── typescript_language_server.py │ │ ├── vts_language_server.py │ │ └── zls.py │ ├── ls_config.py │ ├── ls_exceptions.py │ ├── ls_handler.py │ ├── ls_logger.py │ ├── ls_request.py │ ├── ls_types.py │ ├── ls_utils.py │ ├── ls.py │ ├── lsp_protocol_handler │ │ ├── lsp_constants.py │ │ ├── lsp_requests.py │ │ ├── lsp_types.py │ │ └── server.py │ ├── settings.py │ └── util │ ├── subprocess_util.py │ └── zip.py ├── test │ ├── __init__.py │ ├── conftest.py │ ├── resources │ │ └── repos │ │ ├── al │ │ │ └── test_repo │ │ │ ├── app.json │ │ │ └── src │ │ │ ├── Codeunits │ │ │ │ ├── CustomerMgt.Codeunit.al │ │ │ │ └── PaymentProcessorImpl.Codeunit.al │ │ │ ├── Enums │ │ │ │ └── CustomerType.Enum.al │ │ │ ├── Interfaces │ │ │ │ └── IPaymentProcessor.Interface.al │ │ │ ├── Pages │ │ │ │ ├── CustomerCard.Page.al │ │ │ │ └── CustomerList.Page.al │ │ │ ├── TableExtensions │ │ │ │ └── Item.TableExt.al │ │ │ └── Tables │ │ │ └── Customer.Table.al │ │ ├── bash │ │ │ └── test_repo │ │ │ ├── config.sh │ │ │ ├── main.sh │ │ │ └── utils.sh │ │ ├── clojure │ │ │ └── test_repo │ │ │ ├── deps.edn │ │ │ └── src │ │ │ └── test_app │ │ │ ├── core.clj │ │ │ └── utils.clj │ │ ├── csharp │ │ │ └── test_repo │ │ │ ├── .gitignore │ │ │ ├── Models │ │ │ │ └── Person.cs │ │ │ ├── Program.cs │ │ │ ├── serena.sln │ │ │ └── TestProject.csproj │ │ ├── dart │ │ │ └── test_repo │ │ │ ├── .gitignore │ │ │ ├── lib │ │ │ │ ├── helper.dart │ │ │ │ ├── main.dart │ │ │ │ └── models.dart │ │ │ 
└── pubspec.yaml │ │ ├── elixir │ │ │ └── test_repo │ │ │ ├── .gitignore │ │ │ ├── lib │ │ │ │ ├── examples.ex │ │ │ │ ├── ignored_dir │ │ │ │ │ └── ignored_module.ex │ │ │ │ ├── models.ex │ │ │ │ ├── services.ex │ │ │ │ ├── test_repo.ex │ │ │ │ └── utils.ex │ │ │ ├── mix.exs │ │ │ ├── mix.lock │ │ │ ├── scripts │ │ │ │ └── build_script.ex │ │ │ └── test │ │ │ ├── models_test.exs │ │ │ └── test_repo_test.exs │ │ ├── elm │ │ │ └── test_repo │ │ │ ├── elm.json │ │ │ ├── Main.elm │ │ │ └── Utils.elm │ │ ├── erlang │ │ │ └── test_repo │ │ │ ├── hello.erl │ │ │ ├── ignored_dir │ │ │ │ └── ignored_module.erl │ │ │ ├── include │ │ │ │ ├── records.hrl │ │ │ │ └── types.hrl │ │ │ ├── math_utils.erl │ │ │ ├── rebar.config │ │ │ ├── src │ │ │ │ ├── app.erl │ │ │ │ ├── models.erl │ │ │ │ ├── services.erl │ │ │ │ └── utils.erl │ │ │ └── test │ │ │ ├── models_tests.erl │ │ │ └── utils_tests.erl │ │ ├── go │ │ │ └── test_repo │ │ │ └── main.go │ │ ├── java │ │ │ └── test_repo │ │ │ ├── pom.xml │ │ │ └── src │ │ │ └── main │ │ │ └── java │ │ │ └── test_repo │ │ │ ├── Main.java │ │ │ ├── Model.java │ │ │ ├── ModelUser.java │ │ │ └── Utils.java │ │ ├── kotlin │ │ │ └── test_repo │ │ │ ├── .gitignore │ │ │ ├── build.gradle.kts │ │ │ └── src │ │ │ └── main │ │ │ └── kotlin │ │ │ └── test_repo │ │ │ ├── Main.kt │ │ │ ├── Model.kt │ │ │ ├── ModelUser.kt │ │ │ └── Utils.kt │ │ ├── lua │ │ │ └── test_repo │ │ │ ├── .gitignore │ │ │ ├── main.lua │ │ │ ├── src │ │ │ │ ├── calculator.lua │ │ │ │ └── utils.lua │ │ │ └── tests │ │ │ └── test_calculator.lua │ │ ├── markdown │ │ │ └── test_repo │ │ │ ├── api.md │ │ │ ├── CONTRIBUTING.md │ │ │ ├── guide.md │ │ │ └── README.md │ │ ├── nix │ │ │ └── test_repo │ │ │ ├── .gitignore │ │ │ ├── default.nix │ │ │ ├── flake.nix │ │ │ ├── lib │ │ │ │ └── utils.nix │ │ │ ├── modules │ │ │ │ └── example.nix │ │ │ └── scripts │ │ │ └── hello.sh │ │ ├── perl │ │ │ └── test_repo │ │ │ ├── helper.pl │ │ │ └── main.pl │ │ ├── php │ │ │ └── test_repo │ │ │ ├── 
helper.php │ │ │ ├── index.php │ │ │ └── simple_var.php │ │ ├── python │ │ │ └── test_repo │ │ │ ├── .gitignore │ │ │ ├── custom_test │ │ │ │ ├── __init__.py │ │ │ │ └── advanced_features.py │ │ │ ├── examples │ │ │ │ ├── __init__.py │ │ │ │ └── user_management.py │ │ │ ├── ignore_this_dir_with_postfix │ │ │ │ └── ignored_module.py │ │ │ ├── scripts │ │ │ │ ├── __init__.py │ │ │ │ └── run_app.py │ │ │ └── test_repo │ │ │ ├── __init__.py │ │ │ ├── complex_types.py │ │ │ ├── models.py │ │ │ ├── name_collisions.py │ │ │ ├── nested_base.py │ │ │ ├── nested.py │ │ │ ├── overloaded.py │ │ │ ├── services.py │ │ │ ├── utils.py │ │ │ └── variables.py │ │ ├── r │ │ │ └── test_repo │ │ │ ├── .Rbuildignore │ │ │ ├── DESCRIPTION │ │ │ ├── examples │ │ │ │ └── analysis.R │ │ │ ├── NAMESPACE │ │ │ └── R │ │ │ ├── models.R │ │ │ └── utils.R │ │ ├── rego │ │ │ └── test_repo │ │ │ ├── policies │ │ │ │ ├── authz.rego │ │ │ │ └── validation.rego │ │ │ └── utils │ │ │ └── helpers.rego │ │ ├── ruby │ │ │ └── test_repo │ │ │ ├── .solargraph.yml │ │ │ ├── examples │ │ │ │ └── user_management.rb │ │ │ ├── lib.rb │ │ │ ├── main.rb │ │ │ ├── models.rb │ │ │ ├── nested.rb │ │ │ ├── services.rb │ │ │ └── variables.rb │ │ ├── rust │ │ │ ├── test_repo │ │ │ │ ├── Cargo.lock │ │ │ │ ├── Cargo.toml │ │ │ │ └── src │ │ │ │ ├── lib.rs │ │ │ │ └── main.rs │ │ │ └── test_repo_2024 │ │ │ ├── Cargo.lock │ │ │ ├── Cargo.toml │ │ │ └── src │ │ │ ├── lib.rs │ │ │ └── main.rs │ │ ├── swift │ │ │ └── test_repo │ │ │ ├── Package.swift │ │ │ └── src │ │ │ ├── main.swift │ │ │ └── utils.swift │ │ ├── terraform │ │ │ └── test_repo │ │ │ ├── data.tf │ │ │ ├── main.tf │ │ │ ├── outputs.tf │ │ │ └── variables.tf │ │ ├── typescript │ │ │ └── test_repo │ │ │ ├── .serena │ │ │ │ └── project.yml │ │ │ ├── index.ts │ │ │ ├── tsconfig.json │ │ │ └── use_helper.ts │ │ └── zig │ │ └── test_repo │ │ ├── .gitignore │ │ ├── build.zig │ │ ├── src │ │ │ ├── calculator.zig │ │ │ ├── main.zig │ │ │ └── math_utils.zig │ │ └── 
zls.json │ ├── serena │ │ ├── __init__.py │ │ ├── __snapshots__ │ │ │ └── test_symbol_editing.ambr │ │ ├── config │ │ │ ├── __init__.py │ │ │ └── test_serena_config.py │ │ ├── test_edit_marker.py │ │ ├── test_mcp.py │ │ ├── test_serena_agent.py │ │ ├── test_symbol_editing.py │ │ ├── test_symbol.py │ │ ├── test_text_utils.py │ │ ├── test_tool_parameter_types.py │ │ └── util │ │ ├── test_exception.py │ │ └── test_file_system.py │ └── solidlsp │ ├── al │ │ └── test_al_basic.py │ ├── bash │ │ ├── __init__.py │ │ └── test_bash_basic.py │ ├── clojure │ │ ├── __init__.py │ │ └── test_clojure_basic.py │ ├── csharp │ │ └── test_csharp_basic.py │ ├── dart │ │ ├── __init__.py │ │ └── test_dart_basic.py │ ├── elixir │ │ ├── __init__.py │ │ ├── conftest.py │ │ ├── test_elixir_basic.py │ │ ├── test_elixir_ignored_dirs.py │ │ ├── test_elixir_integration.py │ │ └── test_elixir_symbol_retrieval.py │ ├── elm │ │ └── test_elm_basic.py │ ├── erlang │ │ ├── __init__.py │ │ ├── conftest.py │ │ ├── test_erlang_basic.py │ │ ├── test_erlang_ignored_dirs.py │ │ └── test_erlang_symbol_retrieval.py │ ├── go │ │ └── test_go_basic.py │ ├── java │ │ └── test_java_basic.py │ ├── kotlin │ │ └── test_kotlin_basic.py │ ├── lua │ │ └── test_lua_basic.py │ ├── markdown │ │ ├── __init__.py │ │ └── test_markdown_basic.py │ ├── nix │ │ └── test_nix_basic.py │ ├── perl │ │ └── test_perl_basic.py │ ├── php │ │ └── test_php_basic.py │ ├── python │ │ ├── test_python_basic.py │ │ ├── test_retrieval_with_ignored_dirs.py │ │ └── test_symbol_retrieval.py │ ├── r │ │ ├── __init__.py │ │ └── test_r_basic.py │ ├── rego │ │ └── test_rego_basic.py │ ├── ruby │ │ ├── test_ruby_basic.py │ │ └── test_ruby_symbol_retrieval.py │ ├── rust │ │ ├── test_rust_2024_edition.py │ │ └── test_rust_basic.py │ ├── swift │ │ └── test_swift_basic.py │ ├── terraform │ │ └── test_terraform_basic.py │ ├── typescript │ │ └── test_typescript_basic.py │ ├── util │ │ └── test_zip.py │ └── zig │ └── test_zig_basic.py └── uv.lock ``` # Files 
-------------------------------------------------------------------------------- /CHANGELOG.md: -------------------------------------------------------------------------------- ```markdown # latest Status of the `main` branch. Changes prior to the next official version change will appear here. * Language support: * **Add support for Elm** via @elm-tooling/elm-language-server (automatically downloads if not installed; requires Elm compiler) * **Add support for Perl** via Perl::LanguageServer with LSP integration for .pl, .pm, and .t files * **Add support for AL (Application Language)** for Microsoft Dynamics 365 Business Central development. Requires VS Code AL extension (ms-dynamics-smb.al). * **Add support for R** via the R languageserver package with LSP integration, performance optimizations, and fallback symbol extraction * **Add support for Zig** via ZLS (cross-file references may not fully work on Windows) * **Add support for Lua** via lua-language-server * **Add support for Nix** requires nixd installation (Windows not supported) * **Dart now officially supported**: Dart was always working, but now tests were added, and it is promoted to "officially supported" * **Rust now uses already installed rustup**: The rust-analyzer is no longer bundled with Serena. Instead, it uses the rust-analyzer from your Rust toolchain managed by rustup. This ensures compatibility with your Rust version and eliminates outdated bundled binaries. * **Kotlin now officially supported**: We now use the official Kotlin LS, tests run through and performance is good, even though the LS is in an early development stage. * **Add support for Erlang** experimental, may hang or be slow, uses the recently archived [erlang_ls](https://github.com/erlang-ls/erlang_ls) * **Ruby dual language server support**: Added ruby-lsp as the modern primary Ruby language server. Solargraph remains available as an experimental legacy option. 
ruby-lsp supports both .rb and .erb files, while Solargraph supports .rb files only. * Client support: * New mode `oaicompat-agent` and extensions in the openai tool compatibility, **permitting Serena to work with llama.cpp** * General: * Various fixes related to indexing, special paths and determination of ignored paths * Decreased `TOOL_DEFAULT_MAX_ANSWER_LENGTH` to be in accordance with (below) typical max-tokens configurations * Allow passing language server specific settings through `ls_specific_settings` field (in `serena_config.yml`) # 0.1.4 ## Summary This likely is the last release before the stable version 1.0.0 which will come together with the jetbrains IDE extension. We release it for users who install Serena from a tag, since the last tag cannot be installed due to a breaking change in the mcp dependency (see #381). Since the last release, several new languages were supported, and the Serena CLI and configurability were significantly extended. We thank all external contributors who made a lot of the improvements possible! * General: * **Initial instructions no longer need to be loaded by the user** * Significantly extended CLI * Removed `replace_regex` tool from `ide-assistant` and `codex` contexts. The current string replacement tool in Claude Code seems to be sufficiently efficient and is better integrated with the IDE. Users who want to enable `replace_regex` can do so by customizing the context. * Configuration: * Simplify customization of modes and contexts, including CLI support. * Possibility to customize the system prompt and outputs of simple tools, including CLI support. * Possibility to override tool descriptions through the context YAML. * Prompt templates are now automatically adapted to the enabled tools. * Several tools are now excluded by default, need to be included explicitly.
* New context for ChatGPT * Language servers: * Reliably detect language server termination and propagate the respective error all the way back to the tool application, where an unexpected termination is handled by restarting the language server and subsequently retrying the tool application. * **Add support for Swift** * **Add support for Bash** * Enhance Solargraph (Ruby) integration * Automatic Rails project detection via config/application.rb, Rakefile, and Gemfile analysis * Ruby/Rails-specific exclude patterns for improved indexing performance (vendor/, .bundle/, tmp/, log/, coverage/) * Enhanced error handling with detailed diagnostics and Ruby manager-specific installation instructions (rbenv, RVM, asdf) * Improved LSP capability negotiation and analysis completion detection * Better Bundler and Solargraph installation error messages with clear resolution steps Fixes: * Ignore `.git` in check for ignored paths and improve performance of `find_all_non_ignored_files` * Fix language server startup issues on Windows when using Claude Code (which was due to default shell reconfiguration imposed by Claude Code) * Additional wait for initialization in C# language server before requesting references, allowing cross-file references to be found. # 0.1.3 ## Summary This is the first release of Serena to pypi. Since the last release, we have greatly improved stability and performance, as well as extended functionality, improved editing tools and included support for several new languages. * **Reduce the use of asyncio to a minimum**, improving stability and reducing the need for workarounds * Switch to newly developed fully synchronous LSP library `solidlsp` (derived from `multilspy`), removing our fork of `multilspy` (src/multilspy) * Switch from fastapi (which uses asyncio) to Flask in the Serena dashboard * The MCP server is the only asyncio-based component now, which resolves cross-component loop contamination, such that process isolation is no longer required. 
Neither are non-graceful shutdowns on Windows. * **Improved editing tools**: The editing logic was simplified and improved, making it more robust. * The "minimal indentation" logic was removed, because LLMs did not understand it. * The logic for the insertion of empty lines was improved (mostly controlled by the LLM now) * Add a task queue for the agent, which is executed in a separate thread and * allows the language server to be initialized in the background, making the MCP server respond to requests immediately upon startup, * ensures that all tool executions are fully synchronized (executed linearly). * `SearchForPatternTool`: Better default, extended parameters and description for restricting the search * Language support: * Better support for C# by switching from `omnisharp` to Microsoft's official C# language server. * **Add support for Clojure, Elixir and Terraform. New language servers for C# and typescript.** * Experimental language server implementations can now be accessed by users through configuring the `language` field * Configuration: * Add option `web_dashboard_open_on_launch` (allowing the dashboard to be enabled without opening a browser window) * Add options `record_tool_usage_stats` and `token_count_estimator` * Serena config, modes and contexts can now be adjusted from the user's home directory.
* Extended CLI to help with configuration * Dashboard: * Displaying tool usage statistics if enabled in the config Fixes: * Fix `ExecuteShellCommandTool` and `GetCurrentConfigTool` hanging on Windows * Fix project activation by name via `--project` not working (was broken in previous release) * Improve handling of indentation and newlines in symbolic editing tools * Fix `InsertAfterSymbolTool` failing for insertions at the end of a file that did not end with a newline * Fix `InsertBeforeSymbolTool` inserting in the wrong place in the absence of empty lines above the reference symbol * Fix `ReplaceSymbolBodyTool` changing whitespace before/after the symbol * Fix repository indexing not following links and catch exceptions during indexing, allowing indexing to continue even if unexpected errors occur for individual files. * Fix `ImportError` in Ruby language server. * Fix some issues with gitignore matching and interpreting of regexes in `search_for_pattern` tool. # 2025-06-20 * **Overhaul and major improvement of editing tools!** This represents a very important change in Serena. Symbols can now be addressed by their `name_path` (including nested ones) and we introduced a regex-based replacement tool. We tuned the prompts and tested the new editing mechanism. It is much more reliable, flexible, and at the same time uses fewer tokens. The line-replacement tools are disabled by default and deprecated, we will likely remove them soon. * **Better multi-project support and zero-config setup**: We significantly simplified the config setup, you no longer need to manually create `project.yaml` for each project. Project activation is now always available. Any project can now be activated by just asking the LLM to do so and passing the path to a repo. * Dashboard as web app and possibility to shut down Serena from it (or the old log GUI). * Possibility to index your project beforehand, accelerating Serena's tools.
* Initial prompt for project supported (has to be added manually for the moment) * Massive performance improvement of pattern search tool * Use **process isolation** to fix stability issues and deadlocks (see #170). This uses separate process for the MCP server, the Serena agent and the dashboard in order to fix asyncio-related issues. # 2025-05-24 * Important new feature: **configurability of mode and context**, allowing better integration in a variety of clients. See corresponding section in readme - Serena can now be integrated in IDE assistants in a more productive way. You can now also do things like switching to one-shot planning mode, ask to plan something (which will create a memory), then switch to interactive editing mode in the next conversation and work through the plan read from the memory. * Some improvements to prompts. # 2025-05-21 **Significant improvement in symbol finding!** * Serena core: * `FindSymbolTool` now can look for symbols by specifying paths to them, not just the symbol name * Language Servers: * Fixed `gopls` initialization * Symbols retrieved through the symbol tree or through overview methods now are linked to their parents # 2025-05-19 * Serena core: * Bugfix in `FindSymbolTool` (a bug fixed in LS) * Fix in `ListDirTool`: Do not ignore files with extensions not understood by the language server, only skip ignored directories (error introduced in previous version) * Merged the two overview tools (for directories and files) into a single one: `GetSymbolsOverviewTool` * One-click setup for Cline enabled * `SearchForPatternTool` can now (optionally) search in the entire project * New tool `RestartLanguageServerTool` for restarting the language server (in case of other sources of editing apart from Serena) * Fix `CheckOnboardingPerformedTool`: * Tool description was incompatible with project change * Returned result was not as useful as it could be (now added list of memories) * Language Servers: * Add further file extensions considered 
by the language servers for Python (.pyi), JavaScript (.jsx) and TypeScript (.tsx, .jsx) * Updated multilspy, adding support for Kotlin, Dart and C/C++ and several improvements. * Added support for PHP # 2025-04-07 > **Breaking Config Changes**: make sure to set `ignore_all_files_in_gitignore`, remove `ignore_dirs` > and (optionally) set `ignore_paths` in your project configs. See [updated config template](myproject.template.yml) * Serena core: * New tool: FindReferencingCodeSnippets * Adjusted prompt in CreateTextFileTool to prevent writing partial content (see [here](https://www.reddit.com/r/ClaudeAI/comments/1jpavtm/comment/mloek1x/?utm_source=share&utm_medium=web3x&utm_name=web3xcss&utm_term=1&utm_content=share_button)). * FindSymbolTool: allow passing a file for restricting search, not just a directory (Gemini was too dumb to pass directories) * Native support for gitignore files for configuring files to be ignored by serena. See also in *Language Servers* section below. * **Major Feature**: Allow Serena to switch between projects (project activation) * Add central Serena configuration in `serena_config.yml`, which * contains the list of available projects * allows to configure whether project activation is enabled * now contains the GUI logging configuration (project configurations no longer do) * Add new tools `activate_project` and `get_active_project` * Providing a project configuration file in the launch parameters is now optional * Logging: * Improve error reporting in case of initialization failure: open a new GUI log window showing the error or ensure that the existing log window remains visible for some time * Language Servers: * Fix C# language server initialization issue when the project path contains spaces * Native support for gitignore in overview, document-tree and find_references operations. 
This is an **important** addition, since previously things like `venv` and `node_modules` were scanned and were likely responsible for slowness of tools and even server crashes (presumably due to OOM errors).
* Agno:
  * Fix Agno reloading mechanism causing failures when initializing the sqlite memory database #8
* Fix Serena GUI log window not capturing logs after initialization

# 2025-04-01

Initial public version
```

--------------------------------------------------------------------------------
/test/solidlsp/csharp/test_csharp_basic.py:
--------------------------------------------------------------------------------

```python
import os
import tempfile
from pathlib import Path
from typing import cast
from unittest.mock import Mock, patch

import pytest

from solidlsp import SolidLanguageServer
from solidlsp.language_servers.csharp_language_server import (
    CSharpLanguageServer,
    breadth_first_file_scan,
    find_solution_or_project_file,
)
from solidlsp.ls_config import Language, LanguageServerConfig
from solidlsp.ls_utils import SymbolUtils
from solidlsp.settings import SolidLSPSettings


@pytest.mark.csharp
class TestCSharpLanguageServer:
    @pytest.mark.parametrize("language_server", [Language.CSHARP], indirect=True)
    def test_find_symbol(self, language_server: SolidLanguageServer) -> None:
        """Test finding symbols in the full symbol tree."""
        symbols = language_server.request_full_symbol_tree()
        assert SymbolUtils.symbol_tree_contains_name(symbols, "Program"), "Program class not found in symbol tree"
        assert SymbolUtils.symbol_tree_contains_name(symbols, "Calculator"), "Calculator class not found in symbol tree"
        assert SymbolUtils.symbol_tree_contains_name(symbols, "Add"), "Add method not found in symbol tree"

    @pytest.mark.parametrize("language_server", [Language.CSHARP], indirect=True)
    def test_get_document_symbols(self, language_server: SolidLanguageServer) -> None:
        """Test getting document symbols from a C# file."""
        file_path = os.path.join("Program.cs")
        symbols = language_server.request_document_symbols(file_path)
        # Check that we have symbols
        assert len(symbols) > 0
        # Flatten the symbols if they're nested
        if isinstance(symbols[0], list):
            symbols = symbols[0]
        # Look for expected classes
        class_names = [s.get("name") for s in symbols if s.get("kind") == 5]  # 5 is class
        assert "Program" in class_names
        assert "Calculator" in class_names

    @pytest.mark.parametrize("language_server", [Language.CSHARP], indirect=True)
    def test_find_referencing_symbols(self, language_server: SolidLanguageServer) -> None:
        """Test finding references using symbol selection range."""
        file_path = os.path.join("Program.cs")
        symbols = language_server.request_document_symbols(file_path)
        add_symbol = None
        # Handle nested symbol structure
        symbol_list = symbols[0] if symbols and isinstance(symbols[0], list) else symbols
        for sym in symbol_list:
            if sym.get("name") == "Add":
                add_symbol = sym
                break
        assert add_symbol is not None, "Could not find 'Add' method symbol in Program.cs"
        sel_start = add_symbol["selectionRange"]["start"]
        # NOTE(review): "+ 1" presumably queries one character inside the identifier rather
        # than at its leading boundary — confirm against the LS's reference-resolution behavior.
        refs = language_server.request_references(file_path, sel_start["line"], sel_start["character"] + 1)
        assert any(
            "Program.cs" in ref.get("relativePath", "") for ref in refs
        ), "Program.cs should reference Add method (tried all positions in selectionRange)"

    @pytest.mark.parametrize("language_server", [Language.CSHARP], indirect=True)
    def test_nested_namespace_symbols(self, language_server: SolidLanguageServer) -> None:
        """Test getting symbols from nested namespace."""
        file_path = os.path.join("Models", "Person.cs")
        symbols = language_server.request_document_symbols(file_path)
        # Check that we have symbols
        assert len(symbols) > 0
        # Flatten the symbols if they're nested
        if isinstance(symbols[0], list):
            symbols = symbols[0]
        # Check that we have the Person class
        assert any(s.get("name") == "Person" and s.get("kind") == 5 for s in symbols)
        # Check for properties and methods
        symbol_names = [s.get("name") for s in symbols]
        assert "Name" in symbol_names
        assert "Age" in symbol_names
        assert "Email" in symbol_names
        assert "ToString" in symbol_names
        assert "IsAdult" in symbol_names

    @pytest.mark.parametrize("language_server", [Language.CSHARP], indirect=True)
    def test_find_referencing_symbols_across_files(self, language_server: SolidLanguageServer) -> None:
        """Test finding references to Calculator.Subtract method across files."""
        # First, find the Subtract method in Program.cs
        file_path = os.path.join("Program.cs")
        symbols = language_server.request_document_symbols(file_path)
        # Flatten the symbols if they're nested
        symbol_list = symbols[0] if symbols and isinstance(symbols[0], list) else symbols
        subtract_symbol = None
        for sym in symbol_list:
            if sym.get("name") == "Subtract":
                subtract_symbol = sym
                break
        assert subtract_symbol is not None, "Could not find 'Subtract' method symbol in Program.cs"
        # Get references to the Subtract method
        sel_start = subtract_symbol["selectionRange"]["start"]
        refs = language_server.request_references(file_path, sel_start["line"], sel_start["character"] + 1)
        # Should find references in both Program.cs and Models/Person.cs
        ref_files = cast(list[str], [ref.get("relativePath", "") for ref in refs])
        print(f"Found references: {refs}")
        print(f"Reference files: {ref_files}")
        # Check that we have references from both files
        assert any("Program.cs" in ref_file for ref_file in ref_files), "Should find reference in Program.cs"
        assert any(
            os.path.join("Models", "Person.cs") in ref_file for ref_file in ref_files
        ), "Should find reference in Models/Person.cs where Calculator.Subtract is called"
        # check for a second time, since the first call may trigger initialization and change the state of the LS
        refs_second_call = language_server.request_references(file_path, sel_start["line"], sel_start["character"] + 1)
        assert refs_second_call == refs, "Second call to request_references should return the same results"


@pytest.mark.csharp
class TestCSharpSolutionProjectOpening:
    """Test C# language server solution and project opening functionality."""

    def test_breadth_first_file_scan(self):
        """Test that breadth_first_file_scan finds files in breadth-first order."""
        with tempfile.TemporaryDirectory() as temp_dir:
            temp_path = Path(temp_dir)
            # Create test directory structure
            (temp_path / "file1.txt").touch()
            (temp_path / "subdir1").mkdir()
            (temp_path / "subdir1" / "file2.txt").touch()
            (temp_path / "subdir2").mkdir()
            (temp_path / "subdir2" / "file3.txt").touch()
            (temp_path / "subdir1" / "subdir3").mkdir()
            (temp_path / "subdir1" / "subdir3" / "file4.txt").touch()
            # Scan files
            files = list(breadth_first_file_scan(str(temp_path)))
            filenames = [os.path.basename(f) for f in files]
            # Should find all files
            assert len(files) == 4
            assert "file1.txt" in filenames
            assert "file2.txt" in filenames
            assert "file3.txt" in filenames
            assert "file4.txt" in filenames
            # file1.txt should be found first (breadth-first)
            assert filenames[0] == "file1.txt"

    def test_find_solution_or_project_file_with_solution(self):
        """Test that find_solution_or_project_file prefers .sln files."""
        with tempfile.TemporaryDirectory() as temp_dir:
            temp_path = Path(temp_dir)
            # Create both .sln and .csproj files
            solution_file = temp_path / "MySolution.sln"
            project_file = temp_path / "MyProject.csproj"
            solution_file.touch()
            project_file.touch()
            result = find_solution_or_project_file(str(temp_path))
            # Should prefer .sln file
            assert result == str(solution_file)

    def test_find_solution_or_project_file_with_project_only(self):
        """Test that find_solution_or_project_file falls back to .csproj files."""
        with tempfile.TemporaryDirectory() as temp_dir:
            temp_path = Path(temp_dir)
            # Create only .csproj file
            project_file = temp_path / "MyProject.csproj"
            project_file.touch()
            result = find_solution_or_project_file(str(temp_path))
            # Should return .csproj file
            assert result == str(project_file)

    def test_find_solution_or_project_file_with_nested_files(self):
        """Test that find_solution_or_project_file finds files in subdirectories."""
        with tempfile.TemporaryDirectory() as temp_dir:
            temp_path = Path(temp_dir)
            # Create nested structure
            (temp_path / "src").mkdir()
            solution_file = temp_path / "src" / "MySolution.sln"
            solution_file.touch()
            result = find_solution_or_project_file(str(temp_path))
            # Should find nested .sln file
            assert result == str(solution_file)

    def test_find_solution_or_project_file_returns_none_when_no_files(self):
        """Test that find_solution_or_project_file returns None when no .sln or .csproj files exist."""
        with tempfile.TemporaryDirectory() as temp_dir:
            temp_path = Path(temp_dir)
            # Create some other files
            (temp_path / "readme.txt").touch()
            (temp_path / "other.cs").touch()
            result = find_solution_or_project_file(str(temp_path))
            # Should return None
            assert result is None

    def test_find_solution_or_project_file_prefers_solution_breadth_first(self):
        """Test that solution files are preferred even when deeper in the tree."""
        with tempfile.TemporaryDirectory() as temp_dir:
            temp_path = Path(temp_dir)
            # Create .csproj at root and .sln in subdirectory
            project_file = temp_path / "MyProject.csproj"
            project_file.touch()
            (temp_path / "src").mkdir()
            solution_file = temp_path / "src" / "MySolution.sln"
            solution_file.touch()
            result = find_solution_or_project_file(str(temp_path))
            # Should still prefer .sln file even though it's deeper
            assert result == str(solution_file)

    @patch("solidlsp.language_servers.csharp_language_server.CSharpLanguageServer._ensure_server_installed")
    @patch("solidlsp.language_servers.csharp_language_server.CSharpLanguageServer._start_server")
    def test_csharp_language_server_logs_solution_discovery(self, mock_start_server, mock_ensure_server_installed):
        """Test that CSharpLanguageServer logs solution/project discovery during initialization."""
        mock_ensure_server_installed.return_value = ("/usr/bin/dotnet", "/path/to/server.dll")
        # Create test directory with solution file
        with tempfile.TemporaryDirectory() as temp_dir:
            temp_path = Path(temp_dir)
            solution_file = temp_path / "TestSolution.sln"
            solution_file.touch()
            # Mock logger to capture log messages
            mock_logger = Mock()
            mock_config = Mock(spec=LanguageServerConfig)
            mock_config.ignored_paths = []
            # Create CSharpLanguageServer instance
            mock_settings = Mock(spec=SolidLSPSettings)
            mock_settings.ls_resources_dir = "/tmp/test_ls_resources"
            mock_settings.project_data_relative_path = "project_data"
            CSharpLanguageServer(mock_config, mock_logger, str(temp_path), mock_settings)
            # Verify that logger was called with solution file discovery
            mock_logger.log.assert_any_call(f"Found solution/project file: {solution_file}", 20)  # logging.INFO

    @patch("solidlsp.language_servers.csharp_language_server.CSharpLanguageServer._ensure_server_installed")
    @patch("solidlsp.language_servers.csharp_language_server.CSharpLanguageServer._start_server")
    def test_csharp_language_server_logs_no_solution_warning(self, mock_start_server, mock_ensure_server_installed):
        """Test that CSharpLanguageServer logs warning when no solution/project files are found."""
        # Mock the server installation
        mock_ensure_server_installed.return_value = ("/usr/bin/dotnet", "/path/to/server.dll")
        # Create empty test directory
        with tempfile.TemporaryDirectory() as temp_dir:
            temp_path = Path(temp_dir)
            # Mock logger to capture log messages
            mock_logger = Mock()
            mock_config = Mock(spec=LanguageServerConfig)
            mock_config.ignored_paths = []
            # Create CSharpLanguageServer instance
            mock_settings = Mock(spec=SolidLSPSettings)
            mock_settings.ls_resources_dir = "/tmp/test_ls_resources"
            mock_settings.project_data_relative_path = "project_data"
            CSharpLanguageServer(mock_config, mock_logger, str(temp_path), mock_settings)
            # Verify that logger was called with warning about no solution/project files
            mock_logger.log.assert_any_call(
                "No .sln or .csproj file found, language server will attempt auto-discovery", 30  # logging.WARNING
            )

    def test_solution_and_project_opening_with_real_test_repo(self):
        """Test solution and project opening with the actual C# test
repository.""" # Get the C# test repo path test_repo_path = Path(__file__).parent.parent.parent / "resources" / "repos" / "csharp" / "test_repo" if not test_repo_path.exists(): pytest.skip("C# test repository not found") # Test solution/project discovery in the real test repo result = find_solution_or_project_file(str(test_repo_path)) # Should find either .sln or .csproj file assert result is not None assert result.endswith((".sln", ".csproj")) # Verify the file actually exists assert os.path.exists(result) ``` -------------------------------------------------------------------------------- /test/solidlsp/python/test_python_basic.py: -------------------------------------------------------------------------------- ```python """ Basic integration tests for the language server functionality. These tests validate the functionality of the language server APIs like request_references using the test repository. """ import os import pytest from serena.project import Project from serena.text_utils import LineType from solidlsp import SolidLanguageServer from solidlsp.ls_config import Language @pytest.mark.python class TestLanguageServerBasics: """Test basic functionality of the language server.""" @pytest.mark.parametrize("language_server", [Language.PYTHON], indirect=True) def test_request_references_user_class(self, language_server: SolidLanguageServer) -> None: """Test request_references on the User class.""" # Get references to the User class in models.py file_path = os.path.join("test_repo", "models.py") # Line 31 contains the User class definition # Use selectionRange only symbols = language_server.request_document_symbols(file_path) user_symbol = next((s for s in symbols[0] if s.get("name") == "User"), None) if not user_symbol or "selectionRange" not in user_symbol: raise AssertionError("User symbol or its selectionRange not found") sel_start = user_symbol["selectionRange"]["start"] references = language_server.request_references(file_path, sel_start["line"], 
sel_start["character"]) assert len(references) > 1, "User class should be referenced in multiple files (using selectionRange if present)" @pytest.mark.parametrize("language_server", [Language.PYTHON], indirect=True) def test_request_references_item_class(self, language_server: SolidLanguageServer) -> None: """Test request_references on the Item class.""" # Get references to the Item class in models.py file_path = os.path.join("test_repo", "models.py") # Line 56 contains the Item class definition # Use selectionRange only symbols = language_server.request_document_symbols(file_path) item_symbol = next((s for s in symbols[0] if s.get("name") == "Item"), None) if not item_symbol or "selectionRange" not in item_symbol: raise AssertionError("Item symbol or its selectionRange not found") sel_start = item_symbol["selectionRange"]["start"] references = language_server.request_references(file_path, sel_start["line"], sel_start["character"]) services_references = [ref for ref in references if "services.py" in ref["uri"]] assert len(services_references) > 0, "At least one reference should be in services.py (using selectionRange if present)" @pytest.mark.parametrize("language_server", [Language.PYTHON], indirect=True) def test_request_references_function_parameter(self, language_server: SolidLanguageServer) -> None: """Test request_references on a function parameter.""" # Get references to the id parameter in get_user method file_path = os.path.join("test_repo", "services.py") # Line 24 contains the get_user method with id parameter # Use selectionRange only symbols = language_server.request_document_symbols(file_path) get_user_symbol = next((s for s in symbols[0] if s.get("name") == "get_user"), None) if not get_user_symbol or "selectionRange" not in get_user_symbol: raise AssertionError("get_user symbol or its selectionRange not found") sel_start = get_user_symbol["selectionRange"]["start"] references = language_server.request_references(file_path, sel_start["line"], 
sel_start["character"]) assert len(references) > 0, "id parameter should be referenced within the method (using selectionRange if present)" @pytest.mark.parametrize("language_server", [Language.PYTHON], indirect=True) def test_request_references_create_user_method(self, language_server: SolidLanguageServer) -> None: # Get references to the create_user method in UserService file_path = os.path.join("test_repo", "services.py") # Line 15 contains the create_user method definition # Use selectionRange only symbols = language_server.request_document_symbols(file_path) create_user_symbol = next((s for s in symbols[0] if s.get("name") == "create_user"), None) if not create_user_symbol or "selectionRange" not in create_user_symbol: raise AssertionError("create_user symbol or its selectionRange not found") sel_start = create_user_symbol["selectionRange"]["start"] references = language_server.request_references(file_path, sel_start["line"], sel_start["character"]) assert len(references) > 1, "Should get valid references for create_user (using selectionRange if present)" class TestProjectBasics: @pytest.mark.parametrize("project", [Language.PYTHON], indirect=True) def test_retrieve_content_around_line(self, project: Project) -> None: """Test retrieve_content_around_line functionality with various scenarios.""" file_path = os.path.join("test_repo", "models.py") # Scenario 1: Just a single line (User class definition) line_31 = project.retrieve_content_around_line(file_path, 31) assert len(line_31.lines) == 1 assert "class User(BaseModel):" in line_31.lines[0].line_content assert line_31.lines[0].line_number == 31 assert line_31.lines[0].match_type == LineType.MATCH # Scenario 2: Context above and below with_context_around_user = project.retrieve_content_around_line(file_path, 31, 2, 2) assert len(with_context_around_user.lines) == 5 # Check line content assert "class User(BaseModel):" in with_context_around_user.matched_lines[0].line_content assert 
with_context_around_user.num_matched_lines == 1 assert " User model representing a system user." in with_context_around_user.lines[4].line_content # Check line numbers assert with_context_around_user.lines[0].line_number == 29 assert with_context_around_user.lines[1].line_number == 30 assert with_context_around_user.lines[2].line_number == 31 assert with_context_around_user.lines[3].line_number == 32 assert with_context_around_user.lines[4].line_number == 33 # Check match types assert with_context_around_user.lines[0].match_type == LineType.BEFORE_MATCH assert with_context_around_user.lines[1].match_type == LineType.BEFORE_MATCH assert with_context_around_user.lines[2].match_type == LineType.MATCH assert with_context_around_user.lines[3].match_type == LineType.AFTER_MATCH assert with_context_around_user.lines[4].match_type == LineType.AFTER_MATCH # Scenario 3a: Only context above with_context_above = project.retrieve_content_around_line(file_path, 31, 3, 0) assert len(with_context_above.lines) == 4 assert "return cls(id=id, name=name)" in with_context_above.lines[0].line_content assert "class User(BaseModel):" in with_context_above.matched_lines[0].line_content assert with_context_above.num_matched_lines == 1 # Check line numbers assert with_context_above.lines[0].line_number == 28 assert with_context_above.lines[1].line_number == 29 assert with_context_above.lines[2].line_number == 30 assert with_context_above.lines[3].line_number == 31 # Check match types assert with_context_above.lines[0].match_type == LineType.BEFORE_MATCH assert with_context_above.lines[1].match_type == LineType.BEFORE_MATCH assert with_context_above.lines[2].match_type == LineType.BEFORE_MATCH assert with_context_above.lines[3].match_type == LineType.MATCH # Scenario 3b: Only context below with_context_below = project.retrieve_content_around_line(file_path, 31, 0, 3) assert len(with_context_below.lines) == 4 assert "class User(BaseModel):" in with_context_below.matched_lines[0].line_content 
assert with_context_below.num_matched_lines == 1 assert with_context_below.lines[0].line_number == 31 assert with_context_below.lines[1].line_number == 32 assert with_context_below.lines[2].line_number == 33 assert with_context_below.lines[3].line_number == 34 # Check match types assert with_context_below.lines[0].match_type == LineType.MATCH assert with_context_below.lines[1].match_type == LineType.AFTER_MATCH assert with_context_below.lines[2].match_type == LineType.AFTER_MATCH assert with_context_below.lines[3].match_type == LineType.AFTER_MATCH # Scenario 4a: Edge case - context above but line is at 0 first_line_with_context_around = project.retrieve_content_around_line(file_path, 0, 2, 1) assert len(first_line_with_context_around.lines) <= 4 # Should have at most 4 lines (line 0 + 1 below + up to 2 above) assert first_line_with_context_around.lines[0].line_number <= 2 # First line should be at most line 2 # Check match type for the target line for line in first_line_with_context_around.lines: if line.line_number == 0: assert line.match_type == LineType.MATCH elif line.line_number < 0: assert line.match_type == LineType.BEFORE_MATCH else: assert line.match_type == LineType.AFTER_MATCH # Scenario 4b: Edge case - context above but line is at 1 second_line_with_context_above = project.retrieve_content_around_line(file_path, 1, 3, 1) assert len(second_line_with_context_above.lines) <= 5 # Should have at most 5 lines (line 1 + 1 below + up to 3 above) assert second_line_with_context_above.lines[0].line_number <= 1 # First line should be at most line 1 # Check match type for the target line for line in second_line_with_context_above.lines: if line.line_number == 1: assert line.match_type == LineType.MATCH elif line.line_number < 1: assert line.match_type == LineType.BEFORE_MATCH else: assert line.match_type == LineType.AFTER_MATCH # Scenario 4c: Edge case - context below but line is at the end of file # First get the total number of lines in the file all_content = 
project.read_file(file_path) total_lines = len(all_content.split("\n")) last_line_with_context_around = project.retrieve_content_around_line(file_path, total_lines - 1, 1, 3) assert len(last_line_with_context_around.lines) <= 5 # Should have at most 5 lines (last line + 1 above + up to 3 below) assert last_line_with_context_around.lines[-1].line_number >= total_lines - 4 # Last line should be at least total_lines - 4 # Check match type for the target line for line in last_line_with_context_around.lines: if line.line_number == total_lines - 1: assert line.match_type == LineType.MATCH elif line.line_number < total_lines - 1: assert line.match_type == LineType.BEFORE_MATCH else: assert line.match_type == LineType.AFTER_MATCH @pytest.mark.parametrize("project", [Language.PYTHON], indirect=True) def test_search_files_for_pattern(self, project: Project) -> None: """Test search_files_for_pattern with various patterns and glob filters.""" # Test 1: Search for class definitions across all files class_pattern = r"class\s+\w+\s*(?:\([^{]*\)|:)" matches = project.search_source_files_for_pattern(class_pattern) assert len(matches) > 0 # Should find multiple classes like User, Item, BaseModel, etc. 
assert len(matches) >= 5 # Test 2: Search for specific class with include glob user_class_pattern = r"class\s+User\s*(?:\([^{]*\)|:)" matches = project.search_source_files_for_pattern(user_class_pattern, paths_include_glob="**/models.py") assert len(matches) == 1 # Should only find User class in models.py assert matches[0].source_file_path is not None assert "models.py" in matches[0].source_file_path # Test 3: Search for method definitions with exclude glob method_pattern = r"def\s+\w+\s*\([^)]*\):" matches = project.search_source_files_for_pattern(method_pattern, paths_exclude_glob="**/models.py") assert len(matches) > 0 # Should find methods in services.py but not in models.py assert all(match.source_file_path is not None and "models.py" not in match.source_file_path for match in matches) # Test 4: Search for specific method with both include and exclude globs create_user_pattern = r"def\s+create_user\s*\([^)]*\)(?:\s*->[^:]+)?:" matches = project.search_source_files_for_pattern( create_user_pattern, paths_include_glob="**/*.py", paths_exclude_glob="**/models.py" ) assert len(matches) == 1 # Should only find create_user in services.py assert matches[0].source_file_path is not None assert "services.py" in matches[0].source_file_path # Test 5: Search for a pattern that should appear in multiple files init_pattern = r"def\s+__init__\s*\([^)]*\):" matches = project.search_source_files_for_pattern(init_pattern) assert len(matches) > 1 # Should find __init__ in multiple classes # Should find __init__ in both models.py and services.py assert any(match.source_file_path is not None and "models.py" in match.source_file_path for match in matches) assert any(match.source_file_path is not None and "services.py" in match.source_file_path for match in matches) # Test 6: Search with a pattern that should have no matches no_match_pattern = r"def\s+this_method_does_not_exist\s*\([^)]*\):" matches = project.search_source_files_for_pattern(no_match_pattern) assert len(matches) == 0 
```

--------------------------------------------------------------------------------
/src/serena/util/file_system.py:
--------------------------------------------------------------------------------

```python
"""File-system scanning utilities with gitignore-aware filtering."""

import logging
import os
from collections.abc import Callable, Iterator
from dataclasses import dataclass, field
from pathlib import Path
from typing import NamedTuple

import pathspec
from pathspec import PathSpec
from sensai.util.logging import LogTime

log = logging.getLogger(__name__)


class ScanResult(NamedTuple):
    """Result of scanning a directory."""

    directories: list[str]
    files: list[str]


def scan_directory(
    path: str,
    recursive: bool = False,
    relative_to: str | None = None,
    is_ignored_dir: Callable[[str], bool] | None = None,
    is_ignored_file: Callable[[str], bool] | None = None,
) -> ScanResult:
    """
    :param path: the path to scan
    :param recursive: whether to recursively scan subdirectories
    :param relative_to: the path to which the results should be relative to;
        if None, provide absolute paths
    :param is_ignored_dir: a function with which to determine whether the given directory (abs. path)
        shall be ignored
    :param is_ignored_file: a function with which to determine whether the given file (abs. path)
        shall be ignored
    :return: the list of directories and files
    """
    if is_ignored_file is None:
        is_ignored_file = lambda x: False
    if is_ignored_dir is None:
        is_ignored_dir = lambda x: False

    files = []
    directories = []

    abs_path = os.path.abspath(path)
    rel_base = os.path.abspath(relative_to) if relative_to else None

    try:
        with os.scandir(abs_path) as entries:
            for entry in entries:
                try:
                    entry_path = entry.path
                    if rel_base:
                        result_path = os.path.relpath(entry_path, rel_base)
                    else:
                        result_path = entry_path
                    if entry.is_file():
                        if not is_ignored_file(entry_path):
                            files.append(result_path)
                    elif entry.is_dir():
                        # NOTE(review): nesting reconstructed from a collapsed source dump —
                        # recursion is assumed to apply only to non-ignored directories; confirm upstream.
                        if not is_ignored_dir(entry_path):
                            directories.append(result_path)
                            if recursive:
                                sub_result = scan_directory(
                                    entry_path,
                                    recursive=True,
                                    relative_to=relative_to,
                                    is_ignored_dir=is_ignored_dir,
                                    is_ignored_file=is_ignored_file,
                                )
                                files.extend(sub_result.files)
                                directories.extend(sub_result.directories)
                except PermissionError as ex:
                    # Skip files/directories that cannot be accessed due to permission issues
                    log.debug(f"Skipping entry due to permission error: {entry.path}", exc_info=ex)
                    continue
    except PermissionError as ex:
        # Skip the entire directory if it cannot be accessed
        log.debug(f"Skipping directory due to permission error: {abs_path}", exc_info=ex)
        return ScanResult([], [])

    return ScanResult(directories, files)


def find_all_non_ignored_files(repo_root: str) -> list[str]:
    """
    Find all non-ignored files in the repository, respecting all gitignore files in the repository.

    :param repo_root: The root directory of the repository
    :return: A list of all non-ignored files in the repository
    """
    gitignore_parser = GitignoreParser(repo_root)
    _, files = scan_directory(
        repo_root, recursive=True, is_ignored_dir=gitignore_parser.should_ignore, is_ignored_file=gitignore_parser.should_ignore
    )
    return files


@dataclass
class GitignoreSpec:
    file_path: str
    """Path to the gitignore file."""
    patterns: list[str] = field(default_factory=list)
    """List of patterns from the gitignore file.
    The patterns are adjusted based on the gitignore file location.
    """
    pathspec: PathSpec = field(init=False)
    """Compiled PathSpec object for pattern matching."""

    def __post_init__(self) -> None:
        """Initialize the PathSpec from patterns."""
        self.pathspec = PathSpec.from_lines(pathspec.patterns.GitWildMatchPattern, self.patterns)

    def matches(self, relative_path: str) -> bool:
        """
        Check if the given path matches any pattern in this gitignore spec.

        :param relative_path: Path to check (should be relative to repo root)
        :return: True if path matches any pattern
        """
        return match_path(relative_path, self.pathspec, root_path=os.path.dirname(self.file_path))


class GitignoreParser:
    """
    Parser for gitignore files in a repository.

    This class handles parsing multiple gitignore files throughout a repository
    and provides methods to check if paths should be ignored.
    """

    def __init__(self, repo_root: str) -> None:
        """
        Initialize the parser for a repository.

        :param repo_root: Root directory of the repository
        """
        self.repo_root = os.path.abspath(repo_root)
        self.ignore_specs: list[GitignoreSpec] = []
        self._load_gitignore_files()

    def _load_gitignore_files(self) -> None:
        """Load all gitignore files from the repository."""
        with LogTime("Loading of .gitignore files", logger=log):
            for gitignore_path in self._iter_gitignore_files():
                log.info("Processing .gitignore file: %s", gitignore_path)
                spec = self._create_ignore_spec(gitignore_path)
                if spec.patterns:  # Only add non-empty specs
                    self.ignore_specs.append(spec)

    def _iter_gitignore_files(self, follow_symlinks: bool = False) -> Iterator[str]:
        """
        Iteratively discover .gitignore files in a top-down fashion, starting from the repository root.
        Directory paths are skipped if they match any already loaded ignore patterns.

        :return: an iterator yielding paths to .gitignore files (top-down)
        """
        queue: list[str] = [self.repo_root]

        def scan(abs_path: str | None) -> Iterator[str]:
            for entry in os.scandir(abs_path):
                if entry.is_dir(follow_symlinks=follow_symlinks):
                    queue.append(entry.path)
                elif entry.is_file(follow_symlinks=follow_symlinks) and entry.name == ".gitignore":
                    yield entry.path

        while queue:
            # FIFO pop ensures breadth-first (top-down) traversal
            next_abs_path = queue.pop(0)
            if next_abs_path != self.repo_root:
                rel_path = os.path.relpath(next_abs_path, self.repo_root)
                if self.should_ignore(rel_path):
                    continue
            yield from scan(next_abs_path)

    def _create_ignore_spec(self, gitignore_file_path: str) -> GitignoreSpec:
        """
        Create a GitignoreSpec from a single gitignore file.

        :param gitignore_file_path: Path to the .gitignore file
        :return: GitignoreSpec object for the gitignore patterns
        """
        try:
            with open(gitignore_file_path, encoding="utf-8") as f:
                content = f.read()
        except (OSError, UnicodeDecodeError):
            # If we can't read the file, return an empty spec
            return GitignoreSpec(gitignore_file_path, [])

        gitignore_dir = os.path.dirname(gitignore_file_path)
        patterns = self._parse_gitignore_content(content, gitignore_dir)
        return GitignoreSpec(gitignore_file_path, patterns)

    def _parse_gitignore_content(self, content: str, gitignore_dir: str) -> list[str]:
        """
        Parse gitignore content and adjust patterns based on the gitignore file location.

        :param content: Content of the .gitignore file
        :param gitignore_dir: Directory containing the .gitignore file (absolute path)
        :return: List of adjusted patterns
        """
        patterns = []

        # Get the relative path from repo root to the gitignore directory
        rel_dir = os.path.relpath(gitignore_dir, self.repo_root)
        if rel_dir == ".":
            rel_dir = ""

        for line in content.splitlines():
            # Strip trailing whitespace (but preserve leading whitespace for now)
            line = line.rstrip()

            # Skip empty lines and comments
            if not line or line.lstrip().startswith("#"):
                continue

            # Store whether this is a negation pattern
            is_negation = line.startswith("!")
            if is_negation:
                line = line[1:]

            # Strip leading/trailing whitespace after removing negation
            line = line.strip()
            if not line:
                continue

            # Handle escaped characters at the beginning
            if line.startswith(("\\#", "\\!")):
                line = line[1:]

            # Determine if pattern is anchored to the gitignore directory and remove leading slash for processing
            is_anchored = line.startswith("/")
            if is_anchored:
                line = line[1:]

            # Adjust pattern based on gitignore file location
            if rel_dir:
                if is_anchored:
                    # Anchored patterns are relative to the gitignore directory
                    adjusted_pattern = os.path.join(rel_dir, line)
                else:
                    # Non-anchored patterns can match anywhere below the gitignore directory
                    # We need to preserve this behavior
                    if line.startswith("**/"):
                        # Even if pattern starts with **, it should still be scoped to the subdirectory
                        adjusted_pattern = os.path.join(rel_dir, line)
                    else:
                        # Add the directory prefix but also allow matching in subdirectories
                        adjusted_pattern = os.path.join(rel_dir, "**", line)
            else:
                if is_anchored:
                    # Anchored patterns in root should only match at root level
                    # Add leading slash back to indicate root-only matching
                    adjusted_pattern = "/" + line
                else:
                    # Non-anchored patterns can match anywhere
                    adjusted_pattern = line

            # Re-add negation if needed
            if is_negation:
                adjusted_pattern = "!" + adjusted_pattern

            # Normalize path separators to forward slashes (gitignore uses forward slashes)
            adjusted_pattern = adjusted_pattern.replace(os.sep, "/")
            patterns.append(adjusted_pattern)

        return patterns

    def should_ignore(self, path: str) -> bool:
        """
        Check if a path should be ignored based on the gitignore rules.

        :param path: Path to check (absolute or relative to repo_root)
        :return: True if the path should be ignored, False otherwise
        """
        # Convert to relative path from repo root
        if os.path.isabs(path):
            try:
                rel_path = os.path.relpath(path, self.repo_root)
            except Exception as e:
                # If the path could not be converted to a relative path,
                # it is outside the repository root, so we ignore it
                log.info("Ignoring path '%s' which is outside of the repository root (%s)", path, e)
                return True
        else:
            rel_path = path

        # Ignore paths inside .git
        rel_path_first_path = Path(rel_path).parts[0]
        if rel_path_first_path == ".git":
            return True

        abs_path = os.path.join(self.repo_root, rel_path)

        # Normalize path separators
        rel_path = rel_path.replace(os.sep, "/")
        if os.path.exists(abs_path) and os.path.isdir(abs_path) and not rel_path.endswith("/"):
            rel_path = rel_path + "/"

        # Check against each ignore spec
        for spec in self.ignore_specs:
            if spec.matches(rel_path):
                return True
        return False

    def get_ignore_specs(self) -> list[GitignoreSpec]:
        """
        Get all loaded gitignore specs.

        :return: List of GitignoreSpec objects
        """
        return self.ignore_specs

    def reload(self) -> None:
        """Reload all gitignore files from the repository."""
        self.ignore_specs.clear()
        self._load_gitignore_files()


def match_path(relative_path: str, path_spec: PathSpec, root_path: str = "") -> bool:
    """
    Match a relative path against a given pathspec.

    Just pathspec.match_file() is not enough, we need to do some massaging to fix
    issues with pathspec matching.

    :param relative_path: relative path to match against the pathspec
    :param path_spec: the pathspec to match against
    :param root_path: the root path from which the relative path is derived
    :return:
    """
    normalized_path = str(relative_path).replace(os.path.sep, "/")
    # We can have patterns like /src/..., which would only match corresponding paths from the repo root
    # Unfortunately, pathspec can't know whether a relative path is relative to the repo root or not,
    # so it will never match src/...
    # The fix is to just always assume that the input path is relative to the repo root and to
    # prefix it with /.
    if not normalized_path.startswith("/"):
        normalized_path = "/" + normalized_path
    # pathspec can't handle the matching of directories if they don't end with a slash!
    # see https://github.com/cpburnz/python-pathspec/issues/89
    abs_path = os.path.abspath(os.path.join(root_path, relative_path))
    if os.path.isdir(abs_path) and not normalized_path.endswith("/"):
        normalized_path = normalized_path + "/"
    return path_spec.match_file(normalized_path)
```

--------------------------------------------------------------------------------
/.github/workflows/pytest.yml:
--------------------------------------------------------------------------------

```yaml
name: Tests on CI

on:
  pull_request:
  push:
    branches:
      - main

concurrency:
  group: ci-${{ github.workflow }}-${{ github.ref }}
  cancel-in-progress: true

jobs:
  cpu:
    name: Tests on ${{ matrix.os }}
    runs-on: ${{ matrix.os }}
    strategy:
      fail-fast: false
      matrix:
        os: [ubuntu-latest, windows-latest, macos-latest]
        python-version: ["3.11"]
    steps:
      - uses: actions/checkout@v3
      - name: Set up Python ${{ matrix.python-version }}
        uses: actions/setup-python@v4
        with:
          python-version: "${{ matrix.python-version }}"
      - uses: actions/setup-go@v5
        with:
          go-version: ">=1.17.0"
      - name: Set up Node.js
        uses: actions/setup-node@v4
        with:
          node-version: '20.x'
      - name: Ensure cached directory exist before calling cache-related actions
        shell: bash
        run: |
          mkdir -p
$HOME/.serena/language_servers/static mkdir -p $HOME/.cache/go-build mkdir -p $HOME/go/bin # Add Go bin directory to PATH for this workflow # GITHUB_PATH is a special file that GitHub Actions uses to modify PATH # Writing to this file adds the directory to the PATH for subsequent steps - name: Cache Go binaries id: cache-go-binaries uses: actions/cache@v3 with: path: | ~/go/bin ~/.cache/go-build key: go-binaries-${{ runner.os }}-gopls-latest - name: Install gopls if: steps.cache-go-binaries.outputs.cache-hit != 'true' shell: bash run: go install golang.org/x/tools/gopls@latest - name: Set up Elixir if: runner.os != 'Windows' uses: erlef/setup-beam@v1 with: elixir-version: "1.18.4" otp-version: "26.1" # Erlang currently not tested in CI, random hangings on macos, always hangs on ubuntu # In local tests, erlang seems to work though # - name: Install Erlang Language Server # if: runner.os != 'Windows' # shell: bash # run: | # # Install rebar3 if not already available # which rebar3 || (curl -fsSL https://github.com/erlang/rebar3/releases/download/3.23.0/rebar3 -o /tmp/rebar3 && chmod +x /tmp/rebar3 && sudo mv /tmp/rebar3 /usr/local/bin/rebar3) # # Clone and build erlang_ls # git clone https://github.com/erlang-ls/erlang_ls.git /tmp/erlang_ls # cd /tmp/erlang_ls # make install PREFIX=/usr/local # # Ensure erlang_ls is in PATH # echo "$HOME/.local/bin" >> $GITHUB_PATH - name: Install clojure tools uses: DeLaGuardo/[email protected] with: cli: latest - name: Setup Java (for JVM based languages) uses: actions/setup-java@v4 with: distribution: 'temurin' java-version: '17' - name: Install Terraform uses: hashicorp/setup-terraform@v3 with: terraform_version: "1.5.0" terraform_wrapper: false # - name: Install swift # if: runner.os != 'Windows' # uses: swift-actions/setup-swift@v2 # Installation of swift with the action screws with installation of ruby on macOS for some reason # We can try again when version 3 of the action is released, where they will also use swiftly # Until 
then, we use custom code to install swift. Sourcekit-lsp is installed automatically with swift - name: Install Swift with swiftly (macOS) if: runner.os == 'macOS' run: | echo "=== Installing swiftly on macOS ===" curl -O https://download.swift.org/swiftly/darwin/swiftly.pkg && \ installer -pkg swiftly.pkg -target CurrentUserHomeDirectory && \ ~/.swiftly/bin/swiftly init --quiet-shell-followup && \ . "${SWIFTLY_HOME_DIR:-$HOME/.swiftly}/env.sh" && \ hash -r swiftly install --use 6.1.2 swiftly use 6.1.2 echo "~/.swiftly/bin" >> $GITHUB_PATH echo "Swiftly installed successfully" # Verify sourcekit-lsp is working before proceeding echo "=== Verifying sourcekit-lsp installation ===" which sourcekit-lsp || echo "Warning: sourcekit-lsp not found in PATH" sourcekit-lsp --help || echo "Warning: sourcekit-lsp not responding" - name: Install Swift with swiftly (Ubuntu) if: runner.os == 'Linux' run: | echo "=== Installing swiftly on Ubuntu ===" # Install dependencies BEFORE Swift to avoid exit code 1 sudo apt-get update sudo apt-get -y install libcurl4-openssl-dev curl -O https://download.swift.org/swiftly/linux/swiftly-$(uname -m).tar.gz && \ tar zxf swiftly-$(uname -m).tar.gz && \ ./swiftly init --quiet-shell-followup && \ . "${SWIFTLY_HOME_DIR:-$HOME/.local/share/swiftly}/env.sh" && \ hash -r swiftly install --use 6.1.2 swiftly use 6.1.2 echo "=== Adding Swift toolchain to PATH ===" echo "$HOME/.local/share/swiftly/bin" >> $GITHUB_PATH echo "Swiftly installed successfully!" 
# Verify sourcekit-lsp is working before proceeding echo "=== Verifying sourcekit-lsp installation ===" which sourcekit-lsp || echo "Warning: sourcekit-lsp not found in PATH" sourcekit-lsp --help || echo "Warning: sourcekit-lsp not responding" - name: Install Ruby uses: ruby/setup-ruby@v1 with: ruby-version: '3.4' - name: Install Ruby language server shell: bash run: gem install ruby-lsp - name: Install R uses: r-lib/actions/setup-r@v2 with: r-version: '4.4.2' use-public-rspm: true - name: Install R language server shell: bash run: | Rscript -e "install.packages('languageserver', repos='https://cloud.r-project.org')" - name: Install Zig uses: goto-bus-stop/setup-zig@v2 with: version: 0.14.1 - name: Install ZLS (Zig Language Server) shell: bash run: | if [[ "${{ runner.os }}" == "Linux" ]]; then wget https://github.com/zigtools/zls/releases/download/0.14.0/zls-x86_64-linux.tar.xz tar -xf zls-x86_64-linux.tar.xz sudo mv zls /usr/local/bin/ rm zls-x86_64-linux.tar.xz elif [[ "${{ runner.os }}" == "macOS" ]]; then wget https://github.com/zigtools/zls/releases/download/0.14.0/zls-x86_64-macos.tar.xz tar -xf zls-x86_64-macos.tar.xz sudo mv zls /usr/local/bin/ rm zls-x86_64-macos.tar.xz elif [[ "${{ runner.os }}" == "Windows" ]]; then curl -L -o zls.zip https://github.com/zigtools/zls/releases/download/0.14.0/zls-x86_64-windows.zip unzip -o zls.zip mkdir -p "$HOME/bin" mv zls.exe "$HOME/bin/" echo "$HOME/bin" >> $GITHUB_PATH rm zls.zip fi - name: Install Lua Language Server shell: bash run: | LUA_LS_VERSION="3.15.0" LUA_LS_DIR="$HOME/.serena/language_servers/lua" mkdir -p "$LUA_LS_DIR" if [[ "${{ runner.os }}" == "Linux" ]]; then if [[ "$(uname -m)" == "x86_64" ]]; then wget https://github.com/LuaLS/lua-language-server/releases/download/${LUA_LS_VERSION}/lua-language-server-${LUA_LS_VERSION}-linux-x64.tar.gz tar -xzf lua-language-server-${LUA_LS_VERSION}-linux-x64.tar.gz -C "$LUA_LS_DIR" else wget 
https://github.com/LuaLS/lua-language-server/releases/download/${LUA_LS_VERSION}/lua-language-server-${LUA_LS_VERSION}-linux-arm64.tar.gz tar -xzf lua-language-server-${LUA_LS_VERSION}-linux-arm64.tar.gz -C "$LUA_LS_DIR" fi chmod +x "$LUA_LS_DIR/bin/lua-language-server" # Create wrapper script instead of symlink to ensure supporting files are found echo '#!/bin/bash' | sudo tee /usr/local/bin/lua-language-server > /dev/null echo 'cd "${HOME}/.serena/language_servers/lua/bin"' | sudo tee -a /usr/local/bin/lua-language-server > /dev/null echo 'exec ./lua-language-server "$@"' | sudo tee -a /usr/local/bin/lua-language-server > /dev/null sudo chmod +x /usr/local/bin/lua-language-server rm lua-language-server-*.tar.gz elif [[ "${{ runner.os }}" == "macOS" ]]; then if [[ "$(uname -m)" == "x86_64" ]]; then wget https://github.com/LuaLS/lua-language-server/releases/download/${LUA_LS_VERSION}/lua-language-server-${LUA_LS_VERSION}-darwin-x64.tar.gz tar -xzf lua-language-server-${LUA_LS_VERSION}-darwin-x64.tar.gz -C "$LUA_LS_DIR" else wget https://github.com/LuaLS/lua-language-server/releases/download/${LUA_LS_VERSION}/lua-language-server-${LUA_LS_VERSION}-darwin-arm64.tar.gz tar -xzf lua-language-server-${LUA_LS_VERSION}-darwin-arm64.tar.gz -C "$LUA_LS_DIR" fi chmod +x "$LUA_LS_DIR/bin/lua-language-server" # Create wrapper script instead of symlink to ensure supporting files are found echo '#!/bin/bash' | sudo tee /usr/local/bin/lua-language-server > /dev/null echo 'cd "${HOME}/.serena/language_servers/lua/bin"' | sudo tee -a /usr/local/bin/lua-language-server > /dev/null echo 'exec ./lua-language-server "$@"' | sudo tee -a /usr/local/bin/lua-language-server > /dev/null sudo chmod +x /usr/local/bin/lua-language-server rm lua-language-server-*.tar.gz elif [[ "${{ runner.os }}" == "Windows" ]]; then curl -L -o lua-ls.zip https://github.com/LuaLS/lua-language-server/releases/download/${LUA_LS_VERSION}/lua-language-server-${LUA_LS_VERSION}-win32-x64.zip unzip -o lua-ls.zip -d 
"$LUA_LS_DIR" # For Windows, we'll add the bin directory directly to PATH # The lua-language-server.exe can find its supporting files relative to its location echo "$LUA_LS_DIR/bin" >> $GITHUB_PATH rm lua-ls.zip fi - name: Install Perl::LanguageServer if: runner.os != 'Windows' shell: bash run: | if [[ "${{ runner.os }}" == "Linux" ]]; then sudo apt-get update sudo apt-get install -y cpanminus build-essential libanyevent-perl libio-aio-perl elif [[ "${{ runner.os }}" == "macOS" ]]; then brew install cpanminus fi PERL_MM_USE_DEFAULT=1 cpanm --notest --force Perl::LanguageServer # Set up Perl local::lib environment for subsequent steps echo "PERL5LIB=$HOME/perl5/lib/perl5${PERL5LIB:+:${PERL5LIB}}" >> $GITHUB_ENV echo "PERL_LOCAL_LIB_ROOT=$HOME/perl5${PERL_LOCAL_LIB_ROOT:+:${PERL_LOCAL_LIB_ROOT}}" >> $GITHUB_ENV echo "PERL_MB_OPT=--install_base \"$HOME/perl5\"" >> $GITHUB_ENV echo "PERL_MM_OPT=INSTALL_BASE=$HOME/perl5" >> $GITHUB_ENV echo "$HOME/perl5/bin" >> $GITHUB_PATH - name: Install Elm shell: bash run: npm install -g [email protected] - name: Install Nix if: runner.os != 'Windows' # Nix doesn't support Windows natively uses: cachix/install-nix-action@v30 with: nix_path: nixpkgs=channel:nixos-unstable - name: Install nixd (Nix Language Server) if: runner.os != 'Windows' # Skip on Windows since Nix isn't available shell: bash run: | # Install nixd using nix nix profile install github:nix-community/nixd # Verify nixd is installed and working if ! 
command -v nixd &> /dev/null; then echo "nixd installation failed or not in PATH" exit 1 fi echo "$HOME/.nix-profile/bin" >> $GITHUB_PATH - name: Verify Nix package build if: runner.os != 'Windows' # Nix only supported on Linux/macOS shell: bash run: | # Verify the flake builds successfully nix build --no-link - name: Install Regal (Rego Language Server) shell: bash run: | REGAL_VERSION="0.36.1" if [[ "${{ runner.os }}" == "Linux" ]]; then if [[ "$(uname -m)" == "x86_64" ]]; then curl -L -o regal https://github.com/StyraInc/regal/releases/download/v${REGAL_VERSION}/regal_Linux_x86_64 else curl -L -o regal https://github.com/StyraInc/regal/releases/download/v${REGAL_VERSION}/regal_Linux_arm64 fi chmod +x regal sudo mv regal /usr/local/bin/ elif [[ "${{ runner.os }}" == "macOS" ]]; then if [[ "$(uname -m)" == "x86_64" ]]; then curl -L -o regal https://github.com/StyraInc/regal/releases/download/v${REGAL_VERSION}/regal_Darwin_x86_64 else curl -L -o regal https://github.com/StyraInc/regal/releases/download/v${REGAL_VERSION}/regal_Darwin_arm64 fi chmod +x regal sudo mv regal /usr/local/bin/ elif [[ "${{ runner.os }}" == "Windows" ]]; then curl -L -o regal.exe https://github.com/StyraInc/regal/releases/download/v${REGAL_VERSION}/regal_Windows_x86_64.exe mkdir -p "$HOME/bin" mv regal.exe "$HOME/bin/" echo "$HOME/bin" >> $GITHUB_PATH fi - name: Install uv shell: bash run: curl -LsSf https://astral.sh/uv/install.sh | sh - name: Cache uv virtualenv id: cache-uv uses: actions/cache@v3 with: path: .venv key: uv-venv-${{ runner.os }}-${{ matrix.python-version }}-${{ hashFiles('uv.lock') }} - name: Cache language servers id: cache-language-servers uses: actions/cache@v3 with: path: ~/.serena/language_servers/static key: language-servers-${{ runner.os }}-v1 restore-keys: | language-servers-${{ runner.os }}- - name: Create virtual environment shell: bash run: | if [ ! 
-d ".venv" ]; then uv venv fi - name: Install dependencies shell: bash run: uv pip install -e ".[dev]" - name: Check formatting shell: bash run: uv run poe lint - name: Test with pytest shell: bash run: uv run poe test ``` -------------------------------------------------------------------------------- /src/serena/gui_log_viewer.py: -------------------------------------------------------------------------------- ```python # mypy: ignore-errors import logging import os import queue import sys import threading import tkinter as tk import traceback from enum import Enum, auto from pathlib import Path from typing import Literal from serena import constants from serena.util.logging import MemoryLogHandler log = logging.getLogger(__name__) class LogLevel(Enum): DEBUG = auto() INFO = auto() WARNING = auto() ERROR = auto() DEFAULT = auto() class GuiLogViewer: """ A class that creates a Tkinter GUI for displaying log messages in a separate thread. The log viewer supports coloring based on log levels (DEBUG, INFO, WARNING, ERROR). It can also highlight tool names in boldface when they appear in log messages. """ def __init__( self, mode: Literal["dashboard", "error"], title="Log Viewer", memory_log_handler: MemoryLogHandler | None = None, width=800, height=600, ): """ :param mode: the mode; if "dashboard", run a dashboard with logs and some control options; if "error", run a simple error log viewer (for fatal exceptions) :param title: the window title :param memory_log_handler: an optional log handler from which to obtain log messages; If not provided, must pass the instance to a `GuiLogViewerHandler` to add log messages. 
        :param width: the initial window width
        :param height: the initial window height
        """
        self.mode = mode
        self.title = title
        self.width = width
        self.height = height
        # Thread-safe handoff of messages from producer threads to the Tk GUI thread
        self.message_queue = queue.Queue()
        self.running = False
        self.log_thread = None
        self.tool_names = []  # List to store tool names for highlighting

        # Define colors for different log levels
        self.log_colors = {
            LogLevel.DEBUG: "#808080",  # Gray
            LogLevel.INFO: "#000000",  # Black
            LogLevel.WARNING: "#FF8C00",  # Dark Orange
            LogLevel.ERROR: "#FF0000",  # Red
            LogLevel.DEFAULT: "#000000",  # Black
        }

        if memory_log_handler is not None:
            # Replay messages that were logged before this viewer existed, then
            # subscribe so that future messages are forwarded as well
            for msg in memory_log_handler.get_log_messages():
                self.message_queue.put(msg)
            memory_log_handler.add_emit_callback(lambda msg: self.message_queue.put(msg))

    def start(self) -> bool:
        """
        Start the log viewer GUI in a separate thread.

        :return: True if the viewer was started, False if it was already running
        """
        if not self.running:
            self.log_thread = threading.Thread(target=self.run_gui)
            # daemon thread: do not block interpreter shutdown on the GUI
            self.log_thread.daemon = True
            self.log_thread.start()
            return True
        return False

    def stop(self) -> bool:
        """
        Stop the log viewer.

        :return: True if a stop was requested, False if the viewer was not running
        """
        if self.running:
            # Add a sentinel value to the queue to signal the GUI to exit
            self.message_queue.put(None)
            return True
        return False

    def set_tool_names(self, tool_names):
        """
        Set or update the list of tool names to be highlighted in log messages.

        :param tool_names: a list of tool name strings to highlight
        """
        self.tool_names = tool_names

    def add_log(self, message):
        """
        Add a log message to the viewer.

        :param message: the log message to display
        """
        self.message_queue.put(message)
    def _determine_log_level(self, message):
        """
        Determine the log level from the message.

        :param message: the log message
        :return: the determined LogLevel (DEFAULT if no known prefix matches)
        """
        message_upper = message.upper()
        # The formatter used by GuiLogViewerHandler puts the level name first,
        # so a simple prefix check suffices
        if message_upper.startswith("DEBUG"):
            return LogLevel.DEBUG
        elif message_upper.startswith("INFO"):
            return LogLevel.INFO
        elif message_upper.startswith("WARNING"):
            return LogLevel.WARNING
        elif message_upper.startswith("ERROR"):
            return LogLevel.ERROR
        else:
            return LogLevel.DEFAULT

    def _process_queue(self):
        """Process messages from the queue and update the text widget.

        Runs on the Tk thread via `root.after`; reschedules itself every 100 ms
        while the viewer is running.
        """
        try:
            while not self.message_queue.empty():
                message = self.message_queue.get_nowait()

                # Check for sentinel value to exit
                if message is None:
                    self.root.quit()
                    return

                # Check if scrollbar is at the bottom before adding new text
                # Get current scroll position
                current_position = self.text_widget.yview()
                # If near the bottom (allowing for small floating point differences)
                was_at_bottom = current_position[1] > 0.99

                log_level = self._determine_log_level(message)

                # Insert the message at the end of the text with appropriate log level tag
                # (widget is kept read-only, so temporarily enable it)
                self.text_widget.configure(state=tk.NORMAL)

                # Find tool names in the message and highlight them
                if self.tool_names:
                    # Capture start position (before insertion)
                    start_index = self.text_widget.index("end-1c")

                    # Insert the message
                    self.text_widget.insert(tk.END, message + "\n", log_level.name)

                    # Convert start index to line/char format (Tk indices are "line.char")
                    line, char = map(int, start_index.split("."))

                    # Search for tool names in the message string directly
                    for tool_name in self.tool_names:
                        start_offset = 0
                        while True:
                            found_at = message.find(tool_name, start_offset)
                            if found_at == -1:
                                break

                            # Calculate line/column from offset by walking the message
                            # up to the match, tracking newlines
                            # NOTE(review): assumes `char` (column of "end-1c" before the
                            # insert) is the column at which the message text begins —
                            # confirm for widgets with prior content
                            offset_line = line
                            offset_char = char
                            for c in message[:found_at]:
                                if c == "\n":
                                    offset_line += 1
                                    offset_char = 0
                                else:
                                    offset_char += 1

                            # Construct index positions
                            start_pos = f"{offset_line}.{offset_char}"
                            end_pos = f"{offset_line}.{offset_char + len(tool_name)}"

                            # Add tag to highlight the tool name
                            self.text_widget.tag_add("TOOL_NAME", start_pos, end_pos)
                            start_offset = found_at + len(tool_name)
                else:
                    # No tool names to highlight, just insert the message
                    self.text_widget.insert(tk.END, message + "\n", log_level.name)

                self.text_widget.configure(state=tk.DISABLED)

                # Auto-scroll to the bottom only if it was already at the bottom
                if was_at_bottom:
                    self.text_widget.see(tk.END)

            # Schedule to check the queue again
            if self.running:
                self.root.after(100, self._process_queue)
        except Exception as e:
            print(f"Error processing message queue: {e}", file=sys.stderr)
            # Keep polling even after an error so a single bad message does not
            # permanently freeze the viewer
            if self.running:
                self.root.after(100, self._process_queue)

    def run_gui(self):
        """Run the GUI (blocking Tk main loop; intended to run in its own thread)."""
        self.running = True
        try:
            # Set app id (avoid app being lumped together with other Python-based apps in Windows taskbar)
            if sys.platform == "win32":
                import ctypes

                ctypes.windll.shell32.SetCurrentProcessExplicitAppUserModelID("oraios.serena")

            self.root = tk.Tk()
            self.root.title(self.title)
            self.root.geometry(f"{self.width}x{self.height}")

            # Make the window resizable
            self.root.columnconfigure(0, weight=1)
            # We now have two rows - one for logo and one for text
            self.root.rowconfigure(0, weight=0)  # Logo row
            self.root.rowconfigure(1, weight=1)  # Text content row

            dashboard_path = Path(constants.SERENA_DASHBOARD_DIR)

            # Load and display the logo image
            try:
                # construct path relative to path of this file
                image_path = dashboard_path / "serena-logs.png"
                self.logo_image = tk.PhotoImage(file=image_path)
                # Create a label to display the logo
                self.logo_label = tk.Label(self.root, image=self.logo_image)
                self.logo_label.grid(row=0, column=0, sticky="ew")
            except Exception as e:
                # Logo is cosmetic; the viewer remains functional without it
                print(f"Error loading logo image: {e}", file=sys.stderr)

            # Create frame to hold text widget and scrollbars
            frame = tk.Frame(self.root)
            frame.grid(row=1, column=0, sticky="nsew")
            frame.columnconfigure(0, weight=1)
            frame.rowconfigure(0, weight=1)

            # Create horizontal scrollbar
            h_scrollbar = tk.Scrollbar(frame, orient=tk.HORIZONTAL)
            h_scrollbar.grid(row=1, column=0, sticky="ew")

            # Create vertical scrollbar
            v_scrollbar = tk.Scrollbar(frame, orient=tk.VERTICAL)
            v_scrollbar.grid(row=0, column=1, sticky="ns")

            # Create text widget with horizontal scrolling (wrap=NONE)
            self.text_widget = tk.Text(
                frame, wrap=tk.NONE, width=self.width, height=self.height, xscrollcommand=h_scrollbar.set, yscrollcommand=v_scrollbar.set
            )
            self.text_widget.grid(row=0, column=0, sticky="nsew")
            self.text_widget.configure(state=tk.DISABLED)  # Make it read-only

            # Configure scrollbars
            h_scrollbar.config(command=self.text_widget.xview)
            v_scrollbar.config(command=self.text_widget.yview)

            # Configure tags for different log levels with appropriate colors
            for level, color in self.log_colors.items():
                self.text_widget.tag_configure(level.name, foreground=color)

            # Configure tag for tool names
            self.text_widget.tag_configure("TOOL_NAME", background="#ffff00")

            # Set up the queue processing
            self.root.after(100, self._process_queue)

            # Handle window close event depending on mode:
            # dashboard windows are only minimized; error windows are actually closed
            if self.mode == "dashboard":
                self.root.protocol("WM_DELETE_WINDOW", lambda: self.root.iconify())
            else:
                self.root.protocol("WM_DELETE_WINDOW", self.stop)

            # Create menu bar
            if self.mode == "dashboard":
                menubar = tk.Menu(self.root)
                server_menu = tk.Menu(menubar, tearoff=0)
                server_menu.add_command(label="Shutdown", command=self._shutdown_server)  # type: ignore
                menubar.add_cascade(label="Server", menu=server_menu)
                self.root.config(menu=menubar)

            # Configure icons
            icon_16 = tk.PhotoImage(file=dashboard_path / "serena-icon-16.png")
            icon_32 = tk.PhotoImage(file=dashboard_path / "serena-icon-32.png")
            icon_48 = tk.PhotoImage(file=dashboard_path / "serena-icon-48.png")
            self.root.iconphoto(False, icon_48, icon_32, icon_16)

            # Start the Tkinter event loop
            self.root.mainloop()
        except Exception as e:
            print(f"Error in GUI thread: {e}", file=sys.stderr)
        finally:
            self.running = False

    def _shutdown_server(self) -> None:
        """Terminate the entire process immediately (dashboard "Shutdown" menu action)."""
        log.info("Shutting down Serena")
        # noinspection PyUnresolvedReferences
        # noinspection PyProtectedMember
        os._exit(0)
class GuiLogViewerHandler(logging.Handler):
    """
    Logging handler that forwards log records to a :class:`GuiLogViewer` instance.

    Attach it to Python's standard logging machinery to mirror log entries
    in the GUI log viewer.
    """

    def __init__(
        self,
        log_viewer: GuiLogViewer,
        level=logging.NOTSET,
        format_string: str | None = "%(levelname)-5s %(asctime)-15s %(name)s:%(funcName)s:%(lineno)d - %(message)s",
    ):
        """
        :param log_viewer: the GuiLogViewer instance that will display the logs
        :param level: the logging level (default: NOTSET, which captures all logs)
        :param format_string: the format string
        """
        super().__init__(level)
        self.log_viewer = log_viewer
        self.formatter = logging.Formatter(format_string)
        # Launch the viewer thread on demand, so attaching the handler suffices
        if not self.log_viewer.running:
            self.log_viewer.start()

    @classmethod
    def is_instance_registered(cls) -> bool:
        """:return: True if a handler of this class is attached to the root logger"""
        return any(isinstance(handler, cls) for handler in logging.Logger.root.handlers)

    def emit(self, record):
        """
        Forward a formatted log record to the viewer.

        :param record: the log record to emit
        """
        try:
            formatted = self.format(record)
            prefix = record.levelname
            # The viewer derives message colors from the level-name prefix,
            # so prepend it if the formatter did not already include it
            if not formatted.startswith(prefix):
                formatted = f"{prefix}: {formatted}"
            self.log_viewer.add_log(formatted)
        except Exception:
            self.handleError(record)

    def close(self):
        """
        Close the handler. The log viewer is deliberately left running, as it
        might be used by other handlers or directly by the application.
        """
        super().close()

    def stop_viewer(self):
        """Explicitly stop the associated log viewer."""
        if self.log_viewer.running:
            self.log_viewer.stop()


def show_fatal_exception(e: Exception):
    """
    Makes sure the given exception is shown in the GUI log viewer,
    either an existing instance or a new one.

    :param e: the exception to display
    """
    # show in new window in main thread (user must close it)
    viewer = GuiLogViewer("error")
    formatted_traceback = "".join(traceback.format_exception(type(e), e, e.__traceback__))
    viewer.add_log(f"ERROR Fatal exception: {e}\n{formatted_traceback}")
    viewer.run_gui()
:param e: the exception to display """ # show in new window in main thread (user must close it) log_viewer = GuiLogViewer("error") exc_info = "".join(traceback.format_exception(type(e), e, e.__traceback__)) log_viewer.add_log(f"ERROR Fatal exception: {e}\n{exc_info}") log_viewer.run_gui() ``` -------------------------------------------------------------------------------- /src/serena/project.py: -------------------------------------------------------------------------------- ```python import logging import os from pathlib import Path from typing import Any import pathspec from serena.config.serena_config import DEFAULT_TOOL_TIMEOUT, ProjectConfig from serena.constants import SERENA_MANAGED_DIR_IN_HOME, SERENA_MANAGED_DIR_NAME from serena.text_utils import MatchedConsecutiveLines, search_files from serena.util.file_system import GitignoreParser, match_path from solidlsp import SolidLanguageServer from solidlsp.ls_config import Language, LanguageServerConfig from solidlsp.ls_logger import LanguageServerLogger from solidlsp.settings import SolidLSPSettings log = logging.getLogger(__name__) class Project: def __init__(self, project_root: str, project_config: ProjectConfig, is_newly_created: bool = False): self.project_root = project_root self.project_config = project_config self.is_newly_created = is_newly_created # create .gitignore file in the project's Serena data folder if not yet present serena_data_gitignore_path = os.path.join(self.path_to_serena_data_folder(), ".gitignore") if not os.path.exists(serena_data_gitignore_path): os.makedirs(os.path.dirname(serena_data_gitignore_path), exist_ok=True) log.info(f"Creating .gitignore file in {serena_data_gitignore_path}") with open(serena_data_gitignore_path, "w", encoding="utf-8") as f: f.write(f"/{SolidLanguageServer.CACHE_FOLDER_NAME}\n") # gather ignored paths from the project configuration and gitignore files ignored_patterns = project_config.ignored_paths if len(ignored_patterns) > 0: log.info(f"Using 
            log.info(f"Using {len(ignored_patterns)} ignored paths from the explicit project configuration.")
            log.debug(f"Ignored paths: {ignored_patterns}")
        if project_config.ignore_all_files_in_gitignore:
            # merge in patterns from all .gitignore files found in the project
            gitignore_parser = GitignoreParser(self.project_root)
            for spec in gitignore_parser.get_ignore_specs():
                log.debug(f"Adding {len(spec.patterns)} patterns from {spec.file_path} to the ignored paths.")
                ignored_patterns.extend(spec.patterns)
        self._ignored_patterns = ignored_patterns

        # Set up the pathspec matcher for the ignored paths
        # for all absolute paths in ignored_paths, convert them to relative paths
        processed_patterns = []
        for pattern in set(ignored_patterns):
            # Normalize separators (pathspec expects forward slashes)
            pattern = pattern.replace(os.path.sep, "/")
            processed_patterns.append(pattern)
        log.debug(f"Processing {len(processed_patterns)} ignored paths")
        self._ignore_spec = pathspec.PathSpec.from_lines(pathspec.patterns.GitWildMatchPattern, processed_patterns)

    @property
    def project_name(self) -> str:
        # Human-readable project name as stored in the project configuration
        return self.project_config.project_name

    @property
    def language(self) -> Language:
        # The primary programming language configured for this project
        return self.project_config.language

    @classmethod
    def load(cls, project_root: str | Path, autogenerate: bool = True) -> "Project":
        """
        Load a project from the given root directory.

        :param project_root: the project root directory (must exist)
        :param autogenerate: whether to auto-generate a project configuration if none is present
        :return: the loaded Project instance
        :raises FileNotFoundError: if the project root does not exist
        """
        project_root = Path(project_root).resolve()
        if not project_root.exists():
            raise FileNotFoundError(f"Project root not found: {project_root}")
        project_config = ProjectConfig.load(project_root, autogenerate=autogenerate)
        return Project(project_root=str(project_root), project_config=project_config)

    def path_to_serena_data_folder(self) -> str:
        """:return: the absolute path of the Serena-managed data folder inside the project"""
        return os.path.join(self.project_root, SERENA_MANAGED_DIR_NAME)

    def path_to_project_yml(self) -> str:
        """:return: the absolute path of the project's project.yml configuration file"""
        return os.path.join(self.project_root, self.project_config.rel_path_to_project_yml())
    def read_file(self, relative_path: str) -> str:
        """
        Reads a file relative to the project root.

        :param relative_path: the path to the file relative to the project root
        :return: the content of the file
        :raises FileNotFoundError: if the file does not exist
        """
        abs_path = Path(self.project_root) / relative_path
        if not abs_path.exists():
            raise FileNotFoundError(f"File not found: {abs_path}")
        # decode using the project's configured encoding
        return abs_path.read_text(encoding=self.project_config.encoding)

    def get_ignore_spec(self) -> pathspec.PathSpec:
        """
        :return: the pathspec matcher for the paths that were configured to be ignored,
            either explicitly or implicitly through .gitignore files.
        """
        return self._ignore_spec

    def _is_ignored_relative_path(self, relative_path: str | Path, ignore_non_source_files: bool = True) -> bool:
        """
        Determine whether an existing path should be ignored based on file type and ignore patterns.
        Raises `FileNotFoundError` if the path does not exist.

        :param relative_path: Relative path to check
        :param ignore_non_source_files: whether files that are not source files (according to the file masks
            determined by the project's programming language) shall be ignored
        :return: whether the path should be ignored
        """
        # special case, never ignore the project root itself
        # If the user ignores hidden files, "." might match against the corresponding PathSpec pattern.
        # The empty string also points to the project root and should never be ignored.
        if str(relative_path) in [".", ""]:
            return False
        abs_path = os.path.join(self.project_root, relative_path)
        if not os.path.exists(abs_path):
            raise FileNotFoundError(f"File {abs_path} not found, the ignore check cannot be performed")

        # Check file extension if it's a file
        is_file = os.path.isfile(abs_path)
        if is_file and ignore_non_source_files:
            fn_matcher = self.language.get_source_fn_matcher()
            if not fn_matcher.is_relevant_filename(abs_path):
                return True

        # Create normalized path for consistent handling
        rel_path = Path(relative_path)

        # always ignore paths inside .git
        if len(rel_path.parts) > 0 and rel_path.parts[0] == ".git":
            return True

        return match_path(str(relative_path), self.get_ignore_spec(), root_path=self.project_root)

    def is_ignored_path(self, path: str | Path, ignore_non_source_files: bool = False) -> bool:
        """
        Checks whether the given path is ignored

        :param path: the path to check, can be absolute or relative
        :param ignore_non_source_files: whether to ignore files that are not source files (according to the file masks
            determined by the project's programming language)
        :return: whether the path is ignored
        """
        path = Path(path)
        if path.is_absolute():
            try:
                relative_path = path.relative_to(self.project_root)
            except ValueError:
                # If the path is not relative to the project root, we consider it as an absolute path outside the project
                # (which we ignore)
                log.warning(f"Path {path} is not relative to the project root {self.project_root} and was therefore ignored")
                return True
        else:
            relative_path = path
        return self._is_ignored_relative_path(str(relative_path), ignore_non_source_files=ignore_non_source_files)
""" path = Path(path) _proj_root = Path(self.project_root) if not path.is_absolute(): path = _proj_root / path path = path.resolve() return path.is_relative_to(_proj_root) def relative_path_exists(self, relative_path: str) -> bool: """ Checks if the given relative path exists in the project directory. :param relative_path: the path to check, relative to the project root :return: True if the path exists, False otherwise """ abs_path = Path(self.project_root) / relative_path return abs_path.exists() def validate_relative_path(self, relative_path: str, require_not_ignored: bool = False) -> None: """ Validates that the given relative path to an existing file/dir is safe to read or edit, meaning it's inside the project directory. Passing a path to a non-existing file will lead to a `FileNotFoundError`. :param relative_path: the path to validate, relative to the project root :param require_not_ignored: if True, the path must not be ignored according to the project's ignore settings """ if not self.is_path_in_project(relative_path): raise ValueError(f"{relative_path=} points to path outside of the repository root; cannot access for safety reasons") if require_not_ignored: if self.is_ignored_path(relative_path): raise ValueError(f"Path {relative_path} is ignored; cannot access for safety reasons") def gather_source_files(self, relative_path: str = "") -> list[str]: """Retrieves relative paths of all source files, optionally limited to the given path :param relative_path: if provided, restrict search to this path """ rel_file_paths = [] start_path = os.path.join(self.project_root, relative_path) if not os.path.exists(start_path): raise FileNotFoundError(f"Relative path {start_path} not found.") if os.path.isfile(start_path): return [relative_path] else: for root, dirs, files in os.walk(start_path, followlinks=True): # prevent recursion into ignored directories dirs[:] = [d for d in dirs if not self.is_ignored_path(os.path.join(root, d))] # collect non-ignored files for file 
in files: abs_file_path = os.path.join(root, file) try: if not self.is_ignored_path(abs_file_path, ignore_non_source_files=True): try: rel_file_path = os.path.relpath(abs_file_path, start=self.project_root) except Exception: log.warning( "Ignoring path '%s' because it appears to be outside of the project root (%s)", abs_file_path, self.project_root, ) continue rel_file_paths.append(rel_file_path) except FileNotFoundError: log.warning( f"File {abs_file_path} not found (possibly due it being a symlink), skipping it in request_parsed_files", ) return rel_file_paths def search_source_files_for_pattern( self, pattern: str, relative_path: str = "", context_lines_before: int = 0, context_lines_after: int = 0, paths_include_glob: str | None = None, paths_exclude_glob: str | None = None, ) -> list[MatchedConsecutiveLines]: """ Search for a pattern across all (non-ignored) source files :param pattern: Regular expression pattern to search for, either as a compiled Pattern or string :param relative_path: :param context_lines_before: Number of lines of context to include before each match :param context_lines_after: Number of lines of context to include after each match :param paths_include_glob: Glob pattern to filter which files to include in the search :param paths_exclude_glob: Glob pattern to filter which files to exclude from the search. Takes precedence over paths_include_glob. 
    def search_source_files_for_pattern(
        self,
        pattern: str,
        relative_path: str = "",
        context_lines_before: int = 0,
        context_lines_after: int = 0,
        paths_include_glob: str | None = None,
        paths_exclude_glob: str | None = None,
    ) -> list[MatchedConsecutiveLines]:
        """
        Search for a pattern across all (non-ignored) source files

        :param pattern: Regular expression pattern to search for, either as a compiled Pattern or string
        :param relative_path: if provided, restrict the search to this path (directory or single file)
        :param context_lines_before: Number of lines of context to include before each match
        :param context_lines_after: Number of lines of context to include after each match
        :param paths_include_glob: Glob pattern to filter which files to include in the search
        :param paths_exclude_glob: Glob pattern to filter which files to exclude from the search.
            Takes precedence over paths_include_glob.
        :return: List of matched consecutive lines with context
        """
        relative_file_paths = self.gather_source_files(relative_path=relative_path)
        return search_files(
            relative_file_paths,
            pattern,
            root_path=self.project_root,
            context_lines_before=context_lines_before,
            context_lines_after=context_lines_after,
            paths_include_glob=paths_include_glob,
            paths_exclude_glob=paths_exclude_glob,
        )

    def retrieve_content_around_line(
        self, relative_file_path: str, line: int, context_lines_before: int = 0, context_lines_after: int = 0
    ) -> MatchedConsecutiveLines:
        """
        Retrieve the content of the given file around the given line.

        :param relative_file_path: The relative path of the file to retrieve the content from
        :param line: The line number to retrieve the content around
        :param context_lines_before: The number of lines to retrieve before the given line
        :param context_lines_after: The number of lines to retrieve after the given line
        :return MatchedConsecutiveLines: A container with the desired lines.
        """
        file_contents = self.read_file(relative_file_path)
        return MatchedConsecutiveLines.from_file_contents(
            file_contents,
            line=line,
            context_lines_before=context_lines_before,
            context_lines_after=context_lines_after,
            source_file_path=relative_file_path,
        )

    def create_language_server(
        self,
        log_level: int = logging.INFO,
        ls_timeout: float | None = DEFAULT_TOOL_TIMEOUT - 5,
        trace_lsp_communication: bool = False,
        ls_specific_settings: dict[Language, Any] | None = None,
    ) -> SolidLanguageServer:
        """
        Create a language server for this project.
        Note that you will have to start it before performing any LS operations.

        :param log_level: the log level for the language server
        :param ls_timeout: the timeout for the language server
        :param trace_lsp_communication: whether to trace LSP communication
        :param ls_specific_settings: optional LS specific configuration of the language server, see docstrings in the
            inits of subclasses of SolidLanguageServer to see what values may be passed.
        :return: the language server
        """
        ls_config = LanguageServerConfig(
            code_language=self.language,
            ignored_paths=self._ignored_patterns,
            trace_lsp_communication=trace_lsp_communication,
        )
        ls_logger = LanguageServerLogger(log_level=log_level)
        log.info(f"Creating language server instance for {self.project_root}.")
        return SolidLanguageServer.create(
            ls_config,
            ls_logger,
            self.project_root,
            timeout=ls_timeout,
            solidlsp_settings=SolidLSPSettings(
                solidlsp_dir=SERENA_MANAGED_DIR_IN_HOME,
                project_data_relative_path=SERENA_MANAGED_DIR_NAME,
                ls_specific_settings=ls_specific_settings or {},
            ),
        )
""" @staticmethod def get_line_col_from_index(text: str, index: int) -> tuple[int, int]: """ Returns the zero-indexed line and column number of the given index in the given text """ l = 0 c = 0 idx = 0 while idx < index: if text[idx] == "\n": l += 1 c = 0 else: c += 1 idx += 1 return l, c @staticmethod def get_index_from_line_col(text: str, line: int, col: int) -> int: """ Returns the index of the given zero-indexed line and column number in the given text """ idx = 0 while line > 0: if idx >= len(text): raise InvalidTextLocationError if text[idx] == "\n": line -= 1 idx += 1 idx += col return idx @staticmethod def _get_updated_position_from_line_and_column_and_edit(l: int, c: int, text_to_be_inserted: str) -> tuple[int, int]: """ Utility function to get the position of the cursor after inserting text at a given line and column. """ num_newlines_in_gen_text = text_to_be_inserted.count("\n") if num_newlines_in_gen_text > 0: l += num_newlines_in_gen_text c = len(text_to_be_inserted.split("\n")[-1]) else: c += len(text_to_be_inserted) return (l, c) @staticmethod def delete_text_between_positions(text: str, start_line: int, start_col: int, end_line: int, end_col: int) -> tuple[str, str]: """ Deletes the text between the given start and end positions. Returns the modified text and the deleted text. """ del_start_idx = TextUtils.get_index_from_line_col(text, start_line, start_col) del_end_idx = TextUtils.get_index_from_line_col(text, end_line, end_col) deleted_text = text[del_start_idx:del_end_idx] new_text = text[:del_start_idx] + text[del_end_idx:] return new_text, deleted_text @staticmethod def insert_text_at_position(text: str, line: int, col: int, text_to_be_inserted: str) -> tuple[str, int, int]: """ Inserts the given text at the given line and column. Returns the modified text and the new line and column. 
""" try: change_index = TextUtils.get_index_from_line_col(text, line, col) except InvalidTextLocationError: num_lines_in_text = text.count("\n") + 1 max_line = num_lines_in_text - 1 if line == max_line + 1 and col == 0: # trying to insert at new line after full text # insert at end, adding missing newline change_index = len(text) text_to_be_inserted = "\n" + text_to_be_inserted else: raise new_text = text[:change_index] + text_to_be_inserted + text[change_index:] new_l, new_c = TextUtils._get_updated_position_from_line_and_column_and_edit(line, col, text_to_be_inserted) return new_text, new_l, new_c class PathUtils: """ Utilities for platform-agnostic path operations. """ @staticmethod def uri_to_path(uri: str) -> str: """ Converts a URI to a file path. Works on both Linux and Windows. This method was obtained from https://stackoverflow.com/a/61922504 """ try: from urllib.parse import unquote, urlparse from urllib.request import url2pathname except ImportError: # backwards compatibility from urllib import unquote, url2pathname from urlparse import urlparse parsed = urlparse(uri) host = f"{os.path.sep}{os.path.sep}{parsed.netloc}{os.path.sep}" path = os.path.normpath(os.path.join(host, url2pathname(unquote(parsed.path)))) return path @staticmethod def path_to_uri(path: str) -> str: """ Converts a file path to a file URI (file:///...). """ return str(Path(path).absolute().as_uri()) @staticmethod def is_glob_pattern(pattern: str) -> bool: """Check if a pattern contains glob-specific characters.""" return any(c in pattern for c in "*?[]!") @staticmethod def get_relative_path(path: str, base_path: str) -> str | None: """ Gets relative path if it's possible (paths should be on the same drive), returns `None` otherwise. """ if PurePath(path).drive == PurePath(base_path).drive: rel_path = str(PurePath(os.path.relpath(path, base_path))) return rel_path return None class FileUtils: """ Utility functions for file operations. 
""" @staticmethod def read_file(logger: LanguageServerLogger, file_path: str) -> str: """ Reads the file at the given path and returns the contents as a string. """ if not os.path.exists(file_path): logger.log(f"File read '{file_path}' failed: File does not exist.", logging.ERROR) raise SolidLSPException(f"File read '{file_path}' failed: File does not exist.") try: with open(file_path, encoding="utf-8") as inp_file: return inp_file.read() except Exception as exc: logger.log(f"File read '{file_path}' failed to read with encoding 'utf-8': {exc}", logging.ERROR) raise SolidLSPException("File read failed.") from None @staticmethod def download_file(logger: LanguageServerLogger, url: str, target_path: str) -> None: """ Downloads the file from the given URL to the given {target_path} """ os.makedirs(os.path.dirname(target_path), exist_ok=True) try: response = requests.get(url, stream=True, timeout=60) if response.status_code != 200: logger.log(f"Error downloading file '{url}': {response.status_code} {response.text}", logging.ERROR) raise SolidLSPException("Error downloading file.") with open(target_path, "wb") as f: shutil.copyfileobj(response.raw, f) except Exception as exc: logger.log(f"Error downloading file '{url}': {exc}", logging.ERROR) raise SolidLSPException("Error downloading file.") from None @staticmethod def download_and_extract_archive(logger: LanguageServerLogger, url: str, target_path: str, archive_type: str) -> None: """ Downloads the archive from the given URL having format {archive_type} and extracts it to the given {target_path} """ try: tmp_files = [] tmp_file_name = str(PurePath(os.path.expanduser("~"), "multilspy_tmp", uuid.uuid4().hex)) tmp_files.append(tmp_file_name) os.makedirs(os.path.dirname(tmp_file_name), exist_ok=True) FileUtils.download_file(logger, url, tmp_file_name) if archive_type in ["tar", "gztar", "bztar", "xztar"]: os.makedirs(target_path, exist_ok=True) shutil.unpack_archive(tmp_file_name, target_path, archive_type) elif 
archive_type == "zip": os.makedirs(target_path, exist_ok=True) with zipfile.ZipFile(tmp_file_name, "r") as zip_ref: for zip_info in zip_ref.infolist(): extracted_path = zip_ref.extract(zip_info, target_path) ZIP_SYSTEM_UNIX = 3 # zip file created on Unix system if zip_info.create_system != ZIP_SYSTEM_UNIX: continue # extractall() does not preserve permissions # see. https://github.com/python/cpython/issues/59999 attrs = (zip_info.external_attr >> 16) & 0o777 if attrs: os.chmod(extracted_path, attrs) elif archive_type == "zip.gz": os.makedirs(target_path, exist_ok=True) tmp_file_name_ungzipped = tmp_file_name + ".zip" tmp_files.append(tmp_file_name_ungzipped) with gzip.open(tmp_file_name, "rb") as f_in, open(tmp_file_name_ungzipped, "wb") as f_out: shutil.copyfileobj(f_in, f_out) shutil.unpack_archive(tmp_file_name_ungzipped, target_path, "zip") elif archive_type == "gz": with gzip.open(tmp_file_name, "rb") as f_in, open(target_path, "wb") as f_out: shutil.copyfileobj(f_in, f_out) elif archive_type == "binary": # For single binary files, just move to target without extraction shutil.move(tmp_file_name, target_path) else: logger.log(f"Unknown archive type '{archive_type}' for extraction", logging.ERROR) raise SolidLSPException(f"Unknown archive type '{archive_type}'") except Exception as exc: logger.log(f"Error extracting archive '{tmp_file_name}' obtained from '{url}': {exc}", logging.ERROR) raise SolidLSPException("Error extracting archive.") from exc finally: for tmp_file_name in tmp_files: if os.path.exists(tmp_file_name): Path.unlink(Path(tmp_file_name)) class PlatformId(str, Enum): """ multilspy supported platforms """ WIN_x86 = "win-x86" WIN_x64 = "win-x64" WIN_arm64 = "win-arm64" OSX = "osx" OSX_x64 = "osx-x64" OSX_arm64 = "osx-arm64" LINUX_x86 = "linux-x86" LINUX_x64 = "linux-x64" LINUX_arm64 = "linux-arm64" LINUX_MUSL_x64 = "linux-musl-x64" LINUX_MUSL_arm64 = "linux-musl-arm64" def is_windows(self): return self.value.startswith("win") class 
DotnetVersion(str, Enum): """ multilspy supported dotnet versions """ V4 = "4" V6 = "6" V7 = "7" V8 = "8" V9 = "9" VMONO = "mono" class PlatformUtils: """ This class provides utilities for platform detection and identification. """ @classmethod def get_platform_id(cls) -> PlatformId: """ Returns the platform id for the current system """ system = platform.system() machine = platform.machine() bitness = platform.architecture()[0] if system == "Windows" and machine == "": machine = cls._determine_windows_machine_type() system_map = {"Windows": "win", "Darwin": "osx", "Linux": "linux"} machine_map = { "AMD64": "x64", "x86_64": "x64", "i386": "x86", "i686": "x86", "aarch64": "arm64", "arm64": "arm64", "ARM64": "arm64", } if system in system_map and machine in machine_map: platform_id = system_map[system] + "-" + machine_map[machine] if system == "Linux" and bitness == "64bit": libc = platform.libc_ver()[0] if libc != "glibc": platform_id += "-" + libc return PlatformId(platform_id) else: raise SolidLSPException(f"Unknown platform: {system=}, {machine=}, {bitness=}") @staticmethod def _determine_windows_machine_type(): import ctypes from ctypes import wintypes class SYSTEM_INFO(ctypes.Structure): class _U(ctypes.Union): class _S(ctypes.Structure): _fields_ = [("wProcessorArchitecture", wintypes.WORD), ("wReserved", wintypes.WORD)] _fields_ = [("dwOemId", wintypes.DWORD), ("s", _S)] _anonymous_ = ("s",) _fields_ = [ ("u", _U), ("dwPageSize", wintypes.DWORD), ("lpMinimumApplicationAddress", wintypes.LPVOID), ("lpMaximumApplicationAddress", wintypes.LPVOID), ("dwActiveProcessorMask", wintypes.LPVOID), ("dwNumberOfProcessors", wintypes.DWORD), ("dwProcessorType", wintypes.DWORD), ("dwAllocationGranularity", wintypes.DWORD), ("wProcessorLevel", wintypes.WORD), ("wProcessorRevision", wintypes.WORD), ] _anonymous_ = ("u",) sys_info = SYSTEM_INFO() ctypes.windll.kernel32.GetNativeSystemInfo(ctypes.byref(sys_info)) arch_map = { 9: "AMD64", 5: "ARM", 12: "arm64", 6: "Intel 
Itanium-based", 0: "i386", } return arch_map.get(sys_info.wProcessorArchitecture, f"Unknown ({sys_info.wProcessorArchitecture})") @staticmethod def get_dotnet_version() -> DotnetVersion: """ Returns the dotnet version for the current system """ try: result = subprocess.run(["dotnet", "--list-runtimes"], capture_output=True, check=True) available_version_cmd_output = [] for line in result.stdout.decode("utf-8").split("\n"): if line.startswith("Microsoft.NETCore.App"): version_cmd_output = line.split(" ")[1] available_version_cmd_output.append(version_cmd_output) if not available_version_cmd_output: raise SolidLSPException("dotnet not found on the system") # Check for supported versions in order of preference (latest first) for version_cmd_output in available_version_cmd_output: if version_cmd_output.startswith("9"): return DotnetVersion.V9 if version_cmd_output.startswith("8"): return DotnetVersion.V8 if version_cmd_output.startswith("7"): return DotnetVersion.V7 if version_cmd_output.startswith("6"): return DotnetVersion.V6 if version_cmd_output.startswith("4"): return DotnetVersion.V4 # If no supported version found, raise exception with all available versions raise SolidLSPException( f"No supported dotnet version found. Available versions: {', '.join(available_version_cmd_output)}. 
Supported versions: 4, 6, 7, 8" ) except (FileNotFoundError, subprocess.CalledProcessError): try: result = subprocess.run(["mono", "--version"], capture_output=True, check=True) return DotnetVersion.VMONO except (FileNotFoundError, subprocess.CalledProcessError): raise SolidLSPException("dotnet or mono not found on the system") class SymbolUtils: @staticmethod def symbol_tree_contains_name(roots: list[UnifiedSymbolInformation], name: str) -> bool: for symbol in roots: if symbol["name"] == name: return True if SymbolUtils.symbol_tree_contains_name(symbol["children"], name): return True return False ``` -------------------------------------------------------------------------------- /src/solidlsp/language_servers/nixd_ls.py: -------------------------------------------------------------------------------- ```python """ Provides Nix specific instantiation of the LanguageServer class using nixd (Nix Language Server). Note: Windows is not supported as Nix itself doesn't support Windows natively. """ import logging import os import pathlib import platform import shutil import subprocess import threading from pathlib import Path from overrides import override from solidlsp import ls_types from solidlsp.ls import SolidLanguageServer from solidlsp.ls_config import LanguageServerConfig from solidlsp.ls_logger import LanguageServerLogger from solidlsp.lsp_protocol_handler.lsp_types import InitializeParams from solidlsp.lsp_protocol_handler.server import ProcessLaunchInfo from solidlsp.settings import SolidLSPSettings class NixLanguageServer(SolidLanguageServer): """ Provides Nix specific instantiation of the LanguageServer class using nixd. """ def _extend_nix_symbol_range_to_include_semicolon( self, symbol: ls_types.UnifiedSymbolInformation, file_content: str ) -> ls_types.UnifiedSymbolInformation: """ Extend symbol range to include trailing semicolon for Nix attribute symbols. 
nixd provides ranges that exclude semicolons (expression-level), but serena needs statement-level ranges that include semicolons for proper replacement. """ range_info = symbol["range"] end_line = range_info["end"]["line"] end_char = range_info["end"]["character"] # Split file content into lines lines = file_content.split("\n") if end_line >= len(lines): return symbol line = lines[end_line] # Check if there's a semicolon immediately after the current range end if end_char < len(line) and line[end_char] == ";": # Extend range to include the semicolon new_range = {"start": range_info["start"], "end": {"line": end_line, "character": end_char + 1}} # Create modified symbol with extended range extended_symbol = symbol.copy() extended_symbol["range"] = new_range # CRITICAL: Also update the location.range if it exists if extended_symbol.get("location"): location = extended_symbol["location"].copy() if "range" in location: location["range"] = new_range.copy() extended_symbol["location"] = location return extended_symbol return symbol @override def request_document_symbols( self, relative_file_path: str, include_body: bool = False ) -> tuple[list[ls_types.UnifiedSymbolInformation], list[ls_types.UnifiedSymbolInformation]]: """ Override to extend Nix symbol ranges to include trailing semicolons. nixd provides expression-level ranges (excluding semicolons) but serena needs statement-level ranges (including semicolons) for proper symbol replacement. 
""" # Get symbols from parent implementation all_symbols, root_symbols = super().request_document_symbols(relative_file_path, include_body) # Get file content for range extension file_content = self.language_server.retrieve_full_file_content(relative_file_path) # Extend ranges for all symbols recursively def extend_symbol_and_children(symbol: ls_types.UnifiedSymbolInformation) -> ls_types.UnifiedSymbolInformation: # Extend this symbol's range extended = self._extend_nix_symbol_range_to_include_semicolon(symbol, file_content) # Extend children recursively if extended.get("children"): extended["children"] = [extend_symbol_and_children(child) for child in extended["children"]] return extended # Apply range extension to all symbols extended_all_symbols = [extend_symbol_and_children(sym) for sym in all_symbols] extended_root_symbols = [extend_symbol_and_children(sym) for sym in root_symbols] return extended_all_symbols, extended_root_symbols @override def is_ignored_dirname(self, dirname: str) -> bool: # For Nix projects, we should ignore: # - result: nix build output symlinks # - result-*: multiple build outputs # - .direnv: direnv cache return super().is_ignored_dirname(dirname) or dirname in ["result", ".direnv"] or dirname.startswith("result-") @staticmethod def _get_nixd_version(): """Get the installed nixd version or None if not found.""" try: result = subprocess.run(["nixd", "--version"], capture_output=True, text=True, check=False) if result.returncode == 0: # nixd outputs version like: nixd 2.0.0 return result.stdout.strip() except FileNotFoundError: return None return None @staticmethod def _check_nixd_installed(): """Check if nixd is installed in the system.""" return shutil.which("nixd") is not None @staticmethod def _get_nixd_path(): """Get the path to nixd executable.""" # First check if it's in PATH nixd_path = shutil.which("nixd") if nixd_path: return nixd_path # Check common installation locations home = Path.home() possible_paths = [ home / ".local" / 
"bin" / "nixd", home / ".serena" / "language_servers" / "nixd" / "nixd", home / ".nix-profile" / "bin" / "nixd", Path("/usr/local/bin/nixd"), Path("/run/current-system/sw/bin/nixd"), # NixOS system profile Path("/opt/homebrew/bin/nixd"), # Homebrew on Apple Silicon Path("/usr/local/opt/nixd/bin/nixd"), # Homebrew on Intel Mac ] # Add Windows-specific paths if platform.system() == "Windows": possible_paths.extend( [ home / "AppData" / "Local" / "nixd" / "nixd.exe", home / ".serena" / "language_servers" / "nixd" / "nixd.exe", ] ) for path in possible_paths: if path.exists(): return str(path) return None @staticmethod def _install_nixd_with_nix(): """Install nixd using nix if available.""" # Check if nix is available if not shutil.which("nix"): return None print("Installing nixd using nix... This may take a few minutes.") try: # Try to install nixd using nix profile result = subprocess.run( ["nix", "profile", "install", "github:nix-community/nixd"], capture_output=True, text=True, check=False, timeout=600, # 10 minute timeout for building ) if result.returncode == 0: # Check if nixd is now in PATH nixd_path = shutil.which("nixd") if nixd_path: print(f"Successfully installed nixd at: {nixd_path}") return nixd_path else: # Try nix-env as fallback result = subprocess.run( ["nix-env", "-iA", "nixpkgs.nixd"], capture_output=True, text=True, check=False, timeout=600, ) if result.returncode == 0: nixd_path = shutil.which("nixd") if nixd_path: print(f"Successfully installed nixd at: {nixd_path}") return nixd_path print(f"Failed to install nixd: {result.stderr}") except subprocess.TimeoutExpired: print("Nix install timed out after 10 minutes") except Exception as e: print(f"Error installing nixd with nix: {e}") return None @staticmethod def _setup_runtime_dependency(): """ Check if required Nix runtime dependencies are available. Attempts to install nixd if not present. 
""" # First check if Nix is available (nixd needs it at runtime) if not shutil.which("nix"): print("WARNING: Nix is not installed. nixd requires Nix to function properly.") raise RuntimeError("Nix is required for nixd. Please install Nix from https://nixos.org/download.html") nixd_path = NixLanguageServer._get_nixd_path() if not nixd_path: print("nixd not found. Attempting to install...") # Try to install with nix if available nixd_path = NixLanguageServer._install_nixd_with_nix() if not nixd_path: raise RuntimeError( "nixd (Nix Language Server) is not installed.\n" "Please install nixd using one of the following methods:\n" " - Using Nix flakes: nix profile install github:nix-community/nixd\n" " - From nixpkgs: nix-env -iA nixpkgs.nixd\n" " - On macOS with Homebrew: brew install nixd\n\n" "After installation, make sure 'nixd' is in your PATH." ) # Verify nixd works try: result = subprocess.run([nixd_path, "--version"], capture_output=True, text=True, check=False, timeout=5) if result.returncode != 0: raise RuntimeError(f"nixd failed to run: {result.stderr}") except Exception as e: raise RuntimeError(f"Failed to verify nixd installation: {e}") return nixd_path def __init__( self, config: LanguageServerConfig, logger: LanguageServerLogger, repository_root_path: str, solidlsp_settings: SolidLSPSettings ): nixd_path = self._setup_runtime_dependency() super().__init__( config, logger, repository_root_path, ProcessLaunchInfo(cmd=nixd_path, cwd=repository_root_path), "nix", solidlsp_settings, ) self.server_ready = threading.Event() self.request_id = 0 @staticmethod def _get_initialize_params(repository_absolute_path: str) -> InitializeParams: """ Returns the initialize params for nixd. 
""" root_uri = pathlib.Path(repository_absolute_path).as_uri() initialize_params = { "locale": "en", "capabilities": { "textDocument": { "synchronization": {"didSave": True, "dynamicRegistration": True}, "definition": {"dynamicRegistration": True}, "references": {"dynamicRegistration": True}, "documentSymbol": { "dynamicRegistration": True, "hierarchicalDocumentSymbolSupport": True, "symbolKind": {"valueSet": list(range(1, 27))}, }, "completion": { "dynamicRegistration": True, "completionItem": { "snippetSupport": True, "commitCharactersSupport": True, "documentationFormat": ["markdown", "plaintext"], "deprecatedSupport": True, "preselectSupport": True, }, }, "hover": { "dynamicRegistration": True, "contentFormat": ["markdown", "plaintext"], }, "signatureHelp": { "dynamicRegistration": True, "signatureInformation": { "documentationFormat": ["markdown", "plaintext"], "parameterInformation": {"labelOffsetSupport": True}, }, }, "codeAction": { "dynamicRegistration": True, "codeActionLiteralSupport": { "codeActionKind": { "valueSet": [ "", "quickfix", "refactor", "refactor.extract", "refactor.inline", "refactor.rewrite", "source", "source.organizeImports", ] } }, }, "rename": {"dynamicRegistration": True, "prepareSupport": True}, }, "workspace": { "workspaceFolders": True, "didChangeConfiguration": {"dynamicRegistration": True}, "configuration": True, "symbol": { "dynamicRegistration": True, "symbolKind": {"valueSet": list(range(1, 27))}, }, }, }, "processId": os.getpid(), "rootPath": repository_absolute_path, "rootUri": root_uri, "workspaceFolders": [ { "uri": root_uri, "name": os.path.basename(repository_absolute_path), } ], "initializationOptions": { # nixd specific options "nixpkgs": {"expr": "import <nixpkgs> { }"}, "formatting": {"command": ["nixpkgs-fmt"]}, # or ["alejandra"] or ["nixfmt"] "options": { "enable": True, "target": { "installable": "", # Will be auto-detected from flake.nix if present }, }, }, } return initialize_params def _start_server(self): 
"""Start nixd server process""" def register_capability_handler(params): return def window_log_message(msg): self.logger.log(f"LSP: window/logMessage: {msg}", logging.INFO) def do_nothing(params): return self.server.on_request("client/registerCapability", register_capability_handler) self.server.on_notification("window/logMessage", window_log_message) self.server.on_notification("$/progress", do_nothing) self.server.on_notification("textDocument/publishDiagnostics", do_nothing) self.logger.log("Starting nixd server process", logging.INFO) self.server.start() initialize_params = self._get_initialize_params(self.repository_root_path) self.logger.log( "Sending initialize request from LSP client to LSP server and awaiting response", logging.INFO, ) init_response = self.server.send.initialize(initialize_params) # Verify server capabilities assert "textDocumentSync" in init_response["capabilities"] assert "definitionProvider" in init_response["capabilities"] assert "documentSymbolProvider" in init_response["capabilities"] assert "referencesProvider" in init_response["capabilities"] self.server.notify.initialized({}) self.completions_available.set() # nixd server is typically ready immediately after initialization self.server_ready.set() self.server_ready.wait() ``` -------------------------------------------------------------------------------- /src/serena/mcp.py: -------------------------------------------------------------------------------- ```python """ The Serena Model Context Protocol (MCP) Server """ import sys from abc import abstractmethod from collections.abc import AsyncIterator, Iterator, Sequence from contextlib import asynccontextmanager from copy import deepcopy from dataclasses import dataclass from typing import Any, Literal, cast import docstring_parser from mcp.server.fastmcp import server from mcp.server.fastmcp.server import FastMCP, Settings from mcp.server.fastmcp.tools.base import Tool as MCPTool from pydantic_settings import SettingsConfigDict 
from sensai.util import logging

from serena.agent import (
    SerenaAgent,
    SerenaConfig,
)
from serena.config.context_mode import SerenaAgentContext, SerenaAgentMode
from serena.constants import DEFAULT_CONTEXT, DEFAULT_MODES, SERENA_LOG_FORMAT
from serena.tools import Tool
from serena.util.exception import show_fatal_exception_safe
from serena.util.logging import MemoryLogHandler

log = logging.getLogger(__name__)


def configure_logging(*args, **kwargs) -> None:  # type: ignore
    """Replacement for fastmcp's logging configuration; only acts if logging is not yet set up."""
    # We only do something here if logging has not yet been configured.
    # Normally, logging is configured in the MCP server startup script.
    if not logging.is_enabled():
        logging.basicConfig(level=logging.INFO, stream=sys.stderr, format=SERENA_LOG_FORMAT)


# patch the logging configuration function in fastmcp, because it's hard-coded and broken
server.configure_logging = configure_logging  # type: ignore


@dataclass
class SerenaMCPRequestContext:
    """Per-request context carrying the agent that serves MCP calls."""

    agent: SerenaAgent


class SerenaMCPFactory:
    """Base factory for building an MCP server backed by a SerenaAgent."""

    def __init__(self, context: str = DEFAULT_CONTEXT, project: str | None = None):
        """
        :param context: The context name or path to context file
        :param project: Either an absolute path to the project directory or a name of an already registered project.
            If the project passed here hasn't been registered yet, it will be registered automatically
            and can be activated by its name afterward.
        """
        self.context = SerenaAgentContext.load(context)
        self.project = project

    @staticmethod
    def _sanitize_for_openai_tools(schema: dict) -> dict:
        """
        This method was written by GPT-5, I have not reviewed it in detail.
        Only called when `openai_tool_compatible` is True.

        Make a Pydantic/JSON Schema object compatible with OpenAI tool schema.
        - 'integer' -> 'number' (+ multipleOf: 1)
        - remove 'null' from union type arrays
        - coerce integer-only enums to number
        - best-effort simplify oneOf/anyOf when they only differ by integer/number
        """
        s = deepcopy(schema)

        def walk(node):  # type: ignore
            # Mutates and returns `node`; non-dict values pass through unchanged.
            if not isinstance(node, dict):
                # lists get handled by parent calls
                return node

            # ---- handle type ----
            t = node.get("type")
            if isinstance(t, str):
                if t == "integer":
                    node["type"] = "number"
                    # preserve existing multipleOf but ensure it's integer-like
                    if "multipleOf" not in node:
                        node["multipleOf"] = 1
            elif isinstance(t, list):
                # remove 'null' (OpenAI tools don't support nullables)
                t2 = [x if x != "integer" else "number" for x in t if x != "null"]
                if not t2:
                    # fall back to object if it somehow becomes empty
                    t2 = ["object"]
                node["type"] = t2[0] if len(t2) == 1 else t2
                if "integer" in t or "number" in t2:
                    # if integers were present, keep integer-like restriction
                    node.setdefault("multipleOf", 1)

            # ---- enums of integers -> number ----
            if "enum" in node and isinstance(node["enum"], list):
                vals = node["enum"]
                if vals and all(isinstance(v, int) for v in vals):
                    node.setdefault("type", "number")
                    # keep them as ints; JSON 'number' covers ints
                    node.setdefault("multipleOf", 1)

            # ---- simplify anyOf/oneOf if they only differ by integer/number ----
            for key in ("oneOf", "anyOf"):
                if key in node and isinstance(node[key], list):
                    # Special case: anyOf or oneOf with "type X" and "null"
                    if len(node[key]) == 2:
                        types = [sub.get("type") for sub in node[key]]
                        if "null" in types:
                            non_null_type = next(t for t in types if t != "null")
                            if isinstance(non_null_type, str):
                                node["type"] = non_null_type
                                node.pop(key, None)
                                continue
                    simplified = []
                    changed = False
                    for sub in node[key]:
                        sub = walk(sub)  # recurse
                        simplified.append(sub)
                    # If all subs are the same after integer→number, collapse
                    try:
                        import json

                        canon = [json.dumps(x, sort_keys=True) for x in simplified]
                        if len(set(canon)) == 1:
                            # copy the single schema up
                            only = simplified[0]
                            node.pop(key, None)
                            for k, v in only.items():
                                if k not in node:
                                    node[k] = v
                            changed = True
                    except Exception:
                        pass
                    if not changed:
                        node[key] = simplified

            # ---- recurse into known schema containers ----
            for child_key in ("properties", "patternProperties", "definitions", "$defs"):
                if child_key in node and isinstance(node[child_key], dict):
                    for k, v in list(node[child_key].items()):
                        node[child_key][k] = walk(v)

            # arrays/items
            if "items" in node:
                node["items"] = walk(node["items"])

            # allOf/if/then/else - pass through with integer→number conversions applied inside
            for key in ("allOf",):
                if key in node and isinstance(node[key], list):
                    node[key] = [walk(x) for x in node[key]]
            if "if" in node:
                node["if"] = walk(node["if"])
            if "then" in node:
                node["then"] = walk(node["then"])
            if "else" in node:
                node["else"] = walk(node["else"])

            return node

        return walk(s)

    @staticmethod
    def make_mcp_tool(tool: Tool, openai_tool_compatible: bool = True) -> MCPTool:
        """
        Create an MCP tool from a Serena Tool instance.

        :param tool: The Serena Tool instance to convert.
        :param openai_tool_compatible: whether to process the tool schema to be compatible with OpenAI tools
            (doesn't accept integer, needs number instead, etc.). This allows using Serena MCP within codex.
        """
        func_name = tool.get_name()
        func_doc = tool.get_apply_docstring() or ""
        func_arg_metadata = tool.get_apply_fn_metadata()
        is_async = False

        parameters = func_arg_metadata.arg_model.model_json_schema()
        if openai_tool_compatible:
            parameters = SerenaMCPFactory._sanitize_for_openai_tools(parameters)

        docstring = docstring_parser.parse(func_doc)

        # Mount the tool description as a combination of the docstring description and
        # the return value description, if it exists.
        overridden_description = tool.agent.get_context().tool_description_overrides.get(func_name, None)
        if overridden_description is not None:
            func_doc = overridden_description
        elif docstring.description:
            func_doc = docstring.description
        else:
            func_doc = ""
        func_doc = func_doc.strip().strip(".")
        if func_doc:
            func_doc += "."
        if docstring.returns and (docstring_returns_descr := docstring.returns.description):
            # Only add a space before "Returns" if func_doc is not empty
            prefix = " " if func_doc else ""
            func_doc = f"{func_doc}{prefix}Returns {docstring_returns_descr.strip().strip('.')}."

        # Parse the parameter descriptions from the docstring and pass each description
        # to the parameter schema.
        docstring_params = {param.arg_name: param for param in docstring.params}
        parameters_properties: dict[str, dict[str, Any]] = parameters["properties"]
        for parameter, properties in parameters_properties.items():
            if (param_doc := docstring_params.get(parameter)) and param_doc.description:
                param_desc = f"{param_doc.description.strip().strip('.') + '.'}"
                properties["description"] = param_desc[0].upper() + param_desc[1:]

        def execute_fn(**kwargs) -> str:  # type: ignore
            return tool.apply_ex(log_call=True, catch_exceptions=True, **kwargs)

        return MCPTool(
            fn=execute_fn,
            name=func_name,
            description=func_doc,
            parameters=parameters,
            fn_metadata=func_arg_metadata,
            is_async=is_async,
            context_kwarg=None,
            annotations=None,
            title=None,
        )

    @abstractmethod
    def _iter_tools(self) -> Iterator[Tool]:
        pass

    # noinspection PyProtectedMember
    def _set_mcp_tools(self, mcp: FastMCP, openai_tool_compatible: bool = False) -> None:
        """Update the tools in the MCP server"""
        if mcp is not None:
            mcp._tool_manager._tools = {}
            for tool in self._iter_tools():
                mcp_tool = self.make_mcp_tool(tool, openai_tool_compatible=openai_tool_compatible)
                mcp._tool_manager._tools[tool.get_name()] = mcp_tool
            log.info(f"Starting MCP server with {len(mcp._tool_manager._tools)} tools: {list(mcp._tool_manager._tools.keys())}")

    @abstractmethod
def _instantiate_agent(self, serena_config: SerenaConfig, modes: list[SerenaAgentMode]) -> None: pass def create_mcp_server( self, host: str = "0.0.0.0", port: int = 8000, modes: Sequence[str] = DEFAULT_MODES, enable_web_dashboard: bool | None = None, enable_gui_log_window: bool | None = None, log_level: Literal["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] | None = None, trace_lsp_communication: bool | None = None, tool_timeout: float | None = None, ) -> FastMCP: """ Create an MCP server with process-isolated SerenaAgent to prevent asyncio contamination. :param host: The host to bind to :param port: The port to bind to :param modes: List of mode names or paths to mode files :param enable_web_dashboard: Whether to enable the web dashboard. If not specified, will take the value from the serena configuration. :param enable_gui_log_window: Whether to enable the GUI log window. It currently does not work on macOS, and setting this to True will be ignored then. If not specified, will take the value from the serena configuration. :param log_level: Log level. If not specified, will take the value from the serena configuration. :param trace_lsp_communication: Whether to trace the communication between Serena and the language servers. This is useful for debugging language server issues. :param tool_timeout: Timeout in seconds for tool execution. If not specified, will take the value from the serena configuration. 
""" try: config = SerenaConfig.from_config_file() # update configuration with the provided parameters if enable_web_dashboard is not None: config.web_dashboard = enable_web_dashboard if enable_gui_log_window is not None: config.gui_log_window_enabled = enable_gui_log_window if log_level is not None: log_level = cast(Literal["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"], log_level.upper()) config.log_level = logging.getLevelNamesMapping()[log_level] if trace_lsp_communication is not None: config.trace_lsp_communication = trace_lsp_communication if tool_timeout is not None: config.tool_timeout = tool_timeout modes_instances = [SerenaAgentMode.load(mode) for mode in modes] self._instantiate_agent(config, modes_instances) except Exception as e: show_fatal_exception_safe(e) raise # Override model_config to disable the use of `.env` files for reading settings, because user projects are likely to contain # `.env` files (e.g. containing LOG_LEVEL) that are not supposed to override the MCP settings; # retain only FASTMCP_ prefix for already set environment variables. 
Settings.model_config = SettingsConfigDict(env_prefix="FASTMCP_") instructions = self._get_initial_instructions() mcp = FastMCP(lifespan=self.server_lifespan, host=host, port=port, instructions=instructions) return mcp @asynccontextmanager @abstractmethod async def server_lifespan(self, mcp_server: FastMCP) -> AsyncIterator[None]: """Manage server startup and shutdown lifecycle.""" yield None # ensures MyPy understands we yield None @abstractmethod def _get_initial_instructions(self) -> str: pass class SerenaMCPFactorySingleProcess(SerenaMCPFactory): """ MCP server factory where the SerenaAgent and its language server run in the same process as the MCP server """ def __init__(self, context: str = DEFAULT_CONTEXT, project: str | None = None, memory_log_handler: MemoryLogHandler | None = None): """ :param context: The context name or path to context file :param project: Either an absolute path to the project directory or a name of an already registered project. If the project passed here hasn't been registered yet, it will be registered automatically and can be activated by its name afterward. 
""" super().__init__(context=context, project=project) self.agent: SerenaAgent | None = None self.memory_log_handler = memory_log_handler def _instantiate_agent(self, serena_config: SerenaConfig, modes: list[SerenaAgentMode]) -> None: self.agent = SerenaAgent( project=self.project, serena_config=serena_config, context=self.context, modes=modes, memory_log_handler=self.memory_log_handler ) def _iter_tools(self) -> Iterator[Tool]: assert self.agent is not None yield from self.agent.get_exposed_tool_instances() def _get_initial_instructions(self) -> str: assert self.agent is not None # we don't use the tool (which at the time of writing calls this method), since the tool may be disabled by the config return self.agent.create_system_prompt() @asynccontextmanager async def server_lifespan(self, mcp_server: FastMCP) -> AsyncIterator[None]: openai_tool_compatible = self.context.name in ["chatgpt", "codex", "oaicompat-agent"] self._set_mcp_tools(mcp_server, openai_tool_compatible=openai_tool_compatible) log.info("MCP server lifetime setup complete") yield ``` -------------------------------------------------------------------------------- /src/serena/text_utils.py: -------------------------------------------------------------------------------- ```python import fnmatch import logging import os import re from collections.abc import Callable from dataclasses import dataclass, field from enum import StrEnum from typing import Any, Self from joblib import Parallel, delayed log = logging.getLogger(__name__) class LineType(StrEnum): """Enum for different types of lines in search results.""" MATCH = "match" """Part of the matched lines""" BEFORE_MATCH = "prefix" """Lines before the match""" AFTER_MATCH = "postfix" """Lines after the match""" @dataclass(kw_only=True) class TextLine: """Represents a line of text with information on how it relates to the match.""" line_number: int line_content: str match_type: LineType """Represents the type of line (match, prefix, postfix)""" def 
get_display_prefix(self) -> str: """Get the display prefix for this line based on the match type.""" if self.match_type == LineType.MATCH: return " >" return "..." def format_line(self, include_line_numbers: bool = True) -> str: """Format the line for display (e.g.,for logging or passing to an LLM). :param include_line_numbers: Whether to include the line number in the result. """ prefix = self.get_display_prefix() if include_line_numbers: line_num = str(self.line_number).rjust(4) prefix = f"{prefix}{line_num}" return f"{prefix}:{self.line_content}" @dataclass(kw_only=True) class MatchedConsecutiveLines: """Represents a collection of consecutive lines found through some criterion in a text file or a string. May include lines before, after, and matched. """ lines: list[TextLine] """All lines in the context of the match. At least one of them is of `match_type` `MATCH`.""" source_file_path: str | None = None """Path to the file where the match was found (Metadata).""" # set in post-init lines_before_matched: list[TextLine] = field(default_factory=list) matched_lines: list[TextLine] = field(default_factory=list) lines_after_matched: list[TextLine] = field(default_factory=list) def __post_init__(self) -> None: for line in self.lines: if line.match_type == LineType.BEFORE_MATCH: self.lines_before_matched.append(line) elif line.match_type == LineType.MATCH: self.matched_lines.append(line) elif line.match_type == LineType.AFTER_MATCH: self.lines_after_matched.append(line) assert len(self.matched_lines) > 0, "At least one matched line is required" @property def start_line(self) -> int: return self.lines[0].line_number @property def end_line(self) -> int: return self.lines[-1].line_number @property def num_matched_lines(self) -> int: return len(self.matched_lines) def to_display_string(self, include_line_numbers: bool = True) -> str: return "\n".join([line.format_line(include_line_numbers) for line in self.lines]) @classmethod def from_file_contents( cls, file_contents: str, 
line: int, context_lines_before: int = 0, context_lines_after: int = 0, source_file_path: str | None = None
    ) -> Self:
        # NOTE: `line` and the computed bounds are 0-based indices into the split content
        # (line_contents is indexed directly with them below).
        line_contents = file_contents.split("\n")
        start_lineno = max(0, line - context_lines_before)
        end_lineno = min(len(line_contents) - 1, line + context_lines_after)
        text_lines: list[TextLine] = []
        # context lines before the matched line
        for lineno in range(start_lineno, line):
            text_lines.append(TextLine(line_number=lineno, line_content=line_contents[lineno], match_type=LineType.BEFORE_MATCH))
        # the matched line itself
        text_lines.append(TextLine(line_number=line, line_content=line_contents[line], match_type=LineType.MATCH))
        # context lines after the matched line
        for lineno in range(line + 1, end_lineno + 1):
            text_lines.append(TextLine(line_number=lineno, line_content=line_contents[lineno], match_type=LineType.AFTER_MATCH))
        return cls(lines=text_lines, source_file_path=source_file_path)


def glob_to_regex(glob_pat: str) -> str:
    """Translate a glob-style pattern into an (unanchored) regex string.

    Supported glob constructs: ``*`` (any run of characters, translated to ``.*``),
    ``?`` (any single character, translated to ``.``), and ``\\`` as an escape for
    the following character. Every other character — including ``[``/``]`` — is
    regex-escaped and matched literally. The result is not anchored with ``^``/``$``.

    :param glob_pat: the glob-style pattern to translate
    :return: an equivalent regular-expression string
    """
    regex_parts: list[str] = []
    i = 0
    while i < len(glob_pat):
        ch = glob_pat[i]
        if ch == "*":
            regex_parts.append(".*")
        elif ch == "?":
            regex_parts.append(".")
        elif ch == "\\":
            # escape: emit the next character literally; a trailing lone backslash
            # is kept as-is
            i += 1
            if i < len(glob_pat):
                regex_parts.append(re.escape(glob_pat[i]))
            else:
                regex_parts.append("\\")
        else:
            regex_parts.append(re.escape(ch))
        i += 1
    return "".join(regex_parts)


def search_text(
    pattern: str,
    content: str | None = None,
    source_file_path: str | None = None,
    allow_multiline_match: bool = False,
    context_lines_before: int = 0,
    context_lines_after: int = 0,
    is_glob: bool = False,
) -> list[MatchedConsecutiveLines]:
    """
    Search for a pattern in text content. Supports both regex and glob-like patterns.

    :param pattern: Pattern to search for (regex or glob-like pattern)
    :param content: The text content to search. May be None if source_file_path is provided.
    :param source_file_path: Optional path to the source file. If content is None, this has to be passed and the file will be read.
    :param allow_multiline_match: Whether to search across multiple lines.
Currently, the default option (False) is very inefficient, so it is recommended to set this to True. :param context_lines_before: Number of context lines to include before matches :param context_lines_after: Number of context lines to include after matches :param is_glob: If True, pattern is treated as a glob-like pattern (e.g., "*.py", "test_??.py") and will be converted to regex internally :return: List of `TextSearchMatch` objects :raises: ValueError if the pattern is not valid """ if source_file_path and content is None: with open(source_file_path) as f: content = f.read() if content is None: raise ValueError("Pass either content or source_file_path") matches = [] lines = content.splitlines() total_lines = len(lines) # Convert pattern to a compiled regex if it's a string if is_glob: pattern = glob_to_regex(pattern) if allow_multiline_match: # For multiline matches, we need to use the DOTALL flag to make '.' match newlines compiled_pattern = re.compile(pattern, re.DOTALL) # Search across the entire content as a single string for match in compiled_pattern.finditer(content): start_pos = match.start() end_pos = match.end() # Find the line numbers for the start and end positions start_line_num = content[:start_pos].count("\n") + 1 end_line_num = content[:end_pos].count("\n") + 1 # Calculate the range of lines to include in the context context_start = max(1, start_line_num - context_lines_before) context_end = min(total_lines, end_line_num + context_lines_after) # Create TextLine objects for the context context_lines = [] for i in range(context_start - 1, context_end): line_num = i + 1 if context_start <= line_num < start_line_num: match_type = LineType.BEFORE_MATCH elif end_line_num < line_num <= context_end: match_type = LineType.AFTER_MATCH else: match_type = LineType.MATCH context_lines.append(TextLine(line_number=line_num, line_content=lines[i], match_type=match_type)) matches.append(MatchedConsecutiveLines(lines=context_lines, 
source_file_path=source_file_path))
    else:
        # TODO: extremely inefficient! Since we currently don't use this option in SerenaAgent or LanguageServer,
        # it is not urgent to fix, but should be either improved or the option should be removed.
        # Search line by line, normal compile without DOTALL
        compiled_pattern = re.compile(pattern)
        for i, line in enumerate(lines):
            line_num = i + 1
            if compiled_pattern.search(line):
                # Calculate the range of lines to include in the context (0-based indices)
                context_start = max(0, i - context_lines_before)
                context_end = min(total_lines - 1, i + context_lines_after)
                # Create TextLine objects for the context; line numbers reported are 1-based
                context_lines = []
                for j in range(context_start, context_end + 1):
                    context_line_num = j + 1
                    if j < i:
                        match_type = LineType.BEFORE_MATCH
                    elif j > i:
                        match_type = LineType.AFTER_MATCH
                    else:
                        match_type = LineType.MATCH
                    context_lines.append(TextLine(line_number=context_line_num, line_content=lines[j], match_type=match_type))
                matches.append(MatchedConsecutiveLines(lines=context_lines, source_file_path=source_file_path))
    return matches


def default_file_reader(file_path: str) -> str:
    """Read the file at the given path as text using utf-8 encoding and return its contents."""
    with open(file_path, encoding="utf-8") as f:
        return f.read()


def expand_braces(pattern: str) -> list[str]:
    """
    Expands brace patterns in a glob string.

    For example, "**/*.{js,jsx,ts,tsx}" becomes ["**/*.js", "**/*.jsx", "**/*.ts", "**/*.tsx"].
    Handles multiple brace sets as well: each iteration expands the innermost
    brace group found, and the loop repeats until no braces remain.

    :param pattern: the glob pattern, possibly containing ``{a,b,...}`` groups
    :return: the list of patterns with all brace groups expanded
    """
    patterns = [pattern]
    while any("{" in p for p in patterns):
        new_patterns = []
        for p in patterns:
            # match an innermost brace group (no nested braces inside it)
            match = re.search(r"\{([^{}]+)\}", p)
            if match:
                prefix = p[: match.start()]
                suffix = p[match.end() :]
                options = match.group(1).split(",")
                for option in options:
                    new_patterns.append(f"{prefix}{option}{suffix}")
            else:
                new_patterns.append(p)
        patterns = new_patterns
    return patterns


def glob_match(pattern: str, path: str) -> bool:
    """
    Match a file path against a glob pattern.
    Supports standard glob patterns:
    - * matches any number of characters except /
    - ** matches any number of directories (zero or more)
    - ? matches a single character except /
    - [seq] matches any character in seq

    Supports brace expansion:
    - {a,b,c} expands to multiple patterns (including nesting)

    Unsupported patterns:
    - Bash extended glob features are unavailable in Python's fnmatch
    - Extended globs like !(), ?(), +(), *(), @() are not supported

    :param pattern: Glob pattern (e.g., 'src/**/*.py', '**agent.py')
    :param path: File path to match against
    :return: True if path matches pattern
    """
    pattern = pattern.replace("\\", "/")  # Normalize backslashes to forward slashes
    path = path.replace("\\", "/")  # Normalize path backslashes to forward slashes

    # Handle ** patterns that should match zero or more directories.
    # fnmatch.translate treats ** like *, so the zero-directory cases are
    # covered by additionally matching variants of the pattern with the
    # ** component removed.
    if "**" in pattern:
        # Method 1: Standard fnmatch (matches one or more directories)
        regex1 = fnmatch.translate(pattern)
        if re.match(regex1, path):
            return True
        # Method 2: Handle zero-directory case by removing /** entirely
        # Convert "src/**/test.py" to "src/test.py"
        if "/**/" in pattern:
            zero_dir_pattern = pattern.replace("/**/", "/")
            regex2 = fnmatch.translate(zero_dir_pattern)
            if re.match(regex2, path):
                return True
        # Method 3: Handle leading ** case by removing **/
        # Convert "**/test.py" to "test.py"
        if pattern.startswith("**/"):
            zero_dir_pattern = pattern[3:]  # Remove "**/"
            regex3 = fnmatch.translate(zero_dir_pattern)
            if re.match(regex3, path):
                return True
        return False
    else:
        # Simple pattern without **, use fnmatch directly
        return fnmatch.fnmatch(path, pattern)


def search_files(
    relative_file_paths: list[str],
    pattern: str,
    root_path: str = "",
    file_reader: Callable[[str], str] = default_file_reader,
    context_lines_before: int = 0,
    context_lines_after: int = 0,
    paths_include_glob: str | None = None,
    paths_exclude_glob: str | None = None,
) -> list[MatchedConsecutiveLines]:
    """
    Search for a pattern in a list of files.

    :param relative_file_paths: List of relative file paths in which to search
    :param pattern: Pattern to search for
    :param root_path: Root path to resolve relative paths against (by default, current working directory).
    :param file_reader: Function to read a file; by default reads the file as utf-8 text
        (see `default_file_reader`). All files that can't be read by it will be skipped.
    :param context_lines_before: Number of context lines to include before matches
    :param context_lines_after: Number of context lines to include after matches
    :param paths_include_glob: Optional glob pattern to include files from the list
    :param paths_exclude_glob: Optional glob pattern to exclude files from the list
    :return: List of MatchedConsecutiveLines objects
    """
    # Pre-filter paths (done sequentially to avoid overhead)
    # Use proper glob matching instead of gitignore patterns
    include_patterns = expand_braces(paths_include_glob) if paths_include_glob else None
    exclude_patterns = expand_braces(paths_exclude_glob) if paths_exclude_glob else None
    filtered_paths = []
    for path in relative_file_paths:
        if include_patterns:
            # keep only paths matching at least one expanded include pattern
            if not any(glob_match(p, path) for p in include_patterns):
                log.debug(f"Skipping {path}: does not match include pattern {paths_include_glob}")
                continue
        if exclude_patterns:
            # drop paths matching any expanded exclude pattern
            if any(glob_match(p, path) for p in exclude_patterns):
                log.debug(f"Skipping {path}: matches exclude pattern {paths_exclude_glob}")
                continue
        filtered_paths.append(path)
    log.info(f"Processing {len(filtered_paths)} files.")

    def process_single_file(path: str) -> dict[str, Any]:
        """Process a single file - this function will be parallelized."""
        try:
            abs_path = os.path.join(root_path, path)
            file_content = file_reader(abs_path)
            search_results = search_text(
                pattern,
                content=file_content,
                source_file_path=path,
                allow_multiline_match=True,
                context_lines_before=context_lines_before,
                context_lines_after=context_lines_after,
            )
            if len(search_results) > 0:
                log.debug(f"Found {len(search_results)} matches in {path}")
            return {"path": path, "results":
search_results, "error": None} except Exception as e: log.debug(f"Error processing {path}: {e}") return {"path": path, "results": [], "error": str(e)} # Execute in parallel using joblib results = Parallel( n_jobs=-1, backend="threading", )(delayed(process_single_file)(path) for path in filtered_paths) # Collect results and errors matches = [] skipped_file_error_tuples = [] for result in results: if result["error"]: skipped_file_error_tuples.append((result["path"], result["error"])) else: matches.extend(result["results"]) if skipped_file_error_tuples: log.debug(f"Failed to read {len(skipped_file_error_tuples)} files: {skipped_file_error_tuples}") log.info(f"Found {len(matches)} total matches across {len(filtered_paths)} files") return matches ``` -------------------------------------------------------------------------------- /src/solidlsp/language_servers/elixir_tools/elixir_tools.py: -------------------------------------------------------------------------------- ```python import logging import os import pathlib import stat import subprocess import threading import time from overrides import override from solidlsp.ls import SolidLanguageServer from solidlsp.ls_config import LanguageServerConfig from solidlsp.ls_logger import LanguageServerLogger from solidlsp.ls_utils import FileUtils, PlatformId, PlatformUtils from solidlsp.lsp_protocol_handler.lsp_types import InitializeParams from solidlsp.lsp_protocol_handler.server import ProcessLaunchInfo from solidlsp.settings import SolidLSPSettings from ..common import RuntimeDependency class ElixirTools(SolidLanguageServer): """ Provides Elixir specific instantiation of the LanguageServer class using Next LS from elixir-tools. 
""" @override def is_ignored_dirname(self, dirname: str) -> bool: # For Elixir projects, we should ignore: # - _build: compiled artifacts # - deps: dependencies # - node_modules: if the project has JavaScript components # - .elixir_ls: ElixirLS artifacts (in case both are present) # - cover: coverage reports return super().is_ignored_dirname(dirname) or dirname in ["_build", "deps", "node_modules", ".elixir_ls", "cover"] def _is_next_ls_internal_file(self, abs_path: str) -> bool: """Check if an absolute path is a Next LS internal file that should be ignored.""" return any( pattern in abs_path for pattern in [ ".burrito", # Next LS runtime directory "next_ls_erts-", # Next LS Erlang runtime "_next_ls_private_", # Next LS private files "/priv/monkey/", # Next LS monkey patching directory ] ) @override def _send_references_request(self, relative_file_path: str, line: int, column: int): """Override to filter out Next LS internal files from references.""" from solidlsp.ls_utils import PathUtils # Get the raw response from the parent implementation raw_response = super()._send_references_request(relative_file_path, line, column) if raw_response is None: return None # Filter out Next LS internal files filtered_response = [] for item in raw_response: if isinstance(item, dict) and "uri" in item: abs_path = PathUtils.uri_to_path(item["uri"]) if self._is_next_ls_internal_file(abs_path): self.logger.log(f"Filtering out Next LS internal file: {abs_path}", logging.DEBUG) continue filtered_response.append(item) return filtered_response @classmethod def _get_elixir_version(cls): """Get the installed Elixir version or None if not found.""" try: result = subprocess.run(["elixir", "--version"], capture_output=True, text=True, check=False) if result.returncode == 0: return result.stdout.strip() except FileNotFoundError: return None return None @classmethod def _setup_runtime_dependencies( cls, logger: LanguageServerLogger, config: LanguageServerConfig, solidlsp_settings: 
SolidLSPSettings ) -> str: """ Setup runtime dependencies for Next LS. Downloads the Next LS binary for the current platform and returns the path to the executable. """ # Check if Elixir is available first elixir_version = cls._get_elixir_version() if not elixir_version: raise RuntimeError( "Elixir is not installed. Please install Elixir from https://elixir-lang.org/install.html and make sure it is added to your PATH." ) logger.log(f"Found Elixir: {elixir_version}", logging.INFO) platform_id = PlatformUtils.get_platform_id() # Check for Windows and provide a helpful error message if platform_id.value.startswith("win"): raise RuntimeError( "Windows is not supported by Next LS. The Next LS project does not provide Windows binaries. " "Consider using Windows Subsystem for Linux (WSL) or a virtual machine with Linux/macOS." ) valid_platforms = [ PlatformId.LINUX_x64, PlatformId.LINUX_arm64, PlatformId.OSX_x64, PlatformId.OSX_arm64, ] assert platform_id in valid_platforms, f"Platform {platform_id} is not supported for Next LS at the moment" next_ls_dir = os.path.join(cls.ls_resources_dir(solidlsp_settings), "next-ls") NEXTLS_VERSION = "v0.23.4" # Define runtime dependencies inline runtime_deps = { PlatformId.LINUX_x64: RuntimeDependency( id="next_ls_linux_amd64", platform_id="linux-x64", url=f"https://github.com/elixir-tools/next-ls/releases/download/{NEXTLS_VERSION}/next_ls_linux_amd64", archive_type="binary", binary_name="next_ls_linux_amd64", extract_path="next_ls", ), PlatformId.LINUX_arm64: RuntimeDependency( id="next_ls_linux_arm64", platform_id="linux-arm64", url=f"https://github.com/elixir-tools/next-ls/releases/download/{NEXTLS_VERSION}/next_ls_linux_arm64", archive_type="binary", binary_name="next_ls_linux_arm64", extract_path="next_ls", ), PlatformId.OSX_x64: RuntimeDependency( id="next_ls_darwin_amd64", platform_id="osx-x64", url=f"https://github.com/elixir-tools/next-ls/releases/download/{NEXTLS_VERSION}/next_ls_darwin_amd64", archive_type="binary", 
binary_name="next_ls_darwin_amd64", extract_path="next_ls", ), PlatformId.OSX_arm64: RuntimeDependency( id="next_ls_darwin_arm64", platform_id="osx-arm64", url=f"https://github.com/elixir-tools/next-ls/releases/download/{NEXTLS_VERSION}/next_ls_darwin_arm64", archive_type="binary", binary_name="next_ls_darwin_arm64", extract_path="next_ls", ), } dependency = runtime_deps[platform_id] executable_path = os.path.join(next_ls_dir, "nextls") binary_path = os.path.join(next_ls_dir, dependency.binary_name) if not os.path.exists(executable_path): logger.log(f"Downloading Next LS binary from {dependency.url}", logging.INFO) FileUtils.download_file(logger, dependency.url, binary_path) # Make the binary executable on Unix-like systems os.chmod(binary_path, stat.S_IRWXU | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH) # Create a symlink with the expected name if binary_path != executable_path: if os.path.exists(executable_path): os.remove(executable_path) os.symlink(os.path.basename(binary_path), executable_path) assert os.path.exists(executable_path), f"Next LS executable not found at {executable_path}" logger.log(f"Next LS binary ready at: {executable_path}", logging.INFO) return executable_path def __init__( self, config: LanguageServerConfig, logger: LanguageServerLogger, repository_root_path: str, solidlsp_settings: SolidLSPSettings ): nextls_executable_path = self._setup_runtime_dependencies(logger, config, solidlsp_settings) super().__init__( config, logger, repository_root_path, ProcessLaunchInfo(cmd=f'"{nextls_executable_path}" --stdio', cwd=repository_root_path), "elixir", solidlsp_settings, ) self.server_ready = threading.Event() self.request_id = 0 # Set generous timeout for Next LS which can be slow to initialize and respond self.set_request_timeout(180.0) # 60 seconds for all environments @staticmethod def _get_initialize_params(repository_absolute_path: str) -> InitializeParams: """ Returns the initialize params for the Next LS Language Server. 
""" root_uri = pathlib.Path(repository_absolute_path).as_uri() initialize_params = { "processId": os.getpid(), "locale": "en", "rootPath": repository_absolute_path, "rootUri": root_uri, "initializationOptions": { "mix_env": "dev", "mix_target": "host", "experimental": {"completions": {"enable": False}}, "extensions": {"credo": {"enable": True, "cli_options": []}}, }, "capabilities": { "textDocument": { "synchronization": {"didSave": True, "dynamicRegistration": True}, "completion": { "dynamicRegistration": True, "completionItem": {"snippetSupport": True, "documentationFormat": ["markdown", "plaintext"]}, }, "definition": {"dynamicRegistration": True}, "references": {"dynamicRegistration": True}, "documentSymbol": { "dynamicRegistration": True, "hierarchicalDocumentSymbolSupport": True, "symbolKind": {"valueSet": list(range(1, 27))}, }, "hover": {"dynamicRegistration": True, "contentFormat": ["markdown", "plaintext"]}, "formatting": {"dynamicRegistration": True}, "codeAction": { "dynamicRegistration": True, "codeActionLiteralSupport": { "codeActionKind": { "valueSet": [ "quickfix", "refactor", "refactor.extract", "refactor.inline", "refactor.rewrite", "source", "source.organizeImports", ] } }, }, }, "workspace": { "workspaceFolders": True, "didChangeConfiguration": {"dynamicRegistration": True}, "executeCommand": {"dynamicRegistration": True}, }, }, "workspaceFolders": [{"uri": root_uri, "name": os.path.basename(repository_absolute_path)}], } return initialize_params def _start_server(self): """Start Next LS server process""" def register_capability_handler(params): return def window_log_message(msg): """Handle window/logMessage notifications from Next LS""" message_text = msg.get("message", "") self.logger.log(f"LSP: window/logMessage: {message_text}", logging.INFO) # Check for the specific Next LS readiness signal # Based on Next LS source: "Runtime for folder #{name} is ready..." if "Runtime for folder" in message_text and "is ready..." 
in message_text: self.logger.log("Next LS runtime is ready based on official log message", logging.INFO) self.server_ready.set() def do_nothing(params): return def check_server_ready(params): """ Handle $/progress notifications from Next LS. Keep as fallback for error detection, but primary readiness detection is now done via window/logMessage handler. """ value = params.get("value", {}) # Check for initialization completion progress (fallback signal) if value.get("kind") == "end": message = value.get("message", "") if "has initialized!" in message: self.logger.log("Next LS initialization progress completed", logging.INFO) # Note: We don't set server_ready here - we wait for the log message def work_done_progress(params): """ Handle $/workDoneProgress notifications from Next LS. Keep for completeness but primary readiness detection is via window/logMessage. """ value = params.get("value", {}) if value.get("kind") == "end": self.logger.log("Next LS work done progress completed", logging.INFO) # Note: We don't set server_ready here - we wait for the log message self.server.on_request("client/registerCapability", register_capability_handler) self.server.on_notification("window/logMessage", window_log_message) self.server.on_notification("$/progress", check_server_ready) self.server.on_notification("window/workDoneProgress/create", do_nothing) self.server.on_notification("$/workDoneProgress", work_done_progress) self.server.on_notification("textDocument/publishDiagnostics", do_nothing) self.logger.log("Starting Next LS server process", logging.INFO) self.server.start() initialize_params = self._get_initialize_params(self.repository_root_path) self.logger.log( "Sending initialize request from LSP client to LSP server and awaiting response", logging.INFO, ) init_response = self.server.send.initialize(initialize_params) # Verify server capabilities - be more lenient with Next LS self.logger.log(f"Next LS capabilities: {list(init_response['capabilities'].keys())}", 
logging.INFO) # Next LS may not provide all capabilities immediately, so we check for basic ones assert "textDocumentSync" in init_response["capabilities"], f"Missing textDocumentSync in {init_response['capabilities']}" # Some capabilities might be optional or provided later if "completionProvider" not in init_response["capabilities"]: self.logger.log("Warning: completionProvider not available in initial capabilities", logging.WARNING) if "definitionProvider" not in init_response["capabilities"]: self.logger.log("Warning: definitionProvider not available in initial capabilities", logging.WARNING) self.server.notify.initialized({}) self.completions_available.set() # Wait for Next LS to send the specific "Runtime for folder X is ready..." log message # This is the authoritative signal that Next LS is truly ready for requests ready_timeout = 180.0 self.logger.log(f"Waiting up to {ready_timeout} seconds for Next LS runtime readiness...", logging.INFO) if self.server_ready.wait(timeout=ready_timeout): self.logger.log("Next LS is ready and available for requests", logging.INFO) # Add a small settling period to ensure background indexing is complete # Next LS often continues compilation/indexing in background after ready signal settling_time = 120.0 self.logger.log(f"Allowing {settling_time} seconds for Next LS background indexing to complete...", logging.INFO) time.sleep(settling_time) self.logger.log("Next LS settling period complete", logging.INFO) else: error_msg = f"Next LS failed to initialize within {ready_timeout} seconds. This may indicate a problem with the Elixir installation, project compilation, or Next LS itself." 
self.logger.log(error_msg, logging.ERROR) raise RuntimeError(error_msg) ``` -------------------------------------------------------------------------------- /src/serena/tools/tools_base.py: -------------------------------------------------------------------------------- ```python import inspect import os from abc import ABC from collections.abc import Iterable from dataclasses import dataclass from types import TracebackType from typing import TYPE_CHECKING, Any, Protocol, Self, TypeVar from mcp.server.fastmcp.utilities.func_metadata import FuncMetadata, func_metadata from sensai.util import logging from sensai.util.string import dict_string from serena.project import Project from serena.prompt_factory import PromptFactory from serena.symbol import LanguageServerSymbolRetriever from serena.util.class_decorators import singleton from serena.util.inspection import iter_subclasses from solidlsp.ls_exceptions import SolidLSPException if TYPE_CHECKING: from serena.agent import MemoriesManager, SerenaAgent from serena.code_editor import CodeEditor log = logging.getLogger(__name__) T = TypeVar("T") SUCCESS_RESULT = "OK" class Component(ABC): def __init__(self, agent: "SerenaAgent"): self.agent = agent def get_project_root(self) -> str: """ :return: the root directory of the active project, raises a ValueError if no active project configuration is set """ return self.agent.get_project_root() @property def prompt_factory(self) -> PromptFactory: return self.agent.prompt_factory @property def memories_manager(self) -> "MemoriesManager": assert self.agent.memories_manager is not None return self.agent.memories_manager def create_language_server_symbol_retriever(self) -> LanguageServerSymbolRetriever: if not self.agent.is_using_language_server(): raise Exception("Cannot create LanguageServerSymbolRetriever; agent is not in language server mode.") language_server = self.agent.language_server assert language_server is not None return LanguageServerSymbolRetriever(language_server, 
agent=self.agent)

    @property
    def project(self) -> Project:
        # Convenience accessor; raises if no project is currently active.
        return self.agent.get_active_project_or_raise()

    def create_code_editor(self) -> "CodeEditor":
        # local import to avoid a circular dependency with serena.code_editor
        from ..code_editor import JetBrainsCodeEditor, LanguageServerCodeEditor

        # Pick the editor backend matching the agent's current mode:
        # language-server-backed editing vs. JetBrains-based editing.
        if self.agent.is_using_language_server():
            return LanguageServerCodeEditor(self.create_language_server_symbol_retriever(), agent=self.agent)
        else:
            return JetBrainsCodeEditor(project=self.project, agent=self.agent)


class ToolMarker:
    """
    Base class for tool markers.

    Markers are mixin classes used purely for `issubclass` checks to classify tools;
    they carry no behavior.
    """


class ToolMarkerCanEdit(ToolMarker):
    """
    Marker class for all tools that can perform editing operations on files.
    """


class ToolMarkerDoesNotRequireActiveProject(ToolMarker):
    # Marker for tools that may run without an active project being set.
    pass


class ToolMarkerOptional(ToolMarker):
    """
    Marker class for optional tools that are disabled by default.
    """


class ToolMarkerSymbolicRead(ToolMarker):
    """
    Marker class for tools that perform symbol read operations.
    """


class ToolMarkerSymbolicEdit(ToolMarkerCanEdit):
    """
    Marker class for tools that perform symbolic edit operations.
    """


class ApplyMethodProtocol(Protocol):
    """Callable protocol for the apply method of a tool."""

    def __call__(self, *args: Any, **kwargs: Any) -> str:
        pass


class Tool(Component):
    # NOTE: each tool should implement the apply method, which is then used in
    # the central method of the Tool class `apply_ex`.
    # Failure to do so will result in a RuntimeError at tool execution time.
    # The apply method is not declared as part of the base Tool interface since we cannot
    # know the signature of the (input parameters of the) method in advance.
    #
    # The docstring and types of the apply method are used to generate the tool description
    # (which is used by the LLM, so a good description is important)
    # and to validate the tool call arguments.
@classmethod def get_name_from_cls(cls) -> str: name = cls.__name__ if name.endswith("Tool"): name = name[:-4] # convert to snake_case name = "".join(["_" + c.lower() if c.isupper() else c for c in name]).lstrip("_") return name def get_name(self) -> str: return self.get_name_from_cls() def get_apply_fn(self) -> ApplyMethodProtocol: apply_fn = getattr(self, "apply") if apply_fn is None: raise RuntimeError(f"apply not defined in {self}. Did you forget to implement it?") return apply_fn @classmethod def can_edit(cls) -> bool: """ Returns whether this tool can perform editing operations on code. :return: True if the tool can edit code, False otherwise """ return issubclass(cls, ToolMarkerCanEdit) @classmethod def get_tool_description(cls) -> str: docstring = cls.__doc__ if docstring is None: return "" return docstring.strip() @classmethod def get_apply_docstring_from_cls(cls) -> str: """Get the docstring for the apply method from the class (static metadata). Needed for creating MCP tools in a separate process without running into serialization issues. """ # First try to get from __dict__ to handle dynamic docstring changes if "apply" in cls.__dict__: apply_fn = cls.__dict__["apply"] else: # Fall back to getattr for inherited methods apply_fn = getattr(cls, "apply", None) if apply_fn is None: raise AttributeError(f"apply method not defined in {cls}. Did you forget to implement it?") docstring = apply_fn.__doc__ if not docstring: raise AttributeError(f"apply method has no (or empty) docstring in {cls}. 
Did you forget to implement it?") return docstring.strip() def get_apply_docstring(self) -> str: """Gets the docstring for the tool application, used by the MCP server.""" return self.get_apply_docstring_from_cls() def get_apply_fn_metadata(self) -> FuncMetadata: """Gets the metadata for the tool application function, used by the MCP server.""" return self.get_apply_fn_metadata_from_cls() @classmethod def get_apply_fn_metadata_from_cls(cls) -> FuncMetadata: """Get the metadata for the apply method from the class (static metadata). Needed for creating MCP tools in a separate process without running into serialization issues. """ # First try to get from __dict__ to handle dynamic docstring changes if "apply" in cls.__dict__: apply_fn = cls.__dict__["apply"] else: # Fall back to getattr for inherited methods apply_fn = getattr(cls, "apply", None) if apply_fn is None: raise AttributeError(f"apply method not defined in {cls}. Did you forget to implement it?") return func_metadata(apply_fn, skip_names=["self", "cls"]) def _log_tool_application(self, frame: Any) -> None: params = {} ignored_params = {"self", "log_call", "catch_exceptions", "args", "apply_fn"} for param, value in frame.f_locals.items(): if param in ignored_params: continue if param == "kwargs": params.update(value) else: params[param] = value log.info(f"{self.get_name_from_cls()}: {dict_string(params)}") def _limit_length(self, result: str, max_answer_chars: int) -> str: if max_answer_chars == -1: max_answer_chars = self.agent.serena_config.default_max_tool_answer_chars if max_answer_chars <= 0: raise ValueError(f"Must be positive or the default (-1), got: {max_answer_chars=}") if (n_chars := len(result)) > max_answer_chars: result = ( f"The answer is too long ({n_chars} characters). " + "Please try a more specific tool query or raise the max_answer_chars parameter." 
) return result def is_active(self) -> bool: return self.agent.tool_is_active(self.__class__) def apply_ex(self, log_call: bool = True, catch_exceptions: bool = True, **kwargs) -> str: # type: ignore """ Applies the tool with logging and exception handling, using the given keyword arguments """ def task() -> str: apply_fn = self.get_apply_fn() try: if not self.is_active(): return f"Error: Tool '{self.get_name_from_cls()}' is not active. Active tools: {self.agent.get_active_tool_names()}" except Exception as e: return f"RuntimeError while checking if tool {self.get_name_from_cls()} is active: {e}" if log_call: self._log_tool_application(inspect.currentframe()) try: # check whether the tool requires an active project and language server if not isinstance(self, ToolMarkerDoesNotRequireActiveProject): if self.agent._active_project is None: return ( "Error: No active project. Ask the user to provide the project path or to select a project from this list of known projects: " + f"{self.agent.serena_config.project_names}" ) if self.agent.is_using_language_server() and not self.agent.is_language_server_running(): log.info("Language server is not running. Starting it ...") self.agent.reset_language_server() # apply the actual tool try: result = apply_fn(**kwargs) except SolidLSPException as e: if e.is_language_server_terminated(): log.error(f"Language server terminated while executing tool ({e}). 
Restarting the language server and retrying ...") self.agent.reset_language_server() result = apply_fn(**kwargs) else: raise # record tool usage self.agent.record_tool_usage_if_enabled(kwargs, result, self) except Exception as e: if not catch_exceptions: raise msg = f"Error executing tool: {e}" log.error(f"Error executing tool: {e}", exc_info=e) result = msg if log_call: log.info(f"Result: {result}") try: if self.agent.language_server is not None: self.agent.language_server.save_cache() except Exception as e: log.error(f"Error saving language server cache: {e}") return result future = self.agent.issue_task(task, name=self.__class__.__name__) return future.result(timeout=self.agent.serena_config.tool_timeout) class EditedFileContext: """ Context manager for file editing. Create the context, then use `set_updated_content` to set the new content, the original content being provided in `original_content`. When exiting the context without an exception, the updated content will be written back to the file. """ def __init__(self, relative_path: str, agent: "SerenaAgent"): self._project = agent.get_active_project() assert self._project is not None self._abs_path = os.path.join(self._project.project_root, relative_path) if not os.path.isfile(self._abs_path): raise FileNotFoundError(f"File {self._abs_path} does not exist.") with open(self._abs_path, encoding=self._project.project_config.encoding) as f: self._original_content = f.read() self._updated_content: str | None = None def __enter__(self) -> Self: return self def get_original_content(self) -> str: """ :return: the original content of the file before any modifications. """ return self._original_content def set_updated_content(self, content: str) -> None: """ Sets the updated content of the file, which will be written back to the file when the context is exited without an exception. 
:param content: the updated content of the file """ self._updated_content = content def __exit__(self, exc_type: type[BaseException] | None, exc_value: BaseException | None, traceback: TracebackType | None) -> None: if self._updated_content is not None and exc_type is None: assert self._project is not None with open(self._abs_path, "w", encoding=self._project.project_config.encoding) as f: f.write(self._updated_content) log.info(f"Updated content written to {self._abs_path}") # Language servers should automatically detect the change and update its state accordingly. # If they do not, we may have to add a call to notify it. @dataclass(kw_only=True) class RegisteredTool: tool_class: type[Tool] is_optional: bool tool_name: str @singleton class ToolRegistry: def __init__(self) -> None: self._tool_dict: dict[str, RegisteredTool] = {} for cls in iter_subclasses(Tool): if not cls.__module__.startswith("serena.tools"): continue is_optional = issubclass(cls, ToolMarkerOptional) name = cls.get_name_from_cls() if name in self._tool_dict: raise ValueError(f"Duplicate tool name found: {name}. Tool classes must have unique names.") self._tool_dict[name] = RegisteredTool(tool_class=cls, is_optional=is_optional, tool_name=name) def get_tool_class_by_name(self, tool_name: str) -> type[Tool]: return self._tool_dict[tool_name].tool_class def get_all_tool_classes(self) -> list[type[Tool]]: return list(t.tool_class for t in self._tool_dict.values()) def get_tool_classes_default_enabled(self) -> list[type[Tool]]: """ :return: the list of tool classes that are enabled by default (i.e. non-optional tools). """ return [t.tool_class for t in self._tool_dict.values() if not t.is_optional] def get_tool_classes_optional(self) -> list[type[Tool]]: """ :return: the list of tool classes that are optional (i.e. disabled by default). 
""" return [t.tool_class for t in self._tool_dict.values() if t.is_optional] def get_tool_names_default_enabled(self) -> list[str]: """ :return: the list of tool names that are enabled by default (i.e. non-optional tools). """ return [t.tool_name for t in self._tool_dict.values() if not t.is_optional] def get_tool_names_optional(self) -> list[str]: """ :return: the list of tool names that are optional (i.e. disabled by default). """ return [t.tool_name for t in self._tool_dict.values() if t.is_optional] def get_tool_names(self) -> list[str]: """ :return: the list of all tool names. """ return list(self._tool_dict.keys()) def print_tool_overview( self, tools: Iterable[type[Tool] | Tool] | None = None, include_optional: bool = False, only_optional: bool = False ) -> None: """ Print a summary of the tools. If no tools are passed, a summary of the selection of tools (all, default or only optional) is printed. """ if tools is None: if only_optional: tools = self.get_tool_classes_optional() elif include_optional: tools = self.get_all_tool_classes() else: tools = self.get_tool_classes_default_enabled() tool_dict: dict[str, type[Tool] | Tool] = {} for tool_class in tools: tool_dict[tool_class.get_name_from_cls()] = tool_class for tool_name in sorted(tool_dict.keys()): tool_class = tool_dict[tool_name] print(f" * `{tool_name}`: {tool_class.get_tool_description().strip()}") def is_valid_tool_name(self, tool_name: str) -> bool: return tool_name in self._tool_dict ```