This is page 2 of 5. Use http://codebase.md/crowdstrike/falcon-mcp?lines=true&page={x} to view the full context.
# Directory Structure
```
├── .dockerignore
├── .env.dev.example
├── .env.example
├── .github
│ ├── dependabot.yml
│ ├── ISSUE_TEMPLATE
│ │ ├── bug.yaml
│ │ ├── config.yml
│ │ ├── feature-request.yaml
│ │ └── question.yaml
│ └── workflows
│ ├── docker-build-push.yml
│ ├── docker-build-test.yml
│ ├── markdown-lint.yml
│ ├── python-lint.yml
│ ├── python-test-e2e.yml
│ ├── python-test.yml
│ └── release.yml
├── .gitignore
├── .markdownlint.json
├── CHANGELOG.md
├── Dockerfile
├── docs
│ ├── CODE_OF_CONDUCT.md
│ ├── CONTRIBUTING.md
│ ├── deployment
│ │ ├── amazon_bedrock_agentcore.md
│ │ └── google_cloud.md
│ ├── e2e_testing.md
│ ├── module_development.md
│ ├── resource_development.md
│ └── SECURITY.md
├── examples
│ ├── adk
│ │ ├── adk_agent_operations.sh
│ │ ├── falcon_agent
│ │ │ ├── __init__.py
│ │ │ ├── agent.py
│ │ │ ├── env.properties
│ │ │ └── requirements.txt
│ │ └── README.md
│ ├── basic_usage.py
│ ├── mcp_config.json
│ ├── sse_usage.py
│ └── streamable_http_usage.py
├── falcon_mcp
│ ├── __init__.py
│ ├── client.py
│ ├── common
│ │ ├── __init__.py
│ │ ├── api_scopes.py
│ │ ├── errors.py
│ │ ├── logging.py
│ │ └── utils.py
│ ├── modules
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── cloud.py
│ │ ├── detections.py
│ │ ├── discover.py
│ │ ├── hosts.py
│ │ ├── idp.py
│ │ ├── incidents.py
│ │ ├── intel.py
│ │ ├── sensor_usage.py
│ │ ├── serverless.py
│ │ └── spotlight.py
│ ├── registry.py
│ ├── resources
│ │ ├── __init__.py
│ │ ├── cloud.py
│ │ ├── detections.py
│ │ ├── discover.py
│ │ ├── hosts.py
│ │ ├── incidents.py
│ │ ├── intel.py
│ │ ├── sensor_usage.py
│ │ ├── serverless.py
│ │ └── spotlight.py
│ └── server.py
├── LICENSE
├── pyproject.toml
├── README.md
├── scripts
│ ├── generate_e2e_report.py
│ └── test_results_viewer.html
├── SUPPORT.md
├── tests
│ ├── __init__.py
│ ├── common
│ │ ├── __init__.py
│ │ ├── test_api_scopes.py
│ │ ├── test_errors.py
│ │ ├── test_logging.py
│ │ └── test_utils.py
│ ├── conftest.py
│ ├── e2e
│ │ ├── __init__.py
│ │ ├── modules
│ │ │ ├── __init__.py
│ │ │ ├── test_cloud.py
│ │ │ ├── test_detections.py
│ │ │ ├── test_discover.py
│ │ │ ├── test_hosts.py
│ │ │ ├── test_idp.py
│ │ │ ├── test_incidents.py
│ │ │ ├── test_intel.py
│ │ │ ├── test_sensor_usage.py
│ │ │ ├── test_serverless.py
│ │ │ └── test_spotlight.py
│ │ └── utils
│ │ ├── __init__.py
│ │ └── base_e2e_test.py
│ ├── modules
│ │ ├── __init__.py
│ │ ├── test_base.py
│ │ ├── test_cloud.py
│ │ ├── test_detections.py
│ │ ├── test_discover.py
│ │ ├── test_hosts.py
│ │ ├── test_idp.py
│ │ ├── test_incidents.py
│ │ ├── test_intel.py
│ │ ├── test_sensor_usage.py
│ │ ├── test_serverless.py
│ │ ├── test_spotlight.py
│ │ └── utils
│ │ └── test_modules.py
│ ├── test_client.py
│ ├── test_registry.py
│ ├── test_server.py
│ └── test_streamable_http_transport.py
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/tests/modules/test_spotlight.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Tests for the Spotlight module.
3 | """
4 |
5 | import unittest
6 |
7 | from falcon_mcp.modules.spotlight import SpotlightModule
8 | from tests.modules.utils.test_modules import TestModules
9 |
10 |
11 | class TestSpotlightModule(TestModules):
12 | """Test cases for the Spotlight module."""
13 |
14 | def setUp(self):
15 | """Set up test fixtures."""
16 | self.setup_module(SpotlightModule)
17 |
18 | def test_register_tools(self):
19 | """Test registering tools with the server."""
20 | expected_tools = [
21 | "falcon_search_vulnerabilities",
22 | ]
23 | self.assert_tools_registered(expected_tools)
24 |
25 | def test_register_resources(self):
26 | """Test registering resources with the server."""
27 | expected_resources = [
28 | "falcon_search_vulnerabilities_fql_guide",
29 | ]
30 | self.assert_resources_registered(expected_resources)
31 |
32 | def test_search_vulnerabilities_success(self):
33 | """Test searching vulnerabilities with successful response."""
34 | # Setup mock response with sample vulnerability data
35 | mock_response = {
36 | "status_code": 200,
37 | "body": {
38 | "resources": [
39 | {
40 | "cve_id": "CVE-2023-12345",
41 | "status": "open",
42 | "severity": "HIGH",
43 | "cvss_base_score": 8.5,
44 | "created_timestamp": "2023-08-01T12:00:00Z",
45 | "updated_timestamp": "2023-08-02T14:30:00Z",
46 | "host_info": {
47 | "hostname": "test-server",
48 | "os_version": "Ubuntu 22.04"
49 | }
50 | }
51 | ]
52 | },
53 | }
54 | self.mock_client.command.return_value = mock_response
55 |
56 | # Call search_vulnerabilities with test parameters
57 | result = self.module.search_vulnerabilities(filter="status:'open'")
58 |
59 | # Verify client command was called correctly
60 | self.assertEqual(self.mock_client.command.call_count, 1)
61 | call_args = self.mock_client.command.call_args
62 | self.assertEqual(call_args[0][0], "combinedQueryVulnerabilities")
63 |
64 | # Check that the parameters dictionary contains the expected filter
65 | params = call_args[1]["parameters"]
66 | self.assertEqual(params["filter"], "status:'open'")
67 |
68 | # Verify result contains expected values
69 | self.assertEqual(len(result), 1)
70 | self.assertEqual(result[0]["cve_id"], "CVE-2023-12345")
71 | self.assertEqual(result[0]["severity"], "HIGH")
72 | self.assertEqual(result[0]["status"], "open")
73 | self.assertEqual(result[0]["cvss_base_score"], 8.5)
74 |
75 | def test_search_vulnerabilities_no_filter(self):
76 | """Test searching vulnerabilities with no filter parameter."""
77 | # Setup mock response with sample vulnerability data
78 | mock_response = {
79 | "status_code": 200,
80 | "body": {
81 | "resources": [
82 | {
83 | "cve_id": "CVE-2023-12345",
84 | "status": "open",
85 | "severity": "HIGH"
86 | }
87 | ]
88 | },
89 | }
90 | self.mock_client.command.return_value = mock_response
91 |
92 | # Call search_vulnerabilities with no filter
93 | result = self.module.search_vulnerabilities()
94 |
95 | # Verify client command was called with the correct operation
96 | self.assertEqual(self.mock_client.command.call_count, 1)
97 | call_args = self.mock_client.command.call_args
98 | self.assertEqual(call_args[0][0], "combinedQueryVulnerabilities")
99 |
100 | # Verify result contains expected values
101 | self.assertEqual(len(result), 1)
102 | self.assertEqual(result[0]["cve_id"], "CVE-2023-12345")
103 |
104 | def test_search_vulnerabilities_empty_response(self):
105 | """Test searching vulnerabilities with empty response."""
106 | # Setup mock response with empty resources
107 | mock_response = {"status_code": 200, "body": {"resources": []}}
108 | self.mock_client.command.return_value = mock_response
109 |
110 | # Call search_vulnerabilities
111 | result = self.module.search_vulnerabilities(filter="status:'closed'")
112 |
113 | # Verify client command was called with the correct operation
114 | self.assertEqual(self.mock_client.command.call_count, 1)
115 | call_args = self.mock_client.command.call_args
116 | self.assertEqual(call_args[0][0], "combinedQueryVulnerabilities")
117 |
118 | # Verify result is an empty list
119 | self.assertEqual(result, [])
120 |
121 | def test_search_vulnerabilities_error(self):
122 | """Test searching vulnerabilities with API error."""
123 | # Setup mock response with error
124 | mock_response = {
125 | "status_code": 400,
126 | "body": {"errors": [{"message": "Invalid query"}]},
127 | }
128 | self.mock_client.command.return_value = mock_response
129 |
130 | # Call search_vulnerabilities
131 | results = self.module.search_vulnerabilities(filter="invalid query")
132 | result = results[0]
133 |
134 | # Verify result contains error
135 | self.assertIn("error", result)
136 | self.assertIn("details", result)
137 | # Check that the error message starts with the expected prefix
138 | self.assertTrue(result["error"].startswith("Failed to search vulnerabilities"))
139 |
140 |
141 | if __name__ == "__main__":
142 | unittest.main()
143 |
```
--------------------------------------------------------------------------------
/tests/modules/test_sensor_usage.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Tests for the Sensor Usage module.
3 | """
4 |
5 | import unittest
6 |
7 | from falcon_mcp.modules.sensor_usage import SensorUsageModule
8 | from tests.modules.utils.test_modules import TestModules
9 |
10 |
11 | class TestSensorUsageModule(TestModules):
12 | """Test cases for the Sensor Usage module."""
13 |
14 | def setUp(self):
15 | """Set up test fixtures."""
16 | self.setup_module(SensorUsageModule)
17 |
18 | def test_register_tools(self):
19 | """Test registering tools with the server."""
20 | expected_tools = [
21 | "falcon_search_sensor_usage",
22 | ]
23 | self.assert_tools_registered(expected_tools)
24 |
25 | def test_register_resources(self):
26 | """Test registering resources with the server."""
27 | expected_resources = [
28 | "falcon_search_sensor_usage_fql_guide",
29 | ]
30 | self.assert_resources_registered(expected_resources)
31 |
32 | def test_search_sensor_usage_success(self):
33 | """Test searching sensor usage with successful response."""
34 | # Setup mock response with sample sensor usage data
35 | mock_response = {
36 | "status_code": 200,
37 | "body": {
38 | "resources": [
39 | {
40 | "containers": 42.5,
41 | "public_cloud_with_containers": 42,
42 | "public_cloud_without_containers": 42.75,
43 | "servers_with_containers": 42.25,
44 | "servers_without_containers": 42.75,
45 | "workstations": 42.75,
46 | "mobile": 42.75,
47 | "lumos": 42.25,
48 | "chrome_os": 0,
49 | "date": "2025-08-02"
50 | }
51 | ]
52 | },
53 | }
54 | self.mock_client.command.return_value = mock_response
55 |
56 | # Call search_sensor_usage with test parameters
57 | result = self.module.search_sensor_usage(filter="event_date:'2025-08-02'")
58 |
59 | # Verify client command was called correctly
60 | self.mock_client.command.assert_called_once_with(
61 | "GetSensorUsageWeekly",
62 | parameters={
63 | "filter": "event_date:'2025-08-02'",
64 | },
65 | )
66 |
67 | # Verify result contains expected values
68 | self.assertEqual(len(result), 1)
69 | self.assertEqual(result[0]["date"], "2025-08-02")
70 | self.assertEqual(result[0]["containers"], 42.5)
71 | self.assertEqual(result[0]["workstations"], 42.75)
72 | self.assertEqual(result[0]["mobile"], 42.75)
73 |
74 | def test_search_sensor_usage_no_filter(self):
75 | """Test searching sensor usage with no filter parameter."""
76 | # Setup mock response with sample sensor usage data
77 | mock_response = {
78 | "status_code": 200,
79 | "body": {
80 | "resources": [
81 | {
82 | "containers": 42.5,
83 | "public_cloud_with_containers": 42,
84 | "public_cloud_without_containers": 42.75,
85 | "servers_with_containers": 42.25,
86 | "servers_without_containers": 42.75,
87 | "workstations": 42.75,
88 | "mobile": 42.75,
89 | "lumos": 42.25,
90 | "chrome_os": 0,
91 | "date": "2025-08-02"
92 | }
93 | ]
94 | },
95 | }
96 | self.mock_client.command.return_value = mock_response
97 |
98 | # Call search_sensor_usage with no filter
99 | result = self.module.search_sensor_usage()
100 |
101 | # Verify client command was called with the correct operation
102 | self.assertEqual(self.mock_client.command.call_count, 1)
103 | call_args = self.mock_client.command.call_args
104 | self.assertEqual(call_args[0][0], "GetSensorUsageWeekly")
105 |
106 | # Verify result contains expected values
107 | self.assertEqual(len(result), 1)
108 | self.assertEqual(result[0]["date"], "2025-08-02")
109 |
110 | def test_search_sensor_usage_empty_response(self):
111 | """Test searching sensor usage with empty response."""
112 | # Setup mock response with empty resources
113 | mock_response = {"status_code": 200, "body": {"resources": []}}
114 | self.mock_client.command.return_value = mock_response
115 |
116 | # Call search_sensor_usage
117 | result = self.module.search_sensor_usage(filter="event_date:'2025-08-02'")
118 |
119 | # Verify client command was called with the correct operation
120 | self.assertEqual(self.mock_client.command.call_count, 1)
121 | call_args = self.mock_client.command.call_args
122 | self.assertEqual(call_args[0][0], "GetSensorUsageWeekly")
123 |
124 | # Verify result is an empty list
125 | self.assertEqual(result, [])
126 |
127 | def test_search_sensor_usage_error(self):
128 | """Test searching sensor usage with API error."""
129 | # Setup mock response with error
130 | mock_response = {
131 | "status_code": 400,
132 | "body": {"errors": [{"message": "Invalid query"}]},
133 | }
134 | self.mock_client.command.return_value = mock_response
135 |
136 | # Call search_sensor_usage
137 | results = self.module.search_sensor_usage(filter="invalid query")
138 | result = results[0]
139 |
140 | # Verify result contains error
141 | self.assertIn("error", result)
142 | self.assertIn("details", result)
143 | # Check that the error message starts with the expected prefix
144 | self.assertTrue(result["error"].startswith("Failed to search sensor usage"))
145 |
146 |
147 | if __name__ == "__main__":
148 | unittest.main()
149 |
```
--------------------------------------------------------------------------------
/falcon_mcp/modules/serverless.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Serverless Vulnerabilities module for Falcon MCP Server
3 |
4 | This module provides tools for accessing and managing CrowdStrike Falcon Serverless Vulnerabilities.
5 | """
6 |
7 | from textwrap import dedent
8 | from typing import Any, Dict, List
9 |
10 | from mcp.server import FastMCP
11 | from mcp.server.fastmcp.resources import TextResource
12 | from pydantic import AnyUrl, Field
13 |
14 | from falcon_mcp.common.errors import handle_api_response
15 | from falcon_mcp.common.logging import get_logger
16 | from falcon_mcp.common.utils import prepare_api_parameters
17 | from falcon_mcp.modules.base import BaseModule
18 | from falcon_mcp.resources.serverless import SERVERLESS_VULNERABILITIES_FQL_DOCUMENTATION
19 |
20 | logger = get_logger(__name__)
21 |
22 |
23 | class ServerlessModule(BaseModule):
24 | """Module for accessing and managing CrowdStrike Falcon Serverless Vulnerabilities."""
25 |
26 | def register_tools(self, server: FastMCP) -> None:
27 | """Register tools with the MCP server.
28 |
29 | Args:
30 | server: MCP server instance
31 | """
32 | # Register tools
33 | self._add_tool(
34 | server=server,
35 | method=self.search_serverless_vulnerabilities,
36 | name="search_serverless_vulnerabilities",
37 | )
38 |
39 | def register_resources(self, server: FastMCP) -> None:
40 | """Register resources with the MCP server.
41 |
42 | Args:
43 | server: MCP server instance
44 | """
45 | serverless_vulnerabilities_fql_resource = TextResource(
46 | uri=AnyUrl("falcon://serverless/vulnerabilities/fql-guide"),
47 | name="falcon_serverless_vulnerabilities_fql_guide",
48 | description="Contains the guide for the `filter` param of the `falcon_search_serverless_vulnerabilities` tool.",
49 | text=SERVERLESS_VULNERABILITIES_FQL_DOCUMENTATION,
50 | )
51 |
52 | self._add_resource(
53 | server,
54 | serverless_vulnerabilities_fql_resource,
55 | )
56 |
57 | def search_serverless_vulnerabilities(
58 | self,
59 | filter: str = Field(
60 | description="FQL Syntax formatted string used to limit the results. IMPORTANT: use the `falcon://serverless/vulnerabilities/fql-guide` resource when building this filter parameter.",
61 | examples={"cloud_provider:'aws'", "severity:'HIGH'"},
62 | ),
63 | limit: int | None = Field(
64 | default=10,
65 | ge=1,
66 | description="The upper-bound on the number of records to retrieve. (Default: 10)",
67 | ),
68 | offset: int | None = Field(
69 | default=0,
70 | description="The offset from where to begin.",
71 | ),
72 | sort: str | None = Field(
73 | default=None,
74 | description=dedent("""
75 | Sort serverless vulnerabilities using FQL syntax.
76 |
77 | Supported sorting fields:
78 | • application_name: Name of the application
79 | • application_name_version: Version of the application
80 | • cid: Customer ID
81 | • cloud_account_id: Cloud account ID
82 | • cloud_account_name: Cloud account name
83 | • cloud_provider: Cloud provider
84 | • cve_id: CVE ID
85 | • cvss_base_score: CVSS base score
86 | • exprt_rating: ExPRT rating
87 | • first_seen_timestamp: When the vulnerability was first seen
88 | • function_resource_id: Function resource ID
89 | • is_supported: Whether the function is supported
90 | • layer: Layer where the vulnerability was found
91 | • region: Cloud region
92 | • runtime: Runtime environment
93 | • severity: Severity level
94 | • timestamp: When the vulnerability was last updated
95 | • type: Type of vulnerability
96 |
97 | Format: 'field'
98 |
99 | Examples: 'severity', 'cloud_provider', 'first_seen_timestamp'
100 | """).strip(),
101 | examples={
102 | "severity",
103 | "cloud_provider",
104 | "first_seen_timestamp",
105 | },
106 | ),
107 | ) -> List[Dict[str, Any]]:
108 | """Search for vulnerabilities in your serverless functions across all cloud service providers.
109 |
110 | This endpoint provides security information in SARIF format, including:
111 | - CVE IDs for identified vulnerabilities
112 | - Severity levels
113 | - Vulnerability descriptions
114 | - Additional relevant details
115 |
116 | IMPORTANT: You must use the `falcon://serverless/vulnerabilities/fql-guide` resource when you need to use the `filter` parameter.
117 | This resource contains the guide on how to build the FQL `filter` parameter for the `falcon_search_serverless_vulnerabilities` tool.
118 | """
119 | # Prepare parameters for GetCombinedVulnerabilitiesSARIF
120 | params = prepare_api_parameters(
121 | {
122 | "filter": filter,
123 | "limit": limit,
124 | "offset": offset,
125 | "sort": sort,
126 | }
127 | )
128 |
129 | # Define the operation name
130 | operation = "GetCombinedVulnerabilitiesSARIF"
131 |
132 | logger.debug("Searching serverless vulnerabilities with params: %s", params)
133 |
134 | # Make the API request
135 | response = self.client.command(operation, parameters=params)
136 |
137 | # Use handle_api_response to get vulnerability data
138 | vulnerabilities = handle_api_response(
139 | response,
140 | operation=operation,
141 | error_message="Failed to search serverless vulnerabilities",
142 | )
143 |
144 | # If handle_api_response returns an error dict instead of a list,
145 | # it means there was an error, so we return it wrapped in a list
146 | if self._is_error(vulnerabilities):
147 | return [vulnerabilities]
148 |
149 | return vulnerabilities.get("runs") or []
150 |
```
--------------------------------------------------------------------------------
/scripts/generate_e2e_report.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Generate a static HTML report from test result data
3 | """
4 |
5 | import json
6 | import re
7 | import sys
8 | from html import escape
9 |
10 |
11 | def generate_static_report(
12 | data,
13 | template_path="scripts/test_results_viewer.html",
14 | output_path="static_test_report.html",
15 | ):
16 | """
17 | Generates a static HTML report from test result data.
18 |
19 | Args:
20 | data (list): A list of test result dictionaries.
21 | template_path (str): The path to the HTML template file to extract styles from.
22 | output_path (str): The path to write the final static HTML file.
23 | """
24 | try:
25 | with open(template_path, "r", encoding="utf-8") as f:
26 | html_template = f.read()
27 | style_content = re.search(
28 | r"<style>(.*?)</style>", html_template, re.DOTALL
29 | ).group(1)
30 | except (FileNotFoundError, AttributeError):
31 | print(
32 | f"Warning: Could not read styles from {template_path}. Using default styles."
33 | )
34 | style_content = "body { font-family: sans-serif; } /* Basic fallback styles */"
35 |
36 | # --- Group and process data ---
37 | total_runs = len(data)
38 | successful_runs = sum(1 for run in data if run.get("status") == "success")
39 | success_rate = (successful_runs / total_runs * 100) if total_runs > 0 else 0
40 |
41 | # Group first by module, then by test name
42 | grouped_by_module = {}
43 | for run in data:
44 | module_name = run.get("module_name", "Unknown Module")
45 | test_name = run.get("test_name", "Unnamed Test")
46 |
47 | if module_name not in grouped_by_module:
48 | grouped_by_module[module_name] = {}
49 |
50 | if test_name not in grouped_by_module[module_name]:
51 | grouped_by_module[module_name][test_name] = []
52 |
53 | grouped_by_module[module_name][test_name].append(run)
54 |
55 | # Further group by model within each test
56 | for module_name, tests in grouped_by_module.items():
57 | for test_name, runs in tests.items():
58 | grouped_by_model = {}
59 | for run in runs:
60 | model_name = run.get("model_name", "Unnamed Model")
61 | grouped_by_model.setdefault(model_name, []).append(run)
62 | grouped_by_module[module_name][test_name] = grouped_by_model
63 |
64 | # --- Build HTML body content ---
65 | body_content = f"""
66 | <h1>MCP E2E Static Test Report</h1>
67 | <div id="summary">
68 | <h2>Summary</h2>
69 | <p>Total Tests Run: <span>{total_runs}</span></p>
70 | <p>Success Rate: <span>{success_rate:.2f}%</span></p>
71 | </div>
72 | <div id="results-container">
73 | """
74 |
75 | for module_name, tests in sorted(grouped_by_module.items()):
76 | body_content += f'<div class="module-group"><h2>{escape(module_name)}</h2>'
77 | for test_name, models in sorted(tests.items()):
78 | body_content += f'<div class="test-group"><h3>{escape(test_name)}</h3>'
79 | for model_name, runs in sorted(models.items()):
80 | body_content += (
81 | f'<div class="model-group"><h4>{escape(model_name)}</h4>'
82 | )
83 | body_content += '<div class="run-grid">'
84 | for run in sorted(runs, key=lambda x: x.get("run_number", 0)):
85 | status_class = escape(run.get("status", "unknown"))
86 | run_html = f"""
87 | <div class="test-run {status_class}">
88 | <h5>Run {run.get("run_number", "#")} - {status_class.upper()}</h5>
89 | """
90 | if status_class == "failure" and run.get("failure_reason"):
91 | reason = escape(run["failure_reason"])
92 | run_html += f'<p><strong>Failure Reason:</strong></p><pre class="failure-reason"><code>{reason}</code></pre>'
93 |
94 | agent_result = escape(
95 | run.get("agent_result", "No result") or "No result"
96 | )
97 | run_html += f"""
98 | <details>
99 | <summary>Agent Result</summary>
100 | <div class="agent-result"><pre><code>{agent_result}</code></pre></div>
101 | </details>
102 | """
103 |
104 | if run.get("tools_used"):
105 | tools_json = escape(json.dumps(run["tools_used"], indent=2))
106 | run_html += f"""
107 | <details>
108 | <summary>Tools Used ({len(run["tools_used"])})</summary>
109 | <div class="tools-content"><pre><code>{tools_json}</code></pre></div>
110 | </details>
111 | """
112 | else:
113 | run_html += "<p>No tools were used.</p>"
114 |
115 | run_html += "</div>"
116 | body_content += run_html
117 | body_content += "</div></div>"
118 | body_content += "</div>"
119 | body_content += "</div>"
120 | body_content += "</div>"
121 |
122 | # --- Assemble the final HTML ---
123 | full_html = f"""
124 | <!DOCTYPE html>
125 | <html lang="en">
126 | <head>
127 | <meta charset="UTF-8">
128 | <meta name="viewport" content="width=device-width, initial-scale=1.0">
129 | <title>Static Test Results</title>
130 | <style>{style_content}</style>
131 | </head>
132 | <body>
133 | {body_content}
134 | </body>
135 | </html>
136 | """
137 | with open(output_path, "w", encoding="utf-8") as f:
138 | f.write(full_html)
139 | print(f"Successfully generated static report: {output_path}")
140 |
141 |
142 | if __name__ == "__main__":
143 | test_results_path = sys.argv[1] if len(sys.argv) > 1 else "test_results.json"
144 | try:
145 | with open(test_results_path, "r", encoding="utf-8") as f:
146 | test_data = json.load(f)
147 | generate_static_report(test_data)
148 | except FileNotFoundError:
149 | print("Error: test_results.json not found. Please run the tests first.")
150 | except json.JSONDecodeError:
151 | print(
152 | "Error: Could not parse test_results.json. The file might be corrupted or empty."
153 | )
154 |
```
--------------------------------------------------------------------------------
/tests/common/test_errors.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Tests for the error handling utilities.
3 | """
4 |
5 | import unittest
6 | from unittest.mock import patch
7 |
8 | from falcon_mcp.common.api_scopes import API_SCOPE_REQUIREMENTS, get_required_scopes
9 | from falcon_mcp.common.errors import (
10 | APIError,
11 | AuthenticationError,
12 | FalconError,
13 | _format_error_response,
14 | handle_api_response,
15 | is_success_response,
16 | )
17 |
18 |
19 | class TestErrorClasses(unittest.TestCase):
20 | """Test cases for the error classes."""
21 |
22 | def test_falcon_error(self):
23 | """Test FalconError class."""
24 | error = FalconError("Test error")
25 | self.assertEqual(str(error), "Test error")
26 |
27 | def test_authentication_error(self):
28 | """Test AuthenticationError class."""
29 | error = AuthenticationError("Authentication failed")
30 | self.assertEqual(str(error), "Authentication failed")
31 | self.assertIsInstance(error, FalconError)
32 |
33 | def test_api_error(self):
34 | """Test APIError class."""
35 | error = APIError(
36 | "API request failed",
37 | status_code=403,
38 | body={"errors": [{"message": "Access denied"}]},
39 | operation="TestOperation",
40 | )
41 | self.assertEqual(str(error), "API request failed")
42 | self.assertEqual(error.status_code, 403)
43 | self.assertEqual(error.body, {"errors": [{"message": "Access denied"}]})
44 | self.assertEqual(error.operation, "TestOperation")
45 | self.assertIsInstance(error, FalconError)
46 |
47 |
48 | class TestErrorUtils(unittest.TestCase):
49 | """Test cases for the error utility functions."""
50 |
51 | def test_is_success_response(self):
52 | """Test is_success_response function."""
53 | # Success response
54 | self.assertTrue(is_success_response({"status_code": 200}))
55 |
56 | # Error responses
57 | self.assertFalse(is_success_response({"status_code": 400}))
58 | self.assertFalse(is_success_response({"status_code": 403}))
59 | self.assertFalse(is_success_response({"status_code": 500}))
60 | self.assertFalse(is_success_response({})) # Missing status_code
61 |
62 | def test_get_required_scopes(self):
63 | """Test get_required_scopes function."""
64 | # Known operation
65 | self.assertEqual(get_required_scopes("GetQueriesAlertsV2"), ["Alerts:read"])
66 |
67 | # Unknown operation
68 | self.assertEqual(get_required_scopes("UnknownOperation"), [])
69 |
70 | @patch("falcon_mcp.common.errors.logger")
71 | def test_format_error_response(self, mock_logger):
72 | """Test format_error_response function."""
73 | # Basic error
74 | response = _format_error_response("Test error")
75 | self.assertEqual(response, {"error": "Test error"})
76 | mock_logger.error.assert_called_with("Error: %s", "Test error")
77 |
78 | # Error with details
79 | details = {"status_code": 400, "body": {"errors": [{"message": "Bad request"}]}}
80 | response = _format_error_response("Test error", details=details)
81 | self.assertEqual(response["error"], "Test error")
82 | self.assertEqual(response["details"], details)
83 |
84 | # Permission error with operation
85 | details = {
86 | "status_code": 403,
87 | "body": {"errors": [{"message": "Access denied"}]},
88 | }
89 | response = _format_error_response(
90 | "Permission denied", details=details, operation="GetQueriesAlertsV2"
91 | )
92 | self.assertEqual(response["error"], "Permission denied")
93 | self.assertEqual(response["details"], details)
94 | self.assertEqual(response["required_scopes"], ["Alerts:read"])
95 | self.assertIn("resolution", response)
96 | self.assertIn("Alerts:read", response["resolution"])
97 |
98 | def test_handle_api_response_success(self):
99 | """Test handle_api_response function with success response."""
100 | # Success response with resources
101 | response = {
102 | "status_code": 200,
103 | "body": {"resources": [{"id": "test", "name": "Test Resource"}]},
104 | }
105 | result = handle_api_response(response, "TestOperation")
106 | self.assertEqual(result, [{"id": "test", "name": "Test Resource"}])
107 |
108 | # Success response with empty resources
109 | response = {"status_code": 200, "body": {"resources": []}}
110 | result = handle_api_response(response, "TestOperation")
111 | self.assertEqual(result, [])
112 |
113 | # Success response with empty resources and default
114 | response = {"status_code": 200, "body": {"resources": []}}
115 | result = handle_api_response(
116 | response, "TestOperation", default_result={"default": True}
117 | )
118 | self.assertEqual(result, {"default": True})
119 |
120 | def test_handle_api_response_error(self):
121 | """Test handle_api_response function with error response."""
122 | # Error response
123 | response = {
124 | "status_code": 400,
125 | "body": {"errors": [{"message": "Bad request"}]},
126 | }
127 | result = handle_api_response(
128 | response,
129 | "TestOperation",
130 | error_message="Test failed",
131 | )
132 | self.assertIn("error", result)
133 | self.assertIn("Test failed", result["error"])
134 | self.assertEqual(result["details"], response)
135 |
136 | # Permission error
137 | response = {
138 | "status_code": 403,
139 | "body": {"errors": [{"message": "Access denied"}]},
140 | }
141 | # Add a test operation to API_SCOPE_REQUIREMENTS
142 | original_scopes = API_SCOPE_REQUIREMENTS.copy()
143 | API_SCOPE_REQUIREMENTS["TestOperation"] = ["test:read"]
144 |
145 | try:
146 | result = handle_api_response(
147 | response,
148 | "TestOperation",
149 | error_message="Permission denied",
150 | )
151 | self.assertIn("error", result)
152 | self.assertIn("Permission denied", result["error"])
153 | self.assertIn("Required scopes: test:read", result["error"])
154 | self.assertEqual(result["details"], response)
155 | finally:
156 | # Restore original API_SCOPE_REQUIREMENTS
157 | API_SCOPE_REQUIREMENTS.clear()
158 | API_SCOPE_REQUIREMENTS.update(original_scopes)
159 |
160 |
161 | if __name__ == "__main__":
162 | unittest.main()
163 |
```
--------------------------------------------------------------------------------
/falcon_mcp/common/utils.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Common utility functions for Falcon MCP Server
3 |
4 | This module provides common utility functions for the Falcon MCP server.
5 | """
6 |
7 | import re
8 | from typing import Any, Dict, List, Optional, Tuple
9 |
10 | from .errors import _format_error_response, is_success_response
11 | from .logging import get_logger
12 |
13 | logger = get_logger(__name__)
14 |
15 |
16 | def filter_none_values(data: Dict[str, Any]) -> Dict[str, Any]:
17 | """Remove None values from a dictionary.
18 |
19 | Args:
20 | data: Dictionary to filter
21 |
22 | Returns:
23 | Dict[str, Any]: Filtered dictionary
24 | """
25 | return {k: v for k, v in data.items() if v is not None}
26 |
27 |
28 | def prepare_api_parameters(params: Dict[str, Any]) -> Dict[str, Any]:
29 | """Prepare parameters for Falcon API requests.
30 |
31 | Args:
32 | params: Raw parameters
33 |
34 | Returns:
35 | Dict[str, Any]: Prepared parameters
36 | """
37 | # Remove None values
38 | filtered = filter_none_values(params)
39 |
40 | # Handle special parameter formatting if needed
41 | if "filter" in filtered and isinstance(filtered["filter"], dict):
42 | # Convert filter dict to FQL string if needed
43 | pass
44 |
45 | return filtered
46 |
47 |
48 | def extract_resources(
49 | response: Dict[str, Any],
50 | default: Optional[List[Dict[str, Any]]] = None,
51 | ) -> List[Dict[str, Any]]:
52 | """Extract resources from an API response.
53 |
54 | Args:
55 | response: API response dictionary
56 | default: Default value if no resources are found
57 |
58 | Returns:
59 | List[Dict[str, Any]]: Extracted resources
60 | """
61 | if not is_success_response(response):
62 | return default if default is not None else []
63 |
64 | resources = response.get("body", {}).get("resources", [])
65 | return resources if resources else (default if default is not None else [])
66 |
67 |
68 | def extract_first_resource(
69 | response: Dict[str, Any],
70 | operation: str,
71 | not_found_error: str = "Resource not found",
72 | ) -> Dict[str, Any]:
73 | """Extract the first resource from an API response.
74 |
75 | Args:
76 | response: API response dictionary
77 | operation: The API operation that was performed
78 | not_found_error: Error message if no resources are found
79 |
80 | Returns:
81 | Dict[str, Any]: First resource or error response
82 | """
83 | resources = extract_resources(response)
84 |
85 | if not resources:
86 | return _format_error_response(not_found_error, operation=operation)
87 |
88 | return resources[0]
89 |
90 |
91 | def sanitize_input(input_str: str) -> str:
92 | """Sanitize input string.
93 |
94 | Args:
95 | input_str: Input string to sanitize
96 |
97 | Returns:
98 | Sanitized string with dangerous characters removed
99 | """
100 | if not isinstance(input_str, str):
101 | return str(input_str)
102 |
103 | # Remove backslashes, quotes, and control characters that could be used for injection
104 | sanitized = re.sub(r'[\\"\'\n\r\t]', "", input_str)
105 |
106 | # Additional safety: limit length to prevent excessively long inputs
107 | return sanitized[:255]
108 |
109 |
110 | def generate_md_table(data: List[Tuple]) -> str:
111 | """Generate a Markdown table from a list of tuples.
112 |
113 | This function creates a compact Markdown table with the provided data.
114 | It's designed to minimize token usage while maintaining readability.
115 | The first row of data is used as the header row.
116 |
117 | Args:
118 | data: List of tuples where the first tuple contains the headers
119 | and the remaining tuples contain the table data
120 |
121 | Returns:
122 | str: Formatted Markdown table as a string
123 |
124 | Raises:
125 | TypeError: If the first row (headers) contains non-string values
126 | TypeError: If there are not at least 2 items (header and a value row)
127 | ValueError: If the header row is empty
128 | ValueError: If a row has more items than headers
129 | """
130 | if not data or len(data) < 2:
131 | raise TypeError("Need at least 2 items. The header and a value row")
132 |
133 | # Extract headers from the first row
134 | headers = data[0]
135 |
136 | # Check that the header row is not empty
137 | if len(headers) == 0:
138 | raise ValueError("Header row cannot be empty")
139 |
140 | # Check that all headers are strings
141 | for header in headers:
142 | if not isinstance(header, str):
143 | raise TypeError(f"Header values must be strings, got {type(header).__name__}")
144 |
145 | # Use the remaining rows as data
146 | rows = data[1:]
147 |
148 | # Create the table header, stripping spaces from header values
149 | header_parts = []
150 | for h in headers:
151 | # Strip spaces from header values
152 | header_parts.append(str(h).strip())
153 |
154 | header_row = "|" + "|".join(header_parts) + "|"
155 |
156 | # Create the separator row with the exact expected format
157 | separator = "|-" * len(headers) + "|"
158 |
159 | # Build the table
160 | table = [header_row, separator]
161 |
162 | for idx, row in enumerate(rows):
163 | # Check if row has more items than headers
164 | if len(row) > len(headers):
165 | raise ValueError(f"Row {idx+1} has {len(row)} items, which is more than the {len(headers)} headers")
166 |
167 | # Convert row values to strings and handle special cases
168 | row_values = []
169 | for i, value in enumerate(row):
170 | if i < len(headers):
171 | if value is None:
172 | row_values.append("")
173 | elif isinstance(value, bool):
174 | row_values.append(str(value).lower())
175 | elif isinstance(value, (int, float)):
176 | row_values.append(str(value))
177 | else:
178 | # Process multi-line text to create a clean, single-line representation
179 |
180 | text = str(value)
181 | # Split text into lines, strip whitespace, and filter out empty lines
182 | non_empty_lines = [line.strip() for line in text.split('\n') if line.strip()]
183 | # Join the non-empty lines with a single space
184 | formatted_text = " ".join(non_empty_lines).strip()
185 | row_values.append(formatted_text)
186 |
187 | # Pad the row if it's shorter than headers
188 | while len(row_values) < len(headers):
189 | row_values.append("")
190 |
191 | # Add the row to the table
192 | table.append("|" + "|".join(row_values) + "|")
193 |
194 | return "\n".join(table)
195 |
```
--------------------------------------------------------------------------------
/falcon_mcp/modules/spotlight.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Spotlight module for Falcon MCP Server
3 |
4 | This module provides tools for accessing and managing CrowdStrike Falcon Spotlight vulnerabilities.
5 | """
6 |
7 | from textwrap import dedent
8 | from typing import Any, Dict, List
9 |
10 | from mcp.server import FastMCP
11 | from mcp.server.fastmcp.resources import TextResource
12 | from pydantic import AnyUrl, Field
13 |
14 | from falcon_mcp.common.errors import handle_api_response
15 | from falcon_mcp.common.logging import get_logger
16 | from falcon_mcp.common.utils import prepare_api_parameters
17 | from falcon_mcp.modules.base import BaseModule
18 | from falcon_mcp.resources.spotlight import SEARCH_VULNERABILITIES_FQL_DOCUMENTATION
19 |
20 | logger = get_logger(__name__)
21 |
22 |
23 | class SpotlightModule(BaseModule):
24 | """Module for accessing and managing CrowdStrike Falcon Spotlight vulnerabilities."""
25 |
26 | def register_tools(self, server: FastMCP) -> None:
27 | """Register tools with the MCP server.
28 |
29 | Args:
30 | server: MCP server instance
31 | """
32 | # Register tools
33 | self._add_tool(
34 | server=server,
35 | method=self.search_vulnerabilities,
36 | name="search_vulnerabilities",
37 | )
38 |
39 | def register_resources(self, server: FastMCP) -> None:
40 | """Register resources with the MCP server.
41 |
42 | Args:
43 | server: MCP server instance
44 | """
45 | search_vulnerabilities_fql_resource = TextResource(
46 | uri=AnyUrl("falcon://spotlight/vulnerabilities/fql-guide"),
47 | name="falcon_search_vulnerabilities_fql_guide",
48 | description="Contains the guide for the `filter` param of the `falcon_search_vulnerabilities` tool.",
49 | text=SEARCH_VULNERABILITIES_FQL_DOCUMENTATION,
50 | )
51 |
52 | self._add_resource(
53 | server,
54 | search_vulnerabilities_fql_resource,
55 | )
56 |
57 | def search_vulnerabilities(
58 | self,
59 | filter: str | None = Field(
60 | default=None,
61 | description="FQL Syntax formatted string used to limit the results. IMPORTANT: use the `falcon://spotlight/vulnerabilities/fql-guide` resource when building this filter parameter.",
62 | examples={"status:'open'", "cve.severity:'HIGH'"},
63 | ),
64 | limit: int = Field(
65 | default=10,
66 | ge=1,
67 | le=5000,
68 | description="Maximum number of results to return. (Max: 5000, Default: 10)",
69 | ),
70 | offset: int | None = Field(
71 | default=None,
72 | description="Starting index of overall result set from which to return results.",
73 | ),
74 | sort: str | None = Field(
75 | default=None,
76 | description=dedent("""
77 | Sort vulnerabilities using FQL syntax.
78 |
79 | Supported sorting fields:
80 | • created_timestamp: When the vulnerability was found
81 | • closed_timestamp: When the vulnerability was closed
82 | • updated_timestamp: When the vulnerability was last updated
83 |
84 | Sort either asc (ascending) or desc (descending).
85 | Format: 'field|direction'
86 |
87 | Examples: 'created_timestamp|desc', 'updated_timestamp|desc', 'closed_timestamp|asc'
88 | """).strip(),
89 | examples={
90 | "created_timestamp|desc",
91 | "updated_timestamp|desc",
92 | "closed_timestamp|asc",
93 | },
94 | ),
95 | after: str | None = Field(
96 | default=None,
97 | description="A pagination token used with the limit parameter to manage pagination of results. On your first request, don't provide an after token. On subsequent requests, provide the after token from the previous response to continue from that place in the results.",
98 | ),
99 | facet: str | None = Field(
100 | default=None,
101 | description=dedent("""
102 | Important: Use only one value!
103 |
104 | Select various detail blocks to be returned for each vulnerability.
105 |
106 | Supported values:
107 | • host_info: Include host/asset information and context
108 | • remediation: Include remediation and fix information
109 | • cve: Include CVE details, scoring, and metadata
110 | • evaluation_logic: Include vulnerability assessment methodology
111 |
112 | Use host_info when you need asset context, remediation for fix information,
113 | cve for detailed vulnerability scoring, and evaluation_logic for assessment details.
114 |
115 | Examples: 'host_info', 'cve', 'remediation'
116 | """).strip(),
117 | examples={"host_info", "cve", "remediation", "evaluation_logic"},
118 | ),
119 | ) -> List[Dict[str, Any]]:
120 | """Search for vulnerabilities in your CrowdStrike environment.
121 |
122 | IMPORTANT: You must use the `falcon://spotlight/vulnerabilities/fql-guide` resource when you need to use the `filter` parameter.
123 | This resource contains the guide on how to build the FQL `filter` parameter for the `falcon_search_vulnerabilities` tool.
124 | """
125 | # Prepare parameters for combinedQueryVulnerabilities
126 | params = prepare_api_parameters(
127 | {
128 | "filter": filter,
129 | "limit": limit,
130 | "offset": offset,
131 | "sort": sort,
132 | "after": after,
133 | "facet": facet,
134 | }
135 | )
136 |
137 | # Define the operation name
138 | operation = "combinedQueryVulnerabilities"
139 |
140 | logger.debug("Searching vulnerabilities with params: %s", params)
141 |
142 | # Make the API request
143 | response = self.client.command(operation, parameters=params)
144 |
145 | # Use handle_api_response to get vulnerability data
146 | vulnerabilities = handle_api_response(
147 | response,
148 | operation=operation,
149 | error_message="Failed to search vulnerabilities",
150 | default_result=[],
151 | )
152 |
153 | # If handle_api_response returns an error dict instead of a list,
154 | # it means there was an error, so we return it wrapped in a list
155 | if self._is_error(vulnerabilities):
156 | return [vulnerabilities]
157 |
158 | return vulnerabilities
159 |
```
--------------------------------------------------------------------------------
/tests/test_streamable_http_transport.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Tests for streamable-http transport functionality.
3 | """
4 |
5 | import unittest
6 | from unittest.mock import MagicMock, patch
7 |
8 | from falcon_mcp.server import FalconMCPServer
9 |
10 |
11 | class TestStreamableHttpTransport(unittest.TestCase):
12 | """Test cases for streamable-http transport."""
13 |
14 | @patch("falcon_mcp.server.FalconClient")
15 | @patch("falcon_mcp.server.FastMCP")
16 | @patch("falcon_mcp.server.uvicorn")
17 | def test_streamable_http_transport_initialization(
18 | self,
19 | mock_uvicorn,
20 | mock_fastmcp,
21 | mock_client,
22 | ):
23 | """Test streamable-http transport initialization."""
24 | # Setup mocks
25 | mock_client_instance = MagicMock()
26 | mock_client_instance.authenticate.return_value = True
27 | mock_client.return_value = mock_client_instance
28 |
29 | mock_server_instance = MagicMock()
30 | mock_app = MagicMock()
31 | mock_server_instance.streamable_http_app.return_value = mock_app
32 | mock_fastmcp.return_value = mock_server_instance
33 |
34 | # Create server
35 | server = FalconMCPServer(debug=True)
36 |
37 | # Test streamable-http transport
38 | server.run("streamable-http", host="0.0.0.0", port=8080)
39 |
40 | # Verify uvicorn was called with correct parameters
41 | mock_uvicorn.run.assert_called_once_with(
42 | mock_app, host="0.0.0.0", port=8080, log_level="debug"
43 | )
44 |
45 | # Verify streamable_http_app was called
46 | mock_server_instance.streamable_http_app.assert_called_once()
47 |
48 | @patch("falcon_mcp.server.FalconClient")
49 | @patch("falcon_mcp.server.FastMCP")
50 | @patch("falcon_mcp.server.uvicorn")
51 | def test_streamable_http_default_parameters(
52 | self,
53 | mock_uvicorn,
54 | mock_fastmcp,
55 | mock_client,
56 | ):
57 | """Test streamable-http transport with default parameters."""
58 | # Setup mocks
59 | mock_client_instance = MagicMock()
60 | mock_client_instance.authenticate.return_value = True
61 | mock_client.return_value = mock_client_instance
62 |
63 | mock_server_instance = MagicMock()
64 | mock_app = MagicMock()
65 | mock_server_instance.streamable_http_app.return_value = mock_app
66 | mock_fastmcp.return_value = mock_server_instance
67 |
68 | # Create server
69 | server = FalconMCPServer(debug=False)
70 |
71 | # Test streamable-http transport with defaults
72 | server.run("streamable-http")
73 |
74 | # Verify uvicorn was called with default parameters
75 | mock_uvicorn.run.assert_called_once_with(
76 | mock_app,
77 | host="127.0.0.1",
78 | port=8000,
79 | log_level="info",
80 | )
81 |
82 | @patch("falcon_mcp.server.FalconClient")
83 | @patch("falcon_mcp.server.FastMCP")
84 | def test_non_streamable_http_transport_unchanged(
85 | self,
86 | mock_fastmcp,
87 | mock_client,
88 | ):
89 | """Test that non-streamable-http transports use the original method."""
90 | # Setup mocks
91 | mock_client_instance = MagicMock()
92 | mock_client_instance.authenticate.return_value = True
93 | mock_client.return_value = mock_client_instance
94 |
95 | mock_server_instance = MagicMock()
96 | mock_fastmcp.return_value = mock_server_instance
97 |
98 | # Create server
99 | server = FalconMCPServer()
100 |
101 | # Test stdio transport (should use original method)
102 | server.run("stdio")
103 |
104 | # Verify the original run method was called
105 | mock_server_instance.run.assert_called_once_with("stdio")
106 |
107 | # Verify streamable_http_app was NOT called
108 | mock_server_instance.streamable_http_app.assert_not_called()
109 |
110 | @patch("falcon_mcp.server.FalconClient")
111 | @patch("falcon_mcp.server.FastMCP")
112 | @patch("falcon_mcp.server.uvicorn")
113 | def test_streamable_http_custom_parameters(
114 | self,
115 | mock_uvicorn,
116 | mock_fastmcp,
117 | mock_client,
118 | ):
119 | """Test streamable-http transport with custom parameters."""
120 | # Setup mocks
121 | mock_client_instance = MagicMock()
122 | mock_client_instance.authenticate.return_value = True
123 | mock_client.return_value = mock_client_instance
124 |
125 | mock_server_instance = MagicMock()
126 | mock_app = MagicMock()
127 | mock_server_instance.streamable_http_app.return_value = mock_app
128 | mock_fastmcp.return_value = mock_server_instance
129 |
130 | # Create server
131 | server = FalconMCPServer(debug=True)
132 |
133 | # Test streamable-http transport with custom parameters
134 | server.run("streamable-http", host="192.168.1.100", port=9000)
135 |
136 | # Verify uvicorn was called with custom parameters
137 | mock_uvicorn.run.assert_called_once_with(
138 | mock_app,
139 | host="192.168.1.100",
140 | port=9000,
141 | log_level="debug",
142 | )
143 |
144 | @patch("falcon_mcp.server.FalconClient")
145 | @patch("falcon_mcp.server.FastMCP")
146 | @patch("falcon_mcp.server.uvicorn")
147 | def test_streamable_http_logging_levels(
148 | self,
149 | mock_uvicorn,
150 | mock_fastmcp,
151 | mock_client,
152 | ):
153 | """Test streamable-http transport logging level configuration."""
154 | # Setup mocks
155 | mock_client_instance = MagicMock()
156 | mock_client_instance.authenticate.return_value = True
157 | mock_client.return_value = mock_client_instance
158 |
159 | mock_server_instance = MagicMock()
160 | mock_app = MagicMock()
161 | mock_server_instance.streamable_http_app.return_value = mock_app
162 | mock_fastmcp.return_value = mock_server_instance
163 |
164 | # Test with debug=True
165 | server_debug = FalconMCPServer(debug=True)
166 | server_debug.run("streamable-http")
167 |
168 | # Verify debug log level
169 | mock_uvicorn.run.assert_called_with(
170 | mock_app,
171 | host="127.0.0.1",
172 | port=8000,
173 | log_level="debug",
174 | )
175 |
176 | # Reset mock
177 | mock_uvicorn.reset_mock()
178 |
179 | # Test with debug=False
180 | server_info = FalconMCPServer(debug=False)
181 | server_info.run("streamable-http")
182 |
183 | # Verify info log level
184 | mock_uvicorn.run.assert_called_with(
185 | mock_app,
186 | host="127.0.0.1",
187 | port=8000,
188 | log_level="info",
189 | )
190 |
191 |
192 | if __name__ == "__main__":
193 | unittest.main()
194 |
```
--------------------------------------------------------------------------------
/tests/modules/test_base.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Tests for the Base module.
3 | """
4 |
5 | import unittest
6 |
7 | from falcon_mcp.modules.base import BaseModule
8 | from tests.modules.utils.test_modules import TestModules
9 |
10 |
11 | class ConcreteBaseModule(BaseModule):
12 | """Concrete implementation of BaseModule for testing."""
13 |
14 | def register_tools(self, server):
15 | """Implement abstract method."""
16 |
17 |
18 | class TestBaseModule(TestModules):
19 | """Test cases for the Base module."""
20 |
21 | def setUp(self):
22 | """Set up test fixtures."""
23 | self.setup_module(ConcreteBaseModule)
24 |
25 | def test_is_error_with_error_dict(self):
26 | """Test _is_error with a dictionary containing an error key."""
27 | response = {"error": "Something went wrong", "details": "Error details"}
28 | result = self.module._is_error(response)
29 | self.assertTrue(result)
30 |
31 | def test_is_error_with_non_error_dict(self):
32 | """Test _is_error with a dictionary not containing an error key."""
33 | response = {"status": "success", "data": "Some data"}
34 | result = self.module._is_error(response)
35 | self.assertFalse(result)
36 |
37 | def test_is_error_with_non_dict(self):
38 | """Test _is_error with a non-dictionary value."""
39 | # Test with a list
40 | response = ["item1", "item2"]
41 | result = self.module._is_error(response)
42 | self.assertFalse(result)
43 |
44 | # Test with a string
45 | response = "This is a string response"
46 | result = self.module._is_error(response)
47 | self.assertFalse(result)
48 |
49 | # Test with None
50 | response = None
51 | result = self.module._is_error(response)
52 | self.assertFalse(result)
53 |
54 | # Test with an integer
55 | response = 42
56 | result = self.module._is_error(response)
57 | self.assertFalse(result)
58 |
59 | def test_base_get_by_ids_default_behavior(self):
60 | """Test _base_get_by_ids with default parameters (backward compatibility)."""
61 | # Setup mock response
62 | mock_response = {
63 | "status_code": 200,
64 | "body": {
65 | "resources": [
66 | {"id": "test1", "name": "Test Item 1"},
67 | {"id": "test2", "name": "Test Item 2"},
68 | ]
69 | },
70 | }
71 | self.mock_client.command.return_value = mock_response
72 |
73 | # Call _base_get_by_ids with default parameters
74 | result = self.module._base_get_by_ids("TestOperation", ["test1", "test2"])
75 |
76 | # Verify client command was called correctly with default "ids" key
77 | self.mock_client.command.assert_called_once_with(
78 | "TestOperation", body={"ids": ["test1", "test2"]}
79 | )
80 |
81 | # Verify result
82 | expected_result = [
83 | {"id": "test1", "name": "Test Item 1"},
84 | {"id": "test2", "name": "Test Item 2"},
85 | ]
86 | self.assertEqual(result, expected_result)
87 |
88 | def test_base_get_by_ids_custom_id_key(self):
89 | """Test _base_get_by_ids with custom id_key parameter."""
90 | # Setup mock response
91 | mock_response = {
92 | "status_code": 200,
93 | "body": {
94 | "resources": [
95 | {"composite_id": "alert1", "status": "new"},
96 | {"composite_id": "alert2", "status": "closed"},
97 | ]
98 | },
99 | }
100 | self.mock_client.command.return_value = mock_response
101 |
102 | # Call _base_get_by_ids with custom id_key
103 | result = self.module._base_get_by_ids(
104 | "PostEntitiesAlertsV2", ["alert1", "alert2"], id_key="composite_ids"
105 | )
106 |
107 | # Verify client command was called correctly with custom key
108 | self.mock_client.command.assert_called_once_with(
109 | "PostEntitiesAlertsV2", body={"composite_ids": ["alert1", "alert2"]}
110 | )
111 |
112 | # Verify result
113 | expected_result = [
114 | {"composite_id": "alert1", "status": "new"},
115 | {"composite_id": "alert2", "status": "closed"},
116 | ]
117 | self.assertEqual(result, expected_result)
118 |
119 | def test_base_get_by_ids_with_additional_params(self):
120 | """Test _base_get_by_ids with additional parameters."""
121 | # Setup mock response
122 | mock_response = {
123 | "status_code": 200,
124 | "body": {
125 | "resources": [
126 | {"composite_id": "alert1", "status": "new", "hidden": False}
127 | ]
128 | },
129 | }
130 | self.mock_client.command.return_value = mock_response
131 |
132 | # Call _base_get_by_ids with additional parameters
133 | result = self.module._base_get_by_ids(
134 | "PostEntitiesAlertsV2",
135 | ["alert1"],
136 | id_key="composite_ids",
137 | include_hidden=True,
138 | sort_by="created_timestamp",
139 | )
140 |
141 | # Verify client command was called correctly with all parameters
142 | self.mock_client.command.assert_called_once_with(
143 | "PostEntitiesAlertsV2",
144 | body={
145 | "composite_ids": ["alert1"],
146 | "include_hidden": True,
147 | "sort_by": "created_timestamp",
148 | },
149 | )
150 |
151 | # Verify result
152 | expected_result = [{"composite_id": "alert1", "status": "new", "hidden": False}]
153 | self.assertEqual(result, expected_result)
154 |
155 | def test_base_get_by_ids_error_handling(self):
156 | """Test _base_get_by_ids error handling."""
157 | # Setup mock error response
158 | mock_response = {
159 | "status_code": 400,
160 | "body": {"errors": [{"message": "Invalid request"}]},
161 | }
162 | self.mock_client.command.return_value = mock_response
163 |
164 | # Call _base_get_by_ids
165 | result = self.module._base_get_by_ids("TestOperation", ["invalid_id"])
166 |
167 | # Verify error handling - should return error dict
168 | self.assertIn("error", result)
169 | self.assertIn("Failed to perform operation", result["error"])
170 |
171 | def test_base_get_by_ids_empty_response(self):
172 | """Test _base_get_by_ids with empty resources."""
173 | # Setup mock response with empty resources
174 | mock_response = {"status_code": 200, "body": {"resources": []}}
175 | self.mock_client.command.return_value = mock_response
176 |
177 | # Call _base_get_by_ids
178 | result = self.module._base_get_by_ids("TestOperation", ["nonexistent"])
179 |
180 | # Verify result is empty list
181 | self.assertEqual(result, [])
182 |
183 |
184 | if __name__ == "__main__":
185 | unittest.main()
186 |
```
--------------------------------------------------------------------------------
/falcon_mcp/modules/hosts.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Hosts module for Falcon MCP Server
3 |
4 | This module provides tools for accessing and managing CrowdStrike Falcon hosts/devices.
5 | """
6 |
7 | from textwrap import dedent
8 | from typing import Any, Dict, List
9 |
10 | from mcp.server import FastMCP
11 | from mcp.server.fastmcp.resources import TextResource
12 | from pydantic import AnyUrl, Field
13 |
14 | from falcon_mcp.common.errors import handle_api_response
15 | from falcon_mcp.common.logging import get_logger
16 | from falcon_mcp.common.utils import prepare_api_parameters
17 | from falcon_mcp.modules.base import BaseModule
18 | from falcon_mcp.resources.hosts import SEARCH_HOSTS_FQL_DOCUMENTATION
19 |
20 | logger = get_logger(__name__)
21 |
22 |
23 | class HostsModule(BaseModule):
24 | """Module for accessing and managing CrowdStrike Falcon hosts/devices."""
25 |
26 | def register_tools(self, server: FastMCP) -> None:
27 | """Register tools with the MCP server.
28 |
29 | Args:
30 | server: MCP server instance
31 | """
32 | # Register tools
33 | self._add_tool(
34 | server=server,
35 | method=self.search_hosts,
36 | name="search_hosts",
37 | )
38 |
39 | self._add_tool(
40 | server=server,
41 | method=self.get_host_details,
42 | name="get_host_details",
43 | )
44 |
45 | def register_resources(self, server: FastMCP) -> None:
46 | """Register resources with the MCP server.
47 |
48 | Args:
49 | server: MCP server instance
50 | """
51 | search_hosts_fql_resource = TextResource(
52 | uri=AnyUrl("falcon://hosts/search/fql-guide"),
53 | name="falcon_search_hosts_fql_guide",
54 | description="Contains the guide for the `filter` param of the `falcon_search_hosts` tool.",
55 | text=SEARCH_HOSTS_FQL_DOCUMENTATION,
56 | )
57 |
58 | self._add_resource(
59 | server,
60 | search_hosts_fql_resource,
61 | )
62 |
63 | def search_hosts(
64 | self,
65 | filter: str | None = Field(
66 | default=None,
67 | description="FQL Syntax formatted string used to limit the results. IMPORTANT: use the `falcon://hosts/search/fql-guide` resource when building this filter parameter.",
68 | examples={"platform_name:'Windows'", "hostname:'PC*'"},
69 | ),
70 | limit: int = Field(
71 | default=10,
72 | ge=1,
73 | le=5000,
74 | description="The maximum records to return. [1-5000]",
75 | ),
76 | offset: int | None = Field(
77 | default=None,
78 | description="The offset to start retrieving records from.",
79 | ),
80 | sort: str | None = Field(
81 | default=None,
82 | description=dedent("""
83 | Sort hosts using these options:
84 |
85 | hostname: Host name/computer name
86 | last_seen: Timestamp when the host was last seen
87 | first_seen: Timestamp when the host was first seen
88 | modified_timestamp: When the host record was last modified
89 | platform_name: Operating system platform
90 | agent_version: CrowdStrike agent version
91 | os_version: Operating system version
92 | external_ip: External IP address
93 |
94 | Sort either asc (ascending) or desc (descending).
95 | Both formats are supported: 'hostname.desc' or 'hostname|desc'
96 |
97 | Examples: 'hostname.asc', 'last_seen.desc', 'platform_name.asc'
98 | """).strip(),
99 | examples={"hostname.asc", "last_seen.desc"},
100 | ),
101 | ) -> List[Dict[str, Any]]:
102 | """Search for hosts in your CrowdStrike environment.
103 |
104 | IMPORTANT: You must use the `falcon://hosts/search/fql-guide` resource when you need to use the `filter` parameter.
105 | This resource contains the guide on how to build the FQL `filter` parameter for the `falcon_search_hosts` tool.
106 | """
107 | # Prepare parameters for QueryDevicesByFilter
108 | params = prepare_api_parameters(
109 | {
110 | "filter": filter,
111 | "limit": limit,
112 | "offset": offset,
113 | "sort": sort,
114 | }
115 | )
116 |
117 | # Define the operation name
118 | operation = "QueryDevicesByFilter"
119 |
120 | logger.debug("Searching hosts with params: %s", params)
121 |
122 | # Make the API request to get device IDs
123 | response = self.client.command(operation, parameters=params)
124 |
125 | # Use handle_api_response to get device IDs
126 | device_ids = handle_api_response(
127 | response,
128 | operation=operation,
129 | error_message="Failed to search hosts",
130 | default_result=[],
131 | )
132 |
133 | # If handle_api_response returns an error dict instead of a list,
134 | # it means there was an error, so we return it wrapped in a list
135 | if self._is_error(device_ids):
136 | return [device_ids]
137 |
138 | # If we have device IDs, get the details for each one
139 | if device_ids:
140 | # Use the base method to get device details
141 | details = self._base_get_by_ids(
142 | operation="PostDeviceDetailsV2",
143 | ids=device_ids,
144 | id_key="ids",
145 | )
146 |
147 | # If handle_api_response returns an error dict instead of a list,
148 | # it means there was an error, so we return it wrapped in a list
149 | if self._is_error(details):
150 | return [details]
151 |
152 | return details
153 |
154 | return []
155 |
156 | def get_host_details(
157 | self,
158 | ids: List[str] = Field(
159 | description="Host device IDs to retrieve details for. You can get device IDs from the search_hosts operation, the Falcon console, or the Streaming API. Maximum: 5000 IDs per request."
160 | ),
161 | ) -> List[Dict[str, Any]] | Dict[str, Any]:
162 | """Retrieve detailed information for specified host device IDs.
163 |
164 | This tool returns comprehensive host details for one or more device IDs.
165 | Use this when you already have specific device IDs and need their full details.
166 | For searching/discovering hosts, use the `falcon_search_hosts` tool instead.
167 | """
168 | logger.debug("Getting host details for IDs: %s", ids)
169 |
170 | # Handle empty list case - return empty list without making API call
171 | if not ids:
172 | return []
173 |
174 | # Use the base method to get device details
175 | return self._base_get_by_ids(
176 | operation="PostDeviceDetailsV2",
177 | ids=ids,
178 | id_key="ids",
179 | )
180 |
```
--------------------------------------------------------------------------------
/falcon_mcp/client.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Falcon API Client for MCP Server
3 |
4 | This module provides the Falcon API client and authentication utilities for the Falcon MCP server.
5 | """
6 |
7 | import os
8 | import platform
9 | import sys
10 | from importlib.metadata import PackageNotFoundError, version
11 | from typing import Any, Dict, Optional
12 |
13 | # Import the APIHarnessV2 from FalconPy
14 | from falconpy import APIHarnessV2
15 |
16 | from falcon_mcp.common.logging import get_logger
17 |
18 | logger = get_logger(__name__)
19 |
20 |
21 | class FalconClient:
22 | """Client for interacting with the CrowdStrike Falcon API."""
23 |
24 | def __init__(
25 | self,
26 | base_url: Optional[str] = None,
27 | debug: bool = False,
28 | user_agent_comment: Optional[str] = None,
29 | ):
30 | """Initialize the Falcon client.
31 |
32 | Args:
33 | base_url: Falcon API base URL (defaults to FALCON_BASE_URL env var)
34 | debug: Enable debug logging
35 | user_agent_comment: Additional information to include in the User-Agent comment section
36 | """
37 | # Get credentials from environment variables
38 | self.client_id = os.environ.get("FALCON_CLIENT_ID")
39 | self.client_secret = os.environ.get("FALCON_CLIENT_SECRET")
40 | self.base_url = base_url or os.environ.get(
41 | "FALCON_BASE_URL", "https://api.crowdstrike.com"
42 | )
43 | self.debug = debug
44 | self.user_agent_comment = user_agent_comment or os.environ.get(
45 | "FALCON_MCP_USER_AGENT_COMMENT"
46 | )
47 |
48 | if not self.client_id or not self.client_secret:
49 | raise ValueError(
50 | "Falcon API credentials not provided. Set FALCON_CLIENT_ID and "
51 | "FALCON_CLIENT_SECRET environment variables."
52 | )
53 |
54 | # Initialize the Falcon API client using APIHarnessV2
55 | self.client = APIHarnessV2(
56 | client_id=self.client_id,
57 | client_secret=self.client_secret,
58 | base_url=self.base_url,
59 | debug=debug,
60 | user_agent=self.get_user_agent(),
61 | )
62 |
63 | logger.debug("Initialized Falcon client with base URL: %s", self.base_url)
64 |
65 | def authenticate(self) -> bool:
66 | """Authenticate with the Falcon API.
67 |
68 | Returns:
69 | bool: True if authentication was successful
70 | """
71 | return self.client.login()
72 |
73 | def is_authenticated(self) -> bool:
74 | """Check if the client is authenticated.
75 |
76 | Returns:
77 | bool: True if the client is authenticated
78 | """
79 | return self.client.token_valid
80 |
81 | def command(self, operation: str, **kwargs) -> Dict[str, Any]:
82 | """Execute a Falcon API command.
83 |
84 | Args:
85 | operation: The API operation to execute
86 | **kwargs: Additional arguments to pass to the API
87 |
88 | Returns:
89 | Dict[str, Any]: The API response
90 | """
91 | return self.client.command(operation, **kwargs)
92 |
93 | def get_user_agent(self) -> str:
94 | """Get RFC-compliant user agent string for API requests.
95 |
96 | Returns:
97 | str: User agent string in RFC format "falcon-mcp/VERSION (comment; falconpy/VERSION; Python/VERSION; Platform/VERSION)"
98 | """
99 | # Get falcon-mcp version
100 | falcon_mcp_version = get_version()
101 |
102 | # Get Python version
103 | python_version = f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}"
104 |
105 | # Get platform information
106 | platform_info = f"{platform.system()}/{platform.release()}"
107 |
108 | # Get FalconPy version
109 | try:
110 | falconpy_version = version("crowdstrike-falconpy")
111 | except PackageNotFoundError:
112 | falconpy_version = "unknown"
113 | logger.debug("crowdstrike-falconpy package version not found")
114 |
115 | # Build comment section components (RFC-compliant format)
116 | comment_parts = []
117 | if self.user_agent_comment:
118 | comment_parts.append(self.user_agent_comment.strip())
119 | comment_parts.extend(
120 | [f"falconpy/{falconpy_version}", f"Python/{python_version}", platform_info]
121 | )
122 |
123 | return f"falcon-mcp/{falcon_mcp_version} ({'; '.join(comment_parts)})"
124 |
125 | def get_headers(self) -> Dict[str, str]:
126 | """Get authentication headers for API requests.
127 |
128 | This method returns the authentication headers from the underlying Falcon API client,
129 | which can be used for custom HTTP requests or advanced integration scenarios.
130 |
131 | Returns:
132 | Dict[str, str]: Authentication headers including the bearer token
133 | """
134 | return self.client.auth_headers
135 |
136 |
137 | def get_version() -> str:
138 | """Get falcon-mcp version with multiple fallback methods.
139 |
140 | This function tries multiple methods to determine the version:
141 | 1. importlib.metadata (works when package is properly installed)
142 | 2. pyproject.toml (works in development/Docker environments)
143 | 3. Hardcoded fallback
144 |
145 | Returns:
146 | str: The version string
147 | """
148 | # Try importlib.metadata first (works when properly installed)
149 | try:
150 | return version("falcon-mcp")
151 | except PackageNotFoundError:
152 | logger.debug(
153 | "falcon-mcp package not found via importlib.metadata, trying pyproject.toml"
154 | )
155 |
156 | # Try reading from pyproject.toml (works in development/Docker)
157 | try:
158 | import pathlib
159 | import tomllib # Python 3.11+
160 |
161 | # Look for pyproject.toml in current directory and parent directories
162 | current_path = pathlib.Path(__file__).parent
163 | for _ in range(3): # Check up to 3 levels up
164 | pyproject_path = current_path / "pyproject.toml"
165 | if pyproject_path.exists():
166 | with open(pyproject_path, "rb") as f:
167 | data = tomllib.load(f)
168 | version_str = data["project"]["version"]
169 | logger.debug(
170 | "Found version %s in pyproject.toml at %s",
171 | version_str,
172 | pyproject_path,
173 | )
174 | return version_str
175 | current_path = current_path.parent
176 |
177 | logger.debug("pyproject.toml not found in current or parent directories")
178 | except (KeyError, ImportError, OSError, TypeError) as e:
179 | logger.debug("Failed to read version from pyproject.toml: %s", e)
180 |
181 | # Final fallback
182 | fallback_version = "0.1.0"
183 | logger.debug("Using fallback version: %s", fallback_version)
184 | return fallback_version
185 |
```
--------------------------------------------------------------------------------
/tests/e2e/modules/test_spotlight.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | E2E tests for the Spotlight module.
3 | """
4 |
5 | import json
6 | import unittest
7 |
8 | import pytest
9 |
10 | from tests.e2e.utils.base_e2e_test import BaseE2ETest
11 |
12 |
13 | @pytest.mark.e2e
14 | class TestSpotlightModuleE2E(BaseE2ETest):
15 | """
16 | End-to-end test suite for the Falcon MCP Server Spotlight Module.
17 | """
18 |
19 | def test_search_high_severity_vulnerabilities(self):
20 | """Verify the agent can search for high severity vulnerabilities."""
21 |
22 | async def test_logic():
23 | fixtures = [
24 | {
25 | "operation": "combinedQueryVulnerabilities",
26 | "validator": lambda kwargs: "high"
27 | in kwargs.get("parameters", {}).get("filter", "").lower(),
28 | "response": {
29 | "status_code": 200,
30 | "body": {
31 | "resources": [
32 | {
33 | "id": "vuln-001",
34 | "cve": {
35 | "id": "CVE-2024-1234",
36 | "base_score": 8.5,
37 | "severity": "HIGH",
38 | "exprt_rating": "HIGH",
39 | "exploit_status": 60,
40 | "is_cisa_kev": True,
41 | "description": "Critical buffer overflow vulnerability in network service",
42 | },
43 | "status": "open",
44 | "created_timestamp": "2024-01-15T10:30:00Z",
45 | "updated_timestamp": "2024-01-20T14:15:00Z",
46 | "host_info": {
47 | "hostname": "web-server-01",
48 | "platform_name": "Linux",
49 | "asset_criticality": "Critical",
50 | "internet_exposure": "Yes",
51 | "managed_by": "Falcon sensor",
52 | },
53 | "apps": {
54 | "application_name": "Apache HTTP Server",
55 | "application_version": "2.4.41",
56 | },
57 | },
58 | {
59 | "id": "vuln-002",
60 | "cve": {
61 | "id": "CVE-2024-5678",
62 | "base_score": 7.8,
63 | "severity": "HIGH",
64 | "exprt_rating": "MEDIUM",
65 | "exploit_status": 30,
66 | "is_cisa_kev": False,
67 | "description": "Privilege escalation vulnerability in system service",
68 | },
69 | "status": "open",
70 | "created_timestamp": "2024-01-18T08:45:00Z",
71 | "updated_timestamp": "2024-01-19T16:20:00Z",
72 | "host_info": {
73 | "hostname": "db-server-02",
74 | "platform_name": "Windows",
75 | "asset_criticality": "High",
76 | "internet_exposure": "No",
77 | "managed_by": "Falcon sensor",
78 | },
79 | "apps": {
80 | "application_name": "Microsoft SQL Server",
81 | "application_version": "2019",
82 | },
83 | },
84 | ]
85 | },
86 | },
87 | }
88 | ]
89 |
90 | self._mock_api_instance.command.side_effect = (
91 | self._create_mock_api_side_effect(fixtures)
92 | )
93 |
94 | prompt = "Find all high severity vulnerabilities in our environment and show me their CVE details and affected hosts"
95 | return await self._run_agent_stream(prompt)
96 |
97 | def assertions(tools, result):
98 | self.assertGreaterEqual(len(tools), 1, "Expected at least 1 tool call")
99 | used_tool = tools[len(tools) - 1]
100 | self.assertEqual(
101 | used_tool["input"]["tool_name"], "falcon_search_vulnerabilities"
102 | )
103 |
104 | # Check for high severity filtering
105 | tool_input_str = json.dumps(used_tool["input"]["tool_input"]).lower()
106 | self.assertTrue(
107 | "high" in tool_input_str,
108 | f"Expected high severity filtering in tool input: {tool_input_str}",
109 | )
110 |
111 | # Verify both vulnerabilities are in the output
112 | self.assertIn("CVE-2024-1234", used_tool["output"])
113 | self.assertIn("CVE-2024-5678", used_tool["output"])
114 | self.assertIn("web-server-01", used_tool["output"])
115 | self.assertIn("db-server-02", used_tool["output"])
116 |
117 | # Verify API call was made correctly
118 | self.assertGreaterEqual(
119 | self._mock_api_instance.command.call_count, 1, "Expected 1 API call"
120 | )
121 |
122 | # Check API call (combinedQueryVulnerabilities)
123 | api_call_params = self._mock_api_instance.command.call_args_list[0][1].get(
124 | "parameters", {}
125 | )
126 | filter_str = api_call_params.get("filter", "").lower()
127 | self.assertTrue(
128 | "high" in filter_str,
129 | f"Expected high severity filtering in API call: {filter_str}",
130 | )
131 |
132 | # Verify result contains expected information
133 | self.assertIn("CVE-2024-1234", result)
134 | self.assertIn("CVE-2024-5678", result)
135 | self.assertIn("web-server-01", result)
136 | self.assertIn("db-server-02", result)
137 | self.assertIn("8.5", result) # Should contain CVSS scores
138 | self.assertIn("7.8", result)
139 |
140 | self.run_test_with_retries(
141 | "test_search_high_severity_vulnerabilities", test_logic, assertions
142 | )
143 |
144 |
145 | if __name__ == "__main__":
146 | unittest.main()
147 |
```
--------------------------------------------------------------------------------
/examples/adk/falcon_agent/agent.py:
--------------------------------------------------------------------------------
```python
1 | import logging
2 | import os
3 | import sys
4 | from typing import List, Optional, TextIO, Union
5 |
6 | from google.adk.agents import LlmAgent
7 | from google.adk.agents.callback_context import CallbackContext
8 | from google.adk.agents.readonly_context import ReadonlyContext
9 | from google.adk.models import LlmRequest, LlmResponse
10 | from google.adk.tools.base_tool import BaseTool
11 | from google.adk.tools.base_toolset import ToolPredicate
12 | from google.adk.tools.mcp_tool import MCPTool
13 | from google.adk.tools.mcp_tool.mcp_session_manager import (
14 | SseConnectionParams,
15 | StdioConnectionParams,
16 | StreamableHTTPConnectionParams,
17 | retry_on_closed_resource,
18 | )
19 | from google.adk.tools.mcp_tool.mcp_toolset import MCPToolset
20 | from mcp import StdioServerParameters
21 | from mcp.types import ListToolsResult
22 |
23 | tools_cache={}
24 |
25 | def make_tools_compatible(tools):
26 | """
27 | This function makes the schema compatible with Gemini/Vertex AI API
28 | It is only needed when API used is Gemini and model is other than 2.5 models
29 | It is however needed for ALL models when API used is VertexAI
30 | """
31 | for tool in tools:
32 | for key in tool._mcp_tool.inputSchema.keys():
33 | if key == "properties":
34 | for prop_name in tool._mcp_tool.inputSchema["properties"].keys():
35 | if "anyOf" in tool._mcp_tool.inputSchema["properties"][prop_name].keys():
36 | if (tool._mcp_tool.inputSchema["properties"][prop_name]["anyOf"][0]["type"] == "array"):
37 | tool._mcp_tool.inputSchema["properties"][prop_name]["type"] = tool._mcp_tool.inputSchema["properties"][prop_name]["anyOf"][0]["items"]["type"]
38 | else:
39 | tool._mcp_tool.inputSchema["properties"][prop_name]["type"] = tool._mcp_tool.inputSchema["properties"][prop_name]["anyOf"][0]["type"]
40 | tool._mcp_tool.inputSchema["properties"][prop_name].pop("anyOf")
41 |
42 | return tools
43 |
44 |
45 | class MCPToolSetWithSchemaAccess(MCPToolset):
46 | """
47 | Added to make the MCP tools schema compatible with Vertext AI API and also older Gemini models.
48 | Also introduced a small performance improvement with tools caching.
49 | """
50 |
51 | def __init__(
52 | self,
53 | *,
54 | tool_set_name: str, # <-- new parameter
55 | connection_params: Union[
56 | StdioServerParameters,
57 | StdioConnectionParams,
58 | SseConnectionParams,
59 | StreamableHTTPConnectionParams,
60 | ],
61 | tool_filter: Optional[Union[ToolPredicate, List[str]]] = None,
62 | errlog: TextIO = sys.stderr,
63 | ):
64 | super().__init__(
65 | connection_params=connection_params,
66 | tool_filter=tool_filter,
67 | errlog=errlog
68 | )
69 | self.tool_set_name = tool_set_name
70 | logging.info(f"MCPToolSetWithSchemaAccess initialized with tool_set_name: '{self.tool_set_name}'")
71 | self._session = None
72 |
73 | @retry_on_closed_resource
74 | async def get_tools(
75 | self,
76 | readonly_context: Optional[ReadonlyContext] = None,
77 | ) -> List[BaseTool]:
78 | """Return all tools in the toolset based on the provided context.
79 |
80 | Args:
81 | readonly_context: Context used to filter tools available to the agent.
82 | If None, all tools in the toolset are returned.
83 |
84 | Returns:
85 | List[BaseTool]: A list of tools available under the specified context.
86 | """
87 | # Get session from session manager
88 | session = await self._mcp_session_manager.create_session()
89 |
90 | if self.tool_set_name in tools_cache.keys():
91 | logging.info(f"Tools found in cache for toolset {self.tool_set_name}, returning them")
92 | return tools_cache[self.tool_set_name]
93 | else:
94 | logging.info(f"No tools found in cache for toolset {self.tool_set_name}, loading")
95 |
96 | # Fetch available tools from the MCP server
97 | tools_response: ListToolsResult = await session.list_tools()
98 |
99 | # Apply filtering based on context and tool_filter
100 | tools = []
101 | for tool in tools_response.tools:
102 | mcp_tool = MCPTool(
103 | mcp_tool=tool,
104 | mcp_session_manager=self._mcp_session_manager,
105 | auth_scheme=self._auth_scheme,
106 | auth_credential=self._auth_credential,
107 | )
108 |
109 | if self._is_tool_selected(mcp_tool, readonly_context):
110 | tools.append(mcp_tool)
111 |
112 | model_version = os.environ.get("GOOGLE_MODEL").split("-")[1]
113 | if float(model_version) < 2.5 or os.environ.get("GOOGLE_GENAI_USE_VERTEXAI").upper() == "TRUE":
114 | logging.error(f"Model - {os.environ.get('GOOGLE_MODEL')} needs Gemini compatible tools, updating schema ...")
115 | tools = make_tools_compatible(tools)
116 | else:
117 | logging.info(f"Model - {os.environ.get('GOOGLE_MODEL')} does not need updating schema")
118 |
119 | tools_cache[self.tool_set_name] = tools
120 |
121 | return tools
122 |
123 | # Controlling context size to improve Model response time and for cost optimization
124 | # https://github.com/google/adk-python/issues/752#issuecomment-2948152979
125 | def bmc_trim_llm_request(
126 | callback_context: CallbackContext, llm_request: LlmRequest
127 | ) -> Optional[LlmResponse]:
128 |
129 | max_prev_user_interactions = int(os.environ.get("MAX_PREV_USER_INTERACTIONS","-1"))
130 |
131 | logging.info(f"Number of contents going to LLM - {len(llm_request.contents)}, MAX_PREV_USER_INTERACTIONS = {max_prev_user_interactions}")
132 |
133 | temp_processed_list = []
134 |
135 | if max_prev_user_interactions == -1:
136 | return None
137 | else:
138 | user_message_count = 0
139 | for i in range(len(llm_request.contents) - 1, -1, -1):
140 | item = llm_request.contents[i]
141 |
142 | if item.role == "user" and item.parts[0] and item.parts[0].text and item.parts[0].text != "For context:":
143 | logging.info(f"Encountered a user message => {item.parts[0].text}")
144 | user_message_count += 1
145 |
146 | if user_message_count > max_prev_user_interactions:
147 | logging.info(f"Breaking at user_message_count => {user_message_count}")
148 | temp_processed_list.append(item)
149 | break
150 |
151 | temp_processed_list.append(item)
152 |
153 | final_list = temp_processed_list[::-1]
154 |
155 | if user_message_count < max_prev_user_interactions:
156 | logging.info("User message count did not reach the allowed limit. List remains unchanged.")
157 | else:
158 | logging.info(f"User message count reached {max_prev_user_interactions}. List truncated.")
159 | llm_request.contents = final_list
160 |
161 | return None
162 |
163 |
164 | root_agent = LlmAgent(
165 | model=os.environ.get("GOOGLE_MODEL"),
166 | name='falcon_agent',
167 | instruction=os.environ.get("FALCON_AGENT_PROMPT"),
168 | tools=[
169 | MCPToolSetWithSchemaAccess(
170 | tool_set_name="falcon-tools",
171 | connection_params=StdioConnectionParams(
172 | server_params=StdioServerParameters(
173 | command='falcon-mcp',
174 | env={
175 | "FALCON_CLIENT_ID":os.environ.get("FALCON_CLIENT_ID"),
176 | "FALCON_CLIENT_SECRET":os.environ.get("FALCON_CLIENT_SECRET"),
177 | "FALCON_BASE_URL":os.environ.get("FALCON_BASE_URL"),
178 | }
179 | )
180 | )
181 | ),
182 | ],
183 | )
184 |
```
--------------------------------------------------------------------------------
/falcon_mcp/resources/serverless.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Contains Serverless Vulnerabilities resources.
3 | """
4 |
5 | from falcon_mcp.common.utils import generate_md_table
6 |
7 | # List of tuples containing filter options data: (name, type, operators, description)
8 | SERVERLESS_VULNERABILITIES_FQL_FILTERS = [
9 | (
10 | "Name",
11 | "Type",
12 | "Operators",
13 | "Description"
14 | ),
15 | (
16 | "application_name",
17 | "String",
18 | "Yes",
19 | """
20 | Name of the application associated with the serverless function.
21 |
22 | Ex: application_name:'my-lambda-app'
23 | """
24 | ),
25 | (
26 | "application_name_version",
27 | "String",
28 | "Yes",
29 | """
30 | Version of the application associated with the serverless function.
31 |
32 | Ex: application_name_version:'1.0.0'
33 | """
34 | ),
35 | (
36 | "cid",
37 | "String",
38 | "No",
39 | """
40 | Unique system-generated customer identifier (CID) of the account.
41 |
42 | Ex: cid:'0123456789ABCDEFGHIJKLMNOPQRSTUV'
43 | """
44 | ),
45 | (
46 | "cloud_account_id",
47 | "String",
48 | "Yes",
49 | """
50 | Unique identifier of the cloud account where the serverless function is deployed.
51 |
52 | Ex: cloud_account_id:'123456789012'
53 | """
54 | ),
55 | (
56 | "cloud_account_name",
57 | "String",
58 | "Yes",
59 | """
60 | Name of the cloud account where the serverless function is deployed.
61 |
62 | Ex: cloud_account_name:'production-account'
63 | """
64 | ),
65 | (
66 | "cloud_provider",
67 | "String",
68 | "Yes",
69 | """
70 | Name of the cloud service provider hosting the serverless function.
71 | Values: aws, azure, gcp
72 |
73 | Ex: cloud_provider:'aws'
74 | """
75 | ),
76 | (
77 | "cve_id",
78 | "String",
79 | "Yes",
80 | """
81 | Unique identifier for a vulnerability as cataloged in the National Vulnerability Database (NVD).
82 | Supports multiple values and negation.
83 |
84 | Ex: cve_id:['CVE-2022-1234']
85 | Ex: cve_id:['CVE-2022-1234','CVE-2023-5678']
86 | """
87 | ),
88 | (
89 | "cvss_base_score",
90 | "Number",
91 | "Yes",
92 | """
93 | Common Vulnerability Scoring System (CVSS) base score of the vulnerability.
94 |
95 | Ex: cvss_base_score:>7.0
96 | Ex: cvss_base_score:<5.0
97 | """
98 | ),
99 | (
100 | "exprt_rating",
101 | "String",
102 | "Yes",
103 | """
104 | ExPRT rating assigned by CrowdStrike's predictive AI rating system.
105 | Values: UNKNOWN, LOW, MEDIUM, HIGH, CRITICAL
106 |
107 | Ex: exprt_rating:'HIGH'
108 | Ex: exprt_rating:['HIGH','CRITICAL']
109 | """
110 | ),
111 | (
112 | "first_seen_timestamp",
113 | "Timestamp",
114 | "Yes",
115 | """
116 | Date and time when this vulnerability was first detected in the serverless function.
117 |
118 | Ex: first_seen_timestamp:>'2023-01-01'
119 | Ex: first_seen_timestamp:<'2023-12-31'
120 | """
121 | ),
122 | (
123 | "function_name",
124 | "String",
125 | "Yes",
126 | """
127 | Name of the serverless function where the vulnerability was detected.
128 |
129 | Ex: function_name:'process-payment'
130 | """
131 | ),
132 | (
133 | "function_resource_id",
134 | "String",
135 | "Yes",
136 | """
137 | Unique resource identifier of the serverless function.
138 |
139 | Ex: function_resource_id:'arn:aws:lambda:us-east-1:123456789012:function:my-function'
140 | """
141 | ),
142 | (
143 | "is_supported",
144 | "Boolean",
145 | "No",
146 | """
147 | Indicates if the serverless function is supported for vulnerability scanning.
148 |
149 | Ex: is_supported:true
150 | """
151 | ),
152 | (
153 | "is_valid_asset_id",
154 | "Boolean",
155 | "No",
156 | """
157 | Indicates if the asset ID associated with the serverless function is valid.
158 |
159 | Ex: is_valid_asset_id:true
160 | """
161 | ),
162 | (
163 | "layer",
164 | "String",
165 | "Yes",
166 | """
167 | Layer in the serverless function where the vulnerability was detected.
168 |
169 | Ex: layer:'runtime'
170 | Ex: layer:'dependency'
171 | """
172 | ),
173 | (
174 | "region",
175 | "String",
176 | "Yes",
177 | """
178 | Cloud region where the serverless function is deployed.
179 |
180 | Ex: region:'us-east-1'
181 | Ex: region:['us-east-1','us-west-2']
182 | """
183 | ),
184 | (
185 | "runtime",
186 | "String",
187 | "Yes",
188 | """
189 | Runtime environment of the serverless function.
190 | Values: nodejs, python, java, ruby, go, dotnet
191 |
192 | Ex: runtime:'nodejs'
193 | Ex: runtime:['python','nodejs']
194 | """
195 | ),
196 | (
197 | "severity",
198 | "String",
199 | "Yes",
200 | """
201 | Severity level of the vulnerability.
202 | Values: UNKNOWN, LOW, MEDIUM, HIGH, CRITICAL
203 |
204 | Ex: severity:'HIGH'
205 | Ex: severity:['HIGH','CRITICAL']
206 | """
207 | ),
208 | (
209 | "timestamp",
210 | "Timestamp",
211 | "Yes",
212 | """
213 | Date and time when the vulnerability was last updated.
214 |
215 | Ex: timestamp:>'2023-06-01'
216 | Ex: timestamp:<'2023-12-31'
217 | """
218 | ),
219 | (
220 | "type",
221 | "String",
222 | "Yes",
223 | """
224 | Type of the vulnerability.
225 | Values: Vulnerability, Misconfiguration, Unsupported software
226 |
227 | Ex: type:'Vulnerability'
228 | Ex: type:!'Misconfiguration'
229 | """
230 | ),
231 | ]
232 |
233 | SERVERLESS_VULNERABILITIES_FQL_DOCUMENTATION = """Falcon Query Language (FQL) - Serverless Vulnerabilities Guide
234 |
235 | === BASIC SYNTAX ===
236 | property_name:[operator]'value'
237 |
238 | === AVAILABLE OPERATORS ===
239 | • No operator = equals (default)
240 | • ! = not equal to
241 | • > = greater than
242 | • >= = greater than or equal
243 | • < = less than
244 | • <= = less than or equal
245 | • ~ = text match (ignores case, spaces, punctuation)
246 | • !~ = does not text match
247 |
248 | === DATA TYPES & SYNTAX ===
249 | • Strings: 'value' or ['exact_value'] for exact match
250 | • Dates: 'YYYY-MM-DDTHH:MM:SSZ' (UTC format)
251 | • Booleans: true or false (no quotes)
252 | • Numbers: 123 (no quotes)
253 |
254 | === COMBINING CONDITIONS ===
255 | • + = AND condition
256 | • , = OR condition
257 | • ( ) = Group expressions
258 |
259 | === falcon_search_serverless_vulnerabilities FQL filter options ===
260 |
261 | """ + generate_md_table(SERVERLESS_VULNERABILITIES_FQL_FILTERS) + """
262 |
263 | === IMPORTANT NOTES ===
264 | • Use single quotes around string values: 'value'
265 | • Use square brackets for exact matches and multiple values: ['value1','value2']
266 | • Date format must be UTC: 'YYYY-MM-DDTHH:MM:SSZ'
267 | • For case-insensitive filtering, add .insensitive to field names
268 | • Boolean values: true or false (no quotes)
269 | • Wildcards (*) are unsupported in this API
270 | • Some fields require specific capitalization (check individual field descriptions)
271 |
272 | === COMMON FILTER EXAMPLES ===
273 | • Filter by cloud provider: cloud_provider:'aws'
274 | • High severity vulnerabilities: severity:'HIGH'
275 | • Recent vulnerabilities: first_seen_timestamp:>'2023-01-01'
276 | • Filter by specific runtime: runtime:'nodejs'
277 | • Filter by region: region:'us-east-1'
278 | • Critical vulnerabilities in a specific account: severity:'CRITICAL'+cloud_account_id:'123456789012'
279 | • Filter by function name: function_name:'payment-processor'
280 | • High CVSS score vulnerabilities: cvss_base_score:>7.0
281 | """
282 |
```
--------------------------------------------------------------------------------
/falcon_mcp/modules/detections.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Detections module for Falcon MCP Server
3 |
4 | This module provides tools for accessing and analyzing CrowdStrike Falcon detections.
5 | """
6 |
7 | from textwrap import dedent
8 | from typing import Any, Dict, List
9 |
10 | from mcp.server import FastMCP
11 | from mcp.server.fastmcp.resources import TextResource
12 | from pydantic import AnyUrl, Field
13 |
14 | from falcon_mcp.common.errors import handle_api_response
15 | from falcon_mcp.common.logging import get_logger
16 | from falcon_mcp.common.utils import prepare_api_parameters
17 | from falcon_mcp.modules.base import BaseModule
18 | from falcon_mcp.resources.detections import SEARCH_DETECTIONS_FQL_DOCUMENTATION
19 |
20 | logger = get_logger(__name__)
21 |
22 |
23 | class DetectionsModule(BaseModule):
24 | """Module for accessing and analyzing CrowdStrike Falcon detections."""
25 |
26 | def register_tools(self, server: FastMCP) -> None:
27 | """Register tools with the MCP server.
28 |
29 | Args:
30 | server: MCP server instance
31 | """
32 | # Register tools
33 | self._add_tool(
34 | server=server,
35 | method=self.search_detections,
36 | name="search_detections",
37 | )
38 |
39 | self._add_tool(
40 | server=server,
41 | method=self.get_detection_details,
42 | name="get_detection_details",
43 | )
44 |
45 | def register_resources(self, server: FastMCP) -> None:
46 | """Register resources with the MCP server.
47 |
48 | Args:
49 | server: MCP server instance
50 | """
51 | search_detections_fql_resource = TextResource(
52 | uri=AnyUrl("falcon://detections/search/fql-guide"),
53 | name="falcon_search_detections_fql_guide",
54 | description="Contains the guide for the `filter` param of the `falcon_search_detections` tool.",
55 | text=SEARCH_DETECTIONS_FQL_DOCUMENTATION,
56 | )
57 |
58 | self._add_resource(
59 | server,
60 | search_detections_fql_resource,
61 | )
62 |
63 | def search_detections(
64 | self,
65 | filter: str | None = Field(
66 | default=None,
67 | description="FQL Syntax formatted string used to limit the results. IMPORTANT: use the `falcon://detections/search/fql-guide` resource when building this filter parameter.",
68 | examples={"agent_id:'77d11725xxxxxxxxxxxxxxxxxxxxc48ca19'", "status:'new'"},
69 | ),
70 | limit: int = Field(
71 | default=10,
72 | ge=1,
73 | le=9999,
74 | description="The maximum number of detections to return in this response (default: 10; max: 9999). Use with the offset parameter to manage pagination of results.",
75 | ),
76 | offset: int | None = Field(
77 | default=None,
78 | description="The first detection to return, where 0 is the latest detection. Use with the offset parameter to manage pagination of results.",
79 | ),
80 | q: str | None = Field(
81 | default=None,
82 | description="Search all detection metadata for the provided string",
83 | ),
84 | sort: str | None = Field(
85 | default=None,
86 | description=dedent("""
87 | Sort detections using these options:
88 |
89 | timestamp: Timestamp when the detection occurred
90 | created_timestamp: When the detection was created
91 | updated_timestamp: When the detection was last modified
92 | severity: Severity level of the detection (1-100, recommended when filtering by severity)
93 | confidence: Confidence level of the detection (1-100)
94 | agent_id: Agent ID associated with the detection
95 |
96 | Sort either asc (ascending) or desc (descending).
97 | Both formats are supported: 'severity.desc' or 'severity|desc'
98 |
99 | When searching for high severity detections, use 'severity.desc' to get the highest severity detections first.
100 | For chronological ordering, use 'timestamp.desc' for most recent detections first.
101 |
102 | Examples: 'severity.desc', 'timestamp.desc'
103 | """).strip(),
104 | examples={"severity.desc", "timestamp.desc"},
105 | ),
106 | include_hidden: bool = Field(default=True),
107 | ) -> List[Dict[str, Any]]:
108 | """Find and analyze detections to understand malicious activity in your environment.
109 |
110 | IMPORTANT: You must use the `falcon://detections/search/fql-guide` resource when you need to use the `filter` parameter.
111 | This resource contains the guide on how to build the FQL `filter` parameter for the `falcon_search_detections` tool.
112 | """
113 | # Prepare parameters
114 | params = prepare_api_parameters(
115 | {
116 | "filter": filter,
117 | "limit": limit,
118 | "offset": offset,
119 | "q": q,
120 | "sort": sort,
121 | }
122 | )
123 |
124 | # Define the operation name
125 | operation = "GetQueriesAlertsV2"
126 |
127 | logger.debug("Searching detections with params: %s", params)
128 |
129 | # Make the API request
130 | response = self.client.command(operation, parameters=params)
131 |
132 | # Use handle_api_response to get detection IDs (now composite_ids)
133 | detection_ids = handle_api_response(
134 | response,
135 | operation=operation,
136 | error_message="Failed to search detections",
137 | default_result=[],
138 | )
139 |
140 | # If handle_api_response returns an error dict instead of a list,
141 | # it means there was an error, so we return it wrapped in a list
142 | if self._is_error(detection_ids):
143 | return [detection_ids]
144 |
145 | # If we have detection IDs, get the details for each one
146 | if detection_ids:
147 | # Use the enhanced base method with composite_ids and include_hidden
148 | details = self._base_get_by_ids(
149 | operation="PostEntitiesAlertsV2",
150 | ids=detection_ids,
151 | id_key="composite_ids",
152 | include_hidden=include_hidden,
153 | )
154 |
155 | # If handle_api_response returns an error dict instead of a list,
156 | # it means there was an error, so we return it wrapped in a list
157 | if self._is_error(details):
158 | return [details]
159 |
160 | return details
161 |
162 | return []
163 |
164 | def get_detection_details(
165 | self,
166 | ids: List[str] = Field(
167 | description="Composite ID(s) to retrieve detection details for.",
168 | ),
169 | include_hidden: bool = Field(
170 | default=True,
171 | description="Whether to include hidden detections (default: True). When True, shows all detections including previously hidden ones for comprehensive visibility.",
172 | ),
173 | ) -> List[Dict[str, Any]] | Dict[str, Any]:
174 | """Get detection details for specific detection IDs to understand security threats.
175 |
176 | Use this when you already have specific detection IDs and need their full details.
177 | For searching/discovering detections, use the `falcon_search_detections` tool instead.
178 | """
179 | logger.debug("Getting detection details for ID(s): %s", ids)
180 |
181 | # Use the enhanced base method - composite_ids parameter matches ids for backward compatibility
182 | return self._base_get_by_ids(
183 | operation="PostEntitiesAlertsV2",
184 | ids=ids,
185 | id_key="composite_ids",
186 | include_hidden=include_hidden,
187 | )
188 |
```
--------------------------------------------------------------------------------
/docs/deployment/amazon_bedrock_agentcore.md:
--------------------------------------------------------------------------------
```markdown
1 | # Deploying to Amazon Bedrock AgentCore
2 |
3 | This guide walks you through deploying the Falcon MCP Server to Amazon Bedrock AgentCore. You'll configure the necessary AWS resources, set up IAM permissions, and prepare your environment.
4 |
5 | ## Prerequisites
6 |
7 | Before deploying to Amazon Bedrock AgentCore, ensure you have your CrowdStrike API credentials and AWS environment properly configured.
8 |
9 | ### CrowdStrike API Credentials
10 |
11 | You'll need to create API credentials in the CrowdStrike platform with the appropriate scopes for your intended use case.
12 |
13 | 1. **Create API Key**: Generate an API key in the CrowdStrike platform with the necessary scopes as outlined in [Available Modules, Tools & Resources](https://github.com/CrowdStrike/falcon-mcp/tree/main?tab=readme-ov-file#available-modules-tools--resources)
14 |
15 | 2. **Prepare Environment Variables**: You'll configure these values during agent deployment:
16 | - `FALCON_CLIENT_ID` - Your CrowdStrike API client ID
17 | - `FALCON_CLIENT_SECRET` - Your CrowdStrike API client secret
18 | - `FALCON_BASE_URL` - Your CrowdStrike API base URL (region-specific)
19 |
20 | ### AWS VPC Requirements
21 |
22 | The MCP Server requires internet connectivity to communicate with CrowdStrike's APIs. We recommend deploying in an existing VPC used for your agentic tools.
23 |
24 | **Required VPC Configuration:**
25 |
26 | - **Internet Gateway or NAT Gateway** - Enables outbound internet connectivity
27 | - **Outbound HTTPS Access** - Allow communication to `api.crowdstrike.com` on port 443
28 | - **Security Groups** - Configure appropriate security group rules for your network requirements
29 |
30 | ## IAM Configuration
31 |
32 | The MCP server requires specific IAM permissions to function within the Amazon Bedrock AgentCore environment. You'll create an execution role with the necessary policies and trust relationships.
33 |
34 | > [!IMPORTANT]
35 | > Replace all placeholder values with your specific environment details:
36 | >
37 | > - `{{region}}` - Your AWS region (e.g., `us-east-1`)
38 | > - `{{accountId}}` - Your AWS account ID
39 | > - `{{agentName}}` - Your agent name with no spaces or special characters (e.g., `falcon`). You'll need to decide the agent name **before** creating the role and AgentCore Runtime.
40 |
41 | ### Step 1: Create the IAM Execution Role
42 |
43 | Create an IAM role with the following policy that grants the necessary permissions for container access, logging, monitoring, and Bedrock operations:
44 |
45 | ```json
46 | {
47 | "Version": "2012-10-17",
48 | "Statement": [
49 | {
50 | "Sid": "ECRImageAccess",
51 | "Effect": "Allow",
52 | "Action": [
53 | "ecr:BatchGetImage",
54 | "ecr:GetDownloadUrlForLayer"
55 | ],
56 | "Resource": [
57 | "arn:aws:ecr:us-east-1:709825985650:repository/crowdstrike/falcon-mcp"
58 | ]
59 | },
60 | {
61 | "Effect": "Allow",
62 | "Action": [
63 | "logs:DescribeLogStreams",
64 | "logs:CreateLogGroup"
65 | ],
66 | "Resource": [
67 | "arn:aws:logs:{{region}}:{{accountId}}:log-group:/aws/bedrock-agentcore/runtimes/*"
68 | ]
69 | },
70 | {
71 | "Effect": "Allow",
72 | "Action": [
73 | "logs:DescribeLogGroups"
74 | ],
75 | "Resource": [
76 | "arn:aws:logs:{{region}}:{{accountId}}:log-group:*"
77 | ]
78 | },
79 | {
80 | "Effect": "Allow",
81 | "Action": [
82 | "logs:CreateLogStream",
83 | "logs:PutLogEvents"
84 | ],
85 | "Resource": [
86 | "arn:aws:logs:{{region}}:{{accountId}}:log-group:/aws/bedrock-agentcore/runtimes/*:log-stream:*"
87 | ]
88 | },
89 | {
90 | "Sid": "ECRTokenAccess",
91 | "Effect": "Allow",
92 | "Action": [
93 | "ecr:GetAuthorizationToken"
94 | ],
95 | "Resource": "*"
96 | },
97 | {
98 | "Effect": "Allow",
99 | "Action": [
100 | "xray:PutTraceSegments",
101 | "xray:PutTelemetryRecords",
102 | "xray:GetSamplingRules",
103 | "xray:GetSamplingTargets"
104 | ],
105 | "Resource": [
106 | "*"
107 | ]
108 | },
109 | {
110 | "Effect": "Allow",
111 | "Resource": "*",
112 | "Action": "cloudwatch:PutMetricData",
113 | "Condition": {
114 | "StringEquals": {
115 | "cloudwatch:namespace": "bedrock-agentcore"
116 | }
117 | }
118 | },
119 | {
120 | "Sid": "GetAgentAccessToken",
121 | "Effect": "Allow",
122 | "Action": [
123 | "bedrock-agentcore:GetWorkloadAccessToken",
124 | "bedrock-agentcore:GetWorkloadAccessTokenForJWT",
125 | "bedrock-agentcore:GetWorkloadAccessTokenForUserId"
126 | ],
127 | "Resource": [
128 | "arn:aws:bedrock-agentcore:{{region}}:{{accountId}}:workload-identity-directory/default",
129 | "arn:aws:bedrock-agentcore:{{region}}:{{accountId}}:workload-identity-directory/default/workload-identity/{{agentName}}-*"
130 | ]
131 | },
132 | {
133 | "Sid": "BedrockModelInvocation",
134 | "Effect": "Allow",
135 | "Action": [
136 | "bedrock:InvokeModel",
137 | "bedrock:InvokeModelWithResponseStream"
138 | ],
139 | "Resource": [
140 | "arn:aws:bedrock:*::foundation-model/*",
141 | "arn:aws:bedrock:{{region}}:{{accountId}}:*"
142 | ]
143 | }
144 | ]
145 | }
146 | ```
147 |
148 | > [!NOTE]
149 | > Save the ARN of the IAM role - you'll need it for the deployment of the Amazon Bedrock AgentCore agent.
150 |
151 | ### Step 2: Create the IAM Trust Policy
152 |
153 | Create a trust policy that allows the Bedrock AgentCore service to assume the execution role:
154 |
155 | ```json
156 | {
157 | "Version": "2012-10-17",
158 | "Statement": [
159 | {
160 | "Sid": "AssumeRolePolicy",
161 | "Effect": "Allow",
162 | "Principal": {
163 | "Service": "bedrock-agentcore.amazonaws.com"
164 | },
165 | "Action": "sts:AssumeRole",
166 | "Condition": {
167 | "StringEquals": {
168 | "aws:SourceAccount": "{{accountId}}"
169 | },
170 | "ArnLike": {
171 | "aws:SourceArn": "arn:aws:bedrock-agentcore:{{region}}:{{accountId}}:*"
172 | }
173 | }
174 | }
175 | ]
176 | }
177 | ```
178 |
179 | ### Step 3: Associate the Trust Policy
180 |
181 | Attach the trust policy to the IAM execution role you created in Step 1. This completes the IAM configuration required for the MCP server to operate within Amazon Bedrock AgentCore.
182 |
183 | ## Next Steps
184 |
185 | ### Important Variables
186 |
187 | To host this agent in Amazon Bedrock AgentCore, the following variables will need to be known:
188 |
189 | | Variable | Description |
190 | | :--- | :--- |
191 | | `FALCON_CLIENT_ID` | The Client ID for your Falcon API credentials |
192 | | `FALCON_CLIENT_SECRET` | The Client Secret for your Falcon API credentials |
193 | | `FALCON_BASE_URL` | The base URL for your Falcon API environment |
194 | | `AGENT_NAME` | The name of the agent (_ex: falconmcp_) |
195 | | `AGENT_DESCRIPTION` | A description of the agent |
196 | | `AGENT_ROLE_ARN` | The ARN of the IAM execution role created in Step 1 |
197 |
198 | With your IAM configuration complete and variables prepared, you can now return to the **AWS Marketplace listing** to complete the deployment of your Falcon MCP Server agent in Amazon Bedrock AgentCore.
199 |
200 | #### Example Deployment
201 |
202 | ```bash
203 | aws bedrock-agentcore-control create-agent-runtime \
204 | --region us-east-1 \
205 | --agent-runtime-name "falconmcp" \
206 | --description "Falcon MCP Server Agent" \
207 | --agent-runtime-artifact '{
208 | "containerConfiguration": {
209 | "containerUri": "709825985650.dkr.ecr.us-east-1.amazonaws.com/crowdstrike/falcon-mcp:0.1.1"
210 | }
211 | }' \
212 | --role-arn "arn:aws:iam::example:role/bedrock-core-falcon-role" \
213 | --network-configuration '{
214 | "networkMode": "PUBLIC"
215 | }' \
216 | --protocol-configuration '{
217 | "serverProtocol": "MCP"
218 | }' \
219 | --environment-variables '{
220 | "FALCON_CLIENT_ID": "FALCON_CLIENT_ID_VALUE",
221 | "FALCON_CLIENT_SECRET": "FALCON_CLIENT_SECRET_VALUE",
222 | "FALCON_BASE_URL": "https://api.crowdstrike.com"
223 | }'
224 | ```
225 |
```
--------------------------------------------------------------------------------
/tests/e2e/modules/test_intel.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | E2E tests for the Intel module.
3 | """
4 |
5 | import unittest
6 |
7 | import pytest
8 |
9 | from tests.e2e.utils.base_e2e_test import BaseE2ETest, ensure_dict
10 |
11 |
12 | @pytest.mark.e2e
13 | class TestIntelModuleE2E(BaseE2ETest):
14 | """
15 | End-to-end test suite for the Falcon MCP Server Intel Module.
16 | """
17 |
18 | def test_search_actors_with_filter(self):
19 | """Verify the agent can search for actors with a filter."""
20 |
21 | async def test_logic():
22 | fixtures = [
23 | {
24 | "operation": "QueryIntelActorEntities",
25 | "validator": lambda kwargs: "animal_classifier:'BEAR'"
26 | in kwargs.get("parameters", {}).get("filter", ""),
27 | "response": {
28 | "status_code": 200,
29 | "body": {
30 | "resources": [
31 | {
32 | "id": "actor-1",
33 | "animal_classifier": "BEAR",
34 | "short_description": "Actor ELDERLY BEAR",
35 | },
36 | {
37 | "id": "actor-2",
38 | "animal_classifier": "BEAR",
39 | "short_description": "Actor CONSTANT BEAR",
40 | },
41 | ]
42 | },
43 | },
44 | }
45 | ]
46 |
47 | self._mock_api_instance.command.side_effect = (
48 | self._create_mock_api_side_effect(fixtures)
49 | )
50 |
51 | prompt = "Find all threat actors with animal_classifier BEAR"
52 | return await self._run_agent_stream(prompt)
53 |
54 | def assertions(tools, result):
55 | self.assertGreaterEqual(len(tools), 1, "Expected at least 1 tool call")
56 | used_tool = tools[len(tools) - 1]
57 | self.assertEqual(used_tool["input"]["tool_name"], "falcon_search_actors")
58 |
59 | # Verify the tool input contains the filter
60 | tool_input = ensure_dict(used_tool["input"]["tool_input"])
61 | self.assertIn("animal_classifier", tool_input.get("filter", ""))
62 |
63 | # Verify API call parameters
64 | self.assertGreaterEqual(
65 | self._mock_api_instance.command.call_count,
66 | 1,
67 | "Expected at least 1 API call",
68 | )
69 | api_call_params = self._mock_api_instance.command.call_args_list[0][1].get(
70 | "parameters", {}
71 | )
72 | self.assertIn("animal_classifier:'BEAR'", api_call_params.get("filter", ""))
73 |
74 | # Verify result contains actor information
75 | self.assertIn("BEAR", result)
76 | self.assertIn("ELDERLY BEAR", result)
77 | self.assertIn("Actor CONSTANT BEAR", result)
78 |
79 | self.run_test_with_retries(
80 | "test_search_actors_with_filter", test_logic, assertions
81 | )
82 |
83 | def test_search_indicators_with_filter(self):
84 | """Verify the agent can search for indicators with a filter."""
85 |
86 | async def test_logic():
87 | fixtures = [
88 | {
89 | "operation": "QueryIntelIndicatorEntities",
90 | "validator": lambda kwargs: "type:'hash_sha256'"
91 | in kwargs.get("parameters", {}).get("filter", ""),
92 | "response": {
93 | "status_code": 200,
94 | "body": {
95 | "resources": [
96 | {"id": "indicator-1", "type": "hash_sha256"},
97 | {"id": "indicator-2", "type": "hash_sha256"},
98 | ]
99 | },
100 | },
101 | }
102 | ]
103 |
104 | self._mock_api_instance.command.side_effect = (
105 | self._create_mock_api_side_effect(fixtures)
106 | )
107 |
108 | prompt = "Find all indicators of type hash_sha256"
109 | return await self._run_agent_stream(prompt)
110 |
111 | def assertions(tools, result):
112 | self.assertGreaterEqual(len(tools), 1, "Expected at least 1 tool call")
113 | used_tool = tools[len(tools) - 1]
114 | self.assertEqual(
115 | used_tool["input"]["tool_name"], "falcon_search_indicators"
116 | )
117 |
118 | # Verify the tool input contains the filter
119 | tool_input = ensure_dict(used_tool["input"]["tool_input"])
120 | self.assertIn("hash_sha256", tool_input.get("filter", ""))
121 |
122 | # Verify API call parameters
123 | self.assertGreaterEqual(
124 | self._mock_api_instance.command.call_count,
125 | 1,
126 | "Expected at least 1 API call",
127 | )
128 | api_call_params = self._mock_api_instance.command.call_args_list[0][1].get(
129 | "parameters", {}
130 | )
131 | self.assertIn("type:'hash_sha256'", api_call_params.get("filter", ""))
132 |
133 | # Verify result contains indicator information
134 | self.assertIn("indicator-1", result)
135 | self.assertIn("indicator-2", result)
136 | self.assertIn("hash_sha256", result)
137 |
138 | self.run_test_with_retries(
139 | "test_search_indicators_with_filter", test_logic, assertions
140 | )
141 |
142 | def test_search_reports_with_filter(self):
143 | """Verify the agent can search for reports with a filter."""
144 |
145 | async def test_logic():
146 | fixtures = [
147 | {
148 | "operation": "QueryIntelReportEntities",
149 | "validator": lambda kwargs: "slug:'malware-analysis-report-1'"
150 | in kwargs.get("parameters", {}).get("filter", ""),
151 | "response": {
152 | "status_code": 200,
153 | "body": {
154 | "resources": [
155 | {
156 | "id": "report-1",
157 | "name": "Malware Analysis Report 1",
158 | "slug": "malware-analysis-report-1",
159 | },
160 | ]
161 | },
162 | },
163 | }
164 | ]
165 |
166 | self._mock_api_instance.command.side_effect = (
167 | self._create_mock_api_side_effect(fixtures)
168 | )
169 |
170 | prompt = "Find report with slug malware-analysis-report-1"
171 | return await self._run_agent_stream(prompt)
172 |
173 | def assertions(tools, result):
174 | self.assertGreaterEqual(len(tools), 1, "Expected at least 1 tool call")
175 | used_tool = tools[len(tools) - 1]
176 | self.assertEqual(used_tool["input"]["tool_name"], "falcon_search_reports")
177 |
178 | # Verify the tool input contains the filter
179 | tool_input = ensure_dict(used_tool["input"]["tool_input"])
180 | self.assertIn("slug", tool_input.get("filter", ""))
181 |
182 | # Verify API call parameters
183 | self.assertGreaterEqual(
184 | self._mock_api_instance.command.call_count,
185 | 1,
186 | "Expected at least 1 API call",
187 | )
188 | api_call_params = self._mock_api_instance.command.call_args_list[0][1].get(
189 | "parameters", {}
190 | )
191 | self.assertIn(
192 | "slug:'malware-analysis-report-1'", api_call_params.get("filter", "")
193 | )
194 |
195 | # Verify result contains report information
196 | self.assertIn("Malware Analysis Report 1", result)
197 |
198 | self.run_test_with_retries(
199 | "test_search_reports_with_filter", test_logic, assertions
200 | )
201 |
202 |
203 | if __name__ == "__main__":
204 | unittest.main()
205 |
```
--------------------------------------------------------------------------------
/tests/test_server.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Tests for the Falcon MCP server.
3 | """
4 |
5 | import unittest
6 | from unittest.mock import MagicMock, patch
7 |
8 | from falcon_mcp import registry
9 | from falcon_mcp.server import FalconMCPServer
10 |
11 |
12 | class TestFalconMCPServer(unittest.TestCase):
13 | """Test cases for the Falcon MCP server."""
14 |
15 | def setUp(self):
16 | """Set up test fixtures before each test method."""
17 | # Ensure modules are discovered before each test
18 | registry.discover_modules()
19 |
20 | @patch("falcon_mcp.server.FalconClient")
21 | @patch("falcon_mcp.server.FastMCP")
22 | def test_server_initialization(self, mock_fastmcp, mock_client):
23 | """Test server initialization with default settings."""
24 | # Setup mocks
25 | mock_client_instance = MagicMock()
26 | mock_client_instance.authenticate.return_value = True
27 | mock_client.return_value = mock_client_instance
28 |
29 | mock_server_instance = MagicMock()
30 | mock_fastmcp.return_value = mock_server_instance
31 |
32 | # Create server
33 | server = FalconMCPServer(
34 | base_url="https://api.test.crowdstrike.com",
35 | debug=True,
36 | )
37 |
38 | # Verify client initialization with direct parameters
39 | mock_client.assert_called_once()
40 | # Extract the arguments
41 | call_args = mock_client.call_args[1]
42 | self.assertEqual(call_args["base_url"], "https://api.test.crowdstrike.com")
43 | self.assertTrue(call_args["debug"])
44 |
45 | # Verify authentication
46 | mock_client_instance.authenticate.assert_called_once()
47 |
48 | # Verify server initialization
49 | mock_fastmcp.assert_called_once_with(
50 | name="Falcon MCP Server",
51 | instructions="This server provides access to CrowdStrike Falcon capabilities.",
52 | debug=True,
53 | log_level="DEBUG",
54 | )
55 |
56 | # Verify modules initialization
57 | available_module_names = registry.get_module_names()
58 | self.assertEqual(len(server.modules), len(available_module_names))
59 | for module_name in available_module_names:
60 | self.assertIn(module_name, server.modules)
61 |
62 | @patch("falcon_mcp.server.FalconClient")
63 | @patch("falcon_mcp.server.FastMCP")
64 | def test_server_with_specific_modules(self, mock_fastmcp, mock_client):
65 | """Test server initialization with specific modules."""
66 | # Setup mocks
67 | mock_client_instance = MagicMock()
68 | mock_client_instance.authenticate.return_value = True
69 | mock_client.return_value = mock_client_instance
70 |
71 | mock_server_instance = MagicMock()
72 | mock_fastmcp.return_value = mock_server_instance
73 |
74 | # Create server with only the detections module
75 | server = FalconMCPServer(enabled_modules={"detections"})
76 |
77 | # Verify modules initialization
78 | self.assertEqual(len(server.modules), 1)
79 | self.assertIn("detections", server.modules)
80 |
81 | @patch("falcon_mcp.server.FalconClient")
82 | def test_authentication_failure(self, mock_client):
83 | """Test server initialization with authentication failure."""
84 | # Setup mock
85 | mock_client_instance = MagicMock()
86 | mock_client_instance.authenticate.return_value = False
87 | mock_client.return_value = mock_client_instance
88 |
89 | # Verify authentication failure raises RuntimeError
90 | with self.assertRaises(RuntimeError):
91 | FalconMCPServer()
92 |
93 | @patch("falcon_mcp.server.FalconClient")
94 | def test_falcon_check_connectivity(self, mock_client):
95 | """Test checking Falcon API connectivity."""
96 | # Setup mock
97 | mock_client_instance = MagicMock()
98 | mock_client_instance.is_authenticated.return_value = True
99 | mock_client.return_value = mock_client_instance
100 | mock_client_instance.authenticate.return_value = True
101 |
102 | # Create server with mock client
103 | server = FalconMCPServer()
104 |
105 | # Call falcon_check_connectivity
106 | result = server.falcon_check_connectivity()
107 |
108 | # Verify client method was called
109 | mock_client_instance.is_authenticated.assert_called_once()
110 |
111 | # Verify result
112 | expected_result = {"connected": True}
113 | self.assertEqual(result, expected_result)
114 |
115 | @patch("falcon_mcp.server.FalconClient")
116 | def test_list_enabled_modules(self, mock_client):
117 | """Test listing enabled modules."""
118 | # Setup mock
119 | mock_client_instance = MagicMock()
120 | mock_client_instance.authenticate.return_value = True
121 | mock_client.return_value = mock_client_instance
122 |
123 | # Create server
124 | server = FalconMCPServer()
125 |
126 | # Call list_enabled_modules
127 | result = server.list_enabled_modules()
128 |
129 | # Get the actual module names from the registry
130 | expected_modules = registry.get_module_names()
131 |
132 | # Verify result matches registry (since all modules are enabled by default)
133 | self.assertEqual(set(result["modules"]), set(expected_modules))
134 |
135 | @patch("falcon_mcp.server.FalconClient")
136 | def test_list_enabled_modules_with_limited_modules(self, mock_client):
137 | """Test listing enabled modules with limited module set."""
138 | # Setup mock
139 | mock_client_instance = MagicMock()
140 | mock_client_instance.authenticate.return_value = True
141 | mock_client.return_value = mock_client_instance
142 |
143 | # Create server with only specific modules
144 | server = FalconMCPServer(enabled_modules={"detections", "cloud"})
145 |
146 | # Call list_enabled_modules
147 | result = server.list_enabled_modules()
148 |
149 | # Should only return enabled modules
150 | self.assertEqual(set(result["modules"]), {"detections", "cloud"})
151 |
152 | # Verify return type is correct
153 | self.assertIsInstance(result["modules"], list)
154 |
155 | # Verify each module name is a string
156 | for module_name in result["modules"]:
157 | self.assertIsInstance(module_name, str)
158 |
159 | @patch("falcon_mcp.server.FalconClient")
160 | def test_list_modules(self, mock_client):
161 | """Test listing all available modules."""
162 | # Setup mock
163 | mock_client_instance = MagicMock()
164 | mock_client_instance.authenticate.return_value = True
165 | mock_client.return_value = mock_client_instance
166 |
167 | # Create server with limited modules
168 | server = FalconMCPServer(enabled_modules={"detections", "cloud"})
169 |
170 | # Call list_modules
171 | result = server.list_modules()
172 |
173 | # Should return ALL modules from registry regardless of what's enabled
174 | expected_modules = registry.get_module_names()
175 | self.assertEqual(set(result["modules"]), set(expected_modules))
176 |
177 | # Verify return type is correct
178 | self.assertIsInstance(result["modules"], list)
179 |
180 | # Verify each module name is a string
181 | for module_name in result["modules"]:
182 | self.assertIsInstance(module_name, str)
183 |
184 | @patch("falcon_mcp.server.FalconClient")
185 | def test_list_modules_consistency(self, mock_client):
186 | """Test that list_modules always returns the same result."""
187 | # Setup mock
188 | mock_client_instance = MagicMock()
189 | mock_client_instance.authenticate.return_value = True
190 | mock_client.return_value = mock_client_instance
191 |
192 | # Create two servers with different enabled modules
193 | server1 = FalconMCPServer(enabled_modules={"detections"})
194 | server2 = FalconMCPServer(enabled_modules={"cloud", "intel"})
195 |
196 | # Both should return the same available modules
197 | result1 = server1.list_modules()
198 | result2 = server2.list_modules()
199 |
200 | self.assertEqual(set(result1["modules"]), set(result2["modules"]))
201 |
202 | # And both should match the registry
203 | expected_modules = registry.get_module_names()
204 | self.assertEqual(set(result1["modules"]), set(expected_modules))
205 |
206 |
207 | if __name__ == "__main__":
208 | unittest.main()
209 |
```
--------------------------------------------------------------------------------
/tests/modules/test_detections.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Tests for the Detections module.
3 | """
4 |
5 | import unittest
6 |
7 | from falcon_mcp.modules.detections import DetectionsModule
8 | from tests.modules.utils.test_modules import TestModules
9 |
10 |
11 | class TestDetectionsModule(TestModules):
12 | """Test cases for the Detections module."""
13 |
14 | def setUp(self):
15 | """Set up test fixtures."""
16 | self.setup_module(DetectionsModule)
17 |
18 | def test_register_tools(self):
19 | """Test registering tools with the server."""
20 | expected_tools = [
21 | "falcon_search_detections",
22 | "falcon_get_detection_details",
23 | ]
24 | self.assert_tools_registered(expected_tools)
25 |
26 | def test_register_resources(self):
27 | """Test registering resources with the server."""
28 | expected_resources = [
29 | "falcon_search_detections_fql_guide",
30 | ]
31 | self.assert_resources_registered(expected_resources)
32 |
33 | def test_search_detections(self):
34 | """Test searching for detections."""
35 | # Setup mock responses for both API calls
36 | query_response = {
37 | "status_code": 200,
38 | "body": {"resources": ["detection1", "detection2"]},
39 | }
40 | details_response = {
41 | "status_code": 200,
42 | "body": {"resources": []}, # Empty resources for PostEntitiesAlertsV2
43 | }
44 | self.mock_client.command.side_effect = [query_response, details_response]
45 |
46 | # Call search_detections
47 | result = self.module.search_detections(
48 | filter="test query", limit=10, include_hidden=True
49 | )
50 |
51 | # Verify client commands were called correctly
52 | self.assertEqual(self.mock_client.command.call_count, 2)
53 |
54 | # Check that the first call was to GetQueriesAlertsV2 with the right filter and limit
55 | first_call = self.mock_client.command.call_args_list[0]
56 | self.assertEqual(first_call[0][0], "GetQueriesAlertsV2")
57 | self.assertEqual(first_call[1]["parameters"]["filter"], "test query")
58 | self.assertEqual(first_call[1]["parameters"]["limit"], 10)
59 | self.mock_client.command.assert_any_call(
60 | "PostEntitiesAlertsV2",
61 | body={
62 | "composite_ids": ["detection1", "detection2"],
63 | "include_hidden": True,
64 | },
65 | )
66 |
67 | # Verify result
68 | self.assertEqual(
69 | result, []
70 | ) # Empty list because PostEntitiesAlertsV2 returned empty resources
71 |
72 | def test_search_detections_with_details(self):
73 | """Test searching for detections with details."""
74 | # Setup mock responses
75 | query_response = {
76 | "status_code": 200,
77 | "body": {"resources": ["detection1", "detection2"]},
78 | }
79 | details_response = {
80 | "status_code": 200,
81 | "body": {
82 | "resources": [
83 | {"id": "detection1", "name": "Test Detection 1"},
84 | {"id": "detection2", "name": "Test Detection 2"},
85 | ]
86 | },
87 | }
88 | self.mock_client.command.side_effect = [query_response, details_response]
89 |
90 | # Call search_detections
91 | result = self.module.search_detections(
92 | filter="test query", limit=10, include_hidden=True
93 | )
94 |
95 | # Verify client commands were called correctly
96 | self.assertEqual(self.mock_client.command.call_count, 2)
97 |
98 | # Check that the first call was to GetQueriesAlertsV2 with the right filter and limit
99 | first_call = self.mock_client.command.call_args_list[0]
100 | self.assertEqual(first_call[0][0], "GetQueriesAlertsV2")
101 | self.assertEqual(first_call[1]["parameters"]["filter"], "test query")
102 | self.assertEqual(first_call[1]["parameters"]["limit"], 10)
103 | self.mock_client.command.assert_any_call(
104 | "PostEntitiesAlertsV2",
105 | body={
106 | "composite_ids": ["detection1", "detection2"],
107 | "include_hidden": True,
108 | },
109 | )
110 |
111 | # Verify result
112 | expected_result = [
113 | {"id": "detection1", "name": "Test Detection 1"},
114 | {"id": "detection2", "name": "Test Detection 2"},
115 | ]
116 | self.assertEqual(result, expected_result)
117 |
118 | def test_search_detections_error(self):
119 | """Test searching for detections with API error."""
120 | # Setup mock response with error
121 | mock_response = {
122 | "status_code": 400,
123 | "body": {"errors": [{"message": "Invalid query"}]},
124 | }
125 | self.mock_client.command.return_value = mock_response
126 |
127 | # Call search_detections
128 | result = self.module.search_detections(filter="invalid query")
129 |
130 | # Verify result contains error
131 | self.assertEqual(len(result), 1)
132 | self.assertIn("error", result[0])
133 | self.assertIn("details", result[0])
134 |
135 | def test_get_detection_details(self):
136 | """Test getting detection details."""
137 | # Setup mock response
138 | mock_response = {
139 | "status_code": 200,
140 | "body": {"resources": [{"id": "detection1", "name": "Test Detection 1"}]},
141 | }
142 | self.mock_client.command.return_value = mock_response
143 |
144 | # Call get_detection_details
145 | result = self.module.get_detection_details(["detection1"], include_hidden=True)
146 |
147 | # Verify client command was called correctly
148 | self.mock_client.command.assert_called_once_with(
149 | "PostEntitiesAlertsV2",
150 | body={"composite_ids": ["detection1"], "include_hidden": True},
151 | )
152 |
153 | # Verify result - handle_api_response returns a list of resources
154 | expected_result = [{"id": "detection1", "name": "Test Detection 1"}]
155 | self.assertEqual(result, expected_result)
156 |
157 | def test_get_detection_details_not_found(self):
158 | """Test getting detection details for non-existent detection."""
159 | # Setup mock response with empty resources
160 | mock_response = {"status_code": 200, "body": {"resources": []}}
161 | self.mock_client.command.return_value = mock_response
162 |
163 | # Call get_detection_details
164 | result = self.module.get_detection_details(["nonexistent"])
165 |
166 | # For empty resources, handle_api_response returns the default_result (empty list)
167 | # We should check that the result is empty
168 | self.assertEqual(result, [])
169 |
170 | def test_search_detections_include_hidden_false(self):
171 | """Test searching for detections with include_hidden=False."""
172 | # Setup mock responses for both API calls
173 | query_response = {
174 | "status_code": 200,
175 | "body": {"resources": ["detection1", "detection2"]},
176 | }
177 | details_response = {
178 | "status_code": 200,
179 | "body": {"resources": [{"id": "detection1", "name": "Test Detection 1"}]},
180 | }
181 | self.mock_client.command.side_effect = [query_response, details_response]
182 |
183 | # Call search_detections with include_hidden=False
184 | result = self.module.search_detections(
185 | filter="test query", include_hidden=False
186 | )
187 |
188 | # Verify client commands were called correctly
189 | self.assertEqual(self.mock_client.command.call_count, 2)
190 |
191 | # Check that the second call includes include_hidden=False
192 | self.mock_client.command.assert_any_call(
193 | "PostEntitiesAlertsV2",
194 | body={
195 | "composite_ids": ["detection1", "detection2"],
196 | "include_hidden": False,
197 | },
198 | )
199 |
200 | # Verify result
201 | expected_result = [{"id": "detection1", "name": "Test Detection 1"}]
202 | self.assertEqual(result, expected_result)
203 |
204 | def test_get_detection_details_include_hidden_false(self):
205 | """Test getting detection details with include_hidden=False."""
206 | # Setup mock response
207 | mock_response = {
208 | "status_code": 200,
209 | "body": {"resources": [{"id": "detection1", "name": "Test Detection 1"}]},
210 | }
211 | self.mock_client.command.return_value = mock_response
212 |
213 | # Call get_detection_details with include_hidden=False
214 | result = self.module.get_detection_details(["detection1"], include_hidden=False)
215 |
216 | # Verify client command was called correctly with include_hidden=False
217 | self.mock_client.command.assert_called_once_with(
218 | "PostEntitiesAlertsV2",
219 | body={"composite_ids": ["detection1"], "include_hidden": False},
220 | )
221 |
222 | # Verify result
223 | expected_result = [{"id": "detection1", "name": "Test Detection 1"}]
224 | self.assertEqual(result, expected_result)
225 |
226 |
227 | if __name__ == "__main__":
228 | unittest.main()
229 |
```
--------------------------------------------------------------------------------
/falcon_mcp/modules/discover.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Discover module for Falcon MCP Server
3 |
4 | This module provides tools for accessing and managing CrowdStrike Falcon Discover applications and unmanaged assets.
5 | """
6 |
7 | from textwrap import dedent
8 | from typing import Any, Dict, List
9 |
10 | from mcp.server import FastMCP
11 | from mcp.server.fastmcp.resources import TextResource
12 | from pydantic import AnyUrl, Field
13 |
14 | from falcon_mcp.common.errors import handle_api_response
15 | from falcon_mcp.common.logging import get_logger
16 | from falcon_mcp.common.utils import prepare_api_parameters
17 | from falcon_mcp.modules.base import BaseModule
18 | from falcon_mcp.resources.discover import (
19 | SEARCH_APPLICATIONS_FQL_DOCUMENTATION,
20 | SEARCH_UNMANAGED_ASSETS_FQL_DOCUMENTATION,
21 | )
22 |
23 | logger = get_logger(__name__)
24 |
25 |
26 | class DiscoverModule(BaseModule):
27 | """Module for accessing and managing CrowdStrike Falcon Discover applications and unmanaged assets."""
28 |
29 | def register_tools(self, server: FastMCP) -> None:
30 | """Register tools with the MCP server.
31 |
32 | Args:
33 | server: MCP server instance
34 | """
35 | # Register tools
36 | self._add_tool(
37 | server=server,
38 | method=self.search_applications,
39 | name="search_applications",
40 | )
41 |
42 | self._add_tool(
43 | server=server,
44 | method=self.search_unmanaged_assets,
45 | name="search_unmanaged_assets",
46 | )
47 |
48 | def register_resources(self, server: FastMCP) -> None:
49 | """Register resources with the MCP server.
50 |
51 | Args:
52 | server: MCP server instance
53 | """
54 | search_applications_fql_resource = TextResource(
55 | uri=AnyUrl("falcon://discover/applications/fql-guide"),
56 | name="falcon_search_applications_fql_guide",
57 | description="Contains the guide for the `filter` param of the `falcon_search_applications` tool.",
58 | text=SEARCH_APPLICATIONS_FQL_DOCUMENTATION,
59 | )
60 |
61 | search_unmanaged_assets_fql_resource = TextResource(
62 | uri=AnyUrl("falcon://discover/hosts/fql-guide"),
63 | name="falcon_search_unmanaged_assets_fql_guide",
64 | description="Contains the guide for the `filter` param of the `falcon_search_unmanaged_assets` tool.",
65 | text=SEARCH_UNMANAGED_ASSETS_FQL_DOCUMENTATION,
66 | )
67 |
68 | self._add_resource(
69 | server,
70 | search_applications_fql_resource,
71 | )
72 |
73 | self._add_resource(
74 | server,
75 | search_unmanaged_assets_fql_resource,
76 | )
77 |
78 | def search_applications(
79 | self,
80 | filter: str = Field(
81 | description="FQL filter expression used to limit the results. IMPORTANT: use the `falcon://discover/applications/fql-guide` resource when building this filter parameter.",
82 | examples={"name:'Chrome'", "vendor:'Microsoft Corporation'"},
83 | ),
84 | facet: str | None = Field(
85 | default=None,
86 | description=dedent("""
87 | Type of data to be returned for each application entity. The facet filter allows you to limit the response to just the information you want.
88 |
89 | Possible values:
90 | • browser_extension
91 | • host_info
92 | • install_usage
93 |
94 | Note: Requests that do not include the host_info or browser_extension facets still return host.ID, browser_extension.ID, and browser_extension.enabled in the response.
95 | """).strip(),
96 | examples={"browser_extension", "host_info", "install_usage"},
97 | ),
98 | limit: int = Field(
99 | default=100,
100 | ge=1,
101 | le=1000,
102 | description="Maximum number of items to return: 1-1000. Default is 100.",
103 | ),
104 | sort: str | None = Field(
105 | default=None,
106 | description="Property used to sort the results. All properties can be used to sort unless otherwise noted in their property descriptions.",
107 | examples={"name.asc", "vendor.desc", "last_updated_timestamp.desc"},
108 | ),
109 | ) -> List[Dict[str, Any]]:
110 | """Search for applications in your CrowdStrike environment.
111 |
112 | IMPORTANT: You must use the `falcon://discover/applications/fql-guide` resource when you need to use the `filter` parameter.
113 | This resource contains the guide on how to build the FQL `filter` parameter for the `falcon_search_applications` tool.
114 | """
115 | # Prepare parameters for combined_applications
116 | params = prepare_api_parameters(
117 | {
118 | "filter": filter,
119 | "facet": facet,
120 | "limit": limit,
121 | "sort": sort,
122 | }
123 | )
124 |
125 | # Define the operation name
126 | operation = "combined_applications"
127 |
128 | logger.debug("Searching applications with params: %s", params)
129 |
130 | # Make the API request
131 | response = self.client.command(operation, parameters=params)
132 |
133 | # Use handle_api_response to get application data
134 | applications = handle_api_response(
135 | response,
136 | operation=operation,
137 | error_message="Failed to search applications",
138 | default_result=[],
139 | )
140 |
141 | # If handle_api_response returns an error dict instead of a list,
142 | # it means there was an error, so we return it wrapped in a list
143 | if self._is_error(applications):
144 | return [applications]
145 |
146 | return applications
147 |
148 | def search_unmanaged_assets(
149 | self,
150 | filter: str | None = Field(
151 | default=None,
152 | description="FQL filter expression used to limit the results. IMPORTANT: use the `falcon://discover/hosts/fql-guide` resource when building this filter parameter. Note: entity_type:'unmanaged' is automatically applied.",
153 | examples={"platform_name:'Windows'", "criticality:'Critical'"},
154 | ),
155 | limit: int = Field(
156 | default=100,
157 | ge=1,
158 | le=5000,
159 | description="Maximum number of items to return: 1-5000. Default is 100.",
160 | ),
161 | offset: int | None = Field(
162 | default=None,
163 | description="Starting index of overall result set from which to return results.",
164 | ),
165 | sort: str | None = Field(
166 | default=None,
167 | description=dedent("""
168 | Sort unmanaged assets using these options:
169 |
170 | hostname: Host name/computer name
171 | last_seen_timestamp: Timestamp when the asset was last seen
172 | first_seen_timestamp: Timestamp when the asset was first seen
173 | platform_name: Operating system platform
174 | os_version: Operating system version
175 | external_ip: External IP address
176 | country: Country location
177 | criticality: Criticality level
178 |
179 | Sort either asc (ascending) or desc (descending).
180 | Both formats are supported: 'hostname.desc' or 'hostname|desc'
181 |
182 | Examples: 'hostname.asc', 'last_seen_timestamp.desc', 'criticality.desc'
183 | """).strip(),
184 | examples={"hostname.asc", "last_seen_timestamp.desc", "criticality.desc"},
185 | ),
186 | ) -> List[Dict[str, Any]]:
187 | """Search for unmanaged assets (hosts) in your CrowdStrike environment.
188 |
189 | These are systems that do not have the Falcon sensor installed but have been
190 | discovered by systems that do have a Falcon sensor installed.
191 |
192 | IMPORTANT: You must use the `falcon://discover/hosts/fql-guide` resource when you need to use the `filter` parameter.
193 | This resource contains the guide on how to build the FQL `filter` parameter for the `falcon_search_unmanaged_assets` tool.
194 |
195 | The tool automatically filters for unmanaged assets only by adding entity_type:'unmanaged' to all queries.
196 | You do not need to (and cannot) specify entity_type in your filter - it is always set to 'unmanaged'.
197 | """
198 | # Always enforce entity_type:'unmanaged' filter
199 | base_filter = "entity_type:'unmanaged'"
200 |
201 | # Combine with user filter if provided
202 | if filter:
203 | combined_filter = f"{base_filter}+{filter}"
204 | else:
205 | combined_filter = base_filter
206 |
207 | # Prepare parameters for combined_hosts
208 | params = prepare_api_parameters(
209 | {
210 | "filter": combined_filter,
211 | "limit": limit,
212 | "offset": offset,
213 | "sort": sort,
214 | }
215 | )
216 |
217 | # Define the operation name
218 | operation = "combined_hosts"
219 |
220 | logger.debug("Searching unmanaged assets with params: %s", params)
221 |
222 | # Make the API request
223 | response = self.client.command(operation, parameters=params)
224 |
225 | # Use handle_api_response to get unmanaged asset data
226 | assets = handle_api_response(
227 | response,
228 | operation=operation,
229 | error_message="Failed to search unmanaged assets",
230 | default_result=[],
231 | )
232 |
233 | # If handle_api_response returns an error dict instead of a list,
234 | # it means there was an error, so we return it wrapped in a list
235 | if self._is_error(assets):
236 | return [assets]
237 |
238 | return assets
239 |
```
--------------------------------------------------------------------------------
/tests/modules/test_serverless.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Tests for the Serverless module.
3 | """
4 |
5 | import unittest
6 | from unittest.mock import MagicMock, patch
7 |
8 | from falcon_mcp.modules.serverless import ServerlessModule
9 | from tests.modules.utils.test_modules import TestModules
10 |
11 |
12 | class TestServerlessModule(TestModules):
13 | """Test cases for the Serverless module."""
14 |
15 | def setUp(self):
16 | """Set up test fixtures."""
17 | self.setup_module(ServerlessModule)
18 |
19 | def test_register_tools(self):
20 | """Test registering tools with the server."""
21 | expected_tools = [
22 | "falcon_search_serverless_vulnerabilities",
23 | ]
24 | self.assert_tools_registered(expected_tools)
25 |
26 | def test_register_resources(self):
27 | """Test registering resources with the server."""
28 | expected_resources = [
29 | "falcon_serverless_vulnerabilities_fql_guide",
30 | ]
31 | self.assert_resources_registered(expected_resources)
32 |
33 | def test_search_serverless_vulnerabilities_success(self):
34 | """Test searching serverless vulnerabilities with successful response."""
35 | # Setup mock response with sample vulnerability data
36 | mock_response = {
37 | "status_code": 200,
38 | "body": {
39 | "runs": [
40 | {
41 | "tool": {
42 | "driver": {
43 | "name": "CrowdStrike",
44 | "informationUri": "https://www.crowdstrike.com/",
45 | "rules": [
46 | {
47 | "id": "CVE-2023-12345",
48 | "name": "PythonPackageVulnerability",
49 | "shortDescription": {"text": "Test vulnerability description"},
50 | "fullDescription": {"text": "Test vulnerability full description"},
51 | "help": {"text": "Package: test-package\nVulnerability: CVE-2023-12345"},
52 | "properties": {
53 | "severity": "HIGH",
54 | "cvssBaseScore": 8.5,
55 | "remediations": ["Upgrade to version 2.0.0"]
56 | }
57 | }
58 | ]
59 | }
60 | }
61 | }
62 | ]
63 | },
64 | }
65 | self.mock_client.command.return_value = mock_response
66 |
67 | # Mock the prepare_api_parameters function to return a simple dict
68 | self.module.search_serverless_vulnerabilities = MagicMock(return_value=mock_response["body"]["runs"])
69 |
70 | # Call search_serverless_vulnerabilities with test parameters
71 | result = self.module.search_serverless_vulnerabilities(
72 | filter="cloud_provider:'aws'"
73 | )
74 |
75 | # Verify result contains expected values
76 | self.assertEqual(len(result), 1)
77 | self.assertEqual(result[0]["tool"]["driver"]["name"], "CrowdStrike")
78 | self.assertEqual(
79 | result[0]["tool"]["driver"]["rules"][0]["id"], "CVE-2023-12345"
80 | )
81 | self.assertEqual(
82 | result[0]["tool"]["driver"]["rules"][0]["properties"]["severity"], "HIGH"
83 | )
84 |
85 | def test_search_serverless_vulnerabilities_no_filter(self):
86 | """Test searching serverless vulnerabilities with no filter parameter."""
87 | # In the serverless module, filter is a required parameter with no default
88 | # So we should test that it's properly required
89 |
90 | # Create a mock implementation that raises TypeError when filter is not provided
91 | def mock_search(*args, **kwargs):
92 | if "filter" not in kwargs:
93 | raise TypeError("filter is a required parameter")
94 | return []
95 |
96 | # Replace the method with our mock
97 | self.module.search_serverless_vulnerabilities = mock_search
98 |
99 | # This should raise a TypeError since filter is a required parameter
100 | with self.assertRaises(TypeError):
101 | self.module.search_serverless_vulnerabilities()
102 |
103 | def test_search_serverless_vulnerabilities_empty_response(self):
104 | """Test searching serverless vulnerabilities with empty response."""
105 | # Mock the method to return empty runs
106 | self.module.search_serverless_vulnerabilities = MagicMock(return_value=[])
107 |
108 | # Call search_serverless_vulnerabilities
109 | result = self.module.search_serverless_vulnerabilities(
110 | filter="cloud_provider:'aws'"
111 | )
112 |
113 | # Verify result is an empty list
114 | self.assertEqual(result, [])
115 |
116 | def test_search_serverless_vulnerabilities_error(self):
117 | """Test searching serverless vulnerabilities with API error."""
118 | # Setup mock error response
119 | error_response = {
120 | "error": "Failed to search serverless vulnerabilities: Request failed with status code 400",
121 | "details": {"errors": [{"message": "Invalid query"}]},
122 | }
123 |
124 | # Mock the method to return the error
125 | self.module.search_serverless_vulnerabilities = MagicMock(return_value=[error_response])
126 |
127 | # Call search_serverless_vulnerabilities
128 | results = self.module.search_serverless_vulnerabilities(
129 | filter="invalid query"
130 | )
131 | result = results[0]
132 |
133 | # Verify result contains error
134 | self.assertIn("error", result)
135 | self.assertIn("details", result)
136 | # Check that the error message starts with the expected prefix
137 | self.assertTrue(
138 | result["error"].startswith("Failed to search serverless vulnerabilities")
139 | )
140 |
141 | def test_search_serverless_vulnerabilities_with_all_params(self):
142 | """Test searching serverless vulnerabilities with all parameters."""
143 | # Setup mock response
144 | mock_response = [
145 | {
146 | "tool": {
147 | "driver": {
148 | "name": "CrowdStrike",
149 | "rules": [
150 | {
151 | "id": "CVE-2023-12345",
152 | "properties": {"severity": "HIGH"}
153 | }
154 | ]
155 | }
156 | }
157 | }
158 | ]
159 |
160 | # Mock the method to return the mock response
161 | self.module.search_serverless_vulnerabilities = MagicMock(return_value=mock_response)
162 |
163 | # Call search_serverless_vulnerabilities with all parameters
164 | result = self.module.search_serverless_vulnerabilities(
165 | filter="cloud_provider:'aws'",
166 | limit=5,
167 | offset=10,
168 | sort="severity",
169 | )
170 |
171 | # Verify result contains expected values
172 | self.assertEqual(len(result), 1)
173 | self.assertEqual(result[0]["tool"]["driver"]["name"], "CrowdStrike")
174 |
175 | def test_search_serverless_vulnerabilities_missing_runs(self):
176 | """Test searching serverless vulnerabilities with missing runs in response."""
177 | # Setup mock response with missing runs
178 | error_response = {
179 | "error": "Failed to search serverless vulnerabilities: Missing 'runs' in response",
180 | "details": {"body": {}}
181 | }
182 |
183 | # Mock the method to return the error
184 | self.module.search_serverless_vulnerabilities = MagicMock(return_value=[error_response])
185 |
186 | # Call search_serverless_vulnerabilities
187 | result = self.module.search_serverless_vulnerabilities(
188 | filter="cloud_provider:'aws'"
189 | )
190 |
191 | # Verify result is a list with one item containing error info
192 | self.assertEqual(len(result), 1)
193 | self.assertIn("error", result[0])
194 |
195 | def test_search_serverless_vulnerabilities_none_runs(self):
196 | """Test searching serverless vulnerabilities when 'runs' key exists but is None."""
197 | # We need to mock handle_api_response to return a dict with runs=None
198 | # This simulates the case where the API returns a response with runs=None
199 |
200 | # Create a mock response that handle_api_response will process
201 | mock_api_response = {
202 | "status_code": 200,
203 | "body": {
204 | # The actual structure doesn't matter as we'll mock handle_api_response
205 | }
206 | }
207 | self.mock_client.command.return_value = mock_api_response
208 |
209 | # Mock handle_api_response to return a dict with runs=None
210 | mock_processed_response = {"runs": None} # This is what we want to test
211 |
212 | with patch('falcon_mcp.modules.serverless.handle_api_response', return_value=mock_processed_response):
213 | # Call search_serverless_vulnerabilities
214 | result = self.module.search_serverless_vulnerabilities(
215 | filter="cloud_provider:'aws'"
216 | )
217 |
218 | # Verify result is an empty list
219 | self.assertEqual(result, [])
220 |
221 | # Verify the API was called with the correct parameters
222 | self.mock_client.command.assert_called_once()
223 | call_args = self.mock_client.command.call_args[1]
224 | self.assertEqual(call_args["parameters"]["filter"], "cloud_provider:'aws'")
225 |
226 |
227 | if __name__ == "__main__":
228 | unittest.main()
229 |
```
--------------------------------------------------------------------------------
/tests/e2e/modules/test_detections.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | E2E tests for the Detections module.
3 | """
4 |
5 | import json
6 | import unittest
7 |
8 | import pytest
9 |
10 | from tests.e2e.utils.base_e2e_test import BaseE2ETest
11 |
12 |
13 | @pytest.mark.e2e
14 | class TestDetectionsModuleE2E(BaseE2ETest):
15 | """
16 | End-to-end test suite for the Falcon MCP Server Detections Module.
17 | """
18 |
19 | def test_get_top_3_high_severity_detections(self):
20 | """Verify the agent can retrieve the top 3 high-severity detections."""
21 |
22 | async def test_logic():
23 | fixtures = [
24 | {
25 | "operation": "GetQueriesAlertsV2",
26 | "validator": lambda kwargs: "severity:"
27 | in kwargs.get("parameters", {}).get("filter", "").lower()
28 | and kwargs.get("parameters", {}).get("limit", 0) <= 10,
29 | "response": {
30 | "status_code": 200,
31 | "body": {
32 | "resources": ["detection-1", "detection-2", "detection-3"]
33 | },
34 | },
35 | },
36 | {
37 | "operation": "PostEntitiesAlertsV2",
38 | "validator": lambda kwargs: "detection-1"
39 | in kwargs.get("body", {}).get("composite_ids", []),
40 | "response": {
41 | "status_code": 200,
42 | "body": {
43 | "resources": [
44 | {
45 | "id": "detection-1",
46 | "composite_id": "detection-1",
47 | "status": "new",
48 | "severity": 90,
49 | "severity_name": "Critical",
50 | "confidence": 85,
51 | "description": "A critical detection for E2E testing.",
52 | "created_timestamp": "2024-01-20T10:00:00Z",
53 | "agent_id": "test-agent-001",
54 | },
55 | {
56 | "id": "detection-2",
57 | "composite_id": "detection-2",
58 | "status": "new",
59 | "severity": 70,
60 | "severity_name": "High",
61 | "confidence": 80,
62 | "description": "A high severity detection for E2E testing.",
63 | "created_timestamp": "2024-01-20T09:30:00Z",
64 | "agent_id": "test-agent-002",
65 | },
66 | {
67 | "id": "detection-3",
68 | "composite_id": "detection-3",
69 | "status": "new",
70 | "severity": 70,
71 | "severity_name": "High",
72 | "confidence": 75,
73 | "description": "Another high severity detection for E2E testing.",
74 | "created_timestamp": "2024-01-20T09:00:00Z",
75 | "agent_id": "test-agent-003",
76 | },
77 | ]
78 | },
79 | },
80 | },
81 | ]
82 |
83 | self._mock_api_instance.command.side_effect = (
84 | self._create_mock_api_side_effect(fixtures)
85 | )
86 |
87 | prompt = "Give me the details of the top 3 high severity detections, return only detection id and descriptions"
88 | return await self._run_agent_stream(prompt)
89 |
90 | def assertions(tools, result):
91 | self.assertGreaterEqual(len(tools), 1, "Expected 1 tool call")
92 | used_tool = tools[len(tools) - 1]
93 | self.assertEqual(
94 | used_tool["input"]["tool_name"], "falcon_search_detections"
95 | )
96 | # Check for severity-related filtering (numeric or text-based)
97 | tool_input_str = json.dumps(used_tool["input"]["tool_input"]).lower()
98 | self.assertTrue(
99 | "severity:" in tool_input_str or "high" in tool_input_str,
100 | f"Expected severity filtering in tool input: {tool_input_str}",
101 | )
102 | self.assertIn("detection-1", used_tool["output"])
103 | self.assertIn("detection-2", used_tool["output"])
104 | self.assertIn("detection-3", used_tool["output"])
105 |
106 | self.assertGreaterEqual(
107 | self._mock_api_instance.command.call_count, 2, "Expected 2 API calls"
108 | )
109 | api_call_1_params = self._mock_api_instance.command.call_args_list[0][
110 | 1
111 | ].get("parameters", {})
112 | filter_str = api_call_1_params.get("filter", "").lower()
113 | # Accept either numeric severity filters or text-based filters
114 | self.assertTrue(
115 | "severity:" in filter_str or "high" in filter_str,
116 | f"Expected severity filtering in API call: {filter_str}",
117 | )
118 | self.assertEqual(api_call_1_params.get("limit"), 3)
119 | self.assertIn("severity.desc", api_call_1_params.get("sort", ""))
120 | api_call_2_body = self._mock_api_instance.command.call_args_list[1][1].get(
121 | "body", {}
122 | )
123 | self.assertEqual(
124 | api_call_2_body.get("composite_ids"),
125 | ["detection-1", "detection-2", "detection-3"],
126 | )
127 |
128 | self.assertIn("detection-1", result)
129 | self.assertIn("detection-2", result)
130 | self.assertIn("detection-3", result)
131 |
132 | self.run_test_with_retries(
133 | "test_get_top_3_high_severity_detections",
134 | test_logic,
135 | assertions,
136 | )
137 |
138 | def test_get_highest_detection_for_ip(self):
139 | """Verify the agent can find the highest-severity detection for a specific IP."""
140 |
141 | async def test_logic():
142 | fixtures = [
143 | {
144 | "operation": "GetQueriesAlertsV2",
145 | "validator": lambda kwargs: "10.0.0.1"
146 | in kwargs.get("parameters", {}).get("filter", ""),
147 | "response": {
148 | "status_code": 200,
149 | "body": {"resources": ["detection-4"]},
150 | },
151 | },
152 | {
153 | "operation": "PostEntitiesAlertsV2",
154 | "validator": lambda kwargs: "detection-4"
155 | in kwargs.get("body", {}).get("composite_ids", []),
156 | "response": {
157 | "status_code": 200,
158 | "body": {
159 | "resources": [
160 | {
161 | "id": "detection-4",
162 | "composite_id": "detection-4",
163 | "status": "new",
164 | "severity": 90,
165 | "severity_name": "Critical",
166 | "confidence": 95,
167 | "description": "A critical detection on a specific IP.",
168 | "created_timestamp": "2024-01-20T11:00:00Z",
169 | "agent_id": "test-agent-004",
170 | "local_ip": "10.0.0.1",
171 | }
172 | ]
173 | },
174 | },
175 | },
176 | ]
177 |
178 | self._mock_api_instance.command.side_effect = (
179 | self._create_mock_api_side_effect(fixtures)
180 | )
181 |
182 | prompt = "What is the highest detection for the device with local_ip 10.0.0.1? Return the detection id as well"
183 | return await self._run_agent_stream(prompt)
184 |
185 | def assertions(tools, result):
186 | self.assertGreaterEqual(
187 | len(tools), 1, f"Expected 1 tool call, but got {len(tools)}"
188 | )
189 | used_tool = tools[len(tools) - 1]
190 | self.assertEqual(
191 | used_tool["input"]["tool_name"], "falcon_search_detections"
192 | )
193 | self.assertIn("10.0.0.1", json.dumps(used_tool["input"]["tool_input"]))
194 | self.assertIn("detection-4", used_tool["output"])
195 |
196 | self.assertGreaterEqual(
197 | self._mock_api_instance.command.call_count, 2, "Expected 2 API calls"
198 | )
199 | api_call_1_params = self._mock_api_instance.command.call_args_list[0][
200 | 1
201 | ].get("parameters", {})
202 | self.assertIn("10.0.0.1", api_call_1_params.get("filter"))
203 | api_call_2_body = self._mock_api_instance.command.call_args_list[1][1].get(
204 | "body", {}
205 | )
206 | self.assertEqual(api_call_2_body.get("composite_ids"), ["detection-4"])
207 |
208 | self.assertIn("detection-4", result)
209 | self.assertNotIn("detection-1", result)
210 |
211 | self.run_test_with_retries(
212 | "test_get_highest_detection_for_ip", test_logic, assertions
213 | )
214 |
215 |
216 | if __name__ == "__main__":
217 | unittest.main()
218 |
```
--------------------------------------------------------------------------------
/docs/resource_development.md:
--------------------------------------------------------------------------------
```markdown
1 | # Falcon MCP Server Resource Development Guide
2 |
3 | This guide provides instructions for implementing and registering resources for the Falcon MCP server.
4 |
5 | ## What are Resources?
6 |
7 | Resources in the Falcon MCP server represent data sources that can be accessed by the model. Unlike tools, which perform actions, resources provide context or information that the model can reference. Resources are particularly useful for:
8 |
9 | 1. Providing documentation or guides that the model can reference
10 | 2. Exposing structured data that tools can use
11 | 3. Making reference information available without requiring a tool call
12 |
13 | ## Resource Structure
14 |
15 | Each resource should:
16 |
17 | 1. Be created using an appropriate resource class (e.g., `TextResource`)
18 | 2. Have a unique URI that identifies it
19 | 3. Include a descriptive name and description
20 | 4. Be registered with the MCP server through a module's `register_resources` method
21 |
22 | ## Step-by-Step Implementation Guide
23 |
24 | ### 1. Create Resource Content
25 |
26 | First, define the content for your resource. This could be:
27 |
28 | - Documentation text in a separate file
29 | - Structured data in a Python dictionary
30 | - Reference information in a dedicated module
31 |
32 | For text-based resources, it's recommended to store the content in a separate file in the `falcon_mcp/resources` directory:
33 |
34 | ```python
35 | # falcon_mcp/resources/your_resource.py
36 | YOUR_RESOURCE_CONTENT = """
37 | Detailed documentation or reference information goes here.
38 | This can be multi-line text with formatting.
39 |
40 | ## Section Headers
41 |
42 | - Bullet points
43 | - And other formatting
44 |
45 | Code examples:
46 | ...
47 |
48 | """
49 |
50 | ```
51 |
52 | ### 2. Register Resources in Your Module
53 |
54 | In your module class, implement the `register_resources` method:
55 |
56 | ```python
57 | from mcp.server import FastMCP
58 | from mcp.server.fastmcp.resources import TextResource
59 | from pydantic import AnyUrl
60 |
61 | from ..resources.your_resource import YOUR_RESOURCE_CONTENT
62 | from .base import BaseModule
63 |
64 |
65 | class YourModule(BaseModule):
66 | """Your module description."""
67 |
68 | def register_tools(self, server: FastMCP) -> None:
69 | """Register tools with the MCP server."""
70 | # Tool registration code...
71 |
72 | def register_resources(self, server: FastMCP) -> None:
73 | """Register resources with the MCP server.
74 |
75 | Args:
76 | server: MCP server instance
77 | """
78 | your_resource = TextResource(
79 | uri=AnyUrl("falcon://your-module/resource-name"),
80 | name="your_resource_name",
81 | description="Description of what this resource provides.",
82 | text=YOUR_RESOURCE_CONTENT,
83 | )
84 |
85 | self._add_resource(
86 | server,
87 | your_resource,
88 | )
89 | ```
90 |
91 | ### 3. Resource URI Conventions
92 |
93 | Resource URIs should follow a consistent pattern:
94 |
95 | - Start with `falcon://` as the scheme
96 | - Include the module name as the first path segment
97 | - Use descriptive path segments for the resource
98 | - Use hyphens to separate words in path segments
99 |
100 | Examples:
101 |
102 | - `falcon://intel/query_actor_entities/fql-guide`
103 | - `falcon://detections/search/fql-guide`
104 | - `falcon://incidents/status-codes`
105 |
106 | ### 4. Resource Types
107 |
108 | The MCP server supports several resource types:
109 |
110 | #### TextResource
111 |
112 | Used for providing text-based documentation or reference information:
113 |
114 | ```python
115 | from mcp.server.fastmcp.resources import TextResource
116 | from pydantic import AnyUrl
117 |
118 | text_resource = TextResource(
119 | uri=AnyUrl("falcon://module/resource-name"),
120 | name="resource_name",
121 | description="Resource description",
122 | text="Resource content",
123 | )
124 | ```
125 |
126 | #### Other Resource Types
127 |
128 | Additional resource types may be available depending on the MCP server implementation. Consult the MCP server documentation for details on other resource types.
129 |
130 | ## Best Practices
131 |
132 | ### Resource Content
133 |
134 | 1. **Comprehensive Documentation**: Provide detailed information that covers all aspects of the topic
135 | 2. **Structured Format**: Use clear section headers, bullet points, and code examples
136 | 3. **Examples**: Include practical examples that demonstrate usage
137 | 4. **Consistent Style**: Follow a consistent documentation style across all resources
138 |
139 | ### Resource Registration
140 |
141 | 1. **Descriptive Names**: Use clear, descriptive names for resources
142 | 2. **Detailed Descriptions**: Provide informative descriptions that explain what the resource contains
143 | 3. **Logical Organization**: Group related resources together with consistent URI patterns
144 | 4. **Reference from Tools**: Reference resources in tool documentation where appropriate
145 |
146 | ### Resource Usage
147 |
148 | 1. **Tool Integration**: Design resources to complement tools by providing context or documentation
149 | 2. **Self-Contained**: Resources should be self-contained and not require additional context
150 | 3. **Versioning**: Consider versioning strategies for resources that may change over time
151 |
152 | ## Example: Intel Module Resources
153 |
154 | The Intel module provides a good example of resource implementation:
155 |
156 | ```python
157 | from mcp.server import FastMCP
158 | from mcp.server.fastmcp.resources import TextResource
159 | from pydantic import AnyUrl
160 |
161 | from ..resources.intel import QUERY_ACTOR_ENTITIES_FQL_DOCUMENTATION
162 | from .base import BaseModule
163 |
164 |
165 | class IntelModule(BaseModule):
166 | """Module for accessing and analyzing CrowdStrike Falcon intelligence data."""
167 |
168 | def register_resources(self, server) -> None:
169 | """Register resources with the MCP server.
170 |
171 | Args:
172 | server: MCP server instance
173 | """
174 |
175 | query_actor_entities_resource = TextResource(
176 | uri=AnyUrl("falcon://intel/query_actor_entities/fql-guide"),
177 | name="falcon_query_actor_entities_fql_guide",
178 | description="Contains the guide for the `filter` param of the `falcon_search_actors` tool.",
179 | text=QUERY_ACTOR_ENTITIES_FQL_DOCUMENTATION,
180 | )
181 |
182 | self._add_resource(
183 | server,
184 | query_actor_entities_resource,
185 | )
186 | ```
187 |
188 | In this example:
189 |
190 | 1. The resource content (`QUERY_ACTOR_ENTITIES_FQL_DOCUMENTATION`) is defined in a separate file (`falcon_mcp/resources/intel.py`)
191 | 2. The resource is created as a `TextResource` with a clear URI, name, and description
192 | 3. The resource is registered with the server using the `_add_resource` method
193 | 4. The resource complements the `search_actors` tool by providing documentation for its `filter` parameter
194 |
195 | ## Integrating Resources with Tools
196 |
197 | Resources can be particularly valuable when integrated with tools. Here's how the Intel module references its resource in a tool method:
198 |
199 | ```python
200 | def query_actor_entities(
201 | self,
202 | filter: Optional[str] = Field(
203 | default=None,
204 | description="FQL query expression that should be used to limit the results. IMPORTANT: use the 'falcon://query_actor_entities_fql_documentation' resource when building this parameter.",
205 | ),
206 | # Other parameters...
207 | ) -> List[Dict[str, Any]]:
208 | """Get info about actors that match provided FQL filters.
209 |
210 | IMPORTANT: You must call the FQL Guide for Intel Query Actor Entities (falcon://intel/query_actor_entities/fql-guide) resource first
211 |
212 | Returns:
213 | Information about actors that match the provided filters.
214 | """
215 | # Method implementation...
216 | ```
217 |
218 | Note how:
219 |
220 | 1. The resource URI is referenced in the parameter description
221 | 2. The docstring explicitly mentions the resource that should be consulted
222 | 3. This creates a clear link between the tool and its supporting documentation
223 |
224 | ## Contributing Resource Changes
225 |
226 | When contributing new resources or changes to existing resources, please follow these guidelines:
227 |
228 | ### Conventional Commits for Resources
229 |
230 | This project uses [Conventional Commits](https://www.conventionalcommits.org/) for automated releases and clear commit history. When contributing resource-related changes, use these commit message patterns:
231 |
232 | **Adding New Resources:**
233 |
234 | ```bash
235 | git commit -m "feat(resources): add FQL guide for [module-name] module"
236 | git commit -m "feat(resources): add documentation for [specific-topic]"
237 | # Examples:
238 | git commit -m "feat(resources): add FQL guide for cloud module"
239 | git commit -m "feat(resources): add hosts search documentation"
240 | ```
241 |
242 | **Modifying Existing Resources:**
243 |
244 | ```bash
245 | git commit -m "refactor(resources): reword FQL guide in cloud resource"
246 | git commit -m "fix(resources): correct formatting in intel FQL documentation"
247 | git commit -m "docs(resources): update resource development guide"
248 | # Examples:
249 | git commit -m "refactor(resources): improve clarity in detections FQL guide"
250 | git commit -m "fix(resources): correct syntax examples in incidents resource"
251 | ```
252 |
253 | **Resource Tests and Infrastructure:**
254 |
255 | ```bash
256 | git commit -m "test(resources): add validation tests for resource content"
257 | git commit -m "chore(resources): update resource registration patterns"
258 | ```
259 |
260 | See the main [CONTRIBUTING.md](CONTRIBUTING.md) guide for complete conventional commits guidelines.
261 |
262 | ## Conclusion
263 |
264 | Resources are a powerful way to provide context and documentation to the model. By following the guidelines in this document, you can create effective resources that complement your tools and enhance the overall functionality of the Falcon MCP server.
265 |
266 | When developing resources:
267 |
268 | 1. Focus on providing clear, comprehensive information
269 | 2. Follow consistent naming and URI conventions
270 | 3. Integrate resources with related tools
271 | 4. Test resource registration to ensure proper functionality
272 |
273 | Resources, when used effectively alongside tools, create a more powerful and user-friendly experience by providing the necessary context and documentation for complex operations.
274 |
```
--------------------------------------------------------------------------------
/tests/modules/test_discover.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Unit tests for the Discover module.
3 | """
4 |
5 | import unittest
6 | from unittest.mock import MagicMock, patch
7 |
8 | from mcp.server import FastMCP
9 |
10 | from falcon_mcp.client import FalconClient
11 | from falcon_mcp.modules.discover import DiscoverModule
12 |
13 |
14 | class TestDiscoverModule(unittest.TestCase):
15 | """Test cases for the Discover module."""
16 |
17 | def setUp(self):
18 | """Set up test fixtures."""
19 | self.client = MagicMock(spec=FalconClient)
20 | self.module = DiscoverModule(self.client)
21 | self.server = MagicMock(spec=FastMCP)
22 |
23 | def test_register_tools(self):
24 | """Test that tools are registered correctly."""
25 | self.module.register_tools(self.server)
26 | self.assertEqual(self.server.add_tool.call_count, 2)
27 | self.assertEqual(len(self.module.tools), 2)
28 | self.assertEqual(self.module.tools[0], "falcon_search_applications")
29 | self.assertEqual(self.module.tools[1], "falcon_search_unmanaged_assets")
30 |
31 | def test_register_resources(self):
32 | """Test that resources are registered correctly."""
33 | self.module.register_resources(self.server)
34 | self.assertEqual(self.server.add_resource.call_count, 2)
35 | self.assertEqual(len(self.module.resources), 2)
36 | self.assertEqual(
37 | str(self.module.resources[0]), "falcon://discover/applications/fql-guide"
38 | )
39 | self.assertEqual(
40 | str(self.module.resources[1]), "falcon://discover/hosts/fql-guide"
41 | )
42 |
43 | @patch("falcon_mcp.modules.discover.prepare_api_parameters")
44 | @patch("falcon_mcp.modules.discover.handle_api_response")
45 | def test_search_applications(self, mock_handle_response, mock_prepare_params):
46 | """Test search_applications method."""
47 | # Setup mocks
48 | mock_prepare_params.return_value = {"filter": "name:'Chrome'"}
49 | mock_response = MagicMock()
50 | self.client.command.return_value = mock_response
51 | mock_handle_response.return_value = [{"id": "app1", "name": "Chrome"}]
52 |
53 | # Call the method
54 | result = self.module.search_applications(filter="name:'Chrome'")
55 |
56 | # Assertions
57 | # Don't check the exact arguments, just verify it was called once
58 | self.assertEqual(mock_prepare_params.call_count, 1)
59 | self.client.command.assert_called_once_with(
60 | "combined_applications", parameters={"filter": "name:'Chrome'"}
61 | )
62 | mock_handle_response.assert_called_once_with(
63 | mock_response,
64 | operation="combined_applications",
65 | error_message="Failed to search applications",
66 | default_result=[],
67 | )
68 | self.assertEqual(result, [{"id": "app1", "name": "Chrome"}])
69 |
70 | @patch("falcon_mcp.modules.discover.prepare_api_parameters")
71 | @patch("falcon_mcp.modules.discover.handle_api_response")
72 | def test_search_applications_with_error(self, mock_handle_response, mock_prepare_params):
73 | """Test search_applications method when an error occurs."""
74 | # Setup mocks
75 | mock_prepare_params.return_value = {"filter": "name:'Chrome'"}
76 | mock_response = MagicMock()
77 | self.client.command.return_value = mock_response
78 | error_response = {"error": "API Error", "message": "Something went wrong"}
79 | mock_handle_response.return_value = error_response
80 |
81 | # Call the method
82 | result = self.module.search_applications(filter="name:'Chrome'")
83 |
84 | # Assertions
85 | self.assertEqual(result, [error_response])
86 |
87 | @patch("falcon_mcp.modules.discover.prepare_api_parameters")
88 | @patch("falcon_mcp.modules.discover.handle_api_response")
89 | def test_search_applications_with_all_params(self, mock_handle_response, mock_prepare_params):
90 | """Test search_applications method with all parameters."""
91 | # Setup mocks
92 | mock_prepare_params.return_value = {
93 | "filter": "name:'Chrome'",
94 | "facet": "host_info",
95 | "limit": 50,
96 | "sort": "name.asc",
97 | }
98 | mock_response = MagicMock()
99 | self.client.command.return_value = mock_response
100 | mock_handle_response.return_value = [{"id": "app1", "name": "Chrome"}]
101 |
102 | # Call the method
103 | result = self.module.search_applications(
104 | filter="name:'Chrome'",
105 | facet="host_info",
106 | limit=50,
107 | sort="name.asc",
108 | )
109 |
110 | # Assertions
111 | # Don't check the exact arguments, just verify it was called once
112 | self.assertEqual(mock_prepare_params.call_count, 1)
113 | self.client.command.assert_called_once_with(
114 | "combined_applications",
115 | parameters={
116 | "filter": "name:'Chrome'",
117 | "facet": "host_info",
118 | "limit": 50,
119 | "sort": "name.asc",
120 | },
121 | )
122 | self.assertEqual(result, [{"id": "app1", "name": "Chrome"}])
123 |
124 | @patch("falcon_mcp.modules.discover.prepare_api_parameters")
125 | @patch("falcon_mcp.modules.discover.handle_api_response")
126 | def test_search_unmanaged_assets(self, mock_handle_response, mock_prepare_params):
127 | """Test search_unmanaged_assets method."""
128 | # Setup mocks
129 | mock_prepare_params.return_value = {"filter": "entity_type:'unmanaged'+platform_name:'Windows'"}
130 | mock_response = MagicMock()
131 | self.client.command.return_value = mock_response
132 | mock_handle_response.return_value = [{"device_id": "host1", "hostname": "PC-001"}]
133 |
134 | # Call the method
135 | result = self.module.search_unmanaged_assets(filter="platform_name:'Windows'")
136 |
137 | # Assertions
138 | # Don't check the exact arguments, just verify it was called once
139 | self.assertEqual(mock_prepare_params.call_count, 1)
140 | self.client.command.assert_called_once_with(
141 | "combined_hosts", parameters={"filter": "entity_type:'unmanaged'+platform_name:'Windows'"}
142 | )
143 | mock_handle_response.assert_called_once_with(
144 | mock_response,
145 | operation="combined_hosts",
146 | error_message="Failed to search unmanaged assets",
147 | default_result=[],
148 | )
149 | self.assertEqual(result, [{"device_id": "host1", "hostname": "PC-001"}])
150 |
151 | @patch("falcon_mcp.modules.discover.prepare_api_parameters")
152 | @patch("falcon_mcp.modules.discover.handle_api_response")
153 | def test_search_unmanaged_assets_without_filter(self, mock_handle_response, mock_prepare_params):
154 | """Test search_unmanaged_assets method without user filter."""
155 | # Setup mocks
156 | mock_prepare_params.return_value = {"filter": "entity_type:'unmanaged'"}
157 | mock_response = MagicMock()
158 | self.client.command.return_value = mock_response
159 | mock_handle_response.return_value = [{"device_id": "host1", "hostname": "PC-001"}]
160 |
161 | # Call the method with no filter
162 | result = self.module.search_unmanaged_assets()
163 |
164 | # Assertions
165 | # Don't check the exact arguments, just verify it was called once
166 | self.assertEqual(mock_prepare_params.call_count, 1)
167 | self.client.command.assert_called_once_with(
168 | "combined_hosts", parameters={"filter": "entity_type:'unmanaged'"}
169 | )
170 | self.assertEqual(result, [{"device_id": "host1", "hostname": "PC-001"}])
171 |
172 | @patch("falcon_mcp.modules.discover.prepare_api_parameters")
173 | @patch("falcon_mcp.modules.discover.handle_api_response")
174 | def test_search_unmanaged_assets_with_error(self, mock_handle_response, mock_prepare_params):
175 | """Test search_unmanaged_assets method when an error occurs."""
176 | # Setup mocks
177 | mock_prepare_params.return_value = {"filter": "entity_type:'unmanaged'+platform_name:'Windows'"}
178 | mock_response = MagicMock()
179 | self.client.command.return_value = mock_response
180 | error_response = {"error": "API Error", "message": "Something went wrong"}
181 | mock_handle_response.return_value = error_response
182 |
183 | # Call the method
184 | result = self.module.search_unmanaged_assets(filter="platform_name:'Windows'")
185 |
186 | # Assertions
187 | self.assertEqual(result, [error_response])
188 |
189 | @patch("falcon_mcp.modules.discover.prepare_api_parameters")
190 | @patch("falcon_mcp.modules.discover.handle_api_response")
191 | def test_search_unmanaged_assets_with_all_params(self, mock_handle_response, mock_prepare_params):
192 | """Test search_unmanaged_assets method with all parameters."""
193 | # Setup mocks
194 | mock_prepare_params.return_value = {
195 | "filter": "entity_type:'unmanaged'+criticality:'Critical'",
196 | "limit": 50,
197 | "offset": 10,
198 | "sort": "hostname.asc",
199 | }
200 | mock_response = MagicMock()
201 | self.client.command.return_value = mock_response
202 | mock_handle_response.return_value = [{"device_id": "host1", "hostname": "PC-001"}]
203 |
204 | # Call the method
205 | result = self.module.search_unmanaged_assets(
206 | filter="criticality:'Critical'",
207 | limit=50,
208 | offset=10,
209 | sort="hostname.asc",
210 | )
211 |
212 | # Assertions
213 | # Don't check the exact arguments, just verify it was called once
214 | self.assertEqual(mock_prepare_params.call_count, 1)
215 | self.client.command.assert_called_once_with(
216 | "combined_hosts",
217 | parameters={
218 | "filter": "entity_type:'unmanaged'+criticality:'Critical'",
219 | "limit": 50,
220 | "offset": 10,
221 | "sort": "hostname.asc",
222 | },
223 | )
224 | self.assertEqual(result, [{"device_id": "host1", "hostname": "PC-001"}])
225 |
226 |
227 | if __name__ == "__main__":
228 | unittest.main()
229 |
```