This is page 7 of 14. Use http://codebase.md/oraios/serena?lines=true&page={x} to view the full context. # Directory Structure ``` ├── .devcontainer │ └── devcontainer.json ├── .dockerignore ├── .env.example ├── .github │ ├── FUNDING.yml │ ├── ISSUE_TEMPLATE │ │ ├── config.yml │ │ ├── feature_request.md │ │ └── issue--bug--performance-problem--question-.md │ └── workflows │ ├── codespell.yml │ ├── docker.yml │ ├── junie.yml │ ├── lint_and_docs.yaml │ ├── publish.yml │ └── pytest.yml ├── .gitignore ├── .serena │ ├── memories │ │ ├── adding_new_language_support_guide.md │ │ ├── serena_core_concepts_and_architecture.md │ │ ├── serena_repository_structure.md │ │ └── suggested_commands.md │ └── project.yml ├── .vscode │ └── settings.json ├── CHANGELOG.md ├── CLAUDE.md ├── compose.yaml ├── CONTRIBUTING.md ├── docker_build_and_run.sh ├── DOCKER.md ├── Dockerfile ├── docs │ ├── custom_agent.md │ └── serena_on_chatgpt.md ├── flake.lock ├── flake.nix ├── lessons_learned.md ├── LICENSE ├── llms-install.md ├── public │ └── .gitignore ├── pyproject.toml ├── README.md ├── resources │ ├── serena-icons.cdr │ ├── serena-logo-dark-mode.svg │ ├── serena-logo.cdr │ ├── serena-logo.svg │ └── vscode_sponsor_logo.png ├── roadmap.md ├── scripts │ ├── agno_agent.py │ ├── demo_run_tools.py │ ├── gen_prompt_factory.py │ ├── mcp_server.py │ ├── print_mode_context_options.py │ └── print_tool_overview.py ├── src │ ├── interprompt │ │ ├── __init__.py │ │ ├── .syncCommitId.remote │ │ ├── .syncCommitId.this │ │ ├── jinja_template.py │ │ ├── multilang_prompt.py │ │ ├── prompt_factory.py │ │ └── util │ │ ├── __init__.py │ │ └── class_decorators.py │ ├── README.md │ ├── serena │ │ ├── __init__.py │ │ ├── agent.py │ │ ├── agno.py │ │ ├── analytics.py │ │ ├── cli.py │ │ ├── code_editor.py │ │ ├── config │ │ │ ├── __init__.py │ │ │ ├── context_mode.py │ │ │ └── serena_config.py │ │ ├── constants.py │ │ ├── dashboard.py │ │ ├── generated │ │ │ └── generated_prompt_factory.py │ │ ├── gui_log_viewer.py │ │ ├── mcp.py │ │ ├── project.py │ │ ├── prompt_factory.py │ │ ├── resources │ │ │ ├── config │ │ │ │ ├── contexts │ │ │ │ │ ├── agent.yml │ │ │ │ │ ├── chatgpt.yml │ │ │ │ │ ├── codex.yml │ │ │ │ │ ├── context.template.yml │ │ │ │ │ ├── desktop-app.yml │ │ │ │ │ ├── ide-assistant.yml │ │ │ │ │ └── oaicompat-agent.yml │ │ │ │ ├── internal_modes │ │ │ │ │ └── jetbrains.yml │ │ │ │ ├── modes │ │ │ │ │ ├── editing.yml │ │ │ │ │ ├── interactive.yml │ │ │ │ │ ├── mode.template.yml │ │ │ │ │ ├── no-onboarding.yml │ │ │ │ │ ├── onboarding.yml │ │ │ │ │ ├── one-shot.yml │ │ │ │ │ └── planning.yml │ │ │ │ └── prompt_templates │ │ │ │ ├── simple_tool_outputs.yml │ │ │ │ └── system_prompt.yml │ │ │ ├── dashboard │ │ │ │ ├── dashboard.js │ │ │ │ ├── index.html │ │ │ │ ├── jquery.min.js │ │ │ │ ├── serena-icon-16.png │ │ │ │ ├── serena-icon-32.png │ │ │ │ ├── serena-icon-48.png │ │ │ │ ├── serena-logs-dark-mode.png │ │ │ │ └── serena-logs.png │ │ │ ├── project.template.yml │ │ │ └── serena_config.template.yml │ │ ├── symbol.py │ │ ├── text_utils.py │ │ ├── tools │ │ │ ├── __init__.py │ │ │ ├── cmd_tools.py │ │ │ ├── config_tools.py │ │ │ ├── file_tools.py │ │ │ ├── jetbrains_plugin_client.py │ │ │ ├── jetbrains_tools.py │ │ │ ├── memory_tools.py │ │ │ ├── symbol_tools.py │ │ │ ├── tools_base.py │ │ │ └── workflow_tools.py │ │ └── util │ │ ├── class_decorators.py │ │ ├── exception.py │ │ ├── file_system.py │ │ ├── general.py │ │ ├── git.py │ │ ├── inspection.py │ │ ├── logging.py │ │ ├── shell.py │ │ └── thread.py │ └── solidlsp │ ├── __init__.py │ ├── .gitignore │ ├── language_servers │ │ ├── al_language_server.py │ │ ├── bash_language_server.py │ │ ├── clangd_language_server.py │ │ ├── clojure_lsp.py │ │ ├── common.py │ │ ├── csharp_language_server.py │ │ ├── dart_language_server.py │ │ ├── eclipse_jdtls.py │ │ ├── elixir_tools │ │ │ ├── __init__.py │ │ │ ├── elixir_tools.py │ │ │ └── README.md │ │ ├── elm_language_server.py │ │ ├── erlang_language_server.py │ │ ├── gopls.py │ │ ├── intelephense.py │ │ ├── jedi_server.py │ │ ├── kotlin_language_server.py │ │ ├── lua_ls.py │ │ ├── marksman.py │ │ ├── nixd_ls.py │ │ ├── omnisharp │ │ │ ├── initialize_params.json │ │ │ ├── runtime_dependencies.json │ │ │ └── workspace_did_change_configuration.json │ │ ├── omnisharp.py │ │ ├── perl_language_server.py │ │ ├── pyright_server.py │ │ ├── r_language_server.py │ │ ├── regal_server.py │ │ ├── ruby_lsp.py │ │ ├── rust_analyzer.py │ │ ├── solargraph.py │ │ ├── sourcekit_lsp.py │ │ ├── terraform_ls.py │ │ ├── typescript_language_server.py │ │ ├── vts_language_server.py │ │ └── zls.py │ ├── ls_config.py │ ├── ls_exceptions.py │ ├── ls_handler.py │ ├── ls_logger.py │ ├── ls_request.py │ ├── ls_types.py │ ├── ls_utils.py │ ├── ls.py │ ├── lsp_protocol_handler │ │ ├── lsp_constants.py │ │ ├── lsp_requests.py │ │ ├── lsp_types.py │ │ └── server.py │ ├── settings.py │ └── util │ ├── subprocess_util.py │ └── zip.py ├── test │ ├── __init__.py │ ├── conftest.py │ ├── resources │ │ └── repos │ │ ├── al │ │ │ └── test_repo │ │ │ ├── app.json │ │ │ └── src │ │ │ ├── Codeunits │ │ │ │ ├── CustomerMgt.Codeunit.al │ │ │ │ └── PaymentProcessorImpl.Codeunit.al │ │ │ ├── Enums │ │ │ │ └── CustomerType.Enum.al │ │ │ ├── Interfaces │ │ │ │ └── IPaymentProcessor.Interface.al │ │ │ ├── Pages │ │ │ │ ├── CustomerCard.Page.al │ │ │ │ └── CustomerList.Page.al │ │ │ ├── TableExtensions │ │ │ │ └── Item.TableExt.al │ │ │ └── Tables │ │ │ └── Customer.Table.al │ │ ├── bash │ │ │ └── test_repo │ │ │ ├── config.sh │ │ │ ├── main.sh │ │ │ └── utils.sh │ │ ├── clojure │ │ │ └── test_repo │ │ │ ├── deps.edn │ │ │ └── src │ │ │ └── test_app │ │ │ ├── core.clj │ │ │ └── utils.clj │ │ ├── csharp │ │ │ └── test_repo │ │ │ ├── .gitignore │ │ │ ├── Models │ │ │ │ └── Person.cs │ │ │ ├── Program.cs │ │ │ ├── serena.sln │ │ │ └── TestProject.csproj │ │ ├── dart │ │ │ └── test_repo │ │ │ ├── .gitignore │ │ │ ├── lib │ │ │ │ ├── helper.dart │ │ │ │ ├── main.dart │ │ │ │ └── models.dart │ │ │ └── pubspec.yaml │ │ ├── elixir │ │ │ └── test_repo │ │ │ ├── .gitignore │ │ │ ├── lib │ │ │ │ ├── examples.ex │ │ │ │ ├── ignored_dir │ │ │ │ │ └── ignored_module.ex │ │ │ │ ├── models.ex │ │ │ │ ├── services.ex │ │ │ │ ├── test_repo.ex │ │ │ │ └── utils.ex │ │ │ ├── mix.exs │ │ │ ├── mix.lock │ │ │ ├── scripts │ │ │ │ └── build_script.ex │ │ │ └── test │ │ │ ├── models_test.exs │ │ │ └── test_repo_test.exs │ │ ├── elm │ │ │ └── test_repo │ │ │ ├── elm.json │ │ │ ├── Main.elm │ │ │ └── Utils.elm │ │ ├── erlang │ │ │ └── test_repo │ │ │ ├── hello.erl │ │ │ ├── ignored_dir │ │ │ │ └── ignored_module.erl │ │ │ ├── include │ │ │ │ ├── records.hrl │ │ │ │ └── types.hrl │ │ │ ├── math_utils.erl │ │ │ ├── rebar.config │ │ │ ├── src │ │ │ │ ├── app.erl │ │ │ │ ├── models.erl │ │ │ │ ├── services.erl │ │ │ │ └── utils.erl │ │ │ └── test │ │ │ ├── models_tests.erl │ │ │ └── utils_tests.erl │ │ ├── go │ │ │ └── test_repo │ │ │ └── main.go │ │ ├── java │ │ │ └── test_repo │ │ │ ├── pom.xml │ │ │ └── src │ │ │ └── main │ │ │ └── java │ │ │ └── test_repo │ │ │ ├── Main.java │ │ │ ├── Model.java │ │ │ ├── ModelUser.java │ │ │ └── Utils.java │ │ ├── kotlin │ │ │ └── test_repo │ │ │ ├── .gitignore │ │ │ ├── build.gradle.kts │ │ │ └── src │ │ │ └── main │ │ │ └── kotlin │ │ │ └── test_repo │ │ │ ├── Main.kt │ │ │ ├── Model.kt │ │ │ ├── ModelUser.kt │ │ │ └── Utils.kt │ │ ├── lua │ │ │ └── test_repo │ │ │ ├── .gitignore │ │ │ ├── main.lua │ │ │ ├── src │ │ │ │ ├── calculator.lua │ │ │ │ └── utils.lua │ │ │ └── tests │ │ │ └── test_calculator.lua │ │ ├── markdown │ │ │ └── test_repo │ │ │ ├── api.md │ │ │ ├── CONTRIBUTING.md │ │ │ ├── guide.md │ │ │ └── README.md │ │ ├── nix │ │ │ └── test_repo │ │ │ ├── .gitignore │ │ │ ├── default.nix │ │ │ ├── flake.nix │ │ │ ├── lib │ │ │ │ └── utils.nix │ │ │ ├── modules │ │ │ │ └── example.nix │ │ │ └── scripts │ │ │ └── hello.sh │ │ ├── perl │ │ │ └── test_repo │ │ │ ├── helper.pl │ │ │ └── main.pl │ │ ├── php │ │ │ └── test_repo │ │ │ ├── helper.php │ │ │ ├── index.php │ │ │ └── simple_var.php │ │ ├── python │ │ │ └── test_repo │ │ │ ├── .gitignore │ │ │ ├── custom_test │ │ │ │ ├── __init__.py │ │ │ │ └── advanced_features.py │ │ │ ├── examples │ │ │ │ ├── __init__.py │ │ │ │ └── user_management.py │ │ │ ├── ignore_this_dir_with_postfix │ │ │ │ └── ignored_module.py │ │ │ ├── scripts │ │ │ │ ├── __init__.py │ │ │ │ └── run_app.py │ │ │ └── test_repo │ │ │ ├── __init__.py │ │ │ ├── complex_types.py │ │ │ ├── models.py │ │ │ ├── name_collisions.py │ │ │ ├── nested_base.py │ │ │ ├── nested.py │ │ │ ├── overloaded.py │ │ │ ├── services.py │ │ │ ├── utils.py │ │ │ └── variables.py │ │ ├── r │ │ │ └── test_repo │ │ │ ├── .Rbuildignore │ │ │ ├── DESCRIPTION │ │ │ ├── examples │ │ │ │ └── analysis.R │ │ │ ├── NAMESPACE │ │ │ └── R │ │ │ ├── models.R │ │ │ └── utils.R │ │ ├── rego │ │ │ └── test_repo │ │ │ ├── policies │ │ │ │ ├── authz.rego │ │ │ │ └── validation.rego │ │ │ └── utils │ │ │ └── helpers.rego │ │ ├── ruby │ │ │ └── test_repo │ │ │ ├── .solargraph.yml │ │ │ ├── examples │ │ │ │ └── user_management.rb │ │ │ ├── lib.rb │ │ │ ├── main.rb │ │ │ ├── models.rb │ │ │ ├── nested.rb │ │ │ ├── services.rb │ │ │ └── variables.rb │ │ ├── rust │ │ │ ├── test_repo │ │ │ │ ├── Cargo.lock │ │ │ │ ├── Cargo.toml │ │ │ │ └── src │ │ │ │ ├── lib.rs │ │ │ │ └── main.rs │ │ │ └── test_repo_2024 │ │ │ ├── Cargo.lock │ │ │ ├── Cargo.toml │ │ │ └── src │ │ │ ├── lib.rs │ │ │ └── main.rs │ │ ├── swift │ │ │ └── test_repo │ │ │ ├── Package.swift │ │ │ └── src │ │ │ ├── main.swift │ │ │ └── utils.swift │ │ ├── terraform │ │ │ └── test_repo │ │ │ ├── data.tf │ │ │ ├── main.tf │ │ │ ├── outputs.tf │ │ │ └── variables.tf │ │ ├── typescript │ │ │ └── test_repo │ │ │ ├── .serena │ │ │ │ └── project.yml │ │ │ ├── index.ts │ │ │ ├── tsconfig.json │ │ │ └── use_helper.ts │ │ └── zig │ │ └── test_repo │ │ ├── .gitignore │ │ ├── build.zig │ │ ├── src │ │ │ ├── calculator.zig │ │ │ ├── main.zig │ │ │ └── math_utils.zig │ │ └── zls.json │ ├── serena │ │ ├── __init__.py │ │ ├── __snapshots__ │ │ │ └── test_symbol_editing.ambr │ │ ├── config │ │ │ ├── __init__.py │ │ │ └── test_serena_config.py │ │ ├── test_edit_marker.py │ │ ├── test_mcp.py │ │ ├── test_serena_agent.py │ │ ├── test_symbol_editing.py │ │ ├── test_symbol.py │ │ ├── test_text_utils.py │ │ ├── test_tool_parameter_types.py │ │ └── util │ │ ├── test_exception.py │ │ └── test_file_system.py │ └── solidlsp │ ├── al │ │ └── test_al_basic.py │ ├── bash │ │ ├── __init__.py │ │ └── test_bash_basic.py │ ├── clojure │ │ ├── __init__.py │ │ └── test_clojure_basic.py │ ├── csharp │ │ └── test_csharp_basic.py │ ├── dart │ │ ├── __init__.py │ │ └── test_dart_basic.py │ ├── elixir │ │ ├── __init__.py │ │ ├── conftest.py │ │ ├── test_elixir_basic.py │ │ ├── test_elixir_ignored_dirs.py │ │ ├── test_elixir_integration.py │ │ └── test_elixir_symbol_retrieval.py │ ├── elm │ │ └── test_elm_basic.py │ ├── erlang │ │ ├── __init__.py │ │ ├── conftest.py │ │ ├── test_erlang_basic.py │ │ ├── test_erlang_ignored_dirs.py │ │ └── test_erlang_symbol_retrieval.py │ ├── go │ │ └── test_go_basic.py │ ├── java │ │ └── test_java_basic.py │ ├── kotlin │ │ └── test_kotlin_basic.py │ ├── lua │ │ └── test_lua_basic.py │ ├── markdown │ │ ├── __init__.py │ │ └── test_markdown_basic.py │ ├── nix │ │ └── test_nix_basic.py │ ├── perl │ │ └── test_perl_basic.py │ ├── php │ │ └── test_php_basic.py │ ├── python │ │ ├── test_python_basic.py │ │ ├── test_retrieval_with_ignored_dirs.py │ │ └── test_symbol_retrieval.py │ ├── r │ │ ├── __init__.py │ │ └── test_r_basic.py │ ├── rego │ │ └── test_rego_basic.py │ ├── ruby │ │ ├── test_ruby_basic.py │ │ └── test_ruby_symbol_retrieval.py │ ├── rust │ │ ├── test_rust_2024_edition.py │ │ └── test_rust_basic.py │ ├── swift │ │ └── test_swift_basic.py │ ├── terraform │ │ └── test_terraform_basic.py │ ├── typescript │ │ └── test_typescript_basic.py │ ├── util │ │ └── test_zip.py │ └── zig │ └── test_zig_basic.py └── uv.lock ``` # Files -------------------------------------------------------------------------------- /src/solidlsp/language_servers/nixd_ls.py: -------------------------------------------------------------------------------- ```python 1 | """ 2 | Provides Nix specific instantiation of the LanguageServer class using nixd (Nix Language Server). 3 | 4 | Note: Windows is not supported as Nix itself doesn't support Windows natively. 5 | """ 6 | 7 | import logging 8 | import os 9 | import pathlib 10 | import platform 11 | import shutil 12 | import subprocess 13 | import threading 14 | from pathlib import Path 15 | 16 | from overrides import override 17 | 18 | from solidlsp import ls_types 19 | from solidlsp.ls import SolidLanguageServer 20 | from solidlsp.ls_config import LanguageServerConfig 21 | from solidlsp.ls_logger import LanguageServerLogger 22 | from solidlsp.lsp_protocol_handler.lsp_types import InitializeParams 23 | from solidlsp.lsp_protocol_handler.server import ProcessLaunchInfo 24 | from solidlsp.settings import SolidLSPSettings 25 | 26 | 27 | class NixLanguageServer(SolidLanguageServer): 28 | """ 29 | Provides Nix specific instantiation of the LanguageServer class using nixd. 30 | """ 31 | 32 | def _extend_nix_symbol_range_to_include_semicolon( 33 | self, symbol: ls_types.UnifiedSymbolInformation, file_content: str 34 | ) -> ls_types.UnifiedSymbolInformation: 35 | """ 36 | Extend symbol range to include trailing semicolon for Nix attribute symbols. 37 | 38 | nixd provides ranges that exclude semicolons (expression-level), but serena needs 39 | statement-level ranges that include semicolons for proper replacement. 40 | """ 41 | range_info = symbol["range"] 42 | end_line = range_info["end"]["line"] 43 | end_char = range_info["end"]["character"] 44 | 45 | # Split file content into lines 46 | lines = file_content.split("\n") 47 | if end_line >= len(lines): 48 | return symbol 49 | 50 | line = lines[end_line] 51 | 52 | # Check if there's a semicolon immediately after the current range end 53 | if end_char < len(line) and line[end_char] == ";": 54 | # Extend range to include the semicolon 55 | new_range = {"start": range_info["start"], "end": {"line": end_line, "character": end_char + 1}} 56 | 57 | # Create modified symbol with extended range 58 | extended_symbol = symbol.copy() 59 | extended_symbol["range"] = new_range 60 | 61 | # CRITICAL: Also update the location.range if it exists 62 | if extended_symbol.get("location"): 63 | location = extended_symbol["location"].copy() 64 | if "range" in location: 65 | location["range"] = new_range.copy() 66 | extended_symbol["location"] = location 67 | 68 | return extended_symbol 69 | 70 | return symbol 71 | 72 | @override 73 | def request_document_symbols( 74 | self, relative_file_path: str, include_body: bool = False 75 | ) -> tuple[list[ls_types.UnifiedSymbolInformation], list[ls_types.UnifiedSymbolInformation]]: 76 | """ 77 | Override to extend Nix symbol ranges to include trailing semicolons. 78 | 79 | nixd provides expression-level ranges (excluding semicolons) but serena needs 80 | statement-level ranges (including semicolons) for proper symbol replacement. 81 | """ 82 | # Get symbols from parent implementation 83 | all_symbols, root_symbols = super().request_document_symbols(relative_file_path, include_body) 84 | 85 | # Get file content for range extension 86 | file_content = self.language_server.retrieve_full_file_content(relative_file_path) 87 | 88 | # Extend ranges for all symbols recursively 89 | def extend_symbol_and_children(symbol: ls_types.UnifiedSymbolInformation) -> ls_types.UnifiedSymbolInformation: 90 | # Extend this symbol's range 91 | extended = self._extend_nix_symbol_range_to_include_semicolon(symbol, file_content) 92 | 93 | # Extend children recursively 94 | if extended.get("children"): 95 | extended["children"] = [extend_symbol_and_children(child) for child in extended["children"]] 96 | 97 | return extended 98 | 99 | # Apply range extension to all symbols 100 | extended_all_symbols = [extend_symbol_and_children(sym) for sym in all_symbols] 101 | extended_root_symbols = [extend_symbol_and_children(sym) for sym in root_symbols] 102 | 103 | return extended_all_symbols, extended_root_symbols 104 | 105 | @override 106 | def is_ignored_dirname(self, dirname: str) -> bool: 107 | # For Nix projects, we should ignore: 108 | # - result: nix build output symlinks 109 | # - result-*: multiple build outputs 110 | # - .direnv: direnv cache 111 | return super().is_ignored_dirname(dirname) or dirname in ["result", ".direnv"] or dirname.startswith("result-") 112 | 113 | @staticmethod 114 | def _get_nixd_version(): 115 | """Get the installed nixd version or None if not found.""" 116 | try: 117 | result = subprocess.run(["nixd", "--version"], capture_output=True, text=True, check=False) 118 | if result.returncode == 0: 119 | # nixd outputs version like: nixd 2.0.0 120 | return result.stdout.strip() 121 | except FileNotFoundError: 122 | return None 123 | return None 124 | 125 | @staticmethod 126 | def _check_nixd_installed(): 127 | """Check if nixd is installed in the system.""" 128 | return shutil.which("nixd") is not None 129 | 130 | @staticmethod 131 | def _get_nixd_path(): 132 | """Get the path to nixd executable.""" 133 | # First check if it's in PATH 134 | nixd_path = shutil.which("nixd") 135 | if nixd_path: 136 | return nixd_path 137 | 138 | # Check common installation locations 139 | home = Path.home() 140 | possible_paths = [ 141 | home / ".local" / "bin" / "nixd", 142 | home / ".serena" / "language_servers" / "nixd" / "nixd", 143 | home / ".nix-profile" / "bin" / "nixd", 144 | Path("/usr/local/bin/nixd"), 145 | Path("/run/current-system/sw/bin/nixd"), # NixOS system profile 146 | Path("/opt/homebrew/bin/nixd"), # Homebrew on Apple Silicon 147 | Path("/usr/local/opt/nixd/bin/nixd"), # Homebrew on Intel Mac 148 | ] 149 | 150 | # Add Windows-specific paths 151 | if platform.system() == "Windows": 152 | possible_paths.extend( 153 | [ 154 | home / "AppData" / "Local" / "nixd" / "nixd.exe", 155 | home / ".serena" / "language_servers" / "nixd" / "nixd.exe", 156 | ] 157 | ) 158 | 159 | for path in possible_paths: 160 | if path.exists(): 161 | return str(path) 162 | 163 | return None 164 | 165 | @staticmethod 166 | def _install_nixd_with_nix(): 167 | """Install nixd using nix if available.""" 168 | # Check if nix is available 169 | if not shutil.which("nix"): 170 | return None 171 | 172 | print("Installing nixd using nix... This may take a few minutes.") 173 | try: 174 | # Try to install nixd using nix profile 175 | result = subprocess.run( 176 | ["nix", "profile", "install", "github:nix-community/nixd"], 177 | capture_output=True, 178 | text=True, 179 | check=False, 180 | timeout=600, # 10 minute timeout for building 181 | ) 182 | 183 | if result.returncode == 0: 184 | # Check if nixd is now in PATH 185 | nixd_path = shutil.which("nixd") 186 | if nixd_path: 187 | print(f"Successfully installed nixd at: {nixd_path}") 188 | return nixd_path 189 | else: 190 | # Try nix-env as fallback 191 | result = subprocess.run( 192 | ["nix-env", "-iA", "nixpkgs.nixd"], 193 | capture_output=True, 194 | text=True, 195 | check=False, 196 | timeout=600, 197 | ) 198 | if result.returncode == 0: 199 | nixd_path = shutil.which("nixd") 200 | if nixd_path: 201 | print(f"Successfully installed nixd at: {nixd_path}") 202 | return nixd_path 203 | print(f"Failed to install nixd: {result.stderr}") 204 | 205 | except subprocess.TimeoutExpired: 206 | print("Nix install timed out after 10 minutes") 207 | except Exception as e: 208 | print(f"Error installing nixd with nix: {e}") 209 | 210 | return None 211 | 212 | @staticmethod 213 | def _setup_runtime_dependency(): 214 | """ 215 | Check if required Nix runtime dependencies are available. 216 | Attempts to install nixd if not present. 217 | """ 218 | # First check if Nix is available (nixd needs it at runtime) 219 | if not shutil.which("nix"): 220 | print("WARNING: Nix is not installed. nixd requires Nix to function properly.") 221 | raise RuntimeError("Nix is required for nixd. Please install Nix from https://nixos.org/download.html") 222 | 223 | nixd_path = NixLanguageServer._get_nixd_path() 224 | 225 | if not nixd_path: 226 | print("nixd not found. Attempting to install...") 227 | 228 | # Try to install with nix if available 229 | nixd_path = NixLanguageServer._install_nixd_with_nix() 230 | 231 | if not nixd_path: 232 | raise RuntimeError( 233 | "nixd (Nix Language Server) is not installed.\n" 234 | "Please install nixd using one of the following methods:\n" 235 | " - Using Nix flakes: nix profile install github:nix-community/nixd\n" 236 | " - From nixpkgs: nix-env -iA nixpkgs.nixd\n" 237 | " - On macOS with Homebrew: brew install nixd\n\n" 238 | "After installation, make sure 'nixd' is in your PATH." 239 | ) 240 | 241 | # Verify nixd works 242 | try: 243 | result = subprocess.run([nixd_path, "--version"], capture_output=True, text=True, check=False, timeout=5) 244 | if result.returncode != 0: 245 | raise RuntimeError(f"nixd failed to run: {result.stderr}") 246 | except Exception as e: 247 | raise RuntimeError(f"Failed to verify nixd installation: {e}") 248 | 249 | return nixd_path 250 | 251 | def __init__( 252 | self, config: LanguageServerConfig, logger: LanguageServerLogger, repository_root_path: str, solidlsp_settings: SolidLSPSettings 253 | ): 254 | nixd_path = self._setup_runtime_dependency() 255 | 256 | super().__init__( 257 | config, 258 | logger, 259 | repository_root_path, 260 | ProcessLaunchInfo(cmd=nixd_path, cwd=repository_root_path), 261 | "nix", 262 | solidlsp_settings, 263 | ) 264 | self.server_ready = threading.Event() 265 | self.request_id = 0 266 | 267 | @staticmethod 268 | def _get_initialize_params(repository_absolute_path: str) -> InitializeParams: 269 | """ 270 | Returns the initialize params for nixd. 271 | """ 272 | root_uri = pathlib.Path(repository_absolute_path).as_uri() 273 | initialize_params = { 274 | "locale": "en", 275 | "capabilities": { 276 | "textDocument": { 277 | "synchronization": {"didSave": True, "dynamicRegistration": True}, 278 | "definition": {"dynamicRegistration": True}, 279 | "references": {"dynamicRegistration": True}, 280 | "documentSymbol": { 281 | "dynamicRegistration": True, 282 | "hierarchicalDocumentSymbolSupport": True, 283 | "symbolKind": {"valueSet": list(range(1, 27))}, 284 | }, 285 | "completion": { 286 | "dynamicRegistration": True, 287 | "completionItem": { 288 | "snippetSupport": True, 289 | "commitCharactersSupport": True, 290 | "documentationFormat": ["markdown", "plaintext"], 291 | "deprecatedSupport": True, 292 | "preselectSupport": True, 293 | }, 294 | }, 295 | "hover": { 296 | "dynamicRegistration": True, 297 | "contentFormat": ["markdown", "plaintext"], 298 | }, 299 | "signatureHelp": { 300 | "dynamicRegistration": True, 301 | "signatureInformation": { 302 | "documentationFormat": ["markdown", "plaintext"], 303 | "parameterInformation": {"labelOffsetSupport": True}, 304 | }, 305 | }, 306 | "codeAction": { 307 | "dynamicRegistration": True, 308 | "codeActionLiteralSupport": { 309 | "codeActionKind": { 310 | "valueSet": [ 311 | "", 312 | "quickfix", 313 | "refactor", 314 | "refactor.extract", 315 | "refactor.inline", 316 | "refactor.rewrite", 317 | "source", 318 | "source.organizeImports", 319 | ] 320 | } 321 | }, 322 | }, 323 | "rename": {"dynamicRegistration": True, "prepareSupport": True}, 324 | }, 325 | "workspace": { 326 | "workspaceFolders": True, 327 | "didChangeConfiguration": {"dynamicRegistration": True}, 328 | "configuration": True, 329 | "symbol": { 330 | "dynamicRegistration": True, 331 | "symbolKind": {"valueSet": list(range(1, 27))}, 332 | }, 333 | }, 334 | }, 335 | "processId": os.getpid(), 336 | "rootPath": repository_absolute_path, 337 | "rootUri": root_uri, 338 | "workspaceFolders": [ 339 | { 340 | "uri": root_uri, 341 | "name": os.path.basename(repository_absolute_path), 342 | } 343 | ], 344 | "initializationOptions": { 345 | # nixd specific options 346 | "nixpkgs": {"expr": "import <nixpkgs> { }"}, 347 | "formatting": {"command": ["nixpkgs-fmt"]}, # or ["alejandra"] or ["nixfmt"] 348 | "options": { 349 | "enable": True, 350 | "target": { 351 | "installable": "", # Will be auto-detected from flake.nix if present 352 | }, 353 | }, 354 | }, 355 | } 356 | return initialize_params 357 | 358 | def _start_server(self): 359 | """Start nixd server process""" 360 | 361 | def register_capability_handler(params): 362 | return 363 | 364 | def window_log_message(msg): 365 | self.logger.log(f"LSP: window/logMessage: {msg}", logging.INFO) 366 | 367 | def do_nothing(params): 368 | return 369 | 370 | self.server.on_request("client/registerCapability", register_capability_handler) 371 | self.server.on_notification("window/logMessage", window_log_message) 372 | self.server.on_notification("$/progress", do_nothing) 373 | self.server.on_notification("textDocument/publishDiagnostics", do_nothing) 374 | 375 | self.logger.log("Starting nixd server process", logging.INFO) 376 | self.server.start() 377 | initialize_params = self._get_initialize_params(self.repository_root_path) 378 | 379 | self.logger.log( 380 | "Sending initialize request from LSP client to LSP server and awaiting response", 381 | logging.INFO, 382 | ) 383 | init_response = self.server.send.initialize(initialize_params) 384 | 385 | # Verify server capabilities 386 | assert "textDocumentSync" in init_response["capabilities"] 387 | assert "definitionProvider" in init_response["capabilities"] 388 | assert "documentSymbolProvider" in init_response["capabilities"] 389 | assert "referencesProvider" in init_response["capabilities"] 390 | 391 | self.server.notify.initialized({}) 392 | self.completions_available.set() 393 | 394 | # nixd server is typically ready immediately after initialization 395 | self.server_ready.set() 396 | self.server_ready.wait() 397 | ``` -------------------------------------------------------------------------------- /src/serena/mcp.py: -------------------------------------------------------------------------------- ```python 1 | """ 2 | The Serena Model Context Protocol (MCP) Server 3 | """ 4 | 5 | import sys 6 | from abc import abstractmethod 7 | from collections.abc import AsyncIterator, Iterator, Sequence 8 | from contextlib import asynccontextmanager 9 | from copy import deepcopy 10 | from dataclasses import dataclass 11 | from typing import Any, Literal, cast 12 | 13 | import docstring_parser 14 | from mcp.server.fastmcp import server 15 | from mcp.server.fastmcp.server import FastMCP, Settings 16 | from mcp.server.fastmcp.tools.base import Tool as MCPTool 17 | from pydantic_settings import SettingsConfigDict 18 | from sensai.util import logging 19 | 20 | from serena.agent import ( 21 | SerenaAgent, 22 | SerenaConfig, 23 | ) 24 | from serena.config.context_mode import SerenaAgentContext, SerenaAgentMode 25 | from serena.constants import DEFAULT_CONTEXT, DEFAULT_MODES, SERENA_LOG_FORMAT 26 | from serena.tools import Tool 27 | from serena.util.exception import show_fatal_exception_safe 28 | from serena.util.logging import MemoryLogHandler 29 | 30 | log = logging.getLogger(__name__) 31 | 32 | 33 | def configure_logging(*args, **kwargs) -> None: # type: ignore 34 | # We only do something here if logging has not yet been configured. 35 | # Normally, logging is configured in the MCP server startup script. 36 | if not logging.is_enabled(): 37 | logging.basicConfig(level=logging.INFO, stream=sys.stderr, format=SERENA_LOG_FORMAT) 38 | 39 | 40 | # patch the logging configuration function in fastmcp, because it's hard-coded and broken 41 | server.configure_logging = configure_logging # type: ignore 42 | 43 | 44 | @dataclass 45 | class SerenaMCPRequestContext: 46 | agent: SerenaAgent 47 | 48 | 49 | class SerenaMCPFactory: 50 | def __init__(self, context: str = DEFAULT_CONTEXT, project: str | None = None): 51 | """ 52 | :param context: The context name or path to context file 53 | :param project: Either an absolute path to the project directory or a name of an already registered project. 54 | If the project passed here hasn't been registered yet, it will be registered automatically and can be activated by its name 55 | afterward. 56 | """ 57 | self.context = SerenaAgentContext.load(context) 58 | self.project = project 59 | 60 | @staticmethod 61 | def _sanitize_for_openai_tools(schema: dict) -> dict: 62 | """ 63 | This method was written by GPT-5, I have not reviewed it in detail. 64 | Only called when `openai_tool_compatible` is True. 65 | 66 | Make a Pydantic/JSON Schema object compatible with OpenAI tool schema. 67 | - 'integer' -> 'number' (+ multipleOf: 1) 68 | - remove 'null' from union type arrays 69 | - coerce integer-only enums to number 70 | - best-effort simplify oneOf/anyOf when they only differ by integer/number 71 | """ 72 | s = deepcopy(schema) 73 | 74 | def walk(node): # type: ignore 75 | if not isinstance(node, dict): 76 | # lists get handled by parent calls 77 | return node 78 | 79 | # ---- handle type ---- 80 | t = node.get("type") 81 | if isinstance(t, str): 82 | if t == "integer": 83 | node["type"] = "number" 84 | # preserve existing multipleOf but ensure it's integer-like 85 | if "multipleOf" not in node: 86 | node["multipleOf"] = 1 87 | elif isinstance(t, list): 88 | # remove 'null' (OpenAI tools don't support nullables) 89 | t2 = [x if x != "integer" else "number" for x in t if x != "null"] 90 | if not t2: 91 | # fall back to object if it somehow becomes empty 92 | t2 = ["object"] 93 | node["type"] = t2[0] if len(t2) == 1 else t2 94 | if "integer" in t or "number" in t2: 95 | # if integers were present, keep integer-like restriction 96 | node.setdefault("multipleOf", 1) 97 | 98 | # ---- enums of integers -> number ---- 99 | if "enum" in node and isinstance(node["enum"], list): 100 | vals = node["enum"] 101 | if vals and all(isinstance(v, int) for v in vals): 102 | node.setdefault("type", "number") 103 | # keep them as ints; JSON 'number' covers ints 104 | node.setdefault("multipleOf", 1) 105 | 106 | # ---- simplify anyOf/oneOf if they only differ by integer/number ---- 107 | for key in ("oneOf", "anyOf"): 108 | if key in node and isinstance(node[key], list): 109 | # Special case: anyOf or oneOf with "type X" and "null" 110 | if len(node[key]) == 2: 111 | types = [sub.get("type") for sub in node[key]] 112 | if "null" in types: 113 | non_null_type = next(t for t in types if t != "null") 114 | if isinstance(non_null_type, str): 115 | node["type"] = non_null_type 116 | node.pop(key, None) 117 | continue 118 | simplified = [] 119 | changed = False 120 | for sub in node[key]: 121 | sub = walk(sub) # recurse 122 | simplified.append(sub) 123 | # If all subs are the same after integer→number, collapse 124 | try: 125 | import json 126 | 127 | canon = [json.dumps(x, sort_keys=True) for x in simplified] 128 | if len(set(canon)) == 1: 129 | # copy the single schema up 130 | only = simplified[0] 131 | node.pop(key, None) 132 | for k, v in only.items(): 133 | if k not in node: 134 | node[k] = v 135 | changed = True 136 | except Exception: 137 | pass 138 | if not changed: 139 | node[key] = simplified 140 | 141 | # ---- recurse into known schema containers ---- 142 | for child_key in ("properties", "patternProperties", "definitions", "$defs"): 143 | if child_key in node and isinstance(node[child_key], dict): 144 | for k, v in list(node[child_key].items()): 145 | node[child_key][k] = walk(v) 146 | 147 | # arrays/items 148 | if "items" in node: 149 | node["items"] = walk(node["items"]) 150 | 151 | # allOf/if/then/else - pass through with integer→number conversions applied inside 152 | for key in ("allOf",): 153 | if key in node and isinstance(node[key], list): 154 | node[key] = [walk(x) for x in node[key]] 155 | 156 | if "if" in node: 157 | node["if"] = walk(node["if"]) 158 | if "then" in node: 159 | node["then"] = walk(node["then"]) 160 | if "else" in node: 161 | node["else"] = walk(node["else"]) 162 | 163 | return node 164 | 165 | return walk(s) 166 | 167 | @staticmethod 168 | def make_mcp_tool(tool: Tool, openai_tool_compatible: bool = True) -> MCPTool: 169 | """ 170 | Create an MCP tool from a Serena Tool instance. 171 | 172 | :param tool: The Serena Tool instance to convert. 173 | :param openai_tool_compatible: whether to process the tool schema to be compatible with OpenAI tools 174 | (doesn't accept integer, needs number instead, etc.). This allows using Serena MCP within codex. 175 | """ 176 | func_name = tool.get_name() 177 | func_doc = tool.get_apply_docstring() or "" 178 | func_arg_metadata = tool.get_apply_fn_metadata() 179 | is_async = False 180 | parameters = func_arg_metadata.arg_model.model_json_schema() 181 | if openai_tool_compatible: 182 | parameters = SerenaMCPFactory._sanitize_for_openai_tools(parameters) 183 | 184 | docstring = docstring_parser.parse(func_doc) 185 | 186 | # Mount the tool description as a combination of the docstring description and 187 | # the return value description, if it exists. 188 | overridden_description = tool.agent.get_context().tool_description_overrides.get(func_name, None) 189 | 190 | if overridden_description is not None: 191 | func_doc = overridden_description 192 | elif docstring.description: 193 | func_doc = docstring.description 194 | else: 195 | func_doc = "" 196 | func_doc = func_doc.strip().strip(".") 197 | if func_doc: 198 | func_doc += "." 199 | if docstring.returns and (docstring_returns_descr := docstring.returns.description): 200 | # Only add a space before "Returns" if func_doc is not empty 201 | prefix = " " if func_doc else "" 202 | func_doc = f"{func_doc}{prefix}Returns {docstring_returns_descr.strip().strip('.')}." 203 | 204 | # Parse the parameter descriptions from the docstring and add pass its description 205 | # to the parameter schema. 206 | docstring_params = {param.arg_name: param for param in docstring.params} 207 | parameters_properties: dict[str, dict[str, Any]] = parameters["properties"] 208 | for parameter, properties in parameters_properties.items(): 209 | if (param_doc := docstring_params.get(parameter)) and param_doc.description: 210 | param_desc = f"{param_doc.description.strip().strip('.') + '.'}" 211 | properties["description"] = param_desc[0].upper() + param_desc[1:] 212 | 213 | def execute_fn(**kwargs) -> str: # type: ignore 214 | return tool.apply_ex(log_call=True, catch_exceptions=True, **kwargs) 215 | 216 | return MCPTool( 217 | fn=execute_fn, 218 | name=func_name, 219 | description=func_doc, 220 | parameters=parameters, 221 | fn_metadata=func_arg_metadata, 222 | is_async=is_async, 223 | context_kwarg=None, 224 | annotations=None, 225 | title=None, 226 | ) 227 | 228 | @abstractmethod 229 | def _iter_tools(self) -> Iterator[Tool]: 230 | pass 231 | 232 | # noinspection PyProtectedMember 233 | def _set_mcp_tools(self, mcp: FastMCP, openai_tool_compatible: bool = False) -> None: 234 | """Update the tools in the MCP server""" 235 | if mcp is not None: 236 | mcp._tool_manager._tools = {} 237 | for tool in self._iter_tools(): 238 | mcp_tool = self.make_mcp_tool(tool, openai_tool_compatible=openai_tool_compatible) 239 | mcp._tool_manager._tools[tool.get_name()] = mcp_tool 240 | log.info(f"Starting MCP server with {len(mcp._tool_manager._tools)} tools: {list(mcp._tool_manager._tools.keys())}") 241 | 242 | @abstractmethod 243 | def _instantiate_agent(self, serena_config: SerenaConfig, modes: list[SerenaAgentMode]) -> None: 244 | pass 245 | 246 | def create_mcp_server( 247 | self, 248 | host: str = "0.0.0.0", 249 | port: int = 8000, 250 | modes: Sequence[str] = DEFAULT_MODES, 251 | enable_web_dashboard: bool | None = None, 252 | enable_gui_log_window: bool | None = None, 253 | log_level: Literal["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] | None = None, 254 | trace_lsp_communication: bool | None = None, 255 | tool_timeout: float | None = None, 256 | ) -> FastMCP: 257 | """ 258 | Create an MCP server with process-isolated SerenaAgent to prevent asyncio contamination. 259 | 260 | :param host: The host to bind to 261 | :param port: The port to bind to 262 | :param modes: List of mode names or paths to mode files 263 | :param enable_web_dashboard: Whether to enable the web dashboard. If not specified, will take the value from the serena configuration. 264 | :param enable_gui_log_window: Whether to enable the GUI log window. It currently does not work on macOS, and setting this to True will be ignored then. 265 | If not specified, will take the value from the serena configuration. 266 | :param log_level: Log level. If not specified, will take the value from the serena configuration. 267 | :param trace_lsp_communication: Whether to trace the communication between Serena and the language servers. 268 | This is useful for debugging language server issues. 269 | :param tool_timeout: Timeout in seconds for tool execution. If not specified, will take the value from the serena configuration. 270 | """ 271 | try: 272 | config = SerenaConfig.from_config_file() 273 | 274 | # update configuration with the provided parameters 275 | if enable_web_dashboard is not None: 276 | config.web_dashboard = enable_web_dashboard 277 | if enable_gui_log_window is not None: 278 | config.gui_log_window_enabled = enable_gui_log_window 279 | if log_level is not None: 280 | log_level = cast(Literal["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"], log_level.upper()) 281 | config.log_level = logging.getLevelNamesMapping()[log_level] 282 | if trace_lsp_communication is not None: 283 | config.trace_lsp_communication = trace_lsp_communication 284 | if tool_timeout is not None: 285 | config.tool_timeout = tool_timeout 286 | 287 | modes_instances = [SerenaAgentMode.load(mode) for mode in modes] 288 | self._instantiate_agent(config, modes_instances) 289 | 290 | except Exception as e: 291 | show_fatal_exception_safe(e) 292 | raise 293 | 294 | # Override model_config to disable the use of `.env` files for reading settings, because user projects are likely to contain 295 | # `.env` files (e.g. containing LOG_LEVEL) that are not supposed to override the MCP settings; 296 | # retain only FASTMCP_ prefix for already set environment variables. 297 | Settings.model_config = SettingsConfigDict(env_prefix="FASTMCP_") 298 | instructions = self._get_initial_instructions() 299 | mcp = FastMCP(lifespan=self.server_lifespan, host=host, port=port, instructions=instructions) 300 | return mcp 301 | 302 | @asynccontextmanager 303 | @abstractmethod 304 | async def server_lifespan(self, mcp_server: FastMCP) -> AsyncIterator[None]: 305 | """Manage server startup and shutdown lifecycle.""" 306 | yield None # ensures MyPy understands we yield None 307 | 308 | @abstractmethod 309 | def _get_initial_instructions(self) -> str: 310 | pass 311 | 312 | 313 | class SerenaMCPFactorySingleProcess(SerenaMCPFactory): 314 | """ 315 | MCP server factory where the SerenaAgent and its language server run in the same process as the MCP server 316 | """ 317 | 318 | def __init__(self, context: str = DEFAULT_CONTEXT, project: str | None = None, memory_log_handler: MemoryLogHandler | None = None): 319 | """ 320 | :param context: The context name or path to context file 321 | :param project: Either an absolute path to the project directory or a name of an already registered project. 322 | If the project passed here hasn't been registered yet, it will be registered automatically and can be activated by its name 323 | afterward. 324 | """ 325 | super().__init__(context=context, project=project) 326 | self.agent: SerenaAgent | None = None 327 | self.memory_log_handler = memory_log_handler 328 | 329 | def _instantiate_agent(self, serena_config: SerenaConfig, modes: list[SerenaAgentMode]) -> None: 330 | self.agent = SerenaAgent( 331 | project=self.project, serena_config=serena_config, context=self.context, modes=modes, memory_log_handler=self.memory_log_handler 332 | ) 333 | 334 | def _iter_tools(self) -> Iterator[Tool]: 335 | assert self.agent is not None 336 | yield from self.agent.get_exposed_tool_instances() 337 | 338 | def _get_initial_instructions(self) -> str: 339 | assert self.agent is not None 340 | # we don't use the tool (which at the time of writing calls this method), since the tool may be disabled by the config 341 | return self.agent.create_system_prompt() 342 | 343 | @asynccontextmanager 344 | async def server_lifespan(self, mcp_server: FastMCP) -> AsyncIterator[None]: 345 | openai_tool_compatible = self.context.name in ["chatgpt", "codex", "oaicompat-agent"] 346 | self._set_mcp_tools(mcp_server, openai_tool_compatible=openai_tool_compatible) 347 | log.info("MCP server lifetime setup complete") 348 | yield 349 | ``` -------------------------------------------------------------------------------- /src/serena/text_utils.py: -------------------------------------------------------------------------------- ```python 1 | import fnmatch 2 | import logging 3 | import os 4 | import re 5 | from collections.abc import Callable 6 | from dataclasses import dataclass, field 7 | from enum import StrEnum 8 | from typing import Any, Self 9 | 10 | from joblib import Parallel, delayed 11 | 12 | log = logging.getLogger(__name__) 13 | 14 | 15 | class LineType(StrEnum): 16 | """Enum for different types of lines in search results.""" 17 | 18 | MATCH = "match" 19 | """Part of the matched lines""" 20 | BEFORE_MATCH = "prefix" 21 | """Lines before the match""" 22 | AFTER_MATCH = "postfix" 23 | """Lines after the match""" 24 | 25 | 26 | @dataclass(kw_only=True) 27 | class TextLine: 28 | """Represents a line of text with information on how it relates to the match.""" 29 | 30 | line_number: int 31 | line_content: str 32 | match_type: LineType 33 | """Represents the type of line (match, prefix, postfix)""" 34 | 35 | def get_display_prefix(self) -> str: 36 | """Get the display prefix for this line based on the match type.""" 37 | if self.match_type == LineType.MATCH: 38 | return " >" 39 | return "..." 40 | 41 | def format_line(self, include_line_numbers: bool = True) -> str: 42 | """Format the line for display (e.g.,for logging or passing to an LLM). 43 | 44 | :param include_line_numbers: Whether to include the line number in the result. 45 | """ 46 | prefix = self.get_display_prefix() 47 | if include_line_numbers: 48 | line_num = str(self.line_number).rjust(4) 49 | prefix = f"{prefix}{line_num}" 50 | return f"{prefix}:{self.line_content}" 51 | 52 | 53 | @dataclass(kw_only=True) 54 | class MatchedConsecutiveLines: 55 | """Represents a collection of consecutive lines found through some criterion in a text file or a string. 56 | May include lines before, after, and matched. 57 | """ 58 | 59 | lines: list[TextLine] 60 | """All lines in the context of the match. At least one of them is of `match_type` `MATCH`.""" 61 | source_file_path: str | None = None 62 | """Path to the file where the match was found (Metadata).""" 63 | 64 | # set in post-init 65 | lines_before_matched: list[TextLine] = field(default_factory=list) 66 | matched_lines: list[TextLine] = field(default_factory=list) 67 | lines_after_matched: list[TextLine] = field(default_factory=list) 68 | 69 | def __post_init__(self) -> None: 70 | for line in self.lines: 71 | if line.match_type == LineType.BEFORE_MATCH: 72 | self.lines_before_matched.append(line) 73 | elif line.match_type == LineType.MATCH: 74 | self.matched_lines.append(line) 75 | elif line.match_type == LineType.AFTER_MATCH: 76 | self.lines_after_matched.append(line) 77 | 78 | assert len(self.matched_lines) > 0, "At least one matched line is required" 79 | 80 | @property 81 | def start_line(self) -> int: 82 | return self.lines[0].line_number 83 | 84 | @property 85 | def end_line(self) -> int: 86 | return self.lines[-1].line_number 87 | 88 | @property 89 | def num_matched_lines(self) -> int: 90 | return len(self.matched_lines) 91 | 92 | def to_display_string(self, include_line_numbers: bool = True) -> str: 93 | return "\n".join([line.format_line(include_line_numbers) for line in self.lines]) 94 | 95 | @classmethod 96 | def from_file_contents( 97 | cls, file_contents: str, line: int, context_lines_before: int = 0, context_lines_after: int = 0, source_file_path: str | None = None 98 | ) -> Self: 99 | line_contents = file_contents.split("\n") 100 | start_lineno = max(0, line - context_lines_before) 101 | end_lineno = min(len(line_contents) - 1, line + context_lines_after) 102 | text_lines: list[TextLine] = [] 103 | # before the line 104 | for lineno in range(start_lineno, line): 105 | text_lines.append(TextLine(line_number=lineno, line_content=line_contents[lineno], match_type=LineType.BEFORE_MATCH)) 106 | # the line 107 | text_lines.append(TextLine(line_number=line, line_content=line_contents[line], match_type=LineType.MATCH)) 108 | # after the line 109 | for lineno in range(line + 1, end_lineno + 1): 110 | text_lines.append(TextLine(line_number=lineno, line_content=line_contents[lineno], match_type=LineType.AFTER_MATCH)) 111 | 112 | return cls(lines=text_lines, source_file_path=source_file_path) 113 | 114 | 115 | def glob_to_regex(glob_pat: str) -> str: 116 | regex_parts: list[str] = [] 117 | i = 0 118 | while i < len(glob_pat): 119 | ch = glob_pat[i] 120 | if ch == "*": 121 | regex_parts.append(".*") 122 | elif ch == "?": 123 | regex_parts.append(".") 124 | elif ch == "\\": 125 | i += 1 126 | if i < len(glob_pat): 127 | regex_parts.append(re.escape(glob_pat[i])) 128 | else: 129 | regex_parts.append("\\") 130 | else: 131 | regex_parts.append(re.escape(ch)) 132 | i += 1 133 | return "".join(regex_parts) 134 | 135 | 136 | def search_text( 137 | pattern: str, 138 | content: str | None = None, 139 | source_file_path: str | None = None, 140 | allow_multiline_match: bool = False, 141 | context_lines_before: int = 0, 142 | context_lines_after: int = 0, 143 | is_glob: bool = False, 144 | ) -> list[MatchedConsecutiveLines]: 145 | """ 146 | Search for a pattern in text content. Supports both regex and glob-like patterns. 147 | 148 | :param pattern: Pattern to search for (regex or glob-like pattern) 149 | :param content: The text content to search. May be None if source_file_path is provided. 150 | :param source_file_path: Optional path to the source file. If content is None, 151 | this has to be passed and the file will be read. 152 | :param allow_multiline_match: Whether to search across multiple lines. Currently, the default 153 | option (False) is very inefficient, so it is recommended to set this to True. 154 | :param context_lines_before: Number of context lines to include before matches 155 | :param context_lines_after: Number of context lines to include after matches 156 | :param is_glob: If True, pattern is treated as a glob-like pattern (e.g., "*.py", "test_??.py") 157 | and will be converted to regex internally 158 | 159 | :return: List of `TextSearchMatch` objects 160 | 161 | :raises: ValueError if the pattern is not valid 162 | 163 | """ 164 | if source_file_path and content is None: 165 | with open(source_file_path) as f: 166 | content = f.read() 167 | 168 | if content is None: 169 | raise ValueError("Pass either content or source_file_path") 170 | 171 | matches = [] 172 | lines = content.splitlines() 173 | total_lines = len(lines) 174 | 175 | # Convert pattern to a compiled regex if it's a string 176 | if is_glob: 177 | pattern = glob_to_regex(pattern) 178 | if allow_multiline_match: 179 | # For multiline matches, we need to use the DOTALL flag to make '.' match newlines 180 | compiled_pattern = re.compile(pattern, re.DOTALL) 181 | # Search across the entire content as a single string 182 | for match in compiled_pattern.finditer(content): 183 | start_pos = match.start() 184 | end_pos = match.end() 185 | 186 | # Find the line numbers for the start and end positions 187 | start_line_num = content[:start_pos].count("\n") + 1 188 | end_line_num = content[:end_pos].count("\n") + 1 189 | 190 | # Calculate the range of lines to include in the context 191 | context_start = max(1, start_line_num - context_lines_before) 192 | context_end = min(total_lines, end_line_num + context_lines_after) 193 | 194 | # Create TextLine objects for the context 195 | context_lines = [] 196 | for i in range(context_start - 1, context_end): 197 | line_num = i + 1 198 | if context_start <= line_num < start_line_num: 199 | match_type = LineType.BEFORE_MATCH 200 | elif end_line_num < line_num <= context_end: 201 | match_type = LineType.AFTER_MATCH 202 | else: 203 | match_type = LineType.MATCH 204 | 205 | context_lines.append(TextLine(line_number=line_num, line_content=lines[i], match_type=match_type)) 206 | 207 | matches.append(MatchedConsecutiveLines(lines=context_lines, source_file_path=source_file_path)) 208 | else: 209 | # TODO: extremely inefficient! Since we currently don't use this option in SerenaAgent or LanguageServer, 210 | # it is not urgent to fix, but should be either improved or the option should be removed. 211 | # Search line by line, normal compile without DOTALL 212 | compiled_pattern = re.compile(pattern) 213 | for i, line in enumerate(lines): 214 | line_num = i + 1 215 | if compiled_pattern.search(line): 216 | # Calculate the range of lines to include in the context 217 | context_start = max(0, i - context_lines_before) 218 | context_end = min(total_lines - 1, i + context_lines_after) 219 | 220 | # Create TextLine objects for the context 221 | context_lines = [] 222 | for j in range(context_start, context_end + 1): 223 | context_line_num = j + 1 224 | if j < i: 225 | match_type = LineType.BEFORE_MATCH 226 | elif j > i: 227 | match_type = LineType.AFTER_MATCH 228 | else: 229 | match_type = LineType.MATCH 230 | 231 | context_lines.append(TextLine(line_number=context_line_num, line_content=lines[j], match_type=match_type)) 232 | 233 | matches.append(MatchedConsecutiveLines(lines=context_lines, source_file_path=source_file_path)) 234 | 235 | return matches 236 | 237 | 238 | def default_file_reader(file_path: str) -> str: 239 | """Reads using utf-8 encoding.""" 240 | with open(file_path, encoding="utf-8") as f: 241 | return f.read() 242 | 243 | 244 | def expand_braces(pattern: str) -> list[str]: 245 | """ 246 | Expands brace patterns in a glob string. 247 | For example, "**/*.{js,jsx,ts,tsx}" becomes ["**/*.js", "**/*.jsx", "**/*.ts", "**/*.tsx"]. 248 | Handles multiple brace sets as well. 249 | """ 250 | patterns = [pattern] 251 | while any("{" in p for p in patterns): 252 | new_patterns = [] 253 | for p in patterns: 254 | match = re.search(r"\{([^{}]+)\}", p) 255 | if match: 256 | prefix = p[: match.start()] 257 | suffix = p[match.end() :] 258 | options = match.group(1).split(",") 259 | for option in options: 260 | new_patterns.append(f"{prefix}{option}{suffix}") 261 | else: 262 | new_patterns.append(p) 263 | patterns = new_patterns 264 | return patterns 265 | 266 | 267 | def glob_match(pattern: str, path: str) -> bool: 268 | """ 269 | Match a file path against a glob pattern. 270 | 271 | Supports standard glob patterns: 272 | - * matches any number of characters except / 273 | - ** matches any number of directories (zero or more) 274 | - ? matches a single character except / 275 | - [seq] matches any character in seq 276 | 277 | Supports brace expansion: 278 | - {a,b,c} expands to multiple patterns (including nesting) 279 | 280 | Unsupported patterns: 281 | - Bash extended glob features are unavailable in Python's fnmatch 282 | - Extended globs like !(), ?(), +(), *(), @() are not supported 283 | 284 | :param pattern: Glob pattern (e.g., 'src/**/*.py', '**agent.py') 285 | :param path: File path to match against 286 | :return: True if path matches pattern 287 | """ 288 | pattern = pattern.replace("\\", "/") # Normalize backslashes to forward slashes 289 | path = path.replace("\\", "/") # Normalize path backslashes to forward slashes 290 | 291 | # Handle ** patterns that should match zero or more directories 292 | if "**" in pattern: 293 | # Method 1: Standard fnmatch (matches one or more directories) 294 | regex1 = fnmatch.translate(pattern) 295 | if re.match(regex1, path): 296 | return True 297 | 298 | # Method 2: Handle zero-directory case by removing /** entirely 299 | # Convert "src/**/test.py" to "src/test.py" 300 | if "/**/" in pattern: 301 | zero_dir_pattern = pattern.replace("/**/", "/") 302 | regex2 = fnmatch.translate(zero_dir_pattern) 303 | if re.match(regex2, path): 304 | return True 305 | 306 | # Method 3: Handle leading ** case by removing **/ 307 | # Convert "**/test.py" to "test.py" 308 | if pattern.startswith("**/"): 309 | zero_dir_pattern = pattern[3:] # Remove "**/" 310 | regex3 = fnmatch.translate(zero_dir_pattern) 311 | if re.match(regex3, path): 312 | return True 313 | 314 | return False 315 | else: 316 | # Simple pattern without **, use fnmatch directly 317 | return fnmatch.fnmatch(path, pattern) 318 | 319 | 320 | def search_files( 321 | relative_file_paths: list[str], 322 | pattern: str, 323 | root_path: str = "", 324 | file_reader: Callable[[str], str] = default_file_reader, 325 | context_lines_before: int = 0, 326 | context_lines_after: int = 0, 327 | paths_include_glob: str | None = None, 328 | paths_exclude_glob: str | None = None, 329 | ) -> list[MatchedConsecutiveLines]: 330 | """ 331 | Search for a pattern in a list of files. 332 | 333 | :param relative_file_paths: List of relative file paths in which to search 334 | :param pattern: Pattern to search for 335 | :param root_path: Root path to resolve relative paths against (by default, current working directory). 336 | :param file_reader: Function to read a file, by default will just use os.open. 337 | All files that can't be read by it will be skipped. 338 | :param context_lines_before: Number of context lines to include before matches 339 | :param context_lines_after: Number of context lines to include after matches 340 | :param paths_include_glob: Optional glob pattern to include files from the list 341 | :param paths_exclude_glob: Optional glob pattern to exclude files from the list 342 | :return: List of MatchedConsecutiveLines objects 343 | """ 344 | # Pre-filter paths (done sequentially to avoid overhead) 345 | # Use proper glob matching instead of gitignore patterns 346 | include_patterns = expand_braces(paths_include_glob) if paths_include_glob else None 347 | exclude_patterns = expand_braces(paths_exclude_glob) if paths_exclude_glob else None 348 | 349 | filtered_paths = [] 350 | for path in relative_file_paths: 351 | if include_patterns: 352 | if not any(glob_match(p, path) for p in include_patterns): 353 | log.debug(f"Skipping {path}: does not match include pattern {paths_include_glob}") 354 | continue 355 | 356 | if exclude_patterns: 357 | if any(glob_match(p, path) for p in exclude_patterns): 358 | log.debug(f"Skipping {path}: matches exclude pattern {paths_exclude_glob}") 359 | continue 360 | 361 | filtered_paths.append(path) 362 | 363 | log.info(f"Processing {len(filtered_paths)} files.") 364 | 365 | def process_single_file(path: str) -> dict[str, Any]: 366 | """Process a single file - this function will be parallelized.""" 367 | try: 368 | abs_path = os.path.join(root_path, path) 369 | file_content = file_reader(abs_path) 370 | search_results = search_text( 371 | pattern, 372 | content=file_content, 373 | source_file_path=path, 374 | allow_multiline_match=True, 375 | context_lines_before=context_lines_before, 376 | context_lines_after=context_lines_after, 377 | ) 378 | if len(search_results) > 0: 379 | log.debug(f"Found {len(search_results)} matches in {path}") 380 | return {"path": path, "results": search_results, "error": None} 381 | except Exception as e: 382 | log.debug(f"Error processing {path}: {e}") 383 | return {"path": path, "results": [], "error": str(e)} 384 | 385 | # Execute in parallel using joblib 386 | results = Parallel( 387 | n_jobs=-1, 388 | backend="threading", 389 | )(delayed(process_single_file)(path) for path in filtered_paths) 390 | 391 | # Collect results and errors 392 | matches = [] 393 | skipped_file_error_tuples = [] 394 | 395 | for result in results: 396 | if result["error"]: 397 | skipped_file_error_tuples.append((result["path"], result["error"])) 398 | else: 399 | matches.extend(result["results"]) 400 | 401 | if skipped_file_error_tuples: 402 | log.debug(f"Failed to read {len(skipped_file_error_tuples)} files: {skipped_file_error_tuples}") 403 | 404 | log.info(f"Found {len(matches)} total matches across {len(filtered_paths)} files") 405 | return matches 406 | ``` -------------------------------------------------------------------------------- /src/solidlsp/language_servers/elixir_tools/elixir_tools.py: -------------------------------------------------------------------------------- ```python 1 | import logging 2 | import os 3 | import pathlib 4 | import stat 5 | import subprocess 6 | import threading 7 | import time 8 | 9 | from overrides import override 10 | 11 | from solidlsp.ls import SolidLanguageServer 12 | from solidlsp.ls_config import LanguageServerConfig 13 | from solidlsp.ls_logger import LanguageServerLogger 14 | from solidlsp.ls_utils import FileUtils, PlatformId, PlatformUtils 15 | from solidlsp.lsp_protocol_handler.lsp_types import InitializeParams 16 | from solidlsp.lsp_protocol_handler.server import ProcessLaunchInfo 17 | from solidlsp.settings import SolidLSPSettings 18 | 19 | from ..common import RuntimeDependency 20 | 21 | 22 | class ElixirTools(SolidLanguageServer): 23 | """ 24 | Provides Elixir specific instantiation of the LanguageServer class using Next LS from elixir-tools. 25 | """ 26 | 27 | @override 28 | def is_ignored_dirname(self, dirname: str) -> bool: 29 | # For Elixir projects, we should ignore: 30 | # - _build: compiled artifacts 31 | # - deps: dependencies 32 | # - node_modules: if the project has JavaScript components 33 | # - .elixir_ls: ElixirLS artifacts (in case both are present) 34 | # - cover: coverage reports 35 | return super().is_ignored_dirname(dirname) or dirname in ["_build", "deps", "node_modules", ".elixir_ls", "cover"] 36 | 37 | def _is_next_ls_internal_file(self, abs_path: str) -> bool: 38 | """Check if an absolute path is a Next LS internal file that should be ignored.""" 39 | return any( 40 | pattern in abs_path 41 | for pattern in [ 42 | ".burrito", # Next LS runtime directory 43 | "next_ls_erts-", # Next LS Erlang runtime 44 | "_next_ls_private_", # Next LS private files 45 | "/priv/monkey/", # Next LS monkey patching directory 46 | ] 47 | ) 48 | 49 | @override 50 | def _send_references_request(self, relative_file_path: str, line: int, column: int): 51 | """Override to filter out Next LS internal files from references.""" 52 | from solidlsp.ls_utils import PathUtils 53 | 54 | # Get the raw response from the parent implementation 55 | raw_response = super()._send_references_request(relative_file_path, line, column) 56 | 57 | if raw_response is None: 58 | return None 59 | 60 | # Filter out Next LS internal files 61 | filtered_response = [] 62 | for item in raw_response: 63 | if isinstance(item, dict) and "uri" in item: 64 | abs_path = PathUtils.uri_to_path(item["uri"]) 65 | if self._is_next_ls_internal_file(abs_path): 66 | self.logger.log(f"Filtering out Next LS internal file: {abs_path}", logging.DEBUG) 67 | continue 68 | filtered_response.append(item) 69 | 70 | return filtered_response 71 | 72 | @classmethod 73 | def _get_elixir_version(cls): 74 | """Get the installed Elixir version or None if not found.""" 75 | try: 76 | result = subprocess.run(["elixir", "--version"], capture_output=True, text=True, check=False) 77 | if result.returncode == 0: 78 | return result.stdout.strip() 79 | except FileNotFoundError: 80 | return None 81 | return None 82 | 83 | @classmethod 84 | def _setup_runtime_dependencies( 85 | cls, logger: LanguageServerLogger, config: LanguageServerConfig, solidlsp_settings: SolidLSPSettings 86 | ) -> str: 87 | """ 88 | Setup runtime dependencies for Next LS. 89 | Downloads the Next LS binary for the current platform and returns the path to the executable. 90 | """ 91 | # Check if Elixir is available first 92 | elixir_version = cls._get_elixir_version() 93 | if not elixir_version: 94 | raise RuntimeError( 95 | "Elixir is not installed. Please install Elixir from https://elixir-lang.org/install.html and make sure it is added to your PATH." 96 | ) 97 | 98 | logger.log(f"Found Elixir: {elixir_version}", logging.INFO) 99 | 100 | platform_id = PlatformUtils.get_platform_id() 101 | 102 | # Check for Windows and provide a helpful error message 103 | if platform_id.value.startswith("win"): 104 | raise RuntimeError( 105 | "Windows is not supported by Next LS. The Next LS project does not provide Windows binaries. " 106 | "Consider using Windows Subsystem for Linux (WSL) or a virtual machine with Linux/macOS." 107 | ) 108 | 109 | valid_platforms = [ 110 | PlatformId.LINUX_x64, 111 | PlatformId.LINUX_arm64, 112 | PlatformId.OSX_x64, 113 | PlatformId.OSX_arm64, 114 | ] 115 | assert platform_id in valid_platforms, f"Platform {platform_id} is not supported for Next LS at the moment" 116 | 117 | next_ls_dir = os.path.join(cls.ls_resources_dir(solidlsp_settings), "next-ls") 118 | 119 | NEXTLS_VERSION = "v0.23.4" 120 | 121 | # Define runtime dependencies inline 122 | runtime_deps = { 123 | PlatformId.LINUX_x64: RuntimeDependency( 124 | id="next_ls_linux_amd64", 125 | platform_id="linux-x64", 126 | url=f"https://github.com/elixir-tools/next-ls/releases/download/{NEXTLS_VERSION}/next_ls_linux_amd64", 127 | archive_type="binary", 128 | binary_name="next_ls_linux_amd64", 129 | extract_path="next_ls", 130 | ), 131 | PlatformId.LINUX_arm64: RuntimeDependency( 132 | id="next_ls_linux_arm64", 133 | platform_id="linux-arm64", 134 | url=f"https://github.com/elixir-tools/next-ls/releases/download/{NEXTLS_VERSION}/next_ls_linux_arm64", 135 | archive_type="binary", 136 | binary_name="next_ls_linux_arm64", 137 | extract_path="next_ls", 138 | ), 139 | PlatformId.OSX_x64: RuntimeDependency( 140 | id="next_ls_darwin_amd64", 141 | platform_id="osx-x64", 142 | url=f"https://github.com/elixir-tools/next-ls/releases/download/{NEXTLS_VERSION}/next_ls_darwin_amd64", 143 | archive_type="binary", 144 | binary_name="next_ls_darwin_amd64", 145 | extract_path="next_ls", 146 | ), 147 | PlatformId.OSX_arm64: RuntimeDependency( 148 | id="next_ls_darwin_arm64", 149 | platform_id="osx-arm64", 150 | url=f"https://github.com/elixir-tools/next-ls/releases/download/{NEXTLS_VERSION}/next_ls_darwin_arm64", 151 | archive_type="binary", 152 | binary_name="next_ls_darwin_arm64", 153 | extract_path="next_ls", 154 | ), 155 | } 156 | 157 | dependency = runtime_deps[platform_id] 158 | executable_path = os.path.join(next_ls_dir, "nextls") 159 | binary_path = os.path.join(next_ls_dir, dependency.binary_name) 160 | 161 | if not os.path.exists(executable_path): 162 | logger.log(f"Downloading Next LS binary from {dependency.url}", logging.INFO) 163 | FileUtils.download_file(logger, dependency.url, binary_path) 164 | 165 | # Make the binary executable on Unix-like systems 166 | os.chmod(binary_path, stat.S_IRWXU | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH) 167 | 168 | # Create a symlink with the expected name 169 | if binary_path != executable_path: 170 | if os.path.exists(executable_path): 171 | os.remove(executable_path) 172 | os.symlink(os.path.basename(binary_path), executable_path) 173 | 174 | assert os.path.exists(executable_path), f"Next LS executable not found at {executable_path}" 175 | 176 | logger.log(f"Next LS binary ready at: {executable_path}", logging.INFO) 177 | return executable_path 178 | 179 | def __init__( 180 | self, config: LanguageServerConfig, logger: LanguageServerLogger, repository_root_path: str, solidlsp_settings: SolidLSPSettings 181 | ): 182 | nextls_executable_path = self._setup_runtime_dependencies(logger, config, solidlsp_settings) 183 | 184 | super().__init__( 185 | config, 186 | logger, 187 | repository_root_path, 188 | ProcessLaunchInfo(cmd=f'"{nextls_executable_path}" --stdio', cwd=repository_root_path), 189 | "elixir", 190 | solidlsp_settings, 191 | ) 192 | self.server_ready = threading.Event() 193 | self.request_id = 0 194 | 195 | # Set generous timeout for Next LS which can be slow to initialize and respond 196 | self.set_request_timeout(180.0) # 60 seconds for all environments 197 | 198 | @staticmethod 199 | def _get_initialize_params(repository_absolute_path: str) -> InitializeParams: 200 | """ 201 | Returns the initialize params for the Next LS Language Server. 202 | """ 203 | root_uri = pathlib.Path(repository_absolute_path).as_uri() 204 | initialize_params = { 205 | "processId": os.getpid(), 206 | "locale": "en", 207 | "rootPath": repository_absolute_path, 208 | "rootUri": root_uri, 209 | "initializationOptions": { 210 | "mix_env": "dev", 211 | "mix_target": "host", 212 | "experimental": {"completions": {"enable": False}}, 213 | "extensions": {"credo": {"enable": True, "cli_options": []}}, 214 | }, 215 | "capabilities": { 216 | "textDocument": { 217 | "synchronization": {"didSave": True, "dynamicRegistration": True}, 218 | "completion": { 219 | "dynamicRegistration": True, 220 | "completionItem": {"snippetSupport": True, "documentationFormat": ["markdown", "plaintext"]}, 221 | }, 222 | "definition": {"dynamicRegistration": True}, 223 | "references": {"dynamicRegistration": True}, 224 | "documentSymbol": { 225 | "dynamicRegistration": True, 226 | "hierarchicalDocumentSymbolSupport": True, 227 | "symbolKind": {"valueSet": list(range(1, 27))}, 228 | }, 229 | "hover": {"dynamicRegistration": True, "contentFormat": ["markdown", "plaintext"]}, 230 | "formatting": {"dynamicRegistration": True}, 231 | "codeAction": { 232 | "dynamicRegistration": True, 233 | "codeActionLiteralSupport": { 234 | "codeActionKind": { 235 | "valueSet": [ 236 | "quickfix", 237 | "refactor", 238 | "refactor.extract", 239 | "refactor.inline", 240 | "refactor.rewrite", 241 | "source", 242 | "source.organizeImports", 243 | ] 244 | } 245 | }, 246 | }, 247 | }, 248 | "workspace": { 249 | "workspaceFolders": True, 250 | "didChangeConfiguration": {"dynamicRegistration": True}, 251 | "executeCommand": {"dynamicRegistration": True}, 252 | }, 253 | }, 254 | "workspaceFolders": [{"uri": root_uri, "name": os.path.basename(repository_absolute_path)}], 255 | } 256 | 257 | return initialize_params 258 | 259 | def _start_server(self): 260 | """Start Next LS server process""" 261 | 262 | def register_capability_handler(params): 263 | return 264 | 265 | def window_log_message(msg): 266 | """Handle window/logMessage notifications from Next LS""" 267 | message_text = msg.get("message", "") 268 | self.logger.log(f"LSP: window/logMessage: {message_text}", logging.INFO) 269 | 270 | # Check for the specific Next LS readiness signal 271 | # Based on Next LS source: "Runtime for folder #{name} is ready..." 272 | if "Runtime for folder" in message_text and "is ready..." in message_text: 273 | self.logger.log("Next LS runtime is ready based on official log message", logging.INFO) 274 | self.server_ready.set() 275 | 276 | def do_nothing(params): 277 | return 278 | 279 | def check_server_ready(params): 280 | """ 281 | Handle $/progress notifications from Next LS. 282 | Keep as fallback for error detection, but primary readiness detection 283 | is now done via window/logMessage handler. 284 | """ 285 | value = params.get("value", {}) 286 | 287 | # Check for initialization completion progress (fallback signal) 288 | if value.get("kind") == "end": 289 | message = value.get("message", "") 290 | if "has initialized!" in message: 291 | self.logger.log("Next LS initialization progress completed", logging.INFO) 292 | # Note: We don't set server_ready here - we wait for the log message 293 | 294 | def work_done_progress(params): 295 | """ 296 | Handle $/workDoneProgress notifications from Next LS. 297 | Keep for completeness but primary readiness detection is via window/logMessage. 298 | """ 299 | value = params.get("value", {}) 300 | if value.get("kind") == "end": 301 | self.logger.log("Next LS work done progress completed", logging.INFO) 302 | # Note: We don't set server_ready here - we wait for the log message 303 | 304 | self.server.on_request("client/registerCapability", register_capability_handler) 305 | self.server.on_notification("window/logMessage", window_log_message) 306 | self.server.on_notification("$/progress", check_server_ready) 307 | self.server.on_notification("window/workDoneProgress/create", do_nothing) 308 | self.server.on_notification("$/workDoneProgress", work_done_progress) 309 | self.server.on_notification("textDocument/publishDiagnostics", do_nothing) 310 | 311 | self.logger.log("Starting Next LS server process", logging.INFO) 312 | self.server.start() 313 | initialize_params = self._get_initialize_params(self.repository_root_path) 314 | 315 | self.logger.log( 316 | "Sending initialize request from LSP client to LSP server and awaiting response", 317 | logging.INFO, 318 | ) 319 | init_response = self.server.send.initialize(initialize_params) 320 | 321 | # Verify server capabilities - be more lenient with Next LS 322 | self.logger.log(f"Next LS capabilities: {list(init_response['capabilities'].keys())}", logging.INFO) 323 | 324 | # Next LS may not provide all capabilities immediately, so we check for basic ones 325 | assert "textDocumentSync" in init_response["capabilities"], f"Missing textDocumentSync in {init_response['capabilities']}" 326 | 327 | # Some capabilities might be optional or provided later 328 | if "completionProvider" not in init_response["capabilities"]: 329 | self.logger.log("Warning: completionProvider not available in initial capabilities", logging.WARNING) 330 | if "definitionProvider" not in init_response["capabilities"]: 331 | self.logger.log("Warning: definitionProvider not available in initial capabilities", logging.WARNING) 332 | 333 | self.server.notify.initialized({}) 334 | self.completions_available.set() 335 | 336 | # Wait for Next LS to send the specific "Runtime for folder X is ready..." log message 337 | # This is the authoritative signal that Next LS is truly ready for requests 338 | ready_timeout = 180.0 339 | self.logger.log(f"Waiting up to {ready_timeout} seconds for Next LS runtime readiness...", logging.INFO) 340 | 341 | if self.server_ready.wait(timeout=ready_timeout): 342 | self.logger.log("Next LS is ready and available for requests", logging.INFO) 343 | 344 | # Add a small settling period to ensure background indexing is complete 345 | # Next LS often continues compilation/indexing in background after ready signal 346 | settling_time = 120.0 347 | self.logger.log(f"Allowing {settling_time} seconds for Next LS background indexing to complete...", logging.INFO) 348 | time.sleep(settling_time) 349 | self.logger.log("Next LS settling period complete", logging.INFO) 350 | else: 351 | error_msg = f"Next LS failed to initialize within {ready_timeout} seconds. This may indicate a problem with the Elixir installation, project compilation, or Next LS itself." 352 | self.logger.log(error_msg, logging.ERROR) 353 | raise RuntimeError(error_msg) 354 | ``` -------------------------------------------------------------------------------- /src/serena/tools/tools_base.py: -------------------------------------------------------------------------------- ```python 1 | import inspect 2 | import os 3 | from abc import ABC 4 | from collections.abc import Iterable 5 | from dataclasses import dataclass 6 | from types import TracebackType 7 | from typing import TYPE_CHECKING, Any, Protocol, Self, TypeVar 8 | 9 | from mcp.server.fastmcp.utilities.func_metadata import FuncMetadata, func_metadata 10 | from sensai.util import logging 11 | from sensai.util.string import dict_string 12 | 13 | from serena.project import Project 14 | from serena.prompt_factory import PromptFactory 15 | from serena.symbol import LanguageServerSymbolRetriever 16 | from serena.util.class_decorators import singleton 17 | from serena.util.inspection import iter_subclasses 18 | from solidlsp.ls_exceptions import SolidLSPException 19 | 20 | if TYPE_CHECKING: 21 | from serena.agent import MemoriesManager, SerenaAgent 22 | from serena.code_editor import CodeEditor 23 | 24 | log = logging.getLogger(__name__) 25 | T = TypeVar("T") 26 | SUCCESS_RESULT = "OK" 27 | 28 | 29 | class Component(ABC): 30 | def __init__(self, agent: "SerenaAgent"): 31 | self.agent = agent 32 | 33 | def get_project_root(self) -> str: 34 | """ 35 | :return: the root directory of the active project, raises a ValueError if no active project configuration is set 36 | """ 37 | return self.agent.get_project_root() 38 | 39 | @property 40 | def prompt_factory(self) -> PromptFactory: 41 | return self.agent.prompt_factory 42 | 43 | @property 44 | def memories_manager(self) -> "MemoriesManager": 45 | assert self.agent.memories_manager is not None 46 | return self.agent.memories_manager 47 | 48 | def create_language_server_symbol_retriever(self) -> LanguageServerSymbolRetriever: 49 | if not self.agent.is_using_language_server(): 50 | raise Exception("Cannot create LanguageServerSymbolRetriever; agent is not in language server mode.") 51 | language_server = self.agent.language_server 52 | assert language_server is not None 53 | return LanguageServerSymbolRetriever(language_server, agent=self.agent) 54 | 55 | @property 56 | def project(self) -> Project: 57 | return self.agent.get_active_project_or_raise() 58 | 59 | def create_code_editor(self) -> "CodeEditor": 60 | from ..code_editor import JetBrainsCodeEditor, LanguageServerCodeEditor 61 | 62 | if self.agent.is_using_language_server(): 63 | return LanguageServerCodeEditor(self.create_language_server_symbol_retriever(), agent=self.agent) 64 | else: 65 | return JetBrainsCodeEditor(project=self.project, agent=self.agent) 66 | 67 | 68 | class ToolMarker: 69 | """ 70 | Base class for tool markers. 71 | """ 72 | 73 | 74 | class ToolMarkerCanEdit(ToolMarker): 75 | """ 76 | Marker class for all tools that can perform editing operations on files. 77 | """ 78 | 79 | 80 | class ToolMarkerDoesNotRequireActiveProject(ToolMarker): 81 | pass 82 | 83 | 84 | class ToolMarkerOptional(ToolMarker): 85 | """ 86 | Marker class for optional tools that are disabled by default. 87 | """ 88 | 89 | 90 | class ToolMarkerSymbolicRead(ToolMarker): 91 | """ 92 | Marker class for tools that perform symbol read operations. 93 | """ 94 | 95 | 96 | class ToolMarkerSymbolicEdit(ToolMarkerCanEdit): 97 | """ 98 | Marker class for tools that perform symbolic edit operations. 99 | """ 100 | 101 | 102 | class ApplyMethodProtocol(Protocol): 103 | """Callable protocol for the apply method of a tool.""" 104 | 105 | def __call__(self, *args: Any, **kwargs: Any) -> str: 106 | pass 107 | 108 | 109 | class Tool(Component): 110 | # NOTE: each tool should implement the apply method, which is then used in 111 | # the central method of the Tool class `apply_ex`. 112 | # Failure to do so will result in a RuntimeError at tool execution time. 113 | # The apply method is not declared as part of the base Tool interface since we cannot 114 | # know the signature of the (input parameters of the) method in advance. 115 | # 116 | # The docstring and types of the apply method are used to generate the tool description 117 | # (which is use by the LLM, so a good description is important) 118 | # and to validate the tool call arguments. 119 | 120 | @classmethod 121 | def get_name_from_cls(cls) -> str: 122 | name = cls.__name__ 123 | if name.endswith("Tool"): 124 | name = name[:-4] 125 | # convert to snake_case 126 | name = "".join(["_" + c.lower() if c.isupper() else c for c in name]).lstrip("_") 127 | return name 128 | 129 | def get_name(self) -> str: 130 | return self.get_name_from_cls() 131 | 132 | def get_apply_fn(self) -> ApplyMethodProtocol: 133 | apply_fn = getattr(self, "apply") 134 | if apply_fn is None: 135 | raise RuntimeError(f"apply not defined in {self}. Did you forget to implement it?") 136 | return apply_fn 137 | 138 | @classmethod 139 | def can_edit(cls) -> bool: 140 | """ 141 | Returns whether this tool can perform editing operations on code. 142 | 143 | :return: True if the tool can edit code, False otherwise 144 | """ 145 | return issubclass(cls, ToolMarkerCanEdit) 146 | 147 | @classmethod 148 | def get_tool_description(cls) -> str: 149 | docstring = cls.__doc__ 150 | if docstring is None: 151 | return "" 152 | return docstring.strip() 153 | 154 | @classmethod 155 | def get_apply_docstring_from_cls(cls) -> str: 156 | """Get the docstring for the apply method from the class (static metadata). 157 | Needed for creating MCP tools in a separate process without running into serialization issues. 158 | """ 159 | # First try to get from __dict__ to handle dynamic docstring changes 160 | if "apply" in cls.__dict__: 161 | apply_fn = cls.__dict__["apply"] 162 | else: 163 | # Fall back to getattr for inherited methods 164 | apply_fn = getattr(cls, "apply", None) 165 | if apply_fn is None: 166 | raise AttributeError(f"apply method not defined in {cls}. Did you forget to implement it?") 167 | 168 | docstring = apply_fn.__doc__ 169 | if not docstring: 170 | raise AttributeError(f"apply method has no (or empty) docstring in {cls}. Did you forget to implement it?") 171 | return docstring.strip() 172 | 173 | def get_apply_docstring(self) -> str: 174 | """Gets the docstring for the tool application, used by the MCP server.""" 175 | return self.get_apply_docstring_from_cls() 176 | 177 | def get_apply_fn_metadata(self) -> FuncMetadata: 178 | """Gets the metadata for the tool application function, used by the MCP server.""" 179 | return self.get_apply_fn_metadata_from_cls() 180 | 181 | @classmethod 182 | def get_apply_fn_metadata_from_cls(cls) -> FuncMetadata: 183 | """Get the metadata for the apply method from the class (static metadata). 184 | Needed for creating MCP tools in a separate process without running into serialization issues. 185 | """ 186 | # First try to get from __dict__ to handle dynamic docstring changes 187 | if "apply" in cls.__dict__: 188 | apply_fn = cls.__dict__["apply"] 189 | else: 190 | # Fall back to getattr for inherited methods 191 | apply_fn = getattr(cls, "apply", None) 192 | if apply_fn is None: 193 | raise AttributeError(f"apply method not defined in {cls}. Did you forget to implement it?") 194 | 195 | return func_metadata(apply_fn, skip_names=["self", "cls"]) 196 | 197 | def _log_tool_application(self, frame: Any) -> None: 198 | params = {} 199 | ignored_params = {"self", "log_call", "catch_exceptions", "args", "apply_fn"} 200 | for param, value in frame.f_locals.items(): 201 | if param in ignored_params: 202 | continue 203 | if param == "kwargs": 204 | params.update(value) 205 | else: 206 | params[param] = value 207 | log.info(f"{self.get_name_from_cls()}: {dict_string(params)}") 208 | 209 | def _limit_length(self, result: str, max_answer_chars: int) -> str: 210 | if max_answer_chars == -1: 211 | max_answer_chars = self.agent.serena_config.default_max_tool_answer_chars 212 | if max_answer_chars <= 0: 213 | raise ValueError(f"Must be positive or the default (-1), got: {max_answer_chars=}") 214 | if (n_chars := len(result)) > max_answer_chars: 215 | result = ( 216 | f"The answer is too long ({n_chars} characters). " 217 | + "Please try a more specific tool query or raise the max_answer_chars parameter." 218 | ) 219 | return result 220 | 221 | def is_active(self) -> bool: 222 | return self.agent.tool_is_active(self.__class__) 223 | 224 | def apply_ex(self, log_call: bool = True, catch_exceptions: bool = True, **kwargs) -> str: # type: ignore 225 | """ 226 | Applies the tool with logging and exception handling, using the given keyword arguments 227 | """ 228 | 229 | def task() -> str: 230 | apply_fn = self.get_apply_fn() 231 | 232 | try: 233 | if not self.is_active(): 234 | return f"Error: Tool '{self.get_name_from_cls()}' is not active. Active tools: {self.agent.get_active_tool_names()}" 235 | except Exception as e: 236 | return f"RuntimeError while checking if tool {self.get_name_from_cls()} is active: {e}" 237 | 238 | if log_call: 239 | self._log_tool_application(inspect.currentframe()) 240 | try: 241 | # check whether the tool requires an active project and language server 242 | if not isinstance(self, ToolMarkerDoesNotRequireActiveProject): 243 | if self.agent._active_project is None: 244 | return ( 245 | "Error: No active project. Ask the user to provide the project path or to select a project from this list of known projects: " 246 | + f"{self.agent.serena_config.project_names}" 247 | ) 248 | if self.agent.is_using_language_server() and not self.agent.is_language_server_running(): 249 | log.info("Language server is not running. Starting it ...") 250 | self.agent.reset_language_server() 251 | 252 | # apply the actual tool 253 | try: 254 | result = apply_fn(**kwargs) 255 | except SolidLSPException as e: 256 | if e.is_language_server_terminated(): 257 | log.error(f"Language server terminated while executing tool ({e}). Restarting the language server and retrying ...") 258 | self.agent.reset_language_server() 259 | result = apply_fn(**kwargs) 260 | else: 261 | raise 262 | 263 | # record tool usage 264 | self.agent.record_tool_usage_if_enabled(kwargs, result, self) 265 | 266 | except Exception as e: 267 | if not catch_exceptions: 268 | raise 269 | msg = f"Error executing tool: {e}" 270 | log.error(f"Error executing tool: {e}", exc_info=e) 271 | result = msg 272 | 273 | if log_call: 274 | log.info(f"Result: {result}") 275 | 276 | try: 277 | if self.agent.language_server is not None: 278 | self.agent.language_server.save_cache() 279 | except Exception as e: 280 | log.error(f"Error saving language server cache: {e}") 281 | 282 | return result 283 | 284 | future = self.agent.issue_task(task, name=self.__class__.__name__) 285 | return future.result(timeout=self.agent.serena_config.tool_timeout) 286 | 287 | 288 | class EditedFileContext: 289 | """ 290 | Context manager for file editing. 291 | 292 | Create the context, then use `set_updated_content` to set the new content, the original content 293 | being provided in `original_content`. 294 | When exiting the context without an exception, the updated content will be written back to the file. 295 | """ 296 | 297 | def __init__(self, relative_path: str, agent: "SerenaAgent"): 298 | self._project = agent.get_active_project() 299 | assert self._project is not None 300 | self._abs_path = os.path.join(self._project.project_root, relative_path) 301 | if not os.path.isfile(self._abs_path): 302 | raise FileNotFoundError(f"File {self._abs_path} does not exist.") 303 | with open(self._abs_path, encoding=self._project.project_config.encoding) as f: 304 | self._original_content = f.read() 305 | self._updated_content: str | None = None 306 | 307 | def __enter__(self) -> Self: 308 | return self 309 | 310 | def get_original_content(self) -> str: 311 | """ 312 | :return: the original content of the file before any modifications. 313 | """ 314 | return self._original_content 315 | 316 | def set_updated_content(self, content: str) -> None: 317 | """ 318 | Sets the updated content of the file, which will be written back to the file 319 | when the context is exited without an exception. 320 | 321 | :param content: the updated content of the file 322 | """ 323 | self._updated_content = content 324 | 325 | def __exit__(self, exc_type: type[BaseException] | None, exc_value: BaseException | None, traceback: TracebackType | None) -> None: 326 | if self._updated_content is not None and exc_type is None: 327 | assert self._project is not None 328 | with open(self._abs_path, "w", encoding=self._project.project_config.encoding) as f: 329 | f.write(self._updated_content) 330 | log.info(f"Updated content written to {self._abs_path}") 331 | # Language servers should automatically detect the change and update its state accordingly. 332 | # If they do not, we may have to add a call to notify it. 333 | 334 | 335 | @dataclass(kw_only=True) 336 | class RegisteredTool: 337 | tool_class: type[Tool] 338 | is_optional: bool 339 | tool_name: str 340 | 341 | 342 | @singleton 343 | class ToolRegistry: 344 | def __init__(self) -> None: 345 | self._tool_dict: dict[str, RegisteredTool] = {} 346 | for cls in iter_subclasses(Tool): 347 | if not cls.__module__.startswith("serena.tools"): 348 | continue 349 | is_optional = issubclass(cls, ToolMarkerOptional) 350 | name = cls.get_name_from_cls() 351 | if name in self._tool_dict: 352 | raise ValueError(f"Duplicate tool name found: {name}. Tool classes must have unique names.") 353 | self._tool_dict[name] = RegisteredTool(tool_class=cls, is_optional=is_optional, tool_name=name) 354 | 355 | def get_tool_class_by_name(self, tool_name: str) -> type[Tool]: 356 | return self._tool_dict[tool_name].tool_class 357 | 358 | def get_all_tool_classes(self) -> list[type[Tool]]: 359 | return list(t.tool_class for t in self._tool_dict.values()) 360 | 361 | def get_tool_classes_default_enabled(self) -> list[type[Tool]]: 362 | """ 363 | :return: the list of tool classes that are enabled by default (i.e. non-optional tools). 364 | """ 365 | return [t.tool_class for t in self._tool_dict.values() if not t.is_optional] 366 | 367 | def get_tool_classes_optional(self) -> list[type[Tool]]: 368 | """ 369 | :return: the list of tool classes that are optional (i.e. disabled by default). 370 | """ 371 | return [t.tool_class for t in self._tool_dict.values() if t.is_optional] 372 | 373 | def get_tool_names_default_enabled(self) -> list[str]: 374 | """ 375 | :return: the list of tool names that are enabled by default (i.e. non-optional tools). 376 | """ 377 | return [t.tool_name for t in self._tool_dict.values() if not t.is_optional] 378 | 379 | def get_tool_names_optional(self) -> list[str]: 380 | """ 381 | :return: the list of tool names that are optional (i.e. disabled by default). 382 | """ 383 | return [t.tool_name for t in self._tool_dict.values() if t.is_optional] 384 | 385 | def get_tool_names(self) -> list[str]: 386 | """ 387 | :return: the list of all tool names. 388 | """ 389 | return list(self._tool_dict.keys()) 390 | 391 | def print_tool_overview( 392 | self, tools: Iterable[type[Tool] | Tool] | None = None, include_optional: bool = False, only_optional: bool = False 393 | ) -> None: 394 | """ 395 | Print a summary of the tools. If no tools are passed, a summary of the selection of tools (all, default or only optional) is printed. 396 | """ 397 | if tools is None: 398 | if only_optional: 399 | tools = self.get_tool_classes_optional() 400 | elif include_optional: 401 | tools = self.get_all_tool_classes() 402 | else: 403 | tools = self.get_tool_classes_default_enabled() 404 | 405 | tool_dict: dict[str, type[Tool] | Tool] = {} 406 | for tool_class in tools: 407 | tool_dict[tool_class.get_name_from_cls()] = tool_class 408 | for tool_name in sorted(tool_dict.keys()): 409 | tool_class = tool_dict[tool_name] 410 | print(f" * `{tool_name}`: {tool_class.get_tool_description().strip()}") 411 | 412 | def is_valid_tool_name(self, tool_name: str) -> bool: 413 | return tool_name in self._tool_dict 414 | ``` -------------------------------------------------------------------------------- /src/serena/tools/symbol_tools.py: -------------------------------------------------------------------------------- ```python 1 | """ 2 | Language server-related tools 3 | """ 4 | 5 | import dataclasses 6 | import json 7 | import os 8 | from collections.abc import Sequence 9 | from copy import copy 10 | from typing import Any 11 | 12 | from serena.tools import ( 13 | SUCCESS_RESULT, 14 | Tool, 15 | ToolMarkerSymbolicEdit, 16 | ToolMarkerSymbolicRead, 17 | ) 18 | from serena.tools.tools_base import ToolMarkerOptional 19 | from solidlsp.ls_types import SymbolKind 20 | 21 | 22 | def _sanitize_symbol_dict(symbol_dict: dict[str, Any]) -> dict[str, Any]: 23 | """ 24 | Sanitize a symbol dictionary inplace by removing unnecessary information. 25 | """ 26 | # We replace the location entry, which repeats line information already included in body_location 27 | # and has unnecessary information on column, by just the relative path. 28 | symbol_dict = copy(symbol_dict) 29 | s_relative_path = symbol_dict.get("location", {}).get("relative_path") 30 | if s_relative_path is not None: 31 | symbol_dict["relative_path"] = s_relative_path 32 | symbol_dict.pop("location", None) 33 | # also remove name, name_path should be enough 34 | symbol_dict.pop("name") 35 | return symbol_dict 36 | 37 | 38 | class RestartLanguageServerTool(Tool, ToolMarkerOptional): 39 | """Restarts the language server, may be necessary when edits not through Serena happen.""" 40 | 41 | def apply(self) -> str: 42 | """Use this tool only on explicit user request or after confirmation. 43 | It may be necessary to restart the language server if it hangs. 44 | """ 45 | self.agent.reset_language_server() 46 | return SUCCESS_RESULT 47 | 48 | 49 | class GetSymbolsOverviewTool(Tool, ToolMarkerSymbolicRead): 50 | """ 51 | Gets an overview of the top-level symbols defined in a given file. 52 | """ 53 | 54 | def apply(self, relative_path: str, max_answer_chars: int = -1) -> str: 55 | """ 56 | Use this tool to get a high-level understanding of the code symbols in a file. 57 | This should be the first tool to call when you want to understand a new file, unless you already know 58 | what you are looking for. 59 | 60 | :param relative_path: the relative path to the file to get the overview of 61 | :param max_answer_chars: if the overview is longer than this number of characters, 62 | no content will be returned. -1 means the default value from the config will be used. 63 | Don't adjust unless there is really no other way to get the content required for the task. 64 | :return: a JSON object containing info about top-level symbols in the file 65 | """ 66 | symbol_retriever = self.create_language_server_symbol_retriever() 67 | file_path = os.path.join(self.project.project_root, relative_path) 68 | 69 | # The symbol overview is capable of working with both files and directories, 70 | # but we want to ensure that the user provides a file path. 71 | if not os.path.exists(file_path): 72 | raise FileNotFoundError(f"File or directory {relative_path} does not exist in the project.") 73 | if os.path.isdir(file_path): 74 | raise ValueError(f"Expected a file path, but got a directory path: {relative_path}. ") 75 | result = symbol_retriever.get_symbol_overview(relative_path)[relative_path] 76 | result_json_str = json.dumps([dataclasses.asdict(i) for i in result]) 77 | return self._limit_length(result_json_str, max_answer_chars) 78 | 79 | 80 | class FindSymbolTool(Tool, ToolMarkerSymbolicRead): 81 | """ 82 | Performs a global (or local) search for symbols with/containing a given name/substring (optionally filtered by type). 83 | """ 84 | 85 | # noinspection PyDefaultArgument 86 | def apply( 87 | self, 88 | name_path: str, 89 | depth: int = 0, 90 | relative_path: str = "", 91 | include_body: bool = False, 92 | include_kinds: list[int] = [], # noqa: B006 93 | exclude_kinds: list[int] = [], # noqa: B006 94 | substring_matching: bool = False, 95 | max_answer_chars: int = -1, 96 | ) -> str: 97 | """ 98 | Retrieves information on all symbols/code entities (classes, methods, etc.) based on the given `name_path`, 99 | which represents a pattern for the symbol's path within the symbol tree of a single file. 100 | The returned symbol location can be used for edits or further queries. 101 | Specify `depth > 0` to retrieve children (e.g., methods of a class). 102 | 103 | The matching behavior is determined by the structure of `name_path`, which can 104 | either be a simple name (e.g. "method") or a name path like "class/method" (relative name path) 105 | or "/class/method" (absolute name path). Note that the name path is not a path in the file system 106 | but rather a path in the symbol tree **within a single file**. Thus, file or directory names should never 107 | be included in the `name_path`. For restricting the search to a single file or directory, 108 | the `within_relative_path` parameter should be used instead. The retrieved symbols' `name_path` attribute 109 | will always be composed of symbol names, never file or directory names. 110 | 111 | Key aspects of the name path matching behavior: 112 | - Trailing slashes in `name_path` play no role and are ignored. 113 | - The name of the retrieved symbols will match (either exactly or as a substring) 114 | the last segment of `name_path`, while other segments will restrict the search to symbols that 115 | have a desired sequence of ancestors. 116 | - If there is no starting or intermediate slash in `name_path`, there is no 117 | restriction on the ancestor symbols. For example, passing `method` will match 118 | against symbols with name paths like `method`, `class/method`, `class/nested_class/method`, etc. 119 | - If `name_path` contains a `/` but doesn't start with a `/`, the matching is restricted to symbols 120 | with the same ancestors as the last segment of `name_path`. For example, passing `class/method` will match against 121 | `class/method` as well as `nested_class/class/method` but not `method`. 122 | - If `name_path` starts with a `/`, it will be treated as an absolute name path pattern, meaning 123 | that the first segment of it must match the first segment of the symbol's name path. 124 | For example, passing `/class` will match only against top-level symbols like `class` but not against `nested_class/class`. 125 | Passing `/class/method` will match against `class/method` but not `nested_class/class/method` or `method`. 126 | 127 | 128 | :param name_path: The name path pattern to search for, see above for details. 129 | :param depth: Depth to retrieve descendants (e.g., 1 for class methods/attributes). 130 | :param relative_path: Optional. Restrict search to this file or directory. If None, searches entire codebase. 131 | If a directory is passed, the search will be restricted to the files in that directory. 132 | If a file is passed, the search will be restricted to that file. 133 | If you have some knowledge about the codebase, you should use this parameter, as it will significantly 134 | speed up the search as well as reduce the number of results. 135 | :param include_body: If True, include the symbol's source code. Use judiciously. 136 | :param include_kinds: Optional. List of LSP symbol kind integers to include. (e.g., 5 for Class, 12 for Function). 137 | Valid kinds: 1=file, 2=module, 3=namespace, 4=package, 5=class, 6=method, 7=property, 8=field, 9=constructor, 10=enum, 138 | 11=interface, 12=function, 13=variable, 14=constant, 15=string, 16=number, 17=boolean, 18=array, 19=object, 139 | 20=key, 21=null, 22=enum member, 23=struct, 24=event, 25=operator, 26=type parameter. 140 | If not provided, all kinds are included. 141 | :param exclude_kinds: Optional. List of LSP symbol kind integers to exclude. Takes precedence over `include_kinds`. 142 | If not provided, no kinds are excluded. 143 | :param substring_matching: If True, use substring matching for the last segment of `name`. 144 | :param max_answer_chars: Max characters for the JSON result. If exceeded, no content is returned. 145 | -1 means the default value from the config will be used. 146 | :return: a list of symbols (with locations) matching the name. 147 | """ 148 | parsed_include_kinds: Sequence[SymbolKind] | None = [SymbolKind(k) for k in include_kinds] if include_kinds else None 149 | parsed_exclude_kinds: Sequence[SymbolKind] | None = [SymbolKind(k) for k in exclude_kinds] if exclude_kinds else None 150 | symbol_retriever = self.create_language_server_symbol_retriever() 151 | symbols = symbol_retriever.find_by_name( 152 | name_path, 153 | include_body=include_body, 154 | include_kinds=parsed_include_kinds, 155 | exclude_kinds=parsed_exclude_kinds, 156 | substring_matching=substring_matching, 157 | within_relative_path=relative_path, 158 | ) 159 | symbol_dicts = [_sanitize_symbol_dict(s.to_dict(kind=True, location=True, depth=depth, include_body=include_body)) for s in symbols] 160 | result = json.dumps(symbol_dicts) 161 | return self._limit_length(result, max_answer_chars) 162 | 163 | 164 | class FindReferencingSymbolsTool(Tool, ToolMarkerSymbolicRead): 165 | """ 166 | Finds symbols that reference the symbol at the given location (optionally filtered by type). 167 | """ 168 | 169 | # noinspection PyDefaultArgument 170 | def apply( 171 | self, 172 | name_path: str, 173 | relative_path: str, 174 | include_kinds: list[int] = [], # noqa: B006 175 | exclude_kinds: list[int] = [], # noqa: B006 176 | max_answer_chars: int = -1, 177 | ) -> str: 178 | """ 179 | Finds references to the symbol at the given `name_path`. The result will contain metadata about the referencing symbols 180 | as well as a short code snippet around the reference. 181 | 182 | :param name_path: for finding the symbol to find references for, same logic as in the `find_symbol` tool. 183 | :param relative_path: the relative path to the file containing the symbol for which to find references. 184 | Note that here you can't pass a directory but must pass a file. 185 | :param include_kinds: same as in the `find_symbol` tool. 186 | :param exclude_kinds: same as in the `find_symbol` tool. 187 | :param max_answer_chars: same as in the `find_symbol` tool. 188 | :return: a list of JSON objects with the symbols referencing the requested symbol 189 | """ 190 | include_body = False # It is probably never a good idea to include the body of the referencing symbols 191 | parsed_include_kinds: Sequence[SymbolKind] | None = [SymbolKind(k) for k in include_kinds] if include_kinds else None 192 | parsed_exclude_kinds: Sequence[SymbolKind] | None = [SymbolKind(k) for k in exclude_kinds] if exclude_kinds else None 193 | symbol_retriever = self.create_language_server_symbol_retriever() 194 | references_in_symbols = symbol_retriever.find_referencing_symbols( 195 | name_path, 196 | relative_file_path=relative_path, 197 | include_body=include_body, 198 | include_kinds=parsed_include_kinds, 199 | exclude_kinds=parsed_exclude_kinds, 200 | ) 201 | reference_dicts = [] 202 | for ref in references_in_symbols: 203 | ref_dict = ref.symbol.to_dict(kind=True, location=True, depth=0, include_body=include_body) 204 | ref_dict = _sanitize_symbol_dict(ref_dict) 205 | if not include_body: 206 | ref_relative_path = ref.symbol.location.relative_path 207 | assert ref_relative_path is not None, f"Referencing symbol {ref.symbol.name} has no relative path, this is likely a bug." 208 | content_around_ref = self.project.retrieve_content_around_line( 209 | relative_file_path=ref_relative_path, line=ref.line, context_lines_before=1, context_lines_after=1 210 | ) 211 | ref_dict["content_around_reference"] = content_around_ref.to_display_string() 212 | reference_dicts.append(ref_dict) 213 | result = json.dumps(reference_dicts) 214 | return self._limit_length(result, max_answer_chars) 215 | 216 | 217 | class ReplaceSymbolBodyTool(Tool, ToolMarkerSymbolicEdit): 218 | """ 219 | Replaces the full definition of a symbol. 220 | """ 221 | 222 | def apply( 223 | self, 224 | name_path: str, 225 | relative_path: str, 226 | body: str, 227 | ) -> str: 228 | r""" 229 | Replaces the body of the symbol with the given `name_path`. 230 | 231 | The tool shall be used to replace symbol bodies that have been previously retrieved 232 | (e.g. via `find_symbol`). 233 | IMPORTANT: Do not use this tool if you do not know what exactly constitutes the body of the symbol. 234 | 235 | :param name_path: for finding the symbol to replace, same logic as in the `find_symbol` tool. 236 | :param relative_path: the relative path to the file containing the symbol 237 | :param body: the new symbol body. The symbol body is the definition of a symbol 238 | in the programming language, including e.g. the signature line for functions. 239 | IMPORTANT: The body does NOT include any preceding docstrings/comments or imports, in particular. 240 | """ 241 | code_editor = self.create_code_editor() 242 | code_editor.replace_body( 243 | name_path, 244 | relative_file_path=relative_path, 245 | body=body, 246 | ) 247 | return SUCCESS_RESULT 248 | 249 | 250 | class InsertAfterSymbolTool(Tool, ToolMarkerSymbolicEdit): 251 | """ 252 | Inserts content after the end of the definition of a given symbol. 253 | """ 254 | 255 | def apply( 256 | self, 257 | name_path: str, 258 | relative_path: str, 259 | body: str, 260 | ) -> str: 261 | """ 262 | Inserts the given body/content after the end of the definition of the given symbol (via the symbol's location). 263 | A typical use case is to insert a new class, function, method, field or variable assignment. 264 | 265 | :param name_path: name path of the symbol after which to insert content (definitions in the `find_symbol` tool apply) 266 | :param relative_path: the relative path to the file containing the symbol 267 | :param body: the body/content to be inserted. The inserted code shall begin with the next line after 268 | the symbol. 269 | """ 270 | code_editor = self.create_code_editor() 271 | code_editor.insert_after_symbol(name_path, relative_file_path=relative_path, body=body) 272 | return SUCCESS_RESULT 273 | 274 | 275 | class InsertBeforeSymbolTool(Tool, ToolMarkerSymbolicEdit): 276 | """ 277 | Inserts content before the beginning of the definition of a given symbol. 278 | """ 279 | 280 | def apply( 281 | self, 282 | name_path: str, 283 | relative_path: str, 284 | body: str, 285 | ) -> str: 286 | """ 287 | Inserts the given content before the beginning of the definition of the given symbol (via the symbol's location). 288 | A typical use case is to insert a new class, function, method, field or variable assignment; or 289 | a new import statement before the first symbol in the file. 290 | 291 | :param name_path: name path of the symbol before which to insert content (definitions in the `find_symbol` tool apply) 292 | :param relative_path: the relative path to the file containing the symbol 293 | :param body: the body/content to be inserted before the line in which the referenced symbol is defined 294 | """ 295 | code_editor = self.create_code_editor() 296 | code_editor.insert_before_symbol(name_path, relative_file_path=relative_path, body=body) 297 | return SUCCESS_RESULT 298 | 299 | 300 | class RenameSymbolTool(Tool, ToolMarkerSymbolicEdit): 301 | """ 302 | Renames a symbol throughout the codebase using language server refactoring capabilities. 303 | """ 304 | 305 | def apply( 306 | self, 307 | name_path: str, 308 | relative_path: str, 309 | new_name: str, 310 | ) -> str: 311 | """ 312 | Renames the symbol with the given `name_path` to `new_name` throughout the entire codebase. 313 | Note: for languages with method overloading, like Java, name_path may have to include a method's 314 | signature to uniquely identify a method. 315 | 316 | :param name_path: name path of the symbol to rename (definitions in the `find_symbol` tool apply) 317 | :param relative_path: the relative path to the file containing the symbol to rename 318 | :param new_name: the new name for the symbol 319 | :return: result summary indicating success or failure 320 | """ 321 | code_editor = self.create_code_editor() 322 | modified_files = code_editor.rename_symbol(name_path, relative_file_path=relative_path, new_name=new_name) 323 | return f"Successfully renamed '{name_path}' to '{new_name}' in {len(modified_files)} file(s)" 324 | ``` -------------------------------------------------------------------------------- /src/serena/code_editor.py: -------------------------------------------------------------------------------- ```python 1 | import json 2 | import logging 3 | import os 4 | from abc import ABC, abstractmethod 5 | from collections.abc import Iterable, Iterator, Reversible 6 | from contextlib import contextmanager 7 | from typing import TYPE_CHECKING, Generic, Optional, TypeVar, cast 8 | 9 | from serena.symbol import JetBrainsSymbol, LanguageServerSymbol, LanguageServerSymbolRetriever, PositionInFile, Symbol 10 | from solidlsp import SolidLanguageServer, ls_types 11 | from solidlsp.ls import LSPFileBuffer 12 | from solidlsp.ls_types import extract_text_edits 13 | from solidlsp.ls_utils import PathUtils, TextUtils 14 | 15 | from .project import Project 16 | from .tools.jetbrains_plugin_client import JetBrainsPluginClient 17 | 18 | if TYPE_CHECKING: 19 | from .agent import SerenaAgent 20 | 21 | 22 | log = logging.getLogger(__name__) 23 | TSymbol = TypeVar("TSymbol", bound=Symbol) 24 | 25 | 26 | class CodeEditor(Generic[TSymbol], ABC): 27 | def __init__(self, project_root: str, agent: Optional["SerenaAgent"] = None) -> None: 28 | self.project_root = project_root 29 | self.agent = agent 30 | 31 | class EditedFile(ABC): 32 | @abstractmethod 33 | def get_contents(self) -> str: 34 | """ 35 | :return: the contents of the file. 36 | """ 37 | 38 | @abstractmethod 39 | def delete_text_between_positions(self, start_pos: PositionInFile, end_pos: PositionInFile) -> None: 40 | pass 41 | 42 | @abstractmethod 43 | def insert_text_at_position(self, pos: PositionInFile, text: str) -> None: 44 | pass 45 | 46 | @contextmanager 47 | def _open_file_context(self, relative_path: str) -> Iterator["CodeEditor.EditedFile"]: 48 | """ 49 | Context manager for opening a file 50 | """ 51 | raise NotImplementedError("This method must be overridden for each subclass") 52 | 53 | @contextmanager 54 | def _edited_file_context(self, relative_path: str) -> Iterator["CodeEditor.EditedFile"]: 55 | """ 56 | Context manager for editing a file. 57 | """ 58 | with self._open_file_context(relative_path) as edited_file: 59 | yield edited_file 60 | # save the file 61 | abs_path = os.path.join(self.project_root, relative_path) 62 | with open(abs_path, "w", encoding="utf-8") as f: 63 | f.write(edited_file.get_contents()) 64 | 65 | @abstractmethod 66 | def _find_unique_symbol(self, name_path: str, relative_file_path: str) -> TSymbol: 67 | """ 68 | Finds the unique symbol with the given name in the given file. 69 | If no such symbol exists, raises a ValueError. 70 | 71 | :param name_path: the name path 72 | :param relative_file_path: the relative path of the file in which to search for the symbol. 73 | :return: the unique symbol 74 | """ 75 | 76 | def replace_body(self, name_path: str, relative_file_path: str, body: str) -> None: 77 | """ 78 | Replaces the body of the symbol with the given name_path in the given file. 79 | 80 | :param name_path: the name path of the symbol to replace. 81 | :param relative_file_path: the relative path of the file in which the symbol is defined. 82 | :param body: the new body 83 | """ 84 | symbol = self._find_unique_symbol(name_path, relative_file_path) 85 | start_pos = symbol.get_body_start_position_or_raise() 86 | end_pos = symbol.get_body_end_position_or_raise() 87 | 88 | with self._edited_file_context(relative_file_path) as edited_file: 89 | # make sure the replacement adds no additional newlines (before or after) - all newlines 90 | # and whitespace before/after should remain the same, so we strip it entirely 91 | body = body.strip() 92 | 93 | edited_file.delete_text_between_positions(start_pos, end_pos) 94 | edited_file.insert_text_at_position(start_pos, body) 95 | 96 | @staticmethod 97 | def _count_leading_newlines(text: Iterable) -> int: 98 | cnt = 0 99 | for c in text: 100 | if c == "\n": 101 | cnt += 1 102 | elif c == "\r": 103 | continue 104 | else: 105 | break 106 | return cnt 107 | 108 | @classmethod 109 | def _count_trailing_newlines(cls, text: Reversible) -> int: 110 | return cls._count_leading_newlines(reversed(text)) 111 | 112 | def insert_after_symbol(self, name_path: str, relative_file_path: str, body: str) -> None: 113 | """ 114 | Inserts content after the symbol with the given name in the given file. 115 | """ 116 | symbol = self._find_unique_symbol(name_path, relative_file_path) 117 | 118 | # make sure body always ends with at least one newline 119 | if not body.endswith("\n"): 120 | body += "\n" 121 | 122 | pos = symbol.get_body_end_position_or_raise() 123 | 124 | # start at the beginning of the next line 125 | col = 0 126 | line = pos.line + 1 127 | 128 | # make sure a suitable number of leading empty lines is used (at least 0/1 depending on the symbol type, 129 | # otherwise as many as the caller wanted to insert) 130 | original_leading_newlines = self._count_leading_newlines(body) 131 | body = body.lstrip("\r\n") 132 | min_empty_lines = 0 133 | if symbol.is_neighbouring_definition_separated_by_empty_line(): 134 | min_empty_lines = 1 135 | num_leading_empty_lines = max(min_empty_lines, original_leading_newlines) 136 | if num_leading_empty_lines: 137 | body = ("\n" * num_leading_empty_lines) + body 138 | 139 | # make sure the one line break succeeding the original symbol, which we repurposed as prefix via 140 | # `line += 1`, is replaced 141 | body = body.rstrip("\r\n") + "\n" 142 | 143 | with self._edited_file_context(relative_file_path) as edited_file: 144 | edited_file.insert_text_at_position(PositionInFile(line, col), body) 145 | 146 | def insert_before_symbol(self, name_path: str, relative_file_path: str, body: str) -> None: 147 | """ 148 | Inserts content before the symbol with the given name in the given file. 149 | """ 150 | symbol = self._find_unique_symbol(name_path, relative_file_path) 151 | symbol_start_pos = symbol.get_body_start_position_or_raise() 152 | 153 | # insert position is the start of line where the symbol is defined 154 | line = symbol_start_pos.line 155 | col = 0 156 | 157 | original_trailing_empty_lines = self._count_trailing_newlines(body) - 1 158 | 159 | # ensure eol is present at end 160 | body = body.rstrip() + "\n" 161 | 162 | # add suitable number of trailing empty lines after the body (at least 0/1 depending on the symbol type, 163 | # otherwise as many as the caller wanted to insert) 164 | min_trailing_empty_lines = 0 165 | if symbol.is_neighbouring_definition_separated_by_empty_line(): 166 | min_trailing_empty_lines = 1 167 | num_trailing_newlines = max(min_trailing_empty_lines, original_trailing_empty_lines) 168 | body += "\n" * num_trailing_newlines 169 | 170 | # apply edit 171 | with self._edited_file_context(relative_file_path) as edited_file: 172 | edited_file.insert_text_at_position(PositionInFile(line=line, col=col), body) 173 | 174 | def insert_at_line(self, relative_path: str, line: int, content: str) -> None: 175 | """ 176 | Inserts content at the given line in the given file. 177 | 178 | :param relative_path: the relative path of the file in which to insert content 179 | :param line: the 0-based index of the line to insert content at 180 | :param content: the content to insert 181 | """ 182 | with self._edited_file_context(relative_path) as edited_file: 183 | edited_file.insert_text_at_position(PositionInFile(line, 0), content) 184 | 185 | def delete_lines(self, relative_path: str, start_line: int, end_line: int) -> None: 186 | """ 187 | Deletes lines in the given file. 188 | 189 | :param relative_path: the relative path of the file in which to delete lines 190 | :param start_line: the 0-based index of the first line to delete (inclusive) 191 | :param end_line: the 0-based index of the last line to delete (inclusive) 192 | """ 193 | start_col = 0 194 | end_line_for_delete = end_line + 1 195 | end_col = 0 196 | with self._edited_file_context(relative_path) as edited_file: 197 | start_pos = PositionInFile(line=start_line, col=start_col) 198 | end_pos = PositionInFile(line=end_line_for_delete, col=end_col) 199 | edited_file.delete_text_between_positions(start_pos, end_pos) 200 | 201 | def delete_symbol(self, name_path: str, relative_file_path: str) -> None: 202 | """ 203 | Deletes the symbol with the given name in the given file. 204 | """ 205 | symbol = self._find_unique_symbol(name_path, relative_file_path) 206 | start_pos = symbol.get_body_start_position_or_raise() 207 | end_pos = symbol.get_body_end_position_or_raise() 208 | with self._edited_file_context(relative_file_path) as edited_file: 209 | edited_file.delete_text_between_positions(start_pos, end_pos) 210 | 211 | @abstractmethod 212 | def rename_symbol(self, name_path: str, relative_file_path: str, new_name: str) -> list[str]: 213 | """ 214 | Renames the symbol with the given name throughout the codebase. 215 | 216 | :param name_path: the name path of the symbol to rename 217 | :param relative_file_path: the relative path of the file containing the symbol 218 | :param new_name: the new name for the symbol 219 | :return: list of files that were modified 220 | """ 221 | 222 | 223 | class LanguageServerCodeEditor(CodeEditor[LanguageServerSymbol]): 224 | def __init__(self, symbol_retriever: LanguageServerSymbolRetriever, agent: Optional["SerenaAgent"] = None): 225 | super().__init__(project_root=symbol_retriever.get_language_server().repository_root_path, agent=agent) 226 | self._symbol_retriever = symbol_retriever 227 | 228 | @property 229 | def _lang_server(self) -> SolidLanguageServer: 230 | return self._symbol_retriever.get_language_server() 231 | 232 | class EditedFile(CodeEditor.EditedFile): 233 | def __init__(self, lang_server: SolidLanguageServer, relative_path: str, file_buffer: LSPFileBuffer): 234 | self._lang_server = lang_server 235 | self._relative_path = relative_path 236 | self._file_buffer = file_buffer 237 | 238 | def get_contents(self) -> str: 239 | return self._file_buffer.contents 240 | 241 | def delete_text_between_positions(self, start_pos: PositionInFile, end_pos: PositionInFile) -> None: 242 | self._lang_server.delete_text_between_positions(self._relative_path, start_pos.to_lsp_position(), end_pos.to_lsp_position()) 243 | 244 | def insert_text_at_position(self, pos: PositionInFile, text: str) -> None: 245 | self._lang_server.insert_text_at_position(self._relative_path, pos.line, pos.col, text) 246 | 247 | def apply_text_edits(self, text_edits: list[ls_types.TextEdit]) -> None: 248 | return self._lang_server.apply_text_edits_to_file(self._relative_path, text_edits) 249 | 250 | @contextmanager 251 | def _open_file_context(self, relative_path: str) -> Iterator["CodeEditor.EditedFile"]: 252 | with self._lang_server.open_file(relative_path) as file_buffer: 253 | yield self.EditedFile(self._lang_server, relative_path, file_buffer) 254 | 255 | def _get_code_file_content(self, relative_path: str) -> str: 256 | """Get the content of a file using the language server.""" 257 | return self._lang_server.language_server.retrieve_full_file_content(relative_path) 258 | 259 | def _find_unique_symbol(self, name_path: str, relative_file_path: str) -> LanguageServerSymbol: 260 | symbol_candidates = self._symbol_retriever.find_by_name(name_path, within_relative_path=relative_file_path) 261 | if len(symbol_candidates) == 0: 262 | raise ValueError(f"No symbol with name {name_path} found in file {relative_file_path}") 263 | if len(symbol_candidates) > 1: 264 | raise ValueError( 265 | f"Found multiple {len(symbol_candidates)} symbols with name {name_path} in file {relative_file_path}. " 266 | "Their locations are: \n " + json.dumps([s.location.to_dict() for s in symbol_candidates], indent=2) 267 | ) 268 | return symbol_candidates[0] 269 | 270 | def _apply_workspace_edit(self, workspace_edit: ls_types.WorkspaceEdit) -> list[str]: 271 | """ 272 | Apply a WorkspaceEdit by making the changes to files. 273 | 274 | :param workspace_edit: The WorkspaceEdit containing the changes to apply 275 | :return: List of relative file paths that were modified 276 | """ 277 | uri_to_edits = extract_text_edits(workspace_edit) 278 | modified_relative_paths = [] 279 | 280 | # Handle the 'changes' format (URI -> list of TextEdits) 281 | for uri, edits in uri_to_edits.items(): 282 | file_path = PathUtils.uri_to_path(uri) 283 | relative_path = os.path.relpath(file_path, self._lang_server.repository_root_path) 284 | modified_relative_paths.append(relative_path) 285 | with self._edited_file_context(relative_path) as edited_file: 286 | edited_file = cast(self.EditedFile, edited_file) 287 | edited_file.apply_text_edits(edits) 288 | return modified_relative_paths 289 | 290 | def rename_symbol(self, name_path: str, relative_file_path: str, new_name: str) -> list[str]: 291 | """ 292 | Renames the symbol with the given name throughout the codebase. 293 | :param name_path: 294 | :param relative_file_path: 295 | :param new_name: 296 | :return: list of files that were modified 297 | """ 298 | symbol = self._find_unique_symbol(name_path, relative_file_path) 299 | if not symbol.location.has_position_in_file(): 300 | raise ValueError(f"Symbol '{name_path}' does not have a valid position in file for renaming") 301 | 302 | # After has_position_in_file check, line and column are guaranteed to be non-None 303 | assert symbol.location.line is not None 304 | assert symbol.location.column is not None 305 | 306 | rename_result = self._lang_server.request_rename_symbol_edit( 307 | relative_file_path=relative_file_path, line=symbol.location.line, column=symbol.location.column, new_name=new_name 308 | ) 309 | if rename_result is None: 310 | raise ValueError( 311 | f"Language server for {self._lang_server.language_id} returned no rename edits for symbol '{name_path}'. " 312 | f"The symbol might not support renaming." 313 | ) 314 | return self._apply_workspace_edit(rename_result) 315 | 316 | 317 | class JetBrainsCodeEditor(CodeEditor[JetBrainsSymbol]): 318 | def __init__(self, project: Project, agent: Optional["SerenaAgent"] = None) -> None: 319 | self._project = project 320 | super().__init__(project_root=project.project_root, agent=agent) 321 | 322 | class EditedFile(CodeEditor.EditedFile): 323 | def __init__(self, relative_path: str, project: Project): 324 | path = os.path.join(project.project_root, relative_path) 325 | log.info("Editing file: %s", path) 326 | with open(path, encoding=project.project_config.encoding) as f: 327 | self._content = f.read() 328 | 329 | def get_contents(self) -> str: 330 | return self._content 331 | 332 | def delete_text_between_positions(self, start_pos: PositionInFile, end_pos: PositionInFile) -> None: 333 | self._content, _ = TextUtils.delete_text_between_positions( 334 | self._content, start_pos.line, start_pos.col, end_pos.line, end_pos.col 335 | ) 336 | 337 | def insert_text_at_position(self, pos: PositionInFile, text: str) -> None: 338 | self._content, _, _ = TextUtils.insert_text_at_position(self._content, pos.line, pos.col, text) 339 | 340 | @contextmanager 341 | def _open_file_context(self, relative_path: str) -> Iterator["CodeEditor.EditedFile"]: 342 | yield self.EditedFile(relative_path, self._project) 343 | 344 | def _find_unique_symbol(self, name_path: str, relative_file_path: str) -> JetBrainsSymbol: 345 | with JetBrainsPluginClient.from_project(self._project) as client: 346 | result = client.find_symbol(name_path, relative_path=relative_file_path, include_body=False, depth=0, include_location=True) 347 | symbols = result["symbols"] 348 | if not symbols: 349 | raise ValueError(f"No symbol with name {name_path} found in file {relative_file_path}") 350 | if len(symbols) > 1: 351 | raise ValueError( 352 | f"Found multiple {len(symbols)} symbols with name {name_path} in file {relative_file_path}. " 353 | "Their locations are: \n " + json.dumps([s["location"] for s in symbols], indent=2) 354 | ) 355 | return JetBrainsSymbol(symbols[0], self._project) 356 | 357 | def rename_symbol(self, name_path: str, relative_file_path: str, new_name: str) -> list[str]: 358 | """ 359 | Renames the symbol with the given name throughout the codebase. 360 | Not yet implemented for JetBrains code editor. 361 | """ 362 | raise NotImplementedError("Symbol renaming is not yet supported for JetBrains code editor") 363 | ``` -------------------------------------------------------------------------------- /src/solidlsp/language_servers/solargraph.py: -------------------------------------------------------------------------------- ```python 1 | """ 2 | Provides Ruby specific instantiation of the LanguageServer class using Solargraph. 3 | Contains various configurations and settings specific to Ruby. 4 | """ 5 | 6 | import json 7 | import logging 8 | import os 9 | import pathlib 10 | import re 11 | import shutil 12 | import subprocess 13 | import threading 14 | 15 | from overrides import override 16 | 17 | from solidlsp.ls import SolidLanguageServer 18 | from solidlsp.ls_config import LanguageServerConfig 19 | from solidlsp.ls_logger import LanguageServerLogger 20 | from solidlsp.lsp_protocol_handler.lsp_types import InitializeParams 21 | from solidlsp.lsp_protocol_handler.server import ProcessLaunchInfo 22 | from solidlsp.settings import SolidLSPSettings 23 | 24 | 25 | class Solargraph(SolidLanguageServer): 26 | """ 27 | Provides Ruby specific instantiation of the LanguageServer class using Solargraph. 28 | Contains various configurations and settings specific to Ruby. 29 | """ 30 | 31 | def __init__( 32 | self, config: LanguageServerConfig, logger: LanguageServerLogger, repository_root_path: str, solidlsp_settings: SolidLSPSettings 33 | ): 34 | """ 35 | Creates a Solargraph instance. This class is not meant to be instantiated directly. 36 | Use LanguageServer.create() instead. 37 | """ 38 | solargraph_executable_path = self._setup_runtime_dependencies(logger, config, repository_root_path) 39 | super().__init__( 40 | config, 41 | logger, 42 | repository_root_path, 43 | ProcessLaunchInfo(cmd=f"{solargraph_executable_path} stdio", cwd=repository_root_path), 44 | "ruby", 45 | solidlsp_settings, 46 | ) 47 | # Override internal language enum for file matching (excludes .erb files) 48 | # while keeping LSP languageId as "ruby" for protocol compliance 49 | from solidlsp.ls_config import Language 50 | 51 | self.language = Language.RUBY_SOLARGRAPH 52 | self.analysis_complete = threading.Event() 53 | self.service_ready_event = threading.Event() 54 | self.initialize_searcher_command_available = threading.Event() 55 | self.resolve_main_method_available = threading.Event() 56 | 57 | # Set timeout for Solargraph requests - Bundler environments may need more time 58 | self.set_request_timeout(120.0) # 120 seconds for initialization and requests 59 | 60 | @override 61 | def is_ignored_dirname(self, dirname: str) -> bool: 62 | ruby_ignored_dirs = [ 63 | "vendor", # Ruby vendor directory 64 | ".bundle", # Bundler cache 65 | "tmp", # Temporary files 66 | "log", # Log files 67 | "coverage", # Test coverage reports 68 | ".yardoc", # YARD documentation cache 69 | "doc", # Generated documentation 70 | "node_modules", # Node modules (for Rails with JS) 71 | "storage", # Active Storage files (Rails) 72 | ] 73 | return super().is_ignored_dirname(dirname) or dirname in ruby_ignored_dirs 74 | 75 | @staticmethod 76 | def _setup_runtime_dependencies(logger: LanguageServerLogger, config: LanguageServerConfig, repository_root_path: str) -> str: 77 | """ 78 | Setup runtime dependencies for Solargraph and return the command to start the server. 79 | """ 80 | # Check if Ruby is installed 81 | try: 82 | result = subprocess.run(["ruby", "--version"], check=True, capture_output=True, cwd=repository_root_path, text=True) 83 | ruby_version = result.stdout.strip() 84 | logger.log(f"Ruby version: {ruby_version}", logging.INFO) 85 | 86 | # Extract version number for compatibility checks 87 | version_match = re.search(r"ruby (\d+)\.(\d+)\.(\d+)", ruby_version) 88 | if version_match: 89 | major, minor, patch = map(int, version_match.groups()) 90 | if major < 2 or (major == 2 and minor < 6): 91 | logger.log(f"Warning: Ruby {major}.{minor}.{patch} detected. Solargraph works best with Ruby 2.6+", logging.WARNING) 92 | 93 | except subprocess.CalledProcessError as e: 94 | error_msg = e.stderr.decode() if e.stderr else "Unknown error" 95 | raise RuntimeError( 96 | f"Error checking Ruby installation: {error_msg}. Please ensure Ruby is properly installed and in PATH." 97 | ) from e 98 | except FileNotFoundError as e: 99 | raise RuntimeError( 100 | "Ruby is not installed or not found in PATH. Please install Ruby using one of these methods:\n" 101 | " - Using rbenv: rbenv install 3.0.0 && rbenv global 3.0.0\n" 102 | " - Using RVM: rvm install 3.0.0 && rvm use 3.0.0 --default\n" 103 | " - Using asdf: asdf install ruby 3.0.0 && asdf global ruby 3.0.0\n" 104 | " - System package manager (brew install ruby, apt install ruby, etc.)" 105 | ) from e 106 | 107 | # Helper function for Windows-compatible executable search 108 | def find_executable_with_extensions(executable_name: str) -> str | None: 109 | """Find executable with Windows-specific extensions if on Windows.""" 110 | import platform 111 | 112 | if platform.system() == "Windows": 113 | for ext in [".bat", ".cmd", ".exe"]: 114 | path = shutil.which(f"{executable_name}{ext}") 115 | if path: 116 | return path 117 | return shutil.which(executable_name) 118 | else: 119 | return shutil.which(executable_name) 120 | 121 | # Check for Bundler project (Gemfile exists) 122 | gemfile_path = os.path.join(repository_root_path, "Gemfile") 123 | gemfile_lock_path = os.path.join(repository_root_path, "Gemfile.lock") 124 | is_bundler_project = os.path.exists(gemfile_path) 125 | 126 | if is_bundler_project: 127 | logger.log("Detected Bundler project (Gemfile found)", logging.INFO) 128 | 129 | # Check if bundle command is available 130 | bundle_path = find_executable_with_extensions("bundle") 131 | if not bundle_path: 132 | # Try common bundle executables 133 | for bundle_cmd in ["bin/bundle", "bundle"]: 134 | if bundle_cmd.startswith("bin/"): 135 | bundle_full_path = os.path.join(repository_root_path, bundle_cmd) 136 | else: 137 | bundle_full_path = find_executable_with_extensions(bundle_cmd) 138 | if bundle_full_path and os.path.exists(bundle_full_path): 139 | bundle_path = bundle_full_path if bundle_cmd.startswith("bin/") else bundle_cmd 140 | break 141 | 142 | if not bundle_path: 143 | raise RuntimeError( 144 | "Bundler project detected but 'bundle' command not found. Please install Bundler:\n" 145 | " - gem install bundler\n" 146 | " - Or use your Ruby version manager's bundler installation\n" 147 | " - Ensure the bundle command is in your PATH" 148 | ) 149 | 150 | # Check if solargraph is in Gemfile.lock 151 | solargraph_in_bundle = False 152 | if os.path.exists(gemfile_lock_path): 153 | try: 154 | with open(gemfile_lock_path) as f: 155 | content = f.read() 156 | solargraph_in_bundle = "solargraph" in content.lower() 157 | except Exception as e: 158 | logger.log(f"Warning: Could not read Gemfile.lock: {e}", logging.WARNING) 159 | 160 | if solargraph_in_bundle: 161 | logger.log("Found solargraph in Gemfile.lock", logging.INFO) 162 | return f"{bundle_path} exec solargraph" 163 | else: 164 | logger.log( 165 | "solargraph not found in Gemfile.lock. Please add 'gem \"solargraph\"' to your Gemfile and run 'bundle install'", 166 | logging.WARNING, 167 | ) 168 | # Fall through to global installation check 169 | 170 | # Check if solargraph is installed globally 171 | # First, try to find solargraph in PATH (includes asdf shims) with Windows support 172 | solargraph_path = find_executable_with_extensions("solargraph") 173 | if solargraph_path: 174 | logger.log(f"Found solargraph at: {solargraph_path}", logging.INFO) 175 | return solargraph_path 176 | 177 | # Fallback to gem exec (for non-Bundler projects or when global solargraph not found) 178 | if not is_bundler_project: 179 | runtime_dependencies = [ 180 | { 181 | "url": "https://rubygems.org/downloads/solargraph-0.51.1.gem", 182 | "installCommand": "gem install solargraph -v 0.51.1", 183 | "binaryName": "solargraph", 184 | "archiveType": "gem", 185 | } 186 | ] 187 | 188 | dependency = runtime_dependencies[0] 189 | try: 190 | result = subprocess.run( 191 | ["gem", "list", "^solargraph$", "-i"], check=False, capture_output=True, text=True, cwd=repository_root_path 192 | ) 193 | if result.stdout.strip() == "false": 194 | logger.log("Installing Solargraph...", logging.INFO) 195 | subprocess.run(dependency["installCommand"].split(), check=True, capture_output=True, cwd=repository_root_path) 196 | 197 | return "gem exec solargraph" 198 | except subprocess.CalledProcessError as e: 199 | error_msg = e.stderr.decode() if e.stderr else str(e) 200 | raise RuntimeError( 201 | f"Failed to check or install Solargraph: {error_msg}\nPlease try installing manually: gem install solargraph" 202 | ) from e 203 | else: 204 | raise RuntimeError( 205 | "This appears to be a Bundler project, but solargraph is not available. " 206 | "Please add 'gem \"solargraph\"' to your Gemfile and run 'bundle install'." 207 | ) 208 | 209 | @staticmethod 210 | def _detect_rails_project(repository_root_path: str) -> bool: 211 | """ 212 | Detect if this is a Rails project by checking for Rails-specific files. 213 | """ 214 | rails_indicators = [ 215 | "config/application.rb", 216 | "config/environment.rb", 217 | "app/controllers/application_controller.rb", 218 | "Rakefile", 219 | ] 220 | 221 | for indicator in rails_indicators: 222 | if os.path.exists(os.path.join(repository_root_path, indicator)): 223 | return True 224 | 225 | # Check for Rails in Gemfile 226 | gemfile_path = os.path.join(repository_root_path, "Gemfile") 227 | if os.path.exists(gemfile_path): 228 | try: 229 | with open(gemfile_path) as f: 230 | content = f.read().lower() 231 | if "gem 'rails'" in content or 'gem "rails"' in content: 232 | return True 233 | except Exception: 234 | pass 235 | 236 | return False 237 | 238 | @staticmethod 239 | def _get_ruby_exclude_patterns(repository_root_path: str) -> list[str]: 240 | """ 241 | Get Ruby and Rails-specific exclude patterns for better performance. 242 | """ 243 | base_patterns = [ 244 | "**/vendor/**", # Ruby vendor directory (similar to node_modules) 245 | "**/.bundle/**", # Bundler cache 246 | "**/tmp/**", # Temporary files 247 | "**/log/**", # Log files 248 | "**/coverage/**", # Test coverage reports 249 | "**/.yardoc/**", # YARD documentation cache 250 | "**/doc/**", # Generated documentation 251 | "**/.git/**", # Git directory 252 | "**/node_modules/**", # Node modules (for Rails with JS) 253 | "**/public/assets/**", # Rails compiled assets 254 | ] 255 | 256 | # Add Rails-specific patterns if this is a Rails project 257 | if Solargraph._detect_rails_project(repository_root_path): 258 | rails_patterns = [ 259 | "**/public/packs/**", # Webpacker output 260 | "**/public/webpack/**", # Webpack output 261 | "**/storage/**", # Active Storage files 262 | "**/tmp/cache/**", # Rails cache 263 | "**/tmp/pids/**", # Process IDs 264 | "**/tmp/sessions/**", # Session files 265 | "**/tmp/sockets/**", # Socket files 266 | "**/db/*.sqlite3", # SQLite databases 267 | ] 268 | base_patterns.extend(rails_patterns) 269 | 270 | return base_patterns 271 | 272 | @staticmethod 273 | def _get_initialize_params(repository_absolute_path: str) -> InitializeParams: 274 | """ 275 | Returns the initialize params for the Solargraph Language Server. 276 | """ 277 | root_uri = pathlib.Path(repository_absolute_path).as_uri() 278 | exclude_patterns = Solargraph._get_ruby_exclude_patterns(repository_absolute_path) 279 | 280 | initialize_params: InitializeParams = { # type: ignore 281 | "processId": os.getpid(), 282 | "rootPath": repository_absolute_path, 283 | "rootUri": root_uri, 284 | "initializationOptions": { 285 | "exclude": exclude_patterns, 286 | }, 287 | "capabilities": { 288 | "workspace": { 289 | "workspaceEdit": {"documentChanges": True}, 290 | }, 291 | "textDocument": { 292 | "documentSymbol": { 293 | "hierarchicalDocumentSymbolSupport": True, 294 | "symbolKind": {"valueSet": list(range(1, 27))}, 295 | }, 296 | }, 297 | }, 298 | "trace": "verbose", 299 | "workspaceFolders": [ 300 | { 301 | "uri": root_uri, 302 | "name": os.path.basename(repository_absolute_path), 303 | } 304 | ], 305 | } 306 | return initialize_params 307 | 308 | def _start_server(self): 309 | """ 310 | Starts the Solargraph Language Server for Ruby 311 | """ 312 | 313 | def register_capability_handler(params): 314 | assert "registrations" in params 315 | for registration in params["registrations"]: 316 | if registration["method"] == "workspace/executeCommand": 317 | self.initialize_searcher_command_available.set() 318 | self.resolve_main_method_available.set() 319 | return 320 | 321 | def lang_status_handler(params): 322 | self.logger.log(f"LSP: language/status: {params}", logging.INFO) 323 | if params.get("type") == "ServiceReady" and params.get("message") == "Service is ready.": 324 | self.logger.log("Solargraph service is ready.", logging.INFO) 325 | self.analysis_complete.set() 326 | self.completions_available.set() 327 | 328 | def execute_client_command_handler(params): 329 | return [] 330 | 331 | def do_nothing(params): 332 | return 333 | 334 | def window_log_message(msg): 335 | self.logger.log(f"LSP: window/logMessage: {msg}", logging.INFO) 336 | 337 | self.server.on_request("client/registerCapability", register_capability_handler) 338 | self.server.on_notification("language/status", lang_status_handler) 339 | self.server.on_notification("window/logMessage", window_log_message) 340 | self.server.on_request("workspace/executeClientCommand", execute_client_command_handler) 341 | self.server.on_notification("$/progress", do_nothing) 342 | self.server.on_notification("textDocument/publishDiagnostics", do_nothing) 343 | self.server.on_notification("language/actionableNotification", do_nothing) 344 | 345 | self.logger.log("Starting solargraph server process", logging.INFO) 346 | self.server.start() 347 | initialize_params = self._get_initialize_params(self.repository_root_path) 348 | 349 | self.logger.log( 350 | "Sending initialize request from LSP client to LSP server and awaiting response", 351 | logging.INFO, 352 | ) 353 | self.logger.log(f"Sending init params: {json.dumps(initialize_params, indent=4)}", logging.INFO) 354 | init_response = self.server.send.initialize(initialize_params) 355 | self.logger.log(f"Received init response: {init_response}", logging.INFO) 356 | assert init_response["capabilities"]["textDocumentSync"] == 2 357 | assert "completionProvider" in init_response["capabilities"] 358 | assert init_response["capabilities"]["completionProvider"] == { 359 | "resolveProvider": True, 360 | "triggerCharacters": [".", ":", "@"], 361 | } 362 | self.server.notify.initialized({}) 363 | 364 | # Wait for Solargraph to complete its initial workspace analysis 365 | # This prevents issues by ensuring background tasks finish 366 | self.logger.log("Waiting for Solargraph to complete initial workspace analysis...", logging.INFO) 367 | if self.analysis_complete.wait(timeout=60.0): 368 | self.logger.log("Solargraph initial analysis complete, server ready", logging.INFO) 369 | else: 370 | self.logger.log("Timeout waiting for Solargraph analysis completion, proceeding anyway", logging.WARNING) 371 | # Fallback: assume analysis is complete after timeout 372 | self.analysis_complete.set() 373 | self.completions_available.set() 374 | ``` -------------------------------------------------------------------------------- /test/solidlsp/zig/test_zig_basic.py: -------------------------------------------------------------------------------- ```python 1 | """ 2 | Basic integration tests for Zig language server functionality. 3 | 4 | These tests validate symbol finding and navigation capabilities using the Zig Language Server (ZLS). 5 | Note: ZLS requires files to be open in the editor to find cross-file references (performance optimization). 6 | """ 7 | 8 | import os 9 | import sys 10 | 11 | import pytest 12 | 13 | from solidlsp import SolidLanguageServer 14 | from solidlsp.ls_config import Language 15 | from solidlsp.ls_types import SymbolKind 16 | 17 | 18 | @pytest.mark.zig 19 | @pytest.mark.skipif( 20 | sys.platform == "win32", reason="ZLS is disabled on Windows - cross-file references don't work reliably. Reason unknown." 21 | ) 22 | class TestZigLanguageServer: 23 | """Test Zig language server symbol finding and navigation capabilities. 24 | 25 | NOTE: All tests are skipped on Windows as ZLS is disabled on that platform 26 | due to unreliable cross-file reference functionality. Reason unknown. 27 | """ 28 | 29 | @pytest.mark.parametrize("language_server", [Language.ZIG], indirect=True) 30 | def test_find_symbols_in_main(self, language_server: SolidLanguageServer) -> None: 31 | """Test finding specific symbols in main.zig.""" 32 | file_path = os.path.join("src", "main.zig") 33 | symbols = language_server.request_document_symbols(file_path) 34 | 35 | assert symbols is not None 36 | assert len(symbols) > 0 37 | 38 | # Extract symbol names from the returned structure 39 | symbol_list = symbols[0] if isinstance(symbols, tuple) else symbols 40 | symbol_names = {sym.get("name") for sym in symbol_list if isinstance(sym, dict)} 41 | 42 | # Verify specific symbols exist 43 | assert "main" in symbol_names, "main function not found" 44 | assert "greeting" in symbol_names, "greeting function not found" 45 | 46 | @pytest.mark.parametrize("language_server", [Language.ZIG], indirect=True) 47 | def test_find_symbols_in_calculator(self, language_server: SolidLanguageServer) -> None: 48 | """Test finding Calculator struct and its methods.""" 49 | file_path = os.path.join("src", "calculator.zig") 50 | symbols = language_server.request_document_symbols(file_path) 51 | 52 | assert symbols is not None 53 | assert len(symbols) > 0 54 | 55 | symbol_list = symbols[0] if isinstance(symbols, tuple) else symbols 56 | 57 | # Find Calculator struct 58 | calculator_symbol = None 59 | for sym in symbol_list: 60 | if sym.get("name") == "Calculator": 61 | calculator_symbol = sym 62 | break 63 | 64 | assert calculator_symbol is not None, "Calculator struct not found" 65 | # ZLS may use different symbol kinds for structs (14 = Namespace, 5 = Class, 23 = Struct) 66 | assert calculator_symbol.get("kind") in [ 67 | SymbolKind.Class, 68 | SymbolKind.Struct, 69 | SymbolKind.Namespace, 70 | 5, 71 | 14, 72 | 23, 73 | ], "Calculator should be a struct/class/namespace" 74 | 75 | # Check for Calculator methods (init, add, subtract, etc.) 76 | # Methods might be in children or at the same level 77 | all_symbols = [] 78 | for sym in symbol_list: 79 | all_symbols.append(sym.get("name")) 80 | if "children" in sym: 81 | for child in sym["children"]: 82 | all_symbols.append(child.get("name")) 83 | 84 | # Verify exact calculator methods exist 85 | expected_methods = {"init", "add", "subtract", "multiply", "divide"} 86 | found_methods = set(all_symbols) & expected_methods 87 | assert found_methods == expected_methods, f"Expected exactly {expected_methods}, found: {found_methods}" 88 | 89 | @pytest.mark.parametrize("language_server", [Language.ZIG], indirect=True) 90 | def test_find_symbols_in_math_utils(self, language_server: SolidLanguageServer) -> None: 91 | """Test finding functions in math_utils.zig.""" 92 | file_path = os.path.join("src", "math_utils.zig") 93 | symbols = language_server.request_document_symbols(file_path) 94 | 95 | assert symbols is not None 96 | assert len(symbols) > 0 97 | 98 | symbol_list = symbols[0] if isinstance(symbols, tuple) else symbols 99 | symbol_names = {sym.get("name") for sym in symbol_list if isinstance(sym, dict)} 100 | 101 | # Verify math utility functions exist 102 | assert "factorial" in symbol_names, "factorial function not found" 103 | assert "isPrime" in symbol_names, "isPrime function not found" 104 | 105 | @pytest.mark.parametrize("language_server", [Language.ZIG], indirect=True) 106 | def test_find_references_within_file(self, language_server: SolidLanguageServer) -> None: 107 | """Test finding references within the same file.""" 108 | file_path = os.path.join("src", "calculator.zig") 109 | symbols = language_server.request_document_symbols(file_path) 110 | 111 | symbol_list = symbols[0] if isinstance(symbols, tuple) else symbols 112 | 113 | # Find Calculator struct 114 | calculator_symbol = None 115 | for sym in symbol_list: 116 | if sym.get("name") == "Calculator": 117 | calculator_symbol = sym 118 | break 119 | 120 | assert calculator_symbol is not None, "Calculator struct not found" 121 | 122 | # Find references to Calculator within the same file 123 | sel_range = calculator_symbol.get("selectionRange", calculator_symbol.get("range")) 124 | assert sel_range is not None, "Calculator symbol has no range information" 125 | 126 | sel_start = sel_range["start"] 127 | refs = language_server.request_references(file_path, sel_start["line"], sel_start["character"]) 128 | 129 | assert refs is not None 130 | assert isinstance(refs, list) 131 | # ZLS finds references within the same file 132 | # Calculator is used in 4 test usages (lines 45, 51, 57, 63) 133 | # Note: ZLS may not include the declaration itself as a reference 134 | assert len(refs) >= 4, f"Should find at least 4 Calculator references within calculator.zig, found {len(refs)}" 135 | 136 | # Verify we found the test usages 137 | ref_lines = sorted([ref["range"]["start"]["line"] for ref in refs]) 138 | test_lines = [44, 50, 56, 62] # 0-indexed: tests at lines 45, 51, 57, 63 139 | for line in test_lines: 140 | assert line in ref_lines, f"Should find Calculator reference at line {line + 1}, found at lines {[l + 1 for l in ref_lines]}" 141 | 142 | @pytest.mark.parametrize("language_server", [Language.ZIG], indirect=True) 143 | @pytest.mark.skipif( 144 | sys.platform == "win32", reason="ZLS cross-file references don't work reliably on Windows - URI path handling issues" 145 | ) 146 | def test_cross_file_references_with_open_files(self, language_server: SolidLanguageServer) -> None: 147 | """ 148 | Test finding cross-file references with files open. 149 | 150 | ZLS limitation: Cross-file references (textDocument/references) only work when 151 | target files are open. This is a performance optimization in ZLS. 152 | 153 | NOTE: Disabled on Windows as cross-file references cannot be made to work reliably 154 | due to URI path handling differences between Windows and Unix systems. 155 | """ 156 | import time 157 | 158 | # Open the files that contain references to enable cross-file search 159 | with language_server.open_file("build.zig"): 160 | with language_server.open_file(os.path.join("src", "main.zig")): 161 | with language_server.open_file(os.path.join("src", "calculator.zig")): 162 | # Give ZLS a moment to analyze the open files 163 | time.sleep(1) 164 | 165 | # Find Calculator struct 166 | symbols = language_server.request_document_symbols(os.path.join("src", "calculator.zig")) 167 | symbol_list = symbols[0] if isinstance(symbols, tuple) else symbols 168 | 169 | calculator_symbol = None 170 | for sym in symbol_list: 171 | if sym.get("name") == "Calculator": 172 | calculator_symbol = sym 173 | break 174 | 175 | assert calculator_symbol is not None, "Calculator struct not found" 176 | 177 | sel_range = calculator_symbol.get("selectionRange", calculator_symbol.get("range")) 178 | assert sel_range is not None, "Calculator symbol has no range information" 179 | 180 | # Find references to Calculator 181 | sel_start = sel_range["start"] 182 | refs = language_server.request_references( 183 | os.path.join("src", "calculator.zig"), sel_start["line"], sel_start["character"] 184 | ) 185 | 186 | assert refs is not None 187 | assert isinstance(refs, list) 188 | 189 | # With files open, ZLS should find cross-file references 190 | main_refs = [ref for ref in refs if "main.zig" in ref.get("uri", "")] 191 | 192 | assert len(main_refs) >= 1, f"Should find at least 1 Calculator reference in main.zig, found {len(main_refs)}" 193 | 194 | # Verify exact location in main.zig (line 8, 0-indexed: 7) 195 | main_ref_line = main_refs[0]["range"]["start"]["line"] 196 | assert ( 197 | main_ref_line == 7 198 | ), f"Calculator reference in main.zig should be at line 8 (0-indexed: 7), found at line {main_ref_line + 1}" 199 | 200 | @pytest.mark.parametrize("language_server", [Language.ZIG], indirect=True) 201 | def test_cross_file_references_within_file(self, language_server: SolidLanguageServer) -> None: 202 | """ 203 | Test that ZLS finds references within the same file. 204 | 205 | Note: ZLS is designed to be lightweight and only analyzes files that are explicitly opened. 206 | Cross-file references require manually opening the relevant files first. 207 | """ 208 | # Find references to Calculator from calculator.zig 209 | file_path = os.path.join("src", "calculator.zig") 210 | symbols = language_server.request_document_symbols(file_path) 211 | symbol_list = symbols[0] if isinstance(symbols, tuple) else symbols 212 | 213 | calculator_symbol = None 214 | for sym in symbol_list: 215 | if sym.get("name") == "Calculator": 216 | calculator_symbol = sym 217 | break 218 | 219 | assert calculator_symbol is not None, "Calculator struct not found" 220 | 221 | sel_range = calculator_symbol.get("selectionRange", calculator_symbol.get("range")) 222 | assert sel_range is not None, "Calculator symbol has no range information" 223 | 224 | sel_start = sel_range["start"] 225 | refs = language_server.request_references(file_path, sel_start["line"], sel_start["character"]) 226 | 227 | assert refs is not None 228 | assert isinstance(refs, list) 229 | 230 | # ZLS finds references within the same file 231 | # Calculator is used in 4 test usages (lines 45, 51, 57, 63) 232 | # Note: ZLS may not include the declaration itself as a reference 233 | assert len(refs) >= 4, f"Should find at least 4 Calculator references within calculator.zig, found {len(refs)}" 234 | 235 | # Verify we found the test usages 236 | ref_lines = sorted([ref["range"]["start"]["line"] for ref in refs]) 237 | test_lines = [44, 50, 56, 62] # 0-indexed: tests at lines 45, 51, 57, 63 238 | for line in test_lines: 239 | assert line in ref_lines, f"Should find Calculator reference at line {line + 1}, found at lines {[l + 1 for l in ref_lines]}" 240 | 241 | @pytest.mark.parametrize("language_server", [Language.ZIG], indirect=True) 242 | @pytest.mark.skipif( 243 | sys.platform == "win32", reason="ZLS cross-file references don't work reliably on Windows - URI path handling issues" 244 | ) 245 | def test_go_to_definition_cross_file(self, language_server: SolidLanguageServer) -> None: 246 | """ 247 | Test go-to-definition from main.zig to calculator.zig. 248 | 249 | ZLS capability: Go-to-definition (textDocument/definition) works cross-file 250 | WITHOUT requiring files to be open. 251 | 252 | NOTE: Disabled on Windows as cross-file references cannot be made to work reliably 253 | due to URI path handling differences between Windows and Unix systems. 254 | """ 255 | file_path = os.path.join("src", "main.zig") 256 | 257 | # Line 8: const calc = calculator.Calculator.init(); 258 | # Test go-to-definition for Calculator 259 | definitions = language_server.request_definition(file_path, 7, 25) # Position of "Calculator" 260 | 261 | assert definitions is not None 262 | assert isinstance(definitions, list) 263 | assert len(definitions) > 0, "Should find definition of Calculator" 264 | 265 | # Should point to calculator.zig 266 | calc_def = definitions[0] 267 | assert "calculator.zig" in calc_def.get("uri", ""), "Definition should be in calculator.zig" 268 | 269 | @pytest.mark.parametrize("language_server", [Language.ZIG], indirect=True) 270 | @pytest.mark.skipif( 271 | sys.platform == "win32", reason="ZLS cross-file references don't work reliably on Windows - URI path handling issues" 272 | ) 273 | def test_cross_file_function_usage(self, language_server: SolidLanguageServer) -> None: 274 | """Test finding usage of functions from math_utils in main.zig. 275 | 276 | NOTE: Disabled on Windows as cross-file references cannot be made to work reliably 277 | due to URI path handling differences between Windows and Unix systems. 278 | """ 279 | # Line 23 in main.zig: const factorial_result = math_utils.factorial(5); 280 | definitions = language_server.request_definition(os.path.join("src", "main.zig"), 22, 40) # Position of "factorial" 281 | 282 | assert definitions is not None 283 | assert isinstance(definitions, list) 284 | 285 | if len(definitions) > 0: 286 | # Should find factorial definition in math_utils.zig 287 | math_def = [d for d in definitions if "math_utils.zig" in d.get("uri", "")] 288 | assert len(math_def) > 0, "Should find factorial definition in math_utils.zig" 289 | 290 | @pytest.mark.parametrize("language_server", [Language.ZIG], indirect=True) 291 | def test_verify_cross_file_imports(self, language_server: SolidLanguageServer) -> None: 292 | """Verify that our test files have proper cross-file imports.""" 293 | # Verify main.zig imports 294 | main_symbols = language_server.request_document_symbols(os.path.join("src", "main.zig")) 295 | assert main_symbols is not None 296 | main_list = main_symbols[0] if isinstance(main_symbols, tuple) else main_symbols 297 | main_names = {sym.get("name") for sym in main_list if isinstance(sym, dict)} 298 | 299 | # main.zig should have main and greeting functions 300 | assert "main" in main_names, "main function should be in main.zig" 301 | assert "greeting" in main_names, "greeting function should be in main.zig" 302 | 303 | # Verify calculator.zig exports Calculator 304 | calc_symbols = language_server.request_document_symbols(os.path.join("src", "calculator.zig")) 305 | assert calc_symbols is not None 306 | calc_list = calc_symbols[0] if isinstance(calc_symbols, tuple) else calc_symbols 307 | calc_names = {sym.get("name") for sym in calc_list if isinstance(sym, dict)} 308 | assert "Calculator" in calc_names, "Calculator struct should be in calculator.zig" 309 | 310 | # Verify math_utils.zig exports functions 311 | math_symbols = language_server.request_document_symbols(os.path.join("src", "math_utils.zig")) 312 | assert math_symbols is not None 313 | math_list = math_symbols[0] if isinstance(math_symbols, tuple) else math_symbols 314 | math_names = {sym.get("name") for sym in math_list if isinstance(sym, dict)} 315 | assert "factorial" in math_names, "factorial function should be in math_utils.zig" 316 | assert "isPrime" in math_names, "isPrime function should be in math_utils.zig" 317 | 318 | @pytest.mark.parametrize("language_server", [Language.ZIG], indirect=True) 319 | def test_hover_information(self, language_server: SolidLanguageServer) -> None: 320 | """Test hover information for symbols.""" 321 | file_path = os.path.join("src", "main.zig") 322 | 323 | # Get hover info for the main function 324 | hover_info = language_server.request_hover(file_path, 4, 8) # Position of "main" function 325 | 326 | assert hover_info is not None, "Should provide hover information for main function" 327 | 328 | # Hover info could be a dict with 'contents' or a string 329 | if isinstance(hover_info, dict): 330 | assert "contents" in hover_info or "value" in hover_info, "Hover should have contents" 331 | 332 | @pytest.mark.parametrize("language_server", [Language.ZIG], indirect=True) 333 | def test_full_symbol_tree(self, language_server: SolidLanguageServer) -> None: 334 | """Test that full symbol tree is not empty.""" 335 | symbols = language_server.request_full_symbol_tree() 336 | 337 | assert symbols is not None 338 | assert len(symbols) > 0, "Symbol tree should not be empty" 339 | 340 | # The tree should have at least one root node 341 | root = symbols[0] 342 | assert isinstance(root, dict), "Root should be a dict" 343 | assert "name" in root, "Root should have a name" 344 | ``` -------------------------------------------------------------------------------- /test/solidlsp/elixir/test_elixir_symbol_retrieval.py: -------------------------------------------------------------------------------- ```python 1 | """ 2 | Tests for the Elixir language server symbol-related functionality. 3 | 4 | These tests focus on the following methods: 5 | - request_containing_symbol 6 | - request_referencing_symbols 7 | - request_defining_symbol 8 | """ 9 | 10 | import os 11 | 12 | import pytest 13 | 14 | from solidlsp import SolidLanguageServer 15 | from solidlsp.ls_config import Language 16 | from solidlsp.ls_types import SymbolKind 17 | 18 | from . import NEXTLS_UNAVAILABLE, NEXTLS_UNAVAILABLE_REASON 19 | 20 | # These marks will be applied to all tests in this module 21 | pytestmark = [pytest.mark.elixir, pytest.mark.skipif(NEXTLS_UNAVAILABLE, reason=f"Next LS not available: {NEXTLS_UNAVAILABLE_REASON}")] 22 | 23 | 24 | class TestElixirLanguageServerSymbols: 25 | """Test the Elixir language server's symbol-related functionality.""" 26 | 27 | @pytest.mark.parametrize("language_server", [Language.ELIXIR], indirect=True) 28 | def test_request_containing_symbol_function(self, language_server: SolidLanguageServer) -> None: 29 | """Test request_containing_symbol for a function.""" 30 | # Test for a position inside the create_user function 31 | file_path = os.path.join("lib", "services.ex") 32 | 33 | # Find the create_user function in the file 34 | content = language_server.retrieve_full_file_content(file_path) 35 | lines = content.split("\n") 36 | create_user_line = None 37 | for i, line in enumerate(lines): 38 | if "def create_user(" in line: 39 | create_user_line = i + 2 # Go inside the function body 40 | break 41 | 42 | if create_user_line is None: 43 | pytest.skip("Could not find create_user function") 44 | 45 | containing_symbol = language_server.request_containing_symbol(file_path, create_user_line, 10, include_body=True) 46 | 47 | # Verify that we found the containing symbol 48 | if containing_symbol: 49 | # Next LS returns the full function signature instead of just the function name 50 | assert containing_symbol["name"] == "def create_user(pid, id, name, email, roles \\\\ [])" 51 | assert containing_symbol["kind"] == SymbolKind.Method or containing_symbol["kind"] == SymbolKind.Function 52 | if "body" in containing_symbol: 53 | assert "def create_user" in containing_symbol["body"] 54 | 55 | @pytest.mark.parametrize("language_server", [Language.ELIXIR], indirect=True) 56 | def test_request_containing_symbol_module(self, language_server: SolidLanguageServer) -> None: 57 | """Test request_containing_symbol for a module.""" 58 | # Test for a position inside the UserService module but outside any function 59 | file_path = os.path.join("lib", "services.ex") 60 | 61 | # Find the UserService module definition 62 | content = language_server.retrieve_full_file_content(file_path) 63 | lines = content.split("\n") 64 | user_service_line = None 65 | for i, line in enumerate(lines): 66 | if "defmodule UserService do" in line: 67 | user_service_line = i + 1 # Go inside the module 68 | break 69 | 70 | if user_service_line is None: 71 | pytest.skip("Could not find UserService module") 72 | 73 | containing_symbol = language_server.request_containing_symbol(file_path, user_service_line, 5) 74 | 75 | # Verify that we found the containing symbol 76 | if containing_symbol: 77 | assert "UserService" in containing_symbol["name"] 78 | assert containing_symbol["kind"] == SymbolKind.Module or containing_symbol["kind"] == SymbolKind.Class 79 | 80 | @pytest.mark.parametrize("language_server", [Language.ELIXIR], indirect=True) 81 | def test_request_containing_symbol_nested(self, language_server: SolidLanguageServer) -> None: 82 | """Test request_containing_symbol with nested scopes.""" 83 | # Test for a position inside a function which is inside a module 84 | file_path = os.path.join("lib", "services.ex") 85 | 86 | # Find a function inside UserService 87 | content = language_server.retrieve_full_file_content(file_path) 88 | lines = content.split("\n") 89 | function_body_line = None 90 | for i, line in enumerate(lines): 91 | if "def create_user(" in line: 92 | function_body_line = i + 3 # Go deeper into the function body 93 | break 94 | 95 | if function_body_line is None: 96 | pytest.skip("Could not find function body") 97 | 98 | containing_symbol = language_server.request_containing_symbol(file_path, function_body_line, 15) 99 | 100 | # Verify that we found the innermost containing symbol (the function) 101 | if containing_symbol: 102 | expected_names = ["create_user", "UserService"] 103 | assert any(name in containing_symbol["name"] for name in expected_names) 104 | 105 | @pytest.mark.parametrize("language_server", [Language.ELIXIR], indirect=True) 106 | def test_request_containing_symbol_none(self, language_server: SolidLanguageServer) -> None: 107 | """Test request_containing_symbol for a position with no containing symbol.""" 108 | # Test for a position outside any function/module (e.g., in module doc) 109 | file_path = os.path.join("lib", "services.ex") 110 | # Line 1-3 are likely in module documentation or imports 111 | containing_symbol = language_server.request_containing_symbol(file_path, 2, 10) 112 | 113 | # Should return None or an empty dictionary, or the top-level module 114 | # This is acceptable behavior for module-level positions 115 | assert containing_symbol is None or containing_symbol == {} or "TestRepo.Services" in str(containing_symbol) 116 | 117 | @pytest.mark.parametrize("language_server", [Language.ELIXIR], indirect=True) 118 | def test_request_referencing_symbols_struct(self, language_server: SolidLanguageServer) -> None: 119 | """Test request_referencing_symbols for a struct.""" 120 | # Test referencing symbols for User struct 121 | file_path = os.path.join("lib", "models.ex") 122 | 123 | symbols = language_server.request_document_symbols(file_path) 124 | user_symbol = None 125 | for symbol_group in symbols: 126 | user_symbol = next((s for s in symbol_group if "User" in s.get("name", "")), None) 127 | if user_symbol: 128 | break 129 | 130 | if not user_symbol or "selectionRange" not in user_symbol: 131 | pytest.skip("User symbol or its selectionRange not found") 132 | 133 | sel_start = user_symbol["selectionRange"]["start"] 134 | ref_symbols = [ 135 | ref.symbol for ref in language_server.request_referencing_symbols(file_path, sel_start["line"], sel_start["character"]) 136 | ] 137 | 138 | if ref_symbols: 139 | services_references = [ 140 | symbol 141 | for symbol in ref_symbols 142 | if "location" in symbol and "uri" in symbol["location"] and "services.ex" in symbol["location"]["uri"] 143 | ] 144 | # We expect some references from services.ex 145 | assert len(services_references) >= 0 # At least attempt to find references 146 | 147 | @pytest.mark.parametrize("language_server", [Language.ELIXIR], indirect=True) 148 | def test_request_referencing_symbols_none(self, language_server: SolidLanguageServer) -> None: 149 | """Test request_referencing_symbols for a position with no symbol.""" 150 | file_path = os.path.join("lib", "services.ex") 151 | # Line 3 is likely a blank line or comment 152 | try: 153 | ref_symbols = [ref.symbol for ref in language_server.request_referencing_symbols(file_path, 3, 0)] 154 | # If we get here, make sure we got an empty result 155 | assert ref_symbols == [] or ref_symbols is None 156 | except Exception: 157 | # The method might raise an exception for invalid positions 158 | # which is acceptable behavior 159 | pass 160 | 161 | # Tests for request_defining_symbol 162 | @pytest.mark.parametrize("language_server", [Language.ELIXIR], indirect=True) 163 | def test_request_defining_symbol_function_call(self, language_server: SolidLanguageServer) -> None: 164 | """Test request_defining_symbol for a function call.""" 165 | # Find a place where User.new is called in services.ex 166 | file_path = os.path.join("lib", "services.ex") 167 | content = language_server.retrieve_full_file_content(file_path) 168 | lines = content.split("\n") 169 | user_new_call_line = None 170 | for i, line in enumerate(lines): 171 | if "User.new(" in line: 172 | user_new_call_line = i 173 | break 174 | 175 | if user_new_call_line is None: 176 | pytest.skip("Could not find User.new call") 177 | 178 | # Try to find the definition of User.new 179 | defining_symbol = language_server.request_defining_symbol(file_path, user_new_call_line, 15) 180 | 181 | if defining_symbol: 182 | assert defining_symbol.get("name") == "new" or "User" in defining_symbol.get("name", "") 183 | if "location" in defining_symbol and "uri" in defining_symbol["location"]: 184 | assert "models.ex" in defining_symbol["location"]["uri"] 185 | 186 | @pytest.mark.parametrize("language_server", [Language.ELIXIR], indirect=True) 187 | def test_request_defining_symbol_struct_usage(self, language_server: SolidLanguageServer) -> None: 188 | """Test request_defining_symbol for a struct usage.""" 189 | # Find a place where User struct is used in services.ex 190 | file_path = os.path.join("lib", "services.ex") 191 | content = language_server.retrieve_full_file_content(file_path) 192 | lines = content.split("\n") 193 | user_usage_line = None 194 | for i, line in enumerate(lines): 195 | if "alias TestRepo.Models.{User" in line: 196 | user_usage_line = i 197 | break 198 | 199 | if user_usage_line is None: 200 | pytest.skip("Could not find User struct usage") 201 | 202 | defining_symbol = language_server.request_defining_symbol(file_path, user_usage_line, 30) 203 | 204 | if defining_symbol: 205 | assert "User" in defining_symbol.get("name", "") 206 | 207 | @pytest.mark.xfail( 208 | reason="Known intermittent bug in Next LS v0.23.3: Protocol.UndefinedError for :timeout atom. " 209 | "Occurs in CI environments but may pass locally. " 210 | "See https://github.com/elixir-tools/next-ls/issues/543", 211 | strict=False, 212 | ) 213 | @pytest.mark.parametrize("language_server", [Language.ELIXIR], indirect=True) 214 | def test_request_defining_symbol_none(self, language_server: SolidLanguageServer) -> None: 215 | """Test request_defining_symbol for a position with no symbol.""" 216 | # Test for a position with no symbol (e.g., whitespace or comment) 217 | file_path = os.path.join("lib", "services.ex") 218 | # Line 3 is likely a blank line 219 | defining_symbol = language_server.request_defining_symbol(file_path, 3, 0) 220 | 221 | # Should return None or empty 222 | assert defining_symbol is None or defining_symbol == {} 223 | 224 | @pytest.mark.parametrize("language_server", [Language.ELIXIR], indirect=True) 225 | def test_symbol_methods_integration(self, language_server: SolidLanguageServer) -> None: 226 | """Test integration between different symbol methods.""" 227 | file_path = os.path.join("lib", "models.ex") 228 | 229 | # Find User struct definition 230 | content = language_server.retrieve_full_file_content(file_path) 231 | lines = content.split("\n") 232 | user_struct_line = None 233 | for i, line in enumerate(lines): 234 | if "defmodule User do" in line: 235 | user_struct_line = i 236 | break 237 | 238 | if user_struct_line is None: 239 | pytest.skip("Could not find User struct") 240 | 241 | # Test containing symbol 242 | containing = language_server.request_containing_symbol(file_path, user_struct_line + 5, 10) 243 | 244 | if containing: 245 | # Test that we can find references to this symbol 246 | if "location" in containing and "range" in containing["location"]: 247 | start_pos = containing["location"]["range"]["start"] 248 | refs = [ 249 | ref.symbol for ref in language_server.request_referencing_symbols(file_path, start_pos["line"], start_pos["character"]) 250 | ] 251 | # We should find some references or none (both are valid outcomes) 252 | assert isinstance(refs, list) 253 | 254 | @pytest.mark.parametrize("language_server", [Language.ELIXIR], indirect=True) 255 | def test_symbol_tree_structure(self, language_server: SolidLanguageServer) -> None: 256 | """Test that symbol tree structure is correctly built.""" 257 | symbol_tree = language_server.request_full_symbol_tree() 258 | 259 | # Should get a tree structure 260 | assert len(symbol_tree) > 0 261 | 262 | # Should have our test repository structure 263 | root = symbol_tree[0] 264 | assert "children" in root 265 | 266 | # Look for lib directory 267 | lib_dir = None 268 | for child in root["children"]: 269 | if child["name"] == "lib": 270 | lib_dir = child 271 | break 272 | 273 | if lib_dir: 274 | # Next LS returns module names instead of file names (e.g., 'services' instead of 'services.ex') 275 | file_names = [child["name"] for child in lib_dir.get("children", [])] 276 | expected_modules = ["models", "services", "examples", "utils", "test_repo"] 277 | found_modules = [name for name in expected_modules if name in file_names] 278 | assert len(found_modules) > 0, f"Expected to find some modules from {expected_modules}, but got {file_names}" 279 | 280 | @pytest.mark.parametrize("language_server", [Language.ELIXIR], indirect=True) 281 | def test_request_dir_overview(self, language_server: SolidLanguageServer) -> None: 282 | """Test request_dir_overview functionality.""" 283 | lib_overview = language_server.request_dir_overview("lib") 284 | 285 | # Should get an overview of the lib directory 286 | assert lib_overview is not None 287 | # Next LS returns keys like 'lib/services.ex' instead of just 'lib' 288 | overview_keys = list(lib_overview.keys()) if hasattr(lib_overview, "keys") else [] 289 | lib_files = [key for key in overview_keys if key.startswith("lib/")] 290 | assert len(lib_files) > 0, f"Expected to find lib/ files in overview keys: {overview_keys}" 291 | 292 | # Should contain information about our modules 293 | overview_text = str(lib_overview).lower() 294 | expected_terms = ["models", "services", "user", "item"] 295 | found_terms = [term for term in expected_terms if term in overview_text] 296 | assert len(found_terms) > 0, f"Expected to find some terms from {expected_terms} in overview" 297 | 298 | # @pytest.mark.parametrize("language_server", [Language.ELIXIR], indirect=True) 299 | # def test_request_document_overview(self, language_server: SolidLanguageServer) -> None: 300 | # """Test request_document_overview functionality.""" 301 | # # COMMENTED OUT: Next LS document overview doesn't contain expected terms 302 | # # Next LS return value: [('TestRepo.Models', 2, 0, 0)] - only module info, no detailed content 303 | # # Expected terms like 'user', 'item', 'order', 'struct', 'defmodule' are not present 304 | # # This appears to be a limitation of Next LS document overview functionality 305 | # # 306 | # file_path = os.path.join("lib", "models.ex") 307 | # doc_overview = language_server.request_document_overview(file_path) 308 | # 309 | # # Should get an overview of the models.ex file 310 | # assert doc_overview is not None 311 | # 312 | # # Should contain information about our structs and functions 313 | # overview_text = str(doc_overview).lower() 314 | # expected_terms = ["user", "item", "order", "struct", "defmodule"] 315 | # found_terms = [term for term in expected_terms if term in overview_text] 316 | # assert len(found_terms) > 0, f"Expected to find some terms from {expected_terms} in overview" 317 | 318 | @pytest.mark.parametrize("language_server", [Language.ELIXIR], indirect=True) 319 | def test_containing_symbol_of_module_attribute(self, language_server: SolidLanguageServer) -> None: 320 | """Test containing symbol for module attributes.""" 321 | file_path = os.path.join("lib", "models.ex") 322 | 323 | # Find a module attribute like @type or @doc 324 | content = language_server.retrieve_full_file_content(file_path) 325 | lines = content.split("\n") 326 | attribute_line = None 327 | for i, line in enumerate(lines): 328 | if line.strip().startswith("@type") or line.strip().startswith("@doc"): 329 | attribute_line = i 330 | break 331 | 332 | if attribute_line is None: 333 | pytest.skip("Could not find module attribute") 334 | 335 | containing_symbol = language_server.request_containing_symbol(file_path, attribute_line, 5) 336 | 337 | if containing_symbol: 338 | # Should be contained within a module 339 | assert "name" in containing_symbol 340 | # The containing symbol should be a module 341 | expected_names = ["User", "Item", "Order", "TestRepo.Models"] 342 | assert any(name in containing_symbol["name"] for name in expected_names) 343 | ```