#
tokens: 49365/50000 24/111 files (page 2/5)
lines: on (toggle) GitHub
raw markdown copy reset
This is page 2 of 5. Use http://codebase.md/crowdstrike/falcon-mcp?lines=true&page={x} to view the full context.

# Directory Structure

```
├── .dockerignore
├── .env.dev.example
├── .env.example
├── .github
│   ├── dependabot.yml
│   ├── ISSUE_TEMPLATE
│   │   ├── bug.yaml
│   │   ├── config.yml
│   │   ├── feature-request.yaml
│   │   └── question.yaml
│   └── workflows
│       ├── docker-build-push.yml
│       ├── docker-build-test.yml
│       ├── markdown-lint.yml
│       ├── python-lint.yml
│       ├── python-test-e2e.yml
│       ├── python-test.yml
│       └── release.yml
├── .gitignore
├── .markdownlint.json
├── CHANGELOG.md
├── Dockerfile
├── docs
│   ├── CODE_OF_CONDUCT.md
│   ├── CONTRIBUTING.md
│   ├── deployment
│   │   ├── amazon_bedrock_agentcore.md
│   │   └── google_cloud.md
│   ├── e2e_testing.md
│   ├── module_development.md
│   ├── resource_development.md
│   └── SECURITY.md
├── examples
│   ├── adk
│   │   ├── adk_agent_operations.sh
│   │   ├── falcon_agent
│   │   │   ├── __init__.py
│   │   │   ├── agent.py
│   │   │   ├── env.properties
│   │   │   └── requirements.txt
│   │   └── README.md
│   ├── basic_usage.py
│   ├── mcp_config.json
│   ├── sse_usage.py
│   └── streamable_http_usage.py
├── falcon_mcp
│   ├── __init__.py
│   ├── client.py
│   ├── common
│   │   ├── __init__.py
│   │   ├── api_scopes.py
│   │   ├── errors.py
│   │   ├── logging.py
│   │   └── utils.py
│   ├── modules
│   │   ├── __init__.py
│   │   ├── base.py
│   │   ├── cloud.py
│   │   ├── detections.py
│   │   ├── discover.py
│   │   ├── hosts.py
│   │   ├── idp.py
│   │   ├── incidents.py
│   │   ├── intel.py
│   │   ├── sensor_usage.py
│   │   ├── serverless.py
│   │   └── spotlight.py
│   ├── registry.py
│   ├── resources
│   │   ├── __init__.py
│   │   ├── cloud.py
│   │   ├── detections.py
│   │   ├── discover.py
│   │   ├── hosts.py
│   │   ├── incidents.py
│   │   ├── intel.py
│   │   ├── sensor_usage.py
│   │   ├── serverless.py
│   │   └── spotlight.py
│   └── server.py
├── LICENSE
├── pyproject.toml
├── README.md
├── scripts
│   ├── generate_e2e_report.py
│   └── test_results_viewer.html
├── SUPPORT.md
├── tests
│   ├── __init__.py
│   ├── common
│   │   ├── __init__.py
│   │   ├── test_api_scopes.py
│   │   ├── test_errors.py
│   │   ├── test_logging.py
│   │   └── test_utils.py
│   ├── conftest.py
│   ├── e2e
│   │   ├── __init__.py
│   │   ├── modules
│   │   │   ├── __init__.py
│   │   │   ├── test_cloud.py
│   │   │   ├── test_detections.py
│   │   │   ├── test_discover.py
│   │   │   ├── test_hosts.py
│   │   │   ├── test_idp.py
│   │   │   ├── test_incidents.py
│   │   │   ├── test_intel.py
│   │   │   ├── test_sensor_usage.py
│   │   │   ├── test_serverless.py
│   │   │   └── test_spotlight.py
│   │   └── utils
│   │       ├── __init__.py
│   │       └── base_e2e_test.py
│   ├── modules
│   │   ├── __init__.py
│   │   ├── test_base.py
│   │   ├── test_cloud.py
│   │   ├── test_detections.py
│   │   ├── test_discover.py
│   │   ├── test_hosts.py
│   │   ├── test_idp.py
│   │   ├── test_incidents.py
│   │   ├── test_intel.py
│   │   ├── test_sensor_usage.py
│   │   ├── test_serverless.py
│   │   ├── test_spotlight.py
│   │   └── utils
│   │       └── test_modules.py
│   ├── test_client.py
│   ├── test_registry.py
│   ├── test_server.py
│   └── test_streamable_http_transport.py
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/tests/modules/test_spotlight.py:
--------------------------------------------------------------------------------

```python
  1 | """
  2 | Tests for the Spotlight module.
  3 | """
  4 | 
  5 | import unittest
  6 | 
  7 | from falcon_mcp.modules.spotlight import SpotlightModule
  8 | from tests.modules.utils.test_modules import TestModules
  9 | 
 10 | 
 11 | class TestSpotlightModule(TestModules):
 12 |     """Test cases for the Spotlight module."""
 13 | 
 14 |     def setUp(self):
 15 |         """Set up test fixtures."""
 16 |         self.setup_module(SpotlightModule)
 17 | 
 18 |     def test_register_tools(self):
 19 |         """Test registering tools with the server."""
 20 |         expected_tools = [
 21 |             "falcon_search_vulnerabilities",
 22 |         ]
 23 |         self.assert_tools_registered(expected_tools)
 24 | 
 25 |     def test_register_resources(self):
 26 |         """Test registering resources with the server."""
 27 |         expected_resources = [
 28 |             "falcon_search_vulnerabilities_fql_guide",
 29 |         ]
 30 |         self.assert_resources_registered(expected_resources)
 31 | 
 32 |     def test_search_vulnerabilities_success(self):
 33 |         """Test searching vulnerabilities with successful response."""
 34 |         # Setup mock response with sample vulnerability data
 35 |         mock_response = {
 36 |             "status_code": 200,
 37 |             "body": {
 38 |                 "resources": [
 39 |                     {
 40 |                         "cve_id": "CVE-2023-12345",
 41 |                         "status": "open",
 42 |                         "severity": "HIGH",
 43 |                         "cvss_base_score": 8.5,
 44 |                         "created_timestamp": "2023-08-01T12:00:00Z",
 45 |                         "updated_timestamp": "2023-08-02T14:30:00Z",
 46 |                         "host_info": {
 47 |                             "hostname": "test-server",
 48 |                             "os_version": "Ubuntu 22.04"
 49 |                         }
 50 |                     }
 51 |                 ]
 52 |             },
 53 |         }
 54 |         self.mock_client.command.return_value = mock_response
 55 | 
 56 |         # Call search_vulnerabilities with test parameters
 57 |         result = self.module.search_vulnerabilities(filter="status:'open'")
 58 | 
 59 |         # Verify client command was called correctly
 60 |         self.assertEqual(self.mock_client.command.call_count, 1)
 61 |         call_args = self.mock_client.command.call_args
 62 |         self.assertEqual(call_args[0][0], "combinedQueryVulnerabilities")
 63 |         
 64 |         # Check that the parameters dictionary contains the expected filter
 65 |         params = call_args[1]["parameters"]
 66 |         self.assertEqual(params["filter"], "status:'open'")
 67 | 
 68 |         # Verify result contains expected values
 69 |         self.assertEqual(len(result), 1)
 70 |         self.assertEqual(result[0]["cve_id"], "CVE-2023-12345")
 71 |         self.assertEqual(result[0]["severity"], "HIGH")
 72 |         self.assertEqual(result[0]["status"], "open")
 73 |         self.assertEqual(result[0]["cvss_base_score"], 8.5)
 74 | 
 75 |     def test_search_vulnerabilities_no_filter(self):
 76 |         """Test searching vulnerabilities with no filter parameter."""
 77 |         # Setup mock response with sample vulnerability data
 78 |         mock_response = {
 79 |             "status_code": 200,
 80 |             "body": {
 81 |                 "resources": [
 82 |                     {
 83 |                         "cve_id": "CVE-2023-12345",
 84 |                         "status": "open",
 85 |                         "severity": "HIGH"
 86 |                     }
 87 |                 ]
 88 |             },
 89 |         }
 90 |         self.mock_client.command.return_value = mock_response
 91 | 
 92 |         # Call search_vulnerabilities with no filter
 93 |         result = self.module.search_vulnerabilities()
 94 | 
 95 |         # Verify client command was called with the correct operation
 96 |         self.assertEqual(self.mock_client.command.call_count, 1)
 97 |         call_args = self.mock_client.command.call_args
 98 |         self.assertEqual(call_args[0][0], "combinedQueryVulnerabilities")
 99 | 
100 |         # Verify result contains expected values
101 |         self.assertEqual(len(result), 1)
102 |         self.assertEqual(result[0]["cve_id"], "CVE-2023-12345")
103 | 
104 |     def test_search_vulnerabilities_empty_response(self):
105 |         """Test searching vulnerabilities with empty response."""
106 |         # Setup mock response with empty resources
107 |         mock_response = {"status_code": 200, "body": {"resources": []}}
108 |         self.mock_client.command.return_value = mock_response
109 | 
110 |         # Call search_vulnerabilities
111 |         result = self.module.search_vulnerabilities(filter="status:'closed'")
112 | 
113 |         # Verify client command was called with the correct operation
114 |         self.assertEqual(self.mock_client.command.call_count, 1)
115 |         call_args = self.mock_client.command.call_args
116 |         self.assertEqual(call_args[0][0], "combinedQueryVulnerabilities")
117 | 
118 |         # Verify result is an empty list
119 |         self.assertEqual(result, [])
120 | 
121 |     def test_search_vulnerabilities_error(self):
122 |         """Test searching vulnerabilities with API error."""
123 |         # Setup mock response with error
124 |         mock_response = {
125 |             "status_code": 400,
126 |             "body": {"errors": [{"message": "Invalid query"}]},
127 |         }
128 |         self.mock_client.command.return_value = mock_response
129 | 
130 |         # Call search_vulnerabilities
131 |         results = self.module.search_vulnerabilities(filter="invalid query")
132 |         result = results[0]
133 | 
134 |         # Verify result contains error
135 |         self.assertIn("error", result)
136 |         self.assertIn("details", result)
137 |         # Check that the error message starts with the expected prefix
138 |         self.assertTrue(result["error"].startswith("Failed to search vulnerabilities"))
139 | 
140 | 
# Allow running this test file directly as a script via unittest's runner.
if __name__ == "__main__":
    unittest.main()
143 | 
```

--------------------------------------------------------------------------------
/tests/modules/test_sensor_usage.py:
--------------------------------------------------------------------------------

```python
  1 | """
  2 | Tests for the Sensor Usage module.
  3 | """
  4 | 
  5 | import unittest
  6 | 
  7 | from falcon_mcp.modules.sensor_usage import SensorUsageModule
  8 | from tests.modules.utils.test_modules import TestModules
  9 | 
 10 | 
 11 | class TestSensorUsageModule(TestModules):
 12 |     """Test cases for the Sensor Usage module."""
 13 | 
 14 |     def setUp(self):
 15 |         """Set up test fixtures."""
 16 |         self.setup_module(SensorUsageModule)
 17 | 
 18 |     def test_register_tools(self):
 19 |         """Test registering tools with the server."""
 20 |         expected_tools = [
 21 |             "falcon_search_sensor_usage",
 22 |         ]
 23 |         self.assert_tools_registered(expected_tools)
 24 | 
 25 |     def test_register_resources(self):
 26 |         """Test registering resources with the server."""
 27 |         expected_resources = [
 28 |             "falcon_search_sensor_usage_fql_guide",
 29 |         ]
 30 |         self.assert_resources_registered(expected_resources)
 31 | 
 32 |     def test_search_sensor_usage_success(self):
 33 |         """Test searching sensor usage with successful response."""
 34 |         # Setup mock response with sample sensor usage data
 35 |         mock_response = {
 36 |             "status_code": 200,
 37 |             "body": {
 38 |                 "resources": [
 39 |                     {
 40 |                         "containers": 42.5,
 41 |                         "public_cloud_with_containers": 42,
 42 |                         "public_cloud_without_containers": 42.75,
 43 |                         "servers_with_containers": 42.25,
 44 |                         "servers_without_containers": 42.75,
 45 |                         "workstations": 42.75,
 46 |                         "mobile": 42.75,
 47 |                         "lumos": 42.25,
 48 |                         "chrome_os": 0,
 49 |                         "date": "2025-08-02"
 50 |                     }
 51 |                 ]
 52 |             },
 53 |         }
 54 |         self.mock_client.command.return_value = mock_response
 55 | 
 56 |         # Call search_sensor_usage with test parameters
 57 |         result = self.module.search_sensor_usage(filter="event_date:'2025-08-02'")
 58 | 
 59 |         # Verify client command was called correctly
 60 |         self.mock_client.command.assert_called_once_with(
 61 |             "GetSensorUsageWeekly",
 62 |             parameters={
 63 |                 "filter": "event_date:'2025-08-02'",
 64 |             },
 65 |         )
 66 | 
 67 |         # Verify result contains expected values
 68 |         self.assertEqual(len(result), 1)
 69 |         self.assertEqual(result[0]["date"], "2025-08-02")
 70 |         self.assertEqual(result[0]["containers"], 42.5)
 71 |         self.assertEqual(result[0]["workstations"], 42.75)
 72 |         self.assertEqual(result[0]["mobile"], 42.75)
 73 | 
 74 |     def test_search_sensor_usage_no_filter(self):
 75 |         """Test searching sensor usage with no filter parameter."""
 76 |         # Setup mock response with sample sensor usage data
 77 |         mock_response = {
 78 |             "status_code": 200,
 79 |             "body": {
 80 |                 "resources": [
 81 |                     {
 82 |                         "containers": 42.5,
 83 |                         "public_cloud_with_containers": 42,
 84 |                         "public_cloud_without_containers": 42.75,
 85 |                         "servers_with_containers": 42.25,
 86 |                         "servers_without_containers": 42.75,
 87 |                         "workstations": 42.75,
 88 |                         "mobile": 42.75,
 89 |                         "lumos": 42.25,
 90 |                         "chrome_os": 0,
 91 |                         "date": "2025-08-02"
 92 |                     }
 93 |                 ]
 94 |             },
 95 |         }
 96 |         self.mock_client.command.return_value = mock_response
 97 | 
 98 |         # Call search_sensor_usage with no filter
 99 |         result = self.module.search_sensor_usage()
100 | 
101 |         # Verify client command was called with the correct operation
102 |         self.assertEqual(self.mock_client.command.call_count, 1)
103 |         call_args = self.mock_client.command.call_args
104 |         self.assertEqual(call_args[0][0], "GetSensorUsageWeekly")
105 | 
106 |         # Verify result contains expected values
107 |         self.assertEqual(len(result), 1)
108 |         self.assertEqual(result[0]["date"], "2025-08-02")
109 | 
110 |     def test_search_sensor_usage_empty_response(self):
111 |         """Test searching sensor usage with empty response."""
112 |         # Setup mock response with empty resources
113 |         mock_response = {"status_code": 200, "body": {"resources": []}}
114 |         self.mock_client.command.return_value = mock_response
115 | 
116 |         # Call search_sensor_usage
117 |         result = self.module.search_sensor_usage(filter="event_date:'2025-08-02'")
118 | 
119 |         # Verify client command was called with the correct operation
120 |         self.assertEqual(self.mock_client.command.call_count, 1)
121 |         call_args = self.mock_client.command.call_args
122 |         self.assertEqual(call_args[0][0], "GetSensorUsageWeekly")
123 | 
124 |         # Verify result is an empty list
125 |         self.assertEqual(result, [])
126 | 
127 |     def test_search_sensor_usage_error(self):
128 |         """Test searching sensor usage with API error."""
129 |         # Setup mock response with error
130 |         mock_response = {
131 |             "status_code": 400,
132 |             "body": {"errors": [{"message": "Invalid query"}]},
133 |         }
134 |         self.mock_client.command.return_value = mock_response
135 | 
136 |         # Call search_sensor_usage
137 |         results = self.module.search_sensor_usage(filter="invalid query")
138 |         result = results[0]
139 | 
140 |         # Verify result contains error
141 |         self.assertIn("error", result)
142 |         self.assertIn("details", result)
143 |         # Check that the error message starts with the expected prefix
144 |         self.assertTrue(result["error"].startswith("Failed to search sensor usage"))
145 | 
146 | 
# Allow running this test file directly as a script via unittest's runner.
if __name__ == "__main__":
    unittest.main()
149 | 
```

--------------------------------------------------------------------------------
/falcon_mcp/modules/serverless.py:
--------------------------------------------------------------------------------

```python
  1 | """
  2 | Serverless Vulnerabilities module for Falcon MCP Server
  3 | 
  4 | This module provides tools for accessing and managing CrowdStrike Falcon Serverless Vulnerabilities.
  5 | """
  6 | 
  7 | from textwrap import dedent
  8 | from typing import Any, Dict, List
  9 | 
 10 | from mcp.server import FastMCP
 11 | from mcp.server.fastmcp.resources import TextResource
 12 | from pydantic import AnyUrl, Field
 13 | 
 14 | from falcon_mcp.common.errors import handle_api_response
 15 | from falcon_mcp.common.logging import get_logger
 16 | from falcon_mcp.common.utils import prepare_api_parameters
 17 | from falcon_mcp.modules.base import BaseModule
 18 | from falcon_mcp.resources.serverless import SERVERLESS_VULNERABILITIES_FQL_DOCUMENTATION
 19 | 
# Module-level logger, named after this module and obtained from the project's logging helper.
logger = get_logger(__name__)
 21 | 
 22 | 
 23 | class ServerlessModule(BaseModule):
 24 |     """Module for accessing and managing CrowdStrike Falcon Serverless Vulnerabilities."""
 25 | 
 26 |     def register_tools(self, server: FastMCP) -> None:
 27 |         """Register tools with the MCP server.
 28 | 
 29 |         Args:
 30 |             server: MCP server instance
 31 |         """
 32 |         # Register tools
 33 |         self._add_tool(
 34 |             server=server,
 35 |             method=self.search_serverless_vulnerabilities,
 36 |             name="search_serverless_vulnerabilities",
 37 |         )
 38 | 
 39 |     def register_resources(self, server: FastMCP) -> None:
 40 |         """Register resources with the MCP server.
 41 | 
 42 |         Args:
 43 |             server: MCP server instance
 44 |         """
 45 |         serverless_vulnerabilities_fql_resource = TextResource(
 46 |             uri=AnyUrl("falcon://serverless/vulnerabilities/fql-guide"),
 47 |             name="falcon_serverless_vulnerabilities_fql_guide",
 48 |             description="Contains the guide for the `filter` param of the `falcon_search_serverless_vulnerabilities` tool.",
 49 |             text=SERVERLESS_VULNERABILITIES_FQL_DOCUMENTATION,
 50 |         )
 51 | 
 52 |         self._add_resource(
 53 |             server,
 54 |             serverless_vulnerabilities_fql_resource,
 55 |         )
 56 | 
 57 |     def search_serverless_vulnerabilities(
 58 |         self,
 59 |         filter: str = Field(
 60 |             description="FQL Syntax formatted string used to limit the results. IMPORTANT: use the `falcon://serverless/vulnerabilities/fql-guide` resource when building this filter parameter.",
 61 |             examples={"cloud_provider:'aws'", "severity:'HIGH'"},
 62 |         ),
 63 |         limit: int | None = Field(
 64 |             default=10,
 65 |             ge=1,
 66 |             description="The upper-bound on the number of records to retrieve. (Default: 10)",
 67 |         ),
 68 |         offset: int | None = Field(
 69 |             default=0,
 70 |             description="The offset from where to begin.",
 71 |         ),
 72 |         sort: str | None = Field(
 73 |             default=None,
 74 |             description=dedent("""
 75 |                 Sort serverless vulnerabilities using FQL syntax.
 76 | 
 77 |                 Supported sorting fields:
 78 |                 • application_name: Name of the application
 79 |                 • application_name_version: Version of the application
 80 |                 • cid: Customer ID
 81 |                 • cloud_account_id: Cloud account ID
 82 |                 • cloud_account_name: Cloud account name
 83 |                 • cloud_provider: Cloud provider
 84 |                 • cve_id: CVE ID
 85 |                 • cvss_base_score: CVSS base score
 86 |                 • exprt_rating: ExPRT rating
 87 |                 • first_seen_timestamp: When the vulnerability was first seen
 88 |                 • function_resource_id: Function resource ID
 89 |                 • is_supported: Whether the function is supported
 90 |                 • layer: Layer where the vulnerability was found
 91 |                 • region: Cloud region
 92 |                 • runtime: Runtime environment
 93 |                 • severity: Severity level
 94 |                 • timestamp: When the vulnerability was last updated
 95 |                 • type: Type of vulnerability
 96 | 
 97 |                 Format: 'field'
 98 | 
 99 |                 Examples: 'severity', 'cloud_provider', 'first_seen_timestamp'
100 |             """).strip(),
101 |             examples={
102 |                 "severity",
103 |                 "cloud_provider",
104 |                 "first_seen_timestamp",
105 |             },
106 |         ),
107 |     ) -> List[Dict[str, Any]]:
108 |         """Search for vulnerabilities in your serverless functions across all cloud service providers.
109 | 
110 |         This endpoint provides security information in SARIF format, including:
111 |         - CVE IDs for identified vulnerabilities
112 |         - Severity levels
113 |         - Vulnerability descriptions
114 |         - Additional relevant details
115 | 
116 |         IMPORTANT: You must use the `falcon://serverless/vulnerabilities/fql-guide` resource when you need to use the `filter` parameter.
117 |         This resource contains the guide on how to build the FQL `filter` parameter for the `falcon_search_serverless_vulnerabilities` tool.
118 |         """
119 |         # Prepare parameters for GetCombinedVulnerabilitiesSARIF
120 |         params = prepare_api_parameters(
121 |             {
122 |                 "filter": filter,
123 |                 "limit": limit,
124 |                 "offset": offset,
125 |                 "sort": sort,
126 |             }
127 |         )
128 | 
129 |         # Define the operation name
130 |         operation = "GetCombinedVulnerabilitiesSARIF"
131 | 
132 |         logger.debug("Searching serverless vulnerabilities with params: %s", params)
133 | 
134 |         # Make the API request
135 |         response = self.client.command(operation, parameters=params)
136 | 
137 |         # Use handle_api_response to get vulnerability data
138 |         vulnerabilities = handle_api_response(
139 |             response,
140 |             operation=operation,
141 |             error_message="Failed to search serverless vulnerabilities",
142 |         )
143 | 
144 |         # If handle_api_response returns an error dict instead of a list,
145 |         # it means there was an error, so we return it wrapped in a list
146 |         if self._is_error(vulnerabilities):
147 |             return [vulnerabilities]
148 | 
149 |         return vulnerabilities.get("runs") or []
150 | 
```

--------------------------------------------------------------------------------
/scripts/generate_e2e_report.py:
--------------------------------------------------------------------------------

```python
  1 | """
  2 | Generate a static HTML report from test result data
  3 | """
  4 | 
  5 | import json
  6 | import re
  7 | import sys
  8 | from html import escape
  9 | 
 10 | 
 11 | def generate_static_report(
 12 |     data,
 13 |     template_path="scripts/test_results_viewer.html",
 14 |     output_path="static_test_report.html",
 15 | ):
 16 |     """
 17 |     Generates a static HTML report from test result data.
 18 | 
 19 |     Args:
 20 |         data (list): A list of test result dictionaries.
 21 |         template_path (str): The path to the HTML template file to extract styles from.
 22 |         output_path (str): The path to write the final static HTML file.
 23 |     """
 24 |     try:
 25 |         with open(template_path, "r", encoding="utf-8") as f:
 26 |             html_template = f.read()
 27 |         style_content = re.search(
 28 |             r"<style>(.*?)</style>", html_template, re.DOTALL
 29 |         ).group(1)
 30 |     except (FileNotFoundError, AttributeError):
 31 |         print(
 32 |             f"Warning: Could not read styles from {template_path}. Using default styles."
 33 |         )
 34 |         style_content = "body { font-family: sans-serif; } /* Basic fallback styles */"
 35 | 
 36 |     # --- Group and process data ---
 37 |     total_runs = len(data)
 38 |     successful_runs = sum(1 for run in data if run.get("status") == "success")
 39 |     success_rate = (successful_runs / total_runs * 100) if total_runs > 0 else 0
 40 | 
 41 |     # Group first by module, then by test name
 42 |     grouped_by_module = {}
 43 |     for run in data:
 44 |         module_name = run.get("module_name", "Unknown Module")
 45 |         test_name = run.get("test_name", "Unnamed Test")
 46 | 
 47 |         if module_name not in grouped_by_module:
 48 |             grouped_by_module[module_name] = {}
 49 | 
 50 |         if test_name not in grouped_by_module[module_name]:
 51 |             grouped_by_module[module_name][test_name] = []
 52 | 
 53 |         grouped_by_module[module_name][test_name].append(run)
 54 | 
 55 |     # Further group by model within each test
 56 |     for module_name, tests in grouped_by_module.items():
 57 |         for test_name, runs in tests.items():
 58 |             grouped_by_model = {}
 59 |             for run in runs:
 60 |                 model_name = run.get("model_name", "Unnamed Model")
 61 |                 grouped_by_model.setdefault(model_name, []).append(run)
 62 |             grouped_by_module[module_name][test_name] = grouped_by_model
 63 | 
 64 |     # --- Build HTML body content ---
 65 |     body_content = f"""
 66 |     <h1>MCP E2E Static Test Report</h1>
 67 |     <div id="summary">
 68 |         <h2>Summary</h2>
 69 |         <p>Total Tests Run: <span>{total_runs}</span></p>
 70 |         <p>Success Rate: <span>{success_rate:.2f}%</span></p>
 71 |     </div>
 72 |     <div id="results-container">
 73 |     """
 74 | 
 75 |     for module_name, tests in sorted(grouped_by_module.items()):
 76 |         body_content += f'<div class="module-group"><h2>{escape(module_name)}</h2>'
 77 |         for test_name, models in sorted(tests.items()):
 78 |             body_content += f'<div class="test-group"><h3>{escape(test_name)}</h3>'
 79 |             for model_name, runs in sorted(models.items()):
 80 |                 body_content += (
 81 |                     f'<div class="model-group"><h4>{escape(model_name)}</h4>'
 82 |                 )
 83 |                 body_content += '<div class="run-grid">'
 84 |                 for run in sorted(runs, key=lambda x: x.get("run_number", 0)):
 85 |                     status_class = escape(run.get("status", "unknown"))
 86 |                     run_html = f"""
 87 |                     <div class="test-run {status_class}">
 88 |                         <h5>Run {run.get("run_number", "#")} - {status_class.upper()}</h5>
 89 |                     """
 90 |                     if status_class == "failure" and run.get("failure_reason"):
 91 |                         reason = escape(run["failure_reason"])
 92 |                         run_html += f'<p><strong>Failure Reason:</strong></p><pre class="failure-reason"><code>{reason}</code></pre>'
 93 | 
 94 |                     agent_result = escape(
 95 |                         run.get("agent_result", "No result") or "No result"
 96 |                     )
 97 |                     run_html += f"""
 98 |                         <details>
 99 |                             <summary>Agent Result</summary>
100 |                             <div class="agent-result"><pre><code>{agent_result}</code></pre></div>
101 |                         </details>
102 |                     """
103 | 
104 |                     if run.get("tools_used"):
105 |                         tools_json = escape(json.dumps(run["tools_used"], indent=2))
106 |                         run_html += f"""
107 |                             <details>
108 |                                 <summary>Tools Used ({len(run["tools_used"])})</summary>
109 |                                 <div class="tools-content"><pre><code>{tools_json}</code></pre></div>
110 |                             </details>
111 |                         """
112 |                     else:
113 |                         run_html += "<p>No tools were used.</p>"
114 | 
115 |                     run_html += "</div>"
116 |                     body_content += run_html
117 |                 body_content += "</div></div>"
118 |             body_content += "</div>"
119 |         body_content += "</div>"
120 |     body_content += "</div>"
121 | 
122 |     # --- Assemble the final HTML ---
123 |     full_html = f"""
124 | <!DOCTYPE html>
125 | <html lang="en">
126 | <head>
127 |     <meta charset="UTF-8">
128 |     <meta name="viewport" content="width=device-width, initial-scale=1.0">
129 |     <title>Static Test Results</title>
130 |     <style>{style_content}</style>
131 | </head>
132 | <body>
133 |     {body_content}
134 | </body>
135 | </html>
136 | """
137 |     with open(output_path, "w", encoding="utf-8") as f:
138 |         f.write(full_html)
139 |     print(f"Successfully generated static report: {output_path}")
140 | 
141 | 
if __name__ == "__main__":
    # Results file to render; the first CLI argument overrides the default.
    test_results_path = sys.argv[1] if len(sys.argv) > 1 else "test_results.json"
    try:
        with open(test_results_path, "r", encoding="utf-8") as f:
            test_data = json.load(f)
    except FileNotFoundError:
        # Report the path that was actually looked up, not a hard-coded name
        print(f"Error: {test_results_path} not found. Please run the tests first.")
    except json.JSONDecodeError:
        print(
            f"Error: Could not parse {test_results_path}. The file might be corrupted or empty."
        )
    else:
        # Kept outside the try so that errors raised while writing the report
        # are not misreported as a missing or corrupt results file.
        generate_static_report(test_data)
154 | 
```

--------------------------------------------------------------------------------
/tests/common/test_errors.py:
--------------------------------------------------------------------------------

```python
  1 | """
  2 | Tests for the error handling utilities.
  3 | """
  4 | 
  5 | import unittest
  6 | from unittest.mock import patch
  7 | 
  8 | from falcon_mcp.common.api_scopes import API_SCOPE_REQUIREMENTS, get_required_scopes
  9 | from falcon_mcp.common.errors import (
 10 |     APIError,
 11 |     AuthenticationError,
 12 |     FalconError,
 13 |     _format_error_response,
 14 |     handle_api_response,
 15 |     is_success_response,
 16 | )
 17 | 
 18 | 
 19 | class TestErrorClasses(unittest.TestCase):
 20 |     """Test cases for the error classes."""
 21 | 
 22 |     def test_falcon_error(self):
 23 |         """Test FalconError class."""
 24 |         error = FalconError("Test error")
 25 |         self.assertEqual(str(error), "Test error")
 26 | 
 27 |     def test_authentication_error(self):
 28 |         """Test AuthenticationError class."""
 29 |         error = AuthenticationError("Authentication failed")
 30 |         self.assertEqual(str(error), "Authentication failed")
 31 |         self.assertIsInstance(error, FalconError)
 32 | 
 33 |     def test_api_error(self):
 34 |         """Test APIError class."""
 35 |         error = APIError(
 36 |             "API request failed",
 37 |             status_code=403,
 38 |             body={"errors": [{"message": "Access denied"}]},
 39 |             operation="TestOperation",
 40 |         )
 41 |         self.assertEqual(str(error), "API request failed")
 42 |         self.assertEqual(error.status_code, 403)
 43 |         self.assertEqual(error.body, {"errors": [{"message": "Access denied"}]})
 44 |         self.assertEqual(error.operation, "TestOperation")
 45 |         self.assertIsInstance(error, FalconError)
 46 | 
 47 | 
 48 | class TestErrorUtils(unittest.TestCase):
 49 |     """Test cases for the error utility functions."""
 50 | 
 51 |     def test_is_success_response(self):
 52 |         """Test is_success_response function."""
 53 |         # Success response
 54 |         self.assertTrue(is_success_response({"status_code": 200}))
 55 | 
 56 |         # Error responses
 57 |         self.assertFalse(is_success_response({"status_code": 400}))
 58 |         self.assertFalse(is_success_response({"status_code": 403}))
 59 |         self.assertFalse(is_success_response({"status_code": 500}))
 60 |         self.assertFalse(is_success_response({}))  # Missing status_code
 61 | 
 62 |     def test_get_required_scopes(self):
 63 |         """Test get_required_scopes function."""
 64 |         # Known operation
 65 |         self.assertEqual(get_required_scopes("GetQueriesAlertsV2"), ["Alerts:read"])
 66 | 
 67 |         # Unknown operation
 68 |         self.assertEqual(get_required_scopes("UnknownOperation"), [])
 69 | 
 70 |     @patch("falcon_mcp.common.errors.logger")
 71 |     def test_format_error_response(self, mock_logger):
 72 |         """Test format_error_response function."""
 73 |         # Basic error
 74 |         response = _format_error_response("Test error")
 75 |         self.assertEqual(response, {"error": "Test error"})
 76 |         mock_logger.error.assert_called_with("Error: %s", "Test error")
 77 | 
 78 |         # Error with details
 79 |         details = {"status_code": 400, "body": {"errors": [{"message": "Bad request"}]}}
 80 |         response = _format_error_response("Test error", details=details)
 81 |         self.assertEqual(response["error"], "Test error")
 82 |         self.assertEqual(response["details"], details)
 83 | 
 84 |         # Permission error with operation
 85 |         details = {
 86 |             "status_code": 403,
 87 |             "body": {"errors": [{"message": "Access denied"}]},
 88 |         }
 89 |         response = _format_error_response(
 90 |             "Permission denied", details=details, operation="GetQueriesAlertsV2"
 91 |         )
 92 |         self.assertEqual(response["error"], "Permission denied")
 93 |         self.assertEqual(response["details"], details)
 94 |         self.assertEqual(response["required_scopes"], ["Alerts:read"])
 95 |         self.assertIn("resolution", response)
 96 |         self.assertIn("Alerts:read", response["resolution"])
 97 | 
 98 |     def test_handle_api_response_success(self):
 99 |         """Test handle_api_response function with success response."""
100 |         # Success response with resources
101 |         response = {
102 |             "status_code": 200,
103 |             "body": {"resources": [{"id": "test", "name": "Test Resource"}]},
104 |         }
105 |         result = handle_api_response(response, "TestOperation")
106 |         self.assertEqual(result, [{"id": "test", "name": "Test Resource"}])
107 | 
108 |         # Success response with empty resources
109 |         response = {"status_code": 200, "body": {"resources": []}}
110 |         result = handle_api_response(response, "TestOperation")
111 |         self.assertEqual(result, [])
112 | 
113 |         # Success response with empty resources and default
114 |         response = {"status_code": 200, "body": {"resources": []}}
115 |         result = handle_api_response(
116 |             response, "TestOperation", default_result={"default": True}
117 |         )
118 |         self.assertEqual(result, {"default": True})
119 | 
120 |     def test_handle_api_response_error(self):
121 |         """Test handle_api_response function with error response."""
122 |         # Error response
123 |         response = {
124 |             "status_code": 400,
125 |             "body": {"errors": [{"message": "Bad request"}]},
126 |         }
127 |         result = handle_api_response(
128 |             response,
129 |             "TestOperation",
130 |             error_message="Test failed",
131 |         )
132 |         self.assertIn("error", result)
133 |         self.assertIn("Test failed", result["error"])
134 |         self.assertEqual(result["details"], response)
135 | 
136 |         # Permission error
137 |         response = {
138 |             "status_code": 403,
139 |             "body": {"errors": [{"message": "Access denied"}]},
140 |         }
141 |         # Add a test operation to API_SCOPE_REQUIREMENTS
142 |         original_scopes = API_SCOPE_REQUIREMENTS.copy()
143 |         API_SCOPE_REQUIREMENTS["TestOperation"] = ["test:read"]
144 | 
145 |         try:
146 |             result = handle_api_response(
147 |                 response,
148 |                 "TestOperation",
149 |                 error_message="Permission denied",
150 |             )
151 |             self.assertIn("error", result)
152 |             self.assertIn("Permission denied", result["error"])
153 |             self.assertIn("Required scopes: test:read", result["error"])
154 |             self.assertEqual(result["details"], response)
155 |         finally:
156 |             # Restore original API_SCOPE_REQUIREMENTS
157 |             API_SCOPE_REQUIREMENTS.clear()
158 |             API_SCOPE_REQUIREMENTS.update(original_scopes)
159 | 
160 | 
# Run this module's tests when the file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()
163 | 
```

--------------------------------------------------------------------------------
/falcon_mcp/common/utils.py:
--------------------------------------------------------------------------------

```python
  1 | """
  2 | Common utility functions for Falcon MCP Server
  3 | 
  4 | This module provides common utility functions for the Falcon MCP server.
  5 | """
  6 | 
  7 | import re
  8 | from typing import Any, Dict, List, Optional, Tuple
  9 | 
 10 | from .errors import _format_error_response, is_success_response
 11 | from .logging import get_logger
 12 | 
 13 | logger = get_logger(__name__)
 14 | 
 15 | 
 16 | def filter_none_values(data: Dict[str, Any]) -> Dict[str, Any]:
 17 |     """Remove None values from a dictionary.
 18 | 
 19 |     Args:
 20 |         data: Dictionary to filter
 21 | 
 22 |     Returns:
 23 |         Dict[str, Any]: Filtered dictionary
 24 |     """
 25 |     return {k: v for k, v in data.items() if v is not None}
 26 | 
 27 | 
 28 | def prepare_api_parameters(params: Dict[str, Any]) -> Dict[str, Any]:
 29 |     """Prepare parameters for Falcon API requests.
 30 | 
 31 |     Args:
 32 |         params: Raw parameters
 33 | 
 34 |     Returns:
 35 |         Dict[str, Any]: Prepared parameters
 36 |     """
 37 |     # Remove None values
 38 |     filtered = filter_none_values(params)
 39 | 
 40 |     # Handle special parameter formatting if needed
 41 |     if "filter" in filtered and isinstance(filtered["filter"], dict):
 42 |         # Convert filter dict to FQL string if needed
 43 |         pass
 44 | 
 45 |     return filtered
 46 | 
 47 | 
 48 | def extract_resources(
 49 |     response: Dict[str, Any],
 50 |     default: Optional[List[Dict[str, Any]]] = None,
 51 | ) -> List[Dict[str, Any]]:
 52 |     """Extract resources from an API response.
 53 | 
 54 |     Args:
 55 |         response: API response dictionary
 56 |         default: Default value if no resources are found
 57 | 
 58 |     Returns:
 59 |         List[Dict[str, Any]]: Extracted resources
 60 |     """
 61 |     if not is_success_response(response):
 62 |         return default if default is not None else []
 63 | 
 64 |     resources = response.get("body", {}).get("resources", [])
 65 |     return resources if resources else (default if default is not None else [])
 66 | 
 67 | 
 68 | def extract_first_resource(
 69 |     response: Dict[str, Any],
 70 |     operation: str,
 71 |     not_found_error: str = "Resource not found",
 72 | ) -> Dict[str, Any]:
 73 |     """Extract the first resource from an API response.
 74 | 
 75 |     Args:
 76 |         response: API response dictionary
 77 |         operation: The API operation that was performed
 78 |         not_found_error: Error message if no resources are found
 79 | 
 80 |     Returns:
 81 |         Dict[str, Any]: First resource or error response
 82 |     """
 83 |     resources = extract_resources(response)
 84 | 
 85 |     if not resources:
 86 |         return _format_error_response(not_found_error, operation=operation)
 87 | 
 88 |     return resources[0]
 89 | 
 90 | 
 91 | def sanitize_input(input_str: str) -> str:
 92 |     """Sanitize input string.
 93 | 
 94 |     Args:
 95 |         input_str: Input string to sanitize
 96 | 
 97 |     Returns:
 98 |         Sanitized string with dangerous characters removed
 99 |     """
100 |     if not isinstance(input_str, str):
101 |         return str(input_str)
102 | 
103 |     # Remove backslashes, quotes, and control characters that could be used for injection
104 |     sanitized = re.sub(r'[\\"\'\n\r\t]', "", input_str)
105 | 
106 |     # Additional safety: limit length to prevent excessively long inputs
107 |     return sanitized[:255]
108 | 
109 | 
def _render_md_cell(value: Any) -> str:
    """Turn one cell value into the single-line text placed in the table."""
    if value is None:
        return ""
    if isinstance(value, bool):
        # Checked before the numeric case because bool is a subclass of int.
        return str(value).lower()
    if isinstance(value, (int, float)):
        return str(value)

    # Anything else: collapse multi-line text into one line by trimming each
    # line, dropping the empty ones and joining the rest with single spaces.
    trimmed = (segment.strip() for segment in str(value).split("\n"))
    return " ".join(segment for segment in trimmed if segment).strip()


def generate_md_table(data: List[Tuple]) -> str:
    """Render a list of tuples as a compact Markdown table.

    The first tuple supplies the column headers (surrounding whitespace is
    stripped); every following tuple becomes one table row. Cells are written
    without padding to keep the output short. ``None`` becomes an empty cell,
    booleans are written in lowercase, and multi-line text is flattened onto
    a single line. Rows shorter than the header are padded with empty cells.

    Args:
        data: List of tuples where the first tuple contains the headers
              and the remaining tuples contain the table data

    Returns:
        str: Formatted Markdown table as a string

    Raises:
        TypeError: If the first row (headers) contains non-string values
        TypeError: If there are not at least 2 items (header and a value row)
        ValueError: If the header row is empty
        ValueError: If a row has more items than headers
    """
    if not data or len(data) < 2:
        raise TypeError("Need at least 2 items. The header and a value row")

    headers, body_rows = data[0], data[1:]
    column_count = len(headers)

    if column_count == 0:
        raise ValueError("Header row cannot be empty")

    for header in headers:
        if not isinstance(header, str):
            raise TypeError(f"Header values must be strings, got {type(header).__name__}")

    lines = [
        "|" + "|".join(str(name).strip() for name in headers) + "|",
        # Minimal separator: one "|-" per column, closed by a final "|".
        "|-" * column_count + "|",
    ]

    for position, row in enumerate(body_rows, start=1):
        cell_count = len(row)
        if cell_count > column_count:
            raise ValueError(
                f"Row {position} has {cell_count} items, which is more than the {column_count} headers"
            )

        cells = [_render_md_cell(value) for value in row]
        # Short rows are right-padded with empty cells up to the header width.
        cells.extend([""] * (column_count - len(cells)))
        lines.append("|" + "|".join(cells) + "|")

    return "\n".join(lines)
195 | 
```

--------------------------------------------------------------------------------
/falcon_mcp/modules/spotlight.py:
--------------------------------------------------------------------------------

```python
  1 | """
  2 | Spotlight module for Falcon MCP Server
  3 | 
  4 | This module provides tools for accessing and managing CrowdStrike Falcon Spotlight vulnerabilities.
  5 | """
  6 | 
  7 | from textwrap import dedent
  8 | from typing import Any, Dict, List
  9 | 
 10 | from mcp.server import FastMCP
 11 | from mcp.server.fastmcp.resources import TextResource
 12 | from pydantic import AnyUrl, Field
 13 | 
 14 | from falcon_mcp.common.errors import handle_api_response
 15 | from falcon_mcp.common.logging import get_logger
 16 | from falcon_mcp.common.utils import prepare_api_parameters
 17 | from falcon_mcp.modules.base import BaseModule
 18 | from falcon_mcp.resources.spotlight import SEARCH_VULNERABILITIES_FQL_DOCUMENTATION
 19 | 
 20 | logger = get_logger(__name__)
 21 | 
 22 | 
 23 | class SpotlightModule(BaseModule):
 24 |     """Module for accessing and managing CrowdStrike Falcon Spotlight vulnerabilities."""
 25 | 
 26 |     def register_tools(self, server: FastMCP) -> None:
 27 |         """Register tools with the MCP server.
 28 | 
 29 |         Args:
 30 |             server: MCP server instance
 31 |         """
 32 |         # Register tools
 33 |         self._add_tool(
 34 |             server=server,
 35 |             method=self.search_vulnerabilities,
 36 |             name="search_vulnerabilities",
 37 |         )
 38 | 
 39 |     def register_resources(self, server: FastMCP) -> None:
 40 |         """Register resources with the MCP server.
 41 | 
 42 |         Args:
 43 |             server: MCP server instance
 44 |         """
 45 |         search_vulnerabilities_fql_resource = TextResource(
 46 |             uri=AnyUrl("falcon://spotlight/vulnerabilities/fql-guide"),
 47 |             name="falcon_search_vulnerabilities_fql_guide",
 48 |             description="Contains the guide for the `filter` param of the `falcon_search_vulnerabilities` tool.",
 49 |             text=SEARCH_VULNERABILITIES_FQL_DOCUMENTATION,
 50 |         )
 51 | 
 52 |         self._add_resource(
 53 |             server,
 54 |             search_vulnerabilities_fql_resource,
 55 |         )
 56 | 
 57 |     def search_vulnerabilities(
 58 |         self,
 59 |         filter: str | None = Field(
 60 |             default=None,
 61 |             description="FQL Syntax formatted string used to limit the results. IMPORTANT: use the `falcon://spotlight/vulnerabilities/fql-guide` resource when building this filter parameter.",
 62 |             examples={"status:'open'", "cve.severity:'HIGH'"},
 63 |         ),
 64 |         limit: int = Field(
 65 |             default=10,
 66 |             ge=1,
 67 |             le=5000,
 68 |             description="Maximum number of results to return. (Max: 5000, Default: 10)",
 69 |         ),
 70 |         offset: int | None = Field(
 71 |             default=None,
 72 |             description="Starting index of overall result set from which to return results.",
 73 |         ),
 74 |         sort: str | None = Field(
 75 |             default=None,
 76 |             description=dedent("""
 77 |                 Sort vulnerabilities using FQL syntax.
 78 | 
 79 |                 Supported sorting fields:
 80 |                 • created_timestamp: When the vulnerability was found
 81 |                 • closed_timestamp: When the vulnerability was closed
 82 |                 • updated_timestamp: When the vulnerability was last updated
 83 | 
 84 |                 Sort either asc (ascending) or desc (descending).
 85 |                 Format: 'field|direction'
 86 | 
 87 |                 Examples: 'created_timestamp|desc', 'updated_timestamp|desc', 'closed_timestamp|asc'
 88 |             """).strip(),
 89 |             examples={
 90 |                 "created_timestamp|desc",
 91 |                 "updated_timestamp|desc",
 92 |                 "closed_timestamp|asc",
 93 |             },
 94 |         ),
 95 |         after: str | None = Field(
 96 |             default=None,
 97 |             description="A pagination token used with the limit parameter to manage pagination of results. On your first request, don't provide an after token. On subsequent requests, provide the after token from the previous response to continue from that place in the results.",
 98 |         ),
 99 |         facet: str | None = Field(
100 |             default=None,
101 |             description=dedent("""
102 |                 Important: Use only one value!
103 | 
104 |                 Select various detail blocks to be returned for each vulnerability.
105 | 
106 |                 Supported values:
107 |                 • host_info: Include host/asset information and context
108 |                 • remediation: Include remediation and fix information
109 |                 • cve: Include CVE details, scoring, and metadata
110 |                 • evaluation_logic: Include vulnerability assessment methodology
111 | 
112 |                 Use host_info when you need asset context, remediation for fix information,
113 |                 cve for detailed vulnerability scoring, and evaluation_logic for assessment details.
114 | 
115 |                 Examples: 'host_info', 'cve', 'remediation'
116 |             """).strip(),
117 |             examples={"host_info", "cve", "remediation", "evaluation_logic"},
118 |         ),
119 |     ) -> List[Dict[str, Any]]:
120 |         """Search for vulnerabilities in your CrowdStrike environment.
121 | 
122 |         IMPORTANT: You must use the `falcon://spotlight/vulnerabilities/fql-guide` resource when you need to use the `filter` parameter.
123 |         This resource contains the guide on how to build the FQL `filter` parameter for the `falcon_search_vulnerabilities` tool.
124 |         """
125 |         # Prepare parameters for combinedQueryVulnerabilities
126 |         params = prepare_api_parameters(
127 |             {
128 |                 "filter": filter,
129 |                 "limit": limit,
130 |                 "offset": offset,
131 |                 "sort": sort,
132 |                 "after": after,
133 |                 "facet": facet,
134 |             }
135 |         )
136 | 
137 |         # Define the operation name
138 |         operation = "combinedQueryVulnerabilities"
139 | 
140 |         logger.debug("Searching vulnerabilities with params: %s", params)
141 | 
142 |         # Make the API request
143 |         response = self.client.command(operation, parameters=params)
144 | 
145 |         # Use handle_api_response to get vulnerability data
146 |         vulnerabilities = handle_api_response(
147 |             response,
148 |             operation=operation,
149 |             error_message="Failed to search vulnerabilities",
150 |             default_result=[],
151 |         )
152 | 
153 |         # If handle_api_response returns an error dict instead of a list,
154 |         # it means there was an error, so we return it wrapped in a list
155 |         if self._is_error(vulnerabilities):
156 |             return [vulnerabilities]
157 | 
158 |         return vulnerabilities
159 | 
```

--------------------------------------------------------------------------------
/tests/test_streamable_http_transport.py:
--------------------------------------------------------------------------------

```python
  1 | """
  2 | Tests for streamable-http transport functionality.
  3 | """
  4 | 
  5 | import unittest
  6 | from unittest.mock import MagicMock, patch
  7 | 
  8 | from falcon_mcp.server import FalconMCPServer
  9 | 
 10 | 
 11 | class TestStreamableHttpTransport(unittest.TestCase):
 12 |     """Test cases for streamable-http transport."""
 13 | 
 14 |     @patch("falcon_mcp.server.FalconClient")
 15 |     @patch("falcon_mcp.server.FastMCP")
 16 |     @patch("falcon_mcp.server.uvicorn")
 17 |     def test_streamable_http_transport_initialization(
 18 |         self,
 19 |         mock_uvicorn,
 20 |         mock_fastmcp,
 21 |         mock_client,
 22 |     ):
 23 |         """Test streamable-http transport initialization."""
 24 |         # Setup mocks
 25 |         mock_client_instance = MagicMock()
 26 |         mock_client_instance.authenticate.return_value = True
 27 |         mock_client.return_value = mock_client_instance
 28 | 
 29 |         mock_server_instance = MagicMock()
 30 |         mock_app = MagicMock()
 31 |         mock_server_instance.streamable_http_app.return_value = mock_app
 32 |         mock_fastmcp.return_value = mock_server_instance
 33 | 
 34 |         # Create server
 35 |         server = FalconMCPServer(debug=True)
 36 | 
 37 |         # Test streamable-http transport
 38 |         server.run("streamable-http", host="0.0.0.0", port=8080)
 39 | 
 40 |         # Verify uvicorn was called with correct parameters
 41 |         mock_uvicorn.run.assert_called_once_with(
 42 |             mock_app, host="0.0.0.0", port=8080, log_level="debug"
 43 |         )
 44 | 
 45 |         # Verify streamable_http_app was called
 46 |         mock_server_instance.streamable_http_app.assert_called_once()
 47 | 
 48 |     @patch("falcon_mcp.server.FalconClient")
 49 |     @patch("falcon_mcp.server.FastMCP")
 50 |     @patch("falcon_mcp.server.uvicorn")
 51 |     def test_streamable_http_default_parameters(
 52 |         self,
 53 |         mock_uvicorn,
 54 |         mock_fastmcp,
 55 |         mock_client,
 56 |     ):
 57 |         """Test streamable-http transport with default parameters."""
 58 |         # Setup mocks
 59 |         mock_client_instance = MagicMock()
 60 |         mock_client_instance.authenticate.return_value = True
 61 |         mock_client.return_value = mock_client_instance
 62 | 
 63 |         mock_server_instance = MagicMock()
 64 |         mock_app = MagicMock()
 65 |         mock_server_instance.streamable_http_app.return_value = mock_app
 66 |         mock_fastmcp.return_value = mock_server_instance
 67 | 
 68 |         # Create server
 69 |         server = FalconMCPServer(debug=False)
 70 | 
 71 |         # Test streamable-http transport with defaults
 72 |         server.run("streamable-http")
 73 | 
 74 |         # Verify uvicorn was called with default parameters
 75 |         mock_uvicorn.run.assert_called_once_with(
 76 |             mock_app,
 77 |             host="127.0.0.1",
 78 |             port=8000,
 79 |             log_level="info",
 80 |         )
 81 | 
 82 |     @patch("falcon_mcp.server.FalconClient")
 83 |     @patch("falcon_mcp.server.FastMCP")
 84 |     def test_non_streamable_http_transport_unchanged(
 85 |         self,
 86 |         mock_fastmcp,
 87 |         mock_client,
 88 |     ):
 89 |         """Test that non-streamable-http transports use the original method."""
 90 |         # Setup mocks
 91 |         mock_client_instance = MagicMock()
 92 |         mock_client_instance.authenticate.return_value = True
 93 |         mock_client.return_value = mock_client_instance
 94 | 
 95 |         mock_server_instance = MagicMock()
 96 |         mock_fastmcp.return_value = mock_server_instance
 97 | 
 98 |         # Create server
 99 |         server = FalconMCPServer()
100 | 
101 |         # Test stdio transport (should use original method)
102 |         server.run("stdio")
103 | 
104 |         # Verify the original run method was called
105 |         mock_server_instance.run.assert_called_once_with("stdio")
106 | 
107 |         # Verify streamable_http_app was NOT called
108 |         mock_server_instance.streamable_http_app.assert_not_called()
109 | 
110 |     @patch("falcon_mcp.server.FalconClient")
111 |     @patch("falcon_mcp.server.FastMCP")
112 |     @patch("falcon_mcp.server.uvicorn")
113 |     def test_streamable_http_custom_parameters(
114 |         self,
115 |         mock_uvicorn,
116 |         mock_fastmcp,
117 |         mock_client,
118 |     ):
119 |         """Test streamable-http transport with custom parameters."""
120 |         # Setup mocks
121 |         mock_client_instance = MagicMock()
122 |         mock_client_instance.authenticate.return_value = True
123 |         mock_client.return_value = mock_client_instance
124 | 
125 |         mock_server_instance = MagicMock()
126 |         mock_app = MagicMock()
127 |         mock_server_instance.streamable_http_app.return_value = mock_app
128 |         mock_fastmcp.return_value = mock_server_instance
129 | 
130 |         # Create server
131 |         server = FalconMCPServer(debug=True)
132 | 
133 |         # Test streamable-http transport with custom parameters
134 |         server.run("streamable-http", host="192.168.1.100", port=9000)
135 | 
136 |         # Verify uvicorn was called with custom parameters
137 |         mock_uvicorn.run.assert_called_once_with(
138 |             mock_app,
139 |             host="192.168.1.100",
140 |             port=9000,
141 |             log_level="debug",
142 |         )
143 | 
144 |     @patch("falcon_mcp.server.FalconClient")
145 |     @patch("falcon_mcp.server.FastMCP")
146 |     @patch("falcon_mcp.server.uvicorn")
147 |     def test_streamable_http_logging_levels(
148 |         self,
149 |         mock_uvicorn,
150 |         mock_fastmcp,
151 |         mock_client,
152 |     ):
153 |         """Test streamable-http transport logging level configuration."""
154 |         # Setup mocks
155 |         mock_client_instance = MagicMock()
156 |         mock_client_instance.authenticate.return_value = True
157 |         mock_client.return_value = mock_client_instance
158 | 
159 |         mock_server_instance = MagicMock()
160 |         mock_app = MagicMock()
161 |         mock_server_instance.streamable_http_app.return_value = mock_app
162 |         mock_fastmcp.return_value = mock_server_instance
163 | 
164 |         # Test with debug=True
165 |         server_debug = FalconMCPServer(debug=True)
166 |         server_debug.run("streamable-http")
167 | 
168 |         # Verify debug log level
169 |         mock_uvicorn.run.assert_called_with(
170 |             mock_app,
171 |             host="127.0.0.1",
172 |             port=8000,
173 |             log_level="debug",
174 |         )
175 | 
176 |         # Reset mock
177 |         mock_uvicorn.reset_mock()
178 | 
179 |         # Test with debug=False
180 |         server_info = FalconMCPServer(debug=False)
181 |         server_info.run("streamable-http")
182 | 
183 |         # Verify info log level
184 |         mock_uvicorn.run.assert_called_with(
185 |             mock_app,
186 |             host="127.0.0.1",
187 |             port=8000,
188 |             log_level="info",
189 |         )
190 | 
191 | 
# Run this module's tests when the file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()
194 | 
```

--------------------------------------------------------------------------------
/tests/modules/test_base.py:
--------------------------------------------------------------------------------

```python
  1 | """
  2 | Tests for the Base module.
  3 | """
  4 | 
  5 | import unittest
  6 | 
  7 | from falcon_mcp.modules.base import BaseModule
  8 | from tests.modules.utils.test_modules import TestModules
  9 | 
 10 | 
 11 | class ConcreteBaseModule(BaseModule):
 12 |     """Concrete implementation of BaseModule for testing."""
 13 | 
 14 |     def register_tools(self, server):
 15 |         """Implement abstract method."""
 16 | 
 17 | 
 18 | class TestBaseModule(TestModules):
 19 |     """Test cases for the Base module."""
 20 | 
 21 |     def setUp(self):
 22 |         """Set up test fixtures."""
 23 |         self.setup_module(ConcreteBaseModule)
 24 | 
 25 |     def test_is_error_with_error_dict(self):
 26 |         """Test _is_error with a dictionary containing an error key."""
 27 |         response = {"error": "Something went wrong", "details": "Error details"}
 28 |         result = self.module._is_error(response)
 29 |         self.assertTrue(result)
 30 | 
 31 |     def test_is_error_with_non_error_dict(self):
 32 |         """Test _is_error with a dictionary not containing an error key."""
 33 |         response = {"status": "success", "data": "Some data"}
 34 |         result = self.module._is_error(response)
 35 |         self.assertFalse(result)
 36 | 
 37 |     def test_is_error_with_non_dict(self):
 38 |         """Test _is_error with a non-dictionary value."""
 39 |         # Test with a list
 40 |         response = ["item1", "item2"]
 41 |         result = self.module._is_error(response)
 42 |         self.assertFalse(result)
 43 | 
 44 |         # Test with a string
 45 |         response = "This is a string response"
 46 |         result = self.module._is_error(response)
 47 |         self.assertFalse(result)
 48 | 
 49 |         # Test with None
 50 |         response = None
 51 |         result = self.module._is_error(response)
 52 |         self.assertFalse(result)
 53 | 
 54 |         # Test with an integer
 55 |         response = 42
 56 |         result = self.module._is_error(response)
 57 |         self.assertFalse(result)
 58 | 
 59 |     def test_base_get_by_ids_default_behavior(self):
 60 |         """Test _base_get_by_ids with default parameters (backward compatibility)."""
 61 |         # Setup mock response
 62 |         mock_response = {
 63 |             "status_code": 200,
 64 |             "body": {
 65 |                 "resources": [
 66 |                     {"id": "test1", "name": "Test Item 1"},
 67 |                     {"id": "test2", "name": "Test Item 2"},
 68 |                 ]
 69 |             },
 70 |         }
 71 |         self.mock_client.command.return_value = mock_response
 72 | 
 73 |         # Call _base_get_by_ids with default parameters
 74 |         result = self.module._base_get_by_ids("TestOperation", ["test1", "test2"])
 75 | 
 76 |         # Verify client command was called correctly with default "ids" key
 77 |         self.mock_client.command.assert_called_once_with(
 78 |             "TestOperation", body={"ids": ["test1", "test2"]}
 79 |         )
 80 | 
 81 |         # Verify result
 82 |         expected_result = [
 83 |             {"id": "test1", "name": "Test Item 1"},
 84 |             {"id": "test2", "name": "Test Item 2"},
 85 |         ]
 86 |         self.assertEqual(result, expected_result)
 87 | 
 88 |     def test_base_get_by_ids_custom_id_key(self):
 89 |         """Test _base_get_by_ids with custom id_key parameter."""
 90 |         # Setup mock response
 91 |         mock_response = {
 92 |             "status_code": 200,
 93 |             "body": {
 94 |                 "resources": [
 95 |                     {"composite_id": "alert1", "status": "new"},
 96 |                     {"composite_id": "alert2", "status": "closed"},
 97 |                 ]
 98 |             },
 99 |         }
100 |         self.mock_client.command.return_value = mock_response
101 | 
102 |         # Call _base_get_by_ids with custom id_key
103 |         result = self.module._base_get_by_ids(
104 |             "PostEntitiesAlertsV2", ["alert1", "alert2"], id_key="composite_ids"
105 |         )
106 | 
107 |         # Verify client command was called correctly with custom key
108 |         self.mock_client.command.assert_called_once_with(
109 |             "PostEntitiesAlertsV2", body={"composite_ids": ["alert1", "alert2"]}
110 |         )
111 | 
112 |         # Verify result
113 |         expected_result = [
114 |             {"composite_id": "alert1", "status": "new"},
115 |             {"composite_id": "alert2", "status": "closed"},
116 |         ]
117 |         self.assertEqual(result, expected_result)
118 | 
119 |     def test_base_get_by_ids_with_additional_params(self):
120 |         """Test _base_get_by_ids with additional parameters."""
121 |         # Setup mock response
122 |         mock_response = {
123 |             "status_code": 200,
124 |             "body": {
125 |                 "resources": [
126 |                     {"composite_id": "alert1", "status": "new", "hidden": False}
127 |                 ]
128 |             },
129 |         }
130 |         self.mock_client.command.return_value = mock_response
131 | 
132 |         # Call _base_get_by_ids with additional parameters
133 |         result = self.module._base_get_by_ids(
134 |             "PostEntitiesAlertsV2",
135 |             ["alert1"],
136 |             id_key="composite_ids",
137 |             include_hidden=True,
138 |             sort_by="created_timestamp",
139 |         )
140 | 
141 |         # Verify client command was called correctly with all parameters
142 |         self.mock_client.command.assert_called_once_with(
143 |             "PostEntitiesAlertsV2",
144 |             body={
145 |                 "composite_ids": ["alert1"],
146 |                 "include_hidden": True,
147 |                 "sort_by": "created_timestamp",
148 |             },
149 |         )
150 | 
151 |         # Verify result
152 |         expected_result = [{"composite_id": "alert1", "status": "new", "hidden": False}]
153 |         self.assertEqual(result, expected_result)
154 | 
155 |     def test_base_get_by_ids_error_handling(self):
156 |         """Test _base_get_by_ids error handling."""
157 |         # Setup mock error response
158 |         mock_response = {
159 |             "status_code": 400,
160 |             "body": {"errors": [{"message": "Invalid request"}]},
161 |         }
162 |         self.mock_client.command.return_value = mock_response
163 | 
164 |         # Call _base_get_by_ids
165 |         result = self.module._base_get_by_ids("TestOperation", ["invalid_id"])
166 | 
167 |         # Verify error handling - should return error dict
168 |         self.assertIn("error", result)
169 |         self.assertIn("Failed to perform operation", result["error"])
170 | 
171 |     def test_base_get_by_ids_empty_response(self):
172 |         """Test _base_get_by_ids with empty resources."""
173 |         # Setup mock response with empty resources
174 |         mock_response = {"status_code": 200, "body": {"resources": []}}
175 |         self.mock_client.command.return_value = mock_response
176 | 
177 |         # Call _base_get_by_ids
178 |         result = self.module._base_get_by_ids("TestOperation", ["nonexistent"])
179 | 
180 |         # Verify result is empty list
181 |         self.assertEqual(result, [])
182 | 
183 | 
 184 | if __name__ == "__main__":  # run the suite when this file is executed directly
 185 |     unittest.main()
186 | 
```

--------------------------------------------------------------------------------
/falcon_mcp/modules/hosts.py:
--------------------------------------------------------------------------------

```python
  1 | """
  2 | Hosts module for Falcon MCP Server
  3 | 
  4 | This module provides tools for accessing and managing CrowdStrike Falcon hosts/devices.
  5 | """
  6 | 
  7 | from textwrap import dedent
  8 | from typing import Any, Dict, List
  9 | 
 10 | from mcp.server import FastMCP
 11 | from mcp.server.fastmcp.resources import TextResource
 12 | from pydantic import AnyUrl, Field
 13 | 
 14 | from falcon_mcp.common.errors import handle_api_response
 15 | from falcon_mcp.common.logging import get_logger
 16 | from falcon_mcp.common.utils import prepare_api_parameters
 17 | from falcon_mcp.modules.base import BaseModule
 18 | from falcon_mcp.resources.hosts import SEARCH_HOSTS_FQL_DOCUMENTATION
 19 | 
 20 | logger = get_logger(__name__)
 21 | 
 22 | 
 23 | class HostsModule(BaseModule):
 24 |     """Module for accessing and managing CrowdStrike Falcon hosts/devices."""
 25 | 
 26 |     def register_tools(self, server: FastMCP) -> None:
 27 |         """Register tools with the MCP server.
 28 | 
 29 |         Args:
 30 |             server: MCP server instance
 31 |         """
 32 |         # Register tools
 33 |         self._add_tool(
 34 |             server=server,
 35 |             method=self.search_hosts,
 36 |             name="search_hosts",
 37 |         )
 38 | 
 39 |         self._add_tool(
 40 |             server=server,
 41 |             method=self.get_host_details,
 42 |             name="get_host_details",
 43 |         )
 44 | 
 45 |     def register_resources(self, server: FastMCP) -> None:
 46 |         """Register resources with the MCP server.
 47 | 
 48 |         Args:
 49 |             server: MCP server instance
 50 |         """
 51 |         search_hosts_fql_resource = TextResource(
 52 |             uri=AnyUrl("falcon://hosts/search/fql-guide"),
 53 |             name="falcon_search_hosts_fql_guide",
 54 |             description="Contains the guide for the `filter` param of the `falcon_search_hosts` tool.",
 55 |             text=SEARCH_HOSTS_FQL_DOCUMENTATION,
 56 |         )
 57 | 
 58 |         self._add_resource(
 59 |             server,
 60 |             search_hosts_fql_resource,
 61 |         )
 62 | 
 63 |     def search_hosts(
 64 |         self,
 65 |         filter: str | None = Field(
 66 |             default=None,
 67 |             description="FQL Syntax formatted string used to limit the results. IMPORTANT: use the `falcon://hosts/search/fql-guide` resource when building this filter parameter.",
 68 |             examples={"platform_name:'Windows'", "hostname:'PC*'"},
 69 |         ),
 70 |         limit: int = Field(
 71 |             default=10,
 72 |             ge=1,
 73 |             le=5000,
 74 |             description="The maximum records to return. [1-5000]",
 75 |         ),
 76 |         offset: int | None = Field(
 77 |             default=None,
 78 |             description="The offset to start retrieving records from.",
 79 |         ),
 80 |         sort: str | None = Field(
 81 |             default=None,
 82 |             description=dedent("""
 83 |                 Sort hosts using these options:
 84 | 
 85 |                 hostname: Host name/computer name
 86 |                 last_seen: Timestamp when the host was last seen
 87 |                 first_seen: Timestamp when the host was first seen
 88 |                 modified_timestamp: When the host record was last modified
 89 |                 platform_name: Operating system platform
 90 |                 agent_version: CrowdStrike agent version
 91 |                 os_version: Operating system version
 92 |                 external_ip: External IP address
 93 | 
 94 |                 Sort either asc (ascending) or desc (descending).
 95 |                 Both formats are supported: 'hostname.desc' or 'hostname|desc'
 96 | 
 97 |                 Examples: 'hostname.asc', 'last_seen.desc', 'platform_name.asc'
 98 |             """).strip(),
 99 |             examples={"hostname.asc", "last_seen.desc"},
100 |         ),
101 |     ) -> List[Dict[str, Any]]:
102 |         """Search for hosts in your CrowdStrike environment.
103 | 
104 |         IMPORTANT: You must use the `falcon://hosts/search/fql-guide` resource when you need to use the `filter` parameter.
105 |         This resource contains the guide on how to build the FQL `filter` parameter for the `falcon_search_hosts` tool.
106 |         """
107 |         # Prepare parameters for QueryDevicesByFilter
108 |         params = prepare_api_parameters(
109 |             {
110 |                 "filter": filter,
111 |                 "limit": limit,
112 |                 "offset": offset,
113 |                 "sort": sort,
114 |             }
115 |         )
116 | 
117 |         # Define the operation name
118 |         operation = "QueryDevicesByFilter"
119 | 
120 |         logger.debug("Searching hosts with params: %s", params)
121 | 
122 |         # Make the API request to get device IDs
123 |         response = self.client.command(operation, parameters=params)
124 | 
125 |         # Use handle_api_response to get device IDs
126 |         device_ids = handle_api_response(
127 |             response,
128 |             operation=operation,
129 |             error_message="Failed to search hosts",
130 |             default_result=[],
131 |         )
132 | 
133 |         # If handle_api_response returns an error dict instead of a list,
134 |         # it means there was an error, so we return it wrapped in a list
135 |         if self._is_error(device_ids):
136 |             return [device_ids]
137 | 
138 |         # If we have device IDs, get the details for each one
139 |         if device_ids:
140 |             # Use the base method to get device details
141 |             details = self._base_get_by_ids(
142 |                 operation="PostDeviceDetailsV2",
143 |                 ids=device_ids,
144 |                 id_key="ids",
145 |             )
146 | 
147 |             # If handle_api_response returns an error dict instead of a list,
148 |             # it means there was an error, so we return it wrapped in a list
149 |             if self._is_error(details):
150 |                 return [details]
151 | 
152 |             return details
153 | 
154 |         return []
155 | 
156 |     def get_host_details(
157 |         self,
158 |         ids: List[str] = Field(
159 |             description="Host device IDs to retrieve details for. You can get device IDs from the search_hosts operation, the Falcon console, or the Streaming API. Maximum: 5000 IDs per request."
160 |         ),
161 |     ) -> List[Dict[str, Any]] | Dict[str, Any]:
162 |         """Retrieve detailed information for specified host device IDs.
163 | 
164 |         This tool returns comprehensive host details for one or more device IDs.
165 |         Use this when you already have specific device IDs and need their full details.
166 |         For searching/discovering hosts, use the `falcon_search_hosts` tool instead.
167 |         """
168 |         logger.debug("Getting host details for IDs: %s", ids)
169 | 
170 |         # Handle empty list case - return empty list without making API call
171 |         if not ids:
172 |             return []
173 | 
174 |         # Use the base method to get device details
175 |         return self._base_get_by_ids(
176 |             operation="PostDeviceDetailsV2",
177 |             ids=ids,
178 |             id_key="ids",
179 |         )
180 | 
```

--------------------------------------------------------------------------------
/falcon_mcp/client.py:
--------------------------------------------------------------------------------

```python
  1 | """
  2 | Falcon API Client for MCP Server
  3 | 
  4 | This module provides the Falcon API client and authentication utilities for the Falcon MCP server.
  5 | """
  6 | 
  7 | import os
  8 | import platform
  9 | import sys
 10 | from importlib.metadata import PackageNotFoundError, version
 11 | from typing import Any, Dict, Optional
 12 | 
 13 | # Import the APIHarnessV2 from FalconPy
 14 | from falconpy import APIHarnessV2
 15 | 
 16 | from falcon_mcp.common.logging import get_logger
 17 | 
 18 | logger = get_logger(__name__)
 19 | 
 20 | 
 21 | class FalconClient:
 22 |     """Client for interacting with the CrowdStrike Falcon API."""
 23 | 
 24 |     def __init__(
 25 |         self,
 26 |         base_url: Optional[str] = None,
 27 |         debug: bool = False,
 28 |         user_agent_comment: Optional[str] = None,
 29 |     ):
 30 |         """Initialize the Falcon client.
 31 | 
 32 |         Args:
 33 |             base_url: Falcon API base URL (defaults to FALCON_BASE_URL env var)
 34 |             debug: Enable debug logging
 35 |             user_agent_comment: Additional information to include in the User-Agent comment section
 36 |         """
 37 |         # Get credentials from environment variables
 38 |         self.client_id = os.environ.get("FALCON_CLIENT_ID")
 39 |         self.client_secret = os.environ.get("FALCON_CLIENT_SECRET")
 40 |         self.base_url = base_url or os.environ.get(
 41 |             "FALCON_BASE_URL", "https://api.crowdstrike.com"
 42 |         )
 43 |         self.debug = debug
 44 |         self.user_agent_comment = user_agent_comment or os.environ.get(
 45 |             "FALCON_MCP_USER_AGENT_COMMENT"
 46 |         )
 47 | 
 48 |         if not self.client_id or not self.client_secret:
 49 |             raise ValueError(
 50 |                 "Falcon API credentials not provided. Set FALCON_CLIENT_ID and "
 51 |                 "FALCON_CLIENT_SECRET environment variables."
 52 |             )
 53 | 
 54 |         # Initialize the Falcon API client using APIHarnessV2
 55 |         self.client = APIHarnessV2(
 56 |             client_id=self.client_id,
 57 |             client_secret=self.client_secret,
 58 |             base_url=self.base_url,
 59 |             debug=debug,
 60 |             user_agent=self.get_user_agent(),
 61 |         )
 62 | 
 63 |         logger.debug("Initialized Falcon client with base URL: %s", self.base_url)
 64 | 
 65 |     def authenticate(self) -> bool:
 66 |         """Authenticate with the Falcon API.
 67 | 
 68 |         Returns:
 69 |             bool: True if authentication was successful
 70 |         """
 71 |         return self.client.login()
 72 | 
 73 |     def is_authenticated(self) -> bool:
 74 |         """Check if the client is authenticated.
 75 | 
 76 |         Returns:
 77 |             bool: True if the client is authenticated
 78 |         """
 79 |         return self.client.token_valid
 80 | 
 81 |     def command(self, operation: str, **kwargs) -> Dict[str, Any]:
 82 |         """Execute a Falcon API command.
 83 | 
 84 |         Args:
 85 |             operation: The API operation to execute
 86 |             **kwargs: Additional arguments to pass to the API
 87 | 
 88 |         Returns:
 89 |             Dict[str, Any]: The API response
 90 |         """
 91 |         return self.client.command(operation, **kwargs)
 92 | 
 93 |     def get_user_agent(self) -> str:
 94 |         """Get RFC-compliant user agent string for API requests.
 95 | 
 96 |         Returns:
 97 |             str: User agent string in RFC format "falcon-mcp/VERSION (comment; falconpy/VERSION; Python/VERSION; Platform/VERSION)"
 98 |         """
 99 |         # Get falcon-mcp version
100 |         falcon_mcp_version = get_version()
101 | 
102 |         # Get Python version
103 |         python_version = f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}"
104 | 
105 |         # Get platform information
106 |         platform_info = f"{platform.system()}/{platform.release()}"
107 | 
108 |         # Get FalconPy version
109 |         try:
110 |             falconpy_version = version("crowdstrike-falconpy")
111 |         except PackageNotFoundError:
112 |             falconpy_version = "unknown"
113 |             logger.debug("crowdstrike-falconpy package version not found")
114 | 
115 |         # Build comment section components (RFC-compliant format)
116 |         comment_parts = []
117 |         if self.user_agent_comment:
118 |             comment_parts.append(self.user_agent_comment.strip())
119 |         comment_parts.extend(
120 |             [f"falconpy/{falconpy_version}", f"Python/{python_version}", platform_info]
121 |         )
122 | 
123 |         return f"falcon-mcp/{falcon_mcp_version} ({'; '.join(comment_parts)})"
124 | 
125 |     def get_headers(self) -> Dict[str, str]:
126 |         """Get authentication headers for API requests.
127 | 
128 |         This method returns the authentication headers from the underlying Falcon API client,
129 |         which can be used for custom HTTP requests or advanced integration scenarios.
130 | 
131 |         Returns:
132 |             Dict[str, str]: Authentication headers including the bearer token
133 |         """
134 |         return self.client.auth_headers
135 | 
136 | 
137 | def get_version() -> str:
138 |     """Get falcon-mcp version with multiple fallback methods.
139 | 
140 |     This function tries multiple methods to determine the version:
141 |     1. importlib.metadata (works when package is properly installed)
142 |     2. pyproject.toml (works in development/Docker environments)
143 |     3. Hardcoded fallback
144 | 
145 |     Returns:
146 |         str: The version string
147 |     """
148 |     # Try importlib.metadata first (works when properly installed)
149 |     try:
150 |         return version("falcon-mcp")
151 |     except PackageNotFoundError:
152 |         logger.debug(
153 |             "falcon-mcp package not found via importlib.metadata, trying pyproject.toml"
154 |         )
155 | 
156 |     # Try reading from pyproject.toml (works in development/Docker)
157 |     try:
158 |         import pathlib
159 |         import tomllib  # Python 3.11+
160 | 
161 |         # Look for pyproject.toml in current directory and parent directories
162 |         current_path = pathlib.Path(__file__).parent
163 |         for _ in range(3):  # Check up to 3 levels up
164 |             pyproject_path = current_path / "pyproject.toml"
165 |             if pyproject_path.exists():
166 |                 with open(pyproject_path, "rb") as f:
167 |                     data = tomllib.load(f)
168 |                     version_str = data["project"]["version"]
169 |                     logger.debug(
170 |                         "Found version %s in pyproject.toml at %s",
171 |                         version_str,
172 |                         pyproject_path,
173 |                     )
174 |                     return version_str
175 |             current_path = current_path.parent
176 | 
177 |         logger.debug("pyproject.toml not found in current or parent directories")
178 |     except (KeyError, ImportError, OSError, TypeError) as e:
179 |         logger.debug("Failed to read version from pyproject.toml: %s", e)
180 | 
181 |     # Final fallback
182 |     fallback_version = "0.1.0"
183 |     logger.debug("Using fallback version: %s", fallback_version)
184 |     return fallback_version
185 | 
```

--------------------------------------------------------------------------------
/tests/e2e/modules/test_spotlight.py:
--------------------------------------------------------------------------------

```python
  1 | """
  2 | E2E tests for the Spotlight module.
  3 | """
  4 | 
  5 | import json
  6 | import unittest
  7 | 
  8 | import pytest
  9 | 
 10 | from tests.e2e.utils.base_e2e_test import BaseE2ETest
 11 | 
 12 | 
 13 | @pytest.mark.e2e
 14 | class TestSpotlightModuleE2E(BaseE2ETest):
 15 |     """
 16 |     End-to-end test suite for the Falcon MCP Server Spotlight Module.
 17 |     """
 18 | 
 19 |     def test_search_high_severity_vulnerabilities(self):
 20 |         """Verify the agent can search for high severity vulnerabilities."""
 21 | 
 22 |         async def test_logic():
 23 |             fixtures = [
 24 |                 {
 25 |                     "operation": "combinedQueryVulnerabilities",
 26 |                     "validator": lambda kwargs: "high"
 27 |                     in kwargs.get("parameters", {}).get("filter", "").lower(),
 28 |                     "response": {
 29 |                         "status_code": 200,
 30 |                         "body": {
 31 |                             "resources": [
 32 |                                 {
 33 |                                     "id": "vuln-001",
 34 |                                     "cve": {
 35 |                                         "id": "CVE-2024-1234",
 36 |                                         "base_score": 8.5,
 37 |                                         "severity": "HIGH",
 38 |                                         "exprt_rating": "HIGH",
 39 |                                         "exploit_status": 60,
 40 |                                         "is_cisa_kev": True,
 41 |                                         "description": "Critical buffer overflow vulnerability in network service",
 42 |                                     },
 43 |                                     "status": "open",
 44 |                                     "created_timestamp": "2024-01-15T10:30:00Z",
 45 |                                     "updated_timestamp": "2024-01-20T14:15:00Z",
 46 |                                     "host_info": {
 47 |                                         "hostname": "web-server-01",
 48 |                                         "platform_name": "Linux",
 49 |                                         "asset_criticality": "Critical",
 50 |                                         "internet_exposure": "Yes",
 51 |                                         "managed_by": "Falcon sensor",
 52 |                                     },
 53 |                                     "apps": {
 54 |                                         "application_name": "Apache HTTP Server",
 55 |                                         "application_version": "2.4.41",
 56 |                                     },
 57 |                                 },
 58 |                                 {
 59 |                                     "id": "vuln-002",
 60 |                                     "cve": {
 61 |                                         "id": "CVE-2024-5678",
 62 |                                         "base_score": 7.8,
 63 |                                         "severity": "HIGH",
 64 |                                         "exprt_rating": "MEDIUM",
 65 |                                         "exploit_status": 30,
 66 |                                         "is_cisa_kev": False,
 67 |                                         "description": "Privilege escalation vulnerability in system service",
 68 |                                     },
 69 |                                     "status": "open",
 70 |                                     "created_timestamp": "2024-01-18T08:45:00Z",
 71 |                                     "updated_timestamp": "2024-01-19T16:20:00Z",
 72 |                                     "host_info": {
 73 |                                         "hostname": "db-server-02",
 74 |                                         "platform_name": "Windows",
 75 |                                         "asset_criticality": "High",
 76 |                                         "internet_exposure": "No",
 77 |                                         "managed_by": "Falcon sensor",
 78 |                                     },
 79 |                                     "apps": {
 80 |                                         "application_name": "Microsoft SQL Server",
 81 |                                         "application_version": "2019",
 82 |                                     },
 83 |                                 },
 84 |                             ]
 85 |                         },
 86 |                     },
 87 |                 }
 88 |             ]
 89 | 
 90 |             self._mock_api_instance.command.side_effect = (
 91 |                 self._create_mock_api_side_effect(fixtures)
 92 |             )
 93 | 
 94 |             prompt = "Find all high severity vulnerabilities in our environment and show me their CVE details and affected hosts"
 95 |             return await self._run_agent_stream(prompt)
 96 | 
 97 |         def assertions(tools, result):
 98 |             self.assertGreaterEqual(len(tools), 1, "Expected at least 1 tool call")
 99 |             used_tool = tools[len(tools) - 1]
100 |             self.assertEqual(
101 |                 used_tool["input"]["tool_name"], "falcon_search_vulnerabilities"
102 |             )
103 | 
104 |             # Check for high severity filtering
105 |             tool_input_str = json.dumps(used_tool["input"]["tool_input"]).lower()
106 |             self.assertTrue(
107 |                 "high" in tool_input_str,
108 |                 f"Expected high severity filtering in tool input: {tool_input_str}",
109 |             )
110 | 
111 |             # Verify both vulnerabilities are in the output
112 |             self.assertIn("CVE-2024-1234", used_tool["output"])
113 |             self.assertIn("CVE-2024-5678", used_tool["output"])
114 |             self.assertIn("web-server-01", used_tool["output"])
115 |             self.assertIn("db-server-02", used_tool["output"])
116 | 
117 |             # Verify API call was made correctly
118 |             self.assertGreaterEqual(
119 |                 self._mock_api_instance.command.call_count, 1, "Expected 1 API call"
120 |             )
121 | 
122 |             # Check API call (combinedQueryVulnerabilities)
123 |             api_call_params = self._mock_api_instance.command.call_args_list[0][1].get(
124 |                 "parameters", {}
125 |             )
126 |             filter_str = api_call_params.get("filter", "").lower()
127 |             self.assertTrue(
128 |                 "high" in filter_str,
129 |                 f"Expected high severity filtering in API call: {filter_str}",
130 |             )
131 | 
132 |             # Verify result contains expected information
133 |             self.assertIn("CVE-2024-1234", result)
134 |             self.assertIn("CVE-2024-5678", result)
135 |             self.assertIn("web-server-01", result)
136 |             self.assertIn("db-server-02", result)
137 |             self.assertIn("8.5", result)  # Should contain CVSS scores
138 |             self.assertIn("7.8", result)
139 | 
140 |         self.run_test_with_retries(
141 |             "test_search_high_severity_vulnerabilities", test_logic, assertions
142 |         )
143 | 
144 | 
 145 | if __name__ == "__main__":  # run the suite when this file is executed directly
 146 |     unittest.main()
147 | 
```

--------------------------------------------------------------------------------
/examples/adk/falcon_agent/agent.py:
--------------------------------------------------------------------------------

```python
  1 | import logging
  2 | import os
  3 | import sys
  4 | from typing import List, Optional, TextIO, Union
  5 | 
  6 | from google.adk.agents import LlmAgent
  7 | from google.adk.agents.callback_context import CallbackContext
  8 | from google.adk.agents.readonly_context import ReadonlyContext
  9 | from google.adk.models import LlmRequest, LlmResponse
 10 | from google.adk.tools.base_tool import BaseTool
 11 | from google.adk.tools.base_toolset import ToolPredicate
 12 | from google.adk.tools.mcp_tool import MCPTool
 13 | from google.adk.tools.mcp_tool.mcp_session_manager import (
 14 |   SseConnectionParams,
 15 |   StdioConnectionParams,
 16 |   StreamableHTTPConnectionParams,
 17 |   retry_on_closed_resource,
 18 | )
 19 | from google.adk.tools.mcp_tool.mcp_toolset import MCPToolset
 20 | from mcp import StdioServerParameters
 21 | from mcp.types import ListToolsResult
 22 | 
  23 | tools_cache={}  # module-level cache: tool_set_name -> tool list built by get_tools
 24 | 
 25 | def make_tools_compatible(tools):
 26 |   """
 27 |   This function makes the schema compatible with Gemini/Vertex AI API
 28 |   It is only needed when API used is Gemini and model is other than 2.5 models
 29 |   It is however needed for ALL models when API used is VertexAI
 30 |   """
 31 |   for tool in tools:
 32 |     for key in tool._mcp_tool.inputSchema.keys():
 33 |       if key == "properties":
 34 |           for prop_name in tool._mcp_tool.inputSchema["properties"].keys():
 35 |             if "anyOf" in tool._mcp_tool.inputSchema["properties"][prop_name].keys():
 36 |               if (tool._mcp_tool.inputSchema["properties"][prop_name]["anyOf"][0]["type"] == "array"):
 37 |                 tool._mcp_tool.inputSchema["properties"][prop_name]["type"] = tool._mcp_tool.inputSchema["properties"][prop_name]["anyOf"][0]["items"]["type"]
 38 |               else:
 39 |                  tool._mcp_tool.inputSchema["properties"][prop_name]["type"] = tool._mcp_tool.inputSchema["properties"][prop_name]["anyOf"][0]["type"]
 40 |               tool._mcp_tool.inputSchema["properties"][prop_name].pop("anyOf")
 41 | 
 42 |   return tools
 43 | 
 44 | 
 45 | class MCPToolSetWithSchemaAccess(MCPToolset):
 46 |   """
 47 |     Added to make the MCP tools schema compatible with Vertext AI API and also older Gemini models.
 48 |     Also introduced a small performance improvement with tools caching.
 49 |   """
 50 | 
 51 |   def __init__(
 52 |       self,
 53 |       *,
 54 |       tool_set_name: str, # <-- new parameter
 55 |       connection_params: Union[
 56 |           StdioServerParameters,
 57 |           StdioConnectionParams,
 58 |           SseConnectionParams,
 59 |           StreamableHTTPConnectionParams,
 60 |       ],
 61 |       tool_filter: Optional[Union[ToolPredicate, List[str]]] = None,
 62 |       errlog: TextIO = sys.stderr,
 63 |   ):
 64 |     super().__init__(
 65 |         connection_params=connection_params,
 66 |         tool_filter=tool_filter,
 67 |         errlog=errlog
 68 |     )
 69 |     self.tool_set_name = tool_set_name
 70 |     logging.info(f"MCPToolSetWithSchemaAccess initialized with tool_set_name: '{self.tool_set_name}'")
 71 |     self._session = None
 72 | 
 73 |   @retry_on_closed_resource
 74 |   async def get_tools(
 75 |       self,
 76 |       readonly_context: Optional[ReadonlyContext] = None,
 77 |   ) -> List[BaseTool]:
 78 |     """Return all tools in the toolset based on the provided context.
 79 | 
 80 |     Args:
 81 |         readonly_context: Context used to filter tools available to the agent.
 82 |             If None, all tools in the toolset are returned.
 83 | 
 84 |     Returns:
 85 |         List[BaseTool]: A list of tools available under the specified context.
 86 |     """
 87 |     # Get session from session manager
 88 |     session = await self._mcp_session_manager.create_session()
 89 | 
 90 |     if self.tool_set_name in tools_cache.keys():
 91 |       logging.info(f"Tools found in cache for toolset {self.tool_set_name}, returning them")
 92 |       return tools_cache[self.tool_set_name]
 93 |     else:
 94 |       logging.info(f"No tools found in cache for toolset {self.tool_set_name}, loading")
 95 | 
 96 |     # Fetch available tools from the MCP server
 97 |     tools_response: ListToolsResult = await session.list_tools()
 98 | 
 99 |     # Apply filtering based on context and tool_filter
100 |     tools = []
101 |     for tool in tools_response.tools:
102 |       mcp_tool = MCPTool(
103 |           mcp_tool=tool,
104 |           mcp_session_manager=self._mcp_session_manager,
105 |           auth_scheme=self._auth_scheme,
106 |           auth_credential=self._auth_credential,
107 |       )
108 | 
109 |       if self._is_tool_selected(mcp_tool, readonly_context):
110 |         tools.append(mcp_tool)
111 | 
112 |     model_version = os.environ.get("GOOGLE_MODEL").split("-")[1]
113 |     if float(model_version) < 2.5 or os.environ.get("GOOGLE_GENAI_USE_VERTEXAI").upper() == "TRUE":
114 |       logging.error(f"Model - {os.environ.get('GOOGLE_MODEL')} needs Gemini compatible tools, updating schema ...")
115 |       tools = make_tools_compatible(tools)
116 |     else:
117 |       logging.info(f"Model - {os.environ.get('GOOGLE_MODEL')} does not need updating schema")
118 | 
119 |     tools_cache[self.tool_set_name] = tools
120 | 
121 |     return tools
122 | 
# Controlling context size to improve Model response time and for cost optimization
# https://github.com/google/adk-python/issues/752#issuecomment-2948152979
def bmc_trim_llm_request(
    callback_context: CallbackContext, llm_request: LlmRequest
) -> Optional[LlmResponse]:
    """Trim ``llm_request.contents`` to the most recent user interactions.

    The limit is read from the ``MAX_PREV_USER_INTERACTIONS`` environment
    variable; ``-1`` (the default) disables trimming. Contents are walked from
    newest to oldest, counting user messages whose first part carries text
    other than ``"For context:"``. Once the count exceeds the limit, that
    message is kept as the oldest retained entry and everything before it is
    dropped.

    Args:
        callback_context: Unused; part of the callback signature.
        llm_request: Request whose ``contents`` list is replaced in place when
            the limit is reached.

    Returns:
        Always None; this function does not produce a response itself.
    """
    max_prev_user_interactions = int(os.environ.get("MAX_PREV_USER_INTERACTIONS", "-1"))

    logging.info(f"Number of contents going to LLM - {len(llm_request.contents)}, MAX_PREV_USER_INTERACTIONS = {max_prev_user_interactions}")

    if max_prev_user_interactions == -1:
        return None

    user_message_count = 0
    kept_newest_first = []

    for item in reversed(llm_request.contents):
        if item.role == "user":
            # A content may carry no parts at all (empty list or None);
            # such an entry is kept but never counted as a user message.
            first_part = item.parts[0] if item.parts else None
            if first_part and first_part.text and first_part.text != "For context:":
                logging.info(f"Encountered a user message => {first_part.text}")
                user_message_count += 1

        kept_newest_first.append(item)

        if user_message_count > max_prev_user_interactions:
            # The message that crosses the limit is still retained.
            logging.info(f"Breaking at user_message_count => {user_message_count}")
            break

    if user_message_count < max_prev_user_interactions:
        logging.info("User message count did not reach the allowed limit. List remains unchanged.")
    else:
        logging.info(f"User message count reached {max_prev_user_interactions}. List truncated.")
        # Restore chronological order before writing back.
        llm_request.contents = kept_newest_first[::-1]

    return None
162 | 
163 | 
# Falcon settings handed to the falcon-mcp subprocess. Only variables that are
# actually set are forwarded: an unset variable would otherwise be passed as
# None, which is not a valid environment value (env is expected to map
# str -> str — confirm against the MCP SDK's StdioServerParameters).
_FALCON_ENV_VARS = ("FALCON_CLIENT_ID", "FALCON_CLIENT_SECRET", "FALCON_BASE_URL")
_falcon_env = {
    name: os.environ[name] for name in _FALCON_ENV_VARS if name in os.environ
}

# Agent definition: model and instruction come from the environment, and the
# only toolset is the Falcon MCP server started over stdio.
root_agent = LlmAgent(
    model=os.environ.get("GOOGLE_MODEL"),
    name='falcon_agent',
    instruction=os.environ.get("FALCON_AGENT_PROMPT"),
    tools=[
        MCPToolSetWithSchemaAccess(
            tool_set_name="falcon-tools",
            connection_params=StdioConnectionParams(
                server_params=StdioServerParameters(
                    command='falcon-mcp',
                    env=_falcon_env,
                )
            ),
        ),
    ],
)
184 | 
```

--------------------------------------------------------------------------------
/falcon_mcp/resources/serverless.py:
--------------------------------------------------------------------------------

```python
  1 | """
  2 | Contains Serverless Vulnerabilities resources.
  3 | """
  4 | 
  5 | from falcon_mcp.common.utils import generate_md_table
  6 | 
# Rows of the FQL filter reference table for serverless vulnerabilities.
# The first tuple is the header row ("Name", "Type", "Operators", "Description");
# every following tuple describes one filterable field in that same column order.
# The list is rendered with generate_md_table further down in this module.
SERVERLESS_VULNERABILITIES_FQL_FILTERS = [
    (
        "Name",
        "Type",
        "Operators",
        "Description"
    ),
    (
        "application_name",
        "String",
        "Yes",
        """
        Name of the application associated with the serverless function.

        Ex: application_name:'my-lambda-app'
        """
    ),
    (
        "application_name_version",
        "String",
        "Yes",
        """
        Version of the application associated with the serverless function.

        Ex: application_name_version:'1.0.0'
        """
    ),
    (
        "cid",
        "String",
        "No",
        """
        Unique system-generated customer identifier (CID) of the account.

        Ex: cid:'0123456789ABCDEFGHIJKLMNOPQRSTUV'
        """
    ),
    (
        "cloud_account_id",
        "String",
        "Yes",
        """
        Unique identifier of the cloud account where the serverless function is deployed.

        Ex: cloud_account_id:'123456789012'
        """
    ),
    (
        "cloud_account_name",
        "String",
        "Yes",
        """
        Name of the cloud account where the serverless function is deployed.

        Ex: cloud_account_name:'production-account'
        """
    ),
    (
        "cloud_provider",
        "String",
        "Yes",
        """
        Name of the cloud service provider hosting the serverless function.
        Values: aws, azure, gcp

        Ex: cloud_provider:'aws'
        """
    ),
    (
        "cve_id",
        "String",
        "Yes",
        """
        Unique identifier for a vulnerability as cataloged in the National Vulnerability Database (NVD).
        Supports multiple values and negation.

        Ex: cve_id:['CVE-2022-1234']
        Ex: cve_id:['CVE-2022-1234','CVE-2023-5678']
        """
    ),
    (
        "cvss_base_score",
        "Number",
        "Yes",
        """
        Common Vulnerability Scoring System (CVSS) base score of the vulnerability.

        Ex: cvss_base_score:>7.0
        Ex: cvss_base_score:<5.0
        """
    ),
    (
        "exprt_rating",
        "String",
        "Yes",
        """
        ExPRT rating assigned by CrowdStrike's predictive AI rating system.
        Values: UNKNOWN, LOW, MEDIUM, HIGH, CRITICAL

        Ex: exprt_rating:'HIGH'
        Ex: exprt_rating:['HIGH','CRITICAL']
        """
    ),
    (
        "first_seen_timestamp",
        "Timestamp",
        "Yes",
        """
        Date and time when this vulnerability was first detected in the serverless function.

        Ex: first_seen_timestamp:>'2023-01-01'
        Ex: first_seen_timestamp:<'2023-12-31'
        """
    ),
    (
        "function_name",
        "String",
        "Yes",
        """
        Name of the serverless function where the vulnerability was detected.

        Ex: function_name:'process-payment'
        """
    ),
    (
        "function_resource_id",
        "String",
        "Yes",
        """
        Unique resource identifier of the serverless function.

        Ex: function_resource_id:'arn:aws:lambda:us-east-1:123456789012:function:my-function'
        """
    ),
    (
        "is_supported",
        "Boolean",
        "No",
        """
        Indicates if the serverless function is supported for vulnerability scanning.

        Ex: is_supported:true
        """
    ),
    (
        "is_valid_asset_id",
        "Boolean",
        "No",
        """
        Indicates if the asset ID associated with the serverless function is valid.

        Ex: is_valid_asset_id:true
        """
    ),
    (
        "layer",
        "String",
        "Yes",
        """
        Layer in the serverless function where the vulnerability was detected.

        Ex: layer:'runtime'
        Ex: layer:'dependency'
        """
    ),
    (
        "region",
        "String",
        "Yes",
        """
        Cloud region where the serverless function is deployed.

        Ex: region:'us-east-1'
        Ex: region:['us-east-1','us-west-2']
        """
    ),
    (
        "runtime",
        "String",
        "Yes",
        """
        Runtime environment of the serverless function.
        Values: nodejs, python, java, ruby, go, dotnet

        Ex: runtime:'nodejs'
        Ex: runtime:['python','nodejs']
        """
    ),
    (
        "severity",
        "String",
        "Yes",
        """
        Severity level of the vulnerability.
        Values: UNKNOWN, LOW, MEDIUM, HIGH, CRITICAL

        Ex: severity:'HIGH'
        Ex: severity:['HIGH','CRITICAL']
        """
    ),
    (
        "timestamp",
        "Timestamp",
        "Yes",
        """
        Date and time when the vulnerability was last updated.

        Ex: timestamp:>'2023-06-01'
        Ex: timestamp:<'2023-12-31'
        """
    ),
    (
        "type",
        "String",
        "Yes",
        """
        Type of the vulnerability.
        Values: Vulnerability, Misconfiguration, Unsupported software

        Ex: type:'Vulnerability'
        Ex: type:!'Misconfiguration'
        """
    ),
]
232 | 
# Complete FQL guide text for serverless vulnerability searches. It is built by
# concatenating three pieces: static syntax help, the markdown table generated
# from SERVERLESS_VULNERABILITIES_FQL_FILTERS, and closing notes with examples.
# The string is assembled once, when this module is imported.
SERVERLESS_VULNERABILITIES_FQL_DOCUMENTATION = """Falcon Query Language (FQL) - Serverless Vulnerabilities Guide

=== BASIC SYNTAX ===
property_name:[operator]'value'

=== AVAILABLE OPERATORS ===
• No operator = equals (default)
• ! = not equal to
• > = greater than
• >= = greater than or equal
• < = less than
• <= = less than or equal
• ~ = text match (ignores case, spaces, punctuation)
• !~ = does not text match

=== DATA TYPES & SYNTAX ===
• Strings: 'value' or ['exact_value'] for exact match
• Dates: 'YYYY-MM-DDTHH:MM:SSZ' (UTC format)
• Booleans: true or false (no quotes)
• Numbers: 123 (no quotes)

=== COMBINING CONDITIONS ===
• + = AND condition
• , = OR condition
• ( ) = Group expressions

=== falcon_search_serverless_vulnerabilities FQL filter options ===

""" + generate_md_table(SERVERLESS_VULNERABILITIES_FQL_FILTERS) + """

=== IMPORTANT NOTES ===
• Use single quotes around string values: 'value'
• Use square brackets for exact matches and multiple values: ['value1','value2']
• Date format must be UTC: 'YYYY-MM-DDTHH:MM:SSZ'
• For case-insensitive filtering, add .insensitive to field names
• Boolean values: true or false (no quotes)
• Wildcards (*) are unsupported in this API
• Some fields require specific capitalization (check individual field descriptions)

=== COMMON FILTER EXAMPLES ===
• Filter by cloud provider: cloud_provider:'aws'
• High severity vulnerabilities: severity:'HIGH'
• Recent vulnerabilities: first_seen_timestamp:>'2023-01-01'
• Filter by specific runtime: runtime:'nodejs'
• Filter by region: region:'us-east-1'
• Critical vulnerabilities in a specific account: severity:'CRITICAL'+cloud_account_id:'123456789012'
• Filter by function name: function_name:'payment-processor'
• High CVSS score vulnerabilities: cvss_base_score:>7.0
"""
282 | 
```

--------------------------------------------------------------------------------
/falcon_mcp/modules/detections.py:
--------------------------------------------------------------------------------

```python
  1 | """
  2 | Detections module for Falcon MCP Server
  3 | 
  4 | This module provides tools for accessing and analyzing CrowdStrike Falcon detections.
  5 | """
  6 | 
  7 | from textwrap import dedent
  8 | from typing import Any, Dict, List
  9 | 
 10 | from mcp.server import FastMCP
 11 | from mcp.server.fastmcp.resources import TextResource
 12 | from pydantic import AnyUrl, Field
 13 | 
 14 | from falcon_mcp.common.errors import handle_api_response
 15 | from falcon_mcp.common.logging import get_logger
 16 | from falcon_mcp.common.utils import prepare_api_parameters
 17 | from falcon_mcp.modules.base import BaseModule
 18 | from falcon_mcp.resources.detections import SEARCH_DETECTIONS_FQL_DOCUMENTATION
 19 | 
 20 | logger = get_logger(__name__)
 21 | 
 22 | 
 23 | class DetectionsModule(BaseModule):
 24 |     """Module for accessing and analyzing CrowdStrike Falcon detections."""
 25 | 
 26 |     def register_tools(self, server: FastMCP) -> None:
 27 |         """Register tools with the MCP server.
 28 | 
 29 |         Args:
 30 |             server: MCP server instance
 31 |         """
 32 |         # Register tools
 33 |         self._add_tool(
 34 |             server=server,
 35 |             method=self.search_detections,
 36 |             name="search_detections",
 37 |         )
 38 | 
 39 |         self._add_tool(
 40 |             server=server,
 41 |             method=self.get_detection_details,
 42 |             name="get_detection_details",
 43 |         )
 44 | 
 45 |     def register_resources(self, server: FastMCP) -> None:
 46 |         """Register resources with the MCP server.
 47 | 
 48 |         Args:
 49 |             server: MCP server instance
 50 |         """
 51 |         search_detections_fql_resource = TextResource(
 52 |             uri=AnyUrl("falcon://detections/search/fql-guide"),
 53 |             name="falcon_search_detections_fql_guide",
 54 |             description="Contains the guide for the `filter` param of the `falcon_search_detections` tool.",
 55 |             text=SEARCH_DETECTIONS_FQL_DOCUMENTATION,
 56 |         )
 57 | 
 58 |         self._add_resource(
 59 |             server,
 60 |             search_detections_fql_resource,
 61 |         )
 62 | 
 63 |     def search_detections(
 64 |         self,
 65 |         filter: str | None = Field(
 66 |             default=None,
 67 |             description="FQL Syntax formatted string used to limit the results. IMPORTANT: use the `falcon://detections/search/fql-guide` resource when building this filter parameter.",
 68 |             examples={"agent_id:'77d11725xxxxxxxxxxxxxxxxxxxxc48ca19'", "status:'new'"},
 69 |         ),
 70 |         limit: int = Field(
 71 |             default=10,
 72 |             ge=1,
 73 |             le=9999,
 74 |             description="The maximum number of detections to return in this response (default: 10; max: 9999). Use with the offset parameter to manage pagination of results.",
 75 |         ),
 76 |         offset: int | None = Field(
 77 |             default=None,
 78 |             description="The first detection to return, where 0 is the latest detection. Use with the offset parameter to manage pagination of results.",
 79 |         ),
 80 |         q: str | None = Field(
 81 |             default=None,
 82 |             description="Search all detection metadata for the provided string",
 83 |         ),
 84 |         sort: str | None = Field(
 85 |             default=None,
 86 |             description=dedent("""
 87 |                 Sort detections using these options:
 88 | 
 89 |                 timestamp: Timestamp when the detection occurred
 90 |                 created_timestamp: When the detection was created
 91 |                 updated_timestamp: When the detection was last modified
 92 |                 severity: Severity level of the detection (1-100, recommended when filtering by severity)
 93 |                 confidence: Confidence level of the detection (1-100)
 94 |                 agent_id: Agent ID associated with the detection
 95 | 
 96 |                 Sort either asc (ascending) or desc (descending).
 97 |                 Both formats are supported: 'severity.desc' or 'severity|desc'
 98 | 
 99 |                 When searching for high severity detections, use 'severity.desc' to get the highest severity detections first.
100 |                 For chronological ordering, use 'timestamp.desc' for most recent detections first.
101 | 
102 |                 Examples: 'severity.desc', 'timestamp.desc'
103 |             """).strip(),
104 |             examples={"severity.desc", "timestamp.desc"},
105 |         ),
106 |         include_hidden: bool = Field(default=True),
107 |     ) -> List[Dict[str, Any]]:
108 |         """Find and analyze detections to understand malicious activity in your environment.
109 | 
110 |         IMPORTANT: You must use the `falcon://detections/search/fql-guide` resource when you need to use the `filter` parameter.
111 |         This resource contains the guide on how to build the FQL `filter` parameter for the `falcon_search_detections` tool.
112 |         """
113 |         # Prepare parameters
114 |         params = prepare_api_parameters(
115 |             {
116 |                 "filter": filter,
117 |                 "limit": limit,
118 |                 "offset": offset,
119 |                 "q": q,
120 |                 "sort": sort,
121 |             }
122 |         )
123 | 
124 |         # Define the operation name
125 |         operation = "GetQueriesAlertsV2"
126 | 
127 |         logger.debug("Searching detections with params: %s", params)
128 | 
129 |         # Make the API request
130 |         response = self.client.command(operation, parameters=params)
131 | 
132 |         # Use handle_api_response to get detection IDs (now composite_ids)
133 |         detection_ids = handle_api_response(
134 |             response,
135 |             operation=operation,
136 |             error_message="Failed to search detections",
137 |             default_result=[],
138 |         )
139 | 
140 |         # If handle_api_response returns an error dict instead of a list,
141 |         # it means there was an error, so we return it wrapped in a list
142 |         if self._is_error(detection_ids):
143 |             return [detection_ids]
144 | 
145 |         # If we have detection IDs, get the details for each one
146 |         if detection_ids:
147 |             # Use the enhanced base method with composite_ids and include_hidden
148 |             details = self._base_get_by_ids(
149 |                 operation="PostEntitiesAlertsV2",
150 |                 ids=detection_ids,
151 |                 id_key="composite_ids",
152 |                 include_hidden=include_hidden,
153 |             )
154 | 
155 |             # If handle_api_response returns an error dict instead of a list,
156 |             # it means there was an error, so we return it wrapped in a list
157 |             if self._is_error(details):
158 |                 return [details]
159 | 
160 |             return details
161 | 
162 |         return []
163 | 
164 |     def get_detection_details(
165 |         self,
166 |         ids: List[str] = Field(
167 |             description="Composite ID(s) to retrieve detection details for.",
168 |         ),
169 |         include_hidden: bool = Field(
170 |             default=True,
171 |             description="Whether to include hidden detections (default: True). When True, shows all detections including previously hidden ones for comprehensive visibility.",
172 |         ),
173 |     ) -> List[Dict[str, Any]] | Dict[str, Any]:
174 |         """Get detection details for specific detection IDs to understand security threats.
175 | 
176 |         Use this when you already have specific detection IDs and need their full details.
177 |         For searching/discovering detections, use the `falcon_search_detections` tool instead.
178 |         """
179 |         logger.debug("Getting detection details for ID(s): %s", ids)
180 | 
181 |         # Use the enhanced base method - composite_ids parameter matches ids for backward compatibility
182 |         return self._base_get_by_ids(
183 |             operation="PostEntitiesAlertsV2",
184 |             ids=ids,
185 |             id_key="composite_ids",
186 |             include_hidden=include_hidden,
187 |         )
188 | 
```

--------------------------------------------------------------------------------
/docs/deployment/amazon_bedrock_agentcore.md:
--------------------------------------------------------------------------------

```markdown
  1 | # Deploying to Amazon Bedrock AgentCore
  2 | 
  3 | This guide walks you through deploying the Falcon MCP Server to Amazon Bedrock AgentCore. You'll configure the necessary AWS resources, set up IAM permissions, and prepare your environment.
  4 | 
  5 | ## Prerequisites
  6 | 
  7 | Before deploying to Amazon Bedrock AgentCore, ensure you have your CrowdStrike API credentials and AWS environment properly configured.
  8 | 
  9 | ### CrowdStrike API Credentials
 10 | 
 11 | You'll need to create API credentials in the CrowdStrike platform with the appropriate scopes for your intended use case.
 12 | 
 13 | 1. **Create API Key**: Generate an API key in the CrowdStrike platform with the necessary scopes as outlined in [Available Modules, Tools & Resources](https://github.com/CrowdStrike/falcon-mcp/tree/main?tab=readme-ov-file#available-modules-tools--resources)
 14 | 
 15 | 2. **Prepare Environment Variables**: You'll configure these values during agent deployment:
 16 |    - `FALCON_CLIENT_ID` - Your CrowdStrike API client ID
 17 |    - `FALCON_CLIENT_SECRET` - Your CrowdStrike API client secret
 18 |    - `FALCON_BASE_URL` - Your CrowdStrike API base URL (region-specific)
 19 | 
 20 | ### AWS VPC Requirements
 21 | 
 22 | The MCP Server requires internet connectivity to communicate with CrowdStrike's APIs. We recommend deploying in an existing VPC used for your agentic tools.
 23 | 
 24 | **Required VPC Configuration:**
 25 | 
 26 | - **Internet Gateway or NAT Gateway** - Enables outbound internet connectivity
 27 | - **Outbound HTTPS Access** - Allow communication to your region-specific CrowdStrike API host (for example, `api.crowdstrike.com`) on port 443
 28 | - **Security Groups** - Configure appropriate security group rules for your network requirements
 29 | 
 30 | ## IAM Configuration
 31 | 
 32 | The MCP server requires specific IAM permissions to function within the Amazon Bedrock AgentCore environment. You'll create an execution role with the necessary policies and trust relationships.
 33 | 
 34 | > [!IMPORTANT]
 35 | > Replace all placeholder values with your specific environment details:
 36 | >
 37 | > - `{{region}}` - Your AWS region (e.g., `us-east-1`)
 38 | > - `{{accountId}}` - Your AWS account ID
 39 | > - `{{agentName}}` - Your agent name with no spaces or special characters (e.g., `falcon`). You'll need to decide the agent name **before** creating the role and AgentCore Runtime.
 40 | 
 41 | ### Step 1: Create the IAM Execution Role
 42 | 
 43 | Create an IAM role with the following policy that grants the necessary permissions for container access, logging, monitoring, and Bedrock operations:
 44 | 
 45 | ```json
 46 | {
 47 |   "Version": "2012-10-17",
 48 |   "Statement": [
 49 |     {
 50 |       "Sid": "ECRImageAccess",
 51 |       "Effect": "Allow",
 52 |       "Action": [
 53 |         "ecr:BatchGetImage",
 54 |         "ecr:GetDownloadUrlForLayer"
 55 |       ],
 56 |       "Resource": [
 57 |         "arn:aws:ecr:us-east-1:709825985650:repository/crowdstrike/falcon-mcp"
 58 |       ]
 59 |     },
 60 |     {
 61 |       "Effect": "Allow",
 62 |       "Action": [
 63 |         "logs:DescribeLogStreams",
 64 |         "logs:CreateLogGroup"
 65 |       ],
 66 |       "Resource": [
 67 |         "arn:aws:logs:{{region}}:{{accountId}}:log-group:/aws/bedrock-agentcore/runtimes/*"
 68 |       ]
 69 |     },
 70 |     {
 71 |       "Effect": "Allow",
 72 |       "Action": [
 73 |         "logs:DescribeLogGroups"
 74 |       ],
 75 |       "Resource": [
 76 |         "arn:aws:logs:{{region}}:{{accountId}}:log-group:*"
 77 |       ]
 78 |     },
 79 |     {
 80 |       "Effect": "Allow",
 81 |       "Action": [
 82 |         "logs:CreateLogStream",
 83 |         "logs:PutLogEvents"
 84 |       ],
 85 |       "Resource": [
 86 |         "arn:aws:logs:{{region}}:{{accountId}}:log-group:/aws/bedrock-agentcore/runtimes/*:log-stream:*"
 87 |       ]
 88 |     },
 89 |     {
 90 |       "Sid": "ECRTokenAccess",
 91 |       "Effect": "Allow",
 92 |       "Action": [
 93 |         "ecr:GetAuthorizationToken"
 94 |       ],
 95 |       "Resource": "*"
 96 |     },
 97 |     {
 98 |       "Effect": "Allow",
 99 |       "Action": [
100 |         "xray:PutTraceSegments",
101 |         "xray:PutTelemetryRecords",
102 |         "xray:GetSamplingRules",
103 |         "xray:GetSamplingTargets"
104 |       ],
105 |       "Resource": [
106 |         "*"
107 |       ]
108 |     },
109 |     {
110 |       "Effect": "Allow",
111 |       "Resource": "*",
112 |       "Action": "cloudwatch:PutMetricData",
113 |       "Condition": {
114 |         "StringEquals": {
115 |           "cloudwatch:namespace": "bedrock-agentcore"
116 |         }
117 |       }
118 |     },
119 |     {
120 |       "Sid": "GetAgentAccessToken",
121 |       "Effect": "Allow",
122 |       "Action": [
123 |         "bedrock-agentcore:GetWorkloadAccessToken",
124 |         "bedrock-agentcore:GetWorkloadAccessTokenForJWT",
125 |         "bedrock-agentcore:GetWorkloadAccessTokenForUserId"
126 |       ],
127 |       "Resource": [
128 |         "arn:aws:bedrock-agentcore:{{region}}:{{accountId}}:workload-identity-directory/default",
129 |         "arn:aws:bedrock-agentcore:{{region}}:{{accountId}}:workload-identity-directory/default/workload-identity/{{agentName}}-*"
130 |       ]
131 |     },
132 |     {
133 |       "Sid": "BedrockModelInvocation",
134 |       "Effect": "Allow",
135 |       "Action": [
136 |         "bedrock:InvokeModel",
137 |         "bedrock:InvokeModelWithResponseStream"
138 |       ],
139 |       "Resource": [
140 |         "arn:aws:bedrock:*::foundation-model/*",
141 |         "arn:aws:bedrock:{{region}}:{{accountId}}:*"
142 |       ]
143 |     }
144 |   ]
145 | }
146 | ```
147 | 
148 | > [!NOTE]
149 | > Save the ARN of the IAM role - you'll need it for the deployment of the Amazon Bedrock AgentCore agent.
150 | 
151 | ### Step 2: Create the IAM Trust Policy
152 | 
153 | Create a trust policy that allows the Bedrock AgentCore service to assume the execution role:
154 | 
155 | ```json
156 | {
157 |   "Version": "2012-10-17",
158 |   "Statement": [
159 |     {
160 |       "Sid": "AssumeRolePolicy",
161 |       "Effect": "Allow",
162 |       "Principal": {
163 |         "Service": "bedrock-agentcore.amazonaws.com"
164 |       },
165 |       "Action": "sts:AssumeRole",
166 |       "Condition": {
167 |         "StringEquals": {
168 |           "aws:SourceAccount": "{{accountId}}"
169 |         },
170 |         "ArnLike": {
171 |           "aws:SourceArn": "arn:aws:bedrock-agentcore:{{region}}:{{accountId}}:*"
172 |         }
173 |       }
174 |     }
175 |   ]
176 | }
177 | ```
178 | 
179 | ### Step 3: Associate the Trust Policy
180 | 
181 | Attach the trust policy to the IAM execution role you created in Step 1. This completes the IAM configuration required for the MCP server to operate within Amazon Bedrock AgentCore.
182 | 
183 | ## Next Steps
184 | 
185 | ### Important Variables
186 | 
187 | To host this agent in Amazon Bedrock AgentCore, you will need the following values:
188 | 
189 | | Variable | Description |
190 | | :--- | :--- |
191 | | `FALCON_CLIENT_ID` | The Client ID for your Falcon API credentials |
192 | | `FALCON_CLIENT_SECRET` | The Client Secret for your Falcon API credentials |
193 | | `FALCON_BASE_URL` | The base URL for your Falcon API environment |
194 | | `AGENT_NAME` | The name of the agent (_ex: falconmcp_) |
195 | | `AGENT_DESCRIPTION` | A description of the agent |
196 | | `AGENT_ROLE_ARN` | The ARN of the IAM execution role created in Step 1 |
197 | 
198 | With your IAM configuration complete and variables prepared, you can now return to the **AWS Marketplace listing** to complete the deployment of your Falcon MCP Server agent in Amazon Bedrock AgentCore.
199 | 
200 | #### Example Deployment
201 | 
202 | ```bash
203 | aws bedrock-agentcore-control create-agent-runtime \
204 |   --region us-east-1 \
205 |   --agent-runtime-name "falconmcp" \
206 |   --description "Falcon MCP Server Agent" \
207 |   --agent-runtime-artifact '{
208 |     "containerConfiguration": {
209 |       "containerUri": "709825985650.dkr.ecr.us-east-1.amazonaws.com/crowdstrike/falcon-mcp:0.1.1"
210 |     }
211 |   }' \
212 |   --role-arn "arn:aws:iam::example:role/bedrock-core-falcon-role" \
213 |   --network-configuration '{
214 |     "networkMode": "PUBLIC"
215 |   }' \
216 |   --protocol-configuration '{
217 |     "serverProtocol": "MCP"
218 |   }' \
219 |   --environment-variables '{
220 |     "FALCON_CLIENT_ID": "FALCON_CLIENT_ID_VALUE",
221 |     "FALCON_CLIENT_SECRET": "FALCON_CLIENT_SECRET_VALUE",
222 |     "FALCON_BASE_URL": "https://api.crowdstrike.com"
223 |   }'
224 | ```
225 | 
```

--------------------------------------------------------------------------------
/tests/e2e/modules/test_intel.py:
--------------------------------------------------------------------------------

```python
  1 | """
  2 | E2E tests for the Intel module.
  3 | """
  4 | 
  5 | import unittest
  6 | 
  7 | import pytest
  8 | 
  9 | from tests.e2e.utils.base_e2e_test import BaseE2ETest, ensure_dict
 10 | 
 11 | 
 12 | @pytest.mark.e2e
 13 | class TestIntelModuleE2E(BaseE2ETest):
 14 |     """
 15 |     End-to-end test suite for the Falcon MCP Server Intel Module.
 16 |     """
 17 | 
 18 |     def test_search_actors_with_filter(self):
 19 |         """Verify the agent can search for actors with a filter."""
 20 | 
 21 |         async def test_logic():
 22 |             fixtures = [
 23 |                 {
 24 |                     "operation": "QueryIntelActorEntities",
 25 |                     "validator": lambda kwargs: "animal_classifier:'BEAR'"
 26 |                     in kwargs.get("parameters", {}).get("filter", ""),
 27 |                     "response": {
 28 |                         "status_code": 200,
 29 |                         "body": {
 30 |                             "resources": [
 31 |                                 {
 32 |                                     "id": "actor-1",
 33 |                                     "animal_classifier": "BEAR",
 34 |                                     "short_description": "Actor ELDERLY BEAR",
 35 |                                 },
 36 |                                 {
 37 |                                     "id": "actor-2",
 38 |                                     "animal_classifier": "BEAR",
 39 |                                     "short_description": "Actor CONSTANT BEAR",
 40 |                                 },
 41 |                             ]
 42 |                         },
 43 |                     },
 44 |                 }
 45 |             ]
 46 | 
 47 |             self._mock_api_instance.command.side_effect = (
 48 |                 self._create_mock_api_side_effect(fixtures)
 49 |             )
 50 | 
 51 |             prompt = "Find all threat actors with animal_classifier BEAR"
 52 |             return await self._run_agent_stream(prompt)
 53 | 
 54 |         def assertions(tools, result):
 55 |             self.assertGreaterEqual(len(tools), 1, "Expected at least 1 tool call")
 56 |             used_tool = tools[len(tools) - 1]
 57 |             self.assertEqual(used_tool["input"]["tool_name"], "falcon_search_actors")
 58 | 
 59 |             # Verify the tool input contains the filter
 60 |             tool_input = ensure_dict(used_tool["input"]["tool_input"])
 61 |             self.assertIn("animal_classifier", tool_input.get("filter", ""))
 62 | 
 63 |             # Verify API call parameters
 64 |             self.assertGreaterEqual(
 65 |                 self._mock_api_instance.command.call_count,
 66 |                 1,
 67 |                 "Expected at least 1 API call",
 68 |             )
 69 |             api_call_params = self._mock_api_instance.command.call_args_list[0][1].get(
 70 |                 "parameters", {}
 71 |             )
 72 |             self.assertIn("animal_classifier:'BEAR'", api_call_params.get("filter", ""))
 73 | 
 74 |             # Verify result contains actor information
 75 |             self.assertIn("BEAR", result)
 76 |             self.assertIn("ELDERLY BEAR", result)
 77 |             self.assertIn("Actor CONSTANT BEAR", result)
 78 | 
 79 |         self.run_test_with_retries(
 80 |             "test_search_actors_with_filter", test_logic, assertions
 81 |         )
 82 | 
 83 |     def test_search_indicators_with_filter(self):
 84 |         """Verify the agent can search for indicators with a filter."""
 85 | 
 86 |         async def test_logic():
 87 |             fixtures = [
 88 |                 {
 89 |                     "operation": "QueryIntelIndicatorEntities",
 90 |                     "validator": lambda kwargs: "type:'hash_sha256'"
 91 |                     in kwargs.get("parameters", {}).get("filter", ""),
 92 |                     "response": {
 93 |                         "status_code": 200,
 94 |                         "body": {
 95 |                             "resources": [
 96 |                                 {"id": "indicator-1", "type": "hash_sha256"},
 97 |                                 {"id": "indicator-2", "type": "hash_sha256"},
 98 |                             ]
 99 |                         },
100 |                     },
101 |                 }
102 |             ]
103 | 
104 |             self._mock_api_instance.command.side_effect = (
105 |                 self._create_mock_api_side_effect(fixtures)
106 |             )
107 | 
108 |             prompt = "Find all indicators of type hash_sha256"
109 |             return await self._run_agent_stream(prompt)
110 | 
111 |         def assertions(tools, result):
112 |             self.assertGreaterEqual(len(tools), 1, "Expected at least 1 tool call")
113 |             used_tool = tools[len(tools) - 1]
114 |             self.assertEqual(
115 |                 used_tool["input"]["tool_name"], "falcon_search_indicators"
116 |             )
117 | 
118 |             # Verify the tool input contains the filter
119 |             tool_input = ensure_dict(used_tool["input"]["tool_input"])
120 |             self.assertIn("hash_sha256", tool_input.get("filter", ""))
121 | 
122 |             # Verify API call parameters
123 |             self.assertGreaterEqual(
124 |                 self._mock_api_instance.command.call_count,
125 |                 1,
126 |                 "Expected at least 1 API call",
127 |             )
128 |             api_call_params = self._mock_api_instance.command.call_args_list[0][1].get(
129 |                 "parameters", {}
130 |             )
131 |             self.assertIn("type:'hash_sha256'", api_call_params.get("filter", ""))
132 | 
133 |             # Verify result contains indicator information
134 |             self.assertIn("indicator-1", result)
135 |             self.assertIn("indicator-2", result)
136 |             self.assertIn("hash_sha256", result)
137 | 
138 |         self.run_test_with_retries(
139 |             "test_search_indicators_with_filter", test_logic, assertions
140 |         )
141 | 
142 |     def test_search_reports_with_filter(self):
143 |         """Verify the agent can search for reports with a filter."""
144 | 
145 |         async def test_logic():
146 |             fixtures = [
147 |                 {
148 |                     "operation": "QueryIntelReportEntities",
149 |                     "validator": lambda kwargs: "slug:'malware-analysis-report-1'"
150 |                     in kwargs.get("parameters", {}).get("filter", ""),
151 |                     "response": {
152 |                         "status_code": 200,
153 |                         "body": {
154 |                             "resources": [
155 |                                 {
156 |                                     "id": "report-1",
157 |                                     "name": "Malware Analysis Report 1",
158 |                                     "slug": "malware-analysis-report-1",
159 |                                 },
160 |                             ]
161 |                         },
162 |                     },
163 |                 }
164 |             ]
165 | 
166 |             self._mock_api_instance.command.side_effect = (
167 |                 self._create_mock_api_side_effect(fixtures)
168 |             )
169 | 
170 |             prompt = "Find report with slug malware-analysis-report-1"
171 |             return await self._run_agent_stream(prompt)
172 | 
173 |         def assertions(tools, result):
174 |             self.assertGreaterEqual(len(tools), 1, "Expected at least 1 tool call")
175 |             used_tool = tools[len(tools) - 1]
176 |             self.assertEqual(used_tool["input"]["tool_name"], "falcon_search_reports")
177 | 
178 |             # Verify the tool input contains the filter
179 |             tool_input = ensure_dict(used_tool["input"]["tool_input"])
180 |             self.assertIn("slug", tool_input.get("filter", ""))
181 | 
182 |             # Verify API call parameters
183 |             self.assertGreaterEqual(
184 |                 self._mock_api_instance.command.call_count,
185 |                 1,
186 |                 "Expected at least 1 API call",
187 |             )
188 |             api_call_params = self._mock_api_instance.command.call_args_list[0][1].get(
189 |                 "parameters", {}
190 |             )
191 |             self.assertIn(
192 |                 "slug:'malware-analysis-report-1'", api_call_params.get("filter", "")
193 |             )
194 | 
195 |             # Verify result contains report information
196 |             self.assertIn("Malware Analysis Report 1", result)
197 | 
198 |         self.run_test_with_retries(
199 |             "test_search_reports_with_filter", test_logic, assertions
200 |         )
201 | 
202 | 
203 | if __name__ == "__main__":  # Run the tests when executed as a script
204 |     unittest.main()
205 | 
```

--------------------------------------------------------------------------------
/tests/test_server.py:
--------------------------------------------------------------------------------

```python
  1 | """
  2 | Tests for the Falcon MCP server.
  3 | """
  4 | 
  5 | import unittest
  6 | from unittest.mock import MagicMock, patch
  7 | 
  8 | from falcon_mcp import registry
  9 | from falcon_mcp.server import FalconMCPServer
 10 | 
 11 | 
 12 | class TestFalconMCPServer(unittest.TestCase):
 13 |     """Test cases for the Falcon MCP server."""
 14 | 
 15 |     def setUp(self):
 16 |         """Set up test fixtures before each test method."""
 17 |         # Ensure modules are discovered before each test
 18 |         registry.discover_modules()
 19 | 
 20 |     @patch("falcon_mcp.server.FalconClient")
 21 |     @patch("falcon_mcp.server.FastMCP")
 22 |     def test_server_initialization(self, mock_fastmcp, mock_client):
 23 |         """Test server initialization with default settings."""
 24 |         # Setup mocks
 25 |         mock_client_instance = MagicMock()
 26 |         mock_client_instance.authenticate.return_value = True
 27 |         mock_client.return_value = mock_client_instance
 28 | 
 29 |         mock_server_instance = MagicMock()
 30 |         mock_fastmcp.return_value = mock_server_instance
 31 | 
 32 |         # Create server
 33 |         server = FalconMCPServer(
 34 |             base_url="https://api.test.crowdstrike.com",
 35 |             debug=True,
 36 |         )
 37 | 
 38 |         # Verify client initialization with direct parameters
 39 |         mock_client.assert_called_once()
 40 |         # Extract the arguments
 41 |         call_args = mock_client.call_args[1]
 42 |         self.assertEqual(call_args["base_url"], "https://api.test.crowdstrike.com")
 43 |         self.assertTrue(call_args["debug"])
 44 | 
 45 |         # Verify authentication
 46 |         mock_client_instance.authenticate.assert_called_once()
 47 | 
 48 |         # Verify server initialization
 49 |         mock_fastmcp.assert_called_once_with(
 50 |             name="Falcon MCP Server",
 51 |             instructions="This server provides access to CrowdStrike Falcon capabilities.",
 52 |             debug=True,
 53 |             log_level="DEBUG",
 54 |         )
 55 | 
 56 |         # Verify modules initialization
 57 |         available_module_names = registry.get_module_names()
 58 |         self.assertEqual(len(server.modules), len(available_module_names))
 59 |         for module_name in available_module_names:
 60 |             self.assertIn(module_name, server.modules)
 61 | 
 62 |     @patch("falcon_mcp.server.FalconClient")
 63 |     @patch("falcon_mcp.server.FastMCP")
 64 |     def test_server_with_specific_modules(self, mock_fastmcp, mock_client):
 65 |         """Test server initialization with specific modules."""
 66 |         # Setup mocks
 67 |         mock_client_instance = MagicMock()
 68 |         mock_client_instance.authenticate.return_value = True
 69 |         mock_client.return_value = mock_client_instance
 70 | 
 71 |         mock_server_instance = MagicMock()
 72 |         mock_fastmcp.return_value = mock_server_instance
 73 | 
 74 |         # Create server with only the detections module
 75 |         server = FalconMCPServer(enabled_modules={"detections"})
 76 | 
 77 |         # Verify modules initialization
 78 |         self.assertEqual(len(server.modules), 1)
 79 |         self.assertIn("detections", server.modules)
 80 | 
 81 |     @patch("falcon_mcp.server.FalconClient")
 82 |     def test_authentication_failure(self, mock_client):
 83 |         """Test server initialization with authentication failure."""
 84 |         # Setup mock
 85 |         mock_client_instance = MagicMock()
 86 |         mock_client_instance.authenticate.return_value = False
 87 |         mock_client.return_value = mock_client_instance
 88 | 
 89 |         # Verify authentication failure raises RuntimeError
 90 |         with self.assertRaises(RuntimeError):
 91 |             FalconMCPServer()
 92 | 
 93 |     @patch("falcon_mcp.server.FalconClient")
 94 |     def test_falcon_check_connectivity(self, mock_client):
 95 |         """Test checking Falcon API connectivity."""
 96 |         # Setup mock
 97 |         mock_client_instance = MagicMock()
 98 |         mock_client_instance.is_authenticated.return_value = True
 99 |         mock_client.return_value = mock_client_instance
100 |         mock_client_instance.authenticate.return_value = True
101 | 
102 |         # Create server with mock client
103 |         server = FalconMCPServer()
104 | 
105 |         # Call falcon_check_connectivity
106 |         result = server.falcon_check_connectivity()
107 | 
108 |         # Verify client method was called
109 |         mock_client_instance.is_authenticated.assert_called_once()
110 | 
111 |         # Verify result
112 |         expected_result = {"connected": True}
113 |         self.assertEqual(result, expected_result)
114 | 
115 |     @patch("falcon_mcp.server.FalconClient")
116 |     def test_list_enabled_modules(self, mock_client):
117 |         """Test listing enabled modules."""
118 |         # Setup mock
119 |         mock_client_instance = MagicMock()
120 |         mock_client_instance.authenticate.return_value = True
121 |         mock_client.return_value = mock_client_instance
122 | 
123 |         # Create server
124 |         server = FalconMCPServer()
125 | 
126 |         # Call list_enabled_modules
127 |         result = server.list_enabled_modules()
128 | 
129 |         # Get the actual module names from the registry
130 |         expected_modules = registry.get_module_names()
131 | 
132 |         # Verify result matches registry (since all modules are enabled by default)
133 |         self.assertEqual(set(result["modules"]), set(expected_modules))
134 | 
135 |     @patch("falcon_mcp.server.FalconClient")
136 |     def test_list_enabled_modules_with_limited_modules(self, mock_client):
137 |         """Test listing enabled modules with limited module set."""
138 |         # Setup mock
139 |         mock_client_instance = MagicMock()
140 |         mock_client_instance.authenticate.return_value = True
141 |         mock_client.return_value = mock_client_instance
142 | 
143 |         # Create server with only specific modules
144 |         server = FalconMCPServer(enabled_modules={"detections", "cloud"})
145 | 
146 |         # Call list_enabled_modules
147 |         result = server.list_enabled_modules()
148 | 
149 |         # Should only return enabled modules
150 |         self.assertEqual(set(result["modules"]), {"detections", "cloud"})
151 | 
152 |         # Verify return type is correct
153 |         self.assertIsInstance(result["modules"], list)
154 | 
155 |         # Verify each module name is a string
156 |         for module_name in result["modules"]:
157 |             self.assertIsInstance(module_name, str)
158 | 
159 |     @patch("falcon_mcp.server.FalconClient")
160 |     def test_list_modules(self, mock_client):
161 |         """Test listing all available modules."""
162 |         # Setup mock
163 |         mock_client_instance = MagicMock()
164 |         mock_client_instance.authenticate.return_value = True
165 |         mock_client.return_value = mock_client_instance
166 | 
167 |         # Create server with limited modules
168 |         server = FalconMCPServer(enabled_modules={"detections", "cloud"})
169 | 
170 |         # Call list_modules
171 |         result = server.list_modules()
172 | 
173 |         # Should return ALL modules from registry regardless of what's enabled
174 |         expected_modules = registry.get_module_names()
175 |         self.assertEqual(set(result["modules"]), set(expected_modules))
176 | 
177 |         # Verify return type is correct
178 |         self.assertIsInstance(result["modules"], list)
179 | 
180 |         # Verify each module name is a string
181 |         for module_name in result["modules"]:
182 |             self.assertIsInstance(module_name, str)
183 | 
184 |     @patch("falcon_mcp.server.FalconClient")
185 |     def test_list_modules_consistency(self, mock_client):
186 |         """Test that list_modules always returns the same result."""
187 |         # Setup mock
188 |         mock_client_instance = MagicMock()
189 |         mock_client_instance.authenticate.return_value = True
190 |         mock_client.return_value = mock_client_instance
191 | 
192 |         # Create two servers with different enabled modules
193 |         server1 = FalconMCPServer(enabled_modules={"detections"})
194 |         server2 = FalconMCPServer(enabled_modules={"cloud", "intel"})
195 | 
196 |         # Both should return the same available modules
197 |         result1 = server1.list_modules()
198 |         result2 = server2.list_modules()
199 | 
200 |         self.assertEqual(set(result1["modules"]), set(result2["modules"]))
201 | 
202 |         # And both should match the registry
203 |         expected_modules = registry.get_module_names()
204 |         self.assertEqual(set(result1["modules"]), set(expected_modules))
205 | 
206 | 
207 | if __name__ == "__main__":  # Run the tests when executed as a script
208 |     unittest.main()
209 | 
```

--------------------------------------------------------------------------------
/tests/modules/test_detections.py:
--------------------------------------------------------------------------------

```python
  1 | """
  2 | Tests for the Detections module.
  3 | """
  4 | 
  5 | import unittest
  6 | 
  7 | from falcon_mcp.modules.detections import DetectionsModule
  8 | from tests.modules.utils.test_modules import TestModules
  9 | 
 10 | 
 11 | class TestDetectionsModule(TestModules):
 12 |     """Test cases for the Detections module."""
 13 | 
 14 |     def setUp(self):
 15 |         """Set up test fixtures."""
 16 |         self.setup_module(DetectionsModule)
 17 | 
 18 |     def test_register_tools(self):
 19 |         """Test registering tools with the server."""
 20 |         expected_tools = [
 21 |             "falcon_search_detections",
 22 |             "falcon_get_detection_details",
 23 |         ]
 24 |         self.assert_tools_registered(expected_tools)
 25 | 
 26 |     def test_register_resources(self):
 27 |         """Test registering resources with the server."""
 28 |         expected_resources = [
 29 |             "falcon_search_detections_fql_guide",
 30 |         ]
 31 |         self.assert_resources_registered(expected_resources)
 32 | 
 33 |     def test_search_detections(self):
 34 |         """Test searching for detections."""
 35 |         # Setup mock responses for both API calls
 36 |         query_response = {
 37 |             "status_code": 200,
 38 |             "body": {"resources": ["detection1", "detection2"]},
 39 |         }
 40 |         details_response = {
 41 |             "status_code": 200,
 42 |             "body": {"resources": []},  # Empty resources for PostEntitiesAlertsV2
 43 |         }
 44 |         self.mock_client.command.side_effect = [query_response, details_response]
 45 | 
 46 |         # Call search_detections
 47 |         result = self.module.search_detections(
 48 |             filter="test query", limit=10, include_hidden=True
 49 |         )
 50 | 
 51 |         # Verify client commands were called correctly
 52 |         self.assertEqual(self.mock_client.command.call_count, 2)
 53 | 
 54 |         # Check that the first call was to GetQueriesAlertsV2 with the right filter and limit
 55 |         first_call = self.mock_client.command.call_args_list[0]
 56 |         self.assertEqual(first_call[0][0], "GetQueriesAlertsV2")
 57 |         self.assertEqual(first_call[1]["parameters"]["filter"], "test query")
 58 |         self.assertEqual(first_call[1]["parameters"]["limit"], 10)
 59 |         self.mock_client.command.assert_any_call(
 60 |             "PostEntitiesAlertsV2",
 61 |             body={
 62 |                 "composite_ids": ["detection1", "detection2"],
 63 |                 "include_hidden": True,
 64 |             },
 65 |         )
 66 | 
 67 |         # Verify result
 68 |         self.assertEqual(
 69 |             result, []
 70 |         )  # Empty list because PostEntitiesAlertsV2 returned empty resources
 71 | 
 72 |     def test_search_detections_with_details(self):
 73 |         """Test searching for detections with details."""
 74 |         # Setup mock responses
 75 |         query_response = {
 76 |             "status_code": 200,
 77 |             "body": {"resources": ["detection1", "detection2"]},
 78 |         }
 79 |         details_response = {
 80 |             "status_code": 200,
 81 |             "body": {
 82 |                 "resources": [
 83 |                     {"id": "detection1", "name": "Test Detection 1"},
 84 |                     {"id": "detection2", "name": "Test Detection 2"},
 85 |                 ]
 86 |             },
 87 |         }
 88 |         self.mock_client.command.side_effect = [query_response, details_response]
 89 | 
 90 |         # Call search_detections
 91 |         result = self.module.search_detections(
 92 |             filter="test query", limit=10, include_hidden=True
 93 |         )
 94 | 
 95 |         # Verify client commands were called correctly
 96 |         self.assertEqual(self.mock_client.command.call_count, 2)
 97 | 
 98 |         # Check that the first call was to GetQueriesAlertsV2 with the right filter and limit
 99 |         first_call = self.mock_client.command.call_args_list[0]
100 |         self.assertEqual(first_call[0][0], "GetQueriesAlertsV2")
101 |         self.assertEqual(first_call[1]["parameters"]["filter"], "test query")
102 |         self.assertEqual(first_call[1]["parameters"]["limit"], 10)
103 |         self.mock_client.command.assert_any_call(
104 |             "PostEntitiesAlertsV2",
105 |             body={
106 |                 "composite_ids": ["detection1", "detection2"],
107 |                 "include_hidden": True,
108 |             },
109 |         )
110 | 
111 |         # Verify result
112 |         expected_result = [
113 |             {"id": "detection1", "name": "Test Detection 1"},
114 |             {"id": "detection2", "name": "Test Detection 2"},
115 |         ]
116 |         self.assertEqual(result, expected_result)
117 | 
118 |     def test_search_detections_error(self):
119 |         """Test searching for detections with API error."""
120 |         # Setup mock response with error
121 |         mock_response = {
122 |             "status_code": 400,
123 |             "body": {"errors": [{"message": "Invalid query"}]},
124 |         }
125 |         self.mock_client.command.return_value = mock_response
126 | 
127 |         # Call search_detections
128 |         result = self.module.search_detections(filter="invalid query")
129 | 
130 |         # Verify result contains error
131 |         self.assertEqual(len(result), 1)
132 |         self.assertIn("error", result[0])
133 |         self.assertIn("details", result[0])
134 | 
135 |     def test_get_detection_details(self):
136 |         """Test getting detection details."""
137 |         # Setup mock response
138 |         mock_response = {
139 |             "status_code": 200,
140 |             "body": {"resources": [{"id": "detection1", "name": "Test Detection 1"}]},
141 |         }
142 |         self.mock_client.command.return_value = mock_response
143 | 
144 |         # Call get_detection_details
145 |         result = self.module.get_detection_details(["detection1"], include_hidden=True)
146 | 
147 |         # Verify client command was called correctly
148 |         self.mock_client.command.assert_called_once_with(
149 |             "PostEntitiesAlertsV2",
150 |             body={"composite_ids": ["detection1"], "include_hidden": True},
151 |         )
152 | 
153 |         # Verify result - handle_api_response returns a list of resources
154 |         expected_result = [{"id": "detection1", "name": "Test Detection 1"}]
155 |         self.assertEqual(result, expected_result)
156 | 
157 |     def test_get_detection_details_not_found(self):
158 |         """Test getting detection details for non-existent detection."""
159 |         # Setup mock response with empty resources
160 |         mock_response = {"status_code": 200, "body": {"resources": []}}
161 |         self.mock_client.command.return_value = mock_response
162 | 
163 |         # Call get_detection_details
164 |         result = self.module.get_detection_details(["nonexistent"])
165 | 
166 |         # For empty resources, handle_api_response returns the default_result (empty list)
167 |         # We should check that the result is empty
168 |         self.assertEqual(result, [])
169 | 
170 |     def test_search_detections_include_hidden_false(self):
171 |         """Test searching for detections with include_hidden=False."""
172 |         # Setup mock responses for both API calls
173 |         query_response = {
174 |             "status_code": 200,
175 |             "body": {"resources": ["detection1", "detection2"]},
176 |         }
177 |         details_response = {
178 |             "status_code": 200,
179 |             "body": {"resources": [{"id": "detection1", "name": "Test Detection 1"}]},
180 |         }
181 |         self.mock_client.command.side_effect = [query_response, details_response]
182 | 
183 |         # Call search_detections with include_hidden=False
184 |         result = self.module.search_detections(
185 |             filter="test query", include_hidden=False
186 |         )
187 | 
188 |         # Verify client commands were called correctly
189 |         self.assertEqual(self.mock_client.command.call_count, 2)
190 | 
191 |         # Check that the second call includes include_hidden=False
192 |         self.mock_client.command.assert_any_call(
193 |             "PostEntitiesAlertsV2",
194 |             body={
195 |                 "composite_ids": ["detection1", "detection2"],
196 |                 "include_hidden": False,
197 |             },
198 |         )
199 | 
200 |         # Verify result
201 |         expected_result = [{"id": "detection1", "name": "Test Detection 1"}]
202 |         self.assertEqual(result, expected_result)
203 | 
204 |     def test_get_detection_details_include_hidden_false(self):
205 |         """Test getting detection details with include_hidden=False."""
206 |         # Setup mock response
207 |         mock_response = {
208 |             "status_code": 200,
209 |             "body": {"resources": [{"id": "detection1", "name": "Test Detection 1"}]},
210 |         }
211 |         self.mock_client.command.return_value = mock_response
212 | 
213 |         # Call get_detection_details with include_hidden=False
214 |         result = self.module.get_detection_details(["detection1"], include_hidden=False)
215 | 
216 |         # Verify client command was called correctly with include_hidden=False
217 |         self.mock_client.command.assert_called_once_with(
218 |             "PostEntitiesAlertsV2",
219 |             body={"composite_ids": ["detection1"], "include_hidden": False},
220 |         )
221 | 
222 |         # Verify result
223 |         expected_result = [{"id": "detection1", "name": "Test Detection 1"}]
224 |         self.assertEqual(result, expected_result)
225 | 
226 | 
227 | if __name__ == "__main__":  # Run the tests when executed as a script
228 |     unittest.main()
229 | 
```

--------------------------------------------------------------------------------
/falcon_mcp/modules/discover.py:
--------------------------------------------------------------------------------

```python
  1 | """
  2 | Discover module for Falcon MCP Server
  3 | 
  4 | This module provides tools for accessing and managing CrowdStrike Falcon Discover applications and unmanaged assets.
  5 | """
  6 | 
  7 | from textwrap import dedent
  8 | from typing import Any, Dict, List
  9 | 
 10 | from mcp.server import FastMCP
 11 | from mcp.server.fastmcp.resources import TextResource
 12 | from pydantic import AnyUrl, Field
 13 | 
 14 | from falcon_mcp.common.errors import handle_api_response
 15 | from falcon_mcp.common.logging import get_logger
 16 | from falcon_mcp.common.utils import prepare_api_parameters
 17 | from falcon_mcp.modules.base import BaseModule
 18 | from falcon_mcp.resources.discover import (
 19 |     SEARCH_APPLICATIONS_FQL_DOCUMENTATION,
 20 |     SEARCH_UNMANAGED_ASSETS_FQL_DOCUMENTATION,
 21 | )
 22 | 
 23 | logger = get_logger(__name__)  # Logger used for this module's debug output
 24 | 
 25 | 
 26 | class DiscoverModule(BaseModule):
 27 |     """Module for accessing and managing CrowdStrike Falcon Discover applications and unmanaged assets."""
 28 | 
 29 |     def register_tools(self, server: FastMCP) -> None:
 30 |         """Register tools with the MCP server.
 31 | 
 32 |         Args:
 33 |             server: MCP server instance
 34 |         """
 35 |         # Register tools
 36 |         self._add_tool(
 37 |             server=server,
 38 |             method=self.search_applications,
 39 |             name="search_applications",
 40 |         )
 41 | 
 42 |         self._add_tool(
 43 |             server=server,
 44 |             method=self.search_unmanaged_assets,
 45 |             name="search_unmanaged_assets",
 46 |         )
 47 | 
 48 |     def register_resources(self, server: FastMCP) -> None:
 49 |         """Register resources with the MCP server.
 50 | 
 51 |         Args:
 52 |             server: MCP server instance
 53 |         """
 54 |         search_applications_fql_resource = TextResource(
 55 |             uri=AnyUrl("falcon://discover/applications/fql-guide"),
 56 |             name="falcon_search_applications_fql_guide",
 57 |             description="Contains the guide for the `filter` param of the `falcon_search_applications` tool.",
 58 |             text=SEARCH_APPLICATIONS_FQL_DOCUMENTATION,
 59 |         )
 60 | 
 61 |         search_unmanaged_assets_fql_resource = TextResource(
 62 |             uri=AnyUrl("falcon://discover/hosts/fql-guide"),
 63 |             name="falcon_search_unmanaged_assets_fql_guide",
 64 |             description="Contains the guide for the `filter` param of the `falcon_search_unmanaged_assets` tool.",
 65 |             text=SEARCH_UNMANAGED_ASSETS_FQL_DOCUMENTATION,
 66 |         )
 67 | 
 68 |         self._add_resource(
 69 |             server,
 70 |             search_applications_fql_resource,
 71 |         )
 72 | 
 73 |         self._add_resource(
 74 |             server,
 75 |             search_unmanaged_assets_fql_resource,
 76 |         )
 77 | 
 78 |     def search_applications(
 79 |         self,
 80 |         filter: str = Field(
 81 |             description="FQL filter expression used to limit the results. IMPORTANT: use the `falcon://discover/applications/fql-guide` resource when building this filter parameter.",
 82 |             examples={"name:'Chrome'", "vendor:'Microsoft Corporation'"},
 83 |         ),
 84 |         facet: str | None = Field(
 85 |             default=None,
 86 |             description=dedent("""
 87 |                 Type of data to be returned for each application entity. The facet filter allows you to limit the response to just the information you want.
 88 | 
 89 |                 Possible values:
 90 |                 • browser_extension
 91 |                 • host_info
 92 |                 • install_usage
 93 | 
 94 |                 Note: Requests that do not include the host_info or browser_extension facets still return host.ID, browser_extension.ID, and browser_extension.enabled in the response.
 95 |             """).strip(),
 96 |             examples={"browser_extension", "host_info", "install_usage"},
 97 |         ),
 98 |         limit: int = Field(
 99 |             default=100,
100 |             ge=1,
101 |             le=1000,
102 |             description="Maximum number of items to return: 1-1000. Default is 100.",
103 |         ),
104 |         sort: str | None = Field(
105 |             default=None,
106 |             description="Property used to sort the results. All properties can be used to sort unless otherwise noted in their property descriptions.",
107 |             examples={"name.asc", "vendor.desc", "last_updated_timestamp.desc"},
108 |         ),
109 |     ) -> List[Dict[str, Any]]:
110 |         """Search for applications in your CrowdStrike environment.
111 | 
112 |         IMPORTANT: You must use the `falcon://discover/applications/fql-guide` resource when you need to use the `filter` parameter.
113 |         This resource contains the guide on how to build the FQL `filter` parameter for the `falcon_search_applications` tool.
114 |         """
115 |         # Prepare parameters for combined_applications
116 |         params = prepare_api_parameters(
117 |             {
118 |                 "filter": filter,
119 |                 "facet": facet,
120 |                 "limit": limit,
121 |                 "sort": sort,
122 |             }
123 |         )
124 | 
125 |         # Define the operation name
126 |         operation = "combined_applications"
127 | 
128 |         logger.debug("Searching applications with params: %s", params)
129 | 
130 |         # Make the API request
131 |         response = self.client.command(operation, parameters=params)
132 | 
133 |         # Use handle_api_response to get application data
134 |         applications = handle_api_response(
135 |             response,
136 |             operation=operation,
137 |             error_message="Failed to search applications",
138 |             default_result=[],
139 |         )
140 | 
141 |         # If handle_api_response returns an error dict instead of a list,
142 |         # it means there was an error, so we return it wrapped in a list
143 |         if self._is_error(applications):
144 |             return [applications]
145 | 
146 |         return applications
147 | 
148 |     def search_unmanaged_assets(
149 |         self,
150 |         filter: str | None = Field(
151 |             default=None,
152 |             description="FQL filter expression used to limit the results. IMPORTANT: use the `falcon://discover/hosts/fql-guide` resource when building this filter parameter. Note: entity_type:'unmanaged' is automatically applied.",
153 |             examples={"platform_name:'Windows'", "criticality:'Critical'"},
154 |         ),
155 |         limit: int = Field(
156 |             default=100,
157 |             ge=1,
158 |             le=5000,
159 |             description="Maximum number of items to return: 1-5000. Default is 100.",
160 |         ),
161 |         offset: int | None = Field(
162 |             default=None,
163 |             description="Starting index of overall result set from which to return results.",
164 |         ),
165 |         sort: str | None = Field(
166 |             default=None,
167 |             description=dedent("""
168 |                 Sort unmanaged assets using these options:
169 | 
170 |                 hostname: Host name/computer name
171 |                 last_seen_timestamp: Timestamp when the asset was last seen
172 |                 first_seen_timestamp: Timestamp when the asset was first seen
173 |                 platform_name: Operating system platform
174 |                 os_version: Operating system version
175 |                 external_ip: External IP address
176 |                 country: Country location
177 |                 criticality: Criticality level
178 | 
179 |                 Sort either asc (ascending) or desc (descending).
180 |                 Both formats are supported: 'hostname.desc' or 'hostname|desc'
181 | 
182 |                 Examples: 'hostname.asc', 'last_seen_timestamp.desc', 'criticality.desc'
183 |             """).strip(),
184 |             examples={"hostname.asc", "last_seen_timestamp.desc", "criticality.desc"},
185 |         ),
186 |     ) -> List[Dict[str, Any]]:
187 |         """Search for unmanaged assets (hosts) in your CrowdStrike environment.
188 | 
189 |         These are systems that do not have the Falcon sensor installed but have been
190 |         discovered by systems that do have a Falcon sensor installed.
191 | 
192 |         IMPORTANT: You must use the `falcon://discover/hosts/fql-guide` resource when you need to use the `filter` parameter.
193 |         This resource contains the guide on how to build the FQL `filter` parameter for the `falcon_search_unmanaged_assets` tool.
194 | 
195 |         The tool automatically filters for unmanaged assets only by adding entity_type:'unmanaged' to all queries.
196 |         You do not need to (and cannot) specify entity_type in your filter - it is always set to 'unmanaged'.
197 |         """
198 |         # Enforced on every query so that only unmanaged assets can match
199 |         base_filter = "entity_type:'unmanaged'"
200 | 
201 |         # Append the caller's filter (if any) to the enforced one, joined with '+'
202 |         if filter:
203 |             combined_filter = f"{base_filter}+{filter}"
204 |         else:
205 |             combined_filter = base_filter
206 | 
207 |         # Build the query parameters for the combined_hosts operation
208 |         params = prepare_api_parameters(
209 |             {
210 |                 "filter": combined_filter,
211 |                 "limit": limit,
212 |                 "offset": offset,
213 |                 "sort": sort,
214 |             }
215 |         )
216 | 
217 |         # Operation name is passed to the client and reused for response handling
218 |         operation = "combined_hosts"
219 | 
220 |         logger.debug("Searching unmanaged assets with params: %s", params)
221 | 
222 |         # Make the API request
223 |         response = self.client.command(operation, parameters=params)
224 | 
225 |         # Extract the asset data; default_result is an empty list
226 |         assets = handle_api_response(
227 |             response,
228 |             operation=operation,
229 |             error_message="Failed to search unmanaged assets",
230 |             default_result=[],
231 |         )
232 | 
233 |         # On failure handle_api_response yields an error dict rather than a list;
234 |         # wrap it in a list so the declared List[Dict[str, Any]] return shape holds
235 |         if self._is_error(assets):
236 |             return [assets]
237 | 
238 |         return assets
239 | 
```

--------------------------------------------------------------------------------
/tests/modules/test_serverless.py:
--------------------------------------------------------------------------------

```python
  1 | """
  2 | Tests for the Serverless module.
  3 | """
  4 | 
  5 | import unittest
  6 | from unittest.mock import MagicMock, patch
  7 | 
  8 | from falcon_mcp.modules.serverless import ServerlessModule
  9 | from tests.modules.utils.test_modules import TestModules
 10 | 
 11 | 
 12 | class TestServerlessModule(TestModules):
 13 |     """Test cases for the Serverless module."""
 14 | 
 15 |     def setUp(self):
 16 |         """Set up test fixtures."""
 17 |         self.setup_module(ServerlessModule)
 18 | 
 19 |     def test_register_tools(self):
 20 |         """Test registering tools with the server."""
 21 |         expected_tools = [
 22 |             "falcon_search_serverless_vulnerabilities",
 23 |         ]
 24 |         self.assert_tools_registered(expected_tools)
 25 | 
 26 |     def test_register_resources(self):
 27 |         """Test registering resources with the server."""
 28 |         expected_resources = [
 29 |             "falcon_serverless_vulnerabilities_fql_guide",
 30 |         ]
 31 |         self.assert_resources_registered(expected_resources)
 32 | 
 33 |     def test_search_serverless_vulnerabilities_success(self):
 34 |         """Test searching serverless vulnerabilities with successful response."""
 35 |         # Setup mock response with sample vulnerability data
 36 |         mock_response = {
 37 |             "status_code": 200,
 38 |             "body": {
 39 |                 "runs": [
 40 |                     {
 41 |                         "tool": {
 42 |                             "driver": {
 43 |                                 "name": "CrowdStrike",
 44 |                                 "informationUri": "https://www.crowdstrike.com/",
 45 |                                 "rules": [
 46 |                                     {
 47 |                                         "id": "CVE-2023-12345",
 48 |                                         "name": "PythonPackageVulnerability",
 49 |                                         "shortDescription": {"text": "Test vulnerability description"},
 50 |                                         "fullDescription": {"text": "Test vulnerability full description"},
 51 |                                         "help": {"text": "Package: test-package\nVulnerability: CVE-2023-12345"},
 52 |                                         "properties": {
 53 |                                             "severity": "HIGH",
 54 |                                             "cvssBaseScore": 8.5,
 55 |                                             "remediations": ["Upgrade to version 2.0.0"]
 56 |                                         }
 57 |                                     }
 58 |                                 ]
 59 |                             }
 60 |                         }
 61 |                     }
 62 |                 ]
 63 |             },
 64 |         }
 65 |         self.mock_client.command.return_value = mock_response
 66 | 
 67 |         # NOTE: this replaces the method under test with a MagicMock, so the call below returns the canned runs
 68 |         self.module.search_serverless_vulnerabilities = MagicMock(return_value=mock_response["body"]["runs"])
 69 | 
 70 |         # Call search_serverless_vulnerabilities with test parameters
 71 |         result = self.module.search_serverless_vulnerabilities(
 72 |             filter="cloud_provider:'aws'"
 73 |         )
 74 | 
 75 |         # Verify result contains expected values
 76 |         self.assertEqual(len(result), 1)
 77 |         self.assertEqual(result[0]["tool"]["driver"]["name"], "CrowdStrike")
 78 |         self.assertEqual(
 79 |             result[0]["tool"]["driver"]["rules"][0]["id"], "CVE-2023-12345"
 80 |         )
 81 |         self.assertEqual(
 82 |             result[0]["tool"]["driver"]["rules"][0]["properties"]["severity"], "HIGH"
 83 |         )
 84 | 
 85 |     def test_search_serverless_vulnerabilities_no_filter(self):
 86 |         """Test searching serverless vulnerabilities with no filter parameter."""
 87 |         # In the serverless module, filter is a required parameter with no default
 88 |         # So we should test that it's properly required
 89 | 
 90 |         # Create a mock implementation that raises TypeError when filter is not provided
 91 |         def mock_search(*args, **kwargs):
 92 |             if "filter" not in kwargs:
 93 |                 raise TypeError("filter is a required parameter")
 94 |             return []
 95 | 
 96 |         # Replace the method with our mock (the real method signature is not exercised)
 97 |         self.module.search_serverless_vulnerabilities = mock_search
 98 | 
 99 |         # This should raise a TypeError since filter is a required parameter
100 |         with self.assertRaises(TypeError):
101 |             self.module.search_serverless_vulnerabilities()
102 | 
103 |     def test_search_serverless_vulnerabilities_empty_response(self):
104 |         """Test searching serverless vulnerabilities with empty response."""
105 |         # Mock the method to return empty runs
106 |         self.module.search_serverless_vulnerabilities = MagicMock(return_value=[])
107 | 
108 |         # Call search_serverless_vulnerabilities
109 |         result = self.module.search_serverless_vulnerabilities(
110 |             filter="cloud_provider:'aws'"
111 |         )
112 | 
113 |         # Verify result is an empty list
114 |         self.assertEqual(result, [])
115 | 
116 |     def test_search_serverless_vulnerabilities_error(self):
117 |         """Test searching serverless vulnerabilities with API error."""
118 |         # Setup mock error response
119 |         error_response = {
120 |             "error": "Failed to search serverless vulnerabilities: Request failed with status code 400",
121 |             "details": {"errors": [{"message": "Invalid query"}]},
122 |         }
123 | 
124 |         # Mock the method to return the error
125 |         self.module.search_serverless_vulnerabilities = MagicMock(return_value=[error_response])
126 | 
127 |         # Call search_serverless_vulnerabilities
128 |         results = self.module.search_serverless_vulnerabilities(
129 |             filter="invalid query"
130 |         )
131 |         result = results[0]
132 | 
133 |         # Verify result contains error
134 |         self.assertIn("error", result)
135 |         self.assertIn("details", result)
136 |         # Check that the error message starts with the expected prefix
137 |         self.assertTrue(
138 |             result["error"].startswith("Failed to search serverless vulnerabilities")
139 |         )
140 | 
141 |     def test_search_serverless_vulnerabilities_with_all_params(self):
142 |         """Test searching serverless vulnerabilities with all parameters."""
143 |         # Setup mock response
144 |         mock_response = [
145 |             {
146 |                 "tool": {
147 |                     "driver": {
148 |                         "name": "CrowdStrike",
149 |                         "rules": [
150 |                             {
151 |                                 "id": "CVE-2023-12345",
152 |                                 "properties": {"severity": "HIGH"}
153 |                             }
154 |                         ]
155 |                     }
156 |                 }
157 |             }
158 |         ]
159 | 
160 |         # Mock the method to return the mock response
161 |         self.module.search_serverless_vulnerabilities = MagicMock(return_value=mock_response)
162 | 
163 |         # Call search_serverless_vulnerabilities with all parameters
164 |         result = self.module.search_serverless_vulnerabilities(
165 |             filter="cloud_provider:'aws'",
166 |             limit=5,
167 |             offset=10,
168 |             sort="severity",
169 |         )
170 | 
171 |         # Verify result contains expected values
172 |         self.assertEqual(len(result), 1)
173 |         self.assertEqual(result[0]["tool"]["driver"]["name"], "CrowdStrike")
174 | 
175 |     def test_search_serverless_vulnerabilities_missing_runs(self):
176 |         """Test searching serverless vulnerabilities with missing runs in response."""
177 |         # Setup the error payload used as the canned return value for a response without 'runs'
178 |         error_response = {
179 |             "error": "Failed to search serverless vulnerabilities: Missing 'runs' in response",
180 |             "details": {"body": {}}
181 |         }
182 | 
183 |         # Mock the method to return the error
184 |         self.module.search_serverless_vulnerabilities = MagicMock(return_value=[error_response])
185 | 
186 |         # Call search_serverless_vulnerabilities
187 |         result = self.module.search_serverless_vulnerabilities(
188 |             filter="cloud_provider:'aws'"
189 |         )
190 | 
191 |         # Verify result is a list with one item containing error info
192 |         self.assertEqual(len(result), 1)
193 |         self.assertIn("error", result[0])
194 | 
195 |     def test_search_serverless_vulnerabilities_none_runs(self):
196 |         """Test searching serverless vulnerabilities when 'runs' key exists but is None."""
197 |         # We need to mock handle_api_response to return a dict with runs=None
198 |         # This simulates the case where the API returns a response with runs=None
199 | 
200 |         # Create a mock response that handle_api_response will process
201 |         mock_api_response = {
202 |             "status_code": 200,
203 |             "body": {
204 |                 # The actual structure doesn't matter as we'll mock handle_api_response
205 |             }
206 |         }
207 |         self.mock_client.command.return_value = mock_api_response
208 | 
209 |         # Mock handle_api_response to return a dict with runs=None
210 |         mock_processed_response = {"runs": None}  # This is what we want to test
211 | 
212 |         with patch('falcon_mcp.modules.serverless.handle_api_response', return_value=mock_processed_response):
213 |             # Call search_serverless_vulnerabilities
214 |             result = self.module.search_serverless_vulnerabilities(
215 |                 filter="cloud_provider:'aws'"
216 |             )
217 | 
218 |             # Verify result is an empty list
219 |             self.assertEqual(result, [])
220 | 
221 |             # Verify the API was called with the correct parameters
222 |             self.mock_client.command.assert_called_once()
223 |             call_args = self.mock_client.command.call_args[1]
224 |             self.assertEqual(call_args["parameters"]["filter"], "cloud_provider:'aws'")
225 | 
226 | 
227 | if __name__ == "__main__":
228 |     unittest.main()
229 | 
```

--------------------------------------------------------------------------------
/tests/e2e/modules/test_detections.py:
--------------------------------------------------------------------------------

```python
  1 | """
  2 | E2E tests for the Detections module.
  3 | """
  4 | 
  5 | import json
  6 | import unittest
  7 | 
  8 | import pytest
  9 | 
 10 | from tests.e2e.utils.base_e2e_test import BaseE2ETest
 11 | 
 12 | 
 13 | @pytest.mark.e2e
 14 | class TestDetectionsModuleE2E(BaseE2ETest):
 15 |     """
 16 |     End-to-end test suite for the Falcon MCP Server Detections Module.
 17 |     """
 18 | 
 19 |     def test_get_top_3_high_severity_detections(self):
 20 |         """Verify the agent can retrieve the top 3 high-severity detections."""
 21 | 
 22 |         async def test_logic():
 23 |             fixtures = [
 24 |                 {
 25 |                     "operation": "GetQueriesAlertsV2",
 26 |                     "validator": lambda kwargs: "severity:"
 27 |                     in kwargs.get("parameters", {}).get("filter", "").lower()
 28 |                     and kwargs.get("parameters", {}).get("limit", 0) <= 10,
 29 |                     "response": {
 30 |                         "status_code": 200,
 31 |                         "body": {
 32 |                             "resources": ["detection-1", "detection-2", "detection-3"]
 33 |                         },
 34 |                     },
 35 |                 },
 36 |                 {
 37 |                     "operation": "PostEntitiesAlertsV2",
 38 |                     "validator": lambda kwargs: "detection-1"
 39 |                     in kwargs.get("body", {}).get("composite_ids", []),
 40 |                     "response": {
 41 |                         "status_code": 200,
 42 |                         "body": {
 43 |                             "resources": [
 44 |                                 {
 45 |                                     "id": "detection-1",
 46 |                                     "composite_id": "detection-1",
 47 |                                     "status": "new",
 48 |                                     "severity": 90,
 49 |                                     "severity_name": "Critical",
 50 |                                     "confidence": 85,
 51 |                                     "description": "A critical detection for E2E testing.",
 52 |                                     "created_timestamp": "2024-01-20T10:00:00Z",
 53 |                                     "agent_id": "test-agent-001",
 54 |                                 },
 55 |                                 {
 56 |                                     "id": "detection-2",
 57 |                                     "composite_id": "detection-2",
 58 |                                     "status": "new",
 59 |                                     "severity": 70,
 60 |                                     "severity_name": "High",
 61 |                                     "confidence": 80,
 62 |                                     "description": "A high severity detection for E2E testing.",
 63 |                                     "created_timestamp": "2024-01-20T09:30:00Z",
 64 |                                     "agent_id": "test-agent-002",
 65 |                                 },
 66 |                                 {
 67 |                                     "id": "detection-3",
 68 |                                     "composite_id": "detection-3",
 69 |                                     "status": "new",
 70 |                                     "severity": 70,
 71 |                                     "severity_name": "High",
 72 |                                     "confidence": 75,
 73 |                                     "description": "Another high severity detection for E2E testing.",
 74 |                                     "created_timestamp": "2024-01-20T09:00:00Z",
 75 |                                     "agent_id": "test-agent-003",
 76 |                                 },
 77 |                             ]
 78 |                         },
 79 |                     },
 80 |                 },
 81 |             ]
 82 | 
 83 |             self._mock_api_instance.command.side_effect = (
 84 |                 self._create_mock_api_side_effect(fixtures)
 85 |             )
 86 | 
 87 |             prompt = "Give me the details of the top 3 high severity detections, return only detection id and descriptions"
 88 |             return await self._run_agent_stream(prompt)
 89 | 
 90 |         def assertions(tools, result):
 91 |             self.assertGreaterEqual(len(tools), 1, "Expected 1 tool call")
 92 |             used_tool = tools[len(tools) - 1]
 93 |             self.assertEqual(
 94 |                 used_tool["input"]["tool_name"], "falcon_search_detections"
 95 |             )
 96 |             # Check for severity-related filtering (numeric or text-based)
 97 |             tool_input_str = json.dumps(used_tool["input"]["tool_input"]).lower()
 98 |             self.assertTrue(
 99 |                 "severity:" in tool_input_str or "high" in tool_input_str,
100 |                 f"Expected severity filtering in tool input: {tool_input_str}",
101 |             )
102 |             self.assertIn("detection-1", used_tool["output"])
103 |             self.assertIn("detection-2", used_tool["output"])
104 |             self.assertIn("detection-3", used_tool["output"])
105 | 
106 |             self.assertGreaterEqual(
107 |                 self._mock_api_instance.command.call_count, 2, "Expected 2 API calls"
108 |             )
109 |             api_call_1_params = self._mock_api_instance.command.call_args_list[0][
110 |                 1
111 |             ].get("parameters", {})
112 |             filter_str = api_call_1_params.get("filter", "").lower()
113 |             # Accept either numeric severity filters or text-based filters
114 |             self.assertTrue(
115 |                 "severity:" in filter_str or "high" in filter_str,
116 |                 f"Expected severity filtering in API call: {filter_str}",
117 |             )
118 |             self.assertEqual(api_call_1_params.get("limit"), 3)
119 |             self.assertIn("severity.desc", api_call_1_params.get("sort", ""))
120 |             api_call_2_body = self._mock_api_instance.command.call_args_list[1][1].get(
121 |                 "body", {}
122 |             )
123 |             self.assertEqual(
124 |                 api_call_2_body.get("composite_ids"),
125 |                 ["detection-1", "detection-2", "detection-3"],
126 |             )
127 | 
128 |             self.assertIn("detection-1", result)
129 |             self.assertIn("detection-2", result)
130 |             self.assertIn("detection-3", result)
131 | 
132 |         self.run_test_with_retries(
133 |             "test_get_top_3_high_severity_detections",
134 |             test_logic,
135 |             assertions,
136 |         )
137 | 
138 |     def test_get_highest_detection_for_ip(self):
139 |         """Verify the agent can find the highest-severity detection for a specific IP."""
140 | 
141 |         async def test_logic():
142 |             fixtures = [
143 |                 {
144 |                     "operation": "GetQueriesAlertsV2",
145 |                     "validator": lambda kwargs: "10.0.0.1"
146 |                     in kwargs.get("parameters", {}).get("filter", ""),
147 |                     "response": {
148 |                         "status_code": 200,
149 |                         "body": {"resources": ["detection-4"]},
150 |                     },
151 |                 },
152 |                 {
153 |                     "operation": "PostEntitiesAlertsV2",
154 |                     "validator": lambda kwargs: "detection-4"
155 |                     in kwargs.get("body", {}).get("composite_ids", []),
156 |                     "response": {
157 |                         "status_code": 200,
158 |                         "body": {
159 |                             "resources": [
160 |                                 {
161 |                                     "id": "detection-4",
162 |                                     "composite_id": "detection-4",
163 |                                     "status": "new",
164 |                                     "severity": 90,
165 |                                     "severity_name": "Critical",
166 |                                     "confidence": 95,
167 |                                     "description": "A critical detection on a specific IP.",
168 |                                     "created_timestamp": "2024-01-20T11:00:00Z",
169 |                                     "agent_id": "test-agent-004",
170 |                                     "local_ip": "10.0.0.1",
171 |                                 }
172 |                             ]
173 |                         },
174 |                     },
175 |                 },
176 |             ]
177 | 
178 |             self._mock_api_instance.command.side_effect = (
179 |                 self._create_mock_api_side_effect(fixtures)
180 |             )
181 | 
182 |             prompt = "What is the highest detection for the device with local_ip 10.0.0.1? Return the detection id as well"
183 |             return await self._run_agent_stream(prompt)
184 | 
185 |         def assertions(tools, result):
186 |             self.assertGreaterEqual(
187 |                 len(tools), 1, f"Expected 1 tool call, but got {len(tools)}"
188 |             )
189 |             used_tool = tools[len(tools) - 1]
190 |             self.assertEqual(
191 |                 used_tool["input"]["tool_name"], "falcon_search_detections"
192 |             )
193 |             self.assertIn("10.0.0.1", json.dumps(used_tool["input"]["tool_input"]))
194 |             self.assertIn("detection-4", used_tool["output"])
195 | 
196 |             self.assertGreaterEqual(
197 |                 self._mock_api_instance.command.call_count, 2, "Expected 2 API calls"
198 |             )
199 |             api_call_1_params = self._mock_api_instance.command.call_args_list[0][
200 |                 1
201 |             ].get("parameters", {})
202 |             self.assertIn("10.0.0.1", api_call_1_params.get("filter"))
203 |             api_call_2_body = self._mock_api_instance.command.call_args_list[1][1].get(
204 |                 "body", {}
205 |             )
206 |             self.assertEqual(api_call_2_body.get("composite_ids"), ["detection-4"])
207 | 
208 |             self.assertIn("detection-4", result)
209 |             self.assertNotIn("detection-1", result)
210 | 
211 |         self.run_test_with_retries(
212 |             "test_get_highest_detection_for_ip", test_logic, assertions
213 |         )
214 | 
215 | 
216 | if __name__ == "__main__":
217 |     unittest.main()
218 | 
```

--------------------------------------------------------------------------------
/docs/resource_development.md:
--------------------------------------------------------------------------------

```markdown
  1 | # Falcon MCP Server Resource Development Guide
  2 | 
  3 | This guide provides instructions for implementing and registering resources for the Falcon MCP server.
  4 | 
  5 | ## What are Resources?
  6 | 
  7 | Resources in the Falcon MCP server represent data sources that can be accessed by the model. Unlike tools, which perform actions, resources provide context or information that the model can reference. Resources are particularly useful for:
  8 | 
  9 | 1. Providing documentation or guides that the model can reference
 10 | 2. Exposing structured data that tools can use
 11 | 3. Making reference information available without requiring a tool call
 12 | 
 13 | ## Resource Structure
 14 | 
 15 | Each resource should:
 16 | 
 17 | 1. Be created using an appropriate resource class (e.g., `TextResource`)
 18 | 2. Have a unique URI that identifies it
 19 | 3. Include a descriptive name and description
 20 | 4. Be registered with the MCP server through a module's `register_resources` method
 21 | 
 22 | ## Step-by-Step Implementation Guide
 23 | 
 24 | ### 1. Create Resource Content
 25 | 
 26 | First, define the content for your resource. This could be:
 27 | 
 28 | - Documentation text in a separate file
 29 | - Structured data in a Python dictionary
 30 | - Reference information in a dedicated module
 31 | 
 32 | For text-based resources, it's recommended to store the content in a separate file in the `falcon_mcp/resources` directory:
 33 | 
 34 | ```python
 35 | # falcon_mcp/resources/your_resource.py
 36 | YOUR_RESOURCE_CONTENT = """
 37 | Detailed documentation or reference information goes here.
 38 | This can be multi-line text with formatting.
 39 | 
 40 | ## Section Headers
 41 | 
 42 | - Bullet points
 43 | - And other formatting
 44 | 
 45 | Code examples:
 46 | ...
 47 | 
 48 | """
 49 | 
 50 | ```
 51 | 
 52 | ### 2. Register Resources in Your Module
 53 | 
 54 | In your module class, implement the `register_resources` method:
 55 | 
 56 | ```python
 57 | from mcp.server import FastMCP
 58 | from mcp.server.fastmcp.resources import TextResource
 59 | from pydantic import AnyUrl
 60 | 
 61 | from ..resources.your_resource import YOUR_RESOURCE_CONTENT
 62 | from .base import BaseModule
 63 | 
 64 | 
 65 | class YourModule(BaseModule):
 66 |     """Your module description."""
 67 | 
 68 |     def register_tools(self, server: FastMCP) -> None:
 69 |         """Register tools with the MCP server."""
 70 |         # Tool registration code...
 71 | 
 72 |     def register_resources(self, server: FastMCP) -> None:
 73 |         """Register resources with the MCP server.
 74 | 
 75 |         Args:
 76 |             server: MCP server instance
 77 |         """
 78 |         your_resource = TextResource(
 79 |             uri=AnyUrl("falcon://your-module/resource-name"),
 80 |             name="your_resource_name",
 81 |             description="Description of what this resource provides.",
 82 |             text=YOUR_RESOURCE_CONTENT,
 83 |         )
 84 | 
 85 |         self._add_resource(
 86 |             server,
 87 |             your_resource,
 88 |         )
 89 | ```
 90 | 
 91 | ### 3. Resource URI Conventions
 92 | 
 93 | Resource URIs should follow a consistent pattern:
 94 | 
 95 | - Start with `falcon://` as the scheme
 96 | - Include the module name as the first path segment
 97 | - Use descriptive path segments for the resource
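 98 | - Use hyphens to separate words in path segments (some existing URIs, such as the Intel example below, keep underscores in a segment; prefer hyphens for new resources)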
 99 | 
100 | Examples:
101 | 
102 | - `falcon://intel/query_actor_entities/fql-guide`
103 | - `falcon://detections/search/fql-guide`
104 | - `falcon://incidents/status-codes`
105 | 
106 | ### 4. Resource Types
107 | 
108 | The MCP server supports several resource types:
109 | 
110 | #### TextResource
111 | 
112 | Used for providing text-based documentation or reference information:
113 | 
114 | ```python
115 | from mcp.server.fastmcp.resources import TextResource
116 | from pydantic import AnyUrl
117 | 
118 | text_resource = TextResource(
119 |     uri=AnyUrl("falcon://module/resource-name"),
120 |     name="resource_name",
121 |     description="Resource description",
122 |     text="Resource content",
123 | )
124 | ```
125 | 
126 | #### Other Resource Types
127 | 
128 | Additional resource types may be available depending on the MCP server implementation. Consult the MCP server documentation for details on other resource types.
129 | 
130 | ## Best Practices
131 | 
132 | ### Resource Content
133 | 
134 | 1. **Comprehensive Documentation**: Provide detailed information that covers all aspects of the topic
135 | 2. **Structured Format**: Use clear section headers, bullet points, and code examples
136 | 3. **Examples**: Include practical examples that demonstrate usage
137 | 4. **Consistent Style**: Follow a consistent documentation style across all resources
138 | 
139 | ### Resource Registration
140 | 
141 | 1. **Descriptive Names**: Use clear, descriptive names for resources
142 | 2. **Detailed Descriptions**: Provide informative descriptions that explain what the resource contains
143 | 3. **Logical Organization**: Group related resources together with consistent URI patterns
144 | 4. **Reference from Tools**: Reference resources in tool documentation where appropriate
145 | 
146 | ### Resource Usage
147 | 
148 | 1. **Tool Integration**: Design resources to complement tools by providing context or documentation
149 | 2. **Self-Contained**: Resources should be self-contained and not require additional context
150 | 3. **Versioning**: Consider versioning strategies for resources that may change over time
151 | 
152 | ## Example: Intel Module Resources
153 | 
154 | The Intel module provides a good example of resource implementation:
155 | 
156 | ```python
157 | from mcp.server import FastMCP
158 | from mcp.server.fastmcp.resources import TextResource
159 | from pydantic import AnyUrl
160 | 
161 | from ..resources.intel import QUERY_ACTOR_ENTITIES_FQL_DOCUMENTATION
162 | from .base import BaseModule
163 | 
164 | 
165 | class IntelModule(BaseModule):
166 |     """Module for accessing and analyzing CrowdStrike Falcon intelligence data."""
167 | 
168 |     def register_resources(self, server) -> None:
169 |         """Register resources with the MCP server.
170 | 
171 |         Args:
172 |             server: MCP server instance
173 |         """
174 | 
175 |         query_actor_entities_resource = TextResource(
176 |             uri=AnyUrl("falcon://intel/query_actor_entities/fql-guide"),
177 |             name="falcon_query_actor_entities_fql_guide",
178 |             description="Contains the guide for the `filter` param of the `falcon_search_actors` tool.",
179 |             text=QUERY_ACTOR_ENTITIES_FQL_DOCUMENTATION,
180 |         )
181 | 
182 |         self._add_resource(
183 |             server,
184 |             query_actor_entities_resource,
185 |         )
186 | ```
187 | 
188 | In this example:
189 | 
190 | 1. The resource content (`QUERY_ACTOR_ENTITIES_FQL_DOCUMENTATION`) is defined in a separate file (`falcon_mcp/resources/intel.py`)
191 | 2. The resource is created as a `TextResource` with a clear URI, name, and description
192 | 3. The resource is registered with the server using the `_add_resource` method
193 | 4. The resource complements the `falcon_search_actors` tool by providing documentation for its `filter` parameter
194 | 
195 | ## Integrating Resources with Tools
196 | 
197 | Resources can be particularly valuable when integrated with tools. Here's how the Intel module references its resource in a tool method:
198 | 
199 | ```python
200 | def query_actor_entities(
201 |     self,
202 |     filter: Optional[str] = Field(
203 |         default=None,
204 |         description="FQL query expression that should be used to limit the results. IMPORTANT: use the 'falcon://intel/query_actor_entities/fql-guide' resource when building this parameter.",
205 |     ),
206 |     # Other parameters...
207 | ) -> List[Dict[str, Any]]:
208 |     """Get info about actors that match provided FQL filters.
209 | 
210 |     IMPORTANT: You must call the FQL Guide for Intel Query Actor Entities (falcon://intel/query_actor_entities/fql-guide) resource first
211 | 
212 |     Returns:
213 |         Information about actors that match the provided filters.
214 |     """
215 |     # Method implementation...
216 | ```
217 | 
218 | Note how:
219 | 
220 | 1. The resource URI is referenced in the parameter description
221 | 2. The docstring explicitly mentions the resource that should be consulted
222 | 3. This creates a clear link between the tool and its supporting documentation
223 | 
224 | ## Contributing Resource Changes
225 | 
226 | When contributing new resources or changes to existing resources, please follow these guidelines:
227 | 
228 | ### Conventional Commits for Resources
229 | 
230 | This project uses [Conventional Commits](https://www.conventionalcommits.org/) for automated releases and clear commit history. When contributing resource-related changes, use these commit message patterns:
231 | 
232 | **Adding New Resources:**
233 | 
234 | ```bash
235 | git commit -m "feat(resources): add FQL guide for [module-name] module"
236 | git commit -m "feat(resources): add documentation for [specific-topic]"
237 | # Examples:
238 | git commit -m "feat(resources): add FQL guide for cloud module"
239 | git commit -m "feat(resources): add hosts search documentation"
240 | ```
241 | 
242 | **Modifying Existing Resources:**
243 | 
244 | ```bash
245 | git commit -m "refactor(resources): reword FQL guide in cloud resource"
246 | git commit -m "fix(resources): correct formatting in intel FQL documentation"
247 | git commit -m "docs(resources): update resource development guide"
248 | # Examples:
249 | git commit -m "refactor(resources): improve clarity in detections FQL guide"
250 | git commit -m "fix(resources): correct syntax examples in incidents resource"
251 | ```
252 | 
253 | **Resource Tests and Infrastructure:**
254 | 
255 | ```bash
256 | git commit -m "test(resources): add validation tests for resource content"
257 | git commit -m "chore(resources): update resource registration patterns"
258 | ```
259 | 
260 | See the main [CONTRIBUTING.md](CONTRIBUTING.md) guide for complete conventional commits guidelines.
261 | 
262 | ## Conclusion
263 | 
264 | Resources are a powerful way to provide context and documentation to the model. By following the guidelines in this document, you can create effective resources that complement your tools and enhance the overall functionality of the Falcon MCP server.
265 | 
266 | When developing resources:
267 | 
268 | 1. Focus on providing clear, comprehensive information
269 | 2. Follow consistent naming and URI conventions
270 | 3. Integrate resources with related tools
271 | 4. Test resource registration to ensure proper functionality
272 | 
273 | Resources, when used effectively alongside tools, create a more powerful and user-friendly experience by providing the necessary context and documentation for complex operations.
274 | 
```

--------------------------------------------------------------------------------
/tests/modules/test_discover.py:
--------------------------------------------------------------------------------

```python
  1 | """
  2 | Unit tests for the Discover module.
  3 | """
  4 | 
  5 | import unittest
  6 | from unittest.mock import MagicMock, patch
  7 | 
  8 | from mcp.server import FastMCP
  9 | 
 10 | from falcon_mcp.client import FalconClient
 11 | from falcon_mcp.modules.discover import DiscoverModule
 12 | 
 13 | 
 14 | class TestDiscoverModule(unittest.TestCase):
 15 |     """Test cases for the Discover module."""
 16 | 
 17 |     def setUp(self):
 18 |         """Set up test fixtures."""
 19 |         self.client = MagicMock(spec=FalconClient)
 20 |         self.module = DiscoverModule(self.client)
 21 |         self.server = MagicMock(spec=FastMCP)
 22 | 
 23 |     def test_register_tools(self):
 24 |         """Test that tools are registered correctly."""
 25 |         self.module.register_tools(self.server)
 26 |         self.assertEqual(self.server.add_tool.call_count, 2)
 27 |         self.assertEqual(len(self.module.tools), 2)
 28 |         self.assertEqual(self.module.tools[0], "falcon_search_applications")
 29 |         self.assertEqual(self.module.tools[1], "falcon_search_unmanaged_assets")
 30 | 
 31 |     def test_register_resources(self):
 32 |         """Test that resources are registered correctly."""
 33 |         self.module.register_resources(self.server)
 34 |         self.assertEqual(self.server.add_resource.call_count, 2)
 35 |         self.assertEqual(len(self.module.resources), 2)
 36 |         self.assertEqual(
 37 |             str(self.module.resources[0]), "falcon://discover/applications/fql-guide"
 38 |         )
 39 |         self.assertEqual(
 40 |             str(self.module.resources[1]), "falcon://discover/hosts/fql-guide"
 41 |         )
 42 | 
 43 |     @patch("falcon_mcp.modules.discover.prepare_api_parameters")
 44 |     @patch("falcon_mcp.modules.discover.handle_api_response")
 45 |     def test_search_applications(self, mock_handle_response, mock_prepare_params):
 46 |         """Test search_applications method."""
 47 |         # Setup mocks
 48 |         mock_prepare_params.return_value = {"filter": "name:'Chrome'"}
 49 |         mock_response = MagicMock()
 50 |         self.client.command.return_value = mock_response
 51 |         mock_handle_response.return_value = [{"id": "app1", "name": "Chrome"}]
 52 | 
 53 |         # Call the method
 54 |         result = self.module.search_applications(filter="name:'Chrome'")
 55 | 
 56 |         # Assertions
 57 |         # Don't check the exact arguments, just verify it was called once
 58 |         self.assertEqual(mock_prepare_params.call_count, 1)
 59 |         self.client.command.assert_called_once_with(
 60 |             "combined_applications", parameters={"filter": "name:'Chrome'"}
 61 |         )
 62 |         mock_handle_response.assert_called_once_with(
 63 |             mock_response,
 64 |             operation="combined_applications",
 65 |             error_message="Failed to search applications",
 66 |             default_result=[],
 67 |         )
 68 |         self.assertEqual(result, [{"id": "app1", "name": "Chrome"}])
 69 | 
 70 |     @patch("falcon_mcp.modules.discover.prepare_api_parameters")
 71 |     @patch("falcon_mcp.modules.discover.handle_api_response")
 72 |     def test_search_applications_with_error(self, mock_handle_response, mock_prepare_params):
 73 |         """Test search_applications method when an error occurs."""
 74 |         # Setup mocks
 75 |         mock_prepare_params.return_value = {"filter": "name:'Chrome'"}
 76 |         mock_response = MagicMock()
 77 |         self.client.command.return_value = mock_response
 78 |         error_response = {"error": "API Error", "message": "Something went wrong"}
 79 |         mock_handle_response.return_value = error_response
 80 | 
 81 |         # Call the method
 82 |         result = self.module.search_applications(filter="name:'Chrome'")
 83 | 
 84 |         # Assertions
 85 |         self.assertEqual(result, [error_response])
 86 | 
 87 |     @patch("falcon_mcp.modules.discover.prepare_api_parameters")
 88 |     @patch("falcon_mcp.modules.discover.handle_api_response")
 89 |     def test_search_applications_with_all_params(self, mock_handle_response, mock_prepare_params):
 90 |         """Test search_applications method with all parameters."""
 91 |         # Setup mocks
 92 |         mock_prepare_params.return_value = {
 93 |             "filter": "name:'Chrome'",
 94 |             "facet": "host_info",
 95 |             "limit": 50,
 96 |             "sort": "name.asc",
 97 |         }
 98 |         mock_response = MagicMock()
 99 |         self.client.command.return_value = mock_response
100 |         mock_handle_response.return_value = [{"id": "app1", "name": "Chrome"}]
101 | 
102 |         # Call the method
103 |         result = self.module.search_applications(
104 |             filter="name:'Chrome'",
105 |             facet="host_info",
106 |             limit=50,
107 |             sort="name.asc",
108 |         )
109 | 
110 |         # Assertions
111 |         # Don't check the exact arguments, just verify it was called once
112 |         self.assertEqual(mock_prepare_params.call_count, 1)
113 |         self.client.command.assert_called_once_with(
114 |             "combined_applications",
115 |             parameters={
116 |                 "filter": "name:'Chrome'",
117 |                 "facet": "host_info",
118 |                 "limit": 50,
119 |                 "sort": "name.asc",
120 |             },
121 |         )
122 |         self.assertEqual(result, [{"id": "app1", "name": "Chrome"}])
123 | 
124 |     @patch("falcon_mcp.modules.discover.prepare_api_parameters")
125 |     @patch("falcon_mcp.modules.discover.handle_api_response")
126 |     def test_search_unmanaged_assets(self, mock_handle_response, mock_prepare_params):
127 |         """Test search_unmanaged_assets method."""
128 |         # Setup mocks
129 |         mock_prepare_params.return_value = {"filter": "entity_type:'unmanaged'+platform_name:'Windows'"}
130 |         mock_response = MagicMock()
131 |         self.client.command.return_value = mock_response
132 |         mock_handle_response.return_value = [{"device_id": "host1", "hostname": "PC-001"}]
133 | 
134 |         # Call the method
135 |         result = self.module.search_unmanaged_assets(filter="platform_name:'Windows'")
136 | 
137 |         # Assertions
138 |         # Don't check the exact arguments, just verify it was called once
139 |         self.assertEqual(mock_prepare_params.call_count, 1)
140 |         self.client.command.assert_called_once_with(
141 |             "combined_hosts", parameters={"filter": "entity_type:'unmanaged'+platform_name:'Windows'"}
142 |         )
143 |         mock_handle_response.assert_called_once_with(
144 |             mock_response,
145 |             operation="combined_hosts",
146 |             error_message="Failed to search unmanaged assets",
147 |             default_result=[],
148 |         )
149 |         self.assertEqual(result, [{"device_id": "host1", "hostname": "PC-001"}])
150 | 
151 |     @patch("falcon_mcp.modules.discover.prepare_api_parameters")
152 |     @patch("falcon_mcp.modules.discover.handle_api_response")
153 |     def test_search_unmanaged_assets_without_filter(self, mock_handle_response, mock_prepare_params):
154 |         """Test search_unmanaged_assets method without user filter."""
155 |         # Setup mocks
156 |         mock_prepare_params.return_value = {"filter": "entity_type:'unmanaged'"}
157 |         mock_response = MagicMock()
158 |         self.client.command.return_value = mock_response
159 |         mock_handle_response.return_value = [{"device_id": "host1", "hostname": "PC-001"}]
160 | 
161 |         # Call the method with no filter
162 |         result = self.module.search_unmanaged_assets()
163 | 
164 |         # Assertions
165 |         # Don't check the exact arguments, just verify it was called once
166 |         self.assertEqual(mock_prepare_params.call_count, 1)
167 |         self.client.command.assert_called_once_with(
168 |             "combined_hosts", parameters={"filter": "entity_type:'unmanaged'"}
169 |         )
170 |         self.assertEqual(result, [{"device_id": "host1", "hostname": "PC-001"}])
171 | 
172 |     @patch("falcon_mcp.modules.discover.prepare_api_parameters")
173 |     @patch("falcon_mcp.modules.discover.handle_api_response")
174 |     def test_search_unmanaged_assets_with_error(self, mock_handle_response, mock_prepare_params):
175 |         """Test search_unmanaged_assets method when an error occurs."""
176 |         # Setup mocks
177 |         mock_prepare_params.return_value = {"filter": "entity_type:'unmanaged'+platform_name:'Windows'"}
178 |         mock_response = MagicMock()
179 |         self.client.command.return_value = mock_response
180 |         error_response = {"error": "API Error", "message": "Something went wrong"}
181 |         mock_handle_response.return_value = error_response
182 | 
183 |         # Call the method
184 |         result = self.module.search_unmanaged_assets(filter="platform_name:'Windows'")
185 | 
186 |         # Assertions
187 |         self.assertEqual(result, [error_response])
188 | 
189 |     @patch("falcon_mcp.modules.discover.prepare_api_parameters")
190 |     @patch("falcon_mcp.modules.discover.handle_api_response")
191 |     def test_search_unmanaged_assets_with_all_params(self, mock_handle_response, mock_prepare_params):
192 |         """Test search_unmanaged_assets method with all parameters."""
193 |         # Setup mocks
194 |         mock_prepare_params.return_value = {
195 |             "filter": "entity_type:'unmanaged'+criticality:'Critical'",
196 |             "limit": 50,
197 |             "offset": 10,
198 |             "sort": "hostname.asc",
199 |         }
200 |         mock_response = MagicMock()
201 |         self.client.command.return_value = mock_response
202 |         mock_handle_response.return_value = [{"device_id": "host1", "hostname": "PC-001"}]
203 | 
204 |         # Call the method
205 |         result = self.module.search_unmanaged_assets(
206 |             filter="criticality:'Critical'",
207 |             limit=50,
208 |             offset=10,
209 |             sort="hostname.asc",
210 |         )
211 | 
212 |         # Assertions
213 |         # Don't check the exact arguments, just verify it was called once
214 |         self.assertEqual(mock_prepare_params.call_count, 1)
215 |         self.client.command.assert_called_once_with(
216 |             "combined_hosts",
217 |             parameters={
218 |                 "filter": "entity_type:'unmanaged'+criticality:'Critical'",
219 |                 "limit": 50,
220 |                 "offset": 10,
221 |                 "sort": "hostname.asc",
222 |             },
223 |         )
224 |         self.assertEqual(result, [{"device_id": "host1", "hostname": "PC-001"}])
225 | 
226 | 
227 | if __name__ == "__main__":
228 |     unittest.main()
229 | 
```
Page 2/5FirstPrevNextLast