This is page 9 of 11. Use http://codebase.md/oraios/serena?page={x} to view the full context. # Directory Structure ``` ├── .devcontainer │ └── devcontainer.json ├── .dockerignore ├── .env.example ├── .github │ ├── FUNDING.yml │ ├── ISSUE_TEMPLATE │ │ ├── config.yml │ │ ├── feature_request.md │ │ └── issue--bug--performance-problem--question-.md │ └── workflows │ ├── codespell.yml │ ├── docker.yml │ ├── junie.yml │ ├── lint_and_docs.yaml │ ├── publish.yml │ └── pytest.yml ├── .gitignore ├── .serena │ ├── memories │ │ ├── adding_new_language_support_guide.md │ │ ├── serena_core_concepts_and_architecture.md │ │ ├── serena_repository_structure.md │ │ └── suggested_commands.md │ └── project.yml ├── .vscode │ └── settings.json ├── CHANGELOG.md ├── CLAUDE.md ├── compose.yaml ├── CONTRIBUTING.md ├── docker_build_and_run.sh ├── DOCKER.md ├── Dockerfile ├── docs │ ├── custom_agent.md │ └── serena_on_chatgpt.md ├── flake.lock ├── flake.nix ├── lessons_learned.md ├── LICENSE ├── llms-install.md ├── public │ └── .gitignore ├── pyproject.toml ├── README.md ├── resources │ ├── serena-icons.cdr │ ├── serena-logo-dark-mode.svg │ ├── serena-logo.cdr │ ├── serena-logo.svg │ └── vscode_sponsor_logo.png ├── roadmap.md ├── scripts │ ├── agno_agent.py │ ├── demo_run_tools.py │ ├── gen_prompt_factory.py │ ├── mcp_server.py │ ├── print_mode_context_options.py │ └── print_tool_overview.py ├── src │ ├── interprompt │ │ ├── __init__.py │ │ ├── .syncCommitId.remote │ │ ├── .syncCommitId.this │ │ ├── jinja_template.py │ │ ├── multilang_prompt.py │ │ ├── prompt_factory.py │ │ └── util │ │ ├── __init__.py │ │ └── class_decorators.py │ ├── README.md │ ├── serena │ │ ├── __init__.py │ │ ├── agent.py │ │ ├── agno.py │ │ ├── analytics.py │ │ ├── cli.py │ │ ├── code_editor.py │ │ ├── config │ │ │ ├── __init__.py │ │ │ ├── context_mode.py │ │ │ └── serena_config.py │ │ ├── constants.py │ │ ├── dashboard.py │ │ ├── generated │ │ │ └── generated_prompt_factory.py │ │ ├── gui_log_viewer.py │ │ ├── mcp.py │ │ ├── project.py │ │ ├── prompt_factory.py │ │ ├── resources │ │ │ ├── config │ │ │ │ ├── contexts │ │ │ │ │ ├── agent.yml │ │ │ │ │ ├── chatgpt.yml │ │ │ │ │ ├── codex.yml │ │ │ │ │ ├── context.template.yml │ │ │ │ │ ├── desktop-app.yml │ │ │ │ │ ├── ide-assistant.yml │ │ │ │ │ └── oaicompat-agent.yml │ │ │ │ ├── internal_modes │ │ │ │ │ └── jetbrains.yml │ │ │ │ ├── modes │ │ │ │ │ ├── editing.yml │ │ │ │ │ ├── interactive.yml │ │ │ │ │ ├── mode.template.yml │ │ │ │ │ ├── no-onboarding.yml │ │ │ │ │ ├── onboarding.yml │ │ │ │ │ ├── one-shot.yml │ │ │ │ │ └── planning.yml │ │ │ │ └── prompt_templates │ │ │ │ ├── simple_tool_outputs.yml │ │ │ │ └── system_prompt.yml │ │ │ ├── dashboard │ │ │ │ ├── dashboard.js │ │ │ │ ├── index.html │ │ │ │ ├── jquery.min.js │ │ │ │ ├── serena-icon-16.png │ │ │ │ ├── serena-icon-32.png │ │ │ │ ├── serena-icon-48.png │ │ │ │ ├── serena-logs-dark-mode.png │ │ │ │ └── serena-logs.png │ │ │ ├── project.template.yml │ │ │ └── serena_config.template.yml │ │ ├── symbol.py │ │ ├── text_utils.py │ │ ├── tools │ │ │ ├── __init__.py │ │ │ ├── cmd_tools.py │ │ │ ├── config_tools.py │ │ │ ├── file_tools.py │ │ │ ├── jetbrains_plugin_client.py │ │ │ ├── jetbrains_tools.py │ │ │ ├── memory_tools.py │ │ │ ├── symbol_tools.py │ │ │ ├── tools_base.py │ │ │ └── workflow_tools.py │ │ └── util │ │ ├── class_decorators.py │ │ ├── exception.py │ │ ├── file_system.py │ │ ├── general.py │ │ ├── git.py │ │ ├── inspection.py │ │ ├── logging.py │ │ ├── shell.py │ │ └── thread.py │ └── solidlsp │ ├── __init__.py │ ├── .gitignore │ ├── language_servers │ │ ├── al_language_server.py │ │ ├── bash_language_server.py │ │ ├── clangd_language_server.py │ │ ├── clojure_lsp.py │ │ ├── common.py │ │ ├── csharp_language_server.py │ │ ├── dart_language_server.py │ │ ├── eclipse_jdtls.py │ │ ├── elixir_tools │ │ │ ├── __init__.py │ │ │ ├── elixir_tools.py │ │ │ └── README.md │ │ ├── elm_language_server.py │ │ ├── erlang_language_server.py │ │ ├── gopls.py │ │ ├── intelephense.py │ │ ├── jedi_server.py │ │ ├── kotlin_language_server.py │ │ ├── lua_ls.py │ │ ├── marksman.py │ │ ├── nixd_ls.py │ │ ├── omnisharp │ │ │ ├── initialize_params.json │ │ │ ├── runtime_dependencies.json │ │ │ └── workspace_did_change_configuration.json │ │ ├── omnisharp.py │ │ ├── perl_language_server.py │ │ ├── pyright_server.py │ │ ├── r_language_server.py │ │ ├── ruby_lsp.py │ │ ├── rust_analyzer.py │ │ ├── solargraph.py │ │ ├── sourcekit_lsp.py │ │ ├── terraform_ls.py │ │ ├── typescript_language_server.py │ │ ├── vts_language_server.py │ │ └── zls.py │ ├── ls_config.py │ ├── ls_exceptions.py │ ├── ls_handler.py │ ├── ls_logger.py │ ├── ls_request.py │ ├── ls_types.py │ ├── ls_utils.py │ ├── ls.py │ ├── lsp_protocol_handler │ │ ├── lsp_constants.py │ │ ├── lsp_requests.py │ │ ├── lsp_types.py │ │ └── server.py │ ├── settings.py │ └── util │ ├── subprocess_util.py │ └── zip.py ├── test │ ├── __init__.py │ ├── conftest.py │ ├── resources │ │ └── repos │ │ ├── al │ │ │ └── test_repo │ │ │ ├── app.json │ │ │ └── src │ │ │ ├── Codeunits │ │ │ │ ├── CustomerMgt.Codeunit.al │ │ │ │ └── PaymentProcessorImpl.Codeunit.al │ │ │ ├── Enums │ │ │ │ └── CustomerType.Enum.al │ │ │ ├── Interfaces │ │ │ │ └── IPaymentProcessor.Interface.al │ │ │ ├── Pages │ │ │ │ ├── CustomerCard.Page.al │ │ │ │ └── CustomerList.Page.al │ │ │ ├── TableExtensions │ │ │ │ └── Item.TableExt.al │ │ │ └── Tables │ │ │ └── Customer.Table.al │ │ ├── bash │ │ │ └── test_repo │ │ │ ├── config.sh │ │ │ ├── main.sh │ │ │ └── utils.sh │ │ ├── clojure │ │ │ └── test_repo │ │ │ ├── deps.edn │ │ │ └── src │ │ │ └── test_app │ │ │ ├── core.clj │ │ │ └── utils.clj │ │ ├── csharp │ │ │ └── test_repo │ │ │ ├── .gitignore │ │ │ ├── Models │ │ │ │ └── Person.cs │ │ │ ├── Program.cs │ │ │ ├── serena.sln │ │ │ └── TestProject.csproj │ │ ├── dart │ │ │ └── test_repo │ │ │ ├── .gitignore │ │ │ ├── lib │ │ │ │ ├── helper.dart │ │ │ │ ├── main.dart │ │ │ │ └── models.dart │ │ │ └── pubspec.yaml │ │ ├── elixir │ │ │ └── test_repo │ │ │ ├── .gitignore │ │ │ ├── lib │ │ │ │ ├── examples.ex │ │ │ │ ├── ignored_dir │ │ │ │ │ └── ignored_module.ex │ │ │ │ ├── models.ex │ │ │ │ ├── services.ex │ │ │ │ ├── test_repo.ex │ │ │ │ └── utils.ex │ │ │ ├── mix.exs │ │ │ ├── mix.lock │ │ │ ├── scripts │ │ │ │ └── build_script.ex │ │ │ └── test │ │ │ ├── models_test.exs │ │ │ └── test_repo_test.exs │ │ ├── elm │ │ │ └── test_repo │ │ │ ├── elm.json │ │ │ ├── Main.elm │ │ │ └── Utils.elm │ │ ├── erlang │ │ │ └── test_repo │ │ │ ├── hello.erl │ │ │ ├── ignored_dir │ │ │ │ └── ignored_module.erl │ │ │ ├── include │ │ │ │ ├── records.hrl │ │ │ │ └── types.hrl │ │ │ ├── math_utils.erl │ │ │ ├── rebar.config │ │ │ ├── src │ │ │ │ ├── app.erl │ │ │ │ ├── models.erl │ │ │ │ ├── services.erl │ │ │ │ └── utils.erl │ │ │ └── test │ │ │ ├── models_tests.erl │ │ │ └── utils_tests.erl │ │ ├── go │ │ │ └── test_repo │ │ │ └── main.go │ │ ├── java │ │ │ └── test_repo │ │ │ ├── pom.xml │ │ │ └── src │ │ │ └── main │ │ │ └── java │ │ │ └── test_repo │ │ │ ├── Main.java │ │ │ ├── Model.java │ │ │ ├── ModelUser.java │ │ │ └── Utils.java │ │ ├── kotlin │ │ │ └── test_repo │ │ │ ├── .gitignore │ │ │ ├── build.gradle.kts │ │ │ └── src │ │ │ └── main │ │ │ └── kotlin │ │ │ └── test_repo │ │ │ ├── Main.kt │ │ │ ├── Model.kt │ │ │ ├── ModelUser.kt │ │ │ └── Utils.kt │ │ ├── lua │ │ │ └── test_repo │ │ │ ├── .gitignore │ │ │ ├── main.lua │ │ │ ├── src │ │ │ │ ├── calculator.lua │ │ │ │ └── utils.lua │ │ │ └── tests │ │ │ └── test_calculator.lua │ │ ├── markdown │ │ │ └── test_repo │ │ │ ├── api.md │ │ │ ├── CONTRIBUTING.md │ │ │ ├── guide.md │ │ │ └── README.md │ │ ├── nix │ │ │ └── test_repo │ │ │ ├── .gitignore │ │ │ ├── default.nix │ │ │ ├── flake.nix │ │ │ ├── lib │ │ │ │ └── utils.nix │ │ │ ├── modules │ │ │ │ └── example.nix │ │ │ └── scripts │ │ │ └── hello.sh │ │ ├── perl │ │ │ └── test_repo │ │ │ ├── helper.pl │ │ │ └── main.pl │ │ ├── php │ │ │ └── test_repo │ │ │ ├── helper.php │ │ │ ├── index.php │ │ │ └── simple_var.php │ │ ├── python │ │ │ └── test_repo │ │ │ ├── .gitignore │ │ │ ├── custom_test │ │ │ │ ├── __init__.py │ │ │ │ └── advanced_features.py │ │ │ ├── examples │ │ │ │ ├── __init__.py │ │ │ │ └── user_management.py │ │ │ ├── ignore_this_dir_with_postfix │ │ │ │ └── ignored_module.py │ │ │ ├── scripts │ │ │ │ ├── __init__.py │ │ │ │ └── run_app.py │ │ │ └── test_repo │ │ │ ├── __init__.py │ │ │ ├── complex_types.py │ │ │ ├── models.py │ │ │ ├── name_collisions.py │ │ │ ├── nested_base.py │ │ │ ├── nested.py │ │ │ ├── overloaded.py │ │ │ ├── services.py │ │ │ ├── utils.py │ │ │ └── variables.py │ │ ├── r │ │ │ └── test_repo │ │ │ ├── .Rbuildignore │ │ │ ├── DESCRIPTION │ │ │ ├── examples │ │ │ │ └── analysis.R │ │ │ ├── NAMESPACE │ │ │ └── R │ │ │ ├── models.R │ │ │ └── utils.R │ │ ├── ruby │ │ │ └── test_repo │ │ │ ├── .solargraph.yml │ │ │ ├── examples │ │ │ │ └── user_management.rb │ │ │ ├── lib.rb │ │ │ ├── main.rb │ │ │ ├── models.rb │ │ │ ├── nested.rb │ │ │ ├── services.rb │ │ │ └── variables.rb │ │ ├── rust │ │ │ ├── test_repo │ │ │ │ ├── Cargo.lock │ │ │ │ ├── Cargo.toml │ │ │ │ └── src │ │ │ │ ├── lib.rs │ │ │ │ └── main.rs │ │ │ └── test_repo_2024 │ │ │ ├── Cargo.lock │ │ │ ├── Cargo.toml │ │ │ └── src │ │ │ ├── lib.rs │ │ │ └── main.rs │ │ ├── swift │ │ │ └── test_repo │ │ │ ├── Package.swift │ │ │ └── src │ │ │ ├── main.swift │ │ │ └── utils.swift │ │ ├── terraform │ │ │ └── test_repo │ │ │ ├── data.tf │ │ │ ├── main.tf │ │ │ ├── outputs.tf │ │ │ └── variables.tf │ │ ├── typescript │ │ │ └── test_repo │ │ │ ├── .serena │ │ │ │ └── project.yml │ │ │ ├── index.ts │ │ │ ├── tsconfig.json │ │ │ └── use_helper.ts │ │ └── zig │ │ └── test_repo │ │ ├── .gitignore │ │ ├── build.zig │ │ ├── src │ │ │ ├── calculator.zig │ │ │ ├── main.zig │ │ │ └── math_utils.zig │ │ └── zls.json │ ├── serena │ │ ├── __init__.py │ │ ├── __snapshots__ │ │ │ └── test_symbol_editing.ambr │ │ ├── config │ │ │ ├── __init__.py │ │ │ └── test_serena_config.py │ │ ├── test_edit_marker.py │ │ ├── test_mcp.py │ │ ├── test_serena_agent.py │ │ ├── test_symbol_editing.py │ │ ├── test_symbol.py │ │ ├── test_text_utils.py │ │ ├── test_tool_parameter_types.py │ │ └── util │ │ ├── test_exception.py │ │ └── test_file_system.py │ └── solidlsp │ ├── al │ │ └── test_al_basic.py │ ├── bash │ │ ├── __init__.py │ │ └── test_bash_basic.py │ ├── clojure │ │ ├── __init__.py │ │ └── test_clojure_basic.py │ ├── csharp │ │ └── test_csharp_basic.py │ ├── dart │ │ ├── __init__.py │ │ └── test_dart_basic.py │ ├── elixir │ │ ├── __init__.py │ │ ├── conftest.py │ │ ├── test_elixir_basic.py │ │ ├── test_elixir_ignored_dirs.py │ │ ├── test_elixir_integration.py │ │ └── test_elixir_symbol_retrieval.py │ ├── elm │ │ └── test_elm_basic.py │ ├── erlang │ │ ├── __init__.py │ │ ├── conftest.py │ │ ├── test_erlang_basic.py │ │ ├── test_erlang_ignored_dirs.py │ │ └── test_erlang_symbol_retrieval.py │ ├── go │ │ └── test_go_basic.py │ ├── java │ │ └── test_java_basic.py │ ├── kotlin │ │ └── test_kotlin_basic.py │ ├── lua │ │ └── test_lua_basic.py │ ├── markdown │ │ ├── __init__.py │ │ └── test_markdown_basic.py │ ├── nix │ │ └── test_nix_basic.py │ ├── perl │ │ └── test_perl_basic.py │ ├── php │ │ └── test_php_basic.py │ ├── python │ │ ├── test_python_basic.py │ │ ├── test_retrieval_with_ignored_dirs.py │ │ └── test_symbol_retrieval.py │ ├── r │ │ ├── __init__.py │ │ └── test_r_basic.py │ ├── ruby │ │ ├── test_ruby_basic.py │ │ └── test_ruby_symbol_retrieval.py │ ├── rust │ │ ├── test_rust_2024_edition.py │ │ └── test_rust_basic.py │ ├── swift │ │ └── test_swift_basic.py │ ├── terraform │ │ └── test_terraform_basic.py │ ├── typescript │ │ └── test_typescript_basic.py │ ├── util │ │ └── test_zip.py │ └── zig │ └── test_zig_basic.py └── uv.lock ``` # Files -------------------------------------------------------------------------------- /src/solidlsp/language_servers/eclipse_jdtls.py: -------------------------------------------------------------------------------- ```python """ Provides Java specific instantiation of the LanguageServer class. Contains various configurations and settings specific to Java. """ import dataclasses import logging import os import pathlib import shutil import threading import uuid from pathlib import PurePath from overrides import override from solidlsp.ls import SolidLanguageServer from solidlsp.ls_config import LanguageServerConfig from solidlsp.ls_logger import LanguageServerLogger from solidlsp.ls_utils import FileUtils, PlatformUtils from solidlsp.lsp_protocol_handler.lsp_types import InitializeParams from solidlsp.lsp_protocol_handler.server import ProcessLaunchInfo from solidlsp.settings import SolidLSPSettings @dataclasses.dataclass class RuntimeDependencyPaths: """ Stores the paths to the runtime dependencies of EclipseJDTLS """ gradle_path: str lombok_jar_path: str jre_path: str jre_home_path: str jdtls_launcher_jar_path: str jdtls_readonly_config_path: str intellicode_jar_path: str intellisense_members_path: str class EclipseJDTLS(SolidLanguageServer): """ The EclipseJDTLS class provides a Java specific implementation of the LanguageServer class """ def __init__( self, config: LanguageServerConfig, logger: LanguageServerLogger, repository_root_path: str, solidlsp_settings: SolidLSPSettings ): """ Creates a new EclipseJDTLS instance initializing the language server settings appropriately. This class is not meant to be instantiated directly. Use LanguageServer.create() instead. """ runtime_dependency_paths = self._setupRuntimeDependencies(logger, config, solidlsp_settings) self.runtime_dependency_paths = runtime_dependency_paths # ws_dir is the workspace directory for the EclipseJDTLS server ws_dir = str( PurePath( solidlsp_settings.ls_resources_dir, "EclipseJDTLS", "workspaces", uuid.uuid4().hex, ) ) # shared_cache_location is the global cache used by Eclipse JDTLS across all workspaces shared_cache_location = str(PurePath(solidlsp_settings.ls_resources_dir, "lsp", "EclipseJDTLS", "sharedIndex")) os.makedirs(shared_cache_location, exist_ok=True) os.makedirs(ws_dir, exist_ok=True) jre_path = self.runtime_dependency_paths.jre_path lombok_jar_path = self.runtime_dependency_paths.lombok_jar_path jdtls_launcher_jar = self.runtime_dependency_paths.jdtls_launcher_jar_path data_dir = str(PurePath(ws_dir, "data_dir")) jdtls_config_path = str(PurePath(ws_dir, "config_path")) jdtls_readonly_config_path = self.runtime_dependency_paths.jdtls_readonly_config_path if not os.path.exists(jdtls_config_path): shutil.copytree(jdtls_readonly_config_path, jdtls_config_path) for static_path in [ jre_path, lombok_jar_path, jdtls_launcher_jar, jdtls_config_path, jdtls_readonly_config_path, ]: assert os.path.exists(static_path), static_path # TODO: Add "self.runtime_dependency_paths.jre_home_path"/bin to $PATH as well proc_env = {"syntaxserver": "false", "JAVA_HOME": self.runtime_dependency_paths.jre_home_path} proc_cwd = repository_root_path cmd = " ".join( [ jre_path, "--add-modules=ALL-SYSTEM", "--add-opens", "java.base/java.util=ALL-UNNAMED", "--add-opens", "java.base/java.lang=ALL-UNNAMED", "--add-opens", "java.base/sun.nio.fs=ALL-UNNAMED", "-Declipse.application=org.eclipse.jdt.ls.core.id1", "-Dosgi.bundles.defaultStartLevel=4", "-Declipse.product=org.eclipse.jdt.ls.core.product", "-Djava.import.generatesMetadataFilesAtProjectRoot=false", "-Dfile.encoding=utf8", "-noverify", "-XX:+UseParallelGC", "-XX:GCTimeRatio=4", "-XX:AdaptiveSizePolicyWeight=90", "-Dsun.zip.disableMemoryMapping=true", "-Djava.lsp.joinOnCompletion=true", "-Xmx3G", "-Xms100m", "-Xlog:disable", "-Dlog.level=ALL", f'"-javaagent:{lombok_jar_path}"', f'"-Djdt.core.sharedIndexLocation={shared_cache_location}"', "-jar", f'"{jdtls_launcher_jar}"', "-configuration", f'"{jdtls_config_path}"', "-data", f'"{data_dir}"', ] ) self.service_ready_event = threading.Event() self.intellicode_enable_command_available = threading.Event() self.initialize_searcher_command_available = threading.Event() super().__init__( config, logger, repository_root_path, ProcessLaunchInfo(cmd, proc_env, proc_cwd), "java", solidlsp_settings=solidlsp_settings ) @override def is_ignored_dirname(self, dirname: str) -> bool: # Ignore common Java build directories from different build tools: # - Maven: target # - Gradle: build, .gradle # - Eclipse: bin, .settings # - IntelliJ IDEA: out, .idea # - General: classes, dist, lib return super().is_ignored_dirname(dirname) or dirname in [ "target", # Maven "build", # Gradle "bin", # Eclipse "out", # IntelliJ IDEA "classes", # General "dist", # General "lib", # General ] @classmethod def _setupRuntimeDependencies( cls, logger: LanguageServerLogger, config: LanguageServerConfig, solidlsp_settings: SolidLSPSettings ) -> RuntimeDependencyPaths: """ Setup runtime dependencies for EclipseJDTLS and return the paths. """ platformId = PlatformUtils.get_platform_id() runtime_dependencies = { "gradle": { "platform-agnostic": { "url": "https://services.gradle.org/distributions/gradle-8.14.2-bin.zip", "archiveType": "zip", "relative_extraction_path": ".", } }, "vscode-java": { "darwin-arm64": { "url": "https://github.com/redhat-developer/vscode-java/releases/download/v1.42.0/java-darwin-arm64-1.42.0-561.vsix", "archiveType": "zip", "relative_extraction_path": "vscode-java", }, "osx-arm64": { "url": "https://github.com/redhat-developer/vscode-java/releases/download/v1.42.0/java-darwin-arm64-1.42.0-561.vsix", "archiveType": "zip", "relative_extraction_path": "vscode-java", "jre_home_path": "extension/jre/21.0.7-macosx-aarch64", "jre_path": "extension/jre/21.0.7-macosx-aarch64/bin/java", "lombok_jar_path": "extension/lombok/lombok-1.18.36.jar", "jdtls_launcher_jar_path": "extension/server/plugins/org.eclipse.equinox.launcher_1.7.0.v20250424-1814.jar", "jdtls_readonly_config_path": "extension/server/config_mac_arm", }, "osx-x64": { "url": "https://github.com/redhat-developer/vscode-java/releases/download/v1.42.0/java-darwin-x64-1.42.0-561.vsix", "archiveType": "zip", "relative_extraction_path": "vscode-java", "jre_home_path": "extension/jre/21.0.7-macosx-x86_64", "jre_path": "extension/jre/21.0.7-macosx-x86_64/bin/java", "lombok_jar_path": "extension/lombok/lombok-1.18.36.jar", "jdtls_launcher_jar_path": "extension/server/plugins/org.eclipse.equinox.launcher_1.7.0.v20250424-1814.jar", "jdtls_readonly_config_path": "extension/server/config_mac", }, "linux-arm64": { "url": "https://github.com/redhat-developer/vscode-java/releases/download/v1.42.0/java-linux-arm64-1.42.0-561.vsix", "archiveType": "zip", "relative_extraction_path": "vscode-java", }, "linux-x64": { "url": "https://github.com/redhat-developer/vscode-java/releases/download/v1.42.0/java-linux-x64-1.42.0-561.vsix", "archiveType": "zip", "relative_extraction_path": "vscode-java", "jre_home_path": "extension/jre/21.0.7-linux-x86_64", "jre_path": "extension/jre/21.0.7-linux-x86_64/bin/java", "lombok_jar_path": "extension/lombok/lombok-1.18.36.jar", "jdtls_launcher_jar_path": "extension/server/plugins/org.eclipse.equinox.launcher_1.7.0.v20250424-1814.jar", "jdtls_readonly_config_path": "extension/server/config_linux", }, "win-x64": { "url": "https://github.com/redhat-developer/vscode-java/releases/download/v1.42.0/java-win32-x64-1.42.0-561.vsix", "archiveType": "zip", "relative_extraction_path": "vscode-java", "jre_home_path": "extension/jre/21.0.7-win32-x86_64", "jre_path": "extension/jre/21.0.7-win32-x86_64/bin/java.exe", "lombok_jar_path": "extension/lombok/lombok-1.18.36.jar", "jdtls_launcher_jar_path": "extension/server/plugins/org.eclipse.equinox.launcher_1.7.0.v20250424-1814.jar", "jdtls_readonly_config_path": "extension/server/config_win", }, }, "intellicode": { "platform-agnostic": { "url": "https://VisualStudioExptTeam.gallery.vsassets.io/_apis/public/gallery/publisher/VisualStudioExptTeam/extension/vscodeintellicode/1.2.30/assetbyname/Microsoft.VisualStudio.Services.VSIXPackage", "alternate_url": "https://marketplace.visualstudio.com/_apis/public/gallery/publishers/VisualStudioExptTeam/vsextensions/vscodeintellicode/1.2.30/vspackage", "archiveType": "zip", "relative_extraction_path": "intellicode", "intellicode_jar_path": "extension/dist/com.microsoft.jdtls.intellicode.core-0.7.0.jar", "intellisense_members_path": "extension/dist/bundledModels/java_intellisense-members", } }, } # assert platformId.value in [ # "linux-x64", # "win-x64", # ], "Only linux-x64 platform is supported for in multilspy at the moment" gradle_path = str( PurePath( cls.ls_resources_dir(solidlsp_settings), "gradle-8.14.2", ) ) if not os.path.exists(gradle_path): FileUtils.download_and_extract_archive( logger, runtime_dependencies["gradle"]["platform-agnostic"]["url"], str(PurePath(gradle_path).parent), runtime_dependencies["gradle"]["platform-agnostic"]["archiveType"], ) assert os.path.exists(gradle_path) dependency = runtime_dependencies["vscode-java"][platformId.value] vscode_java_path = str(PurePath(cls.ls_resources_dir(solidlsp_settings), dependency["relative_extraction_path"])) os.makedirs(vscode_java_path, exist_ok=True) jre_home_path = str(PurePath(vscode_java_path, dependency["jre_home_path"])) jre_path = str(PurePath(vscode_java_path, dependency["jre_path"])) lombok_jar_path = str(PurePath(vscode_java_path, dependency["lombok_jar_path"])) jdtls_launcher_jar_path = str(PurePath(vscode_java_path, dependency["jdtls_launcher_jar_path"])) jdtls_readonly_config_path = str(PurePath(vscode_java_path, dependency["jdtls_readonly_config_path"])) if not all( [ os.path.exists(vscode_java_path), os.path.exists(jre_home_path), os.path.exists(jre_path), os.path.exists(lombok_jar_path), os.path.exists(jdtls_launcher_jar_path), os.path.exists(jdtls_readonly_config_path), ] ): FileUtils.download_and_extract_archive(logger, dependency["url"], vscode_java_path, dependency["archiveType"]) os.chmod(jre_path, 0o755) assert os.path.exists(vscode_java_path) assert os.path.exists(jre_home_path) assert os.path.exists(jre_path) assert os.path.exists(lombok_jar_path) assert os.path.exists(jdtls_launcher_jar_path) assert os.path.exists(jdtls_readonly_config_path) dependency = runtime_dependencies["intellicode"]["platform-agnostic"] intellicode_directory_path = str(PurePath(cls.ls_resources_dir(solidlsp_settings), dependency["relative_extraction_path"])) os.makedirs(intellicode_directory_path, exist_ok=True) intellicode_jar_path = str(PurePath(intellicode_directory_path, dependency["intellicode_jar_path"])) intellisense_members_path = str(PurePath(intellicode_directory_path, dependency["intellisense_members_path"])) if not all( [ os.path.exists(intellicode_directory_path), os.path.exists(intellicode_jar_path), os.path.exists(intellisense_members_path), ] ): FileUtils.download_and_extract_archive(logger, dependency["url"], intellicode_directory_path, dependency["archiveType"]) assert os.path.exists(intellicode_directory_path) assert os.path.exists(intellicode_jar_path) assert os.path.exists(intellisense_members_path) return RuntimeDependencyPaths( gradle_path=gradle_path, lombok_jar_path=lombok_jar_path, jre_path=jre_path, jre_home_path=jre_home_path, jdtls_launcher_jar_path=jdtls_launcher_jar_path, jdtls_readonly_config_path=jdtls_readonly_config_path, intellicode_jar_path=intellicode_jar_path, intellisense_members_path=intellisense_members_path, ) def _get_initialize_params(self, repository_absolute_path: str) -> InitializeParams: """ Returns the initialize parameters for the EclipseJDTLS server. """ # Look into https://github.com/eclipse/eclipse.jdt.ls/blob/master/org.eclipse.jdt.ls.core/src/org/eclipse/jdt/ls/core/internal/preferences/Preferences.java to understand all the options available if not os.path.isabs(repository_absolute_path): repository_absolute_path = os.path.abspath(repository_absolute_path) repo_uri = pathlib.Path(repository_absolute_path).as_uri() initialize_params = { "locale": "en", "rootPath": repository_absolute_path, "rootUri": pathlib.Path(repository_absolute_path).as_uri(), "capabilities": { "workspace": { "applyEdit": True, "workspaceEdit": { "documentChanges": True, "resourceOperations": ["create", "rename", "delete"], "failureHandling": "textOnlyTransactional", "normalizesLineEndings": True, "changeAnnotationSupport": {"groupsOnLabel": True}, }, "didChangeConfiguration": {"dynamicRegistration": True}, "didChangeWatchedFiles": {"dynamicRegistration": True, "relativePatternSupport": True}, "symbol": { "dynamicRegistration": True, "symbolKind": {"valueSet": list(range(1, 27))}, "tagSupport": {"valueSet": [1]}, "resolveSupport": {"properties": ["location.range"]}, }, "codeLens": {"refreshSupport": True}, "executeCommand": {"dynamicRegistration": True}, "configuration": True, "workspaceFolders": True, "semanticTokens": {"refreshSupport": True}, "fileOperations": { "dynamicRegistration": True, "didCreate": True, "didRename": True, "didDelete": True, "willCreate": True, "willRename": True, "willDelete": True, }, "inlineValue": {"refreshSupport": True}, "inlayHint": {"refreshSupport": True}, "diagnostics": {"refreshSupport": True}, }, "textDocument": { "publishDiagnostics": { "relatedInformation": True, "versionSupport": False, "tagSupport": {"valueSet": [1, 2]}, "codeDescriptionSupport": True, "dataSupport": True, }, "synchronization": {"dynamicRegistration": True, "willSave": True, "willSaveWaitUntil": True, "didSave": True}, # TODO: we have an assert that completion provider is not included in the capabilities at server startup # Removing this will cause the assert to fail. Investigate why this is the case, simplify config "completion": { "dynamicRegistration": True, "contextSupport": True, "completionItem": { "snippetSupport": False, "commitCharactersSupport": True, "documentationFormat": ["markdown", "plaintext"], "deprecatedSupport": True, "preselectSupport": True, "tagSupport": {"valueSet": [1]}, "insertReplaceSupport": False, "resolveSupport": {"properties": ["documentation", "detail", "additionalTextEdits"]}, "insertTextModeSupport": {"valueSet": [1, 2]}, "labelDetailsSupport": True, }, "insertTextMode": 2, "completionItemKind": { "valueSet": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25] }, "completionList": {"itemDefaults": ["commitCharacters", "editRange", "insertTextFormat", "insertTextMode"]}, }, "hover": {"dynamicRegistration": True, "contentFormat": ["markdown", "plaintext"]}, "signatureHelp": { "dynamicRegistration": True, "signatureInformation": { "documentationFormat": ["markdown", "plaintext"], "parameterInformation": {"labelOffsetSupport": True}, "activeParameterSupport": True, }, }, "definition": {"dynamicRegistration": True, "linkSupport": True}, "references": {"dynamicRegistration": True}, "documentSymbol": { "dynamicRegistration": True, "symbolKind": {"valueSet": list(range(1, 27))}, "hierarchicalDocumentSymbolSupport": True, "tagSupport": {"valueSet": [1]}, "labelSupport": True, }, "rename": { "dynamicRegistration": True, "prepareSupport": True, "prepareSupportDefaultBehavior": 1, "honorsChangeAnnotations": True, }, "documentLink": {"dynamicRegistration": True, "tooltipSupport": True}, "typeDefinition": {"dynamicRegistration": True, "linkSupport": True}, "implementation": {"dynamicRegistration": True, "linkSupport": True}, "colorProvider": {"dynamicRegistration": True}, "declaration": {"dynamicRegistration": True, "linkSupport": True}, "selectionRange": {"dynamicRegistration": True}, "callHierarchy": {"dynamicRegistration": True}, "semanticTokens": { "dynamicRegistration": True, "tokenTypes": [ "namespace", "type", "class", "enum", "interface", "struct", "typeParameter", "parameter", "variable", "property", "enumMember", "event", "function", "method", "macro", "keyword", "modifier", "comment", "string", "number", "regexp", "operator", "decorator", ], "tokenModifiers": [ "declaration", "definition", "readonly", "static", "deprecated", "abstract", "async", "modification", "documentation", "defaultLibrary", ], "formats": ["relative"], "requests": {"range": True, "full": {"delta": True}}, "multilineTokenSupport": False, "overlappingTokenSupport": False, "serverCancelSupport": True, "augmentsSyntaxTokens": True, }, "typeHierarchy": {"dynamicRegistration": True}, "inlineValue": {"dynamicRegistration": True}, "diagnostic": {"dynamicRegistration": True, "relatedDocumentSupport": False}, }, "general": { "staleRequestSupport": { "cancel": True, "retryOnContentModified": [ "textDocument/semanticTokens/full", "textDocument/semanticTokens/range", "textDocument/semanticTokens/full/delta", ], }, "regularExpressions": {"engine": "ECMAScript", "version": "ES2020"}, "positionEncodings": ["utf-16"], }, "notebookDocument": {"synchronization": {"dynamicRegistration": True, "executionSummarySupport": True}}, }, "initializationOptions": { "bundles": ["intellicode-core.jar"], "settings": { "java": { "home": None, "jdt": { "ls": { "java": {"home": None}, "vmargs": "-XX:+UseParallelGC -XX:GCTimeRatio=4 -XX:AdaptiveSizePolicyWeight=90 -Dsun.zip.disableMemoryMapping=true -Xmx1G -Xms100m -Xlog:disable", "lombokSupport": {"enabled": True}, "protobufSupport": {"enabled": True}, "androidSupport": {"enabled": True}, } }, "errors": {"incompleteClasspath": {"severity": "error"}}, "configuration": { "checkProjectSettingsExclusions": False, "updateBuildConfiguration": "interactive", "maven": { "userSettings": None, "globalSettings": None, "notCoveredPluginExecutionSeverity": "warning", "defaultMojoExecutionAction": "ignore", }, "workspaceCacheLimit": 90, "runtimes": [ {"name": "JavaSE-21", "path": "static/vscode-java/extension/jre/21.0.7-linux-x86_64", "default": True} ], }, "trace": {"server": "verbose"}, "import": { "maven": { "enabled": True, "offline": {"enabled": False}, "disableTestClasspathFlag": False, }, "gradle": { "enabled": True, "wrapper": {"enabled": False}, "version": None, "home": "abs(static/gradle-7.3.3)", "java": {"home": "abs(static/launch_jres/21.0.7-linux-x86_64)"}, "offline": {"enabled": False}, "arguments": None, "jvmArguments": None, "user": {"home": None}, "annotationProcessing": {"enabled": True}, }, "exclusions": [ "**/node_modules/**", "**/.metadata/**", "**/archetype-resources/**", "**/META-INF/maven/**", ], "generatesMetadataFilesAtProjectRoot": False, }, "maven": {"downloadSources": True, "updateSnapshots": True}, "eclipse": {"downloadSources": True}, "signatureHelp": {"enabled": True, "description": {"enabled": True}}, "implementationsCodeLens": {"enabled": True}, "format": { "enabled": True, "settings": {"url": None, "profile": None}, "comments": {"enabled": True}, "onType": {"enabled": True}, "insertSpaces": True, "tabSize": 4, }, "saveActions": {"organizeImports": False}, "project": { "referencedLibraries": ["lib/**/*.jar"], "importOnFirstTimeStartup": "automatic", "importHint": True, "resourceFilters": ["node_modules", "\\.git"], "encoding": "ignore", "exportJar": {"targetPath": "${workspaceFolder}/${workspaceFolderBasename}.jar"}, }, "contentProvider": {"preferred": None}, "autobuild": {"enabled": True}, "maxConcurrentBuilds": 1, "selectionRange": {"enabled": True}, "showBuildStatusOnStart": {"enabled": "notification"}, "server": {"launchMode": "Standard"}, "sources": {"organizeImports": {"starThreshold": 99, "staticStarThreshold": 99}}, "imports": {"gradle": {"wrapper": {"checksums": []}}}, "templates": {"fileHeader": [], "typeComment": []}, "references": {"includeAccessors": True, "includeDecompiledSources": True}, "typeHierarchy": {"lazyLoad": False}, "settings": {"url": None}, "symbols": {"includeSourceMethodDeclarations": False}, "inlayHints": {"parameterNames": {"enabled": "literals", "exclusions": []}}, "codeAction": {"sortMembers": {"avoidVolatileChanges": True}}, "compile": { "nullAnalysis": { "nonnull": [ "javax.annotation.Nonnull", "org.eclipse.jdt.annotation.NonNull", "org.springframework.lang.NonNull", ], "nullable": [ "javax.annotation.Nullable", "org.eclipse.jdt.annotation.Nullable", "org.springframework.lang.Nullable", ], "mode": "automatic", } }, "sharedIndexes": {"enabled": "auto", "location": ""}, "silentNotification": False, "dependency": { "showMembers": False, "syncWithFolderExplorer": True, "autoRefresh": True, "refreshDelay": 2000, "packagePresentation": "flat", }, "help": {"firstView": "auto", "showReleaseNotes": True, "collectErrorLog": False}, "test": {"defaultConfig": "", "config": {}}, } }, }, "trace": "verbose", "processId": os.getpid(), "workspaceFolders": [ { "uri": repo_uri, "name": os.path.basename(repository_absolute_path), } ], } initialize_params["initializationOptions"]["workspaceFolders"] = [repo_uri] bundles = [self.runtime_dependency_paths.intellicode_jar_path] initialize_params["initializationOptions"]["bundles"] = bundles initialize_params["initializationOptions"]["settings"]["java"]["configuration"]["runtimes"] = [ {"name": "JavaSE-21", "path": self.runtime_dependency_paths.jre_home_path, "default": True} ] for runtime in initialize_params["initializationOptions"]["settings"]["java"]["configuration"]["runtimes"]: assert "name" in runtime assert "path" in runtime assert os.path.exists(runtime["path"]), f"Runtime required for eclipse_jdtls at path {runtime['path']} does not exist" gradle_settings = initialize_params["initializationOptions"]["settings"]["java"]["import"]["gradle"] gradle_settings["home"] = self.runtime_dependency_paths.gradle_path gradle_settings["java"]["home"] = self.runtime_dependency_paths.jre_path return initialize_params def _start_server(self): """ Starts the Eclipse JDTLS Language Server """ def register_capability_handler(params): assert "registrations" in params for registration in params["registrations"]: if registration["method"] == "textDocument/completion": assert registration["registerOptions"]["resolveProvider"] == True assert registration["registerOptions"]["triggerCharacters"] == [ ".", "@", "#", "*", " ", ] self.completions_available.set() if registration["method"] == "workspace/executeCommand": if "java.intellicode.enable" in registration["registerOptions"]["commands"]: self.intellicode_enable_command_available.set() return def lang_status_handler(params): # TODO: Should we wait for # server -> client: {'jsonrpc': '2.0', 'method': 'language/status', 'params': {'type': 'ProjectStatus', 'message': 'OK'}} # Before proceeding? if params["type"] == "ServiceReady" and params["message"] == "ServiceReady": self.service_ready_event.set() def execute_client_command_handler(params): assert params["command"] == "_java.reloadBundles.command" assert params["arguments"] == [] return [] def window_log_message(msg): self.logger.log(f"LSP: window/logMessage: {msg}", logging.INFO) def do_nothing(params): return self.server.on_request("client/registerCapability", register_capability_handler) self.server.on_notification("language/status", lang_status_handler) self.server.on_notification("window/logMessage", window_log_message) self.server.on_request("workspace/executeClientCommand", execute_client_command_handler) self.server.on_notification("$/progress", do_nothing) self.server.on_notification("textDocument/publishDiagnostics", do_nothing) self.server.on_notification("language/actionableNotification", do_nothing) self.logger.log("Starting EclipseJDTLS server process", logging.INFO) self.server.start() initialize_params = self._get_initialize_params(self.repository_root_path) self.logger.log( "Sending initialize request from LSP client to LSP server and awaiting response", logging.INFO, ) init_response = self.server.send.initialize(initialize_params) assert init_response["capabilities"]["textDocumentSync"]["change"] == 2 assert "completionProvider" not in init_response["capabilities"] assert "executeCommandProvider" not in init_response["capabilities"] self.server.notify.initialized({}) self.server.notify.workspace_did_change_configuration({"settings": initialize_params["initializationOptions"]["settings"]}) self.intellicode_enable_command_available.wait() java_intellisense_members_path = self.runtime_dependency_paths.intellisense_members_path assert os.path.exists(java_intellisense_members_path) intellicode_enable_result = self.server.send.execute_command( { "command": "java.intellicode.enable", "arguments": [True, java_intellisense_members_path], } ) assert intellicode_enable_result # TODO: Add comments about why we wait here, and how this can be optimized self.service_ready_event.wait() ``` -------------------------------------------------------------------------------- /src/serena/cli.py: -------------------------------------------------------------------------------- ```python import glob import json import os import shutil import subprocess import sys from logging import Logger from pathlib import Path from typing import Any, Literal import click from sensai.util import logging from sensai.util.logging import FileLoggerContext, datetime_tag from tqdm import tqdm from serena.agent import SerenaAgent from serena.config.context_mode import SerenaAgentContext, SerenaAgentMode from serena.config.serena_config import ProjectConfig, SerenaConfig, SerenaPaths from serena.constants import ( DEFAULT_CONTEXT, DEFAULT_MODES, PROMPT_TEMPLATES_DIR_IN_USER_HOME, PROMPT_TEMPLATES_DIR_INTERNAL, SERENA_LOG_FORMAT, SERENA_MANAGED_DIR_IN_HOME, SERENAS_OWN_CONTEXT_YAMLS_DIR, SERENAS_OWN_MODE_YAMLS_DIR, USER_CONTEXT_YAMLS_DIR, USER_MODE_YAMLS_DIR, ) from serena.mcp import SerenaMCPFactory, SerenaMCPFactorySingleProcess from serena.project import Project from serena.tools import FindReferencingSymbolsTool, FindSymbolTool, GetSymbolsOverviewTool, SearchForPatternTool, ToolRegistry from serena.util.logging import MemoryLogHandler from solidlsp.ls_config import Language from solidlsp.util.subprocess_util import subprocess_kwargs log = logging.getLogger(__name__) # --------------------- Utilities ------------------------------------- def _open_in_editor(path: str) -> None: """Open the given file in the system's default editor or viewer.""" editor = os.environ.get("EDITOR") run_kwargs = subprocess_kwargs() try: if editor: subprocess.run([editor, path], check=False, **run_kwargs) elif sys.platform.startswith("win"): try: os.startfile(path) except OSError: subprocess.run(["notepad.exe", path], check=False, **run_kwargs) elif sys.platform == "darwin": subprocess.run(["open", path], check=False, **run_kwargs) else: subprocess.run(["xdg-open", path], check=False, **run_kwargs) except Exception as e: print(f"Failed to open {path}: {e}") class ProjectType(click.ParamType): """ParamType allowing either a project name or a path to a project directory.""" name = "[PROJECT_NAME|PROJECT_PATH]" def convert(self, value: str, param: Any, ctx: Any) -> str: path = Path(value).resolve() if path.exists() and path.is_dir(): return str(path) return value PROJECT_TYPE = ProjectType() class AutoRegisteringGroup(click.Group): """ A click.Group subclass that automatically registers any click.Command attributes defined on the class into the group. After initialization, it inspects its own class for attributes that are instances of click.Command (typically created via @click.command) and calls self.add_command(cmd) on each. This lets you define your commands as static methods on the subclass for IDE-friendly organization without manual registration. """ def __init__(self, name: str, help: str): super().__init__(name=name, help=help) # Scan class attributes for click.Command instances and register them. for attr in dir(self.__class__): cmd = getattr(self.__class__, attr) if isinstance(cmd, click.Command): self.add_command(cmd) class TopLevelCommands(AutoRegisteringGroup): """Root CLI group containing the core Serena commands.""" def __init__(self) -> None: super().__init__(name="serena", help="Serena CLI commands. You can run `<command> --help` for more info on each command.") @staticmethod @click.command("start-mcp-server", help="Starts the Serena MCP server.") @click.option("--project", "project", type=PROJECT_TYPE, default=None, help="Path or name of project to activate at startup.") @click.option("--project-file", "project", type=PROJECT_TYPE, default=None, help="[DEPRECATED] Use --project instead.") @click.argument("project_file_arg", type=PROJECT_TYPE, required=False, default=None, metavar="") @click.option( "--context", type=str, default=DEFAULT_CONTEXT, show_default=True, help="Built-in context name or path to custom context YAML." ) @click.option( "--mode", "modes", type=str, multiple=True, default=DEFAULT_MODES, show_default=True, help="Built-in mode names or paths to custom mode YAMLs.", ) @click.option( "--transport", type=click.Choice(["stdio", "sse", "streamable-http"]), default="stdio", show_default=True, help="Transport protocol.", ) @click.option("--host", type=str, default="0.0.0.0", show_default=True) @click.option("--port", type=int, default=8000, show_default=True) @click.option("--enable-web-dashboard", type=bool, is_flag=False, default=None, help="Override dashboard setting in config.") @click.option("--enable-gui-log-window", type=bool, is_flag=False, default=None, help="Override GUI log window setting in config.") @click.option( "--log-level", type=click.Choice(["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]), default=None, help="Override log level in config.", ) @click.option("--trace-lsp-communication", type=bool, is_flag=False, default=None, help="Whether to trace LSP communication.") @click.option("--tool-timeout", type=float, default=None, help="Override tool execution timeout in config.") def start_mcp_server( project: str | None, project_file_arg: str | None, context: str, modes: tuple[str, ...], transport: Literal["stdio", "sse", "streamable-http"], host: str, port: int, enable_web_dashboard: bool | None, enable_gui_log_window: bool | None, log_level: Literal["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] | None, trace_lsp_communication: bool | None, tool_timeout: float | None, ) -> None: # initialize logging, using INFO level initially (will later be adjusted by SerenaAgent according to the config) # * memory log handler (for use by GUI/Dashboard) # * stream handler for stderr (for direct console output, which will also be captured by clients like Claude Desktop) # * file handler # (Note that stdout must never be used for logging, as it is used by the MCP server to communicate with the client.) Logger.root.setLevel(logging.INFO) formatter = logging.Formatter(SERENA_LOG_FORMAT) memory_log_handler = MemoryLogHandler() Logger.root.addHandler(memory_log_handler) stderr_handler = logging.StreamHandler(stream=sys.stderr) stderr_handler.formatter = formatter Logger.root.addHandler(stderr_handler) log_path = SerenaPaths().get_next_log_file_path("mcp") file_handler = logging.FileHandler(log_path, mode="w") file_handler.formatter = formatter Logger.root.addHandler(file_handler) log.info("Initializing Serena MCP server") log.info("Storing logs in %s", log_path) project_file = project_file_arg or project factory = SerenaMCPFactorySingleProcess(context=context, project=project_file, memory_log_handler=memory_log_handler) server = factory.create_mcp_server( host=host, port=port, modes=modes, enable_web_dashboard=enable_web_dashboard, enable_gui_log_window=enable_gui_log_window, log_level=log_level, trace_lsp_communication=trace_lsp_communication, tool_timeout=tool_timeout, ) if project_file_arg: log.warning( "Positional project arg is deprecated; use --project instead. Used: %s", project_file, ) log.info("Starting MCP server …") server.run(transport=transport) @staticmethod @click.command("print-system-prompt", help="Print the system prompt for a project.") @click.argument("project", type=click.Path(exists=True), default=os.getcwd(), required=False) @click.option( "--log-level", type=click.Choice(["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]), default="WARNING", help="Log level for prompt generation.", ) @click.option("--only-instructions", is_flag=True, help="Print only the initial instructions, without prefix/postfix.") @click.option( "--context", type=str, default=DEFAULT_CONTEXT, show_default=True, help="Built-in context name or path to custom context YAML." ) @click.option( "--mode", "modes", type=str, multiple=True, default=DEFAULT_MODES, show_default=True, help="Built-in mode names or paths to custom mode YAMLs.", ) def print_system_prompt(project: str, log_level: str, only_instructions: bool, context: str, modes: tuple[str, ...]) -> None: prefix = "You will receive access to Serena's symbolic tools. Below are instructions for using them, take them into account." postfix = "You begin by acknowledging that you understood the above instructions and are ready to receive tasks." from serena.tools.workflow_tools import InitialInstructionsTool lvl = logging.getLevelNamesMapping()[log_level.upper()] logging.configure(level=lvl) context_instance = SerenaAgentContext.load(context) mode_instances = [SerenaAgentMode.load(mode) for mode in modes] agent = SerenaAgent( project=os.path.abspath(project), serena_config=SerenaConfig(web_dashboard=False, log_level=lvl), context=context_instance, modes=mode_instances, ) tool = agent.get_tool(InitialInstructionsTool) instr = tool.apply() if only_instructions: print(instr) else: print(f"{prefix}\n{instr}\n{postfix}") class ModeCommands(AutoRegisteringGroup): """Group for 'mode' subcommands.""" def __init__(self) -> None: super().__init__(name="mode", help="Manage Serena modes. You can run `mode <command> --help` for more info on each command.") @staticmethod @click.command("list", help="List available modes.") def list() -> None: mode_names = SerenaAgentMode.list_registered_mode_names() max_len_name = max(len(name) for name in mode_names) if mode_names else 20 for name in mode_names: mode_yml_path = SerenaAgentMode.get_path(name) is_internal = Path(mode_yml_path).is_relative_to(SERENAS_OWN_MODE_YAMLS_DIR) descriptor = "(internal)" if is_internal else f"(at {mode_yml_path})" name_descr_string = f"{name:<{max_len_name + 4}}{descriptor}" click.echo(name_descr_string) @staticmethod @click.command("create", help="Create a new mode or copy an internal one.") @click.option( "--name", "-n", type=str, default=None, help="Name for the new mode. If --from-internal is passed may be left empty to create a mode of the same name, which will then override the internal mode.", ) @click.option("--from-internal", "from_internal", type=str, default=None, help="Copy from an internal mode.") def create(name: str, from_internal: str) -> None: if not (name or from_internal): raise click.UsageError("Provide at least one of --name or --from-internal.") mode_name = name or from_internal dest = os.path.join(USER_MODE_YAMLS_DIR, f"{mode_name}.yml") src = ( os.path.join(SERENAS_OWN_MODE_YAMLS_DIR, f"{from_internal}.yml") if from_internal else os.path.join(SERENAS_OWN_MODE_YAMLS_DIR, "mode.template.yml") ) if not os.path.exists(src): raise FileNotFoundError( f"Internal mode '{from_internal}' not found in {SERENAS_OWN_MODE_YAMLS_DIR}. Available modes: {SerenaAgentMode.list_registered_mode_names()}" ) os.makedirs(os.path.dirname(dest), exist_ok=True) shutil.copyfile(src, dest) click.echo(f"Created mode '{mode_name}' at {dest}") _open_in_editor(dest) @staticmethod @click.command("edit", help="Edit a custom mode YAML file.") @click.argument("mode_name") def edit(mode_name: str) -> None: path = os.path.join(USER_MODE_YAMLS_DIR, f"{mode_name}.yml") if not os.path.exists(path): if mode_name in SerenaAgentMode.list_registered_mode_names(include_user_modes=False): click.echo( f"Mode '{mode_name}' is an internal mode and cannot be edited directly. " f"Use 'mode create --from-internal {mode_name}' to create a custom mode that overrides it before editing." ) else: click.echo(f"Custom mode '{mode_name}' not found. Create it with: mode create --name {mode_name}.") return _open_in_editor(path) @staticmethod @click.command("delete", help="Delete a custom mode file.") @click.argument("mode_name") def delete(mode_name: str) -> None: path = os.path.join(USER_MODE_YAMLS_DIR, f"{mode_name}.yml") if not os.path.exists(path): click.echo(f"Custom mode '{mode_name}' not found.") return os.remove(path) click.echo(f"Deleted custom mode '{mode_name}'.") class ContextCommands(AutoRegisteringGroup): """Group for 'context' subcommands.""" def __init__(self) -> None: super().__init__( name="context", help="Manage Serena contexts. You can run `context <command> --help` for more info on each command." ) @staticmethod @click.command("list", help="List available contexts.") def list() -> None: context_names = SerenaAgentContext.list_registered_context_names() max_len_name = max(len(name) for name in context_names) if context_names else 20 for name in context_names: context_yml_path = SerenaAgentContext.get_path(name) is_internal = Path(context_yml_path).is_relative_to(SERENAS_OWN_CONTEXT_YAMLS_DIR) descriptor = "(internal)" if is_internal else f"(at {context_yml_path})" name_descr_string = f"{name:<{max_len_name + 4}}{descriptor}" click.echo(name_descr_string) @staticmethod @click.command("create", help="Create a new context or copy an internal one.") @click.option( "--name", "-n", type=str, default=None, help="Name for the new context. If --from-internal is passed may be left empty to create a context of the same name, which will then override the internal context", ) @click.option("--from-internal", "from_internal", type=str, default=None, help="Copy from an internal context.") def create(name: str, from_internal: str) -> None: if not (name or from_internal): raise click.UsageError("Provide at least one of --name or --from-internal.") ctx_name = name or from_internal dest = os.path.join(USER_CONTEXT_YAMLS_DIR, f"{ctx_name}.yml") src = ( os.path.join(SERENAS_OWN_CONTEXT_YAMLS_DIR, f"{from_internal}.yml") if from_internal else os.path.join(SERENAS_OWN_CONTEXT_YAMLS_DIR, "context.template.yml") ) if not os.path.exists(src): raise FileNotFoundError( f"Internal context '{from_internal}' not found in {SERENAS_OWN_CONTEXT_YAMLS_DIR}. Available contexts: {SerenaAgentContext.list_registered_context_names()}" ) os.makedirs(os.path.dirname(dest), exist_ok=True) shutil.copyfile(src, dest) click.echo(f"Created context '{ctx_name}' at {dest}") _open_in_editor(dest) @staticmethod @click.command("edit", help="Edit a custom context YAML file.") @click.argument("context_name") def edit(context_name: str) -> None: path = os.path.join(USER_CONTEXT_YAMLS_DIR, f"{context_name}.yml") if not os.path.exists(path): if context_name in SerenaAgentContext.list_registered_context_names(include_user_contexts=False): click.echo( f"Context '{context_name}' is an internal context and cannot be edited directly. " f"Use 'context create --from-internal {context_name}' to create a custom context that overrides it before editing." ) else: click.echo(f"Custom context '{context_name}' not found. Create it with: context create --name {context_name}.") return _open_in_editor(path) @staticmethod @click.command("delete", help="Delete a custom context file.") @click.argument("context_name") def delete(context_name: str) -> None: path = os.path.join(USER_CONTEXT_YAMLS_DIR, f"{context_name}.yml") if not os.path.exists(path): click.echo(f"Custom context '{context_name}' not found.") return os.remove(path) click.echo(f"Deleted custom context '{context_name}'.") class SerenaConfigCommands(AutoRegisteringGroup): """Group for 'config' subcommands.""" def __init__(self) -> None: super().__init__(name="config", help="Manage Serena configuration.") @staticmethod @click.command( "edit", help="Edit serena_config.yml in your default editor. Will create a config file from the template if no config is found." ) def edit() -> None: config_path = os.path.join(SERENA_MANAGED_DIR_IN_HOME, "serena_config.yml") if not os.path.exists(config_path): SerenaConfig.generate_config_file(config_path) _open_in_editor(config_path) class ProjectCommands(AutoRegisteringGroup): """Group for 'project' subcommands.""" def __init__(self) -> None: super().__init__( name="project", help="Manage Serena projects. You can run `project <command> --help` for more info on each command." ) @staticmethod @click.command("generate-yml", help="Generate a project.yml file.") @click.argument("project_path", type=click.Path(exists=True, file_okay=False), default=os.getcwd()) @click.option("--language", type=str, default=None, help="Programming language; inferred if not specified.") def generate_yml(project_path: str, language: str | None = None) -> None: yml_path = os.path.join(project_path, ProjectConfig.rel_path_to_project_yml()) if os.path.exists(yml_path): raise FileExistsError(f"Project file {yml_path} already exists.") lang_inst = None if language: try: lang_inst = Language[language.upper()] except KeyError: all_langs = [l.name.lower() for l in Language.iter_all(include_experimental=True)] raise ValueError(f"Unknown language '{language}'. Supported: {all_langs}") generated_conf = ProjectConfig.autogenerate(project_root=project_path, project_language=lang_inst) print(f"Generated project.yml with language {generated_conf.language.value} at {yml_path}.") @staticmethod @click.command("index", help="Index a project by saving symbols to the LSP cache.") @click.argument("project", type=click.Path(exists=True), default=os.getcwd(), required=False) @click.option( "--log-level", type=click.Choice(["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]), default="WARNING", help="Log level for indexing.", ) @click.option("--timeout", type=float, default=10, help="Timeout for indexing a single file.") def index(project: str, log_level: str, timeout: float) -> None: ProjectCommands._index_project(project, log_level, timeout=timeout) @staticmethod @click.command("index-deprecated", help="Deprecated alias for 'serena project index'.") @click.argument("project", type=click.Path(exists=True), default=os.getcwd(), required=False) @click.option("--log-level", type=click.Choice(["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]), default="WARNING") @click.option("--timeout", type=float, default=10, help="Timeout for indexing a single file.") def index_deprecated(project: str, log_level: str, timeout: float) -> None: click.echo("Deprecated! Use `serena project index` instead.") ProjectCommands._index_project(project, log_level, timeout=timeout) @staticmethod def _index_project(project: str, log_level: str, timeout: float) -> None: lvl = logging.getLevelNamesMapping()[log_level.upper()] logging.configure(level=lvl) serena_config = SerenaConfig.from_config_file() proj = Project.load(os.path.abspath(project)) click.echo(f"Indexing symbols in project {project}…") ls = proj.create_language_server(log_level=lvl, ls_timeout=timeout, ls_specific_settings=serena_config.ls_specific_settings) log_file = os.path.join(project, ".serena", "logs", "indexing.txt") collected_exceptions: list[Exception] = [] files_failed = [] with ls.start_server(): files = proj.gather_source_files() for i, f in enumerate(tqdm(files, desc="Indexing")): try: ls.request_document_symbols(f, include_body=False) ls.request_document_symbols(f, include_body=True) except Exception as e: log.error(f"Failed to index {f}, continuing.") collected_exceptions.append(e) files_failed.append(f) if (i + 1) % 10 == 0: ls.save_cache() ls.save_cache() click.echo(f"Symbols saved to {ls.cache_path}") if len(files_failed) > 0: os.makedirs(os.path.dirname(log_file), exist_ok=True) with open(log_file, "w") as f: for file, exception in zip(files_failed, collected_exceptions, strict=True): f.write(f"{file}\n") f.write(f"{exception}\n") click.echo(f"Failed to index {len(files_failed)} files, see:\n{log_file}") @staticmethod @click.command("is_ignored_path", help="Check if a path is ignored by the project configuration.") @click.argument("path", type=click.Path(exists=False, file_okay=True, dir_okay=True)) @click.argument("project", type=click.Path(exists=True, file_okay=False, dir_okay=True), default=os.getcwd()) def is_ignored_path(path: str, project: str) -> None: """ Check if a given path is ignored by the project configuration. :param path: The path to check. :param project: The path to the project directory, defaults to the current working directory. """ proj = Project.load(os.path.abspath(project)) if os.path.isabs(path): path = os.path.relpath(path, start=proj.project_root) is_ignored = proj.is_ignored_path(path) click.echo(f"Path '{path}' IS {'ignored' if is_ignored else 'IS NOT ignored'} by the project configuration.") @staticmethod @click.command("index-file", help="Index a single file by saving its symbols to the LSP cache.") @click.argument("file", type=click.Path(exists=True, file_okay=True, dir_okay=False)) @click.argument("project", type=click.Path(exists=True, file_okay=False, dir_okay=True), default=os.getcwd()) @click.option("--verbose", "-v", is_flag=True, help="Print detailed information about the indexed symbols.") def index_file(file: str, project: str, verbose: bool) -> None: """ Index a single file by saving its symbols to the LSP cache, useful for debugging. :param file: path to the file to index, must be inside the project directory. :param project: path to the project directory, defaults to the current working directory. :param verbose: if set, prints detailed information about the indexed symbols. """ proj = Project.load(os.path.abspath(project)) if os.path.isabs(file): file = os.path.relpath(file, start=proj.project_root) if proj.is_ignored_path(file, ignore_non_source_files=True): click.echo(f"'{file}' is ignored or declared as non-code file by the project configuration, won't index.") exit(1) ls = proj.create_language_server() with ls.start_server(): symbols, _ = ls.request_document_symbols(file, include_body=False) ls.request_document_symbols(file, include_body=True) if verbose: click.echo(f"Symbols in file '{file}':") for symbol in symbols: click.echo(f" - {symbol['name']} at line {symbol['selectionRange']['start']['line']} of kind {symbol['kind']}") ls.save_cache() click.echo(f"Successfully indexed file '{file}', {len(symbols)} symbols saved to {ls.cache_path}.") @staticmethod @click.command("health-check", help="Perform a comprehensive health check of the project's tools and language server.") @click.argument("project", type=click.Path(exists=True, file_okay=False, dir_okay=True), default=os.getcwd()) def health_check(project: str) -> None: """ Perform a comprehensive health check of the project's tools and language server. :param project: path to the project directory, defaults to the current working directory. """ # NOTE: completely written by Claude Code, only functionality was reviewed, not implementation logging.configure(level=logging.INFO) project_path = os.path.abspath(project) proj = Project.load(project_path) # Create log file with timestamp timestamp = datetime_tag() log_dir = os.path.join(project_path, ".serena", "logs", "health-checks") os.makedirs(log_dir, exist_ok=True) log_file = os.path.join(log_dir, f"health_check_{timestamp}.log") with FileLoggerContext(log_file, append=False, enabled=True): log.info("Starting health check for project: %s", project_path) try: # Create SerenaAgent with dashboard disabled log.info("Creating SerenaAgent with disabled dashboard...") config = SerenaConfig(gui_log_window_enabled=False, web_dashboard=False) agent = SerenaAgent(project=project_path, serena_config=config) log.info("SerenaAgent created successfully") # Find first non-empty file that can be analyzed log.info("Searching for analyzable files...") files = proj.gather_source_files() target_file = None for file_path in files: try: full_path = os.path.join(project_path, file_path) if os.path.getsize(full_path) > 0: target_file = file_path log.info("Found analyzable file: %s", target_file) break except (OSError, FileNotFoundError): continue if not target_file: log.error("No analyzable files found in project") click.echo("❌ Health check failed: No analyzable files found") click.echo(f"Log saved to: {log_file}") return # Get tools from agent overview_tool = agent.get_tool(GetSymbolsOverviewTool) find_symbol_tool = agent.get_tool(FindSymbolTool) find_refs_tool = agent.get_tool(FindReferencingSymbolsTool) search_pattern_tool = agent.get_tool(SearchForPatternTool) # Test 1: Get symbols overview log.info("Testing GetSymbolsOverviewTool on file: %s", target_file) overview_result = agent.execute_task(lambda: overview_tool.apply(target_file)) overview_data = json.loads(overview_result) log.info("GetSymbolsOverviewTool returned %d symbols", len(overview_data)) if not overview_data: log.error("No symbols found in file %s", target_file) click.echo("❌ Health check failed: No symbols found in target file") click.echo(f"Log saved to: {log_file}") return # Extract suitable symbol (prefer class or function over variables) # LSP symbol kinds: 5=class, 12=function, 6=method, 9=constructor preferred_kinds = [5, 12, 6, 9] # class, function, method, constructor selected_symbol = None for symbol in overview_data: if symbol.get("kind") in preferred_kinds: selected_symbol = symbol break # If no preferred symbol found, use first available if not selected_symbol: selected_symbol = overview_data[0] log.info("No class or function found, using first available symbol") symbol_name = selected_symbol.get("name_path", "unknown") symbol_kind = selected_symbol.get("kind", "unknown") log.info("Using symbol for testing: %s (kind: %d)", symbol_name, symbol_kind) # Test 2: FindSymbolTool log.info("Testing FindSymbolTool for symbol: %s", symbol_name) find_symbol_result = agent.execute_task( lambda: find_symbol_tool.apply(symbol_name, relative_path=target_file, include_body=True) ) find_symbol_data = json.loads(find_symbol_result) log.info("FindSymbolTool found %d matches for symbol %s", len(find_symbol_data), symbol_name) # Test 3: FindReferencingSymbolsTool log.info("Testing FindReferencingSymbolsTool for symbol: %s", symbol_name) try: find_refs_result = agent.execute_task(lambda: find_refs_tool.apply(symbol_name, relative_path=target_file)) find_refs_data = json.loads(find_refs_result) log.info("FindReferencingSymbolsTool found %d references for symbol %s", len(find_refs_data), symbol_name) except Exception as e: log.warning("FindReferencingSymbolsTool failed for symbol %s: %s", symbol_name, str(e)) find_refs_data = [] # Test 4: SearchForPatternTool to verify references log.info("Testing SearchForPatternTool for pattern: %s", symbol_name) try: search_result = agent.execute_task( lambda: search_pattern_tool.apply(substring_pattern=symbol_name, restrict_search_to_code_files=True) ) search_data = json.loads(search_result) pattern_matches = sum(len(matches) for matches in search_data.values()) log.info("SearchForPatternTool found %d pattern matches for %s", pattern_matches, symbol_name) except Exception as e: log.warning("SearchForPatternTool failed for pattern %s: %s", symbol_name, str(e)) pattern_matches = 0 # Verify tools worked as expected tools_working = True if not find_symbol_data: log.error("FindSymbolTool returned no results") tools_working = False if len(find_refs_data) == 0 and pattern_matches == 0: log.warning("Both FindReferencingSymbolsTool and SearchForPatternTool found no matches - this might indicate an issue") log.info("Health check completed successfully") if tools_working: click.echo("✅ Health check passed - All tools working correctly") else: click.echo("⚠️ Health check completed with warnings - Check log for details") except Exception as e: log.exception("Health check failed with exception: %s", str(e)) click.echo(f"❌ Health check failed: {e!s}") finally: click.echo(f"Log saved to: {log_file}") class ToolCommands(AutoRegisteringGroup): """Group for 'tool' subcommands.""" def __init__(self) -> None: super().__init__( name="tools", help="Commands related to Serena's tools. You can run `serena tools <command> --help` for more info on each command.", ) @staticmethod @click.command( "list", help="Prints an overview of the tools that are active by default (not just the active ones for your project). For viewing all tools, pass `--all / -a`", ) @click.option("--quiet", "-q", is_flag=True) @click.option("--all", "-a", "include_optional", is_flag=True, help="List all tools, including those not enabled by default.") @click.option("--only-optional", is_flag=True, help="List only optional tools (those not enabled by default).") def list(quiet: bool = False, include_optional: bool = False, only_optional: bool = False) -> None: tool_registry = ToolRegistry() if quiet: if only_optional: tool_names = tool_registry.get_tool_names_optional() elif include_optional: tool_names = tool_registry.get_tool_names() else: tool_names = tool_registry.get_tool_names_default_enabled() for tool_name in tool_names: click.echo(tool_name) else: ToolRegistry().print_tool_overview(include_optional=include_optional, only_optional=only_optional) @staticmethod @click.command( "description", help="Print the description of a tool, optionally with a specific context (the latter may modify the default description).", ) @click.argument("tool_name", type=str) @click.option("--context", type=str, default=None, help="Context name or path to context file.") def description(tool_name: str, context: str | None = None) -> None: # Load the context serena_context = None if context: serena_context = SerenaAgentContext.load(context) agent = SerenaAgent( project=None, serena_config=SerenaConfig(web_dashboard=False, log_level=logging.INFO), context=serena_context, ) tool = agent.get_tool_by_name(tool_name) mcp_tool = SerenaMCPFactory.make_mcp_tool(tool) click.echo(mcp_tool.description) class PromptCommands(AutoRegisteringGroup): def __init__(self) -> None: super().__init__(name="prompts", help="Commands related to Serena's prompts that are outside of contexts and modes.") @staticmethod def _get_user_prompt_yaml_path(prompt_yaml_name: str) -> str: os.makedirs(PROMPT_TEMPLATES_DIR_IN_USER_HOME, exist_ok=True) return os.path.join(PROMPT_TEMPLATES_DIR_IN_USER_HOME, prompt_yaml_name) @staticmethod @click.command("list", help="Lists yamls that are used for defining prompts.") def list() -> None: serena_prompt_yaml_names = [os.path.basename(f) for f in glob.glob(PROMPT_TEMPLATES_DIR_INTERNAL + "/*.yml")] for prompt_yaml_name in serena_prompt_yaml_names: user_prompt_yaml_path = PromptCommands._get_user_prompt_yaml_path(prompt_yaml_name) if os.path.exists(user_prompt_yaml_path): click.echo(f"{user_prompt_yaml_path} merged with default prompts in {prompt_yaml_name}") else: click.echo(prompt_yaml_name) @staticmethod @click.command("create-override", help="Create an override of an internal prompts yaml for customizing Serena's prompts") @click.argument("prompt_yaml_name") def create_override(prompt_yaml_name: str) -> None: """ :param prompt_yaml_name: The yaml name of the prompt you want to override. Call the `list` command for discovering valid prompt yaml names. :return: """ # for convenience, we can pass names without .yml if not prompt_yaml_name.endswith(".yml"): prompt_yaml_name = prompt_yaml_name + ".yml" user_prompt_yaml_path = PromptCommands._get_user_prompt_yaml_path(prompt_yaml_name) if os.path.exists(user_prompt_yaml_path): raise FileExistsError(f"{user_prompt_yaml_path} already exists.") serena_prompt_yaml_path = os.path.join(PROMPT_TEMPLATES_DIR_INTERNAL, prompt_yaml_name) shutil.copyfile(serena_prompt_yaml_path, user_prompt_yaml_path) _open_in_editor(user_prompt_yaml_path) @staticmethod @click.command("edit-override", help="Edit an existing prompt override file") @click.argument("prompt_yaml_name") def edit_override(prompt_yaml_name: str) -> None: """ :param prompt_yaml_name: The yaml name of the prompt override to edit. :return: """ # for convenience, we can pass names without .yml if not prompt_yaml_name.endswith(".yml"): prompt_yaml_name = prompt_yaml_name + ".yml" user_prompt_yaml_path = PromptCommands._get_user_prompt_yaml_path(prompt_yaml_name) if not os.path.exists(user_prompt_yaml_path): click.echo(f"Override file '{prompt_yaml_name}' not found. Create it with: prompts create-override {prompt_yaml_name}") return _open_in_editor(user_prompt_yaml_path) @staticmethod @click.command("list-overrides", help="List existing prompt override files") def list_overrides() -> None: os.makedirs(PROMPT_TEMPLATES_DIR_IN_USER_HOME, exist_ok=True) serena_prompt_yaml_names = [os.path.basename(f) for f in glob.glob(PROMPT_TEMPLATES_DIR_INTERNAL + "/*.yml")] override_files = glob.glob(os.path.join(PROMPT_TEMPLATES_DIR_IN_USER_HOME, "*.yml")) for file_path in override_files: if os.path.basename(file_path) in serena_prompt_yaml_names: click.echo(file_path) @staticmethod @click.command("delete-override", help="Delete a prompt override file") @click.argument("prompt_yaml_name") def delete_override(prompt_yaml_name: str) -> None: """ :param prompt_yaml_name: The yaml name of the prompt override to delete." :return: """ # for convenience, we can pass names without .yml if not prompt_yaml_name.endswith(".yml"): prompt_yaml_name = prompt_yaml_name + ".yml" user_prompt_yaml_path = PromptCommands._get_user_prompt_yaml_path(prompt_yaml_name) if not os.path.exists(user_prompt_yaml_path): click.echo(f"Override file '{prompt_yaml_name}' not found.") return os.remove(user_prompt_yaml_path) click.echo(f"Deleted override file '{prompt_yaml_name}'.") # Expose groups so we can reference them in pyproject.toml mode = ModeCommands() context = ContextCommands() project = ProjectCommands() config = SerenaConfigCommands() tools = ToolCommands() prompts = PromptCommands() # Expose toplevel commands for the same reason top_level = TopLevelCommands() start_mcp_server = top_level.start_mcp_server index_project = project.index_deprecated # needed for the help script to work - register all subcommands to the top-level group for subgroup in (mode, context, project, config, tools, prompts): top_level.add_command(subgroup) def get_help() -> str: """Retrieve the help text for the top-level Serena CLI.""" return top_level.get_help(click.Context(top_level, info_name="serena")) ``` -------------------------------------------------------------------------------- /src/solidlsp/language_servers/al_language_server.py: -------------------------------------------------------------------------------- ```python """AL Language Server implementation for Microsoft Dynamics 365 Business Central.""" import logging import os import pathlib import platform import stat import time import zipfile from pathlib import Path import requests from overrides import override from solidlsp.language_servers.common import quote_windows_path from solidlsp.ls import SolidLanguageServer from solidlsp.ls_config import LanguageServerConfig from solidlsp.ls_logger import LanguageServerLogger from solidlsp.lsp_protocol_handler.lsp_types import Definition, DefinitionParams, LocationLink from solidlsp.lsp_protocol_handler.server import ProcessLaunchInfo from solidlsp.settings import SolidLSPSettings class ALLanguageServer(SolidLanguageServer): """ Language server implementation for AL (Microsoft Dynamics 365 Business Central). This implementation uses the AL Language Server from the VS Code AL extension (ms-dynamics-smb.al). The extension must be installed or available locally. Key Features: - Automatic download of AL extension from VS Code marketplace if not present - Platform-specific executable detection (Windows/Linux/macOS) - Special initialization sequence required by AL Language Server - Custom AL-specific LSP commands (al/gotodefinition, al/setActiveWorkspace) - File opening requirement before symbol retrieval """ def __init__( self, config: LanguageServerConfig, logger: LanguageServerLogger, repository_root_path: str, solidlsp_settings: SolidLSPSettings ): """ Initialize the AL Language Server. Args: config: Language server configuration logger: Logger instance for debugging repository_root_path: Root path of the AL project (must contain app.json) solidlsp_settings: Solid LSP settings Note: The initialization process will automatically: 1. Check for AL extension in the resources directory 2. Download it from VS Code marketplace if not found 3. Extract and configure the platform-specific executable """ # Setup runtime dependencies and get the language server command # This will download the AL extension if needed cmd = self._setup_runtime_dependencies(logger, config, solidlsp_settings) self._project_load_check_supported: bool = True """Whether the AL server supports the project load status check request. Some AL server versions don't support the 'al/hasProjectClosureLoadedRequest' custom LSP request. This flag starts as True and is set to False if the request fails, preventing repeated unsuccessful attempts. """ super().__init__( config, logger, repository_root_path, ProcessLaunchInfo(cmd=cmd, cwd=repository_root_path), "al", # Language ID for LSP solidlsp_settings, ) @classmethod def _download_al_extension(cls, logger: LanguageServerLogger, url: str, target_dir: str) -> bool: """ Download and extract the AL extension from VS Code marketplace. The VS Code marketplace packages extensions as .vsix files (which are ZIP archives). This method downloads the VSIX file and extracts it to get the language server binaries. Args: logger: Logger for tracking download progress url: VS Code marketplace URL for the AL extension target_dir: Directory where the extension will be extracted Returns: True if successful, False otherwise Note: The download includes progress tracking and proper user-agent headers to ensure compatibility with the VS Code marketplace. """ try: logger.log(f"Downloading AL extension from {url}", logging.INFO) # Create target directory for the extension os.makedirs(target_dir, exist_ok=True) # Download with proper headers to mimic VS Code marketplace client # These headers are required for the marketplace to serve the VSIX file headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36", "Accept": "application/octet-stream, application/vsix, */*", } response = requests.get(url, headers=headers, stream=True, timeout=300) response.raise_for_status() # Save to temporary VSIX file (will be deleted after extraction) temp_file = os.path.join(target_dir, "al_extension_temp.vsix") total_size = int(response.headers.get("content-length", 0)) logger.log(f"Downloading {total_size / 1024 / 1024:.1f} MB...", logging.INFO) with open(temp_file, "wb") as f: downloaded = 0 for chunk in response.iter_content(chunk_size=8192): if chunk: f.write(chunk) downloaded += len(chunk) if total_size > 0 and downloaded % (10 * 1024 * 1024) == 0: # Log progress every 10MB progress = (downloaded / total_size) * 100 logger.log(f"Download progress: {progress:.1f}%", logging.INFO) logger.log("Download complete, extracting...", logging.INFO) # Extract VSIX file (VSIX files are just ZIP archives with a different extension) # This will extract the extension folder containing the language server binaries with zipfile.ZipFile(temp_file, "r") as zip_ref: zip_ref.extractall(target_dir) # Clean up temp file os.remove(temp_file) logger.log("AL extension extracted successfully", logging.INFO) return True except Exception as e: logger.log(f"Error downloading/extracting AL extension: {e}", logging.ERROR) return False @classmethod def _setup_runtime_dependencies( cls, logger: LanguageServerLogger, config: LanguageServerConfig, solidlsp_settings: SolidLSPSettings ) -> str: """ Setup runtime dependencies for AL Language Server and return the command to start the server. This method handles the complete setup process: 1. Checks for existing AL extension installations 2. Downloads from VS Code marketplace if not found 3. Configures executable permissions on Unix systems 4. Returns the properly formatted command string The AL Language Server executable is located in different paths based on the platform: - Windows: bin/win32/Microsoft.Dynamics.Nav.EditorServices.Host.exe - Linux: bin/linux/Microsoft.Dynamics.Nav.EditorServices.Host - macOS: bin/darwin/Microsoft.Dynamics.Nav.EditorServices.Host """ system = platform.system() # Find existing extension or download if needed extension_path = cls._find_al_extension(logger, solidlsp_settings) if extension_path is None: logger.log("AL extension not found on disk, attempting to download...", logging.INFO) extension_path = cls._download_and_install_al_extension(logger, solidlsp_settings) if extension_path is None: raise RuntimeError( "Failed to locate or download AL Language Server. Please either:\n" "1. Set AL_EXTENSION_PATH environment variable to the AL extension directory\n" "2. Install the AL extension in VS Code (ms-dynamics-smb.al)\n" "3. Ensure internet connection for automatic download" ) # Build executable path based on platform executable_path = cls._get_executable_path(extension_path, system) if not os.path.exists(executable_path): raise RuntimeError(f"AL Language Server executable not found at: {executable_path}") # Prepare and return the executable command return cls._prepare_executable(executable_path, system, logger) @classmethod def _find_al_extension(cls, logger: LanguageServerLogger, solidlsp_settings: SolidLSPSettings) -> str | None: """ Find AL extension in various locations. Search order: 1. Environment variable (AL_EXTENSION_PATH) 2. Default download location (~/.serena/ls_resources/al-extension) 3. VS Code installed extensions Returns: Path to AL extension directory or None if not found """ # Check environment variable env_path = os.environ.get("AL_EXTENSION_PATH") if env_path and os.path.exists(env_path): logger.log(f"Found AL extension via AL_EXTENSION_PATH: {env_path}", logging.DEBUG) return env_path elif env_path: logger.log(f"AL_EXTENSION_PATH set but directory not found: {env_path}", logging.WARNING) # Check default download location default_path = os.path.join(cls.ls_resources_dir(solidlsp_settings), "al-extension", "extension") if os.path.exists(default_path): logger.log(f"Found AL extension in default location: {default_path}", logging.DEBUG) return default_path # Search VS Code extensions vscode_path = cls._find_al_extension_in_vscode(logger) if vscode_path: logger.log(f"Found AL extension in VS Code: {vscode_path}", logging.DEBUG) return vscode_path logger.log("AL extension not found in any known location", logging.DEBUG) return None @classmethod def _download_and_install_al_extension(cls, logger: LanguageServerLogger, solidlsp_settings: SolidLSPSettings) -> str | None: """ Download and install AL extension from VS Code marketplace. Returns: Path to installed extension or None if download failed """ al_extension_dir = os.path.join(cls.ls_resources_dir(solidlsp_settings), "al-extension") # AL extension version - using latest stable version AL_VERSION = "latest" url = f"https://marketplace.visualstudio.com/_apis/public/gallery/publishers/ms-dynamics-smb/vsextensions/al/{AL_VERSION}/vspackage" logger.log(f"Downloading AL extension from: {url}", logging.INFO) if cls._download_al_extension(logger, url, al_extension_dir): extension_path = os.path.join(al_extension_dir, "extension") if os.path.exists(extension_path): logger.log("AL extension downloaded and installed successfully", logging.INFO) return extension_path else: logger.log(f"Download completed but extension not found at: {extension_path}", logging.ERROR) else: logger.log("Failed to download AL extension from marketplace", logging.ERROR) return None @classmethod def _get_executable_path(cls, extension_path: str, system: str) -> str: """ Build platform-specific executable path. Args: extension_path: Path to AL extension directory system: Operating system name Returns: Full path to executable """ if system == "Windows": return os.path.join(extension_path, "bin", "win32", "Microsoft.Dynamics.Nav.EditorServices.Host.exe") elif system == "Linux": return os.path.join(extension_path, "bin", "linux", "Microsoft.Dynamics.Nav.EditorServices.Host") elif system == "Darwin": return os.path.join(extension_path, "bin", "darwin", "Microsoft.Dynamics.Nav.EditorServices.Host") else: raise RuntimeError(f"Unsupported platform: {system}") @classmethod def _prepare_executable(cls, executable_path: str, system: str, logger: LanguageServerLogger) -> str: """ Prepare the executable by setting permissions and handling path quoting. Args: executable_path: Path to the executable system: Operating system name logger: Logger instance Returns: Properly formatted command string """ # Make sure executable has proper permissions on Unix-like systems if system in ["Linux", "Darwin"]: st = os.stat(executable_path) os.chmod(executable_path, st.st_mode | stat.S_IEXEC) logger.log(f"Set execute permission on: {executable_path}", logging.DEBUG) logger.log(f"Using AL Language Server executable: {executable_path}", logging.INFO) # The AL Language Server uses stdio for LSP communication by default # Use the utility function to handle Windows path quoting return quote_windows_path(executable_path) @classmethod def _get_language_server_command_fallback(cls, logger: LanguageServerLogger) -> str: """ Get the command to start the AL language server. Returns: Command string to launch the AL language server Raises: RuntimeError: If AL extension cannot be found """ # Check if AL extension path is configured via environment variable al_extension_path = os.environ.get("AL_EXTENSION_PATH") if not al_extension_path: # Try to find the extension in the current working directory # (for development/testing when extension is in the serena repo) cwd_path = Path.cwd() potential_extension = None # Look for ms-dynamics-smb.al-* directories for item in cwd_path.iterdir(): if item.is_dir() and item.name.startswith("ms-dynamics-smb.al-"): potential_extension = item break if potential_extension: al_extension_path = str(potential_extension) logger.log(f"Found AL extension in current directory: {al_extension_path}", logging.DEBUG) else: # Try to find in common VS Code extension locations al_extension_path = cls._find_al_extension_in_vscode(logger) if not al_extension_path: raise RuntimeError( "AL Language Server not found. Please either:\n" "1. Set AL_EXTENSION_PATH environment variable to the VS Code AL extension directory\n" "2. Install the AL extension in VS Code (ms-dynamics-smb.al)\n" "3. Place the extension directory in the current working directory" ) # Determine platform-specific executable system = platform.system() if system == "Windows": executable = os.path.join(al_extension_path, "bin", "win32", "Microsoft.Dynamics.Nav.EditorServices.Host.exe") elif system == "Linux": executable = os.path.join(al_extension_path, "bin", "linux", "Microsoft.Dynamics.Nav.EditorServices.Host") elif system == "Darwin": executable = os.path.join(al_extension_path, "bin", "darwin", "Microsoft.Dynamics.Nav.EditorServices.Host") else: raise RuntimeError(f"Unsupported platform: {system}") # Verify executable exists if not os.path.exists(executable): raise RuntimeError( f"AL Language Server executable not found at: {executable}\nPlease ensure the AL extension is properly installed." ) # Make sure executable has proper permissions on Unix-like systems if system in ["Linux", "Darwin"]: st = os.stat(executable) os.chmod(executable, st.st_mode | stat.S_IEXEC) logger.log(f"Using AL Language Server executable: {executable}", logging.INFO) # The AL Language Server uses stdio for LSP communication (no --stdio flag needed) # Use the utility function to handle Windows path quoting return quote_windows_path(executable) @classmethod def _find_al_extension_in_vscode(cls, logger: LanguageServerLogger) -> str | None: """ Try to find AL extension in common VS Code extension locations. Returns: Path to AL extension directory or None if not found """ home = Path.home() possible_paths = [] # Common VS Code extension paths if platform.system() == "Windows": possible_paths.extend( [ home / ".vscode" / "extensions", home / ".vscode-insiders" / "extensions", Path(os.environ.get("APPDATA", "")) / "Code" / "User" / "extensions", Path(os.environ.get("APPDATA", "")) / "Code - Insiders" / "User" / "extensions", ] ) else: possible_paths.extend( [ home / ".vscode" / "extensions", home / ".vscode-server" / "extensions", home / ".vscode-insiders" / "extensions", ] ) for base_path in possible_paths: if base_path.exists(): logger.log(f"Searching for AL extension in: {base_path}", logging.DEBUG) # Look for AL extension directories for item in base_path.iterdir(): if item.is_dir() and item.name.startswith("ms-dynamics-smb.al-"): logger.log(f"Found AL extension at: {item}", logging.DEBUG) return str(item) return None @staticmethod def _get_initialize_params(repository_absolute_path: str) -> dict: """ Returns the initialize params for the AL Language Server. """ # Ensure we have an absolute path for URI generation repository_path = pathlib.Path(repository_absolute_path).resolve() root_uri = repository_path.as_uri() # AL requires extensive capabilities based on VS Code trace initialize_params = { "processId": os.getpid(), "rootPath": str(repository_path), "rootUri": root_uri, "capabilities": { "workspace": { "applyEdit": True, "workspaceEdit": { "documentChanges": True, "resourceOperations": ["create", "rename", "delete"], "failureHandling": "textOnlyTransactional", "normalizesLineEndings": True, }, "configuration": True, "didChangeWatchedFiles": {"dynamicRegistration": True}, "symbol": {"dynamicRegistration": True, "symbolKind": {"valueSet": list(range(1, 27))}}, "executeCommand": {"dynamicRegistration": True}, "didChangeConfiguration": {"dynamicRegistration": True}, "workspaceFolders": True, }, "textDocument": { "synchronization": {"dynamicRegistration": True, "willSave": True, "willSaveWaitUntil": True, "didSave": True}, "completion": { "dynamicRegistration": True, "contextSupport": True, "completionItem": { "snippetSupport": True, "commitCharactersSupport": True, "documentationFormat": ["markdown", "plaintext"], "deprecatedSupport": True, "preselectSupport": True, }, }, "hover": {"dynamicRegistration": True, "contentFormat": ["markdown", "plaintext"]}, "definition": {"dynamicRegistration": True, "linkSupport": True}, "references": {"dynamicRegistration": True}, "documentHighlight": {"dynamicRegistration": True}, "documentSymbol": { "dynamicRegistration": True, "symbolKind": {"valueSet": list(range(1, 27))}, "hierarchicalDocumentSymbolSupport": True, }, "codeAction": {"dynamicRegistration": True}, "formatting": {"dynamicRegistration": True}, "rangeFormatting": {"dynamicRegistration": True}, "rename": {"dynamicRegistration": True, "prepareSupport": True}, }, "window": { "showMessage": {"messageActionItem": {"additionalPropertiesSupport": True}}, "showDocument": {"support": True}, "workDoneProgress": True, }, }, "trace": "verbose", "workspaceFolders": [{"uri": root_uri, "name": repository_path.name}], } return initialize_params @override def _start_server(self): """ Starts the AL Language Server process and initializes it. This method sets up custom notification handlers for AL-specific messages before starting the server. The AL server sends various notifications during initialization and project loading that need to be handled. """ # Set up event handlers def do_nothing(params): return def window_log_message(msg): self.logger.log(f"AL LSP: window/logMessage: {msg}", logging.INFO) def publish_diagnostics(params): # AL server publishes diagnostics during initialization uri = params.get("uri", "") diagnostics = params.get("diagnostics", []) self.logger.log(f"AL LSP: Diagnostics for {uri}: {len(diagnostics)} issues", logging.DEBUG) def handle_al_notifications(params): # AL server sends custom notifications during project loading self.logger.log("AL LSP: Notification received", logging.DEBUG) # Register handlers for AL-specific notifications # These notifications are sent by the AL server during initialization and operation self.server.on_notification("window/logMessage", window_log_message) # Server log messages self.server.on_notification("textDocument/publishDiagnostics", publish_diagnostics) # Compilation diagnostics self.server.on_notification("$/progress", do_nothing) # Progress notifications during loading self.server.on_notification("al/refreshExplorerObjects", handle_al_notifications) # AL-specific object updates # Start the server process self.logger.log("Starting AL Language Server process", logging.INFO) self.server.start() # Send initialize request initialize_params = self._get_initialize_params(self.repository_root_path) self.logger.log( "Sending initialize request from LSP client to AL LSP server and awaiting response", logging.INFO, ) # Send initialize and wait for response resp = self.server.send_request("initialize", initialize_params) if resp is None: raise RuntimeError("AL Language Server initialization failed - no response") self.logger.log("AL Language Server initialized successfully", logging.INFO) # Send initialized notification self.server.send_notification("initialized", {}) self.logger.log("Sent initialized notification", logging.INFO) @override def start(self) -> "ALLanguageServer": """ Start the AL Language Server with special initialization. """ # Call parent start method super().start() # AL-specific post-initialization self._post_initialize_al_workspace() # Note: set_active_workspace() can be called manually if needed for multi-workspace scenarios # We don't call it automatically to avoid issues during single-workspace initialization return self def _post_initialize_al_workspace(self) -> None: """ Post-initialization setup for AL Language Server. The AL server requires additional setup after initialization: 1. Send workspace configuration - provides AL settings and paths 2. Open app.json to trigger project loading - AL uses app.json to identify project structure 3. Optionally wait for project to be loaded if supported This special initialization sequence is unique to AL and necessary for proper symbol resolution and navigation features. """ # No sleep needed - server is already initialized # Send workspace configuration first # This tells AL about assembly paths, package caches, and code analysis settings try: self.server.send_notification( "workspace/didChangeConfiguration", { "settings": { "workspacePath": self.repository_root_path, "alResourceConfigurationSettings": { "assemblyProbingPaths": ["./.netpackages"], "codeAnalyzers": [], "enableCodeAnalysis": False, "backgroundCodeAnalysis": "Project", "packageCachePaths": ["./.alpackages"], "ruleSetPath": None, "enableCodeActions": True, "incrementalBuild": False, "outputAnalyzerStatistics": True, "enableExternalRulesets": True, }, "setActiveWorkspace": True, "expectedProjectReferenceDefinitions": [], "activeWorkspaceClosure": [self.repository_root_path], } }, ) self.logger.log("Sent workspace configuration", logging.DEBUG) except Exception as e: self.logger.log(f"Failed to send workspace config: {e}", logging.WARNING) # Check if app.json exists and open it # app.json is the AL project manifest file (similar to package.json for Node.js) # Opening it triggers AL to load the project and index all AL files app_json_path = Path(self.repository_root_path) / "app.json" if app_json_path.exists(): try: with open(app_json_path, encoding="utf-8") as f: app_json_content = f.read() # Use forward slashes for URI app_json_uri = app_json_path.as_uri() # Send textDocument/didOpen for app.json self.server.send_notification( "textDocument/didOpen", {"textDocument": {"uri": app_json_uri, "languageId": "json", "version": 1, "text": app_json_content}}, ) self.logger.log(f"Opened app.json: {app_json_uri}", logging.DEBUG) except Exception as e: self.logger.log(f"Failed to open app.json: {e}", logging.WARNING) # Try to set active workspace (AL-specific custom LSP request) # This is optional and may not be supported by all AL server versions workspace_uri = Path(self.repository_root_path).resolve().as_uri() try: result = self.server.send_request( "al/setActiveWorkspace", { "currentWorkspaceFolderPath": {"uri": workspace_uri, "name": Path(self.repository_root_path).name, "index": 0}, "settings": { "workspacePath": self.repository_root_path, "setActiveWorkspace": True, }, }, timeout=2, # Quick timeout since this is optional ) self.logger.log(f"Set active workspace result: {result}", logging.DEBUG) except Exception as e: # This is a custom AL request, not critical if it fails self.logger.log(f"Failed to set active workspace (non-critical): {e}", logging.DEBUG) # Check if project supports load status check (optional) # Many AL server versions don't support this, so we use a short timeout # and continue regardless of the result self._wait_for_project_load(timeout=3) @override def is_ignored_dirname(self, dirname: str) -> bool: """ Define AL-specific directories to ignore during file scanning. These directories contain generated files, dependencies, or cache data that should not be analyzed for symbols. Args: dirname: Directory name to check Returns: True if directory should be ignored """ al_ignore_dirs = { ".alpackages", # AL package cache - downloaded dependencies ".alcache", # AL compiler cache - intermediate compilation files ".altemplates", # AL templates - code generation templates ".snapshots", # Test snapshots - test result snapshots "out", # Compiled output - generated .app files ".vscode", # VS Code settings - editor configuration "Reference", # Reference assemblies - .NET dependencies ".netpackages", # .NET packages - NuGet packages for AL "bin", # Binary output - compiled binaries "obj", # Object files - intermediate build artifacts } # Check parent class ignore list first, then AL-specific return super().is_ignored_dirname(dirname) or dirname in al_ignore_dirs @override def request_full_symbol_tree(self, within_relative_path: str | None = None, include_body: bool = False) -> list[dict]: """ Override to handle AL's requirement of opening files before requesting symbols. The AL Language Server requires files to be explicitly opened via textDocument/didOpen before it can provide meaningful symbols. Without this, it only returns directory symbols. This is different from most language servers which can provide symbols for unopened files. This method: 1. Scans the repository for all AL files (.al and .dal extensions) 2. Opens each file with the AL server 3. Requests symbols for each file 4. Combines all symbols into a hierarchical tree structure 5. Closes the files to free resources Args: within_relative_path: Restrict search to this file or directory path include_body: Whether to include symbol body content Returns: Full symbol tree with all AL symbols from opened files organized by directory """ self.logger.log("AL: Starting request_full_symbol_tree with file opening", logging.DEBUG) # Determine the root path for scanning if within_relative_path is not None: within_abs_path = os.path.join(self.repository_root_path, within_relative_path) if not os.path.exists(within_abs_path): raise FileNotFoundError(f"File or directory not found: {within_abs_path}") if os.path.isfile(within_abs_path): # Single file case - use parent class implementation _, root_nodes = self.request_document_symbols(within_relative_path, include_body=include_body) return root_nodes # Directory case - scan within this directory scan_root = Path(within_abs_path) else: # Scan entire repository scan_root = Path(self.repository_root_path) # For AL, we always need to open files to get symbols al_files = [] # Walk through the repository to find all AL files for root, dirs, files in os.walk(scan_root): # Skip ignored directories dirs[:] = [d for d in dirs if not self.is_ignored_dirname(d)] # Find AL files for file in files: if file.endswith((".al", ".dal")): file_path = Path(root) / file # Use forward slashes for consistent paths try: relative_path = str(file_path.relative_to(self.repository_root_path)).replace("\\", "/") al_files.append((file_path, relative_path)) except ValueError: # File is outside repository root, skip it continue self.logger.log(f"AL: Found {len(al_files)} AL files", logging.DEBUG) if not al_files: self.logger.log("AL: No AL files found in repository", logging.WARNING) return [] # Collect all symbols from all files all_file_symbols = [] for file_path, relative_path in al_files: try: # Use our overridden request_document_symbols which handles opening self.logger.log(f"AL: Getting symbols for {relative_path}", logging.DEBUG) all_syms, root_syms = self.request_document_symbols(relative_path, include_body=include_body) if root_syms: # Create a file-level symbol containing the document symbols file_symbol = { "name": file_path.stem, # Just the filename without extension "kind": 1, # File "children": root_syms, "location": { "uri": file_path.as_uri(), "relativePath": relative_path, "absolutePath": str(file_path), "range": {"start": {"line": 0, "character": 0}, "end": {"line": 0, "character": 0}}, }, } all_file_symbols.append(file_symbol) self.logger.log(f"AL: Added {len(root_syms)} symbols from {relative_path}", logging.DEBUG) elif all_syms: # If we only got all_syms but not root, use all_syms file_symbol = { "name": file_path.stem, "kind": 1, # File "children": all_syms, "location": { "uri": file_path.as_uri(), "relativePath": relative_path, "absolutePath": str(file_path), "range": {"start": {"line": 0, "character": 0}, "end": {"line": 0, "character": 0}}, }, } all_file_symbols.append(file_symbol) self.logger.log(f"AL: Added {len(all_syms)} symbols from {relative_path}", logging.DEBUG) except Exception as e: self.logger.log(f"AL: Failed to get symbols for {relative_path}: {e}", logging.WARNING) if all_file_symbols: self.logger.log(f"AL: Returning symbols from {len(all_file_symbols)} files", logging.DEBUG) # Group files by directory directory_structure = {} for file_symbol in all_file_symbols: rel_path = file_symbol["location"]["relativePath"] path_parts = rel_path.split("/") if len(path_parts) > 1: # File is in a subdirectory dir_path = "/".join(path_parts[:-1]) if dir_path not in directory_structure: directory_structure[dir_path] = [] directory_structure[dir_path].append(file_symbol) else: # File is in root if "." not in directory_structure: directory_structure["."] = [] directory_structure["."].append(file_symbol) # Build hierarchical structure result = [] repo_path = Path(self.repository_root_path) for dir_path, file_symbols in directory_structure.items(): if dir_path == ".": # Root level files result.extend(file_symbols) else: # Create directory symbol dir_symbol = { "name": Path(dir_path).name, "kind": 4, # Package/Directory "children": file_symbols, "location": { "relativePath": dir_path, "absolutePath": str(repo_path / dir_path), "range": {"start": {"line": 0, "character": 0}, "end": {"line": 0, "character": 0}}, }, } result.append(dir_symbol) return result else: self.logger.log("AL: No symbols found in any files", logging.WARNING) return [] # ===== Phase 1: Custom AL Command Implementations ===== @override def _send_definition_request(self, definition_params: DefinitionParams) -> Definition | list[LocationLink] | None: """ Override to use AL's custom gotodefinition command. AL Language Server uses 'al/gotodefinition' instead of the standard 'textDocument/definition' request. This custom command provides better navigation for AL-specific constructs like table extensions, page extensions, and codeunit references. If the custom command fails, we fall back to the standard LSP method. """ # Convert standard params to AL format (same structure, different method) al_params = {"textDocument": definition_params["textDocument"], "position": definition_params["position"]} try: # Use custom AL command instead of standard LSP response = self.server.send_request("al/gotodefinition", al_params) self.logger.log(f"AL gotodefinition response: {response}", logging.DEBUG) return response except Exception as e: self.logger.log(f"Failed to use al/gotodefinition, falling back to standard: {e}", logging.WARNING) # Fallback to standard LSP method if custom command fails return super()._send_definition_request(definition_params) def check_project_loaded(self) -> bool: """ Check if AL project closure is fully loaded. Uses AL's custom 'al/hasProjectClosureLoadedRequest' to determine if the project and all its dependencies have been fully loaded and indexed. This is important because AL operations may fail or return incomplete results if the project is still loading. Returns: bool: True if project is loaded, False otherwise """ if not hasattr(self, "server") or not self.server_started: self.logger.log("Cannot check project load - server not started", logging.DEBUG) return False # Check if we've already determined this request isn't supported if not self._project_load_check_supported: return True # Assume loaded if check isn't supported try: # Use a very short timeout since this is just a status check response = self.server.send_request("al/hasProjectClosureLoadedRequest", {}, timeout=1) # Response can be boolean directly, dict with 'loaded' field, or None if isinstance(response, bool): return response elif isinstance(response, dict): return response.get("loaded", False) elif response is None: # None typically means the project is still loading self.logger.log("Project load check returned None", logging.DEBUG) return False else: self.logger.log(f"Unexpected response type for project load check: {type(response)}", logging.DEBUG) return False except Exception as e: # Mark as unsupported to avoid repeated failed attempts self._project_load_check_supported = False self.logger.log(f"Project load check not supported by this AL server version: {e}", logging.DEBUG) # Assume loaded if we can't check return True def _wait_for_project_load(self, timeout: int = 3) -> bool: """ Wait for project to be fully loaded. Polls the AL server to check if the project is loaded. This is optional as not all AL server versions support this check. We use a short timeout and continue regardless of the result. Args: timeout: Maximum time to wait in seconds (default 3s) Returns: bool: True if project loaded within timeout, False otherwise """ start_time = time.time() self.logger.log(f"Checking AL project load status (timeout: {timeout}s)...", logging.DEBUG) while time.time() - start_time < timeout: if self.check_project_loaded(): elapsed = time.time() - start_time self.logger.log(f"AL project fully loaded after {elapsed:.1f}s", logging.INFO) return True time.sleep(0.5) self.logger.log(f"Project load check timed out after {timeout}s (non-critical)", logging.DEBUG) return False def set_active_workspace(self, workspace_uri: str | None = None) -> None: """ Set the active AL workspace. This is important when multiple workspaces exist to ensure operations target the correct workspace. The AL server can handle multiple projects simultaneously, but only one can be "active" at a time for operations like symbol search and navigation. This uses the custom 'al/setActiveWorkspace' LSP command. Args: workspace_uri: URI of workspace to set as active, or None to use repository root """ if not hasattr(self, "server") or not self.server_started: self.logger.log("Cannot set active workspace - server not started", logging.DEBUG) return if workspace_uri is None: workspace_uri = Path(self.repository_root_path).resolve().as_uri() params = {"workspaceUri": workspace_uri} try: self.server.send_request("al/setActiveWorkspace", params) self.logger.log(f"Set active workspace to: {workspace_uri}", logging.INFO) except Exception as e: self.logger.log(f"Failed to set active workspace: {e}", logging.WARNING) # Non-critical error, continue operation ``` -------------------------------------------------------------------------------- /src/solidlsp/ls.py: -------------------------------------------------------------------------------- ```python import dataclasses import hashlib import json import logging import os import pathlib import pickle import shutil import subprocess import threading from abc import ABC, abstractmethod from collections import defaultdict from collections.abc import Iterator from contextlib import contextmanager from copy import copy from pathlib import Path, PurePath from time import sleep from typing import Self, Union, cast import pathspec from serena.text_utils import MatchedConsecutiveLines from serena.util.file_system import match_path from solidlsp import ls_types from solidlsp.ls_config import Language, LanguageServerConfig from solidlsp.ls_exceptions import SolidLSPException from solidlsp.ls_handler import SolidLanguageServerHandler from solidlsp.ls_logger import LanguageServerLogger from solidlsp.ls_types import UnifiedSymbolInformation from solidlsp.ls_utils import FileUtils, PathUtils, TextUtils from solidlsp.lsp_protocol_handler import lsp_types from solidlsp.lsp_protocol_handler import lsp_types as LSPTypes from solidlsp.lsp_protocol_handler.lsp_constants import LSPConstants from solidlsp.lsp_protocol_handler.lsp_types import Definition, DefinitionParams, LocationLink, SymbolKind from solidlsp.lsp_protocol_handler.server import ( LSPError, ProcessLaunchInfo, StringDict, ) from solidlsp.settings import SolidLSPSettings GenericDocumentSymbol = Union[LSPTypes.DocumentSymbol, LSPTypes.SymbolInformation, ls_types.UnifiedSymbolInformation] @dataclasses.dataclass(kw_only=True) class ReferenceInSymbol: """A symbol retrieved when requesting reference to a symbol, together with the location of the reference""" symbol: ls_types.UnifiedSymbolInformation line: int character: int @dataclasses.dataclass class LSPFileBuffer: """ This class is used to store the contents of an open LSP file in memory. """ # uri of the file uri: str # The contents of the file contents: str # The version of the file version: int # The language id of the file language_id: str # reference count of the file ref_count: int content_hash: str = "" def __post_init__(self): self.content_hash = hashlib.md5(self.contents.encode("utf-8")).hexdigest() class SolidLanguageServer(ABC): """ The LanguageServer class provides a language agnostic interface to the Language Server Protocol. It is used to communicate with Language Servers of different programming languages. """ CACHE_FOLDER_NAME = "cache" # To be overridden and extended by subclasses def is_ignored_dirname(self, dirname: str) -> bool: """ A language-specific condition for directories that should always be ignored. For example, venv in Python and node_modules in JS/TS should be ignored always. """ return dirname.startswith(".") @classmethod def get_language_enum_instance(cls) -> Language: return Language.from_ls_class(cls) @classmethod def ls_resources_dir(cls, solidlsp_settings: SolidLSPSettings, mkdir: bool = True) -> str: """ Returns the directory where the language server resources are downloaded. This is used to store language server binaries, configuration files, etc. """ result = os.path.join(solidlsp_settings.ls_resources_dir, cls.__name__) # Migration of previously downloaded LS resources that were downloaded to a subdir of solidlsp instead of to the user's home pre_migration_ls_resources_dir = os.path.join(os.path.dirname(__file__), "language_servers", "static", cls.__name__) if os.path.exists(pre_migration_ls_resources_dir): if os.path.exists(result): # if the directory already exists, we just remove the old resources shutil.rmtree(result, ignore_errors=True) else: # move old resources to the new location shutil.move(pre_migration_ls_resources_dir, result) if mkdir: os.makedirs(result, exist_ok=True) return result @classmethod def create( cls, config: LanguageServerConfig, logger: LanguageServerLogger, repository_root_path: str, timeout: float | None = None, solidlsp_settings: SolidLSPSettings | None = None, ) -> "SolidLanguageServer": """ Creates a language specific LanguageServer instance based on the given configuration, and appropriate settings for the programming language. If language is Java, then ensure that jdk-17.0.6 or higher is installed, `java` is in PATH, and JAVA_HOME is set to the installation directory. If language is JS/TS, then ensure that node (v18.16.0 or higher) is installed and in PATH. :param repository_root_path: The root path of the repository. :param config: language server configuration. :param logger: The logger to use. :param timeout: the timeout for requests to the language server. If None, no timeout will be used. :param solidlsp_settings: additional settings :return LanguageServer: A language specific LanguageServer instance. """ ls: SolidLanguageServer if solidlsp_settings is None: solidlsp_settings = SolidLSPSettings() ls_class = config.code_language.get_ls_class() # For now, we assume that all language server implementations have the same signature of the constructor # (which, unfortunately, differs from the signature of the base class). # If this assumption is ever violated, we need branching logic here. ls = ls_class(config, logger, repository_root_path, solidlsp_settings) # type: ignore ls.set_request_timeout(timeout) return ls def __init__( self, config: LanguageServerConfig, logger: LanguageServerLogger, repository_root_path: str, process_launch_info: ProcessLaunchInfo, language_id: str, solidlsp_settings: SolidLSPSettings, ): """ Initializes a LanguageServer instance. Do not instantiate this class directly. Use `LanguageServer.create` method instead. :param config: The Multilspy configuration. :param logger: The logger to use. :param repository_root_path: The root path of the repository. :param process_launch_info: Each language server has a specific command used to start the server. This parameter is the command to launch the language server process. The command must pass appropriate flags to the binary, so that it runs in the stdio mode, as opposed to HTTP, TCP modes supported by some language servers. """ self._solidlsp_settings = solidlsp_settings self.logger = logger self.repository_root_path: str = repository_root_path self.logger.log( f"Creating language server instance for {repository_root_path=} with {language_id=} and process launch info: {process_launch_info}", logging.DEBUG, ) self.language_id = language_id self.open_file_buffers: dict[str, LSPFileBuffer] = {} self.language = Language(language_id) # load cache first to prevent any racing conditions due to asyncio stuff self._document_symbols_cache: dict[ str, tuple[str, tuple[list[ls_types.UnifiedSymbolInformation], list[ls_types.UnifiedSymbolInformation]]] ] = {} """Maps file paths to a tuple of (file_content_hash, result_of_request_document_symbols)""" self._cache_lock = threading.Lock() self._cache_has_changed: bool = False self.load_cache() self.server_started = False self.completions_available = threading.Event() if config.trace_lsp_communication: def logging_fn(source: str, target: str, msg: StringDict | str): self.logger.log(f"LSP: {source} -> {target}: {msg!s}", self.logger.logger.level) else: logging_fn = None # cmd is obtained from the child classes, which provide the language specific command to start the language server # LanguageServerHandler provides the functionality to start the language server and communicate with it self.logger.log( f"Creating language server instance with {language_id=} and process launch info: {process_launch_info}", logging.DEBUG ) self.server = SolidLanguageServerHandler( process_launch_info, logger=logging_fn, start_independent_lsp_process=config.start_independent_lsp_process, ) # Set up the pathspec matcher for the ignored paths # for all absolute paths in ignored_paths, convert them to relative paths processed_patterns = [] for pattern in set(config.ignored_paths): # Normalize separators (pathspec expects forward slashes) pattern = pattern.replace(os.path.sep, "/") processed_patterns.append(pattern) self.logger.log(f"Processing {len(processed_patterns)} ignored paths from the config", logging.DEBUG) # Create a pathspec matcher from the processed patterns self._ignore_spec = pathspec.PathSpec.from_lines(pathspec.patterns.GitWildMatchPattern, processed_patterns) self._server_context = None self._request_timeout: float | None = None self._has_waited_for_cross_file_references = False def _get_wait_time_for_cross_file_referencing(self) -> float: """Meant to be overridden by subclasses for LS that don't have a reliable "finished initializing" signal. LS may return incomplete results on calls to `request_references` (only references found in the same file), if the LS is not fully initialized yet. """ return 2 def set_request_timeout(self, timeout: float | None) -> None: """ :param timeout: the timeout, in seconds, for requests to the language server. """ self.server.set_request_timeout(timeout) def get_ignore_spec(self) -> pathspec.PathSpec: """Returns the pathspec matcher for the paths that were configured to be ignored through the multilspy config. This is is a subset of the full language-specific ignore spec that determines which files are relevant for the language server. This matcher is useful for operations outside of the language server, such as when searching for relevant non-language files in the project. """ return self._ignore_spec def is_ignored_path(self, relative_path: str, ignore_unsupported_files: bool = True) -> bool: """ Determine if a path should be ignored based on file type and ignore patterns. :param relative_path: Relative path to check :param ignore_unsupported_files: whether files that are not supported source files should be ignored :return: True if the path should be ignored, False otherwise """ abs_path = os.path.join(self.repository_root_path, relative_path) if not os.path.exists(abs_path): raise FileNotFoundError(f"File {abs_path} not found, the ignore check cannot be performed") # Check file extension if it's a file is_file = os.path.isfile(abs_path) if is_file and ignore_unsupported_files: fn_matcher = self.language.get_source_fn_matcher() if not fn_matcher.is_relevant_filename(abs_path): return True # Create normalized path for consistent handling rel_path = Path(relative_path) # Check each part of the path against always fulfilled ignore conditions dir_parts = rel_path.parts if is_file: dir_parts = dir_parts[:-1] for part in dir_parts: if not part: # Skip empty parts (e.g., from leading '/') continue if self.is_ignored_dirname(part): return True return match_path(relative_path, self.get_ignore_spec(), root_path=self.repository_root_path) def _shutdown(self, timeout: float = 5.0): """ A robust shutdown process designed to terminate cleanly on all platforms, including Windows, by explicitly closing all I/O pipes. """ if not self.server.is_running(): self.logger.log("Server process not running, skipping shutdown.", logging.DEBUG) return self.logger.log(f"Initiating final robust shutdown with a {timeout}s timeout...", logging.INFO) process = self.server.process # --- Main Shutdown Logic --- # Stage 1: Graceful Termination Request # Send LSP shutdown and close stdin to signal no more input. try: self.logger.log("Sending LSP shutdown request...", logging.DEBUG) # Use a thread to timeout the LSP shutdown call since it can hang shutdown_thread = threading.Thread(target=self.server.shutdown) shutdown_thread.daemon = True shutdown_thread.start() shutdown_thread.join(timeout=2.0) # 2 second timeout for LSP shutdown if shutdown_thread.is_alive(): self.logger.log("LSP shutdown request timed out, proceeding to terminate...", logging.DEBUG) else: self.logger.log("LSP shutdown request completed.", logging.DEBUG) if process.stdin and not process.stdin.is_closing(): process.stdin.close() self.logger.log("Stage 1 shutdown complete.", logging.DEBUG) except Exception as e: self.logger.log(f"Exception during graceful shutdown: {e}", logging.DEBUG) # Ignore errors here, we are proceeding to terminate anyway. # Stage 2: Terminate and Wait for Process to Exit self.logger.log(f"Terminating process {process.pid}, current status: {process.poll()}", logging.DEBUG) process.terminate() # Stage 3: Wait for process termination with timeout try: self.logger.log(f"Waiting for process {process.pid} to terminate...", logging.DEBUG) exit_code = process.wait(timeout=timeout) self.logger.log(f"Language server process terminated successfully with exit code {exit_code}.", logging.INFO) except subprocess.TimeoutExpired: # If termination failed, forcefully kill the process self.logger.log(f"Process {process.pid} termination timed out, killing process forcefully...", logging.WARNING) process.kill() try: exit_code = process.wait(timeout=2.0) self.logger.log(f"Language server process killed successfully with exit code {exit_code}.", logging.INFO) except subprocess.TimeoutExpired: self.logger.log(f"Process {process.pid} could not be killed within timeout.", logging.ERROR) except Exception as e: self.logger.log(f"Error during process shutdown: {e}", logging.ERROR) @contextmanager def start_server(self) -> Iterator["SolidLanguageServer"]: self.start() yield self self.stop() def _start_server_process(self) -> None: self.server_started = True self._start_server() @abstractmethod def _start_server(self): pass @contextmanager def open_file(self, relative_file_path: str) -> Iterator[LSPFileBuffer]: """ Open a file in the Language Server. This is required before making any requests to the Language Server. :param relative_file_path: The relative path of the file to open. """ if not self.server_started: self.logger.log( "open_file called before Language Server started", logging.ERROR, ) raise SolidLSPException("Language Server not started") absolute_file_path = str(PurePath(self.repository_root_path, relative_file_path)) uri = pathlib.Path(absolute_file_path).as_uri() if uri in self.open_file_buffers: assert self.open_file_buffers[uri].uri == uri assert self.open_file_buffers[uri].ref_count >= 1 self.open_file_buffers[uri].ref_count += 1 yield self.open_file_buffers[uri] self.open_file_buffers[uri].ref_count -= 1 else: contents = FileUtils.read_file(self.logger, absolute_file_path) version = 0 self.open_file_buffers[uri] = LSPFileBuffer(uri, contents, version, self.language_id, 1) self.server.notify.did_open_text_document( { LSPConstants.TEXT_DOCUMENT: { LSPConstants.URI: uri, LSPConstants.LANGUAGE_ID: self.language_id, LSPConstants.VERSION: 0, LSPConstants.TEXT: contents, } } ) yield self.open_file_buffers[uri] self.open_file_buffers[uri].ref_count -= 1 if self.open_file_buffers[uri].ref_count == 0: self.server.notify.did_close_text_document( { LSPConstants.TEXT_DOCUMENT: { LSPConstants.URI: uri, } } ) del self.open_file_buffers[uri] def insert_text_at_position(self, relative_file_path: str, line: int, column: int, text_to_be_inserted: str) -> ls_types.Position: """ Insert text at the given line and column in the given file and return the updated cursor position after inserting the text. :param relative_file_path: The relative path of the file to open. :param line: The line number at which text should be inserted. :param column: The column number at which text should be inserted. :param text_to_be_inserted: The text to insert. """ if not self.server_started: self.logger.log( "insert_text_at_position called before Language Server started", logging.ERROR, ) raise SolidLSPException("Language Server not started") absolute_file_path = str(PurePath(self.repository_root_path, relative_file_path)) uri = pathlib.Path(absolute_file_path).as_uri() # Ensure the file is open assert uri in self.open_file_buffers file_buffer = self.open_file_buffers[uri] file_buffer.version += 1 new_contents, new_l, new_c = TextUtils.insert_text_at_position(file_buffer.contents, line, column, text_to_be_inserted) file_buffer.contents = new_contents self.server.notify.did_change_text_document( { LSPConstants.TEXT_DOCUMENT: { LSPConstants.VERSION: file_buffer.version, LSPConstants.URI: file_buffer.uri, }, LSPConstants.CONTENT_CHANGES: [ { LSPConstants.RANGE: { "start": {"line": line, "character": column}, "end": {"line": line, "character": column}, }, "text": text_to_be_inserted, } ], } ) return ls_types.Position(line=new_l, character=new_c) def delete_text_between_positions( self, relative_file_path: str, start: ls_types.Position, end: ls_types.Position, ) -> str: """ Delete text between the given start and end positions in the given file and return the deleted text. """ if not self.server_started: self.logger.log( "insert_text_at_position called before Language Server started", logging.ERROR, ) raise SolidLSPException("Language Server not started") absolute_file_path = str(PurePath(self.repository_root_path, relative_file_path)) uri = pathlib.Path(absolute_file_path).as_uri() # Ensure the file is open assert uri in self.open_file_buffers file_buffer = self.open_file_buffers[uri] file_buffer.version += 1 new_contents, deleted_text = TextUtils.delete_text_between_positions( file_buffer.contents, start_line=start["line"], start_col=start["character"], end_line=end["line"], end_col=end["character"] ) file_buffer.contents = new_contents self.server.notify.did_change_text_document( { LSPConstants.TEXT_DOCUMENT: { LSPConstants.VERSION: file_buffer.version, LSPConstants.URI: file_buffer.uri, }, LSPConstants.CONTENT_CHANGES: [{LSPConstants.RANGE: {"start": start, "end": end}, "text": ""}], } ) return deleted_text def _send_definition_request(self, definition_params: DefinitionParams) -> Definition | list[LocationLink] | None: return self.server.send.definition(definition_params) def request_definition(self, relative_file_path: str, line: int, column: int) -> list[ls_types.Location]: """ Raise a [textDocument/definition](https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#textDocument_definition) request to the Language Server for the symbol at the given line and column in the given file. Wait for the response and return the result. :param relative_file_path: The relative path of the file that has the symbol for which definition should be looked up :param line: The line number of the symbol :param column: The column number of the symbol :return List[multilspy_types.Location]: A list of locations where the symbol is defined """ if not self.server_started: self.logger.log( "request_definition called before Language Server started", logging.ERROR, ) raise SolidLSPException("Language Server not started") if not self._has_waited_for_cross_file_references: # Some LS require waiting for a while before they can return cross-file definitions. # This is a workaround for such LS that don't have a reliable "finished initializing" signal. sleep(self._get_wait_time_for_cross_file_referencing()) self._has_waited_for_cross_file_references = True with self.open_file(relative_file_path): # sending request to the language server and waiting for response definition_params = cast( DefinitionParams, { LSPConstants.TEXT_DOCUMENT: { LSPConstants.URI: pathlib.Path(str(PurePath(self.repository_root_path, relative_file_path))).as_uri() }, LSPConstants.POSITION: { LSPConstants.LINE: line, LSPConstants.CHARACTER: column, }, }, ) response = self._send_definition_request(definition_params) ret: list[ls_types.Location] = [] if isinstance(response, list): # response is either of type Location[] or LocationLink[] for item in response: assert isinstance(item, dict) if LSPConstants.URI in item and LSPConstants.RANGE in item: new_item: ls_types.Location = {} new_item.update(item) new_item["absolutePath"] = PathUtils.uri_to_path(new_item["uri"]) new_item["relativePath"] = PathUtils.get_relative_path(new_item["absolutePath"], self.repository_root_path) ret.append(ls_types.Location(new_item)) elif LSPConstants.TARGET_URI in item and LSPConstants.TARGET_RANGE in item and LSPConstants.TARGET_SELECTION_RANGE in item: new_item: ls_types.Location = {} new_item["uri"] = item[LSPConstants.TARGET_URI] new_item["absolutePath"] = PathUtils.uri_to_path(new_item["uri"]) new_item["relativePath"] = PathUtils.get_relative_path(new_item["absolutePath"], self.repository_root_path) new_item["range"] = item[LSPConstants.TARGET_SELECTION_RANGE] ret.append(ls_types.Location(**new_item)) else: assert False, f"Unexpected response from Language Server: {item}" elif isinstance(response, dict): # response is of type Location assert LSPConstants.URI in response assert LSPConstants.RANGE in response new_item: ls_types.Location = {} new_item.update(response) new_item["absolutePath"] = PathUtils.uri_to_path(new_item["uri"]) new_item["relativePath"] = PathUtils.get_relative_path(new_item["absolutePath"], self.repository_root_path) ret.append(ls_types.Location(**new_item)) elif response is None: # Some language servers return None when they cannot find a definition # This is expected for certain symbol types like generics or types with incomplete information self.logger.log( f"Language server returned None for definition request at {relative_file_path}:{line}:{column}", logging.WARNING, ) else: assert False, f"Unexpected response from Language Server: {response}" return ret # Some LS cause problems with this, so the call is isolated from the rest to allow overriding in subclasses def _send_references_request(self, relative_file_path: str, line: int, column: int) -> list[lsp_types.Location] | None: return self.server.send.references( { "textDocument": {"uri": PathUtils.path_to_uri(os.path.join(self.repository_root_path, relative_file_path))}, "position": {"line": line, "character": column}, "context": {"includeDeclaration": False}, } ) def request_references(self, relative_file_path: str, line: int, column: int) -> list[ls_types.Location]: """ Raise a [textDocument/references](https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#textDocument_references) request to the Language Server to find references to the symbol at the given line and column in the given file. Wait for the response and return the result. Filters out references located in ignored directories. :param relative_file_path: The relative path of the file that has the symbol for which references should be looked up :param line: The line number of the symbol :param column: The column number of the symbol :return: A list of locations where the symbol is referenced (excluding ignored directories) """ if not self.server_started: self.logger.log( "request_references called before Language Server started", logging.ERROR, ) raise SolidLSPException("Language Server not started") if not self._has_waited_for_cross_file_references: # Some LS require waiting for a while before they can return cross-file references. # This is a workaround for such LS that don't have a reliable "finished initializing" signal. sleep(self._get_wait_time_for_cross_file_referencing()) self._has_waited_for_cross_file_references = True with self.open_file(relative_file_path): try: response = self._send_references_request(relative_file_path, line=line, column=column) except Exception as e: # Catch LSP internal error (-32603) and raise a more informative exception if isinstance(e, LSPError) and getattr(e, "code", None) == -32603: raise RuntimeError( f"LSP internal error (-32603) when requesting references for {relative_file_path}:{line}:{column}. " "This often occurs when requesting references for a symbol not referenced in the expected way. " ) from e raise if response is None: return [] ret: list[ls_types.Location] = [] assert isinstance(response, list), f"Unexpected response from Language Server (expected list, got {type(response)}): {response}" for item in response: assert isinstance(item, dict), f"Unexpected response from Language Server (expected dict, got {type(item)}): {item}" assert LSPConstants.URI in item assert LSPConstants.RANGE in item abs_path = PathUtils.uri_to_path(item[LSPConstants.URI]) if not Path(abs_path).is_relative_to(self.repository_root_path): self.logger.log( "Found a reference in a path outside the repository, probably the LS is parsing things in installed packages or in the standardlib! " f"Path: {abs_path}. This is a bug but we currently simply skip these references.", logging.WARNING, ) continue rel_path = Path(abs_path).relative_to(self.repository_root_path) if self.is_ignored_path(str(rel_path)): self.logger.log(f"Ignoring reference in {rel_path} since it should be ignored", logging.DEBUG) continue new_item: ls_types.Location = {} new_item.update(item) new_item["absolutePath"] = str(abs_path) new_item["relativePath"] = str(rel_path) ret.append(ls_types.Location(**new_item)) return ret def request_text_document_diagnostics(self, relative_file_path: str) -> list[ls_types.Diagnostic]: """ Raise a [textDocument/diagnostic](https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#textDocument_diagnostic) request to the Language Server to find diagnostics for the given file. Wait for the response and return the result. :param relative_file_path: The relative path of the file to retrieve diagnostics for :return: A list of diagnostics for the file """ if not self.server_started: self.logger.log( "request_text_document_diagnostics called before Language Server started", logging.ERROR, ) raise SolidLSPException("Language Server not started") with self.open_file(relative_file_path): response = self.server.send.text_document_diagnostic( { LSPConstants.TEXT_DOCUMENT: { LSPConstants.URI: pathlib.Path(str(PurePath(self.repository_root_path, relative_file_path))).as_uri() } } ) if response is None: return [] assert isinstance(response, dict), f"Unexpected response from Language Server (expected list, got {type(response)}): {response}" ret: list[ls_types.Diagnostic] = [] for item in response["items"]: new_item: ls_types.Diagnostic = { "uri": pathlib.Path(str(PurePath(self.repository_root_path, relative_file_path))).as_uri(), "severity": item["severity"], "message": item["message"], "range": item["range"], "code": item["code"], } ret.append(ls_types.Diagnostic(new_item)) return ret def retrieve_full_file_content(self, file_path: str) -> str: """ Retrieve the full content of the given file. """ if os.path.isabs(file_path): file_path = os.path.relpath(file_path, self.repository_root_path) with self.open_file(file_path) as file_data: return file_data.contents def retrieve_content_around_line( self, relative_file_path: str, line: int, context_lines_before: int = 0, context_lines_after: int = 0 ) -> MatchedConsecutiveLines: """ Retrieve the content of the given file around the given line. :param relative_file_path: The relative path of the file to retrieve the content from :param line: The line number to retrieve the content around :param context_lines_before: The number of lines to retrieve before the given line :param context_lines_after: The number of lines to retrieve after the given line :return MatchedConsecutiveLines: A container with the desired lines. """ with self.open_file(relative_file_path) as file_data: file_contents = file_data.contents return MatchedConsecutiveLines.from_file_contents( file_contents, line=line, context_lines_before=context_lines_before, context_lines_after=context_lines_after, source_file_path=relative_file_path, ) def request_completions( self, relative_file_path: str, line: int, column: int, allow_incomplete: bool = False ) -> list[ls_types.CompletionItem]: """ Raise a [textDocument/completion](https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#textDocument_completion) request to the Language Server to find completions at the given line and column in the given file. Wait for the response and return the result. :param relative_file_path: The relative path of the file that has the symbol for which completions should be looked up :param line: The line number of the symbol :param column: The column number of the symbol :return List[multilspy_types.CompletionItem]: A list of completions """ with self.open_file(relative_file_path): open_file_buffer = self.open_file_buffers[pathlib.Path(os.path.join(self.repository_root_path, relative_file_path)).as_uri()] completion_params: LSPTypes.CompletionParams = { "position": {"line": line, "character": column}, "textDocument": {"uri": open_file_buffer.uri}, "context": {"triggerKind": LSPTypes.CompletionTriggerKind.Invoked}, } response: list[LSPTypes.CompletionItem] | LSPTypes.CompletionList | None = None num_retries = 0 while response is None or (response["isIncomplete"] and num_retries < 30): self.completions_available.wait() response: list[LSPTypes.CompletionItem] | LSPTypes.CompletionList | None = self.server.send.completion(completion_params) if isinstance(response, list): response = {"items": response, "isIncomplete": False} num_retries += 1 # TODO: Understand how to appropriately handle `isIncomplete` if response is None or (response["isIncomplete"] and not (allow_incomplete)): return [] if "items" in response: response = response["items"] response = cast(list[LSPTypes.CompletionItem], response) # TODO: Handle the case when the completion is a keyword items = [item for item in response if item["kind"] != LSPTypes.CompletionItemKind.Keyword] completions_list: list[ls_types.CompletionItem] = [] for item in items: assert "insertText" in item or "textEdit" in item assert "kind" in item completion_item = {} if "detail" in item: completion_item["detail"] = item["detail"] if "label" in item: completion_item["completionText"] = item["label"] completion_item["kind"] = item["kind"] elif "insertText" in item: completion_item["completionText"] = item["insertText"] completion_item["kind"] = item["kind"] elif "textEdit" in item and "newText" in item["textEdit"]: completion_item["completionText"] = item["textEdit"]["newText"] completion_item["kind"] = item["kind"] elif "textEdit" in item and "range" in item["textEdit"]: new_dot_lineno, new_dot_colno = ( completion_params["position"]["line"], completion_params["position"]["character"], ) assert all( ( item["textEdit"]["range"]["start"]["line"] == new_dot_lineno, item["textEdit"]["range"]["start"]["character"] == new_dot_colno, item["textEdit"]["range"]["start"]["line"] == item["textEdit"]["range"]["end"]["line"], item["textEdit"]["range"]["start"]["character"] == item["textEdit"]["range"]["end"]["character"], ) ) completion_item["completionText"] = item["textEdit"]["newText"] completion_item["kind"] = item["kind"] elif "textEdit" in item and "insert" in item["textEdit"]: assert False else: assert False completion_item = ls_types.CompletionItem(**completion_item) completions_list.append(completion_item) return [json.loads(json_repr) for json_repr in set(json.dumps(item, sort_keys=True) for item in completions_list)] def request_document_symbols( self, relative_file_path: str, include_body: bool = False ) -> tuple[list[ls_types.UnifiedSymbolInformation], list[ls_types.UnifiedSymbolInformation]]: """ Raise a [textDocument/documentSymbol](https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#textDocument_documentSymbol) request to the Language Server to find symbols in the given file. Wait for the response and return the result. :param relative_file_path: The relative path of the file that has the symbols :param include_body: whether to include the body of the symbols in the result. :return: A list of symbols in the file, and a list of root symbols that represent the tree structure of the symbols. All symbols will have a location, a children, and a parent attribute, where the parent attribute is None for root symbols. Note that this is slightly different from the call to request_full_symbol_tree, where the parent attribute will be the file symbol which in turn may have a package symbol as parent. If you need a symbol tree that contains file symbols as well, you should use `request_full_symbol_tree` instead. """ # TODO: it's kinda dumb to not use the cache if include_body is False after include_body was True once # Should be fixed in the future, it's a small performance optimization cache_key = f"{relative_file_path}-{include_body}" with self.open_file(relative_file_path) as file_data: with self._cache_lock: file_hash_and_result = self._document_symbols_cache.get(cache_key) if file_hash_and_result is not None: file_hash, result = file_hash_and_result if file_hash == file_data.content_hash: self.logger.log(f"Returning cached document symbols for {relative_file_path}", logging.DEBUG) return result else: self.logger.log(f"Content for {relative_file_path} has changed. Will overwrite in-memory cache", logging.DEBUG) else: self.logger.log(f"No cache hit for symbols with {include_body=} in {relative_file_path}", logging.DEBUG) self.logger.log(f"Requesting document symbols for {relative_file_path} from the Language Server", logging.DEBUG) response = self.server.send.document_symbol( {"textDocument": {"uri": pathlib.Path(os.path.join(self.repository_root_path, relative_file_path)).as_uri()}} ) if response is None: self.logger.log( f"Received None response from the Language Server for document symbols in {relative_file_path}. " f"This means the language server can't understand this file (possibly due to syntax errors). It may also be due to a bug or misconfiguration of the LS. " f"Returning empty list", logging.WARNING, ) return [], [] assert isinstance(response, list), f"Unexpected response from Language Server: {response}" self.logger.log( f"Received {len(response)} document symbols for {relative_file_path} from the Language Server", logging.DEBUG, ) def turn_item_into_symbol_with_children(item: GenericDocumentSymbol): item = cast(ls_types.UnifiedSymbolInformation, item) absolute_path = os.path.join(self.repository_root_path, relative_file_path) # handle missing entries in location if "location" not in item: uri = pathlib.Path(absolute_path).as_uri() assert "range" in item tree_location = ls_types.Location( uri=uri, range=item["range"], absolutePath=absolute_path, relativePath=relative_file_path, ) item["location"] = tree_location location = item["location"] if "absolutePath" not in location: location["absolutePath"] = absolute_path if "relativePath" not in location: location["relativePath"] = relative_file_path if include_body: item["body"] = self.retrieve_symbol_body(item) # handle missing selectionRange if "selectionRange" not in item: if "range" in item: item["selectionRange"] = item["range"] else: item["selectionRange"] = item["location"]["range"] children = item.get(LSPConstants.CHILDREN, []) for child in children: child["parent"] = item item[LSPConstants.CHILDREN] = children flat_all_symbol_list: list[ls_types.UnifiedSymbolInformation] = [] root_nodes: list[ls_types.UnifiedSymbolInformation] = [] for root_item in response: if "range" not in root_item and "location" not in root_item: if root_item["kind"] in [SymbolKind.File, SymbolKind.Module]: ... # mutation is more convenient than creating a new dict, # so we cast and rename the var after the mutating call to turn_item_into_symbol_with_children # which turned and item into a "symbol" turn_item_into_symbol_with_children(root_item) root_symbol = cast(ls_types.UnifiedSymbolInformation, root_item) root_symbol["parent"] = None root_nodes.append(root_symbol) assert isinstance(root_symbol, dict) assert LSPConstants.NAME in root_symbol assert LSPConstants.KIND in root_symbol if LSPConstants.CHILDREN in root_symbol: # TODO: l_tree should be a list of TreeRepr. Define the following function to return TreeRepr as well def visit_tree_nodes_and_build_tree_repr(node: GenericDocumentSymbol) -> list[ls_types.UnifiedSymbolInformation]: node = cast(ls_types.UnifiedSymbolInformation, node) l: list[ls_types.UnifiedSymbolInformation] = [] turn_item_into_symbol_with_children(node) assert LSPConstants.CHILDREN in node children = node[LSPConstants.CHILDREN] l.append(node) for child in children: l.extend(visit_tree_nodes_and_build_tree_repr(child)) return l flat_all_symbol_list.extend(visit_tree_nodes_and_build_tree_repr(root_symbol)) else: flat_all_symbol_list.append(ls_types.UnifiedSymbolInformation(**root_symbol)) result = flat_all_symbol_list, root_nodes self.logger.log(f"Caching document symbols for {relative_file_path}", logging.DEBUG) with self._cache_lock: self._document_symbols_cache[cache_key] = (file_data.content_hash, result) self._cache_has_changed = True return result def request_full_symbol_tree( self, within_relative_path: str | None = None, include_body: bool = False ) -> list[ls_types.UnifiedSymbolInformation]: """ Will go through all files in the project or within a relative path and build a tree of symbols. Note: this may be slow the first time it is called, especially if `within_relative_path` is not used to restrict the search. For each file, a symbol of kind File (2) will be created. For directories, a symbol of kind Package (4) will be created. All symbols will have a children attribute, thereby representing the tree structure of all symbols in the project that are within the repository. All symbols except the root packages will have a parent attribute. Will ignore directories starting with '.', language-specific defaults and user-configured directories (e.g. from .gitignore). :param within_relative_path: pass a relative path to only consider symbols within this path. If a file is passed, only the symbols within this file will be considered. If a directory is passed, all files within this directory will be considered. :param include_body: whether to include the body of the symbols in the result. :return: A list of root symbols representing the top-level packages/modules in the project. """ if within_relative_path is not None: within_abs_path = os.path.join(self.repository_root_path, within_relative_path) if not os.path.exists(within_abs_path): raise FileNotFoundError(f"File or directory not found: {within_abs_path}") if os.path.isfile(within_abs_path): if self.is_ignored_path(within_relative_path): self.logger.log( f"You passed a file explicitly, but it is ignored. This is probably an error. File: {within_relative_path}", logging.ERROR, ) return [] else: _, root_nodes = self.request_document_symbols(within_relative_path, include_body=include_body) return root_nodes # Helper function to recursively process directories def process_directory(rel_dir_path: str) -> list[ls_types.UnifiedSymbolInformation]: abs_dir_path = self.repository_root_path if rel_dir_path == "." else os.path.join(self.repository_root_path, rel_dir_path) abs_dir_path = os.path.realpath(abs_dir_path) if self.is_ignored_path(str(Path(abs_dir_path).relative_to(self.repository_root_path))): self.logger.log(f"Skipping directory: {rel_dir_path}\n(because it should be ignored)", logging.DEBUG) return [] result = [] try: contained_dir_or_file_names = os.listdir(abs_dir_path) except OSError: return [] # Create package symbol for directory package_symbol = ls_types.UnifiedSymbolInformation( # type: ignore name=os.path.basename(abs_dir_path), kind=ls_types.SymbolKind.Package, location=ls_types.Location( uri=str(pathlib.Path(abs_dir_path).as_uri()), range={"start": {"line": 0, "character": 0}, "end": {"line": 0, "character": 0}}, absolutePath=str(abs_dir_path), relativePath=str(Path(abs_dir_path).resolve().relative_to(self.repository_root_path)), ), children=[], ) result.append(package_symbol) for contained_dir_or_file_name in contained_dir_or_file_names: contained_dir_or_file_abs_path = os.path.join(abs_dir_path, contained_dir_or_file_name) # obtain relative path try: contained_dir_or_file_rel_path = str( Path(contained_dir_or_file_abs_path).resolve().relative_to(self.repository_root_path) ) except ValueError as e: # Typically happens when the path is not under the repository root (e.g., symlink pointing outside) self.logger.log( f"Skipping path {contained_dir_or_file_abs_path}; likely outside of the repository root {self.repository_root_path} [cause: {e}]", logging.WARNING, ) continue if self.is_ignored_path(contained_dir_or_file_rel_path): self.logger.log(f"Skipping item: {contained_dir_or_file_rel_path}\n(because it should be ignored)", logging.DEBUG) continue if os.path.isdir(contained_dir_or_file_abs_path): child_symbols = process_directory(contained_dir_or_file_rel_path) package_symbol["children"].extend(child_symbols) for child in child_symbols: child["parent"] = package_symbol elif os.path.isfile(contained_dir_or_file_abs_path): _, file_root_nodes = self.request_document_symbols(contained_dir_or_file_rel_path, include_body=include_body) # Create file symbol, link with children file_rel_path = str(Path(contained_dir_or_file_abs_path).resolve().relative_to(self.repository_root_path)) with self.open_file(file_rel_path) as file_data: fileRange = self._get_range_from_file_content(file_data.contents) file_symbol = ls_types.UnifiedSymbolInformation( # type: ignore name=os.path.splitext(contained_dir_or_file_name)[0], kind=ls_types.SymbolKind.File, range=fileRange, selectionRange=fileRange, location=ls_types.Location( uri=str(pathlib.Path(contained_dir_or_file_abs_path).as_uri()), range=fileRange, absolutePath=str(contained_dir_or_file_abs_path), relativePath=str(Path(contained_dir_or_file_abs_path).resolve().relative_to(self.repository_root_path)), ), children=file_root_nodes, parent=package_symbol, ) for child in file_root_nodes: child["parent"] = file_symbol # Link file symbol with package package_symbol["children"].append(file_symbol) # TODO: Not sure if this is actually still needed given recent changes to relative path handling def fix_relative_path(nodes: list[ls_types.UnifiedSymbolInformation]): for node in nodes: if "location" in node and "relativePath" in node["location"]: path = Path(node["location"]["relativePath"]) if path.is_absolute(): try: path = path.relative_to(self.repository_root_path) node["location"]["relativePath"] = str(path) except Exception: pass if "children" in node: fix_relative_path(node["children"]) fix_relative_path(file_root_nodes) return result # Start from the root or the specified directory start_rel_path = within_relative_path or "." return process_directory(start_rel_path) @staticmethod def _get_range_from_file_content(file_content: str) -> ls_types.Range: """ Get the range for the given file. """ lines = file_content.split("\n") end_line = len(lines) end_column = len(lines[-1]) return ls_types.Range(start=ls_types.Position(line=0, character=0), end=ls_types.Position(line=end_line, character=end_column)) def request_dir_overview(self, relative_dir_path: str) -> dict[str, list[UnifiedSymbolInformation]]: """ :return: A mapping of all relative paths analyzed to lists of top-level symbols in the corresponding file. """ symbol_tree = self.request_full_symbol_tree(relative_dir_path) # Initialize result dictionary result: dict[str, list[UnifiedSymbolInformation]] = defaultdict(list) # Helper function to process a symbol and its children def process_symbol(symbol: ls_types.UnifiedSymbolInformation): if symbol["kind"] == ls_types.SymbolKind.File: # For file symbols, process their children (top-level symbols) for child in symbol["children"]: # Handle cross-platform path resolution (fixes Docker/macOS path issues) absolute_path = Path(child["location"]["absolutePath"]).resolve() repository_root = Path(self.repository_root_path).resolve() # Try pathlib first, fallback to alternative approach if paths are incompatible try: path = absolute_path.relative_to(repository_root) except ValueError: # If paths are from different roots (e.g., /workspaces vs /Users), # use the relativePath from location if available, or extract from absolutePath if "relativePath" in child["location"] and child["location"]["relativePath"]: path = Path(child["location"]["relativePath"]) else: # Extract relative path by finding common structure # Example: /workspaces/.../test_repo/file.py -> test_repo/file.py path_parts = absolute_path.parts # Find the last common part or use a fallback if "test_repo" in path_parts: test_repo_idx = path_parts.index("test_repo") path = Path(*path_parts[test_repo_idx:]) else: # Last resort: use filename only path = Path(absolute_path.name) result[str(path)].append(child) # For package/directory symbols, process their children for child in symbol["children"]: process_symbol(child) # Process each root symbol for root in symbol_tree: process_symbol(root) return result def request_document_overview(self, relative_file_path: str) -> list[UnifiedSymbolInformation]: """ :return: the top-level symbols in the given file. """ _, document_roots = self.request_document_symbols(relative_file_path) return document_roots def request_overview(self, within_relative_path: str) -> dict[str, list[UnifiedSymbolInformation]]: """ An overview of all symbols in the given file or directory. :param within_relative_path: the relative path to the file or directory to get the overview of. :return: A mapping of all relative paths analyzed to lists of top-level symbols in the corresponding file. """ abs_path = (Path(self.repository_root_path) / within_relative_path).resolve() if not abs_path.exists(): raise FileNotFoundError(f"File or directory not found: {abs_path}") if abs_path.is_file(): symbols_overview = self.request_document_overview(within_relative_path) return {within_relative_path: symbols_overview} else: return self.request_dir_overview(within_relative_path) def request_hover(self, relative_file_path: str, line: int, column: int) -> ls_types.Hover | None: """ Raise a [textDocument/hover](https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#textDocument_hover) request to the Language Server to find the hover information at the given line and column in the given file. Wait for the response and return the result. :param relative_file_path: The relative path of the file that has the hover information :param line: The line number of the symbol :param column: The column number of the symbol :return None """ with self.open_file(relative_file_path): response = self.server.send.hover( { "textDocument": {"uri": pathlib.Path(os.path.join(self.repository_root_path, relative_file_path)).as_uri()}, "position": { "line": line, "character": column, }, } ) if response is None: return None assert isinstance(response, dict) return ls_types.Hover(**response) def retrieve_symbol_body(self, symbol: ls_types.UnifiedSymbolInformation | LSPTypes.DocumentSymbol | LSPTypes.SymbolInformation) -> str: """ Load the body of the given symbol. If the body is already contained in the symbol, just return it. """ existing_body = symbol.get("body", None) if existing_body: return existing_body assert "location" in symbol symbol_start_line = symbol["location"]["range"]["start"]["line"] symbol_end_line = symbol["location"]["range"]["end"]["line"] assert "relativePath" in symbol["location"] symbol_file = self.retrieve_full_file_content(symbol["location"]["relativePath"]) symbol_lines = symbol_file.split("\n") symbol_body = "\n".join(symbol_lines[symbol_start_line : symbol_end_line + 1]) # remove leading indentation symbol_start_column = symbol["location"]["range"]["start"]["character"] symbol_body = symbol_body[symbol_start_column:] return symbol_body def request_referencing_symbols( self, relative_file_path: str, line: int, column: int, include_imports: bool = True, include_self: bool = False, include_body: bool = False, include_file_symbols: bool = False, ) -> list[ReferenceInSymbol]: """ Finds all symbols that reference the symbol at the given location. This is similar to request_references but filters to only include symbols (functions, methods, classes, etc.) that reference the target symbol. :param relative_file_path: The relative path to the file. :param line: The 0-indexed line number. :param column: The 0-indexed column number. :param include_imports: whether to also include imports as references. Unfortunately, the LSP does not have an import type, so the references corresponding to imports will not be easily distinguishable from definitions. :param include_self: whether to include the references that is the "input symbol" itself. Only has an effect if the relative_file_path, line and column point to a symbol, for example a definition. :param include_body: whether to include the body of the symbols in the result. :param include_file_symbols: whether to include references that are file symbols. This is often a fallback mechanism for when the reference cannot be resolved to a symbol. :return: List of objects containing the symbol and the location of the reference. """ if not self.server_started: self.logger.log( "request_referencing_symbols called before Language Server started", logging.ERROR, ) raise SolidLSPException("Language Server not started") # First, get all references to the symbol references = self.request_references(relative_file_path, line, column) if not references: return [] # For each reference, find the containing symbol result = [] incoming_symbol = None for ref in references: ref_path = ref["relativePath"] ref_line = ref["range"]["start"]["line"] ref_col = ref["range"]["start"]["character"] with self.open_file(ref_path) as file_data: # Get the containing symbol for this reference containing_symbol = self.request_containing_symbol(ref_path, ref_line, ref_col, include_body=include_body) if containing_symbol is None: # TODO: HORRIBLE HACK! I don't know how to do it better for now... # THIS IS BOUND TO BREAK IN MANY CASES! IT IS ALSO SPECIFIC TO PYTHON! # Background: # When a variable is used to change something, like # # instance = MyClass() # instance.status = "new status" # # we can't find the containing symbol for the reference to `status` # since there is no container on the line of the reference # The hack is to try to find a variable symbol in the containing module # by using the text of the reference to find the variable name (In a very heuristic way) # and then look for a symbol with that name and kind Variable ref_text = file_data.contents.split("\n")[ref_line] if "." in ref_text: containing_symbol_name = ref_text.split(".")[0] all_symbols, _ = self.request_document_symbols(ref_path) for symbol in all_symbols: if symbol["name"] == containing_symbol_name and symbol["kind"] == ls_types.SymbolKind.Variable: containing_symbol = copy(symbol) containing_symbol["location"] = ref containing_symbol["range"] = ref["range"] break # We failed retrieving the symbol, falling back to creating a file symbol if containing_symbol is None and include_file_symbols: self.logger.log( f"Could not find containing symbol for {ref_path}:{ref_line}:{ref_col}. Returning file symbol instead", logging.WARNING, ) fileRange = self._get_range_from_file_content(file_data.contents) location = ls_types.Location( uri=str(pathlib.Path(os.path.join(self.repository_root_path, ref_path)).as_uri()), range=fileRange, absolutePath=str(os.path.join(self.repository_root_path, ref_path)), relativePath=ref_path, ) name = os.path.splitext(os.path.basename(ref_path))[0] if include_body: body = self.retrieve_full_file_content(ref_path) else: body = "" containing_symbol = ls_types.UnifiedSymbolInformation( kind=ls_types.SymbolKind.File, range=fileRange, selectionRange=fileRange, location=location, name=name, children=[], body=body, ) if containing_symbol is None or (not include_file_symbols and containing_symbol["kind"] == ls_types.SymbolKind.File): continue assert "location" in containing_symbol assert "selectionRange" in containing_symbol # Checking for self-reference if ( containing_symbol["location"]["relativePath"] == relative_file_path and containing_symbol["selectionRange"]["start"]["line"] == ref_line and containing_symbol["selectionRange"]["start"]["character"] == ref_col ): incoming_symbol = containing_symbol if include_self: result.append(ReferenceInSymbol(symbol=containing_symbol, line=ref_line, character=ref_col)) continue self.logger.log(f"Found self-reference for {incoming_symbol['name']}, skipping it since {include_self=}", logging.DEBUG) continue # checking whether reference is an import # This is neither really safe nor elegant, but if we don't do it, # there is no way to distinguish between definitions and imports as import is not a symbol-type # and we get the type referenced symbol resulting from imports... if ( not include_imports and incoming_symbol is not None and containing_symbol["name"] == incoming_symbol["name"] and containing_symbol["kind"] == incoming_symbol["kind"] ): self.logger.log( f"Found import of referenced symbol {incoming_symbol['name']}" f"in {containing_symbol['location']['relativePath']}, skipping", logging.DEBUG, ) continue result.append(ReferenceInSymbol(symbol=containing_symbol, line=ref_line, character=ref_col)) return result def request_containing_symbol( self, relative_file_path: str, line: int, column: int | None = None, strict: bool = False, include_body: bool = False, ) -> ls_types.UnifiedSymbolInformation | None: """ Finds the first symbol containing the position for the given file. For Python, container symbols are considered to be those with kinds corresponding to functions, methods, or classes (typically: Function (12), Method (6), Class (5)). The method operates as follows: - Request the document symbols for the file. - Filter symbols to those that start at or before the given line. - From these, first look for symbols whose range contains the (line, column). - If one or more symbols contain the position, return the one with the greatest starting position (i.e. the innermost container). - If none (strictly) contain the position, return the symbol with the greatest starting position among those above the given line. - If no container candidates are found, return None. :param relative_file_path: The relative path to the Python file. :param line: The 0-indexed line number. :param column: The 0-indexed column (also called character). If not passed, the lookup will be based only on the line. :param strict: If True, the position must be strictly within the range of the symbol. Setting to True is useful for example for finding the parent of a symbol, as with strict=False, and the line pointing to a symbol itself, the containing symbol will be the symbol itself (and not the parent). :param include_body: Whether to include the body of the symbol in the result. :return: The container symbol (if found) or None. """ # checking if the line is empty, unfortunately ugly and duplicating code, but I don't want to refactor with self.open_file(relative_file_path): absolute_file_path = str(PurePath(self.repository_root_path, relative_file_path)) content = FileUtils.read_file(self.logger, absolute_file_path) if content.split("\n")[line].strip() == "": self.logger.log( f"Passing empty lines to request_container_symbol is currently not supported, {relative_file_path=}, {line=}", logging.ERROR, ) return None symbols, _ = self.request_document_symbols(relative_file_path) # make jedi and pyright api compatible # the former has no location, the later has no range # we will just always add location of the desired format to all symbols for symbol in symbols: if "location" not in symbol: range = symbol["range"] location = ls_types.Location( uri=f"file:/{absolute_file_path}", range=range, absolutePath=absolute_file_path, relativePath=relative_file_path, ) symbol["location"] = location else: location = symbol["location"] assert "range" in location location["absolutePath"] = absolute_file_path location["relativePath"] = relative_file_path location["uri"] = Path(absolute_file_path).as_uri() # Allowed container kinds, currently only for Python container_symbol_kinds = {ls_types.SymbolKind.Method, ls_types.SymbolKind.Function, ls_types.SymbolKind.Class} def is_position_in_range(line: int, range_d: ls_types.Range) -> bool: start = range_d["start"] end = range_d["end"] column_condition = True if strict: line_condition = end["line"] >= line > start["line"] if column is not None and line == start["line"]: column_condition = column > start["character"] else: line_condition = end["line"] >= line >= start["line"] if column is not None and line == start["line"]: column_condition = column >= start["character"] return line_condition and column_condition # Only consider containers that are not one-liners (otherwise we may get imports) candidate_containers = [ s for s in symbols if s["kind"] in container_symbol_kinds and s["location"]["range"]["start"]["line"] != s["location"]["range"]["end"]["line"] ] var_containers = [s for s in symbols if s["kind"] == ls_types.SymbolKind.Variable] candidate_containers.extend(var_containers) if not candidate_containers: return None # From the candidates, find those whose range contains the given position. containing_symbols = [] for symbol in candidate_containers: s_range = symbol["location"]["range"] if not is_position_in_range(line, s_range): continue containing_symbols.append(symbol) if containing_symbols: # Return the one with the greatest starting position (i.e. the innermost container). containing_symbol = max(containing_symbols, key=lambda s: s["location"]["range"]["start"]["line"]) if include_body: containing_symbol["body"] = self.retrieve_symbol_body(containing_symbol) return containing_symbol else: return None def request_container_of_symbol( self, symbol: ls_types.UnifiedSymbolInformation, include_body: bool = False ) -> ls_types.UnifiedSymbolInformation | None: """ Finds the container of the given symbol if there is one. If the parent attribute is present, the parent is returned without further searching. :param symbol: The symbol to find the container of. :param include_body: whether to include the body of the symbol in the result. :return: The container of the given symbol or None if no container is found. """ if "parent" in symbol: return symbol["parent"] assert "location" in symbol, f"Symbol {symbol} has no location and no parent attribute" return self.request_containing_symbol( symbol["location"]["relativePath"], symbol["location"]["range"]["start"]["line"], symbol["location"]["range"]["start"]["character"], strict=True, include_body=include_body, ) def request_defining_symbol( self, relative_file_path: str, line: int, column: int, include_body: bool = False, ) -> ls_types.UnifiedSymbolInformation | None: """ Finds the symbol that defines the symbol at the given location. This method first finds the definition of the symbol at the given position, then retrieves the full symbol information for that definition. :param relative_file_path: The relative path to the file. :param line: The 0-indexed line number. :param column: The 0-indexed column number. :param include_body: whether to include the body of the symbol in the result. :return: The symbol information for the definition, or None if not found. """ if not self.server_started: self.logger.log( "request_defining_symbol called before Language Server started", logging.ERROR, ) raise SolidLSPException("Language Server not started") # Get the definition location(s) definitions = self.request_definition(relative_file_path, line, column) if not definitions: return None # Use the first definition location definition = definitions[0] def_path = definition["relativePath"] def_line = definition["range"]["start"]["line"] def_col = definition["range"]["start"]["character"] # Find the symbol at or containing this location defining_symbol = self.request_containing_symbol(def_path, def_line, def_col, strict=False, include_body=include_body) return defining_symbol @property def cache_path(self) -> Path: """ The path to the cache file for the document symbols. """ return ( Path(self.repository_root_path) / self._solidlsp_settings.project_data_relative_path / self.CACHE_FOLDER_NAME / self.language_id / "document_symbols_cache_v23-06-25.pkl" ) def save_cache(self): with self._cache_lock: if not self._cache_has_changed: self.logger.log("No changes to document symbols cache, skipping save", logging.DEBUG) return self.logger.log(f"Saving updated document symbols cache to {self.cache_path}", logging.INFO) self.cache_path.parent.mkdir(parents=True, exist_ok=True) try: with open(self.cache_path, "wb") as f: pickle.dump(self._document_symbols_cache, f) self._cache_has_changed = False except Exception as e: self.logger.log( f"Failed to save document symbols cache to {self.cache_path}: {e}. " "Note: this may have resulted in a corrupted cache file.", logging.ERROR, ) def load_cache(self): if not self.cache_path.exists(): return with self._cache_lock: self.logger.log(f"Loading document symbols cache from {self.cache_path}", logging.INFO) try: with open(self.cache_path, "rb") as f: self._document_symbols_cache = pickle.load(f) self.logger.log(f"Loaded {len(self._document_symbols_cache)} document symbols from cache.", logging.INFO) except Exception as e: # cache often becomes corrupt, so just skip loading it self.logger.log( f"Failed to load document symbols cache from {self.cache_path}: {e}. Possible cause: the cache file is corrupted. " "Check for any errors related to saving the cache in the logs.", logging.ERROR, ) def request_workspace_symbol(self, query: str) -> list[ls_types.UnifiedSymbolInformation] | None: """ Raise a [workspace/symbol](https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#workspace_symbol) request to the Language Server to find symbols across the whole workspace. Wait for the response and return the result. :param query: The query string to filter symbols by :return: A list of matching symbols """ response = self.server.send.workspace_symbol({"query": query}) if response is None: return None assert isinstance(response, list) ret: list[ls_types.UnifiedSymbolInformation] = [] for item in response: assert isinstance(item, dict) assert LSPConstants.NAME in item assert LSPConstants.KIND in item assert LSPConstants.LOCATION in item ret.append(ls_types.UnifiedSymbolInformation(**item)) return ret def start(self) -> "SolidLanguageServer": """ Starts the language server process and connects to it. Call shutdown when ready. :return: self for method chaining """ self.logger.log( f"Starting language server with language {self.language_server.language} for {self.language_server.repository_root_path}", logging.INFO, ) self._server_context = self._start_server_process() return self def stop(self, shutdown_timeout: float = 2.0) -> None: self._shutdown(timeout=shutdown_timeout) @property def language_server(self) -> Self: return self def is_running(self) -> bool: return self.server.is_running() ```