This is page 7 of 12. Use http://codebase.md/oraios/serena?page={x} to view the full context. # Directory Structure ``` ├── .devcontainer │ └── devcontainer.json ├── .dockerignore ├── .env.example ├── .github │ ├── FUNDING.yml │ ├── ISSUE_TEMPLATE │ │ ├── config.yml │ │ ├── feature_request.md │ │ └── issue--bug--performance-problem--question-.md │ └── workflows │ ├── codespell.yml │ ├── docker.yml │ ├── junie.yml │ ├── lint_and_docs.yaml │ ├── publish.yml │ └── pytest.yml ├── .gitignore ├── .serena │ ├── memories │ │ ├── adding_new_language_support_guide.md │ │ ├── serena_core_concepts_and_architecture.md │ │ ├── serena_repository_structure.md │ │ └── suggested_commands.md │ └── project.yml ├── .vscode │ └── settings.json ├── CHANGELOG.md ├── CLAUDE.md ├── compose.yaml ├── CONTRIBUTING.md ├── docker_build_and_run.sh ├── DOCKER.md ├── Dockerfile ├── docs │ ├── custom_agent.md │ └── serena_on_chatgpt.md ├── flake.lock ├── flake.nix ├── lessons_learned.md ├── LICENSE ├── llms-install.md ├── public │ └── .gitignore ├── pyproject.toml ├── README.md ├── resources │ ├── serena-icons.cdr │ ├── serena-logo-dark-mode.svg │ ├── serena-logo.cdr │ ├── serena-logo.svg │ └── vscode_sponsor_logo.png ├── roadmap.md ├── scripts │ ├── agno_agent.py │ ├── demo_run_tools.py │ ├── gen_prompt_factory.py │ ├── mcp_server.py │ ├── print_mode_context_options.py │ └── print_tool_overview.py ├── src │ ├── interprompt │ │ ├── __init__.py │ │ ├── .syncCommitId.remote │ │ ├── .syncCommitId.this │ │ ├── jinja_template.py │ │ ├── multilang_prompt.py │ │ ├── prompt_factory.py │ │ └── util │ │ ├── __init__.py │ │ └── class_decorators.py │ ├── README.md │ ├── serena │ │ ├── __init__.py │ │ ├── agent.py │ │ ├── agno.py │ │ ├── analytics.py │ │ ├── cli.py │ │ ├── code_editor.py │ │ ├── config │ │ │ ├── __init__.py │ │ │ ├── context_mode.py │ │ │ └── serena_config.py │ │ ├── constants.py │ │ ├── dashboard.py │ │ ├── generated │ │ │ └── generated_prompt_factory.py │ │ ├── gui_log_viewer.py │ │ ├── mcp.py │ │ ├── project.py │ │ ├── prompt_factory.py │ │ ├── resources │ │ │ ├── config │ │ │ │ ├── contexts │ │ │ │ │ ├── agent.yml │ │ │ │ │ ├── chatgpt.yml │ │ │ │ │ ├── codex.yml │ │ │ │ │ ├── context.template.yml │ │ │ │ │ ├── desktop-app.yml │ │ │ │ │ ├── ide-assistant.yml │ │ │ │ │ └── oaicompat-agent.yml │ │ │ │ ├── internal_modes │ │ │ │ │ └── jetbrains.yml │ │ │ │ ├── modes │ │ │ │ │ ├── editing.yml │ │ │ │ │ ├── interactive.yml │ │ │ │ │ ├── mode.template.yml │ │ │ │ │ ├── no-onboarding.yml │ │ │ │ │ ├── onboarding.yml │ │ │ │ │ ├── one-shot.yml │ │ │ │ │ └── planning.yml │ │ │ │ └── prompt_templates │ │ │ │ ├── simple_tool_outputs.yml │ │ │ │ └── system_prompt.yml │ │ │ ├── dashboard │ │ │ │ ├── dashboard.js │ │ │ │ ├── index.html │ │ │ │ ├── jquery.min.js │ │ │ │ ├── serena-icon-16.png │ │ │ │ ├── serena-icon-32.png │ │ │ │ ├── serena-icon-48.png │ │ │ │ ├── serena-logs-dark-mode.png │ │ │ │ └── serena-logs.png │ │ │ ├── project.template.yml │ │ │ └── serena_config.template.yml │ │ ├── symbol.py │ │ ├── text_utils.py │ │ ├── tools │ │ │ ├── __init__.py │ │ │ ├── cmd_tools.py │ │ │ ├── config_tools.py │ │ │ ├── file_tools.py │ │ │ ├── jetbrains_plugin_client.py │ │ │ ├── jetbrains_tools.py │ │ │ ├── memory_tools.py │ │ │ ├── symbol_tools.py │ │ │ ├── tools_base.py │ │ │ └── workflow_tools.py │ │ └── util │ │ ├── class_decorators.py │ │ ├── exception.py │ │ ├── file_system.py │ │ ├── general.py │ │ ├── git.py │ │ ├── inspection.py │ │ ├── logging.py │ │ ├── shell.py │ │ └── thread.py │ └── solidlsp │ ├── __init__.py │ ├── .gitignore │ ├── language_servers │ │ ├── al_language_server.py │ │ ├── bash_language_server.py │ │ ├── clangd_language_server.py │ │ ├── clojure_lsp.py │ │ ├── common.py │ │ ├── csharp_language_server.py │ │ ├── dart_language_server.py │ │ ├── eclipse_jdtls.py │ │ ├── elixir_tools │ │ │ ├── __init__.py │ │ │ ├── elixir_tools.py │ │ │ └── README.md │ │ ├── elm_language_server.py │ │ ├── erlang_language_server.py │ │ ├── gopls.py │ │ ├── intelephense.py │ │ ├── jedi_server.py │ │ ├── kotlin_language_server.py │ │ ├── lua_ls.py │ │ ├── marksman.py │ │ ├── nixd_ls.py │ │ ├── omnisharp │ │ │ ├── initialize_params.json │ │ │ ├── runtime_dependencies.json │ │ │ └── workspace_did_change_configuration.json │ │ ├── omnisharp.py │ │ ├── perl_language_server.py │ │ ├── pyright_server.py │ │ ├── r_language_server.py │ │ ├── regal_server.py │ │ ├── ruby_lsp.py │ │ ├── rust_analyzer.py │ │ ├── solargraph.py │ │ ├── sourcekit_lsp.py │ │ ├── terraform_ls.py │ │ ├── typescript_language_server.py │ │ ├── vts_language_server.py │ │ └── zls.py │ ├── ls_config.py │ ├── ls_exceptions.py │ ├── ls_handler.py │ ├── ls_logger.py │ ├── ls_request.py │ ├── ls_types.py │ ├── ls_utils.py │ ├── ls.py │ ├── lsp_protocol_handler │ │ ├── lsp_constants.py │ │ ├── lsp_requests.py │ │ ├── lsp_types.py │ │ └── server.py │ ├── settings.py │ └── util │ ├── subprocess_util.py │ └── zip.py ├── test │ ├── __init__.py │ ├── conftest.py │ ├── resources │ │ └── repos │ │ ├── al │ │ │ └── test_repo │ │ │ ├── app.json │ │ │ └── src │ │ │ ├── Codeunits │ │ │ │ ├── CustomerMgt.Codeunit.al │ │ │ │ └── PaymentProcessorImpl.Codeunit.al │ │ │ ├── Enums │ │ │ │ └── CustomerType.Enum.al │ │ │ ├── Interfaces │ │ │ │ └── IPaymentProcessor.Interface.al │ │ │ ├── Pages │ │ │ │ ├── CustomerCard.Page.al │ │ │ │ └── CustomerList.Page.al │ │ │ ├── TableExtensions │ │ │ │ └── Item.TableExt.al │ │ │ └── Tables │ │ │ └── Customer.Table.al │ │ ├── bash │ │ │ └── test_repo │ │ │ ├── config.sh │ │ │ ├── main.sh │ │ │ └── utils.sh │ │ ├── clojure │ │ │ └── test_repo │ │ │ ├── deps.edn │ │ │ └── src │ │ │ └── test_app │ │ │ ├── core.clj │ │ │ └── utils.clj │ │ ├── csharp │ │ │ └── test_repo │ │ │ ├── .gitignore │ │ │ ├── Models │ │ │ │ └── Person.cs │ │ │ ├── Program.cs │ │ │ ├── serena.sln │ │ │ └── TestProject.csproj │ │ ├── dart │ │ │ └── test_repo │ │ │ ├── .gitignore │ │ │ ├── lib │ │ │ │ ├── helper.dart │ │ │ │ ├── main.dart │ │ │ │ └── models.dart │ │ │ └── pubspec.yaml │ │ ├── elixir │ │ │ └── test_repo │ │ │ ├── .gitignore │ │ │ ├── lib │ │ │ │ ├── examples.ex │ │ │ │ ├── ignored_dir │ │ │ │ │ └── ignored_module.ex │ │ │ │ ├── models.ex │ │ │ │ ├── services.ex │ │ │ │ ├── test_repo.ex │ │ │ │ └── utils.ex │ │ │ ├── mix.exs │ │ │ ├── mix.lock │ │ │ ├── scripts │ │ │ │ └── build_script.ex │ │ │ └── test │ │ │ ├── models_test.exs │ │ │ └── test_repo_test.exs │ │ ├── elm │ │ │ └── test_repo │ │ │ ├── elm.json │ │ │ ├── Main.elm │ │ │ └── Utils.elm │ │ ├── erlang │ │ │ └── test_repo │ │ │ ├── hello.erl │ │ │ ├── ignored_dir │ │ │ │ └── ignored_module.erl │ │ │ ├── include │ │ │ │ ├── records.hrl │ │ │ │ └── types.hrl │ │ │ ├── math_utils.erl │ │ │ ├── rebar.config │ │ │ ├── src │ │ │ │ ├── app.erl │ │ │ │ ├── models.erl │ │ │ │ ├── services.erl │ │ │ │ └── utils.erl │ │ │ └── test │ │ │ ├── models_tests.erl │ │ │ └── utils_tests.erl │ │ ├── go │ │ │ └── test_repo │ │ │ └── main.go │ │ ├── java │ │ │ └── test_repo │ │ │ ├── pom.xml │ │ │ └── src │ │ │ └── main │ │ │ └── java │ │ │ └── test_repo │ │ │ ├── Main.java │ │ │ ├── Model.java │ │ │ ├── ModelUser.java │ │ │ └── Utils.java │ │ ├── kotlin │ │ │ └── test_repo │ │ │ ├── .gitignore │ │ │ ├── build.gradle.kts │ │ │ └── src │ │ │ └── main │ │ │ └── kotlin │ │ │ └── test_repo │ │ │ ├── Main.kt │ │ │ ├── Model.kt │ │ │ ├── ModelUser.kt │ │ │ └── Utils.kt │ │ ├── lua │ │ │ └── test_repo │ │ │ ├── .gitignore │ │ │ ├── main.lua │ │ │ ├── src │ │ │ │ ├── calculator.lua │ │ │ │ └── utils.lua │ │ │ └── tests │ │ │ └── test_calculator.lua │ │ ├── markdown │ │ │ └── test_repo │ │ │ ├── api.md │ │ │ ├── CONTRIBUTING.md │ │ │ ├── guide.md │ │ │ └── README.md │ │ ├── nix │ │ │ └── test_repo │ │ │ ├── .gitignore │ │ │ ├── default.nix │ │ │ ├── flake.nix │ │ │ ├── lib │ │ │ │ └── utils.nix │ │ │ ├── modules │ │ │ │ └── example.nix │ │ │ └── scripts │ │ │ └── hello.sh │ │ ├── perl │ │ │ └── test_repo │ │ │ ├── helper.pl │ │ │ └── main.pl │ │ ├── php │ │ │ └── test_repo │ │ │ ├── helper.php │ │ │ ├── index.php │ │ │ └── simple_var.php │ │ ├── python │ │ │ └── test_repo │ │ │ ├── .gitignore │ │ │ ├── custom_test │ │ │ │ ├── __init__.py │ │ │ │ └── advanced_features.py │ │ │ ├── examples │ │ │ │ ├── __init__.py │ │ │ │ └── user_management.py │ │ │ ├── ignore_this_dir_with_postfix │ │ │ │ └── ignored_module.py │ │ │ ├── scripts │ │ │ │ ├── __init__.py │ │ │ │ └── run_app.py │ │ │ └── test_repo │ │ │ ├── __init__.py │ │ │ ├── complex_types.py │ │ │ ├── models.py │ │ │ ├── name_collisions.py │ │ │ ├── nested_base.py │ │ │ ├── nested.py │ │ │ ├── overloaded.py │ │ │ ├── services.py │ │ │ ├── utils.py │ │ │ └── variables.py │ │ ├── r │ │ │ └── test_repo │ │ │ ├── .Rbuildignore │ │ │ ├── DESCRIPTION │ │ │ ├── examples │ │ │ │ └── analysis.R │ │ │ ├── NAMESPACE │ │ │ └── R │ │ │ ├── models.R │ │ │ └── utils.R │ │ ├── rego │ │ │ └── test_repo │ │ │ ├── policies │ │ │ │ ├── authz.rego │ │ │ │ └── validation.rego │ │ │ └── utils │ │ │ └── helpers.rego │ │ ├── ruby │ │ │ └── test_repo │ │ │ ├── .solargraph.yml │ │ │ ├── examples │ │ │ │ └── user_management.rb │ │ │ ├── lib.rb │ │ │ ├── main.rb │ │ │ ├── models.rb │ │ │ ├── nested.rb │ │ │ ├── services.rb │ │ │ └── variables.rb │ │ ├── rust │ │ │ ├── test_repo │ │ │ │ ├── Cargo.lock │ │ │ │ ├── Cargo.toml │ │ │ │ └── src │ │ │ │ ├── lib.rs │ │ │ │ └── main.rs │ │ │ └── test_repo_2024 │ │ │ ├── Cargo.lock │ │ │ ├── Cargo.toml │ │ │ └── src │ │ │ ├── lib.rs │ │ │ └── main.rs │ │ ├── swift │ │ │ └── test_repo │ │ │ ├── Package.swift │ │ │ └── src │ │ │ ├── main.swift │ │ │ └── utils.swift │ │ ├── terraform │ │ │ └── test_repo │ │ │ ├── data.tf │ │ │ ├── main.tf │ │ │ ├── outputs.tf │ │ │ └── variables.tf │ │ ├── typescript │ │ │ └── test_repo │ │ │ ├── .serena │ │ │ │ └── project.yml │ │ │ ├── index.ts │ │ │ ├── tsconfig.json │ │ │ └── use_helper.ts │ │ └── zig │ │ └── test_repo │ │ ├── .gitignore │ │ ├── build.zig │ │ ├── src │ │ │ ├── calculator.zig │ │ │ ├── main.zig │ │ │ └── math_utils.zig │ │ └── zls.json │ ├── serena │ │ ├── __init__.py │ │ ├── __snapshots__ │ │ │ └── test_symbol_editing.ambr │ │ ├── config │ │ │ ├── __init__.py │ │ │ └── test_serena_config.py │ │ ├── test_edit_marker.py │ │ ├── test_mcp.py │ │ ├── test_serena_agent.py │ │ ├── test_symbol_editing.py │ │ ├── test_symbol.py │ │ ├── test_text_utils.py │ │ ├── test_tool_parameter_types.py │ │ └── util │ │ ├── test_exception.py │ │ └── test_file_system.py │ └── solidlsp │ ├── al │ │ └── test_al_basic.py │ ├── bash │ │ ├── __init__.py │ │ └── test_bash_basic.py │ ├── clojure │ │ ├── __init__.py │ │ └── test_clojure_basic.py │ ├── csharp │ │ └── test_csharp_basic.py │ ├── dart │ │ ├── __init__.py │ │ └── test_dart_basic.py │ ├── elixir │ │ ├── __init__.py │ │ ├── conftest.py │ │ ├── test_elixir_basic.py │ │ ├── test_elixir_ignored_dirs.py │ │ ├── test_elixir_integration.py │ │ └── test_elixir_symbol_retrieval.py │ ├── elm │ │ └── test_elm_basic.py │ ├── erlang │ │ ├── __init__.py │ │ ├── conftest.py │ │ ├── test_erlang_basic.py │ │ ├── test_erlang_ignored_dirs.py │ │ └── test_erlang_symbol_retrieval.py │ ├── go │ │ └── test_go_basic.py │ ├── java │ │ └── test_java_basic.py │ ├── kotlin │ │ └── test_kotlin_basic.py │ ├── lua │ │ └── test_lua_basic.py │ ├── markdown │ │ ├── __init__.py │ │ └── test_markdown_basic.py │ ├── nix │ │ └── test_nix_basic.py │ ├── perl │ │ └── test_perl_basic.py │ ├── php │ │ └── test_php_basic.py │ ├── python │ │ ├── test_python_basic.py │ │ ├── test_retrieval_with_ignored_dirs.py │ │ └── test_symbol_retrieval.py │ ├── r │ │ ├── __init__.py │ │ └── test_r_basic.py │ ├── rego │ │ └── test_rego_basic.py │ ├── ruby │ │ ├── test_ruby_basic.py │ │ └── test_ruby_symbol_retrieval.py │ ├── rust │ │ ├── test_rust_2024_edition.py │ │ └── test_rust_basic.py │ ├── swift │ │ └── test_swift_basic.py │ ├── terraform │ │ └── test_terraform_basic.py │ ├── typescript │ │ └── test_typescript_basic.py │ ├── util │ │ └── test_zip.py │ └── zig │ └── test_zig_basic.py └── uv.lock ``` # Files -------------------------------------------------------------------------------- /src/solidlsp/ls_request.py: -------------------------------------------------------------------------------- ```python from typing import TYPE_CHECKING, Any, Union from solidlsp.lsp_protocol_handler import lsp_types if TYPE_CHECKING: from .ls_handler import SolidLanguageServerHandler class LanguageServerRequest: def __init__(self, handler: "SolidLanguageServerHandler"): self.handler = handler def _send_request(self, method: str, params: Any | None = None) -> Any: return self.handler.send_request(method, params) def implementation(self, params: lsp_types.ImplementationParams) -> Union["lsp_types.Definition", list["lsp_types.LocationLink"], None]: """A request to resolve the implementation locations of a symbol at a given text document position. The request's parameter is of type [TextDocumentPositionParams] (#TextDocumentPositionParams) the response is of type {@link Definition} or a Thenable that resolves to such. """ return self._send_request("textDocument/implementation", params) def type_definition( self, params: lsp_types.TypeDefinitionParams ) -> Union["lsp_types.Definition", list["lsp_types.LocationLink"], None]: """A request to resolve the type definition locations of a symbol at a given text document position. The request's parameter is of type [TextDocumentPositionParams] (#TextDocumentPositionParams) the response is of type {@link Definition} or a Thenable that resolves to such. """ return self._send_request("textDocument/typeDefinition", params) def document_color(self, params: lsp_types.DocumentColorParams) -> list["lsp_types.ColorInformation"]: """A request to list all color symbols found in a given text document. The request's parameter is of type {@link DocumentColorParams} the response is of type {@link ColorInformation ColorInformation[]} or a Thenable that resolves to such. """ return self._send_request("textDocument/documentColor", params) def color_presentation(self, params: lsp_types.ColorPresentationParams) -> list["lsp_types.ColorPresentation"]: """A request to list all presentation for a color. The request's parameter is of type {@link ColorPresentationParams} the response is of type {@link ColorInformation ColorInformation[]} or a Thenable that resolves to such. """ return self._send_request("textDocument/colorPresentation", params) def folding_range(self, params: lsp_types.FoldingRangeParams) -> list["lsp_types.FoldingRange"] | None: """A request to provide folding ranges in a document. The request's parameter is of type {@link FoldingRangeParams}, the response is of type {@link FoldingRangeList} or a Thenable that resolves to such. """ return self._send_request("textDocument/foldingRange", params) def declaration(self, params: lsp_types.DeclarationParams) -> Union["lsp_types.Declaration", list["lsp_types.LocationLink"], None]: """A request to resolve the type definition locations of a symbol at a given text document position. The request's parameter is of type [TextDocumentPositionParams] (#TextDocumentPositionParams) the response is of type {@link Declaration} or a typed array of {@link DeclarationLink} or a Thenable that resolves to such. """ return self._send_request("textDocument/declaration", params) def selection_range(self, params: lsp_types.SelectionRangeParams) -> list["lsp_types.SelectionRange"] | None: """A request to provide selection ranges in a document. The request's parameter is of type {@link SelectionRangeParams}, the response is of type {@link SelectionRange SelectionRange[]} or a Thenable that resolves to such. """ return self._send_request("textDocument/selectionRange", params) def prepare_call_hierarchy(self, params: lsp_types.CallHierarchyPrepareParams) -> list["lsp_types.CallHierarchyItem"] | None: """A request to result a `CallHierarchyItem` in a document at a given position. Can be used as an input to an incoming or outgoing call hierarchy. @since 3.16.0 """ return self._send_request("textDocument/prepareCallHierarchy", params) def incoming_calls(self, params: lsp_types.CallHierarchyIncomingCallsParams) -> list["lsp_types.CallHierarchyIncomingCall"] | None: """A request to resolve the incoming calls for a given `CallHierarchyItem`. @since 3.16.0 """ return self._send_request("callHierarchy/incomingCalls", params) def outgoing_calls(self, params: lsp_types.CallHierarchyOutgoingCallsParams) -> list["lsp_types.CallHierarchyOutgoingCall"] | None: """A request to resolve the outgoing calls for a given `CallHierarchyItem`. @since 3.16.0 """ return self._send_request("callHierarchy/outgoingCalls", params) def semantic_tokens_full(self, params: lsp_types.SemanticTokensParams) -> Union["lsp_types.SemanticTokens", None]: """@since 3.16.0""" return self._send_request("textDocument/semanticTokens/full", params) def semantic_tokens_delta( self, params: lsp_types.SemanticTokensDeltaParams ) -> Union["lsp_types.SemanticTokens", "lsp_types.SemanticTokensDelta", None]: """@since 3.16.0""" return self._send_request("textDocument/semanticTokens/full/delta", params) def semantic_tokens_range(self, params: lsp_types.SemanticTokensRangeParams) -> Union["lsp_types.SemanticTokens", None]: """@since 3.16.0""" return self._send_request("textDocument/semanticTokens/range", params) def linked_editing_range(self, params: lsp_types.LinkedEditingRangeParams) -> Union["lsp_types.LinkedEditingRanges", None]: """A request to provide ranges that can be edited together. @since 3.16.0 """ return self._send_request("textDocument/linkedEditingRange", params) def will_create_files(self, params: lsp_types.CreateFilesParams) -> Union["lsp_types.WorkspaceEdit", None]: """The will create files request is sent from the client to the server before files are actually created as long as the creation is triggered from within the client. @since 3.16.0 """ return self._send_request("workspace/willCreateFiles", params) def will_rename_files(self, params: lsp_types.RenameFilesParams) -> Union["lsp_types.WorkspaceEdit", None]: """The will rename files request is sent from the client to the server before files are actually renamed as long as the rename is triggered from within the client. @since 3.16.0 """ return self._send_request("workspace/willRenameFiles", params) def will_delete_files(self, params: lsp_types.DeleteFilesParams) -> Union["lsp_types.WorkspaceEdit", None]: """The did delete files notification is sent from the client to the server when files were deleted from within the client. @since 3.16.0 """ return self._send_request("workspace/willDeleteFiles", params) def moniker(self, params: lsp_types.MonikerParams) -> list["lsp_types.Moniker"] | None: """A request to get the moniker of a symbol at a given text document position. The request parameter is of type {@link TextDocumentPositionParams}. The response is of type {@link Moniker Moniker[]} or `null`. """ return self._send_request("textDocument/moniker", params) def prepare_type_hierarchy(self, params: lsp_types.TypeHierarchyPrepareParams) -> list["lsp_types.TypeHierarchyItem"] | None: """A request to result a `TypeHierarchyItem` in a document at a given position. Can be used as an input to a subtypes or supertypes type hierarchy. @since 3.17.0 """ return self._send_request("textDocument/prepareTypeHierarchy", params) def type_hierarchy_supertypes(self, params: lsp_types.TypeHierarchySupertypesParams) -> list["lsp_types.TypeHierarchyItem"] | None: """A request to resolve the supertypes for a given `TypeHierarchyItem`. @since 3.17.0 """ return self._send_request("typeHierarchy/supertypes", params) def type_hierarchy_subtypes(self, params: lsp_types.TypeHierarchySubtypesParams) -> list["lsp_types.TypeHierarchyItem"] | None: """A request to resolve the subtypes for a given `TypeHierarchyItem`. @since 3.17.0 """ return self._send_request("typeHierarchy/subtypes", params) def inline_value(self, params: lsp_types.InlineValueParams) -> list["lsp_types.InlineValue"] | None: """A request to provide inline values in a document. The request's parameter is of type {@link InlineValueParams}, the response is of type {@link InlineValue InlineValue[]} or a Thenable that resolves to such. @since 3.17.0 """ return self._send_request("textDocument/inlineValue", params) def inlay_hint(self, params: lsp_types.InlayHintParams) -> list["lsp_types.InlayHint"] | None: """A request to provide inlay hints in a document. The request's parameter is of type {@link InlayHintsParams}, the response is of type {@link InlayHint InlayHint[]} or a Thenable that resolves to such. @since 3.17.0 """ return self._send_request("textDocument/inlayHint", params) def resolve_inlay_hint(self, params: lsp_types.InlayHint) -> "lsp_types.InlayHint": """A request to resolve additional properties for an inlay hint. The request's parameter is of type {@link InlayHint}, the response is of type {@link InlayHint} or a Thenable that resolves to such. @since 3.17.0 """ return self._send_request("inlayHint/resolve", params) def text_document_diagnostic(self, params: lsp_types.DocumentDiagnosticParams) -> "lsp_types.DocumentDiagnosticReport": """The document diagnostic request definition. @since 3.17.0 """ return self._send_request("textDocument/diagnostic", params) def workspace_diagnostic(self, params: lsp_types.WorkspaceDiagnosticParams) -> "lsp_types.WorkspaceDiagnosticReport": """The workspace diagnostic request definition. @since 3.17.0 """ return self._send_request("workspace/diagnostic", params) def initialize(self, params: lsp_types.InitializeParams) -> "lsp_types.InitializeResult": """The initialize request is sent from the client to the server. It is sent once as the request after starting up the server. The requests parameter is of type {@link InitializeParams} the response if of type {@link InitializeResult} of a Thenable that resolves to such. """ return self._send_request("initialize", params) def shutdown(self) -> None: """A shutdown request is sent from the client to the server. It is sent once when the client decides to shutdown the server. The only notification that is sent after a shutdown request is the exit event. """ return self._send_request("shutdown") def will_save_wait_until(self, params: lsp_types.WillSaveTextDocumentParams) -> list["lsp_types.TextEdit"] | None: """A document will save request is sent from the client to the server before the document is actually saved. The request can return an array of TextEdits which will be applied to the text document before it is saved. Please note that clients might drop results if computing the text edits took too long or if a server constantly fails on this request. This is done to keep the save fast and reliable. """ return self._send_request("textDocument/willSaveWaitUntil", params) def completion(self, params: lsp_types.CompletionParams) -> Union[list["lsp_types.CompletionItem"], "lsp_types.CompletionList", None]: """Request to request completion at a given text document position. The request's parameter is of type {@link TextDocumentPosition} the response is of type {@link CompletionItem CompletionItem[]} or {@link CompletionList} or a Thenable that resolves to such. The request can delay the computation of the {@link CompletionItem.detail `detail`} and {@link CompletionItem.documentation `documentation`} properties to the `completionItem/resolve` request. However, properties that are needed for the initial sorting and filtering, like `sortText`, `filterText`, `insertText`, and `textEdit`, must not be changed during resolve. """ return self._send_request("textDocument/completion", params) def resolve_completion_item(self, params: lsp_types.CompletionItem) -> "lsp_types.CompletionItem": """Request to resolve additional information for a given completion item.The request's parameter is of type {@link CompletionItem} the response is of type {@link CompletionItem} or a Thenable that resolves to such. """ return self._send_request("completionItem/resolve", params) def hover(self, params: lsp_types.HoverParams) -> Union["lsp_types.Hover", None]: """Request to request hover information at a given text document position. The request's parameter is of type {@link TextDocumentPosition} the response is of type {@link Hover} or a Thenable that resolves to such. """ return self._send_request("textDocument/hover", params) def signature_help(self, params: lsp_types.SignatureHelpParams) -> Union["lsp_types.SignatureHelp", None]: return self._send_request("textDocument/signatureHelp", params) def definition(self, params: lsp_types.DefinitionParams) -> Union["lsp_types.Definition", list["lsp_types.LocationLink"], None]: """A request to resolve the definition location of a symbol at a given text document position. The request's parameter is of type [TextDocumentPosition] (#TextDocumentPosition) the response is of either type {@link Definition} or a typed array of {@link DefinitionLink} or a Thenable that resolves to such. """ return self._send_request("textDocument/definition", params) def references(self, params: lsp_types.ReferenceParams) -> list["lsp_types.Location"] | None: """A request to resolve project-wide references for the symbol denoted by the given text document position. The request's parameter is of type {@link ReferenceParams} the response is of type {@link Location Location[]} or a Thenable that resolves to such. """ return self._send_request("textDocument/references", params) def document_highlight(self, params: lsp_types.DocumentHighlightParams) -> list["lsp_types.DocumentHighlight"] | None: """Request to resolve a {@link DocumentHighlight} for a given text document position. The request's parameter is of type [TextDocumentPosition] (#TextDocumentPosition) the request response is of type [DocumentHighlight[]] (#DocumentHighlight) or a Thenable that resolves to such. """ return self._send_request("textDocument/documentHighlight", params) def document_symbol( self, params: lsp_types.DocumentSymbolParams ) -> list["lsp_types.SymbolInformation"] | list["lsp_types.DocumentSymbol"] | None: """A request to list all symbols found in a given text document. The request's parameter is of type {@link TextDocumentIdentifier} the response is of type {@link SymbolInformation SymbolInformation[]} or a Thenable that resolves to such. """ return self._send_request("textDocument/documentSymbol", params) def code_action(self, params: lsp_types.CodeActionParams) -> list[Union["lsp_types.Command", "lsp_types.CodeAction"]] | None: """A request to provide commands for the given text document and range.""" return self._send_request("textDocument/codeAction", params) def resolve_code_action(self, params: lsp_types.CodeAction) -> "lsp_types.CodeAction": """Request to resolve additional information for a given code action.The request's parameter is of type {@link CodeAction} the response is of type {@link CodeAction} or a Thenable that resolves to such. """ return self._send_request("codeAction/resolve", params) def workspace_symbol( self, params: lsp_types.WorkspaceSymbolParams ) -> list["lsp_types.SymbolInformation"] | list["lsp_types.WorkspaceSymbol"] | None: """A request to list project-wide symbols matching the query string given by the {@link WorkspaceSymbolParams}. The response is of type {@link SymbolInformation SymbolInformation[]} or a Thenable that resolves to such. @since 3.17.0 - support for WorkspaceSymbol in the returned data. Clients need to advertise support for WorkspaceSymbols via the client capability `workspace.symbol.resolveSupport`. """ return self._send_request("workspace/symbol", params) def resolve_workspace_symbol(self, params: lsp_types.WorkspaceSymbol) -> "lsp_types.WorkspaceSymbol": """A request to resolve the range inside the workspace symbol's location. @since 3.17.0 """ return self._send_request("workspaceSymbol/resolve", params) def code_lens(self, params: lsp_types.CodeLensParams) -> list["lsp_types.CodeLens"] | None: """A request to provide code lens for the given text document.""" return self._send_request("textDocument/codeLens", params) def resolve_code_lens(self, params: lsp_types.CodeLens) -> "lsp_types.CodeLens": """A request to resolve a command for a given code lens.""" return self._send_request("codeLens/resolve", params) def document_link(self, params: lsp_types.DocumentLinkParams) -> list["lsp_types.DocumentLink"] | None: """A request to provide document links""" return self._send_request("textDocument/documentLink", params) def resolve_document_link(self, params: lsp_types.DocumentLink) -> "lsp_types.DocumentLink": """Request to resolve additional information for a given document link. The request's parameter is of type {@link DocumentLink} the response is of type {@link DocumentLink} or a Thenable that resolves to such. """ return self._send_request("documentLink/resolve", params) def formatting(self, params: lsp_types.DocumentFormattingParams) -> list["lsp_types.TextEdit"] | None: """A request to to format a whole document.""" return self._send_request("textDocument/formatting", params) def range_formatting(self, params: lsp_types.DocumentRangeFormattingParams) -> list["lsp_types.TextEdit"] | None: """A request to to format a range in a document.""" return self._send_request("textDocument/rangeFormatting", params) def on_type_formatting(self, params: lsp_types.DocumentOnTypeFormattingParams) -> list["lsp_types.TextEdit"] | None: """A request to format a document on type.""" return self._send_request("textDocument/onTypeFormatting", params) def rename(self, params: lsp_types.RenameParams) -> Union["lsp_types.WorkspaceEdit", None]: """A request to rename a symbol.""" return self._send_request("textDocument/rename", params) def prepare_rename(self, params: lsp_types.PrepareRenameParams) -> Union["lsp_types.PrepareRenameResult", None]: """A request to test and perform the setup necessary for a rename. @since 3.16 - support for default behavior """ return self._send_request("textDocument/prepareRename", params) def execute_command(self, params: lsp_types.ExecuteCommandParams) -> Union["lsp_types.LSPAny", None]: """A request send from the client to the server to execute a command. The request might return a workspace edit which the client will apply to the workspace. """ return self._send_request("workspace/executeCommand", params) ``` -------------------------------------------------------------------------------- /test/solidlsp/dart/test_dart_basic.py: -------------------------------------------------------------------------------- ```python import os from pathlib import Path import pytest from solidlsp import SolidLanguageServer from solidlsp.ls_config import Language from solidlsp.ls_types import SymbolKind from solidlsp.ls_utils import SymbolUtils @pytest.mark.dart class TestDartLanguageServer: @pytest.mark.parametrize("language_server", [Language.DART], indirect=True) @pytest.mark.parametrize("repo_path", [Language.DART], indirect=True) def test_ls_is_running(self, language_server: SolidLanguageServer, repo_path: Path) -> None: """Test that the language server starts and stops successfully.""" # The fixture already handles start and stop assert language_server.is_running() assert Path(language_server.language_server.repository_root_path).resolve() == repo_path.resolve() @pytest.mark.parametrize("language_server", [Language.DART], indirect=True) @pytest.mark.parametrize("repo_path", [Language.DART], indirect=True) def test_find_definition_within_file(self, language_server: SolidLanguageServer, repo_path: Path) -> None: """Test finding definition of a method within the same file.""" # In lib/main.dart: # Line 105: final result1 = calc.add(5, 3); // Reference to add method # Line 12: int add(int a, int b) { // Definition of add method # Find definition of 'add' method from its usage main_dart_path = str(repo_path / "lib" / "main.dart") # Position: calc.add(5, 3) - cursor on 'add' # Line 105 (1-indexed) = line 104 (0-indexed), char position around 22 definition_location_list = language_server.request_definition(main_dart_path, 104, 22) assert definition_location_list, f"Expected non-empty definition_location_list but got {definition_location_list=}" assert len(definition_location_list) >= 1 definition_location = definition_location_list[0] assert definition_location["uri"].endswith("main.dart") # Definition of add method should be around line 11 (0-indexed) # But language server may return different positions assert definition_location["range"]["start"]["line"] >= 0 @pytest.mark.parametrize("language_server", [Language.DART], indirect=True) @pytest.mark.parametrize("repo_path", [Language.DART], indirect=True) def test_find_definition_across_files(self, language_server: SolidLanguageServer, repo_path: Path) -> None: """Test finding definition across different files.""" # Test finding definition of MathHelper class which is in helper.dart # In lib/main.dart line 50: MathHelper.power(step1, 2) main_dart_path = str(repo_path / "lib" / "main.dart") # Position: MathHelper.power(step1, 2) - cursor on 'MathHelper' # Line 50 (1-indexed) = line 49 (0-indexed), char position around 18 definition_location_list = language_server.request_definition(main_dart_path, 49, 18) # Skip the test if language server doesn't find cross-file references # This is acceptable for a basic test - the important thing is that LS is working if not definition_location_list: pytest.skip("Language server doesn't support cross-file definition lookup for this case") assert len(definition_location_list) >= 1 definition_location = definition_location_list[0] assert definition_location["uri"].endswith("helper.dart") assert definition_location["range"]["start"]["line"] >= 0 @pytest.mark.parametrize("language_server", [Language.DART], indirect=True) @pytest.mark.parametrize("repo_path", [Language.DART], indirect=True) def test_find_definition_class_method(self, language_server: SolidLanguageServer, repo_path: Path) -> None: """Test finding definition of a class method.""" # In lib/main.dart: # Line 50: final step2 = MathHelper.power(step1, 2); // Reference to MathHelper.power method # In lib/helper.dart: # Line 14: static double power(double base, int exponent) { // Definition of power method main_dart_path = str(repo_path / "lib" / "main.dart") # Position: MathHelper.power(step1, 2) - cursor on 'power' # Line 50 (1-indexed) = line 49 (0-indexed), char position around 30 definition_location_list = language_server.request_definition(main_dart_path, 49, 30) assert definition_location_list, f"Expected non-empty definition_location_list but got {definition_location_list=}" assert len(definition_location_list) >= 1 definition_location = definition_location_list[0] assert definition_location["uri"].endswith("helper.dart") # Definition of power method should be around line 13 (0-indexed) assert 12 <= definition_location["range"]["start"]["line"] <= 16 @pytest.mark.parametrize("language_server", [Language.DART], indirect=True) @pytest.mark.parametrize("repo_path", [Language.DART], indirect=True) def test_find_references_within_file(self, language_server: SolidLanguageServer, repo_path: Path) -> None: """Test finding references to a method within the same file.""" main_dart_path = str(repo_path / "lib" / "main.dart") # Find references to the 'add' method from its definition # Line 12: int add(int a, int b) { // Definition of add method # Line 105: final result1 = calc.add(5, 3); // Usage of add method references = language_server.request_references(main_dart_path, 11, 6) # cursor on 'add' in definition assert references, f"Expected non-empty references but got {references=}" # Should find at least the usage of add method assert len(references) >= 1 # Check that we have a reference in main.dart main_dart_references = [ref for ref in references if ref["uri"].endswith("main.dart")] assert len(main_dart_references) >= 1 @pytest.mark.parametrize("language_server", [Language.DART], indirect=True) @pytest.mark.parametrize("repo_path", [Language.DART], indirect=True) def test_find_references_across_files(self, language_server: SolidLanguageServer, repo_path: Path) -> None: """Test finding references across different files.""" helper_dart_path = str(repo_path / "lib" / "helper.dart") # Find references to the 'subtract' function from its definition in helper.dart # Definition is in helper.dart, usage is in main.dart references = language_server.request_references(helper_dart_path, 4, 4) # cursor on 'subtract' in definition assert references, f"Expected non-empty references for subtract function but got {references=}" # Should find references in main.dart main_dart_references = [ref for ref in references if ref["uri"].endswith("main.dart")] assert len(main_dart_references) >= 1 @pytest.mark.parametrize("language_server", [Language.DART], indirect=True) @pytest.mark.parametrize("repo_path", [Language.DART], indirect=True) def test_find_definition_constructor(self, language_server: SolidLanguageServer, repo_path: Path) -> None: """Test finding definition of a constructor call.""" main_dart_path = str(repo_path / "lib" / "main.dart") # In lib/main.dart: # Line 104: final calc = Calculator(); // Reference to Calculator constructor # Line 4: class Calculator { // Definition of Calculator class definition_location_list = language_server.request_definition(main_dart_path, 103, 18) # cursor on 'Calculator' assert definition_location_list, f"Expected non-empty definition_location_list but got {definition_location_list=}" assert len(definition_location_list) >= 1 definition_location = definition_location_list[0] assert definition_location["uri"].endswith("main.dart") # Definition of Calculator class should be around line 3 (0-indexed) assert 3 <= definition_location["range"]["start"]["line"] <= 7 @pytest.mark.parametrize("language_server", [Language.DART], indirect=True) @pytest.mark.parametrize("repo_path", [Language.DART], indirect=True) def test_find_definition_import(self, language_server: SolidLanguageServer, repo_path: Path) -> None: """Test finding definition through imports.""" models_dart_path = str(repo_path / "lib" / "models.dart") # Test finding definition of User class name where it's used # In lib/models.dart line 27 (constructor): User(this.id, this.name, this.email, this._age); definition_location_list = language_server.request_definition(models_dart_path, 26, 2) # cursor on 'User' in constructor # Skip if language server doesn't find definition in this case if not definition_location_list: pytest.skip("Language server doesn't support definition lookup for this case") assert len(definition_location_list) >= 1 definition_location = definition_location_list[0] # Language server might return SDK files instead of local files # This is acceptable behavior - the important thing is that it found a definition assert "dart" in definition_location["uri"].lower() @pytest.mark.parametrize("language_server", [Language.DART], indirect=True) def test_find_symbol(self, language_server: SolidLanguageServer) -> None: """Test finding symbols in the full symbol tree.""" symbols = language_server.request_full_symbol_tree() assert SymbolUtils.symbol_tree_contains_name(symbols, "Calculator"), "Calculator class not found in symbol tree" assert SymbolUtils.symbol_tree_contains_name(symbols, "add"), "add method not found in symbol tree" assert SymbolUtils.symbol_tree_contains_name(symbols, "subtract"), "subtract function not found in symbol tree" assert SymbolUtils.symbol_tree_contains_name(symbols, "MathHelper"), "MathHelper class not found in symbol tree" assert SymbolUtils.symbol_tree_contains_name(symbols, "User"), "User class not found in symbol tree" @pytest.mark.parametrize("language_server", [Language.DART], indirect=True) def test_find_referencing_symbols(self, language_server: SolidLanguageServer) -> None: """Test finding references using symbol selection range.""" file_path = os.path.join("lib", "main.dart") symbols = language_server.request_document_symbols(file_path) # Handle nested symbol structure - symbols can be nested in lists symbol_list = symbols[0] if symbols and isinstance(symbols[0], list) else symbols # Find the 'add' method symbol in Calculator class add_symbol = None for sym in symbol_list: if sym.get("name") == "add": add_symbol = sym break # Check for nested symbols (methods inside classes) if "children" in sym and sym.get("name") == "Calculator": for child in sym["children"]: if child.get("name") == "add": add_symbol = child break if add_symbol: break assert add_symbol is not None, "Could not find 'add' method symbol in main.dart" sel_start = add_symbol["selectionRange"]["start"] refs = language_server.request_references(file_path, sel_start["line"], sel_start["character"]) # Check that we found references - at least one should be in main.dart assert any( "main.dart" in ref.get("relativePath", "") or "main.dart" in ref.get("uri", "") for ref in refs ), "main.dart should reference add method (tried all positions in selectionRange)" @pytest.mark.parametrize("language_server", [Language.DART], indirect=True) def test_request_containing_symbol_method(self, language_server: SolidLanguageServer) -> None: """Test request_containing_symbol for a method.""" file_path = os.path.join("lib", "main.dart") # Line 14 is inside the add method body (around 'final result = a + b;') containing_symbol = language_server.request_containing_symbol(file_path, 13, 10, include_body=True) # Verify that we found the containing symbol if containing_symbol is not None: assert containing_symbol["name"] == "add" assert containing_symbol["kind"] == SymbolKind.Method if "body" in containing_symbol: assert "add" in containing_symbol["body"] or "final result" in containing_symbol["body"] @pytest.mark.parametrize("language_server", [Language.DART], indirect=True) def test_request_containing_symbol_class(self, language_server: SolidLanguageServer) -> None: """Test request_containing_symbol for a class.""" file_path = os.path.join("lib", "main.dart") # Line 4 is the Calculator class definition line containing_symbol = language_server.request_containing_symbol(file_path, 4, 6) # Verify that we found the containing symbol if containing_symbol is not None: assert containing_symbol["name"] == "Calculator" assert containing_symbol["kind"] == SymbolKind.Class @pytest.mark.parametrize("language_server", [Language.DART], indirect=True) def test_request_containing_symbol_nested(self, language_server: SolidLanguageServer) -> None: """Test request_containing_symbol with nested scopes.""" file_path = os.path.join("lib", "main.dart") # Line 14 is inside the add method inside Calculator class containing_symbol = language_server.request_containing_symbol(file_path, 13, 20) # Verify that we found the innermost containing symbol (the method) if containing_symbol is not None: assert containing_symbol["name"] == "add" assert containing_symbol["kind"] == SymbolKind.Method @pytest.mark.parametrize("language_server", [Language.DART], indirect=True) def test_request_defining_symbol_variable(self, language_server: SolidLanguageServer) -> None: """Test request_defining_symbol for a variable usage.""" file_path = os.path.join("lib", "main.dart") # Line 14 contains 'final result = a + b;' - test position on 'result' defining_symbol = language_server.request_defining_symbol(file_path, 13, 10) # The defining symbol might be the variable itself or the containing method # This is acceptable behavior - different language servers handle this differently if defining_symbol is not None: assert defining_symbol.get("name") in ["result", "add"] if defining_symbol.get("name") == "add": assert defining_symbol.get("kind") == SymbolKind.Method.value @pytest.mark.parametrize("language_server", [Language.DART], indirect=True) def test_request_defining_symbol_imported_class(self, language_server: SolidLanguageServer) -> None: """Test request_defining_symbol for an imported class/function.""" file_path = os.path.join("lib", "main.dart") # Line 20 references 'subtract' which was imported from helper.dart defining_symbol = language_server.request_defining_symbol(file_path, 19, 18) # Verify that we found the defining symbol - this should be the subtract function from helper.dart if defining_symbol is not None: assert defining_symbol.get("name") == "subtract" # Could be Function or Method depending on language server interpretation assert defining_symbol.get("kind") in [SymbolKind.Function.value, SymbolKind.Method.value] @pytest.mark.parametrize("language_server", [Language.DART], indirect=True) def test_request_defining_symbol_class_method(self, language_server: SolidLanguageServer) -> None: """Test request_defining_symbol for a static class method.""" file_path = os.path.join("lib", "main.dart") # Line 50 references MathHelper.power - test position on 'power' defining_symbol = language_server.request_defining_symbol(file_path, 49, 30) # Verify that we found the defining symbol - should be the power method if defining_symbol is not None: assert defining_symbol.get("name") == "power" assert defining_symbol.get("kind") == SymbolKind.Method.value @pytest.mark.parametrize("language_server", [Language.DART], indirect=True) def test_request_document_symbols(self, language_server: SolidLanguageServer) -> None: """Test getting document symbols from a Dart file.""" file_path = os.path.join("lib", "main.dart") symbols = language_server.request_document_symbols(file_path) # Check that we have symbols assert len(symbols) > 0 # Flatten the symbols if they're nested symbol_list = symbols[0] if symbols and isinstance(symbols[0], list) else symbols # Look for expected classes and methods symbol_names = [s.get("name") for s in symbol_list] assert "Calculator" in symbol_names # Check for nested symbols (methods inside classes) - optional calculator_symbol = next((s for s in symbol_list if s.get("name") == "Calculator"), None) if calculator_symbol and "children" in calculator_symbol and calculator_symbol["children"]: method_names = [child.get("name") for child in calculator_symbol["children"]] # If children are populated, we should find the add method assert "add" in method_names else: # Some language servers may not populate children in document symbols # This is acceptable behavior - the important thing is we found the class pass @pytest.mark.parametrize("language_server", [Language.DART], indirect=True) def test_request_referencing_symbols_comprehensive(self, language_server: SolidLanguageServer) -> None: """Test comprehensive referencing symbols functionality.""" file_path = os.path.join("lib", "main.dart") symbols = language_server.request_document_symbols(file_path) # Handle nested symbol structure symbol_list = symbols[0] if symbols and isinstance(symbols[0], list) else symbols # Find Calculator class and test its references calculator_symbol = None for sym in symbol_list: if sym.get("name") == "Calculator": calculator_symbol = sym break if calculator_symbol and "selectionRange" in calculator_symbol: sel_start = calculator_symbol["selectionRange"]["start"] refs = language_server.request_references(file_path, sel_start["line"], sel_start["character"]) # Should find references to Calculator (constructor calls, etc.) if refs: # Verify the structure of referencing symbols for ref in refs: assert "uri" in ref or "relativePath" in ref if "range" in ref: assert "start" in ref["range"] assert "end" in ref["range"] @pytest.mark.parametrize("language_server", [Language.DART], indirect=True) def test_cross_file_symbol_resolution(self, language_server: SolidLanguageServer) -> None: """Test symbol resolution across multiple files.""" helper_file_path = os.path.join("lib", "helper.dart") # Test finding references to subtract function from helper.dart in main.dart helper_symbols = language_server.request_document_symbols(helper_file_path) symbol_list = helper_symbols[0] if helper_symbols and isinstance(helper_symbols[0], list) else helper_symbols subtract_symbol = next((s for s in symbol_list if s.get("name") == "subtract"), None) if subtract_symbol and "selectionRange" in subtract_symbol: sel_start = subtract_symbol["selectionRange"]["start"] refs = language_server.request_references(helper_file_path, sel_start["line"], sel_start["character"]) # Should find references in main.dart main_dart_refs = [ref for ref in refs if "main.dart" in ref.get("uri", "") or "main.dart" in ref.get("relativePath", "")] # Note: This may not always work depending on language server capabilities # So we don't assert - just verify the structure if we get results if main_dart_refs: for ref in main_dart_refs: assert "range" in ref or "location" in ref ``` -------------------------------------------------------------------------------- /src/solidlsp/language_servers/omnisharp/initialize_params.json: -------------------------------------------------------------------------------- ```json { "_description": "The parameters sent by the client when initializing the language server with the \"initialize\" request. More details at https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#initialize", "processId": "os.getpid()", "clientInfo": { "name": "Visual Studio Code - Insiders", "version": "1.82.0-insider" }, "locale": "en", "rootPath": "$rootPath", "rootUri": "$rootUri", "capabilities": { "workspace": { "applyEdit": true, "workspaceEdit": { "documentChanges": true, "resourceOperations": [ "create", "rename", "delete" ], "failureHandling": "textOnlyTransactional", "normalizesLineEndings": true, "changeAnnotationSupport": { "groupsOnLabel": true } }, "configuration": false, "didChangeWatchedFiles": { "dynamicRegistration": true, "relativePatternSupport": true }, "symbol": { "dynamicRegistration": true, "symbolKind": { "valueSet": [ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26 ] }, "tagSupport": { "valueSet": [ 1 ] }, "resolveSupport": { "properties": [ "location.range" ] } }, "codeLens": { "refreshSupport": true }, "executeCommand": { "dynamicRegistration": true }, "didChangeConfiguration": { "dynamicRegistration": true }, "workspaceFolders": true, "semanticTokens": { "refreshSupport": true }, "fileOperations": { "dynamicRegistration": true, "didCreate": true, "didRename": true, "didDelete": true, "willCreate": true, "willRename": true, "willDelete": true }, "inlineValue": { "refreshSupport": true }, "inlayHint": { "refreshSupport": true }, "diagnostics": { "refreshSupport": true } }, "textDocument": { "publishDiagnostics": { "relatedInformation": true, "versionSupport": false, "tagSupport": { "valueSet": [ 1, 2 ] }, "codeDescriptionSupport": true, "dataSupport": true }, "synchronization": { "dynamicRegistration": true, "willSave": true, "willSaveWaitUntil": true, "didSave": true }, "completion": { "dynamicRegistration": true, "contextSupport": true, "completionItem": { "snippetSupport": true, "commitCharactersSupport": true, "documentationFormat": [ "markdown", "plaintext" ], "deprecatedSupport": true, "preselectSupport": true, "tagSupport": { "valueSet": [ 1 ] }, "insertReplaceSupport": true, "resolveSupport": { "properties": [ "documentation", "detail", "additionalTextEdits" ] }, "insertTextModeSupport": { "valueSet": [ 1, 2 ] }, "labelDetailsSupport": true }, "insertTextMode": 2, "completionItemKind": { "valueSet": [ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25 ] }, "completionList": { "itemDefaults": [ "commitCharacters", "editRange", "insertTextFormat", "insertTextMode" ] } }, "hover": { "dynamicRegistration": true, "contentFormat": [ "markdown", "plaintext" ] }, "signatureHelp": { "dynamicRegistration": true, "signatureInformation": { "documentationFormat": [ "markdown", "plaintext" ], "parameterInformation": { "labelOffsetSupport": true }, "activeParameterSupport": true }, "contextSupport": true }, "definition": { "dynamicRegistration": true, "linkSupport": true }, "references": { "dynamicRegistration": true }, "documentHighlight": { "dynamicRegistration": true }, "documentSymbol": { "dynamicRegistration": true, "symbolKind": { "valueSet": [ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26 ] }, "hierarchicalDocumentSymbolSupport": true, "tagSupport": { "valueSet": [ 1 ] }, "labelSupport": true }, "codeAction": { "dynamicRegistration": true, "isPreferredSupport": true, "disabledSupport": true, "dataSupport": true, "resolveSupport": { "properties": [ "edit" ] }, "codeActionLiteralSupport": { "codeActionKind": { "valueSet": [ "", "quickfix", "refactor", "refactor.extract", "refactor.inline", "refactor.rewrite", "source", "source.organizeImports" ] } }, "honorsChangeAnnotations": false }, "codeLens": { "dynamicRegistration": true }, "formatting": { "dynamicRegistration": true }, "rangeFormatting": { "dynamicRegistration": true }, "onTypeFormatting": { "dynamicRegistration": true }, "rename": { "dynamicRegistration": true, "prepareSupport": true, "prepareSupportDefaultBehavior": 1, "honorsChangeAnnotations": true }, "documentLink": { "dynamicRegistration": true, "tooltipSupport": true }, "typeDefinition": { "dynamicRegistration": true, "linkSupport": true }, "implementation": { "dynamicRegistration": true, "linkSupport": true }, "colorProvider": { "dynamicRegistration": true }, "foldingRange": { "dynamicRegistration": true, "rangeLimit": 5000, "lineFoldingOnly": true, "foldingRangeKind": { "valueSet": [ "comment", "imports", "region" ] }, "foldingRange": { "collapsedText": false } }, "declaration": { "dynamicRegistration": true, "linkSupport": true }, "selectionRange": { "dynamicRegistration": true }, "callHierarchy": { "dynamicRegistration": true }, "semanticTokens": { "dynamicRegistration": true, "tokenTypes": [ "namespace", "type", "class", "enum", "interface", "struct", "typeParameter", "parameter", "variable", "property", "enumMember", "event", "function", "method", "macro", "keyword", "modifier", "comment", "string", "number", "regexp", "operator", "decorator" ], "tokenModifiers": [ "declaration", "definition", "readonly", "static", "deprecated", "abstract", "async", "modification", "documentation", "defaultLibrary" ], "formats": [ "relative" ], "requests": { "range": true, "full": { "delta": true } }, "multilineTokenSupport": false, "overlappingTokenSupport": false, "serverCancelSupport": true, "augmentsSyntaxTokens": false }, "linkedEditingRange": { "dynamicRegistration": true }, "typeHierarchy": { "dynamicRegistration": true }, "inlineValue": { "dynamicRegistration": true }, "inlayHint": { "dynamicRegistration": true, "resolveSupport": { "properties": [ "tooltip", "textEdits", "label.tooltip", "label.location", "label.command" ] } }, "diagnostic": { "dynamicRegistration": true, "relatedDocumentSupport": false } }, "window": { "showMessage": { "messageActionItem": { "additionalPropertiesSupport": true } }, "showDocument": { "support": true }, "workDoneProgress": true }, "general": { "staleRequestSupport": { "cancel": true, "retryOnContentModified": [ "textDocument/semanticTokens/full", "textDocument/semanticTokens/range", "textDocument/semanticTokens/full/delta" ] }, "regularExpressions": { "engine": "ECMAScript", "version": "ES2020" }, "markdown": { "parser": "marked", "version": "1.1.0", "allowedTags": [ "ul", "li", "p", "code", "blockquote", "ol", "h1", "h2", "h3", "h4", "h5", "h6", "hr", "em", "pre", "table", "thead", "tbody", "tr", "th", "td", "div", "del", "a", "strong", "br", "img", "span" ] }, "positionEncodings": [ "utf-16" ] }, "notebookDocument": { "synchronization": { "dynamicRegistration": true, "executionSummarySupport": true } }, "experimental": { "snippetTextEdit": true, "codeActionGroup": true, "hoverActions": true, "serverStatusNotification": true, "colorDiagnosticOutput": true, "openServerLogs": true, "commands": { "commands": [ "editor.action.triggerParameterHints" ] } } }, "initializationOptions": { "RoslynExtensionsOptions": { "EnableDecompilationSupport": false, "EnableAnalyzersSupport": true, "EnableImportCompletion": true, "EnableAsyncCompletion": false, "DocumentAnalysisTimeoutMs": 30000, "DiagnosticWorkersThreadCount": 18, "AnalyzeOpenDocumentsOnly": true, "InlayHintsOptions": { "EnableForParameters": false, "ForLiteralParameters": false, "ForIndexerParameters": false, "ForObjectCreationParameters": false, "ForOtherParameters": false, "SuppressForParametersThatDifferOnlyBySuffix": false, "SuppressForParametersThatMatchMethodIntent": false, "SuppressForParametersThatMatchArgumentName": false, "EnableForTypes": false, "ForImplicitVariableTypes": false, "ForLambdaParameterTypes": false, "ForImplicitObjectCreation": false }, "LocationPaths": null }, "FormattingOptions": { "OrganizeImports": false, "EnableEditorConfigSupport": true, "NewLine": "\n", "UseTabs": false, "TabSize": 4, "IndentationSize": 4, "SpacingAfterMethodDeclarationName": false, "SeparateImportDirectiveGroups": false, "SpaceWithinMethodDeclarationParenthesis": false, "SpaceBetweenEmptyMethodDeclarationParentheses": false, "SpaceAfterMethodCallName": false, "SpaceWithinMethodCallParentheses": false, "SpaceBetweenEmptyMethodCallParentheses": false, "SpaceAfterControlFlowStatementKeyword": true, "SpaceWithinExpressionParentheses": false, "SpaceWithinCastParentheses": false, "SpaceWithinOtherParentheses": false, "SpaceAfterCast": false, "SpaceBeforeOpenSquareBracket": false, "SpaceBetweenEmptySquareBrackets": false, "SpaceWithinSquareBrackets": false, "SpaceAfterColonInBaseTypeDeclaration": true, "SpaceAfterComma": true, "SpaceAfterDot": false, "SpaceAfterSemicolonsInForStatement": true, "SpaceBeforeColonInBaseTypeDeclaration": true, "SpaceBeforeComma": false, "SpaceBeforeDot": false, "SpaceBeforeSemicolonsInForStatement": false, "SpacingAroundBinaryOperator": "single", "IndentBraces": false, "IndentBlock": true, "IndentSwitchSection": true, "IndentSwitchCaseSection": true, "IndentSwitchCaseSectionWhenBlock": true, "LabelPositioning": "oneLess", "WrappingPreserveSingleLine": true, "WrappingKeepStatementsOnSingleLine": true, "NewLinesForBracesInTypes": true, "NewLinesForBracesInMethods": true, "NewLinesForBracesInProperties": true, "NewLinesForBracesInAccessors": true, "NewLinesForBracesInAnonymousMethods": true, "NewLinesForBracesInControlBlocks": true, "NewLinesForBracesInAnonymousTypes": true, "NewLinesForBracesInObjectCollectionArrayInitializers": true, "NewLinesForBracesInLambdaExpressionBody": true, "NewLineForElse": true, "NewLineForCatch": true, "NewLineForFinally": true, "NewLineForMembersInObjectInit": true, "NewLineForMembersInAnonymousTypes": true, "NewLineForClausesInQuery": true }, "FileOptions": { "SystemExcludeSearchPatterns": [ "**/node_modules/**/*", "**/bin/**/*", "**/obj/**/*", "**/.git/**/*", "**/.git", "**/.svn", "**/.hg", "**/CVS", "**/.DS_Store", "**/Thumbs.db" ], "ExcludeSearchPatterns": [] }, "RenameOptions": { "RenameOverloads": false, "RenameInStrings": false, "RenameInComments": false }, "ImplementTypeOptions": { "InsertionBehavior": 0, "PropertyGenerationBehavior": 0 }, "DotNetCliOptions": { "LocationPaths": null }, "Plugins": { "LocationPaths": null } }, "trace": "verbose", "workspaceFolders": [ { "uri": "$uri", "name": "$name" } ] } ``` -------------------------------------------------------------------------------- /test/solidlsp/erlang/test_erlang_symbol_retrieval.py: -------------------------------------------------------------------------------- ```python """ Tests for the Erlang language server symbol-related functionality. These tests focus on the following methods: - request_containing_symbol - request_referencing_symbols - request_defining_symbol """ import os import pytest from solidlsp import SolidLanguageServer from solidlsp.ls_config import Language from solidlsp.ls_types import SymbolKind from . import ERLANG_LS_UNAVAILABLE, ERLANG_LS_UNAVAILABLE_REASON # These marks will be applied to all tests in this module pytestmark = [ pytest.mark.erlang, pytest.mark.skipif(ERLANG_LS_UNAVAILABLE, reason=f"Erlang LS not available: {ERLANG_LS_UNAVAILABLE_REASON}"), ] class TestErlangLanguageServerSymbols: """Test the Erlang language server's symbol-related functionality.""" @pytest.mark.parametrize("language_server", [Language.ERLANG], indirect=True) def test_request_containing_symbol_function(self, language_server: SolidLanguageServer) -> None: """Test request_containing_symbol for a function.""" # Test for a position inside the create_user function file_path = os.path.join("src", "models.erl") # Find the create_user function in the file content = language_server.retrieve_full_file_content(file_path) lines = content.split("\n") create_user_line = None for i, line in enumerate(lines): if "create_user(" in line and "-spec" not in line: create_user_line = i + 1 # Go inside the function body break if create_user_line is None: pytest.skip("Could not find create_user function") containing_symbol = language_server.request_containing_symbol(file_path, create_user_line, 10, include_body=True) # Verify that we found the containing symbol if containing_symbol: assert "create_user" in containing_symbol["name"] assert containing_symbol["kind"] == SymbolKind.Method or containing_symbol["kind"] == SymbolKind.Function if "body" in containing_symbol: assert "create_user" in containing_symbol["body"] @pytest.mark.parametrize("language_server", [Language.ERLANG], indirect=True) def test_request_containing_symbol_module(self, language_server: SolidLanguageServer) -> None: """Test request_containing_symbol for a module.""" # Test for a position inside the models module but outside any function file_path = os.path.join("src", "models.erl") # Find the module definition content = language_server.retrieve_full_file_content(file_path) lines = content.split("\n") module_line = None for i, line in enumerate(lines): if "-module(models)" in line: module_line = i + 2 # Go inside the module break if module_line is None: pytest.skip("Could not find models module") containing_symbol = language_server.request_containing_symbol(file_path, module_line, 5) # Verify that we found the containing symbol if containing_symbol: assert "models" in containing_symbol["name"] or "module" in containing_symbol["name"].lower() assert containing_symbol["kind"] == SymbolKind.Module or containing_symbol["kind"] == SymbolKind.Class @pytest.mark.parametrize("language_server", [Language.ERLANG], indirect=True) def test_request_containing_symbol_nested(self, language_server: SolidLanguageServer) -> None: """Test request_containing_symbol with nested scopes.""" # Test for a position inside a function which is inside a module file_path = os.path.join("src", "models.erl") # Find a function inside models module content = language_server.retrieve_full_file_content(file_path) lines = content.split("\n") function_body_line = None for i, line in enumerate(lines): if "create_user(" in line and "-spec" not in line: # Go deeper into the function body where there might be case expressions for j in range(i + 1, min(i + 10, len(lines))): if lines[j].strip() and not lines[j].strip().startswith("%"): function_body_line = j break break if function_body_line is None: pytest.skip("Could not find function body") containing_symbol = language_server.request_containing_symbol(file_path, function_body_line, 15) # Verify that we found the innermost containing symbol (the function) if containing_symbol: expected_names = ["create_user", "models"] assert any(name in containing_symbol["name"] for name in expected_names) @pytest.mark.parametrize("language_server", [Language.ERLANG], indirect=True) def test_request_containing_symbol_none(self, language_server: SolidLanguageServer) -> None: """Test request_containing_symbol for a position with no containing symbol.""" # Test for a position outside any function/module (e.g., in comments) file_path = os.path.join("src", "models.erl") # Line 1-2 are likely module declaration or comments containing_symbol = language_server.request_containing_symbol(file_path, 2, 10) # Should return None or an empty dictionary, or the top-level module # This is acceptable behavior for module-level positions assert containing_symbol is None or containing_symbol == {} or "models" in str(containing_symbol) @pytest.mark.parametrize("language_server", [Language.ERLANG], indirect=True) def test_request_referencing_symbols_record(self, language_server: SolidLanguageServer) -> None: """Test request_referencing_symbols for a record.""" # Test referencing symbols for user record file_path = os.path.join("include", "records.hrl") symbols = language_server.request_document_symbols(file_path) user_symbol = None for symbol_group in symbols: user_symbol = next((s for s in symbol_group if "user" in s.get("name", "")), None) if user_symbol: break if not user_symbol or "selectionRange" not in user_symbol: pytest.skip("User record symbol or its selectionRange not found") sel_start = user_symbol["selectionRange"]["start"] ref_symbols = [ ref.symbol for ref in language_server.request_referencing_symbols(file_path, sel_start["line"], sel_start["character"]) ] if ref_symbols: models_references = [ symbol for symbol in ref_symbols if "location" in symbol and "uri" in symbol["location"] and "models.erl" in symbol["location"]["uri"] ] # We expect some references from models.erl assert len(models_references) >= 0 # At least attempt to find references @pytest.mark.parametrize("language_server", [Language.ERLANG], indirect=True) def test_request_referencing_symbols_function(self, language_server: SolidLanguageServer) -> None: """Test request_referencing_symbols for a function.""" # Test referencing symbols for create_user function file_path = os.path.join("src", "models.erl") symbols = language_server.request_document_symbols(file_path) create_user_symbol = None for symbol_group in symbols: create_user_symbol = next((s for s in symbol_group if "create_user" in s.get("name", "")), None) if create_user_symbol: break if not create_user_symbol or "selectionRange" not in create_user_symbol: pytest.skip("create_user function symbol or its selectionRange not found") sel_start = create_user_symbol["selectionRange"]["start"] ref_symbols = [ ref.symbol for ref in language_server.request_referencing_symbols(file_path, sel_start["line"], sel_start["character"]) ] if ref_symbols: # We might find references from services.erl or test files service_references = [ symbol for symbol in ref_symbols if "location" in symbol and "uri" in symbol["location"] and ("services.erl" in symbol["location"]["uri"] or "test" in symbol["location"]["uri"]) ] assert len(service_references) >= 0 # At least attempt to find references @pytest.mark.parametrize("language_server", [Language.ERLANG], indirect=True) def test_request_referencing_symbols_none(self, language_server: SolidLanguageServer) -> None: """Test request_referencing_symbols for a position with no symbol.""" file_path = os.path.join("src", "models.erl") # Line 3 is likely a blank line or comment try: ref_symbols = [ref.symbol for ref in language_server.request_referencing_symbols(file_path, 3, 0)] # If we get here, make sure we got an empty result assert ref_symbols == [] or ref_symbols is None except Exception: # The method might raise an exception for invalid positions # which is acceptable behavior pass # Tests for request_defining_symbol @pytest.mark.parametrize("language_server", [Language.ERLANG], indirect=True) def test_request_defining_symbol_function_call(self, language_server: SolidLanguageServer) -> None: """Test request_defining_symbol for a function call.""" # Find a place where models:create_user is called in services.erl file_path = os.path.join("src", "services.erl") content = language_server.retrieve_full_file_content(file_path) lines = content.split("\n") models_call_line = None for i, line in enumerate(lines): if "models:create_user(" in line: models_call_line = i break if models_call_line is None: pytest.skip("Could not find models:create_user call") # Try to find the definition of models:create_user defining_symbol = language_server.request_defining_symbol(file_path, models_call_line, 20) if defining_symbol: assert "create_user" in defining_symbol.get("name", "") or "models" in defining_symbol.get("name", "") if "location" in defining_symbol and "uri" in defining_symbol["location"]: assert "models.erl" in defining_symbol["location"]["uri"] @pytest.mark.parametrize("language_server", [Language.ERLANG], indirect=True) def test_request_defining_symbol_record_usage(self, language_server: SolidLanguageServer) -> None: """Test request_defining_symbol for a record usage.""" # Find a place where #user{} record is used in models.erl file_path = os.path.join("src", "models.erl") content = language_server.retrieve_full_file_content(file_path) lines = content.split("\n") record_usage_line = None for i, line in enumerate(lines): if "#user{" in line: record_usage_line = i break if record_usage_line is None: pytest.skip("Could not find #user{} record usage") defining_symbol = language_server.request_defining_symbol(file_path, record_usage_line, 10) if defining_symbol: assert "user" in defining_symbol.get("name", "").lower() if "location" in defining_symbol and "uri" in defining_symbol["location"]: assert "records.hrl" in defining_symbol["location"]["uri"] @pytest.mark.parametrize("language_server", [Language.ERLANG], indirect=True) def test_request_defining_symbol_module_call(self, language_server: SolidLanguageServer) -> None: """Test request_defining_symbol for a module function call.""" # Find a place where utils:validate_input is called file_path = os.path.join("src", "models.erl") content = language_server.retrieve_full_file_content(file_path) lines = content.split("\n") utils_call_line = None for i, line in enumerate(lines): if "validate_email(" in line: utils_call_line = i break if utils_call_line is None: pytest.skip("Could not find function call in models.erl") defining_symbol = language_server.request_defining_symbol(file_path, utils_call_line, 15) if defining_symbol: assert "validate" in defining_symbol.get("name", "") or "email" in defining_symbol.get("name", "") @pytest.mark.parametrize("language_server", [Language.ERLANG], indirect=True) def test_request_defining_symbol_none(self, language_server: SolidLanguageServer) -> None: """Test request_defining_symbol for a position with no symbol.""" # Test for a position with no symbol (e.g., whitespace or comment) file_path = os.path.join("src", "models.erl") # Line 3 is likely a blank line or comment defining_symbol = language_server.request_defining_symbol(file_path, 3, 0) # Should return None or empty assert defining_symbol is None or defining_symbol == {} @pytest.mark.parametrize("language_server", [Language.ERLANG], indirect=True) def test_symbol_methods_integration(self, language_server: SolidLanguageServer) -> None: """Test integration between different symbol methods.""" file_path = os.path.join("src", "models.erl") # Find create_user function definition content = language_server.retrieve_full_file_content(file_path) lines = content.split("\n") create_user_line = None for i, line in enumerate(lines): if "create_user(" in line and "-spec" not in line: create_user_line = i break if create_user_line is None: pytest.skip("Could not find create_user function") # Test containing symbol containing = language_server.request_containing_symbol(file_path, create_user_line + 2, 10) if containing: # Test that we can find references to this symbol if "location" in containing and "range" in containing["location"]: start_pos = containing["location"]["range"]["start"] refs = [ ref.symbol for ref in language_server.request_referencing_symbols(file_path, start_pos["line"], start_pos["character"]) ] # We should find some references or none (both are valid outcomes) assert isinstance(refs, list) @pytest.mark.timeout(120) # Add explicit timeout for this complex test @pytest.mark.xfail( reason="Known intermittent timeout issue in Erlang LS in CI environments. " "May pass locally but can timeout on slower CI systems.", strict=False, ) @pytest.mark.parametrize("language_server", [Language.ERLANG], indirect=True) def test_symbol_tree_structure(self, language_server: SolidLanguageServer) -> None: """Test that symbol tree structure is correctly built.""" symbol_tree = language_server.request_full_symbol_tree() # Should get a tree structure assert len(symbol_tree) > 0 # Should have our test repository structure root = symbol_tree[0] assert "children" in root # Look for src directory src_dir = None for child in root["children"]: if child["name"] == "src": src_dir = child break if src_dir: # Check for our Erlang modules file_names = [child["name"] for child in src_dir.get("children", [])] expected_modules = ["models", "services", "utils", "app"] found_modules = [name for name in expected_modules if any(name in fname for fname in file_names)] assert len(found_modules) > 0, f"Expected to find some modules from {expected_modules}, but got {file_names}" @pytest.mark.parametrize("language_server", [Language.ERLANG], indirect=True) def test_request_dir_overview(self, language_server: SolidLanguageServer) -> None: """Test request_dir_overview functionality.""" src_overview = language_server.request_dir_overview("src") # Should get an overview of the src directory assert src_overview is not None overview_keys = list(src_overview.keys()) if hasattr(src_overview, "keys") else [] src_files = [key for key in overview_keys if key.startswith("src/") or "src" in key] assert len(src_files) > 0, f"Expected to find src/ files in overview keys: {overview_keys}" # Should contain information about our modules overview_text = str(src_overview).lower() expected_terms = ["models", "services", "user", "create_user", "gen_server"] found_terms = [term for term in expected_terms if term in overview_text] assert len(found_terms) > 0, f"Expected to find some terms from {expected_terms} in overview" @pytest.mark.parametrize("language_server", [Language.ERLANG], indirect=True) def test_containing_symbol_of_record_field(self, language_server: SolidLanguageServer) -> None: """Test containing symbol for record field access.""" file_path = os.path.join("src", "models.erl") # Find a record field access like User#user.name content = language_server.retrieve_full_file_content(file_path) lines = content.split("\n") record_field_line = None for i, line in enumerate(lines): if "#user{" in line and ("name" in line or "email" in line or "id" in line): record_field_line = i break if record_field_line is None: pytest.skip("Could not find record field access") containing_symbol = language_server.request_containing_symbol(file_path, record_field_line, 10) if containing_symbol: # Should be contained within a function assert "name" in containing_symbol expected_names = ["create_user", "update_user", "format_user_info"] assert any(name in containing_symbol["name"] for name in expected_names) @pytest.mark.parametrize("language_server", [Language.ERLANG], indirect=True) def test_containing_symbol_of_spec(self, language_server: SolidLanguageServer) -> None: """Test containing symbol for function specs.""" file_path = os.path.join("src", "models.erl") # Find a -spec directive content = language_server.retrieve_full_file_content(file_path) lines = content.split("\n") spec_line = None for i, line in enumerate(lines): if line.strip().startswith("-spec") and "create_user" in line: spec_line = i break if spec_line is None: pytest.skip("Could not find -spec directive") containing_symbol = language_server.request_containing_symbol(file_path, spec_line, 5) if containing_symbol: # Should be contained within the module or the function it specifies assert "name" in containing_symbol expected_names = ["models", "create_user"] assert any(name in containing_symbol["name"] for name in expected_names) @pytest.mark.timeout(90) # Add explicit timeout @pytest.mark.xfail( reason="Known intermittent timeout issue in Erlang LS in CI environments. " "May pass locally but can timeout on slower CI systems, especially macOS. " "Similar to known Next LS timeout issues.", strict=False, ) @pytest.mark.parametrize("language_server", [Language.ERLANG], indirect=True) def test_referencing_symbols_across_files(self, language_server: SolidLanguageServer) -> None: """Test finding references across different files.""" # Test that we can find references to models module functions in services.erl file_path = os.path.join("src", "models.erl") symbols = language_server.request_document_symbols(file_path) create_user_symbol = None for symbol_group in symbols: create_user_symbol = next((s for s in symbol_group if "create_user" in s.get("name", "")), None) if create_user_symbol: break if not create_user_symbol or "selectionRange" not in create_user_symbol: pytest.skip("create_user function symbol not found") sel_start = create_user_symbol["selectionRange"]["start"] ref_symbols = [ ref.symbol for ref in language_server.request_referencing_symbols(file_path, sel_start["line"], sel_start["character"]) ] # Look for cross-file references cross_file_refs = [ symbol for symbol in ref_symbols if "location" in symbol and "uri" in symbol["location"] and not symbol["location"]["uri"].endswith("models.erl") ] # We might find references in services.erl or test files if cross_file_refs: assert len(cross_file_refs) > 0, "Should find some cross-file references" ``` -------------------------------------------------------------------------------- /src/solidlsp/language_servers/ruby_lsp.py: -------------------------------------------------------------------------------- ```python """ Ruby LSP Language Server implementation using Shopify's ruby-lsp. Provides modern Ruby language server capabilities with improved performance. """ import json import logging import os import pathlib import shutil import subprocess import threading from overrides import override from solidlsp.ls import SolidLanguageServer from solidlsp.ls_config import LanguageServerConfig from solidlsp.ls_logger import LanguageServerLogger from solidlsp.lsp_protocol_handler.lsp_types import InitializeParams from solidlsp.lsp_protocol_handler.server import ProcessLaunchInfo from solidlsp.settings import SolidLSPSettings class RubyLsp(SolidLanguageServer): """ Provides Ruby specific instantiation of the LanguageServer class using ruby-lsp. Contains various configurations and settings specific to Ruby with modern LSP features. """ def __init__( self, config: LanguageServerConfig, logger: LanguageServerLogger, repository_root_path: str, solidlsp_settings: SolidLSPSettings ): """ Creates a RubyLsp instance. This class is not meant to be instantiated directly. Use LanguageServer.create() instead. """ ruby_lsp_executable = self._setup_runtime_dependencies(logger, config, repository_root_path) super().__init__( config, logger, repository_root_path, ProcessLaunchInfo(cmd=ruby_lsp_executable, cwd=repository_root_path), "ruby", solidlsp_settings, ) self.analysis_complete = threading.Event() self.service_ready_event = threading.Event() # Set timeout for ruby-lsp requests - ruby-lsp is fast self.set_request_timeout(30.0) # 30 seconds for initialization and requests @override def is_ignored_dirname(self, dirname: str) -> bool: """Override to ignore Ruby-specific directories that cause performance issues.""" ruby_ignored_dirs = [ "vendor", # Ruby vendor directory ".bundle", # Bundler cache "tmp", # Temporary files "log", # Log files "coverage", # Test coverage reports ".yardoc", # YARD documentation cache "doc", # Generated documentation "node_modules", # Node modules (for Rails with JS) "storage", # Active Storage files (Rails) "public/packs", # Webpacker output "public/webpack", # Webpack output "public/assets", # Rails compiled assets ] return super().is_ignored_dirname(dirname) or dirname in ruby_ignored_dirs @override def _get_wait_time_for_cross_file_referencing(self) -> float: """Override to provide optimal wait time for ruby-lsp cross-file reference resolution. ruby-lsp typically initializes quickly, but may need a brief moment for cross-file analysis in larger projects. """ return 0.5 # 500ms should be sufficient for ruby-lsp @staticmethod def _find_executable_with_extensions(executable_name: str) -> str | None: """ Find executable with Windows-specific extensions (.bat, .cmd, .exe) if on Windows. Returns the full path to the executable or None if not found. """ import platform if platform.system() == "Windows": # Try Windows-specific extensions first for ext in [".bat", ".cmd", ".exe"]: path = shutil.which(f"{executable_name}{ext}") if path: return path # Fall back to default search return shutil.which(executable_name) else: # Unix systems return shutil.which(executable_name) @staticmethod def _setup_runtime_dependencies(logger: LanguageServerLogger, config: LanguageServerConfig, repository_root_path: str) -> list[str]: """ Setup runtime dependencies for ruby-lsp and return the command list to start the server. Installation strategy: Bundler project > global ruby-lsp > gem install ruby-lsp """ # Detect rbenv-managed Ruby environment # When .ruby-version exists, it indicates the project uses rbenv for version management. # rbenv automatically reads .ruby-version to determine which Ruby version to use. # Using "rbenv exec" ensures commands run with the correct Ruby version and its gems. # # Why rbenv is preferred over system Ruby: # - Respects project-specific Ruby versions # - Avoids bundler version mismatches between system and project # - Ensures consistent environment across developers # # Fallback behavior: # If .ruby-version doesn't exist or rbenv isn't installed, we fall back to system Ruby. # This may cause issues if: # - System Ruby version differs from what the project expects # - System bundler version is incompatible with Gemfile.lock # - Project gems aren't installed in system Ruby ruby_version_file = os.path.join(repository_root_path, ".ruby-version") use_rbenv = os.path.exists(ruby_version_file) and shutil.which("rbenv") is not None if use_rbenv: ruby_cmd = ["rbenv", "exec", "ruby"] bundle_cmd = ["rbenv", "exec", "bundle"] logger.log(f"Using rbenv-managed Ruby (found {ruby_version_file})", logging.INFO) else: ruby_cmd = ["ruby"] bundle_cmd = ["bundle"] if os.path.exists(ruby_version_file): logger.log( f"Found {ruby_version_file} but rbenv is not installed. " "Using system Ruby. Consider installing rbenv for better version management: https://github.com/rbenv/rbenv", logging.WARNING, ) else: logger.log("No .ruby-version file found, using system Ruby", logging.INFO) # Check if Ruby is installed try: result = subprocess.run(ruby_cmd + ["--version"], check=True, capture_output=True, cwd=repository_root_path, text=True) ruby_version = result.stdout.strip() logger.log(f"Ruby version: {ruby_version}", logging.INFO) # Extract version number for compatibility checks import re version_match = re.search(r"ruby (\d+)\.(\d+)\.(\d+)", ruby_version) if version_match: major, minor, patch = map(int, version_match.groups()) if major < 2 or (major == 2 and minor < 6): logger.log(f"Warning: Ruby {major}.{minor}.{patch} detected. ruby-lsp works best with Ruby 2.6+", logging.WARNING) except subprocess.CalledProcessError as e: error_msg = e.stderr if isinstance(e.stderr, str) else e.stderr.decode() if e.stderr else "Unknown error" raise RuntimeError( f"Error checking Ruby installation: {error_msg}. Please ensure Ruby is properly installed and in PATH." ) from e except FileNotFoundError as e: raise RuntimeError( "Ruby is not installed or not found in PATH. Please install Ruby using one of these methods:\n" " - Using rbenv: rbenv install 3.0.0 && rbenv global 3.0.0\n" " - Using RVM: rvm install 3.0.0 && rvm use 3.0.0 --default\n" " - Using asdf: asdf install ruby 3.0.0 && asdf global ruby 3.0.0\n" " - System package manager (brew install ruby, apt install ruby, etc.)" ) from e # Check for Bundler project (Gemfile exists) gemfile_path = os.path.join(repository_root_path, "Gemfile") gemfile_lock_path = os.path.join(repository_root_path, "Gemfile.lock") is_bundler_project = os.path.exists(gemfile_path) if is_bundler_project: logger.log("Detected Bundler project (Gemfile found)", logging.INFO) # Check if bundle command is available using Windows-compatible search bundle_path = RubyLsp._find_executable_with_extensions(bundle_cmd[0] if len(bundle_cmd) == 1 else "bundle") if not bundle_path: # Try common bundle executables for bundle_executable in ["bin/bundle", "bundle"]: if bundle_executable.startswith("bin/"): bundle_full_path = os.path.join(repository_root_path, bundle_executable) else: bundle_full_path = RubyLsp._find_executable_with_extensions(bundle_executable) if bundle_full_path and os.path.exists(bundle_full_path): bundle_path = bundle_full_path if bundle_executable.startswith("bin/") else bundle_executable break if not bundle_path: logger.log( "Bundler project detected but 'bundle' command not found. Falling back to global ruby-lsp installation.", logging.WARNING, ) else: # Check if ruby-lsp is in Gemfile.lock ruby_lsp_in_bundle = False if os.path.exists(gemfile_lock_path): try: with open(gemfile_lock_path) as f: content = f.read() ruby_lsp_in_bundle = "ruby-lsp" in content.lower() except Exception as e: logger.log(f"Warning: Could not read Gemfile.lock: {e}", logging.WARNING) if ruby_lsp_in_bundle: logger.log("Found ruby-lsp in Gemfile.lock", logging.INFO) return bundle_cmd + ["exec", "ruby-lsp"] else: logger.log( "ruby-lsp not found in Gemfile.lock. Consider adding 'gem \"ruby-lsp\"' to your Gemfile for better compatibility.", logging.INFO, ) # Fall through to global installation check # Check if ruby-lsp is available globally using Windows-compatible search ruby_lsp_path = RubyLsp._find_executable_with_extensions("ruby-lsp") if ruby_lsp_path: logger.log(f"Found ruby-lsp at: {ruby_lsp_path}", logging.INFO) return [ruby_lsp_path] # Try to install ruby-lsp globally logger.log("ruby-lsp not found, attempting to install globally...", logging.INFO) try: subprocess.run(["gem", "install", "ruby-lsp"], check=True, capture_output=True, cwd=repository_root_path) logger.log("Successfully installed ruby-lsp globally", logging.INFO) # Find the newly installed ruby-lsp executable ruby_lsp_path = RubyLsp._find_executable_with_extensions("ruby-lsp") return [ruby_lsp_path] if ruby_lsp_path else ["ruby-lsp"] except subprocess.CalledProcessError as e: error_msg = e.stderr if isinstance(e.stderr, str) else e.stderr.decode() if e.stderr else str(e) if is_bundler_project: raise RuntimeError( f"Failed to install ruby-lsp globally: {error_msg}\n" "For Bundler projects, please add 'gem \"ruby-lsp\"' to your Gemfile and run 'bundle install'.\n" "Alternatively, install globally: gem install ruby-lsp" ) from e raise RuntimeError(f"Failed to install ruby-lsp: {error_msg}\nPlease try installing manually: gem install ruby-lsp") from e @staticmethod def _detect_rails_project(repository_root_path: str) -> bool: """ Detect if this is a Rails project by checking for Rails-specific files. """ rails_indicators = [ "config/application.rb", "config/environment.rb", "app/controllers/application_controller.rb", "Rakefile", ] for indicator in rails_indicators: if os.path.exists(os.path.join(repository_root_path, indicator)): return True # Check for Rails in Gemfile gemfile_path = os.path.join(repository_root_path, "Gemfile") if os.path.exists(gemfile_path): try: with open(gemfile_path) as f: content = f.read().lower() if "gem 'rails'" in content or 'gem "rails"' in content: return True except Exception: pass return False @staticmethod def _get_ruby_exclude_patterns(repository_root_path: str) -> list[str]: """ Get Ruby and Rails-specific exclude patterns for better performance. """ base_patterns = [ "**/vendor/**", # Ruby vendor directory "**/.bundle/**", # Bundler cache "**/tmp/**", # Temporary files "**/log/**", # Log files "**/coverage/**", # Test coverage reports "**/.yardoc/**", # YARD documentation cache "**/doc/**", # Generated documentation "**/.git/**", # Git directory "**/node_modules/**", # Node modules (for Rails with JS) "**/public/assets/**", # Rails compiled assets ] # Add Rails-specific patterns if this is a Rails project if RubyLsp._detect_rails_project(repository_root_path): base_patterns.extend( [ "**/app/assets/builds/**", # Rails 7+ CSS builds "**/storage/**", # Active Storage "**/public/packs/**", # Webpacker "**/public/webpack/**", # Webpack ] ) return base_patterns def _get_initialize_params(self) -> InitializeParams: """ Returns ruby-lsp specific initialization parameters. """ exclude_patterns = self._get_ruby_exclude_patterns(self.repository_root_path) initialize_params = { "processId": os.getpid(), "rootPath": self.repository_root_path, "rootUri": pathlib.Path(self.repository_root_path).as_uri(), "capabilities": { "workspace": { "workspaceEdit": {"documentChanges": True}, "configuration": True, }, "window": { "workDoneProgress": True, }, "textDocument": { "documentSymbol": { "hierarchicalDocumentSymbolSupport": True, "symbolKind": {"valueSet": list(range(1, 27))}, }, "completion": { "completionItem": { "snippetSupport": True, "commitCharactersSupport": True, } }, }, }, "initializationOptions": { # ruby-lsp enables all features by default, so we don't need to specify enabledFeatures "experimentalFeaturesEnabled": False, "featuresConfiguration": {}, "indexing": { "includedPatterns": ["**/*.rb", "**/*.rake", "**/*.ru", "**/*.erb"], "excludedPatterns": exclude_patterns, }, }, } return initialize_params def _start_server(self) -> None: """ Starts the ruby-lsp Language Server for Ruby """ def register_capability_handler(params: dict) -> None: assert "registrations" in params for registration in params["registrations"]: self.logger.log(f"Registered capability: {registration['method']}", logging.INFO) return def lang_status_handler(params: dict) -> None: self.logger.log(f"LSP: language/status: {params}", logging.INFO) if params.get("type") == "ready": self.logger.log("ruby-lsp service is ready.", logging.INFO) self.analysis_complete.set() self.completions_available.set() def execute_client_command_handler(params: dict) -> list: return [] def do_nothing(params: dict) -> None: return def window_log_message(msg: dict) -> None: self.logger.log(f"LSP: window/logMessage: {msg}", logging.INFO) def progress_handler(params: dict) -> None: # ruby-lsp sends progress notifications during indexing self.logger.log(f"LSP: $/progress: {params}", logging.DEBUG) if "value" in params: value = params["value"] # Check for completion indicators if value.get("kind") == "end": self.logger.log("ruby-lsp indexing complete ($/progress end)", logging.INFO) self.analysis_complete.set() self.completions_available.set() elif value.get("kind") == "begin": self.logger.log("ruby-lsp indexing started ($/progress begin)", logging.INFO) elif "percentage" in value: percentage = value.get("percentage", 0) self.logger.log(f"ruby-lsp indexing progress: {percentage}%", logging.DEBUG) # Handle direct progress format (fallback) elif "token" in params and "value" in params: token = params.get("token") if isinstance(token, str) and "indexing" in token.lower(): value = params.get("value", {}) if value.get("kind") == "end" or value.get("percentage") == 100: self.logger.log("ruby-lsp indexing complete (token progress)", logging.INFO) self.analysis_complete.set() self.completions_available.set() def window_work_done_progress_create(params: dict) -> None: """Handle workDoneProgress/create requests from ruby-lsp""" self.logger.log(f"LSP: window/workDoneProgress/create: {params}", logging.DEBUG) return {} self.server.on_request("client/registerCapability", register_capability_handler) self.server.on_notification("language/status", lang_status_handler) self.server.on_notification("window/logMessage", window_log_message) self.server.on_request("workspace/executeClientCommand", execute_client_command_handler) self.server.on_notification("$/progress", progress_handler) self.server.on_request("window/workDoneProgress/create", window_work_done_progress_create) self.server.on_notification("textDocument/publishDiagnostics", do_nothing) self.logger.log("Starting ruby-lsp server process", logging.INFO) self.server.start() initialize_params = self._get_initialize_params() self.logger.log( "Sending initialize request from LSP client to LSP server and awaiting response", logging.INFO, ) self.logger.log(f"Sending init params: {json.dumps(initialize_params, indent=4)}", logging.INFO) init_response = self.server.send.initialize(initialize_params) self.logger.log(f"Received init response: {init_response}", logging.INFO) # Verify expected capabilities # Note: ruby-lsp may return textDocumentSync in different formats (number or object) text_document_sync = init_response["capabilities"].get("textDocumentSync") if isinstance(text_document_sync, int): assert text_document_sync in [1, 2], f"Unexpected textDocumentSync value: {text_document_sync}" elif isinstance(text_document_sync, dict): # ruby-lsp returns an object with change property assert "change" in text_document_sync, "textDocumentSync object should have 'change' property" assert "completionProvider" in init_response["capabilities"] self.server.notify.initialized({}) # Wait for ruby-lsp to complete its initial indexing # ruby-lsp has fast indexing self.logger.log("Waiting for ruby-lsp to complete initial indexing...", logging.INFO) if self.analysis_complete.wait(timeout=30.0): self.logger.log("ruby-lsp initial indexing complete, server ready", logging.INFO) else: self.logger.log("Timeout waiting for ruby-lsp indexing completion, proceeding anyway", logging.WARNING) # Fallback: assume indexing is complete after timeout self.analysis_complete.set() self.completions_available.set() def _handle_initialization_response(self, init_response): """ Handle the initialization response from ruby-lsp and validate capabilities. """ if "capabilities" in init_response: capabilities = init_response["capabilities"] # Validate textDocumentSync (ruby-lsp may return different formats) text_document_sync = capabilities.get("textDocumentSync") if isinstance(text_document_sync, int): assert text_document_sync in [1, 2], f"Unexpected textDocumentSync value: {text_document_sync}" elif isinstance(text_document_sync, dict): # ruby-lsp returns an object with change property assert "change" in text_document_sync, "textDocumentSync object should have 'change' property" # Log important capabilities important_capabilities = [ "completionProvider", "hoverProvider", "definitionProvider", "referencesProvider", "documentSymbolProvider", "codeActionProvider", "documentFormattingProvider", "semanticTokensProvider", ] for cap in important_capabilities: if cap in capabilities: self.logger.log(f"ruby-lsp {cap}: available", logging.DEBUG) # Signal that the service is ready self.service_ready_event.set() ``` -------------------------------------------------------------------------------- /src/solidlsp/language_servers/kotlin_language_server.py: -------------------------------------------------------------------------------- ```python """ Provides Kotlin specific instantiation of the LanguageServer class. Contains various configurations and settings specific to Kotlin. """ import dataclasses import logging import os import pathlib import stat from solidlsp.ls import SolidLanguageServer from solidlsp.ls_config import LanguageServerConfig from solidlsp.ls_logger import LanguageServerLogger from solidlsp.ls_utils import FileUtils, PlatformUtils from solidlsp.lsp_protocol_handler.lsp_types import InitializeParams from solidlsp.lsp_protocol_handler.server import ProcessLaunchInfo from solidlsp.settings import SolidLSPSettings @dataclasses.dataclass class KotlinRuntimeDependencyPaths: """ Stores the paths to the runtime dependencies of Kotlin Language Server """ java_path: str java_home_path: str kotlin_executable_path: str class KotlinLanguageServer(SolidLanguageServer): """ Provides Kotlin specific instantiation of the LanguageServer class. Contains various configurations and settings specific to Kotlin. """ def __init__( self, config: LanguageServerConfig, logger: LanguageServerLogger, repository_root_path: str, solidlsp_settings: SolidLSPSettings ): """ Creates a Kotlin Language Server instance. This class is not meant to be instantiated directly. Use LanguageServer.create() instead. """ runtime_dependency_paths = self._setup_runtime_dependencies(logger, config, solidlsp_settings) self.runtime_dependency_paths = runtime_dependency_paths # Create command to execute the Kotlin Language Server script cmd = [self.runtime_dependency_paths.kotlin_executable_path, "--stdio"] # Set environment variables including JAVA_HOME proc_env = {"JAVA_HOME": self.runtime_dependency_paths.java_home_path} super().__init__( config, logger, repository_root_path, ProcessLaunchInfo(cmd=cmd, env=proc_env, cwd=repository_root_path), "kotlin", solidlsp_settings, ) @classmethod def _setup_runtime_dependencies( cls, logger: LanguageServerLogger, config: LanguageServerConfig, solidlsp_settings: SolidLSPSettings ) -> KotlinRuntimeDependencyPaths: """ Setup runtime dependencies for Kotlin Language Server and return the paths. """ platform_id = PlatformUtils.get_platform_id() # Verify platform support assert ( platform_id.value.startswith("win-") or platform_id.value.startswith("linux-") or platform_id.value.startswith("osx-") ), "Only Windows, Linux and macOS platforms are supported for Kotlin in multilspy at the moment" # Runtime dependency information runtime_dependencies = { "runtimeDependency": { "id": "KotlinLsp", "description": "Kotlin Language Server", "url": "https://download-cdn.jetbrains.com/kotlin-lsp/0.253.10629/kotlin-0.253.10629.zip", "archiveType": "zip", }, "java": { "win-x64": { "url": "https://github.com/redhat-developer/vscode-java/releases/download/v1.42.0/java-win32-x64-1.42.0-561.vsix", "archiveType": "zip", "java_home_path": "extension/jre/21.0.7-win32-x86_64", "java_path": "extension/jre/21.0.7-win32-x86_64/bin/java.exe", }, "linux-x64": { "url": "https://github.com/redhat-developer/vscode-java/releases/download/v1.42.0/java-linux-x64-1.42.0-561.vsix", "archiveType": "zip", "java_home_path": "extension/jre/21.0.7-linux-x86_64", "java_path": "extension/jre/21.0.7-linux-x86_64/bin/java", }, "linux-arm64": { "url": "https://github.com/redhat-developer/vscode-java/releases/download/v1.42.0/java-linux-arm64-1.42.0-561.vsix", "archiveType": "zip", "java_home_path": "extension/jre/21.0.7-linux-aarch64", "java_path": "extension/jre/21.0.7-linux-aarch64/bin/java", }, "osx-x64": { "url": "https://github.com/redhat-developer/vscode-java/releases/download/v1.42.0/java-darwin-x64-1.42.0-561.vsix", "archiveType": "zip", "java_home_path": "extension/jre/21.0.7-macosx-x86_64", "java_path": "extension/jre/21.0.7-macosx-x86_64/bin/java", }, "osx-arm64": { "url": "https://github.com/redhat-developer/vscode-java/releases/download/v1.42.0/java-darwin-arm64-1.42.0-561.vsix", "archiveType": "zip", "java_home_path": "extension/jre/21.0.7-macosx-aarch64", "java_path": "extension/jre/21.0.7-macosx-aarch64/bin/java", }, }, } kotlin_dependency = runtime_dependencies["runtimeDependency"] java_dependency = runtime_dependencies["java"][platform_id.value] # Setup paths for dependencies static_dir = os.path.join(cls.ls_resources_dir(solidlsp_settings), "kotlin_language_server") os.makedirs(static_dir, exist_ok=True) # Setup Java paths java_dir = os.path.join(static_dir, "java") os.makedirs(java_dir, exist_ok=True) java_home_path = os.path.join(java_dir, java_dependency["java_home_path"]) java_path = os.path.join(java_dir, java_dependency["java_path"]) # Download and extract Java if not exists if not os.path.exists(java_path): logger.log(f"Downloading Java for {platform_id.value}...", logging.INFO) FileUtils.download_and_extract_archive(logger, java_dependency["url"], java_dir, java_dependency["archiveType"]) # Make Java executable if not platform_id.value.startswith("win-"): os.chmod(java_path, 0o755) assert os.path.exists(java_path), f"Java executable not found at {java_path}" # Setup Kotlin Language Server paths kotlin_ls_dir = static_dir # Get platform-specific executable script path if platform_id.value.startswith("win-"): kotlin_script = os.path.join(kotlin_ls_dir, "kotlin-lsp.cmd") else: kotlin_script = os.path.join(kotlin_ls_dir, "kotlin-lsp.sh") # Download and extract Kotlin Language Server if script doesn't exist if not os.path.exists(kotlin_script): logger.log("Downloading Kotlin Language Server...", logging.INFO) FileUtils.download_and_extract_archive(logger, kotlin_dependency["url"], static_dir, kotlin_dependency["archiveType"]) # Make script executable on Unix platforms if os.path.exists(kotlin_script) and not platform_id.value.startswith("win-"): os.chmod( kotlin_script, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH ) # Use script file if os.path.exists(kotlin_script): kotlin_executable_path = kotlin_script logger.log(f"Using Kotlin Language Server script at {kotlin_script}", logging.INFO) else: raise FileNotFoundError(f"Kotlin Language Server script not found at {kotlin_script}") return KotlinRuntimeDependencyPaths( java_path=java_path, java_home_path=java_home_path, kotlin_executable_path=kotlin_executable_path ) @staticmethod def _get_initialize_params(repository_absolute_path: str) -> InitializeParams: """ Returns the initialize params for the Kotlin Language Server. """ if not os.path.isabs(repository_absolute_path): repository_absolute_path = os.path.abspath(repository_absolute_path) root_uri = pathlib.Path(repository_absolute_path).as_uri() initialize_params = { "clientInfo": {"name": "Multilspy Kotlin Client", "version": "1.0.0"}, "locale": "en", "rootPath": repository_absolute_path, "rootUri": root_uri, "capabilities": { "workspace": { "applyEdit": True, "workspaceEdit": { "documentChanges": True, "resourceOperations": ["create", "rename", "delete"], "failureHandling": "textOnlyTransactional", "normalizesLineEndings": True, "changeAnnotationSupport": {"groupsOnLabel": True}, }, "didChangeConfiguration": {"dynamicRegistration": True}, "didChangeWatchedFiles": {"dynamicRegistration": True, "relativePatternSupport": True}, "symbol": { "dynamicRegistration": True, "symbolKind": {"valueSet": list(range(1, 27))}, "tagSupport": {"valueSet": [1]}, "resolveSupport": {"properties": ["location.range"]}, }, "codeLens": {"refreshSupport": True}, "executeCommand": {"dynamicRegistration": True}, "configuration": True, "workspaceFolders": True, "semanticTokens": {"refreshSupport": True}, "fileOperations": { "dynamicRegistration": True, "didCreate": True, "didRename": True, "didDelete": True, "willCreate": True, "willRename": True, "willDelete": True, }, "inlineValue": {"refreshSupport": True}, "inlayHint": {"refreshSupport": True}, "diagnostics": {"refreshSupport": True}, }, "textDocument": { "publishDiagnostics": { "relatedInformation": True, "versionSupport": False, "tagSupport": {"valueSet": [1, 2]}, "codeDescriptionSupport": True, "dataSupport": True, }, "synchronization": {"dynamicRegistration": True, "willSave": True, "willSaveWaitUntil": True, "didSave": True}, "completion": { "dynamicRegistration": True, "contextSupport": True, "completionItem": { "snippetSupport": False, "commitCharactersSupport": True, "documentationFormat": ["markdown", "plaintext"], "deprecatedSupport": True, "preselectSupport": True, "tagSupport": {"valueSet": [1]}, "insertReplaceSupport": False, "resolveSupport": {"properties": ["documentation", "detail", "additionalTextEdits"]}, "insertTextModeSupport": {"valueSet": [1, 2]}, "labelDetailsSupport": True, }, "insertTextMode": 2, "completionItemKind": { "valueSet": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25] }, "completionList": {"itemDefaults": ["commitCharacters", "editRange", "insertTextFormat", "insertTextMode"]}, }, "hover": {"dynamicRegistration": True, "contentFormat": ["markdown", "plaintext"]}, "signatureHelp": { "dynamicRegistration": True, "signatureInformation": { "documentationFormat": ["markdown", "plaintext"], "parameterInformation": {"labelOffsetSupport": True}, "activeParameterSupport": True, }, "contextSupport": True, }, "definition": {"dynamicRegistration": True, "linkSupport": True}, "references": {"dynamicRegistration": True}, "documentHighlight": {"dynamicRegistration": True}, "documentSymbol": { "dynamicRegistration": True, "symbolKind": {"valueSet": list(range(1, 27))}, "hierarchicalDocumentSymbolSupport": True, "tagSupport": {"valueSet": [1]}, "labelSupport": True, }, "codeAction": { "dynamicRegistration": True, "isPreferredSupport": True, "disabledSupport": True, "dataSupport": True, "resolveSupport": {"properties": ["edit"]}, "codeActionLiteralSupport": { "codeActionKind": { "valueSet": [ "", "quickfix", "refactor", "refactor.extract", "refactor.inline", "refactor.rewrite", "source", "source.organizeImports", ] } }, "honorsChangeAnnotations": False, }, "codeLens": {"dynamicRegistration": True}, "formatting": {"dynamicRegistration": True}, "rangeFormatting": {"dynamicRegistration": True}, "onTypeFormatting": {"dynamicRegistration": True}, "rename": { "dynamicRegistration": True, "prepareSupport": True, "prepareSupportDefaultBehavior": 1, "honorsChangeAnnotations": True, }, "documentLink": {"dynamicRegistration": True, "tooltipSupport": True}, "typeDefinition": {"dynamicRegistration": True, "linkSupport": True}, "implementation": {"dynamicRegistration": True, "linkSupport": True}, "colorProvider": {"dynamicRegistration": True}, "foldingRange": { "dynamicRegistration": True, "rangeLimit": 5000, "lineFoldingOnly": True, "foldingRangeKind": {"valueSet": ["comment", "imports", "region"]}, "foldingRange": {"collapsedText": False}, }, "declaration": {"dynamicRegistration": True, "linkSupport": True}, "selectionRange": {"dynamicRegistration": True}, "callHierarchy": {"dynamicRegistration": True}, "semanticTokens": { "dynamicRegistration": True, "tokenTypes": [ "namespace", "type", "class", "enum", "interface", "struct", "typeParameter", "parameter", "variable", "property", "enumMember", "event", "function", "method", "macro", "keyword", "modifier", "comment", "string", "number", "regexp", "operator", "decorator", ], "tokenModifiers": [ "declaration", "definition", "readonly", "static", "deprecated", "abstract", "async", "modification", "documentation", "defaultLibrary", ], "formats": ["relative"], "requests": {"range": True, "full": {"delta": True}}, "multilineTokenSupport": False, "overlappingTokenSupport": False, "serverCancelSupport": True, "augmentsSyntaxTokens": True, }, "linkedEditingRange": {"dynamicRegistration": True}, "typeHierarchy": {"dynamicRegistration": True}, "inlineValue": {"dynamicRegistration": True}, "inlayHint": { "dynamicRegistration": True, "resolveSupport": {"properties": ["tooltip", "textEdits", "label.tooltip", "label.location", "label.command"]}, }, "diagnostic": {"dynamicRegistration": True, "relatedDocumentSupport": False}, }, "window": { "showMessage": {"messageActionItem": {"additionalPropertiesSupport": True}}, "showDocument": {"support": True}, "workDoneProgress": True, }, "general": { "staleRequestSupport": { "cancel": True, "retryOnContentModified": [ "textDocument/semanticTokens/full", "textDocument/semanticTokens/range", "textDocument/semanticTokens/full/delta", ], }, "regularExpressions": {"engine": "ECMAScript", "version": "ES2020"}, "markdown": {"parser": "marked", "version": "1.1.0"}, "positionEncodings": ["utf-16"], }, "notebookDocument": {"synchronization": {"dynamicRegistration": True, "executionSummarySupport": True}}, }, "initializationOptions": { "workspaceFolders": [root_uri], "storagePath": None, "codegen": {"enabled": False}, "compiler": {"jvm": {"target": "default"}}, "completion": {"snippets": {"enabled": True}}, "diagnostics": {"enabled": True, "level": 4, "debounceTime": 250}, "scripts": {"enabled": True, "buildScriptsEnabled": True}, "indexing": {"enabled": True}, "externalSources": {"useKlsScheme": False, "autoConvertToKotlin": False}, "inlayHints": {"typeHints": False, "parameterHints": False, "chainedHints": False}, "formatting": { "formatter": "ktfmt", "ktfmt": { "style": "google", "indent": 4, "maxWidth": 100, "continuationIndent": 8, "removeUnusedImports": True, }, }, }, "trace": "verbose", "processId": os.getpid(), "workspaceFolders": [ { "uri": root_uri, "name": os.path.basename(repository_absolute_path), } ], } return initialize_params def _start_server(self): """ Starts the Kotlin Language Server """ def execute_client_command_handler(params): return [] def do_nothing(params): return def window_log_message(msg): self.logger.log(f"LSP: window/logMessage: {msg}", logging.INFO) self.server.on_request("client/registerCapability", do_nothing) self.server.on_notification("language/status", do_nothing) self.server.on_notification("window/logMessage", window_log_message) self.server.on_request("workspace/executeClientCommand", execute_client_command_handler) self.server.on_notification("$/progress", do_nothing) self.server.on_notification("textDocument/publishDiagnostics", do_nothing) self.server.on_notification("language/actionableNotification", do_nothing) self.logger.log("Starting Kotlin server process", logging.INFO) self.server.start() initialize_params = self._get_initialize_params(self.repository_root_path) self.logger.log( "Sending initialize request from LSP client to LSP server and awaiting response", logging.INFO, ) init_response = self.server.send.initialize(initialize_params) capabilities = init_response["capabilities"] assert "textDocumentSync" in capabilities, "Server must support textDocumentSync" assert "hoverProvider" in capabilities, "Server must support hover" assert "completionProvider" in capabilities, "Server must support code completion" assert "signatureHelpProvider" in capabilities, "Server must support signature help" assert "definitionProvider" in capabilities, "Server must support go to definition" assert "referencesProvider" in capabilities, "Server must support find references" assert "documentSymbolProvider" in capabilities, "Server must support document symbols" assert "workspaceSymbolProvider" in capabilities, "Server must support workspace symbols" assert "semanticTokensProvider" in capabilities, "Server must support semantic tokens" self.server.notify.initialized({}) self.completions_available.set() ``` -------------------------------------------------------------------------------- /src/serena/resources/dashboard/dashboard.js: -------------------------------------------------------------------------------- ```javascript class LogMessage { constructor(message, toolNames) { message = this.escapeHtml(message); const logLevel = this.determineLogLevel(message); const highlightedMessage = this.highlightToolNames(message, toolNames); this.$elem = $('<div>').addClass('log-' + logLevel).html(highlightedMessage + '\n'); } determineLogLevel(message) { if (message.startsWith('DEBUG')) { return 'debug'; } else if (message.startsWith('INFO')) { return 'info'; } else if (message.startsWith('WARNING')) { return 'warning'; } else if (message.startsWith('ERROR')) { return 'error'; } else { return 'default'; } } highlightToolNames(message, toolNames) { let highlightedMessage = message; toolNames.forEach(function(toolName) { const regex = new RegExp('\\b' + toolName + '\\b', 'gi'); highlightedMessage = highlightedMessage.replace(regex, '<span class="tool-name">' + toolName + '</span>'); }); return highlightedMessage; } escapeHtml (convertString) { if (typeof convertString !== 'string') return convertString; const patterns = { '<' : '<', '>' : '>', '&' : '&', '"' : '"', '\'' : ''', '`' : '`' }; return convertString.replace(/[<>&"'`]/g, match => patterns[match]); }; } class Dashboard { constructor() { let self = this; this.toolNames = []; this.currentMaxIdx = -1; this.pollInterval = null; this.failureCount = 0; this.$logContainer = $('#log-container'); this.$errorContainer = $('#error-container'); this.$loadButton = $('#load-logs'); this.$shutdownButton = $('#shutdown'); this.$toggleStats = $('#toggle-stats'); this.$statsSection = $('#stats-section'); this.$refreshStats = $('#refresh-stats'); this.$clearStats = $('#clear-stats'); this.$themeToggle = $('#theme-toggle'); this.$themeIcon = $('#theme-icon'); this.$themeText = $('#theme-text'); this.countChart = null; this.tokensChart = null; this.inputChart = null; this.outputChart = null; // register event handlers this.$loadButton.click(this.loadLogs.bind(this)); this.$shutdownButton.click(this.shutdown.bind(this)); this.$toggleStats.click(this.toggleStats.bind(this)); this.$refreshStats.click(this.loadStats.bind(this)); this.$clearStats.click(this.clearStats.bind(this)); this.$themeToggle.click(this.toggleTheme.bind(this)); // initialize theme this.initializeTheme(); // initialize the application this.loadToolNames().then(function() { // Load logs on page load after tool names are loaded self.loadLogs(); }); } displayLogMessage(message) { $('#log-container').append(new LogMessage(message, this.toolNames).$elem); } loadToolNames() { let self = this; return $.ajax({ url: '/get_tool_names', type: 'GET', success: function(response) { self.toolNames = response.tool_names || []; console.log('Loaded tool names:', self.toolNames); }, error: function(xhr, status, error) { console.error('Error loading tool names:', error); } }); } updateTitle(activeProject) { document.title = activeProject ? `${activeProject} – Serena Dashboard` : 'Serena Dashboard'; } loadLogs() { console.log("Loading logs"); let self = this; // Disable button and show loading state self.$loadButton.prop('disabled', true).text('Loading...'); self.$errorContainer.empty(); // Make API call $.ajax({ url: '/get_log_messages', type: 'POST', contentType: 'application/json', data: JSON.stringify({ start_idx: 0 }), success: function(response) { // Clear existing logs self.$logContainer.empty(); // Update max_idx self.currentMaxIdx = response.max_idx || -1; // Display each log message if (response.messages && response.messages.length > 0) { response.messages.forEach(function(message) { self.displayLogMessage(message); }); // Auto-scroll to bottom const logContainer = $('#log-container')[0]; logContainer.scrollTop = logContainer.scrollHeight; } else { $('#log-container').html('<div class="loading">No log messages found.</div>'); } self.updateTitle(response.active_project); // Start periodic polling for new logs self.startPeriodicPolling(); }, error: function(xhr, status, error) { console.error('Error loading logs:', error); self.$errorContainer.html('<div class="error-message">Error loading logs: ' + (xhr.responseJSON ? xhr.responseJSON.detail : error) + '</div>'); }, complete: function() { // Re-enable button self.$loadButton.prop('disabled', false).text('Reload Log'); } }); } pollForNewLogs() { let self = this; console.log("Polling logs", this.currentMaxIdx); $.ajax({ url: '/get_log_messages', type: 'POST', contentType: 'application/json', data: JSON.stringify({ start_idx: self.currentMaxIdx + 1 }), success: function(response) { self.failureCount = 0; // Only append new messages if we have any if (response.messages && response.messages.length > 0) { let wasAtBottom = false; const logContainer = $('#log-container')[0]; // Check if user was at the bottom before adding new logs if (logContainer.scrollHeight > 0) { wasAtBottom = (logContainer.scrollTop + logContainer.clientHeight) >= (logContainer.scrollHeight - 10); } // Append new messages response.messages.forEach(function(message) { self.displayLogMessage(message); }); // Update max_idx self.currentMaxIdx = response.max_idx || self.currentMaxIdx; // Auto-scroll to bottom if user was already at bottom if (wasAtBottom) { logContainer.scrollTop = logContainer.scrollHeight; } } else { // Update max_idx even if no new messages self.currentMaxIdx = response.max_idx || self.currentMaxIdx; } // Update window title with active project self.updateTitle(response.active_project); }, error: function(xhr, status, error) { console.error('Error polling for new logs:', error); self.failureCount++; if (self.failureCount >= 3) { console.log('Server appears to be down, closing tab'); window.close(); } } }); } startPeriodicPolling() { // Clear any existing interval if (this.pollInterval) { clearInterval(this.pollInterval); } // Start polling every second (1000ms) this.pollInterval = setInterval(this.pollForNewLogs.bind(this), 1000); } toggleStats() { if (this.$statsSection.is(':visible')) { this.$statsSection.hide(); this.$toggleStats.text('Show Stats'); } else { this.$statsSection.show(); this.$toggleStats.text('Hide Stats'); this.loadStats(); } } loadStats() { let self = this; $.when( $.ajax({ url: '/get_tool_stats', type: 'GET' }), $.ajax({ url: '/get_token_count_estimator_name', type: 'GET' }) ).done(function(statsResp, estimatorResp) { const stats = statsResp[0].stats; const tokenCountEstimatorName = estimatorResp[0].token_count_estimator_name; self.displayStats(stats, tokenCountEstimatorName); }).fail(function() { console.error('Error loading stats or estimator name'); }); } clearStats() { let self = this; $.ajax({ url: '/clear_tool_stats', type: 'POST', success: function() { self.loadStats(); }, error: function(xhr, status, error) { console.error('Error clearing stats:', error); } }); } displayStats(stats, tokenCountEstimatorName) { const names = Object.keys(stats); // If no stats collected if (names.length === 0) { // hide summary, charts, estimator name $('#stats-summary').hide(); $('#estimator-name').hide(); $('.charts-container').hide(); // show no-stats message $('#no-stats-message').show(); return; } else { // Ensure everything is visible $('#estimator-name').show(); $('#stats-summary').show(); $('.charts-container').show(); $('#no-stats-message').hide(); } $('#estimator-name').html(`<strong>Token count estimator:</strong> ${tokenCountEstimatorName}`); const counts = names.map(n => stats[n].num_times_called); const inputTokens = names.map(n => stats[n].input_tokens); const outputTokens = names.map(n => stats[n].output_tokens); const totalTokens = names.map(n => stats[n].input_tokens + stats[n].output_tokens); // Calculate totals for summary table const totalCalls = counts.reduce((sum, count) => sum + count, 0); const totalInputTokens = inputTokens.reduce((sum, tokens) => sum + tokens, 0); const totalOutputTokens = outputTokens.reduce((sum, tokens) => sum + tokens, 0); // Generate consistent colors for tools const colors = this.generateColors(names.length); const countCtx = document.getElementById('count-chart'); const tokensCtx = document.getElementById('tokens-chart'); const inputCtx = document.getElementById('input-chart'); const outputCtx = document.getElementById('output-chart'); if (this.countChart) this.countChart.destroy(); if (this.tokensChart) this.tokensChart.destroy(); if (this.inputChart) this.inputChart.destroy(); if (this.outputChart) this.outputChart.destroy(); // Update summary table this.updateSummaryTable(totalCalls, totalInputTokens, totalOutputTokens); // Register datalabels plugin Chart.register(ChartDataLabels); // Get theme-aware colors const isDark = document.documentElement.getAttribute('data-theme') === 'dark'; const textColor = isDark ? '#ffffff' : '#000000'; const gridColor = isDark ? '#444' : '#ddd'; // Tool calls pie chart this.countChart = new Chart(countCtx, { type: 'pie', data: { labels: names, datasets: [{ data: counts, backgroundColor: colors }] }, options: { plugins: { legend: { display: true, labels: { color: textColor } }, datalabels: { display: true, color: 'white', font: { weight: 'bold' }, formatter: (value) => value } } } }); // Input tokens pie chart this.inputChart = new Chart(inputCtx, { type: 'pie', data: { labels: names, datasets: [{ data: inputTokens, backgroundColor: colors }] }, options: { plugins: { legend: { display: true, labels: { color: textColor } }, datalabels: { display: true, color: 'white', font: { weight: 'bold' }, formatter: (value) => value } } } }); // Output tokens pie chart this.outputChart = new Chart(outputCtx, { type: 'pie', data: { labels: names, datasets: [{ data: outputTokens, backgroundColor: colors }] }, options: { plugins: { legend: { display: true, labels: { color: textColor } }, datalabels: { display: true, color: 'white', font: { weight: 'bold' }, formatter: (value) => value } } } }); // Combined input/output tokens bar chart this.tokensChart = new Chart(tokensCtx, { type: 'bar', data: { labels: names, datasets: [ { label: 'Input Tokens', data: inputTokens, backgroundColor: colors.map(color => color + '80'), // Semi-transparent borderColor: colors, borderWidth: 2, borderSkipped: false, yAxisID: 'y' }, { label: 'Output Tokens', data: outputTokens, backgroundColor: colors, yAxisID: 'y1' } ] }, options: { responsive: true, plugins: { legend: { labels: { color: textColor } } }, scales: { x: { ticks: { color: textColor }, grid: { color: gridColor } }, y: { type: 'linear', display: true, position: 'left', beginAtZero: true, title: { display: true, text: 'Input Tokens', color: textColor }, ticks: { color: textColor }, grid: { color: gridColor } }, y1: { type: 'linear', display: true, position: 'right', beginAtZero: true, title: { display: true, text: 'Output Tokens', color: textColor }, ticks: { color: textColor }, grid: { drawOnChartArea: false, color: gridColor } } } } }); } generateColors(count) { const colors = [ '#FF6384', '#36A2EB', '#FFCE56', '#4BC0C0', '#9966FF', '#FF9F40', '#FF6384', '#C9CBCF', '#4BC0C0', '#FF6384' ]; return Array.from({length: count}, (_, i) => colors[i % colors.length]); } updateSummaryTable(totalCalls, totalInputTokens, totalOutputTokens) { const tableHtml = ` <table class="stats-summary"> <tr><th>Metric</th><th>Total</th></tr> <tr><td>Tool Calls</td><td>${totalCalls}</td></tr> <tr><td>Input Tokens</td><td>${totalInputTokens}</td></tr> <tr><td>Output Tokens</td><td>${totalOutputTokens}</td></tr> <tr><td>Total Tokens</td><td>${totalInputTokens + totalOutputTokens}</td></tr> </table> `; $('#stats-summary').html(tableHtml); } initializeTheme() { // Check if user has manually set a theme preference const savedTheme = localStorage.getItem('serena-theme'); if (savedTheme) { // User has manually set a preference, use it this.setTheme(savedTheme); } else { // No manual preference, detect system color scheme this.detectSystemTheme(); } // Listen for system theme changes this.setupSystemThemeListener(); } detectSystemTheme() { // Check if system prefers dark mode const prefersDark = window.matchMedia('(prefers-color-scheme: dark)').matches; const theme = prefersDark ? 'dark' : 'light'; this.setTheme(theme); } setupSystemThemeListener() { // Listen for changes in system color scheme const mediaQuery = window.matchMedia('(prefers-color-scheme: dark)'); const handleSystemThemeChange = (e) => { // Only auto-switch if user hasn't manually set a preference const savedTheme = localStorage.getItem('serena-theme'); if (!savedTheme) { const newTheme = e.matches ? 'dark' : 'light'; this.setTheme(newTheme); } }; // Add listener for system theme changes if (mediaQuery.addEventListener) { mediaQuery.addEventListener('change', handleSystemThemeChange); } else { // Fallback for older browsers mediaQuery.addListener(handleSystemThemeChange); } } toggleTheme() { const currentTheme = document.documentElement.getAttribute('data-theme') || 'light'; const newTheme = currentTheme === 'light' ? 'dark' : 'light'; // When user manually toggles, save their preference localStorage.setItem('serena-theme', newTheme); this.setTheme(newTheme); } setTheme(theme) { // Set the theme on the document element document.documentElement.setAttribute('data-theme', theme); // Update the toggle button if (theme === 'dark') { this.$themeIcon.text('☀️'); this.$themeText.text('Light'); } else { this.$themeIcon.text('🌙'); this.$themeText.text('Dark'); } // Update the logo based on theme this.updateLogo(theme); // Save to localStorage localStorage.setItem('serena-theme', theme); // Update charts if they exist this.updateChartsTheme(); } updateLogo(theme) { const logoElement = document.getElementById('serena-logo'); if (logoElement) { if (theme === 'dark') { logoElement.src = 'serena-logs-dark-mode.png'; } else { logoElement.src = 'serena-logs.png'; } } } updateChartsTheme() { const isDark = document.documentElement.getAttribute('data-theme') === 'dark'; const textColor = isDark ? '#ffffff' : '#000000'; const gridColor = isDark ? '#444' : '#ddd'; // Update existing charts if (this.countChart) { this.countChart.options.scales.x.ticks.color = textColor; this.countChart.options.scales.y.ticks.color = textColor; this.countChart.options.scales.x.grid.color = gridColor; this.countChart.options.scales.y.grid.color = gridColor; this.countChart.update(); } if (this.inputChart) { this.inputChart.options.scales.x.ticks.color = textColor; this.inputChart.options.scales.y.ticks.color = textColor; this.inputChart.options.scales.x.grid.color = gridColor; this.inputChart.options.scales.y.grid.color = gridColor; this.inputChart.update(); } if (this.outputChart) { this.outputChart.options.scales.x.ticks.color = textColor; this.outputChart.options.scales.y.ticks.color = textColor; this.outputChart.options.scales.x.grid.color = gridColor; this.outputChart.options.scales.y.grid.color = gridColor; this.outputChart.update(); } if (this.tokensChart) { this.tokensChart.options.scales.x.ticks.color = textColor; this.tokensChart.options.scales.y.ticks.color = textColor; this.tokensChart.options.scales.y1.ticks.color = textColor; this.tokensChart.options.scales.x.grid.color = gridColor; this.tokensChart.options.scales.y.grid.color = gridColor; this.tokensChart.options.scales.y1.grid.color = gridColor; this.tokensChart.update(); } } shutdown() { const self = this; const _shutdown = function () { console.log("Triggering shutdown"); $.ajax({ url: '/shutdown', type: "PUT", contentType: 'application/json', }); self.$errorContainer.html('<div class="error-message">Shutting down ...</div>') setTimeout(function() { window.close(); }, 2000); } // ask for confirmation using a dialog if (confirm("This will fully terminate the Serena server.")) { _shutdown(); } else { console.log("Shutdown cancelled"); } } } ``` -------------------------------------------------------------------------------- /src/solidlsp/ls_handler.py: -------------------------------------------------------------------------------- ```python import asyncio import json import logging import os import platform import subprocess import threading import time from collections.abc import Callable from dataclasses import dataclass from queue import Empty, Queue from typing import Any import psutil from sensai.util.string import ToStringMixin from solidlsp.ls_exceptions import SolidLSPException from solidlsp.ls_request import LanguageServerRequest from solidlsp.lsp_protocol_handler.lsp_requests import LspNotification from solidlsp.lsp_protocol_handler.lsp_types import ErrorCodes from solidlsp.lsp_protocol_handler.server import ( ENCODING, LSPError, MessageType, PayloadLike, ProcessLaunchInfo, StringDict, content_length, create_message, make_error_response, make_notification, make_request, make_response, ) from solidlsp.util.subprocess_util import subprocess_kwargs log = logging.getLogger(__name__) class LanguageServerTerminatedException(Exception): """ Exception raised when the language server process has terminated unexpectedly. """ def __init__(self, message: str, cause: Exception | None = None) -> None: super().__init__(message) self.message = message self.cause = cause def __str__(self) -> str: return f"LanguageServerTerminatedException: {self.message}" + (f"; Cause: {self.cause}" if self.cause else "") class Request(ToStringMixin): @dataclass class Result: payload: PayloadLike | None = None error: Exception | None = None def is_error(self) -> bool: return self.error is not None def __init__(self, request_id: int, method: str) -> None: self._request_id = request_id self._method = method self._status = "pending" self._result_queue = Queue() def _tostring_includes(self) -> list[str]: return ["_request_id", "_status", "_method"] def on_result(self, params: PayloadLike) -> None: self._status = "completed" self._result_queue.put(Request.Result(payload=params)) def on_error(self, err: Exception) -> None: """ :param err: the error that occurred while processing the request (typically an LSPError for errors returned by the LS or LanguageServerTerminatedException if the error is due to the language server process terminating unexpectedly). """ self._status = "error" self._result_queue.put(Request.Result(error=err)) def get_result(self, timeout: float | None = None) -> Result: try: return self._result_queue.get(timeout=timeout) except Empty as e: if timeout is not None: raise TimeoutError(f"Request timed out ({timeout=})") from e raise e class SolidLanguageServerHandler: """ This class provides the implementation of Python client for the Language Server Protocol. A class that launches the language server and communicates with it using the Language Server Protocol (LSP). It provides methods for sending requests, responses, and notifications to the server and for registering handlers for requests and notifications from the server. Uses JSON-RPC 2.0 for communication with the server over stdin/stdout. Attributes: send: A LspRequest object that can be used to send requests to the server and await for the responses. notify: A LspNotification object that can be used to send notifications to the server. cmd: A string that represents the command to launch the language server process. process: A subprocess.Popen object that represents the language server process. request_id: An integer that represents the next available request id for the client. _pending_requests: A dictionary that maps request ids to Request objects that store the results or errors of the requests. on_request_handlers: A dictionary that maps method names to callback functions that handle requests from the server. on_notification_handlers: A dictionary that maps method names to callback functions that handle notifications from the server. logger: An optional function that takes two strings (source and destination) and a payload dictionary, and logs the communication between the client and the server. tasks: A dictionary that maps task ids to asyncio.Task objects that represent the asynchronous tasks created by the handler. task_counter: An integer that represents the next available task id for the handler. loop: An asyncio.AbstractEventLoop object that represents the event loop used by the handler. start_independent_lsp_process: An optional boolean flag that indicates whether to start the language server process in an independent process group. Default is `True`. Setting it to `False` means that the language server process will be in the same process group as the the current process, and any SIGINT and SIGTERM signals will be sent to both processes. """ def __init__( self, process_launch_info: ProcessLaunchInfo, logger: Callable[[str, str, StringDict | str], None] | None = None, start_independent_lsp_process=True, request_timeout: float | None = None, ) -> None: self.send = LanguageServerRequest(self) self.notify = LspNotification(self.send_notification) self.process_launch_info = process_launch_info self.process: subprocess.Popen | None = None self._is_shutting_down = False self.request_id = 1 self._pending_requests: dict[Any, Request] = {} self.on_request_handlers = {} self.on_notification_handlers = {} self.logger = logger self.tasks = {} self.task_counter = 0 self.loop = None self.start_independent_lsp_process = start_independent_lsp_process self._request_timeout = request_timeout # Add thread locks for shared resources to prevent race conditions self._stdin_lock = threading.Lock() self._request_id_lock = threading.Lock() self._response_handlers_lock = threading.Lock() self._tasks_lock = threading.Lock() def set_request_timeout(self, timeout: float | None) -> None: """ :param timeout: the timeout, in seconds, for all requests sent to the language server. """ self._request_timeout = timeout def is_running(self) -> bool: """ Checks if the language server process is currently running. """ return self.process is not None and self.process.returncode is None def start(self) -> None: """ Starts the language server process and creates a task to continuously read from its stdout to handle communications from the server to the client """ child_proc_env = os.environ.copy() child_proc_env.update(self.process_launch_info.env) cmd = self.process_launch_info.cmd is_windows = platform.system() == "Windows" if not isinstance(cmd, str) and not is_windows: # Since we are using the shell, we need to convert the command list to a single string # on Linux/macOS cmd = " ".join(cmd) log.info("Starting language server process via command: %s", self.process_launch_info.cmd) kwargs = subprocess_kwargs() kwargs["start_new_session"] = self.start_independent_lsp_process self.process = subprocess.Popen( cmd, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE, env=child_proc_env, cwd=self.process_launch_info.cwd, shell=True, **kwargs, ) # Check if process terminated immediately if self.process.returncode is not None: log.error("Language server has already terminated/could not be started") # Process has already terminated stderr_data = self.process.stderr.read() error_message = stderr_data.decode("utf-8", errors="replace") raise RuntimeError(f"Process terminated immediately with code {self.process.returncode}. Error: {error_message}") # start threads to read stdout and stderr of the process threading.Thread( target=self._read_ls_process_stdout, name="LSP-stdout-reader", daemon=True, ).start() threading.Thread( target=self._read_ls_process_stderr, name="LSP-stderr-reader", daemon=True, ).start() def stop(self) -> None: """ Sends the terminate signal to the language server process and waits for it to exit, with a timeout, killing it if necessary """ process = self.process self.process = None if process: self._cleanup_process(process) def _cleanup_process(self, process): """Clean up a process: close stdin, terminate/kill process, close stdout/stderr.""" # Close stdin first to prevent deadlocks # See: https://bugs.python.org/issue35539 self._safely_close_pipe(process.stdin) # Terminate/kill the process if it's still running if process.returncode is None: self._terminate_or_kill_process(process) # Close stdout and stderr pipes after process has exited # This is essential to prevent "I/O operation on closed pipe" errors and # "Event loop is closed" errors during garbage collection # See: https://bugs.python.org/issue41320 and https://github.com/python/cpython/issues/88050 self._safely_close_pipe(process.stdout) self._safely_close_pipe(process.stderr) def _safely_close_pipe(self, pipe): """Safely close a pipe, ignoring any exceptions.""" if pipe: try: pipe.close() except Exception: pass def _terminate_or_kill_process(self, process): """Try to terminate the process gracefully, then forcefully if necessary.""" # First try to terminate the process tree gracefully self._signal_process_tree(process, terminate=True) def _signal_process_tree(self, process, terminate=True): """Send signal (terminate or kill) to the process and all its children.""" signal_method = "terminate" if terminate else "kill" # Try to get the parent process parent = None try: parent = psutil.Process(process.pid) except (psutil.NoSuchProcess, psutil.AccessDenied, Exception): pass # If we have the parent process and it's running, signal the entire tree if parent and parent.is_running(): # Signal children first for child in parent.children(recursive=True): try: getattr(child, signal_method)() except (psutil.NoSuchProcess, psutil.AccessDenied, Exception): pass # Then signal the parent try: getattr(parent, signal_method)() except (psutil.NoSuchProcess, psutil.AccessDenied, Exception): pass else: # Fall back to direct process signaling try: getattr(process, signal_method)() except Exception: pass def shutdown(self) -> None: """ Perform the shutdown sequence for the client, including sending the shutdown request to the server and notifying it of exit """ self._is_shutting_down = True self._log("Sending shutdown request to server") self.send.shutdown() self._log("Received shutdown response from server") self._log("Sending exit notification to server") self.notify.exit() self._log("Sent exit notification to server") def _log(self, message: str | StringDict) -> None: """ Create a log message """ if self.logger is not None: self.logger("client", "logger", message) @staticmethod def _read_bytes_from_process(process, stream, num_bytes): """Read exactly num_bytes from process stdout""" data = b"" while len(data) < num_bytes: chunk = stream.read(num_bytes - len(data)) if not chunk: if process.poll() is not None: raise LanguageServerTerminatedException( f"Process terminated while trying to read response (read {num_bytes} of {len(data)} bytes before termination)" ) # Process still running but no data available yet, retry after a short delay time.sleep(0.01) continue data += chunk return data def _read_ls_process_stdout(self) -> None: """ Continuously read from the language server process stdout and handle the messages invoking the registered response and notification handlers """ exception: Exception | None = None try: while self.process and self.process.stdout: if self.process.poll() is not None: # process has terminated break line = self.process.stdout.readline() if not line: continue try: num_bytes = content_length(line) except ValueError: continue if num_bytes is None: continue while line and line.strip(): line = self.process.stdout.readline() if not line: continue body = self._read_bytes_from_process(self.process, self.process.stdout, num_bytes) self._handle_body(body) except LanguageServerTerminatedException as e: exception = e except (BrokenPipeError, ConnectionResetError) as e: exception = LanguageServerTerminatedException("Language server process terminated while reading stdout", cause=e) except Exception as e: exception = LanguageServerTerminatedException("Unexpected error while reading stdout from language server process", cause=e) log.info("Language server stdout reader thread has terminated") if not self._is_shutting_down: if exception is None: exception = LanguageServerTerminatedException("Language server stdout read process terminated unexpectedly") log.error(str(exception)) self._cancel_pending_requests(exception) def _read_ls_process_stderr(self) -> None: """ Continuously read from the language server process stderr and log the messages """ try: while self.process and self.process.stderr: if self.process.poll() is not None: # process has terminated break line = self.process.stderr.readline() if not line: continue line = line.decode(ENCODING, errors="replace") line_lower = line.lower() if "error" in line_lower or "exception" in line_lower or line.startswith("E["): level = logging.ERROR else: level = logging.INFO log.log(level, line) except Exception as e: log.error("Error while reading stderr from language server process: %s", e, exc_info=e) if not self._is_shutting_down: log.error("Language server stderr reader thread terminated unexpectedly") else: log.info("Language server stderr reader thread has terminated") def _handle_body(self, body: bytes) -> None: """ Parse the body text received from the language server process and invoke the appropriate handler """ try: self._receive_payload(json.loads(body)) except OSError as ex: self._log(f"malformed {ENCODING}: {ex}") except UnicodeDecodeError as ex: self._log(f"malformed {ENCODING}: {ex}") except json.JSONDecodeError as ex: self._log(f"malformed JSON: {ex}") def _receive_payload(self, payload: StringDict) -> None: """ Determine if the payload received from server is for a request, response, or notification and invoke the appropriate handler """ if self.logger: self.logger("server", "client", payload) try: if "method" in payload: if "id" in payload: self._request_handler(payload) else: self._notification_handler(payload) elif "id" in payload: self._response_handler(payload) else: self._log(f"Unknown payload type: {payload}") except Exception as err: self._log(f"Error handling server payload: {err}") def send_notification(self, method: str, params: dict | None = None) -> None: """ Send notification pertaining to the given method to the server with the given parameters """ self._send_payload(make_notification(method, params)) def send_response(self, request_id: Any, params: PayloadLike) -> None: """ Send response to the given request id to the server with the given parameters """ self._send_payload(make_response(request_id, params)) def send_error_response(self, request_id: Any, err: LSPError) -> None: """ Send error response to the given request id to the server with the given error """ # Use lock to prevent race conditions on tasks and task_counter self._send_payload(make_error_response(request_id, err)) def _cancel_pending_requests(self, exception: Exception) -> None: """ Cancel all pending requests by setting their results to an error """ with self._response_handlers_lock: log.info("Cancelling %d pending language server requests", len(self._pending_requests)) for request in self._pending_requests.values(): log.info("Cancelling %s", request) request.on_error(exception) self._pending_requests.clear() def send_request(self, method: str, params: dict | None = None) -> PayloadLike: """ Send request to the server, register the request id, and wait for the response """ with self._request_id_lock: request_id = self.request_id self.request_id += 1 request = Request(request_id=request_id, method=method) log.debug("Starting: %s", request) with self._response_handlers_lock: self._pending_requests[request_id] = request self._send_payload(make_request(method, request_id, params)) self._log(f"Waiting for response to request {method} with params:\n{params}") result = request.get_result(timeout=self._request_timeout) log.debug("Completed: %s", request) self._log("Processing result") if result.is_error(): raise SolidLSPException(f"Error processing request {method} with params:\n{params}", cause=result.error) from result.error self._log(f"Returning non-error result, which is:\n{result.payload}") return result.payload def _send_payload(self, payload: StringDict) -> None: """ Send the payload to the server by writing to its stdin asynchronously. """ if not self.process or not self.process.stdin: return self._log(payload) msg = create_message(payload) # Use lock to prevent concurrent writes to stdin that cause buffer corruption with self._stdin_lock: try: self.process.stdin.writelines(msg) self.process.stdin.flush() except (BrokenPipeError, ConnectionResetError, OSError) as e: # Log the error but don't raise to prevent cascading failures if self.logger: self.logger("client", "logger", f"Failed to write to stdin: {e}") return def on_request(self, method: str, cb) -> None: """ Register the callback function to handle requests from the server to the client for the given method """ self.on_request_handlers[method] = cb def on_notification(self, method: str, cb) -> None: """ Register the callback function to handle notifications from the server to the client for the given method """ self.on_notification_handlers[method] = cb def _response_handler(self, response: StringDict) -> None: """ Handle the response received from the server for a request, using the id to determine the request """ response_id = response["id"] with self._response_handlers_lock: request = self._pending_requests.pop(response_id, None) if request is None and isinstance(response_id, str) and response_id.isdigit(): request = self._pending_requests.pop(int(response_id), None) if request is None: # need to convert response_id to the right type log.debug("Request interrupted by user or not found for ID %s", response_id) return if "result" in response and "error" not in response: request.on_result(response["result"]) elif "result" not in response and "error" in response: request.on_error(LSPError.from_lsp(response["error"])) else: request.on_error(LSPError(ErrorCodes.InvalidRequest, "")) def _request_handler(self, response: StringDict) -> None: """ Handle the request received from the server: call the appropriate callback function and return the result """ method = response.get("method", "") params = response.get("params") request_id = response.get("id") handler = self.on_request_handlers.get(method) if not handler: self.send_error_response( request_id, LSPError( ErrorCodes.MethodNotFound, f"method '{method}' not handled on client.", ), ) return try: self.send_response(request_id, handler(params)) except LSPError as ex: self.send_error_response(request_id, ex) except Exception as ex: self.send_error_response(request_id, LSPError(ErrorCodes.InternalError, str(ex))) def _notification_handler(self, response: StringDict) -> None: """ Handle the notification received from the server: call the appropriate callback function """ method = response.get("method", "") params = response.get("params") handler = self.on_notification_handlers.get(method) if not handler: self._log(f"unhandled {method}") return try: handler(params) except asyncio.CancelledError: return except Exception as ex: if (not self._is_shutting_down) and self.logger: self.logger( "client", "logger", str( { "type": MessageType.error, "message": str(ex), "method": method, "params": params, } ), ) ``` -------------------------------------------------------------------------------- /test/serena/util/test_file_system.py: -------------------------------------------------------------------------------- ```python import os import shutil import tempfile from pathlib import Path # Assuming the gitignore parser code is in a module named 'gitignore_parser' from serena.util.file_system import GitignoreParser, GitignoreSpec class TestGitignoreParser: """Test class for GitignoreParser functionality.""" def setup_method(self): """Set up test environment before each test method.""" # Create a temporary directory for testing self.test_dir = tempfile.mkdtemp() self.repo_path = Path(self.test_dir) # Create test repository structure self._create_repo_structure() def teardown_method(self): """Clean up test environment after each test method.""" # Remove the temporary directory shutil.rmtree(self.test_dir) def _create_repo_structure(self): """ Create a test repository structure with multiple gitignore files. Structure: repo/ ├── .gitignore ├── file1.txt ├── test.log ├── src/ │ ├── .gitignore │ ├── main.py │ ├── test.log │ ├── build/ │ │ └── output.o │ └── lib/ │ ├── .gitignore │ └── cache.tmp └── docs/ ├── .gitignore ├── api.md └── temp/ └── draft.md """ # Create directories (self.repo_path / "src").mkdir() (self.repo_path / "src" / "build").mkdir() (self.repo_path / "src" / "lib").mkdir() (self.repo_path / "docs").mkdir() (self.repo_path / "docs" / "temp").mkdir() # Create files (self.repo_path / "file1.txt").touch() (self.repo_path / "test.log").touch() (self.repo_path / "src" / "main.py").touch() (self.repo_path / "src" / "test.log").touch() (self.repo_path / "src" / "build" / "output.o").touch() (self.repo_path / "src" / "lib" / "cache.tmp").touch() (self.repo_path / "docs" / "api.md").touch() (self.repo_path / "docs" / "temp" / "draft.md").touch() # Create root .gitignore root_gitignore = self.repo_path / ".gitignore" root_gitignore.write_text( """# Root gitignore *.log /build/ """ ) # Create src/.gitignore src_gitignore = self.repo_path / "src" / ".gitignore" src_gitignore.write_text( """# Source gitignore *.o build/ !important.log """ ) # Create src/lib/.gitignore (deeply nested) src_lib_gitignore = self.repo_path / "src" / "lib" / ".gitignore" src_lib_gitignore.write_text( """# Library gitignore *.tmp *.cache """ ) # Create docs/.gitignore docs_gitignore = self.repo_path / "docs" / ".gitignore" docs_gitignore.write_text( """# Docs gitignore temp/ *.tmp """ ) def test_initialization(self): """Test GitignoreParser initialization.""" parser = GitignoreParser(str(self.repo_path)) assert parser.repo_root == str(self.repo_path.absolute()) assert len(parser.get_ignore_specs()) == 4 def test_find_gitignore_files(self): """Test finding all gitignore files in repository, including deeply nested ones.""" parser = GitignoreParser(str(self.repo_path)) # Get file paths from specs gitignore_files = [spec.file_path for spec in parser.get_ignore_specs()] # Convert to relative paths for easier testing rel_paths = [os.path.relpath(f, self.repo_path) for f in gitignore_files] rel_paths.sort() assert len(rel_paths) == 4 assert ".gitignore" in rel_paths assert os.path.join("src", ".gitignore") in rel_paths assert os.path.join("src", "lib", ".gitignore") in rel_paths # Deeply nested assert os.path.join("docs", ".gitignore") in rel_paths def test_parse_patterns_root_directory(self): """Test parsing gitignore patterns in root directory.""" # Create a simple test case with only root gitignore test_dir = self.repo_path / "test_root" test_dir.mkdir() gitignore = test_dir / ".gitignore" gitignore.write_text( """*.log build/ /temp.txt """ ) parser = GitignoreParser(str(test_dir)) specs = parser.get_ignore_specs() assert len(specs) == 1 patterns = specs[0].patterns assert "*.log" in patterns assert "build/" in patterns assert "/temp.txt" in patterns def test_parse_patterns_subdirectory(self): """Test parsing gitignore patterns in subdirectory.""" # Create a test case with subdirectory gitignore test_dir = self.repo_path / "test_sub" test_dir.mkdir() subdir = test_dir / "src" subdir.mkdir() gitignore = subdir / ".gitignore" gitignore.write_text( """*.o /build/ test.log """ ) parser = GitignoreParser(str(test_dir)) specs = parser.get_ignore_specs() assert len(specs) == 1 patterns = specs[0].patterns # Non-anchored pattern should get ** prefix assert "src/**/*.o" in patterns # Anchored pattern should not get ** prefix assert "src/build/" in patterns # Non-anchored pattern without slash assert "src/**/test.log" in patterns def test_should_ignore_root_patterns(self): """Test ignoring files based on root .gitignore.""" parser = GitignoreParser(str(self.repo_path)) # Files that should be ignored assert parser.should_ignore("test.log") assert parser.should_ignore(str(self.repo_path / "test.log")) # Files that should NOT be ignored assert not parser.should_ignore("file1.txt") assert not parser.should_ignore("src/main.py") def test_should_ignore_subdirectory_patterns(self): """Test ignoring files based on subdirectory .gitignore files.""" parser = GitignoreParser(str(self.repo_path)) # .o files in src should be ignored assert parser.should_ignore("src/build/output.o") # build/ directory in src should be ignored assert parser.should_ignore("src/build/") # temp/ directory in docs should be ignored assert parser.should_ignore("docs/temp/draft.md") # But temp/ outside docs should not be ignored by docs/.gitignore assert not parser.should_ignore("temp/file.txt") # Test deeply nested .gitignore in src/lib/ # .tmp files in src/lib should be ignored assert parser.should_ignore("src/lib/cache.tmp") # .cache files in src/lib should also be ignored assert parser.should_ignore("src/lib/data.cache") # But .tmp files outside src/lib should not be ignored by src/lib/.gitignore assert not parser.should_ignore("src/other.tmp") def test_anchored_vs_non_anchored_patterns(self): """Test the difference between anchored and non-anchored patterns.""" # Create new test structure test_dir = self.repo_path / "test_anchored" test_dir.mkdir() (test_dir / "src").mkdir() (test_dir / "src" / "subdir").mkdir() (test_dir / "src" / "subdir" / "deep").mkdir() # Create src/.gitignore with both anchored and non-anchored patterns gitignore = test_dir / "src" / ".gitignore" gitignore.write_text( """/temp.txt data.json """ ) # Create test files (test_dir / "src" / "temp.txt").touch() (test_dir / "src" / "data.json").touch() (test_dir / "src" / "subdir" / "temp.txt").touch() (test_dir / "src" / "subdir" / "data.json").touch() (test_dir / "src" / "subdir" / "deep" / "data.json").touch() parser = GitignoreParser(str(test_dir)) # Anchored pattern /temp.txt should only match in src/ assert parser.should_ignore("src/temp.txt") assert not parser.should_ignore("src/subdir/temp.txt") # Non-anchored pattern data.json should match anywhere under src/ assert parser.should_ignore("src/data.json") assert parser.should_ignore("src/subdir/data.json") assert parser.should_ignore("src/subdir/deep/data.json") def test_root_anchored_patterns(self): """Test anchored patterns in root .gitignore only match root-level files.""" # Create new test structure for root anchored patterns test_dir = self.repo_path / "test_root_anchored" test_dir.mkdir() (test_dir / "src").mkdir() (test_dir / "docs").mkdir() (test_dir / "src" / "nested").mkdir() # Create root .gitignore with anchored patterns gitignore = test_dir / ".gitignore" gitignore.write_text( """/config.json /temp.log /build *.pyc """ ) # Create test files at root level (test_dir / "config.json").touch() (test_dir / "temp.log").touch() (test_dir / "build").mkdir() (test_dir / "file.pyc").touch() # Create same-named files in subdirectories (test_dir / "src" / "config.json").touch() (test_dir / "src" / "temp.log").touch() (test_dir / "src" / "build").mkdir() (test_dir / "src" / "file.pyc").touch() (test_dir / "docs" / "config.json").touch() (test_dir / "docs" / "temp.log").touch() (test_dir / "src" / "nested" / "config.json").touch() (test_dir / "src" / "nested" / "temp.log").touch() (test_dir / "src" / "nested" / "build").mkdir() parser = GitignoreParser(str(test_dir)) # Anchored patterns should only match root-level files assert parser.should_ignore("config.json") assert not parser.should_ignore("src/config.json") assert not parser.should_ignore("docs/config.json") assert not parser.should_ignore("src/nested/config.json") assert parser.should_ignore("temp.log") assert not parser.should_ignore("src/temp.log") assert not parser.should_ignore("docs/temp.log") assert not parser.should_ignore("src/nested/temp.log") assert parser.should_ignore("build") assert not parser.should_ignore("src/build") assert not parser.should_ignore("src/nested/build") # Non-anchored patterns should match everywhere assert parser.should_ignore("file.pyc") assert parser.should_ignore("src/file.pyc") def test_mixed_anchored_and_non_anchored_root_patterns(self): """Test mix of anchored and non-anchored patterns in root .gitignore.""" test_dir = self.repo_path / "test_mixed_patterns" test_dir.mkdir() (test_dir / "app").mkdir() (test_dir / "tests").mkdir() (test_dir / "app" / "modules").mkdir() # Create root .gitignore with mixed patterns gitignore = test_dir / ".gitignore" gitignore.write_text( """/secrets.env /dist/ node_modules/ *.tmp /app/local.config debug.log """ ) # Create test files and directories (test_dir / "secrets.env").touch() (test_dir / "dist").mkdir() (test_dir / "node_modules").mkdir() (test_dir / "file.tmp").touch() (test_dir / "app" / "local.config").touch() (test_dir / "debug.log").touch() # Create same files in subdirectories (test_dir / "app" / "secrets.env").touch() (test_dir / "app" / "dist").mkdir() (test_dir / "app" / "node_modules").mkdir() (test_dir / "app" / "file.tmp").touch() (test_dir / "app" / "debug.log").touch() (test_dir / "tests" / "secrets.env").touch() (test_dir / "tests" / "node_modules").mkdir() (test_dir / "tests" / "debug.log").touch() (test_dir / "app" / "modules" / "local.config").touch() parser = GitignoreParser(str(test_dir)) # Anchored patterns should only match at root assert parser.should_ignore("secrets.env") assert not parser.should_ignore("app/secrets.env") assert not parser.should_ignore("tests/secrets.env") assert parser.should_ignore("dist") assert not parser.should_ignore("app/dist") assert parser.should_ignore("app/local.config") assert not parser.should_ignore("app/modules/local.config") # Non-anchored patterns should match everywhere assert parser.should_ignore("node_modules") assert parser.should_ignore("app/node_modules") assert parser.should_ignore("tests/node_modules") assert parser.should_ignore("file.tmp") assert parser.should_ignore("app/file.tmp") assert parser.should_ignore("debug.log") assert parser.should_ignore("app/debug.log") assert parser.should_ignore("tests/debug.log") def test_negation_patterns(self): """Test negation patterns are parsed correctly.""" test_dir = self.repo_path / "test_negation" test_dir.mkdir() gitignore = test_dir / ".gitignore" gitignore.write_text( """*.log !important.log !src/keep.log """ ) parser = GitignoreParser(str(test_dir)) specs = parser.get_ignore_specs() assert len(specs) == 1 patterns = specs[0].patterns assert "*.log" in patterns assert "!important.log" in patterns assert "!src/keep.log" in patterns def test_comments_and_empty_lines(self): """Test that comments and empty lines are ignored.""" test_dir = self.repo_path / "test_comments" test_dir.mkdir() gitignore = test_dir / ".gitignore" gitignore.write_text( """# This is a comment *.log # Another comment # Indented comment build/ """ ) parser = GitignoreParser(str(test_dir)) specs = parser.get_ignore_specs() assert len(specs) == 1 patterns = specs[0].patterns assert len(patterns) == 2 assert "*.log" in patterns assert "build/" in patterns def test_escaped_characters(self): """Test escaped special characters.""" test_dir = self.repo_path / "test_escaped" test_dir.mkdir() gitignore = test_dir / ".gitignore" gitignore.write_text( """\\#not-a-comment.txt \\!not-negation.txt """ ) parser = GitignoreParser(str(test_dir)) specs = parser.get_ignore_specs() assert len(specs) == 1 patterns = specs[0].patterns assert "#not-a-comment.txt" in patterns assert "!not-negation.txt" in patterns def test_escaped_negation_patterns(self): test_dir = self.repo_path / "test_escaped_negation" test_dir.mkdir() gitignore = test_dir / ".gitignore" gitignore.write_text( """*.log \\!not-negation.log !actual-negation.log """ ) parser = GitignoreParser(str(test_dir)) specs = parser.get_ignore_specs() assert len(specs) == 1 patterns = specs[0].patterns # Key assertions: escaped exclamation becomes literal, real negation preserved assert "!not-negation.log" in patterns # escaped -> literal assert "!actual-negation.log" in patterns # real negation preserved # Test the actual behavioral difference between escaped and real negation: # *.log pattern should ignore test.log assert parser.should_ignore("test.log") # Escaped negation file should still be ignored by *.log pattern assert parser.should_ignore("!not-negation.log") # Actual negation should override the *.log pattern assert not parser.should_ignore("actual-negation.log") def test_glob_patterns(self): """Test various glob patterns work correctly.""" test_dir = self.repo_path / "test_glob" test_dir.mkdir() gitignore = test_dir / ".gitignore" gitignore.write_text( """*.pyc **/*.tmp src/*.o !src/important.o [Tt]est* """ ) # Create test files (test_dir / "src").mkdir() (test_dir / "src" / "nested").mkdir() (test_dir / "file.pyc").touch() (test_dir / "src" / "file.pyc").touch() (test_dir / "file.tmp").touch() (test_dir / "src" / "nested" / "file.tmp").touch() (test_dir / "src" / "file.o").touch() (test_dir / "src" / "important.o").touch() (test_dir / "Test.txt").touch() (test_dir / "test.log").touch() parser = GitignoreParser(str(test_dir)) # *.pyc should match everywhere assert parser.should_ignore("file.pyc") assert parser.should_ignore("src/file.pyc") # **/*.tmp should match all .tmp files assert parser.should_ignore("file.tmp") assert parser.should_ignore("src/nested/file.tmp") # src/*.o should only match .o files directly in src/ assert parser.should_ignore("src/file.o") # Character class patterns assert parser.should_ignore("Test.txt") assert parser.should_ignore("test.log") def test_empty_gitignore(self): """Test handling of empty gitignore files.""" test_dir = self.repo_path / "test_empty" test_dir.mkdir() gitignore = test_dir / ".gitignore" gitignore.write_text("") parser = GitignoreParser(str(test_dir)) # Should not crash and should return empty list assert len(parser.get_ignore_specs()) == 0 def test_malformed_gitignore(self): """Test handling of malformed gitignore content.""" test_dir = self.repo_path / "test_malformed" test_dir.mkdir() gitignore = test_dir / ".gitignore" gitignore.write_text( """# Only comments and empty lines # More comments """ ) parser = GitignoreParser(str(test_dir)) # Should handle gracefully assert len(parser.get_ignore_specs()) == 0 def test_reload(self): """Test reloading gitignore files.""" test_dir = self.repo_path / "test_reload" test_dir.mkdir() # Create initial gitignore gitignore = test_dir / ".gitignore" gitignore.write_text("*.log") parser = GitignoreParser(str(test_dir)) assert len(parser.get_ignore_specs()) == 1 assert parser.should_ignore("test.log") # Modify gitignore gitignore.write_text("*.tmp") # Without reload, should still use old patterns assert parser.should_ignore("test.log") assert not parser.should_ignore("test.tmp") # After reload, should use new patterns parser.reload() assert not parser.should_ignore("test.log") assert parser.should_ignore("test.tmp") def test_gitignore_spec_matches(self): """Test GitignoreSpec.matches method.""" spec = GitignoreSpec("/path/to/.gitignore", ["*.log", "build/", "!important.log"]) assert spec.matches("test.log") assert spec.matches("build/output.o") assert spec.matches("src/test.log") # Note: Negation patterns in pathspec work differently than in git # This is a limitation of the pathspec library def test_subdirectory_gitignore_pattern_scoping(self): """Test that subdirectory .gitignore patterns are scoped correctly.""" # Create test structure: foo/ with subdirectory bar/ test_dir = self.repo_path / "test_subdir_scoping" test_dir.mkdir() (test_dir / "foo").mkdir() (test_dir / "foo" / "bar").mkdir() # Create files in various locations (test_dir / "foo.txt").touch() # root level (test_dir / "foo" / "foo.txt").touch() # in foo/ (test_dir / "foo" / "bar" / "foo.txt").touch() # in foo/bar/ # Test case 1: foo.txt in foo/.gitignore should only ignore in foo/ subtree gitignore = test_dir / "foo" / ".gitignore" gitignore.write_text("foo.txt\n") parser = GitignoreParser(str(test_dir)) # foo.txt at root should NOT be ignored by foo/.gitignore assert not parser.should_ignore("foo.txt"), "Root foo.txt should not be ignored by foo/.gitignore" # foo.txt in foo/ should be ignored assert parser.should_ignore("foo/foo.txt"), "foo/foo.txt should be ignored" # foo.txt in foo/bar/ should be ignored (within foo/ subtree) assert parser.should_ignore("foo/bar/foo.txt"), "foo/bar/foo.txt should be ignored" def test_anchored_pattern_in_subdirectory(self): """Test that anchored patterns in subdirectory only match immediate children.""" test_dir = self.repo_path / "test_anchored_subdir" test_dir.mkdir() (test_dir / "foo").mkdir() (test_dir / "foo" / "bar").mkdir() # Create files (test_dir / "foo.txt").touch() # root level (test_dir / "foo" / "foo.txt").touch() # in foo/ (test_dir / "foo" / "bar" / "foo.txt").touch() # in foo/bar/ # Test case 2: /foo.txt in foo/.gitignore should only match foo/foo.txt gitignore = test_dir / "foo" / ".gitignore" gitignore.write_text("/foo.txt\n") parser = GitignoreParser(str(test_dir)) # foo.txt at root should NOT be ignored assert not parser.should_ignore("foo.txt"), "Root foo.txt should not be ignored" # foo.txt directly in foo/ should be ignored assert parser.should_ignore("foo/foo.txt"), "foo/foo.txt should be ignored by /foo.txt pattern" # foo.txt in foo/bar/ should NOT be ignored (anchored pattern only matches immediate children) assert not parser.should_ignore("foo/bar/foo.txt"), "foo/bar/foo.txt should NOT be ignored by /foo.txt pattern" def test_double_star_pattern_scoping(self): """Test that **/pattern in subdirectory only applies within that subtree.""" test_dir = self.repo_path / "test_doublestar_scope" test_dir.mkdir() (test_dir / "foo").mkdir() (test_dir / "foo" / "bar").mkdir() (test_dir / "other").mkdir() # Create files (test_dir / "foo.txt").touch() # root level (test_dir / "foo" / "foo.txt").touch() # in foo/ (test_dir / "foo" / "bar" / "foo.txt").touch() # in foo/bar/ (test_dir / "other" / "foo.txt").touch() # in other/ # Test case 3: **/foo.txt in foo/.gitignore should only ignore within foo/ subtree gitignore = test_dir / "foo" / ".gitignore" gitignore.write_text("**/foo.txt\n") parser = GitignoreParser(str(test_dir)) # foo.txt at root should NOT be ignored assert not parser.should_ignore("foo.txt"), "Root foo.txt should not be ignored by foo/.gitignore" # foo.txt in foo/ should be ignored assert parser.should_ignore("foo/foo.txt"), "foo/foo.txt should be ignored" # foo.txt in foo/bar/ should be ignored (within foo/ subtree) assert parser.should_ignore("foo/bar/foo.txt"), "foo/bar/foo.txt should be ignored" # foo.txt in other/ should NOT be ignored (outside foo/ subtree) assert not parser.should_ignore("other/foo.txt"), "other/foo.txt should NOT be ignored by foo/.gitignore" def test_anchored_double_star_pattern(self): """Test that /**/pattern in subdirectory works correctly.""" test_dir = self.repo_path / "test_anchored_doublestar" test_dir.mkdir() (test_dir / "foo").mkdir() (test_dir / "foo" / "bar").mkdir() (test_dir / "other").mkdir() # Create files (test_dir / "foo.txt").touch() # root level (test_dir / "foo" / "foo.txt").touch() # in foo/ (test_dir / "foo" / "bar" / "foo.txt").touch() # in foo/bar/ (test_dir / "other" / "foo.txt").touch() # in other/ # Test case 4: /**/foo.txt in foo/.gitignore should correctly ignore only within foo/ subtree gitignore = test_dir / "foo" / ".gitignore" gitignore.write_text("/**/foo.txt\n") parser = GitignoreParser(str(test_dir)) # foo.txt at root should NOT be ignored assert not parser.should_ignore("foo.txt"), "Root foo.txt should not be ignored" # foo.txt in foo/ should be ignored assert parser.should_ignore("foo/foo.txt"), "foo/foo.txt should be ignored" # foo.txt in foo/bar/ should be ignored (within foo/ subtree) assert parser.should_ignore("foo/bar/foo.txt"), "foo/bar/foo.txt should be ignored" # foo.txt in other/ should NOT be ignored (outside foo/ subtree) assert not parser.should_ignore("other/foo.txt"), "other/foo.txt should NOT be ignored by foo/.gitignore" ```